From 1ded45b0a27e27d44a198cc6da5c8b40e039db98 Mon Sep 17 00:00:00 2001 From: Luca Cavanna Date: Wed, 8 May 2019 11:00:59 +0200 Subject: [PATCH] Cut over SearchPhaseResult to Writeable (#41853) Relates to #34389 --- .../search/SearchPhaseResult.java | 22 ++++++-- .../elasticsearch/search/SearchService.java | 9 +-- .../search/dfs/DfsSearchResult.java | 55 ++++++------------- .../search/fetch/FetchSearchResult.java | 17 +----- .../search/fetch/QueryFetchSearchResult.java | 27 ++------- .../fetch/ScrollQueryFetchSearchResult.java | 20 ++----- .../search/query/QuerySearchResult.java | 17 +----- .../search/query/ScrollQuerySearchResult.java | 20 ++----- .../action/search/SearchAsyncActionTests.java | 6 -- .../search/query/QuerySearchResultTests.java | 5 +- 10 files changed, 61 insertions(+), 137 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/search/SearchPhaseResult.java b/server/src/main/java/org/elasticsearch/search/SearchPhaseResult.java index f242cbd8eb9..39e6adede88 100644 --- a/server/src/main/java/org/elasticsearch/search/SearchPhaseResult.java +++ b/server/src/main/java/org/elasticsearch/search/SearchPhaseResult.java @@ -19,11 +19,13 @@ package org.elasticsearch.search; -import org.elasticsearch.common.io.stream.Streamable; +import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.search.fetch.FetchSearchResult; import org.elasticsearch.search.query.QuerySearchResult; import org.elasticsearch.transport.TransportResponse; +import java.io.IOException; + /** * This class is a base class for all search related results. It contains the shard target it * was executed against, a shard index used to reference the result on the coordinating node @@ -32,15 +34,22 @@ import org.elasticsearch.transport.TransportResponse; * across search phases to ensure the same point in time snapshot is used for querying and * fetching etc. */ -public abstract class SearchPhaseResult extends TransportResponse implements Streamable { +public abstract class SearchPhaseResult extends TransportResponse { private SearchShardTarget searchShardTarget; private int shardIndex = -1; protected long requestId; + protected SearchPhaseResult() { + + } + + protected SearchPhaseResult(StreamInput in) throws IOException { + super(in); + } + /** - * Returns the results request ID that is used to reference the search context on the executing - * node + * Returns the results request ID that is used to reference the search context on the executing node */ public long getRequestId() { return requestId; @@ -79,4 +88,9 @@ public abstract class SearchPhaseResult extends TransportResponse implements Str * Returns the fetch result iff it's included in this response otherwise null */ public FetchSearchResult fetchResult() { return null; } + + @Override + public final void readFrom(StreamInput in) { + throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable"); + } } diff --git a/server/src/main/java/org/elasticsearch/search/SearchService.java b/server/src/main/java/org/elasticsearch/search/SearchService.java index bf950ac23df..b703493b4d5 100644 --- a/server/src/main/java/org/elasticsearch/search/SearchService.java +++ b/server/src/main/java/org/elasticsearch/search/SearchService.java @@ -1055,9 +1055,10 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv } public static final class CanMatchResponse extends SearchPhaseResult { - private boolean canMatch; + private final boolean canMatch; public CanMatchResponse(StreamInput in) throws IOException { + super(in); this.canMatch = in.readBoolean(); } @@ -1065,12 +1066,6 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv this.canMatch = canMatch; } - @Override - public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); - canMatch = in.readBoolean(); - } - @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); diff --git a/server/src/main/java/org/elasticsearch/search/dfs/DfsSearchResult.java b/server/src/main/java/org/elasticsearch/search/dfs/DfsSearchResult.java index 49a791941f6..a25918988af 100644 --- a/server/src/main/java/org/elasticsearch/search/dfs/DfsSearchResult.java +++ b/server/src/main/java/org/elasticsearch/search/dfs/DfsSearchResult.java @@ -43,11 +43,22 @@ public class DfsSearchResult extends SearchPhaseResult { private ObjectObjectHashMap fieldStatistics = HppcMaps.newNoNullKeysMap(); private int maxDoc; - public DfsSearchResult() { - } - public DfsSearchResult(StreamInput in) throws IOException { - readFrom(in); + super(in); + requestId = in.readLong(); + int termsSize = in.readVInt(); + if (termsSize == 0) { + terms = EMPTY_TERMS; + } else { + terms = new Term[termsSize]; + for (int i = 0; i < terms.length; i++) { + terms[i] = new Term(in.readString(), in.readBytesRef()); + } + } + this.termStatistics = readTermStats(in, terms); + fieldStatistics = readFieldStats(in); + + maxDoc = in.readVInt(); } public DfsSearchResult(long id, SearchShardTarget shardTarget) { @@ -87,26 +98,6 @@ public class DfsSearchResult extends SearchPhaseResult { return fieldStatistics; } - @Override - public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); - requestId = in.readLong(); - int termsSize = in.readVInt(); - if (termsSize == 0) { - terms = EMPTY_TERMS; - } else { - terms = new Term[termsSize]; - for (int i = 0; i < terms.length; i++) { - terms[i] = new Term(in.readString(), in.readBytesRef()); - } - } - this.termStatistics = readTermStats(in, terms); - readFieldStats(in, fieldStatistics); - - - maxDoc = in.readVInt(); - } - @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); @@ -161,16 +152,9 @@ public class DfsSearchResult extends SearchPhaseResult { } } - public static ObjectObjectHashMap readFieldStats(StreamInput in) throws IOException { - return readFieldStats(in, null); - } - - public static ObjectObjectHashMap readFieldStats(StreamInput in, - ObjectObjectHashMap fieldStatistics) throws IOException { + static ObjectObjectHashMap readFieldStats(StreamInput in) throws IOException { final int numFieldStatistics = in.readVInt(); - if (fieldStatistics == null) { - fieldStatistics = HppcMaps.newNoNullKeysMap(numFieldStatistics); - } + ObjectObjectHashMap fieldStatistics = HppcMaps.newNoNullKeysMap(numFieldStatistics); for (int i = 0; i < numFieldStatistics; i++) { final String field = in.readString(); assert field != null; @@ -194,7 +178,7 @@ public class DfsSearchResult extends SearchPhaseResult { return fieldStatistics; } - public static TermStatistics[] readTermStats(StreamInput in, Term[] terms) throws IOException { + static TermStatistics[] readTermStats(StreamInput in, Term[] terms) throws IOException { int termsStatsSize = in.readVInt(); final TermStatistics[] termStatistics; if (termsStatsSize == 0) { @@ -216,7 +200,6 @@ public class DfsSearchResult extends SearchPhaseResult { return termStatistics; } - /* * optional statistics are set to -1 in lucene by default. * Since we are using var longs to encode values we add one to each value @@ -227,7 +210,6 @@ public class DfsSearchResult extends SearchPhaseResult { return value + 1; } - /* * See #addOne this just subtracting one and asserts that the actual value * is positive. @@ -236,5 +218,4 @@ public class DfsSearchResult extends SearchPhaseResult { assert value >= 0; return value - 1; } - } diff --git a/server/src/main/java/org/elasticsearch/search/fetch/FetchSearchResult.java b/server/src/main/java/org/elasticsearch/search/fetch/FetchSearchResult.java index 400ab3623c0..6e183f11483 100644 --- a/server/src/main/java/org/elasticsearch/search/fetch/FetchSearchResult.java +++ b/server/src/main/java/org/elasticsearch/search/fetch/FetchSearchResult.java @@ -39,7 +39,9 @@ public final class FetchSearchResult extends SearchPhaseResult { } public FetchSearchResult(StreamInput in) throws IOException { - readFrom(in); + super(in); + requestId = in.readLong(); + hits = new SearchHits(in); } public FetchSearchResult(long id, SearchShardTarget shardTarget) { @@ -82,19 +84,6 @@ public final class FetchSearchResult extends SearchPhaseResult { return counter++; } - public static FetchSearchResult readFetchSearchResult(StreamInput in) throws IOException { - FetchSearchResult result = new FetchSearchResult(); - result.readFrom(in); - return result; - } - - @Override - public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); - requestId = in.readLong(); - hits = new SearchHits(in); - } - @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); diff --git a/server/src/main/java/org/elasticsearch/search/fetch/QueryFetchSearchResult.java b/server/src/main/java/org/elasticsearch/search/fetch/QueryFetchSearchResult.java index 0a5a7cec375..e87bd6d5e9d 100644 --- a/server/src/main/java/org/elasticsearch/search/fetch/QueryFetchSearchResult.java +++ b/server/src/main/java/org/elasticsearch/search/fetch/QueryFetchSearchResult.java @@ -27,19 +27,15 @@ import org.elasticsearch.search.query.QuerySearchResult; import java.io.IOException; -import static org.elasticsearch.search.fetch.FetchSearchResult.readFetchSearchResult; -import static org.elasticsearch.search.query.QuerySearchResult.readQuerySearchResult; - public final class QueryFetchSearchResult extends SearchPhaseResult { - private QuerySearchResult queryResult; - private FetchSearchResult fetchResult; - - public QueryFetchSearchResult() { - } + private final QuerySearchResult queryResult; + private final FetchSearchResult fetchResult; public QueryFetchSearchResult(StreamInput in) throws IOException { - readFrom(in); + super(in); + queryResult = new QuerySearchResult(in); + fetchResult = new FetchSearchResult(in); } public QueryFetchSearchResult(QuerySearchResult queryResult, FetchSearchResult fetchResult) { @@ -81,19 +77,6 @@ public final class QueryFetchSearchResult extends SearchPhaseResult { return fetchResult; } - public static QueryFetchSearchResult readQueryFetchSearchResult(StreamInput in) throws IOException { - QueryFetchSearchResult result = new QueryFetchSearchResult(); - result.readFrom(in); - return result; - } - - @Override - public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); - queryResult = readQuerySearchResult(in); - fetchResult = readFetchSearchResult(in); - } - @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); diff --git a/server/src/main/java/org/elasticsearch/search/fetch/ScrollQueryFetchSearchResult.java b/server/src/main/java/org/elasticsearch/search/fetch/ScrollQueryFetchSearchResult.java index 6b0a8b619bf..0785fafdc21 100644 --- a/server/src/main/java/org/elasticsearch/search/fetch/ScrollQueryFetchSearchResult.java +++ b/server/src/main/java/org/elasticsearch/search/fetch/ScrollQueryFetchSearchResult.java @@ -27,17 +27,15 @@ import org.elasticsearch.search.query.QuerySearchResult; import java.io.IOException; -import static org.elasticsearch.search.fetch.QueryFetchSearchResult.readQueryFetchSearchResult; - public final class ScrollQueryFetchSearchResult extends SearchPhaseResult { - private QueryFetchSearchResult result; - - public ScrollQueryFetchSearchResult() { - } + private final QueryFetchSearchResult result; public ScrollQueryFetchSearchResult(StreamInput in) throws IOException { - readFrom(in); + super(in); + SearchShardTarget searchShardTarget = new SearchShardTarget(in); + result = new QueryFetchSearchResult(in); + setSearchShardTarget(searchShardTarget); } public ScrollQueryFetchSearchResult(QueryFetchSearchResult result, SearchShardTarget shardTarget) { @@ -71,14 +69,6 @@ public final class ScrollQueryFetchSearchResult extends SearchPhaseResult { return result.fetchResult(); } - @Override - public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); - SearchShardTarget searchShardTarget = new SearchShardTarget(in); - result = readQueryFetchSearchResult(in); - setSearchShardTarget(searchShardTarget); - } - @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); diff --git a/server/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java b/server/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java index 16100eb9646..7639ffc4706 100644 --- a/server/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java +++ b/server/src/main/java/org/elasticsearch/search/query/QuerySearchResult.java @@ -67,7 +67,9 @@ public final class QuerySearchResult extends SearchPhaseResult { } public QuerySearchResult(StreamInput in) throws IOException { - readFrom(in); + super(in); + long id = in.readLong(); + readFromWithId(id, in); } public QuerySearchResult(long id, SearchShardTarget shardTarget) { @@ -256,19 +258,6 @@ public final class QuerySearchResult extends SearchPhaseResult { return hasScoreDocs || hasSuggestHits(); } - public static QuerySearchResult readQuerySearchResult(StreamInput in) throws IOException { - QuerySearchResult result = new QuerySearchResult(); - result.readFrom(in); - return result; - } - - @Override - public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); - long id = in.readLong(); - readFromWithId(id, in); - } - public void readFromWithId(long id, StreamInput in) throws IOException { this.requestId = id; from = in.readVInt(); diff --git a/server/src/main/java/org/elasticsearch/search/query/ScrollQuerySearchResult.java b/server/src/main/java/org/elasticsearch/search/query/ScrollQuerySearchResult.java index 632d148ea90..fe8bad3d098 100644 --- a/server/src/main/java/org/elasticsearch/search/query/ScrollQuerySearchResult.java +++ b/server/src/main/java/org/elasticsearch/search/query/ScrollQuerySearchResult.java @@ -26,17 +26,15 @@ import org.elasticsearch.search.SearchShardTarget; import java.io.IOException; -import static org.elasticsearch.search.query.QuerySearchResult.readQuerySearchResult; - public final class ScrollQuerySearchResult extends SearchPhaseResult { - private QuerySearchResult result; - - public ScrollQuerySearchResult() { - } + private final QuerySearchResult result; public ScrollQuerySearchResult(StreamInput in) throws IOException { - readFrom(in); + super(in); + SearchShardTarget shardTarget = new SearchShardTarget(in); + result = new QuerySearchResult(in); + setSearchShardTarget(shardTarget); } public ScrollQuerySearchResult(QuerySearchResult result, SearchShardTarget shardTarget) { @@ -61,14 +59,6 @@ public final class ScrollQuerySearchResult extends SearchPhaseResult { return result; } - @Override - public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); - SearchShardTarget shardTarget = new SearchShardTarget(in); - result = readQuerySearchResult(in); - setSearchShardTarget(shardTarget); - } - @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); diff --git a/server/src/test/java/org/elasticsearch/action/search/SearchAsyncActionTests.java b/server/src/test/java/org/elasticsearch/action/search/SearchAsyncActionTests.java index 21ac0cdf636..23abefea15f 100644 --- a/server/src/test/java/org/elasticsearch/action/search/SearchAsyncActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/SearchAsyncActionTests.java @@ -27,7 +27,6 @@ import org.elasticsearch.cluster.routing.RecoverySource; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.UnassignedInfo; import org.elasticsearch.common.Strings; -import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.index.Index; import org.elasticsearch.index.shard.ShardId; @@ -418,11 +417,6 @@ public class SearchAsyncActionTests extends ESTestCase { this.node = node; } - @Override - public void readFrom(StreamInput in) throws IOException { - - } - @Override public void writeTo(StreamOutput out) throws IOException { diff --git a/server/src/test/java/org/elasticsearch/search/query/QuerySearchResultTests.java b/server/src/test/java/org/elasticsearch/search/query/QuerySearchResultTests.java index 29ee7c9238e..f74cafc22ff 100644 --- a/server/src/test/java/org/elasticsearch/search/query/QuerySearchResultTests.java +++ b/server/src/test/java/org/elasticsearch/search/query/QuerySearchResultTests.java @@ -78,7 +78,7 @@ public class QuerySearchResultTests extends ESTestCase { public void testSerialization() throws Exception { QuerySearchResult querySearchResult = createTestInstance(); Version version = VersionUtils.randomVersion(random()); - QuerySearchResult deserialized = copyStreamable(querySearchResult, namedWriteableRegistry, QuerySearchResult::new, version); + QuerySearchResult deserialized = copyWriteable(querySearchResult, namedWriteableRegistry, QuerySearchResult::new, version); assertEquals(querySearchResult.getRequestId(), deserialized.getRequestId()); assertNull(deserialized.getSearchShardTarget()); assertEquals(querySearchResult.topDocs().maxScore, deserialized.topDocs().maxScore, 0f); @@ -121,8 +121,7 @@ public class QuerySearchResultTests extends ESTestCase { byte[] bytes = Base64.getDecoder().decode(message); try (NamedWriteableAwareStreamInput in = new NamedWriteableAwareStreamInput(StreamInput.wrap(bytes), namedWriteableRegistry)) { in.setVersion(Version.V_7_0_0); - QuerySearchResult querySearchResult = new QuerySearchResult(); - querySearchResult.readFrom(in); + QuerySearchResult querySearchResult = new QuerySearchResult(in); assertEquals(100, querySearchResult.getRequestId()); assertTrue(querySearchResult.hasAggs()); InternalAggregations aggs = (InternalAggregations)querySearchResult.consumeAggs();