Simplify the cleanup code for non-allocated shards

It's pretty complex today; simplify it by doing a single iteration that handles both dangling shard cleanup on existing indices and shards that have been reallocated to a different node.
This commit is contained in:
Shay Banon 2012-10-11 18:46:24 -07:00
parent 320c9b7681
commit 27562270ca
1 changed files with 37 additions and 58 deletions

View File

@ -24,7 +24,9 @@ import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterStateListener; import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.routing.*; import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.FileSystemUtils; import org.elasticsearch.common.io.FileSystemUtils;
@ -37,8 +39,6 @@ import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.node.settings.NodeSettingsService; import org.elasticsearch.node.settings.NodeSettingsService;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import java.io.File;
/** /**
* *
*/ */
@ -81,8 +81,6 @@ public class IndicesStore extends AbstractComponent implements ClusterStateListe
private final ClusterService clusterService; private final ClusterService clusterService;
private final ThreadPool threadPool;
private volatile String rateLimitingType; private volatile String rateLimitingType;
private volatile ByteSizeValue rateLimitingThrottle; private volatile ByteSizeValue rateLimitingThrottle;
private final StoreRateLimiting rateLimiting = new StoreRateLimiting(); private final StoreRateLimiting rateLimiting = new StoreRateLimiting();
@ -96,7 +94,6 @@ public class IndicesStore extends AbstractComponent implements ClusterStateListe
this.nodeSettingsService = nodeSettingsService; this.nodeSettingsService = nodeSettingsService;
this.indicesService = indicesService; this.indicesService = indicesService;
this.clusterService = clusterService; this.clusterService = clusterService;
this.threadPool = threadPool;
this.rateLimitingType = componentSettings.get("throttle.type", "none"); this.rateLimitingType = componentSettings.get("throttle.type", "none");
rateLimiting.setType(rateLimitingType); rateLimiting.setType(rateLimitingType);
@ -128,55 +125,20 @@ public class IndicesStore extends AbstractComponent implements ClusterStateListe
return; return;
} }
// when all shards are started within a shard replication group, delete an unallocated shard on this node for (IndexRoutingTable indexRoutingTable : event.state().routingTable()) {
RoutingTable routingTable = event.state().routingTable(); // Note, closed indices will not have any routing information, so won't be deleted
for (IndexRoutingTable indexRoutingTable : routingTable) {
IndexService indexService = indicesService.indexService(indexRoutingTable.index());
if (indexService == null) {
// we handle this later...
continue;
}
// if the store is not persistent, don't bother trying to check if it can be deleted
if (!indexService.store().persistent()) {
continue;
}
for (IndexShardRoutingTable indexShardRoutingTable : indexRoutingTable) { for (IndexShardRoutingTable indexShardRoutingTable : indexRoutingTable) {
// if it has been created on this node, we don't want to delete it ShardId shardId = indexShardRoutingTable.shardId();
if (indexService.hasShard(indexShardRoutingTable.shardId().id())) { // a shard can be deleted if all its copies are active, and its not allocated on this node
continue; boolean shardCanBeDeleted = true;
} if (indexShardRoutingTable.size() == 0) {
if (!indexService.store().canDeleteUnallocated(indexShardRoutingTable.shardId())) { // should not really happen, there should always be at least 1 (primary) shard in a
continue; // shard replication group, in any case, protected from deleting something by mistake
} shardCanBeDeleted = false;
// only delete an unallocated shard if all (other shards) are started } else {
int startedShardsCount = indexShardRoutingTable.countWithState(ShardRoutingState.STARTED);
if (startedShardsCount > 0 && startedShardsCount == indexShardRoutingTable.size()) {
if (logger.isDebugEnabled()) {
logger.debug("[{}][{}] deleting unallocated shard", indexShardRoutingTable.shardId().index().name(), indexShardRoutingTable.shardId().id());
}
try {
indexService.store().deleteUnallocated(indexShardRoutingTable.shardId());
} catch (Exception e) {
logger.debug("[{}][{}] failed to delete unallocated shard, ignoring", e, indexShardRoutingTable.shardId().index().name(), indexShardRoutingTable.shardId().id());
}
}
}
}
// do the reverse, and delete dangling shards that might remain on that node
// but are allocated on other nodes
if (nodeEnv.hasNodeFile()) {
// delete unused shards for existing indices
for (IndexRoutingTable indexRoutingTable : routingTable) {
IndexService indexService = indicesService.indexService(indexRoutingTable.index());
if (indexService != null) { // allocated, ignore this
continue;
}
for (IndexShardRoutingTable indexShardRoutingTable : indexRoutingTable) {
boolean shardCanBeDeleted = true;
for (ShardRouting shardRouting : indexShardRoutingTable) { for (ShardRouting shardRouting : indexShardRoutingTable) {
// don't delete a shard that not all instances are active // be conservative here, check on started, not even active
if (!shardRouting.active()) { if (!shardRouting.started()) {
shardCanBeDeleted = false; shardCanBeDeleted = false;
break; break;
} }
@ -188,13 +150,30 @@ public class IndicesStore extends AbstractComponent implements ClusterStateListe
break; break;
} }
} }
if (shardCanBeDeleted) { }
ShardId shardId = indexShardRoutingTable.shardId(); if (shardCanBeDeleted) {
for (File shardLocation : nodeEnv.shardLocations(shardId)) { IndexService indexService = indicesService.indexService(indexRoutingTable.index());
if (shardLocation.exists()) { if (indexService == null) {
// not physical allocation of the index, delete it from the file system if applicable
if (nodeEnv.hasNodeFile()) {
logger.debug("[{}][{}] deleting shard that is no longer used", shardId.index().name(), shardId.id());
FileSystemUtils.deleteRecursively(nodeEnv.shardLocations(shardId));
}
} else {
if (!indexService.hasShard(shardId.id())) {
if (indexService.store().canDeleteUnallocated(shardId)) {
logger.debug("[{}][{}] deleting shard that is no longer used", shardId.index().name(), shardId.id()); logger.debug("[{}][{}] deleting shard that is no longer used", shardId.index().name(), shardId.id());
FileSystemUtils.deleteRecursively(shardLocation); try {
indexService.store().deleteUnallocated(indexShardRoutingTable.shardId());
} catch (Exception e) {
logger.debug("[{}][{}] failed to delete unallocated shard, ignoring", e, indexShardRoutingTable.shardId().index().name(), indexShardRoutingTable.shardId().id());
}
} }
} else {
// this state is weird, should we log?
// basically, it means that the shard is not allocated on this node using the routing
// but its still physically exists on an IndexService
// Note, this listener should run after IndicesClusterStateService...
} }
} }
} }