[7.x] Extensibility for Composite Agg #59648 (#60842)

Mark Tozzi 2020-08-11 09:14:33 -04:00 committed by GitHub
parent 839c6cdfc0
commit ab8518fb5b
16 changed files with 830 additions and 221 deletions
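
This commit moves construction of the composite aggregation's per-source values sources out of CompositeAggregator and into suppliers registered with the ValuesSourceRegistry, so each CompositeValuesSourceBuilder owns its own registration. For orientation, a minimal sketch of the registration side, built only from the public entry points that appear in this diff (the wrapper class is illustrative, not part of the commit):

// Illustrative sketch, not code from the commit: SearchModule performs the equivalent of this
// through AggregationSpec#setAggregatorRegistrar (see the SearchModule hunk below).
import org.elasticsearch.search.aggregations.bucket.composite.CompositeAggregationBuilder;
import org.elasticsearch.search.aggregations.support.ValuesSourceRegistry;

public class CompositeRegistryWiring {
    public static ValuesSourceRegistry buildRegistry() {
        ValuesSourceRegistry.Builder builder = new ValuesSourceRegistry.Builder();
        // Registers the date_histogram, histogram, geotile_grid and terms composite suppliers
        // added in this commit (see CompositeAggregationBuilder#registerAggregators below).
        CompositeAggregationBuilder.registerAggregators(builder);
        return builder.build();
    }
}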

View File

@ -32,6 +32,9 @@ PUT /sales
"shop": {
"type": "keyword"
},
"location": {
"type": "geo_point"
},
"nested": {
"type": "nested",
"properties": {

View File

@ -119,13 +119,13 @@ import org.elasticsearch.search.aggregations.bucket.geogrid.InternalGeoTileGrid;
import org.elasticsearch.search.aggregations.bucket.global.GlobalAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.global.InternalGlobal;
import org.elasticsearch.search.aggregations.bucket.histogram.AutoDateHistogramAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.histogram.VariableWidthHistogramAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.histogram.HistogramAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.histogram.InternalAutoDateHistogram;
import org.elasticsearch.search.aggregations.bucket.histogram.InternalVariableWidthHistogram;
import org.elasticsearch.search.aggregations.bucket.histogram.InternalDateHistogram;
import org.elasticsearch.search.aggregations.bucket.histogram.InternalHistogram;
import org.elasticsearch.search.aggregations.bucket.histogram.InternalVariableWidthHistogram;
import org.elasticsearch.search.aggregations.bucket.histogram.VariableWidthHistogramAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.missing.InternalMissing;
import org.elasticsearch.search.aggregations.bucket.missing.MissingAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.nested.InternalNested;
@ -512,8 +512,12 @@ public class SearchModule {
.setAggregatorRegistrar(GeoCentroidAggregationBuilder::registerAggregators), builder);
registerAggregation(new AggregationSpec(ScriptedMetricAggregationBuilder.NAME, ScriptedMetricAggregationBuilder::new,
ScriptedMetricAggregationBuilder.PARSER).addResultReader(InternalScriptedMetric::new), builder);
registerAggregation((new AggregationSpec(CompositeAggregationBuilder.NAME, CompositeAggregationBuilder::new,
CompositeAggregationBuilder.PARSER).addResultReader(InternalComposite::new)), builder);
registerAggregation(
new AggregationSpec(CompositeAggregationBuilder.NAME, CompositeAggregationBuilder::new, CompositeAggregationBuilder.PARSER)
.addResultReader(InternalComposite::new)
.setAggregatorRegistrar(CompositeAggregationBuilder::registerAggregators),
builder
);
registerFromPlugin(plugins, SearchPlugin::getAggregations, (agg) -> this.registerAggregation(agg, builder));
// after aggs have been registered, see if there are any new VSTypes that need to be linked to core fields
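
The setAggregatorRegistrar hook used above for geo_centroid and composite is the same one a SearchPlugin can use when contributing aggregations through getAggregations. A hypothetical sketch (MyAggregationBuilder, InternalMyAgg and MyAggPlugin do not exist in this commit):

// Hypothetical plugin sketch; only the AggregationSpec methods shown above are real.
import java.util.Collections;
import java.util.List;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.plugins.SearchPlugin;

public class MyAggPlugin extends Plugin implements SearchPlugin {
    @Override
    public List<AggregationSpec> getAggregations() {
        return Collections.singletonList(
            new AggregationSpec(MyAggregationBuilder.NAME, MyAggregationBuilder::new, MyAggregationBuilder.PARSER)
                .addResultReader(InternalMyAgg::new)
                .setAggregatorRegistrar(MyAggregationBuilder::registerAggregators));
    }
}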

View File

@ -30,6 +30,7 @@ import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.aggregations.AggregatorFactory;
import org.elasticsearch.search.aggregations.bucket.nested.NestedAggregatorFactory;
import org.elasticsearch.search.aggregations.support.ValuesSourceRegistry;
import java.io.IOException;
import java.util.ArrayList;
@ -61,6 +62,14 @@ public class CompositeAggregationBuilder extends AbstractAggregationBuilder<Comp
PARSER.declareObject(CompositeAggregationBuilder::aggregateAfter, (p, context) -> p.map(), AFTER_FIELD_NAME);
}
public static void registerAggregators(ValuesSourceRegistry.Builder builder) {
DateHistogramValuesSourceBuilder.register(builder);
HistogramValuesSourceBuilder.register(builder);
GeoTileGridValuesSourceBuilder.register(builder);
TermsValuesSourceBuilder.register(builder);
builder.registerUsage(NAME);
}
private List<CompositeValuesSourceBuilder<?>> sources;
private Map<String, Object> after;
private int size = 10;

View File

@ -19,9 +19,7 @@
package org.elasticsearch.search.aggregations.bucket.composite;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.DocValues;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.NumericDocValues;
import org.apache.lucene.index.SortedNumericDocValues;
@ -44,21 +42,18 @@ import org.apache.lucene.search.SortedNumericSortField;
import org.apache.lucene.search.Weight;
import org.apache.lucene.util.RoaringDocIdSet;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.index.IndexSortConfig;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.aggregations.CardinalityUpperBound;
import org.elasticsearch.search.aggregations.BucketCollector;
import org.elasticsearch.search.aggregations.CardinalityUpperBound;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.aggregations.LeafBucketCollector;
import org.elasticsearch.search.aggregations.MultiBucketCollector;
import org.elasticsearch.search.aggregations.MultiBucketConsumerService;
import org.elasticsearch.search.aggregations.bucket.BucketsAggregator;
import org.elasticsearch.search.aggregations.bucket.geogrid.CellIdSource;
import org.elasticsearch.search.aggregations.support.ValuesSource;
import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.search.searchafter.SearchAfterBuilder;
import org.elasticsearch.search.sort.SortAndFormats;
@ -110,7 +105,12 @@ final class CompositeAggregator extends BucketsAggregator {
}
this.sourceConfigs = sourceConfigs;
for (int i = 0; i < sourceConfigs.length; i++) {
this.sources[i] = createValuesSource(context.bigArrays(), context.searcher().getIndexReader(), sourceConfigs[i], size);
this.sources[i] = sourceConfigs[i].createValuesSource(
context.bigArrays(),
context.searcher().getIndexReader(),
size,
this::addRequestCircuitBreakerBytes
);
}
this.queue = new CompositeValuesCollectorQueue(context.bigArrays(), sources, size, rawAfterKey);
this.rawAfterKey = rawAfterKey;
@ -495,81 +495,6 @@ final class CompositeAggregator extends BucketsAggregator {
};
}
private SingleDimensionValuesSource<?> createValuesSource(BigArrays bigArrays, IndexReader reader,
CompositeValuesSourceConfig config, int size) {
final int reverseMul = config.reverseMul();
if (config.valuesSource() instanceof ValuesSource.Bytes.WithOrdinals && reader instanceof DirectoryReader) {
ValuesSource.Bytes.WithOrdinals vs = (ValuesSource.Bytes.WithOrdinals) config.valuesSource();
return new GlobalOrdinalValuesSource(
bigArrays,
config.fieldType(),
vs::globalOrdinalsValues,
config.format(),
config.missingBucket(),
size,
reverseMul
);
} else if (config.valuesSource() instanceof ValuesSource.Bytes) {
ValuesSource.Bytes vs = (ValuesSource.Bytes) config.valuesSource();
return new BinaryValuesSource(
bigArrays,
this::addRequestCircuitBreakerBytes,
config.fieldType(),
vs::bytesValues,
config.format(),
config.missingBucket(),
size,
reverseMul
);
} else if (config.valuesSource() instanceof CellIdSource) {
final CellIdSource cis = (CellIdSource) config.valuesSource();
return new GeoTileValuesSource(
bigArrays,
config.fieldType(),
cis::longValues,
LongUnaryOperator.identity(),
config.format(),
config.missingBucket(),
size,
reverseMul);
} else if (config.valuesSource() instanceof ValuesSource.Numeric) {
final ValuesSource.Numeric vs = (ValuesSource.Numeric) config.valuesSource();
if (vs.isFloatingPoint()) {
return new DoubleValuesSource(
bigArrays,
config.fieldType(),
vs::doubleValues,
config.format(),
config.missingBucket(),
size,
reverseMul
);
} else {
final LongUnaryOperator rounding;
if (vs instanceof RoundingValuesSource) {
rounding = ((RoundingValuesSource) vs)::round;
} else {
rounding = LongUnaryOperator.identity();
}
return new LongValuesSource(
bigArrays,
config.fieldType(),
vs::longValues,
rounding,
config.format(),
config.missingBucket(),
size,
reverseMul
);
}
} else {
throw new IllegalArgumentException("Unknown values source type: " + config.valuesSource().getClass().getName() +
" for source: " + config.name());
}
}
private static class Entry {
final LeafReaderContext context;
final DocIdSet docIdSet;

View File

@ -27,10 +27,10 @@ import org.elasticsearch.common.xcontent.ToXContentFragment;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.query.QueryShardContext;
import org.elasticsearch.script.Script;
import org.elasticsearch.search.aggregations.support.CoreValuesSourceType;
import org.elasticsearch.search.aggregations.support.ValueType;
import org.elasticsearch.search.aggregations.support.ValuesSource;
import org.elasticsearch.search.aggregations.support.ValuesSourceConfig;
import org.elasticsearch.search.aggregations.support.ValuesSourceType;
import org.elasticsearch.search.sort.SortOrder;
import java.io.IOException;
@ -45,18 +45,13 @@ public abstract class CompositeValuesSourceBuilder<AB extends CompositeValuesSou
protected final String name;
private String field = null;
private Script script = null;
private ValueType valueType = null;
private ValueType userValueTypeHint = null;
private boolean missingBucket = false;
private SortOrder order = SortOrder.ASC;
private String format = null;
CompositeValuesSourceBuilder(String name) {
this(name, null);
}
CompositeValuesSourceBuilder(String name, ValueType valueType) {
this.name = name;
this.valueType = valueType;
}
CompositeValuesSourceBuilder(StreamInput in) throws IOException {
@ -66,7 +61,7 @@ public abstract class CompositeValuesSourceBuilder<AB extends CompositeValuesSou
this.script = new Script(in);
}
if (in.readBoolean()) {
this.valueType = ValueType.readFromStream(in);
this.userValueTypeHint = ValueType.readFromStream(in);
}
if (in.getVersion().onOrAfter(Version.V_6_4_0)) {
this.missingBucket = in.readBoolean();
@ -94,10 +89,10 @@ public abstract class CompositeValuesSourceBuilder<AB extends CompositeValuesSou
if (hasScript) {
script.writeTo(out);
}
boolean hasValueType = valueType != null;
boolean hasValueType = userValueTypeHint != null;
out.writeBoolean(hasValueType);
if (hasValueType) {
valueType.writeTo(out);
userValueTypeHint.writeTo(out);
}
if (out.getVersion().onOrAfter(Version.V_6_4_0)) {
out.writeBoolean(missingBucket);
@ -127,8 +122,8 @@ public abstract class CompositeValuesSourceBuilder<AB extends CompositeValuesSou
builder.field("script", script);
}
builder.field("missing_bucket", missingBucket);
if (valueType != null) {
builder.field("value_type", valueType.getPreferredName());
if (userValueTypeHint != null) {
builder.field("value_type", userValueTypeHint.getPreferredName());
}
if (format != null) {
builder.field("format", format);
@ -141,7 +136,7 @@ public abstract class CompositeValuesSourceBuilder<AB extends CompositeValuesSou
@Override
public int hashCode() {
return Objects.hash(field, missingBucket, script, valueType, order, format);
return Objects.hash(field, missingBucket, script, userValueTypeHint, order, format);
}
@Override
@ -153,7 +148,7 @@ public abstract class CompositeValuesSourceBuilder<AB extends CompositeValuesSou
AB that = (AB) o;
return Objects.equals(field, that.field()) &&
Objects.equals(script, that.script()) &&
Objects.equals(valueType, that.valueType()) &&
Objects.equals(userValueTypeHint, that.userValuetypeHint()) &&
Objects.equals(missingBucket, that.missingBucket()) &&
Objects.equals(order, that.order()) &&
Objects.equals(format, that.format());
@ -207,19 +202,19 @@ public abstract class CompositeValuesSourceBuilder<AB extends CompositeValuesSou
* Sets the {@link ValueType} for the value produced by this source
*/
@SuppressWarnings("unchecked")
public AB valueType(ValueType valueType) {
public AB userValuetypeHint(ValueType valueType) {
if (valueType == null) {
throw new IllegalArgumentException("[valueType] must not be null");
throw new IllegalArgumentException("[userValueTypeHint] must not be null");
}
this.valueType = valueType;
this.userValueTypeHint = valueType;
return (AB) this;
}
/**
* Gets the {@link ValueType} for the value produced by this source
*/
public ValueType valueType() {
return valueType;
public ValueType userValuetypeHint() {
return userValueTypeHint;
}
/**
@ -297,9 +292,11 @@ public abstract class CompositeValuesSourceBuilder<AB extends CompositeValuesSou
protected abstract CompositeValuesSourceConfig innerBuild(QueryShardContext queryShardContext,
ValuesSourceConfig config) throws IOException;
protected abstract ValuesSourceType getDefaultValuesSourceType();
public final CompositeValuesSourceConfig build(QueryShardContext queryShardContext) throws IOException {
ValuesSourceConfig config = ValuesSourceConfig.resolveUnregistered(queryShardContext,
valueType, field, script, null, timeZone(), format, CoreValuesSourceType.BYTES);
ValuesSourceConfig config = ValuesSourceConfig.resolve(queryShardContext,
userValueTypeHint, field, script, null, timeZone(), format, getDefaultValuesSourceType());
return innerBuild(queryShardContext, config);
}

View File

@ -19,13 +19,29 @@
package org.elasticsearch.search.aggregations.bucket.composite;
import org.apache.lucene.index.IndexReader;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.support.ValuesSource;
import org.elasticsearch.search.sort.SortOrder;
class CompositeValuesSourceConfig {
import java.util.function.LongConsumer;
public class CompositeValuesSourceConfig {
@FunctionalInterface
public interface SingleDimensionValuesSourceProvider {
SingleDimensionValuesSource<?> createValuesSource(
BigArrays bigArrays,
IndexReader reader,
int size,
LongConsumer addRequestCircuitBreakerBytes,
CompositeValuesSourceConfig config
);
}
private final String name;
@Nullable
private final MappedFieldType fieldType;
@ -34,6 +50,7 @@ class CompositeValuesSourceConfig {
private final int reverseMul;
private final boolean missingBucket;
private final boolean hasScript;
private final SingleDimensionValuesSourceProvider singleDimensionValuesSourceProvider;
/**
* Creates a new {@link CompositeValuesSourceConfig}.
@ -46,8 +63,16 @@ class CompositeValuesSourceConfig {
* @param missingBucket If <code>true</code> an explicit <code>null</code> bucket will represent documents with missing values.
* @param hasScript <code>true</code> if the source contains a script that can change the value.
*/
CompositeValuesSourceConfig(String name, @Nullable MappedFieldType fieldType, ValuesSource vs, DocValueFormat format,
SortOrder order, boolean missingBucket, boolean hasScript) {
CompositeValuesSourceConfig(
String name,
@Nullable MappedFieldType fieldType,
ValuesSource vs,
DocValueFormat format,
SortOrder order,
boolean missingBucket,
boolean hasScript,
SingleDimensionValuesSourceProvider singleDimensionValuesSourceProvider
) {
this.name = name;
this.fieldType = fieldType;
this.vs = vs;
@ -55,6 +80,7 @@ class CompositeValuesSourceConfig {
this.reverseMul = order == SortOrder.ASC ? 1 : -1;
this.missingBucket = missingBucket;
this.hasScript = hasScript;
this.singleDimensionValuesSourceProvider = singleDimensionValuesSourceProvider;
}
/**
@ -107,4 +133,13 @@ class CompositeValuesSourceConfig {
assert reverseMul == -1 || reverseMul == 1;
return reverseMul;
}
SingleDimensionValuesSource<?> createValuesSource(
BigArrays bigArrays,
IndexReader reader,
int size,
LongConsumer addRequestCircuitBreakerBytes
) {
return this.singleDimensionValuesSourceProvider.createValuesSource(bigArrays, reader, size, addRequestCircuitBreakerBytes, this);
}
}
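
A minimal sketch of the SingleDimensionValuesSourceProvider contract defined above, assuming same-package access (the concrete SingleDimensionValuesSource classes such as DoubleValuesSource remain package-private in this diff); it mirrors the lambdas the builders register later in this commit:

// Sketch, same-package code assumed; not part of the commit.
package org.elasticsearch.search.aggregations.bucket.composite;

import org.elasticsearch.search.aggregations.support.ValuesSource;

class DoubleProviderSketch {
    // A provider that always builds a DoubleValuesSource from the numeric values source
    // carried by the CompositeValuesSourceConfig it is handed.
    static final CompositeValuesSourceConfig.SingleDimensionValuesSourceProvider DOUBLE_PROVIDER =
        (bigArrays, reader, size, addRequestCircuitBreakerBytes, config) -> new DoubleValuesSource(
            bigArrays,
            config.fieldType(),
            ((ValuesSource.Numeric) config.valuesSource())::doubleValues,
            config.format(),
            config.missingBucket(),
            size,
            config.reverseMul());
}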

View File

@ -38,20 +38,13 @@ import static org.elasticsearch.common.xcontent.XContentParserUtils.ensureExpect
public class CompositeValuesSourceParserHelper {
static <VB extends CompositeValuesSourceBuilder<VB>, T> void declareValuesSourceFields(AbstractObjectParser<VB, T> objectParser,
ValueType expectedValueType) {
static <VB extends CompositeValuesSourceBuilder<VB>, T> void declareValuesSourceFields(AbstractObjectParser<VB, T> objectParser) {
objectParser.declareField(VB::field, XContentParser::text,
new ParseField("field"), ObjectParser.ValueType.STRING);
objectParser.declareBoolean(VB::missingBucket, new ParseField("missing_bucket"));
objectParser.declareField(VB::valueType, p -> {
objectParser.declareField(VB::userValuetypeHint, p -> {
ValueType valueType = ValueType.lenientParse(p.text());
if (expectedValueType != null && valueType.isNotA(expectedValueType)) {
throw new ParsingException(p.getTokenLocation(),
"Aggregation [" + objectParser.getName() + "] was configured with an incompatible value type ["
+ valueType + "]. It can only work on value of type ["
+ expectedValueType + "]");
}
return valueType;
}, new ParseField("value_type"), ObjectParser.ValueType.STRING);

View File

@ -19,12 +19,14 @@
package org.elasticsearch.search.aggregations.bucket.composite;
import org.apache.lucene.index.IndexReader;
import org.elasticsearch.Version;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.Rounding;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
@ -37,14 +39,18 @@ import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInter
import org.elasticsearch.search.aggregations.bucket.histogram.DateIntervalConsumer;
import org.elasticsearch.search.aggregations.bucket.histogram.DateIntervalWrapper;
import org.elasticsearch.search.aggregations.bucket.histogram.Histogram;
import org.elasticsearch.search.aggregations.support.ValueType;
import org.elasticsearch.search.aggregations.support.CoreValuesSourceType;
import org.elasticsearch.search.aggregations.support.ValuesSource;
import org.elasticsearch.search.aggregations.support.ValuesSourceConfig;
import org.elasticsearch.search.aggregations.support.ValuesSourceRegistry;
import org.elasticsearch.search.aggregations.support.ValuesSourceType;
import org.elasticsearch.search.sort.SortOrder;
import java.io.IOException;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.util.Objects;
import java.util.function.LongConsumer;
/**
* A {@link CompositeValuesSourceBuilder} that builds a {@link RoundingValuesSource} from a {@link Script} or
@ -52,7 +58,24 @@ import java.util.Objects;
*/
public class DateHistogramValuesSourceBuilder
extends CompositeValuesSourceBuilder<DateHistogramValuesSourceBuilder> implements DateIntervalConsumer {
@FunctionalInterface
public interface DateHistogramCompositeSupplier extends ValuesSourceRegistry.CompositeSupplier {
CompositeValuesSourceConfig apply(
ValuesSourceConfig config,
Rounding rounding,
String name,
boolean hasScript, // probably redundant with the config, but currently we check this two different ways...
String format,
boolean missingBucket,
SortOrder order
);
}
static final String TYPE = "date_histogram";
static final ValuesSourceRegistry.RegistryKey<DateHistogramCompositeSupplier> REGISTRY_KEY = new ValuesSourceRegistry.RegistryKey<>(
TYPE,
DateHistogramCompositeSupplier.class
);
static final ObjectParser<DateHistogramValuesSourceBuilder, String> PARSER =
ObjectParser.fromBuilder(TYPE, DateHistogramValuesSourceBuilder::new);
@ -73,7 +96,7 @@ public class DateHistogramValuesSourceBuilder
return ZoneOffset.ofHours(p.intValue());
}
}, new ParseField("time_zone"), ObjectParser.ValueType.LONG);
CompositeValuesSourceParserHelper.declareValuesSourceFields(PARSER, ValueType.NUMERIC);
CompositeValuesSourceParserHelper.declareValuesSourceFields(PARSER);
}
private ZoneId timeZone = null;
@ -81,7 +104,7 @@ public class DateHistogramValuesSourceBuilder
private long offset = 0;
public DateHistogramValuesSourceBuilder(String name) {
super(name, ValueType.DATE);
super(name);
}
protected DateHistogramValuesSourceBuilder(StreamInput in) throws IOException {
@ -246,25 +269,60 @@ public class DateHistogramValuesSourceBuilder
return this;
}
public static void register(ValuesSourceRegistry.Builder builder) {
builder.registerComposite(
REGISTRY_KEY,
org.elasticsearch.common.collect.List.of(CoreValuesSourceType.DATE, CoreValuesSourceType.NUMERIC),
(valuesSourceConfig, rounding, name, hasScript, format, missingBucket, order) -> {
ValuesSource.Numeric numeric = (ValuesSource.Numeric) valuesSourceConfig.getValuesSource();
// TODO once composite is plugged in to the values source registry or at least understands Date values source types use it
// here
Rounding.Prepared preparedRounding = rounding.prepareForUnknown();
RoundingValuesSource vs = new RoundingValuesSource(numeric, preparedRounding);
// is specified in the builder.
final DocValueFormat docValueFormat = format == null ? DocValueFormat.RAW : valuesSourceConfig.format();
final MappedFieldType fieldType = valuesSourceConfig.fieldType();
return new CompositeValuesSourceConfig(
name,
fieldType,
vs,
docValueFormat,
order,
missingBucket,
hasScript,
(
BigArrays bigArrays,
IndexReader reader,
int size,
LongConsumer addRequestCircuitBreakerBytes,
CompositeValuesSourceConfig compositeValuesSourceConfig) -> {
final RoundingValuesSource roundingValuesSource = (RoundingValuesSource) compositeValuesSourceConfig.valuesSource();
return new LongValuesSource(
bigArrays,
compositeValuesSourceConfig.fieldType(),
roundingValuesSource::longValues,
roundingValuesSource::round,
compositeValuesSourceConfig.format(),
compositeValuesSourceConfig.missingBucket(),
size,
compositeValuesSourceConfig.reverseMul()
);
}
);
}
);
}
@Override
protected ValuesSourceType getDefaultValuesSourceType() {
return CoreValuesSourceType.DATE;
}
@Override
protected CompositeValuesSourceConfig innerBuild(QueryShardContext queryShardContext, ValuesSourceConfig config) throws IOException {
Rounding rounding = dateHistogramInterval.createRounding(timeZone(), offset);
ValuesSource orig = config.hasValues() ? config.getValuesSource() : null;
if (orig == null) {
orig = ValuesSource.Numeric.EMPTY;
}
if (orig instanceof ValuesSource.Numeric) {
ValuesSource.Numeric numeric = (ValuesSource.Numeric) orig;
// TODO once composite is plugged in to the values source registry or at least understands Date values source types use it here
Rounding.Prepared preparedRounding = rounding.prepareForUnknown();
RoundingValuesSource vs = new RoundingValuesSource(numeric, preparedRounding);
// is specified in the builder.
final DocValueFormat docValueFormat = format() == null ? DocValueFormat.RAW : config.format();
final MappedFieldType fieldType = config.fieldType();
return new CompositeValuesSourceConfig(name, fieldType, vs, docValueFormat, order(),
missingBucket(), config.script() != null);
} else {
throw new IllegalArgumentException("invalid source, expected numeric, got " + orig.getClass().getSimpleName());
}
return queryShardContext.getValuesSourceRegistry()
.getComposite(REGISTRY_KEY, config)
.apply(config, rounding, name, config.script() != null, format(), missingBucket(), order());
}
}

View File

@ -19,12 +19,14 @@
package org.elasticsearch.search.aggregations.bucket.composite;
import org.apache.lucene.index.IndexReader;
import org.elasticsearch.Version;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.geo.GeoBoundingBox;
import org.elasticsearch.common.geo.GeoPoint;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
@ -34,15 +36,38 @@ import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.bucket.geogrid.CellIdSource;
import org.elasticsearch.search.aggregations.bucket.geogrid.GeoTileGridAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.geogrid.GeoTileUtils;
import org.elasticsearch.search.aggregations.support.ValueType;
import org.elasticsearch.search.aggregations.support.CoreValuesSourceType;
import org.elasticsearch.search.aggregations.support.ValuesSource;
import org.elasticsearch.search.aggregations.support.ValuesSourceConfig;
import org.elasticsearch.search.aggregations.support.ValuesSourceRegistry;
import org.elasticsearch.search.aggregations.support.ValuesSourceType;
import org.elasticsearch.search.sort.SortOrder;
import java.io.IOException;
import java.util.Objects;
import java.util.function.LongConsumer;
import java.util.function.LongUnaryOperator;
public class GeoTileGridValuesSourceBuilder extends CompositeValuesSourceBuilder<GeoTileGridValuesSourceBuilder> {
@FunctionalInterface
public interface GeoTileCompositeSuppier extends ValuesSourceRegistry.CompositeSupplier {
CompositeValuesSourceConfig apply(
ValuesSourceConfig config,
int precision,
GeoBoundingBox boundingBox,
String name,
boolean hasScript, // probably redundant with the config, but currently we check this two different ways...
String format,
boolean missingBucket,
SortOrder order
);
}
static final String TYPE = "geotile_grid";
static final ValuesSourceRegistry.RegistryKey<GeoTileCompositeSuppier> REGISTRY_KEY = new ValuesSourceRegistry.RegistryKey(
TYPE,
GeoTileCompositeSuppier.class
);
private static final ObjectParser<GeoTileGridValuesSourceBuilder, Void> PARSER;
static {
@ -50,13 +75,61 @@ public class GeoTileGridValuesSourceBuilder extends CompositeValuesSourceBuilder
PARSER.declareInt(GeoTileGridValuesSourceBuilder::precision, new ParseField("precision"));
PARSER.declareField(((p, builder, context) -> builder.geoBoundingBox(GeoBoundingBox.parseBoundingBox(p))),
GeoBoundingBox.BOUNDS_FIELD, ObjectParser.ValueType.OBJECT);
CompositeValuesSourceParserHelper.declareValuesSourceFields(PARSER, ValueType.NUMERIC);
CompositeValuesSourceParserHelper.declareValuesSourceFields(PARSER);
}
static GeoTileGridValuesSourceBuilder parse(String name, XContentParser parser) throws IOException {
return PARSER.parse(parser, new GeoTileGridValuesSourceBuilder(name), null);
}
static void register(ValuesSourceRegistry.Builder builder) {
builder.registerComposite(
REGISTRY_KEY,
CoreValuesSourceType.GEOPOINT,
(valuesSourceConfig, precision, boundingBox, name, hasScript, format, missingBucket, order) -> {
ValuesSource.GeoPoint geoPoint = (ValuesSource.GeoPoint) valuesSourceConfig.getValuesSource();
// is specified in the builder.
final MappedFieldType fieldType = valuesSourceConfig.fieldType();
CellIdSource cellIdSource = new CellIdSource(
geoPoint,
precision,
boundingBox,
GeoTileUtils::longEncode
);
return new CompositeValuesSourceConfig(
name,
fieldType,
cellIdSource,
DocValueFormat.GEOTILE,
order,
missingBucket,
hasScript,
(
BigArrays bigArrays,
IndexReader reader,
int size,
LongConsumer addRequestCircuitBreakerBytes,
CompositeValuesSourceConfig compositeValuesSourceConfig
) -> {
final CellIdSource cis = (CellIdSource) compositeValuesSourceConfig.valuesSource();
return new GeoTileValuesSource(
bigArrays,
compositeValuesSourceConfig.fieldType(),
cis::longValues,
LongUnaryOperator.identity(),
compositeValuesSourceConfig.format(),
compositeValuesSourceConfig.missingBucket(),
size,
compositeValuesSourceConfig.reverseMul()
);
}
);
}
);
}
private int precision = GeoTileGridAggregationBuilder.DEFAULT_PRECISION;
private GeoBoundingBox geoBoundingBox = new GeoBoundingBox(new GeoPoint(Double.NaN, Double.NaN), new GeoPoint(Double.NaN, Double.NaN));
@ -127,22 +200,16 @@ public class GeoTileGridValuesSourceBuilder extends CompositeValuesSourceBuilder
&& Objects.equals(geoBoundingBox, other.geoBoundingBox);
}
@Override
protected ValuesSourceType getDefaultValuesSourceType() {
return CoreValuesSourceType.GEOPOINT;
}
@Override
protected CompositeValuesSourceConfig innerBuild(QueryShardContext queryShardContext, ValuesSourceConfig config) throws IOException {
ValuesSource orig = config.hasValues() ? config.getValuesSource() : null;
if (orig == null) {
orig = ValuesSource.GeoPoint.EMPTY;
}
if (orig instanceof ValuesSource.GeoPoint) {
ValuesSource.GeoPoint geoPoint = (ValuesSource.GeoPoint) orig;
// is specified in the builder.
final MappedFieldType fieldType = config.fieldType();
CellIdSource cellIdSource = new CellIdSource(geoPoint, precision, geoBoundingBox, GeoTileUtils::longEncode);
return new CompositeValuesSourceConfig(name, fieldType, cellIdSource, DocValueFormat.GEOTILE, order(),
missingBucket(), script() != null);
} else {
throw new IllegalArgumentException("invalid source, expected geo_point, got " + orig.getClass().getSimpleName());
}
return queryShardContext.getValuesSourceRegistry()
.getComposite(REGISTRY_KEY, config)
.apply(config, precision, geoBoundingBox(), name, script() != null, format(), missingBucket(), order());
}
}

View File

@ -19,42 +19,102 @@
package org.elasticsearch.search.aggregations.bucket.composite;
import org.apache.lucene.index.IndexReader;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.index.query.QueryShardContext;
import org.elasticsearch.search.aggregations.bucket.histogram.Histogram;
import org.elasticsearch.search.aggregations.support.ValueType;
import org.elasticsearch.search.aggregations.support.CoreValuesSourceType;
import org.elasticsearch.search.aggregations.support.ValuesSource;
import org.elasticsearch.search.aggregations.support.ValuesSourceConfig;
import org.elasticsearch.search.aggregations.support.ValuesSourceRegistry;
import org.elasticsearch.search.aggregations.support.ValuesSourceType;
import org.elasticsearch.search.sort.SortOrder;
import java.io.IOException;
import java.util.Objects;
import java.util.function.LongConsumer;
/**
* A {@link CompositeValuesSourceBuilder} that builds a {@link HistogramValuesSource} from another numeric values source
* using the provided interval.
*/
public class HistogramValuesSourceBuilder extends CompositeValuesSourceBuilder<HistogramValuesSourceBuilder> {
@FunctionalInterface
public interface HistogramCompositeSupplier extends ValuesSourceRegistry.CompositeSupplier {
CompositeValuesSourceConfig apply(
ValuesSourceConfig config,
double interval,
String name,
boolean hasScript, // probably redundant with the config, but currently we check this two different ways...
String format,
boolean missingBucket,
SortOrder order
);
}
static final String TYPE = "histogram";
static final ValuesSourceRegistry.RegistryKey<HistogramCompositeSupplier> REGISTRY_KEY = new ValuesSourceRegistry.RegistryKey<>(
TYPE,
HistogramCompositeSupplier.class
);
private static final ObjectParser<HistogramValuesSourceBuilder, Void> PARSER;
static {
PARSER = new ObjectParser<>(HistogramValuesSourceBuilder.TYPE);
PARSER.declareDouble(HistogramValuesSourceBuilder::interval, Histogram.INTERVAL_FIELD);
CompositeValuesSourceParserHelper.declareValuesSourceFields(PARSER, ValueType.NUMERIC);
CompositeValuesSourceParserHelper.declareValuesSourceFields(PARSER);
}
static HistogramValuesSourceBuilder parse(String name, XContentParser parser) throws IOException {
return PARSER.parse(parser, new HistogramValuesSourceBuilder(name), null);
}
public static void register(ValuesSourceRegistry.Builder builder) {
builder.registerComposite(
REGISTRY_KEY,
org.elasticsearch.common.collect.List.of(CoreValuesSourceType.DATE, CoreValuesSourceType.NUMERIC),
(valuesSourceConfig, interval, name, hasScript, format, missingBucket, order) -> {
ValuesSource.Numeric numeric = (ValuesSource.Numeric) valuesSourceConfig.getValuesSource();
final HistogramValuesSource vs = new HistogramValuesSource(numeric, interval);
final MappedFieldType fieldType = valuesSourceConfig.fieldType();
return new CompositeValuesSourceConfig(
name,
fieldType,
vs,
valuesSourceConfig.format(),
order,
missingBucket,
hasScript,
(
BigArrays bigArrays,
IndexReader reader,
int size,
LongConsumer addRequestCircuitBreakerBytes,
CompositeValuesSourceConfig compositeValuesSourceConfig) -> {
final ValuesSource.Numeric numericValuesSource = (ValuesSource.Numeric) compositeValuesSourceConfig.valuesSource();
return new DoubleValuesSource(
bigArrays,
compositeValuesSourceConfig.fieldType(),
numericValuesSource::doubleValues,
compositeValuesSourceConfig.format(),
compositeValuesSourceConfig.missingBucket(),
size,
compositeValuesSourceConfig.reverseMul()
);
}
);
});
}
private double interval = 0;
public HistogramValuesSourceBuilder(String name) {
super(name, ValueType.DOUBLE);
super(name);
}
protected HistogramValuesSourceBuilder(StreamInput in) throws IOException {
@ -109,20 +169,15 @@ public class HistogramValuesSourceBuilder extends CompositeValuesSourceBuilder<H
return this;
}
@Override
protected ValuesSourceType getDefaultValuesSourceType() {
return CoreValuesSourceType.NUMERIC;
}
@Override
protected CompositeValuesSourceConfig innerBuild(QueryShardContext queryShardContext, ValuesSourceConfig config) throws IOException {
ValuesSource orig = config.hasValues() ? config.getValuesSource() : null;
if (orig == null) {
orig = ValuesSource.Numeric.EMPTY;
}
if (orig instanceof ValuesSource.Numeric) {
ValuesSource.Numeric numeric = (ValuesSource.Numeric) orig;
final HistogramValuesSource vs = new HistogramValuesSource(numeric, interval);
final MappedFieldType fieldType = config.fieldType();
return new CompositeValuesSourceConfig(name, fieldType, vs, config.format(), order(),
missingBucket(), script() != null);
} else {
throw new IllegalArgumentException("invalid source, expected numeric, got " + orig.getClass().getSimpleName());
}
return queryShardContext.getValuesSourceRegistry()
.getComposite(REGISTRY_KEY, config)
.apply(config, interval, name, script() != null, format(), missingBucket(), order());
}
}

View File

@ -19,33 +19,57 @@
package org.elasticsearch.search.aggregations.bucket.composite;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexReader;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.mapper.DateFieldMapper;
import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.index.query.QueryShardContext;
import org.elasticsearch.script.Script;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.support.CoreValuesSourceType;
import org.elasticsearch.search.aggregations.support.ValuesSource;
import org.elasticsearch.search.aggregations.support.ValuesSourceConfig;
import org.elasticsearch.script.Script;
import org.elasticsearch.search.aggregations.support.ValuesSourceRegistry;
import org.elasticsearch.search.aggregations.support.ValuesSourceType;
import org.elasticsearch.search.sort.SortOrder;
import java.io.IOException;
import java.util.function.LongConsumer;
import java.util.function.LongUnaryOperator;
/**
* A {@link CompositeValuesSourceBuilder} that builds a {@link ValuesSource} from a {@link Script} or
* a field name.
*/
public class TermsValuesSourceBuilder extends CompositeValuesSourceBuilder<TermsValuesSourceBuilder> {
@FunctionalInterface
public interface TermsCompositeSupplier extends ValuesSourceRegistry.CompositeSupplier {
CompositeValuesSourceConfig apply(
ValuesSourceConfig config,
String name,
boolean hasScript, // probably redundant with the config, but currently we check this two different ways...
String format,
boolean missingBucket,
SortOrder order
);
}
static final String TYPE = "terms";
static final ValuesSourceRegistry.RegistryKey<TermsCompositeSupplier> REGISTRY_KEY = new ValuesSourceRegistry.RegistryKey<>(
TYPE,
TermsCompositeSupplier.class
);
private static final ObjectParser<TermsValuesSourceBuilder, Void> PARSER;
static {
PARSER = new ObjectParser<>(TermsValuesSourceBuilder.TYPE);
CompositeValuesSourceParserHelper.declareValuesSourceFields(PARSER, null);
CompositeValuesSourceParserHelper.declareValuesSourceFields(PARSER);
}
static TermsValuesSourceBuilder parse(String name, XContentParser parser) throws IOException {
return PARSER.parse(parser, new TermsValuesSourceBuilder(name), null);
}
@ -69,22 +93,122 @@ public class TermsValuesSourceBuilder extends CompositeValuesSourceBuilder<Terms
return TYPE;
}
static void register(ValuesSourceRegistry.Builder builder) {
builder.registerComposite(
REGISTRY_KEY,
org.elasticsearch.common.collect.List.of(CoreValuesSourceType.DATE, CoreValuesSourceType.NUMERIC, CoreValuesSourceType.BOOLEAN),
(valuesSourceConfig, name, hasScript, format, missingBucket, order) -> {
final DocValueFormat docValueFormat;
if (format == null && valuesSourceConfig.valueSourceType() == CoreValuesSourceType.DATE) {
// defaults to the raw format on date fields (preserve timestamp as longs).
docValueFormat = DocValueFormat.RAW;
} else {
docValueFormat = valuesSourceConfig.format();
}
return new CompositeValuesSourceConfig(
name,
valuesSourceConfig.fieldType(),
valuesSourceConfig.getValuesSource(),
docValueFormat,
order,
missingBucket,
hasScript,
(
BigArrays bigArrays,
IndexReader reader,
int size,
LongConsumer addRequestCircuitBreakerBytes,
CompositeValuesSourceConfig compositeValuesSourceConfig) -> {
final ValuesSource.Numeric vs = (ValuesSource.Numeric) compositeValuesSourceConfig.valuesSource();
if (vs.isFloatingPoint()) {
return new DoubleValuesSource(
bigArrays,
compositeValuesSourceConfig.fieldType(),
vs::doubleValues,
compositeValuesSourceConfig.format(),
compositeValuesSourceConfig.missingBucket(),
size,
compositeValuesSourceConfig.reverseMul()
);
} else {
final LongUnaryOperator rounding;
rounding = LongUnaryOperator.identity();
return new LongValuesSource(
bigArrays,
compositeValuesSourceConfig.fieldType(),
vs::longValues,
rounding,
compositeValuesSourceConfig.format(),
compositeValuesSourceConfig.missingBucket(),
size,
compositeValuesSourceConfig.reverseMul()
);
}
}
);
}
);
builder.registerComposite(
REGISTRY_KEY,
org.elasticsearch.common.collect.List.of(CoreValuesSourceType.BYTES, CoreValuesSourceType.IP),
(valuesSourceConfig, name, hasScript, format, missingBucket, order) -> new CompositeValuesSourceConfig(
name,
valuesSourceConfig.fieldType(),
valuesSourceConfig.getValuesSource(),
valuesSourceConfig.format(),
order,
missingBucket,
hasScript,
(
BigArrays bigArrays,
IndexReader reader,
int size,
LongConsumer addRequestCircuitBreakerBytes,
CompositeValuesSourceConfig compositeValuesSourceConfig) -> {
if (valuesSourceConfig.hasGlobalOrdinals() && reader instanceof DirectoryReader) {
ValuesSource.Bytes.WithOrdinals vs = (ValuesSource.Bytes.WithOrdinals) compositeValuesSourceConfig
.valuesSource();
return new GlobalOrdinalValuesSource(
bigArrays,
compositeValuesSourceConfig.fieldType(),
vs::globalOrdinalsValues,
compositeValuesSourceConfig.format(),
compositeValuesSourceConfig.missingBucket(),
size,
compositeValuesSourceConfig.reverseMul()
);
} else {
ValuesSource.Bytes vs = (ValuesSource.Bytes) compositeValuesSourceConfig.valuesSource();
return new BinaryValuesSource(
bigArrays,
addRequestCircuitBreakerBytes,
compositeValuesSourceConfig.fieldType(),
vs::bytesValues,
compositeValuesSourceConfig.format(),
compositeValuesSourceConfig.missingBucket(),
size,
compositeValuesSourceConfig.reverseMul()
);
}
}
)
);
}
@Override
protected ValuesSourceType getDefaultValuesSourceType() {
return CoreValuesSourceType.BYTES;
}
@Override
protected CompositeValuesSourceConfig innerBuild(QueryShardContext queryShardContext, ValuesSourceConfig config) throws IOException {
ValuesSource vs = config.hasValues() ? config.getValuesSource() : null;
if (vs == null) {
// The field is unmapped so we use a value source that can parse any type of values.
// This is needed because the after values are parsed even when there are no values to process.
vs = ValuesSource.Bytes.WithOrdinals.EMPTY;
}
final MappedFieldType fieldType = config.fieldType();
final DocValueFormat format;
if (format() == null && fieldType instanceof DateFieldMapper.DateFieldType) {
// defaults to the raw format on date fields (preserve timestamp as longs).
format = DocValueFormat.RAW;
} else {
format = config.format();
}
return new CompositeValuesSourceConfig(name, fieldType, vs, format, order(), missingBucket(), script() != null);
return queryShardContext.getValuesSourceRegistry()
.getComposite(REGISTRY_KEY, config)
.apply(config, name, script() != null, format(), missingBucket(), order());
}
}

View File

@ -21,6 +21,7 @@ package org.elasticsearch.search.aggregations.support;
import org.elasticsearch.index.query.QueryShardContext;
import org.elasticsearch.search.SearchModule;
import org.elasticsearch.search.aggregations.AggregationExecutionException;
import org.elasticsearch.search.aggregations.bucket.composite.CompositeValuesSourceBuilder;
import java.util.AbstractMap;
import java.util.ArrayList;
@ -28,6 +29,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
/**
@ -38,14 +40,47 @@ import java.util.stream.Collectors;
*/
public class ValuesSourceRegistry {
public interface CompositeSupplier {
// this interface intentionally left blank
}
public static final class RegistryKey<T extends CompositeSupplier> {
private final String name;
private final Class<T> supplierType;
public RegistryKey(String name, Class<T> supplierType) {
this.name = Objects.requireNonNull(name);
this.supplierType = Objects.requireNonNull(supplierType);
}
public String getName() {
return name;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
RegistryKey that = (RegistryKey) o;
return name.equals(that.name) && supplierType.equals(that.supplierType);
}
@Override
public int hashCode() {
return Objects.hash(name, supplierType);
}
}
public static class Builder {
private final AggregationUsageService.Builder usageServiceBuilder;
private Map<String, List<Map.Entry<ValuesSourceType, AggregatorSupplier>>> aggregatorRegistry = new HashMap<>();
private Map<RegistryKey<? extends CompositeSupplier>, List<Map.Entry<ValuesSourceType, CompositeSupplier>>> compositeRegistry =
new HashMap<>();
public Builder() {
this.usageServiceBuilder = new AggregationUsageService.Builder();
}
private final Map<String, List<Map.Entry<ValuesSourceType, AggregatorSupplier>>> aggregatorRegistry = new HashMap<>();
/**
* Register a ValuesSource to Aggregator mapping. This method registers mappings that only apply to a
@ -56,7 +91,7 @@ public class ValuesSourceRegistry {
* @param aggregatorSupplier An Aggregation-specific specialization of AggregatorSupplier which will construct the mapped aggregator
* from the aggregation standard set of parameters
*/
public synchronized void register(String aggregationName, ValuesSourceType valuesSourceType,
public void register(String aggregationName, ValuesSourceType valuesSourceType,
AggregatorSupplier aggregatorSupplier) {
if (aggregatorRegistry.containsKey(aggregationName) == false) {
aggregatorRegistry.put(aggregationName, new ArrayList<>());
@ -80,6 +115,46 @@ public class ValuesSourceRegistry {
}
}
/**
* Register a new key generation function for the
* {@link org.elasticsearch.search.aggregations.bucket.composite.CompositeAggregation}.
* @param registryKey the subclass of {@link CompositeSupplier} associated with the {@link CompositeValuesSourceBuilder} type this
* mapping is being registered for, paired with the name of the key type.
* @param valuesSourceType the {@link ValuesSourceType} this mapping applies to
* @param compositeSupplier A function returning an appropriate
* {@link org.elasticsearch.search.aggregations.bucket.composite.CompositeValuesSourceConfig}
*/
public <T extends CompositeSupplier> void registerComposite(
RegistryKey<T> registryKey,
ValuesSourceType valuesSourceType,
T compositeSupplier
) {
if (compositeRegistry.containsKey(registryKey) == false) {
compositeRegistry.put(registryKey, new ArrayList<>());
}
compositeRegistry.get(registryKey).add(new AbstractMap.SimpleEntry<>(valuesSourceType, compositeSupplier));
}
/**
* Register a new key generation function for the
* {@link org.elasticsearch.search.aggregations.bucket.composite.CompositeAggregation}. This is a convenience version to map
* multiple types to the same supplier.
* @param registryKey the subclass of {@link CompositeSupplier} associated with the {@link CompositeValuesSourceBuilder} type this
* mapping is being registered for, paired with the name of the key type.
* @param valuesSourceTypes the {@link ValuesSourceType}s this mapping applies to
* @param compositeSupplier A function returning an appropriate
* {@link org.elasticsearch.search.aggregations.bucket.composite.CompositeValuesSourceConfig}
*/
public <T extends CompositeSupplier> void registerComposite(
RegistryKey<T> registryKey,
List<ValuesSourceType> valuesSourceTypes,
T compositeSupplier
) {
for (ValuesSourceType valuesSourceType : valuesSourceTypes) {
registerComposite(registryKey, valuesSourceType, compositeSupplier);
}
}
public void registerUsage(String aggregationName, ValuesSourceType valuesSourceType) {
usageServiceBuilder.registerAggregationUsage(aggregationName, valuesSourceType.typeName());
}
@ -89,21 +164,32 @@ public class ValuesSourceRegistry {
}
public ValuesSourceRegistry build() {
return new ValuesSourceRegistry(aggregatorRegistry, usageServiceBuilder.build());
return new ValuesSourceRegistry(aggregatorRegistry, compositeRegistry, usageServiceBuilder.build());
}
}
private static <K, T> Map<K, Map<ValuesSourceType, T>> copyMap(Map<K, List<Map.Entry<ValuesSourceType, T>>> mutableMap) {
/*
Make an immutable copy of our input map. Since this is write once, read many, we'll spend a bit of extra time to shape this
into a Map.of(), which is more read optimized than just using a hash map.
*/
Map<K, Map<ValuesSourceType, T>> tmp = new HashMap<>();
mutableMap.forEach((key, value) -> tmp.put(key, value.stream().collect(
Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))));
return Collections.unmodifiableMap(tmp);
}
/** Maps Aggregation names to (ValuesSourceType, Supplier) pairs, keyed by ValuesSourceType */
private final AggregationUsageService usageService;
private final Map<String, Map<ValuesSourceType, AggregatorSupplier>> aggregatorRegistry;
private Map<RegistryKey<? extends CompositeSupplier>, Map<ValuesSourceType, CompositeSupplier>> compositeRegistry;
public ValuesSourceRegistry(Map<String, List<Map.Entry<ValuesSourceType, AggregatorSupplier>>> aggregatorRegistry,
Map<RegistryKey<? extends CompositeSupplier>, List<Map.Entry<ValuesSourceType, CompositeSupplier>>> compositeRegistry,
AggregationUsageService usageService) {
Map<String, Map<ValuesSourceType, AggregatorSupplier>> tmp = new HashMap<>();
aggregatorRegistry.forEach((key, value) -> tmp.put(key, value.stream().collect(
Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))));
this.aggregatorRegistry = Collections.unmodifiableMap(tmp);
this.usageService = usageService;
this.aggregatorRegistry = copyMap(aggregatorRegistry);
this.compositeRegistry = copyMap(compositeRegistry);
}
private AggregatorSupplier findMatchingSuppier(ValuesSourceType valuesSourceType,
@ -131,6 +217,18 @@ public class ValuesSourceRegistry {
throw new AggregationExecutionException("Unregistered Aggregation [" + aggregationName + "]");
}
public <T extends CompositeSupplier> T getComposite(RegistryKey<T> registryKey, ValuesSourceConfig config) {
if (registryKey != null && compositeRegistry.containsKey(registryKey)) {
CompositeSupplier supplier = compositeRegistry.get(registryKey).get(config.valueSourceType());
if (supplier == null) {
throw new IllegalArgumentException(config.getDescription() + " is not supported for composite source [" +
registryKey.getName() + "]");
}
return (T) supplier; // Safe because we checked the type matched the key at load time
}
throw new AggregationExecutionException("Unregistered composite source [" + registryKey.getName() + "]");
}
public AggregationUsageService getUsageService() {
return usageService;
}
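
Putting the registry pieces together, a hedged sketch of registering a new composite source the same way the terms and histogram sources above do. MyCompositeSupplier and the "my_source" key are hypothetical, and the sketch sits in the composite package because the CompositeValuesSourceConfig constructor is package-private in this diff:

// Hypothetical sketch, not part of the commit; follows the pattern of the core registrations above.
package org.elasticsearch.search.aggregations.bucket.composite;

import java.util.function.LongUnaryOperator;
import org.elasticsearch.search.aggregations.support.CoreValuesSourceType;
import org.elasticsearch.search.aggregations.support.ValuesSource;
import org.elasticsearch.search.aggregations.support.ValuesSourceConfig;
import org.elasticsearch.search.aggregations.support.ValuesSourceRegistry;
import org.elasticsearch.search.sort.SortOrder;

final class MyCompositeSourceRegistration {

    @FunctionalInterface
    interface MyCompositeSupplier extends ValuesSourceRegistry.CompositeSupplier {
        CompositeValuesSourceConfig apply(ValuesSourceConfig config, String name, boolean hasScript,
                                          String format, boolean missingBucket, SortOrder order);
    }

    static final ValuesSourceRegistry.RegistryKey<MyCompositeSupplier> REGISTRY_KEY =
        new ValuesSourceRegistry.RegistryKey<>("my_source", MyCompositeSupplier.class);

    static void register(ValuesSourceRegistry.Builder builder) {
        builder.registerComposite(REGISTRY_KEY, CoreValuesSourceType.NUMERIC,
            (config, name, hasScript, format, missingBucket, order) -> new CompositeValuesSourceConfig(
                name, config.fieldType(), config.getValuesSource(), config.format(), order, missingBucket, hasScript,
                // The provider builds the shard-level SingleDimensionValuesSource on demand.
                (bigArrays, reader, size, addRequestCircuitBreakerBytes, compositeConfig) -> new LongValuesSource(
                    bigArrays,
                    compositeConfig.fieldType(),
                    ((ValuesSource.Numeric) compositeConfig.valuesSource())::longValues,
                    LongUnaryOperator.identity(),
                    compositeConfig.format(),
                    compositeConfig.missingBucket(),
                    size,
                    compositeConfig.reverseMul())));
    }
}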

View File

@ -141,7 +141,7 @@ public class CompositeAggregatorTests extends AggregatorTestCase {
return mapperService;
}
public void testUnmappedField() throws Exception {
public void testUnmappedFieldWithTerms() throws Exception {
final List<Map<String, List<Object>>> dataset = new ArrayList<>();
dataset.addAll(
Arrays.asList(
@ -219,6 +219,245 @@ public class CompositeAggregatorTests extends AggregatorTestCase {
);
}
public void testUnmappedFieldWithGeopoint() throws Exception {
final List<Map<String, List<Object>>> dataset = new ArrayList<>();
final String mappedFieldName = "geo_point";
dataset.addAll(
Arrays.asList(
createDocument(mappedFieldName, new GeoPoint(48.934059, 41.610741)),
createDocument(mappedFieldName, new GeoPoint(-23.065941, 113.610741)),
createDocument(mappedFieldName, new GeoPoint(90.0, 0.0)),
createDocument(mappedFieldName, new GeoPoint(37.2343, -115.8067)),
createDocument(mappedFieldName, new GeoPoint(90.0, 0.0))
)
);
// just unmapped = no results
testSearchCase(
Arrays.asList(new MatchAllDocsQuery(), new DocValuesFieldExistsQuery(mappedFieldName)),
dataset,
() -> new CompositeAggregationBuilder("name",
Arrays.asList(
new GeoTileGridValuesSourceBuilder("unmapped") .field("unmapped")
)
),
(result) -> assertEquals(0, result.getBuckets().size())
);
// unmapped missing bucket = one result
testSearchCase(
Arrays.asList(new MatchAllDocsQuery(), new DocValuesFieldExistsQuery(mappedFieldName)),
dataset,
() -> new CompositeAggregationBuilder("name",
Arrays.asList(
new GeoTileGridValuesSourceBuilder("unmapped") .field("unmapped").missingBucket(true)
)
),
(result) -> {
assertEquals(1, result.getBuckets().size());
assertEquals("{unmapped=null}", result.afterKey().toString());
assertEquals("{unmapped=null}", result.getBuckets().get(0).getKeyAsString());
assertEquals(5L, result.getBuckets().get(0).getDocCount());
}
);
// field + unmapped, no missing bucket = no results
testSearchCase(
Arrays.asList(new MatchAllDocsQuery(), new DocValuesFieldExistsQuery(mappedFieldName)),
dataset,
() -> new CompositeAggregationBuilder("name",
Arrays.asList(
new GeoTileGridValuesSourceBuilder(mappedFieldName).field(mappedFieldName),
new GeoTileGridValuesSourceBuilder("unmapped") .field("unmapped")
)
),
(result) -> assertEquals(0, result.getBuckets().size())
);
// field + unmapped with missing bucket = multiple results
testSearchCase(
Arrays.asList(new MatchAllDocsQuery(), new DocValuesFieldExistsQuery(mappedFieldName)),
dataset,
() -> new CompositeAggregationBuilder("name",
Arrays.asList(
new GeoTileGridValuesSourceBuilder(mappedFieldName).field(mappedFieldName),
new GeoTileGridValuesSourceBuilder("unmapped") .field("unmapped").missingBucket(true)
)
),
(result) -> {
assertEquals(2, result.getBuckets().size());
assertEquals("{geo_point=7/64/56, unmapped=null}", result.afterKey().toString());
assertEquals("{geo_point=7/32/56, unmapped=null}", result.getBuckets().get(0).getKeyAsString());
assertEquals(2L, result.getBuckets().get(0).getDocCount());
assertEquals("{geo_point=7/64/56, unmapped=null}", result.getBuckets().get(1).getKeyAsString());
assertEquals(3L, result.getBuckets().get(1).getDocCount());
}
);
}
public void testUnmappedFieldWithHistogram() throws Exception {
final List<Map<String, List<Object>>> dataset = new ArrayList<>();
final String mappedFieldName = "price";
dataset.addAll(
Arrays.asList(
createDocument(mappedFieldName, 103L),
createDocument(mappedFieldName, 51L),
createDocument(mappedFieldName, 56L),
createDocument(mappedFieldName, 105L),
createDocument(mappedFieldName, 25L)
)
);
// just unmapped = no results
testSearchCase(
Arrays.asList(new MatchAllDocsQuery(), new DocValuesFieldExistsQuery(mappedFieldName)),
dataset,
() -> new CompositeAggregationBuilder(
"name",
Arrays.asList(new HistogramValuesSourceBuilder("unmapped").field("unmapped").interval(10))
),
(result) -> assertEquals(0, result.getBuckets().size())
);
// unmapped missing bucket = one result
testSearchCase(
Arrays.asList(new MatchAllDocsQuery(), new DocValuesFieldExistsQuery(mappedFieldName)),
dataset,
() -> new CompositeAggregationBuilder(
"name",
Arrays.asList(new HistogramValuesSourceBuilder("unmapped").field("unmapped").interval(10).missingBucket(true))
),
(result) -> {
assertEquals(1, result.getBuckets().size());
assertEquals("{unmapped=null}", result.afterKey().toString());
assertEquals("{unmapped=null}", result.getBuckets().get(0).getKeyAsString());
assertEquals(5L, result.getBuckets().get(0).getDocCount());
}
);
// field + unmapped, no missing bucket = no results
testSearchCase(
Arrays.asList(new MatchAllDocsQuery(), new DocValuesFieldExistsQuery(mappedFieldName)),
dataset,
() -> new CompositeAggregationBuilder(
"name",
Arrays.asList(
new HistogramValuesSourceBuilder(mappedFieldName).field(mappedFieldName).interval(10),
new HistogramValuesSourceBuilder("unmapped").field("unmapped").interval(10)
)
),
(result) -> assertEquals(0, result.getBuckets().size())
);
// field + unmapped with missing bucket = multiple results
testSearchCase(
Arrays.asList(new MatchAllDocsQuery(), new DocValuesFieldExistsQuery(mappedFieldName)),
dataset,
() -> new CompositeAggregationBuilder(
"name",
Arrays.asList(
new HistogramValuesSourceBuilder(mappedFieldName).field(mappedFieldName).interval(10),
new HistogramValuesSourceBuilder("unmapped").field("unmapped").interval(10).missingBucket(true)
)
),
(result) -> {
assertEquals(3, result.getBuckets().size());
assertEquals("{price=100.0, unmapped=null}", result.afterKey().toString());
assertEquals("{price=20.0, unmapped=null}", result.getBuckets().get(0).getKeyAsString());
assertEquals(1L, result.getBuckets().get(0).getDocCount());
assertEquals("{price=50.0, unmapped=null}", result.getBuckets().get(1).getKeyAsString());
assertEquals(2L, result.getBuckets().get(1).getDocCount());
assertEquals("{price=100.0, unmapped=null}", result.getBuckets().get(2).getKeyAsString());
assertEquals(2L, result.getBuckets().get(2).getDocCount());
}
);
}
public void testUnmappedFieldWithDateHistogram() throws Exception {
String mappedFieldName = "date";
final List<Map<String, List<Object>>> dataset = new ArrayList<>();
dataset.addAll(
Arrays.asList(
createDocument(mappedFieldName, asLong("2017-10-20T03:08:45")),
createDocument(mappedFieldName, asLong("2016-09-20T09:00:34")),
createDocument(mappedFieldName, asLong("2016-09-20T11:34:00")),
createDocument(mappedFieldName, asLong("2017-10-20T06:09:24")),
createDocument(mappedFieldName, asLong("2017-10-19T06:09:24"))
)
);
// just unmapped = no results
testSearchCase(
Arrays.asList(new MatchAllDocsQuery(), new DocValuesFieldExistsQuery(mappedFieldName)),
dataset,
() -> new CompositeAggregationBuilder(
"name",
Arrays.asList(
new DateHistogramValuesSourceBuilder("unmapped").field("unmapped").calendarInterval(DateHistogramInterval.days(1))
)
),
(result) -> assertEquals(0, result.getBuckets().size())
);
// unmapped missing bucket = one result
testSearchCase(
Arrays.asList(new MatchAllDocsQuery(), new DocValuesFieldExistsQuery(mappedFieldName)),
dataset,
() -> new CompositeAggregationBuilder(
"name",
Arrays.asList(
new DateHistogramValuesSourceBuilder("unmapped").field("unmapped")
.calendarInterval(DateHistogramInterval.days(1))
.missingBucket(true)
)
),
(result) -> {
assertEquals(1, result.getBuckets().size());
assertEquals("{unmapped=null}", result.afterKey().toString());
assertEquals("{unmapped=null}", result.getBuckets().get(0).getKeyAsString());
assertEquals(5L, result.getBuckets().get(0).getDocCount());
}
);
// field + unmapped, no missing bucket = no results
testSearchCase(
Arrays.asList(new MatchAllDocsQuery(), new DocValuesFieldExistsQuery(mappedFieldName)),
dataset,
() -> new CompositeAggregationBuilder(
"name",
Arrays.asList(
new HistogramValuesSourceBuilder(mappedFieldName).field(mappedFieldName).interval(10),
new DateHistogramValuesSourceBuilder("unmapped").field("unmapped").calendarInterval(DateHistogramInterval.days(1))
)
),
(result) -> assertEquals(0, result.getBuckets().size())
);
// field + unmapped with missing bucket = multiple results
testSearchCase(
Arrays.asList(new MatchAllDocsQuery(), new DocValuesFieldExistsQuery(mappedFieldName)),
dataset,
() -> new CompositeAggregationBuilder(
"name",
Arrays.asList(
new DateHistogramValuesSourceBuilder(mappedFieldName).field(mappedFieldName)
.calendarInterval(DateHistogramInterval.days(1)),
new DateHistogramValuesSourceBuilder("unmapped").field("unmapped")
.calendarInterval(DateHistogramInterval.days(1))
.missingBucket(true)
)
),
(result) -> {
assertEquals(3, result.getBuckets().size());
assertEquals("{date=1508457600000, unmapped=null}", result.afterKey().toString());
assertEquals("{date=1474329600000, unmapped=null}", result.getBuckets().get(0).getKeyAsString());
assertEquals(2L, result.getBuckets().get(0).getDocCount());
assertEquals("{date=1508371200000, unmapped=null}", result.getBuckets().get(1).getKeyAsString());
assertEquals(1L, result.getBuckets().get(1).getDocCount());
assertEquals("{date=1508457600000, unmapped=null}", result.getBuckets().get(2).getKeyAsString());
assertEquals(2L, result.getBuckets().get(2).getDocCount());
}
);
}
public void testWithKeyword() throws Exception {
final List<Map<String, List<Object>>> dataset = new ArrayList<>();
dataset.addAll(

View File

@ -60,9 +60,11 @@ public class ValuesSourceRegistryTests extends ESTestCase {
);
ValuesSourceRegistry registry = new ValuesSourceRegistry(
Collections.singletonMap("bogus", Collections.emptyList()),
null);
Collections.singletonMap(new ValuesSourceRegistry.RegistryKey<>("bogus", ValuesSourceRegistry.CompositeSupplier.class),
Collections.emptyList()),
null
);
expectThrows(IllegalArgumentException.class, () -> registry.getAggregator(fieldOnly, "bogus"));
expectThrows(IllegalArgumentException.class, () -> registry.getAggregator(scriptOnly, "bogus"));
}
}

View File

@ -42,21 +42,21 @@ public abstract class GroupByKey extends Agg {
if (script != null) {
builder.script(script.toPainless());
if (script.outputType().isInteger()) {
builder.valueType(ValueType.LONG);
builder.userValuetypeHint(ValueType.LONG);
} else if (script.outputType().isRational()) {
builder.valueType(ValueType.DOUBLE);
builder.userValuetypeHint(ValueType.DOUBLE);
} else if (DataTypes.isString(script.outputType())) {
builder.valueType(ValueType.STRING);
builder.userValuetypeHint(ValueType.STRING);
} else if (script.outputType() == DATE) {
builder.valueType(ValueType.LONG);
builder.userValuetypeHint(ValueType.LONG);
} else if (script.outputType() == TIME) {
builder.valueType(ValueType.LONG);
builder.userValuetypeHint(ValueType.LONG);
} else if (script.outputType() == DATETIME) {
builder.valueType(ValueType.LONG);
builder.userValuetypeHint(ValueType.LONG);
} else if (script.outputType() == BOOLEAN) {
builder.valueType(ValueType.BOOLEAN);
builder.userValuetypeHint(ValueType.BOOLEAN);
} else if (script.outputType() == IP) {
builder.valueType(ValueType.IP);
builder.userValuetypeHint(ValueType.IP);
}
}
// field based

View File

@ -1306,7 +1306,7 @@ public class QueryTranslatorTests extends ESTestCase {
assertEquals("MAX(int)", eqe.output().get(0).qualifiedName());
assertEquals(INTEGER, eqe.output().get(0).dataType());
assertThat(eqe.queryContainer().aggs().asAggBuilder().toString().replaceAll("\\s+", ""),
containsString("\"date_histogram\":{\"field\":\"date\",\"missing_bucket\":true,\"value_type\":\"date\",\"order\":\"asc\","
containsString("\"date_histogram\":{\"field\":\"date\",\"missing_bucket\":true,\"order\":\"asc\","
+ "\"fixed_interval\":\"62208000000ms\",\"time_zone\":\"Z\"}}}]}"));
}
@ -1321,7 +1321,7 @@ public class QueryTranslatorTests extends ESTestCase {
assertEquals("h", eqe.output().get(1).qualifiedName());
assertEquals(DATETIME, eqe.output().get(1).dataType());
assertThat(eqe.queryContainer().aggs().asAggBuilder().toString().replaceAll("\\s+", ""),
containsString("\"date_histogram\":{\"field\":\"date\",\"missing_bucket\":true,\"value_type\":\"date\"," +
containsString("\"date_histogram\":{\"field\":\"date\",\"missing_bucket\":true," +
"\"order\":\"asc\",\"fixed_interval\":\"139968000000ms\",\"time_zone\":\"Z\"}}}]}"));
}
@ -1333,7 +1333,7 @@ public class QueryTranslatorTests extends ESTestCase {
assertEquals("YEAR(date)", eqe.output().get(0).qualifiedName());
assertEquals(INTEGER, eqe.output().get(0).dataType());
assertThat(eqe.queryContainer().aggs().asAggBuilder().toString().replaceAll("\\s+", ""),
endsWith("\"date_histogram\":{\"field\":\"date\",\"missing_bucket\":true,\"value_type\":\"date\",\"order\":\"asc\","
endsWith("\"date_histogram\":{\"field\":\"date\",\"missing_bucket\":true,\"order\":\"asc\","
+ "\"calendar_interval\":\"1y\",\"time_zone\":\"Z\"}}}]}}}"));
}
@ -1345,7 +1345,7 @@ public class QueryTranslatorTests extends ESTestCase {
assertEquals("h", eqe.output().get(0).qualifiedName());
assertEquals(DATETIME, eqe.output().get(0).dataType());
assertThat(eqe.queryContainer().aggs().asAggBuilder().toString().replaceAll("\\s+", ""),
endsWith("\"date_histogram\":{\"field\":\"date\",\"missing_bucket\":true,\"value_type\":\"date\",\"order\":\"asc\","
endsWith("\"date_histogram\":{\"field\":\"date\",\"missing_bucket\":true,\"order\":\"asc\","
+ "\"calendar_interval\":\"1M\",\"time_zone\":\"Z\"}}}]}}}"));
}
@ -1357,7 +1357,7 @@ public class QueryTranslatorTests extends ESTestCase {
assertEquals("h", eqe.output().get(0).qualifiedName());
assertEquals(DATETIME, eqe.output().get(0).dataType());
assertThat(eqe.queryContainer().aggs().asAggBuilder().toString().replaceAll("\\s+", ""),
endsWith("\"date_histogram\":{\"field\":\"date\",\"missing_bucket\":true,\"value_type\":\"date\",\"order\":\"asc\","
endsWith("\"date_histogram\":{\"field\":\"date\",\"missing_bucket\":true,\"order\":\"asc\","
+ "\"fixed_interval\":\"12960000000ms\",\"time_zone\":\"Z\"}}}]}}}"));
}
@ -1369,7 +1369,7 @@ public class QueryTranslatorTests extends ESTestCase {
assertEquals("h", eqe.output().get(0).qualifiedName());
assertEquals(DATETIME, eqe.output().get(0).dataType());
assertThat(eqe.queryContainer().aggs().asAggBuilder().toString().replaceAll("\\s+", ""),
endsWith("\"date_histogram\":{\"field\":\"date\",\"missing_bucket\":true,\"value_type\":\"date\",\"order\":\"asc\","
endsWith("\"date_histogram\":{\"field\":\"date\",\"missing_bucket\":true,\"order\":\"asc\","
+ "\"calendar_interval\":\"1d\",\"time_zone\":\"Z\"}}}]}}}"));
}
@ -1381,7 +1381,7 @@ public class QueryTranslatorTests extends ESTestCase {
assertEquals("h", eqe.output().get(0).qualifiedName());
assertEquals(DATETIME, eqe.output().get(0).dataType());
assertThat(eqe.queryContainer().aggs().asAggBuilder().toString().replaceAll("\\s+", ""),
endsWith("\"date_histogram\":{\"field\":\"date\",\"missing_bucket\":true,\"value_type\":\"date\",\"order\":\"asc\","
endsWith("\"date_histogram\":{\"field\":\"date\",\"missing_bucket\":true,\"order\":\"asc\","
+ "\"fixed_interval\":\"104400000ms\",\"time_zone\":\"Z\"}}}]}}}"));
}