From fca7a1971302b5541272f3a8f350dac3c079b779 Mon Sep 17 00:00:00 2001 From: David Turner Date: Sun, 30 Jun 2019 16:44:57 +0100 Subject: [PATCH] Avoid parallel reroutes in DiskThresholdMonitor (#43381) Today the `DiskThresholdMonitor` limits the frequency with which it submits reroute tasks, but it might still submit these tasks faster than the master can process them if, for instance, each reroute takes over 60 seconds. This causes a problem since the reroute task runs with priority `IMMEDIATE` and is always scheduled when there is a node over the high watermark, so this can starve any other pending tasks on the master. This change avoids further updates from the monitor while its last task(s) are still in progress, and it measures the time of each update from the completion time of the reroute task rather than its start time, to allow a larger window for other tasks to run. It also now makes use of the `RoutingService` to submit the reroute task, in order to batch this task with any other pending reroutes. It enhances the `RoutingService` to notify its listeners on completion. Fixes #40174 Relates #42559 --- .../cluster/InternalClusterInfoService.java | 10 +- .../action/shard/ShardStateAction.java | 4 +- .../cluster/coordination/Coordinator.java | 6 +- .../cluster/coordination/JoinHelper.java | 9 +- .../coordination/JoinTaskExecutor.java | 12 +- .../cluster/routing/RoutingService.java | 81 ++++++-- .../allocation/DiskThresholdMonitor.java | 189 +++++++++++------- .../discovery/zen/NodeJoinController.java | 5 +- .../discovery/zen/ZenDiscovery.java | 2 +- .../gateway/GatewayAllocator.java | 6 +- .../java/org/elasticsearch/node/Node.java | 9 +- .../elasticsearch/cluster/DiskUsageTests.java | 2 +- .../cluster/coordination/JoinHelperTests.java | 4 +- .../cluster/coordination/NodeJoinTests.java | 2 +- .../cluster/routing/RoutingServiceTests.java | 170 ++++++++++++++++ .../allocation/DiskThresholdMonitorTests.java | 123 ++++++++++-- .../allocation/decider/MockDiskUsagesIT.java | 9 - .../zen/NodeJoinControllerTests.java | 2 +- .../discovery/zen/ZenDiscoveryUnitTests.java | 2 +- .../indices/cluster/ClusterStateChanges.java | 2 +- .../snapshots/SnapshotResiliencyTests.java | 4 +- .../AbstractCoordinatorTestCase.java | 2 +- 22 files changed, 503 insertions(+), 152 deletions(-) create mode 100644 server/src/test/java/org/elasticsearch/cluster/routing/RoutingServiceTests.java diff --git a/server/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java b/server/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java index 8d78f9c838e..4b893619891 100644 --- a/server/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java +++ b/server/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java @@ -131,13 +131,13 @@ public class InternalClusterInfoService implements ClusterInfoService, LocalNode logger.trace("I have been elected master, scheduling a ClusterInfoUpdateJob"); } - // Submit a job that will start after DEFAULT_STARTING_INTERVAL, and reschedule itself after running + // Submit a job that will reschedule itself after running threadPool.scheduleUnlessShuttingDown(updateFrequency, executorName(), new SubmitReschedulingClusterInfoUpdatedJob()); try { if (clusterService.state().getNodes().getDataNodes().size() > 1) { // Submit an info update job to be run immediately - threadPool.executor(executorName()).execute(() -> maybeRefresh()); + threadPool.executor(executorName()).execute(this::maybeRefresh); } } catch (EsRejectedExecutionException ex) { logger.debug("Couldn't schedule cluster info update task - node might be shutting down", ex); @@ -173,7 +173,7 @@ public class InternalClusterInfoService implements ClusterInfoService, LocalNode if (logger.isDebugEnabled()) { logger.debug("data node was added, retrieving new cluster info"); } - threadPool.executor(executorName()).execute(() -> maybeRefresh()); + threadPool.executor(executorName()).execute(this::maybeRefresh); } if (this.isMaster && event.nodesRemoved()) { @@ -316,7 +316,7 @@ public class InternalClusterInfoService implements ClusterInfoService, LocalNode ShardStats[] stats = indicesStatsResponse.getShards(); ImmutableOpenMap.Builder newShardSizes = ImmutableOpenMap.builder(); ImmutableOpenMap.Builder newShardRoutingToDataPath = ImmutableOpenMap.builder(); - buildShardLevelInfo(logger, stats, newShardSizes, newShardRoutingToDataPath, clusterService.state()); + buildShardLevelInfo(logger, stats, newShardSizes, newShardRoutingToDataPath); shardSizes = newShardSizes.build(); shardRoutingToDataPath = newShardRoutingToDataPath.build(); } @@ -365,7 +365,7 @@ public class InternalClusterInfoService implements ClusterInfoService, LocalNode } static void buildShardLevelInfo(Logger logger, ShardStats[] stats, ImmutableOpenMap.Builder newShardSizes, - ImmutableOpenMap.Builder newShardRoutingToDataPath, ClusterState state) { + ImmutableOpenMap.Builder newShardRoutingToDataPath) { for (ShardStats s : stats) { newShardRoutingToDataPath.put(s.getShardRouting(), s.getDataPath()); long size = s.getStats().getStore().sizeInBytes(); diff --git a/server/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java b/server/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java index b07ba8d09f3..32cc265a6c7 100644 --- a/server/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java +++ b/server/src/main/java/org/elasticsearch/cluster/action/shard/ShardStateAction.java @@ -382,7 +382,9 @@ public class ShardStateAction { if (logger.isTraceEnabled()) { logger.trace("{}, scheduling a reroute", reason); } - routingService.reroute(reason); + routingService.reroute(reason, ActionListener.wrap( + r -> logger.trace("{}, reroute completed", reason), + e -> logger.debug(new ParameterizedMessage("{}, reroute failed", reason), e))); } } } diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java index e4619d33a7e..b9f3fdbd8c7 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/Coordinator.java @@ -83,7 +83,6 @@ import java.util.Random; import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.BiConsumer; -import java.util.function.Consumer; import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -152,13 +151,14 @@ public class Coordinator extends AbstractLifecycleComponent implements Discovery * @param nodeName The name of the node, used to name the {@link java.util.concurrent.ExecutorService} of the {@link SeedHostsResolver}. * @param onJoinValidators A collection of join validators to restrict which nodes may join the cluster. * @param reroute A callback to call when the membership of the cluster has changed, to recalculate the assignment of shards. In - * production code this calls {@link org.elasticsearch.cluster.routing.RoutingService#reroute(String)}. + * production code this calls + * {@link org.elasticsearch.cluster.routing.RoutingService#reroute(String, ActionListener)}. */ public Coordinator(String nodeName, Settings settings, ClusterSettings clusterSettings, TransportService transportService, NamedWriteableRegistry namedWriteableRegistry, AllocationService allocationService, MasterService masterService, Supplier persistedStateSupplier, SeedHostsProvider seedHostsProvider, ClusterApplier clusterApplier, Collection> onJoinValidators, Random random, - Consumer reroute, ElectionStrategy electionStrategy) { + BiConsumer> reroute, ElectionStrategy electionStrategy) { this.settings = settings; this.transportService = transportService; this.masterService = masterService; diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java b/server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java index bdf2afe9213..7d4a1f41cd2 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java @@ -64,7 +64,6 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; -import java.util.function.Consumer; import java.util.function.Function; import java.util.function.LongSupplier; import java.util.function.Supplier; @@ -91,10 +90,10 @@ public class JoinHelper { private AtomicReference lastFailedJoinAttempt = new AtomicReference<>(); - public JoinHelper(Settings settings, AllocationService allocationService, MasterService masterService, - TransportService transportService, LongSupplier currentTermSupplier, Supplier currentStateSupplier, - BiConsumer joinHandler, Function joinLeaderInTerm, - Collection> joinValidators, Consumer reroute) { + JoinHelper(Settings settings, AllocationService allocationService, MasterService masterService, + TransportService transportService, LongSupplier currentTermSupplier, Supplier currentStateSupplier, + BiConsumer joinHandler, Function joinLeaderInTerm, + Collection> joinValidators, BiConsumer> reroute) { this.masterService = masterService; this.transportService = transportService; this.joinTimeout = JOIN_TIMEOUT_SETTING.get(settings); diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/JoinTaskExecutor.java b/server/src/main/java/org/elasticsearch/cluster/coordination/JoinTaskExecutor.java index f82ff1a1155..2f129fb1936 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/JoinTaskExecutor.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/JoinTaskExecutor.java @@ -20,6 +20,7 @@ package org.elasticsearch.cluster.coordination; import org.apache.logging.log4j.Logger; import org.elasticsearch.Version; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateTaskExecutor; import org.elasticsearch.cluster.NotMasterException; @@ -38,7 +39,6 @@ import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.function.BiConsumer; -import java.util.function.Consumer; import static org.elasticsearch.gateway.GatewayService.STATE_NOT_RECOVERED_BLOCK; @@ -47,7 +47,7 @@ public class JoinTaskExecutor implements ClusterStateTaskExecutor reroute; + private final BiConsumer> reroute; private final int minimumMasterNodesOnLocalNode; @@ -86,7 +86,8 @@ public class JoinTaskExecutor implements ClusterStateTaskExecutor reroute) { + public JoinTaskExecutor(Settings settings, AllocationService allocationService, Logger logger, + BiConsumer> reroute) { this.allocationService = allocationService; this.logger = logger; minimumMasterNodesOnLocalNode = ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING.get(settings); @@ -154,7 +155,10 @@ public class JoinTaskExecutor implements ClusterStateTaskExecutor logger.trace("post-join reroute completed"), + e -> logger.debug("post-join reroute failed", e))); + return results.build(allocationService.adaptAutoExpandReplicas(newState.nodes(nodesBuilder).build())); } else { // we must return a new cluster state instance to force publishing. This is important diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/RoutingService.java b/server/src/main/java/org/elasticsearch/cluster/routing/RoutingService.java index 89e19e02b30..7068f907905 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/RoutingService.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/RoutingService.java @@ -22,16 +22,20 @@ package org.elasticsearch.cluster.routing; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.PlainListenableActionFuture; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateUpdateTask; -import org.elasticsearch.cluster.routing.allocation.AllocationService; +import org.elasticsearch.cluster.NotMasterException; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Priority; import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.inject.Inject; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.BiFunction; /** * A {@link RoutingService} listens to clusters state. When this service @@ -51,14 +55,16 @@ public class RoutingService extends AbstractLifecycleComponent { private static final String CLUSTER_UPDATE_TASK_SOURCE = "cluster_reroute"; private final ClusterService clusterService; - private final AllocationService allocationService; + private final BiFunction reroute; - private AtomicBoolean rerouting = new AtomicBoolean(); + private final Object mutex = new Object(); + @Nullable // null if no reroute is currently pending + private PlainListenableActionFuture pendingRerouteListeners; @Inject - public RoutingService(ClusterService clusterService, AllocationService allocationService) { + public RoutingService(ClusterService clusterService, BiFunction reroute) { this.clusterService = clusterService; - this.allocationService = allocationService; + this.reroute = reroute; } @Override @@ -76,34 +82,55 @@ public class RoutingService extends AbstractLifecycleComponent { /** * Initiates a reroute. */ - public final void reroute(String reason) { + public final void reroute(String reason, ActionListener listener) { + if (lifecycle.started() == false) { + listener.onFailure(new IllegalStateException( + "rejecting delayed reroute [" + reason + "] in state [" + lifecycleState() + "]")); + return; + } + final PlainListenableActionFuture currentListeners; + synchronized (mutex) { + if (pendingRerouteListeners != null) { + logger.trace("already has pending reroute, adding [{}] to batch", reason); + pendingRerouteListeners.addListener(listener); + return; + } + currentListeners = PlainListenableActionFuture.newListenableFuture(); + currentListeners.addListener(listener); + pendingRerouteListeners = currentListeners; + } + logger.trace("rerouting [{}]", reason); try { - if (lifecycle.stopped()) { - return; - } - if (rerouting.compareAndSet(false, true) == false) { - logger.trace("already has pending reroute, ignoring {}", reason); - return; - } - logger.trace("rerouting {}", reason); clusterService.submitStateUpdateTask(CLUSTER_UPDATE_TASK_SOURCE + "(" + reason + ")", new ClusterStateUpdateTask(Priority.HIGH) { @Override public ClusterState execute(ClusterState currentState) { - rerouting.set(false); - return allocationService.reroute(currentState, reason); + synchronized (mutex) { + assert pendingRerouteListeners == currentListeners; + pendingRerouteListeners = null; + } + return reroute.apply(currentState, reason); } @Override public void onNoLongerMaster(String source) { - rerouting.set(false); - // no biggie + synchronized (mutex) { + if (pendingRerouteListeners == currentListeners) { + pendingRerouteListeners = null; + } + } + currentListeners.onFailure(new NotMasterException("delayed reroute [" + reason + "] cancelled")); + // no big deal, the new master will reroute again } @Override public void onFailure(String source, Exception e) { - rerouting.set(false); - ClusterState state = clusterService.state(); + synchronized (mutex) { + if (pendingRerouteListeners == currentListeners) { + pendingRerouteListeners = null; + } + } + final ClusterState state = clusterService.state(); if (logger.isTraceEnabled()) { logger.error(() -> new ParameterizedMessage("unexpected failure during [{}], current state:\n{}", source, state), e); @@ -111,12 +138,22 @@ public class RoutingService extends AbstractLifecycleComponent { logger.error(() -> new ParameterizedMessage("unexpected failure during [{}], current state version [{}]", source, state.version()), e); } + currentListeners.onFailure(new ElasticsearchException("delayed reroute [" + reason + "] failed", e)); + } + + @Override + public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + currentListeners.onResponse(null); } }); } catch (Exception e) { - rerouting.set(false); + synchronized (mutex) { + assert pendingRerouteListeners == currentListeners; + pendingRerouteListeners = null; + } ClusterState state = clusterService.state(); logger.warn(() -> new ParameterizedMessage("failed to reroute routing table, current state:\n{}", state), e); + currentListeners.onFailure(new ElasticsearchException("delayed reroute [" + reason + "] could not be submitted", e)); } } } diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdMonitor.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdMonitor.java index 4badab5a0ca..96e4974b9b4 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdMonitor.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdMonitor.java @@ -21,12 +21,20 @@ package org.elasticsearch.cluster.routing.allocation; import java.util.HashSet; import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.BiConsumer; +import java.util.function.Consumer; +import java.util.function.LongSupplier; import java.util.function.Supplier; import com.carrotsearch.hppc.ObjectLookupContainer; import com.carrotsearch.hppc.cursors.ObjectObjectCursor; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.apache.lucene.util.SetOnce; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.GroupedActionListener; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterInfo; import org.elasticsearch.cluster.ClusterState; @@ -54,11 +62,15 @@ public class DiskThresholdMonitor { private final Client client; private final Set nodeHasPassedWatermark = Sets.newConcurrentHashSet(); private final Supplier clusterStateSupplier; - private long lastRunNS; + private final LongSupplier currentTimeMillisSupplier; + private final AtomicLong lastRunTimeMillis = new AtomicLong(Long.MIN_VALUE); + private final AtomicBoolean checkInProgress = new AtomicBoolean(); + private final SetOnce>> rerouteAction = new SetOnce<>(); public DiskThresholdMonitor(Settings settings, Supplier clusterStateSupplier, ClusterSettings clusterSettings, - Client client) { + Client client, LongSupplier currentTimeMillisSupplier) { this.clusterStateSupplier = clusterStateSupplier; + this.currentTimeMillisSupplier = currentTimeMillisSupplier; this.diskThresholdSettings = new DiskThresholdSettings(settings, clusterSettings); this.client = client; } @@ -92,88 +104,129 @@ public class DiskThresholdMonitor { } } + private void checkFinished() { + final boolean checkFinished = checkInProgress.compareAndSet(true, false); + assert checkFinished; + } public void onNewInfo(ClusterInfo info) { - ImmutableOpenMap usages = info.getNodeLeastAvailableDiskUsages(); - if (usages != null) { - boolean reroute = false; - String explanation = ""; - // Garbage collect nodes that have been removed from the cluster - // from the map that tracks watermark crossing - ObjectLookupContainer nodes = usages.keys(); - for (String node : nodeHasPassedWatermark) { - if (nodes.contains(node) == false) { - nodeHasPassedWatermark.remove(node); - } + assert rerouteAction.get() != null; + + if (checkInProgress.compareAndSet(false, true) == false) { + logger.info("skipping monitor as a check is already in progress"); + return; + } + + final ImmutableOpenMap usages = info.getNodeLeastAvailableDiskUsages(); + if (usages == null) { + checkFinished(); + return; + } + + boolean reroute = false; + String explanation = ""; + final long currentTimeMillis = currentTimeMillisSupplier.getAsLong(); + + // Garbage collect nodes that have been removed from the cluster + // from the map that tracks watermark crossing + final ObjectLookupContainer nodes = usages.keys(); + for (String node : nodeHasPassedWatermark) { + if (nodes.contains(node) == false) { + nodeHasPassedWatermark.remove(node); } - ClusterState state = clusterStateSupplier.get(); - Set indicesToMarkReadOnly = new HashSet<>(); - for (ObjectObjectCursor entry : usages) { - String node = entry.key; - DiskUsage usage = entry.value; - warnAboutDiskIfNeeded(usage); - if (usage.getFreeBytes() < diskThresholdSettings.getFreeBytesThresholdFloodStage().getBytes() || - usage.getFreeDiskAsPercentage() < diskThresholdSettings.getFreeDiskThresholdFloodStage()) { - RoutingNode routingNode = state.getRoutingNodes().node(node); - if (routingNode != null) { // this might happen if we haven't got the full cluster-state yet?! - for (ShardRouting routing : routingNode) { - indicesToMarkReadOnly.add(routing.index().getName()); - } + } + final ClusterState state = clusterStateSupplier.get(); + final Set indicesToMarkReadOnly = new HashSet<>(); + + for (final ObjectObjectCursor entry : usages) { + final String node = entry.key; + final DiskUsage usage = entry.value; + warnAboutDiskIfNeeded(usage); + if (usage.getFreeBytes() < diskThresholdSettings.getFreeBytesThresholdFloodStage().getBytes() || + usage.getFreeDiskAsPercentage() < diskThresholdSettings.getFreeDiskThresholdFloodStage()) { + final RoutingNode routingNode = state.getRoutingNodes().node(node); + if (routingNode != null) { // this might happen if we haven't got the full cluster-state yet?! + for (ShardRouting routing : routingNode) { + indicesToMarkReadOnly.add(routing.index().getName()); } - } else if (usage.getFreeBytes() < diskThresholdSettings.getFreeBytesThresholdHigh().getBytes() || - usage.getFreeDiskAsPercentage() < diskThresholdSettings.getFreeDiskThresholdHigh()) { - if ((System.nanoTime() - lastRunNS) > diskThresholdSettings.getRerouteInterval().nanos()) { - lastRunNS = System.nanoTime(); + } + } else if (usage.getFreeBytes() < diskThresholdSettings.getFreeBytesThresholdHigh().getBytes() || + usage.getFreeDiskAsPercentage() < diskThresholdSettings.getFreeDiskThresholdHigh()) { + if (lastRunTimeMillis.get() < currentTimeMillis - diskThresholdSettings.getRerouteInterval().millis()) { + reroute = true; + explanation = "high disk watermark exceeded on one or more nodes"; + } else { + logger.debug("high disk watermark exceeded on {} but an automatic reroute has occurred " + + "in the last [{}], skipping reroute", + node, diskThresholdSettings.getRerouteInterval()); + } + nodeHasPassedWatermark.add(node); + } else if (usage.getFreeBytes() < diskThresholdSettings.getFreeBytesThresholdLow().getBytes() || + usage.getFreeDiskAsPercentage() < diskThresholdSettings.getFreeDiskThresholdLow()) { + nodeHasPassedWatermark.add(node); + } else { + if (nodeHasPassedWatermark.contains(node)) { + // The node has previously been over the high or + // low watermark, but is no longer, so we should + // reroute so any unassigned shards can be allocated + // if they are able to be + if (lastRunTimeMillis.get() < currentTimeMillis - diskThresholdSettings.getRerouteInterval().millis()) { reroute = true; - explanation = "high disk watermark exceeded on one or more nodes"; + explanation = "one or more nodes has gone under the high or low watermark"; + nodeHasPassedWatermark.remove(node); } else { - logger.debug("high disk watermark exceeded on {} but an automatic reroute has occurred " + + logger.debug("{} has gone below a disk threshold, but an automatic reroute has occurred " + "in the last [{}], skipping reroute", node, diskThresholdSettings.getRerouteInterval()); } - nodeHasPassedWatermark.add(node); - } else if (usage.getFreeBytes() < diskThresholdSettings.getFreeBytesThresholdLow().getBytes() || - usage.getFreeDiskAsPercentage() < diskThresholdSettings.getFreeDiskThresholdLow()) { - nodeHasPassedWatermark.add(node); - } else { - if (nodeHasPassedWatermark.contains(node)) { - // The node has previously been over the high or - // low watermark, but is no longer, so we should - // reroute so any unassigned shards can be allocated - // if they are able to be - if ((System.nanoTime() - lastRunNS) > diskThresholdSettings.getRerouteInterval().nanos()) { - lastRunNS = System.nanoTime(); - reroute = true; - explanation = "one or more nodes has gone under the high or low watermark"; - nodeHasPassedWatermark.remove(node); - } else { - logger.debug("{} has gone below a disk threshold, but an automatic reroute has occurred " + - "in the last [{}], skipping reroute", - node, diskThresholdSettings.getRerouteInterval()); - } - } } } - if (reroute) { - logger.info("rerouting shards: [{}]", explanation); - reroute(); - } - indicesToMarkReadOnly.removeIf(index -> state.getBlocks().indexBlocked(ClusterBlockLevel.WRITE, index)); - if (indicesToMarkReadOnly.isEmpty() == false) { - markIndicesReadOnly(indicesToMarkReadOnly); - } + } + + final ActionListener listener = new GroupedActionListener<>(ActionListener.wrap(this::checkFinished), 2); + + if (reroute) { + logger.info("rerouting shards: [{}]", explanation); + rerouteAction.get().accept(ActionListener.wrap(r -> { + setLastRunTimeMillis(); + listener.onResponse(r); + }, e -> { + logger.debug("reroute failed", e); + setLastRunTimeMillis(); + listener.onFailure(e); + })); + } else { + listener.onResponse(null); + } + + indicesToMarkReadOnly.removeIf(index -> state.getBlocks().indexBlocked(ClusterBlockLevel.WRITE, index)); + if (indicesToMarkReadOnly.isEmpty() == false) { + markIndicesReadOnly(indicesToMarkReadOnly, ActionListener.wrap(r -> { + setLastRunTimeMillis(); + listener.onResponse(r); + }, e -> { + logger.debug("marking indices readonly failed", e); + setLastRunTimeMillis(); + listener.onFailure(e); + })); + } else { + listener.onResponse(null); } } - protected void markIndicesReadOnly(Set indicesToMarkReadOnly) { - // set read-only block but don't block on the response - client.admin().indices().prepareUpdateSettings(indicesToMarkReadOnly.toArray(Strings.EMPTY_ARRAY)). - setSettings(Settings.builder().put(IndexMetaData.SETTING_READ_ONLY_ALLOW_DELETE, true).build()).execute(); + private void setLastRunTimeMillis() { + lastRunTimeMillis.getAndUpdate(l -> Math.max(l, currentTimeMillisSupplier.getAsLong())); } - protected void reroute() { - // Execute an empty reroute, but don't block on the response - client.admin().cluster().prepareReroute().execute(); + protected void markIndicesReadOnly(Set indicesToMarkReadOnly, ActionListener listener) { + // set read-only block but don't block on the response + client.admin().indices().prepareUpdateSettings(indicesToMarkReadOnly.toArray(Strings.EMPTY_ARRAY)) + .setSettings(Settings.builder().put(IndexMetaData.SETTING_READ_ONLY_ALLOW_DELETE, true).build()) + .execute(ActionListener.map(listener, r -> null)); + } + + public void setRerouteAction(BiConsumer> rerouteAction) { + this.rerouteAction.set(listener -> rerouteAction.accept("disk threshold monitor", listener)); } } diff --git a/server/src/main/java/org/elasticsearch/discovery/zen/NodeJoinController.java b/server/src/main/java/org/elasticsearch/discovery/zen/NodeJoinController.java index bce5695a817..f382a6b7a84 100644 --- a/server/src/main/java/org/elasticsearch/discovery/zen/NodeJoinController.java +++ b/server/src/main/java/org/elasticsearch/discovery/zen/NodeJoinController.java @@ -22,6 +22,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.lucene.store.AlreadyClosedException; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateTaskConfig; @@ -43,7 +44,7 @@ import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.function.Consumer; +import java.util.function.BiConsumer; /** * This class processes incoming join request (passed zia {@link ZenDiscovery}). Incoming nodes @@ -62,7 +63,7 @@ public class NodeJoinController { public NodeJoinController(Settings settings, MasterService masterService, AllocationService allocationService, - ElectMasterService electMaster, Consumer reroute) { + ElectMasterService electMaster, BiConsumer> reroute) { this.masterService = masterService; joinTaskExecutor = new JoinTaskExecutor(settings, allocationService, logger, reroute) { @Override diff --git a/server/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java b/server/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java index c11dfb16ad5..09dcd74c6e3 100644 --- a/server/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java +++ b/server/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java @@ -165,7 +165,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implements Discover NamedWriteableRegistry namedWriteableRegistry, MasterService masterService, ClusterApplier clusterApplier, ClusterSettings clusterSettings, SeedHostsProvider hostsProvider, AllocationService allocationService, Collection> onJoinValidators, GatewayMetaState gatewayMetaState, - Consumer reroute) { + BiConsumer> reroute) { this.onJoinValidators = JoinTaskExecutor.addBuiltInJoinValidators(onJoinValidators); this.masterService = masterService; this.clusterApplier = clusterApplier; diff --git a/server/src/main/java/org/elasticsearch/gateway/GatewayAllocator.java b/server/src/main/java/org/elasticsearch/gateway/GatewayAllocator.java index 82627cfdc0b..6543d5d1174 100644 --- a/server/src/main/java/org/elasticsearch/gateway/GatewayAllocator.java +++ b/server/src/main/java/org/elasticsearch/gateway/GatewayAllocator.java @@ -21,6 +21,8 @@ package org.elasticsearch.gateway; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.nodes.BaseNodeResponse; import org.elasticsearch.action.support.nodes.BaseNodesResponse; import org.elasticsearch.cluster.routing.RoutingNodes; @@ -137,7 +139,9 @@ public class GatewayAllocator { @Override protected void reroute(ShardId shardId, String reason) { logger.trace("{} scheduling reroute for {}", shardId, reason); - routingService.reroute("async_shard_fetch"); + routingService.reroute("async_shard_fetch", ActionListener.wrap( + r -> logger.trace("{} scheduled reroute completed for {}", shardId, reason), + e -> logger.debug(new ParameterizedMessage("{} scheduled reroute failed for {}", shardId, reason), e))); } } diff --git a/server/src/main/java/org/elasticsearch/node/Node.java b/server/src/main/java/org/elasticsearch/node/Node.java index 0f9c45fd1a7..cfc98d236e9 100644 --- a/server/src/main/java/org/elasticsearch/node/Node.java +++ b/server/src/main/java/org/elasticsearch/node/Node.java @@ -368,10 +368,10 @@ public class Node implements Closeable { .newHashPublisher()); final IngestService ingestService = new IngestService(clusterService, threadPool, this.environment, scriptModule.getScriptService(), analysisModule.getAnalysisRegistry(), pluginsService.filterPlugins(IngestPlugin.class)); - final DiskThresholdMonitor listener = new DiskThresholdMonitor(settings, clusterService::state, - clusterService.getClusterSettings(), client); + final DiskThresholdMonitor diskThresholdMonitor = new DiskThresholdMonitor(settings, clusterService::state, + clusterService.getClusterSettings(), client, threadPool::relativeTimeInMillis); final ClusterInfoService clusterInfoService = newClusterInfoService(settings, clusterService, threadPool, client, - listener::onNewInfo); + diskThresholdMonitor::onNewInfo); final UsageService usageService = new UsageService(); ModulesBuilder modules = new ModulesBuilder(); @@ -506,7 +506,7 @@ public class Node implements Closeable { RestoreService restoreService = new RestoreService(clusterService, repositoryService, clusterModule.getAllocationService(), metaDataCreateIndexService, metaDataIndexUpgradeService, clusterService.getClusterSettings()); - final RoutingService routingService = new RoutingService(clusterService, clusterModule.getAllocationService()); + final RoutingService routingService = new RoutingService(clusterService, clusterModule.getAllocationService()::reroute); final DiscoveryModule discoveryModule = new DiscoveryModule(settings, threadPool, transportService, namedWriteableRegistry, networkService, clusterService.getMasterService(), clusterService.getClusterApplierService(), clusterService.getClusterSettings(), pluginsService.filterPlugins(DiscoveryPlugin.class), @@ -515,6 +515,7 @@ public class Node implements Closeable { transportService, indicesService, pluginsService, circuitBreakerService, scriptModule.getScriptService(), httpServerTransport, ingestService, clusterService, settingsModule.getSettingsFilter(), responseCollectorService, searchTransportService); + diskThresholdMonitor.setRerouteAction(routingService::reroute); final SearchService searchService = newSearchService(clusterService, indicesService, threadPool, scriptModule.getScriptService(), bigArrays, searchModule.getFetchPhase(), diff --git a/server/src/test/java/org/elasticsearch/cluster/DiskUsageTests.java b/server/src/test/java/org/elasticsearch/cluster/DiskUsageTests.java index fcccaf6c0f0..55eae6fc0e9 100644 --- a/server/src/test/java/org/elasticsearch/cluster/DiskUsageTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/DiskUsageTests.java @@ -119,7 +119,7 @@ public class DiskUsageTests extends ESTestCase { ImmutableOpenMap.Builder shardSizes = ImmutableOpenMap.builder(); ImmutableOpenMap.Builder routingToPath = ImmutableOpenMap.builder(); ClusterState state = ClusterState.builder(new ClusterName("blarg")).version(0).build(); - InternalClusterInfoService.buildShardLevelInfo(logger, stats, shardSizes, routingToPath, state); + InternalClusterInfoService.buildShardLevelInfo(logger, stats, shardSizes, routingToPath); assertEquals(2, shardSizes.size()); assertTrue(shardSizes.containsKey(ClusterInfo.shardIdentifierFromRouting(test_0))); assertTrue(shardSizes.containsKey(ClusterInfo.shardIdentifierFromRouting(test_1))); diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/JoinHelperTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/JoinHelperTests.java index e0b161b76b3..4aeee62948b 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/JoinHelperTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/JoinHelperTests.java @@ -58,7 +58,7 @@ public class JoinHelperTests extends ESTestCase { x -> localNode, null, Collections.emptySet()); JoinHelper joinHelper = new JoinHelper(Settings.EMPTY, null, null, transportService, () -> 0L, () -> null, (joinRequest, joinCallback) -> { throw new AssertionError(); }, startJoinRequest -> { throw new AssertionError(); }, - Collections.emptyList(), s -> {}); + Collections.emptyList(), (s, r) -> {}); transportService.start(); DiscoveryNode node1 = new DiscoveryNode("node1", buildNewFakeTransportAddress(), Version.CURRENT); @@ -164,7 +164,7 @@ public class JoinHelperTests extends ESTestCase { x -> localNode, null, Collections.emptySet()); new JoinHelper(Settings.EMPTY, null, null, transportService, () -> 0L, () -> localClusterState, (joinRequest, joinCallback) -> { throw new AssertionError(); }, startJoinRequest -> { throw new AssertionError(); }, - Collections.emptyList(), s -> {}); // registers request handler + Collections.emptyList(), (s, r) -> {}); // registers request handler transportService.start(); transportService.acceptIncomingRequests(); diff --git a/server/src/test/java/org/elasticsearch/cluster/coordination/NodeJoinTests.java b/server/src/test/java/org/elasticsearch/cluster/coordination/NodeJoinTests.java index d407ebdc2ce..dcb6d26de9b 100644 --- a/server/src/test/java/org/elasticsearch/cluster/coordination/NodeJoinTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/coordination/NodeJoinTests.java @@ -174,7 +174,7 @@ public class NodeJoinTests extends ESTestCase { () -> new InMemoryPersistedState(term, initialState), r -> emptyList(), new NoOpClusterApplier(), Collections.emptyList(), - random, s -> {}, ElectionStrategy.DEFAULT_INSTANCE); + random, (s, r) -> {}, ElectionStrategy.DEFAULT_INSTANCE); transportService.start(); transportService.acceptIncomingRequests(); transport = capturingTransport; diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/RoutingServiceTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/RoutingServiceTests.java new file mode 100644 index 00000000000..5368c1c5544 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/cluster/routing/RoutingServiceTests.java @@ -0,0 +1,170 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.cluster.routing; + +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.PlainActionFuture; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateUpdateTask; +import org.elasticsearch.cluster.coordination.FailedToCommitClusterStateException; +import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.test.ClusterServiceUtils; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.threadpool.TestThreadPool; +import org.elasticsearch.threadpool.ThreadPool; +import org.junit.After; +import org.junit.Before; + +import java.util.concurrent.BrokenBarrierException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; + +import static org.hamcrest.Matchers.lessThan; +import static org.hamcrest.Matchers.startsWith; + +public class RoutingServiceTests extends ESTestCase { + + private ThreadPool threadPool; + private ClusterService clusterService; + + @Before + public void beforeTest() { + threadPool = new TestThreadPool("test"); + clusterService = ClusterServiceUtils.createClusterService(threadPool); + } + + @After + public void afterTest() { + clusterService.stop(); + threadPool.shutdown(); + } + + public void testRejectionUnlessStarted() { + final RoutingService routingService = new RoutingService(clusterService, (s, r) -> s); + final PlainActionFuture future = new PlainActionFuture<>(); + + if (randomBoolean()) { + routingService.start(); + routingService.stop(); + } else if (randomBoolean()) { + routingService.close(); + } + + routingService.reroute("test", future); + assertTrue(future.isDone()); + assertThat(expectThrows(IllegalStateException.class, future::actionGet).getMessage(), + startsWith("rejecting delayed reroute [test] in state [")); + } + + public void testReroutesWhenRequested() throws InterruptedException { + final AtomicLong rerouteCount = new AtomicLong(); + final RoutingService routingService = new RoutingService(clusterService, (s, r) -> { + rerouteCount.incrementAndGet(); + return s; + }); + + routingService.start(); + + long rerouteCountBeforeReroute = 0L; + final int iterations = between(1, 100); + final CountDownLatch countDownLatch = new CountDownLatch(iterations); + for (int i = 0; i < iterations; i++) { + rerouteCountBeforeReroute = Math.max(rerouteCountBeforeReroute, rerouteCount.get()); + routingService.reroute("iteration " + i, ActionListener.wrap(countDownLatch::countDown)); + } + countDownLatch.await(10, TimeUnit.SECONDS); + assertThat(rerouteCountBeforeReroute, lessThan(rerouteCount.get())); + } + + public void testBatchesReroutesTogether() throws BrokenBarrierException, InterruptedException { + final CyclicBarrier cyclicBarrier = new CyclicBarrier(2); + clusterService.submitStateUpdateTask("block master service", new ClusterStateUpdateTask() { + @Override + public ClusterState execute(ClusterState currentState) throws Exception { + cyclicBarrier.await(); // notify test that we are blocked + cyclicBarrier.await(); // wait to be unblocked by test + return currentState; + } + + @Override + public void onFailure(String source, Exception e) { + throw new AssertionError(source, e); + } + }); + + cyclicBarrier.await(); // wait for master thread to be blocked + + final AtomicBoolean rerouteExecuted = new AtomicBoolean(); + final RoutingService routingService = new RoutingService(clusterService, (s, r) -> { + assertTrue(rerouteExecuted.compareAndSet(false, true)); // only called once + return s; + }); + + routingService.start(); + + final int iterations = between(1, 100); + final CountDownLatch countDownLatch = new CountDownLatch(iterations); + for (int i = 0; i < iterations; i++) { + routingService.reroute("iteration " + i, ActionListener.wrap(countDownLatch::countDown)); + } + + cyclicBarrier.await(); // allow master thread to continue; + countDownLatch.await(); // wait for reroute to complete + assertTrue(rerouteExecuted.get()); // see above for assertion that it's only called once + } + + public void testNotifiesOnFailure() throws InterruptedException { + + final RoutingService routingService = new RoutingService(clusterService, (s, r) -> { + if (rarely()) { + throw new ElasticsearchException("simulated"); + } + return randomBoolean() ? s : ClusterState.builder(s).build(); + }); + routingService.start(); + + final int iterations = between(1, 100); + final CountDownLatch countDownLatch = new CountDownLatch(iterations); + for (int i = 0; i < iterations; i++) { + routingService.reroute("iteration " + i, ActionListener.wrap(countDownLatch::countDown)); + if (rarely()) { + clusterService.getMasterService().setClusterStatePublisher( + randomBoolean() + ? ClusterServiceUtils.createClusterStatePublisher(clusterService.getClusterApplierService()) + : (event, publishListener, ackListener) + -> publishListener.onFailure(new FailedToCommitClusterStateException("simulated"))); + } + + if (rarely()) { + clusterService.getClusterApplierService().onNewClusterState("simulated", () -> { + ClusterState state = clusterService.state(); + return ClusterState.builder(state).nodes(DiscoveryNodes.builder(state.nodes()) + .masterNodeId(randomBoolean() ? null : state.nodes().getLocalNodeId())).build(); + }, (source, e) -> { }); + } + } + + assertTrue(countDownLatch.await(10, TimeUnit.SECONDS)); // i.e. it doesn't leak any listeners + } +} diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdMonitorTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdMonitorTests.java index b245b0d35d6..5ba5b7a0a70 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdMonitorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdMonitorTests.java @@ -19,6 +19,7 @@ package org.elasticsearch.cluster.routing.allocation; import org.elasticsearch.Version; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.cluster.ClusterInfo; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; @@ -35,16 +36,17 @@ import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import java.util.Arrays; +import java.util.Collections; import java.util.HashSet; import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING; public class DiskThresholdMonitorTests extends ESAllocationTestCase { - public void testMarkFloodStageIndicesReadOnly() { AllocationService allocation = createAllocationService(Settings.builder() .put("cluster.routing.allocation.node_concurrent_recoveries", 10).build()); @@ -61,7 +63,6 @@ public class DiskThresholdMonitorTests extends ESAllocationTestCase { .addAsNew(metaData.index("test")) .addAsNew(metaData.index("test_1")) .addAsNew(metaData.index("test_2")) - .build(); ClusterState clusterState = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) .metaData(metaData).routingTable(routingTable).build(); @@ -74,18 +75,21 @@ public class DiskThresholdMonitorTests extends ESAllocationTestCase { ClusterState finalState = clusterState; AtomicBoolean reroute = new AtomicBoolean(false); AtomicReference> indices = new AtomicReference<>(); + AtomicLong currentTime = new AtomicLong(); DiskThresholdMonitor monitor = new DiskThresholdMonitor(settings, () -> finalState, - new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null) { + new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null, currentTime::get) { @Override - protected void reroute() { - assertTrue(reroute.compareAndSet(false, true)); - } - - @Override - protected void markIndicesReadOnly(Set indicesToMarkReadOnly) { + protected void markIndicesReadOnly(Set indicesToMarkReadOnly, ActionListener listener) { assertTrue(indices.compareAndSet(null, indicesToMarkReadOnly)); + listener.onResponse(null); } }; + + monitor.setRerouteAction((reason, listener) -> { + assertTrue(reroute.compareAndSet(false, true)); + listener.onResponse(null); + }); + ImmutableOpenMap.Builder builder = ImmutableOpenMap.builder(); builder.put("node1", new DiskUsage("node1","node1", "/foo/bar", 100, 4)); builder.put("node2", new DiskUsage("node2","node2", "/foo/bar", 100, 30)); @@ -97,6 +101,7 @@ public class DiskThresholdMonitorTests extends ESAllocationTestCase { builder = ImmutableOpenMap.builder(); builder.put("node1", new DiskUsage("node1","node1", "/foo/bar", 100, 4)); builder.put("node2", new DiskUsage("node2","node2", "/foo/bar", 100, 5)); + currentTime.addAndGet(randomLongBetween(60001, 120000)); monitor.onNewInfo(new ClusterInfo(builder.build(), null, null, null)); assertTrue(reroute.get()); assertEquals(new HashSet<>(Arrays.asList("test_1", "test_2")), indices.get()); @@ -114,17 +119,17 @@ public class DiskThresholdMonitorTests extends ESAllocationTestCase { assertTrue(anotherFinalClusterState.blocks().indexBlocked(ClusterBlockLevel.WRITE, "test_2")); monitor = new DiskThresholdMonitor(settings, () -> anotherFinalClusterState, - new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null) { + new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null, currentTime::get) { @Override - protected void reroute() { - assertTrue(reroute.compareAndSet(false, true)); - } - - @Override - protected void markIndicesReadOnly(Set indicesToMarkReadOnly) { + protected void markIndicesReadOnly(Set indicesToMarkReadOnly, ActionListener listener) { assertTrue(indices.compareAndSet(null, indicesToMarkReadOnly)); + listener.onResponse(null); } }; + monitor.setRerouteAction((reason, listener) -> { + assertTrue(reroute.compareAndSet(false, true)); + listener.onResponse(null); + }); indices.set(null); reroute.set(false); @@ -133,6 +138,90 @@ public class DiskThresholdMonitorTests extends ESAllocationTestCase { builder.put("node2", new DiskUsage("node2","node2", "/foo/bar", 100, 5)); monitor.onNewInfo(new ClusterInfo(builder.build(), null, null, null)); assertTrue(reroute.get()); - assertEquals(new HashSet<>(Arrays.asList("test_1")), indices.get()); + assertEquals(Collections.singleton("test_1"), indices.get()); + } + + public void testDoesNotSubmitRerouteTaskTooFrequently() { + final ClusterState clusterState = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) + .nodes(DiscoveryNodes.builder().add(newNode("node1")).add(newNode("node2"))).build(); + AtomicLong currentTime = new AtomicLong(); + AtomicReference> listenerReference = new AtomicReference<>(); + DiskThresholdMonitor monitor = new DiskThresholdMonitor(Settings.EMPTY, () -> clusterState, + new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null, currentTime::get) { + @Override + protected void markIndicesReadOnly(Set indicesToMarkReadOnly, ActionListener listener) { + throw new AssertionError("unexpected"); + } + }; + + monitor.setRerouteAction((reason, listener) -> { + assertNotNull(listener); + assertTrue(listenerReference.compareAndSet(null, listener)); + }); + + final ImmutableOpenMap.Builder allDisksOkBuilder; + allDisksOkBuilder = ImmutableOpenMap.builder(); + allDisksOkBuilder.put("node1", new DiskUsage("node1","node1", "/foo/bar", 100, 50)); + allDisksOkBuilder.put("node2", new DiskUsage("node2","node2", "/foo/bar", 100, 50)); + final ImmutableOpenMap allDisksOk = allDisksOkBuilder.build(); + + final ImmutableOpenMap.Builder oneDiskAboveWatermarkBuilder = ImmutableOpenMap.builder(); + oneDiskAboveWatermarkBuilder.put("node1", new DiskUsage("node1", "node1", "/foo/bar", 100, between(5, 9))); + oneDiskAboveWatermarkBuilder.put("node2", new DiskUsage("node2", "node2", "/foo/bar", 100, 50)); + final ImmutableOpenMap oneDiskAboveWatermark = oneDiskAboveWatermarkBuilder.build(); + + // should not reroute when all disks are ok + currentTime.addAndGet(randomLongBetween(0, 120000)); + monitor.onNewInfo(new ClusterInfo(allDisksOk, null, null, null)); + assertNull(listenerReference.get()); + + // should reroute when one disk goes over the watermark + currentTime.addAndGet(randomLongBetween(0, 120000)); + monitor.onNewInfo(new ClusterInfo(oneDiskAboveWatermark, null, null, null)); + assertNotNull(listenerReference.get()); + listenerReference.getAndSet(null).onResponse(null); + + if (randomBoolean()) { + // should not re-route again within the reroute interval + currentTime.addAndGet(randomLongBetween(0, + DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_REROUTE_INTERVAL_SETTING.get(Settings.EMPTY).millis())); + monitor.onNewInfo(new ClusterInfo(allDisksOk, null, null, null)); + assertNull(listenerReference.get()); + } + + // should reroute again when one disk is still over the watermark + currentTime.addAndGet(randomLongBetween( + DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_REROUTE_INTERVAL_SETTING.get(Settings.EMPTY).millis() + 1, 120000)); + monitor.onNewInfo(new ClusterInfo(oneDiskAboveWatermark, null, null, null)); + assertNotNull(listenerReference.get()); + final ActionListener rerouteListener1 = listenerReference.getAndSet(null); + + // should not re-route again before reroute has completed + currentTime.addAndGet(randomLongBetween(0, 120000)); + monitor.onNewInfo(new ClusterInfo(allDisksOk, null, null, null)); + assertNull(listenerReference.get()); + + // complete reroute + rerouteListener1.onResponse(null); + + if (randomBoolean()) { + // should not re-route again within the reroute interval + currentTime.addAndGet(randomLongBetween(0, + DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_REROUTE_INTERVAL_SETTING.get(Settings.EMPTY).millis())); + monitor.onNewInfo(new ClusterInfo(allDisksOk, null, null, null)); + assertNull(listenerReference.get()); + } + + // should reroute again after the reroute interval + currentTime.addAndGet(randomLongBetween( + DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_REROUTE_INTERVAL_SETTING.get(Settings.EMPTY).millis() + 1, 120000)); + monitor.onNewInfo(new ClusterInfo(allDisksOk, null, null, null)); + assertNotNull(listenerReference.get()); + listenerReference.getAndSet(null).onResponse(null); + + // should not reroute again when it is not required + currentTime.addAndGet(randomLongBetween(0, 120000)); + monitor.onNewInfo(new ClusterInfo(allDisksOk, null, null, null)); + assertNull(listenerReference.get()); } } diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/MockDiskUsagesIT.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/MockDiskUsagesIT.java index 4580c5b59ed..595c144fa17 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/MockDiskUsagesIT.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/MockDiskUsagesIT.java @@ -19,7 +19,6 @@ package org.elasticsearch.cluster.routing.allocation.decider; -import org.elasticsearch.action.admin.indices.recovery.RecoveryResponse; import org.elasticsearch.cluster.ClusterInfo; import org.elasticsearch.cluster.ClusterInfoService; import org.elasticsearch.cluster.ClusterState; @@ -31,7 +30,6 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.test.ESIntegTestCase; -import org.elasticsearch.test.junit.annotations.TestLogging; import java.util.ArrayList; import java.util.Collection; @@ -53,7 +51,6 @@ public class MockDiskUsagesIT extends ESIntegTestCase { return Collections.singletonList(MockInternalClusterInfoService.TestPlugin.class); } - @TestLogging("org.elasticsearch.indices.recovery:TRACE,org.elasticsearch.cluster.service:TRACE") public void testRerouteOccursOnDiskPassingHighWatermark() throws Exception { List nodes = internalCluster().startNodes(3); @@ -105,12 +102,6 @@ public class MockDiskUsagesIT extends ESIntegTestCase { assertBusy(() -> { final ClusterState clusterState = client().admin().cluster().prepareState().get().getState(); - logger.info("--> {}", clusterState.routingTable()); - - final RecoveryResponse recoveryResponse = client().admin().indices() - .prepareRecoveries("test").setActiveOnly(true).setDetailed(true).get(); - logger.info("--> recoveries: {}", recoveryResponse); - final Map nodesToShardCount = new HashMap<>(); for (final RoutingNode node : clusterState.getRoutingNodes()) { logger.info("--> node {} has {} shards", diff --git a/server/src/test/java/org/elasticsearch/discovery/zen/NodeJoinControllerTests.java b/server/src/test/java/org/elasticsearch/discovery/zen/NodeJoinControllerTests.java index f4e6647cc05..0d16bdc3b05 100644 --- a/server/src/test/java/org/elasticsearch/discovery/zen/NodeJoinControllerTests.java +++ b/server/src/test/java/org/elasticsearch/discovery/zen/NodeJoinControllerTests.java @@ -139,7 +139,7 @@ public class NodeJoinControllerTests extends ESTestCase { } masterService = ClusterServiceUtils.createMasterService(threadPool, initialState); nodeJoinController = new NodeJoinController(Settings.EMPTY, masterService, createAllocationService(Settings.EMPTY), - new ElectMasterService(Settings.EMPTY), s -> {}); + new ElectMasterService(Settings.EMPTY), (s, r) -> {}); } public void testSimpleJoinAccumulation() throws InterruptedException, ExecutionException { diff --git a/server/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryUnitTests.java b/server/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryUnitTests.java index 7eea2e24bd8..037d9f4174f 100644 --- a/server/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryUnitTests.java +++ b/server/src/test/java/org/elasticsearch/discovery/zen/ZenDiscoveryUnitTests.java @@ -370,7 +370,7 @@ public class ZenDiscoveryUnitTests extends ESTestCase { new NamedWriteableRegistry(ClusterModule.getNamedWriteables()), masterService, clusterApplier, clusterSettings, hostsResolver -> Collections.emptyList(), ESAllocationTestCase.createAllocationService(), - Collections.emptyList(), mock(GatewayMetaState.class), s -> {}); + Collections.emptyList(), mock(GatewayMetaState.class), (s, r) -> {}); zenDiscovery.start(); return zenDiscovery; } diff --git a/server/src/test/java/org/elasticsearch/indices/cluster/ClusterStateChanges.java b/server/src/test/java/org/elasticsearch/indices/cluster/ClusterStateChanges.java index e9e9a094c16..95ca2aacf13 100644 --- a/server/src/test/java/org/elasticsearch/indices/cluster/ClusterStateChanges.java +++ b/server/src/test/java/org/elasticsearch/indices/cluster/ClusterStateChanges.java @@ -213,7 +213,7 @@ public class ClusterStateChanges { transportService, clusterService, threadPool, createIndexService, actionFilters, indexNameExpressionResolver); nodeRemovalExecutor = new NodeRemovalClusterStateTaskExecutor(allocationService, logger); - joinTaskExecutor = new JoinTaskExecutor(Settings.EMPTY, allocationService, logger, s -> {}); + joinTaskExecutor = new JoinTaskExecutor(Settings.EMPTY, allocationService, logger, (s, r) -> {}); } public ClusterState createIndex(ClusterState state, CreateIndexRequest request) { diff --git a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java index 0ebc0402f96..28a706c7fca 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java @@ -1065,7 +1065,7 @@ public class SnapshotResiliencyTests extends ESTestCase { transportService, indicesService, actionFilters, indexNameExpressionResolver); final ShardStateAction shardStateAction = new ShardStateAction( clusterService, transportService, allocationService, - new RoutingService(clusterService, allocationService), + new RoutingService(clusterService, allocationService::reroute), threadPool ); final MetaDataMappingService metaDataMappingService = new MetaDataMappingService(clusterService, indicesService); @@ -1248,7 +1248,7 @@ public class SnapshotResiliencyTests extends ESTestCase { hostsResolver -> testClusterNodes.nodes.values().stream().filter(n -> n.node.isMasterNode()) .map(n -> n.node.getAddress()).collect(Collectors.toList()), clusterService.getClusterApplierService(), Collections.emptyList(), random(), - new RoutingService(clusterService, allocationService)::reroute, ElectionStrategy.DEFAULT_INSTANCE); + new RoutingService(clusterService, allocationService::reroute)::reroute, ElectionStrategy.DEFAULT_INSTANCE); masterService.setClusterStatePublisher(coordinator); coordinator.start(); masterService.start(); diff --git a/test/framework/src/main/java/org/elasticsearch/cluster/coordination/AbstractCoordinatorTestCase.java b/test/framework/src/main/java/org/elasticsearch/cluster/coordination/AbstractCoordinatorTestCase.java index 8bdedaceba7..0c27f84d7f1 100644 --- a/test/framework/src/main/java/org/elasticsearch/cluster/coordination/AbstractCoordinatorTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/cluster/coordination/AbstractCoordinatorTestCase.java @@ -841,7 +841,7 @@ public class AbstractCoordinatorTestCase extends ESTestCase { final AllocationService allocationService = ESAllocationTestCase.createAllocationService(Settings.EMPTY); coordinator = new Coordinator("test_node", settings, clusterSettings, transportService, writableRegistry(), allocationService, masterService, this::getPersistedState, - Cluster.this::provideSeedHosts, clusterApplierService, onJoinValidators, Randomness.get(), s -> {}, + Cluster.this::provideSeedHosts, clusterApplierService, onJoinValidators, Randomness.get(), (s, r) -> {}, getElectionStrategy()); masterService.setClusterStatePublisher(coordinator); final GatewayService gatewayService = new GatewayService(settings, allocationService, clusterService,