[Store] delete unallocated shards under a cluster state task

This is to prevent a rare race condition where the very same shard gets allocated to the node between our sanity check that the cluster state didn't change and the actual deletion of the files.

Closes #6902
This commit is contained in:
Boaz Leskes 2014-07-17 11:58:23 +02:00
parent bb421d7ea3
commit bdbe86dd2d
1 changed files with 42 additions and 26 deletions

View File

@ -237,7 +237,7 @@ public class IndicesStore extends AbstractComponent implements ClusterStateListe
requests.add(new Tuple<>(currentNode, new ShardActiveRequest(clusterName, indexUUID, shardRouting.shardId()))); requests.add(new Tuple<>(currentNode, new ShardActiveRequest(clusterName, indexUUID, shardRouting.shardId())));
if (shardRouting.relocatingNodeId() != null) { if (shardRouting.relocatingNodeId() != null) {
DiscoveryNode relocatingNode = state.nodes().get(shardRouting.relocatingNodeId()); DiscoveryNode relocatingNode = state.nodes().get(shardRouting.relocatingNodeId());
assert relocatingNode != null; assert relocatingNode != null;
requests.add(new Tuple<>(relocatingNode, new ShardActiveRequest(clusterName, indexUUID, shardRouting.shardId()))); requests.add(new Tuple<>(relocatingNode, new ShardActiveRequest(clusterName, indexUUID, shardRouting.shardId())));
} }
} }
@ -296,43 +296,59 @@ public class IndicesStore extends AbstractComponent implements ClusterStateListe
private void allNodesResponded() { private void allNodesResponded() {
if (activeCopies.get() != expectedActiveCopies) { if (activeCopies.get() != expectedActiveCopies) {
logger.trace("not deleting shard [{}], expected {} active copies, but only {} found active copies", shardId, expectedActiveCopies, activeCopies.get()); logger.trace("not deleting shard {}, expected {} active copies, but only {} found active copies", shardId, expectedActiveCopies, activeCopies.get());
return; return;
} }
ClusterState latestClusterState = clusterService.state(); ClusterState latestClusterState = clusterService.state();
if (clusterState.getVersion() != latestClusterState.getVersion()) { if (clusterState.getVersion() != latestClusterState.getVersion()) {
logger.trace("not deleting shard [{}], the latest cluster state version[{}] is not equal to cluster state before shard active api call [{}]", shardId, latestClusterState.getVersion(), clusterState.getVersion()); logger.trace("not deleting shard {}, the latest cluster state version[{}] is not equal to cluster state before shard active api call [{}]", shardId, latestClusterState.getVersion(), clusterState.getVersion());
return; return;
} }
IndexService indexService = indicesService.indexService(shardId.getIndex()); clusterService.submitStateUpdateTask("indices_store", new ClusterStateUpdateTask() {
if (indexService == null) { @Override
// not physical allocation of the index, delete it from the file system if applicable public ClusterState execute(ClusterState currentState) throws Exception {
if (nodeEnv.hasNodeFile()) { if (clusterState.getVersion() != currentState.getVersion()) {
File[] shardLocations = nodeEnv.shardLocations(shardId); logger.trace("not deleting shard {}, the update task state version[{}] is not equal to cluster state before shard active api call [{}]", shardId, currentState.getVersion(), clusterState.getVersion());
if (FileSystemUtils.exists(shardLocations)) { return currentState;
logger.debug("[{}][{}] deleting shard that is no longer used", shardId.index().name(), shardId.id());
FileSystemUtils.deleteRecursively(shardLocations);
} }
}
} else { IndexService indexService = indicesService.indexService(shardId.getIndex());
if (!indexService.hasShard(shardId.id())) { if (indexService == null) {
if (indexService.store().canDeleteUnallocated(shardId)) { // not physical allocation of the index, delete it from the file system if applicable
logger.debug("[{}][{}] deleting shard that is no longer used", shardId.index().name(), shardId.id()); if (nodeEnv.hasNodeFile()) {
try { File[] shardLocations = nodeEnv.shardLocations(shardId);
indexService.store().deleteUnallocated(shardId); if (FileSystemUtils.exists(shardLocations)) {
} catch (Exception e) { logger.debug("[{}][{}] deleting shard that is no longer used", shardId);
logger.debug("[{}][{}] failed to delete unallocated shard, ignoring", e, shardId.index().name(), shardId.id()); FileSystemUtils.deleteRecursively(shardLocations);
}
}
} else {
if (!indexService.hasShard(shardId.id())) {
if (indexService.store().canDeleteUnallocated(shardId)) {
logger.debug("{} deleting shard that is no longer used", shardId);
try {
indexService.store().deleteUnallocated(shardId);
} catch (Exception e) {
logger.debug("{} failed to delete unallocated shard, ignoring", e, shardId);
}
}
} else {
// this state is weird, should we log?
// basically, it means that the shard is not allocated on this node using the routing
// but its still physically exists on an IndexService
// Note, this listener should run after IndicesClusterStateService...
} }
} }
} else { return currentState;
// this state is weird, should we log?
// basically, it means that the shard is not allocated on this node using the routing
// but its still physically exists on an IndexService
// Note, this listener should run after IndicesClusterStateService...
} }
}
@Override
public void onFailure(String source, Throwable t) {
logger.error("{} unexpected error during deletion of unallocated shard", t, shardId);
}
});
} }
} }