From 99eac0e7d78d54340dbb2e89432a9d50b3b41547 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Wed, 25 Nov 2015 11:31:43 -0500 Subject: [PATCH 1/3] Use general cluster state batching mechanism for shard started This commit modifies the handling of shard started cluster state updates to use the general cluster state batching mechanism. An advantage of this approach is we now get correct per-listener notification on failures. --- .../action/shard/ShardStateAction.java | 86 ++++++++----------- .../routing/allocation/AllocationService.java | 16 ++++ 2 files changed, 50 insertions(+), 52 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java b/core/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java index a01f601ba12..b4048407841 100644 --- a/core/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java +++ b/core/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java @@ -36,14 +36,12 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; -import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.*; import java.io.IOException; import java.util.ArrayList; import java.util.List; -import java.util.concurrent.BlockingQueue; import static org.elasticsearch.cluster.routing.ShardRouting.readShardRoutingEntry; @@ -60,8 +58,6 @@ public class ShardStateAction extends AbstractComponent { private final AllocationService allocationService; private final RoutingService routingService; - private final BlockingQueue startedShardsQueue = ConcurrentCollections.newBlockingQueue(); - @Inject public ShardStateAction(Settings settings, ClusterService clusterService, TransportService transportService, AllocationService allocationService, RoutingService routingService) { @@ -185,60 +181,46 @@ public class ShardStateAction extends AbstractComponent { } } + private final ShardStartedClusterStateHandler shardStartedClusterStateHandler = + new ShardStartedClusterStateHandler(); + private void shardStartedOnMaster(final ShardRoutingEntry shardRoutingEntry) { logger.debug("received shard started for {}", shardRoutingEntry); - // buffer shard started requests, and the state update tasks will simply drain it - // this is to optimize the number of "started" events we generate, and batch them - // possibly, we can do time based batching as well, but usually, we would want to - // process started events as fast as possible, to make shards available - startedShardsQueue.add(shardRoutingEntry); - clusterService.submitStateUpdateTask("shard-started (" + shardRoutingEntry.shardRouting + "), reason [" + shardRoutingEntry.message + "]", - new ClusterStateUpdateTask() { - @Override - public Priority priority() { - return Priority.URGENT; + clusterService.submitStateUpdateTask( + "shard-started (" + shardRoutingEntry.shardRouting + "), reason [" + shardRoutingEntry.message + "]", + shardRoutingEntry, + ClusterStateTaskConfig.build(Priority.URGENT), + shardStartedClusterStateHandler, + shardStartedClusterStateHandler); + } + + class ShardStartedClusterStateHandler implements ClusterStateTaskExecutor, ClusterStateTaskListener { + @Override + public BatchResult execute(ClusterState currentState, List tasks) throws Exception { + BatchResult.Builder builder = BatchResult.builder(); + ClusterState accumulator = ClusterState.builder(currentState).build(); + for (ShardRoutingEntry task : tasks) { + task.processed = true; + try { + RoutingAllocation.Result result = + allocationService.applyStartedShard(currentState, task.shardRouting, true); + builder.success(task); + if (result.changed()) { + accumulator = ClusterState.builder(accumulator).routingResult(result).build(); } + } catch (Throwable t) { + builder.failure(task, t); + } + } - @Override - public ClusterState execute(ClusterState currentState) { + return builder.build(accumulator); + } - if (shardRoutingEntry.processed) { - return currentState; - } - - List shardRoutingEntries = new ArrayList<>(); - startedShardsQueue.drainTo(shardRoutingEntries); - - // nothing to process (a previous event has processed it already) - if (shardRoutingEntries.isEmpty()) { - return currentState; - } - - List shardRoutingToBeApplied = new ArrayList<>(shardRoutingEntries.size()); - - // mark all entries as processed - for (ShardRoutingEntry entry : shardRoutingEntries) { - entry.processed = true; - shardRoutingToBeApplied.add(entry.shardRouting); - } - - if (shardRoutingToBeApplied.isEmpty()) { - return currentState; - } - - RoutingAllocation.Result routingResult = allocationService.applyStartedShards(currentState, shardRoutingToBeApplied, true); - if (!routingResult.changed()) { - return currentState; - } - return ClusterState.builder(currentState).routingResult(routingResult).build(); - } - - @Override - public void onFailure(String source, Throwable t) { - logger.error("unexpected failure during [{}]", t, source); - } - }); + @Override + public void onFailure(String source, Throwable t) { + logger.error("unexpected failure during [{}]", t, source); + } } private class ShardFailedTransportHandler implements TransportRequestHandler { diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java index f819d6fde0a..af17c839582 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java @@ -63,6 +63,22 @@ public class AllocationService extends AbstractComponent { this.clusterInfoService = clusterInfoService; } + /** + * Applies the started shard. Note, shards can be called several + * times within this method. If the same instance of the routing + * table is returned, then no change has been made. + * @param clusterState the cluster state + * @param startedShard the shard to start + * @param withReroute whether or not to reroute the resulting allocation + * @return the resulting routing table + */ + public RoutingAllocation.Result applyStartedShard( + ClusterState clusterState, + ShardRouting startedShard, + boolean withReroute) { + return applyStartedShards(clusterState, Collections.singletonList(startedShard), withReroute); + } + /** * Applies the started shards. Note, shards can be called several times within this method. *

From 928d53a884b5bfc6248b6db97198da25666091d5 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Tue, 1 Dec 2015 11:36:29 -0500 Subject: [PATCH 2/3] Apply shard starts in a single batch --- .../action/shard/ShardStateAction.java | 24 ++++++++++--------- .../routing/allocation/AllocationService.java | 16 ------------- 2 files changed, 13 insertions(+), 27 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java b/core/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java index b4048407841..a74b2ca5ed8 100644 --- a/core/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java +++ b/core/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java @@ -199,22 +199,24 @@ public class ShardStateAction extends AbstractComponent { @Override public BatchResult execute(ClusterState currentState, List tasks) throws Exception { BatchResult.Builder builder = BatchResult.builder(); - ClusterState accumulator = ClusterState.builder(currentState).build(); + List shardRoutingsToBeApplied = new ArrayList<>(tasks.size()); for (ShardRoutingEntry task : tasks) { task.processed = true; - try { - RoutingAllocation.Result result = - allocationService.applyStartedShard(currentState, task.shardRouting, true); - builder.success(task); - if (result.changed()) { - accumulator = ClusterState.builder(accumulator).routingResult(result).build(); - } - } catch (Throwable t) { - builder.failure(task, t); + shardRoutingsToBeApplied.add(task.shardRouting); + } + ClusterState maybeUpdatedState = currentState; + try { + RoutingAllocation.Result result = + allocationService.applyStartedShards(currentState, shardRoutingsToBeApplied, true); + if (result.changed()) { + maybeUpdatedState = ClusterState.builder(currentState).routingResult(result).build(); } + builder.successes(tasks); + } catch (Throwable t) { + builder.failures(tasks, t); } - return builder.build(accumulator); + return builder.build(maybeUpdatedState); } @Override diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java index af17c839582..f819d6fde0a 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java @@ -63,22 +63,6 @@ public class AllocationService extends AbstractComponent { this.clusterInfoService = clusterInfoService; } - /** - * Applies the started shard. Note, shards can be called several - * times within this method. If the same instance of the routing - * table is returned, then no change has been made. - * @param clusterState the cluster state - * @param startedShard the shard to start - * @param withReroute whether or not to reroute the resulting allocation - * @return the resulting routing table - */ - public RoutingAllocation.Result applyStartedShard( - ClusterState clusterState, - ShardRouting startedShard, - boolean withReroute) { - return applyStartedShards(clusterState, Collections.singletonList(startedShard), withReroute); - } - /** * Applies the started shards. Note, shards can be called several times within this method. *

From b58d82f66c8f8f6490268122fd3e8dbf3c6b08ae Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Thu, 3 Dec 2015 14:08:35 -0500 Subject: [PATCH 3/3] Remove obsolete flag in ShardStateAction$ShardRoutingEntry --- .../elasticsearch/cluster/action/shard/ShardStateAction.java | 4 ---- 1 file changed, 4 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java b/core/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java index a74b2ca5ed8..d09df094a68 100644 --- a/core/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java +++ b/core/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java @@ -151,7 +151,6 @@ public class ShardStateAction extends AbstractComponent { BatchResult.Builder batchResultBuilder = BatchResult.builder(); List shardRoutingsToBeApplied = new ArrayList<>(tasks.size()); for (ShardRoutingEntry task : tasks) { - task.processed = true; shardRoutingsToBeApplied.add(new FailedRerouteAllocation.FailedShard(task.shardRouting, task.message, task.failure)); } ClusterState maybeUpdatedState = currentState; @@ -201,7 +200,6 @@ public class ShardStateAction extends AbstractComponent { BatchResult.Builder builder = BatchResult.builder(); List shardRoutingsToBeApplied = new ArrayList<>(tasks.size()); for (ShardRoutingEntry task : tasks) { - task.processed = true; shardRoutingsToBeApplied.add(task.shardRouting); } ClusterState maybeUpdatedState = currentState; @@ -250,8 +248,6 @@ public class ShardStateAction extends AbstractComponent { String message; Throwable failure; - volatile boolean processed; // state field, no need to serialize - public ShardRoutingEntry() { }