This commit is contained in:
Areek Zillur 2016-06-17 11:41:23 -04:00
commit 9bca264dcd
8 changed files with 49 additions and 76 deletions

View File

@ -72,10 +72,6 @@ public class RoutingService extends AbstractLifecycleComponent<RoutingService> {
protected void doClose() { protected void doClose() {
} }
public AllocationService getAllocationService() {
return this.allocationService;
}
/** /**
* Initiates a reroute. * Initiates a reroute.
*/ */

View File

@ -22,7 +22,7 @@ package org.elasticsearch.discovery;
import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.RoutingService; import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.component.LifecycleComponent; import org.elasticsearch.common.component.LifecycleComponent;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
@ -44,7 +44,7 @@ public interface Discovery extends LifecycleComponent<Discovery> {
* Another hack to solve dep injection problem..., note, this will be called before * Another hack to solve dep injection problem..., note, this will be called before
* any start is called. * any start is called.
*/ */
void setRoutingService(RoutingService routingService); void setAllocationService(AllocationService allocationService);
/** /**
* Publish all the changes to the cluster from the master (can be called just by the master). The publish * Publish all the changes to the cluster from the master (can be called just by the master). The publish

View File

@ -28,7 +28,7 @@ import org.elasticsearch.cluster.IncompatibleClusterStateVersionException;
import org.elasticsearch.cluster.block.ClusterBlocks; import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.RoutingService; import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.component.AbstractLifecycleComponent;
@ -61,7 +61,7 @@ public class LocalDiscovery extends AbstractLifecycleComponent<Discovery> implem
private static final LocalDiscovery[] NO_MEMBERS = new LocalDiscovery[0]; private static final LocalDiscovery[] NO_MEMBERS = new LocalDiscovery[0];
private final ClusterService clusterService; private final ClusterService clusterService;
private RoutingService routingService; private AllocationService allocationService;
private final ClusterName clusterName; private final ClusterName clusterName;
private final DiscoverySettings discoverySettings; private final DiscoverySettings discoverySettings;
@ -83,8 +83,8 @@ public class LocalDiscovery extends AbstractLifecycleComponent<Discovery> implem
} }
@Override @Override
public void setRoutingService(RoutingService routingService) { public void setAllocationService(AllocationService allocationService) {
this.routingService = routingService; this.allocationService = allocationService;
} }
@Override @Override
@ -156,7 +156,12 @@ public class LocalDiscovery extends AbstractLifecycleComponent<Discovery> implem
nodesBuilder.put(discovery.localNode()); nodesBuilder.put(discovery.localNode());
} }
nodesBuilder.localNodeId(master.localNode().getId()).masterNodeId(master.localNode().getId()); nodesBuilder.localNodeId(master.localNode().getId()).masterNodeId(master.localNode().getId());
return ClusterState.builder(currentState).nodes(nodesBuilder).build(); currentState = ClusterState.builder(currentState).nodes(nodesBuilder).build();
RoutingAllocation.Result result = master.allocationService.reroute(currentState, "node_add");
if (result.changed()) {
currentState = ClusterState.builder(currentState).routingResult(result).build();
}
return currentState;
} }
@Override @Override
@ -164,13 +169,6 @@ public class LocalDiscovery extends AbstractLifecycleComponent<Discovery> implem
logger.error("unexpected failure during [{}]", t, source); logger.error("unexpected failure during [{}]", t, source);
} }
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
// we reroute not in the same cluster state update since in certain areas we rely on
// the node to be in the cluster state (sampled from ClusterService#state) to be there, also
// shard transitions need to better be handled in such cases
master.routingService.reroute("post_node_add");
}
}); });
} }
} // else, no master node, the next node that will start will fill things in... } // else, no master node, the next node that will start will fill things in...
@ -226,7 +224,7 @@ public class LocalDiscovery extends AbstractLifecycleComponent<Discovery> implem
} }
// reroute here, so we eagerly remove dead nodes from the routing // reroute here, so we eagerly remove dead nodes from the routing
ClusterState updatedState = ClusterState.builder(currentState).nodes(newNodes).build(); ClusterState updatedState = ClusterState.builder(currentState).nodes(newNodes).build();
RoutingAllocation.Result routingResult = master.routingService.getAllocationService().reroute( RoutingAllocation.Result routingResult = master.allocationService.reroute(
ClusterState.builder(updatedState).build(), "elected as master"); ClusterState.builder(updatedState).build(), "elected as master");
return ClusterState.builder(updatedState).routingResult(routingResult).build(); return ClusterState.builder(updatedState).routingResult(routingResult).build();
} }

View File

@ -29,7 +29,7 @@ import org.elasticsearch.cluster.NotMasterException;
import org.elasticsearch.cluster.block.ClusterBlocks; import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.RoutingService; import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Priority; import org.elasticsearch.common.Priority;
@ -58,7 +58,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
public class NodeJoinController extends AbstractComponent { public class NodeJoinController extends AbstractComponent {
private final ClusterService clusterService; private final ClusterService clusterService;
private final RoutingService routingService; private final AllocationService allocationService;
private final ElectMasterService electMaster; private final ElectMasterService electMaster;
private final DiscoverySettings discoverySettings; private final DiscoverySettings discoverySettings;
private final JoinTaskExecutor joinTaskExecutor = new JoinTaskExecutor(); private final JoinTaskExecutor joinTaskExecutor = new JoinTaskExecutor();
@ -68,10 +68,11 @@ public class NodeJoinController extends AbstractComponent {
private ElectionContext electionContext = null; private ElectionContext electionContext = null;
public NodeJoinController(ClusterService clusterService, RoutingService routingService, ElectMasterService electMaster, DiscoverySettings discoverySettings, Settings settings) { public NodeJoinController(ClusterService clusterService, AllocationService allocationService, ElectMasterService electMaster,
DiscoverySettings discoverySettings, Settings settings) {
super(settings); super(settings);
this.clusterService = clusterService; this.clusterService = clusterService;
this.routingService = routingService; this.allocationService = allocationService;
this.electMaster = electMaster; this.electMaster = electMaster;
this.discoverySettings = discoverySettings; this.discoverySettings = discoverySettings;
} }
@ -406,21 +407,7 @@ public class NodeJoinController extends AbstractComponent {
ClusterBlocks clusterBlocks = ClusterBlocks.builder().blocks(currentState.blocks()) ClusterBlocks clusterBlocks = ClusterBlocks.builder().blocks(currentState.blocks())
.removeGlobalBlock(discoverySettings.getNoMasterBlock()).build(); .removeGlobalBlock(discoverySettings.getNoMasterBlock()).build();
newState.blocks(clusterBlocks); newState.blocks(clusterBlocks);
newState.nodes(nodesBuilder);
nodesChanged = true; nodesChanged = true;
// reroute now to remove any dead nodes (master may have stepped down when they left and didn't update the routing table)
// Note: also do it now to avoid assigning shards to these nodes. We will have another reroute after the cluster
// state is published.
// TODO: this publishing of a cluster state with no nodes assigned to joining nodes shouldn't be needed anymore. remove.
final ClusterState tmpState = newState.build();
RoutingAllocation.Result result = routingService.getAllocationService().reroute(tmpState, "nodes joined");
newState = ClusterState.builder(tmpState);
if (result.changed()) {
newState.routingResult(result);
}
nodesBuilder = DiscoveryNodes.builder(tmpState.nodes());
} }
if (nodesBuilder.isLocalNodeElectedMaster() == false) { if (nodesBuilder.isLocalNodeElectedMaster() == false) {
@ -439,7 +426,8 @@ public class NodeJoinController extends AbstractComponent {
for (DiscoveryNode existingNode : currentNodes) { for (DiscoveryNode existingNode : currentNodes) {
if (node.getAddress().equals(existingNode.getAddress())) { if (node.getAddress().equals(existingNode.getAddress())) {
nodesBuilder.remove(existingNode.getId()); nodesBuilder.remove(existingNode.getId());
logger.warn("received join request from node [{}], but found existing node {} with same address, removing existing node", node, existingNode); logger.warn("received join request from node [{}], but found existing node {} with same address, " +
"removing existing node", node, existingNode);
} }
} }
} }
@ -448,6 +436,12 @@ public class NodeJoinController extends AbstractComponent {
if (nodesChanged) { if (nodesChanged) {
newState.nodes(nodesBuilder); newState.nodes(nodesBuilder);
final ClusterState tmpState = newState.build();
RoutingAllocation.Result result = allocationService.reroute(tmpState, "node_join");
newState = ClusterState.builder(tmpState);
if (result.changed()) {
newState.routingResult(result);
}
} }
// we must return a new cluster state instance to force publishing. This is important // we must return a new cluster state instance to force publishing. This is important
@ -463,13 +457,6 @@ public class NodeJoinController extends AbstractComponent {
@Override @Override
public void clusterStatePublished(ClusterChangedEvent event) { public void clusterStatePublished(ClusterChangedEvent event) {
if (event.nodesDelta().hasChanges()) {
// we reroute not in the same cluster state update since in certain areas we rely on
// the node to be in the cluster state (sampled from ClusterService#state) to be there, also
// shard transitions need to better be handled in such cases
routingService.reroute("post_node_add");
}
NodeJoinController.this.electMaster.logMinimumMasterNodesWarningIfNecessary(event.previousState(), event.state()); NodeJoinController.this.electMaster.logMinimumMasterNodesWarningIfNecessary(event.previousState(), event.state());
} }
} }

View File

@ -32,7 +32,7 @@ import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.RoutingService; import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Priority; import org.elasticsearch.common.Priority;
@ -113,7 +113,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
private final TransportService transportService; private final TransportService transportService;
private final ClusterService clusterService; private final ClusterService clusterService;
private RoutingService routingService; private AllocationService allocationService;
private final ClusterName clusterName; private final ClusterName clusterName;
private final DiscoverySettings discoverySettings; private final DiscoverySettings discoverySettings;
private final ZenPingService pingService; private final ZenPingService pingService;
@ -146,7 +146,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
/** counts the time this node has joined the cluster or have elected it self as master */ /** counts the time this node has joined the cluster or have elected it self as master */
private final AtomicLong clusterJoinsCounter = new AtomicLong(); private final AtomicLong clusterJoinsCounter = new AtomicLong();
// must initialized in doStart(), when we have the routingService set // must initialized in doStart(), when we have the allocationService set
private volatile NodeJoinController nodeJoinController; private volatile NodeJoinController nodeJoinController;
@Inject @Inject
@ -206,8 +206,8 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
} }
@Override @Override
public void setRoutingService(RoutingService routingService) { public void setAllocationService(AllocationService allocationService) {
this.routingService = routingService; this.allocationService = allocationService;
} }
@Override @Override
@ -215,7 +215,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
nodesFD.setLocalNode(clusterService.localNode()); nodesFD.setLocalNode(clusterService.localNode());
joinThreadControl.start(); joinThreadControl.start();
pingService.start(); pingService.start();
this.nodeJoinController = new NodeJoinController(clusterService, routingService, electMaster, discoverySettings, settings); this.nodeJoinController = new NodeJoinController(clusterService, allocationService, electMaster, discoverySettings, settings);
} }
@Override @Override
@ -516,8 +516,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
return rejoin(currentState, "not enough master nodes"); return rejoin(currentState, "not enough master nodes");
} }
// eagerly run reroute to remove dead nodes from routing table // eagerly run reroute to remove dead nodes from routing table
RoutingAllocation.Result routingResult = routingService.getAllocationService().reroute( RoutingAllocation.Result routingResult = allocationService.reroute(ClusterState.builder(currentState).build(),
ClusterState.builder(currentState).build(),
"[" + node + "] left"); "[" + node + "] left");
return ClusterState.builder(currentState).routingResult(routingResult).build(); return ClusterState.builder(currentState).routingResult(routingResult).build();
} }
@ -561,7 +560,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
return rejoin(currentState, "not enough master nodes"); return rejoin(currentState, "not enough master nodes");
} }
// eagerly run reroute to remove dead nodes from routing table // eagerly run reroute to remove dead nodes from routing table
RoutingAllocation.Result routingResult = routingService.getAllocationService().reroute( RoutingAllocation.Result routingResult = allocationService.reroute(
ClusterState.builder(currentState).build(), ClusterState.builder(currentState).build(),
"[" + node + "] failed"); "[" + node + "] failed");
return ClusterState.builder(currentState).routingResult(routingResult).build(); return ClusterState.builder(currentState).routingResult(routingResult).build();

View File

@ -37,6 +37,7 @@ import org.elasticsearch.cluster.action.index.MappingUpdatedAction;
import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodeService; import org.elasticsearch.cluster.node.DiscoveryNodeService;
import org.elasticsearch.cluster.routing.RoutingService; import org.elasticsearch.cluster.routing.RoutingService;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.StopWatch; import org.elasticsearch.common.StopWatch;
import org.elasticsearch.common.component.Lifecycle; import org.elasticsearch.common.component.Lifecycle;
@ -321,7 +322,7 @@ public class Node implements Closeable {
ESLogger logger = Loggers.getLogger(Node.class, NODE_NAME_SETTING.get(settings)); ESLogger logger = Loggers.getLogger(Node.class, NODE_NAME_SETTING.get(settings));
logger.info("starting ..."); logger.info("starting ...");
// hack around dependency injection problem (for now...) // hack around dependency injection problem (for now...)
injector.getInstance(Discovery.class).setRoutingService(injector.getInstance(RoutingService.class)); injector.getInstance(Discovery.class).setAllocationService(injector.getInstance(AllocationService.class));
for (Class<? extends LifecycleComponent> plugin : pluginsService.nodeServices()) { for (Class<? extends LifecycleComponent> plugin : pluginsService.nodeServices()) {
injector.getInstance(plugin).start(); injector.getInstance(plugin).start();
} }

View File

@ -24,7 +24,6 @@ import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.NotMasterException; import org.elasticsearch.cluster.NotMasterException;
import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.RoutingService;
import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.routing.allocation.FailedRerouteAllocation; import org.elasticsearch.cluster.routing.allocation.FailedRerouteAllocation;
@ -99,7 +98,7 @@ public class NodeJoinControllerTests extends ESTestCase {
// make sure we have a master // make sure we have a master
setState(clusterService, ClusterState.builder(clusterService.state()).nodes( setState(clusterService, ClusterState.builder(clusterService.state()).nodes(
DiscoveryNodes.builder(initialNodes).masterNodeId(localNode.getId()))); DiscoveryNodes.builder(initialNodes).masterNodeId(localNode.getId())));
nodeJoinController = new NodeJoinController(clusterService, new NoopRoutingService(Settings.EMPTY), nodeJoinController = new NodeJoinController(clusterService, new NoopAllocationService(Settings.EMPTY),
new ElectMasterService(Settings.EMPTY, Version.CURRENT), new ElectMasterService(Settings.EMPTY, Version.CURRENT),
new DiscoverySettings(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)), new DiscoverySettings(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)),
Settings.EMPTY); Settings.EMPTY);
@ -298,7 +297,8 @@ public class NodeJoinControllerTests extends ESTestCase {
} }
logger.debug("--> asserting master election didn't finish yet"); logger.debug("--> asserting master election didn't finish yet");
assertThat("election finished after [" + initialJoins + "] master nodes but required joins is [" + requiredJoins + "]", electionFuture.isDone(), equalTo(false)); assertThat("election finished after [" + initialJoins + "] master nodes but required joins is [" + requiredJoins + "]",
electionFuture.isDone(), equalTo(false));
final int finalJoins = requiredJoins - initialJoins + randomInt(5); final int finalJoins = requiredJoins - initialJoins + randomInt(5);
nodesToJoin.clear(); nodesToJoin.clear();
@ -374,7 +374,8 @@ public class NodeJoinControllerTests extends ESTestCase {
nodeJoinController.waitToBeElectedAsMaster(requiredJoins, TimeValue.timeValueMillis(1), new NodeJoinController.ElectionCallback() { nodeJoinController.waitToBeElectedAsMaster(requiredJoins, TimeValue.timeValueMillis(1), new NodeJoinController.ElectionCallback() {
@Override @Override
public void onElectedAsMaster(ClusterState state) { public void onElectedAsMaster(ClusterState state) {
assertThat("callback called with elected as master, but state disagrees", state.nodes().isLocalNodeElectedMaster(), equalTo(true)); assertThat("callback called with elected as master, but state disagrees", state.nodes().isLocalNodeElectedMaster(),
equalTo(true));
latch.countDown(); latch.countDown();
} }
@ -492,7 +493,8 @@ public class NodeJoinControllerTests extends ESTestCase {
nodeJoinController.waitToBeElectedAsMaster(requiredJoins, TimeValue.timeValueHours(30), new NodeJoinController.ElectionCallback() { nodeJoinController.waitToBeElectedAsMaster(requiredJoins, TimeValue.timeValueHours(30), new NodeJoinController.ElectionCallback() {
@Override @Override
public void onElectedAsMaster(ClusterState state) { public void onElectedAsMaster(ClusterState state) {
assertThat("callback called with elected as master, but state disagrees", state.nodes().isLocalNodeElectedMaster(), equalTo(true)); assertThat("callback called with elected as master, but state disagrees", state.nodes().isLocalNodeElectedMaster(),
equalTo(true));
latch.countDown(); latch.countDown();
} }
@ -516,18 +518,6 @@ public class NodeJoinControllerTests extends ESTestCase {
} }
static class NoopRoutingService extends RoutingService {
public NoopRoutingService(Settings settings) {
super(settings, null, new NoopAllocationService(settings));
}
@Override
protected void performReroute(String reason) {
}
}
static class NoopAllocationService extends AllocationService { static class NoopAllocationService extends AllocationService {
public NoopAllocationService(Settings settings) { public NoopAllocationService(Settings settings) {
@ -535,12 +525,14 @@ public class NodeJoinControllerTests extends ESTestCase {
} }
@Override @Override
public RoutingAllocation.Result applyStartedShards(ClusterState clusterState, List<? extends ShardRouting> startedShards, boolean withReroute) { public RoutingAllocation.Result applyStartedShards(ClusterState clusterState, List<? extends ShardRouting> startedShards,
boolean withReroute) {
return new RoutingAllocation.Result(false, clusterState.routingTable(), clusterState.metaData()); return new RoutingAllocation.Result(false, clusterState.routingTable(), clusterState.metaData());
} }
@Override @Override
public RoutingAllocation.Result applyFailedShards(ClusterState clusterState, List<FailedRerouteAllocation.FailedShard> failedShards) { public RoutingAllocation.Result applyFailedShards(ClusterState clusterState,
List<FailedRerouteAllocation.FailedShard> failedShards) {
return new RoutingAllocation.Result(false, clusterState.routingTable(), clusterState.metaData()); return new RoutingAllocation.Result(false, clusterState.routingTable(), clusterState.metaData());
} }

View File

@ -20,7 +20,7 @@ package org.elasticsearch.test;
import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.RoutingService; import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.common.component.Lifecycle; import org.elasticsearch.common.component.Lifecycle;
import org.elasticsearch.common.component.LifecycleListener; import org.elasticsearch.common.component.LifecycleListener;
import org.elasticsearch.discovery.Discovery; import org.elasticsearch.discovery.Discovery;
@ -41,7 +41,7 @@ public class NoopDiscovery implements Discovery {
} }
@Override @Override
public void setRoutingService(RoutingService routingService) { public void setAllocationService(AllocationService allocationService) {
} }