diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/RoutingService.java b/core/src/main/java/org/elasticsearch/cluster/routing/RoutingService.java index d6ec9048688..78e7e15d389 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/RoutingService.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/RoutingService.java @@ -72,6 +72,10 @@ public class RoutingService extends AbstractLifecycleComponent { protected void doClose() { } + public AllocationService getAllocationService() { + return this.allocationService; + } + /** * Initiates a reroute. */ diff --git a/core/src/main/java/org/elasticsearch/discovery/Discovery.java b/core/src/main/java/org/elasticsearch/discovery/Discovery.java index dabd45e36dd..778e2d15053 100644 --- a/core/src/main/java/org/elasticsearch/discovery/Discovery.java +++ b/core/src/main/java/org/elasticsearch/discovery/Discovery.java @@ -22,7 +22,7 @@ package org.elasticsearch.discovery; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.cluster.routing.allocation.AllocationService; +import org.elasticsearch.cluster.routing.RoutingService; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.component.LifecycleComponent; import org.elasticsearch.common.io.stream.StreamInput; @@ -44,7 +44,7 @@ public interface Discovery extends LifecycleComponent { * Another hack to solve dep injection problem..., note, this will be called before * any start is called. */ - void setAllocationService(AllocationService allocationService); + void setRoutingService(RoutingService routingService); /** * Publish all the changes to the cluster from the master (can be called just by the master). The publish diff --git a/core/src/main/java/org/elasticsearch/discovery/local/LocalDiscovery.java b/core/src/main/java/org/elasticsearch/discovery/local/LocalDiscovery.java index ec9158c5932..696eb8812c9 100644 --- a/core/src/main/java/org/elasticsearch/discovery/local/LocalDiscovery.java +++ b/core/src/main/java/org/elasticsearch/discovery/local/LocalDiscovery.java @@ -28,7 +28,7 @@ import org.elasticsearch.cluster.IncompatibleClusterStateVersionException; import org.elasticsearch.cluster.block.ClusterBlocks; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; -import org.elasticsearch.cluster.routing.allocation.AllocationService; +import org.elasticsearch.cluster.routing.RoutingService; import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.component.AbstractLifecycleComponent; @@ -61,7 +61,7 @@ public class LocalDiscovery extends AbstractLifecycleComponent implem private static final LocalDiscovery[] NO_MEMBERS = new LocalDiscovery[0]; private final ClusterService clusterService; - private AllocationService allocationService; + private RoutingService routingService; private final ClusterName clusterName; private final DiscoverySettings discoverySettings; @@ -83,8 +83,8 @@ public class LocalDiscovery extends AbstractLifecycleComponent implem } @Override - public void setAllocationService(AllocationService allocationService) { - this.allocationService = allocationService; + public void setRoutingService(RoutingService routingService) { + this.routingService = routingService; } @Override @@ -156,12 +156,7 @@ public class LocalDiscovery extends AbstractLifecycleComponent implem nodesBuilder.put(discovery.localNode()); } nodesBuilder.localNodeId(master.localNode().getId()).masterNodeId(master.localNode().getId()); - currentState = ClusterState.builder(currentState).nodes(nodesBuilder).build(); - RoutingAllocation.Result result = master.allocationService.reroute(currentState, "node_add"); - if (result.changed()) { - currentState = ClusterState.builder(currentState).routingResult(result).build(); - } - return currentState; + return ClusterState.builder(currentState).nodes(nodesBuilder).build(); } @Override @@ -169,6 +164,13 @@ public class LocalDiscovery extends AbstractLifecycleComponent implem logger.error("unexpected failure during [{}]", t, source); } + @Override + public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + // we reroute not in the same cluster state update since in certain areas we rely on + // the node to be in the cluster state (sampled from ClusterService#state) to be there, also + // shard transitions need to better be handled in such cases + master.routingService.reroute("post_node_add"); + } }); } } // else, no master node, the next node that will start will fill things in... @@ -224,7 +226,7 @@ public class LocalDiscovery extends AbstractLifecycleComponent implem } // reroute here, so we eagerly remove dead nodes from the routing ClusterState updatedState = ClusterState.builder(currentState).nodes(newNodes).build(); - RoutingAllocation.Result routingResult = master.allocationService.reroute( + RoutingAllocation.Result routingResult = master.routingService.getAllocationService().reroute( ClusterState.builder(updatedState).build(), "elected as master"); return ClusterState.builder(updatedState).routingResult(routingResult).build(); } diff --git a/core/src/main/java/org/elasticsearch/discovery/zen/NodeJoinController.java b/core/src/main/java/org/elasticsearch/discovery/zen/NodeJoinController.java index 49815cdb703..9659434e03d 100644 --- a/core/src/main/java/org/elasticsearch/discovery/zen/NodeJoinController.java +++ b/core/src/main/java/org/elasticsearch/discovery/zen/NodeJoinController.java @@ -29,7 +29,7 @@ import org.elasticsearch.cluster.NotMasterException; import org.elasticsearch.cluster.block.ClusterBlocks; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; -import org.elasticsearch.cluster.routing.allocation.AllocationService; +import org.elasticsearch.cluster.routing.RoutingService; import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Priority; @@ -58,7 +58,7 @@ import java.util.concurrent.atomic.AtomicBoolean; public class NodeJoinController extends AbstractComponent { private final ClusterService clusterService; - private final AllocationService allocationService; + private final RoutingService routingService; private final ElectMasterService electMaster; private final DiscoverySettings discoverySettings; private final JoinTaskExecutor joinTaskExecutor = new JoinTaskExecutor(); @@ -68,11 +68,10 @@ public class NodeJoinController extends AbstractComponent { private ElectionContext electionContext = null; - public NodeJoinController(ClusterService clusterService, AllocationService allocationService, ElectMasterService electMaster, - DiscoverySettings discoverySettings, Settings settings) { + public NodeJoinController(ClusterService clusterService, RoutingService routingService, ElectMasterService electMaster, DiscoverySettings discoverySettings, Settings settings) { super(settings); this.clusterService = clusterService; - this.allocationService = allocationService; + this.routingService = routingService; this.electMaster = electMaster; this.discoverySettings = discoverySettings; } @@ -407,7 +406,21 @@ public class NodeJoinController extends AbstractComponent { ClusterBlocks clusterBlocks = ClusterBlocks.builder().blocks(currentState.blocks()) .removeGlobalBlock(discoverySettings.getNoMasterBlock()).build(); newState.blocks(clusterBlocks); + newState.nodes(nodesBuilder); nodesChanged = true; + + // reroute now to remove any dead nodes (master may have stepped down when they left and didn't update the routing table) + // Note: also do it now to avoid assigning shards to these nodes. We will have another reroute after the cluster + // state is published. + // TODO: this publishing of a cluster state with no nodes assigned to joining nodes shouldn't be needed anymore. remove. + + final ClusterState tmpState = newState.build(); + RoutingAllocation.Result result = routingService.getAllocationService().reroute(tmpState, "nodes joined"); + newState = ClusterState.builder(tmpState); + if (result.changed()) { + newState.routingResult(result); + } + nodesBuilder = DiscoveryNodes.builder(tmpState.nodes()); } if (nodesBuilder.isLocalNodeElectedMaster() == false) { @@ -426,8 +439,7 @@ public class NodeJoinController extends AbstractComponent { for (DiscoveryNode existingNode : currentNodes) { if (node.getAddress().equals(existingNode.getAddress())) { nodesBuilder.remove(existingNode.getId()); - logger.warn("received join request from node [{}], but found existing node {} with same address, " + - "removing existing node", node, existingNode); + logger.warn("received join request from node [{}], but found existing node {} with same address, removing existing node", node, existingNode); } } } @@ -436,12 +448,6 @@ public class NodeJoinController extends AbstractComponent { if (nodesChanged) { newState.nodes(nodesBuilder); - final ClusterState tmpState = newState.build(); - RoutingAllocation.Result result = allocationService.reroute(tmpState, "node_join"); - newState = ClusterState.builder(tmpState); - if (result.changed()) { - newState.routingResult(result); - } } // we must return a new cluster state instance to force publishing. This is important @@ -457,6 +463,13 @@ public class NodeJoinController extends AbstractComponent { @Override public void clusterStatePublished(ClusterChangedEvent event) { + if (event.nodesDelta().hasChanges()) { + // we reroute not in the same cluster state update since in certain areas we rely on + // the node to be in the cluster state (sampled from ClusterService#state) to be there, also + // shard transitions need to better be handled in such cases + routingService.reroute("post_node_add"); + } + NodeJoinController.this.electMaster.logMinimumMasterNodesWarningIfNecessary(event.previousState(), event.state()); } } diff --git a/core/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java b/core/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java index 6a9a64ff411..54ef9928585 100644 --- a/core/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java +++ b/core/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java @@ -32,7 +32,7 @@ import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; -import org.elasticsearch.cluster.routing.allocation.AllocationService; +import org.elasticsearch.cluster.routing.RoutingService; import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Priority; @@ -113,7 +113,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implemen private final TransportService transportService; private final ClusterService clusterService; - private AllocationService allocationService; + private RoutingService routingService; private final ClusterName clusterName; private final DiscoverySettings discoverySettings; private final ZenPingService pingService; @@ -146,7 +146,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implemen /** counts the time this node has joined the cluster or have elected it self as master */ private final AtomicLong clusterJoinsCounter = new AtomicLong(); - // must initialized in doStart(), when we have the allocationService set + // must initialized in doStart(), when we have the routingService set private volatile NodeJoinController nodeJoinController; @Inject @@ -206,8 +206,8 @@ public class ZenDiscovery extends AbstractLifecycleComponent implemen } @Override - public void setAllocationService(AllocationService allocationService) { - this.allocationService = allocationService; + public void setRoutingService(RoutingService routingService) { + this.routingService = routingService; } @Override @@ -215,7 +215,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implemen nodesFD.setLocalNode(clusterService.localNode()); joinThreadControl.start(); pingService.start(); - this.nodeJoinController = new NodeJoinController(clusterService, allocationService, electMaster, discoverySettings, settings); + this.nodeJoinController = new NodeJoinController(clusterService, routingService, electMaster, discoverySettings, settings); } @Override @@ -516,7 +516,8 @@ public class ZenDiscovery extends AbstractLifecycleComponent implemen return rejoin(currentState, "not enough master nodes"); } // eagerly run reroute to remove dead nodes from routing table - RoutingAllocation.Result routingResult = allocationService.reroute(ClusterState.builder(currentState).build(), + RoutingAllocation.Result routingResult = routingService.getAllocationService().reroute( + ClusterState.builder(currentState).build(), "[" + node + "] left"); return ClusterState.builder(currentState).routingResult(routingResult).build(); } @@ -560,7 +561,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implemen return rejoin(currentState, "not enough master nodes"); } // eagerly run reroute to remove dead nodes from routing table - RoutingAllocation.Result routingResult = allocationService.reroute( + RoutingAllocation.Result routingResult = routingService.getAllocationService().reroute( ClusterState.builder(currentState).build(), "[" + node + "] failed"); return ClusterState.builder(currentState).routingResult(routingResult).build(); diff --git a/core/src/main/java/org/elasticsearch/node/Node.java b/core/src/main/java/org/elasticsearch/node/Node.java index 3c036ff3f29..fb38c530d45 100644 --- a/core/src/main/java/org/elasticsearch/node/Node.java +++ b/core/src/main/java/org/elasticsearch/node/Node.java @@ -37,7 +37,6 @@ import org.elasticsearch.cluster.action.index.MappingUpdatedAction; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodeService; import org.elasticsearch.cluster.routing.RoutingService; -import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.StopWatch; import org.elasticsearch.common.component.Lifecycle; @@ -328,7 +327,7 @@ public class Node implements Closeable { ESLogger logger = Loggers.getLogger(Node.class, NODE_NAME_SETTING.get(settings)); logger.info("starting ..."); // hack around dependency injection problem (for now...) - injector.getInstance(Discovery.class).setAllocationService(injector.getInstance(AllocationService.class)); + injector.getInstance(Discovery.class).setRoutingService(injector.getInstance(RoutingService.class)); for (Class plugin : pluginsService.nodeServices()) { injector.getInstance(plugin).start(); } diff --git a/core/src/test/java/org/elasticsearch/discovery/zen/NodeJoinControllerTests.java b/core/src/test/java/org/elasticsearch/discovery/zen/NodeJoinControllerTests.java index e994c376c4d..135352343b6 100644 --- a/core/src/test/java/org/elasticsearch/discovery/zen/NodeJoinControllerTests.java +++ b/core/src/test/java/org/elasticsearch/discovery/zen/NodeJoinControllerTests.java @@ -24,6 +24,7 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.NotMasterException; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.routing.RoutingService; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.cluster.routing.allocation.FailedRerouteAllocation; @@ -98,7 +99,7 @@ public class NodeJoinControllerTests extends ESTestCase { // make sure we have a master setState(clusterService, ClusterState.builder(clusterService.state()).nodes( DiscoveryNodes.builder(initialNodes).masterNodeId(localNode.getId()))); - nodeJoinController = new NodeJoinController(clusterService, new NoopAllocationService(Settings.EMPTY), + nodeJoinController = new NodeJoinController(clusterService, new NoopRoutingService(Settings.EMPTY), new ElectMasterService(Settings.EMPTY), new DiscoverySettings(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)), Settings.EMPTY); @@ -297,8 +298,7 @@ public class NodeJoinControllerTests extends ESTestCase { } logger.debug("--> asserting master election didn't finish yet"); - assertThat("election finished after [" + initialJoins + "] master nodes but required joins is [" + requiredJoins + "]", - electionFuture.isDone(), equalTo(false)); + assertThat("election finished after [" + initialJoins + "] master nodes but required joins is [" + requiredJoins + "]", electionFuture.isDone(), equalTo(false)); final int finalJoins = requiredJoins - initialJoins + randomInt(5); nodesToJoin.clear(); @@ -374,8 +374,7 @@ public class NodeJoinControllerTests extends ESTestCase { nodeJoinController.waitToBeElectedAsMaster(requiredJoins, TimeValue.timeValueMillis(1), new NodeJoinController.ElectionCallback() { @Override public void onElectedAsMaster(ClusterState state) { - assertThat("callback called with elected as master, but state disagrees", state.nodes().isLocalNodeElectedMaster(), - equalTo(true)); + assertThat("callback called with elected as master, but state disagrees", state.nodes().isLocalNodeElectedMaster(), equalTo(true)); latch.countDown(); } @@ -493,8 +492,7 @@ public class NodeJoinControllerTests extends ESTestCase { nodeJoinController.waitToBeElectedAsMaster(requiredJoins, TimeValue.timeValueHours(30), new NodeJoinController.ElectionCallback() { @Override public void onElectedAsMaster(ClusterState state) { - assertThat("callback called with elected as master, but state disagrees", state.nodes().isLocalNodeElectedMaster(), - equalTo(true)); + assertThat("callback called with elected as master, but state disagrees", state.nodes().isLocalNodeElectedMaster(), equalTo(true)); latch.countDown(); } @@ -518,6 +516,18 @@ public class NodeJoinControllerTests extends ESTestCase { } + static class NoopRoutingService extends RoutingService { + + public NoopRoutingService(Settings settings) { + super(settings, null, new NoopAllocationService(settings)); + } + + @Override + protected void performReroute(String reason) { + + } + } + static class NoopAllocationService extends AllocationService { public NoopAllocationService(Settings settings) { @@ -525,14 +535,12 @@ public class NodeJoinControllerTests extends ESTestCase { } @Override - public RoutingAllocation.Result applyStartedShards(ClusterState clusterState, List startedShards, - boolean withReroute) { + public RoutingAllocation.Result applyStartedShards(ClusterState clusterState, List startedShards, boolean withReroute) { return new RoutingAllocation.Result(false, clusterState.routingTable(), clusterState.metaData()); } @Override - public RoutingAllocation.Result applyFailedShards(ClusterState clusterState, - List failedShards) { + public RoutingAllocation.Result applyFailedShards(ClusterState clusterState, List failedShards) { return new RoutingAllocation.Result(false, clusterState.routingTable(), clusterState.metaData()); } diff --git a/core/src/test/java/org/elasticsearch/test/NoopDiscovery.java b/core/src/test/java/org/elasticsearch/test/NoopDiscovery.java index 967837e0044..3193aaf458e 100644 --- a/core/src/test/java/org/elasticsearch/test/NoopDiscovery.java +++ b/core/src/test/java/org/elasticsearch/test/NoopDiscovery.java @@ -20,7 +20,7 @@ package org.elasticsearch.test; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.cluster.routing.allocation.AllocationService; +import org.elasticsearch.cluster.routing.RoutingService; import org.elasticsearch.common.component.Lifecycle; import org.elasticsearch.common.component.LifecycleListener; import org.elasticsearch.discovery.Discovery; @@ -41,7 +41,7 @@ public class NoopDiscovery implements Discovery { } @Override - public void setAllocationService(AllocationService allocationService) { + public void setRoutingService(RoutingService routingService) { }