Try to save memory on aggregations (backport of #53793) (#53996)

This delays deserializing the aggregation responses until *right*
before we merge the objects.
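
For illustration, here is a minimal, self-contained Java sketch of the pattern this commit applies (the names are illustrative stand-ins, not the actual Elasticsearch classes): each shard response is kept in serialized form behind a Supplier and expanded only right before the merge, with each slot nulled as it is expanded so the backing bytes can be garbage collected immediately.

import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

class DelayedMergeSketch {
    // Stand-in for the real reduce; the commit's version is
    // InternalAggregations.topLevelReduce.
    static String merge(List<String> expanded) {
        return String.join("+", expanded);
    }

    static String mergeDelayed(List<Supplier<String>> delayed) {
        List<String> toReduce = new ArrayList<>(delayed.size());
        for (int i = 0; i < delayed.size(); i++) {
            toReduce.add(delayed.get(i).get()); // deserialize right before merging
            delayed.set(i, null);               // release the delayed holder for GC
        }
        return merge(toReduce);
    }
}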
Nik Everett 2020-03-23 15:45:22 -04:00 committed by GitHub
parent 80c24a0d62
commit 181bc807be
8 changed files with 440 additions and 65 deletions

View File

@@ -19,8 +19,17 @@
package org.elasticsearch.action.search;
import com.carrotsearch.hppc.IntArrayList;
import com.carrotsearch.hppc.ObjectObjectHashMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.function.IntFunction;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.CollectionStatistics;
@@ -58,16 +67,8 @@ import org.elasticsearch.search.suggest.Suggest;
import org.elasticsearch.search.suggest.Suggest.Suggestion;
import org.elasticsearch.search.suggest.completion.CompletionSuggestion;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.function.IntFunction;
import java.util.stream.Collectors;
import com.carrotsearch.hppc.IntArrayList;
import com.carrotsearch.hppc.ObjectObjectHashMap;
public final class SearchPhaseController {
private static final ScoreDoc[] EMPTY_DOCS = new ScoreDoc[0];
@@ -429,7 +430,7 @@ public final class SearchPhaseController {
* @see QuerySearchResult#consumeProfileResult()
*/
private ReducedQueryPhase reducedQueryPhase(Collection<? extends SearchPhaseResult> queryResults,
List<InternalAggregations> bufferedAggs, List<TopDocs> bufferedTopDocs,
List<Supplier<InternalAggregations>> bufferedAggs, List<TopDocs> bufferedTopDocs,
TopDocsStats topDocsStats, int numReducePhases, boolean isScrollRequest,
InternalAggregation.ReduceContextBuilder aggReduceContextBuilder,
boolean performFinalReduce) {
@@ -453,7 +454,7 @@
final boolean hasSuggest = firstResult.suggest() != null;
final boolean hasProfileResults = firstResult.hasProfileResults();
final boolean consumeAggs;
final List<InternalAggregations> aggregationsList;
final List<Supplier<InternalAggregations>> aggregationsList;
if (bufferedAggs != null) {
consumeAggs = false;
// we already have results from intermediate reduces and just need to perform the final reduce
@@ -492,7 +493,7 @@
}
}
if (consumeAggs) {
aggregationsList.add((InternalAggregations) result.consumeAggs());
aggregationsList.add(result.consumeAggs());
}
if (hasProfileResults) {
String key = result.getSearchShardTarget().toString();
@@ -508,8 +509,7 @@
reducedSuggest = new Suggest(Suggest.reduce(groupedSuggestions));
reducedCompletionSuggestions = reducedSuggest.filter(CompletionSuggestion.class);
}
final InternalAggregations aggregations = aggregationsList.isEmpty() ? null : InternalAggregations.topLevelReduce(aggregationsList,
performFinalReduce ? aggReduceContextBuilder.forFinalReduction() : aggReduceContextBuilder.forPartialReduction());
final InternalAggregations aggregations = reduceAggs(aggReduceContextBuilder, performFinalReduce, aggregationsList);
final SearchProfileShardResults shardResults = profileResults.isEmpty() ? null : new SearchProfileShardResults(profileResults);
final SortedTopDocs sortedTopDocs = sortDocs(isScrollRequest, queryResults, bufferedTopDocs, topDocsStats, from, size,
reducedCompletionSuggestions);
@@ -519,6 +519,24 @@
firstResult.sortValueFormats(), numReducePhases, size, from, false);
}
private InternalAggregations reduceAggs(
InternalAggregation.ReduceContextBuilder aggReduceContextBuilder,
boolean performFinalReduce,
List<Supplier<InternalAggregations>> aggregationsList
) {
/*
 * Parse the aggregations, clearing the list as we go so the bytes backing
 * each DelayableWriteable can be collected immediately.
 */
List<InternalAggregations> toReduce = new ArrayList<>(aggregationsList.size());
for (int i = 0; i < aggregationsList.size(); i++) {
toReduce.add(aggregationsList.get(i).get());
aggregationsList.set(i, null);
}
return aggregationsList.isEmpty() ? null : InternalAggregations.topLevelReduce(toReduce,
performFinalReduce ? aggReduceContextBuilder.forFinalReduction() : aggReduceContextBuilder.forPartialReduction());
}
/*
* Returns the size of the requested top documents (from + size)
*/
@@ -600,7 +618,7 @@
*/
static final class QueryPhaseResultConsumer extends ArraySearchPhaseResults<SearchPhaseResult> {
private final SearchShardTarget[] processedShards;
private final InternalAggregations[] aggsBuffer;
private final Supplier<InternalAggregations>[] aggsBuffer;
private final TopDocs[] topDocsBuffer;
private final boolean hasAggs;
private final boolean hasTopDocs;
@@ -642,7 +660,9 @@
this.progressListener = progressListener;
this.processedShards = new SearchShardTarget[expectedResultSize];
// No need to buffer anything if we expect fewer results; in this case we don't consume any results ahead of time.
this.aggsBuffer = new InternalAggregations[hasAggs ? bufferSize : 0];
@SuppressWarnings("unchecked")
Supplier<InternalAggregations>[] aggsBuffer = new Supplier[hasAggs ? bufferSize : 0];
this.aggsBuffer = aggsBuffer;
this.topDocsBuffer = new TopDocs[hasTopDocs ? bufferSize : 0];
this.hasTopDocs = hasTopDocs;
this.hasAggs = hasAggs;
@@ -665,10 +685,14 @@
if (querySearchResult.isNull() == false) {
if (index == bufferSize) {
if (hasAggs) {
ReduceContext reduceContext = aggReduceContextBuilder.forPartialReduction();
InternalAggregations reducedAggs = InternalAggregations.topLevelReduce(Arrays.asList(aggsBuffer), reduceContext);
Arrays.fill(aggsBuffer, null);
aggsBuffer[0] = reducedAggs;
List<InternalAggregations> aggs = new ArrayList<>(aggsBuffer.length);
for (int i = 0; i < aggsBuffer.length; i++) {
aggs.add(aggsBuffer[i].get());
aggsBuffer[i] = null; // null the buffer so it can be GCed now.
}
InternalAggregations reducedAggs = InternalAggregations.topLevelReduce(
aggs, aggReduceContextBuilder.forPartialReduction());
aggsBuffer[0] = () -> reducedAggs;
}
if (hasTopDocs) {
TopDocs reducedTopDocs = mergeTopDocs(Arrays.asList(topDocsBuffer),
@@ -681,12 +705,12 @@
index = 1;
if (hasAggs || hasTopDocs) {
progressListener.notifyPartialReduce(SearchProgressListener.buildSearchShards(processedShards),
topDocsStats.getTotalHits(), hasAggs ? aggsBuffer[0] : null, numReducePhases);
topDocsStats.getTotalHits(), hasAggs ? aggsBuffer[0].get() : null, numReducePhases);
}
}
final int i = index++;
if (hasAggs) {
aggsBuffer[i] = (InternalAggregations) querySearchResult.consumeAggs();
aggsBuffer[i] = querySearchResult.consumeAggs();
}
if (hasTopDocs) {
final TopDocsAndMaxScore topDocs = querySearchResult.consumeTopDocs(); // can't be null
@@ -698,7 +722,7 @@
processedShards[querySearchResult.getShardIndex()] = querySearchResult.getSearchShardTarget();
}
private synchronized List<InternalAggregations> getRemainingAggs() {
private synchronized List<Supplier<InternalAggregations>> getRemainingAggs() {
return hasAggs ? Arrays.asList(aggsBuffer).subList(0, index) : null;
}
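
To make the buffering logic above easier to follow, here is a simplified, self-contained sketch of the rolling buffer in QueryPhaseResultConsumer, with String standing in for InternalAggregations (illustrative only, not the real class):

import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

class RollingBufferSketch {
    private final Supplier<String>[] buffer;
    private int index;

    @SuppressWarnings("unchecked")
    RollingBufferSketch(int bufferSize) {
        // Java forbids generic array creation, hence the raw Supplier[] and
        // unchecked cast, mirroring the aggsBuffer construction above.
        buffer = new Supplier[bufferSize];
    }

    void consume(Supplier<String> next) {
        if (index == buffer.length) {
            List<String> expanded = new ArrayList<>(buffer.length);
            for (int i = 0; i < buffer.length; i++) {
                expanded.add(buffer[i].get()); // expand only for this partial reduce
                buffer[i] = null;              // drop the delayed bytes immediately
            }
            String reduced = String.join("+", expanded); // stand-in for topLevelReduce
            buffer[0] = () -> reduced; // re-wrap so slot 0 is a Supplier again
            index = 1;
        }
        buffer[index++] = next;
    }
}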

View File

@@ -0,0 +1,131 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.io.stream;
import org.elasticsearch.Version;
import org.elasticsearch.common.bytes.BytesReference;
import java.io.IOException;
import java.util.function.Supplier;
/**
* A holder for {@link Writeable}s that can delay reading the underlying
* {@linkplain Writeable} when it was read from a remote node.
*/
public abstract class DelayableWriteable<T extends Writeable> implements Supplier<T>, Writeable {
/**
* Build a {@linkplain DelayableWriteable} that wraps an existing object
* but is serialized so that deserializing it can be delayed.
*/
public static <T extends Writeable> DelayableWriteable<T> referencing(T reference) {
return new Referencing<>(reference);
}
/**
* Build a {@linkplain DelayableWriteable} that copies a buffer from
* the provided {@linkplain StreamInput} and deserializes the buffer
* when {@link Supplier#get()} is called.
*/
public static <T extends Writeable> DelayableWriteable<T> delayed(Writeable.Reader<T> reader, StreamInput in) throws IOException {
return new Delayed<>(reader, in);
}
private DelayableWriteable() {}
public abstract boolean isDelayed();
private static class Referencing<T extends Writeable> extends DelayableWriteable<T> {
private T reference;
Referencing(T reference) {
this.reference = reference;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
try (BytesStreamOutput buffer = new BytesStreamOutput()) {
reference.writeTo(buffer);
out.writeBytesReference(buffer.bytes());
}
}
@Override
public T get() {
return reference;
}
@Override
public boolean isDelayed() {
return false;
}
}
private static class Delayed<T extends Writeable> extends DelayableWriteable<T> {
private final Writeable.Reader<T> reader;
private final Version remoteVersion;
private final BytesReference serialized;
private final NamedWriteableRegistry registry;
Delayed(Writeable.Reader<T> reader, StreamInput in) throws IOException {
this.reader = reader;
remoteVersion = in.getVersion();
serialized = in.readBytesReference();
registry = in.namedWriteableRegistry();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
if (out.getVersion() == remoteVersion) {
/*
* If the version *does* line up we can just copy the bytes
* which is good because this is how shard request caching
* works.
*/
out.writeBytesReference(serialized);
} else {
/*
* If the version doesn't line up then we have to deserialize
* into the Writeable and re-serialize it against the new
* output stream so it can apply any backwards compatibility
* differences in the wire protocol. This ain't efficient but
* it should be quite rare.
*/
referencing(get()).writeTo(out);
}
}
@Override
public T get() {
try {
try (StreamInput in = registry == null ?
serialized.streamInput() : new NamedWriteableAwareStreamInput(serialized.streamInput(), registry)) {
in.setVersion(remoteVersion);
return reader.read(in);
}
} catch (IOException e) {
throw new RuntimeException("unexpected error expanding aggregations", e);
}
}
@Override
public boolean isDelayed() {
return true;
}
}
}
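
A usage sketch of the class above, mirroring the round trip in DelayableWriteableTests further down (assumptions: a BytesStreamOutput/streamInput round trip stands in for real transport, and payloads containing NamedWriteables would need the input wrapped in a NamedWriteableAwareStreamInput so namedWriteableRegistry() is non-null):

static <T extends Writeable> T sendAndExpandLater(T value, Writeable.Reader<T> reader) throws IOException {
    // Sender: wrap the live object; writeTo emits it as one bytes reference.
    BytesStreamOutput out = new BytesStreamOutput();
    DelayableWriteable.referencing(value).writeTo(out);

    // Receiver: capture the serialized bytes without expanding them.
    DelayableWriteable<T> delayed = DelayableWriteable.delayed(reader, out.bytes().streamInput());

    // Pay the deserialization cost only when the value is actually needed.
    return delayed.get();
}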

View File

@@ -94,4 +94,9 @@ public abstract class FilterStreamInput extends StreamInput {
protected void ensureCanReadBytes(int length) throws EOFException {
delegate.ensureCanReadBytes(length);
}
@Override
public NamedWriteableRegistry namedWriteableRegistry() {
return delegate.namedWriteableRegistry();
}
}

View File

@@ -52,4 +52,9 @@ public class NamedWriteableAwareStreamInput extends FilterStreamInput {
+ "] than it was read from [" + name + "].";
return c;
}
@Override
public NamedWriteableRegistry namedWriteableRegistry() {
return namedWriteableRegistry;
}
}

View File

@@ -1097,6 +1097,14 @@ public abstract class StreamInput extends InputStream {
return null;
}
/**
* Get the registry of named writeables if this stream has one,
* {@code null} otherwise.
*/
public NamedWriteableRegistry namedWriteableRegistry() {
return null;
}
/**
* Reads a {@link NamedWriteable} from the current stream, by first reading its name and then looking for
* the corresponding entry in the registry by name, so that the proper object can be read and returned.

View File

@@ -19,16 +19,24 @@
package org.elasticsearch.search.query;
import static java.util.Collections.emptyList;
import static java.util.stream.Collectors.toList;
import static org.elasticsearch.common.lucene.Lucene.readTopDocs;
import static org.elasticsearch.common.lucene.Lucene.writeTopDocs;
import java.io.IOException;
import java.util.List;
import org.apache.lucene.search.FieldDoc;
import org.apache.lucene.search.TotalHits;
import org.elasticsearch.Version;
import org.elasticsearch.common.io.stream.DelayableWriteable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.lucene.search.TopDocsAndMaxScore;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.SearchPhaseResult;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
@@ -37,14 +45,6 @@ import org.elasticsearch.search.internal.SearchContextId;
import org.elasticsearch.search.profile.ProfileShardResult;
import org.elasticsearch.search.suggest.Suggest;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import static org.elasticsearch.common.lucene.Lucene.readTopDocs;
import static org.elasticsearch.common.lucene.Lucene.writeTopDocs;
public final class QuerySearchResult extends SearchPhaseResult {
private int from;
@@ -54,7 +54,14 @@ public final class QuerySearchResult extends SearchPhaseResult {
private TotalHits totalHits;
private float maxScore = Float.NaN;
private DocValueFormat[] sortValueFormats;
private InternalAggregations aggregations;
/**
* Aggregation results. We wrap them in
* {@linkplain DelayableWriteable} because
* {@link InternalAggregation} is usually made up of many small objects
* which have a fairly high overhead in the JVM. So we delay deserializing
* them until just before we need them.
*/
private DelayableWriteable<InternalAggregations> aggregations;
private boolean hasAggs;
private Suggest suggest;
private boolean searchTimedOut;
@@ -196,21 +203,21 @@
* Returns and nulls out the aggregation for this search results. This allows to free up memory once the aggregation is consumed.
* @throws IllegalStateException if the aggregations have already been consumed.
*/
public Aggregations consumeAggs() {
public DelayableWriteable<InternalAggregations> consumeAggs() {
if (aggregations == null) {
throw new IllegalStateException("aggs already consumed");
}
Aggregations aggs = aggregations;
DelayableWriteable<InternalAggregations> aggs = aggregations;
aggregations = null;
return aggs;
}
public void aggregations(InternalAggregations aggregations) {
this.aggregations = aggregations;
this.aggregations = aggregations == null ? null : DelayableWriteable.referencing(aggregations);
hasAggs = aggregations != null;
}
public InternalAggregations aggregations() {
public DelayableWriteable<InternalAggregations> aggregations() {
return aggregations;
}
@@ -313,19 +320,32 @@
}
}
setTopDocs(readTopDocs(in));
if (hasAggs = in.readBoolean()) {
aggregations = new InternalAggregations(in);
}
if (in.getVersion().before(Version.V_7_2_0)) {
List<SiblingPipelineAggregator> pipelineAggregators = in.readNamedWriteableList(PipelineAggregator.class).stream()
.map(a -> (SiblingPipelineAggregator) a).collect(Collectors.toList());
if (hasAggs && pipelineAggregators.isEmpty() == false) {
List<InternalAggregation> internalAggs = aggregations.asList().stream()
.map(agg -> (InternalAggregation) agg).collect(Collectors.toList());
//Earlier versions serialize sibling pipeline aggs separately as they used to be set to QuerySearchResult directly, while
//later versions include them in InternalAggregations. Note that despite serializing sibling pipeline aggs as part of
//InternalAggregations is supported since 6.7.0, the shards set sibling pipeline aggs to InternalAggregations only from 7.1.
this.aggregations = new InternalAggregations(internalAggs, pipelineAggregators);
if (in.getVersion().before(Version.V_7_7_0)) {
InternalAggregations readAggs = null;
if (hasAggs = in.readBoolean()) {
readAggs = new InternalAggregations(in);
}
if (in.getVersion().before(Version.V_7_2_0)) {
List<SiblingPipelineAggregator> pipelineAggregators = in.readNamedWriteableList(PipelineAggregator.class).stream()
.map(a -> (SiblingPipelineAggregator) a).collect(toList());
if (hasAggs && pipelineAggregators.isEmpty() == false) {
List<InternalAggregation> internalAggs = readAggs.copyResults();
/*
* Earlier versions serialize sibling pipeline aggs
* separately as they used to be set to QuerySearchResult
* directly, while later versions include them in
* InternalAggregations. Note that although serializing
* sibling pipeline aggs as part of InternalAggregations has
* been supported since 6.7.0, the shards set sibling pipeline
* aggs to InternalAggregations only from 7.1.
*/
readAggs = new InternalAggregations(internalAggs, pipelineAggregators);
}
}
aggregations = DelayableWriteable.referencing(readAggs);
} else {
if (hasAggs = in.readBoolean()) {
aggregations = DelayableWriteable.delayed(InternalAggregations::new, in);
}
}
if (in.readBoolean()) {
@@ -369,18 +389,37 @@
writeTopDocs(out, topDocsAndMaxScore);
if (aggregations == null) {
out.writeBoolean(false);
if (out.getVersion().before(Version.V_7_2_0)) {
/*
* Earlier versions expect sibling pipeline aggs separately
* as they used to be set to QuerySearchResult directly, while
* later versions expect them in InternalAggregations. Note
* that although serializing sibling pipeline aggs as part of
* InternalAggregations has been supported since 6.7.0, the shards
* set sibling pipeline aggs to InternalAggregations only from
* 7.1 on.
*/
out.writeNamedWriteableList(emptyList());
}
} else {
out.writeBoolean(true);
aggregations.writeTo(out);
}
if (out.getVersion().before(Version.V_7_2_0)) {
//Earlier versions expect sibling pipeline aggs separately as they used to be set to QuerySearchResult directly,
//while later versions expect them in InternalAggregations. Note that despite serializing sibling pipeline aggs as part of
//InternalAggregations is supported since 6.7.0, the shards set sibling pipeline aggs to InternalAggregations only from 7.1 on.
if (aggregations == null) {
out.writeNamedWriteableList(Collections.emptyList());
if (out.getVersion().before(Version.V_7_7_0)) {
InternalAggregations aggs = aggregations.get();
aggs.writeTo(out);
if (out.getVersion().before(Version.V_7_2_0)) {
/*
* Earlier versions expect sibling pipeline aggs separately
* as they used to be set to QuerySearchResult directly, while
* later versions expect them in InternalAggregations. Note
* that although serializing sibling pipeline aggs as part of
* InternalAggregations has been supported since 6.7.0, the shards
* set sibling pipeline aggs to InternalAggregations only from
* 7.1 on.
*/
out.writeNamedWriteableList(aggs.getTopLevelPipelineAggregators());
}
} else {
out.writeNamedWriteableList(aggregations.getTopLevelPipelineAggregators());
aggregations.writeTo(out);
}
}
if (suggest == null) {

View File

@@ -0,0 +1,163 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.io.stream;
import org.elasticsearch.Version;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.VersionUtils;
import java.io.IOException;
import static java.util.Collections.singletonList;
import static org.hamcrest.Matchers.equalTo;
public class DelayableWriteableTests extends ESTestCase {
// NOTE: we don't use AbstractWireSerializingTestCase because DelayableWriteable doesn't implement equals and hashCode.
public static class Example implements NamedWriteable {
private final String s;
public Example(String s) {
this.s = s;
}
public Example(StreamInput in) throws IOException {
s = in.readString();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(s);
}
@Override
public String getWriteableName() {
return "example";
}
@Override
public boolean equals(Object obj) {
if (obj == null || getClass() != obj.getClass()) {
return false;
}
Example other = (Example) obj;
return s.equals(other.s);
}
@Override
public int hashCode() {
return s.hashCode();
}
}
public static class NamedHolder implements Writeable {
private final Example e;
public NamedHolder(Example e) {
this.e = e;
}
public NamedHolder(StreamInput in) throws IOException {
e = in.readNamedWriteable(Example.class);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeNamedWriteable(e);
}
@Override
public boolean equals(Object obj) {
if (obj == null || getClass() != obj.getClass()) {
return false;
}
NamedHolder other = (NamedHolder) obj;
return e.equals(other.e);
}
@Override
public int hashCode() {
return e.hashCode();
}
}
public void testRoundTripFromReferencing() throws IOException {
Example e = new Example(randomAlphaOfLength(5));
DelayableWriteable<Example> original = DelayableWriteable.referencing(e);
assertFalse(original.isDelayed());
roundTripTestCase(original, Example::new);
}
public void testRoundTripFromReferencingWithNamedWriteable() throws IOException {
NamedHolder n = new NamedHolder(new Example(randomAlphaOfLength(5)));
DelayableWriteable<NamedHolder> original = DelayableWriteable.referencing(n);
assertFalse(original.isDelayed());
roundTripTestCase(original, NamedHolder::new);
}
public void testRoundTripFromDelayed() throws IOException {
Example e = new Example(randomAlphaOfLength(5));
DelayableWriteable<Example> original = roundTrip(DelayableWriteable.referencing(e), Example::new, Version.CURRENT);
assertTrue(original.isDelayed());
roundTripTestCase(original, Example::new);
}
public void testRoundTripFromDelayedWithNamedWriteable() throws IOException {
NamedHolder n = new NamedHolder(new Example(randomAlphaOfLength(5)));
DelayableWriteable<NamedHolder> original = roundTrip(DelayableWriteable.referencing(n), NamedHolder::new, Version.CURRENT);
assertTrue(original.isDelayed());
roundTripTestCase(original, NamedHolder::new);
}
public void testRoundTripFromDelayedFromOldVersion() throws IOException {
Example e = new Example(randomAlphaOfLength(5));
DelayableWriteable<Example> original = roundTrip(DelayableWriteable.referencing(e), Example::new, randomOldVersion());
assertTrue(original.isDelayed());
roundTripTestCase(original, Example::new);
}
public void testRoundTripFromDelayedFromOldVersionWithNamedWriteable() throws IOException {
NamedHolder n = new NamedHolder(new Example(randomAlphaOfLength(5)));
DelayableWriteable<NamedHolder> original = roundTrip(DelayableWriteable.referencing(n), NamedHolder::new, randomOldVersion());
assertTrue(original.isDelayed());
roundTripTestCase(original, NamedHolder::new);
}
private <T extends Writeable> void roundTripTestCase(DelayableWriteable<T> original, Writeable.Reader<T> reader) throws IOException {
DelayableWriteable<T> roundTripped = roundTrip(original, reader, Version.CURRENT);
assertTrue(roundTripped.isDelayed());
assertThat(roundTripped.get(), equalTo(original.get()));
}
private <T extends Writeable> DelayableWriteable<T> roundTrip(DelayableWriteable<T> original,
Writeable.Reader<T> reader, Version version) throws IOException {
return copyInstance(original, writableRegistry(), (out, d) -> d.writeTo(out),
in -> DelayableWriteable.delayed(reader, in), version);
}
@Override
protected NamedWriteableRegistry writableRegistry() {
return new NamedWriteableRegistry(singletonList(
new NamedWriteableRegistry.Entry(Example.class, "example", Example::new)));
}
private static Version randomOldVersion() {
return randomValueOtherThanMany(Version.CURRENT::before, () -> VersionUtils.randomCompatibleVersion(random(), Version.CURRENT));
}
}

View File

@@ -89,8 +89,8 @@ public class QuerySearchResultTests extends ESTestCase {
assertEquals(querySearchResult.size(), deserialized.size());
assertEquals(querySearchResult.hasAggs(), deserialized.hasAggs());
if (deserialized.hasAggs()) {
Aggregations aggs = querySearchResult.consumeAggs();
Aggregations deserializedAggs = deserialized.consumeAggs();
Aggregations aggs = querySearchResult.consumeAggs().get();
Aggregations deserializedAggs = deserialized.consumeAggs().get();
assertEquals(aggs.asList(), deserializedAggs.asList());
List<SiblingPipelineAggregator> pipelineAggs = ((InternalAggregations) aggs).getTopLevelPipelineAggregators();
List<SiblingPipelineAggregator> deserializedPipelineAggs =
@@ -126,7 +126,7 @@
QuerySearchResult querySearchResult = new QuerySearchResult(in);
assertEquals(100, querySearchResult.getContextId().getId());
assertTrue(querySearchResult.hasAggs());
InternalAggregations aggs = (InternalAggregations) querySearchResult.consumeAggs();
InternalAggregations aggs = querySearchResult.consumeAggs().get();
assertEquals(1, aggs.asList().size());
//top-level pipeline aggs are retrieved as part of InternalAggregations although they were serialized separately
assertEquals(1, aggs.getTopLevelPipelineAggregators().size());