[ML] Require max time aggregation to ensure correct datafeed restart (elastic/x-pack-elasticsearch#948)

Before this change, aggregation datafeeds used the histogram bucket
key as the record timestamp posted to the job. That meant that the
latest_record_timestamp at the end of a datafeed run was the start of
the latest histogram bucket seen. Upon continuing the datafeed, the
search starts from one millisecond after the latest_record_timestamp,
so data from that latest bucket could be fetched a second time.
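
As an illustration of the restart arithmetic (the timestamps below
are made up for a one-hour bucket; this is a sketch, not code from
the commit):

    // Hypothetical values: a one-hour histogram bucket starting at 10:00,
    // whose latest record arrived at 10:59.
    long bucketKey = 36_000_000L;             // bucket start (10:00)
    long latestRecordInBucket = 39_540_000L;  // latest record seen (10:59)

    // Before this change, the bucket key was posted as the record time,
    // so latest_record_timestamp ended up at the bucket start.
    long latestRecordTimestamp = bucketKey;

    // On restart the search begins one millisecond later, which is still
    // before the bucket's latest record: the whole bucket is re-fetched.
    long nextSearchStart = latestRecordTimestamp + 1; // 10:00:00.001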

This change requires a max aggregation on the time_field, nested
inside the histogram bucket, and reads the record timestamp from that
aggregation instead. This ensures the datafeed can restart without
duplicating data.
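
For reference, a datafeed aggregation that satisfies the new
requirement looks like the one used in the updated integration tests
below: a max aggregation on the time field is nested directly under
the histogram, as a sibling of the other sub-aggregations (field and
bucket names here follow those tests):

    String aggregations = "{\"buckets\":{\"histogram\":{\"field\":\"time\",\"interval\":3600000},"
            + "\"aggregations\":{"
            + "\"time\":{\"max\":{\"field\":\"time\"}},"
            + "\"airline\":{\"terms\":{\"field\":\"airline\",\"size\":10},"
            + "\"aggregations\":{\"responsetime\":{\"avg\":{\"field\":\"responsetime\"}}}}}}}";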

relates elastic/x-pack-elasticsearch#874

Original commit: elastic/x-pack-elasticsearch@f820efa866
Dimitris Athanasiou, 2017-04-04 17:15:44 +01:00, committed by GitHub
parent 2153c71e8f
commit c9834bc826
7 changed files with 177 additions and 93 deletions


@@ -132,10 +132,6 @@ class AggregationDataExtractor implements DataExtractor {
         Aggregation topAgg = aggsAsList.get(0);
         if (topAgg instanceof Histogram) {
-            if (context.timeField.equals(topAgg.getName()) == false) {
-                throw new IllegalArgumentException("Histogram name [" + topAgg.getName()
-                        + "] does not match time field [" + context.timeField + "]");
-            }
             return ((Histogram) topAgg).getBuckets();
         } else {
             throw new IllegalArgumentException("Top level aggregation should be [histogram]");
@@ -149,9 +145,10 @@ class AggregationDataExtractor implements DataExtractor {
         }
         ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
-        try (AggregationToJsonProcessor processor = new AggregationToJsonProcessor(context.includeDocCount, outputStream)) {
+        try (AggregationToJsonProcessor processor = new AggregationToJsonProcessor(
+                context.timeField, context.includeDocCount, outputStream)) {
             while (histogramBuckets.isEmpty() == false && processor.getKeyValueCount() < BATCH_KEY_VALUE_PAIRS) {
-                processor.process(context.timeField, histogramBuckets.removeFirst());
+                processor.process(histogramBuckets.removeFirst());
             }
             if (histogramBuckets.isEmpty()) {
                 hasNext = false;


@@ -5,6 +5,7 @@
  */
 package org.elasticsearch.xpack.ml.datafeed.extractor.aggregation;
 
+import org.elasticsearch.common.Nullable;
 import org.elasticsearch.common.lease.Releasable;
 import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.common.xcontent.json.JsonXContent;
@@ -13,10 +14,10 @@ import org.elasticsearch.search.aggregations.Aggregations;
 import org.elasticsearch.search.aggregations.bucket.histogram.Histogram;
 import org.elasticsearch.search.aggregations.bucket.terms.Terms;
 import org.elasticsearch.search.aggregations.metrics.NumericMetricsAggregation;
+import org.elasticsearch.search.aggregations.metrics.max.Max;
 import org.elasticsearch.search.aggregations.metrics.percentiles.Percentile;
 import org.elasticsearch.search.aggregations.metrics.percentiles.Percentiles;
 import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig;
-import org.joda.time.base.BaseDateTime;
 
 import java.io.IOException;
 import java.io.OutputStream;
@@ -26,18 +27,24 @@ import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 
 /**
  * Processes {@link Aggregation} objects and writes flat JSON documents for each leaf aggregation.
+ * In order to ensure that datafeeds can restart without duplicating data, we require that
+ * each histogram bucket has a nested max aggregation matching the time_field.
  */
 class AggregationToJsonProcessor implements Releasable {
 
+    private final String timeField;
     private final boolean includeDocCount;
     private final XContentBuilder jsonBuilder;
    private final Map<String, Object> keyValuePairs;
     private long keyValueWrittenCount;
 
-    AggregationToJsonProcessor(boolean includeDocCount, OutputStream outputStream) throws IOException {
+    AggregationToJsonProcessor(String timeField, boolean includeDocCount, OutputStream outputStream)
+            throws IOException {
+        this.timeField = Objects.requireNonNull(timeField);
         this.includeDocCount = includeDocCount;
         jsonBuilder = new XContentBuilder(JsonXContent.jsonXContent, outputStream);
         keyValuePairs = new LinkedHashMap<>();
@@ -53,17 +60,28 @@ class AggregationToJsonProcessor implements Releasable {
      *     <li>{@link Percentiles}</li>
      * </ul>
      */
-    public void process(String timeField, Histogram.Bucket bucket) throws IOException {
-        Object timestamp = bucket.getKey();
-        if (timestamp instanceof BaseDateTime) {
-            timestamp = ((BaseDateTime) timestamp).getMillis();
-        }
-        keyValuePairs.put(timeField, timestamp);
-        processNestedAggs(bucket.getDocCount(), bucket.getAggregations());
+    public void process(Histogram.Bucket bucket) throws IOException {
+        if (bucket.getDocCount() == 0) {
+            return;
+        }
+
+        Aggregations aggs = bucket.getAggregations();
+        Aggregation timeAgg = aggs == null ? null : aggs.get(timeField);
+        if (timeAgg instanceof Max == false) {
+            throw new IllegalArgumentException("Missing max aggregation for time_field [" + timeField + "]");
+        }
+
+        // We want to handle the max time aggregation only at the bucket level.
+        // So, we add the value here and then remove the aggregation before
+        // processing the rest of the sub aggs.
+        long timestamp = (long) ((Max) timeAgg).value();
+        keyValuePairs.put(timeField, timestamp);
+        List<Aggregation> subAggs = new ArrayList<>(aggs.asList());
+        subAggs.remove(timeAgg);
+        processNestedAggs(bucket.getDocCount(), subAggs);
     }
 
-    private void processNestedAggs(long docCount, Aggregations subAggs) throws IOException {
-        List<Aggregation> aggs = subAggs == null ? Collections.emptyList() : subAggs.asList();
+    private void processNestedAggs(long docCount, List<Aggregation> aggs) throws IOException {
         if (aggs.isEmpty()) {
             writeJsonObject(docCount);
             return;
@@ -92,7 +110,7 @@ class AggregationToJsonProcessor implements Releasable {
     private void processTerms(Terms termsAgg) throws IOException {
         for (Terms.Bucket bucket : termsAgg.getBuckets()) {
             keyValuePairs.put(termsAgg.getName(), bucket.getKey());
-            processNestedAggs(bucket.getDocCount(), bucket.getAggregations());
+            processNestedAggs(bucket.getDocCount(), asList(bucket.getAggregations()));
             keyValuePairs.remove(termsAgg.getName());
         }
     }
@@ -137,4 +155,8 @@ class AggregationToJsonProcessor implements Releasable {
     public long getKeyValueCount() {
         return keyValueWrittenCount;
     }
+
+    private static List<Aggregation> asList(@Nullable Aggregations aggs) {
+        return aggs == null ? Collections.emptyList() : aggs.asList();
+    }
 }


@@ -34,6 +34,7 @@ import java.util.stream.Collectors;
 
 import static org.elasticsearch.xpack.ml.datafeed.extractor.aggregation.AggregationTestUtils.Term;
 import static org.elasticsearch.xpack.ml.datafeed.extractor.aggregation.AggregationTestUtils.createHistogramBucket;
+import static org.elasticsearch.xpack.ml.datafeed.extractor.aggregation.AggregationTestUtils.createMax;
 import static org.elasticsearch.xpack.ml.datafeed.extractor.aggregation.AggregationTestUtils.createTerms;
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.equalTo;
@@ -90,9 +91,11 @@ public class AggregationDataExtractorTests extends ESTestCase {
     public void testExtraction() throws IOException {
         List<Histogram.Bucket> histogramBuckets = Arrays.asList(
                 createHistogramBucket(1000L, 3, Arrays.asList(
+                        createMax("time", 1999),
                         createTerms("airline", new Term("a", 1, "responsetime", 11.0), new Term("b", 2, "responsetime", 12.0)))),
                 createHistogramBucket(2000L, 0, Arrays.asList()),
                 createHistogramBucket(3000L, 7, Arrays.asList(
+                        createMax("time", 3999),
                         createTerms("airline", new Term("c", 4, "responsetime", 31.0), new Term("b", 3, "responsetime", 32.0))))
         );
 
@@ -104,10 +107,10 @@ public class AggregationDataExtractorTests extends ESTestCase {
         assertThat(extractor.hasNext(), is(true));
         Optional<InputStream> stream = extractor.next();
         assertThat(stream.isPresent(), is(true));
-        String expectedStream = "{\"time\":1000,\"airline\":\"a\",\"responsetime\":11.0,\"doc_count\":1} "
-                + "{\"time\":1000,\"airline\":\"b\",\"responsetime\":12.0,\"doc_count\":2} "
-                + "{\"time\":3000,\"airline\":\"c\",\"responsetime\":31.0,\"doc_count\":4} "
-                + "{\"time\":3000,\"airline\":\"b\",\"responsetime\":32.0,\"doc_count\":3}";
+        String expectedStream = "{\"time\":1999,\"airline\":\"a\",\"responsetime\":11.0,\"doc_count\":1} "
+                + "{\"time\":1999,\"airline\":\"b\",\"responsetime\":12.0,\"doc_count\":2} "
+                + "{\"time\":3999,\"airline\":\"c\",\"responsetime\":31.0,\"doc_count\":4} "
+                + "{\"time\":3999,\"airline\":\"b\",\"responsetime\":32.0,\"doc_count\":3}";
         assertThat(asString(stream.get()), equalTo(expectedStream));
         assertThat(extractor.hasNext(), is(false));
         assertThat(capturedSearchRequests.size(), equalTo(1));
@@ -128,7 +131,7 @@ public class AggregationDataExtractorTests extends ESTestCase {
         List<Histogram.Bucket> histogramBuckets = new ArrayList<>(buckets);
         long timestamp = 1000;
         for (int i = 0; i < buckets; i++) {
-            histogramBuckets.add(createHistogramBucket(timestamp, 3));
+            histogramBuckets.add(createHistogramBucket(timestamp, 3, Arrays.asList(createMax("time", timestamp))));
             timestamp += 1000L;
         }
@@ -222,7 +225,7 @@ public class AggregationDataExtractorTests extends ESTestCase {
         List<Histogram.Bucket> histogramBuckets = new ArrayList<>(buckets);
         long timestamp = 1000;
         for (int i = 0; i < buckets; i++) {
-            histogramBuckets.add(createHistogramBucket(timestamp, 3));
+            histogramBuckets.add(createHistogramBucket(timestamp, 3, Arrays.asList(createMax("time", timestamp))));
             timestamp += 1000L;
         }


@@ -11,6 +11,7 @@ import org.elasticsearch.search.aggregations.bucket.histogram.Histogram;
 import org.elasticsearch.search.aggregations.bucket.terms.StringTerms;
 import org.elasticsearch.search.aggregations.bucket.terms.Terms;
 import org.elasticsearch.search.aggregations.metrics.NumericMetricsAggregation;
+import org.elasticsearch.search.aggregations.metrics.max.Max;
 import org.elasticsearch.search.aggregations.metrics.percentiles.Percentile;
 import org.elasticsearch.search.aggregations.metrics.percentiles.Percentiles;
 import org.joda.time.DateTime;
@@ -39,6 +40,9 @@ public final class AggregationTestUtils {
     static Aggregations createAggs(List<Aggregation> aggsList) {
         Aggregations aggs = mock(Aggregations.class);
         when(aggs.asList()).thenReturn(aggsList);
+        for (Aggregation agg: aggsList) {
+            when(aggs.get(agg.getName())).thenReturn(agg);
+        }
         return aggs;
     }
 
@@ -56,6 +60,14 @@ public final class AggregationTestUtils {
         return bucket;
     }
 
+    static Max createMax(String name, double value) {
+        Max max = mock(Max.class);
+        when(max.getName()).thenReturn(name);
+        when(max.value()).thenReturn(value);
+        when(max.getValue()).thenReturn(value);
+        return max;
+    }
+
     static NumericMetricsAggregation.SingleValue createSingleValue(String name, double value) {
         NumericMetricsAggregation.SingleValue singleValue = mock(NumericMetricsAggregation.SingleValue.class);
         when(singleValue.getName()).thenReturn(name);


@@ -9,7 +9,6 @@ import org.elasticsearch.search.aggregations.Aggregations;
 import org.elasticsearch.search.aggregations.bucket.histogram.Histogram;
 import org.elasticsearch.search.aggregations.bucket.terms.Terms;
 import org.elasticsearch.test.ESTestCase;
-import org.joda.time.DateTime;
 
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
@@ -22,8 +21,8 @@ import java.util.Map;
 
 import static org.elasticsearch.xpack.ml.datafeed.extractor.aggregation.AggregationTestUtils.Term;
 import static org.elasticsearch.xpack.ml.datafeed.extractor.aggregation.AggregationTestUtils.createAggs;
-import static org.elasticsearch.xpack.ml.datafeed.extractor.aggregation.AggregationTestUtils.createDateHistogramBucket;
 import static org.elasticsearch.xpack.ml.datafeed.extractor.aggregation.AggregationTestUtils.createHistogramBucket;
+import static org.elasticsearch.xpack.ml.datafeed.extractor.aggregation.AggregationTestUtils.createMax;
 import static org.elasticsearch.xpack.ml.datafeed.extractor.aggregation.AggregationTestUtils.createPercentiles;
 import static org.elasticsearch.xpack.ml.datafeed.extractor.aggregation.AggregationTestUtils.createSingleValue;
 import static org.elasticsearch.xpack.ml.datafeed.extractor.aggregation.AggregationTestUtils.createTerms;
@@ -36,22 +35,42 @@ public class AggregationToJsonProcessorTests extends ESTestCase {
 
     private long keyValuePairsWritten = 0;
 
-    public void testProcessGivenHistogramOnly() throws IOException {
+    public void testProcessGivenMaxTimeIsMissing() throws IOException {
         List<Histogram.Bucket> histogramBuckets = Arrays.asList(
                 createHistogramBucket(1000L, 3),
                 createHistogramBucket(2000L, 5)
         );
 
+        IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> aggToString("time", histogramBuckets));
+        assertThat(e.getMessage(), containsString("Missing max aggregation for time_field [time]"));
+    }
+
+    public void testProcessGivenNonMaxTimeAgg() throws IOException {
+        List<Histogram.Bucket> histogramBuckets = Arrays.asList(
+                createHistogramBucket(1000L, 3, Arrays.asList(createTerms("time"))),
+                createHistogramBucket(2000L, 5, Arrays.asList(createTerms("time")))
+        );
+
+        IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> aggToString("time", histogramBuckets));
+        assertThat(e.getMessage(), containsString("Missing max aggregation for time_field [time]"));
+    }
+
+    public void testProcessGivenHistogramOnly() throws IOException {
+        List<Histogram.Bucket> histogramBuckets = Arrays.asList(
+                createHistogramBucket(1000L, 3, Arrays.asList(createMax("timestamp", 1200))),
+                createHistogramBucket(2000L, 5, Arrays.asList(createMax("timestamp", 2800)))
+        );
+
         String json = aggToString("timestamp", histogramBuckets);
 
-        assertThat(json, equalTo("{\"timestamp\":1000,\"doc_count\":3} {\"timestamp\":2000,\"doc_count\":5}"));
+        assertThat(json, equalTo("{\"timestamp\":1200,\"doc_count\":3} {\"timestamp\":2800,\"doc_count\":5}"));
         assertThat(keyValuePairsWritten, equalTo(4L));
     }
 
     public void testProcessGivenHistogramOnlyAndNoDocCount() throws IOException {
         List<Histogram.Bucket> histogramBuckets = Arrays.asList(
-                createHistogramBucket(1000L, 3),
-                createHistogramBucket(2000L, 5)
+                createHistogramBucket(1000L, 3, Arrays.asList(createMax("time", 1000))),
+                createHistogramBucket(2000L, 5, Arrays.asList(createMax("time", 2000)))
         );
 
         String json = aggToString("time", false, histogramBuckets);
@@ -62,8 +81,10 @@ public class AggregationToJsonProcessorTests extends ESTestCase {
 
     public void testProcessGivenSingleMetricPerHistogram() throws IOException {
         List<Histogram.Bucket> histogramBuckets = Arrays.asList(
-                createHistogramBucket(1000L, 3, Arrays.asList(createSingleValue("my_value", 1.0))),
-                createHistogramBucket(2000L, 5, Arrays.asList(createSingleValue("my_value", 2.0)))
+                createHistogramBucket(1000L, 3, Arrays.asList(
+                        createMax("time", 1000), createSingleValue("my_value", 1.0))),
+                createHistogramBucket(2000L, 5, Arrays.asList(
+                        createMax("time", 2000), createSingleValue("my_value", 2.0)))
         );
 
         String json = aggToString("time", histogramBuckets);
@@ -74,32 +95,41 @@ public class AggregationToJsonProcessorTests extends ESTestCase {
     public void testProcessGivenTermsPerHistogram() throws IOException {
         List<Histogram.Bucket> histogramBuckets = Arrays.asList(
                 createHistogramBucket(1000L, 4, Arrays.asList(
+                        createMax("time", 1100),
                         createTerms("my_field", new Term("a", 1), new Term("b", 2), new Term("c", 1)))),
-                createHistogramBucket(2000L, 5, Arrays.asList(createTerms("my_field", new Term("a", 5), new Term("b", 2)))),
-                createHistogramBucket(3000L, 0, Arrays.asList()),
-                createHistogramBucket(4000L, 7, Arrays.asList(createTerms("my_field", new Term("c", 4), new Term("b", 3))))
+                createHistogramBucket(2000L, 5, Arrays.asList(
+                        createMax("time", 2200),
+                        createTerms("my_field", new Term("a", 5), new Term("b", 2)))),
+                createHistogramBucket(3000L, 0, Arrays.asList(createMax("time", -1))),
+                createHistogramBucket(4000L, 7, Arrays.asList(
+                        createMax("time", 4400),
+                        createTerms("my_field", new Term("c", 4), new Term("b", 3))))
         );
 
         String json = aggToString("time", histogramBuckets);
 
-        assertThat(json, equalTo("{\"time\":1000,\"my_field\":\"a\",\"doc_count\":1} " +
-                "{\"time\":1000,\"my_field\":\"b\",\"doc_count\":2} " +
-                "{\"time\":1000,\"my_field\":\"c\",\"doc_count\":1} " +
-                "{\"time\":2000,\"my_field\":\"a\",\"doc_count\":5} " +
-                "{\"time\":2000,\"my_field\":\"b\",\"doc_count\":2} " +
-                "{\"time\":4000,\"my_field\":\"c\",\"doc_count\":4} " +
-                "{\"time\":4000,\"my_field\":\"b\",\"doc_count\":3}"));
+        assertThat(json, equalTo("{\"time\":1100,\"my_field\":\"a\",\"doc_count\":1} " +
+                "{\"time\":1100,\"my_field\":\"b\",\"doc_count\":2} " +
+                "{\"time\":1100,\"my_field\":\"c\",\"doc_count\":1} " +
+                "{\"time\":2200,\"my_field\":\"a\",\"doc_count\":5} " +
+                "{\"time\":2200,\"my_field\":\"b\",\"doc_count\":2} " +
+                "{\"time\":4400,\"my_field\":\"c\",\"doc_count\":4} " +
+                "{\"time\":4400,\"my_field\":\"b\",\"doc_count\":3}"));
     }
 
     public void testProcessGivenSingleMetricPerSingleTermsPerHistogram() throws IOException {
         List<Histogram.Bucket> histogramBuckets = Arrays.asList(
-                createHistogramBucket(1000L, 4, Arrays.asList(createTerms("my_field",
-                        new Term("a", 1, "my_value", 11.0), new Term("b", 2, "my_value", 12.0), new Term("c", 1, "my_value", 13.0)))),
-                createHistogramBucket(2000L, 5, Arrays.asList(createTerms("my_field",
-                        new Term("a", 5, "my_value", 21.0), new Term("b", 2, "my_value", 22.0)))),
-                createHistogramBucket(3000L, 0, Arrays.asList()),
-                createHistogramBucket(4000L, 7, Arrays.asList(createTerms("my_field",
-                        new Term("c", 4, "my_value", 41.0), new Term("b", 3, "my_value", 42.0))))
+                createHistogramBucket(1000L, 4, Arrays.asList(
+                        createMax("time", 1000),
+                        createTerms("my_field", new Term("a", 1, "my_value", 11.0),
+                                new Term("b", 2, "my_value", 12.0), new Term("c", 1, "my_value", 13.0)))),
+                createHistogramBucket(2000L, 5, Arrays.asList(
+                        createMax("time", 2000),
+                        createTerms("my_field", new Term("a", 5, "my_value", 21.0), new Term("b", 2, "my_value", 22.0)))),
+                createHistogramBucket(3000L, 0, Arrays.asList(createMax("time", 3000))),
+                createHistogramBucket(4000L, 7, Arrays.asList(
+                        createMax("time", 4000),
+                        createTerms("my_field", new Term("c", 4, "my_value", 41.0), new Term("b", 3, "my_value", 42.0))))
         );
 
         String json = aggToString("time", histogramBuckets);
@@ -136,13 +166,17 @@ public class AggregationToJsonProcessorTests extends ESTestCase {
         b4NumericAggs.put("my_value", 421.0);
         b4NumericAggs.put("my_value2", 422.0);
         List<Histogram.Bucket> histogramBuckets = Arrays.asList(
-                createHistogramBucket(1000L, 4, Arrays.asList(createTerms("my_field",
-                        new Term("a", 1, a1NumericAggs), new Term("b", 2, b1NumericAggs), new Term("c", 1, c1NumericAggs)))),
-                createHistogramBucket(2000L, 5, Arrays.asList(createTerms("my_field",
-                        new Term("a", 5, a2NumericAggs), new Term("b", 2, b2NumericAggs)))),
-                createHistogramBucket(3000L, 0, Arrays.asList()),
-                createHistogramBucket(4000L, 7, Arrays.asList(createTerms("my_field",
-                        new Term("c", 4, c4NumericAggs), new Term("b", 3, b4NumericAggs))))
+                createHistogramBucket(1000L, 4, Arrays.asList(
+                        createMax("time", 1000),
+                        createTerms("my_field", new Term("a", 1, a1NumericAggs),
+                                new Term("b", 2, b1NumericAggs), new Term("c", 1, c1NumericAggs)))),
+                createHistogramBucket(2000L, 5, Arrays.asList(
+                        createMax("time", 2000),
+                        createTerms("my_field", new Term("a", 5, a2NumericAggs), new Term("b", 2, b2NumericAggs)))),
+                createHistogramBucket(3000L, 0, Arrays.asList(createMax("time", 3000))),
+                createHistogramBucket(4000L, 7, Arrays.asList(
+                        createMax("time", 4000),
+                        createTerms("my_field", new Term("c", 4, c4NumericAggs), new Term("b", 3, b4NumericAggs))))
         );
 
         String json = aggToString("time", false, histogramBuckets);
@@ -160,7 +194,7 @@ public class AggregationToJsonProcessorTests extends ESTestCase {
         Histogram.Bucket histogramBucket = createHistogramBucket(1000L, 2);
         Histogram anotherHistogram = mock(Histogram.class);
         when(anotherHistogram.getName()).thenReturn("nested-agg");
-        Aggregations subAggs = createAggs(Arrays.asList(anotherHistogram));
+        Aggregations subAggs = createAggs(Arrays.asList(createMax("time", 1000), anotherHistogram));
         when(histogramBucket.getAggregations()).thenReturn(subAggs);
 
         IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> aggToString("time", histogramBucket));
@@ -171,30 +205,23 @@ public class AggregationToJsonProcessorTests extends ESTestCase {
         Histogram.Bucket histogramBucket = createHistogramBucket(1000L, 2);
         Terms terms1 = mock(Terms.class);
         Terms terms2 = mock(Terms.class);
-        Aggregations subAggs = createAggs(Arrays.asList(terms1, terms2));
+        Aggregations subAggs = createAggs(Arrays.asList(createMax("time", 1000), terms1, terms2));
         when(histogramBucket.getAggregations()).thenReturn(subAggs);
 
         IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> aggToString("time", histogramBucket));
         assertThat(e.getMessage(), containsString("Multiple non-leaf nested aggregations are not supported"));
     }
 
-    public void testProcessGivenHistogramWithDateTimeKeys() throws IOException {
-        List<Histogram.Bucket> histogramBuckets = Arrays.asList(
-                createDateHistogramBucket(new DateTime(1000L), 3),
-                createDateHistogramBucket(new DateTime(2000L), 5)
-        );
-
-        String json = aggToString("time", histogramBuckets);
-
-        assertThat(json, equalTo("{\"time\":1000,\"doc_count\":3} {\"time\":2000,\"doc_count\":5}"));
-    }
-
     public void testProcessGivenSinglePercentilesPerHistogram() throws IOException {
         List<Histogram.Bucket> histogramBuckets = Arrays.asList(
-                createHistogramBucket(1000L, 4, Arrays.asList(createPercentiles("my_field", 1.0))),
-                createHistogramBucket(2000L, 7, Arrays.asList(createPercentiles("my_field", 2.0))),
-                createHistogramBucket(3000L, 10, Arrays.asList(createPercentiles("my_field", 3.0))),
-                createHistogramBucket(4000L, 14, Arrays.asList(createPercentiles("my_field", 4.0)))
+                createHistogramBucket(1000L, 4, Arrays.asList(
+                        createMax("time", 1000), createPercentiles("my_field", 1.0))),
+                createHistogramBucket(2000L, 7, Arrays.asList(
+                        createMax("time", 2000), createPercentiles("my_field", 2.0))),
+                createHistogramBucket(3000L, 10, Arrays.asList(
+                        createMax("time", 3000), createPercentiles("my_field", 3.0))),
+                createHistogramBucket(4000L, 14, Arrays.asList(
+                        createMax("time", 4000), createPercentiles("my_field", 4.0)))
         );
 
         String json = aggToString("time", histogramBuckets);
@@ -207,10 +234,14 @@ public class AggregationToJsonProcessorTests extends ESTestCase {
 
     public void testProcessGivenMultiplePercentilesPerHistogram() throws IOException {
         List<Histogram.Bucket> histogramBuckets = Arrays.asList(
-                createHistogramBucket(1000L, 4, Arrays.asList(createPercentiles("my_field", 1.0))),
-                createHistogramBucket(2000L, 7, Arrays.asList(createPercentiles("my_field", 2.0, 5.0))),
-                createHistogramBucket(3000L, 10, Arrays.asList(createPercentiles("my_field", 3.0))),
-                createHistogramBucket(4000L, 14, Arrays.asList(createPercentiles("my_field", 4.0)))
+                createHistogramBucket(1000L, 4, Arrays.asList(
+                        createMax("time", 1000), createPercentiles("my_field", 1.0))),
+                createHistogramBucket(2000L, 7, Arrays.asList(
+                        createMax("time", 2000), createPercentiles("my_field", 2.0, 5.0))),
+                createHistogramBucket(3000L, 10, Arrays.asList(
+                        createMax("time", 3000), createPercentiles("my_field", 3.0))),
+                createHistogramBucket(4000L, 14, Arrays.asList(
+                        createMax("time", 4000), createPercentiles("my_field", 4.0)))
         );
 
         IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> aggToString("time", histogramBuckets));
@@ -227,9 +258,9 @@ public class AggregationToJsonProcessorTests extends ESTestCase {
 
     private String aggToString(String timeField, boolean includeDocCount, List<Histogram.Bucket> buckets) throws IOException {
         ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
-        try (AggregationToJsonProcessor processor = new AggregationToJsonProcessor(includeDocCount, outputStream)) {
+        try (AggregationToJsonProcessor processor = new AggregationToJsonProcessor(timeField, includeDocCount, outputStream)) {
             for (Histogram.Bucket bucket : buckets) {
-                processor.process(timeField, bucket);
+                processor.process(bucket);
             }
             keyValuePairsWritten = processor.getKeyValueCount();
         }


@@ -305,8 +305,10 @@ public class DatafeedJobIT extends ESRestTestCase {
                 new StringEntity(job, ContentType.APPLICATION_JSON));
         String datafeedId = "datafeed-" + jobId;
-        String aggregations = "{\"time stamp\":{\"histogram\":{\"field\":\"time stamp\",\"interval\":3600000},"
-                + "\"aggregations\":{\"airline\":{\"terms\":{\"field\":\"airline\",\"size\":10},"
+        String aggregations = "{\"buckets\":{\"histogram\":{\"field\":\"time stamp\",\"interval\":3600000},"
+                + "\"aggregations\":{"
+                + "\"time stamp\":{\"max\":{\"field\":\"time stamp\"}},"
+                + "\"airline\":{\"terms\":{\"field\":\"airline\",\"size\":10},"
                 + "  \"aggregations\":{\"responsetime\":{\"avg\":{\"field\":\"responsetime\"}}}}}}}";
         new DatafeedBuilder(datafeedId, jobId, "airline-data-aggs", "response").setAggregations(aggregations).build();
 
         openJob(client(), jobId);
@@ -332,7 +334,9 @@ public class DatafeedJobIT extends ESRestTestCase {
         String datafeedId = "datafeed-" + jobId;
         String aggregations = "{\"time stamp\":{\"date_histogram\":{\"field\":\"time stamp\",\"interval\":\"1h\"},"
-                + "\"aggregations\":{\"airline\":{\"terms\":{\"field\":\"airline\",\"size\":10},"
+                + "\"aggregations\":{"
+                + "\"time stamp\":{\"max\":{\"field\":\"time stamp\"}},"
+                + "\"airline\":{\"terms\":{\"field\":\"airline\",\"size\":10},"
                 + "  \"aggregations\":{\"responsetime\":{\"avg\":{\"field\":\"responsetime\"}}}}}}}";
         new DatafeedBuilder(datafeedId, jobId, "airline-data-aggs", "response").setAggregations(aggregations).build();
 
         openJob(client(), jobId);


@@ -142,12 +142,17 @@ setup:
             "indexes":"airline-data",
             "types":"response",
             "aggregations": {
-              "time": {
+              "buckets": {
                 "histogram": {
                   "field": "time",
                   "interval": 3600000
                 },
                 "aggregations": {
+                  "time": {
+                    "max": {
+                      "field": "time"
+                    }
+                  },
                   "airline": {
                     "terms": {
                       "field": "airline",
@@ -170,15 +175,15 @@ setup:
       xpack.ml.preview_datafeed:
         datafeed_id: aggregation-doc-count-feed
 
   - length: { $body: 3 }
-  - match: { 0.time: 1.487376E12 }
+  - match: { 0.time: 1487377800000 }
   - match: { 0.airline: foo }
   - match: { 0.responsetime: 2.0 }
   - match: { 0.doc_count: 2 }
-  - match: { 1.time: 1.4873796E12 }
+  - match: { 1.time: 1487379660000 }
   - match: { 1.airline: bar }
   - match: { 1.responsetime: 42.0 }
   - match: { 1.doc_count: 1 }
-  - match: { 1.time: 1.4873796E12 }
+  - match: { 1.time: 1487379660000 }
   - match: { 2.airline: foo }
   - match: { 2.responsetime: 42.0 }
   - match: { 2.doc_count: 1 }
@@ -210,12 +215,17 @@ setup:
             "indexes":"airline-data",
             "types":"response",
             "aggregations": {
-              "time": {
+              "buckets": {
                 "histogram": {
                   "field": "time",
                   "interval": 3600000
                 },
                 "aggregations": {
+                  "time": {
+                    "max": {
+                      "field": "time"
+                    }
+                  },
                   "dc_airline": {
                     "cardinality": {
                       "field": "airline"
@@ -230,10 +240,10 @@ setup:
       xpack.ml.preview_datafeed:
         datafeed_id: aggregation-custom-single-metric-summary-feed
 
   - length: { $body: 2 }
-  - match: { 0.time: 1.487376E12 }
+  - match: { 0.time: 1487377800000 }
   - match: { 0.dc_airline: 1 }
   - is_false: 0.doc_count
-  - match: { 1.time: 1.4873796E12 }
+  - match: { 1.time: 1487379660000 }
   - match: { 1.dc_airline: 2 }
   - is_false: 1.doc_count
@@ -264,12 +274,17 @@ setup:
             "indexes":"airline-data",
             "types":"response",
             "aggregations": {
-              "time": {
+              "buckets": {
                 "histogram": {
                   "field": "time",
                   "interval": 3600000
                 },
                 "aggregations": {
+                  "time": {
+                    "max": {
+                      "field": "time"
+                    }
+                  },
                   "airline": {
                     "terms": {
                       "field": "airline"
@@ -296,17 +311,17 @@ setup:
       xpack.ml.preview_datafeed:
         datafeed_id: aggregation-custom-multi-metric-summary-feed
 
   - length: { $body: 3 }
-  - match: { 0.time: 1.487376E12 }
+  - match: { 0.time: 1487377800000 }
   - match: { 0.airline: foo }
   - match: { 0.responsetime: 2.0 }
   - match: { 0.event_rate: 11 }
   - is_false: 0.doc_count
-  - match: { 1.time: 1.4873796E12 }
+  - match: { 1.time: 1487379660000 }
   - match: { 1.airline: bar }
   - match: { 1.responsetime: 42.0 }
   - match: { 1.event_rate: 8 }
   - is_false: 1.doc_count
-  - match: { 1.time: 1.4873796E12 }
+  - match: { 1.time: 1487379660000 }
   - match: { 2.airline: foo }
   - match: { 2.responsetime: 42.0 }
   - match: { 2.event_rate: 7 }