diff --git a/src/main/java/org/elasticsearch/indices/store/IndicesStore.java b/src/main/java/org/elasticsearch/indices/store/IndicesStore.java index f4985c24233..59b3d871075 100644 --- a/src/main/java/org/elasticsearch/indices/store/IndicesStore.java +++ b/src/main/java/org/elasticsearch/indices/store/IndicesStore.java @@ -23,6 +23,7 @@ import org.apache.lucene.store.StoreRateLimiting; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterStateListener; +import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.IndexRoutingTable; import org.elasticsearch.cluster.routing.IndexShardRoutingTable; import org.elasticsearch.cluster.routing.ShardRouting; @@ -37,7 +38,7 @@ import org.elasticsearch.index.service.IndexService; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.node.settings.NodeSettingsService; -import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; import java.io.File; @@ -78,6 +79,7 @@ public class IndicesStore extends AbstractComponent implements ClusterStateListe private final IndicesService indicesService; private final ClusterService clusterService; + private final TransportService transportService; private volatile String rateLimitingType; private volatile ByteSizeValue rateLimitingThrottle; @@ -86,12 +88,15 @@ public class IndicesStore extends AbstractComponent implements ClusterStateListe private final ApplySettings applySettings = new ApplySettings(); @Inject - public IndicesStore(Settings settings, NodeEnvironment nodeEnv, NodeSettingsService nodeSettingsService, IndicesService indicesService, ClusterService clusterService, ThreadPool threadPool) { + public IndicesStore(Settings settings, NodeEnvironment nodeEnv, NodeSettingsService nodeSettingsService, IndicesService indicesService, + ClusterService clusterService, TransportService transportService) { super(settings); this.nodeEnv = nodeEnv; this.nodeSettingsService = nodeSettingsService; this.indicesService = indicesService; this.clusterService = clusterService; + this.transportService = transportService; + // we limit with 20MB / sec by default with a default type set to merge sice 0.90.1 this.rateLimitingType = componentSettings.get("throttle.type", StoreRateLimiting.Type.MERGE.name()); rateLimiting.setType(rateLimitingType); @@ -141,15 +146,17 @@ public class IndicesStore extends AbstractComponent implements ClusterStateListe break; } - // if the allocated or relocation node id doesn't exists in the cluster state, its a stale - // node, make sure we don't do anything with this until the routing table has properly been + // if the allocated or relocation node id doesn't exists in the cluster state or we're not connected to it + // it may be a stale node, make sure we don't do anything with this until the routing table has properly been // rerouted to reflect the fact that the node does not exists - if (!event.state().nodes().nodeExists(shardRouting.currentNodeId())) { + DiscoveryNode node = event.state().nodes().get(shardRouting.currentNodeId()); + if (node == null || !transportService.nodeConnected(node)) { shardCanBeDeleted = false; break; } if (shardRouting.relocatingNodeId() != null) { - if (!event.state().nodes().nodeExists(shardRouting.relocatingNodeId())) { + node = event.state().nodes().get(shardRouting.relocatingNodeId()); + if (node == null || !transportService.nodeConnected(node)) { shardCanBeDeleted = false; break; }