From 2247ab329541e7c2e73c7da9ae518d6ba7d895d5 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Mon, 28 Sep 2020 18:35:35 +0200 Subject: [PATCH] Make TransportNodesAction finishHim Execute on Configured Executor (#62753) (#62955) Currently, `finishHim` can either execute on the specified executor (in the less likely case that the local node request is the last to arrive) or on a transport thread. In case of e.g. `org.elasticsearch.action.admin.cluster.stats.TransportClusterStatsAction` this leads to an expensive execution that deserializes all mapping metadata in the cluster running on the transport thread and destabilizing the cluster. In case of this transport action it was specifically moved to the `MANAGEMENT` thread to avoid the high cost of processing the stats requests on the nodes during fan-out but that did not cover the final execution on the node that received the initial request. This PR adds to ability to optionally specify the executor for the final step of the nodes request execution and uses that to work around the issue for the slow `TransportClusterStatsAction`. Note: the specific problem that motivated this PR is essentially the same as https://github.com/elastic/elasticsearch/pull/57937 where we moved the execution off the transport and on the management thread as a fix as well. --- .../stats/TransportClusterStatsAction.java | 7 ++- .../support/nodes/TransportNodesAction.java | 46 ++++++++++++++----- 2 files changed, 39 insertions(+), 14 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportClusterStatsAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportClusterStatsAction.java index d54465ff845..3a0d22945b4 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportClusterStatsAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/stats/TransportClusterStatsAction.java @@ -45,6 +45,7 @@ import org.elasticsearch.indices.IndicesService; import org.elasticsearch.node.NodeService; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; +import org.elasticsearch.transport.Transports; import java.io.IOException; import java.util.ArrayList; @@ -64,8 +65,8 @@ public class TransportClusterStatsAction extends TransportNodesAction responses, List failures) { + assert Transports.assertNotTransportThread("Constructor of ClusterStatsResponse runs expensive computations on mappings found in" + + " the cluster state that are too slow for a transport thread"); ClusterState state = clusterService.state(); return new ClusterStatsResponse( System.currentTimeMillis(), diff --git a/server/src/main/java/org/elasticsearch/action/support/nodes/TransportNodesAction.java b/server/src/main/java/org/elasticsearch/action/support/nodes/TransportNodesAction.java index af7e48b1eeb..e6da86876dd 100644 --- a/server/src/main/java/org/elasticsearch/action/support/nodes/TransportNodesAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/nodes/TransportNodesAction.java @@ -21,6 +21,7 @@ package org.elasticsearch.action.support.nodes; import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionRunnable; import org.elasticsearch.action.FailedNodeException; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.HandledTransportAction; @@ -60,10 +61,24 @@ public abstract class TransportNodesAction nodeResponseClass; protected final String transportNodeAction; + private final String finalExecutor; + + /** + * @param actionName action name + * @param threadPool thread-pool + * @param clusterService cluster service + * @param transportService transport service + * @param actionFilters action filters + * @param request node request writer + * @param nodeRequest node request reader + * @param nodeExecutor executor to execute node action on + * @param finalExecutor executor to execute final collection of all responses on + * @param nodeResponseClass class of the node responses + */ protected TransportNodesAction(String actionName, ThreadPool threadPool, ClusterService clusterService, TransportService transportService, ActionFilters actionFilters, Writeable.Reader request, Writeable.Reader nodeRequest, String nodeExecutor, - Class nodeResponseClass) { + String finalExecutor, Class nodeResponseClass) { super(actionName, transportService, actionFilters, request); this.threadPool = threadPool; this.clusterService = Objects.requireNonNull(clusterService); @@ -71,9 +86,24 @@ public abstract class TransportNodesAction request, Writeable.Reader nodeRequest, String nodeExecutor, + Class nodeResponseClass) { + this(actionName, threadPool, clusterService, transportService, actionFilters, request, nodeRequest, nodeExecutor, + ThreadPool.Names.SAME, nodeResponseClass); } @Override @@ -230,15 +260,7 @@ public abstract class TransportNodesAction newResponse(request, responses))); } }