mirror of
https://github.com/honeymoose/OpenSearch.git
synced 2025-02-17 02:14:54 +00:00
Speedup snapshot stale indices delete (#613)
Instead of the snapshot delete of stale indices being a single-threaded operation, this commit makes it a multithreaded operation that deletes multiple stale indices in parallel using the SNAPSHOT threadpool's workers. Signed-off-by: Piyush Daftary <piyush.besu@gmail.com>
This commit is contained in:
parent
0e9f74e35f
commit
7ccb714045
@ -43,8 +43,10 @@ import org.opensearch.cluster.metadata.RepositoryMetadata;
|
||||
import org.opensearch.common.io.FileSystemUtils;
|
||||
import org.opensearch.common.settings.Settings;
|
||||
import org.opensearch.common.unit.ByteSizeUnit;
|
||||
import org.opensearch.repositories.RepositoriesService;
|
||||
import org.opensearch.repositories.RepositoryException;
|
||||
import org.opensearch.repositories.RepositoryVerificationException;
|
||||
import org.opensearch.snapshots.mockstore.MockRepository;
|
||||
import org.opensearch.test.OpenSearchIntegTestCase;
|
||||
|
||||
import java.nio.file.Path;
|
||||
@ -124,6 +126,63 @@ public class RepositoriesIT extends AbstractSnapshotIntegTestCase {
|
||||
assertThat(repositoriesResponse.repositories().size(), equalTo(0));
|
||||
}
|
||||
|
||||
public void testResidualStaleIndicesAreDeletedByConsecutiveDelete() throws Exception {
|
||||
Client client = client();
|
||||
Path repositoryPath = randomRepoPath();
|
||||
final String repositoryName = "test-repo";
|
||||
final String snapshot1Name = "test-snap-1";
|
||||
final String snapshot2Name = "test-snap-2";
|
||||
|
||||
logger.info("--> creating repository at {}", repositoryPath.toAbsolutePath());
|
||||
createRepository(repositoryName, "mock", repositoryPath);
|
||||
|
||||
int numberOfFiles = numberOfFiles(repositoryPath);
|
||||
|
||||
logger.info("--> creating index-1 and ingest data");
|
||||
createIndex("test-idx-1");
|
||||
ensureGreen();
|
||||
for (int j = 0; j < 10; j++) {
|
||||
index("test-idx-1", "_doc", Integer.toString( 10 + j), "foo", "bar" + 10 + j);
|
||||
}
|
||||
refresh();
|
||||
|
||||
logger.info("--> creating first snapshot");
|
||||
createFullSnapshot(repositoryName, snapshot1Name);
|
||||
|
||||
logger.info("--> creating index-2 and ingest data");
|
||||
createIndex("test-idx-2");
|
||||
ensureGreen();
|
||||
for (int j = 0; j < 10; j++) {
|
||||
index("test-idx-2", "_doc", Integer.toString( 10 + j), "foo", "bar" + 10 + j);
|
||||
}
|
||||
refresh();
|
||||
|
||||
logger.info("--> creating second snapshot");
|
||||
createFullSnapshot(repositoryName, snapshot2Name);
|
||||
|
||||
// Make repository to throw exception when trying to delete stale indices
|
||||
// This will make sure stale indices stays in repository after snapshot delete
|
||||
String masterNode = internalCluster().getMasterName();
|
||||
((MockRepository)internalCluster().getInstance(RepositoriesService.class, masterNode).repository("test-repo")).
|
||||
setThrowExceptionWhileDelete(true);
|
||||
|
||||
logger.info("--> delete the second snapshot");
|
||||
client.admin().cluster().prepareDeleteSnapshot(repositoryName, snapshot2Name).get();
|
||||
|
||||
// Make repository to work normally
|
||||
((MockRepository)internalCluster().getInstance(RepositoriesService.class, masterNode).repository("test-repo")).
|
||||
setThrowExceptionWhileDelete(false);
|
||||
|
||||
// This snapshot should delete last snapshot's residual stale indices as well
|
||||
logger.info("--> delete snapshot one");
|
||||
client.admin().cluster().prepareDeleteSnapshot(repositoryName, snapshot1Name).get();
|
||||
|
||||
logger.info("--> make sure that number of files is back to what it was when the first snapshot was made");
|
||||
assertFileCount(repositoryPath, numberOfFiles + 2);
|
||||
|
||||
logger.info("--> done");
|
||||
}
|
||||
|
||||
private RepositoryMetadata findRepository(List<RepositoryMetadata> repositories, String name) {
|
||||
for (RepositoryMetadata repository : repositories) {
|
||||
if (repository.name().equals(name)) {
|
||||
|
@ -945,7 +945,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
|
||||
if (foundIndices.keySet().equals(survivingIndexIds)) {
|
||||
groupedListener.onResponse(DeleteResult.ZERO);
|
||||
} else {
|
||||
executor.execute(ActionRunnable.supply(groupedListener, () -> cleanupStaleIndices(foundIndices, survivingIndexIds)));
|
||||
cleanupStaleIndices(foundIndices, survivingIndexIds, groupedListener);
|
||||
}
|
||||
}
|
||||
|
||||
@ -1053,23 +1053,30 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
|
||||
return Collections.emptyList();
|
||||
}
|
||||
|
||||
private DeleteResult cleanupStaleIndices(Map<String, BlobContainer> foundIndices, Set<String> survivingIndexIds) {
|
||||
DeleteResult deleteResult = DeleteResult.ZERO;
|
||||
private void cleanupStaleIndices(Map<String, BlobContainer> foundIndices, Set<String> survivingIndexIds,
|
||||
GroupedActionListener<DeleteResult> listener) {
|
||||
final GroupedActionListener<DeleteResult> groupedListener = new GroupedActionListener<>(ActionListener.wrap(deleteResults -> {
|
||||
DeleteResult deleteResult = DeleteResult.ZERO;
|
||||
for (DeleteResult result : deleteResults) {
|
||||
deleteResult = deleteResult.add(result);
|
||||
}
|
||||
listener.onResponse(deleteResult);
|
||||
}, listener::onFailure), foundIndices.size() - survivingIndexIds.size());
|
||||
|
||||
try {
|
||||
final BlockingQueue<Map.Entry<String, BlobContainer>> staleIndicesToDelete = new LinkedBlockingQueue<>();
|
||||
for (Map.Entry<String, BlobContainer> indexEntry : foundIndices.entrySet()) {
|
||||
final String indexSnId = indexEntry.getKey();
|
||||
try {
|
||||
if (survivingIndexIds.contains(indexSnId) == false) {
|
||||
logger.debug("[{}] Found stale index [{}]. Cleaning it up", metadata.name(), indexSnId);
|
||||
deleteResult = deleteResult.add(indexEntry.getValue().delete());
|
||||
logger.debug("[{}] Cleaned up stale index [{}]", metadata.name(), indexSnId);
|
||||
}
|
||||
} catch (IOException e) {
|
||||
logger.warn(() -> new ParameterizedMessage(
|
||||
"[{}] index {} is no longer part of any snapshots in the repository, " +
|
||||
"but failed to clean up their index folders", metadata.name(), indexSnId), e);
|
||||
if (survivingIndexIds.contains(indexEntry.getKey()) == false) {
|
||||
staleIndicesToDelete.put(indexEntry);
|
||||
}
|
||||
}
|
||||
|
||||
// Start as many workers as fit into the snapshot pool at once at the most
|
||||
final int workers = Math.min(threadPool.info(ThreadPool.Names.SNAPSHOT).getMax(),
|
||||
foundIndices.size() - survivingIndexIds.size());
|
||||
for (int i = 0; i < workers; ++i) {
|
||||
executeOneStaleIndexDelete(staleIndicesToDelete, groupedListener);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
// TODO: We shouldn't be blanket catching and suppressing all exceptions here and instead handle them safely upstream.
|
||||
// Currently this catch exists as a stop gap solution to tackle unexpected runtime exceptions from implementations
|
||||
@ -1077,7 +1084,33 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
|
||||
assert false : e;
|
||||
logger.warn(new ParameterizedMessage("[{}] Exception during cleanup of stale indices", metadata.name()), e);
|
||||
}
|
||||
return deleteResult;
|
||||
}
|
||||
|
||||
private void executeOneStaleIndexDelete(BlockingQueue<Map.Entry<String, BlobContainer>> staleIndicesToDelete,
|
||||
GroupedActionListener<DeleteResult> listener) throws InterruptedException {
|
||||
Map.Entry<String, BlobContainer> indexEntry = staleIndicesToDelete.poll(0L, TimeUnit.MILLISECONDS);
|
||||
if (indexEntry == null) {
|
||||
return;
|
||||
} else {
|
||||
final String indexSnId = indexEntry.getKey();
|
||||
threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.supply(listener, () -> {
|
||||
try {
|
||||
DeleteResult staleIndexDeleteResult = indexEntry.getValue().delete();
|
||||
logger.debug("[{}] Cleaned up stale index [{}]", metadata.name(), indexSnId);
|
||||
executeOneStaleIndexDelete(staleIndicesToDelete, listener);
|
||||
return staleIndexDeleteResult;
|
||||
} catch (IOException e) {
|
||||
logger.warn(() -> new ParameterizedMessage(
|
||||
"[{}] index {} is no longer part of any snapshots in the repository, " +
|
||||
"but failed to clean up their index folders", metadata.name(), indexSnId), e);
|
||||
return DeleteResult.ZERO;
|
||||
} catch (Exception e) {
|
||||
assert false : e;
|
||||
logger.warn(new ParameterizedMessage("[{}] Exception during single stale index delete", metadata.name()), e);
|
||||
return DeleteResult.ZERO;
|
||||
}
|
||||
}));
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -154,6 +154,7 @@ public class MockRepository extends FsRepository {
|
||||
private volatile boolean throwReadErrorAfterUnblock = false;
|
||||
|
||||
private volatile boolean blocked = false;
|
||||
private volatile boolean setThrowExceptionWhileDelete;
|
||||
|
||||
public MockRepository(RepositoryMetadata metadata, Environment environment,
|
||||
NamedXContentRegistry namedXContentRegistry, ClusterService clusterService,
|
||||
@ -257,6 +258,10 @@ public class MockRepository extends FsRepository {
|
||||
this.failReadsAfterUnblock = failReadsAfterUnblock;
|
||||
}
|
||||
|
||||
public void setThrowExceptionWhileDelete(boolean throwError) {
|
||||
setThrowExceptionWhileDelete = throwError;
|
||||
}
|
||||
|
||||
public boolean blocked() {
|
||||
return blocked;
|
||||
}
|
||||
@ -425,6 +430,9 @@ public class MockRepository extends FsRepository {
|
||||
@Override
|
||||
public DeleteResult delete() throws IOException {
|
||||
DeleteResult deleteResult = DeleteResult.ZERO;
|
||||
if (setThrowExceptionWhileDelete) {
|
||||
throw new IOException("Random exception");
|
||||
}
|
||||
for (BlobContainer child : children().values()) {
|
||||
deleteResult = deleteResult.add(child.delete());
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user