Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Modify the UDF Time Dimension #14434

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,8 @@
public class UDAFIntegral implements UDTF {

private static final String TIME_UNIT_KEY = "unit";
private static final String TIME_UNIT_MS = "1S";
private static final String TIME_UNIT_S = "1s";
private static final String TIME_UNIT_M = "1m";
private static final String TIME_UNIT_H = "1H";
private static final String TIME_UNIT_D = "1d";


long unitTime;
long lastTime = -1;
Expand All @@ -53,41 +50,19 @@ public void validate(UDFParameterValidator validator) throws Exception {
validator
.validateInputSeriesNumber(1)
.validate(
unit ->
TIME_UNIT_D.equals(unit)
|| TIME_UNIT_H.equals(unit)
|| TIME_UNIT_M.equals(unit)
|| TIME_UNIT_S.equals(unit)
|| TIME_UNIT_MS.equals(unit),
"Unknown time unit input",
validator.getParameters().getStringOrDefault(TIME_UNIT_KEY, TIME_UNIT_S));
x -> (long) x > 0,
"Unknown time unit input.",
Util.parseTime(
validator.getParameters().getStringOrDefault(TIME_UNIT_KEY, TIME_UNIT_S),
validator.getParameters()));
}

@Override
public void beforeStart(UDFParameters parameters, UDTFConfigurations configurations)
throws Exception {
configurations.setAccessStrategy(new RowByRowAccessStrategy()).setOutputDataType(Type.DOUBLE);
switch (parameters.getStringOrDefault(TIME_UNIT_KEY, TIME_UNIT_S)) {
case TIME_UNIT_MS:
unitTime = 1L;
break;
case TIME_UNIT_S:
unitTime = 1000L;
break;
case TIME_UNIT_M:
unitTime = 60000L;
break;
case TIME_UNIT_H:
unitTime = 3600000L;
break;
case TIME_UNIT_D:
unitTime = 3600000L * 24L;
break;
default:
throw new UDFException(
"Unknown time unit input: "
+ parameters.getStringOrDefault(TIME_UNIT_KEY, TIME_UNIT_S));
}
unitTime =
Util.parseTime(parameters.getStringOrDefault(TIME_UNIT_KEY, TIME_UNIT_S), parameters);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,19 +34,19 @@

/** This function does upsample or downsample of input series. */
public class UDTFResample implements UDTF {

private static final String START_PARAM = "start";
private Resampler resampler;

@Override
public void validate(UDFParameterValidator validator) throws Exception {

validator
.validateInputSeriesNumber(1)
.validateInputSeriesDataType(0, Type.DOUBLE, Type.FLOAT, Type.INT32, Type.INT64)
.validate(
x -> (long) x > 0,
"gap should be a time period whose unit is ms, s, m, h, d.",
Util.parseTime(validator.getParameters().getString("every")))
Util.parseTime(validator.getParameters().getString("every"), validator.getParameters()))
.validate(
x ->
"min".equals(x)
Expand Down Expand Up @@ -80,7 +80,7 @@ public void validate(UDFParameterValidator validator) throws Exception {
public void beforeStart(UDFParameters parameters, UDTFConfigurations configurations)
throws Exception {
configurations.setAccessStrategy(new RowByRowAccessStrategy()).setOutputDataType(Type.DOUBLE);
long newPeriod = Util.parseTime(parameters.getString("every"));
long newPeriod = Util.parseTime(parameters.getString("every"), parameters);
String aggregator = parameters.getStringOrDefault("aggr", "mean").toLowerCase();
String interpolator = parameters.getStringOrDefault("interp", "nan").toLowerCase();
SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@

/** This function calculates completeness of input series. */
public class UDTFCompleteness implements UDTF {

private boolean downtime;

@Override
Expand All @@ -44,7 +45,7 @@ public void beforeStart(UDFParameters udfp, UDTFConfigurations udtfc) throws Exc
long window = Integer.MAX_VALUE;
if (udfp.hasAttribute("window")) {
String s = udfp.getString("window");
window = Util.parseTime(s);
window = Util.parseTime(s, udfp);
if (window > 0) {
isTime = true;
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public void beforeStart(UDFParameters udfp, UDTFConfigurations udtfc) throws Exc
long window = Integer.MAX_VALUE;
if (udfp.hasAttribute("window")) {
String s = udfp.getString("window");
window = Util.parseTime(s);
window = Util.parseTime(s, udfp);
if (window > 0) {
isTime = true;
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,14 @@

/** This function calculates timeliness of input series. */
public class UDTFTimeliness implements UDTF {

@Override
public void beforeStart(UDFParameters udfp, UDTFConfigurations udtfc) throws Exception {
boolean isTime = false;
long window = Integer.MAX_VALUE;
if (udfp.hasAttribute("window")) {
String s = udfp.getString("window");
window = Util.parseTime(s);
window = Util.parseTime(s, udfp);
if (window > 0) {
isTime = true;
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,14 @@

/** This function calculates validity of input series. */
public class UDTFValidity implements UDTF {

@Override
public void beforeStart(UDFParameters udfp, UDTFConfigurations udtfc) throws Exception {
boolean isTime = false;
long window = Integer.MAX_VALUE;
if (udfp.hasAttribute("window")) {
String s = udfp.getString("window");
window = Util.parseTime(s);
window = Util.parseTime(s, udfp);
if (window > 0) {
isTime = true;
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,18 +33,26 @@
/** This function is used for timestamp repair. */
public class UDTFTimestampRepair implements UDTF {
String intervalMethod;
int interval;
int intervalMode;
long interval;
long intervalMode;

@Override
public void validate(UDFParameterValidator validator) throws Exception {
validator
.validateInputSeriesNumber(1)
.validateInputSeriesDataType(0, Type.DOUBLE, Type.FLOAT, Type.INT32, Type.INT64)
.validate(
x -> (Integer) x >= 0,
"Interval should be a positive integer.",
validator.getParameters().getIntOrDefault("interval", 0));
.validateInputSeriesDataType(0, Type.DOUBLE, Type.FLOAT, Type.INT32, Type.INT64);
String intervalString = validator.getParameters().getStringOrDefault("interval", null);
if (intervalString != null) {
try {
long parsedInterval = Util.parseTime(intervalString, validator.getParameters());
validator.validate(
x -> parsedInterval > 0,
"Invalid time unit input. Supported units are ns, us, ms, s, m, h, d.");
} catch (Exception e) {
throw new UDFException(
"Invalid time format for interval.");
}
}
}

@Override
Expand All @@ -54,15 +62,20 @@ public void beforeStart(UDFParameters parameters, UDTFConfigurations configurati
.setAccessStrategy(new SlidingSizeWindowAccessStrategy(Integer.MAX_VALUE))
.setOutputDataType(parameters.getDataType(0));
intervalMethod = parameters.getStringOrDefault("method", "Median");
interval = parameters.getIntOrDefault("interval", 0);
String intervalString = parameters.getStringOrDefault("interval", null);
if (intervalString != null) {
interval = Util.parseTime(intervalString, parameters);
} else {
interval = 0;
}
if (interval > 0) {
intervalMode = interval;
} else if ("Median".equalsIgnoreCase(intervalMethod)) {
intervalMode = -1;
intervalMode = -1L;
} else if ("Mode".equalsIgnoreCase(intervalMethod)) {
intervalMode = -2;
intervalMode = -2L;
} else if ("Cluster".equalsIgnoreCase(intervalMethod)) {
intervalMode = -3;
intervalMode = -3L;
} else {
throw new UDFException("Illegal method.");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,19 +44,15 @@ public TimestampInterval(long[] time, double[] original) {

// get standard interval
// -1 median -2 mode -3 cluster
public long getInterval(int mode) {
switch (mode) {
case -1:
this.deltaT = getIntervalByMedian();
break;
case -2:
this.deltaT = getIntervalByMode();
break;
case -3:
this.deltaT = getIntervalByCluster();
break;
default:
this.deltaT = mode;
public long getInterval(long mode) {
if (mode == -1L) {
this.deltaT = getIntervalByMedian();
} else if (mode == -2L) {
this.deltaT = getIntervalByMode();
} else if (mode == -3L) {
this.deltaT = getIntervalByCluster();
} else {
this.deltaT = mode;
}
return this.deltaT;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public class TimestampRepair {
protected long deltaT;
protected long start0;

public TimestampRepair(RowIterator dataIterator, int intervalMode, int startPointMode)
public TimestampRepair(RowIterator dataIterator, long intervalMode, int startPointMode)
throws Exception {
ArrayList<Long> timeList = new ArrayList<>();
ArrayList<Double> originList = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ public class UDTFIFFT implements UDTF {
private final DoubleArrayList real = new DoubleArrayList();
private final DoubleArrayList imag = new DoubleArrayList();
private final IntArrayList time = new IntArrayList();

private long start;
private long interval;

Expand All @@ -53,7 +54,9 @@ public void validate(UDFParameterValidator validator) throws Exception {
.validate(
x -> (long) x > 0,
"interval should be a time period whose unit is ms, s, m, h, d.",
Util.parseTime(validator.getParameters().getStringOrDefault("interval", "1s")));
Util.parseTime(
validator.getParameters().getStringOrDefault("interval", "1s"),
validator.getParameters()));
SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
if (validator.getParameters().hasAttribute(START_PARAM)) {
validator.validate(
Expand All @@ -67,7 +70,7 @@ public void validate(UDFParameterValidator validator) throws Exception {
public void beforeStart(UDFParameters parameters, UDTFConfigurations configurations)
throws Exception {
configurations.setAccessStrategy(new RowByRowAccessStrategy()).setOutputDataType(Type.DOUBLE);
this.interval = Util.parseTime(parameters.getStringOrDefault("interval", "1s"));
this.interval = Util.parseTime(parameters.getStringOrDefault("interval", "1s"), parameters);
SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
this.start = 0;
if (parameters.hasAttribute(START_PARAM)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,15 @@ public void validate(UDFParameterValidator validator) throws Exception {
validator.validate(
x -> (long) x > 0,
"gap should be a time period whose unit is ms, s, m, h.",
Util.parseTime(validator.getParameters().getStringOrDefault("gap", "1ms")));
Util.parseTime(
validator.getParameters().getStringOrDefault("gap", "1ms"), validator.getParameters()));
}

@Override
public void beforeStart(UDFParameters parameters, UDTFConfigurations configurations)
throws Exception {
configurations.setAccessStrategy(new RowByRowAccessStrategy()).setOutputDataType(Type.INT32);
long gap = Util.parseTime(parameters.getStringOrDefault("gap", "0ms"));
long gap = Util.parseTime(parameters.getStringOrDefault("gap", "0ms"), parameters);
consUtil = new ConsecutiveUtil(-gap, -gap, gap);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,19 +44,22 @@ public void validate(UDFParameterValidator validator) throws Exception {
.validate(
x -> (long) x > 0,
"gap should be a time period whose unit is ms, s, m, h.",
Util.parseTime(validator.getParameters().getStringOrDefault("gap", "1ms")))
Util.parseTime(
validator.getParameters().getStringOrDefault("gap", "1ms"),
validator.getParameters()))
.validate(
x -> (long) x > 0,
"length should be a time period whose unit is ms, s, m, h.",
Util.parseTime(validator.getParameters().getString("length")));
Util.parseTime(
validator.getParameters().getString("length"), validator.getParameters()));
}

@Override
public void beforeStart(UDFParameters parameters, UDTFConfigurations configurations)
throws Exception {
configurations.setAccessStrategy(new RowByRowAccessStrategy()).setOutputDataType(Type.INT32);
long gap = Util.parseTime(parameters.getStringOrDefault("gap", "0ms"));
len = Util.parseTime(parameters.getString("length"));
long gap = Util.parseTime(parameters.getStringOrDefault("gap", "0ms"), parameters);
len = Util.parseTime(parameters.getString("length"), parameters);
int count = gap == 0 ? 0 : (int) (len / gap + 1);
consUtil = new ConsecutiveUtil(-gap, -gap, gap);
consUtil.setCount(count);
Expand Down
Loading
Loading