Fix snapshot deletion task getting stuck in the event of exceptions (#629)

Changes the behavior of the recursive deletion function `executeOneStaleIndexDelete()` stop
condition to be when the queue of `staleIndicesToDelete` is empty -- also in the error flow.
Otherwise the GroupedActionListener never responds and in the event of a few exceptions the
deletion task gets stuck.
Alters the test case to fail to delete in bulk many snapshots at the first attempt, and then
the next successful deletion also takes care of the previously failed attempt as the test
originally intended.
SNAPSHOT threadpool is at most 5. So in the event we get more than 5 exceptions there are no
more threads to handle the deletion task and there is still one more snapshot to delete in the
queue. Thus, in the test I made the number of extra snapshots be one more than the max in the
SNAPSHOT threadpool.

Signed-off-by: AmiStrn <amitai.stern@logz.io>
This commit is contained in:
amitai stern 2021-05-04 16:44:15 +03:00 committed by GitHub
parent b4dad22348
commit ca9fb6d302
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 35 additions and 28 deletions

View File

@ -48,6 +48,7 @@ import org.opensearch.repositories.RepositoryException;
import org.opensearch.repositories.RepositoryVerificationException;
import org.opensearch.snapshots.mockstore.MockRepository;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.threadpool.ThreadPool;
import java.nio.file.Path;
import java.util.List;
@ -130,35 +131,42 @@ public class RepositoriesIT extends AbstractSnapshotIntegTestCase {
Client client = client();
Path repositoryPath = randomRepoPath();
final String repositoryName = "test-repo";
final String snapshot1Name = "test-snap-1";
final String snapshot2Name = "test-snap-2";
final String snapshotToBeDeletedLastName = "test-snapshot-to-be-deleted-last";
final String bulkSnapshotsPattern = "test-snap-*";
logger.info("--> creating repository at {}", repositoryPath.toAbsolutePath());
createRepository(repositoryName, "mock", repositoryPath);
int numberOfFiles = numberOfFiles(repositoryPath);
logger.info("--> creating index-1 and ingest data");
createIndex("test-idx-1");
logger.info("--> creating index-0 and ingest data");
createIndex("test-idx-0");
ensureGreen();
for (int j = 0; j < 10; j++) {
index("test-idx-1", "_doc", Integer.toString( 10 + j), "foo", "bar" + 10 + j);
index("test-idx-0", "_doc", Integer.toString( 10 + j), "foo", "bar" + 10 + j);
}
refresh();
logger.info("--> creating first snapshot");
createFullSnapshot(repositoryName, snapshot1Name);
createFullSnapshot(repositoryName, snapshotToBeDeletedLastName);
logger.info("--> creating index-2 and ingest data");
createIndex("test-idx-2");
ensureGreen();
for (int j = 0; j < 10; j++) {
index("test-idx-2", "_doc", Integer.toString( 10 + j), "foo", "bar" + 10 + j);
// Create more snapshots to be deleted in bulk
int maxThreadsForSnapshotDeletion = internalCluster().getMasterNodeInstance(ThreadPool.class)
.info(ThreadPool.Names.SNAPSHOT).getMax();
for (int i = 1; i<= maxThreadsForSnapshotDeletion + 1; i++) {
String snapshotName = "test-snap-" + i;
String testIndexName = "test-idx-" + i;
logger.info("--> creating index-" + i + " and ingest data");
createIndex(testIndexName);
ensureGreen();
for (int j = 0; j < 10; j++) {
index(testIndexName, "_doc", Integer.toString( 10 + j), "foo", "bar" + 10 + j);
}
refresh();
logger.info("--> creating snapshot: {}", snapshotName);
createFullSnapshot(repositoryName, snapshotName);
}
refresh();
logger.info("--> creating second snapshot");
createFullSnapshot(repositoryName, snapshot2Name);
// Make repository to throw exception when trying to delete stale indices
// This will make sure stale indices stays in repository after snapshot delete
@ -166,16 +174,16 @@ public class RepositoriesIT extends AbstractSnapshotIntegTestCase {
((MockRepository)internalCluster().getInstance(RepositoriesService.class, masterNode).repository("test-repo")).
setThrowExceptionWhileDelete(true);
logger.info("--> delete the second snapshot");
client.admin().cluster().prepareDeleteSnapshot(repositoryName, snapshot2Name).get();
logger.info("--> delete the bulk of the snapshots");
client.admin().cluster().prepareDeleteSnapshot(repositoryName, bulkSnapshotsPattern).get();
// Make repository to work normally
((MockRepository)internalCluster().getInstance(RepositoriesService.class, masterNode).repository("test-repo")).
setThrowExceptionWhileDelete(false);
// This snapshot should delete last snapshot's residual stale indices as well
logger.info("--> delete snapshot one");
client.admin().cluster().prepareDeleteSnapshot(repositoryName, snapshot1Name).get();
logger.info("--> delete first snapshot");
client.admin().cluster().prepareDeleteSnapshot(repositoryName, snapshotToBeDeletedLastName).get();
logger.info("--> make sure that number of files is back to what it was when the first snapshot was made");
assertFileCount(repositoryPath, numberOfFiles + 2);

View File

@ -1088,27 +1088,26 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
private void executeOneStaleIndexDelete(BlockingQueue<Map.Entry<String, BlobContainer>> staleIndicesToDelete,
GroupedActionListener<DeleteResult> listener) throws InterruptedException {
Map.Entry<String, BlobContainer> indexEntry = staleIndicesToDelete.poll(0L, TimeUnit.MILLISECONDS);
if (indexEntry == null) {
return;
} else {
Map.Entry<String, BlobContainer> indexEntry = staleIndicesToDelete.poll(0L, TimeUnit.MILLISECONDS);
if (indexEntry != null) {
final String indexSnId = indexEntry.getKey();
threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.supply(listener, () -> {
DeleteResult deleteResult = DeleteResult.ZERO;
try {
DeleteResult staleIndexDeleteResult = indexEntry.getValue().delete();
logger.debug("[{}] Found stale index [{}]. Cleaning it up", metadata.name(), indexSnId);
deleteResult = indexEntry.getValue().delete();
logger.debug("[{}] Cleaned up stale index [{}]", metadata.name(), indexSnId);
executeOneStaleIndexDelete(staleIndicesToDelete, listener);
return staleIndexDeleteResult;
} catch (IOException e) {
logger.warn(() -> new ParameterizedMessage(
"[{}] index {} is no longer part of any snapshots in the repository, " +
"but failed to clean up their index folders", metadata.name(), indexSnId), e);
return DeleteResult.ZERO;
} catch (Exception e) {
assert false : e;
logger.warn(new ParameterizedMessage("[{}] Exception during single stale index delete", metadata.name()), e);
return DeleteResult.ZERO;
}
executeOneStaleIndexDelete(staleIndicesToDelete, listener);
return deleteResult;
}));
}
}