[ML-DataFrame] Add a frequency option to transform config, default 1m (#44120)
Previously a data frame transform would check whether the source index was changed every 10 seconds. Sometimes it may be desirable for the check to be done less frequently. This commit increases the default to 60 seconds but also allows the frequency to be overridden by a setting in the data frame transform config.
This commit is contained in:
parent
64ff895a32
commit
cb62d4acdf
|
@ -25,6 +25,7 @@ import org.elasticsearch.client.dataframe.transforms.util.TimeUtil;
|
||||||
import org.elasticsearch.common.Nullable;
|
import org.elasticsearch.common.Nullable;
|
||||||
import org.elasticsearch.common.ParseField;
|
import org.elasticsearch.common.ParseField;
|
||||||
import org.elasticsearch.common.Strings;
|
import org.elasticsearch.common.Strings;
|
||||||
|
import org.elasticsearch.common.unit.TimeValue;
|
||||||
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
|
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
|
||||||
import org.elasticsearch.common.xcontent.ObjectParser;
|
import org.elasticsearch.common.xcontent.ObjectParser;
|
||||||
import org.elasticsearch.common.xcontent.ToXContentObject;
|
import org.elasticsearch.common.xcontent.ToXContentObject;
|
||||||
|
@ -44,6 +45,7 @@ public class DataFrameTransformConfig implements ToXContentObject {
|
||||||
public static final ParseField ID = new ParseField("id");
|
public static final ParseField ID = new ParseField("id");
|
||||||
public static final ParseField SOURCE = new ParseField("source");
|
public static final ParseField SOURCE = new ParseField("source");
|
||||||
public static final ParseField DEST = new ParseField("dest");
|
public static final ParseField DEST = new ParseField("dest");
|
||||||
|
public static final ParseField FREQUENCY = new ParseField("frequency");
|
||||||
public static final ParseField DESCRIPTION = new ParseField("description");
|
public static final ParseField DESCRIPTION = new ParseField("description");
|
||||||
public static final ParseField SYNC = new ParseField("sync");
|
public static final ParseField SYNC = new ParseField("sync");
|
||||||
public static final ParseField VERSION = new ParseField("version");
|
public static final ParseField VERSION = new ParseField("version");
|
||||||
|
@ -54,6 +56,7 @@ public class DataFrameTransformConfig implements ToXContentObject {
|
||||||
private final String id;
|
private final String id;
|
||||||
private final SourceConfig source;
|
private final SourceConfig source;
|
||||||
private final DestConfig dest;
|
private final DestConfig dest;
|
||||||
|
private final TimeValue frequency;
|
||||||
private final SyncConfig syncConfig;
|
private final SyncConfig syncConfig;
|
||||||
private final PivotConfig pivotConfig;
|
private final PivotConfig pivotConfig;
|
||||||
private final String description;
|
private final String description;
|
||||||
|
@ -66,14 +69,16 @@ public class DataFrameTransformConfig implements ToXContentObject {
|
||||||
String id = (String) args[0];
|
String id = (String) args[0];
|
||||||
SourceConfig source = (SourceConfig) args[1];
|
SourceConfig source = (SourceConfig) args[1];
|
||||||
DestConfig dest = (DestConfig) args[2];
|
DestConfig dest = (DestConfig) args[2];
|
||||||
SyncConfig syncConfig = (SyncConfig) args[3];
|
TimeValue frequency = (TimeValue) args[3];
|
||||||
PivotConfig pivotConfig = (PivotConfig) args[4];
|
SyncConfig syncConfig = (SyncConfig) args[4];
|
||||||
String description = (String)args[5];
|
PivotConfig pivotConfig = (PivotConfig) args[5];
|
||||||
Instant createTime = (Instant)args[6];
|
String description = (String)args[6];
|
||||||
String transformVersion = (String)args[7];
|
Instant createTime = (Instant)args[7];
|
||||||
|
String transformVersion = (String)args[8];
|
||||||
return new DataFrameTransformConfig(id,
|
return new DataFrameTransformConfig(id,
|
||||||
source,
|
source,
|
||||||
dest,
|
dest,
|
||||||
|
frequency,
|
||||||
syncConfig,
|
syncConfig,
|
||||||
pivotConfig,
|
pivotConfig,
|
||||||
description,
|
description,
|
||||||
|
@ -85,6 +90,8 @@ public class DataFrameTransformConfig implements ToXContentObject {
|
||||||
PARSER.declareString(constructorArg(), ID);
|
PARSER.declareString(constructorArg(), ID);
|
||||||
PARSER.declareObject(constructorArg(), (p, c) -> SourceConfig.PARSER.apply(p, null), SOURCE);
|
PARSER.declareObject(constructorArg(), (p, c) -> SourceConfig.PARSER.apply(p, null), SOURCE);
|
||||||
PARSER.declareObject(constructorArg(), (p, c) -> DestConfig.PARSER.apply(p, null), DEST);
|
PARSER.declareObject(constructorArg(), (p, c) -> DestConfig.PARSER.apply(p, null), DEST);
|
||||||
|
PARSER.declareField(optionalConstructorArg(), p -> TimeValue.parseTimeValue(p.text(), FREQUENCY.getPreferredName()),
|
||||||
|
FREQUENCY, ObjectParser.ValueType.STRING);
|
||||||
PARSER.declareObject(optionalConstructorArg(), (p, c) -> parseSyncConfig(p), SYNC);
|
PARSER.declareObject(optionalConstructorArg(), (p, c) -> parseSyncConfig(p), SYNC);
|
||||||
PARSER.declareObject(optionalConstructorArg(), (p, c) -> PivotConfig.fromXContent(p), PIVOT_TRANSFORM);
|
PARSER.declareObject(optionalConstructorArg(), (p, c) -> PivotConfig.fromXContent(p), PIVOT_TRANSFORM);
|
||||||
PARSER.declareString(optionalConstructorArg(), DESCRIPTION);
|
PARSER.declareString(optionalConstructorArg(), DESCRIPTION);
|
||||||
|
@ -118,12 +125,13 @@ public class DataFrameTransformConfig implements ToXContentObject {
|
||||||
* @return A DataFrameTransformConfig to preview, NOTE it will have a {@code null} id, destination and index.
|
* @return A DataFrameTransformConfig to preview, NOTE it will have a {@code null} id, destination and index.
|
||||||
*/
|
*/
|
||||||
public static DataFrameTransformConfig forPreview(final SourceConfig source, final PivotConfig pivotConfig) {
|
public static DataFrameTransformConfig forPreview(final SourceConfig source, final PivotConfig pivotConfig) {
|
||||||
return new DataFrameTransformConfig(null, source, null, null, pivotConfig, null, null, null);
|
return new DataFrameTransformConfig(null, source, null, null, null, pivotConfig, null, null, null);
|
||||||
}
|
}
|
||||||
|
|
||||||
DataFrameTransformConfig(final String id,
|
DataFrameTransformConfig(final String id,
|
||||||
final SourceConfig source,
|
final SourceConfig source,
|
||||||
final DestConfig dest,
|
final DestConfig dest,
|
||||||
|
final TimeValue frequency,
|
||||||
final SyncConfig syncConfig,
|
final SyncConfig syncConfig,
|
||||||
final PivotConfig pivotConfig,
|
final PivotConfig pivotConfig,
|
||||||
final String description,
|
final String description,
|
||||||
|
@ -132,6 +140,7 @@ public class DataFrameTransformConfig implements ToXContentObject {
|
||||||
this.id = id;
|
this.id = id;
|
||||||
this.source = source;
|
this.source = source;
|
||||||
this.dest = dest;
|
this.dest = dest;
|
||||||
|
this.frequency = frequency;
|
||||||
this.syncConfig = syncConfig;
|
this.syncConfig = syncConfig;
|
||||||
this.pivotConfig = pivotConfig;
|
this.pivotConfig = pivotConfig;
|
||||||
this.description = description;
|
this.description = description;
|
||||||
|
@ -151,6 +160,10 @@ public class DataFrameTransformConfig implements ToXContentObject {
|
||||||
return dest;
|
return dest;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public TimeValue getFrequency() {
|
||||||
|
return frequency;
|
||||||
|
}
|
||||||
|
|
||||||
public SyncConfig getSyncConfig() {
|
public SyncConfig getSyncConfig() {
|
||||||
return syncConfig;
|
return syncConfig;
|
||||||
}
|
}
|
||||||
|
@ -184,6 +197,9 @@ public class DataFrameTransformConfig implements ToXContentObject {
|
||||||
if (dest != null) {
|
if (dest != null) {
|
||||||
builder.field(DEST.getPreferredName(), dest);
|
builder.field(DEST.getPreferredName(), dest);
|
||||||
}
|
}
|
||||||
|
if (frequency != null) {
|
||||||
|
builder.field(FREQUENCY.getPreferredName(), frequency.getStringRep());
|
||||||
|
}
|
||||||
if (syncConfig != null) {
|
if (syncConfig != null) {
|
||||||
builder.startObject(SYNC.getPreferredName());
|
builder.startObject(SYNC.getPreferredName());
|
||||||
builder.field(syncConfig.getName(), syncConfig);
|
builder.field(syncConfig.getName(), syncConfig);
|
||||||
|
@ -220,6 +236,7 @@ public class DataFrameTransformConfig implements ToXContentObject {
|
||||||
return Objects.equals(this.id, that.id)
|
return Objects.equals(this.id, that.id)
|
||||||
&& Objects.equals(this.source, that.source)
|
&& Objects.equals(this.source, that.source)
|
||||||
&& Objects.equals(this.dest, that.dest)
|
&& Objects.equals(this.dest, that.dest)
|
||||||
|
&& Objects.equals(this.frequency, that.frequency)
|
||||||
&& Objects.equals(this.description, that.description)
|
&& Objects.equals(this.description, that.description)
|
||||||
&& Objects.equals(this.syncConfig, that.syncConfig)
|
&& Objects.equals(this.syncConfig, that.syncConfig)
|
||||||
&& Objects.equals(this.transformVersion, that.transformVersion)
|
&& Objects.equals(this.transformVersion, that.transformVersion)
|
||||||
|
@ -229,7 +246,7 @@ public class DataFrameTransformConfig implements ToXContentObject {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public int hashCode() {
|
public int hashCode() {
|
||||||
return Objects.hash(id, source, dest, syncConfig, pivotConfig, description);
|
return Objects.hash(id, source, dest, frequency, syncConfig, pivotConfig, description);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -246,6 +263,7 @@ public class DataFrameTransformConfig implements ToXContentObject {
|
||||||
private String id;
|
private String id;
|
||||||
private SourceConfig source;
|
private SourceConfig source;
|
||||||
private DestConfig dest;
|
private DestConfig dest;
|
||||||
|
private TimeValue frequency;
|
||||||
private SyncConfig syncConfig;
|
private SyncConfig syncConfig;
|
||||||
private PivotConfig pivotConfig;
|
private PivotConfig pivotConfig;
|
||||||
private String description;
|
private String description;
|
||||||
|
@ -265,6 +283,11 @@ public class DataFrameTransformConfig implements ToXContentObject {
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public Builder setFrequency(TimeValue frequency) {
|
||||||
|
this.frequency = frequency;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
public Builder setSyncConfig(SyncConfig syncConfig) {
|
public Builder setSyncConfig(SyncConfig syncConfig) {
|
||||||
this.syncConfig = syncConfig;
|
this.syncConfig = syncConfig;
|
||||||
return this;
|
return this;
|
||||||
|
@ -281,7 +304,7 @@ public class DataFrameTransformConfig implements ToXContentObject {
|
||||||
}
|
}
|
||||||
|
|
||||||
public DataFrameTransformConfig build() {
|
public DataFrameTransformConfig build() {
|
||||||
return new DataFrameTransformConfig(id, source, dest, syncConfig, pivotConfig, description, null, null);
|
return new DataFrameTransformConfig(id, source, dest, frequency, syncConfig, pivotConfig, description, null, null);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -23,6 +23,7 @@ import org.elasticsearch.client.dataframe.DataFrameNamedXContentProvider;
|
||||||
import org.elasticsearch.Version;
|
import org.elasticsearch.Version;
|
||||||
import org.elasticsearch.client.dataframe.transforms.pivot.PivotConfigTests;
|
import org.elasticsearch.client.dataframe.transforms.pivot.PivotConfigTests;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
|
import org.elasticsearch.common.unit.TimeValue;
|
||||||
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
|
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
|
||||||
import org.elasticsearch.common.xcontent.XContentParser;
|
import org.elasticsearch.common.xcontent.XContentParser;
|
||||||
import org.elasticsearch.search.SearchModule;
|
import org.elasticsearch.search.SearchModule;
|
||||||
|
@ -43,6 +44,7 @@ public class DataFrameTransformConfigTests extends AbstractXContentTestCase<Data
|
||||||
return new DataFrameTransformConfig(randomAlphaOfLengthBetween(1, 10),
|
return new DataFrameTransformConfig(randomAlphaOfLengthBetween(1, 10),
|
||||||
randomSourceConfig(),
|
randomSourceConfig(),
|
||||||
randomDestConfig(),
|
randomDestConfig(),
|
||||||
|
randomBoolean() ? null : TimeValue.timeValueMillis(randomIntBetween(1000, 1000000)),
|
||||||
randomBoolean() ? null : randomSyncConfig(),
|
randomBoolean() ? null : randomSyncConfig(),
|
||||||
PivotConfigTests.randomPivotConfig(),
|
PivotConfigTests.randomPivotConfig(),
|
||||||
randomBoolean() ? null : randomAlphaOfLengthBetween(1, 100),
|
randomBoolean() ? null : randomAlphaOfLengthBetween(1, 100),
|
||||||
|
|
|
@ -156,8 +156,9 @@ public class DataFrameTransformDocumentationIT extends ESRestHighLevelClientTest
|
||||||
.setId("reviewer-avg-rating") // <1>
|
.setId("reviewer-avg-rating") // <1>
|
||||||
.setSource(sourceConfig) // <2>
|
.setSource(sourceConfig) // <2>
|
||||||
.setDest(destConfig) // <3>
|
.setDest(destConfig) // <3>
|
||||||
.setPivotConfig(pivotConfig) // <4>
|
.setFrequency(TimeValue.timeValueSeconds(15)) // <4>
|
||||||
.setDescription("This is my test transform") // <5>
|
.setPivotConfig(pivotConfig) // <5>
|
||||||
|
.setDescription("This is my test transform") // <6>
|
||||||
.build();
|
.build();
|
||||||
// end::put-data-frame-transform-config
|
// end::put-data-frame-transform-config
|
||||||
|
|
||||||
|
|
|
@ -34,8 +34,9 @@ include-tagged::{doc-tests-file}[{api}-config]
|
||||||
<1> The {dataframe-transform} ID
|
<1> The {dataframe-transform} ID
|
||||||
<2> The source indices and query from which to gather data
|
<2> The source indices and query from which to gather data
|
||||||
<3> The destination index and optional pipeline
|
<3> The destination index and optional pipeline
|
||||||
<4> The PivotConfig
|
<4> How often to check for updates to the source indices
|
||||||
<5> Optional free text description of the transform
|
<5> The PivotConfig
|
||||||
|
<6> Optional free text description of the transform
|
||||||
|
|
||||||
[id="{upid}-{api}-query-config"]
|
[id="{upid}-{api}-query-config"]
|
||||||
|
|
||||||
|
|
|
@ -60,6 +60,11 @@ IMPORTANT: You must use {kib} or this API to create a {dataframe-transform}.
|
||||||
(object) The destination configuration, which consists of `index` and
|
(object) The destination configuration, which consists of `index` and
|
||||||
optionally a `pipeline` id. See <<data-frame-transform-dest>>.
|
optionally a `pipeline` id. See <<data-frame-transform-dest>>.
|
||||||
|
|
||||||
|
`frequency` (Optional)::
|
||||||
|
(time units) The interval between checks for changes in the source indices
|
||||||
|
when the {dataframe-transform} is running continuously. Defaults to `1m`.
|
||||||
|
The lowest permitted value is `1s`; the highest `1h`.
|
||||||
|
|
||||||
`pivot` (Optional)::
|
`pivot` (Optional)::
|
||||||
(object) Defines the pivot function `group by` fields and the aggregation to
|
(object) Defines the pivot function `group by` fields and the aggregation to
|
||||||
reduce the data. See <<data-frame-transform-pivot>>.
|
reduce the data. See <<data-frame-transform-pivot>>.
|
||||||
|
@ -90,6 +95,7 @@ PUT _data_frame/transforms/ecommerce_transform
|
||||||
"index": "kibana_sample_data_ecommerce_transform",
|
"index": "kibana_sample_data_ecommerce_transform",
|
||||||
"pipeline": "add_timestamp_pipeline"
|
"pipeline": "add_timestamp_pipeline"
|
||||||
},
|
},
|
||||||
|
"frequency": "5m",
|
||||||
"pivot": {
|
"pivot": {
|
||||||
"group_by": {
|
"group_by": {
|
||||||
"customer_id": {
|
"customer_id": {
|
||||||
|
|
|
@ -26,6 +26,7 @@ public final class DataFrameField {
|
||||||
public static final ParseField INDEX_DOC_TYPE = new ParseField("doc_type");
|
public static final ParseField INDEX_DOC_TYPE = new ParseField("doc_type");
|
||||||
public static final ParseField SOURCE = new ParseField("source");
|
public static final ParseField SOURCE = new ParseField("source");
|
||||||
public static final ParseField DESTINATION = new ParseField("dest");
|
public static final ParseField DESTINATION = new ParseField("dest");
|
||||||
|
public static final ParseField FREQUENCY = new ParseField("frequency");
|
||||||
public static final ParseField FORCE = new ParseField("force");
|
public static final ParseField FORCE = new ParseField("force");
|
||||||
public static final ParseField MAX_PAGE_SEARCH_SIZE = new ParseField("max_page_search_size");
|
public static final ParseField MAX_PAGE_SEARCH_SIZE = new ParseField("max_page_search_size");
|
||||||
public static final ParseField FIELD = new ParseField("field");
|
public static final ParseField FIELD = new ParseField("field");
|
||||||
|
|
|
@ -13,6 +13,7 @@ import org.elasticsearch.action.support.master.AcknowledgedResponse;
|
||||||
import org.elasticsearch.common.io.stream.StreamInput;
|
import org.elasticsearch.common.io.stream.StreamInput;
|
||||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||||
import org.elasticsearch.common.io.stream.Writeable;
|
import org.elasticsearch.common.io.stream.Writeable;
|
||||||
|
import org.elasticsearch.common.unit.TimeValue;
|
||||||
import org.elasticsearch.common.xcontent.ToXContentObject;
|
import org.elasticsearch.common.xcontent.ToXContentObject;
|
||||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||||
import org.elasticsearch.common.xcontent.XContentParser;
|
import org.elasticsearch.common.xcontent.XContentParser;
|
||||||
|
@ -34,6 +35,9 @@ public class PutDataFrameTransformAction extends ActionType<AcknowledgedResponse
|
||||||
public static final PutDataFrameTransformAction INSTANCE = new PutDataFrameTransformAction();
|
public static final PutDataFrameTransformAction INSTANCE = new PutDataFrameTransformAction();
|
||||||
public static final String NAME = "cluster:admin/data_frame/put";
|
public static final String NAME = "cluster:admin/data_frame/put";
|
||||||
|
|
||||||
|
private static final TimeValue MIN_FREQUENCY = TimeValue.timeValueSeconds(1);
|
||||||
|
private static final TimeValue MAX_FREQUENCY = TimeValue.timeValueHours(1);
|
||||||
|
|
||||||
private PutDataFrameTransformAction() {
|
private PutDataFrameTransformAction() {
|
||||||
super(NAME);
|
super(NAME);
|
||||||
}
|
}
|
||||||
|
@ -93,6 +97,19 @@ public class PutDataFrameTransformAction extends ActionType<AcknowledgedResponse
|
||||||
DataFrameMessages.getMessage(DataFrameMessages.ID_TOO_LONG, DataFrameStrings.ID_LENGTH_LIMIT),
|
DataFrameMessages.getMessage(DataFrameMessages.ID_TOO_LONG, DataFrameStrings.ID_LENGTH_LIMIT),
|
||||||
validationException);
|
validationException);
|
||||||
}
|
}
|
||||||
|
TimeValue frequency = config.getFrequency();
|
||||||
|
if (frequency != null) {
|
||||||
|
if (frequency.compareTo(MIN_FREQUENCY) < 0) {
|
||||||
|
validationException = addValidationError(
|
||||||
|
"minimum permitted [" + DataFrameField.FREQUENCY + "] is [" + MIN_FREQUENCY.getStringRep() + "]",
|
||||||
|
validationException);
|
||||||
|
} else if (frequency.compareTo(MAX_FREQUENCY) > 0) {
|
||||||
|
validationException = addValidationError(
|
||||||
|
"highest permitted [" + DataFrameField.FREQUENCY + "] is [" + MAX_FREQUENCY.getStringRep() + "]",
|
||||||
|
validationException);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return validationException;
|
return validationException;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -11,6 +11,7 @@ import org.elasticsearch.cluster.AbstractDiffable;
|
||||||
import org.elasticsearch.common.ParseField;
|
import org.elasticsearch.common.ParseField;
|
||||||
import org.elasticsearch.common.io.stream.StreamInput;
|
import org.elasticsearch.common.io.stream.StreamInput;
|
||||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||||
|
import org.elasticsearch.common.unit.TimeValue;
|
||||||
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
|
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
|
||||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||||
import org.elasticsearch.common.xcontent.XContentParser;
|
import org.elasticsearch.common.xcontent.XContentParser;
|
||||||
|
@ -24,25 +25,30 @@ public class DataFrameTransform extends AbstractDiffable<DataFrameTransform> imp
|
||||||
|
|
||||||
public static final String NAME = DataFrameField.TASK_NAME;
|
public static final String NAME = DataFrameField.TASK_NAME;
|
||||||
public static final ParseField VERSION = new ParseField(DataFrameField.VERSION);
|
public static final ParseField VERSION = new ParseField(DataFrameField.VERSION);
|
||||||
|
public static final ParseField FREQUENCY = DataFrameField.FREQUENCY;
|
||||||
|
|
||||||
private final String transformId;
|
private final String transformId;
|
||||||
private final Version version;
|
private final Version version;
|
||||||
|
private final TimeValue frequency;
|
||||||
|
|
||||||
public static final ConstructingObjectParser<DataFrameTransform, Void> PARSER = new ConstructingObjectParser<>(NAME,
|
public static final ConstructingObjectParser<DataFrameTransform, Void> PARSER = new ConstructingObjectParser<>(NAME, true,
|
||||||
a -> new DataFrameTransform((String) a[0], (String) a[1]));
|
a -> new DataFrameTransform((String) a[0], (String) a[1], (String) a[2]));
|
||||||
|
|
||||||
static {
|
static {
|
||||||
PARSER.declareString(ConstructingObjectParser.constructorArg(), DataFrameField.ID);
|
PARSER.declareString(ConstructingObjectParser.constructorArg(), DataFrameField.ID);
|
||||||
PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), VERSION);
|
PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), VERSION);
|
||||||
|
PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), FREQUENCY);
|
||||||
}
|
}
|
||||||
|
|
||||||
private DataFrameTransform(String transformId, String version) {
|
private DataFrameTransform(String transformId, String version, String frequency) {
|
||||||
this(transformId, version == null ? null : Version.fromString(version));
|
this(transformId, version == null ? null : Version.fromString(version),
|
||||||
|
frequency == null ? null : TimeValue.parseTimeValue(frequency, FREQUENCY.getPreferredName()));
|
||||||
}
|
}
|
||||||
|
|
||||||
public DataFrameTransform(String transformId, Version version) {
|
public DataFrameTransform(String transformId, Version version, TimeValue frequency) {
|
||||||
this.transformId = transformId;
|
this.transformId = transformId;
|
||||||
this.version = version == null ? Version.V_7_2_0 : version;
|
this.version = version == null ? Version.V_7_2_0 : version;
|
||||||
|
this.frequency = frequency;
|
||||||
}
|
}
|
||||||
|
|
||||||
public DataFrameTransform(StreamInput in) throws IOException {
|
public DataFrameTransform(StreamInput in) throws IOException {
|
||||||
|
@ -52,6 +58,11 @@ public class DataFrameTransform extends AbstractDiffable<DataFrameTransform> imp
|
||||||
} else {
|
} else {
|
||||||
this.version = Version.V_7_2_0;
|
this.version = Version.V_7_2_0;
|
||||||
}
|
}
|
||||||
|
if (in.getVersion().onOrAfter(Version.V_7_3_0)) {
|
||||||
|
this.frequency = in.readOptionalTimeValue();
|
||||||
|
} else {
|
||||||
|
this.frequency = null;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -70,6 +81,9 @@ public class DataFrameTransform extends AbstractDiffable<DataFrameTransform> imp
|
||||||
if (out.getVersion().onOrAfter(Version.V_7_3_0)) {
|
if (out.getVersion().onOrAfter(Version.V_7_3_0)) {
|
||||||
Version.writeVersion(version, out);
|
Version.writeVersion(version, out);
|
||||||
}
|
}
|
||||||
|
if (out.getVersion().onOrAfter(Version.V_7_3_0)) {
|
||||||
|
out.writeOptionalTimeValue(frequency);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -77,6 +91,9 @@ public class DataFrameTransform extends AbstractDiffable<DataFrameTransform> imp
|
||||||
builder.startObject();
|
builder.startObject();
|
||||||
builder.field(DataFrameField.ID.getPreferredName(), transformId);
|
builder.field(DataFrameField.ID.getPreferredName(), transformId);
|
||||||
builder.field(VERSION.getPreferredName(), version);
|
builder.field(VERSION.getPreferredName(), version);
|
||||||
|
if (frequency != null) {
|
||||||
|
builder.field(FREQUENCY.getPreferredName(), frequency.getStringRep());
|
||||||
|
}
|
||||||
builder.endObject();
|
builder.endObject();
|
||||||
return builder;
|
return builder;
|
||||||
}
|
}
|
||||||
|
@ -89,6 +106,10 @@ public class DataFrameTransform extends AbstractDiffable<DataFrameTransform> imp
|
||||||
return version;
|
return version;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public TimeValue getFrequency() {
|
||||||
|
return frequency;
|
||||||
|
}
|
||||||
|
|
||||||
public static DataFrameTransform fromXContent(XContentParser parser) throws IOException {
|
public static DataFrameTransform fromXContent(XContentParser parser) throws IOException {
|
||||||
return PARSER.parse(parser, null);
|
return PARSER.parse(parser, null);
|
||||||
}
|
}
|
||||||
|
@ -105,11 +126,13 @@ public class DataFrameTransform extends AbstractDiffable<DataFrameTransform> imp
|
||||||
|
|
||||||
DataFrameTransform that = (DataFrameTransform) other;
|
DataFrameTransform that = (DataFrameTransform) other;
|
||||||
|
|
||||||
return Objects.equals(this.transformId, that.transformId) && Objects.equals(this.version, that.version);
|
return Objects.equals(this.transformId, that.transformId)
|
||||||
|
&& Objects.equals(this.version, that.version)
|
||||||
|
&& Objects.equals(this.frequency, that.frequency);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public int hashCode() {
|
public int hashCode() {
|
||||||
return Objects.hash(transformId, version);
|
return Objects.hash(transformId, version, frequency);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -14,6 +14,7 @@ import org.elasticsearch.common.Strings;
|
||||||
import org.elasticsearch.common.io.stream.StreamInput;
|
import org.elasticsearch.common.io.stream.StreamInput;
|
||||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||||
import org.elasticsearch.common.io.stream.Writeable;
|
import org.elasticsearch.common.io.stream.Writeable;
|
||||||
|
import org.elasticsearch.common.unit.TimeValue;
|
||||||
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
|
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
|
||||||
import org.elasticsearch.common.xcontent.ObjectParser;
|
import org.elasticsearch.common.xcontent.ObjectParser;
|
||||||
import org.elasticsearch.common.xcontent.ToXContentObject;
|
import org.elasticsearch.common.xcontent.ToXContentObject;
|
||||||
|
@ -56,6 +57,7 @@ public class DataFrameTransformConfig extends AbstractDiffable<DataFrameTransfor
|
||||||
private final String id;
|
private final String id;
|
||||||
private final SourceConfig source;
|
private final SourceConfig source;
|
||||||
private final DestConfig dest;
|
private final DestConfig dest;
|
||||||
|
private final TimeValue frequency;
|
||||||
private final SyncConfig syncConfig;
|
private final SyncConfig syncConfig;
|
||||||
private final String description;
|
private final String description;
|
||||||
// headers store the user context from the creating user, which allows us to run the transform as this user
|
// headers store the user context from the creating user, which allows us to run the transform as this user
|
||||||
|
@ -88,35 +90,40 @@ public class DataFrameTransformConfig extends AbstractDiffable<DataFrameTransfor
|
||||||
SourceConfig source = (SourceConfig) args[1];
|
SourceConfig source = (SourceConfig) args[1];
|
||||||
DestConfig dest = (DestConfig) args[2];
|
DestConfig dest = (DestConfig) args[2];
|
||||||
|
|
||||||
SyncConfig syncConfig = (SyncConfig) args[3];
|
TimeValue frequency =
|
||||||
// ignored, only for internal storage: String docType = (String) args[4];
|
args[3] == null ? null : TimeValue.parseTimeValue((String) args[3], DataFrameField.FREQUENCY.getPreferredName());
|
||||||
|
|
||||||
|
SyncConfig syncConfig = (SyncConfig) args[4];
|
||||||
|
// ignored, only for internal storage: String docType = (String) args[5];
|
||||||
|
|
||||||
// on strict parsing do not allow injection of headers, transform version, or create time
|
// on strict parsing do not allow injection of headers, transform version, or create time
|
||||||
if (lenient == false) {
|
if (lenient == false) {
|
||||||
validateStrictParsingParams(args[5], HEADERS.getPreferredName());
|
validateStrictParsingParams(args[6], HEADERS.getPreferredName());
|
||||||
validateStrictParsingParams(args[8], CREATE_TIME.getPreferredName());
|
validateStrictParsingParams(args[9], CREATE_TIME.getPreferredName());
|
||||||
validateStrictParsingParams(args[9], VERSION.getPreferredName());
|
validateStrictParsingParams(args[10], VERSION.getPreferredName());
|
||||||
}
|
}
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
Map<String, String> headers = (Map<String, String>) args[5];
|
Map<String, String> headers = (Map<String, String>) args[6];
|
||||||
|
|
||||||
PivotConfig pivotConfig = (PivotConfig) args[6];
|
PivotConfig pivotConfig = (PivotConfig) args[7];
|
||||||
String description = (String)args[7];
|
String description = (String)args[8];
|
||||||
return new DataFrameTransformConfig(id,
|
return new DataFrameTransformConfig(id,
|
||||||
source,
|
source,
|
||||||
dest,
|
dest,
|
||||||
|
frequency,
|
||||||
syncConfig,
|
syncConfig,
|
||||||
headers,
|
headers,
|
||||||
pivotConfig,
|
pivotConfig,
|
||||||
description,
|
description,
|
||||||
(Instant)args[8],
|
(Instant)args[9],
|
||||||
(String)args[9]);
|
(String)args[10]);
|
||||||
});
|
});
|
||||||
|
|
||||||
parser.declareString(optionalConstructorArg(), DataFrameField.ID);
|
parser.declareString(optionalConstructorArg(), DataFrameField.ID);
|
||||||
parser.declareObject(constructorArg(), (p, c) -> SourceConfig.fromXContent(p, lenient), DataFrameField.SOURCE);
|
parser.declareObject(constructorArg(), (p, c) -> SourceConfig.fromXContent(p, lenient), DataFrameField.SOURCE);
|
||||||
parser.declareObject(constructorArg(), (p, c) -> DestConfig.fromXContent(p, lenient), DataFrameField.DESTINATION);
|
parser.declareObject(constructorArg(), (p, c) -> DestConfig.fromXContent(p, lenient), DataFrameField.DESTINATION);
|
||||||
|
parser.declareString(optionalConstructorArg(), DataFrameField.FREQUENCY);
|
||||||
|
|
||||||
parser.declareObject(optionalConstructorArg(), (p, c) -> parseSyncConfig(p, lenient), DataFrameField.SYNC);
|
parser.declareObject(optionalConstructorArg(), (p, c) -> parseSyncConfig(p, lenient), DataFrameField.SYNC);
|
||||||
|
|
||||||
|
@ -146,6 +153,7 @@ public class DataFrameTransformConfig extends AbstractDiffable<DataFrameTransfor
|
||||||
DataFrameTransformConfig(final String id,
|
DataFrameTransformConfig(final String id,
|
||||||
final SourceConfig source,
|
final SourceConfig source,
|
||||||
final DestConfig dest,
|
final DestConfig dest,
|
||||||
|
final TimeValue frequency,
|
||||||
final SyncConfig syncConfig,
|
final SyncConfig syncConfig,
|
||||||
final Map<String, String> headers,
|
final Map<String, String> headers,
|
||||||
final PivotConfig pivotConfig,
|
final PivotConfig pivotConfig,
|
||||||
|
@ -155,6 +163,7 @@ public class DataFrameTransformConfig extends AbstractDiffable<DataFrameTransfor
|
||||||
this.id = ExceptionsHelper.requireNonNull(id, DataFrameField.ID.getPreferredName());
|
this.id = ExceptionsHelper.requireNonNull(id, DataFrameField.ID.getPreferredName());
|
||||||
this.source = ExceptionsHelper.requireNonNull(source, DataFrameField.SOURCE.getPreferredName());
|
this.source = ExceptionsHelper.requireNonNull(source, DataFrameField.SOURCE.getPreferredName());
|
||||||
this.dest = ExceptionsHelper.requireNonNull(dest, DataFrameField.DESTINATION.getPreferredName());
|
this.dest = ExceptionsHelper.requireNonNull(dest, DataFrameField.DESTINATION.getPreferredName());
|
||||||
|
this.frequency = frequency;
|
||||||
this.syncConfig = syncConfig;
|
this.syncConfig = syncConfig;
|
||||||
this.setHeaders(headers == null ? Collections.emptyMap() : headers);
|
this.setHeaders(headers == null ? Collections.emptyMap() : headers);
|
||||||
this.pivotConfig = pivotConfig;
|
this.pivotConfig = pivotConfig;
|
||||||
|
@ -174,17 +183,23 @@ public class DataFrameTransformConfig extends AbstractDiffable<DataFrameTransfor
|
||||||
public DataFrameTransformConfig(final String id,
|
public DataFrameTransformConfig(final String id,
|
||||||
final SourceConfig source,
|
final SourceConfig source,
|
||||||
final DestConfig dest,
|
final DestConfig dest,
|
||||||
|
final TimeValue frequency,
|
||||||
final SyncConfig syncConfig,
|
final SyncConfig syncConfig,
|
||||||
final Map<String, String> headers,
|
final Map<String, String> headers,
|
||||||
final PivotConfig pivotConfig,
|
final PivotConfig pivotConfig,
|
||||||
final String description) {
|
final String description) {
|
||||||
this(id, source, dest, syncConfig, headers, pivotConfig, description, null, null);
|
this(id, source, dest, frequency, syncConfig, headers, pivotConfig, description, null, null);
|
||||||
}
|
}
|
||||||
|
|
||||||
public DataFrameTransformConfig(final StreamInput in) throws IOException {
|
public DataFrameTransformConfig(final StreamInput in) throws IOException {
|
||||||
id = in.readString();
|
id = in.readString();
|
||||||
source = new SourceConfig(in);
|
source = new SourceConfig(in);
|
||||||
dest = new DestConfig(in);
|
dest = new DestConfig(in);
|
||||||
|
if (in.getVersion().onOrAfter(Version.V_7_3_0)) {
|
||||||
|
frequency = in.readOptionalTimeValue();
|
||||||
|
} else {
|
||||||
|
frequency = null;
|
||||||
|
}
|
||||||
setHeaders(in.readMap(StreamInput::readString, StreamInput::readString));
|
setHeaders(in.readMap(StreamInput::readString, StreamInput::readString));
|
||||||
pivotConfig = in.readOptionalWriteable(PivotConfig::new);
|
pivotConfig = in.readOptionalWriteable(PivotConfig::new);
|
||||||
description = in.readOptionalString();
|
description = in.readOptionalString();
|
||||||
|
@ -211,6 +226,10 @@ public class DataFrameTransformConfig extends AbstractDiffable<DataFrameTransfor
|
||||||
return dest;
|
return dest;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public TimeValue getFrequency() {
|
||||||
|
return frequency;
|
||||||
|
}
|
||||||
|
|
||||||
public SyncConfig getSyncConfig() {
|
public SyncConfig getSyncConfig() {
|
||||||
return syncConfig;
|
return syncConfig;
|
||||||
}
|
}
|
||||||
|
@ -269,6 +288,9 @@ public class DataFrameTransformConfig extends AbstractDiffable<DataFrameTransfor
|
||||||
out.writeString(id);
|
out.writeString(id);
|
||||||
source.writeTo(out);
|
source.writeTo(out);
|
||||||
dest.writeTo(out);
|
dest.writeTo(out);
|
||||||
|
if (out.getVersion().onOrAfter(Version.V_7_3_0)) {
|
||||||
|
out.writeOptionalTimeValue(frequency);
|
||||||
|
}
|
||||||
out.writeMap(headers, StreamOutput::writeString, StreamOutput::writeString);
|
out.writeMap(headers, StreamOutput::writeString, StreamOutput::writeString);
|
||||||
out.writeOptionalWriteable(pivotConfig);
|
out.writeOptionalWriteable(pivotConfig);
|
||||||
out.writeOptionalString(description);
|
out.writeOptionalString(description);
|
||||||
|
@ -290,6 +312,9 @@ public class DataFrameTransformConfig extends AbstractDiffable<DataFrameTransfor
|
||||||
builder.field(DataFrameField.ID.getPreferredName(), id);
|
builder.field(DataFrameField.ID.getPreferredName(), id);
|
||||||
builder.field(DataFrameField.SOURCE.getPreferredName(), source);
|
builder.field(DataFrameField.SOURCE.getPreferredName(), source);
|
||||||
builder.field(DataFrameField.DESTINATION.getPreferredName(), dest);
|
builder.field(DataFrameField.DESTINATION.getPreferredName(), dest);
|
||||||
|
if (frequency != null) {
|
||||||
|
builder.field(DataFrameField.FREQUENCY.getPreferredName(), frequency.getStringRep());
|
||||||
|
}
|
||||||
if (syncConfig != null) {
|
if (syncConfig != null) {
|
||||||
builder.startObject(DataFrameField.SYNC.getPreferredName());
|
builder.startObject(DataFrameField.SYNC.getPreferredName());
|
||||||
builder.field(syncConfig.getWriteableName(), syncConfig);
|
builder.field(syncConfig.getWriteableName(), syncConfig);
|
||||||
|
@ -332,6 +357,7 @@ public class DataFrameTransformConfig extends AbstractDiffable<DataFrameTransfor
|
||||||
return Objects.equals(this.id, that.id)
|
return Objects.equals(this.id, that.id)
|
||||||
&& Objects.equals(this.source, that.source)
|
&& Objects.equals(this.source, that.source)
|
||||||
&& Objects.equals(this.dest, that.dest)
|
&& Objects.equals(this.dest, that.dest)
|
||||||
|
&& Objects.equals(this.frequency, that.frequency)
|
||||||
&& Objects.equals(this.syncConfig, that.syncConfig)
|
&& Objects.equals(this.syncConfig, that.syncConfig)
|
||||||
&& Objects.equals(this.headers, that.headers)
|
&& Objects.equals(this.headers, that.headers)
|
||||||
&& Objects.equals(this.pivotConfig, that.pivotConfig)
|
&& Objects.equals(this.pivotConfig, that.pivotConfig)
|
||||||
|
@ -342,7 +368,7 @@ public class DataFrameTransformConfig extends AbstractDiffable<DataFrameTransfor
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public int hashCode(){
|
public int hashCode(){
|
||||||
return Objects.hash(id, source, dest, syncConfig, headers, pivotConfig, description, createTime, transformVersion);
|
return Objects.hash(id, source, dest, frequency, syncConfig, headers, pivotConfig, description, createTime, transformVersion);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -44,6 +44,7 @@ public class PreviewDataFrameTransformActionRequestTests extends AbstractSeriali
|
||||||
"transform-preview",
|
"transform-preview",
|
||||||
randomSourceConfig(),
|
randomSourceConfig(),
|
||||||
new DestConfig("unused-transform-preview-index", null),
|
new DestConfig("unused-transform-preview-index", null),
|
||||||
|
null,
|
||||||
randomBoolean() ? DataFrameTransformConfigTests.randomSyncConfig() : null,
|
randomBoolean() ? DataFrameTransformConfigTests.randomSyncConfig() : null,
|
||||||
null,
|
null,
|
||||||
PivotConfigTests.randomPivotConfig(),
|
PivotConfigTests.randomPivotConfig(),
|
||||||
|
|
|
@ -9,6 +9,7 @@ package org.elasticsearch.xpack.core.dataframe.transforms;
|
||||||
import org.elasticsearch.Version;
|
import org.elasticsearch.Version;
|
||||||
import org.elasticsearch.common.Strings;
|
import org.elasticsearch.common.Strings;
|
||||||
import org.elasticsearch.common.io.stream.Writeable.Reader;
|
import org.elasticsearch.common.io.stream.Writeable.Reader;
|
||||||
|
import org.elasticsearch.common.unit.TimeValue;
|
||||||
import org.elasticsearch.common.xcontent.DeprecationHandler;
|
import org.elasticsearch.common.xcontent.DeprecationHandler;
|
||||||
import org.elasticsearch.common.xcontent.ToXContent;
|
import org.elasticsearch.common.xcontent.ToXContent;
|
||||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||||
|
@ -46,6 +47,7 @@ public class DataFrameTransformConfigTests extends AbstractSerializingDataFrameT
|
||||||
return new DataFrameTransformConfig(id,
|
return new DataFrameTransformConfig(id,
|
||||||
randomSourceConfig(),
|
randomSourceConfig(),
|
||||||
randomDestConfig(),
|
randomDestConfig(),
|
||||||
|
randomBoolean() ? null : TimeValue.timeValueMillis(randomIntBetween(1_000, 3_600_000)),
|
||||||
randomBoolean() ? null : randomSyncConfig(),
|
randomBoolean() ? null : randomSyncConfig(),
|
||||||
null,
|
null,
|
||||||
PivotConfigTests.randomPivotConfig(),
|
PivotConfigTests.randomPivotConfig(),
|
||||||
|
@ -58,6 +60,7 @@ public class DataFrameTransformConfigTests extends AbstractSerializingDataFrameT
|
||||||
return new DataFrameTransformConfig(id,
|
return new DataFrameTransformConfig(id,
|
||||||
randomSourceConfig(),
|
randomSourceConfig(),
|
||||||
randomDestConfig(),
|
randomDestConfig(),
|
||||||
|
randomBoolean() ? null : TimeValue.timeValueMillis(randomIntBetween(1_000, 3_600_000)),
|
||||||
randomBoolean() ? null : randomSyncConfig(),
|
randomBoolean() ? null : randomSyncConfig(),
|
||||||
randomHeaders(),
|
randomHeaders(),
|
||||||
PivotConfigTests.randomPivotConfig(),
|
PivotConfigTests.randomPivotConfig(),
|
||||||
|
@ -69,11 +72,11 @@ public class DataFrameTransformConfigTests extends AbstractSerializingDataFrameT
|
||||||
public static DataFrameTransformConfig randomInvalidDataFrameTransformConfig() {
|
public static DataFrameTransformConfig randomInvalidDataFrameTransformConfig() {
|
||||||
if (randomBoolean()) {
|
if (randomBoolean()) {
|
||||||
return new DataFrameTransformConfig(randomAlphaOfLengthBetween(1, 10), randomInvalidSourceConfig(), randomDestConfig(),
|
return new DataFrameTransformConfig(randomAlphaOfLengthBetween(1, 10), randomInvalidSourceConfig(), randomDestConfig(),
|
||||||
randomBoolean() ? randomSyncConfig() : null, randomHeaders(), PivotConfigTests.randomPivotConfig(),
|
null, randomBoolean() ? randomSyncConfig() : null, randomHeaders(), PivotConfigTests.randomPivotConfig(),
|
||||||
randomBoolean() ? null : randomAlphaOfLengthBetween(1, 1000));
|
randomBoolean() ? null : randomAlphaOfLengthBetween(1, 1000));
|
||||||
} // else
|
} // else
|
||||||
return new DataFrameTransformConfig(randomAlphaOfLengthBetween(1, 10), randomSourceConfig(), randomDestConfig(),
|
return new DataFrameTransformConfig(randomAlphaOfLengthBetween(1, 10), randomSourceConfig(), randomDestConfig(),
|
||||||
randomBoolean() ? randomSyncConfig() : null, randomHeaders(), PivotConfigTests.randomInvalidPivotConfig(),
|
null, randomBoolean() ? randomSyncConfig() : null, randomHeaders(), PivotConfigTests.randomInvalidPivotConfig(),
|
||||||
randomBoolean() ? null : randomAlphaOfLengthBetween(1, 1000));
|
randomBoolean() ? null : randomAlphaOfLengthBetween(1, 1000));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -146,7 +149,7 @@ public class DataFrameTransformConfigTests extends AbstractSerializingDataFrameT
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testPreventHeaderInjection() throws IOException {
|
public void testPreventHeaderInjection() {
|
||||||
String pivotTransform = "{"
|
String pivotTransform = "{"
|
||||||
+ " \"headers\" : {\"key\" : \"value\" },"
|
+ " \"headers\" : {\"key\" : \"value\" },"
|
||||||
+ " \"source\" : {\"index\":\"src\"},"
|
+ " \"source\" : {\"index\":\"src\"},"
|
||||||
|
@ -167,7 +170,7 @@ public class DataFrameTransformConfigTests extends AbstractSerializingDataFrameT
|
||||||
() -> createDataFrameTransformConfigFromString(pivotTransform, "test_header_injection"));
|
() -> createDataFrameTransformConfigFromString(pivotTransform, "test_header_injection"));
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testPreventCreateTimeInjection() throws IOException {
|
public void testPreventCreateTimeInjection() {
|
||||||
String pivotTransform = "{"
|
String pivotTransform = "{"
|
||||||
+ " \"create_time\" : " + Instant.now().toEpochMilli() + " },"
|
+ " \"create_time\" : " + Instant.now().toEpochMilli() + " },"
|
||||||
+ " \"source\" : {\"index\":\"src\"},"
|
+ " \"source\" : {\"index\":\"src\"},"
|
||||||
|
@ -188,7 +191,7 @@ public class DataFrameTransformConfigTests extends AbstractSerializingDataFrameT
|
||||||
() -> createDataFrameTransformConfigFromString(pivotTransform, "test_createTime_injection"));
|
() -> createDataFrameTransformConfigFromString(pivotTransform, "test_createTime_injection"));
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testPreventVersionInjection() throws IOException {
|
public void testPreventVersionInjection() {
|
||||||
String pivotTransform = "{"
|
String pivotTransform = "{"
|
||||||
+ " \"version\" : \"7.3.0\","
|
+ " \"version\" : \"7.3.0\","
|
||||||
+ " \"source\" : {\"index\":\"src\"},"
|
+ " \"source\" : {\"index\":\"src\"},"
|
||||||
|
@ -229,11 +232,11 @@ public class DataFrameTransformConfigTests extends AbstractSerializingDataFrameT
|
||||||
|
|
||||||
public void testMaxLengthDescription() {
|
public void testMaxLengthDescription() {
|
||||||
IllegalArgumentException exception = expectThrows(IllegalArgumentException.class, () -> new DataFrameTransformConfig("id",
|
IllegalArgumentException exception = expectThrows(IllegalArgumentException.class, () -> new DataFrameTransformConfig("id",
|
||||||
randomSourceConfig(), randomDestConfig(), null, null, PivotConfigTests.randomPivotConfig(), randomAlphaOfLength(1001)));
|
randomSourceConfig(), randomDestConfig(), null, null, null, PivotConfigTests.randomPivotConfig(), randomAlphaOfLength(1001)));
|
||||||
assertThat(exception.getMessage(), equalTo("[description] must be less than 1000 characters in length."));
|
assertThat(exception.getMessage(), equalTo("[description] must be less than 1000 characters in length."));
|
||||||
String description = randomAlphaOfLength(1000);
|
String description = randomAlphaOfLength(1000);
|
||||||
DataFrameTransformConfig config = new DataFrameTransformConfig("id",
|
DataFrameTransformConfig config = new DataFrameTransformConfig("id",
|
||||||
randomSourceConfig(), randomDestConfig(), null, null, PivotConfigTests.randomPivotConfig(), description);
|
randomSourceConfig(), randomDestConfig(), null, null, null, PivotConfigTests.randomPivotConfig(), description);
|
||||||
assertThat(description, equalTo(config.getDescription()));
|
assertThat(description, equalTo(config.getDescription()));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -10,6 +10,7 @@ import org.elasticsearch.Version;
|
||||||
import org.elasticsearch.common.io.stream.BytesStreamOutput;
|
import org.elasticsearch.common.io.stream.BytesStreamOutput;
|
||||||
import org.elasticsearch.common.io.stream.StreamInput;
|
import org.elasticsearch.common.io.stream.StreamInput;
|
||||||
import org.elasticsearch.common.io.stream.Writeable.Reader;
|
import org.elasticsearch.common.io.stream.Writeable.Reader;
|
||||||
|
import org.elasticsearch.common.unit.TimeValue;
|
||||||
import org.elasticsearch.common.xcontent.XContentParser;
|
import org.elasticsearch.common.xcontent.XContentParser;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
@ -25,7 +26,8 @@ public class DataFrameTransformTests extends AbstractSerializingDataFrameTestCas
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected DataFrameTransform createTestInstance() {
|
protected DataFrameTransform createTestInstance() {
|
||||||
return new DataFrameTransform(randomAlphaOfLength(10), randomBoolean() ? null : Version.CURRENT);
|
return new DataFrameTransform(randomAlphaOfLength(10), randomBoolean() ? null : Version.CURRENT,
|
||||||
|
randomBoolean() ? null : TimeValue.timeValueMillis(randomIntBetween(1_000, 3_600_000)));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -210,6 +210,7 @@ abstract class DataFrameIntegTestCase extends ESRestTestCase {
|
||||||
.setId(id)
|
.setId(id)
|
||||||
.setSource(SourceConfig.builder().setIndex(sourceIndices).setQueryConfig(createQueryConfig(queryBuilder)).build())
|
.setSource(SourceConfig.builder().setIndex(sourceIndices).setQueryConfig(createQueryConfig(queryBuilder)).build())
|
||||||
.setDest(DestConfig.builder().setIndex(destinationIndex).build())
|
.setDest(DestConfig.builder().setIndex(destinationIndex).build())
|
||||||
|
.setFrequency(TimeValue.timeValueSeconds(10))
|
||||||
.setPivotConfig(createPivotConfig(groups, aggregations))
|
.setPivotConfig(createPivotConfig(groups, aggregations))
|
||||||
.setDescription("Test data frame transform config id: " + id);
|
.setDescription("Test data frame transform config id: " + id);
|
||||||
}
|
}
|
||||||
|
|
|
@ -214,6 +214,7 @@ public class DataFrameGetAndGetStatsIT extends DataFrameRestTestCase {
|
||||||
final Request createDataframeTransformRequest = createRequestWithAuth("PUT", DATAFRAME_ENDPOINT + transformId, null);
|
final Request createDataframeTransformRequest = createRequestWithAuth("PUT", DATAFRAME_ENDPOINT + transformId, null);
|
||||||
String config = "{ \"dest\": {\"index\":\"" + transformDest + "\"},"
|
String config = "{ \"dest\": {\"index\":\"" + transformDest + "\"},"
|
||||||
+ " \"source\": {\"index\":\"" + transformSrc + "\"},"
|
+ " \"source\": {\"index\":\"" + transformSrc + "\"},"
|
||||||
|
+ " \"frequency\": \"1s\","
|
||||||
+ " \"sync\": {\"time\":{\"field\": \"timestamp\", \"delay\": \"1s\"}},"
|
+ " \"sync\": {\"time\":{\"field\": \"timestamp\", \"delay\": \"1s\"}},"
|
||||||
+ " \"pivot\": {"
|
+ " \"pivot\": {"
|
||||||
+ " \"group_by\": {"
|
+ " \"group_by\": {"
|
||||||
|
|
|
@ -141,6 +141,7 @@ public class DataFramePivotRestIT extends DataFrameRestTestCase {
|
||||||
String config = "{"
|
String config = "{"
|
||||||
+ " \"source\": {\"index\":\"" + indexName + "\"},"
|
+ " \"source\": {\"index\":\"" + indexName + "\"},"
|
||||||
+ " \"dest\": {\"index\":\"" + dataFrameIndex + "\"},"
|
+ " \"dest\": {\"index\":\"" + dataFrameIndex + "\"},"
|
||||||
|
+ " \"frequency\": \"1s\","
|
||||||
+ " \"sync\": {\"time\": {\"field\": \"timestamp\", \"delay\": \"1s\"}},"
|
+ " \"sync\": {\"time\": {\"field\": \"timestamp\", \"delay\": \"1s\"}},"
|
||||||
+ " \"pivot\": {"
|
+ " \"pivot\": {"
|
||||||
+ " \"group_by\": {"
|
+ " \"group_by\": {"
|
||||||
|
|
|
@ -132,6 +132,7 @@ public class DataFrameTransformProgressIT extends ESRestTestCase {
|
||||||
destConfig,
|
destConfig,
|
||||||
null,
|
null,
|
||||||
null,
|
null,
|
||||||
|
null,
|
||||||
pivotConfig,
|
pivotConfig,
|
||||||
null);
|
null);
|
||||||
|
|
||||||
|
@ -156,6 +157,7 @@ public class DataFrameTransformProgressIT extends ESRestTestCase {
|
||||||
destConfig,
|
destConfig,
|
||||||
null,
|
null,
|
||||||
null,
|
null,
|
||||||
|
null,
|
||||||
pivotConfig,
|
pivotConfig,
|
||||||
null);
|
null);
|
||||||
|
|
||||||
|
@ -175,6 +177,7 @@ public class DataFrameTransformProgressIT extends ESRestTestCase {
|
||||||
destConfig,
|
destConfig,
|
||||||
null,
|
null,
|
||||||
null,
|
null,
|
||||||
|
null,
|
||||||
pivotConfig,
|
pivotConfig,
|
||||||
null);
|
null);
|
||||||
|
|
||||||
|
|
|
@ -185,7 +185,7 @@ public class TransportStartDataFrameTransformAction extends
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
transformTaskHolder.set(createDataFrameTransform(config.getId(), config.getVersion()));
|
transformTaskHolder.set(createDataFrameTransform(config.getId(), config.getVersion(), config.getFrequency()));
|
||||||
final String destinationIndex = config.getDestination().getIndex();
|
final String destinationIndex = config.getDestination().getIndex();
|
||||||
String[] dest = indexNameExpressionResolver.concreteIndexNames(state,
|
String[] dest = indexNameExpressionResolver.concreteIndexNames(state,
|
||||||
IndicesOptions.lenientExpandOpen(),
|
IndicesOptions.lenientExpandOpen(),
|
||||||
|
@ -254,8 +254,8 @@ public class TransportStartDataFrameTransformAction extends
|
||||||
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);
|
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);
|
||||||
}
|
}
|
||||||
|
|
||||||
private static DataFrameTransform createDataFrameTransform(String transformId, Version transformVersion) {
|
private static DataFrameTransform createDataFrameTransform(String transformId, Version transformVersion, TimeValue frequency) {
|
||||||
return new DataFrameTransform(transformId, transformVersion);
|
return new DataFrameTransform(transformId, transformVersion, frequency);
|
||||||
}
|
}
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
|
|
|
@ -20,6 +20,7 @@ import org.elasticsearch.action.search.SearchAction;
|
||||||
import org.elasticsearch.action.search.SearchRequest;
|
import org.elasticsearch.action.search.SearchRequest;
|
||||||
import org.elasticsearch.action.search.SearchResponse;
|
import org.elasticsearch.action.search.SearchResponse;
|
||||||
import org.elasticsearch.client.Client;
|
import org.elasticsearch.client.Client;
|
||||||
|
import org.elasticsearch.common.unit.TimeValue;
|
||||||
import org.elasticsearch.index.IndexNotFoundException;
|
import org.elasticsearch.index.IndexNotFoundException;
|
||||||
import org.elasticsearch.persistent.AllocatedPersistentTask;
|
import org.elasticsearch.persistent.AllocatedPersistentTask;
|
||||||
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
|
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
|
||||||
|
@ -59,8 +60,8 @@ import java.util.concurrent.atomic.AtomicReference;
|
||||||
|
|
||||||
public class DataFrameTransformTask extends AllocatedPersistentTask implements SchedulerEngine.Listener {
|
public class DataFrameTransformTask extends AllocatedPersistentTask implements SchedulerEngine.Listener {
|
||||||
|
|
||||||
// interval the scheduler sends an event
|
// Default interval the scheduler sends an event if the config does not specify a frequency
|
||||||
private static final int SCHEDULER_NEXT_MILLISECONDS = 10000;
|
private static final long SCHEDULER_NEXT_MILLISECONDS = 60000;
|
||||||
private static final Logger logger = LogManager.getLogger(DataFrameTransformTask.class);
|
private static final Logger logger = LogManager.getLogger(DataFrameTransformTask.class);
|
||||||
// TODO consider moving to dynamic cluster setting
|
// TODO consider moving to dynamic cluster setting
|
||||||
private static final int MAX_CONTINUOUS_FAILURES = 10;
|
private static final int MAX_CONTINUOUS_FAILURES = 10;
|
||||||
|
@ -363,7 +364,8 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S
|
||||||
|
|
||||||
private SchedulerEngine.Schedule next() {
|
private SchedulerEngine.Schedule next() {
|
||||||
return (startTime, now) -> {
|
return (startTime, now) -> {
|
||||||
return now + SCHEDULER_NEXT_MILLISECONDS;
|
TimeValue frequency = transform.getFrequency();
|
||||||
|
return now + (frequency == null ? SCHEDULER_NEXT_MILLISECONDS : frequency.getMillis());
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -31,10 +31,10 @@ public class DataFrameNodesTests extends ESTestCase {
|
||||||
|
|
||||||
PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder();
|
PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder();
|
||||||
tasksBuilder.addTask(dataFrameIdFoo,
|
tasksBuilder.addTask(dataFrameIdFoo,
|
||||||
DataFrameField.TASK_NAME, new DataFrameTransform(dataFrameIdFoo, Version.CURRENT),
|
DataFrameField.TASK_NAME, new DataFrameTransform(dataFrameIdFoo, Version.CURRENT, null),
|
||||||
new PersistentTasksCustomMetaData.Assignment("node-1", "test assignment"));
|
new PersistentTasksCustomMetaData.Assignment("node-1", "test assignment"));
|
||||||
tasksBuilder.addTask(dataFrameIdBar,
|
tasksBuilder.addTask(dataFrameIdBar,
|
||||||
DataFrameField.TASK_NAME, new DataFrameTransform(dataFrameIdBar, Version.CURRENT),
|
DataFrameField.TASK_NAME, new DataFrameTransform(dataFrameIdBar, Version.CURRENT, null),
|
||||||
new PersistentTasksCustomMetaData.Assignment("node-2", "test assignment"));
|
new PersistentTasksCustomMetaData.Assignment("node-2", "test assignment"));
|
||||||
tasksBuilder.addTask("test-task1", "testTasks", new PersistentTaskParams() {
|
tasksBuilder.addTask("test-task1", "testTasks", new PersistentTaskParams() {
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -186,6 +186,7 @@ public class DataFrameIndexerTests extends ESTestCase {
|
||||||
randomDestConfig(),
|
randomDestConfig(),
|
||||||
null,
|
null,
|
||||||
null,
|
null,
|
||||||
|
null,
|
||||||
new PivotConfig(GroupConfigTests.randomGroupConfig(), AggregationConfigTests.randomAggregationConfig(), pageSize),
|
new PivotConfig(GroupConfigTests.randomGroupConfig(), AggregationConfigTests.randomAggregationConfig(), pageSize),
|
||||||
randomBoolean() ? null : randomAlphaOfLengthBetween(1, 1000));
|
randomBoolean() ? null : randomAlphaOfLengthBetween(1, 1000));
|
||||||
AtomicReference<IndexerState> state = new AtomicReference<>(IndexerState.STOPPED);
|
AtomicReference<IndexerState> state = new AtomicReference<>(IndexerState.STOPPED);
|
||||||
|
|
|
@ -52,15 +52,15 @@ public class DataFrameTransformPersistentTasksExecutorTests extends ESTestCase {
|
||||||
PersistentTasksCustomMetaData.Builder pTasksBuilder = PersistentTasksCustomMetaData.builder()
|
PersistentTasksCustomMetaData.Builder pTasksBuilder = PersistentTasksCustomMetaData.builder()
|
||||||
.addTask("data-frame-task-1",
|
.addTask("data-frame-task-1",
|
||||||
DataFrameTransform.NAME,
|
DataFrameTransform.NAME,
|
||||||
new DataFrameTransform("data-frame-task-1", Version.CURRENT),
|
new DataFrameTransform("data-frame-task-1", Version.CURRENT, null),
|
||||||
new PersistentTasksCustomMetaData.Assignment("current-data-node-with-1-tasks", ""))
|
new PersistentTasksCustomMetaData.Assignment("current-data-node-with-1-tasks", ""))
|
||||||
.addTask("data-frame-task-2",
|
.addTask("data-frame-task-2",
|
||||||
DataFrameTransform.NAME,
|
DataFrameTransform.NAME,
|
||||||
new DataFrameTransform("data-frame-task-2", Version.CURRENT),
|
new DataFrameTransform("data-frame-task-2", Version.CURRENT, null),
|
||||||
new PersistentTasksCustomMetaData.Assignment("current-data-node-with-2-tasks", ""))
|
new PersistentTasksCustomMetaData.Assignment("current-data-node-with-2-tasks", ""))
|
||||||
.addTask("data-frame-task-3",
|
.addTask("data-frame-task-3",
|
||||||
DataFrameTransform.NAME,
|
DataFrameTransform.NAME,
|
||||||
new DataFrameTransform("data-frame-task-3", Version.CURRENT),
|
new DataFrameTransform("data-frame-task-3", Version.CURRENT, null),
|
||||||
new PersistentTasksCustomMetaData.Assignment("current-data-node-with-2-tasks", ""));
|
new PersistentTasksCustomMetaData.Assignment("current-data-node-with-2-tasks", ""));
|
||||||
|
|
||||||
PersistentTasksCustomMetaData pTasks = pTasksBuilder.build();
|
PersistentTasksCustomMetaData pTasks = pTasksBuilder.build();
|
||||||
|
@ -106,9 +106,9 @@ public class DataFrameTransformPersistentTasksExecutorTests extends ESTestCase {
|
||||||
new DataFrameAuditor(client, ""),
|
new DataFrameAuditor(client, ""),
|
||||||
mock(ThreadPool.class));
|
mock(ThreadPool.class));
|
||||||
|
|
||||||
assertThat(executor.getAssignment(new DataFrameTransform("new-task-id", Version.CURRENT), cs).getExecutorNode(),
|
assertThat(executor.getAssignment(new DataFrameTransform("new-task-id", Version.CURRENT, null), cs).getExecutorNode(),
|
||||||
equalTo("current-data-node-with-1-tasks"));
|
equalTo("current-data-node-with-1-tasks"));
|
||||||
assertThat(executor.getAssignment(new DataFrameTransform("new-old-task-id", Version.V_7_2_0), cs).getExecutorNode(),
|
assertThat(executor.getAssignment(new DataFrameTransform("new-old-task-id", Version.V_7_2_0, null), cs).getExecutorNode(),
|
||||||
equalTo("past-data-node-1"));
|
equalTo("past-data-node-1"));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -42,6 +42,40 @@ setup:
|
||||||
data_frame.delete_data_frame_transform:
|
data_frame.delete_data_frame_transform:
|
||||||
transform_id: "missing transform"
|
transform_id: "missing transform"
|
||||||
|
|
||||||
|
---
|
||||||
|
"Test put transform with frequency too low":
|
||||||
|
- do:
|
||||||
|
catch: /minimum permitted \[frequency\] is \[1s\]/
|
||||||
|
data_frame.put_data_frame_transform:
|
||||||
|
transform_id: "frequency-too-low"
|
||||||
|
body: >
|
||||||
|
{
|
||||||
|
"source": { "index": "airline-data" },
|
||||||
|
"dest": { "index": "airline-dest" },
|
||||||
|
"frequency": "999ms",
|
||||||
|
"pivot": {
|
||||||
|
"group_by": { "airline": {"terms": {"field": "airline"}}},
|
||||||
|
"aggs": {"avg_response": {"avg": {"field": "responsetime"}}}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
---
|
||||||
|
"Test put transform with frequency too high":
|
||||||
|
- do:
|
||||||
|
catch: /highest permitted \[frequency\] is \[1h\]/
|
||||||
|
data_frame.put_data_frame_transform:
|
||||||
|
transform_id: "frequency-too-low"
|
||||||
|
body: >
|
||||||
|
{
|
||||||
|
"source": { "index": "airline-data" },
|
||||||
|
"dest": { "index": "airline-dest" },
|
||||||
|
"frequency": "3600001ms",
|
||||||
|
"pivot": {
|
||||||
|
"group_by": { "airline": {"terms": {"field": "airline"}}},
|
||||||
|
"aggs": {"avg_response": {"avg": {"field": "responsetime"}}}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
---
|
---
|
||||||
"Test put transform with invalid source index":
|
"Test put transform with invalid source index":
|
||||||
- do:
|
- do:
|
||||||
|
|
Loading…
Reference in New Issue