Small refactoring to shorten the diff with the clone logic in #61839: * Since clones will create a different kind of shard state update that isn't the same request sent by the snapshot shards service (and cannot be the same request because we have no `ShardId`) base the shard state updates on a different class that can be extended to be general enough to accomodate shard clones as well. * Make the update executor a singleton (can't make it an inline lambda as that would break CS update batching because the executor is used as a map key but this change still makes it crystal clear that there's no internal state to the executor) * Make shard state update responses a singleton (can't use TransportResponse.Empty because we need an action response but still it makes it clear that there's no actual response with content here)
This commit is contained in:
parent
de6eeecbd3
commit
51d0ed1bf3
|
@ -450,8 +450,8 @@ public class SnapshotShardsService extends AbstractLifecycleComponent implements
|
|||
SnapshotsService.UPDATE_SNAPSHOT_STATUS_ACTION_NAME, req,
|
||||
new TransportResponseHandler<UpdateIndexShardSnapshotStatusResponse>() {
|
||||
@Override
|
||||
public UpdateIndexShardSnapshotStatusResponse read(StreamInput in) throws IOException {
|
||||
return new UpdateIndexShardSnapshotStatusResponse(in);
|
||||
public UpdateIndexShardSnapshotStatusResponse read(StreamInput in) {
|
||||
return UpdateIndexShardSnapshotStatusResponse.INSTANCE;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -100,6 +100,7 @@ import java.util.LinkedList;
|
|||
import java.util.List;
|
||||
import java.util.Locale;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
@ -165,7 +166,6 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|||
// Set of snapshots that are currently being ended by this node
|
||||
private final Set<Snapshot> endingSnapshots = Collections.synchronizedSet(new HashSet<>());
|
||||
|
||||
private final SnapshotStateExecutor snapshotStateExecutor = new SnapshotStateExecutor();
|
||||
private final UpdateSnapshotStatusAction updateSnapshotStatusHandler;
|
||||
|
||||
private final TransportService transportService;
|
||||
|
@ -2452,101 +2452,130 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|||
return true;
|
||||
}
|
||||
|
||||
private static class SnapshotStateExecutor implements ClusterStateTaskExecutor<UpdateIndexShardSnapshotStatusRequest> {
|
||||
|
||||
@Override
|
||||
public ClusterTasksResult<UpdateIndexShardSnapshotStatusRequest>
|
||||
execute(ClusterState currentState, List<UpdateIndexShardSnapshotStatusRequest> tasks) {
|
||||
int changedCount = 0;
|
||||
int startedCount = 0;
|
||||
final List<SnapshotsInProgress.Entry> entries = new ArrayList<>();
|
||||
// Tasks to check for updates for running snapshots.
|
||||
final List<UpdateIndexShardSnapshotStatusRequest> unconsumedTasks = new ArrayList<>(tasks);
|
||||
// Tasks that were used to complete an existing in-progress shard snapshot
|
||||
final Set<UpdateIndexShardSnapshotStatusRequest> executedTasks = new HashSet<>();
|
||||
for (SnapshotsInProgress.Entry entry : currentState.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY).entries()) {
|
||||
if (entry.state().completed()) {
|
||||
entries.add(entry);
|
||||
private static final ClusterStateTaskExecutor<ShardSnapshotUpdate> SHARD_STATE_EXECUTOR = (currentState, tasks) -> {
|
||||
int changedCount = 0;
|
||||
int startedCount = 0;
|
||||
final List<SnapshotsInProgress.Entry> entries = new ArrayList<>();
|
||||
// Tasks to check for updates for running snapshots.
|
||||
final List<ShardSnapshotUpdate> unconsumedTasks = new ArrayList<>(tasks);
|
||||
// Tasks that were used to complete an existing in-progress shard snapshot
|
||||
final Set<ShardSnapshotUpdate> executedTasks = new HashSet<>();
|
||||
for (SnapshotsInProgress.Entry entry : currentState.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY).entries()) {
|
||||
if (entry.state().completed()) {
|
||||
entries.add(entry);
|
||||
continue;
|
||||
}
|
||||
ImmutableOpenMap.Builder<ShardId, ShardSnapshotStatus> shards = null;
|
||||
for (Iterator<ShardSnapshotUpdate> iterator = unconsumedTasks.iterator(); iterator.hasNext(); ) {
|
||||
final ShardSnapshotUpdate updateSnapshotState = iterator.next();
|
||||
final Snapshot updatedSnapshot = updateSnapshotState.snapshot;
|
||||
final String updatedRepository = updatedSnapshot.getRepository();
|
||||
if (entry.repository().equals(updatedRepository) == false) {
|
||||
continue;
|
||||
}
|
||||
ImmutableOpenMap.Builder<ShardId, ShardSnapshotStatus> shards = null;
|
||||
for (Iterator<UpdateIndexShardSnapshotStatusRequest> iterator = unconsumedTasks.iterator(); iterator.hasNext(); ) {
|
||||
final UpdateIndexShardSnapshotStatusRequest updateSnapshotState = iterator.next();
|
||||
final Snapshot updatedSnapshot = updateSnapshotState.snapshot();
|
||||
final String updatedRepository = updatedSnapshot.getRepository();
|
||||
if (entry.repository().equals(updatedRepository) == false) {
|
||||
final ShardId finishedShardId = updateSnapshotState.shardId;
|
||||
if (entry.snapshot().getSnapshotId().equals(updatedSnapshot.getSnapshotId())) {
|
||||
final ShardSnapshotStatus existing = entry.shards().get(finishedShardId);
|
||||
if (existing == null) {
|
||||
logger.warn("Received shard snapshot status update [{}] but this shard is not tracked in [{}]",
|
||||
updateSnapshotState, entry);
|
||||
assert false : "This should never happen, data nodes should only send updates for expected shards";
|
||||
continue;
|
||||
}
|
||||
final ShardId finishedShardId = updateSnapshotState.shardId();
|
||||
if (entry.snapshot().getSnapshotId().equals(updatedSnapshot.getSnapshotId())) {
|
||||
final ShardSnapshotStatus existing = entry.shards().get(finishedShardId);
|
||||
if (existing == null) {
|
||||
logger.warn("Received shard snapshot status update [{}] but this shard is not tracked in [{}]",
|
||||
updateSnapshotState, entry);
|
||||
assert false : "This should never happen, data nodes should only send updates for expected shards";
|
||||
continue;
|
||||
}
|
||||
if (existing.state().completed()) {
|
||||
// No point in doing noop updates that might happen if data nodes resends shard status after a disconnect.
|
||||
iterator.remove();
|
||||
continue;
|
||||
}
|
||||
logger.trace("[{}] Updating shard [{}] with status [{}]", updatedSnapshot,
|
||||
finishedShardId, updateSnapshotState.status().state());
|
||||
if (shards == null) {
|
||||
shards = ImmutableOpenMap.builder(entry.shards());
|
||||
}
|
||||
shards.put(finishedShardId, updateSnapshotState.status());
|
||||
executedTasks.add(updateSnapshotState);
|
||||
changedCount++;
|
||||
} else if (executedTasks.contains(updateSnapshotState)) {
|
||||
// tasks that completed a shard might allow starting a new shard snapshot for the current snapshot
|
||||
final ShardSnapshotStatus existingStatus = entry.shards().get(finishedShardId);
|
||||
if (existingStatus == null || existingStatus.state() != ShardState.QUEUED) {
|
||||
continue;
|
||||
}
|
||||
if (shards == null) {
|
||||
shards = ImmutableOpenMap.builder(entry.shards());
|
||||
}
|
||||
final ShardSnapshotStatus finishedStatus = updateSnapshotState.status();
|
||||
logger.trace("Starting [{}] on [{}] with generation [{}]", finishedShardId,
|
||||
finishedStatus.nodeId(), finishedStatus.generation());
|
||||
shards.put(finishedShardId, new ShardSnapshotStatus(finishedStatus.nodeId(), finishedStatus.generation()));
|
||||
if (existing.state().completed()) {
|
||||
// No point in doing noop updates that might happen if data nodes resends shard status after a disconnect.
|
||||
iterator.remove();
|
||||
startedCount++;
|
||||
continue;
|
||||
}
|
||||
logger.trace("[{}] Updating shard [{}] with status [{}]", updatedSnapshot,
|
||||
finishedShardId, updateSnapshotState.updatedState.state());
|
||||
if (shards == null) {
|
||||
shards = ImmutableOpenMap.builder(entry.shards());
|
||||
}
|
||||
shards.put(finishedShardId, updateSnapshotState.updatedState);
|
||||
executedTasks.add(updateSnapshotState);
|
||||
changedCount++;
|
||||
} else if (executedTasks.contains(updateSnapshotState)) {
|
||||
// tasks that completed a shard might allow starting a new shard snapshot for the current snapshot
|
||||
final ShardSnapshotStatus existingStatus = entry.shards().get(finishedShardId);
|
||||
if (existingStatus == null || existingStatus.state() != ShardState.QUEUED) {
|
||||
continue;
|
||||
}
|
||||
if (shards == null) {
|
||||
shards = ImmutableOpenMap.builder(entry.shards());
|
||||
}
|
||||
final ShardSnapshotStatus finishedStatus = updateSnapshotState.updatedState;
|
||||
logger.trace("Starting [{}] on [{}] with generation [{}]", finishedShardId,
|
||||
finishedStatus.nodeId(), finishedStatus.generation());
|
||||
shards.put(finishedShardId, new ShardSnapshotStatus(finishedStatus.nodeId(), finishedStatus.generation()));
|
||||
iterator.remove();
|
||||
startedCount++;
|
||||
}
|
||||
}
|
||||
|
||||
if (shards == null) {
|
||||
entries.add(entry);
|
||||
} else {
|
||||
entries.add(entry.withShardStates(shards.build()));
|
||||
}
|
||||
if (shards == null) {
|
||||
entries.add(entry);
|
||||
} else {
|
||||
entries.add(entry.withShardStates(shards.build()));
|
||||
}
|
||||
if (changedCount > 0) {
|
||||
logger.trace("changed cluster state triggered by [{}] snapshot state updates and resulted in starting " +
|
||||
"[{}] shard snapshots", changedCount, startedCount);
|
||||
return ClusterTasksResult.<UpdateIndexShardSnapshotStatusRequest>builder().successes(tasks)
|
||||
.build(ClusterState.builder(currentState).putCustom(SnapshotsInProgress.TYPE,
|
||||
SnapshotsInProgress.of(entries)).build());
|
||||
}
|
||||
if (changedCount > 0) {
|
||||
logger.trace("changed cluster state triggered by [{}] snapshot state updates and resulted in starting " +
|
||||
"[{}] shard snapshots", changedCount, startedCount);
|
||||
return ClusterStateTaskExecutor.ClusterTasksResult.<ShardSnapshotUpdate>builder().successes(tasks).build(
|
||||
ClusterState.builder(currentState).putCustom(SnapshotsInProgress.TYPE, SnapshotsInProgress.of(entries)).build());
|
||||
}
|
||||
return ClusterStateTaskExecutor.ClusterTasksResult.<ShardSnapshotUpdate>builder().successes(tasks).build(currentState);
|
||||
};
|
||||
|
||||
/**
|
||||
* An update to the snapshot state of a shard.
|
||||
*/
|
||||
private static final class ShardSnapshotUpdate {
|
||||
|
||||
private final Snapshot snapshot;
|
||||
|
||||
private final ShardId shardId;
|
||||
|
||||
private final ShardSnapshotStatus updatedState;
|
||||
|
||||
private ShardSnapshotUpdate(Snapshot snapshot, ShardId shardId, ShardSnapshotStatus updatedState) {
|
||||
this.snapshot = snapshot;
|
||||
this.shardId = shardId;
|
||||
this.updatedState = updatedState;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object other) {
|
||||
if (this == other) {
|
||||
return true;
|
||||
}
|
||||
return ClusterTasksResult.<UpdateIndexShardSnapshotStatusRequest>builder().successes(tasks).build(currentState);
|
||||
if ((other instanceof ShardSnapshotUpdate) == false) {
|
||||
return false;
|
||||
}
|
||||
final ShardSnapshotUpdate that = (ShardSnapshotUpdate) other;
|
||||
return this.snapshot.equals(that.snapshot) && this.shardId.equals(that.shardId) && this.updatedState == that.updatedState;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(snapshot, shardId, updatedState);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Updates the shard status on master node
|
||||
* Updates the shard status in the cluster state
|
||||
*
|
||||
* @param request update shard status request
|
||||
* @param update shard snapshot status update
|
||||
*/
|
||||
private void innerUpdateSnapshotState(final UpdateIndexShardSnapshotStatusRequest request,
|
||||
ActionListener<UpdateIndexShardSnapshotStatusResponse> listener) {
|
||||
logger.trace("received updated snapshot restore state [{}]", request);
|
||||
private void innerUpdateSnapshotState(ShardSnapshotUpdate update, ActionListener<Void> listener) {
|
||||
logger.trace("received updated snapshot restore state [{}]", update);
|
||||
clusterService.submitStateUpdateTask(
|
||||
"update snapshot state",
|
||||
request,
|
||||
update,
|
||||
ClusterStateTaskConfig.build(Priority.NORMAL),
|
||||
snapshotStateExecutor,
|
||||
SHARD_STATE_EXECUTOR,
|
||||
new ClusterStateTaskListener() {
|
||||
@Override
|
||||
public void onFailure(String source, Exception e) {
|
||||
|
@ -2556,13 +2585,13 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|||
@Override
|
||||
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
|
||||
try {
|
||||
listener.onResponse(new UpdateIndexShardSnapshotStatusResponse());
|
||||
listener.onResponse(null);
|
||||
} finally {
|
||||
// Maybe this state update completed the snapshot. If we are not already ending it because of a concurrent
|
||||
// state update we check if its state is completed and end it if it is.
|
||||
if (endingSnapshots.contains(request.snapshot()) == false) {
|
||||
if (endingSnapshots.contains(update.snapshot) == false) {
|
||||
final SnapshotsInProgress snapshotsInProgress = newState.custom(SnapshotsInProgress.TYPE);
|
||||
final SnapshotsInProgress.Entry updatedEntry = snapshotsInProgress.snapshot(request.snapshot());
|
||||
final SnapshotsInProgress.Entry updatedEntry = snapshotsInProgress.snapshot(update.snapshot);
|
||||
// If the entry is still in the cluster state and is completed, try finalizing the snapshot in the repo
|
||||
if (updatedEntry != null && updatedEntry.state().completed()) {
|
||||
endSnapshot(updatedEntry, newState.metadata(), null);
|
||||
|
@ -2590,13 +2619,14 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|||
|
||||
@Override
|
||||
protected UpdateIndexShardSnapshotStatusResponse read(StreamInput in) throws IOException {
|
||||
return new UpdateIndexShardSnapshotStatusResponse(in);
|
||||
return UpdateIndexShardSnapshotStatusResponse.INSTANCE;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void masterOperation(UpdateIndexShardSnapshotStatusRequest request, ClusterState state,
|
||||
ActionListener<UpdateIndexShardSnapshotStatusResponse> listener) throws Exception {
|
||||
innerUpdateSnapshotState(request, listener);
|
||||
innerUpdateSnapshotState(new ShardSnapshotUpdate(request.snapshot(), request.shardId(), request.status()),
|
||||
ActionListener.delegateFailure(listener, (l, v) -> l.onResponse(UpdateIndexShardSnapshotStatusResponse.INSTANCE)));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -19,18 +19,15 @@
|
|||
package org.elasticsearch.snapshots;
|
||||
|
||||
import org.elasticsearch.action.ActionResponse;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
class UpdateIndexShardSnapshotStatusResponse extends ActionResponse {
|
||||
|
||||
UpdateIndexShardSnapshotStatusResponse() {}
|
||||
public static final UpdateIndexShardSnapshotStatusResponse INSTANCE = new UpdateIndexShardSnapshotStatusResponse();
|
||||
|
||||
UpdateIndexShardSnapshotStatusResponse(StreamInput in) throws IOException {
|
||||
super(in);
|
||||
}
|
||||
private UpdateIndexShardSnapshotStatusResponse() {}
|
||||
|
||||
@Override
|
||||
public void writeTo(StreamOutput out) throws IOException {}
|
||||
|
|
Loading…
Reference in New Issue