[ML][7.x] Add option to stop datafeed that finds no data (#47995)

Adds a new datafeed config option, max_empty_searches,
that tells a datafeed that has never found any data to stop
itself and close its associated job after a certain number
of real-time searches have returned no data.

Backport of #47922
This commit is contained in:
David Roberts 2019-10-14 17:19:13 +01:00 committed by GitHub
parent 5f3ef2e09c
commit 1ca25bed38
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
19 changed files with 310 additions and 69 deletions

View File

@ -62,6 +62,7 @@ public class DatafeedConfig implements ToXContentObject {
public static final ParseField SCRIPT_FIELDS = new ParseField("script_fields");
public static final ParseField CHUNKING_CONFIG = new ParseField("chunking_config");
public static final ParseField DELAYED_DATA_CHECK_CONFIG = new ParseField("delayed_data_check_config");
public static final ParseField MAX_EMPTY_SEARCHES = new ParseField("max_empty_searches");
public static final ConstructingObjectParser<Builder, Void> PARSER = new ConstructingObjectParser<>(
"datafeed_config", true, a -> new Builder((String)a[0], (String)a[1]));
@ -88,6 +89,7 @@ public class DatafeedConfig implements ToXContentObject {
PARSER.declareInt(Builder::setScrollSize, SCROLL_SIZE);
PARSER.declareObject(Builder::setChunkingConfig, ChunkingConfig.PARSER, CHUNKING_CONFIG);
PARSER.declareObject(Builder::setDelayedDataCheckConfig, DelayedDataCheckConfig.PARSER, DELAYED_DATA_CHECK_CONFIG);
PARSER.declareInt(Builder::setMaxEmptySearches, MAX_EMPTY_SEARCHES);
}
private static BytesReference parseBytes(XContentParser parser) throws IOException {
@ -107,11 +109,12 @@ public class DatafeedConfig implements ToXContentObject {
private final Integer scrollSize;
private final ChunkingConfig chunkingConfig;
private final DelayedDataCheckConfig delayedDataCheckConfig;
private final Integer maxEmptySearches;
private DatafeedConfig(String id, String jobId, TimeValue queryDelay, TimeValue frequency, List<String> indices, BytesReference query,
BytesReference aggregations, List<SearchSourceBuilder.ScriptField> scriptFields, Integer scrollSize,
ChunkingConfig chunkingConfig, DelayedDataCheckConfig delayedDataCheckConfig) {
ChunkingConfig chunkingConfig, DelayedDataCheckConfig delayedDataCheckConfig,
Integer maxEmptySearches) {
this.id = id;
this.jobId = jobId;
this.queryDelay = queryDelay;
@ -123,6 +126,7 @@ public class DatafeedConfig implements ToXContentObject {
this.scrollSize = scrollSize;
this.chunkingConfig = chunkingConfig;
this.delayedDataCheckConfig = delayedDataCheckConfig;
this.maxEmptySearches = maxEmptySearches;
}
public String getId() {
@ -169,6 +173,10 @@ public class DatafeedConfig implements ToXContentObject {
return delayedDataCheckConfig;
}
// Number of no-result real-time searches after which a datafeed that has never
// seen any data stops itself and closes its job; null means the option is unset
// and the datafeed keeps running.
public Integer getMaxEmptySearches() {
return maxEmptySearches;
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
@ -205,6 +213,9 @@ public class DatafeedConfig implements ToXContentObject {
if (delayedDataCheckConfig != null) {
builder.field(DELAYED_DATA_CHECK_CONFIG.getPreferredName(), delayedDataCheckConfig);
}
if (maxEmptySearches != null) {
builder.field(MAX_EMPTY_SEARCHES.getPreferredName(), maxEmptySearches);
}
builder.endObject();
return builder;
@ -245,7 +256,8 @@ public class DatafeedConfig implements ToXContentObject {
&& Objects.equals(asMap(this.aggregations), asMap(that.aggregations))
&& Objects.equals(this.scriptFields, that.scriptFields)
&& Objects.equals(this.chunkingConfig, that.chunkingConfig)
&& Objects.equals(this.delayedDataCheckConfig, that.delayedDataCheckConfig);
&& Objects.equals(this.delayedDataCheckConfig, that.delayedDataCheckConfig)
&& Objects.equals(this.maxEmptySearches, that.maxEmptySearches);
}
/**
@ -256,7 +268,7 @@ public class DatafeedConfig implements ToXContentObject {
@Override
public int hashCode() {
return Objects.hash(id, jobId, frequency, queryDelay, indices, asMap(query), scrollSize, asMap(aggregations), scriptFields,
chunkingConfig, delayedDataCheckConfig);
chunkingConfig, delayedDataCheckConfig, maxEmptySearches);
}
public static Builder builder(String id, String jobId) {
@ -276,6 +288,7 @@ public class DatafeedConfig implements ToXContentObject {
private Integer scrollSize;
private ChunkingConfig chunkingConfig;
private DelayedDataCheckConfig delayedDataCheckConfig;
private Integer maxEmptySearches;
public Builder(String id, String jobId) {
this.id = Objects.requireNonNull(id, ID.getPreferredName());
@ -294,6 +307,7 @@ public class DatafeedConfig implements ToXContentObject {
this.scrollSize = config.scrollSize;
this.chunkingConfig = config.chunkingConfig;
this.delayedDataCheckConfig = config.getDelayedDataCheckConfig();
this.maxEmptySearches = config.getMaxEmptySearches();
}
public Builder setIndices(List<String> indices) {
@ -376,9 +390,14 @@ public class DatafeedConfig implements ToXContentObject {
return this;
}
/**
 * Sets the number of no-result real-time searches after which a datafeed that
 * has never seen any data will stop itself and close its associated job.
 * This client-side builder stores the value as-is; no range validation is
 * performed here (validation happens server side).
 */
public Builder setMaxEmptySearches(int maxEmptySearches) {
this.maxEmptySearches = maxEmptySearches;
return this;
}
public DatafeedConfig build() {
return new DatafeedConfig(id, jobId, queryDelay, frequency, indices, query, aggregations, scriptFields, scrollSize,
chunkingConfig, delayedDataCheckConfig);
chunkingConfig, delayedDataCheckConfig, maxEmptySearches);
}
private static BytesReference xContentToBytes(ToXContentObject object) throws IOException {

View File

@ -79,6 +79,7 @@ public class DatafeedUpdate implements ToXContentObject {
PARSER.declareObject(Builder::setDelayedDataCheckConfig,
DelayedDataCheckConfig.PARSER,
DatafeedConfig.DELAYED_DATA_CHECK_CONFIG);
PARSER.declareInt(Builder::setMaxEmptySearches, DatafeedConfig.MAX_EMPTY_SEARCHES);
}
private static BytesReference parseBytes(XContentParser parser) throws IOException {
@ -98,10 +99,12 @@ public class DatafeedUpdate implements ToXContentObject {
private final Integer scrollSize;
private final ChunkingConfig chunkingConfig;
private final DelayedDataCheckConfig delayedDataCheckConfig;
private final Integer maxEmptySearches;
private DatafeedUpdate(String id, String jobId, TimeValue queryDelay, TimeValue frequency, List<String> indices, BytesReference query,
BytesReference aggregations, List<SearchSourceBuilder.ScriptField> scriptFields, Integer scrollSize,
ChunkingConfig chunkingConfig, DelayedDataCheckConfig delayedDataCheckConfig) {
ChunkingConfig chunkingConfig, DelayedDataCheckConfig delayedDataCheckConfig,
Integer maxEmptySearches) {
this.id = id;
this.jobId = jobId;
this.queryDelay = queryDelay;
@ -113,6 +116,7 @@ public class DatafeedUpdate implements ToXContentObject {
this.scrollSize = scrollSize;
this.chunkingConfig = chunkingConfig;
this.delayedDataCheckConfig = delayedDataCheckConfig;
this.maxEmptySearches = maxEmptySearches;
}
/**
@ -152,6 +156,7 @@ public class DatafeedUpdate implements ToXContentObject {
}
addOptionalField(builder, DatafeedConfig.SCROLL_SIZE, scrollSize);
addOptionalField(builder, DatafeedConfig.CHUNKING_CONFIG, chunkingConfig);
addOptionalField(builder, DatafeedConfig.MAX_EMPTY_SEARCHES, maxEmptySearches);
builder.endObject();
return builder;
}
@ -202,6 +207,10 @@ public class DatafeedUpdate implements ToXContentObject {
return delayedDataCheckConfig;
}
// The max_empty_searches value carried by this update, or null when the update
// does not touch that option.
public Integer getMaxEmptySearches() {
return maxEmptySearches;
}
// Parses the stored JSON bytes into a map so that equals()/hashCode() compare
// query/aggregation content rather than raw byte sequences; null-safe.
private static Map<String, Object> asMap(BytesReference bytesReference) {
return bytesReference == null ? null : XContentHelper.convertToMap(bytesReference, true, XContentType.JSON).v2();
}
@ -237,7 +246,8 @@ public class DatafeedUpdate implements ToXContentObject {
&& Objects.equals(asMap(this.aggregations), asMap(that.aggregations))
&& Objects.equals(this.delayedDataCheckConfig, that.delayedDataCheckConfig)
&& Objects.equals(this.scriptFields, that.scriptFields)
&& Objects.equals(this.chunkingConfig, that.chunkingConfig);
&& Objects.equals(this.chunkingConfig, that.chunkingConfig)
&& Objects.equals(this.maxEmptySearches, that.maxEmptySearches);
}
/**
@ -248,7 +258,7 @@ public class DatafeedUpdate implements ToXContentObject {
@Override
public int hashCode() {
return Objects.hash(id, jobId, frequency, queryDelay, indices, asMap(query), scrollSize, asMap(aggregations), scriptFields,
chunkingConfig, delayedDataCheckConfig);
chunkingConfig, delayedDataCheckConfig, maxEmptySearches);
}
public static Builder builder(String id) {
@ -268,6 +278,7 @@ public class DatafeedUpdate implements ToXContentObject {
private Integer scrollSize;
private ChunkingConfig chunkingConfig;
private DelayedDataCheckConfig delayedDataCheckConfig;
private Integer maxEmptySearches;
public Builder(String id) {
this.id = Objects.requireNonNull(id, DatafeedConfig.ID.getPreferredName());
@ -285,6 +296,7 @@ public class DatafeedUpdate implements ToXContentObject {
this.scrollSize = config.scrollSize;
this.chunkingConfig = config.chunkingConfig;
this.delayedDataCheckConfig = config.delayedDataCheckConfig;
this.maxEmptySearches = config.maxEmptySearches;
}
@Deprecated
@ -364,9 +376,14 @@ public class DatafeedUpdate implements ToXContentObject {
return this;
}
/**
 * Sets the max_empty_searches value this update should apply. Stored as-is;
 * no range validation is performed in this client-side builder.
 */
public Builder setMaxEmptySearches(int maxEmptySearches) {
this.maxEmptySearches = maxEmptySearches;
return this;
}
public DatafeedUpdate build() {
return new DatafeedUpdate(id, jobId, queryDelay, frequency, indices, query, aggregations, scriptFields, scrollSize,
chunkingConfig, delayedDataCheckConfig);
chunkingConfig, delayedDataCheckConfig, maxEmptySearches);
}
private static BytesReference xContentToBytes(ToXContentObject object) throws IOException {

View File

@ -106,6 +106,9 @@ public class DatafeedConfigTests extends AbstractXContentTestCase<DatafeedConfig
if (randomBoolean()) {
builder.setDelayedDataCheckConfig(DelayedDataCheckConfigTests.createRandomizedConfig());
}
if (randomBoolean()) {
builder.setMaxEmptySearches(randomIntBetween(10, 100));
}
return builder;
}

View File

@ -83,6 +83,9 @@ public class DatafeedUpdateTests extends AbstractXContentTestCase<DatafeedUpdate
if (randomBoolean()) {
builder.setDelayedDataCheckConfig(DelayedDataCheckConfigTests.createRandomizedConfig());
}
if (randomBoolean()) {
builder.setMaxEmptySearches(randomIntBetween(10, 100));
}
return builder.build();
}

View File

@ -65,6 +65,15 @@ A {dfeed} resource has the following properties:
`{"enabled": true, "check_window": "1h"}` See
<<ml-datafeed-delayed-data-check-config>>.
`max_empty_searches`::
(integer) If a real-time {dfeed} has never seen any data (including during
any initial training period) then it will automatically stop itself and
close its associated job after this many real-time searches that return no
documents. In other words, it will stop after `frequency` times
`max_empty_searches` of real-time operation. If not set
then a {dfeed} with no end time that sees no data will remain started until
it is explicitly stopped. By default this setting is not set.
[[ml-datafeed-chunking-config]]
==== Chunking configuration objects

View File

@ -101,6 +101,15 @@ parallel and close one when you are satisfied with the results of the other job.
(Optional, unsigned integer) The `size` parameter that is used in {es}
searches. The default value is `1000`.
`max_empty_searches`::
(Optional, integer) If a real-time {dfeed} has never seen any data (including
during any initial training period) then it will automatically stop itself
and close its associated job after this many real-time searches that return
no documents. In other words, it will stop after `frequency` times
`max_empty_searches` of real-time operation. If not set
then a {dfeed} with no end time that sees no data will remain started until
it is explicitly stopped. The special value `-1` unsets this setting.
For more information about these properties, see <<ml-datafeed-resource>>.

View File

@ -91,6 +91,7 @@ public class DatafeedConfig extends AbstractDiffable<DatafeedConfig> implements
public static final ParseField CHUNKING_CONFIG = new ParseField("chunking_config");
public static final ParseField HEADERS = new ParseField("headers");
public static final ParseField DELAYED_DATA_CHECK_CONFIG = new ParseField("delayed_data_check_config");
public static final ParseField MAX_EMPTY_SEARCHES = new ParseField("max_empty_searches");
// These parsers follow the pattern that metadata is parsed leniently (to allow for enhancements), whilst config is parsed strictly
public static final ObjectParser<Builder, Void> LENIENT_PARSER = createParser(true);
@ -152,6 +153,7 @@ public class DatafeedConfig extends AbstractDiffable<DatafeedConfig> implements
parser.declareObject(Builder::setDelayedDataCheckConfig,
ignoreUnknownFields ? DelayedDataCheckConfig.LENIENT_PARSER : DelayedDataCheckConfig.STRICT_PARSER,
DELAYED_DATA_CHECK_CONFIG);
parser.declareInt(Builder::setMaxEmptySearches, MAX_EMPTY_SEARCHES);
return parser;
}
@ -176,11 +178,12 @@ public class DatafeedConfig extends AbstractDiffable<DatafeedConfig> implements
private final ChunkingConfig chunkingConfig;
private final Map<String, String> headers;
private final DelayedDataCheckConfig delayedDataCheckConfig;
private final Integer maxEmptySearches;
private DatafeedConfig(String id, String jobId, TimeValue queryDelay, TimeValue frequency, List<String> indices,
QueryProvider queryProvider, AggProvider aggProvider, List<SearchSourceBuilder.ScriptField> scriptFields,
Integer scrollSize, ChunkingConfig chunkingConfig, Map<String, String> headers,
DelayedDataCheckConfig delayedDataCheckConfig) {
DelayedDataCheckConfig delayedDataCheckConfig, Integer maxEmptySearches) {
this.id = id;
this.jobId = jobId;
this.queryDelay = queryDelay;
@ -193,6 +196,7 @@ public class DatafeedConfig extends AbstractDiffable<DatafeedConfig> implements
this.chunkingConfig = chunkingConfig;
this.headers = Collections.unmodifiableMap(headers);
this.delayedDataCheckConfig = delayedDataCheckConfig;
this.maxEmptySearches = maxEmptySearches;
}
public DatafeedConfig(StreamInput in) throws IOException {
@ -233,6 +237,11 @@ public class DatafeedConfig extends AbstractDiffable<DatafeedConfig> implements
} else {
delayedDataCheckConfig = DelayedDataCheckConfig.defaultDelayedDataCheckConfig();
}
if (in.getVersion().onOrAfter(Version.V_7_5_0)) {
maxEmptySearches = in.readOptionalVInt();
} else {
maxEmptySearches = null;
}
}
/**
@ -401,6 +410,10 @@ public class DatafeedConfig extends AbstractDiffable<DatafeedConfig> implements
return delayedDataCheckConfig;
}
// Number of no-result real-time searches after which a datafeed that has never
// seen any data stops itself and closes its job; null means unset (run forever).
public Integer getMaxEmptySearches() {
return maxEmptySearches;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(id);
@ -439,6 +452,9 @@ public class DatafeedConfig extends AbstractDiffable<DatafeedConfig> implements
if (out.getVersion().onOrAfter(Version.V_6_6_0)) {
out.writeOptionalWriteable(delayedDataCheckConfig);
}
if (out.getVersion().onOrAfter(Version.V_7_5_0)) {
out.writeOptionalVInt(maxEmptySearches);
}
}
@Override
@ -475,6 +491,9 @@ public class DatafeedConfig extends AbstractDiffable<DatafeedConfig> implements
if (delayedDataCheckConfig != null) {
builder.field(DELAYED_DATA_CHECK_CONFIG.getPreferredName(), delayedDataCheckConfig);
}
if (maxEmptySearches != null) {
builder.field(MAX_EMPTY_SEARCHES.getPreferredName(), maxEmptySearches);
}
builder.endObject();
return builder;
}
@ -507,13 +526,14 @@ public class DatafeedConfig extends AbstractDiffable<DatafeedConfig> implements
&& Objects.equals(this.scriptFields, that.scriptFields)
&& Objects.equals(this.chunkingConfig, that.chunkingConfig)
&& Objects.equals(this.headers, that.headers)
&& Objects.equals(this.delayedDataCheckConfig, that.delayedDataCheckConfig);
&& Objects.equals(this.delayedDataCheckConfig, that.delayedDataCheckConfig)
&& Objects.equals(this.maxEmptySearches, that.maxEmptySearches);
}
@Override
public int hashCode() {
return Objects.hash(id, jobId, frequency, queryDelay, indices, queryProvider, scrollSize, aggProvider, scriptFields, chunkingConfig,
headers, delayedDataCheckConfig);
headers, delayedDataCheckConfig, maxEmptySearches);
}
@Override
@ -586,6 +606,7 @@ public class DatafeedConfig extends AbstractDiffable<DatafeedConfig> implements
private ChunkingConfig chunkingConfig;
private Map<String, String> headers = Collections.emptyMap();
private DelayedDataCheckConfig delayedDataCheckConfig = DelayedDataCheckConfig.defaultDelayedDataCheckConfig();
private Integer maxEmptySearches;
public Builder() { }
@ -608,6 +629,7 @@ public class DatafeedConfig extends AbstractDiffable<DatafeedConfig> implements
this.chunkingConfig = config.chunkingConfig;
this.headers = new HashMap<>(config.headers);
this.delayedDataCheckConfig = config.getDelayedDataCheckConfig();
this.maxEmptySearches = config.getMaxEmptySearches();
}
public void setId(String datafeedId) {
@ -701,6 +723,18 @@ public class DatafeedConfig extends AbstractDiffable<DatafeedConfig> implements
this.delayedDataCheckConfig = delayedDataCheckConfig;
}
/**
 * Sets the max_empty_searches option.
 * <p>
 * The sentinel value -1 means "unset" and is normalized to null here, so a
 * built config never carries -1. Zero and negative values other than -1 are
 * rejected with a bad-request exception; positive values are stored as-is.
 */
public void setMaxEmptySearches(int maxEmptySearches) {
if (maxEmptySearches == -1) {
this.maxEmptySearches = null;
} else if (maxEmptySearches <= 0) {
String msg = Messages.getMessage(Messages.DATAFEED_CONFIG_INVALID_OPTION_VALUE,
DatafeedConfig.MAX_EMPTY_SEARCHES.getPreferredName(), maxEmptySearches);
throw ExceptionsHelper.badRequestException(msg);
} else {
this.maxEmptySearches = maxEmptySearches;
}
}
public DatafeedConfig build() {
ExceptionsHelper.requireNonNull(id, ID.getPreferredName());
ExceptionsHelper.requireNonNull(jobId, Job.ID.getPreferredName());
@ -716,7 +750,7 @@ public class DatafeedConfig extends AbstractDiffable<DatafeedConfig> implements
setDefaultQueryDelay();
return new DatafeedConfig(id, jobId, queryDelay, frequency, indices, queryProvider, aggProvider, scriptFields, scrollSize,
chunkingConfig, headers, delayedDataCheckConfig);
chunkingConfig, headers, delayedDataCheckConfig, maxEmptySearches);
}
void validateScriptFields() {

View File

@ -81,6 +81,7 @@ public class DatafeedUpdate implements Writeable, ToXContentObject {
PARSER.declareObject(Builder::setDelayedDataCheckConfig,
DelayedDataCheckConfig.STRICT_PARSER,
DatafeedConfig.DELAYED_DATA_CHECK_CONFIG);
PARSER.declareInt(Builder::setMaxEmptySearches, DatafeedConfig.MAX_EMPTY_SEARCHES);
}
private final String id;
@ -94,11 +95,13 @@ public class DatafeedUpdate implements Writeable, ToXContentObject {
private final Integer scrollSize;
private final ChunkingConfig chunkingConfig;
private final DelayedDataCheckConfig delayedDataCheckConfig;
private final Integer maxEmptySearches;
private DatafeedUpdate(String id, String jobId, TimeValue queryDelay, TimeValue frequency, List<String> indices,
QueryProvider queryProvider, AggProvider aggProvider,
List<SearchSourceBuilder.ScriptField> scriptFields,
Integer scrollSize, ChunkingConfig chunkingConfig, DelayedDataCheckConfig delayedDataCheckConfig) {
Integer scrollSize, ChunkingConfig chunkingConfig, DelayedDataCheckConfig delayedDataCheckConfig,
Integer maxEmptySearches) {
this.id = id;
this.jobId = jobId;
this.queryDelay = queryDelay;
@ -110,6 +113,7 @@ public class DatafeedUpdate implements Writeable, ToXContentObject {
this.scrollSize = scrollSize;
this.chunkingConfig = chunkingConfig;
this.delayedDataCheckConfig = delayedDataCheckConfig;
this.maxEmptySearches = maxEmptySearches;
}
public DatafeedUpdate(StreamInput in) throws IOException {
@ -147,6 +151,11 @@ public class DatafeedUpdate implements Writeable, ToXContentObject {
} else {
delayedDataCheckConfig = null;
}
if (in.getVersion().onOrAfter(Version.V_7_5_0)) {
maxEmptySearches = in.readOptionalInt();
} else {
maxEmptySearches = null;
}
}
/**
@ -192,6 +201,9 @@ public class DatafeedUpdate implements Writeable, ToXContentObject {
if (out.getVersion().onOrAfter(Version.V_6_6_0)) {
out.writeOptionalWriteable(delayedDataCheckConfig);
}
if (out.getVersion().onOrAfter(Version.V_7_5_0)) {
out.writeOptionalInt(maxEmptySearches);
}
}
@Override
@ -222,6 +234,8 @@ public class DatafeedUpdate implements Writeable, ToXContentObject {
addOptionalField(builder, DatafeedConfig.SCROLL_SIZE, scrollSize);
addOptionalField(builder, DatafeedConfig.CHUNKING_CONFIG, chunkingConfig);
addOptionalField(builder, DatafeedConfig.DELAYED_DATA_CHECK_CONFIG, delayedDataCheckConfig);
addOptionalField(builder, DatafeedConfig.MAX_EMPTY_SEARCHES, maxEmptySearches);
builder.endObject();
return builder;
}
@ -290,6 +304,10 @@ public class DatafeedUpdate implements Writeable, ToXContentObject {
return delayedDataCheckConfig;
}
// The max_empty_searches value carried by this update; null when the update
// leaves the option untouched. May be -1, which is the documented sentinel for
// unsetting the option on the target datafeed.
public Integer getMaxEmptySearches() {
return maxEmptySearches;
}
/**
* Applies the update to the given {@link DatafeedConfig}
* @return a new {@link DatafeedConfig} that contains the update
@ -334,6 +352,9 @@ public class DatafeedUpdate implements Writeable, ToXContentObject {
if (delayedDataCheckConfig != null) {
builder.setDelayedDataCheckConfig(delayedDataCheckConfig);
}
if (maxEmptySearches != null) {
builder.setMaxEmptySearches(maxEmptySearches);
}
if (headers.isEmpty() == false) {
// Adjust the request, adding security headers from the current thread context
@ -373,13 +394,14 @@ public class DatafeedUpdate implements Writeable, ToXContentObject {
&& Objects.equals(this.aggProvider, that.aggProvider)
&& Objects.equals(this.delayedDataCheckConfig, that.delayedDataCheckConfig)
&& Objects.equals(this.scriptFields, that.scriptFields)
&& Objects.equals(this.chunkingConfig, that.chunkingConfig);
&& Objects.equals(this.chunkingConfig, that.chunkingConfig)
&& Objects.equals(this.maxEmptySearches, that.maxEmptySearches);
}
@Override
public int hashCode() {
return Objects.hash(id, jobId, frequency, queryDelay, indices, queryProvider, scrollSize, aggProvider, scriptFields, chunkingConfig,
delayedDataCheckConfig);
delayedDataCheckConfig, maxEmptySearches);
}
@Override
@ -396,7 +418,8 @@ public class DatafeedUpdate implements Writeable, ToXContentObject {
&& (aggProvider == null || Objects.equals(aggProvider.getAggs(), datafeed.getAggregations()))
&& (scriptFields == null || Objects.equals(scriptFields, datafeed.getScriptFields()))
&& (delayedDataCheckConfig == null || Objects.equals(delayedDataCheckConfig, datafeed.getDelayedDataCheckConfig()))
&& (chunkingConfig == null || Objects.equals(chunkingConfig, datafeed.getChunkingConfig()));
&& (chunkingConfig == null || Objects.equals(chunkingConfig, datafeed.getChunkingConfig()))
&& (maxEmptySearches == null || Objects.equals(maxEmptySearches, datafeed.getMaxEmptySearches()));
}
public static class Builder {
@ -412,6 +435,7 @@ public class DatafeedUpdate implements Writeable, ToXContentObject {
private Integer scrollSize;
private ChunkingConfig chunkingConfig;
private DelayedDataCheckConfig delayedDataCheckConfig;
private Integer maxEmptySearches;
public Builder() {
}
@ -432,6 +456,7 @@ public class DatafeedUpdate implements Writeable, ToXContentObject {
this.scrollSize = config.scrollSize;
this.chunkingConfig = config.chunkingConfig;
this.delayedDataCheckConfig = config.delayedDataCheckConfig;
this.maxEmptySearches = config.maxEmptySearches;
}
public Builder setId(String datafeedId) {
@ -499,9 +524,19 @@ public class DatafeedUpdate implements Writeable, ToXContentObject {
return this;
}
/**
 * Sets max_empty_searches for this update. Accepts positive values and the
 * special value -1, which per the datafeed update docs unsets the option on
 * the target datafeed. Unlike DatafeedConfig.Builder, -1 is stored here
 * verbatim (not normalized to null) so the "unset" intent survives into the
 * update object. Zero and values below -1 are rejected with a bad-request
 * exception.
 */
public Builder setMaxEmptySearches(int maxEmptySearches) {
if (maxEmptySearches < -1 || maxEmptySearches == 0) {
String msg = Messages.getMessage(Messages.DATAFEED_CONFIG_INVALID_OPTION_VALUE,
DatafeedConfig.MAX_EMPTY_SEARCHES.getPreferredName(), maxEmptySearches);
throw ExceptionsHelper.badRequestException(msg);
}
this.maxEmptySearches = maxEmptySearches;
return this;
}
public DatafeedUpdate build() {
return new DatafeedUpdate(id, jobId, queryDelay, frequency, indices, queryProvider, aggProvider, scriptFields, scrollSize,
chunkingConfig, delayedDataCheckConfig);
chunkingConfig, delayedDataCheckConfig, maxEmptySearches);
}
}
}

View File

@ -7,6 +7,7 @@ package org.elasticsearch.xpack.core.ml.datafeed;
import com.carrotsearch.randomizedtesting.generators.CodepointSetGenerator;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.Version;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
@ -68,6 +69,7 @@ import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.lessThan;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.nullValue;
public class DatafeedConfigTests extends AbstractSerializingTestCase<DatafeedConfig> {
@ -149,6 +151,9 @@ public class DatafeedConfigTests extends AbstractSerializingTestCase<DatafeedCon
if (randomBoolean()) {
builder.setDelayedDataCheckConfig(DelayedDataCheckConfigTests.createRandomizedConfig(bucketSpanMillis));
}
if (randomBoolean()) {
builder.setMaxEmptySearches(randomIntBetween(10, 100));
}
return builder;
}
@ -378,10 +383,10 @@ public class DatafeedConfigTests extends AbstractSerializingTestCase<DatafeedCon
defaultFeedBuilder.setIndices(Collections.singletonList("index"));
DatafeedConfig defaultFeed = defaultFeedBuilder.build();
assertThat(defaultFeed.getScrollSize(), equalTo(1000));
assertThat(defaultFeed.getQueryDelay().seconds(), greaterThanOrEqualTo(60L));
assertThat(defaultFeed.getQueryDelay().seconds(), lessThan(120L));
assertThat(defaultFeed.getMaxEmptySearches(), is(nullValue()));
}
public void testDefaultQueryDelay() {
@ -406,6 +411,20 @@ public class DatafeedConfigTests extends AbstractSerializingTestCase<DatafeedCon
expectThrows(IllegalArgumentException.class, () -> conf.setIndices(null));
}
// Zero and negative values other than the -1 sentinel must be rejected by the
// builder with a clear "Invalid max_empty_searches value" error.
public void testCheckValid_GivenInvalidMaxEmptySearches() {
DatafeedConfig.Builder conf = new DatafeedConfig.Builder("datafeed1", "job1");
ElasticsearchStatusException e =
expectThrows(ElasticsearchStatusException.class, () -> conf.setMaxEmptySearches(randomFrom(-2, 0)));
assertThat(e.getMessage(), containsString("Invalid max_empty_searches value"));
}
// -1 is the "unset" sentinel: setting it must be accepted and the built config
// must report max_empty_searches as null rather than -1.
public void testCheckValid_GivenMaxEmptySearchesMinusOne() {
DatafeedConfig.Builder conf = new DatafeedConfig.Builder("datafeed1", "job1");
conf.setIndices(Collections.singletonList("whatever"));
conf.setMaxEmptySearches(-1);
assertThat(conf.build().getMaxEmptySearches(), is(nullValue()));
}
public void testCheckValid_GivenEmptyIndices() {
DatafeedConfig.Builder conf = new DatafeedConfig.Builder("datafeed1", "job1");
conf.setIndices(Collections.emptyList());
@ -824,7 +843,7 @@ public class DatafeedConfigTests extends AbstractSerializingTestCase<DatafeedCon
@Override
protected DatafeedConfig mutateInstance(DatafeedConfig instance) throws IOException {
DatafeedConfig.Builder builder = new DatafeedConfig.Builder(instance);
switch (between(0, 9)) {
switch (between(0, 10)) {
case 0:
builder.setId(instance.getId() + randomValidDatafeedId());
break;
@ -886,6 +905,13 @@ public class DatafeedConfigTests extends AbstractSerializingTestCase<DatafeedCon
builder.setChunkingConfig(ChunkingConfig.newAuto());
}
break;
case 10:
if (instance.getMaxEmptySearches() == null) {
builder.setMaxEmptySearches(randomIntBetween(10, 100));
} else {
builder.setMaxEmptySearches(instance.getMaxEmptySearches() + 1);
}
break;
default:
throw new AssertionError("Illegal randomisation branch");
}

View File

@ -121,6 +121,9 @@ public class DatafeedUpdateTests extends AbstractSerializingTestCase<DatafeedUpd
if (randomBoolean()) {
builder.setDelayedDataCheckConfig(DelayedDataCheckConfigTests.createRandomizedConfig(randomLongBetween(300_001, 400_000)));
}
if (randomBoolean()) {
builder.setMaxEmptySearches(randomBoolean() ? -1 : randomIntBetween(10, 100));
}
return builder.build();
}
@ -339,7 +342,7 @@ public class DatafeedUpdateTests extends AbstractSerializingTestCase<DatafeedUpd
@Override
protected DatafeedUpdate mutateInstance(DatafeedUpdate instance) throws IOException {
DatafeedUpdate.Builder builder = new DatafeedUpdate.Builder(instance);
switch (between(0, 9)) {
switch (between(0, 10)) {
case 0:
builder.setId(instance.getId() + DatafeedConfigTests.randomValidDatafeedId());
break;
@ -413,6 +416,13 @@ public class DatafeedUpdateTests extends AbstractSerializingTestCase<DatafeedUpd
builder.setChunkingConfig(null);
}
break;
case 10:
if (instance.getMaxEmptySearches() == null) {
builder.setMaxEmptySearches(randomFrom(-1, 10));
} else {
builder.setMaxEmptySearches(instance.getMaxEmptySearches() + 100);
}
break;
default:
throw new AssertionError("Illegal randomisation branch");
}

View File

@ -17,6 +17,7 @@ import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.ConcurrentMapLong;
import org.elasticsearch.xpack.core.ml.action.DeleteDatafeedAction;
import org.elasticsearch.xpack.core.ml.action.GetDatafeedsStatsAction;
import org.elasticsearch.xpack.core.ml.action.GetJobsStatsAction;
import org.elasticsearch.xpack.core.ml.action.KillProcessAction;
import org.elasticsearch.xpack.core.ml.action.PutJobAction;
import org.elasticsearch.xpack.core.ml.action.StopDatafeedAction;
@ -51,7 +52,7 @@ import static org.hamcrest.Matchers.hasSize;
public class DatafeedJobsIT extends MlNativeAutodetectIntegTestCase {
@After
public void cleanup() throws Exception {
public void cleanup() {
cleanUp();
}
@ -111,7 +112,7 @@ public class DatafeedJobsIT extends MlNativeAutodetectIntegTestCase {
Job.Builder job = createScheduledJob("lookback-job-datafeed-recreated");
String datafeedId = "lookback-datafeed-datafeed-recreated";
DatafeedConfig datafeedConfig = createDatafeed(datafeedId, job.getId(), Arrays.asList("data"));
DatafeedConfig datafeedConfig = createDatafeed(datafeedId, job.getId(), Collections.singletonList("data"));
registerJob(job);
putJob(job);
@ -192,7 +193,7 @@ public class DatafeedJobsIT extends MlNativeAutodetectIntegTestCase {
putJob(job);
String datafeedId = "lookback-datafeed-query-delay-updated";
DatafeedConfig datafeedConfig = createDatafeed(datafeedId, job.getId(), Arrays.asList("data"));
DatafeedConfig datafeedConfig = createDatafeed(datafeedId, job.getId(), Collections.singletonList("data"));
registerDatafeed(datafeedConfig);
putDatafeed(datafeedConfig);
@ -247,6 +248,26 @@ public class DatafeedJobsIT extends MlNativeAutodetectIntegTestCase {
});
}
// Integration test for the new auto-stop behaviour: start a real-time datafeed
// over an index with no documents and a small max_empty_searches, then verify
// that the datafeed transitions to STOPPED on its own and that it also closes
// its associated job.
public void testRealtime_noDataAndAutoStop() throws Exception {
String jobId = "realtime-job-auto-stop";
String datafeedId = jobId + "-datafeed";
// maxEmptySearches between 1 and 3 keeps the test fast while still exercising
// the counting logic
startRealtime(jobId, randomIntBetween(1, 3));
// Datafeed should auto-stop...
assertBusy(() -> {
GetDatafeedsStatsAction.Request request = new GetDatafeedsStatsAction.Request(datafeedId);
GetDatafeedsStatsAction.Response response = client().execute(GetDatafeedsStatsAction.INSTANCE, request).actionGet();
assertThat(response.getResponse().results().get(0).getDatafeedState(), equalTo(DatafeedState.STOPPED));
});
// ...and should have auto-closed the job too
assertBusy(() -> {
GetJobsStatsAction.Request request = new GetJobsStatsAction.Request(jobId);
GetJobsStatsAction.Response response = client().execute(GetJobsStatsAction.INSTANCE, request).actionGet();
assertThat(response.getResponse().results().get(0).getState(), equalTo(JobState.CLOSED));
});
}
public void testRealtime_multipleStopCalls() throws Exception {
String jobId = "realtime-job-multiple-stop";
final String datafeedId = jobId + "-datafeed";
@ -402,13 +423,22 @@ public class DatafeedJobsIT extends MlNativeAutodetectIntegTestCase {
}
// Convenience overload: start a real-time datafeed with data indexed and no
// max_empty_searches option set.
private void startRealtime(String jobId) throws Exception {
startRealtime(jobId, null);
}
private void startRealtime(String jobId, Integer maxEmptySearches) throws Exception {
client().admin().indices().prepareCreate("data")
.addMapping("type", "time", "type=date")
.get();
long numDocs1 = randomIntBetween(32, 2048);
long now = System.currentTimeMillis();
long lastWeek = now - 604800000;
indexDocs(logger, "data", numDocs1, lastWeek, now);
long numDocs1;
if (maxEmptySearches == null) {
numDocs1 = randomIntBetween(32, 2048);
long lastWeek = now - 604800000;
indexDocs(logger, "data", numDocs1, lastWeek, now);
} else {
numDocs1 = 0;
}
Job.Builder job = createScheduledJob(jobId);
registerJob(job);
@ -416,7 +446,12 @@ public class DatafeedJobsIT extends MlNativeAutodetectIntegTestCase {
openJob(job.getId());
assertBusy(() -> assertEquals(getJobStats(job.getId()).get(0).getState(), JobState.OPENED));
DatafeedConfig datafeedConfig = createDatafeed(job.getId() + "-datafeed", job.getId(), Collections.singletonList("data"));
DatafeedConfig.Builder datafeedConfigBuilder =
createDatafeedBuilder(job.getId() + "-datafeed", job.getId(), Collections.singletonList("data"));
if (maxEmptySearches != null) {
datafeedConfigBuilder.setMaxEmptySearches(maxEmptySearches);
}
DatafeedConfig datafeedConfig = datafeedConfigBuilder.build();
registerDatafeed(datafeedConfig);
putDatafeed(datafeedConfig);
startDatafeed(datafeedConfig.getId(), 0L, null);
@ -426,9 +461,15 @@ public class DatafeedJobsIT extends MlNativeAutodetectIntegTestCase {
assertThat(dataCounts.getOutOfOrderTimeStampCount(), equalTo(0L));
});
long numDocs2 = randomIntBetween(2, 64);
now = System.currentTimeMillis();
indexDocs(logger, "data", numDocs2, now + 5000, now + 6000);
long numDocs2;
if (maxEmptySearches == null) {
numDocs2 = randomIntBetween(2, 64);
indexDocs(logger, "data", numDocs2, now + 5000, now + 6000);
} else {
numDocs2 = 0;
}
assertBusy(() -> {
DataCounts dataCounts = getDataCounts(job.getId());
assertThat(dataCounts.getProcessedRecordCount(), equalTo(numDocs1 + numDocs2));

View File

@ -64,6 +64,7 @@ class DatafeedJob {
private final DatafeedTimingStatsReporter timingStatsReporter;
private final Supplier<Long> currentTimeSupplier;
private final DelayedDataDetector delayedDataDetector;
private final Integer maxEmptySearches;
private volatile long lookbackStartTimeMs;
private volatile long latestFinalBucketEndTimeMs;
@ -73,11 +74,12 @@ class DatafeedJob {
private volatile Long lastEndTimeMs;
private AtomicBoolean running = new AtomicBoolean(true);
private volatile boolean isIsolated;
private volatile boolean haveEverSeenData;
DatafeedJob(String jobId, DataDescription dataDescription, long frequencyMs, long queryDelayMs,
DataExtractorFactory dataExtractorFactory, DatafeedTimingStatsReporter timingStatsReporter, Client client,
AnomalyDetectionAuditor auditor, Supplier<Long> currentTimeSupplier, DelayedDataDetector delayedDataDetector,
long latestFinalBucketEndTimeMs, long latestRecordTimeMs) {
Integer maxEmptySearches, long latestFinalBucketEndTimeMs, long latestRecordTimeMs, boolean haveSeenDataPreviously) {
this.jobId = jobId;
this.dataDescription = Objects.requireNonNull(dataDescription);
this.frequencyMs = frequencyMs;
@ -88,11 +90,13 @@ class DatafeedJob {
this.auditor = auditor;
this.currentTimeSupplier = currentTimeSupplier;
this.delayedDataDetector = delayedDataDetector;
this.maxEmptySearches = maxEmptySearches;
this.latestFinalBucketEndTimeMs = latestFinalBucketEndTimeMs;
long lastEndTime = Math.max(latestFinalBucketEndTimeMs, latestRecordTimeMs);
if (lastEndTime > 0) {
lastEndTimeMs = lastEndTime;
}
this.haveEverSeenData = haveSeenDataPreviously;
}
void isolate() {
@ -108,6 +112,10 @@ class DatafeedJob {
return jobId;
}
/**
 * @return the configured {@code max_empty_searches} for this datafeed, or {@code null}
 *         when unset, in which case the empty-search auto-stop check never triggers
 */
public Integer getMaxEmptySearches() {
    return maxEmptySearches;
}
/**
 * Tells the timing stats reporter to finish reporting (delegates to
 * {@link DatafeedTimingStatsReporter#finishReporting}).
 */
public void finishReportingTimingStats() {
    timingStatsReporter.finishReporting();
}
@ -380,6 +388,7 @@ class DatafeedJob {
break;
}
recordCount += counts.getProcessedRecordCount();
haveEverSeenData |= (recordCount > 0);
if (counts.getLatestRecordTimeStamp() != null) {
lastEndTimeMs = counts.getLatestRecordTimeStamp().getTime();
}
@ -406,7 +415,7 @@ class DatafeedJob {
}
if (recordCount == 0) {
throw new EmptyDataCountException(nextRealtimeTimestamp());
throw new EmptyDataCountException(nextRealtimeTimestamp(), haveEverSeenData);
}
}
@ -509,10 +518,11 @@ class DatafeedJob {
static class EmptyDataCountException extends RuntimeException {
final long nextDelayInMsSinceEpoch;
final boolean haveEverSeenData;
EmptyDataCountException(long nextDelayInMsSinceEpoch) {
EmptyDataCountException(long nextDelayInMsSinceEpoch, boolean haveEverSeenData) {
this.nextDelayInMsSinceEpoch = nextDelayInMsSinceEpoch;
this.haveEverSeenData = haveEverSeenData;
}
}
}

View File

@ -92,8 +92,10 @@ public class DatafeedJobBuilder {
auditor,
currentTimeSupplier,
delayedDataDetector,
datafeedConfigHolder.get().getMaxEmptySearches(),
context.latestFinalBucketEndMs,
context.latestRecordTimeMs);
context.latestRecordTimeMs,
context.haveSeenDataPreviously);
listener.onResponse(datafeedJob);
};
@ -128,6 +130,7 @@ public class DatafeedJobBuilder {
if (dataCounts.getLatestRecordTimeStamp() != null) {
context.latestRecordTimeMs = dataCounts.getLatestRecordTimeStamp().getTime();
}
context.haveSeenDataPreviously = (dataCounts.getInputRecordCount() > 0);
jobResultsProvider.datafeedTimingStats(jobHolder.get().getId(), datafeedTimingStatsHandler, listener::onFailure);
};
@ -223,6 +226,7 @@ public class DatafeedJobBuilder {
private static class Context {
volatile long latestFinalBucketEndMs = -1L;
volatile long latestRecordTimeMs = -1L;
volatile boolean haveSeenDataPreviously;
volatile DataExtractorFactory dataExtractorFactory;
volatile DatafeedTimingStatsReporter timingStatsReporter;
}

View File

@ -80,7 +80,6 @@ public class DatafeedManager {
clusterService.addListener(taskRunner);
}
public void run(TransportStartDatafeedAction.DatafeedTask task, Consumer<Exception> finishHandler) {
String datafeedId = task.getDatafeedId();
@ -233,7 +232,7 @@ public class DatafeedManager {
long nextDelayInMsSinceEpoch;
try {
nextDelayInMsSinceEpoch = holder.executeRealTime();
holder.problemTracker.reportNoneEmptyCount();
holder.problemTracker.reportNonEmptyDataCount();
} catch (DatafeedJob.ExtractionProblemException e) {
nextDelayInMsSinceEpoch = e.nextDelayInMsSinceEpoch;
holder.problemTracker.reportExtractionProblem(e.getCause().getMessage());
@ -245,8 +244,15 @@ public class DatafeedManager {
return;
}
} catch (DatafeedJob.EmptyDataCountException e) {
int emptyDataCount = holder.problemTracker.reportEmptyDataCount();
if (e.haveEverSeenData == false && holder.shouldStopAfterEmptyData(emptyDataCount)) {
logger.warn("Datafeed for [" + jobId + "] has seen no data in [" + emptyDataCount
+ "] attempts, and never seen any data previously, so stopping...");
// In this case we auto-close the job, as though a lookback-only datafeed stopped
holder.stop("no_data", TimeValue.timeValueSeconds(20), e, true);
return;
}
nextDelayInMsSinceEpoch = e.nextDelayInMsSinceEpoch;
holder.problemTracker.reportEmptyDataCount();
} catch (Exception e) {
logger.error("Unexpected datafeed failure for job [" + jobId + "] stopping...", e);
holder.stop("general_realtime_error", TimeValue.timeValueSeconds(20), e);
@ -303,7 +309,7 @@ public class DatafeedManager {
// To ensure that we wait until lookback / realtime search has completed before we stop the datafeed
private final ReentrantLock datafeedJobLock = new ReentrantLock(true);
private final DatafeedJob datafeedJob;
private final boolean autoCloseJob;
private final boolean defaultAutoCloseJob;
private final ProblemTracker problemTracker;
private final Consumer<Exception> finishHandler;
volatile Scheduler.Cancellable cancellable;
@ -315,11 +321,16 @@ public class DatafeedManager {
this.allocationId = task.getAllocationId();
this.datafeedId = datafeedId;
this.datafeedJob = datafeedJob;
this.autoCloseJob = task.isLookbackOnly();
this.defaultAutoCloseJob = task.isLookbackOnly();
this.problemTracker = problemTracker;
this.finishHandler = finishHandler;
}
/**
 * Decides whether the datafeed should auto-stop after the given number of
 * consecutive empty searches. Only applies when {@code max_empty_searches}
 * was configured on the datafeed; otherwise never stops.
 */
boolean shouldStopAfterEmptyData(int emptyDataCount) {
    final Integer limit = datafeedJob.getMaxEmptySearches();
    if (limit == null) {
        return false;
    }
    return emptyDataCount >= limit;
}
// The ID of the anomaly detection job this datafeed feeds (delegates to the wrapped DatafeedJob).
String getJobId() {
    return datafeedJob.getJobId();
}
@ -333,6 +344,10 @@ public class DatafeedManager {
}
/**
 * Stops the datafeed using the default auto-close behaviour: the job is
 * auto-closed only for lookback-only datafeeds (defaultAutoCloseJob is set
 * from task.isLookbackOnly() in the constructor).
 */
public void stop(String source, TimeValue timeout, Exception e) {
    stop(source, timeout, e, defaultAutoCloseJob);
}
public void stop(String source, TimeValue timeout, Exception e, boolean autoCloseJob) {
if (isNodeShuttingDown) {
return;
}

View File

@ -74,16 +74,14 @@ class ProblemTracker {
* Updates the tracking of empty data cycles. If the number of consecutive empty data
* cycles reaches {@code EMPTY_DATA_WARN_COUNT}, a warning is reported.
*/
public void reportEmptyDataCount() {
if (emptyDataCount < EMPTY_DATA_WARN_COUNT) {
emptyDataCount++;
if (emptyDataCount == EMPTY_DATA_WARN_COUNT) {
auditor.warning(jobId, Messages.getMessage(Messages.JOB_AUDIT_DATAFEED_NO_DATA));
}
public int reportEmptyDataCount() {
if (++emptyDataCount == EMPTY_DATA_WARN_COUNT) {
auditor.warning(jobId, Messages.getMessage(Messages.JOB_AUDIT_DATAFEED_NO_DATA));
}
return emptyDataCount;
}
public void reportNoneEmptyCount() {
public void reportNonEmptyDataCount() {
if (emptyDataCount >= EMPTY_DATA_WARN_COUNT) {
auditor.info(jobId, Messages.getMessage(Messages.JOB_AUDIT_DATAFEED_DATA_SEEN_AGAIN));
}

View File

@ -133,7 +133,7 @@ public class DatafeedJobTests extends ESTestCase {
}
public void testLookBackRunWithEndTime() throws Exception {
DatafeedJob datafeedJob = createDatafeedJob(1000, 500, -1, -1);
DatafeedJob datafeedJob = createDatafeedJob(1000, 500, -1, -1, randomBoolean());
assertNull(datafeedJob.runLookBack(0L, 1000L));
verify(dataExtractorFactory).newExtractor(0L, 1000L);
@ -145,7 +145,7 @@ public class DatafeedJobTests extends ESTestCase {
public void testSetIsolated() throws Exception {
currentTime = 2000L;
DatafeedJob datafeedJob = createDatafeedJob(1000, 500, -1, -1);
DatafeedJob datafeedJob = createDatafeedJob(1000, 500, -1, -1, randomBoolean());
datafeedJob.isolate();
assertNull(datafeedJob.runLookBack(0L, null));
@ -158,7 +158,7 @@ public class DatafeedJobTests extends ESTestCase {
currentTime = 2000L;
long frequencyMs = 1000;
long queryDelayMs = 500;
DatafeedJob datafeedJob = createDatafeedJob(frequencyMs, queryDelayMs, -1, -1);
DatafeedJob datafeedJob = createDatafeedJob(frequencyMs, queryDelayMs, -1, -1, randomBoolean());
long next = datafeedJob.runLookBack(0L, null);
assertEquals(2000 + frequencyMs + queryDelayMs + 100, next);
@ -181,7 +181,7 @@ public class DatafeedJobTests extends ESTestCase {
long frequencyMs = 1000;
long queryDelayMs = 500;
DatafeedJob datafeedJob = createDatafeedJob(frequencyMs, queryDelayMs, latestFinalBucketEndTimeMs, latestRecordTimeMs);
DatafeedJob datafeedJob = createDatafeedJob(frequencyMs, queryDelayMs, latestFinalBucketEndTimeMs, latestRecordTimeMs, true);
long next = datafeedJob.runLookBack(0L, null);
assertEquals(10000 + frequencyMs + queryDelayMs + 100, next);
@ -206,7 +206,7 @@ public class DatafeedJobTests extends ESTestCase {
long frequencyMs = 1000;
long queryDelayMs = 500;
DatafeedJob datafeedJob = createDatafeedJob(frequencyMs, queryDelayMs, latestFinalBucketEndTimeMs, latestRecordTimeMs);
DatafeedJob datafeedJob = createDatafeedJob(frequencyMs, queryDelayMs, latestFinalBucketEndTimeMs, latestRecordTimeMs, true);
datafeedJob.runLookBack(currentTime, null);
// advance time
@ -238,7 +238,7 @@ public class DatafeedJobTests extends ESTestCase {
currentTime = 60000L;
long frequencyMs = 100;
long queryDelayMs = 1000;
DatafeedJob datafeedJob = createDatafeedJob(frequencyMs, queryDelayMs, 1000, -1);
DatafeedJob datafeedJob = createDatafeedJob(frequencyMs, queryDelayMs, 1000, -1, false);
long next = datafeedJob.runRealtime();
assertEquals(currentTime + frequencyMs + 100, next);
@ -344,7 +344,7 @@ public class DatafeedJobTests extends ESTestCase {
public void testEmptyDataCountGivenlookback() throws Exception {
when(dataExtractor.hasNext()).thenReturn(false);
DatafeedJob datafeedJob = createDatafeedJob(1000, 500, -1, -1);
DatafeedJob datafeedJob = createDatafeedJob(1000, 500, -1, -1, false);
expectThrows(DatafeedJob.EmptyDataCountException.class, () -> datafeedJob.runLookBack(0L, 1000L));
verify(client, times(1)).execute(same(FlushJobAction.INSTANCE), any());
verify(client, never()).execute(same(PersistJobAction.INSTANCE), any());
@ -355,7 +355,7 @@ public class DatafeedJobTests extends ESTestCase {
when(dataExtractor.hasNext()).thenReturn(true);
when(dataExtractor.next()).thenThrow(new IOException());
DatafeedJob datafeedJob = createDatafeedJob(1000, 500, -1, -1);
DatafeedJob datafeedJob = createDatafeedJob(1000, 500, -1, -1, randomBoolean());
expectThrows(DatafeedJob.ExtractionProblemException.class, () -> datafeedJob.runLookBack(0L, 1000L));
currentTime = 3001;
@ -382,7 +382,7 @@ public class DatafeedJobTests extends ESTestCase {
when(dataExtractor.getEndTime()).thenReturn(1000L);
DatafeedJob datafeedJob = createDatafeedJob(1000, 500, -1, -1);
DatafeedJob datafeedJob = createDatafeedJob(1000, 500, -1, -1, randomBoolean());
DatafeedJob.AnalysisProblemException analysisProblemException =
expectThrows(DatafeedJob.AnalysisProblemException.class, () -> datafeedJob.runLookBack(0L, 1000L));
assertThat(analysisProblemException.shouldStop, is(false));
@ -411,7 +411,7 @@ public class DatafeedJobTests extends ESTestCase {
when(dataExtractor.getEndTime()).thenReturn(1000L);
DatafeedJob datafeedJob = createDatafeedJob(1000, 500, -1, -1);
DatafeedJob datafeedJob = createDatafeedJob(1000, 500, -1, -1, randomBoolean());
DatafeedJob.AnalysisProblemException analysisProblemException =
expectThrows(DatafeedJob.AnalysisProblemException.class, () -> datafeedJob.runLookBack(0L, 1000L));
assertThat(analysisProblemException.shouldStop, is(true));
@ -436,7 +436,7 @@ public class DatafeedJobTests extends ESTestCase {
currentTime = 60000L;
long frequencyMs = 100;
long queryDelayMs = 1000;
DatafeedJob datafeedJob = createDatafeedJob(frequencyMs, queryDelayMs, 1000, -1);
DatafeedJob datafeedJob = createDatafeedJob(frequencyMs, queryDelayMs, 1000, -1, randomBoolean());
DatafeedJob.AnalysisProblemException analysisProblemException =
expectThrows(DatafeedJob.AnalysisProblemException.class, () -> datafeedJob.runRealtime());
assertThat(analysisProblemException.shouldStop, is(false));
@ -448,16 +448,17 @@ public class DatafeedJobTests extends ESTestCase {
currentTime = 60000L;
long frequencyMs = 100;
long queryDelayMs = 1000;
DatafeedJob datafeedJob = createDatafeedJob(frequencyMs, queryDelayMs, 1000, -1);
DatafeedJob datafeedJob = createDatafeedJob(frequencyMs, queryDelayMs, 1000, -1, randomBoolean());
DatafeedJob.AnalysisProblemException analysisProblemException =
expectThrows(DatafeedJob.AnalysisProblemException.class, () -> datafeedJob.runRealtime());
assertThat(analysisProblemException.shouldStop, is(true));
}
private DatafeedJob createDatafeedJob(long frequencyMs, long queryDelayMs, long latestFinalBucketEndTimeMs,
long latestRecordTimeMs) {
long latestRecordTimeMs, boolean haveSeenDataPreviously) {
Supplier<Long> currentTimeSupplier = () -> currentTime;
return new DatafeedJob(jobId, dataDescription.build(), frequencyMs, queryDelayMs, dataExtractorFactory, timingStatsReporter,
client, auditor, currentTimeSupplier, delayedDataDetector, latestFinalBucketEndTimeMs, latestRecordTimeMs);
client, auditor, currentTimeSupplier, delayedDataDetector, null, latestFinalBucketEndTimeMs, latestRecordTimeMs,
haveSeenDataPreviously);
}
}

View File

@ -114,6 +114,7 @@ public class DatafeedManagerTests extends ESTestCase {
when(datafeedJob.isRunning()).thenReturn(true);
when(datafeedJob.stop()).thenReturn(true);
when(datafeedJob.getJobId()).thenReturn(job.getId());
when(datafeedJob.getMaxEmptySearches()).thenReturn(null);
DatafeedJobBuilder datafeedJobBuilder = mock(DatafeedJobBuilder.class);
doAnswer(invocationOnMock -> {
@SuppressWarnings("rawtypes")
@ -133,7 +134,7 @@ public class DatafeedManagerTests extends ESTestCase {
}
public void testLookbackOnly_WarnsWhenNoDataIsRetrieved() throws Exception {
when(datafeedJob.runLookBack(0L, 60000L)).thenThrow(new DatafeedJob.EmptyDataCountException(0L));
when(datafeedJob.runLookBack(0L, 60000L)).thenThrow(new DatafeedJob.EmptyDataCountException(0L, false));
Consumer<Exception> handler = mockConsumer();
DatafeedTask task = createDatafeedTask("datafeed_id", 0L, 60000L);
datafeedManager.run(task, handler);
@ -176,8 +177,8 @@ public class DatafeedManagerTests extends ESTestCase {
return mock(Scheduler.ScheduledCancellable.class);
}).when(threadPool).schedule(any(), any(), any());
when(datafeedJob.runLookBack(anyLong(), anyLong())).thenThrow(new DatafeedJob.EmptyDataCountException(0L));
when(datafeedJob.runRealtime()).thenThrow(new DatafeedJob.EmptyDataCountException(0L));
when(datafeedJob.runLookBack(anyLong(), anyLong())).thenThrow(new DatafeedJob.EmptyDataCountException(0L, false));
when(datafeedJob.runRealtime()).thenThrow(new DatafeedJob.EmptyDataCountException(0L, false));
Consumer<Exception> handler = mockConsumer();
DatafeedTask task = createDatafeedTask("datafeed_id", 0L, null);

View File

@ -85,7 +85,7 @@ public class ProblemTrackerTests extends ESTestCase {
for (int i = 0; i < 9; i++) {
problemTracker.reportEmptyDataCount();
}
problemTracker.reportNoneEmptyCount();
problemTracker.reportNonEmptyDataCount();
Mockito.verifyNoMoreInteractions(auditor);
}
@ -94,7 +94,7 @@ public class ProblemTrackerTests extends ESTestCase {
for (int i = 0; i < 10; i++) {
problemTracker.reportEmptyDataCount();
}
problemTracker.reportNoneEmptyCount();
problemTracker.reportNonEmptyDataCount();
verify(auditor).warning("foo", "Datafeed has been retrieving no data for a while");
verify(auditor).info("foo", "Datafeed has started retrieving data again");

View File

@ -181,8 +181,10 @@ setup:
"indexes":["index-foo"],
"scroll_size": 2000,
"frequency": "1m",
"query_delay": "30s"
"query_delay": "30s",
"max_empty_searches": 42
}
- match: { max_empty_searches: 42 }
- do:
ml.update_datafeed:
@ -192,7 +194,8 @@ setup:
"indexes":["index-*"],
"scroll_size": 10000,
"frequency": "2m",
"query_delay": "0s"
"query_delay": "0s",
"max_empty_searches": -1
}
- match: { datafeed_id: "test-datafeed-1" }
- match: { job_id: "datafeeds-crud-1" }
@ -200,6 +203,7 @@ setup:
- match: { scroll_size: 10000 }
- match: { frequency: "2m" }
- match: { query_delay: "0s" }
- is_false: max_empty_searches
---
"Test update datafeed to point to different job":
@ -364,7 +368,8 @@ setup:
}
}
}
}
},
"max_empty_searches": -1
}
- do:
ml.get_datafeeds:
@ -374,6 +379,7 @@ setup:
- match: { datafeeds.0.aggregations.histogram_buckets.aggs.@timestamp.max.field: "@timestamp" }
- match: { datafeeds.0.aggregations.histogram_buckets.aggs.bytes_in_avg.avg.field: "system.network.in.bytes" }
- match: { datafeeds.0.aggregations.histogram_buckets.aggs.non_negative_bytes.bucket_script.buckets_path.bytes: "bytes_in_derivative" }
- is_false: max_empty_searches
---
"Test delete datafeed":