Cleaner Handling of Snapshot Related null Custom Values in CS (#58382) (#58501)

Add the ability to get a custom value while specifying a default and use it throughout the
codebase to get rid of the `null` edge case and shorten the code a little.
This commit is contained in:
Armin Braun 2020-06-24 17:24:44 +02:00 committed by GitHub
parent f4fad9c65a
commit 9e4c5d1dde
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
25 changed files with 166 additions and 213 deletions

View File

@ -712,7 +712,7 @@ public class ClusterStateDiffIT extends ESIntegTestCase {
public ClusterState.Custom randomCreate(String name) {
switch (randomIntBetween(0, 1)) {
case 0:
return new SnapshotsInProgress(new SnapshotsInProgress.Entry(
return SnapshotsInProgress.of(Collections.singletonList(new SnapshotsInProgress.Entry(
new Snapshot(randomName("repo"), new SnapshotId(randomName("snap"), UUIDs.randomBase64UUID())),
randomBoolean(),
randomBoolean(),
@ -722,7 +722,7 @@ public class ClusterStateDiffIT extends ESIntegTestCase {
(long) randomIntBetween(0, 1000),
ImmutableOpenMap.of(),
SnapshotInfoTests.randomUserMetadata(),
randomVersion(random())));
randomVersion(random()))));
case 1:
return new RestoreInProgress.Builder().add(
new RestoreInProgress.Entry(

View File

@ -312,12 +312,13 @@ public class SnapshotDisruptionIT extends ESIntegTestCase {
logger.info("--> wait until the snapshot is done");
assertBusy(() -> {
ClusterState state = dataNodeClient().admin().cluster().prepareState().get().getState();
SnapshotsInProgress snapshots = state.custom(SnapshotsInProgress.TYPE);
SnapshotDeletionsInProgress snapshotDeletionsInProgress = state.custom(SnapshotDeletionsInProgress.TYPE);
if (snapshots != null && snapshots.entries().isEmpty() == false) {
SnapshotsInProgress snapshots = state.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY);
SnapshotDeletionsInProgress snapshotDeletionsInProgress =
state.custom(SnapshotDeletionsInProgress.TYPE, SnapshotDeletionsInProgress.EMPTY);
if (snapshots.entries().isEmpty() == false) {
logger.info("Current snapshot state [{}]", snapshots.entries().get(0).state());
fail("Snapshot is still running");
} else if (snapshotDeletionsInProgress != null && snapshotDeletionsInProgress.hasDeletionsInProgress()) {
} else if (snapshotDeletionsInProgress.hasDeletionsInProgress()) {
logger.info("Current snapshot deletion state [{}]", snapshotDeletionsInProgress);
fail("Snapshot deletion is still running");
} else {

View File

@ -49,6 +49,7 @@ import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import java.io.IOException;
import java.util.Collections;
/**
* Repository cleanup action for repository implementations based on {@link BlobStoreRepository}.
@ -97,8 +98,9 @@ public final class TransportCleanupRepositoryAction extends TransportMasterNodeA
// operations from corrupting the repository. This is the same safety mechanism used by snapshot deletes.
clusterService.addStateApplier(event -> {
if (event.localNodeMaster() && event.previousState().nodes().isLocalNodeElectedMaster() == false) {
final RepositoryCleanupInProgress repositoryCleanupInProgress = event.state().custom(RepositoryCleanupInProgress.TYPE);
if (repositoryCleanupInProgress == null || repositoryCleanupInProgress.hasCleanupInProgress() == false) {
final RepositoryCleanupInProgress repositoryCleanupInProgress =
event.state().custom(RepositoryCleanupInProgress.TYPE, RepositoryCleanupInProgress.EMPTY);
if (repositoryCleanupInProgress.hasCleanupInProgress() == false) {
return;
}
clusterService.submitStateUpdateTask("clean up repository cleanup task after master failover",
@ -124,19 +126,9 @@ public final class TransportCleanupRepositoryAction extends TransportMasterNodeA
}
private static ClusterState removeInProgressCleanup(final ClusterState currentState) {
RepositoryCleanupInProgress cleanupInProgress = currentState.custom(RepositoryCleanupInProgress.TYPE);
if (cleanupInProgress != null) {
boolean changed = false;
if (cleanupInProgress.hasCleanupInProgress()) {
cleanupInProgress = new RepositoryCleanupInProgress();
changed = true;
}
if (changed) {
return ClusterState.builder(currentState).putCustom(
RepositoryCleanupInProgress.TYPE, cleanupInProgress).build();
}
}
return currentState;
return currentState.custom(RepositoryCleanupInProgress.TYPE, RepositoryCleanupInProgress.EMPTY).hasCleanupInProgress()
? ClusterState.builder(currentState).putCustom(RepositoryCleanupInProgress.TYPE, RepositoryCleanupInProgress.EMPTY).build()
: currentState;
}
@Override
@ -186,25 +178,26 @@ public final class TransportCleanupRepositoryAction extends TransportMasterNodeA
@Override
public ClusterState execute(ClusterState currentState) {
final RepositoryCleanupInProgress repositoryCleanupInProgress =
currentState.custom(RepositoryCleanupInProgress.TYPE);
if (repositoryCleanupInProgress != null && repositoryCleanupInProgress.hasCleanupInProgress()) {
currentState.custom(RepositoryCleanupInProgress.TYPE, RepositoryCleanupInProgress.EMPTY);
if (repositoryCleanupInProgress.hasCleanupInProgress()) {
throw new IllegalStateException(
"Cannot cleanup [" + repositoryName + "] - a repository cleanup is already in-progress in ["
+ repositoryCleanupInProgress + "]");
}
SnapshotDeletionsInProgress deletionsInProgress = currentState.custom(SnapshotDeletionsInProgress.TYPE);
if (deletionsInProgress != null && deletionsInProgress.hasDeletionsInProgress()) {
final SnapshotDeletionsInProgress deletionsInProgress =
currentState.custom(SnapshotDeletionsInProgress.TYPE, SnapshotDeletionsInProgress.EMPTY);
if (deletionsInProgress.hasDeletionsInProgress()) {
throw new IllegalStateException("Cannot cleanup [" + repositoryName
+ "] - a snapshot is currently being deleted in [" + deletionsInProgress + "]");
}
SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE);
if (snapshots != null && !snapshots.entries().isEmpty()) {
SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY);
if (snapshots.entries().isEmpty() == false) {
throw new IllegalStateException(
"Cannot cleanup [" + repositoryName + "] - a snapshot is currently running in [" + snapshots + "]");
}
return ClusterState.builder(currentState).putCustom(RepositoryCleanupInProgress.TYPE,
new RepositoryCleanupInProgress(
RepositoryCleanupInProgress.startedEntry(repositoryName, repositoryStateId))).build();
new RepositoryCleanupInProgress(Collections.singletonList(
RepositoryCleanupInProgress.startedEntry(repositoryName, repositoryStateId)))).build();
}
@Override

View File

@ -35,7 +35,6 @@ import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
@ -110,7 +109,7 @@ public class TransportSnapshotsStatusAction extends TransportMasterNodeAction<Sn
protected void masterOperation(final SnapshotsStatusRequest request,
final ClusterState state,
final ActionListener<SnapshotsStatusResponse> listener) throws Exception {
final SnapshotsInProgress snapshotsInProgress = state.custom(SnapshotsInProgress.TYPE);
final SnapshotsInProgress snapshotsInProgress = state.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY);
List<SnapshotsInProgress.Entry> currentSnapshots =
SnapshotsService.currentSnapshots(snapshotsInProgress, request.repository(), Arrays.asList(request.snapshots()));
if (currentSnapshots.isEmpty()) {
@ -147,7 +146,7 @@ public class TransportSnapshotsStatusAction extends TransportMasterNodeAction<Sn
}
private void buildResponse(@Nullable SnapshotsInProgress snapshotsInProgress, SnapshotsStatusRequest request,
private void buildResponse(SnapshotsInProgress snapshotsInProgress, SnapshotsStatusRequest request,
List<SnapshotsInProgress.Entry> currentSnapshotEntries,
TransportNodesSnapshotsStatus.NodesSnapshotStatus nodeSnapshotStatuses,
ActionListener<SnapshotsStatusResponse> listener) {
@ -216,7 +215,7 @@ public class TransportSnapshotsStatusAction extends TransportMasterNodeAction<Sn
}
}
private void loadRepositoryData(@Nullable SnapshotsInProgress snapshotsInProgress, SnapshotsStatusRequest request,
private void loadRepositoryData(SnapshotsInProgress snapshotsInProgress, SnapshotsStatusRequest request,
List<SnapshotStatus> builder, Set<String> currentSnapshotNames, String repositoryName,
ActionListener<SnapshotsStatusResponse> listener) {
final Set<String> requestedSnapshotNames = Sets.newHashSet(request.snapshots());
@ -289,7 +288,7 @@ public class TransportSnapshotsStatusAction extends TransportMasterNodeAction<Sn
* @return snapshot
* @throws SnapshotMissingException if snapshot is not found
*/
private SnapshotInfo snapshot(@Nullable SnapshotsInProgress snapshotsInProgress, String repositoryName, SnapshotId snapshotId) {
private SnapshotInfo snapshot(SnapshotsInProgress snapshotsInProgress, String repositoryName, SnapshotId snapshotId) {
List<SnapshotsInProgress.Entry> entries =
SnapshotsService.currentSnapshots(snapshotsInProgress, repositoryName, Collections.singletonList(snapshotId.getName()));
if (!entries.isEmpty()) {

View File

@ -271,6 +271,11 @@ public class ClusterState implements ToXContentFragment, Diffable<ClusterState>
return (T) customs.get(type);
}
@SuppressWarnings("unchecked")
public <T extends Custom> T custom(String type, T defaultValue) {
return (T) customs.getOrDefault(type, defaultValue);
}
public ClusterName getClusterName() {
return this.clusterName;
}

View File

@ -28,17 +28,19 @@ import org.elasticsearch.repositories.RepositoryOperation;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
public final class RepositoryCleanupInProgress extends AbstractNamedDiffable<ClusterState.Custom> implements ClusterState.Custom {
public static final RepositoryCleanupInProgress EMPTY = new RepositoryCleanupInProgress(Collections.emptyList());
public static final String TYPE = "repository_cleanup";
private final List<Entry> entries;
public RepositoryCleanupInProgress(Entry... entries) {
this.entries = Arrays.asList(entries);
public RepositoryCleanupInProgress(List<Entry> entries) {
this.entries = entries;
}
RepositoryCleanupInProgress(StreamInput in) throws IOException {

View File

@ -52,6 +52,8 @@ public class RestoreInProgress extends AbstractNamedDiffable<Custom> implements
public static final String TYPE = "restore";
public static final RestoreInProgress EMPTY = new RestoreInProgress(ImmutableOpenMap.of());
private final ImmutableOpenMap<String, Entry> entries;
/**
@ -118,7 +120,7 @@ public class RestoreInProgress extends AbstractNamedDiffable<Custom> implements
}
public RestoreInProgress build() {
return new RestoreInProgress(entries.build());
return entries.isEmpty() ? EMPTY : new RestoreInProgress(entries.build());
}
}

View File

@ -44,13 +44,22 @@ import java.util.Objects;
*/
public class SnapshotDeletionsInProgress extends AbstractNamedDiffable<Custom> implements Custom {
public static final SnapshotDeletionsInProgress EMPTY = new SnapshotDeletionsInProgress(Collections.emptyList());
public static final String TYPE = "snapshot_deletions";
// the list of snapshot deletion request entries
private final List<Entry> entries;
public SnapshotDeletionsInProgress(List<Entry> entries) {
this.entries = Collections.unmodifiableList(entries);
private SnapshotDeletionsInProgress(List<Entry> entries) {
this.entries = entries;
}
public static SnapshotDeletionsInProgress of(List<SnapshotDeletionsInProgress.Entry> entries) {
if (entries.isEmpty()) {
return EMPTY;
}
return new SnapshotDeletionsInProgress(Collections.unmodifiableList(entries));
}
public SnapshotDeletionsInProgress(StreamInput in) throws IOException {
@ -72,7 +81,7 @@ public class SnapshotDeletionsInProgress extends AbstractNamedDiffable<Custom> i
public SnapshotDeletionsInProgress withAddedEntry(Entry entry) {
List<Entry> entries = new ArrayList<>(getEntries());
entries.add(entry);
return new SnapshotDeletionsInProgress(entries);
return SnapshotDeletionsInProgress.of(entries);
}
/**
@ -82,7 +91,7 @@ public class SnapshotDeletionsInProgress extends AbstractNamedDiffable<Custom> i
public SnapshotDeletionsInProgress withRemovedEntry(Entry entry) {
List<Entry> entries = new ArrayList<>(getEntries());
entries.remove(entry);
return new SnapshotDeletionsInProgress(entries);
return SnapshotDeletionsInProgress.of(entries);
}
/**

View File

@ -59,6 +59,8 @@ import static org.elasticsearch.snapshots.SnapshotInfo.DATA_STREAMS_IN_SNAPSHOT;
*/
public class SnapshotsInProgress extends AbstractNamedDiffable<Custom> implements Custom {
public static final SnapshotsInProgress EMPTY = new SnapshotsInProgress(Collections.emptyList());
private static final Version VERSION_IN_SNAPSHOT_VERSION = Version.V_7_7_0;
public static final String TYPE = "snapshots";
@ -483,12 +485,15 @@ public class SnapshotsInProgress extends AbstractNamedDiffable<Custom> implement
private final List<Entry> entries;
public SnapshotsInProgress(List<Entry> entries) {
this.entries = entries;
public static SnapshotsInProgress of(List<Entry> entries) {
if (entries.isEmpty()) {
return EMPTY;
}
return new SnapshotsInProgress(Collections.unmodifiableList(entries));
}
public SnapshotsInProgress(Entry... entries) {
this.entries = Arrays.asList(entries);
private SnapshotsInProgress(List<Entry> entries) {
this.entries = entries;
}
public List<Entry> entries() {

View File

@ -143,14 +143,12 @@ public class MetadataDeleteIndexService {
// update snapshot restore entries
ImmutableOpenMap<String, ClusterState.Custom> customs = currentState.getCustoms();
final RestoreInProgress restoreInProgress = currentState.custom(RestoreInProgress.TYPE);
if (restoreInProgress != null) {
RestoreInProgress updatedRestoreInProgress = RestoreService.updateRestoreStateWithDeletedIndices(restoreInProgress, indices);
if (updatedRestoreInProgress != restoreInProgress) {
ImmutableOpenMap.Builder<String, ClusterState.Custom> builder = ImmutableOpenMap.builder(customs);
builder.put(RestoreInProgress.TYPE, updatedRestoreInProgress);
customs = builder.build();
}
final RestoreInProgress restoreInProgress = currentState.custom(RestoreInProgress.TYPE, RestoreInProgress.EMPTY);
RestoreInProgress updatedRestoreInProgress = RestoreService.updateRestoreStateWithDeletedIndices(restoreInProgress, indices);
if (updatedRestoreInProgress != restoreInProgress) {
ImmutableOpenMap.Builder<String, ClusterState.Custom> builder = ImmutableOpenMap.builder(customs);
builder.put(RestoreInProgress.TYPE, updatedRestoreInProgress);
customs = builder.build();
}
return allocationService.reroute(

View File

@ -476,36 +476,27 @@ public class RepositoriesService extends AbstractLifecycleComponent implements C
* @return true if repository is currently in use by one of the running snapshots
*/
private static boolean isRepositoryInUse(ClusterState clusterState, String repository) {
SnapshotsInProgress snapshots = clusterState.custom(SnapshotsInProgress.TYPE);
if (snapshots != null) {
for (SnapshotsInProgress.Entry snapshot : snapshots.entries()) {
if (repository.equals(snapshot.snapshot().getRepository())) {
return true;
}
final SnapshotsInProgress snapshots = clusterState.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY);
for (SnapshotsInProgress.Entry snapshot : snapshots.entries()) {
if (repository.equals(snapshot.snapshot().getRepository())) {
return true;
}
}
SnapshotDeletionsInProgress deletionsInProgress = clusterState.custom(SnapshotDeletionsInProgress.TYPE);
if (deletionsInProgress != null) {
for (SnapshotDeletionsInProgress.Entry entry : deletionsInProgress.getEntries()) {
if (entry.repository().equals(repository)) {
return true;
}
for (SnapshotDeletionsInProgress.Entry entry :
clusterState.custom(SnapshotDeletionsInProgress.TYPE, SnapshotDeletionsInProgress.EMPTY).getEntries()) {
if (entry.repository().equals(repository)) {
return true;
}
}
final RepositoryCleanupInProgress repositoryCleanupInProgress = clusterState.custom(RepositoryCleanupInProgress.TYPE);
if (repositoryCleanupInProgress != null) {
for (RepositoryCleanupInProgress.Entry entry : repositoryCleanupInProgress.entries()) {
if (entry.repository().equals(repository)) {
return true;
}
for (RepositoryCleanupInProgress.Entry entry :
clusterState.custom(RepositoryCleanupInProgress.TYPE, RepositoryCleanupInProgress.EMPTY).entries()) {
if (entry.repository().equals(repository)) {
return true;
}
}
RestoreInProgress restoreInProgress = clusterState.custom(RestoreInProgress.TYPE);
if (restoreInProgress != null) {
for (RestoreInProgress.Entry entry: restoreInProgress) {
if (repository.equals(entry.snapshot().getRepository())) {
return true;
}
for (RestoreInProgress.Entry entry : clusterState.custom(RestoreInProgress.TYPE, RestoreInProgress.EMPTY)) {
if (repository.equals(entry.snapshot().getRepository())) {
return true;
}
}
return false;

View File

@ -415,21 +415,18 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
return;
}
if (bestEffortConsistency) {
long bestGenerationFromCS = RepositoryData.EMPTY_REPO_GEN;
final SnapshotsInProgress snapshotsInProgress = state.custom(SnapshotsInProgress.TYPE);
if (snapshotsInProgress != null) {
bestGenerationFromCS = bestGeneration(snapshotsInProgress.entries());
}
final SnapshotDeletionsInProgress deletionsInProgress = state.custom(SnapshotDeletionsInProgress.TYPE);
final SnapshotsInProgress snapshotsInProgress = state.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY);
long bestGenerationFromCS = bestGeneration(snapshotsInProgress.entries());
// Don't use generation from the delete task if we already found a generation for an in progress snapshot.
// In this case, the generation points at the generation the repo will be in after the snapshot finishes so it may not yet
// exist
if (bestGenerationFromCS == RepositoryData.EMPTY_REPO_GEN && deletionsInProgress != null) {
bestGenerationFromCS = bestGeneration(deletionsInProgress.getEntries());
if (bestGenerationFromCS == RepositoryData.EMPTY_REPO_GEN) {
bestGenerationFromCS =
bestGeneration(state.custom(SnapshotDeletionsInProgress.TYPE, SnapshotDeletionsInProgress.EMPTY).getEntries());
}
final RepositoryCleanupInProgress cleanupInProgress = state.custom(RepositoryCleanupInProgress.TYPE);
if (bestGenerationFromCS == RepositoryData.EMPTY_REPO_GEN && cleanupInProgress != null) {
bestGenerationFromCS = bestGeneration(cleanupInProgress.entries());
if (bestGenerationFromCS == RepositoryData.EMPTY_REPO_GEN) {
bestGenerationFromCS =
bestGeneration(state.custom(RepositoryCleanupInProgress.TYPE, RepositoryCleanupInProgress.EMPTY).entries());
}
final long finalBestGen = Math.max(bestGenerationFromCS, metadata.generation());
latestKnownRepoGen.updateAndGet(known -> Math.max(known, finalBestGen));

View File

@ -89,7 +89,6 @@ import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import static java.util.Collections.emptySet;
import static java.util.Collections.unmodifiableSet;
import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_AUTO_EXPAND_REPLICAS;
import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_CREATION_DATE;
@ -260,19 +259,19 @@ public class RestoreService implements ClusterStateApplier {
@Override
public ClusterState execute(ClusterState currentState) {
RestoreInProgress restoreInProgress = currentState.custom(RestoreInProgress.TYPE);
RestoreInProgress restoreInProgress = currentState.custom(RestoreInProgress.TYPE, RestoreInProgress.EMPTY);
if (currentState.getNodes().getMinNodeVersion().before(Version.V_7_0_0)) {
// Check if another restore process is already running - cannot run two restore processes at the
// same time in versions prior to 7.0
if (restoreInProgress != null && restoreInProgress.isEmpty() == false) {
if (restoreInProgress.isEmpty() == false) {
throw new ConcurrentSnapshotExecutionException(snapshot,
"Restore process is already running in this cluster");
}
}
// Check if the snapshot to restore is currently being deleted
SnapshotDeletionsInProgress deletionsInProgress = currentState.custom(SnapshotDeletionsInProgress.TYPE);
if (deletionsInProgress != null
&& deletionsInProgress.getEntries().stream().anyMatch(entry -> entry.getSnapshots().contains(snapshotId))) {
SnapshotDeletionsInProgress deletionsInProgress =
currentState.custom(SnapshotDeletionsInProgress.TYPE, SnapshotDeletionsInProgress.EMPTY);
if (deletionsInProgress.getEntries().stream().anyMatch(entry -> entry.getSnapshots().contains(snapshotId))) {
throw new ConcurrentSnapshotExecutionException(snapshot,
"cannot restore a snapshot while a snapshot deletion is in-progress [" +
deletionsInProgress.getEntries().get(0) + "]");
@ -406,13 +405,8 @@ public class RestoreService implements ClusterStateApplier {
Collections.unmodifiableList(new ArrayList<>(indices.keySet())),
shards
);
RestoreInProgress.Builder restoreInProgressBuilder;
if (restoreInProgress != null) {
restoreInProgressBuilder = new RestoreInProgress.Builder(restoreInProgress);
} else {
restoreInProgressBuilder = new RestoreInProgress.Builder();
}
builder.putCustom(RestoreInProgress.TYPE, restoreInProgressBuilder.add(restoreEntry).build());
builder.putCustom(RestoreInProgress.TYPE, new RestoreInProgress.Builder(
currentState.custom(RestoreInProgress.TYPE, RestoreInProgress.EMPTY)).add(restoreEntry).build());
} else {
shards = ImmutableOpenMap.of();
}
@ -763,11 +757,7 @@ public class RestoreService implements ClusterStateApplier {
}
public static RestoreInProgress.Entry restoreInProgress(ClusterState state, String restoreUUID) {
final RestoreInProgress restoreInProgress = state.custom(RestoreInProgress.TYPE);
if (restoreInProgress != null) {
return restoreInProgress.get(restoreUUID);
}
return null;
return state.custom(RestoreInProgress.TYPE, RestoreInProgress.EMPTY).get(restoreUUID);
}
static class CleanRestoreStateTaskExecutor implements ClusterStateTaskExecutor<CleanRestoreStateTaskExecutor.Task>,
@ -791,15 +781,12 @@ public class RestoreService implements ClusterStateApplier {
final ClusterTasksResult.Builder<Task> resultBuilder = ClusterTasksResult.<Task>builder().successes(tasks);
Set<String> completedRestores = tasks.stream().map(e -> e.uuid).collect(Collectors.toSet());
RestoreInProgress.Builder restoreInProgressBuilder = new RestoreInProgress.Builder();
final RestoreInProgress restoreInProgress = currentState.custom(RestoreInProgress.TYPE);
boolean changed = false;
if (restoreInProgress != null) {
for (RestoreInProgress.Entry entry : restoreInProgress) {
if (completedRestores.contains(entry.uuid())) {
changed = true;
} else {
restoreInProgressBuilder.add(entry);
}
for (RestoreInProgress.Entry entry : currentState.custom(RestoreInProgress.TYPE, RestoreInProgress.EMPTY)) {
if (completedRestores.contains(entry.uuid())) {
changed = true;
} else {
restoreInProgressBuilder.add(entry);
}
}
if (changed == false) {
@ -824,20 +811,15 @@ public class RestoreService implements ClusterStateApplier {
}
private void cleanupRestoreState(ClusterChangedEvent event) {
ClusterState state = event.state();
RestoreInProgress restoreInProgress = state.custom(RestoreInProgress.TYPE);
if (restoreInProgress != null) {
for (RestoreInProgress.Entry entry : restoreInProgress) {
if (entry.state().completed()) {
assert completed(entry.shards()) : "state says completed but restore entries are not";
clusterService.submitStateUpdateTask(
"clean up snapshot restore state",
new CleanRestoreStateTaskExecutor.Task(entry.uuid()),
ClusterStateTaskConfig.build(Priority.URGENT),
cleanRestoreStateTaskExecutor,
cleanRestoreStateTaskExecutor);
}
for (RestoreInProgress.Entry entry : event.state().custom(RestoreInProgress.TYPE, RestoreInProgress.EMPTY)) {
if (entry.state().completed()) {
assert completed(entry.shards()) : "state says completed but restore entries are not";
clusterService.submitStateUpdateTask(
"clean up snapshot restore state",
new CleanRestoreStateTaskExecutor.Task(entry.uuid()),
ClusterStateTaskConfig.build(Priority.URGENT),
cleanRestoreStateTaskExecutor,
cleanRestoreStateTaskExecutor);
}
}
}
@ -939,13 +921,8 @@ public class RestoreService implements ClusterStateApplier {
* Returns the indices that are currently being restored and that are contained in the indices-to-check set.
*/
public static Set<Index> restoringIndices(final ClusterState currentState, final Set<Index> indicesToCheck) {
final RestoreInProgress restore = currentState.custom(RestoreInProgress.TYPE);
if (restore == null) {
return emptySet();
}
final Set<Index> indices = new HashSet<>();
for (RestoreInProgress.Entry entry : restore) {
for (RestoreInProgress.Entry entry : currentState.custom(RestoreInProgress.TYPE, RestoreInProgress.EMPTY)) {
for (ObjectObjectCursor<ShardId, RestoreInProgress.ShardRestoreStatus> shard : entry.shards()) {
Index index = shard.key.getIndex();
if (indicesToCheck.contains(index)

View File

@ -149,15 +149,12 @@ public class SnapshotShardsService extends AbstractLifecycleComponent implements
@Override
public void clusterChanged(ClusterChangedEvent event) {
try {
SnapshotsInProgress previousSnapshots = event.previousState().custom(SnapshotsInProgress.TYPE);
SnapshotsInProgress currentSnapshots = event.state().custom(SnapshotsInProgress.TYPE);
if ((previousSnapshots == null && currentSnapshots != null)
|| (previousSnapshots != null && previousSnapshots.equals(currentSnapshots) == false)) {
SnapshotsInProgress previousSnapshots = event.previousState().custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY);
SnapshotsInProgress currentSnapshots = event.state().custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY);
if (previousSnapshots.equals(currentSnapshots) == false) {
synchronized (shardSnapshots) {
cancelRemoved(currentSnapshots);
if (currentSnapshots != null) {
startNewSnapshots(currentSnapshots);
}
startNewSnapshots(currentSnapshots);
}
}
@ -203,13 +200,13 @@ public class SnapshotShardsService extends AbstractLifecycleComponent implements
}
}
private void cancelRemoved(@Nullable SnapshotsInProgress snapshotsInProgress) {
private void cancelRemoved(SnapshotsInProgress snapshotsInProgress) {
// First, remove snapshots that are no longer there
Iterator<Map.Entry<Snapshot, Map<ShardId, IndexShardSnapshotStatus>>> it = shardSnapshots.entrySet().iterator();
while (it.hasNext()) {
final Map.Entry<Snapshot, Map<ShardId, IndexShardSnapshotStatus>> entry = it.next();
final Snapshot snapshot = entry.getKey();
if (snapshotsInProgress == null || snapshotsInProgress.snapshot(snapshot) == null) {
if (snapshotsInProgress.snapshot(snapshot) == null) {
// abort any running snapshots of shards for the removed entry;
// this could happen if for some reason the cluster state update for aborting
// running shards is missed, then the snapshot is removed is a subsequent cluster
@ -642,7 +639,7 @@ public class SnapshotShardsService extends AbstractLifecycleComponent implements
logger.trace("changed cluster state triggered by {} snapshot state updates", changedCount);
return ClusterTasksResult.<UpdateIndexShardSnapshotStatusRequest>builder().successes(tasks)
.build(ClusterState.builder(currentState).putCustom(SnapshotsInProgress.TYPE,
new SnapshotsInProgress(unmodifiableList(entries))).build());
SnapshotsInProgress.of(unmodifiableList(entries))).build());
}
}
return ClusterTasksResult.<UpdateIndexShardSnapshotStatusRequest>builder().successes(tasks).build(currentState);

View File

@ -231,7 +231,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
userMeta, Version.CURRENT
);
initializingSnapshots.add(newSnapshot.snapshot());
snapshots = new SnapshotsInProgress(newSnapshot);
snapshots = SnapshotsInProgress.of(Collections.singletonList(newSnapshot));
return ClusterState.builder(currentState).putCustom(SnapshotsInProgress.TYPE, snapshots).build();
}
@ -419,8 +419,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
}
}
return ClusterState.builder(currentState)
.putCustom(SnapshotsInProgress.TYPE, new SnapshotsInProgress(unmodifiableList(entries)))
.build();
.putCustom(SnapshotsInProgress.TYPE, SnapshotsInProgress.of(unmodifiableList(entries))).build();
}
@Override
@ -721,7 +720,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
}
if (changed) {
return ClusterState.builder(currentState)
.putCustom(SnapshotsInProgress.TYPE, new SnapshotsInProgress(unmodifiableList(entries))).build();
.putCustom(SnapshotsInProgress.TYPE, SnapshotsInProgress.of(unmodifiableList(entries))).build();
}
return currentState;
}
@ -760,7 +759,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
}
if (changed) {
return ClusterState.builder(currentState)
.putCustom(SnapshotsInProgress.TYPE, new SnapshotsInProgress(unmodifiableList(entries))).build();
.putCustom(SnapshotsInProgress.TYPE, SnapshotsInProgress.of(unmodifiableList(entries))).build();
}
}
return currentState;
@ -963,7 +962,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
}
if (changed) {
return ClusterState.builder(state).putCustom(
SnapshotsInProgress.TYPE, new SnapshotsInProgress(unmodifiableList(entries))).build();
SnapshotsInProgress.TYPE, SnapshotsInProgress.of(unmodifiableList(entries))).build();
}
}
return state;
@ -1122,7 +1121,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
failure = snapshotEntry.failure();
}
return ClusterState.builder(currentState).putCustom(SnapshotsInProgress.TYPE,
new SnapshotsInProgress(snapshots.entries().stream().map(existing -> {
SnapshotsInProgress.of(snapshots.entries().stream().map(existing -> {
if (existing.equals(snapshotEntry)) {
return new SnapshotsInProgress.Entry(snapshotEntry, State.ABORTED, shards, failure);
}

View File

@ -99,7 +99,7 @@ public class DeleteDataStreamRequestTests extends AbstractWireSerializingTestCas
ClusterState cs = getClusterStateWithDataStreams(Arrays.asList(new Tuple<>(dataStreamName, 2), new Tuple<>(dataStreamName2, 2)),
otherIndices);
SnapshotsInProgress snapshotsInProgress = new SnapshotsInProgress(Arrays.asList(
SnapshotsInProgress snapshotsInProgress = SnapshotsInProgress.of(Arrays.asList(
createEntry(dataStreamName, "repo1", false),
createEntry(dataStreamName2, "repo2", true)));
ClusterState snapshotCs = ClusterState.builder(cs).putCustom(SnapshotsInProgress.TYPE, snapshotsInProgress).build();

View File

@ -79,10 +79,10 @@ public class MetadataDeleteIndexServiceTests extends ESTestCase {
public void testDeleteSnapshotting() {
String index = randomAlphaOfLength(5);
Snapshot snapshot = new Snapshot("doesn't matter", new SnapshotId("snapshot name", "snapshot uuid"));
SnapshotsInProgress snaps = new SnapshotsInProgress(new SnapshotsInProgress.Entry(snapshot, true, false,
SnapshotsInProgress snaps = SnapshotsInProgress.of(List.of(new SnapshotsInProgress.Entry(snapshot, true, false,
SnapshotsInProgress.State.INIT, singletonList(new IndexId(index, "doesn't matter")),
System.currentTimeMillis(), (long) randomIntBetween(0, 1000), ImmutableOpenMap.of(),
SnapshotInfoTests.randomUserMetadata(), VersionUtils.randomVersion(random())));
SnapshotInfoTests.randomUserMetadata(), VersionUtils.randomVersion(random()))));
ClusterState state = ClusterState.builder(clusterState(index))
.putCustom(SnapshotsInProgress.TYPE, snaps)
.build();

View File

@ -463,7 +463,8 @@ public class MetadataIndexStateServiceTests extends ESTestCase {
new SnapshotsInProgress.Entry(snapshot, randomBoolean(), false, SnapshotsInProgress.State.INIT,
Collections.singletonList(new IndexId(index, index)), randomNonNegativeLong(), randomLong(), shardsBuilder.build(),
SnapshotInfoTests.randomUserMetadata(), VersionUtils.randomVersion(random()));
return ClusterState.builder(newState).putCustom(SnapshotsInProgress.TYPE, new SnapshotsInProgress(entry)).build();
return ClusterState.builder(newState).putCustom(
SnapshotsInProgress.TYPE, SnapshotsInProgress.of(Collections.singletonList(entry))).build();
}
private static ClusterState addIndex(final ClusterState currentState,

View File

@ -121,11 +121,10 @@ public class ClusterSerializationTests extends ESAllocationTestCase {
ClusterState.Builder builder = ClusterState.builder(ClusterState.EMPTY_STATE)
.putCustom(SnapshotDeletionsInProgress.TYPE,
SnapshotDeletionsInProgress.newInstance(
SnapshotDeletionsInProgress.of(Collections.singletonList(
new SnapshotDeletionsInProgress.Entry(
Collections.singletonList(new SnapshotId("snap1", UUIDs.randomBase64UUID())), "repo1",
randomNonNegativeLong(), randomNonNegativeLong())
));
randomNonNegativeLong(), randomNonNegativeLong()))));
if (includeRestore) {
builder.putCustom(RestoreInProgress.TYPE,
new RestoreInProgress.Builder().add(

View File

@ -183,21 +183,17 @@ public abstract class AbstractSnapshotIntegTestCase extends ESIntegTestCase {
if (snapshotInfos.get(0).state().completed()) {
// Make sure that snapshot clean up operations are finished
ClusterStateResponse stateResponse = client().admin().cluster().prepareState().get();
SnapshotsInProgress snapshotsInProgress = stateResponse.getState().custom(SnapshotsInProgress.TYPE);
if (snapshotsInProgress == null) {
boolean found = false;
for (SnapshotsInProgress.Entry entry :
stateResponse.getState().custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY).entries()) {
final Snapshot curr = entry.snapshot();
if (curr.getRepository().equals(repository) && curr.getSnapshotId().getName().equals(snapshotName)) {
found = true;
break;
}
}
if (found == false) {
return snapshotInfos.get(0);
} else {
boolean found = false;
for (SnapshotsInProgress.Entry entry : snapshotsInProgress.entries()) {
final Snapshot curr = entry.snapshot();
if (curr.getRepository().equals(repository) && curr.getSnapshotId().getName().equals(snapshotName)) {
found = true;
break;
}
}
if (found == false) {
return snapshotInfos.get(0);
}
}
}
Thread.sleep(100);

View File

@ -437,14 +437,9 @@ public class SnapshotResiliencyTests extends ESTestCase {
.prepareDeleteSnapshot(repoName, snapshotName).execute(ActionListener.wrap(() -> snapshotDeleteResponded.set(true)));
});
runUntil(() -> testClusterNodes.randomMasterNode().map(master -> {
if (snapshotDeleteResponded.get() == false) {
return false;
}
final SnapshotDeletionsInProgress snapshotDeletionsInProgress =
master.clusterService.state().custom(SnapshotDeletionsInProgress.TYPE);
return snapshotDeletionsInProgress == null || snapshotDeletionsInProgress.getEntries().isEmpty();
}).orElse(false), TimeUnit.MINUTES.toMillis(1L));
runUntil(() -> testClusterNodes.randomMasterNode().map(master -> snapshotDeleteResponded.get() &&
master.clusterService.state().custom(SnapshotDeletionsInProgress.TYPE, SnapshotDeletionsInProgress.EMPTY)
.getEntries().isEmpty()).orElse(false), TimeUnit.MINUTES.toMillis(1L));
clearDisruptionsAndAwaitSync();
@ -479,8 +474,7 @@ public class SnapshotResiliencyTests extends ESTestCase {
masterNode.clusterService.addListener(new ClusterStateListener() {
@Override
public void clusterChanged(ClusterChangedEvent event) {
final SnapshotsInProgress snapshotsInProgress = event.state().custom(SnapshotsInProgress.TYPE);
if (snapshotsInProgress != null && snapshotsInProgress.entries().isEmpty() == false) {
if (event.state().custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY).entries().isEmpty() == false) {
client().admin().cluster().prepareDeleteSnapshot(repoName, snapshotName).execute(deleteSnapshotStepListener);
masterNode.clusterService.removeListener(this);
}
@ -914,16 +908,14 @@ public class SnapshotResiliencyTests extends ESTestCase {
if (createdSnapshot.get() == false) {
return false;
}
final SnapshotsInProgress snapshotsInProgress = master.clusterService.state().custom(SnapshotsInProgress.TYPE);
return snapshotsInProgress == null || snapshotsInProgress.entries().isEmpty();
return master.clusterService.state().custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY).entries().isEmpty();
}).orElse(false), TimeUnit.MINUTES.toMillis(1L));
clearDisruptionsAndAwaitSync();
assertTrue(createdSnapshot.get());
final SnapshotsInProgress finalSnapshotsInProgress = testClusterNodes.randomDataNodeSafe()
.clusterService.state().custom(SnapshotsInProgress.TYPE);
assertThat(finalSnapshotsInProgress.entries(), empty());
assertThat(testClusterNodes.randomDataNodeSafe().clusterService.state()
.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY).entries(), empty());
final Repository repository = testClusterNodes.randomMasterNodeSafe().repositoriesService.repository(repoName);
Collection<SnapshotId> snapshotIds = getRepositoryData(repository).getSnapshotIds();
assertThat(snapshotIds, either(hasSize(1)).or(hasSize(0)));

View File

@ -49,7 +49,7 @@ public class SnapshotsInProgressSerializationTests extends AbstractDiffableWireS
for (int i = 0; i < numberOfSnapshots; i++) {
entries.add(randomSnapshot());
}
return new SnapshotsInProgress(entries);
return SnapshotsInProgress.of(entries);
}
private Entry randomSnapshot() {
@ -112,7 +112,7 @@ public class SnapshotsInProgressSerializationTests extends AbstractDiffableWireS
}
}
}
return new SnapshotsInProgress(entries);
return SnapshotsInProgress.of(entries);
}
@Override
@ -134,7 +134,6 @@ public class SnapshotsInProgressSerializationTests extends AbstractDiffableWireS
} else {
entries.remove(randomIntBetween(0, entries.size() - 1));
}
return new SnapshotsInProgress(entries);
return SnapshotsInProgress.of(entries);
}
}

View File

@ -134,13 +134,7 @@ public abstract class AsyncRetryDuringSnapshotActionStep extends AsyncActionStep
}
private boolean snapshotInProgress(ClusterState state) {
SnapshotsInProgress snapshotsInProgress = state.custom(SnapshotsInProgress.TYPE);
if (snapshotsInProgress == null || snapshotsInProgress.entries().isEmpty()) {
// No snapshots are running, new state is acceptable to proceed
return false;
}
for (SnapshotsInProgress.Entry snapshot : snapshotsInProgress.entries()) {
for (SnapshotsInProgress.Entry snapshot : state.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY).entries()) {
if (snapshot.indices().stream()
.map(IndexId::getName)
.anyMatch(name -> name.equals(indexName))) {
@ -148,7 +142,7 @@ public abstract class AsyncRetryDuringSnapshotActionStep extends AsyncActionStep
return true;
}
}
// There are snapshots, but none for this index, so it's okay to proceed with this state
// There are no snapshots for this index, so it's okay to proceed with this state
return false;
}

View File

@ -467,29 +467,25 @@ public class SnapshotRetentionTask implements SchedulerEngine.Listener {
public static boolean okayToDeleteSnapshots(ClusterState state) {
// Cannot delete during a snapshot
final SnapshotsInProgress snapshotsInProgress = state.custom(SnapshotsInProgress.TYPE);
if (snapshotsInProgress != null && snapshotsInProgress.entries().size() > 0) {
if (state.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY).entries().size() > 0) {
logger.trace("deletion cannot proceed as there are snapshots in progress");
return false;
}
// Cannot delete during an existing delete
final SnapshotDeletionsInProgress deletionsInProgress = state.custom(SnapshotDeletionsInProgress.TYPE);
if (deletionsInProgress != null && deletionsInProgress.hasDeletionsInProgress()) {
if (state.custom(SnapshotDeletionsInProgress.TYPE, SnapshotDeletionsInProgress.EMPTY).hasDeletionsInProgress()) {
logger.trace("deletion cannot proceed as there are snapshot deletions in progress");
return false;
}
// Cannot delete while a repository is being cleaned
final RepositoryCleanupInProgress repositoryCleanupInProgress = state.custom(RepositoryCleanupInProgress.TYPE);
if (repositoryCleanupInProgress != null && repositoryCleanupInProgress.hasCleanupInProgress()) {
if (state.custom(RepositoryCleanupInProgress.TYPE, RepositoryCleanupInProgress.EMPTY).hasCleanupInProgress()) {
logger.trace("deletion cannot proceed as there are repository cleanups in progress");
return false;
}
// Cannot delete during a restore
final RestoreInProgress restoreInProgress = state.custom(RestoreInProgress.TYPE);
if (restoreInProgress != null && restoreInProgress.isEmpty() == false) {
if (state.custom(RestoreInProgress.TYPE, RestoreInProgress.EMPTY).isEmpty() == false) {
logger.trace("deletion cannot proceed as there are snapshot restores in progress");
return false;
}

View File

@ -341,19 +341,19 @@ public class SnapshotRetentionTaskTests extends ESTestCase {
public void testOkToDeleteSnapshots() {
final Snapshot snapshot = new Snapshot("repo", new SnapshotId("name", "uuid"));
SnapshotsInProgress inProgress = new SnapshotsInProgress(
new SnapshotsInProgress.Entry(
SnapshotsInProgress inProgress = SnapshotsInProgress.of(
Collections.singletonList(new SnapshotsInProgress.Entry(
snapshot, true, false, SnapshotsInProgress.State.INIT,
Collections.singletonList(new IndexId("name", "id")), 0, 0,
ImmutableOpenMap.<ShardId, SnapshotsInProgress.ShardSnapshotStatus>builder().build(), Collections.emptyMap(),
VersionUtils.randomVersion(random())));
VersionUtils.randomVersion(random()))));
ClusterState state = ClusterState.builder(new ClusterName("cluster"))
.putCustom(SnapshotsInProgress.TYPE, inProgress)
.build();
assertThat(SnapshotRetentionTask.okayToDeleteSnapshots(state), equalTo(false));
SnapshotDeletionsInProgress delInProgress = new SnapshotDeletionsInProgress(
SnapshotDeletionsInProgress delInProgress = SnapshotDeletionsInProgress.of(
Collections.singletonList(new SnapshotDeletionsInProgress.Entry(
Collections.singletonList(snapshot.getSnapshotId()), snapshot.getRepository(), 0, 0)));
state = ClusterState.builder(new ClusterName("cluster"))
@ -362,7 +362,8 @@ public class SnapshotRetentionTaskTests extends ESTestCase {
assertThat(SnapshotRetentionTask.okayToDeleteSnapshots(state), equalTo(false));
RepositoryCleanupInProgress cleanupInProgress = new RepositoryCleanupInProgress(new RepositoryCleanupInProgress.Entry("repo", 0));
RepositoryCleanupInProgress cleanupInProgress =
new RepositoryCleanupInProgress(Collections.singletonList(new RepositoryCleanupInProgress.Entry("repo", 0)));
state = ClusterState.builder(new ClusterName("cluster"))
.putCustom(RepositoryCleanupInProgress.TYPE, cleanupInProgress)
.build();