mirror of
https://github.com/honeymoose/OpenSearch.git
synced 2025-02-17 02:14:54 +00:00
Always Accumulate Transport Exceptions (#25017)
This removes the `accumulateExceptions()` method (and its usage) from `TransportNodesAction` and `TransportTasksAction`, forcing both transport actions to always accumulate exceptions. Without this change, some transport actions, like `TransportNodesStatsAction` would respond in very unexpected ways by returning no response due to some failure, but instead of returning an error the response would simply be empty: no response and no error. This results in a very trappy response structure where users can check for an error, then attempt to blindly use the response when no error is returned.
This commit is contained in:
parent
5f3ed99c71
commit
6464add551
@ -24,7 +24,6 @@ import org.elasticsearch.action.FailedNodeException;
|
||||
import org.elasticsearch.action.support.ActionFilters;
|
||||
import org.elasticsearch.action.support.nodes.BaseNodeRequest;
|
||||
import org.elasticsearch.action.support.nodes.TransportNodesAction;
|
||||
import org.elasticsearch.cluster.ClusterName;
|
||||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
@ -82,11 +81,6 @@ public class TransportNodesHotThreadsAction extends TransportNodesAction<NodesHo
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean accumulateExceptions() {
|
||||
return false;
|
||||
}
|
||||
|
||||
public static class NodeRequest extends BaseNodeRequest {
|
||||
|
||||
NodesHotThreadsRequest request;
|
||||
|
@ -76,11 +76,6 @@ public class TransportNodesInfoAction extends TransportNodesAction<NodesInfoRequ
|
||||
request.transport(), request.http(), request.plugins(), request.ingest(), request.indices());
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean accumulateExceptions() {
|
||||
return false;
|
||||
}
|
||||
|
||||
public static class NodeInfoRequest extends BaseNodeRequest {
|
||||
|
||||
NodesInfoRequest request;
|
||||
|
@ -76,11 +76,6 @@ public class TransportNodesStatsAction extends TransportNodesAction<NodesStatsRe
|
||||
request.ingest());
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean accumulateExceptions() {
|
||||
return false;
|
||||
}
|
||||
|
||||
public static class NodeStatsRequest extends BaseNodeRequest {
|
||||
|
||||
NodesStatsRequest request;
|
||||
|
@ -167,12 +167,6 @@ public class TransportCancelTasksAction extends TransportTasksAction<Cancellable
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
protected boolean accumulateExceptions() {
|
||||
return true;
|
||||
}
|
||||
|
||||
private void setBanOnNodes(String reason, CancellableTask task, DiscoveryNodes nodes, ActionListener<Void> listener) {
|
||||
sendSetBanRequest(nodes,
|
||||
BanParentTaskRequest.createSetBanParentTaskRequest(new TaskId(clusterService.localNode().getId(), task.getId()), reason),
|
||||
|
@ -90,8 +90,4 @@ public class TransportListTasksAction extends TransportTasksAction<Task, ListTas
|
||||
super.processTasks(request, operation);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean accumulateExceptions() {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
@ -122,11 +122,6 @@ public class TransportNodesSnapshotsStatus extends TransportNodesAction<Transpor
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean accumulateExceptions() {
|
||||
return true;
|
||||
}
|
||||
|
||||
public static class Request extends BaseNodesRequest<Request> {
|
||||
|
||||
private Snapshot[] snapshots;
|
||||
|
@ -118,11 +118,6 @@ public class TransportClusterStatsAction extends TransportNodesAction<ClusterSta
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean accumulateExceptions() {
|
||||
return false;
|
||||
}
|
||||
|
||||
public static class ClusterStatsNodeRequest extends BaseNodeRequest {
|
||||
|
||||
ClusterStatsRequest request;
|
||||
|
@ -106,16 +106,11 @@ public abstract class TransportNodesAction<NodesRequest extends BaseNodesRequest
|
||||
final List<NodeResponse> responses = new ArrayList<>();
|
||||
final List<FailedNodeException> failures = new ArrayList<>();
|
||||
|
||||
final boolean accumulateExceptions = accumulateExceptions();
|
||||
for (int i = 0; i < nodesResponses.length(); ++i) {
|
||||
Object response = nodesResponses.get(i);
|
||||
|
||||
if (response instanceof FailedNodeException) {
|
||||
if (accumulateExceptions) {
|
||||
failures.add((FailedNodeException)response);
|
||||
} else {
|
||||
logger.warn("not accumulating exceptions, excluding exception from response", (FailedNodeException)response);
|
||||
}
|
||||
failures.add((FailedNodeException)response);
|
||||
} else {
|
||||
responses.add(nodeResponseClass.cast(response));
|
||||
}
|
||||
@ -145,8 +140,6 @@ public abstract class TransportNodesAction<NodesRequest extends BaseNodesRequest
|
||||
return nodeOperation(request);
|
||||
}
|
||||
|
||||
protected abstract boolean accumulateExceptions();
|
||||
|
||||
/**
|
||||
* resolve node ids to concrete nodes of the incoming request
|
||||
**/
|
||||
|
@ -226,8 +226,6 @@ public abstract class TransportTasksAction<
|
||||
return false;
|
||||
}
|
||||
|
||||
protected abstract boolean accumulateExceptions();
|
||||
|
||||
private class AsyncAction {
|
||||
|
||||
private final TasksRequest request;
|
||||
@ -321,9 +319,9 @@ public abstract class TransportTasksAction<
|
||||
(org.apache.logging.log4j.util.Supplier<?>)
|
||||
() -> new ParameterizedMessage("failed to execute on node [{}]", nodeId), t);
|
||||
}
|
||||
if (accumulateExceptions()) {
|
||||
responses.set(idx, new FailedNodeException(nodeId, "Failed node [" + nodeId + "]", t));
|
||||
}
|
||||
|
||||
responses.set(idx, new FailedNodeException(nodeId, "Failed node [" + nodeId + "]", t));
|
||||
|
||||
if (counter.incrementAndGet() == responses.length()) {
|
||||
finishHim();
|
||||
}
|
||||
|
@ -100,11 +100,6 @@ public class TransportNodesListGatewayMetaState extends TransportNodesAction<Tra
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean accumulateExceptions() {
|
||||
return true;
|
||||
}
|
||||
|
||||
public static class Request extends BaseNodesRequest<Request> {
|
||||
|
||||
public Request() {
|
||||
|
@ -172,11 +172,6 @@ public class TransportNodesListGatewayStartedShards extends
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean accumulateExceptions() {
|
||||
return true;
|
||||
}
|
||||
|
||||
public static class Request extends BaseNodesRequest<Request> {
|
||||
|
||||
private ShardId shardId;
|
||||
|
@ -159,11 +159,6 @@ public class TransportNodesListShardStoreMetaData extends TransportNodesAction<T
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean accumulateExceptions() {
|
||||
return true;
|
||||
}
|
||||
|
||||
public static class StoreFilesMetaData implements Iterable<StoreFileMetaData>, Streamable {
|
||||
private ShardId shardId;
|
||||
Store.MetadataSnapshot metadataSnapshot;
|
||||
|
@ -164,10 +164,6 @@ public abstract class TaskManagerTestCase extends ESTestCase {
|
||||
@Override
|
||||
protected abstract NodeResponse nodeOperation(NodeRequest request);
|
||||
|
||||
@Override
|
||||
protected boolean accumulateExceptions() {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
public static class TestNode implements Releasable {
|
||||
|
@ -313,10 +313,6 @@ public class TestTaskPlugin extends Plugin implements ActionPlugin {
|
||||
throw new UnsupportedOperationException("the task parameter is required");
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean accumulateExceptions() {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
public static class TestTaskAction extends Action<NodesRequest, NodesResponse, NodesRequestBuilder> {
|
||||
@ -453,10 +449,6 @@ public class TestTaskPlugin extends Plugin implements ActionPlugin {
|
||||
listener.onResponse(new UnblockTestTaskResponse());
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean accumulateExceptions() {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
public static class UnblockTestTasksAction extends Action<UnblockTestTasksRequest, UnblockTestTasksResponse,
|
||||
|
@ -270,10 +270,6 @@ public class TransportTasksActionTests extends TaskManagerTestCase {
|
||||
return new TestTaskResponse(in);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean accumulateExceptions() {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
private ActionFuture<NodesResponse> startBlockingTestNodesAction(CountDownLatch checkLatch) throws InterruptedException {
|
||||
|
@ -57,7 +57,6 @@ import java.util.function.Supplier;
|
||||
|
||||
import static org.elasticsearch.test.ClusterServiceUtils.createClusterService;
|
||||
import static org.elasticsearch.test.ClusterServiceUtils.setState;
|
||||
import static org.mockito.Mockito.mock;
|
||||
|
||||
public class TransportNodesActionTests extends ESTestCase {
|
||||
|
||||
@ -275,10 +274,6 @@ public class TransportNodesActionTests extends ESTestCase {
|
||||
return new TestNodeResponse();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean accumulateExceptions() {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
private static class DataNodesOnlyTransportNodesAction
|
||||
|
@ -87,8 +87,4 @@ public class TransportRethrottleAction extends TransportTasksAction<BulkByScroll
|
||||
return new ListTasksResponse(tasks, taskOperationFailures, failedNodeExceptions);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean accumulateExceptions() {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user