diff --git a/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java b/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java index 0e1ebcfdd78..1bbca19ae0b 100644 --- a/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java +++ b/plugins/repository-s3/src/main/java/org/elasticsearch/repositories/s3/S3Repository.java @@ -21,6 +21,7 @@ package org.elasticsearch.repositories.s3; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRunnable; import org.elasticsearch.cluster.metadata.MetaData; @@ -270,21 +271,22 @@ class S3Repository extends BlobStoreRepository { @Override public void finalizeSnapshot(SnapshotId snapshotId, ShardGenerations shardGenerations, long startTime, String failure, int totalShards, List shardFailures, long repositoryStateId, boolean includeGlobalState, - MetaData clusterMetaData, Map userMetadata, boolean writeShardGens, + MetaData clusterMetaData, Map userMetadata, Version repositoryMetaVersion, ActionListener listener) { - if (writeShardGens == false) { + if (SnapshotsService.useShardGenerations(repositoryMetaVersion) == false) { listener = delayedListener(listener); } super.finalizeSnapshot(snapshotId, shardGenerations, startTime, failure, totalShards, shardFailures, repositoryStateId, - includeGlobalState, clusterMetaData, userMetadata, writeShardGens, listener); + includeGlobalState, clusterMetaData, userMetadata, repositoryMetaVersion, listener); } @Override - public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, boolean writeShardGens, ActionListener listener) { - if (writeShardGens == false) { + public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, Version repositoryMetaVersion, + ActionListener listener) { + if (SnapshotsService.useShardGenerations(repositoryMetaVersion) == false) { listener = delayedListener(listener); } - super.deleteSnapshot(snapshotId, repositoryStateId, writeShardGens, listener); + super.deleteSnapshot(snapshotId, repositoryStateId, repositoryMetaVersion, listener); } /** diff --git a/qa/repository-multi-version/src/test/java/org/elasticsearch/upgrades/MultiVersionRepositoryAccessIT.java b/qa/repository-multi-version/src/test/java/org/elasticsearch/upgrades/MultiVersionRepositoryAccessIT.java index 910d902b0ed..06f7f192654 100644 --- a/qa/repository-multi-version/src/test/java/org/elasticsearch/upgrades/MultiVersionRepositoryAccessIT.java +++ b/qa/repository-multi-version/src/test/java/org/elasticsearch/upgrades/MultiVersionRepositoryAccessIT.java @@ -219,7 +219,7 @@ public class MultiVersionRepositoryAccessIT extends ESRestTestCase { ensureSnapshotRestoreWorks(repoName, "snapshot-2", shards); } } else { - if (minimumNodeVersion().before(SnapshotsService.SHARD_GEN_IN_REPO_DATA_VERSION)) { + if (SnapshotsService.useShardGenerations(minimumNodeVersion()) == false) { assertThat(TEST_STEP, is(TestStep.STEP3_OLD_CLUSTER)); final List> expectedExceptions = Arrays.asList(ResponseException.class, ElasticsearchStatusException.class); diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/cleanup/TransportCleanupRepositoryAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/cleanup/TransportCleanupRepositoryAction.java index 3dc729575ae..9c45a5332cf 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/cleanup/TransportCleanupRepositoryAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/repositories/cleanup/TransportCleanupRepositoryAction.java @@ -219,8 +219,8 @@ public final class TransportCleanupRepositoryAction extends TransportMasterNodeA threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.wrap(listener, l -> blobStoreRepository.cleanup( repositoryStateId, - newState.nodes().getMinNodeVersion().onOrAfter(SnapshotsService.SHARD_GEN_IN_REPO_DATA_VERSION) - && snapshotsService.hasOldVersionSnapshots(repositoryName, repositoryData, null) == false, + snapshotsService.minCompatibleVersion( + newState.nodes().getMinNodeVersion(), repositoryName, repositoryData, null), ActionListener.wrap(result -> after(null, result), e -> after(e, null))) )); } diff --git a/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java b/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java index ebb06e11971..46f092d3bc5 100644 --- a/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java +++ b/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java @@ -56,6 +56,9 @@ import static org.elasticsearch.snapshots.SnapshotInfo.METADATA_FIELD_INTRODUCED * Meta data about snapshots that are currently executing */ public class SnapshotsInProgress extends AbstractNamedDiffable implements Custom { + + private static final Version VERSION_IN_SNAPSHOT_VERSION = Version.V_7_7_0; + public static final String TYPE = "snapshots"; @Override @@ -93,13 +96,13 @@ public class SnapshotsInProgress extends AbstractNamedDiffable implement private final long startTime; private final long repositoryStateId; // see #useShardGenerations - private final boolean useShardGenerations; + private final Version version; @Nullable private final Map userMetadata; @Nullable private final String failure; public Entry(Snapshot snapshot, boolean includeGlobalState, boolean partial, State state, List indices, long startTime, long repositoryStateId, ImmutableOpenMap shards, - String failure, Map userMetadata, boolean useShardGenerations) { + String failure, Map userMetadata, Version version) { this.state = state; this.snapshot = snapshot; this.includeGlobalState = includeGlobalState; @@ -117,7 +120,7 @@ public class SnapshotsInProgress extends AbstractNamedDiffable implement this.repositoryStateId = repositoryStateId; this.failure = failure; this.userMetadata = userMetadata; - this.useShardGenerations = useShardGenerations; + this.version = version; } private static boolean assertShardsConsistent(State state, List indices, @@ -135,25 +138,25 @@ public class SnapshotsInProgress extends AbstractNamedDiffable implement public Entry(Snapshot snapshot, boolean includeGlobalState, boolean partial, State state, List indices, long startTime, long repositoryStateId, ImmutableOpenMap shards, - Map userMetadata, boolean useShardGenerations) { + Map userMetadata, Version version) { this(snapshot, includeGlobalState, partial, state, indices, startTime, repositoryStateId, shards, null, userMetadata, - useShardGenerations); + version); } public Entry(Entry entry, State state, List indices, long repositoryStateId, - ImmutableOpenMap shards, boolean useShardGenerations, String failure) { + ImmutableOpenMap shards, Version version, String failure) { this(entry.snapshot, entry.includeGlobalState, entry.partial, state, indices, entry.startTime, repositoryStateId, shards, - failure, entry.userMetadata, useShardGenerations); + failure, entry.userMetadata, version); } public Entry(Entry entry, State state, ImmutableOpenMap shards) { this(entry.snapshot, entry.includeGlobalState, entry.partial, state, entry.indices, entry.startTime, - entry.repositoryStateId, shards, entry.failure, entry.userMetadata, entry.useShardGenerations); + entry.repositoryStateId, shards, entry.failure, entry.userMetadata, entry.version); } public Entry(Entry entry, State state, ImmutableOpenMap shards, String failure) { this(entry.snapshot, entry.includeGlobalState, entry.partial, state, entry.indices, entry.startTime, - entry.repositoryStateId, shards, failure, entry.userMetadata, entry.useShardGenerations); + entry.repositoryStateId, shards, failure, entry.userMetadata, entry.version); } public Entry(Entry entry, ImmutableOpenMap shards) { @@ -211,13 +214,10 @@ public class SnapshotsInProgress extends AbstractNamedDiffable implement } /** - * Whether to write to the repository in a format only understood by versions newer than - * {@link SnapshotsService#SHARD_GEN_IN_REPO_DATA_VERSION}. - * - * @return true if writing to repository in new format + * What version of metadata to use for the snapshot in the repository */ - public boolean useShardGenerations() { - return useShardGenerations; + public Version version() { + return version; } @Override @@ -235,7 +235,7 @@ public class SnapshotsInProgress extends AbstractNamedDiffable implement if (!snapshot.equals(entry.snapshot)) return false; if (state != entry.state) return false; if (repositoryStateId != entry.repositoryStateId) return false; - if (useShardGenerations != entry.useShardGenerations) return false; + if (version.equals(entry.version) == false) return false; return true; } @@ -250,7 +250,7 @@ public class SnapshotsInProgress extends AbstractNamedDiffable implement result = 31 * result + indices.hashCode(); result = 31 * result + Long.hashCode(startTime); result = 31 * result + Long.hashCode(repositoryStateId); - result = 31 * result + (useShardGenerations ? 1 : 0); + result = 31 * result + version.hashCode(); return result; } @@ -365,7 +365,7 @@ public class SnapshotsInProgress extends AbstractNamedDiffable implement public ShardSnapshotStatus(StreamInput in) throws IOException { nodeId = in.readOptionalString(); state = ShardState.fromValue(in.readByte()); - if (in.getVersion().onOrAfter(SnapshotsService.SHARD_GEN_IN_REPO_DATA_VERSION)) { + if (SnapshotsService.useShardGenerations(in.getVersion())) { generation = in.readOptionalString(); } else { generation = null; @@ -392,7 +392,7 @@ public class SnapshotsInProgress extends AbstractNamedDiffable implement public void writeTo(StreamOutput out) throws IOException { out.writeOptionalString(nodeId); out.writeByte(state.value); - if (out.getVersion().onOrAfter(SnapshotsService.SHARD_GEN_IN_REPO_DATA_VERSION)) { + if (SnapshotsService.useShardGenerations(out.getVersion())) { out.writeOptionalString(generation); } out.writeOptionalString(reason); @@ -547,11 +547,16 @@ public class SnapshotsInProgress extends AbstractNamedDiffable implement if (in.getVersion().onOrAfter(METADATA_FIELD_INTRODUCED)) { userMetadata = in.readMap(); } - final boolean useShardGenerations; - if (in.getVersion().onOrAfter(SnapshotsService.SHARD_GEN_IN_REPO_DATA_VERSION)) { - useShardGenerations = in.readBoolean(); + final Version version; + if (in.getVersion().onOrAfter(VERSION_IN_SNAPSHOT_VERSION)) { + version = Version.readVersion(in); + } else if (in.getVersion().onOrAfter(SnapshotsService.SHARD_GEN_IN_REPO_DATA_VERSION)) { + // If an older master informs us that shard generations are supported we use the minimum shard generation compatible + // version. If shard generations are not supported yet we use a placeholder for a version that does not use shard + // generations. + version = in.readBoolean() ? SnapshotsService.SHARD_GEN_IN_REPO_DATA_VERSION : SnapshotsService.OLD_SNAPSHOT_FORMAT; } else { - useShardGenerations = false; + version = SnapshotsService.OLD_SNAPSHOT_FORMAT; } entries[i] = new Entry(snapshot, includeGlobalState, @@ -563,7 +568,7 @@ public class SnapshotsInProgress extends AbstractNamedDiffable implement builder.build(), failure, userMetadata, - useShardGenerations + version ); } this.entries = Arrays.asList(entries); @@ -599,8 +604,10 @@ public class SnapshotsInProgress extends AbstractNamedDiffable implement if (out.getVersion().onOrAfter(METADATA_FIELD_INTRODUCED)) { out.writeMap(entry.userMetadata); } - if (out.getVersion().onOrAfter(SnapshotsService.SHARD_GEN_IN_REPO_DATA_VERSION)) { - out.writeBoolean(entry.useShardGenerations); + if (out.getVersion().onOrAfter(VERSION_IN_SNAPSHOT_VERSION)) { + Version.writeVersion(entry.version, out); + } else if (out.getVersion().onOrAfter(SnapshotsService.SHARD_GEN_IN_REPO_DATA_VERSION)) { + out.writeBoolean(SnapshotsService.useShardGenerations(entry.version)); } } } diff --git a/server/src/main/java/org/elasticsearch/repositories/FilterRepository.java b/server/src/main/java/org/elasticsearch/repositories/FilterRepository.java index e0cf414f86f..b9d30800162 100644 --- a/server/src/main/java/org/elasticsearch/repositories/FilterRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/FilterRepository.java @@ -19,6 +19,7 @@ package org.elasticsearch.repositories; import org.apache.lucene.index.IndexCommit; +import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetaData; @@ -82,14 +83,15 @@ public class FilterRepository implements Repository { public void finalizeSnapshot(SnapshotId snapshotId, ShardGenerations shardGenerations, long startTime, String failure, int totalShards, List shardFailures, long repositoryStateId, boolean includeGlobalState, MetaData metaData, Map userMetadata, - boolean writeShardGens, ActionListener listener) { + Version repositoryMetaVersion, ActionListener listener) { in.finalizeSnapshot(snapshotId, shardGenerations, startTime, failure, totalShards, shardFailures, repositoryStateId, - includeGlobalState, metaData, userMetadata, writeShardGens, listener); + includeGlobalState, metaData, userMetadata, repositoryMetaVersion, listener); } @Override - public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, boolean writeShardGens, ActionListener listener) { - in.deleteSnapshot(snapshotId, repositoryStateId, writeShardGens, listener); + public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, Version repositoryMetaVersion, + ActionListener listener) { + in.deleteSnapshot(snapshotId, repositoryStateId, repositoryMetaVersion, listener); } @Override @@ -124,10 +126,10 @@ public class FilterRepository implements Repository { @Override public void snapshotShard(Store store, MapperService mapperService, SnapshotId snapshotId, IndexId indexId, - IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus, boolean writeShardGens, + IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus, Version repositoryMetaVersion, Map userMetadata, ActionListener listener) { in.snapshotShard( - store, mapperService, snapshotId, indexId, snapshotIndexCommit, snapshotStatus, writeShardGens, userMetadata, listener); + store, mapperService, snapshotId, indexId, snapshotIndexCommit, snapshotStatus, repositoryMetaVersion, userMetadata, listener); } @Override public void restoreShard(Store store, SnapshotId snapshotId, IndexId indexId, ShardId snapshotShardId, RecoveryState recoveryState, diff --git a/server/src/main/java/org/elasticsearch/repositories/Repository.java b/server/src/main/java/org/elasticsearch/repositories/Repository.java index bba82c24c10..83577fa63f2 100644 --- a/server/src/main/java/org/elasticsearch/repositories/Repository.java +++ b/server/src/main/java/org/elasticsearch/repositories/Repository.java @@ -19,6 +19,7 @@ package org.elasticsearch.repositories; import org.apache.lucene.index.IndexCommit; +import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.SnapshotsInProgress; @@ -127,33 +128,33 @@ public interface Repository extends LifecycleComponent { *

* This method is called on master after all shards are snapshotted. * - * @param snapshotId snapshot id - * @param shardGenerations updated shard generations - * @param startTime start time of the snapshot - * @param failure global failure reason or null - * @param totalShards total number of shards - * @param shardFailures list of shard failures - * @param repositoryStateId the unique id identifying the state of the repository when the snapshot began - * @param includeGlobalState include cluster global state - * @param clusterMetaData cluster metadata - * @param userMetadata user metadata - * @param writeShardGens if shard generations should be written to the repository - * @param listener listener to be called on completion of the snapshot + * @param snapshotId snapshot id + * @param shardGenerations updated shard generations + * @param startTime start time of the snapshot + * @param failure global failure reason or null + * @param totalShards total number of shards + * @param shardFailures list of shard failures + * @param repositoryStateId the unique id identifying the state of the repository when the snapshot began + * @param includeGlobalState include cluster global state + * @param clusterMetaData cluster metadata + * @param userMetadata user metadata + * @param repositoryMetaVersion version of the updated repository metadata to write + * @param listener listener to be called on completion of the snapshot */ void finalizeSnapshot(SnapshotId snapshotId, ShardGenerations shardGenerations, long startTime, String failure, int totalShards, List shardFailures, long repositoryStateId, boolean includeGlobalState, MetaData clusterMetaData, Map userMetadata, - boolean writeShardGens, ActionListener listener); + Version repositoryMetaVersion, ActionListener listener); /** * Deletes snapshot * - * @param snapshotId snapshot id - * @param repositoryStateId the unique id identifying the state of the repository when the snapshot deletion began - * @param writeShardGens if shard generations should be written to the repository - * @param listener completion listener + * @param snapshotId snapshot id + * @param repositoryStateId the unique id identifying the state of the repository when the snapshot deletion began + * @param repositoryMetaVersion version of the updated repository metadata to write + * @param listener completion listener */ - void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, boolean writeShardGens, ActionListener listener); + void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, Version repositoryMetaVersion, ActionListener listener); /** * Returns snapshot throttle time in nanoseconds @@ -206,17 +207,18 @@ public interface Repository extends LifecycleComponent { *

* As snapshot process progresses, implementation of this method should update {@link IndexShardSnapshotStatus} object and check * {@link IndexShardSnapshotStatus#isAborted()} to see if the snapshot process should be aborted. - * @param store store to be snapshotted - * @param mapperService the shards mapper service - * @param snapshotId snapshot id - * @param indexId id for the index being snapshotted - * @param snapshotIndexCommit commit point - * @param snapshotStatus snapshot status - * @param userMetadata user metadata of the snapshot found in {@link SnapshotsInProgress.Entry#userMetadata()} - * @param listener listener invoked on completion + * @param store store to be snapshotted + * @param mapperService the shards mapper service + * @param snapshotId snapshot id + * @param indexId id for the index being snapshotted + * @param snapshotIndexCommit commit point + * @param snapshotStatus snapshot status + * @param repositoryMetaVersion version of the updated repository metadata to write + * @param userMetadata user metadata of the snapshot found in {@link SnapshotsInProgress.Entry#userMetadata()} + * @param listener listener invoked on completion */ void snapshotShard(Store store, MapperService mapperService, SnapshotId snapshotId, IndexId indexId, IndexCommit snapshotIndexCommit, - IndexShardSnapshotStatus snapshotStatus, boolean writeShardGens, Map userMetadata, + IndexShardSnapshotStatus snapshotStatus, Version repositoryMetaVersion, Map userMetadata, ActionListener listener); /** diff --git a/server/src/main/java/org/elasticsearch/repositories/RepositoryData.java b/server/src/main/java/org/elasticsearch/repositories/RepositoryData.java index 618eb246a14..6e6b5eaf4f4 100644 --- a/server/src/main/java/org/elasticsearch/repositories/RepositoryData.java +++ b/server/src/main/java/org/elasticsearch/repositories/RepositoryData.java @@ -523,7 +523,7 @@ public final class RepositoryData { throw new ElasticsearchParseException("version string expected [min_version]"); } final Version version = Version.fromString(parser.text()); - assert version.onOrAfter(SnapshotsService.SHARD_GEN_IN_REPO_DATA_VERSION); + assert SnapshotsService.useShardGenerations(version); } else { throw new ElasticsearchParseException("unknown field name [" + field + "]"); } diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index eef39aa0c75..5aca363ae71 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -489,8 +489,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp } } - @Override - public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, boolean writeShardGens, ActionListener listener) { + public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, Version repositoryMetaVersion, + ActionListener listener) { if (isReadOnly()) { listener.onFailure(new RepositoryException(metadata.name(), "cannot delete snapshot from a readonly repository")); } else { @@ -507,7 +507,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp // Cache the indices that were found before writing out the new index-N blob so that a stuck master will never // delete an index that was created by another master node after writing this index-N blob. final Map foundIndices = blobStore().blobContainer(indicesPath()).children(); - doDeleteShardSnapshots(snapshotId, repositoryStateId, foundIndices, rootBlobs, repositoryData, writeShardGens, listener); + doDeleteShardSnapshots(snapshotId, repositoryStateId, foundIndices, rootBlobs, repositoryData, + SnapshotsService.useShardGenerations(repositoryMetaVersion), listener); } catch (Exception ex) { listener.onFailure(new RepositoryException(metadata.name(), "failed to delete snapshot [" + snapshotId + "]", ex)); } @@ -763,11 +764,11 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp *

  • Deleting stale indices {@link #cleanupStaleIndices}
  • *
  • Deleting unreferenced root level blobs {@link #cleanupStaleRootFiles}
  • * - * @param repositoryStateId Current repository state id - * @param writeShardGens If shard generations should be written to the repository - * @param listener Listener to complete when done + * @param repositoryStateId Current repository state id + * @param repositoryMetaVersion version of the updated repository metadata to write + * @param listener Listener to complete when done */ - public void cleanup(long repositoryStateId, boolean writeShardGens, ActionListener listener) { + public void cleanup(long repositoryStateId, Version repositoryMetaVersion, ActionListener listener) { try { if (isReadOnly()) { throw new RepositoryException(metadata.name(), "cannot run cleanup on readonly repository"); @@ -783,7 +784,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp listener.onResponse(new RepositoryCleanupResult(DeleteResult.ZERO)); } else { // write new index-N blob to ensure concurrent operations will fail - writeIndexGen(repositoryData, repositoryStateId, writeShardGens, + writeIndexGen(repositoryData, repositoryStateId, SnapshotsService.useShardGenerations(repositoryMetaVersion), ActionListener.wrap(v -> cleanupStaleBlobs(foundIndices, rootBlobs, repositoryData, ActionListener.map(listener, RepositoryCleanupResult::new)), listener::onFailure)); } @@ -882,7 +883,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp final boolean includeGlobalState, final MetaData clusterMetaData, final Map userMetadata, - boolean writeShardGens, + Version repositoryMetaVersion, final ActionListener listener) { final Collection indices = shardGenerations.indices(); @@ -890,6 +891,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp // directory if all nodes are at least at version SnapshotsService#SHARD_GEN_IN_REPO_DATA_VERSION // If there are older version nodes in the cluster, we don't need to run this cleanup as it will have already happened // when writing the index-${N} to each shard directory. + final boolean writeShardGens = SnapshotsService.useShardGenerations(repositoryMetaVersion); final Consumer onUpdateFailure = e -> listener.onFailure(new SnapshotException(metadata.name(), snapshotId, "failed to update snapshot in repository", e)); final ActionListener allMetaListener = new GroupedActionListener<>( @@ -1467,7 +1469,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp @Override public void snapshotShard(Store store, MapperService mapperService, SnapshotId snapshotId, IndexId indexId, - IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus, boolean writeShardGens, + IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus, Version repositoryMetaVersion, Map userMetadata, ActionListener listener) { final ShardId shardId = store.shardId(); final long startTime = threadPool.absoluteTimeInMillis(); @@ -1594,6 +1596,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp } final List blobsToDelete; final String indexGeneration; + final boolean writeShardGens = SnapshotsService.useShardGenerations(repositoryMetaVersion); if (writeShardGens) { indexGeneration = UUIDs.randomBase64UUID(); blobsToDelete = Collections.emptyList(); diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java index 888216b46f0..92aa371427c 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java @@ -24,6 +24,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.ActionResponse; @@ -281,9 +282,9 @@ public class SnapshotShardsService extends AbstractLifecycleComponent implements final IndexShardSnapshotStatus snapshotStatus = shardEntry.getValue(); final IndexId indexId = indicesMap.get(shardId.getIndexName()); assert indexId != null; - assert entry.useShardGenerations() || snapshotStatus.generation() == null : + assert SnapshotsService.useShardGenerations(entry.version()) || snapshotStatus.generation() == null : "Found non-null shard generation [" + snapshotStatus.generation() + "] for snapshot with old-format compatibility"; - snapshot(shardId, snapshot, indexId, entry.userMetadata(), snapshotStatus, entry.useShardGenerations(), + snapshot(shardId, snapshot, indexId, entry.userMetadata(), snapshotStatus, entry.version(), new ActionListener() { @Override public void onResponse(String newGeneration) { @@ -316,7 +317,7 @@ public class SnapshotShardsService extends AbstractLifecycleComponent implements * @param snapshotStatus snapshot status */ private void snapshot(final ShardId shardId, final Snapshot snapshot, final IndexId indexId, final Map userMetadata, - final IndexShardSnapshotStatus snapshotStatus, boolean writeShardGens, ActionListener listener) { + final IndexShardSnapshotStatus snapshotStatus, Version version, ActionListener listener) { try { final IndexShard indexShard = indicesService.indexServiceSafe(shardId.getIndex()).getShardOrNull(shardId.id()); if (indexShard.routingEntry().primary() == false) { @@ -339,7 +340,7 @@ public class SnapshotShardsService extends AbstractLifecycleComponent implements // we flush first to make sure we get the latest writes snapshotted snapshotRef = indexShard.acquireLastIndexCommit(true); repository.snapshotShard(indexShard.store(), indexShard.mapperService(), snapshot.getSnapshotId(), indexId, - snapshotRef.getIndexCommit(), snapshotStatus, writeShardGens, userMetadata, + snapshotRef.getIndexCommit(), snapshotStatus, version, userMetadata, ActionListener.runBefore(listener, snapshotRef::close)); } catch (Exception e) { IOUtils.close(snapshotRef); diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java index 48e1e33d34c..29397f9d1e8 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java @@ -130,6 +130,8 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus public static final Version SHARD_GEN_IN_REPO_DATA_VERSION = Version.V_7_6_0; + public static final Version OLD_SNAPSHOT_FORMAT = Version.V_7_5_0; + private static final Logger logger = LogManager.getLogger(SnapshotsService.class); private final ClusterService clusterService; @@ -316,7 +318,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus threadPool.absoluteTimeInMillis(), RepositoryData.UNKNOWN_REPO_GEN, null, - userMeta, false + userMeta, Version.CURRENT ); initializingSnapshots.add(newSnapshot.snapshot()); snapshots = new SnapshotsInProgress(newSnapshot); @@ -362,34 +364,6 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus }); } - public boolean hasOldVersionSnapshots(String repositoryName, RepositoryData repositoryData, @Nullable SnapshotId excluded) { - final Collection snapshotIds = repositoryData.getSnapshotIds(); - final boolean hasOldFormatSnapshots; - if (snapshotIds.isEmpty()) { - hasOldFormatSnapshots = false; - } else { - if (repositoryData.shardGenerations().totalShards() > 0) { - hasOldFormatSnapshots = false; - } else { - try { - final Repository repository = repositoriesService.repository(repositoryName); - hasOldFormatSnapshots = snapshotIds.stream().filter(snapshotId -> snapshotId.equals(excluded) == false).anyMatch( - snapshotId -> { - final Version known = repositoryData.getVersion(snapshotId); - return (known == null ? repository.getSnapshotInfo(snapshotId).version() : known) - .before(SHARD_GEN_IN_REPO_DATA_VERSION); - }); - } catch (SnapshotMissingException e) { - logger.warn("Failed to load snapshot metadata, assuming repository is in old format", e); - return true; - } - } - } - assert hasOldFormatSnapshots == false || repositoryData.shardGenerations().totalShards() == 0 : - "Found non-empty shard generations [" + repositoryData.shardGenerations() + "] but repository contained old version snapshots"; - return hasOldFormatSnapshots; - } - /** * Validates snapshot request * @@ -480,15 +454,13 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus snapshotCreated = true; logger.info("snapshot [{}] started", snapshot.snapshot()); - final boolean hasOldFormatSnapshots = - hasOldVersionSnapshots(snapshot.snapshot().getRepository(), repositoryData, null); - final boolean writeShardGenerations = hasOldFormatSnapshots == false && - clusterService.state().nodes().getMinNodeVersion().onOrAfter(SHARD_GEN_IN_REPO_DATA_VERSION); + final Version version = + minCompatibleVersion(clusterState.nodes().getMinNodeVersion(), snapshot.repository(), repositoryData, null); if (indices.isEmpty()) { // No indices in this snapshot - we are done userCreateSnapshotListener.onResponse(snapshot.snapshot()); endSnapshot(new SnapshotsInProgress.Entry( - snapshot, State.STARTED, Collections.emptyList(), repositoryData.getGenId(), null, writeShardGenerations, + snapshot, State.STARTED, Collections.emptyList(), repositoryData.getGenId(), null, version, null), clusterState.metaData()); return; } @@ -512,7 +484,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus final List indexIds = repositoryData.resolveNewIndices(indices); // Replace the snapshot that was just initialized ImmutableOpenMap shards = - shards(currentState, indexIds, writeShardGenerations, repositoryData); + shards(currentState, indexIds, useShardGenerations(version), repositoryData); if (!partial) { Tuple, Set> indicesWithMissingShards = indicesWithMissingShards(shards, currentState.metaData()); @@ -532,12 +504,12 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus failureMessage.append(closed); } entries.add(new SnapshotsInProgress.Entry(entry, State.FAILED, indexIds, - repositoryData.getGenId(), shards, writeShardGenerations, failureMessage.toString())); + repositoryData.getGenId(), shards, version, failureMessage.toString())); continue; } } entries.add(new SnapshotsInProgress.Entry(entry, State.STARTED, indexIds, repositoryData.getGenId(), - shards, writeShardGenerations, null)); + shards, version, null)); } } return ClusterState.builder(currentState) @@ -637,7 +609,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus snapshot.includeGlobalState(), metaDataForSnapshot(snapshot, metaData), snapshot.userMetadata(), - snapshot.useShardGenerations(), + snapshot.version(), ActionListener.runAfter(ActionListener.wrap(ignored -> { }, inner -> { inner.addSuppressed(exception); @@ -853,8 +825,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus if (deletionsInProgress != null && deletionsInProgress.hasDeletionsInProgress()) { assert deletionsInProgress.getEntries().size() == 1 : "only one in-progress deletion allowed per cluster"; SnapshotDeletionsInProgress.Entry entry = deletionsInProgress.getEntries().get(0); - deleteSnapshotFromRepository(entry.getSnapshot(), null, entry.repositoryStateId(), - state.nodes().getMinNodeVersion()); + deleteSnapshotFromRepository(entry.getSnapshot(), null, entry.repositoryStateId(), state.nodes().getMinNodeVersion()); } } @@ -1118,7 +1089,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus entry.includeGlobalState(), metaDataForSnapshot(entry, metaData), entry.userMetadata(), - entry.useShardGenerations(), + entry.version(), ActionListener.wrap(snapshotInfo -> { removeSnapshotFromClusterState(snapshot, snapshotInfo, null); logger.info("snapshot [{}] completed with state [{}]", snapshot, snapshotInfo.state()); @@ -1422,6 +1393,60 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus }); } + /** + * Determines the minimum {@link Version} that the snapshot repository must be compatible with from the current nodes in the cluster + * and the contents of the repository. The minimum version is determined as the lowest version found across all snapshots in the + * repository and all nodes in the cluster. + * + * @param minNodeVersion minimum node version in the cluster + * @param repositoryName name of the repository to modify + * @param repositoryData current {@link RepositoryData} of that repository + * @param excluded snapshot id to ignore when computing the minimum version + * (used to use newer metadata version after a snapshot delete) + * @return minimum node version that must still be able to read the repository metadata + */ + public Version minCompatibleVersion(Version minNodeVersion, String repositoryName, RepositoryData repositoryData, + @Nullable SnapshotId excluded) { + Version minCompatVersion = minNodeVersion; + final Collection snapshotIds = repositoryData.getSnapshotIds(); + final Repository repository = repositoriesService.repository(repositoryName); + for (SnapshotId snapshotId : + snapshotIds.stream().filter(snapshotId -> snapshotId.equals(excluded) == false).collect(Collectors.toList())) { + final Version known = repositoryData.getVersion(snapshotId); + // If we don't have the version cached in the repository data yet we load it from the snapshot info blobs + if (known == null) { + assert repositoryData.shardGenerations().totalShards() == 0 : + "Saw shard generations [" + repositoryData.shardGenerations() + + "] but did not have versions tracked for snapshot [" + snapshotId + "]"; + try { + final Version foundVersion = repository.getSnapshotInfo(snapshotId).version(); + if (useShardGenerations(foundVersion) == false) { + // We don't really care about the exact version if its before 7.6 as the 7.5 metadata is the oldest we are able + // to write out so we stop iterating here and just use 7.5.0 as a placeholder. + return OLD_SNAPSHOT_FORMAT; + } + minCompatVersion = minCompatVersion.before(foundVersion) ? minCompatVersion : foundVersion; + } catch (SnapshotMissingException e) { + logger.warn("Failed to load snapshot metadata, assuming repository is in old format", e); + return OLD_SNAPSHOT_FORMAT; + } + } else { + minCompatVersion = minCompatVersion.before(known) ? minCompatVersion : known; + } + } + return minCompatVersion; + } + + /** + * Checks whether the metadata version supports writing {@link ShardGenerations} to the repository. + * + * @param repositoryMetaVersion version to check + * @return true if version supports {@link ShardGenerations} + */ + public static boolean useShardGenerations(Version repositoryMetaVersion) { + return repositoryMetaVersion.onOrAfter(SHARD_GEN_IN_REPO_DATA_VERSION); + } + /** * Checks if a repository is currently in use by one of the snapshots * @@ -1463,16 +1488,15 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus * @param snapshot snapshot * @param listener listener * @param repositoryStateId the unique id representing the state of the repository at the time the deletion began - * @param version minimum ES version the repository should be readable by + * @param minNodeVersion minimum node version in the cluster */ private void deleteSnapshotFromRepository(Snapshot snapshot, @Nullable ActionListener listener, long repositoryStateId, - Version version) { + Version minNodeVersion) { threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.wrap(listener, l -> { Repository repository = repositoriesService.repository(snapshot.getRepository()); repository.getRepositoryData(ActionListener.wrap(repositoryData -> repository.deleteSnapshot(snapshot.getSnapshotId(), repositoryStateId, - version.onOrAfter(SHARD_GEN_IN_REPO_DATA_VERSION) && - hasOldVersionSnapshots(snapshot.getRepository(), repositoryData, snapshot.getSnapshotId()) == false, + minCompatibleVersion(minNodeVersion, snapshot.getRepository(), repositoryData, snapshot.getSnapshotId()), ActionListener.wrap(v -> { logger.info("snapshot [{}] deleted", snapshot); removeSnapshotDeletionFromClusterState(snapshot, null, l); diff --git a/server/src/test/java/org/elasticsearch/cluster/ClusterStateDiffIT.java b/server/src/test/java/org/elasticsearch/cluster/ClusterStateDiffIT.java index cc43c5bf1e3..a22c0523677 100644 --- a/server/src/test/java/org/elasticsearch/cluster/ClusterStateDiffIT.java +++ b/server/src/test/java/org/elasticsearch/cluster/ClusterStateDiffIT.java @@ -722,7 +722,7 @@ public class ClusterStateDiffIT extends ESIntegTestCase { (long) randomIntBetween(0, 1000), ImmutableOpenMap.of(), SnapshotInfoTests.randomUserMetadata(), - randomBoolean())); + randomVersion(random()))); case 1: return new RestoreInProgress.Builder().add( new RestoreInProgress.Entry( diff --git a/server/src/test/java/org/elasticsearch/cluster/SnapshotsInProgressTests.java b/server/src/test/java/org/elasticsearch/cluster/SnapshotsInProgressTests.java index 1ddafdde031..0bab6efb0cd 100644 --- a/server/src/test/java/org/elasticsearch/cluster/SnapshotsInProgressTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/SnapshotsInProgressTests.java @@ -30,6 +30,7 @@ import org.elasticsearch.snapshots.Snapshot; import org.elasticsearch.snapshots.SnapshotId; import org.elasticsearch.snapshots.SnapshotInfoTests; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.VersionUtils; import java.util.Arrays; import java.util.List; @@ -66,7 +67,8 @@ public class SnapshotsInProgressTests extends ESTestCase { // test no waiting shards in an index shards.put(new ShardId(idx3Name, idx3UUID, 0), new ShardSnapshotStatus(randomAlphaOfLength(2), randomNonWaitingState(), "", "1")); Entry entry = new Entry(snapshot, randomBoolean(), randomBoolean(), State.INIT, - indices, System.currentTimeMillis(), randomLong(), shards.build(), SnapshotInfoTests.randomUserMetadata(), randomBoolean()); + indices, System.currentTimeMillis(), randomLong(), shards.build(), SnapshotInfoTests.randomUserMetadata(), + VersionUtils.randomVersion(random())); ImmutableOpenMap> waitingIndices = entry.waitingIndices(); assertEquals(2, waitingIndices.get(idx1Name).size()); diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/MetaDataDeleteIndexServiceTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/MetaDataDeleteIndexServiceTests.java index b77653c34c7..81340eab594 100644 --- a/server/src/test/java/org/elasticsearch/cluster/metadata/MetaDataDeleteIndexServiceTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/MetaDataDeleteIndexServiceTests.java @@ -62,7 +62,7 @@ public class MetaDataDeleteIndexServiceTests extends ESTestCase { SnapshotsInProgress snaps = new SnapshotsInProgress(new SnapshotsInProgress.Entry(snapshot, true, false, SnapshotsInProgress.State.INIT, singletonList(new IndexId(index, "doesn't matter")), System.currentTimeMillis(), (long) randomIntBetween(0, 1000), ImmutableOpenMap.of(), - SnapshotInfoTests.randomUserMetadata(), randomBoolean())); + SnapshotInfoTests.randomUserMetadata(), VersionUtils.randomVersion(random()))); ClusterState state = ClusterState.builder(clusterState(index)) .putCustom(SnapshotsInProgress.TYPE, snaps) .build(); diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateServiceTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateServiceTests.java index 7eb58d80fec..a6a265c660a 100644 --- a/server/src/test/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateServiceTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateServiceTests.java @@ -52,6 +52,7 @@ import org.elasticsearch.snapshots.SnapshotId; import org.elasticsearch.snapshots.SnapshotInProgressException; import org.elasticsearch.snapshots.SnapshotInfoTests; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.VersionUtils; import java.util.Arrays; import java.util.Collection; @@ -472,7 +473,7 @@ public class MetaDataIndexStateServiceTests extends ESTestCase { final SnapshotsInProgress.Entry entry = new SnapshotsInProgress.Entry(snapshot, randomBoolean(), false, SnapshotsInProgress.State.INIT, Collections.singletonList(new IndexId(index, index)), randomNonNegativeLong(), randomLong(), shardsBuilder.build(), - SnapshotInfoTests.randomUserMetadata(), randomBoolean()); + SnapshotInfoTests.randomUserMetadata(), VersionUtils.randomVersion(random())); return ClusterState.builder(newState).putCustom(SnapshotsInProgress.TYPE, new SnapshotsInProgress(entry)).build(); } diff --git a/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java b/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java index 4000b857ab3..35d9b10862d 100644 --- a/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java +++ b/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java @@ -20,6 +20,7 @@ package org.elasticsearch.repositories; import org.apache.lucene.index.IndexCommit; +import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.cluster.repositories.put.PutRepositoryRequest; import org.elasticsearch.cluster.ClusterState; @@ -163,12 +164,13 @@ public class RepositoriesServiceTests extends ESTestCase { public void finalizeSnapshot(SnapshotId snapshotId, ShardGenerations indices, long startTime, String failure, int totalShards, List shardFailures, long repositoryStateId, boolean includeGlobalState, MetaData metaData, Map userMetadata, - boolean writeShardGens, ActionListener listener) { + Version repositoryMetaVersion, ActionListener listener) { listener.onResponse(null); } @Override - public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, boolean writeShardGens, ActionListener listener) { + public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, Version repositoryMetaVersion, + ActionListener listener) { listener.onResponse(null); } @@ -203,8 +205,8 @@ public class RepositoriesServiceTests extends ESTestCase { } @Override - public void snapshotShard(Store store, MapperService mapperService, SnapshotId snapshotId, IndexId indexId, IndexCommit - snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus, boolean writeShardGens, + public void snapshotShard(Store store, MapperService mapperService, SnapshotId snapshotId, IndexId indexId, + IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus, Version repositoryMetaVersion, Map userMetadata, ActionListener listener) { } diff --git a/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryRestoreTests.java b/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryRestoreTests.java index 40e17a81be4..d5b765281a2 100644 --- a/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryRestoreTests.java +++ b/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryRestoreTests.java @@ -21,6 +21,7 @@ package org.elasticsearch.repositories.blobstore; import org.apache.lucene.store.Directory; import org.apache.lucene.util.TestUtil; +import org.elasticsearch.Version; import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.metadata.RepositoryMetaData; @@ -173,7 +174,7 @@ public class BlobStoreRepositoryRestoreTests extends IndexShardTestCase { repository.finalizeSnapshot(snapshot.getSnapshotId(), ShardGenerations.builder().put(indexId, 0, shardGen).build(), 0L, null, 1, Collections.emptyList(), -1L, false, - MetaData.builder().put(shard.indexSettings().getIndexMetaData(), false).build(), Collections.emptyMap(), true, + MetaData.builder().put(shard.indexSettings().getIndexMetaData(), false).build(), Collections.emptyMap(), Version.CURRENT, future); future.actionGet(); IndexShardSnapshotFailedException isfe = expectThrows(IndexShardSnapshotFailedException.class, diff --git a/server/src/test/java/org/elasticsearch/repositories/fs/FsRepositoryTests.java b/server/src/test/java/org/elasticsearch/repositories/fs/FsRepositoryTests.java index 42d3ab7f4ef..49ca7091180 100644 --- a/server/src/test/java/org/elasticsearch/repositories/fs/FsRepositoryTests.java +++ b/server/src/test/java/org/elasticsearch/repositories/fs/FsRepositoryTests.java @@ -105,7 +105,7 @@ public class FsRepositoryTests extends ESTestCase { final PlainActionFuture future1 = PlainActionFuture.newFuture(); runGeneric(threadPool, () -> { IndexShardSnapshotStatus snapshotStatus = IndexShardSnapshotStatus.newInitializing(null); - repository.snapshotShard(store, null, snapshotId, indexId, indexCommit, snapshotStatus, true, + repository.snapshotShard(store, null, snapshotId, indexId, indexCommit, snapshotStatus, Version.CURRENT, Collections.emptyMap(), future1); future1.actionGet(); IndexShardSnapshotStatus.Copy copy = snapshotStatus.asCopy(); @@ -134,7 +134,7 @@ public class FsRepositoryTests extends ESTestCase { final PlainActionFuture future2 = PlainActionFuture.newFuture(); runGeneric(threadPool, () -> { IndexShardSnapshotStatus snapshotStatus = IndexShardSnapshotStatus.newInitializing(shardGeneration); - repository.snapshotShard(store, null, incSnapshotId, indexId, incIndexCommit, snapshotStatus, true, + repository.snapshotShard(store, null, incSnapshotId, indexId, incIndexCommit, snapshotStatus, Version.CURRENT, Collections.emptyMap(), future2); future2.actionGet(); IndexShardSnapshotStatus.Copy copy = snapshotStatus.asCopy(); diff --git a/server/src/test/java/org/elasticsearch/snapshots/CorruptedBlobStoreRepositoryIT.java b/server/src/test/java/org/elasticsearch/snapshots/CorruptedBlobStoreRepositoryIT.java index 56866029464..abd9adbb430 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/CorruptedBlobStoreRepositoryIT.java +++ b/server/src/test/java/org/elasticsearch/snapshots/CorruptedBlobStoreRepositoryIT.java @@ -287,16 +287,18 @@ public class CorruptedBlobStoreRepositoryIT extends AbstractSnapshotIntegTestCas final SnapshotsService snapshotsService = internalCluster().getCurrentMasterNodeInstance(SnapshotsService.class); final ThreadPool threadPool = internalCluster().getCurrentMasterNodeInstance(ThreadPool.class); assertThat(PlainActionFuture.get(f -> threadPool.generic().execute( - ActionRunnable.supply(f, () -> snapshotsService.hasOldVersionSnapshots(repoName, getRepositoryData(repository), null)))), - is(true)); + ActionRunnable.supply(f, () -> + snapshotsService.minCompatibleVersion(Version.CURRENT, repoName, getRepositoryData(repository), null)))), + is(SnapshotsService.OLD_SNAPSHOT_FORMAT)); logger.info("--> verify that snapshot with missing root level metadata can be deleted"); assertAcked(client().admin().cluster().prepareDeleteSnapshot(repoName, snapshotToCorrupt.getName()).get()); logger.info("--> verify that repository is assumed in new metadata format after removing corrupted snapshot"); assertThat(PlainActionFuture.get(f -> threadPool.generic().execute( - ActionRunnable.supply(f, () -> snapshotsService.hasOldVersionSnapshots(repoName, getRepositoryData(repository), null)))), - is(false)); + ActionRunnable.supply(f, () -> + snapshotsService.minCompatibleVersion(Version.CURRENT, repoName, getRepositoryData(repository), null)))), + is(Version.CURRENT)); final RepositoryData finalRepositoryData = getRepositoryData(repository); for (SnapshotId snapshotId : finalRepositoryData.getSnapshotIds()) { assertThat(finalRepositoryData.getVersion(snapshotId), is(Version.CURRENT)); diff --git a/server/src/test/java/org/elasticsearch/snapshots/RepositoryFilterUserMetadataIT.java b/server/src/test/java/org/elasticsearch/snapshots/RepositoryFilterUserMetadataIT.java index 05d0bf4516d..1f09afdc769 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/RepositoryFilterUserMetadataIT.java +++ b/server/src/test/java/org/elasticsearch/snapshots/RepositoryFilterUserMetadataIT.java @@ -19,6 +19,7 @@ package org.elasticsearch.snapshots; import org.apache.lucene.index.IndexCommit; +import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.service.ClusterService; @@ -86,19 +87,20 @@ public class RepositoryFilterUserMetadataIT extends ESIntegTestCase { public void finalizeSnapshot(SnapshotId snapshotId, ShardGenerations shardGenerations, long startTime, String failure, int totalShards, List shardFailures, long repositoryStateId, boolean includeGlobalState, MetaData clusterMetaData, Map userMetadata, - boolean writeShardGens, ActionListener listener) { + Version repositoryMetaVersion, ActionListener listener) { assertThat(userMetadata, is(Collections.singletonMap(MOCK_FILTERED_META, initialMetaValue))); super.finalizeSnapshot(snapshotId, shardGenerations, startTime, failure, totalShards, shardFailures, - repositoryStateId, includeGlobalState, clusterMetaData, userMetadata, writeShardGens, listener); + repositoryStateId, includeGlobalState, clusterMetaData, userMetadata, repositoryMetaVersion, listener); } @Override public void snapshotShard(Store store, MapperService mapperService, SnapshotId snapshotId, IndexId indexId, IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus, - boolean writeShardGens, Map userMetadata, ActionListener listener) { + Version repositoryMetaVersion, Map userMetadata, + ActionListener listener) { assertThat(userMetadata, is(Collections.singletonMap(MOCK_FILTERED_META, initialMetaValue))); super.snapshotShard(store, mapperService, snapshotId, indexId, snapshotIndexCommit, snapshotStatus, - writeShardGens, userMetadata, listener); + repositoryMetaVersion, userMetadata, listener); } @Override diff --git a/server/src/test/java/org/elasticsearch/snapshots/SnapshotsInProgressSerializationTests.java b/server/src/test/java/org/elasticsearch/snapshots/SnapshotsInProgressSerializationTests.java index 6059b440734..01b81cebb98 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/SnapshotsInProgressSerializationTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/SnapshotsInProgressSerializationTests.java @@ -33,6 +33,7 @@ import org.elasticsearch.index.Index; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.repositories.IndexId; import org.elasticsearch.test.AbstractDiffableWireSerializationTestCase; +import org.elasticsearch.test.VersionUtils; import java.util.ArrayList; import java.util.List; @@ -77,7 +78,7 @@ public class SnapshotsInProgressSerializationTests extends AbstractDiffableWireS } ImmutableOpenMap shards = builder.build(); return new Entry(snapshot, includeGlobalState, partial, state, indices, startTime, repositoryStateId, shards, - SnapshotInfoTests.randomUserMetadata(), randomBoolean()); + SnapshotInfoTests.randomUserMetadata(), VersionUtils.randomVersion(random())); } @Override diff --git a/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepositoryTests.java b/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepositoryTests.java index 7443eaded77..6dc15a248bf 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepositoryTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockEventuallyConsistentRepositoryTests.java @@ -18,6 +18,7 @@ */ package org.elasticsearch.snapshots.mockstore; +import org.elasticsearch.Version; import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.metadata.RepositoryMetaData; @@ -149,7 +150,7 @@ public class MockEventuallyConsistentRepositoryTests extends ESTestCase { final SnapshotId snapshotId = new SnapshotId("foo", UUIDs.randomBase64UUID()); // We try to write another snap- blob for "foo" in the next generation. It fails because the content differs. repository.finalizeSnapshot(snapshotId, ShardGenerations.EMPTY, 1L, null, 5, Collections.emptyList(), - -1L, false, MetaData.EMPTY_META_DATA, Collections.emptyMap(), true, future); + -1L, false, MetaData.EMPTY_META_DATA, Collections.emptyMap(), Version.CURRENT, future); future.actionGet(); // We try to write another snap- blob for "foo" in the next generation. It fails because the content differs. @@ -158,7 +159,7 @@ public class MockEventuallyConsistentRepositoryTests extends ESTestCase { final PlainActionFuture fut = PlainActionFuture.newFuture(); repository.finalizeSnapshot( snapshotId, ShardGenerations.EMPTY, 1L, null, 6, Collections.emptyList(), - 0, false, MetaData.EMPTY_META_DATA, Collections.emptyMap(), true, fut); + 0, false, MetaData.EMPTY_META_DATA, Collections.emptyMap(), Version.CURRENT, fut); fut.actionGet(); }); assertThat(assertionError.getMessage(), equalTo("\nExpected: <6>\n but: was <5>")); @@ -167,7 +168,7 @@ public class MockEventuallyConsistentRepositoryTests extends ESTestCase { // It passes cleanly because the content of the blob except for the timestamps. final PlainActionFuture future2 = PlainActionFuture.newFuture(); repository.finalizeSnapshot(snapshotId, ShardGenerations.EMPTY, 1L, null, 5, Collections.emptyList(), - 0, false, MetaData.EMPTY_META_DATA, Collections.emptyMap(),true, future2); + 0, false, MetaData.EMPTY_META_DATA, Collections.emptyMap(),Version.CURRENT, future2); future2.actionGet(); } } diff --git a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java index 622afae1dea..e0dab6f1600 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/shard/IndexShardTestCase.java @@ -839,7 +839,7 @@ public abstract class IndexShardTestCase extends ESTestCase { final String shardGen; try (Engine.IndexCommitRef indexCommitRef = shard.acquireLastIndexCommit(true)) { repository.snapshotShard(shard.store(), shard.mapperService(), snapshot.getSnapshotId(), indexId, - indexCommitRef.getIndexCommit(), snapshotStatus, true, Collections.emptyMap(), future); + indexCommitRef.getIndexCommit(), snapshotStatus, Version.CURRENT, Collections.emptyMap(), future); shardGen = future.actionGet(); } diff --git a/test/framework/src/main/java/org/elasticsearch/index/shard/RestoreOnlyRepository.java b/test/framework/src/main/java/org/elasticsearch/index/shard/RestoreOnlyRepository.java index ce0704f260e..67362f8c272 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/shard/RestoreOnlyRepository.java +++ b/test/framework/src/main/java/org/elasticsearch/index/shard/RestoreOnlyRepository.java @@ -19,6 +19,7 @@ package org.elasticsearch.index.shard; import org.apache.lucene.index.IndexCommit; +import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetaData; @@ -99,13 +100,14 @@ public abstract class RestoreOnlyRepository extends AbstractLifecycleComponent i @Override public void finalizeSnapshot(SnapshotId snapshotId, ShardGenerations shardGenerations, long startTime, String failure, int totalShards, List shardFailures, long repositoryStateId, - boolean includeGlobalState, MetaData metaData, Map userMetadata, boolean writeShardGens, - ActionListener listener) { + boolean includeGlobalState, MetaData metaData, Map userMetadata, + Version repositoryMetaVersion, ActionListener listener) { listener.onResponse(null); } @Override - public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, boolean writeShardGens, ActionListener listener) { + public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, Version repositoryMetaVersion, + ActionListener listener) { listener.onResponse(null); } @@ -135,7 +137,7 @@ public abstract class RestoreOnlyRepository extends AbstractLifecycleComponent i @Override public void snapshotShard(Store store, MapperService mapperService, SnapshotId snapshotId, IndexId indexId, - IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus, boolean writeShardGens, + IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus, Version repositoryMetaVersion, Map userMetadata, ActionListener listener) { } diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java index edc8eb93b82..7c5d2d0f27c 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java @@ -260,13 +260,14 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit @Override public void finalizeSnapshot(SnapshotId snapshotId, ShardGenerations shardGenerations, long startTime, String failure, int totalShards, List shardFailures, long repositoryStateId, boolean includeGlobalState, - MetaData metaData, Map userMetadata, boolean writeShardGens, + MetaData metaData, Map userMetadata, Version repositoryMetaVersion, ActionListener listener) { throw new UnsupportedOperationException("Unsupported for repository of type: " + TYPE); } @Override - public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, boolean writeShardGens, ActionListener listener) { + public void deleteSnapshot(SnapshotId snapshotId, long repositoryStateId, Version repositoryMetaVersion, + ActionListener listener) { throw new UnsupportedOperationException("Unsupported for repository of type: " + TYPE); } @@ -301,7 +302,7 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit @Override public void snapshotShard(Store store, MapperService mapperService, SnapshotId snapshotId, IndexId indexId, - IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus, boolean writeShardGens, + IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus, Version repositoryMetaVersion, Map userMetadata, ActionListener listener) { throw new UnsupportedOperationException("Unsupported for repository of type: " + TYPE); } diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/snapshots/SourceOnlySnapshotRepository.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/snapshots/SourceOnlySnapshotRepository.java index 1318877a5ae..eac4ba9c88b 100644 --- a/x-pack/plugin/core/src/main/java/org/elasticsearch/snapshots/SourceOnlySnapshotRepository.java +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/snapshots/SourceOnlySnapshotRepository.java @@ -15,6 +15,7 @@ import org.apache.lucene.store.Directory; import org.apache.lucene.store.FSDirectory; import org.apache.lucene.store.FilterDirectory; import org.apache.lucene.store.SimpleFSDirectory; +import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MappingMetaData; @@ -95,13 +96,14 @@ public final class SourceOnlySnapshotRepository extends FilterRepository { public void finalizeSnapshot(SnapshotId snapshotId, ShardGenerations shardGenerations, long startTime, String failure, int totalShards, List shardFailures, long repositoryStateId, boolean includeGlobalState, MetaData metaData, Map userMetadata, - boolean writeShardGens, ActionListener listener) { + Version repositoryMetaVersion, ActionListener listener) { // we process the index metadata at snapshot time. This means if somebody tries to restore // a _source only snapshot with a plain repository it will be just fine since we already set the // required engine, that the index is read-only and the mapping to a default mapping try { super.finalizeSnapshot(snapshotId, shardGenerations, startTime, failure, totalShards, shardFailures, repositoryStateId, - includeGlobalState, metadataToSnapshot(shardGenerations.indices(), metaData), userMetadata, writeShardGens, listener); + includeGlobalState, metadataToSnapshot(shardGenerations.indices(), metaData), userMetadata, repositoryMetaVersion, + listener); } catch (IOException ex) { listener.onFailure(ex); } @@ -136,7 +138,7 @@ public final class SourceOnlySnapshotRepository extends FilterRepository { @Override public void snapshotShard(Store store, MapperService mapperService, SnapshotId snapshotId, IndexId indexId, - IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus, boolean writeShardGens, + IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus, Version repositoryMetaVersion, Map userMetadata, ActionListener listener) { if (mapperService.documentMapper() != null // if there is no mapping this is null && mapperService.documentMapper().sourceMapper().isComplete() == false) { @@ -176,7 +178,7 @@ public final class SourceOnlySnapshotRepository extends FilterRepository { Collections.singletonMap(BlockTreeTermsReader.FST_MODE_KEY, BlockTreeTermsReader.FSTLoadMode.OFF_HEAP.name())); toClose.add(reader); IndexCommit indexCommit = reader.getIndexCommit(); - super.snapshotShard(tempStore, mapperService, snapshotId, indexId, indexCommit, snapshotStatus, writeShardGens, + super.snapshotShard(tempStore, mapperService, snapshotId, indexId, indexCommit, snapshotStatus, repositoryMetaVersion, userMetadata, ActionListener.runBefore(listener, () -> IOUtils.close(toClose))); } catch (IOException e) { try { diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotShardTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotShardTests.java index 488b3f262b4..1f8bdd840de 100644 --- a/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotShardTests.java +++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotShardTests.java @@ -103,7 +103,7 @@ public class SourceOnlySnapshotShardTests extends IndexShardTestCase { IndexShardSnapshotStatus indexShardSnapshotStatus = IndexShardSnapshotStatus.newInitializing("-1"); final PlainActionFuture future = PlainActionFuture.newFuture(); runAsSnapshot(shard.getThreadPool(), () -> repository.snapshotShard(shard.store(), shard.mapperService(), snapshotId, indexId, - snapshotRef.getIndexCommit(), indexShardSnapshotStatus, true, Collections.emptyMap(), future)); + snapshotRef.getIndexCommit(), indexShardSnapshotStatus, Version.CURRENT, Collections.emptyMap(), future)); IllegalStateException illegalStateException = expectThrows(IllegalStateException.class, future::actionGet); assertEquals( "Can't snapshot _source only on an index that has incomplete source ie. has _source disabled or filters the source", @@ -129,7 +129,7 @@ public class SourceOnlySnapshotShardTests extends IndexShardTestCase { SnapshotId snapshotId = new SnapshotId("test", "test"); final PlainActionFuture future = PlainActionFuture.newFuture(); runAsSnapshot(shard.getThreadPool(), () -> repository.snapshotShard(shard.store(), shard.mapperService(), snapshotId, indexId, - snapshotRef.getIndexCommit(), indexShardSnapshotStatus, true, Collections.emptyMap(), future)); + snapshotRef.getIndexCommit(), indexShardSnapshotStatus, Version.CURRENT, Collections.emptyMap(), future)); shardGeneration = future.actionGet(); IndexShardSnapshotStatus.Copy copy = indexShardSnapshotStatus.asCopy(); assertEquals(copy.getTotalFileCount(), copy.getIncrementalFileCount()); @@ -145,7 +145,7 @@ public class SourceOnlySnapshotShardTests extends IndexShardTestCase { IndexShardSnapshotStatus indexShardSnapshotStatus = IndexShardSnapshotStatus.newInitializing(shardGeneration); final PlainActionFuture future = PlainActionFuture.newFuture(); runAsSnapshot(shard.getThreadPool(), () -> repository.snapshotShard(shard.store(), shard.mapperService(), snapshotId, indexId, - snapshotRef.getIndexCommit(), indexShardSnapshotStatus, true, Collections.emptyMap(), future)); + snapshotRef.getIndexCommit(), indexShardSnapshotStatus, Version.CURRENT, Collections.emptyMap(), future)); shardGeneration = future.actionGet(); IndexShardSnapshotStatus.Copy copy = indexShardSnapshotStatus.asCopy(); // we processed the segments_N file plus _1.si, _1.fnm, _1.fdx, _1.fdt, _1.fdm @@ -161,7 +161,7 @@ public class SourceOnlySnapshotShardTests extends IndexShardTestCase { IndexShardSnapshotStatus indexShardSnapshotStatus = IndexShardSnapshotStatus.newInitializing(shardGeneration); final PlainActionFuture future = PlainActionFuture.newFuture(); runAsSnapshot(shard.getThreadPool(), () -> repository.snapshotShard(shard.store(), shard.mapperService(), snapshotId, indexId, - snapshotRef.getIndexCommit(), indexShardSnapshotStatus, true, Collections.emptyMap(), future)); + snapshotRef.getIndexCommit(), indexShardSnapshotStatus, Version.CURRENT, Collections.emptyMap(), future)); future.actionGet(); IndexShardSnapshotStatus.Copy copy = indexShardSnapshotStatus.asCopy(); // we processed the segments_N file plus _1_1.liv @@ -209,7 +209,7 @@ public class SourceOnlySnapshotShardTests extends IndexShardTestCase { final PlainActionFuture future = PlainActionFuture.newFuture(); runAsSnapshot(shard.getThreadPool(), () -> { repository.snapshotShard(shard.store(), shard.mapperService(), snapshotId, indexId, snapshotRef.getIndexCommit(), - indexShardSnapshotStatus, true, Collections.emptyMap(), future); + indexShardSnapshotStatus, Version.CURRENT, Collections.emptyMap(), future); future.actionGet(); final PlainActionFuture finFuture = PlainActionFuture.newFuture(); repository.finalizeSnapshot(snapshotId, @@ -217,7 +217,7 @@ public class SourceOnlySnapshotShardTests extends IndexShardTestCase { indexShardSnapshotStatus.asCopy().getStartTime(), null, 1, Collections.emptyList(), ESBlobStoreRepositoryIntegTestCase.getRepositoryData(repository).getGenId(), true, MetaData.builder().put(shard.indexSettings().getIndexMetaData(), false).build(), Collections.emptyMap(), - true, + Version.CURRENT, finFuture); finFuture.actionGet(); }); diff --git a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SnapshotRetentionTaskTests.java b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SnapshotRetentionTaskTests.java index 91887f3146a..4669a6e9cce 100644 --- a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SnapshotRetentionTaskTests.java +++ b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SnapshotRetentionTaskTests.java @@ -27,6 +27,7 @@ import org.elasticsearch.snapshots.SnapshotId; import org.elasticsearch.snapshots.SnapshotInfo; import org.elasticsearch.test.ClusterServiceUtils; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.VersionUtils; import org.elasticsearch.test.client.NoOpClient; import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; @@ -336,7 +337,7 @@ public class SnapshotRetentionTaskTests extends ESTestCase { snapshot, true, false, SnapshotsInProgress.State.INIT, Collections.singletonList(new IndexId("name", "id")), 0, 0, ImmutableOpenMap.builder().build(), Collections.emptyMap(), - randomBoolean())); + VersionUtils.randomVersion(random()))); ClusterState state = ClusterState.builder(new ClusterName("cluster")) .putCustom(SnapshotsInProgress.TYPE, inProgress) .build();