improve setting response/failure on nodes actions

we can use the index in the node ids list as the index for the array when we set the response or the exception, removing the need for an index AtomicInteger
This commit is contained in:
Shay Banon 2013-09-05 17:59:37 +02:00
parent f968ca317a
commit cebd3ca9c3
1 changed files with 14 additions and 21 deletions

View File

@ -102,17 +102,10 @@ public abstract class TransportNodesOperationAction<Request extends NodesOperati
private class AsyncAction { private class AsyncAction {
private final Request request; private final Request request;
private final String[] nodesIds; private final String[] nodesIds;
private final ActionListener<Response> listener; private final ActionListener<Response> listener;
private final ClusterState clusterState; private final ClusterState clusterState;
private final AtomicReferenceArray<Object> responses; private final AtomicReferenceArray<Object> responses;
private final AtomicInteger index = new AtomicInteger();
private final AtomicInteger counter = new AtomicInteger(); private final AtomicInteger counter = new AtomicInteger();
private AsyncAction(Request request, ActionListener<Response> listener) { private AsyncAction(Request request, ActionListener<Response> listener) {
@ -140,7 +133,9 @@ public abstract class TransportNodesOperationAction<Request extends NodesOperati
transportRequestOptions.withTimeout(request.timeout()); transportRequestOptions.withTimeout(request.timeout());
} }
transportRequestOptions.withCompress(transportCompress()); transportRequestOptions.withCompress(transportCompress());
for (final String nodeId : nodesIds) { for (int i = 0; i < nodesIds.length; i++) {
final String nodeId = nodesIds[i];
final int idx = i;
final DiscoveryNode node = clusterState.nodes().nodes().get(nodeId); final DiscoveryNode node = clusterState.nodes().nodes().get(nodeId);
try { try {
if (nodeId.equals("_local") || nodeId.equals(clusterState.nodes().localNodeId())) { if (nodeId.equals("_local") || nodeId.equals(clusterState.nodes().localNodeId())) {
@ -148,9 +143,9 @@ public abstract class TransportNodesOperationAction<Request extends NodesOperati
@Override @Override
public void run() { public void run() {
try { try {
onOperation(nodeOperation(newNodeRequest(clusterState.nodes().localNodeId(), request))); onOperation(idx, nodeOperation(newNodeRequest(clusterState.nodes().localNodeId(), request)));
} catch (Throwable e) { } catch (Throwable e) {
onFailure(clusterState.nodes().localNodeId(), e); onFailure(idx, clusterState.nodes().localNodeId(), e);
} }
} }
}); });
@ -159,15 +154,15 @@ public abstract class TransportNodesOperationAction<Request extends NodesOperati
@Override @Override
public void run() { public void run() {
try { try {
onOperation(nodeOperation(newNodeRequest(clusterState.nodes().masterNodeId(), request))); onOperation(idx, nodeOperation(newNodeRequest(clusterState.nodes().masterNodeId(), request)));
} catch (Throwable e) { } catch (Throwable e) {
onFailure(clusterState.nodes().masterNodeId(), e); onFailure(idx, clusterState.nodes().masterNodeId(), e);
} }
} }
}); });
} else { } else {
if (node == null) { if (node == null) {
onFailure(nodeId, new NoSuchNodeException(nodeId)); onFailure(idx, nodeId, new NoSuchNodeException(nodeId));
} else { } else {
NodeRequest nodeRequest = newNodeRequest(nodeId, request); NodeRequest nodeRequest = newNodeRequest(nodeId, request);
transportService.sendRequest(node, transportNodeAction, nodeRequest, transportRequestOptions, new BaseTransportResponseHandler<NodeResponse>() { transportService.sendRequest(node, transportNodeAction, nodeRequest, transportRequestOptions, new BaseTransportResponseHandler<NodeResponse>() {
@ -178,12 +173,12 @@ public abstract class TransportNodesOperationAction<Request extends NodesOperati
@Override @Override
public void handleResponse(NodeResponse response) { public void handleResponse(NodeResponse response) {
onOperation(response); onOperation(idx, response);
} }
@Override @Override
public void handleException(TransportException exp) { public void handleException(TransportException exp) {
onFailure(node.id(), exp); onFailure(idx, node.id(), exp);
} }
@Override @Override
@ -194,24 +189,22 @@ public abstract class TransportNodesOperationAction<Request extends NodesOperati
} }
} }
} catch (Throwable t) { } catch (Throwable t) {
onFailure(nodeId, t); onFailure(idx, nodeId, t);
} }
} }
} }
private void onOperation(NodeResponse nodeResponse) { private void onOperation(int idx, NodeResponse nodeResponse) {
// need two counters to avoid race conditions responses.set(idx, nodeResponse);
responses.set(index.getAndIncrement(), nodeResponse);
if (counter.incrementAndGet() == responses.length()) { if (counter.incrementAndGet() == responses.length()) {
finishHim(); finishHim();
} }
} }
private void onFailure(String nodeId, Throwable t) { private void onFailure(int idx, String nodeId, Throwable t) {
if (logger.isDebugEnabled()) { if (logger.isDebugEnabled()) {
logger.debug("failed to execute on node [{}]", t, nodeId); logger.debug("failed to execute on node [{}]", t, nodeId);
} }
int idx = index.getAndIncrement();
if (accumulateExceptions()) { if (accumulateExceptions()) {
responses.set(idx, new FailedNodeException(nodeId, "Failed node [" + nodeId + "]", t)); responses.set(idx, new FailedNodeException(nodeId, "Failed node [" + nodeId + "]", t));
} }