From 780b4c72fe90c83ce08b7c5b96f70e91f53b1f73 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Sat, 26 Jan 2019 22:01:30 -0500 Subject: [PATCH] Make ChannelActionListener a top-level class (#37797) We start using this class more often. Let's make it a top-level class. --- .../action/search/SearchTransportService.java | 2 +- .../action/support/ChannelActionListener.java | 62 +++++++++++++++++++ .../support/HandledTransportAction.java | 38 ------------ .../shard/TransportSingleShardAction.java | 7 +-- .../recovery/PeerRecoverySourceService.java | 4 +- .../recovery/PeerRecoveryTargetService.java | 13 ++-- 6 files changed, 73 insertions(+), 53 deletions(-) create mode 100644 server/src/main/java/org/elasticsearch/action/support/ChannelActionListener.java diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchTransportService.java b/server/src/main/java/org/elasticsearch/action/search/SearchTransportService.java index 577ce4f6b7a..7d2dd9a22b2 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchTransportService.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchTransportService.java @@ -23,7 +23,7 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListenerResponseHandler; import org.elasticsearch.action.IndicesRequest; import org.elasticsearch.action.OriginalIndices; -import org.elasticsearch.action.support.HandledTransportAction.ChannelActionListener; +import org.elasticsearch.action.support.ChannelActionListener; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.Nullable; diff --git a/server/src/main/java/org/elasticsearch/action/support/ChannelActionListener.java b/server/src/main/java/org/elasticsearch/action/support/ChannelActionListener.java new file mode 100644 index 00000000000..b23758758e2 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/support/ChannelActionListener.java @@ -0,0 +1,62 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action.support; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.transport.TransportChannel; +import org.elasticsearch.transport.TransportRequest; +import org.elasticsearch.transport.TransportResponse; + +public final class ChannelActionListener< + Response extends TransportResponse, Request extends TransportRequest> implements ActionListener { + + private static final Logger logger = LogManager.getLogger(ChannelActionListener.class); + private final TransportChannel channel; + private final Request request; + private final String actionName; + + public ChannelActionListener(TransportChannel channel, String actionName, Request request) { + this.channel = channel; + this.request = request; + this.actionName = actionName; + } + + @Override + public void onResponse(Response response) { + try { + channel.sendResponse(response); + } catch (Exception e) { + onFailure(e); + } + } + + @Override + public void onFailure(Exception e) { + try { + channel.sendResponse(e); + } catch (Exception e1) { + logger.warn(() -> new ParameterizedMessage( + "Failed to send error response for action [{}] and request [{}]", actionName, request), e1); + } + } +} diff --git a/server/src/main/java/org/elasticsearch/action/support/HandledTransportAction.java b/server/src/main/java/org/elasticsearch/action/support/HandledTransportAction.java index f1f4962851c..0b35bc8fb89 100644 --- a/server/src/main/java/org/elasticsearch/action/support/HandledTransportAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/HandledTransportAction.java @@ -18,19 +18,14 @@ */ package org.elasticsearch.action.support; -import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.apache.logging.log4j.message.ParameterizedMessage; -import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionResponse; import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.tasks.Task; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportChannel; -import org.elasticsearch.transport.TransportRequest; import org.elasticsearch.transport.TransportRequestHandler; -import org.elasticsearch.transport.TransportResponse; import org.elasticsearch.transport.TransportService; import java.util.function.Supplier; @@ -74,37 +69,4 @@ public abstract class HandledTransportAction implements - ActionListener { - private final Logger logger = LogManager.getLogger(getClass()); - private final TransportChannel channel; - private final Request request; - private final String actionName; - - public ChannelActionListener(TransportChannel channel, String actionName, Request request) { - this.channel = channel; - this.request = request; - this.actionName = actionName; - } - - @Override - public void onResponse(Response response) { - try { - channel.sendResponse(response); - } catch (Exception e) { - onFailure(e); - } - } - - @Override - public void onFailure(Exception e) { - try { - channel.sendResponse(e); - } catch (Exception e1) { - logger.warn(() -> new ParameterizedMessage( - "Failed to send error response for action [{}] and request [{}]", actionName, request), e1); - } - } - } - } diff --git a/server/src/main/java/org/elasticsearch/action/support/single/shard/TransportSingleShardAction.java b/server/src/main/java/org/elasticsearch/action/support/single/shard/TransportSingleShardAction.java index a028c3f0e1e..6d7ad085dcd 100644 --- a/server/src/main/java/org/elasticsearch/action/support/single/shard/TransportSingleShardAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/single/shard/TransportSingleShardAction.java @@ -24,7 +24,7 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionResponse; import org.elasticsearch.action.NoShardAvailableActionException; import org.elasticsearch.action.support.ActionFilters; -import org.elasticsearch.action.support.HandledTransportAction; +import org.elasticsearch.action.support.ChannelActionListener; import org.elasticsearch.action.support.TransportAction; import org.elasticsearch.action.support.TransportActions; import org.elasticsearch.cluster.ClusterState; @@ -285,7 +285,7 @@ public abstract class TransportSingleShardAction(channel, actionName, request)); + execute(request, new ChannelActionListener<>(channel, actionName, request)); } } @@ -296,8 +296,7 @@ public abstract class TransportSingleShardAction(channel, - transportShardAction, request)); + asyncShardOperation(request, request.internalShardId, new ChannelActionListener<>(channel, transportShardAction, request)); } } /** diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoverySourceService.java b/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoverySourceService.java index 556df71ca2c..f53e8edecd9 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoverySourceService.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoverySourceService.java @@ -23,7 +23,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.support.HandledTransportAction; +import org.elasticsearch.action.support.ChannelActionListener; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.inject.Inject; @@ -109,7 +109,7 @@ public class PeerRecoverySourceService implements IndexEventListener { class StartRecoveryTransportRequestHandler implements TransportRequestHandler { @Override public void messageReceived(final StartRecoveryRequest request, final TransportChannel channel, Task task) throws Exception { - recover(request, new HandledTransportAction.ChannelActionListener<>(channel, Actions.START_RECOVERY, request)); + recover(request, new ChannelActionListener<>(channel, Actions.START_RECOVERY, request)); } } diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java b/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java index bc2196501fb..dbbaed1132e 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java @@ -30,7 +30,7 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchTimeoutException; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.support.HandledTransportAction; +import org.elasticsearch.action.support.ChannelActionListener; import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateObserver; @@ -433,8 +433,7 @@ public class PeerRecoveryTargetService implements IndexEventListener { @Override public void messageReceived(RecoveryPrepareForTranslogOperationsRequest request, TransportChannel channel, Task task) { try (RecoveryRef recoveryRef = onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId())) { - final ActionListener listener = - new HandledTransportAction.ChannelActionListener<>(channel, Actions.PREPARE_TRANSLOG, request); + final ActionListener listener = new ChannelActionListener<>(channel, Actions.PREPARE_TRANSLOG, request); recoveryRef.target().prepareForTranslogOperations(request.isFileBasedRecovery(), request.totalTranslogOps(), ActionListener.wrap(nullVal -> listener.onResponse(TransportResponse.Empty.INSTANCE), listener::onFailure)); } @@ -446,8 +445,7 @@ public class PeerRecoveryTargetService implements IndexEventListener { @Override public void messageReceived(RecoveryFinalizeRecoveryRequest request, TransportChannel channel, Task task) throws Exception { try (RecoveryRef recoveryRef = onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId())) { - final ActionListener listener = - new HandledTransportAction.ChannelActionListener<>(channel, Actions.FINALIZE, request); + final ActionListener listener = new ChannelActionListener<>(channel, Actions.FINALIZE, request); recoveryRef.target().finalizeRecovery(request.globalCheckpoint(), ActionListener.wrap(nullVal -> listener.onResponse(TransportResponse.Empty.INSTANCE), listener::onFailure)); } @@ -489,7 +487,7 @@ public class PeerRecoveryTargetService implements IndexEventListener { final ClusterStateObserver observer = new ClusterStateObserver(clusterService, null, logger, threadPool.getThreadContext()); final RecoveryTarget recoveryTarget = recoveryRef.target(); final ActionListener listener = - new HandledTransportAction.ChannelActionListener<>(channel, Actions.TRANSLOG_OPS, request); + new ChannelActionListener<>(channel, Actions.TRANSLOG_OPS, request); final Consumer retryOnMappingException = exception -> { // in very rare cases a translog replay from primary is processed before a mapping update on this node // which causes local mapping changes since the mapping (clusterstate) might not have arrived on this node. @@ -626,8 +624,7 @@ public class PeerRecoveryTargetService implements IndexEventListener { recoveryTarget.indexShard().recoveryStats().addThrottleTime(throttleTimeInNanos); } } - final ActionListener listener = - new HandledTransportAction.ChannelActionListener<>(channel, Actions.FILE_CHUNK, request); + final ActionListener listener = new ChannelActionListener<>(channel, Actions.FILE_CHUNK, request); recoveryTarget.writeFileChunk(request.metadata(), request.position(), request.content(), request.lastChunk(), request.totalTranslogOps(), ActionListener.wrap(nullVal -> listener.onResponse(TransportResponse.Empty.INSTANCE), listener::onFailure));