diff --git a/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java b/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java index af070ceb6be..fe17d79d45b 100644 --- a/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java +++ b/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java @@ -23,7 +23,6 @@ import org.elasticsearch.ElasticSearchException; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateUpdateTask; -import org.elasticsearch.cluster.ProcessedClusterStateUpdateTask; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.routing.IndexRoutingTable; import org.elasticsearch.cluster.routing.IndexShardRoutingTable; @@ -45,7 +44,6 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.atomic.AtomicBoolean; import static org.elasticsearch.cluster.ClusterState.newClusterStateBuilder; import static org.elasticsearch.cluster.routing.ImmutableShardRouting.readShardRoutingEntry; @@ -61,7 +59,6 @@ public class ShardStateAction extends AbstractComponent { private final ThreadPool threadPool; private final BlockingQueue startedShardsQueue = ConcurrentCollections.newBlockingQueue(); - private final AtomicBoolean rerouteRequired = new AtomicBoolean(); @Inject public ShardStateAction(Settings settings, ClusterService clusterService, TransportService transportService, @@ -145,7 +142,7 @@ public class ShardStateAction extends AbstractComponent { // process started events as fast as possible, to make shards available startedShardsQueue.add(shardRouting); - clusterService.submitStateUpdateTask("shard-started (" + shardRouting + "), reason [" + reason + "]", Priority.HIGH, new ProcessedClusterStateUpdateTask() { + clusterService.submitStateUpdateTask("shard-started (" + shardRouting + "), reason [" + reason + "]", Priority.HIGH, new ClusterStateUpdateTask() { @Override public ClusterState execute(ClusterState currentState) { @@ -190,8 +187,8 @@ public class ShardStateAction extends AbstractComponent { if (logger.isDebugEnabled()) { logger.debug("applying started shards {}, reason [{}]", shards, reason); } - // we don't do reroute right away, we do it after publishing the fact that it was started - RoutingAllocation.Result routingResult = allocationService.applyStartedShards(currentState, shards, false); + + RoutingAllocation.Result routingResult = allocationService.applyStartedShards(currentState, shards, true); if (!routingResult.changed()) { return currentState; } @@ -202,30 +199,6 @@ public class ShardStateAction extends AbstractComponent { public void onFailure(String source, Throwable t) { logger.error("unexpected failure during [{}]", t, source); } - - @Override - public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { - rerouteRequired.set(true); - clusterService.submitStateUpdateTask("reroute post shard-started (" + shardRouting + "), reason [" + reason + "]", Priority.HIGH, new ClusterStateUpdateTask() { - @Override - public ClusterState execute(ClusterState currentState) { - if (rerouteRequired.compareAndSet(true, false)) { - RoutingAllocation.Result routingResult = allocationService.reroute(currentState); - if (!routingResult.changed()) { - return currentState; - } - return newClusterStateBuilder().state(currentState).routingResult(routingResult).build(); - } else { - return currentState; - } - } - - @Override - public void onFailure(String source, Throwable t) { - logger.error("unexpected failure during [{}]", t, source); - } - }); - } }); }