Remove AggregationStreams and friends

* Remove outdated aggregation registration method
* Remove AggregationStreams
* Add StreamInput#readNamedWriteableList and
StreamOutput#writeNamedWriteableList convenience methods. We strive to
keep reading from and writing to the streams terse so the call sites are
easier to scan visually; a round-trip sketch follows this list.
* Remove PipelineAggregatorStreams
* Remove stream info from InternalAggregation.Type
* Remove InternalAggregation#type
* Remove Streamable from PipelineAggregator
* Remove Streamable from MultiBucketsAggregation.Bucket
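
For orientation, here is a round-trip sketch of the two new convenience
methods. It is modeled on the testNamedWriteableList test added at the
bottom of this commit; BaseNamedWriteable and TestNamedWriteable are
fixtures from that test file, so read this as an illustrative sketch
rather than an excerpt:

// Register a reader for the category so readNamedWriteableList can
// resolve each element's name back to a reader.
NamedWriteableRegistry registry = new NamedWriteableRegistry();
registry.register(BaseNamedWriteable.class, TestNamedWriteable.NAME, TestNamedWriteable::new);

List<BaseNamedWriteable> expected =
        Collections.singletonList(new TestNamedWriteable("key", "value"));
try (BytesStreamOutput out = new BytesStreamOutput()) {
    // writeNamedWriteableList writes a vint size prefix, then each
    // element as its writeable name followed by its payload.
    out.writeNamedWriteableList(expected);
    try (StreamInput in = new NamedWriteableAwareStreamInput(out.bytes().streamInput(), registry)) {
        // readNamedWriteableList reads the size prefix back and looks
        // each element up through the registry.
        List<BaseNamedWriteable> read = in.readNamedWriteableList(BaseNamedWriteable.class);
        assert expected.equals(read);
    }
}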
Nik Everett 2016-07-19 14:50:49 -04:00
parent a4f09d2b81
commit fc4b439635
24 changed files with 68 additions and 427 deletions


@@ -836,6 +836,18 @@ public abstract class StreamInput extends InputStream {
return builder;
}
/**
* Reads a list of {@link NamedWriteable}s.
*/
public <T extends NamedWriteable> List<T> readNamedWriteableList(Class<T> categoryClass) throws IOException {
int count = readVInt();
List<T> builder = new ArrayList<>(count);
for (int i = 0; i < count; i++) {
builder.add(readNamedWriteable(categoryClass));
}
return builder;
}
public static StreamInput wrap(byte[] bytes) {
return wrap(bytes, 0, bytes.length);
}


@@ -835,4 +835,14 @@ public abstract class StreamOutput extends OutputStream {
obj.writeTo(this);
}
}
/**
* Writes a list of {@link NamedWriteable} objects.
*/
public void writeNamedWriteableList(List<? extends NamedWriteable> list) throws IOException {
writeVInt(list.size());
for (NamedWriteable obj : list) {
writeNamedWriteable(obj);
}
}
}


@@ -486,14 +486,6 @@ public class SearchModule extends AbstractModule {
}
}
public void registerPipelineAggregation(Writeable.Reader<? extends PipelineAggregationBuilder> reader,
PipelineAggregator.Parser aggregationParser, ParseField aggregationName) {
// NORELEASE remove me in favor of the above method
pipelineAggregationParserRegistry.register(aggregationParser, aggregationName);
namedWriteableRegistry.register(PipelineAggregationBuilder.class, aggregationName.getPreferredName(), reader);
}
@Override
protected void configure() {
if (false == transportClient) {


@@ -154,8 +154,8 @@ public class AggregationPhase implements SearchPhase {
siblingPipelineAggregators.add((SiblingPipelineAggregator) pipelineAggregator);
} else {
throw new AggregationExecutionException("Invalid pipeline aggregation named [" + pipelineAggregator.name()
+ "] of type [" + pipelineAggregator.type().name()
+ "]. Only sibling pipeline aggregations are allowed at the top level");
+ "] of type [" + pipelineAggregator.getWriteableName() + "]. Only sibling pipeline aggregations are "
+ "allowed at the top level");
}
}
context.queryResult().pipelineAggregators(siblingPipelineAggregators);


@@ -1,69 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.aggregations;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.StreamInput;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import static java.util.Collections.emptyMap;
import static java.util.Collections.unmodifiableMap;
/**
* A registry for all the dedicated streams in the aggregation module. This is to support dynamic addAggregation that
* know how to stream themselves.
*/
public class AggregationStreams {
private static Map<BytesReference, Stream> streams = emptyMap();
/**
* A stream that knows how to read an aggregation from the input.
*/
public interface Stream {
InternalAggregation readResult(StreamInput in) throws IOException;
}
/**
* Registers the given stream and associate it with the given types.
*
* @param stream The streams to register
* @param types The types associated with the streams
*/
public static synchronized void registerStream(Stream stream, BytesReference... types) {
Map<BytesReference, Stream> newStreams = new HashMap<>(streams);
for (BytesReference type : types) {
newStreams.put(type, stream);
}
streams = unmodifiableMap(newStreams);
}
/**
* Returns the stream that is registered for the given type
*
* @param type The given type
* @return The associated stream
*/
public static Stream stream(BytesReference type) {
return streams.get(type);
}
}
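
For contrast with what replaces it: this registry keyed streams by a
BytesReference type, while NamedWriteable keys readers by name in a
NamedWriteableRegistry. A before/after sketch, using a hypothetical
InternalMax as the example (the STREAM constant and both call sites are
illustrative, not lines from this diff):

// Before: static registration keyed by the aggregation's stream type.
AggregationStreams.registerStream(InternalMax.STREAM, InternalMax.TYPE.stream());

// After: name-keyed registration against the category class, the same
// register(...) shape the new test at the bottom of this commit uses.
namedWriteableRegistry.register(InternalAggregation.class, MaxAggregationBuilder.NAME, InternalMax::new);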


@@ -20,55 +20,35 @@ package org.elasticsearch.search.aggregations;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.NamedWriteable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorStreams;
import org.elasticsearch.search.aggregations.support.AggregationPath;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
/**
* An internal implementation of {@link Aggregation}. Serves as a base class for all aggregation implementations.
*/
public abstract class InternalAggregation implements Aggregation, ToXContent, Streamable, NamedWriteable {
// NORELEASE remove Streamable
public abstract class InternalAggregation implements Aggregation, ToXContent, NamedWriteable {
/**
* The aggregation type that holds all the string types that are associated with an aggregation:
* <ul>
* <li>name - used as the parser type</li>
* <li>stream - used as the stream type</li>
* </ul>
*/
public static class Type {
private String name;
private BytesReference stream;
private final String name;
public Type(String name) {
this(name, new BytesArray(name));
}
public Type(String name, String stream) {
this(name, new BytesArray(stream));
}
public Type(String name, BytesReference stream) {
this.name = name;
this.stream = stream;
}
/**
@@ -79,14 +59,6 @@ public abstract class InternalAggregation implements Aggregation, ToXContent, Streamable, NamedWriteable {
return name;
}
/**
* @return The name of the stream type (used for registering the aggregation stream
* (see {@link AggregationStreams#registerStream(AggregationStreams.Stream, BytesReference...)}).
*/
public BytesReference stream() {
return stream;
}
@Override
public String toString() {
return name;
@@ -118,15 +90,11 @@ public abstract class InternalAggregation implements Aggregation, ToXContent, Streamable, NamedWriteable {
}
}
protected final String name;
protected String name;
protected final Map<String, Object> metaData;
protected Map<String, Object> metaData;
private List<PipelineAggregator> pipelineAggregators;
/** Constructs an un initialized addAggregation (used for serialization) **/
protected InternalAggregation() {} // NORELEASE remove when removing Streamable
private final List<PipelineAggregator> pipelineAggregators;
/**
* Constructs an get with a given name.
@@ -145,96 +113,25 @@ public abstract class InternalAggregation implements Aggregation, ToXContent, Streamable, NamedWriteable {
protected InternalAggregation(StreamInput in) throws IOException {
name = in.readString();
metaData = in.readMap();
int size = in.readVInt();
if (size == 0) {
pipelineAggregators = Collections.emptyList();
} else {
pipelineAggregators = new ArrayList<>(size);
for (int i = 0; i < size; i++) {
if (in.readBoolean()) {
pipelineAggregators.add(in.readNamedWriteable(PipelineAggregator.class));
} else {
BytesReference type = in.readBytesReference();
PipelineAggregator pipelineAggregator = PipelineAggregatorStreams.stream(type).readResult(in);
pipelineAggregators.add(pipelineAggregator);
}
}
}
}
@Override
public final void readFrom(StreamInput in) throws IOException {
try {
getWriteableName(); // Throws UnsupportedOperationException if this aggregation should be read using old style Streams
assert false : "Used reading constructor instead";
} catch (UnsupportedOperationException e) {
// OK
}
name = in.readString();
metaData = in.readMap();
int size = in.readVInt();
if (size == 0) {
pipelineAggregators = Collections.emptyList();
} else {
pipelineAggregators = new ArrayList<>(size);
for (int i = 0; i < size; i++) {
if (in.readBoolean()) {
pipelineAggregators.add(in.readNamedWriteable(PipelineAggregator.class));
} else {
BytesReference type = in.readBytesReference();
PipelineAggregator pipelineAggregator = PipelineAggregatorStreams.stream(type).readResult(in);
pipelineAggregators.add(pipelineAggregator);
}
}
}
doReadFrom(in);
}
protected void doReadFrom(StreamInput in) throws IOException {
throw new UnsupportedOperationException("Use reading constructor instead"); // NORELEASE remove when we remove Streamable
pipelineAggregators = in.readNamedWriteableList(PipelineAggregator.class);
}
@Override
public final void writeTo(StreamOutput out) throws IOException {
out.writeString(name);
out.writeGenericValue(metaData);
out.writeVInt(pipelineAggregators.size());
for (PipelineAggregator pipelineAggregator : pipelineAggregators) {
// NORELEASE temporary hack to support old style streams and new style NamedWriteable
try {
pipelineAggregator.getWriteableName(); // Throws UnsupportedOperationException if we should use old style streams.
out.writeBoolean(true);
out.writeNamedWriteable(pipelineAggregator);
} catch (UnsupportedOperationException e) {
out.writeBoolean(false);
out.writeBytesReference(pipelineAggregator.type().stream());
pipelineAggregator.writeTo(out);
}
}
out.writeNamedWriteableList(pipelineAggregators);
doWriteTo(out);
}
protected abstract void doWriteTo(StreamOutput out) throws IOException;
@Override
public String getWriteableName() {
// NORELEASE remove me when all InternalAggregations override it
throw new UnsupportedOperationException("Override on every class");
}
@Override
public String getName() {
return name;
}
/**
* @return The {@link Type} of this aggregation
*/
public Type type() {
// NORELEASE remove this method
throw new UnsupportedOperationException(getClass().getName() + " used type but should Use getWriteableName instead");
}
/**
* Reduces the given aggregations to a single one and returns it. In <b>most</b> cases, the assumption will be the all given
* aggregations are of the same type (the same type as this aggregation). For best efficiency, when implementing,


@@ -18,7 +18,6 @@
*/
package org.elasticsearch.search.aggregations;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
@@ -198,41 +197,15 @@ public class InternalAggregations implements Aggregations, ToXContent, Streamable {
@Override
public void readFrom(StreamInput in) throws IOException {
int size = in.readVInt();
if (size == 0) {
aggregations = Collections.emptyList();
aggregations = in.readList(stream -> in.readNamedWriteable(InternalAggregation.class));
if (aggregations.isEmpty()) {
aggregationsAsMap = emptyMap();
} else {
aggregations = new ArrayList<>(size);
for (int i = 0; i < size; i++) {
// NORELEASE temporary hack to support old style streams and new style NamedWriteable at the same time
if (in.readBoolean()) {
aggregations.add(in.readNamedWriteable(InternalAggregation.class));
} else {
BytesReference type = in.readBytesReference();
InternalAggregation aggregation = AggregationStreams.stream(type).readResult(in);
aggregations.add(aggregation);
}
}
}
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVInt(aggregations.size());
for (Aggregation aggregation : aggregations) {
InternalAggregation internal = (InternalAggregation) aggregation;
// NORELEASE Temporary hack to support old style streams and new style NamedWriteable at the same time
try {
internal.getWriteableName(); // Throws UnsupportedOperationException if we should use old style streams.
out.writeBoolean(true);
out.writeNamedWriteable(internal);
} catch (UnsupportedOperationException e) {
out.writeBoolean(false);
out.writeBytesReference(internal.type().stream());
internal.writeTo(out);
}
}
out.writeNamedWriteableList(aggregations);
}
}


@@ -29,10 +29,6 @@ import java.util.Map;
public abstract class InternalMultiBucketAggregation<A extends InternalMultiBucketAggregation, B extends InternalMultiBucketAggregation.InternalBucket>
extends InternalAggregation implements MultiBucketsAggregation {
public InternalMultiBucketAggregation() {
}
public InternalMultiBucketAggregation(String name, List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData) {
super(name, pipelineAggregators, metaData);
}


@@ -39,8 +39,6 @@ public abstract class InternalSingleBucketAggregation extends InternalAggregation
private long docCount;
private InternalAggregations aggregations;
protected InternalSingleBucketAggregation() {} // for serialization
/**
* Creates a single bucket aggregation.
*
@@ -63,12 +61,6 @@ public abstract class InternalSingleBucketAggregation
aggregations = InternalAggregations.readAggregations(in);
}
@Override
protected void doReadFrom(StreamInput in) throws IOException {
docCount = in.readVLong();
aggregations = InternalAggregations.readAggregations(in);
}
@Override
protected void doWriteTo(StreamOutput out) throws IOException {
out.writeVLong(docCount);


@@ -40,9 +40,7 @@ public interface MultiBucketsAggregation extends Aggregation {
* A bucket represents a criteria to which all documents that fall in it adhere to. It is also uniquely identified
* by a key, and can potentially hold sub-aggregations computed over all documents in it.
*/
public interface Bucket extends HasAggregations, ToXContent, Streamable, Writeable {
// NORELEASE remove Streamable
public interface Bucket extends HasAggregations, ToXContent, Writeable {
/**
* @return The key associated with the bucket
*/
@@ -66,12 +64,6 @@ public interface MultiBucketsAggregation extends Aggregation {
Object getProperty(String containingAggName, List<String> path);
@Override
default void readFrom(StreamInput in) throws IOException {
// NORELEASE remove me when no Buckets override it
throw new UnsupportedOperationException("Prefer the Writeable interface");
}
static class SubAggregationComparator<B extends Bucket> implements java.util.Comparator<B> {
private final AggregationPath path;


@@ -35,7 +35,7 @@ import java.io.IOException;
public class InternalDateHistogram {
public static final Factory HISTOGRAM_FACTORY = new Factory();
static final Type TYPE = new Type("date_histogram", "dhisto");
static final Type TYPE = new Type("date_histogram");
static class Bucket extends InternalHistogram.Bucket {
Bucket(long key, long docCount, InternalAggregations aggregations, boolean keyed, DocValueFormat formatter,


@@ -302,11 +302,6 @@ public class InternalHistogram<B extends InternalHistogram.Bucket> extends InternalMultiBucketAggregation
return HistogramAggregationBuilder.NAME;
}
@Override
public Type type() {
return TYPE;
}
@Override
public List<B> getBuckets() {
return buckets;


@@ -172,14 +172,8 @@ public class InternalRange<B extends InternalRange.Bucket, R extends InternalRange
return sb.toString();
}
@Override
public void readFrom(StreamInput in) throws IOException {
}
@Override
public void writeTo(StreamOutput out) throws IOException {
}
}


@@ -241,11 +241,6 @@ public class GlobalOrdinalsStringTermsAggregator extends AbstractStringTermsAggregator
throw new UnsupportedOperationException();
}
@Override
public void readFrom(StreamInput in) throws IOException {
throw new UnsupportedOperationException();
}
@Override
protected void writeTermTo(StreamOutput out) throws IOException {
throw new UnsupportedOperationException();


@@ -196,13 +196,9 @@ public abstract class InternalTerms<A extends InternalTerms<A, B>, B extends InternalTerms.Bucket
!terms.getClass().equals(UnmappedTerms.class)) {
// control gets into this loop when the same field name against which the query is executed
// is of different types in different indices.
throw new AggregationExecutionException("Merging/Reducing the aggregations failed " +
"when computing the aggregation [ Name: " +
referenceTerms.getName() + ", Type: " +
referenceTerms.type() + " ]" + " because: " +
"the field you gave in the aggregation query " +
"existed as two different types " +
"in two different indices");
throw new AggregationExecutionException("Merging/Reducing the aggregations failed when computing the aggregation ["
+ referenceTerms.getName() + "] because the field you gave in the aggregation query existed as two different "
+ "types in two different indices");
}
otherDocCount += terms.getSumOfOtherDocCounts();
final long thisAggDocCountError;


@@ -28,9 +28,6 @@ import java.util.List;
import java.util.Map;
public abstract class InternalMetricsAggregation extends InternalAggregation {
protected InternalMetricsAggregation() {} // NORELEASE remove when we remove streamable
protected InternalMetricsAggregation(String name, List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData) {
super(name, pipelineAggregators, metaData);
}


@@ -36,9 +36,6 @@ public abstract class InternalNumericMetricsAggregation extends InternalMetricsAggregation
protected DocValueFormat format = DEFAULT_FORMAT;
public abstract static class SingleValue extends InternalNumericMetricsAggregation implements NumericMetricsAggregation.SingleValue {
protected SingleValue() {} // NORELEASE remove when we remove Streamable
protected SingleValue(String name, List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData) {
super(name, pipelineAggregators, metaData);
}
@@ -69,9 +66,6 @@ public abstract class InternalNumericMetricsAggregation extends InternalMetricsAggregation
}
public abstract static class MultiValue extends InternalNumericMetricsAggregation implements NumericMetricsAggregation.MultiValue {
protected MultiValue() {}
protected MultiValue(String name, List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData) {
super(name, pipelineAggregators, metaData);
}
@@ -101,8 +95,6 @@ public abstract class InternalNumericMetricsAggregation extends InternalMetricsAggregation
}
}
private InternalNumericMetricsAggregation() {} // NORELEASE remove when we remove Streamable
private InternalNumericMetricsAggregation(String name, List<PipelineAggregator> pipelineAggregators, Map<String, Object> metaData) {
super(name, pipelineAggregators, metaData);
}


@@ -24,7 +24,6 @@ import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.AggregationStreams;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.metrics.InternalNumericMetricsAggregation;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;


@@ -24,19 +24,15 @@ import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.io.stream.NamedWriteable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.index.query.QueryParseContext;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalAggregation.ReduceContext;
import org.elasticsearch.search.aggregations.InternalAggregation.Type;
import org.elasticsearch.search.aggregations.PipelineAggregationBuilder;
import java.io.IOException;
import java.util.Map;
public abstract class PipelineAggregator implements Streamable, NamedWriteable {
// NORELEASE remove Streamable
public abstract class PipelineAggregator implements NamedWriteable {
/**
* Parse the {@link PipelineAggregationBuilder} from a {@link QueryParseContext}.
*/
@@ -66,9 +62,6 @@ public abstract class PipelineAggregator implements Streamable, NamedWriteable {
private String[] bucketsPaths;
private Map<String, Object> metaData;
protected PipelineAggregator() { // for Serialisation
}
protected PipelineAggregator(String name, String[] bucketsPaths, Map<String, Object> metaData) {
this.name = name;
this.bucketsPaths = bucketsPaths;
@@ -84,27 +77,9 @@ public abstract class PipelineAggregator implements Streamable, NamedWriteable {
metaData = in.readMap();
}
@Override
public final void readFrom(StreamInput in) throws IOException {
try {
getWriteableName(); // Throws UnsupportedOperationException if this aggregation should be read using old style Streams
assert false : "Used reading constructor instead";
} catch (UnsupportedOperationException e) {
// OK
}
name = in.readString();
bucketsPaths = in.readStringArray();
metaData = in.readMap();
doReadFrom(in);
}
protected void doReadFrom(StreamInput in) throws IOException {
throw new UnsupportedOperationException("Use reading constructor instead"); // NORELEASE remove when we remove Streamable
}
@Override
public final void writeTo(StreamOutput out) throws IOException {
out.writeString(name); // NORELEASE remote writing the name - it is automatically handled with writeNamedWriteable
out.writeString(name);
out.writeStringArray(bucketsPaths);
out.writeMap(metaData);
doWriteTo(out);
@@ -112,12 +87,6 @@ public abstract class PipelineAggregator implements Streamable, NamedWriteable {
protected abstract void doWriteTo(StreamOutput out) throws IOException;
@Override
public String getWriteableName() {
// NORELEASE remove me when all InternalAggregations override it
throw new UnsupportedOperationException("Override on every class");
}
public String name() {
return name;
}
@@ -130,10 +99,5 @@ public abstract class PipelineAggregator implements Streamable, NamedWriteable {
return metaData;
}
public Type type() {
// NORELEASE remove this method
throw new UnsupportedOperationException(getClass().getName() + " used type but should Use getWriteableName instead");
}
public abstract InternalAggregation reduce(InternalAggregation aggregation, ReduceContext reduceContext);
}
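
With Streamable gone, a pipeline aggregator serializes through its
reading constructor and doWriteTo only. A minimal sketch of the
resulting subclass shape (the class name and NAME constant are
hypothetical, and it assumes the PipelineAggregator(StreamInput)
reading constructor whose tail is visible in the hunk above):

public class ExamplePipelineAggregator extends PipelineAggregator {
    public static final String NAME = "example";

    public ExamplePipelineAggregator(String name, String[] bucketsPaths, Map<String, Object> metaData) {
        super(name, bucketsPaths, metaData);
    }

    /**
     * Read from a stream. The superclass constructor consumes the shared
     * header: name, bucketsPaths and metaData.
     */
    public ExamplePipelineAggregator(StreamInput in) throws IOException {
        super(in);
    }

    @Override
    protected void doWriteTo(StreamOutput out) throws IOException {
        // No fields of our own; writeTo already wrote the shared header.
    }

    @Override
    public String getWriteableName() {
        return NAME;
    }

    @Override
    public InternalAggregation reduce(InternalAggregation aggregation, ReduceContext reduceContext) {
        return aggregation; // a real implementation transforms the aggregation here
    }
}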


@@ -1,69 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.search.aggregations.pipeline;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.StreamInput;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import static java.util.Collections.emptyMap;
import static java.util.Collections.unmodifiableMap;
/**
* A registry for all the dedicated streams in the aggregation module. This is to support dynamic addAggregation that
* know how to stream themselves.
*/
public class PipelineAggregatorStreams {
private static Map<BytesReference, Stream> streams = emptyMap();
/**
* A stream that knows how to read an aggregation from the input.
*/
public interface Stream {
PipelineAggregator readResult(StreamInput in) throws IOException;
}
/**
* Registers the given stream and associate it with the given types.
*
* @param stream The streams to register
* @param types The types associated with the streams
*/
public static synchronized void registerStream(Stream stream, BytesReference... types) {
Map<BytesReference, Stream> newStreams = new HashMap<>(streams);
for (BytesReference type : types) {
newStreams.put(type, stream);
}
streams = unmodifiableMap(newStreams);
}
/**
* Returns the stream that is registered for the given type
*
* @param type The given type
* @return The associated stream
*/
public static Stream stream(BytesReference type) {
return streams.get(type);
}
}


@@ -36,11 +36,6 @@ import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
public abstract class SiblingPipelineAggregator extends PipelineAggregator {
protected SiblingPipelineAggregator() { // NOCOMMIT remove me
super();
}
protected SiblingPipelineAggregator(String name, String[] bucketsPaths, Map<String, Object> metaData) {
super(name, bucketsPaths, metaData);
}
@@ -83,7 +78,7 @@ public abstract class SiblingPipelineAggregator extends PipelineAggregator {
return singleBucketAgg.create(new InternalAggregations(aggs));
} else {
throw new IllegalStateException("Aggregation [" + aggregation.getName() + "] must be a bucket aggregation ["
+ aggregation.type().name() + "]");
+ aggregation.getWriteableName() + "]");
}
}


@@ -23,7 +23,6 @@ import org.apache.lucene.search.FieldDoc;
import org.apache.lucene.search.TopDocs;
import org.elasticsearch.Version;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.search.DocValueFormat;
@@ -31,15 +30,15 @@ import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregatorStreams;
import org.elasticsearch.search.aggregations.pipeline.SiblingPipelineAggregator;
import org.elasticsearch.search.profile.ProfileShardResult;
import org.elasticsearch.search.suggest.Suggest;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import static java.util.Collections.emptyList;
import static org.elasticsearch.common.lucene.Lucene.readTopDocs;
import static org.elasticsearch.common.lucene.Lucene.writeTopDocs;
@@ -220,21 +219,8 @@ public class QuerySearchResult extends QuerySearchResultProvider {
if (in.readBoolean()) {
aggregations = InternalAggregations.readAggregations(in);
}
if (in.readBoolean()) {
int size = in.readVInt();
List<SiblingPipelineAggregator> pipelineAggregators = new ArrayList<>(size);
for (int i = 0; i < size; i++) {
// NORELEASE temporary hack to support old style streams and new style NamedWriteable at the same time
if (in.readBoolean()) {
pipelineAggregators.add((SiblingPipelineAggregator) in.readNamedWriteable(PipelineAggregator.class));
} else {
BytesReference type = in.readBytesReference();
PipelineAggregator pipelineAggregator = PipelineAggregatorStreams.stream(type).readResult(in);
pipelineAggregators.add((SiblingPipelineAggregator) pipelineAggregator);
}
}
this.pipelineAggregators = pipelineAggregators;
}
pipelineAggregators = in.readNamedWriteableList(PipelineAggregator.class).stream().map(a -> (SiblingPipelineAggregator) a)
.collect(Collectors.toList());
if (in.readBoolean()) {
suggest = Suggest.readSuggest(in);
}
@@ -272,24 +258,7 @@ public class QuerySearchResult extends QuerySearchResultProvider {
out.writeBoolean(true);
aggregations.writeTo(out);
}
if (pipelineAggregators == null) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
out.writeVInt(pipelineAggregators.size());
for (PipelineAggregator pipelineAggregator : pipelineAggregators) {
// NORELEASE temporary hack to support old style streams and new style NamedWriteable
try {
pipelineAggregator.getWriteableName(); // Throws UnsupportedOperationException if we should use old style streams.
out.writeBoolean(true);
out.writeNamedWriteable(pipelineAggregator);
} catch (UnsupportedOperationException e) {
out.writeBoolean(false);
out.writeBytesReference(pipelineAggregator.type().stream());
pipelineAggregator.writeTo(out);
}
}
}
out.writeNamedWriteableList(pipelineAggregators == null ? emptyList() : pipelineAggregators);
if (suggest == null) {
out.writeBoolean(false);
} else {


@@ -337,8 +337,26 @@ public class BytesStreamsTests extends ESTestCase {
StreamInput in = new NamedWriteableAwareStreamInput(StreamInput.wrap(bytes), namedWriteableRegistry);
assertEquals(in.available(), bytes.length);
BaseNamedWriteable namedWriteableOut = in.readNamedWriteable(BaseNamedWriteable.class);
assertEquals(namedWriteableOut, namedWriteableIn);
assertEquals(in.available(), 0);
assertEquals(namedWriteableIn, namedWriteableOut);
assertEquals(0, in.available());
}
public void testNamedWriteableList() throws IOException {
NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry();
namedWriteableRegistry.register(BaseNamedWriteable.class, TestNamedWriteable.NAME, TestNamedWriteable::new);
int size = between(0, 100);
List<BaseNamedWriteable> expected = new ArrayList<>(size);
for (int i = 0; i < size; i++) {
expected.add(new TestNamedWriteable(randomAsciiOfLengthBetween(1, 10), randomAsciiOfLengthBetween(1, 10)));
}
try (BytesStreamOutput out = new BytesStreamOutput()) {
out.writeNamedWriteableList(expected);
try (StreamInput in = new NamedWriteableAwareStreamInput(out.bytes().streamInput(), namedWriteableRegistry)) {
assertEquals(expected, in.readNamedWriteableList(BaseNamedWriteable.class));
assertEquals(0, in.available());
}
}
}
public void testNamedWriteableDuplicates() throws IOException {


@@ -54,6 +54,7 @@ public class InternalMatrixStats extends InternalMetricsAggregation implements MatrixStats
* Read from a stream.
*/
public InternalMatrixStats(StreamInput in) throws IOException {
super(in);
stats = in.readOptionalWriteable(RunningStats::new);
results = in.readOptionalWriteable(MatrixStatsResults::new);
}