diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/RoutingService.java b/core/src/main/java/org/elasticsearch/cluster/routing/RoutingService.java index 0f04c750d38..6f43e880e3f 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/RoutingService.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/RoutingService.java @@ -55,7 +55,7 @@ public class RoutingService extends AbstractLifecycleComponent i private final AllocationService allocationService; private AtomicBoolean rerouting = new AtomicBoolean(); - private volatile long minDelaySettingAtLastScheduling = Long.MAX_VALUE; + private volatile long minDelaySettingAtLastSchedulingNanos = Long.MAX_VALUE; private volatile ScheduledFuture registeredNextDelayFuture; @Inject @@ -100,14 +100,14 @@ public class RoutingService extends AbstractLifecycleComponent i // Figure out if an existing scheduled reroute is good enough or whether we need to cancel and reschedule. // If the minimum of the currently relevant delay settings is larger than something we scheduled in the past, // we are guaranteed that the planned schedule will happen before any of the current shard delays are expired. - long minDelaySetting = UnassignedInfo.findSmallestDelayedAllocationSetting(settings, event.state()); + long minDelaySetting = UnassignedInfo.findSmallestDelayedAllocationSettingNanos(settings, event.state()); if (minDelaySetting <= 0) { - logger.trace("no need to schedule reroute - no delayed unassigned shards, minDelaySetting [{}], scheduled [{}]", minDelaySetting, minDelaySettingAtLastScheduling); - minDelaySettingAtLastScheduling = Long.MAX_VALUE; + logger.trace("no need to schedule reroute - no delayed unassigned shards, minDelaySetting [{}], scheduled [{}]", minDelaySetting, minDelaySettingAtLastSchedulingNanos); + minDelaySettingAtLastSchedulingNanos = Long.MAX_VALUE; FutureUtils.cancel(registeredNextDelayFuture); - } else if (minDelaySetting < minDelaySettingAtLastScheduling) { + } else if (minDelaySetting < minDelaySettingAtLastSchedulingNanos) { FutureUtils.cancel(registeredNextDelayFuture); - minDelaySettingAtLastScheduling = minDelaySetting; + minDelaySettingAtLastSchedulingNanos = minDelaySetting; TimeValue nextDelay = TimeValue.timeValueNanos(UnassignedInfo.findNextDelayedAllocationIn(event.state())); assert nextDelay.nanos() > 0 : "next delay must be non 0 as minDelaySetting is [" + minDelaySetting + "]"; logger.info("delaying allocation for [{}] unassigned shards, next check in [{}]", @@ -115,25 +115,25 @@ public class RoutingService extends AbstractLifecycleComponent i registeredNextDelayFuture = threadPool.schedule(nextDelay, ThreadPool.Names.SAME, new AbstractRunnable() { @Override protected void doRun() throws Exception { - minDelaySettingAtLastScheduling = Long.MAX_VALUE; + minDelaySettingAtLastSchedulingNanos = Long.MAX_VALUE; reroute("assign delayed unassigned shards"); } @Override public void onFailure(Throwable t) { logger.warn("failed to schedule/execute reroute post unassigned shard", t); - minDelaySettingAtLastScheduling = Long.MAX_VALUE; + minDelaySettingAtLastSchedulingNanos = Long.MAX_VALUE; } }); } else { - logger.trace("no need to schedule reroute - current schedule reroute is enough. minDelaySetting [{}], scheduled [{}]", minDelaySetting, minDelaySettingAtLastScheduling); + logger.trace("no need to schedule reroute - current schedule reroute is enough. minDelaySetting [{}], scheduled [{}]", minDelaySetting, minDelaySettingAtLastSchedulingNanos); } } } // visible for testing - long getMinDelaySettingAtLastScheduling() { - return this.minDelaySettingAtLastScheduling; + long getMinDelaySettingAtLastSchedulingNanos() { + return this.minDelaySettingAtLastSchedulingNanos; } // visible for testing diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/UnassignedInfo.java b/core/src/main/java/org/elasticsearch/cluster/routing/UnassignedInfo.java index bfe3c316cda..23733b96f0c 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/UnassignedInfo.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/UnassignedInfo.java @@ -34,7 +34,6 @@ import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; import java.io.IOException; -import java.util.concurrent.TimeUnit; /** * Holds additional information as to why the shard is in unassigned state. @@ -110,18 +109,27 @@ public class UnassignedInfo implements ToXContent, Writeable { private final String message; private final Throwable failure; + /** + * creates an UnassingedInfo object based **current** time + * + * @param reason the cause for making this shard unassigned. See {@link Reason} for more information. + * @param message more information about cause. + **/ public UnassignedInfo(Reason reason, String message) { - this(reason, System.currentTimeMillis(), System.nanoTime(), message, null); + this(reason, message, null, System.nanoTime(), System.currentTimeMillis()); } - public UnassignedInfo(Reason reason, @Nullable String message, @Nullable Throwable failure) { - this(reason, System.currentTimeMillis(), System.nanoTime(), message, failure); - } - - private UnassignedInfo(Reason reason, long unassignedTimeMillis, long timestampNanos, String message, Throwable failure) { + /** + * @param reason the cause for making this shard unassigned. See {@link Reason} for more information. + * @param message more information about cause. + * @param failure the shard level failure that caused this shard to be unassigned, if exists. + * @param unassignedTimeNanos the time to use as the base for any delayed re-assignment calculation + * @param unassignedTimeMillis the time of unassignment used to display to in our reporting. + */ + public UnassignedInfo(Reason reason, @Nullable String message, @Nullable Throwable failure, long unassignedTimeNanos, long unassignedTimeMillis) { this.reason = reason; this.unassignedTimeMillis = unassignedTimeMillis; - this.unassignedTimeNanos = timestampNanos; + this.unassignedTimeNanos = unassignedTimeNanos; this.message = message; this.failure = failure; assert !(message == null && failure != null) : "provide a message if a failure exception is provided"; @@ -201,14 +209,14 @@ public class UnassignedInfo implements ToXContent, Writeable { } /** - * The allocation delay value in milliseconds associated with the index (defaulting to node settings if not set). + * The allocation delay value in nano seconds associated with the index (defaulting to node settings if not set). */ - public long getAllocationDelayTimeoutSetting(Settings settings, Settings indexSettings) { + public long getAllocationDelayTimeoutSettingNanos(Settings settings, Settings indexSettings) { if (reason != Reason.NODE_LEFT) { return 0; } TimeValue delayTimeout = indexSettings.getAsTime(INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING, settings.getAsTime(INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING, DEFAULT_DELAYED_NODE_LEFT_TIMEOUT)); - return Math.max(0l, delayTimeout.millis()); + return Math.max(0l, delayTimeout.nanos()); } /** @@ -221,18 +229,17 @@ public class UnassignedInfo implements ToXContent, Writeable { /** * Updates delay left based on current time (in nanoseconds) and index/node settings. - * Should only be called from ReplicaShardAllocator. + * * @return updated delay in nanoseconds */ public long updateDelay(long nanoTimeNow, Settings settings, Settings indexSettings) { - long delayTimeoutMillis = getAllocationDelayTimeoutSetting(settings, indexSettings); + long delayTimeoutNanos = getAllocationDelayTimeoutSettingNanos(settings, indexSettings); final long newComputedLeftDelayNanos; - if (delayTimeoutMillis == 0l) { + if (delayTimeoutNanos == 0l) { newComputedLeftDelayNanos = 0l; } else { assert nanoTimeNow >= unassignedTimeNanos; - long delayTimeoutNanos = TimeUnit.NANOSECONDS.convert(delayTimeoutMillis, TimeUnit.MILLISECONDS); - newComputedLeftDelayNanos = Math.max(0l, delayTimeoutNanos - (nanoTimeNow - unassignedTimeNanos)); + newComputedLeftDelayNanos = Math.max(0L, delayTimeoutNanos - (nanoTimeNow - unassignedTimeNanos)); } lastComputedLeftDelayNanos = newComputedLeftDelayNanos; return newComputedLeftDelayNanos; @@ -255,21 +262,21 @@ public class UnassignedInfo implements ToXContent, Writeable { } /** - * Finds the smallest delay expiration setting in milliseconds of all unassigned shards that are still delayed. Returns 0 if there are none. + * Finds the smallest delay expiration setting in nanos of all unassigned shards that are still delayed. Returns 0 if there are none. */ - public static long findSmallestDelayedAllocationSetting(Settings settings, ClusterState state) { - long nextDelaySetting = Long.MAX_VALUE; + public static long findSmallestDelayedAllocationSettingNanos(Settings settings, ClusterState state) { + long minDelaySetting = Long.MAX_VALUE; for (ShardRouting shard : state.routingTable().shardsWithState(ShardRoutingState.UNASSIGNED)) { if (shard.primary() == false) { IndexMetaData indexMetaData = state.metaData().index(shard.getIndex()); - long leftDelayNanos = shard.unassignedInfo().getLastComputedLeftDelayNanos(); - long delayTimeoutSetting = shard.unassignedInfo().getAllocationDelayTimeoutSetting(settings, indexMetaData.getSettings()); - if (leftDelayNanos > 0 && delayTimeoutSetting > 0 && delayTimeoutSetting < nextDelaySetting) { - nextDelaySetting = delayTimeoutSetting; + boolean delayed = shard.unassignedInfo().getLastComputedLeftDelayNanos() > 0; + long delayTimeoutSetting = shard.unassignedInfo().getAllocationDelayTimeoutSettingNanos(settings, indexMetaData.getSettings()); + if (delayed && delayTimeoutSetting > 0 && delayTimeoutSetting < minDelaySetting) { + minDelaySetting = delayTimeoutSetting; } } } - return nextDelaySetting == Long.MAX_VALUE ? 0l : nextDelaySetting; + return minDelaySetting == Long.MAX_VALUE ? 0l : minDelaySetting; } @@ -320,14 +327,24 @@ public class UnassignedInfo implements ToXContent, Writeable { @Override public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } UnassignedInfo that = (UnassignedInfo) o; - if (unassignedTimeMillis != that.unassignedTimeMillis) return false; - if (reason != that.reason) return false; - if (message != null ? !message.equals(that.message) : that.message != null) return false; + if (unassignedTimeMillis != that.unassignedTimeMillis) { + return false; + } + if (reason != that.reason) { + return false; + } + if (message != null ? !message.equals(that.message) : that.message != null) { + return false; + } return !(failure != null ? !failure.equals(that.failure) : that.failure != null); } diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java index b8f02f24578..f819d6fde0a 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java @@ -22,16 +22,12 @@ package org.elasticsearch.cluster.routing.allocation; import com.carrotsearch.hppc.cursors.ObjectCursor; import org.elasticsearch.cluster.ClusterInfoService; import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.health.ClusterStateHealth; import org.elasticsearch.cluster.health.ClusterHealthStatus; +import org.elasticsearch.cluster.health.ClusterStateHealth; import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.cluster.routing.IndexRoutingTable; -import org.elasticsearch.cluster.routing.RoutingNode; -import org.elasticsearch.cluster.routing.RoutingNodes; -import org.elasticsearch.cluster.routing.RoutingTable; -import org.elasticsearch.cluster.routing.ShardRouting; -import org.elasticsearch.cluster.routing.UnassignedInfo; +import org.elasticsearch.cluster.routing.*; import org.elasticsearch.cluster.routing.allocation.allocator.ShardsAllocators; import org.elasticsearch.cluster.routing.allocation.command.AllocationCommands; import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders; @@ -43,9 +39,7 @@ import org.elasticsearch.common.settings.Settings; import java.util.ArrayList; import java.util.Collections; import java.util.List; -import java.util.Locale; import java.util.function.Function; -import java.util.function.Supplier; import java.util.stream.Collectors; @@ -119,7 +113,8 @@ public class AllocationService extends AbstractComponent { FailedRerouteAllocation allocation = new FailedRerouteAllocation(allocationDeciders, routingNodes, clusterState.nodes(), failedShards, clusterInfoService.getClusterInfo()); boolean changed = false; for (FailedRerouteAllocation.FailedShard failedShard : failedShards) { - changed |= applyFailedShard(allocation, failedShard.shard, true, new UnassignedInfo(UnassignedInfo.Reason.ALLOCATION_FAILED, failedShard.message, failedShard.failure)); + changed |= applyFailedShard(allocation, failedShard.shard, true, new UnassignedInfo(UnassignedInfo.Reason.ALLOCATION_FAILED, failedShard.message, failedShard.failure, + System.nanoTime(), System.currentTimeMillis())); } if (!changed) { return new RoutingAllocation.Result(false, clusterState.routingTable()); @@ -163,7 +158,7 @@ public class AllocationService extends AbstractComponent { // we don't shuffle the unassigned shards here, to try and get as close as possible to // a consistent result of the effect the commands have on the routing // this allows systems to dry run the commands, see the resulting cluster state, and act on it - RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, routingNodes, clusterState.nodes(), clusterInfoService.getClusterInfo()); + RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, routingNodes, clusterState.nodes(), clusterInfoService.getClusterInfo(), currentNanoTime()); // don't short circuit deciders, we want a full explanation allocation.debugDecision(true); // we ignore disable allocation, because commands are explicit @@ -202,7 +197,7 @@ public class AllocationService extends AbstractComponent { RoutingNodes routingNodes = getMutableRoutingNodes(clusterState); // shuffle the unassigned nodes, just so we won't have things like poison failed shards routingNodes.unassigned().shuffle(); - RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, routingNodes, clusterState.nodes(), clusterInfoService.getClusterInfo()); + RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, routingNodes, clusterState.nodes(), clusterInfoService.getClusterInfo(), currentNanoTime()); allocation.debugDecision(debug); if (!reroute(allocation)) { return new RoutingAllocation.Result(false, clusterState.routingTable()); @@ -239,6 +234,8 @@ public class AllocationService extends AbstractComponent { // now allocate all the unassigned to available nodes if (allocation.routingNodes().unassigned().size() > 0) { + updateLeftDelayOfUnassignedShards(allocation, settings); + changed |= shardsAllocators.allocateUnassigned(allocation); } @@ -251,6 +248,15 @@ public class AllocationService extends AbstractComponent { return changed; } + // public for testing + public static void updateLeftDelayOfUnassignedShards(RoutingAllocation allocation, Settings settings) { + for (ShardRouting shardRouting : allocation.routingNodes().unassigned()) { + final MetaData metaData = allocation.metaData(); + final IndexMetaData indexMetaData = metaData.index(shardRouting.index()); + shardRouting.unassignedInfo().updateDelay(allocation.getCurrentNanoTime(), settings, indexMetaData.getSettings()); + } + } + private boolean moveShards(RoutingAllocation allocation) { boolean changed = false; @@ -312,7 +318,9 @@ public class AllocationService extends AbstractComponent { } } for (ShardRouting shardToFail : shardsToFail) { - changed |= applyFailedShard(allocation, shardToFail, false, new UnassignedInfo(UnassignedInfo.Reason.ALLOCATION_FAILED, "primary failed while replica initializing")); + changed |= applyFailedShard(allocation, shardToFail, false, + new UnassignedInfo(UnassignedInfo.Reason.ALLOCATION_FAILED, "primary failed while replica initializing", + null, allocation.getCurrentNanoTime(), System.currentTimeMillis())); } // now, go over and elect a new primary if possible, not, from this code block on, if one is elected, @@ -372,8 +380,9 @@ public class AllocationService extends AbstractComponent { } changed = true; // now, go over all the shards routing on the node, and fail them - UnassignedInfo unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.NODE_LEFT, "node_left[" + node.nodeId() + "]"); for (ShardRouting shardRouting : node.copyShards()) { + UnassignedInfo unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.NODE_LEFT, "node_left[" + node.nodeId() + "]", null, + allocation.getCurrentNanoTime(), System.currentTimeMillis()); applyFailedShard(allocation, shardRouting, false, unassignedInfo); } // its a dead node, remove it, note, its important to remove it *after* we apply failed shard @@ -531,4 +540,9 @@ public class AllocationService extends AbstractComponent { RoutingNodes routingNodes = new RoutingNodes(clusterState, false); // this is a costly operation - only call this once! return routingNodes; } + + /** ovrride this to control time based decisions during allocation */ + protected long currentNanoTime() { + return System.nanoTime(); + } } diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/FailedRerouteAllocation.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/FailedRerouteAllocation.java index 24e38279f4d..835556a265b 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/FailedRerouteAllocation.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/FailedRerouteAllocation.java @@ -58,7 +58,7 @@ public class FailedRerouteAllocation extends RoutingAllocation { private final List failedShards; public FailedRerouteAllocation(AllocationDeciders deciders, RoutingNodes routingNodes, DiscoveryNodes nodes, List failedShards, ClusterInfo clusterInfo) { - super(deciders, routingNodes, nodes, clusterInfo); + super(deciders, routingNodes, nodes, clusterInfo, System.nanoTime()); this.failedShards = failedShards; } diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/RoutingAllocation.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/RoutingAllocation.java index 140c4ad6692..a8a546d946e 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/RoutingAllocation.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/RoutingAllocation.java @@ -120,19 +120,27 @@ public class RoutingAllocation { private boolean hasPendingAsyncFetch = false; + private final long currentNanoTime; + /** * Creates a new {@link RoutingAllocation} - * - * @param deciders {@link AllocationDeciders} to used to make decisions for routing allocations - * @param routingNodes Routing nodes in the current cluster + * @param deciders {@link AllocationDeciders} to used to make decisions for routing allocations + * @param routingNodes Routing nodes in the current cluster * @param nodes TODO: Documentation + * @param currentNanoTime the nano time to use for all delay allocation calculation (typically {@link System#nanoTime()}) */ - public RoutingAllocation(AllocationDeciders deciders, RoutingNodes routingNodes, DiscoveryNodes nodes, ClusterInfo clusterInfo) { + public RoutingAllocation(AllocationDeciders deciders, RoutingNodes routingNodes, DiscoveryNodes nodes, ClusterInfo clusterInfo, long currentNanoTime) { this.deciders = deciders; this.routingNodes = routingNodes; this.nodes = nodes; this.clusterInfo = clusterInfo; + this.currentNanoTime = currentNanoTime; + } + + /** returns the nano time captured at the beginning of the allocation. used to make sure all time based decisions are aligned */ + public long getCurrentNanoTime() { + return currentNanoTime; } /** diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/StartedRerouteAllocation.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/StartedRerouteAllocation.java index da69419d948..00f3944ae03 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/StartedRerouteAllocation.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/StartedRerouteAllocation.java @@ -36,7 +36,7 @@ public class StartedRerouteAllocation extends RoutingAllocation { private final List startedShards; public StartedRerouteAllocation(AllocationDeciders deciders, RoutingNodes routingNodes, DiscoveryNodes nodes, List startedShards, ClusterInfo clusterInfo) { - super(deciders, routingNodes, nodes, clusterInfo); + super(deciders, routingNodes, nodes, clusterInfo, System.nanoTime()); this.startedShards = startedShards; } diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/ShardsAllocators.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/ShardsAllocators.java index 003988f7bd5..a9ce43c5f76 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/ShardsAllocators.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/ShardsAllocators.java @@ -19,8 +19,8 @@ package org.elasticsearch.cluster.routing.allocation.allocator; -import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.RoutingNode; +import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.allocation.FailedRerouteAllocation; import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; import org.elasticsearch.cluster.routing.allocation.StartedRerouteAllocation; @@ -74,6 +74,10 @@ public class ShardsAllocators extends AbstractComponent implements ShardsAllocat return changed; } + protected long nanoTime() { + return System.nanoTime(); + } + @Override public boolean rebalance(RoutingAllocation allocation) { if (allocation.hasPendingAsyncFetch() == false) { diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/command/AllocateAllocationCommand.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/command/AllocateAllocationCommand.java index b210557b687..5646d308dda 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/command/AllocateAllocationCommand.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/command/AllocateAllocationCommand.java @@ -229,7 +229,8 @@ public class AllocateAllocationCommand implements AllocationCommand { // it was index creation if (unassigned.primary() && unassigned.unassignedInfo().getReason() != UnassignedInfo.Reason.INDEX_CREATED) { unassigned.updateUnassignedInfo(new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, - "force allocation from previous reason " + unassigned.unassignedInfo().getReason() + ", " + unassigned.unassignedInfo().getMessage(), unassigned.unassignedInfo().getFailure())); + "force allocation from previous reason " + unassigned.unassignedInfo().getReason() + ", " + unassigned.unassignedInfo().getMessage(), + unassigned.unassignedInfo().getFailure(), System.nanoTime(), System.currentTimeMillis())); } it.initialize(routingNode.nodeId(), unassigned.version(), allocation.clusterInfo().getShardSize(unassigned, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE)); break; diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/command/CancelAllocationCommand.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/command/CancelAllocationCommand.java index d9ad8c4f871..7554fa4c46f 100644 --- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/command/CancelAllocationCommand.java +++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/command/CancelAllocationCommand.java @@ -21,7 +21,10 @@ package org.elasticsearch.cluster.routing.allocation.command; import org.elasticsearch.ElasticsearchParseException; import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.cluster.routing.*; +import org.elasticsearch.cluster.routing.RoutingNode; +import org.elasticsearch.cluster.routing.RoutingNodes; +import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.UnassignedInfo; import org.elasticsearch.cluster.routing.allocation.RerouteExplanation; import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; import org.elasticsearch.cluster.routing.allocation.decider.Decision; @@ -34,7 +37,6 @@ import org.elasticsearch.index.shard.ShardId; import java.io.IOException; -import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING; import static org.elasticsearch.cluster.routing.ShardRoutingState.RELOCATING; /** diff --git a/core/src/main/java/org/elasticsearch/gateway/ReplicaShardAllocator.java b/core/src/main/java/org/elasticsearch/gateway/ReplicaShardAllocator.java index a830aa6198e..c87f4d94755 100644 --- a/core/src/main/java/org/elasticsearch/gateway/ReplicaShardAllocator.java +++ b/core/src/main/java/org/elasticsearch/gateway/ReplicaShardAllocator.java @@ -24,7 +24,6 @@ import com.carrotsearch.hppc.ObjectLongMap; import com.carrotsearch.hppc.cursors.ObjectCursor; import com.carrotsearch.hppc.cursors.ObjectLongCursor; import org.elasticsearch.cluster.ClusterChangedEvent; -import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.RoutingNode; import org.elasticsearch.cluster.routing.RoutingNodes; @@ -101,7 +100,8 @@ public abstract class ReplicaShardAllocator extends AbstractComponent { // we found a better match that has a full sync id match, the existing allocation is not fully synced // so we found a better one, cancel this one it.moveToUnassigned(new UnassignedInfo(UnassignedInfo.Reason.REALLOCATED_REPLICA, - "existing allocation of replica to [" + currentNode + "] cancelled, sync id match found on node [" + nodeWithHighestMatch + "]")); + "existing allocation of replica to [" + currentNode + "] cancelled, sync id match found on node [" + nodeWithHighestMatch + "]", + null, allocation.getCurrentNanoTime(), System.currentTimeMillis())); changed = true; } } @@ -111,7 +111,6 @@ public abstract class ReplicaShardAllocator extends AbstractComponent { } public boolean allocateUnassigned(RoutingAllocation allocation) { - long nanoTimeNow = System.nanoTime(); boolean changed = false; final RoutingNodes routingNodes = allocation.routingNodes(); final RoutingNodes.UnassignedShards.UnassignedIterator unassignedIterator = routingNodes.unassigned().iterator(); @@ -171,7 +170,7 @@ public abstract class ReplicaShardAllocator extends AbstractComponent { } } else if (matchingNodes.hasAnyData() == false) { // if we didn't manage to find *any* data (regardless of matching sizes), check if the allocation of the replica shard needs to be delayed - changed |= ignoreUnassignedIfDelayed(nanoTimeNow, allocation, unassignedIterator, shard); + changed |= ignoreUnassignedIfDelayed(unassignedIterator, shard); } } return changed; @@ -185,16 +184,13 @@ public abstract class ReplicaShardAllocator extends AbstractComponent { * * PUBLIC FOR TESTS! * - * @param timeNowNanos Timestamp in nanoseconds representing "now" - * @param allocation the routing allocation * @param unassignedIterator iterator over unassigned shards * @param shard the shard which might be delayed * @return true iff allocation is delayed for this shard */ - public boolean ignoreUnassignedIfDelayed(long timeNowNanos, RoutingAllocation allocation, RoutingNodes.UnassignedShards.UnassignedIterator unassignedIterator, ShardRouting shard) { - IndexMetaData indexMetaData = allocation.metaData().index(shard.getIndex()); + public boolean ignoreUnassignedIfDelayed(RoutingNodes.UnassignedShards.UnassignedIterator unassignedIterator, ShardRouting shard) { // calculate delay and store it in UnassignedInfo to be used by RoutingService - long delay = shard.unassignedInfo().updateDelay(timeNowNanos, settings, indexMetaData.getSettings()); + long delay = shard.unassignedInfo().getLastComputedLeftDelayNanos(); if (delay > 0) { logger.debug("[{}][{}]: delaying allocation of [{}] for [{}]", shard.index(), shard.id(), shard, TimeValue.timeValueNanos(delay)); /** diff --git a/core/src/test/java/org/elasticsearch/action/support/replication/ClusterStateCreationUtils.java b/core/src/test/java/org/elasticsearch/action/support/replication/ClusterStateCreationUtils.java index 913d52d5b17..406e476b4e0 100644 --- a/core/src/test/java/org/elasticsearch/action/support/replication/ClusterStateCreationUtils.java +++ b/core/src/test/java/org/elasticsearch/action/support/replication/ClusterStateCreationUtils.java @@ -27,12 +27,7 @@ import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; -import org.elasticsearch.cluster.routing.IndexRoutingTable; -import org.elasticsearch.cluster.routing.IndexShardRoutingTable; -import org.elasticsearch.cluster.routing.RoutingTable; -import org.elasticsearch.cluster.routing.ShardRoutingState; -import org.elasticsearch.cluster.routing.TestShardRouting; -import org.elasticsearch.cluster.routing.UnassignedInfo; +import org.elasticsearch.cluster.routing.*; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.DummyTransportAddress; import org.elasticsearch.index.shard.ShardId; @@ -40,10 +35,7 @@ import org.elasticsearch.index.shard.ShardId; import java.util.HashSet; import java.util.Set; -import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_CREATION_DATE; -import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS; -import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS; -import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_VERSION_CREATED; +import static org.elasticsearch.cluster.metadata.IndexMetaData.*; import static org.elasticsearch.test.ESTestCase.randomFrom; import static org.elasticsearch.test.ESTestCase.randomIntBetween; diff --git a/core/src/test/java/org/elasticsearch/cluster/ClusterStateDiffIT.java b/core/src/test/java/org/elasticsearch/cluster/ClusterStateDiffIT.java index c2e646dde19..8d4540aad3b 100644 --- a/core/src/test/java/org/elasticsearch/cluster/ClusterStateDiffIT.java +++ b/core/src/test/java/org/elasticsearch/cluster/ClusterStateDiffIT.java @@ -20,25 +20,13 @@ package org.elasticsearch.cluster; import com.carrotsearch.hppc.cursors.ObjectCursor; - import org.elasticsearch.Version; import org.elasticsearch.cluster.block.ClusterBlock; import org.elasticsearch.cluster.block.ClusterBlocks; -import org.elasticsearch.cluster.metadata.AliasMetaData; -import org.elasticsearch.cluster.metadata.IndexMetaData; -import org.elasticsearch.cluster.metadata.IndexTemplateMetaData; -import org.elasticsearch.cluster.metadata.MetaData; -import org.elasticsearch.cluster.metadata.RepositoriesMetaData; -import org.elasticsearch.cluster.metadata.SnapshotId; +import org.elasticsearch.cluster.metadata.*; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; -import org.elasticsearch.cluster.routing.IndexRoutingTable; -import org.elasticsearch.cluster.routing.IndexShardRoutingTable; -import org.elasticsearch.cluster.routing.RoutingTable; -import org.elasticsearch.cluster.routing.ShardRouting; -import org.elasticsearch.cluster.routing.ShardRoutingState; -import org.elasticsearch.cluster.routing.TestShardRouting; -import org.elasticsearch.cluster.routing.UnassignedInfo; +import org.elasticsearch.cluster.routing.*; import org.elasticsearch.common.Strings; import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.collect.ImmutableOpenMap; @@ -50,7 +38,6 @@ import org.elasticsearch.discovery.DiscoverySettings; import org.elasticsearch.gateway.GatewayService; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.index.shard.ShardId; -import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.warmer.IndexWarmersMetaData; import org.elasticsearch.test.ESIntegTestCase; diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/AllocationIdTests.java b/core/src/test/java/org/elasticsearch/cluster/routing/AllocationIdTests.java index 8d6953ed929..88ad66c4a91 100644 --- a/core/src/test/java/org/elasticsearch/cluster/routing/AllocationIdTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/routing/AllocationIdTests.java @@ -21,10 +21,7 @@ package org.elasticsearch.cluster.routing; import org.elasticsearch.test.ESTestCase; -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.not; -import static org.hamcrest.Matchers.notNullValue; -import static org.hamcrest.Matchers.nullValue; +import static org.hamcrest.Matchers.*; /** */ diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/DelayedAllocationIT.java b/core/src/test/java/org/elasticsearch/cluster/routing/DelayedAllocationIT.java index f34f46a7ad4..c236ea54878 100644 --- a/core/src/test/java/org/elasticsearch/cluster/routing/DelayedAllocationIT.java +++ b/core/src/test/java/org/elasticsearch/cluster/routing/DelayedAllocationIT.java @@ -19,7 +19,6 @@ package org.elasticsearch.cluster.routing; -import org.apache.lucene.util.LuceneTestCase; import org.elasticsearch.action.index.IndexRequestBuilder; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetaData; @@ -37,7 +36,6 @@ import static org.hamcrest.Matchers.equalTo; /** */ @ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0) -@LuceneTestCase.AwaitsFix(bugUrl = "http://build-us-00.elastic.co/job/es_core_master_windows-2012-r2/2074/testReport/ (boaz on it)") public class DelayedAllocationIT extends ESIntegTestCase { /** diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/RandomShardRoutingMutator.java b/core/src/test/java/org/elasticsearch/cluster/routing/RandomShardRoutingMutator.java index b451183826b..47ae3e68580 100644 --- a/core/src/test/java/org/elasticsearch/cluster/routing/RandomShardRoutingMutator.java +++ b/core/src/test/java/org/elasticsearch/cluster/routing/RandomShardRoutingMutator.java @@ -19,9 +19,7 @@ package org.elasticsearch.cluster.routing; -import static org.elasticsearch.test.ESTestCase.randomAsciiOfLength; -import static org.elasticsearch.test.ESTestCase.randomFrom; -import static org.elasticsearch.test.ESTestCase.randomInt; +import static org.elasticsearch.test.ESTestCase.*; /** * Utility class the makes random modifications to ShardRouting diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/RoutingServiceTests.java b/core/src/test/java/org/elasticsearch/cluster/routing/RoutingServiceTests.java index b0e597614d8..1711b0c33a8 100644 --- a/core/src/test/java/org/elasticsearch/cluster/routing/RoutingServiceTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/routing/RoutingServiceTests.java @@ -88,15 +88,14 @@ public class RoutingServiceTests extends ESAllocationTestCase { clusterState = ClusterState.builder(clusterState).routingResult(allocation.reroute(clusterState, "reroute")).build(); ClusterState newState = clusterState; - assertThat(routingService.getMinDelaySettingAtLastScheduling(), equalTo(Long.MAX_VALUE)); + assertThat(routingService.getMinDelaySettingAtLastSchedulingNanos(), equalTo(Long.MAX_VALUE)); routingService.clusterChanged(new ClusterChangedEvent("test", newState, prevState)); - assertThat(routingService.getMinDelaySettingAtLastScheduling(), equalTo(Long.MAX_VALUE)); + assertThat(routingService.getMinDelaySettingAtLastSchedulingNanos(), equalTo(Long.MAX_VALUE)); assertThat(routingService.hasReroutedAndClear(), equalTo(false)); } public void testDelayedUnassignedScheduleReroute() throws Exception { - DelayedShardsMockGatewayAllocator mockGatewayAllocator = new DelayedShardsMockGatewayAllocator(); - AllocationService allocation = createAllocationService(Settings.EMPTY, mockGatewayAllocator); + MockAllocationService allocation = createAllocationService(Settings.EMPTY, new DelayedShardsMockGatewayAllocator()); MetaData metaData = MetaData.builder() .put(IndexMetaData.builder("test").settings(settings(Version.CURRENT).put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING, "100ms")) .numberOfShards(1).numberOfReplicas(1)) @@ -126,7 +125,6 @@ public class RoutingServiceTests extends ESAllocationTestCase { ClusterState prevState = clusterState; clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder(clusterState.nodes()).remove(nodeId)).build(); // make sure the replica is marked as delayed (i.e. not reallocated) - mockGatewayAllocator.setTimeSource(shard -> shard.unassignedInfo().getUnassignedTimeInNanos() + TimeValue.timeValueMillis(randomIntBetween(0, 99)).nanos()); clusterState = ClusterState.builder(clusterState).routingResult(allocation.reroute(clusterState, "reroute")).build(); assertEquals(1, clusterState.getRoutingNodes().unassigned().size()); @@ -134,7 +132,7 @@ public class RoutingServiceTests extends ESAllocationTestCase { routingService.clusterChanged(new ClusterChangedEvent("test", newState, prevState)); assertBusy(() -> assertTrue("routing service should have run a reroute", routingService.hasReroutedAndClear())); // verify the registration has been reset - assertThat(routingService.getMinDelaySettingAtLastScheduling(), equalTo(Long.MAX_VALUE)); + assertThat(routingService.getMinDelaySettingAtLastSchedulingNanos(), equalTo(Long.MAX_VALUE)); } /** @@ -144,8 +142,7 @@ public class RoutingServiceTests extends ESAllocationTestCase { final ThreadPool testThreadPool = new ThreadPool(getTestName()); try { - DelayedShardsMockGatewayAllocator mockGatewayAllocator = new DelayedShardsMockGatewayAllocator(); - AllocationService allocation = createAllocationService(Settings.EMPTY, mockGatewayAllocator); + MockAllocationService allocation = createAllocationService(Settings.EMPTY, new DelayedShardsMockGatewayAllocator()); MetaData metaData = MetaData.builder() .put(IndexMetaData.builder("short_delay").settings(settings(Version.CURRENT).put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING, "100ms")) .numberOfShards(1).numberOfReplicas(1)) @@ -185,11 +182,13 @@ public class RoutingServiceTests extends ESAllocationTestCase { } assertNotNull(longDelayReplica); + final long baseTime = System.nanoTime(); + // remove node of shortDelayReplica and node of longDelayReplica and reroute ClusterState prevState = clusterState; clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder(clusterState.nodes()).remove(shortDelayReplica.currentNodeId()).remove(longDelayReplica.currentNodeId())).build(); // make sure both replicas are marked as delayed (i.e. not reallocated) - mockGatewayAllocator.setTimeSource(shard -> shard.unassignedInfo().getUnassignedTimeInNanos() + 1); + allocation.setNanoTimeOverride(baseTime); clusterState = ClusterState.builder(clusterState).routingResult(allocation.reroute(clusterState, "reroute")).build(); // check that shortDelayReplica and longDelayReplica have been marked unassigned @@ -216,7 +215,7 @@ public class RoutingServiceTests extends ESAllocationTestCase { RoutingService routingService = new RoutingService(Settings.EMPTY, testThreadPool, clusterService, allocation); routingService.start(); // just so performReroute does not prematurely return // next (delayed) reroute should only delay longDelayReplica/longDelayUnassignedReplica, simulate that we are now 1 second after shards became unassigned - mockGatewayAllocator.setTimeSource(shard -> shard.unassignedInfo().getUnassignedTimeInNanos() + TimeValue.timeValueSeconds(1).nanos()); + allocation.setNanoTimeOverride(baseTime + TimeValue.timeValueSeconds(1).nanos()); // register listener on cluster state so we know when cluster state has been changed CountDownLatch latch = new CountDownLatch(1); clusterService.addLast(event -> latch.countDown()); @@ -225,50 +224,12 @@ public class RoutingServiceTests extends ESAllocationTestCase { // cluster service should have updated state and called routingService with clusterChanged latch.await(); // verify the registration has been set to the delay of longDelayReplica/longDelayUnassignedReplica - assertThat(routingService.getMinDelaySettingAtLastScheduling(), equalTo(TimeValue.timeValueSeconds(10).millis())); + assertThat(routingService.getMinDelaySettingAtLastSchedulingNanos(), equalTo(TimeValue.timeValueSeconds(10).nanos())); } finally { terminate(testThreadPool); } } - public void testDelayedUnassignedDoesNotRerouteForNegativeDelays() throws Exception { - DelayedShardsMockGatewayAllocator mockGatewayAllocator = new DelayedShardsMockGatewayAllocator(); - AllocationService allocation = createAllocationService(Settings.EMPTY, mockGatewayAllocator); - MetaData metaData = MetaData.builder() - .put(IndexMetaData.builder("test").settings(settings(Version.CURRENT).put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING, "100ms")) - .numberOfShards(1).numberOfReplicas(1)) - .build(); - ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT) - .metaData(metaData) - .routingTable(RoutingTable.builder().addAsNew(metaData.index("test")).build()).build(); - clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder().put(newNode("node1")).put(newNode("node2")).localNodeId("node1").masterNodeId("node1")).build(); - clusterState = ClusterState.builder(clusterState).routingResult(allocation.reroute(clusterState, "reroute")).build(); - // starting primaries - clusterState = ClusterState.builder(clusterState).routingResult(allocation.applyStartedShards(clusterState, clusterState.getRoutingNodes().shardsWithState(INITIALIZING))).build(); - // starting replicas - clusterState = ClusterState.builder(clusterState).routingResult(allocation.applyStartedShards(clusterState, clusterState.getRoutingNodes().shardsWithState(INITIALIZING))).build(); - assertThat(clusterState.getRoutingNodes().unassigned().size() > 0, equalTo(false)); - // remove node2 and reroute - ClusterState prevState = clusterState; - clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder(clusterState.nodes()).remove("node2")).build(); - clusterState = ClusterState.builder(clusterState).routingResult(allocation.reroute(clusterState, "reroute")).build(); - // Set it in the future so the delay will be negative - mockGatewayAllocator.setTimeSource(shard -> shard.unassignedInfo().getUnassignedTimeInNanos() + TimeValue.timeValueMinutes(1).nanos()); - - ClusterState newState = clusterState; - - routingService.clusterChanged(new ClusterChangedEvent("test", newState, prevState)); - assertBusy(new Runnable() { - @Override - public void run() { - assertThat(routingService.hasReroutedAndClear(), equalTo(false)); - - // verify the registration has been updated - assertThat(routingService.getMinDelaySettingAtLastScheduling(), equalTo(100L)); - } - }); - } - private class TestRoutingService extends RoutingService { private AtomicBoolean rerouted = new AtomicBoolean(); diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/UnassignedInfoTests.java b/core/src/test/java/org/elasticsearch/cluster/routing/UnassignedInfoTests.java index 4847a86e0d6..3288b92cb8e 100644 --- a/core/src/test/java/org/elasticsearch/cluster/routing/UnassignedInfoTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/routing/UnassignedInfoTests.java @@ -21,7 +21,6 @@ package org.elasticsearch.cluster.routing; import com.carrotsearch.hppc.IntHashSet; import com.carrotsearch.randomizedtesting.generators.RandomPicks; - import org.elasticsearch.Version; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; @@ -40,15 +39,8 @@ import org.elasticsearch.test.ESAllocationTestCase; import java.util.Collections; import java.util.EnumSet; -import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING; -import static org.elasticsearch.cluster.routing.ShardRoutingState.STARTED; -import static org.elasticsearch.cluster.routing.ShardRoutingState.UNASSIGNED; -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.greaterThan; -import static org.hamcrest.Matchers.lessThan; -import static org.hamcrest.Matchers.lessThanOrEqualTo; -import static org.hamcrest.Matchers.notNullValue; -import static org.hamcrest.Matchers.nullValue; +import static org.elasticsearch.cluster.routing.ShardRoutingState.*; +import static org.hamcrest.Matchers.*; /** */ @@ -282,9 +274,32 @@ public class UnassignedInfoTests extends ESAllocationTestCase { assertThat(delay, equalTo(0l)); } + /** + * Verifies that delayed allocation calculation are correct. + */ + public void testLeftDelayCalculation() throws Exception { + final long baseTime = System.nanoTime(); + final UnassignedInfo unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.NODE_LEFT, "test", null, baseTime, System.currentTimeMillis()); + final long totalDelayNanos = TimeValue.timeValueMillis(10).nanos(); + final Settings settings = Settings.builder().put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING, TimeValue.timeValueNanos(totalDelayNanos)).build(); + long delay = unassignedInfo.updateDelay(baseTime, settings, Settings.EMPTY); + assertThat(delay, equalTo(totalDelayNanos)); + assertThat(delay, equalTo(unassignedInfo.getLastComputedLeftDelayNanos())); + long delta1 = randomIntBetween(1, (int) (totalDelayNanos - 1)); + delay = unassignedInfo.updateDelay(baseTime + delta1, settings, Settings.EMPTY); + assertThat(delay, equalTo(totalDelayNanos - delta1)); + assertThat(delay, equalTo(unassignedInfo.getLastComputedLeftDelayNanos())); + delay = unassignedInfo.updateDelay(baseTime + totalDelayNanos, settings, Settings.EMPTY); + assertThat(delay, equalTo(0L)); + assertThat(delay, equalTo(unassignedInfo.getLastComputedLeftDelayNanos())); + delay = unassignedInfo.updateDelay(baseTime + totalDelayNanos + randomIntBetween(1, 20), settings, Settings.EMPTY); + assertThat(delay, equalTo(0L)); + assertThat(delay, equalTo(unassignedInfo.getLastComputedLeftDelayNanos())); + } + + public void testNumberOfDelayedUnassigned() throws Exception { - DelayedShardsMockGatewayAllocator mockGatewayAllocator = new DelayedShardsMockGatewayAllocator(); - AllocationService allocation = createAllocationService(Settings.EMPTY, mockGatewayAllocator); + MockAllocationService allocation = createAllocationService(Settings.EMPTY, new DelayedShardsMockGatewayAllocator()); MetaData metaData = MetaData.builder() .put(IndexMetaData.builder("test1").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(1)) .put(IndexMetaData.builder("test2").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(1)) @@ -303,17 +318,21 @@ public class UnassignedInfoTests extends ESAllocationTestCase { // remove node2 and reroute clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder(clusterState.nodes()).remove("node2")).build(); // make sure both replicas are marked as delayed (i.e. not reallocated) - mockGatewayAllocator.setTimeSource(shard -> shard.unassignedInfo().getUnassignedTimeInNanos() + 1); clusterState = ClusterState.builder(clusterState).routingResult(allocation.reroute(clusterState, "reroute")).build(); assertThat(clusterState.prettyPrint(), UnassignedInfo.getNumberOfDelayedUnassigned(clusterState), equalTo(2)); } public void testFindNextDelayedAllocation() { - DelayedShardsMockGatewayAllocator mockGatewayAllocator = new DelayedShardsMockGatewayAllocator(); - AllocationService allocation = createAllocationService(Settings.EMPTY, mockGatewayAllocator); + MockAllocationService allocation = createAllocationService(Settings.EMPTY, new DelayedShardsMockGatewayAllocator()); + final long baseTime = System.nanoTime(); + allocation.setNanoTimeOverride(baseTime); + final TimeValue delayTest1 = TimeValue.timeValueMillis(randomIntBetween(1, 200)); + final TimeValue delayTest2 = TimeValue.timeValueMillis(randomIntBetween(1, 200)); + final long expectMinDelaySettingsNanos = Math.min(delayTest1.nanos(), delayTest2.nanos()); + MetaData metaData = MetaData.builder() - .put(IndexMetaData.builder("test1").settings(settings(Version.CURRENT).put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING, "10h")).numberOfShards(1).numberOfReplicas(1)) - .put(IndexMetaData.builder("test2").settings(settings(Version.CURRENT).put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING, "10h")).numberOfShards(1).numberOfReplicas(1)) + .put(IndexMetaData.builder("test1").settings(settings(Version.CURRENT).put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING, delayTest1)).numberOfShards(1).numberOfReplicas(1)) + .put(IndexMetaData.builder("test2").settings(settings(Version.CURRENT).put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING, delayTest2)).numberOfShards(1).numberOfReplicas(1)) .build(); ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT) .metaData(metaData) @@ -328,15 +347,19 @@ public class UnassignedInfoTests extends ESAllocationTestCase { assertThat(clusterState.getRoutingNodes().unassigned().size() > 0, equalTo(false)); // remove node2 and reroute clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder(clusterState.nodes()).remove("node2")).build(); - // make sure both replicas are marked as delayed (i.e. not reallocated) - mockGatewayAllocator.setTimeSource(shard -> shard.unassignedInfo().getUnassignedTimeInNanos() + 1); clusterState = ClusterState.builder(clusterState).routingResult(allocation.reroute(clusterState, "reroute")).build(); - long nextDelaySetting = UnassignedInfo.findSmallestDelayedAllocationSetting(Settings.builder().put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING, "10h").build(), clusterState); - assertThat(nextDelaySetting, equalTo(TimeValue.timeValueHours(10).millis())); + final long delta = randomBoolean() ? 0 : randomInt((int) expectMinDelaySettingsNanos); + + if (delta > 0) { + allocation.setNanoTimeOverride(baseTime + delta); + clusterState = ClusterState.builder(clusterState).routingResult(allocation.reroute(clusterState, "time moved")).build(); + } + + long minDelaySetting = UnassignedInfo.findSmallestDelayedAllocationSettingNanos(Settings.EMPTY, clusterState); + assertThat(minDelaySetting, equalTo(expectMinDelaySettingsNanos)); long nextDelay = UnassignedInfo.findNextDelayedAllocationIn(clusterState); - assertThat(nextDelay, greaterThan(TimeValue.timeValueHours(9).nanos())); - assertThat(nextDelay, lessThanOrEqualTo(TimeValue.timeValueHours(10).nanos())); + assertThat(nextDelay, equalTo(expectMinDelaySettingsNanos - delta)); } } diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderTests.java b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderTests.java index 6d141a758e8..a739f30856a 100644 --- a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderTests.java @@ -29,14 +29,7 @@ import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; -import org.elasticsearch.cluster.routing.IndexRoutingTable; -import org.elasticsearch.cluster.routing.IndexShardRoutingTable; -import org.elasticsearch.cluster.routing.RoutingNode; -import org.elasticsearch.cluster.routing.RoutingNodes; -import org.elasticsearch.cluster.routing.RoutingTable; -import org.elasticsearch.cluster.routing.ShardRouting; -import org.elasticsearch.cluster.routing.ShardRoutingState; -import org.elasticsearch.cluster.routing.TestShardRouting; +import org.elasticsearch.cluster.routing.*; import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; import org.elasticsearch.cluster.routing.allocation.allocator.ShardsAllocators; @@ -55,14 +48,9 @@ import java.util.HashMap; import java.util.HashSet; import java.util.Map; -import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING; -import static org.elasticsearch.cluster.routing.ShardRoutingState.RELOCATING; -import static org.elasticsearch.cluster.routing.ShardRoutingState.STARTED; -import static org.elasticsearch.cluster.routing.ShardRoutingState.UNASSIGNED; +import static org.elasticsearch.cluster.routing.ShardRoutingState.*; import static org.elasticsearch.common.settings.Settings.settingsBuilder; -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.is; -import static org.hamcrest.Matchers.nullValue; +import static org.hamcrest.Matchers.*; public class DiskThresholdDeciderTests extends ESAllocationTestCase { @@ -859,7 +847,7 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase { ) ); ClusterState clusterState = ClusterState.builder(baseClusterState).routingTable(builder.build()).build(); - RoutingAllocation routingAllocation = new RoutingAllocation(null, new RoutingNodes(clusterState), discoveryNodes, clusterInfo); + RoutingAllocation routingAllocation = new RoutingAllocation(null, new RoutingNodes(clusterState), discoveryNodes, clusterInfo, System.nanoTime()); Decision decision = diskThresholdDecider.canRemain(firstRouting, firstRoutingNode, routingAllocation); assertThat(decision.type(), equalTo(Decision.Type.NO)); @@ -879,7 +867,7 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase { ) ); clusterState = ClusterState.builder(baseClusterState).routingTable(builder.build()).build(); - routingAllocation = new RoutingAllocation(null, new RoutingNodes(clusterState), discoveryNodes, clusterInfo); + routingAllocation = new RoutingAllocation(null, new RoutingNodes(clusterState), discoveryNodes, clusterInfo, System.nanoTime()); decision = diskThresholdDecider.canRemain(firstRouting, firstRoutingNode, routingAllocation); assertThat(decision.type(), equalTo(Decision.Type.YES)); @@ -978,7 +966,7 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase { ) ); ClusterState clusterState = ClusterState.builder(baseClusterState).routingTable(builder.build()).build(); - RoutingAllocation routingAllocation = new RoutingAllocation(null, new RoutingNodes(clusterState), discoveryNodes, clusterInfo); + RoutingAllocation routingAllocation = new RoutingAllocation(null, new RoutingNodes(clusterState), discoveryNodes, clusterInfo, System.nanoTime()); Decision decision = diskThresholdDecider.canRemain(firstRouting, firstRoutingNode, routingAllocation); // Two shards should start happily @@ -1035,7 +1023,7 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase { ); clusterState = ClusterState.builder(updateClusterState).routingTable(builder.build()).build(); - routingAllocation = new RoutingAllocation(null, new RoutingNodes(clusterState), discoveryNodes, clusterInfo); + routingAllocation = new RoutingAllocation(null, new RoutingNodes(clusterState), discoveryNodes, clusterInfo, System.nanoTime()); decision = diskThresholdDecider.canRemain(firstRouting, firstRoutingNode, routingAllocation); assertThat(decision.type(), equalTo(Decision.Type.YES)); diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderUnitTests.java b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderUnitTests.java index fa31d8306e5..a386883ad1b 100644 --- a/core/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderUnitTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderUnitTests.java @@ -20,21 +20,13 @@ package org.elasticsearch.cluster.routing.allocation.decider; import org.elasticsearch.Version; -import org.elasticsearch.cluster.ClusterInfo; -import org.elasticsearch.cluster.ClusterInfoService; -import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.DiskUsage; -import org.elasticsearch.cluster.EmptyClusterInfoService; +import org.elasticsearch.cluster.*; import org.elasticsearch.cluster.MockInternalClusterInfoService.DevNullClusterInfo; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; -import org.elasticsearch.cluster.routing.RoutingNode; -import org.elasticsearch.cluster.routing.RoutingTable; -import org.elasticsearch.cluster.routing.ShardRouting; -import org.elasticsearch.cluster.routing.ShardRoutingHelper; -import org.elasticsearch.cluster.routing.UnassignedInfo; +import org.elasticsearch.cluster.routing.*; import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.settings.Settings; @@ -129,7 +121,7 @@ public class DiskThresholdDeciderUnitTests extends ESTestCase { ImmutableOpenMap.Builder shardSizes = ImmutableOpenMap.builder(); shardSizes.put("[test][0][p]", 10L); // 10 bytes final ClusterInfo clusterInfo = new ClusterInfo(leastAvailableUsages.build(), mostAvailableUsage.build(), shardSizes.build(), ImmutableOpenMap.of()); - RoutingAllocation allocation = new RoutingAllocation(new AllocationDeciders(Settings.EMPTY, new AllocationDecider[]{decider}), clusterState.getRoutingNodes(), clusterState.nodes(), clusterInfo); + RoutingAllocation allocation = new RoutingAllocation(new AllocationDeciders(Settings.EMPTY, new AllocationDecider[]{decider}), clusterState.getRoutingNodes(), clusterState.nodes(), clusterInfo, System.nanoTime()); assertEquals(mostAvailableUsage.toString(), Decision.YES, decider.canAllocate(test_0, new RoutingNode("node_0", node_0), allocation)); assertEquals(mostAvailableUsage.toString(), Decision.NO, decider.canAllocate(test_0, new RoutingNode("node_1", node_1), allocation)); } @@ -194,7 +186,7 @@ public class DiskThresholdDeciderUnitTests extends ESTestCase { shardSizes.put("[test][2][p]", 10L); final ClusterInfo clusterInfo = new ClusterInfo(leastAvailableUsages.build(), mostAvailableUsage.build(), shardSizes.build(), shardRoutingMap.build()); - RoutingAllocation allocation = new RoutingAllocation(new AllocationDeciders(Settings.EMPTY, new AllocationDecider[]{decider}), clusterState.getRoutingNodes(), clusterState.nodes(), clusterInfo); + RoutingAllocation allocation = new RoutingAllocation(new AllocationDeciders(Settings.EMPTY, new AllocationDecider[]{decider}), clusterState.getRoutingNodes(), clusterState.nodes(), clusterInfo, System.nanoTime()); assertEquals(Decision.YES, decider.canRemain(test_0, new RoutingNode("node_0", node_0), allocation)); assertEquals(Decision.NO, decider.canRemain(test_1, new RoutingNode("node_1", node_1), allocation)); try { diff --git a/core/src/test/java/org/elasticsearch/gateway/PrimaryShardAllocatorTests.java b/core/src/test/java/org/elasticsearch/gateway/PrimaryShardAllocatorTests.java index 25982150340..73cbb51faed 100644 --- a/core/src/test/java/org/elasticsearch/gateway/PrimaryShardAllocatorTests.java +++ b/core/src/test/java/org/elasticsearch/gateway/PrimaryShardAllocatorTests.java @@ -27,13 +27,7 @@ import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.metadata.SnapshotId; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; -import org.elasticsearch.cluster.routing.RestoreSource; -import org.elasticsearch.cluster.routing.RoutingNodes; -import org.elasticsearch.cluster.routing.RoutingTable; -import org.elasticsearch.cluster.routing.ShardRouting; -import org.elasticsearch.cluster.routing.ShardRoutingState; -import org.elasticsearch.cluster.routing.TestShardRouting; -import org.elasticsearch.cluster.routing.UnassignedInfo; +import org.elasticsearch.cluster.routing.*; import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders; import org.elasticsearch.common.Nullable; @@ -48,9 +42,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.Map; -import static org.hamcrest.Matchers.anyOf; -import static org.hamcrest.Matchers.contains; -import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.*; /** */ @@ -184,7 +176,7 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase { .metaData(metaData) .routingTable(routingTable) .nodes(DiscoveryNodes.builder().put(node1).put(node2).put(node3)).build(); - RoutingAllocation allocation = new RoutingAllocation(yesAllocationDeciders(), state.getRoutingNodes(), state.nodes(), null); + RoutingAllocation allocation = new RoutingAllocation(yesAllocationDeciders(), state.getRoutingNodes(), state.nodes(), null, System.nanoTime()); testAllocator.addData(node1, -1).addData(node2, -1); boolean changed = testAllocator.allocateUnassigned(allocation); @@ -208,7 +200,7 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase { .routingTable(routingTable) .nodes(DiscoveryNodes.builder().put(node1).put(node2).put(node3)).build(); - RoutingAllocation allocation = new RoutingAllocation(yesAllocationDeciders(), new RoutingNodes(state, false), state.nodes(), null); + RoutingAllocation allocation = new RoutingAllocation(yesAllocationDeciders(), new RoutingNodes(state, false), state.nodes(), null, System.nanoTime()); boolean changed = testAllocator.allocateUnassigned(allocation); assertThat(changed, equalTo(false)); assertThat(allocation.routingNodes().unassigned().ignored().size(), equalTo(1)); @@ -216,7 +208,7 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase { assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.UNASSIGNED).size(), equalTo(2)); // replicas testAllocator.addData(node1, 1); - allocation = new RoutingAllocation(yesAllocationDeciders(), new RoutingNodes(state, false), state.nodes(), null); + allocation = new RoutingAllocation(yesAllocationDeciders(), new RoutingNodes(state, false), state.nodes(), null, System.nanoTime()); changed = testAllocator.allocateUnassigned(allocation); assertThat(changed, equalTo(false)); assertThat(allocation.routingNodes().unassigned().ignored().size(), equalTo(1)); @@ -224,7 +216,7 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase { assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.UNASSIGNED).size(), equalTo(2)); // replicas testAllocator.addData(node2, 1); - allocation = new RoutingAllocation(yesAllocationDeciders(), new RoutingNodes(state, false), state.nodes(), null); + allocation = new RoutingAllocation(yesAllocationDeciders(), new RoutingNodes(state, false), state.nodes(), null, System.nanoTime()); changed = testAllocator.allocateUnassigned(allocation); assertThat(changed, equalTo(true)); assertThat(allocation.routingNodes().unassigned().ignored().size(), equalTo(0)); @@ -249,7 +241,7 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase { .routingTable(routingTable) .nodes(DiscoveryNodes.builder().put(node1).put(node2).put(node3)).build(); - RoutingAllocation allocation = new RoutingAllocation(yesAllocationDeciders(), new RoutingNodes(state, false), state.nodes(), null); + RoutingAllocation allocation = new RoutingAllocation(yesAllocationDeciders(), new RoutingNodes(state, false), state.nodes(), null, System.nanoTime()); boolean changed = testAllocator.allocateUnassigned(allocation); assertThat(changed, equalTo(false)); assertThat(allocation.routingNodes().unassigned().ignored().size(), equalTo(1)); @@ -257,7 +249,7 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase { assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.UNASSIGNED).size(), equalTo(2)); // replicas testAllocator.addData(node1, 1); - allocation = new RoutingAllocation(yesAllocationDeciders(), new RoutingNodes(state, false), state.nodes(), null); + allocation = new RoutingAllocation(yesAllocationDeciders(), new RoutingNodes(state, false), state.nodes(), null, System.nanoTime()); changed = testAllocator.allocateUnassigned(allocation); assertThat(changed, equalTo(false)); assertThat(allocation.routingNodes().unassigned().ignored().size(), equalTo(1)); @@ -265,7 +257,7 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase { assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.UNASSIGNED).size(), equalTo(2)); // replicas testAllocator.addData(node2, 2); - allocation = new RoutingAllocation(yesAllocationDeciders(), new RoutingNodes(state, false), state.nodes(), null); + allocation = new RoutingAllocation(yesAllocationDeciders(), new RoutingNodes(state, false), state.nodes(), null, System.nanoTime()); changed = testAllocator.allocateUnassigned(allocation); assertThat(changed, equalTo(true)); assertThat(allocation.routingNodes().unassigned().ignored().size(), equalTo(0)); @@ -336,7 +328,7 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase { .metaData(metaData) .routingTable(routingTable) .nodes(DiscoveryNodes.builder().put(node1).put(node2).put(node3)).build(); - return new RoutingAllocation(deciders, new RoutingNodes(state, false), state.nodes(), null); + return new RoutingAllocation(deciders, new RoutingNodes(state, false), state.nodes(), null, System.nanoTime()); } class TestAllocator extends PrimaryShardAllocator { diff --git a/core/src/test/java/org/elasticsearch/gateway/ReplicaShardAllocatorTests.java b/core/src/test/java/org/elasticsearch/gateway/ReplicaShardAllocatorTests.java index 6a0aabe8d53..9a053b36527 100644 --- a/core/src/test/java/org/elasticsearch/gateway/ReplicaShardAllocatorTests.java +++ b/core/src/test/java/org/elasticsearch/gateway/ReplicaShardAllocatorTests.java @@ -20,7 +20,6 @@ package org.elasticsearch.gateway; import com.carrotsearch.randomizedtesting.generators.RandomPicks; - import org.elasticsearch.Version; import org.elasticsearch.cluster.ClusterInfo; import org.elasticsearch.cluster.ClusterState; @@ -28,15 +27,8 @@ import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; -import org.elasticsearch.cluster.routing.IndexRoutingTable; -import org.elasticsearch.cluster.routing.IndexShardRoutingTable; -import org.elasticsearch.cluster.routing.RoutingNode; -import org.elasticsearch.cluster.routing.RoutingNodes; -import org.elasticsearch.cluster.routing.RoutingTable; -import org.elasticsearch.cluster.routing.ShardRouting; -import org.elasticsearch.cluster.routing.ShardRoutingState; -import org.elasticsearch.cluster.routing.TestShardRouting; -import org.elasticsearch.cluster.routing.UnassignedInfo; +import org.elasticsearch.cluster.routing.*; +import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; import org.elasticsearch.cluster.routing.allocation.decider.AllocationDecider; import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders; @@ -232,6 +224,7 @@ public class ReplicaShardAllocatorTests extends ESAllocationTestCase { // we sometime return empty list of files, make sure we test this as well testAllocator.addData(node2, false, null); } + AllocationService.updateLeftDelayOfUnassignedShards(allocation, Settings.EMPTY); boolean changed = testAllocator.allocateUnassigned(allocation); assertThat(changed, equalTo(true)); assertThat(allocation.routingNodes().unassigned().ignored().size(), equalTo(1)); @@ -240,6 +233,7 @@ public class ReplicaShardAllocatorTests extends ESAllocationTestCase { allocation = onePrimaryOnNode1And1Replica(yesAllocationDeciders(), Settings.builder().put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING, TimeValue.timeValueHours(1)).build(), UnassignedInfo.Reason.NODE_LEFT); testAllocator.addData(node2, false, "MATCH", new StoreFileMetaData("file1", 10, "MATCH_CHECKSUM")); + AllocationService.updateLeftDelayOfUnassignedShards(allocation, Settings.EMPTY); changed = testAllocator.allocateUnassigned(allocation); assertThat(changed, equalTo(true)); assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(1)); @@ -296,7 +290,7 @@ public class ReplicaShardAllocatorTests extends ESAllocationTestCase { .metaData(metaData) .routingTable(routingTable) .nodes(DiscoveryNodes.builder().put(node1).put(node2).put(node3)).build(); - return new RoutingAllocation(deciders, new RoutingNodes(state, false), state.nodes(), ClusterInfo.EMPTY); + return new RoutingAllocation(deciders, new RoutingNodes(state, false), state.nodes(), ClusterInfo.EMPTY, System.nanoTime()); } private RoutingAllocation onePrimaryOnNode1And1ReplicaRecovering(AllocationDeciders deciders) { @@ -315,7 +309,7 @@ public class ReplicaShardAllocatorTests extends ESAllocationTestCase { .metaData(metaData) .routingTable(routingTable) .nodes(DiscoveryNodes.builder().put(node1).put(node2).put(node3)).build(); - return new RoutingAllocation(deciders, new RoutingNodes(state, false), state.nodes(), ClusterInfo.EMPTY); + return new RoutingAllocation(deciders, new RoutingNodes(state, false), state.nodes(), ClusterInfo.EMPTY, System.nanoTime()); } class TestAllocator extends ReplicaShardAllocator { diff --git a/core/src/test/java/org/elasticsearch/indexlifecycle/IndexLifecycleActionIT.java b/core/src/test/java/org/elasticsearch/indexlifecycle/IndexLifecycleActionIT.java index 933d8506531..818937c511e 100644 --- a/core/src/test/java/org/elasticsearch/indexlifecycle/IndexLifecycleActionIT.java +++ b/core/src/test/java/org/elasticsearch/indexlifecycle/IndexLifecycleActionIT.java @@ -20,12 +20,13 @@ package org.elasticsearch.indexlifecycle; import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; -import org.elasticsearch.cluster.health.ClusterHealthStatus; import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.health.ClusterHealthStatus; import org.elasticsearch.cluster.routing.RoutingNode; import org.elasticsearch.cluster.routing.RoutingNodes; +import org.elasticsearch.cluster.routing.UnassignedInfo; import org.elasticsearch.common.Priority; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.discovery.Discovery; @@ -42,15 +43,9 @@ import static org.elasticsearch.client.Requests.clusterHealthRequest; import static org.elasticsearch.client.Requests.createIndexRequest; import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS; import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS; -import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING; -import static org.elasticsearch.cluster.routing.ShardRoutingState.RELOCATING; -import static org.elasticsearch.cluster.routing.ShardRoutingState.STARTED; +import static org.elasticsearch.cluster.routing.ShardRoutingState.*; import static org.elasticsearch.common.settings.Settings.settingsBuilder; -import static org.hamcrest.Matchers.anyOf; -import static org.hamcrest.Matchers.containsInAnyOrder; -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.not; -import static org.hamcrest.Matchers.nullValue; +import static org.hamcrest.Matchers.*; /** @@ -62,6 +57,7 @@ public class IndexLifecycleActionIT extends ESIntegTestCase { Settings settings = settingsBuilder() .put(SETTING_NUMBER_OF_SHARDS, 11) .put(SETTING_NUMBER_OF_REPLICAS, 1) + .put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING, "0s") .build(); // start one server diff --git a/core/src/test/java/org/elasticsearch/indices/state/RareClusterStateIT.java b/core/src/test/java/org/elasticsearch/indices/state/RareClusterStateIT.java index c55d80b7c51..ab3f825cecc 100644 --- a/core/src/test/java/org/elasticsearch/indices/state/RareClusterStateIT.java +++ b/core/src/test/java/org/elasticsearch/indices/state/RareClusterStateIT.java @@ -56,19 +56,12 @@ import org.elasticsearch.test.disruption.BlockClusterStateProcessing; import org.elasticsearch.test.junit.annotations.TestLogging; import java.io.IOException; -import java.util.Arrays; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; +import java.util.*; import java.util.concurrent.atomic.AtomicReference; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.hasItem; -import static org.hamcrest.Matchers.hasSize; -import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.*; /** */ @@ -99,7 +92,7 @@ public class RareClusterStateIT extends ESIntegTestCase { .nodes(DiscoveryNodes.EMPTY_NODES) .build(), false ); - RoutingAllocation routingAllocation = new RoutingAllocation(allocationDeciders, routingNodes, current.nodes(), ClusterInfo.EMPTY); + RoutingAllocation routingAllocation = new RoutingAllocation(allocationDeciders, routingNodes, current.nodes(), ClusterInfo.EMPTY, System.nanoTime()); allocator.allocateUnassigned(routingAllocation); } diff --git a/test-framework/src/main/java/org/elasticsearch/test/ESAllocationTestCase.java b/test-framework/src/main/java/org/elasticsearch/test/ESAllocationTestCase.java index 65540a3c536..c4f4b196739 100644 --- a/test-framework/src/main/java/org/elasticsearch/test/ESAllocationTestCase.java +++ b/test-framework/src/main/java/org/elasticsearch/test/ESAllocationTestCase.java @@ -47,12 +47,7 @@ import org.elasticsearch.node.settings.NodeSettingsService; import org.elasticsearch.test.gateway.NoopGatewayAllocator; import java.lang.reflect.Constructor; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.Random; -import java.util.function.Function; +import java.util.*; import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING; import static org.elasticsearch.common.util.CollectionUtils.arrayAsArrayList; @@ -63,32 +58,32 @@ import static org.hamcrest.CoreMatchers.is; */ public abstract class ESAllocationTestCase extends ESTestCase { - public static AllocationService createAllocationService() { + public static MockAllocationService createAllocationService() { return createAllocationService(Settings.Builder.EMPTY_SETTINGS); } - public static AllocationService createAllocationService(Settings settings) { + public static MockAllocationService createAllocationService(Settings settings) { return createAllocationService(settings, getRandom()); } - public static AllocationService createAllocationService(Settings settings, Random random) { + public static MockAllocationService createAllocationService(Settings settings, Random random) { return createAllocationService(settings, new NodeSettingsService(Settings.Builder.EMPTY_SETTINGS), random); } - public static AllocationService createAllocationService(Settings settings, NodeSettingsService nodeSettingsService, Random random) { - return new AllocationService(settings, + public static MockAllocationService createAllocationService(Settings settings, NodeSettingsService nodeSettingsService, Random random) { + return new MockAllocationService(settings, randomAllocationDeciders(settings, nodeSettingsService, random), new ShardsAllocators(settings, NoopGatewayAllocator.INSTANCE), EmptyClusterInfoService.INSTANCE); } - public static AllocationService createAllocationService(Settings settings, ClusterInfoService clusterInfoService) { - return new AllocationService(settings, + public static MockAllocationService createAllocationService(Settings settings, ClusterInfoService clusterInfoService) { + return new MockAllocationService(settings, randomAllocationDeciders(settings, new NodeSettingsService(Settings.Builder.EMPTY_SETTINGS), getRandom()), new ShardsAllocators(settings, NoopGatewayAllocator.INSTANCE), clusterInfoService); } - public static AllocationService createAllocationService(Settings settings, GatewayAllocator allocator) { - return new AllocationService(settings, + public static MockAllocationService createAllocationService(Settings settings, GatewayAllocator allocator) { + return new MockAllocationService(settings, randomAllocationDeciders(settings, new NodeSettingsService(Settings.Builder.EMPTY_SETTINGS), getRandom()), new ShardsAllocators(settings, allocator), EmptyClusterInfoService.INSTANCE); } @@ -187,9 +182,27 @@ public abstract class ESAllocationTestCase extends ESTestCase { } } + /** A lock {@link AllocationService} allowing tests to override time */ + protected static class MockAllocationService extends AllocationService { + + private Long nanoTimeOverride = null; + + public MockAllocationService(Settings settings, AllocationDeciders allocationDeciders, ShardsAllocators shardsAllocators, ClusterInfoService clusterInfoService) { + super(settings, allocationDeciders, shardsAllocators, clusterInfoService); + } + + public void setNanoTimeOverride(long nanoTime) { + this.nanoTimeOverride = nanoTime; + } + + @Override + protected long currentNanoTime() { + return nanoTimeOverride == null ? super.currentNanoTime() : nanoTimeOverride; + } + } + /** * Mocks behavior in ReplicaShardAllocator to remove delayed shards from list of unassigned shards so they don't get reassigned yet. - * Also computes delay in UnassignedInfo based on customizable time source. */ protected static class DelayedShardsMockGatewayAllocator extends GatewayAllocator { private final ReplicaShardAllocator replicaShardAllocator = new ReplicaShardAllocator(Settings.EMPTY) { @@ -199,16 +212,11 @@ public abstract class ESAllocationTestCase extends ESTestCase { } }; - private volatile Function timeSource; public DelayedShardsMockGatewayAllocator() { super(Settings.EMPTY, null, null); } - public void setTimeSource(Function timeSource) { - this.timeSource = timeSource; - } - @Override public void applyStartedShards(StartedRerouteAllocation allocation) {} @@ -224,8 +232,7 @@ public abstract class ESAllocationTestCase extends ESTestCase { if (shard.primary() || shard.allocatedPostIndexCreate() == false) { continue; } - changed |= replicaShardAllocator.ignoreUnassignedIfDelayed(timeSource == null ? System.nanoTime() : timeSource.apply(shard), - allocation, unassignedIterator, shard); + changed |= replicaShardAllocator.ignoreUnassignedIfDelayed(unassignedIterator, shard); } return changed; }