diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/cleanup/TransportCleanupRepositoryAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/cleanup/TransportCleanupRepositoryAction.java index e054ad266a5..34ff64bc533 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/cleanup/TransportCleanupRepositoryAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/cleanup/TransportCleanupRepositoryAction.java @@ -92,7 +92,7 @@ public final class TransportCleanupRepositoryAction extends TransportMasterNodeA clusterService.addStateApplier(event -> { if (event.localNodeMaster() && event.previousState().nodes().isLocalNodeElectedMaster() == false) { final RepositoryCleanupInProgress repositoryCleanupInProgress = event.state().custom(RepositoryCleanupInProgress.TYPE); - if (repositoryCleanupInProgress == null || repositoryCleanupInProgress.cleanupInProgress() == false) { + if (repositoryCleanupInProgress == null || repositoryCleanupInProgress.hasCleanupInProgress() == false) { return; } clusterService.submitStateUpdateTask("clean up repository cleanup task after master failover", @@ -121,7 +121,7 @@ public final class TransportCleanupRepositoryAction extends TransportMasterNodeA RepositoryCleanupInProgress cleanupInProgress = currentState.custom(RepositoryCleanupInProgress.TYPE); if (cleanupInProgress != null) { boolean changed = false; - if (cleanupInProgress.cleanupInProgress() == false) { + if (cleanupInProgress.hasCleanupInProgress()) { cleanupInProgress = new RepositoryCleanupInProgress(); changed = true; } @@ -171,10 +171,13 @@ public final class TransportCleanupRepositoryAction extends TransportMasterNodeA logger.info("Running cleanup operations on repository [{}][{}]", repositoryName, repositoryStateId); clusterService.submitStateUpdateTask("cleanup repository [" + repositoryName + "][" + repositoryStateId + ']', new ClusterStateUpdateTask() { + + private boolean startedCleanup = false; + @Override public ClusterState execute(ClusterState currentState) { final RepositoryCleanupInProgress repositoryCleanupInProgress = currentState.custom(RepositoryCleanupInProgress.TYPE); - if (repositoryCleanupInProgress != null && repositoryCleanupInProgress.cleanupInProgress() == false) { + if (repositoryCleanupInProgress != null && repositoryCleanupInProgress.hasCleanupInProgress()) { throw new IllegalStateException( "Cannot cleanup [" + repositoryName + "] - a repository cleanup is already in-progress in [" + repositoryCleanupInProgress + "]"); @@ -201,6 +204,7 @@ public final class TransportCleanupRepositoryAction extends TransportMasterNodeA @Override public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + startedCleanup = true; logger.debug("Initialized repository cleanup in cluster state for [{}][{}]", repositoryName, repositoryStateId); threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.wrap(listener, l -> blobStoreRepository.cleanup( @@ -217,6 +221,11 @@ public final class TransportCleanupRepositoryAction extends TransportMasterNodeA "Failed to finish repository cleanup operations on [{}][{}]", repositoryName, repositoryStateId), failure); } assert failure != null || result != null; + if (startedCleanup == false) { + logger.debug("No cleanup task to remove from cluster state because we failed to start one", failure); + listener.onFailure(failure); + return; + } clusterService.submitStateUpdateTask( "remove repository cleanup task [" + repositoryName + "][" + repositoryStateId + ']', new ClusterStateUpdateTask() { diff --git a/server/src/main/java/org/elasticsearch/cluster/RepositoryCleanupInProgress.java b/server/src/main/java/org/elasticsearch/cluster/RepositoryCleanupInProgress.java index e7c8e995dd6..2acdfff2fa4 100644 --- a/server/src/main/java/org/elasticsearch/cluster/RepositoryCleanupInProgress.java +++ b/server/src/main/java/org/elasticsearch/cluster/RepositoryCleanupInProgress.java @@ -26,6 +26,7 @@ import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.xcontent.XContentBuilder; import java.io.IOException; +import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -51,9 +52,13 @@ public final class RepositoryCleanupInProgress extends AbstractNamedDiffable entries() { + return new ArrayList<>(entries); } @Override @@ -106,6 +111,10 @@ public final class RepositoryCleanupInProgress extends AbstractNamedDiffable stopping master node"); + internalCluster().stopCurrentMasterNode(); + + logger.info("--> wait for cleanup to finish and disappear from cluster state"); + assertBusy(() -> { + RepositoryCleanupInProgress cleanupInProgress = + client().admin().cluster().prepareState().get().getState().custom(RepositoryCleanupInProgress.TYPE); + assertFalse(cleanupInProgress.hasCleanupInProgress()); + }, 30, TimeUnit.SECONDS); + } + + public void testRepeatCleanupsDontRemove() throws Exception { + final String masterNode = startBlockedCleanup("test-repo"); + + logger.info("--> sending another cleanup"); + assertThrows(client().admin().cluster().prepareCleanupRepository("test-repo").execute(), IllegalStateException.class); + + logger.info("--> ensure cleanup is still in progress"); + final RepositoryCleanupInProgress cleanup = + client().admin().cluster().prepareState().get().getState().custom(RepositoryCleanupInProgress.TYPE); + assertTrue(cleanup.hasCleanupInProgress()); + + logger.info("--> unblocking master node"); + unblockNode("test-repo", masterNode); + + logger.info("--> wait for cleanup to finish and disappear from cluster state"); + assertBusy(() -> { + RepositoryCleanupInProgress cleanupInProgress = + client().admin().cluster().prepareState().get().getState().custom(RepositoryCleanupInProgress.TYPE); + assertFalse(cleanupInProgress.hasCleanupInProgress()); + }, 30, TimeUnit.SECONDS); + } + + private String startBlockedCleanup(String repoName) throws Exception { + logger.info("--> starting two master nodes and one data node"); + internalCluster().startMasterOnlyNodes(2); + internalCluster().startDataOnlyNodes(1); + + logger.info("--> creating repository"); + assertAcked(client().admin().cluster().preparePutRepository(repoName) + .setType("mock").setSettings(Settings.builder() + .put("location", randomRepoPath()) + .put("compress", randomBoolean()) + .put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES))); + + logger.info("--> snapshot"); + client().admin().cluster().prepareCreateSnapshot(repoName, "test-snap") + .setWaitForCompletion(true).get(); + + final RepositoriesService service = internalCluster().getInstance(RepositoriesService.class, internalCluster().getMasterName()); + final BlobStoreRepository repository = (BlobStoreRepository) service.repository(repoName); + + logger.info("--> creating a garbage data blob"); + final PlainActionFuture garbageFuture = PlainActionFuture.newFuture(); + repository.threadPool().generic().execute(ActionRunnable.run(garbageFuture, () -> repository.blobStore() + .blobContainer(repository.basePath()).writeBlob("snap-foo.dat", new ByteArrayInputStream(new byte[1]), 1, true))); + garbageFuture.get(); + + final String masterNode = blockMasterFromFinalizingSnapshotOnIndexFile(repoName); + + logger.info("--> starting repository cleanup"); + client().admin().cluster().prepareCleanupRepository(repoName).execute(); + + logger.info("--> waiting for block to kick in on " + masterNode); + waitForBlock(masterNode, repoName, TimeValue.timeValueSeconds(60)); + return masterNode; + } +} diff --git a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/SnapshotRetentionTask.java b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/SnapshotRetentionTask.java index f600f13dc16..d573a7a17e3 100644 --- a/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/SnapshotRetentionTask.java +++ b/x-pack/plugin/ilm/src/main/java/org/elasticsearch/xpack/slm/SnapshotRetentionTask.java @@ -458,7 +458,7 @@ public class SnapshotRetentionTask implements SchedulerEngine.Listener { // Cannot delete while a repository is being cleaned final RepositoryCleanupInProgress repositoryCleanupInProgress = state.custom(RepositoryCleanupInProgress.TYPE); - if (repositoryCleanupInProgress != null && repositoryCleanupInProgress.cleanupInProgress() == false) { + if (repositoryCleanupInProgress != null && repositoryCleanupInProgress.hasCleanupInProgress()) { return false; }