From 46b40f73b7697eeb84e542c2f1801a2f96501bdc Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Fri, 17 Jun 2016 17:32:38 +0200 Subject: [PATCH] Inline reroute with process of node join/master election (#18938) In the past, we had the semantics where the very first cluster state a node processed after joining could not contain shard assignment to it. This was to make sure the node cleans up local / stale shard copies before receiving new ones that might confuse it. Since then a lot of work in this area, most notably the introduction of allocation ids and #17270 . This means we don't have to be careful and just reroute in the same cluster state change where we process the join, keeping things simple and following the same pattern we have in other places. --- .../cluster/routing/RoutingService.java | 4 -- .../elasticsearch/discovery/Discovery.java | 4 +- .../discovery/local/LocalDiscovery.java | 24 ++++++------ .../discovery/zen/NodeJoinController.java | 39 +++++++------------ .../discovery/zen/ZenDiscovery.java | 17 ++++---- .../java/org/elasticsearch/node/Node.java | 3 +- .../zen/NodeJoinControllerTests.java | 30 ++++++-------- .../org/elasticsearch/test/NoopDiscovery.java | 4 +- 8 files changed, 49 insertions(+), 76 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/RoutingService.java b/core/src/main/java/org/elasticsearch/cluster/routing/RoutingService.java index 78e7e15d389..d6ec9048688 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/RoutingService.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/RoutingService.java @@ -72,10 +72,6 @@ public class RoutingService extends AbstractLifecycleComponent { protected void doClose() { } - public AllocationService getAllocationService() { - return this.allocationService; - } - /** * Initiates a reroute. */ diff --git a/core/src/main/java/org/elasticsearch/discovery/Discovery.java b/core/src/main/java/org/elasticsearch/discovery/Discovery.java index 778e2d15053..dabd45e36dd 100644 --- a/core/src/main/java/org/elasticsearch/discovery/Discovery.java +++ b/core/src/main/java/org/elasticsearch/discovery/Discovery.java @@ -22,7 +22,7 @@ package org.elasticsearch.discovery; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.cluster.routing.RoutingService; +import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.component.LifecycleComponent; import org.elasticsearch.common.io.stream.StreamInput; @@ -44,7 +44,7 @@ public interface Discovery extends LifecycleComponent { * Another hack to solve dep injection problem..., note, this will be called before * any start is called. */ - void setRoutingService(RoutingService routingService); + void setAllocationService(AllocationService allocationService); /** * Publish all the changes to the cluster from the master (can be called just by the master). The publish diff --git a/core/src/main/java/org/elasticsearch/discovery/local/LocalDiscovery.java b/core/src/main/java/org/elasticsearch/discovery/local/LocalDiscovery.java index 696eb8812c9..ec9158c5932 100644 --- a/core/src/main/java/org/elasticsearch/discovery/local/LocalDiscovery.java +++ b/core/src/main/java/org/elasticsearch/discovery/local/LocalDiscovery.java @@ -28,7 +28,7 @@ import org.elasticsearch.cluster.IncompatibleClusterStateVersionException; import org.elasticsearch.cluster.block.ClusterBlocks; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; -import org.elasticsearch.cluster.routing.RoutingService; +import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.component.AbstractLifecycleComponent; @@ -61,7 +61,7 @@ public class LocalDiscovery extends AbstractLifecycleComponent implem private static final LocalDiscovery[] NO_MEMBERS = new LocalDiscovery[0]; private final ClusterService clusterService; - private RoutingService routingService; + private AllocationService allocationService; private final ClusterName clusterName; private final DiscoverySettings discoverySettings; @@ -83,8 +83,8 @@ public class LocalDiscovery extends AbstractLifecycleComponent implem } @Override - public void setRoutingService(RoutingService routingService) { - this.routingService = routingService; + public void setAllocationService(AllocationService allocationService) { + this.allocationService = allocationService; } @Override @@ -156,7 +156,12 @@ public class LocalDiscovery extends AbstractLifecycleComponent implem nodesBuilder.put(discovery.localNode()); } nodesBuilder.localNodeId(master.localNode().getId()).masterNodeId(master.localNode().getId()); - return ClusterState.builder(currentState).nodes(nodesBuilder).build(); + currentState = ClusterState.builder(currentState).nodes(nodesBuilder).build(); + RoutingAllocation.Result result = master.allocationService.reroute(currentState, "node_add"); + if (result.changed()) { + currentState = ClusterState.builder(currentState).routingResult(result).build(); + } + return currentState; } @Override @@ -164,13 +169,6 @@ public class LocalDiscovery extends AbstractLifecycleComponent implem logger.error("unexpected failure during [{}]", t, source); } - @Override - public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { - // we reroute not in the same cluster state update since in certain areas we rely on - // the node to be in the cluster state (sampled from ClusterService#state) to be there, also - // shard transitions need to better be handled in such cases - master.routingService.reroute("post_node_add"); - } }); } } // else, no master node, the next node that will start will fill things in... @@ -226,7 +224,7 @@ public class LocalDiscovery extends AbstractLifecycleComponent implem } // reroute here, so we eagerly remove dead nodes from the routing ClusterState updatedState = ClusterState.builder(currentState).nodes(newNodes).build(); - RoutingAllocation.Result routingResult = master.routingService.getAllocationService().reroute( + RoutingAllocation.Result routingResult = master.allocationService.reroute( ClusterState.builder(updatedState).build(), "elected as master"); return ClusterState.builder(updatedState).routingResult(routingResult).build(); } diff --git a/core/src/main/java/org/elasticsearch/discovery/zen/NodeJoinController.java b/core/src/main/java/org/elasticsearch/discovery/zen/NodeJoinController.java index 9659434e03d..49815cdb703 100644 --- a/core/src/main/java/org/elasticsearch/discovery/zen/NodeJoinController.java +++ b/core/src/main/java/org/elasticsearch/discovery/zen/NodeJoinController.java @@ -29,7 +29,7 @@ import org.elasticsearch.cluster.NotMasterException; import org.elasticsearch.cluster.block.ClusterBlocks; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; -import org.elasticsearch.cluster.routing.RoutingService; +import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Priority; @@ -58,7 +58,7 @@ import java.util.concurrent.atomic.AtomicBoolean; public class NodeJoinController extends AbstractComponent { private final ClusterService clusterService; - private final RoutingService routingService; + private final AllocationService allocationService; private final ElectMasterService electMaster; private final DiscoverySettings discoverySettings; private final JoinTaskExecutor joinTaskExecutor = new JoinTaskExecutor(); @@ -68,10 +68,11 @@ public class NodeJoinController extends AbstractComponent { private ElectionContext electionContext = null; - public NodeJoinController(ClusterService clusterService, RoutingService routingService, ElectMasterService electMaster, DiscoverySettings discoverySettings, Settings settings) { + public NodeJoinController(ClusterService clusterService, AllocationService allocationService, ElectMasterService electMaster, + DiscoverySettings discoverySettings, Settings settings) { super(settings); this.clusterService = clusterService; - this.routingService = routingService; + this.allocationService = allocationService; this.electMaster = electMaster; this.discoverySettings = discoverySettings; } @@ -406,21 +407,7 @@ public class NodeJoinController extends AbstractComponent { ClusterBlocks clusterBlocks = ClusterBlocks.builder().blocks(currentState.blocks()) .removeGlobalBlock(discoverySettings.getNoMasterBlock()).build(); newState.blocks(clusterBlocks); - newState.nodes(nodesBuilder); nodesChanged = true; - - // reroute now to remove any dead nodes (master may have stepped down when they left and didn't update the routing table) - // Note: also do it now to avoid assigning shards to these nodes. We will have another reroute after the cluster - // state is published. - // TODO: this publishing of a cluster state with no nodes assigned to joining nodes shouldn't be needed anymore. remove. - - final ClusterState tmpState = newState.build(); - RoutingAllocation.Result result = routingService.getAllocationService().reroute(tmpState, "nodes joined"); - newState = ClusterState.builder(tmpState); - if (result.changed()) { - newState.routingResult(result); - } - nodesBuilder = DiscoveryNodes.builder(tmpState.nodes()); } if (nodesBuilder.isLocalNodeElectedMaster() == false) { @@ -439,7 +426,8 @@ public class NodeJoinController extends AbstractComponent { for (DiscoveryNode existingNode : currentNodes) { if (node.getAddress().equals(existingNode.getAddress())) { nodesBuilder.remove(existingNode.getId()); - logger.warn("received join request from node [{}], but found existing node {} with same address, removing existing node", node, existingNode); + logger.warn("received join request from node [{}], but found existing node {} with same address, " + + "removing existing node", node, existingNode); } } } @@ -448,6 +436,12 @@ public class NodeJoinController extends AbstractComponent { if (nodesChanged) { newState.nodes(nodesBuilder); + final ClusterState tmpState = newState.build(); + RoutingAllocation.Result result = allocationService.reroute(tmpState, "node_join"); + newState = ClusterState.builder(tmpState); + if (result.changed()) { + newState.routingResult(result); + } } // we must return a new cluster state instance to force publishing. This is important @@ -463,13 +457,6 @@ public class NodeJoinController extends AbstractComponent { @Override public void clusterStatePublished(ClusterChangedEvent event) { - if (event.nodesDelta().hasChanges()) { - // we reroute not in the same cluster state update since in certain areas we rely on - // the node to be in the cluster state (sampled from ClusterService#state) to be there, also - // shard transitions need to better be handled in such cases - routingService.reroute("post_node_add"); - } - NodeJoinController.this.electMaster.logMinimumMasterNodesWarningIfNecessary(event.previousState(), event.state()); } } diff --git a/core/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java b/core/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java index 54ef9928585..6a9a64ff411 100644 --- a/core/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java +++ b/core/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java @@ -32,7 +32,7 @@ import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; -import org.elasticsearch.cluster.routing.RoutingService; +import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Priority; @@ -113,7 +113,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implemen private final TransportService transportService; private final ClusterService clusterService; - private RoutingService routingService; + private AllocationService allocationService; private final ClusterName clusterName; private final DiscoverySettings discoverySettings; private final ZenPingService pingService; @@ -146,7 +146,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implemen /** counts the time this node has joined the cluster or have elected it self as master */ private final AtomicLong clusterJoinsCounter = new AtomicLong(); - // must initialized in doStart(), when we have the routingService set + // must initialized in doStart(), when we have the allocationService set private volatile NodeJoinController nodeJoinController; @Inject @@ -206,8 +206,8 @@ public class ZenDiscovery extends AbstractLifecycleComponent implemen } @Override - public void setRoutingService(RoutingService routingService) { - this.routingService = routingService; + public void setAllocationService(AllocationService allocationService) { + this.allocationService = allocationService; } @Override @@ -215,7 +215,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implemen nodesFD.setLocalNode(clusterService.localNode()); joinThreadControl.start(); pingService.start(); - this.nodeJoinController = new NodeJoinController(clusterService, routingService, electMaster, discoverySettings, settings); + this.nodeJoinController = new NodeJoinController(clusterService, allocationService, electMaster, discoverySettings, settings); } @Override @@ -516,8 +516,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implemen return rejoin(currentState, "not enough master nodes"); } // eagerly run reroute to remove dead nodes from routing table - RoutingAllocation.Result routingResult = routingService.getAllocationService().reroute( - ClusterState.builder(currentState).build(), + RoutingAllocation.Result routingResult = allocationService.reroute(ClusterState.builder(currentState).build(), "[" + node + "] left"); return ClusterState.builder(currentState).routingResult(routingResult).build(); } @@ -561,7 +560,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implemen return rejoin(currentState, "not enough master nodes"); } // eagerly run reroute to remove dead nodes from routing table - RoutingAllocation.Result routingResult = routingService.getAllocationService().reroute( + RoutingAllocation.Result routingResult = allocationService.reroute( ClusterState.builder(currentState).build(), "[" + node + "] failed"); return ClusterState.builder(currentState).routingResult(routingResult).build(); diff --git a/core/src/main/java/org/elasticsearch/node/Node.java b/core/src/main/java/org/elasticsearch/node/Node.java index f34e16db140..043526306e9 100644 --- a/core/src/main/java/org/elasticsearch/node/Node.java +++ b/core/src/main/java/org/elasticsearch/node/Node.java @@ -37,6 +37,7 @@ import org.elasticsearch.cluster.action.index.MappingUpdatedAction; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodeService; import org.elasticsearch.cluster.routing.RoutingService; +import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.StopWatch; import org.elasticsearch.common.component.Lifecycle; @@ -321,7 +322,7 @@ public class Node implements Closeable { ESLogger logger = Loggers.getLogger(Node.class, NODE_NAME_SETTING.get(settings)); logger.info("starting ..."); // hack around dependency injection problem (for now...) - injector.getInstance(Discovery.class).setRoutingService(injector.getInstance(RoutingService.class)); + injector.getInstance(Discovery.class).setAllocationService(injector.getInstance(AllocationService.class)); for (Class plugin : pluginsService.nodeServices()) { injector.getInstance(plugin).start(); } diff --git a/core/src/test/java/org/elasticsearch/discovery/zen/NodeJoinControllerTests.java b/core/src/test/java/org/elasticsearch/discovery/zen/NodeJoinControllerTests.java index 27c38e66074..683bebc55a3 100644 --- a/core/src/test/java/org/elasticsearch/discovery/zen/NodeJoinControllerTests.java +++ b/core/src/test/java/org/elasticsearch/discovery/zen/NodeJoinControllerTests.java @@ -24,7 +24,6 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.NotMasterException; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; -import org.elasticsearch.cluster.routing.RoutingService; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.cluster.routing.allocation.FailedRerouteAllocation; @@ -99,7 +98,7 @@ public class NodeJoinControllerTests extends ESTestCase { // make sure we have a master setState(clusterService, ClusterState.builder(clusterService.state()).nodes( DiscoveryNodes.builder(initialNodes).masterNodeId(localNode.getId()))); - nodeJoinController = new NodeJoinController(clusterService, new NoopRoutingService(Settings.EMPTY), + nodeJoinController = new NodeJoinController(clusterService, new NoopAllocationService(Settings.EMPTY), new ElectMasterService(Settings.EMPTY, Version.CURRENT), new DiscoverySettings(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)), Settings.EMPTY); @@ -298,7 +297,8 @@ public class NodeJoinControllerTests extends ESTestCase { } logger.debug("--> asserting master election didn't finish yet"); - assertThat("election finished after [" + initialJoins + "] master nodes but required joins is [" + requiredJoins + "]", electionFuture.isDone(), equalTo(false)); + assertThat("election finished after [" + initialJoins + "] master nodes but required joins is [" + requiredJoins + "]", + electionFuture.isDone(), equalTo(false)); final int finalJoins = requiredJoins - initialJoins + randomInt(5); nodesToJoin.clear(); @@ -374,7 +374,8 @@ public class NodeJoinControllerTests extends ESTestCase { nodeJoinController.waitToBeElectedAsMaster(requiredJoins, TimeValue.timeValueMillis(1), new NodeJoinController.ElectionCallback() { @Override public void onElectedAsMaster(ClusterState state) { - assertThat("callback called with elected as master, but state disagrees", state.nodes().isLocalNodeElectedMaster(), equalTo(true)); + assertThat("callback called with elected as master, but state disagrees", state.nodes().isLocalNodeElectedMaster(), + equalTo(true)); latch.countDown(); } @@ -492,7 +493,8 @@ public class NodeJoinControllerTests extends ESTestCase { nodeJoinController.waitToBeElectedAsMaster(requiredJoins, TimeValue.timeValueHours(30), new NodeJoinController.ElectionCallback() { @Override public void onElectedAsMaster(ClusterState state) { - assertThat("callback called with elected as master, but state disagrees", state.nodes().isLocalNodeElectedMaster(), equalTo(true)); + assertThat("callback called with elected as master, but state disagrees", state.nodes().isLocalNodeElectedMaster(), + equalTo(true)); latch.countDown(); } @@ -516,18 +518,6 @@ public class NodeJoinControllerTests extends ESTestCase { } - static class NoopRoutingService extends RoutingService { - - public NoopRoutingService(Settings settings) { - super(settings, null, new NoopAllocationService(settings)); - } - - @Override - protected void performReroute(String reason) { - - } - } - static class NoopAllocationService extends AllocationService { public NoopAllocationService(Settings settings) { @@ -535,12 +525,14 @@ public class NodeJoinControllerTests extends ESTestCase { } @Override - public RoutingAllocation.Result applyStartedShards(ClusterState clusterState, List startedShards, boolean withReroute) { + public RoutingAllocation.Result applyStartedShards(ClusterState clusterState, List startedShards, + boolean withReroute) { return new RoutingAllocation.Result(false, clusterState.routingTable(), clusterState.metaData()); } @Override - public RoutingAllocation.Result applyFailedShards(ClusterState clusterState, List failedShards) { + public RoutingAllocation.Result applyFailedShards(ClusterState clusterState, + List failedShards) { return new RoutingAllocation.Result(false, clusterState.routingTable(), clusterState.metaData()); } diff --git a/core/src/test/java/org/elasticsearch/test/NoopDiscovery.java b/core/src/test/java/org/elasticsearch/test/NoopDiscovery.java index 3193aaf458e..967837e0044 100644 --- a/core/src/test/java/org/elasticsearch/test/NoopDiscovery.java +++ b/core/src/test/java/org/elasticsearch/test/NoopDiscovery.java @@ -20,7 +20,7 @@ package org.elasticsearch.test; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.cluster.routing.RoutingService; +import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.common.component.Lifecycle; import org.elasticsearch.common.component.LifecycleListener; import org.elasticsearch.discovery.Discovery; @@ -41,7 +41,7 @@ public class NoopDiscovery implements Discovery { } @Override - public void setRoutingService(RoutingService routingService) { + public void setAllocationService(AllocationService allocationService) { }