[ML] Do not write JSON records when metric value is not finite (elastic/x-pack-elasticsearch#1849)

relates elastic/x-pack-elasticsearch#1847

Original commit: elastic/x-pack-elasticsearch@26a69b840f
This commit is contained in:
Dimitris Athanasiou 2017-06-26 14:45:00 +01:00 committed by GitHub
parent 3db3cd0f0b
commit bd06a7b9b4
2 changed files with 40 additions and 19 deletions

View File

@ -94,6 +94,12 @@ class AggregationToJsonProcessor implements Releasable {
} }
private void processNestedAggs(long docCount, List<Aggregation> aggs) throws IOException { private void processNestedAggs(long docCount, List<Aggregation> aggs) throws IOException {
if (aggs.isEmpty()) {
// This means we reached a bucket aggregation without sub-aggs. Thus, we can flush the path written so far.
writeJsonObject(docCount);
return;
}
boolean processedBucketAgg = false; boolean processedBucketAgg = false;
List<String> addedLeafKeys = new ArrayList<>(); List<String> addedLeafKeys = new ArrayList<>();
for (Aggregation agg : aggs) { for (Aggregation agg : aggs) {
@ -111,18 +117,18 @@ class AggregationToJsonProcessor implements Releasable {
if (processedBucketAgg) { if (processedBucketAgg) {
throw new IllegalArgumentException("Mixing bucket and leaf aggregations at the same level is not supported"); throw new IllegalArgumentException("Mixing bucket and leaf aggregations at the same level is not supported");
} }
addedLeafKeys.add(processLeaf(agg)); String addedKey = processLeaf(agg);
if (addedKey != null) {
addedLeafKeys.add(addedKey);
} }
} }
} }
}
if (addedLeafKeys.isEmpty() == false) { if (addedLeafKeys.isEmpty() == false) {
writeJsonObject(docCount); writeJsonObject(docCount);
addedLeafKeys.forEach(k -> keyValuePairs.remove(k)); addedLeafKeys.forEach(k -> keyValuePairs.remove(k));
} }
if (processedBucketAgg == false && addedLeafKeys.isEmpty()) {
writeJsonObject(docCount);
}
} }
private void processBucket(MultiBucketsAggregation bucketAgg) throws IOException { private void processBucket(MultiBucketsAggregation bucketAgg) throws IOException {
@ -133,27 +139,41 @@ class AggregationToJsonProcessor implements Releasable {
} }
} }
/**
* Adds a leaf key-value. It returns the name of the key added or {@code null} when nothing was added.
* Non-finite metric values are not added.
*/
@Nullable
private String processLeaf(Aggregation agg) throws IOException { private String processLeaf(Aggregation agg) throws IOException {
if (agg instanceof NumericMetricsAggregation.SingleValue) { if (agg instanceof NumericMetricsAggregation.SingleValue) {
processSingleValue((NumericMetricsAggregation.SingleValue) agg); return processSingleValue((NumericMetricsAggregation.SingleValue) agg);
} else if (agg instanceof Percentiles) { } else if (agg instanceof Percentiles) {
processPercentiles((Percentiles) agg); return processPercentiles((Percentiles) agg);
} else { } else {
throw new IllegalArgumentException("Unsupported aggregation type [" + agg.getName() + "]"); throw new IllegalArgumentException("Unsupported aggregation type [" + agg.getName() + "]");
} }
return agg.getName();
} }
private void processSingleValue(NumericMetricsAggregation.SingleValue singleValue) throws IOException { private String processSingleValue(NumericMetricsAggregation.SingleValue singleValue) throws IOException {
keyValuePairs.put(singleValue.getName(), singleValue.value()); return addMetricIfFinite(singleValue.getName(), singleValue.value());
} }
private void processPercentiles(Percentiles percentiles) throws IOException { @Nullable
private String addMetricIfFinite(String key, double value) {
if (Double.isFinite(value)) {
keyValuePairs.put(key, value);
return key;
}
return null;
}
private String processPercentiles(Percentiles percentiles) throws IOException {
Iterator<Percentile> percentileIterator = percentiles.iterator(); Iterator<Percentile> percentileIterator = percentiles.iterator();
keyValuePairs.put(percentiles.getName(), percentileIterator.next().getValue()); String addedKey = addMetricIfFinite(percentiles.getName(), percentileIterator.next().getValue());
if (percentileIterator.hasNext()) { if (percentileIterator.hasNext()) {
throw new IllegalArgumentException("Multi-percentile aggregation [" + percentiles.getName() + "] is not supported"); throw new IllegalArgumentException("Multi-percentile aggregation [" + percentiles.getName() + "] is not supported");
} }
return addedKey;
} }
private void writeJsonObject(long docCount) throws IOException { private void writeJsonObject(long docCount) throws IOException {

View File

@ -89,13 +89,15 @@ public class AggregationToJsonProcessorTests extends ESTestCase {
List<Histogram.Bucket> histogramBuckets = Arrays.asList( List<Histogram.Bucket> histogramBuckets = Arrays.asList(
createHistogramBucket(1000L, 3, Arrays.asList( createHistogramBucket(1000L, 3, Arrays.asList(
createMax("time", 1000), createSingleValue("my_value", 1.0))), createMax("time", 1000), createSingleValue("my_value", 1.0))),
createHistogramBucket(2000L, 5, Arrays.asList( createHistogramBucket(2000L, 3, Arrays.asList(
createMax("time", 2000), createSingleValue("my_value", 2.0))) createMax("time", 2000), createSingleValue("my_value", Double.NEGATIVE_INFINITY))),
createHistogramBucket(3000L, 5, Arrays.asList(
createMax("time", 3000), createSingleValue("my_value", 3.0)))
); );
String json = aggToString("time", Sets.newHashSet("my_value"), histogramBuckets); String json = aggToString("time", Sets.newHashSet("my_value"), histogramBuckets);
assertThat(json, equalTo("{\"time\":1000,\"my_value\":1.0,\"doc_count\":3} {\"time\":2000,\"my_value\":2.0,\"doc_count\":5}")); assertThat(json, equalTo("{\"time\":1000,\"my_value\":1.0,\"doc_count\":3} {\"time\":3000,\"my_value\":3.0,\"doc_count\":5}"));
} }
public void testProcessGivenTermsPerHistogram() throws IOException { public void testProcessGivenTermsPerHistogram() throws IOException {
@ -154,7 +156,7 @@ public class AggregationToJsonProcessorTests extends ESTestCase {
a1NumericAggs.put("my_value", 111.0); a1NumericAggs.put("my_value", 111.0);
a1NumericAggs.put("my_value2", 112.0); a1NumericAggs.put("my_value2", 112.0);
Map<String, Double> b1NumericAggs = new LinkedHashMap<>(); Map<String, Double> b1NumericAggs = new LinkedHashMap<>();
b1NumericAggs.put("my_value", 121.0); b1NumericAggs.put("my_value", Double.POSITIVE_INFINITY);
b1NumericAggs.put("my_value2", 122.0); b1NumericAggs.put("my_value2", 122.0);
Map<String, Double> c1NumericAggs = new LinkedHashMap<>(); Map<String, Double> c1NumericAggs = new LinkedHashMap<>();
c1NumericAggs.put("my_value", 131.0); c1NumericAggs.put("my_value", 131.0);
@ -188,7 +190,7 @@ public class AggregationToJsonProcessorTests extends ESTestCase {
String json = aggToString("time", Sets.newHashSet("my_field", "my_value", "my_value2"), false, histogramBuckets); String json = aggToString("time", Sets.newHashSet("my_field", "my_value", "my_value2"), false, histogramBuckets);
assertThat(json, equalTo("{\"time\":1000,\"my_field\":\"a\",\"my_value\":111.0,\"my_value2\":112.0} " + assertThat(json, equalTo("{\"time\":1000,\"my_field\":\"a\",\"my_value\":111.0,\"my_value2\":112.0} " +
"{\"time\":1000,\"my_field\":\"b\",\"my_value\":121.0,\"my_value2\":122.0} " + "{\"time\":1000,\"my_field\":\"b\",\"my_value2\":122.0} " +
"{\"time\":1000,\"my_field\":\"c\",\"my_value\":131.0,\"my_value2\":132.0} " + "{\"time\":1000,\"my_field\":\"c\",\"my_value\":131.0,\"my_value2\":132.0} " +
"{\"time\":2000,\"my_field\":\"a\",\"my_value\":211.0,\"my_value2\":212.0} " + "{\"time\":2000,\"my_field\":\"a\",\"my_value\":211.0,\"my_value2\":212.0} " +
"{\"time\":2000,\"my_field\":\"b\",\"my_value\":221.0,\"my_value2\":222.0} " + "{\"time\":2000,\"my_field\":\"b\",\"my_value\":221.0,\"my_value2\":222.0} " +
@ -279,7 +281,7 @@ public class AggregationToJsonProcessorTests extends ESTestCase {
createHistogramBucket(2000L, 7, Arrays.asList( createHistogramBucket(2000L, 7, Arrays.asList(
createMax("time", 2000), createPercentiles("my_field", 2.0))), createMax("time", 2000), createPercentiles("my_field", 2.0))),
createHistogramBucket(3000L, 10, Arrays.asList( createHistogramBucket(3000L, 10, Arrays.asList(
createMax("time", 3000), createPercentiles("my_field", 3.0))), createMax("time", 3000), createPercentiles("my_field", Double.NEGATIVE_INFINITY))),
createHistogramBucket(4000L, 14, Arrays.asList( createHistogramBucket(4000L, 14, Arrays.asList(
createMax("time", 4000), createPercentiles("my_field", 4.0))) createMax("time", 4000), createPercentiles("my_field", 4.0)))
); );
@ -288,7 +290,6 @@ public class AggregationToJsonProcessorTests extends ESTestCase {
assertThat(json, equalTo("{\"time\":1000,\"my_field\":1.0,\"doc_count\":4} " + assertThat(json, equalTo("{\"time\":1000,\"my_field\":1.0,\"doc_count\":4} " +
"{\"time\":2000,\"my_field\":2.0,\"doc_count\":7} " + "{\"time\":2000,\"my_field\":2.0,\"doc_count\":7} " +
"{\"time\":3000,\"my_field\":3.0,\"doc_count\":10} " +
"{\"time\":4000,\"my_field\":4.0,\"doc_count\":14}")); "{\"time\":4000,\"my_field\":4.0,\"doc_count\":14}"));
} }