Make sure the remaining delay of unassigned shard is updated with every reroute

For example: if a node left the cluster and an async store fetch was triggered. In that time no shard is marked as delayed (and strictly speaking it's not yet delayed). This caused test for shard delays post node left to fail. see : http://build-us-00.elastic.co/job/es_core_master_windows-2012-r2/2074/testReport/

 To fix this, the delay update is now done by the Allocation Service, based of a fixed time stamp that is determined at the beginning of the reroute.

 Also, this commit fixes a bug where unassigned info instances were reused across shard routings, causing calculated delays to be leaked.

Closes #14890
This commit is contained in:
Boaz Leskes 2015-11-20 11:37:47 +01:00
parent 28109a18a2
commit 4a6f3c7840
24 changed files with 242 additions and 282 deletions

View File

@ -55,7 +55,7 @@ public class RoutingService extends AbstractLifecycleComponent<RoutingService> i
private final AllocationService allocationService;
private AtomicBoolean rerouting = new AtomicBoolean();
private volatile long minDelaySettingAtLastScheduling = Long.MAX_VALUE;
private volatile long minDelaySettingAtLastSchedulingNanos = Long.MAX_VALUE;
private volatile ScheduledFuture registeredNextDelayFuture;
@Inject
@ -100,14 +100,14 @@ public class RoutingService extends AbstractLifecycleComponent<RoutingService> i
// Figure out if an existing scheduled reroute is good enough or whether we need to cancel and reschedule.
// If the minimum of the currently relevant delay settings is larger than something we scheduled in the past,
// we are guaranteed that the planned schedule will happen before any of the current shard delays are expired.
long minDelaySetting = UnassignedInfo.findSmallestDelayedAllocationSetting(settings, event.state());
long minDelaySetting = UnassignedInfo.findSmallestDelayedAllocationSettingNanos(settings, event.state());
if (minDelaySetting <= 0) {
logger.trace("no need to schedule reroute - no delayed unassigned shards, minDelaySetting [{}], scheduled [{}]", minDelaySetting, minDelaySettingAtLastScheduling);
minDelaySettingAtLastScheduling = Long.MAX_VALUE;
logger.trace("no need to schedule reroute - no delayed unassigned shards, minDelaySetting [{}], scheduled [{}]", minDelaySetting, minDelaySettingAtLastSchedulingNanos);
minDelaySettingAtLastSchedulingNanos = Long.MAX_VALUE;
FutureUtils.cancel(registeredNextDelayFuture);
} else if (minDelaySetting < minDelaySettingAtLastScheduling) {
} else if (minDelaySetting < minDelaySettingAtLastSchedulingNanos) {
FutureUtils.cancel(registeredNextDelayFuture);
minDelaySettingAtLastScheduling = minDelaySetting;
minDelaySettingAtLastSchedulingNanos = minDelaySetting;
TimeValue nextDelay = TimeValue.timeValueNanos(UnassignedInfo.findNextDelayedAllocationIn(event.state()));
assert nextDelay.nanos() > 0 : "next delay must be non 0 as minDelaySetting is [" + minDelaySetting + "]";
logger.info("delaying allocation for [{}] unassigned shards, next check in [{}]",
@ -115,25 +115,25 @@ public class RoutingService extends AbstractLifecycleComponent<RoutingService> i
registeredNextDelayFuture = threadPool.schedule(nextDelay, ThreadPool.Names.SAME, new AbstractRunnable() {
@Override
protected void doRun() throws Exception {
minDelaySettingAtLastScheduling = Long.MAX_VALUE;
minDelaySettingAtLastSchedulingNanos = Long.MAX_VALUE;
reroute("assign delayed unassigned shards");
}
@Override
public void onFailure(Throwable t) {
logger.warn("failed to schedule/execute reroute post unassigned shard", t);
minDelaySettingAtLastScheduling = Long.MAX_VALUE;
minDelaySettingAtLastSchedulingNanos = Long.MAX_VALUE;
}
});
} else {
logger.trace("no need to schedule reroute - current schedule reroute is enough. minDelaySetting [{}], scheduled [{}]", minDelaySetting, minDelaySettingAtLastScheduling);
logger.trace("no need to schedule reroute - current schedule reroute is enough. minDelaySetting [{}], scheduled [{}]", minDelaySetting, minDelaySettingAtLastSchedulingNanos);
}
}
}
// visible for testing
long getMinDelaySettingAtLastScheduling() {
return this.minDelaySettingAtLastScheduling;
long getMinDelaySettingAtLastSchedulingNanos() {
return this.minDelaySettingAtLastSchedulingNanos;
}
// visible for testing

View File

@ -34,7 +34,6 @@ import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
/**
* Holds additional information as to why the shard is in unassigned state.
@ -110,18 +109,27 @@ public class UnassignedInfo implements ToXContent, Writeable<UnassignedInfo> {
private final String message;
private final Throwable failure;
/**
* creates an UnassingedInfo object based **current** time
*
* @param reason the cause for making this shard unassigned. See {@link Reason} for more information.
* @param message more information about cause.
**/
public UnassignedInfo(Reason reason, String message) {
this(reason, System.currentTimeMillis(), System.nanoTime(), message, null);
this(reason, message, null, System.nanoTime(), System.currentTimeMillis());
}
public UnassignedInfo(Reason reason, @Nullable String message, @Nullable Throwable failure) {
this(reason, System.currentTimeMillis(), System.nanoTime(), message, failure);
}
private UnassignedInfo(Reason reason, long unassignedTimeMillis, long timestampNanos, String message, Throwable failure) {
/**
* @param reason the cause for making this shard unassigned. See {@link Reason} for more information.
* @param message more information about cause.
* @param failure the shard level failure that caused this shard to be unassigned, if exists.
* @param unassignedTimeNanos the time to use as the base for any delayed re-assignment calculation
* @param unassignedTimeMillis the time of unassignment used to display to in our reporting.
*/
public UnassignedInfo(Reason reason, @Nullable String message, @Nullable Throwable failure, long unassignedTimeNanos, long unassignedTimeMillis) {
this.reason = reason;
this.unassignedTimeMillis = unassignedTimeMillis;
this.unassignedTimeNanos = timestampNanos;
this.unassignedTimeNanos = unassignedTimeNanos;
this.message = message;
this.failure = failure;
assert !(message == null && failure != null) : "provide a message if a failure exception is provided";
@ -201,14 +209,14 @@ public class UnassignedInfo implements ToXContent, Writeable<UnassignedInfo> {
}
/**
* The allocation delay value in milliseconds associated with the index (defaulting to node settings if not set).
* The allocation delay value in nano seconds associated with the index (defaulting to node settings if not set).
*/
public long getAllocationDelayTimeoutSetting(Settings settings, Settings indexSettings) {
public long getAllocationDelayTimeoutSettingNanos(Settings settings, Settings indexSettings) {
if (reason != Reason.NODE_LEFT) {
return 0;
}
TimeValue delayTimeout = indexSettings.getAsTime(INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING, settings.getAsTime(INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING, DEFAULT_DELAYED_NODE_LEFT_TIMEOUT));
return Math.max(0l, delayTimeout.millis());
return Math.max(0l, delayTimeout.nanos());
}
/**
@ -221,18 +229,17 @@ public class UnassignedInfo implements ToXContent, Writeable<UnassignedInfo> {
/**
* Updates delay left based on current time (in nanoseconds) and index/node settings.
* Should only be called from ReplicaShardAllocator.
*
* @return updated delay in nanoseconds
*/
public long updateDelay(long nanoTimeNow, Settings settings, Settings indexSettings) {
long delayTimeoutMillis = getAllocationDelayTimeoutSetting(settings, indexSettings);
long delayTimeoutNanos = getAllocationDelayTimeoutSettingNanos(settings, indexSettings);
final long newComputedLeftDelayNanos;
if (delayTimeoutMillis == 0l) {
if (delayTimeoutNanos == 0l) {
newComputedLeftDelayNanos = 0l;
} else {
assert nanoTimeNow >= unassignedTimeNanos;
long delayTimeoutNanos = TimeUnit.NANOSECONDS.convert(delayTimeoutMillis, TimeUnit.MILLISECONDS);
newComputedLeftDelayNanos = Math.max(0l, delayTimeoutNanos - (nanoTimeNow - unassignedTimeNanos));
newComputedLeftDelayNanos = Math.max(0L, delayTimeoutNanos - (nanoTimeNow - unassignedTimeNanos));
}
lastComputedLeftDelayNanos = newComputedLeftDelayNanos;
return newComputedLeftDelayNanos;
@ -255,21 +262,21 @@ public class UnassignedInfo implements ToXContent, Writeable<UnassignedInfo> {
}
/**
* Finds the smallest delay expiration setting in milliseconds of all unassigned shards that are still delayed. Returns 0 if there are none.
* Finds the smallest delay expiration setting in nanos of all unassigned shards that are still delayed. Returns 0 if there are none.
*/
public static long findSmallestDelayedAllocationSetting(Settings settings, ClusterState state) {
long nextDelaySetting = Long.MAX_VALUE;
public static long findSmallestDelayedAllocationSettingNanos(Settings settings, ClusterState state) {
long minDelaySetting = Long.MAX_VALUE;
for (ShardRouting shard : state.routingTable().shardsWithState(ShardRoutingState.UNASSIGNED)) {
if (shard.primary() == false) {
IndexMetaData indexMetaData = state.metaData().index(shard.getIndex());
long leftDelayNanos = shard.unassignedInfo().getLastComputedLeftDelayNanos();
long delayTimeoutSetting = shard.unassignedInfo().getAllocationDelayTimeoutSetting(settings, indexMetaData.getSettings());
if (leftDelayNanos > 0 && delayTimeoutSetting > 0 && delayTimeoutSetting < nextDelaySetting) {
nextDelaySetting = delayTimeoutSetting;
boolean delayed = shard.unassignedInfo().getLastComputedLeftDelayNanos() > 0;
long delayTimeoutSetting = shard.unassignedInfo().getAllocationDelayTimeoutSettingNanos(settings, indexMetaData.getSettings());
if (delayed && delayTimeoutSetting > 0 && delayTimeoutSetting < minDelaySetting) {
minDelaySetting = delayTimeoutSetting;
}
}
}
return nextDelaySetting == Long.MAX_VALUE ? 0l : nextDelaySetting;
return minDelaySetting == Long.MAX_VALUE ? 0l : minDelaySetting;
}
@ -320,14 +327,24 @@ public class UnassignedInfo implements ToXContent, Writeable<UnassignedInfo> {
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
UnassignedInfo that = (UnassignedInfo) o;
if (unassignedTimeMillis != that.unassignedTimeMillis) return false;
if (reason != that.reason) return false;
if (message != null ? !message.equals(that.message) : that.message != null) return false;
if (unassignedTimeMillis != that.unassignedTimeMillis) {
return false;
}
if (reason != that.reason) {
return false;
}
if (message != null ? !message.equals(that.message) : that.message != null) {
return false;
}
return !(failure != null ? !failure.equals(that.failure) : that.failure != null);
}

View File

@ -22,16 +22,12 @@ package org.elasticsearch.cluster.routing.allocation;
import com.carrotsearch.hppc.cursors.ObjectCursor;
import org.elasticsearch.cluster.ClusterInfoService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.health.ClusterStateHealth;
import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.cluster.health.ClusterStateHealth;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.cluster.routing.*;
import org.elasticsearch.cluster.routing.allocation.allocator.ShardsAllocators;
import org.elasticsearch.cluster.routing.allocation.command.AllocationCommands;
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders;
@ -43,9 +39,7 @@ import org.elasticsearch.common.settings.Settings;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
@ -119,7 +113,8 @@ public class AllocationService extends AbstractComponent {
FailedRerouteAllocation allocation = new FailedRerouteAllocation(allocationDeciders, routingNodes, clusterState.nodes(), failedShards, clusterInfoService.getClusterInfo());
boolean changed = false;
for (FailedRerouteAllocation.FailedShard failedShard : failedShards) {
changed |= applyFailedShard(allocation, failedShard.shard, true, new UnassignedInfo(UnassignedInfo.Reason.ALLOCATION_FAILED, failedShard.message, failedShard.failure));
changed |= applyFailedShard(allocation, failedShard.shard, true, new UnassignedInfo(UnassignedInfo.Reason.ALLOCATION_FAILED, failedShard.message, failedShard.failure,
System.nanoTime(), System.currentTimeMillis()));
}
if (!changed) {
return new RoutingAllocation.Result(false, clusterState.routingTable());
@ -163,7 +158,7 @@ public class AllocationService extends AbstractComponent {
// we don't shuffle the unassigned shards here, to try and get as close as possible to
// a consistent result of the effect the commands have on the routing
// this allows systems to dry run the commands, see the resulting cluster state, and act on it
RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, routingNodes, clusterState.nodes(), clusterInfoService.getClusterInfo());
RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, routingNodes, clusterState.nodes(), clusterInfoService.getClusterInfo(), currentNanoTime());
// don't short circuit deciders, we want a full explanation
allocation.debugDecision(true);
// we ignore disable allocation, because commands are explicit
@ -202,7 +197,7 @@ public class AllocationService extends AbstractComponent {
RoutingNodes routingNodes = getMutableRoutingNodes(clusterState);
// shuffle the unassigned nodes, just so we won't have things like poison failed shards
routingNodes.unassigned().shuffle();
RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, routingNodes, clusterState.nodes(), clusterInfoService.getClusterInfo());
RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, routingNodes, clusterState.nodes(), clusterInfoService.getClusterInfo(), currentNanoTime());
allocation.debugDecision(debug);
if (!reroute(allocation)) {
return new RoutingAllocation.Result(false, clusterState.routingTable());
@ -239,6 +234,8 @@ public class AllocationService extends AbstractComponent {
// now allocate all the unassigned to available nodes
if (allocation.routingNodes().unassigned().size() > 0) {
updateLeftDelayOfUnassignedShards(allocation, settings);
changed |= shardsAllocators.allocateUnassigned(allocation);
}
@ -251,6 +248,15 @@ public class AllocationService extends AbstractComponent {
return changed;
}
// public for testing
public static void updateLeftDelayOfUnassignedShards(RoutingAllocation allocation, Settings settings) {
for (ShardRouting shardRouting : allocation.routingNodes().unassigned()) {
final MetaData metaData = allocation.metaData();
final IndexMetaData indexMetaData = metaData.index(shardRouting.index());
shardRouting.unassignedInfo().updateDelay(allocation.getCurrentNanoTime(), settings, indexMetaData.getSettings());
}
}
private boolean moveShards(RoutingAllocation allocation) {
boolean changed = false;
@ -312,7 +318,9 @@ public class AllocationService extends AbstractComponent {
}
}
for (ShardRouting shardToFail : shardsToFail) {
changed |= applyFailedShard(allocation, shardToFail, false, new UnassignedInfo(UnassignedInfo.Reason.ALLOCATION_FAILED, "primary failed while replica initializing"));
changed |= applyFailedShard(allocation, shardToFail, false,
new UnassignedInfo(UnassignedInfo.Reason.ALLOCATION_FAILED, "primary failed while replica initializing",
null, allocation.getCurrentNanoTime(), System.currentTimeMillis()));
}
// now, go over and elect a new primary if possible, not, from this code block on, if one is elected,
@ -372,8 +380,9 @@ public class AllocationService extends AbstractComponent {
}
changed = true;
// now, go over all the shards routing on the node, and fail them
UnassignedInfo unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.NODE_LEFT, "node_left[" + node.nodeId() + "]");
for (ShardRouting shardRouting : node.copyShards()) {
UnassignedInfo unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.NODE_LEFT, "node_left[" + node.nodeId() + "]", null,
allocation.getCurrentNanoTime(), System.currentTimeMillis());
applyFailedShard(allocation, shardRouting, false, unassignedInfo);
}
// its a dead node, remove it, note, its important to remove it *after* we apply failed shard
@ -531,4 +540,9 @@ public class AllocationService extends AbstractComponent {
RoutingNodes routingNodes = new RoutingNodes(clusterState, false); // this is a costly operation - only call this once!
return routingNodes;
}
/** ovrride this to control time based decisions during allocation */
protected long currentNanoTime() {
return System.nanoTime();
}
}

View File

@ -58,7 +58,7 @@ public class FailedRerouteAllocation extends RoutingAllocation {
private final List<FailedShard> failedShards;
public FailedRerouteAllocation(AllocationDeciders deciders, RoutingNodes routingNodes, DiscoveryNodes nodes, List<FailedShard> failedShards, ClusterInfo clusterInfo) {
super(deciders, routingNodes, nodes, clusterInfo);
super(deciders, routingNodes, nodes, clusterInfo, System.nanoTime());
this.failedShards = failedShards;
}

View File

@ -120,19 +120,27 @@ public class RoutingAllocation {
private boolean hasPendingAsyncFetch = false;
private final long currentNanoTime;
/**
* Creates a new {@link RoutingAllocation}
*
* @param deciders {@link AllocationDeciders} to used to make decisions for routing allocations
* @param routingNodes Routing nodes in the current cluster
* @param deciders {@link AllocationDeciders} to used to make decisions for routing allocations
* @param routingNodes Routing nodes in the current cluster
* @param nodes TODO: Documentation
* @param currentNanoTime the nano time to use for all delay allocation calculation (typically {@link System#nanoTime()})
*/
public RoutingAllocation(AllocationDeciders deciders, RoutingNodes routingNodes, DiscoveryNodes nodes, ClusterInfo clusterInfo) {
public RoutingAllocation(AllocationDeciders deciders, RoutingNodes routingNodes, DiscoveryNodes nodes, ClusterInfo clusterInfo, long currentNanoTime) {
this.deciders = deciders;
this.routingNodes = routingNodes;
this.nodes = nodes;
this.clusterInfo = clusterInfo;
this.currentNanoTime = currentNanoTime;
}
/** returns the nano time captured at the beginning of the allocation. used to make sure all time based decisions are aligned */
public long getCurrentNanoTime() {
return currentNanoTime;
}
/**

View File

@ -36,7 +36,7 @@ public class StartedRerouteAllocation extends RoutingAllocation {
private final List<? extends ShardRouting> startedShards;
public StartedRerouteAllocation(AllocationDeciders deciders, RoutingNodes routingNodes, DiscoveryNodes nodes, List<? extends ShardRouting> startedShards, ClusterInfo clusterInfo) {
super(deciders, routingNodes, nodes, clusterInfo);
super(deciders, routingNodes, nodes, clusterInfo, System.nanoTime());
this.startedShards = startedShards;
}

View File

@ -19,8 +19,8 @@
package org.elasticsearch.cluster.routing.allocation.allocator;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.allocation.FailedRerouteAllocation;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.routing.allocation.StartedRerouteAllocation;
@ -74,6 +74,10 @@ public class ShardsAllocators extends AbstractComponent implements ShardsAllocat
return changed;
}
protected long nanoTime() {
return System.nanoTime();
}
@Override
public boolean rebalance(RoutingAllocation allocation) {
if (allocation.hasPendingAsyncFetch() == false) {

View File

@ -229,7 +229,8 @@ public class AllocateAllocationCommand implements AllocationCommand {
// it was index creation
if (unassigned.primary() && unassigned.unassignedInfo().getReason() != UnassignedInfo.Reason.INDEX_CREATED) {
unassigned.updateUnassignedInfo(new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED,
"force allocation from previous reason " + unassigned.unassignedInfo().getReason() + ", " + unassigned.unassignedInfo().getMessage(), unassigned.unassignedInfo().getFailure()));
"force allocation from previous reason " + unassigned.unassignedInfo().getReason() + ", " + unassigned.unassignedInfo().getMessage(),
unassigned.unassignedInfo().getFailure(), System.nanoTime(), System.currentTimeMillis()));
}
it.initialize(routingNode.nodeId(), unassigned.version(), allocation.clusterInfo().getShardSize(unassigned, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE));
break;

View File

@ -21,7 +21,10 @@ package org.elasticsearch.cluster.routing.allocation.command;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.*;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.cluster.routing.allocation.RerouteExplanation;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.routing.allocation.decider.Decision;
@ -34,7 +37,6 @@ import org.elasticsearch.index.shard.ShardId;
import java.io.IOException;
import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING;
import static org.elasticsearch.cluster.routing.ShardRoutingState.RELOCATING;
/**

View File

@ -24,7 +24,6 @@ import com.carrotsearch.hppc.ObjectLongMap;
import com.carrotsearch.hppc.cursors.ObjectCursor;
import com.carrotsearch.hppc.cursors.ObjectLongCursor;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.RoutingNodes;
@ -101,7 +100,8 @@ public abstract class ReplicaShardAllocator extends AbstractComponent {
// we found a better match that has a full sync id match, the existing allocation is not fully synced
// so we found a better one, cancel this one
it.moveToUnassigned(new UnassignedInfo(UnassignedInfo.Reason.REALLOCATED_REPLICA,
"existing allocation of replica to [" + currentNode + "] cancelled, sync id match found on node [" + nodeWithHighestMatch + "]"));
"existing allocation of replica to [" + currentNode + "] cancelled, sync id match found on node [" + nodeWithHighestMatch + "]",
null, allocation.getCurrentNanoTime(), System.currentTimeMillis()));
changed = true;
}
}
@ -111,7 +111,6 @@ public abstract class ReplicaShardAllocator extends AbstractComponent {
}
public boolean allocateUnassigned(RoutingAllocation allocation) {
long nanoTimeNow = System.nanoTime();
boolean changed = false;
final RoutingNodes routingNodes = allocation.routingNodes();
final RoutingNodes.UnassignedShards.UnassignedIterator unassignedIterator = routingNodes.unassigned().iterator();
@ -171,7 +170,7 @@ public abstract class ReplicaShardAllocator extends AbstractComponent {
}
} else if (matchingNodes.hasAnyData() == false) {
// if we didn't manage to find *any* data (regardless of matching sizes), check if the allocation of the replica shard needs to be delayed
changed |= ignoreUnassignedIfDelayed(nanoTimeNow, allocation, unassignedIterator, shard);
changed |= ignoreUnassignedIfDelayed(unassignedIterator, shard);
}
}
return changed;
@ -185,16 +184,13 @@ public abstract class ReplicaShardAllocator extends AbstractComponent {
*
* PUBLIC FOR TESTS!
*
* @param timeNowNanos Timestamp in nanoseconds representing "now"
* @param allocation the routing allocation
* @param unassignedIterator iterator over unassigned shards
* @param shard the shard which might be delayed
* @return true iff allocation is delayed for this shard
*/
public boolean ignoreUnassignedIfDelayed(long timeNowNanos, RoutingAllocation allocation, RoutingNodes.UnassignedShards.UnassignedIterator unassignedIterator, ShardRouting shard) {
IndexMetaData indexMetaData = allocation.metaData().index(shard.getIndex());
public boolean ignoreUnassignedIfDelayed(RoutingNodes.UnassignedShards.UnassignedIterator unassignedIterator, ShardRouting shard) {
// calculate delay and store it in UnassignedInfo to be used by RoutingService
long delay = shard.unassignedInfo().updateDelay(timeNowNanos, settings, indexMetaData.getSettings());
long delay = shard.unassignedInfo().getLastComputedLeftDelayNanos();
if (delay > 0) {
logger.debug("[{}][{}]: delaying allocation of [{}] for [{}]", shard.index(), shard.id(), shard, TimeValue.timeValueNanos(delay));
/**

View File

@ -27,12 +27,7 @@ import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.TestShardRouting;
import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.cluster.routing.*;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.DummyTransportAddress;
import org.elasticsearch.index.shard.ShardId;
@ -40,10 +35,7 @@ import org.elasticsearch.index.shard.ShardId;
import java.util.HashSet;
import java.util.Set;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_CREATION_DATE;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_VERSION_CREATED;
import static org.elasticsearch.cluster.metadata.IndexMetaData.*;
import static org.elasticsearch.test.ESTestCase.randomFrom;
import static org.elasticsearch.test.ESTestCase.randomIntBetween;

View File

@ -20,25 +20,13 @@
package org.elasticsearch.cluster;
import com.carrotsearch.hppc.cursors.ObjectCursor;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.block.ClusterBlock;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.metadata.AliasMetaData;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexTemplateMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.metadata.RepositoriesMetaData;
import org.elasticsearch.cluster.metadata.SnapshotId;
import org.elasticsearch.cluster.metadata.*;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.TestShardRouting;
import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.cluster.routing.*;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.collect.ImmutableOpenMap;
@ -50,7 +38,6 @@ import org.elasticsearch.discovery.DiscoverySettings;
import org.elasticsearch.gateway.GatewayService;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.warmer.IndexWarmersMetaData;
import org.elasticsearch.test.ESIntegTestCase;

View File

@ -21,10 +21,7 @@ package org.elasticsearch.cluster.routing;
import org.elasticsearch.test.ESTestCase;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.Matchers.*;
/**
*/

View File

@ -19,7 +19,6 @@
package org.elasticsearch.cluster.routing;
import org.apache.lucene.util.LuceneTestCase;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
@ -37,7 +36,6 @@ import static org.hamcrest.Matchers.equalTo;
/**
*/
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0)
@LuceneTestCase.AwaitsFix(bugUrl = "http://build-us-00.elastic.co/job/es_core_master_windows-2012-r2/2074/testReport/ (boaz on it)")
public class DelayedAllocationIT extends ESIntegTestCase {
/**

View File

@ -19,9 +19,7 @@
package org.elasticsearch.cluster.routing;
import static org.elasticsearch.test.ESTestCase.randomAsciiOfLength;
import static org.elasticsearch.test.ESTestCase.randomFrom;
import static org.elasticsearch.test.ESTestCase.randomInt;
import static org.elasticsearch.test.ESTestCase.*;
/**
* Utility class the makes random modifications to ShardRouting

View File

@ -88,15 +88,14 @@ public class RoutingServiceTests extends ESAllocationTestCase {
clusterState = ClusterState.builder(clusterState).routingResult(allocation.reroute(clusterState, "reroute")).build();
ClusterState newState = clusterState;
assertThat(routingService.getMinDelaySettingAtLastScheduling(), equalTo(Long.MAX_VALUE));
assertThat(routingService.getMinDelaySettingAtLastSchedulingNanos(), equalTo(Long.MAX_VALUE));
routingService.clusterChanged(new ClusterChangedEvent("test", newState, prevState));
assertThat(routingService.getMinDelaySettingAtLastScheduling(), equalTo(Long.MAX_VALUE));
assertThat(routingService.getMinDelaySettingAtLastSchedulingNanos(), equalTo(Long.MAX_VALUE));
assertThat(routingService.hasReroutedAndClear(), equalTo(false));
}
public void testDelayedUnassignedScheduleReroute() throws Exception {
DelayedShardsMockGatewayAllocator mockGatewayAllocator = new DelayedShardsMockGatewayAllocator();
AllocationService allocation = createAllocationService(Settings.EMPTY, mockGatewayAllocator);
MockAllocationService allocation = createAllocationService(Settings.EMPTY, new DelayedShardsMockGatewayAllocator());
MetaData metaData = MetaData.builder()
.put(IndexMetaData.builder("test").settings(settings(Version.CURRENT).put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING, "100ms"))
.numberOfShards(1).numberOfReplicas(1))
@ -126,7 +125,6 @@ public class RoutingServiceTests extends ESAllocationTestCase {
ClusterState prevState = clusterState;
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder(clusterState.nodes()).remove(nodeId)).build();
// make sure the replica is marked as delayed (i.e. not reallocated)
mockGatewayAllocator.setTimeSource(shard -> shard.unassignedInfo().getUnassignedTimeInNanos() + TimeValue.timeValueMillis(randomIntBetween(0, 99)).nanos());
clusterState = ClusterState.builder(clusterState).routingResult(allocation.reroute(clusterState, "reroute")).build();
assertEquals(1, clusterState.getRoutingNodes().unassigned().size());
@ -134,7 +132,7 @@ public class RoutingServiceTests extends ESAllocationTestCase {
routingService.clusterChanged(new ClusterChangedEvent("test", newState, prevState));
assertBusy(() -> assertTrue("routing service should have run a reroute", routingService.hasReroutedAndClear()));
// verify the registration has been reset
assertThat(routingService.getMinDelaySettingAtLastScheduling(), equalTo(Long.MAX_VALUE));
assertThat(routingService.getMinDelaySettingAtLastSchedulingNanos(), equalTo(Long.MAX_VALUE));
}
/**
@ -144,8 +142,7 @@ public class RoutingServiceTests extends ESAllocationTestCase {
final ThreadPool testThreadPool = new ThreadPool(getTestName());
try {
DelayedShardsMockGatewayAllocator mockGatewayAllocator = new DelayedShardsMockGatewayAllocator();
AllocationService allocation = createAllocationService(Settings.EMPTY, mockGatewayAllocator);
MockAllocationService allocation = createAllocationService(Settings.EMPTY, new DelayedShardsMockGatewayAllocator());
MetaData metaData = MetaData.builder()
.put(IndexMetaData.builder("short_delay").settings(settings(Version.CURRENT).put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING, "100ms"))
.numberOfShards(1).numberOfReplicas(1))
@ -185,11 +182,13 @@ public class RoutingServiceTests extends ESAllocationTestCase {
}
assertNotNull(longDelayReplica);
final long baseTime = System.nanoTime();
// remove node of shortDelayReplica and node of longDelayReplica and reroute
ClusterState prevState = clusterState;
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder(clusterState.nodes()).remove(shortDelayReplica.currentNodeId()).remove(longDelayReplica.currentNodeId())).build();
// make sure both replicas are marked as delayed (i.e. not reallocated)
mockGatewayAllocator.setTimeSource(shard -> shard.unassignedInfo().getUnassignedTimeInNanos() + 1);
allocation.setNanoTimeOverride(baseTime);
clusterState = ClusterState.builder(clusterState).routingResult(allocation.reroute(clusterState, "reroute")).build();
// check that shortDelayReplica and longDelayReplica have been marked unassigned
@ -216,7 +215,7 @@ public class RoutingServiceTests extends ESAllocationTestCase {
RoutingService routingService = new RoutingService(Settings.EMPTY, testThreadPool, clusterService, allocation);
routingService.start(); // just so performReroute does not prematurely return
// next (delayed) reroute should only delay longDelayReplica/longDelayUnassignedReplica, simulate that we are now 1 second after shards became unassigned
mockGatewayAllocator.setTimeSource(shard -> shard.unassignedInfo().getUnassignedTimeInNanos() + TimeValue.timeValueSeconds(1).nanos());
allocation.setNanoTimeOverride(baseTime + TimeValue.timeValueSeconds(1).nanos());
// register listener on cluster state so we know when cluster state has been changed
CountDownLatch latch = new CountDownLatch(1);
clusterService.addLast(event -> latch.countDown());
@ -225,50 +224,12 @@ public class RoutingServiceTests extends ESAllocationTestCase {
// cluster service should have updated state and called routingService with clusterChanged
latch.await();
// verify the registration has been set to the delay of longDelayReplica/longDelayUnassignedReplica
assertThat(routingService.getMinDelaySettingAtLastScheduling(), equalTo(TimeValue.timeValueSeconds(10).millis()));
assertThat(routingService.getMinDelaySettingAtLastSchedulingNanos(), equalTo(TimeValue.timeValueSeconds(10).nanos()));
} finally {
terminate(testThreadPool);
}
}
public void testDelayedUnassignedDoesNotRerouteForNegativeDelays() throws Exception {
DelayedShardsMockGatewayAllocator mockGatewayAllocator = new DelayedShardsMockGatewayAllocator();
AllocationService allocation = createAllocationService(Settings.EMPTY, mockGatewayAllocator);
MetaData metaData = MetaData.builder()
.put(IndexMetaData.builder("test").settings(settings(Version.CURRENT).put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING, "100ms"))
.numberOfShards(1).numberOfReplicas(1))
.build();
ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT)
.metaData(metaData)
.routingTable(RoutingTable.builder().addAsNew(metaData.index("test")).build()).build();
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder().put(newNode("node1")).put(newNode("node2")).localNodeId("node1").masterNodeId("node1")).build();
clusterState = ClusterState.builder(clusterState).routingResult(allocation.reroute(clusterState, "reroute")).build();
// starting primaries
clusterState = ClusterState.builder(clusterState).routingResult(allocation.applyStartedShards(clusterState, clusterState.getRoutingNodes().shardsWithState(INITIALIZING))).build();
// starting replicas
clusterState = ClusterState.builder(clusterState).routingResult(allocation.applyStartedShards(clusterState, clusterState.getRoutingNodes().shardsWithState(INITIALIZING))).build();
assertThat(clusterState.getRoutingNodes().unassigned().size() > 0, equalTo(false));
// remove node2 and reroute
ClusterState prevState = clusterState;
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder(clusterState.nodes()).remove("node2")).build();
clusterState = ClusterState.builder(clusterState).routingResult(allocation.reroute(clusterState, "reroute")).build();
// Set it in the future so the delay will be negative
mockGatewayAllocator.setTimeSource(shard -> shard.unassignedInfo().getUnassignedTimeInNanos() + TimeValue.timeValueMinutes(1).nanos());
ClusterState newState = clusterState;
routingService.clusterChanged(new ClusterChangedEvent("test", newState, prevState));
assertBusy(new Runnable() {
@Override
public void run() {
assertThat(routingService.hasReroutedAndClear(), equalTo(false));
// verify the registration has been updated
assertThat(routingService.getMinDelaySettingAtLastScheduling(), equalTo(100L));
}
});
}
private class TestRoutingService extends RoutingService {
private AtomicBoolean rerouted = new AtomicBoolean();

View File

@ -21,7 +21,6 @@ package org.elasticsearch.cluster.routing;
import com.carrotsearch.hppc.IntHashSet;
import com.carrotsearch.randomizedtesting.generators.RandomPicks;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
@ -40,15 +39,8 @@ import org.elasticsearch.test.ESAllocationTestCase;
import java.util.Collections;
import java.util.EnumSet;
import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING;
import static org.elasticsearch.cluster.routing.ShardRoutingState.STARTED;
import static org.elasticsearch.cluster.routing.ShardRoutingState.UNASSIGNED;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.lessThan;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
import static org.elasticsearch.cluster.routing.ShardRoutingState.*;
import static org.hamcrest.Matchers.*;
/**
*/
@ -282,9 +274,32 @@ public class UnassignedInfoTests extends ESAllocationTestCase {
assertThat(delay, equalTo(0l));
}
/**
* Verifies that delayed allocation calculation are correct.
*/
public void testLeftDelayCalculation() throws Exception {
final long baseTime = System.nanoTime();
final UnassignedInfo unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.NODE_LEFT, "test", null, baseTime, System.currentTimeMillis());
final long totalDelayNanos = TimeValue.timeValueMillis(10).nanos();
final Settings settings = Settings.builder().put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING, TimeValue.timeValueNanos(totalDelayNanos)).build();
long delay = unassignedInfo.updateDelay(baseTime, settings, Settings.EMPTY);
assertThat(delay, equalTo(totalDelayNanos));
assertThat(delay, equalTo(unassignedInfo.getLastComputedLeftDelayNanos()));
long delta1 = randomIntBetween(1, (int) (totalDelayNanos - 1));
delay = unassignedInfo.updateDelay(baseTime + delta1, settings, Settings.EMPTY);
assertThat(delay, equalTo(totalDelayNanos - delta1));
assertThat(delay, equalTo(unassignedInfo.getLastComputedLeftDelayNanos()));
delay = unassignedInfo.updateDelay(baseTime + totalDelayNanos, settings, Settings.EMPTY);
assertThat(delay, equalTo(0L));
assertThat(delay, equalTo(unassignedInfo.getLastComputedLeftDelayNanos()));
delay = unassignedInfo.updateDelay(baseTime + totalDelayNanos + randomIntBetween(1, 20), settings, Settings.EMPTY);
assertThat(delay, equalTo(0L));
assertThat(delay, equalTo(unassignedInfo.getLastComputedLeftDelayNanos()));
}
public void testNumberOfDelayedUnassigned() throws Exception {
DelayedShardsMockGatewayAllocator mockGatewayAllocator = new DelayedShardsMockGatewayAllocator();
AllocationService allocation = createAllocationService(Settings.EMPTY, mockGatewayAllocator);
MockAllocationService allocation = createAllocationService(Settings.EMPTY, new DelayedShardsMockGatewayAllocator());
MetaData metaData = MetaData.builder()
.put(IndexMetaData.builder("test1").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(1))
.put(IndexMetaData.builder("test2").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(1))
@ -303,17 +318,21 @@ public class UnassignedInfoTests extends ESAllocationTestCase {
// remove node2 and reroute
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder(clusterState.nodes()).remove("node2")).build();
// make sure both replicas are marked as delayed (i.e. not reallocated)
mockGatewayAllocator.setTimeSource(shard -> shard.unassignedInfo().getUnassignedTimeInNanos() + 1);
clusterState = ClusterState.builder(clusterState).routingResult(allocation.reroute(clusterState, "reroute")).build();
assertThat(clusterState.prettyPrint(), UnassignedInfo.getNumberOfDelayedUnassigned(clusterState), equalTo(2));
}
public void testFindNextDelayedAllocation() {
DelayedShardsMockGatewayAllocator mockGatewayAllocator = new DelayedShardsMockGatewayAllocator();
AllocationService allocation = createAllocationService(Settings.EMPTY, mockGatewayAllocator);
MockAllocationService allocation = createAllocationService(Settings.EMPTY, new DelayedShardsMockGatewayAllocator());
final long baseTime = System.nanoTime();
allocation.setNanoTimeOverride(baseTime);
final TimeValue delayTest1 = TimeValue.timeValueMillis(randomIntBetween(1, 200));
final TimeValue delayTest2 = TimeValue.timeValueMillis(randomIntBetween(1, 200));
final long expectMinDelaySettingsNanos = Math.min(delayTest1.nanos(), delayTest2.nanos());
MetaData metaData = MetaData.builder()
.put(IndexMetaData.builder("test1").settings(settings(Version.CURRENT).put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING, "10h")).numberOfShards(1).numberOfReplicas(1))
.put(IndexMetaData.builder("test2").settings(settings(Version.CURRENT).put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING, "10h")).numberOfShards(1).numberOfReplicas(1))
.put(IndexMetaData.builder("test1").settings(settings(Version.CURRENT).put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING, delayTest1)).numberOfShards(1).numberOfReplicas(1))
.put(IndexMetaData.builder("test2").settings(settings(Version.CURRENT).put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING, delayTest2)).numberOfShards(1).numberOfReplicas(1))
.build();
ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT)
.metaData(metaData)
@ -328,15 +347,19 @@ public class UnassignedInfoTests extends ESAllocationTestCase {
assertThat(clusterState.getRoutingNodes().unassigned().size() > 0, equalTo(false));
// remove node2 and reroute
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder(clusterState.nodes()).remove("node2")).build();
// make sure both replicas are marked as delayed (i.e. not reallocated)
mockGatewayAllocator.setTimeSource(shard -> shard.unassignedInfo().getUnassignedTimeInNanos() + 1);
clusterState = ClusterState.builder(clusterState).routingResult(allocation.reroute(clusterState, "reroute")).build();
long nextDelaySetting = UnassignedInfo.findSmallestDelayedAllocationSetting(Settings.builder().put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING, "10h").build(), clusterState);
assertThat(nextDelaySetting, equalTo(TimeValue.timeValueHours(10).millis()));
final long delta = randomBoolean() ? 0 : randomInt((int) expectMinDelaySettingsNanos);
if (delta > 0) {
allocation.setNanoTimeOverride(baseTime + delta);
clusterState = ClusterState.builder(clusterState).routingResult(allocation.reroute(clusterState, "time moved")).build();
}
long minDelaySetting = UnassignedInfo.findSmallestDelayedAllocationSettingNanos(Settings.EMPTY, clusterState);
assertThat(minDelaySetting, equalTo(expectMinDelaySettingsNanos));
long nextDelay = UnassignedInfo.findNextDelayedAllocationIn(clusterState);
assertThat(nextDelay, greaterThan(TimeValue.timeValueHours(9).nanos()));
assertThat(nextDelay, lessThanOrEqualTo(TimeValue.timeValueHours(10).nanos()));
assertThat(nextDelay, equalTo(expectMinDelaySettingsNanos - delta));
}
}

View File

@ -29,14 +29,7 @@ import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.TestShardRouting;
import org.elasticsearch.cluster.routing.*;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.routing.allocation.allocator.ShardsAllocators;
@ -55,14 +48,9 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING;
import static org.elasticsearch.cluster.routing.ShardRoutingState.RELOCATING;
import static org.elasticsearch.cluster.routing.ShardRoutingState.STARTED;
import static org.elasticsearch.cluster.routing.ShardRoutingState.UNASSIGNED;
import static org.elasticsearch.cluster.routing.ShardRoutingState.*;
import static org.elasticsearch.common.settings.Settings.settingsBuilder;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.Matchers.*;
public class DiskThresholdDeciderTests extends ESAllocationTestCase {
@ -859,7 +847,7 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase {
)
);
ClusterState clusterState = ClusterState.builder(baseClusterState).routingTable(builder.build()).build();
RoutingAllocation routingAllocation = new RoutingAllocation(null, new RoutingNodes(clusterState), discoveryNodes, clusterInfo);
RoutingAllocation routingAllocation = new RoutingAllocation(null, new RoutingNodes(clusterState), discoveryNodes, clusterInfo, System.nanoTime());
Decision decision = diskThresholdDecider.canRemain(firstRouting, firstRoutingNode, routingAllocation);
assertThat(decision.type(), equalTo(Decision.Type.NO));
@ -879,7 +867,7 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase {
)
);
clusterState = ClusterState.builder(baseClusterState).routingTable(builder.build()).build();
routingAllocation = new RoutingAllocation(null, new RoutingNodes(clusterState), discoveryNodes, clusterInfo);
routingAllocation = new RoutingAllocation(null, new RoutingNodes(clusterState), discoveryNodes, clusterInfo, System.nanoTime());
decision = diskThresholdDecider.canRemain(firstRouting, firstRoutingNode, routingAllocation);
assertThat(decision.type(), equalTo(Decision.Type.YES));
@ -978,7 +966,7 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase {
)
);
ClusterState clusterState = ClusterState.builder(baseClusterState).routingTable(builder.build()).build();
RoutingAllocation routingAllocation = new RoutingAllocation(null, new RoutingNodes(clusterState), discoveryNodes, clusterInfo);
RoutingAllocation routingAllocation = new RoutingAllocation(null, new RoutingNodes(clusterState), discoveryNodes, clusterInfo, System.nanoTime());
Decision decision = diskThresholdDecider.canRemain(firstRouting, firstRoutingNode, routingAllocation);
// Two shards should start happily
@ -1035,7 +1023,7 @@ public class DiskThresholdDeciderTests extends ESAllocationTestCase {
);
clusterState = ClusterState.builder(updateClusterState).routingTable(builder.build()).build();
routingAllocation = new RoutingAllocation(null, new RoutingNodes(clusterState), discoveryNodes, clusterInfo);
routingAllocation = new RoutingAllocation(null, new RoutingNodes(clusterState), discoveryNodes, clusterInfo, System.nanoTime());
decision = diskThresholdDecider.canRemain(firstRouting, firstRoutingNode, routingAllocation);
assertThat(decision.type(), equalTo(Decision.Type.YES));

View File

@ -20,21 +20,13 @@
package org.elasticsearch.cluster.routing.allocation.decider;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterInfo;
import org.elasticsearch.cluster.ClusterInfoService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.DiskUsage;
import org.elasticsearch.cluster.EmptyClusterInfoService;
import org.elasticsearch.cluster.*;
import org.elasticsearch.cluster.MockInternalClusterInfoService.DevNullClusterInfo;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingHelper;
import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.cluster.routing.*;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.settings.Settings;
@ -129,7 +121,7 @@ public class DiskThresholdDeciderUnitTests extends ESTestCase {
ImmutableOpenMap.Builder<String, Long> shardSizes = ImmutableOpenMap.builder();
shardSizes.put("[test][0][p]", 10L); // 10 bytes
final ClusterInfo clusterInfo = new ClusterInfo(leastAvailableUsages.build(), mostAvailableUsage.build(), shardSizes.build(), ImmutableOpenMap.of());
RoutingAllocation allocation = new RoutingAllocation(new AllocationDeciders(Settings.EMPTY, new AllocationDecider[]{decider}), clusterState.getRoutingNodes(), clusterState.nodes(), clusterInfo);
RoutingAllocation allocation = new RoutingAllocation(new AllocationDeciders(Settings.EMPTY, new AllocationDecider[]{decider}), clusterState.getRoutingNodes(), clusterState.nodes(), clusterInfo, System.nanoTime());
assertEquals(mostAvailableUsage.toString(), Decision.YES, decider.canAllocate(test_0, new RoutingNode("node_0", node_0), allocation));
assertEquals(mostAvailableUsage.toString(), Decision.NO, decider.canAllocate(test_0, new RoutingNode("node_1", node_1), allocation));
}
@ -194,7 +186,7 @@ public class DiskThresholdDeciderUnitTests extends ESTestCase {
shardSizes.put("[test][2][p]", 10L);
final ClusterInfo clusterInfo = new ClusterInfo(leastAvailableUsages.build(), mostAvailableUsage.build(), shardSizes.build(), shardRoutingMap.build());
RoutingAllocation allocation = new RoutingAllocation(new AllocationDeciders(Settings.EMPTY, new AllocationDecider[]{decider}), clusterState.getRoutingNodes(), clusterState.nodes(), clusterInfo);
RoutingAllocation allocation = new RoutingAllocation(new AllocationDeciders(Settings.EMPTY, new AllocationDecider[]{decider}), clusterState.getRoutingNodes(), clusterState.nodes(), clusterInfo, System.nanoTime());
assertEquals(Decision.YES, decider.canRemain(test_0, new RoutingNode("node_0", node_0), allocation));
assertEquals(Decision.NO, decider.canRemain(test_1, new RoutingNode("node_1", node_1), allocation));
try {

View File

@ -27,13 +27,7 @@ import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.metadata.SnapshotId;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.RestoreSource;
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.TestShardRouting;
import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.cluster.routing.*;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders;
import org.elasticsearch.common.Nullable;
@ -48,9 +42,7 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.*;
/**
*/
@ -184,7 +176,7 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
.metaData(metaData)
.routingTable(routingTable)
.nodes(DiscoveryNodes.builder().put(node1).put(node2).put(node3)).build();
RoutingAllocation allocation = new RoutingAllocation(yesAllocationDeciders(), state.getRoutingNodes(), state.nodes(), null);
RoutingAllocation allocation = new RoutingAllocation(yesAllocationDeciders(), state.getRoutingNodes(), state.nodes(), null, System.nanoTime());
testAllocator.addData(node1, -1).addData(node2, -1);
boolean changed = testAllocator.allocateUnassigned(allocation);
@ -208,7 +200,7 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
.routingTable(routingTable)
.nodes(DiscoveryNodes.builder().put(node1).put(node2).put(node3)).build();
RoutingAllocation allocation = new RoutingAllocation(yesAllocationDeciders(), new RoutingNodes(state, false), state.nodes(), null);
RoutingAllocation allocation = new RoutingAllocation(yesAllocationDeciders(), new RoutingNodes(state, false), state.nodes(), null, System.nanoTime());
boolean changed = testAllocator.allocateUnassigned(allocation);
assertThat(changed, equalTo(false));
assertThat(allocation.routingNodes().unassigned().ignored().size(), equalTo(1));
@ -216,7 +208,7 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.UNASSIGNED).size(), equalTo(2)); // replicas
testAllocator.addData(node1, 1);
allocation = new RoutingAllocation(yesAllocationDeciders(), new RoutingNodes(state, false), state.nodes(), null);
allocation = new RoutingAllocation(yesAllocationDeciders(), new RoutingNodes(state, false), state.nodes(), null, System.nanoTime());
changed = testAllocator.allocateUnassigned(allocation);
assertThat(changed, equalTo(false));
assertThat(allocation.routingNodes().unassigned().ignored().size(), equalTo(1));
@ -224,7 +216,7 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.UNASSIGNED).size(), equalTo(2)); // replicas
testAllocator.addData(node2, 1);
allocation = new RoutingAllocation(yesAllocationDeciders(), new RoutingNodes(state, false), state.nodes(), null);
allocation = new RoutingAllocation(yesAllocationDeciders(), new RoutingNodes(state, false), state.nodes(), null, System.nanoTime());
changed = testAllocator.allocateUnassigned(allocation);
assertThat(changed, equalTo(true));
assertThat(allocation.routingNodes().unassigned().ignored().size(), equalTo(0));
@ -249,7 +241,7 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
.routingTable(routingTable)
.nodes(DiscoveryNodes.builder().put(node1).put(node2).put(node3)).build();
RoutingAllocation allocation = new RoutingAllocation(yesAllocationDeciders(), new RoutingNodes(state, false), state.nodes(), null);
RoutingAllocation allocation = new RoutingAllocation(yesAllocationDeciders(), new RoutingNodes(state, false), state.nodes(), null, System.nanoTime());
boolean changed = testAllocator.allocateUnassigned(allocation);
assertThat(changed, equalTo(false));
assertThat(allocation.routingNodes().unassigned().ignored().size(), equalTo(1));
@ -257,7 +249,7 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.UNASSIGNED).size(), equalTo(2)); // replicas
testAllocator.addData(node1, 1);
allocation = new RoutingAllocation(yesAllocationDeciders(), new RoutingNodes(state, false), state.nodes(), null);
allocation = new RoutingAllocation(yesAllocationDeciders(), new RoutingNodes(state, false), state.nodes(), null, System.nanoTime());
changed = testAllocator.allocateUnassigned(allocation);
assertThat(changed, equalTo(false));
assertThat(allocation.routingNodes().unassigned().ignored().size(), equalTo(1));
@ -265,7 +257,7 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.UNASSIGNED).size(), equalTo(2)); // replicas
testAllocator.addData(node2, 2);
allocation = new RoutingAllocation(yesAllocationDeciders(), new RoutingNodes(state, false), state.nodes(), null);
allocation = new RoutingAllocation(yesAllocationDeciders(), new RoutingNodes(state, false), state.nodes(), null, System.nanoTime());
changed = testAllocator.allocateUnassigned(allocation);
assertThat(changed, equalTo(true));
assertThat(allocation.routingNodes().unassigned().ignored().size(), equalTo(0));
@ -336,7 +328,7 @@ public class PrimaryShardAllocatorTests extends ESAllocationTestCase {
.metaData(metaData)
.routingTable(routingTable)
.nodes(DiscoveryNodes.builder().put(node1).put(node2).put(node3)).build();
return new RoutingAllocation(deciders, new RoutingNodes(state, false), state.nodes(), null);
return new RoutingAllocation(deciders, new RoutingNodes(state, false), state.nodes(), null, System.nanoTime());
}
class TestAllocator extends PrimaryShardAllocator {

View File

@ -20,7 +20,6 @@
package org.elasticsearch.gateway;
import com.carrotsearch.randomizedtesting.generators.RandomPicks;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterInfo;
import org.elasticsearch.cluster.ClusterState;
@ -28,15 +27,8 @@ import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.TestShardRouting;
import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.cluster.routing.*;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders;
@ -232,6 +224,7 @@ public class ReplicaShardAllocatorTests extends ESAllocationTestCase {
// we sometime return empty list of files, make sure we test this as well
testAllocator.addData(node2, false, null);
}
AllocationService.updateLeftDelayOfUnassignedShards(allocation, Settings.EMPTY);
boolean changed = testAllocator.allocateUnassigned(allocation);
assertThat(changed, equalTo(true));
assertThat(allocation.routingNodes().unassigned().ignored().size(), equalTo(1));
@ -240,6 +233,7 @@ public class ReplicaShardAllocatorTests extends ESAllocationTestCase {
allocation = onePrimaryOnNode1And1Replica(yesAllocationDeciders(),
Settings.builder().put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING, TimeValue.timeValueHours(1)).build(), UnassignedInfo.Reason.NODE_LEFT);
testAllocator.addData(node2, false, "MATCH", new StoreFileMetaData("file1", 10, "MATCH_CHECKSUM"));
AllocationService.updateLeftDelayOfUnassignedShards(allocation, Settings.EMPTY);
changed = testAllocator.allocateUnassigned(allocation);
assertThat(changed, equalTo(true));
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(1));
@ -296,7 +290,7 @@ public class ReplicaShardAllocatorTests extends ESAllocationTestCase {
.metaData(metaData)
.routingTable(routingTable)
.nodes(DiscoveryNodes.builder().put(node1).put(node2).put(node3)).build();
return new RoutingAllocation(deciders, new RoutingNodes(state, false), state.nodes(), ClusterInfo.EMPTY);
return new RoutingAllocation(deciders, new RoutingNodes(state, false), state.nodes(), ClusterInfo.EMPTY, System.nanoTime());
}
private RoutingAllocation onePrimaryOnNode1And1ReplicaRecovering(AllocationDeciders deciders) {
@ -315,7 +309,7 @@ public class ReplicaShardAllocatorTests extends ESAllocationTestCase {
.metaData(metaData)
.routingTable(routingTable)
.nodes(DiscoveryNodes.builder().put(node1).put(node2).put(node3)).build();
return new RoutingAllocation(deciders, new RoutingNodes(state, false), state.nodes(), ClusterInfo.EMPTY);
return new RoutingAllocation(deciders, new RoutingNodes(state, false), state.nodes(), ClusterInfo.EMPTY, System.nanoTime());
}
class TestAllocator extends ReplicaShardAllocator {

View File

@ -20,12 +20,13 @@
package org.elasticsearch.indexlifecycle;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.discovery.Discovery;
@ -42,15 +43,9 @@ import static org.elasticsearch.client.Requests.clusterHealthRequest;
import static org.elasticsearch.client.Requests.createIndexRequest;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS;
import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING;
import static org.elasticsearch.cluster.routing.ShardRoutingState.RELOCATING;
import static org.elasticsearch.cluster.routing.ShardRoutingState.STARTED;
import static org.elasticsearch.cluster.routing.ShardRoutingState.*;
import static org.elasticsearch.common.settings.Settings.settingsBuilder;
import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.Matchers.*;
/**
@ -62,6 +57,7 @@ public class IndexLifecycleActionIT extends ESIntegTestCase {
Settings settings = settingsBuilder()
.put(SETTING_NUMBER_OF_SHARDS, 11)
.put(SETTING_NUMBER_OF_REPLICAS, 1)
.put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING, "0s")
.build();
// start one server

View File

@ -56,19 +56,12 @@ import org.elasticsearch.test.disruption.BlockClusterStateProcessing;
import org.elasticsearch.test.junit.annotations.TestLogging;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.*;
import java.util.concurrent.atomic.AtomicReference;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.*;
/**
*/
@ -99,7 +92,7 @@ public class RareClusterStateIT extends ESIntegTestCase {
.nodes(DiscoveryNodes.EMPTY_NODES)
.build(), false
);
RoutingAllocation routingAllocation = new RoutingAllocation(allocationDeciders, routingNodes, current.nodes(), ClusterInfo.EMPTY);
RoutingAllocation routingAllocation = new RoutingAllocation(allocationDeciders, routingNodes, current.nodes(), ClusterInfo.EMPTY, System.nanoTime());
allocator.allocateUnassigned(routingAllocation);
}

View File

@ -47,12 +47,7 @@ import org.elasticsearch.node.settings.NodeSettingsService;
import org.elasticsearch.test.gateway.NoopGatewayAllocator;
import java.lang.reflect.Constructor;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.function.Function;
import java.util.*;
import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING;
import static org.elasticsearch.common.util.CollectionUtils.arrayAsArrayList;
@ -63,32 +58,32 @@ import static org.hamcrest.CoreMatchers.is;
*/
public abstract class ESAllocationTestCase extends ESTestCase {
public static AllocationService createAllocationService() {
public static MockAllocationService createAllocationService() {
return createAllocationService(Settings.Builder.EMPTY_SETTINGS);
}
public static AllocationService createAllocationService(Settings settings) {
public static MockAllocationService createAllocationService(Settings settings) {
return createAllocationService(settings, getRandom());
}
public static AllocationService createAllocationService(Settings settings, Random random) {
public static MockAllocationService createAllocationService(Settings settings, Random random) {
return createAllocationService(settings, new NodeSettingsService(Settings.Builder.EMPTY_SETTINGS), random);
}
public static AllocationService createAllocationService(Settings settings, NodeSettingsService nodeSettingsService, Random random) {
return new AllocationService(settings,
public static MockAllocationService createAllocationService(Settings settings, NodeSettingsService nodeSettingsService, Random random) {
return new MockAllocationService(settings,
randomAllocationDeciders(settings, nodeSettingsService, random),
new ShardsAllocators(settings, NoopGatewayAllocator.INSTANCE), EmptyClusterInfoService.INSTANCE);
}
public static AllocationService createAllocationService(Settings settings, ClusterInfoService clusterInfoService) {
return new AllocationService(settings,
public static MockAllocationService createAllocationService(Settings settings, ClusterInfoService clusterInfoService) {
return new MockAllocationService(settings,
randomAllocationDeciders(settings, new NodeSettingsService(Settings.Builder.EMPTY_SETTINGS), getRandom()),
new ShardsAllocators(settings, NoopGatewayAllocator.INSTANCE), clusterInfoService);
}
public static AllocationService createAllocationService(Settings settings, GatewayAllocator allocator) {
return new AllocationService(settings,
public static MockAllocationService createAllocationService(Settings settings, GatewayAllocator allocator) {
return new MockAllocationService(settings,
randomAllocationDeciders(settings, new NodeSettingsService(Settings.Builder.EMPTY_SETTINGS), getRandom()),
new ShardsAllocators(settings, allocator), EmptyClusterInfoService.INSTANCE);
}
@ -187,9 +182,27 @@ public abstract class ESAllocationTestCase extends ESTestCase {
}
}
/** A lock {@link AllocationService} allowing tests to override time */
protected static class MockAllocationService extends AllocationService {
private Long nanoTimeOverride = null;
public MockAllocationService(Settings settings, AllocationDeciders allocationDeciders, ShardsAllocators shardsAllocators, ClusterInfoService clusterInfoService) {
super(settings, allocationDeciders, shardsAllocators, clusterInfoService);
}
public void setNanoTimeOverride(long nanoTime) {
this.nanoTimeOverride = nanoTime;
}
@Override
protected long currentNanoTime() {
return nanoTimeOverride == null ? super.currentNanoTime() : nanoTimeOverride;
}
}
/**
* Mocks behavior in ReplicaShardAllocator to remove delayed shards from list of unassigned shards so they don't get reassigned yet.
* Also computes delay in UnassignedInfo based on customizable time source.
*/
protected static class DelayedShardsMockGatewayAllocator extends GatewayAllocator {
private final ReplicaShardAllocator replicaShardAllocator = new ReplicaShardAllocator(Settings.EMPTY) {
@ -199,16 +212,11 @@ public abstract class ESAllocationTestCase extends ESTestCase {
}
};
private volatile Function<ShardRouting, Long> timeSource;
public DelayedShardsMockGatewayAllocator() {
super(Settings.EMPTY, null, null);
}
public void setTimeSource(Function<ShardRouting, Long> timeSource) {
this.timeSource = timeSource;
}
@Override
public void applyStartedShards(StartedRerouteAllocation allocation) {}
@ -224,8 +232,7 @@ public abstract class ESAllocationTestCase extends ESTestCase {
if (shard.primary() || shard.allocatedPostIndexCreate() == false) {
continue;
}
changed |= replicaShardAllocator.ignoreUnassignedIfDelayed(timeSource == null ? System.nanoTime() : timeSource.apply(shard),
allocation, unassignedIterator, shard);
changed |= replicaShardAllocator.ignoreUnassignedIfDelayed(unassignedIterator, shard);
}
return changed;
}