diff --git a/core/src/main/java/org/elasticsearch/action/support/nodes/TransportNodesAction.java b/core/src/main/java/org/elasticsearch/action/support/nodes/TransportNodesAction.java index 065f4ad744a..50e84379059 100644 --- a/core/src/main/java/org/elasticsearch/action/support/nodes/TransportNodesAction.java +++ b/core/src/main/java/org/elasticsearch/action/support/nodes/TransportNodesAction.java @@ -83,6 +83,11 @@ public abstract class TransportNodesAction(this.nodesIds.length); } diff --git a/core/src/main/java/org/elasticsearch/gateway/AsyncShardFetch.java b/core/src/main/java/org/elasticsearch/gateway/AsyncShardFetch.java index 4573e2dac12..d937fa614e0 100644 --- a/core/src/main/java/org/elasticsearch/gateway/AsyncShardFetch.java +++ b/core/src/main/java/org/elasticsearch/gateway/AsyncShardFetch.java @@ -165,19 +165,29 @@ public abstract class AsyncShardFetch implements Rel protected synchronized void processAsyncFetch(ShardId shardId, T[] responses, FailedNodeException[] failures) { if (closed) { // we are closed, no need to process this async fetch at all + logger.trace("{} ignoring fetched [{}] results, already closed", shardId, type); return; } + logger.trace("{} processing fetched [{}] results", shardId, type); + if (responses != null) { for (T response : responses) { NodeEntry nodeEntry = cache.get(response.getNode().getId()); // if the entry is there, and not marked as failed already, process it - if (nodeEntry != null && nodeEntry.isFailed() == false) { + if (nodeEntry == null) { + continue; + } + if (nodeEntry.isFailed()) { + logger.trace("{} node {} has failed for [{}] (failure [{}])", shardId, nodeEntry.getNodeId(), type, nodeEntry.getFailure()); + } else { + logger.trace("{} marking {} as done for [{}]", shardId, nodeEntry.getNodeId(), type); nodeEntry.doneFetching(response); } } } if (failures != null) { for (FailedNodeException failure : failures) { + logger.trace("{} processing failure {} for [{}]", shardId, failure, type); NodeEntry nodeEntry = cache.get(failure.nodeId()); // if the entry is there, and not marked as failed already, process it if (nodeEntry != null && nodeEntry.isFailed() == false) { @@ -253,6 +263,7 @@ public abstract class AsyncShardFetch implements Rel // visible for testing void asyncFetch(final ShardId shardId, final String[] nodesIds, final MetaData metaData) { IndexMetaData indexMetaData = metaData.index(shardId.getIndex()); + logger.trace("{} fetching [{}] from {}", shardId, type, nodesIds); action.list(shardId, indexMetaData, nodesIds, new ActionListener>() { @Override public void onResponse(BaseNodesResponse response) { diff --git a/core/src/main/java/org/elasticsearch/gateway/TransportNodesListGatewayStartedShards.java b/core/src/main/java/org/elasticsearch/gateway/TransportNodesListGatewayStartedShards.java index d5692b3a5aa..8d9a7960f6e 100644 --- a/core/src/main/java/org/elasticsearch/gateway/TransportNodesListGatewayStartedShards.java +++ b/core/src/main/java/org/elasticsearch/gateway/TransportNodesListGatewayStartedShards.java @@ -27,6 +27,7 @@ import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.nodes.*; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterService; +import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.inject.Inject; @@ -68,6 +69,13 @@ public class TransportNodesListGatewayStartedShards extends TransportNodesAction execute(new Request(shardId, indexMetaData.getUUID(), nodesIds), listener); } + @Override + protected String[] resolveNodes(Request request, ClusterState clusterState) { + // default implementation may filter out non existent nodes. it's important to keep exactly the ids + // we were given for accounting on the caller + return request.nodesIds(); + } + @Override protected boolean transportCompress() { return true; // this can become big... diff --git a/core/src/main/java/org/elasticsearch/indices/store/TransportNodesListShardStoreMetaData.java b/core/src/main/java/org/elasticsearch/indices/store/TransportNodesListShardStoreMetaData.java index c574594040e..f73576fef79 100644 --- a/core/src/main/java/org/elasticsearch/indices/store/TransportNodesListShardStoreMetaData.java +++ b/core/src/main/java/org/elasticsearch/indices/store/TransportNodesListShardStoreMetaData.java @@ -27,6 +27,7 @@ import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.nodes.*; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterService; +import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.inject.Inject; @@ -81,6 +82,13 @@ public class TransportNodesListShardStoreMetaData extends TransportNodesAction