Discovery: reroute after node join is processed

- shard listing actions underpinning shard allocation do not have access to that new node yet (causing errors during shard allocation see #11923
 - the very first cluster state published to a node already has shard assignments to it. This surfaced other issues we are working to fix separately

 This commit changes the reroute to be done post processing the initial join cluster state to side step these issues while we work on a longer term solution.

Closes #11960
This commit is contained in:
Boaz Leskes 2015-06-30 21:47:26 +02:00
parent 48b85421ec
commit 182c59f5b4
5 changed files with 34 additions and 20 deletions

View File

@ -83,6 +83,10 @@ public class RoutingService extends AbstractLifecycleComponent<RoutingService> i
clusterService.remove(this);
}
public AllocationService getAllocationService() {
return this.allocationService;
}
/**
* Initiates a reroute.
*/

View File

@ -22,6 +22,7 @@ package org.elasticsearch.discovery;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.RoutingService;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.component.LifecycleComponent;
@ -51,7 +52,7 @@ public interface Discovery extends LifecycleComponent<Discovery> {
* Another hack to solve dep injection problem..., note, this will be called before
* any start is called.
*/
void setAllocationService(AllocationService allocationService);
void setRoutingService(RoutingService routingService);
/**
* Publish all the changes to the cluster from the master (can be called just by the master). The publish

View File

@ -26,6 +26,7 @@ import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodeService;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.RoutingService;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
@ -60,7 +61,7 @@ public class LocalDiscovery extends AbstractLifecycleComponent<Discovery> implem
private final TransportService transportService;
private final ClusterService clusterService;
private final DiscoveryNodeService discoveryNodeService;
private AllocationService allocationService;
private RoutingService routingService;
private final ClusterName clusterName;
private final Version version;
@ -96,8 +97,8 @@ public class LocalDiscovery extends AbstractLifecycleComponent<Discovery> implem
}
@Override
public void setAllocationService(AllocationService allocationService) {
this.allocationService = allocationService;
public void setRoutingService(RoutingService routingService) {
this.routingService = routingService;
}
@Override
@ -176,9 +177,7 @@ public class LocalDiscovery extends AbstractLifecycleComponent<Discovery> implem
nodesBuilder.put(discovery.localNode);
}
nodesBuilder.localNodeId(master.localNode().id()).masterNodeId(master.localNode().id());
ClusterState updatedState = ClusterState.builder(currentState).nodes(nodesBuilder).build();
RoutingAllocation.Result routingResult = master.allocationService.reroute(ClusterState.builder(updatedState).build());
return ClusterState.builder(updatedState).routingResult(routingResult).build();
return ClusterState.builder(currentState).nodes(nodesBuilder).build();
}
@Override
@ -189,6 +188,10 @@ public class LocalDiscovery extends AbstractLifecycleComponent<Discovery> implem
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
sendInitialStateEventIfNeeded();
// we reroute not in the same cluster state update since in certain areas we rely on
// the node to be in the cluster state (sampled from ClusterService#state) to be there, also
// shard transitions need to better be handled in such cases
master.routingService.reroute("post_node_add");
}
});
}
@ -240,7 +243,7 @@ public class LocalDiscovery extends AbstractLifecycleComponent<Discovery> implem
}
// reroute here, so we eagerly remove dead nodes from the routing
ClusterState updatedState = ClusterState.builder(currentState).nodes(newNodes).build();
RoutingAllocation.Result routingResult = master.allocationService.reroute(ClusterState.builder(updatedState).build());
RoutingAllocation.Result routingResult = master.routingService.getAllocationService().reroute(ClusterState.builder(updatedState).build());
return ClusterState.builder(updatedState).routingResult(routingResult).build();
}

View File

@ -29,7 +29,7 @@ import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.routing.RoutingService;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.service.InternalClusterService;
import org.elasticsearch.cluster.settings.ClusterDynamicSettings;
@ -97,7 +97,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
private final TransportService transportService;
private final ClusterService clusterService;
private AllocationService allocationService;
private RoutingService routingService;
private final ClusterName clusterName;
private final DiscoverySettings discoverySettings;
private final ZenPingService pingService;
@ -219,8 +219,8 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
}
@Override
public void setAllocationService(AllocationService allocationService) {
this.allocationService = allocationService;
public void setRoutingService(RoutingService routingService) {
this.routingService = routingService;
}
@Override
@ -379,7 +379,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
currentState = ClusterState.builder(currentState).nodes(builder).blocks(clusterBlocks).build();
// eagerly run reroute to remove dead nodes from routing table
RoutingAllocation.Result result = allocationService.reroute(currentState);
RoutingAllocation.Result result = routingService.getAllocationService().reroute(currentState);
return ClusterState.builder(currentState).routingResult(result).build();
}
@ -510,7 +510,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
return rejoin(currentState, "not enough master nodes");
}
// eagerly run reroute to remove dead nodes from routing table
RoutingAllocation.Result routingResult = allocationService.reroute(ClusterState.builder(currentState).build());
RoutingAllocation.Result routingResult = routingService.getAllocationService().reroute(ClusterState.builder(currentState).build());
return ClusterState.builder(currentState).routingResult(routingResult).build();
}
@ -553,7 +553,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
return rejoin(currentState, "not enough master nodes");
}
// eagerly run reroute to remove dead nodes from routing table
RoutingAllocation.Result routingResult = allocationService.reroute(ClusterState.builder(currentState).build());
RoutingAllocation.Result routingResult = routingService.getAllocationService().reroute(ClusterState.builder(currentState).build());
return ClusterState.builder(currentState).routingResult(routingResult).build();
}
@ -902,6 +902,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
clusterService.submitStateUpdateTask("zen-disco-receive(join from node[" + node + "])", Priority.URGENT, new ProcessedClusterStateUpdateTask() {
private final List<Tuple<DiscoveryNode, MembershipAction.JoinCallback>> drainedJoinRequests = new ArrayList<>();
private boolean nodeAdded = false;
@Override
public ClusterState execute(ClusterState currentState) {
@ -910,14 +911,13 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
return currentState;
}
boolean modified = false;
DiscoveryNodes.Builder nodesBuilder = DiscoveryNodes.builder(currentState.nodes());
for (Tuple<DiscoveryNode, MembershipAction.JoinCallback> task : drainedJoinRequests) {
DiscoveryNode node = task.v1();
if (currentState.nodes().nodeExists(node.id())) {
logger.debug("received a join request for an existing node [{}]", node);
} else {
modified = true;
nodeAdded = true;
nodesBuilder.put(node);
for (DiscoveryNode existingNode : currentState.nodes()) {
if (node.address().equals(existingNode.address())) {
@ -929,12 +929,12 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
}
ClusterState.Builder stateBuilder = ClusterState.builder(currentState);
if (modified) {
if (nodeAdded) {
stateBuilder.nodes(nodesBuilder);
}
currentState = stateBuilder.build();
// eagerly run reroute to apply the node addition
RoutingAllocation.Result result = allocationService.reroute(currentState);
RoutingAllocation.Result result = routingService.getAllocationService().reroute(currentState);
return ClusterState.builder(currentState).routingResult(result).build();
}
@ -964,6 +964,12 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
if (nodeAdded) {
// we reroute not in the same cluster state update since in certain areas we rely on
// the node to be in the cluster state (sampled from ClusterService#state) to be there, also
// shard transitions need to better be handled in such cases
routingService.reroute("post_node_add");
}
for (Tuple<DiscoveryNode, MembershipAction.JoinCallback> drainedTask : drainedJoinRequests) {
try {
drainedTask.v2().onSuccess();

View File

@ -234,7 +234,7 @@ public class Node implements Releasable {
logger.info("starting ...");
// hack around dependency injection problem (for now...)
injector.getInstance(Discovery.class).setAllocationService(injector.getInstance(AllocationService.class));
injector.getInstance(Discovery.class).setRoutingService(injector.getInstance(RoutingService.class));
for (Class<? extends LifecycleComponent> plugin : pluginsService.services()) {
injector.getInstance(plugin).start();