diff --git a/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java b/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java index d3a868ba7ba..cff2d6037b0 100644 --- a/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java +++ b/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java @@ -24,6 +24,7 @@ import org.apache.logging.log4j.Logger; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRunnable; +import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.cluster.metadata.RepositoryMetadata; import org.elasticsearch.cluster.service.ClusterService; @@ -274,12 +275,13 @@ class S3Repository extends BlobStoreRepository { public void finalizeSnapshot(SnapshotId snapshotId, ShardGenerations shardGenerations, long startTime, String failure, int totalShards, List shardFailures, long repositoryStateId, boolean includeGlobalState, Metadata clusterMetadata, Map userMetadata, Version repositoryMetaVersion, + Function stateTransformer, ActionListener> listener) { if (SnapshotsService.useShardGenerations(repositoryMetaVersion) == false) { listener = delayedListener(listener); } super.finalizeSnapshot(snapshotId, shardGenerations, startTime, failure, totalShards, shardFailures, repositoryStateId, - includeGlobalState, clusterMetadata, userMetadata, repositoryMetaVersion, listener); + includeGlobalState, clusterMetadata, userMetadata, repositoryMetaVersion, stateTransformer, listener); } @Override diff --git a/server/src/main/java/org/elasticsearch/repositories/FilterRepository.java b/server/src/main/java/org/elasticsearch/repositories/FilterRepository.java index 64669702029..a418ac49910 100644 --- a/server/src/main/java/org/elasticsearch/repositories/FilterRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/FilterRepository.java @@ -41,6 +41,7 @@ import org.elasticsearch.snapshots.SnapshotShardFailure; import java.io.IOException; import java.util.List; import java.util.Map; +import java.util.function.Function; public class FilterRepository implements Repository { @@ -84,9 +85,10 @@ public class FilterRepository implements Repository { public void finalizeSnapshot(SnapshotId snapshotId, ShardGenerations shardGenerations, long startTime, String failure, int totalShards, List shardFailures, long repositoryStateId, boolean includeGlobalState, Metadata metadata, Map userMetadata, - Version repositoryMetaVersion, ActionListener> listener) { + Version repositoryMetaVersion, Function stateTransformer, + ActionListener> listener) { in.finalizeSnapshot(snapshotId, shardGenerations, startTime, failure, totalShards, shardFailures, repositoryStateId, - includeGlobalState, metadata, userMetadata, repositoryMetaVersion, listener); + includeGlobalState, metadata, userMetadata, repositoryMetaVersion, stateTransformer, listener); } @Override diff --git a/server/src/main/java/org/elasticsearch/repositories/Repository.java b/server/src/main/java/org/elasticsearch/repositories/Repository.java index 86a358d88a2..15297d4f699 100644 --- a/server/src/main/java/org/elasticsearch/repositories/Repository.java +++ b/server/src/main/java/org/elasticsearch/repositories/Repository.java @@ -141,13 +141,16 @@ public interface Repository extends LifecycleComponent { * @param clusterMetadata cluster metadata * @param userMetadata user metadata * @param repositoryMetaVersion version of the updated repository metadata to write + * @param stateTransformer a function that filters the last cluster state update that the snapshot finalization will execute and + * is used to remove any state tracked for the in-progress snapshot from the cluster state * @param listener listener to be invoked with the new {@link RepositoryData} and the snapshot's {@link SnapshotInfo} * completion of the snapshot */ void finalizeSnapshot(SnapshotId snapshotId, ShardGenerations shardGenerations, long startTime, String failure, int totalShards, List shardFailures, long repositoryStateId, boolean includeGlobalState, Metadata clusterMetadata, Map userMetadata, - Version repositoryMetaVersion, ActionListener> listener); + Version repositoryMetaVersion, Function stateTransformer, + ActionListener> listener); /** * Deletes snapshot diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index 3af82cf3c76..adaddfe0fda 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -137,6 +137,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; +import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.LongStream; import java.util.stream.Stream; @@ -607,7 +608,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp builder.put(newGen.indexId, newGen.shardId, newGen.newGeneration); } final RepositoryData updatedRepoData = repositoryData.removeSnapshot(snapshotId, builder.build()); - writeIndexGen(updatedRepoData, repositoryStateId, true, + writeIndexGen(updatedRepoData, repositoryStateId, true, Function.identity(), ActionListener.wrap(v -> writeUpdatedRepoDataStep.onResponse(updatedRepoData), listener::onFailure)); }, listener::onFailure); // Once we have updated the repository, run the clean-ups @@ -621,7 +622,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp } else { // Write the new repository data first (with the removed snapshot), using no shard generations final RepositoryData updatedRepoData = repositoryData.removeSnapshot(snapshotId, ShardGenerations.EMPTY); - writeIndexGen(updatedRepoData, repositoryStateId, false, ActionListener.wrap(v -> { + writeIndexGen(updatedRepoData, repositoryStateId, false, Function.identity(), ActionListener.wrap(v -> { // Run unreferenced blobs cleanup in parallel to shard-level snapshot deletion final ActionListener afterCleanupsListener = new GroupedActionListener<>(ActionListener.wrap(() -> listener.onResponse(null)), 2); @@ -813,7 +814,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp } else { // write new index-N blob to ensure concurrent operations will fail writeIndexGen(repositoryData, repositoryStateId, SnapshotsService.useShardGenerations(repositoryMetaVersion), - ActionListener.wrap(v -> cleanupStaleBlobs(foundIndices, rootBlobs, repositoryData, + Function.identity(), ActionListener.wrap(v -> cleanupStaleBlobs(foundIndices, rootBlobs, repositoryData, ActionListener.map(listener, RepositoryCleanupResult::new)), listener::onFailure)); } } catch (Exception e) { @@ -912,6 +913,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp final Metadata clusterMetadata, final Map userMetadata, Version repositoryMetaVersion, + Function stateTransformer, final ActionListener> listener) { assert repositoryStateId > RepositoryData.UNKNOWN_REPO_GEN : "Must finalize based on a valid repository generation but received [" + repositoryStateId + "]"; @@ -930,12 +932,13 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp getRepositoryData(ActionListener.wrap(existingRepositoryData -> { final RepositoryData updatedRepositoryData = existingRepositoryData.addSnapshot(snapshotId, snapshotInfo.state(), Version.CURRENT, shardGenerations); - writeIndexGen(updatedRepositoryData, repositoryStateId, writeShardGens, ActionListener.wrap(writtenRepoData -> { - if (writeShardGens) { - cleanupOldShardGens(existingRepositoryData, updatedRepositoryData); - } - listener.onResponse(new Tuple<>(writtenRepoData, snapshotInfo)); - }, onUpdateFailure)); + writeIndexGen(updatedRepositoryData, repositoryStateId, writeShardGens, stateTransformer, + ActionListener.wrap(writtenRepoData -> { + if (writeShardGens) { + cleanupOldShardGens(existingRepositoryData, updatedRepositoryData); + } + listener.onResponse(new Tuple<>(writtenRepoData, snapshotInfo)); + }, onUpdateFailure)); }, onUpdateFailure)); }, onUpdateFailure), 2 + indices.size()); final Executor executor = threadPool.executor(ThreadPool.Names.SNAPSHOT); @@ -1319,10 +1322,11 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp * @param repositoryData RepositoryData to write * @param expectedGen expected repository generation at the start of the operation * @param writeShardGens whether to write {@link ShardGenerations} to the new {@link RepositoryData} blob + * @param stateFilter filter for the last cluster state update executed by this method * @param listener completion listener */ protected void writeIndexGen(RepositoryData repositoryData, long expectedGen, boolean writeShardGens, - ActionListener listener) { + Function stateFilter, ActionListener listener) { assert isReadOnly() == false; // can not write to a read only repository final long currentGen = repositoryData.getGenId(); if (currentGen != expectedGen) { @@ -1455,10 +1459,10 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp "Tried to update from unexpected pending repo generation [" + meta.pendingGeneration() + "] after write to generation [" + newGen + "]"); } - return ClusterState.builder(currentState).metadata(Metadata.builder(currentState.getMetadata()) + return stateFilter.apply(ClusterState.builder(currentState).metadata(Metadata.builder(currentState.getMetadata()) .putCustom(RepositoriesMetadata.TYPE, currentState.metadata().custom(RepositoriesMetadata.TYPE).withUpdatedGeneration( - metadata.name(), newGen, newGen)).build()).build(); + metadata.name(), newGen, newGen))).build()); } @Override diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java index e7bf14abc4a..ebd460f20f3 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java @@ -315,8 +315,6 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus final ActionListener userCreateSnapshotListener) { threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(new AbstractRunnable() { - boolean snapshotCreated; - boolean hadAbortedInitializations; @Override @@ -341,7 +339,6 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus repository.initializeSnapshot( snapshot.snapshot().getSnapshotId(), snapshot.indices(), metadataForSnapshot(snapshot, clusterState.metadata())); - snapshotCreated = true; } logger.info("snapshot [{}] started", snapshot.snapshot()); @@ -412,8 +409,8 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus public void onFailure(String source, Exception e) { logger.warn(() -> new ParameterizedMessage("[{}] failed to create snapshot", snapshot.snapshot().getSnapshotId()), e); - removeSnapshotFromClusterState(snapshot.snapshot(), null, e, - new CleanupAfterErrorListener(snapshot, snapshotCreated, userCreateSnapshotListener, e)); + removeSnapshotFromClusterState(snapshot.snapshot(), e, + new CleanupAfterErrorListener(userCreateSnapshotListener, e)); } @Override @@ -449,62 +446,29 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus public void onFailure(Exception e) { logger.warn(() -> new ParameterizedMessage("failed to create snapshot [{}]", snapshot.snapshot().getSnapshotId()), e); - removeSnapshotFromClusterState(snapshot.snapshot(), null, e, - new CleanupAfterErrorListener(snapshot, snapshotCreated, userCreateSnapshotListener, e)); + removeSnapshotFromClusterState(snapshot.snapshot(), e, + new CleanupAfterErrorListener(userCreateSnapshotListener, e)); } }); } - private class CleanupAfterErrorListener { + private static class CleanupAfterErrorListener { - private final SnapshotsInProgress.Entry snapshot; - private final boolean snapshotCreated; private final ActionListener userCreateSnapshotListener; private final Exception e; - CleanupAfterErrorListener(SnapshotsInProgress.Entry snapshot, boolean snapshotCreated, - ActionListener userCreateSnapshotListener, Exception e) { - this.snapshot = snapshot; - this.snapshotCreated = snapshotCreated; + CleanupAfterErrorListener(ActionListener userCreateSnapshotListener, Exception e) { this.userCreateSnapshotListener = userCreateSnapshotListener; this.e = e; } public void onFailure(@Nullable Exception e) { - if (snapshotCreated) { - cleanupAfterError(ExceptionsHelper.useOrSuppress(e, this.e)); - } else { - userCreateSnapshotListener.onFailure(ExceptionsHelper.useOrSuppress(e, this.e)); - } + userCreateSnapshotListener.onFailure(ExceptionsHelper.useOrSuppress(e, this.e)); } public void onNoLongerMaster() { userCreateSnapshotListener.onFailure(e); } - - private void cleanupAfterError(Exception exception) { - threadPool.generic().execute(() -> { - final Metadata metadata = clusterService.state().metadata(); - repositoriesService.repository(snapshot.snapshot().getRepository()) - .finalizeSnapshot(snapshot.snapshot().getSnapshotId(), - buildGenerations(snapshot, metadata), - snapshot.startTime(), - ExceptionsHelper.stackTrace(exception), - 0, - Collections.emptyList(), - snapshot.repositoryStateId(), - snapshot.includeGlobalState(), - metadataForSnapshot(snapshot, metadata), - snapshot.userMetadata(), - snapshot.version(), - ActionListener.runAfter(ActionListener.wrap(ignored -> { - }, inner -> { - inner.addSuppressed(exception); - logger.warn(() -> new ParameterizedMessage("[{}] failed to finalize snapshot in repository", - snapshot.snapshot()), inner); - }), () -> userCreateSnapshotListener.onFailure(e))); - }); - } } private static ShardGenerations buildGenerations(SnapshotsInProgress.Entry snapshot, Metadata metadata) { @@ -876,8 +840,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus final Snapshot snapshot = entry.snapshot(); if (entry.repositoryStateId() == RepositoryData.UNKNOWN_REPO_GEN) { logger.debug("[{}] was aborted before starting", snapshot); - removeSnapshotFromClusterState(entry.snapshot(), null, - new SnapshotException(snapshot, "Aborted on initialization")); + removeSnapshotFromClusterState(entry.snapshot(), new SnapshotException(snapshot, "Aborted on initialization"), null); return; } threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(new AbstractRunnable() { @@ -912,11 +875,20 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus metadataForSnapshot(entry, metadata), entry.userMetadata(), entry.version(), - ActionListener.wrap(result -> { - final SnapshotInfo snapshotInfo = result.v2(); - removeSnapshotFromClusterState(snapshot, result, null); - logger.info("snapshot [{}] completed with state [{}]", snapshot, snapshotInfo.state()); - }, this::onFailure)); + state -> stateWithoutSnapshot(state, snapshot), + ActionListener.wrap(result -> { + final List>> completionListeners = + snapshotCompletionListeners.remove(snapshot); + if (completionListeners != null) { + try { + ActionListener.onResponse(completionListeners, result); + } catch (Exception e) { + logger.warn("Failed to notify listeners", e); + } + } + endingSnapshots.remove(snapshot); + logger.info("snapshot [{}] completed with state [{}]", snapshot, result.v2().state()); + }, this::onFailure)); } @Override @@ -931,53 +903,46 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus new SnapshotException(snapshot, "Failed to update cluster state during snapshot finalization", e)); } else { logger.warn(() -> new ParameterizedMessage("[{}] failed to finalize snapshot", snapshot), e); - removeSnapshotFromClusterState(snapshot, null, e); + removeSnapshotFromClusterState(snapshot, e, null); } } }); } - /** - * Removes record of running snapshot from cluster state - * @param snapshot snapshot - * @param snapshotResult new {@link RepositoryData} and {@link SnapshotInfo} info if snapshot was successful - * @param e exception if snapshot failed, {@code null} otherwise - */ - private void removeSnapshotFromClusterState(final Snapshot snapshot, @Nullable Tuple snapshotResult, - @Nullable Exception e) { - removeSnapshotFromClusterState(snapshot, snapshotResult, e, null); + private static ClusterState stateWithoutSnapshot(ClusterState state, Snapshot snapshot) { + SnapshotsInProgress snapshots = state.custom(SnapshotsInProgress.TYPE); + if (snapshots != null) { + boolean changed = false; + ArrayList entries = new ArrayList<>(); + for (SnapshotsInProgress.Entry entry : snapshots.entries()) { + if (entry.snapshot().equals(snapshot)) { + changed = true; + } else { + entries.add(entry); + } + } + if (changed) { + return ClusterState.builder(state).putCustom( + SnapshotsInProgress.TYPE, new SnapshotsInProgress(unmodifiableList(entries))).build(); + } + } + return state; } /** * Removes record of running snapshot from cluster state and notifies the listener when this action is complete * @param snapshot snapshot - * @param failure exception if snapshot failed, {@code null} otherwise + * @param failure exception if snapshot failed * @param listener listener to notify when snapshot information is removed from the cluster state */ - private void removeSnapshotFromClusterState(final Snapshot snapshot, @Nullable Tuple snapshotResult, - @Nullable Exception failure, @Nullable CleanupAfterErrorListener listener) { - assert snapshotResult != null || failure != null : "Either snapshotInfo or failure must be supplied"; + private void removeSnapshotFromClusterState(final Snapshot snapshot, Exception failure, + @Nullable CleanupAfterErrorListener listener) { + assert failure != null : "Failure must be supplied"; clusterService.submitStateUpdateTask("remove snapshot metadata", new ClusterStateUpdateTask() { @Override public ClusterState execute(ClusterState currentState) { - SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE); - if (snapshots != null) { - boolean changed = false; - ArrayList entries = new ArrayList<>(); - for (SnapshotsInProgress.Entry entry : snapshots.entries()) { - if (entry.snapshot().equals(snapshot)) { - changed = true; - } else { - entries.add(entry); - } - } - if (changed) { - return ClusterState.builder(currentState) - .putCustom(SnapshotsInProgress.TYPE, new SnapshotsInProgress(unmodifiableList(entries))).build(); - } - } - return currentState; + return stateWithoutSnapshot(currentState, snapshot); } @Override @@ -1001,20 +966,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus @Override public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { - if (snapshotResult == null) { - failSnapshotCompletionListeners(snapshot, failure); - } else { - final List>> completionListeners = - snapshotCompletionListeners.remove(snapshot); - if (completionListeners != null) { - try { - ActionListener.onResponse(completionListeners, snapshotResult); - } catch (Exception e) { - logger.warn("Failed to notify listeners", e); - } - } - endingSnapshots.remove(snapshot); - } + failSnapshotCompletionListeners(snapshot, failure); if (listener != null) { listener.onFailure(null); } diff --git a/server/src/test/java/org/elasticsearch/discovery/SnapshotDisruptionIT.java b/server/src/test/java/org/elasticsearch/discovery/SnapshotDisruptionIT.java index fdbe685cd4b..85b8d972967 100644 --- a/server/src/test/java/org/elasticsearch/discovery/SnapshotDisruptionIT.java +++ b/server/src/test/java/org/elasticsearch/discovery/SnapshotDisruptionIT.java @@ -189,8 +189,7 @@ public class SnapshotDisruptionIT extends ESIntegTestCase { final RepositoriesMetadata repoMeta = event.state().metadata().custom(RepositoriesMetadata.TYPE); final RepositoryMetadata metadata = repoMeta.repository("test-repo"); - if (metadata.generation() == metadata.pendingGeneration() - && metadata.generation() > snapshotEntry.repositoryStateId()) { + if (metadata.pendingGeneration() > snapshotEntry.repositoryStateId()) { logger.info("--> starting disruption"); networkDisruption.startDisrupting(); clusterService.removeListener(this); @@ -234,7 +233,8 @@ public class SnapshotDisruptionIT extends ESIntegTestCase { final SnapshotException sne = (SnapshotException) ExceptionsHelper.unwrap(ex, SnapshotException.class); assertNotNull(sne); assertThat( - sne.getMessage(), either(endsWith(" Failed to remove snapshot from cluster state")).or(endsWith(" no longer master"))); + sne.getMessage(), either(endsWith(" Failed to update cluster state during snapshot finalization")) + .or(endsWith(" no longer master"))); assertThat(sne.getSnapshotName(), is(snapshot)); } diff --git a/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java b/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java index 43ed5333fd8..60481e42729 100644 --- a/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java +++ b/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java @@ -52,6 +52,7 @@ import java.io.IOException; import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.function.Function; import static org.mockito.Mockito.mock; @@ -165,7 +166,8 @@ public class RepositoriesServiceTests extends ESTestCase { public void finalizeSnapshot(SnapshotId snapshotId, ShardGenerations indices, long startTime, String failure, int totalShards, List shardFailures, long repositoryStateId, boolean includeGlobalState, Metadata metadata, Map userMetadata, - Version repositoryMetaVersion, ActionListener> listener) { + Version repositoryMetaVersion, Function stateTransformer, + ActionListener> listener) { listener.onResponse(null); } diff --git a/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryRestoreTests.java b/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryRestoreTests.java index 4c394fda013..e7943134bd8 100644 --- a/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryRestoreTests.java +++ b/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryRestoreTests.java @@ -59,6 +59,7 @@ import java.nio.file.Path; import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.function.Function; import static org.hamcrest.Matchers.containsString; @@ -177,7 +178,7 @@ public class BlobStoreRepositoryRestoreTests extends IndexShardTestCase { ShardGenerations.builder().put(indexId, 0, shardGen).build(), 0L, null, 1, Collections.emptyList(), -1L, false, Metadata.builder().put(shard.indexSettings().getIndexMetadata(), false).build(), Collections.emptyMap(), - Version.CURRENT, f)); + Version.CURRENT, Function.identity(), f)); IndexShardSnapshotFailedException isfe = expectThrows(IndexShardSnapshotFailedException.class, () -> snapshotShard(shard, snapshotWithSameName, repository)); assertThat(isfe.getMessage(), containsString("Duplicate snapshot name")); diff --git a/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryTests.java b/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryTests.java index 58e707a480d..8347ba5a977 100644 --- a/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryTests.java +++ b/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryTests.java @@ -51,6 +51,7 @@ import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.function.Function; import java.util.stream.Collectors; import static org.elasticsearch.repositories.RepositoryDataTests.generateRandomRepoData; @@ -198,7 +199,7 @@ public class BlobStoreRepositoryTests extends ESSingleNodeTestCase { RepositoryData repositoryData = generateRandomRepoData(); final long startingGeneration = repositoryData.getGenId(); final PlainActionFuture future1 = PlainActionFuture.newFuture(); - repository.writeIndexGen(repositoryData, startingGeneration, true, future1); + repository.writeIndexGen(repositoryData, startingGeneration, true, Function.identity(),future1); // write repo data again to index generational file, errors because we already wrote to the // N+1 generation from which this repository data instance was created @@ -239,7 +240,8 @@ public class BlobStoreRepositoryTests extends ESSingleNodeTestCase { } private static void writeIndexGen(BlobStoreRepository repository, RepositoryData repositoryData, long generation) throws Exception { - PlainActionFuture.get(f -> repository.writeIndexGen(repositoryData, generation, true, f)); + PlainActionFuture.get(f -> repository.writeIndexGen(repositoryData, generation, true, + Function.identity(), f)); } private BlobStoreRepository setupRepo() { diff --git a/server/src/test/java/org/elasticsearch/snapshots/RepositoryFilterUserMetadataIT.java b/server/src/test/java/org/elasticsearch/snapshots/RepositoryFilterUserMetadataIT.java index 6a692eab35b..da016df56bf 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/RepositoryFilterUserMetadataIT.java +++ b/server/src/test/java/org/elasticsearch/snapshots/RepositoryFilterUserMetadataIT.java @@ -21,6 +21,7 @@ package org.elasticsearch.snapshots; import org.apache.lucene.index.IndexCommit; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.collect.Tuple; @@ -43,6 +44,7 @@ import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.function.Function; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.hamcrest.Matchers.is; @@ -89,11 +91,12 @@ public class RepositoryFilterUserMetadataIT extends ESIntegTestCase { public void finalizeSnapshot(SnapshotId snapshotId, ShardGenerations shardGenerations, long startTime, String failure, int totalShards, List shardFailures, long repositoryStateId, boolean includeGlobalState, Metadata clusterMetadata, Map userMetadata, - Version repositoryMetaVersion, + Version repositoryMetaVersion, Function stateTransformer, ActionListener> listener) { assertThat(userMetadata, is(Collections.singletonMap(MOCK_FILTERED_META, initialMetaValue))); super.finalizeSnapshot(snapshotId, shardGenerations, startTime, failure, totalShards, shardFailures, - repositoryStateId, includeGlobalState, clusterMetadata, userMetadata, repositoryMetaVersion, listener); + repositoryStateId, includeGlobalState, clusterMetadata, userMetadata, repositoryMetaVersion, stateTransformer, + listener); } @Override diff --git a/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepositoryTests.java b/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepositoryTests.java index a070e3c85b6..d0c18d24bcd 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepositoryTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepositoryTests.java @@ -41,6 +41,7 @@ import java.io.InputStream; import java.nio.file.NoSuchFileException; import java.util.Arrays; import java.util.Collections; +import java.util.function.Function; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.startsWith; @@ -152,20 +153,20 @@ public class MockEventuallyConsistentRepositoryTests extends ESTestCase { PlainActionFuture., Exception>get(f -> // We try to write another snap- blob for "foo" in the next generation. It fails because the content differs. repository.finalizeSnapshot(snapshotId, ShardGenerations.EMPTY, 1L, null, 5, Collections.emptyList(), - -1L, false, Metadata.EMPTY_METADATA, Collections.emptyMap(), Version.CURRENT, f)); + -1L, false, Metadata.EMPTY_METADATA, Collections.emptyMap(), Version.CURRENT, Function.identity(), f)); // We try to write another snap- blob for "foo" in the next generation. It fails because the content differs. final AssertionError assertionError = expectThrows(AssertionError.class, () -> PlainActionFuture., Exception>get(f -> repository.finalizeSnapshot(snapshotId, ShardGenerations.EMPTY, 1L, null, 6, Collections.emptyList(), - 0, false, Metadata.EMPTY_METADATA, Collections.emptyMap(), Version.CURRENT, f))); + 0, false, Metadata.EMPTY_METADATA, Collections.emptyMap(), Version.CURRENT, Function.identity(), f))); assertThat(assertionError.getMessage(), equalTo("\nExpected: <6>\n but: was <5>")); // We try to write yet another snap- blob for "foo" in the next generation. // It passes cleanly because the content of the blob except for the timestamps. PlainActionFuture., Exception>get(f -> repository.finalizeSnapshot(snapshotId, ShardGenerations.EMPTY, 1L, null, 5, Collections.emptyList(), - 0, false, Metadata.EMPTY_METADATA, Collections.emptyMap(), Version.CURRENT, f)); + 0, false, Metadata.EMPTY_METADATA, Collections.emptyMap(), Version.CURRENT, Function.identity(), f)); } } diff --git a/test/framework/src/main/java/org/elasticsearch/index/shard/RestoreOnlyRepository.java b/test/framework/src/main/java/org/elasticsearch/index/shard/RestoreOnlyRepository.java index 6b6933930b1..99b72aa5a42 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/shard/RestoreOnlyRepository.java +++ b/test/framework/src/main/java/org/elasticsearch/index/shard/RestoreOnlyRepository.java @@ -43,6 +43,7 @@ import java.io.IOException; import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.function.Function; import static java.util.Collections.emptyList; import static org.elasticsearch.repositories.RepositoryData.EMPTY_REPO_GEN; @@ -102,7 +103,8 @@ public abstract class RestoreOnlyRepository extends AbstractLifecycleComponent i public void finalizeSnapshot(SnapshotId snapshotId, ShardGenerations shardGenerations, long startTime, String failure, int totalShards, List shardFailures, long repositoryStateId, boolean includeGlobalState, Metadata metadata, Map userMetadata, - Version repositoryMetaVersion, ActionListener> listener) { + Version repositoryMetaVersion, Function stateTransformer, + ActionListener> listener) { listener.onResponse(null); } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java index 321087f7f1d..9610852385d 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java @@ -92,6 +92,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.function.Function; import java.util.function.LongConsumer; import java.util.function.Supplier; import java.util.stream.Collectors; @@ -262,6 +263,7 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit public void finalizeSnapshot(SnapshotId snapshotId, ShardGenerations shardGenerations, long startTime, String failure, int totalShards, List shardFailures, long repositoryStateId, boolean includeGlobalState, Metadata metadata, Map userMetadata, Version repositoryMetaVersion, + Function stateTransformer, ActionListener> listener) { throw new UnsupportedOperationException("Unsupported for repository of type: " + TYPE); } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/snapshots/SourceOnlySnapshotRepository.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/snapshots/SourceOnlySnapshotRepository.java index 1cccc00f5c7..be36cf6ff99 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/snapshots/SourceOnlySnapshotRepository.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/snapshots/SourceOnlySnapshotRepository.java @@ -17,6 +17,7 @@ import org.apache.lucene.store.FilterDirectory; import org.apache.lucene.store.SimpleFSDirectory; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.MappingMetadata; import org.elasticsearch.cluster.metadata.Metadata; @@ -98,14 +99,15 @@ public final class SourceOnlySnapshotRepository extends FilterRepository { public void finalizeSnapshot(SnapshotId snapshotId, ShardGenerations shardGenerations, long startTime, String failure, int totalShards, List shardFailures, long repositoryStateId, boolean includeGlobalState, Metadata metadata, Map userMetadata, - Version repositoryMetaVersion, ActionListener> listener) { + Version repositoryMetaVersion, Function stateTransformer, + ActionListener> listener) { // we process the index metadata at snapshot time. This means if somebody tries to restore // a _source only snapshot with a plain repository it will be just fine since we already set the // required engine, that the index is read-only and the mapping to a default mapping try { super.finalizeSnapshot(snapshotId, shardGenerations, startTime, failure, totalShards, shardFailures, repositoryStateId, includeGlobalState, metadataToSnapshot(shardGenerations.indices(), metadata), userMetadata, repositoryMetaVersion, - listener); + stateTransformer, listener); } catch (IOException ex) { listener.onFailure(ex); } diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotShardTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotShardTests.java index 2e241bbb056..427ba7d3b9b 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotShardTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotShardTests.java @@ -74,6 +74,7 @@ import java.nio.file.Path; import java.util.Collections; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; +import java.util.function.Function; public class SourceOnlySnapshotShardTests extends IndexShardTestCase { @@ -222,8 +223,7 @@ public class SourceOnlySnapshotShardTests extends IndexShardTestCase { indexShardSnapshotStatus.asCopy().getStartTime(), null, 1, Collections.emptyList(), ESBlobStoreRepositoryIntegTestCase.getRepositoryData(repository).getGenId(), true, Metadata.builder().put(shard.indexSettings().getIndexMetadata(), false).build(), Collections.emptyMap(), - Version.CURRENT, - finFuture); + Version.CURRENT, Function.identity(), finFuture); finFuture.actionGet(); }); IndexShardSnapshotStatus.Copy copy = indexShardSnapshotStatus.asCopy();