diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/RoutingService.java b/core/src/main/java/org/elasticsearch/cluster/routing/RoutingService.java index 78e7e15d389..d6ec9048688 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/RoutingService.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/RoutingService.java @@ -72,10 +72,6 @@ public class RoutingService extends AbstractLifecycleComponent { protected void doClose() { } - public AllocationService getAllocationService() { - return this.allocationService; - } - /** * Initiates a reroute. */ diff --git a/core/src/main/java/org/elasticsearch/discovery/Discovery.java b/core/src/main/java/org/elasticsearch/discovery/Discovery.java index 778e2d15053..dabd45e36dd 100644 --- a/core/src/main/java/org/elasticsearch/discovery/Discovery.java +++ b/core/src/main/java/org/elasticsearch/discovery/Discovery.java @@ -22,7 +22,7 @@ package org.elasticsearch.discovery; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.cluster.routing.RoutingService; +import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.component.LifecycleComponent; import org.elasticsearch.common.io.stream.StreamInput; @@ -44,7 +44,7 @@ public interface Discovery extends LifecycleComponent { * Another hack to solve dep injection problem..., note, this will be called before * any start is called. */ - void setRoutingService(RoutingService routingService); + void setAllocationService(AllocationService allocationService); /** * Publish all the changes to the cluster from the master (can be called just by the master). 
The publish diff --git a/core/src/main/java/org/elasticsearch/discovery/local/LocalDiscovery.java b/core/src/main/java/org/elasticsearch/discovery/local/LocalDiscovery.java index 696eb8812c9..ec9158c5932 100644 --- a/core/src/main/java/org/elasticsearch/discovery/local/LocalDiscovery.java +++ b/core/src/main/java/org/elasticsearch/discovery/local/LocalDiscovery.java @@ -28,7 +28,7 @@ import org.elasticsearch.cluster.IncompatibleClusterStateVersionException; import org.elasticsearch.cluster.block.ClusterBlocks; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; -import org.elasticsearch.cluster.routing.RoutingService; +import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.component.AbstractLifecycleComponent; @@ -61,7 +61,7 @@ public class LocalDiscovery extends AbstractLifecycleComponent implem private static final LocalDiscovery[] NO_MEMBERS = new LocalDiscovery[0]; private final ClusterService clusterService; - private RoutingService routingService; + private AllocationService allocationService; private final ClusterName clusterName; private final DiscoverySettings discoverySettings; @@ -83,8 +83,8 @@ public class LocalDiscovery extends AbstractLifecycleComponent implem } @Override - public void setRoutingService(RoutingService routingService) { - this.routingService = routingService; + public void setAllocationService(AllocationService allocationService) { + this.allocationService = allocationService; } @Override @@ -156,7 +156,12 @@ public class LocalDiscovery extends AbstractLifecycleComponent implem nodesBuilder.put(discovery.localNode()); } nodesBuilder.localNodeId(master.localNode().getId()).masterNodeId(master.localNode().getId()); - return ClusterState.builder(currentState).nodes(nodesBuilder).build(); + currentState = 
ClusterState.builder(currentState).nodes(nodesBuilder).build(); + RoutingAllocation.Result result = master.allocationService.reroute(currentState, "node_add"); + if (result.changed()) { + currentState = ClusterState.builder(currentState).routingResult(result).build(); + } + return currentState; } @Override @@ -164,13 +169,6 @@ public class LocalDiscovery extends AbstractLifecycleComponent implem logger.error("unexpected failure during [{}]", t, source); } - @Override - public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { - // we reroute not in the same cluster state update since in certain areas we rely on - // the node to be in the cluster state (sampled from ClusterService#state) to be there, also - // shard transitions need to better be handled in such cases - master.routingService.reroute("post_node_add"); - } }); } } // else, no master node, the next node that will start will fill things in... @@ -226,7 +224,7 @@ public class LocalDiscovery extends AbstractLifecycleComponent implem } // reroute here, so we eagerly remove dead nodes from the routing ClusterState updatedState = ClusterState.builder(currentState).nodes(newNodes).build(); - RoutingAllocation.Result routingResult = master.routingService.getAllocationService().reroute( + RoutingAllocation.Result routingResult = master.allocationService.reroute( ClusterState.builder(updatedState).build(), "elected as master"); return ClusterState.builder(updatedState).routingResult(routingResult).build(); } diff --git a/core/src/main/java/org/elasticsearch/discovery/zen/NodeJoinController.java b/core/src/main/java/org/elasticsearch/discovery/zen/NodeJoinController.java index 9659434e03d..49815cdb703 100644 --- a/core/src/main/java/org/elasticsearch/discovery/zen/NodeJoinController.java +++ b/core/src/main/java/org/elasticsearch/discovery/zen/NodeJoinController.java @@ -29,7 +29,7 @@ import org.elasticsearch.cluster.NotMasterException; import 
org.elasticsearch.cluster.block.ClusterBlocks; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; -import org.elasticsearch.cluster.routing.RoutingService; +import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Priority; @@ -58,7 +58,7 @@ import java.util.concurrent.atomic.AtomicBoolean; public class NodeJoinController extends AbstractComponent { private final ClusterService clusterService; - private final RoutingService routingService; + private final AllocationService allocationService; private final ElectMasterService electMaster; private final DiscoverySettings discoverySettings; private final JoinTaskExecutor joinTaskExecutor = new JoinTaskExecutor(); @@ -68,10 +68,11 @@ public class NodeJoinController extends AbstractComponent { private ElectionContext electionContext = null; - public NodeJoinController(ClusterService clusterService, RoutingService routingService, ElectMasterService electMaster, DiscoverySettings discoverySettings, Settings settings) { + public NodeJoinController(ClusterService clusterService, AllocationService allocationService, ElectMasterService electMaster, + DiscoverySettings discoverySettings, Settings settings) { super(settings); this.clusterService = clusterService; - this.routingService = routingService; + this.allocationService = allocationService; this.electMaster = electMaster; this.discoverySettings = discoverySettings; } @@ -406,21 +407,7 @@ public class NodeJoinController extends AbstractComponent { ClusterBlocks clusterBlocks = ClusterBlocks.builder().blocks(currentState.blocks()) .removeGlobalBlock(discoverySettings.getNoMasterBlock()).build(); newState.blocks(clusterBlocks); - newState.nodes(nodesBuilder); nodesChanged = true; - - // reroute now to remove any dead nodes (master may have stepped 
down when they left and didn't update the routing table) - // Note: also do it now to avoid assigning shards to these nodes. We will have another reroute after the cluster - // state is published. - // TODO: this publishing of a cluster state with no nodes assigned to joining nodes shouldn't be needed anymore. remove. - - final ClusterState tmpState = newState.build(); - RoutingAllocation.Result result = routingService.getAllocationService().reroute(tmpState, "nodes joined"); - newState = ClusterState.builder(tmpState); - if (result.changed()) { - newState.routingResult(result); - } - nodesBuilder = DiscoveryNodes.builder(tmpState.nodes()); } if (nodesBuilder.isLocalNodeElectedMaster() == false) { @@ -439,7 +426,8 @@ public class NodeJoinController extends AbstractComponent { for (DiscoveryNode existingNode : currentNodes) { if (node.getAddress().equals(existingNode.getAddress())) { nodesBuilder.remove(existingNode.getId()); - logger.warn("received join request from node [{}], but found existing node {} with same address, removing existing node", node, existingNode); + logger.warn("received join request from node [{}], but found existing node {} with same address, " + + "removing existing node", node, existingNode); } } } @@ -448,6 +436,12 @@ public class NodeJoinController extends AbstractComponent { if (nodesChanged) { newState.nodes(nodesBuilder); + final ClusterState tmpState = newState.build(); + RoutingAllocation.Result result = allocationService.reroute(tmpState, "node_join"); + newState = ClusterState.builder(tmpState); + if (result.changed()) { + newState.routingResult(result); + } } // we must return a new cluster state instance to force publishing. 
This is important @@ -463,13 +457,6 @@ public class NodeJoinController extends AbstractComponent { @Override public void clusterStatePublished(ClusterChangedEvent event) { - if (event.nodesDelta().hasChanges()) { - // we reroute not in the same cluster state update since in certain areas we rely on - // the node to be in the cluster state (sampled from ClusterService#state) to be there, also - // shard transitions need to better be handled in such cases - routingService.reroute("post_node_add"); - } - NodeJoinController.this.electMaster.logMinimumMasterNodesWarningIfNecessary(event.previousState(), event.state()); } } diff --git a/core/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java b/core/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java index 54ef9928585..6a9a64ff411 100644 --- a/core/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java +++ b/core/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java @@ -32,7 +32,7 @@ import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; -import org.elasticsearch.cluster.routing.RoutingService; +import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Priority; @@ -113,7 +113,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implemen private final TransportService transportService; private final ClusterService clusterService; - private RoutingService routingService; + private AllocationService allocationService; private final ClusterName clusterName; private final DiscoverySettings discoverySettings; private final ZenPingService pingService; @@ -146,7 +146,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implemen /** 
counts the time this node has joined the cluster or have elected it self as master */ private final AtomicLong clusterJoinsCounter = new AtomicLong(); - // must initialized in doStart(), when we have the routingService set + // must be initialized in doStart(), when we have the allocationService set private volatile NodeJoinController nodeJoinController; @Inject @@ -206,8 +206,8 @@ public class ZenDiscovery extends AbstractLifecycleComponent implemen } @Override - public void setRoutingService(RoutingService routingService) { - this.routingService = routingService; + public void setAllocationService(AllocationService allocationService) { + this.allocationService = allocationService; } @Override @@ -215,7 +215,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implemen nodesFD.setLocalNode(clusterService.localNode()); joinThreadControl.start(); pingService.start(); - this.nodeJoinController = new NodeJoinController(clusterService, routingService, electMaster, discoverySettings, settings); + this.nodeJoinController = new NodeJoinController(clusterService, allocationService, electMaster, discoverySettings, settings); } @Override @@ -516,8 +516,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implemen return rejoin(currentState, "not enough master nodes"); } // eagerly run reroute to remove dead nodes from routing table - RoutingAllocation.Result routingResult = routingService.getAllocationService().reroute( - ClusterState.builder(currentState).build(), + RoutingAllocation.Result routingResult = allocationService.reroute(ClusterState.builder(currentState).build(), "[" + node + "] left"); return ClusterState.builder(currentState).routingResult(routingResult).build(); } @@ -561,7 +560,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implemen return rejoin(currentState, "not enough master nodes"); } // eagerly run reroute to remove dead nodes from routing table - RoutingAllocation.Result routingResult =
routingService.getAllocationService().reroute( + RoutingAllocation.Result routingResult = allocationService.reroute( ClusterState.builder(currentState).build(), "[" + node + "] failed"); return ClusterState.builder(currentState).routingResult(routingResult).build(); diff --git a/core/src/main/java/org/elasticsearch/node/Node.java b/core/src/main/java/org/elasticsearch/node/Node.java index f34e16db140..043526306e9 100644 --- a/core/src/main/java/org/elasticsearch/node/Node.java +++ b/core/src/main/java/org/elasticsearch/node/Node.java @@ -37,6 +37,7 @@ import org.elasticsearch.cluster.action.index.MappingUpdatedAction; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodeService; import org.elasticsearch.cluster.routing.RoutingService; +import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.StopWatch; import org.elasticsearch.common.component.Lifecycle; @@ -321,7 +322,7 @@ public class Node implements Closeable { ESLogger logger = Loggers.getLogger(Node.class, NODE_NAME_SETTING.get(settings)); logger.info("starting ..."); // hack around dependency injection problem (for now...) 
- injector.getInstance(Discovery.class).setRoutingService(injector.getInstance(RoutingService.class)); + injector.getInstance(Discovery.class).setAllocationService(injector.getInstance(AllocationService.class)); for (Class plugin : pluginsService.nodeServices()) { injector.getInstance(plugin).start(); } diff --git a/core/src/test/java/org/elasticsearch/discovery/zen/NodeJoinControllerTests.java b/core/src/test/java/org/elasticsearch/discovery/zen/NodeJoinControllerTests.java index 27c38e66074..683bebc55a3 100644 --- a/core/src/test/java/org/elasticsearch/discovery/zen/NodeJoinControllerTests.java +++ b/core/src/test/java/org/elasticsearch/discovery/zen/NodeJoinControllerTests.java @@ -24,7 +24,6 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.NotMasterException; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; -import org.elasticsearch.cluster.routing.RoutingService; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.cluster.routing.allocation.FailedRerouteAllocation; @@ -99,7 +98,7 @@ public class NodeJoinControllerTests extends ESTestCase { // make sure we have a master setState(clusterService, ClusterState.builder(clusterService.state()).nodes( DiscoveryNodes.builder(initialNodes).masterNodeId(localNode.getId()))); - nodeJoinController = new NodeJoinController(clusterService, new NoopRoutingService(Settings.EMPTY), + nodeJoinController = new NodeJoinController(clusterService, new NoopAllocationService(Settings.EMPTY), new ElectMasterService(Settings.EMPTY, Version.CURRENT), new DiscoverySettings(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)), Settings.EMPTY); @@ -298,7 +297,8 @@ public class NodeJoinControllerTests extends ESTestCase { } logger.debug("--> asserting master election didn't finish yet"); - assertThat("election 
finished after [" + initialJoins + "] master nodes but required joins is [" + requiredJoins + "]", electionFuture.isDone(), equalTo(false)); + assertThat("election finished after [" + initialJoins + "] master nodes but required joins is [" + requiredJoins + "]", + electionFuture.isDone(), equalTo(false)); final int finalJoins = requiredJoins - initialJoins + randomInt(5); nodesToJoin.clear(); @@ -374,7 +374,8 @@ public class NodeJoinControllerTests extends ESTestCase { nodeJoinController.waitToBeElectedAsMaster(requiredJoins, TimeValue.timeValueMillis(1), new NodeJoinController.ElectionCallback() { @Override public void onElectedAsMaster(ClusterState state) { - assertThat("callback called with elected as master, but state disagrees", state.nodes().isLocalNodeElectedMaster(), equalTo(true)); + assertThat("callback called with elected as master, but state disagrees", state.nodes().isLocalNodeElectedMaster(), + equalTo(true)); latch.countDown(); } @@ -492,7 +493,8 @@ public class NodeJoinControllerTests extends ESTestCase { nodeJoinController.waitToBeElectedAsMaster(requiredJoins, TimeValue.timeValueHours(30), new NodeJoinController.ElectionCallback() { @Override public void onElectedAsMaster(ClusterState state) { - assertThat("callback called with elected as master, but state disagrees", state.nodes().isLocalNodeElectedMaster(), equalTo(true)); + assertThat("callback called with elected as master, but state disagrees", state.nodes().isLocalNodeElectedMaster(), + equalTo(true)); latch.countDown(); } @@ -516,18 +518,6 @@ public class NodeJoinControllerTests extends ESTestCase { } - static class NoopRoutingService extends RoutingService { - - public NoopRoutingService(Settings settings) { - super(settings, null, new NoopAllocationService(settings)); - } - - @Override - protected void performReroute(String reason) { - - } - } - static class NoopAllocationService extends AllocationService { public NoopAllocationService(Settings settings) { @@ -535,12 +525,14 @@ public 
class NodeJoinControllerTests extends ESTestCase { } @Override - public RoutingAllocation.Result applyStartedShards(ClusterState clusterState, List startedShards, boolean withReroute) { + public RoutingAllocation.Result applyStartedShards(ClusterState clusterState, List startedShards, + boolean withReroute) { return new RoutingAllocation.Result(false, clusterState.routingTable(), clusterState.metaData()); } @Override - public RoutingAllocation.Result applyFailedShards(ClusterState clusterState, List failedShards) { + public RoutingAllocation.Result applyFailedShards(ClusterState clusterState, + List failedShards) { return new RoutingAllocation.Result(false, clusterState.routingTable(), clusterState.metaData()); } diff --git a/core/src/test/java/org/elasticsearch/test/NoopDiscovery.java b/core/src/test/java/org/elasticsearch/test/NoopDiscovery.java index 3193aaf458e..967837e0044 100644 --- a/core/src/test/java/org/elasticsearch/test/NoopDiscovery.java +++ b/core/src/test/java/org/elasticsearch/test/NoopDiscovery.java @@ -20,7 +20,7 @@ package org.elasticsearch.test; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.cluster.routing.RoutingService; +import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.common.component.Lifecycle; import org.elasticsearch.common.component.LifecycleListener; import org.elasticsearch.discovery.Discovery; @@ -41,7 +41,7 @@ public class NoopDiscovery implements Discovery { } @Override - public void setRoutingService(RoutingService routingService) { + public void setAllocationService(AllocationService allocationService) { }