diff --git a/core/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java b/core/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java index 70880373530..6df5f85987d 100644 --- a/core/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java +++ b/core/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java @@ -210,12 +210,9 @@ public class SnapshotsInProgress extends AbstractDiffable implements Cus public static class ShardSnapshotStatus { - private State state; - private String nodeId; - private String reason; - - private ShardSnapshotStatus() { - } + private final State state; + private final String nodeId; + private final String reason; public ShardSnapshotStatus(String nodeId) { this(nodeId, State.INIT); @@ -231,6 +228,12 @@ public class SnapshotsInProgress extends AbstractDiffable implements Cus this.reason = reason; } + public ShardSnapshotStatus(StreamInput in) throws IOException { + nodeId = in.readOptionalString(); + state = State.fromValue(in.readByte()); + reason = in.readOptionalString(); + } + public State state() { return state; } @@ -243,18 +246,6 @@ public class SnapshotsInProgress extends AbstractDiffable implements Cus return reason; } - public static ShardSnapshotStatus readShardSnapshotStatus(StreamInput in) throws IOException { - ShardSnapshotStatus shardSnapshotStatus = new ShardSnapshotStatus(); - shardSnapshotStatus.readFrom(in); - return shardSnapshotStatus; - } - - public void readFrom(StreamInput in) throws IOException { - nodeId = in.readOptionalString(); - state = State.fromValue(in.readByte()); - reason = in.readOptionalString(); - } - public void writeTo(StreamOutput out) throws IOException { out.writeOptionalString(nodeId); out.writeByte(state.value); @@ -282,6 +273,11 @@ public class SnapshotsInProgress extends AbstractDiffable implements Cus result = 31 * result + (reason != null ? reason.hashCode() : 0); return result; } + + @Override + public String toString() { + return "ShardSnapshotStatus[state=" + state + ", nodeId=" + nodeId + ", reason=" + reason + "]"; + } } public enum State { diff --git a/core/src/main/java/org/elasticsearch/cluster/node/DiscoveryNodes.java b/core/src/main/java/org/elasticsearch/cluster/node/DiscoveryNodes.java index 9d0edf7b910..f3a8932f53c 100644 --- a/core/src/main/java/org/elasticsearch/cluster/node/DiscoveryNodes.java +++ b/core/src/main/java/org/elasticsearch/cluster/node/DiscoveryNodes.java @@ -26,6 +26,7 @@ import org.elasticsearch.Version; import org.elasticsearch.cluster.AbstractDiffable; import org.elasticsearch.common.Booleans; import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.Strings; import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -399,9 +400,7 @@ public class DiscoveryNodes extends AbstractDiffable implements public String toString() { StringBuilder sb = new StringBuilder(); sb.append("{"); - for (DiscoveryNode node : this) { - sb.append(node).append(','); - } + sb.append(Strings.collectionToDelimitedString(this, ",")); sb.append("}"); return sb.toString(); } diff --git a/core/src/main/java/org/elasticsearch/index/IndexService.java b/core/src/main/java/org/elasticsearch/index/IndexService.java index ee1f1d1c976..ffcc7648293 100644 --- a/core/src/main/java/org/elasticsearch/index/IndexService.java +++ b/core/src/main/java/org/elasticsearch/index/IndexService.java @@ -411,7 +411,11 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust } } finally { try { - store.close(); + if (store != null) { + store.close(); + } else { + logger.trace("[{}] store not initialized prior to closing shard, nothing to close", shardId); + } } catch (Exception e) { logger.warn( (Supplier) () -> new ParameterizedMessage( diff --git a/core/src/main/java/org/elasticsearch/index/snapshots/IndexShardSnapshotStatus.java b/core/src/main/java/org/elasticsearch/index/snapshots/IndexShardSnapshotStatus.java index 51c8bcf5d7e..644caa7520b 100644 --- a/core/src/main/java/org/elasticsearch/index/snapshots/IndexShardSnapshotStatus.java +++ b/core/src/main/java/org/elasticsearch/index/snapshots/IndexShardSnapshotStatus.java @@ -27,7 +27,7 @@ public class IndexShardSnapshotStatus { /** * Snapshot stage */ - public static enum Stage { + public enum Stage { /** * Snapshot hasn't started yet */ @@ -66,7 +66,7 @@ public class IndexShardSnapshotStatus { private long indexVersion; - private boolean aborted; + private volatile boolean aborted; private String failure; diff --git a/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java b/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java index b55074f592d..1fb360ccfd5 100644 --- a/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java +++ b/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java @@ -69,6 +69,7 @@ import org.elasticsearch.indices.recovery.RecoveryState; import org.elasticsearch.repositories.RepositoriesService; import org.elasticsearch.search.SearchService; import org.elasticsearch.snapshots.RestoreService; +import org.elasticsearch.snapshots.SnapshotShardsService; import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; @@ -113,10 +114,11 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple NodeMappingRefreshAction nodeMappingRefreshAction, RepositoriesService repositoriesService, RestoreService restoreService, SearchService searchService, SyncedFlushService syncedFlushService, - PeerRecoverySourceService peerRecoverySourceService) { + PeerRecoverySourceService peerRecoverySourceService, SnapshotShardsService snapshotShardsService) { this(settings, (AllocatedIndices>) indicesService, clusterService, threadPool, recoveryTargetService, shardStateAction, - nodeMappingRefreshAction, repositoriesService, restoreService, searchService, syncedFlushService, peerRecoverySourceService); + nodeMappingRefreshAction, repositoriesService, restoreService, searchService, syncedFlushService, peerRecoverySourceService, + snapshotShardsService); } // for tests @@ -128,9 +130,10 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent imple NodeMappingRefreshAction nodeMappingRefreshAction, RepositoriesService repositoriesService, RestoreService restoreService, SearchService searchService, SyncedFlushService syncedFlushService, - PeerRecoverySourceService peerRecoverySourceService) { + PeerRecoverySourceService peerRecoverySourceService, SnapshotShardsService snapshotShardsService) { super(settings); - this.buildInIndexListener = Arrays.asList(peerRecoverySourceService, recoveryTargetService, searchService, syncedFlushService); + this.buildInIndexListener = Arrays.asList(peerRecoverySourceService, recoveryTargetService, searchService, syncedFlushService, + snapshotShardsService); this.indicesService = indicesService; this.clusterService = clusterService; this.threadPool = threadPool; diff --git a/core/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java b/core/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java index 1f7a4ee4fd6..d939236732e 100644 --- a/core/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java +++ b/core/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java @@ -29,8 +29,10 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateListener; import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.SnapshotsInProgress; +import org.elasticsearch.cluster.SnapshotsInProgress.ShardSnapshotStatus; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.Nullable; import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.inject.Inject; @@ -42,11 +44,13 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.index.engine.SnapshotFailedEngineException; +import org.elasticsearch.index.shard.IndexEventListener; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShardState; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.snapshots.IndexShardSnapshotFailedException; import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus; +import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus.Stage; import org.elasticsearch.indices.IndicesService; import org.elasticsearch.repositories.IndexId; import org.elasticsearch.repositories.Repository; @@ -80,7 +84,7 @@ import static org.elasticsearch.cluster.SnapshotsInProgress.completed; * This service runs on data and master nodes and controls currently snapshotted shards on these nodes. It is responsible for * starting and stopping shard level snapshots */ -public class SnapshotShardsService extends AbstractLifecycleComponent implements ClusterStateListener { +public class SnapshotShardsService extends AbstractLifecycleComponent implements ClusterStateListener, IndexEventListener { public static final String UPDATE_SNAPSHOT_ACTION_NAME = "internal:cluster/snapshot/update_snapshot"; @@ -156,12 +160,8 @@ public class SnapshotShardsService extends AbstractLifecycleComponent implements SnapshotsInProgress prev = event.previousState().custom(SnapshotsInProgress.TYPE); SnapshotsInProgress curr = event.state().custom(SnapshotsInProgress.TYPE); - if (prev == null) { - if (curr != null) { - processIndexShardSnapshots(event); - } - } else if (prev.equals(curr) == false) { - processIndexShardSnapshots(event); + if ((prev == null && curr != null) || (prev != null && prev.equals(curr) == false)) { + processIndexShardSnapshots(event); } String masterNodeId = event.state().nodes().getMasterNodeId(); if (masterNodeId != null && masterNodeId.equals(event.previousState().nodes().getMasterNodeId()) == false) { @@ -173,6 +173,18 @@ public class SnapshotShardsService extends AbstractLifecycleComponent implements } } + @Override + public void beforeIndexShardClosed(ShardId shardId, @Nullable IndexShard indexShard, Settings indexSettings) { + // abort any snapshots occurring on the soon-to-be closed shard + Map snapshotShardsMap = shardSnapshots; + for (Map.Entry snapshotShards : snapshotShardsMap.entrySet()) { + Map shards = snapshotShards.getValue().shards; + if (shards.containsKey(shardId)) { + logger.debug("[{}] shard closing, abort snapshotting for snapshot [{}]", shardId, snapshotShards.getKey().getSnapshotId()); + shards.get(shardId).abort(); + } + } + } /** * Returns status of shards that are snapshotted on the node and belong to the given snapshot @@ -205,6 +217,16 @@ public class SnapshotShardsService extends AbstractLifecycleComponent implements final Snapshot snapshot = entry.getKey(); if (snapshotsInProgress != null && snapshotsInProgress.snapshot(snapshot) != null) { survivors.put(entry.getKey(), entry.getValue()); + } else { + // abort any running snapshots of shards for the removed entry; + // this could happen if for some reason the cluster state update for aborting + // running shards is missed, then the snapshot is removed is a subsequent cluster + // state update, which is being processed here + for (IndexShardSnapshotStatus snapshotStatus : entry.getValue().shards.values()) { + if (snapshotStatus.stage() == Stage.INIT || snapshotStatus.stage() == Stage.STARTED) { + snapshotStatus.abort(); + } + } } } @@ -221,7 +243,7 @@ public class SnapshotShardsService extends AbstractLifecycleComponent implements if (entry.state() == SnapshotsInProgress.State.STARTED) { Map startedShards = new HashMap<>(); SnapshotShards snapshotShards = shardSnapshots.get(entry.snapshot()); - for (ObjectObjectCursor shard : entry.shards()) { + for (ObjectObjectCursor shard : entry.shards()) { // Add all new shards to start processing on if (localNodeId.equals(shard.value.nodeId())) { if (shard.value.state() == SnapshotsInProgress.State.INIT && (snapshotShards == null || !snapshotShards.shards.containsKey(shard.key))) { @@ -249,7 +271,7 @@ public class SnapshotShardsService extends AbstractLifecycleComponent implements // Abort all running shards for this snapshot SnapshotShards snapshotShards = shardSnapshots.get(entry.snapshot()); if (snapshotShards != null) { - for (ObjectObjectCursor shard : entry.shards()) { + for (ObjectObjectCursor shard : entry.shards()) { IndexShardSnapshotStatus snapshotStatus = snapshotShards.shards.get(shard.key); if (snapshotStatus != null) { switch (snapshotStatus.stage()) { @@ -263,12 +285,12 @@ public class SnapshotShardsService extends AbstractLifecycleComponent implements case DONE: logger.debug("[{}] trying to cancel snapshot on the shard [{}] that is already done, updating status on the master", entry.snapshot(), shard.key); updateIndexShardSnapshotStatus(entry.snapshot(), shard.key, - new SnapshotsInProgress.ShardSnapshotStatus(event.state().nodes().getLocalNodeId(), SnapshotsInProgress.State.SUCCESS)); + new ShardSnapshotStatus(event.state().nodes().getLocalNodeId(), SnapshotsInProgress.State.SUCCESS)); break; case FAILURE: logger.debug("[{}] trying to cancel snapshot on the shard [{}] that has already failed, updating status on the master", entry.snapshot(), shard.key); updateIndexShardSnapshotStatus(entry.snapshot(), shard.key, - new SnapshotsInProgress.ShardSnapshotStatus(event.state().nodes().getLocalNodeId(), SnapshotsInProgress.State.FAILED, snapshotStatus.failure())); + new ShardSnapshotStatus(event.state().nodes().getLocalNodeId(), SnapshotsInProgress.State.FAILED, snapshotStatus.failure())); break; default: throw new IllegalStateException("Unknown snapshot shard stage " + snapshotStatus.stage()); @@ -309,18 +331,18 @@ public class SnapshotShardsService extends AbstractLifecycleComponent implements @Override public void doRun() { snapshot(indexShard, entry.getKey(), indexId, shardEntry.getValue()); - updateIndexShardSnapshotStatus(entry.getKey(), shardId, new SnapshotsInProgress.ShardSnapshotStatus(localNodeId, SnapshotsInProgress.State.SUCCESS)); + updateIndexShardSnapshotStatus(entry.getKey(), shardId, new ShardSnapshotStatus(localNodeId, SnapshotsInProgress.State.SUCCESS)); } @Override public void onFailure(Exception e) { logger.warn((Supplier) () -> new ParameterizedMessage("[{}] [{}] failed to create snapshot", shardId, entry.getKey()), e); - updateIndexShardSnapshotStatus(entry.getKey(), shardId, new SnapshotsInProgress.ShardSnapshotStatus(localNodeId, SnapshotsInProgress.State.FAILED, ExceptionsHelper.detailedMessage(e))); + updateIndexShardSnapshotStatus(entry.getKey(), shardId, new ShardSnapshotStatus(localNodeId, SnapshotsInProgress.State.FAILED, ExceptionsHelper.detailedMessage(e))); } }); } catch (Exception e) { - updateIndexShardSnapshotStatus(entry.getKey(), shardId, new SnapshotsInProgress.ShardSnapshotStatus(localNodeId, SnapshotsInProgress.State.FAILED, ExceptionsHelper.detailedMessage(e))); + updateIndexShardSnapshotStatus(entry.getKey(), shardId, new ShardSnapshotStatus(localNodeId, SnapshotsInProgress.State.FAILED, ExceptionsHelper.detailedMessage(e))); } } } @@ -383,23 +405,23 @@ public class SnapshotShardsService extends AbstractLifecycleComponent implements if (snapshot.state() == SnapshotsInProgress.State.STARTED || snapshot.state() == SnapshotsInProgress.State.ABORTED) { Map localShards = currentSnapshotShards(snapshot.snapshot()); if (localShards != null) { - ImmutableOpenMap masterShards = snapshot.shards(); + ImmutableOpenMap masterShards = snapshot.shards(); for(Map.Entry localShard : localShards.entrySet()) { ShardId shardId = localShard.getKey(); IndexShardSnapshotStatus localShardStatus = localShard.getValue(); - SnapshotsInProgress.ShardSnapshotStatus masterShard = masterShards.get(shardId); + ShardSnapshotStatus masterShard = masterShards.get(shardId); if (masterShard != null && masterShard.state().completed() == false) { // Master knows about the shard and thinks it has not completed - if (localShardStatus.stage() == IndexShardSnapshotStatus.Stage.DONE) { + if (localShardStatus.stage() == Stage.DONE) { // but we think the shard is done - we need to make new master know that the shard is done logger.debug("[{}] new master thinks the shard [{}] is not completed but the shard is done locally, updating status on the master", snapshot.snapshot(), shardId); updateIndexShardSnapshotStatus(snapshot.snapshot(), shardId, - new SnapshotsInProgress.ShardSnapshotStatus(event.state().nodes().getLocalNodeId(), SnapshotsInProgress.State.SUCCESS)); - } else if (localShard.getValue().stage() == IndexShardSnapshotStatus.Stage.FAILURE) { + new ShardSnapshotStatus(event.state().nodes().getLocalNodeId(), SnapshotsInProgress.State.SUCCESS)); + } else if (localShard.getValue().stage() == Stage.FAILURE) { // but we think the shard failed - we need to make new master know that the shard failed logger.debug("[{}] new master thinks the shard [{}] is not completed but the shard failed locally, updating status on master", snapshot.snapshot(), shardId); updateIndexShardSnapshotStatus(snapshot.snapshot(), shardId, - new SnapshotsInProgress.ShardSnapshotStatus(event.state().nodes().getLocalNodeId(), SnapshotsInProgress.State.FAILED, localShardStatus.failure())); + new ShardSnapshotStatus(event.state().nodes().getLocalNodeId(), SnapshotsInProgress.State.FAILED, localShardStatus.failure())); } } @@ -427,7 +449,7 @@ public class SnapshotShardsService extends AbstractLifecycleComponent implements public static class UpdateIndexShardSnapshotStatusRequest extends TransportRequest { private Snapshot snapshot; private ShardId shardId; - private SnapshotsInProgress.ShardSnapshotStatus status; + private ShardSnapshotStatus status; private volatile boolean processed; // state field, no need to serialize @@ -435,7 +457,7 @@ public class SnapshotShardsService extends AbstractLifecycleComponent implements } - public UpdateIndexShardSnapshotStatusRequest(Snapshot snapshot, ShardId shardId, SnapshotsInProgress.ShardSnapshotStatus status) { + public UpdateIndexShardSnapshotStatusRequest(Snapshot snapshot, ShardId shardId, ShardSnapshotStatus status) { this.snapshot = snapshot; this.shardId = shardId; this.status = status; @@ -446,7 +468,7 @@ public class SnapshotShardsService extends AbstractLifecycleComponent implements super.readFrom(in); snapshot = new Snapshot(in); shardId = ShardId.readShardId(in); - status = SnapshotsInProgress.ShardSnapshotStatus.readShardSnapshotStatus(in); + status = new ShardSnapshotStatus(in); } @Override @@ -465,7 +487,7 @@ public class SnapshotShardsService extends AbstractLifecycleComponent implements return shardId; } - public SnapshotsInProgress.ShardSnapshotStatus status() { + public ShardSnapshotStatus status() { return status; } @@ -486,7 +508,7 @@ public class SnapshotShardsService extends AbstractLifecycleComponent implements /** * Updates the shard status */ - public void updateIndexShardSnapshotStatus(Snapshot snapshot, ShardId shardId, SnapshotsInProgress.ShardSnapshotStatus status) { + public void updateIndexShardSnapshotStatus(Snapshot snapshot, ShardId shardId, ShardSnapshotStatus status) { UpdateIndexShardSnapshotStatusRequest request = new UpdateIndexShardSnapshotStatusRequest(snapshot, shardId, status); try { if (clusterService.state().nodes().isLocalNodeElectedMaster()) { @@ -533,7 +555,7 @@ public class SnapshotShardsService extends AbstractLifecycleComponent implements int changedCount = 0; final List entries = new ArrayList<>(); for (SnapshotsInProgress.Entry entry : snapshots.entries()) { - ImmutableOpenMap.Builder shards = ImmutableOpenMap.builder(); + ImmutableOpenMap.Builder shards = ImmutableOpenMap.builder(); boolean updated = false; for (int i = 0; i < batchSize; i++) { diff --git a/core/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java b/core/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java index ea8deea5661..f5ef5f111df 100644 --- a/core/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java +++ b/core/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java @@ -793,12 +793,12 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus } private boolean removedNodesCleanupNeeded(ClusterChangedEvent event) { - // Check if we just became the master - boolean newMaster = !event.previousState().nodes().isLocalNodeElectedMaster(); SnapshotsInProgress snapshotsInProgress = event.state().custom(SnapshotsInProgress.TYPE); if (snapshotsInProgress == null) { return false; } + // Check if we just became the master + boolean newMaster = !event.previousState().nodes().isLocalNodeElectedMaster(); for (SnapshotsInProgress.Entry snapshot : snapshotsInProgress.entries()) { if (newMaster && (snapshot.state() == State.SUCCESS || snapshot.state() == State.INIT)) { // We just replaced old master and snapshots in intermediate states needs to be cleaned diff --git a/core/src/test/java/org/elasticsearch/indices/cluster/IndicesClusterStateServiceRandomUpdatesTests.java b/core/src/test/java/org/elasticsearch/indices/cluster/IndicesClusterStateServiceRandomUpdatesTests.java index 7a4753725ad..67a82d93c54 100644 --- a/core/src/test/java/org/elasticsearch/indices/cluster/IndicesClusterStateServiceRandomUpdatesTests.java +++ b/core/src/test/java/org/elasticsearch/indices/cluster/IndicesClusterStateServiceRandomUpdatesTests.java @@ -376,7 +376,7 @@ public class IndicesClusterStateServiceRandomUpdatesTests extends AbstractIndice transportService, null, clusterService); final ShardStateAction shardStateAction = mock(ShardStateAction.class); return new IndicesClusterStateService(settings, indicesService, clusterService, - threadPool, recoveryTargetService, shardStateAction, null, repositoriesService, null, null, null, null); + threadPool, recoveryTargetService, shardStateAction, null, repositoriesService, null, null, null, null, null); } private class RecordingIndicesService extends MockIndicesService { diff --git a/core/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java b/core/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java index bf69f6016de..b7ce99e6ea3 100644 --- a/core/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java +++ b/core/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java @@ -51,6 +51,7 @@ import org.elasticsearch.cluster.block.ClusterBlocks; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MappingMetaData; import org.elasticsearch.cluster.metadata.MetaDataIndexStateService; +import org.elasticsearch.cluster.routing.IndexRoutingTable; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Priority; import org.elasticsearch.common.collect.ImmutableOpenMap; @@ -58,8 +59,10 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.XContentFactory; +import org.elasticsearch.index.IndexService; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.store.IndexStore; +import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.InvalidIndexNameException; import org.elasticsearch.repositories.IndexId; import org.elasticsearch.repositories.RepositoriesService; @@ -2490,4 +2493,66 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas waitForCompletion(repositoryName, inProgressSnapshot, TimeValue.timeValueSeconds(60)); } + /** + * This test ensures that when a shard is removed from a node (perhaps due to the node + * leaving the cluster, then returning), all snapshotting of that shard is aborted, so + * all Store references held onto by the snapshot are released. + * + * See https://github.com/elastic/elasticsearch/issues/20876 + */ + public void testSnapshotCanceledOnRemovedShard() throws Exception { + final int numPrimaries = 1; + final int numReplicas = 1; + final int numDocs = 100; + final String repo = "test-repo"; + final String index = "test-idx"; + final String snapshot = "test-snap"; + + assertAcked(prepareCreate(index, 1, + Settings.builder().put("number_of_shards", numPrimaries).put("number_of_replicas", numReplicas))); + + logger.info("--> indexing some data"); + for (int i = 0; i < numDocs; i++) { + index(index, "doc", Integer.toString(i), "foo", "bar" + i); + } + refresh(); + + logger.info("--> creating repository"); + PutRepositoryResponse putRepositoryResponse = + client().admin().cluster().preparePutRepository(repo).setType("mock").setSettings(Settings.builder() + .put("location", randomRepoPath()) + .put("random", randomAsciiOfLength(10)) + .put("wait_after_unblock", 200) + ).get(); + assertTrue(putRepositoryResponse.isAcknowledged()); + + String blockedNode = blockNodeWithIndex(repo, index); + + logger.info("--> snapshot"); + client().admin().cluster().prepareCreateSnapshot(repo, snapshot) + .setWaitForCompletion(false) + .execute(); + + logger.info("--> waiting for block to kick in on node [{}]", blockedNode); + waitForBlock(blockedNode, repo, TimeValue.timeValueSeconds(10)); + + logger.info("--> removing primary shard that is being snapshotted"); + ClusterState clusterState = internalCluster().clusterService(internalCluster().getMasterName()).state(); + IndexRoutingTable indexRoutingTable = clusterState.getRoutingTable().index(index); + String nodeWithPrimary = clusterState.nodes().get(indexRoutingTable.shard(0).primaryShard().currentNodeId()).getName(); + assertNotNull("should be at least one node with a primary shard", nodeWithPrimary); + IndicesService indicesService = internalCluster().getInstance(IndicesService.class, nodeWithPrimary); + IndexService indexService = indicesService.indexService(resolveIndex(index)); + indexService.removeShard(0, "simulate node removal"); + + logger.info("--> unblocking blocked node [{}]", blockedNode); + unblockNode(repo, blockedNode); + + logger.info("--> ensuring snapshot is aborted and the aborted shard was marked as failed"); + SnapshotInfo snapshotInfo = waitForCompletion(repo, snapshot, TimeValue.timeValueSeconds(10)); + assertEquals(1, snapshotInfo.shardFailures().size()); + assertEquals(0, snapshotInfo.shardFailures().get(0).shardId()); + assertEquals("IndexShardSnapshotFailedException[Aborted]", snapshotInfo.shardFailures().get(0).reason()); + } + }