Aggregation refactor: make aggregationFactory implement NamedWritable
Also makes AggregatorFactories implement Writable
This commit is contained in:
parent
855c199f60
commit
712b7116f4
|
@ -37,6 +37,8 @@ import org.elasticsearch.common.geo.builders.ShapeBuilder;
|
|||
import org.elasticsearch.common.text.Text;
|
||||
import org.elasticsearch.index.query.QueryBuilder;
|
||||
import org.elasticsearch.index.query.functionscore.ScoreFunctionBuilder;
|
||||
import org.elasticsearch.search.aggregations.AggregatorFactory;
|
||||
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorFactory;
|
||||
import org.joda.time.DateTime;
|
||||
import org.joda.time.DateTimeZone;
|
||||
|
||||
|
@ -622,6 +624,20 @@ public abstract class StreamInput extends InputStream {
|
|||
throw new UnsupportedOperationException("can't read named writeable from StreamInput");
|
||||
}
|
||||
|
||||
/**
|
||||
* Reads a {@link AggregatorFactory} from the current stream
|
||||
*/
|
||||
public AggregatorFactory readAggregatorFactory() throws IOException {
|
||||
return readNamedWriteable(AggregatorFactory.class);
|
||||
}
|
||||
|
||||
/**
|
||||
* Reads a {@link PipelineAggregatorFactory} from the current stream
|
||||
*/
|
||||
public PipelineAggregatorFactory readPipelineAggregatorFactory() throws IOException {
|
||||
return readNamedWriteable(PipelineAggregatorFactory.class);
|
||||
}
|
||||
|
||||
/**
|
||||
* Reads a {@link QueryBuilder} from the current stream
|
||||
*/
|
||||
|
|
|
@ -36,6 +36,8 @@ import org.elasticsearch.common.geo.builders.ShapeBuilder;
|
|||
import org.elasticsearch.common.text.Text;
|
||||
import org.elasticsearch.index.query.QueryBuilder;
|
||||
import org.elasticsearch.index.query.functionscore.ScoreFunctionBuilder;
|
||||
import org.elasticsearch.search.aggregations.AggregatorFactory;
|
||||
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorFactory;
|
||||
import org.joda.time.ReadableInstant;
|
||||
|
||||
import java.io.EOFException;
|
||||
|
@ -612,6 +614,20 @@ public abstract class StreamOutput extends OutputStream {
|
|||
namedWriteable.writeTo(this);
|
||||
}
|
||||
|
||||
/**
|
||||
* Writes a {@link AggregatorFactory} to the current stream
|
||||
*/
|
||||
public void writeAggregatorFactory(AggregatorFactory factory) throws IOException {
|
||||
writeNamedWriteable(factory);
|
||||
}
|
||||
|
||||
/**
|
||||
* Writes a {@link PipelineAggregatorFactory} to the current stream
|
||||
*/
|
||||
public void writePipelineAggregatorFactory(PipelineAggregatorFactory factory) throws IOException {
|
||||
writeNamedWriteable(factory);
|
||||
}
|
||||
|
||||
/**
|
||||
* Writes a {@link QueryBuilder} to the current stream
|
||||
*/
|
||||
|
|
|
@ -61,6 +61,11 @@ public abstract class Aggregator extends BucketCollector implements Releasable {
|
|||
*/
|
||||
AggregatorFactory parse(String aggregationName, XContentParser parser, SearchContext context) throws IOException;
|
||||
|
||||
/**
|
||||
* @return an empty {@link AggregatorFactory} instance for this parser
|
||||
* that can be used for deserialization
|
||||
*/
|
||||
AggregatorFactory getFactoryPrototype();
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -18,6 +18,11 @@
|
|||
*/
|
||||
package org.elasticsearch.search.aggregations;
|
||||
|
||||
import org.elasticsearch.action.support.ToXContentToBytes;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.io.stream.Writeable;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
|
||||
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorFactory;
|
||||
import org.elasticsearch.search.aggregations.support.AggregationContext;
|
||||
|
@ -26,19 +31,23 @@ import org.elasticsearch.search.aggregations.support.AggregationPath.PathElement
|
|||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.Set;
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
public class AggregatorFactories {
|
||||
public class AggregatorFactories extends ToXContentToBytes implements Writeable<AggregatorFactories> {
|
||||
|
||||
public static final AggregatorFactories EMPTY = new Empty();
|
||||
public static final AggregatorFactories EMPTY = new AggregatorFactories(new AggregatorFactory[0],
|
||||
new ArrayList<PipelineAggregatorFactory>());
|
||||
|
||||
private AggregatorFactory parent;
|
||||
private AggregatorFactory[] factories;
|
||||
|
@ -48,7 +57,8 @@ public class AggregatorFactories {
|
|||
return new Builder();
|
||||
}
|
||||
|
||||
private AggregatorFactories(AggregatorFactory[] factories, List<PipelineAggregatorFactory> pipelineAggregators) {
|
||||
private AggregatorFactories(AggregatorFactory[] factories,
|
||||
List<PipelineAggregatorFactory> pipelineAggregators) {
|
||||
this.factories = factories;
|
||||
this.pipelineAggregatorFactories = pipelineAggregators;
|
||||
}
|
||||
|
@ -115,33 +125,12 @@ public class AggregatorFactories {
|
|||
}
|
||||
}
|
||||
|
||||
private final static class Empty extends AggregatorFactories {
|
||||
|
||||
private static final AggregatorFactory[] EMPTY_FACTORIES = new AggregatorFactory[0];
|
||||
private static final Aggregator[] EMPTY_AGGREGATORS = new Aggregator[0];
|
||||
private static final List<PipelineAggregatorFactory> EMPTY_PIPELINE_AGGREGATORS = new ArrayList<>();
|
||||
|
||||
private Empty() {
|
||||
super(EMPTY_FACTORIES, EMPTY_PIPELINE_AGGREGATORS);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Aggregator[] createSubAggregators(Aggregator parent) {
|
||||
return EMPTY_AGGREGATORS;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Aggregator[] createTopLevelAggregators() {
|
||||
return EMPTY_AGGREGATORS;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
public static class Builder {
|
||||
|
||||
private final Set<String> names = new HashSet<>();
|
||||
private final List<AggregatorFactory> factories = new ArrayList<>();
|
||||
private final List<PipelineAggregatorFactory> pipelineAggregatorFactories = new ArrayList<>();
|
||||
private boolean skipResolveOrder;
|
||||
|
||||
public Builder addAggregator(AggregatorFactory factory) {
|
||||
if (!names.add(factory.name)) {
|
||||
|
@ -156,15 +145,29 @@ public class AggregatorFactories {
|
|||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* FOR TESTING ONLY
|
||||
*/
|
||||
Builder skipResolveOrder() {
|
||||
this.skipResolveOrder = true;
|
||||
return this;
|
||||
}
|
||||
|
||||
public AggregatorFactories build() {
|
||||
if (factories.isEmpty() && pipelineAggregatorFactories.isEmpty()) {
|
||||
return EMPTY;
|
||||
}
|
||||
List<PipelineAggregatorFactory> orderedpipelineAggregators = resolvePipelineAggregatorOrder(this.pipelineAggregatorFactories, this.factories);
|
||||
List<PipelineAggregatorFactory> orderedpipelineAggregators = null;
|
||||
if (skipResolveOrder) {
|
||||
orderedpipelineAggregators = new ArrayList<>(pipelineAggregatorFactories);
|
||||
} else {
|
||||
orderedpipelineAggregators = resolvePipelineAggregatorOrder(this.pipelineAggregatorFactories, this.factories);
|
||||
}
|
||||
return new AggregatorFactories(factories.toArray(new AggregatorFactory[factories.size()]), orderedpipelineAggregators);
|
||||
}
|
||||
|
||||
private List<PipelineAggregatorFactory> resolvePipelineAggregatorOrder(List<PipelineAggregatorFactory> pipelineAggregatorFactories, List<AggregatorFactory> aggFactories) {
|
||||
private List<PipelineAggregatorFactory> resolvePipelineAggregatorOrder(List<PipelineAggregatorFactory> pipelineAggregatorFactories,
|
||||
List<AggregatorFactory> aggFactories) {
|
||||
Map<String, PipelineAggregatorFactory> pipelineAggregatorFactoriesMap = new HashMap<>();
|
||||
for (PipelineAggregatorFactory factory : pipelineAggregatorFactories) {
|
||||
pipelineAggregatorFactoriesMap.put(factory.getName(), factory);
|
||||
|
@ -259,4 +262,71 @@ public class AggregatorFactories {
|
|||
return this.pipelineAggregatorFactories;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public AggregatorFactories readFrom(StreamInput in) throws IOException {
|
||||
int factoriesSize = in.readVInt();
|
||||
AggregatorFactory[] factoriesList = new AggregatorFactory[factoriesSize];
|
||||
for (int i = 0; i < factoriesSize; i++) {
|
||||
AggregatorFactory factory = in.readAggregatorFactory();
|
||||
factoriesList[i] = factory;
|
||||
}
|
||||
int pipelineFactoriesSize = in.readVInt();
|
||||
List<PipelineAggregatorFactory> pipelineAggregatorFactoriesList = new ArrayList<PipelineAggregatorFactory>(pipelineFactoriesSize);
|
||||
for (int i = 0; i < pipelineFactoriesSize; i++) {
|
||||
PipelineAggregatorFactory factory = in.readPipelineAggregatorFactory();
|
||||
pipelineAggregatorFactoriesList.add(factory);
|
||||
}
|
||||
AggregatorFactories aggregatorFactories = new AggregatorFactories(factoriesList,
|
||||
Collections.unmodifiableList(pipelineAggregatorFactoriesList));
|
||||
return aggregatorFactories;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeTo(StreamOutput out) throws IOException {
|
||||
out.writeVInt(this.factories.length);
|
||||
for (AggregatorFactory factory : factories) {
|
||||
out.writeAggregatorFactory(factory);
|
||||
}
|
||||
out.writeVInt(this.pipelineAggregatorFactories.size());
|
||||
for (PipelineAggregatorFactory factory : pipelineAggregatorFactories) {
|
||||
out.writePipelineAggregatorFactory(factory);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
|
||||
builder.startObject();
|
||||
if (factories != null) {
|
||||
for (AggregatorFactory subAgg : factories) {
|
||||
subAgg.toXContent(builder, params);
|
||||
}
|
||||
}
|
||||
if (pipelineAggregatorFactories != null) {
|
||||
for (PipelineAggregatorFactory subAgg : pipelineAggregatorFactories) {
|
||||
subAgg.toXContent(builder, params);
|
||||
}
|
||||
}
|
||||
builder.endObject();
|
||||
return builder;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(Arrays.hashCode(factories), pipelineAggregatorFactories);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object obj) {
|
||||
if (obj == null)
|
||||
return false;
|
||||
if (getClass() != obj.getClass())
|
||||
return false;
|
||||
AggregatorFactories other = (AggregatorFactories) obj;
|
||||
if (!Objects.deepEquals(factories, other.factories))
|
||||
return false;
|
||||
if (!Objects.equals(pipelineAggregatorFactories, other.pipelineAggregatorFactories))
|
||||
return false;
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -18,11 +18,17 @@
|
|||
*/
|
||||
package org.elasticsearch.search.aggregations;
|
||||
|
||||
|
||||
import org.apache.lucene.index.LeafReaderContext;
|
||||
import org.apache.lucene.search.Scorer;
|
||||
import org.elasticsearch.action.support.ToXContentToBytes;
|
||||
import org.elasticsearch.common.io.stream.NamedWriteable;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.lease.Releasables;
|
||||
import org.elasticsearch.common.util.BigArrays;
|
||||
import org.elasticsearch.common.util.ObjectArray;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
|
||||
import org.elasticsearch.search.aggregations.support.AggregationContext;
|
||||
import org.elasticsearch.search.internal.SearchContext.Lifetime;
|
||||
|
@ -30,11 +36,12 @@ import org.elasticsearch.search.internal.SearchContext.Lifetime;
|
|||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
|
||||
/**
|
||||
* A factory that knows how to create an {@link Aggregator} of a specific type.
|
||||
*/
|
||||
public abstract class AggregatorFactory {
|
||||
public abstract class AggregatorFactory extends ToXContentToBytes implements NamedWriteable<AggregatorFactory> {
|
||||
|
||||
protected String name;
|
||||
protected String type;
|
||||
|
@ -64,6 +71,13 @@ public abstract class AggregatorFactory {
|
|||
this.factories.init(context);
|
||||
}
|
||||
|
||||
/**
|
||||
* Allows the {@link AggregatorFactory} to initialize any state prior to
|
||||
* using it to create {@link Aggregator}s.
|
||||
*
|
||||
* @param context
|
||||
* the {@link AggregationContext} to use during initialization.
|
||||
*/
|
||||
protected void doInit(AggregationContext context) {
|
||||
}
|
||||
|
||||
|
@ -105,7 +119,6 @@ public abstract class AggregatorFactory {
|
|||
/**
|
||||
* Creates the aggregator
|
||||
*
|
||||
* @param context The aggregation context
|
||||
* @param parent The parent aggregator (if this is a top level factory, the parent will be {@code null})
|
||||
* @param collectsFromSingleBucket If true then the created aggregator will only be collected with <tt>0</tt> as a bucket ordinal.
|
||||
* Some factories can take advantage of this in order to return more optimized implementations.
|
||||
|
@ -123,13 +136,68 @@ public abstract class AggregatorFactory {
|
|||
this.metaData = metaData;
|
||||
}
|
||||
|
||||
@Override
|
||||
public final AggregatorFactory readFrom(StreamInput in) throws IOException {
|
||||
String name = in.readString();
|
||||
AggregatorFactory factory = doReadFrom(name, in);
|
||||
factory.factories = AggregatorFactories.EMPTY.readFrom(in);
|
||||
factory.factories.setParent(this);
|
||||
factory.metaData = in.readMap();
|
||||
return factory;
|
||||
}
|
||||
|
||||
// NORELEASE make this abstract when agg refactor complete
|
||||
protected AggregatorFactory doReadFrom(String name, StreamInput in) throws IOException {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public final void writeTo(StreamOutput out) throws IOException {
|
||||
out.writeString(name);
|
||||
doWriteTo(out);
|
||||
factories.writeTo(out);
|
||||
out.writeMap(metaData);
|
||||
}
|
||||
|
||||
// NORELEASE make this abstract when agg refactor complete
|
||||
protected void doWriteTo(StreamOutput out) throws IOException {
|
||||
}
|
||||
|
||||
@Override
|
||||
public final XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
|
||||
builder.startObject(name);
|
||||
|
||||
if (this.metaData != null) {
|
||||
builder.field("meta", this.metaData);
|
||||
}
|
||||
builder.field(type);
|
||||
internalXContent(builder, params);
|
||||
|
||||
if (factories != null && factories.count() > 0) {
|
||||
builder.field("aggregations");
|
||||
factories.toXContent(builder, params);
|
||||
|
||||
}
|
||||
|
||||
return builder.endObject();
|
||||
}
|
||||
|
||||
// NORELEASE make this method abstract when agg refactor complete
|
||||
protected XContentBuilder internalXContent(XContentBuilder builder, Params params) throws IOException {
|
||||
return builder;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getWriteableName() {
|
||||
return type;
|
||||
}
|
||||
|
||||
/**
|
||||
* Utility method. Given an {@link AggregatorFactory} that creates {@link Aggregator}s that only know how
|
||||
* to collect bucket <tt>0</tt>, this returns an aggregator that can collect any bucket.
|
||||
*/
|
||||
protected static Aggregator asMultiBucketAggregator(final AggregatorFactory factory, final AggregationContext context, final Aggregator parent) throws IOException {
|
||||
protected static Aggregator asMultiBucketAggregator(final AggregatorFactory factory,
|
||||
final AggregationContext context, final Aggregator parent) throws IOException {
|
||||
final Aggregator first = factory.create(parent, true);
|
||||
final BigArrays bigArrays = context.bigArrays();
|
||||
return new Aggregator() {
|
||||
|
@ -248,4 +316,41 @@ public abstract class AggregatorFactory {
|
|||
};
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(factories, metaData, name, type, doHashCode());
|
||||
}
|
||||
|
||||
// NORELEASE make this method abstract here when agg refactor complete (so
|
||||
// that subclasses are forced to implement it)
|
||||
protected int doHashCode() {
|
||||
throw new UnsupportedOperationException(
|
||||
"This method should be implemented by a sub-class and should not rely on this method. When agg re-factoring is complete this method will be made abstract.");
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object obj) {
|
||||
if (obj == null)
|
||||
return false;
|
||||
if (getClass() != obj.getClass())
|
||||
return false;
|
||||
AggregatorFactory other = (AggregatorFactory) obj;
|
||||
if (!Objects.equals(name, other.name))
|
||||
return false;
|
||||
if (!Objects.equals(type, other.type))
|
||||
return false;
|
||||
if (!Objects.equals(metaData, other.metaData))
|
||||
return false;
|
||||
if (!Objects.equals(factories, other.factories))
|
||||
return false;
|
||||
return doEquals(obj);
|
||||
}
|
||||
|
||||
// NORELEASE make this method abstract here when agg refactor complete (so
|
||||
// that subclasses are forced to implement it)
|
||||
protected boolean doEquals(Object obj) {
|
||||
throw new UnsupportedOperationException(
|
||||
"This method should be implemented by a sub-class and should not rely on this method. When agg re-factoring is complete this method will be made abstract.");
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -19,6 +19,7 @@
|
|||
package org.elasticsearch.search.aggregations;
|
||||
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
|
||||
import org.elasticsearch.common.xcontent.XContentFactory;
|
||||
import org.elasticsearch.common.xcontent.XContentParser;
|
||||
import org.elasticsearch.search.SearchParseException;
|
||||
|
@ -54,15 +55,26 @@ public class AggregatorParsers {
|
|||
* ).
|
||||
*/
|
||||
@Inject
|
||||
public AggregatorParsers(Set<Aggregator.Parser> aggParsers, Set<PipelineAggregator.Parser> pipelineAggregatorParsers) {
|
||||
public AggregatorParsers(Set<Aggregator.Parser> aggParsers, Set<PipelineAggregator.Parser> pipelineAggregatorParsers,
|
||||
NamedWriteableRegistry namedWriteableRegistry) {
|
||||
Map<String, Aggregator.Parser> aggParsersBuilder = new HashMap<>(aggParsers.size());
|
||||
for (Aggregator.Parser parser : aggParsers) {
|
||||
aggParsersBuilder.put(parser.type(), parser);
|
||||
AggregatorFactory factoryPrototype = parser.getFactoryPrototype();
|
||||
// NORELEASE remove this check when agg refactoring complete
|
||||
if (factoryPrototype != null) {
|
||||
namedWriteableRegistry.registerPrototype(AggregatorFactory.class, factoryPrototype);
|
||||
}
|
||||
}
|
||||
this.aggParsers = unmodifiableMap(aggParsersBuilder);
|
||||
Map<String, PipelineAggregator.Parser> pipelineAggregatorParsersBuilder = new HashMap<>(pipelineAggregatorParsers.size());
|
||||
for (PipelineAggregator.Parser parser : pipelineAggregatorParsers) {
|
||||
pipelineAggregatorParsersBuilder.put(parser.type(), parser);
|
||||
PipelineAggregatorFactory factoryPrototype = parser.getFactoryPrototype();
|
||||
// NORELEASE remove this check when agg refactoring complete
|
||||
if (factoryPrototype != null) {
|
||||
namedWriteableRegistry.registerPrototype(PipelineAggregatorFactory.class, factoryPrototype);
|
||||
}
|
||||
}
|
||||
this.pipelineAggregatorParsers = unmodifiableMap(pipelineAggregatorParsersBuilder);
|
||||
}
|
||||
|
|
|
@ -67,4 +67,10 @@ public class ChildrenParser implements Aggregator.Parser {
|
|||
|
||||
return new ParentToChildrenAggregator.Factory(aggregationName, childType);
|
||||
}
|
||||
|
||||
// NORELEASE implement this method when refactoring this aggregation
|
||||
@Override
|
||||
public AggregatorFactory getFactoryPrototype() {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -34,7 +34,6 @@ import org.elasticsearch.common.util.LongObjectPagedHashMap;
|
|||
import org.elasticsearch.index.fielddata.plain.ParentChildIndexFieldData;
|
||||
import org.elasticsearch.index.mapper.DocumentMapper;
|
||||
import org.elasticsearch.index.mapper.internal.ParentFieldMapper;
|
||||
import org.elasticsearch.index.search.child.ConstantScorer;
|
||||
import org.elasticsearch.search.SearchParseException;
|
||||
import org.elasticsearch.search.aggregations.Aggregator;
|
||||
import org.elasticsearch.search.aggregations.AggregatorFactories;
|
||||
|
@ -49,6 +48,7 @@ import org.elasticsearch.search.aggregations.support.ValuesSource;
|
|||
import org.elasticsearch.search.aggregations.support.ValuesSourceAggregatorFactory;
|
||||
import org.elasticsearch.search.aggregations.support.ValuesSourceConfig;
|
||||
import org.elasticsearch.search.aggregations.support.ValuesSourceParser;
|
||||
import org.elasticsearch.search.aggregations.support.ValuesSourceType;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
|
@ -185,8 +185,8 @@ public class ParentToChildrenAggregator extends SingleBucketAggregator {
|
|||
|
||||
private String parentType;
|
||||
private final String childType;
|
||||
private Filter parentFilter;
|
||||
private Filter childFilter;
|
||||
private Query parentFilter;
|
||||
private Query childFilter;
|
||||
|
||||
public Factory(String name, String childType) {
|
||||
super(name, InternalChildren.TYPE.name(), new ValuesSourceParser.Input<ValuesSource.Bytes.WithOrdinals.ParentChild>());
|
||||
|
@ -221,7 +221,7 @@ public class ParentToChildrenAggregator extends SingleBucketAggregator {
|
|||
}
|
||||
|
||||
private void resolveConfig(AggregationContext aggregationContext) {
|
||||
config = new ValuesSourceConfig<>(ValuesSource.Bytes.WithOrdinals.ParentChild.class);
|
||||
config = new ValuesSourceConfig<>(ValuesSourceType.BYTES);
|
||||
DocumentMapper childDocMapper = aggregationContext.searchContext().mapperService().documentMapper(childType);
|
||||
|
||||
if (childDocMapper != null) {
|
||||
|
@ -233,9 +233,8 @@ public class ParentToChildrenAggregator extends SingleBucketAggregator {
|
|||
parentType = parentFieldMapper.type();
|
||||
DocumentMapper parentDocMapper = aggregationContext.searchContext().mapperService().documentMapper(parentType);
|
||||
if (parentDocMapper != null) {
|
||||
// TODO: use the query API
|
||||
parentFilter = new QueryWrapperFilter(parentDocMapper.typeFilter());
|
||||
childFilter = new QueryWrapperFilter(childDocMapper.typeFilter());
|
||||
parentFilter = parentDocMapper.typeFilter();
|
||||
childFilter = childDocMapper.typeFilter();
|
||||
ParentChildIndexFieldData parentChildIndexFieldData = aggregationContext.searchContext().fieldData()
|
||||
.getForField(parentFieldMapper.fieldType());
|
||||
config.fieldContext(new FieldContext(parentFieldMapper.fieldType().names().indexName(), parentChildIndexFieldData,
|
||||
|
|
|
@ -44,4 +44,10 @@ public class FilterParser implements Aggregator.Parser {
|
|||
return new FilterAggregator.Factory(aggregationName, filter == null ? new MatchAllDocsQuery() : filter.query());
|
||||
}
|
||||
|
||||
// NORELEASE implement this method when refactoring this aggregation
|
||||
@Override
|
||||
public AggregatorFactory getFactoryPrototype() {
|
||||
return null;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -117,4 +117,10 @@ public class FiltersParser implements Aggregator.Parser {
|
|||
return new FiltersAggregator.Factory(aggregationName, filters, keyed, otherBucketKey);
|
||||
}
|
||||
|
||||
// NORELEASE implement this method when refactoring this aggregation
|
||||
@Override
|
||||
public AggregatorFactory getFactoryPrototype() {
|
||||
return null;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -209,4 +209,10 @@ public class GeoHashGridParser implements Aggregator.Parser {
|
|||
|
||||
}
|
||||
}
|
||||
}
|
||||
// NORELEASE implement this method when refactoring this aggregation
|
||||
@Override
|
||||
public AggregatorFactory getFactoryPrototype() {
|
||||
return null;
|
||||
}
|
||||
|
||||
}
|
|
@ -41,4 +41,10 @@ public class GlobalParser implements Aggregator.Parser {
|
|||
return new GlobalAggregator.Factory(aggregationName);
|
||||
}
|
||||
|
||||
// NORELEASE implement this method when refactoring this aggregation
|
||||
@Override
|
||||
public AggregatorFactory getFactoryPrototype() {
|
||||
return null;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -212,4 +212,10 @@ public class DateHistogramParser implements Aggregator.Parser {
|
|||
int beginIndex = offset.charAt(0) == '+' ? 1 : 0;
|
||||
return TimeValue.parseTimeValue(offset.substring(beginIndex), null, getClass().getSimpleName() + ".parseOffset").millis();
|
||||
}
|
||||
|
||||
// NORELEASE implement this method when refactoring this aggregation
|
||||
@Override
|
||||
public AggregatorFactory getFactoryPrototype() {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -181,13 +181,16 @@ public class HistogramAggregator extends BucketsAggregator {
|
|||
|
||||
@Override
|
||||
protected Aggregator doCreateInternal(ValuesSource.Numeric valuesSource, AggregationContext aggregationContext, Aggregator parent,
|
||||
boolean collectsFromSingleBucket, List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData) throws IOException {
|
||||
boolean collectsFromSingleBucket, List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData)
|
||||
throws IOException {
|
||||
if (collectsFromSingleBucket == false) {
|
||||
return asMultiBucketAggregator(this, aggregationContext, parent);
|
||||
}
|
||||
// we need to round the bounds given by the user and we have to do it for every aggregator we crate
|
||||
// we need to round the bounds given by the user and we have to do
|
||||
// it for every aggregator we create
|
||||
// as the rounding is not necessarily an idempotent operation.
|
||||
// todo we need to think of a better structure to the factory/agtor code so we won't need to do that
|
||||
// todo we need to think of a better structure to the factory/agtor
|
||||
// code so we won't need to do that
|
||||
ExtendedBounds roundedBounds = null;
|
||||
if (extendedBounds != null) {
|
||||
// we need to process & validate here using the parser
|
||||
|
|
|
@ -149,4 +149,10 @@ public class HistogramParser implements Aggregator.Parser {
|
|||
}
|
||||
return new InternalOrder.Aggregation(key, asc);
|
||||
}
|
||||
|
||||
// NORELEASE implement this method when refactoring this aggregation
|
||||
@Override
|
||||
public AggregatorFactory getFactoryPrototype() {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -59,4 +59,10 @@ public class MissingParser implements Aggregator.Parser {
|
|||
|
||||
return new MissingAggregator.Factory(aggregationName, vsParser.input());
|
||||
}
|
||||
|
||||
// NORELEASE implement this method when refactoring this aggregation
|
||||
@Override
|
||||
public AggregatorFactory getFactoryPrototype() {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -66,4 +66,10 @@ public class NestedParser implements Aggregator.Parser {
|
|||
|
||||
return new NestedAggregator.Factory(aggregationName, path);
|
||||
}
|
||||
|
||||
// NORELEASE implement this method when refactoring this aggregation
|
||||
@Override
|
||||
public AggregatorFactory getFactoryPrototype() {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -60,4 +60,10 @@ public class ReverseNestedParser implements Aggregator.Parser {
|
|||
|
||||
return new ReverseNestedAggregator.Factory(aggregationName, path);
|
||||
}
|
||||
|
||||
// NORELEASE implement this method when refactoring this aggregation
|
||||
@Override
|
||||
public AggregatorFactory getFactoryPrototype() {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -112,4 +112,10 @@ public class RangeParser implements Aggregator.Parser {
|
|||
|
||||
return new RangeAggregator.Factory(aggregationName, vsParser.input(), InternalRange.FACTORY, ranges, keyed);
|
||||
}
|
||||
|
||||
// NORELEASE implement this method when refactoring this aggregation
|
||||
@Override
|
||||
public AggregatorFactory getFactoryPrototype() {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -117,4 +117,10 @@ public class DateRangeParser implements Aggregator.Parser {
|
|||
|
||||
return new RangeAggregator.Factory(aggregationName, vsParser.input(), InternalDateRange.FACTORY, ranges, keyed);
|
||||
}
|
||||
|
||||
// NORELEASE implement this method when refactoring this aggregation
|
||||
@Override
|
||||
public AggregatorFactory getFactoryPrototype() {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -244,4 +244,10 @@ public class GeoDistanceParser implements Aggregator.Parser {
|
|||
|
||||
}
|
||||
|
||||
// NORELEASE implement this method when refactoring this aggregation
|
||||
@Override
|
||||
public AggregatorFactory getFactoryPrototype() {
|
||||
return null;
|
||||
}
|
||||
|
||||
}
|
|
@ -139,4 +139,10 @@ public class IpRangeParser implements Aggregator.Parser {
|
|||
}
|
||||
}
|
||||
|
||||
// NORELEASE implement this method when refactoring this aggregation
|
||||
@Override
|
||||
public AggregatorFactory getFactoryPrototype() {
|
||||
return null;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -101,5 +101,10 @@ public class SamplerParser implements Aggregator.Parser {
|
|||
}
|
||||
}
|
||||
|
||||
// NORELEASE implement this method when refactoring this aggregation
|
||||
@Override
|
||||
public AggregatorFactory getFactoryPrototype() {
|
||||
return null;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -155,6 +155,12 @@ public class SignificantTermsAggregatorFactory extends ValuesSourceAggregatorFac
|
|||
this.filter = filter;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void doInit(AggregationContext context) {
|
||||
super.doInit(context);
|
||||
setFieldInfo();
|
||||
}
|
||||
|
||||
private void setFieldInfo() {
|
||||
if (!config.unmapped()) {
|
||||
this.indexedFieldName = config.fieldContext().field();
|
||||
|
@ -166,7 +172,6 @@ public class SignificantTermsAggregatorFactory extends ValuesSourceAggregatorFac
|
|||
protected Aggregator createUnmapped(AggregationContext aggregationContext, Aggregator parent,
|
||||
List<PipelineAggregator> pipelineAggregators,
|
||||
Map<String, Object> metaData) throws IOException {
|
||||
setFieldInfo();
|
||||
final InternalAggregation aggregation = new UnmappedSignificantTerms(name, bucketCountThresholds.getRequiredSize(),
|
||||
bucketCountThresholds.getMinDocCount(), pipelineAggregators, metaData);
|
||||
return new NonCollectingAggregator(name, aggregationContext, parent, pipelineAggregators, metaData) {
|
||||
|
@ -181,7 +186,6 @@ public class SignificantTermsAggregatorFactory extends ValuesSourceAggregatorFac
|
|||
protected Aggregator doCreateInternal(ValuesSource valuesSource, AggregationContext aggregationContext, Aggregator parent,
|
||||
boolean collectsFromSingleBucket, List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData)
|
||||
throws IOException {
|
||||
setFieldInfo();
|
||||
if (collectsFromSingleBucket == false) {
|
||||
return asMultiBucketAggregator(this, aggregationContext, parent);
|
||||
}
|
||||
|
|
|
@ -80,4 +80,10 @@ public class SignificantTermsParser implements Aggregator.Parser {
|
|||
return new SignificantTermsAggregatorFactory(aggregationName, vsParser.input(), bucketCountThresholds,
|
||||
aggParser.getIncludeExclude(), aggParser.getExecutionHint(), aggParser.getFilter(), significanceHeuristic);
|
||||
}
|
||||
|
||||
// NORELEASE implement this method when refactoring this aggregation
|
||||
@Override
|
||||
public AggregatorFactory getFactoryPrototype() {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -87,4 +87,10 @@ public class TermsParser implements Aggregator.Parser {
|
|||
return Order.aggregation(key, asc);
|
||||
}
|
||||
|
||||
// NORELEASE implement this method when refactoring this aggregation
|
||||
@Override
|
||||
public AggregatorFactory getFactoryPrototype() {
|
||||
return null;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -37,4 +37,10 @@ public class AvgParser extends NumericValuesSourceMetricsAggregatorParser<Intern
|
|||
return new AvgAggregator.Factory(aggregationName, type(), input);
|
||||
}
|
||||
|
||||
// NORELEASE implement this method when refactoring this aggregation
|
||||
@Override
|
||||
public AggregatorFactory getFactoryPrototype() {
|
||||
return null;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -76,4 +76,10 @@ public class CardinalityParser implements Aggregator.Parser {
|
|||
|
||||
}
|
||||
|
||||
// NORELEASE implement this method when refactoring this aggregation
|
||||
@Override
|
||||
public AggregatorFactory getFactoryPrototype() {
|
||||
return null;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -67,4 +67,10 @@ public class GeoBoundsParser implements Aggregator.Parser {
|
|||
return new GeoBoundsAggregator.Factory(aggregationName, vsParser.input(), wrapLongitude);
|
||||
}
|
||||
|
||||
// NORELEASE implement this method when refactoring this aggregation
|
||||
@Override
|
||||
public AggregatorFactory getFactoryPrototype() {
|
||||
return null;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -36,7 +36,7 @@ import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
|
|||
import org.elasticsearch.search.aggregations.support.AggregationContext;
|
||||
import org.elasticsearch.search.aggregations.support.ValuesSource;
|
||||
import org.elasticsearch.search.aggregations.support.ValuesSourceAggregatorFactory;
|
||||
import org.elasticsearch.search.aggregations.support.ValuesSourceConfig;
|
||||
import org.elasticsearch.search.aggregations.support.ValuesSourceParser;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
|
@ -123,7 +123,7 @@ public final class GeoCentroidAggregator extends MetricsAggregator {
|
|||
}
|
||||
|
||||
public static class Factory extends ValuesSourceAggregatorFactory.LeafOnly<ValuesSource.GeoPoint> {
|
||||
protected Factory(String name, ValuesSourceConfig<ValuesSource.GeoPoint> config) {
|
||||
protected Factory(String name, ValuesSourceParser.Input<ValuesSource.GeoPoint> config) {
|
||||
super(name, InternalGeoBounds.TYPE.name(), config);
|
||||
}
|
||||
|
||||
|
|
|
@ -58,6 +58,12 @@ public class GeoCentroidParser implements Aggregator.Parser {
|
|||
+ currentFieldName + "].", parser.getTokenLocation());
|
||||
}
|
||||
}
|
||||
return new GeoCentroidAggregator.Factory(aggregationName, vsParser.config());
|
||||
return new GeoCentroidAggregator.Factory(aggregationName, vsParser.input());
|
||||
}
|
||||
|
||||
// NORELEASE implement this method when refactoring this aggregation
|
||||
@Override
|
||||
public AggregatorFactory getFactoryPrototype() {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -37,4 +37,10 @@ public class MaxParser extends NumericValuesSourceMetricsAggregatorParser<Intern
|
|||
return new MaxAggregator.Factory(aggregationName, input);
|
||||
}
|
||||
|
||||
// NORELEASE implement this method when refactoring this aggregation
|
||||
@Override
|
||||
public AggregatorFactory getFactoryPrototype() {
|
||||
return null;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -36,4 +36,10 @@ public class MinParser extends NumericValuesSourceMetricsAggregatorParser<Intern
|
|||
protected AggregatorFactory createFactory(String aggregationName, ValuesSourceParser.Input<ValuesSource.Numeric> input) {
|
||||
return new MinAggregator.Factory(aggregationName, input);
|
||||
}
|
||||
|
||||
// NORELEASE implement this method when refactoring this aggregation
|
||||
@Override
|
||||
public AggregatorFactory getFactoryPrototype() {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -49,16 +49,16 @@ public abstract class AbstractPercentilesParser implements Aggregator.Parser {
|
|||
|
||||
@Override
|
||||
public AggregatorFactory parse(String aggregationName, XContentParser parser, SearchContext context) throws IOException {
|
||||
|
||||
|
||||
ValuesSourceParser<ValuesSource.Numeric> vsParser = ValuesSourceParser.numeric(aggregationName, InternalTDigestPercentiles.TYPE, context)
|
||||
.formattable(formattable).build();
|
||||
|
||||
|
||||
double[] keys = null;
|
||||
boolean keyed = true;
|
||||
Double compression = null;
|
||||
Integer numberOfSignificantValueDigits = null;
|
||||
PercentilesMethod method = null;
|
||||
|
||||
|
||||
XContentParser.Token token;
|
||||
String currentFieldName = null;
|
||||
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
|
||||
|
@ -102,17 +102,17 @@ public abstract class AbstractPercentilesParser implements Aggregator.Parser {
|
|||
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
|
||||
if (token == XContentParser.Token.FIELD_NAME) {
|
||||
currentFieldName = parser.currentName();
|
||||
} else if (token == XContentParser.Token.VALUE_NUMBER) {
|
||||
} else if (token == XContentParser.Token.VALUE_NUMBER) {
|
||||
if (context.parseFieldMatcher().match(currentFieldName, COMPRESSION_FIELD)) {
|
||||
compression = parser.doubleValue();
|
||||
} else {
|
||||
compression = parser.doubleValue();
|
||||
} else {
|
||||
throw new SearchParseException(context, "Unknown key for a " + token + " in [" + aggregationName
|
||||
+ "]: [" + currentFieldName + "].", parser.getTokenLocation());
|
||||
}
|
||||
} else {
|
||||
throw new SearchParseException(context, "Unknown key for a " + token + " in [" + aggregationName + "]: ["
|
||||
+ currentFieldName + "].", parser.getTokenLocation());
|
||||
}
|
||||
throw new SearchParseException(context, "Unknown key for a " + token + " in [" + aggregationName + "]: ["
|
||||
+ currentFieldName + "].", parser.getTokenLocation());
|
||||
}
|
||||
}
|
||||
break;
|
||||
case HDR:
|
||||
|
@ -122,7 +122,7 @@ public abstract class AbstractPercentilesParser implements Aggregator.Parser {
|
|||
} else if (token == XContentParser.Token.VALUE_NUMBER) {
|
||||
if (context.parseFieldMatcher().match(currentFieldName, NUMBER_SIGNIFICANT_DIGITS_FIELD)) {
|
||||
numberOfSignificantValueDigits = parser.intValue();
|
||||
} else {
|
||||
} else {
|
||||
throw new SearchParseException(context, "Unknown key for a " + token + " in [" + aggregationName
|
||||
+ "]: [" + currentFieldName + "].", parser.getTokenLocation());
|
||||
}
|
||||
|
|
|
@ -48,7 +48,7 @@ public class PercentileRanksParser extends AbstractPercentilesParser {
|
|||
protected ParseField keysField() {
|
||||
return VALUES_FIELD;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
protected AggregatorFactory buildFactory(SearchContext context, String aggregationName, ValuesSourceParser.Input<Numeric> valuesSourceInput,
|
||||
double[] keys, PercentilesMethod method, Double compression, Integer numberOfSignificantValueDigits, boolean keyed) {
|
||||
|
@ -65,4 +65,10 @@ public class PercentileRanksParser extends AbstractPercentilesParser {
|
|||
}
|
||||
}
|
||||
|
||||
// NORELEASE implement this method when refactoring this aggregation
|
||||
@Override
|
||||
public AggregatorFactory getFactoryPrototype() {
|
||||
return null;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -65,4 +65,10 @@ public class PercentilesParser extends AbstractPercentilesParser {
|
|||
}
|
||||
}
|
||||
|
||||
// NORELEASE implement this method when refactoring this aggregation
|
||||
@Override
|
||||
public AggregatorFactory getFactoryPrototype() {
|
||||
return null;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -35,7 +35,7 @@ import java.util.Map;
|
|||
import java.util.Set;
|
||||
|
||||
public class ScriptedMetricParser implements Aggregator.Parser {
|
||||
|
||||
|
||||
public static final String INIT_SCRIPT = "init_script";
|
||||
public static final String MAP_SCRIPT = "map_script";
|
||||
public static final String COMBINE_SCRIPT = "combine_script";
|
||||
|
@ -143,11 +143,17 @@ public class ScriptedMetricParser implements Aggregator.Parser {
|
|||
reduceScript = new Script(scriptValue.script(), scriptValue.scriptType(), scriptParameterParser.lang(), reduceParams);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
if (mapScript == null) {
|
||||
throw new SearchParseException(context, "map_script field is required in [" + aggregationName + "].", parser.getTokenLocation());
|
||||
}
|
||||
return new ScriptedMetricAggregator.Factory(aggregationName, initScript, mapScript, combineScript, reduceScript, params);
|
||||
}
|
||||
|
||||
// NORELEASE implement this method when refactoring this aggregation
|
||||
@Override
|
||||
public AggregatorFactory getFactoryPrototype() {
|
||||
return null;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -36,4 +36,10 @@ public class StatsParser extends NumericValuesSourceMetricsAggregatorParser<Inte
|
|||
protected AggregatorFactory createFactory(String aggregationName, ValuesSourceParser.Input<ValuesSource.Numeric> input) {
|
||||
return new StatsAggregator.Factory(aggregationName, input);
|
||||
}
|
||||
|
||||
// NORELEASE implement this method when refactoring this aggregation
|
||||
@Override
|
||||
public AggregatorFactory getFactoryPrototype() {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -79,4 +79,10 @@ public class ExtendedStatsParser implements Aggregator.Parser {
|
|||
|
||||
return createFactory(aggregationName, vsParser.input(), sigma);
|
||||
}
|
||||
|
||||
// NORELEASE implement this method when refactoring this aggregation
|
||||
@Override
|
||||
public AggregatorFactory getFactoryPrototype() {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -36,4 +36,10 @@ public class SumParser extends NumericValuesSourceMetricsAggregatorParser<Intern
|
|||
protected AggregatorFactory createFactory(String aggregationName, ValuesSourceParser.Input<ValuesSource.Numeric> input) {
|
||||
return new SumAggregator.Factory(aggregationName, input);
|
||||
}
|
||||
|
||||
// NORELEASE implement this method when refactoring this aggregation
|
||||
@Override
|
||||
public AggregatorFactory getFactoryPrototype() {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -138,4 +138,10 @@ public class TopHitsParser implements Aggregator.Parser {
|
|||
return new TopHitsAggregator.Factory(aggregationName, fetchPhase, subSearchContext);
|
||||
}
|
||||
|
||||
// NORELEASE implement this method when refactoring this aggregation
|
||||
@Override
|
||||
public AggregatorFactory getFactoryPrototype() {
|
||||
return null;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -56,4 +56,10 @@ public class ValueCountParser implements Aggregator.Parser {
|
|||
|
||||
return new ValueCountAggregator.Factory(aggregationName, vsParser.input());
|
||||
}
|
||||
|
||||
// NORELEASE implement this method when refactoring this aggregation
|
||||
@Override
|
||||
public AggregatorFactory getFactoryPrototype() {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -69,6 +69,12 @@ public abstract class PipelineAggregator implements Streamable {
|
|||
*/
|
||||
PipelineAggregatorFactory parse(String pipelineAggregatorName, XContentParser parser, SearchContext context) throws IOException;
|
||||
|
||||
/**
|
||||
* @return an empty {@link PipelineAggregatorFactory} instance for this
|
||||
* parser that can be used for deserialization
|
||||
*/
|
||||
PipelineAggregatorFactory getFactoryPrototype();
|
||||
|
||||
}
|
||||
|
||||
private String name;
|
||||
|
|
|
@ -18,17 +18,25 @@
|
|||
*/
|
||||
package org.elasticsearch.search.aggregations.pipeline;
|
||||
|
||||
import org.elasticsearch.action.support.ToXContentToBytes;
|
||||
import org.elasticsearch.common.io.stream.NamedWriteable;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.xcontent.ToXContent;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.search.aggregations.AggregatorFactory;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
|
||||
/**
|
||||
* A factory that knows how to create an {@link PipelineAggregator} of a
|
||||
* specific type.
|
||||
*/
|
||||
public abstract class PipelineAggregatorFactory {
|
||||
public abstract class PipelineAggregatorFactory extends ToXContentToBytes implements NamedWriteable<PipelineAggregatorFactory>, ToXContent {
|
||||
|
||||
protected String name;
|
||||
protected String type;
|
||||
|
@ -53,6 +61,10 @@ public abstract class PipelineAggregatorFactory {
|
|||
return name;
|
||||
}
|
||||
|
||||
public String type() {
|
||||
return type;
|
||||
}
|
||||
|
||||
/**
|
||||
* Validates the state of this factory (makes sure the factory is properly
|
||||
* configured)
|
||||
|
@ -90,4 +102,100 @@ public abstract class PipelineAggregatorFactory {
|
|||
return bucketsPaths;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeTo(StreamOutput out) throws IOException {
|
||||
out.writeString(name);
|
||||
out.writeStringArray(bucketsPaths);
|
||||
doWriteTo(out);
|
||||
out.writeMap(metaData);
|
||||
}
|
||||
|
||||
// NORELEASE make this abstract when agg refactor complete
|
||||
protected void doWriteTo(StreamOutput out) throws IOException {
|
||||
}
|
||||
|
||||
// NORELEASE remove this method when agg refactor complete
|
||||
@Override
|
||||
public String getWriteableName() {
|
||||
return type;
|
||||
}
|
||||
|
||||
@Override
|
||||
public PipelineAggregatorFactory readFrom(StreamInput in) throws IOException {
|
||||
String name = in.readString();
|
||||
String[] bucketsPaths = in.readStringArray();
|
||||
PipelineAggregatorFactory factory = doReadFrom(name, bucketsPaths, in);
|
||||
factory.metaData = in.readMap();
|
||||
return factory;
|
||||
}
|
||||
|
||||
// NORELEASE make this abstract when agg refactor complete
|
||||
protected PipelineAggregatorFactory doReadFrom(String name, String[] bucketsPaths, StreamInput in) throws IOException {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public final XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
|
||||
builder.startObject(getName());
|
||||
|
||||
if (this.metaData != null) {
|
||||
builder.field("meta", this.metaData);
|
||||
}
|
||||
builder.startObject(type);
|
||||
|
||||
if (bucketsPaths != null) {
|
||||
builder.startArray(PipelineAggregator.Parser.BUCKETS_PATH.getPreferredName());
|
||||
for (String path : bucketsPaths) {
|
||||
builder.value(path);
|
||||
}
|
||||
builder.endArray();
|
||||
}
|
||||
|
||||
internalXContent(builder, params);
|
||||
|
||||
builder.endObject();
|
||||
|
||||
return builder.endObject();
|
||||
}
|
||||
|
||||
// NORELEASE make this method abstract when agg refactor complete
|
||||
protected XContentBuilder internalXContent(XContentBuilder builder, Params params) throws IOException {
|
||||
return builder;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(Arrays.hashCode(bucketsPaths), metaData, name, type, doHashCode());
|
||||
}
|
||||
|
||||
// NORELEASE make this method abstract here when agg refactor complete (so
|
||||
// that subclasses are forced to implement it)
|
||||
protected int doHashCode() {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object obj) {
|
||||
if (obj == null)
|
||||
return false;
|
||||
if (getClass() != obj.getClass())
|
||||
return false;
|
||||
PipelineAggregatorFactory other = (PipelineAggregatorFactory) obj;
|
||||
if (!Objects.equals(name, other.name))
|
||||
return false;
|
||||
if (!Objects.equals(type, other.type))
|
||||
return false;
|
||||
if (!Objects.deepEquals(bucketsPaths, other.bucketsPaths))
|
||||
return false;
|
||||
if (!Objects.equals(metaData, other.metaData))
|
||||
return false;
|
||||
return doEquals(obj);
|
||||
}
|
||||
|
||||
// NORELEASE make this method abstract here when agg refactor complete (so
|
||||
// that subclasses are forced to implement it)
|
||||
protected boolean doEquals(Object obj) {
|
||||
return true;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -37,4 +37,10 @@ public class AvgBucketParser extends BucketMetricsParser {
|
|||
ValueFormatter formatter, Map<String, Object> unparsedParams) {
|
||||
return new AvgBucketPipelineAggregator.Factory(pipelineAggregatorName, bucketsPaths, gapPolicy, formatter);
|
||||
}
|
||||
|
||||
// NORELEASE implement this method when refactoring this aggregation
|
||||
@Override
|
||||
public PipelineAggregatorFactory getFactoryPrototype() {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -39,4 +39,10 @@ public class MaxBucketParser extends BucketMetricsParser {
|
|||
return new MaxBucketPipelineAggregator.Factory(pipelineAggregatorName, bucketsPaths, gapPolicy, formatter);
|
||||
}
|
||||
|
||||
// NORELEASE implement this method when refactoring this aggregation
|
||||
@Override
|
||||
public PipelineAggregatorFactory getFactoryPrototype() {
|
||||
return null;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -33,9 +33,16 @@ public class MinBucketParser extends BucketMetricsParser {
|
|||
return MinBucketPipelineAggregator.TYPE.name();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected PipelineAggregatorFactory buildFactory(String pipelineAggregatorName, String[] bucketsPaths, GapPolicy gapPolicy,
|
||||
ValueFormatter formatter, Map<String, Object> unparsedParams) {
|
||||
return new MinBucketPipelineAggregator.Factory(pipelineAggregatorName, bucketsPaths, gapPolicy, formatter);
|
||||
};
|
||||
}
|
||||
|
||||
// NORELEASE implement this method when refactoring this aggregation
|
||||
@Override
|
||||
public PipelineAggregatorFactory getFactoryPrototype() {
|
||||
return null;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -20,6 +20,7 @@
|
|||
package org.elasticsearch.search.aggregations.pipeline.bucketmetrics.percentile;
|
||||
|
||||
import org.elasticsearch.common.ParseField;
|
||||
import org.elasticsearch.search.aggregations.pipeline.BucketHelpers.GapPolicy;
|
||||
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorFactory;
|
||||
import org.elasticsearch.search.aggregations.pipeline.bucketmetrics.BucketMetricsParser;
|
||||
import org.elasticsearch.search.aggregations.support.format.ValueFormatter;
|
||||
|
@ -28,8 +29,6 @@ import java.text.ParseException;
|
|||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import static org.elasticsearch.search.aggregations.pipeline.BucketHelpers.GapPolicy;
|
||||
|
||||
|
||||
public class PercentilesBucketParser extends BucketMetricsParser {
|
||||
|
||||
|
@ -69,4 +68,10 @@ public class PercentilesBucketParser extends BucketMetricsParser {
|
|||
|
||||
return new PercentilesBucketPipelineAggregator.Factory(pipelineAggregatorName, bucketsPaths, gapPolicy, formatter, percents);
|
||||
}
|
||||
|
||||
// NORELEASE implement this method when refactoring this aggregation
|
||||
@Override
|
||||
public PipelineAggregatorFactory getFactoryPrototype() {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -37,4 +37,10 @@ public class StatsBucketParser extends BucketMetricsParser {
|
|||
ValueFormatter formatter, Map<String, Object> unparsedParams) {
|
||||
return new StatsBucketPipelineAggregator.Factory(pipelineAggregatorName, bucketsPaths, gapPolicy, formatter);
|
||||
}
|
||||
|
||||
// NORELEASE implement this method when refactoring this aggregation
|
||||
@Override
|
||||
public PipelineAggregatorFactory getFactoryPrototype() {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -54,4 +54,10 @@ public class ExtendedStatsBucketParser extends BucketMetricsParser {
|
|||
}
|
||||
return new ExtendedStatsBucketPipelineAggregator.Factory(pipelineAggregatorName, bucketsPaths, sigma, gapPolicy, formatter);
|
||||
}
|
||||
|
||||
// NORELEASE implement this method when refactoring this aggregation
|
||||
@Override
|
||||
public PipelineAggregatorFactory getFactoryPrototype() {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -37,4 +37,10 @@ public class SumBucketParser extends BucketMetricsParser {
|
|||
ValueFormatter formatter, Map<String, Object> unparsedParams) {
|
||||
return new SumBucketPipelineAggregator.Factory(pipelineAggregatorName, bucketsPaths, gapPolicy, formatter);
|
||||
}
|
||||
|
||||
// NORELEASE implement this method when refactoring this aggregation
|
||||
@Override
|
||||
public PipelineAggregatorFactory getFactoryPrototype() {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -128,4 +128,10 @@ public class BucketScriptParser implements PipelineAggregator.Parser {
|
|||
return new BucketScriptPipelineAggregator.Factory(reducerName, bucketsPathsMap, script, formatter, gapPolicy);
|
||||
}
|
||||
|
||||
// NORELEASE implement this method when refactoring this aggregation
|
||||
@Override
|
||||
public PipelineAggregatorFactory getFactoryPrototype() {
|
||||
return null;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -95,4 +95,10 @@ public class CumulativeSumParser implements PipelineAggregator.Parser {
|
|||
return new CumulativeSumPipelineAggregator.Factory(pipelineAggregatorName, bucketsPaths, formatter);
|
||||
}
|
||||
|
||||
// NORELEASE implement this method when refactoring this aggregation
|
||||
@Override
|
||||
public PipelineAggregatorFactory getFactoryPrototype() {
|
||||
return null;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -118,4 +118,10 @@ public class DerivativeParser implements PipelineAggregator.Parser {
|
|||
return new DerivativePipelineAggregator.Factory(pipelineAggregatorName, bucketsPaths, formatter, gapPolicy, xAxisUnits);
|
||||
}
|
||||
|
||||
// NORELEASE implement this method when refactoring this aggregation
|
||||
@Override
|
||||
public PipelineAggregatorFactory getFactoryPrototype() {
|
||||
return null;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -116,4 +116,10 @@ public class BucketSelectorParser implements PipelineAggregator.Parser {
|
|||
return new BucketSelectorPipelineAggregator.Factory(reducerName, bucketsPathsMap, script, gapPolicy);
|
||||
}
|
||||
|
||||
// NORELEASE implement this method when refactoring this aggregation
|
||||
@Override
|
||||
public PipelineAggregatorFactory getFactoryPrototype() {
|
||||
return null;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -177,5 +177,10 @@ public class MovAvgParser implements PipelineAggregator.Parser {
|
|||
movAvgModel, minimize);
|
||||
}
|
||||
|
||||
// NORELEASE implement this method when refactoring this aggregation
|
||||
@Override
|
||||
public PipelineAggregatorFactory getFactoryPrototype() {
|
||||
return null;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -22,6 +22,7 @@ package org.elasticsearch.search.aggregations.pipeline.serialdiff;
|
|||
import org.elasticsearch.common.ParseField;
|
||||
import org.elasticsearch.common.xcontent.XContentParser;
|
||||
import org.elasticsearch.search.SearchParseException;
|
||||
import org.elasticsearch.search.aggregations.pipeline.BucketHelpers.GapPolicy;
|
||||
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
|
||||
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorFactory;
|
||||
import org.elasticsearch.search.aggregations.support.format.ValueFormat;
|
||||
|
@ -32,8 +33,6 @@ import java.io.IOException;
|
|||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
import static org.elasticsearch.search.aggregations.pipeline.BucketHelpers.GapPolicy;
|
||||
|
||||
public class SerialDiffParser implements PipelineAggregator.Parser {
|
||||
|
||||
public static final ParseField FORMAT = new ParseField("format");
|
||||
|
@ -113,4 +112,10 @@ public class SerialDiffParser implements PipelineAggregator.Parser {
|
|||
return new SerialDiffPipelineAggregator.Factory(reducerName, bucketsPaths, formatter, gapPolicy, lag);
|
||||
}
|
||||
|
||||
// NORELEASE implement this method when refactoring this aggregation
|
||||
@Override
|
||||
public PipelineAggregatorFactory getFactoryPrototype() {
|
||||
return null;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -19,62 +19,64 @@
|
|||
|
||||
package org.elasticsearch.search.aggregations.support;
|
||||
|
||||
import org.elasticsearch.common.ParseField;
|
||||
import org.elasticsearch.common.ParseFieldMatcher;
|
||||
import org.elasticsearch.common.xcontent.XContentParser;
|
||||
import org.elasticsearch.script.Script;
|
||||
import org.elasticsearch.script.Script.ScriptField;
|
||||
import org.elasticsearch.script.ScriptParameterParser;
|
||||
import org.elasticsearch.script.ScriptParameterParser.ScriptParameterValue;
|
||||
import org.elasticsearch.search.SearchParseException;
|
||||
import org.elasticsearch.search.aggregations.Aggregator;
|
||||
import org.elasticsearch.search.aggregations.AggregatorFactory;
|
||||
import org.elasticsearch.search.internal.SearchContext;
|
||||
import org.joda.time.DateTimeZone;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
import static com.google.common.collect.Maps.newHashMap;
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
public abstract class AbstractValuesSourceParser<VS extends ValuesSource> implements Aggregator.Parser {
|
||||
static final ParseField TIME_ZONE = new ParseField("time_zone");
|
||||
|
||||
public abstract static class AnyValuesSourceParser extends AbstractValuesSourceParser<ValuesSource> {
|
||||
|
||||
protected AnyValuesSourceParser(boolean scriptable, boolean formattable) {
|
||||
super(scriptable, formattable, ValuesSource.class, null);
|
||||
super(scriptable, formattable, false, ValuesSourceType.ANY, null);
|
||||
}
|
||||
}
|
||||
|
||||
public abstract static class NumericValuesSourceParser extends AbstractValuesSourceParser<ValuesSource.Numeric> {
|
||||
|
||||
protected NumericValuesSourceParser(boolean scriptable, boolean formattable) {
|
||||
super(scriptable, formattable, ValuesSource.Numeric.class, ValueType.NUMERIC);
|
||||
protected NumericValuesSourceParser(boolean scriptable, boolean formattable, boolean timezoneAware) {
|
||||
super(scriptable, formattable, timezoneAware, ValuesSourceType.NUMERIC, ValueType.NUMERIC);
|
||||
}
|
||||
}
|
||||
|
||||
public abstract static class BytesValuesSourceParser extends AbstractValuesSourceParser<ValuesSource.Bytes> {
|
||||
|
||||
protected BytesValuesSourceParser(boolean scriptable, boolean formattable) {
|
||||
super(scriptable, formattable, ValuesSource.Bytes.class, ValueType.STRING);
|
||||
super(scriptable, formattable, false, ValuesSourceType.BYTES, ValueType.STRING);
|
||||
}
|
||||
}
|
||||
|
||||
public abstract static class GeoPointValuesSourceParser extends AbstractValuesSourceParser<ValuesSource.GeoPoint> {
|
||||
|
||||
protected GeoPointValuesSourceParser(boolean scriptable, boolean formattable) {
|
||||
super(scriptable, formattable, ValuesSource.GeoPoint.class, ValueType.GEOPOINT);
|
||||
super(scriptable, formattable, false, ValuesSourceType.GEOPOINT, ValueType.GEOPOINT);
|
||||
}
|
||||
}
|
||||
|
||||
private boolean scriptable = true;
|
||||
private boolean formattable = false;
|
||||
private Class<VS> valuesSourceType = null;
|
||||
private boolean timezoneAware = false;
|
||||
private ValuesSourceType valuesSourceType = null;
|
||||
private ValueType targetValueType = null;
|
||||
private ScriptParameterParser scriptParameterParser = new ScriptParameterParser();
|
||||
|
||||
private AbstractValuesSourceParser(boolean scriptable,
|
||||
boolean formattable, Class<VS> valuesSourceType, ValueType targetValueType) {
|
||||
private AbstractValuesSourceParser(boolean scriptable, boolean formattable, boolean timezoneAware, ValuesSourceType valuesSourceType,
|
||||
ValueType targetValueType) {
|
||||
this.timezoneAware = timezoneAware;
|
||||
this.valuesSourceType = valuesSourceType;
|
||||
this.targetValueType = targetValueType;
|
||||
this.scriptable = scriptable;
|
||||
|
@ -82,15 +84,15 @@ public abstract class AbstractValuesSourceParser<VS extends ValuesSource> implem
|
|||
}
|
||||
|
||||
@Override
|
||||
public AggregatorFactory parse(String aggregationName, XContentParser parser, SearchContext context) throws IOException {
|
||||
public final AggregatorFactory parse(String aggregationName, XContentParser parser, SearchContext context) throws IOException {
|
||||
|
||||
String field = null;
|
||||
Script script = null;
|
||||
@Deprecated
|
||||
Map<String, Object> params = null; // TODO Remove in 3.0
|
||||
ValueType valueType = null;
|
||||
String format = null;
|
||||
Object missing = null;
|
||||
DateTimeZone timezone = null;
|
||||
Map<ParseField, Object> otherOptions = new HashMap<>();
|
||||
|
||||
XContentParser.Token token;
|
||||
String currentFieldName = null;
|
||||
|
@ -99,6 +101,15 @@ public abstract class AbstractValuesSourceParser<VS extends ValuesSource> implem
|
|||
currentFieldName = parser.currentName();
|
||||
} else if ("missing".equals(currentFieldName) && token.isValue()) {
|
||||
missing = parser.objectText();
|
||||
} else if (timezoneAware && context.parseFieldMatcher().match(currentFieldName, TIME_ZONE)) {
|
||||
if (token == XContentParser.Token.VALUE_STRING) {
|
||||
timezone = DateTimeZone.forID(parser.text());
|
||||
} else if (token == XContentParser.Token.VALUE_NUMBER) {
|
||||
timezone = DateTimeZone.forOffsetHours(parser.intValue());
|
||||
} else {
|
||||
throw new SearchParseException(context, "Unexpected token " + token + " [" + currentFieldName + "] in ["
|
||||
+ aggregationName + "].", parser.getTokenLocation());
|
||||
}
|
||||
} else if (token == XContentParser.Token.VALUE_STRING) {
|
||||
if ("field".equals(currentFieldName)) {
|
||||
field = parser.text();
|
||||
|
@ -110,54 +121,43 @@ public abstract class AbstractValuesSourceParser<VS extends ValuesSource> implem
|
|||
if (targetValueType != null && valueType.isNotA(targetValueType)) {
|
||||
throw new SearchParseException(context, type() + " aggregation [" + aggregationName
|
||||
+ "] was configured with an incompatible value type [" + valueType + "]. [" + type()
|
||||
+ "] aggregation can only work on value of type [" + targetValueType + "]",
|
||||
parser.getTokenLocation());
|
||||
+ "] aggregation can only work on value of type [" + targetValueType + "]", parser.getTokenLocation());
|
||||
}
|
||||
} else if (!scriptParameterParser.token(currentFieldName, token, parser, context.parseFieldMatcher())) {
|
||||
throw new SearchParseException(context, "Unexpected token " + token + " in [" + aggregationName + "].",
|
||||
parser.getTokenLocation());
|
||||
} else if (!token(aggregationName, currentFieldName, token, parser, context.parseFieldMatcher(), otherOptions)) {
|
||||
throw new SearchParseException(context, "Unexpected token " + token + " [" + currentFieldName + "] in ["
|
||||
+ aggregationName + "].", parser.getTokenLocation());
|
||||
}
|
||||
} else {
|
||||
throw new SearchParseException(context, "Unexpected token " + token + " in [" + aggregationName + "].",
|
||||
parser.getTokenLocation());
|
||||
} else if (!token(aggregationName, currentFieldName, token, parser, context.parseFieldMatcher(), otherOptions)) {
|
||||
throw new SearchParseException(context, "Unexpected token " + token + " [" + currentFieldName + "] in ["
|
||||
+ aggregationName + "].", parser.getTokenLocation());
|
||||
}
|
||||
} else if (scriptable && token == XContentParser.Token.START_OBJECT) {
|
||||
if (context.parseFieldMatcher().match(currentFieldName, ScriptField.SCRIPT)) {
|
||||
script = Script.parse(parser, context.parseFieldMatcher());
|
||||
} else if ("params".equals(currentFieldName)) {
|
||||
params = parser.map();
|
||||
} else {
|
||||
throw new SearchParseException(context, "Unexpected token " + token + " in [" + aggregationName + "].",
|
||||
parser.getTokenLocation());
|
||||
} else if (!token(aggregationName, currentFieldName, token, parser, context.parseFieldMatcher(), otherOptions)) {
|
||||
throw new SearchParseException(context, "Unexpected token " + token + " [" + currentFieldName + "] in ["
|
||||
+ aggregationName + "].", parser.getTokenLocation());
|
||||
}
|
||||
} else if (!token(currentFieldName, token, parser)) {
|
||||
throw new SearchParseException(context, "Unexpected token " + token + " in [" + aggregationName + "].",
|
||||
parser.getTokenLocation());
|
||||
} else if (!token(aggregationName, currentFieldName, token, parser, context.parseFieldMatcher(), otherOptions)) {
|
||||
throw new SearchParseException(context, "Unexpected token " + token + " [" + currentFieldName + "] in [" + aggregationName
|
||||
+ "].", parser.getTokenLocation());
|
||||
}
|
||||
}
|
||||
|
||||
if (script == null) { // Didn't find anything using the new API so
|
||||
// try using the old one instead
|
||||
ScriptParameterValue scriptValue = scriptParameterParser.getDefaultScriptParameterValue();
|
||||
if (scriptValue != null) {
|
||||
if (params == null) {
|
||||
params = newHashMap();
|
||||
}
|
||||
script = new Script(scriptValue.script(), scriptValue.scriptType(), scriptParameterParser.lang(), params);
|
||||
}
|
||||
}
|
||||
|
||||
ValuesSourceAggregatorFactory<VS> factory = createFactory(aggregationName, this.valuesSourceType, this.targetValueType);
|
||||
ValuesSourceAggregatorFactory<VS> factory = createFactory(aggregationName, this.valuesSourceType, this.targetValueType,
|
||||
otherOptions);
|
||||
factory.field(field);
|
||||
factory.script(script);
|
||||
factory.valueType(valueType);
|
||||
factory.format(format);
|
||||
factory.missing(missing);
|
||||
factory.timeZone(timezone);
|
||||
return factory;
|
||||
}
|
||||
|
||||
protected abstract ValuesSourceAggregatorFactory<VS> createFactory(String aggregationName, Class<VS> valuesSourceType,
|
||||
ValueType targetValueType);
|
||||
protected abstract ValuesSourceAggregatorFactory<VS> createFactory(String aggregationName, ValuesSourceType valuesSourceType,
|
||||
ValueType targetValueType, Map<ParseField, Object> otherOptions);
|
||||
|
||||
protected abstract boolean token(String currentFieldName, XContentParser.Token token, XContentParser parser) throws IOException;
|
||||
protected abstract boolean token(String aggregationName, String currentFieldName, XContentParser.Token token, XContentParser parser,
|
||||
ParseFieldMatcher parseFieldMatcher, Map<ParseField, Object> otherOptions) throws IOException;
|
||||
}
|
||||
|
|
|
@ -70,13 +70,11 @@ public class AggregationContext {
|
|||
if (config.missing == null) {
|
||||
// otherwise we will have values because of the missing value
|
||||
vs = null;
|
||||
} else if (ValuesSource.Numeric.class.isAssignableFrom(config.valueSourceType)) {
|
||||
} else if (config.valueSourceType == ValuesSourceType.NUMERIC) {
|
||||
vs = (VS) ValuesSource.Numeric.EMPTY;
|
||||
} else if (ValuesSource.GeoPoint.class.isAssignableFrom(config.valueSourceType)) {
|
||||
} else if (config.valueSourceType == ValuesSourceType.GEOPOINT) {
|
||||
vs = (VS) ValuesSource.GeoPoint.EMPTY;
|
||||
} else if (ValuesSource.class.isAssignableFrom(config.valueSourceType)
|
||||
|| ValuesSource.Bytes.class.isAssignableFrom(config.valueSourceType)
|
||||
|| ValuesSource.Bytes.WithOrdinals.class.isAssignableFrom(config.valueSourceType)) {
|
||||
} else if (config.valueSourceType == ValuesSourceType.ANY || config.valueSourceType == ValuesSourceType.BYTES) {
|
||||
vs = (VS) ValuesSource.Bytes.EMPTY;
|
||||
} else {
|
||||
throw new SearchParseException(searchContext, "Can't deal with unmapped ValuesSource type " + config.valueSourceType, null);
|
||||
|
@ -132,19 +130,20 @@ public class AggregationContext {
|
|||
*/
|
||||
private <VS extends ValuesSource> VS originalValuesSource(ValuesSourceConfig<VS> config) throws IOException {
|
||||
if (config.fieldContext == null) {
|
||||
if (ValuesSource.Numeric.class.isAssignableFrom(config.valueSourceType)) {
|
||||
if (config.valueSourceType == ValuesSourceType.NUMERIC) {
|
||||
return (VS) numericScript(config);
|
||||
}
|
||||
if (ValuesSource.Bytes.class.isAssignableFrom(config.valueSourceType)) {
|
||||
if (config.valueSourceType == ValuesSourceType.BYTES) {
|
||||
return (VS) bytesScript(config);
|
||||
}
|
||||
throw new AggregationExecutionException("value source of type [" + config.valueSourceType.getSimpleName() + "] is not supported by scripts");
|
||||
throw new AggregationExecutionException("value source of type [" + config.valueSourceType.name()
|
||||
+ "] is not supported by scripts");
|
||||
}
|
||||
|
||||
if (ValuesSource.Numeric.class.isAssignableFrom(config.valueSourceType)) {
|
||||
if (config.valueSourceType == ValuesSourceType.NUMERIC) {
|
||||
return (VS) numericField(config);
|
||||
}
|
||||
if (ValuesSource.GeoPoint.class.isAssignableFrom(config.valueSourceType)) {
|
||||
if (config.valueSourceType == ValuesSourceType.GEOPOINT) {
|
||||
return (VS) geoPointField(config);
|
||||
}
|
||||
// falling back to bytes values
|
||||
|
|
|
@ -19,26 +19,33 @@
|
|||
|
||||
package org.elasticsearch.search.aggregations.support;
|
||||
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.io.stream.Writeable;
|
||||
import org.elasticsearch.index.fielddata.IndexFieldData;
|
||||
import org.elasticsearch.index.fielddata.IndexGeoPointFieldData;
|
||||
import org.elasticsearch.index.fielddata.IndexNumericFieldData;
|
||||
import org.elasticsearch.search.aggregations.support.format.ValueFormat;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
public enum ValueType {
|
||||
public enum ValueType implements Writeable<ValueType> {
|
||||
|
||||
@Deprecated
|
||||
ANY("any", ValuesSource.class, IndexFieldData.class, ValueFormat.RAW), STRING("string", ValuesSource.Bytes.class, IndexFieldData.class,
|
||||
ANY((byte) 0, "any", ValuesSourceType.ANY, IndexFieldData.class, ValueFormat.RAW), STRING((byte) 1, "string", ValuesSourceType.BYTES,
|
||||
IndexFieldData.class,
|
||||
ValueFormat.RAW),
|
||||
LONG("byte|short|integer|long", ValuesSource.Numeric.class, IndexNumericFieldData.class, ValueFormat.RAW) {
|
||||
LONG((byte) 2, "byte|short|integer|long", ValuesSourceType.NUMERIC,
|
||||
IndexNumericFieldData.class, ValueFormat.RAW) {
|
||||
@Override
|
||||
public boolean isNumeric() {
|
||||
return true;
|
||||
}
|
||||
},
|
||||
DOUBLE("float|double", ValuesSource.Numeric.class, IndexNumericFieldData.class, ValueFormat.RAW) {
|
||||
DOUBLE((byte) 3, "float|double", ValuesSourceType.NUMERIC, IndexNumericFieldData.class, ValueFormat.RAW) {
|
||||
@Override
|
||||
public boolean isNumeric() {
|
||||
return true;
|
||||
|
@ -49,31 +56,31 @@ public enum ValueType {
|
|||
return true;
|
||||
}
|
||||
},
|
||||
NUMBER("number", ValuesSource.Numeric.class, IndexNumericFieldData.class, ValueFormat.RAW) {
|
||||
NUMBER((byte) 4, "number", ValuesSourceType.NUMERIC, IndexNumericFieldData.class, ValueFormat.RAW) {
|
||||
@Override
|
||||
public boolean isNumeric() {
|
||||
return true;
|
||||
}
|
||||
},
|
||||
DATE("date", ValuesSource.Numeric.class, IndexNumericFieldData.class, ValueFormat.DateTime.DEFAULT) {
|
||||
DATE((byte) 5, "date", ValuesSourceType.NUMERIC, IndexNumericFieldData.class, ValueFormat.DateTime.DEFAULT) {
|
||||
@Override
|
||||
public boolean isNumeric() {
|
||||
return true;
|
||||
}
|
||||
},
|
||||
IP("ip", ValuesSource.Numeric.class, IndexNumericFieldData.class, ValueFormat.IPv4) {
|
||||
IP((byte) 6, "ip", ValuesSourceType.NUMERIC, IndexNumericFieldData.class, ValueFormat.IPv4) {
|
||||
@Override
|
||||
public boolean isNumeric() {
|
||||
return true;
|
||||
}
|
||||
},
|
||||
NUMERIC("numeric", ValuesSource.Numeric.class, IndexNumericFieldData.class, ValueFormat.RAW) {
|
||||
NUMERIC((byte) 7, "numeric", ValuesSourceType.NUMERIC, IndexNumericFieldData.class, ValueFormat.RAW) {
|
||||
@Override
|
||||
public boolean isNumeric() {
|
||||
return true;
|
||||
}
|
||||
},
|
||||
GEOPOINT("geo_point", ValuesSource.GeoPoint.class, IndexGeoPointFieldData.class, ValueFormat.RAW) {
|
||||
GEOPOINT((byte) 8, "geo_point", ValuesSourceType.GEOPOINT, IndexGeoPointFieldData.class, ValueFormat.RAW) {
|
||||
@Override
|
||||
public boolean isGeoPoint() {
|
||||
return true;
|
||||
|
@ -81,11 +88,14 @@ public enum ValueType {
|
|||
};
|
||||
|
||||
final String description;
|
||||
final Class<? extends ValuesSource> valuesSourceType;
|
||||
final ValuesSourceType valuesSourceType;
|
||||
final Class<? extends IndexFieldData> fieldDataType;
|
||||
final ValueFormat defaultFormat;
|
||||
private final byte id;
|
||||
|
||||
private ValueType(String description, Class<? extends ValuesSource> valuesSourceType, Class<? extends IndexFieldData> fieldDataType, ValueFormat defaultFormat) {
|
||||
private ValueType(byte id, String description, ValuesSourceType valuesSourceType, Class<? extends IndexFieldData> fieldDataType,
|
||||
ValueFormat defaultFormat) {
|
||||
this.id = id;
|
||||
this.description = description;
|
||||
this.valuesSourceType = valuesSourceType;
|
||||
this.fieldDataType = fieldDataType;
|
||||
|
@ -96,7 +106,7 @@ public enum ValueType {
|
|||
return description;
|
||||
}
|
||||
|
||||
public Class<? extends ValuesSource> getValuesSourceType() {
|
||||
public ValuesSourceType getValuesSourceType() {
|
||||
return valuesSourceType;
|
||||
}
|
||||
|
||||
|
@ -105,7 +115,7 @@ public enum ValueType {
|
|||
}
|
||||
|
||||
public boolean isA(ValueType valueType) {
|
||||
return valueType.valuesSourceType.isAssignableFrom(valuesSourceType) &&
|
||||
return valueType.valuesSourceType == valuesSourceType &&
|
||||
valueType.fieldDataType.isAssignableFrom(fieldDataType);
|
||||
}
|
||||
|
||||
|
@ -149,4 +159,20 @@ public enum ValueType {
|
|||
public String toString() {
|
||||
return description;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ValueType readFrom(StreamInput in) throws IOException {
|
||||
byte id = in.readByte();
|
||||
for (ValueType valueType : values()) {
|
||||
if (id == valueType.id) {
|
||||
return valueType;
|
||||
}
|
||||
}
|
||||
throw new IOException("No valueType found for id [" + id + "]");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeTo(StreamOutput out) throws IOException {
|
||||
out.writeByte(id);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -19,6 +19,9 @@
|
|||
package org.elasticsearch.search.aggregations.support;
|
||||
|
||||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.index.fielddata.IndexFieldData;
|
||||
import org.elasticsearch.index.fielddata.IndexGeoPointFieldData;
|
||||
import org.elasticsearch.index.fielddata.IndexNumericFieldData;
|
||||
|
@ -43,6 +46,7 @@ import org.joda.time.DateTimeZone;
|
|||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
|
||||
/**
|
||||
*
|
||||
|
@ -55,7 +59,7 @@ public abstract class ValuesSourceAggregatorFactory<VS extends ValuesSource> ext
|
|||
super(name, type, input);
|
||||
}
|
||||
|
||||
protected LeafOnly(String name, String type, Class<VS> valuesSourceType, ValueType targetValueType) {
|
||||
protected LeafOnly(String name, String type, ValuesSourceType valuesSourceType, ValueType targetValueType) {
|
||||
super(name, type, valuesSourceType, targetValueType);
|
||||
}
|
||||
|
||||
|
@ -65,15 +69,15 @@ public abstract class ValuesSourceAggregatorFactory<VS extends ValuesSource> ext
|
|||
}
|
||||
}
|
||||
|
||||
private final Class<VS> valuesSourceType;
|
||||
private final ValuesSourceType valuesSourceType;
|
||||
private final ValueType targetValueType;
|
||||
private String field = null;
|
||||
private Script script = null;
|
||||
private ValueType valueType = null;
|
||||
private String format = null;
|
||||
private Object missing = null;
|
||||
protected ValuesSourceConfig<VS> config;
|
||||
private DateTimeZone timeZone;
|
||||
protected ValuesSourceConfig<VS> config;
|
||||
|
||||
// NORELEASE remove this method when aggs refactoring complete
|
||||
/**
|
||||
|
@ -94,36 +98,98 @@ public abstract class ValuesSourceAggregatorFactory<VS extends ValuesSource> ext
|
|||
this.timeZone = input.timezone;
|
||||
}
|
||||
|
||||
protected ValuesSourceAggregatorFactory(String name, String type, Class<VS> valuesSourceType, ValueType targetValueType) {
|
||||
protected ValuesSourceAggregatorFactory(String name, String type, ValuesSourceType valuesSourceType, ValueType targetValueType) {
|
||||
super(name, type);
|
||||
this.valuesSourceType = valuesSourceType;
|
||||
this.targetValueType = targetValueType;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the field to use for this aggregation.
|
||||
*/
|
||||
public void field(String field) {
|
||||
this.field = field;
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets the field to use for this aggregation.
|
||||
*/
|
||||
public String field() {
|
||||
return field;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the script to use for this aggregation.
|
||||
*/
|
||||
public void script(Script script) {
|
||||
this.script = script;
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets the script to use for this aggregation.
|
||||
*/
|
||||
public Script script() {
|
||||
return script;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the {@link ValueType} for the value produced by this aggregation
|
||||
*/
|
||||
public void valueType(ValueType valueType) {
|
||||
this.valueType = valueType;
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets the {@link ValueType} for the value produced by this aggregation
|
||||
*/
|
||||
public ValueType valueType() {
|
||||
return valueType;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the format to use for the output of the aggregation.
|
||||
*/
|
||||
public void format(String format) {
|
||||
this.format = format;
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets the format to use for the output of the aggregation.
|
||||
*/
|
||||
public String format() {
|
||||
return format;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the value to use when the aggregation finds a missing value in a
|
||||
* document
|
||||
*/
|
||||
public void missing(Object missing) {
|
||||
this.missing = missing;
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets the value to use when the aggregation finds a missing value in a
|
||||
* document
|
||||
*/
|
||||
public Object missing() {
|
||||
return missing;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the time zone to use for this aggregation
|
||||
*/
|
||||
public void timeZone(DateTimeZone timeZone) {
|
||||
this.timeZone = timeZone;
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets the time zone to use for this aggregation
|
||||
*/
|
||||
public DateTimeZone timeZone() {
|
||||
return timeZone;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void doInit(AggregationContext context) {
|
||||
this.config = config(context);
|
||||
|
@ -153,17 +219,17 @@ public abstract class ValuesSourceAggregatorFactory<VS extends ValuesSource> ext
|
|||
|
||||
if (field == null) {
|
||||
if (script == null) {
|
||||
ValuesSourceConfig<VS> config = new ValuesSourceConfig(ValuesSource.class);
|
||||
ValuesSourceConfig<VS> config = new ValuesSourceConfig(ValuesSourceType.ANY);
|
||||
config.format = resolveFormat(null, valueType);
|
||||
return config;
|
||||
}
|
||||
Class valuesSourceType = valueType != null ? (Class<VS>) valueType.getValuesSourceType() : this.valuesSourceType;
|
||||
if (valuesSourceType == null || valuesSourceType == ValuesSource.class) {
|
||||
ValuesSourceType valuesSourceType = valueType != null ? valueType.getValuesSourceType() : this.valuesSourceType;
|
||||
if (valuesSourceType == null || valuesSourceType == ValuesSourceType.ANY) {
|
||||
// the specific value source type is undefined, but for scripts,
|
||||
// we need to have a specific value source
|
||||
// type to know how to handle the script values, so we fallback
|
||||
// on Bytes
|
||||
valuesSourceType = ValuesSource.Bytes.class;
|
||||
valuesSourceType = ValuesSourceType.BYTES;
|
||||
}
|
||||
ValuesSourceConfig<VS> config = new ValuesSourceConfig<VS>(valuesSourceType);
|
||||
config.missing = missing;
|
||||
|
@ -175,7 +241,7 @@ public abstract class ValuesSourceAggregatorFactory<VS extends ValuesSource> ext
|
|||
|
||||
MappedFieldType fieldType = context.searchContext().smartNameFieldTypeFromAnyType(field);
|
||||
if (fieldType == null) {
|
||||
Class<VS> valuesSourceType = valueType != null ? (Class<VS>) valueType.getValuesSourceType() : this.valuesSourceType;
|
||||
ValuesSourceType valuesSourceType = valueType != null ? valueType.getValuesSourceType() : this.valuesSourceType;
|
||||
ValuesSourceConfig<VS> config = new ValuesSourceConfig<>(valuesSourceType);
|
||||
config.missing = missing;
|
||||
config.format = resolveFormat(format, valueType);
|
||||
|
@ -190,13 +256,13 @@ public abstract class ValuesSourceAggregatorFactory<VS extends ValuesSource> ext
|
|||
IndexFieldData<?> indexFieldData = context.searchContext().fieldData().getForField(fieldType);
|
||||
|
||||
ValuesSourceConfig config;
|
||||
if (valuesSourceType == ValuesSource.class) {
|
||||
if (valuesSourceType == ValuesSourceType.ANY) {
|
||||
if (indexFieldData instanceof IndexNumericFieldData) {
|
||||
config = new ValuesSourceConfig<>(ValuesSource.Numeric.class);
|
||||
config = new ValuesSourceConfig<>(ValuesSourceType.NUMERIC);
|
||||
} else if (indexFieldData instanceof IndexGeoPointFieldData) {
|
||||
config = new ValuesSourceConfig<>(ValuesSource.GeoPoint.class);
|
||||
config = new ValuesSourceConfig<>(ValuesSourceType.GEOPOINT);
|
||||
} else {
|
||||
config = new ValuesSourceConfig<>(ValuesSource.Bytes.class);
|
||||
config = new ValuesSourceConfig<>(ValuesSourceType.BYTES);
|
||||
}
|
||||
} else {
|
||||
config = new ValuesSourceConfig(valuesSourceType);
|
||||
|
@ -248,13 +314,14 @@ public abstract class ValuesSourceAggregatorFactory<VS extends ValuesSource> ext
|
|||
boolean collectsFromSingleBucket, List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData)
|
||||
throws IOException;
|
||||
|
||||
private void resolveValuesSourceConfigFromAncestors(String aggName, AggregatorFactory parent, Class<VS> requiredValuesSourceType) {
|
||||
private void resolveValuesSourceConfigFromAncestors(String aggName, AggregatorFactory parent, ValuesSourceType requiredValuesSourceType) {
|
||||
ValuesSourceConfig config;
|
||||
while (parent != null) {
|
||||
if (parent instanceof ValuesSourceAggregatorFactory) {
|
||||
config = ((ValuesSourceAggregatorFactory) parent).config;
|
||||
if (config != null && config.valid()) {
|
||||
if (requiredValuesSourceType == null || requiredValuesSourceType.isAssignableFrom(config.valueSourceType)) {
|
||||
if (requiredValuesSourceType == null || requiredValuesSourceType == ValuesSourceType.ANY
|
||||
|| requiredValuesSourceType == config.valueSourceType) {
|
||||
ValueFormat format = config.format;
|
||||
this.config = config;
|
||||
// if the user explicitly defined a format pattern,
|
||||
|
@ -271,4 +338,136 @@ public abstract class ValuesSourceAggregatorFactory<VS extends ValuesSource> ext
|
|||
}
|
||||
throw new AggregationExecutionException("could not find the appropriate value context to perform aggregation [" + aggName + "]");
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void doWriteTo(StreamOutput out) throws IOException {
|
||||
valuesSourceType.writeTo(out);
|
||||
boolean hasTargetValueType = targetValueType != null;
|
||||
out.writeBoolean(hasTargetValueType);
|
||||
if (hasTargetValueType) {
|
||||
targetValueType.writeTo(out);
|
||||
}
|
||||
innerWriteTo(out);
|
||||
out.writeOptionalString(field);
|
||||
boolean hasScript = script != null;
|
||||
out.writeBoolean(hasScript);
|
||||
if (hasScript) {
|
||||
script.writeTo(out);
|
||||
}
|
||||
boolean hasValueType = valueType != null;
|
||||
out.writeBoolean(hasValueType);
|
||||
if (hasValueType) {
|
||||
valueType.writeTo(out);
|
||||
}
|
||||
out.writeOptionalString(format);
|
||||
out.writeGenericValue(missing);
|
||||
boolean hasTimeZone = timeZone != null;
|
||||
out.writeBoolean(hasTimeZone);
|
||||
if (hasTimeZone) {
|
||||
out.writeString(timeZone.getID());
|
||||
}
|
||||
}
|
||||
|
||||
// NORELEASE make this abstract when agg refactor complete
|
||||
protected void innerWriteTo(StreamOutput out) throws IOException {
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ValuesSourceAggregatorFactory<VS> doReadFrom(String name, StreamInput in) throws IOException {
|
||||
ValuesSourceType valuesSourceType = ValuesSourceType.ANY.readFrom(in);
|
||||
ValueType targetValueType = null;
|
||||
if (in.readBoolean()) {
|
||||
targetValueType = ValueType.STRING.readFrom(in);
|
||||
}
|
||||
ValuesSourceAggregatorFactory<VS> factory = innerReadFrom(name, valuesSourceType, targetValueType, in);
|
||||
factory.field = in.readOptionalString();
|
||||
if (in.readBoolean()) {
|
||||
factory.script = Script.readScript(in);
|
||||
}
|
||||
if (in.readBoolean()) {
|
||||
factory.valueType = ValueType.STRING.readFrom(in);
|
||||
}
|
||||
factory.format = in.readOptionalString();
|
||||
factory.missing = in.readGenericValue();
|
||||
if (in.readBoolean()) {
|
||||
factory.timeZone = DateTimeZone.forID(in.readString());
|
||||
}
|
||||
return factory;
|
||||
}
|
||||
|
||||
// NORELEASE make this abstract when agg refactor complete
|
||||
protected ValuesSourceAggregatorFactory<VS> innerReadFrom(String name, ValuesSourceType valuesSourceType, ValueType targetValueType,
|
||||
StreamInput in) throws IOException {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected final XContentBuilder internalXContent(XContentBuilder builder, Params params) throws IOException {
|
||||
builder.startObject();
|
||||
if (field != null) {
|
||||
builder.field("field", field);
|
||||
}
|
||||
if (script != null) {
|
||||
builder.field("script", script);
|
||||
}
|
||||
if (missing != null) {
|
||||
builder.field("missing", missing);
|
||||
}
|
||||
if (format != null) {
|
||||
builder.field("format", format);
|
||||
}
|
||||
if (timeZone != null) {
|
||||
builder.field("time_zone", timeZone);
|
||||
}
|
||||
doXContentBody(builder, params);
|
||||
builder.endObject();
|
||||
return builder;
|
||||
}
|
||||
|
||||
// NORELEASE make this abstract when agg refactor complete
|
||||
protected XContentBuilder doXContentBody(XContentBuilder builder, Params params) throws IOException {
|
||||
return builder;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int doHashCode() {
|
||||
return Objects.hash(field, format, missing, script, targetValueType, timeZone, valueType, valuesSourceType,
|
||||
innerHashCode());
|
||||
}
|
||||
|
||||
// NORELEASE make this method abstract here when agg refactor complete (so
|
||||
// that subclasses are forced to implement it)
|
||||
protected int innerHashCode() {
|
||||
throw new UnsupportedOperationException(
|
||||
"This method should be implemented by a sub-class and should not rely on this method. When agg re-factoring is complete this method will be made abstract.");
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean doEquals(Object obj) {
|
||||
ValuesSourceAggregatorFactory<?> other = (ValuesSourceAggregatorFactory<?>) obj;
|
||||
if (!Objects.equals(field, other.field))
|
||||
return false;
|
||||
if (!Objects.equals(format, other.format))
|
||||
return false;
|
||||
if (!Objects.equals(missing, other.missing))
|
||||
return false;
|
||||
if (!Objects.equals(script, other.script))
|
||||
return false;
|
||||
if (!Objects.equals(targetValueType, other.targetValueType))
|
||||
return false;
|
||||
if (!Objects.equals(timeZone, other.timeZone))
|
||||
return false;
|
||||
if (!Objects.equals(valueType, other.valueType))
|
||||
return false;
|
||||
if (!Objects.equals(valuesSourceType, other.valuesSourceType))
|
||||
return false;
|
||||
return innerEquals(obj);
|
||||
}
|
||||
|
||||
// NORELEASE make this method abstract here when agg refactor complete (so
|
||||
// that subclasses are forced to implement it)
|
||||
protected boolean innerEquals(Object obj) {
|
||||
throw new UnsupportedOperationException(
|
||||
"This method should be implemented by a sub-class and should not rely on this method. When agg re-factoring is complete this method will be made abstract.");
|
||||
}
|
||||
}
|
|
@ -28,7 +28,7 @@ import org.elasticsearch.search.aggregations.support.format.ValueParser;
|
|||
*/
|
||||
public class ValuesSourceConfig<VS extends ValuesSource> {
|
||||
|
||||
final Class<VS> valueSourceType;
|
||||
final ValuesSourceType valueSourceType;
|
||||
FieldContext fieldContext;
|
||||
SearchScript script;
|
||||
ValueType scriptValueType;
|
||||
|
@ -37,11 +37,11 @@ public class ValuesSourceConfig<VS extends ValuesSource> {
|
|||
ValueFormat format = ValueFormat.RAW;
|
||||
Object missing;
|
||||
|
||||
public ValuesSourceConfig(Class<VS> valueSourceType) {
|
||||
public ValuesSourceConfig(ValuesSourceType valueSourceType) {
|
||||
this.valueSourceType = valueSourceType;
|
||||
}
|
||||
|
||||
public Class<VS> valueSourceType() {
|
||||
public ValuesSourceType valueSourceType() {
|
||||
return valueSourceType;
|
||||
}
|
||||
|
||||
|
|
|
@ -46,19 +46,21 @@ public class ValuesSourceParser<VS extends ValuesSource> {
|
|||
static final ParseField TIME_ZONE = new ParseField("time_zone");
|
||||
|
||||
public static Builder any(String aggName, InternalAggregation.Type aggType, SearchContext context) {
|
||||
return new Builder<>(aggName, aggType, context, ValuesSource.class);
|
||||
return new Builder<>(aggName, aggType, context, ValuesSource.class, ValuesSourceType.ANY);
|
||||
}
|
||||
|
||||
public static Builder<ValuesSource.Numeric> numeric(String aggName, InternalAggregation.Type aggType, SearchContext context) {
|
||||
return new Builder<>(aggName, aggType, context, ValuesSource.Numeric.class).targetValueType(ValueType.NUMERIC);
|
||||
return new Builder<>(aggName, aggType, context, ValuesSource.Numeric.class, ValuesSourceType.NUMERIC)
|
||||
.targetValueType(ValueType.NUMERIC);
|
||||
}
|
||||
|
||||
public static Builder<ValuesSource.Bytes> bytes(String aggName, InternalAggregation.Type aggType, SearchContext context) {
|
||||
return new Builder<>(aggName, aggType, context, ValuesSource.Bytes.class).targetValueType(ValueType.STRING);
|
||||
return new Builder<>(aggName, aggType, context, ValuesSource.Bytes.class, ValuesSourceType.BYTES).targetValueType(ValueType.STRING);
|
||||
}
|
||||
|
||||
public static Builder<ValuesSource.GeoPoint> geoPoint(String aggName, InternalAggregation.Type aggType, SearchContext context) {
|
||||
return new Builder<>(aggName, aggType, context, ValuesSource.GeoPoint.class).targetValueType(ValueType.GEOPOINT).scriptable(false);
|
||||
return new Builder<>(aggName, aggType, context, ValuesSource.GeoPoint.class, ValuesSourceType.GEOPOINT).targetValueType(
|
||||
ValueType.GEOPOINT).scriptable(false);
|
||||
}
|
||||
|
||||
// NORELEASE remove this class when aggs refactoring complete
|
||||
|
@ -75,7 +77,7 @@ public class ValuesSourceParser<VS extends ValuesSource> {
|
|||
ValueType valueType = null;
|
||||
String format = null;
|
||||
Object missing = null;
|
||||
Class<VS> valuesSourceType = null;
|
||||
ValuesSourceType valuesSourceType = null;
|
||||
ValueType targetValueType = null;
|
||||
DateTimeZone timezone = DateTimeZone.UTC;
|
||||
|
||||
|
@ -99,7 +101,7 @@ public class ValuesSourceParser<VS extends ValuesSource> {
|
|||
|
||||
private Input<VS> input = new Input<VS>();
|
||||
|
||||
private ValuesSourceParser(String aggName, InternalAggregation.Type aggType, SearchContext context, Class<VS> valuesSourceType) {
|
||||
private ValuesSourceParser(String aggName, InternalAggregation.Type aggType, SearchContext context, ValuesSourceType valuesSourceType) {
|
||||
this.aggName = aggName;
|
||||
this.aggType = aggType;
|
||||
this.context = context;
|
||||
|
@ -170,7 +172,7 @@ public class ValuesSourceParser<VS extends ValuesSource> {
|
|||
}
|
||||
|
||||
return input;
|
||||
}
|
||||
}
|
||||
|
||||
// NORELEASE remove this class when aggs refactoring complete
|
||||
/**
|
||||
|
@ -182,7 +184,8 @@ public class ValuesSourceParser<VS extends ValuesSource> {
|
|||
|
||||
private final ValuesSourceParser<VS> parser;
|
||||
|
||||
private Builder(String aggName, InternalAggregation.Type aggType, SearchContext context, Class<VS> valuesSourceType) {
|
||||
private Builder(String aggName, InternalAggregation.Type aggType, SearchContext context, Class<VS> valuesSourcecClass,
|
||||
ValuesSourceType valuesSourceType) {
|
||||
parser = new ValuesSourceParser<>(aggName, aggType, context, valuesSourceType);
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,52 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.search.aggregations.support;
|
||||
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.io.stream.Writeable;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
/*
|
||||
* The ordinal values for this class are tested in ValuesSourceTypeTests to
|
||||
* ensure that the ordinal for each value does not change and break bwc
|
||||
*/
|
||||
public enum ValuesSourceType implements Writeable<ValuesSourceType> {
|
||||
|
||||
ANY,
|
||||
NUMERIC,
|
||||
BYTES,
|
||||
GEOPOINT;
|
||||
|
||||
@Override
|
||||
public ValuesSourceType readFrom(StreamInput in) throws IOException {
|
||||
int ordinal = in.readVInt();
|
||||
if (ordinal < 0 || ordinal >= values().length) {
|
||||
throw new IOException("Unknown ValuesSourceType ordinal [" + ordinal + "]");
|
||||
}
|
||||
return values()[ordinal];
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeTo(StreamOutput out) throws IOException {
|
||||
out.writeVInt(ordinal());
|
||||
}
|
||||
}
|
|
@ -0,0 +1,281 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.search.aggregations;
|
||||
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.cluster.ClusterService;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
import org.elasticsearch.common.inject.AbstractModule;
|
||||
import org.elasticsearch.common.inject.Injector;
|
||||
import org.elasticsearch.common.inject.ModulesBuilder;
|
||||
import org.elasticsearch.common.inject.util.Providers;
|
||||
import org.elasticsearch.common.io.stream.BytesStreamOutput;
|
||||
import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput;
|
||||
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.settings.SettingsModule;
|
||||
import org.elasticsearch.common.xcontent.XContentFactory;
|
||||
import org.elasticsearch.common.xcontent.XContentParser;
|
||||
import org.elasticsearch.env.Environment;
|
||||
import org.elasticsearch.env.EnvironmentModule;
|
||||
import org.elasticsearch.index.Index;
|
||||
import org.elasticsearch.indices.IndicesModule;
|
||||
import org.elasticsearch.indices.breaker.CircuitBreakerService;
|
||||
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
|
||||
import org.elasticsearch.script.ScriptModule;
|
||||
import org.elasticsearch.search.SearchModule;
|
||||
import org.elasticsearch.search.internal.SearchContext;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.test.IndexSettingsModule;
|
||||
import org.elasticsearch.test.TestSearchContext;
|
||||
import org.elasticsearch.test.VersionUtils;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.threadpool.ThreadPoolModule;
|
||||
import org.junit.After;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Before;
|
||||
import org.junit.BeforeClass;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
|
||||
public abstract class BaseAggregationTestCase<AF extends AggregatorFactory> extends ESTestCase {
|
||||
|
||||
protected static final String STRING_FIELD_NAME = "mapped_string";
|
||||
protected static final String INT_FIELD_NAME = "mapped_int";
|
||||
protected static final String DOUBLE_FIELD_NAME = "mapped_double";
|
||||
protected static final String BOOLEAN_FIELD_NAME = "mapped_boolean";
|
||||
protected static final String DATE_FIELD_NAME = "mapped_date";
|
||||
protected static final String OBJECT_FIELD_NAME = "mapped_object";
|
||||
protected static final String[] mappedFieldNames = new String[] { STRING_FIELD_NAME, INT_FIELD_NAME,
|
||||
DOUBLE_FIELD_NAME, BOOLEAN_FIELD_NAME, DATE_FIELD_NAME, OBJECT_FIELD_NAME };
|
||||
|
||||
private static Injector injector;
|
||||
private static Index index;
|
||||
|
||||
private static String[] currentTypes;
|
||||
|
||||
protected static String[] getCurrentTypes() {
|
||||
return currentTypes;
|
||||
}
|
||||
|
||||
private static NamedWriteableRegistry namedWriteableRegistry;
|
||||
|
||||
private static AggregatorParsers aggParsers;
|
||||
|
||||
protected abstract AF createTestAggregatorFactory();
|
||||
|
||||
/**
|
||||
* Setup for the whole base test class.
|
||||
*/
|
||||
@BeforeClass
|
||||
public static void init() throws IOException {
|
||||
Settings settings = Settings.settingsBuilder()
|
||||
.put("name", BaseAggregationTestCase.class.toString())
|
||||
.put("path.home", createTempDir())
|
||||
.put(IndexMetaData.SETTING_VERSION_CREATED, VersionUtils.randomVersionBetween(random(),
|
||||
Version.V_1_0_0, Version.CURRENT))
|
||||
.build();
|
||||
|
||||
index = new Index("test");
|
||||
injector = new ModulesBuilder().add(
|
||||
new EnvironmentModule(new Environment(settings)),
|
||||
new SettingsModule(settings),
|
||||
new ThreadPoolModule(new ThreadPool(settings)),
|
||||
new ScriptModule(settings),
|
||||
new IndicesModule() {
|
||||
|
||||
@Override
|
||||
protected void configure() {
|
||||
bindQueryParsersExtension();
|
||||
}
|
||||
}, new SearchModule() {
|
||||
@Override
|
||||
protected void configure() {
|
||||
configureAggs();
|
||||
configureHighlighters();
|
||||
configureFetchSubPhase();
|
||||
configureFunctionScore();
|
||||
}
|
||||
},
|
||||
new IndexSettingsModule(index, settings),
|
||||
new AbstractModule() {
|
||||
@Override
|
||||
protected void configure() {
|
||||
bind(ClusterService.class).toProvider(Providers.of((ClusterService) null));
|
||||
bind(CircuitBreakerService.class).to(NoneCircuitBreakerService.class);
|
||||
bind(NamedWriteableRegistry.class).asEagerSingleton();
|
||||
}
|
||||
}
|
||||
).createInjector();
|
||||
aggParsers = injector.getInstance(AggregatorParsers.class);
|
||||
//create some random type with some default field, those types will stick around for all of the subclasses
|
||||
currentTypes = new String[randomIntBetween(0, 5)];
|
||||
for (int i = 0; i < currentTypes.length; i++) {
|
||||
String type = randomAsciiOfLengthBetween(1, 10);
|
||||
currentTypes[i] = type;
|
||||
}
|
||||
namedWriteableRegistry = injector.getInstance(NamedWriteableRegistry.class);
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void afterClass() throws Exception {
|
||||
terminate(injector.getInstance(ThreadPool.class));
|
||||
injector = null;
|
||||
index = null;
|
||||
aggParsers = null;
|
||||
currentTypes = null;
|
||||
namedWriteableRegistry = null;
|
||||
}
|
||||
|
||||
@Before
|
||||
public void beforeTest() {
|
||||
//set some random types to be queried as part the search request, before each test
|
||||
String[] types = getRandomTypes();
|
||||
TestSearchContext testSearchContext = new TestSearchContext();
|
||||
testSearchContext.setTypes(types);
|
||||
SearchContext.setCurrent(testSearchContext);
|
||||
}
|
||||
|
||||
@After
|
||||
public void afterTest() {
|
||||
SearchContext.removeCurrent();
|
||||
}
|
||||
|
||||
/**
|
||||
* Generic test that creates new AggregatorFactory from the test
|
||||
* AggregatorFactory and checks both for equality and asserts equality on
|
||||
* the two queries.
|
||||
*/
|
||||
public void testFromXContent() throws IOException {
|
||||
AF testAgg = createTestAggregatorFactory();
|
||||
AggregatorFactories factories = AggregatorFactories.builder().addAggregator(testAgg).build();
|
||||
String contentString = factories.toString();
|
||||
XContentParser parser = XContentFactory.xContent(contentString).createParser(contentString);
|
||||
assertSame(XContentParser.Token.START_OBJECT, parser.nextToken());
|
||||
assertSame(XContentParser.Token.FIELD_NAME, parser.nextToken());
|
||||
assertEquals(testAgg.name, parser.currentName());
|
||||
assertSame(XContentParser.Token.START_OBJECT, parser.nextToken());
|
||||
assertSame(XContentParser.Token.FIELD_NAME, parser.nextToken());
|
||||
assertEquals(testAgg.type, parser.currentName());
|
||||
assertSame(XContentParser.Token.START_OBJECT, parser.nextToken());
|
||||
AggregatorFactory newAgg = aggParsers.parser(testAgg.getWriteableName()).parse(testAgg.name, parser, SearchContext.current());
|
||||
assertSame(XContentParser.Token.END_OBJECT, parser.currentToken());
|
||||
assertSame(XContentParser.Token.END_OBJECT, parser.nextToken());
|
||||
assertSame(XContentParser.Token.END_OBJECT, parser.nextToken());
|
||||
assertNull(parser.nextToken());
|
||||
assertNotNull(newAgg);
|
||||
assertNotSame(newAgg, testAgg);
|
||||
assertEquals(testAgg, newAgg);
|
||||
assertEquals(testAgg.hashCode(), newAgg.hashCode());
|
||||
}
|
||||
|
||||
/**
|
||||
* Test serialization and deserialization of the test AggregatorFactory.
|
||||
*/
|
||||
|
||||
public void testSerialization() throws IOException {
|
||||
AF testAgg = createTestAggregatorFactory();
|
||||
try (BytesStreamOutput output = new BytesStreamOutput()) {
|
||||
testAgg.writeTo(output);
|
||||
try (StreamInput in = new NamedWriteableAwareStreamInput(StreamInput.wrap(output.bytes()), namedWriteableRegistry)) {
|
||||
AggregatorFactory prototype = aggParsers.parser(testAgg.getWriteableName()).getFactoryPrototype();
|
||||
AggregatorFactory deserializedQuery = prototype.readFrom(in);
|
||||
assertEquals(deserializedQuery, testAgg);
|
||||
assertEquals(deserializedQuery.hashCode(), testAgg.hashCode());
|
||||
assertNotSame(deserializedQuery, testAgg);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
public void testEqualsAndHashcode() throws IOException {
|
||||
AF firstAgg = createTestAggregatorFactory();
|
||||
assertFalse("aggregation is equal to null", firstAgg.equals(null));
|
||||
assertFalse("aggregation is equal to incompatible type", firstAgg.equals(""));
|
||||
assertTrue("aggregation is not equal to self", firstAgg.equals(firstAgg));
|
||||
assertThat("same aggregation's hashcode returns different values if called multiple times", firstAgg.hashCode(),
|
||||
equalTo(firstAgg.hashCode()));
|
||||
|
||||
AF secondQuery = copyAggregation(firstAgg);
|
||||
assertTrue("aggregation is not equal to self", secondQuery.equals(secondQuery));
|
||||
assertTrue("aggregation is not equal to its copy", firstAgg.equals(secondQuery));
|
||||
assertTrue("equals is not symmetric", secondQuery.equals(firstAgg));
|
||||
assertThat("aggregation copy's hashcode is different from original hashcode", secondQuery.hashCode(), equalTo(firstAgg.hashCode()));
|
||||
|
||||
AF thirdQuery = copyAggregation(secondQuery);
|
||||
assertTrue("aggregation is not equal to self", thirdQuery.equals(thirdQuery));
|
||||
assertTrue("aggregation is not equal to its copy", secondQuery.equals(thirdQuery));
|
||||
assertThat("aggregation copy's hashcode is different from original hashcode", secondQuery.hashCode(),
|
||||
equalTo(thirdQuery.hashCode()));
|
||||
assertTrue("equals is not transitive", firstAgg.equals(thirdQuery));
|
||||
assertThat("aggregation copy's hashcode is different from original hashcode", firstAgg.hashCode(), equalTo(thirdQuery.hashCode()));
|
||||
assertTrue("equals is not symmetric", thirdQuery.equals(secondQuery));
|
||||
assertTrue("equals is not symmetric", thirdQuery.equals(firstAgg));
|
||||
}
|
||||
|
||||
// we use the streaming infra to create a copy of the query provided as
|
||||
// argument
|
||||
private AF copyAggregation(AF agg) throws IOException {
|
||||
try (BytesStreamOutput output = new BytesStreamOutput()) {
|
||||
agg.writeTo(output);
|
||||
try (StreamInput in = new NamedWriteableAwareStreamInput(StreamInput.wrap(output.bytes()), namedWriteableRegistry)) {
|
||||
AggregatorFactory prototype = aggParsers.parser(agg.getWriteableName()).getFactoryPrototype();
|
||||
@SuppressWarnings("unchecked")
|
||||
AF secondAgg = (AF) prototype.readFrom(in);
|
||||
return secondAgg;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
protected String[] getRandomTypes() {
|
||||
String[] types;
|
||||
if (currentTypes.length > 0 && randomBoolean()) {
|
||||
int numberOfQueryTypes = randomIntBetween(1, currentTypes.length);
|
||||
types = new String[numberOfQueryTypes];
|
||||
for (int i = 0; i < numberOfQueryTypes; i++) {
|
||||
types[i] = randomFrom(currentTypes);
|
||||
}
|
||||
} else {
|
||||
if (randomBoolean()) {
|
||||
types = new String[] { MetaData.ALL };
|
||||
} else {
|
||||
types = new String[0];
|
||||
}
|
||||
}
|
||||
return types;
|
||||
}
|
||||
|
||||
public String randomNumericField() {
|
||||
int randomInt = randomInt(3);
|
||||
switch (randomInt) {
|
||||
case 0:
|
||||
return DATE_FIELD_NAME;
|
||||
case 1:
|
||||
return DOUBLE_FIELD_NAME;
|
||||
case 2:
|
||||
default:
|
||||
return INT_FIELD_NAME;
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,285 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.search.aggregations;
|
||||
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.cluster.ClusterService;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
import org.elasticsearch.common.inject.AbstractModule;
|
||||
import org.elasticsearch.common.inject.Injector;
|
||||
import org.elasticsearch.common.inject.ModulesBuilder;
|
||||
import org.elasticsearch.common.inject.util.Providers;
|
||||
import org.elasticsearch.common.io.stream.BytesStreamOutput;
|
||||
import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput;
|
||||
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.settings.SettingsModule;
|
||||
import org.elasticsearch.common.xcontent.XContentFactory;
|
||||
import org.elasticsearch.common.xcontent.XContentParser;
|
||||
import org.elasticsearch.env.Environment;
|
||||
import org.elasticsearch.env.EnvironmentModule;
|
||||
import org.elasticsearch.index.Index;
|
||||
import org.elasticsearch.indices.IndicesModule;
|
||||
import org.elasticsearch.indices.breaker.CircuitBreakerService;
|
||||
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
|
||||
import org.elasticsearch.script.ScriptModule;
|
||||
import org.elasticsearch.search.SearchModule;
|
||||
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorFactory;
|
||||
import org.elasticsearch.search.internal.SearchContext;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.test.IndexSettingsModule;
|
||||
import org.elasticsearch.test.TestSearchContext;
|
||||
import org.elasticsearch.test.VersionUtils;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.threadpool.ThreadPoolModule;
|
||||
import org.junit.After;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Before;
|
||||
import org.junit.BeforeClass;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
|
||||
public abstract class BasePipelineAggregationTestCase<AF extends PipelineAggregatorFactory> extends ESTestCase {
|
||||
|
||||
protected static final String STRING_FIELD_NAME = "mapped_string";
|
||||
protected static final String INT_FIELD_NAME = "mapped_int";
|
||||
protected static final String DOUBLE_FIELD_NAME = "mapped_double";
|
||||
protected static final String BOOLEAN_FIELD_NAME = "mapped_boolean";
|
||||
protected static final String DATE_FIELD_NAME = "mapped_date";
|
||||
protected static final String OBJECT_FIELD_NAME = "mapped_object";
|
||||
protected static final String[] mappedFieldNames = new String[] { STRING_FIELD_NAME, INT_FIELD_NAME,
|
||||
DOUBLE_FIELD_NAME, BOOLEAN_FIELD_NAME, DATE_FIELD_NAME, OBJECT_FIELD_NAME };
|
||||
|
||||
private static Injector injector;
|
||||
private static Index index;
|
||||
|
||||
private static String[] currentTypes;
|
||||
|
||||
protected static String[] getCurrentTypes() {
|
||||
return currentTypes;
|
||||
}
|
||||
|
||||
private static NamedWriteableRegistry namedWriteableRegistry;
|
||||
|
||||
private static AggregatorParsers aggParsers;
|
||||
|
||||
protected abstract AF createTestAggregatorFactory();
|
||||
|
||||
/**
|
||||
* Setup for the whole base test class.
|
||||
*/
|
||||
@BeforeClass
|
||||
public static void init() throws IOException {
|
||||
Settings settings = Settings.settingsBuilder()
|
||||
.put("name", BasePipelineAggregationTestCase.class.toString())
|
||||
.put("path.home", createTempDir())
|
||||
.put(IndexMetaData.SETTING_VERSION_CREATED, VersionUtils.randomVersionBetween(random(),
|
||||
Version.V_1_0_0, Version.CURRENT))
|
||||
.build();
|
||||
|
||||
index = new Index("test");
|
||||
injector = new ModulesBuilder().add(
|
||||
new EnvironmentModule(new Environment(settings)),
|
||||
new SettingsModule(settings),
|
||||
new ThreadPoolModule(new ThreadPool(settings)),
|
||||
new ScriptModule(settings),
|
||||
new IndicesModule() {
|
||||
|
||||
@Override
|
||||
protected void configure() {
|
||||
bindQueryParsersExtension();
|
||||
}
|
||||
}, new SearchModule() {
|
||||
@Override
|
||||
protected void configure() {
|
||||
configureAggs();
|
||||
configureHighlighters();
|
||||
configureFetchSubPhase();
|
||||
configureFunctionScore();
|
||||
}
|
||||
},
|
||||
new IndexSettingsModule(index, settings),
|
||||
new AbstractModule() {
|
||||
@Override
|
||||
protected void configure() {
|
||||
bind(ClusterService.class).toProvider(Providers.of((ClusterService) null));
|
||||
bind(CircuitBreakerService.class).to(NoneCircuitBreakerService.class);
|
||||
bind(NamedWriteableRegistry.class).asEagerSingleton();
|
||||
}
|
||||
}
|
||||
).createInjector();
|
||||
aggParsers = injector.getInstance(AggregatorParsers.class);
|
||||
//create some random type with some default field, those types will stick around for all of the subclasses
|
||||
currentTypes = new String[randomIntBetween(0, 5)];
|
||||
for (int i = 0; i < currentTypes.length; i++) {
|
||||
String type = randomAsciiOfLengthBetween(1, 10);
|
||||
currentTypes[i] = type;
|
||||
}
|
||||
namedWriteableRegistry = injector.getInstance(NamedWriteableRegistry.class);
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void afterClass() throws Exception {
|
||||
terminate(injector.getInstance(ThreadPool.class));
|
||||
injector = null;
|
||||
index = null;
|
||||
aggParsers = null;
|
||||
currentTypes = null;
|
||||
namedWriteableRegistry = null;
|
||||
}
|
||||
|
||||
@Before
|
||||
public void beforeTest() {
|
||||
//set some random types to be queried as part the search request, before each test
|
||||
String[] types = getRandomTypes();
|
||||
TestSearchContext testSearchContext = new TestSearchContext();
|
||||
testSearchContext.setTypes(types);
|
||||
SearchContext.setCurrent(testSearchContext);
|
||||
}
|
||||
|
||||
@After
|
||||
public void afterTest() {
|
||||
SearchContext.removeCurrent();
|
||||
}
|
||||
|
||||
/**
|
||||
* Generic test that creates new AggregatorFactory from the test
|
||||
* AggregatorFactory and checks both for equality and asserts equality on
|
||||
* the two queries.
|
||||
*/
|
||||
|
||||
public void testFromXContent() throws IOException {
|
||||
AF testAgg = createTestAggregatorFactory();
|
||||
AggregatorFactories factories = AggregatorFactories.builder().skipResolveOrder().addPipelineAggregator(testAgg).build();
|
||||
String contentString = factories.toString();
|
||||
System.out.println(contentString);
|
||||
XContentParser parser = XContentFactory.xContent(contentString).createParser(contentString);
|
||||
assertSame(XContentParser.Token.START_OBJECT, parser.nextToken());
|
||||
assertSame(XContentParser.Token.FIELD_NAME, parser.nextToken());
|
||||
assertEquals(testAgg.name(), parser.currentName());
|
||||
assertSame(XContentParser.Token.START_OBJECT, parser.nextToken());
|
||||
assertSame(XContentParser.Token.FIELD_NAME, parser.nextToken());
|
||||
assertEquals(testAgg.type(), parser.currentName());
|
||||
assertSame(XContentParser.Token.START_OBJECT, parser.nextToken());
|
||||
PipelineAggregatorFactory newAgg = aggParsers.pipelineAggregator(testAgg.getWriteableName()).parse(testAgg.name(), parser,
|
||||
SearchContext.current());
|
||||
assertSame(XContentParser.Token.END_OBJECT, parser.currentToken());
|
||||
assertSame(XContentParser.Token.END_OBJECT, parser.nextToken());
|
||||
assertSame(XContentParser.Token.END_OBJECT, parser.nextToken());
|
||||
assertNull(parser.nextToken());
|
||||
assertNotNull(newAgg);
|
||||
assertNotSame(newAgg, testAgg);
|
||||
assertEquals(testAgg, newAgg);
|
||||
assertEquals(testAgg.hashCode(), newAgg.hashCode());
|
||||
}
|
||||
|
||||
/**
|
||||
* Test serialization and deserialization of the test AggregatorFactory.
|
||||
*/
|
||||
|
||||
public void testSerialization() throws IOException {
|
||||
AF testAgg = createTestAggregatorFactory();
|
||||
try (BytesStreamOutput output = new BytesStreamOutput()) {
|
||||
testAgg.writeTo(output);
|
||||
try (StreamInput in = new NamedWriteableAwareStreamInput(StreamInput.wrap(output.bytes()), namedWriteableRegistry)) {
|
||||
PipelineAggregatorFactory prototype = aggParsers.pipelineAggregator(testAgg.getWriteableName()).getFactoryPrototype();
|
||||
PipelineAggregatorFactory deserializedQuery = prototype.readFrom(in);
|
||||
assertEquals(deserializedQuery, testAgg);
|
||||
assertEquals(deserializedQuery.hashCode(), testAgg.hashCode());
|
||||
assertNotSame(deserializedQuery, testAgg);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
public void testEqualsAndHashcode() throws IOException {
|
||||
AF firstAgg = createTestAggregatorFactory();
|
||||
assertFalse("aggregation is equal to null", firstAgg.equals(null));
|
||||
assertFalse("aggregation is equal to incompatible type", firstAgg.equals(""));
|
||||
assertTrue("aggregation is not equal to self", firstAgg.equals(firstAgg));
|
||||
assertThat("same aggregation's hashcode returns different values if called multiple times", firstAgg.hashCode(),
|
||||
equalTo(firstAgg.hashCode()));
|
||||
|
||||
AF secondQuery = copyAggregation(firstAgg);
|
||||
assertTrue("aggregation is not equal to self", secondQuery.equals(secondQuery));
|
||||
assertTrue("aggregation is not equal to its copy", firstAgg.equals(secondQuery));
|
||||
assertTrue("equals is not symmetric", secondQuery.equals(firstAgg));
|
||||
assertThat("aggregation copy's hashcode is different from original hashcode", secondQuery.hashCode(), equalTo(firstAgg.hashCode()));
|
||||
|
||||
AF thirdQuery = copyAggregation(secondQuery);
|
||||
assertTrue("aggregation is not equal to self", thirdQuery.equals(thirdQuery));
|
||||
assertTrue("aggregation is not equal to its copy", secondQuery.equals(thirdQuery));
|
||||
assertThat("aggregation copy's hashcode is different from original hashcode", secondQuery.hashCode(),
|
||||
equalTo(thirdQuery.hashCode()));
|
||||
assertTrue("equals is not transitive", firstAgg.equals(thirdQuery));
|
||||
assertThat("aggregation copy's hashcode is different from original hashcode", firstAgg.hashCode(), equalTo(thirdQuery.hashCode()));
|
||||
assertTrue("equals is not symmetric", thirdQuery.equals(secondQuery));
|
||||
assertTrue("equals is not symmetric", thirdQuery.equals(firstAgg));
|
||||
}
|
||||
|
||||
// we use the streaming infra to create a copy of the query provided as
|
||||
// argument
|
||||
private AF copyAggregation(AF agg) throws IOException {
|
||||
try (BytesStreamOutput output = new BytesStreamOutput()) {
|
||||
agg.writeTo(output);
|
||||
try (StreamInput in = new NamedWriteableAwareStreamInput(StreamInput.wrap(output.bytes()), namedWriteableRegistry)) {
|
||||
PipelineAggregatorFactory prototype = aggParsers.pipelineAggregator(agg.getWriteableName()).getFactoryPrototype();
|
||||
@SuppressWarnings("unchecked")
|
||||
AF secondAgg = (AF) prototype.readFrom(in);
|
||||
return secondAgg;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
protected String[] getRandomTypes() {
|
||||
String[] types;
|
||||
if (currentTypes.length > 0 && randomBoolean()) {
|
||||
int numberOfQueryTypes = randomIntBetween(1, currentTypes.length);
|
||||
types = new String[numberOfQueryTypes];
|
||||
for (int i = 0; i < numberOfQueryTypes; i++) {
|
||||
types[i] = randomFrom(currentTypes);
|
||||
}
|
||||
} else {
|
||||
if (randomBoolean()) {
|
||||
types = new String[] { MetaData.ALL };
|
||||
} else {
|
||||
types = new String[0];
|
||||
}
|
||||
}
|
||||
return types;
|
||||
}
|
||||
|
||||
public String randomNumericField() {
|
||||
int randomInt = randomInt(3);
|
||||
switch (randomInt) {
|
||||
case 0:
|
||||
return DATE_FIELD_NAME;
|
||||
case 1:
|
||||
return DOUBLE_FIELD_NAME;
|
||||
case 2:
|
||||
default:
|
||||
return INT_FIELD_NAME;
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,54 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.search.aggregations.metrics;
|
||||
|
||||
import org.elasticsearch.script.Script;
|
||||
import org.elasticsearch.search.aggregations.BaseAggregationTestCase;
|
||||
import org.elasticsearch.search.aggregations.support.ValuesSource;
|
||||
import org.elasticsearch.search.aggregations.support.ValuesSourceAggregatorFactory;
|
||||
|
||||
public abstract class AbstractNumericMetricTestCase<AF extends ValuesSourceAggregatorFactory.LeafOnly<ValuesSource.Numeric>> extends
|
||||
BaseAggregationTestCase<AF> {
|
||||
|
||||
@Override
|
||||
protected final AF createTestAggregatorFactory() {
|
||||
AF factory = doCreateTestAggregatorFactory();
|
||||
String field = randomNumericField();
|
||||
int randomFieldBranch = randomInt(3);
|
||||
switch (randomFieldBranch) {
|
||||
case 0:
|
||||
factory.field(field);
|
||||
break;
|
||||
case 1:
|
||||
factory.field(field);
|
||||
factory.script(new Script("_value + 1"));
|
||||
break;
|
||||
case 2:
|
||||
factory.script(new Script("doc[" + field + "] + 1"));
|
||||
break;
|
||||
}
|
||||
if (randomBoolean()) {
|
||||
factory.missing("MISSING");
|
||||
}
|
||||
return factory;
|
||||
}
|
||||
|
||||
protected abstract AF doCreateTestAggregatorFactory();
|
||||
}
|
|
@ -0,0 +1,109 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.search.aggregations.support;
|
||||
|
||||
import org.elasticsearch.common.io.stream.BytesStreamOutput;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import static org.hamcrest.Matchers.containsString;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
|
||||
public class ValuesSourceTypeTests extends ESTestCase {
|
||||
|
||||
public void testValidOrdinals() {
|
||||
assertThat(ValuesSourceType.ANY.ordinal(), equalTo(0));
|
||||
assertThat(ValuesSourceType.NUMERIC.ordinal(), equalTo(1));
|
||||
assertThat(ValuesSourceType.BYTES.ordinal(), equalTo(2));
|
||||
assertThat(ValuesSourceType.GEOPOINT.ordinal(), equalTo(3));
|
||||
}
|
||||
|
||||
public void testwriteTo() throws Exception {
|
||||
try (BytesStreamOutput out = new BytesStreamOutput()) {
|
||||
ValuesSourceType.ANY.writeTo(out);
|
||||
try (StreamInput in = StreamInput.wrap(out.bytes())) {
|
||||
assertThat(in.readVInt(), equalTo(0));
|
||||
}
|
||||
}
|
||||
|
||||
try (BytesStreamOutput out = new BytesStreamOutput()) {
|
||||
ValuesSourceType.NUMERIC.writeTo(out);
|
||||
try (StreamInput in = StreamInput.wrap(out.bytes())) {
|
||||
assertThat(in.readVInt(), equalTo(1));
|
||||
}
|
||||
}
|
||||
|
||||
try (BytesStreamOutput out = new BytesStreamOutput()) {
|
||||
ValuesSourceType.BYTES.writeTo(out);
|
||||
try (StreamInput in = StreamInput.wrap(out.bytes())) {
|
||||
assertThat(in.readVInt(), equalTo(2));
|
||||
}
|
||||
}
|
||||
|
||||
try (BytesStreamOutput out = new BytesStreamOutput()) {
|
||||
ValuesSourceType.GEOPOINT.writeTo(out);
|
||||
try (StreamInput in = StreamInput.wrap(out.bytes())) {
|
||||
assertThat(in.readVInt(), equalTo(3));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void testReadFrom() throws Exception {
|
||||
try (BytesStreamOutput out = new BytesStreamOutput()) {
|
||||
out.writeVInt(0);
|
||||
try (StreamInput in = StreamInput.wrap(out.bytes())) {
|
||||
assertThat(ValuesSourceType.ANY.readFrom(in), equalTo(ValuesSourceType.ANY));
|
||||
}
|
||||
}
|
||||
try (BytesStreamOutput out = new BytesStreamOutput()) {
|
||||
out.writeVInt(1);
|
||||
try (StreamInput in = StreamInput.wrap(out.bytes())) {
|
||||
assertThat(ValuesSourceType.ANY.readFrom(in), equalTo(ValuesSourceType.NUMERIC));
|
||||
}
|
||||
}
|
||||
try (BytesStreamOutput out = new BytesStreamOutput()) {
|
||||
out.writeVInt(2);
|
||||
try (StreamInput in = StreamInput.wrap(out.bytes())) {
|
||||
assertThat(ValuesSourceType.ANY.readFrom(in), equalTo(ValuesSourceType.BYTES));
|
||||
}
|
||||
}
|
||||
try (BytesStreamOutput out = new BytesStreamOutput()) {
|
||||
out.writeVInt(3);
|
||||
try (StreamInput in = StreamInput.wrap(out.bytes())) {
|
||||
assertThat(ValuesSourceType.ANY.readFrom(in), equalTo(ValuesSourceType.GEOPOINT));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void testInvalidReadFrom() throws Exception {
|
||||
try (BytesStreamOutput out = new BytesStreamOutput()) {
|
||||
out.writeVInt(randomIntBetween(4, Integer.MAX_VALUE));
|
||||
try (StreamInput in = StreamInput.wrap(out.bytes())) {
|
||||
ValuesSourceType.ANY.readFrom(in);
|
||||
fail("Expected IOException");
|
||||
} catch(IOException e) {
|
||||
assertThat(e.getMessage(), containsString("Unknown ValuesSourceType ordinal ["));
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue