diff --git a/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java b/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java index cbaca8a05ff..d3a868ba7ba 100644 --- a/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java +++ b/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java @@ -33,12 +33,14 @@ import org.elasticsearch.common.blobstore.BlobStore; import org.elasticsearch.common.logging.DeprecationLogger; import org.elasticsearch.common.settings.SecureSetting; import org.elasticsearch.common.settings.SecureString; +import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.monitor.jvm.JvmInfo; +import org.elasticsearch.repositories.RepositoryData; import org.elasticsearch.repositories.RepositoryException; import org.elasticsearch.repositories.ShardGenerations; import org.elasticsearch.repositories.blobstore.BlobStoreRepository; @@ -272,7 +274,7 @@ class S3Repository extends BlobStoreRepository { public void finalizeSnapshot(SnapshotId snapshotId, ShardGenerations shardGenerations, long startTime, String failure, int totalShards, List shardFailures, long repositoryStateId, boolean includeGlobalState, Metadata clusterMetadata, Map userMetadata, Version repositoryMetaVersion, - ActionListener listener) { + ActionListener> listener) { if (SnapshotsService.useShardGenerations(repositoryMetaVersion) == false) { listener = delayedListener(listener); } diff --git a/server/src/main/java/org/elasticsearch/repositories/FilterRepository.java b/server/src/main/java/org/elasticsearch/repositories/FilterRepository.java index 1a2ecf4d494..64669702029 100644 --- a/server/src/main/java/org/elasticsearch/repositories/FilterRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/FilterRepository.java @@ -26,6 +26,7 @@ import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.cluster.metadata.RepositoryMetadata; import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.component.Lifecycle; import org.elasticsearch.common.component.LifecycleListener; import org.elasticsearch.index.mapper.MapperService; @@ -83,7 +84,7 @@ public class FilterRepository implements Repository { public void finalizeSnapshot(SnapshotId snapshotId, ShardGenerations shardGenerations, long startTime, String failure, int totalShards, List shardFailures, long repositoryStateId, boolean includeGlobalState, Metadata metadata, Map userMetadata, - Version repositoryMetaVersion, ActionListener listener) { + Version repositoryMetaVersion, ActionListener> listener) { in.finalizeSnapshot(snapshotId, shardGenerations, startTime, failure, totalShards, shardFailures, repositoryStateId, includeGlobalState, metadata, userMetadata, repositoryMetaVersion, listener); } diff --git a/server/src/main/java/org/elasticsearch/repositories/Repository.java b/server/src/main/java/org/elasticsearch/repositories/Repository.java index ecd13a693bf..86a358d88a2 100644 --- a/server/src/main/java/org/elasticsearch/repositories/Repository.java +++ b/server/src/main/java/org/elasticsearch/repositories/Repository.java @@ -28,6 +28,7 @@ import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.cluster.metadata.RepositoryMetadata; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.component.LifecycleComponent; import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.shard.ShardId; @@ -140,12 +141,13 @@ public interface Repository extends LifecycleComponent { * @param clusterMetadata cluster metadata * @param userMetadata user metadata * @param repositoryMetaVersion version of the updated repository metadata to write - * @param listener listener to be called on completion of the snapshot + * @param listener listener to be invoked with the new {@link RepositoryData} and the snapshot's {@link SnapshotInfo} + * completion of the snapshot */ void finalizeSnapshot(SnapshotId snapshotId, ShardGenerations shardGenerations, long startTime, String failure, int totalShards, List shardFailures, long repositoryStateId, boolean includeGlobalState, Metadata clusterMetadata, Map userMetadata, - Version repositoryMetaVersion, ActionListener listener); + Version repositoryMetaVersion, ActionListener> listener); /** * Deletes snapshot diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index 3efc0a53649..3af82cf3c76 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -912,7 +912,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp final Metadata clusterMetadata, final Map userMetadata, Version repositoryMetaVersion, - final ActionListener listener) { + final ActionListener> listener) { assert repositoryStateId > RepositoryData.UNKNOWN_REPO_GEN : "Must finalize based on a valid repository generation but received [" + repositoryStateId + "]"; final Collection indices = shardGenerations.indices(); @@ -930,11 +930,11 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp getRepositoryData(ActionListener.wrap(existingRepositoryData -> { final RepositoryData updatedRepositoryData = existingRepositoryData.addSnapshot(snapshotId, snapshotInfo.state(), Version.CURRENT, shardGenerations); - writeIndexGen(updatedRepositoryData, repositoryStateId, writeShardGens, ActionListener.wrap(v -> { + writeIndexGen(updatedRepositoryData, repositoryStateId, writeShardGens, ActionListener.wrap(writtenRepoData -> { if (writeShardGens) { cleanupOldShardGens(existingRepositoryData, updatedRepositoryData); } - listener.onResponse(snapshotInfo); + listener.onResponse(new Tuple<>(writtenRepoData, snapshotInfo)); }, onUpdateFailure)); }, onUpdateFailure)); }, onUpdateFailure), 2 + indices.size()); @@ -1321,7 +1321,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp * @param writeShardGens whether to write {@link ShardGenerations} to the new {@link RepositoryData} blob * @param listener completion listener */ - protected void writeIndexGen(RepositoryData repositoryData, long expectedGen, boolean writeShardGens, ActionListener listener) { + protected void writeIndexGen(RepositoryData repositoryData, long expectedGen, boolean writeShardGens, + ActionListener listener) { assert isReadOnly() == false; // can not write to a read only repository final long currentGen = repositoryData.getGenId(); if (currentGen != expectedGen) { @@ -1468,8 +1469,9 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp @Override public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { - cacheRepositoryData(filteredRepositoryData.withGenId(newGen)); - threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.run(listener, () -> { + final RepositoryData writtenRepositoryData = filteredRepositoryData.withGenId(newGen); + cacheRepositoryData(writtenRepositoryData); + threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.supply(listener, () -> { // Delete all now outdated index files up to 1000 blobs back from the new generation. // If there are more than 1000 dangling index-N cleanup functionality on repo delete will take care of them. // Deleting one older than the current expectedGen is done for BwC reasons as older versions used to keep @@ -1483,6 +1485,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp } catch (IOException e) { logger.warn(() -> new ParameterizedMessage("Failed to clean up old index blobs {}", oldIndexN), e); } + return writtenRepositoryData; })); } }); diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java index f3e7d3e267c..854b916095d 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java @@ -121,7 +121,8 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus private final ThreadPool threadPool; - private final Map>> snapshotCompletionListeners = new ConcurrentHashMap<>(); + private final Map>>> snapshotCompletionListeners = + new ConcurrentHashMap<>(); // Set of snapshots that are currently being initialized by this node private final Set initializingSnapshots = Collections.synchronizedSet(new HashSet<>()); @@ -150,7 +151,8 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus * @param listener snapshot completion listener */ public void executeSnapshot(final CreateSnapshotRequest request, final ActionListener listener) { - createSnapshot(request, ActionListener.wrap(snapshot -> addListener(snapshot, listener), listener::onFailure)); + createSnapshot(request, + ActionListener.wrap(snapshot -> addListener(snapshot, ActionListener.map(listener, Tuple::v2)), listener::onFailure)); } /** @@ -449,7 +451,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus }); } - private class CleanupAfterErrorListener implements ActionListener { + private class CleanupAfterErrorListener { private final SnapshotsInProgress.Entry snapshot; private final boolean snapshotCreated; @@ -464,12 +466,6 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus this.e = e; } - @Override - public void onResponse(SnapshotInfo snapshotInfo) { - cleanupAfterError(this.e); - } - - @Override public void onFailure(@Nullable Exception e) { if (snapshotCreated) { cleanupAfterError(ExceptionsHelper.useOrSuppress(e, this.e)); @@ -899,8 +895,9 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus metadataForSnapshot(entry, metadata), entry.userMetadata(), entry.version(), - ActionListener.wrap(snapshotInfo -> { - removeSnapshotFromClusterState(snapshot, snapshotInfo, null); + ActionListener.wrap(result -> { + final SnapshotInfo snapshotInfo = result.v2(); + removeSnapshotFromClusterState(snapshot, result, null); logger.info("snapshot [{}] completed with state [{}]", snapshot, snapshotInfo.state()); }, this::onFailure)); } @@ -926,11 +923,12 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus /** * Removes record of running snapshot from cluster state * @param snapshot snapshot - * @param snapshotInfo snapshot info if snapshot was successful + * @param snapshotResult new {@link RepositoryData} and {@link SnapshotInfo} info if snapshot was successful * @param e exception if snapshot failed, {@code null} otherwise */ - private void removeSnapshotFromClusterState(final Snapshot snapshot, final SnapshotInfo snapshotInfo, @Nullable Exception e) { - removeSnapshotFromClusterState(snapshot, snapshotInfo, e, null); + private void removeSnapshotFromClusterState(final Snapshot snapshot, @Nullable Tuple snapshotResult, + @Nullable Exception e) { + removeSnapshotFromClusterState(snapshot, snapshotResult, e, null); } /** @@ -939,9 +937,9 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus * @param failure exception if snapshot failed, {@code null} otherwise * @param listener listener to notify when snapshot information is removed from the cluster state */ - private void removeSnapshotFromClusterState(final Snapshot snapshot, @Nullable SnapshotInfo snapshotInfo, @Nullable Exception failure, - @Nullable CleanupAfterErrorListener listener) { - assert snapshotInfo != null || failure != null : "Either snapshotInfo or failure must be supplied"; + private void removeSnapshotFromClusterState(final Snapshot snapshot, @Nullable Tuple snapshotResult, + @Nullable Exception failure, @Nullable CleanupAfterErrorListener listener) { + assert snapshotResult != null || failure != null : "Either snapshotInfo or failure must be supplied"; clusterService.submitStateUpdateTask("remove snapshot metadata", new ClusterStateUpdateTask() { @Override @@ -986,13 +984,14 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus @Override public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { - if (snapshotInfo == null) { + if (snapshotResult == null) { failSnapshotCompletionListeners(snapshot, failure); } else { - final List> completionListeners = snapshotCompletionListeners.remove(snapshot); + final List>> completionListeners = + snapshotCompletionListeners.remove(snapshot); if (completionListeners != null) { try { - ActionListener.onResponse(completionListeners, snapshotInfo); + ActionListener.onResponse(completionListeners, snapshotResult); } catch (Exception e) { logger.warn("Failed to notify listeners", e); } @@ -1007,7 +1006,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus } private void failSnapshotCompletionListeners(Snapshot snapshot, Exception e) { - final List> completionListeners = snapshotCompletionListeners.remove(snapshot); + final List>> completionListeners = snapshotCompletionListeners.remove(snapshot); if (completionListeners != null) { try { ActionListener.onFailure(completionListeners, e); @@ -1108,14 +1107,30 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus @Override public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { if (runningSnapshot == null) { - tryDeleteExisting(Priority.NORMAL); + threadPool.generic().execute(ActionRunnable.wrap(listener, l -> + repositoriesService.repository(repositoryName).getRepositoryData( + ActionListener.wrap(repositoryData -> { + Optional matchedEntry = repositoryData.getSnapshotIds() + .stream() + .filter(s -> s.getName().equals(snapshotName)) + .findFirst(); + // If we can't find the snapshot by the given name in the repository at all or if the snapshot we find in + // the repository is not the one we expected to find when waiting for a finishing snapshot we fail. + if (matchedEntry.isPresent()) { + deleteCompletedSnapshot( + new Snapshot(repositoryName, matchedEntry.get()), repositoryData.getGenId(), Priority.NORMAL, l); + } else { + l.onFailure(new SnapshotMissingException(repositoryName, snapshotName)); + } + }, l::onFailure)))); return; } logger.trace("adding snapshot completion listener to wait for deleted snapshot to finish"); addListener(runningSnapshot, ActionListener.wrap( - snapshotInfo -> { + result -> { logger.debug("deleted snapshot completed - deleting files"); - tryDeleteExisting(Priority.IMMEDIATE); + deleteCompletedSnapshot( + new Snapshot(repositoryName, result.v2().snapshotId()), result.v1().getGenId(), Priority.IMMEDIATE, listener); }, e -> { if (abortedDuringInit) { @@ -1136,33 +1151,6 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus } )); } - - private void tryDeleteExisting(Priority priority) { - threadPool.generic().execute(ActionRunnable.wrap(listener, l -> - repositoriesService.repository(repositoryName).getRepositoryData(ActionListener.wrap(repositoryData -> { - Optional matchedEntry = repositoryData.getSnapshotIds() - .stream() - .filter(s -> s.getName().equals(snapshotName)) - .findFirst(); - // If we can't find the snapshot by the given name in the repository at all or if the snapshot we find in the - // repository is not the one we expected to find when waiting for a finishing snapshot we fail. - // Note: Not finding a snapshot we expected to find is practically impossible as it would imply that the snapshot - // we waited for was concurrently deleted and another snapshot by the same name concurrently created - // during the context switch from the cluster state thread to the snapshot thread. We still guard against the - // possibility as a safety measure. - if (matchedEntry.isPresent() == false - || (runningSnapshot != null && matchedEntry.get().equals(runningSnapshot.getSnapshotId()) == false)) { - if (runningSnapshot != null && matchedEntry.isPresent()) { - logger.warn("Waited for snapshot [{}}] but found snapshot [{}] in repository [{}]", - runningSnapshot.getSnapshotId(), matchedEntry.get(), repositoryName); - } - l.onFailure(new SnapshotMissingException(repositoryName, snapshotName)); - } else { - deleteCompletedSnapshot( - new Snapshot(repositoryName, matchedEntry.get()), repositoryData.getGenId(), priority, l); - } - }, l::onFailure)))); - } }); } @@ -1470,7 +1458,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus * @param snapshot Snapshot to listen for * @param listener listener */ - private void addListener(Snapshot snapshot, ActionListener listener) { + private void addListener(Snapshot snapshot, ActionListener> listener) { snapshotCompletionListeners.computeIfAbsent(snapshot, k -> new CopyOnWriteArrayList<>()).add(listener); } diff --git a/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java b/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java index ad597783d97..43ed5333fd8 100644 --- a/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java +++ b/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java @@ -31,6 +31,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Strings; import org.elasticsearch.common.UUIDs; +import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.component.Lifecycle; import org.elasticsearch.common.component.LifecycleListener; import org.elasticsearch.common.settings.Settings; @@ -164,7 +165,7 @@ public class RepositoriesServiceTests extends ESTestCase { public void finalizeSnapshot(SnapshotId snapshotId, ShardGenerations indices, long startTime, String failure, int totalShards, List shardFailures, long repositoryStateId, boolean includeGlobalState, Metadata metadata, Map userMetadata, - Version repositoryMetaVersion, ActionListener listener) { + Version repositoryMetaVersion, ActionListener> listener) { listener.onResponse(null); } diff --git a/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryRestoreTests.java b/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryRestoreTests.java index e7e98c88a52..4c394fda013 100644 --- a/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryRestoreTests.java +++ b/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryRestoreTests.java @@ -30,6 +30,7 @@ import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardRoutingHelper; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.UUIDs; +import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.env.Environment; @@ -45,6 +46,7 @@ import org.elasticsearch.index.store.Store; import org.elasticsearch.index.store.StoreFileMetadata; import org.elasticsearch.repositories.IndexId; import org.elasticsearch.repositories.Repository; +import org.elasticsearch.repositories.RepositoryData; import org.elasticsearch.repositories.ShardGenerations; import org.elasticsearch.repositories.fs.FsRepository; import org.elasticsearch.snapshots.Snapshot; @@ -146,7 +148,7 @@ public class BlobStoreRepositoryRestoreTests extends IndexShardTestCase { } } - public void testSnapshotWithConflictingName() throws IOException { + public void testSnapshotWithConflictingName() throws Exception { final IndexId indexId = new IndexId(randomAlphaOfLength(10), UUIDs.randomBase64UUID()); final ShardId shardId = new ShardId(indexId.getName(), indexId.getId(), 0); @@ -170,13 +172,12 @@ public class BlobStoreRepositoryRestoreTests extends IndexShardTestCase { assertNotNull(shardGen); final Snapshot snapshotWithSameName = new Snapshot(repository.getMetadata().name(), new SnapshotId( snapshot.getSnapshotId().getName(), "_uuid2")); - final PlainActionFuture future = PlainActionFuture.newFuture(); - repository.finalizeSnapshot(snapshot.getSnapshotId(), - ShardGenerations.builder().put(indexId, 0, shardGen).build(), - 0L, null, 1, Collections.emptyList(), -1L, false, - Metadata.builder().put(shard.indexSettings().getIndexMetadata(), false).build(), Collections.emptyMap(), Version.CURRENT, - future); - future.actionGet(); + PlainActionFuture., Exception>get(f -> + repository.finalizeSnapshot(snapshot.getSnapshotId(), + ShardGenerations.builder().put(indexId, 0, shardGen).build(), + 0L, null, 1, Collections.emptyList(), -1L, false, + Metadata.builder().put(shard.indexSettings().getIndexMetadata(), false).build(), Collections.emptyMap(), + Version.CURRENT, f)); IndexShardSnapshotFailedException isfe = expectThrows(IndexShardSnapshotFailedException.class, () -> snapshotShard(shard, snapshotWithSameName, repository)); assertThat(isfe.getMessage(), containsString("Duplicate snapshot name")); diff --git a/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryTests.java b/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryTests.java index 0b66e4215d7..58e707a480d 100644 --- a/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryTests.java +++ b/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryTests.java @@ -197,7 +197,7 @@ public class BlobStoreRepositoryTests extends ESSingleNodeTestCase { // write to index generational file RepositoryData repositoryData = generateRandomRepoData(); final long startingGeneration = repositoryData.getGenId(); - final PlainActionFuture future1 = PlainActionFuture.newFuture(); + final PlainActionFuture future1 = PlainActionFuture.newFuture(); repository.writeIndexGen(repositoryData, startingGeneration, true, future1); // write repo data again to index generational file, errors because we already wrote to the @@ -238,10 +238,8 @@ public class BlobStoreRepositoryTests extends ESSingleNodeTestCase { " See the breaking changes documentation for the next major version."); } - private static void writeIndexGen(BlobStoreRepository repository, RepositoryData repositoryData, long generation) { - final PlainActionFuture future = PlainActionFuture.newFuture(); - repository.writeIndexGen(repositoryData, generation, true, future); - future.actionGet(); + private static void writeIndexGen(BlobStoreRepository repository, RepositoryData repositoryData, long generation) throws Exception { + PlainActionFuture.get(f -> repository.writeIndexGen(repositoryData, generation, true, f)); } private BlobStoreRepository setupRepo() { diff --git a/server/src/test/java/org/elasticsearch/snapshots/RepositoryFilterUserMetadataIT.java b/server/src/test/java/org/elasticsearch/snapshots/RepositoryFilterUserMetadataIT.java index 44a457bbd25..6a692eab35b 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/RepositoryFilterUserMetadataIT.java +++ b/server/src/test/java/org/elasticsearch/snapshots/RepositoryFilterUserMetadataIT.java @@ -23,6 +23,7 @@ import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.env.Environment; @@ -33,6 +34,7 @@ import org.elasticsearch.plugins.Plugin; import org.elasticsearch.plugins.RepositoryPlugin; import org.elasticsearch.repositories.IndexId; import org.elasticsearch.repositories.Repository; +import org.elasticsearch.repositories.RepositoryData; import org.elasticsearch.repositories.ShardGenerations; import org.elasticsearch.repositories.fs.FsRepository; import org.elasticsearch.test.ESIntegTestCase; @@ -87,7 +89,8 @@ public class RepositoryFilterUserMetadataIT extends ESIntegTestCase { public void finalizeSnapshot(SnapshotId snapshotId, ShardGenerations shardGenerations, long startTime, String failure, int totalShards, List shardFailures, long repositoryStateId, boolean includeGlobalState, Metadata clusterMetadata, Map userMetadata, - Version repositoryMetaVersion, ActionListener listener) { + Version repositoryMetaVersion, + ActionListener> listener) { assertThat(userMetadata, is(Collections.singletonMap(MOCK_FILTERED_META, initialMetaValue))); super.finalizeSnapshot(snapshotId, shardGenerations, startTime, failure, totalShards, shardFailures, repositoryStateId, includeGlobalState, clusterMetadata, userMetadata, repositoryMetaVersion, listener); diff --git a/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepositoryTests.java b/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepositoryTests.java index 8b7703658ad..a070e3c85b6 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepositoryTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepositoryTests.java @@ -25,7 +25,9 @@ import org.elasticsearch.cluster.metadata.RepositoryMetadata; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.blobstore.BlobContainer; +import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.repositories.RepositoryData; import org.elasticsearch.repositories.ShardGenerations; import org.elasticsearch.repositories.blobstore.BlobStoreRepository; import org.elasticsearch.repositories.blobstore.BlobStoreTestUtil; @@ -134,7 +136,7 @@ public class MockEventuallyConsistentRepositoryTests extends ESTestCase { } } - public void testOverwriteSnapshotInfoBlob() { + public void testOverwriteSnapshotInfoBlob() throws Exception { MockEventuallyConsistentRepository.Context blobStoreContext = new MockEventuallyConsistentRepository.Context(); final RepositoryMetadata metadata = new RepositoryMetadata("testRepo", "mockEventuallyConsistent", Settings.EMPTY); final ClusterService clusterService = BlobStoreTestUtil.mockClusterService(metadata); @@ -146,30 +148,24 @@ public class MockEventuallyConsistentRepositoryTests extends ESTestCase { repository.start(); // We create a snap- blob for snapshot "foo" in the first generation - final PlainActionFuture future = PlainActionFuture.newFuture(); final SnapshotId snapshotId = new SnapshotId("foo", UUIDs.randomBase64UUID()); - // We try to write another snap- blob for "foo" in the next generation. It fails because the content differs. - repository.finalizeSnapshot(snapshotId, ShardGenerations.EMPTY, 1L, null, 5, Collections.emptyList(), - -1L, false, Metadata.EMPTY_METADATA, Collections.emptyMap(), Version.CURRENT, future); - future.actionGet(); + PlainActionFuture., Exception>get(f -> + // We try to write another snap- blob for "foo" in the next generation. It fails because the content differs. + repository.finalizeSnapshot(snapshotId, ShardGenerations.EMPTY, 1L, null, 5, Collections.emptyList(), + -1L, false, Metadata.EMPTY_METADATA, Collections.emptyMap(), Version.CURRENT, f)); // We try to write another snap- blob for "foo" in the next generation. It fails because the content differs. final AssertionError assertionError = expectThrows(AssertionError.class, - () -> { - final PlainActionFuture fut = PlainActionFuture.newFuture(); - repository.finalizeSnapshot( - snapshotId, ShardGenerations.EMPTY, 1L, null, 6, Collections.emptyList(), - 0, false, Metadata.EMPTY_METADATA, Collections.emptyMap(), Version.CURRENT, fut); - fut.actionGet(); - }); + () -> PlainActionFuture., Exception>get(f -> + repository.finalizeSnapshot(snapshotId, ShardGenerations.EMPTY, 1L, null, 6, Collections.emptyList(), + 0, false, Metadata.EMPTY_METADATA, Collections.emptyMap(), Version.CURRENT, f))); assertThat(assertionError.getMessage(), equalTo("\nExpected: <6>\n but: was <5>")); // We try to write yet another snap- blob for "foo" in the next generation. // It passes cleanly because the content of the blob except for the timestamps. - final PlainActionFuture future2 = PlainActionFuture.newFuture(); - repository.finalizeSnapshot(snapshotId, ShardGenerations.EMPTY, 1L, null, 5, Collections.emptyList(), - 0, false, Metadata.EMPTY_METADATA, Collections.emptyMap(),Version.CURRENT, future2); - future2.actionGet(); + PlainActionFuture., Exception>get(f -> + repository.finalizeSnapshot(snapshotId, ShardGenerations.EMPTY, 1L, null, 5, Collections.emptyList(), + 0, false, Metadata.EMPTY_METADATA, Collections.emptyMap(), Version.CURRENT, f)); } } diff --git a/test/framework/src/main/java/org/elasticsearch/index/shard/RestoreOnlyRepository.java b/test/framework/src/main/java/org/elasticsearch/index/shard/RestoreOnlyRepository.java index b7153fa374e..7183b2f05a1 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/shard/RestoreOnlyRepository.java +++ b/test/framework/src/main/java/org/elasticsearch/index/shard/RestoreOnlyRepository.java @@ -26,6 +26,7 @@ import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.cluster.metadata.RepositoryMetadata; import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus; @@ -101,7 +102,7 @@ public abstract class RestoreOnlyRepository extends AbstractLifecycleComponent i public void finalizeSnapshot(SnapshotId snapshotId, ShardGenerations shardGenerations, long startTime, String failure, int totalShards, List shardFailures, long repositoryStateId, boolean includeGlobalState, Metadata metadata, Map userMetadata, - Version repositoryMetaVersion, ActionListener listener) { + Version repositoryMetaVersion, ActionListener> listener) { listener.onResponse(null); } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java index f2462701c17..93881e2547c 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java @@ -33,6 +33,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.Strings; import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.collect.ImmutableOpenMap; +import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.metrics.CounterMetric; @@ -261,7 +262,7 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit public void finalizeSnapshot(SnapshotId snapshotId, ShardGenerations shardGenerations, long startTime, String failure, int totalShards, List shardFailures, long repositoryStateId, boolean includeGlobalState, Metadata metadata, Map userMetadata, Version repositoryMetaVersion, - ActionListener listener) { + ActionListener> listener) { throw new UnsupportedOperationException("Unsupported for repository of type: " + TYPE); } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/snapshots/SourceOnlySnapshotRepository.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/snapshots/SourceOnlySnapshotRepository.java index de2161f8253..f814a02a6e6 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/snapshots/SourceOnlySnapshotRepository.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/snapshots/SourceOnlySnapshotRepository.java @@ -23,6 +23,7 @@ import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.cluster.metadata.RepositoryMetadata; import org.elasticsearch.common.Strings; import org.elasticsearch.common.collect.ImmutableOpenMap; +import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.lucene.search.Queries; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; @@ -37,6 +38,7 @@ import org.elasticsearch.index.translog.TranslogStats; import org.elasticsearch.repositories.FilterRepository; import org.elasticsearch.repositories.IndexId; import org.elasticsearch.repositories.Repository; +import org.elasticsearch.repositories.RepositoryData; import org.elasticsearch.repositories.ShardGenerations; import java.io.Closeable; @@ -96,7 +98,7 @@ public final class SourceOnlySnapshotRepository extends FilterRepository { public void finalizeSnapshot(SnapshotId snapshotId, ShardGenerations shardGenerations, long startTime, String failure, int totalShards, List shardFailures, long repositoryStateId, boolean includeGlobalState, Metadata metadata, Map userMetadata, - Version repositoryMetaVersion, ActionListener listener) { + Version repositoryMetaVersion, ActionListener> listener) { // we process the index metadata at snapshot time. This means if somebody tries to restore // a _source only snapshot with a plain repository it will be just fine since we already set the // required engine, that the index is read-only and the mapping to a default mapping diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotShardTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotShardTests.java index 92b2d1a620a..2e241bbb056 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotShardTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotShardTests.java @@ -35,6 +35,7 @@ import org.elasticsearch.cluster.routing.TestShardRouting; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.lucene.uid.Versions; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentHelper; @@ -60,6 +61,7 @@ import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus; import org.elasticsearch.indices.recovery.RecoveryState; import org.elasticsearch.repositories.IndexId; import org.elasticsearch.repositories.Repository; +import org.elasticsearch.repositories.RepositoryData; import org.elasticsearch.repositories.ShardGenerations; import org.elasticsearch.repositories.blobstore.BlobStoreTestUtil; import org.elasticsearch.repositories.blobstore.ESBlobStoreRepositoryIntegTestCase; @@ -214,7 +216,7 @@ public class SourceOnlySnapshotShardTests extends IndexShardTestCase { repository.snapshotShard(shard.store(), shard.mapperService(), snapshotId, indexId, snapshotRef.getIndexCommit(), null, indexShardSnapshotStatus, Version.CURRENT, Collections.emptyMap(), future); future.actionGet(); - final PlainActionFuture finFuture = PlainActionFuture.newFuture(); + final PlainActionFuture> finFuture = PlainActionFuture.newFuture(); repository.finalizeSnapshot(snapshotId, ShardGenerations.builder().put(indexId, 0, indexShardSnapshotStatus.generation()).build(), indexShardSnapshotStatus.asCopy().getStartTime(), null, 1, Collections.emptyList(),