Changes the behavior of the recursive deletion function `executeOneStaleIndexDelete()` stop condition to be when the queue of `staleIndicesToDelete` is empty -- also in the error flow. Otherwise the GroupedActionListener never responds and in the event of a few exceptions the deletion task gets stuck. Alters the test case to fail to delete in bulk many snapshots at the first attempt, and then the next successful deletion also takes care of the previously failed attempt as the test originally intended. SNAPSHOT threadpool is at most 5. So in the event we get more than 5 exceptions there are no more threads to handle the deletion task and there is still one more snapshot to delete in the queue. Thus, in the test I made the number of extra snapshots be one more than the max in the SNAPSHOT threadpool. Signed-off-by: AmiStrn <amitai.stern@logz.io>
This commit is contained in:
parent
b1b563dc39
commit
f4f5374592
|
@ -48,6 +48,7 @@ import org.opensearch.repositories.RepositoryException;
|
|||
import org.opensearch.repositories.RepositoryVerificationException;
|
||||
import org.opensearch.snapshots.mockstore.MockRepository;
|
||||
import org.opensearch.test.OpenSearchIntegTestCase;
|
||||
import org.opensearch.threadpool.ThreadPool;
|
||||
|
||||
import java.nio.file.Path;
|
||||
import java.util.List;
|
||||
|
@ -130,35 +131,42 @@ public class RepositoriesIT extends AbstractSnapshotIntegTestCase {
|
|||
Client client = client();
|
||||
Path repositoryPath = randomRepoPath();
|
||||
final String repositoryName = "test-repo";
|
||||
final String snapshot1Name = "test-snap-1";
|
||||
final String snapshot2Name = "test-snap-2";
|
||||
final String snapshotToBeDeletedLastName = "test-snapshot-to-be-deleted-last";
|
||||
final String bulkSnapshotsPattern = "test-snap-*";
|
||||
|
||||
logger.info("--> creating repository at {}", repositoryPath.toAbsolutePath());
|
||||
createRepository(repositoryName, "mock", repositoryPath);
|
||||
|
||||
int numberOfFiles = numberOfFiles(repositoryPath);
|
||||
|
||||
logger.info("--> creating index-1 and ingest data");
|
||||
createIndex("test-idx-1");
|
||||
logger.info("--> creating index-0 and ingest data");
|
||||
createIndex("test-idx-0");
|
||||
ensureGreen();
|
||||
for (int j = 0; j < 10; j++) {
|
||||
index("test-idx-1", "_doc", Integer.toString( 10 + j), "foo", "bar" + 10 + j);
|
||||
index("test-idx-0", "_doc", Integer.toString( 10 + j), "foo", "bar" + 10 + j);
|
||||
}
|
||||
refresh();
|
||||
|
||||
logger.info("--> creating first snapshot");
|
||||
createFullSnapshot(repositoryName, snapshot1Name);
|
||||
createFullSnapshot(repositoryName, snapshotToBeDeletedLastName);
|
||||
|
||||
logger.info("--> creating index-2 and ingest data");
|
||||
createIndex("test-idx-2");
|
||||
ensureGreen();
|
||||
for (int j = 0; j < 10; j++) {
|
||||
index("test-idx-2", "_doc", Integer.toString( 10 + j), "foo", "bar" + 10 + j);
|
||||
// Create more snapshots to be deleted in bulk
|
||||
int maxThreadsForSnapshotDeletion = internalCluster().getMasterNodeInstance(ThreadPool.class)
|
||||
.info(ThreadPool.Names.SNAPSHOT).getMax();
|
||||
for (int i = 1; i<= maxThreadsForSnapshotDeletion + 1; i++) {
|
||||
String snapshotName = "test-snap-" + i;
|
||||
String testIndexName = "test-idx-" + i;
|
||||
logger.info("--> creating index-" + i + " and ingest data");
|
||||
createIndex(testIndexName);
|
||||
ensureGreen();
|
||||
for (int j = 0; j < 10; j++) {
|
||||
index(testIndexName, "_doc", Integer.toString( 10 + j), "foo", "bar" + 10 + j);
|
||||
}
|
||||
refresh();
|
||||
|
||||
logger.info("--> creating snapshot: {}", snapshotName);
|
||||
createFullSnapshot(repositoryName, snapshotName);
|
||||
}
|
||||
refresh();
|
||||
|
||||
logger.info("--> creating second snapshot");
|
||||
createFullSnapshot(repositoryName, snapshot2Name);
|
||||
|
||||
// Make repository to throw exception when trying to delete stale indices
|
||||
// This will make sure stale indices stays in repository after snapshot delete
|
||||
|
@ -166,16 +174,16 @@ public class RepositoriesIT extends AbstractSnapshotIntegTestCase {
|
|||
((MockRepository)internalCluster().getInstance(RepositoriesService.class, masterNode).repository("test-repo")).
|
||||
setThrowExceptionWhileDelete(true);
|
||||
|
||||
logger.info("--> delete the second snapshot");
|
||||
client.admin().cluster().prepareDeleteSnapshot(repositoryName, snapshot2Name).get();
|
||||
logger.info("--> delete the bulk of the snapshots");
|
||||
client.admin().cluster().prepareDeleteSnapshot(repositoryName, bulkSnapshotsPattern).get();
|
||||
|
||||
// Make repository to work normally
|
||||
((MockRepository)internalCluster().getInstance(RepositoriesService.class, masterNode).repository("test-repo")).
|
||||
setThrowExceptionWhileDelete(false);
|
||||
|
||||
// This snapshot should delete last snapshot's residual stale indices as well
|
||||
logger.info("--> delete snapshot one");
|
||||
client.admin().cluster().prepareDeleteSnapshot(repositoryName, snapshot1Name).get();
|
||||
logger.info("--> delete first snapshot");
|
||||
client.admin().cluster().prepareDeleteSnapshot(repositoryName, snapshotToBeDeletedLastName).get();
|
||||
|
||||
logger.info("--> make sure that number of files is back to what it was when the first snapshot was made");
|
||||
assertFileCount(repositoryPath, numberOfFiles + 2);
|
||||
|
|
|
@ -1088,27 +1088,26 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
|
|||
|
||||
private void executeOneStaleIndexDelete(BlockingQueue<Map.Entry<String, BlobContainer>> staleIndicesToDelete,
|
||||
GroupedActionListener<DeleteResult> listener) throws InterruptedException {
|
||||
Map.Entry<String, BlobContainer> indexEntry = staleIndicesToDelete.poll(0L, TimeUnit.MILLISECONDS);
|
||||
if (indexEntry == null) {
|
||||
return;
|
||||
} else {
|
||||
Map.Entry<String, BlobContainer> indexEntry = staleIndicesToDelete.poll(0L, TimeUnit.MILLISECONDS);
|
||||
if (indexEntry != null) {
|
||||
final String indexSnId = indexEntry.getKey();
|
||||
threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.supply(listener, () -> {
|
||||
DeleteResult deleteResult = DeleteResult.ZERO;
|
||||
try {
|
||||
DeleteResult staleIndexDeleteResult = indexEntry.getValue().delete();
|
||||
logger.debug("[{}] Found stale index [{}]. Cleaning it up", metadata.name(), indexSnId);
|
||||
deleteResult = indexEntry.getValue().delete();
|
||||
logger.debug("[{}] Cleaned up stale index [{}]", metadata.name(), indexSnId);
|
||||
executeOneStaleIndexDelete(staleIndicesToDelete, listener);
|
||||
return staleIndexDeleteResult;
|
||||
} catch (IOException e) {
|
||||
logger.warn(() -> new ParameterizedMessage(
|
||||
"[{}] index {} is no longer part of any snapshots in the repository, " +
|
||||
"but failed to clean up their index folders", metadata.name(), indexSnId), e);
|
||||
return DeleteResult.ZERO;
|
||||
} catch (Exception e) {
|
||||
assert false : e;
|
||||
logger.warn(new ParameterizedMessage("[{}] Exception during single stale index delete", metadata.name()), e);
|
||||
return DeleteResult.ZERO;
|
||||
}
|
||||
|
||||
executeOneStaleIndexDelete(staleIndicesToDelete, listener);
|
||||
return deleteResult;
|
||||
}));
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue