revert - Inline reroute with process of node join/master election (#18938)
There are secondary issues with async shard fetch going out to nodes before they have a cluster state published to them that need to be solved first. For example: - async fetch uses transport node action that resolves nodes based on the cluster state (but it's not yet exposed by ClusterService since we inline the reroute) - after disruption nodes will respond with an allocated shard (they didn't clean up their shards yet) which throws of decisions master side. - nodes deed the index meta data in question but they may not have if they didn't recieve the latest CS
This commit is contained in:
parent
157645fe9e
commit
4be94cdc95
|
@ -72,6 +72,10 @@ public class RoutingService extends AbstractLifecycleComponent<RoutingService> {
|
|||
protected void doClose() {
|
||||
}
|
||||
|
||||
public AllocationService getAllocationService() {
|
||||
return this.allocationService;
|
||||
}
|
||||
|
||||
/**
|
||||
* Initiates a reroute.
|
||||
*/
|
||||
|
|
|
@ -22,7 +22,7 @@ package org.elasticsearch.discovery;
|
|||
import org.elasticsearch.ElasticsearchException;
|
||||
import org.elasticsearch.cluster.ClusterChangedEvent;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.cluster.routing.allocation.AllocationService;
|
||||
import org.elasticsearch.cluster.routing.RoutingService;
|
||||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.common.component.LifecycleComponent;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
|
@ -44,7 +44,7 @@ public interface Discovery extends LifecycleComponent<Discovery> {
|
|||
* Another hack to solve dep injection problem..., note, this will be called before
|
||||
* any start is called.
|
||||
*/
|
||||
void setAllocationService(AllocationService allocationService);
|
||||
void setRoutingService(RoutingService routingService);
|
||||
|
||||
/**
|
||||
* Publish all the changes to the cluster from the master (can be called just by the master). The publish
|
||||
|
|
|
@ -28,7 +28,7 @@ import org.elasticsearch.cluster.IncompatibleClusterStateVersionException;
|
|||
import org.elasticsearch.cluster.block.ClusterBlocks;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNodes;
|
||||
import org.elasticsearch.cluster.routing.allocation.AllocationService;
|
||||
import org.elasticsearch.cluster.routing.RoutingService;
|
||||
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.component.AbstractLifecycleComponent;
|
||||
|
@ -61,7 +61,7 @@ public class LocalDiscovery extends AbstractLifecycleComponent<Discovery> implem
|
|||
private static final LocalDiscovery[] NO_MEMBERS = new LocalDiscovery[0];
|
||||
|
||||
private final ClusterService clusterService;
|
||||
private AllocationService allocationService;
|
||||
private RoutingService routingService;
|
||||
private final ClusterName clusterName;
|
||||
|
||||
private final DiscoverySettings discoverySettings;
|
||||
|
@ -83,8 +83,8 @@ public class LocalDiscovery extends AbstractLifecycleComponent<Discovery> implem
|
|||
}
|
||||
|
||||
@Override
|
||||
public void setAllocationService(AllocationService allocationService) {
|
||||
this.allocationService = allocationService;
|
||||
public void setRoutingService(RoutingService routingService) {
|
||||
this.routingService = routingService;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -156,12 +156,7 @@ public class LocalDiscovery extends AbstractLifecycleComponent<Discovery> implem
|
|||
nodesBuilder.put(discovery.localNode());
|
||||
}
|
||||
nodesBuilder.localNodeId(master.localNode().getId()).masterNodeId(master.localNode().getId());
|
||||
currentState = ClusterState.builder(currentState).nodes(nodesBuilder).build();
|
||||
RoutingAllocation.Result result = master.allocationService.reroute(currentState, "node_add");
|
||||
if (result.changed()) {
|
||||
currentState = ClusterState.builder(currentState).routingResult(result).build();
|
||||
}
|
||||
return currentState;
|
||||
return ClusterState.builder(currentState).nodes(nodesBuilder).build();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -169,6 +164,13 @@ public class LocalDiscovery extends AbstractLifecycleComponent<Discovery> implem
|
|||
logger.error("unexpected failure during [{}]", t, source);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
|
||||
// we reroute not in the same cluster state update since in certain areas we rely on
|
||||
// the node to be in the cluster state (sampled from ClusterService#state) to be there, also
|
||||
// shard transitions need to better be handled in such cases
|
||||
master.routingService.reroute("post_node_add");
|
||||
}
|
||||
});
|
||||
}
|
||||
} // else, no master node, the next node that will start will fill things in...
|
||||
|
@ -224,7 +226,7 @@ public class LocalDiscovery extends AbstractLifecycleComponent<Discovery> implem
|
|||
}
|
||||
// reroute here, so we eagerly remove dead nodes from the routing
|
||||
ClusterState updatedState = ClusterState.builder(currentState).nodes(newNodes).build();
|
||||
RoutingAllocation.Result routingResult = master.allocationService.reroute(
|
||||
RoutingAllocation.Result routingResult = master.routingService.getAllocationService().reroute(
|
||||
ClusterState.builder(updatedState).build(), "elected as master");
|
||||
return ClusterState.builder(updatedState).routingResult(routingResult).build();
|
||||
}
|
||||
|
|
|
@ -29,7 +29,7 @@ import org.elasticsearch.cluster.NotMasterException;
|
|||
import org.elasticsearch.cluster.block.ClusterBlocks;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNodes;
|
||||
import org.elasticsearch.cluster.routing.allocation.AllocationService;
|
||||
import org.elasticsearch.cluster.routing.RoutingService;
|
||||
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.Priority;
|
||||
|
@ -58,7 +58,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
|
|||
public class NodeJoinController extends AbstractComponent {
|
||||
|
||||
private final ClusterService clusterService;
|
||||
private final AllocationService allocationService;
|
||||
private final RoutingService routingService;
|
||||
private final ElectMasterService electMaster;
|
||||
private final DiscoverySettings discoverySettings;
|
||||
private final JoinTaskExecutor joinTaskExecutor = new JoinTaskExecutor();
|
||||
|
@ -68,11 +68,10 @@ public class NodeJoinController extends AbstractComponent {
|
|||
private ElectionContext electionContext = null;
|
||||
|
||||
|
||||
public NodeJoinController(ClusterService clusterService, AllocationService allocationService, ElectMasterService electMaster,
|
||||
DiscoverySettings discoverySettings, Settings settings) {
|
||||
public NodeJoinController(ClusterService clusterService, RoutingService routingService, ElectMasterService electMaster, DiscoverySettings discoverySettings, Settings settings) {
|
||||
super(settings);
|
||||
this.clusterService = clusterService;
|
||||
this.allocationService = allocationService;
|
||||
this.routingService = routingService;
|
||||
this.electMaster = electMaster;
|
||||
this.discoverySettings = discoverySettings;
|
||||
}
|
||||
|
@ -407,7 +406,21 @@ public class NodeJoinController extends AbstractComponent {
|
|||
ClusterBlocks clusterBlocks = ClusterBlocks.builder().blocks(currentState.blocks())
|
||||
.removeGlobalBlock(discoverySettings.getNoMasterBlock()).build();
|
||||
newState.blocks(clusterBlocks);
|
||||
newState.nodes(nodesBuilder);
|
||||
nodesChanged = true;
|
||||
|
||||
// reroute now to remove any dead nodes (master may have stepped down when they left and didn't update the routing table)
|
||||
// Note: also do it now to avoid assigning shards to these nodes. We will have another reroute after the cluster
|
||||
// state is published.
|
||||
// TODO: this publishing of a cluster state with no nodes assigned to joining nodes shouldn't be needed anymore. remove.
|
||||
|
||||
final ClusterState tmpState = newState.build();
|
||||
RoutingAllocation.Result result = routingService.getAllocationService().reroute(tmpState, "nodes joined");
|
||||
newState = ClusterState.builder(tmpState);
|
||||
if (result.changed()) {
|
||||
newState.routingResult(result);
|
||||
}
|
||||
nodesBuilder = DiscoveryNodes.builder(tmpState.nodes());
|
||||
}
|
||||
|
||||
if (nodesBuilder.isLocalNodeElectedMaster() == false) {
|
||||
|
@ -426,8 +439,7 @@ public class NodeJoinController extends AbstractComponent {
|
|||
for (DiscoveryNode existingNode : currentNodes) {
|
||||
if (node.getAddress().equals(existingNode.getAddress())) {
|
||||
nodesBuilder.remove(existingNode.getId());
|
||||
logger.warn("received join request from node [{}], but found existing node {} with same address, " +
|
||||
"removing existing node", node, existingNode);
|
||||
logger.warn("received join request from node [{}], but found existing node {} with same address, removing existing node", node, existingNode);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -436,12 +448,6 @@ public class NodeJoinController extends AbstractComponent {
|
|||
|
||||
if (nodesChanged) {
|
||||
newState.nodes(nodesBuilder);
|
||||
final ClusterState tmpState = newState.build();
|
||||
RoutingAllocation.Result result = allocationService.reroute(tmpState, "node_join");
|
||||
newState = ClusterState.builder(tmpState);
|
||||
if (result.changed()) {
|
||||
newState.routingResult(result);
|
||||
}
|
||||
}
|
||||
|
||||
// we must return a new cluster state instance to force publishing. This is important
|
||||
|
@ -457,6 +463,13 @@ public class NodeJoinController extends AbstractComponent {
|
|||
|
||||
@Override
|
||||
public void clusterStatePublished(ClusterChangedEvent event) {
|
||||
if (event.nodesDelta().hasChanges()) {
|
||||
// we reroute not in the same cluster state update since in certain areas we rely on
|
||||
// the node to be in the cluster state (sampled from ClusterService#state) to be there, also
|
||||
// shard transitions need to better be handled in such cases
|
||||
routingService.reroute("post_node_add");
|
||||
}
|
||||
|
||||
NodeJoinController.this.electMaster.logMinimumMasterNodesWarningIfNecessary(event.previousState(), event.state());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -32,7 +32,7 @@ import org.elasticsearch.cluster.metadata.IndexMetaData;
|
|||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNodes;
|
||||
import org.elasticsearch.cluster.routing.allocation.AllocationService;
|
||||
import org.elasticsearch.cluster.routing.RoutingService;
|
||||
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.Priority;
|
||||
|
@ -113,7 +113,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
|
|||
|
||||
private final TransportService transportService;
|
||||
private final ClusterService clusterService;
|
||||
private AllocationService allocationService;
|
||||
private RoutingService routingService;
|
||||
private final ClusterName clusterName;
|
||||
private final DiscoverySettings discoverySettings;
|
||||
private final ZenPingService pingService;
|
||||
|
@ -146,7 +146,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
|
|||
/** counts the time this node has joined the cluster or have elected it self as master */
|
||||
private final AtomicLong clusterJoinsCounter = new AtomicLong();
|
||||
|
||||
// must initialized in doStart(), when we have the allocationService set
|
||||
// must initialized in doStart(), when we have the routingService set
|
||||
private volatile NodeJoinController nodeJoinController;
|
||||
|
||||
@Inject
|
||||
|
@ -206,8 +206,8 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
|
|||
}
|
||||
|
||||
@Override
|
||||
public void setAllocationService(AllocationService allocationService) {
|
||||
this.allocationService = allocationService;
|
||||
public void setRoutingService(RoutingService routingService) {
|
||||
this.routingService = routingService;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -215,7 +215,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
|
|||
nodesFD.setLocalNode(clusterService.localNode());
|
||||
joinThreadControl.start();
|
||||
pingService.start();
|
||||
this.nodeJoinController = new NodeJoinController(clusterService, allocationService, electMaster, discoverySettings, settings);
|
||||
this.nodeJoinController = new NodeJoinController(clusterService, routingService, electMaster, discoverySettings, settings);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -516,7 +516,8 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
|
|||
return rejoin(currentState, "not enough master nodes");
|
||||
}
|
||||
// eagerly run reroute to remove dead nodes from routing table
|
||||
RoutingAllocation.Result routingResult = allocationService.reroute(ClusterState.builder(currentState).build(),
|
||||
RoutingAllocation.Result routingResult = routingService.getAllocationService().reroute(
|
||||
ClusterState.builder(currentState).build(),
|
||||
"[" + node + "] left");
|
||||
return ClusterState.builder(currentState).routingResult(routingResult).build();
|
||||
}
|
||||
|
@ -560,7 +561,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
|
|||
return rejoin(currentState, "not enough master nodes");
|
||||
}
|
||||
// eagerly run reroute to remove dead nodes from routing table
|
||||
RoutingAllocation.Result routingResult = allocationService.reroute(
|
||||
RoutingAllocation.Result routingResult = routingService.getAllocationService().reroute(
|
||||
ClusterState.builder(currentState).build(),
|
||||
"[" + node + "] failed");
|
||||
return ClusterState.builder(currentState).routingResult(routingResult).build();
|
||||
|
|
|
@ -37,7 +37,6 @@ import org.elasticsearch.cluster.action.index.MappingUpdatedAction;
|
|||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNodeService;
|
||||
import org.elasticsearch.cluster.routing.RoutingService;
|
||||
import org.elasticsearch.cluster.routing.allocation.AllocationService;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.StopWatch;
|
||||
import org.elasticsearch.common.component.Lifecycle;
|
||||
|
@ -328,7 +327,7 @@ public class Node implements Closeable {
|
|||
ESLogger logger = Loggers.getLogger(Node.class, NODE_NAME_SETTING.get(settings));
|
||||
logger.info("starting ...");
|
||||
// hack around dependency injection problem (for now...)
|
||||
injector.getInstance(Discovery.class).setAllocationService(injector.getInstance(AllocationService.class));
|
||||
injector.getInstance(Discovery.class).setRoutingService(injector.getInstance(RoutingService.class));
|
||||
for (Class<? extends LifecycleComponent> plugin : pluginsService.nodeServices()) {
|
||||
injector.getInstance(plugin).start();
|
||||
}
|
||||
|
|
|
@ -24,6 +24,7 @@ import org.elasticsearch.cluster.ClusterState;
|
|||
import org.elasticsearch.cluster.NotMasterException;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNodes;
|
||||
import org.elasticsearch.cluster.routing.RoutingService;
|
||||
import org.elasticsearch.cluster.routing.ShardRouting;
|
||||
import org.elasticsearch.cluster.routing.allocation.AllocationService;
|
||||
import org.elasticsearch.cluster.routing.allocation.FailedRerouteAllocation;
|
||||
|
@ -98,7 +99,7 @@ public class NodeJoinControllerTests extends ESTestCase {
|
|||
// make sure we have a master
|
||||
setState(clusterService, ClusterState.builder(clusterService.state()).nodes(
|
||||
DiscoveryNodes.builder(initialNodes).masterNodeId(localNode.getId())));
|
||||
nodeJoinController = new NodeJoinController(clusterService, new NoopAllocationService(Settings.EMPTY),
|
||||
nodeJoinController = new NodeJoinController(clusterService, new NoopRoutingService(Settings.EMPTY),
|
||||
new ElectMasterService(Settings.EMPTY),
|
||||
new DiscoverySettings(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)),
|
||||
Settings.EMPTY);
|
||||
|
@ -297,8 +298,7 @@ public class NodeJoinControllerTests extends ESTestCase {
|
|||
}
|
||||
|
||||
logger.debug("--> asserting master election didn't finish yet");
|
||||
assertThat("election finished after [" + initialJoins + "] master nodes but required joins is [" + requiredJoins + "]",
|
||||
electionFuture.isDone(), equalTo(false));
|
||||
assertThat("election finished after [" + initialJoins + "] master nodes but required joins is [" + requiredJoins + "]", electionFuture.isDone(), equalTo(false));
|
||||
|
||||
final int finalJoins = requiredJoins - initialJoins + randomInt(5);
|
||||
nodesToJoin.clear();
|
||||
|
@ -374,8 +374,7 @@ public class NodeJoinControllerTests extends ESTestCase {
|
|||
nodeJoinController.waitToBeElectedAsMaster(requiredJoins, TimeValue.timeValueMillis(1), new NodeJoinController.ElectionCallback() {
|
||||
@Override
|
||||
public void onElectedAsMaster(ClusterState state) {
|
||||
assertThat("callback called with elected as master, but state disagrees", state.nodes().isLocalNodeElectedMaster(),
|
||||
equalTo(true));
|
||||
assertThat("callback called with elected as master, but state disagrees", state.nodes().isLocalNodeElectedMaster(), equalTo(true));
|
||||
latch.countDown();
|
||||
}
|
||||
|
||||
|
@ -493,8 +492,7 @@ public class NodeJoinControllerTests extends ESTestCase {
|
|||
nodeJoinController.waitToBeElectedAsMaster(requiredJoins, TimeValue.timeValueHours(30), new NodeJoinController.ElectionCallback() {
|
||||
@Override
|
||||
public void onElectedAsMaster(ClusterState state) {
|
||||
assertThat("callback called with elected as master, but state disagrees", state.nodes().isLocalNodeElectedMaster(),
|
||||
equalTo(true));
|
||||
assertThat("callback called with elected as master, but state disagrees", state.nodes().isLocalNodeElectedMaster(), equalTo(true));
|
||||
latch.countDown();
|
||||
}
|
||||
|
||||
|
@ -518,6 +516,18 @@ public class NodeJoinControllerTests extends ESTestCase {
|
|||
}
|
||||
|
||||
|
||||
static class NoopRoutingService extends RoutingService {
|
||||
|
||||
public NoopRoutingService(Settings settings) {
|
||||
super(settings, null, new NoopAllocationService(settings));
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void performReroute(String reason) {
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
static class NoopAllocationService extends AllocationService {
|
||||
|
||||
public NoopAllocationService(Settings settings) {
|
||||
|
@ -525,14 +535,12 @@ public class NodeJoinControllerTests extends ESTestCase {
|
|||
}
|
||||
|
||||
@Override
|
||||
public RoutingAllocation.Result applyStartedShards(ClusterState clusterState, List<? extends ShardRouting> startedShards,
|
||||
boolean withReroute) {
|
||||
public RoutingAllocation.Result applyStartedShards(ClusterState clusterState, List<? extends ShardRouting> startedShards, boolean withReroute) {
|
||||
return new RoutingAllocation.Result(false, clusterState.routingTable(), clusterState.metaData());
|
||||
}
|
||||
|
||||
@Override
|
||||
public RoutingAllocation.Result applyFailedShards(ClusterState clusterState,
|
||||
List<FailedRerouteAllocation.FailedShard> failedShards) {
|
||||
public RoutingAllocation.Result applyFailedShards(ClusterState clusterState, List<FailedRerouteAllocation.FailedShard> failedShards) {
|
||||
return new RoutingAllocation.Result(false, clusterState.routingTable(), clusterState.metaData());
|
||||
}
|
||||
|
||||
|
|
|
@ -20,7 +20,7 @@ package org.elasticsearch.test;
|
|||
|
||||
import org.elasticsearch.cluster.ClusterChangedEvent;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.cluster.routing.allocation.AllocationService;
|
||||
import org.elasticsearch.cluster.routing.RoutingService;
|
||||
import org.elasticsearch.common.component.Lifecycle;
|
||||
import org.elasticsearch.common.component.LifecycleListener;
|
||||
import org.elasticsearch.discovery.Discovery;
|
||||
|
@ -41,7 +41,7 @@ public class NoopDiscovery implements Discovery {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void setAllocationService(AllocationService allocationService) {
|
||||
public void setRoutingService(RoutingService routingService) {
|
||||
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue