Handle manual aggregations in datafeeds (elastic/elasticsearch#784)

* Handle manual aggregations in datafeeds

Adds a DataExtractor implementation that runs aggregated searches.

The manual aggregations supported have the following limitations:

- each aggregation can hava 0 or 1 sub-aggregations
- the top aggregation has to be a histogram
- sub-aggregations have to be either terms aggregations or single value
metric aggregations.

The response is converted into flat JSON documents that contain only the
fields of interest and can be parsed without additional context from our
JSON parser. The fields in the JSON documents correspond to the names of the aggregations.

Closes elastic/elasticsearch#680

Original commit: elastic/x-pack-elasticsearch@7dfd2d31e6
This commit is contained in:
Dimitris Athanasiou 2017-01-25 19:13:03 +00:00 committed by GitHub
parent 716f543f7b
commit 86291c12e2
16 changed files with 890 additions and 42 deletions

View File

@ -422,6 +422,9 @@ public class DatafeedConfig extends ToXContentToBytes implements Writeable {
if (types == null || types.isEmpty() || types.contains(null) || types.contains("")) {
throw invalidOptionValue(TYPES.getPreferredName(), types);
}
if (aggregations != null && (scriptFields != null && !scriptFields.isEmpty())) {
throw new IllegalArgumentException(Messages.getMessage(Messages.DATAFEED_CONFIG_CANNOT_USE_SCRIPT_FIELDS_WITH_AGGS));
}
return new DatafeedConfig(id, jobId, queryDelay, frequency, indexes, types, query, aggregations, scriptFields, scrollSize,
source);
}

View File

@ -21,6 +21,7 @@ import org.elasticsearch.xpack.ml.action.InternalStartDatafeedAction;
import org.elasticsearch.xpack.ml.action.UpdateDatafeedStatusAction;
import org.elasticsearch.xpack.ml.action.util.QueryPage;
import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory;
import org.elasticsearch.xpack.ml.datafeed.extractor.aggregation.AggregationDataExtractorFactory;
import org.elasticsearch.xpack.ml.datafeed.extractor.scroll.ScrollDataExtractorFactory;
import org.elasticsearch.xpack.ml.job.config.DataDescription;
import org.elasticsearch.xpack.ml.job.config.DefaultFrequency;
@ -198,7 +199,8 @@ public class DatafeedJobRunner extends AbstractComponent {
}
DataExtractorFactory createDataExtractorFactory(DatafeedConfig datafeedConfig, Job job) {
return new ScrollDataExtractorFactory(client, datafeedConfig, job);
return datafeedConfig.getAggregations() == null ? new ScrollDataExtractorFactory(client, datafeedConfig, job)
: new AggregationDataExtractorFactory(client, datafeedConfig, job);
}
private static DataDescription buildDataDescription(Job job) {

View File

@ -0,0 +1,28 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.datafeed.extractor;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.RangeQueryBuilder;
/**
* Collects common utility methods needed by various {@link DataExtractor} implementations
*/
public final class ExtractorUtils {
private static final String EPOCH_MILLIS = "epoch_millis";
private ExtractorUtils() {}
/**
* Combines a user query with a time range query.
*/
public static QueryBuilder wrapInTimeRangeQuery(QueryBuilder userQuery, String timeField, long start, long end) {
QueryBuilder timeQuery = new RangeQueryBuilder(timeField).gte(start).lt(end).format(EPOCH_MILLIS);
return new BoolQueryBuilder().filter(userQuery).filter(timeQuery);
}
}

View File

@ -0,0 +1,112 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.datafeed.extractor.aggregation;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.search.SearchAction;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.aggregations.Aggregation;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractor;
import org.elasticsearch.xpack.ml.datafeed.extractor.ExtractorUtils;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Optional;
/**
* An implementation that extracts data from elasticsearch using search with aggregations on a client.
* Cancellation is effective only when it is called before the first time {@link #next()} is called.
* Note that this class is NOT thread-safe.
*/
class AggregationDataExtractor implements DataExtractor {
private static final Logger LOGGER = Loggers.getLogger(AggregationDataExtractor.class);
private final Client client;
private final AggregationDataExtractorContext context;
private boolean hasNext;
private boolean isCancelled;
public AggregationDataExtractor(Client client, AggregationDataExtractorContext dataExtractorContext) {
this.client = Objects.requireNonNull(client);
this.context = Objects.requireNonNull(dataExtractorContext);
this.hasNext = true;
}
@Override
public boolean hasNext() {
return hasNext && !isCancelled;
}
@Override
public boolean isCancelled() {
return isCancelled;
}
@Override
public void cancel() {
LOGGER.trace("[{}] Data extractor received cancel request", context.jobId);
isCancelled = true;
}
@Override
public Optional<InputStream> next() throws IOException {
if (!hasNext()) {
throw new NoSuchElementException();
}
Optional<InputStream> stream = Optional.ofNullable(search());
hasNext = false;
return stream;
}
private InputStream search() throws IOException {
SearchResponse searchResponse = executeSearchRequest(buildSearchRequest());
if (searchResponse.status() != RestStatus.OK) {
throw new IOException("[" + context.jobId + "] Search request returned status code: " + searchResponse.status()
+ ". Response was:\n" + searchResponse.toString());
}
return processSearchResponse(searchResponse);
}
protected SearchResponse executeSearchRequest(SearchRequestBuilder searchRequestBuilder) {
return searchRequestBuilder.get();
}
private SearchRequestBuilder buildSearchRequest() {
SearchRequestBuilder searchRequestBuilder = SearchAction.INSTANCE.newRequestBuilder(client)
.addSort(context.timeField, SortOrder.ASC)
.setIndices(context.indexes)
.setTypes(context.types)
.setSize(0)
.setQuery(ExtractorUtils.wrapInTimeRangeQuery(context.query, context.timeField, context.start, context.end));
context.aggs.getAggregatorFactories().forEach(a -> searchRequestBuilder.addAggregation(a));
context.aggs.getPipelineAggregatorFactories().forEach(a -> searchRequestBuilder.addAggregation(a));
return searchRequestBuilder;
}
private InputStream processSearchResponse(SearchResponse searchResponse) throws IOException {
if (searchResponse.getAggregations() == null) {
return null;
}
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
try (AggregationToJsonProcessor processor = new AggregationToJsonProcessor(outputStream)) {
for (Aggregation agg : searchResponse.getAggregations().asList()) {
processor.process(agg);
}
}
return new ByteArrayInputStream(outputStream.toByteArray());
}
}

View File

@ -0,0 +1,36 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.datafeed.extractor.aggregation;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import java.util.List;
import java.util.Objects;
class AggregationDataExtractorContext {
final String jobId;
final String timeField;
final String[] indexes;
final String[] types;
final QueryBuilder query;
final AggregatorFactories.Builder aggs;
final long start;
final long end;
public AggregationDataExtractorContext(String jobId, String timeField, List<String> indexes, List<String> types, QueryBuilder query,
AggregatorFactories.Builder aggs, long start, long end) {
this.jobId = Objects.requireNonNull(jobId);
this.timeField = Objects.requireNonNull(timeField);
this.indexes = indexes.toArray(new String[indexes.size()]);
this.types = types.toArray(new String[types.size()]);
this.query = Objects.requireNonNull(query);
this.aggs = Objects.requireNonNull(aggs);
this.start = start;
this.end = end;
}
}

View File

@ -0,0 +1,41 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.datafeed.extractor.aggregation;
import org.elasticsearch.client.Client;
import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractor;
import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractorFactory;
import org.elasticsearch.xpack.ml.job.config.Job;
import java.util.Objects;
public class AggregationDataExtractorFactory implements DataExtractorFactory {
private final Client client;
private final DatafeedConfig datafeedConfig;
private final Job job;
public AggregationDataExtractorFactory(Client client, DatafeedConfig datafeedConfig, Job job) {
this.client = Objects.requireNonNull(client);
this.datafeedConfig = Objects.requireNonNull(datafeedConfig);
this.job = Objects.requireNonNull(job);
}
@Override
public DataExtractor newExtractor(long start, long end) {
AggregationDataExtractorContext dataExtractorContext = new AggregationDataExtractorContext(
job.getId(),
job.getDataDescription().getTimeField(),
datafeedConfig.getIndexes(),
datafeedConfig.getTypes(),
datafeedConfig.getQuery(),
datafeedConfig.getAggregations(),
start,
end);
return new AggregationDataExtractor(client, dataExtractorContext);
}
}

View File

@ -0,0 +1,105 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.datafeed.extractor.aggregation;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.search.aggregations.Aggregation;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.bucket.histogram.Histogram;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.aggregations.metrics.NumericMetricsAggregation;
import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
/**
* Processes {@link Aggregation} objects and writes flat JSON documents for each leaf aggregation.
*/
class AggregationToJsonProcessor implements Releasable {
private final XContentBuilder jsonBuilder;
private final Map<String, Object> keyValuePairs;
public AggregationToJsonProcessor(OutputStream outputStream) throws IOException {
jsonBuilder = new XContentBuilder(JsonXContent.jsonXContent, outputStream);
keyValuePairs = new LinkedHashMap<>();
}
/**
* Processes an {@link Aggregation} and writes a flat JSON document for each of its leaf aggregations.
* It expects aggregations to have 0..1 sub-aggregations.
* It expects the top level aggregation to be {@link Histogram}.
* It expects that all sub-aggregations of the top level are either {@link Terms} or {@link NumericMetricsAggregation.SingleValue}.
*/
public void process(Aggregation aggregation) throws IOException {
if (aggregation instanceof Histogram) {
processHistogram((Histogram) aggregation);
} else {
throw new IllegalArgumentException("Top level aggregation should be [histogram]");
}
}
private void processHistogram(Histogram histogram) throws IOException {
for (Histogram.Bucket bucket : histogram.getBuckets()) {
keyValuePairs.put(histogram.getName(), bucket.getKey());
processNestedAggs(bucket.getDocCount(), bucket.getAggregations());
}
}
private void processNestedAggs(long docCount, Aggregations subAggs) throws IOException {
List<Aggregation> aggs = subAggs == null ? Collections.emptyList() : subAggs.asList();
if (aggs.isEmpty()) {
writeJsonObject(docCount);
return;
}
if (aggs.size() > 1) {
throw new IllegalArgumentException("Multiple nested aggregations are not supported");
}
Aggregation nestedAgg = aggs.get(0);
if (nestedAgg instanceof Terms) {
processTerms((Terms) nestedAgg);
} else if (nestedAgg instanceof NumericMetricsAggregation.SingleValue) {
processSingleValue(docCount, (NumericMetricsAggregation.SingleValue) nestedAgg);
} else {
throw new IllegalArgumentException("Unsupported aggregation type [" + nestedAgg.getName() + "]");
}
}
private void processTerms(Terms termsAgg) throws IOException {
for (Terms.Bucket bucket : termsAgg.getBuckets()) {
keyValuePairs.put(termsAgg.getName(), bucket.getKey());
processNestedAggs(bucket.getDocCount(), bucket.getAggregations());
}
}
private void processSingleValue(long docCount, NumericMetricsAggregation.SingleValue singleValue) throws IOException {
keyValuePairs.put(singleValue.getName(), singleValue.value());
writeJsonObject(docCount);
}
private void writeJsonObject(long docCount) throws IOException {
if (docCount > 0) {
jsonBuilder.startObject();
for (Map.Entry<String, Object> keyValue : keyValuePairs.entrySet()) {
jsonBuilder.field(keyValue.getKey(), keyValue.getValue());
}
jsonBuilder.field(DatafeedConfig.DOC_COUNT, docCount);
jsonBuilder.endObject();
}
}
@Override
public void close() {
jsonBuilder.close();
}
}

View File

@ -14,14 +14,11 @@ import org.elasticsearch.action.search.SearchScrollAction;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.RangeQueryBuilder;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractor;
import org.elasticsearch.xpack.ml.datafeed.extractor.ExtractorUtils;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
@ -104,7 +101,8 @@ class ScrollDataExtractor implements DataExtractor {
.setIndices(context.indexes)
.setTypes(context.types)
.setSize(context.scrollSize)
.setQuery(createQuery());
.setQuery(ExtractorUtils.wrapInTimeRangeQuery(
context.query, context.extractedFields.timeField(), context.start, context.end));
for (String docValueField : context.extractedFields.getDocValueFields()) {
searchRequestBuilder.addDocValueField(docValueField);
@ -115,10 +113,7 @@ class ScrollDataExtractor implements DataExtractor {
} else {
searchRequestBuilder.setFetchSource(sourceFields, null);
}
for (SearchSourceBuilder.ScriptField scriptField : context.scriptFields) {
searchRequestBuilder.addScriptField(scriptField.fieldName(), scriptField.script());
}
context.scriptFields.forEach(f -> searchRequestBuilder.addScriptField(f.fieldName(), f.script()));
return searchRequestBuilder;
}
@ -166,15 +161,6 @@ class ScrollDataExtractor implements DataExtractor {
.get();
}
private QueryBuilder createQuery() {
QueryBuilder userQuery = context.query;
QueryBuilder timeQuery = new RangeQueryBuilder(context.extractedFields.timeField())
.gte(context.start)
.lt(context.end)
.format("epoch_millis");
return new BoolQueryBuilder().filter(userQuery).filter(timeQuery);
}
void clearScroll(String scrollId) {
ClearScrollAction.INSTANCE.newRequestBuilder(client).addScrollId(scrollId).get();
}

View File

@ -170,6 +170,7 @@ public final class Messages {
public static final String JOB_DATA_CONCURRENT_USE_UPLOAD = "job.data.concurrent.use.upload";
public static final String DATAFEED_CONFIG_INVALID_OPTION_VALUE = "datafeed.config.invalid.option.value";
public static final String DATAFEED_CONFIG_CANNOT_USE_SCRIPT_FIELDS_WITH_AGGS = "datafeed.config.cannot.use.script.fields.with.aggs";
public static final String DATAFEED_DOES_NOT_SUPPORT_JOB_WITH_LATENCY = "datafeed.does.not.support.job.with.latency";
public static final String DATAFEED_AGGREGATIONS_REQUIRES_JOB_WITH_SUMMARY_COUNT_FIELD =

View File

@ -116,6 +116,7 @@ job.config.update.model.snapshot.retention.days.invalid = Invalid update value f
job.config.update.results.retention.days.invalid = Invalid update value for results_retention_days: value must be an exact number of days
job.config.update.datafeed.config.parse.error = JSON parse error reading the update value for datafeed_config
job.config.update.datafeed.config.cannot.be.null = Invalid update value for datafeed_config: null
datafeed.config.cannot.use.script.fields.with.aggs = script_fields cannot be used in combination with aggregations
job.config.unknown.function = Unknown function ''{0}''

View File

@ -40,8 +40,8 @@ public class GetDatafeedsActionResponseTests extends AbstractStreamableTestCase<
if (randomBoolean()) {
datafeedConfig.setQuery(QueryBuilders.termQuery(randomAsciiOfLength(10), randomAsciiOfLength(10)));
}
if (randomBoolean()) {
int scriptsSize = randomInt(3);
if (randomBoolean()) {
List<SearchSourceBuilder.ScriptField> scriptFields = new ArrayList<>(scriptsSize);
for (int scriptIndex = 0; scriptIndex < scriptsSize; scriptIndex++) {
scriptFields.add(new SearchSourceBuilder.ScriptField(randomAsciiOfLength(10), new Script(randomAsciiOfLength(10)),
@ -52,7 +52,7 @@ public class GetDatafeedsActionResponseTests extends AbstractStreamableTestCase<
if (randomBoolean()) {
datafeedConfig.setScrollSize(randomIntBetween(0, Integer.MAX_VALUE));
}
if (randomBoolean()) {
if (randomBoolean() && scriptsSize == 0) {
AggregatorFactories.Builder aggsBuilder = new AggregatorFactories.Builder();
aggsBuilder.addAggregator(AggregationBuilders.avg(randomAsciiOfLength(10)));
datafeedConfig.setAggregations(aggsBuilder);

View File

@ -23,6 +23,8 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import static org.hamcrest.Matchers.equalTo;
public class DatafeedConfigTests extends AbstractSerializingTestCase<DatafeedConfig> {
@Override
@ -37,7 +39,13 @@ public class DatafeedConfigTests extends AbstractSerializingTestCase<DatafeedCon
if (randomBoolean()) {
builder.setQuery(QueryBuilders.termQuery(randomAsciiOfLength(10), randomAsciiOfLength(10)));
}
if (randomBoolean()) {
int scriptsSize = randomInt(3);
List<SearchSourceBuilder.ScriptField> scriptFields = new ArrayList<>(scriptsSize);
for (int scriptIndex = 0; scriptIndex < scriptsSize; scriptIndex++) {
scriptFields.add(new SearchSourceBuilder.ScriptField(randomAsciiOfLength(10), new Script(randomAsciiOfLength(10)),
randomBoolean()));
}
if (randomBoolean() && scriptsSize == 0) {
// can only test with a single agg as the xcontent order gets randomized by test base class and then
// the actual xcontent isn't the same and test fail.
// Testing with a single agg is ok as we don't have special list writeable / xconent logic
@ -45,12 +53,6 @@ public class DatafeedConfigTests extends AbstractSerializingTestCase<DatafeedCon
aggs.addAggregator(AggregationBuilders.avg(randomAsciiOfLength(10)).field(randomAsciiOfLength(10)));
builder.setAggregations(aggs);
}
int scriptsSize = randomInt(3);
List<SearchSourceBuilder.ScriptField> scriptFields = new ArrayList<>(scriptsSize);
for (int scriptIndex = 0; scriptIndex < scriptsSize; scriptIndex++) {
scriptFields.add(new SearchSourceBuilder.ScriptField(randomAsciiOfLength(10), new Script(randomAsciiOfLength(10)),
randomBoolean()));
}
builder.setScriptFields(scriptFields);
if (randomBoolean()) {
builder.setScrollSize(randomIntBetween(0, Integer.MAX_VALUE));
@ -238,6 +240,19 @@ public class DatafeedConfigTests extends AbstractSerializingTestCase<DatafeedCon
assertEquals(Messages.getMessage(Messages.DATAFEED_CONFIG_INVALID_OPTION_VALUE, "scroll_size", -1000L), e.getMessage());
}
public void testBuild_GivenScriptFieldsAndAggregations() {
DatafeedConfig.Builder datafeed = new DatafeedConfig.Builder("datafeed1", "job1");
datafeed.setIndexes(Arrays.asList("my_index"));
datafeed.setTypes(Arrays.asList("my_type"));
datafeed.setScriptFields(Arrays.asList(new SearchSourceBuilder.ScriptField(randomAsciiOfLength(10),
new Script(randomAsciiOfLength(10)), randomBoolean())));
datafeed.setAggregations(new AggregatorFactories.Builder().addAggregator(AggregationBuilders.avg("foo")));
IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> datafeed.build());
assertThat(e.getMessage(), equalTo("script_fields cannot be used in combination with aggregations"));
}
public static String randomValidDatafeedId() {
CodepointSetGenerator generator = new CodepointSetGenerator("abcdefghijklmnopqrstuvwxyz".toCharArray());
return generator.ofCodePointsLength(random(), 10, 10);

View File

@ -0,0 +1,170 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.datafeed.extractor.aggregation;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.aggregations.bucket.histogram.Histogram;
import org.elasticsearch.test.ESTestCase;
import org.junit.Before;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import static org.elasticsearch.xpack.ml.datafeed.extractor.aggregation.AggregationTestUtils.Term;
import static org.elasticsearch.xpack.ml.datafeed.extractor.aggregation.AggregationTestUtils.createHistogramBucket;
import static org.elasticsearch.xpack.ml.datafeed.extractor.aggregation.AggregationTestUtils.createTerms;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.stringContainsInOrder;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class AggregationDataExtractorTests extends ESTestCase {
private Client client;
private List<SearchRequestBuilder> capturedSearchRequests;
private String jobId;
private String timeField;
private List<String> types;
private List<String> indexes;
private QueryBuilder query;
private AggregatorFactories.Builder aggs;
private class TestDataExtractor extends AggregationDataExtractor {
private SearchResponse nextResponse;
public TestDataExtractor(long start, long end) {
super(client, createContext(start, end));
}
@Override
protected SearchResponse executeSearchRequest(SearchRequestBuilder searchRequestBuilder) {
capturedSearchRequests.add(searchRequestBuilder);
return nextResponse;
}
void setNextResponse(SearchResponse searchResponse) {
nextResponse = searchResponse;
}
}
@Before
public void setUpTests() {
client = mock(Client.class);
capturedSearchRequests = new ArrayList<>();
jobId = "test-job";
timeField = "time";
indexes = Arrays.asList("index-1", "index-2");
types = Arrays.asList("type-1", "type-2");
query = QueryBuilders.matchAllQuery();
aggs = new AggregatorFactories.Builder()
.addAggregator(AggregationBuilders.histogram("time").field("time").subAggregation(
AggregationBuilders.terms("airline").field("airline").subAggregation(
AggregationBuilders.avg("responsetime").field("responsetime"))));
}
public void testExtraction() throws IOException {
List<Histogram.Bucket> histogramBuckets = Arrays.asList(
createHistogramBucket(1000L, 3, Arrays.asList(
createTerms("airline", new Term("a", 1, "responsetime", 11.0), new Term("b", 2, "responsetime", 12.0)))),
createHistogramBucket(2000L, 0, Arrays.asList()),
createHistogramBucket(3000L, 7, Arrays.asList(
createTerms("airline", new Term("c", 4, "responsetime", 31.0), new Term("b", 3, "responsetime", 32.0))))
);
TestDataExtractor extractor = new TestDataExtractor(1000L, 4000L);
SearchResponse response = createSearchResponse("time", histogramBuckets);
extractor.setNextResponse(response);
assertThat(extractor.hasNext(), is(true));
Optional<InputStream> stream = extractor.next();
assertThat(stream.isPresent(), is(true));
String expectedStream = "{\"time\":1000,\"airline\":\"a\",\"responsetime\":11.0,\"doc_count\":1} "
+ "{\"time\":1000,\"airline\":\"b\",\"responsetime\":12.0,\"doc_count\":2} "
+ "{\"time\":3000,\"airline\":\"c\",\"responsetime\":31.0,\"doc_count\":4} "
+ "{\"time\":3000,\"airline\":\"b\",\"responsetime\":32.0,\"doc_count\":3}";
assertThat(asString(stream.get()), equalTo(expectedStream));
assertThat(extractor.hasNext(), is(false));
assertThat(capturedSearchRequests.size(), equalTo(1));
String searchRequest = capturedSearchRequests.get(0).toString().replaceAll("\\s", "");
assertThat(searchRequest, containsString("\"size\":0"));
assertThat(searchRequest, containsString("\"query\":{\"bool\":{\"filter\":[{\"match_all\":{\"boost\":1.0}}," +
"{\"range\":{\"time\":{\"from\":1000,\"to\":4000,\"include_lower\":true,\"include_upper\":false," +
"\"format\":\"epoch_millis\",\"boost\":1.0}}}]"));
assertThat(searchRequest, containsString("\"sort\":[{\"time\":{\"order\":\"asc\"}}]"));
assertThat(searchRequest,
stringContainsInOrder(Arrays.asList("aggregations", "histogram", "time", "terms", "airline", "avg", "responsetime")));
}
public void testExtractionGivenCancelBeforeNext() throws IOException {
TestDataExtractor extractor = new TestDataExtractor(1000L, 4000L);
SearchResponse response = createSearchResponse("time", Collections.emptyList());
extractor.setNextResponse(response);
extractor.cancel();
assertThat(extractor.hasNext(), is(false));
}
public void testExtractionGivenSearchResponseHasError() throws IOException {
TestDataExtractor extractor = new TestDataExtractor(1000L, 2000L);
extractor.setNextResponse(createErrorResponse());
assertThat(extractor.hasNext(), is(true));
expectThrows(IOException.class, () -> extractor.next());
}
private AggregationDataExtractorContext createContext(long start, long end) {
return new AggregationDataExtractorContext(jobId, timeField, indexes, types, query, aggs, start, end);
}
private SearchResponse createSearchResponse(String histogramName, List<Histogram.Bucket> histogramBuckets) {
SearchResponse searchResponse = mock(SearchResponse.class);
when(searchResponse.status()).thenReturn(RestStatus.OK);
when(searchResponse.getScrollId()).thenReturn(randomAsciiOfLength(1000));
Histogram histogram = mock(Histogram.class);
when(histogram.getName()).thenReturn(histogramName);
when(histogram.getBuckets()).thenReturn(histogramBuckets);
Aggregations searchAggs = mock(Aggregations.class);
when(searchAggs.asList()).thenReturn(Arrays.asList(histogram));
when(searchResponse.getAggregations()).thenReturn(searchAggs);
return searchResponse;
}
private SearchResponse createErrorResponse() {
SearchResponse searchResponse = mock(SearchResponse.class);
when(searchResponse.status()).thenReturn(RestStatus.INTERNAL_SERVER_ERROR);
return searchResponse;
}
private static String asString(InputStream inputStream) throws IOException {
try (BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8))) {
return reader.lines().collect(Collectors.joining("\n"));
}
}
}

View File

@ -0,0 +1,89 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.datafeed.extractor.aggregation;
import org.elasticsearch.search.aggregations.Aggregation;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.bucket.histogram.Histogram;
import org.elasticsearch.search.aggregations.bucket.terms.StringTerms;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.aggregations.metrics.NumericMetricsAggregation;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public final class AggregationTestUtils {
private AggregationTestUtils() {}
static Histogram.Bucket createHistogramBucket(long timestamp, long docCount, List<Aggregation> subAggregations) {
Histogram.Bucket bucket = createHistogramBucket(timestamp, docCount);
Aggregations aggs = createAggs(subAggregations);
when(bucket.getAggregations()).thenReturn(aggs);
return bucket;
}
static Aggregations createAggs(List<Aggregation> aggsList) {
Aggregations aggs = mock(Aggregations.class);
when(aggs.asList()).thenReturn(aggsList);
return aggs;
}
static Histogram.Bucket createHistogramBucket(long timestamp, long docCount) {
Histogram.Bucket bucket = mock(Histogram.Bucket.class);
when(bucket.getKey()).thenReturn(timestamp);
when(bucket.getDocCount()).thenReturn(docCount);
return bucket;
}
static NumericMetricsAggregation.SingleValue createSingleValue(String name, double value) {
NumericMetricsAggregation.SingleValue singleValue = mock(NumericMetricsAggregation.SingleValue.class);
when(singleValue.getName()).thenReturn(name);
when(singleValue.value()).thenReturn(value);
return singleValue;
}
static Terms createTerms(String name, Term... terms) {
Terms termsAgg = mock(Terms.class);
when(termsAgg.getName()).thenReturn(name);
List<Terms.Bucket> buckets = new ArrayList<>();
for (Term term: terms) {
StringTerms.Bucket bucket = mock(StringTerms.Bucket.class);
when(bucket.getKey()).thenReturn(term.key);
when(bucket.getDocCount()).thenReturn(term.count);
if (term.value != null) {
NumericMetricsAggregation.SingleValue termValue = createSingleValue(term.valueName, term.value);
Aggregations aggs = createAggs(Arrays.asList(termValue));
when(bucket.getAggregations()).thenReturn(aggs);
}
buckets.add(bucket);
}
when(termsAgg.getBuckets()).thenReturn(buckets);
return termsAgg;
}
static class Term {
String key;
long count;
String valueName;
Double value;
Term(String key, long count) {
this(key, count, null, null);
}
Term(String key, long count, String valueName, Double value) {
this.key = key;
this.count = count;
this.valueName = valueName;
this.value = value;
}
}
}

View File

@ -0,0 +1,149 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml.datafeed.extractor.aggregation;
import org.elasticsearch.search.aggregations.Aggregation;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.bucket.histogram.Histogram;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.test.ESTestCase;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import static org.elasticsearch.xpack.ml.datafeed.extractor.aggregation.AggregationTestUtils.Term;
import static org.elasticsearch.xpack.ml.datafeed.extractor.aggregation.AggregationTestUtils.createAggs;
import static org.elasticsearch.xpack.ml.datafeed.extractor.aggregation.AggregationTestUtils.createHistogramBucket;
import static org.elasticsearch.xpack.ml.datafeed.extractor.aggregation.AggregationTestUtils.createSingleValue;
import static org.elasticsearch.xpack.ml.datafeed.extractor.aggregation.AggregationTestUtils.createTerms;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class AggregationToJsonProcessorTests extends ESTestCase {
public void testProcessGivenHistogramOnly() throws IOException {
List<Histogram.Bucket> histogramBuckets = Arrays.asList(
createHistogramBucket(1000L, 3),
createHistogramBucket(2000L, 5)
);
Histogram histogram = mock(Histogram.class);
when(histogram.getName()).thenReturn("time");
when(histogram.getBuckets()).thenReturn(histogramBuckets);
String json = aggToString(histogram);
assertThat(json, equalTo("{\"time\":1000,\"doc_count\":3} {\"time\":2000,\"doc_count\":5}"));
}
public void testProcessGivenSingleMetricPerHistogram() throws IOException {
List<Histogram.Bucket> histogramBuckets = Arrays.asList(
createHistogramBucket(1000L, 3, Arrays.asList(createSingleValue("my_value", 1.0))),
createHistogramBucket(2000L, 5, Arrays.asList(createSingleValue("my_value", 2.0)))
);
Histogram histogram = mock(Histogram.class);
when(histogram.getName()).thenReturn("time");
when(histogram.getBuckets()).thenReturn(histogramBuckets);
String json = aggToString(histogram);
assertThat(json, equalTo("{\"time\":1000,\"my_value\":1.0,\"doc_count\":3} {\"time\":2000,\"my_value\":2.0,\"doc_count\":5}"));
}
public void testProcessGivenTermsPerHistogram() throws IOException {
List<Histogram.Bucket> histogramBuckets = Arrays.asList(
createHistogramBucket(1000L, 4, Arrays.asList(
createTerms("my_field", new Term("a", 1), new Term("b", 2), new Term("c", 1)))),
createHistogramBucket(2000L, 5, Arrays.asList(createTerms("my_field", new Term("a", 5), new Term("b", 2)))),
createHistogramBucket(3000L, 0, Arrays.asList()),
createHistogramBucket(4000L, 7, Arrays.asList(createTerms("my_field", new Term("c", 4), new Term("b", 3))))
);
Histogram histogram = mock(Histogram.class);
when(histogram.getName()).thenReturn("time");
when(histogram.getBuckets()).thenReturn(histogramBuckets);
String json = aggToString(histogram);
assertThat(json, equalTo("{\"time\":1000,\"my_field\":\"a\",\"doc_count\":1} " +
"{\"time\":1000,\"my_field\":\"b\",\"doc_count\":2} " +
"{\"time\":1000,\"my_field\":\"c\",\"doc_count\":1} " +
"{\"time\":2000,\"my_field\":\"a\",\"doc_count\":5} " +
"{\"time\":2000,\"my_field\":\"b\",\"doc_count\":2} " +
"{\"time\":4000,\"my_field\":\"c\",\"doc_count\":4} " +
"{\"time\":4000,\"my_field\":\"b\",\"doc_count\":3}"));
}
public void testProcessGivenSingleMetricPerSingleTermsPerHistogram() throws IOException {
List<Histogram.Bucket> histogramBuckets = Arrays.asList(
createHistogramBucket(1000L, 4, Arrays.asList(createTerms("my_field",
new Term("a", 1, "my_value", 11.0), new Term("b", 2, "my_value", 12.0), new Term("c", 1, "my_value", 13.0)))),
createHistogramBucket(2000L, 5, Arrays.asList(createTerms("my_field",
new Term("a", 5, "my_value", 21.0), new Term("b", 2, "my_value", 22.0)))),
createHistogramBucket(3000L, 0, Arrays.asList()),
createHistogramBucket(4000L, 7, Arrays.asList(createTerms("my_field",
new Term("c", 4, "my_value", 41.0), new Term("b", 3, "my_value", 42.0))))
);
Histogram histogram = mock(Histogram.class);
when(histogram.getName()).thenReturn("time");
when(histogram.getBuckets()).thenReturn(histogramBuckets);
String json = aggToString(histogram);
assertThat(json, equalTo("{\"time\":1000,\"my_field\":\"a\",\"my_value\":11.0,\"doc_count\":1} " +
"{\"time\":1000,\"my_field\":\"b\",\"my_value\":12.0,\"doc_count\":2} " +
"{\"time\":1000,\"my_field\":\"c\",\"my_value\":13.0,\"doc_count\":1} " +
"{\"time\":2000,\"my_field\":\"a\",\"my_value\":21.0,\"doc_count\":5} " +
"{\"time\":2000,\"my_field\":\"b\",\"my_value\":22.0,\"doc_count\":2} " +
"{\"time\":4000,\"my_field\":\"c\",\"my_value\":41.0,\"doc_count\":4} " +
"{\"time\":4000,\"my_field\":\"b\",\"my_value\":42.0,\"doc_count\":3}"));
}
public void testProcessGivenTopLevelAggIsNotHistogram() throws IOException {
Terms terms = mock(Terms.class);
IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> aggToString(terms));
assertThat(e.getMessage(), containsString("Top level aggregation should be [histogram]"));
}
public void testProcessGivenUnsupportedAggregationUnderHistogram() throws IOException {
Histogram.Bucket histogramBucket = createHistogramBucket(1000L, 2);
Histogram anotherHistogram = mock(Histogram.class);
when(anotherHistogram.getName()).thenReturn("nested-agg");
Aggregations subAggs = createAggs(Arrays.asList(anotherHistogram));
when(histogramBucket.getAggregations()).thenReturn(subAggs);
Histogram histogram = mock(Histogram.class);
when(histogram.getName()).thenReturn("buckets");
when(histogram.getBuckets()).thenReturn(Arrays.asList(histogramBucket));
IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> aggToString(histogram));
assertThat(e.getMessage(), containsString("Unsupported aggregation type [nested-agg]"));
}
public void testProcessGivenMultipleNestedAggregations() throws IOException {
Histogram.Bucket histogramBucket = createHistogramBucket(1000L, 2);
Terms terms1 = mock(Terms.class);
Terms terms2 = mock(Terms.class);
Aggregations subAggs = createAggs(Arrays.asList(terms1, terms2));
when(histogramBucket.getAggregations()).thenReturn(subAggs);
Histogram histogram = mock(Histogram.class);
when(histogram.getName()).thenReturn("buckets");
when(histogram.getBuckets()).thenReturn(Arrays.asList(histogramBucket));
IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> aggToString(histogram));
assertThat(e.getMessage(), containsString("Multiple nested aggregations are not supported"));
}
private String aggToString(Aggregation aggregation) throws IOException {
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
try (AggregationToJsonProcessor processor = new AggregationToJsonProcessor(outputStream)) {
processor.process(aggregation);
}
return outputStream.toString(StandardCharsets.UTF_8.name());
}
}

View File

@ -28,7 +28,7 @@ public class DatafeedJobIT extends ESRestTestCase {
@Before
public void setUpData() throws Exception {
// Create index with source = enabled, doc_values = enabled, stored = false
// Create empty index
String mappings = "{"
+ " \"mappings\": {"
+ " \"response\": {"
@ -40,6 +40,20 @@ public class DatafeedJobIT extends ESRestTestCase {
+ " }"
+ " }"
+ "}";
client().performRequest("put", "airline-data-empty", Collections.emptyMap(), new StringEntity(mappings));
// Create index with source = enabled, doc_values = enabled, stored = false
mappings = "{"
+ " \"mappings\": {"
+ " \"response\": {"
+ " \"properties\": {"
+ " \"time stamp\": { \"type\":\"date\"}," // space in 'time stamp' is intentional
+ " \"airline\": { \"type\":\"keyword\"},"
+ " \"responsetime\": { \"type\":\"float\"}"
+ " }"
+ " }"
+ " }"
+ "}";
client().performRequest("put", "airline-data", Collections.emptyMap(), new StringEntity(mappings));
client().performRequest("put", "airline-data/response/1", Collections.emptyMap(),
@ -103,6 +117,37 @@ public class DatafeedJobIT extends ESRestTestCase {
client().performRequest("put", "nested-data/response/2", Collections.emptyMap(),
new StringEntity("{\"time\":\"2016-06-01T01:59:00Z\",\"responsetime\":{\"millis\":222.0}}"));
// Create index with multiple docs per time interval for aggregation testing
mappings = "{"
+ " \"mappings\": {"
+ " \"response\": {"
+ " \"properties\": {"
+ " \"time stamp\": { \"type\":\"date\"}," // space in 'time stamp' is intentional
+ " \"airline\": { \"type\":\"keyword\"},"
+ " \"responsetime\": { \"type\":\"float\"}"
+ " }"
+ " }"
+ " }"
+ "}";
client().performRequest("put", "airline-data-aggs", Collections.emptyMap(), new StringEntity(mappings));
client().performRequest("put", "airline-data-aggs/response/1", Collections.emptyMap(),
new StringEntity("{\"time stamp\":\"2016-06-01T00:00:00Z\",\"airline\":\"AAA\",\"responsetime\":100.0}"));
client().performRequest("put", "airline-data-aggs/response/2", Collections.emptyMap(),
new StringEntity("{\"time stamp\":\"2016-06-01T00:01:00Z\",\"airline\":\"AAA\",\"responsetime\":200.0}"));
client().performRequest("put", "airline-data-aggs/response/3", Collections.emptyMap(),
new StringEntity("{\"time stamp\":\"2016-06-01T00:00:00Z\",\"airline\":\"BBB\",\"responsetime\":1000.0}"));
client().performRequest("put", "airline-data-aggs/response/4", Collections.emptyMap(),
new StringEntity("{\"time stamp\":\"2016-06-01T00:01:00Z\",\"airline\":\"BBB\",\"responsetime\":2000.0}"));
client().performRequest("put", "airline-data-aggs/response/5", Collections.emptyMap(),
new StringEntity("{\"time stamp\":\"2016-06-01T01:00:00Z\",\"airline\":\"AAA\",\"responsetime\":300.0}"));
client().performRequest("put", "airline-data-aggs/response/6", Collections.emptyMap(),
new StringEntity("{\"time stamp\":\"2016-06-01T01:01:00Z\",\"airline\":\"AAA\",\"responsetime\":400.0}"));
client().performRequest("put", "airline-data-aggs/response/7", Collections.emptyMap(),
new StringEntity("{\"time stamp\":\"2016-06-01T01:00:00Z\",\"airline\":\"BBB\",\"responsetime\":3000.0}"));
client().performRequest("put", "airline-data-aggs/response/8", Collections.emptyMap(),
new StringEntity("{\"time stamp\":\"2016-06-01T01:01:00Z\",\"airline\":\"BBB\",\"responsetime\":4000.0}"));
// Ensure all data is searchable
client().performRequest("post", "_refresh");
}
@ -140,11 +185,39 @@ public class DatafeedJobIT extends ESRestTestCase {
executeTestLookbackOnlyWithNestedFields("lookback-8", true);
}
public void testLookbackOnlyGivenEmptyIndex() throws Exception {
new LookbackOnlyTestHelper("lookback-9", "airline-data-empty").setShouldSucceedInput(false).setShouldSucceedProcessing(false)
.execute();
}
public void testLookbackOnlyGivenAggregations() throws Exception {
String jobId = "aggs-job";
String job = "{\"description\":\"Aggs job\",\"analysis_config\" :{\"bucket_span\":3600,\"summary_count_field_name\":\"doc_count\","
+ "\"detectors\":[{\"function\":\"mean\",\"field_name\":\"responsetime\",\"by_field_name\":\"airline\"}]},"
+ "\"data_description\" : {\"time_field\":\"time stamp\"}"
+ "}";
client().performRequest("put", MlPlugin.BASE_PATH + "anomaly_detectors/" + jobId, Collections.emptyMap(), new StringEntity(job));
String datafeedId = "datafeed-" + jobId;
String aggregations = "{\"time stamp\":{\"histogram\":{\"field\":\"time stamp\",\"interval\":3600000},"
+ "\"aggregations\":{\"airline\":{\"terms\":{\"field\":\"airline\",\"size\":10},"
+ "\"aggregations\":{\"responsetime\":{\"avg\":{\"field\":\"responsetime\"}}}}}}}";
new DatafeedBuilder(datafeedId, jobId, "airline-data-aggs", "response").setAggregations(aggregations).build();
openJob(client(), jobId);
startDatafeedAndWaitUntilStopped(datafeedId);
Response jobStatsResponse = client().performRequest("get", MlPlugin.BASE_PATH + "anomaly_detectors/" + jobId + "/_stats");
String jobStatsResponseAsString = responseEntityToString(jobStatsResponse);
assertThat(jobStatsResponseAsString, containsString("\"input_record_count\":4"));
assertThat(jobStatsResponseAsString, containsString("\"processed_record_count\":4"));
assertThat(jobStatsResponseAsString, containsString("\"missing_field_count\":0"));
}
public void testRealtime() throws Exception {
String jobId = "job-realtime-1";
createJob(jobId);
String datafeedId = jobId + "-datafeed";
createDatafeed(datafeedId, jobId, "airline-data", false, false);
new DatafeedBuilder(datafeedId, jobId, "airline-data", "response").build();
openJob(client(), jobId);
Response response = client().performRequest("post",
@ -222,7 +295,11 @@ public class DatafeedJobIT extends ESRestTestCase {
public void execute() throws Exception {
createJob(jobId);
String datafeedId = "datafeed-" + jobId;
createDatafeed(datafeedId, jobId, dataIndex, enableDatafeedSource, addScriptedFields);
new DatafeedBuilder(datafeedId, jobId, dataIndex, "response")
.setSource(enableDatafeedSource)
.setScriptedFields(addScriptedFields ?
"{\"airline\":{\"script\":{\"lang\":\"painless\",\"inline\":\"doc['airline'].value\"}}}" : null)
.build();
openJob(client(), jobId);
startDatafeedAndWaitUntilStopped(datafeedId);
@ -270,16 +347,6 @@ public class DatafeedJobIT extends ESRestTestCase {
Collections.emptyMap(), new StringEntity(job));
}
private Response createDatafeed(String datafeedId, String jobId, String dataIndex, boolean source, boolean addScriptedFields)
throws IOException {
String datafeedConfig = "{" + "\"job_id\": \"" + jobId + "\",\n" + "\"indexes\":[\"" + dataIndex + "\"],\n"
+ "\"types\":[\"response\"]" + (source ? ",\"_source\":true" : "") + (addScriptedFields ?
",\"script_fields\":{\"airline\":{\"script\":{\"lang\":\"painless\",\"inline\":\"doc['airline'].value\"}}}" : "")
+"}";
return client().performRequest("put", MlPlugin.BASE_PATH + "datafeeds/" + datafeedId, Collections.emptyMap(),
new StringEntity(datafeedConfig));
}
private static String responseEntityToString(Response response) throws Exception {
try (BufferedReader reader = new BufferedReader(new InputStreamReader(response.getEntity().getContent(), StandardCharsets.UTF_8))) {
return reader.lines().collect(Collectors.joining("\n"));
@ -298,7 +365,7 @@ public class DatafeedJobIT extends ESRestTestCase {
client().performRequest("put", MlPlugin.BASE_PATH + "anomaly_detectors/" + jobId, Collections.emptyMap(), new StringEntity(job));
String datafeedId = jobId + "-datafeed";
createDatafeed(datafeedId, jobId, "nested-data", source, false);
new DatafeedBuilder(datafeedId, jobId, "nested-data", "response").setSource(source).build();
openJob(client(), jobId);
startDatafeedAndWaitUntilStopped(datafeedId);
@ -313,4 +380,47 @@ public class DatafeedJobIT extends ESRestTestCase {
public void clearMlState() throws Exception {
new MlRestTestStateCleaner(client(), this).clearMlMetadata();
}
private static class DatafeedBuilder {
String datafeedId;
String jobId;
String index;
String type;
boolean source;
String scriptedFields;
String aggregations;
DatafeedBuilder(String datafeedId, String jobId, String index, String type) {
this.datafeedId = datafeedId;
this.jobId = jobId;
this.index = index;
this.type = type;
}
DatafeedBuilder setSource(boolean enableSource) {
this.source = enableSource;
return this;
}
DatafeedBuilder setScriptedFields(String scriptedFields) {
this.scriptedFields = scriptedFields;
return this;
}
DatafeedBuilder setAggregations(String aggregations) {
this.aggregations = aggregations;
return this;
}
Response build() throws IOException {
String datafeedConfig = "{"
+ "\"job_id\": \"" + jobId + "\",\"indexes\":[\"" + index + "\"],\"types\":[\"" + type + "\"]"
+ (source ? ",\"_source\":true" : "")
+ (scriptedFields == null ? "" : ",\"script_fields\":" + scriptedFields)
+ (aggregations == null ? "" : ",\"aggs\":" + aggregations)
+ "}";
return client().performRequest("put", MlPlugin.BASE_PATH + "datafeeds/" + datafeedId, Collections.emptyMap(),
new StringEntity(datafeedConfig));
}
}
}