Make Snapshot Logic Write Metadata after Segments (#45689) (#46764)

* Write metadata during snapshot finalization after segment files to prevent outdated metadata in case of dynamic mapping updates as explained in #41581
* Keep the old behavior of writing the metadata beforehand in the case of mixed version clusters for BwC reasons
   * Still overwrite the metadata in the end, so even a mixed version cluster is fixed by this change if a newer version master does the finalization
* Fixes #41581
This commit is contained in:
Armin Braun 2019-09-17 13:09:39 +02:00 committed by GitHub
parent e1cf103980
commit b0f09b279f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 480 additions and 251 deletions

View File

@ -81,9 +81,9 @@ public class FilterRepository implements Repository {
@Override @Override
public SnapshotInfo finalizeSnapshot(SnapshotId snapshotId, List<IndexId> indices, long startTime, String failure, int totalShards, public SnapshotInfo finalizeSnapshot(SnapshotId snapshotId, List<IndexId> indices, long startTime, String failure, int totalShards,
List<SnapshotShardFailure> shardFailures, long repositoryStateId, boolean includeGlobalState, List<SnapshotShardFailure> shardFailures, long repositoryStateId, boolean includeGlobalState,
Map<String, Object> userMetadata) { MetaData metaData, Map<String, Object> userMetadata) {
return in.finalizeSnapshot(snapshotId, indices, startTime, failure, totalShards, shardFailures, repositoryStateId, return in.finalizeSnapshot(snapshotId, indices, startTime, failure, totalShards, shardFailures, repositoryStateId,
includeGlobalState, userMetadata); includeGlobalState, metaData, userMetadata);
} }
@Override @Override

View File

@ -49,8 +49,6 @@ import java.util.function.Function;
* <p> * <p>
* To perform a snapshot: * To perform a snapshot:
* <ul> * <ul>
* <li>Master calls {@link #initializeSnapshot(SnapshotId, List, org.elasticsearch.cluster.metadata.MetaData)}
* with list of indices that will be included into the snapshot</li>
* <li>Data nodes call {@link Repository#snapshotShard} * <li>Data nodes call {@link Repository#snapshotShard}
* for each shard</li> * for each shard</li>
* <li>When all shard calls return master calls {@link #finalizeSnapshot} with possible list of failures</li> * <li>When all shard calls return master calls {@link #finalizeSnapshot} with possible list of failures</li>
@ -117,7 +115,11 @@ public interface Repository extends LifecycleComponent {
* @param snapshotId snapshot id * @param snapshotId snapshot id
* @param indices list of indices to be snapshotted * @param indices list of indices to be snapshotted
* @param metaData cluster metadata * @param metaData cluster metadata
*
* @deprecated this method is only used when taking snapshots in a mixed version cluster where a master node older than
* {@link org.elasticsearch.snapshots.SnapshotsService#NO_REPO_INITIALIZE_VERSION} is present.
*/ */
@Deprecated
void initializeSnapshot(SnapshotId snapshotId, List<IndexId> indices, MetaData metaData); void initializeSnapshot(SnapshotId snapshotId, List<IndexId> indices, MetaData metaData);
/** /**
@ -137,7 +139,7 @@ public interface Repository extends LifecycleComponent {
*/ */
SnapshotInfo finalizeSnapshot(SnapshotId snapshotId, List<IndexId> indices, long startTime, String failure, int totalShards, SnapshotInfo finalizeSnapshot(SnapshotId snapshotId, List<IndexId> indices, long startTime, String failure, int totalShards,
List<SnapshotShardFailure> shardFailures, long repositoryStateId, boolean includeGlobalState, List<SnapshotShardFailure> shardFailures, long repositoryStateId, boolean includeGlobalState,
Map<String, Object> userMetadata); MetaData clusterMetaData, Map<String, Object> userMetadata);
/** /**
* Deletes snapshot * Deletes snapshot

View File

@ -86,7 +86,6 @@ import org.elasticsearch.repositories.RepositoryCleanupResult;
import org.elasticsearch.repositories.RepositoryData; import org.elasticsearch.repositories.RepositoryData;
import org.elasticsearch.repositories.RepositoryException; import org.elasticsearch.repositories.RepositoryException;
import org.elasticsearch.repositories.RepositoryVerificationException; import org.elasticsearch.repositories.RepositoryVerificationException;
import org.elasticsearch.snapshots.InvalidSnapshotNameException;
import org.elasticsearch.snapshots.SnapshotCreationException; import org.elasticsearch.snapshots.SnapshotCreationException;
import org.elasticsearch.snapshots.SnapshotException; import org.elasticsearch.snapshots.SnapshotException;
import org.elasticsearch.snapshots.SnapshotId; import org.elasticsearch.snapshots.SnapshotId;
@ -182,7 +181,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
private static final String TESTS_FILE = "tests-"; private static final String TESTS_FILE = "tests-";
private static final String METADATA_PREFIX = "meta-"; public static final String METADATA_PREFIX = "meta-";
public static final String METADATA_NAME_FORMAT = METADATA_PREFIX + "%s.dat"; public static final String METADATA_NAME_FORMAT = METADATA_PREFIX + "%s.dat";
@ -386,23 +385,13 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
@Override @Override
public void initializeSnapshot(SnapshotId snapshotId, List<IndexId> indices, MetaData clusterMetaData) { public void initializeSnapshot(SnapshotId snapshotId, List<IndexId> indices, MetaData clusterMetaData) {
if (isReadOnly()) {
throw new RepositoryException(metadata.name(), "cannot create snapshot in a readonly repository");
}
try { try {
final String snapshotName = snapshotId.getName();
// check if the snapshot name already exists in the repository
final RepositoryData repositoryData = getRepositoryData();
if (repositoryData.getSnapshotIds().stream().anyMatch(s -> s.getName().equals(snapshotName))) {
throw new InvalidSnapshotNameException(metadata.name(), snapshotId.getName(), "snapshot with the same name already exists");
}
// Write Global MetaData // Write Global MetaData
globalMetaDataFormat.write(clusterMetaData, blobContainer(), snapshotId.getUUID()); globalMetaDataFormat.write(clusterMetaData, blobContainer(), snapshotId.getUUID(), true);
// write the index metadata for each index in the snapshot // write the index metadata for each index in the snapshot
for (IndexId index : indices) { for (IndexId index : indices) {
indexMetaDataFormat.write(clusterMetaData.index(index.getName()), indexContainer(index), snapshotId.getUUID()); indexMetaDataFormat.write(clusterMetaData.index(index.getName()), indexContainer(index), snapshotId.getUUID(), true);
} }
} catch (IOException ex) { } catch (IOException ex) {
throw new SnapshotCreationException(metadata.name(), snapshotId, ex); throw new SnapshotCreationException(metadata.name(), snapshotId, ex);
@ -667,14 +656,34 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
final List<SnapshotShardFailure> shardFailures, final List<SnapshotShardFailure> shardFailures,
final long repositoryStateId, final long repositoryStateId,
final boolean includeGlobalState, final boolean includeGlobalState,
final MetaData clusterMetaData,
final Map<String, Object> userMetadata) { final Map<String, Object> userMetadata) {
SnapshotInfo blobStoreSnapshot = new SnapshotInfo(snapshotId, SnapshotInfo blobStoreSnapshot = new SnapshotInfo(snapshotId,
indices.stream().map(IndexId::getName).collect(Collectors.toList()), indices.stream().map(IndexId::getName).collect(Collectors.toList()),
startTime, failure, threadPool.absoluteTimeInMillis(), totalShards, shardFailures, startTime, failure, threadPool.absoluteTimeInMillis(), totalShards, shardFailures,
includeGlobalState, userMetadata); includeGlobalState, userMetadata);
try {
// We ignore all FileAlreadyExistsException here since otherwise a master failover while in this method will
// mean that no snap-${uuid}.dat blob is ever written for this snapshot. This is safe because any updated version of the
// index or global metadata will be compatible with the segments written in this snapshot as well.
// Failing on an already existing index-${repoGeneration} below ensures that the index.latest blob is not updated in a way that
// decrements the generation it points at
// Write Global MetaData
globalMetaDataFormat.write(clusterMetaData, blobContainer(), snapshotId.getUUID(), false);
// write the index metadata for each index in the snapshot
for (IndexId index : indices) {
indexMetaDataFormat.write(clusterMetaData.index(index.getName()), indexContainer(index), snapshotId.getUUID(), false);
}
} catch (IOException ex) {
throw new SnapshotException(metadata.name(), snapshotId, "failed to write metadata for snapshot", ex);
}
try { try {
final RepositoryData updatedRepositoryData = getRepositoryData().addSnapshot(snapshotId, blobStoreSnapshot.state(), indices); final RepositoryData updatedRepositoryData = getRepositoryData().addSnapshot(snapshotId, blobStoreSnapshot.state(), indices);
snapshotFormat.write(blobStoreSnapshot, blobContainer(), snapshotId.getUUID()); snapshotFormat.write(blobStoreSnapshot, blobContainer(), snapshotId.getUUID(), false);
writeIndexGen(updatedRepositoryData, repositoryStateId); writeIndexGen(updatedRepositoryData, repositoryStateId);
} catch (FileAlreadyExistsException ex) { } catch (FileAlreadyExistsException ex) {
// if another master was elected and took over finalizing the snapshot, it is possible // if another master was elected and took over finalizing the snapshot, it is possible
@ -1052,7 +1061,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
logger.trace("[{}] [{}] writing shard snapshot file", shardId, snapshotId); logger.trace("[{}] [{}] writing shard snapshot file", shardId, snapshotId);
try { try {
indexShardSnapshotFormat.write(snapshot, shardContainer, snapshotId.getUUID()); indexShardSnapshotFormat.write(snapshot, shardContainer, snapshotId.getUUID(), false);
} catch (IOException e) { } catch (IOException e) {
throw new IndexShardSnapshotFailedException(shardId, "Failed to write commit point", e); throw new IndexShardSnapshotFailedException(shardId, "Failed to write commit point", e);
} }

View File

@ -178,12 +178,13 @@ public final class ChecksumBlobStoreFormat<T extends ToXContent> {
* @param obj object to be serialized * @param obj object to be serialized
* @param blobContainer blob container * @param blobContainer blob container
* @param name blob name * @param name blob name
* @param failIfAlreadyExists Whether to fail if the blob already exists
*/ */
public void write(T obj, BlobContainer blobContainer, String name) throws IOException { public void write(T obj, BlobContainer blobContainer, String name, boolean failIfAlreadyExists) throws IOException {
final String blobName = blobName(name); final String blobName = blobName(name);
writeTo(obj, blobName, bytesArray -> { writeTo(obj, blobName, bytesArray -> {
try (InputStream stream = bytesArray.streamInput()) { try (InputStream stream = bytesArray.streamInput()) {
blobContainer.writeBlob(blobName, stream, bytesArray.length(), true); blobContainer.writeBlob(blobName, stream, bytesArray.length(), failIfAlreadyExists);
} }
}); });
} }

View File

@ -0,0 +1,213 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
/**
* <p>This package exposes the blobstore repository used by Elasticsearch Snapshots.</p>
*
* <h2>Preliminaries</h2>
*
* <p>The {@link org.elasticsearch.repositories.blobstore.BlobStoreRepository} forms the basis of implementations of
* {@link org.elasticsearch.repositories.Repository} on top of a blob store. A blobstore can be used as the basis for an implementation
* as long as it provides for GET, PUT, DELETE, and LIST operations. For a read-only repository, it suffices if the blobstore provides only
* GET operations.
* These operations are formally defined as specified by the {@link org.elasticsearch.common.blobstore.BlobContainer} interface that
* any {@code BlobStoreRepository} implementation must provide via its implementation of
* {@link org.elasticsearch.repositories.blobstore.BlobStoreRepository#getBlobContainer()}.</p>
*
* <p>The blob store is written to and read from by master-eligible nodes and data nodes. All metadata related to a snapshot's
* scope and health is written by the master node.</p>
* <p>The data-nodes, on the other hand, write the data for each individual shard but do not write any blobs outside of shard directories
* for shards that they hold the primary of. For each shard, the data-node holding the shard's primary writes the actual data in the form
* of the shard's segment files to the repository as well as metadata about all the segment files that the repository stores for the
* shard.</p>
*
* <p>For the specifics on how the operations on the repository documented below are invoked during the snapshot process please refer to
* the documentation of the {@link org.elasticsearch.snapshots} package.</p>
*
* <p>{@code BlobStoreRepository} maintains the following structure of blobs containing data and metadata in the blob store. The exact
* operations executed on these blobs are explained below.</p>
* <pre>
* {@code
* STORE_ROOT
* |- index-N - JSON serialized {@link org.elasticsearch.repositories.RepositoryData} containing a list of all snapshot ids
* | and the indices belonging to each snapshot, N is the generation of the file
* |- index.latest - contains the numeric value of the latest generation of the index file (i.e. N from above)
* |- incompatible-snapshots - list of all snapshot ids that are no longer compatible with the current version of the cluster
* |- snap-20131010.dat - SMILE serialized {@link org.elasticsearch.snapshots.SnapshotInfo} for snapshot "20131010"
* |- meta-20131010.dat - SMILE serialized {@link org.elasticsearch.cluster.metadata.MetaData} for snapshot "20131010"
* | (includes only global metadata)
* |- snap-20131011.dat - SMILE serialized {@link org.elasticsearch.snapshots.SnapshotInfo} for snapshot "20131011"
* |- meta-20131011.dat - SMILE serialized {@link org.elasticsearch.cluster.metadata.MetaData} for snapshot "20131011"
* .....
* |- indices/ - data for all indices
* |- Ac1342-B_x/ - data for index "foo" which was assigned the unique id Ac1342-B_x (not to be confused with the actual index uuid)
* | | in the repository
* | |- meta-20131010.dat - JSON Serialized {@link org.elasticsearch.cluster.metadata.IndexMetaData} for index "foo"
* | |- 0/ - data for shard "0" of index "foo"
* | | |- __1 \ (files with numeric names were created by older ES versions)
* | | |- __2 |
* | | |- __VPO5oDMVT5y4Akv8T_AO_A |- files from different segments see snap-* for their mappings to real segment files
* | | |- __1gbJy18wS_2kv1qI7FgKuQ |
* | | |- __R8JvZAHlSMyMXyZc2SS8Zg /
* | | .....
* | | |- snap-20131010.dat - SMILE serialized {@link org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot} for
* | | | snapshot "20131010"
* | | |- snap-20131011.dat - SMILE serialized {@link org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot} for
* | | | snapshot "20131011"
* | | |- index-123 - SMILE serialized {@link org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshots} for
* | | | the shard
* | |
* | |- 1/ - data for shard "1" of index "foo"
* | | |- __1
* | | .....
* | |
* | |-2/
* | ......
* |
* |- 1xB0D8_B3y/ - data for index "bar" which was assigned the unique id of 1xB0D8_B3y in the repository
* ......
* }
* </pre>
*
* <h2>Getting the Repository's RepositoryData</h2>
*
* <p>Loading the {@link org.elasticsearch.repositories.RepositoryData} that holds the list of all snapshots as well as the mapping of
* indices' names to their repository {@link org.elasticsearch.repositories.IndexId} is done by invoking
* {@link org.elasticsearch.repositories.blobstore.BlobStoreRepository#getRepositoryData} and implemented as follows:</p>
* <ol>
* <li>
* <ol>
* <li>The blobstore repository stores the {@code RepositoryData} in blobs named with incrementing suffix {@code N} at {@code /index-N}
* directly under the repository's root.</li>
* <li>The blobstore also stores the most recent {@code N} as a 64-bit long in the blob {@code /index.latest} directly under the
* repository's root.</li>
* </ol>
* </li>
* <li>
* <ol>
* <li>First, find the most recent {@code RepositoryData} by getting a list of all index-N blobs through listing all blobs with prefix
* "index-" under the repository root and then selecting the one with the highest value for N.</li>
* <li>If this operation fails because the repository's {@code BlobContainer} does not support list operations (in the case of read-only
* repositories), read the highest value of N from the index.latest blob.</li>
* </ol>
* </li>
* <li>
* <ol>
* <li>Use the just determined value of {@code N} and get the {@code /index-N} blob and deserialize the {@code RepositoryData} from it.</li>
* <li>If no value of {@code N} could be found since neither an {@code index.latest} nor any {@code index-N} blobs exist in the repository,
* it is assumed to be empty and {@link org.elasticsearch.repositories.RepositoryData#EMPTY} is returned.</li>
* </ol>
* </li>
* </ol>
* <h2>Creating a Snapshot</h2>
*
* <p>Creating a snapshot in the repository happens in the three steps described in detail below.</p>
*
* <h3>Initializing a Snapshot in the Repository (Mixed Version Clusters only)</h3>
*
* <p>In mixed version clusters that contain a node older than
* {@link org.elasticsearch.snapshots.SnapshotsService#NO_REPO_INITIALIZE_VERSION}, creating a snapshot in the repository starts with a
* call to {@link org.elasticsearch.repositories.Repository#initializeSnapshot} which the blob store repository implements via the
* following actions:</p>
* <ol>
* <li>Verify that no snapshot by the requested name exists.</li>
* <li>Write a blob containing the cluster metadata to the root of the blob store repository at {@code /meta-${snapshot-uuid}.dat}</li>
* <li>Write the metadata for each index to a blob in that index's directory at
* {@code /indices/${index-snapshot-uuid}/meta-${snapshot-uuid}.dat}</li>
* </ol>
* TODO: Remove this section once BwC logic it references is removed
*
* <h3>Writing Shard Data (Segments)</h3>
*
* <p>Once the snapshot has been initialized (which only involves writing metadata to the repository in the mixed version cluster case
* described above), the snapshot process moves on to writing the actual shard data to the repository by invoking
* {@link org.elasticsearch.repositories.Repository#snapshotShard} on the data-nodes that hold the primaries for the shards in the current
* snapshot. It is implemented as follows:</p>
*
* <p>Note:</p>
* <ul>
* <li>For each shard {@code i} in a given index, its path in the blob store is located at {@code /indices/${index-snapshot-uuid}/${i}}</li>
* <li>All the following steps are executed exclusively on the shard's primary's data node.</li>
* </ul>
*
* <ol>
* <li>Create the {@link org.apache.lucene.index.IndexCommit} for the shard to snapshot.</li>
* <li>List all blobs in the shard's path. Find the {@link org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshots} blob
* with name {@code index-${N}} for the highest possible value of {@code N} in the list to get the information of what segment files are
* already available in the blobstore.</li>
* <li>By comparing the files in the {@code IndexCommit} and the available file list from the previous step, determine the segment files
* that need to be written to the blob store. For each segment that needs to be added to the blob store, generate a unique name by combining
* the segment data blob prefix {@code __} and a UUID and write the segment to the blobstore.</li>
* <li>After completing all segment writes, a blob containing a
* {@link org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot} with name {@code snap-${snapshot-uuid}.dat} is written to
* the shard's path and contains a list of all the files referenced by the snapshot as well as some metadata about the snapshot. See the
* documentation of {@code BlobStoreIndexShardSnapshot} for details on its contents.</li>
* <li>Once all the segments and the {@code BlobStoreIndexShardSnapshot} blob have been written, an updated
* {@code BlobStoreIndexShardSnapshots} blob is written to the shard's path with name {@code index-${N+1}}.</li>
* </ol>
*
* <h3>Finalizing the Snapshot</h3>
*
* <p>After all primaries have finished writing the necessary segment files to the blob store in the previous step, the master node moves on
* to finalizing the snapshot by invoking {@link org.elasticsearch.repositories.Repository#finalizeSnapshot}. This method executes the
* following actions in order:</p>
* <ol>
* <li>Write a blob containing the cluster metadata to the root of the blob store repository at {@code /meta-${snapshot-uuid}.dat}</li>
* <li>Write the metadata for each index to a blob in that index's directory at
* {@code /indices/${index-snapshot-uuid}/meta-${snapshot-uuid}.dat}</li>
* <li>Write the {@link org.elasticsearch.snapshots.SnapshotInfo} blob for the given snapshot to the key {@code /snap-${snapshot-uuid}.dat}
* directly under the repository root.</li>
* <li>Write an updated {@code RepositoryData} blob to the key {@code /index-${N+1}} using the {@code N} determined when initializing the
* snapshot in the first step. When doing this, the implementation checks that the blob for generation {@code N + 1} has not yet been
* written to prevent concurrent updates to the repository. If the blob for {@code N + 1} already exists the execution of finalization
* stops under the assumption that a master failover occurred and the snapshot has already been finalized by the new master.</li>
* <li>Write the updated {@code /index.latest} blob containing the new repository generation {@code N + 1}.</li>
* </ol>
*
* <h2>Deleting a Snapshot</h2>
*
* <p>Deleting a snapshot is an operation that is exclusively executed on the master node that runs through the following sequence of
* actions when {@link org.elasticsearch.repositories.blobstore.BlobStoreRepository#deleteSnapshot} is invoked:</p>
*
* <ol>
* <li>Get the current {@code RepositoryData} from the latest {@code index-N} blob at the repository root.</li>
* <li>Write an updated {@code RepositoryData} blob with the deleted snapshot removed to key {@code /index-${N+1}} directly under the
* repository root.</li>
* <li>Write an updated {@code index.latest} blob containing {@code N + 1}.</li>
* <li>Delete the global {@code MetaData} blob {@code meta-${snapshot-uuid}.dat} stored directly under the repository root for the snapshot
* as well as the {@code SnapshotInfo} blob at {@code /snap-${snapshot-uuid}.dat}.</li>
* <li>For each index referenced by the snapshot:
* <ol>
* <li>Delete the snapshot's {@code IndexMetaData} at {@code /indices/${index-snapshot-uuid}/meta-${snapshot-uuid}.dat}.</li>
* <li>Go through all shard directories {@code /indices/${index-snapshot-uuid}/${i}} and:
* <ol>
* <li>Remove the {@code BlobStoreIndexShardSnapshot} blob at {@code /indices/${index-snapshot-uuid}/${i}/snap-${snapshot-uuid}.dat}.</li>
* <li>List all blobs in the shard path {@code /indices/${index-snapshot-uuid}/${i}} and build a new {@code BlobStoreIndexShardSnapshots} from
* the remaining {@code BlobStoreIndexShardSnapshot} blobs in the shard. Afterwards, write it to the next shard generation blob at
* {@code /indices/${index-snapshot-uuid}/${i}/index-${N+1}} (The shard's generation is determined from the list of {@code index-N} blobs
* in the shard directory).</li>
* <li>Delete all segment blobs (identified by having the data blob prefix {@code __}) in the shard directory which are not referenced by
* the new {@code BlobStoreIndexShardSnapshots} that has been written in the previous step.</li>
* </ol>
* </li>
* </ol>
* </li>
* </ol>
* TODO: The above sequence of actions can lead to leaking files when an index completely goes out of scope. Adjust this documentation once
* https://github.com/elastic/elasticsearch/issues/13159 is fixed.
*/
package org.elasticsearch.repositories.blobstore;

View File

@ -26,6 +26,7 @@ import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.util.CollectionUtil; import org.apache.lucene.util.CollectionUtil;
import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRunnable; import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotRequest; import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotRequest;
@ -68,6 +69,7 @@ import org.elasticsearch.repositories.IndexId;
import org.elasticsearch.repositories.RepositoriesService; import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.repositories.Repository; import org.elasticsearch.repositories.Repository;
import org.elasticsearch.repositories.RepositoryData; import org.elasticsearch.repositories.RepositoryData;
import org.elasticsearch.repositories.RepositoryException;
import org.elasticsearch.repositories.RepositoryMissingException; import org.elasticsearch.repositories.RepositoryMissingException;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
@ -108,12 +110,19 @@ import static org.elasticsearch.cluster.SnapshotsInProgress.completed;
* the {@link SnapshotShardsService#sendSnapshotShardUpdate(Snapshot, ShardId, ShardSnapshotStatus)} method</li> * the {@link SnapshotShardsService#sendSnapshotShardUpdate(Snapshot, ShardId, ShardSnapshotStatus)} method</li>
* <li>When last shard is completed master node in {@link SnapshotShardsService#innerUpdateSnapshotState} method marks the snapshot * <li>When last shard is completed master node in {@link SnapshotShardsService#innerUpdateSnapshotState} method marks the snapshot
* as completed</li> * as completed</li>
* <li>After cluster state is updated, the {@link #endSnapshot(SnapshotsInProgress.Entry)} finalizes snapshot in the repository, * <li>After cluster state is updated, the {@link #endSnapshot(SnapshotsInProgress.Entry, MetaData)} finalizes snapshot in the repository,
* notifies all {@link #snapshotCompletionListeners} that snapshot is completed, and finally calls * notifies all {@link #snapshotCompletionListeners} that snapshot is completed, and finally calls
* {@link #removeSnapshotFromClusterState(Snapshot, SnapshotInfo, Exception)} to remove snapshot from cluster state</li> * {@link #removeSnapshotFromClusterState(Snapshot, SnapshotInfo, Exception)} to remove snapshot from cluster state</li>
* </ul> * </ul>
*/ */
public class SnapshotsService extends AbstractLifecycleComponent implements ClusterStateApplier { public class SnapshotsService extends AbstractLifecycleComponent implements ClusterStateApplier {
/**
* Minimum node version which does not use {@link Repository#initializeSnapshot(SnapshotId, List, MetaData)} to write snapshot metadata
* when starting a snapshot.
*/
public static final Version NO_REPO_INITIALIZE_VERSION = Version.V_7_5_0;
private static final Logger logger = LogManager.getLogger(SnapshotsService.class); private static final Logger logger = LogManager.getLogger(SnapshotsService.class);
private final ClusterService clusterService; private final ClusterService clusterService;
@ -398,24 +407,29 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
assert initializingSnapshots.contains(snapshot.snapshot()); assert initializingSnapshots.contains(snapshot.snapshot());
Repository repository = repositoriesService.repository(snapshot.snapshot().getRepository()); Repository repository = repositoriesService.repository(snapshot.snapshot().getRepository());
MetaData metaData = clusterState.metaData(); if (repository.isReadOnly()) {
if (!snapshot.includeGlobalState()) { throw new RepositoryException(repository.getMetadata().name(), "cannot create snapshot in a readonly repository");
// Remove global state from the cluster state
MetaData.Builder builder = MetaData.builder();
for (IndexId index : snapshot.indices()) {
builder.put(metaData.index(index.getName()), false);
} }
metaData = builder.build(); final String snapshotName = snapshot.snapshot().getSnapshotId().getName();
// check if the snapshot name already exists in the repository
if (repository.getRepositoryData().getSnapshotIds().stream().anyMatch(s -> s.getName().equals(snapshotName))) {
throw new InvalidSnapshotNameException(
repository.getMetadata().name(), snapshotName, "snapshot with the same name already exists");
}
if (clusterState.nodes().getMinNodeVersion().onOrAfter(NO_REPO_INITIALIZE_VERSION) == false) {
// In mixed version clusters we initialize the snapshot in the repository so that in case of a master failover to an
// older version master node snapshot finalization (that assumes initializeSnapshot was called) produces a valid
// snapshot.
repository.initializeSnapshot(
snapshot.snapshot().getSnapshotId(), snapshot.indices(), metaDataForSnapshot(snapshot, clusterState.metaData()));
} }
repository.initializeSnapshot(snapshot.snapshot().getSnapshotId(), snapshot.indices(), metaData);
snapshotCreated = true; snapshotCreated = true;
logger.info("snapshot [{}] started", snapshot.snapshot()); logger.info("snapshot [{}] started", snapshot.snapshot());
if (snapshot.indices().isEmpty()) { if (snapshot.indices().isEmpty()) {
// No indices in this snapshot - we are done // No indices in this snapshot - we are done
userCreateSnapshotListener.onResponse(snapshot.snapshot()); userCreateSnapshotListener.onResponse(snapshot.snapshot());
endSnapshot(snapshot); endSnapshot(snapshot, clusterState.metaData());
return; return;
} }
clusterService.submitStateUpdateTask("update_snapshot [" + snapshot.snapshot() + "]", new ClusterStateUpdateTask() { clusterService.submitStateUpdateTask("update_snapshot [" + snapshot.snapshot() + "]", new ClusterStateUpdateTask() {
@ -498,7 +512,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
assert snapshotsInProgress != null; assert snapshotsInProgress != null;
final SnapshotsInProgress.Entry entry = snapshotsInProgress.snapshot(snapshot.snapshot()); final SnapshotsInProgress.Entry entry = snapshotsInProgress.snapshot(snapshot.snapshot());
assert entry != null; assert entry != null;
endSnapshot(entry); endSnapshot(entry, newState.metaData());
} }
} }
}); });
@ -557,6 +571,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
Collections.emptyList(), Collections.emptyList(),
snapshot.getRepositoryStateId(), snapshot.getRepositoryStateId(),
snapshot.includeGlobalState(), snapshot.includeGlobalState(),
metaDataForSnapshot(snapshot, clusterService.state().metaData()),
snapshot.userMetadata()); snapshot.userMetadata());
} catch (Exception inner) { } catch (Exception inner) {
inner.addSuppressed(exception); inner.addSuppressed(exception);
@ -569,6 +584,18 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
} }
} }
/**
 * Builds the metadata that is handed to the repository for the given snapshot.
 * <p>
 * When the snapshot includes the global state, the given cluster metadata is returned unchanged. Otherwise a
 * fresh {@link MetaData} instance is built that contains only the index metadata of the indices that are part
 * of the snapshot.
 *
 * @param snapshot snapshot entry whose index list and global-state flag are consulted
 * @param metaData cluster metadata to take the index metadata from
 * @return the metadata to store for the snapshot
 */
private static MetaData metaDataForSnapshot(SnapshotsInProgress.Entry snapshot, MetaData metaData) {
    if (snapshot.includeGlobalState()) {
        return metaData;
    }
    // Remove global state from the cluster state
    final MetaData.Builder builder = MetaData.builder();
    for (IndexId index : snapshot.indices()) {
        final String indexName = index.getName();
        // An index of the snapshot may no longer be present in the given metadata (e.g. it was deleted while a
        // partial snapshot was running). Skip it instead of handing null to the builder.
        // NOTE(review): assumes MetaData#index returns null for an unknown index name - confirm.
        if (metaData.index(indexName) != null) {
            builder.put(metaData.index(indexName), false);
        }
    }
    return builder.build();
}
private static SnapshotInfo inProgressSnapshot(SnapshotsInProgress.Entry entry) { private static SnapshotInfo inProgressSnapshot(SnapshotsInProgress.Entry entry) {
return new SnapshotInfo(entry.snapshot().getSnapshotId(), return new SnapshotInfo(entry.snapshot().getSnapshotId(),
entry.indices().stream().map(IndexId::getName).collect(Collectors.toList()), entry.indices().stream().map(IndexId::getName).collect(Collectors.toList()),
@ -714,7 +741,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
entry -> entry.state().completed() entry -> entry.state().completed()
|| initializingSnapshots.contains(entry.snapshot()) == false || initializingSnapshots.contains(entry.snapshot()) == false
&& (entry.state() == State.INIT || completed(entry.shards().values())) && (entry.state() == State.INIT || completed(entry.shards().values()))
).forEach(this::endSnapshot); ).forEach(entry -> endSnapshot(entry, event.state().metaData()));
} }
if (newMaster) { if (newMaster) {
finalizeSnapshotDeletionFromPreviousMaster(event); finalizeSnapshotDeletionFromPreviousMaster(event);
@ -961,7 +988,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
* *
* @param entry snapshot * @param entry snapshot
*/ */
private void endSnapshot(final SnapshotsInProgress.Entry entry) { private void endSnapshot(SnapshotsInProgress.Entry entry, MetaData metaData) {
if (endingSnapshots.add(entry.snapshot()) == false) { if (endingSnapshots.add(entry.snapshot()) == false) {
return; return;
} }
@ -989,6 +1016,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
unmodifiableList(shardFailures), unmodifiableList(shardFailures),
entry.getRepositoryStateId(), entry.getRepositoryStateId(),
entry.includeGlobalState(), entry.includeGlobalState(),
metaDataForSnapshot(entry, metaData),
entry.userMetadata()); entry.userMetadata());
removeSnapshotFromClusterState(snapshot, snapshotInfo, null); removeSnapshotFromClusterState(snapshot, snapshotInfo, null);
logger.info("snapshot [{}] completed with state [{}]", snapshot, snapshotInfo.state()); logger.info("snapshot [{}] completed with state [{}]", snapshot, snapshotInfo.state());

View File

@ -161,7 +161,7 @@ public class RepositoriesServiceTests extends ESTestCase {
@Override @Override
public SnapshotInfo finalizeSnapshot(SnapshotId snapshotId, List<IndexId> indices, long startTime, String failure, public SnapshotInfo finalizeSnapshot(SnapshotId snapshotId, List<IndexId> indices, long startTime, String failure,
int totalShards, List<SnapshotShardFailure> shardFailures, long repositoryStateId, int totalShards, List<SnapshotShardFailure> shardFailures, long repositoryStateId,
boolean includeGlobalState, Map<String, Object> userMetadata) { boolean includeGlobalState, MetaData metaData, Map<String, Object> userMetadata) {
return null; return null;
} }

View File

@ -116,8 +116,8 @@ public class BlobStoreFormatIT extends AbstractSnapshotIntegTestCase {
xContentRegistry(), true); xContentRegistry(), true);
// Write blobs in different formats // Write blobs in different formats
checksumSMILE.write(new BlobObj("checksum smile"), blobContainer, "check-smile"); checksumSMILE.write(new BlobObj("checksum smile"), blobContainer, "check-smile", true);
checksumSMILECompressed.write(new BlobObj("checksum smile compressed"), blobContainer, "check-smile-comp"); checksumSMILECompressed.write(new BlobObj("checksum smile compressed"), blobContainer, "check-smile-comp", true);
// Assert that all checksum blobs can be read by all formats // Assert that all checksum blobs can be read by all formats
assertEquals(checksumSMILE.read(blobContainer, "check-smile").getText(), "checksum smile"); assertEquals(checksumSMILE.read(blobContainer, "check-smile").getText(), "checksum smile");
@ -136,8 +136,8 @@ public class BlobStoreFormatIT extends AbstractSnapshotIntegTestCase {
ChecksumBlobStoreFormat<BlobObj> checksumFormatComp = new ChecksumBlobStoreFormat<>(BLOB_CODEC, "%s", BlobObj::fromXContent, ChecksumBlobStoreFormat<BlobObj> checksumFormatComp = new ChecksumBlobStoreFormat<>(BLOB_CODEC, "%s", BlobObj::fromXContent,
xContentRegistry(), true); xContentRegistry(), true);
BlobObj blobObj = new BlobObj(veryRedundantText.toString()); BlobObj blobObj = new BlobObj(veryRedundantText.toString());
checksumFormatComp.write(blobObj, blobContainer, "blob-comp"); checksumFormatComp.write(blobObj, blobContainer, "blob-comp", true);
checksumFormat.write(blobObj, blobContainer, "blob-not-comp"); checksumFormat.write(blobObj, blobContainer, "blob-not-comp", true);
Map<String, BlobMetaData> blobs = blobContainer.listBlobsByPrefix("blob-"); Map<String, BlobMetaData> blobs = blobContainer.listBlobsByPrefix("blob-");
assertEquals(blobs.size(), 2); assertEquals(blobs.size(), 2);
assertThat(blobs.get("blob-not-comp").length(), greaterThan(blobs.get("blob-comp").length())); assertThat(blobs.get("blob-not-comp").length(), greaterThan(blobs.get("blob-comp").length()));
@ -150,7 +150,7 @@ public class BlobStoreFormatIT extends AbstractSnapshotIntegTestCase {
BlobObj blobObj = new BlobObj(testString); BlobObj blobObj = new BlobObj(testString);
ChecksumBlobStoreFormat<BlobObj> checksumFormat = new ChecksumBlobStoreFormat<>(BLOB_CODEC, "%s", BlobObj::fromXContent, ChecksumBlobStoreFormat<BlobObj> checksumFormat = new ChecksumBlobStoreFormat<>(BLOB_CODEC, "%s", BlobObj::fromXContent,
xContentRegistry(), randomBoolean()); xContentRegistry(), randomBoolean());
checksumFormat.write(blobObj, blobContainer, "test-path"); checksumFormat.write(blobObj, blobContainer, "test-path", true);
assertEquals(checksumFormat.read(blobContainer, "test-path").getText(), testString); assertEquals(checksumFormat.read(blobContainer, "test-path").getText(), testString);
randomCorruption(blobContainer, "test-path"); randomCorruption(blobContainer, "test-path");
try { try {

View File

@ -48,7 +48,6 @@ import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.Client; import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.RestoreInProgress; import org.elasticsearch.cluster.RestoreInProgress;
import org.elasticsearch.cluster.SnapshotsInProgress; import org.elasticsearch.cluster.SnapshotsInProgress;
@ -111,7 +110,6 @@ import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Locale; import java.util.Locale;
import java.util.Map; import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -2434,28 +2432,15 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
Client client = client(); Client client = client();
boolean allowPartial = randomBoolean();
logger.info("--> creating repository"); logger.info("--> creating repository");
// only block on repo init if we have partial snapshot or we run into deadlock when acquiring shard locks for index deletion/closing
boolean initBlocking = allowPartial || randomBoolean();
if (initBlocking) {
assertAcked(client.admin().cluster().preparePutRepository("test-repo") assertAcked(client.admin().cluster().preparePutRepository("test-repo")
.setType("mock").setSettings(Settings.builder() .setType("mock").setSettings(Settings.builder()
.put("location", randomRepoPath()) .put("location", randomRepoPath())
.put("compress", randomBoolean()) .put("compress", randomBoolean())
.put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES) .put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES)
.put("block_on_init", true) .put("block_on_data", true)));
));
} else {
assertAcked(client.admin().cluster().preparePutRepository("test-repo")
.setType("mock").setSettings(Settings.builder()
.put("location", randomRepoPath())
.put("compress", randomBoolean())
.put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES)
.put("block_on_data", true)
));
}
createIndex("test-idx-1", "test-idx-2", "test-idx-3"); createIndex("test-idx-1", "test-idx-2", "test-idx-3");
ensureGreen(); ensureGreen();
@ -2471,28 +2456,13 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
assertThat(client.prepareSearch("test-idx-2").setSize(0).get().getHits().getTotalHits().value, equalTo(100L)); assertThat(client.prepareSearch("test-idx-2").setSize(0).get().getHits().getTotalHits().value, equalTo(100L));
assertThat(client.prepareSearch("test-idx-3").setSize(0).get().getHits().getTotalHits().value, equalTo(100L)); assertThat(client.prepareSearch("test-idx-3").setSize(0).get().getHits().getTotalHits().value, equalTo(100L));
logger.info("--> snapshot allow partial {}", allowPartial); logger.info("--> snapshot");
ActionFuture<CreateSnapshotResponse> future = client.admin().cluster().prepareCreateSnapshot("test-repo", "test-snap") ActionFuture<CreateSnapshotResponse> future = client.admin().cluster().prepareCreateSnapshot("test-repo", "test-snap")
.setIndices("test-idx-*").setWaitForCompletion(true).setPartial(allowPartial).execute(); .setIndices("test-idx-*").setWaitForCompletion(true).setPartial(false).execute();
logger.info("--> wait for block to kick in"); logger.info("--> wait for block to kick in");
if (initBlocking) {
waitForBlock(internalCluster().getMasterName(), "test-repo", TimeValue.timeValueMinutes(1));
} else {
waitForBlockOnAnyDataNode("test-repo", TimeValue.timeValueMinutes(1)); waitForBlockOnAnyDataNode("test-repo", TimeValue.timeValueMinutes(1));
}
boolean closedOnPartial = false;
try { try {
if (allowPartial) {
// partial snapshots allow close / delete operations
if (randomBoolean()) {
logger.info("--> delete index while partial snapshot is running");
client.admin().indices().prepareDelete("test-idx-1").get();
} else {
logger.info("--> close index while partial snapshot is running");
closedOnPartial = true;
client.admin().indices().prepareClose("test-idx-1").setWaitForActiveShards(ActiveShardCount.DEFAULT).get();
}
} else {
// non-partial snapshots do not allow close / delete operations on indices where snapshot has not been completed // non-partial snapshots do not allow close / delete operations on indices where snapshot has not been completed
if (randomBoolean()) { if (randomBoolean()) {
try { try {
@ -2511,31 +2481,16 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
assertThat(e.getMessage(), containsString("Cannot close indices that are being snapshotted: [[test-idx-1/")); assertThat(e.getMessage(), containsString("Cannot close indices that are being snapshotted: [[test-idx-1/"));
} }
} }
}
} finally { } finally {
if (initBlocking) {
logger.info("--> unblock running master node");
unblockNode("test-repo", internalCluster().getMasterName());
} else {
logger.info("--> unblock all data nodes"); logger.info("--> unblock all data nodes");
unblockAllDataNodes("test-repo"); unblockAllDataNodes("test-repo");
} }
}
logger.info("--> waiting for snapshot to finish"); logger.info("--> waiting for snapshot to finish");
CreateSnapshotResponse createSnapshotResponse = future.get(); CreateSnapshotResponse createSnapshotResponse = future.get();
if (allowPartial && closedOnPartial == false) {
logger.info("Deleted/Closed index during snapshot, but allow partial");
assertThat(createSnapshotResponse.getSnapshotInfo().state(), equalTo((SnapshotState.PARTIAL)));
assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), greaterThan(0));
assertThat(createSnapshotResponse.getSnapshotInfo().failedShards(), greaterThan(0));
assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(),
lessThan(createSnapshotResponse.getSnapshotInfo().totalShards()));
} else {
logger.info("Snapshot successfully completed"); logger.info("Snapshot successfully completed");
assertThat(createSnapshotResponse.getSnapshotInfo().state(), equalTo((SnapshotState.SUCCESS))); assertThat(createSnapshotResponse.getSnapshotInfo().state(), equalTo((SnapshotState.SUCCESS)));
} }
}
public void testCloseIndexDuringRestore() throws Exception { public void testCloseIndexDuringRestore() throws Exception {
Client client = client(); Client client = client();
@ -3385,7 +3340,7 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
assertThat(shardFailure.reason(), containsString("Random IOException")); assertThat(shardFailure.reason(), containsString("Random IOException"));
} }
} }
} catch (SnapshotCreationException | RepositoryException ex) { } catch (SnapshotException | RepositoryException ex) {
// sometimes, the snapshot will fail with a top level I/O exception // sometimes, the snapshot will fail with a top level I/O exception
assertThat(ExceptionsHelper.stackTrace(ex), containsString("Random IOException")); assertThat(ExceptionsHelper.stackTrace(ex), containsString("Random IOException"));
} }
@ -3748,75 +3703,6 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
assertThat(client.prepareGet(restoredIndexName2, typeName, sameSourceIndex ? docId : docId2).get().isExists(), equalTo(true)); assertThat(client.prepareGet(restoredIndexName2, typeName, sameSourceIndex ? docId : docId2).get().isExists(), equalTo(true));
} }
/**
 * Checks that a snapshot which is deleted while the mock repository is blocked on initialization ends up
 * ABORTED, is removed from the repository, and is never observed in the STARTED state by any cluster state
 * listener.
 */
public void testAbortedSnapshotDuringInitDoesNotStart() throws Exception {
final Client client = client();
// Blocks on initialization
assertAcked(client.admin().cluster().preparePutRepository("repository")
.setType("mock").setSettings(Settings.builder()
.put("location", randomRepoPath())
.put("block_on_init", true)
));
createIndex("test-idx");
final int nbDocs = scaledRandomIntBetween(100, 500);
for (int i = 0; i < nbDocs; i++) {
index("test-idx", "_doc", Integer.toString(i), "foo", "bar" + i);
}
flushAndRefresh("test-idx");
// Sanity check: all indexed documents are searchable before the snapshot is started
assertThat(client.prepareSearch("test-idx").setSize(0).get().getHits().getTotalHits().value, equalTo((long) nbDocs));
// Create a snapshot
client.admin().cluster().prepareCreateSnapshot("repository", "snap").execute();
waitForBlock(internalCluster().getMasterName(), "repository", TimeValue.timeValueMinutes(1));
// Tracks whether the master still has to be unblocked in the finally block below
boolean blocked = true;
// Snapshot is initializing (and is blocked at this stage)
SnapshotsStatusResponse snapshotsStatus = client.admin().cluster().prepareSnapshotStatus("repository").setSnapshots("snap").get();
assertThat(snapshotsStatus.getSnapshots().iterator().next().getState(), equalTo(State.INIT));
// Collects every state of snapshot "snap" seen in published cluster states; the same listener instance is
// registered on all nodes, hence the concurrent list
final List<State> states = new CopyOnWriteArrayList<>();
final ClusterStateListener listener = event -> {
SnapshotsInProgress snapshotsInProgress = event.state().custom(SnapshotsInProgress.TYPE);
for (SnapshotsInProgress.Entry entry : snapshotsInProgress.entries()) {
if ("snap".equals(entry.snapshot().getSnapshotId().getName())) {
states.add(entry.state());
}
}
};
try {
// Record the upcoming states of the snapshot on all nodes
internalCluster().getInstances(ClusterService.class).forEach(clusterService -> clusterService.addListener(listener));
// Delete the snapshot while it is being initialized
ActionFuture<AcknowledgedResponse> delete = client.admin().cluster().prepareDeleteSnapshot("repository", "snap").execute();
// The deletion must set the snapshot in the ABORTED state
assertBusy(() -> {
SnapshotsStatusResponse status =
client.admin().cluster().prepareSnapshotStatus("repository").setSnapshots("snap").get();
assertThat(status.getSnapshots().iterator().next().getState(), equalTo(State.ABORTED));
});
// Now unblock the repository
unblockNode("repository", internalCluster().getMasterName());
blocked = false;
assertAcked(delete.get());
// Once the delete has been acknowledged the snapshot must no longer be retrievable
expectThrows(SnapshotMissingException.class, () ->
client.admin().cluster().prepareGetSnapshots("repository").setSnapshots("snap").get());
assertFalse("Expecting snapshot state to be updated", states.isEmpty());
assertFalse("Expecting snapshot to be aborted and not started at all", states.contains(State.STARTED));
} finally {
// Always detach the listener and, if the test failed before unblocking, release the master node
internalCluster().getInstances(ClusterService.class).forEach(clusterService -> clusterService.removeListener(listener));
if (blocked) {
unblockNode("repository", internalCluster().getMasterName());
}
}
}
public void testRestoreIncreasesPrimaryTerms() { public void testRestoreIncreasesPrimaryTerms() {
final String indexName = randomAlphaOfLengthBetween(5, 10).toLowerCase(Locale.ROOT); final String indexName = randomAlphaOfLengthBetween(5, 10).toLowerCase(Locale.ROOT);
createIndex(indexName, Settings.builder() createIndex(indexName, Settings.builder()

View File

@ -196,12 +196,14 @@ import java.util.stream.Stream;
import static java.util.Collections.emptyMap; import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet; import static java.util.Collections.emptySet;
import static org.elasticsearch.action.support.ActionTestUtils.assertNoFailureListener;
import static org.elasticsearch.env.Environment.PATH_HOME_SETTING; import static org.elasticsearch.env.Environment.PATH_HOME_SETTING;
import static org.elasticsearch.node.Node.NODE_NAME_SETTING; import static org.elasticsearch.node.Node.NODE_NAME_SETTING;
import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.either; import static org.hamcrest.Matchers.either;
import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
public class SnapshotResiliencyTests extends ESTestCase { public class SnapshotResiliencyTests extends ESTestCase {
@ -493,6 +495,85 @@ public class SnapshotResiliencyTests extends ESTestCase {
assertThat(snapshotIds, either(hasSize(1)).or(hasSize(0))); assertThat(snapshotIds, either(hasSize(1)).or(hasSize(0)));
} }
/**
 * Starts a snapshot while bulk requests that each add a document with a new field name are being indexed,
 * restores that snapshot under a different index name and verifies that the mapping of the restored index has
 * at least as many properties as the restored index has documents. Finally asserts that exactly one
 * successful snapshot without failed shards exists in the repository.
 */
public void testSuccessfulSnapshotWithConcurrentDynamicMappingUpdates() {
setupTestCluster(randomFrom(1, 3, 5), randomIntBetween(2, 10));
String repoName = "repo";
String snapshotName = "snapshot";
final String index = "test";
final int shards = randomIntBetween(1, 10);
final int documents = randomIntBetween(2, 100);
TestClusterNode masterNode =
testClusterNodes.currentMaster(testClusterNodes.nodes.values().iterator().next().clusterService.state());
final StepListener<CreateSnapshotResponse> createSnapshotResponseStepListener = new StepListener<>();
continueOrDie(createRepoAndIndex(masterNode, repoName, index, shards), createIndexResponse -> {
// Ensures that only the first bulk response to arrive triggers the snapshot creation
final AtomicBoolean initiatedSnapshot = new AtomicBoolean(false);
for (int i = 0; i < documents; ++i) {
// Index a few documents with different field names so we trigger a dynamic mapping update for each of them
masterNode.client.bulk(
new BulkRequest().add(new IndexRequest(index).source(Collections.singletonMap("foo" + i, "bar")))
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE),
assertNoFailureListener(
bulkResponse -> {
assertFalse("Failures in bulkresponse: " + bulkResponse.buildFailureMessage(), bulkResponse.hasFailures());
if (initiatedSnapshot.compareAndSet(false, true)) {
masterNode.client.admin().cluster().prepareCreateSnapshot(repoName, snapshotName)
.setWaitForCompletion(true).execute(createSnapshotResponseStepListener);
}
}));
}
});
final String restoredIndex = "restored";
final StepListener<RestoreSnapshotResponse> restoreSnapshotResponseStepListener = new StepListener<>();
// Once the snapshot has completed, restore it into an index with a different name
continueOrDie(createSnapshotResponseStepListener, createSnapshotResponse -> masterNode.client.admin().cluster().restoreSnapshot(
new RestoreSnapshotRequest(repoName, snapshotName)
.renamePattern(index).renameReplacement(restoredIndex).waitForCompletion(true), restoreSnapshotResponseStepListener));
final StepListener<SearchResponse> searchResponseStepListener = new StepListener<>();
// Once the restore has completed, search the restored index to learn how many documents it contains
continueOrDie(restoreSnapshotResponseStepListener, restoreSnapshotResponse -> {
assertEquals(shards, restoreSnapshotResponse.getRestoreInfo().totalShards());
masterNode.client.search(
new SearchRequest(restoredIndex).source(new SearchSourceBuilder().size(documents).trackTotalHits(true)),
searchResponseStepListener);
});
final AtomicBoolean documentCountVerified = new AtomicBoolean();
continueOrDie(searchResponseStepListener, r -> {
final long hitCount = r.getHits().getTotalHits().value;
// Every document was indexed with its own field name ("foo" + i), so the restored mapping needs at least
// one property per restored document
assertThat(
"Documents were restored but the restored index mapping was older than some documents and misses some of their fields",
(int) hitCount,
lessThanOrEqualTo(((Map<?, ?>) masterNode.clusterService.state().metaData().index(restoredIndex).mapping()
.sourceAsMap().get("properties")).size())
);
documentCountVerified.set(true);
});
// Run the test cluster until the verification callback above has executed
// NOTE(review): the second argument is presumably a timeout in milliseconds - confirm against runUntil
runUntil(documentCountVerified::get, TimeUnit.MINUTES.toMillis(5L));
assertNotNull(createSnapshotResponseStepListener.result());
assertNotNull(restoreSnapshotResponseStepListener.result());
// No snapshot entry that has not completed may be left in the cluster state
SnapshotsInProgress finalSnapshotsInProgress = masterNode.clusterService.state().custom(SnapshotsInProgress.TYPE);
assertFalse(finalSnapshotsInProgress.entries().stream().anyMatch(entry -> entry.state().completed() == false));
final Repository repository = masterNode.repositoriesService.repository(repoName);
Collection<SnapshotId> snapshotIds = repository.getRepositoryData().getSnapshotIds();
assertThat(snapshotIds, hasSize(1));
final SnapshotInfo snapshotInfo = repository.getSnapshotInfo(snapshotIds.iterator().next());
assertEquals(SnapshotState.SUCCESS, snapshotInfo.state());
assertThat(snapshotInfo.indices(), containsInAnyOrder(index));
assertEquals(shards, snapshotInfo.successfulShards());
assertEquals(0, snapshotInfo.failedShards());
}
private StepListener<CreateIndexResponse> createRepoAndIndex(TestClusterNode masterNode, String repoName, String index, int shards) { private StepListener<CreateIndexResponse> createRepoAndIndex(TestClusterNode masterNode, String repoName, String index, int shards) {
final AdminClient adminClient = masterNode.client.admin(); final AdminClient adminClient = masterNode.client.admin();

View File

@ -291,9 +291,11 @@ public class MockEventuallyConsistentRepository extends BlobStoreRepository {
// We do some checks in case there is a consistent state for a blob to prevent turning it inconsistent. // We do some checks in case there is a consistent state for a blob to prevent turning it inconsistent.
final boolean hasConsistentContent = final boolean hasConsistentContent =
relevantActions.size() == 1 && relevantActions.get(0).operation == Operation.PUT; relevantActions.size() == 1 && relevantActions.get(0).operation == Operation.PUT;
if (BlobStoreRepository.INDEX_LATEST_BLOB.equals(blobName)) { if (BlobStoreRepository.INDEX_LATEST_BLOB.equals(blobName)
|| blobName.startsWith(BlobStoreRepository.METADATA_PREFIX)) {
// TODO: Ensure that it is impossible to ever decrement the generation id stored in index.latest then assert that // TODO: Ensure that it is impossible to ever decrement the generation id stored in index.latest then assert that
// it never decrements here // it never decrements here. Same goes for the metadata, ensure that we never overwrite newer with older
// metadata.
} else if (blobName.startsWith(BlobStoreRepository.SNAPSHOT_PREFIX)) { } else if (blobName.startsWith(BlobStoreRepository.SNAPSHOT_PREFIX)) {
if (hasConsistentContent) { if (hasConsistentContent) {
if (basePath().buildAsString().equals(path().buildAsString())) { if (basePath().buildAsString().equals(path().buildAsString())) {

View File

@ -18,6 +18,7 @@
*/ */
package org.elasticsearch.snapshots.mockstore; package org.elasticsearch.snapshots.mockstore;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.metadata.RepositoryMetaData; import org.elasticsearch.cluster.metadata.RepositoryMetaData;
import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.blobstore.BlobContainer; import org.elasticsearch.common.blobstore.BlobContainer;
@ -158,19 +159,19 @@ public class MockEventuallyConsistentRepositoryTests extends ESTestCase {
// We create a snap- blob for snapshot "foo" in the first generation // We create a snap- blob for snapshot "foo" in the first generation
final SnapshotId snapshotId = new SnapshotId("foo", UUIDs.randomBase64UUID()); final SnapshotId snapshotId = new SnapshotId("foo", UUIDs.randomBase64UUID());
repository.finalizeSnapshot(snapshotId, Collections.emptyList(), 1L, null, 5, Collections.emptyList(), repository.finalizeSnapshot(snapshotId, Collections.emptyList(), 1L, null, 5, Collections.emptyList(),
-1L, false, Collections.emptyMap()); -1L, false, MetaData.EMPTY_META_DATA, Collections.emptyMap());
// We try to write another snap- blob for "foo" in the next generation. It fails because the content differs. // We try to write another snap- blob for "foo" in the next generation. It fails because the content differs.
final AssertionError assertionError = expectThrows(AssertionError.class, final AssertionError assertionError = expectThrows(AssertionError.class,
() -> repository.finalizeSnapshot( () -> repository.finalizeSnapshot(
snapshotId, Collections.emptyList(), 1L, null, 6, Collections.emptyList(), snapshotId, Collections.emptyList(), 1L, null, 6, Collections.emptyList(),
0, false, Collections.emptyMap())); 0, false, MetaData.EMPTY_META_DATA, Collections.emptyMap()));
assertThat(assertionError.getMessage(), equalTo("\nExpected: <6>\n but: was <5>")); assertThat(assertionError.getMessage(), equalTo("\nExpected: <6>\n but: was <5>"));
// We try to write yet another snap- blob for "foo" in the next generation. // We try to write yet another snap- blob for "foo" in the next generation.
// It passes cleanly because the content of the blob is identical except for the timestamps. // It passes cleanly because the content of the blob is identical except for the timestamps.
repository.finalizeSnapshot(snapshotId, Collections.emptyList(), 1L, null, 5, Collections.emptyList(), repository.finalizeSnapshot(snapshotId, Collections.emptyList(), 1L, null, 5, Collections.emptyList(),
0, false, Collections.emptyMap()); 0, false, MetaData.EMPTY_META_DATA, Collections.emptyMap());
} }
} }

View File

@ -100,7 +100,7 @@ public abstract class RestoreOnlyRepository extends AbstractLifecycleComponent i
@Override @Override
public SnapshotInfo finalizeSnapshot(SnapshotId snapshotId, List<IndexId> indices, long startTime, String failure, public SnapshotInfo finalizeSnapshot(SnapshotId snapshotId, List<IndexId> indices, long startTime, String failure,
int totalShards, List<SnapshotShardFailure> shardFailures, long repositoryStateId, int totalShards, List<SnapshotShardFailure> shardFailures, long repositoryStateId,
boolean includeGlobalState, Map<String, Object> userMetadata) { boolean includeGlobalState, MetaData metaData, Map<String, Object> userMetadata) {
return null; return null;
} }

View File

@ -24,7 +24,6 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
import org.apache.lucene.index.CorruptIndexException; import org.apache.lucene.index.CorruptIndexException;
import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.metadata.RepositoryMetaData; import org.elasticsearch.cluster.metadata.RepositoryMetaData;
import org.elasticsearch.common.blobstore.BlobContainer; import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.blobstore.BlobMetaData; import org.elasticsearch.common.blobstore.BlobMetaData;
@ -39,10 +38,8 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.env.Environment; import org.elasticsearch.env.Environment;
import org.elasticsearch.plugins.RepositoryPlugin; import org.elasticsearch.plugins.RepositoryPlugin;
import org.elasticsearch.repositories.IndexId;
import org.elasticsearch.repositories.Repository; import org.elasticsearch.repositories.Repository;
import org.elasticsearch.repositories.fs.FsRepository; import org.elasticsearch.repositories.fs.FsRepository;
import org.elasticsearch.snapshots.SnapshotId;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import java.io.IOException; import java.io.IOException;
@ -102,8 +99,6 @@ public class MockRepository extends FsRepository {
private final String randomPrefix; private final String randomPrefix;
private volatile boolean blockOnInitialization;
private volatile boolean blockOnControlFiles; private volatile boolean blockOnControlFiles;
private volatile boolean blockOnDataFiles; private volatile boolean blockOnDataFiles;
@ -126,21 +121,12 @@ public class MockRepository extends FsRepository {
maximumNumberOfFailures = metadata.settings().getAsLong("max_failure_number", 100L); maximumNumberOfFailures = metadata.settings().getAsLong("max_failure_number", 100L);
blockOnControlFiles = metadata.settings().getAsBoolean("block_on_control", false); blockOnControlFiles = metadata.settings().getAsBoolean("block_on_control", false);
blockOnDataFiles = metadata.settings().getAsBoolean("block_on_data", false); blockOnDataFiles = metadata.settings().getAsBoolean("block_on_data", false);
blockOnInitialization = metadata.settings().getAsBoolean("block_on_init", false);
blockAndFailOnWriteSnapFile = metadata.settings().getAsBoolean("block_on_snap", false); blockAndFailOnWriteSnapFile = metadata.settings().getAsBoolean("block_on_snap", false);
randomPrefix = metadata.settings().get("random", "default"); randomPrefix = metadata.settings().get("random", "default");
waitAfterUnblock = metadata.settings().getAsLong("wait_after_unblock", 0L); waitAfterUnblock = metadata.settings().getAsLong("wait_after_unblock", 0L);
logger.info("starting mock repository with random prefix {}", randomPrefix); logger.info("starting mock repository with random prefix {}", randomPrefix);
} }
/**
 * Optionally blocks the calling thread before delegating snapshot initialization to the parent repository.
 * Blocking happens only while the {@code block_on_init} repository setting is in effect.
 */
@Override
public void initializeSnapshot(SnapshotId snapshotId, List<IndexId> indices, MetaData clusterMetadata) {
    // Read the flag once; blockExecution() is only entered when blocking on initialization was requested
    final boolean shouldBlock = blockOnInitialization;
    if (shouldBlock) {
        blockExecution();
    }
    super.initializeSnapshot(snapshotId, indices, clusterMetadata);
}
private static RepositoryMetaData overrideSettings(RepositoryMetaData metadata, Environment environment) { private static RepositoryMetaData overrideSettings(RepositoryMetaData metadata, Environment environment) {
// TODO: use another method of testing not being able to read the test file written by the master... // TODO: use another method of testing not being able to read the test file written by the master...
// this is super duper hacky // this is super duper hacky
@ -174,7 +160,6 @@ public class MockRepository extends FsRepository {
// Clean blocking flags, so we wouldn't try to block again // Clean blocking flags, so we wouldn't try to block again
blockOnDataFiles = false; blockOnDataFiles = false;
blockOnControlFiles = false; blockOnControlFiles = false;
blockOnInitialization = false;
blockOnWriteIndexFile = false; blockOnWriteIndexFile = false;
blockAndFailOnWriteSnapFile = false; blockAndFailOnWriteSnapFile = false;
this.notifyAll(); this.notifyAll();
@ -200,7 +185,7 @@ public class MockRepository extends FsRepository {
logger.debug("[{}] Blocking execution", metadata.name()); logger.debug("[{}] Blocking execution", metadata.name());
boolean wasBlocked = false; boolean wasBlocked = false;
try { try {
while (blockOnDataFiles || blockOnControlFiles || blockOnInitialization || blockOnWriteIndexFile || while (blockOnDataFiles || blockOnControlFiles || blockOnWriteIndexFile ||
blockAndFailOnWriteSnapFile) { blockAndFailOnWriteSnapFile) {
blocked = true; blocked = true;
this.wait(); this.wait();

View File

@ -253,11 +253,10 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit
public void initializeSnapshot(SnapshotId snapshotId, List<IndexId> indices, MetaData metaData) { public void initializeSnapshot(SnapshotId snapshotId, List<IndexId> indices, MetaData metaData) {
throw new UnsupportedOperationException("Unsupported for repository of type: " + TYPE); throw new UnsupportedOperationException("Unsupported for repository of type: " + TYPE);
} }
@Override @Override
public SnapshotInfo finalizeSnapshot(SnapshotId snapshotId, List<IndexId> indices, long startTime, String failure, int totalShards, public SnapshotInfo finalizeSnapshot(SnapshotId snapshotId, List<IndexId> indices, long startTime, String failure, int totalShards,
List<SnapshotShardFailure> shardFailures, long repositoryStateId, boolean includeGlobalState, List<SnapshotShardFailure> shardFailures, long repositoryStateId, boolean includeGlobalState,
Map<String, Object> userMetadata) { MetaData metaData, Map<String, Object> userMetadata) {
throw new UnsupportedOperationException("Unsupported for repository of type: " + TYPE); throw new UnsupportedOperationException("Unsupported for repository of type: " + TYPE);
} }

View File

@ -45,6 +45,7 @@ import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.function.Function; import java.util.function.Function;
import java.util.function.Supplier; import java.util.function.Supplier;
@ -82,6 +83,28 @@ public final class SourceOnlySnapshotRepository extends FilterRepository {
// a _source only snapshot with a plain repository it will be just fine since we already set the // a _source only snapshot with a plain repository it will be just fine since we already set the
// required engine, that the index is read-only and the mapping to a default mapping // required engine, that the index is read-only and the mapping to a default mapping
try { try {
super.initializeSnapshot(snapshotId, indices, metadataToSnapshot(indices, metaData));
} catch (IOException ex) {
throw new UncheckedIOException(ex);
}
}
/**
 * Finalizes the snapshot through the parent repository, replacing the supplied cluster metadata with
 * the result of {@code metadataToSnapshot(indices, metaData)}. Every other argument is forwarded
 * to the parent implementation unchanged.
 *
 * @param indices  indices contained in the snapshot; also used to select the index metadata to rewrite
 * @param metaData cluster metadata to rewrite before it is handed to the parent implementation
 * @return the {@link SnapshotInfo} returned by the parent implementation
 * @throws UncheckedIOException wrapping any {@link IOException} raised inside the {@code try} block
 */
@Override
public SnapshotInfo finalizeSnapshot(SnapshotId snapshotId, List<IndexId> indices, long startTime, String failure, int totalShards,
List<SnapshotShardFailure> shardFailures, long repositoryStateId, boolean includeGlobalState, MetaData metaData,
Map<String, Object> userMetadata) {
// The index metadata is processed at snapshot time. As a result, restoring a _source only snapshot
// through a plain repository works fine: the required engine, the read-only setting of the index and
// the default mapping have already been set in the stored metadata.
try {
return super.finalizeSnapshot(snapshotId, indices, startTime, failure, totalShards, shardFailures, repositoryStateId,
includeGlobalState, metadataToSnapshot(indices, metaData), userMetadata);
} catch (IOException ex) {
// This method declares no checked exception, so rethrow as unchecked with the cause preserved.
throw new UncheckedIOException(ex);
}
}
private static MetaData metadataToSnapshot(List<IndexId> indices, MetaData metaData) throws IOException {
MetaData.Builder builder = MetaData.builder(metaData); MetaData.Builder builder = MetaData.builder(metaData);
for (IndexId indexId : indices) { for (IndexId indexId : indices) {
IndexMetaData index = metaData.index(indexId.getName()); IndexMetaData index = metaData.index(indexId.getName());
@ -104,12 +127,10 @@ public final class SourceOnlySnapshotRepository extends FilterRepository {
indexMetadataBuilder.settingsVersion(1 + indexMetadataBuilder.settingsVersion()); indexMetadataBuilder.settingsVersion(1 + indexMetadataBuilder.settingsVersion());
builder.put(indexMetadataBuilder); builder.put(indexMetadataBuilder);
} }
super.initializeSnapshot(snapshotId, indices, builder.build()); return builder.build();
} catch (IOException ex) {
throw new UncheckedIOException(ex);
}
} }
@Override @Override
public void snapshotShard(Store store, MapperService mapperService, SnapshotId snapshotId, IndexId indexId, public void snapshotShard(Store store, MapperService mapperService, SnapshotId snapshotId, IndexId indexId,
IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus, ActionListener<Void> listener) { IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus, ActionListener<Void> listener) {

View File

@ -65,7 +65,7 @@ import org.hamcrest.Matchers;
import java.io.IOException; import java.io.IOException;
import java.nio.file.Path; import java.nio.file.Path;
import java.util.Arrays; import java.util.Collections;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
@ -200,14 +200,15 @@ public class SourceOnlySnapshotShardTests extends IndexShardTestCase {
repository.start(); repository.start();
try (Engine.IndexCommitRef snapshotRef = shard.acquireLastIndexCommit(true)) { try (Engine.IndexCommitRef snapshotRef = shard.acquireLastIndexCommit(true)) {
IndexShardSnapshotStatus indexShardSnapshotStatus = IndexShardSnapshotStatus.newInitializing(); IndexShardSnapshotStatus indexShardSnapshotStatus = IndexShardSnapshotStatus.newInitializing();
runAsSnapshot(shard.getThreadPool(), () -> {
repository.initializeSnapshot(snapshotId, Arrays.asList(indexId),
MetaData.builder().put(shard.indexSettings()
.getIndexMetaData(), false).build());
final PlainActionFuture<Void> future = PlainActionFuture.newFuture(); final PlainActionFuture<Void> future = PlainActionFuture.newFuture();
runAsSnapshot(shard.getThreadPool(), () -> {
repository.snapshotShard(shard.store(), shard.mapperService(), snapshotId, indexId, snapshotRef.getIndexCommit(), repository.snapshotShard(shard.store(), shard.mapperService(), snapshotId, indexId, snapshotRef.getIndexCommit(),
indexShardSnapshotStatus, future); indexShardSnapshotStatus, future);
future.actionGet(); future.actionGet();
repository.finalizeSnapshot(snapshotId, Collections.singletonList(indexId),
indexShardSnapshotStatus.asCopy().getStartTime(), null, 1, Collections.emptyList(),
repository.getRepositoryData().getGenId(), true,
MetaData.builder().put(shard.indexSettings().getIndexMetaData(), false).build(), Collections.emptyMap());
}); });
IndexShardSnapshotStatus.Copy copy = indexShardSnapshotStatus.asCopy(); IndexShardSnapshotStatus.Copy copy = indexShardSnapshotStatus.asCopy();
assertEquals(copy.getTotalFileCount(), copy.getIncrementalFileCount()); assertEquals(copy.getTotalFileCount(), copy.getIncrementalFileCount());