Fix Race in Snapshot Abort (#54873) (#55233)

We can be a little more efficient when aborting a snapshot. Since we know the new repository
data after finalizing the aborted snapshot when can pass it down to the snapshot completion listeners.
This way, we don't have to fork off to the snapshot threadpool to get the repository data when the listener completes and can directly submit the delete task with high priority straight from the cluster state thread.
This commit is contained in:
Armin Braun 2020-04-15 15:42:15 +02:00 committed by GitHub
parent acf2acba4a
commit 2f91e2aab7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 99 additions and 98 deletions

View File

@ -33,12 +33,14 @@ import org.elasticsearch.common.blobstore.BlobStore;
import org.elasticsearch.common.logging.DeprecationLogger;
import org.elasticsearch.common.settings.SecureSetting;
import org.elasticsearch.common.settings.SecureString;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.monitor.jvm.JvmInfo;
import org.elasticsearch.repositories.RepositoryData;
import org.elasticsearch.repositories.RepositoryException;
import org.elasticsearch.repositories.ShardGenerations;
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
@ -272,7 +274,7 @@ class S3Repository extends BlobStoreRepository {
public void finalizeSnapshot(SnapshotId snapshotId, ShardGenerations shardGenerations, long startTime, String failure, int totalShards,
List<SnapshotShardFailure> shardFailures, long repositoryStateId, boolean includeGlobalState,
Metadata clusterMetadata, Map<String, Object> userMetadata, Version repositoryMetaVersion,
ActionListener<SnapshotInfo> listener) {
ActionListener<Tuple<RepositoryData, SnapshotInfo>> listener) {
if (SnapshotsService.useShardGenerations(repositoryMetaVersion) == false) {
listener = delayedListener(listener);
}

View File

@ -26,6 +26,7 @@ import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.metadata.RepositoryMetadata;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.component.Lifecycle;
import org.elasticsearch.common.component.LifecycleListener;
import org.elasticsearch.index.mapper.MapperService;
@ -83,7 +84,7 @@ public class FilterRepository implements Repository {
public void finalizeSnapshot(SnapshotId snapshotId, ShardGenerations shardGenerations, long startTime, String failure,
int totalShards, List<SnapshotShardFailure> shardFailures, long repositoryStateId,
boolean includeGlobalState, Metadata metadata, Map<String, Object> userMetadata,
Version repositoryMetaVersion, ActionListener<SnapshotInfo> listener) {
Version repositoryMetaVersion, ActionListener<Tuple<RepositoryData, SnapshotInfo>> listener) {
in.finalizeSnapshot(snapshotId, shardGenerations, startTime, failure, totalShards, shardFailures, repositoryStateId,
includeGlobalState, metadata, userMetadata, repositoryMetaVersion, listener);
}

View File

@ -28,6 +28,7 @@ import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.metadata.RepositoryMetadata;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.component.LifecycleComponent;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.shard.ShardId;
@ -140,12 +141,13 @@ public interface Repository extends LifecycleComponent {
* @param clusterMetadata cluster metadata
* @param userMetadata user metadata
* @param repositoryMetaVersion version of the updated repository metadata to write
* @param listener listener to be called on completion of the snapshot
* @param listener listener to be invoked with the new {@link RepositoryData} and the snapshot's {@link SnapshotInfo}
* completion of the snapshot
*/
void finalizeSnapshot(SnapshotId snapshotId, ShardGenerations shardGenerations, long startTime, String failure,
int totalShards, List<SnapshotShardFailure> shardFailures, long repositoryStateId,
boolean includeGlobalState, Metadata clusterMetadata, Map<String, Object> userMetadata,
Version repositoryMetaVersion, ActionListener<SnapshotInfo> listener);
Version repositoryMetaVersion, ActionListener<Tuple<RepositoryData, SnapshotInfo>> listener);
/**
* Deletes snapshot

View File

@ -912,7 +912,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
final Metadata clusterMetadata,
final Map<String, Object> userMetadata,
Version repositoryMetaVersion,
final ActionListener<SnapshotInfo> listener) {
final ActionListener<Tuple<RepositoryData, SnapshotInfo>> listener) {
assert repositoryStateId > RepositoryData.UNKNOWN_REPO_GEN :
"Must finalize based on a valid repository generation but received [" + repositoryStateId + "]";
final Collection<IndexId> indices = shardGenerations.indices();
@ -930,11 +930,11 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
getRepositoryData(ActionListener.wrap(existingRepositoryData -> {
final RepositoryData updatedRepositoryData =
existingRepositoryData.addSnapshot(snapshotId, snapshotInfo.state(), Version.CURRENT, shardGenerations);
writeIndexGen(updatedRepositoryData, repositoryStateId, writeShardGens, ActionListener.wrap(v -> {
writeIndexGen(updatedRepositoryData, repositoryStateId, writeShardGens, ActionListener.wrap(writtenRepoData -> {
if (writeShardGens) {
cleanupOldShardGens(existingRepositoryData, updatedRepositoryData);
}
listener.onResponse(snapshotInfo);
listener.onResponse(new Tuple<>(writtenRepoData, snapshotInfo));
}, onUpdateFailure));
}, onUpdateFailure));
}, onUpdateFailure), 2 + indices.size());
@ -1321,7 +1321,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
* @param writeShardGens whether to write {@link ShardGenerations} to the new {@link RepositoryData} blob
* @param listener completion listener
*/
protected void writeIndexGen(RepositoryData repositoryData, long expectedGen, boolean writeShardGens, ActionListener<Void> listener) {
protected void writeIndexGen(RepositoryData repositoryData, long expectedGen, boolean writeShardGens,
ActionListener<RepositoryData> listener) {
assert isReadOnly() == false; // can not write to a read only repository
final long currentGen = repositoryData.getGenId();
if (currentGen != expectedGen) {
@ -1468,8 +1469,9 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
cacheRepositoryData(filteredRepositoryData.withGenId(newGen));
threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.run(listener, () -> {
final RepositoryData writtenRepositoryData = filteredRepositoryData.withGenId(newGen);
cacheRepositoryData(writtenRepositoryData);
threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.supply(listener, () -> {
// Delete all now outdated index files up to 1000 blobs back from the new generation.
// If there are more than 1000 dangling index-N cleanup functionality on repo delete will take care of them.
// Deleting one older than the current expectedGen is done for BwC reasons as older versions used to keep
@ -1483,6 +1485,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
} catch (IOException e) {
logger.warn(() -> new ParameterizedMessage("Failed to clean up old index blobs {}", oldIndexN), e);
}
return writtenRepositoryData;
}));
}
});

View File

@ -121,7 +121,8 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
private final ThreadPool threadPool;
private final Map<Snapshot, List<ActionListener<SnapshotInfo>>> snapshotCompletionListeners = new ConcurrentHashMap<>();
private final Map<Snapshot, List<ActionListener<Tuple<RepositoryData, SnapshotInfo>>>> snapshotCompletionListeners =
new ConcurrentHashMap<>();
// Set of snapshots that are currently being initialized by this node
private final Set<Snapshot> initializingSnapshots = Collections.synchronizedSet(new HashSet<>());
@ -150,7 +151,8 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
* @param listener snapshot completion listener
*/
public void executeSnapshot(final CreateSnapshotRequest request, final ActionListener<SnapshotInfo> listener) {
createSnapshot(request, ActionListener.wrap(snapshot -> addListener(snapshot, listener), listener::onFailure));
createSnapshot(request,
ActionListener.wrap(snapshot -> addListener(snapshot, ActionListener.map(listener, Tuple::v2)), listener::onFailure));
}
/**
@ -449,7 +451,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
});
}
private class CleanupAfterErrorListener implements ActionListener<SnapshotInfo> {
private class CleanupAfterErrorListener {
private final SnapshotsInProgress.Entry snapshot;
private final boolean snapshotCreated;
@ -464,12 +466,6 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
this.e = e;
}
@Override
public void onResponse(SnapshotInfo snapshotInfo) {
cleanupAfterError(this.e);
}
@Override
public void onFailure(@Nullable Exception e) {
if (snapshotCreated) {
cleanupAfterError(ExceptionsHelper.useOrSuppress(e, this.e));
@ -899,8 +895,9 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
metadataForSnapshot(entry, metadata),
entry.userMetadata(),
entry.version(),
ActionListener.wrap(snapshotInfo -> {
removeSnapshotFromClusterState(snapshot, snapshotInfo, null);
ActionListener.wrap(result -> {
final SnapshotInfo snapshotInfo = result.v2();
removeSnapshotFromClusterState(snapshot, result, null);
logger.info("snapshot [{}] completed with state [{}]", snapshot, snapshotInfo.state());
}, this::onFailure));
}
@ -926,11 +923,12 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
/**
* Removes record of running snapshot from cluster state
* @param snapshot snapshot
* @param snapshotInfo snapshot info if snapshot was successful
* @param snapshotResult new {@link RepositoryData} and {@link SnapshotInfo} info if snapshot was successful
* @param e exception if snapshot failed, {@code null} otherwise
*/
private void removeSnapshotFromClusterState(final Snapshot snapshot, final SnapshotInfo snapshotInfo, @Nullable Exception e) {
removeSnapshotFromClusterState(snapshot, snapshotInfo, e, null);
private void removeSnapshotFromClusterState(final Snapshot snapshot, @Nullable Tuple<RepositoryData, SnapshotInfo> snapshotResult,
@Nullable Exception e) {
removeSnapshotFromClusterState(snapshot, snapshotResult, e, null);
}
/**
@ -939,9 +937,9 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
* @param failure exception if snapshot failed, {@code null} otherwise
* @param listener listener to notify when snapshot information is removed from the cluster state
*/
private void removeSnapshotFromClusterState(final Snapshot snapshot, @Nullable SnapshotInfo snapshotInfo, @Nullable Exception failure,
@Nullable CleanupAfterErrorListener listener) {
assert snapshotInfo != null || failure != null : "Either snapshotInfo or failure must be supplied";
private void removeSnapshotFromClusterState(final Snapshot snapshot, @Nullable Tuple<RepositoryData, SnapshotInfo> snapshotResult,
@Nullable Exception failure, @Nullable CleanupAfterErrorListener listener) {
assert snapshotResult != null || failure != null : "Either snapshotInfo or failure must be supplied";
clusterService.submitStateUpdateTask("remove snapshot metadata", new ClusterStateUpdateTask() {
@Override
@ -986,13 +984,14 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
if (snapshotInfo == null) {
if (snapshotResult == null) {
failSnapshotCompletionListeners(snapshot, failure);
} else {
final List<ActionListener<SnapshotInfo>> completionListeners = snapshotCompletionListeners.remove(snapshot);
final List<ActionListener<Tuple<RepositoryData, SnapshotInfo>>> completionListeners =
snapshotCompletionListeners.remove(snapshot);
if (completionListeners != null) {
try {
ActionListener.onResponse(completionListeners, snapshotInfo);
ActionListener.onResponse(completionListeners, snapshotResult);
} catch (Exception e) {
logger.warn("Failed to notify listeners", e);
}
@ -1007,7 +1006,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
}
private void failSnapshotCompletionListeners(Snapshot snapshot, Exception e) {
final List<ActionListener<SnapshotInfo>> completionListeners = snapshotCompletionListeners.remove(snapshot);
final List<ActionListener<Tuple<RepositoryData, SnapshotInfo>>> completionListeners = snapshotCompletionListeners.remove(snapshot);
if (completionListeners != null) {
try {
ActionListener.onFailure(completionListeners, e);
@ -1108,14 +1107,30 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
if (runningSnapshot == null) {
tryDeleteExisting(Priority.NORMAL);
threadPool.generic().execute(ActionRunnable.wrap(listener, l ->
repositoriesService.repository(repositoryName).getRepositoryData(
ActionListener.wrap(repositoryData -> {
Optional<SnapshotId> matchedEntry = repositoryData.getSnapshotIds()
.stream()
.filter(s -> s.getName().equals(snapshotName))
.findFirst();
// If we can't find the snapshot by the given name in the repository at all or if the snapshot we find in
// the repository is not the one we expected to find when waiting for a finishing snapshot we fail.
if (matchedEntry.isPresent()) {
deleteCompletedSnapshot(
new Snapshot(repositoryName, matchedEntry.get()), repositoryData.getGenId(), Priority.NORMAL, l);
} else {
l.onFailure(new SnapshotMissingException(repositoryName, snapshotName));
}
}, l::onFailure))));
return;
}
logger.trace("adding snapshot completion listener to wait for deleted snapshot to finish");
addListener(runningSnapshot, ActionListener.wrap(
snapshotInfo -> {
result -> {
logger.debug("deleted snapshot completed - deleting files");
tryDeleteExisting(Priority.IMMEDIATE);
deleteCompletedSnapshot(
new Snapshot(repositoryName, result.v2().snapshotId()), result.v1().getGenId(), Priority.IMMEDIATE, listener);
},
e -> {
if (abortedDuringInit) {
@ -1136,33 +1151,6 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
}
));
}
private void tryDeleteExisting(Priority priority) {
threadPool.generic().execute(ActionRunnable.wrap(listener, l ->
repositoriesService.repository(repositoryName).getRepositoryData(ActionListener.wrap(repositoryData -> {
Optional<SnapshotId> matchedEntry = repositoryData.getSnapshotIds()
.stream()
.filter(s -> s.getName().equals(snapshotName))
.findFirst();
// If we can't find the snapshot by the given name in the repository at all or if the snapshot we find in the
// repository is not the one we expected to find when waiting for a finishing snapshot we fail.
// Note: Not finding a snapshot we expected to find is practically impossible as it would imply that the snapshot
// we waited for was concurrently deleted and another snapshot by the same name concurrently created
// during the context switch from the cluster state thread to the snapshot thread. We still guard against the
// possibility as a safety measure.
if (matchedEntry.isPresent() == false
|| (runningSnapshot != null && matchedEntry.get().equals(runningSnapshot.getSnapshotId()) == false)) {
if (runningSnapshot != null && matchedEntry.isPresent()) {
logger.warn("Waited for snapshot [{}}] but found snapshot [{}] in repository [{}]",
runningSnapshot.getSnapshotId(), matchedEntry.get(), repositoryName);
}
l.onFailure(new SnapshotMissingException(repositoryName, snapshotName));
} else {
deleteCompletedSnapshot(
new Snapshot(repositoryName, matchedEntry.get()), repositoryData.getGenId(), priority, l);
}
}, l::onFailure))));
}
});
}
@ -1470,7 +1458,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
* @param snapshot Snapshot to listen for
* @param listener listener
*/
private void addListener(Snapshot snapshot, ActionListener<SnapshotInfo> listener) {
private void addListener(Snapshot snapshot, ActionListener<Tuple<RepositoryData, SnapshotInfo>> listener) {
snapshotCompletionListeners.computeIfAbsent(snapshot, k -> new CopyOnWriteArrayList<>()).add(listener);
}

View File

@ -31,6 +31,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.component.Lifecycle;
import org.elasticsearch.common.component.LifecycleListener;
import org.elasticsearch.common.settings.Settings;
@ -164,7 +165,7 @@ public class RepositoriesServiceTests extends ESTestCase {
public void finalizeSnapshot(SnapshotId snapshotId, ShardGenerations indices, long startTime, String failure,
int totalShards, List<SnapshotShardFailure> shardFailures, long repositoryStateId,
boolean includeGlobalState, Metadata metadata, Map<String, Object> userMetadata,
Version repositoryMetaVersion, ActionListener<SnapshotInfo> listener) {
Version repositoryMetaVersion, ActionListener<Tuple<RepositoryData, SnapshotInfo>> listener) {
listener.onResponse(null);
}

View File

@ -30,6 +30,7 @@ import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingHelper;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.env.Environment;
@ -45,6 +46,7 @@ import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.store.StoreFileMetadata;
import org.elasticsearch.repositories.IndexId;
import org.elasticsearch.repositories.Repository;
import org.elasticsearch.repositories.RepositoryData;
import org.elasticsearch.repositories.ShardGenerations;
import org.elasticsearch.repositories.fs.FsRepository;
import org.elasticsearch.snapshots.Snapshot;
@ -146,7 +148,7 @@ public class BlobStoreRepositoryRestoreTests extends IndexShardTestCase {
}
}
public void testSnapshotWithConflictingName() throws IOException {
public void testSnapshotWithConflictingName() throws Exception {
final IndexId indexId = new IndexId(randomAlphaOfLength(10), UUIDs.randomBase64UUID());
final ShardId shardId = new ShardId(indexId.getName(), indexId.getId(), 0);
@ -170,13 +172,12 @@ public class BlobStoreRepositoryRestoreTests extends IndexShardTestCase {
assertNotNull(shardGen);
final Snapshot snapshotWithSameName = new Snapshot(repository.getMetadata().name(), new SnapshotId(
snapshot.getSnapshotId().getName(), "_uuid2"));
final PlainActionFuture<SnapshotInfo> future = PlainActionFuture.newFuture();
repository.finalizeSnapshot(snapshot.getSnapshotId(),
ShardGenerations.builder().put(indexId, 0, shardGen).build(),
0L, null, 1, Collections.emptyList(), -1L, false,
Metadata.builder().put(shard.indexSettings().getIndexMetadata(), false).build(), Collections.emptyMap(), Version.CURRENT,
future);
future.actionGet();
PlainActionFuture.<Tuple<RepositoryData, SnapshotInfo>, Exception>get(f ->
repository.finalizeSnapshot(snapshot.getSnapshotId(),
ShardGenerations.builder().put(indexId, 0, shardGen).build(),
0L, null, 1, Collections.emptyList(), -1L, false,
Metadata.builder().put(shard.indexSettings().getIndexMetadata(), false).build(), Collections.emptyMap(),
Version.CURRENT, f));
IndexShardSnapshotFailedException isfe = expectThrows(IndexShardSnapshotFailedException.class,
() -> snapshotShard(shard, snapshotWithSameName, repository));
assertThat(isfe.getMessage(), containsString("Duplicate snapshot name"));

View File

@ -197,7 +197,7 @@ public class BlobStoreRepositoryTests extends ESSingleNodeTestCase {
// write to index generational file
RepositoryData repositoryData = generateRandomRepoData();
final long startingGeneration = repositoryData.getGenId();
final PlainActionFuture<Void> future1 = PlainActionFuture.newFuture();
final PlainActionFuture<RepositoryData> future1 = PlainActionFuture.newFuture();
repository.writeIndexGen(repositoryData, startingGeneration, true, future1);
// write repo data again to index generational file, errors because we already wrote to the
@ -238,10 +238,8 @@ public class BlobStoreRepositoryTests extends ESSingleNodeTestCase {
" See the breaking changes documentation for the next major version.");
}
private static void writeIndexGen(BlobStoreRepository repository, RepositoryData repositoryData, long generation) {
final PlainActionFuture<Void> future = PlainActionFuture.newFuture();
repository.writeIndexGen(repositoryData, generation, true, future);
future.actionGet();
private static void writeIndexGen(BlobStoreRepository repository, RepositoryData repositoryData, long generation) throws Exception {
PlainActionFuture.<RepositoryData, Exception>get(f -> repository.writeIndexGen(repositoryData, generation, true, f));
}
private BlobStoreRepository setupRepo() {

View File

@ -23,6 +23,7 @@ import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.env.Environment;
@ -33,6 +34,7 @@ import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.plugins.RepositoryPlugin;
import org.elasticsearch.repositories.IndexId;
import org.elasticsearch.repositories.Repository;
import org.elasticsearch.repositories.RepositoryData;
import org.elasticsearch.repositories.ShardGenerations;
import org.elasticsearch.repositories.fs.FsRepository;
import org.elasticsearch.test.ESIntegTestCase;
@ -87,7 +89,8 @@ public class RepositoryFilterUserMetadataIT extends ESIntegTestCase {
public void finalizeSnapshot(SnapshotId snapshotId, ShardGenerations shardGenerations, long startTime, String failure,
int totalShards, List<SnapshotShardFailure> shardFailures, long repositoryStateId,
boolean includeGlobalState, Metadata clusterMetadata, Map<String, Object> userMetadata,
Version repositoryMetaVersion, ActionListener<SnapshotInfo> listener) {
Version repositoryMetaVersion,
ActionListener<Tuple<RepositoryData, SnapshotInfo>> listener) {
assertThat(userMetadata, is(Collections.singletonMap(MOCK_FILTERED_META, initialMetaValue)));
super.finalizeSnapshot(snapshotId, shardGenerations, startTime, failure, totalShards, shardFailures,
repositoryStateId, includeGlobalState, clusterMetadata, userMetadata, repositoryMetaVersion, listener);

View File

@ -25,7 +25,9 @@ import org.elasticsearch.cluster.metadata.RepositoryMetadata;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.repositories.RepositoryData;
import org.elasticsearch.repositories.ShardGenerations;
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
import org.elasticsearch.repositories.blobstore.BlobStoreTestUtil;
@ -134,7 +136,7 @@ public class MockEventuallyConsistentRepositoryTests extends ESTestCase {
}
}
public void testOverwriteSnapshotInfoBlob() {
public void testOverwriteSnapshotInfoBlob() throws Exception {
MockEventuallyConsistentRepository.Context blobStoreContext = new MockEventuallyConsistentRepository.Context();
final RepositoryMetadata metadata = new RepositoryMetadata("testRepo", "mockEventuallyConsistent", Settings.EMPTY);
final ClusterService clusterService = BlobStoreTestUtil.mockClusterService(metadata);
@ -146,30 +148,24 @@ public class MockEventuallyConsistentRepositoryTests extends ESTestCase {
repository.start();
// We create a snap- blob for snapshot "foo" in the first generation
final PlainActionFuture<SnapshotInfo> future = PlainActionFuture.newFuture();
final SnapshotId snapshotId = new SnapshotId("foo", UUIDs.randomBase64UUID());
// We try to write another snap- blob for "foo" in the next generation. It fails because the content differs.
repository.finalizeSnapshot(snapshotId, ShardGenerations.EMPTY, 1L, null, 5, Collections.emptyList(),
-1L, false, Metadata.EMPTY_METADATA, Collections.emptyMap(), Version.CURRENT, future);
future.actionGet();
PlainActionFuture.<Tuple<RepositoryData, SnapshotInfo>, Exception>get(f ->
// We try to write another snap- blob for "foo" in the next generation. It fails because the content differs.
repository.finalizeSnapshot(snapshotId, ShardGenerations.EMPTY, 1L, null, 5, Collections.emptyList(),
-1L, false, Metadata.EMPTY_METADATA, Collections.emptyMap(), Version.CURRENT, f));
// We try to write another snap- blob for "foo" in the next generation. It fails because the content differs.
final AssertionError assertionError = expectThrows(AssertionError.class,
() -> {
final PlainActionFuture<SnapshotInfo> fut = PlainActionFuture.newFuture();
repository.finalizeSnapshot(
snapshotId, ShardGenerations.EMPTY, 1L, null, 6, Collections.emptyList(),
0, false, Metadata.EMPTY_METADATA, Collections.emptyMap(), Version.CURRENT, fut);
fut.actionGet();
});
() -> PlainActionFuture.<Tuple<RepositoryData, SnapshotInfo>, Exception>get(f ->
repository.finalizeSnapshot(snapshotId, ShardGenerations.EMPTY, 1L, null, 6, Collections.emptyList(),
0, false, Metadata.EMPTY_METADATA, Collections.emptyMap(), Version.CURRENT, f)));
assertThat(assertionError.getMessage(), equalTo("\nExpected: <6>\n but: was <5>"));
// We try to write yet another snap- blob for "foo" in the next generation.
// It passes cleanly because the content of the blob except for the timestamps.
final PlainActionFuture<SnapshotInfo> future2 = PlainActionFuture.newFuture();
repository.finalizeSnapshot(snapshotId, ShardGenerations.EMPTY, 1L, null, 5, Collections.emptyList(),
0, false, Metadata.EMPTY_METADATA, Collections.emptyMap(),Version.CURRENT, future2);
future2.actionGet();
PlainActionFuture.<Tuple<RepositoryData, SnapshotInfo>, Exception>get(f ->
repository.finalizeSnapshot(snapshotId, ShardGenerations.EMPTY, 1L, null, 5, Collections.emptyList(),
0, false, Metadata.EMPTY_METADATA, Collections.emptyMap(), Version.CURRENT, f));
}
}

View File

@ -26,6 +26,7 @@ import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.metadata.RepositoryMetadata;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus;
@ -101,7 +102,7 @@ public abstract class RestoreOnlyRepository extends AbstractLifecycleComponent i
public void finalizeSnapshot(SnapshotId snapshotId, ShardGenerations shardGenerations, long startTime, String failure,
int totalShards, List<SnapshotShardFailure> shardFailures, long repositoryStateId,
boolean includeGlobalState, Metadata metadata, Map<String, Object> userMetadata,
Version repositoryMetaVersion, ActionListener<SnapshotInfo> listener) {
Version repositoryMetaVersion, ActionListener<Tuple<RepositoryData, SnapshotInfo>> listener) {
listener.onResponse(null);
}

View File

@ -33,6 +33,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.metrics.CounterMetric;
@ -261,7 +262,7 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit
public void finalizeSnapshot(SnapshotId snapshotId, ShardGenerations shardGenerations, long startTime, String failure, int totalShards,
List<SnapshotShardFailure> shardFailures, long repositoryStateId, boolean includeGlobalState,
Metadata metadata, Map<String, Object> userMetadata, Version repositoryMetaVersion,
ActionListener<SnapshotInfo> listener) {
ActionListener<Tuple<RepositoryData, SnapshotInfo>> listener) {
throw new UnsupportedOperationException("Unsupported for repository of type: " + TYPE);
}

View File

@ -23,6 +23,7 @@ import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.metadata.RepositoryMetadata;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.lucene.search.Queries;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
@ -37,6 +38,7 @@ import org.elasticsearch.index.translog.TranslogStats;
import org.elasticsearch.repositories.FilterRepository;
import org.elasticsearch.repositories.IndexId;
import org.elasticsearch.repositories.Repository;
import org.elasticsearch.repositories.RepositoryData;
import org.elasticsearch.repositories.ShardGenerations;
import java.io.Closeable;
@ -96,7 +98,7 @@ public final class SourceOnlySnapshotRepository extends FilterRepository {
public void finalizeSnapshot(SnapshotId snapshotId, ShardGenerations shardGenerations, long startTime, String failure,
int totalShards, List<SnapshotShardFailure> shardFailures, long repositoryStateId,
boolean includeGlobalState, Metadata metadata, Map<String, Object> userMetadata,
Version repositoryMetaVersion, ActionListener<SnapshotInfo> listener) {
Version repositoryMetaVersion, ActionListener<Tuple<RepositoryData, SnapshotInfo>> listener) {
// we process the index metadata at snapshot time. This means if somebody tries to restore
// a _source only snapshot with a plain repository it will be just fine since we already set the
// required engine, that the index is read-only and the mapping to a default mapping

View File

@ -35,6 +35,7 @@ import org.elasticsearch.cluster.routing.TestShardRouting;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentHelper;
@ -60,6 +61,7 @@ import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.repositories.IndexId;
import org.elasticsearch.repositories.Repository;
import org.elasticsearch.repositories.RepositoryData;
import org.elasticsearch.repositories.ShardGenerations;
import org.elasticsearch.repositories.blobstore.BlobStoreTestUtil;
import org.elasticsearch.repositories.blobstore.ESBlobStoreRepositoryIntegTestCase;
@ -214,7 +216,7 @@ public class SourceOnlySnapshotShardTests extends IndexShardTestCase {
repository.snapshotShard(shard.store(), shard.mapperService(), snapshotId, indexId, snapshotRef.getIndexCommit(),
null, indexShardSnapshotStatus, Version.CURRENT, Collections.emptyMap(), future);
future.actionGet();
final PlainActionFuture<SnapshotInfo> finFuture = PlainActionFuture.newFuture();
final PlainActionFuture<Tuple<RepositoryData, SnapshotInfo>> finFuture = PlainActionFuture.newFuture();
repository.finalizeSnapshot(snapshotId,
ShardGenerations.builder().put(indexId, 0, indexShardSnapshotStatus.generation()).build(),
indexShardSnapshotStatus.asCopy().getStartTime(), null, 1, Collections.emptyList(),