Refactor Inflexible Snapshot Repository BwC (#52365) (#52557)

* Refactor Inflexible Snapshot Repository BwC (#52365)

Transport the version to use for a snapshot instead of a flag for whether to use shard generations in the snapshots-in-progress entry. This allows making upcoming repository metadata changes in a flexible manner, analogous to how we handle serialization BwC elsewhere.
Also, exposing the version at the repository API level will make it easier to make BwC-relevant changes in derived repositories such as source-only or encrypted repositories.
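
The shape of the refactor: every repository API that previously took a boolean writeShardGens flag now takes the minimum compatible Version, and the flag is derived from it via SnapshotsService.useShardGenerations. A minimal, self-contained sketch of that pattern follows; MetaVersion, VersionGatedRepoSketch and the simplified deleteSnapshot signature are illustrative stand-ins, not the actual Elasticsearch classes.

// Illustrative sketch only (not the actual Elasticsearch classes): the boolean
// "writeShardGens" parameter is replaced by a repository metadata Version, and the
// boolean is derived from that version wherever it is still needed.
import java.util.Objects;

public class VersionGatedRepoSketch {

    /** Simplified stand-in for org.elasticsearch.Version. */
    record MetaVersion(int major, int minor) implements Comparable<MetaVersion> {
        @Override
        public int compareTo(MetaVersion other) {
            int byMajor = Integer.compare(major, other.major);
            return byMajor != 0 ? byMajor : Integer.compare(minor, other.minor);
        }

        boolean onOrAfter(MetaVersion other) {
            return compareTo(other) >= 0;
        }
    }

    /** First version that writes shard generations (7.6 in the real code). */
    static final MetaVersion SHARD_GEN_IN_REPO_DATA_VERSION = new MetaVersion(7, 6);

    /** Feature flags become pure functions of the repository metadata version. */
    static boolean useShardGenerations(MetaVersion repositoryMetaVersion) {
        return repositoryMetaVersion.onOrAfter(SHARD_GEN_IN_REPO_DATA_VERSION);
    }

    /**
     * Before the refactor this method took "boolean writeShardGens"; now it takes the
     * version once, so future metadata format changes only add new checks against it.
     */
    static void deleteSnapshot(String snapshotId, long repositoryStateId, MetaVersion repositoryMetaVersion) {
        Objects.requireNonNull(repositoryMetaVersion);
        boolean writeShardGens = useShardGenerations(repositoryMetaVersion);
        System.out.printf("delete %s at state id %d, shard generations: %b%n",
            snapshotId, repositoryStateId, writeShardGens);
    }

    public static void main(String[] args) {
        deleteSnapshot("snap-1", 42L, new MetaVersion(7, 5)); // old-format repository
        deleteSnapshot("snap-2", 43L, new MetaVersion(7, 7)); // new-format repository
    }
}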
Armin Braun 2020-02-21 09:14:34 +01:00 committed by GitHub
parent b84e8db7b5
commit 4bb780bc37
27 changed files with 229 additions and 170 deletions


@ -21,6 +21,7 @@ package org.elasticsearch.repositories.s3;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.cluster.metadata.MetaData;
@ -270,21 +271,22 @@ class S3Repository extends BlobStoreRepository {
@Override
public void finalizeSnapshot(SnapshotId snapshotId, ShardGenerations shardGenerations, long startTime, String failure, int totalShards,
List<SnapshotShardFailure> shardFailures, long repositoryStateId, boolean includeGlobalState,
MetaData clusterMetaData, Map<String, Object> userMetadata, boolean writeShardGens,
MetaData clusterMetaData, Map<String, Object> userMetadata, Version repositoryMetaVersion,
ActionListener<SnapshotInfo> listener) {
if (writeShardGens == false) {
if (SnapshotsService.useShardGenerations(repositoryMetaVersion) == false) {
listener = delayedListener(listener);
}
super.finalizeSnapshot(snapshotId, shardGenerations, startTime, failure, totalShards, shardFailures, repositoryStateId,
includeGlobalState, clusterMetaData, userMetadata, writeShardGens, listener);
includeGlobalState, clusterMetaData, userMetadata, repositoryMetaVersion, listener);
}
@Override
public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, boolean writeShardGens, ActionListener<Void> listener) {
if (writeShardGens == false) {
public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, Version repositoryMetaVersion,
ActionListener<Void> listener) {
if (SnapshotsService.useShardGenerations(repositoryMetaVersion) == false) {
listener = delayedListener(listener);
}
super.deleteSnapshot(snapshotId, repositoryStateId, writeShardGens, listener);
super.deleteSnapshot(snapshotId, repositoryStateId, repositoryMetaVersion, listener);
}
/**


@ -219,7 +219,7 @@ public class MultiVersionRepositoryAccessIT extends ESRestTestCase {
ensureSnapshotRestoreWorks(repoName, "snapshot-2", shards);
}
} else {
if (minimumNodeVersion().before(SnapshotsService.SHARD_GEN_IN_REPO_DATA_VERSION)) {
if (SnapshotsService.useShardGenerations(minimumNodeVersion()) == false) {
assertThat(TEST_STEP, is(TestStep.STEP3_OLD_CLUSTER));
final List<Class<? extends Exception>> expectedExceptions =
Arrays.asList(ResponseException.class, ElasticsearchStatusException.class);


@ -219,8 +219,8 @@ public final class TransportCleanupRepositoryAction extends TransportMasterNodeA
threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.wrap(listener,
l -> blobStoreRepository.cleanup(
repositoryStateId,
newState.nodes().getMinNodeVersion().onOrAfter(SnapshotsService.SHARD_GEN_IN_REPO_DATA_VERSION)
&& snapshotsService.hasOldVersionSnapshots(repositoryName, repositoryData, null) == false,
snapshotsService.minCompatibleVersion(
newState.nodes().getMinNodeVersion(), repositoryName, repositoryData, null),
ActionListener.wrap(result -> after(null, result), e -> after(e, null)))
));
}


@ -56,6 +56,9 @@ import static org.elasticsearch.snapshots.SnapshotInfo.METADATA_FIELD_INTRODUCED
* Meta data about snapshots that are currently executing
*/
public class SnapshotsInProgress extends AbstractNamedDiffable<Custom> implements Custom {
private static final Version VERSION_IN_SNAPSHOT_VERSION = Version.V_7_7_0;
public static final String TYPE = "snapshots";
@Override
@ -93,13 +96,13 @@ public class SnapshotsInProgress extends AbstractNamedDiffable<Custom> implement
private final long startTime;
private final long repositoryStateId;
// see #useShardGenerations
private final boolean useShardGenerations;
private final Version version;
@Nullable private final Map<String, Object> userMetadata;
@Nullable private final String failure;
public Entry(Snapshot snapshot, boolean includeGlobalState, boolean partial, State state, List<IndexId> indices,
long startTime, long repositoryStateId, ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards,
String failure, Map<String, Object> userMetadata, boolean useShardGenerations) {
String failure, Map<String, Object> userMetadata, Version version) {
this.state = state;
this.snapshot = snapshot;
this.includeGlobalState = includeGlobalState;
@ -117,7 +120,7 @@ public class SnapshotsInProgress extends AbstractNamedDiffable<Custom> implement
this.repositoryStateId = repositoryStateId;
this.failure = failure;
this.userMetadata = userMetadata;
this.useShardGenerations = useShardGenerations;
this.version = version;
}
private static boolean assertShardsConsistent(State state, List<IndexId> indices,
@ -135,25 +138,25 @@ public class SnapshotsInProgress extends AbstractNamedDiffable<Custom> implement
public Entry(Snapshot snapshot, boolean includeGlobalState, boolean partial, State state, List<IndexId> indices,
long startTime, long repositoryStateId, ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards,
Map<String, Object> userMetadata, boolean useShardGenerations) {
Map<String, Object> userMetadata, Version version) {
this(snapshot, includeGlobalState, partial, state, indices, startTime, repositoryStateId, shards, null, userMetadata,
useShardGenerations);
version);
}
public Entry(Entry entry, State state, List<IndexId> indices, long repositoryStateId,
ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards, boolean useShardGenerations, String failure) {
ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards, Version version, String failure) {
this(entry.snapshot, entry.includeGlobalState, entry.partial, state, indices, entry.startTime, repositoryStateId, shards,
failure, entry.userMetadata, useShardGenerations);
failure, entry.userMetadata, version);
}
public Entry(Entry entry, State state, ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards) {
this(entry.snapshot, entry.includeGlobalState, entry.partial, state, entry.indices, entry.startTime,
entry.repositoryStateId, shards, entry.failure, entry.userMetadata, entry.useShardGenerations);
entry.repositoryStateId, shards, entry.failure, entry.userMetadata, entry.version);
}
public Entry(Entry entry, State state, ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards, String failure) {
this(entry.snapshot, entry.includeGlobalState, entry.partial, state, entry.indices, entry.startTime,
entry.repositoryStateId, shards, failure, entry.userMetadata, entry.useShardGenerations);
entry.repositoryStateId, shards, failure, entry.userMetadata, entry.version);
}
public Entry(Entry entry, ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards) {
@ -211,13 +214,10 @@ public class SnapshotsInProgress extends AbstractNamedDiffable<Custom> implement
}
/**
* Whether to write to the repository in a format only understood by versions newer than
* {@link SnapshotsService#SHARD_GEN_IN_REPO_DATA_VERSION}.
*
* @return true if writing to repository in new format
* What version of metadata to use for the snapshot in the repository
*/
public boolean useShardGenerations() {
return useShardGenerations;
public Version version() {
return version;
}
@Override
@ -235,7 +235,7 @@ public class SnapshotsInProgress extends AbstractNamedDiffable<Custom> implement
if (!snapshot.equals(entry.snapshot)) return false;
if (state != entry.state) return false;
if (repositoryStateId != entry.repositoryStateId) return false;
if (useShardGenerations != entry.useShardGenerations) return false;
if (version.equals(entry.version) == false) return false;
return true;
}
@ -250,7 +250,7 @@ public class SnapshotsInProgress extends AbstractNamedDiffable<Custom> implement
result = 31 * result + indices.hashCode();
result = 31 * result + Long.hashCode(startTime);
result = 31 * result + Long.hashCode(repositoryStateId);
result = 31 * result + (useShardGenerations ? 1 : 0);
result = 31 * result + version.hashCode();
return result;
}
@ -365,7 +365,7 @@ public class SnapshotsInProgress extends AbstractNamedDiffable<Custom> implement
public ShardSnapshotStatus(StreamInput in) throws IOException {
nodeId = in.readOptionalString();
state = ShardState.fromValue(in.readByte());
if (in.getVersion().onOrAfter(SnapshotsService.SHARD_GEN_IN_REPO_DATA_VERSION)) {
if (SnapshotsService.useShardGenerations(in.getVersion())) {
generation = in.readOptionalString();
} else {
generation = null;
@ -392,7 +392,7 @@ public class SnapshotsInProgress extends AbstractNamedDiffable<Custom> implement
public void writeTo(StreamOutput out) throws IOException {
out.writeOptionalString(nodeId);
out.writeByte(state.value);
if (out.getVersion().onOrAfter(SnapshotsService.SHARD_GEN_IN_REPO_DATA_VERSION)) {
if (SnapshotsService.useShardGenerations(out.getVersion())) {
out.writeOptionalString(generation);
}
out.writeOptionalString(reason);
@ -547,11 +547,16 @@ public class SnapshotsInProgress extends AbstractNamedDiffable<Custom> implement
if (in.getVersion().onOrAfter(METADATA_FIELD_INTRODUCED)) {
userMetadata = in.readMap();
}
final boolean useShardGenerations;
if (in.getVersion().onOrAfter(SnapshotsService.SHARD_GEN_IN_REPO_DATA_VERSION)) {
useShardGenerations = in.readBoolean();
final Version version;
if (in.getVersion().onOrAfter(VERSION_IN_SNAPSHOT_VERSION)) {
version = Version.readVersion(in);
} else if (in.getVersion().onOrAfter(SnapshotsService.SHARD_GEN_IN_REPO_DATA_VERSION)) {
// If an older master informs us that shard generations are supported we use the minimum shard generation compatible
// version. If shard generations are not supported yet we use a placeholder for a version that does not use shard
// generations.
version = in.readBoolean() ? SnapshotsService.SHARD_GEN_IN_REPO_DATA_VERSION : SnapshotsService.OLD_SNAPSHOT_FORMAT;
} else {
useShardGenerations = false;
version = SnapshotsService.OLD_SNAPSHOT_FORMAT;
}
entries[i] = new Entry(snapshot,
includeGlobalState,
@ -563,7 +568,7 @@ public class SnapshotsInProgress extends AbstractNamedDiffable<Custom> implement
builder.build(),
failure,
userMetadata,
useShardGenerations
version
);
}
this.entries = Arrays.asList(entries);
@ -599,8 +604,10 @@ public class SnapshotsInProgress extends AbstractNamedDiffable<Custom> implement
if (out.getVersion().onOrAfter(METADATA_FIELD_INTRODUCED)) {
out.writeMap(entry.userMetadata);
}
if (out.getVersion().onOrAfter(SnapshotsService.SHARD_GEN_IN_REPO_DATA_VERSION)) {
out.writeBoolean(entry.useShardGenerations);
if (out.getVersion().onOrAfter(VERSION_IN_SNAPSHOT_VERSION)) {
Version.writeVersion(entry.version, out);
} else if (out.getVersion().onOrAfter(SnapshotsService.SHARD_GEN_IN_REPO_DATA_VERSION)) {
out.writeBoolean(SnapshotsService.useShardGenerations(entry.version));
}
}
}
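
For the cluster-state wire format above there are three cases, matching the inline comment in readFrom: 7.7+ streams carry the snapshot's full version, 7.6 streams only carried a boolean, and older streams carried nothing. A hedged, standalone sketch of that round trip, using plain java.io streams and integer version ids instead of Elasticsearch's StreamInput/StreamOutput and Version, is:

// Illustrative sketch only: simplified stand-ins for Elasticsearch's wire protocol.
import java.io.*;

public class EntryVersionWireSketch {
    // Placeholder version ids; the real code compares org.elasticsearch.Version objects.
    static final int OLD_SNAPSHOT_FORMAT = 7_05_00;            // pre shard generations
    static final int SHARD_GEN_IN_REPO_DATA_VERSION = 7_06_00; // boolean flag on the wire
    static final int VERSION_IN_SNAPSHOT_VERSION = 7_07_00;    // full version on the wire

    /** Write the entry's snapshot version in a form the receiving node can parse. */
    static void writeVersion(DataOutput out, int streamVersion, int entryVersion) throws IOException {
        if (streamVersion >= VERSION_IN_SNAPSHOT_VERSION) {
            out.writeInt(entryVersion);                                        // new nodes get the exact version
        } else if (streamVersion >= SHARD_GEN_IN_REPO_DATA_VERSION) {
            out.writeBoolean(entryVersion >= SHARD_GEN_IN_REPO_DATA_VERSION);  // 7.6 nodes get the flag
        }                                                                      // older nodes get nothing
    }

    /** Read the version back, falling back to placeholders for older senders. */
    static int readVersion(DataInput in, int streamVersion) throws IOException {
        if (streamVersion >= VERSION_IN_SNAPSHOT_VERSION) {
            return in.readInt();
        } else if (streamVersion >= SHARD_GEN_IN_REPO_DATA_VERSION) {
            // A 7.6 sender only says whether shard generations are in use; map that back to
            // the closest version placeholder, mirroring readFrom in the diff above.
            return in.readBoolean() ? SHARD_GEN_IN_REPO_DATA_VERSION : OLD_SNAPSHOT_FORMAT;
        } else {
            return OLD_SNAPSHOT_FORMAT;
        }
    }

    public static void main(String[] args) throws IOException {
        for (int streamVersion : new int[]{7_05_00, 7_06_00, 7_07_00}) {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            writeVersion(new DataOutputStream(bytes), streamVersion, 7_07_00);
            int read = readVersion(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())), streamVersion);
            System.out.println("stream " + streamVersion + " round-trips to " + read);
        }
    }
}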


@ -19,6 +19,7 @@
package org.elasticsearch.repositories;
import org.apache.lucene.index.IndexCommit;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
@ -82,14 +83,15 @@ public class FilterRepository implements Repository {
public void finalizeSnapshot(SnapshotId snapshotId, ShardGenerations shardGenerations, long startTime, String failure,
int totalShards, List<SnapshotShardFailure> shardFailures, long repositoryStateId,
boolean includeGlobalState, MetaData metaData, Map<String, Object> userMetadata,
boolean writeShardGens, ActionListener<SnapshotInfo> listener) {
Version repositoryMetaVersion, ActionListener<SnapshotInfo> listener) {
in.finalizeSnapshot(snapshotId, shardGenerations, startTime, failure, totalShards, shardFailures, repositoryStateId,
includeGlobalState, metaData, userMetadata, writeShardGens, listener);
includeGlobalState, metaData, userMetadata, repositoryMetaVersion, listener);
}
@Override
public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, boolean writeShardGens, ActionListener<Void> listener) {
in.deleteSnapshot(snapshotId, repositoryStateId, writeShardGens, listener);
public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, Version repositoryMetaVersion,
ActionListener<Void> listener) {
in.deleteSnapshot(snapshotId, repositoryStateId, repositoryMetaVersion, listener);
}
@Override
@ -124,10 +126,10 @@ public class FilterRepository implements Repository {
@Override
public void snapshotShard(Store store, MapperService mapperService, SnapshotId snapshotId, IndexId indexId,
IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus, boolean writeShardGens,
IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus, Version repositoryMetaVersion,
Map<String, Object> userMetadata, ActionListener<String> listener) {
in.snapshotShard(
store, mapperService, snapshotId, indexId, snapshotIndexCommit, snapshotStatus, writeShardGens, userMetadata, listener);
store, mapperService, snapshotId, indexId, snapshotIndexCommit, snapshotStatus, repositoryMetaVersion, userMetadata, listener);
}
@Override
public void restoreShard(Store store, SnapshotId snapshotId, IndexId indexId, ShardId snapshotShardId, RecoveryState recoveryState,


@ -19,6 +19,7 @@
package org.elasticsearch.repositories;
import org.apache.lucene.index.IndexCommit;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.SnapshotsInProgress;
@ -137,23 +138,23 @@ public interface Repository extends LifecycleComponent {
* @param includeGlobalState include cluster global state
* @param clusterMetaData cluster metadata
* @param userMetadata user metadata
* @param writeShardGens if shard generations should be written to the repository
* @param repositoryMetaVersion version of the updated repository metadata to write
* @param listener listener to be called on completion of the snapshot
*/
void finalizeSnapshot(SnapshotId snapshotId, ShardGenerations shardGenerations, long startTime, String failure,
int totalShards, List<SnapshotShardFailure> shardFailures, long repositoryStateId,
boolean includeGlobalState, MetaData clusterMetaData, Map<String, Object> userMetadata,
boolean writeShardGens, ActionListener<SnapshotInfo> listener);
Version repositoryMetaVersion, ActionListener<SnapshotInfo> listener);
/**
* Deletes snapshot
*
* @param snapshotId snapshot id
* @param repositoryStateId the unique id identifying the state of the repository when the snapshot deletion began
* @param writeShardGens if shard generations should be written to the repository
* @param repositoryMetaVersion version of the updated repository metadata to write
* @param listener completion listener
*/
void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, boolean writeShardGens, ActionListener<Void> listener);
void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, Version repositoryMetaVersion, ActionListener<Void> listener);
/**
* Returns snapshot throttle time in nanoseconds
@ -212,11 +213,12 @@ public interface Repository extends LifecycleComponent {
* @param indexId id for the index being snapshotted
* @param snapshotIndexCommit commit point
* @param snapshotStatus snapshot status
* @param repositoryMetaVersion version of the updated repository metadata to write
* @param userMetadata user metadata of the snapshot found in {@link SnapshotsInProgress.Entry#userMetadata()}
* @param listener listener invoked on completion
*/
void snapshotShard(Store store, MapperService mapperService, SnapshotId snapshotId, IndexId indexId, IndexCommit snapshotIndexCommit,
IndexShardSnapshotStatus snapshotStatus, boolean writeShardGens, Map<String, Object> userMetadata,
IndexShardSnapshotStatus snapshotStatus, Version repositoryMetaVersion, Map<String, Object> userMetadata,
ActionListener<String> listener);
/**


@ -523,7 +523,7 @@ public final class RepositoryData {
throw new ElasticsearchParseException("version string expected [min_version]");
}
final Version version = Version.fromString(parser.text());
assert version.onOrAfter(SnapshotsService.SHARD_GEN_IN_REPO_DATA_VERSION);
assert SnapshotsService.useShardGenerations(version);
} else {
throw new ElasticsearchParseException("unknown field name [" + field + "]");
}


@ -489,8 +489,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
}
}
@Override
public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, boolean writeShardGens, ActionListener<Void> listener) {
public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, Version repositoryMetaVersion,
ActionListener<Void> listener) {
if (isReadOnly()) {
listener.onFailure(new RepositoryException(metadata.name(), "cannot delete snapshot from a readonly repository"));
} else {
@ -507,7 +507,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
// Cache the indices that were found before writing out the new index-N blob so that a stuck master will never
// delete an index that was created by another master node after writing this index-N blob.
final Map<String, BlobContainer> foundIndices = blobStore().blobContainer(indicesPath()).children();
doDeleteShardSnapshots(snapshotId, repositoryStateId, foundIndices, rootBlobs, repositoryData, writeShardGens, listener);
doDeleteShardSnapshots(snapshotId, repositoryStateId, foundIndices, rootBlobs, repositoryData,
SnapshotsService.useShardGenerations(repositoryMetaVersion), listener);
} catch (Exception ex) {
listener.onFailure(new RepositoryException(metadata.name(), "failed to delete snapshot [" + snapshotId + "]", ex));
}
@ -764,10 +765,10 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
* <li>Deleting unreferenced root level blobs {@link #cleanupStaleRootFiles}</li>
* </ul>
* @param repositoryStateId Current repository state id
* @param writeShardGens If shard generations should be written to the repository
* @param repositoryMetaVersion version of the updated repository metadata to write
* @param listener Listener to complete when done
*/
public void cleanup(long repositoryStateId, boolean writeShardGens, ActionListener<RepositoryCleanupResult> listener) {
public void cleanup(long repositoryStateId, Version repositoryMetaVersion, ActionListener<RepositoryCleanupResult> listener) {
try {
if (isReadOnly()) {
throw new RepositoryException(metadata.name(), "cannot run cleanup on readonly repository");
@ -783,7 +784,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
listener.onResponse(new RepositoryCleanupResult(DeleteResult.ZERO));
} else {
// write new index-N blob to ensure concurrent operations will fail
writeIndexGen(repositoryData, repositoryStateId, writeShardGens,
writeIndexGen(repositoryData, repositoryStateId, SnapshotsService.useShardGenerations(repositoryMetaVersion),
ActionListener.wrap(v -> cleanupStaleBlobs(foundIndices, rootBlobs, repositoryData,
ActionListener.map(listener, RepositoryCleanupResult::new)), listener::onFailure));
}
@ -882,7 +883,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
final boolean includeGlobalState,
final MetaData clusterMetaData,
final Map<String, Object> userMetadata,
boolean writeShardGens,
Version repositoryMetaVersion,
final ActionListener<SnapshotInfo> listener) {
final Collection<IndexId> indices = shardGenerations.indices();
@ -890,6 +891,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
// directory if all nodes are at least at version SnapshotsService#SHARD_GEN_IN_REPO_DATA_VERSION
// If there are older version nodes in the cluster, we don't need to run this cleanup as it will have already happened
// when writing the index-${N} to each shard directory.
final boolean writeShardGens = SnapshotsService.useShardGenerations(repositoryMetaVersion);
final Consumer<Exception> onUpdateFailure =
e -> listener.onFailure(new SnapshotException(metadata.name(), snapshotId, "failed to update snapshot in repository", e));
final ActionListener<SnapshotInfo> allMetaListener = new GroupedActionListener<>(
@ -1467,7 +1469,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
@Override
public void snapshotShard(Store store, MapperService mapperService, SnapshotId snapshotId, IndexId indexId,
IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus, boolean writeShardGens,
IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus, Version repositoryMetaVersion,
Map<String, Object> userMetadata, ActionListener<String> listener) {
final ShardId shardId = store.shardId();
final long startTime = threadPool.absoluteTimeInMillis();
@ -1594,6 +1596,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
}
final List<String> blobsToDelete;
final String indexGeneration;
final boolean writeShardGens = SnapshotsService.useShardGenerations(repositoryMetaVersion);
if (writeShardGens) {
indexGeneration = UUIDs.randomBase64UUID();
blobsToDelete = Collections.emptyList();


@ -24,6 +24,7 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionResponse;
@ -281,9 +282,9 @@ public class SnapshotShardsService extends AbstractLifecycleComponent implements
final IndexShardSnapshotStatus snapshotStatus = shardEntry.getValue();
final IndexId indexId = indicesMap.get(shardId.getIndexName());
assert indexId != null;
assert entry.useShardGenerations() || snapshotStatus.generation() == null :
assert SnapshotsService.useShardGenerations(entry.version()) || snapshotStatus.generation() == null :
"Found non-null shard generation [" + snapshotStatus.generation() + "] for snapshot with old-format compatibility";
snapshot(shardId, snapshot, indexId, entry.userMetadata(), snapshotStatus, entry.useShardGenerations(),
snapshot(shardId, snapshot, indexId, entry.userMetadata(), snapshotStatus, entry.version(),
new ActionListener<String>() {
@Override
public void onResponse(String newGeneration) {
@ -316,7 +317,7 @@ public class SnapshotShardsService extends AbstractLifecycleComponent implements
* @param snapshotStatus snapshot status
*/
private void snapshot(final ShardId shardId, final Snapshot snapshot, final IndexId indexId, final Map<String, Object> userMetadata,
final IndexShardSnapshotStatus snapshotStatus, boolean writeShardGens, ActionListener<String> listener) {
final IndexShardSnapshotStatus snapshotStatus, Version version, ActionListener<String> listener) {
try {
final IndexShard indexShard = indicesService.indexServiceSafe(shardId.getIndex()).getShardOrNull(shardId.id());
if (indexShard.routingEntry().primary() == false) {
@ -339,7 +340,7 @@ public class SnapshotShardsService extends AbstractLifecycleComponent implements
// we flush first to make sure we get the latest writes snapshotted
snapshotRef = indexShard.acquireLastIndexCommit(true);
repository.snapshotShard(indexShard.store(), indexShard.mapperService(), snapshot.getSnapshotId(), indexId,
snapshotRef.getIndexCommit(), snapshotStatus, writeShardGens, userMetadata,
snapshotRef.getIndexCommit(), snapshotStatus, version, userMetadata,
ActionListener.runBefore(listener, snapshotRef::close));
} catch (Exception e) {
IOUtils.close(snapshotRef);


@ -130,6 +130,8 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
public static final Version SHARD_GEN_IN_REPO_DATA_VERSION = Version.V_7_6_0;
public static final Version OLD_SNAPSHOT_FORMAT = Version.V_7_5_0;
private static final Logger logger = LogManager.getLogger(SnapshotsService.class);
private final ClusterService clusterService;
@ -316,7 +318,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
threadPool.absoluteTimeInMillis(),
RepositoryData.UNKNOWN_REPO_GEN,
null,
userMeta, false
userMeta, Version.CURRENT
);
initializingSnapshots.add(newSnapshot.snapshot());
snapshots = new SnapshotsInProgress(newSnapshot);
@ -362,34 +364,6 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
});
}
public boolean hasOldVersionSnapshots(String repositoryName, RepositoryData repositoryData, @Nullable SnapshotId excluded) {
final Collection<SnapshotId> snapshotIds = repositoryData.getSnapshotIds();
final boolean hasOldFormatSnapshots;
if (snapshotIds.isEmpty()) {
hasOldFormatSnapshots = false;
} else {
if (repositoryData.shardGenerations().totalShards() > 0) {
hasOldFormatSnapshots = false;
} else {
try {
final Repository repository = repositoriesService.repository(repositoryName);
hasOldFormatSnapshots = snapshotIds.stream().filter(snapshotId -> snapshotId.equals(excluded) == false).anyMatch(
snapshotId -> {
final Version known = repositoryData.getVersion(snapshotId);
return (known == null ? repository.getSnapshotInfo(snapshotId).version() : known)
.before(SHARD_GEN_IN_REPO_DATA_VERSION);
});
} catch (SnapshotMissingException e) {
logger.warn("Failed to load snapshot metadata, assuming repository is in old format", e);
return true;
}
}
}
assert hasOldFormatSnapshots == false || repositoryData.shardGenerations().totalShards() == 0 :
"Found non-empty shard generations [" + repositoryData.shardGenerations() + "] but repository contained old version snapshots";
return hasOldFormatSnapshots;
}
/**
* Validates snapshot request
*
@ -480,15 +454,13 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
snapshotCreated = true;
logger.info("snapshot [{}] started", snapshot.snapshot());
final boolean hasOldFormatSnapshots =
hasOldVersionSnapshots(snapshot.snapshot().getRepository(), repositoryData, null);
final boolean writeShardGenerations = hasOldFormatSnapshots == false &&
clusterService.state().nodes().getMinNodeVersion().onOrAfter(SHARD_GEN_IN_REPO_DATA_VERSION);
final Version version =
minCompatibleVersion(clusterState.nodes().getMinNodeVersion(), snapshot.repository(), repositoryData, null);
if (indices.isEmpty()) {
// No indices in this snapshot - we are done
userCreateSnapshotListener.onResponse(snapshot.snapshot());
endSnapshot(new SnapshotsInProgress.Entry(
snapshot, State.STARTED, Collections.emptyList(), repositoryData.getGenId(), null, writeShardGenerations,
snapshot, State.STARTED, Collections.emptyList(), repositoryData.getGenId(), null, version,
null), clusterState.metaData());
return;
}
@ -512,7 +484,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
final List<IndexId> indexIds = repositoryData.resolveNewIndices(indices);
// Replace the snapshot that was just initialized
ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards =
shards(currentState, indexIds, writeShardGenerations, repositoryData);
shards(currentState, indexIds, useShardGenerations(version), repositoryData);
if (!partial) {
Tuple<Set<String>, Set<String>> indicesWithMissingShards = indicesWithMissingShards(shards,
currentState.metaData());
@ -532,12 +504,12 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
failureMessage.append(closed);
}
entries.add(new SnapshotsInProgress.Entry(entry, State.FAILED, indexIds,
repositoryData.getGenId(), shards, writeShardGenerations, failureMessage.toString()));
repositoryData.getGenId(), shards, version, failureMessage.toString()));
continue;
}
}
entries.add(new SnapshotsInProgress.Entry(entry, State.STARTED, indexIds, repositoryData.getGenId(),
shards, writeShardGenerations, null));
shards, version, null));
}
}
return ClusterState.builder(currentState)
@ -637,7 +609,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
snapshot.includeGlobalState(),
metaDataForSnapshot(snapshot, metaData),
snapshot.userMetadata(),
snapshot.useShardGenerations(),
snapshot.version(),
ActionListener.runAfter(ActionListener.wrap(ignored -> {
}, inner -> {
inner.addSuppressed(exception);
@ -853,8 +825,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
if (deletionsInProgress != null && deletionsInProgress.hasDeletionsInProgress()) {
assert deletionsInProgress.getEntries().size() == 1 : "only one in-progress deletion allowed per cluster";
SnapshotDeletionsInProgress.Entry entry = deletionsInProgress.getEntries().get(0);
deleteSnapshotFromRepository(entry.getSnapshot(), null, entry.repositoryStateId(),
state.nodes().getMinNodeVersion());
deleteSnapshotFromRepository(entry.getSnapshot(), null, entry.repositoryStateId(), state.nodes().getMinNodeVersion());
}
}
@ -1118,7 +1089,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
entry.includeGlobalState(),
metaDataForSnapshot(entry, metaData),
entry.userMetadata(),
entry.useShardGenerations(),
entry.version(),
ActionListener.wrap(snapshotInfo -> {
removeSnapshotFromClusterState(snapshot, snapshotInfo, null);
logger.info("snapshot [{}] completed with state [{}]", snapshot, snapshotInfo.state());
@ -1422,6 +1393,60 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
});
}
/**
* Determines the minimum {@link Version} that the snapshot repository must be compatible with from the current nodes in the cluster
* and the contents of the repository. The minimum version is determined as the lowest version found across all snapshots in the
* repository and all nodes in the cluster.
*
* @param minNodeVersion minimum node version in the cluster
* @param repositoryName name of the repository to modify
* @param repositoryData current {@link RepositoryData} of that repository
* @param excluded snapshot id to ignore when computing the minimum version
* (used to use newer metadata version after a snapshot delete)
* @return minimum node version that must still be able to read the repository metadata
*/
public Version minCompatibleVersion(Version minNodeVersion, String repositoryName, RepositoryData repositoryData,
@Nullable SnapshotId excluded) {
Version minCompatVersion = minNodeVersion;
final Collection<SnapshotId> snapshotIds = repositoryData.getSnapshotIds();
final Repository repository = repositoriesService.repository(repositoryName);
for (SnapshotId snapshotId :
snapshotIds.stream().filter(snapshotId -> snapshotId.equals(excluded) == false).collect(Collectors.toList())) {
final Version known = repositoryData.getVersion(snapshotId);
// If we don't have the version cached in the repository data yet we load it from the snapshot info blobs
if (known == null) {
assert repositoryData.shardGenerations().totalShards() == 0 :
"Saw shard generations [" + repositoryData.shardGenerations() +
"] but did not have versions tracked for snapshot [" + snapshotId + "]";
try {
final Version foundVersion = repository.getSnapshotInfo(snapshotId).version();
if (useShardGenerations(foundVersion) == false) {
// We don't really care about the exact version if it's before 7.6 as the 7.5 metadata is the oldest we are able
// to write out so we stop iterating here and just use 7.5.0 as a placeholder.
return OLD_SNAPSHOT_FORMAT;
}
minCompatVersion = minCompatVersion.before(foundVersion) ? minCompatVersion : foundVersion;
} catch (SnapshotMissingException e) {
logger.warn("Failed to load snapshot metadata, assuming repository is in old format", e);
return OLD_SNAPSHOT_FORMAT;
}
} else {
minCompatVersion = minCompatVersion.before(known) ? minCompatVersion : known;
}
}
return minCompatVersion;
}
/**
* Checks whether the metadata version supports writing {@link ShardGenerations} to the repository.
*
* @param repositoryMetaVersion version to check
* @return true if version supports {@link ShardGenerations}
*/
public static boolean useShardGenerations(Version repositoryMetaVersion) {
return repositoryMetaVersion.onOrAfter(SHARD_GEN_IN_REPO_DATA_VERSION);
}
/**
* Checks if a repository is currently in use by one of the snapshots
*
@ -1463,16 +1488,15 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
* @param snapshot snapshot
* @param listener listener
* @param repositoryStateId the unique id representing the state of the repository at the time the deletion began
* @param version minimum ES version the repository should be readable by
* @param minNodeVersion minimum node version in the cluster
*/
private void deleteSnapshotFromRepository(Snapshot snapshot, @Nullable ActionListener<Void> listener, long repositoryStateId,
Version version) {
Version minNodeVersion) {
threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.wrap(listener, l -> {
Repository repository = repositoriesService.repository(snapshot.getRepository());
repository.getRepositoryData(ActionListener.wrap(repositoryData -> repository.deleteSnapshot(snapshot.getSnapshotId(),
repositoryStateId,
version.onOrAfter(SHARD_GEN_IN_REPO_DATA_VERSION) &&
hasOldVersionSnapshots(snapshot.getRepository(), repositoryData, snapshot.getSnapshotId()) == false,
minCompatibleVersion(minNodeVersion, snapshot.getRepository(), repositoryData, snapshot.getSnapshotId()),
ActionListener.wrap(v -> {
logger.info("snapshot [{}] deleted", snapshot);
removeSnapshotDeletionFromClusterState(snapshot, null, l);


@ -722,7 +722,7 @@ public class ClusterStateDiffIT extends ESIntegTestCase {
(long) randomIntBetween(0, 1000),
ImmutableOpenMap.of(),
SnapshotInfoTests.randomUserMetadata(),
randomBoolean()));
randomVersion(random())));
case 1:
return new RestoreInProgress.Builder().add(
new RestoreInProgress.Entry(


@ -30,6 +30,7 @@ import org.elasticsearch.snapshots.Snapshot;
import org.elasticsearch.snapshots.SnapshotId;
import org.elasticsearch.snapshots.SnapshotInfoTests;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.VersionUtils;
import java.util.Arrays;
import java.util.List;
@ -66,7 +67,8 @@ public class SnapshotsInProgressTests extends ESTestCase {
// test no waiting shards in an index
shards.put(new ShardId(idx3Name, idx3UUID, 0), new ShardSnapshotStatus(randomAlphaOfLength(2), randomNonWaitingState(), "", "1"));
Entry entry = new Entry(snapshot, randomBoolean(), randomBoolean(), State.INIT,
indices, System.currentTimeMillis(), randomLong(), shards.build(), SnapshotInfoTests.randomUserMetadata(), randomBoolean());
indices, System.currentTimeMillis(), randomLong(), shards.build(), SnapshotInfoTests.randomUserMetadata(),
VersionUtils.randomVersion(random()));
ImmutableOpenMap<String, List<ShardId>> waitingIndices = entry.waitingIndices();
assertEquals(2, waitingIndices.get(idx1Name).size());


@ -62,7 +62,7 @@ public class MetaDataDeleteIndexServiceTests extends ESTestCase {
SnapshotsInProgress snaps = new SnapshotsInProgress(new SnapshotsInProgress.Entry(snapshot, true, false,
SnapshotsInProgress.State.INIT, singletonList(new IndexId(index, "doesn't matter")),
System.currentTimeMillis(), (long) randomIntBetween(0, 1000), ImmutableOpenMap.of(),
SnapshotInfoTests.randomUserMetadata(), randomBoolean()));
SnapshotInfoTests.randomUserMetadata(), VersionUtils.randomVersion(random())));
ClusterState state = ClusterState.builder(clusterState(index))
.putCustom(SnapshotsInProgress.TYPE, snaps)
.build();


@ -52,6 +52,7 @@ import org.elasticsearch.snapshots.SnapshotId;
import org.elasticsearch.snapshots.SnapshotInProgressException;
import org.elasticsearch.snapshots.SnapshotInfoTests;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.VersionUtils;
import java.util.Arrays;
import java.util.Collection;
@ -472,7 +473,7 @@ public class MetaDataIndexStateServiceTests extends ESTestCase {
final SnapshotsInProgress.Entry entry =
new SnapshotsInProgress.Entry(snapshot, randomBoolean(), false, SnapshotsInProgress.State.INIT,
Collections.singletonList(new IndexId(index, index)), randomNonNegativeLong(), randomLong(), shardsBuilder.build(),
SnapshotInfoTests.randomUserMetadata(), randomBoolean());
SnapshotInfoTests.randomUserMetadata(), VersionUtils.randomVersion(random()));
return ClusterState.builder(newState).putCustom(SnapshotsInProgress.TYPE, new SnapshotsInProgress(entry)).build();
}


@ -20,6 +20,7 @@
package org.elasticsearch.repositories;
import org.apache.lucene.index.IndexCommit;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.repositories.put.PutRepositoryRequest;
import org.elasticsearch.cluster.ClusterState;
@ -163,12 +164,13 @@ public class RepositoriesServiceTests extends ESTestCase {
public void finalizeSnapshot(SnapshotId snapshotId, ShardGenerations indices, long startTime, String failure,
int totalShards, List<SnapshotShardFailure> shardFailures, long repositoryStateId,
boolean includeGlobalState, MetaData metaData, Map<String, Object> userMetadata,
boolean writeShardGens, ActionListener<SnapshotInfo> listener) {
Version repositoryMetaVersion, ActionListener<SnapshotInfo> listener) {
listener.onResponse(null);
}
@Override
public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, boolean writeShardGens, ActionListener<Void> listener) {
public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, Version repositoryMetaVersion,
ActionListener<Void> listener) {
listener.onResponse(null);
}
@ -203,8 +205,8 @@ public class RepositoriesServiceTests extends ESTestCase {
}
@Override
public void snapshotShard(Store store, MapperService mapperService, SnapshotId snapshotId, IndexId indexId, IndexCommit
snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus, boolean writeShardGens,
public void snapshotShard(Store store, MapperService mapperService, SnapshotId snapshotId, IndexId indexId,
IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus, Version repositoryMetaVersion,
Map<String, Object> userMetadata, ActionListener<String> listener) {
}


@ -21,6 +21,7 @@ package org.elasticsearch.repositories.blobstore;
import org.apache.lucene.store.Directory;
import org.apache.lucene.util.TestUtil;
import org.elasticsearch.Version;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
@ -173,7 +174,7 @@ public class BlobStoreRepositoryRestoreTests extends IndexShardTestCase {
repository.finalizeSnapshot(snapshot.getSnapshotId(),
ShardGenerations.builder().put(indexId, 0, shardGen).build(),
0L, null, 1, Collections.emptyList(), -1L, false,
MetaData.builder().put(shard.indexSettings().getIndexMetaData(), false).build(), Collections.emptyMap(), true,
MetaData.builder().put(shard.indexSettings().getIndexMetaData(), false).build(), Collections.emptyMap(), Version.CURRENT,
future);
future.actionGet();
IndexShardSnapshotFailedException isfe = expectThrows(IndexShardSnapshotFailedException.class,


@ -105,7 +105,7 @@ public class FsRepositoryTests extends ESTestCase {
final PlainActionFuture<String> future1 = PlainActionFuture.newFuture();
runGeneric(threadPool, () -> {
IndexShardSnapshotStatus snapshotStatus = IndexShardSnapshotStatus.newInitializing(null);
repository.snapshotShard(store, null, snapshotId, indexId, indexCommit, snapshotStatus, true,
repository.snapshotShard(store, null, snapshotId, indexId, indexCommit, snapshotStatus, Version.CURRENT,
Collections.emptyMap(), future1);
future1.actionGet();
IndexShardSnapshotStatus.Copy copy = snapshotStatus.asCopy();
@ -134,7 +134,7 @@ public class FsRepositoryTests extends ESTestCase {
final PlainActionFuture<String> future2 = PlainActionFuture.newFuture();
runGeneric(threadPool, () -> {
IndexShardSnapshotStatus snapshotStatus = IndexShardSnapshotStatus.newInitializing(shardGeneration);
repository.snapshotShard(store, null, incSnapshotId, indexId, incIndexCommit, snapshotStatus, true,
repository.snapshotShard(store, null, incSnapshotId, indexId, incIndexCommit, snapshotStatus, Version.CURRENT,
Collections.emptyMap(), future2);
future2.actionGet();
IndexShardSnapshotStatus.Copy copy = snapshotStatus.asCopy();


@ -287,16 +287,18 @@ public class CorruptedBlobStoreRepositoryIT extends AbstractSnapshotIntegTestCas
final SnapshotsService snapshotsService = internalCluster().getCurrentMasterNodeInstance(SnapshotsService.class);
final ThreadPool threadPool = internalCluster().getCurrentMasterNodeInstance(ThreadPool.class);
assertThat(PlainActionFuture.get(f -> threadPool.generic().execute(
ActionRunnable.supply(f, () -> snapshotsService.hasOldVersionSnapshots(repoName, getRepositoryData(repository), null)))),
is(true));
ActionRunnable.supply(f, () ->
snapshotsService.minCompatibleVersion(Version.CURRENT, repoName, getRepositoryData(repository), null)))),
is(SnapshotsService.OLD_SNAPSHOT_FORMAT));
logger.info("--> verify that snapshot with missing root level metadata can be deleted");
assertAcked(client().admin().cluster().prepareDeleteSnapshot(repoName, snapshotToCorrupt.getName()).get());
logger.info("--> verify that repository is assumed in new metadata format after removing corrupted snapshot");
assertThat(PlainActionFuture.get(f -> threadPool.generic().execute(
ActionRunnable.supply(f, () -> snapshotsService.hasOldVersionSnapshots(repoName, getRepositoryData(repository), null)))),
is(false));
ActionRunnable.supply(f, () ->
snapshotsService.minCompatibleVersion(Version.CURRENT, repoName, getRepositoryData(repository), null)))),
is(Version.CURRENT));
final RepositoryData finalRepositoryData = getRepositoryData(repository);
for (SnapshotId snapshotId : finalRepositoryData.getSnapshotIds()) {
assertThat(finalRepositoryData.getVersion(snapshotId), is(Version.CURRENT));


@ -19,6 +19,7 @@
package org.elasticsearch.snapshots;
import org.apache.lucene.index.IndexCommit;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.service.ClusterService;
@ -86,19 +87,20 @@ public class RepositoryFilterUserMetadataIT extends ESIntegTestCase {
public void finalizeSnapshot(SnapshotId snapshotId, ShardGenerations shardGenerations, long startTime, String failure,
int totalShards, List<SnapshotShardFailure> shardFailures, long repositoryStateId,
boolean includeGlobalState, MetaData clusterMetaData, Map<String, Object> userMetadata,
boolean writeShardGens, ActionListener<SnapshotInfo> listener) {
Version repositoryMetaVersion, ActionListener<SnapshotInfo> listener) {
assertThat(userMetadata, is(Collections.singletonMap(MOCK_FILTERED_META, initialMetaValue)));
super.finalizeSnapshot(snapshotId, shardGenerations, startTime, failure, totalShards, shardFailures,
repositoryStateId, includeGlobalState, clusterMetaData, userMetadata, writeShardGens, listener);
repositoryStateId, includeGlobalState, clusterMetaData, userMetadata, repositoryMetaVersion, listener);
}
@Override
public void snapshotShard(Store store, MapperService mapperService, SnapshotId snapshotId, IndexId indexId,
IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus,
boolean writeShardGens, Map<String, Object> userMetadata, ActionListener<String> listener) {
Version repositoryMetaVersion, Map<String, Object> userMetadata,
ActionListener<String> listener) {
assertThat(userMetadata, is(Collections.singletonMap(MOCK_FILTERED_META, initialMetaValue)));
super.snapshotShard(store, mapperService, snapshotId, indexId, snapshotIndexCommit, snapshotStatus,
writeShardGens, userMetadata, listener);
repositoryMetaVersion, userMetadata, listener);
}
@Override


@ -33,6 +33,7 @@ import org.elasticsearch.index.Index;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.repositories.IndexId;
import org.elasticsearch.test.AbstractDiffableWireSerializationTestCase;
import org.elasticsearch.test.VersionUtils;
import java.util.ArrayList;
import java.util.List;
@ -77,7 +78,7 @@ public class SnapshotsInProgressSerializationTests extends AbstractDiffableWireS
}
ImmutableOpenMap<ShardId, SnapshotsInProgress.ShardSnapshotStatus> shards = builder.build();
return new Entry(snapshot, includeGlobalState, partial, state, indices, startTime, repositoryStateId, shards,
SnapshotInfoTests.randomUserMetadata(), randomBoolean());
SnapshotInfoTests.randomUserMetadata(), VersionUtils.randomVersion(random()));
}
@Override


@ -18,6 +18,7 @@
*/
package org.elasticsearch.snapshots.mockstore;
import org.elasticsearch.Version;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
@ -149,7 +150,7 @@ public class MockEventuallyConsistentRepositoryTests extends ESTestCase {
final SnapshotId snapshotId = new SnapshotId("foo", UUIDs.randomBase64UUID());
// We try to write another snap- blob for "foo" in the next generation. It fails because the content differs.
repository.finalizeSnapshot(snapshotId, ShardGenerations.EMPTY, 1L, null, 5, Collections.emptyList(),
-1L, false, MetaData.EMPTY_META_DATA, Collections.emptyMap(), true, future);
-1L, false, MetaData.EMPTY_META_DATA, Collections.emptyMap(), Version.CURRENT, future);
future.actionGet();
// We try to write another snap- blob for "foo" in the next generation. It fails because the content differs.
@ -158,7 +159,7 @@ public class MockEventuallyConsistentRepositoryTests extends ESTestCase {
final PlainActionFuture<SnapshotInfo> fut = PlainActionFuture.newFuture();
repository.finalizeSnapshot(
snapshotId, ShardGenerations.EMPTY, 1L, null, 6, Collections.emptyList(),
0, false, MetaData.EMPTY_META_DATA, Collections.emptyMap(), true, fut);
0, false, MetaData.EMPTY_META_DATA, Collections.emptyMap(), Version.CURRENT, fut);
fut.actionGet();
});
assertThat(assertionError.getMessage(), equalTo("\nExpected: <6>\n but: was <5>"));
@ -167,7 +168,7 @@ public class MockEventuallyConsistentRepositoryTests extends ESTestCase {
// It passes cleanly because the content of the blob except for the timestamps.
final PlainActionFuture<SnapshotInfo> future2 = PlainActionFuture.newFuture();
repository.finalizeSnapshot(snapshotId, ShardGenerations.EMPTY, 1L, null, 5, Collections.emptyList(),
0, false, MetaData.EMPTY_META_DATA, Collections.emptyMap(),true, future2);
0, false, MetaData.EMPTY_META_DATA, Collections.emptyMap(),Version.CURRENT, future2);
future2.actionGet();
}
}


@ -839,7 +839,7 @@ public abstract class IndexShardTestCase extends ESTestCase {
final String shardGen;
try (Engine.IndexCommitRef indexCommitRef = shard.acquireLastIndexCommit(true)) {
repository.snapshotShard(shard.store(), shard.mapperService(), snapshot.getSnapshotId(), indexId,
indexCommitRef.getIndexCommit(), snapshotStatus, true, Collections.emptyMap(), future);
indexCommitRef.getIndexCommit(), snapshotStatus, Version.CURRENT, Collections.emptyMap(), future);
shardGen = future.actionGet();
}


@ -19,6 +19,7 @@
package org.elasticsearch.index.shard;
import org.apache.lucene.index.IndexCommit;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
@ -99,13 +100,14 @@ public abstract class RestoreOnlyRepository extends AbstractLifecycleComponent i
@Override
public void finalizeSnapshot(SnapshotId snapshotId, ShardGenerations shardGenerations, long startTime, String failure,
int totalShards, List<SnapshotShardFailure> shardFailures, long repositoryStateId,
boolean includeGlobalState, MetaData metaData, Map<String, Object> userMetadata, boolean writeShardGens,
ActionListener<SnapshotInfo> listener) {
boolean includeGlobalState, MetaData metaData, Map<String, Object> userMetadata,
Version repositoryMetaVersion, ActionListener<SnapshotInfo> listener) {
listener.onResponse(null);
}
@Override
public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, boolean writeShardGens, ActionListener<Void> listener) {
public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, Version repositoryMetaVersion,
ActionListener<Void> listener) {
listener.onResponse(null);
}
@ -135,7 +137,7 @@ public abstract class RestoreOnlyRepository extends AbstractLifecycleComponent i
@Override
public void snapshotShard(Store store, MapperService mapperService, SnapshotId snapshotId, IndexId indexId,
IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus, boolean writeShardGens,
IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus, Version repositoryMetaVersion,
Map<String, Object> userMetadata, ActionListener<String> listener) {
}


@ -260,13 +260,14 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit
@Override
public void finalizeSnapshot(SnapshotId snapshotId, ShardGenerations shardGenerations, long startTime, String failure, int totalShards,
List<SnapshotShardFailure> shardFailures, long repositoryStateId, boolean includeGlobalState,
MetaData metaData, Map<String, Object> userMetadata, boolean writeShardGens,
MetaData metaData, Map<String, Object> userMetadata, Version repositoryMetaVersion,
ActionListener<SnapshotInfo> listener) {
throw new UnsupportedOperationException("Unsupported for repository of type: " + TYPE);
}
@Override
public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, boolean writeShardGens, ActionListener<Void> listener) {
public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, Version repositoryMetaVersion,
ActionListener<Void> listener) {
throw new UnsupportedOperationException("Unsupported for repository of type: " + TYPE);
}
@ -301,7 +302,7 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit
@Override
public void snapshotShard(Store store, MapperService mapperService, SnapshotId snapshotId, IndexId indexId,
IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus, boolean writeShardGens,
IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus, Version repositoryMetaVersion,
Map<String, Object> userMetadata, ActionListener<String> listener) {
throw new UnsupportedOperationException("Unsupported for repository of type: " + TYPE);
}


@ -15,6 +15,7 @@ import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.store.FilterDirectory;
import org.apache.lucene.store.SimpleFSDirectory;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MappingMetaData;
@ -95,13 +96,14 @@ public final class SourceOnlySnapshotRepository extends FilterRepository {
public void finalizeSnapshot(SnapshotId snapshotId, ShardGenerations shardGenerations, long startTime, String failure,
int totalShards, List<SnapshotShardFailure> shardFailures, long repositoryStateId,
boolean includeGlobalState, MetaData metaData, Map<String, Object> userMetadata,
boolean writeShardGens, ActionListener<SnapshotInfo> listener) {
Version repositoryMetaVersion, ActionListener<SnapshotInfo> listener) {
// we process the index metadata at snapshot time. This means if somebody tries to restore
// a _source only snapshot with a plain repository it will be just fine since we already set the
// required engine, that the index is read-only and the mapping to a default mapping
try {
super.finalizeSnapshot(snapshotId, shardGenerations, startTime, failure, totalShards, shardFailures, repositoryStateId,
includeGlobalState, metadataToSnapshot(shardGenerations.indices(), metaData), userMetadata, writeShardGens, listener);
includeGlobalState, metadataToSnapshot(shardGenerations.indices(), metaData), userMetadata, repositoryMetaVersion,
listener);
} catch (IOException ex) {
listener.onFailure(ex);
}
@ -136,7 +138,7 @@ public final class SourceOnlySnapshotRepository extends FilterRepository {
@Override
public void snapshotShard(Store store, MapperService mapperService, SnapshotId snapshotId, IndexId indexId,
IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus, boolean writeShardGens,
IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus, Version repositoryMetaVersion,
Map<String, Object> userMetadata, ActionListener<String> listener) {
if (mapperService.documentMapper() != null // if there is no mapping this is null
&& mapperService.documentMapper().sourceMapper().isComplete() == false) {
@ -176,7 +178,7 @@ public final class SourceOnlySnapshotRepository extends FilterRepository {
Collections.singletonMap(BlockTreeTermsReader.FST_MODE_KEY, BlockTreeTermsReader.FSTLoadMode.OFF_HEAP.name()));
toClose.add(reader);
IndexCommit indexCommit = reader.getIndexCommit();
super.snapshotShard(tempStore, mapperService, snapshotId, indexId, indexCommit, snapshotStatus, writeShardGens,
super.snapshotShard(tempStore, mapperService, snapshotId, indexId, indexCommit, snapshotStatus, repositoryMetaVersion,
userMetadata, ActionListener.runBefore(listener, () -> IOUtils.close(toClose)));
} catch (IOException e) {
try {


@ -103,7 +103,7 @@ public class SourceOnlySnapshotShardTests extends IndexShardTestCase {
IndexShardSnapshotStatus indexShardSnapshotStatus = IndexShardSnapshotStatus.newInitializing("-1");
final PlainActionFuture<String> future = PlainActionFuture.newFuture();
runAsSnapshot(shard.getThreadPool(), () -> repository.snapshotShard(shard.store(), shard.mapperService(), snapshotId, indexId,
snapshotRef.getIndexCommit(), indexShardSnapshotStatus, true, Collections.emptyMap(), future));
snapshotRef.getIndexCommit(), indexShardSnapshotStatus, Version.CURRENT, Collections.emptyMap(), future));
IllegalStateException illegalStateException = expectThrows(IllegalStateException.class, future::actionGet);
assertEquals(
"Can't snapshot _source only on an index that has incomplete source ie. has _source disabled or filters the source",
@ -129,7 +129,7 @@ public class SourceOnlySnapshotShardTests extends IndexShardTestCase {
SnapshotId snapshotId = new SnapshotId("test", "test");
final PlainActionFuture<String> future = PlainActionFuture.newFuture();
runAsSnapshot(shard.getThreadPool(), () -> repository.snapshotShard(shard.store(), shard.mapperService(), snapshotId, indexId,
snapshotRef.getIndexCommit(), indexShardSnapshotStatus, true, Collections.emptyMap(), future));
snapshotRef.getIndexCommit(), indexShardSnapshotStatus, Version.CURRENT, Collections.emptyMap(), future));
shardGeneration = future.actionGet();
IndexShardSnapshotStatus.Copy copy = indexShardSnapshotStatus.asCopy();
assertEquals(copy.getTotalFileCount(), copy.getIncrementalFileCount());
@ -145,7 +145,7 @@ public class SourceOnlySnapshotShardTests extends IndexShardTestCase {
IndexShardSnapshotStatus indexShardSnapshotStatus = IndexShardSnapshotStatus.newInitializing(shardGeneration);
final PlainActionFuture<String> future = PlainActionFuture.newFuture();
runAsSnapshot(shard.getThreadPool(), () -> repository.snapshotShard(shard.store(), shard.mapperService(), snapshotId, indexId,
snapshotRef.getIndexCommit(), indexShardSnapshotStatus, true, Collections.emptyMap(), future));
snapshotRef.getIndexCommit(), indexShardSnapshotStatus, Version.CURRENT, Collections.emptyMap(), future));
shardGeneration = future.actionGet();
IndexShardSnapshotStatus.Copy copy = indexShardSnapshotStatus.asCopy();
// we processed the segments_N file plus _1.si, _1.fnm, _1.fdx, _1.fdt, _1.fdm
@ -161,7 +161,7 @@ public class SourceOnlySnapshotShardTests extends IndexShardTestCase {
IndexShardSnapshotStatus indexShardSnapshotStatus = IndexShardSnapshotStatus.newInitializing(shardGeneration);
final PlainActionFuture<String> future = PlainActionFuture.newFuture();
runAsSnapshot(shard.getThreadPool(), () -> repository.snapshotShard(shard.store(), shard.mapperService(), snapshotId, indexId,
snapshotRef.getIndexCommit(), indexShardSnapshotStatus, true, Collections.emptyMap(), future));
snapshotRef.getIndexCommit(), indexShardSnapshotStatus, Version.CURRENT, Collections.emptyMap(), future));
future.actionGet();
IndexShardSnapshotStatus.Copy copy = indexShardSnapshotStatus.asCopy();
// we processed the segments_N file plus _1_1.liv
@ -209,7 +209,7 @@ public class SourceOnlySnapshotShardTests extends IndexShardTestCase {
final PlainActionFuture<String> future = PlainActionFuture.newFuture();
runAsSnapshot(shard.getThreadPool(), () -> {
repository.snapshotShard(shard.store(), shard.mapperService(), snapshotId, indexId, snapshotRef.getIndexCommit(),
indexShardSnapshotStatus, true, Collections.emptyMap(), future);
indexShardSnapshotStatus, Version.CURRENT, Collections.emptyMap(), future);
future.actionGet();
final PlainActionFuture<SnapshotInfo> finFuture = PlainActionFuture.newFuture();
repository.finalizeSnapshot(snapshotId,
@ -217,7 +217,7 @@ public class SourceOnlySnapshotShardTests extends IndexShardTestCase {
indexShardSnapshotStatus.asCopy().getStartTime(), null, 1, Collections.emptyList(),
ESBlobStoreRepositoryIntegTestCase.getRepositoryData(repository).getGenId(), true,
MetaData.builder().put(shard.indexSettings().getIndexMetaData(), false).build(), Collections.emptyMap(),
true,
Version.CURRENT,
finFuture);
finFuture.actionGet();
});


@ -27,6 +27,7 @@ import org.elasticsearch.snapshots.SnapshotId;
import org.elasticsearch.snapshots.SnapshotInfo;
import org.elasticsearch.test.ClusterServiceUtils;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.VersionUtils;
import org.elasticsearch.test.client.NoOpClient;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
@ -336,7 +337,7 @@ public class SnapshotRetentionTaskTests extends ESTestCase {
snapshot, true, false, SnapshotsInProgress.State.INIT,
Collections.singletonList(new IndexId("name", "id")), 0, 0,
ImmutableOpenMap.<ShardId, SnapshotsInProgress.ShardSnapshotStatus>builder().build(), Collections.emptyMap(),
randomBoolean()));
VersionUtils.randomVersion(random())));
ClusterState state = ClusterState.builder(new ClusterName("cluster"))
.putCustom(SnapshotsInProgress.TYPE, inProgress)
.build();