mirror of
https://github.com/honeymoose/OpenSearch.git
synced 2025-02-26 14:54:56 +00:00
Fix serialization errors when cross cluster search goes to a single shard (#26881)
The single shard optimization that we have in our search api changes the type of response returned by the query transport action name based on the shard search request. if the request goes to one shard, we will do query and fetch at the same time, hence the response will be different. The proxying layer used in cross cluster search was not aware of this distinction, which causes serialization issues every time a cross cluster search request goes to a single shard and goes through a gateway node which has to forward the shard request to a data node. The coordinating node would then expect a QueryFetchSearchResult while the gateway would return a QuerySearchResult. Closes #26833
This commit is contained in:
parent
84a3899550
commit
9b9cb81c41
@ -40,6 +40,7 @@ import org.elasticsearch.search.fetch.ScrollQueryFetchSearchResult;
|
||||
import org.elasticsearch.search.fetch.ShardFetchRequest;
|
||||
import org.elasticsearch.search.fetch.ShardFetchSearchRequest;
|
||||
import org.elasticsearch.search.internal.InternalScrollSearchRequest;
|
||||
import org.elasticsearch.search.internal.ShardSearchRequest;
|
||||
import org.elasticsearch.search.internal.ShardSearchTransportRequest;
|
||||
import org.elasticsearch.search.query.QuerySearchRequest;
|
||||
import org.elasticsearch.search.query.QuerySearchResult;
|
||||
@ -320,7 +321,8 @@ public class SearchTransportService extends AbstractComponent {
|
||||
channel.sendResponse(new SearchFreeContextResponse(freed));
|
||||
}
|
||||
});
|
||||
TransportActionProxy.registerProxyAction(transportService, FREE_CONTEXT_SCROLL_ACTION_NAME, SearchFreeContextResponse::new);
|
||||
TransportActionProxy.registerProxyAction(transportService, FREE_CONTEXT_SCROLL_ACTION_NAME,
|
||||
(Supplier<TransportResponse>) SearchFreeContextResponse::new);
|
||||
transportService.registerRequestHandler(FREE_CONTEXT_ACTION_NAME, ThreadPool.Names.SAME, SearchFreeContextRequest::new,
|
||||
new TaskAwareTransportRequestHandler<SearchFreeContextRequest>() {
|
||||
@Override
|
||||
@ -329,7 +331,8 @@ public class SearchTransportService extends AbstractComponent {
|
||||
channel.sendResponse(new SearchFreeContextResponse(freed));
|
||||
}
|
||||
});
|
||||
TransportActionProxy.registerProxyAction(transportService, FREE_CONTEXT_ACTION_NAME, SearchFreeContextResponse::new);
|
||||
TransportActionProxy.registerProxyAction(transportService, FREE_CONTEXT_ACTION_NAME,
|
||||
(Supplier<TransportResponse>) SearchFreeContextResponse::new);
|
||||
transportService.registerRequestHandler(CLEAR_SCROLL_CONTEXTS_ACTION_NAME, () -> TransportRequest.Empty.INSTANCE,
|
||||
ThreadPool.Names.SAME, new TaskAwareTransportRequestHandler<TransportRequest.Empty>() {
|
||||
@Override
|
||||
@ -339,7 +342,7 @@ public class SearchTransportService extends AbstractComponent {
|
||||
}
|
||||
});
|
||||
TransportActionProxy.registerProxyAction(transportService, CLEAR_SCROLL_CONTEXTS_ACTION_NAME,
|
||||
() -> TransportResponse.Empty.INSTANCE);
|
||||
() -> TransportResponse.Empty.INSTANCE);
|
||||
|
||||
transportService.registerRequestHandler(DFS_ACTION_NAME, ThreadPool.Names.SAME, ShardSearchTransportRequest::new,
|
||||
new TaskAwareTransportRequestHandler<ShardSearchTransportRequest>() {
|
||||
@ -394,7 +397,8 @@ public class SearchTransportService extends AbstractComponent {
|
||||
});
|
||||
}
|
||||
});
|
||||
TransportActionProxy.registerProxyAction(transportService, QUERY_ACTION_NAME, QuerySearchResult::new);
|
||||
TransportActionProxy.registerProxyAction(transportService, QUERY_ACTION_NAME,
|
||||
(request) -> ((ShardSearchRequest)request).numberOfShards() == 1 ? QueryFetchSearchResult::new : QuerySearchResult::new);
|
||||
|
||||
transportService.registerRequestHandler(QUERY_ID_ACTION_NAME, ThreadPool.Names.SEARCH, QuerySearchRequest::new,
|
||||
new TaskAwareTransportRequestHandler<QuerySearchRequest>() {
|
||||
@ -455,7 +459,8 @@ public class SearchTransportService extends AbstractComponent {
|
||||
channel.sendResponse(new CanMatchResponse(canMatch));
|
||||
}
|
||||
});
|
||||
TransportActionProxy.registerProxyAction(transportService, QUERY_CAN_MATCH_NAME, CanMatchResponse::new);
|
||||
TransportActionProxy.registerProxyAction(transportService, QUERY_CAN_MATCH_NAME,
|
||||
(Supplier<TransportResponse>) CanMatchResponse::new);
|
||||
}
|
||||
|
||||
public static final class CanMatchResponse extends SearchPhaseResult {
|
||||
|
@ -18,7 +18,6 @@
|
||||
*/
|
||||
package org.elasticsearch.transport;
|
||||
|
||||
import org.apache.logging.log4j.util.Supplier;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
@ -27,6 +26,8 @@ import org.elasticsearch.threadpool.ThreadPool;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.UncheckedIOException;
|
||||
import java.util.function.Function;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
/**
|
||||
* TransportActionProxy allows an arbitrary action to be executed on a defined target node while the initial request is sent to a second
|
||||
@ -41,19 +42,21 @@ public final class TransportActionProxy {
|
||||
|
||||
private final TransportService service;
|
||||
private final String action;
|
||||
private final Supplier<TransportResponse> responseFactory;
|
||||
private final Function<TransportRequest, Supplier<TransportResponse>> responseFunction;
|
||||
|
||||
ProxyRequestHandler(TransportService service, String action, Supplier<TransportResponse> responseFactory) {
|
||||
ProxyRequestHandler(TransportService service, String action, Function<TransportRequest,
|
||||
Supplier<TransportResponse>> responseFunction) {
|
||||
this.service = service;
|
||||
this.action = action;
|
||||
this.responseFactory = responseFactory;
|
||||
this.responseFunction = responseFunction;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void messageReceived(T request, TransportChannel channel) throws Exception {
|
||||
DiscoveryNode targetNode = request.targetNode;
|
||||
TransportRequest wrappedRequest = request.wrapped;
|
||||
service.sendRequest(targetNode, action, wrappedRequest, new ProxyResponseHandler<>(channel, responseFactory));
|
||||
service.sendRequest(targetNode, action, wrappedRequest,
|
||||
new ProxyResponseHandler<>(channel, responseFunction.apply(wrappedRequest)));
|
||||
}
|
||||
}
|
||||
|
||||
@ -126,12 +129,24 @@ public final class TransportActionProxy {
|
||||
}
|
||||
|
||||
/**
|
||||
* Registers a proxy request handler that allows to forward requests for the given action to another node.
|
||||
* Registers a proxy request handler that allows to forward requests for the given action to another node. To be used when the
|
||||
* response type changes based on the upcoming request (quite rare)
|
||||
*/
|
||||
public static void registerProxyAction(TransportService service, String action,
|
||||
Function<TransportRequest, Supplier<TransportResponse>> responseFunction) {
|
||||
RequestHandlerRegistry requestHandler = service.getRequestHandler(action);
|
||||
service.registerRequestHandler(getProxyAction(action), () -> new ProxyRequest(requestHandler::newRequest), ThreadPool.Names.SAME,
|
||||
true, false, new ProxyRequestHandler<>(service, action, responseFunction));
|
||||
}
|
||||
|
||||
/**
|
||||
* Registers a proxy request handler that allows to forward requests for the given action to another node. To be used when the
|
||||
* response type is always the same (most of the cases).
|
||||
*/
|
||||
public static void registerProxyAction(TransportService service, String action, Supplier<TransportResponse> responseSupplier) {
|
||||
RequestHandlerRegistry requestHandler = service.getRequestHandler(action);
|
||||
service.registerRequestHandler(getProxyAction(action), () -> new ProxyRequest(requestHandler::newRequest), ThreadPool.Names.SAME,
|
||||
true, false, new ProxyRequestHandler<>(service, action, responseSupplier));
|
||||
true, false, new ProxyRequestHandler<>(service, action, request -> responseSupplier));
|
||||
}
|
||||
|
||||
private static final String PROXY_ACTION_PREFIX = "internal:transport/proxy/";
|
||||
|
@ -165,3 +165,14 @@
|
||||
- match: { hits.total: 2 }
|
||||
- match: { hits.hits.0._source.filter_field: 1 }
|
||||
- match: { hits.hits.0._index: "my_remote_cluster:test_index" }
|
||||
|
||||
---
|
||||
"Single shard search gets properly proxied":
|
||||
|
||||
- do:
|
||||
search:
|
||||
index: "my_remote_cluster:single_doc_index"
|
||||
|
||||
- match: { _shards.total: 1 }
|
||||
- match: { hits.total: 1 }
|
||||
- match: { hits.hits.0._index: "my_remote_cluster:single_doc_index"}
|
||||
|
Loading…
x
Reference in New Issue
Block a user