[ALLOCATION] Verify the shard's index UUID when fetching started shards

Today we simply fetch the shard metadata without verifying the UUID
of the index the shard belongs to. We recently added this UUID
to the shard state metadata. This commit adds verification
to the shard metadata fetching to prevent bringing a shard
back into an index it doesn't belong to due to a name collision.
This commit is contained in:
Simon Willnauer 2015-03-15 17:50:57 -07:00
parent 911f522a0e
commit cae2707375
2 changed files with 44 additions and 17 deletions

View File

@ -118,8 +118,8 @@ public class GatewayAllocator extends AbstractComponent {
if (!routingNodes.routingTable().index(shard.index()).shard(shard.id()).primaryAllocatedPostApi()) { if (!routingNodes.routingTable().index(shard.index()).shard(shard.id()).primaryAllocatedPostApi()) {
continue; continue;
} }
final String indexUUID = allocation.metaData().index(shard.index()).getUUID();
ObjectLongOpenHashMap<DiscoveryNode> nodesState = buildShardStates(nodes, shard); ObjectLongOpenHashMap<DiscoveryNode> nodesState = buildShardStates(nodes, shard, indexUUID);
int numberOfAllocationsFound = 0; int numberOfAllocationsFound = 0;
long highestVersion = -1; long highestVersion = -1;
@ -370,7 +370,7 @@ public class GatewayAllocator extends AbstractComponent {
return changed; return changed;
} }
private ObjectLongOpenHashMap<DiscoveryNode> buildShardStates(final DiscoveryNodes nodes, MutableShardRouting shard) { private ObjectLongOpenHashMap<DiscoveryNode> buildShardStates(final DiscoveryNodes nodes, MutableShardRouting shard, String indexUUID) {
ObjectLongOpenHashMap<DiscoveryNode> shardStates = cachedShardsState.get(shard.shardId()); ObjectLongOpenHashMap<DiscoveryNode> shardStates = cachedShardsState.get(shard.shardId());
ObjectOpenHashSet<String> nodeIds; ObjectOpenHashSet<String> nodeIds;
if (shardStates == null) { if (shardStates == null) {
@ -399,7 +399,7 @@ public class GatewayAllocator extends AbstractComponent {
} }
String[] nodesIdsArray = nodeIds.toArray(String.class); String[] nodesIdsArray = nodeIds.toArray(String.class);
TransportNodesListGatewayStartedShards.NodesGatewayStartedShards response = listGatewayStartedShards.list(shard.shardId(), nodesIdsArray, listTimeout).actionGet(); TransportNodesListGatewayStartedShards.NodesGatewayStartedShards response = listGatewayStartedShards.list(shard.shardId(), indexUUID, nodesIdsArray, listTimeout).actionGet();
logListActionFailures(shard, "state", response.failures()); logListActionFailures(shard, "state", response.failures());
for (TransportNodesListGatewayStartedShards.NodeGatewayStartedShards nodeShardState : response) { for (TransportNodesListGatewayStartedShards.NodeGatewayStartedShards nodeShardState : response) {

View File

@ -27,6 +27,7 @@ import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.nodes.*; import org.elasticsearch.action.support.nodes.*;
import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
@ -41,6 +42,8 @@ import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.TransportService;
import java.io.IOException; import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.concurrent.atomic.AtomicReferenceArray; import java.util.concurrent.atomic.AtomicReferenceArray;
@ -59,8 +62,8 @@ public class TransportNodesListGatewayStartedShards extends TransportNodesOperat
this.nodeEnv = env; this.nodeEnv = env;
} }
public ActionFuture<NodesGatewayStartedShards> list(ShardId shardId, String[] nodesIds, @Nullable TimeValue timeout) { public ActionFuture<NodesGatewayStartedShards> list(ShardId shardId, String indexUUID, String[] nodesIds, @Nullable TimeValue timeout) {
return execute(new Request(shardId, nodesIds).timeout(timeout)); return execute(new Request(shardId, indexUUID, nodesIds).timeout(timeout));
} }
@Override @Override
@ -114,13 +117,22 @@ public class TransportNodesListGatewayStartedShards extends TransportNodesOperat
@Override @Override
protected NodeGatewayStartedShards nodeOperation(NodeRequest request) throws ElasticsearchException { protected NodeGatewayStartedShards nodeOperation(NodeRequest request) throws ElasticsearchException {
try { try {
logger.trace("{} loading local shard state info", request.shardId); final ShardId shardId = request.getShardId();
final String indexUUID = request.getIndexUUID();
logger.trace("{} loading local shard state info", shardId);
ShardStateMetaData shardStateMetaData = ShardStateMetaData.load(logger, request.shardId, nodeEnv.shardPaths(request.shardId)); ShardStateMetaData shardStateMetaData = ShardStateMetaData.load(logger, request.shardId, nodeEnv.shardPaths(request.shardId));
if (shardStateMetaData != null) { if (shardStateMetaData != null) {
logger.debug("{} shard state info found: [{}]", request.shardId, shardStateMetaData); // old shard metadata doesn't have the actual index UUID so we need to check if the actual uuid in the metadata
return new NodeGatewayStartedShards(clusterService.localNode(), shardStateMetaData.version); // is equal to IndexMetaData.INDEX_UUID_NA_VALUE otherwise this shard doesn't belong to the requested index.
if (indexUUID.equals(shardStateMetaData.indexUUID) == false
&& IndexMetaData.INDEX_UUID_NA_VALUE.equals(shardStateMetaData.indexUUID) == false) {
logger.warn("{} shard state info found but indexUUID didn't match expected [{}] actual [{}]", shardId, indexUUID, shardStateMetaData.indexUUID);
} else {
logger.debug("{} shard state info found: [{}]", shardId, shardStateMetaData);
return new NodeGatewayStartedShards(clusterService.localNode(), shardStateMetaData.version);
}
} }
logger.trace("{} no local shard info found", request.shardId); logger.trace("{} no local shard info found", shardId);
return new NodeGatewayStartedShards(clusterService.localNode(), -1); return new NodeGatewayStartedShards(clusterService.localNode(), -1);
} catch (Exception e) { } catch (Exception e) {
throw new ElasticsearchException("failed to load started shards", e); throw new ElasticsearchException("failed to load started shards", e);
@ -135,18 +147,15 @@ public class TransportNodesListGatewayStartedShards extends TransportNodesOperat
static class Request extends NodesOperationRequest<Request> { static class Request extends NodesOperationRequest<Request> {
private ShardId shardId; private ShardId shardId;
private String indexUUID;
public Request() { public Request() {
} }
public Request(ShardId shardId, Set<String> nodesIds) { public Request(ShardId shardId, String indexUUID, String[] nodesIds) {
super(nodesIds.toArray(new String[nodesIds.size()]));
this.shardId = shardId;
}
public Request(ShardId shardId, String... nodesIds) {
super(nodesIds); super(nodesIds);
this.shardId = shardId; this.shardId = shardId;
this.indexUUID = indexUUID;
} }
public ShardId shardId() { public ShardId shardId() {
@ -157,12 +166,18 @@ public class TransportNodesListGatewayStartedShards extends TransportNodesOperat
public void readFrom(StreamInput in) throws IOException { public void readFrom(StreamInput in) throws IOException {
super.readFrom(in); super.readFrom(in);
shardId = ShardId.readShardId(in); shardId = ShardId.readShardId(in);
indexUUID = in.readString();
} }
@Override @Override
public void writeTo(StreamOutput out) throws IOException { public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out); super.writeTo(out);
shardId.writeTo(out); shardId.writeTo(out);
out.writeString(indexUUID);
}
public String getIndexUUID() {
return indexUUID;
} }
} }
@ -202,7 +217,8 @@ public class TransportNodesListGatewayStartedShards extends TransportNodesOperat
static class NodeRequest extends NodeOperationRequest { static class NodeRequest extends NodeOperationRequest {
ShardId shardId; private ShardId shardId;
private String indexUUID;
NodeRequest() { NodeRequest() {
} }
@ -210,18 +226,29 @@ public class TransportNodesListGatewayStartedShards extends TransportNodesOperat
NodeRequest(String nodeId, TransportNodesListGatewayStartedShards.Request request) { NodeRequest(String nodeId, TransportNodesListGatewayStartedShards.Request request) {
super(request, nodeId); super(request, nodeId);
this.shardId = request.shardId(); this.shardId = request.shardId();
this.indexUUID = request.getIndexUUID();
} }
@Override @Override
public void readFrom(StreamInput in) throws IOException { public void readFrom(StreamInput in) throws IOException {
super.readFrom(in); super.readFrom(in);
shardId = ShardId.readShardId(in); shardId = ShardId.readShardId(in);
indexUUID = in.readString();
} }
@Override @Override
public void writeTo(StreamOutput out) throws IOException { public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out); super.writeTo(out);
shardId.writeTo(out); shardId.writeTo(out);
out.writeString(indexUUID);
}
public ShardId getShardId() {
return shardId;
}
public String getIndexUUID() {
return indexUUID;
} }
} }