mirror of
https://github.com/honeymoose/OpenSearch.git
synced 2025-02-22 21:05:23 +00:00
Many of the parameters we pass into this method were only used to build the `SnapshotInfo` instance to write. This change simplifies the signature. Also, it seems less error prone to build `SnapshotInfo` in `SnapshotsService` isntead of relying on the fact that each repository implementation will build the correct `SnapshotInfo`.
This commit is contained in:
parent
16a47e0d08
commit
e1014038e9
@ -34,7 +34,6 @@ import org.elasticsearch.common.blobstore.BlobStore;
|
||||
import org.elasticsearch.common.logging.DeprecationLogger;
|
||||
import org.elasticsearch.common.settings.SecureSetting;
|
||||
import org.elasticsearch.common.settings.SecureString;
|
||||
import org.elasticsearch.common.collect.Tuple;
|
||||
import org.elasticsearch.common.settings.Setting;
|
||||
import org.elasticsearch.common.unit.ByteSizeUnit;
|
||||
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||
@ -48,14 +47,11 @@ import org.elasticsearch.repositories.ShardGenerations;
|
||||
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
|
||||
import org.elasticsearch.snapshots.SnapshotId;
|
||||
import org.elasticsearch.snapshots.SnapshotInfo;
|
||||
import org.elasticsearch.snapshots.SnapshotShardFailure;
|
||||
import org.elasticsearch.snapshots.SnapshotsService;
|
||||
import org.elasticsearch.threadpool.Scheduler;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.function.Function;
|
||||
@ -276,16 +272,15 @@ class S3Repository extends BlobStoreRepository {
|
||||
private final AtomicReference<Scheduler.Cancellable> finalizationFuture = new AtomicReference<>();
|
||||
|
||||
@Override
|
||||
public void finalizeSnapshot(SnapshotId snapshotId, ShardGenerations shardGenerations, long startTime, String failure, int totalShards,
|
||||
List<SnapshotShardFailure> shardFailures, long repositoryStateId, boolean includeGlobalState,
|
||||
Metadata clusterMetadata, Map<String, Object> userMetadata, Version repositoryMetaVersion,
|
||||
public void finalizeSnapshot(ShardGenerations shardGenerations, long repositoryStateId, Metadata clusterMetadata,
|
||||
SnapshotInfo snapshotInfo, Version repositoryMetaVersion,
|
||||
Function<ClusterState, ClusterState> stateTransformer,
|
||||
ActionListener<Tuple<RepositoryData, SnapshotInfo>> listener) {
|
||||
ActionListener<RepositoryData> listener) {
|
||||
if (SnapshotsService.useShardGenerations(repositoryMetaVersion) == false) {
|
||||
listener = delayedListener(listener);
|
||||
}
|
||||
super.finalizeSnapshot(snapshotId, shardGenerations, startTime, failure, totalShards, shardFailures, repositoryStateId,
|
||||
includeGlobalState, clusterMetadata, userMetadata, repositoryMetaVersion, stateTransformer, listener);
|
||||
super.finalizeSnapshot(shardGenerations, repositoryStateId, clusterMetadata, snapshotInfo, repositoryMetaVersion,
|
||||
stateTransformer, listener);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -24,7 +24,6 @@ import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.metadata.Metadata;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.collect.Tuple;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
|
||||
import org.elasticsearch.env.Environment;
|
||||
@ -43,7 +42,6 @@ import org.elasticsearch.test.ESIntegTestCase;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.function.Function;
|
||||
|
||||
@ -89,15 +87,12 @@ public class RepositoryFilterUserMetadataIT extends ESIntegTestCase {
|
||||
private final String initialMetaValue = metadata.settings().get(MASTER_SETTING_VALUE);
|
||||
|
||||
@Override
|
||||
public void finalizeSnapshot(SnapshotId snapshotId, ShardGenerations shardGenerations, long startTime, String failure,
|
||||
int totalShards, List<SnapshotShardFailure> shardFailures, long repositoryStateId,
|
||||
boolean includeGlobalState, Metadata clusterMetadata, Map<String, Object> userMetadata,
|
||||
Version repositoryMetaVersion, Function<ClusterState, ClusterState> stateTransformer,
|
||||
ActionListener<Tuple<RepositoryData, SnapshotInfo>> listener) {
|
||||
assertThat(userMetadata, is(Collections.singletonMap(MOCK_FILTERED_META, initialMetaValue)));
|
||||
super.finalizeSnapshot(snapshotId, shardGenerations, startTime, failure, totalShards, shardFailures,
|
||||
repositoryStateId, includeGlobalState, clusterMetadata, userMetadata, repositoryMetaVersion, stateTransformer,
|
||||
listener);
|
||||
public void finalizeSnapshot(ShardGenerations shardGenerations, long repositoryStateId,
|
||||
Metadata clusterMetadata, SnapshotInfo snapshotInfo, Version repositoryMetaVersion,
|
||||
Function<ClusterState, ClusterState> stateTransformer,
|
||||
ActionListener<RepositoryData> listener) {
|
||||
super.finalizeSnapshot(shardGenerations, repositoryStateId, clusterMetadata, snapshotInfo,
|
||||
repositoryMetaVersion, stateTransformer, listener);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -27,7 +27,6 @@ import org.elasticsearch.cluster.metadata.IndexMetadata;
|
||||
import org.elasticsearch.cluster.metadata.Metadata;
|
||||
import org.elasticsearch.cluster.metadata.RepositoryMetadata;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.common.collect.Tuple;
|
||||
import org.elasticsearch.common.component.Lifecycle;
|
||||
import org.elasticsearch.common.component.LifecycleListener;
|
||||
import org.elasticsearch.index.mapper.MapperService;
|
||||
@ -37,7 +36,6 @@ import org.elasticsearch.index.store.Store;
|
||||
import org.elasticsearch.indices.recovery.RecoveryState;
|
||||
import org.elasticsearch.snapshots.SnapshotId;
|
||||
import org.elasticsearch.snapshots.SnapshotInfo;
|
||||
import org.elasticsearch.snapshots.SnapshotShardFailure;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Collection;
|
||||
@ -85,13 +83,11 @@ public class FilterRepository implements Repository {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void finalizeSnapshot(SnapshotId snapshotId, ShardGenerations shardGenerations, long startTime, String failure,
|
||||
int totalShards, List<SnapshotShardFailure> shardFailures, long repositoryStateId,
|
||||
boolean includeGlobalState, Metadata metadata, Map<String, Object> userMetadata,
|
||||
Version repositoryMetaVersion, Function<ClusterState, ClusterState> stateTransformer,
|
||||
ActionListener<Tuple<RepositoryData, SnapshotInfo>> listener) {
|
||||
in.finalizeSnapshot(snapshotId, shardGenerations, startTime, failure, totalShards, shardFailures, repositoryStateId,
|
||||
includeGlobalState, metadata, userMetadata, repositoryMetaVersion, stateTransformer, listener);
|
||||
public void finalizeSnapshot(ShardGenerations shardGenerations, long repositoryStateId, Metadata clusterMetadata,
|
||||
SnapshotInfo snapshotInfo, Version repositoryMetaVersion,
|
||||
Function<ClusterState, ClusterState> stateTransformer, ActionListener<RepositoryData> listener) {
|
||||
in.finalizeSnapshot(shardGenerations, repositoryStateId, clusterMetadata, snapshotInfo, repositoryMetaVersion, stateTransformer,
|
||||
listener);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -29,7 +29,6 @@ import org.elasticsearch.cluster.metadata.Metadata;
|
||||
import org.elasticsearch.cluster.metadata.RepositoryMetadata;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.common.collect.Tuple;
|
||||
import org.elasticsearch.common.component.LifecycleComponent;
|
||||
import org.elasticsearch.index.mapper.MapperService;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
@ -38,7 +37,6 @@ import org.elasticsearch.index.store.Store;
|
||||
import org.elasticsearch.indices.recovery.RecoveryState;
|
||||
import org.elasticsearch.snapshots.SnapshotId;
|
||||
import org.elasticsearch.snapshots.SnapshotInfo;
|
||||
import org.elasticsearch.snapshots.SnapshotShardFailure;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Collection;
|
||||
@ -134,27 +132,18 @@ public interface Repository extends LifecycleComponent {
|
||||
* <p>
|
||||
* This method is called on master after all shards are snapshotted.
|
||||
*
|
||||
* @param snapshotId snapshot id
|
||||
* @param shardGenerations updated shard generations
|
||||
* @param startTime start time of the snapshot
|
||||
* @param failure global failure reason or null
|
||||
* @param totalShards total number of shards
|
||||
* @param shardFailures list of shard failures
|
||||
* @param repositoryStateId the unique id identifying the state of the repository when the snapshot began
|
||||
* @param includeGlobalState include cluster global state
|
||||
* @param clusterMetadata cluster metadata
|
||||
* @param userMetadata user metadata
|
||||
* @param snapshotInfo SnapshotInfo instance to write for this snapshot
|
||||
* @param repositoryMetaVersion version of the updated repository metadata to write
|
||||
* @param stateTransformer a function that filters the last cluster state update that the snapshot finalization will execute and
|
||||
* is used to remove any state tracked for the in-progress snapshot from the cluster state
|
||||
* @param listener listener to be invoked with the new {@link RepositoryData} and the snapshot's {@link SnapshotInfo}
|
||||
* completion of the snapshot
|
||||
* @param listener listener to be invoked with the new {@link RepositoryData} after completing the snapshot
|
||||
*/
|
||||
void finalizeSnapshot(SnapshotId snapshotId, ShardGenerations shardGenerations, long startTime, String failure,
|
||||
int totalShards, List<SnapshotShardFailure> shardFailures, long repositoryStateId,
|
||||
boolean includeGlobalState, Metadata clusterMetadata, Map<String, Object> userMetadata,
|
||||
Version repositoryMetaVersion, Function<ClusterState, ClusterState> stateTransformer,
|
||||
ActionListener<Tuple<RepositoryData, SnapshotInfo>> listener);
|
||||
void finalizeSnapshot(ShardGenerations shardGenerations, long repositoryStateId, Metadata clusterMetadata,
|
||||
SnapshotInfo snapshotInfo, Version repositoryMetaVersion, Function<ClusterState, ClusterState> stateTransformer,
|
||||
ActionListener<RepositoryData> listener);
|
||||
|
||||
/**
|
||||
* Deletes snapshots
|
||||
|
@ -116,7 +116,6 @@ import org.elasticsearch.snapshots.SnapshotException;
|
||||
import org.elasticsearch.snapshots.SnapshotId;
|
||||
import org.elasticsearch.snapshots.SnapshotInfo;
|
||||
import org.elasticsearch.snapshots.SnapshotMissingException;
|
||||
import org.elasticsearch.snapshots.SnapshotShardFailure;
|
||||
import org.elasticsearch.snapshots.SnapshotsService;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
|
||||
@ -989,22 +988,17 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
|
||||
}
|
||||
|
||||
@Override
|
||||
public void finalizeSnapshot(final SnapshotId snapshotId,
|
||||
final ShardGenerations shardGenerations,
|
||||
final long startTime,
|
||||
final String failure,
|
||||
final int totalShards,
|
||||
final List<SnapshotShardFailure> shardFailures,
|
||||
public void finalizeSnapshot(final ShardGenerations shardGenerations,
|
||||
final long repositoryStateId,
|
||||
final boolean includeGlobalState,
|
||||
final Metadata clusterMetadata,
|
||||
final Map<String, Object> userMetadata,
|
||||
SnapshotInfo snapshotInfo,
|
||||
Version repositoryMetaVersion,
|
||||
Function<ClusterState, ClusterState> stateTransformer,
|
||||
final ActionListener<Tuple<RepositoryData, SnapshotInfo>> listener) {
|
||||
final ActionListener<RepositoryData> listener) {
|
||||
assert repositoryStateId > RepositoryData.UNKNOWN_REPO_GEN :
|
||||
"Must finalize based on a valid repository generation but received [" + repositoryStateId + "]";
|
||||
final Collection<IndexId> indices = shardGenerations.indices();
|
||||
final SnapshotId snapshotId = snapshotInfo.snapshotId();
|
||||
// Once we are done writing the updated index-N blob we remove the now unreferenced index-${uuid} blobs in each shard
|
||||
// directory if all nodes are at least at version SnapshotsService#SHARD_GEN_IN_REPO_DATA_VERSION
|
||||
// If there are older version nodes in the cluster, we don't need to run this cleanup as it will have already happened
|
||||
@ -1031,10 +1025,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
|
||||
indexMetaIdentifiers = null;
|
||||
}
|
||||
|
||||
final ActionListener<SnapshotInfo> allMetaListener = new GroupedActionListener<>(
|
||||
ActionListener.wrap(snapshotInfos -> {
|
||||
assert snapshotInfos.size() == 1 : "Should have only received a single SnapshotInfo but received " + snapshotInfos;
|
||||
final SnapshotInfo snapshotInfo = snapshotInfos.iterator().next();
|
||||
final ActionListener<Void> allMetaListener = new GroupedActionListener<>(
|
||||
ActionListener.wrap(v -> {
|
||||
final RepositoryData updatedRepositoryData = existingRepositoryData.addSnapshot(
|
||||
snapshotId, snapshotInfo.state(), Version.CURRENT, shardGenerations, indexMetas, indexMetaIdentifiers);
|
||||
writeIndexGen(updatedRepositoryData, repositoryStateId, repositoryMetaVersion, stateTransformer,
|
||||
@ -1043,7 +1035,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
|
||||
if (writeShardGens) {
|
||||
cleanupOldShardGens(existingRepositoryData, updatedRepositoryData);
|
||||
}
|
||||
listener.onResponse(new Tuple<>(newRepoData, snapshotInfo));
|
||||
listener.onResponse(newRepoData);
|
||||
}, onUpdateFailure));
|
||||
}, onUpdateFailure), 2 + indices.size());
|
||||
|
||||
@ -1078,15 +1070,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
|
||||
}
|
||||
));
|
||||
}
|
||||
executor.execute(ActionRunnable.supply(allMetaListener, () -> {
|
||||
final SnapshotInfo snapshotInfo = new SnapshotInfo(snapshotId,
|
||||
indices.stream().map(IndexId::getName).collect(Collectors.toList()),
|
||||
new ArrayList<>(clusterMetadata.dataStreams().keySet()),
|
||||
startTime, failure, threadPool.absoluteTimeInMillis(), totalShards, shardFailures,
|
||||
includeGlobalState, userMetadata);
|
||||
snapshotFormat.write(snapshotInfo, blobContainer(), snapshotId.getUUID(), false);
|
||||
return snapshotInfo;
|
||||
}));
|
||||
executor.execute(ActionRunnable.run(allMetaListener,
|
||||
() -> snapshotFormat.write(snapshotInfo, blobContainer(), snapshotId.getUUID(), false)));
|
||||
}, onUpdateFailure);
|
||||
}
|
||||
|
||||
|
@ -1064,23 +1064,24 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
||||
}
|
||||
}
|
||||
final ShardGenerations shardGenerations = buildGenerations(entry, metadata);
|
||||
final SnapshotInfo snapshotInfo = new SnapshotInfo(snapshot.getSnapshotId(),
|
||||
shardGenerations.indices().stream().map(IndexId::getName).collect(Collectors.toList()),
|
||||
entry.dataStreams(),
|
||||
entry.startTime(), failure, threadPool.absoluteTimeInMillis(),
|
||||
entry.partial() ? shardGenerations.totalShards() : entry.shards().size(), shardFailures,
|
||||
entry.includeGlobalState(), entry.userMetadata());
|
||||
repositoriesService.repository(snapshot.getRepository()).finalizeSnapshot(
|
||||
snapshot.getSnapshotId(),
|
||||
shardGenerations,
|
||||
entry.startTime(),
|
||||
failure,
|
||||
entry.partial() ? shardGenerations.totalShards() : entry.shards().size(),
|
||||
unmodifiableList(shardFailures),
|
||||
entry.repositoryStateId(),
|
||||
entry.includeGlobalState(),
|
||||
metadataForSnapshot(entry, metadata),
|
||||
entry.userMetadata(),
|
||||
snapshotInfo,
|
||||
entry.version(),
|
||||
state -> stateWithoutSnapshot(state, snapshot),
|
||||
ActionListener.wrap(result -> {
|
||||
ActionListener.wrap(newRepoData -> {
|
||||
final List<ActionListener<Tuple<RepositoryData, SnapshotInfo>>> completionListeners =
|
||||
snapshotCompletionListeners.remove(snapshot);
|
||||
if (completionListeners != null) {
|
||||
final Tuple<RepositoryData, SnapshotInfo> result = Tuple.tuple(newRepoData, snapshotInfo);
|
||||
try {
|
||||
ActionListener.onResponse(completionListeners, result);
|
||||
} catch (Exception e) {
|
||||
@ -1088,7 +1089,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
||||
}
|
||||
}
|
||||
endingSnapshots.remove(snapshot);
|
||||
logger.info("snapshot [{}] completed with state [{}]", snapshot, result.v2().state());
|
||||
logger.info("snapshot [{}] completed with state [{}]", snapshot, snapshotInfo.state());
|
||||
}, e -> handleFinalizationFailure(e, entry)));
|
||||
} catch (Exception e) {
|
||||
handleFinalizationFailure(e, entry);
|
||||
|
@ -32,7 +32,6 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.UUIDs;
|
||||
import org.elasticsearch.common.collect.Tuple;
|
||||
import org.elasticsearch.common.component.Lifecycle;
|
||||
import org.elasticsearch.common.component.LifecycleListener;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
@ -43,7 +42,6 @@ import org.elasticsearch.index.store.Store;
|
||||
import org.elasticsearch.indices.recovery.RecoveryState;
|
||||
import org.elasticsearch.snapshots.SnapshotId;
|
||||
import org.elasticsearch.snapshots.SnapshotInfo;
|
||||
import org.elasticsearch.snapshots.SnapshotShardFailure;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.Transport;
|
||||
@ -165,11 +163,10 @@ public class RepositoriesServiceTests extends ESTestCase {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void finalizeSnapshot(SnapshotId snapshotId, ShardGenerations indices, long startTime, String failure,
|
||||
int totalShards, List<SnapshotShardFailure> shardFailures, long repositoryStateId,
|
||||
boolean includeGlobalState, Metadata metadata, Map<String, Object> userMetadata,
|
||||
Version repositoryMetaVersion, Function<ClusterState, ClusterState> stateTransformer,
|
||||
ActionListener<Tuple<RepositoryData, SnapshotInfo>> listener) {
|
||||
public void finalizeSnapshot(ShardGenerations shardGenerations, long repositoryStateId, Metadata clusterMetadata,
|
||||
SnapshotInfo snapshotInfo, Version repositoryMetaVersion,
|
||||
Function<ClusterState, ClusterState> stateTransformer,
|
||||
ActionListener<RepositoryData> listener) {
|
||||
listener.onResponse(null);
|
||||
}
|
||||
|
||||
|
@ -30,7 +30,6 @@ import org.elasticsearch.cluster.routing.ShardRouting;
|
||||
import org.elasticsearch.cluster.routing.ShardRoutingHelper;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.UUIDs;
|
||||
import org.elasticsearch.common.collect.Tuple;
|
||||
import org.elasticsearch.common.settings.ClusterSettings;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.core.internal.io.IOUtils;
|
||||
@ -62,6 +61,7 @@ import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.function.Function;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static org.hamcrest.Matchers.containsString;
|
||||
|
||||
@ -175,11 +175,15 @@ public class BlobStoreRepositoryRestoreTests extends IndexShardTestCase {
|
||||
assertNotNull(shardGen);
|
||||
final Snapshot snapshotWithSameName = new Snapshot(repository.getMetadata().name(), new SnapshotId(
|
||||
snapshot.getSnapshotId().getName(), "_uuid2"));
|
||||
PlainActionFuture.<Tuple<RepositoryData, SnapshotInfo>, Exception>get(f ->
|
||||
repository.finalizeSnapshot(snapshot.getSnapshotId(),
|
||||
ShardGenerations.builder().put(indexId, 0, shardGen).build(),
|
||||
0L, null, 1, Collections.emptyList(), -1L, false,
|
||||
Metadata.builder().put(shard.indexSettings().getIndexMetadata(), false).build(), Collections.emptyMap(),
|
||||
final ShardGenerations shardGenerations = ShardGenerations.builder().put(indexId, 0, shardGen).build();
|
||||
PlainActionFuture.<RepositoryData, Exception>get(f ->
|
||||
repository.finalizeSnapshot(
|
||||
shardGenerations,
|
||||
RepositoryData.EMPTY_REPO_GEN,
|
||||
Metadata.builder().put(shard.indexSettings().getIndexMetadata(), false).build(),
|
||||
new SnapshotInfo(snapshot.getSnapshotId(), shardGenerations.indices().stream()
|
||||
.map(IndexId::getName).collect(Collectors.toList()), Collections.emptyList(), 0L, null, 1L, 6,
|
||||
Collections.emptyList(), true, Collections.emptyMap()),
|
||||
Version.CURRENT, Function.identity(), f));
|
||||
IndexShardSnapshotFailedException isfe = expectThrows(IndexShardSnapshotFailedException.class,
|
||||
() -> snapshotShard(shard, snapshotWithSameName, repository));
|
||||
|
@ -25,7 +25,6 @@ import org.elasticsearch.cluster.metadata.RepositoryMetadata;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.UUIDs;
|
||||
import org.elasticsearch.common.blobstore.BlobContainer;
|
||||
import org.elasticsearch.common.collect.Tuple;
|
||||
import org.elasticsearch.common.settings.ClusterSettings;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.indices.recovery.RecoverySettings;
|
||||
@ -156,23 +155,29 @@ public class MockEventuallyConsistentRepositoryTests extends ESTestCase {
|
||||
|
||||
// We create a snap- blob for snapshot "foo" in the first generation
|
||||
final SnapshotId snapshotId = new SnapshotId("foo", UUIDs.randomBase64UUID());
|
||||
PlainActionFuture.<Tuple<RepositoryData, SnapshotInfo>, Exception>get(f ->
|
||||
PlainActionFuture.<RepositoryData, Exception>get(f ->
|
||||
// We try to write another snap- blob for "foo" in the next generation. It fails because the content differs.
|
||||
repository.finalizeSnapshot(snapshotId, ShardGenerations.EMPTY, 1L, null, 5, Collections.emptyList(),
|
||||
-1L, false, Metadata.EMPTY_METADATA, Collections.emptyMap(), Version.CURRENT, Function.identity(), f));
|
||||
repository.finalizeSnapshot(ShardGenerations.EMPTY, RepositoryData.EMPTY_REPO_GEN, Metadata.EMPTY_METADATA,
|
||||
new SnapshotInfo(snapshotId, Collections.emptyList(), Collections.emptyList(),
|
||||
0L, null, 1L, 5, Collections.emptyList(), true, Collections.emptyMap()),
|
||||
Version.CURRENT, Function.identity(), f));
|
||||
|
||||
// We try to write another snap- blob for "foo" in the next generation. It fails because the content differs.
|
||||
final AssertionError assertionError = expectThrows(AssertionError.class,
|
||||
() -> PlainActionFuture.<Tuple<RepositoryData, SnapshotInfo>, Exception>get(f ->
|
||||
repository.finalizeSnapshot(snapshotId, ShardGenerations.EMPTY, 1L, null, 6, Collections.emptyList(),
|
||||
0, false, Metadata.EMPTY_METADATA, Collections.emptyMap(), Version.CURRENT, Function.identity(), f)));
|
||||
() -> PlainActionFuture.<RepositoryData, Exception>get(f ->
|
||||
repository.finalizeSnapshot(ShardGenerations.EMPTY, 0L, Metadata.EMPTY_METADATA,
|
||||
new SnapshotInfo(snapshotId, Collections.emptyList(), Collections.emptyList(),
|
||||
0L, null, 1L, 6, Collections.emptyList(), true, Collections.emptyMap()),
|
||||
Version.CURRENT, Function.identity(), f)));
|
||||
assertThat(assertionError.getMessage(), equalTo("\nExpected: <6>\n but: was <5>"));
|
||||
|
||||
// We try to write yet another snap- blob for "foo" in the next generation.
|
||||
// It passes cleanly because the content of the blob except for the timestamps.
|
||||
PlainActionFuture.<Tuple<RepositoryData, SnapshotInfo>, Exception>get(f ->
|
||||
repository.finalizeSnapshot(snapshotId, ShardGenerations.EMPTY, 1L, null, 5, Collections.emptyList(),
|
||||
0, false, Metadata.EMPTY_METADATA, Collections.emptyMap(), Version.CURRENT, Function.identity(), f));
|
||||
PlainActionFuture.<RepositoryData, Exception>get(f ->
|
||||
repository.finalizeSnapshot(ShardGenerations.EMPTY, 0L, Metadata.EMPTY_METADATA,
|
||||
new SnapshotInfo(snapshotId, Collections.emptyList(), Collections.emptyList(),
|
||||
0L, null, 2L, 5, Collections.emptyList(), true, Collections.emptyMap()),
|
||||
Version.CURRENT, Function.identity(), f));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -27,7 +27,6 @@ import org.elasticsearch.cluster.metadata.IndexMetadata;
|
||||
import org.elasticsearch.cluster.metadata.Metadata;
|
||||
import org.elasticsearch.cluster.metadata.RepositoryMetadata;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.common.collect.Tuple;
|
||||
import org.elasticsearch.common.component.AbstractLifecycleComponent;
|
||||
import org.elasticsearch.index.mapper.MapperService;
|
||||
import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus;
|
||||
@ -39,7 +38,6 @@ import org.elasticsearch.repositories.RepositoryData;
|
||||
import org.elasticsearch.repositories.ShardGenerations;
|
||||
import org.elasticsearch.snapshots.SnapshotId;
|
||||
import org.elasticsearch.snapshots.SnapshotInfo;
|
||||
import org.elasticsearch.snapshots.SnapshotShardFailure;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
@ -103,11 +101,10 @@ public abstract class RestoreOnlyRepository extends AbstractLifecycleComponent i
|
||||
}
|
||||
|
||||
@Override
|
||||
public void finalizeSnapshot(SnapshotId snapshotId, ShardGenerations shardGenerations, long startTime, String failure,
|
||||
int totalShards, List<SnapshotShardFailure> shardFailures, long repositoryStateId,
|
||||
boolean includeGlobalState, Metadata metadata, Map<String, Object> userMetadata,
|
||||
Version repositoryMetaVersion, Function<ClusterState, ClusterState> stateTransformer,
|
||||
ActionListener<Tuple<RepositoryData, SnapshotInfo>> listener) {
|
||||
public void finalizeSnapshot(ShardGenerations shardGenerations, long repositoryStateId,
|
||||
Metadata clusterMetadata, SnapshotInfo snapshotInfo, Version repositoryMetaVersion,
|
||||
Function<ClusterState, ClusterState> stateTransformer,
|
||||
ActionListener<RepositoryData> listener) {
|
||||
listener.onResponse(null);
|
||||
}
|
||||
|
||||
|
@ -34,7 +34,6 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.UUIDs;
|
||||
import org.elasticsearch.common.collect.ImmutableOpenMap;
|
||||
import org.elasticsearch.common.collect.Tuple;
|
||||
import org.elasticsearch.common.component.AbstractLifecycleComponent;
|
||||
import org.elasticsearch.common.lease.Releasable;
|
||||
import org.elasticsearch.common.metrics.CounterMetric;
|
||||
@ -68,7 +67,6 @@ import org.elasticsearch.repositories.ShardGenerations;
|
||||
import org.elasticsearch.repositories.blobstore.FileRestoreContext;
|
||||
import org.elasticsearch.snapshots.SnapshotId;
|
||||
import org.elasticsearch.snapshots.SnapshotInfo;
|
||||
import org.elasticsearch.snapshots.SnapshotShardFailure;
|
||||
import org.elasticsearch.snapshots.SnapshotState;
|
||||
import org.elasticsearch.threadpool.Scheduler;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
@ -266,12 +264,12 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit
|
||||
public void initializeSnapshot(SnapshotId snapshotId, List<IndexId> indices, Metadata metadata) {
|
||||
throw new UnsupportedOperationException("Unsupported for repository of type: " + TYPE);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void finalizeSnapshot(SnapshotId snapshotId, ShardGenerations shardGenerations, long startTime, String failure, int totalShards,
|
||||
List<SnapshotShardFailure> shardFailures, long repositoryStateId, boolean includeGlobalState,
|
||||
Metadata metadata, Map<String, Object> userMetadata, Version repositoryMetaVersion,
|
||||
public void finalizeSnapshot(ShardGenerations shardGenerations, long repositoryStateId, Metadata metadata,
|
||||
SnapshotInfo snapshotInfo, Version repositoryMetaVersion,
|
||||
Function<ClusterState, ClusterState> stateTransformer,
|
||||
ActionListener<Tuple<RepositoryData, SnapshotInfo>> listener) {
|
||||
ActionListener<RepositoryData> listener) {
|
||||
throw new UnsupportedOperationException("Unsupported for repository of type: " + TYPE);
|
||||
}
|
||||
|
||||
|
@ -23,7 +23,6 @@ import org.elasticsearch.cluster.metadata.Metadata;
|
||||
import org.elasticsearch.cluster.metadata.RepositoryMetadata;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.collect.ImmutableOpenMap;
|
||||
import org.elasticsearch.common.collect.Tuple;
|
||||
import org.elasticsearch.common.lucene.search.Queries;
|
||||
import org.elasticsearch.common.settings.Setting;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
@ -94,18 +93,16 @@ public final class SourceOnlySnapshotRepository extends FilterRepository {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void finalizeSnapshot(SnapshotId snapshotId, ShardGenerations shardGenerations, long startTime, String failure,
|
||||
int totalShards, List<SnapshotShardFailure> shardFailures, long repositoryStateId,
|
||||
boolean includeGlobalState, Metadata metadata, Map<String, Object> userMetadata,
|
||||
Version repositoryMetaVersion, Function<ClusterState, ClusterState> stateTransformer,
|
||||
ActionListener<Tuple<RepositoryData, SnapshotInfo>> listener) {
|
||||
public void finalizeSnapshot(ShardGenerations shardGenerations, long repositoryStateId, Metadata metadata,
|
||||
SnapshotInfo snapshotInfo, Version repositoryMetaVersion,
|
||||
Function<ClusterState, ClusterState> stateTransformer,
|
||||
ActionListener<RepositoryData> listener) {
|
||||
// we process the index metadata at snapshot time. This means if somebody tries to restore
|
||||
// a _source only snapshot with a plain repository it will be just fine since we already set the
|
||||
// required engine, that the index is read-only and the mapping to a default mapping
|
||||
try {
|
||||
super.finalizeSnapshot(snapshotId, shardGenerations, startTime, failure, totalShards, shardFailures, repositoryStateId,
|
||||
includeGlobalState, metadataToSnapshot(shardGenerations.indices(), metadata), userMetadata, repositoryMetaVersion,
|
||||
stateTransformer, listener);
|
||||
super.finalizeSnapshot(shardGenerations, repositoryStateId, metadataToSnapshot(shardGenerations.indices(), metadata),
|
||||
snapshotInfo, repositoryMetaVersion, stateTransformer, listener);
|
||||
} catch (IOException ex) {
|
||||
listener.onFailure(ex);
|
||||
}
|
||||
|
@ -35,7 +35,6 @@ import org.elasticsearch.cluster.routing.TestShardRouting;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.UUIDs;
|
||||
import org.elasticsearch.common.bytes.BytesReference;
|
||||
import org.elasticsearch.common.collect.Tuple;
|
||||
import org.elasticsearch.common.lucene.uid.Versions;
|
||||
import org.elasticsearch.common.settings.ClusterSettings;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
@ -77,6 +76,7 @@ import java.util.Collections;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.function.Function;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
public class SourceOnlySnapshotShardTests extends IndexShardTestCase {
|
||||
|
||||
@ -219,12 +219,18 @@ public class SourceOnlySnapshotShardTests extends IndexShardTestCase {
|
||||
repository.snapshotShard(shard.store(), shard.mapperService(), snapshotId, indexId, snapshotRef.getIndexCommit(),
|
||||
null, indexShardSnapshotStatus, Version.CURRENT, Collections.emptyMap(), future);
|
||||
future.actionGet();
|
||||
final PlainActionFuture<Tuple<RepositoryData, SnapshotInfo>> finFuture = PlainActionFuture.newFuture();
|
||||
repository.finalizeSnapshot(snapshotId,
|
||||
ShardGenerations.builder().put(indexId, 0, indexShardSnapshotStatus.generation()).build(),
|
||||
indexShardSnapshotStatus.asCopy().getStartTime(), null, 1, Collections.emptyList(),
|
||||
ESBlobStoreRepositoryIntegTestCase.getRepositoryData(repository).getGenId(), true,
|
||||
Metadata.builder().put(shard.indexSettings().getIndexMetadata(), false).build(), Collections.emptyMap(),
|
||||
final PlainActionFuture<RepositoryData> finFuture = PlainActionFuture.newFuture();
|
||||
final ShardGenerations shardGenerations =
|
||||
ShardGenerations.builder().put(indexId, 0, indexShardSnapshotStatus.generation()).build();
|
||||
repository.finalizeSnapshot(
|
||||
shardGenerations,
|
||||
ESBlobStoreRepositoryIntegTestCase.getRepositoryData(repository).getGenId(),
|
||||
Metadata.builder().put(shard.indexSettings().getIndexMetadata(), false).build(),
|
||||
new SnapshotInfo(snapshotId,
|
||||
shardGenerations.indices().stream()
|
||||
.map(IndexId::getName).collect(Collectors.toList()), Collections.emptyList(), 0L, null, 1L,
|
||||
shardGenerations.totalShards(),
|
||||
Collections.emptyList(), true, Collections.emptyMap()),
|
||||
Version.CURRENT, Function.identity(), finFuture);
|
||||
finFuture.actionGet();
|
||||
});
|
||||
|
Loading…
x
Reference in New Issue
Block a user