Before deleting a local unused shard copy, verify we're connected to the node it's supposed to be on

This is another safety guard to make sure we don't delete data when the local copy is the only one left (even if it is no longer part of the cluster state).

Closes #6191
This commit is contained in:
Boaz Leskes 2014-05-14 11:37:13 +02:00
parent 541acc7e9b
commit 05d131c39d
1 changed file with 13 additions and 6 deletions

View File

@ -23,6 +23,7 @@ import org.apache.lucene.store.StoreRateLimiting;
import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterStateListener; import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.IndexRoutingTable; import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable; import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardRouting;
@ -37,7 +38,7 @@ import org.elasticsearch.index.service.IndexService;
import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.node.settings.NodeSettingsService; import org.elasticsearch.node.settings.NodeSettingsService;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService;
import java.io.File; import java.io.File;
@ -78,6 +79,7 @@ public class IndicesStore extends AbstractComponent implements ClusterStateListe
private final IndicesService indicesService; private final IndicesService indicesService;
private final ClusterService clusterService; private final ClusterService clusterService;
private final TransportService transportService;
private volatile String rateLimitingType; private volatile String rateLimitingType;
private volatile ByteSizeValue rateLimitingThrottle; private volatile ByteSizeValue rateLimitingThrottle;
@ -86,12 +88,15 @@ public class IndicesStore extends AbstractComponent implements ClusterStateListe
private final ApplySettings applySettings = new ApplySettings(); private final ApplySettings applySettings = new ApplySettings();
@Inject @Inject
public IndicesStore(Settings settings, NodeEnvironment nodeEnv, NodeSettingsService nodeSettingsService, IndicesService indicesService, ClusterService clusterService, ThreadPool threadPool) { public IndicesStore(Settings settings, NodeEnvironment nodeEnv, NodeSettingsService nodeSettingsService, IndicesService indicesService,
ClusterService clusterService, TransportService transportService) {
super(settings); super(settings);
this.nodeEnv = nodeEnv; this.nodeEnv = nodeEnv;
this.nodeSettingsService = nodeSettingsService; this.nodeSettingsService = nodeSettingsService;
this.indicesService = indicesService; this.indicesService = indicesService;
this.clusterService = clusterService; this.clusterService = clusterService;
this.transportService = transportService;
// we limit with 20MB / sec by default with a default type set to merge since 0.90.1 // we limit with 20MB / sec by default with a default type set to merge since 0.90.1
this.rateLimitingType = componentSettings.get("throttle.type", StoreRateLimiting.Type.MERGE.name()); this.rateLimitingType = componentSettings.get("throttle.type", StoreRateLimiting.Type.MERGE.name());
rateLimiting.setType(rateLimitingType); rateLimiting.setType(rateLimitingType);
@ -141,15 +146,17 @@ public class IndicesStore extends AbstractComponent implements ClusterStateListe
break; break;
} }
// if the allocated or relocation node id doesn't exist in the cluster state, it's a stale // if the allocated or relocation node id doesn't exist in the cluster state or we're not connected to it
// node, make sure we don't do anything with this until the routing table has properly been // it may be a stale node, make sure we don't do anything with this until the routing table has properly been
// rerouted to reflect the fact that the node does not exist // rerouted to reflect the fact that the node does not exist
if (!event.state().nodes().nodeExists(shardRouting.currentNodeId())) { DiscoveryNode node = event.state().nodes().get(shardRouting.currentNodeId());
if (node == null || !transportService.nodeConnected(node)) {
shardCanBeDeleted = false; shardCanBeDeleted = false;
break; break;
} }
if (shardRouting.relocatingNodeId() != null) { if (shardRouting.relocatingNodeId() != null) {
if (!event.state().nodes().nodeExists(shardRouting.relocatingNodeId())) { node = event.state().nodes().get(shardRouting.relocatingNodeId());
if (node == null || !transportService.nodeConnected(node)) {
shardCanBeDeleted = false; shardCanBeDeleted = false;
break; break;
} }