Merge pull request from ywelsch/refactor/delayed-allocation

Simplify delayed shard allocation
This commit is contained in:
Yannick Welsch 2015-11-19 09:58:52 +01:00
commit 6a2fa73fb5
10 changed files with 231 additions and 232 deletions
core/src
main/java/org/elasticsearch
test/java/org/elasticsearch/cluster/routing
docs/reference/index-modules/allocation
test-framework/src/main/java/org/elasticsearch/test

@ -273,13 +273,13 @@ public class TransportClusterHealthAction extends TransportMasterNodeReadAction<
} catch (IndexNotFoundException e) {
// one of the specified indices is not there - treat it as RED.
ClusterHealthResponse response = new ClusterHealthResponse(clusterName.value(), Strings.EMPTY_ARRAY, clusterState,
numberOfPendingTasks, numberOfInFlightFetch, UnassignedInfo.getNumberOfDelayedUnassigned(System.currentTimeMillis(), settings, clusterState),
numberOfPendingTasks, numberOfInFlightFetch, UnassignedInfo.getNumberOfDelayedUnassigned(clusterState),
pendingTaskTimeInQueue);
response.setStatus(ClusterHealthStatus.RED);
return response;
}
return new ClusterHealthResponse(clusterName.value(), concreteIndices, clusterState, numberOfPendingTasks,
numberOfInFlightFetch, UnassignedInfo.getNumberOfDelayedUnassigned(System.currentTimeMillis(), settings, clusterState), pendingTaskTimeInQueue);
numberOfInFlightFetch, UnassignedInfo.getNumberOfDelayedUnassigned(clusterState), pendingTaskTimeInQueue);
}
}

@ -31,7 +31,6 @@ import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.FutureUtils;
import org.elasticsearch.threadpool.ThreadPool;
import java.util.Locale;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicBoolean;
@ -56,9 +55,8 @@ public class RoutingService extends AbstractLifecycleComponent<RoutingService> i
private final AllocationService allocationService;
private AtomicBoolean rerouting = new AtomicBoolean();
private volatile long registeredNextDelaySetting = Long.MAX_VALUE;
private volatile long minDelaySettingAtLastScheduling = Long.MAX_VALUE;
private volatile ScheduledFuture registeredNextDelayFuture;
private volatile long unassignedShardsAllocatedTimestamp = 0;
@Inject
public RoutingService(Settings settings, ThreadPool threadPool, ClusterService clusterService, AllocationService allocationService) {
@ -89,19 +87,6 @@ public class RoutingService extends AbstractLifecycleComponent<RoutingService> i
return this.allocationService;
}
/**
* Update the last time the allocator tried to assign unassigned shards
*
* This is used so that both the GatewayAllocator and RoutingService use a
* consistent timestamp for comparing which shards have been delayed to
* avoid a race condition where GatewayAllocator thinks the shard should
* be delayed and the RoutingService thinks it has already passed the delay
* and that the GatewayAllocator has/will handle it.
*/
public void setUnassignedShardsAllocatedTimestamp(long timeInMillis) {
this.unassignedShardsAllocatedTimestamp = timeInMillis;
}
/**
* Initiates a reroute.
*/
@ -112,51 +97,43 @@ public class RoutingService extends AbstractLifecycleComponent<RoutingService> i
@Override
public void clusterChanged(ClusterChangedEvent event) {
if (event.state().nodes().localNodeMaster()) {
// figure out when the next unassigned allocation need to happen from now. If this is larger or equal
// then the last time we checked and scheduled, we are guaranteed to have a reroute until then, so no need
// to schedule again
long nextDelaySetting = UnassignedInfo.findSmallestDelayedAllocationSetting(settings, event.state());
if (nextDelaySetting > 0 && nextDelaySetting < registeredNextDelaySetting) {
// Figure out if an existing scheduled reroute is good enough or whether we need to cancel and reschedule.
// If the minimum of the currently relevant delay settings is larger than something we scheduled in the past,
// we are guaranteed that the planned schedule will happen before any of the current shard delays are expired.
long minDelaySetting = UnassignedInfo.findSmallestDelayedAllocationSetting(settings, event.state());
if (minDelaySetting <= 0) {
logger.trace("no need to schedule reroute - no delayed unassigned shards, minDelaySetting [{}], scheduled [{}]", minDelaySetting, minDelaySettingAtLastScheduling);
minDelaySettingAtLastScheduling = Long.MAX_VALUE;
FutureUtils.cancel(registeredNextDelayFuture);
registeredNextDelaySetting = nextDelaySetting;
// We calculate nextDelay based on System.currentTimeMillis() here because we want the next delay from the "now" perspective
// rather than the delay from the last time the GatewayAllocator tried to assign/delay the shard.
// The actual calculation is based on the latter though, to account for shards that should have been allocated
// between unassignedShardsAllocatedTimestamp and System.currentTimeMillis()
long nextDelayBasedOnUnassignedShardsAllocatedTimestamp = UnassignedInfo.findNextDelayedAllocationIn(unassignedShardsAllocatedTimestamp, settings, event.state());
// adjust from unassignedShardsAllocatedTimestamp to now
long nextDelayMillis = nextDelayBasedOnUnassignedShardsAllocatedTimestamp - (System.currentTimeMillis() - unassignedShardsAllocatedTimestamp);
if (nextDelayMillis < 0) {
nextDelayMillis = 0;
}
TimeValue nextDelay = TimeValue.timeValueMillis(nextDelayMillis);
int unassignedDelayedShards = UnassignedInfo.getNumberOfDelayedUnassigned(unassignedShardsAllocatedTimestamp, settings, event.state());
if (unassignedDelayedShards > 0) {
logger.info("delaying allocation for [{}] unassigned shards, next check in [{}]",
unassignedDelayedShards, nextDelay);
registeredNextDelayFuture = threadPool.schedule(nextDelay, ThreadPool.Names.SAME, new AbstractRunnable() {
@Override
protected void doRun() throws Exception {
registeredNextDelaySetting = Long.MAX_VALUE;
reroute("assign delayed unassigned shards");
}
} else if (minDelaySetting < minDelaySettingAtLastScheduling) {
FutureUtils.cancel(registeredNextDelayFuture);
minDelaySettingAtLastScheduling = minDelaySetting;
TimeValue nextDelay = TimeValue.timeValueNanos(UnassignedInfo.findNextDelayedAllocationIn(event.state()));
assert nextDelay.nanos() > 0 : "next delay must be non 0 as minDelaySetting is [" + minDelaySetting + "]";
logger.info("delaying allocation for [{}] unassigned shards, next check in [{}]",
UnassignedInfo.getNumberOfDelayedUnassigned(event.state()), nextDelay);
registeredNextDelayFuture = threadPool.schedule(nextDelay, ThreadPool.Names.SAME, new AbstractRunnable() {
@Override
protected void doRun() throws Exception {
minDelaySettingAtLastScheduling = Long.MAX_VALUE;
reroute("assign delayed unassigned shards");
}
@Override
public void onFailure(Throwable t) {
logger.warn("failed to schedule/execute reroute post unassigned shard", t);
registeredNextDelaySetting = Long.MAX_VALUE;
}
});
}
@Override
public void onFailure(Throwable t) {
logger.warn("failed to schedule/execute reroute post unassigned shard", t);
minDelaySettingAtLastScheduling = Long.MAX_VALUE;
}
});
} else {
logger.trace("no need to schedule reroute due to delayed unassigned, next_delay_setting [{}], registered [{}]", nextDelaySetting, registeredNextDelaySetting);
logger.trace("no need to schedule reroute - current schedule reroute is enough. minDelaySetting [{}], scheduled [{}]", minDelaySetting, minDelaySettingAtLastScheduling);
}
}
}
// visible for testing
long getRegisteredNextDelaySetting() {
return this.registeredNextDelaySetting;
long getMinDelaySettingAtLastScheduling() {
return this.minDelaySettingAtLastScheduling;
}
// visible for testing

@ -34,6 +34,7 @@ import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
/**
* Holds additional information as to why the shard is in unassigned state.
@ -103,21 +104,24 @@ public class UnassignedInfo implements ToXContent, Writeable<UnassignedInfo> {
}
private final Reason reason;
private final long timestamp;
private final long unassignedTimeMillis; // used for display and log messages, in milliseconds
private final long unassignedTimeNanos; // in nanoseconds, used to calculate delay for delayed shard allocation
private long lastComputedLeftDelayNanos = 0l; // how long to delay shard allocation, not serialized (always positive, 0 means no delay)
private final String message;
private final Throwable failure;
public UnassignedInfo(Reason reason, String message) {
this(reason, System.currentTimeMillis(), message, null);
this(reason, System.currentTimeMillis(), System.nanoTime(), message, null);
}
public UnassignedInfo(Reason reason, @Nullable String message, @Nullable Throwable failure) {
this(reason, System.currentTimeMillis(), message, failure);
this(reason, System.currentTimeMillis(), System.nanoTime(), message, failure);
}
private UnassignedInfo(Reason reason, long timestamp, String message, Throwable failure) {
private UnassignedInfo(Reason reason, long unassignedTimeMillis, long timestampNanos, String message, Throwable failure) {
this.reason = reason;
this.timestamp = timestamp;
this.unassignedTimeMillis = unassignedTimeMillis;
this.unassignedTimeNanos = timestampNanos;
this.message = message;
this.failure = failure;
assert !(message == null && failure != null) : "provide a message if a failure exception is provided";
@ -125,14 +129,18 @@ public class UnassignedInfo implements ToXContent, Writeable<UnassignedInfo> {
UnassignedInfo(StreamInput in) throws IOException {
this.reason = Reason.values()[(int) in.readByte()];
this.timestamp = in.readLong();
this.unassignedTimeMillis = in.readLong();
// As System.nanoTime() cannot be compared across different JVMs, reset it to now.
// This means that in master failover situations, elapsed delay time is forgotten.
this.unassignedTimeNanos = System.nanoTime();
this.message = in.readOptionalString();
this.failure = in.readThrowable();
}
public void writeTo(StreamOutput out) throws IOException {
out.writeByte((byte) reason.ordinal());
out.writeLong(timestamp);
out.writeLong(unassignedTimeMillis);
// Do not serialize unassignedTimeNanos as System.nanoTime() cannot be compared across different JVMs
out.writeOptionalString(message);
out.writeThrowable(failure);
}
@ -149,13 +157,20 @@ public class UnassignedInfo implements ToXContent, Writeable<UnassignedInfo> {
}
/**
* The timestamp in milliseconds since epoch. Note, we use timestamp here since
* we want to make sure its preserved across node serializations. Extra care need
* to be made if its used to calculate diff (handle negative values) in case of
* time drift.
* The timestamp in milliseconds when the shard became unassigned, based on System.currentTimeMillis().
* Note, we use timestamp here since we want to make sure its preserved across node serializations.
*/
public long getTimestampInMillis() {
return this.timestamp;
public long getUnassignedTimeInMillis() {
return this.unassignedTimeMillis;
}
/**
* The timestamp in nanoseconds when the shard became unassigned, based on System.nanoTime().
* Used to calculate the delay for delayed shard allocation.
* ONLY EXPOSED FOR TESTS!
*/
public long getUnassignedTimeInNanos() {
return this.unassignedTimeNanos;
}
/**
@ -186,7 +201,7 @@ public class UnassignedInfo implements ToXContent, Writeable<UnassignedInfo> {
}
/**
* The allocation delay value associated with the index (defaulting to node settings if not set).
* The allocation delay value in milliseconds associated with the index (defaulting to node settings if not set).
*/
public long getAllocationDelayTimeoutSetting(Settings settings, Settings indexSettings) {
if (reason != Reason.NODE_LEFT) {
@ -197,31 +212,40 @@ public class UnassignedInfo implements ToXContent, Writeable<UnassignedInfo> {
}
/**
* The time in millisecond until this unassigned shard can be reassigned.
* The delay in nanoseconds until this unassigned shard can be reassigned. This value is cached and might be slightly out-of-date.
* See also the {@link #updateDelay(long, Settings, Settings)} method.
*/
public long getDelayAllocationExpirationIn(long unassignedShardsAllocatedTimestamp, Settings settings, Settings indexSettings) {
long delayTimeout = getAllocationDelayTimeoutSetting(settings, indexSettings);
if (delayTimeout == 0) {
return 0;
}
long delta = unassignedShardsAllocatedTimestamp - timestamp;
// account for time drift, treat it as no timeout
if (delta < 0) {
return 0;
}
return delayTimeout - delta;
public long getLastComputedLeftDelayNanos() {
return lastComputedLeftDelayNanos;
}
/**
* Updates delay left based on current time (in nanoseconds) and index/node settings.
* Should only be called from ReplicaShardAllocator.
* @return updated delay in nanoseconds
*/
public long updateDelay(long nanoTimeNow, Settings settings, Settings indexSettings) {
long delayTimeoutMillis = getAllocationDelayTimeoutSetting(settings, indexSettings);
final long newComputedLeftDelayNanos;
if (delayTimeoutMillis == 0l) {
newComputedLeftDelayNanos = 0l;
} else {
assert nanoTimeNow >= unassignedTimeNanos;
long delayTimeoutNanos = TimeUnit.NANOSECONDS.convert(delayTimeoutMillis, TimeUnit.MILLISECONDS);
newComputedLeftDelayNanos = Math.max(0l, delayTimeoutNanos - (nanoTimeNow - unassignedTimeNanos));
}
lastComputedLeftDelayNanos = newComputedLeftDelayNanos;
return newComputedLeftDelayNanos;
}
/**
* Returns the number of shards that are unassigned and currently being delayed.
*/
public static int getNumberOfDelayedUnassigned(long unassignedShardsAllocatedTimestamp, Settings settings, ClusterState state) {
public static int getNumberOfDelayedUnassigned(ClusterState state) {
int count = 0;
for (ShardRouting shard : state.routingTable().shardsWithState(ShardRoutingState.UNASSIGNED)) {
if (shard.primary() == false) {
IndexMetaData indexMetaData = state.metaData().index(shard.getIndex());
long delay = shard.unassignedInfo().getDelayAllocationExpirationIn(unassignedShardsAllocatedTimestamp, settings, indexMetaData.getSettings());
long delay = shard.unassignedInfo().getLastComputedLeftDelayNanos();
if (delay > 0) {
count++;
}
@ -231,15 +255,16 @@ public class UnassignedInfo implements ToXContent, Writeable<UnassignedInfo> {
}
/**
* Finds the smallest delay expiration setting of an unassigned shard. Returns 0 if there are none.
* Finds the smallest delay expiration setting in milliseconds of all unassigned shards that are still delayed. Returns 0 if there are none.
*/
public static long findSmallestDelayedAllocationSetting(Settings settings, ClusterState state) {
long nextDelaySetting = Long.MAX_VALUE;
for (ShardRouting shard : state.routingTable().shardsWithState(ShardRoutingState.UNASSIGNED)) {
if (shard.primary() == false) {
IndexMetaData indexMetaData = state.metaData().index(shard.getIndex());
long leftDelayNanos = shard.unassignedInfo().getLastComputedLeftDelayNanos();
long delayTimeoutSetting = shard.unassignedInfo().getAllocationDelayTimeoutSetting(settings, indexMetaData.getSettings());
if (delayTimeoutSetting > 0 && delayTimeoutSetting < nextDelaySetting) {
if (leftDelayNanos > 0 && delayTimeoutSetting > 0 && delayTimeoutSetting < nextDelaySetting) {
nextDelaySetting = delayTimeoutSetting;
}
}
@ -249,14 +274,13 @@ public class UnassignedInfo implements ToXContent, Writeable<UnassignedInfo> {
/**
* Finds the next (closest) delay expiration of an unassigned shard. Returns 0 if there are none.
* Finds the next (closest) delay expiration of an unassigned shard in nanoseconds. Returns 0 if there are none.
*/
public static long findNextDelayedAllocationIn(long unassignedShardsAllocatedTimestamp, Settings settings, ClusterState state) {
public static long findNextDelayedAllocationIn(ClusterState state) {
long nextDelay = Long.MAX_VALUE;
for (ShardRouting shard : state.routingTable().shardsWithState(ShardRoutingState.UNASSIGNED)) {
if (shard.primary() == false) {
IndexMetaData indexMetaData = state.metaData().index(shard.getIndex());
long nextShardDelay = shard.unassignedInfo().getDelayAllocationExpirationIn(unassignedShardsAllocatedTimestamp, settings, indexMetaData.getSettings());
long nextShardDelay = shard.unassignedInfo().getLastComputedLeftDelayNanos();
if (nextShardDelay > 0 && nextShardDelay < nextDelay) {
nextDelay = nextShardDelay;
}
@ -268,7 +292,7 @@ public class UnassignedInfo implements ToXContent, Writeable<UnassignedInfo> {
public String shortSummary() {
StringBuilder sb = new StringBuilder();
sb.append("[reason=").append(reason).append("]");
sb.append(", at[").append(DATE_TIME_FORMATTER.printer().print(timestamp)).append("]");
sb.append(", at[").append(DATE_TIME_FORMATTER.printer().print(unassignedTimeMillis)).append("]");
String details = getDetails();
if (details != null) {
sb.append(", details[").append(details).append("]");
@ -285,7 +309,7 @@ public class UnassignedInfo implements ToXContent, Writeable<UnassignedInfo> {
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject("unassigned_info");
builder.field("reason", reason);
builder.field("at", DATE_TIME_FORMATTER.printer().print(timestamp));
builder.field("at", DATE_TIME_FORMATTER.printer().print(unassignedTimeMillis));
String details = getDetails();
if (details != null) {
builder.field("details", details);
@ -301,7 +325,7 @@ public class UnassignedInfo implements ToXContent, Writeable<UnassignedInfo> {
UnassignedInfo that = (UnassignedInfo) o;
if (timestamp != that.timestamp) return false;
if (unassignedTimeMillis != that.unassignedTimeMillis) return false;
if (reason != that.reason) return false;
if (message != null ? !message.equals(that.message) : that.message != null) return false;
return !(failure != null ? !failure.equals(that.failure) : that.failure != null);
@ -311,7 +335,7 @@ public class UnassignedInfo implements ToXContent, Writeable<UnassignedInfo> {
@Override
public int hashCode() {
int result = reason != null ? reason.hashCode() : 0;
result = 31 * result + Long.hashCode(timestamp);
result = 31 * result + Long.hashCode(unassignedTimeMillis);
result = 31 * result + (message != null ? message.hashCode() : 0);
result = 31 * result + (failure != null ? failure.hashCode() : 0);
return result;

@ -113,10 +113,6 @@ public class GatewayAllocator extends AbstractComponent {
}
public boolean allocateUnassigned(final RoutingAllocation allocation) {
// Take a snapshot of the current time and tell the RoutingService
// about it, so it will use a consistent timestamp for delays
long lastAllocateUnassignedRun = System.currentTimeMillis();
this.routingService.setUnassignedShardsAllocatedTimestamp(lastAllocateUnassignedRun);
boolean changed = false;
RoutingNodes.UnassignedShards unassigned = allocation.routingNodes().unassigned();
@ -124,7 +120,7 @@ public class GatewayAllocator extends AbstractComponent {
changed |= primaryShardAllocator.allocateUnassigned(allocation);
changed |= replicaShardAllocator.processExistingRecoveries(allocation);
changed |= replicaShardAllocator.allocateUnassigned(allocation, lastAllocateUnassignedRun);
changed |= replicaShardAllocator.allocateUnassigned(allocation);
return changed;
}

@ -111,10 +111,7 @@ public abstract class ReplicaShardAllocator extends AbstractComponent {
}
public boolean allocateUnassigned(RoutingAllocation allocation) {
return allocateUnassigned(allocation, System.currentTimeMillis());
}
public boolean allocateUnassigned(RoutingAllocation allocation, long allocateUnassignedTimestapm) {
long nanoTimeNow = System.nanoTime();
boolean changed = false;
final RoutingNodes routingNodes = allocation.routingNodes();
final RoutingNodes.UnassignedShards.UnassignedIterator unassignedIterator = routingNodes.unassigned().iterator();
@ -173,27 +170,43 @@ public abstract class ReplicaShardAllocator extends AbstractComponent {
unassignedIterator.initialize(nodeWithHighestMatch.nodeId(), shard.version(), allocation.clusterInfo().getShardSize(shard, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE));
}
} else if (matchingNodes.hasAnyData() == false) {
// if we didn't manage to find *any* data (regardless of matching sizes), check if the allocation
// of the replica shard needs to be delayed, and if so, add it to the ignore unassigned list
// note: we only care about replica in delayed allocation, since if we have an unassigned primary it
// will anyhow wait to find an existing copy of the shard to be allocated
// note: the other side of the equation is scheduling a reroute in a timely manner, which happens in the RoutingService
IndexMetaData indexMetaData = allocation.metaData().index(shard.getIndex());
long delay = shard.unassignedInfo().getDelayAllocationExpirationIn(allocateUnassignedTimestapm, settings, indexMetaData.getSettings());
if (delay > 0) {
logger.debug("[{}][{}]: delaying allocation of [{}] for [{}]", shard.index(), shard.id(), shard, TimeValue.timeValueMillis(delay));
/**
* mark it as changed, since we want to kick a publishing to schedule future allocation,
* see {@link org.elasticsearch.cluster.routing.RoutingService#clusterChanged(ClusterChangedEvent)}).
*/
changed = true;
unassignedIterator.removeAndIgnore();
}
// if we didn't manage to find *any* data (regardless of matching sizes), check if the allocation of the replica shard needs to be delayed
changed |= ignoreUnassignedIfDelayed(nanoTimeNow, allocation, unassignedIterator, shard);
}
}
return changed;
}
/**
* Check if the allocation of the replica is to be delayed. Compute the delay and if it is delayed, add it to the ignore unassigned list
* Note: we only care about replica in delayed allocation, since if we have an unassigned primary it
* will anyhow wait to find an existing copy of the shard to be allocated
* Note: the other side of the equation is scheduling a reroute in a timely manner, which happens in the RoutingService
*
* PUBLIC FOR TESTS!
*
* @param timeNowNanos Timestamp in nanoseconds representing "now"
* @param allocation the routing allocation
* @param unassignedIterator iterator over unassigned shards
* @param shard the shard which might be delayed
* @return true iff allocation is delayed for this shard
*/
public boolean ignoreUnassignedIfDelayed(long timeNowNanos, RoutingAllocation allocation, RoutingNodes.UnassignedShards.UnassignedIterator unassignedIterator, ShardRouting shard) {
IndexMetaData indexMetaData = allocation.metaData().index(shard.getIndex());
// calculate delay and store it in UnassignedInfo to be used by RoutingService
long delay = shard.unassignedInfo().updateDelay(timeNowNanos, settings, indexMetaData.getSettings());
if (delay > 0) {
logger.debug("[{}][{}]: delaying allocation of [{}] for [{}]", shard.index(), shard.id(), shard, TimeValue.timeValueMillis(delay));
/**
* mark it as changed, since we want to kick a publishing to schedule future allocation,
* see {@link org.elasticsearch.cluster.routing.RoutingService#clusterChanged(ClusterChangedEvent)}).
*/
unassignedIterator.removeAndIgnore();
return true;
}
return false;
}
/**
* Can the shard be allocated on at least one node based on the allocation deciders.
*/

@ -234,8 +234,8 @@ public class RestShardsAction extends AbstractCatAction {
if (shard.unassignedInfo() != null) {
table.addCell(shard.unassignedInfo().getReason());
table.addCell(UnassignedInfo.DATE_TIME_FORMATTER.printer().print(shard.unassignedInfo().getTimestampInMillis()));
table.addCell(TimeValue.timeValueMillis(System.currentTimeMillis() - shard.unassignedInfo().getTimestampInMillis()));
table.addCell(UnassignedInfo.DATE_TIME_FORMATTER.printer().print(shard.unassignedInfo().getUnassignedTimeInMillis()));
table.addCell(TimeValue.timeValueMillis(System.currentTimeMillis() - shard.unassignedInfo().getUnassignedTimeInMillis()));
table.addCell(shard.unassignedInfo().getDetails());
} else {
table.addCell(null);

@ -23,28 +23,18 @@ import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.EmptyClusterInfoService;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.routing.allocation.FailedRerouteAllocation;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.routing.allocation.StartedRerouteAllocation;
import org.elasticsearch.cluster.routing.allocation.allocator.ShardsAllocators;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.gateway.GatewayAllocator;
import org.elasticsearch.node.settings.NodeSettingsService;
import org.elasticsearch.test.ESAllocationTestCase;
import org.elasticsearch.test.cluster.TestClusterService;
import org.elasticsearch.test.junit.annotations.TestLogging;
import org.elasticsearch.threadpool.ThreadPool;
import org.junit.After;
import org.junit.Before;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
@ -77,7 +67,7 @@ public class RoutingServiceTests extends ESAllocationTestCase {
}
public void testNoDelayedUnassigned() throws Exception {
AllocationService allocation = createAllocationService();
AllocationService allocation = createAllocationService(Settings.EMPTY, new DelayedShardsMockGatewayAllocator());
MetaData metaData = MetaData.builder()
.put(IndexMetaData.builder("test").settings(settings(Version.CURRENT).put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING, "0"))
.numberOfShards(1).numberOfReplicas(1))
@ -98,15 +88,15 @@ public class RoutingServiceTests extends ESAllocationTestCase {
clusterState = ClusterState.builder(clusterState).routingResult(allocation.reroute(clusterState, "reroute")).build();
ClusterState newState = clusterState;
assertThat(routingService.getRegisteredNextDelaySetting(), equalTo(Long.MAX_VALUE));
assertThat(routingService.getMinDelaySettingAtLastScheduling(), equalTo(Long.MAX_VALUE));
routingService.clusterChanged(new ClusterChangedEvent("test", newState, prevState));
assertThat(routingService.getRegisteredNextDelaySetting(), equalTo(Long.MAX_VALUE));
assertThat(routingService.getMinDelaySettingAtLastScheduling(), equalTo(Long.MAX_VALUE));
assertThat(routingService.hasReroutedAndClear(), equalTo(false));
}
@TestLogging("_root:DEBUG")
public void testDelayedUnassignedScheduleReroute() throws Exception {
AllocationService allocation = createAllocationService();
DelayedShardsMockGatewayAllocator mockGatewayAllocator = new DelayedShardsMockGatewayAllocator();
AllocationService allocation = createAllocationService(Settings.EMPTY, mockGatewayAllocator);
MetaData metaData = MetaData.builder()
.put(IndexMetaData.builder("test").settings(settings(Version.CURRENT).put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING, "100ms"))
.numberOfShards(1).numberOfReplicas(1))
@ -131,24 +121,20 @@ public class RoutingServiceTests extends ESAllocationTestCase {
}
}
assertNotNull(nodeId);
// remove node2 and reroute
// remove nodeId and reroute
ClusterState prevState = clusterState;
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder(clusterState.nodes()).remove(nodeId)).build();
// make sure the replica is marked as delayed (i.e. not reallocated)
mockGatewayAllocator.setTimeSource(shard -> shard.unassignedInfo().getUnassignedTimeInNanos() + TimeValue.timeValueMillis(randomIntBetween(0, 99)).nanos());
clusterState = ClusterState.builder(clusterState).routingResult(allocation.reroute(clusterState, "reroute")).build();
// We need to update the routing service's last attempted run to
// signal that the GatewayAllocator tried to allocated it but
// it was delayed
RoutingNodes.UnassignedShards unassigned = clusterState.getRoutingNodes().unassigned();
assertEquals(1, unassigned.size());
ShardRouting next = unassigned.iterator().next();
routingService.setUnassignedShardsAllocatedTimestamp(next.unassignedInfo().getTimestampInMillis() + randomIntBetween(0, 99));
assertEquals(1, clusterState.getRoutingNodes().unassigned().size());
ClusterState newState = clusterState;
routingService.clusterChanged(new ClusterChangedEvent("test", newState, prevState));
assertBusy(() -> assertTrue("routing service should have run a reroute", routingService.hasReroutedAndClear()));
// verify the registration has been reset
assertThat(routingService.getRegisteredNextDelaySetting(), equalTo(Long.MAX_VALUE));
assertThat(routingService.getMinDelaySettingAtLastScheduling(), equalTo(Long.MAX_VALUE));
}
/**
@ -159,10 +145,7 @@ public class RoutingServiceTests extends ESAllocationTestCase {
try {
DelayedShardsMockGatewayAllocator mockGatewayAllocator = new DelayedShardsMockGatewayAllocator();
AllocationService allocation = new AllocationService(Settings.Builder.EMPTY_SETTINGS,
randomAllocationDeciders(Settings.Builder.EMPTY_SETTINGS, new NodeSettingsService(Settings.Builder.EMPTY_SETTINGS), getRandom()),
new ShardsAllocators(Settings.Builder.EMPTY_SETTINGS, mockGatewayAllocator), EmptyClusterInfoService.INSTANCE);
AllocationService allocation = createAllocationService(Settings.EMPTY, mockGatewayAllocator);
MetaData metaData = MetaData.builder()
.put(IndexMetaData.builder("short_delay").settings(settings(Version.CURRENT).put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING, "100ms"))
.numberOfShards(1).numberOfReplicas(1))
@ -206,7 +189,7 @@ public class RoutingServiceTests extends ESAllocationTestCase {
ClusterState prevState = clusterState;
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder(clusterState.nodes()).remove(shortDelayReplica.currentNodeId()).remove(longDelayReplica.currentNodeId())).build();
// make sure both replicas are marked as delayed (i.e. not reallocated)
mockGatewayAllocator.setShardsToDelay(Arrays.asList(shortDelayReplica, longDelayReplica));
mockGatewayAllocator.setTimeSource(shard -> shard.unassignedInfo().getUnassignedTimeInNanos() + 1);
clusterState = ClusterState.builder(clusterState).routingResult(allocation.reroute(clusterState, "reroute")).build();
// check that shortDelayReplica and longDelayReplica have been marked unassigned
@ -232,10 +215,8 @@ public class RoutingServiceTests extends ESAllocationTestCase {
// create routing service, also registers listener on cluster service
RoutingService routingService = new RoutingService(Settings.EMPTY, testThreadPool, clusterService, allocation);
routingService.start(); // just so performReroute does not prematurely return
// ensure routing service has proper timestamp before triggering
routingService.setUnassignedShardsAllocatedTimestamp(shortDelayUnassignedReplica.unassignedInfo().getTimestampInMillis() + randomIntBetween(0, 50));
// next (delayed) reroute should only delay longDelayReplica/longDelayUnassignedReplica
mockGatewayAllocator.setShardsToDelay(Arrays.asList(longDelayUnassignedReplica));
// next (delayed) reroute should only delay longDelayReplica/longDelayUnassignedReplica, simulate that we are now 1 second after shards became unassigned
mockGatewayAllocator.setTimeSource(shard -> shard.unassignedInfo().getUnassignedTimeInNanos() + TimeValue.timeValueSeconds(1).nanos());
// register listener on cluster state so we know when cluster state has been changed
CountDownLatch latch = new CountDownLatch(1);
clusterService.addLast(event -> latch.countDown());
@ -244,14 +225,15 @@ public class RoutingServiceTests extends ESAllocationTestCase {
// cluster service should have updated state and called routingService with clusterChanged
latch.await();
// verify the registration has been set to the delay of longDelayReplica/longDelayUnassignedReplica
assertThat(routingService.getRegisteredNextDelaySetting(), equalTo(10000L));
assertThat(routingService.getMinDelaySettingAtLastScheduling(), equalTo(TimeValue.timeValueSeconds(10).millis()));
} finally {
terminate(testThreadPool);
}
}
public void testDelayedUnassignedDoesNotRerouteForNegativeDelays() throws Exception {
AllocationService allocation = createAllocationService();
DelayedShardsMockGatewayAllocator mockGatewayAllocator = new DelayedShardsMockGatewayAllocator();
AllocationService allocation = createAllocationService(Settings.EMPTY, mockGatewayAllocator);
MetaData metaData = MetaData.builder()
.put(IndexMetaData.builder("test").settings(settings(Version.CURRENT).put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING, "100ms"))
.numberOfShards(1).numberOfReplicas(1))
@ -271,7 +253,7 @@ public class RoutingServiceTests extends ESAllocationTestCase {
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder(clusterState.nodes()).remove("node2")).build();
clusterState = ClusterState.builder(clusterState).routingResult(allocation.reroute(clusterState, "reroute")).build();
// Set it in the future so the delay will be negative
routingService.setUnassignedShardsAllocatedTimestamp(System.currentTimeMillis() + TimeValue.timeValueMinutes(1).millis());
mockGatewayAllocator.setTimeSource(shard -> shard.unassignedInfo().getUnassignedTimeInNanos() + TimeValue.timeValueMinutes(1).nanos());
ClusterState newState = clusterState;
@ -282,7 +264,7 @@ public class RoutingServiceTests extends ESAllocationTestCase {
assertThat(routingService.hasReroutedAndClear(), equalTo(false));
// verify the registration has been updated
assertThat(routingService.getRegisteredNextDelaySetting(), equalTo(100L));
assertThat(routingService.getMinDelaySettingAtLastScheduling(), equalTo(100L));
}
});
}
@ -309,46 +291,4 @@ public class RoutingServiceTests extends ESAllocationTestCase {
rerouted.set(true);
}
}
/**
* Mocks behavior in ReplicaShardAllocator to remove delayed shards from list of unassigned shards so they don't get reassigned yet.
* It does not implement the full logic but shards that are to be delayed need to be explicitly set using the method setShardsToDelay(...).
*/
private static class DelayedShardsMockGatewayAllocator extends GatewayAllocator {
volatile List<ShardRouting> delayedShards = Collections.emptyList();
public DelayedShardsMockGatewayAllocator() {
super(Settings.EMPTY, null, null);
}
@Override
public void applyStartedShards(StartedRerouteAllocation allocation) {}
@Override
public void applyFailedShards(FailedRerouteAllocation allocation) {}
/**
* Explicitly set which shards should be delayed in the next allocateUnassigned calls
*/
public void setShardsToDelay(List<ShardRouting> delayedShards) {
this.delayedShards = delayedShards;
}
@Override
public boolean allocateUnassigned(RoutingAllocation allocation) {
final RoutingNodes routingNodes = allocation.routingNodes();
final RoutingNodes.UnassignedShards.UnassignedIterator unassignedIterator = routingNodes.unassigned().iterator();
boolean changed = false;
while (unassignedIterator.hasNext()) {
ShardRouting shard = unassignedIterator.next();
for (ShardRouting shardToDelay : delayedShards) {
if (shard.isSameShard(shardToDelay)) {
changed = true;
unassignedIterator.removeAndIgnore();
}
}
}
return changed;
}
}
}

@ -81,7 +81,7 @@ public class UnassignedInfoTests extends ESAllocationTestCase {
UnassignedInfo read = new UnassignedInfo(StreamInput.wrap(out.bytes()));
assertThat(read.getReason(), equalTo(meta.getReason()));
assertThat(read.getTimestampInMillis(), equalTo(meta.getTimestampInMillis()));
assertThat(read.getUnassignedTimeInMillis(), equalTo(meta.getUnassignedTimeInMillis()));
assertThat(read.getMessage(), equalTo(meta.getMessage()));
assertThat(read.getDetails(), equalTo(meta.getDetails()));
}
@ -222,7 +222,7 @@ public class UnassignedInfoTests extends ESAllocationTestCase {
assertThat(clusterState.getRoutingNodes().shardsWithState(UNASSIGNED).size(), equalTo(1));
assertThat(clusterState.getRoutingNodes().shardsWithState(UNASSIGNED).get(0).unassignedInfo(), notNullValue());
assertThat(clusterState.getRoutingNodes().shardsWithState(UNASSIGNED).get(0).unassignedInfo().getReason(), equalTo(UnassignedInfo.Reason.NODE_LEFT));
assertThat(clusterState.getRoutingNodes().shardsWithState(UNASSIGNED).get(0).unassignedInfo().getTimestampInMillis(), greaterThan(0l));
assertThat(clusterState.getRoutingNodes().shardsWithState(UNASSIGNED).get(0).unassignedInfo().getUnassignedTimeInMillis(), greaterThan(0l));
}
/**
@ -253,7 +253,7 @@ public class UnassignedInfoTests extends ESAllocationTestCase {
assertThat(clusterState.getRoutingNodes().shardsWithState(UNASSIGNED).get(0).unassignedInfo().getReason(), equalTo(UnassignedInfo.Reason.ALLOCATION_FAILED));
assertThat(clusterState.getRoutingNodes().shardsWithState(UNASSIGNED).get(0).unassignedInfo().getMessage(), equalTo("test fail"));
assertThat(clusterState.getRoutingNodes().shardsWithState(UNASSIGNED).get(0).unassignedInfo().getDetails(), equalTo("test fail"));
assertThat(clusterState.getRoutingNodes().shardsWithState(UNASSIGNED).get(0).unassignedInfo().getTimestampInMillis(), greaterThan(0l));
assertThat(clusterState.getRoutingNodes().shardsWithState(UNASSIGNED).get(0).unassignedInfo().getUnassignedTimeInMillis(), greaterThan(0l));
}
/**
@ -261,17 +261,11 @@ public class UnassignedInfoTests extends ESAllocationTestCase {
*/
public void testUnassignedDelayedOnlyOnNodeLeft() throws Exception {
final UnassignedInfo unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.NODE_LEFT, null);
long delay = unassignedInfo.getAllocationDelayTimeoutSetting(Settings.builder().put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING, "10h").build(), Settings.EMPTY);
assertThat(delay, equalTo(TimeValue.timeValueHours(10).millis()));
assertBusy(new Runnable() {
@Override
public void run() {
long delay = unassignedInfo.getDelayAllocationExpirationIn(System.currentTimeMillis(),
Settings.builder().put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING, "10h").build(), Settings.EMPTY);
assertThat(delay, greaterThan(0l));
assertThat(delay, lessThan(TimeValue.timeValueHours(10).millis()));
}
});
long delay = unassignedInfo.updateDelay(unassignedInfo.getUnassignedTimeInNanos() + 1, // add 1 tick delay
Settings.builder().put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING, "10h").build(), Settings.EMPTY);
long cachedDelay = unassignedInfo.getLastComputedLeftDelayNanos();
assertThat(delay, equalTo(cachedDelay));
assertThat(delay, equalTo(TimeValue.timeValueHours(10).nanos() - 1));
}
/**
@ -281,15 +275,16 @@ public class UnassignedInfoTests extends ESAllocationTestCase {
EnumSet<UnassignedInfo.Reason> reasons = EnumSet.allOf(UnassignedInfo.Reason.class);
reasons.remove(UnassignedInfo.Reason.NODE_LEFT);
UnassignedInfo unassignedInfo = new UnassignedInfo(RandomPicks.randomFrom(getRandom(), reasons), null);
long delay = unassignedInfo.getAllocationDelayTimeoutSetting(Settings.builder().put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING, "10h").build(), Settings.EMPTY);
assertThat(delay, equalTo(0l));
delay = unassignedInfo.getDelayAllocationExpirationIn(System.currentTimeMillis(),
long delay = unassignedInfo.updateDelay(unassignedInfo.getUnassignedTimeInNanos() + 1, // add 1 tick delay
Settings.builder().put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING, "10h").build(), Settings.EMPTY);
assertThat(delay, equalTo(0l));
delay = unassignedInfo.getLastComputedLeftDelayNanos();
assertThat(delay, equalTo(0l));
}
public void testNumberOfDelayedUnassigned() throws Exception {
AllocationService allocation = createAllocationService();
DelayedShardsMockGatewayAllocator mockGatewayAllocator = new DelayedShardsMockGatewayAllocator();
AllocationService allocation = createAllocationService(Settings.EMPTY, mockGatewayAllocator);
MetaData metaData = MetaData.builder()
.put(IndexMetaData.builder("test1").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(1))
.put(IndexMetaData.builder("test2").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(1))
@ -299,8 +294,7 @@ public class UnassignedInfoTests extends ESAllocationTestCase {
.routingTable(RoutingTable.builder().addAsNew(metaData.index("test1")).addAsNew(metaData.index("test2")).build()).build();
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder().put(newNode("node1")).put(newNode("node2"))).build();
clusterState = ClusterState.builder(clusterState).routingResult(allocation.reroute(clusterState, "reroute")).build();
assertThat(UnassignedInfo.getNumberOfDelayedUnassigned(System.currentTimeMillis(),
Settings.builder().put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING, "10h").build(), clusterState), equalTo(0));
assertThat(UnassignedInfo.getNumberOfDelayedUnassigned(clusterState), equalTo(0));
// starting primaries
clusterState = ClusterState.builder(clusterState).routingResult(allocation.applyStartedShards(clusterState, clusterState.getRoutingNodes().shardsWithState(INITIALIZING))).build();
// starting replicas
@ -308,24 +302,25 @@ public class UnassignedInfoTests extends ESAllocationTestCase {
assertThat(clusterState.getRoutingNodes().unassigned().size() > 0, equalTo(false));
// remove node2 and reroute
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder(clusterState.nodes()).remove("node2")).build();
// make sure both replicas are marked as delayed (i.e. not reallocated)
mockGatewayAllocator.setTimeSource(shard -> shard.unassignedInfo().getUnassignedTimeInNanos() + 1);
clusterState = ClusterState.builder(clusterState).routingResult(allocation.reroute(clusterState, "reroute")).build();
assertThat(clusterState.prettyPrint(), UnassignedInfo.getNumberOfDelayedUnassigned(System.currentTimeMillis(),
Settings.builder().put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING, "10h").build(), clusterState), equalTo(2));
assertThat(clusterState.prettyPrint(), UnassignedInfo.getNumberOfDelayedUnassigned(clusterState), equalTo(2));
}
public void testFindNextDelayedAllocation() {
AllocationService allocation = createAllocationService();
DelayedShardsMockGatewayAllocator mockGatewayAllocator = new DelayedShardsMockGatewayAllocator();
AllocationService allocation = createAllocationService(Settings.EMPTY, mockGatewayAllocator);
MetaData metaData = MetaData.builder()
.put(IndexMetaData.builder("test1").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(1))
.put(IndexMetaData.builder("test2").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(1))
.put(IndexMetaData.builder("test1").settings(settings(Version.CURRENT).put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING, "10h")).numberOfShards(1).numberOfReplicas(1))
.put(IndexMetaData.builder("test2").settings(settings(Version.CURRENT).put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING, "10h")).numberOfShards(1).numberOfReplicas(1))
.build();
ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT)
.metaData(metaData)
.routingTable(RoutingTable.builder().addAsNew(metaData.index("test1")).addAsNew(metaData.index("test2")).build()).build();
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder().put(newNode("node1")).put(newNode("node2"))).build();
clusterState = ClusterState.builder(clusterState).routingResult(allocation.reroute(clusterState, "reroute")).build();
assertThat(UnassignedInfo.getNumberOfDelayedUnassigned(System.currentTimeMillis(),
Settings.builder().put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING, "10h").build(), clusterState), equalTo(0));
assertThat(UnassignedInfo.getNumberOfDelayedUnassigned(clusterState), equalTo(0));
// starting primaries
clusterState = ClusterState.builder(clusterState).routingResult(allocation.applyStartedShards(clusterState, clusterState.getRoutingNodes().shardsWithState(INITIALIZING))).build();
// starting replicas
@ -333,14 +328,15 @@ public class UnassignedInfoTests extends ESAllocationTestCase {
assertThat(clusterState.getRoutingNodes().unassigned().size() > 0, equalTo(false));
// remove node2 and reroute
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder(clusterState.nodes()).remove("node2")).build();
// make sure both replicas are marked as delayed (i.e. not reallocated)
mockGatewayAllocator.setTimeSource(shard -> shard.unassignedInfo().getUnassignedTimeInNanos() + 1);
clusterState = ClusterState.builder(clusterState).routingResult(allocation.reroute(clusterState, "reroute")).build();
long nextDelaySetting = UnassignedInfo.findSmallestDelayedAllocationSetting(Settings.builder().put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING, "10h").build(), clusterState);
assertThat(nextDelaySetting, equalTo(TimeValue.timeValueHours(10).millis()));
long nextDelay = UnassignedInfo.findNextDelayedAllocationIn(System.currentTimeMillis(),
Settings.builder().put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING, "10h").build(), clusterState);
assertThat(nextDelay, greaterThan(TimeValue.timeValueHours(9).millis()));
assertThat(nextDelay, lessThanOrEqualTo(TimeValue.timeValueHours(10).millis()));
long nextDelay = UnassignedInfo.findNextDelayedAllocationIn(clusterState);
assertThat(nextDelay, greaterThan(TimeValue.timeValueHours(9).nanos()));
assertThat(nextDelay, lessThanOrEqualTo(TimeValue.timeValueHours(10).nanos()));
}
}

@ -58,7 +58,9 @@ With delayed allocation enabled, the above scenario changes to look like this:
NOTE: This setting will not affect the promotion of replicas to primaries, nor
will it affect the assignment of replicas that have not been assigned
previously.
previously. In particular, delayed allocation does not come into effect after a full cluster restart.
Also, in case of a master failover situation, elapsed delay time is forgotten
(i.e. reset to the full initial delay).
==== Cancellation of shard relocation

@ -25,10 +25,13 @@ import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.EmptyClusterInfoService;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.routing.allocation.FailedRerouteAllocation;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.routing.allocation.StartedRerouteAllocation;
import org.elasticsearch.cluster.routing.allocation.allocator.ShardsAllocators;
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders;
@ -36,7 +39,10 @@ import org.elasticsearch.cluster.routing.allocation.decider.Decision;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.DummyTransportAddress;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.gateway.AsyncShardFetch;
import org.elasticsearch.gateway.GatewayAllocator;
import org.elasticsearch.gateway.ReplicaShardAllocator;
import org.elasticsearch.indices.store.TransportNodesListShardStoreMetaData;
import org.elasticsearch.node.settings.NodeSettingsService;
import org.elasticsearch.test.gateway.NoopGatewayAllocator;
@ -46,6 +52,7 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.function.Function;
import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING;
import static org.elasticsearch.common.util.CollectionUtils.arrayAsArrayList;
@ -179,4 +186,48 @@ public abstract class ESAllocationTestCase extends ESTestCase {
return decision;
}
}
/**
* Mocks behavior in ReplicaShardAllocator to remove delayed shards from list of unassigned shards so they don't get reassigned yet.
* Also computes delay in UnassignedInfo based on customizable time source.
*/
protected static class DelayedShardsMockGatewayAllocator extends GatewayAllocator {
private final ReplicaShardAllocator replicaShardAllocator = new ReplicaShardAllocator(Settings.EMPTY) {
@Override
protected AsyncShardFetch.FetchResult<TransportNodesListShardStoreMetaData.NodeStoreFilesMetaData> fetchData(ShardRouting shard, RoutingAllocation allocation) {
return new AsyncShardFetch.FetchResult<>(shard.shardId(), null, Collections.<String>emptySet(), Collections.<String>emptySet());
}
};
private volatile Function<ShardRouting, Long> timeSource;
public DelayedShardsMockGatewayAllocator() {
super(Settings.EMPTY, null, null);
}
public void setTimeSource(Function<ShardRouting, Long> timeSource) {
this.timeSource = timeSource;
}
@Override
public void applyStartedShards(StartedRerouteAllocation allocation) {}
@Override
public void applyFailedShards(FailedRerouteAllocation allocation) {}
@Override
public boolean allocateUnassigned(RoutingAllocation allocation) {
final RoutingNodes.UnassignedShards.UnassignedIterator unassignedIterator = allocation.routingNodes().unassigned().iterator();
boolean changed = false;
while (unassignedIterator.hasNext()) {
ShardRouting shard = unassignedIterator.next();
if (shard.primary() || shard.allocatedPostIndexCreate() == false) {
continue;
}
changed |= replicaShardAllocator.ignoreUnassignedIfDelayed(timeSource == null ? System.nanoTime() : timeSource.apply(shard),
allocation, unassignedIterator, shard);
}
return changed;
}
}
}