Better Incrementality for Snapshots of Unchanged Shards (#52182) (#53984)

Use sequence numbers and the force-merge UUID to determine whether a shard has changed, before falling back to comparing files, to keep snapshots incremental on primary fail-over.
Armin Braun 2020-03-23 16:43:41 +01:00 committed by GitHub
parent 8b9d6e6dbb
commit 5b9864db2c
15 changed files with 396 additions and 100 deletions
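
In essence: a shard's snapshot now records a "shard state id" derived from its history UUID, force-merge UUID and maximum sequence number; when the next snapshot computes the same id, the previous snapshot's file list is reused outright and no file comparison or upload takes place. The sketch below condenses that logic with simplified stand-ins (plain string keys and maps rather than the Elasticsearch types in the diff):

import java.util.List;
import java.util.Map;

// Minimal sketch of the idea; not the actual Elasticsearch classes.
class ShardStateIdSketch {

    // Returns null when the checkpoints diverge from the max sequence number: after a
    // primary fail-over the same sequence number can map to different files, so no safe
    // id exists and the caller must fall back to comparing physical files.
    static String shardStateId(Map<String, String> commitUserData, long maxSeqNo,
                               long localCheckpoint, long globalCheckpoint) {
        if (maxSeqNo != localCheckpoint || maxSeqNo != globalCheckpoint) {
            return null;
        }
        return commitUserData.get("history_uuid") + "-"
            + commitUserData.getOrDefault("force_merge_uuid", "na") + "-" + maxSeqNo;
    }

    // The file list of a previous snapshot taken at the identical shard state, or null
    // if the repository must compare files instead.
    static List<String> reusableFiles(String currentId, Map<String, List<String>> idToFiles) {
        return currentId == null ? null : idToFiles.get(currentId);
    }
}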

View File

@ -145,6 +145,7 @@ public class BlobStoreIndexShardSnapshots implements Iterable<SnapshotFiles>, To
static final class ParseFields {
static final ParseField FILES = new ParseField("files");
static final ParseField SHARD_STATE_ID = new ParseField("shard_state_id");
static final ParseField SNAPSHOTS = new ParseField("snapshots");
}
@ -217,6 +218,9 @@ public class BlobStoreIndexShardSnapshots implements Iterable<SnapshotFiles>, To
builder.value(fileInfo.name());
}
builder.endArray();
if (snapshot.shardStateIdentifier() != null) {
builder.field(ParseFields.SHARD_STATE_ID.getPreferredName(), snapshot.shardStateIdentifier());
}
builder.endObject();
}
builder.endObject();
@ -229,6 +233,8 @@ public class BlobStoreIndexShardSnapshots implements Iterable<SnapshotFiles>, To
token = parser.nextToken();
}
Map<String, List<String>> snapshotsMap = new HashMap<>();
Map<String, String> historyUUIDs = new HashMap<>();
Map<String, Long> globalCheckpoints = new HashMap<>();
Map<String, FileInfo> files = new HashMap<>();
if (token == XContentParser.Token.START_OBJECT) {
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
@ -260,15 +266,16 @@ public class BlobStoreIndexShardSnapshots implements Iterable<SnapshotFiles>, To
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
currentFieldName = parser.currentName();
if (ParseFields.FILES.match(currentFieldName, parser.getDeprecationHandler()) &&
    parser.nextToken() == XContentParser.Token.START_ARRAY) {
List<String> fileNames = new ArrayList<>();
while (parser.nextToken() != XContentParser.Token.END_ARRAY) {
fileNames.add(parser.text());
}
snapshotsMap.put(snapshot, fileNames);
} else if (ParseFields.SHARD_STATE_ID.match(currentFieldName, parser.getDeprecationHandler())) {
parser.nextToken();
historyUUIDs.put(snapshot, parser.text());
}
}
}
@ -287,7 +294,8 @@ public class BlobStoreIndexShardSnapshots implements Iterable<SnapshotFiles>, To
assert fileInfo != null;
fileInfosBuilder.add(fileInfo);
}
snapshots.add(new SnapshotFiles(entry.getKey(), Collections.unmodifiableList(fileInfosBuilder),
    historyUUIDs.get(entry.getKey())));
}
return new BlobStoreIndexShardSnapshots(files, Collections.unmodifiableList(snapshots));
}
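
For illustration, the shard-level metadata blob written and parsed above now carries the identifier next to each snapshot's file list, roughly in this shape (hand-written sample; blob names and id value invented):

"snapshots": {
  "snap-1": {
    "files": ["__someBlobName", "__otherBlobName"],
    "shard_state_id": "<historyUUID>-<forceMergeUUID>-<maxSeqNo>"
  }
}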

View File

@ -18,6 +18,7 @@
*/
package org.elasticsearch.index.snapshots.blobstore;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.FileInfo;
import java.util.HashMap;
@ -33,6 +34,9 @@ public class SnapshotFiles {
private final List<FileInfo> indexFiles;
@Nullable
private final String shardStateIdentifier;
private Map<String, FileInfo> physicalFiles = null;
/**
@ -45,12 +49,23 @@ public class SnapshotFiles {
}
/**
* @param snapshot             snapshot name
* @param indexFiles           index files
* @param shardStateIdentifier unique identifier for the state of the shard that this snapshot was taken from
*/
public SnapshotFiles(String snapshot, List<FileInfo> indexFiles, @Nullable String shardStateIdentifier) {
this.snapshot = snapshot;
this.indexFiles = indexFiles;
this.shardStateIdentifier = shardStateIdentifier;
}
/**
* Returns an identifier for the shard state that can be used to check whether a shard has changed between
* snapshots or not.
*/
@Nullable
public String shardStateIdentifier() {
return shardStateIdentifier;
}
/**

View File

@ -126,10 +126,10 @@ public class FilterRepository implements Repository {
@Override
public void snapshotShard(Store store, MapperService mapperService, SnapshotId snapshotId, IndexId indexId,
IndexCommit snapshotIndexCommit, String shardStateIdentifier, IndexShardSnapshotStatus snapshotStatus,
Version repositoryMetaVersion, Map<String, Object> userMetadata, ActionListener<String> listener) {
    in.snapshotShard(store, mapperService, snapshotId, indexId, snapshotIndexCommit, shardStateIdentifier, snapshotStatus,
        repositoryMetaVersion, userMetadata, listener);
}
@Override
public void restoreShard(Store store, SnapshotId snapshotId, IndexId indexId, ShardId snapshotShardId, RecoveryState recoveryState,

View File

@ -27,6 +27,7 @@ import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.component.LifecycleComponent;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.shard.ShardId;
@ -212,14 +213,17 @@ public interface Repository extends LifecycleComponent {
* @param snapshotId snapshot id
* @param indexId id for the index being snapshotted
* @param snapshotIndexCommit commit point
* @param shardStateIdentifier a unique identifier of the state of the shard that is stored with the shard's snapshot and used
* to detect if the shard has changed between snapshots. If {@code null} is passed as the identifier
* snapshotting will be done by inspecting the physical files referenced by {@code snapshotIndexCommit}
* @param snapshotStatus snapshot status
* @param repositoryMetaVersion version of the updated repository metadata to write
* @param userMetadata user metadata of the snapshot found in {@link SnapshotsInProgress.Entry#userMetadata()}
* @param listener listener invoked on completion
*/
void snapshotShard(Store store, MapperService mapperService, SnapshotId snapshotId, IndexId indexId, IndexCommit snapshotIndexCommit,
@Nullable String shardStateIdentifier, IndexShardSnapshotStatus snapshotStatus, Version repositoryMetaVersion,
Map<String, Object> userMetadata, ActionListener<String> listener);
/**
* Restores snapshot of the shard.

View File

@ -126,6 +126,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
@ -1552,8 +1553,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
@Override
public void snapshotShard(Store store, MapperService mapperService, SnapshotId snapshotId, IndexId indexId,
IndexCommit snapshotIndexCommit, String shardStateIdentifier, IndexShardSnapshotStatus snapshotStatus,
Version repositoryMetaVersion, Map<String, Object> userMetadata, ActionListener<String> listener) {
final ShardId shardId = store.shardId();
final long startTime = threadPool.absoluteTimeInMillis();
try {
@ -1579,76 +1580,92 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
throw new IndexShardSnapshotFailedException(shardId,
"Duplicate snapshot name [" + snapshotId.getName() + "] detected, aborting");
}
// First inspect all known SegmentInfos instances to see if we already have an equivalent commit in the repository
final List<BlobStoreIndexShardSnapshot.FileInfo> filesFromSegmentInfos = Optional.ofNullable(shardStateIdentifier).map(id -> {
    for (SnapshotFiles snapshotFileSet : snapshots.snapshots()) {
        if (id.equals(snapshotFileSet.shardStateIdentifier())) {
            return snapshotFileSet.indexFiles();
        }
    }
    return null;
}).orElse(null);
final List<BlobStoreIndexShardSnapshot.FileInfo> indexCommitPointFiles;
int indexIncrementalFileCount = 0;
int indexTotalNumberOfFiles = 0;
long indexIncrementalSize = 0;
long indexTotalFileSize = 0;
final BlockingQueue<BlobStoreIndexShardSnapshot.FileInfo> filesToSnapshot = new LinkedBlockingQueue<>();
// If we did not find a set of files that is equal to the current commit we determine the files to upload by comparing files
// in the commit with files already in the repository
if (filesFromSegmentInfos == null) {
    indexCommitPointFiles = new ArrayList<>();
    store.incRef();
    final Collection<String> fileNames;
    final Store.MetadataSnapshot metadataFromStore;
    try {
        // TODO apparently we don't use the MetadataSnapshot#.recoveryDiff(...) here but we should
        try {
            logger.trace(
                "[{}] [{}] Loading store metadata using index commit [{}]", shardId, snapshotId, snapshotIndexCommit);
            metadataFromStore = store.getMetadata(snapshotIndexCommit);
            fileNames = snapshotIndexCommit.getFileNames();
        } catch (IOException e) {
            throw new IndexShardSnapshotFailedException(shardId, "Failed to get store file metadata", e);
        }
    } finally {
        store.decRef();
    }
    for (String fileName : fileNames) {
        if (snapshotStatus.isAborted()) {
            logger.debug("[{}] [{}] Aborted on the file [{}], exiting", shardId, snapshotId, fileName);
            throw new IndexShardSnapshotFailedException(shardId, "Aborted");
        }
        logger.trace("[{}] [{}] Processing [{}]", shardId, snapshotId, fileName);
        final StoreFileMetaData md = metadataFromStore.get(fileName);
        BlobStoreIndexShardSnapshot.FileInfo existingFileInfo = null;
        List<BlobStoreIndexShardSnapshot.FileInfo> filesInfo = snapshots.findPhysicalIndexFiles(fileName);
        if (filesInfo != null) {
            for (BlobStoreIndexShardSnapshot.FileInfo fileInfo : filesInfo) {
                if (fileInfo.isSame(md)) {
                    // a commit point file with the same name, size and checksum was already copied to repository
                    // we will reuse it for this snapshot
                    existingFileInfo = fileInfo;
                    break;
                }
            }
        }
        // We can skip writing blobs where the metadata hash is equal to the blob's contents because we store the hash/contents
        // directly in the shard level metadata in this case
        final boolean needsWrite = md.hashEqualsContents() == false;
        indexTotalFileSize += md.length();
        indexTotalNumberOfFiles++;
        if (existingFileInfo == null) {
            indexIncrementalFileCount++;
            indexIncrementalSize += md.length();
            // create a new FileInfo
            BlobStoreIndexShardSnapshot.FileInfo snapshotFileInfo =
                new BlobStoreIndexShardSnapshot.FileInfo(
                    (needsWrite ? UPLOADED_DATA_BLOB_PREFIX : VIRTUAL_DATA_BLOB_PREFIX) + UUIDs.randomBase64UUID(),
                    md, chunkSize());
            indexCommitPointFiles.add(snapshotFileInfo);
            if (needsWrite) {
                filesToSnapshot.add(snapshotFileInfo);
            }
            assert needsWrite || assertFileContentsMatchHash(snapshotFileInfo, store);
        } else {
            indexCommitPointFiles.add(existingFileInfo);
        }
    }
} else {
    indexCommitPointFiles = filesFromSegmentInfos;
}
snapshotStatus.moveToStarted(startTime, indexIncrementalFileCount,
    indexTotalNumberOfFiles, indexIncrementalSize, indexTotalFileSize);
final StepListener<Collection<Void>> allFilesUploadedListener = new StepListener<>();
allFilesUploadedListener.whenComplete(v -> {
@ -1673,7 +1690,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
}
// build a new BlobStoreIndexShardSnapshot, that includes this one and all the saved ones
List<SnapshotFiles> newSnapshotsList = new ArrayList<>();
newSnapshotsList.add(new SnapshotFiles(snapshot.snapshot(), snapshot.indexFiles(), shardStateIdentifier));
for (SnapshotFiles point : snapshots) {
newSnapshotsList.add(point);
}
@ -1760,7 +1777,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
final BlobContainer container = shardContainer(indexId, snapshotShardId);
executor.execute(ActionRunnable.wrap(restoreListener, l -> {
final BlobStoreIndexShardSnapshot snapshot = loadShardSnapshot(container, snapshotId);
final SnapshotFiles snapshotFiles = new SnapshotFiles(snapshot.snapshot(), snapshot.indexFiles(), null);
new FileRestoreContext(metadata.name(), shardId, snapshotId, recoveryState) {
@Override
protected void restoreFiles(List<BlobStoreIndexShardSnapshot.FileInfo> filesToRecover, Store store,

View File

@ -23,6 +23,7 @@ import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.index.IndexCommit;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
@ -55,6 +56,7 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.IndexEventListener;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardState;
@ -339,8 +341,9 @@ public class SnapshotShardsService extends AbstractLifecycleComponent implements
try {
// we flush first to make sure we get the latest writes snapshotted
snapshotRef = indexShard.acquireLastIndexCommit(true);
final IndexCommit snapshotIndexCommit = snapshotRef.getIndexCommit();
repository.snapshotShard(indexShard.store(), indexShard.mapperService(), snapshot.getSnapshotId(), indexId,
snapshotRef.getIndexCommit(), getShardStateId(indexShard, snapshotIndexCommit), snapshotStatus, version, userMetadata,
ActionListener.runBefore(listener, snapshotRef::close));
} catch (Exception e) {
IOUtils.close(snapshotRef);
@ -351,6 +354,31 @@ public class SnapshotShardsService extends AbstractLifecycleComponent implements
}
}
/**
* Generates an identifier from the current state of a shard that can be used to detect whether a shard's contents
* have changed between two snapshots.
* A shard is assumed to have unchanged contents if its global- and local checkpoint are equal, its maximum
* sequence number has not changed and its history- and force-merge-uuid have not changed.
* The method returns {@code null} if global and local checkpoint are different for a shard since no safe unique
* shard state id can be used in this case because of the possibility of a primary failover leading to different
* shard content for the same sequence number on a subsequent snapshot.
*
* @param indexShard Shard
* @param snapshotIndexCommit IndexCommit for shard
* @return shard state id or {@code null} if none can be used
*/
@Nullable
private static String getShardStateId(IndexShard indexShard, IndexCommit snapshotIndexCommit) throws IOException {
final Map<String, String> userCommitData = snapshotIndexCommit.getUserData();
final SequenceNumbers.CommitInfo seqNumInfo = SequenceNumbers.loadSeqNoInfoFromLuceneCommit(userCommitData.entrySet());
final long maxSeqNo = seqNumInfo.maxSeqNo;
if (maxSeqNo != seqNumInfo.localCheckpoint || maxSeqNo != indexShard.getLastSyncedGlobalCheckpoint()) {
return null;
}
return userCommitData.get(Engine.HISTORY_UUID_KEY) + "-" +
userCommitData.getOrDefault(Engine.FORCE_MERGE_UUID_KEY, "na") + "-" + maxSeqNo;
}
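A worked example of the resulting id format (values invented for illustration):
// Format produced by getShardStateId above, with invented values:
String before = "Xyz123" + "-" + "na" + "-" + 41;     // no force merge yet -> "Xyz123-na-41"
String after = "Xyz123" + "-" + "Abc789" + "-" + 41;  // after a force merge -> "Xyz123-Abc789-41"
// The ids differ even though no documents changed, so the next snapshot falls back to
// comparing files; testForceMergeCausesFullSnapshot below relies on exactly this.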
/**
* Checks if any shards were processed that the new master doesn't know about
*/

View File

@ -206,8 +206,8 @@ public class RepositoriesServiceTests extends ESTestCase {
@Override
public void snapshotShard(Store store, MapperService mapperService, SnapshotId snapshotId, IndexId indexId,
IndexCommit snapshotIndexCommit, String shardStateIdentifier, IndexShardSnapshotStatus snapshotStatus,
Version repositoryMetaVersion, Map<String, Object> userMetadata, ActionListener<String> listener) {
}

View File

@ -105,8 +105,8 @@ public class FsRepositoryTests extends ESTestCase {
final PlainActionFuture<String> future1 = PlainActionFuture.newFuture();
runGeneric(threadPool, () -> {
IndexShardSnapshotStatus snapshotStatus = IndexShardSnapshotStatus.newInitializing(null);
repository.snapshotShard(store, null, snapshotId, indexId, indexCommit, null,
    snapshotStatus, Version.CURRENT, Collections.emptyMap(), future1);
future1.actionGet();
IndexShardSnapshotStatus.Copy copy = snapshotStatus.asCopy();
assertEquals(copy.getTotalFileCount(), copy.getIncrementalFileCount());
@ -134,8 +134,8 @@ public class FsRepositoryTests extends ESTestCase {
final PlainActionFuture<String> future2 = PlainActionFuture.newFuture();
runGeneric(threadPool, () -> {
IndexShardSnapshotStatus snapshotStatus = IndexShardSnapshotStatus.newInitializing(shardGeneration);
repository.snapshotShard(store, null, incSnapshotId, indexId, incIndexCommit,
    null, snapshotStatus, Version.CURRENT, Collections.emptyMap(), future2);
future2.actionGet();
IndexShardSnapshotStatus.Copy copy = snapshotStatus.asCopy();
assertEquals(2, copy.getIncrementalFileCount());

View File

@ -0,0 +1,219 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.snapshots;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotStats;
import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotsStatusResponse;
import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeResponse;
import org.elasticsearch.action.admin.indices.stats.IndexStats;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.test.ESIntegTestCase;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.ExecutionException;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.is;
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0)
public class BlobStoreIncrementalityIT extends AbstractSnapshotIntegTestCase {
public void testIncrementalBehaviorOnPrimaryFailover() throws InterruptedException, ExecutionException, IOException {
internalCluster().startMasterOnlyNode();
final String primaryNode = internalCluster().startDataOnlyNode();
final String indexName = "test-index";
createIndex(indexName, Settings.builder()
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)
.put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), 0).build());
ensureYellow(indexName);
final String newPrimary = internalCluster().startDataOnlyNode();
final Collection<String> toDelete = new ArrayList<>();
ensureGreen(indexName);
logger.info("--> adding some documents to test index");
for (int j = 0; j < randomIntBetween(1, 10); ++j) {
final BulkRequest bulkRequest = new BulkRequest();
for (int i = 0; i < scaledRandomIntBetween(1, 100); ++i) {
bulkRequest.add(new IndexRequest(indexName).source("foo" + j, "bar" + i));
}
final BulkResponse bulkResponse = client().bulk(bulkRequest).get();
for (BulkItemResponse item : bulkResponse.getItems()) {
if (randomBoolean()) {
toDelete.add(item.getId());
}
}
}
refresh(indexName);
final long documentCountOriginal = getCountForIndex(indexName);
final String snapshot1 = "snap-1";
final String repo = "test-repo";
logger.info("--> creating repository");
assertThat(client().admin().cluster().preparePutRepository(repo)
.setType("fs").setSettings(Settings.builder().put("location", randomRepoPath())).execute().actionGet().isAcknowledged(),
is(true));
logger.info("--> creating snapshot 1");
client().admin().cluster().prepareCreateSnapshot(repo, snapshot1).setIndices(indexName).setWaitForCompletion(true).get();
logger.info("--> Shutting down initial primary node [{}]", primaryNode);
stopNode(primaryNode);
ensureYellow(indexName);
final String snapshot2 = "snap-2";
logger.info("--> creating snapshot 2");
client().admin().cluster().prepareCreateSnapshot(repo, snapshot2).setIndices(indexName).setWaitForCompletion(true).get();
assertTwoIdenticalShardSnapshots(repo, indexName, snapshot1, snapshot2);
ensureRestoreSingleShardSuccessfully(repo, indexName, snapshot1, "-copy-1");
assertCountInIndexThenDelete(indexName + "-copy-1", documentCountOriginal);
ensureRestoreSingleShardSuccessfully(repo, indexName, snapshot2, "-copy-2");
assertCountInIndexThenDelete(indexName + "-copy-2", documentCountOriginal);
internalCluster().startDataOnlyNode();
ensureGreen(indexName);
logger.info("--> delete some documents from test index");
for (final String id : toDelete) {
assertThat(client().prepareDelete().setIndex(indexName).setId(id).get().getResult(), is(DocWriteResponse.Result.DELETED));
}
refresh(indexName);
final String snapshot3 = "snap-3";
logger.info("--> creating snapshot 3");
client().admin().cluster().prepareCreateSnapshot(repo, snapshot3).setIndices(indexName).setWaitForCompletion(true).get();
logger.info("--> Shutting down new primary node [{}]", newPrimary);
stopNode(newPrimary);
ensureYellow(indexName);
final String snapshot4 = "snap-4";
logger.info("--> creating snapshot 4");
client().admin().cluster().prepareCreateSnapshot(repo, snapshot4).setIndices(indexName).setWaitForCompletion(true).get();
assertTwoIdenticalShardSnapshots(repo, indexName, snapshot3, snapshot4);
final long countAfterDelete = documentCountOriginal - toDelete.size();
ensureRestoreSingleShardSuccessfully(repo, indexName, snapshot3, "-copy-3");
assertCountInIndexThenDelete(indexName + "-copy-3", countAfterDelete);
ensureRestoreSingleShardSuccessfully(repo, indexName, snapshot4, "-copy-4");
assertCountInIndexThenDelete(indexName + "-copy-4", countAfterDelete);
}
public void testForceMergeCausesFullSnapshot() throws Exception {
internalCluster().startMasterOnlyNode();
internalCluster().ensureAtLeastNumDataNodes(2);
final String indexName = "test-index";
createIndex(indexName, Settings.builder().put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1).build());
ensureGreen(indexName);
logger.info("--> adding some documents to test index and flush in between to get at least two segments");
for (int j = 0; j < 2; j++) {
final BulkRequest bulkRequest = new BulkRequest();
for (int i = 0; i < scaledRandomIntBetween(1, 100); ++i) {
bulkRequest.add(new IndexRequest(indexName).source("foo" + j, "bar" + i));
}
client().bulk(bulkRequest).get();
flushAndRefresh(indexName);
}
final IndexStats indexStats = client().admin().indices().prepareStats(indexName).get().getIndex(indexName);
assertThat(indexStats.getIndexShards().get(0).getPrimary().getSegments().getCount(), greaterThan(1L));
final String snapshot1 = "snap-1";
final String repo = "test-repo";
logger.info("--> creating repository");
assertThat(client().admin().cluster().preparePutRepository(repo)
.setType("fs").setSettings(Settings.builder().put("location", randomRepoPath())).execute().actionGet().isAcknowledged(),
is(true));
logger.info("--> creating snapshot 1");
client().admin().cluster().prepareCreateSnapshot(repo, snapshot1).setIndices(indexName).setWaitForCompletion(true).get();
logger.info("--> force merging down to a single segment");
final ForceMergeResponse forceMergeResponse =
client().admin().indices().prepareForceMerge(indexName).setMaxNumSegments(1).setFlush(true).get();
assertThat(forceMergeResponse.getFailedShards(), is(0));
final String snapshot2 = "snap-2";
logger.info("--> creating snapshot 2");
client().admin().cluster().prepareCreateSnapshot(repo, snapshot2).setIndices(indexName).setWaitForCompletion(true).get();
logger.info("--> asserting that the two snapshots refer to different files in the repository");
final SnapshotsStatusResponse response =
client().admin().cluster().prepareSnapshotStatus(repo).setSnapshots(snapshot2).get();
final SnapshotStats secondSnapshotShardStatus =
response.getSnapshots().get(0).getIndices().get(indexName).getShards().get(0).getStats();
assertThat(secondSnapshotShardStatus.getIncrementalFileCount(), greaterThan(0));
}
private void assertCountInIndexThenDelete(String index, long expectedCount) throws ExecutionException, InterruptedException {
logger.info("--> asserting that index [{}] contains [{}] documents", index, expectedCount);
assertThat(getCountForIndex(index), is(expectedCount));
logger.info("--> deleting index [{}]", index);
assertThat(client().admin().indices().prepareDelete(index).get().isAcknowledged(), is(true));
}
private void assertTwoIdenticalShardSnapshots(String repo, String indexName, String snapshot1, String snapshot2) {
logger.info(
"--> asserting that snapshots [{}] and [{}] are referring to the same files in the repository", snapshot1, snapshot2);
final SnapshotsStatusResponse response =
client().admin().cluster().prepareSnapshotStatus(repo).setSnapshots(snapshot1, snapshot2).get();
final SnapshotStats firstSnapshotShardStatus =
response.getSnapshots().get(0).getIndices().get(indexName).getShards().get(0).getStats();
final int totalFilesInShard = firstSnapshotShardStatus.getTotalFileCount();
assertThat(totalFilesInShard, greaterThan(0));
assertThat(firstSnapshotShardStatus.getIncrementalFileCount(), is(totalFilesInShard));
final SnapshotStats secondSnapshotShardStatus =
response.getSnapshots().get(1).getIndices().get(indexName).getShards().get(0).getStats();
assertThat(secondSnapshotShardStatus.getTotalFileCount(), is(totalFilesInShard));
assertThat(secondSnapshotShardStatus.getIncrementalFileCount(), is(0));
}
private void ensureRestoreSingleShardSuccessfully(String repo, String indexName, String snapshot, String indexSuffix) {
logger.info("--> restoring [{}]", snapshot);
final RestoreSnapshotResponse restoreSnapshotResponse = client().admin().cluster().prepareRestoreSnapshot(repo, snapshot)
.setIndices(indexName).setRenamePattern("(.+)").setRenameReplacement("$1" + indexSuffix).setWaitForCompletion(true).get();
final RestoreInfo restoreInfo = restoreSnapshotResponse.getRestoreInfo();
assertThat(restoreInfo.totalShards(), is(1));
assertThat(restoreInfo.failedShards(), is(0));
}
private long getCountForIndex(String indexName) throws ExecutionException, InterruptedException {
return client().search(new SearchRequest(new SearchRequest(indexName).source(
new SearchSourceBuilder().size(0).trackTotalHits(true)))).get().getHits().getTotalHits().value;
}
}

View File

@ -95,12 +95,12 @@ public class RepositoryFilterUserMetadataIT extends ESIntegTestCase {
@Override
public void snapshotShard(Store store, MapperService mapperService, SnapshotId snapshotId, IndexId indexId,
IndexCommit snapshotIndexCommit, String shardStateIdentifier,
IndexShardSnapshotStatus snapshotStatus, Version repositoryMetaVersion,
Map<String, Object> userMetadata, ActionListener<String> listener) {
    assertThat(userMetadata, is(Collections.singletonMap(MOCK_FILTERED_META, initialMetaValue)));
    super.snapshotShard(store, mapperService, snapshotId, indexId, snapshotIndexCommit, shardStateIdentifier,
        snapshotStatus, repositoryMetaVersion, userMetadata, listener);
}
@Override

View File

@ -839,7 +839,8 @@ public abstract class IndexShardTestCase extends ESTestCase {
final String shardGen;
try (Engine.IndexCommitRef indexCommitRef = shard.acquireLastIndexCommit(true)) {
repository.snapshotShard(shard.store(), shard.mapperService(), snapshot.getSnapshotId(), indexId,
indexCommitRef.getIndexCommit(), null, snapshotStatus, Version.CURRENT,
Collections.emptyMap(), future);
shardGen = future.actionGet();
}

View File

@ -137,8 +137,8 @@ public abstract class RestoreOnlyRepository extends AbstractLifecycleComponent i
@Override
public void snapshotShard(Store store, MapperService mapperService, SnapshotId snapshotId, IndexId indexId,
IndexCommit snapshotIndexCommit, String shardStateIdentifier, IndexShardSnapshotStatus snapshotStatus,
Version repositoryMetaVersion, Map<String, Object> userMetadata, ActionListener<String> listener) {
}
@Override

View File

@ -302,8 +302,8 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit
@Override
public void snapshotShard(Store store, MapperService mapperService, SnapshotId snapshotId, IndexId indexId,
IndexCommit snapshotIndexCommit, String shardStateIdentifier, IndexShardSnapshotStatus snapshotStatus,
Version repositoryMetaVersion, Map<String, Object> userMetadata, ActionListener<String> listener) {
throw new UnsupportedOperationException("Unsupported for repository of type: " + TYPE);
}
@ -495,7 +495,7 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit
ByteSizeValue fileSize = new ByteSizeValue(fileMetaData.length());
fileInfos.add(new FileInfo(fileMetaData.name(), fileMetaData, fileSize));
}
SnapshotFiles snapshotFiles = new SnapshotFiles(LATEST, fileInfos, null);
restore(snapshotFiles, store, listener);
}

View File

@ -138,8 +138,8 @@ public final class SourceOnlySnapshotRepository extends FilterRepository {
@Override
public void snapshotShard(Store store, MapperService mapperService, SnapshotId snapshotId, IndexId indexId,
IndexCommit snapshotIndexCommit, String shardStateIdentifier, IndexShardSnapshotStatus snapshotStatus,
Version repositoryMetaVersion, Map<String, Object> userMetadata, ActionListener<String> listener) {
if (mapperService.documentMapper() != null // if there is no mapping this is null
&& mapperService.documentMapper().sourceMapper().isComplete() == false) {
listener.onFailure(
@ -179,8 +179,8 @@ public final class SourceOnlySnapshotRepository extends FilterRepository {
Collections.singletonMap(BlockTreeTermsReader.FST_MODE_KEY, BlockTreeTermsReader.FSTLoadMode.OFF_HEAP.name()));
toClose.add(reader);
IndexCommit indexCommit = reader.getIndexCommit();
super.snapshotShard(tempStore, mapperService, snapshotId, indexId, indexCommit, shardStateIdentifier, snapshotStatus,
    repositoryMetaVersion, userMetadata, ActionListener.runBefore(listener, () -> IOUtils.close(toClose)));
} catch (IOException e) {
try {
IOUtils.close(toClose);

View File

@ -103,7 +103,8 @@ public class SourceOnlySnapshotShardTests extends IndexShardTestCase {
IndexShardSnapshotStatus indexShardSnapshotStatus = IndexShardSnapshotStatus.newInitializing("-1");
final PlainActionFuture<String> future = PlainActionFuture.newFuture();
runAsSnapshot(shard.getThreadPool(), () -> repository.snapshotShard(shard.store(), shard.mapperService(), snapshotId, indexId,
snapshotRef.getIndexCommit(), null, indexShardSnapshotStatus, Version.CURRENT,
Collections.emptyMap(), future));
IllegalStateException illegalStateException = expectThrows(IllegalStateException.class, future::actionGet);
assertEquals(
"Can't snapshot _source only on an index that has incomplete source ie. has _source disabled or filters the source",
@ -129,7 +130,8 @@ public class SourceOnlySnapshotShardTests extends IndexShardTestCase {
SnapshotId snapshotId = new SnapshotId("test", "test");
final PlainActionFuture<String> future = PlainActionFuture.newFuture();
runAsSnapshot(shard.getThreadPool(), () -> repository.snapshotShard(shard.store(), shard.mapperService(), snapshotId, indexId,
snapshotRef.getIndexCommit(), null, indexShardSnapshotStatus, Version.CURRENT,
Collections.emptyMap(), future));
shardGeneration = future.actionGet();
IndexShardSnapshotStatus.Copy copy = indexShardSnapshotStatus.asCopy();
assertEquals(copy.getTotalFileCount(), copy.getIncrementalFileCount());
@ -145,7 +147,8 @@ public class SourceOnlySnapshotShardTests extends IndexShardTestCase {
IndexShardSnapshotStatus indexShardSnapshotStatus = IndexShardSnapshotStatus.newInitializing(shardGeneration);
final PlainActionFuture<String> future = PlainActionFuture.newFuture();
runAsSnapshot(shard.getThreadPool(), () -> repository.snapshotShard(shard.store(), shard.mapperService(), snapshotId, indexId,
snapshotRef.getIndexCommit(), null, indexShardSnapshotStatus, Version.CURRENT,
Collections.emptyMap(), future));
shardGeneration = future.actionGet();
IndexShardSnapshotStatus.Copy copy = indexShardSnapshotStatus.asCopy();
// we processed the segments_N file plus _1.si, _1.fnm, _1.fdx, _1.fdt, _1.fdm
@ -161,7 +164,8 @@ public class SourceOnlySnapshotShardTests extends IndexShardTestCase {
IndexShardSnapshotStatus indexShardSnapshotStatus = IndexShardSnapshotStatus.newInitializing(shardGeneration);
final PlainActionFuture<String> future = PlainActionFuture.newFuture();
runAsSnapshot(shard.getThreadPool(), () -> repository.snapshotShard(shard.store(), shard.mapperService(), snapshotId, indexId,
snapshotRef.getIndexCommit(), null, indexShardSnapshotStatus, Version.CURRENT,
Collections.emptyMap(), future));
future.actionGet();
IndexShardSnapshotStatus.Copy copy = indexShardSnapshotStatus.asCopy();
// we processed the segments_N file plus _1_1.liv
@ -209,7 +213,7 @@ public class SourceOnlySnapshotShardTests extends IndexShardTestCase {
final PlainActionFuture<String> future = PlainActionFuture.newFuture();
runAsSnapshot(shard.getThreadPool(), () -> {
repository.snapshotShard(shard.store(), shard.mapperService(), snapshotId, indexId, snapshotRef.getIndexCommit(),
null, indexShardSnapshotStatus, Version.CURRENT, Collections.emptyMap(), future);
future.actionGet();
final PlainActionFuture<SnapshotInfo> finFuture = PlainActionFuture.newFuture();
repository.finalizeSnapshot(snapshotId,