From 99eac0e7d78d54340dbb2e89432a9d50b3b41547 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Wed, 25 Nov 2015 11:31:43 -0500 Subject: [PATCH] Use general cluster state batching mechanism for shard started This commit modifies the handling of shard started cluster state updates to use the general cluster state batching mechanism. An advantage of this approach is we now get correct per-listener notification on failures. --- .../action/shard/ShardStateAction.java | 86 ++++++++----------- .../routing/allocation/AllocationService.java | 16 ++++ 2 files changed, 50 insertions(+), 52 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java b/core/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java index a01f601ba12..b4048407841 100644 --- a/core/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java +++ b/core/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java @@ -36,14 +36,12 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.*; import java.io.IOException; import java.util.ArrayList; import java.util.List; -import java.util.concurrent.BlockingQueue; import static org.elasticsearch.cluster.routing.ShardRouting.readShardRoutingEntry; @@ -60,8 +58,6 @@ public class ShardStateAction extends AbstractComponent { private final AllocationService allocationService; private final RoutingService routingService; - private final BlockingQueue startedShardsQueue = ConcurrentCollections.newBlockingQueue(); - @Inject public ShardStateAction(Settings settings, ClusterService clusterService, TransportService transportService, AllocationService allocationService, 
RoutingService routingService) { @@ -185,60 +181,46 @@ public class ShardStateAction extends AbstractComponent { } } + private final ShardStartedClusterStateHandler shardStartedClusterStateHandler = + new ShardStartedClusterStateHandler(); + private void shardStartedOnMaster(final ShardRoutingEntry shardRoutingEntry) { logger.debug("received shard started for {}", shardRoutingEntry); - // buffer shard started requests, and the state update tasks will simply drain it - // this is to optimize the number of "started" events we generate, and batch them - // possibly, we can do time based batching as well, but usually, we would want to - // process started events as fast as possible, to make shards available - startedShardsQueue.add(shardRoutingEntry); - clusterService.submitStateUpdateTask("shard-started (" + shardRoutingEntry.shardRouting + "), reason [" + shardRoutingEntry.message + "]", - new ClusterStateUpdateTask() { - @Override - public Priority priority() { - return Priority.URGENT; + clusterService.submitStateUpdateTask( + "shard-started (" + shardRoutingEntry.shardRouting + "), reason [" + shardRoutingEntry.message + "]", + shardRoutingEntry, + ClusterStateTaskConfig.build(Priority.URGENT), + shardStartedClusterStateHandler, + shardStartedClusterStateHandler); + } + + class ShardStartedClusterStateHandler implements ClusterStateTaskExecutor, ClusterStateTaskListener { + @Override + public BatchResult execute(ClusterState currentState, List tasks) throws Exception { + BatchResult.Builder builder = BatchResult.builder(); + ClusterState accumulator = ClusterState.builder(currentState).build(); + for (ShardRoutingEntry task : tasks) { + task.processed = true; + try { + RoutingAllocation.Result result = + allocationService.applyStartedShard(accumulator, task.shardRouting, true); // apply on top of the accumulated state so earlier tasks in this batch are not overwritten + builder.success(task); + if (result.changed()) { + accumulator = ClusterState.builder(accumulator).routingResult(result).build(); } + } catch (Throwable t) { + builder.failure(task, t); 
+ } + } - @Override - public ClusterState execute(ClusterState currentState) { + return builder.build(accumulator); + } - if (shardRoutingEntry.processed) { - return currentState; - } - - List shardRoutingEntries = new ArrayList<>(); - startedShardsQueue.drainTo(shardRoutingEntries); - - // nothing to process (a previous event has processed it already) - if (shardRoutingEntries.isEmpty()) { - return currentState; - } - - List shardRoutingToBeApplied = new ArrayList<>(shardRoutingEntries.size()); - - // mark all entries as processed - for (ShardRoutingEntry entry : shardRoutingEntries) { - entry.processed = true; - shardRoutingToBeApplied.add(entry.shardRouting); - } - - if (shardRoutingToBeApplied.isEmpty()) { - return currentState; - } - - RoutingAllocation.Result routingResult = allocationService.applyStartedShards(currentState, shardRoutingToBeApplied, true); - if (!routingResult.changed()) { - return currentState; - } - return ClusterState.builder(currentState).routingResult(routingResult).build(); - } - - @Override - public void onFailure(String source, Throwable t) { - logger.error("unexpected failure during [{}]", t, source); - } - }); + @Override + public void onFailure(String source, Throwable t) { + logger.error("unexpected failure during [{}]", t, source); + } } private class ShardFailedTransportHandler implements TransportRequestHandler { diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java index f819d6fde0a..af17c839582 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java @@ -63,6 +63,22 @@ public class AllocationService extends AbstractComponent { this.clusterInfoService = clusterInfoService; } + /** + * Applies the started shard. 
Note, shards can be called several + * times within this method. If the same instance of the routing + * table is returned, then no change has been made. + * @param clusterState the cluster state + * @param startedShard the shard to start + * @param withReroute whether or not to reroute the resulting allocation + * @return the routing allocation result carrying the resulting routing table + */ + public RoutingAllocation.Result applyStartedShard( + ClusterState clusterState, + ShardRouting startedShard, + boolean withReroute) { + return applyStartedShards(clusterState, Collections.singletonList(startedShard), withReroute); + } + /** * Applies the started shards. Note, shards can be called several times within this method. *