Inline reroute with process of node join/master election (#18938)

In the past, we had the semantics where the very first cluster state a node processed after joining could not contain shard assignment to it. This was to make sure the node cleans up local / stale shard copies before receiving new ones that might confuse it. Since then a lot of work in this area, most notably the introduction of allocation ids and #17270 . This means we don't have to be careful and just reroute in the same cluster state change where we process the join, keeping things simple and following the same pattern we have in other places.
This commit is contained in:
Boaz Leskes 2016-06-17 17:32:38 +02:00 committed by GitHub
parent bf7a6f5509
commit 46b40f73b7
8 changed files with 49 additions and 76 deletions

View File

@ -72,10 +72,6 @@ public class RoutingService extends AbstractLifecycleComponent<RoutingService> {
protected void doClose() {
}
public AllocationService getAllocationService() {
return this.allocationService;
}
/**
* Initiates a reroute.
*/

View File

@ -22,7 +22,7 @@ package org.elasticsearch.discovery;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.RoutingService;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.component.LifecycleComponent;
import org.elasticsearch.common.io.stream.StreamInput;
@ -44,7 +44,7 @@ public interface Discovery extends LifecycleComponent<Discovery> {
* Another hack to solve dep injection problem..., note, this will be called before
* any start is called.
*/
void setRoutingService(RoutingService routingService);
void setAllocationService(AllocationService allocationService);
/**
* Publish all the changes to the cluster from the master (can be called just by the master). The publish

View File

@ -28,7 +28,7 @@ import org.elasticsearch.cluster.IncompatibleClusterStateVersionException;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.RoutingService;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
@ -61,7 +61,7 @@ public class LocalDiscovery extends AbstractLifecycleComponent<Discovery> implem
private static final LocalDiscovery[] NO_MEMBERS = new LocalDiscovery[0];
private final ClusterService clusterService;
private RoutingService routingService;
private AllocationService allocationService;
private final ClusterName clusterName;
private final DiscoverySettings discoverySettings;
@ -83,8 +83,8 @@ public class LocalDiscovery extends AbstractLifecycleComponent<Discovery> implem
}
@Override
public void setRoutingService(RoutingService routingService) {
this.routingService = routingService;
public void setAllocationService(AllocationService allocationService) {
this.allocationService = allocationService;
}
@Override
@ -156,7 +156,12 @@ public class LocalDiscovery extends AbstractLifecycleComponent<Discovery> implem
nodesBuilder.put(discovery.localNode());
}
nodesBuilder.localNodeId(master.localNode().getId()).masterNodeId(master.localNode().getId());
return ClusterState.builder(currentState).nodes(nodesBuilder).build();
currentState = ClusterState.builder(currentState).nodes(nodesBuilder).build();
RoutingAllocation.Result result = master.allocationService.reroute(currentState, "node_add");
if (result.changed()) {
currentState = ClusterState.builder(currentState).routingResult(result).build();
}
return currentState;
}
@Override
@ -164,13 +169,6 @@ public class LocalDiscovery extends AbstractLifecycleComponent<Discovery> implem
logger.error("unexpected failure during [{}]", t, source);
}
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
// we reroute not in the same cluster state update since in certain areas we rely on
// the node to be in the cluster state (sampled from ClusterService#state) to be there, also
// shard transitions need to better be handled in such cases
master.routingService.reroute("post_node_add");
}
});
}
} // else, no master node, the next node that will start will fill things in...
@ -226,7 +224,7 @@ public class LocalDiscovery extends AbstractLifecycleComponent<Discovery> implem
}
// reroute here, so we eagerly remove dead nodes from the routing
ClusterState updatedState = ClusterState.builder(currentState).nodes(newNodes).build();
RoutingAllocation.Result routingResult = master.routingService.getAllocationService().reroute(
RoutingAllocation.Result routingResult = master.allocationService.reroute(
ClusterState.builder(updatedState).build(), "elected as master");
return ClusterState.builder(updatedState).routingResult(routingResult).build();
}

View File

@ -29,7 +29,7 @@ import org.elasticsearch.cluster.NotMasterException;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.RoutingService;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Priority;
@ -58,7 +58,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
public class NodeJoinController extends AbstractComponent {
private final ClusterService clusterService;
private final RoutingService routingService;
private final AllocationService allocationService;
private final ElectMasterService electMaster;
private final DiscoverySettings discoverySettings;
private final JoinTaskExecutor joinTaskExecutor = new JoinTaskExecutor();
@ -68,10 +68,11 @@ public class NodeJoinController extends AbstractComponent {
private ElectionContext electionContext = null;
public NodeJoinController(ClusterService clusterService, RoutingService routingService, ElectMasterService electMaster, DiscoverySettings discoverySettings, Settings settings) {
public NodeJoinController(ClusterService clusterService, AllocationService allocationService, ElectMasterService electMaster,
DiscoverySettings discoverySettings, Settings settings) {
super(settings);
this.clusterService = clusterService;
this.routingService = routingService;
this.allocationService = allocationService;
this.electMaster = electMaster;
this.discoverySettings = discoverySettings;
}
@ -406,21 +407,7 @@ public class NodeJoinController extends AbstractComponent {
ClusterBlocks clusterBlocks = ClusterBlocks.builder().blocks(currentState.blocks())
.removeGlobalBlock(discoverySettings.getNoMasterBlock()).build();
newState.blocks(clusterBlocks);
newState.nodes(nodesBuilder);
nodesChanged = true;
// reroute now to remove any dead nodes (master may have stepped down when they left and didn't update the routing table)
// Note: also do it now to avoid assigning shards to these nodes. We will have another reroute after the cluster
// state is published.
// TODO: this publishing of a cluster state with no nodes assigned to joining nodes shouldn't be needed anymore. remove.
final ClusterState tmpState = newState.build();
RoutingAllocation.Result result = routingService.getAllocationService().reroute(tmpState, "nodes joined");
newState = ClusterState.builder(tmpState);
if (result.changed()) {
newState.routingResult(result);
}
nodesBuilder = DiscoveryNodes.builder(tmpState.nodes());
}
if (nodesBuilder.isLocalNodeElectedMaster() == false) {
@ -439,7 +426,8 @@ public class NodeJoinController extends AbstractComponent {
for (DiscoveryNode existingNode : currentNodes) {
if (node.getAddress().equals(existingNode.getAddress())) {
nodesBuilder.remove(existingNode.getId());
logger.warn("received join request from node [{}], but found existing node {} with same address, removing existing node", node, existingNode);
logger.warn("received join request from node [{}], but found existing node {} with same address, " +
"removing existing node", node, existingNode);
}
}
}
@ -448,6 +436,12 @@ public class NodeJoinController extends AbstractComponent {
if (nodesChanged) {
newState.nodes(nodesBuilder);
final ClusterState tmpState = newState.build();
RoutingAllocation.Result result = allocationService.reroute(tmpState, "node_join");
newState = ClusterState.builder(tmpState);
if (result.changed()) {
newState.routingResult(result);
}
}
// we must return a new cluster state instance to force publishing. This is important
@ -463,13 +457,6 @@ public class NodeJoinController extends AbstractComponent {
@Override
public void clusterStatePublished(ClusterChangedEvent event) {
if (event.nodesDelta().hasChanges()) {
// we reroute not in the same cluster state update since in certain areas we rely on
// the node to be in the cluster state (sampled from ClusterService#state) to be there, also
// shard transitions need to better be handled in such cases
routingService.reroute("post_node_add");
}
NodeJoinController.this.electMaster.logMinimumMasterNodesWarningIfNecessary(event.previousState(), event.state());
}
}

View File

@ -32,7 +32,7 @@ import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.RoutingService;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Priority;
@ -113,7 +113,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
private final TransportService transportService;
private final ClusterService clusterService;
private RoutingService routingService;
private AllocationService allocationService;
private final ClusterName clusterName;
private final DiscoverySettings discoverySettings;
private final ZenPingService pingService;
@ -146,7 +146,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
/** counts the time this node has joined the cluster or have elected it self as master */
private final AtomicLong clusterJoinsCounter = new AtomicLong();
// must initialized in doStart(), when we have the routingService set
// must initialized in doStart(), when we have the allocationService set
private volatile NodeJoinController nodeJoinController;
@Inject
@ -206,8 +206,8 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
}
@Override
public void setRoutingService(RoutingService routingService) {
this.routingService = routingService;
public void setAllocationService(AllocationService allocationService) {
this.allocationService = allocationService;
}
@Override
@ -215,7 +215,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
nodesFD.setLocalNode(clusterService.localNode());
joinThreadControl.start();
pingService.start();
this.nodeJoinController = new NodeJoinController(clusterService, routingService, electMaster, discoverySettings, settings);
this.nodeJoinController = new NodeJoinController(clusterService, allocationService, electMaster, discoverySettings, settings);
}
@Override
@ -516,8 +516,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
return rejoin(currentState, "not enough master nodes");
}
// eagerly run reroute to remove dead nodes from routing table
RoutingAllocation.Result routingResult = routingService.getAllocationService().reroute(
ClusterState.builder(currentState).build(),
RoutingAllocation.Result routingResult = allocationService.reroute(ClusterState.builder(currentState).build(),
"[" + node + "] left");
return ClusterState.builder(currentState).routingResult(routingResult).build();
}
@ -561,7 +560,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
return rejoin(currentState, "not enough master nodes");
}
// eagerly run reroute to remove dead nodes from routing table
RoutingAllocation.Result routingResult = routingService.getAllocationService().reroute(
RoutingAllocation.Result routingResult = allocationService.reroute(
ClusterState.builder(currentState).build(),
"[" + node + "] failed");
return ClusterState.builder(currentState).routingResult(routingResult).build();

View File

@ -37,6 +37,7 @@ import org.elasticsearch.cluster.action.index.MappingUpdatedAction;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodeService;
import org.elasticsearch.cluster.routing.RoutingService;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.StopWatch;
import org.elasticsearch.common.component.Lifecycle;
@ -321,7 +322,7 @@ public class Node implements Closeable {
ESLogger logger = Loggers.getLogger(Node.class, NODE_NAME_SETTING.get(settings));
logger.info("starting ...");
// hack around dependency injection problem (for now...)
injector.getInstance(Discovery.class).setRoutingService(injector.getInstance(RoutingService.class));
injector.getInstance(Discovery.class).setAllocationService(injector.getInstance(AllocationService.class));
for (Class<? extends LifecycleComponent> plugin : pluginsService.nodeServices()) {
injector.getInstance(plugin).start();
}

View File

@ -24,7 +24,6 @@ import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.NotMasterException;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.RoutingService;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.routing.allocation.FailedRerouteAllocation;
@ -99,7 +98,7 @@ public class NodeJoinControllerTests extends ESTestCase {
// make sure we have a master
setState(clusterService, ClusterState.builder(clusterService.state()).nodes(
DiscoveryNodes.builder(initialNodes).masterNodeId(localNode.getId())));
nodeJoinController = new NodeJoinController(clusterService, new NoopRoutingService(Settings.EMPTY),
nodeJoinController = new NodeJoinController(clusterService, new NoopAllocationService(Settings.EMPTY),
new ElectMasterService(Settings.EMPTY, Version.CURRENT),
new DiscoverySettings(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)),
Settings.EMPTY);
@ -298,7 +297,8 @@ public class NodeJoinControllerTests extends ESTestCase {
}
logger.debug("--> asserting master election didn't finish yet");
assertThat("election finished after [" + initialJoins + "] master nodes but required joins is [" + requiredJoins + "]", electionFuture.isDone(), equalTo(false));
assertThat("election finished after [" + initialJoins + "] master nodes but required joins is [" + requiredJoins + "]",
electionFuture.isDone(), equalTo(false));
final int finalJoins = requiredJoins - initialJoins + randomInt(5);
nodesToJoin.clear();
@ -374,7 +374,8 @@ public class NodeJoinControllerTests extends ESTestCase {
nodeJoinController.waitToBeElectedAsMaster(requiredJoins, TimeValue.timeValueMillis(1), new NodeJoinController.ElectionCallback() {
@Override
public void onElectedAsMaster(ClusterState state) {
assertThat("callback called with elected as master, but state disagrees", state.nodes().isLocalNodeElectedMaster(), equalTo(true));
assertThat("callback called with elected as master, but state disagrees", state.nodes().isLocalNodeElectedMaster(),
equalTo(true));
latch.countDown();
}
@ -492,7 +493,8 @@ public class NodeJoinControllerTests extends ESTestCase {
nodeJoinController.waitToBeElectedAsMaster(requiredJoins, TimeValue.timeValueHours(30), new NodeJoinController.ElectionCallback() {
@Override
public void onElectedAsMaster(ClusterState state) {
assertThat("callback called with elected as master, but state disagrees", state.nodes().isLocalNodeElectedMaster(), equalTo(true));
assertThat("callback called with elected as master, but state disagrees", state.nodes().isLocalNodeElectedMaster(),
equalTo(true));
latch.countDown();
}
@ -516,18 +518,6 @@ public class NodeJoinControllerTests extends ESTestCase {
}
static class NoopRoutingService extends RoutingService {
public NoopRoutingService(Settings settings) {
super(settings, null, new NoopAllocationService(settings));
}
@Override
protected void performReroute(String reason) {
}
}
static class NoopAllocationService extends AllocationService {
public NoopAllocationService(Settings settings) {
@ -535,12 +525,14 @@ public class NodeJoinControllerTests extends ESTestCase {
}
@Override
public RoutingAllocation.Result applyStartedShards(ClusterState clusterState, List<? extends ShardRouting> startedShards, boolean withReroute) {
public RoutingAllocation.Result applyStartedShards(ClusterState clusterState, List<? extends ShardRouting> startedShards,
boolean withReroute) {
return new RoutingAllocation.Result(false, clusterState.routingTable(), clusterState.metaData());
}
@Override
public RoutingAllocation.Result applyFailedShards(ClusterState clusterState, List<FailedRerouteAllocation.FailedShard> failedShards) {
public RoutingAllocation.Result applyFailedShards(ClusterState clusterState,
List<FailedRerouteAllocation.FailedShard> failedShards) {
return new RoutingAllocation.Result(false, clusterState.routingTable(), clusterState.metaData());
}

View File

@ -20,7 +20,7 @@ package org.elasticsearch.test;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.RoutingService;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.common.component.Lifecycle;
import org.elasticsearch.common.component.LifecycleListener;
import org.elasticsearch.discovery.Discovery;
@ -41,7 +41,7 @@ public class NoopDiscovery implements Discovery {
}
@Override
public void setRoutingService(RoutingService routingService) {
public void setAllocationService(AllocationService allocationService) {
}