From b0f09b279f6289ff28d1b5c812135a10bd3a4a61 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Tue, 17 Sep 2019 13:09:39 +0200 Subject: [PATCH] Make Snapshot Logic Write Metadata after Segments (#45689) (#46764) * Write metadata during snapshot finalization after segment files to prevent outdated metadata in case of dynamic mapping updates as explained in #41581 * Keep the old behavior of writing the metadata beforehand in the case of mixed version clusters for BwC reasons * Still overwrite the metadata in the end, so even a mixed version cluster is fixed by this change if a newer version master does the finalization * Fixes #41581 --- .../repositories/FilterRepository.java | 4 +- .../repositories/Repository.java | 8 +- .../blobstore/BlobStoreRepository.java | 41 ++-- .../blobstore/ChecksumBlobStoreFormat.java | 11 +- .../repositories/blobstore/package-info.java | 213 ++++++++++++++++++ .../snapshots/SnapshotsService.java | 76 +++++-- .../RepositoriesServiceTests.java | 2 +- .../snapshots/BlobStoreFormatIT.java | 10 +- .../SharedClusterSnapshotRestoreIT.java | 172 +++----------- .../snapshots/SnapshotResiliencyTests.java | 81 +++++++ .../MockEventuallyConsistentRepository.java | 6 +- ...ckEventuallyConsistentRepositoryTests.java | 7 +- .../index/shard/RestoreOnlyRepository.java | 2 +- .../snapshots/mockstore/MockRepository.java | 17 +- .../xpack/ccr/repository/CcrRepository.java | 3 +- .../SourceOnlySnapshotRepository.java | 67 ++++-- .../SourceOnlySnapshotShardTests.java | 11 +- 17 files changed, 480 insertions(+), 251 deletions(-) create mode 100644 server/src/main/java/org/elasticsearch/repositories/blobstore/package-info.java diff --git a/server/src/main/java/org/elasticsearch/repositories/FilterRepository.java b/server/src/main/java/org/elasticsearch/repositories/FilterRepository.java index 39fd92f9eaa..6d9cba05748 100644 --- a/server/src/main/java/org/elasticsearch/repositories/FilterRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/FilterRepository.java @@ -81,9 +81,9 @@ public class FilterRepository implements Repository { @Override public SnapshotInfo finalizeSnapshot(SnapshotId snapshotId, List indices, long startTime, String failure, int totalShards, List shardFailures, long repositoryStateId, boolean includeGlobalState, - Map userMetadata) { + MetaData metaData, Map userMetadata) { return in.finalizeSnapshot(snapshotId, indices, startTime, failure, totalShards, shardFailures, repositoryStateId, - includeGlobalState, userMetadata); + includeGlobalState, metaData, userMetadata); } @Override diff --git a/server/src/main/java/org/elasticsearch/repositories/Repository.java b/server/src/main/java/org/elasticsearch/repositories/Repository.java index a5072293e36..f83712249f7 100644 --- a/server/src/main/java/org/elasticsearch/repositories/Repository.java +++ b/server/src/main/java/org/elasticsearch/repositories/Repository.java @@ -49,8 +49,6 @@ import java.util.function.Function; *

 * To perform a snapshot:
 * <ul>
- * <li>Master calls {@link #initializeSnapshot(SnapshotId, List, org.elasticsearch.cluster.metadata.MetaData)}
- * with list of indices that will be included into the snapshot</li>
 * <li>Data nodes call {@link Repository#snapshotShard}
 * for each shard</li>
 * <li>When all shard calls return master calls {@link #finalizeSnapshot} with possible list of failures</li>
  • @@ -117,7 +115,11 @@ public interface Repository extends LifecycleComponent { * @param snapshotId snapshot id * @param indices list of indices to be snapshotted * @param metaData cluster metadata + * + * @deprecated this method is only used when taking snapshots in a mixed version cluster where a master node older than + * {@link org.elasticsearch.snapshots.SnapshotsService#NO_REPO_INITIALIZE_VERSION} is present. */ + @Deprecated void initializeSnapshot(SnapshotId snapshotId, List indices, MetaData metaData); /** @@ -137,7 +139,7 @@ public interface Repository extends LifecycleComponent { */ SnapshotInfo finalizeSnapshot(SnapshotId snapshotId, List indices, long startTime, String failure, int totalShards, List shardFailures, long repositoryStateId, boolean includeGlobalState, - Map userMetadata); + MetaData clusterMetaData, Map userMetadata); /** * Deletes snapshot diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index 7311484566c..7bf341cfc53 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -86,7 +86,6 @@ import org.elasticsearch.repositories.RepositoryCleanupResult; import org.elasticsearch.repositories.RepositoryData; import org.elasticsearch.repositories.RepositoryException; import org.elasticsearch.repositories.RepositoryVerificationException; -import org.elasticsearch.snapshots.InvalidSnapshotNameException; import org.elasticsearch.snapshots.SnapshotCreationException; import org.elasticsearch.snapshots.SnapshotException; import org.elasticsearch.snapshots.SnapshotId; @@ -182,7 +181,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp private static final String TESTS_FILE = "tests-"; - private static final String METADATA_PREFIX = "meta-"; + public static final String METADATA_PREFIX = "meta-"; public static final String METADATA_NAME_FORMAT = METADATA_PREFIX + "%s.dat"; @@ -386,23 +385,13 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp @Override public void initializeSnapshot(SnapshotId snapshotId, List indices, MetaData clusterMetaData) { - if (isReadOnly()) { - throw new RepositoryException(metadata.name(), "cannot create snapshot in a readonly repository"); - } try { - final String snapshotName = snapshotId.getName(); - // check if the snapshot name already exists in the repository - final RepositoryData repositoryData = getRepositoryData(); - if (repositoryData.getSnapshotIds().stream().anyMatch(s -> s.getName().equals(snapshotName))) { - throw new InvalidSnapshotNameException(metadata.name(), snapshotId.getName(), "snapshot with the same name already exists"); - } - // Write Global MetaData - globalMetaDataFormat.write(clusterMetaData, blobContainer(), snapshotId.getUUID()); + globalMetaDataFormat.write(clusterMetaData, blobContainer(), snapshotId.getUUID(), true); // write the index metadata for each index in the snapshot for (IndexId index : indices) { - indexMetaDataFormat.write(clusterMetaData.index(index.getName()), indexContainer(index), snapshotId.getUUID()); + indexMetaDataFormat.write(clusterMetaData.index(index.getName()), indexContainer(index), snapshotId.getUUID(), true); } } catch (IOException ex) { throw new SnapshotCreationException(metadata.name(), snapshotId, ex); @@ -667,14 +656,34 @@ public abstract class 
BlobStoreRepository extends AbstractLifecycleComponent imp final List shardFailures, final long repositoryStateId, final boolean includeGlobalState, + final MetaData clusterMetaData, final Map userMetadata) { SnapshotInfo blobStoreSnapshot = new SnapshotInfo(snapshotId, indices.stream().map(IndexId::getName).collect(Collectors.toList()), startTime, failure, threadPool.absoluteTimeInMillis(), totalShards, shardFailures, includeGlobalState, userMetadata); + + try { + // We ignore all FileAlreadyExistsException here since otherwise a master failover while in this method will + // mean that no snap-${uuid}.dat blob is ever written for this snapshot. This is safe because any updated version of the + // index or global metadata will be compatible with the segments written in this snapshot as well. + // Failing on an already existing index-${repoGeneration} below ensures that the index.latest blob is not updated in a way that + // decrements the generation it points at + + // Write Global MetaData + globalMetaDataFormat.write(clusterMetaData, blobContainer(), snapshotId.getUUID(), false); + + // write the index metadata for each index in the snapshot + for (IndexId index : indices) { + indexMetaDataFormat.write(clusterMetaData.index(index.getName()), indexContainer(index), snapshotId.getUUID(), false); + } + } catch (IOException ex) { + throw new SnapshotException(metadata.name(), snapshotId, "failed to write metadata for snapshot", ex); + } + try { final RepositoryData updatedRepositoryData = getRepositoryData().addSnapshot(snapshotId, blobStoreSnapshot.state(), indices); - snapshotFormat.write(blobStoreSnapshot, blobContainer(), snapshotId.getUUID()); + snapshotFormat.write(blobStoreSnapshot, blobContainer(), snapshotId.getUUID(), false); writeIndexGen(updatedRepositoryData, repositoryStateId); } catch (FileAlreadyExistsException ex) { // if another master was elected and took over finalizing the snapshot, it is possible @@ -1052,7 +1061,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp logger.trace("[{}] [{}] writing shard snapshot file", shardId, snapshotId); try { - indexShardSnapshotFormat.write(snapshot, shardContainer, snapshotId.getUUID()); + indexShardSnapshotFormat.write(snapshot, shardContainer, snapshotId.getUUID(), false); } catch (IOException e) { throw new IndexShardSnapshotFailedException(shardId, "Failed to write commit point", e); } diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/ChecksumBlobStoreFormat.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/ChecksumBlobStoreFormat.java index 605dcae6489..9c7c7559fcb 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/ChecksumBlobStoreFormat.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/ChecksumBlobStoreFormat.java @@ -175,15 +175,16 @@ public final class ChecksumBlobStoreFormat { *

    * The blob will be compressed and checksum will be written if required. * - * @param obj object to be serialized - * @param blobContainer blob container - * @param name blob name + * @param obj object to be serialized + * @param blobContainer blob container + * @param name blob name + * @param failIfAlreadyExists Whether to fail if the blob already exists */ - public void write(T obj, BlobContainer blobContainer, String name) throws IOException { + public void write(T obj, BlobContainer blobContainer, String name, boolean failIfAlreadyExists) throws IOException { final String blobName = blobName(name); writeTo(obj, blobName, bytesArray -> { try (InputStream stream = bytesArray.streamInput()) { - blobContainer.writeBlob(blobName, stream, bytesArray.length(), true); + blobContainer.writeBlob(blobName, stream, bytesArray.length(), failIfAlreadyExists); } }); } diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/package-info.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/package-info.java new file mode 100644 index 00000000000..5cc98f6c3e9 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/package-info.java @@ -0,0 +1,213 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/** + *

+ * <p>This package exposes the blobstore repository used by Elasticsearch Snapshots.</p>
+ *
+ * <h1>Preliminaries</h1>
+ *
+ * <p>The {@link org.elasticsearch.repositories.blobstore.BlobStoreRepository} forms the basis of implementations of
+ * {@link org.elasticsearch.repositories.Repository} on top of a blob store. A blob store can be used as the basis for an implementation
+ * as long as it provides GET, PUT, DELETE, and LIST operations. For a read-only repository, it suffices if the blob store provides only
+ * GET operations.
+ * These operations are formally defined by the {@link org.elasticsearch.common.blobstore.BlobContainer} interface that
+ * any {@code BlobStoreRepository} implementation must provide via its implementation of
+ * {@link org.elasticsearch.repositories.blobstore.BlobStoreRepository#getBlobContainer()}.</p>
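+ *
+ * <p>As a rough illustration, the four primitives map onto {@code BlobContainer} methods along the following lines (signatures
+ * abridged; see the interface itself for the authoritative definitions):</p>
+ * <pre>
+ * {@code
+ * InputStream readBlob(String blobName);                       // GET
+ * void writeBlob(String blobName, InputStream inputStream,
+ *                long blobSize, boolean failIfAlreadyExists);  // PUT
+ * void deleteBlob(String blobName);                            // DELETE
+ * Map<String, BlobMetaData> listBlobsByPrefix(String prefix);  // LIST
+ * }
+ * </pre>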

    + * + *

+ * <p>The blob store is written to and read from by master-eligible nodes and data nodes. All metadata related to a snapshot's
+ * scope and health is written by the master node.</p>
+ * <p>The data nodes, on the other hand, write the data for each individual shard, but do not write any blobs outside of the shard
+ * directories of the shards they hold the primary of. For each shard, the data node holding the shard's primary writes the actual data
+ * in the form of the shard's segment files to the repository, as well as metadata about all the segment files that the repository
+ * stores for the shard.</p>

    + * + *

+ * <p>For the specifics on how the operations on the repository documented below are invoked during the snapshot process, please refer
+ * to the documentation of the {@link org.elasticsearch.snapshots} package.</p>
+ *
+ * <p>{@code BlobStoreRepository} maintains the following structure of blobs containing data and metadata in the blob store. The exact
+ * operations executed on these blobs are explained below.</p>

+ *
+ * <pre>
+ * {@code
    + *   STORE_ROOT
    + *   |- index-N           - JSON serialized {@link org.elasticsearch.repositories.RepositoryData} containing a list of all snapshot ids
    + *   |                      and the indices belonging to each snapshot, N is the generation of the file
    + *   |- index.latest      - contains the numeric value of the latest generation of the index file (i.e. N from above)
    + *   |- incompatible-snapshots - list of all snapshot ids that are no longer compatible with the current version of the cluster
    + *   |- snap-20131010.dat - SMILE serialized {@link org.elasticsearch.snapshots.SnapshotInfo} for snapshot "20131010"
    + *   |- meta-20131010.dat - SMILE serialized {@link org.elasticsearch.cluster.metadata.MetaData} for snapshot "20131010"
    + *   |                      (includes only global metadata)
    + *   |- snap-20131011.dat - SMILE serialized {@link org.elasticsearch.snapshots.SnapshotInfo} for snapshot "20131011"
    + *   |- meta-20131011.dat - SMILE serialized {@link org.elasticsearch.cluster.metadata.MetaData} for snapshot "20131011"
    + *   .....
    + *   |- indices/ - data for all indices
    + *      |- Ac1342-B_x/ - data for index "foo" which was assigned the unique id Ac1342-B_x (not to be confused with the actual index uuid)
    + *      |  |             in the repository
    + *      |  |- meta-20131010.dat - JSON Serialized {@link org.elasticsearch.cluster.metadata.IndexMetaData} for index "foo"
    + *      |  |- 0/ - data for shard "0" of index "foo"
    + *      |  |  |- __1                      \  (files with numeric names were created by older ES versions)
    + *      |  |  |- __2                      |
    + *      |  |  |- __VPO5oDMVT5y4Akv8T_AO_A |- files from different segments see snap-* for their mappings to real segment files
    + *      |  |  |- __1gbJy18wS_2kv1qI7FgKuQ |
    + *      |  |  |- __R8JvZAHlSMyMXyZc2SS8Zg /
    + *      |  |  .....
    + *      |  |  |- snap-20131010.dat - SMILE serialized {@link org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot} for
    + *      |  |  |                      snapshot "20131010"
    + *      |  |  |- snap-20131011.dat - SMILE serialized {@link org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot} for
    + *      |  |  |                      snapshot "20131011"
    + *      |  |  |- index-123         - SMILE serialized {@link org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshots} for
    + *      |  |  |                      the shard
    + *      |  |
    + *      |  |- 1/ - data for shard "1" of index "foo"
    + *      |  |  |- __1
    + *      |  |  .....
    + *      |  |
    + *      |  |-2/
    + *      |  ......
    + *      |
    + *      |- 1xB0D8_B3y/ - data for index "bar" which was assigned the unique id of 1xB0D8_B3y in the repository
    + *      ......
    + * }
+ * </pre>
+ *

+ * <h1>Getting the Repository's RepositoryData</h1>
+ *
+ * <p>Loading the {@link org.elasticsearch.repositories.RepositoryData} that holds the list of all snapshots, as well as the mapping of
+ * indices' names to their repository {@link org.elasticsearch.repositories.IndexId}, is done by invoking
+ * {@link org.elasticsearch.repositories.blobstore.BlobStoreRepository#getRepositoryData} and implemented as follows (a condensed
+ * sketch follows the list):</p>

    + *
      + *
+ * <ol>
+ * <li>
+ * <ul>
+ * <li>The blobstore repository stores the {@code RepositoryData} in blobs named with incrementing suffix {@code N} at {@code /index-N}
+ * directly under the repository's root.</li>
+ * <li>The blobstore also stores the most recent {@code N} as a 64bit long in the blob {@code /index.latest} directly under the
+ * repository's root.</li>
+ * </ul>
+ * </li>
+ * <li>
+ * <ol>
+ * <li>First, find the most recent {@code RepositoryData} by getting a list of all {@code index-N} blobs through listing all blobs with
+ * prefix "index-" under the repository root and then selecting the one with the highest value for {@code N}.</li>
+ * <li>If this operation fails because the repository's {@code BlobContainer} does not support list operations (in the case of read-only
+ * repositories), read the highest value of {@code N} from the {@code index.latest} blob.</li>
+ * </ol>
+ * </li>
+ * <li>
+ * <ol>
+ * <li>Use the value of {@code N} just determined, get the {@code /index-N} blob, and deserialize the {@code RepositoryData} from it.</li>
+ * <li>If no value of {@code N} could be found because neither an {@code index.latest} nor any {@code index-N} blobs exist in the
+ * repository, it is assumed to be empty and {@link org.elasticsearch.repositories.RepositoryData#EMPTY} is returned.</li>
+ * </ol>
+ * </li>
+ * </ol>
    + *
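+ * <p>A condensed sketch of this generation-resolution logic, assuming a root-level {@code BlobContainer} named {@code root};
+ * {@code readLatestGenerationBlob} is a hypothetical helper that reads the 64bit long from {@code index.latest}, and error
+ * handling is elided:</p>
+ * <pre>
+ * {@code
+ * long generation;
+ * try {
+ *     // prefer LIST: pick the highest N among all index-N blobs under the repository root
+ *     generation = root.listBlobsByPrefix("index-").keySet().stream()
+ *         .mapToLong(name -> Long.parseLong(name.substring("index-".length())))
+ *         .max().orElse(-1L); // -1 if no index-N blob exists
+ * } catch (UnsupportedOperationException e) {
+ *     // read-only repositories may not support LIST, fall back to the index.latest blob
+ *     generation = readLatestGenerationBlob(root);
+ * }
+ * // generation == -1 means the repository is empty: return RepositoryData.EMPTY
+ * }
+ * </pre>
+ *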

+ * <h1>Creating a Snapshot</h1>
+ *
+ * <p>Creating a snapshot in the repository happens in the three steps described in detail below.</p>
+ *
+ * <h2>Initializing a Snapshot in the Repository (Mixed Version Clusters only)</h2>
+ *
+ * <p>In mixed version clusters that contain a node older than
+ * {@link org.elasticsearch.snapshots.SnapshotsService#NO_REPO_INITIALIZE_VERSION}, creating a snapshot in the repository starts with a
+ * call to {@link org.elasticsearch.repositories.Repository#initializeSnapshot}, which the blob store repository implements via the
+ * following actions (a condensed sketch of the writes follows the list):</p>

    + *
      + *
+ * <ol>
+ * <li>Verify that no snapshot by the requested name exists.</li>
+ * <li>Write a blob containing the cluster metadata to the root of the blob store repository at {@code /meta-${snapshot-uuid}.dat}</li>
+ * <li>Write the metadata for each index to a blob in that index's directory at
+ * {@code /indices/${index-snapshot-uuid}/meta-${snapshot-uuid}.dat}</li>
+ * </ol>
+ * TODO: Remove this section once BwC logic it references is removed
+ *
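+ * <p>For reference, the metadata writes performed by this legacy initialization, condensed from the
+ * {@code BlobStoreRepository#initializeSnapshot} changes in this patch (the {@code true} argument makes each write fail if the
+ * blob already exists):</p>
+ * <pre>
+ * {@code
+ * // Write global metadata, then per-index metadata, before any segment data exists
+ * globalMetaDataFormat.write(clusterMetaData, blobContainer(), snapshotId.getUUID(), true);
+ * for (IndexId index : indices) {
+ *     indexMetaDataFormat.write(clusterMetaData.index(index.getName()), indexContainer(index), snapshotId.getUUID(), true);
+ * }
+ * }
+ * </pre>
+ *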

+ * <h2>Writing Shard Data (Segments)</h2>
+ *
+ * <p>Once all the metadata has been written by the snapshot initialization, the snapshot process moves on to writing the actual shard
+ * data to the repository by invoking {@link org.elasticsearch.repositories.Repository#snapshotShard} on the data nodes that hold the
+ * primaries for the shards in the current snapshot. It is implemented as follows (a pseudo-code sketch follows the steps):</p>
+ *

+ * <p>Note:</p>
+ * <ul>
+ * <li>For each shard {@code i} in a given index, its path in the blob store is located at
+ * {@code /indices/${index-snapshot-uuid}/${i}}</li>
+ * <li>All the following steps are executed exclusively on the data node holding the shard's primary.</li>
+ * </ul>
+ *
+ * <ol>
+ * <li>Create the {@link org.apache.lucene.index.IndexCommit} for the shard to snapshot.</li>
+ * <li>List all blobs in the shard's path. Find the {@link org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshots} blob
+ * with name {@code index-${N}} for the highest possible value of {@code N} in the list, to determine which segment files are
+ * already available in the blob store.</li>
+ * <li>By comparing the files in the {@code IndexCommit} with the available-file list from the previous step, determine the segment files
+ * that need to be written to the blob store. For each segment that needs to be added, generate a unique name by combining
+ * the segment data blob prefix {@code __} and a UUID, and write the segment to the blob store.</li>
+ * <li>After completing all segment writes, a blob containing a
+ * {@link org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot} with name {@code snap-${snapshot-uuid}.dat} is written
+ * to the shard's path; it contains a list of all the files referenced by the snapshot as well as some metadata about the snapshot. See
+ * the documentation of {@code BlobStoreIndexShardSnapshot} for details on its contents.</li>
+ * <li>Once all the segments and the {@code BlobStoreIndexShardSnapshot} blob have been written, an updated
+ * {@code BlobStoreIndexShardSnapshots} blob is written to the shard's path with name {@code index-${N+1}}.</li>
+ * </ol>
    + * + *
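+ * <p>In pseudo-code, the incremental upload in the steps above looks roughly as follows; {@code readHighestGenShardSnapshots},
+ * {@code containsPhysicalFile} and {@code uploadSegmentFile} are hypothetical helpers, not actual repository methods:</p>
+ * <pre>
+ * {@code
+ * BlobStoreIndexShardSnapshots existing = readHighestGenShardSnapshots(shardContainer); // from index-${N}
+ * for (String fileName : indexCommit.getFileNames()) {
+ *     if (containsPhysicalFile(existing, fileName) == false) {
+ *         // unique blob name: the data blob prefix __ plus a UUID
+ *         uploadSegmentFile(shardContainer, "__" + UUIDs.base64UUID(), fileName);
+ *     }
+ * }
+ * // then write snap-${snapshot-uuid}.dat and the updated index-${N+1} blob
+ * }
+ * </pre>
+ *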

+ * <h2>Finalizing the Snapshot</h2>
+ *
+ * <p>After all primaries have finished writing the necessary segment files to the blob store in the previous step, the master node
+ * moves on to finalizing the snapshot by invoking {@link org.elasticsearch.repositories.Repository#finalizeSnapshot}. This method
+ * executes the following actions in order (condensed into a sketch after the list):</p>

    + *
      + *
+ * <ol>
+ * <li>Write a blob containing the cluster metadata to the root of the blob store repository at {@code /meta-${snapshot-uuid}.dat}</li>
+ * <li>Write the metadata for each index to a blob in that index's directory at
+ * {@code /indices/${index-snapshot-uuid}/meta-${snapshot-uuid}.dat}</li>
+ * <li>Write the {@link org.elasticsearch.snapshots.SnapshotInfo} blob for the given snapshot to the key {@code /snap-${snapshot-uuid}.dat}
+ * directly under the repository root.</li>
+ * <li>Write an updated {@code RepositoryData} blob to the key {@code /index-${N+1}} using the {@code N} determined when initializing the
+ * snapshot in the first step. When doing this, the implementation checks that the blob for generation {@code N + 1} has not yet been
+ * written, to prevent concurrent updates to the repository. If the blob for {@code N + 1} already exists, finalization stops under the
+ * assumption that a master failover occurred and the snapshot has already been finalized by the new master.</li>
+ * <li>Write the updated {@code /index.latest} blob containing the new repository generation {@code N + 1}.</li>
+ * </ol>
+ *
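+ * <p>The essence of this change shows in the ordering above: the metadata blobs are only written here, after all segment data, and
+ * with {@code failIfAlreadyExists} set to {@code false} so that a blob left behind by a failed-over master is simply overwritten.
+ * Condensed from {@code BlobStoreRepository#finalizeSnapshot} in this patch:</p>
+ * <pre>
+ * {@code
+ * globalMetaDataFormat.write(clusterMetaData, blobContainer(), snapshotId.getUUID(), false);
+ * for (IndexId index : indices) {
+ *     indexMetaDataFormat.write(clusterMetaData.index(index.getName()), indexContainer(index), snapshotId.getUUID(), false);
+ * }
+ * snapshotFormat.write(blobStoreSnapshot, blobContainer(), snapshotId.getUUID(), false);
+ * writeIndexGen(updatedRepositoryData, repositoryStateId); // fails if index-${N+1} was already written
+ * }
+ * </pre>
+ *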

+ * <h1>Deleting a Snapshot</h1>
+ *
+ * <p>Deleting a snapshot is an operation that is exclusively executed on the master node and that runs through the following sequence
+ * of actions when {@link org.elasticsearch.repositories.blobstore.BlobStoreRepository#deleteSnapshot} is invoked (a sketch of the
+ * shard-level cleanup follows the list):</p>

    + * + *
      + *
+ * <ol>
+ * <li>Get the current {@code RepositoryData} from the latest {@code index-N} blob at the repository root.</li>
+ * <li>Write an updated {@code RepositoryData} blob with the deleted snapshot removed to key {@code /index-${N+1}} directly under the
+ * repository root.</li>
+ * <li>Write an updated {@code index.latest} blob containing {@code N + 1}.</li>
+ * <li>Delete the global {@code MetaData} blob {@code meta-${snapshot-uuid}.dat} stored directly under the repository root for the
+ * snapshot, as well as the {@code SnapshotInfo} blob at {@code /snap-${snapshot-uuid}.dat}.</li>
+ * <li>For each index referenced by the snapshot:
+ * <ol>
+ * <li>Delete the snapshot's {@code IndexMetaData} at {@code /indices/${index-snapshot-uuid}/meta-${snapshot-uuid}}.</li>
+ * <li>Go through all shard directories {@code /indices/${index-snapshot-uuid}/${i}} and:
+ * <ol>
+ * <li>Remove the {@code BlobStoreIndexShardSnapshot} blob at {@code /indices/${index-snapshot-uuid}/${i}/snap-${snapshot-uuid}.dat}.</li>
+ * <li>List all blobs in the shard path {@code /indices/${index-snapshot-uuid}/${i}} and build a new {@code BlobStoreIndexShardSnapshots}
+ * from the remaining {@code BlobStoreIndexShardSnapshot} blobs in the shard. Afterwards, write it to the next shard generation blob at
+ * {@code /indices/${index-snapshot-uuid}/${i}/index-${N+1}} (the shard's generation is determined from the list of {@code index-N} blobs
+ * in the shard directory).</li>
+ * <li>Delete all segment blobs (identified by the data blob prefix {@code __}) in the shard directory that are not referenced by
+ * the new {@code BlobStoreIndexShardSnapshots} written in the previous step.</li>
+ * </ol>
+ * </li>
+ * </ol>
+ * </li>
+ * </ol>
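+ *
+ * <p>A sketch of the shard-level cleanup above; {@code buildRemainingShardSnapshots}, {@code writeShardGenerationBlob} and
+ * {@code isReferenced} are hypothetical helpers, and error handling is elided:</p>
+ * <pre>
+ * {@code
+ * // rebuild the shard-level snapshot list without the deleted snapshot and write it as index-${N+1}
+ * BlobStoreIndexShardSnapshots remaining = buildRemainingShardSnapshots(shardContainer, snapshotId);
+ * writeShardGenerationBlob(shardContainer, remaining);
+ * for (String blobName : shardContainer.listBlobs().keySet()) {
+ *     if (blobName.startsWith("__") && isReferenced(remaining, blobName) == false) {
+ *         shardContainer.deleteBlob(blobName); // segment blob no longer referenced by any snapshot
+ *     }
+ * }
+ * }
+ * </pre>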
    + * TODO: The above sequence of actions can lead to leaking files when an index completely goes out of scope. Adjust this documentation once + * https://github.com/elastic/elasticsearch/issues/13159 is fixed. + */ +package org.elasticsearch.repositories.blobstore; diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java index a9ebb75edd3..5af6e397a55 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java @@ -26,6 +26,7 @@ import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.lucene.util.CollectionUtil; import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRunnable; import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotRequest; @@ -68,6 +69,7 @@ import org.elasticsearch.repositories.IndexId; import org.elasticsearch.repositories.RepositoriesService; import org.elasticsearch.repositories.Repository; import org.elasticsearch.repositories.RepositoryData; +import org.elasticsearch.repositories.RepositoryException; import org.elasticsearch.repositories.RepositoryMissingException; import org.elasticsearch.threadpool.ThreadPool; @@ -108,12 +110,19 @@ import static org.elasticsearch.cluster.SnapshotsInProgress.completed; * the {@link SnapshotShardsService#sendSnapshotShardUpdate(Snapshot, ShardId, ShardSnapshotStatus)} method *
 * <li>When last shard is completed master node in {@link SnapshotShardsService#innerUpdateSnapshotState} method marks the snapshot
 * as completed</li>
- * <li>After cluster state is updated, the {@link #endSnapshot(SnapshotsInProgress.Entry)} finalizes snapshot in the repository,
+ * <li>After cluster state is updated, the {@link #endSnapshot(SnapshotsInProgress.Entry, MetaData)} finalizes snapshot in the repository,
 * notifies all {@link #snapshotCompletionListeners} that snapshot is completed, and finally calls
 * {@link #removeSnapshotFromClusterState(Snapshot, SnapshotInfo, Exception)} to remove snapshot from cluster state</li>
 * </ul>
*/ public class SnapshotsService extends AbstractLifecycleComponent implements ClusterStateApplier { + + /** + * Minimum node version which does not use {@link Repository#initializeSnapshot(SnapshotId, List, MetaData)} to write snapshot metadata + * when starting a snapshot. + */ + public static final Version NO_REPO_INITIALIZE_VERSION = Version.V_7_5_0; + private static final Logger logger = LogManager.getLogger(SnapshotsService.class); private final ClusterService clusterService; @@ -398,24 +407,29 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus assert initializingSnapshots.contains(snapshot.snapshot()); Repository repository = repositoriesService.repository(snapshot.snapshot().getRepository()); - MetaData metaData = clusterState.metaData(); - if (!snapshot.includeGlobalState()) { - // Remove global state from the cluster state - MetaData.Builder builder = MetaData.builder(); - for (IndexId index : snapshot.indices()) { - builder.put(metaData.index(index.getName()), false); - } - metaData = builder.build(); + if (repository.isReadOnly()) { + throw new RepositoryException(repository.getMetadata().name(), "cannot create snapshot in a readonly repository"); + } + final String snapshotName = snapshot.snapshot().getSnapshotId().getName(); + // check if the snapshot name already exists in the repository + if (repository.getRepositoryData().getSnapshotIds().stream().anyMatch(s -> s.getName().equals(snapshotName))) { + throw new InvalidSnapshotNameException( + repository.getMetadata().name(), snapshotName, "snapshot with the same name already exists"); + } + if (clusterState.nodes().getMinNodeVersion().onOrAfter(NO_REPO_INITIALIZE_VERSION) == false) { + // In mixed version clusters we initialize the snapshot in the repository so that in case of a master failover to an + // older version master node snapshot finalization (that assumes initializeSnapshot was called) produces a valid + // snapshot. 
+ repository.initializeSnapshot( + snapshot.snapshot().getSnapshotId(), snapshot.indices(), metaDataForSnapshot(snapshot, clusterState.metaData())); } - - repository.initializeSnapshot(snapshot.snapshot().getSnapshotId(), snapshot.indices(), metaData); snapshotCreated = true; logger.info("snapshot [{}] started", snapshot.snapshot()); if (snapshot.indices().isEmpty()) { // No indices in this snapshot - we are done userCreateSnapshotListener.onResponse(snapshot.snapshot()); - endSnapshot(snapshot); + endSnapshot(snapshot, clusterState.metaData()); return; } clusterService.submitStateUpdateTask("update_snapshot [" + snapshot.snapshot() + "]", new ClusterStateUpdateTask() { @@ -498,7 +512,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus assert snapshotsInProgress != null; final SnapshotsInProgress.Entry entry = snapshotsInProgress.snapshot(snapshot.snapshot()); assert entry != null; - endSnapshot(entry); + endSnapshot(entry, newState.metaData()); } } }); @@ -553,15 +567,16 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus snapshot.indices(), snapshot.startTime(), ExceptionsHelper.detailedMessage(exception), - 0, - Collections.emptyList(), - snapshot.getRepositoryStateId(), - snapshot.includeGlobalState(), - snapshot.userMetadata()); - } catch (Exception inner) { - inner.addSuppressed(exception); - logger.warn(() -> new ParameterizedMessage("[{}] failed to close snapshot in repository", - snapshot.snapshot()), inner); + 0, + Collections.emptyList(), + snapshot.getRepositoryStateId(), + snapshot.includeGlobalState(), + metaDataForSnapshot(snapshot, clusterService.state().metaData()), + snapshot.userMetadata()); + } catch (Exception inner) { + inner.addSuppressed(exception); + logger.warn(() -> new ParameterizedMessage("[{}] failed to close snapshot in repository", + snapshot.snapshot()), inner); } } userCreateSnapshotListener.onFailure(e); @@ -569,6 +584,18 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus } } + private static MetaData metaDataForSnapshot(SnapshotsInProgress.Entry snapshot, MetaData metaData) { + if (snapshot.includeGlobalState() == false) { + // Remove global state from the cluster state + MetaData.Builder builder = MetaData.builder(); + for (IndexId index : snapshot.indices()) { + builder.put(metaData.index(index.getName()), false); + } + metaData = builder.build(); + } + return metaData; + } + private static SnapshotInfo inProgressSnapshot(SnapshotsInProgress.Entry entry) { return new SnapshotInfo(entry.snapshot().getSnapshotId(), entry.indices().stream().map(IndexId::getName).collect(Collectors.toList()), @@ -714,7 +741,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus entry -> entry.state().completed() || initializingSnapshots.contains(entry.snapshot()) == false && (entry.state() == State.INIT || completed(entry.shards().values())) - ).forEach(this::endSnapshot); + ).forEach(entry -> endSnapshot(entry, event.state().metaData())); } if (newMaster) { finalizeSnapshotDeletionFromPreviousMaster(event); @@ -961,7 +988,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus * * @param entry snapshot */ - private void endSnapshot(final SnapshotsInProgress.Entry entry) { + private void endSnapshot(SnapshotsInProgress.Entry entry, MetaData metaData) { if (endingSnapshots.add(entry.snapshot()) == false) { return; } @@ -989,6 +1016,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus 
unmodifiableList(shardFailures), entry.getRepositoryStateId(), entry.includeGlobalState(), + metaDataForSnapshot(entry, metaData), entry.userMetadata()); removeSnapshotFromClusterState(snapshot, snapshotInfo, null); logger.info("snapshot [{}] completed with state [{}]", snapshot, snapshotInfo.state()); diff --git a/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java b/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java index 55a365af5d5..7a1bcefea9d 100644 --- a/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java +++ b/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java @@ -161,7 +161,7 @@ public class RepositoriesServiceTests extends ESTestCase { @Override public SnapshotInfo finalizeSnapshot(SnapshotId snapshotId, List indices, long startTime, String failure, int totalShards, List shardFailures, long repositoryStateId, - boolean includeGlobalState, Map userMetadata) { + boolean includeGlobalState, MetaData metaData, Map userMetadata) { return null; } diff --git a/server/src/test/java/org/elasticsearch/snapshots/BlobStoreFormatIT.java b/server/src/test/java/org/elasticsearch/snapshots/BlobStoreFormatIT.java index 882b3cc4b1e..c00760899c4 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/BlobStoreFormatIT.java +++ b/server/src/test/java/org/elasticsearch/snapshots/BlobStoreFormatIT.java @@ -116,8 +116,8 @@ public class BlobStoreFormatIT extends AbstractSnapshotIntegTestCase { xContentRegistry(), true); // Write blobs in different formats - checksumSMILE.write(new BlobObj("checksum smile"), blobContainer, "check-smile"); - checksumSMILECompressed.write(new BlobObj("checksum smile compressed"), blobContainer, "check-smile-comp"); + checksumSMILE.write(new BlobObj("checksum smile"), blobContainer, "check-smile", true); + checksumSMILECompressed.write(new BlobObj("checksum smile compressed"), blobContainer, "check-smile-comp", true); // Assert that all checksum blobs can be read by all formats assertEquals(checksumSMILE.read(blobContainer, "check-smile").getText(), "checksum smile"); @@ -136,8 +136,8 @@ public class BlobStoreFormatIT extends AbstractSnapshotIntegTestCase { ChecksumBlobStoreFormat checksumFormatComp = new ChecksumBlobStoreFormat<>(BLOB_CODEC, "%s", BlobObj::fromXContent, xContentRegistry(), true); BlobObj blobObj = new BlobObj(veryRedundantText.toString()); - checksumFormatComp.write(blobObj, blobContainer, "blob-comp"); - checksumFormat.write(blobObj, blobContainer, "blob-not-comp"); + checksumFormatComp.write(blobObj, blobContainer, "blob-comp", true); + checksumFormat.write(blobObj, blobContainer, "blob-not-comp", true); Map blobs = blobContainer.listBlobsByPrefix("blob-"); assertEquals(blobs.size(), 2); assertThat(blobs.get("blob-not-comp").length(), greaterThan(blobs.get("blob-comp").length())); @@ -150,7 +150,7 @@ public class BlobStoreFormatIT extends AbstractSnapshotIntegTestCase { BlobObj blobObj = new BlobObj(testString); ChecksumBlobStoreFormat checksumFormat = new ChecksumBlobStoreFormat<>(BLOB_CODEC, "%s", BlobObj::fromXContent, xContentRegistry(), randomBoolean()); - checksumFormat.write(blobObj, blobContainer, "test-path"); + checksumFormat.write(blobObj, blobContainer, "test-path", true); assertEquals(checksumFormat.read(blobContainer, "test-path").getText(), testString); randomCorruption(blobContainer, "test-path"); try { diff --git a/server/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java 
b/server/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java index 4fca615cb77..139d8201cad 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java +++ b/server/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java @@ -48,7 +48,6 @@ import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.ClusterStateListener; import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.RestoreInProgress; import org.elasticsearch.cluster.SnapshotsInProgress; @@ -111,7 +110,6 @@ import java.util.HashMap; import java.util.List; import java.util.Locale; import java.util.Map; -import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; @@ -2434,28 +2432,15 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas Client client = client(); - boolean allowPartial = randomBoolean(); logger.info("--> creating repository"); - // only block on repo init if we have partial snapshot or we run into deadlock when acquiring shard locks for index deletion/closing - boolean initBlocking = allowPartial || randomBoolean(); - if (initBlocking) { - assertAcked(client.admin().cluster().preparePutRepository("test-repo") - .setType("mock").setSettings(Settings.builder() - .put("location", randomRepoPath()) - .put("compress", randomBoolean()) - .put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES) - .put("block_on_init", true) - )); - } else { - assertAcked(client.admin().cluster().preparePutRepository("test-repo") - .setType("mock").setSettings(Settings.builder() - .put("location", randomRepoPath()) - .put("compress", randomBoolean()) - .put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES) - .put("block_on_data", true) - )); - } + assertAcked(client.admin().cluster().preparePutRepository("test-repo") + .setType("mock").setSettings(Settings.builder() + .put("location", randomRepoPath()) + .put("compress", randomBoolean()) + .put("chunk_size", randomIntBetween(100, 1000), ByteSizeUnit.BYTES) + .put("block_on_data", true))); + createIndex("test-idx-1", "test-idx-2", "test-idx-3"); ensureGreen(); @@ -2471,70 +2456,40 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas assertThat(client.prepareSearch("test-idx-2").setSize(0).get().getHits().getTotalHits().value, equalTo(100L)); assertThat(client.prepareSearch("test-idx-3").setSize(0).get().getHits().getTotalHits().value, equalTo(100L)); - logger.info("--> snapshot allow partial {}", allowPartial); + logger.info("--> snapshot"); ActionFuture future = client.admin().cluster().prepareCreateSnapshot("test-repo", "test-snap") - .setIndices("test-idx-*").setWaitForCompletion(true).setPartial(allowPartial).execute(); + .setIndices("test-idx-*").setWaitForCompletion(true).setPartial(false).execute(); logger.info("--> wait for block to kick in"); - if (initBlocking) { - waitForBlock(internalCluster().getMasterName(), "test-repo", TimeValue.timeValueMinutes(1)); - } else { - waitForBlockOnAnyDataNode("test-repo", TimeValue.timeValueMinutes(1)); - } - boolean closedOnPartial = false; + waitForBlockOnAnyDataNode("test-repo", TimeValue.timeValueMinutes(1)); + try { - if (allowPartial) { - // 
partial snapshots allow close / delete operations - if (randomBoolean()) { - logger.info("--> delete index while partial snapshot is running"); + // non-partial snapshots do not allow close / delete operations on indices where snapshot has not been completed + if (randomBoolean()) { + try { + logger.info("--> delete index while non-partial snapshot is running"); client.admin().indices().prepareDelete("test-idx-1").get(); - } else { - logger.info("--> close index while partial snapshot is running"); - closedOnPartial = true; - client.admin().indices().prepareClose("test-idx-1").setWaitForActiveShards(ActiveShardCount.DEFAULT).get(); + fail("Expected deleting index to fail during snapshot"); + } catch (SnapshotInProgressException e) { + assertThat(e.getMessage(), containsString("Cannot delete indices that are being snapshotted: [[test-idx-1/")); } } else { - // non-partial snapshots do not allow close / delete operations on indices where snapshot has not been completed - if (randomBoolean()) { - try { - logger.info("--> delete index while non-partial snapshot is running"); - client.admin().indices().prepareDelete("test-idx-1").get(); - fail("Expected deleting index to fail during snapshot"); - } catch (SnapshotInProgressException e) { - assertThat(e.getMessage(), containsString("Cannot delete indices that are being snapshotted: [[test-idx-1/")); - } - } else { - try { - logger.info("--> close index while non-partial snapshot is running"); - client.admin().indices().prepareClose("test-idx-1").get(); - fail("Expected closing index to fail during snapshot"); - } catch (SnapshotInProgressException e) { - assertThat(e.getMessage(), containsString("Cannot close indices that are being snapshotted: [[test-idx-1/")); - } + try { + logger.info("--> close index while non-partial snapshot is running"); + client.admin().indices().prepareClose("test-idx-1").get(); + fail("Expected closing index to fail during snapshot"); + } catch (SnapshotInProgressException e) { + assertThat(e.getMessage(), containsString("Cannot close indices that are being snapshotted: [[test-idx-1/")); } } } finally { - if (initBlocking) { - logger.info("--> unblock running master node"); - unblockNode("test-repo", internalCluster().getMasterName()); - } else { - logger.info("--> unblock all data nodes"); - unblockAllDataNodes("test-repo"); - } + logger.info("--> unblock all data nodes"); + unblockAllDataNodes("test-repo"); } logger.info("--> waiting for snapshot to finish"); CreateSnapshotResponse createSnapshotResponse = future.get(); - if (allowPartial && closedOnPartial == false) { - logger.info("Deleted/Closed index during snapshot, but allow partial"); - assertThat(createSnapshotResponse.getSnapshotInfo().state(), equalTo((SnapshotState.PARTIAL))); - assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), greaterThan(0)); - assertThat(createSnapshotResponse.getSnapshotInfo().failedShards(), greaterThan(0)); - assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), - lessThan(createSnapshotResponse.getSnapshotInfo().totalShards())); - } else { - logger.info("Snapshot successfully completed"); - assertThat(createSnapshotResponse.getSnapshotInfo().state(), equalTo((SnapshotState.SUCCESS))); - } + logger.info("Snapshot successfully completed"); + assertThat(createSnapshotResponse.getSnapshotInfo().state(), equalTo((SnapshotState.SUCCESS))); } public void testCloseIndexDuringRestore() throws Exception { @@ -3385,7 +3340,7 @@ public class SharedClusterSnapshotRestoreIT extends 
AbstractSnapshotIntegTestCas assertThat(shardFailure.reason(), containsString("Random IOException")); } } - } catch (SnapshotCreationException | RepositoryException ex) { + } catch (SnapshotException | RepositoryException ex) { // sometimes, the snapshot will fail with a top level I/O exception assertThat(ExceptionsHelper.stackTrace(ex), containsString("Random IOException")); } @@ -3748,75 +3703,6 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas assertThat(client.prepareGet(restoredIndexName2, typeName, sameSourceIndex ? docId : docId2).get().isExists(), equalTo(true)); } - public void testAbortedSnapshotDuringInitDoesNotStart() throws Exception { - final Client client = client(); - - // Blocks on initialization - assertAcked(client.admin().cluster().preparePutRepository("repository") - .setType("mock").setSettings(Settings.builder() - .put("location", randomRepoPath()) - .put("block_on_init", true) - )); - - createIndex("test-idx"); - final int nbDocs = scaledRandomIntBetween(100, 500); - for (int i = 0; i < nbDocs; i++) { - index("test-idx", "_doc", Integer.toString(i), "foo", "bar" + i); - } - flushAndRefresh("test-idx"); - assertThat(client.prepareSearch("test-idx").setSize(0).get().getHits().getTotalHits().value, equalTo((long) nbDocs)); - - // Create a snapshot - client.admin().cluster().prepareCreateSnapshot("repository", "snap").execute(); - waitForBlock(internalCluster().getMasterName(), "repository", TimeValue.timeValueMinutes(1)); - boolean blocked = true; - - // Snapshot is initializing (and is blocked at this stage) - SnapshotsStatusResponse snapshotsStatus = client.admin().cluster().prepareSnapshotStatus("repository").setSnapshots("snap").get(); - assertThat(snapshotsStatus.getSnapshots().iterator().next().getState(), equalTo(State.INIT)); - - final List states = new CopyOnWriteArrayList<>(); - final ClusterStateListener listener = event -> { - SnapshotsInProgress snapshotsInProgress = event.state().custom(SnapshotsInProgress.TYPE); - for (SnapshotsInProgress.Entry entry : snapshotsInProgress.entries()) { - if ("snap".equals(entry.snapshot().getSnapshotId().getName())) { - states.add(entry.state()); - } - } - }; - - try { - // Record the upcoming states of the snapshot on all nodes - internalCluster().getInstances(ClusterService.class).forEach(clusterService -> clusterService.addListener(listener)); - - // Delete the snapshot while it is being initialized - ActionFuture delete = client.admin().cluster().prepareDeleteSnapshot("repository", "snap").execute(); - - // The deletion must set the snapshot in the ABORTED state - assertBusy(() -> { - SnapshotsStatusResponse status = - client.admin().cluster().prepareSnapshotStatus("repository").setSnapshots("snap").get(); - assertThat(status.getSnapshots().iterator().next().getState(), equalTo(State.ABORTED)); - }); - - // Now unblock the repository - unblockNode("repository", internalCluster().getMasterName()); - blocked = false; - - assertAcked(delete.get()); - expectThrows(SnapshotMissingException.class, () -> - client.admin().cluster().prepareGetSnapshots("repository").setSnapshots("snap").get()); - - assertFalse("Expecting snapshot state to be updated", states.isEmpty()); - assertFalse("Expecting snapshot to be aborted and not started at all", states.contains(State.STARTED)); - } finally { - internalCluster().getInstances(ClusterService.class).forEach(clusterService -> clusterService.removeListener(listener)); - if (blocked) { - unblockNode("repository", internalCluster().getMasterName()); - } - 
} - } - public void testRestoreIncreasesPrimaryTerms() { final String indexName = randomAlphaOfLengthBetween(5, 10).toLowerCase(Locale.ROOT); createIndex(indexName, Settings.builder() diff --git a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java index 3ffa975a203..5a802414864 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java @@ -196,12 +196,14 @@ import java.util.stream.Stream; import static java.util.Collections.emptyMap; import static java.util.Collections.emptySet; +import static org.elasticsearch.action.support.ActionTestUtils.assertNoFailureListener; import static org.elasticsearch.env.Environment.PATH_HOME_SETTING; import static org.elasticsearch.node.Node.NODE_NAME_SETTING; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.either; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.mockito.Mockito.mock; public class SnapshotResiliencyTests extends ESTestCase { @@ -493,6 +495,85 @@ public class SnapshotResiliencyTests extends ESTestCase { assertThat(snapshotIds, either(hasSize(1)).or(hasSize(0))); } + public void testSuccessfulSnapshotWithConcurrentDynamicMappingUpdates() { + setupTestCluster(randomFrom(1, 3, 5), randomIntBetween(2, 10)); + + String repoName = "repo"; + String snapshotName = "snapshot"; + final String index = "test"; + + final int shards = randomIntBetween(1, 10); + final int documents = randomIntBetween(2, 100); + TestClusterNode masterNode = + testClusterNodes.currentMaster(testClusterNodes.nodes.values().iterator().next().clusterService.state()); + + final StepListener createSnapshotResponseStepListener = new StepListener<>(); + + continueOrDie(createRepoAndIndex(masterNode, repoName, index, shards), createIndexResponse -> { + final AtomicBoolean initiatedSnapshot = new AtomicBoolean(false); + for (int i = 0; i < documents; ++i) { + // Index a few documents with different field names so we trigger a dynamic mapping update for each of them + masterNode.client.bulk( + new BulkRequest().add(new IndexRequest(index).source(Collections.singletonMap("foo" + i, "bar"))) + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE), + assertNoFailureListener( + bulkResponse -> { + assertFalse("Failures in bulkresponse: " + bulkResponse.buildFailureMessage(), bulkResponse.hasFailures()); + if (initiatedSnapshot.compareAndSet(false, true)) { + masterNode.client.admin().cluster().prepareCreateSnapshot(repoName, snapshotName) + .setWaitForCompletion(true).execute(createSnapshotResponseStepListener); + } + })); + } + }); + + final String restoredIndex = "restored"; + + final StepListener restoreSnapshotResponseStepListener = new StepListener<>(); + + continueOrDie(createSnapshotResponseStepListener, createSnapshotResponse -> masterNode.client.admin().cluster().restoreSnapshot( + new RestoreSnapshotRequest(repoName, snapshotName) + .renamePattern(index).renameReplacement(restoredIndex).waitForCompletion(true), restoreSnapshotResponseStepListener)); + + final StepListener searchResponseStepListener = new StepListener<>(); + + continueOrDie(restoreSnapshotResponseStepListener, restoreSnapshotResponse -> { + assertEquals(shards, restoreSnapshotResponse.getRestoreInfo().totalShards()); + 
masterNode.client.search( + new SearchRequest(restoredIndex).source(new SearchSourceBuilder().size(documents).trackTotalHits(true)), + searchResponseStepListener); + }); + + final AtomicBoolean documentCountVerified = new AtomicBoolean(); + + continueOrDie(searchResponseStepListener, r -> { + final long hitCount = r.getHits().getTotalHits().value; + assertThat( + "Documents were restored but the restored index mapping was older than some documents and misses some of their fields", + (int) hitCount, + lessThanOrEqualTo(((Map) masterNode.clusterService.state().metaData().index(restoredIndex).mapping() + .sourceAsMap().get("properties")).size()) + ); + documentCountVerified.set(true); + }); + + runUntil(documentCountVerified::get, TimeUnit.MINUTES.toMillis(5L)); + + assertNotNull(createSnapshotResponseStepListener.result()); + assertNotNull(restoreSnapshotResponseStepListener.result()); + SnapshotsInProgress finalSnapshotsInProgress = masterNode.clusterService.state().custom(SnapshotsInProgress.TYPE); + assertFalse(finalSnapshotsInProgress.entries().stream().anyMatch(entry -> entry.state().completed() == false)); + final Repository repository = masterNode.repositoriesService.repository(repoName); + Collection snapshotIds = repository.getRepositoryData().getSnapshotIds(); + assertThat(snapshotIds, hasSize(1)); + + final SnapshotInfo snapshotInfo = repository.getSnapshotInfo(snapshotIds.iterator().next()); + assertEquals(SnapshotState.SUCCESS, snapshotInfo.state()); + assertThat(snapshotInfo.indices(), containsInAnyOrder(index)); + assertEquals(shards, snapshotInfo.successfulShards()); + assertEquals(0, snapshotInfo.failedShards()); + } + private StepListener createRepoAndIndex(TestClusterNode masterNode, String repoName, String index, int shards) { final AdminClient adminClient = masterNode.client.admin(); diff --git a/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepository.java b/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepository.java index 9de395d5f82..11f30e0633b 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepository.java +++ b/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepository.java @@ -291,9 +291,11 @@ public class MockEventuallyConsistentRepository extends BlobStoreRepository { // We do some checks in case there is a consistent state for a blob to prevent turning it inconsistent. final boolean hasConsistentContent = relevantActions.size() == 1 && relevantActions.get(0).operation == Operation.PUT; - if (BlobStoreRepository.INDEX_LATEST_BLOB.equals(blobName)) { + if (BlobStoreRepository.INDEX_LATEST_BLOB.equals(blobName) + || blobName.startsWith(BlobStoreRepository.METADATA_PREFIX)) { // TODO: Ensure that it is impossible to ever decrement the generation id stored in index.latest then assert that - // it never decrements here + // it never decrements here. Same goes for the metadata, ensure that we never overwrite newer with older + // metadata. 
} else if (blobName.startsWith(BlobStoreRepository.SNAPSHOT_PREFIX)) { if (hasConsistentContent) { if (basePath().buildAsString().equals(path().buildAsString())) { diff --git a/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepositoryTests.java b/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepositoryTests.java index 5500d603ac5..14d4a5ba60b 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepositoryTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepositoryTests.java @@ -18,6 +18,7 @@ */ package org.elasticsearch.snapshots.mockstore; +import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.metadata.RepositoryMetaData; import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.blobstore.BlobContainer; @@ -158,19 +159,19 @@ public class MockEventuallyConsistentRepositoryTests extends ESTestCase { // We create a snap- blob for snapshot "foo" in the first generation final SnapshotId snapshotId = new SnapshotId("foo", UUIDs.randomBase64UUID()); repository.finalizeSnapshot(snapshotId, Collections.emptyList(), 1L, null, 5, Collections.emptyList(), - -1L, false, Collections.emptyMap()); + -1L, false, MetaData.EMPTY_META_DATA, Collections.emptyMap()); // We try to write another snap- blob for "foo" in the next generation. It fails because the content differs. final AssertionError assertionError = expectThrows(AssertionError.class, () -> repository.finalizeSnapshot( snapshotId, Collections.emptyList(), 1L, null, 6, Collections.emptyList(), - 0, false, Collections.emptyMap())); + 0, false, MetaData.EMPTY_META_DATA, Collections.emptyMap())); assertThat(assertionError.getMessage(), equalTo("\nExpected: <6>\n but: was <5>")); // We try to write yet another snap- blob for "foo" in the next generation. // It passes cleanly because the content of the blob except for the timestamps. 
repository.finalizeSnapshot(snapshotId, Collections.emptyList(), 1L, null, 5, Collections.emptyList(), - 0, false, Collections.emptyMap()); + 0, false, MetaData.EMPTY_META_DATA, Collections.emptyMap()); } } diff --git a/test/framework/src/main/java/org/elasticsearch/index/shard/RestoreOnlyRepository.java b/test/framework/src/main/java/org/elasticsearch/index/shard/RestoreOnlyRepository.java index 313bf7c5daa..417e4e98649 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/shard/RestoreOnlyRepository.java +++ b/test/framework/src/main/java/org/elasticsearch/index/shard/RestoreOnlyRepository.java @@ -100,7 +100,7 @@ public abstract class RestoreOnlyRepository extends AbstractLifecycleComponent i @Override public SnapshotInfo finalizeSnapshot(SnapshotId snapshotId, List indices, long startTime, String failure, int totalShards, List shardFailures, long repositoryStateId, - boolean includeGlobalState, Map userMetadata) { + boolean includeGlobalState, MetaData metaData, Map userMetadata) { return null; } diff --git a/test/framework/src/main/java/org/elasticsearch/snapshots/mockstore/MockRepository.java b/test/framework/src/main/java/org/elasticsearch/snapshots/mockstore/MockRepository.java index bd0a5cc772f..fa33f8aef86 100644 --- a/test/framework/src/main/java/org/elasticsearch/snapshots/mockstore/MockRepository.java +++ b/test/framework/src/main/java/org/elasticsearch/snapshots/mockstore/MockRepository.java @@ -24,7 +24,6 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.lucene.index.CorruptIndexException; import org.elasticsearch.ElasticsearchException; -import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.metadata.RepositoryMetaData; import org.elasticsearch.common.blobstore.BlobContainer; import org.elasticsearch.common.blobstore.BlobMetaData; @@ -39,10 +38,8 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.env.Environment; import org.elasticsearch.plugins.RepositoryPlugin; -import org.elasticsearch.repositories.IndexId; import org.elasticsearch.repositories.Repository; import org.elasticsearch.repositories.fs.FsRepository; -import org.elasticsearch.snapshots.SnapshotId; import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; @@ -102,8 +99,6 @@ public class MockRepository extends FsRepository { private final String randomPrefix; - private volatile boolean blockOnInitialization; - private volatile boolean blockOnControlFiles; private volatile boolean blockOnDataFiles; @@ -126,21 +121,12 @@ public class MockRepository extends FsRepository { maximumNumberOfFailures = metadata.settings().getAsLong("max_failure_number", 100L); blockOnControlFiles = metadata.settings().getAsBoolean("block_on_control", false); blockOnDataFiles = metadata.settings().getAsBoolean("block_on_data", false); - blockOnInitialization = metadata.settings().getAsBoolean("block_on_init", false); blockAndFailOnWriteSnapFile = metadata.settings().getAsBoolean("block_on_snap", false); randomPrefix = metadata.settings().get("random", "default"); waitAfterUnblock = metadata.settings().getAsLong("wait_after_unblock", 0L); logger.info("starting mock repository with random prefix {}", randomPrefix); } - @Override - public void initializeSnapshot(SnapshotId snapshotId, List indices, MetaData clusterMetadata) { - if (blockOnInitialization) { - blockExecution(); - } - super.initializeSnapshot(snapshotId, indices, 
-    }
-
     private static RepositoryMetaData overrideSettings(RepositoryMetaData metadata, Environment environment) {
         // TODO: use another method of testing not being able to read the test file written by the master...
         // this is super duper hacky
@@ -174,7 +160,6 @@ public class MockRepository extends FsRepository {
             // Clean blocking flags, so we wouldn't try to block again
             blockOnDataFiles = false;
             blockOnControlFiles = false;
-            blockOnInitialization = false;
             blockOnWriteIndexFile = false;
             blockAndFailOnWriteSnapFile = false;
             this.notifyAll();
@@ -200,7 +185,7 @@ public class MockRepository extends FsRepository {
             logger.debug("[{}] Blocking execution", metadata.name());
             boolean wasBlocked = false;
             try {
-                while (blockOnDataFiles || blockOnControlFiles || blockOnInitialization || blockOnWriteIndexFile ||
+                while (blockOnDataFiles || blockOnControlFiles || blockOnWriteIndexFile ||
                     blockAndFailOnWriteSnapFile) {
                     blocked = true;
                     this.wait();
diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java
index 0016517e65c..293fc04989f 100644
--- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java
+++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java
@@ -253,11 +253,10 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit
     public void initializeSnapshot(SnapshotId snapshotId, List<IndexId> indices, MetaData metaData) {
         throw new UnsupportedOperationException("Unsupported for repository of type: " + TYPE);
     }
-
     @Override
     public SnapshotInfo finalizeSnapshot(SnapshotId snapshotId, List<IndexId> indices, long startTime, String failure,
                                          int totalShards, List<SnapshotShardFailure> shardFailures, long repositoryStateId,
-                                         boolean includeGlobalState, Map<String, Object> userMetadata) {
+                                         boolean includeGlobalState, MetaData metaData, Map<String, Object> userMetadata) {
         throw new UnsupportedOperationException("Unsupported for repository of type: " + TYPE);
     }
 
diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/snapshots/SourceOnlySnapshotRepository.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/snapshots/SourceOnlySnapshotRepository.java
index 8e0f7d04c30..280e4a43445 100644
--- a/x-pack/plugin/core/src/main/java/org/elasticsearch/snapshots/SourceOnlySnapshotRepository.java
+++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/snapshots/SourceOnlySnapshotRepository.java
@@ -45,6 +45,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.function.Function;
 import java.util.function.Supplier;
 
@@ -82,34 +83,54 @@ public final class SourceOnlySnapshotRepository extends FilterRepository {
         // a _source only snapshot with a plain repository it will be just fine since we already set the
         // required engine, that the index is read-only and the mapping to a default mapping
         try {
-            MetaData.Builder builder = MetaData.builder(metaData);
-            for (IndexId indexId : indices) {
-                IndexMetaData index = metaData.index(indexId.getName());
-                IndexMetaData.Builder indexMetadataBuilder = IndexMetaData.builder(index);
-                // for a minimal restore we basically disable indexing on all fields and only create an index
-                // that is valid from an operational perspective. ie. it will have all metadata fields like version/
-                // seqID etc. and an indexed ID field such that we can potentially perform updates on them or delete documents.
-                ImmutableOpenMap<String, MappingMetaData> mappings = index.getMappings();
-                Iterator<ObjectObjectCursor<String, MappingMetaData>> iterator = mappings.iterator();
-                while (iterator.hasNext()) {
-                    ObjectObjectCursor<String, MappingMetaData> next = iterator.next();
-                    // we don't need to obey any routing here stuff is read-only anyway and get is disabled
-                    final String mapping = "{ \"" + next.key + "\": { \"enabled\": false, \"_meta\": " + next.value.source().string()
-                        + " } }";
-                    indexMetadataBuilder.putMapping(next.key, mapping);
-                }
-                indexMetadataBuilder.settings(Settings.builder().put(index.getSettings())
-                    .put(SOURCE_ONLY.getKey(), true)
-                    .put("index.blocks.write", true)); // read-only!
-                indexMetadataBuilder.settingsVersion(1 + indexMetadataBuilder.settingsVersion());
-                builder.put(indexMetadataBuilder);
-            }
-            super.initializeSnapshot(snapshotId, indices, builder.build());
+            super.initializeSnapshot(snapshotId, indices, metadataToSnapshot(indices, metaData));
         } catch (IOException ex) {
             throw new UncheckedIOException(ex);
         }
     }
 
+    @Override
+    public SnapshotInfo finalizeSnapshot(SnapshotId snapshotId, List<IndexId> indices, long startTime, String failure, int totalShards,
+                                         List<SnapshotShardFailure> shardFailures, long repositoryStateId, boolean includeGlobalState,
+                                         MetaData metaData, Map<String, Object> userMetadata) {
+        // we process the index metadata at snapshot time. This means that if somebody tries to restore
+        // a _source only snapshot with a plain repository it will be just fine, since we already set the
+        // required engine, made the index read-only, and set the mapping to a default mapping.
+        try {
+            return super.finalizeSnapshot(snapshotId, indices, startTime, failure, totalShards, shardFailures, repositoryStateId,
+                includeGlobalState, metadataToSnapshot(indices, metaData), userMetadata);
+        } catch (IOException ex) {
+            throw new UncheckedIOException(ex);
+        }
+    }
+
+    private static MetaData metadataToSnapshot(List<IndexId> indices, MetaData metaData) throws IOException {
+        MetaData.Builder builder = MetaData.builder(metaData);
+        for (IndexId indexId : indices) {
+            IndexMetaData index = metaData.index(indexId.getName());
+            IndexMetaData.Builder indexMetadataBuilder = IndexMetaData.builder(index);
+            // for a minimal restore we basically disable indexing on all fields and only create an index
+            // that is valid from an operational perspective, i.e. it will have all metadata fields like version/
+            // seqID etc. and an indexed ID field such that we can potentially perform updates on them or delete documents.
+            ImmutableOpenMap<String, MappingMetaData> mappings = index.getMappings();
+            Iterator<ObjectObjectCursor<String, MappingMetaData>> iterator = mappings.iterator();
+            while (iterator.hasNext()) {
+                ObjectObjectCursor<String, MappingMetaData> next = iterator.next();
+                // we don't need to obey any routing here; the data is read-only anyway and get is disabled
+                final String mapping = "{ \"" + next.key + "\": { \"enabled\": false, \"_meta\": " + next.value.source().string()
+                    + " } }";
+                indexMetadataBuilder.putMapping(next.key, mapping);
+            }
+            indexMetadataBuilder.settings(Settings.builder().put(index.getSettings())
+                .put(SOURCE_ONLY.getKey(), true)
+                .put("index.blocks.write", true)); // read-only!
+            indexMetadataBuilder.settingsVersion(1 + indexMetadataBuilder.settingsVersion());
+            builder.put(indexMetadataBuilder);
+        }
+        return builder.build();
+    }
+
     @Override
     public void snapshotShard(Store store, MapperService mapperService, SnapshotId snapshotId, IndexId indexId,
                               IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus, ActionListener<Void> listener) {
diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotShardTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotShardTests.java
index b875f76ac59..34acf179c3f 100644
--- a/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotShardTests.java
+++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotShardTests.java
@@ -65,7 +65,7 @@ import org.hamcrest.Matchers;
 
 import java.io.IOException;
 import java.nio.file.Path;
-import java.util.Arrays;
+import java.util.Collections;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 
@@ -200,14 +200,15 @@ public class SourceOnlySnapshotShardTests extends IndexShardTestCase {
         repository.start();
         try (Engine.IndexCommitRef snapshotRef = shard.acquireLastIndexCommit(true)) {
             IndexShardSnapshotStatus indexShardSnapshotStatus = IndexShardSnapshotStatus.newInitializing();
+            final PlainActionFuture<Void> future = PlainActionFuture.newFuture();
             runAsSnapshot(shard.getThreadPool(), () -> {
-                repository.initializeSnapshot(snapshotId, Arrays.asList(indexId),
-                    MetaData.builder().put(shard.indexSettings()
-                        .getIndexMetaData(), false).build());
-                final PlainActionFuture<Void> future = PlainActionFuture.newFuture();
                 repository.snapshotShard(shard.store(), shard.mapperService(), snapshotId, indexId, snapshotRef.getIndexCommit(),
                     indexShardSnapshotStatus, future);
                 future.actionGet();
+                repository.finalizeSnapshot(snapshotId, Collections.singletonList(indexId),
+                    indexShardSnapshotStatus.asCopy().getStartTime(), null, 1, Collections.emptyList(),
+                    repository.getRepositoryData().getGenId(), true,
+                    MetaData.builder().put(shard.indexSettings().getIndexMetaData(), false).build(), Collections.emptyMap());
             });
             IndexShardSnapshotStatus.Copy copy = indexShardSnapshotStatus.asCopy();
             assertEquals(copy.getTotalFileCount(), copy.getIncrementalFileCount());