Move all Snapshot Master Node Steps to SnapshotsService (#56365) (#59373)

This refactoring has three motivations:

1. Separate all master node steps during snapshot operations from all data node steps in code.
2. Set up next steps in concurrent repository operations and general improvements by centralizing tracking of each shard's state in the repository in `SnapshotsService` so that operations for each shard can be linearized efficiently (i.e. without having to inspect the full snapshot state for all shards on every cluster state update, allowing us to track more in memory and only fall back to inspecting the full CS on master failover like we do in the snapshot shards service).
    * This PR already contains some best effort examples of this, but obviously this could be way improved upon still (just did not want to do it in this PR for complexity reasons)
3. Make the `SnapshotsService` less expensive on the CS thread for large snapshots
This commit is contained in:
Armin Braun 2020-07-12 22:19:07 +02:00 committed by GitHub
parent a6a27b76dc
commit 483386136d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 342 additions and 266 deletions

View File

@ -555,10 +555,9 @@ public class Node implements Closeable {
RepositoriesService repositoryService = repositoriesModule.getRepositoryService();
repositoriesServiceReference.set(repositoryService);
SnapshotsService snapshotsService = new SnapshotsService(settings, clusterService,
clusterModule.getIndexNameExpressionResolver(), repositoryService, threadPool);
clusterModule.getIndexNameExpressionResolver(), repositoryService, transportService, actionModule.getActionFilters());
SnapshotShardsService snapshotShardsService = new SnapshotShardsService(settings, clusterService, repositoryService,
threadPool, transportService, indicesService, actionModule.getActionFilters(),
clusterModule.getIndexNameExpressionResolver());
transportService, indicesService);
TransportNodesSnapshotsStatus nodesSnapshotsStatus = new TransportNodesSnapshotsStatus(threadPool, clusterService,
transportService, snapshotShardsService, actionModule.getActionFilters());
RestoreService restoreService = new RestoreService(clusterService, repositoryService, clusterModule.getAllocationService(),

View File

@ -26,33 +26,19 @@ import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.index.IndexCommit;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.MasterNodeRequest;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.ClusterStateTaskConfig;
import org.elasticsearch.cluster.ClusterStateTaskExecutor;
import org.elasticsearch.cluster.ClusterStateTaskListener;
import org.elasticsearch.cluster.SnapshotsInProgress;
import org.elasticsearch.cluster.SnapshotsInProgress.ShardSnapshotStatus;
import org.elasticsearch.cluster.SnapshotsInProgress.ShardState;
import org.elasticsearch.cluster.SnapshotsInProgress.State;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.seqno.SequenceNumbers;
@ -74,22 +60,18 @@ import org.elasticsearch.transport.TransportResponseHandler;
import org.elasticsearch.transport.TransportService;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;
import java.util.stream.Collectors;
import static java.util.Collections.emptyMap;
import static java.util.Collections.unmodifiableList;
import static org.elasticsearch.cluster.SnapshotsInProgress.completed;
/**
* This service runs on data and master nodes and controls currently snapshotted shards on these nodes. It is responsible for
* starting and stopping shard level snapshots
* This service runs on data nodes and controls currently running shard snapshots on these nodes. It is responsible for
* starting and stopping shard level snapshots.
* See package level documentation of {@link org.elasticsearch.snapshots} for details.
*/
public class SnapshotShardsService extends AbstractLifecycleComponent implements ClusterStateListener, IndexEventListener {
private static final Logger logger = LogManager.getLogger(SnapshotShardsService.class);
@ -110,31 +92,21 @@ public class SnapshotShardsService extends AbstractLifecycleComponent implements
private final TransportRequestDeduplicator<UpdateIndexShardSnapshotStatusRequest> remoteFailedRequestDeduplicator =
new TransportRequestDeduplicator<>();
private final SnapshotStateExecutor snapshotStateExecutor = new SnapshotStateExecutor();
private final UpdateSnapshotStatusAction updateSnapshotStatusHandler;
public SnapshotShardsService(Settings settings, ClusterService clusterService, RepositoriesService repositoriesService,
ThreadPool threadPool, TransportService transportService, IndicesService indicesService,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) {
TransportService transportService, IndicesService indicesService) {
this.indicesService = indicesService;
this.repositoriesService = repositoriesService;
this.transportService = transportService;
this.clusterService = clusterService;
this.threadPool = threadPool;
this.threadPool = transportService.getThreadPool();
if (DiscoveryNode.isDataNode(settings)) {
// this is only useful on the nodes that can hold data
clusterService.addListener(this);
}
// The constructor of UpdateSnapshotStatusAction will register itself to the TransportService.
this.updateSnapshotStatusHandler =
new UpdateSnapshotStatusAction(transportService, clusterService, threadPool, actionFilters, indexNameExpressionResolver);
}
@Override
protected void doStart() {
assert this.updateSnapshotStatusHandler != null;
assert transportService.getRequestHandler(SnapshotsService.UPDATE_SNAPSHOT_STATUS_ACTION_NAME) != null;
}
@Override
@ -444,77 +416,6 @@ public class SnapshotShardsService extends AbstractLifecycleComponent implements
}
}
/**
* Internal request that is used to send changes in snapshot status to master
*/
public static class UpdateIndexShardSnapshotStatusRequest extends MasterNodeRequest<UpdateIndexShardSnapshotStatusRequest> {
private final Snapshot snapshot;
private final ShardId shardId;
private final ShardSnapshotStatus status;
public UpdateIndexShardSnapshotStatusRequest(StreamInput in) throws IOException {
super(in);
snapshot = new Snapshot(in);
shardId = new ShardId(in);
status = new ShardSnapshotStatus(in);
}
public UpdateIndexShardSnapshotStatusRequest(Snapshot snapshot, ShardId shardId, ShardSnapshotStatus status) {
this.snapshot = snapshot;
this.shardId = shardId;
this.status = status;
// By default, we keep trying to post snapshot status messages to avoid snapshot processes getting stuck.
this.masterNodeTimeout = TimeValue.timeValueNanos(Long.MAX_VALUE);
}
@Override
public ActionRequestValidationException validate() {
return null;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
snapshot.writeTo(out);
shardId.writeTo(out);
status.writeTo(out);
}
public Snapshot snapshot() {
return snapshot;
}
public ShardId shardId() {
return shardId;
}
public ShardSnapshotStatus status() {
return status;
}
@Override
public String toString() {
return snapshot + ", shardId [" + shardId + "], status [" + status.state() + "]";
}
@Override
public boolean equals(final Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
final UpdateIndexShardSnapshotStatusRequest that = (UpdateIndexShardSnapshotStatusRequest) o;
return snapshot.equals(that.snapshot) && shardId.equals(that.shardId) && status.equals(that.status);
}
@Override
public int hashCode() {
return Objects.hash(snapshot, shardId, status);
}
}
/** Notify the master node that the given shard has been successfully snapshotted **/
private void notifySuccessfulSnapshotShard(final Snapshot snapshot, final ShardId shardId, String generation) {
assert generation != null;
@ -569,125 +470,4 @@ public class SnapshotShardsService extends AbstractLifecycleComponent implements
})
);
}
/**
* Updates the shard status on master node
*
* @param request update shard status request
*/
private void innerUpdateSnapshotState(final UpdateIndexShardSnapshotStatusRequest request,
ActionListener<UpdateIndexShardSnapshotStatusResponse> listener) {
logger.trace("received updated snapshot restore state [{}]", request);
clusterService.submitStateUpdateTask(
"update snapshot state",
request,
ClusterStateTaskConfig.build(Priority.NORMAL),
snapshotStateExecutor,
new ClusterStateTaskListener() {
@Override
public void onFailure(String source, Exception e) {
listener.onFailure(e);
}
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
listener.onResponse(new UpdateIndexShardSnapshotStatusResponse());
}
});
}
private static class SnapshotStateExecutor implements ClusterStateTaskExecutor<UpdateIndexShardSnapshotStatusRequest> {
@Override
public ClusterTasksResult<UpdateIndexShardSnapshotStatusRequest>
execute(ClusterState currentState, List<UpdateIndexShardSnapshotStatusRequest> tasks) {
final SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE);
if (snapshots != null) {
int changedCount = 0;
final List<SnapshotsInProgress.Entry> entries = new ArrayList<>();
for (SnapshotsInProgress.Entry entry : snapshots.entries()) {
ImmutableOpenMap.Builder<ShardId, ShardSnapshotStatus> shards = ImmutableOpenMap.builder();
boolean updated = false;
for (UpdateIndexShardSnapshotStatusRequest updateSnapshotState : tasks) {
if (entry.snapshot().equals(updateSnapshotState.snapshot())) {
logger.trace("[{}] Updating shard [{}] with status [{}]", updateSnapshotState.snapshot(),
updateSnapshotState.shardId(), updateSnapshotState.status().state());
if (updated == false) {
shards.putAll(entry.shards());
updated = true;
}
shards.put(updateSnapshotState.shardId(), updateSnapshotState.status());
changedCount++;
}
}
if (updated) {
if (completed(shards.values()) == false) {
entries.add(new SnapshotsInProgress.Entry(entry, shards.build()));
} else {
// Snapshot is finished - mark it as done
// TODO: Add PARTIAL_SUCCESS status?
SnapshotsInProgress.Entry updatedEntry = new SnapshotsInProgress.Entry(entry, State.SUCCESS, shards.build());
entries.add(updatedEntry);
}
} else {
entries.add(entry);
}
}
if (changedCount > 0) {
logger.trace("changed cluster state triggered by {} snapshot state updates", changedCount);
return ClusterTasksResult.<UpdateIndexShardSnapshotStatusRequest>builder().successes(tasks)
.build(ClusterState.builder(currentState).putCustom(SnapshotsInProgress.TYPE,
SnapshotsInProgress.of(unmodifiableList(entries))).build());
}
}
return ClusterTasksResult.<UpdateIndexShardSnapshotStatusRequest>builder().successes(tasks).build(currentState);
}
}
static class UpdateIndexShardSnapshotStatusResponse extends ActionResponse {
UpdateIndexShardSnapshotStatusResponse() {}
UpdateIndexShardSnapshotStatusResponse(StreamInput in) throws IOException {
super(in);
}
@Override
public void writeTo(StreamOutput out) throws IOException {}
}
private class UpdateSnapshotStatusAction
extends TransportMasterNodeAction<UpdateIndexShardSnapshotStatusRequest, UpdateIndexShardSnapshotStatusResponse> {
UpdateSnapshotStatusAction(TransportService transportService, ClusterService clusterService,
ThreadPool threadPool, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) {
super(
SnapshotsService.UPDATE_SNAPSHOT_STATUS_ACTION_NAME, false, transportService, clusterService, threadPool,
actionFilters, UpdateIndexShardSnapshotStatusRequest::new, indexNameExpressionResolver
);
}
@Override
protected String executor() {
return ThreadPool.Names.SAME;
}
@Override
protected UpdateIndexShardSnapshotStatusResponse read(StreamInput in) throws IOException {
return new UpdateIndexShardSnapshotStatusResponse(in);
}
@Override
protected void masterOperation(UpdateIndexShardSnapshotStatusRequest request, ClusterState state,
ActionListener<UpdateIndexShardSnapshotStatusResponse> listener) {
innerUpdateSnapshotState(request, listener);
}
@Override
protected ClusterBlockException checkBlock(UpdateIndexShardSnapshotStatusRequest request, ClusterState state) {
return null;
}
}
}

View File

@ -31,9 +31,14 @@ import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.action.StepListener;
import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotRequest;
import org.elasticsearch.action.admin.cluster.snapshots.delete.DeleteSnapshotRequest;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateApplier;
import org.elasticsearch.cluster.ClusterStateTaskConfig;
import org.elasticsearch.cluster.ClusterStateTaskExecutor;
import org.elasticsearch.cluster.ClusterStateTaskListener;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.NotMasterException;
import org.elasticsearch.cluster.RepositoryCleanupInProgress;
@ -43,6 +48,7 @@ import org.elasticsearch.cluster.SnapshotsInProgress;
import org.elasticsearch.cluster.SnapshotsInProgress.ShardSnapshotStatus;
import org.elasticsearch.cluster.SnapshotsInProgress.ShardState;
import org.elasticsearch.cluster.SnapshotsInProgress.State;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.coordination.FailedToCommitClusterStateException;
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.cluster.metadata.IndexMetadata;
@ -63,6 +69,7 @@ import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
@ -77,7 +84,9 @@ import org.elasticsearch.repositories.RepositoryException;
import org.elasticsearch.repositories.RepositoryMissingException;
import org.elasticsearch.repositories.ShardGenerations;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@ -100,8 +109,9 @@ import static java.util.Collections.unmodifiableList;
import static org.elasticsearch.cluster.SnapshotsInProgress.completed;
/**
* Service responsible for creating snapshots. See package level documentation of {@link org.elasticsearch.snapshots}
* for details.
* Service responsible for creating snapshots. This service runs all the steps executed on the master node during snapshot creation and
* deletion.
* See package level documentation of {@link org.elasticsearch.snapshots} for details.
*/
public class SnapshotsService extends AbstractLifecycleComponent implements ClusterStateApplier {
@ -138,13 +148,22 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
// Set of snapshots that are currently being ended by this node
private final Set<Snapshot> endingSnapshots = Collections.synchronizedSet(new HashSet<>());
private final SnapshotStateExecutor snapshotStateExecutor = new SnapshotStateExecutor();
private final UpdateSnapshotStatusAction updateSnapshotStatusHandler;
private final TransportService transportService;
public SnapshotsService(Settings settings, ClusterService clusterService, IndexNameExpressionResolver indexNameExpressionResolver,
RepositoriesService repositoriesService, ThreadPool threadPool) {
RepositoriesService repositoriesService, TransportService transportService, ActionFilters actionFilters) {
this.clusterService = clusterService;
this.indexNameExpressionResolver = indexNameExpressionResolver;
this.repositoriesService = repositoriesService;
this.threadPool = threadPool;
this.threadPool = transportService.getThreadPool();
this.transportService = transportService;
// The constructor of UpdateSnapshotStatusAction will register itself to the TransportService.
this.updateSnapshotStatusHandler = new UpdateSnapshotStatusAction(
transportService, clusterService, threadPool, actionFilters, indexNameExpressionResolver);
if (DiscoveryNode.isMasterNode(settings)) {
// addLowPriorityApplier to make sure that Repository will be created before snapshot
clusterService.addLowPriorityApplier(this);
@ -181,10 +200,10 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
final Map<String, Object> userMeta = repository.adaptUserMetadata(request.userMetadata());
clusterService.submitStateUpdateTask("create_snapshot [" + snapshotName + ']', new ClusterStateUpdateTask() {
private SnapshotsInProgress.Entry newSnapshot = null;
private List<String> indices;
private SnapshotsInProgress.Entry newEntry;
@Override
public ClusterState execute(ClusterState currentState) {
validate(repositoryName, snapshotName, currentState);
@ -213,7 +232,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
indexNameExpressionResolver.dataStreamNames(currentState, request.indicesOptions(), request.indices());
logger.trace("[{}][{}] creating snapshot for indices [{}]", repositoryName, snapshotName, indices);
newSnapshot = new SnapshotsInProgress.Entry(
newEntry = new SnapshotsInProgress.Entry(
new Snapshot(repositoryName, snapshotId),
request.includeGlobalState(), request.partial(),
State.INIT,
@ -224,28 +243,28 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
null,
userMeta, Version.CURRENT
);
initializingSnapshots.add(newSnapshot.snapshot());
snapshots = SnapshotsInProgress.of(Collections.singletonList(newSnapshot));
initializingSnapshots.add(newEntry.snapshot());
snapshots = SnapshotsInProgress.of(Collections.singletonList(newEntry));
return ClusterState.builder(currentState).putCustom(SnapshotsInProgress.TYPE, snapshots).build();
}
@Override
public void onFailure(String source, Exception e) {
logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to create snapshot", repositoryName, snapshotName), e);
if (newSnapshot != null) {
initializingSnapshots.remove(newSnapshot.snapshot());
if (newEntry != null) {
initializingSnapshots.remove(newEntry.snapshot());
}
newSnapshot = null;
newEntry = null;
listener.onFailure(e);
}
@Override
public void clusterStateProcessed(String source, ClusterState oldState, final ClusterState newState) {
if (newSnapshot != null) {
final Snapshot current = newSnapshot.snapshot();
if (newEntry != null) {
final Snapshot current = newEntry.snapshot();
assert initializingSnapshots.contains(current);
assert indices != null;
beginSnapshot(newState, newSnapshot, request.partial(), indices, repository, new ActionListener<Snapshot>() {
beginSnapshot(newState, newEntry, request.partial(), indices, repository, new ActionListener<Snapshot>() {
@Override
public void onResponse(final Snapshot snapshot) {
initializingSnapshots.remove(snapshot);
@ -447,6 +466,8 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
final SnapshotsInProgress.Entry entry = snapshotsInProgress.snapshot(snapshot.snapshot());
assert entry != null;
endSnapshot(entry, newState.metadata());
} else {
endCompletedSnapshots(newState);
}
}
});
@ -600,15 +621,9 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
if (event.routingTableChanged() && waitingShardsStartedOrUnassigned(snapshotsInProgress, event)) {
processStartedShards();
}
// Cleanup all snapshots that have no more work left:
// 1. Completed snapshots
// 2. Snapshots in state INIT that the previous master failed to start
// 3. Snapshots in any other state that have all their shard tasks completed
snapshotsInProgress.entries().stream().filter(
entry -> entry.state().completed()
|| initializingSnapshots.contains(entry.snapshot()) == false
&& (entry.state() == State.INIT || completed(entry.shards().values()))
).forEach(entry -> endSnapshot(entry, event.state().metadata()));
if (newMaster) {
endCompletedSnapshots(event.state());
}
}
if (newMaster) {
finalizeSnapshotDeletionFromPreviousMaster(event.state());
@ -630,6 +645,20 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
assert assertConsistentWithClusterState(event.state());
}
/**
* Cleanup all snapshots found in the given cluster state that have no more work left:
* 1. Completed snapshots
* 2. Snapshots in state INIT that a previous master of an older version failed to start
* 3. Snapshots in any other state that have all their shard tasks completed
*/
private void endCompletedSnapshots(ClusterState state) {
SnapshotsInProgress snapshotsInProgress = state.custom(SnapshotsInProgress.TYPE);
assert snapshotsInProgress != null;
snapshotsInProgress.entries().stream().filter(
entry -> entry.state().completed() || entry.state() == State.INIT || completed(entry.shards().values())
).forEach(entry -> endSnapshot(entry, state.metadata()));
}
private boolean assertConsistentWithClusterState(ClusterState state) {
final SnapshotsInProgress snapshotsInProgress = state.custom(SnapshotsInProgress.TYPE);
if (snapshotsInProgress != null && snapshotsInProgress.entries().isEmpty() == false) {
@ -672,6 +701,9 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
*/
private void processSnapshotsOnRemovedNodes() {
clusterService.submitStateUpdateTask("update snapshot state after node removal", new ClusterStateUpdateTask() {
private boolean changed = false;
@Override
public ClusterState execute(ClusterState currentState) {
DiscoveryNodes nodes = currentState.nodes();
@ -679,7 +711,6 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
if (snapshots == null) {
return currentState;
}
boolean changed = false;
ArrayList<SnapshotsInProgress.Entry> entries = new ArrayList<>();
for (final SnapshotsInProgress.Entry snapshot : snapshots.entries()) {
SnapshotsInProgress.Entry updatedSnapshot = snapshot;
@ -734,17 +765,26 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
public void onFailure(String source, Exception e) {
logger.warn("failed to update snapshot state after node removal");
}
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
if (changed) {
endCompletedSnapshots(newState);
}
}
});
}
private void processStartedShards() {
clusterService.submitStateUpdateTask("update snapshot state after shards started", new ClusterStateUpdateTask() {
private boolean changed = false;
@Override
public ClusterState execute(ClusterState currentState) {
RoutingTable routingTable = currentState.routingTable();
SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE);
if (snapshots != null) {
boolean changed = false;
ArrayList<SnapshotsInProgress.Entry> entries = new ArrayList<>();
for (final SnapshotsInProgress.Entry snapshot : snapshots.entries()) {
SnapshotsInProgress.Entry updatedSnapshot = snapshot;
@ -764,7 +804,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
}
if (changed) {
return ClusterState.builder(currentState)
.putCustom(SnapshotsInProgress.TYPE, SnapshotsInProgress.of(unmodifiableList(entries))).build();
.putCustom(SnapshotsInProgress.TYPE, SnapshotsInProgress.of(entries)).build();
}
}
return currentState;
@ -775,6 +815,13 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
logger.warn(() ->
new ParameterizedMessage("failed to update snapshot state after shards started from [{}] ", source), e);
}
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
if (changed) {
endCompletedSnapshots(newState);
}
}
});
}
@ -1559,7 +1606,8 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
@Override
protected void doStart() {
assert this.updateSnapshotStatusHandler != null;
assert transportService.getRequestHandler(UPDATE_SNAPSHOT_STATUS_ACTION_NAME) != null;
}
@Override
@ -1585,4 +1633,116 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
}
return true;
}
private static class SnapshotStateExecutor implements ClusterStateTaskExecutor<UpdateIndexShardSnapshotStatusRequest> {
@Override
public ClusterTasksResult<UpdateIndexShardSnapshotStatusRequest>
execute(ClusterState currentState, List<UpdateIndexShardSnapshotStatusRequest> tasks) {
final SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE);
if (snapshots != null) {
int changedCount = 0;
final List<SnapshotsInProgress.Entry> entries = new ArrayList<>();
for (SnapshotsInProgress.Entry entry : snapshots.entries()) {
ImmutableOpenMap.Builder<ShardId, ShardSnapshotStatus> shards = ImmutableOpenMap.builder();
boolean updated = false;
for (UpdateIndexShardSnapshotStatusRequest updateSnapshotState : tasks) {
if (entry.snapshot().equals(updateSnapshotState.snapshot())) {
logger.trace("[{}] Updating shard [{}] with status [{}]", updateSnapshotState.snapshot(),
updateSnapshotState.shardId(), updateSnapshotState.status().state());
if (updated == false) {
shards.putAll(entry.shards());
updated = true;
}
shards.put(updateSnapshotState.shardId(), updateSnapshotState.status());
changedCount++;
}
}
if (updated) {
if (completed(shards.values()) == false) {
entries.add(new SnapshotsInProgress.Entry(entry, shards.build()));
} else {
// Snapshot is finished - mark it as done
// TODO: Add PARTIAL_SUCCESS status?
SnapshotsInProgress.Entry updatedEntry = new SnapshotsInProgress.Entry(entry, State.SUCCESS, shards.build());
entries.add(updatedEntry);
}
} else {
entries.add(entry);
}
}
if (changedCount > 0) {
logger.trace("changed cluster state triggered by {} snapshot state updates", changedCount);
return ClusterTasksResult.<UpdateIndexShardSnapshotStatusRequest>builder().successes(tasks)
.build(ClusterState.builder(currentState).putCustom(SnapshotsInProgress.TYPE,
SnapshotsInProgress.of(entries)).build());
}
}
return ClusterTasksResult.<UpdateIndexShardSnapshotStatusRequest>builder().successes(tasks).build(currentState);
}
}
/**
* Updates the shard status on master node
*
* @param request update shard status request
*/
private void innerUpdateSnapshotState(final UpdateIndexShardSnapshotStatusRequest request,
ActionListener<UpdateIndexShardSnapshotStatusResponse> listener) {
logger.trace("received updated snapshot restore state [{}]", request);
clusterService.submitStateUpdateTask(
"update snapshot state",
request,
ClusterStateTaskConfig.build(Priority.NORMAL),
snapshotStateExecutor,
new ClusterStateTaskListener() {
@Override
public void onFailure(String source, Exception e) {
listener.onFailure(e);
}
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
try {
listener.onResponse(new UpdateIndexShardSnapshotStatusResponse());
} finally {
endCompletedSnapshots(newState);
}
}
});
}
private class UpdateSnapshotStatusAction
extends TransportMasterNodeAction<UpdateIndexShardSnapshotStatusRequest, UpdateIndexShardSnapshotStatusResponse> {
UpdateSnapshotStatusAction(TransportService transportService, ClusterService clusterService,
ThreadPool threadPool, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver) {
super(UPDATE_SNAPSHOT_STATUS_ACTION_NAME, false, transportService, clusterService, threadPool,
actionFilters, UpdateIndexShardSnapshotStatusRequest::new, indexNameExpressionResolver
);
}
@Override
protected String executor() {
return ThreadPool.Names.SAME;
}
@Override
protected UpdateIndexShardSnapshotStatusResponse read(StreamInput in) throws IOException {
return new UpdateIndexShardSnapshotStatusResponse(in);
}
@Override
protected void masterOperation(UpdateIndexShardSnapshotStatusRequest request, ClusterState state,
ActionListener<UpdateIndexShardSnapshotStatusResponse> listener) throws Exception {
innerUpdateSnapshotState(request, listener);
}
@Override
protected ClusterBlockException checkBlock(UpdateIndexShardSnapshotStatusRequest request, ClusterState state) {
return null;
}
}
}

View File

@ -0,0 +1,101 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.snapshots;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.support.master.MasterNodeRequest;
import org.elasticsearch.cluster.SnapshotsInProgress;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.shard.ShardId;
import java.io.IOException;
import java.util.Objects;
/**
* Internal request that is used to send changes in snapshot status to master
*/
public class UpdateIndexShardSnapshotStatusRequest extends MasterNodeRequest<UpdateIndexShardSnapshotStatusRequest> {
private final Snapshot snapshot;
private final ShardId shardId;
private final SnapshotsInProgress.ShardSnapshotStatus status;
public UpdateIndexShardSnapshotStatusRequest(StreamInput in) throws IOException {
super(in);
snapshot = new Snapshot(in);
shardId = new ShardId(in);
status = new SnapshotsInProgress.ShardSnapshotStatus(in);
}
public UpdateIndexShardSnapshotStatusRequest(Snapshot snapshot, ShardId shardId, SnapshotsInProgress.ShardSnapshotStatus status) {
this.snapshot = snapshot;
this.shardId = shardId;
this.status = status;
// By default, we keep trying to post snapshot status messages to avoid snapshot processes getting stuck.
this.masterNodeTimeout = TimeValue.timeValueNanos(Long.MAX_VALUE);
}
@Override
public ActionRequestValidationException validate() {
return null;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
snapshot.writeTo(out);
shardId.writeTo(out);
status.writeTo(out);
}
public Snapshot snapshot() {
return snapshot;
}
public ShardId shardId() {
return shardId;
}
public SnapshotsInProgress.ShardSnapshotStatus status() {
return status;
}
@Override
public String toString() {
return snapshot + ", shardId [" + shardId + "], status [" + status.state() + "]";
}
@Override
public boolean equals(final Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
final UpdateIndexShardSnapshotStatusRequest that = (UpdateIndexShardSnapshotStatusRequest) o;
return snapshot.equals(that.snapshot) && shardId.equals(that.shardId) && status.equals(that.status);
}
@Override
public int hashCode() {
return Objects.hash(snapshot, shardId, status);
}
}

View File

@ -0,0 +1,37 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.snapshots;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import java.io.IOException;
class UpdateIndexShardSnapshotStatusResponse extends ActionResponse {
UpdateIndexShardSnapshotStatusResponse() {}
UpdateIndexShardSnapshotStatusResponse(StreamInput in) throws IOException {
super(in);
}
@Override
public void writeTo(StreamOutput out) throws IOException {}
}

View File

@ -28,7 +28,7 @@
* {@link org.elasticsearch.cluster.SnapshotsInProgress}. All nodes consume the state of the {@code SnapshotsInProgress} and will start or
* abort relevant shard snapshot tasks accordingly.</li>
* <li>Nodes that are executing shard snapshot tasks report either success or failure of their snapshot task by submitting a
* {@link org.elasticsearch.snapshots.SnapshotShardsService.UpdateIndexShardSnapshotStatusRequest} to the master node that will update the
* {@link org.elasticsearch.snapshots.UpdateIndexShardSnapshotStatusRequest} to the master node that will update the
* snapshot's entry in the cluster state accordingly.</li>
* </ul>
*

View File

@ -1419,8 +1419,9 @@ public class SnapshotResiliencyTests extends ESTestCase {
settings, clusterService, transportService,
Collections.singletonMap(FsRepository.TYPE, getRepoFactory(environment)), emptyMap(), threadPool
);
snapshotsService =
new SnapshotsService(settings, clusterService, indexNameExpressionResolver, repositoriesService, threadPool);
final ActionFilters actionFilters = new ActionFilters(emptySet());
snapshotsService = new SnapshotsService(settings, clusterService, indexNameExpressionResolver, repositoriesService,
transportService, actionFilters);
nodeEnv = new NodeEnvironment(settings, environment);
final NamedXContentRegistry namedXContentRegistry = new NamedXContentRegistry(Collections.emptyList());
final ScriptService scriptService = new ScriptService(settings, emptyMap(), emptyMap());
@ -1453,10 +1454,8 @@ public class SnapshotResiliencyTests extends ESTestCase {
null
);
final RecoverySettings recoverySettings = new RecoverySettings(settings, clusterSettings);
final ActionFilters actionFilters = new ActionFilters(emptySet());
snapshotShardsService = new SnapshotShardsService(
settings, clusterService, repositoriesService, threadPool,
transportService, indicesService, actionFilters, indexNameExpressionResolver);
snapshotShardsService =
new SnapshotShardsService(settings, clusterService, repositoriesService, transportService, indicesService);
final ShardStateAction shardStateAction = new ShardStateAction(
clusterService, transportService, allocationService,
new BatchedRerouteService(clusterService, allocationService::reroute),

View File

@ -44,18 +44,18 @@ public class SnapshotShardsServiceTests extends ESTestCase {
public void testEqualsAndHashcodeUpdateIndexShardSnapshotStatusRequest() {
EqualsHashCodeTestUtils.checkEqualsAndHashCode(
new SnapshotShardsService.UpdateIndexShardSnapshotStatusRequest(
new UpdateIndexShardSnapshotStatusRequest(
new Snapshot(randomAlphaOfLength(10),
new SnapshotId(randomAlphaOfLength(10), UUIDs.randomBase64UUID(random()))),
new ShardId(randomAlphaOfLength(10), UUIDs.randomBase64UUID(random()), randomInt(5)),
new SnapshotsInProgress.ShardSnapshotStatus(randomAlphaOfLength(10), UUIDs.randomBase64UUID(random()))),
request ->
new SnapshotShardsService.UpdateIndexShardSnapshotStatusRequest(request.snapshot(), request.shardId(), request.status()),
new UpdateIndexShardSnapshotStatusRequest(request.snapshot(), request.shardId(), request.status()),
request -> {
final boolean mutateSnapshot = randomBoolean();
final boolean mutateShardId = randomBoolean();
final boolean mutateStatus = (mutateSnapshot || mutateShardId) == false || randomBoolean();
return new SnapshotShardsService.UpdateIndexShardSnapshotStatusRequest(
return new UpdateIndexShardSnapshotStatusRequest(
mutateSnapshot ? new Snapshot(randomAlphaOfLength(10),
new SnapshotId(randomAlphaOfLength(10), UUIDs.randomBase64UUID(random()))) : request.snapshot(),
mutateShardId ?