From fc6422ffccfeba2b4ef422d15239b26388c723d6 Mon Sep 17 00:00:00 2001 From: Luca Cavanna Date: Thu, 30 Apr 2020 16:28:47 +0200 Subject: [PATCH] Consolidate DelayableWriteable (#55932) This commit includes a number of minor improvements around `DelayableWriteable`: javadocs were expanded and reworded, `get` was renamed to `expand` and `DelayableWriteable` no longer implements `Supplier`. Also a couple of methods are now private instead of package private. --- .../action/search/SearchPhaseController.java | 17 +++---- .../common/io/stream/DelayableWriteable.java | 50 +++++++++++++------ .../common/io/stream/StreamInput.java | 2 +- .../search/query/QuerySearchResult.java | 2 +- .../io/stream/DelayableWriteableTests.java | 4 +- .../search/query/QuerySearchResultTests.java | 6 +-- .../xpack/search/AsyncSearchTask.java | 16 +++--- .../xpack/search/MutableSearchResponse.java | 14 +++--- 8 files changed, 63 insertions(+), 48 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java b/server/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java index d5b65f887b5..b6f68d8e4ee 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java @@ -71,7 +71,6 @@ import java.util.List; import java.util.Map; import java.util.function.Function; import java.util.function.IntFunction; -import java.util.function.Supplier; import java.util.stream.Collectors; public final class SearchPhaseController { @@ -437,7 +436,7 @@ public final class SearchPhaseController { * @see QuerySearchResult#consumeProfileResult() */ private ReducedQueryPhase reducedQueryPhase(Collection queryResults, - List> bufferedAggs, + List> bufferedAggs, List bufferedTopDocs, TopDocsStats topDocsStats, int numReducePhases, boolean isScrollRequest, InternalAggregation.ReduceContextBuilder aggReduceContextBuilder, @@ -462,7 +461,7 @@ public 
final class SearchPhaseController { final boolean hasSuggest = firstResult.suggest() != null; final boolean hasProfileResults = firstResult.hasProfileResults(); final boolean consumeAggs; - final List> aggregationsList; + final List> aggregationsList; if (bufferedAggs != null) { consumeAggs = false; // we already have results from intermediate reduces and just need to perform the final reduce @@ -527,10 +526,10 @@ public final class SearchPhaseController { firstResult.sortValueFormats(), numReducePhases, size, from, false); } - private InternalAggregations reduceAggs( + private static InternalAggregations reduceAggs( InternalAggregation.ReduceContextBuilder aggReduceContextBuilder, boolean performFinalReduce, - List> aggregationsList + List> aggregationsList ) { /* * Parse the aggregations, clearing the list as we go so bits backing @@ -538,7 +537,7 @@ public final class SearchPhaseController { */ List toReduce = new ArrayList<>(aggregationsList.size()); for (int i = 0; i < aggregationsList.size(); i++) { - toReduce.add(aggregationsList.get(i).get()); + toReduce.add(aggregationsList.get(i).expand()); aggregationsList.set(i, null); } return aggregationsList.isEmpty() ? null : InternalAggregations.topLevelReduce(toReduce, @@ -701,7 +700,7 @@ public final class SearchPhaseController { if (hasAggs) { List aggs = new ArrayList<>(aggsBuffer.length); for (int i = 0; i < aggsBuffer.length; i++) { - aggs.add(aggsBuffer[i].get()); + aggs.add(aggsBuffer[i].expand()); aggsBuffer[i] = null; // null the buffer so it can be GCed now. } InternalAggregations reduced = @@ -743,8 +742,8 @@ public final class SearchPhaseController { processedShards[querySearchResult.getShardIndex()] = querySearchResult.getSearchShardTarget(); } - private synchronized List> getRemainingAggs() { - return hasAggs ? Arrays.asList((Supplier[]) aggsBuffer).subList(0, index) : null; + private synchronized List> getRemainingAggs() { + return hasAggs ? 
Arrays.asList((DelayableWriteable[]) aggsBuffer).subList(0, index) : null; } private synchronized List getRemainingTopDocs() { diff --git a/server/src/main/java/org/elasticsearch/common/io/stream/DelayableWriteable.java b/server/src/main/java/org/elasticsearch/common/io/stream/DelayableWriteable.java index 201c7edead6..1e58f12985c 100644 --- a/server/src/main/java/org/elasticsearch/common/io/stream/DelayableWriteable.java +++ b/server/src/main/java/org/elasticsearch/common/io/stream/DelayableWriteable.java @@ -25,13 +25,26 @@ import org.elasticsearch.Version; import org.elasticsearch.common.bytes.BytesReference; import java.io.IOException; -import java.util.function.Supplier; /** - * A holder for {@link Writeable}s that can delays reading the underlying - * {@linkplain Writeable} when it is read from a remote node. + * A holder for {@link Writeable}s that delays reading the underlying object + * on the receiving end. To be used for objects whose deserialized + * representation is inefficient to keep in memory compared to their + * corresponding serialized representation. + * The node that produces the {@link Writeable} calls {@link #referencing(Writeable)} + * to create a {@link DelayableWriteable} that serializes the inner object + * first to a buffer and writes the content of the buffer to the {@link StreamOutput}. + * The receiver node calls {@link #delayed(Reader, StreamInput)} to create a + * {@link DelayableWriteable} that reads the buffer from the {@link StreamInput} + * but delays creating the actual object until {@link #expand()} is called. + * Multiple {@link DelayableWriteable}s coming from different nodes may be buffered + * on the receiver end, which may hold a mix of {@link DelayableWriteable}s that were + * produced locally (hence expanded) as well as received from another node (hence subject + * to delayed expansion).
When such objects are buffered for some time it may be desirable + * to force their buffering in serialized format by calling + * {@link #asSerialized(Reader, NamedWriteableRegistry)}. */ -public abstract class DelayableWriteable implements Supplier, Writeable { +public abstract class DelayableWriteable implements Writeable { /** * Build a {@linkplain DelayableWriteable} that wraps an existing object * but is serialized so that deserializing it can be delayed. @@ -42,7 +55,7 @@ public abstract class DelayableWriteable implements Supplie /** * Build a {@linkplain DelayableWriteable} that copies a buffer from * the provided {@linkplain StreamInput} and deserializes the buffer - * when {@link Supplier#get()} is called. + * when {@link #expand()} is called. */ public static DelayableWriteable delayed(Writeable.Reader reader, StreamInput in) throws IOException { return new Serialized<>(reader, in.getVersion(), in.namedWriteableRegistry(), in.readBytesReference()); @@ -56,6 +69,11 @@ public abstract class DelayableWriteable implements Supplie */ public abstract Serialized asSerialized(Writeable.Reader reader, NamedWriteableRegistry registry); + /** + * Expands the inner {@link Writeable} to its original representation and returns it + */ + public abstract T expand(); + /** * {@code true} if the {@linkplain Writeable} is being stored in * serialized form, {@code false} otherwise. 
@@ -63,9 +81,9 @@ public abstract class DelayableWriteable implements Supplie abstract boolean isSerialized(); private static class Referencing extends DelayableWriteable { - private T reference; + private final T reference; - Referencing(T reference) { + private Referencing(T reference) { this.reference = reference; } @@ -75,17 +93,19 @@ public abstract class DelayableWriteable implements Supplie } @Override - public T get() { + public T expand() { return reference; } @Override public Serialized asSerialized(Reader reader, NamedWriteableRegistry registry) { + BytesStreamOutput buffer; try { - return new Serialized(reader, Version.CURRENT, registry, writeToBuffer(Version.CURRENT).bytes()); + buffer = writeToBuffer(Version.CURRENT); } catch (IOException e) { - throw new RuntimeException("unexpected error expanding aggregations", e); + throw new RuntimeException("unexpected error writing writeable to buffer", e); } + return new Serialized<>(reader, Version.CURRENT, registry, buffer.bytes()); } @Override @@ -111,8 +131,8 @@ public abstract class DelayableWriteable implements Supplie private final NamedWriteableRegistry registry; private final BytesReference serialized; - Serialized(Writeable.Reader reader, Version serializedAtVersion, - NamedWriteableRegistry registry, BytesReference serialized) throws IOException { + private Serialized(Writeable.Reader reader, Version serializedAtVersion, + NamedWriteableRegistry registry, BytesReference serialized) { this.reader = reader; this.serializedAtVersion = serializedAtVersion; this.registry = registry; @@ -136,12 +156,12 @@ public abstract class DelayableWriteable implements Supplie * differences in the wire protocol. This ain't efficient but * it should be quite rare. */ - referencing(get()).writeTo(out); + referencing(expand()).writeTo(out); } } @Override - public T get() { + public T expand() { try { try (StreamInput in = registry == null ? 
serialized.streamInput() : new NamedWriteableAwareStreamInput(serialized.streamInput(), registry)) { @@ -149,7 +169,7 @@ public abstract class DelayableWriteable implements Supplie return reader.read(in); } } catch (IOException e) { - throw new RuntimeException("unexpected error expanding aggregations", e); + throw new RuntimeException("unexpected error expanding serialized delayed writeable", e); } } diff --git a/server/src/main/java/org/elasticsearch/common/io/stream/StreamInput.java b/server/src/main/java/org/elasticsearch/common/io/stream/StreamInput.java index e88188fe8c2..a52a0ec9377 100644 --- a/server/src/main/java/org/elasticsearch/common/io/stream/StreamInput.java +++ b/server/src/main/java/org/elasticsearch/common/io/stream/StreamInput.java @@ -1103,7 +1103,7 @@ public abstract class StreamInput extends InputStream { } /** - * Get the registry of named writeables is his stream has one, + * Get the registry of named writeables if this stream has one, * {@code null} otherwise. */ public NamedWriteableRegistry namedWriteableRegistry() { diff --git a/server/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java b/server/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java index 8b7ebdb6907..e1d1d702aad 100644 --- a/server/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java +++ b/server/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java @@ -386,7 +386,7 @@ public final class QuerySearchResult extends SearchPhaseResult { } else { out.writeBoolean(true); if (out.getVersion().before(Version.V_7_7_0)) { - InternalAggregations aggs = aggregations.get(); + InternalAggregations aggs = aggregations.expand(); aggs.writeTo(out); if (out.getVersion().before(Version.V_7_2_0)) { /* diff --git a/server/src/test/java/org/elasticsearch/common/io/stream/DelayableWriteableTests.java b/server/src/test/java/org/elasticsearch/common/io/stream/DelayableWriteableTests.java index ed5a7b8d6fb..a88616e8ee7 100644 --- 
a/server/src/test/java/org/elasticsearch/common/io/stream/DelayableWriteableTests.java +++ b/server/src/test/java/org/elasticsearch/common/io/stream/DelayableWriteableTests.java @@ -159,7 +159,7 @@ public class DelayableWriteableTests extends ESTestCase { public void testSerializesWithRemoteVersion() throws IOException { Version remoteVersion = VersionUtils.randomCompatibleVersion(random(), Version.CURRENT); DelayableWriteable original = DelayableWriteable.referencing(new SneakOtherSideVersionOnWire()); - assertThat(roundTrip(original, SneakOtherSideVersionOnWire::new, remoteVersion).get().version, equalTo(remoteVersion)); + assertThat(roundTrip(original, SneakOtherSideVersionOnWire::new, remoteVersion).expand().version, equalTo(remoteVersion)); } public void testAsSerializedIsNoopOnSerialized() throws IOException { @@ -172,7 +172,7 @@ public class DelayableWriteableTests extends ESTestCase { private void roundTripTestCase(DelayableWriteable original, Writeable.Reader reader) throws IOException { DelayableWriteable roundTripped = roundTrip(original, reader, Version.CURRENT); assertTrue(roundTripped.isSerialized()); - assertThat(roundTripped.get(), equalTo(original.get())); + assertThat(roundTripped.expand(), equalTo(original.expand())); } private DelayableWriteable roundTrip(DelayableWriteable original, diff --git a/server/src/test/java/org/elasticsearch/search/query/QuerySearchResultTests.java b/server/src/test/java/org/elasticsearch/search/query/QuerySearchResultTests.java index 730e7ae30ac..20dab6ca289 100644 --- a/server/src/test/java/org/elasticsearch/search/query/QuerySearchResultTests.java +++ b/server/src/test/java/org/elasticsearch/search/query/QuerySearchResultTests.java @@ -87,8 +87,8 @@ public class QuerySearchResultTests extends ESTestCase { assertEquals(querySearchResult.size(), deserialized.size()); assertEquals(querySearchResult.hasAggs(), deserialized.hasAggs()); if (deserialized.hasAggs()) { - Aggregations aggs = 
querySearchResult.consumeAggs().get(); - Aggregations deserializedAggs = deserialized.consumeAggs().get(); + Aggregations aggs = querySearchResult.consumeAggs().expand(); + Aggregations deserializedAggs = deserialized.consumeAggs().expand(); assertEquals(aggs.asList(), deserializedAggs.asList()); } assertEquals(querySearchResult.terminatedEarly(), deserialized.terminatedEarly()); @@ -114,7 +114,7 @@ public class QuerySearchResultTests extends ESTestCase { QuerySearchResult querySearchResult = new QuerySearchResult(in); assertEquals(100, querySearchResult.getContextId().getId()); assertTrue(querySearchResult.hasAggs()); - InternalAggregations aggs = querySearchResult.consumeAggs().get(); + InternalAggregations aggs = querySearchResult.consumeAggs().expand(); assertEquals(1, aggs.asList().size()); // We deserialize and throw away top level pipeline aggs } diff --git a/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/AsyncSearchTask.java b/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/AsyncSearchTask.java index e7ce86a0dad..d6ee78eaf80 100644 --- a/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/AsyncSearchTask.java +++ b/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/AsyncSearchTask.java @@ -379,18 +379,16 @@ final class AsyncSearchTask extends SearchTask implements AsyncTask { reducedAggs = () -> null; } else { /* - * Keep a reference to the serialiazed form of the partially + * Keep a reference to the serialized form of the partially * reduced aggs and reduce it on the fly when someone asks - * for it. This will produce right-ish aggs. Much more right - * than if you don't do the final reduce. Its important that - * we wait until someone needs the result so we don't perform - * the final reduce only to throw it away. 
And it is important - * that we kep the reference to the serialized aggrgations - * because the SearchPhaseController *already* has that - * reference so we're not creating more garbage. + * for it. It's important that we wait until someone needs + * the result so we don't perform the final reduce only to + * throw it away. And it is important that we keep the reference + * to the serialized aggregations because SearchPhaseController + * *already* has that reference so we're not creating more garbage. */ reducedAggs = () -> - InternalAggregations.topLevelReduce(singletonList(aggregations.get()), aggReduceContextSupplier.get()); + InternalAggregations.topLevelReduce(singletonList(aggregations.expand()), aggReduceContextSupplier.get()); } searchResponse.get().updatePartialResponse(shards.size(), totalHits, reducedAggs, reducePhase); } diff --git a/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/MutableSearchResponse.java b/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/MutableSearchResponse.java index 4c45c38b5ec..5980a1eea94 100644 --- a/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/MutableSearchResponse.java +++ b/x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/MutableSearchResponse.java @@ -50,8 +50,9 @@ class MutableSearchResponse { private int reducePhase; /** * The response produced by the search API. Once we receive it we stop - * building our own {@linkplain SearchResponse}s when you get the status - * and instead return this. + * building our own {@linkplain SearchResponse}s when get async search + * is called, and instead return this. 
+ * @see #findOrBuildResponse(AsyncSearchTask) */ private SearchResponse finalResponse; private ElasticsearchException failure; @@ -157,10 +158,9 @@ class MutableSearchResponse { /* * Build the response, reducing aggs if we haven't already and * storing the result of the reduction so we won't have to reduce - * a second time if you get the response again and nothing has - * changed. This does cost memory because we have a reference - * to the reduced aggs sitting around so it can't be GCed until - * we get an update. + * the same aggregation results a second time if nothing has changed. + * This does cost memory because we have a reference to the finally + * reduced aggs sitting around which can't be GCed until we get an update. */ InternalAggregations reducedAggs = reducedAggsSource.get(); reducedAggsSource = () -> reducedAggs; @@ -183,8 +183,6 @@ class MutableSearchResponse { return resp; } - - private void failIfFrozen() { if (frozen) { throw new IllegalStateException("invalid update received after the completion of the request");