diff --git a/.idea/projectCodeStyle.xml b/.idea/projectCodeStyle.xml
index adcd12690a6..de5584676bd 100644
--- a/.idea/projectCodeStyle.xml
+++ b/.idea/projectCodeStyle.xml
@@ -16,6 +16,16 @@
+
+
+
+
+
+
+
+
+
+
@@ -36,6 +46,16 @@
+
+
+
+
+
+
+
+
+
+
@@ -126,6 +146,11 @@
+
+
+
+
+
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/ClusterService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/ClusterService.java
index 293930979c4..e2fff7d5872 100644
--- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/ClusterService.java
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/ClusterService.java
@@ -54,6 +54,11 @@ public interface ClusterService extends LifecycleComponent {
*/
OperationRouting operationRouting();
+ /**
+ * Adds a priority listener for updated cluster states.
+ */
+ void addPriority(ClusterStateListener listener);
+
/**
* Adds a listener for updated cluster states.
*/
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/RoutingService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/RoutingService.java
index 691273197e3..1a7e234dec9 100644
--- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/RoutingService.java
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/routing/RoutingService.java
@@ -39,6 +39,8 @@ import static org.elasticsearch.common.unit.TimeValue.*;
*/
public class RoutingService extends AbstractLifecycleComponent implements ClusterStateListener {
+ private static final String CLUSTER_UPDATE_TASK_SOURCE = "routing-table-updater";
+
private final ThreadPool threadPool;
private final ClusterService clusterService;
@@ -60,7 +62,7 @@ public class RoutingService extends AbstractLifecycleComponent i
}
@Override protected void doStart() throws ElasticSearchException {
- clusterService.add(this);
+ clusterService.addPriority(this);
}
@Override protected void doStop() throws ElasticSearchException {
@@ -75,7 +77,7 @@ public class RoutingService extends AbstractLifecycleComponent i
}
@Override public void clusterChanged(ClusterChangedEvent event) {
- if (event.source().equals(RoutingTableUpdater.CLUSTER_UPDATE_TASK_SOURCE)) {
+ if (event.source().equals(CLUSTER_UPDATE_TASK_SOURCE)) {
// that's us, ignore this event
return;
}
@@ -93,7 +95,7 @@ public class RoutingService extends AbstractLifecycleComponent i
// also, if the routing table changed, it means that we have new indices, or shard have started
// or failed, we want to apply this as fast as possible
routingTableDirty = true;
- threadPool.cached().execute(new RoutingTableUpdater());
+ reroute();
} else {
if (event.nodesAdded()) {
routingTableDirty = true;
@@ -107,32 +109,34 @@ public class RoutingService extends AbstractLifecycleComponent i
}
}
+ private void reroute() {
+ try {
+ if (!routingTableDirty) {
+ return;
+ }
+ if (lifecycle.stopped()) {
+ return;
+ }
+ clusterService.submitStateUpdateTask(CLUSTER_UPDATE_TASK_SOURCE, new ClusterStateUpdateTask() {
+ @Override public ClusterState execute(ClusterState currentState) {
+ RoutingAllocation.Result routingResult = shardsAllocation.reroute(currentState);
+ if (!routingResult.changed()) {
+ // no state changed
+ return currentState;
+ }
+ return newClusterStateBuilder().state(currentState).routingResult(routingResult).build();
+ }
+ });
+ routingTableDirty = false;
+ } catch (Exception e) {
+ logger.warn("Failed to reroute routing table", e);
+ }
+ }
+
private class RoutingTableUpdater implements Runnable {
- private static final String CLUSTER_UPDATE_TASK_SOURCE = "routing-table-updater";
-
@Override public void run() {
- try {
- if (!routingTableDirty) {
- return;
- }
- if (lifecycle.stopped()) {
- return;
- }
- clusterService.submitStateUpdateTask(CLUSTER_UPDATE_TASK_SOURCE, new ClusterStateUpdateTask() {
- @Override public ClusterState execute(ClusterState currentState) {
- RoutingAllocation.Result routingResult = shardsAllocation.reroute(currentState);
- if (!routingResult.changed()) {
- // no state changed
- return currentState;
- }
- return newClusterStateBuilder().state(currentState).routingResult(routingResult).build();
- }
- });
- routingTableDirty = false;
- } catch (Exception e) {
- logger.warn("Failed to reroute routing table", e);
- }
+ reroute();
}
}
}
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java
index f9328ab8f55..f10b0483234 100644
--- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java
@@ -65,6 +65,7 @@ public class InternalClusterService extends AbstractLifecycleComponent priorityClusterStateListeners = new CopyOnWriteArrayList();
private final List clusterStateListeners = new CopyOnWriteArrayList();
private final Queue onGoingTimeouts = new LinkedTransferQueue();
@@ -127,13 +128,17 @@ public class InternalClusterService extends AbstractLifecycleComponent it = onGoingTimeouts.iterator(); it.hasNext();) {
+ for (Iterator it = onGoingTimeouts.iterator(); it.hasNext(); ) {
NotifyTimeout timeout = it.next();
if (timeout.listener.equals(listener)) {
timeout.cancel();
@@ -226,6 +231,9 @@ public class InternalClusterService extends AbstractLifecycleComponent