[ML] Extend aggregation support (elastic/x-pack-elasticsearch#705)

* [ML] Allow summary_count_field_name to be set to custom field

... in aggregated datafeeds.

This allows implementing e.g. distinct_count anomaly detection
with aggregations. (see case 1 in elastic/x-pack-elasticsearch#659)

Relates to elastic/x-pack-elasticsearch#659

* [ML] Handle multiple leaf aggregations

This commit allows multiple numeric leaf aggregations.
That enables use cases where the data are presummarised and
thus the summary count field is a sum aggregation on the
event rate field. (see case 2 in elastic/x-pack-elasticsearch#659)

Relates to elastic/x-pack-elasticsearch#659

Original commit: elastic/x-pack-elasticsearch@07a34a18c6
This commit is contained in:
Dimitris Athanasiou 2017-03-09 15:26:44 +00:00 committed by GitHub
parent 1c52495c5a
commit 72f7698647
11 changed files with 306 additions and 56 deletions

View File

@ -5,6 +5,7 @@
*/ */
package org.elasticsearch.xpack.ml.datafeed; package org.elasticsearch.xpack.ml.datafeed;
import org.elasticsearch.common.Strings;
import org.elasticsearch.xpack.ml.job.config.AnalysisConfig; import org.elasticsearch.xpack.ml.job.config.AnalysisConfig;
import org.elasticsearch.xpack.ml.job.config.Job; import org.elasticsearch.xpack.ml.job.config.Job;
import org.elasticsearch.xpack.ml.job.messages.Messages; import org.elasticsearch.xpack.ml.job.messages.Messages;
@ -23,7 +24,7 @@ public final class DatafeedJobValidator {
if (analysisConfig.getLatency() != null && analysisConfig.getLatency() > 0) { if (analysisConfig.getLatency() != null && analysisConfig.getLatency() > 0) {
throw new IllegalArgumentException(Messages.getMessage(Messages.DATAFEED_DOES_NOT_SUPPORT_JOB_WITH_LATENCY)); throw new IllegalArgumentException(Messages.getMessage(Messages.DATAFEED_DOES_NOT_SUPPORT_JOB_WITH_LATENCY));
} }
if (datafeedConfig.hasAggregations() && !DatafeedConfig.DOC_COUNT.equals(analysisConfig.getSummaryCountFieldName())) { if (datafeedConfig.hasAggregations() && Strings.isNullOrEmpty(analysisConfig.getSummaryCountFieldName())) {
throw new IllegalArgumentException( throw new IllegalArgumentException(
Messages.getMessage(Messages.DATAFEED_AGGREGATIONS_REQUIRES_JOB_WITH_SUMMARY_COUNT_FIELD, DatafeedConfig.DOC_COUNT)); Messages.getMessage(Messages.DATAFEED_AGGREGATIONS_REQUIRES_JOB_WITH_SUMMARY_COUNT_FIELD, DatafeedConfig.DOC_COUNT));
} }

View File

@ -98,7 +98,7 @@ class AggregationDataExtractor implements DataExtractor {
return null; return null;
} }
ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
try (AggregationToJsonProcessor processor = new AggregationToJsonProcessor(outputStream)) { try (AggregationToJsonProcessor processor = new AggregationToJsonProcessor(context.includeDocCount, outputStream)) {
for (Aggregation agg : searchResponse.getAggregations().asList()) { for (Aggregation agg : searchResponse.getAggregations().asList()) {
processor.process(agg); processor.process(agg);
} }

View File

@ -21,9 +21,10 @@ class AggregationDataExtractorContext {
final AggregatorFactories.Builder aggs; final AggregatorFactories.Builder aggs;
final long start; final long start;
final long end; final long end;
final boolean includeDocCount;
AggregationDataExtractorContext(String jobId, String timeField, List<String> indexes, List<String> types, QueryBuilder query, AggregationDataExtractorContext(String jobId, String timeField, List<String> indexes, List<String> types, QueryBuilder query,
AggregatorFactories.Builder aggs, long start, long end) { AggregatorFactories.Builder aggs, long start, long end, boolean includeDocCount) {
this.jobId = Objects.requireNonNull(jobId); this.jobId = Objects.requireNonNull(jobId);
this.timeField = Objects.requireNonNull(timeField); this.timeField = Objects.requireNonNull(timeField);
this.indexes = indexes.toArray(new String[indexes.size()]); this.indexes = indexes.toArray(new String[indexes.size()]);
@ -32,5 +33,6 @@ class AggregationDataExtractorContext {
this.aggs = Objects.requireNonNull(aggs); this.aggs = Objects.requireNonNull(aggs);
this.start = start; this.start = start;
this.end = end; this.end = end;
this.includeDocCount = includeDocCount;
} }
} }

View File

@ -35,7 +35,8 @@ public class AggregationDataExtractorFactory implements DataExtractorFactory {
datafeedConfig.getQuery(), datafeedConfig.getQuery(),
datafeedConfig.getAggregations(), datafeedConfig.getAggregations(),
start, start,
end); end,
job.getAnalysisConfig().getSummaryCountFieldName().equals(DatafeedConfig.DOC_COUNT));
return new AggregationDataExtractor(client, dataExtractorContext); return new AggregationDataExtractor(client, dataExtractorContext);
} }
} }

View File

@ -18,6 +18,7 @@ import org.joda.time.base.BaseDateTime;
import java.io.IOException; import java.io.IOException;
import java.io.OutputStream; import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.LinkedHashMap; import java.util.LinkedHashMap;
import java.util.List; import java.util.List;
@ -28,10 +29,12 @@ import java.util.Map;
*/ */
class AggregationToJsonProcessor implements Releasable { class AggregationToJsonProcessor implements Releasable {
private final boolean includeDocCount;
private final XContentBuilder jsonBuilder; private final XContentBuilder jsonBuilder;
private final Map<String, Object> keyValuePairs; private final Map<String, Object> keyValuePairs;
AggregationToJsonProcessor(OutputStream outputStream) throws IOException { AggregationToJsonProcessor(boolean includeDocCount, OutputStream outputStream) throws IOException {
this.includeDocCount = includeDocCount;
jsonBuilder = new XContentBuilder(JsonXContent.jsonXContent, outputStream); jsonBuilder = new XContentBuilder(JsonXContent.jsonXContent, outputStream);
keyValuePairs = new LinkedHashMap<>(); keyValuePairs = new LinkedHashMap<>();
} }
@ -67,16 +70,22 @@ class AggregationToJsonProcessor implements Releasable {
writeJsonObject(docCount); writeJsonObject(docCount);
return; return;
} }
if (aggs.size() > 1) { if (aggs.get(0) instanceof Terms) {
throw new IllegalArgumentException("Multiple nested aggregations are not supported"); if (aggs.size() > 1) {
} throw new IllegalArgumentException("Multiple non-leaf nested aggregations are not supported");
Aggregation nestedAgg = aggs.get(0); }
if (nestedAgg instanceof Terms) { processTerms((Terms) aggs.get(0));
processTerms((Terms) nestedAgg);
} else if (nestedAgg instanceof NumericMetricsAggregation.SingleValue) {
processSingleValue(docCount, (NumericMetricsAggregation.SingleValue) nestedAgg);
} else { } else {
throw new IllegalArgumentException("Unsupported aggregation type [" + nestedAgg.getName() + "]"); List<String> addedKeys = new ArrayList<>();
for (Aggregation nestedAgg : aggs) {
if (nestedAgg instanceof NumericMetricsAggregation.SingleValue) {
addedKeys.add(processSingleValue(docCount, (NumericMetricsAggregation.SingleValue) nestedAgg));
} else {
throw new IllegalArgumentException("Unsupported aggregation type [" + nestedAgg.getName() + "]");
}
}
writeJsonObject(docCount);
addedKeys.forEach(k -> keyValuePairs.remove(k));
} }
} }
@ -84,12 +93,13 @@ class AggregationToJsonProcessor implements Releasable {
for (Terms.Bucket bucket : termsAgg.getBuckets()) { for (Terms.Bucket bucket : termsAgg.getBuckets()) {
keyValuePairs.put(termsAgg.getName(), bucket.getKey()); keyValuePairs.put(termsAgg.getName(), bucket.getKey());
processNestedAggs(bucket.getDocCount(), bucket.getAggregations()); processNestedAggs(bucket.getDocCount(), bucket.getAggregations());
keyValuePairs.remove(termsAgg.getName());
} }
} }
private void processSingleValue(long docCount, NumericMetricsAggregation.SingleValue singleValue) throws IOException { private String processSingleValue(long docCount, NumericMetricsAggregation.SingleValue singleValue) throws IOException {
keyValuePairs.put(singleValue.getName(), singleValue.value()); keyValuePairs.put(singleValue.getName(), singleValue.value());
writeJsonObject(docCount); return singleValue.getName();
} }
private void writeJsonObject(long docCount) throws IOException { private void writeJsonObject(long docCount) throws IOException {
@ -98,7 +108,9 @@ class AggregationToJsonProcessor implements Releasable {
for (Map.Entry<String, Object> keyValue : keyValuePairs.entrySet()) { for (Map.Entry<String, Object> keyValue : keyValuePairs.entrySet()) {
jsonBuilder.field(keyValue.getKey(), keyValue.getValue()); jsonBuilder.field(keyValue.getKey(), keyValue.getValue());
} }
jsonBuilder.field(DatafeedConfig.DOC_COUNT, docCount); if (includeDocCount) {
jsonBuilder.field(DatafeedConfig.DOC_COUNT, docCount);
}
jsonBuilder.endObject(); jsonBuilder.endObject();
} }
} }

View File

@ -14,7 +14,7 @@ import java.util.Locale;
public final class Messages { public final class Messages {
public static final String DATAFEED_AGGREGATIONS_REQUIRES_JOB_WITH_SUMMARY_COUNT_FIELD = public static final String DATAFEED_AGGREGATIONS_REQUIRES_JOB_WITH_SUMMARY_COUNT_FIELD =
"A job configured with a datafeed with aggregations must have summary_count_field_name ''{0}''"; "A job configured with a datafeed with aggregations must set summary_count_field_name; use doc_count or suitable alternative";
public static final String DATAFEED_CANNOT_DELETE_IN_CURRENT_STATE = "Cannot delete datafeed [{0}] while its status is {1}"; public static final String DATAFEED_CANNOT_DELETE_IN_CURRENT_STATE = "Cannot delete datafeed [{0}] while its status is {1}";
public static final String DATAFEED_CANNOT_UPDATE_IN_CURRENT_STATE = "Cannot update datafeed [{0}] while its status is {1}"; public static final String DATAFEED_CANNOT_UPDATE_IN_CURRENT_STATE = "Cannot update datafeed [{0}] while its status is {1}";
public static final String DATAFEED_CONFIG_CANNOT_USE_SCRIPT_FIELDS_WITH_AGGS = public static final String DATAFEED_CONFIG_CANNOT_USE_SCRIPT_FIELDS_WITH_AGGS =

View File

@ -60,23 +60,12 @@ public class DatafeedJobValidatorTests extends ESTestCase {
DatafeedJobValidator.validate(datafeedConfig, job); DatafeedJobValidator.validate(datafeedConfig, job);
} }
public void testVerify_GivenAggsAndCorrectSummaryCountField() throws IOException {
Job.Builder builder = buildJobBuilder("foo");
AnalysisConfig.Builder ac = createAnalysisConfig();
ac.setBucketSpan(1800L);
ac.setSummaryCountFieldName("doc_count");
builder.setAnalysisConfig(ac);
Job job = builder.build();
DatafeedConfig datafeedConfig = createValidDatafeedConfigWithAggs().build();
DatafeedJobValidator.validate(datafeedConfig, job);
}
public void testVerify_GivenAggsAndNoSummaryCountField() throws IOException { public void testVerify_GivenAggsAndNoSummaryCountField() throws IOException {
String errorMessage = Messages.getMessage(Messages.DATAFEED_AGGREGATIONS_REQUIRES_JOB_WITH_SUMMARY_COUNT_FIELD, String errorMessage = Messages.getMessage(Messages.DATAFEED_AGGREGATIONS_REQUIRES_JOB_WITH_SUMMARY_COUNT_FIELD,
DatafeedConfig.DOC_COUNT); DatafeedConfig.DOC_COUNT);
Job.Builder builder = buildJobBuilder("foo"); Job.Builder builder = buildJobBuilder("foo");
AnalysisConfig.Builder ac = createAnalysisConfig(); AnalysisConfig.Builder ac = createAnalysisConfig();
ac.setSummaryCountFieldName(null);
ac.setBucketSpan(1800L); ac.setBucketSpan(1800L);
builder.setAnalysisConfig(ac); builder.setAnalysisConfig(ac);
Job job = builder.build(); Job job = builder.build();
@ -88,13 +77,13 @@ public class DatafeedJobValidatorTests extends ESTestCase {
assertEquals(errorMessage, e.getMessage()); assertEquals(errorMessage, e.getMessage());
} }
public void testVerify_GivenAggsAndWrongSummaryCountField() throws IOException { public void testVerify_GivenAggsAndEmptySummaryCountField() throws IOException {
String errorMessage = Messages.getMessage( String errorMessage = Messages.getMessage(Messages.DATAFEED_AGGREGATIONS_REQUIRES_JOB_WITH_SUMMARY_COUNT_FIELD,
Messages.DATAFEED_AGGREGATIONS_REQUIRES_JOB_WITH_SUMMARY_COUNT_FIELD, DatafeedConfig.DOC_COUNT); DatafeedConfig.DOC_COUNT);
Job.Builder builder = buildJobBuilder("foo"); Job.Builder builder = buildJobBuilder("foo");
AnalysisConfig.Builder ac = createAnalysisConfig(); AnalysisConfig.Builder ac = createAnalysisConfig();
ac.setSummaryCountFieldName("");
ac.setBucketSpan(1800L); ac.setBucketSpan(1800L);
ac.setSummaryCountFieldName("wrong");
builder.setAnalysisConfig(ac); builder.setAnalysisConfig(ac);
Job job = builder.build(); Job job = builder.build();
DatafeedConfig datafeedConfig = createValidDatafeedConfigWithAggs().build(); DatafeedConfig datafeedConfig = createValidDatafeedConfigWithAggs().build();
@ -105,6 +94,18 @@ public class DatafeedJobValidatorTests extends ESTestCase {
assertEquals(errorMessage, e.getMessage()); assertEquals(errorMessage, e.getMessage());
} }
/**
 * Checks that validation accepts a datafeed with aggregations when the job's
 * summary_count_field_name is set to a custom, non-empty value ("some_count")
 * rather than doc_count. The test passes when validate() does not throw.
 */
public void testVerify_GivenAggsAndSummaryCountField() throws IOException {
Job.Builder builder = buildJobBuilder("foo");
AnalysisConfig.Builder ac = createAnalysisConfig();
ac.setBucketSpan(1800L);
ac.setSummaryCountFieldName("some_count");
builder.setAnalysisConfig(ac);
Job job = builder.build();
DatafeedConfig datafeedConfig = createValidDatafeedConfigWithAggs().build();
// No explicit assertion: the call below must simply complete without an exception.
DatafeedJobValidator.validate(datafeedConfig, job);
}
public static Job.Builder buildJobBuilder(String id) { public static Job.Builder buildJobBuilder(String id) {
Job.Builder builder = new Job.Builder(id); Job.Builder builder = new Job.Builder(id);
builder.setCreateTime(new Date()); builder.setCreateTime(new Date());

View File

@ -17,7 +17,6 @@ import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.AggregatorFactories; import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.aggregations.bucket.histogram.Histogram; import org.elasticsearch.search.aggregations.bucket.histogram.Histogram;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.ml.datafeed.extractor.scroll.ScrollDataExtractorTests;
import org.junit.Before; import org.junit.Before;
import java.io.BufferedReader; import java.io.BufferedReader;
@ -156,7 +155,7 @@ public class AggregationDataExtractorTests extends ESTestCase {
} }
private AggregationDataExtractorContext createContext(long start, long end) { private AggregationDataExtractorContext createContext(long start, long end) {
return new AggregationDataExtractorContext(jobId, timeField, indexes, types, query, aggs, start, end); return new AggregationDataExtractorContext(jobId, timeField, indexes, types, query, aggs, start, end, true);
} }
private SearchResponse createSearchResponse(String histogramName, List<Histogram.Bucket> histogramBuckets) { private SearchResponse createSearchResponse(String histogramName, List<Histogram.Bucket> histogramBuckets) {

View File

@ -15,7 +15,10 @@ import org.joda.time.DateTime;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
@ -66,9 +69,12 @@ public final class AggregationTestUtils {
StringTerms.Bucket bucket = mock(StringTerms.Bucket.class); StringTerms.Bucket bucket = mock(StringTerms.Bucket.class);
when(bucket.getKey()).thenReturn(term.key); when(bucket.getKey()).thenReturn(term.key);
when(bucket.getDocCount()).thenReturn(term.count); when(bucket.getDocCount()).thenReturn(term.count);
if (term.value != null) { List<Aggregation> numericAggs = new ArrayList<>();
NumericMetricsAggregation.SingleValue termValue = createSingleValue(term.valueName, term.value); for (Map.Entry<String, Double> keyValue : term.values.entrySet()) {
Aggregations aggs = createAggs(Arrays.asList(termValue)); numericAggs.add(createSingleValue(keyValue.getKey(), keyValue.getValue()));
}
if (!numericAggs.isEmpty()) {
Aggregations aggs = createAggs(numericAggs);
when(bucket.getAggregations()).thenReturn(aggs); when(bucket.getAggregations()).thenReturn(aggs);
} }
buckets.add(bucket); buckets.add(bucket);
@ -80,18 +86,26 @@ public final class AggregationTestUtils {
static class Term { static class Term {
String key; String key;
long count; long count;
String valueName; Map<String, Double> values;
Double value;
Term(String key, long count) { Term(String key, long count) {
this(key, count, null, null); this(key, count, Collections.emptyMap());
} }
Term(String key, long count, String valueName, Double value) { Term(String key, long count, String valueName, Double value) {
this(key, count, newKeyValue(valueName, value));
}
Term(String key, long count, Map<String, Double> values) {
this.key = key; this.key = key;
this.count = count; this.count = count;
this.valueName = valueName; this.values = values;
this.value = value; }
/**
 * Builds a new mutable map holding exactly one entry.
 *
 * @param key the name to store the value under
 * @param value the value to store
 * @return a fresh {@link HashMap} containing only {@code key -> value}
 */
private static Map<String, Double> newKeyValue(String key, Double value) {
Map<String, Double> keyValue = new HashMap<>();
keyValue.put(key, value);
return keyValue;
}
} }
} }

View File

@ -16,7 +16,9 @@ import java.io.ByteArrayOutputStream;
import java.io.IOException; import java.io.IOException;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.Arrays; import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List; import java.util.List;
import java.util.Map;
import static org.elasticsearch.xpack.ml.datafeed.extractor.aggregation.AggregationTestUtils.Term; import static org.elasticsearch.xpack.ml.datafeed.extractor.aggregation.AggregationTestUtils.Term;
import static org.elasticsearch.xpack.ml.datafeed.extractor.aggregation.AggregationTestUtils.createAggs; import static org.elasticsearch.xpack.ml.datafeed.extractor.aggregation.AggregationTestUtils.createAggs;
@ -45,6 +47,20 @@ public class AggregationToJsonProcessorTests extends ESTestCase {
assertThat(json, equalTo("{\"time\":1000,\"doc_count\":3} {\"time\":2000,\"doc_count\":5}")); assertThat(json, equalTo("{\"time\":1000,\"doc_count\":3} {\"time\":2000,\"doc_count\":5}"));
} }
/**
 * Processes a histogram with two buckets and no nested aggregations while
 * doc_count output is disabled, and expects each JSON record to contain only
 * the histogram key under "time".
 */
public void testProcessGivenHistogramOnlyAndNoDocCount() throws IOException {
List<Histogram.Bucket> histogramBuckets = Arrays.asList(
createHistogramBucket(1000L, 3),
createHistogramBucket(2000L, 5)
);
Histogram histogram = mock(Histogram.class);
when(histogram.getName()).thenReturn("time");
when(histogram.getBuckets()).thenReturn(histogramBuckets);
// includeDocCount = false: the bucket counts (3 and 5) must not appear in the output.
String json = aggToString(histogram, false);
assertThat(json, equalTo("{\"time\":1000} {\"time\":2000}"));
}
public void testProcessGivenSingleMetricPerHistogram() throws IOException { public void testProcessGivenSingleMetricPerHistogram() throws IOException {
List<Histogram.Bucket> histogramBuckets = Arrays.asList( List<Histogram.Bucket> histogramBuckets = Arrays.asList(
createHistogramBucket(1000L, 3, Arrays.asList(createSingleValue("my_value", 1.0))), createHistogramBucket(1000L, 3, Arrays.asList(createSingleValue("my_value", 1.0))),
@ -107,6 +123,52 @@ public class AggregationToJsonProcessorTests extends ESTestCase {
"{\"time\":4000,\"my_field\":\"b\",\"my_value\":42.0,\"doc_count\":3}")); "{\"time\":4000,\"my_field\":\"b\",\"my_value\":42.0,\"doc_count\":3}"));
} }
/**
 * Processes a histogram whose buckets each hold one terms aggregation
 * ("my_field"), where every term carries two single-value metrics
 * ("my_value" and "my_value2"). With doc_count output disabled, one JSON
 * record is expected per (histogram bucket, term) pair containing the time,
 * the term key and both metric values.
 */
public void testProcessGivenMultipleSingleMetricPerSingleTermsPerHistogram() throws IOException {
// LinkedHashMap keeps insertion order, so "my_value" precedes "my_value2" in the expected JSON.
Map<String, Double> a1NumericAggs = new LinkedHashMap<>();
a1NumericAggs.put("my_value", 111.0);
a1NumericAggs.put("my_value2", 112.0);
Map<String, Double> b1NumericAggs = new LinkedHashMap<>();
b1NumericAggs.put("my_value", 121.0);
b1NumericAggs.put("my_value2", 122.0);
Map<String, Double> c1NumericAggs = new LinkedHashMap<>();
c1NumericAggs.put("my_value", 131.0);
c1NumericAggs.put("my_value2", 132.0);
Map<String, Double> a2NumericAggs = new LinkedHashMap<>();
a2NumericAggs.put("my_value", 211.0);
a2NumericAggs.put("my_value2", 212.0);
Map<String, Double> b2NumericAggs = new LinkedHashMap<>();
b2NumericAggs.put("my_value", 221.0);
b2NumericAggs.put("my_value2", 222.0);
Map<String, Double> c4NumericAggs = new LinkedHashMap<>();
c4NumericAggs.put("my_value", 411.0);
c4NumericAggs.put("my_value2", 412.0);
Map<String, Double> b4NumericAggs = new LinkedHashMap<>();
b4NumericAggs.put("my_value", 421.0);
b4NumericAggs.put("my_value2", 422.0);
// The bucket at 3000 has no nested aggregations; the expected output below has no record for it.
List<Histogram.Bucket> histogramBuckets = Arrays.asList(
createHistogramBucket(1000L, 4, Arrays.asList(createTerms("my_field",
new Term("a", 1, a1NumericAggs), new Term("b", 2, b1NumericAggs), new Term("c", 1, c1NumericAggs)))),
createHistogramBucket(2000L, 5, Arrays.asList(createTerms("my_field",
new Term("a", 5, a2NumericAggs), new Term("b", 2, b2NumericAggs)))),
createHistogramBucket(3000L, 0, Arrays.asList()),
createHistogramBucket(4000L, 7, Arrays.asList(createTerms("my_field",
new Term("c", 4, c4NumericAggs), new Term("b", 3, b4NumericAggs))))
);
Histogram histogram = mock(Histogram.class);
when(histogram.getName()).thenReturn("time");
when(histogram.getBuckets()).thenReturn(histogramBuckets);
// includeDocCount = false: no doc_count field is expected in any record.
String json = aggToString(histogram, false);
assertThat(json, equalTo("{\"time\":1000,\"my_field\":\"a\",\"my_value\":111.0,\"my_value2\":112.0} " +
"{\"time\":1000,\"my_field\":\"b\",\"my_value\":121.0,\"my_value2\":122.0} " +
"{\"time\":1000,\"my_field\":\"c\",\"my_value\":131.0,\"my_value2\":132.0} " +
"{\"time\":2000,\"my_field\":\"a\",\"my_value\":211.0,\"my_value2\":212.0} " +
"{\"time\":2000,\"my_field\":\"b\",\"my_value\":221.0,\"my_value2\":222.0} " +
"{\"time\":4000,\"my_field\":\"c\",\"my_value\":411.0,\"my_value2\":412.0} " +
"{\"time\":4000,\"my_field\":\"b\",\"my_value\":421.0,\"my_value2\":422.0}"));
}
public void testProcessGivenTopLevelAggIsNotHistogram() throws IOException { public void testProcessGivenTopLevelAggIsNotHistogram() throws IOException {
Terms terms = mock(Terms.class); Terms terms = mock(Terms.class);
IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> aggToString(terms)); IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> aggToString(terms));
@ -138,7 +200,7 @@ public class AggregationToJsonProcessorTests extends ESTestCase {
when(histogram.getBuckets()).thenReturn(Arrays.asList(histogramBucket)); when(histogram.getBuckets()).thenReturn(Arrays.asList(histogramBucket));
IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> aggToString(histogram)); IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> aggToString(histogram));
assertThat(e.getMessage(), containsString("Multiple nested aggregations are not supported")); assertThat(e.getMessage(), containsString("Multiple non-leaf nested aggregations are not supported"));
} }
public void testProcessGivenHistogramWithDateTimeKeys() throws IOException { public void testProcessGivenHistogramWithDateTimeKeys() throws IOException {
@ -156,8 +218,12 @@ public class AggregationToJsonProcessorTests extends ESTestCase {
} }
private String aggToString(Aggregation aggregation) throws IOException { private String aggToString(Aggregation aggregation) throws IOException {
return aggToString(aggregation, true);
}
private String aggToString(Aggregation aggregation, boolean includeDocCount) throws IOException {
ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
try (AggregationToJsonProcessor processor = new AggregationToJsonProcessor(outputStream)) { try (AggregationToJsonProcessor processor = new AggregationToJsonProcessor(includeDocCount, outputStream)) {
processor.process(aggregation); processor.process(aggregation);
} }
return outputStream.toString(StandardCharsets.UTF_8.name()); return outputStream.toString(StandardCharsets.UTF_8.name());

View File

@ -12,6 +12,8 @@ setup:
type: keyword type: keyword
responsetime: responsetime:
type: float type: float
event_rate:
type: integer
- do: - do:
index: index:
@ -22,7 +24,8 @@ setup:
{ {
"time": "2017-02-18T00:00:00Z", "time": "2017-02-18T00:00:00Z",
"airline": "foo", "airline": "foo",
"responsetime": 1.0 "responsetime": 1.0,
"event_rate": 5
} }
- do: - do:
@ -34,7 +37,8 @@ setup:
{ {
"time": "2017-02-18T00:30:00Z", "time": "2017-02-18T00:30:00Z",
"airline": "foo", "airline": "foo",
"responsetime": 1.0 "responsetime": 1.0,
"event_rate": 6
} }
- do: - do:
@ -46,7 +50,21 @@ setup:
{ {
"time": "2017-02-18T01:00:00Z", "time": "2017-02-18T01:00:00Z",
"airline": "bar", "airline": "bar",
"responsetime": 42.0 "responsetime": 42.0,
"event_rate": 8
}
- do:
index:
index: airline-data
type: response
id: 4
body: >
{
"time": "2017-02-18T01:01:00Z",
"airline": "foo",
"responsetime": 42.0,
"event_rate": 7
} }
- do: - do:
@ -83,7 +101,7 @@ setup:
- do: - do:
xpack.ml.preview_datafeed: xpack.ml.preview_datafeed:
datafeed_id: scroll-feed datafeed_id: scroll-feed
- length: { $body: 3 } - length: { $body: 4 }
- match: { 0.time: 1487376000000 } - match: { 0.time: 1487376000000 }
- match: { 0.airline: foo } - match: { 0.airline: foo }
- match: { 0.responsetime: 1.0 } - match: { 0.responsetime: 1.0 }
@ -93,13 +111,16 @@ setup:
- match: { 2.time: 1487379600000 } - match: { 2.time: 1487379600000 }
- match: { 2.airline: bar } - match: { 2.airline: bar }
- match: { 2.responsetime: 42.0 } - match: { 2.responsetime: 42.0 }
- match: { 3.time: 1487379660000 }
- match: { 3.airline: foo }
- match: { 3.responsetime: 42.0 }
--- ---
"Test preview aggregation datafeed": "Test preview aggregation datafeed with doc_count":
- do: - do:
xpack.ml.put_job: xpack.ml.put_job:
job_id: aggregation-job job_id: aggregation-doc-count-job
body: > body: >
{ {
"analysis_config" : { "analysis_config" : {
@ -114,10 +135,10 @@ setup:
- do: - do:
xpack.ml.put_datafeed: xpack.ml.put_datafeed:
datafeed_id: aggregation-feed datafeed_id: aggregation-doc-count-feed
body: > body: >
{ {
"job_id":"aggregation-job", "job_id":"aggregation-doc-count-job",
"indexes":"airline-data", "indexes":"airline-data",
"types":"response", "types":"response",
"aggregations": { "aggregations": {
@ -147,8 +168,8 @@ setup:
- do: - do:
xpack.ml.preview_datafeed: xpack.ml.preview_datafeed:
datafeed_id: aggregation-feed datafeed_id: aggregation-doc-count-feed
- length: { $body: 2 } - length: { $body: 3 }
- match: { 0.time: 1.487376E12 } - match: { 0.time: 1.487376E12 }
- match: { 0.airline: foo } - match: { 0.airline: foo }
- match: { 0.responsetime: 2.0 } - match: { 0.responsetime: 2.0 }
@ -157,6 +178,139 @@ setup:
- match: { 1.airline: bar } - match: { 1.airline: bar }
- match: { 1.responsetime: 42.0 } - match: { 1.responsetime: 42.0 }
- match: { 1.doc_count: 1 } - match: { 1.doc_count: 1 }
# Third record (index 2): airline "foo" in the 01:00 hour bucket (document id 4).
- match: { 2.time: 1.4873796E12 }
- match: { 2.airline: foo }
- match: { 2.responsetime: 42.0 }
- match: { 2.doc_count: 1 }
---
"Test preview single metric aggregation datafeed with different summary count field":
- do:
xpack.ml.put_job:
job_id: aggregation-custom-single-metric-summary-job
body: >
{
"analysis_config" : {
"bucket_span":3600,
"summary_count_field_name": "dc_airline",
"detectors" :[{"function":"count"}]
},
"data_description" : {
"time_field":"time"
}
}
- do:
xpack.ml.put_datafeed:
datafeed_id: aggregation-custom-single-metric-summary-feed
body: >
{
"job_id":"aggregation-custom-single-metric-summary-job",
"indexes":"airline-data",
"types":"response",
"aggregations": {
"time": {
"histogram": {
"field": "time",
"interval": 3600000
},
"aggregations": {
"dc_airline": {
"cardinality": {
"field": "airline"
}
}
}
}
}
}
- do:
xpack.ml.preview_datafeed:
datafeed_id: aggregation-custom-single-metric-summary-feed
- length: { $body: 2 }
- match: { 0.time: 1.487376E12 }
- match: { 0.dc_airline: 1 }
- is_false: 0.doc_count
- match: { 1.time: 1.4873796E12 }
- match: { 1.dc_airline: 2 }
- is_false: 1.doc_count
---
"Test preview multi metric aggregation datafeed with different summary count field":
- do:
xpack.ml.put_job:
job_id: aggregation-custom-multi-metric-summary-job
body: >
{
"analysis_config" : {
"bucket_span":3600,
"summary_count_field_name": "event_rate",
"detectors" :[{"function":"mean","field_name":"responsetime","by_field_name":"airline"}]
},
"data_description" : {
"time_field":"time"
}
}
- do:
xpack.ml.put_datafeed:
datafeed_id: aggregation-custom-multi-metric-summary-feed
body: >
{
"job_id":"aggregation-custom-multi-metric-summary-job",
"indexes":"airline-data",
"types":"response",
"aggregations": {
"time": {
"histogram": {
"field": "time",
"interval": 3600000
},
"aggregations": {
"airline": {
"terms": {
"field": "airline"
},
"aggs": {
"responsetime": {
"sum": {
"field": "responsetime"
}
},
"event_rate": {
"sum": {
"field": "event_rate"
}
}
}
}
}
}
}
}
- do:
xpack.ml.preview_datafeed:
datafeed_id: aggregation-custom-multi-metric-summary-feed
- length: { $body: 3 }
- match: { 0.time: 1.487376E12 }
- match: { 0.airline: foo }
- match: { 0.responsetime: 2.0 }
- match: { 0.event_rate: 11 }
- is_false: 0.doc_count
- match: { 1.time: 1.4873796E12 }
- match: { 1.airline: bar }
- match: { 1.responsetime: 42.0 }
- match: { 1.event_rate: 8 }
- is_false: 1.doc_count
# Third record (index 2): airline "foo" in the 01:00 hour bucket (document id 4).
- match: { 2.time: 1.4873796E12 }
- match: { 2.airline: foo }
- match: { 2.responsetime: 42.0 }
- match: { 2.event_rate: 7 }
- is_false: 2.doc_count
--- ---
"Test preview missing datafeed": "Test preview missing datafeed":