Remove Redundant CS Update on Snapshot Finalization (#55276) (#55528)

This change folds the removal of the in-progress snapshot entry
into setting the safe repository generation. Outside of removing
an unnecessary cluster state update, this also has the advantage
of removing a somewhat inconsistent cluster state where the safe
repository generation points at `RepositoryData` that contains a
finished snapshot while it is still in-progress in the cluster
state, making it easier to reason about the state machine of
upcoming concurrent snapshot operations.
This commit is contained in:
Armin Braun 2020-04-21 15:33:17 +02:00 committed by GitHub
parent 402b6b1715
commit db7eb8e8ff
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 106 additions and 128 deletions

View File

@ -24,6 +24,7 @@ import org.apache.logging.log4j.Logger;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.metadata.RepositoryMetadata;
import org.elasticsearch.cluster.service.ClusterService;
@ -274,12 +275,13 @@ class S3Repository extends BlobStoreRepository {
public void finalizeSnapshot(SnapshotId snapshotId, ShardGenerations shardGenerations, long startTime, String failure, int totalShards,
List<SnapshotShardFailure> shardFailures, long repositoryStateId, boolean includeGlobalState,
Metadata clusterMetadata, Map<String, Object> userMetadata, Version repositoryMetaVersion,
Function<ClusterState, ClusterState> stateTransformer,
ActionListener<Tuple<RepositoryData, SnapshotInfo>> listener) {
if (SnapshotsService.useShardGenerations(repositoryMetaVersion) == false) {
listener = delayedListener(listener);
}
super.finalizeSnapshot(snapshotId, shardGenerations, startTime, failure, totalShards, shardFailures, repositoryStateId,
includeGlobalState, clusterMetadata, userMetadata, repositoryMetaVersion, listener);
includeGlobalState, clusterMetadata, userMetadata, repositoryMetaVersion, stateTransformer, listener);
}
@Override

View File

@ -41,6 +41,7 @@ import org.elasticsearch.snapshots.SnapshotShardFailure;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
public class FilterRepository implements Repository {
@ -84,9 +85,10 @@ public class FilterRepository implements Repository {
public void finalizeSnapshot(SnapshotId snapshotId, ShardGenerations shardGenerations, long startTime, String failure,
int totalShards, List<SnapshotShardFailure> shardFailures, long repositoryStateId,
boolean includeGlobalState, Metadata metadata, Map<String, Object> userMetadata,
Version repositoryMetaVersion, ActionListener<Tuple<RepositoryData, SnapshotInfo>> listener) {
Version repositoryMetaVersion, Function<ClusterState, ClusterState> stateTransformer,
ActionListener<Tuple<RepositoryData, SnapshotInfo>> listener) {
in.finalizeSnapshot(snapshotId, shardGenerations, startTime, failure, totalShards, shardFailures, repositoryStateId,
includeGlobalState, metadata, userMetadata, repositoryMetaVersion, listener);
includeGlobalState, metadata, userMetadata, repositoryMetaVersion, stateTransformer, listener);
}
@Override

View File

@ -141,13 +141,16 @@ public interface Repository extends LifecycleComponent {
* @param clusterMetadata cluster metadata
* @param userMetadata user metadata
* @param repositoryMetaVersion version of the updated repository metadata to write
* @param stateTransformer a function that filters the last cluster state update that the snapshot finalization will execute and
* is used to remove any state tracked for the in-progress snapshot from the cluster state
* @param listener listener to be invoked with the new {@link RepositoryData} and the snapshot's {@link SnapshotInfo}
* completion of the snapshot
*/
void finalizeSnapshot(SnapshotId snapshotId, ShardGenerations shardGenerations, long startTime, String failure,
int totalShards, List<SnapshotShardFailure> shardFailures, long repositoryStateId,
boolean includeGlobalState, Metadata clusterMetadata, Map<String, Object> userMetadata,
Version repositoryMetaVersion, ActionListener<Tuple<RepositoryData, SnapshotInfo>> listener);
Version repositoryMetaVersion, Function<ClusterState, ClusterState> stateTransformer,
ActionListener<Tuple<RepositoryData, SnapshotInfo>> listener);
/**
* Deletes snapshot

View File

@ -137,6 +137,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.LongStream;
import java.util.stream.Stream;
@ -607,7 +608,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
builder.put(newGen.indexId, newGen.shardId, newGen.newGeneration);
}
final RepositoryData updatedRepoData = repositoryData.removeSnapshot(snapshotId, builder.build());
writeIndexGen(updatedRepoData, repositoryStateId, true,
writeIndexGen(updatedRepoData, repositoryStateId, true, Function.identity(),
ActionListener.wrap(v -> writeUpdatedRepoDataStep.onResponse(updatedRepoData), listener::onFailure));
}, listener::onFailure);
// Once we have updated the repository, run the clean-ups
@ -621,7 +622,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
} else {
// Write the new repository data first (with the removed snapshot), using no shard generations
final RepositoryData updatedRepoData = repositoryData.removeSnapshot(snapshotId, ShardGenerations.EMPTY);
writeIndexGen(updatedRepoData, repositoryStateId, false, ActionListener.wrap(v -> {
writeIndexGen(updatedRepoData, repositoryStateId, false, Function.identity(), ActionListener.wrap(v -> {
// Run unreferenced blobs cleanup in parallel to shard-level snapshot deletion
final ActionListener<Void> afterCleanupsListener =
new GroupedActionListener<>(ActionListener.wrap(() -> listener.onResponse(null)), 2);
@ -813,7 +814,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
} else {
// write new index-N blob to ensure concurrent operations will fail
writeIndexGen(repositoryData, repositoryStateId, SnapshotsService.useShardGenerations(repositoryMetaVersion),
ActionListener.wrap(v -> cleanupStaleBlobs(foundIndices, rootBlobs, repositoryData,
Function.identity(), ActionListener.wrap(v -> cleanupStaleBlobs(foundIndices, rootBlobs, repositoryData,
ActionListener.map(listener, RepositoryCleanupResult::new)), listener::onFailure));
}
} catch (Exception e) {
@ -912,6 +913,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
final Metadata clusterMetadata,
final Map<String, Object> userMetadata,
Version repositoryMetaVersion,
Function<ClusterState, ClusterState> stateTransformer,
final ActionListener<Tuple<RepositoryData, SnapshotInfo>> listener) {
assert repositoryStateId > RepositoryData.UNKNOWN_REPO_GEN :
"Must finalize based on a valid repository generation but received [" + repositoryStateId + "]";
@ -930,7 +932,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
getRepositoryData(ActionListener.wrap(existingRepositoryData -> {
final RepositoryData updatedRepositoryData =
existingRepositoryData.addSnapshot(snapshotId, snapshotInfo.state(), Version.CURRENT, shardGenerations);
writeIndexGen(updatedRepositoryData, repositoryStateId, writeShardGens, ActionListener.wrap(writtenRepoData -> {
writeIndexGen(updatedRepositoryData, repositoryStateId, writeShardGens, stateTransformer,
ActionListener.wrap(writtenRepoData -> {
if (writeShardGens) {
cleanupOldShardGens(existingRepositoryData, updatedRepositoryData);
}
@ -1319,10 +1322,11 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
* @param repositoryData RepositoryData to write
* @param expectedGen expected repository generation at the start of the operation
* @param writeShardGens whether to write {@link ShardGenerations} to the new {@link RepositoryData} blob
* @param stateFilter filter for the last cluster state update executed by this method
* @param listener completion listener
*/
protected void writeIndexGen(RepositoryData repositoryData, long expectedGen, boolean writeShardGens,
ActionListener<RepositoryData> listener) {
Function<ClusterState, ClusterState> stateFilter, ActionListener<RepositoryData> listener) {
assert isReadOnly() == false; // can not write to a read only repository
final long currentGen = repositoryData.getGenId();
if (currentGen != expectedGen) {
@ -1455,10 +1459,10 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
"Tried to update from unexpected pending repo generation [" + meta.pendingGeneration() +
"] after write to generation [" + newGen + "]");
}
return ClusterState.builder(currentState).metadata(Metadata.builder(currentState.getMetadata())
return stateFilter.apply(ClusterState.builder(currentState).metadata(Metadata.builder(currentState.getMetadata())
.putCustom(RepositoriesMetadata.TYPE,
currentState.metadata().<RepositoriesMetadata>custom(RepositoriesMetadata.TYPE).withUpdatedGeneration(
metadata.name(), newGen, newGen)).build()).build();
metadata.name(), newGen, newGen))).build());
}
@Override

View File

@ -315,8 +315,6 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
final ActionListener<Snapshot> userCreateSnapshotListener) {
threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(new AbstractRunnable() {
boolean snapshotCreated;
boolean hadAbortedInitializations;
@Override
@ -341,7 +339,6 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
repository.initializeSnapshot(
snapshot.snapshot().getSnapshotId(), snapshot.indices(),
metadataForSnapshot(snapshot, clusterState.metadata()));
snapshotCreated = true;
}
logger.info("snapshot [{}] started", snapshot.snapshot());
@ -412,8 +409,8 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
public void onFailure(String source, Exception e) {
logger.warn(() -> new ParameterizedMessage("[{}] failed to create snapshot",
snapshot.snapshot().getSnapshotId()), e);
removeSnapshotFromClusterState(snapshot.snapshot(), null, e,
new CleanupAfterErrorListener(snapshot, snapshotCreated, userCreateSnapshotListener, e));
removeSnapshotFromClusterState(snapshot.snapshot(), e,
new CleanupAfterErrorListener(userCreateSnapshotListener, e));
}
@Override
@ -449,62 +446,29 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
public void onFailure(Exception e) {
logger.warn(() -> new ParameterizedMessage("failed to create snapshot [{}]",
snapshot.snapshot().getSnapshotId()), e);
removeSnapshotFromClusterState(snapshot.snapshot(), null, e,
new CleanupAfterErrorListener(snapshot, snapshotCreated, userCreateSnapshotListener, e));
removeSnapshotFromClusterState(snapshot.snapshot(), e,
new CleanupAfterErrorListener(userCreateSnapshotListener, e));
}
});
}
private class CleanupAfterErrorListener {
private static class CleanupAfterErrorListener {
private final SnapshotsInProgress.Entry snapshot;
private final boolean snapshotCreated;
private final ActionListener<Snapshot> userCreateSnapshotListener;
private final Exception e;
CleanupAfterErrorListener(SnapshotsInProgress.Entry snapshot, boolean snapshotCreated,
ActionListener<Snapshot> userCreateSnapshotListener, Exception e) {
this.snapshot = snapshot;
this.snapshotCreated = snapshotCreated;
CleanupAfterErrorListener(ActionListener<Snapshot> userCreateSnapshotListener, Exception e) {
this.userCreateSnapshotListener = userCreateSnapshotListener;
this.e = e;
}
public void onFailure(@Nullable Exception e) {
if (snapshotCreated) {
cleanupAfterError(ExceptionsHelper.useOrSuppress(e, this.e));
} else {
userCreateSnapshotListener.onFailure(ExceptionsHelper.useOrSuppress(e, this.e));
}
}
public void onNoLongerMaster() {
userCreateSnapshotListener.onFailure(e);
}
private void cleanupAfterError(Exception exception) {
threadPool.generic().execute(() -> {
final Metadata metadata = clusterService.state().metadata();
repositoriesService.repository(snapshot.snapshot().getRepository())
.finalizeSnapshot(snapshot.snapshot().getSnapshotId(),
buildGenerations(snapshot, metadata),
snapshot.startTime(),
ExceptionsHelper.stackTrace(exception),
0,
Collections.emptyList(),
snapshot.repositoryStateId(),
snapshot.includeGlobalState(),
metadataForSnapshot(snapshot, metadata),
snapshot.userMetadata(),
snapshot.version(),
ActionListener.runAfter(ActionListener.wrap(ignored -> {
}, inner -> {
inner.addSuppressed(exception);
logger.warn(() -> new ParameterizedMessage("[{}] failed to finalize snapshot in repository",
snapshot.snapshot()), inner);
}), () -> userCreateSnapshotListener.onFailure(e)));
});
}
}
private static ShardGenerations buildGenerations(SnapshotsInProgress.Entry snapshot, Metadata metadata) {
@ -876,8 +840,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
final Snapshot snapshot = entry.snapshot();
if (entry.repositoryStateId() == RepositoryData.UNKNOWN_REPO_GEN) {
logger.debug("[{}] was aborted before starting", snapshot);
removeSnapshotFromClusterState(entry.snapshot(), null,
new SnapshotException(snapshot, "Aborted on initialization"));
removeSnapshotFromClusterState(entry.snapshot(), new SnapshotException(snapshot, "Aborted on initialization"), null);
return;
}
threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(new AbstractRunnable() {
@ -912,10 +875,19 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
metadataForSnapshot(entry, metadata),
entry.userMetadata(),
entry.version(),
state -> stateWithoutSnapshot(state, snapshot),
ActionListener.wrap(result -> {
final SnapshotInfo snapshotInfo = result.v2();
removeSnapshotFromClusterState(snapshot, result, null);
logger.info("snapshot [{}] completed with state [{}]", snapshot, snapshotInfo.state());
final List<ActionListener<Tuple<RepositoryData, SnapshotInfo>>> completionListeners =
snapshotCompletionListeners.remove(snapshot);
if (completionListeners != null) {
try {
ActionListener.onResponse(completionListeners, result);
} catch (Exception e) {
logger.warn("Failed to notify listeners", e);
}
}
endingSnapshots.remove(snapshot);
logger.info("snapshot [{}] completed with state [{}]", snapshot, result.v2().state());
}, this::onFailure));
}
@ -931,37 +903,14 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
new SnapshotException(snapshot, "Failed to update cluster state during snapshot finalization", e));
} else {
logger.warn(() -> new ParameterizedMessage("[{}] failed to finalize snapshot", snapshot), e);
removeSnapshotFromClusterState(snapshot, null, e);
removeSnapshotFromClusterState(snapshot, e, null);
}
}
});
}
/**
* Removes record of running snapshot from cluster state
* @param snapshot snapshot
* @param snapshotResult new {@link RepositoryData} and {@link SnapshotInfo} info if snapshot was successful
* @param e exception if snapshot failed, {@code null} otherwise
*/
private void removeSnapshotFromClusterState(final Snapshot snapshot, @Nullable Tuple<RepositoryData, SnapshotInfo> snapshotResult,
@Nullable Exception e) {
removeSnapshotFromClusterState(snapshot, snapshotResult, e, null);
}
/**
* Removes record of running snapshot from cluster state and notifies the listener when this action is complete
* @param snapshot snapshot
* @param failure exception if snapshot failed, {@code null} otherwise
* @param listener listener to notify when snapshot information is removed from the cluster state
*/
private void removeSnapshotFromClusterState(final Snapshot snapshot, @Nullable Tuple<RepositoryData, SnapshotInfo> snapshotResult,
@Nullable Exception failure, @Nullable CleanupAfterErrorListener listener) {
assert snapshotResult != null || failure != null : "Either snapshotInfo or failure must be supplied";
clusterService.submitStateUpdateTask("remove snapshot metadata", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE);
private static ClusterState stateWithoutSnapshot(ClusterState state, Snapshot snapshot) {
SnapshotsInProgress snapshots = state.custom(SnapshotsInProgress.TYPE);
if (snapshots != null) {
boolean changed = false;
ArrayList<SnapshotsInProgress.Entry> entries = new ArrayList<>();
@ -973,11 +922,27 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
}
}
if (changed) {
return ClusterState.builder(currentState)
.putCustom(SnapshotsInProgress.TYPE, new SnapshotsInProgress(unmodifiableList(entries))).build();
return ClusterState.builder(state).putCustom(
SnapshotsInProgress.TYPE, new SnapshotsInProgress(unmodifiableList(entries))).build();
}
}
return currentState;
return state;
}
/**
* Removes record of running snapshot from cluster state and notifies the listener when this action is complete
* @param snapshot snapshot
* @param failure exception if snapshot failed
* @param listener listener to notify when snapshot information is removed from the cluster state
*/
private void removeSnapshotFromClusterState(final Snapshot snapshot, Exception failure,
@Nullable CleanupAfterErrorListener listener) {
assert failure != null : "Failure must be supplied";
clusterService.submitStateUpdateTask("remove snapshot metadata", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
return stateWithoutSnapshot(currentState, snapshot);
}
@Override
@ -1001,20 +966,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
if (snapshotResult == null) {
failSnapshotCompletionListeners(snapshot, failure);
} else {
final List<ActionListener<Tuple<RepositoryData, SnapshotInfo>>> completionListeners =
snapshotCompletionListeners.remove(snapshot);
if (completionListeners != null) {
try {
ActionListener.onResponse(completionListeners, snapshotResult);
} catch (Exception e) {
logger.warn("Failed to notify listeners", e);
}
}
endingSnapshots.remove(snapshot);
}
if (listener != null) {
listener.onFailure(null);
}

View File

@ -189,8 +189,7 @@ public class SnapshotDisruptionIT extends ESIntegTestCase {
final RepositoriesMetadata repoMeta =
event.state().metadata().custom(RepositoriesMetadata.TYPE);
final RepositoryMetadata metadata = repoMeta.repository("test-repo");
if (metadata.generation() == metadata.pendingGeneration()
&& metadata.generation() > snapshotEntry.repositoryStateId()) {
if (metadata.pendingGeneration() > snapshotEntry.repositoryStateId()) {
logger.info("--> starting disruption");
networkDisruption.startDisrupting();
clusterService.removeListener(this);
@ -234,7 +233,8 @@ public class SnapshotDisruptionIT extends ESIntegTestCase {
final SnapshotException sne = (SnapshotException) ExceptionsHelper.unwrap(ex, SnapshotException.class);
assertNotNull(sne);
assertThat(
sne.getMessage(), either(endsWith(" Failed to remove snapshot from cluster state")).or(endsWith(" no longer master")));
sne.getMessage(), either(endsWith(" Failed to update cluster state during snapshot finalization"))
.or(endsWith(" no longer master")));
assertThat(sne.getSnapshotName(), is(snapshot));
}

View File

@ -52,6 +52,7 @@ import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import static org.mockito.Mockito.mock;
@ -165,7 +166,8 @@ public class RepositoriesServiceTests extends ESTestCase {
public void finalizeSnapshot(SnapshotId snapshotId, ShardGenerations indices, long startTime, String failure,
int totalShards, List<SnapshotShardFailure> shardFailures, long repositoryStateId,
boolean includeGlobalState, Metadata metadata, Map<String, Object> userMetadata,
Version repositoryMetaVersion, ActionListener<Tuple<RepositoryData, SnapshotInfo>> listener) {
Version repositoryMetaVersion, Function<ClusterState, ClusterState> stateTransformer,
ActionListener<Tuple<RepositoryData, SnapshotInfo>> listener) {
listener.onResponse(null);
}

View File

@ -59,6 +59,7 @@ import java.nio.file.Path;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;
import static org.hamcrest.Matchers.containsString;
@ -177,7 +178,7 @@ public class BlobStoreRepositoryRestoreTests extends IndexShardTestCase {
ShardGenerations.builder().put(indexId, 0, shardGen).build(),
0L, null, 1, Collections.emptyList(), -1L, false,
Metadata.builder().put(shard.indexSettings().getIndexMetadata(), false).build(), Collections.emptyMap(),
Version.CURRENT, f));
Version.CURRENT, Function.identity(), f));
IndexShardSnapshotFailedException isfe = expectThrows(IndexShardSnapshotFailedException.class,
() -> snapshotShard(shard, snapshotWithSameName, repository));
assertThat(isfe.getMessage(), containsString("Duplicate snapshot name"));

View File

@ -51,6 +51,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import static org.elasticsearch.repositories.RepositoryDataTests.generateRandomRepoData;
@ -198,7 +199,7 @@ public class BlobStoreRepositoryTests extends ESSingleNodeTestCase {
RepositoryData repositoryData = generateRandomRepoData();
final long startingGeneration = repositoryData.getGenId();
final PlainActionFuture<RepositoryData> future1 = PlainActionFuture.newFuture();
repository.writeIndexGen(repositoryData, startingGeneration, true, future1);
repository.writeIndexGen(repositoryData, startingGeneration, true, Function.identity(),future1);
// write repo data again to index generational file, errors because we already wrote to the
// N+1 generation from which this repository data instance was created
@ -239,7 +240,8 @@ public class BlobStoreRepositoryTests extends ESSingleNodeTestCase {
}
private static void writeIndexGen(BlobStoreRepository repository, RepositoryData repositoryData, long generation) throws Exception {
PlainActionFuture.<RepositoryData, Exception>get(f -> repository.writeIndexGen(repositoryData, generation, true, f));
PlainActionFuture.<RepositoryData, Exception>get(f -> repository.writeIndexGen(repositoryData, generation, true,
Function.identity(), f));
}
private BlobStoreRepository setupRepo() {

View File

@ -21,6 +21,7 @@ package org.elasticsearch.snapshots;
import org.apache.lucene.index.IndexCommit;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.collect.Tuple;
@ -43,6 +44,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.is;
@ -89,11 +91,12 @@ public class RepositoryFilterUserMetadataIT extends ESIntegTestCase {
public void finalizeSnapshot(SnapshotId snapshotId, ShardGenerations shardGenerations, long startTime, String failure,
int totalShards, List<SnapshotShardFailure> shardFailures, long repositoryStateId,
boolean includeGlobalState, Metadata clusterMetadata, Map<String, Object> userMetadata,
Version repositoryMetaVersion,
Version repositoryMetaVersion, Function<ClusterState, ClusterState> stateTransformer,
ActionListener<Tuple<RepositoryData, SnapshotInfo>> listener) {
assertThat(userMetadata, is(Collections.singletonMap(MOCK_FILTERED_META, initialMetaValue)));
super.finalizeSnapshot(snapshotId, shardGenerations, startTime, failure, totalShards, shardFailures,
repositoryStateId, includeGlobalState, clusterMetadata, userMetadata, repositoryMetaVersion, listener);
repositoryStateId, includeGlobalState, clusterMetadata, userMetadata, repositoryMetaVersion, stateTransformer,
listener);
}
@Override

View File

@ -41,6 +41,7 @@ import java.io.InputStream;
import java.nio.file.NoSuchFileException;
import java.util.Arrays;
import java.util.Collections;
import java.util.function.Function;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.startsWith;
@ -152,20 +153,20 @@ public class MockEventuallyConsistentRepositoryTests extends ESTestCase {
PlainActionFuture.<Tuple<RepositoryData, SnapshotInfo>, Exception>get(f ->
// We try to write another snap- blob for "foo" in the next generation. It fails because the content differs.
repository.finalizeSnapshot(snapshotId, ShardGenerations.EMPTY, 1L, null, 5, Collections.emptyList(),
-1L, false, Metadata.EMPTY_METADATA, Collections.emptyMap(), Version.CURRENT, f));
-1L, false, Metadata.EMPTY_METADATA, Collections.emptyMap(), Version.CURRENT, Function.identity(), f));
// We try to write another snap- blob for "foo" in the next generation. It fails because the content differs.
final AssertionError assertionError = expectThrows(AssertionError.class,
() -> PlainActionFuture.<Tuple<RepositoryData, SnapshotInfo>, Exception>get(f ->
repository.finalizeSnapshot(snapshotId, ShardGenerations.EMPTY, 1L, null, 6, Collections.emptyList(),
0, false, Metadata.EMPTY_METADATA, Collections.emptyMap(), Version.CURRENT, f)));
0, false, Metadata.EMPTY_METADATA, Collections.emptyMap(), Version.CURRENT, Function.identity(), f)));
assertThat(assertionError.getMessage(), equalTo("\nExpected: <6>\n but: was <5>"));
// We try to write yet another snap- blob for "foo" in the next generation.
// It passes cleanly because the content of the blob except for the timestamps.
PlainActionFuture.<Tuple<RepositoryData, SnapshotInfo>, Exception>get(f ->
repository.finalizeSnapshot(snapshotId, ShardGenerations.EMPTY, 1L, null, 5, Collections.emptyList(),
0, false, Metadata.EMPTY_METADATA, Collections.emptyMap(), Version.CURRENT, f));
0, false, Metadata.EMPTY_METADATA, Collections.emptyMap(), Version.CURRENT, Function.identity(), f));
}
}

View File

@ -43,6 +43,7 @@ import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import static java.util.Collections.emptyList;
import static org.elasticsearch.repositories.RepositoryData.EMPTY_REPO_GEN;
@ -102,7 +103,8 @@ public abstract class RestoreOnlyRepository extends AbstractLifecycleComponent i
public void finalizeSnapshot(SnapshotId snapshotId, ShardGenerations shardGenerations, long startTime, String failure,
int totalShards, List<SnapshotShardFailure> shardFailures, long repositoryStateId,
boolean includeGlobalState, Metadata metadata, Map<String, Object> userMetadata,
Version repositoryMetaVersion, ActionListener<Tuple<RepositoryData, SnapshotInfo>> listener) {
Version repositoryMetaVersion, Function<ClusterState, ClusterState> stateTransformer,
ActionListener<Tuple<RepositoryData, SnapshotInfo>> listener) {
listener.onResponse(null);
}

View File

@ -92,6 +92,7 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
import java.util.function.LongConsumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;
@ -262,6 +263,7 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit
public void finalizeSnapshot(SnapshotId snapshotId, ShardGenerations shardGenerations, long startTime, String failure, int totalShards,
List<SnapshotShardFailure> shardFailures, long repositoryStateId, boolean includeGlobalState,
Metadata metadata, Map<String, Object> userMetadata, Version repositoryMetaVersion,
Function<ClusterState, ClusterState> stateTransformer,
ActionListener<Tuple<RepositoryData, SnapshotInfo>> listener) {
throw new UnsupportedOperationException("Unsupported for repository of type: " + TYPE);
}

View File

@ -17,6 +17,7 @@ import org.apache.lucene.store.FilterDirectory;
import org.apache.lucene.store.SimpleFSDirectory;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.MappingMetadata;
import org.elasticsearch.cluster.metadata.Metadata;
@ -98,14 +99,15 @@ public final class SourceOnlySnapshotRepository extends FilterRepository {
public void finalizeSnapshot(SnapshotId snapshotId, ShardGenerations shardGenerations, long startTime, String failure,
int totalShards, List<SnapshotShardFailure> shardFailures, long repositoryStateId,
boolean includeGlobalState, Metadata metadata, Map<String, Object> userMetadata,
Version repositoryMetaVersion, ActionListener<Tuple<RepositoryData, SnapshotInfo>> listener) {
Version repositoryMetaVersion, Function<ClusterState, ClusterState> stateTransformer,
ActionListener<Tuple<RepositoryData, SnapshotInfo>> listener) {
// we process the index metadata at snapshot time. This means if somebody tries to restore
// a _source only snapshot with a plain repository it will be just fine since we already set the
// required engine, that the index is read-only and the mapping to a default mapping
try {
super.finalizeSnapshot(snapshotId, shardGenerations, startTime, failure, totalShards, shardFailures, repositoryStateId,
includeGlobalState, metadataToSnapshot(shardGenerations.indices(), metadata), userMetadata, repositoryMetaVersion,
listener);
stateTransformer, listener);
} catch (IOException ex) {
listener.onFailure(ex);
}

View File

@ -74,6 +74,7 @@ import java.nio.file.Path;
import java.util.Collections;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.function.Function;
public class SourceOnlySnapshotShardTests extends IndexShardTestCase {
@ -222,8 +223,7 @@ public class SourceOnlySnapshotShardTests extends IndexShardTestCase {
indexShardSnapshotStatus.asCopy().getStartTime(), null, 1, Collections.emptyList(),
ESBlobStoreRepositoryIntegTestCase.getRepositoryData(repository).getGenId(), true,
Metadata.builder().put(shard.indexSettings().getIndexMetadata(), false).build(), Collections.emptyMap(),
Version.CURRENT,
finFuture);
Version.CURRENT, Function.identity(), finFuture);
finFuture.actionGet();
});
IndexShardSnapshotStatus.Copy copy = indexShardSnapshotStatus.asCopy();