Make TransportNodesAction finishHim Execute on Configured Executor (#62753) (#62955)

Currently, `finishHim` can either execute on the specified executor
(in the less likely case that the local node request is the last to arrive)
or on a transport thread.
In case of e.g. `org.elasticsearch.action.admin.cluster.stats.TransportClusterStatsAction`
this leads to an expensive execution that deserializes all mapping metadata in the cluster
running on the transport thread and destabilizing the cluster. In case of this transport
action it was specifically moved to the `MANAGEMENT` thread to avoid the high cost of processing
the stats requests on the nodes during fan-out but that did not cover the final execution
on the node that received the initial request. This PR  adds to ability to optionally specify the executor for the final step of the
nodes request execution and uses that to work around the issue for the slow `TransportClusterStatsAction`.

Note: the specific problem that motivated this PR is essentially the same as https://github.com/elastic/elasticsearch/pull/57937 where we moved the execution off the transport and on the management thread as a fix as well.
This commit is contained in:
Armin Braun 2020-09-28 18:35:35 +02:00 committed by GitHub
parent c3e07da437
commit 2247ab3295
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 39 additions and 14 deletions

View File

@ -45,6 +45,7 @@ import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.node.NodeService; import org.elasticsearch.node.NodeService;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.TransportService;
import org.elasticsearch.transport.Transports;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
@ -64,8 +65,8 @@ public class TransportClusterStatsAction extends TransportNodesAction<ClusterSta
@Inject @Inject
public TransportClusterStatsAction(ThreadPool threadPool, ClusterService clusterService, TransportService transportService, public TransportClusterStatsAction(ThreadPool threadPool, ClusterService clusterService, TransportService transportService,
NodeService nodeService, IndicesService indicesService, ActionFilters actionFilters) { NodeService nodeService, IndicesService indicesService, ActionFilters actionFilters) {
super(ClusterStatsAction.NAME, threadPool, clusterService, transportService, actionFilters, super(ClusterStatsAction.NAME, threadPool, clusterService, transportService, actionFilters, ClusterStatsRequest::new,
ClusterStatsRequest::new, ClusterStatsNodeRequest::new, ThreadPool.Names.MANAGEMENT, ClusterStatsNodeResponse.class); ClusterStatsNodeRequest::new, ThreadPool.Names.MANAGEMENT, ThreadPool.Names.MANAGEMENT, ClusterStatsNodeResponse.class);
this.nodeService = nodeService; this.nodeService = nodeService;
this.indicesService = indicesService; this.indicesService = indicesService;
} }
@ -73,6 +74,8 @@ public class TransportClusterStatsAction extends TransportNodesAction<ClusterSta
@Override @Override
protected ClusterStatsResponse newResponse(ClusterStatsRequest request, protected ClusterStatsResponse newResponse(ClusterStatsRequest request,
List<ClusterStatsNodeResponse> responses, List<FailedNodeException> failures) { List<ClusterStatsNodeResponse> responses, List<FailedNodeException> failures) {
assert Transports.assertNotTransportThread("Constructor of ClusterStatsResponse runs expensive computations on mappings found in" +
" the cluster state that are too slow for a transport thread");
ClusterState state = clusterService.state(); ClusterState state = clusterService.state();
return new ClusterStatsResponse( return new ClusterStatsResponse(
System.currentTimeMillis(), System.currentTimeMillis(),

View File

@ -21,6 +21,7 @@ package org.elasticsearch.action.support.nodes;
import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.action.FailedNodeException; import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction; import org.elasticsearch.action.support.HandledTransportAction;
@ -60,10 +61,24 @@ public abstract class TransportNodesAction<NodesRequest extends BaseNodesRequest
protected final Class<NodeResponse> nodeResponseClass; protected final Class<NodeResponse> nodeResponseClass;
protected final String transportNodeAction; protected final String transportNodeAction;
private final String finalExecutor;
/**
* @param actionName action name
* @param threadPool thread-pool
* @param clusterService cluster service
* @param transportService transport service
* @param actionFilters action filters
* @param request node request writer
* @param nodeRequest node request reader
* @param nodeExecutor executor to execute node action on
* @param finalExecutor executor to execute final collection of all responses on
* @param nodeResponseClass class of the node responses
*/
protected TransportNodesAction(String actionName, ThreadPool threadPool, protected TransportNodesAction(String actionName, ThreadPool threadPool,
ClusterService clusterService, TransportService transportService, ActionFilters actionFilters, ClusterService clusterService, TransportService transportService, ActionFilters actionFilters,
Writeable.Reader<NodesRequest> request, Writeable.Reader<NodeRequest> nodeRequest, String nodeExecutor, Writeable.Reader<NodesRequest> request, Writeable.Reader<NodeRequest> nodeRequest, String nodeExecutor,
Class<NodeResponse> nodeResponseClass) { String finalExecutor, Class<NodeResponse> nodeResponseClass) {
super(actionName, transportService, actionFilters, request); super(actionName, transportService, actionFilters, request);
this.threadPool = threadPool; this.threadPool = threadPool;
this.clusterService = Objects.requireNonNull(clusterService); this.clusterService = Objects.requireNonNull(clusterService);
@ -71,11 +86,26 @@ public abstract class TransportNodesAction<NodesRequest extends BaseNodesRequest
this.nodeResponseClass = Objects.requireNonNull(nodeResponseClass); this.nodeResponseClass = Objects.requireNonNull(nodeResponseClass);
this.transportNodeAction = actionName + "[n]"; this.transportNodeAction = actionName + "[n]";
this.finalExecutor = finalExecutor;
transportService.registerRequestHandler( transportService.registerRequestHandler(
transportNodeAction, nodeExecutor, nodeRequest, new NodeTransportHandler()); transportNodeAction, nodeExecutor, nodeRequest, new NodeTransportHandler());
} }
/**
* Same as {@link #TransportNodesAction(String, ThreadPool, ClusterService, TransportService, ActionFilters, Writeable.Reader,
* Writeable.Reader, String, String, Class)} but executes final response collection on the transport thread except for when the final
* node response is received from the local node, in which case {@code nodeExecutor} is used.
* This constructor should only be used for actions for which the creation of the final response is fast enough to be safely executed
* on a transport thread.
*/
protected TransportNodesAction(String actionName, ThreadPool threadPool,
ClusterService clusterService, TransportService transportService, ActionFilters actionFilters,
Writeable.Reader<NodesRequest> request, Writeable.Reader<NodeRequest> nodeRequest, String nodeExecutor,
Class<NodeResponse> nodeResponseClass) {
this(actionName, threadPool, clusterService, transportService, actionFilters, request, nodeRequest, nodeExecutor,
ThreadPool.Names.SAME, nodeResponseClass);
}
@Override @Override
protected void doExecute(Task task, NodesRequest request, ActionListener<NodesResponse> listener) { protected void doExecute(Task task, NodesRequest request, ActionListener<NodesResponse> listener) {
new AsyncAction(task, request, listener).start(); new AsyncAction(task, request, listener).start();
@ -230,15 +260,7 @@ public abstract class TransportNodesAction<NodesRequest extends BaseNodesRequest
} }
private void finishHim() { private void finishHim() {
NodesResponse finalResponse; threadPool.executor(finalExecutor).execute(ActionRunnable.supply(listener, () -> newResponse(request, responses)));
try {
finalResponse = newResponse(request, responses);
} catch (Exception e) {
logger.debug("failed to combine responses from nodes", e);
listener.onFailure(e);
return;
}
listener.onResponse(finalResponse);
} }
} }