* Use ability to list child "folders" in the blob store to implement recursive delete on all stale index folders when cleaning up instead of using the diff between two `RepositoryData` instances to cover aborted deletes * Runs after every delete operation * Relates #13159 (fixing most of the issues caused by unreferenced indices, leaving some meta files to be cleaned up only)
This commit is contained in:
parent
247f2dabad
commit
af9b98e81c
|
@ -27,9 +27,13 @@ import org.elasticsearch.client.Client;
|
||||||
import org.elasticsearch.cluster.ClusterState;
|
import org.elasticsearch.cluster.ClusterState;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
import org.elasticsearch.plugins.Plugin;
|
import org.elasticsearch.plugins.Plugin;
|
||||||
|
import org.elasticsearch.repositories.RepositoriesService;
|
||||||
import org.elasticsearch.repositories.RepositoryException;
|
import org.elasticsearch.repositories.RepositoryException;
|
||||||
|
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
|
||||||
|
import org.elasticsearch.repositories.blobstore.BlobStoreTestUtil;
|
||||||
import org.elasticsearch.snapshots.SnapshotState;
|
import org.elasticsearch.snapshots.SnapshotState;
|
||||||
import org.elasticsearch.test.ESSingleNodeTestCase;
|
import org.elasticsearch.test.ESSingleNodeTestCase;
|
||||||
|
import org.elasticsearch.threadpool.ThreadPool;
|
||||||
|
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
|
|
||||||
|
@ -145,6 +149,9 @@ public class HdfsTests extends ESSingleNodeTestCase {
|
||||||
ClusterState clusterState = client.admin().cluster().prepareState().get().getState();
|
ClusterState clusterState = client.admin().cluster().prepareState().get().getState();
|
||||||
assertThat(clusterState.getMetaData().hasIndex("test-idx-1"), equalTo(true));
|
assertThat(clusterState.getMetaData().hasIndex("test-idx-1"), equalTo(true));
|
||||||
assertThat(clusterState.getMetaData().hasIndex("test-idx-2"), equalTo(false));
|
assertThat(clusterState.getMetaData().hasIndex("test-idx-2"), equalTo(false));
|
||||||
|
final BlobStoreRepository repo =
|
||||||
|
(BlobStoreRepository) getInstanceFromNode(RepositoriesService.class).repository("test-repo");
|
||||||
|
BlobStoreTestUtil.assertConsistency(repo, repo.threadPool().executor(ThreadPool.Names.GENERIC));
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testMissingUri() {
|
public void testMissingUri() {
|
||||||
|
|
|
@ -26,10 +26,12 @@ import org.elasticsearch.common.settings.SecureSettings;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
import org.elasticsearch.plugins.Plugin;
|
import org.elasticsearch.plugins.Plugin;
|
||||||
import org.elasticsearch.repositories.AbstractThirdPartyRepositoryTestCase;
|
import org.elasticsearch.repositories.AbstractThirdPartyRepositoryTestCase;
|
||||||
|
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
|
||||||
import org.elasticsearch.test.StreamsUtils;
|
import org.elasticsearch.test.StreamsUtils;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
|
import java.util.concurrent.Executor;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
@ -76,6 +78,20 @@ public class S3RepositoryThirdPartyTests extends AbstractThirdPartyRepositoryTes
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
protected boolean assertCorruptionVisible(BlobStoreRepository repo, Executor genericExec) throws Exception {
|
||||||
|
// S3 is only eventually consistent for the list operations used by this assertions so we retry for 10 minutes assuming that
|
||||||
|
// listing operations will become consistent within these 10 minutes.
|
||||||
|
assertBusy(() -> assertTrue(super.assertCorruptionVisible(repo, genericExec)), 10L, TimeUnit.MINUTES);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void assertConsistentRepository(BlobStoreRepository repo, Executor executor) throws Exception {
|
||||||
|
// S3 is only eventually consistent for the list operations used by this assertions so we retry for 10 minutes assuming that
|
||||||
|
// listing operations will become consistent within these 10 minutes.
|
||||||
|
assertBusy(() -> super.assertConsistentRepository(repo, executor), 10L, TimeUnit.MINUTES);
|
||||||
|
}
|
||||||
|
|
||||||
protected void assertBlobsByPrefix(BlobPath path, String prefix, Map<String, BlobMetaData> blobs) throws Exception {
|
protected void assertBlobsByPrefix(BlobPath path, String prefix, Map<String, BlobMetaData> blobs) throws Exception {
|
||||||
// AWS S3 is eventually consistent so we retry for 10 minutes assuming a list operation will never take longer than that
|
// AWS S3 is eventually consistent so we retry for 10 minutes assuming a list operation will never take longer than that
|
||||||
// to become consistent.
|
// to become consistent.
|
||||||
|
|
|
@ -57,7 +57,6 @@ import org.elasticsearch.common.metrics.CounterMetric;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
import org.elasticsearch.common.unit.ByteSizeUnit;
|
import org.elasticsearch.common.unit.ByteSizeUnit;
|
||||||
import org.elasticsearch.common.unit.ByteSizeValue;
|
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||||
import org.elasticsearch.common.util.set.Sets;
|
|
||||||
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
|
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
|
||||||
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
|
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
|
||||||
import org.elasticsearch.common.xcontent.XContentFactory;
|
import org.elasticsearch.common.xcontent.XContentFactory;
|
||||||
|
@ -419,46 +418,68 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
|
||||||
logger.warn(() -> new ParameterizedMessage("cannot read snapshot file [{}]", snapshotId), ex);
|
logger.warn(() -> new ParameterizedMessage("cannot read snapshot file [{}]", snapshotId), ex);
|
||||||
}
|
}
|
||||||
// Delete snapshot from the index file, since it is the maintainer of truth of active snapshots
|
// Delete snapshot from the index file, since it is the maintainer of truth of active snapshots
|
||||||
final RepositoryData repositoryData;
|
|
||||||
final RepositoryData updatedRepositoryData;
|
final RepositoryData updatedRepositoryData;
|
||||||
|
final Map<String, BlobContainer> foundIndices;
|
||||||
try {
|
try {
|
||||||
repositoryData = getRepositoryData();
|
final RepositoryData repositoryData = getRepositoryData();
|
||||||
updatedRepositoryData = repositoryData.removeSnapshot(snapshotId);
|
updatedRepositoryData = repositoryData.removeSnapshot(snapshotId);
|
||||||
|
// Cache the indices that were found before writing out the new index-N blob so that a stuck master will never
|
||||||
|
// delete an index that was created by another master node after writing this index-N blob.
|
||||||
|
foundIndices = blobStore().blobContainer(basePath().add("indices")).children();
|
||||||
writeIndexGen(updatedRepositoryData, repositoryStateId);
|
writeIndexGen(updatedRepositoryData, repositoryStateId);
|
||||||
} catch (Exception ex) {
|
} catch (Exception ex) {
|
||||||
listener.onFailure(new RepositoryException(metadata.name(), "failed to delete snapshot [" + snapshotId + "]", ex));
|
listener.onFailure(new RepositoryException(metadata.name(), "failed to delete snapshot [" + snapshotId + "]", ex));
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
final SnapshotInfo finalSnapshotInfo = snapshot;
|
final SnapshotInfo finalSnapshotInfo = snapshot;
|
||||||
final Collection<IndexId> unreferencedIndices = Sets.newHashSet(repositoryData.getIndices().values());
|
|
||||||
unreferencedIndices.removeAll(updatedRepositoryData.getIndices().values());
|
|
||||||
try {
|
try {
|
||||||
blobContainer().deleteBlobsIgnoringIfNotExists(
|
blobContainer().deleteBlobsIgnoringIfNotExists(
|
||||||
Arrays.asList(snapshotFormat.blobName(snapshotId.getUUID()), globalMetaDataFormat.blobName(snapshotId.getUUID())));
|
Arrays.asList(snapshotFormat.blobName(snapshotId.getUUID()), globalMetaDataFormat.blobName(snapshotId.getUUID())));
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
logger.warn(() -> new ParameterizedMessage("[{}] Unable to delete global metadata files", snapshotId), e);
|
logger.warn(() -> new ParameterizedMessage("[{}] Unable to delete global metadata files", snapshotId), e);
|
||||||
}
|
}
|
||||||
|
final Map<String, IndexId> survivingIndices = updatedRepositoryData.getIndices();
|
||||||
deleteIndices(
|
deleteIndices(
|
||||||
Optional.ofNullable(finalSnapshotInfo)
|
Optional.ofNullable(finalSnapshotInfo)
|
||||||
.map(info -> info.indices().stream().map(repositoryData::resolveIndexId).collect(Collectors.toList()))
|
.map(info -> info.indices().stream().filter(survivingIndices::containsKey)
|
||||||
|
.map(updatedRepositoryData::resolveIndexId).collect(Collectors.toList()))
|
||||||
.orElse(Collections.emptyList()),
|
.orElse(Collections.emptyList()),
|
||||||
snapshotId,
|
snapshotId,
|
||||||
ActionListener.map(listener, v -> {
|
ActionListener.map(listener, v -> {
|
||||||
try {
|
cleanupStaleIndices(foundIndices, survivingIndices);
|
||||||
blobStore().blobContainer(indicesPath()).deleteBlobsIgnoringIfNotExists(
|
|
||||||
unreferencedIndices.stream().map(IndexId::getId).collect(Collectors.toList()));
|
|
||||||
} catch (IOException e) {
|
|
||||||
logger.warn(() ->
|
|
||||||
new ParameterizedMessage(
|
|
||||||
"[{}] indices {} are no longer part of any snapshots in the repository, " +
|
|
||||||
"but failed to clean up their index folders.", metadata.name(), unreferencedIndices), e);
|
|
||||||
}
|
|
||||||
return null;
|
return null;
|
||||||
})
|
})
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void cleanupStaleIndices(Map<String, BlobContainer> foundIndices, Map<String, IndexId> survivingIndices) {
|
||||||
|
try {
|
||||||
|
final Set<String> survivingIndexIds = survivingIndices.values().stream()
|
||||||
|
.map(IndexId::getId).collect(Collectors.toSet());
|
||||||
|
for (Map.Entry<String, BlobContainer> indexEntry : foundIndices.entrySet()) {
|
||||||
|
final String indexSnId = indexEntry.getKey();
|
||||||
|
try {
|
||||||
|
if (survivingIndexIds.contains(indexSnId) == false) {
|
||||||
|
logger.debug("[{}] Found stale index [{}]. Cleaning it up", metadata.name(), indexSnId);
|
||||||
|
indexEntry.getValue().delete();
|
||||||
|
logger.debug("[{}] Cleaned up stale index [{}]", metadata.name(), indexSnId);
|
||||||
|
}
|
||||||
|
} catch (IOException e) {
|
||||||
|
logger.warn(() -> new ParameterizedMessage(
|
||||||
|
"[{}] index {} is no longer part of any snapshots in the repository, " +
|
||||||
|
"but failed to clean up their index folders", metadata.name(), indexSnId), e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch (Exception e) {
|
||||||
|
// TODO: We shouldn't be blanket catching and suppressing all exceptions here and instead handle them safely upstream.
|
||||||
|
// Currently this catch exists as a stop gap solution to tackle unexpected runtime exceptions from implementations
|
||||||
|
// bubbling up and breaking the snapshot functionality.
|
||||||
|
assert false : e;
|
||||||
|
logger.warn(new ParameterizedMessage("[{}] Exception during cleanup of stale indices", metadata.name()), e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private void deleteIndices(List<IndexId> indices, SnapshotId snapshotId, ActionListener<Void> listener) {
|
private void deleteIndices(List<IndexId> indices, SnapshotId snapshotId, ActionListener<Void> listener) {
|
||||||
if (indices.isEmpty()) {
|
if (indices.isEmpty()) {
|
||||||
listener.onResponse(null);
|
listener.onResponse(null);
|
||||||
|
@ -523,9 +544,9 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
|
||||||
startTime, failure, System.currentTimeMillis(), totalShards, shardFailures,
|
startTime, failure, System.currentTimeMillis(), totalShards, shardFailures,
|
||||||
includeGlobalState, userMetadata);
|
includeGlobalState, userMetadata);
|
||||||
try {
|
try {
|
||||||
|
final RepositoryData updatedRepositoryData = getRepositoryData().addSnapshot(snapshotId, blobStoreSnapshot.state(), indices);
|
||||||
snapshotFormat.write(blobStoreSnapshot, blobContainer(), snapshotId.getUUID());
|
snapshotFormat.write(blobStoreSnapshot, blobContainer(), snapshotId.getUUID());
|
||||||
final RepositoryData repositoryData = getRepositoryData();
|
writeIndexGen(updatedRepositoryData, repositoryStateId);
|
||||||
writeIndexGen(repositoryData.addSnapshot(snapshotId, blobStoreSnapshot.state(), indices), repositoryStateId);
|
|
||||||
} catch (FileAlreadyExistsException ex) {
|
} catch (FileAlreadyExistsException ex) {
|
||||||
// if another master was elected and took over finalizing the snapshot, it is possible
|
// if another master was elected and took over finalizing the snapshot, it is possible
|
||||||
// that both nodes try to finalize the snapshot and write to the same blobs, so we just
|
// that both nodes try to finalize the snapshot and write to the same blobs, so we just
|
||||||
|
|
|
@ -20,11 +20,13 @@ package org.elasticsearch.snapshots;
|
||||||
|
|
||||||
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
|
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
|
||||||
import org.elasticsearch.cluster.SnapshotsInProgress;
|
import org.elasticsearch.cluster.SnapshotsInProgress;
|
||||||
|
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
|
||||||
import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider;
|
import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
import org.elasticsearch.common.unit.TimeValue;
|
import org.elasticsearch.common.unit.TimeValue;
|
||||||
import org.elasticsearch.plugins.Plugin;
|
import org.elasticsearch.plugins.Plugin;
|
||||||
import org.elasticsearch.repositories.RepositoriesService;
|
import org.elasticsearch.repositories.RepositoriesService;
|
||||||
|
import org.elasticsearch.repositories.blobstore.BlobStoreTestUtil;
|
||||||
import org.elasticsearch.snapshots.mockstore.MockRepository;
|
import org.elasticsearch.snapshots.mockstore.MockRepository;
|
||||||
import org.elasticsearch.test.ESIntegTestCase;
|
import org.elasticsearch.test.ESIntegTestCase;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
|
@ -65,6 +67,32 @@ public abstract class AbstractSnapshotIntegTestCase extends ESIntegTestCase {
|
||||||
internalCluster().assertConsistentHistoryBetweenTranslogAndLuceneIndex();
|
internalCluster().assertConsistentHistoryBetweenTranslogAndLuceneIndex();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private String skipRepoConsistencyCheckReason;
|
||||||
|
|
||||||
|
@After
|
||||||
|
public void assertRepoConsistency() {
|
||||||
|
if (skipRepoConsistencyCheckReason == null) {
|
||||||
|
client().admin().cluster().prepareGetRepositories().get().repositories()
|
||||||
|
.stream()
|
||||||
|
.map(RepositoryMetaData::name)
|
||||||
|
.forEach(name -> {
|
||||||
|
final List<SnapshotInfo> snapshots = client().admin().cluster().prepareGetSnapshots(name).get().getSnapshots();
|
||||||
|
// Delete one random snapshot to trigger repository cleanup.
|
||||||
|
if (snapshots.isEmpty() == false) {
|
||||||
|
client().admin().cluster().prepareDeleteSnapshot(name, randomFrom(snapshots).snapshotId().getName()).get();
|
||||||
|
}
|
||||||
|
BlobStoreTestUtil.assertRepoConsistency(internalCluster(), name);
|
||||||
|
});
|
||||||
|
} else {
|
||||||
|
logger.info("--> skipped repo consistency checks because [{}]", skipRepoConsistencyCheckReason);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void disableRepoConsistencyCheck(String reason) {
|
||||||
|
assertNotNull(reason);
|
||||||
|
skipRepoConsistencyCheckReason = reason;
|
||||||
|
}
|
||||||
|
|
||||||
public static long getFailureCount(String repository) {
|
public static long getFailureCount(String repository) {
|
||||||
long failureCount = 0;
|
long failureCount = 0;
|
||||||
for (RepositoriesService repositoriesService :
|
for (RepositoriesService repositoriesService :
|
||||||
|
|
|
@ -723,6 +723,7 @@ public class DedicatedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTest
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testRegistrationFailure() {
|
public void testRegistrationFailure() {
|
||||||
|
disableRepoConsistencyCheck("This test does not create any data in the repository");
|
||||||
logger.info("--> start first node");
|
logger.info("--> start first node");
|
||||||
internalCluster().startNode();
|
internalCluster().startNode();
|
||||||
logger.info("--> start second node");
|
logger.info("--> start second node");
|
||||||
|
@ -742,6 +743,7 @@ public class DedicatedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTest
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testThatSensitiveRepositorySettingsAreNotExposed() throws Exception {
|
public void testThatSensitiveRepositorySettingsAreNotExposed() throws Exception {
|
||||||
|
disableRepoConsistencyCheck("This test does not create any data in the repository");
|
||||||
Settings nodeSettings = Settings.EMPTY;
|
Settings nodeSettings = Settings.EMPTY;
|
||||||
logger.info("--> start two nodes");
|
logger.info("--> start two nodes");
|
||||||
internalCluster().startNodes(2, nodeSettings);
|
internalCluster().startNodes(2, nodeSettings);
|
||||||
|
|
|
@ -144,8 +144,8 @@ public class MetadataLoadingDuringSnapshotRestoreIT extends AbstractSnapshotInte
|
||||||
// Deleting a snapshot does not load the global metadata state but loads each index metadata
|
// Deleting a snapshot does not load the global metadata state but loads each index metadata
|
||||||
assertAcked(client().admin().cluster().prepareDeleteSnapshot("repository", "snap").get());
|
assertAcked(client().admin().cluster().prepareDeleteSnapshot("repository", "snap").get());
|
||||||
assertGlobalMetadataLoads("snap", 1);
|
assertGlobalMetadataLoads("snap", 1);
|
||||||
assertIndexMetadataLoads("snap", "docs", 5);
|
assertIndexMetadataLoads("snap", "docs", 4);
|
||||||
assertIndexMetadataLoads("snap", "others", 4);
|
assertIndexMetadataLoads("snap", "others", 3);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void assertGlobalMetadataLoads(final String snapshot, final int times) {
|
private void assertGlobalMetadataLoads(final String snapshot, final int times) {
|
||||||
|
|
|
@ -184,6 +184,8 @@ public class RepositoriesIT extends AbstractSnapshotIntegTestCase {
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testRepositoryVerification() throws Exception {
|
public void testRepositoryVerification() throws Exception {
|
||||||
|
disableRepoConsistencyCheck("This test does not create any data in the repository.");
|
||||||
|
|
||||||
Client client = client();
|
Client client = client();
|
||||||
|
|
||||||
Settings settings = Settings.builder()
|
Settings settings = Settings.builder()
|
||||||
|
|
|
@ -363,7 +363,7 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
|
||||||
assertThat(client.prepareGet(restoredIndexName, typeName, docId).get().isExists(), equalTo(true));
|
assertThat(client.prepareGet(restoredIndexName, typeName, docId).get().isExists(), equalTo(true));
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testFreshIndexUUID() {
|
public void testFreshIndexUUID() throws InterruptedException {
|
||||||
Client client = client();
|
Client client = client();
|
||||||
|
|
||||||
logger.info("--> creating repository");
|
logger.info("--> creating repository");
|
||||||
|
@ -541,7 +541,6 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
|
||||||
logger.info("--> check that aliases are not restored and existing aliases still exist");
|
logger.info("--> check that aliases are not restored and existing aliases still exist");
|
||||||
assertAliasesMissing(client.admin().indices().prepareAliasesExist("alias-123", "alias-1").get());
|
assertAliasesMissing(client.admin().indices().prepareAliasesExist("alias-123", "alias-1").get());
|
||||||
assertAliasesExist(client.admin().indices().prepareAliasesExist("alias-3").get());
|
assertAliasesExist(client.admin().indices().prepareAliasesExist("alias-3").get());
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testRestoreTemplates() throws Exception {
|
public void testRestoreTemplates() throws Exception {
|
||||||
|
@ -594,7 +593,6 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
|
||||||
logger.info("--> check that template is restored");
|
logger.info("--> check that template is restored");
|
||||||
getIndexTemplatesResponse = client().admin().indices().prepareGetTemplates().get();
|
getIndexTemplatesResponse = client().admin().indices().prepareGetTemplates().get();
|
||||||
assertIndexTemplateExists(getIndexTemplatesResponse, "test-template");
|
assertIndexTemplateExists(getIndexTemplatesResponse, "test-template");
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testIncludeGlobalState() throws Exception {
|
public void testIncludeGlobalState() throws Exception {
|
||||||
|
@ -781,10 +779,10 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
|
||||||
assertFalse(client().admin().cluster().prepareGetPipeline("barbaz").get().isFound());
|
assertFalse(client().admin().cluster().prepareGetPipeline("barbaz").get().isFound());
|
||||||
assertNull(client().admin().cluster().prepareGetStoredScript("foobar").get().getSource());
|
assertNull(client().admin().cluster().prepareGetStoredScript("foobar").get().getSource());
|
||||||
assertThat(client.prepareSearch("test-idx").setSize(0).get().getHits().getTotalHits().value, equalTo(100L));
|
assertThat(client.prepareSearch("test-idx").setSize(0).get().getHits().getTotalHits().value, equalTo(100L));
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testSnapshotFileFailureDuringSnapshot() {
|
public void testSnapshotFileFailureDuringSnapshot() throws InterruptedException {
|
||||||
|
disableRepoConsistencyCheck("This test uses a purposely broken repository so it would fail consistency checks");
|
||||||
Client client = client();
|
Client client = client();
|
||||||
|
|
||||||
logger.info("--> creating repository");
|
logger.info("--> creating repository");
|
||||||
|
@ -911,6 +909,8 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testDataFileFailureDuringRestore() throws Exception {
|
public void testDataFileFailureDuringRestore() throws Exception {
|
||||||
|
disableRepoConsistencyCheck("This test intentionally leaves a broken repository");
|
||||||
|
|
||||||
Path repositoryLocation = randomRepoPath();
|
Path repositoryLocation = randomRepoPath();
|
||||||
Client client = client();
|
Client client = client();
|
||||||
logger.info("--> creating repository");
|
logger.info("--> creating repository");
|
||||||
|
@ -974,6 +974,8 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testDataFileCorruptionDuringRestore() throws Exception {
|
public void testDataFileCorruptionDuringRestore() throws Exception {
|
||||||
|
disableRepoConsistencyCheck("This test intentionally leaves a broken repository");
|
||||||
|
|
||||||
Path repositoryLocation = randomRepoPath();
|
Path repositoryLocation = randomRepoPath();
|
||||||
Client client = client();
|
Client client = client();
|
||||||
logger.info("--> creating repository");
|
logger.info("--> creating repository");
|
||||||
|
@ -1238,7 +1240,6 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
|
||||||
assertThat(restoreSnapshotResponse.getRestoreInfo().failedShards(), equalTo(0));
|
assertThat(restoreSnapshotResponse.getRestoreInfo().failedShards(), equalTo(0));
|
||||||
SearchResponse countResponse = client.prepareSearch("test-idx").setSize(0).get();
|
SearchResponse countResponse = client.prepareSearch("test-idx").setSize(0).get();
|
||||||
assertThat(countResponse.getHits().getTotalHits().value, equalTo(100L));
|
assertThat(countResponse.getHits().getTotalHits().value, equalTo(100L));
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testUnallocatedShards() throws Exception {
|
public void testUnallocatedShards() throws Exception {
|
||||||
|
@ -1703,8 +1704,6 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
|
||||||
.setIndices("test-idx-1").setRenamePattern("test-idx").setRenameReplacement("alias")
|
.setIndices("test-idx-1").setRenamePattern("test-idx").setRenameReplacement("alias")
|
||||||
.setWaitForCompletion(true).setIncludeAliases(false).execute().actionGet();
|
.setWaitForCompletion(true).setIncludeAliases(false).execute().actionGet();
|
||||||
assertThat(restoreSnapshotResponse.getRestoreInfo().totalShards(), greaterThan(0));
|
assertThat(restoreSnapshotResponse.getRestoreInfo().totalShards(), greaterThan(0));
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testMoveShardWhileSnapshotting() throws Exception {
|
public void testMoveShardWhileSnapshotting() throws Exception {
|
||||||
|
@ -1771,6 +1770,7 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testDeleteRepositoryWhileSnapshotting() throws Exception {
|
public void testDeleteRepositoryWhileSnapshotting() throws Exception {
|
||||||
|
disableRepoConsistencyCheck("This test uses a purposely broken repository so it would fail consistency checks");
|
||||||
Client client = client();
|
Client client = client();
|
||||||
Path repositoryLocation = randomRepoPath();
|
Path repositoryLocation = randomRepoPath();
|
||||||
logger.info("--> creating repository");
|
logger.info("--> creating repository");
|
||||||
|
@ -2329,7 +2329,6 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
|
||||||
|
|
||||||
assertHitCount(client.prepareSearch("test-idx").setSize(0).setQuery(matchQuery("field1", "Foo")).get(), numdocs);
|
assertHitCount(client.prepareSearch("test-idx").setSize(0).setQuery(matchQuery("field1", "Foo")).get(), numdocs);
|
||||||
assertHitCount(client.prepareSearch("test-idx").setSize(0).setQuery(matchQuery("field1", "bar")).get(), numdocs);
|
assertHitCount(client.prepareSearch("test-idx").setSize(0).setQuery(matchQuery("field1", "bar")).get(), numdocs);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testRecreateBlocksOnRestore() throws Exception {
|
public void testRecreateBlocksOnRestore() throws Exception {
|
||||||
|
@ -2423,6 +2422,8 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testCloseOrDeleteIndexDuringSnapshot() throws Exception {
|
public void testCloseOrDeleteIndexDuringSnapshot() throws Exception {
|
||||||
|
disableRepoConsistencyCheck("This test intentionally leaves a broken repository");
|
||||||
|
|
||||||
Client client = client();
|
Client client = client();
|
||||||
|
|
||||||
boolean allowPartial = randomBoolean();
|
boolean allowPartial = randomBoolean();
|
||||||
|
@ -2747,6 +2748,8 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testSnapshotName() throws Exception {
|
public void testSnapshotName() throws Exception {
|
||||||
|
disableRepoConsistencyCheck("This test does not create any data in the repository");
|
||||||
|
|
||||||
final Client client = client();
|
final Client client = client();
|
||||||
|
|
||||||
logger.info("--> creating repository");
|
logger.info("--> creating repository");
|
||||||
|
@ -2767,6 +2770,8 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testListCorruptedSnapshot() throws Exception {
|
public void testListCorruptedSnapshot() throws Exception {
|
||||||
|
disableRepoConsistencyCheck("This test intentionally leaves a broken repository");
|
||||||
|
|
||||||
Client client = client();
|
Client client = client();
|
||||||
Path repo = randomRepoPath();
|
Path repo = randomRepoPath();
|
||||||
logger.info("--> creating repository at {}", repo.toAbsolutePath());
|
logger.info("--> creating repository at {}", repo.toAbsolutePath());
|
||||||
|
@ -3336,6 +3341,9 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testSnapshotSucceedsAfterSnapshotFailure() throws Exception {
|
public void testSnapshotSucceedsAfterSnapshotFailure() throws Exception {
|
||||||
|
// TODO: Fix repo cleanup logic to handle these leaked snap-file and only exclude test-repo (the mock repo) here.
|
||||||
|
disableRepoConsistencyCheck(
|
||||||
|
"This test uses a purposely broken repository implementation that results in leaking snap-{uuid}.dat files");
|
||||||
logger.info("--> creating repository");
|
logger.info("--> creating repository");
|
||||||
final Path repoPath = randomRepoPath();
|
final Path repoPath = randomRepoPath();
|
||||||
final Client client = client();
|
final Client client = client();
|
||||||
|
|
|
@ -108,8 +108,6 @@ import org.elasticsearch.cluster.routing.allocation.command.AllocateEmptyPrimary
|
||||||
import org.elasticsearch.cluster.service.ClusterApplierService;
|
import org.elasticsearch.cluster.service.ClusterApplierService;
|
||||||
import org.elasticsearch.cluster.service.ClusterService;
|
import org.elasticsearch.cluster.service.ClusterService;
|
||||||
import org.elasticsearch.cluster.service.MasterService;
|
import org.elasticsearch.cluster.service.MasterService;
|
||||||
import org.elasticsearch.common.Strings;
|
|
||||||
import org.elasticsearch.common.bytes.BytesArray;
|
|
||||||
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
|
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
|
||||||
import org.elasticsearch.common.network.NetworkModule;
|
import org.elasticsearch.common.network.NetworkModule;
|
||||||
import org.elasticsearch.common.settings.ClusterSettings;
|
import org.elasticsearch.common.settings.ClusterSettings;
|
||||||
|
@ -120,11 +118,7 @@ import org.elasticsearch.common.util.BigArrays;
|
||||||
import org.elasticsearch.common.util.PageCacheRecycler;
|
import org.elasticsearch.common.util.PageCacheRecycler;
|
||||||
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
|
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
|
||||||
import org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor;
|
import org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor;
|
||||||
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
|
|
||||||
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
|
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
|
||||||
import org.elasticsearch.common.xcontent.XContentHelper;
|
|
||||||
import org.elasticsearch.common.xcontent.XContentParser;
|
|
||||||
import org.elasticsearch.common.xcontent.XContentType;
|
|
||||||
import org.elasticsearch.env.Environment;
|
import org.elasticsearch.env.Environment;
|
||||||
import org.elasticsearch.env.NodeEnvironment;
|
import org.elasticsearch.env.NodeEnvironment;
|
||||||
import org.elasticsearch.env.TestEnvironment;
|
import org.elasticsearch.env.TestEnvironment;
|
||||||
|
@ -149,10 +143,10 @@ import org.elasticsearch.indices.recovery.RecoverySettings;
|
||||||
import org.elasticsearch.ingest.IngestService;
|
import org.elasticsearch.ingest.IngestService;
|
||||||
import org.elasticsearch.node.ResponseCollectorService;
|
import org.elasticsearch.node.ResponseCollectorService;
|
||||||
import org.elasticsearch.plugins.PluginsService;
|
import org.elasticsearch.plugins.PluginsService;
|
||||||
import org.elasticsearch.repositories.IndexId;
|
|
||||||
import org.elasticsearch.repositories.RepositoriesService;
|
import org.elasticsearch.repositories.RepositoriesService;
|
||||||
import org.elasticsearch.repositories.Repository;
|
import org.elasticsearch.repositories.Repository;
|
||||||
import org.elasticsearch.repositories.RepositoryData;
|
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
|
||||||
|
import org.elasticsearch.repositories.blobstore.BlobStoreTestUtil;
|
||||||
import org.elasticsearch.repositories.fs.FsRepository;
|
import org.elasticsearch.repositories.fs.FsRepository;
|
||||||
import org.elasticsearch.script.ScriptService;
|
import org.elasticsearch.script.ScriptService;
|
||||||
import org.elasticsearch.search.SearchService;
|
import org.elasticsearch.search.SearchService;
|
||||||
|
@ -170,16 +164,8 @@ import org.elasticsearch.transport.TransportService;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
|
|
||||||
import java.io.FileNotFoundException;
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.nio.ByteBuffer;
|
|
||||||
import java.nio.file.DirectoryNotEmptyException;
|
|
||||||
import java.nio.file.FileVisitResult;
|
|
||||||
import java.nio.file.Files;
|
|
||||||
import java.nio.file.NoSuchFileException;
|
|
||||||
import java.nio.file.Path;
|
import java.nio.file.Path;
|
||||||
import java.nio.file.SimpleFileVisitor;
|
|
||||||
import java.nio.file.attribute.BasicFileAttributes;
|
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.Comparator;
|
import java.util.Comparator;
|
||||||
|
@ -225,9 +211,11 @@ public class SnapshotResiliencyTests extends ESTestCase {
|
||||||
}
|
}
|
||||||
|
|
||||||
@After
|
@After
|
||||||
public void verifyReposThenStopServices() throws IOException {
|
public void verifyReposThenStopServices() {
|
||||||
try {
|
try {
|
||||||
assertNoStaleRepositoryData();
|
BlobStoreTestUtil.assertConsistency(
|
||||||
|
(BlobStoreRepository) testClusterNodes.randomMasterNodeSafe().repositoriesService.repository("repo"),
|
||||||
|
Runnable::run);
|
||||||
} finally {
|
} finally {
|
||||||
testClusterNodes.nodes.values().forEach(TestClusterNode::stop);
|
testClusterNodes.nodes.values().forEach(TestClusterNode::stop);
|
||||||
}
|
}
|
||||||
|
@ -525,109 +513,6 @@ public class SnapshotResiliencyTests extends ESTestCase {
|
||||||
assertThat(snapshotIds, either(hasSize(1)).or(hasSize(0)));
|
assertThat(snapshotIds, either(hasSize(1)).or(hasSize(0)));
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Assert that there are no unreferenced indices or unreferenced root-level metadata blobs in any repository.
|
|
||||||
* TODO: Expand the logic here to also check for unreferenced segment blobs and shard level metadata
|
|
||||||
*/
|
|
||||||
private void assertNoStaleRepositoryData() throws IOException {
|
|
||||||
final Path repoPath = tempDir.resolve("repo").toAbsolutePath();
|
|
||||||
final List<Path> repos;
|
|
||||||
try (Stream<Path> reposDir = repoFilesByPrefix(repoPath)) {
|
|
||||||
repos = reposDir.filter(s -> s.getFileName().toString().startsWith("extra") == false).collect(Collectors.toList());
|
|
||||||
}
|
|
||||||
for (Path repoRoot : repos) {
|
|
||||||
cleanupEmptyTrees(repoRoot);
|
|
||||||
final Path latestIndexGenBlob = repoRoot.resolve("index.latest");
|
|
||||||
assertTrue("Could not find index.latest blob for repo at [" + repoRoot + ']', Files.exists(latestIndexGenBlob));
|
|
||||||
final long latestGen = ByteBuffer.wrap(Files.readAllBytes(latestIndexGenBlob)).getLong(0);
|
|
||||||
assertIndexGenerations(repoRoot, latestGen);
|
|
||||||
final RepositoryData repositoryData;
|
|
||||||
try (XContentParser parser =
|
|
||||||
XContentHelper.createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE,
|
|
||||||
new BytesArray(Files.readAllBytes(repoRoot.resolve("index-" + latestGen))), XContentType.JSON)) {
|
|
||||||
repositoryData = RepositoryData.snapshotsFromXContent(parser, latestGen);
|
|
||||||
}
|
|
||||||
assertIndexUUIDs(repoRoot, repositoryData);
|
|
||||||
assertSnapshotUUIDs(repoRoot, repositoryData);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Lucene's mock file system randomly generates empty `extra0` files that break the deletion of blob-store directories.
|
|
||||||
// We clean those up here before checking a blob-store for stale files.
|
|
||||||
private void cleanupEmptyTrees(Path repoPath) {
|
|
||||||
try {
|
|
||||||
Files.walkFileTree(repoPath, new SimpleFileVisitor<Path>() {
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
|
|
||||||
if (file.getFileName().toString().startsWith("extra")) {
|
|
||||||
Files.delete(file);
|
|
||||||
}
|
|
||||||
return FileVisitResult.CONTINUE;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException {
|
|
||||||
try {
|
|
||||||
Files.delete(dir);
|
|
||||||
} catch (DirectoryNotEmptyException e) {
|
|
||||||
// We're only interested in deleting empty trees here, just ignore directories with content
|
|
||||||
}
|
|
||||||
return FileVisitResult.CONTINUE;
|
|
||||||
}
|
|
||||||
});
|
|
||||||
} catch (IOException e) {
|
|
||||||
throw new AssertionError(e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private static void assertIndexGenerations(Path repoRoot, long latestGen) throws IOException {
|
|
||||||
try (Stream<Path> repoRootBlobs = repoFilesByPrefix(repoRoot)) {
|
|
||||||
final long[] indexGenerations = repoRootBlobs.filter(p -> p.getFileName().toString().startsWith("index-"))
|
|
||||||
.map(p -> p.getFileName().toString().replace("index-", ""))
|
|
||||||
.mapToLong(Long::parseLong).sorted().toArray();
|
|
||||||
assertEquals(latestGen, indexGenerations[indexGenerations.length - 1]);
|
|
||||||
assertTrue(indexGenerations.length <= 2);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private static void assertIndexUUIDs(Path repoRoot, RepositoryData repositoryData) throws IOException {
|
|
||||||
final List<String> expectedIndexUUIDs =
|
|
||||||
repositoryData.getIndices().values().stream().map(IndexId::getId).collect(Collectors.toList());
|
|
||||||
try (Stream<Path> indexRoots = repoFilesByPrefix(repoRoot.resolve("indices"))) {
|
|
||||||
final List<String> foundIndexUUIDs = indexRoots.filter(s -> s.getFileName().toString().startsWith("extra") == false)
|
|
||||||
.map(p -> p.getFileName().toString()).collect(Collectors.toList());
|
|
||||||
assertThat(foundIndexUUIDs, containsInAnyOrder(expectedIndexUUIDs.toArray(Strings.EMPTY_ARRAY)));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private static void assertSnapshotUUIDs(Path repoRoot, RepositoryData repositoryData) throws IOException {
|
|
||||||
final List<String> expectedSnapshotUUIDs =
|
|
||||||
repositoryData.getSnapshotIds().stream().map(SnapshotId::getUUID).collect(Collectors.toList());
|
|
||||||
for (String prefix : new String[]{"snap-", "meta-"}) {
|
|
||||||
try (Stream<Path> repoRootBlobs = repoFilesByPrefix(repoRoot)) {
|
|
||||||
final Collection<String> foundSnapshotUUIDs = repoRootBlobs.filter(p -> p.getFileName().toString().startsWith(prefix))
|
|
||||||
.map(p -> p.getFileName().toString().replace(prefix, "").replace(".dat", ""))
|
|
||||||
.collect(Collectors.toSet());
|
|
||||||
assertThat(foundSnapshotUUIDs, containsInAnyOrder(expectedSnapshotUUIDs.toArray(Strings.EMPTY_ARRAY)));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* List contents of a blob path and return an empty stream if the path doesn't exist.
|
|
||||||
* @param prefix Path to find children for
|
|
||||||
* @return stream of child paths
|
|
||||||
* @throws IOException on failure
|
|
||||||
*/
|
|
||||||
private static Stream<Path> repoFilesByPrefix(Path prefix) throws IOException {
|
|
||||||
try {
|
|
||||||
return Files.list(prefix);
|
|
||||||
} catch (FileNotFoundException | NoSuchFileException e) {
|
|
||||||
return Stream.empty();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private void clearDisruptionsAndAwaitSync() {
|
private void clearDisruptionsAndAwaitSync() {
|
||||||
testClusterNodes.clearNetworkDisruptions();
|
testClusterNodes.clearNetworkDisruptions();
|
||||||
runUntil(() -> {
|
runUntil(() -> {
|
||||||
|
|
|
@ -59,6 +59,7 @@ import java.util.Random;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.concurrent.ConcurrentMap;
|
import java.util.concurrent.ConcurrentMap;
|
||||||
import java.util.concurrent.atomic.AtomicLong;
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
public class MockRepository extends FsRepository {
|
public class MockRepository extends FsRepository {
|
||||||
private static final Logger logger = LogManager.getLogger(MockRepository.class);
|
private static final Logger logger = LogManager.getLogger(MockRepository.class);
|
||||||
|
@ -333,6 +334,17 @@ public class MockRepository extends FsRepository {
|
||||||
super.deleteBlobIgnoringIfNotExists(blobName);
|
super.deleteBlobIgnoringIfNotExists(blobName);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void delete() throws IOException {
|
||||||
|
for (BlobContainer child : children().values()) {
|
||||||
|
child.delete();
|
||||||
|
}
|
||||||
|
for (String blob : listBlobs().values().stream().map(BlobMetaData::name).collect(Collectors.toList())) {
|
||||||
|
deleteBlobIgnoringIfNotExists(blob);
|
||||||
|
}
|
||||||
|
blobStore().blobContainer(path().parent()).deleteBlob(path().toArray()[path().toArray().length - 1]);
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Map<String, BlobMetaData> listBlobs() throws IOException {
|
public Map<String, BlobMetaData> listBlobs() throws IOException {
|
||||||
maybeIOExceptionOrBlock("");
|
maybeIOExceptionOrBlock("");
|
||||||
|
|
|
@ -20,6 +20,7 @@ package org.elasticsearch.repositories;
|
||||||
|
|
||||||
import org.elasticsearch.action.ActionRunnable;
|
import org.elasticsearch.action.ActionRunnable;
|
||||||
import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
|
import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
|
||||||
|
import org.elasticsearch.action.admin.cluster.snapshots.delete.DeleteSnapshotRequest;
|
||||||
import org.elasticsearch.action.support.PlainActionFuture;
|
import org.elasticsearch.action.support.PlainActionFuture;
|
||||||
import org.elasticsearch.common.Strings;
|
import org.elasticsearch.common.Strings;
|
||||||
import org.elasticsearch.common.blobstore.BlobMetaData;
|
import org.elasticsearch.common.blobstore.BlobMetaData;
|
||||||
|
@ -29,8 +30,10 @@ import org.elasticsearch.common.blobstore.support.PlainBlobMetaData;
|
||||||
import org.elasticsearch.common.settings.SecureSettings;
|
import org.elasticsearch.common.settings.SecureSettings;
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
|
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
|
||||||
|
import org.elasticsearch.repositories.blobstore.BlobStoreTestUtil;
|
||||||
import org.elasticsearch.snapshots.SnapshotState;
|
import org.elasticsearch.snapshots.SnapshotState;
|
||||||
import org.elasticsearch.test.ESSingleNodeTestCase;
|
import org.elasticsearch.test.ESSingleNodeTestCase;
|
||||||
|
import org.elasticsearch.threadpool.ThreadPool;
|
||||||
|
|
||||||
import java.io.ByteArrayInputStream;
|
import java.io.ByteArrayInputStream;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
|
@ -184,6 +187,85 @@ public abstract class AbstractThirdPartyRepositoryTestCase extends ESSingleNodeT
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void testCleanup() throws Exception {
|
||||||
|
createRepository("test-repo");
|
||||||
|
|
||||||
|
createIndex("test-idx-1");
|
||||||
|
createIndex("test-idx-2");
|
||||||
|
createIndex("test-idx-3");
|
||||||
|
ensureGreen();
|
||||||
|
|
||||||
|
logger.info("--> indexing some data");
|
||||||
|
for (int i = 0; i < 100; i++) {
|
||||||
|
client().prepareIndex("test-idx-1", "doc", Integer.toString(i)).setSource("foo", "bar" + i).get();
|
||||||
|
client().prepareIndex("test-idx-2", "doc", Integer.toString(i)).setSource("foo", "bar" + i).get();
|
||||||
|
client().prepareIndex("test-idx-3", "doc", Integer.toString(i)).setSource("foo", "bar" + i).get();
|
||||||
|
}
|
||||||
|
client().admin().indices().prepareRefresh().get();
|
||||||
|
|
||||||
|
final String snapshotName = "test-snap-" + System.currentTimeMillis();
|
||||||
|
|
||||||
|
logger.info("--> snapshot");
|
||||||
|
CreateSnapshotResponse createSnapshotResponse = client().admin()
|
||||||
|
.cluster()
|
||||||
|
.prepareCreateSnapshot("test-repo", snapshotName)
|
||||||
|
.setWaitForCompletion(true)
|
||||||
|
.setIndices("test-idx-*", "-test-idx-3")
|
||||||
|
.get();
|
||||||
|
assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), greaterThan(0));
|
||||||
|
assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(),
|
||||||
|
equalTo(createSnapshotResponse.getSnapshotInfo().totalShards()));
|
||||||
|
|
||||||
|
assertThat(client().admin()
|
||||||
|
.cluster()
|
||||||
|
.prepareGetSnapshots("test-repo")
|
||||||
|
.setSnapshots(snapshotName)
|
||||||
|
.get()
|
||||||
|
.getSnapshots().get(0)
|
||||||
|
.state(),
|
||||||
|
equalTo(SnapshotState.SUCCESS));
|
||||||
|
|
||||||
|
logger.info("--> creating a dangling index folder");
|
||||||
|
final BlobStoreRepository repo =
|
||||||
|
(BlobStoreRepository) getInstanceFromNode(RepositoriesService.class).repository("test-repo");
|
||||||
|
final PlainActionFuture<Void> future = PlainActionFuture.newFuture();
|
||||||
|
final Executor genericExec = repo.threadPool().executor(ThreadPool.Names.GENERIC);
|
||||||
|
genericExec.execute(new ActionRunnable<Void>(future) {
|
||||||
|
@Override
|
||||||
|
protected void doRun() throws Exception {
|
||||||
|
final BlobStore blobStore = repo.blobStore();
|
||||||
|
blobStore.blobContainer(BlobPath.cleanPath().add("indices").add("foo"))
|
||||||
|
.writeBlob("bar", new ByteArrayInputStream(new byte[0]), 0, false);
|
||||||
|
future.onResponse(null);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
future.actionGet();
|
||||||
|
assertTrue(assertCorruptionVisible(repo, genericExec));
|
||||||
|
logger.info("--> deleting a snapshot to trigger repository cleanup");
|
||||||
|
client().admin().cluster().deleteSnapshot(new DeleteSnapshotRequest("test-repo", snapshotName)).actionGet();
|
||||||
|
|
||||||
|
assertConsistentRepository(repo, genericExec);
|
||||||
|
}
|
||||||
|
|
||||||
|
protected boolean assertCorruptionVisible(BlobStoreRepository repo, Executor executor) throws Exception {
|
||||||
|
final PlainActionFuture<Boolean> future = PlainActionFuture.newFuture();
|
||||||
|
executor.execute(new ActionRunnable<Boolean>(future) {
|
||||||
|
@Override
|
||||||
|
protected void doRun() throws Exception {
|
||||||
|
final BlobStore blobStore = repo.blobStore();
|
||||||
|
future.onResponse(
|
||||||
|
blobStore.blobContainer(BlobPath.cleanPath().add("indices")).children().containsKey("foo")
|
||||||
|
&& blobStore.blobContainer(BlobPath.cleanPath().add("indices").add("foo")).blobExists("bar")
|
||||||
|
);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
return future.actionGet();
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void assertConsistentRepository(BlobStoreRepository repo, Executor executor) throws Exception {
|
||||||
|
BlobStoreTestUtil.assertConsistency(repo, executor);
|
||||||
|
}
|
||||||
|
|
||||||
protected void assertDeleted(BlobPath path, String name) throws Exception {
|
protected void assertDeleted(BlobPath path, String name) throws Exception {
|
||||||
assertThat(listChildren(path), not(contains(name)));
|
assertThat(listChildren(path), not(contains(name)));
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,133 @@
|
||||||
|
/*
|
||||||
|
* Licensed to Elasticsearch under one or more contributor
|
||||||
|
* license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright
|
||||||
|
* ownership. Elasticsearch licenses this file to you under
|
||||||
|
* the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
* not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
package org.elasticsearch.repositories.blobstore;
|
||||||
|
|
||||||
|
import org.elasticsearch.action.ActionRunnable;
|
||||||
|
import org.elasticsearch.action.support.PlainActionFuture;
|
||||||
|
import org.elasticsearch.common.Strings;
|
||||||
|
import org.elasticsearch.common.blobstore.BlobContainer;
|
||||||
|
import org.elasticsearch.common.io.stream.BytesStreamOutput;
|
||||||
|
import org.elasticsearch.common.unit.TimeValue;
|
||||||
|
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
|
||||||
|
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
|
||||||
|
import org.elasticsearch.common.xcontent.XContentHelper;
|
||||||
|
import org.elasticsearch.common.xcontent.XContentParser;
|
||||||
|
import org.elasticsearch.common.xcontent.XContentType;
|
||||||
|
import org.elasticsearch.core.internal.io.Streams;
|
||||||
|
import org.elasticsearch.repositories.IndexId;
|
||||||
|
import org.elasticsearch.repositories.RepositoriesService;
|
||||||
|
import org.elasticsearch.repositories.RepositoryData;
|
||||||
|
import org.elasticsearch.snapshots.SnapshotId;
|
||||||
|
import org.elasticsearch.test.InternalTestCluster;
|
||||||
|
import org.elasticsearch.threadpool.ThreadPool;
|
||||||
|
|
||||||
|
import java.io.DataInputStream;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.io.InputStream;
|
||||||
|
import java.util.Collection;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.concurrent.Executor;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
|
import static org.hamcrest.Matchers.containsInAnyOrder;
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertThat;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
|
public final class BlobStoreTestUtil {
|
||||||
|
|
||||||
|
public static void assertRepoConsistency(InternalTestCluster testCluster, String repoName) {
|
||||||
|
final BlobStoreRepository repo =
|
||||||
|
(BlobStoreRepository) testCluster.getCurrentMasterNodeInstance(RepositoriesService.class).repository(repoName);
|
||||||
|
BlobStoreTestUtil.assertConsistency(repo, repo.threadPool().executor(ThreadPool.Names.GENERIC));
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Assert that there are no unreferenced indices or unreferenced root-level metadata blobs in any repository.
|
||||||
|
* TODO: Expand the logic here to also check for unreferenced segment blobs and shard level metadata
|
||||||
|
* @param repository BlobStoreRepository to check
|
||||||
|
* @param executor Executor to run all repository calls on. This is needed since the production {@link BlobStoreRepository}
|
||||||
|
* implementations assert that all IO inducing calls happen on the generic or snapshot thread-pools and hence callers
|
||||||
|
* of this assertion must pass an executor on those when using such an implementation.
|
||||||
|
*/
|
||||||
|
public static void assertConsistency(BlobStoreRepository repository, Executor executor) {
|
||||||
|
final PlainActionFuture<Void> listener = PlainActionFuture.newFuture();
|
||||||
|
executor.execute(new ActionRunnable<Void>(listener) {
|
||||||
|
@Override
|
||||||
|
protected void doRun() throws Exception {
|
||||||
|
final BlobContainer blobContainer = repository.blobContainer();
|
||||||
|
assertTrue(
|
||||||
|
"Could not find index.latest blob for repo [" + repository + "]", blobContainer.blobExists("index.latest"));
|
||||||
|
final long latestGen;
|
||||||
|
try (DataInputStream inputStream = new DataInputStream(blobContainer.readBlob("index.latest"))) {
|
||||||
|
latestGen = inputStream.readLong();
|
||||||
|
}
|
||||||
|
assertIndexGenerations(blobContainer, latestGen);
|
||||||
|
final RepositoryData repositoryData;
|
||||||
|
try (InputStream inputStream = blobContainer.readBlob("index-" + latestGen);
|
||||||
|
BytesStreamOutput out = new BytesStreamOutput()) {
|
||||||
|
Streams.copy(inputStream, out);
|
||||||
|
try (XContentParser parser =
|
||||||
|
XContentHelper.createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE,
|
||||||
|
out.bytes(), XContentType.JSON)) {
|
||||||
|
repositoryData = RepositoryData.snapshotsFromXContent(parser, latestGen);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
assertIndexUUIDs(blobContainer, repositoryData);
|
||||||
|
assertSnapshotUUIDs(blobContainer, repositoryData);
|
||||||
|
listener.onResponse(null);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
listener.actionGet(TimeValue.timeValueMinutes(1L));
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void assertIndexGenerations(BlobContainer repoRoot, long latestGen) throws IOException {
|
||||||
|
final long[] indexGenerations = repoRoot.listBlobsByPrefix("index-").keySet().stream()
|
||||||
|
.map(s -> s.replace("index-", ""))
|
||||||
|
.mapToLong(Long::parseLong).sorted().toArray();
|
||||||
|
assertEquals(latestGen, indexGenerations[indexGenerations.length - 1]);
|
||||||
|
assertTrue(indexGenerations.length <= 2);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void assertIndexUUIDs(BlobContainer repoRoot, RepositoryData repositoryData) throws IOException {
|
||||||
|
final List<String> expectedIndexUUIDs =
|
||||||
|
repositoryData.getIndices().values().stream().map(IndexId::getId).collect(Collectors.toList());
|
||||||
|
final BlobContainer indicesContainer = repoRoot.children().get("indices");
|
||||||
|
final List<String> foundIndexUUIDs;
|
||||||
|
if (indicesContainer == null) {
|
||||||
|
foundIndexUUIDs = Collections.emptyList();
|
||||||
|
} else {
|
||||||
|
foundIndexUUIDs = indicesContainer.children().keySet().stream().filter(
|
||||||
|
s -> s.startsWith("extra") == false).collect(Collectors.toList());
|
||||||
|
}
|
||||||
|
assertThat(foundIndexUUIDs, containsInAnyOrder(expectedIndexUUIDs.toArray(Strings.EMPTY_ARRAY)));
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void assertSnapshotUUIDs(BlobContainer repoRoot, RepositoryData repositoryData) throws IOException {
|
||||||
|
final List<String> expectedSnapshotUUIDs =
|
||||||
|
repositoryData.getSnapshotIds().stream().map(SnapshotId::getUUID).collect(Collectors.toList());
|
||||||
|
for (String prefix : new String[]{"snap-", "meta-"}) {
|
||||||
|
final Collection<String> foundSnapshotUUIDs = repoRoot.listBlobs().keySet().stream().filter(p -> p.startsWith(prefix))
|
||||||
|
.map(p -> p.replace(prefix, "").replace(".dat", ""))
|
||||||
|
.collect(Collectors.toSet());
|
||||||
|
assertThat(foundSnapshotUUIDs, containsInAnyOrder(expectedSnapshotUUIDs.toArray(Strings.EMPTY_ARRAY)));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue