diff --git a/core/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java b/core/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java index d6598eb3a12..6b42d178694 100644 --- a/core/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java +++ b/core/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java @@ -27,13 +27,15 @@ import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateApplier; -import org.elasticsearch.cluster.ClusterStateUpdateTask; +import org.elasticsearch.cluster.ClusterStateTaskConfig; +import org.elasticsearch.cluster.ClusterStateTaskExecutor; import org.elasticsearch.cluster.SnapshotsInProgress; import org.elasticsearch.cluster.SnapshotsInProgress.ShardSnapshotStatus; import org.elasticsearch.cluster.SnapshotsInProgress.State; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.Priority; import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.inject.Inject; @@ -43,7 +45,6 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.AbstractRunnable; -import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.index.engine.SnapshotFailedEngineException; import org.elasticsearch.index.shard.IndexEventListener; import org.elasticsearch.index.shard.IndexShard; @@ -68,7 +69,6 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.concurrent.BlockingQueue; import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; @@ -105,8 +105,7 @@ public class SnapshotShardsService extends AbstractLifecycleComponent implements private volatile Map shardSnapshots = emptyMap(); - private final BlockingQueue updatedSnapshotStateQueue = ConcurrentCollections.newBlockingQueue(); - + private final SnapshotStateExecutor snapshotStateExecutor = new SnapshotStateExecutor(); @Inject public SnapshotShardsService(Settings settings, ClusterService clusterService, SnapshotsService snapshotsService, ThreadPool threadPool, @@ -458,8 +457,6 @@ public class SnapshotShardsService extends AbstractLifecycleComponent implements private ShardId shardId; private ShardSnapshotStatus status; - private volatile boolean processed; // state field, no need to serialize - public UpdateIndexShardSnapshotStatusRequest() { } @@ -502,14 +499,6 @@ public class SnapshotShardsService extends AbstractLifecycleComponent implements public String toString() { return "" + snapshot + ", shardId [" + shardId + "], status [" + status.state() + "]"; } - - public void markAsProcessed() { - processed = true; - } - - public boolean isProcessed() { - return processed; - } } /** @@ -531,83 +520,65 @@ public class SnapshotShardsService extends AbstractLifecycleComponent implements */ private void innerUpdateSnapshotState(final UpdateIndexShardSnapshotStatusRequest request) { logger.trace("received updated snapshot restore state [{}]", request); - updatedSnapshotStateQueue.add(request); + clusterService.submitStateUpdateTask( + "update snapshot state", + request, + ClusterStateTaskConfig.build(Priority.NORMAL), + snapshotStateExecutor, + (source, e) -> logger.warn((Supplier) () -> new ParameterizedMessage("[{}][{}] failed to update snapshot status to [{}]", + request.snapshot(), request.shardId(), request.status()), e)); + } - clusterService.submitStateUpdateTask("update snapshot state", new ClusterStateUpdateTask() { - private final List drainedRequests = new ArrayList<>(); + class SnapshotStateExecutor implements ClusterStateTaskExecutor { - @Override - public ClusterState execute(ClusterState currentState) { - // The request was already processed as a part of an early batch - skipping - if (request.isProcessed()) { - return currentState; - } + @Override + public ClusterTasksResult execute(ClusterState currentState, List tasks) throws Exception { + final SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE); + if (snapshots != null) { + int changedCount = 0; + final List entries = new ArrayList<>(); + for (SnapshotsInProgress.Entry entry : snapshots.entries()) { + ImmutableOpenMap.Builder shards = ImmutableOpenMap.builder(); + boolean updated = false; - updatedSnapshotStateQueue.drainTo(drainedRequests); - - final int batchSize = drainedRequests.size(); - - // nothing to process (a previous event has processed it already) - if (batchSize == 0) { - return currentState; - } - - final SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE); - if (snapshots != null) { - int changedCount = 0; - final List entries = new ArrayList<>(); - for (SnapshotsInProgress.Entry entry : snapshots.entries()) { - ImmutableOpenMap.Builder shards = ImmutableOpenMap.builder(); - boolean updated = false; - - for (int i = 0; i < batchSize; i++) { - final UpdateIndexShardSnapshotStatusRequest updateSnapshotState = drainedRequests.get(i); - updateSnapshotState.markAsProcessed(); - - if (entry.snapshot().equals(updateSnapshotState.snapshot())) { - logger.trace("[{}] Updating shard [{}] with status [{}]", updateSnapshotState.snapshot(), updateSnapshotState.shardId(), updateSnapshotState.status().state()); - if (updated == false) { - shards.putAll(entry.shards()); - updated = true; - } - shards.put(updateSnapshotState.shardId(), updateSnapshotState.status()); - changedCount++; + for (UpdateIndexShardSnapshotStatusRequest updateSnapshotState : tasks) { + if (entry.snapshot().equals(updateSnapshotState.snapshot())) { + logger.trace("[{}] Updating shard [{}] with status [{}]", updateSnapshotState.snapshot(), updateSnapshotState.shardId(), updateSnapshotState.status().state()); + if (updated == false) { + shards.putAll(entry.shards()); + updated = true; } + shards.put(updateSnapshotState.shardId(), updateSnapshotState.status()); + changedCount++; } + } - if (updated) { - if (completed(shards.values()) == false) { - entries.add(new SnapshotsInProgress.Entry(entry, shards.build())); - } else { - // Snapshot is finished - mark it as done - // TODO: Add PARTIAL_SUCCESS status? - SnapshotsInProgress.Entry updatedEntry = new SnapshotsInProgress.Entry(entry, State.SUCCESS, shards.build()); - entries.add(updatedEntry); - // Finalize snapshot in the repository - snapshotsService.endSnapshot(updatedEntry); - logger.info("snapshot [{}] is done", updatedEntry.snapshot()); - } + if (updated) { + if (completed(shards.values()) == false) { + entries.add(new SnapshotsInProgress.Entry(entry, shards.build())); } else { - entries.add(entry); + // Snapshot is finished - mark it as done + // TODO: Add PARTIAL_SUCCESS status? + SnapshotsInProgress.Entry updatedEntry = new SnapshotsInProgress.Entry(entry, State.SUCCESS, shards.build()); + entries.add(updatedEntry); + // Finalize snapshot in the repository + snapshotsService.endSnapshot(updatedEntry); + logger.info("snapshot [{}] is done", updatedEntry.snapshot()); } - } - if (changedCount > 0) { - logger.trace("changed cluster state triggered by {} snapshot state updates", changedCount); - - final SnapshotsInProgress updatedSnapshots = new SnapshotsInProgress(entries.toArray(new SnapshotsInProgress.Entry[entries.size()])); - return ClusterState.builder(currentState).putCustom(SnapshotsInProgress.TYPE, updatedSnapshots).build(); + } else { + entries.add(entry); } } - return currentState; - } + if (changedCount > 0) { + logger.trace("changed cluster state triggered by {} snapshot state updates", changedCount); - @Override - public void onFailure(String source, Exception e) { - for (UpdateIndexShardSnapshotStatusRequest request : drainedRequests) { - logger.warn((Supplier) () -> new ParameterizedMessage("[{}][{}] failed to update snapshot status to [{}]", request.snapshot(), request.shardId(), request.status()), e); + final SnapshotsInProgress updatedSnapshots = new SnapshotsInProgress(entries.toArray(new SnapshotsInProgress.Entry[entries.size()])); + return ClusterTasksResult.builder().successes(tasks).build( + ClusterState.builder(currentState).putCustom(SnapshotsInProgress.TYPE, updatedSnapshots).build()); } } - }); + return ClusterTasksResult.builder().successes(tasks).build(currentState); + } } /**