Defer reroute when nodes join (#42855)

Today the master eagerly reroutes the cluster as part of processing node joins.
However, it is not necessary to do this reroute straight away, and it is
sometimes preferable to defer it until later. For instance, when the master
wins its election it processes joins and performs a reroute, but it would be
better to defer the reroute until after the master has become properly
established.

This change defers this reroute into a separate task, and batches multiple such
tasks together.
This commit is contained in:
David Turner 2019-06-11 12:16:01 +01:00
parent 5d3849215b
commit 04cde1d6e2
18 changed files with 92 additions and 36 deletions

View File

@ -33,7 +33,6 @@ import org.elasticsearch.cluster.metadata.MetaDataMappingService;
import org.elasticsearch.cluster.metadata.MetaDataUpdateSettingsService;
import org.elasticsearch.cluster.metadata.RepositoriesMetaData;
import org.elasticsearch.cluster.routing.DelayedAllocationService;
import org.elasticsearch.cluster.routing.RoutingService;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator;
import org.elasticsearch.cluster.routing.allocation.allocator.ShardsAllocator;
@ -274,7 +273,6 @@ public class ClusterModule extends AbstractModule {
bind(MetaDataUpdateSettingsService.class).asEagerSingleton();
bind(MetaDataIndexTemplateService.class).asEagerSingleton();
bind(IndexNameExpressionResolver.class).toInstance(indexNameExpressionResolver);
bind(RoutingService.class).asEagerSingleton();
bind(DelayedAllocationService.class).asEagerSingleton();
bind(ShardStateAction.class).asEagerSingleton();
bind(NodeMappingRefreshAction.class).asEagerSingleton();

View File

@ -82,6 +82,7 @@ import java.util.Random;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
@ -144,10 +145,17 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
private JoinHelper.JoinAccumulator joinAccumulator;
private Optional<CoordinatorPublication> currentPublication = Optional.empty();
/**
* @param nodeName The name of the node, used to name the {@link java.util.concurrent.ExecutorService} of the {@link SeedHostsResolver}.
* @param onJoinValidators A collection of join validators to restrict which nodes may join the cluster.
* @param reroute A callback to call when the membership of the cluster has changed, to recalculate the assignment of shards. In
* production code this calls {@link org.elasticsearch.cluster.routing.RoutingService#reroute(String)}.
*/
public Coordinator(String nodeName, Settings settings, ClusterSettings clusterSettings, TransportService transportService,
NamedWriteableRegistry namedWriteableRegistry, AllocationService allocationService, MasterService masterService,
Supplier<CoordinationState.PersistedState> persistedStateSupplier, SeedHostsProvider seedHostsProvider,
ClusterApplier clusterApplier, Collection<BiConsumer<DiscoveryNode, ClusterState>> onJoinValidators, Random random) {
ClusterApplier clusterApplier, Collection<BiConsumer<DiscoveryNode, ClusterState>> onJoinValidators, Random random,
Consumer<String> reroute) {
this.settings = settings;
this.transportService = transportService;
this.masterService = masterService;
@ -155,7 +163,8 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
this.onJoinValidators = JoinTaskExecutor.addBuiltInJoinValidators(onJoinValidators);
this.singleNodeDiscovery = DiscoveryModule.SINGLE_NODE_DISCOVERY_TYPE.equals(DiscoveryModule.DISCOVERY_TYPE_SETTING.get(settings));
this.joinHelper = new JoinHelper(settings, allocationService, masterService, transportService,
this::getCurrentTerm, this::getStateForMasterService, this::handleJoinRequest, this::joinLeaderInTerm, this.onJoinValidators);
this::getCurrentTerm, this::getStateForMasterService, this::handleJoinRequest, this::joinLeaderInTerm, this.onJoinValidators,
reroute);
this.persistedStateSupplier = persistedStateSupplier;
this.noMasterBlockService = new NoMasterBlockService(settings, clusterSettings);
this.lastKnownLeader = Optional.empty();

View File

@ -64,6 +64,7 @@ import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.LongSupplier;
import java.util.function.Supplier;
@ -93,11 +94,11 @@ public class JoinHelper {
public JoinHelper(Settings settings, AllocationService allocationService, MasterService masterService,
TransportService transportService, LongSupplier currentTermSupplier, Supplier<ClusterState> currentStateSupplier,
BiConsumer<JoinRequest, JoinCallback> joinHandler, Function<StartJoinRequest, Join> joinLeaderInTerm,
Collection<BiConsumer<DiscoveryNode, ClusterState>> joinValidators) {
Collection<BiConsumer<DiscoveryNode, ClusterState>> joinValidators, Consumer<String> reroute) {
this.masterService = masterService;
this.transportService = transportService;
this.joinTimeout = JOIN_TIMEOUT_SETTING.get(settings);
this.joinTaskExecutor = new JoinTaskExecutor(settings, allocationService, logger) {
this.joinTaskExecutor = new JoinTaskExecutor(settings, allocationService, logger, reroute) {
@Override
public ClusterTasksResult<JoinTaskExecutor.Task> execute(ClusterState currentState, List<JoinTaskExecutor.Task> joiningTasks)

View File

@ -38,6 +38,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import static org.elasticsearch.gateway.GatewayService.STATE_NOT_RECOVERED_BLOCK;
@ -46,6 +47,7 @@ public class JoinTaskExecutor implements ClusterStateTaskExecutor<JoinTaskExecut
private final AllocationService allocationService;
private final Logger logger;
private final Consumer<String> reroute;
private final int minimumMasterNodesOnLocalNode;
@ -84,10 +86,11 @@ public class JoinTaskExecutor implements ClusterStateTaskExecutor<JoinTaskExecut
private static final String FINISH_ELECTION_TASK_REASON = "_FINISH_ELECTION_";
}
public JoinTaskExecutor(Settings settings, AllocationService allocationService, Logger logger) {
public JoinTaskExecutor(Settings settings, AllocationService allocationService, Logger logger, Consumer<String> reroute) {
this.allocationService = allocationService;
this.logger = logger;
minimumMasterNodesOnLocalNode = ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.get(settings);
this.reroute = reroute;
}
@Override
@ -151,8 +154,8 @@ public class JoinTaskExecutor implements ClusterStateTaskExecutor<JoinTaskExecut
results.success(joinTask);
}
if (nodesChanged) {
newState.nodes(nodesBuilder);
return results.build(allocationService.reroute(newState.build(), "node_join"));
reroute.accept("post-join reroute");
return results.build(allocationService.adaptAutoExpandReplicas(newState.nodes(nodesBuilder).build()));
} else {
// we must return a new cluster state instance to force publishing. This is important
// for the joining node to finalize its join and set us as a master

View File

@ -240,7 +240,7 @@ public class AllocationService {
* Checks if the are replicas with the auto-expand feature that need to be adapted.
* Returns an updated cluster state if changes were necessary, or the identical cluster if no changes were required.
*/
private ClusterState adaptAutoExpandReplicas(ClusterState clusterState) {
public ClusterState adaptAutoExpandReplicas(ClusterState clusterState) {
final Map<Integer, List<String>> autoExpandReplicaChanges =
AutoExpandReplicas.getAutoExpandReplicaChanges(clusterState.metaData(), clusterState.nodes());
if (autoExpandReplicaChanges.isEmpty()) {

View File

@ -24,6 +24,7 @@ import org.apache.logging.log4j.LogManager;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.coordination.Coordinator;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.RoutingService;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.service.ClusterApplier;
import org.elasticsearch.cluster.service.ClusterApplierService;
@ -84,7 +85,8 @@ public class DiscoveryModule {
public DiscoveryModule(Settings settings, ThreadPool threadPool, TransportService transportService,
NamedWriteableRegistry namedWriteableRegistry, NetworkService networkService, MasterService masterService,
ClusterApplier clusterApplier, ClusterSettings clusterSettings, List<DiscoveryPlugin> plugins,
AllocationService allocationService, Path configFile, GatewayMetaState gatewayMetaState) {
AllocationService allocationService, Path configFile, GatewayMetaState gatewayMetaState,
RoutingService routingService) {
final Collection<BiConsumer<DiscoveryNode, ClusterState>> joinValidators = new ArrayList<>();
final Map<String, Supplier<SeedHostsProvider>> hostProviders = new HashMap<>();
hostProviders.put("settings", () -> new SettingsBasedSeedHostsProvider(settings, transportService));
@ -134,10 +136,10 @@ public class DiscoveryModule {
settings, clusterSettings,
transportService, namedWriteableRegistry, allocationService, masterService,
() -> gatewayMetaState.getPersistedState(settings, (ClusterApplierService) clusterApplier), seedHostsProvider,
clusterApplier, joinValidators, new Random(Randomness.get().nextLong()));
clusterApplier, joinValidators, new Random(Randomness.get().nextLong()), routingService::reroute);
} else if (ZEN_DISCOVERY_TYPE.equals(discoveryType)) {
discovery = new ZenDiscovery(settings, threadPool, transportService, namedWriteableRegistry, masterService, clusterApplier,
clusterSettings, seedHostsProvider, allocationService, joinValidators, gatewayMetaState);
clusterSettings, seedHostsProvider, allocationService, joinValidators, gatewayMetaState, routingService::reroute);
} else {
throw new IllegalArgumentException("Unknown discovery type [" + discoveryType + "]");
}

View File

@ -43,6 +43,7 @@ import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
/**
* This class processes incoming join request (passed zia {@link ZenDiscovery}). Incoming nodes
@ -61,9 +62,9 @@ public class NodeJoinController {
public NodeJoinController(Settings settings, MasterService masterService, AllocationService allocationService,
ElectMasterService electMaster) {
ElectMasterService electMaster, Consumer<String> reroute) {
this.masterService = masterService;
joinTaskExecutor = new JoinTaskExecutor(settings, allocationService, logger) {
joinTaskExecutor = new JoinTaskExecutor(settings, allocationService, logger, reroute) {
@Override
public void clusterStatePublished(ClusterChangedEvent event) {
electMaster.logMinimumMasterNodesWarningIfNecessary(event.previousState(), event.state());

View File

@ -164,7 +164,8 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover
public ZenDiscovery(Settings settings, ThreadPool threadPool, TransportService transportService,
NamedWriteableRegistry namedWriteableRegistry, MasterService masterService, ClusterApplier clusterApplier,
ClusterSettings clusterSettings, SeedHostsProvider hostsProvider, AllocationService allocationService,
Collection<BiConsumer<DiscoveryNode, ClusterState>> onJoinValidators, GatewayMetaState gatewayMetaState) {
Collection<BiConsumer<DiscoveryNode, ClusterState>> onJoinValidators, GatewayMetaState gatewayMetaState,
Consumer<String> reroute) {
this.onJoinValidators = JoinTaskExecutor.addBuiltInJoinValidators(onJoinValidators);
this.masterService = masterService;
this.clusterApplier = clusterApplier;
@ -225,7 +226,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover
this.membership = new MembershipAction(transportService, new MembershipListener(), onJoinValidators);
this.joinThreadControl = new JoinThreadControl();
this.nodeJoinController = new NodeJoinController(settings, masterService, allocationService, electMaster);
this.nodeJoinController = new NodeJoinController(settings, masterService, allocationService, electMaster, reroute);
this.nodeRemovalExecutor = new ZenNodeRemovalClusterStateTaskExecutor(allocationService, electMaster, this::submitRejoin, logger);
masterService.setClusterStateSupplier(this::clusterState);

View File

@ -492,10 +492,11 @@ public class Node implements Closeable {
RestoreService restoreService = new RestoreService(clusterService, repositoryService, clusterModule.getAllocationService(),
metaDataCreateIndexService, metaDataIndexUpgradeService, clusterService.getClusterSettings());
final RoutingService routingService = new RoutingService(clusterService, clusterModule.getAllocationService());
final DiscoveryModule discoveryModule = new DiscoveryModule(settings, threadPool, transportService, namedWriteableRegistry,
networkService, clusterService.getMasterService(), clusterService.getClusterApplierService(),
clusterService.getClusterSettings(), pluginsService.filterPlugins(DiscoveryPlugin.class),
clusterModule.getAllocationService(), environment.configFile(), gatewayMetaState);
clusterModule.getAllocationService(), environment.configFile(), gatewayMetaState, routingService);
this.nodeService = new NodeService(settings, threadPool, monitorService, discoveryModule.getDiscovery(),
transportService, indicesService, pluginsService, circuitBreakerService, scriptModule.getScriptService(),
httpServerTransport, ingestService, clusterService, settingsModule.getSettingsFilter(), responseCollectorService,
@ -570,6 +571,7 @@ public class Node implements Closeable {
b.bind(SnapshotShardsService.class).toInstance(snapshotShardsService);
b.bind(TransportNodesSnapshotsStatus.class).toInstance(nodesSnapshotsStatus);
b.bind(RestoreService.class).toInstance(restoreService);
b.bind(RoutingService.class).toInstance(routingService);
}
);
injector = modules.createInjector();

View File

@ -20,9 +20,12 @@
package org.elasticsearch.cluster;
import org.elasticsearch.action.UnavailableShardsException;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.client.Requests;
import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentType;
@ -35,13 +38,16 @@ import static org.elasticsearch.client.Requests.createIndexRequest;
import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds;
import static org.hamcrest.Matchers.equalTo;
@ClusterScope(scope= Scope.TEST, numDataNodes =0)
@ClusterScope(scope = Scope.TEST, numDataNodes = 0)
public class SimpleDataNodesIT extends ESIntegTestCase {
public void testDataNodes() throws Exception {
private static final String SOURCE = "{\"type1\":{\"id\":\"1\",\"name\":\"test\"}}";
public void testIndexingBeforeAndAfterDataNodesStart() {
internalCluster().startNode(Settings.builder().put(Node.NODE_DATA_SETTING.getKey(), false).build());
client().admin().indices().create(createIndexRequest("test").waitForActiveShards(ActiveShardCount.NONE)).actionGet();
try {
client().index(Requests.indexRequest("test").type("type1").id("1").source(source("1", "test"), XContentType.JSON)
client().index(Requests.indexRequest("test").id("1").source(SOURCE, XContentType.JSON)
.timeout(timeValueSeconds(1))).actionGet();
fail("no allocation should happen");
} catch (UnavailableShardsException e) {
@ -54,7 +60,7 @@ public class SimpleDataNodesIT extends ESIntegTestCase {
// still no shard should be allocated
try {
client().index(Requests.indexRequest("test").type("type1").id("1").source(source("1", "test"), XContentType.JSON)
client().index(Requests.indexRequest("test").id("1").source(SOURCE, XContentType.JSON)
.timeout(timeValueSeconds(1))).actionGet();
fail("no allocation should happen");
} catch (UnavailableShardsException e) {
@ -66,13 +72,43 @@ public class SimpleDataNodesIT extends ESIntegTestCase {
assertThat(client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForNodes("3")
.setLocal(true).execute().actionGet().isTimedOut(), equalTo(false));
IndexResponse indexResponse = client().index(Requests.indexRequest("test").type("type1").id("1")
.source(source("1", "test"), XContentType.JSON)).actionGet();
IndexResponse indexResponse = client().index(Requests.indexRequest("test").id("1")
.source(SOURCE, XContentType.JSON)).actionGet();
assertThat(indexResponse.getId(), equalTo("1"));
assertThat(indexResponse.getType(), equalTo("type1"));
}
private String source(String id, String nameValue) {
return "{ \"type1\" : { \"id\" : \"" + id + "\", \"name\" : \"" + nameValue + "\" } }";
public void testShardsAllocatedAfterDataNodesStart() {
internalCluster().startNode(Settings.builder().put(Node.NODE_DATA_SETTING.getKey(), false).build());
client().admin().indices().create(createIndexRequest("test")
.settings(Settings.builder().put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)).waitForActiveShards(ActiveShardCount.NONE))
.actionGet();
final ClusterHealthResponse healthResponse1 = client().admin().cluster().prepareHealth()
.setWaitForEvents(Priority.LANGUID).execute().actionGet();
assertThat(healthResponse1.isTimedOut(), equalTo(false));
assertThat(healthResponse1.getStatus(), equalTo(ClusterHealthStatus.YELLOW)); // TODO should be RED, see #41073
assertThat(healthResponse1.getActiveShards(), equalTo(0));
internalCluster().startNode(Settings.builder().put(Node.NODE_DATA_SETTING.getKey(), true).build());
assertThat(client().admin().cluster().prepareHealth()
.setWaitForEvents(Priority.LANGUID).setWaitForNodes("2").setWaitForGreenStatus().execute().actionGet().isTimedOut(),
equalTo(false));
}
public void testAutoExpandReplicasAdjustedWhenDataNodeJoins() {
internalCluster().startNode(Settings.builder().put(Node.NODE_DATA_SETTING.getKey(), false).build());
client().admin().indices().create(createIndexRequest("test")
.settings(Settings.builder().put(IndexMetaData.SETTING_AUTO_EXPAND_REPLICAS, "0-all"))
.waitForActiveShards(ActiveShardCount.NONE))
.actionGet();
final ClusterHealthResponse healthResponse1 = client().admin().cluster().prepareHealth()
.setWaitForEvents(Priority.LANGUID).execute().actionGet();
assertThat(healthResponse1.isTimedOut(), equalTo(false));
assertThat(healthResponse1.getStatus(), equalTo(ClusterHealthStatus.YELLOW)); // TODO should be RED, see #41073
assertThat(healthResponse1.getActiveShards(), equalTo(0));
internalCluster().startNode();
internalCluster().startNode();
client().admin().cluster().prepareReroute().setRetryFailed(true).get();
}
}

View File

@ -1924,7 +1924,7 @@ public class CoordinatorTests extends ESTestCase {
final AllocationService allocationService = ESAllocationTestCase.createAllocationService(Settings.EMPTY);
coordinator = new Coordinator("test_node", settings, clusterSettings, transportService, writableRegistry(),
allocationService, masterService, this::getPersistedState,
Cluster.this::provideSeedHosts, clusterApplierService, onJoinValidators, Randomness.get());
Cluster.this::provideSeedHosts, clusterApplierService, onJoinValidators, Randomness.get(), s -> {});
masterService.setClusterStatePublisher(coordinator);
final GatewayService gatewayService = new GatewayService(settings, allocationService, clusterService,
deterministicTaskQueue.getThreadPool(this::onNode), null, coordinator);

View File

@ -58,7 +58,7 @@ public class JoinHelperTests extends ESTestCase {
x -> localNode, null, Collections.emptySet());
JoinHelper joinHelper = new JoinHelper(Settings.EMPTY, null, null, transportService, () -> 0L, () -> null,
(joinRequest, joinCallback) -> { throw new AssertionError(); }, startJoinRequest -> { throw new AssertionError(); },
Collections.emptyList());
Collections.emptyList(), s -> {});
transportService.start();
DiscoveryNode node1 = new DiscoveryNode("node1", buildNewFakeTransportAddress(), Version.CURRENT);
@ -164,7 +164,7 @@ public class JoinHelperTests extends ESTestCase {
x -> localNode, null, Collections.emptySet());
new JoinHelper(Settings.EMPTY, null, null, transportService, () -> 0L, () -> localClusterState,
(joinRequest, joinCallback) -> { throw new AssertionError(); }, startJoinRequest -> { throw new AssertionError(); },
Collections.emptyList()); // registers request handler
Collections.emptyList(), s -> {}); // registers request handler
transportService.start();
transportService.acceptIncomingRequests();

View File

@ -176,7 +176,7 @@ public class NodeJoinTests extends ESTestCase {
() -> new InMemoryPersistedState(term, initialState), r -> emptyList(),
new NoOpClusterApplier(),
Collections.emptyList(),
random);
random, s -> {});
transportService.start();
transportService.acceptIncomingRequests();
transport = capturingTransport;

View File

@ -22,6 +22,7 @@ import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.coordination.Coordinator;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.RoutingService;
import org.elasticsearch.cluster.service.ClusterApplier;
import org.elasticsearch.cluster.service.MasterService;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
@ -90,7 +91,8 @@ public class DiscoveryModuleTests extends ESTestCase {
private DiscoveryModule newModule(Settings settings, List<DiscoveryPlugin> plugins) {
return new DiscoveryModule(settings, threadPool, transportService, namedWriteableRegistry, null, masterService,
clusterApplier, clusterSettings, plugins, null, createTempDir().toAbsolutePath(), gatewayMetaState);
clusterApplier, clusterSettings, plugins, null, createTempDir().toAbsolutePath(), gatewayMetaState,
mock(RoutingService.class));
}
public void testDefaults() {

View File

@ -142,7 +142,7 @@ public class NodeJoinControllerTests extends ESTestCase {
}
masterService = ClusterServiceUtils.createMasterService(threadPool, initialState);
nodeJoinController = new NodeJoinController(Settings.EMPTY, masterService, createAllocationService(Settings.EMPTY),
new ElectMasterService(Settings.EMPTY));
new ElectMasterService(Settings.EMPTY), s -> {});
}
public void testSimpleJoinAccumulation() throws InterruptedException, ExecutionException {

View File

@ -371,7 +371,7 @@ public class ZenDiscoveryUnitTests extends ESTestCase {
new NamedWriteableRegistry(ClusterModule.getNamedWriteables()),
masterService, clusterApplier, clusterSettings, hostsResolver -> Collections.emptyList(),
ESAllocationTestCase.createAllocationService(),
Collections.emptyList(), mock(GatewayMetaState.class));
Collections.emptyList(), mock(GatewayMetaState.class), s -> {});
zenDiscovery.start();
return zenDiscovery;
}

View File

@ -213,7 +213,7 @@ public class ClusterStateChanges {
transportService, clusterService, threadPool, createIndexService, actionFilters, indexNameExpressionResolver);
nodeRemovalExecutor = new NodeRemovalClusterStateTaskExecutor(allocationService, logger);
joinTaskExecutor = new JoinTaskExecutor(Settings.EMPTY, allocationService, logger);
joinTaskExecutor = new JoinTaskExecutor(Settings.EMPTY, allocationService, logger, s -> {});
}
public ClusterState createIndex(ClusterState state, CreateIndexRequest request) {

View File

@ -1244,7 +1244,8 @@ public class SnapshotResiliencyTests extends ESTestCase {
allocationService, masterService, () -> persistedState,
hostsResolver -> testClusterNodes.nodes.values().stream().filter(n -> n.node.isMasterNode())
.map(n -> n.node.getAddress()).collect(Collectors.toList()),
clusterService.getClusterApplierService(), Collections.emptyList(), random());
clusterService.getClusterApplierService(), Collections.emptyList(), random(),
new RoutingService(clusterService, allocationService)::reroute);
masterService.setClusterStatePublisher(coordinator);
coordinator.start();
masterService.start();