Start migration away from aggregation streams

We'll migrate to NamedWriteable so we can share code with the rest
of the system. So we can work on this in multiple pull requests without
breaking Elasticsearch in between the commits this change supports
*both* old style `InternalAggregations.stream` serialization and
`NamedWriteable` style serialization. As such it creates about a
half dozen `// NORELEASE` comments that will have to be removed
once the migration is complete.

This also introduces a boolean `transportClient` flag to `SearchModule`
which is used to skip inappropriate registrations for for the
transport client while still registering the things it needs. In
this case that means that the `InternalAggregation` subclasses are
registered with the `NamedWriteableRegistry` but the `AggregationBuilder`
subclasses are not.

Finally, this moves aggregation registration from guice configuration
time to `SearchModule` construction time. This will make it simpler to
work with in the future as we further clean up Elasticsearch's
extension points.
This commit is contained in:
Nik Everett 2016-06-27 11:21:45 -04:00
parent e1ab3f16fd
commit f5a269b029
23 changed files with 204 additions and 126 deletions

View File

@ -142,12 +142,7 @@ public class TransportClient extends AbstractClient {
} }
modules.add(new NetworkModule(networkService, settings, true, namedWriteableRegistry)); modules.add(new NetworkModule(networkService, settings, true, namedWriteableRegistry));
modules.add(b -> b.bind(ThreadPool.class).toInstance(threadPool)); modules.add(b -> b.bind(ThreadPool.class).toInstance(threadPool));
modules.add(new SearchModule(settings, namedWriteableRegistry) { modules.add(new SearchModule(settings, namedWriteableRegistry, true));
@Override
protected void configure() {
// noop
}
});
modules.add(new ActionModule(false, true, settings, null, settingsModule.getClusterSettings(), modules.add(new ActionModule(false, true, settings, null, settingsModule.getClusterSettings(),
pluginsService.filterPlugins(ActionPlugin.class))); pluginsService.filterPlugins(ActionPlugin.class)));

View File

@ -267,7 +267,7 @@ public class Node implements Closeable {
ClusterModule clusterModule = new ClusterModule(settings, clusterService); ClusterModule clusterModule = new ClusterModule(settings, clusterService);
modules.add(clusterModule); modules.add(clusterModule);
modules.add(new IndicesModule(namedWriteableRegistry, pluginsService.filterPlugins(MapperPlugin.class))); modules.add(new IndicesModule(namedWriteableRegistry, pluginsService.filterPlugins(MapperPlugin.class)));
modules.add(new SearchModule(settings, namedWriteableRegistry)); modules.add(new SearchModule(settings, namedWriteableRegistry, false));
modules.add(new ActionModule(DiscoveryNode.isIngestNode(settings), false, settings, modules.add(new ActionModule(DiscoveryNode.isIngestNode(settings), false, settings,
clusterModule.getIndexNameExpressionResolver(), settingsModule.getClusterSettings(), clusterModule.getIndexNameExpressionResolver(), settingsModule.getClusterSettings(),
pluginsService.filterPlugins(ActionPlugin.class))); pluginsService.filterPlugins(ActionPlugin.class)));

View File

@ -94,6 +94,7 @@ import org.elasticsearch.search.action.SearchTransportService;
import org.elasticsearch.search.aggregations.Aggregator; import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.AggregationBuilder; import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.aggregations.AggregatorParsers; import org.elasticsearch.search.aggregations.AggregatorParsers;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.PipelineAggregationBuilder; import org.elasticsearch.search.aggregations.PipelineAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.children.ChildrenAggregationBuilder; import org.elasticsearch.search.aggregations.bucket.children.ChildrenAggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.children.InternalChildren; import org.elasticsearch.search.aggregations.bucket.children.InternalChildren;
@ -264,6 +265,7 @@ import java.util.Set;
*/ */
public class SearchModule extends AbstractModule { public class SearchModule extends AbstractModule {
private final boolean transportClient;
private final Highlighters highlighters; private final Highlighters highlighters;
private final Suggesters suggesters; private final Suggesters suggesters;
private final ParseFieldRegistry<ScoreFunctionParser<?>> scoreFunctionParserRegistry = new ParseFieldRegistry<>("score_function"); private final ParseFieldRegistry<ScoreFunctionParser<?>> scoreFunctionParserRegistry = new ParseFieldRegistry<>("score_function");
@ -287,9 +289,10 @@ public class SearchModule extends AbstractModule {
// pkg private so tests can mock // pkg private so tests can mock
Class<? extends SearchService> searchServiceImpl = SearchService.class; Class<? extends SearchService> searchServiceImpl = SearchService.class;
public SearchModule(Settings settings, NamedWriteableRegistry namedWriteableRegistry) { public SearchModule(Settings settings, NamedWriteableRegistry namedWriteableRegistry, boolean transportClient) {
this.settings = settings; this.settings = settings;
this.namedWriteableRegistry = namedWriteableRegistry; this.namedWriteableRegistry = namedWriteableRegistry;
this.transportClient = transportClient;
suggesters = new Suggesters(namedWriteableRegistry); suggesters = new Suggesters(namedWriteableRegistry);
highlighters = new Highlighters(settings); highlighters = new Highlighters(settings);
registerBuiltinScoreFunctionParsers(); registerBuiltinScoreFunctionParsers();
@ -300,6 +303,7 @@ public class SearchModule extends AbstractModule {
registerBuiltinSignificanceHeuristics(); registerBuiltinSignificanceHeuristics();
registerBuiltinMovingAverageModels(); registerBuiltinMovingAverageModels();
registerBuiltinSubFetchPhases(); registerBuiltinSubFetchPhases();
registerBuiltinAggregations();
} }
public void registerHighlighter(String key, Highlighter highligher) { public void registerHighlighter(String key, Highlighter highligher) {
@ -414,15 +418,27 @@ public class SearchModule extends AbstractModule {
/** /**
* Register an aggregation. * Register an aggregation.
* *
* @param reader reads the aggregation builder from a stream * @param builderReader reads the {@link AggregationBuilder} from a stream
* @param internalReader reads the {@link InternalAggregation} from a stream
* @param aggregationParser reads the aggregation builder from XContent * @param aggregationParser reads the aggregation builder from XContent
* @param aggregationName names by which the aggregation may be parsed. The first name is special because it is the name that the reader * @param aggregationName names by which the aggregation may be parsed. The first name is special because it is the name that the reader
* is registered under. * is registered under.
*/ */
public void registerAggregation(Writeable.Reader<? extends AggregationBuilder> reader, Aggregator.Parser aggregationParser, public void registerAggregation(Writeable.Reader<? extends AggregationBuilder> builderReader,
ParseField aggregationName) { Writeable.Reader<? extends InternalAggregation> internalReader, Aggregator.Parser aggregationParser,
ParseField aggregationName) {
if (false == transportClient) {
namedWriteableRegistry.register(AggregationBuilder.class, aggregationName.getPreferredName(), builderReader);
aggregationParserRegistry.register(aggregationParser, aggregationName);
}
namedWriteableRegistry.register(InternalAggregation.class, aggregationName.getPreferredName(), internalReader);
}
public void registerAggregation(Writeable.Reader<? extends AggregationBuilder> builderReader, Aggregator.Parser aggregationParser,
ParseField aggregationName) {
// NORELEASE remove me in favor of the above method
namedWriteableRegistry.register(AggregationBuilder.class, aggregationName.getPreferredName(), builderReader);
aggregationParserRegistry.register(aggregationParser, aggregationName); aggregationParserRegistry.register(aggregationParser, aggregationName);
namedWriteableRegistry.register(AggregationBuilder.class, aggregationName.getPreferredName(), reader);
} }
/** /**
@ -441,15 +457,21 @@ public class SearchModule extends AbstractModule {
@Override @Override
protected void configure() { protected void configure() {
bind(IndicesQueriesRegistry.class).toInstance(queryParserRegistry); if (false == transportClient) {
bind(Suggesters.class).toInstance(suggesters); /*
configureSearch(); * Nothing is bound for transport client *but* SearchModule is still responsible for settings up the things like the
configureAggs(); * NamedWriteableRegistry.
configureShapes(); */
bind(IndicesQueriesRegistry.class).toInstance(queryParserRegistry);
bind(Suggesters.class).toInstance(suggesters);
configureSearch();
configureShapes();
bind(AggregatorParsers.class).toInstance(aggregatorParsers);
}
} }
protected void configureAggs() { private void registerBuiltinAggregations() {
registerAggregation(AvgAggregationBuilder::new, new AvgParser(), AvgAggregationBuilder.AGGREGATION_NAME_FIELD); registerAggregation(AvgAggregationBuilder::new, InternalAvg::new, new AvgParser(), AvgAggregationBuilder.AGGREGATION_NAME_FIELD);
registerAggregation(SumAggregationBuilder::new, new SumParser(), SumAggregationBuilder.AGGREGATION_NAME_FIELD); registerAggregation(SumAggregationBuilder::new, new SumParser(), SumAggregationBuilder.AGGREGATION_NAME_FIELD);
registerAggregation(MinAggregationBuilder::new, new MinParser(), MinAggregationBuilder.AGGREGATION_NAME_FIELD); registerAggregation(MinAggregationBuilder::new, new MinParser(), MinAggregationBuilder.AGGREGATION_NAME_FIELD);
registerAggregation(MaxAggregationBuilder::new, new MaxParser(), MaxAggregationBuilder.AGGREGATION_NAME_FIELD); registerAggregation(MaxAggregationBuilder::new, new MaxParser(), MaxAggregationBuilder.AGGREGATION_NAME_FIELD);
@ -527,7 +549,6 @@ public class SearchModule extends AbstractModule {
BucketSelectorPipelineAggregationBuilder.AGGREGATION_NAME_FIELD); BucketSelectorPipelineAggregationBuilder.AGGREGATION_NAME_FIELD);
registerPipelineAggregation(SerialDiffPipelineAggregationBuilder::new, SerialDiffPipelineAggregationBuilder::parse, registerPipelineAggregation(SerialDiffPipelineAggregationBuilder::new, SerialDiffPipelineAggregationBuilder::parse,
SerialDiffPipelineAggregationBuilder.AGGREGATION_NAME_FIELD); SerialDiffPipelineAggregationBuilder.AGGREGATION_NAME_FIELD);
bind(AggregatorParsers.class).toInstance(aggregatorParsers);
} }
protected void configureSearch() { protected void configureSearch() {
@ -679,7 +700,6 @@ public class SearchModule extends AbstractModule {
static { static {
// calcs // calcs
InternalAvg.registerStreams();
InternalSum.registerStreams(); InternalSum.registerStreams();
InternalMin.registerStreams(); InternalMin.registerStreams();
InternalMax.registerStreams(); InternalMax.registerStreams();

View File

@ -19,10 +19,10 @@
package org.elasticsearch.search.aggregations; package org.elasticsearch.search.aggregations;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.ParseField; import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.NamedWriteable;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable; import org.elasticsearch.common.io.stream.Streamable;
@ -43,8 +43,8 @@ import java.util.Map;
/** /**
* An internal implementation of {@link Aggregation}. Serves as a base class for all aggregation implementations. * An internal implementation of {@link Aggregation}. Serves as a base class for all aggregation implementations.
*/ */
public abstract class InternalAggregation implements Aggregation, ToXContent, Streamable { public abstract class InternalAggregation implements Aggregation, ToXContent, Streamable, NamedWriteable {
// NORELEASE remove Streamable
/** /**
* The aggregation type that holds all the string types that are associated with an aggregation: * The aggregation type that holds all the string types that are associated with an aggregation:
@ -139,15 +139,84 @@ public abstract class InternalAggregation implements Aggregation, ToXContent, St
this.metaData = metaData; this.metaData = metaData;
} }
/**
* Read from a stream.
*/
protected InternalAggregation(StreamInput in) throws IOException {
name = in.readString();
metaData = in.readMap();
int size = in.readVInt();
if (size == 0) {
pipelineAggregators = Collections.emptyList();
} else {
pipelineAggregators = new ArrayList<>(size);
for (int i = 0; i < size; i++) {
BytesReference type = in.readBytesReference();
PipelineAggregator pipelineAggregator = PipelineAggregatorStreams.stream(type).readResult(in);
pipelineAggregators.add(pipelineAggregator);
}
}
}
@Override
public final void readFrom(StreamInput in) throws IOException {
try {
getWriteableName(); // Throws UnsupportedOperationException if this aggregation should be read using old style Streams
assert false : "Used reading constructor instead";
} catch (UnsupportedOperationException e) {
// OK
}
name = in.readString();
metaData = in.readMap();
int size = in.readVInt();
if (size == 0) {
pipelineAggregators = Collections.emptyList();
} else {
pipelineAggregators = new ArrayList<>(size);
for (int i = 0; i < size; i++) {
BytesReference type = in.readBytesReference();
PipelineAggregator pipelineAggregator = PipelineAggregatorStreams.stream(type).readResult(in);
pipelineAggregators.add(pipelineAggregator);
}
}
doReadFrom(in);
}
protected void doReadFrom(StreamInput in) throws IOException {
throw new UnsupportedOperationException("Use reading constructor instead"); // NORELEASE remove when we remove Streamable
}
@Override
public final void writeTo(StreamOutput out) throws IOException {
out.writeString(name); // NORELEASE remote writing the name - it is automatically handled with writeNamedWriteable
out.writeGenericValue(metaData);
out.writeVInt(pipelineAggregators.size());
for (PipelineAggregator pipelineAggregator : pipelineAggregators) {
out.writeBytesReference(pipelineAggregator.type().stream());
pipelineAggregator.writeTo(out);
}
doWriteTo(out);
}
protected abstract void doWriteTo(StreamOutput out) throws IOException;
@Override @Override
public String getName() { public String getName() {
return name; return name;
} }
@Override
public String getWriteableName() {
// NORELEASE remove me when all InternalAggregations override it
throw new UnsupportedOperationException("Override on every class");
}
/** /**
* @return The {@link Type} of this aggregation * @return The {@link Type} of this aggregation
*/ */
public abstract Type type(); public Type type() {
throw new UnsupportedOperationException("Use getWriteableName instead"); // NORELEASE remove me
}
/** /**
* Reduces the given addAggregation to a single one and returns it. In <b>most</b> cases, the assumption will be the all given * Reduces the given addAggregation to a single one and returns it. In <b>most</b> cases, the assumption will be the all given
@ -214,40 +283,6 @@ public abstract class InternalAggregation implements Aggregation, ToXContent, St
public abstract XContentBuilder doXContentBody(XContentBuilder builder, Params params) throws IOException; public abstract XContentBuilder doXContentBody(XContentBuilder builder, Params params) throws IOException;
@Override
public final void writeTo(StreamOutput out) throws IOException {
out.writeString(name);
out.writeGenericValue(metaData);
out.writeVInt(pipelineAggregators.size());
for (PipelineAggregator pipelineAggregator : pipelineAggregators) {
out.writeBytesReference(pipelineAggregator.type().stream());
pipelineAggregator.writeTo(out);
}
doWriteTo(out);
}
protected abstract void doWriteTo(StreamOutput out) throws IOException;
@Override
public final void readFrom(StreamInput in) throws IOException {
name = in.readString();
metaData = in.readMap();
int size = in.readVInt();
if (size == 0) {
pipelineAggregators = Collections.emptyList();
} else {
pipelineAggregators = new ArrayList<>(size);
for (int i = 0; i < size; i++) {
BytesReference type = in.readBytesReference();
PipelineAggregator pipelineAggregator = PipelineAggregatorStreams.stream(type).readResult(in);
pipelineAggregators.add(pipelineAggregator);
}
}
doReadFrom(in);
}
protected abstract void doReadFrom(StreamInput in) throws IOException;
/** /**
* Common xcontent fields that are shared among addAggregation * Common xcontent fields that are shared among addAggregation
*/ */

View File

@ -205,9 +205,14 @@ public class InternalAggregations implements Aggregations, ToXContent, Streamabl
} else { } else {
aggregations = new ArrayList<>(size); aggregations = new ArrayList<>(size);
for (int i = 0; i < size; i++) { for (int i = 0; i < size; i++) {
BytesReference type = in.readBytesReference(); // NORELEASE temporary hack to support old style streams and new style NamedWriteable at the same time
InternalAggregation aggregation = AggregationStreams.stream(type).readResult(in); if (in.readBoolean()) {
aggregations.add(aggregation); aggregations.add(in.readNamedWriteable(InternalAggregation.class));
} else {
BytesReference type = in.readBytesReference();
InternalAggregation aggregation = AggregationStreams.stream(type).readResult(in);
aggregations.add(aggregation);
}
} }
} }
} }
@ -217,8 +222,16 @@ public class InternalAggregations implements Aggregations, ToXContent, Streamabl
out.writeVInt(aggregations.size()); out.writeVInt(aggregations.size());
for (Aggregation aggregation : aggregations) { for (Aggregation aggregation : aggregations) {
InternalAggregation internal = (InternalAggregation) aggregation; InternalAggregation internal = (InternalAggregation) aggregation;
out.writeBytesReference(internal.type().stream()); // NORELEASE Temporary hack to support old style streams and new style NamedWriteable at the same time
internal.writeTo(out); try {
internal.getWriteableName(); // Throws UnsupportedOperationException if we should use old style streams.
out.writeBoolean(true);
out.writeNamedWriteable(internal);
} catch (UnsupportedOperationException e) {
out.writeBoolean(false);
out.writeBytesReference(internal.type().stream());
internal.writeTo(out);
}
} }
} }

View File

@ -19,9 +19,11 @@
package org.elasticsearch.search.aggregations.metrics; package org.elasticsearch.search.aggregations.metrics;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.search.aggregations.InternalAggregation; import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
import java.io.IOException;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -32,4 +34,12 @@ public abstract class InternalMetricsAggregation extends InternalAggregation {
protected InternalMetricsAggregation(String name, List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData) { protected InternalMetricsAggregation(String name, List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData) {
super(name, pipelineAggregators, metaData); super(name, pipelineAggregators, metaData);
} }
/**
* Read from a stream.
*/
protected InternalMetricsAggregation(StreamInput in) throws IOException {
super(in);
}
} }

View File

@ -18,9 +18,11 @@
*/ */
package org.elasticsearch.search.aggregations.metrics; package org.elasticsearch.search.aggregations.metrics;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.search.DocValueFormat; import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
import java.io.IOException;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -41,6 +43,13 @@ public abstract class InternalNumericMetricsAggregation extends InternalMetricsA
super(name, pipelineAggregators, metaData); super(name, pipelineAggregators, metaData);
} }
/**
* Read from a stream.
*/
protected SingleValue(StreamInput in) throws IOException {
super(in);
}
@Override @Override
public String getValueAsString() { public String getValueAsString() {
return format.format(value()); return format.format(value());
@ -67,6 +76,13 @@ public abstract class InternalNumericMetricsAggregation extends InternalMetricsA
super(name, pipelineAggregators, metaData); super(name, pipelineAggregators, metaData);
} }
/**
* Read from a stream.
*/
protected MultiValue(StreamInput in) throws IOException {
super(in);
}
public abstract double value(String name); public abstract double value(String name);
public String valueAsString(String name) { public String valueAsString(String name) {
@ -91,4 +107,10 @@ public abstract class InternalNumericMetricsAggregation extends InternalMetricsA
super(name, pipelineAggregators, metaData); super(name, pipelineAggregators, metaData);
} }
/**
* Read from a stream.
*/
protected InternalNumericMetricsAggregation(StreamInput in) throws IOException {
super(in);
}
} }

View File

@ -24,6 +24,7 @@ import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.search.aggregations.AggregatorFactories.Builder; import org.elasticsearch.search.aggregations.AggregatorFactories.Builder;
import org.elasticsearch.search.aggregations.InternalAggregation.Type;
import org.elasticsearch.search.aggregations.AggregatorFactory; import org.elasticsearch.search.aggregations.AggregatorFactory;
import org.elasticsearch.search.aggregations.support.AggregationContext; import org.elasticsearch.search.aggregations.support.AggregationContext;
import org.elasticsearch.search.aggregations.support.ValueType; import org.elasticsearch.search.aggregations.support.ValueType;
@ -36,18 +37,19 @@ import org.elasticsearch.search.aggregations.support.ValuesSourceType;
import java.io.IOException; import java.io.IOException;
public class AvgAggregationBuilder extends ValuesSourceAggregationBuilder.LeafOnly<ValuesSource.Numeric, AvgAggregationBuilder> { public class AvgAggregationBuilder extends ValuesSourceAggregationBuilder.LeafOnly<ValuesSource.Numeric, AvgAggregationBuilder> {
public static final String NAME = InternalAvg.TYPE.name(); public static final String NAME = "avg";
private final static Type TYPE = new Type(NAME);
public static final ParseField AGGREGATION_NAME_FIELD = new ParseField(NAME); public static final ParseField AGGREGATION_NAME_FIELD = new ParseField(NAME);
public AvgAggregationBuilder(String name) { public AvgAggregationBuilder(String name) {
super(name, InternalAvg.TYPE, ValuesSourceType.NUMERIC, ValueType.NUMERIC); super(name, TYPE, ValuesSourceType.NUMERIC, ValueType.NUMERIC);
} }
/** /**
* Read from a stream. * Read from a stream.
*/ */
public AvgAggregationBuilder(StreamInput in) throws IOException { public AvgAggregationBuilder(StreamInput in) throws IOException {
super(in, InternalAvg.TYPE, ValuesSourceType.NUMERIC, ValueType.NUMERIC); super(in, TYPE, ValuesSourceType.NUMERIC, ValueType.NUMERIC);
} }
@Override @Override

View File

@ -22,7 +22,6 @@ import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.search.DocValueFormat; import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.AggregationStreams;
import org.elasticsearch.search.aggregations.InternalAggregation; import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.metrics.InternalNumericMetricsAggregation; import org.elasticsearch.search.aggregations.metrics.InternalNumericMetricsAggregation;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator; import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
@ -31,30 +30,9 @@ import java.io.IOException;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
/**
*
*/
public class InternalAvg extends InternalNumericMetricsAggregation.SingleValue implements Avg { public class InternalAvg extends InternalNumericMetricsAggregation.SingleValue implements Avg {
private final double sum;
public final static Type TYPE = new Type("avg"); private final long count;
public final static AggregationStreams.Stream STREAM = new AggregationStreams.Stream() {
@Override
public InternalAvg readResult(StreamInput in) throws IOException {
InternalAvg result = new InternalAvg();
result.readFrom(in);
return result;
}
};
public static void registerStreams() {
AggregationStreams.registerStream(STREAM, TYPE.stream());
}
private double sum;
private long count;
InternalAvg() {} // for serialization
public InternalAvg(String name, double sum, long count, DocValueFormat format, List<PipelineAggregator> pipelineAggregators, public InternalAvg(String name, double sum, long count, DocValueFormat format, List<PipelineAggregator> pipelineAggregators,
Map<String, Object> metaData) { Map<String, Object> metaData) {
@ -64,6 +42,23 @@ public class InternalAvg extends InternalNumericMetricsAggregation.SingleValue i
this.format = format; this.format = format;
} }
/**
* Read from a stream.
*/
public InternalAvg(StreamInput in) throws IOException {
super(in);
format = in.readNamedWriteable(DocValueFormat.class);
sum = in.readDouble();
count = in.readVLong();
}
@Override
protected void doWriteTo(StreamOutput out) throws IOException {
out.writeNamedWriteable(format);
out.writeDouble(sum);
out.writeVLong(count);
}
@Override @Override
public double value() { public double value() {
return getValue(); return getValue();
@ -75,8 +70,8 @@ public class InternalAvg extends InternalNumericMetricsAggregation.SingleValue i
} }
@Override @Override
public Type type() { public String getWriteableName() {
return TYPE; return AvgAggregationBuilder.NAME;
} }
@Override @Override
@ -90,20 +85,6 @@ public class InternalAvg extends InternalNumericMetricsAggregation.SingleValue i
return new InternalAvg(getName(), sum, count, format, pipelineAggregators(), getMetaData()); return new InternalAvg(getName(), sum, count, format, pipelineAggregators(), getMetaData());
} }
@Override
protected void doReadFrom(StreamInput in) throws IOException {
format = in.readNamedWriteable(DocValueFormat.class);
sum = in.readDouble();
count = in.readVLong();
}
@Override
protected void doWriteTo(StreamOutput out) throws IOException {
out.writeNamedWriteable(format);
out.writeDouble(sum);
out.writeVLong(count);
}
@Override @Override
public XContentBuilder doXContentBody(XContentBuilder builder, Params params) throws IOException { public XContentBuilder doXContentBody(XContentBuilder builder, Params params) throws IOException {
builder.field(CommonFields.VALUE, count != 0 ? getValue() : null); builder.field(CommonFields.VALUE, count != 0 ? getValue() : null);

View File

@ -67,7 +67,7 @@ public class InnerHitBuilderTests extends ESTestCase {
@BeforeClass @BeforeClass
public static void init() { public static void init() {
namedWriteableRegistry = new NamedWriteableRegistry(); namedWriteableRegistry = new NamedWriteableRegistry();
indicesQueriesRegistry = new SearchModule(Settings.EMPTY, namedWriteableRegistry).getQueryParserRegistry(); indicesQueriesRegistry = new SearchModule(Settings.EMPTY, namedWriteableRegistry, false).getQueryParserRegistry();
} }
@AfterClass @AfterClass

View File

@ -47,7 +47,7 @@ import static org.hamcrest.Matchers.notNullValue;
public class SearchModuleTests extends ModuleTestCase { public class SearchModuleTests extends ModuleTestCase {
public void testDoubleRegister() { public void testDoubleRegister() {
SearchModule module = new SearchModule(Settings.EMPTY, new NamedWriteableRegistry()); SearchModule module = new SearchModule(Settings.EMPTY, new NamedWriteableRegistry(), false);
try { try {
module.registerHighlighter("fvh", new PlainHighlighter()); module.registerHighlighter("fvh", new PlainHighlighter());
} catch (IllegalArgumentException e) { } catch (IllegalArgumentException e) {
@ -62,7 +62,7 @@ public class SearchModuleTests extends ModuleTestCase {
} }
public void testRegisterSuggester() { public void testRegisterSuggester() {
SearchModule module = new SearchModule(Settings.EMPTY, new NamedWriteableRegistry()); SearchModule module = new SearchModule(Settings.EMPTY, new NamedWriteableRegistry(), false);
module.registerSuggester("custom", CustomSuggester.INSTANCE); module.registerSuggester("custom", CustomSuggester.INSTANCE);
IllegalArgumentException e = expectThrows(IllegalArgumentException.class, IllegalArgumentException e = expectThrows(IllegalArgumentException.class,
() -> module.registerSuggester("custom", CustomSuggester.INSTANCE)); () -> module.registerSuggester("custom", CustomSuggester.INSTANCE));
@ -70,7 +70,7 @@ public class SearchModuleTests extends ModuleTestCase {
} }
public void testRegisterHighlighter() { public void testRegisterHighlighter() {
SearchModule module = new SearchModule(Settings.EMPTY, new NamedWriteableRegistry()); SearchModule module = new SearchModule(Settings.EMPTY, new NamedWriteableRegistry(), false);
CustomHighlighter customHighlighter = new CustomHighlighter(); CustomHighlighter customHighlighter = new CustomHighlighter();
module.registerHighlighter("custom", customHighlighter); module.registerHighlighter("custom", customHighlighter);
IllegalArgumentException exception = expectThrows(IllegalArgumentException.class, IllegalArgumentException exception = expectThrows(IllegalArgumentException.class,
@ -88,14 +88,14 @@ public class SearchModuleTests extends ModuleTestCase {
} }
public void testRegisterQueryParserDuplicate() { public void testRegisterQueryParserDuplicate() {
SearchModule module = new SearchModule(Settings.EMPTY, new NamedWriteableRegistry()); SearchModule module = new SearchModule(Settings.EMPTY, new NamedWriteableRegistry(), false);
IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> module IllegalArgumentException e = expectThrows(IllegalArgumentException.class, () -> module
.registerQuery(TermQueryBuilder::new, TermQueryBuilder::fromXContent, TermQueryBuilder.QUERY_NAME_FIELD)); .registerQuery(TermQueryBuilder::new, TermQueryBuilder::fromXContent, TermQueryBuilder.QUERY_NAME_FIELD));
assertThat(e.getMessage(), containsString("] already registered for [query][term] while trying to register [org.elasticsearch.")); assertThat(e.getMessage(), containsString("] already registered for [query][term] while trying to register [org.elasticsearch."));
} }
public void testRegisteredQueries() throws IOException { public void testRegisteredQueries() throws IOException {
SearchModule module = new SearchModule(Settings.EMPTY, new NamedWriteableRegistry()); SearchModule module = new SearchModule(Settings.EMPTY, new NamedWriteableRegistry(), false);
List<String> allSupportedQueries = new ArrayList<>(); List<String> allSupportedQueries = new ArrayList<>();
Collections.addAll(allSupportedQueries, NON_DEPRECATED_QUERIES); Collections.addAll(allSupportedQueries, NON_DEPRECATED_QUERIES);
Collections.addAll(allSupportedQueries, DEPRECATED_QUERIES); Collections.addAll(allSupportedQueries, DEPRECATED_QUERIES);

View File

@ -119,7 +119,7 @@ public class AggregatorParsingTests extends ESTestCase {
protected void configure() { protected void configure() {
bindMapperExtension(); bindMapperExtension();
} }
}, new SearchModule(settings, namedWriteableRegistry) { }, new SearchModule(settings, namedWriteableRegistry, false) {
@Override @Override
protected void configureSearch() { protected void configureSearch() {
// Skip me // Skip me

View File

@ -151,7 +151,7 @@ public abstract class BaseAggregationTestCase<AB extends AbstractAggregationBuil
bindMapperExtension(); bindMapperExtension();
} }
}, },
new SearchModule(settings, namedWriteableRegistry) { new SearchModule(settings, namedWriteableRegistry, false) {
@Override @Override
protected void configureSearch() { protected void configureSearch() {
// Skip me // Skip me

View File

@ -106,7 +106,7 @@ public class SignificanceHeuristicTests extends ESTestCase {
ByteArrayInputStream inBuffer = new ByteArrayInputStream(outBuffer.toByteArray()); ByteArrayInputStream inBuffer = new ByteArrayInputStream(outBuffer.toByteArray());
StreamInput in = new InputStreamStreamInput(inBuffer); StreamInput in = new InputStreamStreamInput(inBuffer);
NamedWriteableRegistry registry = new NamedWriteableRegistry(); NamedWriteableRegistry registry = new NamedWriteableRegistry();
new SearchModule(Settings.EMPTY, registry); // populates the registry through side effects new SearchModule(Settings.EMPTY, registry, false); // populates the registry through side effects
in = new NamedWriteableAwareStreamInput(in, registry); in = new NamedWriteableAwareStreamInput(in, registry);
in.setVersion(version); in.setVersion(version);
sigTerms[1].readFrom(in); sigTerms[1].readFrom(in);
@ -202,7 +202,7 @@ public class SignificanceHeuristicTests extends ESTestCase {
// 1. The output of the builders can actually be parsed // 1. The output of the builders can actually be parsed
// 2. The parser does not swallow parameters after a significance heuristic was defined // 2. The parser does not swallow parameters after a significance heuristic was defined
public void testBuilderAndParser() throws Exception { public void testBuilderAndParser() throws Exception {
SearchModule searchModule = new SearchModule(Settings.EMPTY, new NamedWriteableRegistry()); SearchModule searchModule = new SearchModule(Settings.EMPTY, new NamedWriteableRegistry(), false);
ParseFieldRegistry<SignificanceHeuristicParser> heuristicParserMapper = searchModule.getSignificanceHeuristicParserRegistry(); ParseFieldRegistry<SignificanceHeuristicParser> heuristicParserMapper = searchModule.getSignificanceHeuristicParserRegistry();
SearchContext searchContext = new SignificantTermsTestSearchContext(); SearchContext searchContext = new SignificantTermsTestSearchContext();

View File

@ -145,7 +145,7 @@ public class SearchSourceBuilderTests extends ESTestCase {
bindMapperExtension(); bindMapperExtension();
} }
}, },
new SearchModule(settings, namedWriteableRegistry) { new SearchModule(settings, namedWriteableRegistry, false) {
@Override @Override
protected void configureSearch() { protected void configureSearch() {
// Skip me // Skip me

View File

@ -83,7 +83,7 @@ public class HighlightBuilderTests extends ESTestCase {
@BeforeClass @BeforeClass
public static void init() { public static void init() {
namedWriteableRegistry = new NamedWriteableRegistry(); namedWriteableRegistry = new NamedWriteableRegistry();
indicesQueriesRegistry = new SearchModule(Settings.EMPTY, namedWriteableRegistry).getQueryParserRegistry(); indicesQueriesRegistry = new SearchModule(Settings.EMPTY, namedWriteableRegistry, false).getQueryParserRegistry();
} }
@AfterClass @AfterClass

View File

@ -70,7 +70,7 @@ public class QueryRescoreBuilderTests extends ESTestCase {
@BeforeClass @BeforeClass
public static void init() { public static void init() {
namedWriteableRegistry = new NamedWriteableRegistry(); namedWriteableRegistry = new NamedWriteableRegistry();
indicesQueriesRegistry = new SearchModule(Settings.EMPTY, namedWriteableRegistry).getQueryParserRegistry(); indicesQueriesRegistry = new SearchModule(Settings.EMPTY, namedWriteableRegistry, false).getQueryParserRegistry();
} }
@AfterClass @AfterClass

View File

@ -106,7 +106,7 @@ public abstract class AbstractSortTestCase<T extends SortBuilder<T>> extends EST
}; };
namedWriteableRegistry = new NamedWriteableRegistry(); namedWriteableRegistry = new NamedWriteableRegistry();
indicesQueriesRegistry = new SearchModule(Settings.EMPTY, namedWriteableRegistry).getQueryParserRegistry(); indicesQueriesRegistry = new SearchModule(Settings.EMPTY, namedWriteableRegistry, false).getQueryParserRegistry();
} }
@AfterClass @AfterClass

View File

@ -52,7 +52,7 @@ public class SortBuilderTests extends ESTestCase {
@BeforeClass @BeforeClass
public static void init() { public static void init() {
namedWriteableRegistry = new NamedWriteableRegistry(); namedWriteableRegistry = new NamedWriteableRegistry();
indicesQueriesRegistry = new SearchModule(Settings.EMPTY, namedWriteableRegistry).getQueryParserRegistry(); indicesQueriesRegistry = new SearchModule(Settings.EMPTY, namedWriteableRegistry, false).getQueryParserRegistry();
} }
@AfterClass @AfterClass

View File

@ -56,7 +56,7 @@ public abstract class AbstractSuggestionBuilderTestCase<SB extends SuggestionBui
@BeforeClass @BeforeClass
public static void init() throws IOException { public static void init() throws IOException {
namedWriteableRegistry = new NamedWriteableRegistry(); namedWriteableRegistry = new NamedWriteableRegistry();
SearchModule searchModule = new SearchModule(Settings.EMPTY, namedWriteableRegistry); SearchModule searchModule = new SearchModule(Settings.EMPTY, namedWriteableRegistry, false);
queriesRegistry = searchModule.getQueryParserRegistry(); queriesRegistry = searchModule.getQueryParserRegistry();
suggesters = searchModule.getSuggesters(); suggesters = searchModule.getSuggesters();
parseFieldMatcher = ParseFieldMatcher.STRICT; parseFieldMatcher = ParseFieldMatcher.STRICT;

View File

@ -118,7 +118,7 @@ public class TemplateQueryParserTests extends ESTestCase {
b.bind(CircuitBreakerService.class).to(NoneCircuitBreakerService.class); b.bind(CircuitBreakerService.class).to(NoneCircuitBreakerService.class);
}, },
settingsModule, settingsModule,
new SearchModule(settings, new NamedWriteableRegistry()) { new SearchModule(settings, new NamedWriteableRegistry(), false) {
@Override @Override
protected void configureSearch() { protected void configureSearch() {
// skip so we don't need transport // skip so we don't need transport

View File

@ -880,7 +880,7 @@ public abstract class AbstractQueryTestCase<QB extends AbstractQueryBuilder<QB>>
scriptSettings.addAll(pluginsService.getPluginSettings()); scriptSettings.addAll(pluginsService.getPluginSettings());
scriptSettings.add(InternalSettingsPlugin.VERSION_CREATED); scriptSettings.add(InternalSettingsPlugin.VERSION_CREATED);
SettingsModule settingsModule = new SettingsModule(settings, scriptSettings, pluginsService.getPluginSettingsFilter()); SettingsModule settingsModule = new SettingsModule(settings, scriptSettings, pluginsService.getPluginSettingsFilter());
searchModule = new SearchModule(settings, namedWriteableRegistry) { searchModule = new SearchModule(settings, namedWriteableRegistry, false) {
@Override @Override
protected void configureSearch() { protected void configureSearch() {
// Skip me // Skip me

View File

@ -631,7 +631,7 @@ public class ElasticsearchAssertions {
registry = ESIntegTestCase.internalCluster().getInstance(NamedWriteableRegistry.class); registry = ESIntegTestCase.internalCluster().getInstance(NamedWriteableRegistry.class);
} else { } else {
registry = new NamedWriteableRegistry(); registry = new NamedWriteableRegistry();
new SearchModule(Settings.EMPTY, registry); new SearchModule(Settings.EMPTY, registry, false);
} }
assertVersionSerializable(version, streamable, registry); assertVersionSerializable(version, streamable, registry);
} }