Step on the road to #49060. This commit adds the logic to keep track of a repository's generation across repository operations. See changes to package level Javadoc for the concrete changes in the distributed state machine. It updates the write side of new repository generations to be fully consistent via the cluster state. With this change, no `index-N` will be overwritten for the same repository ever. So eventual consistency issues around conflicting updates to the same `index-N` are not a possibility any longer. With this change the read side will still use listing of repository contents instead of relying solely on the cluster state contents. The logic for that will be introduced in #49060. This retains the ability to externally delete the contents of a repository and continue using it afterwards for the time being. In #49060 the use of listing to determine the repository generation will be removed in all cases (except for full-cluster restart) as the last step in this effort.
This commit is contained in:
parent
7a2e35caa0
commit
ac2774c9fa
|
@ -29,6 +29,7 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
|
|||
import org.elasticsearch.common.xcontent.XContentParser;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
||||
import static org.elasticsearch.common.xcontent.XContentParserUtils.ensureExpectedToken;
|
||||
|
@ -66,7 +67,8 @@ public class GetRepositoriesResponse extends ActionResponse implements ToXConten
|
|||
@Override
|
||||
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
|
||||
builder.startObject();
|
||||
repositories.toXContent(builder, params);
|
||||
repositories.toXContent(builder,
|
||||
new DelegatingMapParams(Collections.singletonMap(RepositoriesMetaData.HIDE_GENERATIONS_PARAM, "true"), params));
|
||||
builder.endObject();
|
||||
return builder;
|
||||
}
|
||||
|
|
|
@ -24,12 +24,15 @@ import org.elasticsearch.Version;
|
|||
import org.elasticsearch.cluster.AbstractNamedDiffable;
|
||||
import org.elasticsearch.cluster.NamedDiff;
|
||||
import org.elasticsearch.cluster.metadata.MetaData.Custom;
|
||||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.xcontent.ToXContent;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.XContentParser;
|
||||
import org.elasticsearch.repositories.RepositoryData;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
|
@ -45,6 +48,12 @@ public class RepositoriesMetaData extends AbstractNamedDiffable<Custom> implemen
|
|||
|
||||
public static final String TYPE = "repositories";
|
||||
|
||||
/**
|
||||
* Serialization parameter used to hide the {@link RepositoryMetaData#generation()} and {@link RepositoryMetaData#pendingGeneration()}
|
||||
* in {@link org.elasticsearch.action.admin.cluster.repositories.get.GetRepositoriesResponse}.
|
||||
*/
|
||||
public static final String HIDE_GENERATIONS_PARAM = "hide_generations";
|
||||
|
||||
private final List<RepositoryMetaData> repositories;
|
||||
|
||||
/**
|
||||
|
@ -56,6 +65,30 @@ public class RepositoriesMetaData extends AbstractNamedDiffable<Custom> implemen
|
|||
this.repositories = Collections.unmodifiableList(repositories);
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a new instance that has the given repository moved to the given {@code safeGeneration} and {@code pendingGeneration}.
|
||||
*
|
||||
* @param repoName repository name
|
||||
* @param safeGeneration new safe generation
|
||||
* @param pendingGeneration new pending generation
|
||||
* @return new instance with updated generations
|
||||
*/
|
||||
public RepositoriesMetaData withUpdatedGeneration(String repoName, long safeGeneration, long pendingGeneration) {
|
||||
int indexOfRepo = -1;
|
||||
for (int i = 0; i < repositories.size(); i++) {
|
||||
if (repositories.get(i).name().equals(repoName)) {
|
||||
indexOfRepo = i;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (indexOfRepo < 0) {
|
||||
throw new IllegalArgumentException("Unknown repository [" + repoName + "]");
|
||||
}
|
||||
final List<RepositoryMetaData> updatedRepos = new ArrayList<>(repositories);
|
||||
updatedRepos.set(indexOfRepo, new RepositoryMetaData(repositories.get(indexOfRepo), safeGeneration, pendingGeneration));
|
||||
return new RepositoriesMetaData(updatedRepos);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns list of currently registered repositories
|
||||
*
|
||||
|
@ -88,7 +121,29 @@ public class RepositoriesMetaData extends AbstractNamedDiffable<Custom> implemen
|
|||
RepositoriesMetaData that = (RepositoriesMetaData) o;
|
||||
|
||||
return repositories.equals(that.repositories);
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks if this instance and the given instance share the same repositories by checking that this instances' repositories and the
|
||||
* repositories in {@code other} are equal or only differ in their values of {@link RepositoryMetaData#generation()} and
|
||||
* {@link RepositoryMetaData#pendingGeneration()}.
|
||||
*
|
||||
* @param other other repositories metadata
|
||||
* @return {@code true} iff both instances contain the same repositories apart from differences in generations
|
||||
*/
|
||||
public boolean equalsIgnoreGenerations(@Nullable RepositoriesMetaData other) {
|
||||
if (other == null) {
|
||||
return false;
|
||||
}
|
||||
if (other.repositories.size() != repositories.size()) {
|
||||
return false;
|
||||
}
|
||||
for (int i = 0; i < repositories.size(); i++) {
|
||||
if (repositories.get(i).equalsIgnoreGenerations(other.repositories.get(i)) == false) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -143,6 +198,8 @@ public class RepositoriesMetaData extends AbstractNamedDiffable<Custom> implemen
|
|||
}
|
||||
String type = null;
|
||||
Settings settings = Settings.EMPTY;
|
||||
long generation = RepositoryData.UNKNOWN_REPO_GEN;
|
||||
long pendingGeneration = RepositoryData.EMPTY_REPO_GEN;
|
||||
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
|
||||
if (token == XContentParser.Token.FIELD_NAME) {
|
||||
String currentFieldName = parser.currentName();
|
||||
|
@ -156,6 +213,16 @@ public class RepositoriesMetaData extends AbstractNamedDiffable<Custom> implemen
|
|||
throw new ElasticsearchParseException("failed to parse repository [{}], incompatible params", name);
|
||||
}
|
||||
settings = Settings.fromXContent(parser);
|
||||
} else if ("generation".equals(currentFieldName)) {
|
||||
if (parser.nextToken() != XContentParser.Token.VALUE_NUMBER) {
|
||||
throw new ElasticsearchParseException("failed to parse repository [{}], unknown type", name);
|
||||
}
|
||||
generation = parser.longValue();
|
||||
} else if ("pending_generation".equals(currentFieldName)) {
|
||||
if (parser.nextToken() != XContentParser.Token.VALUE_NUMBER) {
|
||||
throw new ElasticsearchParseException("failed to parse repository [{}], unknown type", name);
|
||||
}
|
||||
pendingGeneration = parser.longValue();
|
||||
} else {
|
||||
throw new ElasticsearchParseException("failed to parse repository [{}], unknown field [{}]",
|
||||
name, currentFieldName);
|
||||
|
@ -167,7 +234,7 @@ public class RepositoriesMetaData extends AbstractNamedDiffable<Custom> implemen
|
|||
if (type == null) {
|
||||
throw new ElasticsearchParseException("failed to parse repository [{}], missing repository type", name);
|
||||
}
|
||||
repository.add(new RepositoryMetaData(name, type, settings));
|
||||
repository.add(new RepositoryMetaData(name, type, settings, generation, pendingGeneration));
|
||||
} else {
|
||||
throw new ElasticsearchParseException("failed to parse repositories");
|
||||
}
|
||||
|
@ -205,6 +272,15 @@ public class RepositoriesMetaData extends AbstractNamedDiffable<Custom> implemen
|
|||
repository.settings().toXContent(builder, params);
|
||||
builder.endObject();
|
||||
|
||||
if (params.paramAsBoolean(HIDE_GENERATIONS_PARAM, false) == false) {
|
||||
builder.field("generation", repository.generation());
|
||||
builder.field("pending_generation", repository.pendingGeneration());
|
||||
}
|
||||
builder.endObject();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return Strings.toString(this);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -18,20 +18,36 @@
|
|||
*/
|
||||
package org.elasticsearch.cluster.metadata;
|
||||
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.repositories.RepositoryData;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Objects;
|
||||
|
||||
/**
|
||||
* Metadata about registered repository
|
||||
*/
|
||||
public class RepositoryMetaData {
|
||||
|
||||
public static final Version REPO_GEN_IN_CS_VERSION = Version.V_7_6_0;
|
||||
|
||||
private final String name;
|
||||
private final String type;
|
||||
private final Settings settings;
|
||||
|
||||
/**
|
||||
* Safe repository generation.
|
||||
*/
|
||||
private final long generation;
|
||||
|
||||
/**
|
||||
* Pending repository generation.
|
||||
*/
|
||||
private final long pendingGeneration;
|
||||
|
||||
/**
|
||||
* Constructs new repository metadata
|
||||
*
|
||||
|
@ -40,9 +56,21 @@ public class RepositoryMetaData {
|
|||
* @param settings repository settings
|
||||
*/
|
||||
public RepositoryMetaData(String name, String type, Settings settings) {
|
||||
this(name, type, settings, RepositoryData.UNKNOWN_REPO_GEN, RepositoryData.EMPTY_REPO_GEN);
|
||||
}
|
||||
|
||||
public RepositoryMetaData(RepositoryMetaData metaData, long generation, long pendingGeneration) {
|
||||
this(metaData.name, metaData.type, metaData.settings, generation, pendingGeneration);
|
||||
}
|
||||
|
||||
public RepositoryMetaData(String name, String type, Settings settings, long generation, long pendingGeneration) {
|
||||
this.name = name;
|
||||
this.type = type;
|
||||
this.settings = settings;
|
||||
this.generation = generation;
|
||||
this.pendingGeneration = pendingGeneration;
|
||||
assert generation <= pendingGeneration :
|
||||
"Pending generation [" + pendingGeneration + "] must be greater or equal to generation [" + generation + "]";
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -72,11 +100,41 @@ public class RepositoryMetaData {
|
|||
return this.settings;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the safe repository generation. {@link RepositoryData} for this generation is assumed to exist in the repository.
|
||||
* All operations on the repository must be based on the {@link RepositoryData} at this generation.
|
||||
* See package level documentation for the blob store based repositories {@link org.elasticsearch.repositories.blobstore} for details
|
||||
* on how this value is used during snapshots.
|
||||
* @return safe repository generation
|
||||
*/
|
||||
public long generation() {
|
||||
return generation;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the pending repository generation. {@link RepositoryData} for this generation and all generations down to the safe
|
||||
* generation {@link #generation} may exist in the repository and should not be reused for writing new {@link RepositoryData} to the
|
||||
* repository.
|
||||
* See package level documentation for the blob store based repositories {@link org.elasticsearch.repositories.blobstore} for details
|
||||
* on how this value is used during snapshots.
|
||||
*
|
||||
* @return highest pending repository generation
|
||||
*/
|
||||
public long pendingGeneration() {
|
||||
return pendingGeneration;
|
||||
}
|
||||
|
||||
public RepositoryMetaData(StreamInput in) throws IOException {
|
||||
name = in.readString();
|
||||
type = in.readString();
|
||||
settings = Settings.readSettingsFromStream(in);
|
||||
if (in.getVersion().onOrAfter(REPO_GEN_IN_CS_VERSION)) {
|
||||
generation = in.readLong();
|
||||
pendingGeneration = in.readLong();
|
||||
} else {
|
||||
generation = RepositoryData.UNKNOWN_REPO_GEN;
|
||||
pendingGeneration = RepositoryData.EMPTY_REPO_GEN;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -88,6 +146,20 @@ public class RepositoryMetaData {
|
|||
out.writeString(name);
|
||||
out.writeString(type);
|
||||
Settings.writeSettingsToStream(settings, out);
|
||||
if (out.getVersion().onOrAfter(REPO_GEN_IN_CS_VERSION)) {
|
||||
out.writeLong(generation);
|
||||
out.writeLong(pendingGeneration);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks if this instance is equal to the other instance in all fields other than {@link #generation} and {@link #pendingGeneration}.
|
||||
*
|
||||
* @param other other repository metadata
|
||||
* @return {@code true} if both instances equal in all fields but the generation fields
|
||||
*/
|
||||
public boolean equalsIgnoreGenerations(RepositoryMetaData other) {
|
||||
return name.equals(other.name) && type.equals(other.type()) && settings.equals(other.settings());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -99,15 +171,18 @@ public class RepositoryMetaData {
|
|||
|
||||
if (!name.equals(that.name)) return false;
|
||||
if (!type.equals(that.type)) return false;
|
||||
if (generation != that.generation) return false;
|
||||
if (pendingGeneration != that.pendingGeneration) return false;
|
||||
return settings.equals(that.settings);
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
int result = name.hashCode();
|
||||
result = 31 * result + type.hashCode();
|
||||
result = 31 * result + settings.hashCode();
|
||||
return result;
|
||||
return Objects.hash(name, type, settings, generation, pendingGeneration);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "RepositoryMetaData{" + name + "}{" + type + "}{" + settings + "}{" + generation + "}{" + pendingGeneration + "}";
|
||||
}
|
||||
}
|
||||
|
|
|
@ -150,7 +150,7 @@ public class RepositoriesService extends AbstractLifecycleComponent implements C
|
|||
|
||||
for (RepositoryMetaData repositoryMetaData : repositories.repositories()) {
|
||||
if (repositoryMetaData.name().equals(newRepositoryMetaData.name())) {
|
||||
if (newRepositoryMetaData.equals(repositoryMetaData)) {
|
||||
if (newRepositoryMetaData.equalsIgnoreGenerations(repositoryMetaData)) {
|
||||
// Previous version is the same as this one no update is needed.
|
||||
return currentState;
|
||||
}
|
||||
|
@ -292,7 +292,10 @@ public class RepositoriesService extends AbstractLifecycleComponent implements C
|
|||
RepositoriesMetaData newMetaData = state.getMetaData().custom(RepositoriesMetaData.TYPE);
|
||||
|
||||
// Check if repositories got changed
|
||||
if ((oldMetaData == null && newMetaData == null) || (oldMetaData != null && oldMetaData.equals(newMetaData))) {
|
||||
if ((oldMetaData == null && newMetaData == null) || (oldMetaData != null && oldMetaData.equalsIgnoreGenerations(newMetaData))) {
|
||||
for (Repository repo : repositories.values()) {
|
||||
repo.updateState(state);
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
|
|
|
@ -52,6 +52,12 @@ public final class RepositoryData {
|
|||
* The generation value indicating the repository has no index generational files.
|
||||
*/
|
||||
public static final long EMPTY_REPO_GEN = -1L;
|
||||
|
||||
/**
|
||||
* The generation value indicating that the repository generation is unknown.
|
||||
*/
|
||||
public static final long UNKNOWN_REPO_GEN = -2L;
|
||||
|
||||
/**
|
||||
* An instance initialized for an empty repository.
|
||||
*/
|
||||
|
|
|
@ -36,11 +36,13 @@ import org.elasticsearch.action.ActionRunnable;
|
|||
import org.elasticsearch.action.StepListener;
|
||||
import org.elasticsearch.action.support.GroupedActionListener;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.ClusterStateUpdateTask;
|
||||
import org.elasticsearch.cluster.RepositoryCleanupInProgress;
|
||||
import org.elasticsearch.cluster.SnapshotDeletionsInProgress;
|
||||
import org.elasticsearch.cluster.SnapshotsInProgress;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
import org.elasticsearch.cluster.metadata.RepositoriesMetaData;
|
||||
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
|
@ -124,6 +126,7 @@ import java.util.concurrent.TimeUnit;
|
|||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.function.Consumer;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.LongStream;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import static org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.FileInfo.canonicalName;
|
||||
|
@ -140,7 +143,7 @@ import static org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSna
|
|||
public abstract class BlobStoreRepository extends AbstractLifecycleComponent implements Repository {
|
||||
private static final Logger logger = LogManager.getLogger(BlobStoreRepository.class);
|
||||
|
||||
protected final RepositoryMetaData metadata;
|
||||
protected volatile RepositoryMetaData metadata;
|
||||
|
||||
protected final NamedXContentRegistry namedXContentRegistry;
|
||||
|
||||
|
@ -204,6 +207,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
|
|||
|
||||
private final SetOnce<BlobStore> blobStore = new SetOnce<>();
|
||||
|
||||
private final ClusterService clusterService;
|
||||
|
||||
/**
|
||||
* Constructs new BlobStoreRepository
|
||||
* @param metadata The metadata for this repository including name and settings
|
||||
|
@ -218,6 +223,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
|
|||
this.metadata = metadata;
|
||||
this.namedXContentRegistry = namedXContentRegistry;
|
||||
this.threadPool = clusterService.getClusterApplierService().threadPool();
|
||||
this.clusterService = clusterService;
|
||||
snapshotRateLimiter = getRateLimiter(metadata.settings(), "max_snapshot_bytes_per_sec", new ByteSizeValue(40, ByteSizeUnit.MB));
|
||||
restoreRateLimiter = getRateLimiter(metadata.settings(), "max_restore_bytes_per_sec", new ByteSizeValue(40, ByteSizeUnit.MB));
|
||||
readOnly = metadata.settings().getAsBoolean("readonly", false);
|
||||
|
@ -286,7 +292,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
|
|||
bestGenerationFromCS = bestGeneration(cleanupInProgress.entries());
|
||||
}
|
||||
|
||||
final long finalBestGen = bestGenerationFromCS;
|
||||
metadata = getRepoMetaData(state);
|
||||
final long finalBestGen = Math.max(bestGenerationFromCS, metadata.generation());
|
||||
latestKnownRepoGen.updateAndGet(known -> Math.max(known, finalBestGen));
|
||||
}
|
||||
|
||||
|
@ -980,8 +987,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
|
|||
|
||||
// Tracks the latest known repository generation in a best-effort way to detect inconsistent listing of root level index-N blobs
|
||||
// and concurrent modifications.
|
||||
// Protected for use in MockEventuallyConsistentRepository
|
||||
protected final AtomicLong latestKnownRepoGen = new AtomicLong(RepositoryData.EMPTY_REPO_GEN);
|
||||
private final AtomicLong latestKnownRepoGen = new AtomicLong(RepositoryData.EMPTY_REPO_GEN);
|
||||
|
||||
@Override
|
||||
public void getRepositoryData(ActionListener<RepositoryData> listener) {
|
||||
|
@ -1047,38 +1053,92 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
|
|||
}
|
||||
|
||||
/**
|
||||
* Writing a new index generation is a three step process.
|
||||
* First, the {@link RepositoryMetaData} entry for this repository is set into a pending state by incrementing its
|
||||
* pending generation {@code P} while its safe generation {@code N} remains unchanged.
|
||||
* Second, the updated {@link RepositoryData} is written to generation {@code P + 1}.
|
||||
* Lastly, the {@link RepositoryMetaData} entry for this repository is updated to the new generation {@code P + 1} and thus
|
||||
* pending and safe generation are set to the same value marking the end of the update of the repository data.
|
||||
*
|
||||
* @param repositoryData RepositoryData to write
|
||||
* @param expectedGen expected repository generation at the start of the operation
|
||||
* @param writeShardGens whether to write {@link ShardGenerations} to the new {@link RepositoryData} blob
|
||||
* @param listener completion listener
|
||||
*/
|
||||
protected void writeIndexGen(RepositoryData repositoryData, long expectedGen, boolean writeShardGens, ActionListener<Void> listener) {
|
||||
ActionListener.completeWith(listener, () -> {
|
||||
assert isReadOnly() == false; // can not write to a read only repository
|
||||
final long currentGen = repositoryData.getGenId();
|
||||
if (currentGen != expectedGen) {
|
||||
// the index file was updated by a concurrent operation, so we were operating on stale
|
||||
// repository data
|
||||
throw new RepositoryException(metadata.name(),
|
||||
"concurrent modification of the index-N file, expected current generation [" + expectedGen +
|
||||
"], actual current generation [" + currentGen + "] - possibly due to simultaneous snapshot deletion requests");
|
||||
}
|
||||
final long newGen = currentGen + 1;
|
||||
assert isReadOnly() == false; // can not write to a read only repository
|
||||
final long currentGen = repositoryData.getGenId();
|
||||
if (currentGen != expectedGen) {
|
||||
// the index file was updated by a concurrent operation, so we were operating on stale
|
||||
// repository data
|
||||
listener.onFailure(new RepositoryException(metadata.name(),
|
||||
"concurrent modification of the index-N file, expected current generation [" + expectedGen +
|
||||
"], actual current generation [" + currentGen + "]"));
|
||||
return;
|
||||
}
|
||||
|
||||
// Step 1: Set repository generation state to the next possible pending generation
|
||||
final StepListener<Long> setPendingStep = new StepListener<>();
|
||||
clusterService.submitStateUpdateTask("set pending repository generation [" + metadata.name() + "][" + expectedGen + "]",
|
||||
new ClusterStateUpdateTask() {
|
||||
|
||||
private long newGen;
|
||||
|
||||
@Override
|
||||
public ClusterState execute(ClusterState currentState) {
|
||||
final RepositoryMetaData meta = getRepoMetaData(currentState);
|
||||
final String repoName = metadata.name();
|
||||
final long genInState = meta.generation();
|
||||
// TODO: Remove all usages of this variable, instead initialize the generation when loading RepositoryData
|
||||
final boolean uninitializedMeta = meta.generation() == RepositoryData.UNKNOWN_REPO_GEN;
|
||||
if (uninitializedMeta == false && meta.pendingGeneration() != genInState) {
|
||||
logger.info("Trying to write new repository data over unfinished write, repo [{}] is at " +
|
||||
"safe generation [{}] and pending generation [{}]", meta.name(), genInState, meta.pendingGeneration());
|
||||
}
|
||||
assert expectedGen == RepositoryData.EMPTY_REPO_GEN || RepositoryData.UNKNOWN_REPO_GEN == meta.generation()
|
||||
|| expectedGen == meta.generation() :
|
||||
"Expected non-empty generation [" + expectedGen + "] does not match generation tracked in [" + meta + "]";
|
||||
// If we run into the empty repo generation for the expected gen, the repo is assumed to have been cleared of
|
||||
// all contents by an external process so we reset the safe generation to the empty generation.
|
||||
final long safeGeneration = expectedGen == RepositoryData.EMPTY_REPO_GEN ? RepositoryData.EMPTY_REPO_GEN
|
||||
: (uninitializedMeta ? expectedGen : genInState);
|
||||
// Regardless of whether or not the safe generation has been reset, the pending generation always increments so that
|
||||
// even if a repository has been manually cleared of all contents we will never reuse the same repository generation.
|
||||
// This is motivated by the consistency behavior the S3 based blob repository implementation has to support which does
|
||||
// not offer any consistency guarantees when it comes to overwriting the same blob name with different content.
|
||||
newGen = uninitializedMeta ? expectedGen + 1: metadata.pendingGeneration() + 1;
|
||||
assert newGen > latestKnownRepoGen.get() : "Attempted new generation [" + newGen +
|
||||
"] must be larger than latest known generation [" + latestKnownRepoGen.get() + "]";
|
||||
return ClusterState.builder(currentState).metaData(MetaData.builder(currentState.getMetaData())
|
||||
.putCustom(RepositoriesMetaData.TYPE,
|
||||
currentState.metaData().<RepositoriesMetaData>custom(RepositoriesMetaData.TYPE).withUpdatedGeneration(
|
||||
repoName, safeGeneration, newGen)).build()).build();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(String source, Exception e) {
|
||||
listener.onFailure(
|
||||
new RepositoryException(metadata.name(), "Failed to execute cluster state update [" + source + "]", e));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
|
||||
setPendingStep.onResponse(newGen);
|
||||
}
|
||||
});
|
||||
|
||||
// Step 2: Write new index-N blob to repository and update index.latest
|
||||
setPendingStep.whenComplete(newGen -> threadPool().executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.wrap(listener, l -> {
|
||||
if (latestKnownRepoGen.get() >= newGen) {
|
||||
throw new IllegalArgumentException(
|
||||
"Tried writing generation [" + newGen + "] but repository is at least at generation [" + newGen + "] already");
|
||||
"Tried writing generation [" + newGen + "] but repository is at least at generation [" + latestKnownRepoGen.get()
|
||||
+ "] already");
|
||||
}
|
||||
// write the index file
|
||||
final String indexBlob = INDEX_FILE_PREFIX + Long.toString(newGen);
|
||||
logger.debug("Repository [{}] writing new index generational blob [{}]", metadata.name(), indexBlob);
|
||||
writeAtomic(indexBlob,
|
||||
BytesReference.bytes(repositoryData.snapshotsToXContent(XContentFactory.jsonBuilder(), writeShardGens)), true);
|
||||
final long latestKnownGen = latestKnownRepoGen.updateAndGet(known -> Math.max(known, newGen));
|
||||
if (newGen < latestKnownGen) {
|
||||
// Don't mess up the index.latest blob
|
||||
throw new IllegalStateException(
|
||||
"Wrote generation [" + newGen + "] but latest known repo gen concurrently changed to [" + latestKnownGen + "]");
|
||||
}
|
||||
// write the current generation to the index-latest file
|
||||
final BytesReference genBytes;
|
||||
try (BytesStreamOutput bStream = new BytesStreamOutput()) {
|
||||
|
@ -1086,18 +1146,63 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
|
|||
genBytes = bStream.bytes();
|
||||
}
|
||||
logger.debug("Repository [{}] updating index.latest with generation [{}]", metadata.name(), newGen);
|
||||
|
||||
writeAtomic(INDEX_LATEST_BLOB, genBytes, false);
|
||||
// delete the N-2 index file if it exists, keep the previous one around as a backup
|
||||
if (newGen - 2 >= 0) {
|
||||
final String oldSnapshotIndexFile = INDEX_FILE_PREFIX + Long.toString(newGen - 2);
|
||||
try {
|
||||
blobContainer().deleteBlobIgnoringIfNotExists(oldSnapshotIndexFile);
|
||||
} catch (IOException e) {
|
||||
logger.warn("Failed to clean up old index blob [{}]", oldSnapshotIndexFile);
|
||||
}
|
||||
}
|
||||
return null;
|
||||
});
|
||||
|
||||
// Step 3: Update CS to reflect new repository generation.
|
||||
clusterService.submitStateUpdateTask("set safe repository generation [" + metadata.name() + "][" + newGen + "]",
|
||||
new ClusterStateUpdateTask() {
|
||||
@Override
|
||||
public ClusterState execute(ClusterState currentState) {
|
||||
final RepositoryMetaData meta = getRepoMetaData(currentState);
|
||||
if (meta.generation() != expectedGen) {
|
||||
throw new IllegalStateException("Tried to update repo generation to [" + newGen
|
||||
+ "] but saw unexpected generation in state [" + meta + "]");
|
||||
}
|
||||
if (meta.pendingGeneration() != newGen) {
|
||||
throw new IllegalStateException(
|
||||
"Tried to update from unexpected pending repo generation [" + meta.pendingGeneration() +
|
||||
"] after write to generation [" + newGen + "]");
|
||||
}
|
||||
return ClusterState.builder(currentState).metaData(MetaData.builder(currentState.getMetaData())
|
||||
.putCustom(RepositoriesMetaData.TYPE,
|
||||
currentState.metaData().<RepositoriesMetaData>custom(RepositoriesMetaData.TYPE).withUpdatedGeneration(
|
||||
metadata.name(), newGen, newGen)).build()).build();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(String source, Exception e) {
|
||||
l.onFailure(
|
||||
new RepositoryException(metadata.name(), "Failed to execute cluster state update [" + source + "]", e));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
|
||||
threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.run(l, () -> {
|
||||
// Delete all now outdated index files up to 1000 blobs back from the new generation.
|
||||
// If there are more than 1000 dangling index-N cleanup functionality on repo delete will take care of them.
|
||||
// Deleting one older than the current expectedGen is done for BwC reasons as older versions used to keep
|
||||
// two index-N blobs around.
|
||||
final List<String> oldIndexN = LongStream.range(
|
||||
Math.max(Math.max(expectedGen - 1, 0), newGen - 1000), newGen)
|
||||
.mapToObj(gen -> INDEX_FILE_PREFIX + gen)
|
||||
.collect(Collectors.toList());
|
||||
try {
|
||||
blobContainer().deleteBlobsIgnoringIfNotExists(oldIndexN);
|
||||
} catch (IOException e) {
|
||||
logger.warn("Failed to clean up old index blobs {}", oldIndexN);
|
||||
}
|
||||
}));
|
||||
}
|
||||
});
|
||||
})), listener::onFailure);
|
||||
}
|
||||
|
||||
private RepositoryMetaData getRepoMetaData(ClusterState state) {
|
||||
final RepositoryMetaData metaData =
|
||||
state.getMetaData().<RepositoriesMetaData>custom(RepositoriesMetaData.TYPE).repository(metadata.name());
|
||||
assert metaData != null;
|
||||
return metaData;
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -96,6 +96,9 @@
|
|||
* <ol>
|
||||
* <li>The blobstore repository stores the {@code RepositoryData} in blobs named with incrementing suffix {@code N} at {@code /index-N}
|
||||
* directly under the repository's root.</li>
|
||||
* <li>For each {@link org.elasticsearch.repositories.blobstore.BlobStoreRepository} an entry of type
|
||||
* {@link org.elasticsearch.cluster.metadata.RepositoryMetaData} exists in the cluster state. It tracks the current valid
|
||||
* generation {@code N} as well as the latest generation that a write was attempted for.</li>
|
||||
* <li>The blobstore also stores the most recent {@code N} as a 64bit long in the blob {@code /index.latest} directly under the
|
||||
* repository's root.</li>
|
||||
* </ol>
|
||||
|
@ -116,6 +119,38 @@
|
|||
* </ol>
|
||||
* </li>
|
||||
* </ol>
|
||||
*
|
||||
* <h2>Writing Updated RepositoryData to the Repository</h2>
|
||||
*
|
||||
* <p>Writing an updated {@link org.elasticsearch.repositories.RepositoryData} to a blob store repository is an operation that uses
|
||||
* the cluster state to ensure that a specific {@code index-N} blob is never accidentally overwritten in a master failover scenario.
|
||||
* The specific steps to writing a new {@code index-N} blob and thus making changes from a snapshot-create or delete operation visible
|
||||
* to read operations on the repository are as follows and all run on the master node:</p>
|
||||
*
|
||||
* <ol>
|
||||
* <li>Write an updated value of {@link org.elasticsearch.cluster.metadata.RepositoryMetaData} for the repository that has the same
|
||||
* {@link org.elasticsearch.cluster.metadata.RepositoryMetaData#generation()} as the existing entry and has a value of
|
||||
* {@link org.elasticsearch.cluster.metadata.RepositoryMetaData#pendingGeneration()} one greater than the {@code pendingGeneration} of the
|
||||
* existing entry.</li>
|
||||
* <li>On the same master node, after the cluster state has been updated in the first step, write the new {@code index-N} blob and
|
||||
* also update the contents of the {@code index.latest} blob. Note that updating the index.latest blob is done on a best effort
|
||||
* basis and that there is a chance for a stuck master-node to overwrite the contents of the {@code index.latest} blob after a newer
|
||||
* {@code index-N} has been written by another master node. This is acceptable since the contents of {@code index.latest} are not used
|
||||
* during normal operation of the repository and must only be correct for purposes of mounting the contents of a
|
||||
* {@link org.elasticsearch.repositories.blobstore.BlobStoreRepository} as a read-only url repository.</li>
|
||||
* <li>After the write has finished, set the value of {@code RepositoriesState.State#generation} to the value used for
|
||||
* {@code RepositoriesState.State#pendingGeneration} so that the new entry for the state of the repository has {@code generation} and
|
||||
* {@code pendingGeneration} set to the same value to signalize a clean repository state with no potentially failed writes newer than the
|
||||
* last valid {@code index-N} blob in the repository.</li>
|
||||
* </ol>
|
||||
*
|
||||
* <p>If either of the last two steps in the above fails or master fails over to a new node at any point, then a subsequent operation
|
||||
* trying to write a new {@code index-N} blob will never use the same value of {@code N} used by a previous attempt. It will always start
|
||||
* over at the first of the above three steps, incrementing the {@code pendingGeneration} generation before attempting a write, thus
|
||||
* ensuring no overwriting of a {@code index-N} blob ever to occur. The use of the cluster state to track the latest repository generation
|
||||
* {@code N} and ensuring no overwriting of {@code index-N} blobs to ever occur allows the blob store repository to properly function even
|
||||
* on blob stores with neither a consistent list operation nor an atomic "write but not overwrite" operation.</p>
|
||||
*
|
||||
* <h2>Creating a Snapshot</h2>
|
||||
*
|
||||
* <p>Creating a snapshot in the repository happens in the three steps described in detail below.</p>
|
||||
|
@ -174,11 +209,7 @@
|
|||
* {@code /indices/${index-snapshot-uuid}/meta-${snapshot-uuid}.dat}</li>
|
||||
* <li>Write the {@link org.elasticsearch.snapshots.SnapshotInfo} blob for the given snapshot to the key {@code /snap-${snapshot-uuid}.dat}
|
||||
* directly under the repository root.</li>
|
||||
* <li>Write an updated {@code RepositoryData} blob to the key {@code /index-${N+1}} using the {@code N} determined when initializing the
|
||||
* snapshot in the first step. When doing this, the implementation checks that the blob for generation {@code N + 1} has not yet been
|
||||
* written to prevent concurrent updates to the repository. If the blob for {@code N + 1} already exists the execution of finalization
|
||||
* stops under the assumption that a master failover occurred and the snapshot has already been finalized by the new master.</li>
|
||||
* <li>Write the updated {@code /index.latest} blob containing the new repository generation {@code N + 1}.</li>
|
||||
* <li>Write an updated {@code RepositoryData} blob containing the new snapshot.</li>
|
||||
* </ol>
|
||||
*
|
||||
* <h2>Deleting a Snapshot</h2>
|
||||
|
@ -203,9 +234,8 @@
|
|||
* blob so that it can be deleted at the end of the snapshot delete process.</li>
|
||||
* </ol>
|
||||
* </li>
|
||||
* <li>Write an updated {@code RepositoryData} blob with the deleted snapshot removed to key {@code /index-${N+1}} directly under the
|
||||
* repository root and the repository generations that were changed in the affected shards adjusted.</li>
|
||||
* <li>Write an updated {@code index.latest} blob containing {@code N + 1}.</li>
|
||||
* <li>Write an updated {@code RepositoryData} blob with the deleted snapshot removed and containing the updated repository generations
|
||||
* that changed for the shards affected by the delete.</li>
|
||||
* <li>Delete the global {@code MetaData} blob {@code meta-${snapshot-uuid}.dat} stored directly under the repository root for the snapshot
|
||||
* as well as the {@code SnapshotInfo} blob at {@code /snap-${snapshot-uuid}.dat}.</li>
|
||||
* <li>Delete all unreferenced blobs previously collected when updating the shard directories. Also, remove any index folders or blobs
|
||||
|
|
|
@ -114,7 +114,7 @@ public class FsRepository extends BlobStoreRepository {
|
|||
|
||||
@Override
|
||||
protected BlobStore createBlobStore() throws Exception {
|
||||
final String location = REPOSITORIES_LOCATION_SETTING.get(metadata.settings());
|
||||
final String location = REPOSITORIES_LOCATION_SETTING.get(getMetadata().settings());
|
||||
final Path locationFile = environment.resolveRepoFile(location);
|
||||
return new FsBlobStore(environment.settings(), locationFile, isReadOnly());
|
||||
}
|
||||
|
|
|
@ -194,7 +194,7 @@ public class BlobStoreRepositoryRestoreTests extends IndexShardTestCase {
|
|||
Settings settings = Settings.builder().put("location", randomAlphaOfLength(10)).build();
|
||||
RepositoryMetaData repositoryMetaData = new RepositoryMetaData(randomAlphaOfLength(10), FsRepository.TYPE, settings);
|
||||
final FsRepository repository = new FsRepository(repositoryMetaData, createEnvironment(), xContentRegistry(),
|
||||
BlobStoreTestUtil.mockClusterService()) {
|
||||
BlobStoreTestUtil.mockClusterService(repositoryMetaData)) {
|
||||
@Override
|
||||
protected void assertSnapshotOrGenericThread() {
|
||||
// eliminate thread name check as we create repo manually
|
||||
|
|
|
@ -138,7 +138,7 @@ public class BlobStoreRepositoryTests extends ESSingleNodeTestCase {
|
|||
|
||||
public void testReadAndWriteSnapshotsThroughIndexFile() throws Exception {
|
||||
final BlobStoreRepository repository = setupRepo();
|
||||
|
||||
final long pendingGeneration = repository.metadata.pendingGeneration();
|
||||
// write to and read from a index file with no entries
|
||||
assertThat(ESBlobStoreRepositoryIntegTestCase.getRepositoryData(repository).getSnapshotIds().size(), equalTo(0));
|
||||
final RepositoryData emptyData = RepositoryData.EMPTY;
|
||||
|
@ -147,7 +147,7 @@ public class BlobStoreRepositoryTests extends ESSingleNodeTestCase {
|
|||
assertEquals(repoData, emptyData);
|
||||
assertEquals(repoData.getIndices().size(), 0);
|
||||
assertEquals(repoData.getSnapshotIds().size(), 0);
|
||||
assertEquals(0L, repoData.getGenId());
|
||||
assertEquals(pendingGeneration + 1L, repoData.getGenId());
|
||||
|
||||
// write to and read from an index file with snapshots but no indices
|
||||
repoData = addRandomSnapshotsToRepoData(repoData, false);
|
||||
|
@ -164,27 +164,30 @@ public class BlobStoreRepositoryTests extends ESSingleNodeTestCase {
|
|||
final BlobStoreRepository repository = setupRepo();
|
||||
assertEquals(ESBlobStoreRepositoryIntegTestCase.getRepositoryData(repository), RepositoryData.EMPTY);
|
||||
|
||||
final long pendingGeneration = repository.metadata.pendingGeneration();
|
||||
|
||||
// write to index generational file
|
||||
RepositoryData repositoryData = generateRandomRepoData();
|
||||
writeIndexGen(repository, repositoryData, RepositoryData.EMPTY_REPO_GEN);
|
||||
assertThat(ESBlobStoreRepositoryIntegTestCase.getRepositoryData(repository), equalTo(repositoryData));
|
||||
assertThat(repository.latestIndexBlobId(), equalTo(0L));
|
||||
assertThat(repository.readSnapshotIndexLatestBlob(), equalTo(0L));
|
||||
final long expectedGeneration = pendingGeneration + 1L;
|
||||
assertThat(repository.latestIndexBlobId(), equalTo(expectedGeneration));
|
||||
assertThat(repository.readSnapshotIndexLatestBlob(), equalTo(expectedGeneration));
|
||||
|
||||
// adding more and writing to a new index generational file
|
||||
repositoryData = addRandomSnapshotsToRepoData(ESBlobStoreRepositoryIntegTestCase.getRepositoryData(repository), true);
|
||||
writeIndexGen(repository, repositoryData, repositoryData.getGenId());
|
||||
assertEquals(ESBlobStoreRepositoryIntegTestCase.getRepositoryData(repository), repositoryData);
|
||||
assertThat(repository.latestIndexBlobId(), equalTo(1L));
|
||||
assertThat(repository.readSnapshotIndexLatestBlob(), equalTo(1L));
|
||||
assertThat(repository.latestIndexBlobId(), equalTo(expectedGeneration + 1L));
|
||||
assertThat(repository.readSnapshotIndexLatestBlob(), equalTo(expectedGeneration + 1L));
|
||||
|
||||
// removing a snapshot and writing to a new index generational file
|
||||
repositoryData = ESBlobStoreRepositoryIntegTestCase.getRepositoryData(repository).removeSnapshot(
|
||||
repositoryData.getSnapshotIds().iterator().next(), ShardGenerations.EMPTY);
|
||||
writeIndexGen(repository, repositoryData, repositoryData.getGenId());
|
||||
assertEquals(ESBlobStoreRepositoryIntegTestCase.getRepositoryData(repository), repositoryData);
|
||||
assertThat(repository.latestIndexBlobId(), equalTo(2L));
|
||||
assertThat(repository.readSnapshotIndexLatestBlob(), equalTo(2L));
|
||||
assertThat(repository.latestIndexBlobId(), equalTo(expectedGeneration + 2L));
|
||||
assertThat(repository.readSnapshotIndexLatestBlob(), equalTo(expectedGeneration + 2L));
|
||||
}
|
||||
|
||||
public void testRepositoryDataConcurrentModificationNotAllowed() {
|
||||
|
|
|
@ -499,11 +499,10 @@ public class DedicatedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTest
|
|||
logger.info("--> Go through a loop of creating and deleting a snapshot to trigger repository cleanup");
|
||||
client().admin().cluster().prepareCleanupRepository("test-repo").get();
|
||||
|
||||
// Subtract four files that will remain in the repository:
|
||||
// Expect two files to remain in the repository:
|
||||
// (1) index-(N+1)
|
||||
// (2) index-N (because we keep the previous version) and
|
||||
// (3) index-latest
|
||||
assertFileCount(repo, 3);
|
||||
// (2) index-latest
|
||||
assertFileCount(repo, 2);
|
||||
logger.info("--> done");
|
||||
}
|
||||
|
||||
|
|
|
@ -42,7 +42,10 @@ public class RepositoriesMetaDataSerializationTests extends AbstractDiffableSeri
|
|||
int numberOfRepositories = randomInt(10);
|
||||
List<RepositoryMetaData> entries = new ArrayList<>();
|
||||
for (int i = 0; i < numberOfRepositories; i++) {
|
||||
entries.add(new RepositoryMetaData(randomAlphaOfLength(10), randomAlphaOfLength(10), randomSettings()));
|
||||
// divide by 2 to not overflow when adding to this number for the pending generation below
|
||||
final long generation = randomNonNegativeLong() / 2L;
|
||||
entries.add(new RepositoryMetaData(randomAlphaOfLength(10), randomAlphaOfLength(10), randomSettings(), generation,
|
||||
generation + randomLongBetween(0, generation)));
|
||||
}
|
||||
entries.sort(Comparator.comparing(RepositoryMetaData::name));
|
||||
return new RepositoriesMetaData(entries);
|
||||
|
|
|
@ -1321,9 +1321,8 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
|
|||
|
||||
logger.info("--> delete the last snapshot");
|
||||
client.admin().cluster().prepareDeleteSnapshot("test-repo", lastSnapshot).get();
|
||||
logger.info("--> make sure that number of files is back to what it was when the first snapshot was made, " +
|
||||
"plus one because one backup index-N file should remain");
|
||||
assertFileCount(repo, numberOfFiles[0] + 1);
|
||||
logger.info("--> make sure that number of files is back to what it was when the first snapshot was made");
|
||||
assertFileCount(repo, numberOfFiles[0]);
|
||||
}
|
||||
|
||||
public void testDeleteSnapshotWithMissingIndexAndShardMetadata() throws Exception {
|
||||
|
|
|
@ -287,13 +287,13 @@ public class MockEventuallyConsistentRepository extends BlobStoreRepository {
|
|||
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));
|
||||
}
|
||||
|
||||
// Randomly filter out the latest /index-N blob from a listing to test that tracking of it in latestKnownRepoGen
|
||||
// overrides an inconsistent listing
|
||||
// Randomly filter out the index-N blobs from a listing to test that tracking of it in latestKnownRepoGen and the cluster state
|
||||
// ensures consistent repository operations
|
||||
private Map<String, BlobMetaData> maybeMissLatestIndexN(Map<String, BlobMetaData> listing) {
|
||||
// Only filter out latest index-N at the repo root and only as long as we're not in a forced consistent state
|
||||
if (path.parent() == null && context.consistent == false && random.nextBoolean()) {
|
||||
// Randomly filter out index-N blobs at the repo root to proof that we don't need them to be consistently listed
|
||||
if (path.parent() == null && context.consistent == false) {
|
||||
final Map<String, BlobMetaData> filtered = new HashMap<>(listing);
|
||||
filtered.remove(BlobStoreRepository.INDEX_FILE_PREFIX + latestKnownRepoGen.get());
|
||||
filtered.keySet().removeIf(b -> b.startsWith(BlobStoreRepository.INDEX_FILE_PREFIX) && random.nextBoolean());
|
||||
return Collections.unmodifiableMap(filtered);
|
||||
}
|
||||
return listing;
|
||||
|
|
|
@ -21,6 +21,7 @@ package org.elasticsearch.snapshots.mockstore;
|
|||
import org.elasticsearch.action.support.PlainActionFuture;
|
||||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.UUIDs;
|
||||
import org.elasticsearch.common.blobstore.BlobContainer;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
|
@ -134,9 +135,11 @@ public class MockEventuallyConsistentRepositoryTests extends ESTestCase {
|
|||
|
||||
public void testOverwriteSnapshotInfoBlob() {
|
||||
MockEventuallyConsistentRepository.Context blobStoreContext = new MockEventuallyConsistentRepository.Context();
|
||||
try (BlobStoreRepository repository = new MockEventuallyConsistentRepository(
|
||||
new RepositoryMetaData("testRepo", "mockEventuallyConsistent", Settings.EMPTY),
|
||||
xContentRegistry(), BlobStoreTestUtil.mockClusterService(), blobStoreContext, random())) {
|
||||
final RepositoryMetaData metaData = new RepositoryMetaData("testRepo", "mockEventuallyConsistent", Settings.EMPTY);
|
||||
final ClusterService clusterService = BlobStoreTestUtil.mockClusterService(metaData);
|
||||
try (BlobStoreRepository repository =
|
||||
new MockEventuallyConsistentRepository(metaData, xContentRegistry(), clusterService, blobStoreContext, random())) {
|
||||
clusterService.addStateApplier(event -> repository.updateState(event.state()));
|
||||
repository.start();
|
||||
|
||||
// We create a snap- blob for snapshot "foo" in the first generation
|
||||
|
|
|
@ -26,6 +26,9 @@ import org.elasticsearch.cluster.ClusterState;
|
|||
import org.elasticsearch.cluster.ClusterStateApplier;
|
||||
import org.elasticsearch.cluster.ClusterStateUpdateTask;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
import org.elasticsearch.cluster.metadata.RepositoriesMetaData;
|
||||
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
|
||||
import org.elasticsearch.cluster.service.ClusterApplierService;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.Strings;
|
||||
|
@ -292,11 +295,29 @@ public final class BlobStoreTestUtil {
|
|||
|
||||
/**
|
||||
* Creates a mocked {@link ClusterService} for use in {@link BlobStoreRepository} related tests that mocks out all the necessary
|
||||
* functionality to make {@link BlobStoreRepository} work.
|
||||
* functionality to make {@link BlobStoreRepository} work. Initializes the cluster state as {@link ClusterState#EMPTY_STATE}.
|
||||
*
|
||||
* @return Mock ClusterService
|
||||
*/
|
||||
public static ClusterService mockClusterService() {
|
||||
return mockClusterService(ClusterState.EMPTY_STATE);
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a mocked {@link ClusterService} for use in {@link BlobStoreRepository} related tests that mocks out all the necessary
|
||||
* functionality to make {@link BlobStoreRepository} work. Initializes the cluster state with a {@link RepositoriesMetaData} instance
|
||||
* that contains the given {@code metadata}.
|
||||
*
|
||||
* @param metaData RepositoryMetaData to initialize the cluster state with
|
||||
* @return Mock ClusterService
|
||||
*/
|
||||
public static ClusterService mockClusterService(RepositoryMetaData metaData) {
|
||||
return mockClusterService(ClusterState.builder(ClusterState.EMPTY_STATE).metaData(
|
||||
MetaData.builder().putCustom(RepositoriesMetaData.TYPE,
|
||||
new RepositoriesMetaData(Collections.singletonList(metaData))).build()).build());
|
||||
}
|
||||
|
||||
private static ClusterService mockClusterService(ClusterState initialState) {
|
||||
final ThreadPool threadPool = mock(ThreadPool.class);
|
||||
when(threadPool.executor(ThreadPool.Names.SNAPSHOT)).thenReturn(new SameThreadExecutorService());
|
||||
when(threadPool.generic()).thenReturn(new SameThreadExecutorService());
|
||||
|
@ -305,7 +326,7 @@ public final class BlobStoreTestUtil {
|
|||
final ClusterService clusterService = mock(ClusterService.class);
|
||||
final ClusterApplierService clusterApplierService = mock(ClusterApplierService.class);
|
||||
when(clusterService.getClusterApplierService()).thenReturn(clusterApplierService);
|
||||
final AtomicReference<ClusterState> currentState = new AtomicReference<>(ClusterState.EMPTY_STATE);
|
||||
final AtomicReference<ClusterState> currentState = new AtomicReference<>(initialState);
|
||||
when(clusterService.state()).then(invocationOnMock -> currentState.get());
|
||||
final List<ClusterStateApplier> appliers = new CopyOnWriteArrayList<>();
|
||||
doAnswer(invocation -> {
|
||||
|
|
|
@ -100,6 +100,8 @@ public class MockRepository extends FsRepository {
|
|||
|
||||
private final String randomPrefix;
|
||||
|
||||
private final Environment env;
|
||||
|
||||
private volatile boolean blockOnControlFiles;
|
||||
|
||||
private volatile boolean blockOnDataFiles;
|
||||
|
@ -125,9 +127,15 @@ public class MockRepository extends FsRepository {
|
|||
blockAndFailOnWriteSnapFile = metadata.settings().getAsBoolean("block_on_snap", false);
|
||||
randomPrefix = metadata.settings().get("random", "default");
|
||||
waitAfterUnblock = metadata.settings().getAsLong("wait_after_unblock", 0L);
|
||||
env = environment;
|
||||
logger.info("starting mock repository with random prefix {}", randomPrefix);
|
||||
}
|
||||
|
||||
@Override
|
||||
public RepositoryMetaData getMetadata() {
|
||||
return overrideSettings(super.getMetadata(), env);
|
||||
}
|
||||
|
||||
private static RepositoryMetaData overrideSettings(RepositoryMetaData metadata, Environment environment) {
|
||||
// TODO: use another method of testing not being able to read the test file written by the master...
|
||||
// this is super duper hacky
|
||||
|
|
|
@ -353,7 +353,8 @@ public class SourceOnlySnapshotShardTests extends IndexShardTestCase {
|
|||
private Repository createRepository() {
|
||||
Settings settings = Settings.builder().put("location", randomAlphaOfLength(10)).build();
|
||||
RepositoryMetaData repositoryMetaData = new RepositoryMetaData(randomAlphaOfLength(10), FsRepository.TYPE, settings);
|
||||
return new FsRepository(repositoryMetaData, createEnvironment(), xContentRegistry(), BlobStoreTestUtil.mockClusterService());
|
||||
return new FsRepository(repositoryMetaData, createEnvironment(), xContentRegistry(),
|
||||
BlobStoreTestUtil.mockClusterService(repositoryMetaData));
|
||||
}
|
||||
|
||||
private static void runAsSnapshot(ThreadPool pool, Runnable runnable) {
|
||||
|
|
Loading…
Reference in New Issue