Allow Repository Plugins to Filter Metadata on Create (#51472) (#51542)

* Allow Repository Plugins to Filter Metadata on Create

Add a hook that allows repository plugins to filter the repository metadata
before it gets written to the cluster state.
This commit is contained in:
Armin Braun 2020-01-28 18:33:26 +01:00 committed by GitHub
parent b8adb59e4a
commit aae93a7578
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 170 additions and 42 deletions

View File

@ -125,8 +125,9 @@ public class FilterRepository implements Repository {
@Override @Override
public void snapshotShard(Store store, MapperService mapperService, SnapshotId snapshotId, IndexId indexId, public void snapshotShard(Store store, MapperService mapperService, SnapshotId snapshotId, IndexId indexId,
IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus, boolean writeShardGens, IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus, boolean writeShardGens,
ActionListener<String> listener) { Map<String, Object> userMetadata, ActionListener<String> listener) {
in.snapshotShard(store, mapperService, snapshotId, indexId, snapshotIndexCommit, snapshotStatus, writeShardGens, listener); in.snapshotShard(
store, mapperService, snapshotId, indexId, snapshotIndexCommit, snapshotStatus, writeShardGens, userMetadata, listener);
} }
@Override @Override
public void restoreShard(Store store, SnapshotId snapshotId, IndexId indexId, ShardId snapshotShardId, RecoveryState recoveryState, public void restoreShard(Store store, SnapshotId snapshotId, IndexId indexId, ShardId snapshotShardId, RecoveryState recoveryState,

View File

@ -21,6 +21,7 @@ package org.elasticsearch.repositories;
import org.apache.lucene.index.IndexCommit; import org.apache.lucene.index.IndexCommit;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.SnapshotsInProgress;
import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.metadata.RepositoryMetaData; import org.elasticsearch.cluster.metadata.RepositoryMetaData;
@ -211,10 +212,12 @@ public interface Repository extends LifecycleComponent {
* @param indexId id for the index being snapshotted * @param indexId id for the index being snapshotted
* @param snapshotIndexCommit commit point * @param snapshotIndexCommit commit point
* @param snapshotStatus snapshot status * @param snapshotStatus snapshot status
* @param userMetadata user metadata of the snapshot found in {@link SnapshotsInProgress.Entry#userMetadata()}
* @param listener listener invoked on completion * @param listener listener invoked on completion
*/ */
void snapshotShard(Store store, MapperService mapperService, SnapshotId snapshotId, IndexId indexId, IndexCommit snapshotIndexCommit, void snapshotShard(Store store, MapperService mapperService, SnapshotId snapshotId, IndexId indexId, IndexCommit snapshotIndexCommit,
IndexShardSnapshotStatus snapshotStatus, boolean writeShardGens, ActionListener<String> listener); IndexShardSnapshotStatus snapshotStatus, boolean writeShardGens, Map<String, Object> userMetadata,
ActionListener<String> listener);
/** /**
* Restores snapshot of the shard. * Restores snapshot of the shard.
@ -247,4 +250,12 @@ public interface Repository extends LifecycleComponent {
* @param state new cluster state * @param state new cluster state
*/ */
void updateState(ClusterState state); void updateState(ClusterState state);
/**
* Hook that allows a repository to filter the user supplied snapshot metadata in {@link SnapshotsInProgress.Entry#userMetadata()}
* during snapshot initialization.
*/
default Map<String, Object> adaptUserMetadata(Map<String, Object> userMetadata) {
return userMetadata;
}
} }

View File

@ -1459,7 +1459,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
@Override @Override
public void snapshotShard(Store store, MapperService mapperService, SnapshotId snapshotId, IndexId indexId, public void snapshotShard(Store store, MapperService mapperService, SnapshotId snapshotId, IndexId indexId,
IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus, boolean writeShardGens, IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus, boolean writeShardGens,
ActionListener<String> listener) { Map<String, Object> userMetadata, ActionListener<String> listener) {
final ShardId shardId = store.shardId(); final ShardId shardId = store.shardId();
final long startTime = threadPool.absoluteTimeInMillis(); final long startTime = threadPool.absoluteTimeInMillis();
try { try {

View File

@ -283,27 +283,28 @@ public class SnapshotShardsService extends AbstractLifecycleComponent implements
assert indexId != null; assert indexId != null;
assert entry.useShardGenerations() || snapshotStatus.generation() == null : assert entry.useShardGenerations() || snapshotStatus.generation() == null :
"Found non-null shard generation [" + snapshotStatus.generation() + "] for snapshot with old-format compatibility"; "Found non-null shard generation [" + snapshotStatus.generation() + "] for snapshot with old-format compatibility";
snapshot(shardId, snapshot, indexId, snapshotStatus, entry.useShardGenerations(), new ActionListener<String>() { snapshot(shardId, snapshot, indexId, entry.userMetadata(), snapshotStatus, entry.useShardGenerations(),
@Override new ActionListener<String>() {
public void onResponse(String newGeneration) { @Override
assert newGeneration != null; public void onResponse(String newGeneration) {
assert newGeneration.equals(snapshotStatus.generation()); assert newGeneration != null;
if (logger.isDebugEnabled()) { assert newGeneration.equals(snapshotStatus.generation());
final IndexShardSnapshotStatus.Copy lastSnapshotStatus = snapshotStatus.asCopy(); if (logger.isDebugEnabled()) {
logger.debug("snapshot [{}] completed to [{}] with [{}] at generation [{}]", final IndexShardSnapshotStatus.Copy lastSnapshotStatus = snapshotStatus.asCopy();
snapshot, snapshot.getRepository(), lastSnapshotStatus, snapshotStatus.generation()); logger.debug("snapshot [{}] completed to [{}] with [{}] at generation [{}]",
snapshot, snapshot.getRepository(), lastSnapshotStatus, snapshotStatus.generation());
}
notifySuccessfulSnapshotShard(snapshot, shardId, newGeneration);
} }
notifySuccessfulSnapshotShard(snapshot, shardId, newGeneration);
}
@Override @Override
public void onFailure(Exception e) { public void onFailure(Exception e) {
final String failure = ExceptionsHelper.stackTrace(e); final String failure = ExceptionsHelper.stackTrace(e);
snapshotStatus.moveToFailed(threadPool.absoluteTimeInMillis(), failure); snapshotStatus.moveToFailed(threadPool.absoluteTimeInMillis(), failure);
logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to snapshot shard", shardId, snapshot), e); logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to snapshot shard", shardId, snapshot), e);
notifyFailedSnapshotShard(snapshot, shardId, failure); notifyFailedSnapshotShard(snapshot, shardId, failure);
} }
}); });
} }
}); });
} }
@ -314,7 +315,7 @@ public class SnapshotShardsService extends AbstractLifecycleComponent implements
* @param snapshot snapshot * @param snapshot snapshot
* @param snapshotStatus snapshot status * @param snapshotStatus snapshot status
*/ */
private void snapshot(final ShardId shardId, final Snapshot snapshot, final IndexId indexId, private void snapshot(final ShardId shardId, final Snapshot snapshot, final IndexId indexId, final Map<String, Object> userMetadata,
final IndexShardSnapshotStatus snapshotStatus, boolean writeShardGens, ActionListener<String> listener) { final IndexShardSnapshotStatus snapshotStatus, boolean writeShardGens, ActionListener<String> listener) {
try { try {
final IndexShard indexShard = indicesService.indexServiceSafe(shardId.getIndex()).getShardOrNull(shardId.id()); final IndexShard indexShard = indicesService.indexServiceSafe(shardId.getIndex()).getShardOrNull(shardId.id());
@ -338,7 +339,8 @@ public class SnapshotShardsService extends AbstractLifecycleComponent implements
// we flush first to make sure we get the latest writes snapshotted // we flush first to make sure we get the latest writes snapshotted
snapshotRef = indexShard.acquireLastIndexCommit(true); snapshotRef = indexShard.acquireLastIndexCommit(true);
repository.snapshotShard(indexShard.store(), indexShard.mapperService(), snapshot.getSnapshotId(), indexId, repository.snapshotShard(indexShard.store(), indexShard.mapperService(), snapshot.getSnapshotId(), indexId,
snapshotRef.getIndexCommit(), snapshotStatus, writeShardGens, ActionListener.runBefore(listener, snapshotRef::close)); snapshotRef.getIndexCommit(), snapshotStatus, writeShardGens, userMetadata,
ActionListener.runBefore(listener, snapshotRef::close));
} catch (Exception e) { } catch (Exception e) {
IOUtils.close(snapshotRef); IOUtils.close(snapshotRef);
throw e; throw e;

View File

@ -279,6 +279,8 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
final String snapshotName = indexNameExpressionResolver.resolveDateMathExpression(request.snapshot()); final String snapshotName = indexNameExpressionResolver.resolveDateMathExpression(request.snapshot());
validate(repositoryName, snapshotName); validate(repositoryName, snapshotName);
final SnapshotId snapshotId = new SnapshotId(snapshotName, UUIDs.randomBase64UUID()); // new UUID for the snapshot final SnapshotId snapshotId = new SnapshotId(snapshotName, UUIDs.randomBase64UUID()); // new UUID for the snapshot
Repository repository = repositoriesService.repository(request.repository());
final Map<String, Object> userMeta = repository.adaptUserMetadata(request.userMetadata());
clusterService.submitStateUpdateTask("create_snapshot [" + snapshotName + ']', new ClusterStateUpdateTask() { clusterService.submitStateUpdateTask("create_snapshot [" + snapshotName + ']', new ClusterStateUpdateTask() {
private SnapshotsInProgress.Entry newSnapshot = null; private SnapshotsInProgress.Entry newSnapshot = null;
@ -314,7 +316,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
threadPool.absoluteTimeInMillis(), threadPool.absoluteTimeInMillis(),
RepositoryData.UNKNOWN_REPO_GEN, RepositoryData.UNKNOWN_REPO_GEN,
null, null,
request.userMetadata(), false userMeta, false
); );
initializingSnapshots.add(newSnapshot.snapshot()); initializingSnapshots.add(newSnapshot.snapshot());
snapshots = new SnapshotsInProgress(newSnapshot); snapshots = new SnapshotsInProgress(newSnapshot);
@ -337,7 +339,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
final Snapshot current = newSnapshot.snapshot(); final Snapshot current = newSnapshot.snapshot();
assert initializingSnapshots.contains(current); assert initializingSnapshots.contains(current);
assert indices != null; assert indices != null;
beginSnapshot(newState, newSnapshot, request.partial(), indices, new ActionListener<Snapshot>() { beginSnapshot(newState, newSnapshot, request.partial(), indices, repository, new ActionListener<Snapshot>() {
@Override @Override
public void onResponse(final Snapshot snapshot) { public void onResponse(final Snapshot snapshot) {
initializingSnapshots.remove(snapshot); initializingSnapshots.remove(snapshot);
@ -443,6 +445,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
final SnapshotsInProgress.Entry snapshot, final SnapshotsInProgress.Entry snapshot,
final boolean partial, final boolean partial,
final List<String> indices, final List<String> indices,
final Repository repository,
final ActionListener<Snapshot> userCreateSnapshotListener) { final ActionListener<Snapshot> userCreateSnapshotListener) {
threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(new AbstractRunnable() { threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(new AbstractRunnable() {
@ -453,8 +456,6 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
@Override @Override
protected void doRun() { protected void doRun() {
assert initializingSnapshots.contains(snapshot.snapshot()); assert initializingSnapshots.contains(snapshot.snapshot());
Repository repository = repositoriesService.repository(snapshot.snapshot().getRepository());
if (repository.isReadOnly()) { if (repository.isReadOnly()) {
throw new RepositoryException(repository.getMetadata().name(), "cannot create snapshot in a readonly repository"); throw new RepositoryException(repository.getMetadata().name(), "cannot create snapshot in a readonly repository");
} }

View File

@ -205,7 +205,7 @@ public class RepositoriesServiceTests extends ESTestCase {
@Override @Override
public void snapshotShard(Store store, MapperService mapperService, SnapshotId snapshotId, IndexId indexId, IndexCommit public void snapshotShard(Store store, MapperService mapperService, SnapshotId snapshotId, IndexId indexId, IndexCommit
snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus, boolean writeShardGens, snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus, boolean writeShardGens,
ActionListener<String> listener) { Map<String, Object> userMetadata, ActionListener<String> listener) {
} }

View File

@ -105,7 +105,8 @@ public class FsRepositoryTests extends ESTestCase {
final PlainActionFuture<String> future1 = PlainActionFuture.newFuture(); final PlainActionFuture<String> future1 = PlainActionFuture.newFuture();
runGeneric(threadPool, () -> { runGeneric(threadPool, () -> {
IndexShardSnapshotStatus snapshotStatus = IndexShardSnapshotStatus.newInitializing(null); IndexShardSnapshotStatus snapshotStatus = IndexShardSnapshotStatus.newInitializing(null);
repository.snapshotShard(store, null, snapshotId, indexId, indexCommit, snapshotStatus, true, future1); repository.snapshotShard(store, null, snapshotId, indexId, indexCommit, snapshotStatus, true,
Collections.emptyMap(), future1);
future1.actionGet(); future1.actionGet();
IndexShardSnapshotStatus.Copy copy = snapshotStatus.asCopy(); IndexShardSnapshotStatus.Copy copy = snapshotStatus.asCopy();
assertEquals(copy.getTotalFileCount(), copy.getIncrementalFileCount()); assertEquals(copy.getTotalFileCount(), copy.getIncrementalFileCount());
@ -133,7 +134,8 @@ public class FsRepositoryTests extends ESTestCase {
final PlainActionFuture<String> future2 = PlainActionFuture.newFuture(); final PlainActionFuture<String> future2 = PlainActionFuture.newFuture();
runGeneric(threadPool, () -> { runGeneric(threadPool, () -> {
IndexShardSnapshotStatus snapshotStatus = IndexShardSnapshotStatus.newInitializing(shardGeneration); IndexShardSnapshotStatus snapshotStatus = IndexShardSnapshotStatus.newInitializing(shardGeneration);
repository.snapshotShard(store, null, incSnapshotId, indexId, incIndexCommit, snapshotStatus, true, future2); repository.snapshotShard(store, null, incSnapshotId, indexId, incIndexCommit, snapshotStatus, true,
Collections.emptyMap(), future2);
future2.actionGet(); future2.actionGet();
IndexShardSnapshotStatus.Copy copy = snapshotStatus.asCopy(); IndexShardSnapshotStatus.Copy copy = snapshotStatus.asCopy();
assertEquals(2, copy.getIncrementalFileCount()); assertEquals(2, copy.getIncrementalFileCount());

View File

@ -0,0 +1,111 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.snapshots;
import org.apache.lucene.index.IndexCommit;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.env.Environment;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus;
import org.elasticsearch.index.store.Store;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.plugins.RepositoryPlugin;
import org.elasticsearch.repositories.IndexId;
import org.elasticsearch.repositories.Repository;
import org.elasticsearch.repositories.ShardGenerations;
import org.elasticsearch.repositories.fs.FsRepository;
import org.elasticsearch.test.ESIntegTestCase;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.is;
public class RepositoryFilterUserMetadataIT extends ESIntegTestCase {
@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Collections.singleton(MetaDataFilteringPlugin.class);
}
public void testFilteredRepoMetaDataIsUsed() {
final String masterName = internalCluster().getMasterName();
final String repoName = "test-repo";
assertAcked(client().admin().cluster().preparePutRepository(repoName).setType(MetaDataFilteringPlugin.TYPE).setSettings(
Settings.builder().put("location", randomRepoPath())
.put(MetaDataFilteringPlugin.MASTER_SETTING_VALUE, masterName)));
createIndex("test-idx");
final SnapshotInfo snapshotInfo = client().admin().cluster().prepareCreateSnapshot(repoName, "test-snap")
.setWaitForCompletion(true).get().getSnapshotInfo();
assertThat(snapshotInfo.userMetadata(), is(Collections.singletonMap(MetaDataFilteringPlugin.MOCK_FILTERED_META, masterName)));
}
// Mock plugin that stores the name of the master node that started a snapshot in each snapshot's metadata
public static final class MetaDataFilteringPlugin extends org.elasticsearch.plugins.Plugin implements RepositoryPlugin {
private static final String MOCK_FILTERED_META = "mock_filtered_meta";
private static final String MASTER_SETTING_VALUE = "initial_master";
private static final String TYPE = "mock_meta_filtering";
@Override
public Map<String, Repository.Factory> getRepositories(Environment env, NamedXContentRegistry namedXContentRegistry,
ClusterService clusterService) {
return Collections.singletonMap("mock_meta_filtering", metadata ->
new FsRepository(metadata, env, namedXContentRegistry, clusterService) {
// Storing the initially expected metadata value here to verify that #filterUserMetadata is only called once on the
// initial master node starting the snapshot
private final String initialMetaValue = metadata.settings().get(MASTER_SETTING_VALUE);
@Override
public void finalizeSnapshot(SnapshotId snapshotId, ShardGenerations shardGenerations, long startTime, String failure,
int totalShards, List<SnapshotShardFailure> shardFailures, long repositoryStateId,
boolean includeGlobalState, MetaData clusterMetaData, Map<String, Object> userMetadata,
boolean writeShardGens, ActionListener<SnapshotInfo> listener) {
assertThat(userMetadata, is(Collections.singletonMap(MOCK_FILTERED_META, initialMetaValue)));
super.finalizeSnapshot(snapshotId, shardGenerations, startTime, failure, totalShards, shardFailures,
repositoryStateId, includeGlobalState, clusterMetaData, userMetadata, writeShardGens, listener);
}
@Override
public void snapshotShard(Store store, MapperService mapperService, SnapshotId snapshotId, IndexId indexId,
IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus,
boolean writeShardGens, Map<String, Object> userMetadata, ActionListener<String> listener) {
assertThat(userMetadata, is(Collections.singletonMap(MOCK_FILTERED_META, initialMetaValue)));
super.snapshotShard(store, mapperService, snapshotId, indexId, snapshotIndexCommit, snapshotStatus,
writeShardGens, userMetadata, listener);
}
@Override
public Map<String, Object> adaptUserMetadata(Map<String, Object> userMetadata) {
return Collections.singletonMap(MOCK_FILTERED_META, clusterService.getNodeName());
}
});
}
}
}

View File

@ -840,7 +840,7 @@ public abstract class IndexShardTestCase extends ESTestCase {
final String shardGen; final String shardGen;
try (Engine.IndexCommitRef indexCommitRef = shard.acquireLastIndexCommit(true)) { try (Engine.IndexCommitRef indexCommitRef = shard.acquireLastIndexCommit(true)) {
repository.snapshotShard(shard.store(), shard.mapperService(), snapshot.getSnapshotId(), indexId, repository.snapshotShard(shard.store(), shard.mapperService(), snapshot.getSnapshotId(), indexId,
indexCommitRef.getIndexCommit(), snapshotStatus, true, future); indexCommitRef.getIndexCommit(), snapshotStatus, true, Collections.emptyMap(), future);
shardGen = future.actionGet(); shardGen = future.actionGet();
} }

View File

@ -136,7 +136,7 @@ public abstract class RestoreOnlyRepository extends AbstractLifecycleComponent i
@Override @Override
public void snapshotShard(Store store, MapperService mapperService, SnapshotId snapshotId, IndexId indexId, public void snapshotShard(Store store, MapperService mapperService, SnapshotId snapshotId, IndexId indexId,
IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus, boolean writeShardGens, IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus, boolean writeShardGens,
ActionListener<String> listener) { Map<String, Object> userMetadata, ActionListener<String> listener) {
} }
@Override @Override

View File

@ -302,7 +302,7 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit
@Override @Override
public void snapshotShard(Store store, MapperService mapperService, SnapshotId snapshotId, IndexId indexId, public void snapshotShard(Store store, MapperService mapperService, SnapshotId snapshotId, IndexId indexId,
IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus, boolean writeShardGens, IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus, boolean writeShardGens,
ActionListener<String> listener) { Map<String, Object> userMetadata, ActionListener<String> listener) {
throw new UnsupportedOperationException("Unsupported for repository of type: " + TYPE); throw new UnsupportedOperationException("Unsupported for repository of type: " + TYPE);
} }

View File

@ -137,7 +137,7 @@ public final class SourceOnlySnapshotRepository extends FilterRepository {
@Override @Override
public void snapshotShard(Store store, MapperService mapperService, SnapshotId snapshotId, IndexId indexId, public void snapshotShard(Store store, MapperService mapperService, SnapshotId snapshotId, IndexId indexId,
IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus, boolean writeShardGens, IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus, boolean writeShardGens,
ActionListener<String> listener) { Map<String, Object> userMetadata, ActionListener<String> listener) {
if (mapperService.documentMapper() != null // if there is no mapping this is null if (mapperService.documentMapper() != null // if there is no mapping this is null
&& mapperService.documentMapper().sourceMapper().isComplete() == false) { && mapperService.documentMapper().sourceMapper().isComplete() == false) {
listener.onFailure( listener.onFailure(
@ -177,7 +177,7 @@ public final class SourceOnlySnapshotRepository extends FilterRepository {
toClose.add(reader); toClose.add(reader);
IndexCommit indexCommit = reader.getIndexCommit(); IndexCommit indexCommit = reader.getIndexCommit();
super.snapshotShard(tempStore, mapperService, snapshotId, indexId, indexCommit, snapshotStatus, writeShardGens, super.snapshotShard(tempStore, mapperService, snapshotId, indexId, indexCommit, snapshotStatus, writeShardGens,
ActionListener.runBefore(listener, () -> IOUtils.close(toClose))); userMetadata, ActionListener.runBefore(listener, () -> IOUtils.close(toClose)));
} catch (IOException e) { } catch (IOException e) {
try { try {
IOUtils.close(toClose); IOUtils.close(toClose);

View File

@ -103,7 +103,7 @@ public class SourceOnlySnapshotShardTests extends IndexShardTestCase {
IndexShardSnapshotStatus indexShardSnapshotStatus = IndexShardSnapshotStatus.newInitializing("-1"); IndexShardSnapshotStatus indexShardSnapshotStatus = IndexShardSnapshotStatus.newInitializing("-1");
final PlainActionFuture<String> future = PlainActionFuture.newFuture(); final PlainActionFuture<String> future = PlainActionFuture.newFuture();
runAsSnapshot(shard.getThreadPool(), () -> repository.snapshotShard(shard.store(), shard.mapperService(), snapshotId, indexId, runAsSnapshot(shard.getThreadPool(), () -> repository.snapshotShard(shard.store(), shard.mapperService(), snapshotId, indexId,
snapshotRef.getIndexCommit(), indexShardSnapshotStatus, true, future)); snapshotRef.getIndexCommit(), indexShardSnapshotStatus, true, Collections.emptyMap(), future));
IllegalStateException illegalStateException = expectThrows(IllegalStateException.class, future::actionGet); IllegalStateException illegalStateException = expectThrows(IllegalStateException.class, future::actionGet);
assertEquals( assertEquals(
"Can't snapshot _source only on an index that has incomplete source ie. has _source disabled or filters the source", "Can't snapshot _source only on an index that has incomplete source ie. has _source disabled or filters the source",
@ -129,7 +129,7 @@ public class SourceOnlySnapshotShardTests extends IndexShardTestCase {
SnapshotId snapshotId = new SnapshotId("test", "test"); SnapshotId snapshotId = new SnapshotId("test", "test");
final PlainActionFuture<String> future = PlainActionFuture.newFuture(); final PlainActionFuture<String> future = PlainActionFuture.newFuture();
runAsSnapshot(shard.getThreadPool(), () -> repository.snapshotShard(shard.store(), shard.mapperService(), snapshotId, indexId, runAsSnapshot(shard.getThreadPool(), () -> repository.snapshotShard(shard.store(), shard.mapperService(), snapshotId, indexId,
snapshotRef.getIndexCommit(), indexShardSnapshotStatus, true, future)); snapshotRef.getIndexCommit(), indexShardSnapshotStatus, true, Collections.emptyMap(), future));
shardGeneration = future.actionGet(); shardGeneration = future.actionGet();
IndexShardSnapshotStatus.Copy copy = indexShardSnapshotStatus.asCopy(); IndexShardSnapshotStatus.Copy copy = indexShardSnapshotStatus.asCopy();
assertEquals(copy.getTotalFileCount(), copy.getIncrementalFileCount()); assertEquals(copy.getTotalFileCount(), copy.getIncrementalFileCount());
@ -145,7 +145,7 @@ public class SourceOnlySnapshotShardTests extends IndexShardTestCase {
IndexShardSnapshotStatus indexShardSnapshotStatus = IndexShardSnapshotStatus.newInitializing(shardGeneration); IndexShardSnapshotStatus indexShardSnapshotStatus = IndexShardSnapshotStatus.newInitializing(shardGeneration);
final PlainActionFuture<String> future = PlainActionFuture.newFuture(); final PlainActionFuture<String> future = PlainActionFuture.newFuture();
runAsSnapshot(shard.getThreadPool(), () -> repository.snapshotShard(shard.store(), shard.mapperService(), snapshotId, indexId, runAsSnapshot(shard.getThreadPool(), () -> repository.snapshotShard(shard.store(), shard.mapperService(), snapshotId, indexId,
snapshotRef.getIndexCommit(), indexShardSnapshotStatus, true, future)); snapshotRef.getIndexCommit(), indexShardSnapshotStatus, true, Collections.emptyMap(), future));
shardGeneration = future.actionGet(); shardGeneration = future.actionGet();
IndexShardSnapshotStatus.Copy copy = indexShardSnapshotStatus.asCopy(); IndexShardSnapshotStatus.Copy copy = indexShardSnapshotStatus.asCopy();
// we processed the segments_N file plus _1.si, _1.fdx, _1.fnm, _1.fdt // we processed the segments_N file plus _1.si, _1.fdx, _1.fnm, _1.fdt
@ -161,7 +161,7 @@ public class SourceOnlySnapshotShardTests extends IndexShardTestCase {
IndexShardSnapshotStatus indexShardSnapshotStatus = IndexShardSnapshotStatus.newInitializing(shardGeneration); IndexShardSnapshotStatus indexShardSnapshotStatus = IndexShardSnapshotStatus.newInitializing(shardGeneration);
final PlainActionFuture<String> future = PlainActionFuture.newFuture(); final PlainActionFuture<String> future = PlainActionFuture.newFuture();
runAsSnapshot(shard.getThreadPool(), () -> repository.snapshotShard(shard.store(), shard.mapperService(), snapshotId, indexId, runAsSnapshot(shard.getThreadPool(), () -> repository.snapshotShard(shard.store(), shard.mapperService(), snapshotId, indexId,
snapshotRef.getIndexCommit(), indexShardSnapshotStatus, true, future)); snapshotRef.getIndexCommit(), indexShardSnapshotStatus, true, Collections.emptyMap(), future));
future.actionGet(); future.actionGet();
IndexShardSnapshotStatus.Copy copy = indexShardSnapshotStatus.asCopy(); IndexShardSnapshotStatus.Copy copy = indexShardSnapshotStatus.asCopy();
// we processed the segments_N file plus _1_1.liv // we processed the segments_N file plus _1_1.liv
@ -209,7 +209,7 @@ public class SourceOnlySnapshotShardTests extends IndexShardTestCase {
final PlainActionFuture<String> future = PlainActionFuture.newFuture(); final PlainActionFuture<String> future = PlainActionFuture.newFuture();
runAsSnapshot(shard.getThreadPool(), () -> { runAsSnapshot(shard.getThreadPool(), () -> {
repository.snapshotShard(shard.store(), shard.mapperService(), snapshotId, indexId, snapshotRef.getIndexCommit(), repository.snapshotShard(shard.store(), shard.mapperService(), snapshotId, indexId, snapshotRef.getIndexCommit(),
indexShardSnapshotStatus, true, future); indexShardSnapshotStatus, true, Collections.emptyMap(), future);
future.actionGet(); future.actionGet();
final PlainActionFuture<SnapshotInfo> finFuture = PlainActionFuture.newFuture(); final PlainActionFuture<SnapshotInfo> finFuture = PlainActionFuture.newFuture();
repository.finalizeSnapshot(snapshotId, repository.finalizeSnapshot(snapshotId,