diff --git a/src/main/java/org/elasticsearch/indices/store/IndicesStore.java b/src/main/java/org/elasticsearch/indices/store/IndicesStore.java index 18042977f56..9d030b33ced 100644 --- a/src/main/java/org/elasticsearch/indices/store/IndicesStore.java +++ b/src/main/java/org/elasticsearch/indices/store/IndicesStore.java @@ -24,7 +24,9 @@ import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterStateListener; import org.elasticsearch.cluster.metadata.MetaData; -import org.elasticsearch.cluster.routing.*; +import org.elasticsearch.cluster.routing.IndexRoutingTable; +import org.elasticsearch.cluster.routing.IndexShardRoutingTable; +import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.FileSystemUtils; @@ -37,8 +39,6 @@ import org.elasticsearch.indices.IndicesService; import org.elasticsearch.node.settings.NodeSettingsService; import org.elasticsearch.threadpool.ThreadPool; -import java.io.File; - /** * */ @@ -81,8 +81,6 @@ public class IndicesStore extends AbstractComponent implements ClusterStateListe private final ClusterService clusterService; - private final ThreadPool threadPool; - private volatile String rateLimitingType; private volatile ByteSizeValue rateLimitingThrottle; private final StoreRateLimiting rateLimiting = new StoreRateLimiting(); @@ -96,7 +94,6 @@ public class IndicesStore extends AbstractComponent implements ClusterStateListe this.nodeSettingsService = nodeSettingsService; this.indicesService = indicesService; this.clusterService = clusterService; - this.threadPool = threadPool; this.rateLimitingType = componentSettings.get("throttle.type", "none"); rateLimiting.setType(rateLimitingType); @@ -128,55 +125,20 @@ public class IndicesStore extends AbstractComponent implements ClusterStateListe return; } - // when all shards are started within a shard replication group, delete an unallocated shard on this node - RoutingTable routingTable = event.state().routingTable(); - for (IndexRoutingTable indexRoutingTable : routingTable) { - IndexService indexService = indicesService.indexService(indexRoutingTable.index()); - if (indexService == null) { - // we handle this later... - continue; - } - // if the store is not persistent, don't bother trying to check if it can be deleted - if (!indexService.store().persistent()) { - continue; - } + for (IndexRoutingTable indexRoutingTable : event.state().routingTable()) { + // Note, closed indices will not have any routing information, so won't be deleted for (IndexShardRoutingTable indexShardRoutingTable : indexRoutingTable) { - // if it has been created on this node, we don't want to delete it - if (indexService.hasShard(indexShardRoutingTable.shardId().id())) { - continue; - } - if (!indexService.store().canDeleteUnallocated(indexShardRoutingTable.shardId())) { - continue; - } - // only delete an unallocated shard if all (other shards) are started - int startedShardsCount = indexShardRoutingTable.countWithState(ShardRoutingState.STARTED); - if (startedShardsCount > 0 && startedShardsCount == indexShardRoutingTable.size()) { - if (logger.isDebugEnabled()) { - logger.debug("[{}][{}] deleting unallocated shard", indexShardRoutingTable.shardId().index().name(), indexShardRoutingTable.shardId().id()); - } - try { - indexService.store().deleteUnallocated(indexShardRoutingTable.shardId()); - } catch (Exception e) { - logger.debug("[{}][{}] failed to delete unallocated shard, ignoring", e, indexShardRoutingTable.shardId().index().name(), indexShardRoutingTable.shardId().id()); - } - } - } - } - - // do the reverse, and delete dangling shards that might remain on that node - // but are allocated on other nodes - if (nodeEnv.hasNodeFile()) { - // delete unused shards for existing indices - for (IndexRoutingTable indexRoutingTable : routingTable) { - IndexService indexService = indicesService.indexService(indexRoutingTable.index()); - if (indexService != null) { // allocated, ignore this - continue; - } - for (IndexShardRoutingTable indexShardRoutingTable : indexRoutingTable) { - boolean shardCanBeDeleted = true; + ShardId shardId = indexShardRoutingTable.shardId(); + // a shard can be deleted if all its copies are active, and its not allocated on this node + boolean shardCanBeDeleted = true; + if (indexShardRoutingTable.size() == 0) { + // should not really happen, there should always be at least 1 (primary) shard in a + // shard replication group, in any case, protected from deleting something by mistake + shardCanBeDeleted = false; + } else { for (ShardRouting shardRouting : indexShardRoutingTable) { - // don't delete a shard that not all instances are active - if (!shardRouting.active()) { + // be conservative here, check on started, not even active + if (!shardRouting.started()) { shardCanBeDeleted = false; break; } @@ -188,13 +150,30 @@ public class IndicesStore extends AbstractComponent implements ClusterStateListe break; } } - if (shardCanBeDeleted) { - ShardId shardId = indexShardRoutingTable.shardId(); - for (File shardLocation : nodeEnv.shardLocations(shardId)) { - if (shardLocation.exists()) { + } + if (shardCanBeDeleted) { + IndexService indexService = indicesService.indexService(indexRoutingTable.index()); + if (indexService == null) { + // not physical allocation of the index, delete it from the file system if applicable + if (nodeEnv.hasNodeFile()) { + logger.debug("[{}][{}] deleting shard that is no longer used", shardId.index().name(), shardId.id()); + FileSystemUtils.deleteRecursively(nodeEnv.shardLocations(shardId)); + } + } else { + if (!indexService.hasShard(shardId.id())) { + if (indexService.store().canDeleteUnallocated(shardId)) { logger.debug("[{}][{}] deleting shard that is no longer used", shardId.index().name(), shardId.id()); - FileSystemUtils.deleteRecursively(shardLocation); + try { + indexService.store().deleteUnallocated(indexShardRoutingTable.shardId()); + } catch (Exception e) { + logger.debug("[{}][{}] failed to delete unallocated shard, ignoring", e, indexShardRoutingTable.shardId().index().name(), indexShardRoutingTable.shardId().id()); + } } + } else { + // this state is weird, should we log? + // basically, it means that the shard is not allocated on this node using the routing + // but its still physically exists on an IndexService + // Note, this listener should run after IndicesClusterStateService... } } }