From f4f5374592828292d0278dc2584ca270e5a3a18d Mon Sep 17 00:00:00 2001 From: amitai stern Date: Thu, 13 May 2021 17:02:57 +0300 Subject: [PATCH] Fix snapshot deletion task getting stuck in the event of exceptions (#629) (#650) Changes the stop condition of the recursive deletion function `executeOneStaleIndexDelete()` so that it stops only when the `staleIndicesToDelete` queue is empty -- in the error flow as well. Otherwise the GroupedActionListener never responds and, in the event of a few exceptions, the deletion task gets stuck. Alters the test case so that bulk deletion of many snapshots fails at the first attempt, and the next successful deletion also takes care of the previously failed attempt, as the test originally intended. The SNAPSHOT threadpool has at most 5 threads. So if we get more than 5 exceptions, there are no more threads to handle the deletion task while there is still one more snapshot to delete in the queue. Thus, in the test, I made the number of extra snapshots one more than the max of the SNAPSHOT threadpool. 
Signed-off-by: AmiStrn --- .../opensearch/snapshots/RepositoriesIT.java | 46 +++++++++++-------- .../blobstore/BlobStoreRepository.java | 17 ++++--- 2 files changed, 35 insertions(+), 28 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/snapshots/RepositoriesIT.java b/server/src/internalClusterTest/java/org/opensearch/snapshots/RepositoriesIT.java index 39fdf77d1ba..0226a1408a3 100644 --- a/server/src/internalClusterTest/java/org/opensearch/snapshots/RepositoriesIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/snapshots/RepositoriesIT.java @@ -48,6 +48,7 @@ import org.opensearch.repositories.RepositoryException; import org.opensearch.repositories.RepositoryVerificationException; import org.opensearch.snapshots.mockstore.MockRepository; import org.opensearch.test.OpenSearchIntegTestCase; +import org.opensearch.threadpool.ThreadPool; import java.nio.file.Path; import java.util.List; @@ -130,35 +131,42 @@ public class RepositoriesIT extends AbstractSnapshotIntegTestCase { Client client = client(); Path repositoryPath = randomRepoPath(); final String repositoryName = "test-repo"; - final String snapshot1Name = "test-snap-1"; - final String snapshot2Name = "test-snap-2"; + final String snapshotToBeDeletedLastName = "test-snapshot-to-be-deleted-last"; + final String bulkSnapshotsPattern = "test-snap-*"; logger.info("--> creating repository at {}", repositoryPath.toAbsolutePath()); createRepository(repositoryName, "mock", repositoryPath); int numberOfFiles = numberOfFiles(repositoryPath); - logger.info("--> creating index-1 and ingest data"); - createIndex("test-idx-1"); + logger.info("--> creating index-0 and ingest data"); + createIndex("test-idx-0"); ensureGreen(); for (int j = 0; j < 10; j++) { - index("test-idx-1", "_doc", Integer.toString( 10 + j), "foo", "bar" + 10 + j); + index("test-idx-0", "_doc", Integer.toString( 10 + j), "foo", "bar" + 10 + j); } refresh(); logger.info("--> creating first snapshot"); - 
createFullSnapshot(repositoryName, snapshot1Name); + createFullSnapshot(repositoryName, snapshotToBeDeletedLastName); - logger.info("--> creating index-2 and ingest data"); - createIndex("test-idx-2"); - ensureGreen(); - for (int j = 0; j < 10; j++) { - index("test-idx-2", "_doc", Integer.toString( 10 + j), "foo", "bar" + 10 + j); + // Create more snapshots to be deleted in bulk + int maxThreadsForSnapshotDeletion = internalCluster().getMasterNodeInstance(ThreadPool.class) + .info(ThreadPool.Names.SNAPSHOT).getMax(); + for (int i = 1; i<= maxThreadsForSnapshotDeletion + 1; i++) { + String snapshotName = "test-snap-" + i; + String testIndexName = "test-idx-" + i; + logger.info("--> creating index-" + i + " and ingest data"); + createIndex(testIndexName); + ensureGreen(); + for (int j = 0; j < 10; j++) { + index(testIndexName, "_doc", Integer.toString( 10 + j), "foo", "bar" + 10 + j); + } + refresh(); + + logger.info("--> creating snapshot: {}", snapshotName); + createFullSnapshot(repositoryName, snapshotName); } - refresh(); - - logger.info("--> creating second snapshot"); - createFullSnapshot(repositoryName, snapshot2Name); // Make repository to throw exception when trying to delete stale indices // This will make sure stale indices stays in repository after snapshot delete @@ -166,16 +174,16 @@ public class RepositoriesIT extends AbstractSnapshotIntegTestCase { ((MockRepository)internalCluster().getInstance(RepositoriesService.class, masterNode).repository("test-repo")). setThrowExceptionWhileDelete(true); - logger.info("--> delete the second snapshot"); - client.admin().cluster().prepareDeleteSnapshot(repositoryName, snapshot2Name).get(); + logger.info("--> delete the bulk of the snapshots"); + client.admin().cluster().prepareDeleteSnapshot(repositoryName, bulkSnapshotsPattern).get(); // Make repository to work normally ((MockRepository)internalCluster().getInstance(RepositoriesService.class, masterNode).repository("test-repo")). 
setThrowExceptionWhileDelete(false); // This snapshot should delete last snapshot's residual stale indices as well - logger.info("--> delete snapshot one"); - client.admin().cluster().prepareDeleteSnapshot(repositoryName, snapshot1Name).get(); + logger.info("--> delete first snapshot"); + client.admin().cluster().prepareDeleteSnapshot(repositoryName, snapshotToBeDeletedLastName).get(); logger.info("--> make sure that number of files is back to what it was when the first snapshot was made"); assertFileCount(repositoryPath, numberOfFiles + 2); diff --git a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java index 18162b103eb..275e9aab873 100644 --- a/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java @@ -1088,27 +1088,26 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp private void executeOneStaleIndexDelete(BlockingQueue> staleIndicesToDelete, GroupedActionListener listener) throws InterruptedException { - Map.Entry indexEntry = staleIndicesToDelete.poll(0L, TimeUnit.MILLISECONDS); - if (indexEntry == null) { - return; - } else { + Map.Entry indexEntry = staleIndicesToDelete.poll(0L, TimeUnit.MILLISECONDS); + if (indexEntry != null) { final String indexSnId = indexEntry.getKey(); threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.supply(listener, () -> { + DeleteResult deleteResult = DeleteResult.ZERO; try { - DeleteResult staleIndexDeleteResult = indexEntry.getValue().delete(); + logger.debug("[{}] Found stale index [{}]. 
Cleaning it up", metadata.name(), indexSnId); + deleteResult = indexEntry.getValue().delete(); logger.debug("[{}] Cleaned up stale index [{}]", metadata.name(), indexSnId); - executeOneStaleIndexDelete(staleIndicesToDelete, listener); - return staleIndexDeleteResult; } catch (IOException e) { logger.warn(() -> new ParameterizedMessage( "[{}] index {} is no longer part of any snapshots in the repository, " + "but failed to clean up their index folders", metadata.name(), indexSnId), e); - return DeleteResult.ZERO; } catch (Exception e) { assert false : e; logger.warn(new ParameterizedMessage("[{}] Exception during single stale index delete", metadata.name()), e); - return DeleteResult.ZERO; } + + executeOneStaleIndexDelete(staleIndicesToDelete, listener); + return deleteResult; })); } }