Cleanup Old index-N Blobs in Repository Cleanup (#49862) (#49902)

* Cleanup Old index-N Blobs in Repository Cleanup

Repository cleanup didn't deal with old index-N, this change adds
cleaning up all old index-N found in the repository.
This commit is contained in:
Armin Braun 2019-12-09 12:05:55 +01:00 committed by GitHub
parent e4f838e764
commit 62e128f02d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 43 additions and 0 deletions

View File

@ -736,6 +736,9 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
return false; return false;
} }
return allSnapshotIds.contains(foundUUID) == false; return allSnapshotIds.contains(foundUUID) == false;
} else if (blob.startsWith(INDEX_FILE_PREFIX)) {
// TODO: Include the current generation here once we remove keeping index-(N-1) around from #writeIndexGen
return repositoryData.getGenId() > Long.parseLong(blob.substring(INDEX_FILE_PREFIX.length()));
} }
return false; return false;
} }

View File

@ -19,6 +19,7 @@
package org.elasticsearch.repositories.blobstore; package org.elasticsearch.repositories.blobstore;
import org.elasticsearch.action.ActionRunnable; import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.cluster.RepositoryCleanupInProgress; import org.elasticsearch.cluster.RepositoryCleanupInProgress;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
@ -26,13 +27,16 @@ import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.repositories.RepositoriesService; import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.snapshots.AbstractSnapshotIntegTestCase; import org.elasticsearch.snapshots.AbstractSnapshotIntegTestCase;
import org.elasticsearch.snapshots.SnapshotState;
import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.ESIntegTestCase;
import java.io.ByteArrayInputStream; import java.io.ByteArrayInputStream;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertThrows; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertThrows;
import static org.hamcrest.Matchers.is;
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0) @ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0)
public class BlobStoreRepositoryCleanupIT extends AbstractSnapshotIntegTestCase { public class BlobStoreRepositoryCleanupIT extends AbstractSnapshotIntegTestCase {
@ -107,4 +111,40 @@ public class BlobStoreRepositoryCleanupIT extends AbstractSnapshotIntegTestCase
waitForBlock(masterNode, repoName, TimeValue.timeValueSeconds(60)); waitForBlock(masterNode, repoName, TimeValue.timeValueSeconds(60));
return masterNode; return masterNode;
} }
public void testCleanupOldIndexN() throws ExecutionException, InterruptedException {
internalCluster().startNodes(Settings.EMPTY);
final String repoName = "test-repo";
logger.info("--> creating repository");
assertAcked(client().admin().cluster().preparePutRepository(repoName).setType("fs").setSettings(Settings.builder()
.put("location", randomRepoPath())
.put("compress", randomBoolean())
.put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES)));
logger.info("--> create three snapshots");
for (int i = 0; i < 3; ++i) {
CreateSnapshotResponse createSnapshotResponse = client().admin().cluster().prepareCreateSnapshot(repoName, "test-snap-" + i)
.setWaitForCompletion(true).get();
assertThat(createSnapshotResponse.getSnapshotInfo().state(), is(SnapshotState.SUCCESS));
}
final RepositoriesService service = internalCluster().getInstance(RepositoriesService.class, internalCluster().getMasterName());
final BlobStoreRepository repository = (BlobStoreRepository) service.repository(repoName);
logger.info("--> write two outdated index-N blobs");
for (int i = 0; i < 2; ++i) {
final PlainActionFuture<Void> createOldIndexNFuture = PlainActionFuture.newFuture();
final int generation = i;
repository.threadPool().generic().execute(ActionRunnable.run(createOldIndexNFuture, () -> repository.blobStore()
.blobContainer(repository.basePath()).writeBlob(BlobStoreRepository.INDEX_FILE_PREFIX + generation,
new ByteArrayInputStream(new byte[1]), 1, true)));
createOldIndexNFuture.get();
}
logger.info("--> cleanup repository");
client().admin().cluster().prepareCleanupRepository(repoName).get();
BlobStoreTestUtil.assertConsistency(repository, repository.threadPool().generic());
}
} }