Use ClusterState as Consistency Source for Snapshot Repositories (#49060) (#50267)

Follow-up to #49729

This change removes the fallback of listing out the repository contents to find the latest `index-N` blob in write-mounted blob store repositories.
This saves two to three list operations on each snapshot create and delete operation. It also makes the snapshot status APIs cheaper (and faster) by saving one list operation there as well in many cases.
As a result, the repository is no longer resilient to concurrent modification: if loading `RepositoryData` from the generation assumed in the cluster state fails, the repository is put into a `corrupted` state.
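In short, the cluster state becomes the source of truth for the current `index-N` generation, and the repository only falls back to listing blobs in a best-effort mode. A minimal sketch of the new lookup, using names from the diff below (simplified, exception handling elided; not the verbatim implementation):

```java
// Simplified sketch of the generation lookup added in this change.
final long genToLoad;
if (bestEffortConsistency) {
    // Read-only repos, mixed-version clusters, unknown generations and
    // unclean restarts still list the repository contents as a fallback.
    genToLoad = Math.max(latestKnownRepoGen.get(), latestIndexBlobId());
} else {
    // Otherwise the generation tracked in the cluster state is authoritative.
    genToLoad = latestKnownRepoGen.get();
}
```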
Armin Braun 2019-12-17 10:55:13 +01:00 committed by GitHub
parent ce294e1564
commit 2e7b1ab375
8 changed files with 457 additions and 50 deletions

RepositoryData.java

@@ -58,6 +58,11 @@ public final class RepositoryData {
*/
public static final long UNKNOWN_REPO_GEN = -2L;
/**
* The generation value indicating that the repository generation could not be determined.
*/
public static final long CORRUPTED_REPO_GEN = -3L;
/**
* An instance initialized for an empty repository.
*/

BlobStoreRepository.java

@@ -31,6 +31,7 @@ import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.IndexOutput;
import org.apache.lucene.store.RateLimiter;
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.action.StepListener;
@@ -66,6 +67,7 @@ import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.lucene.store.InputStreamIndexInput;
import org.elasticsearch.common.metrics.CounterMetric;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
@@ -179,6 +181,21 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
private static final String DATA_BLOB_PREFIX = "__";
/**
* When set to true metadata files are stored in compressed format. This setting doesn't affect index
* files that are already compressed by default. Changing the setting does not invalidate existing files since reads
* do not observe the setting, instead they examine the file to see if it is compressed or not.
*/
public static final Setting<Boolean> COMPRESS_SETTING = Setting.boolSetting("compress", true, Setting.Property.NodeScope);
/**
* When set to {@code true}, {@link #bestEffortConsistency} will be set to {@code true} and concurrent modifications of the repository
* contents will not result in the repository being marked as corrupted.
* Note: This setting is intended as a backwards compatibility solution for 7.x and will go away in 8.
*/
public static final Setting<Boolean> ALLOW_CONCURRENT_MODIFICATION =
Setting.boolSetting("allow_concurrent_modifications", false, Setting.Property.Deprecated);
private final boolean compress;
private final RateLimiter snapshotRateLimiter;
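Registering a repository with this deprecated escape hatch would look roughly like the BwC integration test further down (client snippet; repository name and path are placeholders):

```java
// Opting a repository into the legacy best-effort behaviour
// (7.x only; the setting is deprecated and slated to go away in 8).
client.admin().cluster().preparePutRepository("my-repo")
    .setType("fs")
    .setSettings(Settings.builder()
        .put("location", repoPath) // placeholder path
        .put(BlobStoreRepository.ALLOW_CONCURRENT_MODIFICATION.getKey(), true))
    .get();
```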
@@ -209,6 +226,34 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
private final ClusterService clusterService;
/**
* Flag that is set to {@code true} if this instance is started with {@link #metadata} that has a higher value for
* {@link RepositoryMetaData#pendingGeneration()} than for {@link RepositoryMetaData#generation()} indicating a full cluster restart
* potentially accounting for the last {@code index-N} write in the cluster state.
* Note: While it is true that this value could also be set to {@code true} for an instance on a node that is just joining the cluster
* during a new {@code index-N} write, this does not present a problem. The node will still load the correct {@link RepositoryData} in
* all cases and simply do a redundant listing of the repository contents if it tries to load {@link RepositoryData} and falls back
* to {@link #latestIndexBlobId()} to validate the value of {@link RepositoryMetaData#generation()}.
*/
private boolean uncleanStart;
/**
* This flag indicates that the repository can not exclusively rely on the value stored in {@link #latestKnownRepoGen} to determine the
* latest repository generation but must inspect its physical contents as well via {@link #latestIndexBlobId()}.
* This flag is set in the following situations:
* <ul>
* <li>All repositories that are read-only, i.e. for which {@link #isReadOnly()} returns {@code true} because there are no
* guarantees that another cluster is not writing to the repository at the same time</li>
* <li>The node finds itself in a mixed-version cluster containing nodes older than
* {@link RepositoryMetaData#REPO_GEN_IN_CS_VERSION} where the master node does not update the value of
* {@link RepositoryMetaData#generation()} when writing a new {@code index-N} blob</li>
* <li>The value of {@link RepositoryMetaData#generation()} for this repository is {@link RepositoryData#UNKNOWN_REPO_GEN}
* indicating that no consistent repository generation is tracked in the cluster state yet.</li>
* <li>The {@link #uncleanStart} flag is set to {@code true}</li>
* </ul>
*/
private volatile boolean bestEffortConsistency;
/**
* Constructs new BlobStoreRepository
* @param metadata The metadata for this repository including name and settings
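A hypothetical scenario for the unclean-start detection in `doStart()` below: the cluster state committed generation 5, but an `index-6` write was begun and never confirmed before a full cluster restart:

```java
// Hypothetical values illustrating the doStart() check in the next hunk.
long generation = 5;        // last index-N write committed to the cluster state
long pendingGeneration = 6; // an index-6 write was begun but never confirmed
boolean uncleanStart = pendingGeneration > RepositoryData.EMPTY_REPO_GEN
    && generation != pendingGeneration; // -> true, forces bestEffortConsistency
```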
@@ -237,6 +282,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
@Override
protected void doStart() {
uncleanStart = metadata.pendingGeneration() > RepositoryData.EMPTY_REPO_GEN &&
metadata.generation() != metadata.pendingGeneration();
ByteSizeValue chunkSize = chunkSize();
if (chunkSize != null && chunkSize.getBytes() <= 0) {
throw new IllegalArgumentException("the chunk size cannot be negative: [" + chunkSize + "]");
@@ -272,29 +319,42 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
// #latestKnownRepoGen if a newer than currently known generation is found
@Override
public void updateState(ClusterState state) {
if (readOnly) {
metadata = getRepoMetaData(state);
uncleanStart = uncleanStart && metadata.generation() != metadata.pendingGeneration();
bestEffortConsistency = uncleanStart || isReadOnly()
|| state.nodes().getMinNodeVersion().before(RepositoryMetaData.REPO_GEN_IN_CS_VERSION)
|| metadata.generation() == RepositoryData.UNKNOWN_REPO_GEN || ALLOW_CONCURRENT_MODIFICATION.get(metadata.settings());
if (isReadOnly()) {
// No need to waste cycles, no operations can run against a read-only repository
return;
}
long bestGenerationFromCS = RepositoryData.EMPTY_REPO_GEN;
final SnapshotsInProgress snapshotsInProgress = state.custom(SnapshotsInProgress.TYPE);
if (snapshotsInProgress != null) {
bestGenerationFromCS = bestGeneration(snapshotsInProgress.entries());
if (bestEffortConsistency) {
long bestGenerationFromCS = RepositoryData.EMPTY_REPO_GEN;
final SnapshotsInProgress snapshotsInProgress = state.custom(SnapshotsInProgress.TYPE);
if (snapshotsInProgress != null) {
bestGenerationFromCS = bestGeneration(snapshotsInProgress.entries());
}
final SnapshotDeletionsInProgress deletionsInProgress = state.custom(SnapshotDeletionsInProgress.TYPE);
// Don't use generation from the delete task if we already found a generation for an in progress snapshot.
// In this case, the generation points at the generation the repo will be in after the snapshot finishes so it may not yet
// exist
if (bestGenerationFromCS == RepositoryData.EMPTY_REPO_GEN && deletionsInProgress != null) {
bestGenerationFromCS = bestGeneration(deletionsInProgress.getEntries());
}
final RepositoryCleanupInProgress cleanupInProgress = state.custom(RepositoryCleanupInProgress.TYPE);
if (bestGenerationFromCS == RepositoryData.EMPTY_REPO_GEN && cleanupInProgress != null) {
bestGenerationFromCS = bestGeneration(cleanupInProgress.entries());
}
final long finalBestGen = Math.max(bestGenerationFromCS, metadata.generation());
latestKnownRepoGen.updateAndGet(known -> Math.max(known, finalBestGen));
} else {
final long previousBest = latestKnownRepoGen.getAndSet(metadata.generation());
if (previousBest != metadata.generation()) {
assert metadata.generation() == RepositoryData.CORRUPTED_REPO_GEN || previousBest < metadata.generation() :
"Illegal move from repository generation [" + previousBest + "] to generation [" + metadata.generation() + "]";
logger.debug("Updated repository generation from [{}] to [{}]", previousBest, metadata.generation());
}
}
final SnapshotDeletionsInProgress deletionsInProgress = state.custom(SnapshotDeletionsInProgress.TYPE);
// Don't use generation from the delete task if we already found a generation for an in progress snapshot.
// In this case, the generation points at the generation the repo will be in after the snapshot finishes so it may not yet exist
if (bestGenerationFromCS == RepositoryData.EMPTY_REPO_GEN && deletionsInProgress != null) {
bestGenerationFromCS = bestGeneration(deletionsInProgress.getEntries());
}
final RepositoryCleanupInProgress cleanupInProgress = state.custom(RepositoryCleanupInProgress.TYPE);
if (bestGenerationFromCS == RepositoryData.EMPTY_REPO_GEN && cleanupInProgress != null) {
bestGenerationFromCS = bestGeneration(cleanupInProgress.entries());
}
metadata = getRepoMetaData(state);
final long finalBestGen = Math.max(bestGenerationFromCS, metadata.generation());
latestKnownRepoGen.updateAndGet(known -> Math.max(known, finalBestGen));
}
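In production `RepositoriesService` drives `updateState` on every cluster state change; the tests in this change replicate that wiring manually, roughly as follows:

```java
// How the tests below hook a manually created repository into cluster state
// updates, mirroring what RepositoriesService does in production.
clusterService.addStateApplier(event -> repository.updateState(event.state()));
// Apply the current state once so generation tracking is initialized:
repository.updateState(clusterService.state());
repository.start();
```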
private long bestGeneration(Collection<? extends RepositoryOperation> operations) {
@@ -451,7 +511,12 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
*/
private RepositoryData safeRepositoryData(long repositoryStateId, Map<String, BlobMetaData> rootBlobs) {
final long generation = latestGeneration(rootBlobs.keySet());
final long genToLoad = latestKnownRepoGen.updateAndGet(known -> Math.max(known, repositoryStateId));
final long genToLoad;
if (bestEffortConsistency) {
genToLoad = latestKnownRepoGen.updateAndGet(known -> Math.max(known, repositoryStateId));
} else {
genToLoad = latestKnownRepoGen.get();
}
if (genToLoad > generation) {
// It's always a possibility to not see the latest index-N in the listing here on an eventually consistent blob store, just
// debug log it. Any blobs leaked as a result of an inconsistent listing here will be cleaned up in a subsequent cleanup or
@@ -988,36 +1053,106 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
// Tracks the latest known repository generation in a best-effort way to detect inconsistent listing of root level index-N blobs
// and concurrent modifications.
private final AtomicLong latestKnownRepoGen = new AtomicLong(RepositoryData.EMPTY_REPO_GEN);
private final AtomicLong latestKnownRepoGen = new AtomicLong(RepositoryData.UNKNOWN_REPO_GEN);
@Override
public void getRepositoryData(ActionListener<RepositoryData> listener) {
ActionListener.completeWith(listener, () -> {
// Retry loading RepositoryData in a loop in case we run into concurrent modifications of the repository.
while (true) {
if (latestKnownRepoGen.get() == RepositoryData.CORRUPTED_REPO_GEN) {
listener.onFailure(corruptedStateException(null));
return;
}
// Retry loading RepositoryData in a loop in case we run into concurrent modifications of the repository.
while (true) {
final long genToLoad;
if (bestEffortConsistency) {
// We're only using #latestKnownRepoGen as a hint in this mode and listing repo contents as a secondary way of trying
// to find a higher generation
final long generation;
try {
generation = latestIndexBlobId();
} catch (IOException ioe) {
throw new RepositoryException(metadata.name(), "Could not determine repository generation from root blobs", ioe);
}
final long genToLoad = latestKnownRepoGen.updateAndGet(known -> Math.max(known, generation));
genToLoad = latestKnownRepoGen.updateAndGet(known -> Math.max(known, generation));
if (genToLoad > generation) {
logger.info("Determined repository generation [" + generation
+ "] from repository contents but correct generation must be at least [" + genToLoad + "]");
}
try {
return getRepositoryData(genToLoad);
} catch (RepositoryException e) {
if (genToLoad != latestKnownRepoGen.get()) {
logger.warn("Failed to load repository data generation [" + genToLoad +
"] because a concurrent operation moved the current generation to [" + latestKnownRepoGen.get() + "]", e);
continue;
}
} else {
// We only rely on the generation tracked in #latestKnownRepoGen which is exclusively updated from the cluster state
genToLoad = latestKnownRepoGen.get();
}
try {
listener.onResponse(getRepositoryData(genToLoad));
return;
} catch (RepositoryException e) {
if (genToLoad != latestKnownRepoGen.get()) {
logger.warn("Failed to load repository data generation [" + genToLoad +
"] because a concurrent operation moved the current generation to [" + latestKnownRepoGen.get() + "]", e);
continue;
}
if (bestEffortConsistency == false && ExceptionsHelper.unwrap(e, NoSuchFileException.class) != null) {
// We did not find the expected index-N even though the cluster state continues to point at the missing value
// of N so we mark this repository as corrupted.
markRepoCorrupted(genToLoad, e,
ActionListener.wrap(v -> listener.onFailure(corruptedStateException(e)), listener::onFailure));
return;
} else {
throw e;
}
}
});
}
}
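Because `getRepositoryData` is listener-based, callers that need a blocking result can bridge through `PlainActionFuture`, as the snapshot test helpers do (a sketch, not code from this diff):

```java
// Blocking retrieval of RepositoryData via the async API.
final PlainActionFuture<RepositoryData> future = PlainActionFuture.newFuture();
repository.getRepositoryData(future); // PlainActionFuture implements ActionListener
final RepositoryData repositoryData = future.actionGet();
```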
private RepositoryException corruptedStateException(@Nullable Exception cause) {
return new RepositoryException(metadata.name(),
"Could not read repository data because the contents of the repository do not match its " +
"expected state. This is likely the result of either concurrently modifying the contents of the " +
"repository by a process other than this cluster or an issue with the repository's underlying" +
"storage. The repository has been disabled to prevent corrupting its contents. To re-enable it " +
"and continue using it please remove the repository from the cluster and add it again to make " +
"the cluster recover the known state of the repository from its physical contents.", cause);
}
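The recovery procedure described in this message amounts to re-registering the repository, e.g. (repository name and settings are placeholders):

```java
// Remove the corrupted repository entry and register it again so the cluster
// re-reads the repository's physical contents to recover its generation.
client.admin().cluster().prepareDeleteRepository("my-repo").get();
client.admin().cluster().preparePutRepository("my-repo")
    .setType("fs")
    .setSettings(Settings.builder().put("location", repoPath)) // placeholders
    .get();
```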
/**
* Marks the repository as corrupted. This puts the repository in a state where its tracked value for
* {@link RepositoryMetaData#pendingGeneration()} is unchanged while its value for {@link RepositoryMetaData#generation()} is set to
* {@link RepositoryData#CORRUPTED_REPO_GEN}. In this state, the repository can no longer be used and must be removed and
* recreated after the problem that led to it being marked as corrupted has been fixed.
*
* @param corruptedGeneration generation that failed to load because the index file was not found but that should have loaded
* @param originalException exception that led to the failure to load the {@code index-N} blob
* @param listener listener to invoke once done
*/
private void markRepoCorrupted(long corruptedGeneration, Exception originalException, ActionListener<Void> listener) {
assert corruptedGeneration != RepositoryData.UNKNOWN_REPO_GEN;
assert bestEffortConsistency == false;
clusterService.submitStateUpdateTask("mark repository corrupted [" + metadata.name() + "][" + corruptedGeneration + "]",
new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
final RepositoriesMetaData state = currentState.metaData().custom(RepositoriesMetaData.TYPE);
final RepositoryMetaData repoState = state.repository(metadata.name());
if (repoState.generation() != corruptedGeneration) {
throw new IllegalStateException("Tried to mark repo generation [" + corruptedGeneration
+ "] as corrupted but its state concurrently changed to [" + repoState + "]");
}
return ClusterState.builder(currentState).metaData(MetaData.builder(currentState.metaData()).putCustom(
RepositoriesMetaData.TYPE, state.withUpdatedGeneration(
metadata.name(), RepositoryData.CORRUPTED_REPO_GEN, repoState.pendingGeneration())).build()).build();
}
@Override
public void onFailure(String source, Exception e) {
listener.onFailure(new RepositoryException(metadata.name(), "Failed marking repository state as corrupted",
ExceptionsHelper.useOrSuppress(e, originalException)));
}
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
listener.onResponse(null);
}
});
}
private RepositoryData getRepositoryData(long indexGen) {
@@ -1034,11 +1169,13 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
return RepositoryData.snapshotsFromXContent(parser, indexGen);
}
} catch (IOException ioe) {
// If we fail to load the generation we tracked in latestKnownRepoGen we reset it.
// This is done as a fail-safe in case a user manually deletes the contents of the repository in which case subsequent
// operations must start from the EMPTY_REPO_GEN again
if (latestKnownRepoGen.compareAndSet(indexGen, RepositoryData.EMPTY_REPO_GEN)) {
logger.warn("Resetting repository generation tracker because we failed to read generation [" + indexGen + "]", ioe);
if (bestEffortConsistency) {
// If we fail to load the generation we tracked in latestKnownRepoGen we reset it.
// This is done as a fail-safe in case a user manually deletes the contents of the repository in which case subsequent
// operations must start from the EMPTY_REPO_GEN again
if (latestKnownRepoGen.compareAndSet(indexGen, RepositoryData.EMPTY_REPO_GEN)) {
logger.warn("Resetting repository generation tracker because we failed to read generation [" + indexGen + "]", ioe);
}
}
throw new RepositoryException(metadata.name(), "could not read repository data from index blob", ioe);
}
@@ -1090,13 +1227,12 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
final RepositoryMetaData meta = getRepoMetaData(currentState);
final String repoName = metadata.name();
final long genInState = meta.generation();
// TODO: Remove all usages of this variable, instead initialize the generation when loading RepositoryData
final boolean uninitializedMeta = meta.generation() == RepositoryData.UNKNOWN_REPO_GEN;
final boolean uninitializedMeta = meta.generation() == RepositoryData.UNKNOWN_REPO_GEN || bestEffortConsistency;
if (uninitializedMeta == false && meta.pendingGeneration() != genInState) {
logger.info("Trying to write new repository data over unfinished write, repo [{}] is at " +
"safe generation [{}] and pending generation [{}]", meta.name(), genInState, meta.pendingGeneration());
}
assert expectedGen == RepositoryData.EMPTY_REPO_GEN || RepositoryData.UNKNOWN_REPO_GEN == meta.generation()
assert expectedGen == RepositoryData.EMPTY_REPO_GEN || uninitializedMeta
|| expectedGen == meta.generation() :
"Expected non-empty generation [" + expectedGen + "] does not match generation tracked in [" + meta + "]";
// If we run into the empty repo generation for the expected gen, the repo is assumed to have been cleared of
@@ -1107,7 +1243,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
// even if a repository has been manually cleared of all contents we will never reuse the same repository generation.
// This is motivated by the consistency behavior the S3 based blob repository implementation has to support which does
// not offer any consistency guarantees when it comes to overwriting the same blob name with different content.
newGen = uninitializedMeta ? expectedGen + 1: metadata.pendingGeneration() + 1;
final long nextPendingGen = metadata.pendingGeneration() + 1;
newGen = uninitializedMeta ? Math.max(expectedGen + 1, nextPendingGen) : nextPendingGen;
assert newGen > latestKnownRepoGen.get() : "Attempted new generation [" + newGen +
"] must be larger than latest known generation [" + latestKnownRepoGen.get() + "]";
return ClusterState.builder(currentState).metaData(MetaData.builder(currentState.getMetaData())
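One hypothetical combination of values showing why the `Math.max` matters: in best-effort mode the cluster state may not have tracked the repository yet, so `pendingGeneration` can lag far behind the generation found by listing:

```java
// Hypothetical values for the uninitializedMeta branch above: the cluster state
// has not tracked this repository yet, while listing found index-5.
long expectedGen = 5;                      // latest index-N found by listing
long pendingGeneration = -2;               // e.g. still RepositoryData.UNKNOWN_REPO_GEN
long nextPendingGen = pendingGeneration + 1;             // -1
long newGen = Math.max(expectedGen + 1, nextPendingGen); // 6 -> write index-6 next
// Without the max, newGen could fall below generations that already exist.
```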

BlobStoreRepositoryRestoreTests.java

@@ -27,6 +27,7 @@ import org.elasticsearch.cluster.metadata.RepositoryMetaData;
import org.elasticsearch.cluster.routing.RecoverySource;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingHelper;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.internal.io.IOUtils;
@@ -193,13 +194,16 @@ public class BlobStoreRepositoryRestoreTests extends IndexShardTestCase {
private Repository createRepository() {
Settings settings = Settings.builder().put("location", randomAlphaOfLength(10)).build();
RepositoryMetaData repositoryMetaData = new RepositoryMetaData(randomAlphaOfLength(10), FsRepository.TYPE, settings);
final FsRepository repository = new FsRepository(repositoryMetaData, createEnvironment(), xContentRegistry(),
BlobStoreTestUtil.mockClusterService(repositoryMetaData)) {
final ClusterService clusterService = BlobStoreTestUtil.mockClusterService(repositoryMetaData);
final FsRepository repository = new FsRepository(repositoryMetaData, createEnvironment(), xContentRegistry(), clusterService) {
@Override
protected void assertSnapshotOrGenericThread() {
// eliminate thread name check as we create repo manually
}
};
clusterService.addStateApplier(event -> repository.updateState(event.state()));
// Apply state once to initialize repo properly like RepositoriesService would
repository.updateState(clusterService.state());
repository.start();
return repository;
}

CorruptedBlobStoreRepositoryIT.java (new file)

@@ -0,0 +1,247 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.snapshots;
import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.metadata.RepositoriesMetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.repositories.Repository;
import org.elasticsearch.repositories.RepositoryData;
import org.elasticsearch.repositories.RepositoryException;
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
import java.nio.file.Files;
import java.nio.file.Path;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.is;
public class CorruptedBlobStoreRepositoryIT extends AbstractSnapshotIntegTestCase {
public void testConcurrentlyChangeRepositoryContents() throws Exception {
Client client = client();
Path repo = randomRepoPath();
final String repoName = "test-repo";
logger.info("--> creating repository at {}", repo.toAbsolutePath());
assertAcked(client.admin().cluster().preparePutRepository(repoName)
.setType("fs").setSettings(Settings.builder()
.put("location", repo)
.put("compress", false)
.put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES)));
createIndex("test-idx-1", "test-idx-2");
logger.info("--> indexing some data");
indexRandom(true,
client().prepareIndex().setIndex("test-idx-1").setSource("foo", "bar"),
client().prepareIndex().setIndex("test-idx-2").setSource("foo", "bar"));
final String snapshot = "test-snap";
logger.info("--> creating snapshot");
CreateSnapshotResponse createSnapshotResponse = client.admin().cluster().prepareCreateSnapshot(repoName, snapshot)
.setWaitForCompletion(true).setIndices("test-idx-*").get();
assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), greaterThan(0));
assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(),
equalTo(createSnapshotResponse.getSnapshotInfo().totalShards()));
logger.info("--> move index-N blob to next generation");
final RepositoryData repositoryData =
getRepositoryData(internalCluster().getMasterNodeInstance(RepositoriesService.class).repository(repoName));
Files.move(repo.resolve("index-" + repositoryData.getGenId()), repo.resolve("index-" + (repositoryData.getGenId() + 1)));
assertRepositoryBlocked(client, repoName, snapshot);
if (randomBoolean()) {
logger.info("--> move index-N blob back to initial generation");
Files.move(repo.resolve("index-" + (repositoryData.getGenId() + 1)), repo.resolve("index-" + repositoryData.getGenId()));
logger.info("--> verify repository remains blocked");
assertRepositoryBlocked(client, repoName, snapshot);
}
logger.info("--> remove repository");
assertAcked(client.admin().cluster().prepareDeleteRepository(repoName));
logger.info("--> recreate repository");
assertAcked(client.admin().cluster().preparePutRepository(repoName)
.setType("fs").setSettings(Settings.builder()
.put("location", repo)
.put("compress", false)
.put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES)));
logger.info("--> delete snapshot");
client.admin().cluster().prepareDeleteSnapshot(repoName, snapshot).get();
logger.info("--> make sure snapshot doesn't exist");
expectThrows(SnapshotMissingException.class, () -> client.admin().cluster().prepareGetSnapshots(repoName)
.addSnapshots(snapshot).get());
}
public void testConcurrentlyChangeRepositoryContentsInBwCMode() throws Exception {
Client client = client();
Path repo = randomRepoPath();
final String repoName = "test-repo";
logger.info("--> creating repository at {}", repo.toAbsolutePath());
assertAcked(client.admin().cluster().preparePutRepository(repoName)
.setType("fs").setSettings(Settings.builder()
.put("location", repo)
.put("compress", false)
.put(BlobStoreRepository.ALLOW_CONCURRENT_MODIFICATION.getKey(), true)
.put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES)));
createIndex("test-idx-1", "test-idx-2");
logger.info("--> indexing some data");
indexRandom(true,
client().prepareIndex().setIndex("test-idx-1").setSource("foo", "bar"),
client().prepareIndex().setIndex("test-idx-2").setSource("foo", "bar"));
final String snapshot = "test-snap";
logger.info("--> creating snapshot");
CreateSnapshotResponse createSnapshotResponse = client.admin().cluster().prepareCreateSnapshot(repoName, snapshot)
.setWaitForCompletion(true).setIndices("test-idx-*").get();
assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), greaterThan(0));
assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(),
equalTo(createSnapshotResponse.getSnapshotInfo().totalShards()));
final Repository repository = internalCluster().getMasterNodeInstance(RepositoriesService.class).repository(repoName);
logger.info("--> move index-N blob to next generation");
final RepositoryData repositoryData = getRepositoryData(repository);
final long beforeMoveGen = repositoryData.getGenId();
Files.move(repo.resolve("index-" + beforeMoveGen), repo.resolve("index-" + (beforeMoveGen + 1)));
logger.info("--> verify index-N blob is found at the new location");
assertThat(getRepositoryData(repository).getGenId(), is(beforeMoveGen + 1));
logger.info("--> delete snapshot");
client.admin().cluster().prepareDeleteSnapshot(repoName, snapshot).get();
logger.info("--> verify index-N blob is found at the expected location");
assertThat(getRepositoryData(repository).getGenId(), is(beforeMoveGen + 2));
logger.info("--> make sure snapshot doesn't exist");
expectThrows(SnapshotMissingException.class, () -> client.admin().cluster().prepareGetSnapshots(repoName)
.addSnapshots(snapshot).get());
}
public void testFindDanglingLatestGeneration() throws Exception {
Path repo = randomRepoPath();
final String repoName = "test-repo";
logger.info("--> creating repository at {}", repo.toAbsolutePath());
assertAcked(client().admin().cluster().preparePutRepository(repoName)
.setType("fs").setSettings(Settings.builder()
.put("location", repo)
.put("compress", false)
.put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES)));
createIndex("test-idx-1", "test-idx-2");
logger.info("--> indexing some data");
indexRandom(true,
client().prepareIndex().setIndex("test-idx-1").setSource("foo", "bar"),
client().prepareIndex().setIndex("test-idx-2").setSource("foo", "bar"));
final String snapshot = "test-snap";
logger.info("--> creating snapshot");
CreateSnapshotResponse createSnapshotResponse = client().admin().cluster().prepareCreateSnapshot(repoName, snapshot)
.setWaitForCompletion(true).setIndices("test-idx-*").get();
assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), greaterThan(0));
assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(),
equalTo(createSnapshotResponse.getSnapshotInfo().totalShards()));
final Repository repository = internalCluster().getCurrentMasterNodeInstance(RepositoriesService.class).repository(repoName);
logger.info("--> move index-N blob to next generation");
final RepositoryData repositoryData = getRepositoryData(repository);
final long beforeMoveGen = repositoryData.getGenId();
Files.move(repo.resolve("index-" + beforeMoveGen), repo.resolve("index-" + (beforeMoveGen + 1)));
logger.info("--> set next generation as pending in the cluster state");
final PlainActionFuture<Void> csUpdateFuture = PlainActionFuture.newFuture();
internalCluster().getCurrentMasterNodeInstance(ClusterService.class).submitStateUpdateTask("set pending generation",
new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
return ClusterState.builder(currentState).metaData(MetaData.builder(currentState.getMetaData())
.putCustom(RepositoriesMetaData.TYPE,
currentState.metaData().<RepositoriesMetaData>custom(RepositoriesMetaData.TYPE).withUpdatedGeneration(
repository.getMetadata().name(), beforeMoveGen, beforeMoveGen + 1)).build()).build();
}
@Override
public void onFailure(String source, Exception e) {
csUpdateFuture.onFailure(e);
}
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
csUpdateFuture.onResponse(null);
}
}
);
csUpdateFuture.get();
logger.info("--> full cluster restart");
internalCluster().fullRestart();
ensureGreen();
Repository repositoryAfterRestart = internalCluster().getCurrentMasterNodeInstance(RepositoriesService.class).repository(repoName);
logger.info("--> verify index-N blob is found at the new location");
assertThat(getRepositoryData(repositoryAfterRestart).getGenId(), is(beforeMoveGen + 1));
logger.info("--> delete snapshot");
client().admin().cluster().prepareDeleteSnapshot(repoName, snapshot).get();
logger.info("--> verify index-N blob is found at the expected location");
assertThat(getRepositoryData(repositoryAfterRestart).getGenId(), is(beforeMoveGen + 2));
logger.info("--> make sure snapshot doesn't exist");
expectThrows(SnapshotMissingException.class, () -> client().admin().cluster().prepareGetSnapshots(repoName)
.addSnapshots(snapshot).get());
}
private void assertRepositoryBlocked(Client client, String repo, String existingSnapshot) {
logger.info("--> try to delete snapshot");
final RepositoryException repositoryException3 = expectThrows(RepositoryException.class,
() -> client.admin().cluster().prepareDeleteSnapshot(repo, existingSnapshot).execute().actionGet());
assertThat(repositoryException3.getMessage(),
containsString("Could not read repository data because the contents of the repository do not match its expected state."));
logger.info("--> try to create snapshot");
final RepositoryException repositoryException4 = expectThrows(RepositoryException.class,
() -> client.admin().cluster().prepareCreateSnapshot(repo, existingSnapshot).execute().actionGet());
assertThat(repositoryException4.getMessage(),
containsString("Could not read repository data because the contents of the repository do not match its expected state."));
}
}

MockEventuallyConsistentRepositoryTests.java

@@ -140,6 +140,8 @@ public class MockEventuallyConsistentRepositoryTests extends ESTestCase {
try (BlobStoreRepository repository =
new MockEventuallyConsistentRepository(metaData, xContentRegistry(), clusterService, blobStoreContext, random())) {
clusterService.addStateApplier(event -> repository.updateState(event.state()));
// Apply state once to initialize repo properly like RepositoriesService would
repository.updateState(clusterService.state());
repository.start();
// We create a snap- blob for snapshot "foo" in the first generation

AbstractThirdPartyRepositoryTestCase.java

@@ -75,6 +75,7 @@ public abstract class AbstractThirdPartyRepositoryTestCase extends ESSingleNodeT
@Override
public void tearDown() throws Exception {
deleteAndAssertEmpty(getRepository().basePath());
client().admin().cluster().prepareDeleteRepository("test-repo").get();
super.tearDown();
}
@@ -168,8 +169,6 @@ public abstract class AbstractThirdPartyRepositoryTestCase extends ESSingleNodeT
}
public void testCleanup() throws Exception {
createRepository("test-repo");
createIndex("test-idx-1");
createIndex("test-idx-2");
createIndex("test-idx-3");

BlobStoreTestUtil.java

@@ -19,6 +19,7 @@
package org.elasticsearch.repositories.blobstore;
import org.apache.lucene.util.SameThreadExecutorService;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.cluster.ClusterChangedEvent;
@@ -29,6 +30,8 @@ import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.metadata.RepositoriesMetaData;
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterApplierService;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
@@ -69,6 +72,7 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import static org.elasticsearch.test.ESTestCase.buildNewFakeTransportAddress;
import static org.elasticsearch.test.ESTestCase.randomIntBetween;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.empty;
@@ -326,7 +330,11 @@ public final class BlobStoreTestUtil {
final ClusterService clusterService = mock(ClusterService.class);
final ClusterApplierService clusterApplierService = mock(ClusterApplierService.class);
when(clusterService.getClusterApplierService()).thenReturn(clusterApplierService);
final AtomicReference<ClusterState> currentState = new AtomicReference<>(initialState);
// Setting local node as master so it may update the repository metadata in the cluster state
final DiscoveryNode localNode = new DiscoveryNode("", buildNewFakeTransportAddress(), Version.CURRENT);
final AtomicReference<ClusterState> currentState = new AtomicReference<>(
ClusterState.builder(initialState).nodes(
DiscoveryNodes.builder().add(localNode).masterNodeId(localNode.getId()).localNodeId(localNode.getId()).build()).build());
when(clusterService.state()).then(invocationOnMock -> currentState.get());
final List<ClusterStateApplier> appliers = new CopyOnWriteArrayList<>();
doAnswer(invocation -> {

SourceOnlySnapshotShardTests.java

@@ -32,6 +32,7 @@ import org.elasticsearch.cluster.routing.RecoverySource;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.TestShardRouting;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.lucene.uid.Versions;
@@ -353,8 +354,12 @@ public class SourceOnlySnapshotShardTests extends IndexShardTestCase {
private Repository createRepository() {
Settings settings = Settings.builder().put("location", randomAlphaOfLength(10)).build();
RepositoryMetaData repositoryMetaData = new RepositoryMetaData(randomAlphaOfLength(10), FsRepository.TYPE, settings);
return new FsRepository(repositoryMetaData, createEnvironment(), xContentRegistry(),
BlobStoreTestUtil.mockClusterService(repositoryMetaData));
final ClusterService clusterService = BlobStoreTestUtil.mockClusterService(repositoryMetaData);
final Repository repository = new FsRepository(repositoryMetaData, createEnvironment(), xContentRegistry(), clusterService);
clusterService.addStateApplier(e -> repository.updateState(e.state()));
// Apply state once to initialize repo properly like RepositoriesService would
repository.updateState(clusterService.state());
return repository;
}
private static void runAsSnapshot(ThreadPool pool, Runnable runnable) {