[Rollup] Move toBuilders() methods out of rollup config objects (#32585)

Tanguy Leroux 2018-08-27 09:18:26 +02:00 committed by GitHub
parent 974f839093
commit e1e8cf382f
8 changed files with 236 additions and 197 deletions
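
For orientation, here is a minimal sketch of what a call site looks like after this change, assuming the x-pack rollup classes shown in the diff below are on the classpath: the per-config toBuilders() methods are removed and RollupIndexer exposes static helpers instead. The class name and the aggregation name used here are illustrative, and the sketch sits in the indexer's package because createAggregationBuilders() is package-private in this diff.

package org.elasticsearch.xpack.rollup.job;

import java.util.List;

import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.composite.CompositeAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.composite.CompositeValuesSourceBuilder;
import org.elasticsearch.xpack.core.rollup.job.RollupJobConfig;

// Illustrative sketch, not part of the commit.
class CompositeWiringSketch {

    CompositeAggregationBuilder build(RollupJobConfig config) {
        // Before: builders came from the config objects themselves, e.g.
        //   groupConfig.getDateHistogram().toBuilders() and metricConfig.toBuilders().
        // After: RollupIndexer owns that translation through static helpers.
        List<CompositeValuesSourceBuilder<?>> sources =
                RollupIndexer.createValueSourceBuilders(config.getGroupConfig());
        CompositeAggregationBuilder composite =
                new CompositeAggregationBuilder("rollup_composite", sources); // aggregation name is illustrative

        List<AggregationBuilder> metrics =
                RollupIndexer.createAggregationBuilders(config.getMetricsConfig());
        metrics.forEach(composite::subAggregation);
        return composite;
    }
}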

View File

@@ -20,16 +20,11 @@ import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.search.aggregations.bucket.composite.CompositeValuesSourceBuilder;
import org.elasticsearch.search.aggregations.bucket.composite.DateHistogramValuesSourceBuilder;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
import org.elasticsearch.xpack.core.rollup.RollupField;
import org.joda.time.DateTimeZone;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -182,19 +177,6 @@ public class DateHistogramGroupConfig implements Writeable, ToXContentObject {
return createRounding(interval.toString(), timeZone);
}
/**
* This returns a set of aggregation builders which represent the configured
* set of date histograms. Used by the rollup indexer to iterate over historical data
*/
public List<CompositeValuesSourceBuilder<?>> toBuilders() {
DateHistogramValuesSourceBuilder vsBuilder =
new DateHistogramValuesSourceBuilder(RollupField.formatIndexerAggName(field, DateHistogramAggregationBuilder.NAME));
vsBuilder.dateHistogramInterval(interval);
vsBuilder.field(field);
vsBuilder.timeZone(toDateTimeZone(timeZone));
return Collections.singletonList(vsBuilder);
}
public void validateMappings(Map<String, Map<String, FieldCapabilities>> fieldCapsResponse,
ActionRequestValidationException validationException) {

View File

@@ -16,18 +16,13 @@ import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.search.aggregations.bucket.composite.CompositeValuesSourceBuilder;
import org.elasticsearch.search.aggregations.bucket.composite.HistogramValuesSourceBuilder;
import org.elasticsearch.search.aggregations.bucket.histogram.HistogramAggregationBuilder;
import org.elasticsearch.xpack.core.rollup.RollupField;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg;
@@ -85,25 +80,6 @@ public class HistogramGroupConfig implements Writeable, ToXContentObject {
return fields;
}
/**
* This returns a set of aggregation builders which represent the configured
* set of histograms. Used by the rollup indexer to iterate over historical data
*/
public List<CompositeValuesSourceBuilder<?>> toBuilders() {
if (fields.length == 0) {
return Collections.emptyList();
}
return Arrays.stream(fields).map(f -> {
HistogramValuesSourceBuilder vsBuilder
= new HistogramValuesSourceBuilder(RollupField.formatIndexerAggName(f, HistogramAggregationBuilder.NAME));
vsBuilder.interval(interval);
vsBuilder.field(f);
vsBuilder.missingBucket(true);
return vsBuilder;
}).collect(Collectors.toList());
}
public void validateMappings(Map<String, Map<String, FieldCapabilities>> fieldCapsResponse,
ActionRequestValidationException validationException) {

View File

@@ -16,18 +16,9 @@ import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.search.aggregations.metrics.avg.AvgAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.max.MaxAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.min.MinAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.sum.SumAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.valuecount.ValueCountAggregationBuilder;
import org.elasticsearch.search.aggregations.support.ValueType;
import org.elasticsearch.search.aggregations.support.ValuesSourceAggregationBuilder;
import org.elasticsearch.xpack.core.rollup.RollupField;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -53,11 +44,11 @@ import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constru
public class MetricConfig implements Writeable, ToXContentObject {
// TODO: replace these with an enum
private static final ParseField MIN = new ParseField("min");
public static final ParseField MIN = new ParseField("min");
private static final ParseField MAX = new ParseField("max");
public static final ParseField MAX = new ParseField("max");
private static final ParseField SUM = new ParseField("sum");
public static final ParseField SUM = new ParseField("sum");
private static final ParseField AVG = new ParseField("avg");
public static final ParseField AVG = new ParseField("avg");
private static final ParseField VALUE_COUNT = new ParseField("value_count");
public static final ParseField VALUE_COUNT = new ParseField("value_count");
static final String NAME = "metrics";
private static final String FIELD = "field";
@@ -111,46 +102,6 @@ public class MetricConfig implements Writeable, ToXContentObject {
return metrics;
}
/**
* This returns a set of aggregation builders which represent the configured
* set of metrics. Used by the rollup indexer to iterate over historical data
*/
public List<ValuesSourceAggregationBuilder.LeafOnly> toBuilders() {
if (metrics.size() == 0) {
return Collections.emptyList();
}
List<ValuesSourceAggregationBuilder.LeafOnly> aggs = new ArrayList<>(metrics.size());
for (String metric : metrics) {
ValuesSourceAggregationBuilder.LeafOnly newBuilder;
if (metric.equals(MIN.getPreferredName())) {
newBuilder = new MinAggregationBuilder(RollupField.formatFieldName(field, MinAggregationBuilder.NAME, RollupField.VALUE));
} else if (metric.equals(MAX.getPreferredName())) {
newBuilder = new MaxAggregationBuilder(RollupField.formatFieldName(field, MaxAggregationBuilder.NAME, RollupField.VALUE));
} else if (metric.equals(AVG.getPreferredName())) {
// Avgs are sum + count
newBuilder = new SumAggregationBuilder(RollupField.formatFieldName(field, AvgAggregationBuilder.NAME, RollupField.VALUE));
ValuesSourceAggregationBuilder.LeafOnly countBuilder
= new ValueCountAggregationBuilder(
RollupField.formatFieldName(field, AvgAggregationBuilder.NAME, RollupField.COUNT_FIELD), ValueType.NUMERIC);
countBuilder.field(field);
aggs.add(countBuilder);
} else if (metric.equals(SUM.getPreferredName())) {
newBuilder = new SumAggregationBuilder(RollupField.formatFieldName(field, SumAggregationBuilder.NAME, RollupField.VALUE));
} else if (metric.equals(VALUE_COUNT.getPreferredName())) {
// TODO allow non-numeric value_counts.
// Hardcoding this is fine for now since the job validation guarantees that all metric fields are numerics
newBuilder = new ValueCountAggregationBuilder(
RollupField.formatFieldName(field, ValueCountAggregationBuilder.NAME, RollupField.VALUE), ValueType.NUMERIC);
} else {
throw new IllegalArgumentException("Unsupported metric type [" + metric + "]");
}
newBuilder.field(field);
aggs.add(newBuilder);
}
return aggs;
}
public void validateMappings(Map<String, Map<String, FieldCapabilities>> fieldCapsResponse,
ActionRequestValidationException validationException) {

View File

@@ -18,16 +18,11 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.mapper.KeywordFieldMapper;
import org.elasticsearch.index.mapper.TextFieldMapper;
import org.elasticsearch.search.aggregations.bucket.composite.CompositeValuesSourceBuilder;
import org.elasticsearch.search.aggregations.bucket.composite.TermsValuesSourceBuilder;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
import org.elasticsearch.xpack.core.rollup.RollupField;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import static org.elasticsearch.common.xcontent.ConstructingObjectParser.constructorArg;
@@ -79,20 +74,6 @@ public class TermsGroupConfig implements Writeable, ToXContentObject {
return fields;
}
/**
* This returns a set of aggregation builders which represent the configured
* set of date histograms. Used by the rollup indexer to iterate over historical data
*/
public List<CompositeValuesSourceBuilder<?>> toBuilders() {
return Arrays.stream(fields).map(f -> {
TermsValuesSourceBuilder vsBuilder
= new TermsValuesSourceBuilder(RollupField.formatIndexerAggName(f, TermsAggregationBuilder.NAME));
vsBuilder.field(f);
vsBuilder.missingBucket(true);
return vsBuilder;
}).collect(Collectors.toList());
}
public void validateMappings(Map<String, Map<String, FieldCapabilities>> fieldCapsResponse,
ActionRequestValidationException validationException) {

View File

@@ -9,19 +9,16 @@ import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.fieldcaps.FieldCapabilities;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.search.aggregations.bucket.composite.CompositeValuesSourceBuilder;
import org.elasticsearch.test.AbstractSerializingTestCase;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.elasticsearch.xpack.core.rollup.ConfigTestHelpers.randomTermsGroupConfig;
import static org.hamcrest.Matchers.equalTo;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class TermsGroupConfigSerializingTests extends AbstractSerializingTestCase<TermsGroupConfig> {
@@ -77,62 +74,4 @@ public class TermsGroupConfigSerializingTests extends AbstractSerializingTestCas
assertThat(e.validationErrors().get(0), equalTo("The field referenced by a terms group must be a [numeric] or " +
"[keyword/text] type, but found [geo_point] for field [my_field]"));
}
public void testValidateFieldMatchingNotAggregatable() {
ActionRequestValidationException e = new ActionRequestValidationException();
Map<String, Map<String, FieldCapabilities>> responseMap = new HashMap<>();
// Have to mock fieldcaps because the ctor's aren't public...
FieldCapabilities fieldCaps = mock(FieldCapabilities.class);
when(fieldCaps.isAggregatable()).thenReturn(false);
responseMap.put("my_field", Collections.singletonMap(getRandomType(), fieldCaps));
TermsGroupConfig config = new TermsGroupConfig("my_field");
config.validateMappings(responseMap, e);
assertThat(e.validationErrors().get(0), equalTo("The field [my_field] must be aggregatable across all indices, but is not."));
}
public void testValidateMatchingField() {
ActionRequestValidationException e = new ActionRequestValidationException();
Map<String, Map<String, FieldCapabilities>> responseMap = new HashMap<>();
String type = getRandomType();
// Have to mock fieldcaps because the ctor's aren't public...
FieldCapabilities fieldCaps = mock(FieldCapabilities.class);
when(fieldCaps.isAggregatable()).thenReturn(true);
responseMap.put("my_field", Collections.singletonMap(type, fieldCaps));
TermsGroupConfig config = new TermsGroupConfig("my_field");
config.validateMappings(responseMap, e);
if (e.validationErrors().size() != 0) {
fail(e.getMessage());
}
List<CompositeValuesSourceBuilder<?>> builders = config.toBuilders();
assertThat(builders.size(), equalTo(1));
}
private String getRandomType() {
int n = randomIntBetween(0,8);
if (n == 0) {
return "keyword";
} else if (n == 1) {
return "text";
} else if (n == 2) {
return "long";
} else if (n == 3) {
return "integer";
} else if (n == 4) {
return "short";
} else if (n == 5) {
return "float";
} else if (n == 6) {
return "double";
} else if (n == 7) {
return "scaled_float";
} else if (n == 8) {
return "half_float";
}
return "long";
}
}

View File

@@ -15,19 +15,35 @@ import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.RangeQueryBuilder;
import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.composite.CompositeAggregation;
import org.elasticsearch.search.aggregations.bucket.composite.CompositeAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.composite.CompositeValuesSourceBuilder;
import org.elasticsearch.search.aggregations.bucket.composite.DateHistogramValuesSourceBuilder;
import org.elasticsearch.search.aggregations.bucket.composite.HistogramValuesSourceBuilder;
import org.elasticsearch.search.aggregations.bucket.composite.TermsValuesSourceBuilder;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.histogram.HistogramAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.avg.AvgAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.max.MaxAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.min.MinAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.sum.SumAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.valuecount.ValueCountAggregationBuilder;
import org.elasticsearch.search.aggregations.support.ValueType;
import org.elasticsearch.search.aggregations.support.ValuesSourceAggregationBuilder;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.xpack.core.rollup.RollupField;
import org.elasticsearch.xpack.core.rollup.job.DateHistogramGroupConfig;
import org.elasticsearch.xpack.core.rollup.job.GroupConfig;
import org.elasticsearch.xpack.core.rollup.job.HistogramGroupConfig;
import org.elasticsearch.xpack.core.rollup.job.IndexerState;
import org.elasticsearch.xpack.core.rollup.job.MetricConfig;
import org.elasticsearch.xpack.core.rollup.job.RollupJob;
import org.elasticsearch.xpack.core.rollup.job.RollupJobConfig;
import org.elasticsearch.xpack.core.rollup.job.RollupJobStats;
import org.elasticsearch.xpack.core.rollup.job.TermsGroupConfig;
import org.joda.time.DateTimeZone;
import java.util.ArrayList;
import java.util.Arrays;
@@ -38,6 +54,10 @@ import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import static java.util.Collections.singletonList;
import static java.util.Collections.unmodifiableList;
import static org.elasticsearch.xpack.core.rollup.RollupField.formatFieldName;
/**
* An abstract class that builds a rollup index incrementally. A background job can be launched using {@link #maybeTriggerAsyncJob(long)},
* it will create the rollup index from the source index up to the last complete bucket that is allowed to be built (based on the current
@ -392,21 +412,12 @@ public abstract class RollupIndexer {
*/
private CompositeAggregationBuilder createCompositeBuilder(RollupJobConfig config) {
final GroupConfig groupConfig = config.getGroupConfig();
List<CompositeValuesSourceBuilder<?>> builders = new ArrayList<>();
List<CompositeValuesSourceBuilder<?>> builders = createValueSourceBuilders(groupConfig);
// Add all the agg builders to our request in order: date_histo -> histo -> terms
if (groupConfig != null) {
builders.addAll(groupConfig.getDateHistogram().toBuilders());
if (groupConfig.getHistogram() != null) {
builders.addAll(groupConfig.getHistogram().toBuilders());
}
if (groupConfig.getTerms() != null) {
builders.addAll(groupConfig.getTerms().toBuilders());
}
}
CompositeAggregationBuilder composite = new CompositeAggregationBuilder(AGGREGATION_NAME, builders);
config.getMetricsConfig().forEach(m -> m.toBuilders().forEach(composite::subAggregation));
List<AggregationBuilder> aggregations = createAggregationBuilders(config.getMetricsConfig());
aggregations.forEach(composite::subAggregation);
final Map<String, Object> metadata = createMetadata(groupConfig);
if (metadata.isEmpty() == false) {
@ -456,5 +467,112 @@ public abstract class RollupIndexer {
}
return metadata;
}
public static List<CompositeValuesSourceBuilder<?>> createValueSourceBuilders(final GroupConfig groupConfig) {
final List<CompositeValuesSourceBuilder<?>> builders = new ArrayList<>();
// Add all the agg builders to our request in order: date_histo -> histo -> terms
if (groupConfig != null) {
final DateHistogramGroupConfig dateHistogram = groupConfig.getDateHistogram();
builders.addAll(createValueSourceBuilders(dateHistogram));
final HistogramGroupConfig histogram = groupConfig.getHistogram();
builders.addAll(createValueSourceBuilders(histogram));
final TermsGroupConfig terms = groupConfig.getTerms();
builders.addAll(createValueSourceBuilders(terms));
}
return unmodifiableList(builders);
}
public static List<CompositeValuesSourceBuilder<?>> createValueSourceBuilders(final DateHistogramGroupConfig dateHistogram) {
final String dateHistogramField = dateHistogram.getField();
final String dateHistogramName = RollupField.formatIndexerAggName(dateHistogramField, DateHistogramAggregationBuilder.NAME);
final DateHistogramValuesSourceBuilder dateHistogramBuilder = new DateHistogramValuesSourceBuilder(dateHistogramName);
dateHistogramBuilder.dateHistogramInterval(dateHistogram.getInterval());
dateHistogramBuilder.field(dateHistogramField);
dateHistogramBuilder.timeZone(toDateTimeZone(dateHistogram.getTimeZone()));
return singletonList(dateHistogramBuilder);
}
public static List<CompositeValuesSourceBuilder<?>> createValueSourceBuilders(final HistogramGroupConfig histogram) {
final List<CompositeValuesSourceBuilder<?>> builders = new ArrayList<>();
if (histogram != null) {
for (String field : histogram.getFields()) {
final String histogramName = RollupField.formatIndexerAggName(field, HistogramAggregationBuilder.NAME);
final HistogramValuesSourceBuilder histogramBuilder = new HistogramValuesSourceBuilder(histogramName);
histogramBuilder.interval(histogram.getInterval());
histogramBuilder.field(field);
histogramBuilder.missingBucket(true);
builders.add(histogramBuilder);
}
}
return unmodifiableList(builders);
}
public static List<CompositeValuesSourceBuilder<?>> createValueSourceBuilders(final TermsGroupConfig terms) {
final List<CompositeValuesSourceBuilder<?>> builders = new ArrayList<>();
if (terms != null) {
for (String field : terms.getFields()) {
final String termsName = RollupField.formatIndexerAggName(field, TermsAggregationBuilder.NAME);
final TermsValuesSourceBuilder termsBuilder = new TermsValuesSourceBuilder(termsName);
termsBuilder.field(field);
termsBuilder.missingBucket(true);
builders.add(termsBuilder);
}
}
return unmodifiableList(builders);
}
/**
* This returns a set of aggregation builders which represent the configured
* set of metrics. Used to iterate over historical data.
*/
static List<AggregationBuilder> createAggregationBuilders(final List<MetricConfig> metricsConfigs) {
final List<AggregationBuilder> builders = new ArrayList<>();
if (metricsConfigs != null) {
for (MetricConfig metricConfig : metricsConfigs) {
final List<String> metrics = metricConfig.getMetrics();
if (metrics.isEmpty() == false) {
final String field = metricConfig.getField();
for (String metric : metrics) {
ValuesSourceAggregationBuilder.LeafOnly newBuilder;
if (metric.equals(MetricConfig.MIN.getPreferredName())) {
newBuilder = new MinAggregationBuilder(formatFieldName(field, MinAggregationBuilder.NAME, RollupField.VALUE));
} else if (metric.equals(MetricConfig.MAX.getPreferredName())) {
newBuilder = new MaxAggregationBuilder(formatFieldName(field, MaxAggregationBuilder.NAME, RollupField.VALUE));
} else if (metric.equals(MetricConfig.AVG.getPreferredName())) {
// Avgs are sum + count
newBuilder = new SumAggregationBuilder(formatFieldName(field, AvgAggregationBuilder.NAME, RollupField.VALUE));
ValuesSourceAggregationBuilder.LeafOnly countBuilder
= new ValueCountAggregationBuilder(
formatFieldName(field, AvgAggregationBuilder.NAME, RollupField.COUNT_FIELD), ValueType.NUMERIC);
countBuilder.field(field);
builders.add(countBuilder);
} else if (metric.equals(MetricConfig.SUM.getPreferredName())) {
newBuilder = new SumAggregationBuilder(formatFieldName(field, SumAggregationBuilder.NAME, RollupField.VALUE));
} else if (metric.equals(MetricConfig.VALUE_COUNT.getPreferredName())) {
// TODO allow non-numeric value_counts.
// Hardcoding this is fine for now since the job validation guarantees that all metric fields are numerics
newBuilder = new ValueCountAggregationBuilder(
formatFieldName(field, ValueCountAggregationBuilder.NAME, RollupField.VALUE), ValueType.NUMERIC);
} else {
throw new IllegalArgumentException("Unsupported metric type [" + metric + "]");
}
newBuilder.field(field);
builders.add(newBuilder);
}
}
}
}
return unmodifiableList(builders);
}
private static DateTimeZone toDateTimeZone(final String timezone) {
try {
return DateTimeZone.forOffsetHours(Integer.parseInt(timezone));
} catch (NumberFormatException e) {
return DateTimeZone.forID(timezone);
}
}
}
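
The private toDateTimeZone(String) helper added above first tries to parse the configured time zone as a whole-hour UTC offset and only then falls back to resolving it as a zone ID. A small stand-alone illustration of that order using plain joda-time (the input strings are just examples):

import org.joda.time.DateTimeZone;

// Stand-alone illustration of the fallback order in toDateTimeZone(); example inputs only.
public class TimeZoneParsingSketch {

    static DateTimeZone parse(String timezone) {
        try {
            // A bare integer such as "5" is read as a whole-hour offset (+05:00).
            return DateTimeZone.forOffsetHours(Integer.parseInt(timezone));
        } catch (NumberFormatException e) {
            // Anything else ("UTC", "Europe/Paris", "+01:00") is resolved as a zone ID.
            return DateTimeZone.forID(timezone);
        }
    }

    public static void main(String[] args) {
        System.out.println(parse("5"));            // prints +05:00
        System.out.println(parse("Europe/Paris")); // prints Europe/Paris
    }
}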

View File

@@ -0,0 +1,83 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.rollup.action.job;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.fieldcaps.FieldCapabilities;
import org.elasticsearch.search.aggregations.bucket.composite.CompositeValuesSourceBuilder;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.core.rollup.job.TermsGroupConfig;
import org.elasticsearch.xpack.rollup.job.RollupIndexer;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.hamcrest.Matchers.equalTo;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class RollupIndexTests extends ESTestCase {
public void testValidateMatchingField() {
ActionRequestValidationException e = new ActionRequestValidationException();
Map<String, Map<String, FieldCapabilities>> responseMap = new HashMap<>();
String type = getRandomType();
// Have to mock fieldcaps because the ctor's aren't public...
FieldCapabilities fieldCaps = mock(FieldCapabilities.class);
when(fieldCaps.isAggregatable()).thenReturn(true);
responseMap.put("my_field", Collections.singletonMap(type, fieldCaps));
TermsGroupConfig config = new TermsGroupConfig("my_field");
config.validateMappings(responseMap, e);
if (e.validationErrors().size() != 0) {
fail(e.getMessage());
}
List<CompositeValuesSourceBuilder<?>> builders = RollupIndexer.createValueSourceBuilders(config);
assertThat(builders.size(), equalTo(1));
}
public void testValidateFieldMatchingNotAggregatable() {
ActionRequestValidationException e = new ActionRequestValidationException();
Map<String, Map<String, FieldCapabilities>> responseMap = new HashMap<>();
// Have to mock fieldcaps because the ctor's aren't public...
FieldCapabilities fieldCaps = mock(FieldCapabilities.class);
when(fieldCaps.isAggregatable()).thenReturn(false);
responseMap.put("my_field", Collections.singletonMap(getRandomType(), fieldCaps));
TermsGroupConfig config = new TermsGroupConfig("my_field");
config.validateMappings(responseMap, e);
assertThat(e.validationErrors().get(0), equalTo("The field [my_field] must be aggregatable across all indices, but is not."));
}
private String getRandomType() {
int n = randomIntBetween(0,8);
if (n == 0) {
return "keyword";
} else if (n == 1) {
return "text";
} else if (n == 2) {
return "long";
} else if (n == 3) {
return "integer";
} else if (n == 4) {
return "short";
} else if (n == 5) {
return "float";
} else if (n == 6) {
return "double";
} else if (n == 7) {
return "scaled_float";
} else if (n == 8) {
return "half_float";
}
return "long";
}
}

View File

@@ -21,6 +21,7 @@ import org.elasticsearch.index.mapper.DateFieldMapper;
import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.index.mapper.NumberFieldMapper;
import org.elasticsearch.search.aggregations.Aggregation;
import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.AggregatorTestCase;
@@ -57,6 +58,7 @@ import static java.util.Collections.singletonList;
import static org.elasticsearch.xpack.core.rollup.ConfigTestHelpers.randomDateHistogramGroupConfig;
import static org.elasticsearch.xpack.core.rollup.ConfigTestHelpers.randomGroupConfig;
import static org.elasticsearch.xpack.core.rollup.ConfigTestHelpers.randomHistogramGroupConfig;
import static org.elasticsearch.xpack.rollup.job.RollupIndexer.createAggregationBuilders;
import static org.hamcrest.Matchers.equalTo;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@ -101,9 +103,11 @@ public class IndexerUtilsTests extends AggregatorTestCase {
//TODO swap this over to DateHistoConfig.Builder once DateInterval is in
DateHistogramGroupConfig dateHistoGroupConfig = new DateHistogramGroupConfig(timestampField, DateHistogramInterval.DAY);
CompositeAggregationBuilder compositeBuilder =
new CompositeAggregationBuilder(RollupIndexer.AGGREGATION_NAME, dateHistoGroupConfig.toBuilders());
new CompositeAggregationBuilder(RollupIndexer.AGGREGATION_NAME,
RollupIndexer.createValueSourceBuilders(dateHistoGroupConfig));
MetricConfig metricConfig = new MetricConfig("does_not_exist", singletonList("max"));
metricConfig.toBuilders().forEach(compositeBuilder::subAggregation);
List<AggregationBuilder> metricAgg = createAggregationBuilders(singletonList(metricConfig));
metricAgg.forEach(compositeBuilder::subAggregation);
Aggregator aggregator = createAggregator(compositeBuilder, indexSearcher, timestampFieldType, valueFieldType);
aggregator.preCollection();
@ -170,7 +174,8 @@ public class IndexerUtilsTests extends AggregatorTestCase {
singletonList(dateHisto));
MetricConfig metricConfig = new MetricConfig(valueField, singletonList("max"));
metricConfig.toBuilders().forEach(compositeBuilder::subAggregation);
List<AggregationBuilder> metricAgg = createAggregationBuilders(singletonList(metricConfig));
metricAgg.forEach(compositeBuilder::subAggregation);
Aggregator aggregator = createAggregator(compositeBuilder, indexSearcher, timestampFieldType, valueFieldType);
aggregator.preCollection();
@ -226,7 +231,8 @@ public class IndexerUtilsTests extends AggregatorTestCase {
singletonList(terms));
MetricConfig metricConfig = new MetricConfig(valueField, singletonList("max"));
metricConfig.toBuilders().forEach(compositeBuilder::subAggregation);
List<AggregationBuilder> metricAgg = createAggregationBuilders(singletonList(metricConfig));
metricAgg.forEach(compositeBuilder::subAggregation);
Aggregator aggregator = createAggregator(compositeBuilder, indexSearcher, valueFieldType);
aggregator.preCollection();
@ -292,7 +298,8 @@ public class IndexerUtilsTests extends AggregatorTestCase {
singletonList(dateHisto));
MetricConfig metricConfig = new MetricConfig("another_field", Arrays.asList("avg", "sum"));
metricConfig.toBuilders().forEach(compositeBuilder::subAggregation);
List<AggregationBuilder> metricAgg = createAggregationBuilders(singletonList(metricConfig));
metricAgg.forEach(compositeBuilder::subAggregation);
Aggregator aggregator = createAggregator(compositeBuilder, indexSearcher, timestampFieldType, valueFieldType);
aggregator.preCollection();
@ -523,11 +530,13 @@ public class IndexerUtilsTests extends AggregatorTestCase {
// Setup the composite agg
TermsGroupConfig termsGroupConfig = new TermsGroupConfig(valueField);
CompositeAggregationBuilder compositeBuilder = new CompositeAggregationBuilder(RollupIndexer.AGGREGATION_NAME,
termsGroupConfig.toBuilders()).size(numDocs*2);
CompositeAggregationBuilder compositeBuilder =
new CompositeAggregationBuilder(RollupIndexer.AGGREGATION_NAME, RollupIndexer.createValueSourceBuilders(termsGroupConfig))
.size(numDocs*2);
MetricConfig metricConfig = new MetricConfig(metricField, singletonList("max"));
metricConfig.toBuilders().forEach(compositeBuilder::subAggregation);
List<AggregationBuilder> metricAgg = createAggregationBuilders(singletonList(metricConfig));
metricAgg.forEach(compositeBuilder::subAggregation);
Aggregator aggregator = createAggregator(compositeBuilder, indexSearcher, valueFieldType, metricFieldType);
aggregator.preCollection();