From 437c43cd7fe3906e96cfc4eb878bedeb250d5db9 Mon Sep 17 00:00:00 2001 From: Lee Hinman Date: Wed, 5 Aug 2015 14:59:31 -0600 Subject: [PATCH] Avoid extra reroutes of delayed shards in RoutingService In order to avoid extra reroutes, `RoutingService` should avoid scheduling a reroute of any shards where the delay is negative. To make sure that we don't encounter a race condition between the GatewayAllocator thinking a shard is delayed and RoutingService thinking it is not, the GatewayAllocator will update the RoutingService with the last time it checked in order to use a consistent "view" of the delay. Resolves #12456 Relates to #12515 and #12456 --- .../health/TransportClusterHealthAction.java | 4 +- .../cluster/routing/RoutingService.java | 49 ++++++++++++++----- .../cluster/routing/UnassignedInfo.java | 12 ++--- .../gateway/GatewayAllocator.java | 6 ++- .../gateway/ReplicaShardAllocator.java | 6 ++- .../cluster/allocation/ClusterRerouteIT.java | 37 ++++++++++++++ .../cluster/routing/RoutingServiceTests.java | 43 ++++++++++++++++ .../cluster/routing/UnassignedInfoTests.java | 18 ++++--- 8 files changed, 146 insertions(+), 29 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/action/admin/cluster/health/TransportClusterHealthAction.java b/core/src/main/java/org/elasticsearch/action/admin/cluster/health/TransportClusterHealthAction.java index 06d8d6361b6..2d997690214 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/cluster/health/TransportClusterHealthAction.java +++ b/core/src/main/java/org/elasticsearch/action/admin/cluster/health/TransportClusterHealthAction.java @@ -272,13 +272,13 @@ public class TransportClusterHealthAction extends TransportMasterNodeReadAction< } catch (IndexNotFoundException e) { // one of the specified indices is not there - treat it as RED. 
ClusterHealthResponse response = new ClusterHealthResponse(clusterName.value(), Strings.EMPTY_ARRAY, clusterState, - numberOfPendingTasks, numberOfInFlightFetch, UnassignedInfo.getNumberOfDelayedUnassigned(settings, clusterState), + numberOfPendingTasks, numberOfInFlightFetch, UnassignedInfo.getNumberOfDelayedUnassigned(System.currentTimeMillis(), settings, clusterState), pendingTaskTimeInQueue); response.status = ClusterHealthStatus.RED; return response; } return new ClusterHealthResponse(clusterName.value(), concreteIndices, clusterState, numberOfPendingTasks, - numberOfInFlightFetch, UnassignedInfo.getNumberOfDelayedUnassigned(settings, clusterState), pendingTaskTimeInQueue); + numberOfInFlightFetch, UnassignedInfo.getNumberOfDelayedUnassigned(System.currentTimeMillis(), settings, clusterState), pendingTaskTimeInQueue); } } diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/RoutingService.java b/core/src/main/java/org/elasticsearch/cluster/routing/RoutingService.java index bfc1d93d940..a3aa9b2ed07 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/RoutingService.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/RoutingService.java @@ -57,6 +57,7 @@ public class RoutingService extends AbstractLifecycleComponent i private AtomicBoolean rerouting = new AtomicBoolean(); private volatile long registeredNextDelaySetting = Long.MAX_VALUE; private volatile ScheduledFuture registeredNextDelayFuture; + private volatile long unassignedShardsAllocatedTimestamp = 0; @Inject public RoutingService(Settings settings, ThreadPool threadPool, ClusterService clusterService, AllocationService allocationService) { @@ -87,6 +88,19 @@ public class RoutingService extends AbstractLifecycleComponent i return this.allocationService; } + /** + * Update the last time the allocator tried to assign unassigned shards + * + * This is used so that both the GatewayAllocator and RoutingService use a + * consistent timestamp for comparing which shards have 
been delayed to + * avoid a race condition where GatewayAllocator thinks the shard should + * be delayed and the RoutingService thinks it has already passed the delay + * and that the GatewayAllocator has/will handle it. + */ + public void setUnassignedShardsAllocatedTimestamp(long timeInMillis) { + this.unassignedShardsAllocatedTimestamp = timeInMillis; + } + /** * Initiates a reroute. */ @@ -108,20 +122,29 @@ public class RoutingService extends AbstractLifecycleComponent i if (nextDelaySetting > 0 && nextDelaySetting < registeredNextDelaySetting) { FutureUtils.cancel(registeredNextDelayFuture); registeredNextDelaySetting = nextDelaySetting; - TimeValue nextDelay = TimeValue.timeValueMillis(UnassignedInfo.findNextDelayedAllocationIn(settings, event.state())); - logger.info("delaying allocation for [{}] unassigned shards, next check in [{}]", UnassignedInfo.getNumberOfDelayedUnassigned(settings, event.state()), nextDelay); - registeredNextDelayFuture = threadPool.schedule(nextDelay, ThreadPool.Names.SAME, new AbstractRunnable() { - @Override - protected void doRun() throws Exception { - registeredNextDelaySetting = Long.MAX_VALUE; - reroute("assign delayed unassigned shards"); - } + // We use System.currentTimeMillis here because we want the + // next delay from the "now" perspective, rather than the + // delay from the last time the GatewayAllocator tried to + // assign/delay the shard + TimeValue nextDelay = TimeValue.timeValueMillis(UnassignedInfo.findNextDelayedAllocationIn(System.currentTimeMillis(), settings, event.state())); + int unassignedDelayedShards = UnassignedInfo.getNumberOfDelayedUnassigned(unassignedShardsAllocatedTimestamp, settings, event.state()); + if (unassignedDelayedShards > 0) { + logger.info("delaying allocation for [{}] unassigned shards, next check in [{}]", + unassignedDelayedShards, nextDelay); + registeredNextDelayFuture = threadPool.schedule(nextDelay, ThreadPool.Names.SAME, new AbstractRunnable() { + @Override + protected void 
doRun() throws Exception { + registeredNextDelaySetting = Long.MAX_VALUE; + reroute("assign delayed unassigned shards"); + } - @Override - public void onFailure(Throwable t) { - logger.warn("failed to schedule/execute reroute post unassigned shard", t); - } - }); + @Override + public void onFailure(Throwable t) { + logger.warn("failed to schedule/execute reroute post unassigned shard", t); + registeredNextDelaySetting = Long.MAX_VALUE; + } + }); + } } else { logger.trace("no need to schedule reroute due to delayed unassigned, next_delay_setting [{}], registered [{}]", nextDelaySetting, registeredNextDelaySetting); } diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/UnassignedInfo.java b/core/src/main/java/org/elasticsearch/cluster/routing/UnassignedInfo.java index a09b0349365..1c602491f26 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/UnassignedInfo.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/UnassignedInfo.java @@ -199,12 +199,12 @@ public class UnassignedInfo implements ToXContent, Writeable { /** * The time in millisecond until this unassigned shard can be reassigned. */ - public long getDelayAllocationExpirationIn(Settings settings, Settings indexSettings) { + public long getDelayAllocationExpirationIn(long unassignedShardsAllocatedTimestamp, Settings settings, Settings indexSettings) { long delayTimeout = getAllocationDelayTimeoutSetting(settings, indexSettings); if (delayTimeout == 0) { return 0; } - long delta = System.currentTimeMillis() - timestamp; + long delta = unassignedShardsAllocatedTimestamp - timestamp; // account for time drift, treat it as no timeout if (delta < 0) { return 0; @@ -216,12 +216,12 @@ public class UnassignedInfo implements ToXContent, Writeable { /** * Returns the number of shards that are unassigned and currently being delayed. 
*/ - public static int getNumberOfDelayedUnassigned(Settings settings, ClusterState state) { + public static int getNumberOfDelayedUnassigned(long unassignedShardsAllocatedTimestamp, Settings settings, ClusterState state) { int count = 0; for (ShardRouting shard : state.routingTable().shardsWithState(ShardRoutingState.UNASSIGNED)) { if (shard.primary() == false) { IndexMetaData indexMetaData = state.metaData().index(shard.getIndex()); - long delay = shard.unassignedInfo().getDelayAllocationExpirationIn(settings, indexMetaData.getSettings()); + long delay = shard.unassignedInfo().getDelayAllocationExpirationIn(unassignedShardsAllocatedTimestamp, settings, indexMetaData.getSettings()); if (delay > 0) { count++; } @@ -251,12 +251,12 @@ public class UnassignedInfo implements ToXContent, Writeable { /** * Finds the next (closest) delay expiration of an unassigned shard. Returns 0 if there are none. */ - public static long findNextDelayedAllocationIn(Settings settings, ClusterState state) { + public static long findNextDelayedAllocationIn(long unassignedShardsAllocatedTimestamp, Settings settings, ClusterState state) { long nextDelay = Long.MAX_VALUE; for (ShardRouting shard : state.routingTable().shardsWithState(ShardRoutingState.UNASSIGNED)) { if (shard.primary() == false) { IndexMetaData indexMetaData = state.metaData().index(shard.getIndex()); - long nextShardDelay = shard.unassignedInfo().getDelayAllocationExpirationIn(settings, indexMetaData.getSettings()); + long nextShardDelay = shard.unassignedInfo().getDelayAllocationExpirationIn(unassignedShardsAllocatedTimestamp, settings, indexMetaData.getSettings()); if (nextShardDelay > 0 && nextShardDelay < nextDelay) { nextDelay = nextShardDelay; } diff --git a/core/src/main/java/org/elasticsearch/gateway/GatewayAllocator.java b/core/src/main/java/org/elasticsearch/gateway/GatewayAllocator.java index 7b6da4dae27..449bd67e26c 100644 --- a/core/src/main/java/org/elasticsearch/gateway/GatewayAllocator.java +++ 
b/core/src/main/java/org/elasticsearch/gateway/GatewayAllocator.java @@ -113,6 +113,10 @@ public class GatewayAllocator extends AbstractComponent { } public boolean allocateUnassigned(final RoutingAllocation allocation) { + // Take a snapshot of the current time and tell the RoutingService + // about it, so it will use a consistent timestamp for delays + long lastAllocateUnassignedRun = System.currentTimeMillis(); + this.routingService.setUnassignedShardsAllocatedTimestamp(lastAllocateUnassignedRun); boolean changed = false; RoutingNodes.UnassignedShards unassigned = allocation.routingNodes().unassigned(); @@ -127,7 +131,7 @@ public class GatewayAllocator extends AbstractComponent { changed |= primaryShardAllocator.allocateUnassigned(allocation); changed |= replicaShardAllocator.processExistingRecoveries(allocation); - changed |= replicaShardAllocator.allocateUnassigned(allocation); + changed |= replicaShardAllocator.allocateUnassigned(allocation, lastAllocateUnassignedRun); return changed; } diff --git a/core/src/main/java/org/elasticsearch/gateway/ReplicaShardAllocator.java b/core/src/main/java/org/elasticsearch/gateway/ReplicaShardAllocator.java index 6ab016be247..9af9a4e8d5f 100644 --- a/core/src/main/java/org/elasticsearch/gateway/ReplicaShardAllocator.java +++ b/core/src/main/java/org/elasticsearch/gateway/ReplicaShardAllocator.java @@ -111,6 +111,10 @@ public abstract class ReplicaShardAllocator extends AbstractComponent { } public boolean allocateUnassigned(RoutingAllocation allocation) { + return allocateUnassigned(allocation, System.currentTimeMillis()); + } + + public boolean allocateUnassigned(RoutingAllocation allocation, long allocateUnassignedTimestapm) { boolean changed = false; final RoutingNodes routingNodes = allocation.routingNodes(); final RoutingNodes.UnassignedShards.UnassignedIterator unassignedIterator = routingNodes.unassigned().iterator(); @@ -174,7 +178,7 @@ public abstract class ReplicaShardAllocator extends AbstractComponent { // will 
anyhow wait to find an existing copy of the shard to be allocated // note: the other side of the equation is scheduling a reroute in a timely manner, which happens in the RoutingService IndexMetaData indexMetaData = allocation.metaData().index(shard.getIndex()); - long delay = shard.unassignedInfo().getDelayAllocationExpirationIn(settings, indexMetaData.getSettings()); + long delay = shard.unassignedInfo().getDelayAllocationExpirationIn(allocateUnassignedTimestapm, settings, indexMetaData.getSettings()); if (delay > 0) { logger.debug("[{}][{}]: delaying allocation of [{}] for [{}]", shard.index(), shard.id(), shard, TimeValue.timeValueMillis(delay)); /** diff --git a/core/src/test/java/org/elasticsearch/cluster/allocation/ClusterRerouteIT.java b/core/src/test/java/org/elasticsearch/cluster/allocation/ClusterRerouteIT.java index c9643d8601b..d6e780dd183 100644 --- a/core/src/test/java/org/elasticsearch/cluster/allocation/ClusterRerouteIT.java +++ b/core/src/test/java/org/elasticsearch/cluster/allocation/ClusterRerouteIT.java @@ -34,15 +34,18 @@ import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationComman import org.elasticsearch.cluster.routing.allocation.decider.Decision; import org.elasticsearch.cluster.routing.allocation.decider.DisableAllocationDecider; import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider.Allocation; +import org.elasticsearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider; import org.elasticsearch.common.Priority; import org.elasticsearch.common.io.FileSystemUtils; import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.ESIntegTestCase.ClusterScope; +import 
org.elasticsearch.test.InternalTestCluster; import org.junit.Test; import java.nio.file.Path; @@ -160,6 +163,40 @@ public class ClusterRerouteIT extends ESIntegTestCase { rerouteWithAllocateLocalGateway(commonSettings); } + @Test + public void testDelayWithALargeAmountOfShards() throws Exception { + Settings commonSettings = settingsBuilder() + .put("gateway.type", "local") + .put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_CONCURRENT_RECOVERIES, 1) + .build(); + logger.info("--> starting 4 nodes"); + String node_1 = internalCluster().startNode(commonSettings); + internalCluster().startNode(commonSettings); + internalCluster().startNode(commonSettings); + internalCluster().startNode(commonSettings); + + assertThat(cluster().size(), equalTo(4)); + ClusterHealthResponse healthResponse = client().admin().cluster().prepareHealth().setWaitForNodes("4").execute().actionGet(); + assertThat(healthResponse.isTimedOut(), equalTo(false)); + + logger.info("--> create indices"); + for (int i = 0; i < 25; i++) { + client().admin().indices().prepareCreate("test" + i) + .setSettings(settingsBuilder() + .put("index.number_of_shards", 5).put("index.number_of_replicas", 1) + .put("index.unassigned.node_left.delayed_timeout", randomIntBetween(250, 1000) + "ms")) + .execute().actionGet(); + } + + ensureGreen(TimeValue.timeValueMinutes(1)); + + logger.info("--> stopping node1"); + internalCluster().stopRandomNode(InternalTestCluster.nameFilter(node_1)); + + // This might run slowly on older hardware + ensureGreen(TimeValue.timeValueMinutes(2)); + } + private void rerouteWithAllocateLocalGateway(Settings commonSettings) throws Exception { logger.info("--> starting 2 nodes"); String node_1 = internalCluster().startNode(commonSettings); diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/RoutingServiceTests.java b/core/src/test/java/org/elasticsearch/cluster/routing/RoutingServiceTests.java index 94c9f19c4ae..7f10a75e902 100644 --- 
a/core/src/test/java/org/elasticsearch/cluster/routing/RoutingServiceTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/routing/RoutingServiceTests.java @@ -28,6 +28,7 @@ import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.test.ESAllocationTestCase; import org.elasticsearch.threadpool.ThreadPool; import org.junit.After; @@ -112,6 +113,10 @@ public class RoutingServiceTests extends ESAllocationTestCase { ClusterState prevState = clusterState; clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder(clusterState.nodes()).remove("node2")).build(); clusterState = ClusterState.builder(clusterState).routingResult(allocation.reroute(clusterState)).build(); + // We need to update the routing service's last attempted run to + // signal that the GatewayAllocator tried to allocate it but + // it was delayed + routingService.setUnassignedShardsAllocatedTimestamp(System.currentTimeMillis()); ClusterState newState = clusterState; routingService.clusterChanged(new ClusterChangedEvent("test", newState, prevState)); @@ -125,6 +130,44 @@ public class RoutingServiceTests extends ESAllocationTestCase { assertThat(routingService.getRegisteredNextDelaySetting(), equalTo(Long.MAX_VALUE)); } + @Test + public void testDelayedUnassignedDoesNotRerouteForNegativeDelays() throws Exception { + AllocationService allocation = createAllocationService(); + MetaData metaData = MetaData.builder() + .put(IndexMetaData.builder("test").settings(settings(Version.CURRENT).put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING, "100ms")) + .numberOfShards(1).numberOfReplicas(1)) + .build(); + ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT) + .metaData(metaData) +
.routingTable(RoutingTable.builder().addAsNew(metaData.index("test"))).build(); + clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder().put(newNode("node1")).put(newNode("node2")).localNodeId("node1").masterNodeId("node1")).build(); + clusterState = ClusterState.builder(clusterState).routingResult(allocation.reroute(clusterState)).build(); + // starting primaries + clusterState = ClusterState.builder(clusterState).routingResult(allocation.applyStartedShards(clusterState, clusterState.routingNodes().shardsWithState(INITIALIZING))).build(); + // starting replicas + clusterState = ClusterState.builder(clusterState).routingResult(allocation.applyStartedShards(clusterState, clusterState.routingNodes().shardsWithState(INITIALIZING))).build(); + assertThat(clusterState.routingNodes().hasUnassigned(), equalTo(false)); + // remove node2 and reroute + ClusterState prevState = clusterState; + clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder(clusterState.nodes()).remove("node2")).build(); + clusterState = ClusterState.builder(clusterState).routingResult(allocation.reroute(clusterState)).build(); + // Set it in the future so the delay will be negative + routingService.setUnassignedShardsAllocatedTimestamp(System.currentTimeMillis() + TimeValue.timeValueMinutes(1).millis()); + + ClusterState newState = clusterState; + + routingService.clusterChanged(new ClusterChangedEvent("test", newState, prevState)); + assertBusy(new Runnable() { + @Override + public void run() { + assertThat(routingService.hasReroutedAndClear(), equalTo(false)); + + // verify the registration has been updated + assertThat(routingService.getRegisteredNextDelaySetting(), equalTo(100L)); + } + }); + } + private class TestRoutingService extends RoutingService { private AtomicBoolean rerouted = new AtomicBoolean(); diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/UnassignedInfoTests.java 
b/core/src/test/java/org/elasticsearch/cluster/routing/UnassignedInfoTests.java index 21c86c82989..c24da2df500 100644 --- a/core/src/test/java/org/elasticsearch/cluster/routing/UnassignedInfoTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/routing/UnassignedInfoTests.java @@ -273,7 +273,8 @@ public class UnassignedInfoTests extends ESAllocationTestCase { assertBusy(new Runnable() { @Override public void run() { - long delay = unassignedInfo.getDelayAllocationExpirationIn(Settings.builder().put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING, "10h").build(), Settings.EMPTY); + long delay = unassignedInfo.getDelayAllocationExpirationIn(System.currentTimeMillis(), + Settings.builder().put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING, "10h").build(), Settings.EMPTY); assertThat(delay, greaterThan(0l)); assertThat(delay, lessThan(TimeValue.timeValueHours(10).millis())); } @@ -290,7 +291,8 @@ public class UnassignedInfoTests extends ESAllocationTestCase { UnassignedInfo unassignedInfo = new UnassignedInfo(RandomPicks.randomFrom(getRandom(), reasons), null); long delay = unassignedInfo.getAllocationDelayTimeoutSetting(Settings.builder().put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING, "10h").build(), Settings.EMPTY); assertThat(delay, equalTo(0l)); - delay = unassignedInfo.getDelayAllocationExpirationIn(Settings.builder().put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING, "10h").build(), Settings.EMPTY); + delay = unassignedInfo.getDelayAllocationExpirationIn(System.currentTimeMillis(), + Settings.builder().put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING, "10h").build(), Settings.EMPTY); assertThat(delay, equalTo(0l)); } @@ -306,7 +308,8 @@ public class UnassignedInfoTests extends ESAllocationTestCase { .routingTable(RoutingTable.builder().addAsNew(metaData.index("test1")).addAsNew(metaData.index("test2"))).build(); clusterState = 
ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder().put(newNode("node1")).put(newNode("node2"))).build(); clusterState = ClusterState.builder(clusterState).routingResult(allocation.reroute(clusterState)).build(); - assertThat(UnassignedInfo.getNumberOfDelayedUnassigned(Settings.builder().put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING, "10h").build(), clusterState), equalTo(0)); + assertThat(UnassignedInfo.getNumberOfDelayedUnassigned(System.currentTimeMillis(), + Settings.builder().put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING, "10h").build(), clusterState), equalTo(0)); // starting primaries clusterState = ClusterState.builder(clusterState).routingResult(allocation.applyStartedShards(clusterState, clusterState.routingNodes().shardsWithState(INITIALIZING))).build(); // starting replicas @@ -315,7 +318,8 @@ public class UnassignedInfoTests extends ESAllocationTestCase { // remove node2 and reroute clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder(clusterState.nodes()).remove("node2")).build(); clusterState = ClusterState.builder(clusterState).routingResult(allocation.reroute(clusterState)).build(); - assertThat(clusterState.prettyPrint(), UnassignedInfo.getNumberOfDelayedUnassigned(Settings.builder().put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING, "10h").build(), clusterState), equalTo(2)); + assertThat(clusterState.prettyPrint(), UnassignedInfo.getNumberOfDelayedUnassigned(System.currentTimeMillis(), + Settings.builder().put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING, "10h").build(), clusterState), equalTo(2)); } @Test @@ -330,7 +334,8 @@ public class UnassignedInfoTests extends ESAllocationTestCase { .routingTable(RoutingTable.builder().addAsNew(metaData.index("test1")).addAsNew(metaData.index("test2"))).build(); clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder().put(newNode("node1")).put(newNode("node2"))).build(); clusterState = 
ClusterState.builder(clusterState).routingResult(allocation.reroute(clusterState)).build(); - assertThat(UnassignedInfo.getNumberOfDelayedUnassigned(Settings.builder().put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING, "10h").build(), clusterState), equalTo(0)); + assertThat(UnassignedInfo.getNumberOfDelayedUnassigned(System.currentTimeMillis(), + Settings.builder().put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING, "10h").build(), clusterState), equalTo(0)); // starting primaries clusterState = ClusterState.builder(clusterState).routingResult(allocation.applyStartedShards(clusterState, clusterState.routingNodes().shardsWithState(INITIALIZING))).build(); // starting replicas @@ -343,7 +348,8 @@ public class UnassignedInfoTests extends ESAllocationTestCase { long nextDelaySetting = UnassignedInfo.findSmallestDelayedAllocationSetting(Settings.builder().put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING, "10h").build(), clusterState); assertThat(nextDelaySetting, equalTo(TimeValue.timeValueHours(10).millis())); - long nextDelay = UnassignedInfo.findNextDelayedAllocationIn(Settings.builder().put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING, "10h").build(), clusterState); + long nextDelay = UnassignedInfo.findNextDelayedAllocationIn(System.currentTimeMillis(), + Settings.builder().put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING, "10h").build(), clusterState); assertThat(nextDelay, greaterThan(TimeValue.timeValueHours(9).millis())); assertThat(nextDelay, lessThanOrEqualTo(TimeValue.timeValueHours(10).millis())); }