From aae93a75780c00f3bc96a25ffae02dc20ad1b127 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Tue, 28 Jan 2020 18:33:26 +0100 Subject: [PATCH] Allow Repository Plugins to Filter Metadata on Create (#51472) (#51542) * Allow Repository Plugins to Filter Metadata on Create Add a hook that allows repository plugins to filter the repository metadata before it gets written to the cluster state. --- .../repositories/FilterRepository.java | 5 +- .../repositories/Repository.java | 13 +- .../blobstore/BlobStoreRepository.java | 2 +- .../snapshots/SnapshotShardsService.java | 44 +++---- .../snapshots/SnapshotsService.java | 9 +- .../RepositoriesServiceTests.java | 2 +- .../repositories/fs/FsRepositoryTests.java | 6 +- .../RepositoryFilterUserMetadataIT.java | 111 ++++++++++++++++++ .../index/shard/IndexShardTestCase.java | 2 +- .../index/shard/RestoreOnlyRepository.java | 2 +- .../xpack/ccr/repository/CcrRepository.java | 2 +- .../SourceOnlySnapshotRepository.java | 4 +- .../SourceOnlySnapshotShardTests.java | 10 +- 13 files changed, 170 insertions(+), 42 deletions(-) create mode 100644 server/src/test/java/org/elasticsearch/snapshots/RepositoryFilterUserMetadataIT.java diff --git a/server/src/main/java/org/elasticsearch/repositories/FilterRepository.java b/server/src/main/java/org/elasticsearch/repositories/FilterRepository.java index 185234093ff..e0cf414f86f 100644 --- a/server/src/main/java/org/elasticsearch/repositories/FilterRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/FilterRepository.java @@ -125,8 +125,9 @@ public class FilterRepository implements Repository { @Override public void snapshotShard(Store store, MapperService mapperService, SnapshotId snapshotId, IndexId indexId, IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus, boolean writeShardGens, - ActionListener listener) { - in.snapshotShard(store, mapperService, snapshotId, indexId, snapshotIndexCommit, snapshotStatus, writeShardGens, listener); + Map userMetadata, ActionListener listener) { + in.snapshotShard( + store, mapperService, snapshotId, indexId, snapshotIndexCommit, snapshotStatus, writeShardGens, userMetadata, listener); } @Override public void restoreShard(Store store, SnapshotId snapshotId, IndexId indexId, ShardId snapshotShardId, RecoveryState recoveryState, diff --git a/server/src/main/java/org/elasticsearch/repositories/Repository.java b/server/src/main/java/org/elasticsearch/repositories/Repository.java index 6886a2bc0a3..bba82c24c10 100644 --- a/server/src/main/java/org/elasticsearch/repositories/Repository.java +++ b/server/src/main/java/org/elasticsearch/repositories/Repository.java @@ -21,6 +21,7 @@ package org.elasticsearch.repositories; import org.apache.lucene.index.IndexCommit; import org.elasticsearch.action.ActionListener; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.SnapshotsInProgress; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.metadata.RepositoryMetaData; @@ -211,10 +212,12 @@ public interface Repository extends LifecycleComponent { * @param indexId id for the index being snapshotted * @param snapshotIndexCommit commit point * @param snapshotStatus snapshot status + * @param userMetadata user metadata of the snapshot found in {@link SnapshotsInProgress.Entry#userMetadata()} * @param listener listener invoked on completion */ void snapshotShard(Store store, MapperService mapperService, SnapshotId snapshotId, IndexId indexId, IndexCommit snapshotIndexCommit, - IndexShardSnapshotStatus snapshotStatus, boolean writeShardGens, ActionListener listener); + IndexShardSnapshotStatus snapshotStatus, boolean writeShardGens, Map userMetadata, + ActionListener listener); /** * Restores snapshot of the shard. @@ -247,4 +250,12 @@ public interface Repository extends LifecycleComponent { * @param state new cluster state */ void updateState(ClusterState state); + + /** + * Hook that allows a repository to filter the user supplied snapshot metadata in {@link SnapshotsInProgress.Entry#userMetadata()} + * during snapshot initialization. + */ + default Map adaptUserMetadata(Map userMetadata) { + return userMetadata; + } } diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index af63d7a2a08..19c5ee794f6 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -1459,7 +1459,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp @Override public void snapshotShard(Store store, MapperService mapperService, SnapshotId snapshotId, IndexId indexId, IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus, boolean writeShardGens, - ActionListener listener) { + Map userMetadata, ActionListener listener) { final ShardId shardId = store.shardId(); final long startTime = threadPool.absoluteTimeInMillis(); try { diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java index e0b9a4e48ac..888216b46f0 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java @@ -283,27 +283,28 @@ public class SnapshotShardsService extends AbstractLifecycleComponent implements assert indexId != null; assert entry.useShardGenerations() || snapshotStatus.generation() == null : "Found non-null shard generation [" + snapshotStatus.generation() + "] for snapshot with old-format compatibility"; - snapshot(shardId, snapshot, indexId, snapshotStatus, entry.useShardGenerations(), new ActionListener() { - @Override - public void onResponse(String newGeneration) { - assert newGeneration != null; - assert newGeneration.equals(snapshotStatus.generation()); - if (logger.isDebugEnabled()) { - final IndexShardSnapshotStatus.Copy lastSnapshotStatus = snapshotStatus.asCopy(); - logger.debug("snapshot [{}] completed to [{}] with [{}] at generation [{}]", - snapshot, snapshot.getRepository(), lastSnapshotStatus, snapshotStatus.generation()); + snapshot(shardId, snapshot, indexId, entry.userMetadata(), snapshotStatus, entry.useShardGenerations(), + new ActionListener() { + @Override + public void onResponse(String newGeneration) { + assert newGeneration != null; + assert newGeneration.equals(snapshotStatus.generation()); + if (logger.isDebugEnabled()) { + final IndexShardSnapshotStatus.Copy lastSnapshotStatus = snapshotStatus.asCopy(); + logger.debug("snapshot [{}] completed to [{}] with [{}] at generation [{}]", + snapshot, snapshot.getRepository(), lastSnapshotStatus, snapshotStatus.generation()); + } + notifySuccessfulSnapshotShard(snapshot, shardId, newGeneration); } - notifySuccessfulSnapshotShard(snapshot, shardId, newGeneration); - } - @Override - public void onFailure(Exception e) { - final String failure = ExceptionsHelper.stackTrace(e); - snapshotStatus.moveToFailed(threadPool.absoluteTimeInMillis(), failure); - logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to snapshot shard", shardId, snapshot), e); - notifyFailedSnapshotShard(snapshot, shardId, failure); - } - }); + @Override + public void onFailure(Exception e) { + final String failure = ExceptionsHelper.stackTrace(e); + snapshotStatus.moveToFailed(threadPool.absoluteTimeInMillis(), failure); + logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to snapshot shard", shardId, snapshot), e); + notifyFailedSnapshotShard(snapshot, shardId, failure); + } + }); } }); } @@ -314,7 +315,7 @@ public class SnapshotShardsService extends AbstractLifecycleComponent implements * @param snapshot snapshot * @param snapshotStatus snapshot status */ - private void snapshot(final ShardId shardId, final Snapshot snapshot, final IndexId indexId, + private void snapshot(final ShardId shardId, final Snapshot snapshot, final IndexId indexId, final Map userMetadata, final IndexShardSnapshotStatus snapshotStatus, boolean writeShardGens, ActionListener listener) { try { final IndexShard indexShard = indicesService.indexServiceSafe(shardId.getIndex()).getShardOrNull(shardId.id()); @@ -338,7 +339,8 @@ public class SnapshotShardsService extends AbstractLifecycleComponent implements // we flush first to make sure we get the latest writes snapshotted snapshotRef = indexShard.acquireLastIndexCommit(true); repository.snapshotShard(indexShard.store(), indexShard.mapperService(), snapshot.getSnapshotId(), indexId, - snapshotRef.getIndexCommit(), snapshotStatus, writeShardGens, ActionListener.runBefore(listener, snapshotRef::close)); + snapshotRef.getIndexCommit(), snapshotStatus, writeShardGens, userMetadata, + ActionListener.runBefore(listener, snapshotRef::close)); } catch (Exception e) { IOUtils.close(snapshotRef); throw e; diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java index b3d2fab31c0..49f16188dae 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java @@ -279,6 +279,8 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus final String snapshotName = indexNameExpressionResolver.resolveDateMathExpression(request.snapshot()); validate(repositoryName, snapshotName); final SnapshotId snapshotId = new SnapshotId(snapshotName, UUIDs.randomBase64UUID()); // new UUID for the snapshot + Repository repository = repositoriesService.repository(request.repository()); + final Map userMeta = repository.adaptUserMetadata(request.userMetadata()); clusterService.submitStateUpdateTask("create_snapshot [" + snapshotName + ']', new ClusterStateUpdateTask() { private SnapshotsInProgress.Entry newSnapshot = null; @@ -314,7 +316,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus threadPool.absoluteTimeInMillis(), RepositoryData.UNKNOWN_REPO_GEN, null, - request.userMetadata(), false + userMeta, false ); initializingSnapshots.add(newSnapshot.snapshot()); snapshots = new SnapshotsInProgress(newSnapshot); @@ -337,7 +339,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus final Snapshot current = newSnapshot.snapshot(); assert initializingSnapshots.contains(current); assert indices != null; - beginSnapshot(newState, newSnapshot, request.partial(), indices, new ActionListener() { + beginSnapshot(newState, newSnapshot, request.partial(), indices, repository, new ActionListener() { @Override public void onResponse(final Snapshot snapshot) { initializingSnapshots.remove(snapshot); @@ -443,6 +445,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus final SnapshotsInProgress.Entry snapshot, final boolean partial, final List indices, + final Repository repository, final ActionListener userCreateSnapshotListener) { threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(new AbstractRunnable() { @@ -453,8 +456,6 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus @Override protected void doRun() { assert initializingSnapshots.contains(snapshot.snapshot()); - Repository repository = repositoriesService.repository(snapshot.snapshot().getRepository()); - if (repository.isReadOnly()) { throw new RepositoryException(repository.getMetadata().name(), "cannot create snapshot in a readonly repository"); } diff --git a/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java b/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java index 2c88e2dfc81..4000b857ab3 100644 --- a/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java +++ b/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java @@ -205,7 +205,7 @@ public class RepositoriesServiceTests extends ESTestCase { @Override public void snapshotShard(Store store, MapperService mapperService, SnapshotId snapshotId, IndexId indexId, IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus, boolean writeShardGens, - ActionListener listener) { + Map userMetadata, ActionListener listener) { } diff --git a/server/src/test/java/org/elasticsearch/repositories/fs/FsRepositoryTests.java b/server/src/test/java/org/elasticsearch/repositories/fs/FsRepositoryTests.java index 2b687fbcac2..735afc8c8b1 100644 --- a/server/src/test/java/org/elasticsearch/repositories/fs/FsRepositoryTests.java +++ b/server/src/test/java/org/elasticsearch/repositories/fs/FsRepositoryTests.java @@ -105,7 +105,8 @@ public class FsRepositoryTests extends ESTestCase { final PlainActionFuture future1 = PlainActionFuture.newFuture(); runGeneric(threadPool, () -> { IndexShardSnapshotStatus snapshotStatus = IndexShardSnapshotStatus.newInitializing(null); - repository.snapshotShard(store, null, snapshotId, indexId, indexCommit, snapshotStatus, true, future1); + repository.snapshotShard(store, null, snapshotId, indexId, indexCommit, snapshotStatus, true, + Collections.emptyMap(), future1); future1.actionGet(); IndexShardSnapshotStatus.Copy copy = snapshotStatus.asCopy(); assertEquals(copy.getTotalFileCount(), copy.getIncrementalFileCount()); @@ -133,7 +134,8 @@ public class FsRepositoryTests extends ESTestCase { final PlainActionFuture future2 = PlainActionFuture.newFuture(); runGeneric(threadPool, () -> { IndexShardSnapshotStatus snapshotStatus = IndexShardSnapshotStatus.newInitializing(shardGeneration); - repository.snapshotShard(store, null, incSnapshotId, indexId, incIndexCommit, snapshotStatus, true, future2); + repository.snapshotShard(store, null, incSnapshotId, indexId, incIndexCommit, snapshotStatus, true, + Collections.emptyMap(), future2); future2.actionGet(); IndexShardSnapshotStatus.Copy copy = snapshotStatus.asCopy(); assertEquals(2, copy.getIncrementalFileCount()); diff --git a/server/src/test/java/org/elasticsearch/snapshots/RepositoryFilterUserMetadataIT.java b/server/src/test/java/org/elasticsearch/snapshots/RepositoryFilterUserMetadataIT.java new file mode 100644 index 00000000000..05d0bf4516d --- /dev/null +++ b/server/src/test/java/org/elasticsearch/snapshots/RepositoryFilterUserMetadataIT.java @@ -0,0 +1,111 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.snapshots; + +import org.apache.lucene.index.IndexCommit; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.NamedXContentRegistry; +import org.elasticsearch.env.Environment; +import org.elasticsearch.index.mapper.MapperService; +import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus; +import org.elasticsearch.index.store.Store; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.plugins.RepositoryPlugin; +import org.elasticsearch.repositories.IndexId; +import org.elasticsearch.repositories.Repository; +import org.elasticsearch.repositories.ShardGenerations; +import org.elasticsearch.repositories.fs.FsRepository; +import org.elasticsearch.test.ESIntegTestCase; + +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.hamcrest.Matchers.is; + +public class RepositoryFilterUserMetadataIT extends ESIntegTestCase { + + @Override + protected Collection> nodePlugins() { + return Collections.singleton(MetaDataFilteringPlugin.class); + } + + public void testFilteredRepoMetaDataIsUsed() { + final String masterName = internalCluster().getMasterName(); + final String repoName = "test-repo"; + assertAcked(client().admin().cluster().preparePutRepository(repoName).setType(MetaDataFilteringPlugin.TYPE).setSettings( + Settings.builder().put("location", randomRepoPath()) + .put(MetaDataFilteringPlugin.MASTER_SETTING_VALUE, masterName))); + createIndex("test-idx"); + final SnapshotInfo snapshotInfo = client().admin().cluster().prepareCreateSnapshot(repoName, "test-snap") + .setWaitForCompletion(true).get().getSnapshotInfo(); + assertThat(snapshotInfo.userMetadata(), is(Collections.singletonMap(MetaDataFilteringPlugin.MOCK_FILTERED_META, masterName))); + } + + // Mock plugin that stores the name of the master node that started a snapshot in each snapshot's metadata + public static final class MetaDataFilteringPlugin extends org.elasticsearch.plugins.Plugin implements RepositoryPlugin { + + private static final String MOCK_FILTERED_META = "mock_filtered_meta"; + + private static final String MASTER_SETTING_VALUE = "initial_master"; + + private static final String TYPE = "mock_meta_filtering"; + + @Override + public Map getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry, + ClusterService clusterService) { + return Collections.singletonMap("mock_meta_filtering", metadata -> + new FsRepository(metadata, env, namedXContentRegistry, clusterService) { + + // Storing the initially expected metadata value here to verify that #filterUserMetadata is only called once on the + // initial master node starting the snapshot + private final String initialMetaValue = metadata.settings().get(MASTER_SETTING_VALUE); + + @Override + public void finalizeSnapshot(SnapshotId snapshotId, ShardGenerations shardGenerations, long startTime, String failure, + int totalShards, List shardFailures, long repositoryStateId, + boolean includeGlobalState, MetaData clusterMetaData, Map userMetadata, + boolean writeShardGens, ActionListener listener) { + assertThat(userMetadata, is(Collections.singletonMap(MOCK_FILTERED_META, initialMetaValue))); + super.finalizeSnapshot(snapshotId, shardGenerations, startTime, failure, totalShards, shardFailures, + repositoryStateId, includeGlobalState, clusterMetaData, userMetadata, writeShardGens, listener); + } + + @Override + public void snapshotShard(Store store, MapperService mapperService, SnapshotId snapshotId, IndexId indexId, + IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus, + boolean writeShardGens, Map userMetadata, ActionListener listener) { + assertThat(userMetadata, is(Collections.singletonMap(MOCK_FILTERED_META, initialMetaValue))); + super.snapshotShard(store, mapperService, snapshotId, indexId, snapshotIndexCommit, snapshotStatus, + writeShardGens, userMetadata, listener); + } + + @Override + public Map adaptUserMetadata(Map userMetadata) { + return Collections.singletonMap(MOCK_FILTERED_META, clusterService.getNodeName()); + } + }); + } + } +} diff --git a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java index 527a391acd0..a2da1de24d2 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java @@ -840,7 +840,7 @@ public abstract class IndexShardTestCase extends ESTestCase { final String shardGen; try (Engine.IndexCommitRef indexCommitRef = shard.acquireLastIndexCommit(true)) { repository.snapshotShard(shard.store(), shard.mapperService(), snapshot.getSnapshotId(), indexId, - indexCommitRef.getIndexCommit(), snapshotStatus, true, future); + indexCommitRef.getIndexCommit(), snapshotStatus, true, Collections.emptyMap(), future); shardGen = future.actionGet(); } diff --git a/test/framework/src/main/java/org/elasticsearch/index/shard/RestoreOnlyRepository.java b/test/framework/src/main/java/org/elasticsearch/index/shard/RestoreOnlyRepository.java index e73c767c3be..ce0704f260e 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/shard/RestoreOnlyRepository.java +++ b/test/framework/src/main/java/org/elasticsearch/index/shard/RestoreOnlyRepository.java @@ -136,7 +136,7 @@ public abstract class RestoreOnlyRepository extends AbstractLifecycleComponent i @Override public void snapshotShard(Store store, MapperService mapperService, SnapshotId snapshotId, IndexId indexId, IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus, boolean writeShardGens, - ActionListener listener) { + Map userMetadata, ActionListener listener) { } @Override diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java index 2a7e7090357..edc8eb93b82 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java @@ -302,7 +302,7 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit @Override public void snapshotShard(Store store, MapperService mapperService, SnapshotId snapshotId, IndexId indexId, IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus, boolean writeShardGens, - ActionListener listener) { + Map userMetadata, ActionListener listener) { throw new UnsupportedOperationException("Unsupported for repository of type: " + TYPE); } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/snapshots/SourceOnlySnapshotRepository.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/snapshots/SourceOnlySnapshotRepository.java index 8020c2f4efe..1318877a5ae 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/snapshots/SourceOnlySnapshotRepository.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/snapshots/SourceOnlySnapshotRepository.java @@ -137,7 +137,7 @@ public final class SourceOnlySnapshotRepository extends FilterRepository { @Override public void snapshotShard(Store store, MapperService mapperService, SnapshotId snapshotId, IndexId indexId, IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus, boolean writeShardGens, - ActionListener listener) { + Map userMetadata, ActionListener listener) { if (mapperService.documentMapper() != null // if there is no mapping this is null && mapperService.documentMapper().sourceMapper().isComplete() == false) { listener.onFailure( @@ -177,7 +177,7 @@ public final class SourceOnlySnapshotRepository extends FilterRepository { toClose.add(reader); IndexCommit indexCommit = reader.getIndexCommit(); super.snapshotShard(tempStore, mapperService, snapshotId, indexId, indexCommit, snapshotStatus, writeShardGens, - ActionListener.runBefore(listener, () -> IOUtils.close(toClose))); + userMetadata, ActionListener.runBefore(listener, () -> IOUtils.close(toClose))); } catch (IOException e) { try { IOUtils.close(toClose); diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotShardTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotShardTests.java index 7a7d4dd17a4..5caca033ffb 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotShardTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotShardTests.java @@ -103,7 +103,7 @@ public class SourceOnlySnapshotShardTests extends IndexShardTestCase { IndexShardSnapshotStatus indexShardSnapshotStatus = IndexShardSnapshotStatus.newInitializing("-1"); final PlainActionFuture future = PlainActionFuture.newFuture(); runAsSnapshot(shard.getThreadPool(), () -> repository.snapshotShard(shard.store(), shard.mapperService(), snapshotId, indexId, - snapshotRef.getIndexCommit(), indexShardSnapshotStatus, true, future)); + snapshotRef.getIndexCommit(), indexShardSnapshotStatus, true, Collections.emptyMap(), future)); IllegalStateException illegalStateException = expectThrows(IllegalStateException.class, future::actionGet); assertEquals( "Can't snapshot _source only on an index that has incomplete source ie. has _source disabled or filters the source", @@ -129,7 +129,7 @@ public class SourceOnlySnapshotShardTests extends IndexShardTestCase { SnapshotId snapshotId = new SnapshotId("test", "test"); final PlainActionFuture future = PlainActionFuture.newFuture(); runAsSnapshot(shard.getThreadPool(), () -> repository.snapshotShard(shard.store(), shard.mapperService(), snapshotId, indexId, - snapshotRef.getIndexCommit(), indexShardSnapshotStatus, true, future)); + snapshotRef.getIndexCommit(), indexShardSnapshotStatus, true, Collections.emptyMap(), future)); shardGeneration = future.actionGet(); IndexShardSnapshotStatus.Copy copy = indexShardSnapshotStatus.asCopy(); assertEquals(copy.getTotalFileCount(), copy.getIncrementalFileCount()); @@ -145,7 +145,7 @@ public class SourceOnlySnapshotShardTests extends IndexShardTestCase { IndexShardSnapshotStatus indexShardSnapshotStatus = IndexShardSnapshotStatus.newInitializing(shardGeneration); final PlainActionFuture future = PlainActionFuture.newFuture(); runAsSnapshot(shard.getThreadPool(), () -> repository.snapshotShard(shard.store(), shard.mapperService(), snapshotId, indexId, - snapshotRef.getIndexCommit(), indexShardSnapshotStatus, true, future)); + snapshotRef.getIndexCommit(), indexShardSnapshotStatus, true, Collections.emptyMap(), future)); shardGeneration = future.actionGet(); IndexShardSnapshotStatus.Copy copy = indexShardSnapshotStatus.asCopy(); // we processed the segments_N file plus _1.si, _1.fdx, _1.fnm, _1.fdt @@ -161,7 +161,7 @@ public class SourceOnlySnapshotShardTests extends IndexShardTestCase { IndexShardSnapshotStatus indexShardSnapshotStatus = IndexShardSnapshotStatus.newInitializing(shardGeneration); final PlainActionFuture future = PlainActionFuture.newFuture(); runAsSnapshot(shard.getThreadPool(), () -> repository.snapshotShard(shard.store(), shard.mapperService(), snapshotId, indexId, - snapshotRef.getIndexCommit(), indexShardSnapshotStatus, true, future)); + snapshotRef.getIndexCommit(), indexShardSnapshotStatus, true, Collections.emptyMap(), future)); future.actionGet(); IndexShardSnapshotStatus.Copy copy = indexShardSnapshotStatus.asCopy(); // we processed the segments_N file plus _1_1.liv @@ -209,7 +209,7 @@ public class SourceOnlySnapshotShardTests extends IndexShardTestCase { final PlainActionFuture future = PlainActionFuture.newFuture(); runAsSnapshot(shard.getThreadPool(), () -> { repository.snapshotShard(shard.store(), shard.mapperService(), snapshotId, indexId, snapshotRef.getIndexCommit(), - indexShardSnapshotStatus, true, future); + indexShardSnapshotStatus, true, Collections.emptyMap(), future); future.actionGet(); final PlainActionFuture finFuture = PlainActionFuture.newFuture(); repository.finalizeSnapshot(snapshotId,