From c2af62455f6a001fc600bdf9c750b81b21adda34 Mon Sep 17 00:00:00 2001 From: Luca Cavanna Date: Thu, 9 May 2019 11:48:05 +0200 Subject: [PATCH] Cut over SearchResponse and SearchTemplateResponse to Writeable (#41855) Relates to #34389 --- .../noop/action/search/NoopSearchAction.java | 10 ++- .../mustache/MultiSearchTemplateResponse.java | 3 +- .../script/mustache/SearchTemplateAction.java | 8 +- .../mustache/SearchTemplateResponse.java | 12 ++- .../action/search/MultiSearchResponse.java | 3 +- .../action/search/SearchAction.java | 8 +- .../action/search/SearchResponse.java | 74 ++++++++----------- .../action/search/SearchScrollAction.java | 8 +- .../search/MultiSearchActionTookTests.java | 4 +- .../action/search/SearchAsyncActionTests.java | 13 +++- .../action/search/SearchResponseTests.java | 2 +- .../TransportMultiSearchActionTests.java | 4 +- .../rollup/action/RollupSearchAction.java | 12 +-- .../xpack/core/ClientHelperTests.java | 11 ++- 14 files changed, 102 insertions(+), 70 deletions(-) diff --git a/client/client-benchmark-noop-api-plugin/src/main/java/org/elasticsearch/plugin/noop/action/search/NoopSearchAction.java b/client/client-benchmark-noop-api-plugin/src/main/java/org/elasticsearch/plugin/noop/action/search/NoopSearchAction.java index e7e515594a5..9b390e1ffdd 100644 --- a/client/client-benchmark-noop-api-plugin/src/main/java/org/elasticsearch/plugin/noop/action/search/NoopSearchAction.java +++ b/client/client-benchmark-noop-api-plugin/src/main/java/org/elasticsearch/plugin/noop/action/search/NoopSearchAction.java @@ -20,17 +20,23 @@ package org.elasticsearch.plugin.noop.action.search; import org.elasticsearch.action.Action; import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.common.io.stream.Writeable; public class NoopSearchAction extends Action { public static final NoopSearchAction INSTANCE = new NoopSearchAction(); public static final String NAME = "mock:data/read/search"; - public NoopSearchAction() { + private NoopSearchAction() { super(NAME); } @Override public SearchResponse newResponse() { - return new SearchResponse(); + throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable"); + } + + @Override + public Writeable.Reader getResponseReader() { + return SearchResponse::new; } } diff --git a/modules/lang-mustache/src/main/java/org/elasticsearch/script/mustache/MultiSearchTemplateResponse.java b/modules/lang-mustache/src/main/java/org/elasticsearch/script/mustache/MultiSearchTemplateResponse.java index dd8cdc04457..68adcffc4c4 100644 --- a/modules/lang-mustache/src/main/java/org/elasticsearch/script/mustache/MultiSearchTemplateResponse.java +++ b/modules/lang-mustache/src/main/java/org/elasticsearch/script/mustache/MultiSearchTemplateResponse.java @@ -49,8 +49,7 @@ public class MultiSearchTemplateResponse extends ActionResponse implements Itera private Item(StreamInput in) throws IOException { if (in.readBoolean()) { - this.response = new SearchTemplateResponse(); - response.readFrom(in); + this.response = new SearchTemplateResponse(in); this.exception = null; } else { exception = in.readException(); diff --git a/modules/lang-mustache/src/main/java/org/elasticsearch/script/mustache/SearchTemplateAction.java b/modules/lang-mustache/src/main/java/org/elasticsearch/script/mustache/SearchTemplateAction.java index a08329f48dc..5d905ec39e1 100644 --- a/modules/lang-mustache/src/main/java/org/elasticsearch/script/mustache/SearchTemplateAction.java +++ b/modules/lang-mustache/src/main/java/org/elasticsearch/script/mustache/SearchTemplateAction.java @@ -20,6 +20,7 @@ package org.elasticsearch.script.mustache; import org.elasticsearch.action.Action; +import org.elasticsearch.common.io.stream.Writeable; public class SearchTemplateAction extends Action { @@ -32,6 +33,11 @@ public class SearchTemplateAction extends Action { @Override public SearchTemplateResponse newResponse() { - return new SearchTemplateResponse(); + throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable"); + } + + @Override + public Writeable.Reader getResponseReader() { + return SearchTemplateResponse::new; } } diff --git a/modules/lang-mustache/src/main/java/org/elasticsearch/script/mustache/SearchTemplateResponse.java b/modules/lang-mustache/src/main/java/org/elasticsearch/script/mustache/SearchTemplateResponse.java index 6d19afbfd6f..52a3a8b2882 100644 --- a/modules/lang-mustache/src/main/java/org/elasticsearch/script/mustache/SearchTemplateResponse.java +++ b/modules/lang-mustache/src/main/java/org/elasticsearch/script/mustache/SearchTemplateResponse.java @@ -48,6 +48,12 @@ public class SearchTemplateResponse extends ActionResponse implements StatusToXC SearchTemplateResponse() { } + SearchTemplateResponse(StreamInput in) throws IOException { + super(in); + source = in.readOptionalBytesReference(); + response = in.readOptionalWriteable(SearchResponse::new); + } + public BytesReference getSource() { return source; } @@ -81,10 +87,8 @@ public class SearchTemplateResponse extends ActionResponse implements StatusToXC } @Override - public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); - source = in.readOptionalBytesReference(); - response = in.readOptionalStreamable(SearchResponse::new); + public void readFrom(StreamInput in) { + throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable"); } public static SearchTemplateResponse fromXContent(XContentParser parser) throws IOException { diff --git a/server/src/main/java/org/elasticsearch/action/search/MultiSearchResponse.java b/server/src/main/java/org/elasticsearch/action/search/MultiSearchResponse.java index 343a2b3ebe3..ad5154a312a 100644 --- a/server/src/main/java/org/elasticsearch/action/search/MultiSearchResponse.java +++ b/server/src/main/java/org/elasticsearch/action/search/MultiSearchResponse.java @@ -71,8 +71,7 @@ public class MultiSearchResponse extends ActionResponse implements Iterable { @@ -32,6 +33,11 @@ public class SearchAction extends Action { @Override public SearchResponse newResponse() { - return new SearchResponse(); + throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable"); + } + + @Override + public Writeable.Reader getResponseReader() { + return SearchResponse::new; } } diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchResponse.java b/server/src/main/java/org/elasticsearch/action/search/SearchResponse.java index 681f13b7a81..73c670f0c03 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchResponse.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchResponse.java @@ -67,23 +67,37 @@ public class SearchResponse extends ActionResponse implements StatusToXContentOb private static final ParseField TERMINATED_EARLY = new ParseField("terminated_early"); private static final ParseField NUM_REDUCE_PHASES = new ParseField("num_reduce_phases"); - private SearchResponseSections internalResponse; + private final SearchResponseSections internalResponse; + private final String scrollId; + private final int totalShards; + private final int successfulShards; + private final int skippedShards; + private final ShardSearchFailure[] shardFailures; + private final Clusters clusters; + private final long tookInMillis; - private String scrollId; - - private int totalShards; - - private int successfulShards; - - private int skippedShards; - - private ShardSearchFailure[] shardFailures; - - private Clusters clusters; - - private long tookInMillis; - - public SearchResponse() { + public SearchResponse(StreamInput in) throws IOException { + super(in); + internalResponse = new InternalSearchResponse(in); + totalShards = in.readVInt(); + successfulShards = in.readVInt(); + int size = in.readVInt(); + if (size == 0) { + shardFailures = ShardSearchFailure.EMPTY_ARRAY; + } else { + shardFailures = new ShardSearchFailure[size]; + for (int i = 0; i < shardFailures.length; i++) { + shardFailures[i] = readShardSearchFailure(in); + } + } + if (in.getVersion().onOrAfter(Version.V_6_1_0)) { + clusters = new Clusters(in); + } else { + clusters = Clusters.EMPTY; + } + scrollId = in.readOptionalString(); + tookInMillis = in.readVLong(); + skippedShards = in.readVInt(); } public SearchResponse(SearchResponseSections internalResponse, String scrollId, int totalShards, int successfulShards, @@ -194,10 +208,6 @@ public class SearchResponse extends ActionResponse implements StatusToXContentOb return scrollId; } - public void scrollId(String scrollId) { - this.scrollId = scrollId; - } - /** * If profiling was enabled, this returns an object containing the profile results from * each shard. If profiling was not enabled, this will return null @@ -356,28 +366,8 @@ public class SearchResponse extends ActionResponse implements StatusToXContentOb } @Override - public void readFrom(StreamInput in) throws IOException { - super.readFrom(in); - internalResponse = new InternalSearchResponse(in); - totalShards = in.readVInt(); - successfulShards = in.readVInt(); - int size = in.readVInt(); - if (size == 0) { - shardFailures = ShardSearchFailure.EMPTY_ARRAY; - } else { - shardFailures = new ShardSearchFailure[size]; - for (int i = 0; i < shardFailures.length; i++) { - shardFailures[i] = readShardSearchFailure(in); - } - } - if (in.getVersion().onOrAfter(Version.V_6_1_0)) { - clusters = new Clusters(in); - } else { - clusters = Clusters.EMPTY; - } - scrollId = in.readOptionalString(); - tookInMillis = in.readVLong(); - skippedShards = in.readVInt(); + public void readFrom(StreamInput in) { + throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable"); } @Override diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchScrollAction.java b/server/src/main/java/org/elasticsearch/action/search/SearchScrollAction.java index ff72a7e5e51..0b4adfc1ba5 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchScrollAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchScrollAction.java @@ -20,6 +20,7 @@ package org.elasticsearch.action.search; import org.elasticsearch.action.Action; +import org.elasticsearch.common.io.stream.Writeable; public class SearchScrollAction extends Action { @@ -32,6 +33,11 @@ public class SearchScrollAction extends Action { @Override public SearchResponse newResponse() { - return new SearchResponse(); + throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable"); + } + + @Override + public Writeable.Reader getResponseReader() { + return SearchResponse::new; } } diff --git a/server/src/test/java/org/elasticsearch/action/search/MultiSearchActionTookTests.java b/server/src/test/java/org/elasticsearch/action/search/MultiSearchActionTookTests.java index ece695575a1..01f1109ef3b 100644 --- a/server/src/test/java/org/elasticsearch/action/search/MultiSearchActionTookTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/MultiSearchActionTookTests.java @@ -32,6 +32,7 @@ import org.elasticsearch.common.Randomness; import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.AtomicArray; +import org.elasticsearch.search.internal.InternalSearchResponse; import org.elasticsearch.tasks.Task; import org.elasticsearch.tasks.TaskManager; import org.elasticsearch.test.ESTestCase; @@ -154,7 +155,8 @@ public class MultiSearchActionTookTests extends ESTestCase { requests.add(request); commonExecutor.execute(() -> { counter.decrementAndGet(); - listener.onResponse(new SearchResponse()); + listener.onResponse(new SearchResponse(InternalSearchResponse.empty(), null, 0, 0, 0, 0L, + ShardSearchFailure.EMPTY_ARRAY, SearchResponse.Clusters.EMPTY)); }); } }; diff --git a/server/src/test/java/org/elasticsearch/action/search/SearchAsyncActionTests.java b/server/src/test/java/org/elasticsearch/action/search/SearchAsyncActionTests.java index 23abefea15f..6a6b1c54db2 100644 --- a/server/src/test/java/org/elasticsearch/action/search/SearchAsyncActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/SearchAsyncActionTests.java @@ -32,6 +32,7 @@ import org.elasticsearch.index.Index; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.search.SearchPhaseResult; import org.elasticsearch.search.internal.AliasFilter; +import org.elasticsearch.search.internal.InternalSearchResponse; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.transport.Transport; import org.elasticsearch.transport.TransportException; @@ -406,13 +407,17 @@ public class SearchAsyncActionTests extends ESTestCase { } public static class TestSearchResponse extends SearchResponse { - public final Set queried = new HashSet<>(); + final Set queried = new HashSet<>(); + + TestSearchResponse() { + super(InternalSearchResponse.empty(), null, 0, 0, 0, 0L, ShardSearchFailure.EMPTY_ARRAY, Clusters.EMPTY); + } } public static class TestSearchPhaseResult extends SearchPhaseResult { final DiscoveryNode node; - public TestSearchPhaseResult(long id, DiscoveryNode node) { + TestSearchPhaseResult(long id, DiscoveryNode node) { this.requestId = id; this.node = node; } @@ -427,7 +432,7 @@ public class SearchAsyncActionTests extends ESTestCase { private final DiscoveryNode node; - public MockConnection(DiscoveryNode node) { + MockConnection(DiscoveryNode node) { this.node = node; } @@ -438,7 +443,7 @@ public class SearchAsyncActionTests extends ESTestCase { @Override public void sendRequest(long requestId, String action, TransportRequest request, TransportRequestOptions options) - throws IOException, TransportException { + throws TransportException { throw new UnsupportedOperationException(); } diff --git a/server/src/test/java/org/elasticsearch/action/search/SearchResponseTests.java b/server/src/test/java/org/elasticsearch/action/search/SearchResponseTests.java index 18890e13395..ed5104cabe5 100644 --- a/server/src/test/java/org/elasticsearch/action/search/SearchResponseTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/SearchResponseTests.java @@ -278,7 +278,7 @@ public class SearchResponseTests extends ESTestCase { public void testSerialization() throws IOException { SearchResponse searchResponse = createTestItem(false); - SearchResponse deserialized = copyStreamable(searchResponse, namedWriteableRegistry, SearchResponse::new, Version.CURRENT); + SearchResponse deserialized = copyWriteable(searchResponse, namedWriteableRegistry, SearchResponse::new, Version.CURRENT); if (searchResponse.getHits().getTotalHits() == null) { assertNull(deserialized.getHits().getTotalHits()); } else { diff --git a/server/src/test/java/org/elasticsearch/action/search/TransportMultiSearchActionTests.java b/server/src/test/java/org/elasticsearch/action/search/TransportMultiSearchActionTests.java index dade5eadb18..7ecc172924b 100644 --- a/server/src/test/java/org/elasticsearch/action/search/TransportMultiSearchActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/TransportMultiSearchActionTests.java @@ -33,6 +33,7 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Randomness; import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.search.internal.InternalSearchResponse; import org.elasticsearch.tasks.TaskManager; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.TestThreadPool; @@ -117,7 +118,8 @@ public class TransportMultiSearchActionTests extends ESTestCase { final ExecutorService executorService = rarely() ? rarelyExecutor : commonExecutor; executorService.execute(() -> { counter.decrementAndGet(); - listener.onResponse(new SearchResponse()); + listener.onResponse(new SearchResponse(InternalSearchResponse.empty(), null, 0, 0, 0, 0L, + ShardSearchFailure.EMPTY_ARRAY, SearchResponse.Clusters.EMPTY)); }); } }; diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/action/RollupSearchAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/action/RollupSearchAction.java index 595df0f8c31..c6eecca5e3d 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/action/RollupSearchAction.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/rollup/action/RollupSearchAction.java @@ -10,6 +10,7 @@ import org.elasticsearch.action.ActionRequestBuilder; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.client.ElasticsearchClient; +import org.elasticsearch.common.io.stream.Writeable; public class RollupSearchAction extends Action { @@ -22,16 +23,17 @@ public class RollupSearchAction extends Action { @Override public SearchResponse newResponse() { - return new SearchResponse(); + throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable"); + } + + @Override + public Writeable.Reader getResponseReader() { + return SearchResponse::new; } public static class RequestBuilder extends ActionRequestBuilder { public RequestBuilder(ElasticsearchClient client, SearchRequest searchRequest) { super(client, INSTANCE, searchRequest); } - - RequestBuilder(ElasticsearchClient client) { - super(client, INSTANCE, new SearchRequest()); - } } } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ClientHelperTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ClientHelperTests.java index 1a0a8e76412..9641f097119 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ClientHelperTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ClientHelperTests.java @@ -12,11 +12,13 @@ import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; import org.elasticsearch.action.search.SearchAction; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.search.ShardSearchFailure; import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.client.Client; import org.elasticsearch.common.collect.MapBuilder; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.concurrent.ThreadContext; +import org.elasticsearch.search.internal.InternalSearchResponse; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.core.security.authc.AuthenticationField; @@ -232,7 +234,8 @@ public class ClientHelperTests extends ESTestCase { when(client.threadPool()).thenReturn(threadPool); PlainActionFuture searchFuture = PlainActionFuture.newFuture(); - searchFuture.onResponse(new SearchResponse()); + searchFuture.onResponse(new SearchResponse(InternalSearchResponse.empty(), null, 0, 0, 0, 0L, ShardSearchFailure.EMPTY_ARRAY, + SearchResponse.Clusters.EMPTY)); when(client.search(any())).thenReturn(searchFuture); assertExecutionWithOrigin(Collections.emptyMap(), client); } @@ -245,7 +248,8 @@ public class ClientHelperTests extends ESTestCase { when(client.threadPool()).thenReturn(threadPool); PlainActionFuture searchFuture = PlainActionFuture.newFuture(); - searchFuture.onResponse(new SearchResponse()); + searchFuture.onResponse(new SearchResponse(InternalSearchResponse.empty(), null, 0, 0, 0, 0L, ShardSearchFailure.EMPTY_ARRAY, + SearchResponse.Clusters.EMPTY)); when(client.search(any())).thenReturn(searchFuture); Map headers = MapBuilder. newMapBuilder().put(AuthenticationField.AUTHENTICATION_KEY, "anything") .put(AuthenticationServiceField.RUN_AS_USER_HEADER, "anything").map(); @@ -265,7 +269,8 @@ public class ClientHelperTests extends ESTestCase { when(client.threadPool()).thenReturn(threadPool); PlainActionFuture searchFuture = PlainActionFuture.newFuture(); - searchFuture.onResponse(new SearchResponse()); + searchFuture.onResponse(new SearchResponse(InternalSearchResponse.empty(), null, 0, 0, 0, 0L, ShardSearchFailure.EMPTY_ARRAY, + SearchResponse.Clusters.EMPTY)); when(client.search(any())).thenReturn(searchFuture); Map unrelatedHeaders = MapBuilder. newMapBuilder().put(randomAlphaOfLength(10), "anything").map();