diff --git a/src/main/java/org/elasticsearch/gateway/GatewayAllocator.java b/src/main/java/org/elasticsearch/gateway/GatewayAllocator.java index f606f02a47c..272655d5d83 100644 --- a/src/main/java/org/elasticsearch/gateway/GatewayAllocator.java +++ b/src/main/java/org/elasticsearch/gateway/GatewayAllocator.java @@ -118,8 +118,8 @@ public class GatewayAllocator extends AbstractComponent { if (!routingNodes.routingTable().index(shard.index()).shard(shard.id()).primaryAllocatedPostApi()) { continue; } - - ObjectLongOpenHashMap nodesState = buildShardStates(nodes, shard); + final String indexUUID = allocation.metaData().index(shard.index()).getUUID(); + ObjectLongOpenHashMap nodesState = buildShardStates(nodes, shard, indexUUID); int numberOfAllocationsFound = 0; long highestVersion = -1; @@ -370,7 +370,7 @@ public class GatewayAllocator extends AbstractComponent { return changed; } - private ObjectLongOpenHashMap buildShardStates(final DiscoveryNodes nodes, MutableShardRouting shard) { + private ObjectLongOpenHashMap buildShardStates(final DiscoveryNodes nodes, MutableShardRouting shard, String indexUUID) { ObjectLongOpenHashMap shardStates = cachedShardsState.get(shard.shardId()); ObjectOpenHashSet nodeIds; if (shardStates == null) { @@ -399,7 +399,7 @@ public class GatewayAllocator extends AbstractComponent { } String[] nodesIdsArray = nodeIds.toArray(String.class); - TransportNodesListGatewayStartedShards.NodesGatewayStartedShards response = listGatewayStartedShards.list(shard.shardId(), nodesIdsArray, listTimeout).actionGet(); + TransportNodesListGatewayStartedShards.NodesGatewayStartedShards response = listGatewayStartedShards.list(shard.shardId(), indexUUID, nodesIdsArray, listTimeout).actionGet(); logListActionFailures(shard, "state", response.failures()); for (TransportNodesListGatewayStartedShards.NodeGatewayStartedShards nodeShardState : response) { diff --git a/src/main/java/org/elasticsearch/gateway/TransportNodesListGatewayStartedShards.java b/src/main/java/org/elasticsearch/gateway/TransportNodesListGatewayStartedShards.java index 26b1de7fb34..96e5748d088 100644 --- a/src/main/java/org/elasticsearch/gateway/TransportNodesListGatewayStartedShards.java +++ b/src/main/java/org/elasticsearch/gateway/TransportNodesListGatewayStartedShards.java @@ -27,6 +27,7 @@ import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.nodes.*; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterService; +import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.inject.Inject; @@ -41,6 +42,8 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import java.io.IOException; +import java.util.Arrays; +import java.util.Collection; import java.util.List; import java.util.Set; import java.util.concurrent.atomic.AtomicReferenceArray; @@ -59,8 +62,8 @@ public class TransportNodesListGatewayStartedShards extends TransportNodesOperat this.nodeEnv = env; } - public ActionFuture list(ShardId shardId, String[] nodesIds, @Nullable TimeValue timeout) { - return execute(new Request(shardId, nodesIds).timeout(timeout)); + public ActionFuture list(ShardId shardId, String indexUUID, String[] nodesIds, @Nullable TimeValue timeout) { + return execute(new Request(shardId, indexUUID, nodesIds).timeout(timeout)); } @Override @@ -114,13 +117,22 @@ public class TransportNodesListGatewayStartedShards extends TransportNodesOperat @Override protected NodeGatewayStartedShards nodeOperation(NodeRequest request) throws ElasticsearchException { try { - logger.trace("{} loading local shard state info", request.shardId); + final ShardId shardId = request.getShardId(); + final String indexUUID = request.getIndexUUID(); + logger.trace("{} loading local shard state info", shardId); ShardStateMetaData shardStateMetaData = ShardStateMetaData.load(logger, request.shardId, nodeEnv.shardPaths(request.shardId)); if (shardStateMetaData != null) { - logger.debug("{} shard state info found: [{}]", request.shardId, shardStateMetaData); - return new NodeGatewayStartedShards(clusterService.localNode(), shardStateMetaData.version); + // old shard metadata doesn't have the actual index UUID so we need to check if the actual uuid in the metadata + // is equal to IndexMetaData.INDEX_UUID_NA_VALUE otherwise this shard doesn't belong to the requested index. + if (indexUUID.equals(shardStateMetaData.indexUUID) == false + && IndexMetaData.INDEX_UUID_NA_VALUE.equals(shardStateMetaData.indexUUID) == false) { + logger.warn("{} shard state info found but indexUUID didn't match expected [{}] actual [{}]", shardId, indexUUID, shardStateMetaData.indexUUID); + } else { + logger.debug("{} shard state info found: [{}]", shardId, shardStateMetaData); + return new NodeGatewayStartedShards(clusterService.localNode(), shardStateMetaData.version); + } } - logger.trace("{} no local shard info found", request.shardId); + logger.trace("{} no local shard info found", shardId); return new NodeGatewayStartedShards(clusterService.localNode(), -1); } catch (Exception e) { throw new ElasticsearchException("failed to load started shards", e); @@ -135,18 +147,15 @@ public class TransportNodesListGatewayStartedShards extends TransportNodesOperat static class Request extends NodesOperationRequest { private ShardId shardId; + private String indexUUID; public Request() { } - public Request(ShardId shardId, Set nodesIds) { - super(nodesIds.toArray(new String[nodesIds.size()])); - this.shardId = shardId; - } - - public Request(ShardId shardId, String... nodesIds) { + public Request(ShardId shardId, String indexUUID, String[] nodesIds) { super(nodesIds); this.shardId = shardId; + this.indexUUID = indexUUID; } public ShardId shardId() { @@ -157,12 +166,18 @@ public class TransportNodesListGatewayStartedShards extends TransportNodesOperat public void readFrom(StreamInput in) throws IOException { super.readFrom(in); shardId = ShardId.readShardId(in); + indexUUID = in.readString(); } @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); shardId.writeTo(out); + out.writeString(indexUUID); + } + + public String getIndexUUID() { + return indexUUID; } } @@ -202,7 +217,8 @@ public class TransportNodesListGatewayStartedShards extends TransportNodesOperat static class NodeRequest extends NodeOperationRequest { - ShardId shardId; + private ShardId shardId; + private String indexUUID; NodeRequest() { } @@ -210,18 +226,29 @@ public class TransportNodesListGatewayStartedShards extends TransportNodesOperat NodeRequest(String nodeId, TransportNodesListGatewayStartedShards.Request request) { super(request, nodeId); this.shardId = request.shardId(); + this.indexUUID = request.getIndexUUID(); } @Override public void readFrom(StreamInput in) throws IOException { super.readFrom(in); shardId = ShardId.readShardId(in); + indexUUID = in.readString(); } @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); shardId.writeTo(out); + out.writeString(indexUUID); + } + + public ShardId getShardId() { + return shardId; + } + + public String getIndexUUID() { + return indexUUID; } }