From cb62d4acdf44d29bc1ceb0148bad8578c7521066 Mon Sep 17 00:00:00 2001 From: David Roberts Date: Wed, 10 Jul 2019 09:35:23 +0100 Subject: [PATCH] [ML-DataFrame] Add a frequency option to transform config, default 1m (#44120) Previously a data frame transform would check whether the source index was changed every 10 seconds. Sometimes it may be desirable for the check to be done less frequently. This commit increases the default to 60 seconds but also allows the frequency to be overridden by a setting in the data frame transform config. --- .../transforms/DataFrameTransformConfig.java | 39 ++++++++++++--- .../DataFrameTransformConfigTests.java | 2 + .../DataFrameTransformDocumentationIT.java | 5 +- .../dataframe/put_data_frame.asciidoc | 5 +- .../data-frames/apis/put-transform.asciidoc | 6 +++ .../xpack/core/dataframe/DataFrameField.java | 1 + .../action/PutDataFrameTransformAction.java | 17 +++++++ .../transforms/DataFrameTransform.java | 37 +++++++++++--- .../transforms/DataFrameTransformConfig.java | 50 ++++++++++++++----- ...wDataFrameTransformActionRequestTests.java | 1 + .../DataFrameTransformConfigTests.java | 17 ++++--- .../transforms/DataFrameTransformTests.java | 4 +- .../integration/DataFrameIntegTestCase.java | 1 + .../DataFrameGetAndGetStatsIT.java | 1 + .../integration/DataFramePivotRestIT.java | 1 + .../DataFrameTransformProgressIT.java | 3 ++ ...ransportStartDataFrameTransformAction.java | 6 +-- .../transforms/DataFrameTransformTask.java | 8 +-- .../dataframe/action/DataFrameNodesTests.java | 4 +- .../transforms/DataFrameIndexerTests.java | 1 + ...TransformPersistentTasksExecutorTests.java | 10 ++-- .../test/data_frame/transforms_crud.yml | 34 +++++++++++++ 22 files changed, 201 insertions(+), 52 deletions(-) diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/dataframe/transforms/DataFrameTransformConfig.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/dataframe/transforms/DataFrameTransformConfig.java index 355e3ad9bbc..2810d6a8cfa 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/dataframe/transforms/DataFrameTransformConfig.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/dataframe/transforms/DataFrameTransformConfig.java @@ -25,6 +25,7 @@ import org.elasticsearch.client.dataframe.transforms.util.TimeUtil; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.ParseField; import org.elasticsearch.common.Strings; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.ConstructingObjectParser; import org.elasticsearch.common.xcontent.ObjectParser; import org.elasticsearch.common.xcontent.ToXContentObject; @@ -44,6 +45,7 @@ public class DataFrameTransformConfig implements ToXContentObject { public static final ParseField ID = new ParseField("id"); public static final ParseField SOURCE = new ParseField("source"); public static final ParseField DEST = new ParseField("dest"); + public static final ParseField FREQUENCY = new ParseField("frequency"); public static final ParseField DESCRIPTION = new ParseField("description"); public static final ParseField SYNC = new ParseField("sync"); public static final ParseField VERSION = new ParseField("version"); @@ -54,6 +56,7 @@ public class DataFrameTransformConfig implements ToXContentObject { private final String id; private final SourceConfig source; private final DestConfig dest; + private final TimeValue frequency; private final SyncConfig syncConfig; private final PivotConfig pivotConfig; private final String description; @@ -66,14 +69,16 @@ public class DataFrameTransformConfig implements ToXContentObject { String id = (String) args[0]; SourceConfig source = (SourceConfig) args[1]; DestConfig dest = (DestConfig) args[2]; - SyncConfig syncConfig = (SyncConfig) args[3]; - PivotConfig pivotConfig = (PivotConfig) args[4]; - String description = (String)args[5]; - Instant createTime = (Instant)args[6]; - String transformVersion = (String)args[7]; + TimeValue frequency = (TimeValue) args[3]; + SyncConfig syncConfig = (SyncConfig) args[4]; + PivotConfig pivotConfig = (PivotConfig) args[5]; + String description = (String)args[6]; + Instant createTime = (Instant)args[7]; + String transformVersion = (String)args[8]; return new DataFrameTransformConfig(id, source, dest, + frequency, syncConfig, pivotConfig, description, @@ -85,6 +90,8 @@ public class DataFrameTransformConfig implements ToXContentObject { PARSER.declareString(constructorArg(), ID); PARSER.declareObject(constructorArg(), (p, c) -> SourceConfig.PARSER.apply(p, null), SOURCE); PARSER.declareObject(constructorArg(), (p, c) -> DestConfig.PARSER.apply(p, null), DEST); + PARSER.declareField(optionalConstructorArg(), p -> TimeValue.parseTimeValue(p.text(), FREQUENCY.getPreferredName()), + FREQUENCY, ObjectParser.ValueType.STRING); PARSER.declareObject(optionalConstructorArg(), (p, c) -> parseSyncConfig(p), SYNC); PARSER.declareObject(optionalConstructorArg(), (p, c) -> PivotConfig.fromXContent(p), PIVOT_TRANSFORM); PARSER.declareString(optionalConstructorArg(), DESCRIPTION); @@ -118,12 +125,13 @@ public class DataFrameTransformConfig implements ToXContentObject { * @return A DataFrameTransformConfig to preview, NOTE it will have a {@code null} id, destination and index. */ public static DataFrameTransformConfig forPreview(final SourceConfig source, final PivotConfig pivotConfig) { - return new DataFrameTransformConfig(null, source, null, null, pivotConfig, null, null, null); + return new DataFrameTransformConfig(null, source, null, null, null, pivotConfig, null, null, null); } DataFrameTransformConfig(final String id, final SourceConfig source, final DestConfig dest, + final TimeValue frequency, final SyncConfig syncConfig, final PivotConfig pivotConfig, final String description, @@ -132,6 +140,7 @@ public class DataFrameTransformConfig implements ToXContentObject { this.id = id; this.source = source; this.dest = dest; + this.frequency = frequency; this.syncConfig = syncConfig; this.pivotConfig = pivotConfig; this.description = description; @@ -151,6 +160,10 @@ public class DataFrameTransformConfig implements ToXContentObject { return dest; } + public TimeValue getFrequency() { + return frequency; + } + public SyncConfig getSyncConfig() { return syncConfig; } @@ -184,6 +197,9 @@ public class DataFrameTransformConfig implements ToXContentObject { if (dest != null) { builder.field(DEST.getPreferredName(), dest); } + if (frequency != null) { + builder.field(FREQUENCY.getPreferredName(), frequency.getStringRep()); + } if (syncConfig != null) { builder.startObject(SYNC.getPreferredName()); builder.field(syncConfig.getName(), syncConfig); @@ -220,6 +236,7 @@ public class DataFrameTransformConfig implements ToXContentObject { return Objects.equals(this.id, that.id) && Objects.equals(this.source, that.source) && Objects.equals(this.dest, that.dest) + && Objects.equals(this.frequency, that.frequency) && Objects.equals(this.description, that.description) && Objects.equals(this.syncConfig, that.syncConfig) && Objects.equals(this.transformVersion, that.transformVersion) @@ -229,7 +246,7 @@ public class DataFrameTransformConfig implements ToXContentObject { @Override public int hashCode() { - return Objects.hash(id, source, dest, syncConfig, pivotConfig, description); + return Objects.hash(id, source, dest, frequency, syncConfig, pivotConfig, description); } @Override @@ -246,6 +263,7 @@ public class DataFrameTransformConfig implements ToXContentObject { private String id; private SourceConfig source; private DestConfig dest; + private TimeValue frequency; private SyncConfig syncConfig; private PivotConfig pivotConfig; private String description; @@ -265,6 +283,11 @@ public class DataFrameTransformConfig implements ToXContentObject { return this; } + public Builder setFrequency(TimeValue frequency) { + this.frequency = frequency; + return this; + } + public Builder setSyncConfig(SyncConfig syncConfig) { this.syncConfig = syncConfig; return this; @@ -281,7 +304,7 @@ public class DataFrameTransformConfig implements ToXContentObject { } public DataFrameTransformConfig build() { - return new DataFrameTransformConfig(id, source, dest, syncConfig, pivotConfig, description, null, null); + return new DataFrameTransformConfig(id, source, dest, frequency, syncConfig, pivotConfig, description, null, null); } } } diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/transforms/DataFrameTransformConfigTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/transforms/DataFrameTransformConfigTests.java index 212ff64555e..88191809e22 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/transforms/DataFrameTransformConfigTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/dataframe/transforms/DataFrameTransformConfigTests.java @@ -23,6 +23,7 @@ import org.elasticsearch.client.dataframe.DataFrameNamedXContentProvider; import org.elasticsearch.Version; import org.elasticsearch.client.dataframe.transforms.pivot.PivotConfigTests; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.search.SearchModule; @@ -43,6 +44,7 @@ public class DataFrameTransformConfigTests extends AbstractXContentTestCase .setSource(sourceConfig) // <2> .setDest(destConfig) // <3> - .setPivotConfig(pivotConfig) // <4> - .setDescription("This is my test transform") // <5> + .setFrequency(TimeValue.timeValueSeconds(15)) // <4> + .setPivotConfig(pivotConfig) // <5> + .setDescription("This is my test transform") // <6> .build(); // end::put-data-frame-transform-config diff --git a/docs/java-rest/high-level/dataframe/put_data_frame.asciidoc b/docs/java-rest/high-level/dataframe/put_data_frame.asciidoc index 19c7fe443db..3ba16a987f9 100644 --- a/docs/java-rest/high-level/dataframe/put_data_frame.asciidoc +++ b/docs/java-rest/high-level/dataframe/put_data_frame.asciidoc @@ -34,8 +34,9 @@ include-tagged::{doc-tests-file}[{api}-config] <1> The {dataframe-transform} ID <2> The source indices and query from which to gather data <3> The destination index and optional pipeline -<4> The PivotConfig -<5> Optional free text description of the transform +<4> How often to check for updates to the source indices +<5> The PivotConfig +<6> Optional free text description of the transform [id="{upid}-{api}-query-config"] diff --git a/docs/reference/data-frames/apis/put-transform.asciidoc b/docs/reference/data-frames/apis/put-transform.asciidoc index abc5779e12a..d8fe652639d 100644 --- a/docs/reference/data-frames/apis/put-transform.asciidoc +++ b/docs/reference/data-frames/apis/put-transform.asciidoc @@ -60,6 +60,11 @@ IMPORTANT: You must use {kib} or this API to create a {dataframe-transform}. (object) The destination configuration, which consists of `index` and optionally a `pipeline` id. See <>. +`frequency` (Optional):: + (time units) The interval between checks for changes in the source indices + when the {dataframe-transform} is running continuously. Defaults to `1m`. + The lowest permitted value is `1s`; the highest `1h`. + `pivot` (Optional):: (object) Defines the pivot function `group by` fields and the aggregation to reduce the data. See <>. @@ -90,6 +95,7 @@ PUT _data_frame/transforms/ecommerce_transform "index": "kibana_sample_data_ecommerce_transform", "pipeline": "add_timestamp_pipeline" }, + "frequency": "5m", "pivot": { "group_by": { "customer_id": { diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/DataFrameField.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/DataFrameField.java index 2dedef82eb3..9d5db1e5022 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/DataFrameField.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/DataFrameField.java @@ -26,6 +26,7 @@ public final class DataFrameField { public static final ParseField INDEX_DOC_TYPE = new ParseField("doc_type"); public static final ParseField SOURCE = new ParseField("source"); public static final ParseField DESTINATION = new ParseField("dest"); + public static final ParseField FREQUENCY = new ParseField("frequency"); public static final ParseField FORCE = new ParseField("force"); public static final ParseField MAX_PAGE_SEARCH_SIZE = new ParseField("max_page_search_size"); public static final ParseField FIELD = new ParseField("field"); diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/action/PutDataFrameTransformAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/action/PutDataFrameTransformAction.java index 00873d76307..b58c1154a59 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/action/PutDataFrameTransformAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/action/PutDataFrameTransformAction.java @@ -13,6 +13,7 @@ import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; @@ -34,6 +35,9 @@ public class PutDataFrameTransformAction extends ActionType 0) { + validationException = addValidationError( + "highest permitted [" + DataFrameField.FREQUENCY + "] is [" + MAX_FREQUENCY.getStringRep() + "]", + validationException); + } + } + return validationException; } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransform.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransform.java index e620e4f8595..f7c14b0439a 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransform.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransform.java @@ -11,6 +11,7 @@ import org.elasticsearch.cluster.AbstractDiffable; import org.elasticsearch.common.ParseField; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.ConstructingObjectParser; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; @@ -24,25 +25,30 @@ public class DataFrameTransform extends AbstractDiffable imp public static final String NAME = DataFrameField.TASK_NAME; public static final ParseField VERSION = new ParseField(DataFrameField.VERSION); + public static final ParseField FREQUENCY = DataFrameField.FREQUENCY; private final String transformId; private final Version version; + private final TimeValue frequency; - public static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>(NAME, - a -> new DataFrameTransform((String) a[0], (String) a[1])); + public static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>(NAME, true, + a -> new DataFrameTransform((String) a[0], (String) a[1], (String) a[2])); static { PARSER.declareString(ConstructingObjectParser.constructorArg(), DataFrameField.ID); PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), VERSION); + PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), FREQUENCY); } - private DataFrameTransform(String transformId, String version) { - this(transformId, version == null ? null : Version.fromString(version)); + private DataFrameTransform(String transformId, String version, String frequency) { + this(transformId, version == null ? null : Version.fromString(version), + frequency == null ? null : TimeValue.parseTimeValue(frequency, FREQUENCY.getPreferredName())); } - public DataFrameTransform(String transformId, Version version) { + public DataFrameTransform(String transformId, Version version, TimeValue frequency) { this.transformId = transformId; this.version = version == null ? Version.V_7_2_0 : version; + this.frequency = frequency; } public DataFrameTransform(StreamInput in) throws IOException { @@ -52,6 +58,11 @@ public class DataFrameTransform extends AbstractDiffable imp } else { this.version = Version.V_7_2_0; } + if (in.getVersion().onOrAfter(Version.V_7_3_0)) { + this.frequency = in.readOptionalTimeValue(); + } else { + this.frequency = null; + } } @Override @@ -70,6 +81,9 @@ public class DataFrameTransform extends AbstractDiffable imp if (out.getVersion().onOrAfter(Version.V_7_3_0)) { Version.writeVersion(version, out); } + if (out.getVersion().onOrAfter(Version.V_7_3_0)) { + out.writeOptionalTimeValue(frequency); + } } @Override @@ -77,6 +91,9 @@ public class DataFrameTransform extends AbstractDiffable imp builder.startObject(); builder.field(DataFrameField.ID.getPreferredName(), transformId); builder.field(VERSION.getPreferredName(), version); + if (frequency != null) { + builder.field(FREQUENCY.getPreferredName(), frequency.getStringRep()); + } builder.endObject(); return builder; } @@ -89,6 +106,10 @@ public class DataFrameTransform extends AbstractDiffable imp return version; } + public TimeValue getFrequency() { + return frequency; + } + public static DataFrameTransform fromXContent(XContentParser parser) throws IOException { return PARSER.parse(parser, null); } @@ -105,11 +126,13 @@ public class DataFrameTransform extends AbstractDiffable imp DataFrameTransform that = (DataFrameTransform) other; - return Objects.equals(this.transformId, that.transformId) && Objects.equals(this.version, that.version); + return Objects.equals(this.transformId, that.transformId) + && Objects.equals(this.version, that.version) + && Objects.equals(this.frequency, that.frequency); } @Override public int hashCode() { - return Objects.hash(transformId, version); + return Objects.hash(transformId, version, frequency); } } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformConfig.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformConfig.java index e3ad50d9b88..54d0ff72983 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformConfig.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformConfig.java @@ -14,6 +14,7 @@ import org.elasticsearch.common.Strings; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.ConstructingObjectParser; import org.elasticsearch.common.xcontent.ObjectParser; import org.elasticsearch.common.xcontent.ToXContentObject; @@ -56,6 +57,7 @@ public class DataFrameTransformConfig extends AbstractDiffable headers = (Map) args[5]; + Map headers = (Map) args[6]; - PivotConfig pivotConfig = (PivotConfig) args[6]; - String description = (String)args[7]; + PivotConfig pivotConfig = (PivotConfig) args[7]; + String description = (String)args[8]; return new DataFrameTransformConfig(id, source, dest, + frequency, syncConfig, headers, pivotConfig, description, - (Instant)args[8], - (String)args[9]); + (Instant)args[9], + (String)args[10]); }); parser.declareString(optionalConstructorArg(), DataFrameField.ID); parser.declareObject(constructorArg(), (p, c) -> SourceConfig.fromXContent(p, lenient), DataFrameField.SOURCE); parser.declareObject(constructorArg(), (p, c) -> DestConfig.fromXContent(p, lenient), DataFrameField.DESTINATION); + parser.declareString(optionalConstructorArg(), DataFrameField.FREQUENCY); parser.declareObject(optionalConstructorArg(), (p, c) -> parseSyncConfig(p, lenient), DataFrameField.SYNC); @@ -146,6 +153,7 @@ public class DataFrameTransformConfig extends AbstractDiffable headers, final PivotConfig pivotConfig, @@ -155,6 +163,7 @@ public class DataFrameTransformConfig extends AbstractDiffable headers, final PivotConfig pivotConfig, final String description) { - this(id, source, dest, syncConfig, headers, pivotConfig, description, null, null); + this(id, source, dest, frequency, syncConfig, headers, pivotConfig, description, null, null); } public DataFrameTransformConfig(final StreamInput in) throws IOException { id = in.readString(); source = new SourceConfig(in); dest = new DestConfig(in); + if (in.getVersion().onOrAfter(Version.V_7_3_0)) { + frequency = in.readOptionalTimeValue(); + } else { + frequency = null; + } setHeaders(in.readMap(StreamInput::readString, StreamInput::readString)); pivotConfig = in.readOptionalWriteable(PivotConfig::new); description = in.readOptionalString(); @@ -211,6 +226,10 @@ public class DataFrameTransformConfig extends AbstractDiffable createDataFrameTransformConfigFromString(pivotTransform, "test_header_injection")); } - public void testPreventCreateTimeInjection() throws IOException { + public void testPreventCreateTimeInjection() { String pivotTransform = "{" + " \"create_time\" : " + Instant.now().toEpochMilli() + " }," + " \"source\" : {\"index\":\"src\"}," @@ -188,7 +191,7 @@ public class DataFrameTransformConfigTests extends AbstractSerializingDataFrameT () -> createDataFrameTransformConfigFromString(pivotTransform, "test_createTime_injection")); } - public void testPreventVersionInjection() throws IOException { + public void testPreventVersionInjection() { String pivotTransform = "{" + " \"version\" : \"7.3.0\"," + " \"source\" : {\"index\":\"src\"}," @@ -229,11 +232,11 @@ public class DataFrameTransformConfigTests extends AbstractSerializingDataFrameT public void testMaxLengthDescription() { IllegalArgumentException exception = expectThrows(IllegalArgumentException.class, () -> new DataFrameTransformConfig("id", - randomSourceConfig(), randomDestConfig(), null, null, PivotConfigTests.randomPivotConfig(), randomAlphaOfLength(1001))); + randomSourceConfig(), randomDestConfig(), null, null, null, PivotConfigTests.randomPivotConfig(), randomAlphaOfLength(1001))); assertThat(exception.getMessage(), equalTo("[description] must be less than 1000 characters in length.")); String description = randomAlphaOfLength(1000); DataFrameTransformConfig config = new DataFrameTransformConfig("id", - randomSourceConfig(), randomDestConfig(), null, null, PivotConfigTests.randomPivotConfig(), description); + randomSourceConfig(), randomDestConfig(), null, null, null, PivotConfigTests.randomPivotConfig(), description); assertThat(description, equalTo(config.getDescription())); } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformTests.java index d7463a6df71..ce830240c63 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/dataframe/transforms/DataFrameTransformTests.java @@ -10,6 +10,7 @@ import org.elasticsearch.Version; import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.Writeable.Reader; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.XContentParser; import java.io.IOException; @@ -25,7 +26,8 @@ public class DataFrameTransformTests extends AbstractSerializingDataFrameTestCas @Override protected DataFrameTransform createTestInstance() { - return new DataFrameTransform(randomAlphaOfLength(10), randomBoolean() ? null : Version.CURRENT); + return new DataFrameTransform(randomAlphaOfLength(10), randomBoolean() ? null : Version.CURRENT, + randomBoolean() ? null : TimeValue.timeValueMillis(randomIntBetween(1_000, 3_600_000))); } @Override diff --git a/x-pack/plugin/data-frame/qa/multi-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameIntegTestCase.java b/x-pack/plugin/data-frame/qa/multi-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameIntegTestCase.java index 5eef30fda81..b8337a9758f 100644 --- a/x-pack/plugin/data-frame/qa/multi-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameIntegTestCase.java +++ b/x-pack/plugin/data-frame/qa/multi-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameIntegTestCase.java @@ -210,6 +210,7 @@ abstract class DataFrameIntegTestCase extends ESRestTestCase { .setId(id) .setSource(SourceConfig.builder().setIndex(sourceIndices).setQueryConfig(createQueryConfig(queryBuilder)).build()) .setDest(DestConfig.builder().setIndex(destinationIndex).build()) + .setFrequency(TimeValue.timeValueSeconds(10)) .setPivotConfig(createPivotConfig(groups, aggregations)) .setDescription("Test data frame transform config id: " + id); } diff --git a/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameGetAndGetStatsIT.java b/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameGetAndGetStatsIT.java index a45fee6d966..fd6d21db045 100644 --- a/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameGetAndGetStatsIT.java +++ b/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameGetAndGetStatsIT.java @@ -214,6 +214,7 @@ public class DataFrameGetAndGetStatsIT extends DataFrameRestTestCase { final Request createDataframeTransformRequest = createRequestWithAuth("PUT", DATAFRAME_ENDPOINT + transformId, null); String config = "{ \"dest\": {\"index\":\"" + transformDest + "\"}," + " \"source\": {\"index\":\"" + transformSrc + "\"}," + + " \"frequency\": \"1s\"," + " \"sync\": {\"time\":{\"field\": \"timestamp\", \"delay\": \"1s\"}}," + " \"pivot\": {" + " \"group_by\": {" diff --git a/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFramePivotRestIT.java b/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFramePivotRestIT.java index f2cd95ed1a9..4fb8ea6fafd 100644 --- a/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFramePivotRestIT.java +++ b/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFramePivotRestIT.java @@ -141,6 +141,7 @@ public class DataFramePivotRestIT extends DataFrameRestTestCase { String config = "{" + " \"source\": {\"index\":\"" + indexName + "\"}," + " \"dest\": {\"index\":\"" + dataFrameIndex + "\"}," + + " \"frequency\": \"1s\"," + " \"sync\": {\"time\": {\"field\": \"timestamp\", \"delay\": \"1s\"}}," + " \"pivot\": {" + " \"group_by\": {" diff --git a/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameTransformProgressIT.java b/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameTransformProgressIT.java index 8a90cd71bb8..b2db7783cfd 100644 --- a/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameTransformProgressIT.java +++ b/x-pack/plugin/data-frame/qa/single-node-tests/src/test/java/org/elasticsearch/xpack/dataframe/integration/DataFrameTransformProgressIT.java @@ -132,6 +132,7 @@ public class DataFrameTransformProgressIT extends ESRestTestCase { destConfig, null, null, + null, pivotConfig, null); @@ -156,6 +157,7 @@ public class DataFrameTransformProgressIT extends ESRestTestCase { destConfig, null, null, + null, pivotConfig, null); @@ -175,6 +177,7 @@ public class DataFrameTransformProgressIT extends ESRestTestCase { destConfig, null, null, + null, pivotConfig, null); diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStartDataFrameTransformAction.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStartDataFrameTransformAction.java index 20f83daf8cc..8c1d942ec97 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStartDataFrameTransformAction.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/action/TransportStartDataFrameTransformAction.java @@ -185,7 +185,7 @@ public class TransportStartDataFrameTransformAction extends return; } - transformTaskHolder.set(createDataFrameTransform(config.getId(), config.getVersion())); + transformTaskHolder.set(createDataFrameTransform(config.getId(), config.getVersion(), config.getFrequency())); final String destinationIndex = config.getDestination().getIndex(); String[] dest = indexNameExpressionResolver.concreteIndexNames(state, IndicesOptions.lenientExpandOpen(), @@ -254,8 +254,8 @@ public class TransportStartDataFrameTransformAction extends return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE); } - private static DataFrameTransform createDataFrameTransform(String transformId, Version transformVersion) { - return new DataFrameTransform(transformId, transformVersion); + private static DataFrameTransform createDataFrameTransform(String transformId, Version transformVersion, TimeValue frequency) { + return new DataFrameTransform(transformId, transformVersion, frequency); } @SuppressWarnings("unchecked") diff --git a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java index ea67da79620..57cb468fdd8 100644 --- a/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java +++ b/x-pack/plugin/data-frame/src/main/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformTask.java @@ -20,6 +20,7 @@ import org.elasticsearch.action.search.SearchAction; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.client.Client; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.persistent.AllocatedPersistentTask; import org.elasticsearch.persistent.PersistentTasksCustomMetaData; @@ -59,8 +60,8 @@ import java.util.concurrent.atomic.AtomicReference; public class DataFrameTransformTask extends AllocatedPersistentTask implements SchedulerEngine.Listener { - // interval the scheduler sends an event - private static final int SCHEDULER_NEXT_MILLISECONDS = 10000; + // Default interval the scheduler sends an event if the config does not specify a frequency + private static final long SCHEDULER_NEXT_MILLISECONDS = 60000; private static final Logger logger = LogManager.getLogger(DataFrameTransformTask.class); // TODO consider moving to dynamic cluster setting private static final int MAX_CONTINUOUS_FAILURES = 10; @@ -363,7 +364,8 @@ public class DataFrameTransformTask extends AllocatedPersistentTask implements S private SchedulerEngine.Schedule next() { return (startTime, now) -> { - return now + SCHEDULER_NEXT_MILLISECONDS; + TimeValue frequency = transform.getFrequency(); + return now + (frequency == null ? SCHEDULER_NEXT_MILLISECONDS : frequency.getMillis()); }; } diff --git a/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/action/DataFrameNodesTests.java b/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/action/DataFrameNodesTests.java index 276d3af39ef..d13c33fe9aa 100644 --- a/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/action/DataFrameNodesTests.java +++ b/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/action/DataFrameNodesTests.java @@ -31,10 +31,10 @@ public class DataFrameNodesTests extends ESTestCase { PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder(); tasksBuilder.addTask(dataFrameIdFoo, - DataFrameField.TASK_NAME, new DataFrameTransform(dataFrameIdFoo, Version.CURRENT), + DataFrameField.TASK_NAME, new DataFrameTransform(dataFrameIdFoo, Version.CURRENT, null), new PersistentTasksCustomMetaData.Assignment("node-1", "test assignment")); tasksBuilder.addTask(dataFrameIdBar, - DataFrameField.TASK_NAME, new DataFrameTransform(dataFrameIdBar, Version.CURRENT), + DataFrameField.TASK_NAME, new DataFrameTransform(dataFrameIdBar, Version.CURRENT, null), new PersistentTasksCustomMetaData.Assignment("node-2", "test assignment")); tasksBuilder.addTask("test-task1", "testTasks", new PersistentTaskParams() { @Override diff --git a/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameIndexerTests.java b/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameIndexerTests.java index e3e9ff81eb6..03a34b01334 100644 --- a/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameIndexerTests.java +++ b/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameIndexerTests.java @@ -186,6 +186,7 @@ public class DataFrameIndexerTests extends ESTestCase { randomDestConfig(), null, null, + null, new PivotConfig(GroupConfigTests.randomGroupConfig(), AggregationConfigTests.randomAggregationConfig(), pageSize), randomBoolean() ? null : randomAlphaOfLengthBetween(1, 1000)); AtomicReference state = new AtomicReference<>(IndexerState.STOPPED); diff --git a/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformPersistentTasksExecutorTests.java b/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformPersistentTasksExecutorTests.java index 59042bc5d89..1186c3972b0 100644 --- a/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformPersistentTasksExecutorTests.java +++ b/x-pack/plugin/data-frame/src/test/java/org/elasticsearch/xpack/dataframe/transforms/DataFrameTransformPersistentTasksExecutorTests.java @@ -52,15 +52,15 @@ public class DataFrameTransformPersistentTasksExecutorTests extends ESTestCase { PersistentTasksCustomMetaData.Builder pTasksBuilder = PersistentTasksCustomMetaData.builder() .addTask("data-frame-task-1", DataFrameTransform.NAME, - new DataFrameTransform("data-frame-task-1", Version.CURRENT), + new DataFrameTransform("data-frame-task-1", Version.CURRENT, null), new PersistentTasksCustomMetaData.Assignment("current-data-node-with-1-tasks", "")) .addTask("data-frame-task-2", DataFrameTransform.NAME, - new DataFrameTransform("data-frame-task-2", Version.CURRENT), + new DataFrameTransform("data-frame-task-2", Version.CURRENT, null), new PersistentTasksCustomMetaData.Assignment("current-data-node-with-2-tasks", "")) .addTask("data-frame-task-3", DataFrameTransform.NAME, - new DataFrameTransform("data-frame-task-3", Version.CURRENT), + new DataFrameTransform("data-frame-task-3", Version.CURRENT, null), new PersistentTasksCustomMetaData.Assignment("current-data-node-with-2-tasks", "")); PersistentTasksCustomMetaData pTasks = pTasksBuilder.build(); @@ -106,9 +106,9 @@ public class DataFrameTransformPersistentTasksExecutorTests extends ESTestCase { new DataFrameAuditor(client, ""), mock(ThreadPool.class)); - assertThat(executor.getAssignment(new DataFrameTransform("new-task-id", Version.CURRENT), cs).getExecutorNode(), + assertThat(executor.getAssignment(new DataFrameTransform("new-task-id", Version.CURRENT, null), cs).getExecutorNode(), equalTo("current-data-node-with-1-tasks")); - assertThat(executor.getAssignment(new DataFrameTransform("new-old-task-id", Version.V_7_2_0), cs).getExecutorNode(), + assertThat(executor.getAssignment(new DataFrameTransform("new-old-task-id", Version.V_7_2_0, null), cs).getExecutorNode(), equalTo("past-data-node-1")); } diff --git a/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_crud.yml b/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_crud.yml index bfde8128b49..4d207d2750a 100644 --- a/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_crud.yml +++ b/x-pack/plugin/src/test/resources/rest-api-spec/test/data_frame/transforms_crud.yml @@ -42,6 +42,40 @@ setup: data_frame.delete_data_frame_transform: transform_id: "missing transform" +--- +"Test put transform with frequency too low": + - do: + catch: /minimum permitted \[frequency\] is \[1s\]/ + data_frame.put_data_frame_transform: + transform_id: "frequency-too-low" + body: > + { + "source": { "index": "airline-data" }, + "dest": { "index": "airline-dest" }, + "frequency": "999ms", + "pivot": { + "group_by": { "airline": {"terms": {"field": "airline"}}}, + "aggs": {"avg_response": {"avg": {"field": "responsetime"}}} + } + } + +--- +"Test put transform with frequency too high": + - do: + catch: /highest permitted \[frequency\] is \[1h\]/ + data_frame.put_data_frame_transform: + transform_id: "frequency-too-low" + body: > + { + "source": { "index": "airline-data" }, + "dest": { "index": "airline-dest" }, + "frequency": "3600001ms", + "pivot": { + "group_by": { "airline": {"terms": {"field": "airline"}}}, + "aggs": {"avg_response": {"avg": {"field": "responsetime"}}} + } + } + --- "Test put transform with invalid source index": - do: