From 5b9864db2c318b5fd9699075b6e8e3ccab565675 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Mon, 23 Mar 2020 16:43:41 +0100 Subject: [PATCH] Better Incrementality for Snapshots of Unchanged Shards (#52182) (#53984) Use sequence numbers and force merge UUID to determine whether a shard has changed or not instead before falling back to comparing files to get incremental snapshots on primary fail-over. --- .../BlobStoreIndexShardSnapshots.java | 18 +- .../snapshots/blobstore/SnapshotFiles.java | 21 +- .../repositories/FilterRepository.java | 8 +- .../repositories/Repository.java | 8 +- .../blobstore/BlobStoreRepository.java | 135 ++++++----- .../snapshots/SnapshotShardsService.java | 30 ++- .../RepositoriesServiceTests.java | 4 +- .../repositories/fs/FsRepositoryTests.java | 8 +- .../snapshots/BlobStoreIncrementalityIT.java | 219 ++++++++++++++++++ .../RepositoryFilterUserMetadataIT.java | 10 +- .../index/shard/IndexShardTestCase.java | 3 +- .../index/shard/RestoreOnlyRepository.java | 4 +- .../xpack/ccr/repository/CcrRepository.java | 6 +- .../SourceOnlySnapshotRepository.java | 8 +- .../SourceOnlySnapshotShardTests.java | 14 +- 15 files changed, 396 insertions(+), 100 deletions(-) create mode 100644 server/src/test/java/org/elasticsearch/snapshots/BlobStoreIncrementalityIT.java diff --git a/server/src/main/java/org/elasticsearch/index/snapshots/blobstore/BlobStoreIndexShardSnapshots.java b/server/src/main/java/org/elasticsearch/index/snapshots/blobstore/BlobStoreIndexShardSnapshots.java index eebb5233dbe..5dcd1ee58f5 100644 --- a/server/src/main/java/org/elasticsearch/index/snapshots/blobstore/BlobStoreIndexShardSnapshots.java +++ b/server/src/main/java/org/elasticsearch/index/snapshots/blobstore/BlobStoreIndexShardSnapshots.java @@ -145,6 +145,7 @@ public class BlobStoreIndexShardSnapshots implements Iterable, To static final class ParseFields { static final ParseField FILES = new ParseField("files"); + static final ParseField SHARD_STATE_ID = new ParseField("shard_state_id"); static final ParseField SNAPSHOTS = new ParseField("snapshots"); } @@ -217,6 +218,9 @@ public class BlobStoreIndexShardSnapshots implements Iterable, To builder.value(fileInfo.name()); } builder.endArray(); + if (snapshot.shardStateIdentifier() != null) { + builder.field(ParseFields.SHARD_STATE_ID.getPreferredName(), snapshot.shardStateIdentifier()); + } builder.endObject(); } builder.endObject(); @@ -229,6 +233,8 @@ public class BlobStoreIndexShardSnapshots implements Iterable, To token = parser.nextToken(); } Map> snapshotsMap = new HashMap<>(); + Map historyUUIDs = new HashMap<>(); + Map globalCheckpoints = new HashMap<>(); Map files = new HashMap<>(); if (token == XContentParser.Token.START_OBJECT) { while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { @@ -260,15 +266,16 @@ public class BlobStoreIndexShardSnapshots implements Iterable, To while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { if (token == XContentParser.Token.FIELD_NAME) { currentFieldName = parser.currentName(); - if (parser.nextToken() == XContentParser.Token.START_ARRAY) { - if (ParseFields.FILES.match(currentFieldName, parser.getDeprecationHandler()) == false) { - throw new ElasticsearchParseException("unknown array [{}]", currentFieldName); - } + if (ParseFields.FILES.match(currentFieldName, parser.getDeprecationHandler()) && + parser.nextToken() == XContentParser.Token.START_ARRAY) { List fileNames = new ArrayList<>(); while (parser.nextToken() != XContentParser.Token.END_ARRAY) { fileNames.add(parser.text()); } snapshotsMap.put(snapshot, fileNames); + } else if (ParseFields.SHARD_STATE_ID.match(currentFieldName, parser.getDeprecationHandler())) { + parser.nextToken(); + historyUUIDs.put(snapshot, parser.text()); } } } @@ -287,7 +294,8 @@ public class BlobStoreIndexShardSnapshots implements Iterable, To assert fileInfo != null; fileInfosBuilder.add(fileInfo); } - snapshots.add(new SnapshotFiles(entry.getKey(), Collections.unmodifiableList(fileInfosBuilder))); + snapshots.add(new SnapshotFiles(entry.getKey(), Collections.unmodifiableList(fileInfosBuilder), + historyUUIDs.get(entry.getKey()))); } return new BlobStoreIndexShardSnapshots(files, Collections.unmodifiableList(snapshots)); } diff --git a/server/src/main/java/org/elasticsearch/index/snapshots/blobstore/SnapshotFiles.java b/server/src/main/java/org/elasticsearch/index/snapshots/blobstore/SnapshotFiles.java index 7f0fae6fed4..c11ff41653a 100644 --- a/server/src/main/java/org/elasticsearch/index/snapshots/blobstore/SnapshotFiles.java +++ b/server/src/main/java/org/elasticsearch/index/snapshots/blobstore/SnapshotFiles.java @@ -18,6 +18,7 @@ */ package org.elasticsearch.index.snapshots.blobstore; +import org.elasticsearch.common.Nullable; import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.FileInfo; import java.util.HashMap; @@ -33,6 +34,9 @@ public class SnapshotFiles { private final List indexFiles; + @Nullable + private final String shardStateIdentifier; + private Map physicalFiles = null; /** @@ -45,12 +49,23 @@ public class SnapshotFiles { } /** - * @param snapshot snapshot name - * @param indexFiles index files + * @param snapshot snapshot name + * @param indexFiles index files + * @param shardStateIdentifier unique identifier for the state of the shard that this snapshot was taken from */ - public SnapshotFiles(String snapshot, List indexFiles ) { + public SnapshotFiles(String snapshot, List indexFiles, @Nullable String shardStateIdentifier) { this.snapshot = snapshot; this.indexFiles = indexFiles; + this.shardStateIdentifier = shardStateIdentifier; + } + + /** + * Returns an identifier for the shard state that can be used to check whether a shard has changed between + * snapshots or not. + */ + @Nullable + public String shardStateIdentifier() { + return shardStateIdentifier; } /** diff --git a/server/src/main/java/org/elasticsearch/repositories/FilterRepository.java b/server/src/main/java/org/elasticsearch/repositories/FilterRepository.java index b9d30800162..4af05931e86 100644 --- a/server/src/main/java/org/elasticsearch/repositories/FilterRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/FilterRepository.java @@ -126,10 +126,10 @@ public class FilterRepository implements Repository { @Override public void snapshotShard(Store store, MapperService mapperService, SnapshotId snapshotId, IndexId indexId, - IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus, Version repositoryMetaVersion, - Map userMetadata, ActionListener listener) { - in.snapshotShard( - store, mapperService, snapshotId, indexId, snapshotIndexCommit, snapshotStatus, repositoryMetaVersion, userMetadata, listener); + IndexCommit snapshotIndexCommit, String shardStateIdentifier, IndexShardSnapshotStatus snapshotStatus, + Version repositoryMetaVersion, Map userMetadata, ActionListener listener) { + in.snapshotShard(store, mapperService, snapshotId, indexId, snapshotIndexCommit, shardStateIdentifier, snapshotStatus, + repositoryMetaVersion, userMetadata, listener); } @Override public void restoreShard(Store store, SnapshotId snapshotId, IndexId indexId, ShardId snapshotShardId, RecoveryState recoveryState, diff --git a/server/src/main/java/org/elasticsearch/repositories/Repository.java b/server/src/main/java/org/elasticsearch/repositories/Repository.java index 83577fa63f2..8d62e15613e 100644 --- a/server/src/main/java/org/elasticsearch/repositories/Repository.java +++ b/server/src/main/java/org/elasticsearch/repositories/Repository.java @@ -27,6 +27,7 @@ import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.metadata.RepositoryMetaData; import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.Nullable; import org.elasticsearch.common.component.LifecycleComponent; import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.shard.ShardId; @@ -212,14 +213,17 @@ public interface Repository extends LifecycleComponent { * @param snapshotId snapshot id * @param indexId id for the index being snapshotted * @param snapshotIndexCommit commit point + * @param shardStateIdentifier a unique identifier of the state of the shard that is stored with the shard's snapshot and used + * to detect if the shard has changed between snapshots. If {@code null} is passed as the identifier + * snapshotting will be done by inspecting the physical files referenced by {@code snapshotIndexCommit} * @param snapshotStatus snapshot status * @param repositoryMetaVersion version of the updated repository metadata to write * @param userMetadata user metadata of the snapshot found in {@link SnapshotsInProgress.Entry#userMetadata()} * @param listener listener invoked on completion */ void snapshotShard(Store store, MapperService mapperService, SnapshotId snapshotId, IndexId indexId, IndexCommit snapshotIndexCommit, - IndexShardSnapshotStatus snapshotStatus, Version repositoryMetaVersion, Map userMetadata, - ActionListener listener); + @Nullable String shardStateIdentifier, IndexShardSnapshotStatus snapshotStatus, Version repositoryMetaVersion, + Map userMetadata, ActionListener listener); /** * Restores snapshot of the shard. diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index 2ebd34a20c3..c518b885a2e 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -126,6 +126,7 @@ import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; @@ -1552,8 +1553,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp @Override public void snapshotShard(Store store, MapperService mapperService, SnapshotId snapshotId, IndexId indexId, - IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus, Version repositoryMetaVersion, - Map userMetadata, ActionListener listener) { + IndexCommit snapshotIndexCommit, String shardStateIdentifier, IndexShardSnapshotStatus snapshotStatus, + Version repositoryMetaVersion, Map userMetadata, ActionListener listener) { final ShardId shardId = store.shardId(); final long startTime = threadPool.absoluteTimeInMillis(); try { @@ -1579,76 +1580,92 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp throw new IndexShardSnapshotFailedException(shardId, "Duplicate snapshot name [" + snapshotId.getName() + "] detected, aborting"); } - - final List indexCommitPointFiles = new ArrayList<>(); - final BlockingQueue filesToSnapshot = new LinkedBlockingQueue<>(); - store.incRef(); - final Collection fileNames; - final Store.MetadataSnapshot metadataFromStore; - try { - // TODO apparently we don't use the MetadataSnapshot#.recoveryDiff(...) here but we should - try { - logger.trace( - "[{}] [{}] Loading store metadata using index commit [{}]", shardId, snapshotId, snapshotIndexCommit); - metadataFromStore = store.getMetadata(snapshotIndexCommit); - fileNames = snapshotIndexCommit.getFileNames(); - } catch (IOException e) { - throw new IndexShardSnapshotFailedException(shardId, "Failed to get store file metadata", e); + // First inspect all known SegmentInfos instances to see if we already have an equivalent commit in the repository + final List filesFromSegmentInfos = Optional.ofNullable(shardStateIdentifier).map(id -> { + for (SnapshotFiles snapshotFileSet : snapshots.snapshots()) { + if (id.equals(snapshotFileSet.shardStateIdentifier())) { + return snapshotFileSet.indexFiles(); + } } - } finally { - store.decRef(); - } + return null; + }).orElse(null); + + final List indexCommitPointFiles; int indexIncrementalFileCount = 0; int indexTotalNumberOfFiles = 0; long indexIncrementalSize = 0; - long indexTotalFileCount = 0; - for (String fileName : fileNames) { - if (snapshotStatus.isAborted()) { - logger.debug("[{}] [{}] Aborted on the file [{}], exiting", shardId, snapshotId, fileName); - throw new IndexShardSnapshotFailedException(shardId, "Aborted"); + long indexTotalFileSize = 0; + final BlockingQueue filesToSnapshot = new LinkedBlockingQueue<>(); + // If we did not find a set of files that is equal to the current commit we determine the files to upload by comparing files + // in the commit with files already in the repository + if (filesFromSegmentInfos == null) { + indexCommitPointFiles = new ArrayList<>(); + store.incRef(); + final Collection fileNames; + final Store.MetadataSnapshot metadataFromStore; + try { + // TODO apparently we don't use the MetadataSnapshot#.recoveryDiff(...) here but we should + try { + logger.trace( + "[{}] [{}] Loading store metadata using index commit [{}]", shardId, snapshotId, snapshotIndexCommit); + metadataFromStore = store.getMetadata(snapshotIndexCommit); + fileNames = snapshotIndexCommit.getFileNames(); + } catch (IOException e) { + throw new IndexShardSnapshotFailedException(shardId, "Failed to get store file metadata", e); + } + } finally { + store.decRef(); } + for (String fileName : fileNames) { + if (snapshotStatus.isAborted()) { + logger.debug("[{}] [{}] Aborted on the file [{}], exiting", shardId, snapshotId, fileName); + throw new IndexShardSnapshotFailedException(shardId, "Aborted"); + } - logger.trace("[{}] [{}] Processing [{}]", shardId, snapshotId, fileName); - final StoreFileMetaData md = metadataFromStore.get(fileName); - BlobStoreIndexShardSnapshot.FileInfo existingFileInfo = null; - List filesInfo = snapshots.findPhysicalIndexFiles(fileName); - if (filesInfo != null) { - for (BlobStoreIndexShardSnapshot.FileInfo fileInfo : filesInfo) { - if (fileInfo.isSame(md)) { - // a commit point file with the same name, size and checksum was already copied to repository - // we will reuse it for this snapshot - existingFileInfo = fileInfo; - break; + logger.trace("[{}] [{}] Processing [{}]", shardId, snapshotId, fileName); + final StoreFileMetaData md = metadataFromStore.get(fileName); + BlobStoreIndexShardSnapshot.FileInfo existingFileInfo = null; + List filesInfo = snapshots.findPhysicalIndexFiles(fileName); + if (filesInfo != null) { + for (BlobStoreIndexShardSnapshot.FileInfo fileInfo : filesInfo) { + if (fileInfo.isSame(md)) { + // a commit point file with the same name, size and checksum was already copied to repository + // we will reuse it for this snapshot + existingFileInfo = fileInfo; + break; + } } } - } - // We can skip writing blobs where the metadata hash is equal to the blob's contents because we store the hash/contents - // directly in the shard level metadata in this case - final boolean needsWrite = md.hashEqualsContents() == false; - indexTotalFileCount += md.length(); - indexTotalNumberOfFiles++; + // We can skip writing blobs where the metadata hash is equal to the blob's contents because we store the hash/contents + // directly in the shard level metadata in this case + final boolean needsWrite = md.hashEqualsContents() == false; + indexTotalFileSize += md.length(); + indexTotalNumberOfFiles++; - if (existingFileInfo == null) { - indexIncrementalFileCount++; - indexIncrementalSize += md.length(); - // create a new FileInfo - BlobStoreIndexShardSnapshot.FileInfo snapshotFileInfo = - new BlobStoreIndexShardSnapshot.FileInfo( - (needsWrite ? UPLOADED_DATA_BLOB_PREFIX : VIRTUAL_DATA_BLOB_PREFIX) + UUIDs.randomBase64UUID(), - md, chunkSize()); - indexCommitPointFiles.add(snapshotFileInfo); - if (needsWrite) { - filesToSnapshot.add(snapshotFileInfo); + if (existingFileInfo == null) { + indexIncrementalFileCount++; + indexIncrementalSize += md.length(); + // create a new FileInfo + BlobStoreIndexShardSnapshot.FileInfo snapshotFileInfo = + new BlobStoreIndexShardSnapshot.FileInfo( + (needsWrite ? UPLOADED_DATA_BLOB_PREFIX : VIRTUAL_DATA_BLOB_PREFIX) + UUIDs.randomBase64UUID(), + md, chunkSize()); + indexCommitPointFiles.add(snapshotFileInfo); + if (needsWrite) { + filesToSnapshot.add(snapshotFileInfo); + } + assert needsWrite || assertFileContentsMatchHash(snapshotFileInfo, store); + } else { + indexCommitPointFiles.add(existingFileInfo); } - assert needsWrite || assertFileContentsMatchHash(snapshotFileInfo, store); - } else { - indexCommitPointFiles.add(existingFileInfo); } + } else { + indexCommitPointFiles = filesFromSegmentInfos; } snapshotStatus.moveToStarted(startTime, indexIncrementalFileCount, - indexTotalNumberOfFiles, indexIncrementalSize, indexTotalFileCount); + indexTotalNumberOfFiles, indexIncrementalSize, indexTotalFileSize); final StepListener> allFilesUploadedListener = new StepListener<>(); allFilesUploadedListener.whenComplete(v -> { @@ -1673,7 +1690,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp } // build a new BlobStoreIndexShardSnapshot, that includes this one and all the saved ones List newSnapshotsList = new ArrayList<>(); - newSnapshotsList.add(new SnapshotFiles(snapshot.snapshot(), snapshot.indexFiles())); + newSnapshotsList.add(new SnapshotFiles(snapshot.snapshot(), snapshot.indexFiles(), shardStateIdentifier)); for (SnapshotFiles point : snapshots) { newSnapshotsList.add(point); } @@ -1760,7 +1777,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp final BlobContainer container = shardContainer(indexId, snapshotShardId); executor.execute(ActionRunnable.wrap(restoreListener, l -> { final BlobStoreIndexShardSnapshot snapshot = loadShardSnapshot(container, snapshotId); - final SnapshotFiles snapshotFiles = new SnapshotFiles(snapshot.snapshot(), snapshot.indexFiles()); + final SnapshotFiles snapshotFiles = new SnapshotFiles(snapshot.snapshot(), snapshot.indexFiles(), null); new FileRestoreContext(metadata.name(), shardId, snapshotId, recoveryState) { @Override protected void restoreFiles(List filesToRecover, Store store, diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java index 92aa371427c..43e6874aa71 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java @@ -23,6 +23,7 @@ import com.carrotsearch.hppc.cursors.ObjectObjectCursor; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; +import org.apache.lucene.index.IndexCommit; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; @@ -55,6 +56,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.index.engine.Engine; +import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.shard.IndexEventListener; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShardState; @@ -339,8 +341,9 @@ public class SnapshotShardsService extends AbstractLifecycleComponent implements try { // we flush first to make sure we get the latest writes snapshotted snapshotRef = indexShard.acquireLastIndexCommit(true); + final IndexCommit snapshotIndexCommit = snapshotRef.getIndexCommit(); repository.snapshotShard(indexShard.store(), indexShard.mapperService(), snapshot.getSnapshotId(), indexId, - snapshotRef.getIndexCommit(), snapshotStatus, version, userMetadata, + snapshotRef.getIndexCommit(), getShardStateId(indexShard, snapshotIndexCommit), snapshotStatus, version, userMetadata, ActionListener.runBefore(listener, snapshotRef::close)); } catch (Exception e) { IOUtils.close(snapshotRef); @@ -351,6 +354,31 @@ public class SnapshotShardsService extends AbstractLifecycleComponent implements } } + /** + * Generates an identifier from the current state of a shard that can be used to detect whether a shard's contents + * have changed between two snapshots. + * A shard is assumed to have unchanged contents if its global- and local checkpoint are equal, its maximum + * sequence number has not changed and its history- and force-merge-uuid have not changed. + * The method returns {@code null} if global and local checkpoint are different for a shard since no safe unique + * shard state id can be used in this case because of the possibility of a primary failover leading to different + * shard content for the same sequence number on a subsequent snapshot. + * + * @param indexShard Shard + * @param snapshotIndexCommit IndexCommit for shard + * @return shard state id or {@code null} if none can be used + */ + @Nullable + private static String getShardStateId(IndexShard indexShard, IndexCommit snapshotIndexCommit) throws IOException { + final Map userCommitData = snapshotIndexCommit.getUserData(); + final SequenceNumbers.CommitInfo seqNumInfo = SequenceNumbers.loadSeqNoInfoFromLuceneCommit(userCommitData.entrySet()); + final long maxSeqNo = seqNumInfo.maxSeqNo; + if (maxSeqNo != seqNumInfo.localCheckpoint || maxSeqNo != indexShard.getLastSyncedGlobalCheckpoint()) { + return null; + } + return userCommitData.get(Engine.HISTORY_UUID_KEY) + "-" + + userCommitData.getOrDefault(Engine.FORCE_MERGE_UUID_KEY, "na") + "-" + maxSeqNo; + } + /** * Checks if any shards were processed that the new master doesn't know about */ diff --git a/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java b/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java index 35d9b10862d..56ddf1e04f1 100644 --- a/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java +++ b/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java @@ -206,8 +206,8 @@ public class RepositoriesServiceTests extends ESTestCase { @Override public void snapshotShard(Store store, MapperService mapperService, SnapshotId snapshotId, IndexId indexId, - IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus, Version repositoryMetaVersion, - Map userMetadata, ActionListener listener) { + IndexCommit snapshotIndexCommit, String shardStateIdentifier, IndexShardSnapshotStatus snapshotStatus, + Version repositoryMetaVersion, Map userMetadata, ActionListener listener) { } diff --git a/server/src/test/java/org/elasticsearch/repositories/fs/FsRepositoryTests.java b/server/src/test/java/org/elasticsearch/repositories/fs/FsRepositoryTests.java index 49ca7091180..22b704389aa 100644 --- a/server/src/test/java/org/elasticsearch/repositories/fs/FsRepositoryTests.java +++ b/server/src/test/java/org/elasticsearch/repositories/fs/FsRepositoryTests.java @@ -105,8 +105,8 @@ public class FsRepositoryTests extends ESTestCase { final PlainActionFuture future1 = PlainActionFuture.newFuture(); runGeneric(threadPool, () -> { IndexShardSnapshotStatus snapshotStatus = IndexShardSnapshotStatus.newInitializing(null); - repository.snapshotShard(store, null, snapshotId, indexId, indexCommit, snapshotStatus, Version.CURRENT, - Collections.emptyMap(), future1); + repository.snapshotShard(store, null, snapshotId, indexId, indexCommit, null, + snapshotStatus, Version.CURRENT, Collections.emptyMap(), future1); future1.actionGet(); IndexShardSnapshotStatus.Copy copy = snapshotStatus.asCopy(); assertEquals(copy.getTotalFileCount(), copy.getIncrementalFileCount()); @@ -134,8 +134,8 @@ public class FsRepositoryTests extends ESTestCase { final PlainActionFuture future2 = PlainActionFuture.newFuture(); runGeneric(threadPool, () -> { IndexShardSnapshotStatus snapshotStatus = IndexShardSnapshotStatus.newInitializing(shardGeneration); - repository.snapshotShard(store, null, incSnapshotId, indexId, incIndexCommit, snapshotStatus, Version.CURRENT, - Collections.emptyMap(), future2); + repository.snapshotShard(store, null, incSnapshotId, indexId, incIndexCommit, + null, snapshotStatus, Version.CURRENT, Collections.emptyMap(), future2); future2.actionGet(); IndexShardSnapshotStatus.Copy copy = snapshotStatus.asCopy(); assertEquals(2, copy.getIncrementalFileCount()); diff --git a/server/src/test/java/org/elasticsearch/snapshots/BlobStoreIncrementalityIT.java b/server/src/test/java/org/elasticsearch/snapshots/BlobStoreIncrementalityIT.java new file mode 100644 index 00000000000..0111c2da8de --- /dev/null +++ b/server/src/test/java/org/elasticsearch/snapshots/BlobStoreIncrementalityIT.java @@ -0,0 +1,219 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.snapshots; + +import org.elasticsearch.action.DocWriteResponse; +import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse; +import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotStats; +import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotsStatusResponse; +import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeResponse; +import org.elasticsearch.action.admin.indices.stats.IndexStats; +import org.elasticsearch.action.bulk.BulkItemResponse; +import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.routing.UnassignedInfo; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.elasticsearch.test.ESIntegTestCase; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.concurrent.ExecutionException; + +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.is; + +@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0) +public class BlobStoreIncrementalityIT extends AbstractSnapshotIntegTestCase { + + public void testIncrementalBehaviorOnPrimaryFailover() throws InterruptedException, ExecutionException, IOException { + internalCluster().startMasterOnlyNode(); + final String primaryNode = internalCluster().startDataOnlyNode(); + final String indexName = "test-index"; + createIndex(indexName, Settings.builder() + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1) + .put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), 0).build()); + ensureYellow(indexName); + final String newPrimary = internalCluster().startDataOnlyNode(); + final Collection toDelete = new ArrayList<>(); + ensureGreen(indexName); + + logger.info("--> adding some documents to test index"); + for (int j = 0; j < randomIntBetween(1, 10); ++j) { + final BulkRequest bulkRequest = new BulkRequest(); + for (int i = 0; i < scaledRandomIntBetween(1, 100); ++i) { + bulkRequest.add(new IndexRequest(indexName).source("foo" + j, "bar" + i)); + } + final BulkResponse bulkResponse = client().bulk(bulkRequest).get(); + for (BulkItemResponse item : bulkResponse.getItems()) { + if (randomBoolean()) { + toDelete.add(item.getId()); + } + } + } + refresh(indexName); + + final long documentCountOriginal = getCountForIndex(indexName); + + final String snapshot1 = "snap-1"; + final String repo = "test-repo"; + logger.info("--> creating repository"); + assertThat(client().admin().cluster().preparePutRepository(repo) + .setType("fs").setSettings(Settings.builder().put("location", randomRepoPath())).execute().actionGet().isAcknowledged(), + is(true)); + + logger.info("--> creating snapshot 1"); + client().admin().cluster().prepareCreateSnapshot(repo, snapshot1).setIndices(indexName).setWaitForCompletion(true).get(); + + logger.info("--> Shutting down initial primary node [{}]", primaryNode); + stopNode(primaryNode); + + ensureYellow(indexName); + final String snapshot2 = "snap-2"; + logger.info("--> creating snapshot 2"); + client().admin().cluster().prepareCreateSnapshot(repo, snapshot2).setIndices(indexName).setWaitForCompletion(true).get(); + + assertTwoIdenticalShardSnapshots(repo, indexName, snapshot1, snapshot2); + + ensureRestoreSingleShardSuccessfully(repo, indexName, snapshot1, "-copy-1"); + assertCountInIndexThenDelete(indexName + "-copy-1", documentCountOriginal); + + ensureRestoreSingleShardSuccessfully(repo, indexName, snapshot2, "-copy-2"); + assertCountInIndexThenDelete(indexName + "-copy-2", documentCountOriginal); + + internalCluster().startDataOnlyNode(); + ensureGreen(indexName); + + logger.info("--> delete some documents from test index"); + for (final String id : toDelete) { + assertThat(client().prepareDelete().setIndex(indexName).setId(id).get().getResult(), is(DocWriteResponse.Result.DELETED)); + } + + refresh(indexName); + + final String snapshot3 = "snap-3"; + logger.info("--> creating snapshot 3"); + client().admin().cluster().prepareCreateSnapshot(repo, snapshot3).setIndices(indexName).setWaitForCompletion(true).get(); + + logger.info("--> Shutting down new primary node [{}]", newPrimary); + stopNode(newPrimary); + ensureYellow(indexName); + + final String snapshot4 = "snap-4"; + logger.info("--> creating snapshot 4"); + client().admin().cluster().prepareCreateSnapshot(repo, snapshot4).setIndices(indexName).setWaitForCompletion(true).get(); + + assertTwoIdenticalShardSnapshots(repo, indexName, snapshot3, snapshot4); + + final long countAfterDelete = documentCountOriginal - toDelete.size(); + ensureRestoreSingleShardSuccessfully(repo, indexName, snapshot3, "-copy-3"); + assertCountInIndexThenDelete(indexName + "-copy-3", countAfterDelete); + + ensureRestoreSingleShardSuccessfully(repo, indexName, snapshot4, "-copy-4"); + assertCountInIndexThenDelete(indexName + "-copy-4", countAfterDelete); + } + + public void testForceMergeCausesFullSnapshot() throws Exception { + internalCluster().startMasterOnlyNode(); + internalCluster().ensureAtLeastNumDataNodes(2); + final String indexName = "test-index"; + createIndex(indexName, Settings.builder().put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1).build()); + ensureGreen(indexName); + + logger.info("--> adding some documents to test index and flush in between to get at least two segments"); + for (int j = 0; j < 2; j++) { + final BulkRequest bulkRequest = new BulkRequest(); + for (int i = 0; i < scaledRandomIntBetween(1, 100); ++i) { + bulkRequest.add(new IndexRequest(indexName).source("foo" + j, "bar" + i)); + } + client().bulk(bulkRequest).get(); + flushAndRefresh(indexName); + } + final IndexStats indexStats = client().admin().indices().prepareStats(indexName).get().getIndex(indexName); + assertThat(indexStats.getIndexShards().get(0).getPrimary().getSegments().getCount(), greaterThan(1L)); + + final String snapshot1 = "snap-1"; + final String repo = "test-repo"; + logger.info("--> creating repository"); + assertThat(client().admin().cluster().preparePutRepository(repo) + .setType("fs").setSettings(Settings.builder().put("location", randomRepoPath())).execute().actionGet().isAcknowledged(), + is(true)); + + logger.info("--> creating snapshot 1"); + client().admin().cluster().prepareCreateSnapshot(repo, snapshot1).setIndices(indexName).setWaitForCompletion(true).get(); + + logger.info("--> force merging down to a single segment"); + final ForceMergeResponse forceMergeResponse = + client().admin().indices().prepareForceMerge(indexName).setMaxNumSegments(1).setFlush(true).get(); + assertThat(forceMergeResponse.getFailedShards(), is(0)); + + final String snapshot2 = "snap-2"; + logger.info("--> creating snapshot 2"); + client().admin().cluster().prepareCreateSnapshot(repo, snapshot2).setIndices(indexName).setWaitForCompletion(true).get(); + + logger.info("--> asserting that the two snapshots refer to different files in the repository"); + final SnapshotsStatusResponse response = + client().admin().cluster().prepareSnapshotStatus(repo).setSnapshots(snapshot2).get(); + final SnapshotStats secondSnapshotShardStatus = + response.getSnapshots().get(0).getIndices().get(indexName).getShards().get(0).getStats(); + assertThat(secondSnapshotShardStatus.getIncrementalFileCount(), greaterThan(0)); + } + + private void assertCountInIndexThenDelete(String index, long expectedCount) throws ExecutionException, InterruptedException { + logger.info("--> asserting that index [{}] contains [{}] documents", index, expectedCount); + assertThat(getCountForIndex(index), is(expectedCount)); + logger.info("--> deleting index [{}]", index); + assertThat(client().admin().indices().prepareDelete(index).get().isAcknowledged(), is(true)); + } + + private void assertTwoIdenticalShardSnapshots(String repo, String indexName, String snapshot1, String snapshot2) { + logger.info( + "--> asserting that snapshots [{}] and [{}] are referring to the same files in the repository", snapshot1, snapshot2); + final SnapshotsStatusResponse response = + client().admin().cluster().prepareSnapshotStatus(repo).setSnapshots(snapshot1, snapshot2).get(); + final SnapshotStats firstSnapshotShardStatus = + response.getSnapshots().get(0).getIndices().get(indexName).getShards().get(0).getStats(); + final int totalFilesInShard = firstSnapshotShardStatus.getTotalFileCount(); + assertThat(totalFilesInShard, greaterThan(0)); + assertThat(firstSnapshotShardStatus.getIncrementalFileCount(), is(totalFilesInShard)); + final SnapshotStats secondSnapshotShardStatus = + response.getSnapshots().get(1).getIndices().get(indexName).getShards().get(0).getStats(); + assertThat(secondSnapshotShardStatus.getTotalFileCount(), is(totalFilesInShard)); + assertThat(secondSnapshotShardStatus.getIncrementalFileCount(), is(0)); + } + + private void ensureRestoreSingleShardSuccessfully(String repo, String indexName, String snapshot, String indexSuffix) { + logger.info("--> restoring [{}]", snapshot); + final RestoreSnapshotResponse restoreSnapshotResponse = client().admin().cluster().prepareRestoreSnapshot(repo, snapshot) + .setIndices(indexName).setRenamePattern("(.+)").setRenameReplacement("$1" + indexSuffix).setWaitForCompletion(true).get(); + final RestoreInfo restoreInfo = restoreSnapshotResponse.getRestoreInfo(); + assertThat(restoreInfo.totalShards(), is(1)); + assertThat(restoreInfo.failedShards(), is(0)); + } + + private long getCountForIndex(String indexName) throws ExecutionException, InterruptedException { + return client().search(new SearchRequest(new SearchRequest(indexName).source( + new SearchSourceBuilder().size(0).trackTotalHits(true)))).get().getHits().getTotalHits().value; + } +} diff --git a/server/src/test/java/org/elasticsearch/snapshots/RepositoryFilterUserMetadataIT.java b/server/src/test/java/org/elasticsearch/snapshots/RepositoryFilterUserMetadataIT.java index 1f09afdc769..62eeea8b24f 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/RepositoryFilterUserMetadataIT.java +++ b/server/src/test/java/org/elasticsearch/snapshots/RepositoryFilterUserMetadataIT.java @@ -95,12 +95,12 @@ public class RepositoryFilterUserMetadataIT extends ESIntegTestCase { @Override public void snapshotShard(Store store, MapperService mapperService, SnapshotId snapshotId, IndexId indexId, - IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus, - Version repositoryMetaVersion, Map userMetadata, - ActionListener listener) { + IndexCommit snapshotIndexCommit, String shardStateIdentifier, + IndexShardSnapshotStatus snapshotStatus, Version repositoryMetaVersion, + Map userMetadata, ActionListener listener) { assertThat(userMetadata, is(Collections.singletonMap(MOCK_FILTERED_META, initialMetaValue))); - super.snapshotShard(store, mapperService, snapshotId, indexId, snapshotIndexCommit, snapshotStatus, - repositoryMetaVersion, userMetadata, listener); + super.snapshotShard(store, mapperService, snapshotId, indexId, snapshotIndexCommit, shardStateIdentifier, + snapshotStatus, repositoryMetaVersion, userMetadata, listener); } @Override diff --git a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java index e0dab6f1600..f274b58c922 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java @@ -839,7 +839,8 @@ public abstract class IndexShardTestCase extends ESTestCase { final String shardGen; try (Engine.IndexCommitRef indexCommitRef = shard.acquireLastIndexCommit(true)) { repository.snapshotShard(shard.store(), shard.mapperService(), snapshot.getSnapshotId(), indexId, - indexCommitRef.getIndexCommit(), snapshotStatus, Version.CURRENT, Collections.emptyMap(), future); + indexCommitRef.getIndexCommit(), null, snapshotStatus, Version.CURRENT, + Collections.emptyMap(), future); shardGen = future.actionGet(); } diff --git a/test/framework/src/main/java/org/elasticsearch/index/shard/RestoreOnlyRepository.java b/test/framework/src/main/java/org/elasticsearch/index/shard/RestoreOnlyRepository.java index 67362f8c272..bee6fa65dfc 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/shard/RestoreOnlyRepository.java +++ b/test/framework/src/main/java/org/elasticsearch/index/shard/RestoreOnlyRepository.java @@ -137,8 +137,8 @@ public abstract class RestoreOnlyRepository extends AbstractLifecycleComponent i @Override public void snapshotShard(Store store, MapperService mapperService, SnapshotId snapshotId, IndexId indexId, - IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus, Version repositoryMetaVersion, - Map userMetadata, ActionListener listener) { + IndexCommit snapshotIndexCommit, String shardStateIdentifier, IndexShardSnapshotStatus snapshotStatus, + Version repositoryMetaVersion, Map userMetadata, ActionListener listener) { } @Override diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java index 7c5d2d0f27c..ce1ac411622 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java @@ -302,8 +302,8 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit @Override public void snapshotShard(Store store, MapperService mapperService, SnapshotId snapshotId, IndexId indexId, - IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus, Version repositoryMetaVersion, - Map userMetadata, ActionListener listener) { + IndexCommit snapshotIndexCommit, String shardStateIdentifier, IndexShardSnapshotStatus snapshotStatus, + Version repositoryMetaVersion, Map userMetadata, ActionListener listener) { throw new UnsupportedOperationException("Unsupported for repository of type: " + TYPE); } @@ -495,7 +495,7 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit ByteSizeValue fileSize = new ByteSizeValue(fileMetaData.length()); fileInfos.add(new FileInfo(fileMetaData.name(), fileMetaData, fileSize)); } - SnapshotFiles snapshotFiles = new SnapshotFiles(LATEST, fileInfos); + SnapshotFiles snapshotFiles = new SnapshotFiles(LATEST, fileInfos, null); restore(snapshotFiles, store, listener); } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/snapshots/SourceOnlySnapshotRepository.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/snapshots/SourceOnlySnapshotRepository.java index 4afe75bf122..e9a2a9ae8a9 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/snapshots/SourceOnlySnapshotRepository.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/snapshots/SourceOnlySnapshotRepository.java @@ -138,8 +138,8 @@ public final class SourceOnlySnapshotRepository extends FilterRepository { @Override public void snapshotShard(Store store, MapperService mapperService, SnapshotId snapshotId, IndexId indexId, - IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus, Version repositoryMetaVersion, - Map userMetadata, ActionListener listener) { + IndexCommit snapshotIndexCommit, String shardStateIdentifier, IndexShardSnapshotStatus snapshotStatus, + Version repositoryMetaVersion, Map userMetadata, ActionListener listener) { if (mapperService.documentMapper() != null // if there is no mapping this is null && mapperService.documentMapper().sourceMapper().isComplete() == false) { listener.onFailure( @@ -179,8 +179,8 @@ public final class SourceOnlySnapshotRepository extends FilterRepository { Collections.singletonMap(BlockTreeTermsReader.FST_MODE_KEY, BlockTreeTermsReader.FSTLoadMode.OFF_HEAP.name())); toClose.add(reader); IndexCommit indexCommit = reader.getIndexCommit(); - super.snapshotShard(tempStore, mapperService, snapshotId, indexId, indexCommit, snapshotStatus, repositoryMetaVersion, - userMetadata, ActionListener.runBefore(listener, () -> IOUtils.close(toClose))); + super.snapshotShard(tempStore, mapperService, snapshotId, indexId, indexCommit, shardStateIdentifier, snapshotStatus, + repositoryMetaVersion, userMetadata, ActionListener.runBefore(listener, () -> IOUtils.close(toClose))); } catch (IOException e) { try { IOUtils.close(toClose); diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotShardTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotShardTests.java index 1f8bdd840de..5c1fc9ff9d1 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotShardTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotShardTests.java @@ -103,7 +103,8 @@ public class SourceOnlySnapshotShardTests extends IndexShardTestCase { IndexShardSnapshotStatus indexShardSnapshotStatus = IndexShardSnapshotStatus.newInitializing("-1"); final PlainActionFuture future = PlainActionFuture.newFuture(); runAsSnapshot(shard.getThreadPool(), () -> repository.snapshotShard(shard.store(), shard.mapperService(), snapshotId, indexId, - snapshotRef.getIndexCommit(), indexShardSnapshotStatus, Version.CURRENT, Collections.emptyMap(), future)); + snapshotRef.getIndexCommit(), null, indexShardSnapshotStatus, Version.CURRENT, + Collections.emptyMap(), future)); IllegalStateException illegalStateException = expectThrows(IllegalStateException.class, future::actionGet); assertEquals( "Can't snapshot _source only on an index that has incomplete source ie. has _source disabled or filters the source", @@ -129,7 +130,8 @@ public class SourceOnlySnapshotShardTests extends IndexShardTestCase { SnapshotId snapshotId = new SnapshotId("test", "test"); final PlainActionFuture future = PlainActionFuture.newFuture(); runAsSnapshot(shard.getThreadPool(), () -> repository.snapshotShard(shard.store(), shard.mapperService(), snapshotId, indexId, - snapshotRef.getIndexCommit(), indexShardSnapshotStatus, Version.CURRENT, Collections.emptyMap(), future)); + snapshotRef.getIndexCommit(), null, indexShardSnapshotStatus, Version.CURRENT, + Collections.emptyMap(), future)); shardGeneration = future.actionGet(); IndexShardSnapshotStatus.Copy copy = indexShardSnapshotStatus.asCopy(); assertEquals(copy.getTotalFileCount(), copy.getIncrementalFileCount()); @@ -145,7 +147,8 @@ public class SourceOnlySnapshotShardTests extends IndexShardTestCase { IndexShardSnapshotStatus indexShardSnapshotStatus = IndexShardSnapshotStatus.newInitializing(shardGeneration); final PlainActionFuture future = PlainActionFuture.newFuture(); runAsSnapshot(shard.getThreadPool(), () -> repository.snapshotShard(shard.store(), shard.mapperService(), snapshotId, indexId, - snapshotRef.getIndexCommit(), indexShardSnapshotStatus, Version.CURRENT, Collections.emptyMap(), future)); + snapshotRef.getIndexCommit(), null, indexShardSnapshotStatus, Version.CURRENT, + Collections.emptyMap(), future)); shardGeneration = future.actionGet(); IndexShardSnapshotStatus.Copy copy = indexShardSnapshotStatus.asCopy(); // we processed the segments_N file plus _1.si, _1.fnm, _1.fdx, _1.fdt, _1.fdm @@ -161,7 +164,8 @@ public class SourceOnlySnapshotShardTests extends IndexShardTestCase { IndexShardSnapshotStatus indexShardSnapshotStatus = IndexShardSnapshotStatus.newInitializing(shardGeneration); final PlainActionFuture future = PlainActionFuture.newFuture(); runAsSnapshot(shard.getThreadPool(), () -> repository.snapshotShard(shard.store(), shard.mapperService(), snapshotId, indexId, - snapshotRef.getIndexCommit(), indexShardSnapshotStatus, Version.CURRENT, Collections.emptyMap(), future)); + snapshotRef.getIndexCommit(), null, indexShardSnapshotStatus, Version.CURRENT, + Collections.emptyMap(), future)); future.actionGet(); IndexShardSnapshotStatus.Copy copy = indexShardSnapshotStatus.asCopy(); // we processed the segments_N file plus _1_1.liv @@ -209,7 +213,7 @@ public class SourceOnlySnapshotShardTests extends IndexShardTestCase { final PlainActionFuture future = PlainActionFuture.newFuture(); runAsSnapshot(shard.getThreadPool(), () -> { repository.snapshotShard(shard.store(), shard.mapperService(), snapshotId, indexId, snapshotRef.getIndexCommit(), - indexShardSnapshotStatus, Version.CURRENT, Collections.emptyMap(), future); + null, indexShardSnapshotStatus, Version.CURRENT, Collections.emptyMap(), future); future.actionGet(); final PlainActionFuture finFuture = PlainActionFuture.newFuture(); repository.finalizeSnapshot(snapshotId,