Migrate Search requests to use Writeable reading strategies (#26428)

Migrates many SearchRequest objects to use Writeable conventions and rejects usage of `readFrom` in these new classes.
This commit is contained in:
Tal Levy 2017-08-30 11:00:33 -07:00 committed by GitHub
parent ea3fa768f9
commit ed151d829d
18 changed files with 224 additions and 156 deletions

View File

@ -42,8 +42,8 @@ public class TransportNoopSearchAction extends HandledTransportAction<SearchRequ
@Inject
public TransportNoopSearchAction(Settings settings, ThreadPool threadPool, TransportService transportService, ActionFilters
actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) {
super(settings, NoopSearchAction.NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver,
SearchRequest::new);
super(settings, NoopSearchAction.NAME, threadPool, transportService, actionFilters, SearchRequest::new,
indexNameExpressionResolver);
}
@Override

View File

@ -117,8 +117,7 @@ public class MultiSearchRequest extends ActionRequest implements CompositeIndice
maxConcurrentSearchRequests = in.readVInt();
int size = in.readVInt();
for (int i = 0; i < size; i++) {
SearchRequest request = new SearchRequest();
request.readFrom(in);
SearchRequest request = new SearchRequest(in);
requests.add(request);
}
}

View File

@ -109,6 +109,55 @@ public final class SearchRequest extends ActionRequest implements IndicesRequest
this.source = source;
}
/**
* Constructs a new search request from reading the specified stream.
*
* @param in The stream the request is read from
* @throws IOException if there is an issue reading the stream
*/
public SearchRequest(StreamInput in) throws IOException {
super(in);
searchType = SearchType.fromId(in.readByte());
indices = new String[in.readVInt()];
for (int i = 0; i < indices.length; i++) {
indices[i] = in.readString();
}
routing = in.readOptionalString();
preference = in.readOptionalString();
scroll = in.readOptionalWriteable(Scroll::new);
source = in.readOptionalWriteable(SearchSourceBuilder::new);
types = in.readStringArray();
indicesOptions = IndicesOptions.readIndicesOptions(in);
requestCache = in.readOptionalBoolean();
batchedReduceSize = in.readVInt();
if (in.getVersion().onOrAfter(Version.V_5_6_0)) {
maxConcurrentShardRequests = in.readVInt();
preFilterShardSize = in.readVInt();
}
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeByte(searchType.id());
out.writeVInt(indices.length);
for (String index : indices) {
out.writeString(index);
}
out.writeOptionalString(routing);
out.writeOptionalString(preference);
out.writeOptionalWriteable(scroll);
out.writeOptionalWriteable(source);
out.writeStringArray(types);
indicesOptions.writeIndicesOptions(out);
out.writeOptionalBoolean(requestCache);
out.writeVInt(batchedReduceSize);
if (out.getVersion().onOrAfter(Version.V_5_6_0)) {
out.writeVInt(maxConcurrentShardRequests);
out.writeVInt(preFilterShardSize);
}
}
@Override
public ActionRequestValidationException validate() {
ActionRequestValidationException validationException = null;
@ -397,46 +446,7 @@ public final class SearchRequest extends ActionRequest implements IndicesRequest
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
searchType = SearchType.fromId(in.readByte());
indices = new String[in.readVInt()];
for (int i = 0; i < indices.length; i++) {
indices[i] = in.readString();
}
routing = in.readOptionalString();
preference = in.readOptionalString();
scroll = in.readOptionalWriteable(Scroll::new);
source = in.readOptionalWriteable(SearchSourceBuilder::new);
types = in.readStringArray();
indicesOptions = IndicesOptions.readIndicesOptions(in);
requestCache = in.readOptionalBoolean();
batchedReduceSize = in.readVInt();
if (in.getVersion().onOrAfter(Version.V_5_6_0)) {
maxConcurrentShardRequests = in.readVInt();
preFilterShardSize = in.readVInt();
}
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeByte(searchType.id());
out.writeVInt(indices.length);
for (String index : indices) {
out.writeString(index);
}
out.writeOptionalString(routing);
out.writeOptionalString(preference);
out.writeOptionalWriteable(scroll);
out.writeOptionalWriteable(source);
out.writeStringArray(types);
indicesOptions.writeIndicesOptions(out);
out.writeOptionalBoolean(requestCache);
out.writeVInt(batchedReduceSize);
if (out.getVersion().onOrAfter(Version.V_5_6_0)) {
out.writeVInt(maxConcurrentShardRequests);
out.writeVInt(preFilterShardSize);
}
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
}
@Override

View File

@ -48,6 +48,19 @@ public class SearchScrollRequest extends ActionRequest implements ToXContentObje
this.scrollId = scrollId;
}
public SearchScrollRequest(StreamInput in) throws IOException {
super(in);
scrollId = in.readString();
scroll = in.readOptionalWriteable(Scroll::new);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(scrollId);
out.writeOptionalWriteable(scroll);
}
@Override
public ActionRequestValidationException validate() {
ActionRequestValidationException validationException = null;
@ -100,16 +113,7 @@ public class SearchScrollRequest extends ActionRequest implements ToXContentObje
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
scrollId = in.readString();
scroll = in.readOptionalWriteable(Scroll::new);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(scrollId);
out.writeOptionalWriteable(scroll);
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
}
@Override

View File

@ -203,13 +203,8 @@ public class SearchTransportService extends AbstractComponent {
this.id = id;
}
public long id() {
return this.id;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
ScrollFreeContextRequest(StreamInput in) throws IOException {
super(in);
id = in.readLong();
}
@ -218,6 +213,15 @@ public class SearchTransportService extends AbstractComponent {
super.writeTo(out);
out.writeLong(id);
}
public long id() {
return this.id;
}
@Override
public void readFrom(StreamInput in) throws IOException {
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
}
}
static class SearchFreeContextRequest extends ScrollFreeContextRequest implements IndicesRequest {
@ -231,6 +235,17 @@ public class SearchTransportService extends AbstractComponent {
this.originalIndices = originalIndices;
}
SearchFreeContextRequest(StreamInput in) throws IOException {
super(in);
originalIndices = OriginalIndices.readOriginalIndices(in);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
OriginalIndices.writeOriginalIndices(originalIndices, out);
}
@Override
public String[] indices() {
if (originalIndices == null) {
@ -249,14 +264,7 @@ public class SearchTransportService extends AbstractComponent {
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
originalIndices = OriginalIndices.readOriginalIndices(in);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
OriginalIndices.writeOriginalIndices(originalIndices, out);
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
}
}
@ -289,7 +297,7 @@ public class SearchTransportService extends AbstractComponent {
}
public static void registerRequestHandler(TransportService transportService, SearchService searchService) {
transportService.registerRequestHandler(FREE_CONTEXT_SCROLL_ACTION_NAME, ScrollFreeContextRequest::new, ThreadPool.Names.SAME,
transportService.registerRequestHandler(FREE_CONTEXT_SCROLL_ACTION_NAME, ThreadPool.Names.SAME, ScrollFreeContextRequest::new,
new TaskAwareTransportRequestHandler<ScrollFreeContextRequest>() {
@Override
public void messageReceived(ScrollFreeContextRequest request, TransportChannel channel, Task task) throws Exception {
@ -298,7 +306,7 @@ public class SearchTransportService extends AbstractComponent {
}
});
TransportActionProxy.registerProxyAction(transportService, FREE_CONTEXT_SCROLL_ACTION_NAME, SearchFreeContextResponse::new);
transportService.registerRequestHandler(FREE_CONTEXT_ACTION_NAME, SearchFreeContextRequest::new, ThreadPool.Names.SAME,
transportService.registerRequestHandler(FREE_CONTEXT_ACTION_NAME, ThreadPool.Names.SAME, SearchFreeContextRequest::new,
new TaskAwareTransportRequestHandler<SearchFreeContextRequest>() {
@Override
public void messageReceived(SearchFreeContextRequest request, TransportChannel channel, Task task) throws Exception {
@ -318,7 +326,7 @@ public class SearchTransportService extends AbstractComponent {
TransportActionProxy.registerProxyAction(transportService, CLEAR_SCROLL_CONTEXTS_ACTION_NAME,
() -> TransportResponse.Empty.INSTANCE);
transportService.registerRequestHandler(DFS_ACTION_NAME, ShardSearchTransportRequest::new, ThreadPool.Names.SAME,
transportService.registerRequestHandler(DFS_ACTION_NAME, ThreadPool.Names.SAME, ShardSearchTransportRequest::new,
new TaskAwareTransportRequestHandler<ShardSearchTransportRequest>() {
@Override
public void messageReceived(ShardSearchTransportRequest request, TransportChannel channel, Task task) throws Exception {
@ -346,7 +354,7 @@ public class SearchTransportService extends AbstractComponent {
});
TransportActionProxy.registerProxyAction(transportService, DFS_ACTION_NAME, DfsSearchResult::new);
transportService.registerRequestHandler(QUERY_ACTION_NAME, ShardSearchTransportRequest::new, ThreadPool.Names.SAME,
transportService.registerRequestHandler(QUERY_ACTION_NAME, ThreadPool.Names.SAME, ShardSearchTransportRequest::new,
new TaskAwareTransportRequestHandler<ShardSearchTransportRequest>() {
@Override
public void messageReceived(ShardSearchTransportRequest request, TransportChannel channel, Task task) throws Exception {
@ -373,7 +381,7 @@ public class SearchTransportService extends AbstractComponent {
});
TransportActionProxy.registerProxyAction(transportService, QUERY_ACTION_NAME, QuerySearchResult::new);
transportService.registerRequestHandler(QUERY_ID_ACTION_NAME, QuerySearchRequest::new, ThreadPool.Names.SEARCH,
transportService.registerRequestHandler(QUERY_ID_ACTION_NAME, ThreadPool.Names.SEARCH, QuerySearchRequest::new,
new TaskAwareTransportRequestHandler<QuerySearchRequest>() {
@Override
public void messageReceived(QuerySearchRequest request, TransportChannel channel, Task task) throws Exception {
@ -383,7 +391,7 @@ public class SearchTransportService extends AbstractComponent {
});
TransportActionProxy.registerProxyAction(transportService, QUERY_ID_ACTION_NAME, QuerySearchResult::new);
transportService.registerRequestHandler(QUERY_SCROLL_ACTION_NAME, InternalScrollSearchRequest::new, ThreadPool.Names.SEARCH,
transportService.registerRequestHandler(QUERY_SCROLL_ACTION_NAME, ThreadPool.Names.SEARCH, InternalScrollSearchRequest::new,
new TaskAwareTransportRequestHandler<InternalScrollSearchRequest>() {
@Override
public void messageReceived(InternalScrollSearchRequest request, TransportChannel channel, Task task) throws Exception {
@ -393,7 +401,7 @@ public class SearchTransportService extends AbstractComponent {
});
TransportActionProxy.registerProxyAction(transportService, QUERY_SCROLL_ACTION_NAME, ScrollQuerySearchResult::new);
transportService.registerRequestHandler(QUERY_FETCH_SCROLL_ACTION_NAME, InternalScrollSearchRequest::new, ThreadPool.Names.SEARCH,
transportService.registerRequestHandler(QUERY_FETCH_SCROLL_ACTION_NAME, ThreadPool.Names.SEARCH, InternalScrollSearchRequest::new,
new TaskAwareTransportRequestHandler<InternalScrollSearchRequest>() {
@Override
public void messageReceived(InternalScrollSearchRequest request, TransportChannel channel, Task task) throws Exception {
@ -403,7 +411,7 @@ public class SearchTransportService extends AbstractComponent {
});
TransportActionProxy.registerProxyAction(transportService, QUERY_FETCH_SCROLL_ACTION_NAME, ScrollQueryFetchSearchResult::new);
transportService.registerRequestHandler(FETCH_ID_SCROLL_ACTION_NAME, ShardFetchRequest::new, ThreadPool.Names.SEARCH,
transportService.registerRequestHandler(FETCH_ID_SCROLL_ACTION_NAME, ThreadPool.Names.SEARCH, ShardFetchRequest::new,
new TaskAwareTransportRequestHandler<ShardFetchRequest>() {
@Override
public void messageReceived(ShardFetchRequest request, TransportChannel channel, Task task) throws Exception {
@ -413,7 +421,7 @@ public class SearchTransportService extends AbstractComponent {
});
TransportActionProxy.registerProxyAction(transportService, FETCH_ID_SCROLL_ACTION_NAME, FetchSearchResult::new);
transportService.registerRequestHandler(FETCH_ID_ACTION_NAME, ShardFetchSearchRequest::new, ThreadPool.Names.SEARCH,
transportService.registerRequestHandler(FETCH_ID_ACTION_NAME, ThreadPool.Names.SEARCH, ShardFetchSearchRequest::new,
new TaskAwareTransportRequestHandler<ShardFetchSearchRequest>() {
@Override
public void messageReceived(ShardFetchSearchRequest request, TransportChannel channel, Task task) throws Exception {
@ -424,7 +432,7 @@ public class SearchTransportService extends AbstractComponent {
TransportActionProxy.registerProxyAction(transportService, FETCH_ID_ACTION_NAME, FetchSearchResult::new);
// this is super cheap and should not hit thread-pool rejections
transportService.registerRequestHandler(QUERY_CAN_MATCH_NAME, ShardSearchTransportRequest::new, ThreadPool.Names.SAME,
transportService.registerRequestHandler(QUERY_CAN_MATCH_NAME, ThreadPool.Names.SAME, ShardSearchTransportRequest::new,
new TaskAwareTransportRequestHandler<ShardSearchTransportRequest>() {
@Override
public void messageReceived(ShardSearchTransportRequest request, TransportChannel channel, Task task) throws Exception {

View File

@ -82,7 +82,7 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
SearchTransportService searchTransportService, SearchPhaseController searchPhaseController,
ClusterService clusterService, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver) {
super(settings, SearchAction.NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver, SearchRequest::new);
super(settings, SearchAction.NAME, threadPool, transportService, actionFilters, SearchRequest::new, indexNameExpressionResolver);
this.searchPhaseController = searchPhaseController;
this.searchTransportService = searchTransportService;
this.remoteClusterService = searchTransportService.getRemoteClusterService();

View File

@ -45,8 +45,8 @@ public class TransportSearchScrollAction extends HandledTransportAction<SearchSc
ClusterService clusterService, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver,
SearchTransportService searchTransportService, SearchPhaseController searchPhaseController) {
super(settings, SearchScrollAction.NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver,
SearchScrollRequest::new);
super(settings, SearchScrollAction.NAME, threadPool, transportService, actionFilters, SearchScrollRequest::new,
indexNameExpressionResolver);
this.clusterService = clusterService;
this.searchTransportService = searchTransportService;
this.searchPhaseController = searchPhaseController;

View File

@ -399,8 +399,7 @@ public abstract class AbstractBulkByScrollRequest<Self extends AbstractBulkByScr
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
searchRequest = new SearchRequest();
searchRequest.readFrom(in);
searchRequest = new SearchRequest(in);
abortOnVersionConflict = in.readBoolean();
size = in.readVInt();
refresh = in.readBoolean();

View File

@ -56,25 +56,8 @@ public class ShardFetchRequest extends TransportRequest {
this.lastEmittedDoc = lastEmittedDoc;
}
public long id() {
return id;
}
public int[] docIds() {
return docIds;
}
public int docIdsSize() {
return size;
}
public ScoreDoc lastEmittedDoc() {
return lastEmittedDoc;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
public ShardFetchRequest(StreamInput in) throws IOException {
super(in);
id = in.readLong();
size = in.readVInt();
docIds = new int[size];
@ -110,6 +93,27 @@ public class ShardFetchRequest extends TransportRequest {
}
}
public long id() {
return id;
}
public int[] docIds() {
return docIds;
}
public int docIdsSize() {
return size;
}
public ScoreDoc lastEmittedDoc() {
return lastEmittedDoc;
}
@Override
public void readFrom(StreamInput in) throws IOException {
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
}
@Override
public Task createTask(long id, String type, String action, TaskId parentTaskId) {
return new SearchTask(id, type, action, getDescription(), parentTaskId);

View File

@ -46,6 +46,17 @@ public class ShardFetchSearchRequest extends ShardFetchRequest implements Indice
this.originalIndices = originalIndices;
}
public ShardFetchSearchRequest(StreamInput in) throws IOException {
super(in);
originalIndices = OriginalIndices.readOriginalIndices(in);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
OriginalIndices.writeOriginalIndices(originalIndices, out);
}
@Override
public String[] indices() {
if (originalIndices == null) {
@ -64,13 +75,6 @@ public class ShardFetchSearchRequest extends ShardFetchRequest implements Indice
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
originalIndices = OriginalIndices.readOriginalIndices(in);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
OriginalIndices.writeOriginalIndices(originalIndices, out);
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
}
}

View File

@ -44,6 +44,19 @@ public class InternalScrollSearchRequest extends TransportRequest {
this.scroll = request.scroll();
}
public InternalScrollSearchRequest(StreamInput in) throws IOException {
super(in);
id = in.readLong();
scroll = in.readOptionalWriteable(Scroll::new);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeLong(id);
out.writeOptionalWriteable(scroll);
}
public long id() {
return id;
}
@ -59,16 +72,7 @@ public class InternalScrollSearchRequest extends TransportRequest {
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
id = in.readLong();
scroll = in.readOptionalWriteable(Scroll::new);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeLong(id);
out.writeOptionalWriteable(scroll);
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
}
@Override

View File

@ -62,6 +62,20 @@ public class ShardSearchTransportRequest extends TransportRequest implements Sha
this.originalIndices = originalIndices;
}
public ShardSearchTransportRequest(StreamInput in) throws IOException {
super(in);
shardSearchLocalRequest = new ShardSearchLocalRequest();
shardSearchLocalRequest.innerReadFrom(in);
originalIndices = OriginalIndices.readOriginalIndices(in);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
shardSearchLocalRequest.innerWriteTo(out, false);
OriginalIndices.writeOriginalIndices(originalIndices, out);
}
public void searchType(SearchType searchType) {
shardSearchLocalRequest.setSearchType(searchType);
}
@ -144,18 +158,7 @@ public class ShardSearchTransportRequest extends TransportRequest implements Sha
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
shardSearchLocalRequest = new ShardSearchLocalRequest();
shardSearchLocalRequest.innerReadFrom(in);
originalIndices = OriginalIndices.readOriginalIndices(in);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
shardSearchLocalRequest.innerWriteTo(out, false);
OriginalIndices.writeOriginalIndices(originalIndices, out);
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
}
@Override

View File

@ -52,6 +52,21 @@ public class QuerySearchRequest extends TransportRequest implements IndicesReque
this.originalIndices = originalIndices;
}
public QuerySearchRequest(StreamInput in) throws IOException {
super(in);
id = in.readLong();
dfs = readAggregatedDfs(in);
originalIndices = OriginalIndices.readOriginalIndices(in);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeLong(id);
dfs.writeTo(out);
OriginalIndices.writeOriginalIndices(originalIndices, out);
}
public long id() {
return id;
}
@ -72,18 +87,7 @@ public class QuerySearchRequest extends TransportRequest implements IndicesReque
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
id = in.readLong();
dfs = readAggregatedDfs(in);
originalIndices = OriginalIndices.readOriginalIndices(in);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeLong(id);
dfs.writeTo(out);
OriginalIndices.writeOriginalIndices(originalIndices, out);
throw new UnsupportedOperationException("usage of Streamable is to be replaced by Writeable");
}
@Override

View File

@ -46,8 +46,7 @@ public class SearchScrollRequestTests extends ESTestCase {
try (BytesStreamOutput output = new BytesStreamOutput()) {
searchScrollRequest.writeTo(output);
try (StreamInput in = output.bytes().streamInput()) {
SearchScrollRequest deserializedRequest = new SearchScrollRequest();
deserializedRequest.readFrom(in);
SearchScrollRequest deserializedRequest = new SearchScrollRequest(in);
assertEquals(deserializedRequest, searchScrollRequest);
assertEquals(deserializedRequest.hashCode(), searchScrollRequest.hashCode());
assertNotSame(deserializedRequest, searchScrollRequest);
@ -61,8 +60,7 @@ public class SearchScrollRequestTests extends ESTestCase {
try (BytesStreamOutput output = new BytesStreamOutput()) {
internalScrollSearchRequest.writeTo(output);
try (StreamInput in = output.bytes().streamInput()) {
InternalScrollSearchRequest deserializedRequest = new InternalScrollSearchRequest();
deserializedRequest.readFrom(in);
InternalScrollSearchRequest deserializedRequest = new InternalScrollSearchRequest(in);
assertEquals(deserializedRequest.id(), internalScrollSearchRequest.id());
assertEquals(deserializedRequest.scroll(), internalScrollSearchRequest.scroll());
assertNotSame(deserializedRequest, internalScrollSearchRequest);

View File

@ -43,8 +43,7 @@ public class SearchRequestTests extends AbstractSearchTestCase {
try (BytesStreamOutput output = new BytesStreamOutput()) {
searchRequest.writeTo(output);
try (StreamInput in = new NamedWriteableAwareStreamInput(output.bytes().streamInput(), namedWriteableRegistry)) {
SearchRequest deserializedRequest = new SearchRequest();
deserializedRequest.readFrom(in);
SearchRequest deserializedRequest = new SearchRequest(in);
assertEquals(deserializedRequest, searchRequest);
assertEquals(deserializedRequest.hashCode(), searchRequest.hashCode());
assertNotSame(deserializedRequest, searchRequest);

View File

@ -61,8 +61,7 @@ public class ShardSearchTransportRequestTests extends AbstractSearchTestCase {
try (BytesStreamOutput output = new BytesStreamOutput()) {
shardSearchTransportRequest.writeTo(output);
try (StreamInput in = new NamedWriteableAwareStreamInput(output.bytes().streamInput(), namedWriteableRegistry)) {
ShardSearchTransportRequest deserializedRequest = new ShardSearchTransportRequest();
deserializedRequest.readFrom(in);
ShardSearchTransportRequest deserializedRequest = new ShardSearchTransportRequest(in);
assertEquals(deserializedRequest.scroll(), shardSearchTransportRequest.scroll());
assertEquals(deserializedRequest.getAliasFilter(), shardSearchTransportRequest.getAliasFilter());
assertArrayEquals(deserializedRequest.indices(), shardSearchTransportRequest.indices());

View File

@ -137,7 +137,7 @@ public class SearchTemplateRequest extends ActionRequest implements CompositeInd
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
request = in.readOptionalStreamable(SearchRequest::new);
request = in.readOptionalWriteable(SearchRequest::new);
simulate = in.readBoolean();
explain = in.readBoolean();
profile = in.readBoolean();

View File

@ -55,6 +55,7 @@ import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.ToXContent;
@ -688,7 +689,12 @@ public class ElasticsearchAssertions {
input = new NamedWriteableAwareStreamInput(input, namedWriteableRegistry);
}
input.setVersion(version);
// This is here since some Streamables are being converted into Writeables
// and the readFrom method throws an exception if called
Streamable newInstanceFromStream = tryCreateFromStream(streamable, input);
if (newInstanceFromStream == null) {
newInstance.readFrom(input);
}
assertThat("Stream should be fully read with version [" + version + "] for streamable [" + streamable + "]", input.available(),
equalTo(0));
BytesReference newBytes = serialize(version, streamable);
@ -749,6 +755,33 @@ public class ElasticsearchAssertions {
}
}
/**
* This attemps to construct a new {@link Streamable} object that is in the process of
* being converted from {@link Streamable} to {@link Writeable}. Assuming this constructs
* the object successfully, #readFrom should not be called on the constructed object.
*
* @param streamable the object to retrieve the type of class to construct the new instance from
* @param in the stream to read the object from
* @return the newly constructed object from reading the stream
* @throws NoSuchMethodException if constuctor cannot be found
* @throws InstantiationException if the class represents an abstract class
* @throws IllegalAccessException if this {@code Constructor} object
* is enforcing Java language access control and the underlying
* constructor is inaccessible.
* @throws InvocationTargetException if the underlying constructor
* throws an exception.
*/
private static Streamable tryCreateFromStream(Streamable streamable, StreamInput in) throws NoSuchMethodException,
InstantiationException, IllegalAccessException, InvocationTargetException {
try {
Class<? extends Streamable> clazz = streamable.getClass();
Constructor<? extends Streamable> constructor = clazz.getConstructor(StreamInput.class);
return constructor.newInstance(in);
} catch (NoSuchMethodException e) {
return null;
}
}
/**
* Applies basic assertions on the SearchResponse. This method checks if all shards were successful, if
* any of the shards threw an exception and if the response is serializable.