From bdbe86dd2db73c6ff59eebe5b2a0bd251270db2d Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Thu, 17 Jul 2014 11:58:23 +0200 Subject: [PATCH] [Store] delete unallocated shards under a cluster state task This is to prevent a rare race condition where the very same shard gets allocated to the node between our sanity check that the cluster state didn't change and the actual deletion of the files. Closes #6902 --- .../indices/store/IndicesStore.java | 68 ++++++++++++------- 1 file changed, 42 insertions(+), 26 deletions(-) diff --git a/src/main/java/org/elasticsearch/indices/store/IndicesStore.java b/src/main/java/org/elasticsearch/indices/store/IndicesStore.java index e1f31c5bf0f..9b62ecc1e61 100644 --- a/src/main/java/org/elasticsearch/indices/store/IndicesStore.java +++ b/src/main/java/org/elasticsearch/indices/store/IndicesStore.java @@ -237,7 +237,7 @@ public class IndicesStore extends AbstractComponent implements ClusterStateListe requests.add(new Tuple<>(currentNode, new ShardActiveRequest(clusterName, indexUUID, shardRouting.shardId()))); if (shardRouting.relocatingNodeId() != null) { DiscoveryNode relocatingNode = state.nodes().get(shardRouting.relocatingNodeId()); - assert relocatingNode != null; + assert relocatingNode != null; requests.add(new Tuple<>(relocatingNode, new ShardActiveRequest(clusterName, indexUUID, shardRouting.shardId()))); } } @@ -296,43 +296,59 @@ public class IndicesStore extends AbstractComponent implements ClusterStateListe private void allNodesResponded() { if (activeCopies.get() != expectedActiveCopies) { - logger.trace("not deleting shard [{}], expected {} active copies, but only {} found active copies", shardId, expectedActiveCopies, activeCopies.get()); + logger.trace("not deleting shard {}, expected {} active copies, but only {} found active copies", shardId, expectedActiveCopies, activeCopies.get()); return; } ClusterState latestClusterState = clusterService.state(); if (clusterState.getVersion() != 
latestClusterState.getVersion()) { - logger.trace("not deleting shard [{}], the latest cluster state version[{}] is not equal to cluster state before shard active api call [{}]", shardId, latestClusterState.getVersion(), clusterState.getVersion()); + logger.trace("not deleting shard {}, the latest cluster state version[{}] is not equal to cluster state before shard active api call [{}]", shardId, latestClusterState.getVersion(), clusterState.getVersion()); return; } - IndexService indexService = indicesService.indexService(shardId.getIndex()); - if (indexService == null) { - // not physical allocation of the index, delete it from the file system if applicable - if (nodeEnv.hasNodeFile()) { - File[] shardLocations = nodeEnv.shardLocations(shardId); - if (FileSystemUtils.exists(shardLocations)) { - logger.debug("[{}][{}] deleting shard that is no longer used", shardId.index().name(), shardId.id()); - FileSystemUtils.deleteRecursively(shardLocations); + clusterService.submitStateUpdateTask("indices_store", new ClusterStateUpdateTask() { + @Override + public ClusterState execute(ClusterState currentState) throws Exception { + if (clusterState.getVersion() != currentState.getVersion()) { + logger.trace("not deleting shard {}, the update task state version[{}] is not equal to cluster state before shard active api call [{}]", shardId, currentState.getVersion(), clusterState.getVersion()); + return currentState; } - } - } else { - if (!indexService.hasShard(shardId.id())) { - if (indexService.store().canDeleteUnallocated(shardId)) { - logger.debug("[{}][{}] deleting shard that is no longer used", shardId.index().name(), shardId.id()); - try { - indexService.store().deleteUnallocated(shardId); - } catch (Exception e) { - logger.debug("[{}][{}] failed to delete unallocated shard, ignoring", e, shardId.index().name(), shardId.id()); + + IndexService indexService = indicesService.indexService(shardId.getIndex()); + if (indexService == null) { + // not physical allocation of 
the index, delete it from the file system if applicable + if (nodeEnv.hasNodeFile()) { + File[] shardLocations = nodeEnv.shardLocations(shardId); + if (FileSystemUtils.exists(shardLocations)) { + logger.debug("{} deleting shard that is no longer used", shardId); + FileSystemUtils.deleteRecursively(shardLocations); + } + } + } else { + if (!indexService.hasShard(shardId.id())) { + if (indexService.store().canDeleteUnallocated(shardId)) { + logger.debug("{} deleting shard that is no longer used", shardId); + try { + indexService.store().deleteUnallocated(shardId); + } catch (Exception e) { + logger.debug("{} failed to delete unallocated shard, ignoring", e, shardId); + } + } + } else { + // this state is weird, should we log? + // basically, it means that the shard is not allocated on this node using the routing + // but its still physically exists on an IndexService + // Note, this listener should run after IndicesClusterStateService... } } - } else { - // this state is weird, should we log? - // basically, it means that the shard is not allocated on this node using the routing - // but its still physically exists on an IndexService - // Note, this listener should run after IndicesClusterStateService... + return currentState; } - } + + @Override + public void onFailure(String source, Throwable t) { + logger.error("{} unexpected error during deletion of unallocated shard", t, shardId); + } + }); } }