diff --git a/src/main/java/org/elasticsearch/indices/store/IndicesStore.java b/src/main/java/org/elasticsearch/indices/store/IndicesStore.java index e1f31c5bf0f..9b62ecc1e61 100644 --- a/src/main/java/org/elasticsearch/indices/store/IndicesStore.java +++ b/src/main/java/org/elasticsearch/indices/store/IndicesStore.java @@ -237,7 +237,7 @@ public class IndicesStore extends AbstractComponent implements ClusterStateListe requests.add(new Tuple<>(currentNode, new ShardActiveRequest(clusterName, indexUUID, shardRouting.shardId()))); if (shardRouting.relocatingNodeId() != null) { DiscoveryNode relocatingNode = state.nodes().get(shardRouting.relocatingNodeId()); - assert relocatingNode != null; + assert relocatingNode != null; requests.add(new Tuple<>(relocatingNode, new ShardActiveRequest(clusterName, indexUUID, shardRouting.shardId()))); } } @@ -296,43 +296,59 @@ public class IndicesStore extends AbstractComponent implements ClusterStateListe private void allNodesResponded() { if (activeCopies.get() != expectedActiveCopies) { - logger.trace("not deleting shard [{}], expected {} active copies, but only {} found active copies", shardId, expectedActiveCopies, activeCopies.get()); + logger.trace("not deleting shard {}, expected {} active copies, but only {} found active copies", shardId, expectedActiveCopies, activeCopies.get()); return; } ClusterState latestClusterState = clusterService.state(); if (clusterState.getVersion() != latestClusterState.getVersion()) { - logger.trace("not deleting shard [{}], the latest cluster state version[{}] is not equal to cluster state before shard active api call [{}]", shardId, latestClusterState.getVersion(), clusterState.getVersion()); + logger.trace("not deleting shard {}, the latest cluster state version[{}] is not equal to cluster state before shard active api call [{}]", shardId, latestClusterState.getVersion(), clusterState.getVersion()); return; } - IndexService indexService = indicesService.indexService(shardId.getIndex()); - if (indexService == null) { - // not physical allocation of the index, delete it from the file system if applicable - if (nodeEnv.hasNodeFile()) { - File[] shardLocations = nodeEnv.shardLocations(shardId); - if (FileSystemUtils.exists(shardLocations)) { - logger.debug("[{}][{}] deleting shard that is no longer used", shardId.index().name(), shardId.id()); - FileSystemUtils.deleteRecursively(shardLocations); + clusterService.submitStateUpdateTask("indices_store", new ClusterStateUpdateTask() { + @Override + public ClusterState execute(ClusterState currentState) throws Exception { + if (clusterState.getVersion() != currentState.getVersion()) { + logger.trace("not deleting shard {}, the update task state version[{}] is not equal to cluster state before shard active api call [{}]", shardId, currentState.getVersion(), clusterState.getVersion()); + return currentState; } - } - } else { - if (!indexService.hasShard(shardId.id())) { - if (indexService.store().canDeleteUnallocated(shardId)) { - logger.debug("[{}][{}] deleting shard that is no longer used", shardId.index().name(), shardId.id()); - try { - indexService.store().deleteUnallocated(shardId); - } catch (Exception e) { - logger.debug("[{}][{}] failed to delete unallocated shard, ignoring", e, shardId.index().name(), shardId.id()); + + IndexService indexService = indicesService.indexService(shardId.getIndex()); + if (indexService == null) { + // not physical allocation of the index, delete it from the file system if applicable + if (nodeEnv.hasNodeFile()) { + File[] shardLocations = nodeEnv.shardLocations(shardId); + if (FileSystemUtils.exists(shardLocations)) { + logger.debug("[{}][{}] deleting shard that is no longer used", shardId); + FileSystemUtils.deleteRecursively(shardLocations); + } + } + } else { + if (!indexService.hasShard(shardId.id())) { + if (indexService.store().canDeleteUnallocated(shardId)) { + logger.debug("{} deleting shard that is no longer used", shardId); + try { + indexService.store().deleteUnallocated(shardId); + } catch (Exception e) { + logger.debug("{} failed to delete unallocated shard, ignoring", e, shardId); + } + } + } else { + // this state is weird, should we log? + // basically, it means that the shard is not allocated on this node using the routing + // but its still physically exists on an IndexService + // Note, this listener should run after IndicesClusterStateService... } } - } else { - // this state is weird, should we log? - // basically, it means that the shard is not allocated on this node using the routing - // but its still physically exists on an IndexService - // Note, this listener should run after IndicesClusterStateService... + return currentState; } - } + + @Override + public void onFailure(String source, Throwable t) { + logger.error("{} unexpected error during deletion of unallocated shard", t, shardId); + } + }); } }