From ac2774c9facc05d6fb2b785ae20a0c307e8fc0bf Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Mon, 9 Dec 2019 09:02:57 +0100 Subject: [PATCH] Use Cluster State to Track Repository Generation (#49729) (#49976) Step on the road to #49060. This commit adds the logic to keep track of a repository's generation across repository operations. See changes to package level Javadoc for the concrete changes in the distributed state machine. It updates the write side of new repository generations to be fully consistent via the cluster state. With this change, no `index-N` will be overwritten for the same repository ever. So eventual consistency issues around conflicting updates to the same `index-N` are not a possibility any longer. With this change the read side will still use listing of repository contents instead of relying solely on the cluster state contents. The logic for that will be introduced in #49060. This retains the ability to externally delete the contents of a repository and continue using it afterwards for the time being. In #49060 the use of listing to determine the repository generation will be removed in all cases (except for full-cluster restart) as the last step in this effort. --- .../get/GetRepositoriesResponse.java | 4 +- .../metadata/RepositoriesMetaData.java | 78 +++++++- .../cluster/metadata/RepositoryMetaData.java | 85 ++++++++- .../repositories/RepositoriesService.java | 7 +- .../repositories/RepositoryData.java | 6 + .../blobstore/BlobStoreRepository.java | 171 ++++++++++++++---- .../repositories/blobstore/package-info.java | 46 ++++- .../repositories/fs/FsRepository.java | 2 +- .../BlobStoreRepositoryRestoreTests.java | 2 +- .../blobstore/BlobStoreRepositoryTests.java | 19 +- .../DedicatedClusterSnapshotRestoreIT.java | 7 +- ...epositoriesMetaDataSerializationTests.java | 5 +- .../SharedClusterSnapshotRestoreIT.java | 5 +- .../MockEventuallyConsistentRepository.java | 10 +- ...ckEventuallyConsistentRepositoryTests.java | 9 +- .../blobstore/BlobStoreTestUtil.java | 25 ++- .../snapshots/mockstore/MockRepository.java | 8 + .../SourceOnlySnapshotShardTests.java | 3 +- 18 files changed, 413 insertions(+), 79 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/get/GetRepositoriesResponse.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/get/GetRepositoriesResponse.java index ad55f17363d..71215cf6fe8 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/get/GetRepositoriesResponse.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/get/GetRepositoriesResponse.java @@ -29,6 +29,7 @@ import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; import java.io.IOException; +import java.util.Collections; import java.util.List; import static org.elasticsearch.common.xcontent.XContentParserUtils.ensureExpectedToken; @@ -66,7 +67,8 @@ public class GetRepositoriesResponse extends ActionResponse implements ToXConten @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); - repositories.toXContent(builder, params); + repositories.toXContent(builder, + new DelegatingMapParams(Collections.singletonMap(RepositoriesMetaData.HIDE_GENERATIONS_PARAM, "true"), params)); builder.endObject(); return builder; } diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/RepositoriesMetaData.java b/server/src/main/java/org/elasticsearch/cluster/metadata/RepositoriesMetaData.java index 1c618c1ef88..0abe6864e84 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/RepositoriesMetaData.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/RepositoriesMetaData.java @@ -24,12 +24,15 @@ import org.elasticsearch.Version; import org.elasticsearch.cluster.AbstractNamedDiffable; import org.elasticsearch.cluster.NamedDiff; import org.elasticsearch.cluster.metadata.MetaData.Custom; +import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.Strings; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.repositories.RepositoryData; import java.io.IOException; import java.util.ArrayList; @@ -45,6 +48,12 @@ public class RepositoriesMetaData extends AbstractNamedDiffable implemen public static final String TYPE = "repositories"; + /** + * Serialization parameter used to hide the {@link RepositoryMetaData#generation()} and {@link RepositoryMetaData#pendingGeneration()} + * in {@link org.elasticsearch.action.admin.cluster.repositories.get.GetRepositoriesResponse}. + */ + public static final String HIDE_GENERATIONS_PARAM = "hide_generations"; + private final List repositories; /** @@ -56,6 +65,30 @@ public class RepositoriesMetaData extends AbstractNamedDiffable implemen this.repositories = Collections.unmodifiableList(repositories); } + /** + * Creates a new instance that has the given repository moved to the given {@code safeGeneration} and {@code pendingGeneration}. + * + * @param repoName repository name + * @param safeGeneration new safe generation + * @param pendingGeneration new pending generation + * @return new instance with updated generations + */ + public RepositoriesMetaData withUpdatedGeneration(String repoName, long safeGeneration, long pendingGeneration) { + int indexOfRepo = -1; + for (int i = 0; i < repositories.size(); i++) { + if (repositories.get(i).name().equals(repoName)) { + indexOfRepo = i; + break; + } + } + if (indexOfRepo < 0) { + throw new IllegalArgumentException("Unknown repository [" + repoName + "]"); + } + final List updatedRepos = new ArrayList<>(repositories); + updatedRepos.set(indexOfRepo, new RepositoryMetaData(repositories.get(indexOfRepo), safeGeneration, pendingGeneration)); + return new RepositoriesMetaData(updatedRepos); + } + /** * Returns list of currently registered repositories * @@ -88,7 +121,29 @@ public class RepositoriesMetaData extends AbstractNamedDiffable implemen RepositoriesMetaData that = (RepositoriesMetaData) o; return repositories.equals(that.repositories); + } + /** + * Checks if this instance and the given instance share the same repositories by checking that this instances' repositories and the + * repositories in {@code other} are equal or only differ in their values of {@link RepositoryMetaData#generation()} and + * {@link RepositoryMetaData#pendingGeneration()}. + * + * @param other other repositories metadata + * @return {@code true} iff both instances contain the same repositories apart from differences in generations + */ + public boolean equalsIgnoreGenerations(@Nullable RepositoriesMetaData other) { + if (other == null) { + return false; + } + if (other.repositories.size() != repositories.size()) { + return false; + } + for (int i = 0; i < repositories.size(); i++) { + if (repositories.get(i).equalsIgnoreGenerations(other.repositories.get(i)) == false) { + return false; + } + } + return true; } @Override @@ -143,6 +198,8 @@ public class RepositoriesMetaData extends AbstractNamedDiffable implemen } String type = null; Settings settings = Settings.EMPTY; + long generation = RepositoryData.UNKNOWN_REPO_GEN; + long pendingGeneration = RepositoryData.EMPTY_REPO_GEN; while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { if (token == XContentParser.Token.FIELD_NAME) { String currentFieldName = parser.currentName(); @@ -156,6 +213,16 @@ public class RepositoriesMetaData extends AbstractNamedDiffable implemen throw new ElasticsearchParseException("failed to parse repository [{}], incompatible params", name); } settings = Settings.fromXContent(parser); + } else if ("generation".equals(currentFieldName)) { + if (parser.nextToken() != XContentParser.Token.VALUE_NUMBER) { + throw new ElasticsearchParseException("failed to parse repository [{}], unknown type", name); + } + generation = parser.longValue(); + } else if ("pending_generation".equals(currentFieldName)) { + if (parser.nextToken() != XContentParser.Token.VALUE_NUMBER) { + throw new ElasticsearchParseException("failed to parse repository [{}], unknown type", name); + } + pendingGeneration = parser.longValue(); } else { throw new ElasticsearchParseException("failed to parse repository [{}], unknown field [{}]", name, currentFieldName); @@ -167,7 +234,7 @@ public class RepositoriesMetaData extends AbstractNamedDiffable implemen if (type == null) { throw new ElasticsearchParseException("failed to parse repository [{}], missing repository type", name); } - repository.add(new RepositoryMetaData(name, type, settings)); + repository.add(new RepositoryMetaData(name, type, settings, generation, pendingGeneration)); } else { throw new ElasticsearchParseException("failed to parse repositories"); } @@ -205,6 +272,15 @@ public class RepositoriesMetaData extends AbstractNamedDiffable implemen repository.settings().toXContent(builder, params); builder.endObject(); + if (params.paramAsBoolean(HIDE_GENERATIONS_PARAM, false) == false) { + builder.field("generation", repository.generation()); + builder.field("pending_generation", repository.pendingGeneration()); + } builder.endObject(); } + + @Override + public String toString() { + return Strings.toString(this); + } } diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/RepositoryMetaData.java b/server/src/main/java/org/elasticsearch/cluster/metadata/RepositoryMetaData.java index 847db915b8b..c57f7028055 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/RepositoryMetaData.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/RepositoryMetaData.java @@ -18,20 +18,36 @@ */ package org.elasticsearch.cluster.metadata; +import org.elasticsearch.Version; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.repositories.RepositoryData; import java.io.IOException; +import java.util.Objects; /** * Metadata about registered repository */ public class RepositoryMetaData { + + public static final Version REPO_GEN_IN_CS_VERSION = Version.V_7_6_0; + private final String name; private final String type; private final Settings settings; + /** + * Safe repository generation. + */ + private final long generation; + + /** + * Pending repository generation. + */ + private final long pendingGeneration; + /** * Constructs new repository metadata * @@ -40,9 +56,21 @@ public class RepositoryMetaData { * @param settings repository settings */ public RepositoryMetaData(String name, String type, Settings settings) { + this(name, type, settings, RepositoryData.UNKNOWN_REPO_GEN, RepositoryData.EMPTY_REPO_GEN); + } + + public RepositoryMetaData(RepositoryMetaData metaData, long generation, long pendingGeneration) { + this(metaData.name, metaData.type, metaData.settings, generation, pendingGeneration); + } + + public RepositoryMetaData(String name, String type, Settings settings, long generation, long pendingGeneration) { this.name = name; this.type = type; this.settings = settings; + this.generation = generation; + this.pendingGeneration = pendingGeneration; + assert generation <= pendingGeneration : + "Pending generation [" + pendingGeneration + "] must be greater or equal to generation [" + generation + "]"; } /** @@ -72,11 +100,41 @@ public class RepositoryMetaData { return this.settings; } + /** + * Returns the safe repository generation. {@link RepositoryData} for this generation is assumed to exist in the repository. + * All operations on the repository must be based on the {@link RepositoryData} at this generation. + * See package level documentation for the blob store based repositories {@link org.elasticsearch.repositories.blobstore} for details + * on how this value is used during snapshots. + * @return safe repository generation + */ + public long generation() { + return generation; + } + + /** + * Returns the pending repository generation. {@link RepositoryData} for this generation and all generations down to the safe + * generation {@link #generation} may exist in the repository and should not be reused for writing new {@link RepositoryData} to the + * repository. + * See package level documentation for the blob store based repositories {@link org.elasticsearch.repositories.blobstore} for details + * on how this value is used during snapshots. + * + * @return highest pending repository generation + */ + public long pendingGeneration() { + return pendingGeneration; + } public RepositoryMetaData(StreamInput in) throws IOException { name = in.readString(); type = in.readString(); settings = Settings.readSettingsFromStream(in); + if (in.getVersion().onOrAfter(REPO_GEN_IN_CS_VERSION)) { + generation = in.readLong(); + pendingGeneration = in.readLong(); + } else { + generation = RepositoryData.UNKNOWN_REPO_GEN; + pendingGeneration = RepositoryData.EMPTY_REPO_GEN; + } } /** @@ -88,6 +146,20 @@ public class RepositoryMetaData { out.writeString(name); out.writeString(type); Settings.writeSettingsToStream(settings, out); + if (out.getVersion().onOrAfter(REPO_GEN_IN_CS_VERSION)) { + out.writeLong(generation); + out.writeLong(pendingGeneration); + } + } + + /** + * Checks if this instance is equal to the other instance in all fields other than {@link #generation} and {@link #pendingGeneration}. + * + * @param other other repository metadata + * @return {@code true} if both instances equal in all fields but the generation fields + */ + public boolean equalsIgnoreGenerations(RepositoryMetaData other) { + return name.equals(other.name) && type.equals(other.type()) && settings.equals(other.settings()); } @Override @@ -99,15 +171,18 @@ public class RepositoryMetaData { if (!name.equals(that.name)) return false; if (!type.equals(that.type)) return false; + if (generation != that.generation) return false; + if (pendingGeneration != that.pendingGeneration) return false; return settings.equals(that.settings); - } @Override public int hashCode() { - int result = name.hashCode(); - result = 31 * result + type.hashCode(); - result = 31 * result + settings.hashCode(); - return result; + return Objects.hash(name, type, settings, generation, pendingGeneration); + } + + @Override + public String toString() { + return "RepositoryMetaData{" + name + "}{" + type + "}{" + settings + "}{" + generation + "}{" + pendingGeneration + "}"; } } diff --git a/server/src/main/java/org/elasticsearch/repositories/RepositoriesService.java b/server/src/main/java/org/elasticsearch/repositories/RepositoriesService.java index 77dc4b9caaa..bab9b2a78d5 100644 --- a/server/src/main/java/org/elasticsearch/repositories/RepositoriesService.java +++ b/server/src/main/java/org/elasticsearch/repositories/RepositoriesService.java @@ -150,7 +150,7 @@ public class RepositoriesService extends AbstractLifecycleComponent implements C for (RepositoryMetaData repositoryMetaData : repositories.repositories()) { if (repositoryMetaData.name().equals(newRepositoryMetaData.name())) { - if (newRepositoryMetaData.equals(repositoryMetaData)) { + if (newRepositoryMetaData.equalsIgnoreGenerations(repositoryMetaData)) { // Previous version is the same as this one no update is needed. return currentState; } @@ -292,7 +292,10 @@ public class RepositoriesService extends AbstractLifecycleComponent implements C RepositoriesMetaData newMetaData = state.getMetaData().custom(RepositoriesMetaData.TYPE); // Check if repositories got changed - if ((oldMetaData == null && newMetaData == null) || (oldMetaData != null && oldMetaData.equals(newMetaData))) { + if ((oldMetaData == null && newMetaData == null) || (oldMetaData != null && oldMetaData.equalsIgnoreGenerations(newMetaData))) { + for (Repository repo : repositories.values()) { + repo.updateState(state); + } return; } diff --git a/server/src/main/java/org/elasticsearch/repositories/RepositoryData.java b/server/src/main/java/org/elasticsearch/repositories/RepositoryData.java index 20dcdc23718..357268fa051 100644 --- a/server/src/main/java/org/elasticsearch/repositories/RepositoryData.java +++ b/server/src/main/java/org/elasticsearch/repositories/RepositoryData.java @@ -52,6 +52,12 @@ public final class RepositoryData { * The generation value indicating the repository has no index generational files. */ public static final long EMPTY_REPO_GEN = -1L; + + /** + * The generation value indicating that the repository generation is unknown. + */ + public static final long UNKNOWN_REPO_GEN = -2L; + /** * An instance initialized for an empty repository. */ diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index 0732b182125..498b5bdbb49 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -36,11 +36,13 @@ import org.elasticsearch.action.ActionRunnable; import org.elasticsearch.action.StepListener; import org.elasticsearch.action.support.GroupedActionListener; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.RepositoryCleanupInProgress; import org.elasticsearch.cluster.SnapshotDeletionsInProgress; import org.elasticsearch.cluster.SnapshotsInProgress; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.metadata.RepositoriesMetaData; import org.elasticsearch.cluster.metadata.RepositoryMetaData; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.service.ClusterService; @@ -124,6 +126,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Consumer; import java.util.stream.Collectors; +import java.util.stream.LongStream; import java.util.stream.Stream; import static org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.FileInfo.canonicalName; @@ -140,7 +143,7 @@ import static org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSna public abstract class BlobStoreRepository extends AbstractLifecycleComponent implements Repository { private static final Logger logger = LogManager.getLogger(BlobStoreRepository.class); - protected final RepositoryMetaData metadata; + protected volatile RepositoryMetaData metadata; protected final NamedXContentRegistry namedXContentRegistry; @@ -204,6 +207,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp private final SetOnce blobStore = new SetOnce<>(); + private final ClusterService clusterService; + /** * Constructs new BlobStoreRepository * @param metadata The metadata for this repository including name and settings @@ -218,6 +223,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp this.metadata = metadata; this.namedXContentRegistry = namedXContentRegistry; this.threadPool = clusterService.getClusterApplierService().threadPool(); + this.clusterService = clusterService; snapshotRateLimiter = getRateLimiter(metadata.settings(), "max_snapshot_bytes_per_sec", new ByteSizeValue(40, ByteSizeUnit.MB)); restoreRateLimiter = getRateLimiter(metadata.settings(), "max_restore_bytes_per_sec", new ByteSizeValue(40, ByteSizeUnit.MB)); readOnly = metadata.settings().getAsBoolean("readonly", false); @@ -286,7 +292,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp bestGenerationFromCS = bestGeneration(cleanupInProgress.entries()); } - final long finalBestGen = bestGenerationFromCS; + metadata = getRepoMetaData(state); + final long finalBestGen = Math.max(bestGenerationFromCS, metadata.generation()); latestKnownRepoGen.updateAndGet(known -> Math.max(known, finalBestGen)); } @@ -980,8 +987,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp // Tracks the latest known repository generation in a best-effort way to detect inconsistent listing of root level index-N blobs // and concurrent modifications. - // Protected for use in MockEventuallyConsistentRepository - protected final AtomicLong latestKnownRepoGen = new AtomicLong(RepositoryData.EMPTY_REPO_GEN); + private final AtomicLong latestKnownRepoGen = new AtomicLong(RepositoryData.EMPTY_REPO_GEN); @Override public void getRepositoryData(ActionListener listener) { @@ -1047,38 +1053,92 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp } /** + * Writing a new index generation is a three step process. + * First, the {@link RepositoryMetaData} entry for this repository is set into a pending state by incrementing its + * pending generation {@code P} while its safe generation {@code N} remains unchanged. + * Second, the updated {@link RepositoryData} is written to generation {@code P + 1}. + * Lastly, the {@link RepositoryMetaData} entry for this repository is updated to the new generation {@code P + 1} and thus + * pending and safe generation are set to the same value marking the end of the update of the repository data. + * * @param repositoryData RepositoryData to write * @param expectedGen expected repository generation at the start of the operation * @param writeShardGens whether to write {@link ShardGenerations} to the new {@link RepositoryData} blob * @param listener completion listener */ protected void writeIndexGen(RepositoryData repositoryData, long expectedGen, boolean writeShardGens, ActionListener listener) { - ActionListener.completeWith(listener, () -> { - assert isReadOnly() == false; // can not write to a read only repository - final long currentGen = repositoryData.getGenId(); - if (currentGen != expectedGen) { - // the index file was updated by a concurrent operation, so we were operating on stale - // repository data - throw new RepositoryException(metadata.name(), - "concurrent modification of the index-N file, expected current generation [" + expectedGen + - "], actual current generation [" + currentGen + "] - possibly due to simultaneous snapshot deletion requests"); - } - final long newGen = currentGen + 1; + assert isReadOnly() == false; // can not write to a read only repository + final long currentGen = repositoryData.getGenId(); + if (currentGen != expectedGen) { + // the index file was updated by a concurrent operation, so we were operating on stale + // repository data + listener.onFailure(new RepositoryException(metadata.name(), + "concurrent modification of the index-N file, expected current generation [" + expectedGen + + "], actual current generation [" + currentGen + "]")); + return; + } + + // Step 1: Set repository generation state to the next possible pending generation + final StepListener setPendingStep = new StepListener<>(); + clusterService.submitStateUpdateTask("set pending repository generation [" + metadata.name() + "][" + expectedGen + "]", + new ClusterStateUpdateTask() { + + private long newGen; + + @Override + public ClusterState execute(ClusterState currentState) { + final RepositoryMetaData meta = getRepoMetaData(currentState); + final String repoName = metadata.name(); + final long genInState = meta.generation(); + // TODO: Remove all usages of this variable, instead initialize the generation when loading RepositoryData + final boolean uninitializedMeta = meta.generation() == RepositoryData.UNKNOWN_REPO_GEN; + if (uninitializedMeta == false && meta.pendingGeneration() != genInState) { + logger.info("Trying to write new repository data over unfinished write, repo [{}] is at " + + "safe generation [{}] and pending generation [{}]", meta.name(), genInState, meta.pendingGeneration()); + } + assert expectedGen == RepositoryData.EMPTY_REPO_GEN || RepositoryData.UNKNOWN_REPO_GEN == meta.generation() + || expectedGen == meta.generation() : + "Expected non-empty generation [" + expectedGen + "] does not match generation tracked in [" + meta + "]"; + // If we run into the empty repo generation for the expected gen, the repo is assumed to have been cleared of + // all contents by an external process so we reset the safe generation to the empty generation. + final long safeGeneration = expectedGen == RepositoryData.EMPTY_REPO_GEN ? RepositoryData.EMPTY_REPO_GEN + : (uninitializedMeta ? expectedGen : genInState); + // Regardless of whether or not the safe generation has been reset, the pending generation always increments so that + // even if a repository has been manually cleared of all contents we will never reuse the same repository generation. + // This is motivated by the consistency behavior the S3 based blob repository implementation has to support which does + // not offer any consistency guarantees when it comes to overwriting the same blob name with different content. + newGen = uninitializedMeta ? expectedGen + 1: metadata.pendingGeneration() + 1; + assert newGen > latestKnownRepoGen.get() : "Attempted new generation [" + newGen + + "] must be larger than latest known generation [" + latestKnownRepoGen.get() + "]"; + return ClusterState.builder(currentState).metaData(MetaData.builder(currentState.getMetaData()) + .putCustom(RepositoriesMetaData.TYPE, + currentState.metaData().custom(RepositoriesMetaData.TYPE).withUpdatedGeneration( + repoName, safeGeneration, newGen)).build()).build(); + } + + @Override + public void onFailure(String source, Exception e) { + listener.onFailure( + new RepositoryException(metadata.name(), "Failed to execute cluster state update [" + source + "]", e)); + } + + @Override + public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + setPendingStep.onResponse(newGen); + } + }); + + // Step 2: Write new index-N blob to repository and update index.latest + setPendingStep.whenComplete(newGen -> threadPool().executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.wrap(listener, l -> { if (latestKnownRepoGen.get() >= newGen) { throw new IllegalArgumentException( - "Tried writing generation [" + newGen + "] but repository is at least at generation [" + newGen + "] already"); + "Tried writing generation [" + newGen + "] but repository is at least at generation [" + latestKnownRepoGen.get() + + "] already"); } // write the index file final String indexBlob = INDEX_FILE_PREFIX + Long.toString(newGen); logger.debug("Repository [{}] writing new index generational blob [{}]", metadata.name(), indexBlob); writeAtomic(indexBlob, BytesReference.bytes(repositoryData.snapshotsToXContent(XContentFactory.jsonBuilder(), writeShardGens)), true); - final long latestKnownGen = latestKnownRepoGen.updateAndGet(known -> Math.max(known, newGen)); - if (newGen < latestKnownGen) { - // Don't mess up the index.latest blob - throw new IllegalStateException( - "Wrote generation [" + newGen + "] but latest known repo gen concurrently changed to [" + latestKnownGen + "]"); - } // write the current generation to the index-latest file final BytesReference genBytes; try (BytesStreamOutput bStream = new BytesStreamOutput()) { @@ -1086,18 +1146,63 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp genBytes = bStream.bytes(); } logger.debug("Repository [{}] updating index.latest with generation [{}]", metadata.name(), newGen); + writeAtomic(INDEX_LATEST_BLOB, genBytes, false); - // delete the N-2 index file if it exists, keep the previous one around as a backup - if (newGen - 2 >= 0) { - final String oldSnapshotIndexFile = INDEX_FILE_PREFIX + Long.toString(newGen - 2); - try { - blobContainer().deleteBlobIgnoringIfNotExists(oldSnapshotIndexFile); - } catch (IOException e) { - logger.warn("Failed to clean up old index blob [{}]", oldSnapshotIndexFile); - } - } - return null; - }); + + // Step 3: Update CS to reflect new repository generation. + clusterService.submitStateUpdateTask("set safe repository generation [" + metadata.name() + "][" + newGen + "]", + new ClusterStateUpdateTask() { + @Override + public ClusterState execute(ClusterState currentState) { + final RepositoryMetaData meta = getRepoMetaData(currentState); + if (meta.generation() != expectedGen) { + throw new IllegalStateException("Tried to update repo generation to [" + newGen + + "] but saw unexpected generation in state [" + meta + "]"); + } + if (meta.pendingGeneration() != newGen) { + throw new IllegalStateException( + "Tried to update from unexpected pending repo generation [" + meta.pendingGeneration() + + "] after write to generation [" + newGen + "]"); + } + return ClusterState.builder(currentState).metaData(MetaData.builder(currentState.getMetaData()) + .putCustom(RepositoriesMetaData.TYPE, + currentState.metaData().custom(RepositoriesMetaData.TYPE).withUpdatedGeneration( + metadata.name(), newGen, newGen)).build()).build(); + } + + @Override + public void onFailure(String source, Exception e) { + l.onFailure( + new RepositoryException(metadata.name(), "Failed to execute cluster state update [" + source + "]", e)); + } + + @Override + public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.run(l, () -> { + // Delete all now outdated index files up to 1000 blobs back from the new generation. + // If there are more than 1000 dangling index-N cleanup functionality on repo delete will take care of them. + // Deleting one older than the current expectedGen is done for BwC reasons as older versions used to keep + // two index-N blobs around. + final List oldIndexN = LongStream.range( + Math.max(Math.max(expectedGen - 1, 0), newGen - 1000), newGen) + .mapToObj(gen -> INDEX_FILE_PREFIX + gen) + .collect(Collectors.toList()); + try { + blobContainer().deleteBlobsIgnoringIfNotExists(oldIndexN); + } catch (IOException e) { + logger.warn("Failed to clean up old index blobs {}", oldIndexN); + } + })); + } + }); + })), listener::onFailure); + } + + private RepositoryMetaData getRepoMetaData(ClusterState state) { + final RepositoryMetaData metaData = + state.getMetaData().custom(RepositoriesMetaData.TYPE).repository(metadata.name()); + assert metaData != null; + return metaData; } /** diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/package-info.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/package-info.java index 0b72670a9bc..14cda93edac 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/package-info.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/package-info.java @@ -96,6 +96,9 @@ *
    *
  1. The blobstore repository stores the {@code RepositoryData} in blobs named with incrementing suffix {@code N} at {@code /index-N} * directly under the repository's root.
  2. + *
  3. For each {@link org.elasticsearch.repositories.blobstore.BlobStoreRepository} an entry of type + * {@link org.elasticsearch.cluster.metadata.RepositoryMetaData} exists in the cluster state. It tracks the current valid + * generation {@code N} as well as the latest generation that a write was attempted for.
  4. *
  5. The blobstore also stores the most recent {@code N} as a 64bit long in the blob {@code /index.latest} directly under the * repository's root.
  6. *
@@ -116,6 +119,38 @@ * * * + * + *

Writing Updated RepositoryData to the Repository

+ * + *

Writing an updated {@link org.elasticsearch.repositories.RepositoryData} to a blob store repository is an operation that uses + * the cluster state to ensure that a specific {@code index-N} blob is never accidentally overwritten in a master failover scenario. + * The specific steps to writing a new {@code index-N} blob and thus making changes from a snapshot-create or delete operation visible + * to read operations on the repository are as follows and all run on the master node:

+ * + *
    + *
  1. Write an updated value of {@link org.elasticsearch.cluster.metadata.RepositoryMetaData} for the repository that has the same + * {@link org.elasticsearch.cluster.metadata.RepositoryMetaData#generation()} as the existing entry and has a value of + * {@link org.elasticsearch.cluster.metadata.RepositoryMetaData#pendingGeneration()} one greater than the {@code pendingGeneration} of the + * existing entry.
  2. + *
  3. On the same master node, after the cluster state has been updated in the first step, write the new {@code index-N} blob and + * also update the contents of the {@code index.latest} blob. Note that updating the index.latest blob is done on a best effort + * basis and that there is a chance for a stuck master-node to overwrite the contents of the {@code index.latest} blob after a newer + * {@code index-N} has been written by another master node. This is acceptable since the contents of {@code index.latest} are not used + * during normal operation of the repository and must only be correct for purposes of mounting the contents of a + * {@link org.elasticsearch.repositories.blobstore.BlobStoreRepository} as a read-only url repository.
  4. + *
  5. After the write has finished, set the value of {@code RepositoriesState.State#generation} to the value used for + * {@code RepositoriesState.State#pendingGeneration} so that the new entry for the state of the repository has {@code generation} and + * {@code pendingGeneration} set to the same value to signalize a clean repository state with no potentially failed writes newer than the + * last valid {@code index-N} blob in the repository.
  6. + *
+ * + *

If either of the last two steps in the above fails or master fails over to a new node at any point, then a subsequent operation + * trying to write a new {@code index-N} blob will never use the same value of {@code N} used by a previous attempt. It will always start + * over at the first of the above three steps, incrementing the {@code pendingGeneration} generation before attempting a write, thus + * ensuring no overwriting of a {@code index-N} blob ever to occur. The use of the cluster state to track the latest repository generation + * {@code N} and ensuring no overwriting of {@code index-N} blobs to ever occur allows the blob store repository to properly function even + * on blob stores with neither a consistent list operation nor an atomic "write but not overwrite" operation.

+ * *

Creating a Snapshot

* *

Creating a snapshot in the repository happens in the three steps described in detail below.

@@ -174,11 +209,7 @@ * {@code /indices/${index-snapshot-uuid}/meta-${snapshot-uuid}.dat} *
  • Write the {@link org.elasticsearch.snapshots.SnapshotInfo} blob for the given snapshot to the key {@code /snap-${snapshot-uuid}.dat} * directly under the repository root.
  • - *
  • Write an updated {@code RepositoryData} blob to the key {@code /index-${N+1}} using the {@code N} determined when initializing the - * snapshot in the first step. When doing this, the implementation checks that the blob for generation {@code N + 1} has not yet been - * written to prevent concurrent updates to the repository. If the blob for {@code N + 1} already exists the execution of finalization - * stops under the assumption that a master failover occurred and the snapshot has already been finalized by the new master.
  • - *
  • Write the updated {@code /index.latest} blob containing the new repository generation {@code N + 1}.
  • + *
  • Write an updated {@code RepositoryData} blob containing the new snapshot.
  • * * *

    Deleting a Snapshot

    @@ -203,9 +234,8 @@ * blob so that it can be deleted at the end of the snapshot delete process. * * - *
  • Write an updated {@code RepositoryData} blob with the deleted snapshot removed to key {@code /index-${N+1}} directly under the - * repository root and the repository generations that were changed in the affected shards adjusted.
  • - *
  • Write an updated {@code index.latest} blob containing {@code N + 1}.
  • + *
  • Write an updated {@code RepositoryData} blob with the deleted snapshot removed and containing the updated repository generations + * that changed for the shards affected by the delete.
  • *
  • Delete the global {@code MetaData} blob {@code meta-${snapshot-uuid}.dat} stored directly under the repository root for the snapshot * as well as the {@code SnapshotInfo} blob at {@code /snap-${snapshot-uuid}.dat}.
  • *
  • Delete all unreferenced blobs previously collected when updating the shard directories. Also, remove any index folders or blobs diff --git a/server/src/main/java/org/elasticsearch/repositories/fs/FsRepository.java b/server/src/main/java/org/elasticsearch/repositories/fs/FsRepository.java index 77181b872b1..2a0790ffb9e 100644 --- a/server/src/main/java/org/elasticsearch/repositories/fs/FsRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/fs/FsRepository.java @@ -114,7 +114,7 @@ public class FsRepository extends BlobStoreRepository { @Override protected BlobStore createBlobStore() throws Exception { - final String location = REPOSITORIES_LOCATION_SETTING.get(metadata.settings()); + final String location = REPOSITORIES_LOCATION_SETTING.get(getMetadata().settings()); final Path locationFile = environment.resolveRepoFile(location); return new FsBlobStore(environment.settings(), locationFile, isReadOnly()); } diff --git a/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryRestoreTests.java b/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryRestoreTests.java index b5d99db0a88..432091b81e1 100644 --- a/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryRestoreTests.java +++ b/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryRestoreTests.java @@ -194,7 +194,7 @@ public class BlobStoreRepositoryRestoreTests extends IndexShardTestCase { Settings settings = Settings.builder().put("location", randomAlphaOfLength(10)).build(); RepositoryMetaData repositoryMetaData = new RepositoryMetaData(randomAlphaOfLength(10), FsRepository.TYPE, settings); final FsRepository repository = new FsRepository(repositoryMetaData, createEnvironment(), xContentRegistry(), - BlobStoreTestUtil.mockClusterService()) { + BlobStoreTestUtil.mockClusterService(repositoryMetaData)) { @Override protected void assertSnapshotOrGenericThread() { // eliminate thread name check as we create repo manually diff --git a/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryTests.java b/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryTests.java index 84d0121fcc2..6cfb4ee7d0e 100644 --- a/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryTests.java +++ b/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryTests.java @@ -138,7 +138,7 @@ public class BlobStoreRepositoryTests extends ESSingleNodeTestCase { public void testReadAndWriteSnapshotsThroughIndexFile() throws Exception { final BlobStoreRepository repository = setupRepo(); - + final long pendingGeneration = repository.metadata.pendingGeneration(); // write to and read from a index file with no entries assertThat(ESBlobStoreRepositoryIntegTestCase.getRepositoryData(repository).getSnapshotIds().size(), equalTo(0)); final RepositoryData emptyData = RepositoryData.EMPTY; @@ -147,7 +147,7 @@ public class BlobStoreRepositoryTests extends ESSingleNodeTestCase { assertEquals(repoData, emptyData); assertEquals(repoData.getIndices().size(), 0); assertEquals(repoData.getSnapshotIds().size(), 0); - assertEquals(0L, repoData.getGenId()); + assertEquals(pendingGeneration + 1L, repoData.getGenId()); // write to and read from an index file with snapshots but no indices repoData = addRandomSnapshotsToRepoData(repoData, false); @@ -164,27 +164,30 @@ public class BlobStoreRepositoryTests extends ESSingleNodeTestCase { final BlobStoreRepository repository = setupRepo(); assertEquals(ESBlobStoreRepositoryIntegTestCase.getRepositoryData(repository), RepositoryData.EMPTY); + final long pendingGeneration = repository.metadata.pendingGeneration(); + // write to index generational file RepositoryData repositoryData = generateRandomRepoData(); writeIndexGen(repository, repositoryData, RepositoryData.EMPTY_REPO_GEN); assertThat(ESBlobStoreRepositoryIntegTestCase.getRepositoryData(repository), equalTo(repositoryData)); - assertThat(repository.latestIndexBlobId(), equalTo(0L)); - assertThat(repository.readSnapshotIndexLatestBlob(), equalTo(0L)); + final long expectedGeneration = pendingGeneration + 1L; + assertThat(repository.latestIndexBlobId(), equalTo(expectedGeneration)); + assertThat(repository.readSnapshotIndexLatestBlob(), equalTo(expectedGeneration)); // adding more and writing to a new index generational file repositoryData = addRandomSnapshotsToRepoData(ESBlobStoreRepositoryIntegTestCase.getRepositoryData(repository), true); writeIndexGen(repository, repositoryData, repositoryData.getGenId()); assertEquals(ESBlobStoreRepositoryIntegTestCase.getRepositoryData(repository), repositoryData); - assertThat(repository.latestIndexBlobId(), equalTo(1L)); - assertThat(repository.readSnapshotIndexLatestBlob(), equalTo(1L)); + assertThat(repository.latestIndexBlobId(), equalTo(expectedGeneration + 1L)); + assertThat(repository.readSnapshotIndexLatestBlob(), equalTo(expectedGeneration + 1L)); // removing a snapshot and writing to a new index generational file repositoryData = ESBlobStoreRepositoryIntegTestCase.getRepositoryData(repository).removeSnapshot( repositoryData.getSnapshotIds().iterator().next(), ShardGenerations.EMPTY); writeIndexGen(repository, repositoryData, repositoryData.getGenId()); assertEquals(ESBlobStoreRepositoryIntegTestCase.getRepositoryData(repository), repositoryData); - assertThat(repository.latestIndexBlobId(), equalTo(2L)); - assertThat(repository.readSnapshotIndexLatestBlob(), equalTo(2L)); + assertThat(repository.latestIndexBlobId(), equalTo(expectedGeneration + 2L)); + assertThat(repository.readSnapshotIndexLatestBlob(), equalTo(expectedGeneration + 2L)); } public void testRepositoryDataConcurrentModificationNotAllowed() { diff --git a/server/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java b/server/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java index 9634092cde2..da9a960991b 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java +++ b/server/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java @@ -499,11 +499,10 @@ public class DedicatedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTest logger.info("--> Go through a loop of creating and deleting a snapshot to trigger repository cleanup"); client().admin().cluster().prepareCleanupRepository("test-repo").get(); - // Subtract four files that will remain in the repository: + // Expect two files to remain in the repository: // (1) index-(N+1) - // (2) index-N (because we keep the previous version) and - // (3) index-latest - assertFileCount(repo, 3); + // (2) index-latest + assertFileCount(repo, 2); logger.info("--> done"); } diff --git a/server/src/test/java/org/elasticsearch/snapshots/RepositoriesMetaDataSerializationTests.java b/server/src/test/java/org/elasticsearch/snapshots/RepositoriesMetaDataSerializationTests.java index 17ae1def235..c7c97077fe9 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/RepositoriesMetaDataSerializationTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/RepositoriesMetaDataSerializationTests.java @@ -42,7 +42,10 @@ public class RepositoriesMetaDataSerializationTests extends AbstractDiffableSeri int numberOfRepositories = randomInt(10); List entries = new ArrayList<>(); for (int i = 0; i < numberOfRepositories; i++) { - entries.add(new RepositoryMetaData(randomAlphaOfLength(10), randomAlphaOfLength(10), randomSettings())); + // divide by 2 to not overflow when adding to this number for the pending generation below + final long generation = randomNonNegativeLong() / 2L; + entries.add(new RepositoryMetaData(randomAlphaOfLength(10), randomAlphaOfLength(10), randomSettings(), generation, + generation + randomLongBetween(0, generation))); } entries.sort(Comparator.comparing(RepositoryMetaData::name)); return new RepositoriesMetaData(entries); diff --git a/server/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java b/server/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java index 12f480bd3a3..8026c961969 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java +++ b/server/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java @@ -1321,9 +1321,8 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas logger.info("--> delete the last snapshot"); client.admin().cluster().prepareDeleteSnapshot("test-repo", lastSnapshot).get(); - logger.info("--> make sure that number of files is back to what it was when the first snapshot was made, " + - "plus one because one backup index-N file should remain"); - assertFileCount(repo, numberOfFiles[0] + 1); + logger.info("--> make sure that number of files is back to what it was when the first snapshot was made"); + assertFileCount(repo, numberOfFiles[0]); } public void testDeleteSnapshotWithMissingIndexAndShardMetadata() throws Exception { diff --git a/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepository.java b/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepository.java index d8702d5453f..9b26a87554d 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepository.java +++ b/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepository.java @@ -287,13 +287,13 @@ public class MockEventuallyConsistentRepository extends BlobStoreRepository { .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))); } - // Randomly filter out the latest /index-N blob from a listing to test that tracking of it in latestKnownRepoGen - // overrides an inconsistent listing + // Randomly filter out the index-N blobs from a listing to test that tracking of it in latestKnownRepoGen and the cluster state + // ensures consistent repository operations private Map maybeMissLatestIndexN(Map listing) { - // Only filter out latest index-N at the repo root and only as long as we're not in a forced consistent state - if (path.parent() == null && context.consistent == false && random.nextBoolean()) { + // Randomly filter out index-N blobs at the repo root to proof that we don't need them to be consistently listed + if (path.parent() == null && context.consistent == false) { final Map filtered = new HashMap<>(listing); - filtered.remove(BlobStoreRepository.INDEX_FILE_PREFIX + latestKnownRepoGen.get()); + filtered.keySet().removeIf(b -> b.startsWith(BlobStoreRepository.INDEX_FILE_PREFIX) && random.nextBoolean()); return Collections.unmodifiableMap(filtered); } return listing; diff --git a/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepositoryTests.java b/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepositoryTests.java index f1cf314e315..e4e6d99c6e6 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepositoryTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepositoryTests.java @@ -21,6 +21,7 @@ package org.elasticsearch.snapshots.mockstore; import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.metadata.RepositoryMetaData; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.blobstore.BlobContainer; import org.elasticsearch.common.settings.Settings; @@ -134,9 +135,11 @@ public class MockEventuallyConsistentRepositoryTests extends ESTestCase { public void testOverwriteSnapshotInfoBlob() { MockEventuallyConsistentRepository.Context blobStoreContext = new MockEventuallyConsistentRepository.Context(); - try (BlobStoreRepository repository = new MockEventuallyConsistentRepository( - new RepositoryMetaData("testRepo", "mockEventuallyConsistent", Settings.EMPTY), - xContentRegistry(), BlobStoreTestUtil.mockClusterService(), blobStoreContext, random())) { + final RepositoryMetaData metaData = new RepositoryMetaData("testRepo", "mockEventuallyConsistent", Settings.EMPTY); + final ClusterService clusterService = BlobStoreTestUtil.mockClusterService(metaData); + try (BlobStoreRepository repository = + new MockEventuallyConsistentRepository(metaData, xContentRegistry(), clusterService, blobStoreContext, random())) { + clusterService.addStateApplier(event -> repository.updateState(event.state())); repository.start(); // We create a snap- blob for snapshot "foo" in the first generation diff --git a/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreTestUtil.java b/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreTestUtil.java index 66c49db542d..12130a1dd33 100644 --- a/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreTestUtil.java +++ b/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreTestUtil.java @@ -26,6 +26,9 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateApplier; import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.metadata.RepositoriesMetaData; +import org.elasticsearch.cluster.metadata.RepositoryMetaData; import org.elasticsearch.cluster.service.ClusterApplierService; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Strings; @@ -292,11 +295,29 @@ public final class BlobStoreTestUtil { /** * Creates a mocked {@link ClusterService} for use in {@link BlobStoreRepository} related tests that mocks out all the necessary - * functionality to make {@link BlobStoreRepository} work. + * functionality to make {@link BlobStoreRepository} work. Initializes the cluster state as {@link ClusterState#EMPTY_STATE}. * * @return Mock ClusterService */ public static ClusterService mockClusterService() { + return mockClusterService(ClusterState.EMPTY_STATE); + } + + /** + * Creates a mocked {@link ClusterService} for use in {@link BlobStoreRepository} related tests that mocks out all the necessary + * functionality to make {@link BlobStoreRepository} work. Initializes the cluster state with a {@link RepositoriesMetaData} instance + * that contains the given {@code metadata}. + * + * @param metaData RepositoryMetaData to initialize the cluster state with + * @return Mock ClusterService + */ + public static ClusterService mockClusterService(RepositoryMetaData metaData) { + return mockClusterService(ClusterState.builder(ClusterState.EMPTY_STATE).metaData( + MetaData.builder().putCustom(RepositoriesMetaData.TYPE, + new RepositoriesMetaData(Collections.singletonList(metaData))).build()).build()); + } + + private static ClusterService mockClusterService(ClusterState initialState) { final ThreadPool threadPool = mock(ThreadPool.class); when(threadPool.executor(ThreadPool.Names.SNAPSHOT)).thenReturn(new SameThreadExecutorService()); when(threadPool.generic()).thenReturn(new SameThreadExecutorService()); @@ -305,7 +326,7 @@ public final class BlobStoreTestUtil { final ClusterService clusterService = mock(ClusterService.class); final ClusterApplierService clusterApplierService = mock(ClusterApplierService.class); when(clusterService.getClusterApplierService()).thenReturn(clusterApplierService); - final AtomicReference currentState = new AtomicReference<>(ClusterState.EMPTY_STATE); + final AtomicReference currentState = new AtomicReference<>(initialState); when(clusterService.state()).then(invocationOnMock -> currentState.get()); final List appliers = new CopyOnWriteArrayList<>(); doAnswer(invocation -> { diff --git a/test/framework/src/main/java/org/elasticsearch/snapshots/mockstore/MockRepository.java b/test/framework/src/main/java/org/elasticsearch/snapshots/mockstore/MockRepository.java index 218c6f4eeca..6c05cc625f5 100644 --- a/test/framework/src/main/java/org/elasticsearch/snapshots/mockstore/MockRepository.java +++ b/test/framework/src/main/java/org/elasticsearch/snapshots/mockstore/MockRepository.java @@ -100,6 +100,8 @@ public class MockRepository extends FsRepository { private final String randomPrefix; + private final Environment env; + private volatile boolean blockOnControlFiles; private volatile boolean blockOnDataFiles; @@ -125,9 +127,15 @@ public class MockRepository extends FsRepository { blockAndFailOnWriteSnapFile = metadata.settings().getAsBoolean("block_on_snap", false); randomPrefix = metadata.settings().get("random", "default"); waitAfterUnblock = metadata.settings().getAsLong("wait_after_unblock", 0L); + env = environment; logger.info("starting mock repository with random prefix {}", randomPrefix); } + @Override + public RepositoryMetaData getMetadata() { + return overrideSettings(super.getMetadata(), env); + } + private static RepositoryMetaData overrideSettings(RepositoryMetaData metadata, Environment environment) { // TODO: use another method of testing not being able to read the test file written by the master... // this is super duper hacky diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotShardTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotShardTests.java index 66cc842b23d..14a29a33f4f 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotShardTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotShardTests.java @@ -353,7 +353,8 @@ public class SourceOnlySnapshotShardTests extends IndexShardTestCase { private Repository createRepository() { Settings settings = Settings.builder().put("location", randomAlphaOfLength(10)).build(); RepositoryMetaData repositoryMetaData = new RepositoryMetaData(randomAlphaOfLength(10), FsRepository.TYPE, settings); - return new FsRepository(repositoryMetaData, createEnvironment(), xContentRegistry(), BlobStoreTestUtil.mockClusterService()); + return new FsRepository(repositoryMetaData, createEnvironment(), xContentRegistry(), + BlobStoreTestUtil.mockClusterService(repositoryMetaData)); } private static void runAsSnapshot(ThreadPool pool, Runnable runnable) {