From 1be84c273b5aeb4ed95742cb1a6154bd7c5719c5 Mon Sep 17 00:00:00 2001 From: Shay Banon Date: Fri, 25 Jan 2013 15:22:29 +0100 Subject: [PATCH] eagerly reroute when a node leaves the cluster --- .../org/elasticsearch/discovery/Discovery.java | 7 +++++++ .../discovery/local/LocalDiscovery.java | 14 +++++++++++++- .../discovery/zen/ZenDiscovery.java | 17 +++++++++++++++-- .../node/internal/InternalNode.java | 5 +++++ 4 files changed, 40 insertions(+), 3 deletions(-) diff --git a/src/main/java/org/elasticsearch/discovery/Discovery.java b/src/main/java/org/elasticsearch/discovery/Discovery.java index 373b7e79bea..c8597349f87 100644 --- a/src/main/java/org/elasticsearch/discovery/Discovery.java +++ b/src/main/java/org/elasticsearch/discovery/Discovery.java @@ -23,6 +23,7 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.block.ClusterBlock; import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.common.component.LifecycleComponent; import org.elasticsearch.common.inject.internal.Nullable; import org.elasticsearch.node.service.NodeService; @@ -50,6 +51,12 @@ public interface Discovery extends LifecycleComponent { */ void setNodeService(@Nullable NodeService nodeService); + /** + * Another hack to solve dep injection problem..., note, this will be called before + * any start is called. + */ + void setAllocationService(AllocationService allocationService); + /** * Publish all the changes to the cluster from the master (can be called just by the master). The publish * process should not publish this state to the master as well! (the master is sending it...). diff --git a/src/main/java/org/elasticsearch/discovery/local/LocalDiscovery.java b/src/main/java/org/elasticsearch/discovery/local/LocalDiscovery.java index 6d6cd1ad29a..5511ad213cf 100644 --- a/src/main/java/org/elasticsearch/discovery/local/LocalDiscovery.java +++ b/src/main/java/org/elasticsearch/discovery/local/LocalDiscovery.java @@ -26,6 +26,8 @@ import org.elasticsearch.cluster.block.ClusterBlocks; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodeService; import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.routing.allocation.AllocationService; +import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.internal.Nullable; @@ -58,6 +60,8 @@ public class LocalDiscovery extends AbstractLifecycleComponent implem private final DiscoveryNodeService discoveryNodeService; + private AllocationService allocationService; + private final ClusterName clusterName; private DiscoveryNode localNode; @@ -88,6 +92,11 @@ public class LocalDiscovery extends AbstractLifecycleComponent implem // nothing to do here } + @Override + public void setAllocationService(AllocationService allocationService) { + this.allocationService = allocationService; + } + @Override protected void doStart() throws ElasticSearchException { synchronized (clusterGroups) { @@ -209,7 +218,10 @@ public class LocalDiscovery extends AbstractLifecycleComponent implem if (delta.added()) { logger.warn("No new nodes should be created when a new discovery view is accepted"); } - return newClusterStateBuilder().state(currentState).nodes(newNodes).build(); + // reroute here, so we eagerly remove dead nodes from the routing + ClusterState updatedState = newClusterStateBuilder().state(currentState).nodes(newNodes).build(); + RoutingAllocation.Result routingResult = allocationService.reroute(newClusterStateBuilder().state(updatedState).build()); + return newClusterStateBuilder().state(updatedState).routingResult(routingResult).build(); } }); } diff --git a/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java b/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java index f03946da3bf..32ef9ba70b5 100644 --- a/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java +++ b/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java @@ -31,6 +31,8 @@ import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodeService; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.routing.RoutingTable; +import org.elasticsearch.cluster.routing.allocation.AllocationService; +import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; import org.elasticsearch.common.UUID; import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.component.Lifecycle; @@ -78,6 +80,8 @@ public class ZenDiscovery extends AbstractLifecycleComponent implemen private final ClusterService clusterService; + private AllocationService allocationService; + private final ClusterName clusterName; private final DiscoveryNodeService discoveryNodeService; @@ -161,6 +165,11 @@ public class ZenDiscovery extends AbstractLifecycleComponent implemen this.nodeService = nodeService; } + @Override + public void setAllocationService(AllocationService allocationService) { + this.allocationService = allocationService; + } + @Override protected void doStart() throws ElasticSearchException { Map nodeAttributes = discoveryNodeService.buildAttributes(); @@ -370,7 +379,9 @@ public class ZenDiscovery extends AbstractLifecycleComponent implemen if (!electMaster.hasEnoughMasterNodes(currentState.nodes())) { return rejoin(currentState, "not enough master nodes"); } - return currentState; + // eagerly run reroute to remove dead nodes from routing table + RoutingAllocation.Result routingResult = allocationService.reroute(newClusterStateBuilder().state(currentState).build()); + return newClusterStateBuilder().state(currentState).routingResult(routingResult).build(); } }); } else { @@ -399,7 +410,9 @@ public class ZenDiscovery extends AbstractLifecycleComponent implemen if (!electMaster.hasEnoughMasterNodes(currentState.nodes())) { return rejoin(currentState, "not enough master nodes"); } - return currentState; + // eagerly run reroute to remove dead nodes from routing table + RoutingAllocation.Result routingResult = allocationService.reroute(newClusterStateBuilder().state(currentState).build()); + return newClusterStateBuilder().state(currentState).routingResult(routingResult).build(); } @Override diff --git a/src/main/java/org/elasticsearch/node/internal/InternalNode.java b/src/main/java/org/elasticsearch/node/internal/InternalNode.java index 70be6056b10..2339e0602d3 100644 --- a/src/main/java/org/elasticsearch/node/internal/InternalNode.java +++ b/src/main/java/org/elasticsearch/node/internal/InternalNode.java @@ -32,6 +32,7 @@ import org.elasticsearch.cluster.ClusterModule; import org.elasticsearch.cluster.ClusterNameModule; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.routing.RoutingService; +import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.common.CacheRecycler; import org.elasticsearch.common.StopWatch; import org.elasticsearch.common.collect.Tuple; @@ -50,6 +51,7 @@ import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.SettingsModule; import org.elasticsearch.common.util.concurrent.ThreadLocals; +import org.elasticsearch.discovery.Discovery; import org.elasticsearch.discovery.DiscoveryModule; import org.elasticsearch.discovery.DiscoveryService; import org.elasticsearch.env.Environment; @@ -187,6 +189,9 @@ public final class InternalNode implements Node { ESLogger logger = Loggers.getLogger(Node.class, settings.get("name")); logger.info("{{}}[{}]: starting ...", Version.CURRENT, JvmInfo.jvmInfo().pid()); + // hack around dependency injection problem (for now...) + injector.getInstance(Discovery.class).setAllocationService(injector.getInstance(AllocationService.class)); + for (Class plugin : pluginsService.services()) { injector.getInstance(plugin).start(); }