Reroute eagerly on shard started events
We have an optimization where we try to delay reroute after we processed the shard started events to try and combine a few into the same event. With teh queueing of shard started events in places, we don't need to do it, and we can reroute right away, which will actually reduce the amount of cluster state events we send. This will also have a nice side effect of not missing on "waitForRelocatingShards(0)" on cluster health checks since relocations will happen right away. closes #3417
This commit is contained in:
parent
2f8a397aa5
commit
e3480a1c0a
|
@ -23,7 +23,6 @@ import org.elasticsearch.ElasticSearchException;
|
||||||
import org.elasticsearch.cluster.ClusterService;
|
import org.elasticsearch.cluster.ClusterService;
|
||||||
import org.elasticsearch.cluster.ClusterState;
|
import org.elasticsearch.cluster.ClusterState;
|
||||||
import org.elasticsearch.cluster.ClusterStateUpdateTask;
|
import org.elasticsearch.cluster.ClusterStateUpdateTask;
|
||||||
import org.elasticsearch.cluster.ProcessedClusterStateUpdateTask;
|
|
||||||
import org.elasticsearch.cluster.node.DiscoveryNodes;
|
import org.elasticsearch.cluster.node.DiscoveryNodes;
|
||||||
import org.elasticsearch.cluster.routing.IndexRoutingTable;
|
import org.elasticsearch.cluster.routing.IndexRoutingTable;
|
||||||
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
|
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
|
||||||
|
@ -45,7 +44,6 @@ import java.io.IOException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.concurrent.BlockingQueue;
|
import java.util.concurrent.BlockingQueue;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
|
||||||
|
|
||||||
import static org.elasticsearch.cluster.ClusterState.newClusterStateBuilder;
|
import static org.elasticsearch.cluster.ClusterState.newClusterStateBuilder;
|
||||||
import static org.elasticsearch.cluster.routing.ImmutableShardRouting.readShardRoutingEntry;
|
import static org.elasticsearch.cluster.routing.ImmutableShardRouting.readShardRoutingEntry;
|
||||||
|
@ -61,7 +59,6 @@ public class ShardStateAction extends AbstractComponent {
|
||||||
private final ThreadPool threadPool;
|
private final ThreadPool threadPool;
|
||||||
|
|
||||||
private final BlockingQueue<ShardRouting> startedShardsQueue = ConcurrentCollections.newBlockingQueue();
|
private final BlockingQueue<ShardRouting> startedShardsQueue = ConcurrentCollections.newBlockingQueue();
|
||||||
private final AtomicBoolean rerouteRequired = new AtomicBoolean();
|
|
||||||
|
|
||||||
@Inject
|
@Inject
|
||||||
public ShardStateAction(Settings settings, ClusterService clusterService, TransportService transportService,
|
public ShardStateAction(Settings settings, ClusterService clusterService, TransportService transportService,
|
||||||
|
@ -145,7 +142,7 @@ public class ShardStateAction extends AbstractComponent {
|
||||||
// process started events as fast as possible, to make shards available
|
// process started events as fast as possible, to make shards available
|
||||||
startedShardsQueue.add(shardRouting);
|
startedShardsQueue.add(shardRouting);
|
||||||
|
|
||||||
clusterService.submitStateUpdateTask("shard-started (" + shardRouting + "), reason [" + reason + "]", Priority.HIGH, new ProcessedClusterStateUpdateTask() {
|
clusterService.submitStateUpdateTask("shard-started (" + shardRouting + "), reason [" + reason + "]", Priority.HIGH, new ClusterStateUpdateTask() {
|
||||||
@Override
|
@Override
|
||||||
public ClusterState execute(ClusterState currentState) {
|
public ClusterState execute(ClusterState currentState) {
|
||||||
|
|
||||||
|
@ -190,8 +187,8 @@ public class ShardStateAction extends AbstractComponent {
|
||||||
if (logger.isDebugEnabled()) {
|
if (logger.isDebugEnabled()) {
|
||||||
logger.debug("applying started shards {}, reason [{}]", shards, reason);
|
logger.debug("applying started shards {}, reason [{}]", shards, reason);
|
||||||
}
|
}
|
||||||
// we don't do reroute right away, we do it after publishing the fact that it was started
|
|
||||||
RoutingAllocation.Result routingResult = allocationService.applyStartedShards(currentState, shards, false);
|
RoutingAllocation.Result routingResult = allocationService.applyStartedShards(currentState, shards, true);
|
||||||
if (!routingResult.changed()) {
|
if (!routingResult.changed()) {
|
||||||
return currentState;
|
return currentState;
|
||||||
}
|
}
|
||||||
|
@ -202,30 +199,6 @@ public class ShardStateAction extends AbstractComponent {
|
||||||
public void onFailure(String source, Throwable t) {
|
public void onFailure(String source, Throwable t) {
|
||||||
logger.error("unexpected failure during [{}]", t, source);
|
logger.error("unexpected failure during [{}]", t, source);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
|
|
||||||
rerouteRequired.set(true);
|
|
||||||
clusterService.submitStateUpdateTask("reroute post shard-started (" + shardRouting + "), reason [" + reason + "]", Priority.HIGH, new ClusterStateUpdateTask() {
|
|
||||||
@Override
|
|
||||||
public ClusterState execute(ClusterState currentState) {
|
|
||||||
if (rerouteRequired.compareAndSet(true, false)) {
|
|
||||||
RoutingAllocation.Result routingResult = allocationService.reroute(currentState);
|
|
||||||
if (!routingResult.changed()) {
|
|
||||||
return currentState;
|
|
||||||
}
|
|
||||||
return newClusterStateBuilder().state(currentState).routingResult(routingResult).build();
|
|
||||||
} else {
|
|
||||||
return currentState;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void onFailure(String source, Throwable t) {
|
|
||||||
logger.error("unexpected failure during [{}]", t, source);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue