diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedConfig.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedConfig.java
index 6f707bd2cad..ed9a67e8ee3 100644
--- a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedConfig.java
+++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedConfig.java
@@ -422,6 +422,9 @@ public class DatafeedConfig extends ToXContentToBytes implements Writeable {
         if (types == null || types.isEmpty() || types.contains(null) || types.contains("")) {
             throw invalidOptionValue(TYPES.getPreferredName(), types);
         }
+        if (aggregations != null && (scriptFields != null && !scriptFields.isEmpty())) {
+            throw new IllegalArgumentException(Messages.getMessage(Messages.DATAFEED_CONFIG_CANNOT_USE_SCRIPT_FIELDS_WITH_AGGS));
+        }
         return new DatafeedConfig(id, jobId, queryDelay, frequency, indexes, types, query, aggregations, scriptFields, scrollSize,
                 source);
     }
diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobRunner.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobRunner.java
index b18d55bdc44..c5e674dbd89 100644
--- a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobRunner.java
+++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/datafeed/DatafeedJobRunner.java
@@ -21,6 +21,7 @@ import org.elasticsearch.xpack.ml.action.InternalStartDatafeedAction;
 import org.elasticsearch.xpack.ml.action.UpdateDatafeedStatusAction;
 import org.elasticsearch.xpack.ml.action.util.QueryPage;
 import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory;
+import org.elasticsearch.xpack.ml.datafeed.extractor.aggregation.AggregationDataExtractorFactory;
 import org.elasticsearch.xpack.ml.datafeed.extractor.scroll.ScrollDataExtractorFactory;
 import org.elasticsearch.xpack.ml.job.config.DataDescription;
 import org.elasticsearch.xpack.ml.job.config.DefaultFrequency;
@@ -198,7 +199,8 @@ public class DatafeedJobRunner extends AbstractComponent {
     }
 
     DataExtractorFactory createDataExtractorFactory(DatafeedConfig datafeedConfig, Job job) {
-        return new ScrollDataExtractorFactory(client, datafeedConfig, job);
+        return datafeedConfig.getAggregations() == null ? new ScrollDataExtractorFactory(client, datafeedConfig, job)
+                : new AggregationDataExtractorFactory(client, datafeedConfig, job);
     }
 
     private static DataDescription buildDataDescription(Job job) {
diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/ExtractorUtils.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/ExtractorUtils.java
new file mode 100644
index 00000000000..e0c0b9cbd4f
--- /dev/null
+++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/ExtractorUtils.java
@@ -0,0 +1,28 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.xpack.ml.datafeed.extractor;
+
+import org.elasticsearch.index.query.BoolQueryBuilder;
+import org.elasticsearch.index.query.QueryBuilder;
+import org.elasticsearch.index.query.RangeQueryBuilder;
+
+/**
+ * Collects common utility methods needed by various {@link DataExtractor} implementations
+ */
+public final class ExtractorUtils {
+
+    private static final String EPOCH_MILLIS = "epoch_millis";
+
+    private ExtractorUtils() {}
+
+    /**
+     * Combines a user query with a time range query.
+     */
+    public static QueryBuilder wrapInTimeRangeQuery(QueryBuilder userQuery, String timeField, long start, long end) {
+        QueryBuilder timeQuery = new RangeQueryBuilder(timeField).gte(start).lt(end).format(EPOCH_MILLIS);
+        return new BoolQueryBuilder().filter(userQuery).filter(timeQuery);
+    }
+}
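ExtractorUtils.wrapInTimeRangeQuery is the piece shared by both the scroll and the aggregation extractors: it ANDs the datafeed's own query with a [start, end) range on the time field, formatted as epoch_millis. A minimal usage sketch (field name and bounds are illustrative, not taken from this change):

    // Hypothetical caller: restrict a user query to one extraction window
    QueryBuilder userQuery = QueryBuilders.matchAllQuery();
    QueryBuilder wrapped = ExtractorUtils.wrapInTimeRangeQuery(userQuery, "timestamp", 1480000000000L, 1480003600000L);
    // wrapped is a bool query whose filters are the user query plus
    // a range on "timestamp": gte start, lt end, format epoch_millis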
diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationDataExtractor.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationDataExtractor.java
new file mode 100644
index 00000000000..98a6fbe92e4
--- /dev/null
+++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationDataExtractor.java
@@ -0,0 +1,112 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.xpack.ml.datafeed.extractor.aggregation;
+
+import org.apache.logging.log4j.Logger;
+import org.elasticsearch.action.search.SearchAction;
+import org.elasticsearch.action.search.SearchRequestBuilder;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.common.logging.Loggers;
+import org.elasticsearch.rest.RestStatus;
+import org.elasticsearch.search.aggregations.Aggregation;
+import org.elasticsearch.search.sort.SortOrder;
+import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractor;
+import org.elasticsearch.xpack.ml.datafeed.extractor.ExtractorUtils;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.NoSuchElementException;
+import java.util.Objects;
+import java.util.Optional;
+
+/**
+ * An implementation that extracts data from elasticsearch using search with aggregations on a client.
+ * Cancellation is effective only when it is called before the first time {@link #next()} is called.
+ * Note that this class is NOT thread-safe.
+ */
+class AggregationDataExtractor implements DataExtractor {
+
+    private static final Logger LOGGER = Loggers.getLogger(AggregationDataExtractor.class);
+
+    private final Client client;
+    private final AggregationDataExtractorContext context;
+    private boolean hasNext;
+    private boolean isCancelled;
+
+    public AggregationDataExtractor(Client client, AggregationDataExtractorContext dataExtractorContext) {
+        this.client = Objects.requireNonNull(client);
+        this.context = Objects.requireNonNull(dataExtractorContext);
+        this.hasNext = true;
+    }
+
+    @Override
+    public boolean hasNext() {
+        return hasNext && !isCancelled;
+    }
+
+    @Override
+    public boolean isCancelled() {
+        return isCancelled;
+    }
+
+    @Override
+    public void cancel() {
+        LOGGER.trace("[{}] Data extractor received cancel request", context.jobId);
+        isCancelled = true;
+    }
+
+    @Override
+    public Optional<InputStream> next() throws IOException {
+        if (!hasNext()) {
+            throw new NoSuchElementException();
+        }
+        Optional<InputStream> stream = Optional.ofNullable(search());
+        hasNext = false;
+        return stream;
+    }
+
+    private InputStream search() throws IOException {
+        SearchResponse searchResponse = executeSearchRequest(buildSearchRequest());
+        if (searchResponse.status() != RestStatus.OK) {
+            throw new IOException("[" + context.jobId + "] Search request returned status code: " + searchResponse.status()
+                    + ". Response was:\n" + searchResponse.toString());
+        }
+        return processSearchResponse(searchResponse);
+    }
+
+    protected SearchResponse executeSearchRequest(SearchRequestBuilder searchRequestBuilder) {
+        return searchRequestBuilder.get();
+    }
+
+    private SearchRequestBuilder buildSearchRequest() {
+        SearchRequestBuilder searchRequestBuilder = SearchAction.INSTANCE.newRequestBuilder(client)
+                .addSort(context.timeField, SortOrder.ASC)
+                .setIndices(context.indexes)
+                .setTypes(context.types)
+                .setSize(0)
+                .setQuery(ExtractorUtils.wrapInTimeRangeQuery(context.query, context.timeField, context.start, context.end));
+
+        context.aggs.getAggregatorFactories().forEach(a -> searchRequestBuilder.addAggregation(a));
+        context.aggs.getPipelineAggregatorFactories().forEach(a -> searchRequestBuilder.addAggregation(a));
+        return searchRequestBuilder;
+    }
+
+    private InputStream processSearchResponse(SearchResponse searchResponse) throws IOException {
+        if (searchResponse.getAggregations() == null) {
+            return null;
+        }
+        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+        try (AggregationToJsonProcessor processor = new AggregationToJsonProcessor(outputStream)) {
+            for (Aggregation agg : searchResponse.getAggregations().asList()) {
+                processor.process(agg);
+            }
+        }
+        return new ByteArrayInputStream(outputStream.toByteArray());
+    }
+}
diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationDataExtractorContext.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationDataExtractorContext.java
new file mode 100644
index 00000000000..4a814b6c2e0
--- /dev/null
+++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationDataExtractorContext.java
@@ -0,0 +1,36 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.xpack.ml.datafeed.extractor.aggregation;
+
+import org.elasticsearch.index.query.QueryBuilder;
+import org.elasticsearch.search.aggregations.AggregatorFactories;
+
+import java.util.List;
+import java.util.Objects;
+
+class AggregationDataExtractorContext {
+
+    final String jobId;
+    final String timeField;
+    final String[] indexes;
+    final String[] types;
+    final QueryBuilder query;
+    final AggregatorFactories.Builder aggs;
+    final long start;
+    final long end;
+
+    public AggregationDataExtractorContext(String jobId, String timeField, List<String> indexes, List<String> types, QueryBuilder query,
+                                           AggregatorFactories.Builder aggs, long start, long end) {
+        this.jobId = Objects.requireNonNull(jobId);
+        this.timeField = Objects.requireNonNull(timeField);
+        this.indexes = indexes.toArray(new String[indexes.size()]);
+        this.types = types.toArray(new String[types.size()]);
+        this.query = Objects.requireNonNull(query);
+        this.aggs = Objects.requireNonNull(aggs);
+        this.start = start;
+        this.end = end;
+    }
+}
diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationDataExtractorFactory.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationDataExtractorFactory.java
new file mode 100644
index 00000000000..757ee0503c9
--- /dev/null
+++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationDataExtractorFactory.java
@@ -0,0 +1,41 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.xpack.ml.datafeed.extractor.aggregation;
+
+import org.elasticsearch.client.Client;
+import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig;
+import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractor;
+import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory;
+import org.elasticsearch.xpack.ml.job.config.Job;
+
+import java.util.Objects;
+
+public class AggregationDataExtractorFactory implements DataExtractorFactory {
+
+    private final Client client;
+    private final DatafeedConfig datafeedConfig;
+    private final Job job;
+
+    public AggregationDataExtractorFactory(Client client, DatafeedConfig datafeedConfig, Job job) {
+        this.client = Objects.requireNonNull(client);
+        this.datafeedConfig = Objects.requireNonNull(datafeedConfig);
+        this.job = Objects.requireNonNull(job);
+    }
+
+    @Override
+    public DataExtractor newExtractor(long start, long end) {
+        AggregationDataExtractorContext dataExtractorContext = new AggregationDataExtractorContext(
+                job.getId(),
+                job.getDataDescription().getTimeField(),
+                datafeedConfig.getIndexes(),
+                datafeedConfig.getTypes(),
+                datafeedConfig.getQuery(),
+                datafeedConfig.getAggregations(),
+                start,
+                end);
+        return new AggregationDataExtractor(client, dataExtractorContext);
+    }
+}
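The factory above is what DatafeedJobRunner now selects when a datafeed declares aggregations; the extractor it creates is driven through the same DataExtractor contract as the scroll implementation. A rough sketch of that contract in use (the consuming method is hypothetical; the real caller is the datafeed's lookback/real-time loop):

    // Hypothetical driver loop over one extraction window [start, end)
    DataExtractor extractor = new AggregationDataExtractorFactory(client, datafeedConfig, job).newExtractor(start, end);
    while (extractor.hasNext()) {
        Optional<InputStream> docs = extractor.next();   // for aggregations this yields at most one stream
        if (docs.isPresent()) {
            postDataToJob(docs.get());                   // hypothetical consumer of the flat JSON documents
        }
    }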
diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationToJsonProcessor.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationToJsonProcessor.java
new file mode 100644
index 00000000000..ecf6e29a453
--- /dev/null
+++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationToJsonProcessor.java
@@ -0,0 +1,105 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.xpack.ml.datafeed.extractor.aggregation;
+
+import org.elasticsearch.common.lease.Releasable;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.common.xcontent.json.JsonXContent;
+import org.elasticsearch.search.aggregations.Aggregation;
+import org.elasticsearch.search.aggregations.Aggregations;
+import org.elasticsearch.search.aggregations.bucket.histogram.Histogram;
+import org.elasticsearch.search.aggregations.bucket.terms.Terms;
+import org.elasticsearch.search.aggregations.metrics.NumericMetricsAggregation;
+import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Processes {@link Aggregation} objects and writes flat JSON documents for each leaf aggregation.
+ */
+class AggregationToJsonProcessor implements Releasable {
+
+    private final XContentBuilder jsonBuilder;
+    private final Map<String, Object> keyValuePairs;
+
+    public AggregationToJsonProcessor(OutputStream outputStream) throws IOException {
+        jsonBuilder = new XContentBuilder(JsonXContent.jsonXContent, outputStream);
+        keyValuePairs = new LinkedHashMap<>();
+    }
+
+    /**
+     * Processes an {@link Aggregation} and writes a flat JSON document for each of its leaf aggregations.
+     * It expects aggregations to have 0..1 sub-aggregations.
+     * It expects the top level aggregation to be {@link Histogram}.
+     * It expects that all sub-aggregations of the top level are either {@link Terms} or {@link NumericMetricsAggregation.SingleValue}.
+     */
+    public void process(Aggregation aggregation) throws IOException {
+        if (aggregation instanceof Histogram) {
+            processHistogram((Histogram) aggregation);
+        } else {
+            throw new IllegalArgumentException("Top level aggregation should be [histogram]");
+        }
+    }
+
+    private void processHistogram(Histogram histogram) throws IOException {
+        for (Histogram.Bucket bucket : histogram.getBuckets()) {
+            keyValuePairs.put(histogram.getName(), bucket.getKey());
+            processNestedAggs(bucket.getDocCount(), bucket.getAggregations());
+        }
+    }
+
+    private void processNestedAggs(long docCount, Aggregations subAggs) throws IOException {
+        List<Aggregation> aggs = subAggs == null ? Collections.emptyList() : subAggs.asList();
+        if (aggs.isEmpty()) {
+            writeJsonObject(docCount);
+            return;
+        }
+        if (aggs.size() > 1) {
+            throw new IllegalArgumentException("Multiple nested aggregations are not supported");
+        }
+        Aggregation nestedAgg = aggs.get(0);
+        if (nestedAgg instanceof Terms) {
+            processTerms((Terms) nestedAgg);
+        } else if (nestedAgg instanceof NumericMetricsAggregation.SingleValue) {
+            processSingleValue(docCount, (NumericMetricsAggregation.SingleValue) nestedAgg);
+        } else {
+            throw new IllegalArgumentException("Unsupported aggregation type [" + nestedAgg.getName() + "]");
+        }
+    }
+
+    private void processTerms(Terms termsAgg) throws IOException {
+        for (Terms.Bucket bucket : termsAgg.getBuckets()) {
+            keyValuePairs.put(termsAgg.getName(), bucket.getKey());
+            processNestedAggs(bucket.getDocCount(), bucket.getAggregations());
+        }
+    }
+
+    private void processSingleValue(long docCount, NumericMetricsAggregation.SingleValue singleValue) throws IOException {
+        keyValuePairs.put(singleValue.getName(), singleValue.value());
+        writeJsonObject(docCount);
+    }
+
+    private void writeJsonObject(long docCount) throws IOException {
+        if (docCount > 0) {
+            jsonBuilder.startObject();
+            for (Map.Entry<String, Object> keyValue : keyValuePairs.entrySet()) {
+                jsonBuilder.field(keyValue.getKey(), keyValue.getValue());
+            }
+            jsonBuilder.field(DatafeedConfig.DOC_COUNT, docCount);
+            jsonBuilder.endObject();
+        }
+    }
+
+    @Override
+    public void close() {
+        jsonBuilder.close();
+    }
+}
diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/ScrollDataExtractor.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/ScrollDataExtractor.java
index a857c7c8bbd..0c621cf27c5 100644
--- a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/ScrollDataExtractor.java
+++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/datafeed/extractor/scroll/ScrollDataExtractor.java
@@ -14,14 +14,11 @@ import org.elasticsearch.action.search.SearchScrollAction;
 import org.elasticsearch.client.Client;
 import org.elasticsearch.common.logging.Loggers;
 import org.elasticsearch.common.unit.TimeValue;
-import org.elasticsearch.index.query.BoolQueryBuilder;
-import org.elasticsearch.index.query.QueryBuilder;
-import org.elasticsearch.index.query.RangeQueryBuilder;
 import org.elasticsearch.rest.RestStatus;
 import org.elasticsearch.search.SearchHit;
-import org.elasticsearch.search.builder.SearchSourceBuilder;
 import org.elasticsearch.search.sort.SortOrder;
 import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractor;
+import org.elasticsearch.xpack.ml.datafeed.extractor.ExtractorUtils;
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
@@ -104,7 +101,8 @@ class ScrollDataExtractor implements DataExtractor {
                 .setIndices(context.indexes)
                 .setTypes(context.types)
                 .setSize(context.scrollSize)
-                .setQuery(createQuery());
+                .setQuery(ExtractorUtils.wrapInTimeRangeQuery(
+                        context.query, context.extractedFields.timeField(), context.start, context.end));
 
         for (String docValueField : context.extractedFields.getDocValueFields()) {
             searchRequestBuilder.addDocValueField(docValueField);
@@ -115,10 +113,7 @@ class ScrollDataExtractor implements DataExtractor {
         } else {
             searchRequestBuilder.setFetchSource(sourceFields, null);
         }
-
-        for (SearchSourceBuilder.ScriptField scriptField : context.scriptFields) {
-            searchRequestBuilder.addScriptField(scriptField.fieldName(), scriptField.script());
-        }
+        context.scriptFields.forEach(f -> searchRequestBuilder.addScriptField(f.fieldName(), f.script()));
         return searchRequestBuilder;
     }
@@ -166,15 +161,6 @@ class ScrollDataExtractor implements DataExtractor {
                 .get();
     }
 
-    private QueryBuilder createQuery() {
-        QueryBuilder userQuery = context.query;
-        QueryBuilder timeQuery = new RangeQueryBuilder(context.extractedFields.timeField())
-                .gte(context.start)
-                .lt(context.end)
-                .format("epoch_millis");
-        return new BoolQueryBuilder().filter(userQuery).filter(timeQuery);
-    }
-
     void clearScroll(String scrollId) {
         ClearScrollAction.INSTANCE.newRequestBuilder(client).addScrollId(scrollId).get();
     }
diff --git a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/messages/Messages.java b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/messages/Messages.java
index 8aa2b4bd8eb..58d9ae83845 100644
--- a/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/messages/Messages.java
+++ b/elasticsearch/src/main/java/org/elasticsearch/xpack/ml/job/messages/Messages.java
@@ -170,6 +170,7 @@ public final class Messages {
     public static final String JOB_DATA_CONCURRENT_USE_UPLOAD = "job.data.concurrent.use.upload";
 
     public static final String DATAFEED_CONFIG_INVALID_OPTION_VALUE = "datafeed.config.invalid.option.value";
+    public static final String DATAFEED_CONFIG_CANNOT_USE_SCRIPT_FIELDS_WITH_AGGS = "datafeed.config.cannot.use.script.fields.with.aggs";
     public static final String DATAFEED_DOES_NOT_SUPPORT_JOB_WITH_LATENCY = "datafeed.does.not.support.job.with.latency";
     public static final String DATAFEED_AGGREGATIONS_REQUIRES_JOB_WITH_SUMMARY_COUNT_FIELD =
diff --git a/elasticsearch/src/main/resources/org/elasticsearch/xpack/ml/job/messages/ml_messages.properties b/elasticsearch/src/main/resources/org/elasticsearch/xpack/ml/job/messages/ml_messages.properties
index 39952d56572..ed00e5f399b 100644
--- a/elasticsearch/src/main/resources/org/elasticsearch/xpack/ml/job/messages/ml_messages.properties
+++ b/elasticsearch/src/main/resources/org/elasticsearch/xpack/ml/job/messages/ml_messages.properties
@@ -116,6 +116,7 @@ job.config.update.model.snapshot.retention.days.invalid = Invalid update value f
 job.config.update.results.retention.days.invalid = Invalid update value for results_retention_days: value must be an exact number of days
 job.config.update.datafeed.config.parse.error = JSON parse error reading the update value for datafeed_config
 job.config.update.datafeed.config.cannot.be.null = Invalid update value for datafeed_config: null
+datafeed.config.cannot.use.script.fields.with.aggs = script_fields cannot be used in combination with aggregations
 job.config.unknown.function = Unknown function ''{0}''
diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/action/GetDatafeedsActionResponseTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/action/GetDatafeedsActionResponseTests.java
index 401f5f39bbb..48ea779ee24 100644
--- a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/action/GetDatafeedsActionResponseTests.java
+++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/action/GetDatafeedsActionResponseTests.java
@@ -40,8 +40,8 @@ public class GetDatafeedsActionResponseTests extends AbstractStreamableTestCase<GetDatafeedsAction.Response> {
             if (randomBoolean()) {
                 datafeedConfig.setQuery(QueryBuilders.termQuery(randomAsciiOfLength(10), randomAsciiOfLength(10)));
             }
+            int scriptsSize = randomInt(3);
             if (randomBoolean()) {
-                int scriptsSize = randomInt(3);
                 List<SearchSourceBuilder.ScriptField> scriptFields = new ArrayList<>(scriptsSize);
                 for (int scriptIndex = 0; scriptIndex < scriptsSize; scriptIndex++) {
                     scriptFields.add(new SearchSourceBuilder.ScriptField(randomAsciiOfLength(10), new Script(randomAsciiOfLength(10)),
@@ -52,7 +52,7 @@ public class GetDatafeedsActionResponseTests extends AbstractStreamableTestCase<GetDatafeedsAction.Response> {
             if (randomBoolean()) {
                 datafeedConfig.setScrollSize(randomIntBetween(0, Integer.MAX_VALUE));
             }
-            if (randomBoolean()) {
+            if (randomBoolean() && scriptsSize == 0) {
                 AggregatorFactories.Builder aggsBuilder = new AggregatorFactories.Builder();
                 aggsBuilder.addAggregator(AggregationBuilders.avg(randomAsciiOfLength(10)));
                 datafeedConfig.setAggregations(aggsBuilder);
diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedConfigTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedConfigTests.java
index 96bcd325a89..52abbcb1ca3 100644
--- a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedConfigTests.java
+++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/datafeed/DatafeedConfigTests.java
@@ -23,6 +23,8 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 
+import static org.hamcrest.Matchers.equalTo;
+
 public class DatafeedConfigTests extends AbstractSerializingTestCase<DatafeedConfig> {
 
     @Override
@@ -37,7 +39,13 @@ public class DatafeedConfigTests extends AbstractSerializingTestCase<DatafeedConfig> {
-        if (randomBoolean()) {
+        int scriptsSize = randomInt(3);
+        List<SearchSourceBuilder.ScriptField> scriptFields = new ArrayList<>(scriptsSize);
+        for (int scriptIndex = 0; scriptIndex < scriptsSize; scriptIndex++) {
+            scriptFields.add(new SearchSourceBuilder.ScriptField(randomAsciiOfLength(10), new Script(randomAsciiOfLength(10)),
+                    randomBoolean()));
+        }
+        if (randomBoolean() && scriptsSize == 0) {
             // can only test with a single agg as the xcontent order gets randomized by test base class and then
             // the actual xcontent isn't the same and test fail.
             // Testing with a single agg is ok as we don't have special list writeable / xconent logic
@@ -45,12 +53,6 @@ public class DatafeedConfigTests extends AbstractSerializingTestCase<DatafeedConfig> {
             builder.setAggregations(aggsBuilder);
         }
-        int scriptsSize = randomInt(3);
-        List<SearchSourceBuilder.ScriptField> scriptFields = new ArrayList<>(scriptsSize);
-        for (int scriptIndex = 0; scriptIndex < scriptsSize; scriptIndex++) {
-            scriptFields.add(new SearchSourceBuilder.ScriptField(randomAsciiOfLength(10), new Script(randomAsciiOfLength(10)),
-                    randomBoolean()));
-        }
         builder.setScriptFields(scriptFields);
         if (randomBoolean()) {
             builder.setScrollSize(randomIntBetween(0, Integer.MAX_VALUE));
@@ -238,6 +240,19 @@ public class DatafeedConfigTests extends AbstractSerializingTestCase<DatafeedConfig> {
+    public void testBuild_GivenScriptFieldsAndAggregations() {
+        DatafeedConfig.Builder datafeed = new DatafeedConfig.Builder("datafeed1", "job1");
+        datafeed.setIndexes(Arrays.asList("my_index"));
+        datafeed.setTypes(Arrays.asList("my_type"));
+        datafeed.setScriptFields(Arrays.asList(new SearchSourceBuilder.ScriptField(randomAsciiOfLength(10),
+                new Script(randomAsciiOfLength(10)), randomBoolean())));
+        datafeed.setAggregations(new AggregatorFactories.Builder().addAggregator(AggregationBuilders.avg(randomAsciiOfLength(10))));
+
+        IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> datafeed.build());
+
+        assertThat(e.getMessage(), equalTo("script_fields cannot be used in combination with aggregations"));
+    }
+
     public static String randomValidDatafeedId() {
         CodepointSetGenerator generator = new CodepointSetGenerator("abcdefghijklmnopqrstuvwxyz".toCharArray());
         return generator.ofCodePointsLength(random(), 10, 10);
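Together with the message added above, the new DatafeedConfig validation means a datafeed that declares both script_fields and aggregations is rejected at build time with "script_fields cannot be used in combination with aggregations". A sketch of a REST payload that would now fail, following the same shape the integration test's DatafeedBuilder produces (index, type and field names are illustrative):

    {
      "job_id": "job1",
      "indexes": ["my-data"],
      "types": ["response"],
      "script_fields": {"airline": {"script": {"lang": "painless", "inline": "doc['airline'].value"}}},
      "aggs": {"time": {"histogram": {"field": "time", "interval": 3600000}}}
    }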
diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationDataExtractorTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationDataExtractorTests.java
new file mode 100644
index 00000000000..20ad65e73b3
--- /dev/null
+++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationDataExtractorTests.java
@@ -0,0 +1,170 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.xpack.ml.datafeed.extractor.aggregation;
+
+import org.elasticsearch.action.search.SearchRequestBuilder;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.index.query.QueryBuilder;
+import org.elasticsearch.index.query.QueryBuilders;
+import org.elasticsearch.rest.RestStatus;
+import org.elasticsearch.search.aggregations.AggregationBuilders;
+import org.elasticsearch.search.aggregations.Aggregations;
+import org.elasticsearch.search.aggregations.AggregatorFactories;
+import org.elasticsearch.search.aggregations.bucket.histogram.Histogram;
+import org.elasticsearch.test.ESTestCase;
+import org.junit.Before;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+import static org.elasticsearch.xpack.ml.datafeed.extractor.aggregation.AggregationTestUtils.Term;
+import static org.elasticsearch.xpack.ml.datafeed.extractor.aggregation.AggregationTestUtils.createHistogramBucket;
+import static org.elasticsearch.xpack.ml.datafeed.extractor.aggregation.AggregationTestUtils.createTerms;
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.stringContainsInOrder;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class AggregationDataExtractorTests extends ESTestCase {
+
+    private Client client;
+    private List<SearchRequestBuilder> capturedSearchRequests;
+    private String jobId;
+    private String timeField;
+    private List<String> types;
+    private List<String> indexes;
+    private QueryBuilder query;
+    private AggregatorFactories.Builder aggs;
+
+    private class TestDataExtractor extends AggregationDataExtractor {
+
+        private SearchResponse nextResponse;
+
+        public TestDataExtractor(long start, long end) {
+            super(client, createContext(start, end));
+        }
+
+        @Override
+        protected SearchResponse executeSearchRequest(SearchRequestBuilder searchRequestBuilder) {
+            capturedSearchRequests.add(searchRequestBuilder);
+            return nextResponse;
+        }
+
+        void setNextResponse(SearchResponse searchResponse) {
+            nextResponse = searchResponse;
+        }
+    }
+
+    @Before
+    public void setUpTests() {
+        client = mock(Client.class);
+        capturedSearchRequests = new ArrayList<>();
+        jobId = "test-job";
+        timeField = "time";
+        indexes = Arrays.asList("index-1", "index-2");
+        types = Arrays.asList("type-1", "type-2");
+        query = QueryBuilders.matchAllQuery();
+        aggs = new AggregatorFactories.Builder()
+                .addAggregator(AggregationBuilders.histogram("time").field("time").subAggregation(
+                        AggregationBuilders.terms("airline").field("airline").subAggregation(
+                                AggregationBuilders.avg("responsetime").field("responsetime"))));
+    }
+
+    public void testExtraction() throws IOException {
+        List<Histogram.Bucket> histogramBuckets = Arrays.asList(
+                createHistogramBucket(1000L, 3, Arrays.asList(
+                        createTerms("airline", new Term("a", 1, "responsetime", 11.0), new Term("b", 2, "responsetime", 12.0)))),
+                createHistogramBucket(2000L, 0, Arrays.asList()),
+                createHistogramBucket(3000L, 7, Arrays.asList(
+                        createTerms("airline", new Term("c", 4, "responsetime", 31.0), new Term("b", 3, "responsetime", 32.0))))
+        );
+
+        TestDataExtractor extractor = new TestDataExtractor(1000L, 4000L);
+
+        SearchResponse response = createSearchResponse("time", histogramBuckets);
+        extractor.setNextResponse(response);
+
+        assertThat(extractor.hasNext(), is(true));
+        Optional<InputStream> stream = extractor.next();
+        assertThat(stream.isPresent(), is(true));
+        String expectedStream = "{\"time\":1000,\"airline\":\"a\",\"responsetime\":11.0,\"doc_count\":1} "
+                + "{\"time\":1000,\"airline\":\"b\",\"responsetime\":12.0,\"doc_count\":2} "
+                + "{\"time\":3000,\"airline\":\"c\",\"responsetime\":31.0,\"doc_count\":4} "
+                + "{\"time\":3000,\"airline\":\"b\",\"responsetime\":32.0,\"doc_count\":3}";
+        assertThat(asString(stream.get()), equalTo(expectedStream));
+        assertThat(extractor.hasNext(), is(false));
+        assertThat(capturedSearchRequests.size(), equalTo(1));
+
+        String searchRequest = capturedSearchRequests.get(0).toString().replaceAll("\\s", "");
+        assertThat(searchRequest, containsString("\"size\":0"));
+        assertThat(searchRequest, containsString("\"query\":{\"bool\":{\"filter\":[{\"match_all\":{\"boost\":1.0}},"
+                + "{\"range\":{\"time\":{\"from\":1000,\"to\":4000,\"include_lower\":true,\"include_upper\":false,"
+                + "\"format\":\"epoch_millis\",\"boost\":1.0}}}]"));
+        assertThat(searchRequest, containsString("\"sort\":[{\"time\":{\"order\":\"asc\"}}]"));
+        assertThat(searchRequest,
+                stringContainsInOrder(Arrays.asList("aggregations", "histogram", "time", "terms", "airline", "avg", "responsetime")));
+    }
+
+    public void testExtractionGivenCancelBeforeNext() throws IOException {
+        TestDataExtractor extractor = new TestDataExtractor(1000L, 4000L);
+        SearchResponse response = createSearchResponse("time", Collections.emptyList());
+        extractor.setNextResponse(response);
+
+        extractor.cancel();
+        assertThat(extractor.hasNext(), is(false));
+    }
+
+    public void testExtractionGivenSearchResponseHasError() throws IOException {
+        TestDataExtractor extractor = new TestDataExtractor(1000L, 2000L);
+        extractor.setNextResponse(createErrorResponse());
+
+        assertThat(extractor.hasNext(), is(true));
+        expectThrows(IOException.class, () -> extractor.next());
+    }
+
+    private AggregationDataExtractorContext createContext(long start, long end) {
+        return new AggregationDataExtractorContext(jobId, timeField, indexes, types, query, aggs, start, end);
+    }
+
+    private SearchResponse createSearchResponse(String histogramName, List<Histogram.Bucket> histogramBuckets) {
+        SearchResponse searchResponse = mock(SearchResponse.class);
+        when(searchResponse.status()).thenReturn(RestStatus.OK);
+        when(searchResponse.getScrollId()).thenReturn(randomAsciiOfLength(1000));
+
+        Histogram histogram = mock(Histogram.class);
+        when(histogram.getName()).thenReturn(histogramName);
+        when(histogram.getBuckets()).thenReturn(histogramBuckets);
+
+        Aggregations searchAggs = mock(Aggregations.class);
+        when(searchAggs.asList()).thenReturn(Arrays.asList(histogram));
+        when(searchResponse.getAggregations()).thenReturn(searchAggs);
+        return searchResponse;
+    }
+
+    private SearchResponse createErrorResponse() {
+        SearchResponse searchResponse = mock(SearchResponse.class);
+        when(searchResponse.status()).thenReturn(RestStatus.INTERNAL_SERVER_ERROR);
+        return searchResponse;
+    }
+
+    private static String asString(InputStream inputStream) throws IOException {
+        try (BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8))) {
+            return reader.lines().collect(Collectors.joining("\n"));
+        }
+    }
+}
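The expectedStream assertion above shows the essence of the aggregation path: AggregationToJsonProcessor turns each leaf bucket of the histogram > terms > avg tree into one flat JSON document whose doc_count the job can use as its summary_count_field_name. For example, with the test data above (documents shown one per line here for readability; empty buckets such as the one at time 2000 produce no document):

    {"time":1000,"airline":"a","responsetime":11.0,"doc_count":1}
    {"time":1000,"airline":"b","responsetime":12.0,"doc_count":2}
    {"time":3000,"airline":"c","responsetime":31.0,"doc_count":4}
    {"time":3000,"airline":"b","responsetime":32.0,"doc_count":3}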
diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationTestUtils.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationTestUtils.java
new file mode 100644
index 00000000000..242a22bb7d6
--- /dev/null
+++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationTestUtils.java
@@ -0,0 +1,89 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.xpack.ml.datafeed.extractor.aggregation;
+
+import org.elasticsearch.search.aggregations.Aggregation;
+import org.elasticsearch.search.aggregations.Aggregations;
+import org.elasticsearch.search.aggregations.bucket.histogram.Histogram;
+import org.elasticsearch.search.aggregations.bucket.terms.StringTerms;
+import org.elasticsearch.search.aggregations.bucket.terms.Terms;
+import org.elasticsearch.search.aggregations.metrics.NumericMetricsAggregation;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public final class AggregationTestUtils {
+
+    private AggregationTestUtils() {}
+
+    static Histogram.Bucket createHistogramBucket(long timestamp, long docCount, List<Aggregation> subAggregations) {
+        Histogram.Bucket bucket = createHistogramBucket(timestamp, docCount);
+        Aggregations aggs = createAggs(subAggregations);
+        when(bucket.getAggregations()).thenReturn(aggs);
+        return bucket;
+    }
+
+    static Aggregations createAggs(List<Aggregation> aggsList) {
+        Aggregations aggs = mock(Aggregations.class);
+        when(aggs.asList()).thenReturn(aggsList);
+        return aggs;
+    }
+
+    static Histogram.Bucket createHistogramBucket(long timestamp, long docCount) {
+        Histogram.Bucket bucket = mock(Histogram.Bucket.class);
+        when(bucket.getKey()).thenReturn(timestamp);
+        when(bucket.getDocCount()).thenReturn(docCount);
+        return bucket;
+    }
+
+    static NumericMetricsAggregation.SingleValue createSingleValue(String name, double value) {
+        NumericMetricsAggregation.SingleValue singleValue = mock(NumericMetricsAggregation.SingleValue.class);
+        when(singleValue.getName()).thenReturn(name);
+        when(singleValue.value()).thenReturn(value);
+        return singleValue;
+    }
+
+    static Terms createTerms(String name, Term... terms) {
+        Terms termsAgg = mock(Terms.class);
+        when(termsAgg.getName()).thenReturn(name);
+        List<Terms.Bucket> buckets = new ArrayList<>();
+        for (Term term: terms) {
+            StringTerms.Bucket bucket = mock(StringTerms.Bucket.class);
+            when(bucket.getKey()).thenReturn(term.key);
+            when(bucket.getDocCount()).thenReturn(term.count);
+            if (term.value != null) {
+                NumericMetricsAggregation.SingleValue termValue = createSingleValue(term.valueName, term.value);
+                Aggregations aggs = createAggs(Arrays.asList(termValue));
+                when(bucket.getAggregations()).thenReturn(aggs);
+            }
+            buckets.add(bucket);
+        }
+        when(termsAgg.getBuckets()).thenReturn(buckets);
+        return termsAgg;
+    }
+
+    static class Term {
+        String key;
+        long count;
+        String valueName;
+        Double value;
+
+        Term(String key, long count) {
+            this(key, count, null, null);
+        }
+
+        Term(String key, long count, String valueName, Double value) {
+            this.key = key;
+            this.count = count;
+            this.valueName = valueName;
+            this.value = value;
+        }
+    }
+}
diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationToJsonProcessorTests.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationToJsonProcessorTests.java
new file mode 100644
index 00000000000..3439bc29bc3
--- /dev/null
+++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/datafeed/extractor/aggregation/AggregationToJsonProcessorTests.java
@@ -0,0 +1,149 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.xpack.ml.datafeed.extractor.aggregation;
+
+import org.elasticsearch.search.aggregations.Aggregation;
+import org.elasticsearch.search.aggregations.Aggregations;
+import org.elasticsearch.search.aggregations.bucket.histogram.Histogram;
+import org.elasticsearch.search.aggregations.bucket.terms.Terms;
+import org.elasticsearch.test.ESTestCase;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.elasticsearch.xpack.ml.datafeed.extractor.aggregation.AggregationTestUtils.Term;
+import static org.elasticsearch.xpack.ml.datafeed.extractor.aggregation.AggregationTestUtils.createAggs;
+import static org.elasticsearch.xpack.ml.datafeed.extractor.aggregation.AggregationTestUtils.createHistogramBucket;
+import static org.elasticsearch.xpack.ml.datafeed.extractor.aggregation.AggregationTestUtils.createSingleValue;
+import static org.elasticsearch.xpack.ml.datafeed.extractor.aggregation.AggregationTestUtils.createTerms;
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.equalTo;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class AggregationToJsonProcessorTests extends ESTestCase {
+
+    public void testProcessGivenHistogramOnly() throws IOException {
+        List<Histogram.Bucket> histogramBuckets = Arrays.asList(
+                createHistogramBucket(1000L, 3),
+                createHistogramBucket(2000L, 5)
+        );
+        Histogram histogram = mock(Histogram.class);
+        when(histogram.getName()).thenReturn("time");
+        when(histogram.getBuckets()).thenReturn(histogramBuckets);
+
+        String json = aggToString(histogram);
+
+        assertThat(json, equalTo("{\"time\":1000,\"doc_count\":3} {\"time\":2000,\"doc_count\":5}"));
+    }
+
+    public void testProcessGivenSingleMetricPerHistogram() throws IOException {
+        List<Histogram.Bucket> histogramBuckets = Arrays.asList(
+                createHistogramBucket(1000L, 3, Arrays.asList(createSingleValue("my_value", 1.0))),
+                createHistogramBucket(2000L, 5, Arrays.asList(createSingleValue("my_value", 2.0)))
+        );
+        Histogram histogram = mock(Histogram.class);
+        when(histogram.getName()).thenReturn("time");
+        when(histogram.getBuckets()).thenReturn(histogramBuckets);
+
+        String json = aggToString(histogram);
+
+        assertThat(json, equalTo("{\"time\":1000,\"my_value\":1.0,\"doc_count\":3} {\"time\":2000,\"my_value\":2.0,\"doc_count\":5}"));
+    }
+
+    public void testProcessGivenTermsPerHistogram() throws IOException {
+        List<Histogram.Bucket> histogramBuckets = Arrays.asList(
+                createHistogramBucket(1000L, 4, Arrays.asList(
+                        createTerms("my_field", new Term("a", 1), new Term("b", 2), new Term("c", 1)))),
+                createHistogramBucket(2000L, 5, Arrays.asList(createTerms("my_field", new Term("a", 5), new Term("b", 2)))),
+                createHistogramBucket(3000L, 0, Arrays.asList()),
+                createHistogramBucket(4000L, 7, Arrays.asList(createTerms("my_field", new Term("c", 4), new Term("b", 3))))
+        );
+        Histogram histogram = mock(Histogram.class);
+        when(histogram.getName()).thenReturn("time");
+        when(histogram.getBuckets()).thenReturn(histogramBuckets);
+
+        String json = aggToString(histogram);
+
+        assertThat(json, equalTo("{\"time\":1000,\"my_field\":\"a\",\"doc_count\":1} "
+                + "{\"time\":1000,\"my_field\":\"b\",\"doc_count\":2} "
+                + "{\"time\":1000,\"my_field\":\"c\",\"doc_count\":1} "
+                + "{\"time\":2000,\"my_field\":\"a\",\"doc_count\":5} "
+                + "{\"time\":2000,\"my_field\":\"b\",\"doc_count\":2} "
+                + "{\"time\":4000,\"my_field\":\"c\",\"doc_count\":4} "
+                + "{\"time\":4000,\"my_field\":\"b\",\"doc_count\":3}"));
+    }
+
+    public void testProcessGivenSingleMetricPerSingleTermsPerHistogram() throws IOException {
+        List<Histogram.Bucket> histogramBuckets = Arrays.asList(
+                createHistogramBucket(1000L, 4, Arrays.asList(createTerms("my_field",
+                        new Term("a", 1, "my_value", 11.0), new Term("b", 2, "my_value", 12.0), new Term("c", 1, "my_value", 13.0)))),
+                createHistogramBucket(2000L, 5, Arrays.asList(createTerms("my_field",
+                        new Term("a", 5, "my_value", 21.0), new Term("b", 2, "my_value", 22.0)))),
+                createHistogramBucket(3000L, 0, Arrays.asList()),
+                createHistogramBucket(4000L, 7, Arrays.asList(createTerms("my_field",
+                        new Term("c", 4, "my_value", 41.0), new Term("b", 3, "my_value", 42.0))))
+        );
+        Histogram histogram = mock(Histogram.class);
+        when(histogram.getName()).thenReturn("time");
+        when(histogram.getBuckets()).thenReturn(histogramBuckets);
+
+        String json = aggToString(histogram);
+
+        assertThat(json, equalTo("{\"time\":1000,\"my_field\":\"a\",\"my_value\":11.0,\"doc_count\":1} "
+                + "{\"time\":1000,\"my_field\":\"b\",\"my_value\":12.0,\"doc_count\":2} "
+                + "{\"time\":1000,\"my_field\":\"c\",\"my_value\":13.0,\"doc_count\":1} "
+                + "{\"time\":2000,\"my_field\":\"a\",\"my_value\":21.0,\"doc_count\":5} "
+                + "{\"time\":2000,\"my_field\":\"b\",\"my_value\":22.0,\"doc_count\":2} "
+                + "{\"time\":4000,\"my_field\":\"c\",\"my_value\":41.0,\"doc_count\":4} "
+                + "{\"time\":4000,\"my_field\":\"b\",\"my_value\":42.0,\"doc_count\":3}"));
+    }
+
+    public void testProcessGivenTopLevelAggIsNotHistogram() throws IOException {
+        Terms terms = mock(Terms.class);
+        IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> aggToString(terms));
+        assertThat(e.getMessage(), containsString("Top level aggregation should be [histogram]"));
+    }
+
+    public void testProcessGivenUnsupportedAggregationUnderHistogram() throws IOException {
+        Histogram.Bucket histogramBucket = createHistogramBucket(1000L, 2);
+        Histogram anotherHistogram = mock(Histogram.class);
+        when(anotherHistogram.getName()).thenReturn("nested-agg");
+        Aggregations subAggs = createAggs(Arrays.asList(anotherHistogram));
+        when(histogramBucket.getAggregations()).thenReturn(subAggs);
+        Histogram histogram = mock(Histogram.class);
+        when(histogram.getName()).thenReturn("buckets");
+        when(histogram.getBuckets()).thenReturn(Arrays.asList(histogramBucket));
+
+        IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> aggToString(histogram));
+        assertThat(e.getMessage(), containsString("Unsupported aggregation type [nested-agg]"));
+    }
+
+    public void testProcessGivenMultipleNestedAggregations() throws IOException {
+        Histogram.Bucket histogramBucket = createHistogramBucket(1000L, 2);
+        Terms terms1 = mock(Terms.class);
+        Terms terms2 = mock(Terms.class);
+        Aggregations subAggs = createAggs(Arrays.asList(terms1, terms2));
+        when(histogramBucket.getAggregations()).thenReturn(subAggs);
+        Histogram histogram = mock(Histogram.class);
+        when(histogram.getName()).thenReturn("buckets");
+        when(histogram.getBuckets()).thenReturn(Arrays.asList(histogramBucket));
+
+        IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> aggToString(histogram));
+        assertThat(e.getMessage(), containsString("Multiple nested aggregations are not supported"));
+    }
+
+    private String aggToString(Aggregation aggregation) throws IOException {
+        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+        try (AggregationToJsonProcessor processor = new AggregationToJsonProcessor(outputStream)) {
+            processor.process(aggregation);
+        }
+        return outputStream.toString(StandardCharsets.UTF_8.name());
+    }
+}
\ No newline at end of file
diff --git a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/integration/DatafeedJobIT.java b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/integration/DatafeedJobIT.java
index a5176f90819..768f47b625c 100644
--- a/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/integration/DatafeedJobIT.java
+++ b/elasticsearch/src/test/java/org/elasticsearch/xpack/ml/integration/DatafeedJobIT.java
@@ -28,7 +28,7 @@ public class DatafeedJobIT extends ESRestTestCase {
 
     @Before
     public void setUpData() throws Exception {
-        // Create index with source = enabled, doc_values = enabled, stored = false
+        // Create empty index
         String mappings = "{"
                 + "  \"mappings\": {"
                 + "    \"response\": {"
@@ -40,6 +40,20 @@ public class DatafeedJobIT extends ESRestTestCase {
                 + "      }"
                 + "    }"
                 + "}";
+        client().performRequest("put", "airline-data-empty", Collections.emptyMap(), new StringEntity(mappings));
+
+        // Create index with source = enabled, doc_values = enabled, stored = false
+        mappings = "{"
+                + "  \"mappings\": {"
+                + "    \"response\": {"
+                + "      \"properties\": {"
+                + "        \"time stamp\": { \"type\":\"date\"}," // space in 'time stamp' is intentional
+                + "        \"airline\": { \"type\":\"keyword\"},"
+                + "        \"responsetime\": { \"type\":\"float\"}"
+                + "      }"
+                + "    }"
+                + "  }"
+                + "}";
         client().performRequest("put", "airline-data", Collections.emptyMap(), new StringEntity(mappings));
 
         client().performRequest("put", "airline-data/response/1", Collections.emptyMap(),
@@ -103,6 +117,37 @@ public class DatafeedJobIT extends ESRestTestCase {
         client().performRequest("put", "nested-data/response/2", Collections.emptyMap(),
                 new StringEntity("{\"time\":\"2016-06-01T01:59:00Z\",\"responsetime\":{\"millis\":222.0}}"));
 
+        // Create index with multiple docs per time interval for aggregation testing
+        mappings = "{"
+                + "  \"mappings\": {"
+                + "    \"response\": {"
+                + "      \"properties\": {"
+                + "        \"time stamp\": { \"type\":\"date\"}," // space in 'time stamp' is intentional
+                + "        \"airline\": { \"type\":\"keyword\"},"
+                + "        \"responsetime\": { \"type\":\"float\"}"
+                + "      }"
+                + "    }"
+                + "  }"
+                + "}";
+        client().performRequest("put", "airline-data-aggs", Collections.emptyMap(), new StringEntity(mappings));
+
+        client().performRequest("put", "airline-data-aggs/response/1", Collections.emptyMap(),
+                new StringEntity("{\"time stamp\":\"2016-06-01T00:00:00Z\",\"airline\":\"AAA\",\"responsetime\":100.0}"));
+        client().performRequest("put", "airline-data-aggs/response/2", Collections.emptyMap(),
+                new StringEntity("{\"time stamp\":\"2016-06-01T00:01:00Z\",\"airline\":\"AAA\",\"responsetime\":200.0}"));
+        client().performRequest("put", "airline-data-aggs/response/3", Collections.emptyMap(),
+                new StringEntity("{\"time stamp\":\"2016-06-01T00:00:00Z\",\"airline\":\"BBB\",\"responsetime\":1000.0}"));
+        client().performRequest("put", "airline-data-aggs/response/4", Collections.emptyMap(),
+                new StringEntity("{\"time stamp\":\"2016-06-01T00:01:00Z\",\"airline\":\"BBB\",\"responsetime\":2000.0}"));
+        client().performRequest("put", "airline-data-aggs/response/5", Collections.emptyMap(),
+                new StringEntity("{\"time stamp\":\"2016-06-01T01:00:00Z\",\"airline\":\"AAA\",\"responsetime\":300.0}"));
+        client().performRequest("put", "airline-data-aggs/response/6", Collections.emptyMap(),
+                new StringEntity("{\"time stamp\":\"2016-06-01T01:01:00Z\",\"airline\":\"AAA\",\"responsetime\":400.0}"));
+        client().performRequest("put", "airline-data-aggs/response/7", Collections.emptyMap(),
+                new StringEntity("{\"time stamp\":\"2016-06-01T01:00:00Z\",\"airline\":\"BBB\",\"responsetime\":3000.0}"));
+        client().performRequest("put", "airline-data-aggs/response/8", Collections.emptyMap(),
+                new StringEntity("{\"time stamp\":\"2016-06-01T01:01:00Z\",\"airline\":\"BBB\",\"responsetime\":4000.0}"));
+
         // Ensure all data is searchable
         client().performRequest("post", "_refresh");
     }
@@ -140,11 +185,39 @@ public class DatafeedJobIT extends ESRestTestCase {
         executeTestLookbackOnlyWithNestedFields("lookback-8", true);
     }
 
+    public void testLookbackOnlyGivenEmptyIndex() throws Exception {
+        new LookbackOnlyTestHelper("lookback-9", "airline-data-empty").setShouldSucceedInput(false).setShouldSucceedProcessing(false)
+                .execute();
+    }
+
+    public void testLookbackOnlyGivenAggregations() throws Exception {
+        String jobId = "aggs-job";
+        String job = "{\"description\":\"Aggs job\",\"analysis_config\" :{\"bucket_span\":3600,\"summary_count_field_name\":\"doc_count\","
+                + "\"detectors\":[{\"function\":\"mean\",\"field_name\":\"responsetime\",\"by_field_name\":\"airline\"}]},"
+                + "\"data_description\" : {\"time_field\":\"time stamp\"}"
+                + "}";
+        client().performRequest("put", MlPlugin.BASE_PATH + "anomaly_detectors/" + jobId, Collections.emptyMap(), new StringEntity(job));
+
+        String datafeedId = "datafeed-" + jobId;
+        String aggregations = "{\"time stamp\":{\"histogram\":{\"field\":\"time stamp\",\"interval\":3600000},"
+                + "\"aggregations\":{\"airline\":{\"terms\":{\"field\":\"airline\",\"size\":10},"
+                + "\"aggregations\":{\"responsetime\":{\"avg\":{\"field\":\"responsetime\"}}}}}}}";
+        new DatafeedBuilder(datafeedId, jobId, "airline-data-aggs", "response").setAggregations(aggregations).build();
+        openJob(client(), jobId);
+
+        startDatafeedAndWaitUntilStopped(datafeedId);
+        Response jobStatsResponse = client().performRequest("get", MlPlugin.BASE_PATH + "anomaly_detectors/" + jobId + "/_stats");
+        String jobStatsResponseAsString = responseEntityToString(jobStatsResponse);
+        assertThat(jobStatsResponseAsString, containsString("\"input_record_count\":4"));
+        assertThat(jobStatsResponseAsString, containsString("\"processed_record_count\":4"));
+        assertThat(jobStatsResponseAsString, containsString("\"missing_field_count\":0"));
+    }
+
     public void testRealtime() throws Exception {
         String jobId = "job-realtime-1";
         createJob(jobId);
         String datafeedId = jobId + "-datafeed";
-        createDatafeed(datafeedId, jobId, "airline-data", false, false);
+        new DatafeedBuilder(datafeedId, jobId, "airline-data", "response").build();
         openJob(client(), jobId);
 
         Response response = client().performRequest("post",
@@ -222,7 +295,11 @@ public class DatafeedJobIT extends ESRestTestCase {
         public void execute() throws Exception {
             createJob(jobId);
             String datafeedId = "datafeed-" + jobId;
-            createDatafeed(datafeedId, jobId, dataIndex, enableDatafeedSource, addScriptedFields);
+            new DatafeedBuilder(datafeedId, jobId, dataIndex, "response")
+                    .setSource(enableDatafeedSource)
+                    .setScriptedFields(addScriptedFields ?
+                            "{\"airline\":{\"script\":{\"lang\":\"painless\",\"inline\":\"doc['airline'].value\"}}}" : null)
+                    .build();
             openJob(client(), jobId);
 
             startDatafeedAndWaitUntilStopped(datafeedId);
@@ -270,16 +347,6 @@ public class DatafeedJobIT extends ESRestTestCase {
                 Collections.emptyMap(), new StringEntity(job));
     }
 
-    private Response createDatafeed(String datafeedId, String jobId, String dataIndex, boolean source, boolean addScriptedFields)
-            throws IOException {
-        String datafeedConfig = "{" + "\"job_id\": \"" + jobId + "\",\n" + "\"indexes\":[\"" + dataIndex + "\"],\n"
-                + "\"types\":[\"response\"]" + (source ? ",\"_source\":true" : "") + (addScriptedFields ?
-                ",\"script_fields\":{\"airline\":{\"script\":{\"lang\":\"painless\",\"inline\":\"doc['airline'].value\"}}}" : "")
-                +"}";
-        return client().performRequest("put", MlPlugin.BASE_PATH + "datafeeds/" + datafeedId, Collections.emptyMap(),
-                new StringEntity(datafeedConfig));
-    }
-
     private static String responseEntityToString(Response response) throws Exception {
         try (BufferedReader reader = new BufferedReader(new InputStreamReader(response.getEntity().getContent(), StandardCharsets.UTF_8))) {
             return reader.lines().collect(Collectors.joining("\n"));
         }
@@ -298,7 +365,7 @@ public class DatafeedJobIT extends ESRestTestCase {
         client().performRequest("put", MlPlugin.BASE_PATH + "anomaly_detectors/" + jobId, Collections.emptyMap(), new StringEntity(job));
 
         String datafeedId = jobId + "-datafeed";
-        createDatafeed(datafeedId, jobId, "nested-data", source, false);
+        new DatafeedBuilder(datafeedId, jobId, "nested-data", "response").setSource(source).build();
         openJob(client(), jobId);
 
         startDatafeedAndWaitUntilStopped(datafeedId);
@@ -313,4 +380,47 @@ public class DatafeedJobIT extends ESRestTestCase {
     public void clearMlState() throws Exception {
         new MlRestTestStateCleaner(client(), this).clearMlMetadata();
     }
+
+    private static class DatafeedBuilder {
+        String datafeedId;
+        String jobId;
+        String index;
+        String type;
+        boolean source;
+        String scriptedFields;
+        String aggregations;
+
+        DatafeedBuilder(String datafeedId, String jobId, String index, String type) {
+            this.datafeedId = datafeedId;
+            this.jobId = jobId;
+            this.index = index;
+            this.type = type;
+        }
+
+        DatafeedBuilder setSource(boolean enableSource) {
+            this.source = enableSource;
+            return this;
+        }
+
+        DatafeedBuilder setScriptedFields(String scriptedFields) {
+            this.scriptedFields = scriptedFields;
+            return this;
+        }
+
+        DatafeedBuilder setAggregations(String aggregations) {
+            this.aggregations = aggregations;
+            return this;
+        }
+
+        Response build() throws IOException {
+            String datafeedConfig = "{"
+                    + "\"job_id\": \"" + jobId + "\",\"indexes\":[\"" + index + "\"],\"types\":[\"" + type + "\"]"
+                    + (source ? ",\"_source\":true" : "")
+                    + (scriptedFields == null ? "" : ",\"script_fields\":" + scriptedFields)
+                    + (aggregations == null ? "" : ",\"aggs\":" + aggregations)
+                    + "}";
+            return client().performRequest("put", MlPlugin.BASE_PATH + "datafeeds/" + datafeedId, Collections.emptyMap(),
+                    new StringEntity(datafeedConfig));
+        }
+    }
 }