[ML] Refactor aggregation response handling to make more flexible (elastic/x-pack-elasticsearch#1795)
Currently, aggregated datafeeds construct JSON from the aggregation response by traversing all nested aggregations. In order to achieve this, multiple leaf aggregations are not supported. Also, scenarios it makes it impossible to effectively use pipeline aggregations as it will not ignore the intermediate bucket aggregations. This commit refactors AggregationToJsonProcessor in order to support the above scenarios. This is achieved by only converting the fields of interest, that is the job analysis fields. Original commit: elastic/x-pack-elasticsearch@8b575956ca
This commit is contained in:
parent
efc93c7246
commit
701dc53c2a
|
@ -146,7 +146,7 @@ class AggregationDataExtractor implements DataExtractor {
|
|||
|
||||
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
|
||||
try (AggregationToJsonProcessor processor = new AggregationToJsonProcessor(
|
||||
context.timeField, context.includeDocCount, outputStream)) {
|
||||
context.timeField, context.fields, context.includeDocCount, outputStream)) {
|
||||
while (histogramBuckets.isEmpty() == false && processor.getKeyValueCount() < BATCH_KEY_VALUE_PAIRS) {
|
||||
processor.process(histogramBuckets.removeFirst());
|
||||
}
|
||||
|
|
|
@ -10,11 +10,13 @@ import org.elasticsearch.search.aggregations.AggregatorFactories;
|
|||
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.Set;
|
||||
|
||||
class AggregationDataExtractorContext {
|
||||
|
||||
final String jobId;
|
||||
final String timeField;
|
||||
final Set<String> fields;
|
||||
final String[] indices;
|
||||
final String[] types;
|
||||
final QueryBuilder query;
|
||||
|
@ -23,10 +25,11 @@ class AggregationDataExtractorContext {
|
|||
final long end;
|
||||
final boolean includeDocCount;
|
||||
|
||||
AggregationDataExtractorContext(String jobId, String timeField, List<String> indices, List<String> types, QueryBuilder query,
|
||||
AggregatorFactories.Builder aggs, long start, long end, boolean includeDocCount) {
|
||||
AggregationDataExtractorContext(String jobId, String timeField, Set<String> fields, List<String> indices, List<String> types,
|
||||
QueryBuilder query, AggregatorFactories.Builder aggs, long start, long end, boolean includeDocCount) {
|
||||
this.jobId = Objects.requireNonNull(jobId);
|
||||
this.timeField = Objects.requireNonNull(timeField);
|
||||
this.fields = Objects.requireNonNull(fields);
|
||||
this.indices = indices.toArray(new String[indices.size()]);
|
||||
this.types = types.toArray(new String[types.size()]);
|
||||
this.query = Objects.requireNonNull(query);
|
||||
|
|
|
@ -30,6 +30,7 @@ public class AggregationDataExtractorFactory implements DataExtractorFactory {
|
|||
AggregationDataExtractorContext dataExtractorContext = new AggregationDataExtractorContext(
|
||||
job.getId(),
|
||||
job.getDataDescription().getTimeField(),
|
||||
job.getAnalysisConfig().analysisFields(),
|
||||
datafeedConfig.getIndices(),
|
||||
datafeedConfig.getTypes(),
|
||||
datafeedConfig.getQuery(),
|
||||
|
|
|
@ -11,8 +11,8 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
|
|||
import org.elasticsearch.common.xcontent.json.JsonXContent;
|
||||
import org.elasticsearch.search.aggregations.Aggregation;
|
||||
import org.elasticsearch.search.aggregations.Aggregations;
|
||||
import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation;
|
||||
import org.elasticsearch.search.aggregations.bucket.histogram.Histogram;
|
||||
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
|
||||
import org.elasticsearch.search.aggregations.metrics.NumericMetricsAggregation;
|
||||
import org.elasticsearch.search.aggregations.metrics.max.Max;
|
||||
import org.elasticsearch.search.aggregations.metrics.percentiles.Percentile;
|
||||
|
@ -28,6 +28,7 @@ import java.util.LinkedHashMap;
|
|||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.Set;
|
||||
|
||||
/**
|
||||
* Processes {@link Aggregation} objects and writes flat JSON documents for each leaf aggregation.
|
||||
|
@ -37,14 +38,25 @@ import java.util.Objects;
|
|||
class AggregationToJsonProcessor implements Releasable {
|
||||
|
||||
private final String timeField;
|
||||
private final Set<String> fields;
|
||||
private final boolean includeDocCount;
|
||||
private final XContentBuilder jsonBuilder;
|
||||
private final Map<String, Object> keyValuePairs;
|
||||
private long keyValueWrittenCount;
|
||||
|
||||
AggregationToJsonProcessor(String timeField, boolean includeDocCount, OutputStream outputStream)
|
||||
/**
|
||||
* Constructs a processor that processes aggregations into JSON
|
||||
*
|
||||
* @param timeField the time field
|
||||
* @param fields the fields to convert into JSON
|
||||
* @param includeDocCount whether to include the doc_count
|
||||
* @param outputStream the stream to write the output
|
||||
* @throws IOException if an error occurs during the processing
|
||||
*/
|
||||
AggregationToJsonProcessor(String timeField, Set<String> fields, boolean includeDocCount, OutputStream outputStream)
|
||||
throws IOException {
|
||||
this.timeField = Objects.requireNonNull(timeField);
|
||||
this.fields = Objects.requireNonNull(fields);
|
||||
this.includeDocCount = includeDocCount;
|
||||
jsonBuilder = new XContentBuilder(JsonXContent.jsonXContent, outputStream);
|
||||
keyValuePairs = new LinkedHashMap<>();
|
||||
|
@ -55,7 +67,7 @@ class AggregationToJsonProcessor implements Releasable {
|
|||
* Processes a {@link Histogram.Bucket} and writes a flat JSON document for each of its leaf aggregations.
|
||||
* Supported sub-aggregations include:
|
||||
* <ul>
|
||||
* <li>{@link Terms}</li>
|
||||
* <li>{@link MultiBucketsAggregation}</li>
|
||||
* <li>{@link NumericMetricsAggregation.SingleValue}</li>
|
||||
* <li>{@link Percentiles}</li>
|
||||
* </ul>
|
||||
|
@ -82,51 +94,66 @@ class AggregationToJsonProcessor implements Releasable {
|
|||
}
|
||||
|
||||
private void processNestedAggs(long docCount, List<Aggregation> aggs) throws IOException {
|
||||
if (aggs.isEmpty()) {
|
||||
writeJsonObject(docCount);
|
||||
return;
|
||||
}
|
||||
if (aggs.get(0) instanceof Terms) {
|
||||
if (aggs.size() > 1) {
|
||||
throw new IllegalArgumentException("Multiple non-leaf nested aggregations are not supported");
|
||||
}
|
||||
processTerms((Terms) aggs.get(0));
|
||||
} else {
|
||||
List<String> addedKeys = new ArrayList<>();
|
||||
for (Aggregation nestedAgg : aggs) {
|
||||
if (nestedAgg instanceof NumericMetricsAggregation.SingleValue) {
|
||||
addedKeys.add(processSingleValue((NumericMetricsAggregation.SingleValue) nestedAgg));
|
||||
} else if (nestedAgg instanceof Percentiles) {
|
||||
addedKeys.add(processPercentiles((Percentiles) nestedAgg));
|
||||
boolean processedBucketAgg = false;
|
||||
List<String> addedLeafKeys = new ArrayList<>();
|
||||
for (Aggregation agg : aggs) {
|
||||
if (fields.contains(agg.getName())) {
|
||||
if (agg instanceof MultiBucketsAggregation) {
|
||||
if (processedBucketAgg) {
|
||||
throw new IllegalArgumentException("Multiple bucket aggregations at the same level are not supported");
|
||||
}
|
||||
if (addedLeafKeys.isEmpty() == false) {
|
||||
throw new IllegalArgumentException("Mixing bucket and leaf aggregations at the same level is not supported");
|
||||
}
|
||||
processedBucketAgg = true;
|
||||
processBucket((MultiBucketsAggregation) agg);
|
||||
} else {
|
||||
throw new IllegalArgumentException("Unsupported aggregation type [" + nestedAgg.getName() + "]");
|
||||
if (processedBucketAgg) {
|
||||
throw new IllegalArgumentException("Mixing bucket and leaf aggregations at the same level is not supported");
|
||||
}
|
||||
addedLeafKeys.add(processLeaf(agg));
|
||||
}
|
||||
}
|
||||
}
|
||||
if (addedLeafKeys.isEmpty() == false) {
|
||||
writeJsonObject(docCount);
|
||||
addedLeafKeys.forEach(k -> keyValuePairs.remove(k));
|
||||
}
|
||||
|
||||
if (processedBucketAgg == false && addedLeafKeys.isEmpty()) {
|
||||
writeJsonObject(docCount);
|
||||
addedKeys.forEach(k -> keyValuePairs.remove(k));
|
||||
}
|
||||
}
|
||||
|
||||
private void processTerms(Terms termsAgg) throws IOException {
|
||||
for (Terms.Bucket bucket : termsAgg.getBuckets()) {
|
||||
keyValuePairs.put(termsAgg.getName(), bucket.getKey());
|
||||
private void processBucket(MultiBucketsAggregation bucketAgg) throws IOException {
|
||||
for (MultiBucketsAggregation.Bucket bucket : bucketAgg.getBuckets()) {
|
||||
keyValuePairs.put(bucketAgg.getName(), bucket.getKey());
|
||||
processNestedAggs(bucket.getDocCount(), asList(bucket.getAggregations()));
|
||||
keyValuePairs.remove(termsAgg.getName());
|
||||
keyValuePairs.remove(bucketAgg.getName());
|
||||
}
|
||||
}
|
||||
|
||||
private String processSingleValue(NumericMetricsAggregation.SingleValue singleValue) throws IOException {
|
||||
keyValuePairs.put(singleValue.getName(), singleValue.value());
|
||||
return singleValue.getName();
|
||||
private String processLeaf(Aggregation agg) throws IOException {
|
||||
if (agg instanceof NumericMetricsAggregation.SingleValue) {
|
||||
processSingleValue((NumericMetricsAggregation.SingleValue) agg);
|
||||
} else if (agg instanceof Percentiles) {
|
||||
processPercentiles((Percentiles) agg);
|
||||
} else {
|
||||
throw new IllegalArgumentException("Unsupported aggregation type [" + agg.getName() + "]");
|
||||
}
|
||||
return agg.getName();
|
||||
}
|
||||
|
||||
private String processPercentiles(Percentiles percentiles) throws IOException {
|
||||
private void processSingleValue(NumericMetricsAggregation.SingleValue singleValue) throws IOException {
|
||||
keyValuePairs.put(singleValue.getName(), singleValue.value());
|
||||
}
|
||||
|
||||
private void processPercentiles(Percentiles percentiles) throws IOException {
|
||||
Iterator<Percentile> percentileIterator = percentiles.iterator();
|
||||
keyValuePairs.put(percentiles.getName(), percentileIterator.next().getValue());
|
||||
if (percentileIterator.hasNext()) {
|
||||
throw new IllegalArgumentException("Multi-percentile aggregation [" + percentiles.getName() + "] is not supported");
|
||||
}
|
||||
return percentiles.getName();
|
||||
}
|
||||
|
||||
private void writeJsonObject(long docCount) throws IOException {
|
||||
|
|
|
@ -274,16 +274,16 @@ public class AnalysisConfig extends ToXContentToBytes implements Writeable {
|
|||
}
|
||||
|
||||
/**
|
||||
* Return the list of fields required by the analysis.
|
||||
* Return the set of fields required by the analysis.
|
||||
* These are the influencer fields, metric field, partition field,
|
||||
* by field and over field of each detector, plus the summary count
|
||||
* field and the categorization field name of the job.
|
||||
* <code>null</code> and empty strings are filtered from the
|
||||
* config.
|
||||
*
|
||||
* @return List of required analysis fields - never <code>null</code>
|
||||
* @return Set of required analysis fields - never <code>null</code>
|
||||
*/
|
||||
public List<String> analysisFields() {
|
||||
public Set<String> analysisFields() {
|
||||
Set<String> analysisFields = termFields();
|
||||
|
||||
addIfNotNull(analysisFields, categorizationFieldName);
|
||||
|
@ -296,7 +296,7 @@ public class AnalysisConfig extends ToXContentToBytes implements Writeable {
|
|||
// remove empty strings
|
||||
analysisFields.remove("");
|
||||
|
||||
return new ArrayList<>(analysisFields);
|
||||
return analysisFields;
|
||||
}
|
||||
|
||||
private static void addIfNotNull(Set<String> fields, String field) {
|
||||
|
|
|
@ -170,7 +170,7 @@ public abstract class AbstractDataToProcessWriter implements DataToProcessWriter
|
|||
* must see in the csv header
|
||||
*/
|
||||
final Collection<String> inputFields() {
|
||||
Set<String> requiredFields = new HashSet<>(analysisConfig.analysisFields());
|
||||
Set<String> requiredFields = analysisConfig.analysisFields();
|
||||
requiredFields.add(dataDescription.getTimeField());
|
||||
|
||||
return requiredFields;
|
||||
|
|
|
@ -28,8 +28,10 @@ import java.nio.charset.StandardCharsets;
|
|||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static org.elasticsearch.xpack.ml.datafeed.extractor.aggregation.AggregationTestUtils.Term;
|
||||
|
@ -49,6 +51,7 @@ public class AggregationDataExtractorTests extends ESTestCase {
|
|||
private List<SearchRequestBuilder> capturedSearchRequests;
|
||||
private String jobId;
|
||||
private String timeField;
|
||||
private Set<String> fields;
|
||||
private List<String> types;
|
||||
private List<String> indices;
|
||||
private QueryBuilder query;
|
||||
|
@ -79,6 +82,8 @@ public class AggregationDataExtractorTests extends ESTestCase {
|
|||
capturedSearchRequests = new ArrayList<>();
|
||||
jobId = "test-job";
|
||||
timeField = "time";
|
||||
fields = new HashSet<>();
|
||||
fields.addAll(Arrays.asList("time", "airline", "responsetime"));
|
||||
indices = Arrays.asList("index-1", "index-2");
|
||||
types = Arrays.asList("type-1", "type-2");
|
||||
query = QueryBuilders.matchAllQuery();
|
||||
|
@ -270,7 +275,7 @@ public class AggregationDataExtractorTests extends ESTestCase {
|
|||
}
|
||||
|
||||
private AggregationDataExtractorContext createContext(long start, long end) {
|
||||
return new AggregationDataExtractorContext(jobId, timeField, indices, types, query, aggs, start, end, true);
|
||||
return new AggregationDataExtractorContext(jobId, timeField, fields, indices, types, query, aggs, start, end, true);
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
|
|
|
@ -5,9 +5,12 @@
|
|||
*/
|
||||
package org.elasticsearch.xpack.ml.datafeed.extractor.aggregation;
|
||||
|
||||
import org.elasticsearch.common.util.set.Sets;
|
||||
import org.elasticsearch.search.aggregations.Aggregation;
|
||||
import org.elasticsearch.search.aggregations.Aggregations;
|
||||
import org.elasticsearch.search.aggregations.bucket.histogram.Histogram;
|
||||
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
|
||||
import org.elasticsearch.search.aggregations.metrics.max.Max;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
|
||||
import java.io.ByteArrayOutputStream;
|
||||
|
@ -18,6 +21,7 @@ import java.util.Collections;
|
|||
import java.util.LinkedHashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
import static org.elasticsearch.xpack.ml.datafeed.extractor.aggregation.AggregationTestUtils.Term;
|
||||
import static org.elasticsearch.xpack.ml.datafeed.extractor.aggregation.AggregationTestUtils.createAggs;
|
||||
|
@ -41,7 +45,8 @@ public class AggregationToJsonProcessorTests extends ESTestCase {
|
|||
createHistogramBucket(2000L, 5)
|
||||
);
|
||||
|
||||
IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> aggToString("time", histogramBuckets));
|
||||
IllegalArgumentException e = expectThrows(IllegalArgumentException.class,
|
||||
() -> aggToString("time", Collections.emptySet(), histogramBuckets));
|
||||
assertThat(e.getMessage(), containsString("Missing max aggregation for time_field [time]"));
|
||||
}
|
||||
|
||||
|
@ -51,7 +56,8 @@ public class AggregationToJsonProcessorTests extends ESTestCase {
|
|||
createHistogramBucket(2000L, 5, Collections.singletonList(createTerms("time")))
|
||||
);
|
||||
|
||||
IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> aggToString("time", histogramBuckets));
|
||||
IllegalArgumentException e = expectThrows(IllegalArgumentException.class,
|
||||
() -> aggToString("time", Collections.emptySet(), histogramBuckets));
|
||||
assertThat(e.getMessage(), containsString("Missing max aggregation for time_field [time]"));
|
||||
}
|
||||
|
||||
|
@ -61,7 +67,7 @@ public class AggregationToJsonProcessorTests extends ESTestCase {
|
|||
createHistogramBucket(2000L, 5, Collections.singletonList(createMax("timestamp", 2800)))
|
||||
);
|
||||
|
||||
String json = aggToString("timestamp", histogramBuckets);
|
||||
String json = aggToString("timestamp", Collections.emptySet(), histogramBuckets);
|
||||
|
||||
assertThat(json, equalTo("{\"timestamp\":1200,\"doc_count\":3} {\"timestamp\":2800,\"doc_count\":5}"));
|
||||
assertThat(keyValuePairsWritten, equalTo(4L));
|
||||
|
@ -73,7 +79,7 @@ public class AggregationToJsonProcessorTests extends ESTestCase {
|
|||
createHistogramBucket(2000L, 5, Collections.singletonList(createMax("time", 2000)))
|
||||
);
|
||||
|
||||
String json = aggToString("time", false, histogramBuckets);
|
||||
String json = aggToString("time", Collections.emptySet(), false, histogramBuckets);
|
||||
|
||||
assertThat(json, equalTo("{\"time\":1000} {\"time\":2000}"));
|
||||
assertThat(keyValuePairsWritten, equalTo(2L));
|
||||
|
@ -87,7 +93,7 @@ public class AggregationToJsonProcessorTests extends ESTestCase {
|
|||
createMax("time", 2000), createSingleValue("my_value", 2.0)))
|
||||
);
|
||||
|
||||
String json = aggToString("time", histogramBuckets);
|
||||
String json = aggToString("time", Sets.newHashSet("my_value"), histogramBuckets);
|
||||
|
||||
assertThat(json, equalTo("{\"time\":1000,\"my_value\":1.0,\"doc_count\":3} {\"time\":2000,\"my_value\":2.0,\"doc_count\":5}"));
|
||||
}
|
||||
|
@ -106,7 +112,7 @@ public class AggregationToJsonProcessorTests extends ESTestCase {
|
|||
createTerms("my_field", new Term("c", 4), new Term("b", 3))))
|
||||
);
|
||||
|
||||
String json = aggToString("time", histogramBuckets);
|
||||
String json = aggToString("time", Sets.newHashSet("time", "my_field"), histogramBuckets);
|
||||
|
||||
assertThat(json, equalTo("{\"time\":1100,\"my_field\":\"a\",\"doc_count\":1} " +
|
||||
"{\"time\":1100,\"my_field\":\"b\",\"doc_count\":2} " +
|
||||
|
@ -132,7 +138,7 @@ public class AggregationToJsonProcessorTests extends ESTestCase {
|
|||
createTerms("my_field", new Term("c", 4, "my_value", 41.0), new Term("b", 3, "my_value", 42.0))))
|
||||
);
|
||||
|
||||
String json = aggToString("time", histogramBuckets);
|
||||
String json = aggToString("time", Sets.newHashSet("my_field", "my_value"), histogramBuckets);
|
||||
|
||||
assertThat(json, equalTo("{\"time\":1000,\"my_field\":\"a\",\"my_value\":11.0,\"doc_count\":1} " +
|
||||
"{\"time\":1000,\"my_field\":\"b\",\"my_value\":12.0,\"doc_count\":2} " +
|
||||
|
@ -179,7 +185,7 @@ public class AggregationToJsonProcessorTests extends ESTestCase {
|
|||
createTerms("my_field", new Term("c", 4, c4NumericAggs), new Term("b", 3, b4NumericAggs))))
|
||||
);
|
||||
|
||||
String json = aggToString("time", false, histogramBuckets);
|
||||
String json = aggToString("time", Sets.newHashSet("my_field", "my_value", "my_value2"), false, histogramBuckets);
|
||||
|
||||
assertThat(json, equalTo("{\"time\":1000,\"my_field\":\"a\",\"my_value\":111.0,\"my_value2\":112.0} " +
|
||||
"{\"time\":1000,\"my_field\":\"b\",\"my_value\":121.0,\"my_value2\":122.0} " +
|
||||
|
@ -192,24 +198,78 @@ public class AggregationToJsonProcessorTests extends ESTestCase {
|
|||
|
||||
public void testProcessGivenUnsupportedAggregationUnderHistogram() throws IOException {
|
||||
Histogram.Bucket histogramBucket = createHistogramBucket(1000L, 2);
|
||||
Histogram anotherHistogram = mock(Histogram.class);
|
||||
Aggregation anotherHistogram = mock(Aggregation.class);
|
||||
when(anotherHistogram.getName()).thenReturn("nested-agg");
|
||||
Aggregations subAggs = createAggs(Arrays.asList(createMax("time", 1000), anotherHistogram));
|
||||
when(histogramBucket.getAggregations()).thenReturn(subAggs);
|
||||
|
||||
IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> aggToString("time", histogramBucket));
|
||||
IllegalArgumentException e = expectThrows(IllegalArgumentException.class,
|
||||
() -> aggToString("time", Sets.newHashSet("nested-agg"), histogramBucket));
|
||||
assertThat(e.getMessage(), containsString("Unsupported aggregation type [nested-agg]"));
|
||||
}
|
||||
|
||||
public void testProcessGivenMultipleNestedAggregations() throws IOException {
|
||||
public void testProcessGivenMultipleBucketAggregations() throws IOException {
|
||||
Histogram.Bucket histogramBucket = createHistogramBucket(1000L, 2);
|
||||
Terms terms1 = mock(Terms.class);
|
||||
when(terms1.getName()).thenReturn("terms_1");
|
||||
Terms terms2 = mock(Terms.class);
|
||||
when(terms2.getName()).thenReturn("terms_2");
|
||||
Aggregations subAggs = createAggs(Arrays.asList(createMax("time", 1000), terms1, terms2));
|
||||
when(histogramBucket.getAggregations()).thenReturn(subAggs);
|
||||
|
||||
IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> aggToString("time", histogramBucket));
|
||||
assertThat(e.getMessage(), containsString("Multiple non-leaf nested aggregations are not supported"));
|
||||
IllegalArgumentException e = expectThrows(IllegalArgumentException.class,
|
||||
() -> aggToString("time", Sets.newHashSet("terms_1", "terms_2"), histogramBucket));
|
||||
assertThat(e.getMessage(), containsString("Multiple bucket aggregations at the same level are not supported"));
|
||||
}
|
||||
|
||||
public void testProcessGivenMixedBucketAndLeafAggregationsAtSameLevel_BucketFirst() throws IOException {
|
||||
Histogram.Bucket histogramBucket = createHistogramBucket(1000L, 2);
|
||||
Terms terms = mock(Terms.class);
|
||||
when(terms.getName()).thenReturn("terms");
|
||||
Max maxAgg = createMax("max_value", 1200);
|
||||
Aggregations subAggs = createAggs(Arrays.asList(createMax("time", 1000), terms, maxAgg));
|
||||
when(histogramBucket.getAggregations()).thenReturn(subAggs);
|
||||
|
||||
IllegalArgumentException e = expectThrows(IllegalArgumentException.class,
|
||||
() -> aggToString("time", Sets.newHashSet("terms", "max_value"), histogramBucket));
|
||||
assertThat(e.getMessage(), containsString("Mixing bucket and leaf aggregations at the same level is not supported"));
|
||||
}
|
||||
|
||||
public void testProcessGivenMixedBucketAndLeafAggregationsAtSameLevel_LeafFirst() throws IOException {
|
||||
Histogram.Bucket histogramBucket = createHistogramBucket(1000L, 2);
|
||||
Max maxAgg = createMax("max_value", 1200);
|
||||
Terms terms = mock(Terms.class);
|
||||
when(terms.getName()).thenReturn("terms");
|
||||
Aggregations subAggs = createAggs(Arrays.asList(createMax("time", 1000), maxAgg, terms));
|
||||
when(histogramBucket.getAggregations()).thenReturn(subAggs);
|
||||
|
||||
IllegalArgumentException e = expectThrows(IllegalArgumentException.class,
|
||||
() -> aggToString("time", Sets.newHashSet("terms", "max_value"), histogramBucket));
|
||||
assertThat(e.getMessage(), containsString("Mixing bucket and leaf aggregations at the same level is not supported"));
|
||||
}
|
||||
|
||||
public void testProcessGivenBucketAndLeafAggregationsButBucketNotInFields() throws IOException {
|
||||
List<Histogram.Bucket> histogramBuckets = Arrays.asList(
|
||||
createHistogramBucket(1000L, 4, Arrays.asList(
|
||||
createMax("time", 1100),
|
||||
createMax("my_value", 1),
|
||||
createTerms("my_field", new Term("a", 1), new Term("b", 2), new Term("c", 1)))),
|
||||
createHistogramBucket(2000L, 5, Arrays.asList(
|
||||
createMax("time", 2200),
|
||||
createMax("my_value", 2),
|
||||
createTerms("my_field", new Term("a", 5), new Term("b", 2)))),
|
||||
createHistogramBucket(3000L, 0, Collections.singletonList(createMax("time", -1))),
|
||||
createHistogramBucket(4000L, 7, Arrays.asList(
|
||||
createMax("time", 4400),
|
||||
createMax("my_value", 4),
|
||||
createTerms("my_field", new Term("c", 4), new Term("b", 3))))
|
||||
);
|
||||
|
||||
String json = aggToString("time", Sets.newHashSet("time", "my_value"), histogramBuckets);
|
||||
|
||||
assertThat(json, equalTo("{\"time\":1100,\"my_value\":1.0,\"doc_count\":4} " +
|
||||
"{\"time\":2200,\"my_value\":2.0,\"doc_count\":5} " +
|
||||
"{\"time\":4400,\"my_value\":4.0,\"doc_count\":7}"));
|
||||
}
|
||||
|
||||
public void testProcessGivenSinglePercentilesPerHistogram() throws IOException {
|
||||
|
@ -224,7 +284,7 @@ public class AggregationToJsonProcessorTests extends ESTestCase {
|
|||
createMax("time", 4000), createPercentiles("my_field", 4.0)))
|
||||
);
|
||||
|
||||
String json = aggToString("time", histogramBuckets);
|
||||
String json = aggToString("time", Sets.newHashSet("my_field"), histogramBuckets);
|
||||
|
||||
assertThat(json, equalTo("{\"time\":1000,\"my_field\":1.0,\"doc_count\":4} " +
|
||||
"{\"time\":2000,\"my_field\":2.0,\"doc_count\":7} " +
|
||||
|
@ -244,21 +304,23 @@ public class AggregationToJsonProcessorTests extends ESTestCase {
|
|||
createMax("time", 4000), createPercentiles("my_field", 4.0)))
|
||||
);
|
||||
|
||||
IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> aggToString("time", histogramBuckets));
|
||||
IllegalArgumentException e = expectThrows(IllegalArgumentException.class,
|
||||
() -> aggToString("time", Sets.newHashSet("my_field"), histogramBuckets));
|
||||
assertThat(e.getMessage(), containsString("Multi-percentile aggregation [my_field] is not supported"));
|
||||
}
|
||||
|
||||
private String aggToString(String timeField, Histogram.Bucket bucket) throws IOException {
|
||||
return aggToString(timeField, true, Collections.singletonList(bucket));
|
||||
private String aggToString(String timeField, Set<String> fields, Histogram.Bucket bucket) throws IOException {
|
||||
return aggToString(timeField, fields, true, Collections.singletonList(bucket));
|
||||
}
|
||||
|
||||
private String aggToString(String timeField, List<Histogram.Bucket> buckets) throws IOException {
|
||||
return aggToString(timeField, true, buckets);
|
||||
private String aggToString(String timeField, Set<String> fields, List<Histogram.Bucket> buckets) throws IOException {
|
||||
return aggToString(timeField, fields, true, buckets);
|
||||
}
|
||||
|
||||
private String aggToString(String timeField, boolean includeDocCount, List<Histogram.Bucket> buckets) throws IOException {
|
||||
private String aggToString(String timeField, Set<String> fields, boolean includeDocCount, List<Histogram.Bucket> buckets)
|
||||
throws IOException {
|
||||
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
|
||||
try (AggregationToJsonProcessor processor = new AggregationToJsonProcessor(timeField, includeDocCount, outputStream)) {
|
||||
try (AggregationToJsonProcessor processor = new AggregationToJsonProcessor(timeField, fields, includeDocCount, outputStream)) {
|
||||
for (Histogram.Bucket bucket : buckets) {
|
||||
processor.process(bucket);
|
||||
}
|
||||
|
|
|
@ -24,7 +24,6 @@ import java.util.Collections;
|
|||
import java.util.Date;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
|
@ -187,7 +186,7 @@ public class JobTests extends AbstractSerializingTestCase<Job> {
|
|||
AnalysisConfig.Builder ac = new AnalysisConfig.Builder(Arrays.asList(d1.build(), d2.build()));
|
||||
ac.setSummaryCountFieldName("agg");
|
||||
|
||||
List<String> analysisFields = ac.build().analysisFields();
|
||||
Set<String> analysisFields = ac.build().analysisFields();
|
||||
assertTrue(analysisFields.size() == 5);
|
||||
|
||||
assertTrue(analysisFields.contains("agg"));
|
||||
|
@ -199,7 +198,6 @@ public class JobTests extends AbstractSerializingTestCase<Job> {
|
|||
assertFalse(analysisFields.contains("max"));
|
||||
assertFalse(analysisFields.contains("median"));
|
||||
assertFalse(analysisFields.contains(""));
|
||||
assertFalse(analysisFields.contains(null));
|
||||
|
||||
Detector.Builder d3 = new Detector.Builder("count", null);
|
||||
d3.setByFieldName("by2");
|
||||
|
@ -221,7 +219,6 @@ public class JobTests extends AbstractSerializingTestCase<Job> {
|
|||
assertFalse(analysisFields.contains("max"));
|
||||
assertFalse(analysisFields.contains("median"));
|
||||
assertFalse(analysisFields.contains(""));
|
||||
assertFalse(analysisFields.contains(null));
|
||||
}
|
||||
|
||||
// JobConfigurationVerifierTests:
|
||||
|
|
Loading…
Reference in New Issue