Avoid parallel reroutes in DiskThresholdMonitor (#43381)

Today the `DiskThresholdMonitor` limits the frequency with which it submits
reroute tasks, but it might still submit these tasks faster than the master can
process them if, for instance, each reroute takes over 60 seconds. This causes
a problem since the reroute task runs with priority `IMMEDIATE` and is always
scheduled when there is a node over the high watermark, so this can starve any
other pending tasks on the master.

This change avoids further updates from the monitor while its last task(s) are
still in progress, and it measures the time of each update from the completion
time of the reroute task rather than its start time, to allow a larger window
for other tasks to run.

It also now makes use of the `RoutingService` to submit the reroute task, in
order to batch this task with any other pending reroutes. It enhances the
`RoutingService` to notify its listeners on completion.

Fixes #40174
Relates #42559
This commit is contained in:
David Turner 2019-06-30 16:44:57 +01:00
parent 55b3ec8d7b
commit fca7a19713
22 changed files with 503 additions and 152 deletions

View File

@ -131,13 +131,13 @@ public class InternalClusterInfoService implements ClusterInfoService, LocalNode
logger.trace("I have been elected master, scheduling a ClusterInfoUpdateJob");
}
// Submit a job that will start after DEFAULT_STARTING_INTERVAL, and reschedule itself after running
// Submit a job that will reschedule itself after running
threadPool.scheduleUnlessShuttingDown(updateFrequency, executorName(), new SubmitReschedulingClusterInfoUpdatedJob());
try {
if (clusterService.state().getNodes().getDataNodes().size() > 1) {
// Submit an info update job to be run immediately
threadPool.executor(executorName()).execute(() -> maybeRefresh());
threadPool.executor(executorName()).execute(this::maybeRefresh);
}
} catch (EsRejectedExecutionException ex) {
logger.debug("Couldn't schedule cluster info update task - node might be shutting down", ex);
@ -173,7 +173,7 @@ public class InternalClusterInfoService implements ClusterInfoService, LocalNode
if (logger.isDebugEnabled()) {
logger.debug("data node was added, retrieving new cluster info");
}
threadPool.executor(executorName()).execute(() -> maybeRefresh());
threadPool.executor(executorName()).execute(this::maybeRefresh);
}
if (this.isMaster && event.nodesRemoved()) {
@ -316,7 +316,7 @@ public class InternalClusterInfoService implements ClusterInfoService, LocalNode
ShardStats[] stats = indicesStatsResponse.getShards();
ImmutableOpenMap.Builder<String, Long> newShardSizes = ImmutableOpenMap.builder();
ImmutableOpenMap.Builder<ShardRouting, String> newShardRoutingToDataPath = ImmutableOpenMap.builder();
buildShardLevelInfo(logger, stats, newShardSizes, newShardRoutingToDataPath, clusterService.state());
buildShardLevelInfo(logger, stats, newShardSizes, newShardRoutingToDataPath);
shardSizes = newShardSizes.build();
shardRoutingToDataPath = newShardRoutingToDataPath.build();
}
@ -365,7 +365,7 @@ public class InternalClusterInfoService implements ClusterInfoService, LocalNode
}
static void buildShardLevelInfo(Logger logger, ShardStats[] stats, ImmutableOpenMap.Builder<String, Long> newShardSizes,
ImmutableOpenMap.Builder<ShardRouting, String> newShardRoutingToDataPath, ClusterState state) {
ImmutableOpenMap.Builder<ShardRouting, String> newShardRoutingToDataPath) {
for (ShardStats s : stats) {
newShardRoutingToDataPath.put(s.getShardRouting(), s.getDataPath());
long size = s.getStats().getStore().sizeInBytes();

View File

@ -382,7 +382,9 @@ public class ShardStateAction {
if (logger.isTraceEnabled()) {
logger.trace("{}, scheduling a reroute", reason);
}
routingService.reroute(reason);
routingService.reroute(reason, ActionListener.wrap(
r -> logger.trace("{}, reroute completed", reason),
e -> logger.debug(new ParameterizedMessage("{}, reroute failed", reason), e)));
}
}
}

View File

@ -83,7 +83,6 @@ import java.util.Random;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@ -152,13 +151,14 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery
* @param nodeName The name of the node, used to name the {@link java.util.concurrent.ExecutorService} of the {@link SeedHostsResolver}.
* @param onJoinValidators A collection of join validators to restrict which nodes may join the cluster.
* @param reroute A callback to call when the membership of the cluster has changed, to recalculate the assignment of shards. In
* production code this calls {@link org.elasticsearch.cluster.routing.RoutingService#reroute(String)}.
* production code this calls
* {@link org.elasticsearch.cluster.routing.RoutingService#reroute(String, ActionListener)}.
*/
public Coordinator(String nodeName, Settings settings, ClusterSettings clusterSettings, TransportService transportService,
NamedWriteableRegistry namedWriteableRegistry, AllocationService allocationService, MasterService masterService,
Supplier<CoordinationState.PersistedState> persistedStateSupplier, SeedHostsProvider seedHostsProvider,
ClusterApplier clusterApplier, Collection<BiConsumer<DiscoveryNode, ClusterState>> onJoinValidators, Random random,
Consumer<String> reroute, ElectionStrategy electionStrategy) {
BiConsumer<String, ActionListener<Void>> reroute, ElectionStrategy electionStrategy) {
this.settings = settings;
this.transportService = transportService;
this.masterService = masterService;

View File

@ -64,7 +64,6 @@ import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.LongSupplier;
import java.util.function.Supplier;
@ -91,10 +90,10 @@ public class JoinHelper {
private AtomicReference<FailedJoinAttempt> lastFailedJoinAttempt = new AtomicReference<>();
public JoinHelper(Settings settings, AllocationService allocationService, MasterService masterService,
JoinHelper(Settings settings, AllocationService allocationService, MasterService masterService,
TransportService transportService, LongSupplier currentTermSupplier, Supplier<ClusterState> currentStateSupplier,
BiConsumer<JoinRequest, JoinCallback> joinHandler, Function<StartJoinRequest, Join> joinLeaderInTerm,
Collection<BiConsumer<DiscoveryNode, ClusterState>> joinValidators, Consumer<String> reroute) {
Collection<BiConsumer<DiscoveryNode, ClusterState>> joinValidators, BiConsumer<String, ActionListener<Void>> reroute) {
this.masterService = masterService;
this.transportService = transportService;
this.joinTimeout = JOIN_TIMEOUT_SETTING.get(settings);

View File

@ -20,6 +20,7 @@ package org.elasticsearch.cluster.coordination;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateTaskExecutor;
import org.elasticsearch.cluster.NotMasterException;
@ -38,7 +39,6 @@ import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import static org.elasticsearch.gateway.GatewayService.STATE_NOT_RECOVERED_BLOCK;
@ -47,7 +47,7 @@ public class JoinTaskExecutor implements ClusterStateTaskExecutor<JoinTaskExecut
private final AllocationService allocationService;
private final Logger logger;
private final Consumer<String> reroute;
private final BiConsumer<String, ActionListener<Void>> reroute;
private final int minimumMasterNodesOnLocalNode;
@ -86,7 +86,8 @@ public class JoinTaskExecutor implements ClusterStateTaskExecutor<JoinTaskExecut
private static final String FINISH_ELECTION_TASK_REASON = "_FINISH_ELECTION_";
}
public JoinTaskExecutor(Settings settings, AllocationService allocationService, Logger logger, Consumer<String> reroute) {
public JoinTaskExecutor(Settings settings, AllocationService allocationService, Logger logger,
BiConsumer<String, ActionListener<Void>> reroute) {
this.allocationService = allocationService;
this.logger = logger;
minimumMasterNodesOnLocalNode = ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.get(settings);
@ -154,7 +155,10 @@ public class JoinTaskExecutor implements ClusterStateTaskExecutor<JoinTaskExecut
results.success(joinTask);
}
if (nodesChanged) {
reroute.accept("post-join reroute");
reroute.accept("post-join reroute", ActionListener.wrap(
r -> logger.trace("post-join reroute completed"),
e -> logger.debug("post-join reroute failed", e)));
return results.build(allocationService.adaptAutoExpandReplicas(newState.nodes(nodesBuilder).build()));
} else {
// we must return a new cluster state instance to force publishing. This is important

View File

@ -22,16 +22,20 @@ package org.elasticsearch.cluster.routing;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.PlainListenableActionFuture;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.NotMasterException;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiFunction;
/**
* A {@link RoutingService} listens to clusters state. When this service
@ -51,14 +55,16 @@ public class RoutingService extends AbstractLifecycleComponent {
private static final String CLUSTER_UPDATE_TASK_SOURCE = "cluster_reroute";
private final ClusterService clusterService;
private final AllocationService allocationService;
private final BiFunction<ClusterState, String, ClusterState> reroute;
private AtomicBoolean rerouting = new AtomicBoolean();
private final Object mutex = new Object();
@Nullable // null if no reroute is currently pending
private PlainListenableActionFuture<Void> pendingRerouteListeners;
@Inject
public RoutingService(ClusterService clusterService, AllocationService allocationService) {
public RoutingService(ClusterService clusterService, BiFunction<ClusterState, String, ClusterState> reroute) {
this.clusterService = clusterService;
this.allocationService = allocationService;
this.reroute = reroute;
}
@Override
@ -76,34 +82,55 @@ public class RoutingService extends AbstractLifecycleComponent {
/**
* Initiates a reroute.
*/
public final void reroute(String reason) {
public final void reroute(String reason, ActionListener<Void> listener) {
if (lifecycle.started() == false) {
listener.onFailure(new IllegalStateException(
"rejecting delayed reroute [" + reason + "] in state [" + lifecycleState() + "]"));
return;
}
final PlainListenableActionFuture<Void> currentListeners;
synchronized (mutex) {
if (pendingRerouteListeners != null) {
logger.trace("already has pending reroute, adding [{}] to batch", reason);
pendingRerouteListeners.addListener(listener);
return;
}
currentListeners = PlainListenableActionFuture.newListenableFuture();
currentListeners.addListener(listener);
pendingRerouteListeners = currentListeners;
}
logger.trace("rerouting [{}]", reason);
try {
if (lifecycle.stopped()) {
return;
}
if (rerouting.compareAndSet(false, true) == false) {
logger.trace("already has pending reroute, ignoring {}", reason);
return;
}
logger.trace("rerouting {}", reason);
clusterService.submitStateUpdateTask(CLUSTER_UPDATE_TASK_SOURCE + "(" + reason + ")",
new ClusterStateUpdateTask(Priority.HIGH) {
@Override
public ClusterState execute(ClusterState currentState) {
rerouting.set(false);
return allocationService.reroute(currentState, reason);
synchronized (mutex) {
assert pendingRerouteListeners == currentListeners;
pendingRerouteListeners = null;
}
return reroute.apply(currentState, reason);
}
@Override
public void onNoLongerMaster(String source) {
rerouting.set(false);
// no biggie
synchronized (mutex) {
if (pendingRerouteListeners == currentListeners) {
pendingRerouteListeners = null;
}
}
currentListeners.onFailure(new NotMasterException("delayed reroute [" + reason + "] cancelled"));
// no big deal, the new master will reroute again
}
@Override
public void onFailure(String source, Exception e) {
rerouting.set(false);
ClusterState state = clusterService.state();
synchronized (mutex) {
if (pendingRerouteListeners == currentListeners) {
pendingRerouteListeners = null;
}
}
final ClusterState state = clusterService.state();
if (logger.isTraceEnabled()) {
logger.error(() -> new ParameterizedMessage("unexpected failure during [{}], current state:\n{}",
source, state), e);
@ -111,12 +138,22 @@ public class RoutingService extends AbstractLifecycleComponent {
logger.error(() -> new ParameterizedMessage("unexpected failure during [{}], current state version [{}]",
source, state.version()), e);
}
currentListeners.onFailure(new ElasticsearchException("delayed reroute [" + reason + "] failed", e));
}
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
currentListeners.onResponse(null);
}
});
} catch (Exception e) {
rerouting.set(false);
synchronized (mutex) {
assert pendingRerouteListeners == currentListeners;
pendingRerouteListeners = null;
}
ClusterState state = clusterService.state();
logger.warn(() -> new ParameterizedMessage("failed to reroute routing table, current state:\n{}", state), e);
currentListeners.onFailure(new ElasticsearchException("delayed reroute [" + reason + "] could not be submitted", e));
}
}
}

View File

@ -21,12 +21,20 @@ package org.elasticsearch.cluster.routing.allocation;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.LongSupplier;
import java.util.function.Supplier;
import com.carrotsearch.hppc.ObjectLookupContainer;
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.GroupedActionListener;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterInfo;
import org.elasticsearch.cluster.ClusterState;
@ -54,11 +62,15 @@ public class DiskThresholdMonitor {
private final Client client;
private final Set<String> nodeHasPassedWatermark = Sets.newConcurrentHashSet();
private final Supplier<ClusterState> clusterStateSupplier;
private long lastRunNS;
private final LongSupplier currentTimeMillisSupplier;
private final AtomicLong lastRunTimeMillis = new AtomicLong(Long.MIN_VALUE);
private final AtomicBoolean checkInProgress = new AtomicBoolean();
private final SetOnce<Consumer<ActionListener<Void>>> rerouteAction = new SetOnce<>();
public DiskThresholdMonitor(Settings settings, Supplier<ClusterState> clusterStateSupplier, ClusterSettings clusterSettings,
Client client) {
Client client, LongSupplier currentTimeMillisSupplier) {
this.clusterStateSupplier = clusterStateSupplier;
this.currentTimeMillisSupplier = currentTimeMillisSupplier;
this.diskThresholdSettings = new DiskThresholdSettings(settings, clusterSettings);
this.client = client;
}
@ -92,30 +104,48 @@ public class DiskThresholdMonitor {
}
}
private void checkFinished() {
final boolean checkFinished = checkInProgress.compareAndSet(true, false);
assert checkFinished;
}
public void onNewInfo(ClusterInfo info) {
ImmutableOpenMap<String, DiskUsage> usages = info.getNodeLeastAvailableDiskUsages();
if (usages != null) {
assert rerouteAction.get() != null;
if (checkInProgress.compareAndSet(false, true) == false) {
logger.info("skipping monitor as a check is already in progress");
return;
}
final ImmutableOpenMap<String, DiskUsage> usages = info.getNodeLeastAvailableDiskUsages();
if (usages == null) {
checkFinished();
return;
}
boolean reroute = false;
String explanation = "";
final long currentTimeMillis = currentTimeMillisSupplier.getAsLong();
// Garbage collect nodes that have been removed from the cluster
// from the map that tracks watermark crossing
ObjectLookupContainer<String> nodes = usages.keys();
final ObjectLookupContainer<String> nodes = usages.keys();
for (String node : nodeHasPassedWatermark) {
if (nodes.contains(node) == false) {
nodeHasPassedWatermark.remove(node);
}
}
ClusterState state = clusterStateSupplier.get();
Set<String> indicesToMarkReadOnly = new HashSet<>();
for (ObjectObjectCursor<String, DiskUsage> entry : usages) {
String node = entry.key;
DiskUsage usage = entry.value;
final ClusterState state = clusterStateSupplier.get();
final Set<String> indicesToMarkReadOnly = new HashSet<>();
for (final ObjectObjectCursor<String, DiskUsage> entry : usages) {
final String node = entry.key;
final DiskUsage usage = entry.value;
warnAboutDiskIfNeeded(usage);
if (usage.getFreeBytes() < diskThresholdSettings.getFreeBytesThresholdFloodStage().getBytes() ||
usage.getFreeDiskAsPercentage() < diskThresholdSettings.getFreeDiskThresholdFloodStage()) {
RoutingNode routingNode = state.getRoutingNodes().node(node);
final RoutingNode routingNode = state.getRoutingNodes().node(node);
if (routingNode != null) { // this might happen if we haven't got the full cluster-state yet?!
for (ShardRouting routing : routingNode) {
indicesToMarkReadOnly.add(routing.index().getName());
@ -123,8 +153,7 @@ public class DiskThresholdMonitor {
}
} else if (usage.getFreeBytes() < diskThresholdSettings.getFreeBytesThresholdHigh().getBytes() ||
usage.getFreeDiskAsPercentage() < diskThresholdSettings.getFreeDiskThresholdHigh()) {
if ((System.nanoTime() - lastRunNS) > diskThresholdSettings.getRerouteInterval().nanos()) {
lastRunNS = System.nanoTime();
if (lastRunTimeMillis.get() < currentTimeMillis - diskThresholdSettings.getRerouteInterval().millis()) {
reroute = true;
explanation = "high disk watermark exceeded on one or more nodes";
} else {
@ -142,8 +171,7 @@ public class DiskThresholdMonitor {
// low watermark, but is no longer, so we should
// reroute so any unassigned shards can be allocated
// if they are able to be
if ((System.nanoTime() - lastRunNS) > diskThresholdSettings.getRerouteInterval().nanos()) {
lastRunNS = System.nanoTime();
if (lastRunTimeMillis.get() < currentTimeMillis - diskThresholdSettings.getRerouteInterval().millis()) {
reroute = true;
explanation = "one or more nodes has gone under the high or low watermark";
nodeHasPassedWatermark.remove(node);
@ -155,25 +183,50 @@ public class DiskThresholdMonitor {
}
}
}
final ActionListener<Void> listener = new GroupedActionListener<>(ActionListener.wrap(this::checkFinished), 2);
if (reroute) {
logger.info("rerouting shards: [{}]", explanation);
reroute();
rerouteAction.get().accept(ActionListener.wrap(r -> {
setLastRunTimeMillis();
listener.onResponse(r);
}, e -> {
logger.debug("reroute failed", e);
setLastRunTimeMillis();
listener.onFailure(e);
}));
} else {
listener.onResponse(null);
}
indicesToMarkReadOnly.removeIf(index -> state.getBlocks().indexBlocked(ClusterBlockLevel.WRITE, index));
if (indicesToMarkReadOnly.isEmpty() == false) {
markIndicesReadOnly(indicesToMarkReadOnly);
}
markIndicesReadOnly(indicesToMarkReadOnly, ActionListener.wrap(r -> {
setLastRunTimeMillis();
listener.onResponse(r);
}, e -> {
logger.debug("marking indices readonly failed", e);
setLastRunTimeMillis();
listener.onFailure(e);
}));
} else {
listener.onResponse(null);
}
}
protected void markIndicesReadOnly(Set<String> indicesToMarkReadOnly) {
private void setLastRunTimeMillis() {
lastRunTimeMillis.getAndUpdate(l -> Math.max(l, currentTimeMillisSupplier.getAsLong()));
}
protected void markIndicesReadOnly(Set<String> indicesToMarkReadOnly, ActionListener<Void> listener) {
// set read-only block but don't block on the response
client.admin().indices().prepareUpdateSettings(indicesToMarkReadOnly.toArray(Strings.EMPTY_ARRAY)).
setSettings(Settings.builder().put(IndexMetaData.SETTING_READ_ONLY_ALLOW_DELETE, true).build()).execute();
client.admin().indices().prepareUpdateSettings(indicesToMarkReadOnly.toArray(Strings.EMPTY_ARRAY))
.setSettings(Settings.builder().put(IndexMetaData.SETTING_READ_ONLY_ALLOW_DELETE, true).build())
.execute(ActionListener.map(listener, r -> null));
}
protected void reroute() {
// Execute an empty reroute, but don't block on the response
client.admin().cluster().prepareReroute().execute();
public void setRerouteAction(BiConsumer<String, ActionListener<Void>> rerouteAction) {
this.rerouteAction.set(listener -> rerouteAction.accept("disk threshold monitor", listener));
}
}

View File

@ -22,6 +22,7 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.store.AlreadyClosedException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateTaskConfig;
@ -43,7 +44,7 @@ import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.BiConsumer;
/**
* This class processes incoming join request (passed zia {@link ZenDiscovery}). Incoming nodes
@ -62,7 +63,7 @@ public class NodeJoinController {
public NodeJoinController(Settings settings, MasterService masterService, AllocationService allocationService,
ElectMasterService electMaster, Consumer<String> reroute) {
ElectMasterService electMaster, BiConsumer<String, ActionListener<Void>> reroute) {
this.masterService = masterService;
joinTaskExecutor = new JoinTaskExecutor(settings, allocationService, logger, reroute) {
@Override

View File

@ -165,7 +165,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover
NamedWriteableRegistry namedWriteableRegistry, MasterService masterService, ClusterApplier clusterApplier,
ClusterSettings clusterSettings, SeedHostsProvider hostsProvider, AllocationService allocationService,
Collection<BiConsumer<DiscoveryNode, ClusterState>> onJoinValidators, GatewayMetaState gatewayMetaState,
Consumer<String> reroute) {
BiConsumer<String, ActionListener<Void>> reroute) {
this.onJoinValidators = JoinTaskExecutor.addBuiltInJoinValidators(onJoinValidators);
this.masterService = masterService;
this.clusterApplier = clusterApplier;

View File

@ -21,6 +21,8 @@ package org.elasticsearch.gateway;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.nodes.BaseNodeResponse;
import org.elasticsearch.action.support.nodes.BaseNodesResponse;
import org.elasticsearch.cluster.routing.RoutingNodes;
@ -137,7 +139,9 @@ public class GatewayAllocator {
@Override
protected void reroute(ShardId shardId, String reason) {
logger.trace("{} scheduling reroute for {}", shardId, reason);
routingService.reroute("async_shard_fetch");
routingService.reroute("async_shard_fetch", ActionListener.wrap(
r -> logger.trace("{} scheduled reroute completed for {}", shardId, reason),
e -> logger.debug(new ParameterizedMessage("{} scheduled reroute failed for {}", shardId, reason), e)));
}
}

View File

@ -368,10 +368,10 @@ public class Node implements Closeable {
.newHashPublisher());
final IngestService ingestService = new IngestService(clusterService, threadPool, this.environment,
scriptModule.getScriptService(), analysisModule.getAnalysisRegistry(), pluginsService.filterPlugins(IngestPlugin.class));
final DiskThresholdMonitor listener = new DiskThresholdMonitor(settings, clusterService::state,
clusterService.getClusterSettings(), client);
final DiskThresholdMonitor diskThresholdMonitor = new DiskThresholdMonitor(settings, clusterService::state,
clusterService.getClusterSettings(), client, threadPool::relativeTimeInMillis);
final ClusterInfoService clusterInfoService = newClusterInfoService(settings, clusterService, threadPool, client,
listener::onNewInfo);
diskThresholdMonitor::onNewInfo);
final UsageService usageService = new UsageService();
ModulesBuilder modules = new ModulesBuilder();
@ -506,7 +506,7 @@ public class Node implements Closeable {
RestoreService restoreService = new RestoreService(clusterService, repositoryService, clusterModule.getAllocationService(),
metaDataCreateIndexService, metaDataIndexUpgradeService, clusterService.getClusterSettings());
final RoutingService routingService = new RoutingService(clusterService, clusterModule.getAllocationService());
final RoutingService routingService = new RoutingService(clusterService, clusterModule.getAllocationService()::reroute);
final DiscoveryModule discoveryModule = new DiscoveryModule(settings, threadPool, transportService, namedWriteableRegistry,
networkService, clusterService.getMasterService(), clusterService.getClusterApplierService(),
clusterService.getClusterSettings(), pluginsService.filterPlugins(DiscoveryPlugin.class),
@ -515,6 +515,7 @@ public class Node implements Closeable {
transportService, indicesService, pluginsService, circuitBreakerService, scriptModule.getScriptService(),
httpServerTransport, ingestService, clusterService, settingsModule.getSettingsFilter(), responseCollectorService,
searchTransportService);
diskThresholdMonitor.setRerouteAction(routingService::reroute);
final SearchService searchService = newSearchService(clusterService, indicesService,
threadPool, scriptModule.getScriptService(), bigArrays, searchModule.getFetchPhase(),

View File

@ -119,7 +119,7 @@ public class DiskUsageTests extends ESTestCase {
ImmutableOpenMap.Builder<String, Long> shardSizes = ImmutableOpenMap.builder();
ImmutableOpenMap.Builder<ShardRouting, String> routingToPath = ImmutableOpenMap.builder();
ClusterState state = ClusterState.builder(new ClusterName("blarg")).version(0).build();
InternalClusterInfoService.buildShardLevelInfo(logger, stats, shardSizes, routingToPath, state);
InternalClusterInfoService.buildShardLevelInfo(logger, stats, shardSizes, routingToPath);
assertEquals(2, shardSizes.size());
assertTrue(shardSizes.containsKey(ClusterInfo.shardIdentifierFromRouting(test_0)));
assertTrue(shardSizes.containsKey(ClusterInfo.shardIdentifierFromRouting(test_1)));

View File

@ -58,7 +58,7 @@ public class JoinHelperTests extends ESTestCase {
x -> localNode, null, Collections.emptySet());
JoinHelper joinHelper = new JoinHelper(Settings.EMPTY, null, null, transportService, () -> 0L, () -> null,
(joinRequest, joinCallback) -> { throw new AssertionError(); }, startJoinRequest -> { throw new AssertionError(); },
Collections.emptyList(), s -> {});
Collections.emptyList(), (s, r) -> {});
transportService.start();
DiscoveryNode node1 = new DiscoveryNode("node1", buildNewFakeTransportAddress(), Version.CURRENT);
@ -164,7 +164,7 @@ public class JoinHelperTests extends ESTestCase {
x -> localNode, null, Collections.emptySet());
new JoinHelper(Settings.EMPTY, null, null, transportService, () -> 0L, () -> localClusterState,
(joinRequest, joinCallback) -> { throw new AssertionError(); }, startJoinRequest -> { throw new AssertionError(); },
Collections.emptyList(), s -> {}); // registers request handler
Collections.emptyList(), (s, r) -> {}); // registers request handler
transportService.start();
transportService.acceptIncomingRequests();

View File

@ -174,7 +174,7 @@ public class NodeJoinTests extends ESTestCase {
() -> new InMemoryPersistedState(term, initialState), r -> emptyList(),
new NoOpClusterApplier(),
Collections.emptyList(),
random, s -> {}, ElectionStrategy.DEFAULT_INSTANCE);
random, (s, r) -> {}, ElectionStrategy.DEFAULT_INSTANCE);
transportService.start();
transportService.acceptIncomingRequests();
transport = capturingTransport;

View File

@ -0,0 +1,170 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster.routing;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.coordination.FailedToCommitClusterStateException;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.test.ClusterServiceUtils;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.junit.After;
import org.junit.Before;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import static org.hamcrest.Matchers.lessThan;
import static org.hamcrest.Matchers.startsWith;
public class RoutingServiceTests extends ESTestCase {
private ThreadPool threadPool;
private ClusterService clusterService;
@Before
public void beforeTest() {
threadPool = new TestThreadPool("test");
clusterService = ClusterServiceUtils.createClusterService(threadPool);
}
@After
public void afterTest() {
clusterService.stop();
threadPool.shutdown();
}
public void testRejectionUnlessStarted() {
final RoutingService routingService = new RoutingService(clusterService, (s, r) -> s);
final PlainActionFuture<Void> future = new PlainActionFuture<>();
if (randomBoolean()) {
routingService.start();
routingService.stop();
} else if (randomBoolean()) {
routingService.close();
}
routingService.reroute("test", future);
assertTrue(future.isDone());
assertThat(expectThrows(IllegalStateException.class, future::actionGet).getMessage(),
startsWith("rejecting delayed reroute [test] in state ["));
}
public void testReroutesWhenRequested() throws InterruptedException {
final AtomicLong rerouteCount = new AtomicLong();
final RoutingService routingService = new RoutingService(clusterService, (s, r) -> {
rerouteCount.incrementAndGet();
return s;
});
routingService.start();
long rerouteCountBeforeReroute = 0L;
final int iterations = between(1, 100);
final CountDownLatch countDownLatch = new CountDownLatch(iterations);
for (int i = 0; i < iterations; i++) {
rerouteCountBeforeReroute = Math.max(rerouteCountBeforeReroute, rerouteCount.get());
routingService.reroute("iteration " + i, ActionListener.wrap(countDownLatch::countDown));
}
countDownLatch.await(10, TimeUnit.SECONDS);
assertThat(rerouteCountBeforeReroute, lessThan(rerouteCount.get()));
}
public void testBatchesReroutesTogether() throws BrokenBarrierException, InterruptedException {
final CyclicBarrier cyclicBarrier = new CyclicBarrier(2);
clusterService.submitStateUpdateTask("block master service", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
cyclicBarrier.await(); // notify test that we are blocked
cyclicBarrier.await(); // wait to be unblocked by test
return currentState;
}
@Override
public void onFailure(String source, Exception e) {
throw new AssertionError(source, e);
}
});
cyclicBarrier.await(); // wait for master thread to be blocked
final AtomicBoolean rerouteExecuted = new AtomicBoolean();
final RoutingService routingService = new RoutingService(clusterService, (s, r) -> {
assertTrue(rerouteExecuted.compareAndSet(false, true)); // only called once
return s;
});
routingService.start();
final int iterations = between(1, 100);
final CountDownLatch countDownLatch = new CountDownLatch(iterations);
for (int i = 0; i < iterations; i++) {
routingService.reroute("iteration " + i, ActionListener.wrap(countDownLatch::countDown));
}
cyclicBarrier.await(); // allow master thread to continue;
countDownLatch.await(); // wait for reroute to complete
assertTrue(rerouteExecuted.get()); // see above for assertion that it's only called once
}
public void testNotifiesOnFailure() throws InterruptedException {
final RoutingService routingService = new RoutingService(clusterService, (s, r) -> {
if (rarely()) {
throw new ElasticsearchException("simulated");
}
return randomBoolean() ? s : ClusterState.builder(s).build();
});
routingService.start();
final int iterations = between(1, 100);
final CountDownLatch countDownLatch = new CountDownLatch(iterations);
for (int i = 0; i < iterations; i++) {
routingService.reroute("iteration " + i, ActionListener.wrap(countDownLatch::countDown));
if (rarely()) {
clusterService.getMasterService().setClusterStatePublisher(
randomBoolean()
? ClusterServiceUtils.createClusterStatePublisher(clusterService.getClusterApplierService())
: (event, publishListener, ackListener)
-> publishListener.onFailure(new FailedToCommitClusterStateException("simulated")));
}
if (rarely()) {
clusterService.getClusterApplierService().onNewClusterState("simulated", () -> {
ClusterState state = clusterService.state();
return ClusterState.builder(state).nodes(DiscoveryNodes.builder(state.nodes())
.masterNodeId(randomBoolean() ? null : state.nodes().getLocalNodeId())).build();
}, (source, e) -> { });
}
}
assertTrue(countDownLatch.await(10, TimeUnit.SECONDS)); // i.e. it doesn't leak any listeners
}
}

View File

@ -19,6 +19,7 @@
package org.elasticsearch.cluster.routing.allocation;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.ClusterInfo;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
@ -35,16 +36,17 @@ import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING;
public class DiskThresholdMonitorTests extends ESAllocationTestCase {
public void testMarkFloodStageIndicesReadOnly() {
AllocationService allocation = createAllocationService(Settings.builder()
.put("cluster.routing.allocation.node_concurrent_recoveries", 10).build());
@ -61,7 +63,6 @@ public class DiskThresholdMonitorTests extends ESAllocationTestCase {
.addAsNew(metaData.index("test"))
.addAsNew(metaData.index("test_1"))
.addAsNew(metaData.index("test_2"))
.build();
ClusterState clusterState = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY))
.metaData(metaData).routingTable(routingTable).build();
@ -74,18 +75,21 @@ public class DiskThresholdMonitorTests extends ESAllocationTestCase {
ClusterState finalState = clusterState;
AtomicBoolean reroute = new AtomicBoolean(false);
AtomicReference<Set<String>> indices = new AtomicReference<>();
AtomicLong currentTime = new AtomicLong();
DiskThresholdMonitor monitor = new DiskThresholdMonitor(settings, () -> finalState,
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null) {
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null, currentTime::get) {
@Override
protected void reroute() {
assertTrue(reroute.compareAndSet(false, true));
}
@Override
protected void markIndicesReadOnly(Set<String> indicesToMarkReadOnly) {
protected void markIndicesReadOnly(Set<String> indicesToMarkReadOnly, ActionListener<Void> listener) {
assertTrue(indices.compareAndSet(null, indicesToMarkReadOnly));
listener.onResponse(null);
}
};
monitor.setRerouteAction((reason, listener) -> {
assertTrue(reroute.compareAndSet(false, true));
listener.onResponse(null);
});
ImmutableOpenMap.Builder<String, DiskUsage> builder = ImmutableOpenMap.builder();
builder.put("node1", new DiskUsage("node1","node1", "/foo/bar", 100, 4));
builder.put("node2", new DiskUsage("node2","node2", "/foo/bar", 100, 30));
@ -97,6 +101,7 @@ public class DiskThresholdMonitorTests extends ESAllocationTestCase {
builder = ImmutableOpenMap.builder();
builder.put("node1", new DiskUsage("node1","node1", "/foo/bar", 100, 4));
builder.put("node2", new DiskUsage("node2","node2", "/foo/bar", 100, 5));
currentTime.addAndGet(randomLongBetween(60001, 120000));
monitor.onNewInfo(new ClusterInfo(builder.build(), null, null, null));
assertTrue(reroute.get());
assertEquals(new HashSet<>(Arrays.asList("test_1", "test_2")), indices.get());
@ -114,17 +119,17 @@ public class DiskThresholdMonitorTests extends ESAllocationTestCase {
assertTrue(anotherFinalClusterState.blocks().indexBlocked(ClusterBlockLevel.WRITE, "test_2"));
monitor = new DiskThresholdMonitor(settings, () -> anotherFinalClusterState,
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null) {
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null, currentTime::get) {
@Override
protected void reroute() {
assertTrue(reroute.compareAndSet(false, true));
}
@Override
protected void markIndicesReadOnly(Set<String> indicesToMarkReadOnly) {
protected void markIndicesReadOnly(Set<String> indicesToMarkReadOnly, ActionListener<Void> listener) {
assertTrue(indices.compareAndSet(null, indicesToMarkReadOnly));
listener.onResponse(null);
}
};
monitor.setRerouteAction((reason, listener) -> {
assertTrue(reroute.compareAndSet(false, true));
listener.onResponse(null);
});
indices.set(null);
reroute.set(false);
@ -133,6 +138,90 @@ public class DiskThresholdMonitorTests extends ESAllocationTestCase {
builder.put("node2", new DiskUsage("node2","node2", "/foo/bar", 100, 5));
monitor.onNewInfo(new ClusterInfo(builder.build(), null, null, null));
assertTrue(reroute.get());
assertEquals(new HashSet<>(Arrays.asList("test_1")), indices.get());
assertEquals(Collections.singleton("test_1"), indices.get());
}
public void testDoesNotSubmitRerouteTaskTooFrequently() {
final ClusterState clusterState = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY))
.nodes(DiscoveryNodes.builder().add(newNode("node1")).add(newNode("node2"))).build();
AtomicLong currentTime = new AtomicLong();
AtomicReference<ActionListener<Void>> listenerReference = new AtomicReference<>();
DiskThresholdMonitor monitor = new DiskThresholdMonitor(Settings.EMPTY, () -> clusterState,
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null, currentTime::get) {
@Override
protected void markIndicesReadOnly(Set<String> indicesToMarkReadOnly, ActionListener<Void> listener) {
throw new AssertionError("unexpected");
}
};
monitor.setRerouteAction((reason, listener) -> {
assertNotNull(listener);
assertTrue(listenerReference.compareAndSet(null, listener));
});
final ImmutableOpenMap.Builder<String, DiskUsage> allDisksOkBuilder;
allDisksOkBuilder = ImmutableOpenMap.builder();
allDisksOkBuilder.put("node1", new DiskUsage("node1","node1", "/foo/bar", 100, 50));
allDisksOkBuilder.put("node2", new DiskUsage("node2","node2", "/foo/bar", 100, 50));
final ImmutableOpenMap<String, DiskUsage> allDisksOk = allDisksOkBuilder.build();
final ImmutableOpenMap.Builder<String, DiskUsage> oneDiskAboveWatermarkBuilder = ImmutableOpenMap.builder();
oneDiskAboveWatermarkBuilder.put("node1", new DiskUsage("node1", "node1", "/foo/bar", 100, between(5, 9)));
oneDiskAboveWatermarkBuilder.put("node2", new DiskUsage("node2", "node2", "/foo/bar", 100, 50));
final ImmutableOpenMap<String, DiskUsage> oneDiskAboveWatermark = oneDiskAboveWatermarkBuilder.build();
// should not reroute when all disks are ok
currentTime.addAndGet(randomLongBetween(0, 120000));
monitor.onNewInfo(new ClusterInfo(allDisksOk, null, null, null));
assertNull(listenerReference.get());
// should reroute when one disk goes over the watermark
currentTime.addAndGet(randomLongBetween(0, 120000));
monitor.onNewInfo(new ClusterInfo(oneDiskAboveWatermark, null, null, null));
assertNotNull(listenerReference.get());
listenerReference.getAndSet(null).onResponse(null);
if (randomBoolean()) {
// should not re-route again within the reroute interval
currentTime.addAndGet(randomLongBetween(0,
DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_REROUTE_INTERVAL_SETTING.get(Settings.EMPTY).millis()));
monitor.onNewInfo(new ClusterInfo(allDisksOk, null, null, null));
assertNull(listenerReference.get());
}
// should reroute again when one disk is still over the watermark
currentTime.addAndGet(randomLongBetween(
DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_REROUTE_INTERVAL_SETTING.get(Settings.EMPTY).millis() + 1, 120000));
monitor.onNewInfo(new ClusterInfo(oneDiskAboveWatermark, null, null, null));
assertNotNull(listenerReference.get());
final ActionListener<Void> rerouteListener1 = listenerReference.getAndSet(null);
// should not re-route again before reroute has completed
currentTime.addAndGet(randomLongBetween(0, 120000));
monitor.onNewInfo(new ClusterInfo(allDisksOk, null, null, null));
assertNull(listenerReference.get());
// complete reroute
rerouteListener1.onResponse(null);
if (randomBoolean()) {
// should not re-route again within the reroute interval
currentTime.addAndGet(randomLongBetween(0,
DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_REROUTE_INTERVAL_SETTING.get(Settings.EMPTY).millis()));
monitor.onNewInfo(new ClusterInfo(allDisksOk, null, null, null));
assertNull(listenerReference.get());
}
// should reroute again after the reroute interval
currentTime.addAndGet(randomLongBetween(
DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_REROUTE_INTERVAL_SETTING.get(Settings.EMPTY).millis() + 1, 120000));
monitor.onNewInfo(new ClusterInfo(allDisksOk, null, null, null));
assertNotNull(listenerReference.get());
listenerReference.getAndSet(null).onResponse(null);
// should not reroute again when it is not required
currentTime.addAndGet(randomLongBetween(0, 120000));
monitor.onNewInfo(new ClusterInfo(allDisksOk, null, null, null));
assertNull(listenerReference.get());
}
}

View File

@ -19,7 +19,6 @@
package org.elasticsearch.cluster.routing.allocation.decider;
import org.elasticsearch.action.admin.indices.recovery.RecoveryResponse;
import org.elasticsearch.cluster.ClusterInfo;
import org.elasticsearch.cluster.ClusterInfoService;
import org.elasticsearch.cluster.ClusterState;
@ -31,7 +30,6 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.junit.annotations.TestLogging;
import java.util.ArrayList;
import java.util.Collection;
@ -53,7 +51,6 @@ public class MockDiskUsagesIT extends ESIntegTestCase {
return Collections.singletonList(MockInternalClusterInfoService.TestPlugin.class);
}
@TestLogging("org.elasticsearch.indices.recovery:TRACE,org.elasticsearch.cluster.service:TRACE")
public void testRerouteOccursOnDiskPassingHighWatermark() throws Exception {
List<String> nodes = internalCluster().startNodes(3);
@ -105,12 +102,6 @@ public class MockDiskUsagesIT extends ESIntegTestCase {
assertBusy(() -> {
final ClusterState clusterState = client().admin().cluster().prepareState().get().getState();
logger.info("--> {}", clusterState.routingTable());
final RecoveryResponse recoveryResponse = client().admin().indices()
.prepareRecoveries("test").setActiveOnly(true).setDetailed(true).get();
logger.info("--> recoveries: {}", recoveryResponse);
final Map<String, Integer> nodesToShardCount = new HashMap<>();
for (final RoutingNode node : clusterState.getRoutingNodes()) {
logger.info("--> node {} has {} shards",

View File

@ -139,7 +139,7 @@ public class NodeJoinControllerTests extends ESTestCase {
}
masterService = ClusterServiceUtils.createMasterService(threadPool, initialState);
nodeJoinController = new NodeJoinController(Settings.EMPTY, masterService, createAllocationService(Settings.EMPTY),
new ElectMasterService(Settings.EMPTY), s -> {});
new ElectMasterService(Settings.EMPTY), (s, r) -> {});
}
public void testSimpleJoinAccumulation() throws InterruptedException, ExecutionException {

View File

@ -370,7 +370,7 @@ public class ZenDiscoveryUnitTests extends ESTestCase {
new NamedWriteableRegistry(ClusterModule.getNamedWriteables()),
masterService, clusterApplier, clusterSettings, hostsResolver -> Collections.emptyList(),
ESAllocationTestCase.createAllocationService(),
Collections.emptyList(), mock(GatewayMetaState.class), s -> {});
Collections.emptyList(), mock(GatewayMetaState.class), (s, r) -> {});
zenDiscovery.start();
return zenDiscovery;
}

View File

@ -213,7 +213,7 @@ public class ClusterStateChanges {
transportService, clusterService, threadPool, createIndexService, actionFilters, indexNameExpressionResolver);
nodeRemovalExecutor = new NodeRemovalClusterStateTaskExecutor(allocationService, logger);
joinTaskExecutor = new JoinTaskExecutor(Settings.EMPTY, allocationService, logger, s -> {});
joinTaskExecutor = new JoinTaskExecutor(Settings.EMPTY, allocationService, logger, (s, r) -> {});
}
public ClusterState createIndex(ClusterState state, CreateIndexRequest request) {

View File

@ -1065,7 +1065,7 @@ public class SnapshotResiliencyTests extends ESTestCase {
transportService, indicesService, actionFilters, indexNameExpressionResolver);
final ShardStateAction shardStateAction = new ShardStateAction(
clusterService, transportService, allocationService,
new RoutingService(clusterService, allocationService),
new RoutingService(clusterService, allocationService::reroute),
threadPool
);
final MetaDataMappingService metaDataMappingService = new MetaDataMappingService(clusterService, indicesService);
@ -1248,7 +1248,7 @@ public class SnapshotResiliencyTests extends ESTestCase {
hostsResolver -> testClusterNodes.nodes.values().stream().filter(n -> n.node.isMasterNode())
.map(n -> n.node.getAddress()).collect(Collectors.toList()),
clusterService.getClusterApplierService(), Collections.emptyList(), random(),
new RoutingService(clusterService, allocationService)::reroute, ElectionStrategy.DEFAULT_INSTANCE);
new RoutingService(clusterService, allocationService::reroute)::reroute, ElectionStrategy.DEFAULT_INSTANCE);
masterService.setClusterStatePublisher(coordinator);
coordinator.start();
masterService.start();

View File

@ -841,7 +841,7 @@ public class AbstractCoordinatorTestCase extends ESTestCase {
final AllocationService allocationService = ESAllocationTestCase.createAllocationService(Settings.EMPTY);
coordinator = new Coordinator("test_node", settings, clusterSettings, transportService, writableRegistry(),
allocationService, masterService, this::getPersistedState,
Cluster.this::provideSeedHosts, clusterApplierService, onJoinValidators, Randomness.get(), s -> {},
Cluster.this::provideSeedHosts, clusterApplierService, onJoinValidators, Randomness.get(), (s, r) -> {},
getElectionStrategy());
masterService.setClusterStatePublisher(coordinator);
final GatewayService gatewayService = new GatewayService(settings, allocationService, clusterService,