Core: Combine messageRecieved methods in TransportRequestHandler (#31519)

TransportRequestHandler currently contains 2 messageReceived methods,
one which takes a Task, and one that does not. The first just delegates
to the second. This commit changes all existing implementors of
TransportRequestHandler to implement the version which takes Task, thus
allowing the class to be a functional interface, and eliminating the
need to throw exceptions when a task needs to be ensured.
This commit is contained in:
Ryan Ernst 2018-06-22 07:36:03 -07:00 committed by GitHub
parent f023e95ae0
commit 59e7c6411a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
41 changed files with 175 additions and 271 deletions

View File

@ -26,6 +26,7 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.threadpool.TestThreadPool;
@ -91,7 +92,7 @@ public class Netty4ScheduledPingTests extends ESTestCase {
serviceA.registerRequestHandler("sayHello", TransportRequest.Empty::new, ThreadPool.Names.GENERIC,
new TransportRequestHandler<TransportRequest.Empty>() {
@Override
public void messageReceived(TransportRequest.Empty request, TransportChannel channel) {
public void messageReceived(TransportRequest.Empty request, TransportChannel channel, Task task) {
try {
channel.sendResponse(TransportResponse.Empty.INSTANCE, TransportResponseOptions.EMPTY);
} catch (IOException e) {

View File

@ -103,12 +103,12 @@ public class CrossClusterSearchUnavailableClusterIT extends ESRestTestCase {
MockTransportService newService = MockTransportService.createNewService(s, version, threadPool, null);
try {
newService.registerRequestHandler(ClusterSearchShardsAction.NAME, ThreadPool.Names.SAME, ClusterSearchShardsRequest::new,
(request, channel) -> {
(request, channel, task) -> {
channel.sendResponse(new ClusterSearchShardsResponse(new ClusterSearchShardsGroup[0],
knownNodes.toArray(new DiscoveryNode[0]), Collections.emptyMap()));
});
newService.registerRequestHandler(ClusterStateAction.NAME, ThreadPool.Names.SAME, ClusterStateRequest::new,
(request, channel) -> {
(request, channel, task) -> {
DiscoveryNodes.Builder builder = DiscoveryNodes.builder();
for (DiscoveryNode node : knownNodes) {
builder.add(node);

View File

@ -21,6 +21,7 @@ package org.elasticsearch.action.admin.cluster.node.liveness;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportRequestHandler;
@ -39,7 +40,7 @@ public final class TransportLivenessAction implements TransportRequestHandler<Li
}
@Override
public void messageReceived(LivenessRequest request, TransportChannel channel) throws Exception {
public void messageReceived(LivenessRequest request, TransportChannel channel, Task task) throws Exception {
channel.sendResponse(new LivenessResponse(clusterService.getClusterName(), clusterService.localNode()));
}
}

View File

@ -34,6 +34,7 @@ import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.tasks.TaskInfo;
import org.elasticsearch.threadpool.ThreadPool;
@ -285,7 +286,7 @@ public class TransportCancelTasksAction extends TransportTasksAction<Cancellable
class BanParentRequestHandler implements TransportRequestHandler<BanParentTaskRequest> {
@Override
public void messageReceived(final BanParentTaskRequest request, final TransportChannel channel) throws Exception {
public void messageReceived(final BanParentTaskRequest request, final TransportChannel channel, Task task) throws Exception {
if (request.ban) {
logger.debug("Received ban for the parent [{}] on the node [{}], reason: [{}]", request.parentTaskId,
clusterService.localNode().getId(), request.reason);

View File

@ -45,13 +45,10 @@ import org.elasticsearch.search.internal.ShardSearchTransportRequest;
import org.elasticsearch.search.query.QuerySearchRequest;
import org.elasticsearch.search.query.QuerySearchResult;
import org.elasticsearch.search.query.ScrollQuerySearchResult;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.RemoteClusterService;
import org.elasticsearch.transport.TaskAwareTransportRequestHandler;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportActionProxy;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportRequest;
import org.elasticsearch.transport.TransportRequestOptions;
@ -314,150 +311,116 @@ public class SearchTransportService extends AbstractComponent {
public static void registerRequestHandler(TransportService transportService, SearchService searchService) {
transportService.registerRequestHandler(FREE_CONTEXT_SCROLL_ACTION_NAME, ThreadPool.Names.SAME, ScrollFreeContextRequest::new,
new TaskAwareTransportRequestHandler<ScrollFreeContextRequest>() {
@Override
public void messageReceived(ScrollFreeContextRequest request, TransportChannel channel, Task task) throws Exception {
boolean freed = searchService.freeContext(request.id());
channel.sendResponse(new SearchFreeContextResponse(freed));
}
});
(request, channel, task) -> {
boolean freed = searchService.freeContext(request.id());
channel.sendResponse(new SearchFreeContextResponse(freed));
});
TransportActionProxy.registerProxyAction(transportService, FREE_CONTEXT_SCROLL_ACTION_NAME,
(Supplier<TransportResponse>) SearchFreeContextResponse::new);
transportService.registerRequestHandler(FREE_CONTEXT_ACTION_NAME, ThreadPool.Names.SAME, SearchFreeContextRequest::new,
new TaskAwareTransportRequestHandler<SearchFreeContextRequest>() {
@Override
public void messageReceived(SearchFreeContextRequest request, TransportChannel channel, Task task) throws Exception {
boolean freed = searchService.freeContext(request.id());
channel.sendResponse(new SearchFreeContextResponse(freed));
}
});
(request, channel, task) -> {
boolean freed = searchService.freeContext(request.id());
channel.sendResponse(new SearchFreeContextResponse(freed));
});
TransportActionProxy.registerProxyAction(transportService, FREE_CONTEXT_ACTION_NAME,
(Supplier<TransportResponse>) SearchFreeContextResponse::new);
transportService.registerRequestHandler(CLEAR_SCROLL_CONTEXTS_ACTION_NAME, () -> TransportRequest.Empty.INSTANCE,
ThreadPool.Names.SAME, new TaskAwareTransportRequestHandler<TransportRequest.Empty>() {
@Override
public void messageReceived(TransportRequest.Empty request, TransportChannel channel, Task task) throws Exception {
searchService.freeAllScrollContexts();
channel.sendResponse(TransportResponse.Empty.INSTANCE);
}
});
ThreadPool.Names.SAME, (request, channel, task) -> {
searchService.freeAllScrollContexts();
channel.sendResponse(TransportResponse.Empty.INSTANCE);
});
TransportActionProxy.registerProxyAction(transportService, CLEAR_SCROLL_CONTEXTS_ACTION_NAME,
() -> TransportResponse.Empty.INSTANCE);
transportService.registerRequestHandler(DFS_ACTION_NAME, ThreadPool.Names.SAME, ShardSearchTransportRequest::new,
new TaskAwareTransportRequestHandler<ShardSearchTransportRequest>() {
@Override
public void messageReceived(ShardSearchTransportRequest request, TransportChannel channel, Task task) throws Exception {
searchService.executeDfsPhase(request, (SearchTask) task, new ActionListener<SearchPhaseResult>() {
@Override
public void onResponse(SearchPhaseResult searchPhaseResult) {
try {
channel.sendResponse(searchPhaseResult);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
(request, channel, task) -> {
searchService.executeDfsPhase(request, (SearchTask) task, new ActionListener<SearchPhaseResult>() {
@Override
public void onResponse(SearchPhaseResult searchPhaseResult) {
try {
channel.sendResponse(searchPhaseResult);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
@Override
public void onFailure(Exception e) {
try {
channel.sendResponse(e);
} catch (IOException e1) {
throw new UncheckedIOException(e1);
}
@Override
public void onFailure(Exception e) {
try {
channel.sendResponse(e);
} catch (IOException e1) {
throw new UncheckedIOException(e1);
}
});
}
}
});
});
TransportActionProxy.registerProxyAction(transportService, DFS_ACTION_NAME, DfsSearchResult::new);
transportService.registerRequestHandler(QUERY_ACTION_NAME, ThreadPool.Names.SAME, ShardSearchTransportRequest::new,
new TaskAwareTransportRequestHandler<ShardSearchTransportRequest>() {
@Override
public void messageReceived(ShardSearchTransportRequest request, TransportChannel channel, Task task) throws Exception {
searchService.executeQueryPhase(request, (SearchTask) task, new ActionListener<SearchPhaseResult>() {
@Override
public void onResponse(SearchPhaseResult searchPhaseResult) {
try {
channel.sendResponse(searchPhaseResult);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
(request, channel, task) -> {
searchService.executeQueryPhase(request, (SearchTask) task, new ActionListener<SearchPhaseResult>() {
@Override
public void onResponse(SearchPhaseResult searchPhaseResult) {
try {
channel.sendResponse(searchPhaseResult);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
@Override
public void onFailure(Exception e) {
try {
channel.sendResponse(e);
} catch (IOException e1) {
throw new UncheckedIOException(e1);
}
@Override
public void onFailure(Exception e) {
try {
channel.sendResponse(e);
} catch (IOException e1) {
throw new UncheckedIOException(e1);
}
});
}
}
});
});
TransportActionProxy.registerProxyAction(transportService, QUERY_ACTION_NAME,
(request) -> ((ShardSearchRequest)request).numberOfShards() == 1 ? QueryFetchSearchResult::new : QuerySearchResult::new);
transportService.registerRequestHandler(QUERY_ID_ACTION_NAME, ThreadPool.Names.SEARCH, QuerySearchRequest::new,
new TaskAwareTransportRequestHandler<QuerySearchRequest>() {
@Override
public void messageReceived(QuerySearchRequest request, TransportChannel channel, Task task) throws Exception {
QuerySearchResult result = searchService.executeQueryPhase(request, (SearchTask)task);
channel.sendResponse(result);
}
(request, channel, task) -> {
QuerySearchResult result = searchService.executeQueryPhase(request, (SearchTask)task);
channel.sendResponse(result);
});
TransportActionProxy.registerProxyAction(transportService, QUERY_ID_ACTION_NAME, QuerySearchResult::new);
transportService.registerRequestHandler(QUERY_SCROLL_ACTION_NAME, ThreadPool.Names.SEARCH, InternalScrollSearchRequest::new,
new TaskAwareTransportRequestHandler<InternalScrollSearchRequest>() {
@Override
public void messageReceived(InternalScrollSearchRequest request, TransportChannel channel, Task task) throws Exception {
ScrollQuerySearchResult result = searchService.executeQueryPhase(request, (SearchTask)task);
channel.sendResponse(result);
}
(request, channel, task) -> {
ScrollQuerySearchResult result = searchService.executeQueryPhase(request, (SearchTask)task);
channel.sendResponse(result);
});
TransportActionProxy.registerProxyAction(transportService, QUERY_SCROLL_ACTION_NAME, ScrollQuerySearchResult::new);
transportService.registerRequestHandler(QUERY_FETCH_SCROLL_ACTION_NAME, ThreadPool.Names.SEARCH, InternalScrollSearchRequest::new,
new TaskAwareTransportRequestHandler<InternalScrollSearchRequest>() {
@Override
public void messageReceived(InternalScrollSearchRequest request, TransportChannel channel, Task task) throws Exception {
ScrollQueryFetchSearchResult result = searchService.executeFetchPhase(request, (SearchTask)task);
channel.sendResponse(result);
}
(request, channel, task) -> {
ScrollQueryFetchSearchResult result = searchService.executeFetchPhase(request, (SearchTask)task);
channel.sendResponse(result);
});
TransportActionProxy.registerProxyAction(transportService, QUERY_FETCH_SCROLL_ACTION_NAME, ScrollQueryFetchSearchResult::new);
transportService.registerRequestHandler(FETCH_ID_SCROLL_ACTION_NAME, ThreadPool.Names.SEARCH, ShardFetchRequest::new,
new TaskAwareTransportRequestHandler<ShardFetchRequest>() {
@Override
public void messageReceived(ShardFetchRequest request, TransportChannel channel, Task task) throws Exception {
FetchSearchResult result = searchService.executeFetchPhase(request, (SearchTask)task);
channel.sendResponse(result);
}
(request, channel, task) -> {
FetchSearchResult result = searchService.executeFetchPhase(request, (SearchTask)task);
channel.sendResponse(result);
});
TransportActionProxy.registerProxyAction(transportService, FETCH_ID_SCROLL_ACTION_NAME, FetchSearchResult::new);
transportService.registerRequestHandler(FETCH_ID_ACTION_NAME, ThreadPool.Names.SEARCH, ShardFetchSearchRequest::new,
new TaskAwareTransportRequestHandler<ShardFetchSearchRequest>() {
@Override
public void messageReceived(ShardFetchSearchRequest request, TransportChannel channel, Task task) throws Exception {
FetchSearchResult result = searchService.executeFetchPhase(request, (SearchTask)task);
channel.sendResponse(result);
}
(request, channel, task) -> {
FetchSearchResult result = searchService.executeFetchPhase(request, (SearchTask)task);
channel.sendResponse(result);
});
TransportActionProxy.registerProxyAction(transportService, FETCH_ID_ACTION_NAME, FetchSearchResult::new);
// this is cheap, it does not fetch during the rewrite phase, so we can let it quickly execute on a networking thread
transportService.registerRequestHandler(QUERY_CAN_MATCH_NAME, ThreadPool.Names.SAME, ShardSearchTransportRequest::new,
new TaskAwareTransportRequestHandler<ShardSearchTransportRequest>() {
@Override
public void messageReceived(ShardSearchTransportRequest request, TransportChannel channel, Task task) throws Exception {
boolean canMatch = searchService.canMatch(request);
channel.sendResponse(new CanMatchResponse(canMatch));
}
(request, channel, task) -> {
boolean canMatch = searchService.canMatch(request);
channel.sendResponse(new CanMatchResponse(canMatch));
});
TransportActionProxy.registerProxyAction(transportService, QUERY_CAN_MATCH_NAME,
(Supplier<TransportResponse>) CanMatchResponse::new);

View File

@ -64,11 +64,6 @@ public abstract class HandledTransportAction<Request extends ActionRequest, Resp
class TransportHandler implements TransportRequestHandler<Request> {
@Override
public final void messageReceived(Request request, TransportChannel channel) throws Exception {
throw new UnsupportedOperationException("the task parameter is required for this operation");
}
@Override
public final void messageReceived(final Request request, final TransportChannel channel, Task task) throws Exception {
// We already got the task created on the network layer - no need to create it again on the transport layer

View File

@ -284,10 +284,5 @@ public abstract class TransportBroadcastAction<Request extends BroadcastRequest<
public void messageReceived(ShardRequest request, TransportChannel channel, Task task) throws Exception {
channel.sendResponse(shardOperation(request, task));
}
@Override
public final void messageReceived(final ShardRequest request, final TransportChannel channel) throws Exception {
throw new UnsupportedOperationException("the task parameter is required");
}
}
}

View File

@ -393,7 +393,7 @@ public abstract class TransportBroadcastByNodeAction<Request extends BroadcastRe
class BroadcastByNodeTransportRequestHandler implements TransportRequestHandler<NodeRequest> {
@Override
public void messageReceived(final NodeRequest request, TransportChannel channel) throws Exception {
public void messageReceived(final NodeRequest request, TransportChannel channel, Task task) throws Exception {
List<ShardRouting> shards = request.getShards();
final int totalShards = shards.size();
if (logger.isTraceEnabled()) {

View File

@ -258,12 +258,6 @@ public abstract class TransportNodesAction<NodesRequest extends BaseNodesRequest
public void messageReceived(NodeRequest request, TransportChannel channel, Task task) throws Exception {
channel.sendResponse(nodeOperation(request, task));
}
@Override
public void messageReceived(NodeRequest request, TransportChannel channel) throws Exception {
channel.sendResponse(nodeOperation(request));
}
}
}

View File

@ -273,11 +273,6 @@ public abstract class TransportReplicationAction<
}
});
}
@Override
public void messageReceived(Request request, TransportChannel channel) throws Exception {
throw new UnsupportedOperationException("the task parameter is required for this operation");
}
}
protected class PrimaryOperationTransportHandler implements TransportRequestHandler<ConcreteShardRequest<Request>> {
@ -286,11 +281,6 @@ public abstract class TransportReplicationAction<
}
@Override
public void messageReceived(final ConcreteShardRequest<Request> request, final TransportChannel channel) throws Exception {
throw new UnsupportedOperationException("the task parameter is required for this operation");
}
@Override
public void messageReceived(ConcreteShardRequest<Request> request, TransportChannel channel, Task task) {
new AsyncPrimaryAction(request.request, request.targetAllocationID, request.primaryTerm, channel, (ReplicationTask) task).run();
@ -493,12 +483,6 @@ public abstract class TransportReplicationAction<
public class ReplicaOperationTransportHandler implements TransportRequestHandler<ConcreteReplicaRequest<ReplicaRequest>> {
@Override
public void messageReceived(
final ConcreteReplicaRequest<ReplicaRequest> replicaRequest, final TransportChannel channel) throws Exception {
throw new UnsupportedOperationException("the task parameter is required for this operation");
}
@Override
public void messageReceived(
final ConcreteReplicaRequest<ReplicaRequest> replicaRequest,

View File

@ -37,6 +37,7 @@ import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.node.NodeClosedException;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.ConnectTransportException;
import org.elasticsearch.transport.TransportChannel;
@ -243,7 +244,7 @@ public abstract class TransportInstanceSingleOperationAction<Request extends Ins
private class ShardTransportHandler implements TransportRequestHandler<Request> {
@Override
public void messageReceived(final Request request, final TransportChannel channel) throws Exception {
public void messageReceived(final Request request, final TransportChannel channel, Task task) throws Exception {
shardOperation(request, new ActionListener<Response>() {
@Override
public void onResponse(Response response) {

View File

@ -40,6 +40,7 @@ import org.elasticsearch.common.logging.LoggerMessageFormat;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportException;
@ -271,7 +272,7 @@ public abstract class TransportSingleShardAction<Request extends SingleShardRequ
private class TransportHandler implements TransportRequestHandler<Request> {
@Override
public void messageReceived(Request request, final TransportChannel channel) throws Exception {
public void messageReceived(Request request, final TransportChannel channel, Task task) throws Exception {
// if we have a local operation, execute it on a thread since we don't spawn
execute(request, new ActionListener<Response>() {
@Override
@ -298,7 +299,7 @@ public abstract class TransportSingleShardAction<Request extends SingleShardRequ
private class ShardTransportHandler implements TransportRequestHandler<Request> {
@Override
public void messageReceived(final Request request, final TransportChannel channel) throws Exception {
public void messageReceived(final Request request, final TransportChannel channel, Task task) throws Exception {
if (logger.isTraceEnabled()) {
logger.trace("executing [{}] on shard [{}]", request, request.internalShardId);
}

View File

@ -338,7 +338,7 @@ public abstract class TransportTasksAction<
class NodeTransportHandler implements TransportRequestHandler<NodeTaskRequest> {
@Override
public void messageReceived(final NodeTaskRequest request, final TransportChannel channel) throws Exception {
public void messageReceived(final NodeTaskRequest request, final TransportChannel channel, Task task) throws Exception {
nodeOperation(request, new ActionListener<NodeTasksResponse>() {
@Override
public void onResponse(

View File

@ -29,6 +29,7 @@ import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.EmptyTransportResponseHandler;
import org.elasticsearch.transport.TransportChannel;
@ -65,7 +66,7 @@ public class NodeMappingRefreshAction extends AbstractComponent {
private class NodeMappingRefreshTransportHandler implements TransportRequestHandler<NodeMappingRefreshRequest> {
@Override
public void messageReceived(NodeMappingRefreshRequest request, TransportChannel channel) throws Exception {
public void messageReceived(NodeMappingRefreshRequest request, TransportChannel channel, Task task) throws Exception {
metaDataMappingService.refreshMapping(request.index(), request.indexUUID());
channel.sendResponse(TransportResponse.Empty.INSTANCE);
}

View File

@ -52,6 +52,7 @@ import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.node.NodeClosedException;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.ConnectTransportException;
import org.elasticsearch.transport.EmptyTransportResponseHandler;
@ -237,7 +238,7 @@ public class ShardStateAction extends AbstractComponent {
}
@Override
public void messageReceived(FailedShardEntry request, TransportChannel channel) throws Exception {
public void messageReceived(FailedShardEntry request, TransportChannel channel, Task task) throws Exception {
logger.debug(() -> new ParameterizedMessage("{} received shard failed for {}", request.shardId, request), request.failure);
clusterService.submitStateUpdateTask(
"shard-failed",
@ -487,7 +488,7 @@ public class ShardStateAction extends AbstractComponent {
}
@Override
public void messageReceived(StartedShardEntry request, TransportChannel channel) throws Exception {
public void messageReceived(StartedShardEntry request, TransportChannel channel, Task task) throws Exception {
logger.debug("{} received shard started for [{}]", request.shardId, request);
clusterService.submitStateUpdateTask(
"shard-started " + request,

View File

@ -34,6 +34,7 @@ import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.ConnectTransportException;
import org.elasticsearch.transport.TransportChannel;
@ -321,7 +322,7 @@ public class MasterFaultDetection extends FaultDetection {
private class MasterPingRequestHandler implements TransportRequestHandler<MasterPingRequest> {
@Override
public void messageReceived(final MasterPingRequest request, final TransportChannel channel) throws Exception {
public void messageReceived(final MasterPingRequest request, final TransportChannel channel, Task task) throws Exception {
final DiscoveryNodes nodes = clusterStateSupplier.get().nodes();
// check if we are really the same master as the one we seemed to be think we are
// this can happen if the master got "kill -9" and then another node started using the same port

View File

@ -30,6 +30,7 @@ import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.EmptyTransportResponseHandler;
import org.elasticsearch.transport.TransportChannel;
@ -133,7 +134,7 @@ public class MembershipAction extends AbstractComponent {
private class JoinRequestRequestHandler implements TransportRequestHandler<JoinRequest> {
@Override
public void messageReceived(final JoinRequest request, final TransportChannel channel) throws Exception {
public void messageReceived(final JoinRequest request, final TransportChannel channel, Task task) throws Exception {
listener.onJoin(request.node, new JoinCallback() {
@Override
public void onSuccess() {
@ -190,7 +191,7 @@ public class MembershipAction extends AbstractComponent {
}
@Override
public void messageReceived(ValidateJoinRequest request, TransportChannel channel) throws Exception {
public void messageReceived(ValidateJoinRequest request, TransportChannel channel, Task task) throws Exception {
DiscoveryNode node = localNodeSupplier.get();
assert node != null : "local node is null";
joinValidators.stream().forEach(action -> action.accept(node, request.state));
@ -281,7 +282,7 @@ public class MembershipAction extends AbstractComponent {
private class LeaveRequestRequestHandler implements TransportRequestHandler<LeaveRequest> {
@Override
public void messageReceived(LeaveRequest request, TransportChannel channel) throws Exception {
public void messageReceived(LeaveRequest request, TransportChannel channel, Task task) throws Exception {
listener.onLeave(request.node);
channel.sendResponse(TransportResponse.Empty.INSTANCE);
}

View File

@ -28,6 +28,7 @@ import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.ConnectTransportException;
import org.elasticsearch.transport.TransportChannel;
@ -276,7 +277,7 @@ public class NodesFaultDetection extends FaultDetection {
class PingRequestHandler implements TransportRequestHandler<PingRequest> {
@Override
public void messageReceived(PingRequest request, TransportChannel channel) throws Exception {
public void messageReceived(PingRequest request, TransportChannel channel, Task task) throws Exception {
// if we are not the node we are supposed to be pinged, send an exception
// this can happen when a kill -9 is sent, and another node is started using the same port
if (!localNode.equals(request.targetNode())) {

View File

@ -45,6 +45,7 @@ import org.elasticsearch.discovery.AckClusterStatePublishResponseHandler;
import org.elasticsearch.discovery.BlockingClusterStatePublishResponseHandler;
import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.discovery.DiscoverySettings;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.BytesTransportRequest;
import org.elasticsearch.transport.EmptyTransportResponseHandler;
@ -447,14 +448,14 @@ public class PublishClusterStateAction extends AbstractComponent {
private class SendClusterStateRequestHandler implements TransportRequestHandler<BytesTransportRequest> {
@Override
public void messageReceived(BytesTransportRequest request, final TransportChannel channel) throws Exception {
public void messageReceived(BytesTransportRequest request, final TransportChannel channel, Task task) throws Exception {
handleIncomingClusterStateRequest(request, channel);
}
}
private class CommitClusterStateRequestHandler implements TransportRequestHandler<CommitClusterStateRequest> {
@Override
public void messageReceived(CommitClusterStateRequest request, final TransportChannel channel) throws Exception {
public void messageReceived(CommitClusterStateRequest request, final TransportChannel channel, Task task) throws Exception {
handleCommitRequest(request, channel);
}
}

View File

@ -45,6 +45,7 @@ import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor;
import org.elasticsearch.common.util.concurrent.KeyedLock;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.ConnectTransportException;
import org.elasticsearch.transport.ConnectionProfile;
@ -563,7 +564,7 @@ public class UnicastZenPing extends AbstractComponent implements ZenPing {
class UnicastPingRequestHandler implements TransportRequestHandler<UnicastPingRequest> {
@Override
public void messageReceived(UnicastPingRequest request, TransportChannel channel) throws Exception {
public void messageReceived(UnicastPingRequest request, TransportChannel channel, Task task) throws Exception {
if (closed) {
throw new AlreadyClosedException("node is shutting down");
}

View File

@ -56,6 +56,7 @@ import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.discovery.DiscoverySettings;
import org.elasticsearch.discovery.DiscoveryStats;
import org.elasticsearch.discovery.zen.PublishClusterStateAction.IncomingClusterStateListener;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.EmptyTransportResponseHandler;
import org.elasticsearch.transport.TransportChannel;
@ -1187,7 +1188,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover
class RejoinClusterRequestHandler implements TransportRequestHandler<RejoinClusterRequest> {
@Override
public void messageReceived(final RejoinClusterRequest request, final TransportChannel channel) throws Exception {
public void messageReceived(final RejoinClusterRequest request, final TransportChannel channel, Task task) throws Exception {
try {
channel.sendResponse(TransportResponse.Empty.INSTANCE);
} catch (Exception e) {

View File

@ -37,6 +37,7 @@ import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.discovery.MasterNotDiscoveredException;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportException;
@ -112,7 +113,7 @@ public class LocalAllocateDangledIndices extends AbstractComponent {
class AllocateDangledRequestHandler implements TransportRequestHandler<AllocateDangledRequest> {
@Override
public void messageReceived(final AllocateDangledRequest request, final TransportChannel channel) throws Exception {
public void messageReceived(final AllocateDangledRequest request, final TransportChannel channel, Task task) throws Exception {
String[] indexNames = new String[request.indices.length];
for (int i = 0; i < request.indices.length; i++) {
indexNames[i] = request.indices[i].getIndex().getName();

View File

@ -54,6 +54,7 @@ import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.ShardNotFoundException;
import org.elasticsearch.indices.IndexClosedException;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportException;
@ -778,7 +779,7 @@ public class SyncedFlushService extends AbstractComponent implements IndexEventL
private final class PreSyncedFlushTransportHandler implements TransportRequestHandler<PreShardSyncedFlushRequest> {
@Override
public void messageReceived(PreShardSyncedFlushRequest request, TransportChannel channel) throws Exception {
public void messageReceived(PreShardSyncedFlushRequest request, TransportChannel channel, Task task) throws Exception {
channel.sendResponse(performPreSyncedFlush(request));
}
}
@ -786,7 +787,7 @@ public class SyncedFlushService extends AbstractComponent implements IndexEventL
private final class SyncedFlushTransportHandler implements TransportRequestHandler<ShardSyncedFlushRequest> {
@Override
public void messageReceived(ShardSyncedFlushRequest request, TransportChannel channel) throws Exception {
public void messageReceived(ShardSyncedFlushRequest request, TransportChannel channel, Task task) throws Exception {
channel.sendResponse(performSyncedFlush(request));
}
}
@ -794,7 +795,7 @@ public class SyncedFlushService extends AbstractComponent implements IndexEventL
private final class InFlightOpCountTransportHandler implements TransportRequestHandler<InFlightOpsRequest> {
@Override
public void messageReceived(InFlightOpsRequest request, TransportChannel channel) throws Exception {
public void messageReceived(InFlightOpsRequest request, TransportChannel channel, Task task) throws Exception {
channel.sendResponse(performInFlightOps(request));
}
}

View File

@ -30,6 +30,7 @@ import org.elasticsearch.index.shard.IndexEventListener;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportRequestHandler;
@ -103,7 +104,7 @@ public class PeerRecoverySourceService extends AbstractComponent implements Inde
class StartRecoveryTransportRequestHandler implements TransportRequestHandler<StartRecoveryRequest> {
@Override
public void messageReceived(final StartRecoveryRequest request, final TransportChannel channel) throws Exception {
public void messageReceived(final StartRecoveryRequest request, final TransportChannel channel, Task task) throws Exception {
RecoveryResponse response = recover(request);
channel.sendResponse(response);
}

View File

@ -55,6 +55,7 @@ import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.index.translog.TranslogCorruptedException;
import org.elasticsearch.indices.recovery.RecoveriesCollection.RecoveryRef;
import org.elasticsearch.node.NodeClosedException;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.ConnectTransportException;
import org.elasticsearch.transport.FutureTransportResponseHandler;
@ -397,7 +398,8 @@ public class PeerRecoveryTargetService extends AbstractComponent implements Inde
class PrepareForTranslogOperationsRequestHandler implements TransportRequestHandler<RecoveryPrepareForTranslogOperationsRequest> {
@Override
public void messageReceived(RecoveryPrepareForTranslogOperationsRequest request, TransportChannel channel) throws Exception {
public void messageReceived(RecoveryPrepareForTranslogOperationsRequest request, TransportChannel channel,
Task task) throws Exception {
try (RecoveryRef recoveryRef = onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId()
)) {
recoveryRef.target().prepareForTranslogOperations(request.isFileBasedRecovery(), request.totalTranslogOps());
@ -409,7 +411,7 @@ public class PeerRecoveryTargetService extends AbstractComponent implements Inde
class FinalizeRecoveryRequestHandler implements TransportRequestHandler<RecoveryFinalizeRecoveryRequest> {
@Override
public void messageReceived(RecoveryFinalizeRecoveryRequest request, TransportChannel channel) throws Exception {
public void messageReceived(RecoveryFinalizeRecoveryRequest request, TransportChannel channel, Task task) throws Exception {
try (RecoveryRef recoveryRef =
onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId())) {
recoveryRef.target().finalizeRecovery(request.globalCheckpoint());
@ -421,7 +423,7 @@ public class PeerRecoveryTargetService extends AbstractComponent implements Inde
class WaitForClusterStateRequestHandler implements TransportRequestHandler<RecoveryWaitForClusterStateRequest> {
@Override
public void messageReceived(RecoveryWaitForClusterStateRequest request, TransportChannel channel) throws Exception {
public void messageReceived(RecoveryWaitForClusterStateRequest request, TransportChannel channel, Task task) throws Exception {
try (RecoveryRef recoveryRef = onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId()
)) {
recoveryRef.target().ensureClusterStateVersion(request.clusterStateVersion());
@ -433,7 +435,8 @@ public class PeerRecoveryTargetService extends AbstractComponent implements Inde
class HandoffPrimaryContextRequestHandler implements TransportRequestHandler<RecoveryHandoffPrimaryContextRequest> {
@Override
public void messageReceived(final RecoveryHandoffPrimaryContextRequest request, final TransportChannel channel) throws Exception {
public void messageReceived(final RecoveryHandoffPrimaryContextRequest request, final TransportChannel channel,
Task task) throws Exception {
try (RecoveryRef recoveryRef = onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId())) {
recoveryRef.target().handoffPrimaryContext(request.primaryContext());
}
@ -445,7 +448,8 @@ public class PeerRecoveryTargetService extends AbstractComponent implements Inde
class TranslogOperationsRequestHandler implements TransportRequestHandler<RecoveryTranslogOperationsRequest> {
@Override
public void messageReceived(final RecoveryTranslogOperationsRequest request, final TransportChannel channel) throws IOException {
public void messageReceived(final RecoveryTranslogOperationsRequest request, final TransportChannel channel,
Task task) throws IOException {
try (RecoveryRef recoveryRef =
onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId())) {
final ClusterStateObserver observer = new ClusterStateObserver(clusterService, null, logger, threadPool.getThreadContext());
@ -463,7 +467,7 @@ public class PeerRecoveryTargetService extends AbstractComponent implements Inde
@Override
public void onNewClusterState(ClusterState state) {
try {
messageReceived(request, channel);
messageReceived(request, channel, task);
} catch (Exception e) {
onFailure(e);
}
@ -537,7 +541,7 @@ public class PeerRecoveryTargetService extends AbstractComponent implements Inde
class FilesInfoRequestHandler implements TransportRequestHandler<RecoveryFilesInfoRequest> {
@Override
public void messageReceived(RecoveryFilesInfoRequest request, TransportChannel channel) throws Exception {
public void messageReceived(RecoveryFilesInfoRequest request, TransportChannel channel, Task task) throws Exception {
try (RecoveryRef recoveryRef = onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId()
)) {
recoveryRef.target().receiveFileInfo(request.phase1FileNames, request.phase1FileSizes, request.phase1ExistingFileNames,
@ -550,7 +554,7 @@ public class PeerRecoveryTargetService extends AbstractComponent implements Inde
class CleanFilesRequestHandler implements TransportRequestHandler<RecoveryCleanFilesRequest> {
@Override
public void messageReceived(RecoveryCleanFilesRequest request, TransportChannel channel) throws Exception {
public void messageReceived(RecoveryCleanFilesRequest request, TransportChannel channel, Task task) throws Exception {
try (RecoveryRef recoveryRef = onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId()
)) {
recoveryRef.target().cleanFiles(request.totalTranslogOps(), request.sourceMetaSnapshot());
@ -565,7 +569,7 @@ public class PeerRecoveryTargetService extends AbstractComponent implements Inde
final AtomicLong bytesSinceLastPause = new AtomicLong();
@Override
public void messageReceived(final RecoveryFileChunkRequest request, TransportChannel channel) throws Exception {
public void messageReceived(final RecoveryFileChunkRequest request, TransportChannel channel, Task task) throws Exception {
try (RecoveryRef recoveryRef = onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId()
)) {
final RecoveryTarget recoveryTarget = recoveryRef.target();

View File

@ -49,6 +49,7 @@ import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardState;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportException;
@ -299,7 +300,7 @@ public class IndicesStore extends AbstractComponent implements ClusterStateListe
private class ShardActiveRequestHandler implements TransportRequestHandler<ShardActiveRequest> {
@Override
public void messageReceived(final ShardActiveRequest request, final TransportChannel channel) throws Exception {
public void messageReceived(final ShardActiveRequest request, final TransportChannel channel, Task task) throws Exception {
IndexShard indexShard = getShard(request);
// make sure shard is really there before register cluster state observer

View File

@ -31,6 +31,7 @@ import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.repositories.RepositoriesService.VerifyResponse;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.EmptyTransportResponseHandler;
import org.elasticsearch.transport.TransportChannel;
@ -146,7 +147,7 @@ public class VerifyNodeRepositoryAction extends AbstractComponent {
class VerifyNodeRepositoryRequestHandler implements TransportRequestHandler<VerifyNodeRepositoryRequest> {
@Override
public void messageReceived(VerifyNodeRepositoryRequest request, TransportChannel channel) throws Exception {
public void messageReceived(VerifyNodeRepositoryRequest request, TransportChannel channel, Task task) throws Exception {
DiscoveryNode localNode = clusterService.state().nodes().getLocalNode();
try {
doVerify(request.repository, request.verificationToken, localNode);

View File

@ -59,7 +59,7 @@ public class RequestHandlerRegistry<Request extends TransportRequest> {
public void processMessageReceived(Request request, TransportChannel channel) throws Exception {
final Task task = taskManager.register(channel.getChannelType(), action, request);
if (task == null) {
handler.messageReceived(request, channel);
handler.messageReceived(request, channel, null);
} else {
boolean success = false;
try {

View File

@ -1,30 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.transport;
/**
* Transport request handlers that is using task context
*/
public abstract class TaskAwareTransportRequestHandler<T extends TransportRequest> implements TransportRequestHandler<T> {
@Override
public final void messageReceived(T request, TransportChannel channel) throws Exception {
throw new UnsupportedOperationException("the task parameter is required");
}
}

View File

@ -22,6 +22,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import java.io.IOException;
@ -52,7 +53,7 @@ public final class TransportActionProxy {
}
@Override
public void messageReceived(T request, TransportChannel channel) throws Exception {
public void messageReceived(T request, TransportChannel channel, Task task) throws Exception {
DiscoveryNode targetNode = request.targetNode;
TransportRequest wrappedRequest = request.wrapped;
service.sendRequest(targetNode, action, wrappedRequest,

View File

@ -23,12 +23,5 @@ import org.elasticsearch.tasks.Task;
public interface TransportRequestHandler<T extends TransportRequest> {
/**
* Override this method if access to the Task parameter is needed
*/
default void messageReceived(final T request, final TransportChannel channel, Task task) throws Exception {
messageReceived(request, channel);
}
void messageReceived(T request, TransportChannel channel) throws Exception;
void messageReceived(T request, TransportChannel channel, Task task) throws Exception;
}

View File

@ -231,7 +231,7 @@ public class TransportService extends AbstractLifecycleComponent {
() -> HandshakeRequest.INSTANCE,
ThreadPool.Names.SAME,
false, false,
(request, channel) -> channel.sendResponse(
(request, channel, task) -> channel.sendResponse(
new HandshakeResponse(localNode, clusterName, localNode.getVersion())));
if (connectToRemoteCluster) {
// here we start to connect to the remote clusters

View File

@ -779,11 +779,6 @@ public class IndicesRequestIT extends ESIntegTestCase {
}
requestHandler.messageReceived(request, channel, task);
}
@Override
public void messageReceived(T request, TransportChannel channel) throws Exception {
messageReceived(request, channel, null);
}
}
}
}

View File

@ -364,7 +364,7 @@ public class TransportBroadcastByNodeActionTests extends ESTestCase {
TestTransportChannel channel = new TestTransportChannel();
handler.messageReceived(action.new NodeRequest(nodeId, new Request(), new ArrayList<>(shards)), channel);
handler.messageReceived(action.new NodeRequest(nodeId, new Request(), new ArrayList<>(shards)), channel, null);
// check the operation was executed only on the expected shards
assertEquals(shards, action.getResults().keySet());

View File

@ -36,6 +36,7 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.node.Node;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.threadpool.TestThreadPool;
@ -469,7 +470,7 @@ public class TransportClientNodesServiceTests extends ESTestCase {
}
@Override
public void messageReceived(ClusterStateRequest request, TransportChannel channel) throws Exception {
public void messageReceived(ClusterStateRequest request, TransportChannel channel, Task task) throws Exception {
if (block.get()) {
release.await();
return;

View File

@ -368,7 +368,7 @@ public class ZenDiscoveryUnitTests extends ESTestCase {
.routingTable(RoutingTable.builder().add(indexRoutingTable).build());
if (incompatible) {
IllegalStateException ex = expectThrows(IllegalStateException.class, () ->
request.messageReceived(new MembershipAction.ValidateJoinRequest(stateBuilder.build()), null));
request.messageReceived(new MembershipAction.ValidateJoinRequest(stateBuilder.build()), null, null));
assertEquals("index [test] version not supported: "
+ VersionUtils.getPreviousVersion(Version.CURRENT.minimumIndexCompatibilityVersion())
+ " minimum compatible index version is: " + Version.CURRENT.minimumIndexCompatibilityVersion(), ex.getMessage());
@ -400,7 +400,7 @@ public class ZenDiscoveryUnitTests extends ESTestCase {
public void sendResponse(Exception exception) throws IOException {
}
});
}, null);
assertTrue(sendResponse.get());
}
}

View File

@ -114,7 +114,7 @@ public class RemoteClusterConnectionTests extends ESTestCase {
MockTransportService newService = MockTransportService.createNewService(s, version, threadPool, null);
try {
newService.registerRequestHandler(ClusterSearchShardsAction.NAME,ThreadPool.Names.SAME, ClusterSearchShardsRequest::new,
(request, channel) -> {
(request, channel, task) -> {
if ("index_not_found".equals(request.preference())) {
channel.sendResponse(new IndexNotFoundException("index"));
} else {
@ -123,7 +123,7 @@ public class RemoteClusterConnectionTests extends ESTestCase {
}
});
newService.registerRequestHandler(ClusterStateAction.NAME, ThreadPool.Names.SAME, ClusterStateRequest::new,
(request, channel) -> {
(request, channel, task) -> {
DiscoveryNodes.Builder builder = DiscoveryNodes.builder();
for (DiscoveryNode node : knownNodes) {
builder.add(node);

View File

@ -18,7 +18,6 @@
*/
package org.elasticsearch.transport;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version;
@ -26,6 +25,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.threadpool.TestThreadPool;
@ -88,7 +88,7 @@ public class TransportActionProxyTests extends ESTestCase {
public void testSendMessage() throws InterruptedException {
serviceA.registerRequestHandler("/test", SimpleTestRequest::new, ThreadPool.Names.SAME,
(request, channel) -> {
(request, channel, task) -> {
assertEquals(request.sourceNode, "TS_A");
SimpleTestResponse response = new SimpleTestResponse();
response.targetNode = "TS_A";
@ -98,7 +98,7 @@ public class TransportActionProxyTests extends ESTestCase {
serviceA.connectToNode(nodeB);
serviceB.registerRequestHandler("/test", SimpleTestRequest::new, ThreadPool.Names.SAME,
(request, channel) -> {
(request, channel, task) -> {
assertEquals(request.sourceNode, "TS_A");
SimpleTestResponse response = new SimpleTestResponse();
response.targetNode = "TS_B";
@ -107,7 +107,7 @@ public class TransportActionProxyTests extends ESTestCase {
TransportActionProxy.registerProxyAction(serviceB, "/test", SimpleTestResponse::new);
serviceB.connectToNode(nodeC);
serviceC.registerRequestHandler("/test", SimpleTestRequest::new, ThreadPool.Names.SAME,
(request, channel) -> {
(request, channel, task) -> {
assertEquals(request.sourceNode, "TS_A");
SimpleTestResponse response = new SimpleTestResponse();
response.targetNode = "TS_C";
@ -151,7 +151,7 @@ public class TransportActionProxyTests extends ESTestCase {
public void testException() throws InterruptedException {
serviceA.registerRequestHandler("/test", SimpleTestRequest::new, ThreadPool.Names.SAME,
(request, channel) -> {
(request, channel, task) -> {
assertEquals(request.sourceNode, "TS_A");
SimpleTestResponse response = new SimpleTestResponse();
response.targetNode = "TS_A";
@ -161,7 +161,7 @@ public class TransportActionProxyTests extends ESTestCase {
serviceA.connectToNode(nodeB);
serviceB.registerRequestHandler("/test", SimpleTestRequest::new, ThreadPool.Names.SAME,
(request, channel) -> {
(request, channel, task) -> {
assertEquals(request.sourceNode, "TS_A");
SimpleTestResponse response = new SimpleTestResponse();
response.targetNode = "TS_B";
@ -170,7 +170,7 @@ public class TransportActionProxyTests extends ESTestCase {
TransportActionProxy.registerProxyAction(serviceB, "/test", SimpleTestResponse::new);
serviceB.connectToNode(nodeC);
serviceC.registerRequestHandler("/test", SimpleTestRequest::new, ThreadPool.Names.SAME,
(request, channel) -> {
(request, channel, task) -> {
throw new ElasticsearchException("greetings from TS_C");
});
TransportActionProxy.registerProxyAction(serviceC, "/test", SimpleTestResponse::new);

View File

@ -47,6 +47,7 @@ import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import org.elasticsearch.mocksocket.MockServerSocket;
import org.elasticsearch.node.Node;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.VersionUtils;
import org.elasticsearch.test.transport.MockTransportService;
@ -205,7 +206,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
public void testHelloWorld() {
serviceA.registerRequestHandler("sayHello", StringMessageRequest::new, ThreadPool.Names.GENERIC,
(request, channel) -> {
(request, channel, task) -> {
assertThat("moshe", equalTo(request.message));
try {
channel.sendResponse(new StringMessageResponse("hello " + request.message));
@ -280,7 +281,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
public void testThreadContext() throws ExecutionException, InterruptedException {
serviceA.registerRequestHandler("ping_pong", StringMessageRequest::new, ThreadPool.Names.GENERIC, (request, channel) -> {
serviceA.registerRequestHandler("ping_pong", StringMessageRequest::new, ThreadPool.Names.GENERIC, (request, channel, task) -> {
assertEquals("ping_user", threadPool.getThreadContext().getHeader("test.ping.user"));
assertNull(threadPool.getThreadContext().getTransient("my_private_context"));
try {
@ -339,7 +340,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
serviceA.disconnectFromNode(nodeA);
final AtomicReference<Exception> exception = new AtomicReference<>();
serviceA.registerRequestHandler("localNode", StringMessageRequest::new, ThreadPool.Names.GENERIC,
(request, channel) -> {
(request, channel, task) -> {
try {
channel.sendResponse(new StringMessageResponse(request.message));
} catch (IOException e) {
@ -377,7 +378,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
}
public void testAdapterSendReceiveCallbacks() throws Exception {
final TransportRequestHandler<TransportRequest.Empty> requestHandler = (request, channel) -> {
final TransportRequestHandler<TransportRequest.Empty> requestHandler = (request, channel, task) -> {
try {
if (randomBoolean()) {
channel.sendResponse(TransportResponse.Empty.INSTANCE);
@ -485,7 +486,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
public void testVoidMessageCompressed() {
serviceA.registerRequestHandler("sayHello", TransportRequest.Empty::new, ThreadPool.Names.GENERIC,
(request, channel) -> {
(request, channel, task) -> {
try {
TransportResponseOptions responseOptions = TransportResponseOptions.builder().withCompress(true).build();
channel.sendResponse(TransportResponse.Empty.INSTANCE, responseOptions);
@ -531,7 +532,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
serviceA.registerRequestHandler("sayHello", StringMessageRequest::new, ThreadPool.Names.GENERIC,
new TransportRequestHandler<StringMessageRequest>() {
@Override
public void messageReceived(StringMessageRequest request, TransportChannel channel) {
public void messageReceived(StringMessageRequest request, TransportChannel channel, Task task) {
assertThat("moshe", equalTo(request.message));
try {
TransportResponseOptions responseOptions = TransportResponseOptions.builder().withCompress(true).build();
@ -580,7 +581,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
serviceA.registerRequestHandler("sayHelloException", StringMessageRequest::new, ThreadPool.Names.GENERIC,
new TransportRequestHandler<StringMessageRequest>() {
@Override
public void messageReceived(StringMessageRequest request, TransportChannel channel) throws Exception {
public void messageReceived(StringMessageRequest request, TransportChannel channel, Task task) throws Exception {
assertThat("moshe", equalTo(request.message));
throw new RuntimeException("bad message !!!");
}
@ -639,7 +640,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
Set<Exception> sendingErrors = ConcurrentCollections.newConcurrentSet();
Set<Exception> responseErrors = ConcurrentCollections.newConcurrentSet();
serviceA.registerRequestHandler("test", TestRequest::new,
randomBoolean() ? ThreadPool.Names.SAME : ThreadPool.Names.GENERIC, (request, channel) -> {
randomBoolean() ? ThreadPool.Names.SAME : ThreadPool.Names.GENERIC, (request, channel, task) -> {
try {
channel.sendResponse(new TestResponse());
} catch (Exception e) {
@ -647,7 +648,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
responseErrors.add(e);
}
});
final TransportRequestHandler<TestRequest> ignoringRequestHandler = (request, channel) -> {
final TransportRequestHandler<TestRequest> ignoringRequestHandler = (request, channel, task) -> {
try {
channel.sendResponse(new TestResponse());
} catch (Exception e) {
@ -763,7 +764,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
final CountDownLatch latch2 = new CountDownLatch(1);
try {
serviceA.registerRequestHandler("foobar", StringMessageRequest::new, ThreadPool.Names.GENERIC,
(request, channel) -> {
(request, channel, task) -> {
try {
latch2.await();
logger.info("Stop ServiceB now");
@ -791,7 +792,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
serviceA.registerRequestHandler("sayHelloTimeoutNoResponse", StringMessageRequest::new, ThreadPool.Names.GENERIC,
new TransportRequestHandler<StringMessageRequest>() {
@Override
public void messageReceived(StringMessageRequest request, TransportChannel channel) {
public void messageReceived(StringMessageRequest request, TransportChannel channel, Task task) {
assertThat("moshe", equalTo(request.message));
// don't send back a response
}
@ -836,7 +837,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
serviceA.registerRequestHandler("sayHelloTimeoutDelayedResponse", StringMessageRequest::new, ThreadPool.Names.GENERIC,
new TransportRequestHandler<StringMessageRequest>() {
@Override
public void messageReceived(StringMessageRequest request, TransportChannel channel) throws InterruptedException {
public void messageReceived(StringMessageRequest request, TransportChannel channel, Task task) throws InterruptedException {
String message = request.message;
inFlight.acquireUninterruptibly();
try {
@ -938,10 +939,10 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
}
public void testTracerLog() throws InterruptedException {
TransportRequestHandler handler = (request, channel) -> channel.sendResponse(new StringMessageResponse(""));
TransportRequestHandler handler = (request, channel, task) -> channel.sendResponse(new StringMessageResponse(""));
TransportRequestHandler handlerWithError = new TransportRequestHandler<StringMessageRequest>() {
@Override
public void messageReceived(StringMessageRequest request, TransportChannel channel) throws Exception {
public void messageReceived(StringMessageRequest request, TransportChannel channel, Task task) throws Exception {
if (request.timeout() > 0) {
Thread.sleep(request.timeout);
}
@ -1257,7 +1258,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
serviceB.registerRequestHandler("/version", Version1Request::new, ThreadPool.Names.SAME,
new TransportRequestHandler<Version1Request>() {
@Override
public void messageReceived(Version1Request request, TransportChannel channel) throws Exception {
public void messageReceived(Version1Request request, TransportChannel channel, Task task) throws Exception {
assertThat(request.value1, equalTo(1));
assertThat(request.value2, equalTo(0)); // not set, coming from service A
Version1Response response = new Version1Response();
@ -1301,7 +1302,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
serviceA.registerRequestHandler("/version", Version0Request::new, ThreadPool.Names.SAME,
new TransportRequestHandler<Version0Request>() {
@Override
public void messageReceived(Version0Request request, TransportChannel channel) throws Exception {
public void messageReceived(Version0Request request, TransportChannel channel, Task task) throws Exception {
assertThat(request.value1, equalTo(1));
Version0Response response = new Version0Response();
response.value1 = 1;
@ -1344,7 +1345,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
public void testVersionFrom1to1() throws Exception {
serviceB.registerRequestHandler("/version", Version1Request::new, ThreadPool.Names.SAME,
(request, channel) -> {
(request, channel, task) -> {
assertThat(request.value1, equalTo(1));
assertThat(request.value2, equalTo(2));
Version1Response response = new Version1Response();
@ -1388,7 +1389,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
public void testVersionFrom0to0() throws Exception {
serviceA.registerRequestHandler("/version", Version0Request::new, ThreadPool.Names.SAME,
(request, channel) -> {
(request, channel, task) -> {
assertThat(request.value1, equalTo(1));
Version0Response response = new Version0Response();
response.value1 = 1;
@ -1427,7 +1428,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
public void testMockFailToSendNoConnectRule() throws Exception {
serviceA.registerRequestHandler("sayHello", StringMessageRequest::new, ThreadPool.Names.GENERIC,
(request, channel) -> {
(request, channel, task) -> {
assertThat("moshe", equalTo(request.message));
throw new RuntimeException("bad message !!!");
});
@ -1484,7 +1485,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
public void testMockUnresponsiveRule() throws IOException {
serviceA.registerRequestHandler("sayHello", StringMessageRequest::new, ThreadPool.Names.GENERIC,
(request, channel) -> {
(request, channel, task) -> {
assertThat("moshe", equalTo(request.message));
throw new RuntimeException("bad message !!!");
});
@ -1540,7 +1541,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
final AtomicReference<TransportAddress> addressB = new AtomicReference<>();
serviceB.registerRequestHandler("action1", TestRequest::new, ThreadPool.Names.SAME, new TransportRequestHandler<TestRequest>() {
@Override
public void messageReceived(TestRequest request, TransportChannel channel) throws Exception {
public void messageReceived(TestRequest request, TransportChannel channel, Task task) throws Exception {
addressA.set(request.remoteAddress());
channel.sendResponse(new TestResponse());
latch.countDown();
@ -1582,7 +1583,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
Settings.EMPTY, false, false)) {
AtomicBoolean requestProcessed = new AtomicBoolean(false);
service.registerRequestHandler("action", TestRequest::new, ThreadPool.Names.SAME,
(request, channel) -> {
(request, channel, task) -> {
requestProcessed.set(true);
channel.sendResponse(TransportResponse.Empty.INSTANCE);
});
@ -1744,7 +1745,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
}
@Override
public void messageReceived(TestRequest request, TransportChannel channel) throws Exception {
public void messageReceived(TestRequest request, TransportChannel channel, Task task) throws Exception {
if (randomBoolean()) {
Thread.sleep(randomIntBetween(10, 50));
}
@ -1868,18 +1869,18 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
public void testRegisterHandlerTwice() {
serviceB.registerRequestHandler("action1", TestRequest::new, randomFrom(ThreadPool.Names.SAME, ThreadPool.Names.GENERIC),
(request, message) -> {
(request, message, task) -> {
throw new AssertionError("boom");
});
expectThrows(IllegalArgumentException.class, () ->
serviceB.registerRequestHandler("action1", TestRequest::new, randomFrom(ThreadPool.Names.SAME, ThreadPool.Names.GENERIC),
(request, message) -> {
(request, message, task) -> {
throw new AssertionError("boom");
})
);
serviceA.registerRequestHandler("action1", TestRequest::new, randomFrom(ThreadPool.Names.SAME, ThreadPool.Names.GENERIC),
(request, message) -> {
(request, message, task) -> {
throw new AssertionError("boom");
});
}
@ -2066,7 +2067,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
List<String> executors = new ArrayList<>(ThreadPool.THREAD_POOL_TYPES.keySet());
CollectionUtil.timSort(executors); // makes sure it's reproducible
serviceA.registerRequestHandler("action", TestRequest::new, ThreadPool.Names.SAME,
(request, channel) -> {
(request, channel, task) -> {
threadPool.getThreadContext().putTransient("boom", new Object());
threadPool.getThreadContext().addResponseHeader("foo.bar", "baz");
@ -2127,7 +2128,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
CollectionUtil.timSort(executors); // makes sure it's reproducible
TransportService serviceC = build(Settings.builder().put("name", "TS_TEST").build(), version0, null, true);
serviceC.registerRequestHandler("action", TestRequest::new, ThreadPool.Names.SAME,
(request, channel) -> {
(request, channel, task) -> {
// do nothing
});
serviceC.start();
@ -2187,7 +2188,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
CountDownLatch receivedLatch = new CountDownLatch(1);
CountDownLatch sendResponseLatch = new CountDownLatch(1);
serviceC.registerRequestHandler("action", TestRequest::new, ThreadPool.Names.SAME,
(request, channel) -> {
(request, channel, task) -> {
// don't block on a network thread here
threadPool.generic().execute(new AbstractRunnable() {
@Override
@ -2255,7 +2256,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
CountDownLatch receivedLatch = new CountDownLatch(1);
CountDownLatch sendResponseLatch = new CountDownLatch(1);
serviceB.registerRequestHandler("action", TestRequest::new, ThreadPool.Names.SAME,
(request, channel) -> {
(request, channel, task) -> {
// don't block on a network thread here
threadPool.generic().execute(new AbstractRunnable() {
@Override
@ -2368,7 +2369,7 @@ public abstract class AbstractSimpleTransportTestCase extends ESTestCase {
Exception ex = new RuntimeException("boom");
ex.setStackTrace(new StackTraceElement[0]);
serviceB.registerRequestHandler("action", TestRequest::new, ThreadPool.Names.SAME,
(request, channel) -> {
(request, channel, task) -> {
// don't block on a network thread here
threadPool.generic().execute(new AbstractRunnable() {
@Override

View File

@ -399,11 +399,6 @@ public class TransportRollupSearchAction extends TransportAction<SearchRequest,
class TransportHandler implements TransportRequestHandler<SearchRequest> {
@Override
public final void messageReceived(SearchRequest request, TransportChannel channel) throws Exception {
throw new UnsupportedOperationException("the task parameter is required for this operation");
}
@Override
public final void messageReceived(final SearchRequest request, final TransportChannel channel, Task task) throws Exception {
// We already got the task created on the network layer - no need to create it again on the transport layer

View File

@ -318,10 +318,5 @@ public class SecurityServerTransportInterceptor extends AbstractComponent implem
}
}
}
@Override
public void messageReceived(T request, TransportChannel channel) throws Exception {
throw new UnsupportedOperationException("task parameter is required for this operation");
}
}
}