[ML] Implement cancellation policy for aggregated datafeeds (elastic/x-pack-elasticsearch#862)

Aggregated data extraction is done in 2 phases:

1. search
2. process response

The first phase cannot be currently cancelled. However, it usually
is the fastest of the two.

The second phase processes the histogram buckets in the search
response into flat JSON and then posts the result stream to the job.
This phase can be split into batches where a few buckets are posted
to the job at a time. Cancelling can then work between batches.

This commit changes the AggregationDataExtractor to process the
search response in batches. The definition of a batch is crucial
as it has to be short enough to allow for responsive cancelling,
yet long enough to minimise overhead due to multiple calls to the
post data action. The number of key-value pairs written by the
processor is a good candidate for a batch size measure. By testing,
1000 seems to be an effective number.

relates elastic/x-pack-elasticsearch#802

Original commit: elastic/x-pack-elasticsearch@ce3a172411
This commit is contained in:
Dimitris Athanasiou 2017-03-31 10:15:43 +01:00 committed by GitHub
parent cbfa5b5f0e
commit 0cb2b18265
5 changed files with 247 additions and 94 deletions

View File

@ -10,9 +10,11 @@ import org.elasticsearch.action.search.SearchAction;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.search.aggregations.Aggregation;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.bucket.histogram.Histogram;
import org.elasticsearch.xpack.ml.datafeed.extractor.DataExtractor;
import org.elasticsearch.xpack.ml.datafeed.extractor.ExtractorUtils;
@ -20,33 +22,50 @@ import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
/**
* An implementation that extracts data from elasticsearch using search with aggregations on a client.
* Cancellation is effective only when it is called before the first time {@link #next()} is called.
* The first time {@link #next()} is called, the search is executed. The result aggregations are
* stored and they are then processed in batches. Cancellation is supported between batches.
* Note that this class is NOT thread-safe.
*/
class AggregationDataExtractor implements DataExtractor {
private static final Logger LOGGER = Loggers.getLogger(AggregationDataExtractor.class);
/**
* The number of key-value pairs written in each batch to process.
* This has to be a number that is small enough to allow for responsive
* cancelling and big enough to not cause overhead by calling the
* post data action too often. The value of 1000 was determined via
* such testing.
*/
private static int BATCH_KEY_VALUE_PAIRS = 1000;
private final Client client;
private final AggregationDataExtractorContext context;
private boolean hasNext;
private boolean isCancelled;
private LinkedList<Histogram.Bucket> histogramBuckets;
AggregationDataExtractor(Client client, AggregationDataExtractorContext dataExtractorContext) {
this.client = Objects.requireNonNull(client);
this.context = Objects.requireNonNull(dataExtractorContext);
this.hasNext = true;
this.isCancelled = false;
this.histogramBuckets = null;
}
@Override
public boolean hasNext() {
return hasNext && !isCancelled;
return hasNext;
}
@Override
@ -58,6 +77,7 @@ class AggregationDataExtractor implements DataExtractor {
public void cancel() {
LOGGER.trace("[{}] Data extractor received cancel request", context.jobId);
isCancelled = true;
hasNext = false;
}
@Override
@ -65,16 +85,20 @@ class AggregationDataExtractor implements DataExtractor {
if (!hasNext()) {
throw new NoSuchElementException();
}
Optional<InputStream> stream = Optional.ofNullable(search());
hasNext = false;
return stream;
if (histogramBuckets == null) {
histogramBuckets = search();
}
return Optional.ofNullable(processNextBatch());
}
private InputStream search() throws IOException {
private LinkedList<Histogram.Bucket> search() throws IOException {
if (histogramBuckets != null) {
throw new IllegalStateException("search should only be performed once");
}
LOGGER.debug("[{}] Executing aggregated search", context.jobId);
SearchResponse searchResponse = executeSearchRequest(buildSearchRequest());
ExtractorUtils.checkSearchWasSuccessful(context.jobId, searchResponse);
return processSearchResponse(searchResponse);
return new LinkedList<>(getHistogramBuckets(searchResponse.getAggregations()));
}
protected SearchResponse executeSearchRequest(SearchRequestBuilder searchRequestBuilder) {
@ -93,14 +117,44 @@ class AggregationDataExtractor implements DataExtractor {
return searchRequestBuilder;
}
private InputStream processSearchResponse(SearchResponse searchResponse) throws IOException {
if (searchResponse.getAggregations() == null) {
private List<Histogram.Bucket> getHistogramBuckets(@Nullable Aggregations aggs) {
if (aggs == null) {
return Collections.emptyList();
}
List<Aggregation> aggsAsList = aggs.asList();
if (aggsAsList.isEmpty()) {
return Collections.emptyList();
}
if (aggsAsList.size() > 1) {
throw new IllegalArgumentException("Multiple top level aggregations not supported; found: "
+ aggsAsList.stream().map(a -> a.getName()).collect(Collectors.toList()));
}
Aggregation topAgg = aggsAsList.get(0);
if (topAgg instanceof Histogram) {
if (context.timeField.equals(topAgg.getName()) == false) {
throw new IllegalArgumentException("Histogram name [" + topAgg.getName()
+ "] does not match time field [" + context.timeField + "]");
}
return ((Histogram) topAgg).getBuckets();
} else {
throw new IllegalArgumentException("Top level aggregation should be [histogram]");
}
}
private InputStream processNextBatch() throws IOException {
if (histogramBuckets.isEmpty()) {
hasNext = false;
return null;
}
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
try (AggregationToJsonProcessor processor = new AggregationToJsonProcessor(context.includeDocCount, outputStream)) {
for (Aggregation agg : searchResponse.getAggregations().asList()) {
processor.process(agg);
while (histogramBuckets.isEmpty() == false && processor.getKeyValueCount() < BATCH_KEY_VALUE_PAIRS) {
processor.process(context.timeField, histogramBuckets.removeFirst());
}
if (histogramBuckets.isEmpty()) {
hasNext = false;
}
}
return new ByteArrayInputStream(outputStream.toByteArray());

View File

@ -35,36 +35,31 @@ class AggregationToJsonProcessor implements Releasable {
private final boolean includeDocCount;
private final XContentBuilder jsonBuilder;
private final Map<String, Object> keyValuePairs;
private long keyValueWrittenCount;
AggregationToJsonProcessor(boolean includeDocCount, OutputStream outputStream) throws IOException {
this.includeDocCount = includeDocCount;
jsonBuilder = new XContentBuilder(JsonXContent.jsonXContent, outputStream);
keyValuePairs = new LinkedHashMap<>();
keyValueWrittenCount = 0;
}
/**
* Processes an {@link Aggregation} and writes a flat JSON document for each of its leaf aggregations.
* It expects aggregations to have 0..1 sub-aggregations.
* It expects the top level aggregation to be {@link Histogram}.
* It expects that all sub-aggregations of the top level are either {@link Terms} or {@link NumericMetricsAggregation.SingleValue}.
* Processes a {@link Histogram.Bucket} and writes a flat JSON document for each of its leaf aggregations.
* Supported sub-aggregations include:
* <ul>
* <li>{@link Terms}</li>
* <li>{@link NumericMetricsAggregation.SingleValue}</li>
* <li>{@link Percentiles}</li>
* </ul>
*/
public void process(Aggregation aggregation) throws IOException {
if (aggregation instanceof Histogram) {
processHistogram((Histogram) aggregation);
} else {
throw new IllegalArgumentException("Top level aggregation should be [histogram]");
}
}
private void processHistogram(Histogram histogram) throws IOException {
for (Histogram.Bucket bucket : histogram.getBuckets()) {
Object timestamp = bucket.getKey();
if (timestamp instanceof BaseDateTime) {
timestamp = ((BaseDateTime) timestamp).getMillis();
}
keyValuePairs.put(histogram.getName(), timestamp);
processNestedAggs(bucket.getDocCount(), bucket.getAggregations());
public void process(String timeField, Histogram.Bucket bucket) throws IOException {
Object timestamp = bucket.getKey();
if (timestamp instanceof BaseDateTime) {
timestamp = ((BaseDateTime) timestamp).getMillis();
}
keyValuePairs.put(timeField, timestamp);
processNestedAggs(bucket.getDocCount(), bucket.getAggregations());
}
private void processNestedAggs(long docCount, Aggregations subAggs) throws IOException {
@ -121,9 +116,11 @@ class AggregationToJsonProcessor implements Releasable {
jsonBuilder.startObject();
for (Map.Entry<String, Object> keyValue : keyValuePairs.entrySet()) {
jsonBuilder.field(keyValue.getKey(), keyValue.getValue());
keyValueWrittenCount++;
}
if (includeDocCount) {
jsonBuilder.field(DatafeedConfig.DOC_COUNT, docCount);
keyValueWrittenCount++;
}
jsonBuilder.endObject();
}
@ -133,4 +130,11 @@ class AggregationToJsonProcessor implements Releasable {
public void close() {
jsonBuilder.close();
}
/**
* The key-value pairs that have been written so far
*/
public long getKeyValueCount() {
return keyValueWrittenCount;
}
}

View File

@ -86,7 +86,7 @@ public class AutodetectCommunicator implements Closeable {
CountingInputStream countingStream = new CountingInputStream(inputStream, dataCountsReporter);
DataToProcessWriter autoDetectWriter = createProcessWriter(params.getDataDescription());
DataCounts results = autoDetectWriter.write(countingStream, xContentType);
DataCounts results = autoDetectWriter.write(countingStream, xContentType);
autoDetectWriter.flush();
return results;
}, handler);

View File

@ -16,6 +16,7 @@ import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.aggregations.bucket.histogram.Histogram;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.test.ESTestCase;
import org.junit.Before;
@ -120,6 +121,93 @@ public class AggregationDataExtractorTests extends ESTestCase {
stringContainsInOrder(Arrays.asList("aggregations", "histogram", "time", "terms", "airline", "avg", "responsetime")));
}
public void testExtractionGivenMultipleBatches() throws IOException {
// Each bucket is 2 key-value pairs, thus 1200 buckets will be 2400
// key-value pairs. They should be processed in 3 batches.
int buckets = 1200;
List<Histogram.Bucket> histogramBuckets = new ArrayList<>(buckets);
long timestamp = 1000;
for (int i = 0; i < buckets; i++) {
histogramBuckets.add(createHistogramBucket(timestamp, 3));
timestamp += 1000L;
}
TestDataExtractor extractor = new TestDataExtractor(1000L, timestamp + 1);
SearchResponse response = createSearchResponse("time", histogramBuckets);
extractor.setNextResponse(response);
assertThat(extractor.hasNext(), is(true));
assertThat(countMatches('{', asString(extractor.next().get())), equalTo(500L));
assertThat(extractor.hasNext(), is(true));
assertThat(countMatches('{', asString(extractor.next().get())), equalTo(500L));
assertThat(extractor.hasNext(), is(true));
assertThat(countMatches('{', asString(extractor.next().get())), equalTo(200L));
assertThat(extractor.hasNext(), is(false));
assertThat(capturedSearchRequests.size(), equalTo(1));
}
public void testExtractionGivenResponseHasNullAggs() throws IOException {
TestDataExtractor extractor = new TestDataExtractor(1000L, 2000L);
SearchResponse response = createSearchResponse(null);
extractor.setNextResponse(response);
assertThat(extractor.hasNext(), is(true));
assertThat(extractor.next().isPresent(), is(false));
assertThat(extractor.hasNext(), is(false));
assertThat(capturedSearchRequests.size(), equalTo(1));
}
public void testExtractionGivenResponseHasEmptyAggs() throws IOException {
TestDataExtractor extractor = new TestDataExtractor(1000L, 2000L);
Aggregations emptyAggs = mock(Aggregations.class);
when(emptyAggs.asList()).thenReturn(Collections.emptyList());
SearchResponse response = createSearchResponse(emptyAggs);
extractor.setNextResponse(response);
assertThat(extractor.hasNext(), is(true));
assertThat(extractor.next().isPresent(), is(false));
assertThat(extractor.hasNext(), is(false));
assertThat(capturedSearchRequests.size(), equalTo(1));
}
public void testExtractionGivenResponseHasInvalidTopLevelAgg() throws IOException {
TestDataExtractor extractor = new TestDataExtractor(1000L, 2000L);
Terms termsAgg = mock(Terms.class);
Aggregations emptyAggs = mock(Aggregations.class);
when(emptyAggs.asList()).thenReturn(Collections.singletonList(termsAgg));
SearchResponse response = createSearchResponse(emptyAggs);
extractor.setNextResponse(response);
assertThat(extractor.hasNext(), is(true));
IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> extractor.next());
assertThat(e.getMessage(), containsString("Top level aggregation should be [histogram]"));
}
public void testExtractionGivenResponseHasMultipleTopLevelAggs() throws IOException {
TestDataExtractor extractor = new TestDataExtractor(1000L, 2000L);
Histogram histogram1 = mock(Histogram.class);
when(histogram1.getName()).thenReturn("hist_1");
Histogram histogram2 = mock(Histogram.class);
when(histogram2.getName()).thenReturn("hist_2");
Aggregations emptyAggs = mock(Aggregations.class);
when(emptyAggs.asList()).thenReturn(Arrays.asList(histogram1, histogram2));
SearchResponse response = createSearchResponse(emptyAggs);
extractor.setNextResponse(response);
assertThat(extractor.hasNext(), is(true));
IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> extractor.next());
assertThat(e.getMessage(), containsString("Multiple top level aggregations not supported; found: [hist_1, hist_2]"));
}
public void testExtractionGivenCancelBeforeNext() throws IOException {
TestDataExtractor extractor = new TestDataExtractor(1000L, 4000L);
SearchResponse response = createSearchResponse("time", Collections.emptyList());
@ -129,6 +217,34 @@ public class AggregationDataExtractorTests extends ESTestCase {
assertThat(extractor.hasNext(), is(false));
}
public void testExtractionGivenCancelHalfWay() throws IOException {
int buckets = 1200;
List<Histogram.Bucket> histogramBuckets = new ArrayList<>(buckets);
long timestamp = 1000;
for (int i = 0; i < buckets; i++) {
histogramBuckets.add(createHistogramBucket(timestamp, 3));
timestamp += 1000L;
}
TestDataExtractor extractor = new TestDataExtractor(1000L, timestamp + 1);
SearchResponse response = createSearchResponse("time", histogramBuckets);
extractor.setNextResponse(response);
assertThat(extractor.hasNext(), is(true));
assertThat(countMatches('{', asString(extractor.next().get())), equalTo(500L));
assertThat(extractor.hasNext(), is(true));
assertThat(countMatches('{', asString(extractor.next().get())), equalTo(500L));
assertThat(extractor.hasNext(), is(true));
extractor.cancel();
assertThat(extractor.hasNext(), is(false));
assertThat(extractor.isCancelled(), is(true));
assertThat(capturedSearchRequests.size(), equalTo(1));
}
public void testExtractionGivenSearchResponseHasError() throws IOException {
TestDataExtractor extractor = new TestDataExtractor(1000L, 2000L);
extractor.setNextResponse(createErrorResponse());
@ -159,17 +275,20 @@ public class AggregationDataExtractorTests extends ESTestCase {
}
private SearchResponse createSearchResponse(String histogramName, List<Histogram.Bucket> histogramBuckets) {
SearchResponse searchResponse = mock(SearchResponse.class);
when(searchResponse.status()).thenReturn(RestStatus.OK);
when(searchResponse.getScrollId()).thenReturn(randomAsciiOfLength(1000));
Histogram histogram = mock(Histogram.class);
when(histogram.getName()).thenReturn(histogramName);
when(histogram.getBuckets()).thenReturn(histogramBuckets);
Aggregations searchAggs = mock(Aggregations.class);
when(searchAggs.asList()).thenReturn(Arrays.asList(histogram));
when(searchResponse.getAggregations()).thenReturn(searchAggs);
return createSearchResponse(searchAggs);
}
private SearchResponse createSearchResponse(Aggregations aggregations) {
SearchResponse searchResponse = mock(SearchResponse.class);
when(searchResponse.status()).thenReturn(RestStatus.OK);
when(searchResponse.getScrollId()).thenReturn(randomAsciiOfLength(1000));
when(searchResponse.getAggregations()).thenReturn(aggregations);
return searchResponse;
}
@ -200,4 +319,8 @@ public class AggregationDataExtractorTests extends ESTestCase {
return reader.lines().collect(Collectors.joining("\n"));
}
}
private static long countMatches(char c, String text) {
return text.chars().filter(current -> current == c).count();
}
}

View File

@ -5,7 +5,6 @@
*/
package org.elasticsearch.xpack.ml.datafeed.extractor.aggregation;
import org.elasticsearch.search.aggregations.Aggregation;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.bucket.histogram.Histogram;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
@ -16,6 +15,7 @@ import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@ -34,18 +34,18 @@ import static org.mockito.Mockito.when;
public class AggregationToJsonProcessorTests extends ESTestCase {
private long keyValuePairsWritten = 0;
public void testProcessGivenHistogramOnly() throws IOException {
List<Histogram.Bucket> histogramBuckets = Arrays.asList(
createHistogramBucket(1000L, 3),
createHistogramBucket(2000L, 5)
);
Histogram histogram = mock(Histogram.class);
when(histogram.getName()).thenReturn("time");
when(histogram.getBuckets()).thenReturn(histogramBuckets);
String json = aggToString(histogram);
String json = aggToString("timestamp", histogramBuckets);
assertThat(json, equalTo("{\"time\":1000,\"doc_count\":3} {\"time\":2000,\"doc_count\":5}"));
assertThat(json, equalTo("{\"timestamp\":1000,\"doc_count\":3} {\"timestamp\":2000,\"doc_count\":5}"));
assertThat(keyValuePairsWritten, equalTo(4L));
}
public void testProcessGivenHistogramOnlyAndNoDocCount() throws IOException {
@ -53,13 +53,11 @@ public class AggregationToJsonProcessorTests extends ESTestCase {
createHistogramBucket(1000L, 3),
createHistogramBucket(2000L, 5)
);
Histogram histogram = mock(Histogram.class);
when(histogram.getName()).thenReturn("time");
when(histogram.getBuckets()).thenReturn(histogramBuckets);
String json = aggToString(histogram, false);
String json = aggToString("time", false, histogramBuckets);
assertThat(json, equalTo("{\"time\":1000} {\"time\":2000}"));
assertThat(keyValuePairsWritten, equalTo(2L));
}
public void testProcessGivenSingleMetricPerHistogram() throws IOException {
@ -67,11 +65,8 @@ public class AggregationToJsonProcessorTests extends ESTestCase {
createHistogramBucket(1000L, 3, Arrays.asList(createSingleValue("my_value", 1.0))),
createHistogramBucket(2000L, 5, Arrays.asList(createSingleValue("my_value", 2.0)))
);
Histogram histogram = mock(Histogram.class);
when(histogram.getName()).thenReturn("time");
when(histogram.getBuckets()).thenReturn(histogramBuckets);
String json = aggToString(histogram);
String json = aggToString("time", histogramBuckets);
assertThat(json, equalTo("{\"time\":1000,\"my_value\":1.0,\"doc_count\":3} {\"time\":2000,\"my_value\":2.0,\"doc_count\":5}"));
}
@ -84,11 +79,8 @@ public class AggregationToJsonProcessorTests extends ESTestCase {
createHistogramBucket(3000L, 0, Arrays.asList()),
createHistogramBucket(4000L, 7, Arrays.asList(createTerms("my_field", new Term("c", 4), new Term("b", 3))))
);
Histogram histogram = mock(Histogram.class);
when(histogram.getName()).thenReturn("time");
when(histogram.getBuckets()).thenReturn(histogramBuckets);
String json = aggToString(histogram);
String json = aggToString("time", histogramBuckets);
assertThat(json, equalTo("{\"time\":1000,\"my_field\":\"a\",\"doc_count\":1} " +
"{\"time\":1000,\"my_field\":\"b\",\"doc_count\":2} " +
@ -109,11 +101,8 @@ public class AggregationToJsonProcessorTests extends ESTestCase {
createHistogramBucket(4000L, 7, Arrays.asList(createTerms("my_field",
new Term("c", 4, "my_value", 41.0), new Term("b", 3, "my_value", 42.0))))
);
Histogram histogram = mock(Histogram.class);
when(histogram.getName()).thenReturn("time");
when(histogram.getBuckets()).thenReturn(histogramBuckets);
String json = aggToString(histogram);
String json = aggToString("time", histogramBuckets);
assertThat(json, equalTo("{\"time\":1000,\"my_field\":\"a\",\"my_value\":11.0,\"doc_count\":1} " +
"{\"time\":1000,\"my_field\":\"b\",\"my_value\":12.0,\"doc_count\":2} " +
@ -155,11 +144,8 @@ public class AggregationToJsonProcessorTests extends ESTestCase {
createHistogramBucket(4000L, 7, Arrays.asList(createTerms("my_field",
new Term("c", 4, c4NumericAggs), new Term("b", 3, b4NumericAggs))))
);
Histogram histogram = mock(Histogram.class);
when(histogram.getName()).thenReturn("time");
when(histogram.getBuckets()).thenReturn(histogramBuckets);
String json = aggToString(histogram, false);
String json = aggToString("time", false, histogramBuckets);
assertThat(json, equalTo("{\"time\":1000,\"my_field\":\"a\",\"my_value\":111.0,\"my_value2\":112.0} " +
"{\"time\":1000,\"my_field\":\"b\",\"my_value\":121.0,\"my_value2\":122.0} " +
@ -170,23 +156,14 @@ public class AggregationToJsonProcessorTests extends ESTestCase {
"{\"time\":4000,\"my_field\":\"b\",\"my_value\":421.0,\"my_value2\":422.0}"));
}
public void testProcessGivenTopLevelAggIsNotHistogram() throws IOException {
Terms terms = mock(Terms.class);
IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> aggToString(terms));
assertThat(e.getMessage(), containsString("Top level aggregation should be [histogram]"));
}
public void testProcessGivenUnsupportedAggregationUnderHistogram() throws IOException {
Histogram.Bucket histogramBucket = createHistogramBucket(1000L, 2);
Histogram anotherHistogram = mock(Histogram.class);
when(anotherHistogram.getName()).thenReturn("nested-agg");
Aggregations subAggs = createAggs(Arrays.asList(anotherHistogram));
when(histogramBucket.getAggregations()).thenReturn(subAggs);
Histogram histogram = mock(Histogram.class);
when(histogram.getName()).thenReturn("buckets");
when(histogram.getBuckets()).thenReturn(Arrays.asList(histogramBucket));
IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> aggToString(histogram));
IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> aggToString("time", histogramBucket));
assertThat(e.getMessage(), containsString("Unsupported aggregation type [nested-agg]"));
}
@ -196,11 +173,8 @@ public class AggregationToJsonProcessorTests extends ESTestCase {
Terms terms2 = mock(Terms.class);
Aggregations subAggs = createAggs(Arrays.asList(terms1, terms2));
when(histogramBucket.getAggregations()).thenReturn(subAggs);
Histogram histogram = mock(Histogram.class);
when(histogram.getName()).thenReturn("buckets");
when(histogram.getBuckets()).thenReturn(Arrays.asList(histogramBucket));
IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> aggToString(histogram));
IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> aggToString("time", histogramBucket));
assertThat(e.getMessage(), containsString("Multiple non-leaf nested aggregations are not supported"));
}
@ -209,11 +183,8 @@ public class AggregationToJsonProcessorTests extends ESTestCase {
createDateHistogramBucket(new DateTime(1000L), 3),
createDateHistogramBucket(new DateTime(2000L), 5)
);
Histogram histogram = mock(Histogram.class);
when(histogram.getName()).thenReturn("time");
when(histogram.getBuckets()).thenReturn(histogramBuckets);
String json = aggToString(histogram);
String json = aggToString("time", histogramBuckets);
assertThat(json, equalTo("{\"time\":1000,\"doc_count\":3} {\"time\":2000,\"doc_count\":5}"));
}
@ -225,11 +196,8 @@ public class AggregationToJsonProcessorTests extends ESTestCase {
createHistogramBucket(3000L, 10, Arrays.asList(createPercentiles("my_field", 3.0))),
createHistogramBucket(4000L, 14, Arrays.asList(createPercentiles("my_field", 4.0)))
);
Histogram histogram = mock(Histogram.class);
when(histogram.getName()).thenReturn("time");
when(histogram.getBuckets()).thenReturn(histogramBuckets);
String json = aggToString(histogram);
String json = aggToString("time", histogramBuckets);
assertThat(json, equalTo("{\"time\":1000,\"my_field\":1.0,\"doc_count\":4} " +
"{\"time\":2000,\"my_field\":2.0,\"doc_count\":7} " +
@ -244,22 +212,26 @@ public class AggregationToJsonProcessorTests extends ESTestCase {
createHistogramBucket(3000L, 10, Arrays.asList(createPercentiles("my_field", 3.0))),
createHistogramBucket(4000L, 14, Arrays.asList(createPercentiles("my_field", 4.0)))
);
Histogram histogram = mock(Histogram.class);
when(histogram.getName()).thenReturn("time");
when(histogram.getBuckets()).thenReturn(histogramBuckets);
IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> aggToString(histogram));
IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> aggToString("time", histogramBuckets));
assertThat(e.getMessage(), containsString("Multi-percentile aggregation [my_field] is not supported"));
}
private String aggToString(Aggregation aggregation) throws IOException {
return aggToString(aggregation, true);
private String aggToString(String timeField, Histogram.Bucket bucket) throws IOException {
return aggToString(timeField, true, Collections.singletonList(bucket));
}
private String aggToString(Aggregation aggregation, boolean includeDocCount) throws IOException {
private String aggToString(String timeField, List<Histogram.Bucket> buckets) throws IOException {
return aggToString(timeField, true, buckets);
}
private String aggToString(String timeField, boolean includeDocCount, List<Histogram.Bucket> buckets) throws IOException {
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
try (AggregationToJsonProcessor processor = new AggregationToJsonProcessor(includeDocCount, outputStream)) {
processor.process(aggregation);
for (Histogram.Bucket bucket : buckets) {
processor.process(timeField, bucket);
}
keyValuePairsWritten = processor.getKeyValueCount();
}
return outputStream.toString(StandardCharsets.UTF_8.name());
}