From f7ada9b29b6074d17f8f969171a3fc733877690f Mon Sep 17 00:00:00 2001 From: Benjamin Trent Date: Thu, 15 Nov 2018 13:32:45 -0600 Subject: [PATCH] Add delayed datacheck to the datafeed job runner (#35387) * ML: Adding missing datacheck to datafeedjob * Adding client side and docs * Making adjustments to validations * Making values default to on, having more sensible limits * Intermittent commit, still need to figure out interval * Adjusting delayed data check interval * updating docs * Making parameter Boolean, so it is nullable * bumping bwc to 7 before backport * changing to version current * moving delayed data check config its own object * Separation of duties for delayed data detection * fixing checkstyles * fixing checkstyles * Adjusting default behavior so that null windows are allowed * Mentioning the default value * Fixing comments, syncing up validations --- .../client/ml/datafeed/DatafeedConfig.java | 37 ++++- .../client/ml/datafeed/DatafeedUpdate.java | 26 +++- .../ml/datafeed/DelayedDataCheckConfig.java | 130 ++++++++++++++++++ .../MlClientDocumentationIT.java | 9 ++ .../ml/datafeed/DatafeedConfigTests.java | 3 + .../ml/datafeed/DatafeedUpdateTests.java | 3 + .../datafeed/DelayedDataCheckConfigTests.java | 65 +++++++++ .../high-level/ml/put-datafeed.asciidoc | 11 ++ .../ml/apis/datafeedresource.asciidoc | 26 ++++ docs/reference/ml/apis/put-datafeed.asciidoc | 4 + .../core/ml/datafeed/DatafeedConfig.java | 39 +++++- .../ml/datafeed/DatafeedJobValidator.java | 24 ++++ .../core/ml/datafeed/DatafeedUpdate.java | 36 ++++- .../ml/datafeed/DelayedDataCheckConfig.java | 127 +++++++++++++++++ .../xpack/core/ml/job/messages/Messages.java | 10 ++ .../xpack/core/ml/utils/time/TimeUtils.java | 24 +++- .../core/ml/datafeed/DatafeedConfigTests.java | 3 + .../core/ml/datafeed/DatafeedUpdateTests.java | 6 + .../datafeed/DelayedDataCheckConfigTests.java | 95 +++++++++++++ .../ml/job/config/AnalysisConfigTests.java | 2 +- .../ml/integration/DelayedDataDetectorIT.java | 48 +++++-- .../xpack/ml/datafeed/DatafeedJob.java | 62 ++++++++- .../xpack/ml/datafeed/DatafeedJobBuilder.java | 5 +- .../DatafeedDelayedDataDetector.java} | 84 +++++------ .../delayeddatacheck/DelayedDataDetector.java | 14 ++ .../DelayedDataDetectorFactory.java | 125 +++++++++++++++++ .../NullDelayedDataDetector.java | 35 +++++ .../xpack/ml/datafeed/DatafeedJobTests.java | 44 +++++- .../datafeed/DatafeedJobValidatorTests.java | 25 ++++ .../ml/datafeed/DelayedDataDetectorTests.java | 76 ---------- .../DelayedDataDetectorFactoryTests.java | 103 ++++++++++++++ 31 files changed, 1136 insertions(+), 165 deletions(-) create mode 100644 client/rest-high-level/src/main/java/org/elasticsearch/client/ml/datafeed/DelayedDataCheckConfig.java create mode 100644 client/rest-high-level/src/test/java/org/elasticsearch/client/ml/datafeed/DelayedDataCheckConfigTests.java create mode 100644 x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DelayedDataCheckConfig.java create mode 100644 x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/datafeed/DelayedDataCheckConfigTests.java rename x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/{DelayedDataDetector.java => delayeddatacheck/DatafeedDelayedDataDetector.java} (72%) create mode 100644 x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/delayeddatacheck/DelayedDataDetector.java create mode 100644 x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/delayeddatacheck/DelayedDataDetectorFactory.java create mode 100644 
x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/delayeddatacheck/NullDelayedDataDetector.java delete mode 100644 x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DelayedDataDetectorTests.java create mode 100644 x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/delayeddatacheck/DelayedDataDetectorFactoryTests.java diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/datafeed/DatafeedConfig.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/datafeed/DatafeedConfig.java index 84deae61f8e..4b9bc8abf53 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/datafeed/DatafeedConfig.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/datafeed/DatafeedConfig.java @@ -62,6 +62,7 @@ public class DatafeedConfig implements ToXContentObject { public static final ParseField AGGREGATIONS = new ParseField("aggregations"); public static final ParseField SCRIPT_FIELDS = new ParseField("script_fields"); public static final ParseField CHUNKING_CONFIG = new ParseField("chunking_config"); + public static final ParseField DELAYED_DATA_CHECK_CONFIG = new ParseField("delayed_data_check_config"); public static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>( "datafeed_config", true, a -> new Builder((String)a[0], (String)a[1])); @@ -88,6 +89,7 @@ public class DatafeedConfig implements ToXContentObject { }, SCRIPT_FIELDS); PARSER.declareInt(Builder::setScrollSize, SCROLL_SIZE); PARSER.declareObject(Builder::setChunkingConfig, ChunkingConfig.PARSER, CHUNKING_CONFIG); + PARSER.declareObject(Builder::setDelayedDataCheckConfig, DelayedDataCheckConfig.PARSER, DELAYED_DATA_CHECK_CONFIG); } private static BytesReference parseBytes(XContentParser parser) throws IOException { @@ -107,10 +109,12 @@ public class DatafeedConfig implements ToXContentObject { private final List scriptFields; private final Integer scrollSize; private final ChunkingConfig chunkingConfig; + private final DelayedDataCheckConfig delayedDataCheckConfig; + private DatafeedConfig(String id, String jobId, TimeValue queryDelay, TimeValue frequency, List indices, List types, BytesReference query, BytesReference aggregations, List scriptFields, - Integer scrollSize, ChunkingConfig chunkingConfig) { + Integer scrollSize, ChunkingConfig chunkingConfig, DelayedDataCheckConfig delayedDataCheckConfig) { this.id = id; this.jobId = jobId; this.queryDelay = queryDelay; @@ -122,6 +126,7 @@ public class DatafeedConfig implements ToXContentObject { this.scriptFields = scriptFields == null ? 
null : Collections.unmodifiableList(scriptFields); this.scrollSize = scrollSize; this.chunkingConfig = chunkingConfig; + this.delayedDataCheckConfig = delayedDataCheckConfig; } public String getId() { @@ -168,6 +173,10 @@ public class DatafeedConfig implements ToXContentObject { return chunkingConfig; } + public DelayedDataCheckConfig getDelayedDataCheckConfig() { + return delayedDataCheckConfig; + } + @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); @@ -204,6 +213,9 @@ public class DatafeedConfig implements ToXContentObject { if (chunkingConfig != null) { builder.field(CHUNKING_CONFIG.getPreferredName(), chunkingConfig); } + if (delayedDataCheckConfig != null) { + builder.field(DELAYED_DATA_CHECK_CONFIG.getPreferredName(), delayedDataCheckConfig); + } builder.endObject(); return builder; @@ -244,7 +256,8 @@ public class DatafeedConfig implements ToXContentObject { && Objects.equals(this.scrollSize, that.scrollSize) && Objects.equals(asMap(this.aggregations), asMap(that.aggregations)) && Objects.equals(this.scriptFields, that.scriptFields) - && Objects.equals(this.chunkingConfig, that.chunkingConfig); + && Objects.equals(this.chunkingConfig, that.chunkingConfig) + && Objects.equals(this.delayedDataCheckConfig, that.delayedDataCheckConfig); } /** @@ -255,7 +268,7 @@ public class DatafeedConfig implements ToXContentObject { @Override public int hashCode() { return Objects.hash(id, jobId, frequency, queryDelay, indices, types, asMap(query), scrollSize, asMap(aggregations), scriptFields, - chunkingConfig); + chunkingConfig, delayedDataCheckConfig); } public static Builder builder(String id, String jobId) { @@ -275,6 +288,7 @@ public class DatafeedConfig implements ToXContentObject { private List scriptFields; private Integer scrollSize; private ChunkingConfig chunkingConfig; + private DelayedDataCheckConfig delayedDataCheckConfig; public Builder(String id, String jobId) { this.id = Objects.requireNonNull(id, ID.getPreferredName()); @@ -293,6 +307,7 @@ public class DatafeedConfig implements ToXContentObject { this.scriptFields = config.scriptFields; this.scrollSize = config.scrollSize; this.chunkingConfig = config.chunkingConfig; + this.delayedDataCheckConfig = config.getDelayedDataCheckConfig(); } public Builder setIndices(List indices) { @@ -366,9 +381,23 @@ public class DatafeedConfig implements ToXContentObject { return this; } + /** + * This sets the {@link DelayedDataCheckConfig} settings. + * + * See {@link DelayedDataCheckConfig} for more information. + * + * @param delayedDataCheckConfig the delayed data check configuration + * Default value is enabled, with `check_window` being null. This means the true window is + * calculated when the real-time Datafeed runs. 
+ */ + public Builder setDelayedDataCheckConfig(DelayedDataCheckConfig delayedDataCheckConfig) { + this.delayedDataCheckConfig = delayedDataCheckConfig; + return this; + } + public DatafeedConfig build() { return new DatafeedConfig(id, jobId, queryDelay, frequency, indices, types, query, aggregations, scriptFields, scrollSize, - chunkingConfig); + chunkingConfig, delayedDataCheckConfig); } private static BytesReference xContentToBytes(ToXContentObject object) throws IOException { diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/datafeed/DatafeedUpdate.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/datafeed/DatafeedUpdate.java index 119f70fc797..5daacdd9a05 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/datafeed/DatafeedUpdate.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/datafeed/DatafeedUpdate.java @@ -77,6 +77,9 @@ public class DatafeedUpdate implements ToXContentObject { }, DatafeedConfig.SCRIPT_FIELDS); PARSER.declareInt(Builder::setScrollSize, DatafeedConfig.SCROLL_SIZE); PARSER.declareObject(Builder::setChunkingConfig, ChunkingConfig.PARSER, DatafeedConfig.CHUNKING_CONFIG); + PARSER.declareObject(Builder::setDelayedDataCheckConfig, + DelayedDataCheckConfig.PARSER, + DatafeedConfig.DELAYED_DATA_CHECK_CONFIG); } private static BytesReference parseBytes(XContentParser parser) throws IOException { @@ -96,10 +99,11 @@ public class DatafeedUpdate implements ToXContentObject { private final List scriptFields; private final Integer scrollSize; private final ChunkingConfig chunkingConfig; + private final DelayedDataCheckConfig delayedDataCheckConfig; private DatafeedUpdate(String id, String jobId, TimeValue queryDelay, TimeValue frequency, List indices, List types, BytesReference query, BytesReference aggregations, List scriptFields, - Integer scrollSize, ChunkingConfig chunkingConfig) { + Integer scrollSize, ChunkingConfig chunkingConfig, DelayedDataCheckConfig delayedDataCheckConfig) { this.id = id; this.jobId = jobId; this.queryDelay = queryDelay; @@ -111,6 +115,7 @@ public class DatafeedUpdate implements ToXContentObject { this.scriptFields = scriptFields; this.scrollSize = scrollSize; this.chunkingConfig = chunkingConfig; + this.delayedDataCheckConfig = delayedDataCheckConfig; } /** @@ -146,6 +151,9 @@ public class DatafeedUpdate implements ToXContentObject { } builder.endObject(); } + if (delayedDataCheckConfig != null) { + builder.field(DatafeedConfig.DELAYED_DATA_CHECK_CONFIG.getPreferredName(), delayedDataCheckConfig); + } addOptionalField(builder, DatafeedConfig.SCROLL_SIZE, scrollSize); addOptionalField(builder, DatafeedConfig.CHUNKING_CONFIG, chunkingConfig); builder.endObject(); @@ -198,6 +206,10 @@ public class DatafeedUpdate implements ToXContentObject { return chunkingConfig; } + public DelayedDataCheckConfig getDelayedDataCheckConfig() { + return delayedDataCheckConfig; + } + private static Map asMap(BytesReference bytesReference) { return bytesReference == null ? 
null : XContentHelper.convertToMap(bytesReference, true, XContentType.JSON).v2(); } @@ -232,6 +244,7 @@ public class DatafeedUpdate implements ToXContentObject { && Objects.equals(asMap(this.query), asMap(that.query)) && Objects.equals(this.scrollSize, that.scrollSize) && Objects.equals(asMap(this.aggregations), asMap(that.aggregations)) + && Objects.equals(this.delayedDataCheckConfig, that.delayedDataCheckConfig) && Objects.equals(this.scriptFields, that.scriptFields) && Objects.equals(this.chunkingConfig, that.chunkingConfig); } @@ -244,7 +257,7 @@ public class DatafeedUpdate implements ToXContentObject { @Override public int hashCode() { return Objects.hash(id, jobId, frequency, queryDelay, indices, types, asMap(query), scrollSize, asMap(aggregations), scriptFields, - chunkingConfig); + chunkingConfig, delayedDataCheckConfig); } public static Builder builder(String id) { @@ -264,6 +277,7 @@ public class DatafeedUpdate implements ToXContentObject { private List scriptFields; private Integer scrollSize; private ChunkingConfig chunkingConfig; + private DelayedDataCheckConfig delayedDataCheckConfig; public Builder(String id) { this.id = Objects.requireNonNull(id, DatafeedConfig.ID.getPreferredName()); @@ -281,6 +295,7 @@ public class DatafeedUpdate implements ToXContentObject { this.scriptFields = config.scriptFields; this.scrollSize = config.scrollSize; this.chunkingConfig = config.chunkingConfig; + this.delayedDataCheckConfig = config.delayedDataCheckConfig; } public Builder setJobId(String jobId) { @@ -359,9 +374,14 @@ public class DatafeedUpdate implements ToXContentObject { return this; } + public Builder setDelayedDataCheckConfig(DelayedDataCheckConfig delayedDataCheckConfig) { + this.delayedDataCheckConfig = delayedDataCheckConfig; + return this; + } + public DatafeedUpdate build() { return new DatafeedUpdate(id, jobId, queryDelay, frequency, indices, types, query, aggregations, scriptFields, scrollSize, - chunkingConfig); + chunkingConfig, delayedDataCheckConfig); } private static BytesReference xContentToBytes(ToXContentObject object) throws IOException { diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/datafeed/DelayedDataCheckConfig.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/datafeed/DelayedDataCheckConfig.java new file mode 100644 index 00000000000..43dd2c9a5a9 --- /dev/null +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/ml/datafeed/DelayedDataCheckConfig.java @@ -0,0 +1,130 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.elasticsearch.client.ml.datafeed; + +import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.ConstructingObjectParser; +import org.elasticsearch.common.xcontent.ObjectParser; +import org.elasticsearch.common.xcontent.ToXContentObject; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentParser; + +import java.io.IOException; +import java.util.Objects; + +/** + * The configuration object containing the delayed data check settings. + * + * See {@link DelayedDataCheckConfig#enabledDelayedDataCheckConfig(TimeValue)} for creating a new + * enabled datacheck with the given check_window + * + * See {@link DelayedDataCheckConfig#disabledDelayedDataCheckConfig()} for creating a config for disabling + * delayed data checking. + */ +public class DelayedDataCheckConfig implements ToXContentObject { + + public static final ParseField ENABLED = new ParseField("enabled"); + public static final ParseField CHECK_WINDOW = new ParseField("check_window"); + + // These parsers follow the pattern that metadata is parsed leniently (to allow for enhancements), whilst config is parsed strictly + public static final ConstructingObjectParser PARSER = new ConstructingObjectParser<>( + "delayed_data_check_config", true, a -> new DelayedDataCheckConfig((Boolean) a[0], (TimeValue) a[1])); + static { + PARSER.declareBoolean(ConstructingObjectParser.constructorArg(), ENABLED); + PARSER.declareField(ConstructingObjectParser.optionalConstructorArg(), p -> { + if (p.currentToken() == XContentParser.Token.VALUE_STRING) { + return TimeValue.parseTimeValue(p.text(), CHECK_WINDOW.getPreferredName()); + } + throw new IllegalArgumentException("Unsupported token [" + p.currentToken() + "]"); + }, CHECK_WINDOW, ObjectParser.ValueType.STRING); + } + + /** + * This creates a new DelayedDataCheckConfig that has a check_window of the passed `timeValue` + * + * We query the index to the latest finalized bucket from this TimeValue in the past looking to see if any data has been indexed + * since the data was read with the Datafeed. + * + * The window must be larger than the {@link org.elasticsearch.client.ml.job.config.AnalysisConfig#bucketSpan}, less than + * 24 hours, and span less than 10,000x buckets. + * + * + * @param timeValue The time length in the past from the latest finalized bucket to look for latent data. + * If `null` is provided, the appropriate window is calculated when it is used + **/ + public static DelayedDataCheckConfig enabledDelayedDataCheckConfig(TimeValue timeValue) { + return new DelayedDataCheckConfig(true, timeValue); + } + + /** + * This creates a new DelayedDataCheckConfig that disables the data check. 
+ */ + public static DelayedDataCheckConfig disabledDelayedDataCheckConfig() { + return new DelayedDataCheckConfig(false, null); + } + + private final boolean enabled; + private final TimeValue checkWindow; + + DelayedDataCheckConfig(Boolean enabled, TimeValue checkWindow) { + this.enabled = enabled; + this.checkWindow = checkWindow; + } + + public boolean isEnabled() { + return enabled; + } + + @Nullable + public TimeValue getCheckWindow() { + return checkWindow; + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.field(ENABLED.getPreferredName(), enabled); + if (checkWindow != null) { + builder.field(CHECK_WINDOW.getPreferredName(), checkWindow.getStringRep()); + } + builder.endObject(); + return builder; + } + + @Override + public int hashCode() { + return Objects.hash(enabled, checkWindow); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null || getClass() != obj.getClass()) { + return false; + } + + DelayedDataCheckConfig other = (DelayedDataCheckConfig) obj; + return Objects.equals(this.enabled, other.enabled) && Objects.equals(this.checkWindow, other.checkWindow); + } + +} diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/MlClientDocumentationIT.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/MlClientDocumentationIT.java index 3b1352b1706..556e25a2b0e 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/MlClientDocumentationIT.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/documentation/MlClientDocumentationIT.java @@ -95,6 +95,7 @@ import org.elasticsearch.client.ml.datafeed.ChunkingConfig; import org.elasticsearch.client.ml.datafeed.DatafeedConfig; import org.elasticsearch.client.ml.datafeed.DatafeedStats; import org.elasticsearch.client.ml.datafeed.DatafeedUpdate; +import org.elasticsearch.client.ml.datafeed.DelayedDataCheckConfig; import org.elasticsearch.client.ml.job.config.AnalysisConfig; import org.elasticsearch.client.ml.job.config.AnalysisLimits; import org.elasticsearch.client.ml.job.config.DataDescription; @@ -583,6 +584,14 @@ public class MlClientDocumentationIT extends ESRestHighLevelClientTestCase { datafeedBuilder.setQueryDelay(TimeValue.timeValueMinutes(1)); // <1> // end::put-datafeed-config-set-query-delay + // tag::put-datafeed-config-set-delayed-data-check-config + datafeedBuilder.setDelayedDataCheckConfig(DelayedDataCheckConfig + .enabledDelayedDataCheckConfig(TimeValue.timeValueHours(1))); // <1> + // end::put-datafeed-config-set-delayed-data-check-config + + // no need to accidentally trip internal validations due to job bucket size + datafeedBuilder.setDelayedDataCheckConfig(null); + List scriptFields = Collections.emptyList(); // tag::put-datafeed-config-set-script-fields datafeedBuilder.setScriptFields(scriptFields); // <1> diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/datafeed/DatafeedConfigTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/datafeed/DatafeedConfigTests.java index 7f92d1690f9..c7fb1c80388 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/datafeed/DatafeedConfigTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/ml/datafeed/DatafeedConfigTests.java @@ -103,6 +103,9 @@ public class DatafeedConfigTests extends AbstractXContentTestCase { + + 
@Override + protected DelayedDataCheckConfig createTestInstance() { + return createRandomizedConfig(); + } + + @Override + protected DelayedDataCheckConfig doParseInstance(XContentParser parser) { + return DelayedDataCheckConfig.PARSER.apply(parser, null); + } + + @Override + protected boolean supportsUnknownFields() { + return true; + } + + public void testEnabledDelayedDataCheckConfig() { + DelayedDataCheckConfig delayedDataCheckConfig = DelayedDataCheckConfig.enabledDelayedDataCheckConfig(TimeValue.timeValueHours(5)); + assertThat(delayedDataCheckConfig.isEnabled(), equalTo(true)); + assertThat(delayedDataCheckConfig.getCheckWindow(), equalTo(TimeValue.timeValueHours(5))); + } + + public void testDisabledDelayedDataCheckConfig() { + DelayedDataCheckConfig delayedDataCheckConfig = DelayedDataCheckConfig.disabledDelayedDataCheckConfig(); + assertThat(delayedDataCheckConfig.isEnabled(), equalTo(false)); + assertThat(delayedDataCheckConfig.getCheckWindow(), equalTo(null)); + } + + public static DelayedDataCheckConfig createRandomizedConfig() { + boolean enabled = randomBoolean(); + TimeValue timeWindow = null; + if (enabled || randomBoolean()) { + timeWindow = TimeValue.timeValueMillis(randomLongBetween(1, 1_000)); + } + return new DelayedDataCheckConfig(enabled, timeWindow); + } +} + diff --git a/docs/java-rest/high-level/ml/put-datafeed.asciidoc b/docs/java-rest/high-level/ml/put-datafeed.asciidoc index e9f66f0b61d..ed8a089c7be 100644 --- a/docs/java-rest/high-level/ml/put-datafeed.asciidoc +++ b/docs/java-rest/high-level/ml/put-datafeed.asciidoc @@ -63,6 +63,17 @@ include-tagged::{doc-tests-file}[{api}-config-set-query-delay] -------------------------------------------------- <1> The time interval behind real time that data is queried. +["source","java",subs="attributes,callouts,macros"] +-------------------------------------------------- +include-tagged::{doc-tests-file}[{api}-config-set-delayed-data-check-config] +-------------------------------------------------- +<1> Sets the delayed data check configuration. +The window must be larger than the Job's bucket size, but smaller than 24 hours, +and span less than 10,000 buckets. +Defaults to `null`, which causes an appropriate window span to be calculated when +the datafeed runs. +To explicitly disable, pass `DelayedDataCheckConfig.disabledDelayedDataCheckConfig()`. + ["source","java",subs="attributes,callouts,macros"] -------------------------------------------------- include-tagged::{doc-tests-file}[{api}-config-set-script-fields] diff --git a/docs/reference/ml/apis/datafeedresource.asciidoc b/docs/reference/ml/apis/datafeedresource.asciidoc index 6fe0b35d951..73361b12454 100644 --- a/docs/reference/ml/apis/datafeedresource.asciidoc +++ b/docs/reference/ml/apis/datafeedresource.asciidoc @@ -64,6 +64,11 @@ A {dfeed} resource has the following properties: example: `[]`. This property is provided for backwards compatibility with releases earlier than 6.0.0. For more information, see <>. +`delayed_data_check_config`:: + (object) Specifies if and with how large a window should the data feed check + for missing data. See <>. + For example: `{"enabled": true, "check_window": "1h"}` + [[ml-datafeed-chunking-config]] ==== Chunking Configuration Objects @@ -86,6 +91,27 @@ A chunking configuration object has the following properties: This setting is only applicable when the mode is set to `manual`. For example: `3h`. 
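Editor's illustration (not part of the patch): a minimal sketch of how the new high-level REST client option could be used, with purely hypothetical datafeed, job, and index names. `DatafeedConfig.builder`, `setIndices`, `setDelayedDataCheckConfig`, `enabledDelayedDataCheckConfig`, and `disabledDelayedDataCheckConfig` are all introduced or already present in this patch; everything else is illustrative.

    import org.elasticsearch.client.ml.datafeed.DatafeedConfig;
    import org.elasticsearch.client.ml.datafeed.DelayedDataCheckConfig;
    import org.elasticsearch.common.unit.TimeValue;

    import java.util.Collections;

    public class DelayedDataCheckConfigExample {
        public static void main(String[] args) {
            // Enabled check with an explicit 2h window. Server-side validation (DatafeedJobValidator)
            // requires the window to be at least one bucket_span and to span at most 10,000 buckets;
            // the config object itself rejects windows larger than 24h.
            DatafeedConfig enabledCheck = DatafeedConfig.builder("my-datafeed", "my-job")
                .setIndices(Collections.singletonList("my-index"))
                .setDelayedDataCheckConfig(
                    DelayedDataCheckConfig.enabledDelayedDataCheckConfig(TimeValue.timeValueHours(2)))
                .build();

            // Explicitly disable the delayed data check for this datafeed.
            DatafeedConfig disabledCheck = DatafeedConfig.builder("my-other-datafeed", "my-job")
                .setIndices(Collections.singletonList("my-index"))
                .setDelayedDataCheckConfig(DelayedDataCheckConfig.disabledDelayedDataCheckConfig())
                .build();
        }
    }

If no delayed_data_check_config is supplied at all, the default described below applies: the check is enabled with a null check_window, and an appropriate window is calculated when the real-time datafeed runs.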
+[[ml-datafeed-delayed-data-check-config]] +==== Delayed Data Check Configuration Objects + +The {dfeed} can optionally search over indices that have already been read in +an effort to find if any data has since been added to the index. If missing data +is found, it is a good indication that the `query_delay` option is set too low and +the data is being indexed after the {dfeed} has passed that moment in time. + +This check only runs on real-time {dfeeds} + +The configuration object has the following properties: + +`enabled`:: + (boolean) Should the {dfeed} periodically check for data being indexed after reading. + Defaults to `true` + +`check_window`:: + (time units) The window of time before the latest finalized bucket that should be searched + for late data. Defaults to `null` which causes an appropriate `check_window` to be calculated + when the real-time {dfeed} runs. + [float] [[ml-datafeed-counts]] ==== {dfeed-cap} Counts diff --git a/docs/reference/ml/apis/put-datafeed.asciidoc b/docs/reference/ml/apis/put-datafeed.asciidoc index b5c99fc8e36..223b88760be 100644 --- a/docs/reference/ml/apis/put-datafeed.asciidoc +++ b/docs/reference/ml/apis/put-datafeed.asciidoc @@ -78,6 +78,10 @@ You must create a job before you create a {dfeed}. You can associate only one For example: `[]`. This property is provided for backwards compatibility with releases earlier than 6.0.0. For more information, see <>. +`delayed_data_check_config`:: + (object) Specifies if and with how large a window should the data feed check + for missing data. See <>. + For more information about these properties, see <>. diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfig.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfig.java index b5aac1e6225..bd829a2cdec 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfig.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfig.java @@ -84,6 +84,7 @@ public class DatafeedConfig extends AbstractDiffable implements public static final ParseField SOURCE = new ParseField("_source"); public static final ParseField CHUNKING_CONFIG = new ParseField("chunking_config"); public static final ParseField HEADERS = new ParseField("headers"); + public static final ParseField DELAYED_DATA_CHECK_CONFIG = new ParseField("delayed_data_check_config"); // These parsers follow the pattern that metadata is parsed leniently (to allow for enhancements), whilst config is parsed strictly public static final ObjectParser LENIENT_PARSER = createParser(true); @@ -124,7 +125,9 @@ public class DatafeedConfig extends AbstractDiffable implements // (For config, headers are explicitly transferred from the auth headers by code in the put/update datafeed actions.) parser.declareObject(Builder::setHeaders, (p, c) -> p.mapStrings(), HEADERS); } - + parser.declareObject(Builder::setDelayedDataCheckConfig, + ignoreUnknownFields ? 
DelayedDataCheckConfig.LENIENT_PARSER : DelayedDataCheckConfig.STRICT_PARSER, + DELAYED_DATA_CHECK_CONFIG); return parser; } @@ -149,10 +152,12 @@ public class DatafeedConfig extends AbstractDiffable implements private final Integer scrollSize; private final ChunkingConfig chunkingConfig; private final Map headers; + private final DelayedDataCheckConfig delayedDataCheckConfig; private DatafeedConfig(String id, String jobId, TimeValue queryDelay, TimeValue frequency, List indices, List types, QueryBuilder query, AggregatorFactories.Builder aggregations, List scriptFields, - Integer scrollSize, ChunkingConfig chunkingConfig, Map headers) { + Integer scrollSize, ChunkingConfig chunkingConfig, Map headers, + DelayedDataCheckConfig delayedDataCheckConfig) { this.id = id; this.jobId = jobId; this.queryDelay = queryDelay; @@ -165,6 +170,7 @@ public class DatafeedConfig extends AbstractDiffable implements this.scrollSize = scrollSize; this.chunkingConfig = chunkingConfig; this.headers = Collections.unmodifiableMap(headers); + this.delayedDataCheckConfig = delayedDataCheckConfig; } public DatafeedConfig(StreamInput in) throws IOException { @@ -196,6 +202,11 @@ public class DatafeedConfig extends AbstractDiffable implements } else { this.headers = Collections.emptyMap(); } + if (in.getVersion().onOrAfter(Version.CURRENT)) { + delayedDataCheckConfig = in.readOptionalWriteable(DelayedDataCheckConfig::new); + } else { + delayedDataCheckConfig = DelayedDataCheckConfig.defaultDelayedDataCheckConfig(); + } } public String getId() { @@ -260,6 +271,10 @@ public class DatafeedConfig extends AbstractDiffable implements return headers; } + public DelayedDataCheckConfig getDelayedDataCheckConfig() { + return delayedDataCheckConfig; + } + @Override public void writeTo(StreamOutput out) throws IOException { out.writeString(id); @@ -291,6 +306,9 @@ public class DatafeedConfig extends AbstractDiffable implements if (out.getVersion().onOrAfter(Version.V_6_2_0)) { out.writeMap(headers, StreamOutput::writeString, StreamOutput::writeString); } + if (out.getVersion().onOrAfter(Version.CURRENT)) { + out.writeOptionalWriteable(delayedDataCheckConfig); + } } @Override @@ -328,6 +346,9 @@ public class DatafeedConfig extends AbstractDiffable implements if (headers.isEmpty() == false && params.paramAsBoolean(ToXContentParams.FOR_CLUSTER_STATE, false) == true) { builder.field(HEADERS.getPreferredName(), headers); } + if (delayedDataCheckConfig != null) { + builder.field(DELAYED_DATA_CHECK_CONFIG.getPreferredName(), delayedDataCheckConfig); + } return builder; } @@ -359,13 +380,14 @@ public class DatafeedConfig extends AbstractDiffable implements && Objects.equals(this.aggregations, that.aggregations) && Objects.equals(this.scriptFields, that.scriptFields) && Objects.equals(this.chunkingConfig, that.chunkingConfig) - && Objects.equals(this.headers, that.headers); + && Objects.equals(this.headers, that.headers) + && Objects.equals(this.delayedDataCheckConfig, that.delayedDataCheckConfig); } @Override public int hashCode() { return Objects.hash(id, jobId, frequency, queryDelay, indices, types, query, scrollSize, aggregations, scriptFields, - chunkingConfig, headers); + chunkingConfig, headers, delayedDataCheckConfig); } @Override @@ -438,6 +460,7 @@ public class DatafeedConfig extends AbstractDiffable implements private Integer scrollSize = DEFAULT_SCROLL_SIZE; private ChunkingConfig chunkingConfig; private Map headers = Collections.emptyMap(); + private DelayedDataCheckConfig delayedDataCheckConfig = 
DelayedDataCheckConfig.defaultDelayedDataCheckConfig(); public Builder() { } @@ -461,6 +484,7 @@ public class DatafeedConfig extends AbstractDiffable implements this.scrollSize = config.scrollSize; this.chunkingConfig = config.chunkingConfig; this.headers = config.headers; + this.delayedDataCheckConfig = config.getDelayedDataCheckConfig(); } public void setId(String datafeedId) { @@ -523,6 +547,10 @@ public class DatafeedConfig extends AbstractDiffable implements this.chunkingConfig = chunkingConfig; } + public void setDelayedDataCheckConfig(DelayedDataCheckConfig delayedDataCheckConfig) { + this.delayedDataCheckConfig = delayedDataCheckConfig; + } + public DatafeedConfig build() { ExceptionsHelper.requireNonNull(id, ID.getPreferredName()); ExceptionsHelper.requireNonNull(jobId, Job.ID.getPreferredName()); @@ -535,11 +563,12 @@ public class DatafeedConfig extends AbstractDiffable implements if (types == null || types.contains(null) || types.contains("")) { throw invalidOptionValue(TYPES.getPreferredName(), types); } + validateAggregations(); setDefaultChunkingConfig(); setDefaultQueryDelay(); return new DatafeedConfig(id, jobId, queryDelay, frequency, indices, types, query, aggregations, scriptFields, scrollSize, - chunkingConfig, headers); + chunkingConfig, headers, delayedDataCheckConfig); } void validateAggregations() { diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedJobValidator.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedJobValidator.java index b829b3fa443..8a49b955445 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedJobValidator.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedJobValidator.java @@ -31,6 +31,30 @@ public final class DatafeedJobValidator { checkValidHistogramInterval(datafeedConfig, analysisConfig); checkFrequencyIsMultipleOfHistogramInterval(datafeedConfig); } + + DelayedDataCheckConfig delayedDataCheckConfig = datafeedConfig.getDelayedDataCheckConfig(); + TimeValue bucketSpan = analysisConfig.getBucketSpan(); + if (delayedDataCheckConfig.isEnabled()) { + checkValidDelayedDataCheckConfig(bucketSpan, delayedDataCheckConfig); + } + } + + private static void checkValidDelayedDataCheckConfig(TimeValue bucketSpan, DelayedDataCheckConfig delayedDataCheckConfig) { + TimeValue delayedDataCheckWindow = delayedDataCheckConfig.getCheckWindow(); + if (delayedDataCheckWindow != null) { // NULL implies we calculate on use and thus is always valid + if (delayedDataCheckWindow.compareTo(bucketSpan) < 0) { + throw ExceptionsHelper.badRequestException( + Messages.getMessage(Messages.DATAFEED_CONFIG_DELAYED_DATA_CHECK_TOO_SMALL, + delayedDataCheckWindow, + bucketSpan)); + } + if (delayedDataCheckWindow.millis() > bucketSpan.millis() * DelayedDataCheckConfig.MAX_NUMBER_SPANABLE_BUCKETS) { + throw ExceptionsHelper.badRequestException( + Messages.getMessage(Messages.DATAFEED_CONFIG_DELAYED_DATA_CHECK_SPANS_TOO_MANY_BUCKETS, + delayedDataCheckWindow, + bucketSpan)); + } + } } private static void checkSummaryCountFieldNameIsSet(AnalysisConfig analysisConfig) { diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedUpdate.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedUpdate.java index d5425bdd1f4..bfc59687bb5 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedUpdate.java +++ 
b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedUpdate.java @@ -5,6 +5,7 @@ */ package org.elasticsearch.xpack.core.ml.datafeed; +import org.elasticsearch.Version; import org.elasticsearch.common.ParseField; import org.elasticsearch.common.Strings; import org.elasticsearch.common.io.stream.StreamInput; @@ -68,6 +69,9 @@ public class DatafeedUpdate implements Writeable, ToXContentObject { }, DatafeedConfig.SCRIPT_FIELDS); PARSER.declareInt(Builder::setScrollSize, DatafeedConfig.SCROLL_SIZE); PARSER.declareObject(Builder::setChunkingConfig, ChunkingConfig.STRICT_PARSER, DatafeedConfig.CHUNKING_CONFIG); + PARSER.declareObject(Builder::setDelayedDataCheckConfig, + DelayedDataCheckConfig.STRICT_PARSER, + DatafeedConfig.DELAYED_DATA_CHECK_CONFIG); } private final String id; @@ -81,10 +85,11 @@ public class DatafeedUpdate implements Writeable, ToXContentObject { private final List scriptFields; private final Integer scrollSize; private final ChunkingConfig chunkingConfig; + private final DelayedDataCheckConfig delayedDataCheckConfig; private DatafeedUpdate(String id, String jobId, TimeValue queryDelay, TimeValue frequency, List indices, List types, QueryBuilder query, AggregatorFactories.Builder aggregations, List scriptFields, - Integer scrollSize, ChunkingConfig chunkingConfig) { + Integer scrollSize, ChunkingConfig chunkingConfig, DelayedDataCheckConfig delayedDataCheckConfig) { this.id = id; this.jobId = jobId; this.queryDelay = queryDelay; @@ -96,6 +101,7 @@ public class DatafeedUpdate implements Writeable, ToXContentObject { this.scriptFields = scriptFields; this.scrollSize = scrollSize; this.chunkingConfig = chunkingConfig; + this.delayedDataCheckConfig = delayedDataCheckConfig; } public DatafeedUpdate(StreamInput in) throws IOException { @@ -122,6 +128,11 @@ public class DatafeedUpdate implements Writeable, ToXContentObject { } this.scrollSize = in.readOptionalVInt(); this.chunkingConfig = in.readOptionalWriteable(ChunkingConfig::new); + if (in.getVersion().onOrAfter(Version.CURRENT)) { + delayedDataCheckConfig = in.readOptionalWriteable(DelayedDataCheckConfig::new); + } else { + delayedDataCheckConfig = null; + } } /** @@ -159,6 +170,9 @@ public class DatafeedUpdate implements Writeable, ToXContentObject { } out.writeOptionalVInt(scrollSize); out.writeOptionalWriteable(chunkingConfig); + if (out.getVersion().onOrAfter(Version.CURRENT)) { + out.writeOptionalWriteable(delayedDataCheckConfig); + } } @Override @@ -185,6 +199,7 @@ public class DatafeedUpdate implements Writeable, ToXContentObject { } addOptionalField(builder, DatafeedConfig.SCROLL_SIZE, scrollSize); addOptionalField(builder, DatafeedConfig.CHUNKING_CONFIG, chunkingConfig); + addOptionalField(builder, DatafeedConfig.DELAYED_DATA_CHECK_CONFIG, delayedDataCheckConfig); builder.endObject(); return builder; } @@ -250,6 +265,10 @@ public class DatafeedUpdate implements Writeable, ToXContentObject { return chunkingConfig; } + public DelayedDataCheckConfig getDelayedDataCheckConfig() { + return delayedDataCheckConfig; + } + /** * Applies the update to the given {@link DatafeedConfig} * @return a new {@link DatafeedConfig} that contains the update @@ -290,6 +309,9 @@ public class DatafeedUpdate implements Writeable, ToXContentObject { if (chunkingConfig != null) { builder.setChunkingConfig(chunkingConfig); } + if (delayedDataCheckConfig != null) { + builder.setDelayedDataCheckConfig(delayedDataCheckConfig); + } if (headers.isEmpty() == false) { // Adjust the request, adding security headers from the 
current thread context @@ -328,6 +350,7 @@ public class DatafeedUpdate implements Writeable, ToXContentObject { && Objects.equals(this.query, that.query) && Objects.equals(this.scrollSize, that.scrollSize) && Objects.equals(this.aggregations, that.aggregations) + && Objects.equals(this.delayedDataCheckConfig, that.delayedDataCheckConfig) && Objects.equals(this.scriptFields, that.scriptFields) && Objects.equals(this.chunkingConfig, that.chunkingConfig); } @@ -335,7 +358,7 @@ public class DatafeedUpdate implements Writeable, ToXContentObject { @Override public int hashCode() { return Objects.hash(id, jobId, frequency, queryDelay, indices, types, query, scrollSize, aggregations, scriptFields, - chunkingConfig); + chunkingConfig, delayedDataCheckConfig); } @Override @@ -352,6 +375,7 @@ public class DatafeedUpdate implements Writeable, ToXContentObject { && (scrollSize == null || Objects.equals(scrollSize, datafeed.getQueryDelay())) && (aggregations == null || Objects.equals(aggregations, datafeed.getAggregations())) && (scriptFields == null || Objects.equals(scriptFields, datafeed.getScriptFields())) + && (delayedDataCheckConfig == null || Objects.equals(delayedDataCheckConfig, datafeed.getDelayedDataCheckConfig())) && (chunkingConfig == null || Objects.equals(chunkingConfig, datafeed.getChunkingConfig())); } @@ -368,6 +392,7 @@ public class DatafeedUpdate implements Writeable, ToXContentObject { private List scriptFields; private Integer scrollSize; private ChunkingConfig chunkingConfig; + private DelayedDataCheckConfig delayedDataCheckConfig; public Builder() { } @@ -388,6 +413,7 @@ public class DatafeedUpdate implements Writeable, ToXContentObject { this.scriptFields = config.scriptFields; this.scrollSize = config.scrollSize; this.chunkingConfig = config.chunkingConfig; + this.delayedDataCheckConfig = config.delayedDataCheckConfig; } public void setId(String datafeedId) { @@ -428,6 +454,10 @@ public class DatafeedUpdate implements Writeable, ToXContentObject { this.scriptFields = sorted; } + public void setDelayedDataCheckConfig(DelayedDataCheckConfig delayedDataCheckConfig) { + this.delayedDataCheckConfig = delayedDataCheckConfig; + } + public void setScrollSize(int scrollSize) { this.scrollSize = scrollSize; } @@ -438,7 +468,7 @@ public class DatafeedUpdate implements Writeable, ToXContentObject { public DatafeedUpdate build() { return new DatafeedUpdate(id, jobId, queryDelay, frequency, indices, types, query, aggregations, scriptFields, scrollSize, - chunkingConfig); + chunkingConfig, delayedDataCheckConfig); } } } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DelayedDataCheckConfig.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DelayedDataCheckConfig.java new file mode 100644 index 00000000000..9406b91d119 --- /dev/null +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/datafeed/DelayedDataCheckConfig.java @@ -0,0 +1,127 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. 
+ */ +package org.elasticsearch.xpack.core.ml.datafeed; + +import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.ConstructingObjectParser; +import org.elasticsearch.common.xcontent.ObjectParser; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.ToXContentObject; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.xpack.core.ml.utils.time.TimeUtils; + +import java.io.IOException; +import java.util.Objects; + +public class DelayedDataCheckConfig implements ToXContentObject, Writeable { + + public static final TimeValue MAX_DELAYED_DATA_WINDOW = TimeValue.timeValueHours(24); + public static final int MAX_NUMBER_SPANABLE_BUCKETS = 10_000; + + public static final ParseField ENABLED = new ParseField("enabled"); + public static final ParseField CHECK_WINDOW = new ParseField("check_window"); + + // These parsers follow the pattern that metadata is parsed leniently (to allow for enhancements), whilst config is parsed strictly + public static final ConstructingObjectParser LENIENT_PARSER = createParser(true); + public static final ConstructingObjectParser STRICT_PARSER = createParser(false); + + private static ConstructingObjectParser createParser(boolean ignoreUnknownFields) { + ConstructingObjectParser parser = new ConstructingObjectParser<>( + "delayed_data_check_config", ignoreUnknownFields, a -> new DelayedDataCheckConfig((Boolean) a[0], (TimeValue) a[1])); + + parser.declareBoolean(ConstructingObjectParser.constructorArg(), ENABLED); + parser.declareField(ConstructingObjectParser.optionalConstructorArg(), p -> { + if (p.currentToken() == XContentParser.Token.VALUE_STRING) { + return TimeValue.parseTimeValue(p.text(), CHECK_WINDOW.getPreferredName()); + } + throw new IllegalArgumentException("Unsupported token [" + p.currentToken() + "]"); + }, CHECK_WINDOW, ObjectParser.ValueType.STRING); + + return parser; + } + + public static DelayedDataCheckConfig defaultDelayedDataCheckConfig() { + return new DelayedDataCheckConfig(true, null); + } + + public static DelayedDataCheckConfig enabledDelayedDataCheckConfig(TimeValue timeValue) { + return new DelayedDataCheckConfig(true, timeValue); + } + + public static DelayedDataCheckConfig disabledDelayedDataCheckConfig() { + return new DelayedDataCheckConfig(false, null); + } + + private final boolean enabled; + private final TimeValue checkWindow; + + DelayedDataCheckConfig(Boolean enabled, TimeValue checkWindow) { + this.enabled = enabled; + if (enabled && checkWindow != null) { + TimeUtils.checkPositive(checkWindow, CHECK_WINDOW); + if (checkWindow.compareTo(MAX_DELAYED_DATA_WINDOW) > 0) { + throw new IllegalArgumentException("check_window [" + checkWindow.getStringRep() + "] must be less than or equal to [24h]"); + } + } + this.checkWindow = checkWindow; + } + + public DelayedDataCheckConfig(StreamInput in) throws IOException { + enabled = in.readBoolean(); + checkWindow = in.readOptionalTimeValue(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeBoolean(enabled); + out.writeOptionalTimeValue(checkWindow); + } + + public boolean isEnabled() { + return enabled; + } + + @Nullable + public 
TimeValue getCheckWindow() { + return checkWindow; + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException { + builder.startObject(); + builder.field(ENABLED.getPreferredName(), enabled); + if (checkWindow != null) { + builder.field(CHECK_WINDOW.getPreferredName(), checkWindow.getStringRep()); + } + builder.endObject(); + return builder; + } + + @Override + public int hashCode() { + return Objects.hash(enabled, checkWindow); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null || getClass() != obj.getClass()) { + return false; + } + + DelayedDataCheckConfig other = (DelayedDataCheckConfig) obj; + return Objects.equals(this.enabled, other.enabled) && Objects.equals(this.checkWindow, other.checkWindow); + } + +} diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/messages/Messages.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/messages/Messages.java index b669e8f1edc..4792180ec51 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/messages/Messages.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/job/messages/Messages.java @@ -22,6 +22,13 @@ public final class Messages { public static final String DATAFEED_CONFIG_CANNOT_USE_SCRIPT_FIELDS_WITH_AGGS = "script_fields cannot be used in combination with aggregations"; public static final String DATAFEED_CONFIG_INVALID_OPTION_VALUE = "Invalid {0} value ''{1}'' in datafeed configuration"; + public static final String DATAFEED_CONFIG_DELAYED_DATA_CHECK_TOO_SMALL = + "delayed_data_check_window [{0}] must be greater than the bucket_span [{1}]"; + public static final String DATAFEED_CONFIG_DELAYED_DATA_CHECK_TOO_LARGE = + "delayed_data_check_window [{0}] must be less than or equal to [24h]"; + public static final String DATAFEED_CONFIG_DELAYED_DATA_CHECK_SPANS_TOO_MANY_BUCKETS = + "delayed_data_check_window [{0}] must be less than 10,000x the bucket_span [{1}]"; + public static final String DATAFEED_DOES_NOT_SUPPORT_JOB_WITH_LATENCY = "A job configured with datafeed cannot support latency"; public static final String DATAFEED_NOT_FOUND = "No datafeed with id [{0}] exists"; public static final String DATAFEED_AGGREGATIONS_REQUIRES_DATE_HISTOGRAM = @@ -63,6 +70,9 @@ public final class Messages { public static final String JOB_AUDIT_DATAFEED_LOOKBACK_COMPLETED = "Datafeed lookback completed"; public static final String JOB_AUDIT_DATAFEED_LOOKBACK_NO_DATA = "Datafeed lookback retrieved no data"; public static final String JOB_AUDIT_DATAFEED_NO_DATA = "Datafeed has been retrieving no data for a while"; + public static final String JOB_AUDIT_DATAFEED_MISSING_DATA = + "Datafeed has missed {0} documents due to ingest latency, latest bucket with missing data is [{1}]." 
+ + " Consider increasing query_delay"; public static final String JOB_AUDIT_DATAFEED_RECOVERED = "Datafeed has recovered data extraction and analysis"; public static final String JOB_AUDIT_DATAFEED_STARTED_FROM_TO = "Datafeed started (from: {0} to: {1}) with frequency [{2}]"; public static final String JOB_AUDIT_DATAFEED_STARTED_REALTIME = "Datafeed started in real-time"; diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/time/TimeUtils.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/time/TimeUtils.java index 019668f1a3c..bebcc0a6ec3 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/time/TimeUtils.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/utils/time/TimeUtils.java @@ -87,6 +87,22 @@ public final class TimeUtils { checkMultiple(timeValue, baseUnit, field); } + /** + * Checks that the given {@code timeValue} is positive. + * + *
+     * <ul>
+     *   <li> 1s is valid </li>
+     *   <li> -1s is invalid </li>
+     * </ul>
+ */ + public static void checkPositive(TimeValue timeValue, ParseField field) { + long nanos = timeValue.getNanos(); + if (nanos <= 0) { + throw new IllegalArgumentException(field.getPreferredName() + " cannot be less or equal than 0. Value = " + + timeValue.toString()); + } + } + private static void checkNonNegative(TimeValue timeValue, ParseField field) { long nanos = timeValue.getNanos(); if (nanos < 0) { @@ -94,13 +110,7 @@ public final class TimeUtils { } } - private static void checkPositive(TimeValue timeValue, ParseField field) { - long nanos = timeValue.getNanos(); - if (nanos <= 0) { - throw new IllegalArgumentException(field.getPreferredName() + " cannot be less or equal than 0. Value = " - + timeValue.toString()); - } - } + /** * Check the given {@code timeValue} is a multiple of the {@code baseUnit} diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfigTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfigTests.java index 36bd2fbcb46..fe7c5b1a1d1 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfigTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/datafeed/DatafeedConfigTests.java @@ -109,6 +109,9 @@ public class DatafeedConfigTests extends AbstractSerializingTestCase { + + @Override + protected DelayedDataCheckConfig createTestInstance(){ + return createRandomizedConfig(100); + } + + @Override + protected Writeable.Reader instanceReader() { + return DelayedDataCheckConfig::new; + } + + @Override + protected DelayedDataCheckConfig doParseInstance(XContentParser parser) { + return DelayedDataCheckConfig.STRICT_PARSER.apply(parser, null); + } + + public void testConstructor() { + expectThrows(IllegalArgumentException.class, () -> new DelayedDataCheckConfig(true, TimeValue.MINUS_ONE)); + expectThrows(IllegalArgumentException.class, () -> new DelayedDataCheckConfig(true, TimeValue.timeValueHours(25))); + } + + public void testEnabledDelayedDataCheckConfig() { + DelayedDataCheckConfig delayedDataCheckConfig = DelayedDataCheckConfig.enabledDelayedDataCheckConfig(TimeValue.timeValueHours(5)); + assertThat(delayedDataCheckConfig.isEnabled(), equalTo(true)); + assertThat(delayedDataCheckConfig.getCheckWindow(), equalTo(TimeValue.timeValueHours(5))); + } + + public void testDisabledDelayedDataCheckConfig() { + DelayedDataCheckConfig delayedDataCheckConfig = DelayedDataCheckConfig.disabledDelayedDataCheckConfig(); + assertThat(delayedDataCheckConfig.isEnabled(), equalTo(false)); + assertThat(delayedDataCheckConfig.getCheckWindow(), equalTo(null)); + } + + public void testDefaultDelayedDataCheckConfig() { + DelayedDataCheckConfig delayedDataCheckConfig = DelayedDataCheckConfig.defaultDelayedDataCheckConfig(); + assertThat(delayedDataCheckConfig.isEnabled(), equalTo(true)); + assertThat(delayedDataCheckConfig.getCheckWindow(), is(nullValue())); + } + + public static DelayedDataCheckConfig createRandomizedConfig(long bucketSpanMillis) { + boolean enabled = randomBoolean(); + TimeValue timeWindow = null; + if (enabled || randomBoolean()) { + // time span is required to be at least 1 millis, so we use a custom method to generate a time value here + timeWindow = new TimeValue(randomLongBetween(bucketSpanMillis,bucketSpanMillis*2)); + } + return new DelayedDataCheckConfig(enabled, timeWindow); + } + + @Override + protected DelayedDataCheckConfig mutateInstance(DelayedDataCheckConfig instance) throws IOException { + boolean 
enabled = instance.isEnabled(); + TimeValue timeWindow = instance.getCheckWindow(); + switch (between(0, 1)) { + case 0: + enabled = !enabled; + if (randomBoolean()) { + timeWindow = TimeValue.timeValueMillis(randomLongBetween(1, 1000)); + } else { + timeWindow = null; + } + break; + case 1: + if (timeWindow == null) { + timeWindow = TimeValue.timeValueMillis(randomLongBetween(1, 1000)); + } else { + timeWindow = new TimeValue(timeWindow.getMillis() + between(10, 100)); + } + enabled = true; + break; + default: + throw new AssertionError("Illegal randomisation branch"); + } + return new DelayedDataCheckConfig(enabled, timeWindow); + } +} diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/config/AnalysisConfigTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/config/AnalysisConfigTests.java index 8843a336bde..c95403a112d 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/config/AnalysisConfigTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/job/config/AnalysisConfigTests.java @@ -46,7 +46,7 @@ public class AnalysisConfigTests extends AbstractSerializingTestCase response = delayedDataDetector.detectMissingData(lastBucket.getEpoch()*1000); assertThat(response.stream().mapToLong(BucketWithMissingData::getMissingDocumentCount).sum(), equalTo(0L)); long missingDocs = randomIntBetween(32, 128); // Simply adding data within the current delayed data detection, the choice of 43100000 is arbitrary and within the window - // for the DelayedDataDetector + // for the DatafeedDelayedDataDetector writeData(logger, index, missingDocs, now - 43100000, lastBucket.getEpoch()*1000); response = delayedDataDetector.detectMissingData(lastBucket.getEpoch()*1000); assertThat(response.stream().mapToLong(BucketWithMissingData::getMissingDocumentCount).sum(), equalTo(missingDocs)); + // Assert that the are returned in order + List timeStamps = response.stream().map(BucketWithMissingData::getTimeStamp).collect(Collectors.toList()); + assertEquals(timeStamps.stream().sorted().collect(Collectors.toList()), timeStamps); } public void testMissingDataDetectionInSpecificBucket() throws Exception { final String jobId = "delayed-data-detection-job-missing-test-specific-bucket"; Job.Builder job = createJob(jobId, TimeValue.timeValueMinutes(5), "count", null); - DatafeedConfig datafeedConfig = createDatafeed(job.getId() + "-datafeed", job.getId(), Collections.singletonList(index)); + DatafeedConfig.Builder datafeedConfigBuilder = + createDatafeedBuilder(job.getId() + "-datafeed", job.getId(), Collections.singletonList(index)); + datafeedConfigBuilder.setDelayedDataCheckConfig(DelayedDataCheckConfig.enabledDelayedDataCheckConfig(TimeValue.timeValueHours(12))); + DatafeedConfig datafeedConfig = datafeedConfigBuilder.build(); + registerJob(job); putJob(job); openJob(job.getId()); @@ -110,8 +121,7 @@ public class DelayedDataDetectorIT extends MlNativeAutodetectIntegTestCase { // Get the latest finalized bucket Bucket lastBucket = getLatestFinalizedBucket(jobId); - DelayedDataDetector delayedDataDetector = - new DelayedDataDetector(job.build(new Date()), datafeedConfig, TimeValue.timeValueHours(12), client()); + DelayedDataDetector delayedDataDetector = newDetector(job.build(new Date()), datafeedConfig); long missingDocs = randomIntBetween(1, 10); @@ -127,6 +137,10 @@ public class DelayedDataDetectorIT extends MlNativeAutodetectIntegTestCase { } } assertThat(hasBucketWithMissing, equalTo(true)); + + // Assert 
that the are returned in order + List timeStamps = response.stream().map(BucketWithMissingData::getTimeStamp).collect(Collectors.toList()); + assertEquals(timeStamps.stream().sorted().collect(Collectors.toList()), timeStamps); } public void testMissingDataDetectionWithAggregationsAndQuery() throws Exception { @@ -147,6 +161,8 @@ public class DelayedDataDetectorIT extends MlNativeAutodetectIntegTestCase { .interval(TimeValue.timeValueMinutes(5).millis()))); datafeedConfigBuilder.setQuery(new RangeQueryBuilder("value").gte(numDocs/2)); datafeedConfigBuilder.setFrequency(TimeValue.timeValueMinutes(5)); + datafeedConfigBuilder.setDelayedDataCheckConfig(DelayedDataCheckConfig.enabledDelayedDataCheckConfig(TimeValue.timeValueHours(12))); + DatafeedConfig datafeedConfig = datafeedConfigBuilder.build(); registerJob(job); putJob(job); @@ -160,19 +176,21 @@ public class DelayedDataDetectorIT extends MlNativeAutodetectIntegTestCase { // Get the latest finalized bucket Bucket lastBucket = getLatestFinalizedBucket(jobId); - DelayedDataDetector delayedDataDetector = - new DelayedDataDetector(job.build(new Date()), datafeedConfig, TimeValue.timeValueHours(12), client()); + DelayedDataDetector delayedDataDetector = newDetector(job.build(new Date()), datafeedConfig); List response = delayedDataDetector.detectMissingData(lastBucket.getEpoch()*1000); assertThat(response.stream().mapToLong(BucketWithMissingData::getMissingDocumentCount).sum(), equalTo(0L)); long missingDocs = numDocs; // Simply adding data within the current delayed data detection, the choice of 43100000 is arbitrary and within the window - // for the DelayedDataDetector + // for the DatafeedDelayedDataDetector writeData(logger, index, missingDocs, now - 43100000, lastBucket.getEpoch()*1000); response = delayedDataDetector.detectMissingData(lastBucket.getEpoch()*1000); assertThat(response.stream().mapToLong(BucketWithMissingData::getMissingDocumentCount).sum(), equalTo((missingDocs+1)/2)); + // Assert that the are returned in order + List timeStamps = response.stream().map(BucketWithMissingData::getTimeStamp).collect(Collectors.toList()); + assertEquals(timeStamps.stream().sorted().collect(Collectors.toList()), timeStamps); } private Job.Builder createJob(String id, TimeValue bucketSpan, String function, String field) { @@ -231,4 +249,8 @@ public class DelayedDataDetectorIT extends MlNativeAutodetectIntegTestCase { getBucketsRequest.setPageParams(new PageParams(0, 1)); return getBuckets(getBucketsRequest).get(0); } + + private DelayedDataDetector newDetector(Job job, DatafeedConfig datafeedConfig) { + return DelayedDataDetectorFactory.buildDetector(job, datafeedConfig, client()); + } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJob.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJob.java index 54a79ee199e..952e1c1f27e 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJob.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJob.java @@ -12,6 +12,7 @@ import org.elasticsearch.client.Client; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.ThreadContext; +import org.elasticsearch.common.xcontent.XContentElasticsearchExtension; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.core.internal.io.Streams; import org.elasticsearch.index.mapper.DateFieldMapper; @@ -23,12 +24,16 
@@ import org.elasticsearch.xpack.core.ml.datafeed.extractor.DataExtractor; import org.elasticsearch.xpack.core.ml.job.config.DataDescription; import org.elasticsearch.xpack.core.ml.job.messages.Messages; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts; +import org.elasticsearch.xpack.ml.datafeed.delayeddatacheck.DelayedDataDetector; +import org.elasticsearch.xpack.ml.datafeed.delayeddatacheck.DelayedDataDetectorFactory.BucketWithMissingData; import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory; import org.elasticsearch.xpack.ml.notifications.Auditor; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStream; +import java.util.Date; +import java.util.List; import java.util.Objects; import java.util.Optional; import java.util.concurrent.atomic.AtomicBoolean; @@ -41,6 +46,7 @@ class DatafeedJob { private static final Logger LOGGER = LogManager.getLogger(DatafeedJob.class); private static final int NEXT_TASK_DELAY_MS = 100; + static final long MISSING_DATA_CHECK_INTERVAL_MS = 900_000; //15 minutes in ms private final Auditor auditor; private final String jobId; @@ -50,15 +56,19 @@ class DatafeedJob { private final Client client; private final DataExtractorFactory dataExtractorFactory; private final Supplier currentTimeSupplier; + private final DelayedDataDetector delayedDataDetector; private volatile long lookbackStartTimeMs; + private volatile long latestFinalBucketEndTimeMs; + private volatile long lastDataCheckTimeMs; + private volatile int lastDataCheckAudit; private volatile Long lastEndTimeMs; private AtomicBoolean running = new AtomicBoolean(true); private volatile boolean isIsolated; DatafeedJob(String jobId, DataDescription dataDescription, long frequencyMs, long queryDelayMs, - DataExtractorFactory dataExtractorFactory, Client client, Auditor auditor, Supplier currentTimeSupplier, - long latestFinalBucketEndTimeMs, long latestRecordTimeMs) { + DataExtractorFactory dataExtractorFactory, Client client, Auditor auditor, Supplier currentTimeSupplier, + DelayedDataDetector delayedDataDetector, long latestFinalBucketEndTimeMs, long latestRecordTimeMs) { this.jobId = jobId; this.dataDescription = Objects.requireNonNull(dataDescription); this.frequencyMs = frequencyMs; @@ -67,7 +77,8 @@ class DatafeedJob { this.client = client; this.auditor = auditor; this.currentTimeSupplier = currentTimeSupplier; - + this.delayedDataDetector = delayedDataDetector; + this.latestFinalBucketEndTimeMs = latestFinalBucketEndTimeMs; long lastEndTime = Math.max(latestFinalBucketEndTimeMs, latestRecordTimeMs); if (lastEndTime > 0) { lastEndTimeMs = lastEndTime; @@ -151,9 +162,49 @@ class DatafeedJob { request.setCalcInterim(true); request.setAdvanceTime(String.valueOf(end)); run(start, end, request); + checkForMissingDataIfNecessary(); return nextRealtimeTimestamp(); } + private void checkForMissingDataIfNecessary() { + if (isRunning() && !isIsolated && checkForMissingDataTriggered()) { + + // Keep track of the last bucket time for which we did a missing data check + this.lastDataCheckTimeMs = this.currentTimeSupplier.get(); + List missingDataBuckets = delayedDataDetector.detectMissingData(latestFinalBucketEndTimeMs); + if (missingDataBuckets.isEmpty() == false) { + + long totalRecordsMissing = missingDataBuckets.stream() + .mapToLong(BucketWithMissingData::getMissingDocumentCount) + .sum(); + // The response is sorted by asc timestamp, so the last entry is the last bucket + Date lastBucketDate = 
missingDataBuckets.get(missingDataBuckets.size() - 1).getBucket().getTimestamp(); + int newAudit = Objects.hash(totalRecordsMissing, lastBucketDate); + if (newAudit != lastDataCheckAudit) { + auditor.warning(jobId, + Messages.getMessage(Messages.JOB_AUDIT_DATAFEED_MISSING_DATA, totalRecordsMissing, + XContentElasticsearchExtension.DEFAULT_DATE_PRINTER.print(lastBucketDate.getTime()))); + lastDataCheckAudit = newAudit; + } + } + } + } + + /** + * We wait a static interval of 15 minutes until the next missing data check. + * + * However, if our delayed data window is smaller than that, we will probably want to check at every available window (if freq. allows). + * This is to help miss as few buckets in the delayed data check as possible. + * + * If our frequency/query delay are longer than our default interval or window size, we will end up looking for missing data on + * every real-time trigger. This should be OK as we are pulling from the index at such a slow pace that another query will + * probably not even be noticeable at such a large timescale. + */ + private boolean checkForMissingDataTriggered() { + return this.currentTimeSupplier.get() > this.lastDataCheckTimeMs + + Math.min(MISSING_DATA_CHECK_INTERVAL_MS, delayedDataDetector.getWindow()); + } + /** * Stops the datafeed job * @@ -260,7 +311,10 @@ class DatafeedJob { // we call flush the job is closed. Thus, we don't flush unless the // datafeed is still running. if (isRunning() && !isIsolated) { - flushJob(flushRequest); + Date lastFinalizedBucketEnd = flushJob(flushRequest).getLastFinalizedBucketEnd(); + if (lastFinalizedBucketEnd != null) { + this.latestFinalBucketEndTimeMs = lastFinalizedBucketEnd.getTime(); + } } if (recordCount == 0) { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobBuilder.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobBuilder.java index efe332346ef..22d7bec2da2 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobBuilder.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobBuilder.java @@ -13,6 +13,8 @@ import org.elasticsearch.xpack.core.ml.action.util.QueryPage; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; import org.elasticsearch.xpack.core.ml.job.config.DataDescription; import org.elasticsearch.xpack.core.ml.job.config.Job; +import org.elasticsearch.xpack.ml.datafeed.delayeddatacheck.DelayedDataDetector; +import org.elasticsearch.xpack.ml.datafeed.delayeddatacheck.DelayedDataDetectorFactory; import org.elasticsearch.xpack.ml.job.persistence.BucketsQueryBuilder; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts; import org.elasticsearch.xpack.core.ml.job.results.Bucket; @@ -46,8 +48,9 @@ public class DatafeedJobBuilder { Consumer contextHanlder = context -> { TimeValue frequency = getFrequencyOrDefault(datafeed, job); TimeValue queryDelay = datafeed.getQueryDelay(); + DelayedDataDetector delayedDataDetector = DelayedDataDetectorFactory.buildDetector(job, datafeed, client); DatafeedJob datafeedJob = new DatafeedJob(job.getId(), buildDataDescription(job), frequency.millis(), queryDelay.millis(), - context.dataExtractorFactory, client, auditor, currentTimeSupplier, + context.dataExtractorFactory, client, auditor, currentTimeSupplier, delayedDataDetector, context.latestFinalBucketEndMs, context.latestRecordTimeMs); listener.onResponse(datafeedJob); }; diff --git 
a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DelayedDataDetector.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/delayeddatacheck/DatafeedDelayedDataDetector.java similarity index 72% rename from x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DelayedDataDetector.java rename to x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/delayeddatacheck/DatafeedDelayedDataDetector.java index 3c7c6ff963e..86fe439ac16 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/DelayedDataDetector.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/delayeddatacheck/DatafeedDelayedDataDetector.java @@ -3,26 +3,26 @@ * or more contributor license agreements. Licensed under the Elastic License; * you may not use this file except in compliance with the Elastic License. */ -package org.elasticsearch.xpack.ml.datafeed; +package org.elasticsearch.xpack.ml.datafeed.delayeddatacheck; import org.elasticsearch.action.search.SearchAction; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.client.Client; -import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.ThreadContext; +import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder; import org.elasticsearch.search.aggregations.bucket.histogram.Histogram; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.xpack.core.ml.action.GetBucketsAction; import org.elasticsearch.xpack.core.ml.action.util.PageParams; -import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; import org.elasticsearch.xpack.core.ml.datafeed.extractor.ExtractorUtils; -import org.elasticsearch.xpack.core.ml.job.config.Job; +import org.elasticsearch.xpack.ml.datafeed.delayeddatacheck.DelayedDataDetectorFactory.BucketWithMissingData; import org.elasticsearch.xpack.core.ml.job.results.Bucket; import org.elasticsearch.xpack.core.ml.utils.Intervals; import org.joda.time.DateTime; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -35,32 +35,33 @@ import static org.elasticsearch.xpack.core.ClientHelper.stashWithOrigin; /** * This class will search the buckets and indices over a given window to determine if any data is missing */ -public class DelayedDataDetector { +public class DatafeedDelayedDataDetector implements DelayedDataDetector { private static final String DATE_BUCKETS = "date_buckets"; + private final long bucketSpan; private final long window; - private final DatafeedConfig datafeedConfig; private final Client client; - private final Job job; + private final String timeField; + private final String jobId; + private final QueryBuilder datafeedQuery; + private final String[] datafeedIndices; - public DelayedDataDetector(Job job, DatafeedConfig datafeedConfig, TimeValue window, Client client) { - this.job = job; - this.bucketSpan = job.getAnalysisConfig().getBucketSpan().millis(); - this.datafeedConfig = datafeedConfig; - long windowMillis = window.millis(); - if (windowMillis < bucketSpan) { - throw new IllegalArgumentException("[window] must be greater or equal to the [bucket_span]"); - } - if (Intervals.alignToFloor(windowMillis/bucketSpan, bucketSpan) >= 10000) { - throw new IllegalArgumentException("[window] must contain less than 10000 buckets at the current 
[bucket_span]"); - } - this.window = windowMillis; + DatafeedDelayedDataDetector(long bucketSpan, long window, String jobId, String timeField, QueryBuilder datafeedQuery, + String[] datafeedIndices, Client client) { + this.bucketSpan = bucketSpan; + this.window = window; + this.jobId = jobId; + this.timeField = timeField; + this.datafeedQuery = datafeedQuery; + this.datafeedIndices = datafeedIndices; this.client = client; } /** - * This method looks at the {@link DatafeedConfig} from {@code latestFinalizedBucket - window} to {@code latestFinalizedBucket}. + * This method looks at the {@link DatafeedDelayedDataDetector#datafeedIndices} + * from {@code latestFinalizedBucket - window} to {@code latestFinalizedBucket} and compares the document counts with the + * {@link DatafeedDelayedDataDetector#jobId}'s finalized buckets' event counts. * * It is done synchronously, and can block for a considerable amount of time, it should only be executed within the appropriate * thread pool. @@ -68,9 +69,15 @@ public class DelayedDataDetector { * @param latestFinalizedBucketMs The latest finalized bucket timestamp in milliseconds, signifies the end of the time window check * @return A List of {@link BucketWithMissingData} objects that contain each bucket with the current number of missing docs */ + @Override public List detectMissingData(long latestFinalizedBucketMs) { final long end = Intervals.alignToFloor(latestFinalizedBucketMs, bucketSpan); final long start = Intervals.alignToFloor(latestFinalizedBucketMs - window, bucketSpan); + + if (end <= start) { + return Collections.emptyList(); + } + List finalizedBuckets = checkBucketEvents(start, end); Map indexedData = checkCurrentBucketEventCount(start, end); return finalizedBuckets.stream() @@ -81,10 +88,17 @@ public class DelayedDataDetector { .collect(Collectors.toList()); } + @Override + public long getWindow() { + return window; + } + private List checkBucketEvents(long start, long end) { - GetBucketsAction.Request request = new GetBucketsAction.Request(job.getId()); + GetBucketsAction.Request request = new GetBucketsAction.Request(jobId); request.setStart(Long.toString(start)); request.setEnd(Long.toString(end)); + request.setSort("timestamp"); + request.setDescending(false); request.setExcludeInterim(true); request.setPageParams(new PageParams(0, (int)((end - start)/bucketSpan))); @@ -95,13 +109,12 @@ public class DelayedDataDetector { } private Map checkCurrentBucketEventCount(long start, long end) { - String timeField = job.getDataDescription().getTimeField(); SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder() .size(0) .aggregation(new DateHistogramAggregationBuilder(DATE_BUCKETS).interval(bucketSpan).field(timeField)) - .query(ExtractorUtils.wrapInTimeRangeQuery(datafeedConfig.getQuery(), timeField, start, end)); + .query(ExtractorUtils.wrapInTimeRangeQuery(datafeedQuery, timeField, start, end)); - SearchRequest searchRequest = new SearchRequest(datafeedConfig.getIndices().toArray(new String[0])).source(searchSourceBuilder); + SearchRequest searchRequest = new SearchRequest(datafeedIndices).source(searchSourceBuilder); try (ThreadContext.StoredContext ignore = stashWithOrigin(client.threadPool().getThreadContext(), ML_ORIGIN)) { SearchResponse response = client.execute(SearchAction.INSTANCE, searchRequest).actionGet(); List buckets = ((Histogram)response.getAggregations().get(DATE_BUCKETS)).getBuckets(); @@ -132,27 +145,4 @@ public class DelayedDataDetector { private static long calculateMissing(Map indexedData, Bucket bucket) { 
return indexedData.getOrDefault(bucket.getEpoch() * 1000, 0L) - bucket.getEventCount(); } - - public static class BucketWithMissingData { - - private final long missingDocumentCount; - private final Bucket bucket; - - static BucketWithMissingData fromMissingAndBucket(long missingDocumentCount, Bucket bucket) { - return new BucketWithMissingData(missingDocumentCount, bucket); - } - - private BucketWithMissingData(long missingDocumentCount, Bucket bucket) { - this.missingDocumentCount = missingDocumentCount; - this.bucket = bucket; - } - - public Bucket getBucket() { - return bucket; - } - - public long getMissingDocumentCount() { - return missingDocumentCount; - } - } } diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/delayeddatacheck/DelayedDataDetector.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/delayeddatacheck/DelayedDataDetector.java new file mode 100644 index 00000000000..3d36f3576fd --- /dev/null +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/delayeddatacheck/DelayedDataDetector.java @@ -0,0 +1,14 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml.datafeed.delayeddatacheck; + +import java.util.List; + +public interface DelayedDataDetector { + List detectMissingData(long endingTimeStamp); + + long getWindow(); +} diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/delayeddatacheck/DelayedDataDetectorFactory.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/delayeddatacheck/DelayedDataDetectorFactory.java new file mode 100644 index 00000000000..a9aeb398141 --- /dev/null +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/delayeddatacheck/DelayedDataDetectorFactory.java @@ -0,0 +1,125 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml.datafeed.delayeddatacheck; + +import org.elasticsearch.client.Client; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; +import org.elasticsearch.xpack.core.ml.datafeed.DelayedDataCheckConfig; +import org.elasticsearch.xpack.core.ml.job.config.Job; +import org.elasticsearch.xpack.core.ml.job.messages.Messages; +import org.elasticsearch.xpack.core.ml.job.results.Bucket; + +import java.util.Objects; + +/** + * Builds the appropriate {@link DelayedDataDetector} implementation, with the appropriate settings, given the parameters. + */ +public class DelayedDataDetectorFactory { + + // There are eight 15min buckets in a two hour span, so matching that number as the fallback for very long buckets + private static final int FALLBACK_NUMBER_OF_BUCKETS_TO_SPAN = 8; + private static final TimeValue DEFAULT_CHECK_WINDOW = TimeValue.timeValueHours(2); + + /** + * This will build the appropriate detector given the parameters. + * + * If {@link DatafeedConfig#getDelayedDataCheckConfig()} is not `isEnabled()`, then a {@link NullDelayedDataDetector} is returned, which + * does not do any checks, and only supplies an empty collection. 
+ * + * @param job The {@link Job} object for the given `datafeedConfig` + * @param datafeedConfig The {@link DatafeedConfig} for which to create the {@link DelayedDataDetector} + * @param client The {@link Client} capable of taking action against the ES Cluster. + * @return A new {@link DelayedDataDetector} + */ + public static DelayedDataDetector buildDetector(Job job, DatafeedConfig datafeedConfig, Client client) { + if (datafeedConfig.getDelayedDataCheckConfig().isEnabled()) { + long window = validateAndCalculateWindowLength(job.getAnalysisConfig().getBucketSpan(), + datafeedConfig.getDelayedDataCheckConfig().getCheckWindow()); + long bucketSpan = job.getAnalysisConfig().getBucketSpan() == null ? 0 : job.getAnalysisConfig().getBucketSpan().millis(); + return new DatafeedDelayedDataDetector(bucketSpan, + window, + job.getId(), + job.getDataDescription().getTimeField(), + datafeedConfig.getQuery(), + datafeedConfig.getIndices().toArray(new String[0]), + client); + } else { + return new NullDelayedDataDetector(); + } + } + + private static long validateAndCalculateWindowLength(TimeValue bucketSpan, TimeValue currentWindow) { + if (bucketSpan == null) { + return 0; + } + if (currentWindow == null) { // we should provide a good default as the user did not specify a window + if(bucketSpan.compareTo(DEFAULT_CHECK_WINDOW) >= 0) { + return FALLBACK_NUMBER_OF_BUCKETS_TO_SPAN * bucketSpan.millis(); + } else { + return DEFAULT_CHECK_WINDOW.millis(); + } + } + if (currentWindow.compareTo(bucketSpan) < 0) { + throw new IllegalArgumentException( + Messages.getMessage(Messages.DATAFEED_CONFIG_DELAYED_DATA_CHECK_TOO_SMALL, currentWindow.getStringRep(), + bucketSpan.getStringRep())); + } else if (currentWindow.millis() > bucketSpan.millis() * DelayedDataCheckConfig.MAX_NUMBER_SPANABLE_BUCKETS) { + throw new IllegalArgumentException( + Messages.getMessage(Messages.DATAFEED_CONFIG_DELAYED_DATA_CHECK_SPANS_TOO_MANY_BUCKETS, currentWindow.getStringRep(), + bucketSpan.getStringRep())); + } + return currentWindow.millis(); + } + + public static class BucketWithMissingData { + + private final long missingDocumentCount; + private final Bucket bucket; + + public static BucketWithMissingData fromMissingAndBucket(long missingDocumentCount, Bucket bucket) { + return new BucketWithMissingData(missingDocumentCount, bucket); + } + + private BucketWithMissingData(long missingDocumentCount, Bucket bucket) { + this.missingDocumentCount = missingDocumentCount; + this.bucket = bucket; + } + + public long getTimeStamp() { + return bucket.getEpoch(); + } + + public Bucket getBucket() { + return bucket; + } + + public long getMissingDocumentCount() { + return missingDocumentCount; + } + + @Override + public boolean equals(Object other) { + if (other == this) { + return true; + } + + if (other == null || getClass() != other.getClass()) { + return false; + } + + BucketWithMissingData that = (BucketWithMissingData) other; + + return Objects.equals(that.bucket, bucket) && Objects.equals(that.missingDocumentCount, missingDocumentCount); + } + + @Override + public int hashCode() { + return Objects.hash(bucket, missingDocumentCount); + } + } + +} diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/delayeddatacheck/NullDelayedDataDetector.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/delayeddatacheck/NullDelayedDataDetector.java new file mode 100644 index 00000000000..ee6178e6e84 --- /dev/null +++ 
b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/datafeed/delayeddatacheck/NullDelayedDataDetector.java @@ -0,0 +1,35 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.ml.datafeed.delayeddatacheck; + +import java.util.Collections; +import java.util.List; + +/** + * This class will always return an {@link Collections#emptyList()}. + */ +public class NullDelayedDataDetector implements DelayedDataDetector { + + /** + * Always returns an empty collection + * @param unusedTimeStamp unused Parameter + * @return {@link Collections#emptyList()} + */ + @Override + public List detectMissingData(long unusedTimeStamp) { + return Collections.emptyList(); + } + + /** + * Always returns 0 + * @return a 0 + */ + @Override + public long getWindow() { + return 0L; + } + +} diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobTests.java index 268a351cd24..930817b5021 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobTests.java @@ -10,6 +10,7 @@ import org.elasticsearch.client.Client; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.ThreadContext; +import org.elasticsearch.common.xcontent.XContentElasticsearchExtension; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.mock.orig.Mockito; import org.elasticsearch.test.ESTestCase; @@ -18,6 +19,10 @@ import org.elasticsearch.xpack.core.ml.action.FlushJobAction; import org.elasticsearch.xpack.core.ml.action.PersistJobAction; import org.elasticsearch.xpack.core.ml.action.PostDataAction; import org.elasticsearch.xpack.core.ml.datafeed.extractor.DataExtractor; +import org.elasticsearch.xpack.core.ml.job.messages.Messages; +import org.elasticsearch.xpack.core.ml.job.results.Bucket; +import org.elasticsearch.xpack.ml.datafeed.delayeddatacheck.DelayedDataDetector; +import org.elasticsearch.xpack.ml.datafeed.delayeddatacheck.DelayedDataDetectorFactory.BucketWithMissingData; import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory; import org.elasticsearch.xpack.core.ml.job.config.DataDescription; import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts; @@ -30,6 +35,7 @@ import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; import java.nio.charset.StandardCharsets; +import java.util.Collections; import java.util.Date; import java.util.List; import java.util.Optional; @@ -56,10 +62,12 @@ public class DatafeedJobTests extends ESTestCase { private DataExtractorFactory dataExtractorFactory; private DataExtractor dataExtractor; private Client client; + private DelayedDataDetector delayedDataDetector; private DataDescription.Builder dataDescription; ActionFuture postDataFuture; private ActionFuture flushJobFuture; private ArgumentCaptor flushJobRequests; + private FlushJobAction.Response flushJobResponse; private long currentTime; private XContentType xContentType; @@ -79,6 +87,9 @@ public class DatafeedJobTests extends ESTestCase { 
dataDescription.setFormat(DataDescription.DataFormat.XCONTENT); postDataFuture = mock(ActionFuture.class); flushJobFuture = mock(ActionFuture.class); + flushJobResponse = new FlushJobAction.Response(); + delayedDataDetector = mock(DelayedDataDetector.class); + when(delayedDataDetector.getWindow()).thenReturn(DatafeedJob.MISSING_DATA_CHECK_INTERVAL_MS); currentTime = 0; xContentType = XContentType.JSON; @@ -96,6 +107,7 @@ public class DatafeedJobTests extends ESTestCase { when(postDataFuture.actionGet()).thenReturn(new PostDataAction.Response(dataCounts)); flushJobRequests = ArgumentCaptor.forClass(FlushJobAction.Request.class); + when(flushJobFuture.actionGet()).thenReturn(flushJobResponse); when(client.execute(same(FlushJobAction.INSTANCE), flushJobRequests.capture())).thenReturn(flushJobFuture); } @@ -193,6 +205,13 @@ public class DatafeedJobTests extends ESTestCase { } public void testRealtimeRun() throws Exception { + flushJobResponse = new FlushJobAction.Response(true, new Date(2000)); + Bucket bucket = mock(Bucket.class); + when(bucket.getTimestamp()).thenReturn(new Date(2000)); + when(flushJobFuture.actionGet()).thenReturn(flushJobResponse); + when(client.execute(same(FlushJobAction.INSTANCE), flushJobRequests.capture())).thenReturn(flushJobFuture); + when(delayedDataDetector.detectMissingData(2000)) + .thenReturn(Collections.singletonList(BucketWithMissingData.fromMissingAndBucket(10, bucket))); currentTime = 60000L; long frequencyMs = 100; long queryDelayMs = 1000; @@ -206,6 +225,29 @@ public class DatafeedJobTests extends ESTestCase { flushRequest.setAdvanceTime("59000"); verify(client).execute(same(FlushJobAction.INSTANCE), eq(flushRequest)); verify(client, never()).execute(same(PersistJobAction.INSTANCE), any()); + + // Execute a second valid time, but do so in a smaller window than the interval + currentTime = 62000L; + byte[] contentBytes = "content".getBytes(StandardCharsets.UTF_8); + InputStream inputStream = new ByteArrayInputStream(contentBytes); + when(dataExtractor.hasNext()).thenReturn(true).thenReturn(false); + when(dataExtractor.next()).thenReturn(Optional.of(inputStream)); + when(dataExtractorFactory.newExtractor(anyLong(), anyLong())).thenReturn(dataExtractor); + datafeedJob.runRealtime(); + + // Execute a third time, but this time make sure we exceed the data check interval, but keep the delayedDataDetector response + // the same + currentTime = 62000L + DatafeedJob.MISSING_DATA_CHECK_INTERVAL_MS + 1; + inputStream = new ByteArrayInputStream(contentBytes); + when(dataExtractor.hasNext()).thenReturn(true).thenReturn(false); + when(dataExtractor.next()).thenReturn(Optional.of(inputStream)); + when(dataExtractorFactory.newExtractor(anyLong(), anyLong())).thenReturn(dataExtractor); + datafeedJob.runRealtime(); + + verify(auditor, times(1)).warning(jobId, + Messages.getMessage(Messages.JOB_AUDIT_DATAFEED_MISSING_DATA, + 10, + XContentElasticsearchExtension.DEFAULT_DATE_PRINTER.print(2000))); } public void testEmptyDataCountGivenlookback() throws Exception { @@ -321,6 +363,6 @@ public class DatafeedJobTests extends ESTestCase { long latestRecordTimeMs) { Supplier currentTimeSupplier = () -> currentTime; return new DatafeedJob(jobId, dataDescription.build(), frequencyMs, queryDelayMs, dataExtractorFactory, client, auditor, - currentTimeSupplier, latestFinalBucketEndTimeMs, latestRecordTimeMs); + currentTimeSupplier, delayedDataDetector, latestFinalBucketEndTimeMs, latestRecordTimeMs); } } diff --git 
a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobValidatorTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobValidatorTests.java index 35fd9bb98ab..1507e106c61 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobValidatorTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobValidatorTests.java @@ -14,6 +14,7 @@ import org.elasticsearch.search.aggregations.metrics.MaxAggregationBuilder; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; import org.elasticsearch.xpack.core.ml.datafeed.DatafeedJobValidator; +import org.elasticsearch.xpack.core.ml.datafeed.DelayedDataCheckConfig; import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig; import org.elasticsearch.xpack.core.ml.job.config.DataDescription; import org.elasticsearch.xpack.core.ml.job.config.Detector; @@ -176,6 +177,30 @@ public class DatafeedJobValidatorTests extends ESTestCase { assertEquals("Datafeed frequency [1.5m] must be a multiple of the aggregation interval [60000ms]", e.getMessage()); } + public void testVerify_BucketIntervalAndDataCheckWindowAreValid() { + Job.Builder builder = buildJobBuilder("foo"); + AnalysisConfig.Builder ac = createAnalysisConfig(); + ac.setSummaryCountFieldName("some_count"); + ac.setBucketSpan(TimeValue.timeValueSeconds(2)); + builder.setAnalysisConfig(ac); + Job job = builder.build(new Date()); + DatafeedConfig.Builder datafeedBuilder = createValidDatafeedConfig(); + datafeedBuilder.setDelayedDataCheckConfig(DelayedDataCheckConfig.enabledDelayedDataCheckConfig(TimeValue.timeValueMinutes(10))); + + DatafeedJobValidator.validate(datafeedBuilder.build(), job); + + datafeedBuilder.setDelayedDataCheckConfig(DelayedDataCheckConfig.enabledDelayedDataCheckConfig(TimeValue.timeValueSeconds(1))); + ElasticsearchStatusException e = ESTestCase.expectThrows(ElasticsearchStatusException.class, + () -> DatafeedJobValidator.validate(datafeedBuilder.build(), job)); + assertEquals(Messages.getMessage(Messages.DATAFEED_CONFIG_DELAYED_DATA_CHECK_TOO_SMALL, "1s", "2s"), e.getMessage()); + + datafeedBuilder.setDelayedDataCheckConfig(DelayedDataCheckConfig.enabledDelayedDataCheckConfig(TimeValue.timeValueHours(24))); + e = ESTestCase.expectThrows(ElasticsearchStatusException.class, + () -> DatafeedJobValidator.validate(datafeedBuilder.build(), job)); + assertEquals(Messages.getMessage( + Messages.DATAFEED_CONFIG_DELAYED_DATA_CHECK_SPANS_TOO_MANY_BUCKETS, "1d", "2s"), e.getMessage()); + } + private static Job.Builder buildJobBuilder(String id) { Job.Builder builder = new Job.Builder(id); AnalysisConfig.Builder ac = createAnalysisConfig(); diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DelayedDataDetectorTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DelayedDataDetectorTests.java deleted file mode 100644 index 9a54181af9c..00000000000 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/DelayedDataDetectorTests.java +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. 
- */ -package org.elasticsearch.xpack.ml.datafeed; - -import org.elasticsearch.client.Client; -import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.test.ESTestCase; -import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; -import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig; -import org.elasticsearch.xpack.core.ml.job.config.DataDescription; -import org.elasticsearch.xpack.core.ml.job.config.Detector; -import org.elasticsearch.xpack.core.ml.job.config.Job; - -import java.util.Collections; -import java.util.Date; - -import static org.hamcrest.Matchers.equalTo; -import static org.mockito.Mockito.mock; - - -public class DelayedDataDetectorTests extends ESTestCase { - - - public void testConstructorWithValueValues() { - TimeValue window = TimeValue.timeValueSeconds(10); - Job job = createJob(TimeValue.timeValueSeconds(1)); - DelayedDataDetector delayedDataDetector = new DelayedDataDetector(job, createDatafeed(), window, mock(Client.class)); - assertNotNull(delayedDataDetector); - } - - public void testConstructorWithInvalidValues() { - TimeValue shortWindow = TimeValue.timeValueMillis(500); - Job job = createJob(TimeValue.timeValueSeconds(1)); - - Exception exception = expectThrows(IllegalArgumentException.class, - ()-> new DelayedDataDetector(job, createDatafeed(), shortWindow, mock(Client.class))); - assertThat(exception.getMessage(), equalTo("[window] must be greater or equal to the [bucket_span]")); - - TimeValue longWindow = TimeValue.timeValueSeconds(20000); - - exception = expectThrows(IllegalArgumentException.class, - ()-> new DelayedDataDetector(job, createDatafeed(), longWindow, mock(Client.class))); - assertThat(exception.getMessage(), equalTo("[window] must contain less than 10000 buckets at the current [bucket_span]")); - } - - - private Job createJob(TimeValue bucketSpan) { - DataDescription.Builder dataDescription = new DataDescription.Builder(); - dataDescription.setFormat(DataDescription.DataFormat.XCONTENT); - dataDescription.setTimeField("time"); - dataDescription.setTimeFormat(DataDescription.EPOCH_MS); - - Detector.Builder d = new Detector.Builder("count", null); - AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(Collections.singletonList(d.build())); - analysisConfig.setBucketSpan(bucketSpan); - - Job.Builder builder = new Job.Builder(); - builder.setId("test-job"); - builder.setAnalysisConfig(analysisConfig); - builder.setDataDescription(dataDescription); - return builder.build(new Date()); - } - - private DatafeedConfig createDatafeed() { - DatafeedConfig.Builder builder = new DatafeedConfig.Builder("id", "jobId"); - builder.setIndices(Collections.singletonList("index1")); - builder.setTypes(Collections.singletonList("doc")); - return builder.build(); - } - - - -} diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/delayeddatacheck/DelayedDataDetectorFactoryTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/delayeddatacheck/DelayedDataDetectorFactoryTests.java new file mode 100644 index 00000000000..12cf97734c9 --- /dev/null +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/datafeed/delayeddatacheck/DelayedDataDetectorFactoryTests.java @@ -0,0 +1,103 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. 
+ */ +package org.elasticsearch.xpack.ml.datafeed.delayeddatacheck; + +import org.elasticsearch.client.Client; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig; +import org.elasticsearch.xpack.core.ml.datafeed.DelayedDataCheckConfig; +import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig; +import org.elasticsearch.xpack.core.ml.job.config.DataDescription; +import org.elasticsearch.xpack.core.ml.job.config.Detector; +import org.elasticsearch.xpack.core.ml.job.config.Job; +import org.elasticsearch.xpack.core.ml.job.messages.Messages; + +import java.util.Collections; +import java.util.Date; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.instanceOf; +import static org.mockito.Mockito.mock; + + +public class DelayedDataDetectorFactoryTests extends ESTestCase { + + public void testBuilder() { + Job job = createJob(TimeValue.timeValueSeconds(2)); + + DatafeedConfig datafeedConfig = createDatafeed(false, null); + + // Should not throw + assertThat(DelayedDataDetectorFactory.buildDetector(job, datafeedConfig, mock(Client.class)), + instanceOf(NullDelayedDataDetector.class)); + + datafeedConfig = createDatafeed(true, TimeValue.timeValueMinutes(10)); + + // Should not throw + assertThat(DelayedDataDetectorFactory.buildDetector(job, datafeedConfig, mock(Client.class)), + instanceOf(DatafeedDelayedDataDetector.class)); + + DatafeedConfig tooSmallDatafeedConfig = createDatafeed(true, TimeValue.timeValueSeconds(1)); + IllegalArgumentException e = ESTestCase.expectThrows(IllegalArgumentException.class, + () -> DelayedDataDetectorFactory.buildDetector(job, tooSmallDatafeedConfig, mock(Client.class))); + assertEquals(Messages.getMessage(Messages.DATAFEED_CONFIG_DELAYED_DATA_CHECK_TOO_SMALL, "1s", "2s"), e.getMessage()); + + DatafeedConfig tooBigDatafeedConfig = createDatafeed(true, TimeValue.timeValueHours(12)); + e = ESTestCase.expectThrows(IllegalArgumentException.class, + () -> DelayedDataDetectorFactory.buildDetector(job, tooBigDatafeedConfig, mock(Client.class))); + assertEquals(Messages.getMessage( + Messages.DATAFEED_CONFIG_DELAYED_DATA_CHECK_SPANS_TOO_MANY_BUCKETS, "12h", "2s"), e.getMessage()); + + Job withBigBucketSpan = createJob(TimeValue.timeValueHours(3)); + datafeedConfig = createDatafeed(true, null); + + // Should not throw + DelayedDataDetector delayedDataDetector = + DelayedDataDetectorFactory.buildDetector(withBigBucketSpan, datafeedConfig, mock(Client.class)); + assertThat(delayedDataDetector.getWindow(), equalTo(TimeValue.timeValueHours(3).millis() * 8)); + + datafeedConfig = createDatafeed(true, null); + + // Should not throw + delayedDataDetector = + DelayedDataDetectorFactory.buildDetector(job, datafeedConfig, mock(Client.class)); + assertThat(delayedDataDetector.getWindow(), equalTo(TimeValue.timeValueHours(2).millis())); + + } + + private Job createJob(TimeValue bucketSpan) { + DataDescription.Builder dataDescription = new DataDescription.Builder(); + dataDescription.setFormat(DataDescription.DataFormat.XCONTENT); + dataDescription.setTimeField("time"); + dataDescription.setTimeFormat(DataDescription.EPOCH_MS); + + Detector.Builder d = new Detector.Builder("count", null); + AnalysisConfig.Builder analysisConfig = new AnalysisConfig.Builder(Collections.singletonList(d.build())); + analysisConfig.setBucketSpan(bucketSpan); + + Job.Builder builder = new Job.Builder(); + builder.setId("test-job"); + 
builder.setAnalysisConfig(analysisConfig); + builder.setDataDescription(dataDescription); + return builder.build(new Date()); + } + + private DatafeedConfig createDatafeed(boolean shouldDetectDelayedData, TimeValue delayedDatacheckWindow) { + DatafeedConfig.Builder builder = new DatafeedConfig.Builder("id", "jobId"); + builder.setIndices(Collections.singletonList("index1")); + builder.setTypes(Collections.singletonList("doc")); + + if (shouldDetectDelayedData) { + builder.setDelayedDataCheckConfig(DelayedDataCheckConfig.enabledDelayedDataCheckConfig(delayedDatacheckWindow)); + } else { + builder.setDelayedDataCheckConfig(DelayedDataCheckConfig.disabledDelayedDataCheckConfig()); + } + return builder.build(); + } + + +}
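
Reviewer note on the window defaulting: the rule implemented in DelayedDataDetectorFactory.validateAndCalculateWindowLength (fall back to a 2 hour check window, or to 8 buckets when the bucket span is itself 2 hours or longer) can be restated as a small standalone sketch. This is not part of the patch; it uses java.time.Duration in place of TimeValue, and the class and method names (DelayedDataWindowSketch, chooseDefaultWindowMillis) are invented purely for illustration.

    import java.time.Duration;

    class DelayedDataWindowSketch {

        // Mirrors DEFAULT_CHECK_WINDOW and FALLBACK_NUMBER_OF_BUCKETS_TO_SPAN above
        private static final Duration DEFAULT_CHECK_WINDOW = Duration.ofHours(2);
        private static final int FALLBACK_NUMBER_OF_BUCKETS_TO_SPAN = 8;

        // Hypothetical helper: the window used when delayed_data_check_config has no check_window set
        static long chooseDefaultWindowMillis(Duration bucketSpan) {
            if (bucketSpan.compareTo(DEFAULT_CHECK_WINDOW) >= 0) {
                // Very long buckets: span a fixed number of buckets rather than a fixed duration
                return FALLBACK_NUMBER_OF_BUCKETS_TO_SPAN * bucketSpan.toMillis();
            }
            return DEFAULT_CHECK_WINDOW.toMillis();
        }

        public static void main(String[] args) {
            System.out.println(chooseDefaultWindowMillis(Duration.ofMinutes(5))); // 7200000 ms (2h)
            System.out.println(chooseDefaultWindowMillis(Duration.ofHours(3)));   // 86400000 ms (8 x 3h)
        }
    }

A 5 minute bucket span therefore defaults to a two hour window, while a 3 hour bucket span defaults to eight buckets (24 hours), matching the assertions in DelayedDataDetectorFactoryTests above.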
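
Likewise, the gating condition in DatafeedJob.checkForMissingDataTriggered is easy to misread in diff form: a check is due once the smaller of the 15 minute interval and the detector's window has elapsed since the previous check. The sketch below restates only that condition; MissingDataCheckGate is an invented illustration class, not code from this change.

    import java.util.concurrent.TimeUnit;
    import java.util.function.LongSupplier;

    class MissingDataCheckGate {

        // Mirrors DatafeedJob.MISSING_DATA_CHECK_INTERVAL_MS (15 minutes)
        static final long MISSING_DATA_CHECK_INTERVAL_MS = TimeUnit.MINUTES.toMillis(15);

        private final LongSupplier currentTimeSupplier; // same role as DatafeedJob's currentTimeSupplier
        private final long windowMs;                    // as returned by DelayedDataDetector.getWindow()
        private long lastCheckTimeMs;                   // updated whenever a check actually runs

        MissingDataCheckGate(LongSupplier currentTimeSupplier, long windowMs) {
            this.currentTimeSupplier = currentTimeSupplier;
            this.windowMs = windowMs;
        }

        // A check is due once the smaller of the 15 minute interval and the
        // configured window has elapsed since the previous check.
        boolean checkIsDue() {
            return currentTimeSupplier.getAsLong() > lastCheckTimeMs
                    + Math.min(MISSING_DATA_CHECK_INTERVAL_MS, windowMs);
        }

        void markChecked() {
            lastCheckTimeMs = currentTimeSupplier.getAsLong();
        }
    }

This is the behaviour exercised in DatafeedJobTests.testRealtimeRun: the second runRealtime() at 62000ms falls inside the interval so no new check runs, the third advances past MISSING_DATA_CHECK_INTERVAL_MS so the check runs again, and the unchanged audit hash keeps the warning from being emitted a second time.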