Cut over SearchResponse and SearchTemplateResponse to Writeable (#41855)

Relates to #34389
This commit is contained in:
Luca Cavanna 2019-05-09 11:48:05 +02:00
parent 29c9bb9181
commit c2af62455f
14 changed files with 102 additions and 70 deletions

View File

@ -20,17 +20,23 @@ package org.elasticsearch.plugin.noop.action.search;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.common.io.stream.Writeable;
public class NoopSearchAction extends Action<SearchResponse> {
public static final NoopSearchAction INSTANCE = new NoopSearchAction();
public static final String NAME = "mock:data/read/search";
public NoopSearchAction() {
private NoopSearchAction() {
super(NAME);
}
@Override
public SearchResponse newResponse() {
return new SearchResponse();
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
}
@Override
public Writeable.Reader<SearchResponse> getResponseReader() {
return SearchResponse::new;
}
}

View File

@ -49,8 +49,7 @@ public class MultiSearchTemplateResponse extends ActionResponse implements Itera
private Item(StreamInput in) throws IOException {
if (in.readBoolean()) {
this.response = new SearchTemplateResponse();
response.readFrom(in);
this.response = new SearchTemplateResponse(in);
this.exception = null;
} else {
exception = in.readException();

View File

@ -20,6 +20,7 @@
package org.elasticsearch.script.mustache;
import org.elasticsearch.action.Action;
import org.elasticsearch.common.io.stream.Writeable;
public class SearchTemplateAction extends Action<SearchTemplateResponse> {
@ -32,6 +33,11 @@ public class SearchTemplateAction extends Action<SearchTemplateResponse> {
@Override
public SearchTemplateResponse newResponse() {
return new SearchTemplateResponse();
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
}
@Override
public Writeable.Reader<SearchTemplateResponse> getResponseReader() {
return SearchTemplateResponse::new;
}
}

View File

@ -48,6 +48,12 @@ public class SearchTemplateResponse extends ActionResponse implements StatusToXC
SearchTemplateResponse() {
}
SearchTemplateResponse(StreamInput in) throws IOException {
super(in);
source = in.readOptionalBytesReference();
response = in.readOptionalWriteable(SearchResponse::new);
}
public BytesReference getSource() {
return source;
}
@ -81,10 +87,8 @@ public class SearchTemplateResponse extends ActionResponse implements StatusToXC
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
source = in.readOptionalBytesReference();
response = in.readOptionalStreamable(SearchResponse::new);
public void readFrom(StreamInput in) {
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
}
public static SearchTemplateResponse fromXContent(XContentParser parser) throws IOException {

View File

@ -71,8 +71,7 @@ public class MultiSearchResponse extends ActionResponse implements Iterable<Mult
Item(StreamInput in) throws IOException{
if (in.readBoolean()) {
this.response = new SearchResponse();
this.response.readFrom(in);
this.response = new SearchResponse(in);
this.exception = null;
} else {
this.exception = in.readException();

View File

@ -20,6 +20,7 @@
package org.elasticsearch.action.search;
import org.elasticsearch.action.Action;
import org.elasticsearch.common.io.stream.Writeable;
public class SearchAction extends Action<SearchResponse> {
@ -32,6 +33,11 @@ public class SearchAction extends Action<SearchResponse> {
@Override
public SearchResponse newResponse() {
return new SearchResponse();
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
}
@Override
public Writeable.Reader<SearchResponse> getResponseReader() {
return SearchResponse::new;
}
}

View File

@ -67,23 +67,37 @@ public class SearchResponse extends ActionResponse implements StatusToXContentOb
private static final ParseField TERMINATED_EARLY = new ParseField("terminated_early");
private static final ParseField NUM_REDUCE_PHASES = new ParseField("num_reduce_phases");
private SearchResponseSections internalResponse;
private final SearchResponseSections internalResponse;
private final String scrollId;
private final int totalShards;
private final int successfulShards;
private final int skippedShards;
private final ShardSearchFailure[] shardFailures;
private final Clusters clusters;
private final long tookInMillis;
private String scrollId;
private int totalShards;
private int successfulShards;
private int skippedShards;
private ShardSearchFailure[] shardFailures;
private Clusters clusters;
private long tookInMillis;
public SearchResponse() {
public SearchResponse(StreamInput in) throws IOException {
super(in);
internalResponse = new InternalSearchResponse(in);
totalShards = in.readVInt();
successfulShards = in.readVInt();
int size = in.readVInt();
if (size == 0) {
shardFailures = ShardSearchFailure.EMPTY_ARRAY;
} else {
shardFailures = new ShardSearchFailure[size];
for (int i = 0; i < shardFailures.length; i++) {
shardFailures[i] = readShardSearchFailure(in);
}
}
if (in.getVersion().onOrAfter(Version.V_6_1_0)) {
clusters = new Clusters(in);
} else {
clusters = Clusters.EMPTY;
}
scrollId = in.readOptionalString();
tookInMillis = in.readVLong();
skippedShards = in.readVInt();
}
public SearchResponse(SearchResponseSections internalResponse, String scrollId, int totalShards, int successfulShards,
@ -194,10 +208,6 @@ public class SearchResponse extends ActionResponse implements StatusToXContentOb
return scrollId;
}
public void scrollId(String scrollId) {
this.scrollId = scrollId;
}
/**
* If profiling was enabled, this returns an object containing the profile results from
* each shard. If profiling was not enabled, this will return null
@ -356,28 +366,8 @@ public class SearchResponse extends ActionResponse implements StatusToXContentOb
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
internalResponse = new InternalSearchResponse(in);
totalShards = in.readVInt();
successfulShards = in.readVInt();
int size = in.readVInt();
if (size == 0) {
shardFailures = ShardSearchFailure.EMPTY_ARRAY;
} else {
shardFailures = new ShardSearchFailure[size];
for (int i = 0; i < shardFailures.length; i++) {
shardFailures[i] = readShardSearchFailure(in);
}
}
if (in.getVersion().onOrAfter(Version.V_6_1_0)) {
clusters = new Clusters(in);
} else {
clusters = Clusters.EMPTY;
}
scrollId = in.readOptionalString();
tookInMillis = in.readVLong();
skippedShards = in.readVInt();
public void readFrom(StreamInput in) {
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
}
@Override

View File

@ -20,6 +20,7 @@
package org.elasticsearch.action.search;
import org.elasticsearch.action.Action;
import org.elasticsearch.common.io.stream.Writeable;
public class SearchScrollAction extends Action<SearchResponse> {
@ -32,6 +33,11 @@ public class SearchScrollAction extends Action<SearchResponse> {
@Override
public SearchResponse newResponse() {
return new SearchResponse();
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
}
@Override
public Writeable.Reader<SearchResponse> getResponseReader() {
return SearchResponse::new;
}
}

View File

@ -32,6 +32,7 @@ import org.elasticsearch.common.Randomness;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.search.internal.InternalSearchResponse;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskManager;
import org.elasticsearch.test.ESTestCase;
@ -154,7 +155,8 @@ public class MultiSearchActionTookTests extends ESTestCase {
requests.add(request);
commonExecutor.execute(() -> {
counter.decrementAndGet();
listener.onResponse(new SearchResponse());
listener.onResponse(new SearchResponse(InternalSearchResponse.empty(), null, 0, 0, 0, 0L,
ShardSearchFailure.EMPTY_ARRAY, SearchResponse.Clusters.EMPTY));
});
}
};

View File

@ -32,6 +32,7 @@ import org.elasticsearch.index.Index;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.search.SearchPhaseResult;
import org.elasticsearch.search.internal.AliasFilter;
import org.elasticsearch.search.internal.InternalSearchResponse;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportException;
@ -406,13 +407,17 @@ public class SearchAsyncActionTests extends ESTestCase {
}
public static class TestSearchResponse extends SearchResponse {
public final Set<ShardId> queried = new HashSet<>();
final Set<ShardId> queried = new HashSet<>();
TestSearchResponse() {
super(InternalSearchResponse.empty(), null, 0, 0, 0, 0L, ShardSearchFailure.EMPTY_ARRAY, Clusters.EMPTY);
}
}
public static class TestSearchPhaseResult extends SearchPhaseResult {
final DiscoveryNode node;
public TestSearchPhaseResult(long id, DiscoveryNode node) {
TestSearchPhaseResult(long id, DiscoveryNode node) {
this.requestId = id;
this.node = node;
}
@ -427,7 +432,7 @@ public class SearchAsyncActionTests extends ESTestCase {
private final DiscoveryNode node;
public MockConnection(DiscoveryNode node) {
MockConnection(DiscoveryNode node) {
this.node = node;
}
@ -438,7 +443,7 @@ public class SearchAsyncActionTests extends ESTestCase {
@Override
public void sendRequest(long requestId, String action, TransportRequest request, TransportRequestOptions options)
throws IOException, TransportException {
throws TransportException {
throw new UnsupportedOperationException();
}

View File

@ -278,7 +278,7 @@ public class SearchResponseTests extends ESTestCase {
public void testSerialization() throws IOException {
SearchResponse searchResponse = createTestItem(false);
SearchResponse deserialized = copyStreamable(searchResponse, namedWriteableRegistry, SearchResponse::new, Version.CURRENT);
SearchResponse deserialized = copyWriteable(searchResponse, namedWriteableRegistry, SearchResponse::new, Version.CURRENT);
if (searchResponse.getHits().getTotalHits() == null) {
assertNull(deserialized.getHits().getTotalHits());
} else {

View File

@ -33,6 +33,7 @@ import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Randomness;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.search.internal.InternalSearchResponse;
import org.elasticsearch.tasks.TaskManager;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.TestThreadPool;
@ -117,7 +118,8 @@ public class TransportMultiSearchActionTests extends ESTestCase {
final ExecutorService executorService = rarely() ? rarelyExecutor : commonExecutor;
executorService.execute(() -> {
counter.decrementAndGet();
listener.onResponse(new SearchResponse());
listener.onResponse(new SearchResponse(InternalSearchResponse.empty(), null, 0, 0, 0, 0L,
ShardSearchFailure.EMPTY_ARRAY, SearchResponse.Clusters.EMPTY));
});
}
};

View File

@ -10,6 +10,7 @@ import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.common.io.stream.Writeable;
public class RollupSearchAction extends Action<SearchResponse> {
@ -22,16 +23,17 @@ public class RollupSearchAction extends Action<SearchResponse> {
@Override
public SearchResponse newResponse() {
return new SearchResponse();
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
}
@Override
public Writeable.Reader<SearchResponse> getResponseReader() {
return SearchResponse::new;
}
public static class RequestBuilder extends ActionRequestBuilder<SearchRequest, SearchResponse> {
public RequestBuilder(ElasticsearchClient client, SearchRequest searchRequest) {
super(client, INSTANCE, searchRequest);
}
RequestBuilder(ElasticsearchClient client) {
super(client, INSTANCE, new SearchRequest());
}
}
}

View File

@ -12,11 +12,13 @@ import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.search.SearchAction;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.search.internal.InternalSearchResponse;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.security.authc.AuthenticationField;
@ -232,7 +234,8 @@ public class ClientHelperTests extends ESTestCase {
when(client.threadPool()).thenReturn(threadPool);
PlainActionFuture<SearchResponse> searchFuture = PlainActionFuture.newFuture();
searchFuture.onResponse(new SearchResponse());
searchFuture.onResponse(new SearchResponse(InternalSearchResponse.empty(), null, 0, 0, 0, 0L, ShardSearchFailure.EMPTY_ARRAY,
SearchResponse.Clusters.EMPTY));
when(client.search(any())).thenReturn(searchFuture);
assertExecutionWithOrigin(Collections.emptyMap(), client);
}
@ -245,7 +248,8 @@ public class ClientHelperTests extends ESTestCase {
when(client.threadPool()).thenReturn(threadPool);
PlainActionFuture<SearchResponse> searchFuture = PlainActionFuture.newFuture();
searchFuture.onResponse(new SearchResponse());
searchFuture.onResponse(new SearchResponse(InternalSearchResponse.empty(), null, 0, 0, 0, 0L, ShardSearchFailure.EMPTY_ARRAY,
SearchResponse.Clusters.EMPTY));
when(client.search(any())).thenReturn(searchFuture);
Map<String, String> headers = MapBuilder.<String, String> newMapBuilder().put(AuthenticationField.AUTHENTICATION_KEY, "anything")
.put(AuthenticationServiceField.RUN_AS_USER_HEADER, "anything").map();
@ -265,7 +269,8 @@ public class ClientHelperTests extends ESTestCase {
when(client.threadPool()).thenReturn(threadPool);
PlainActionFuture<SearchResponse> searchFuture = PlainActionFuture.newFuture();
searchFuture.onResponse(new SearchResponse());
searchFuture.onResponse(new SearchResponse(InternalSearchResponse.empty(), null, 0, 0, 0, 0L, ShardSearchFailure.EMPTY_ARRAY,
SearchResponse.Clusters.EMPTY));
when(client.search(any())).thenReturn(searchFuture);
Map<String, String> unrelatedHeaders = MapBuilder.<String, String> newMapBuilder().put(randomAlphaOfLength(10), "anything").map();