Fix datafeed with date_histogram aggregation (elastic/elasticsearch#876)

date_histogram buckets return their key as a Joda DateTime object.
This PR checks whether the key is a DateTime and, if so, writes out
the epoch millis instead.

Fixes elastic/elasticsearch#869
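
For context, a minimal, self-contained sketch of the conversion the processor now performs. The class and method names below are illustrative only and not part of this change; only the BaseDateTime check mirrors the actual diff.

    import org.joda.time.DateTime;
    import org.joda.time.base.BaseDateTime;

    // Hypothetical helper, not from this PR: normalises a histogram bucket key.
    public class BucketKeyExample {

        // date_histogram buckets expose a Joda DateTime key; numeric histograms a Number.
        static Object toTimestamp(Object bucketKey) {
            if (bucketKey instanceof BaseDateTime) {
                // Convert back to epoch millis so the key serialises as a plain number.
                return ((BaseDateTime) bucketKey).getMillis();
            }
            return bucketKey;
        }

        public static void main(String[] args) {
            System.out.println(toTimestamp(new DateTime(1000L))); // prints 1000
            System.out.println(toTimestamp(2000L));               // prints 2000
        }
    }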

Original commit: elastic/x-pack-elasticsearch@8e39760dad
Dimitris Athanasiou 2017-02-07 14:45:02 +00:00 committed by GitHub
parent 678ae53596
commit 15160e41a2
4 changed files with 55 additions and 3 deletions

AggregationToJsonProcessor.java

@@ -14,6 +14,7 @@ import org.elasticsearch.search.aggregations.bucket.histogram.Histogram;
 import org.elasticsearch.search.aggregations.bucket.terms.Terms;
 import org.elasticsearch.search.aggregations.metrics.NumericMetricsAggregation;
 import org.elasticsearch.xpack.ml.datafeed.DatafeedConfig;
+import org.joda.time.base.BaseDateTime;
 
 import java.io.IOException;
 import java.io.OutputStream;
@@ -51,7 +52,11 @@ class AggregationToJsonProcessor implements Releasable {
 
     private void processHistogram(Histogram histogram) throws IOException {
         for (Histogram.Bucket bucket : histogram.getBuckets()) {
-            keyValuePairs.put(histogram.getName(), bucket.getKey());
+            Object timestamp = bucket.getKey();
+            if (timestamp instanceof BaseDateTime) {
+                timestamp = ((BaseDateTime) timestamp).getMillis();
+            }
+            keyValuePairs.put(histogram.getName(), timestamp);
             processNestedAggs(bucket.getDocCount(), bucket.getAggregations());
         }
     }

AggregationTestUtils.java

@@ -11,6 +11,7 @@ import org.elasticsearch.search.aggregations.bucket.histogram.Histogram;
 import org.elasticsearch.search.aggregations.bucket.terms.StringTerms;
 import org.elasticsearch.search.aggregations.bucket.terms.Terms;
 import org.elasticsearch.search.aggregations.metrics.NumericMetricsAggregation;
+import org.joda.time.DateTime;
 
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -43,6 +44,13 @@ public final class AggregationTestUtils {
         return bucket;
     }
 
+    static Histogram.Bucket createDateHistogramBucket(DateTime timestamp, long docCount) {
+        Histogram.Bucket bucket = mock(Histogram.Bucket.class);
+        when(bucket.getKey()).thenReturn(timestamp);
+        when(bucket.getDocCount()).thenReturn(docCount);
+        return bucket;
+    }
+
     static NumericMetricsAggregation.SingleValue createSingleValue(String name, double value) {
         NumericMetricsAggregation.SingleValue singleValue = mock(NumericMetricsAggregation.SingleValue.class);
         when(singleValue.getName()).thenReturn(name);

AggregationToJsonProcessorTests.java

@@ -10,6 +10,7 @@ import org.elasticsearch.search.aggregations.Aggregations;
 import org.elasticsearch.search.aggregations.bucket.histogram.Histogram;
 import org.elasticsearch.search.aggregations.bucket.terms.Terms;
 import org.elasticsearch.test.ESTestCase;
+import org.joda.time.DateTime;
 
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
@@ -19,6 +20,7 @@ import java.util.List;
 
 import static org.elasticsearch.xpack.ml.datafeed.extractor.aggregation.AggregationTestUtils.Term;
 import static org.elasticsearch.xpack.ml.datafeed.extractor.aggregation.AggregationTestUtils.createAggs;
+import static org.elasticsearch.xpack.ml.datafeed.extractor.aggregation.AggregationTestUtils.createDateHistogramBucket;
 import static org.elasticsearch.xpack.ml.datafeed.extractor.aggregation.AggregationTestUtils.createHistogramBucket;
 import static org.elasticsearch.xpack.ml.datafeed.extractor.aggregation.AggregationTestUtils.createSingleValue;
 import static org.elasticsearch.xpack.ml.datafeed.extractor.aggregation.AggregationTestUtils.createTerms;
@@ -139,6 +141,20 @@ public class AggregationToJsonProcessorTests extends ESTestCase {
         assertThat(e.getMessage(), containsString("Multiple nested aggregations are not supported"));
     }
 
+    public void testProcessGivenHistogramWithDateTimeKeys() throws IOException {
+        List<Histogram.Bucket> histogramBuckets = Arrays.asList(
+                createDateHistogramBucket(new DateTime(1000L), 3),
+                createDateHistogramBucket(new DateTime(2000L), 5)
+        );
+
+        Histogram histogram = mock(Histogram.class);
+        when(histogram.getName()).thenReturn("time");
+        when(histogram.getBuckets()).thenReturn(histogramBuckets);
+        String json = aggToString(histogram);
+
+        assertThat(json, equalTo("{\"time\":1000,\"doc_count\":3} {\"time\":2000,\"doc_count\":5}"));
+    }
+
     private String aggToString(Aggregation aggregation) throws IOException {
         ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
         try (AggregationToJsonProcessor processor = new AggregationToJsonProcessor(outputStream)) {
DatafeedJobIT.java

@@ -191,8 +191,8 @@ public class DatafeedJobIT extends ESRestTestCase {
                 .execute();
     }
 
-    public void testLookbackOnlyGivenAggregations() throws Exception {
-        String jobId = "aggs-job";
+    public void testLookbackOnlyGivenAggregationsWithHistogram() throws Exception {
+        String jobId = "aggs-histogram-job";
         String job = "{\"description\":\"Aggs job\",\"analysis_config\" :{\"bucket_span\":3600,\"summary_count_field_name\":\"doc_count\","
                 + "\"detectors\":[{\"function\":\"mean\",\"field_name\":\"responsetime\",\"by_field_name\":\"airline\"}]},"
                 + "\"data_description\" : {\"time_field\":\"time stamp\"}"
@@ -214,6 +214,29 @@
         assertThat(jobStatsResponseAsString, containsString("\"missing_field_count\":0"));
     }
 
+    public void testLookbackOnlyGivenAggregationsWithDateHistogram() throws Exception {
+        String jobId = "aggs-date-histogram-job";
+        String job = "{\"description\":\"Aggs job\",\"analysis_config\" :{\"bucket_span\":3600,\"summary_count_field_name\":\"doc_count\","
+                + "\"detectors\":[{\"function\":\"mean\",\"field_name\":\"responsetime\",\"by_field_name\":\"airline\"}]},"
+                + "\"data_description\" : {\"time_field\":\"time stamp\"}"
+                + "}";
+        client().performRequest("put", MlPlugin.BASE_PATH + "anomaly_detectors/" + jobId, Collections.emptyMap(), new StringEntity(job));
+
+        String datafeedId = "datafeed-" + jobId;
+        String aggregations = "{\"time stamp\":{\"date_histogram\":{\"field\":\"time stamp\",\"interval\":\"1h\"},"
+                + "\"aggregations\":{\"airline\":{\"terms\":{\"field\":\"airline\",\"size\":10},"
+                + "\"aggregations\":{\"responsetime\":{\"avg\":{\"field\":\"responsetime\"}}}}}}}";
+        new DatafeedBuilder(datafeedId, jobId, "airline-data-aggs", "response").setAggregations(aggregations).build();
+        openJob(client(), jobId);
+        startDatafeedAndWaitUntilStopped(datafeedId);
+
+        Response jobStatsResponse = client().performRequest("get", MlPlugin.BASE_PATH + "anomaly_detectors/" + jobId + "/_stats");
+        String jobStatsResponseAsString = responseEntityToString(jobStatsResponse);
+        assertThat(jobStatsResponseAsString, containsString("\"input_record_count\":4"));
+        assertThat(jobStatsResponseAsString, containsString("\"processed_record_count\":4"));
+        assertThat(jobStatsResponseAsString, containsString("\"missing_field_count\":0"));
+    }
+
     public void testRealtime() throws Exception {
         String jobId = "job-realtime-1";
         createJob(jobId);