eagerly reroute when a node leaves the cluster
This commit is contained in:
parent
a1ef1f02cc
commit
1be84c273b
|
@ -23,6 +23,7 @@ import org.elasticsearch.cluster.ClusterState;
|
|||
import org.elasticsearch.cluster.block.ClusterBlock;
|
||||
import org.elasticsearch.cluster.block.ClusterBlockLevel;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.cluster.routing.allocation.AllocationService;
|
||||
import org.elasticsearch.common.component.LifecycleComponent;
|
||||
import org.elasticsearch.common.inject.internal.Nullable;
|
||||
import org.elasticsearch.node.service.NodeService;
|
||||
|
@ -50,6 +51,12 @@ public interface Discovery extends LifecycleComponent<Discovery> {
|
|||
*/
|
||||
void setNodeService(@Nullable NodeService nodeService);
|
||||
|
||||
/**
|
||||
* Another hack to solve dep injection problem..., note, this will be called before
|
||||
* any start is called.
|
||||
*/
|
||||
void setAllocationService(AllocationService allocationService);
|
||||
|
||||
/**
|
||||
* Publish all the changes to the cluster from the master (can be called just by the master). The publish
|
||||
* process should not publish this state to the master as well! (the master is sending it...).
|
||||
|
|
|
@ -26,6 +26,8 @@ import org.elasticsearch.cluster.block.ClusterBlocks;
|
|||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNodeService;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNodes;
|
||||
import org.elasticsearch.cluster.routing.allocation.AllocationService;
|
||||
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
|
||||
import org.elasticsearch.common.component.AbstractLifecycleComponent;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.inject.internal.Nullable;
|
||||
|
@ -58,6 +60,8 @@ public class LocalDiscovery extends AbstractLifecycleComponent<Discovery> implem
|
|||
|
||||
private final DiscoveryNodeService discoveryNodeService;
|
||||
|
||||
private AllocationService allocationService;
|
||||
|
||||
private final ClusterName clusterName;
|
||||
|
||||
private DiscoveryNode localNode;
|
||||
|
@ -88,6 +92,11 @@ public class LocalDiscovery extends AbstractLifecycleComponent<Discovery> implem
|
|||
// nothing to do here
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setAllocationService(AllocationService allocationService) {
|
||||
this.allocationService = allocationService;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doStart() throws ElasticSearchException {
|
||||
synchronized (clusterGroups) {
|
||||
|
@ -209,7 +218,10 @@ public class LocalDiscovery extends AbstractLifecycleComponent<Discovery> implem
|
|||
if (delta.added()) {
|
||||
logger.warn("No new nodes should be created when a new discovery view is accepted");
|
||||
}
|
||||
return newClusterStateBuilder().state(currentState).nodes(newNodes).build();
|
||||
// reroute here, so we eagerly remove dead nodes from the routing
|
||||
ClusterState updatedState = newClusterStateBuilder().state(currentState).nodes(newNodes).build();
|
||||
RoutingAllocation.Result routingResult = allocationService.reroute(newClusterStateBuilder().state(updatedState).build());
|
||||
return newClusterStateBuilder().state(updatedState).routingResult(routingResult).build();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
|
|
@ -31,6 +31,8 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
|
|||
import org.elasticsearch.cluster.node.DiscoveryNodeService;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNodes;
|
||||
import org.elasticsearch.cluster.routing.RoutingTable;
|
||||
import org.elasticsearch.cluster.routing.allocation.AllocationService;
|
||||
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
|
||||
import org.elasticsearch.common.UUID;
|
||||
import org.elasticsearch.common.component.AbstractLifecycleComponent;
|
||||
import org.elasticsearch.common.component.Lifecycle;
|
||||
|
@ -78,6 +80,8 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
|
|||
|
||||
private final ClusterService clusterService;
|
||||
|
||||
private AllocationService allocationService;
|
||||
|
||||
private final ClusterName clusterName;
|
||||
|
||||
private final DiscoveryNodeService discoveryNodeService;
|
||||
|
@ -161,6 +165,11 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
|
|||
this.nodeService = nodeService;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setAllocationService(AllocationService allocationService) {
|
||||
this.allocationService = allocationService;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doStart() throws ElasticSearchException {
|
||||
Map<String, String> nodeAttributes = discoveryNodeService.buildAttributes();
|
||||
|
@ -370,7 +379,9 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
|
|||
if (!electMaster.hasEnoughMasterNodes(currentState.nodes())) {
|
||||
return rejoin(currentState, "not enough master nodes");
|
||||
}
|
||||
return currentState;
|
||||
// eagerly run reroute to remove dead nodes from routing table
|
||||
RoutingAllocation.Result routingResult = allocationService.reroute(newClusterStateBuilder().state(currentState).build());
|
||||
return newClusterStateBuilder().state(currentState).routingResult(routingResult).build();
|
||||
}
|
||||
});
|
||||
} else {
|
||||
|
@ -399,7 +410,9 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
|
|||
if (!electMaster.hasEnoughMasterNodes(currentState.nodes())) {
|
||||
return rejoin(currentState, "not enough master nodes");
|
||||
}
|
||||
return currentState;
|
||||
// eagerly run reroute to remove dead nodes from routing table
|
||||
RoutingAllocation.Result routingResult = allocationService.reroute(newClusterStateBuilder().state(currentState).build());
|
||||
return newClusterStateBuilder().state(currentState).routingResult(routingResult).build();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -32,6 +32,7 @@ import org.elasticsearch.cluster.ClusterModule;
|
|||
import org.elasticsearch.cluster.ClusterNameModule;
|
||||
import org.elasticsearch.cluster.ClusterService;
|
||||
import org.elasticsearch.cluster.routing.RoutingService;
|
||||
import org.elasticsearch.cluster.routing.allocation.AllocationService;
|
||||
import org.elasticsearch.common.CacheRecycler;
|
||||
import org.elasticsearch.common.StopWatch;
|
||||
import org.elasticsearch.common.collect.Tuple;
|
||||
|
@ -50,6 +51,7 @@ import org.elasticsearch.common.settings.ImmutableSettings;
|
|||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.settings.SettingsModule;
|
||||
import org.elasticsearch.common.util.concurrent.ThreadLocals;
|
||||
import org.elasticsearch.discovery.Discovery;
|
||||
import org.elasticsearch.discovery.DiscoveryModule;
|
||||
import org.elasticsearch.discovery.DiscoveryService;
|
||||
import org.elasticsearch.env.Environment;
|
||||
|
@ -187,6 +189,9 @@ public final class InternalNode implements Node {
|
|||
ESLogger logger = Loggers.getLogger(Node.class, settings.get("name"));
|
||||
logger.info("{{}}[{}]: starting ...", Version.CURRENT, JvmInfo.jvmInfo().pid());
|
||||
|
||||
// hack around dependency injection problem (for now...)
|
||||
injector.getInstance(Discovery.class).setAllocationService(injector.getInstance(AllocationService.class));
|
||||
|
||||
for (Class<? extends LifecycleComponent> plugin : pluginsService.services()) {
|
||||
injector.getInstance(plugin).start();
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue