diff --git a/server/src/main/java/org/elasticsearch/node/Node.java b/server/src/main/java/org/elasticsearch/node/Node.java index 2f5ea4581b5..e23fdc32031 100644 --- a/server/src/main/java/org/elasticsearch/node/Node.java +++ b/server/src/main/java/org/elasticsearch/node/Node.java @@ -555,10 +555,9 @@ public class Node implements Closeable { RepositoriesService repositoryService = repositoriesModule.getRepositoryService(); repositoriesServiceReference.set(repositoryService); SnapshotsService snapshotsService = new SnapshotsService(settings, clusterService, - clusterModule.getIndexNameExpressionResolver(), repositoryService, threadPool); + clusterModule.getIndexNameExpressionResolver(), repositoryService, transportService, actionModule.getActionFilters()); SnapshotShardsService snapshotShardsService = new SnapshotShardsService(settings, clusterService, repositoryService, - threadPool, transportService, indicesService, actionModule.getActionFilters(), - clusterModule.getIndexNameExpressionResolver()); + transportService, indicesService); TransportNodesSnapshotsStatus nodesSnapshotsStatus = new TransportNodesSnapshotsStatus(threadPool, clusterService, transportService, snapshotShardsService, actionModule.getActionFilters()); RestoreService restoreService = new RestoreService(clusterService, repositoryService, clusterModule.getAllocationService(), diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java index 89c4e553a80..ae58101949e 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java @@ -26,33 +26,19 @@ import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.lucene.index.IndexCommit; import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.ActionRequestValidationException; -import org.elasticsearch.action.ActionResponse; -import org.elasticsearch.action.support.ActionFilters; -import org.elasticsearch.action.support.master.MasterNodeRequest; -import org.elasticsearch.action.support.master.TransportMasterNodeAction; import org.elasticsearch.cluster.ClusterChangedEvent; -import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateListener; -import org.elasticsearch.cluster.ClusterStateTaskConfig; -import org.elasticsearch.cluster.ClusterStateTaskExecutor; -import org.elasticsearch.cluster.ClusterStateTaskListener; import org.elasticsearch.cluster.SnapshotsInProgress; import org.elasticsearch.cluster.SnapshotsInProgress.ShardSnapshotStatus; import org.elasticsearch.cluster.SnapshotsInProgress.ShardState; import org.elasticsearch.cluster.SnapshotsInProgress.State; -import org.elasticsearch.cluster.block.ClusterBlockException; -import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Nullable; -import org.elasticsearch.common.Priority; import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.io.stream.StreamInput; -import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.seqno.SequenceNumbers; @@ -74,22 +60,18 @@ import org.elasticsearch.transport.TransportResponseHandler; import org.elasticsearch.transport.TransportService; import java.io.IOException; -import java.util.ArrayList; import java.util.HashMap; import java.util.Iterator; -import java.util.List; import java.util.Map; -import java.util.Objects; import java.util.function.Function; import java.util.stream.Collectors; import static java.util.Collections.emptyMap; -import static java.util.Collections.unmodifiableList; -import static org.elasticsearch.cluster.SnapshotsInProgress.completed; /** - * This service runs on data and master nodes and controls currently snapshotted shards on these nodes. It is responsible for - * starting and stopping shard level snapshots + * This service runs on data nodes and controls currently running shard snapshots on these nodes. It is responsible for + * starting and stopping shard level snapshots. + * See package level documentation of {@link org.elasticsearch.snapshots} for details. */ public class SnapshotShardsService extends AbstractLifecycleComponent implements ClusterStateListener, IndexEventListener { private static final Logger logger = LogManager.getLogger(SnapshotShardsService.class); @@ -110,31 +92,21 @@ public class SnapshotShardsService extends AbstractLifecycleComponent implements private final TransportRequestDeduplicator remoteFailedRequestDeduplicator = new TransportRequestDeduplicator<>(); - private final SnapshotStateExecutor snapshotStateExecutor = new SnapshotStateExecutor(); - private final UpdateSnapshotStatusAction updateSnapshotStatusHandler; - public SnapshotShardsService(Settings settings, ClusterService clusterService, RepositoriesService repositoriesService, - ThreadPool threadPool, TransportService transportService, IndicesService indicesService, - ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) { + TransportService transportService, IndicesService indicesService) { this.indicesService = indicesService; this.repositoriesService = repositoriesService; this.transportService = transportService; this.clusterService = clusterService; - this.threadPool = threadPool; + this.threadPool = transportService.getThreadPool(); if (DiscoveryNode.isDataNode(settings)) { // this is only useful on the nodes that can hold data clusterService.addListener(this); } - - // The constructor of UpdateSnapshotStatusAction will register itself to the TransportService. - this.updateSnapshotStatusHandler = - new UpdateSnapshotStatusAction(transportService, clusterService, threadPool, actionFilters, indexNameExpressionResolver); } @Override protected void doStart() { - assert this.updateSnapshotStatusHandler != null; - assert transportService.getRequestHandler(SnapshotsService.UPDATE_SNAPSHOT_STATUS_ACTION_NAME) != null; } @Override @@ -444,77 +416,6 @@ public class SnapshotShardsService extends AbstractLifecycleComponent implements } } - /** - * Internal request that is used to send changes in snapshot status to master - */ - public static class UpdateIndexShardSnapshotStatusRequest extends MasterNodeRequest { - private final Snapshot snapshot; - private final ShardId shardId; - private final ShardSnapshotStatus status; - - public UpdateIndexShardSnapshotStatusRequest(StreamInput in) throws IOException { - super(in); - snapshot = new Snapshot(in); - shardId = new ShardId(in); - status = new ShardSnapshotStatus(in); - } - - public UpdateIndexShardSnapshotStatusRequest(Snapshot snapshot, ShardId shardId, ShardSnapshotStatus status) { - this.snapshot = snapshot; - this.shardId = shardId; - this.status = status; - // By default, we keep trying to post snapshot status messages to avoid snapshot processes getting stuck. - this.masterNodeTimeout = TimeValue.timeValueNanos(Long.MAX_VALUE); - } - - @Override - public ActionRequestValidationException validate() { - return null; - } - - @Override - public void writeTo(StreamOutput out) throws IOException { - super.writeTo(out); - snapshot.writeTo(out); - shardId.writeTo(out); - status.writeTo(out); - } - - public Snapshot snapshot() { - return snapshot; - } - - public ShardId shardId() { - return shardId; - } - - public ShardSnapshotStatus status() { - return status; - } - - @Override - public String toString() { - return snapshot + ", shardId [" + shardId + "], status [" + status.state() + "]"; - } - - @Override - public boolean equals(final Object o) { - if (this == o) { - return true; - } - if (o == null || getClass() != o.getClass()) { - return false; - } - final UpdateIndexShardSnapshotStatusRequest that = (UpdateIndexShardSnapshotStatusRequest) o; - return snapshot.equals(that.snapshot) && shardId.equals(that.shardId) && status.equals(that.status); - } - - @Override - public int hashCode() { - return Objects.hash(snapshot, shardId, status); - } - } - /** Notify the master node that the given shard has been successfully snapshotted **/ private void notifySuccessfulSnapshotShard(final Snapshot snapshot, final ShardId shardId, String generation) { assert generation != null; @@ -569,125 +470,4 @@ public class SnapshotShardsService extends AbstractLifecycleComponent implements }) ); } - - /** - * Updates the shard status on master node - * - * @param request update shard status request - */ - private void innerUpdateSnapshotState(final UpdateIndexShardSnapshotStatusRequest request, - ActionListener listener) { - logger.trace("received updated snapshot restore state [{}]", request); - clusterService.submitStateUpdateTask( - "update snapshot state", - request, - ClusterStateTaskConfig.build(Priority.NORMAL), - snapshotStateExecutor, - new ClusterStateTaskListener() { - @Override - public void onFailure(String source, Exception e) { - listener.onFailure(e); - } - - @Override - public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { - listener.onResponse(new UpdateIndexShardSnapshotStatusResponse()); - } - }); - } - - private static class SnapshotStateExecutor implements ClusterStateTaskExecutor { - - @Override - public ClusterTasksResult - execute(ClusterState currentState, List tasks) { - final SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE); - if (snapshots != null) { - int changedCount = 0; - final List entries = new ArrayList<>(); - for (SnapshotsInProgress.Entry entry : snapshots.entries()) { - ImmutableOpenMap.Builder shards = ImmutableOpenMap.builder(); - boolean updated = false; - - for (UpdateIndexShardSnapshotStatusRequest updateSnapshotState : tasks) { - if (entry.snapshot().equals(updateSnapshotState.snapshot())) { - logger.trace("[{}] Updating shard [{}] with status [{}]", updateSnapshotState.snapshot(), - updateSnapshotState.shardId(), updateSnapshotState.status().state()); - if (updated == false) { - shards.putAll(entry.shards()); - updated = true; - } - shards.put(updateSnapshotState.shardId(), updateSnapshotState.status()); - changedCount++; - } - } - - if (updated) { - if (completed(shards.values()) == false) { - entries.add(new SnapshotsInProgress.Entry(entry, shards.build())); - } else { - // Snapshot is finished - mark it as done - // TODO: Add PARTIAL_SUCCESS status? - SnapshotsInProgress.Entry updatedEntry = new SnapshotsInProgress.Entry(entry, State.SUCCESS, shards.build()); - entries.add(updatedEntry); - } - } else { - entries.add(entry); - } - } - if (changedCount > 0) { - logger.trace("changed cluster state triggered by {} snapshot state updates", changedCount); - return ClusterTasksResult.builder().successes(tasks) - .build(ClusterState.builder(currentState).putCustom(SnapshotsInProgress.TYPE, - SnapshotsInProgress.of(unmodifiableList(entries))).build()); - } - } - return ClusterTasksResult.builder().successes(tasks).build(currentState); - } - } - - static class UpdateIndexShardSnapshotStatusResponse extends ActionResponse { - - UpdateIndexShardSnapshotStatusResponse() {} - - UpdateIndexShardSnapshotStatusResponse(StreamInput in) throws IOException { - super(in); - } - - @Override - public void writeTo(StreamOutput out) throws IOException {} - } - - private class UpdateSnapshotStatusAction - extends TransportMasterNodeAction { - UpdateSnapshotStatusAction(TransportService transportService, ClusterService clusterService, - ThreadPool threadPool, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) { - super( - SnapshotsService.UPDATE_SNAPSHOT_STATUS_ACTION_NAME, false, transportService, clusterService, threadPool, - actionFilters, UpdateIndexShardSnapshotStatusRequest::new, indexNameExpressionResolver - ); - } - - @Override - protected String executor() { - return ThreadPool.Names.SAME; - } - - @Override - protected UpdateIndexShardSnapshotStatusResponse read(StreamInput in) throws IOException { - return new UpdateIndexShardSnapshotStatusResponse(in); - } - - @Override - protected void masterOperation(UpdateIndexShardSnapshotStatusRequest request, ClusterState state, - ActionListener listener) { - innerUpdateSnapshotState(request, listener); - } - - @Override - protected ClusterBlockException checkBlock(UpdateIndexShardSnapshotStatusRequest request, ClusterState state) { - return null; - } - } - } diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java index f95ba972ee8..676fc83c340 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java @@ -31,9 +31,14 @@ import org.elasticsearch.action.ActionRunnable; import org.elasticsearch.action.StepListener; import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotRequest; import org.elasticsearch.action.admin.cluster.snapshots.delete.DeleteSnapshotRequest; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.master.TransportMasterNodeAction; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateApplier; +import org.elasticsearch.cluster.ClusterStateTaskConfig; +import org.elasticsearch.cluster.ClusterStateTaskExecutor; +import org.elasticsearch.cluster.ClusterStateTaskListener; import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.NotMasterException; import org.elasticsearch.cluster.RepositoryCleanupInProgress; @@ -43,6 +48,7 @@ import org.elasticsearch.cluster.SnapshotsInProgress; import org.elasticsearch.cluster.SnapshotsInProgress.ShardSnapshotStatus; import org.elasticsearch.cluster.SnapshotsInProgress.ShardState; import org.elasticsearch.cluster.SnapshotsInProgress.State; +import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.coordination.FailedToCommitClusterStateException; import org.elasticsearch.cluster.metadata.DataStream; import org.elasticsearch.cluster.metadata.IndexMetadata; @@ -63,6 +69,7 @@ import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.component.AbstractLifecycleComponent; +import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.regex.Regex; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; @@ -77,7 +84,9 @@ import org.elasticsearch.repositories.RepositoryException; import org.elasticsearch.repositories.RepositoryMissingException; import org.elasticsearch.repositories.ShardGenerations; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; +import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -100,8 +109,9 @@ import static java.util.Collections.unmodifiableList; import static org.elasticsearch.cluster.SnapshotsInProgress.completed; /** - * Service responsible for creating snapshots. See package level documentation of {@link org.elasticsearch.snapshots} - * for details. + * Service responsible for creating snapshots. This service runs all the steps executed on the master node during snapshot creation and + * deletion. + * See package level documentation of {@link org.elasticsearch.snapshots} for details. */ public class SnapshotsService extends AbstractLifecycleComponent implements ClusterStateApplier { @@ -138,13 +148,22 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus // Set of snapshots that are currently being ended by this node private final Set endingSnapshots = Collections.synchronizedSet(new HashSet<>()); + private final SnapshotStateExecutor snapshotStateExecutor = new SnapshotStateExecutor(); + private final UpdateSnapshotStatusAction updateSnapshotStatusHandler; + + private final TransportService transportService; + public SnapshotsService(Settings settings, ClusterService clusterService, IndexNameExpressionResolver indexNameExpressionResolver, - RepositoriesService repositoriesService, ThreadPool threadPool) { + RepositoriesService repositoriesService, TransportService transportService, ActionFilters actionFilters) { this.clusterService = clusterService; this.indexNameExpressionResolver = indexNameExpressionResolver; this.repositoriesService = repositoriesService; - this.threadPool = threadPool; + this.threadPool = transportService.getThreadPool(); + this.transportService = transportService; + // The constructor of UpdateSnapshotStatusAction will register itself to the TransportService. + this.updateSnapshotStatusHandler = new UpdateSnapshotStatusAction( + transportService, clusterService, threadPool, actionFilters, indexNameExpressionResolver); if (DiscoveryNode.isMasterNode(settings)) { // addLowPriorityApplier to make sure that Repository will be created before snapshot clusterService.addLowPriorityApplier(this); @@ -181,10 +200,10 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus final Map userMeta = repository.adaptUserMetadata(request.userMetadata()); clusterService.submitStateUpdateTask("create_snapshot [" + snapshotName + ']', new ClusterStateUpdateTask() { - private SnapshotsInProgress.Entry newSnapshot = null; - private List indices; + private SnapshotsInProgress.Entry newEntry; + @Override public ClusterState execute(ClusterState currentState) { validate(repositoryName, snapshotName, currentState); @@ -213,7 +232,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus indexNameExpressionResolver.dataStreamNames(currentState, request.indicesOptions(), request.indices()); logger.trace("[{}][{}] creating snapshot for indices [{}]", repositoryName, snapshotName, indices); - newSnapshot = new SnapshotsInProgress.Entry( + newEntry = new SnapshotsInProgress.Entry( new Snapshot(repositoryName, snapshotId), request.includeGlobalState(), request.partial(), State.INIT, @@ -224,28 +243,28 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus null, userMeta, Version.CURRENT ); - initializingSnapshots.add(newSnapshot.snapshot()); - snapshots = SnapshotsInProgress.of(Collections.singletonList(newSnapshot)); + initializingSnapshots.add(newEntry.snapshot()); + snapshots = SnapshotsInProgress.of(Collections.singletonList(newEntry)); return ClusterState.builder(currentState).putCustom(SnapshotsInProgress.TYPE, snapshots).build(); } @Override public void onFailure(String source, Exception e) { logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to create snapshot", repositoryName, snapshotName), e); - if (newSnapshot != null) { - initializingSnapshots.remove(newSnapshot.snapshot()); + if (newEntry != null) { + initializingSnapshots.remove(newEntry.snapshot()); } - newSnapshot = null; + newEntry = null; listener.onFailure(e); } @Override public void clusterStateProcessed(String source, ClusterState oldState, final ClusterState newState) { - if (newSnapshot != null) { - final Snapshot current = newSnapshot.snapshot(); + if (newEntry != null) { + final Snapshot current = newEntry.snapshot(); assert initializingSnapshots.contains(current); assert indices != null; - beginSnapshot(newState, newSnapshot, request.partial(), indices, repository, new ActionListener() { + beginSnapshot(newState, newEntry, request.partial(), indices, repository, new ActionListener() { @Override public void onResponse(final Snapshot snapshot) { initializingSnapshots.remove(snapshot); @@ -447,6 +466,8 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus final SnapshotsInProgress.Entry entry = snapshotsInProgress.snapshot(snapshot.snapshot()); assert entry != null; endSnapshot(entry, newState.metadata()); + } else { + endCompletedSnapshots(newState); } } }); @@ -600,15 +621,9 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus if (event.routingTableChanged() && waitingShardsStartedOrUnassigned(snapshotsInProgress, event)) { processStartedShards(); } - // Cleanup all snapshots that have no more work left: - // 1. Completed snapshots - // 2. Snapshots in state INIT that the previous master failed to start - // 3. Snapshots in any other state that have all their shard tasks completed - snapshotsInProgress.entries().stream().filter( - entry -> entry.state().completed() - || initializingSnapshots.contains(entry.snapshot()) == false - && (entry.state() == State.INIT || completed(entry.shards().values())) - ).forEach(entry -> endSnapshot(entry, event.state().metadata())); + if (newMaster) { + endCompletedSnapshots(event.state()); + } } if (newMaster) { finalizeSnapshotDeletionFromPreviousMaster(event.state()); @@ -630,6 +645,20 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus assert assertConsistentWithClusterState(event.state()); } + /** + * Cleanup all snapshots found in the given cluster state that have no more work left: + * 1. Completed snapshots + * 2. Snapshots in state INIT that a previous master of an older version failed to start + * 3. Snapshots in any other state that have all their shard tasks completed + */ + private void endCompletedSnapshots(ClusterState state) { + SnapshotsInProgress snapshotsInProgress = state.custom(SnapshotsInProgress.TYPE); + assert snapshotsInProgress != null; + snapshotsInProgress.entries().stream().filter( + entry -> entry.state().completed() || entry.state() == State.INIT || completed(entry.shards().values()) + ).forEach(entry -> endSnapshot(entry, state.metadata())); + } + private boolean assertConsistentWithClusterState(ClusterState state) { final SnapshotsInProgress snapshotsInProgress = state.custom(SnapshotsInProgress.TYPE); if (snapshotsInProgress != null && snapshotsInProgress.entries().isEmpty() == false) { @@ -672,6 +701,9 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus */ private void processSnapshotsOnRemovedNodes() { clusterService.submitStateUpdateTask("update snapshot state after node removal", new ClusterStateUpdateTask() { + + private boolean changed = false; + @Override public ClusterState execute(ClusterState currentState) { DiscoveryNodes nodes = currentState.nodes(); @@ -679,7 +711,6 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus if (snapshots == null) { return currentState; } - boolean changed = false; ArrayList entries = new ArrayList<>(); for (final SnapshotsInProgress.Entry snapshot : snapshots.entries()) { SnapshotsInProgress.Entry updatedSnapshot = snapshot; @@ -734,17 +765,26 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus public void onFailure(String source, Exception e) { logger.warn("failed to update snapshot state after node removal"); } + + @Override + public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + if (changed) { + endCompletedSnapshots(newState); + } + } }); } private void processStartedShards() { clusterService.submitStateUpdateTask("update snapshot state after shards started", new ClusterStateUpdateTask() { + + private boolean changed = false; + @Override public ClusterState execute(ClusterState currentState) { RoutingTable routingTable = currentState.routingTable(); SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE); if (snapshots != null) { - boolean changed = false; ArrayList entries = new ArrayList<>(); for (final SnapshotsInProgress.Entry snapshot : snapshots.entries()) { SnapshotsInProgress.Entry updatedSnapshot = snapshot; @@ -764,7 +804,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus } if (changed) { return ClusterState.builder(currentState) - .putCustom(SnapshotsInProgress.TYPE, SnapshotsInProgress.of(unmodifiableList(entries))).build(); + .putCustom(SnapshotsInProgress.TYPE, SnapshotsInProgress.of(entries)).build(); } } return currentState; @@ -775,6 +815,13 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus logger.warn(() -> new ParameterizedMessage("failed to update snapshot state after shards started from [{}] ", source), e); } + + @Override + public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + if (changed) { + endCompletedSnapshots(newState); + } + } }); } @@ -1559,7 +1606,8 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus @Override protected void doStart() { - + assert this.updateSnapshotStatusHandler != null; + assert transportService.getRequestHandler(UPDATE_SNAPSHOT_STATUS_ACTION_NAME) != null; } @Override @@ -1585,4 +1633,116 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus } return true; } + + private static class SnapshotStateExecutor implements ClusterStateTaskExecutor { + + @Override + public ClusterTasksResult + execute(ClusterState currentState, List tasks) { + final SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE); + if (snapshots != null) { + int changedCount = 0; + final List entries = new ArrayList<>(); + for (SnapshotsInProgress.Entry entry : snapshots.entries()) { + ImmutableOpenMap.Builder shards = ImmutableOpenMap.builder(); + boolean updated = false; + + for (UpdateIndexShardSnapshotStatusRequest updateSnapshotState : tasks) { + if (entry.snapshot().equals(updateSnapshotState.snapshot())) { + logger.trace("[{}] Updating shard [{}] with status [{}]", updateSnapshotState.snapshot(), + updateSnapshotState.shardId(), updateSnapshotState.status().state()); + if (updated == false) { + shards.putAll(entry.shards()); + updated = true; + } + shards.put(updateSnapshotState.shardId(), updateSnapshotState.status()); + changedCount++; + } + } + + if (updated) { + if (completed(shards.values()) == false) { + entries.add(new SnapshotsInProgress.Entry(entry, shards.build())); + } else { + // Snapshot is finished - mark it as done + // TODO: Add PARTIAL_SUCCESS status? + SnapshotsInProgress.Entry updatedEntry = new SnapshotsInProgress.Entry(entry, State.SUCCESS, shards.build()); + entries.add(updatedEntry); + } + } else { + entries.add(entry); + } + } + if (changedCount > 0) { + logger.trace("changed cluster state triggered by {} snapshot state updates", changedCount); + return ClusterTasksResult.builder().successes(tasks) + .build(ClusterState.builder(currentState).putCustom(SnapshotsInProgress.TYPE, + SnapshotsInProgress.of(entries)).build()); + } + } + return ClusterTasksResult.builder().successes(tasks).build(currentState); + } + } + + /** + * Updates the shard status on master node + * + * @param request update shard status request + */ + private void innerUpdateSnapshotState(final UpdateIndexShardSnapshotStatusRequest request, + ActionListener listener) { + logger.trace("received updated snapshot restore state [{}]", request); + clusterService.submitStateUpdateTask( + "update snapshot state", + request, + ClusterStateTaskConfig.build(Priority.NORMAL), + snapshotStateExecutor, + new ClusterStateTaskListener() { + @Override + public void onFailure(String source, Exception e) { + listener.onFailure(e); + } + + @Override + public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + try { + listener.onResponse(new UpdateIndexShardSnapshotStatusResponse()); + } finally { + endCompletedSnapshots(newState); + } + } + }); + } + + private class UpdateSnapshotStatusAction + extends TransportMasterNodeAction { + UpdateSnapshotStatusAction(TransportService transportService, ClusterService clusterService, + ThreadPool threadPool, ActionFilters actionFilters, + IndexNameExpressionResolver indexNameExpressionResolver) { + super(UPDATE_SNAPSHOT_STATUS_ACTION_NAME, false, transportService, clusterService, threadPool, + actionFilters, UpdateIndexShardSnapshotStatusRequest::new, indexNameExpressionResolver + ); + } + + @Override + protected String executor() { + return ThreadPool.Names.SAME; + } + + @Override + protected UpdateIndexShardSnapshotStatusResponse read(StreamInput in) throws IOException { + return new UpdateIndexShardSnapshotStatusResponse(in); + } + + @Override + protected void masterOperation(UpdateIndexShardSnapshotStatusRequest request, ClusterState state, + ActionListener listener) throws Exception { + innerUpdateSnapshotState(request, listener); + } + + @Override + protected ClusterBlockException checkBlock(UpdateIndexShardSnapshotStatusRequest request, ClusterState state) { + return null; + } + } } diff --git a/server/src/main/java/org/elasticsearch/snapshots/UpdateIndexShardSnapshotStatusRequest.java b/server/src/main/java/org/elasticsearch/snapshots/UpdateIndexShardSnapshotStatusRequest.java new file mode 100644 index 00000000000..1d20d2d109b --- /dev/null +++ b/server/src/main/java/org/elasticsearch/snapshots/UpdateIndexShardSnapshotStatusRequest.java @@ -0,0 +1,101 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.snapshots; + +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.support.master.MasterNodeRequest; +import org.elasticsearch.cluster.SnapshotsInProgress; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.index.shard.ShardId; + +import java.io.IOException; +import java.util.Objects; + +/** + * Internal request that is used to send changes in snapshot status to master + */ +public class UpdateIndexShardSnapshotStatusRequest extends MasterNodeRequest { + private final Snapshot snapshot; + private final ShardId shardId; + private final SnapshotsInProgress.ShardSnapshotStatus status; + + public UpdateIndexShardSnapshotStatusRequest(StreamInput in) throws IOException { + super(in); + snapshot = new Snapshot(in); + shardId = new ShardId(in); + status = new SnapshotsInProgress.ShardSnapshotStatus(in); + } + + public UpdateIndexShardSnapshotStatusRequest(Snapshot snapshot, ShardId shardId, SnapshotsInProgress.ShardSnapshotStatus status) { + this.snapshot = snapshot; + this.shardId = shardId; + this.status = status; + // By default, we keep trying to post snapshot status messages to avoid snapshot processes getting stuck. + this.masterNodeTimeout = TimeValue.timeValueNanos(Long.MAX_VALUE); + } + + @Override + public ActionRequestValidationException validate() { + return null; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + snapshot.writeTo(out); + shardId.writeTo(out); + status.writeTo(out); + } + + public Snapshot snapshot() { + return snapshot; + } + + public ShardId shardId() { + return shardId; + } + + public SnapshotsInProgress.ShardSnapshotStatus status() { + return status; + } + + @Override + public String toString() { + return snapshot + ", shardId [" + shardId + "], status [" + status.state() + "]"; + } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final UpdateIndexShardSnapshotStatusRequest that = (UpdateIndexShardSnapshotStatusRequest) o; + return snapshot.equals(that.snapshot) && shardId.equals(that.shardId) && status.equals(that.status); + } + + @Override + public int hashCode() { + return Objects.hash(snapshot, shardId, status); + } +} diff --git a/server/src/main/java/org/elasticsearch/snapshots/UpdateIndexShardSnapshotStatusResponse.java b/server/src/main/java/org/elasticsearch/snapshots/UpdateIndexShardSnapshotStatusResponse.java new file mode 100644 index 00000000000..6ee95f87498 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/snapshots/UpdateIndexShardSnapshotStatusResponse.java @@ -0,0 +1,37 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.snapshots; + +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; + +import java.io.IOException; + +class UpdateIndexShardSnapshotStatusResponse extends ActionResponse { + + UpdateIndexShardSnapshotStatusResponse() {} + + UpdateIndexShardSnapshotStatusResponse(StreamInput in) throws IOException { + super(in); + } + + @Override + public void writeTo(StreamOutput out) throws IOException {} +} diff --git a/server/src/main/java/org/elasticsearch/snapshots/package-info.java b/server/src/main/java/org/elasticsearch/snapshots/package-info.java index d4a31ad4135..c651b74e69d 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/package-info.java +++ b/server/src/main/java/org/elasticsearch/snapshots/package-info.java @@ -28,7 +28,7 @@ * {@link org.elasticsearch.cluster.SnapshotsInProgress}. All nodes consume the state of the {@code SnapshotsInProgress} and will start or * abort relevant shard snapshot tasks accordingly. *
  • Nodes that are executing shard snapshot tasks report either success or failure of their snapshot task by submitting a - * {@link org.elasticsearch.snapshots.SnapshotShardsService.UpdateIndexShardSnapshotStatusRequest} to the master node that will update the + * {@link org.elasticsearch.snapshots.UpdateIndexShardSnapshotStatusRequest} to the master node that will update the * snapshot's entry in the cluster state accordingly.
  • * * diff --git a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java index 5ca8fec8451..6c09a997cbd 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java @@ -1419,8 +1419,9 @@ public class SnapshotResiliencyTests extends ESTestCase { settings, clusterService, transportService, Collections.singletonMap(FsRepository.TYPE, getRepoFactory(environment)), emptyMap(), threadPool ); - snapshotsService = - new SnapshotsService(settings, clusterService, indexNameExpressionResolver, repositoriesService, threadPool); + final ActionFilters actionFilters = new ActionFilters(emptySet()); + snapshotsService = new SnapshotsService(settings, clusterService, indexNameExpressionResolver, repositoriesService, + transportService, actionFilters); nodeEnv = new NodeEnvironment(settings, environment); final NamedXContentRegistry namedXContentRegistry = new NamedXContentRegistry(Collections.emptyList()); final ScriptService scriptService = new ScriptService(settings, emptyMap(), emptyMap()); @@ -1453,10 +1454,8 @@ public class SnapshotResiliencyTests extends ESTestCase { null ); final RecoverySettings recoverySettings = new RecoverySettings(settings, clusterSettings); - final ActionFilters actionFilters = new ActionFilters(emptySet()); - snapshotShardsService = new SnapshotShardsService( - settings, clusterService, repositoriesService, threadPool, - transportService, indicesService, actionFilters, indexNameExpressionResolver); + snapshotShardsService = + new SnapshotShardsService(settings, clusterService, repositoriesService, transportService, indicesService); final ShardStateAction shardStateAction = new ShardStateAction( clusterService, transportService, allocationService, new BatchedRerouteService(clusterService, allocationService::reroute), diff --git a/server/src/test/java/org/elasticsearch/snapshots/SnapshotShardsServiceTests.java b/server/src/test/java/org/elasticsearch/snapshots/SnapshotShardsServiceTests.java index 5a513d16036..564ee3aecac 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/SnapshotShardsServiceTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/SnapshotShardsServiceTests.java @@ -44,18 +44,18 @@ public class SnapshotShardsServiceTests extends ESTestCase { public void testEqualsAndHashcodeUpdateIndexShardSnapshotStatusRequest() { EqualsHashCodeTestUtils.checkEqualsAndHashCode( - new SnapshotShardsService.UpdateIndexShardSnapshotStatusRequest( + new UpdateIndexShardSnapshotStatusRequest( new Snapshot(randomAlphaOfLength(10), new SnapshotId(randomAlphaOfLength(10), UUIDs.randomBase64UUID(random()))), new ShardId(randomAlphaOfLength(10), UUIDs.randomBase64UUID(random()), randomInt(5)), new SnapshotsInProgress.ShardSnapshotStatus(randomAlphaOfLength(10), UUIDs.randomBase64UUID(random()))), request -> - new SnapshotShardsService.UpdateIndexShardSnapshotStatusRequest(request.snapshot(), request.shardId(), request.status()), + new UpdateIndexShardSnapshotStatusRequest(request.snapshot(), request.shardId(), request.status()), request -> { final boolean mutateSnapshot = randomBoolean(); final boolean mutateShardId = randomBoolean(); final boolean mutateStatus = (mutateSnapshot || mutateShardId) == false || randomBoolean(); - return new SnapshotShardsService.UpdateIndexShardSnapshotStatusRequest( + return new UpdateIndexShardSnapshotStatusRequest( mutateSnapshot ? new Snapshot(randomAlphaOfLength(10), new SnapshotId(randomAlphaOfLength(10), UUIDs.randomBase64UUID(random()))) : request.snapshot(), mutateShardId ?