Simplify delayed shard allocation (#18351)

This commit simplifies the delayed shard allocation implementation by assigning clear responsibilities to the various components that are affected by delayed shard allocation:

- UnassignedInfo gets a boolean flag delayed which determines whether assignment of the shard should be delayed. The flag gets persisted in the cluster state and is thus available across nodes, i.e. each node knows whether a shard was delayed-unassigned in a specific cluster state. Before, nodes other than the current master were unaware of that information.
- This flag is initially set as true if the shard becomes unassigned due to a node leaving and the index setting index.unassigned.node_left.delayed_timeout being strictly positive. From then on, unassigned shards can only transition from delayed to non-delayed, never in the other direction.
- The reroute step is in charge of removing the delay marker (comparing timestamp when node left to current timestamp).
- A dedicated service DelayedAllocationService, reacting to cluster change events, has the responsibility to schedule reroutes to remove the delay marker.

Closes #18293
This commit is contained in:
Yannick Welsch 2016-05-26 13:39:55 +02:00
parent cad959b980
commit 31b0777c91
24 changed files with 923 additions and 541 deletions

View File

@ -45,17 +45,19 @@ public final class ClusterAllocationExplanation implements ToXContent, Writeable
private final boolean hasPendingAsyncFetch;
private final String assignedNodeId;
private final UnassignedInfo unassignedInfo;
private final long allocationDelayMillis;
private final long remainingDelayMillis;
private final Map<DiscoveryNode, NodeExplanation> nodeExplanations;
public ClusterAllocationExplanation(ShardId shard, boolean primary, @Nullable String assignedNodeId, long remainingDelayMillis,
@Nullable UnassignedInfo unassignedInfo, boolean hasPendingAsyncFetch,
public ClusterAllocationExplanation(ShardId shard, boolean primary, @Nullable String assignedNodeId, long allocationDelayMillis,
long remainingDelayMillis, @Nullable UnassignedInfo unassignedInfo, boolean hasPendingAsyncFetch,
Map<DiscoveryNode, NodeExplanation> nodeExplanations) {
this.shard = shard;
this.primary = primary;
this.hasPendingAsyncFetch = hasPendingAsyncFetch;
this.assignedNodeId = assignedNodeId;
this.unassignedInfo = unassignedInfo;
this.allocationDelayMillis = allocationDelayMillis;
this.remainingDelayMillis = remainingDelayMillis;
this.nodeExplanations = nodeExplanations;
}
@ -66,6 +68,7 @@ public final class ClusterAllocationExplanation implements ToXContent, Writeable
this.hasPendingAsyncFetch = in.readBoolean();
this.assignedNodeId = in.readOptionalString();
this.unassignedInfo = in.readOptionalWriteable(UnassignedInfo::new);
this.allocationDelayMillis = in.readVLong();
this.remainingDelayMillis = in.readVLong();
int mapSize = in.readVInt();
@ -84,6 +87,7 @@ public final class ClusterAllocationExplanation implements ToXContent, Writeable
out.writeBoolean(this.isStillFetchingShardData());
out.writeOptionalString(this.getAssignedNodeId());
out.writeOptionalWriteable(this.getUnassignedInfo());
out.writeVLong(allocationDelayMillis);
out.writeVLong(remainingDelayMillis);
out.writeVInt(this.nodeExplanations.size());
@ -124,7 +128,12 @@ public final class ClusterAllocationExplanation implements ToXContent, Writeable
return this.unassignedInfo;
}
/** Return the remaining allocation delay for this shard in millisocends */
/** Return the configured delay before the shard can be allocated in milliseconds */
public long getAllocationDelayMillis() {
return this.allocationDelayMillis;
}
/** Return the remaining allocation delay for this shard in milliseconds */
public long getRemainingDelayMillis() {
return this.remainingDelayMillis;
}
@ -152,8 +161,7 @@ public final class ClusterAllocationExplanation implements ToXContent, Writeable
// If we have unassigned info, show that
if (unassignedInfo != null) {
unassignedInfo.toXContent(builder, params);
long delay = unassignedInfo.getLastComputedLeftDelayNanos();
builder.timeValueField("allocation_delay_in_millis", "allocation_delay", TimeValue.timeValueNanos(delay));
builder.timeValueField("allocation_delay_in_millis", "allocation_delay", TimeValue.timeValueMillis(allocationDelayMillis));
builder.timeValueField("remaining_delay_in_millis", "remaining_delay", TimeValue.timeValueMillis(remainingDelayMillis));
}
builder.startObject("nodes");

View File

@ -59,6 +59,8 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import static org.elasticsearch.cluster.routing.UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING;
/**
* The {@code TransportClusterAllocationExplainAction} is responsible for actually executing the explanation of a shard's allocation on the
* master node in the cluster.
@ -237,9 +239,9 @@ public class TransportClusterAllocationExplainAction
long remainingDelayMillis = 0;
final MetaData metadata = allocation.metaData();
final IndexMetaData indexMetaData = metadata.index(shard.index());
if (ui != null) {
final Settings indexSettings = indexMetaData.getSettings();
long remainingDelayNanos = ui.getRemainingDelay(System.nanoTime(), metadata.settings(), indexSettings);
long allocationDelayMillis = INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.get(indexMetaData.getSettings()).getMillis();
if (ui != null && ui.isDelayed()) {
long remainingDelayNanos = ui.getRemainingDelay(System.nanoTime(), indexMetaData.getSettings());
remainingDelayMillis = TimeValue.timeValueNanos(remainingDelayNanos).millis();
}
@ -262,8 +264,9 @@ public class TransportClusterAllocationExplainAction
allocation.hasPendingAsyncFetch());
explanations.put(node, nodeExplanation);
}
return new ClusterAllocationExplanation(shard.shardId(), shard.primary(), shard.currentNodeId(),
remainingDelayMillis, ui, gatewayAllocator.hasFetchPending(shard.shardId(), shard.primary()), explanations);
return new ClusterAllocationExplanation(shard.shardId(), shard.primary(),
shard.currentNodeId(), allocationDelayMillis, remainingDelayMillis, ui,
gatewayAllocator.hasFetchPending(shard.shardId(), shard.primary()), explanations);
}
@Override

View File

@ -33,6 +33,7 @@ import org.elasticsearch.cluster.metadata.MetaDataIndexTemplateService;
import org.elasticsearch.cluster.metadata.MetaDataMappingService;
import org.elasticsearch.cluster.metadata.MetaDataUpdateSettingsService;
import org.elasticsearch.cluster.node.DiscoveryNodeService;
import org.elasticsearch.cluster.routing.DelayedAllocationService;
import org.elasticsearch.cluster.routing.OperationRouting;
import org.elasticsearch.cluster.routing.RoutingService;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
@ -151,6 +152,7 @@ public class ClusterModule extends AbstractModule {
bind(MetaDataIndexTemplateService.class).asEagerSingleton();
bind(IndexNameExpressionResolver.class).asEagerSingleton();
bind(RoutingService.class).asEagerSingleton();
bind(DelayedAllocationService.class).asEagerSingleton();
bind(ShardStateAction.class).asEagerSingleton();
bind(NodeIndexDeletedAction.class).asEagerSingleton();
bind(NodeMappingRefreshAction.class).asEagerSingleton();

View File

@ -0,0 +1,225 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster.routing;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.FutureUtils;
import org.elasticsearch.threadpool.ThreadPool;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
/**
* The {@link DelayedAllocationService} listens to cluster state changes and checks
* if there are unassigned shards with delayed allocation (unassigned shards that have
* the delay marker). These are shards that have become unassigned due to a node leaving
* and which were assigned the delay marker based on the index delay setting
* {@link UnassignedInfo#INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING}
* (see {@link AllocationService#deassociateDeadNodes(RoutingAllocation)}).
* This class is responsible for choosing the next (closest) delay expiration of a
* delayed shard to schedule a reroute to remove the delay marker.
* The actual removal of the delay marker happens in
* {@link AllocationService#removeDelayMarkers(RoutingAllocation)}, triggering yet
* another cluster change event.
*/
public class DelayedAllocationService extends AbstractLifecycleComponent<DelayedAllocationService> implements ClusterStateListener {
static final String CLUSTER_UPDATE_TASK_SOURCE = "delayed_allocation_reroute";
final ThreadPool threadPool;
private final ClusterService clusterService;
private final AllocationService allocationService;
AtomicReference<DelayedRerouteTask> delayedRerouteTask = new AtomicReference<>(); // package private to access from tests
/**
* represents a delayed scheduling of the reroute action that can be cancelled.
*/
class DelayedRerouteTask extends ClusterStateUpdateTask {
final TimeValue nextDelay; // delay until submitting the reroute command
final long baseTimestampNanos; // timestamp (in nanos) upon which delay was calculated
volatile ScheduledFuture future;
final AtomicBoolean cancelScheduling = new AtomicBoolean();
DelayedRerouteTask(TimeValue nextDelay, long baseTimestampNanos) {
this.nextDelay = nextDelay;
this.baseTimestampNanos = baseTimestampNanos;
}
public long scheduledTimeToRunInNanos() {
return baseTimestampNanos + nextDelay.nanos();
}
public void cancelScheduling() {
cancelScheduling.set(true);
FutureUtils.cancel(future);
removeIfSameTask(this);
}
public void schedule() {
future = threadPool.schedule(nextDelay, ThreadPool.Names.SAME, new AbstractRunnable() {
@Override
protected void doRun() throws Exception {
if (cancelScheduling.get()) {
return;
}
clusterService.submitStateUpdateTask(CLUSTER_UPDATE_TASK_SOURCE, DelayedRerouteTask.this);
}
@Override
public void onFailure(Throwable t) {
logger.warn("failed to submit schedule/execute reroute post unassigned shard", t);
removeIfSameTask(DelayedRerouteTask.this);
}
});
}
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
removeIfSameTask(this);
RoutingAllocation.Result routingResult = allocationService.reroute(currentState, "assign delayed unassigned shards");
if (routingResult.changed()) {
return ClusterState.builder(currentState).routingResult(routingResult).build();
} else {
return currentState;
}
}
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
if (oldState == newState) {
// no state changed, check when we should remove the delay flag from the shards the next time.
// if cluster state changed, we can leave the scheduling of the next delay up to the clusterChangedEvent
// this should not be needed, but we want to be extra safe here
scheduleIfNeeded(currentNanoTime(), newState);
}
}
@Override
public void onFailure(String source, Throwable t) {
removeIfSameTask(this);
logger.warn("failed to schedule/execute reroute post unassigned shard", t);
}
}
@Inject
public DelayedAllocationService(Settings settings, ThreadPool threadPool, ClusterService clusterService,
AllocationService allocationService) {
super(settings);
this.threadPool = threadPool;
this.clusterService = clusterService;
this.allocationService = allocationService;
clusterService.addFirst(this);
}
@Override
protected void doStart() {
}
@Override
protected void doStop() {
}
@Override
protected void doClose() {
clusterService.remove(this);
removeTaskAndCancel();
}
/** override this to control time based decisions during delayed allocation */
protected long currentNanoTime() {
return System.nanoTime();
}
@Override
public void clusterChanged(ClusterChangedEvent event) {
long currentNanoTime = currentNanoTime();
if (event.state().nodes().isLocalNodeElectedMaster()) {
scheduleIfNeeded(currentNanoTime, event.state());
}
}
private void removeTaskAndCancel() {
DelayedRerouteTask existingTask = delayedRerouteTask.getAndSet(null);
if (existingTask != null) {
logger.trace("cancelling existing delayed reroute task");
existingTask.cancelScheduling();
}
}
private void removeIfSameTask(DelayedRerouteTask expectedTask) {
delayedRerouteTask.compareAndSet(expectedTask, null);
}
/**
* Figure out if an existing scheduled reroute is good enough or whether we need to cancel and reschedule.
*/
private void scheduleIfNeeded(long currentNanoTime, ClusterState state) {
assertClusterStateThread();
long nextDelayNanos = UnassignedInfo.findNextDelayedAllocation(currentNanoTime, state);
if (nextDelayNanos < 0) {
logger.trace("no need to schedule reroute - no delayed unassigned shards");
removeTaskAndCancel();
} else {
TimeValue nextDelay = TimeValue.timeValueNanos(nextDelayNanos);
final boolean earlierRerouteNeeded;
DelayedRerouteTask existingTask = delayedRerouteTask.get();
DelayedRerouteTask newTask = new DelayedRerouteTask(nextDelay, currentNanoTime);
if (existingTask == null) {
earlierRerouteNeeded = true;
} else if (newTask.scheduledTimeToRunInNanos() < existingTask.scheduledTimeToRunInNanos()) {
// we need an earlier delayed reroute
logger.trace("cancelling existing delayed reroute task as delayed reroute has to happen [{}] earlier",
TimeValue.timeValueNanos(existingTask.scheduledTimeToRunInNanos() - newTask.scheduledTimeToRunInNanos()));
existingTask.cancelScheduling();
earlierRerouteNeeded = true;
} else {
earlierRerouteNeeded = false;
}
if (earlierRerouteNeeded) {
logger.info("scheduling reroute for delayed shards in [{}] ({} delayed shards)", nextDelay,
UnassignedInfo.getNumberOfDelayedUnassigned(state));
DelayedRerouteTask currentTask = delayedRerouteTask.getAndSet(newTask);
assert existingTask == currentTask || currentTask == null;
newTask.schedule();
} else {
logger.trace("no need to reschedule delayed reroute - currently scheduled delayed reroute in [{}] is enough", nextDelay);
}
}
}
// protected so that it can be overridden (and disabled) by unit tests
protected void assertClusterStateThread() {
ClusterService.assertClusterStateThread();
}
}

View File

@ -21,7 +21,6 @@ package org.elasticsearch.cluster.routing;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
@ -30,12 +29,7 @@ import org.elasticsearch.common.Priority;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.FutureUtils;
import org.elasticsearch.threadpool.ThreadPool;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicBoolean;
/**
@ -50,27 +44,20 @@ import java.util.concurrent.atomic.AtomicBoolean;
* actions.
* </p>
*/
public class RoutingService extends AbstractLifecycleComponent<RoutingService> implements ClusterStateListener {
public class RoutingService extends AbstractLifecycleComponent<RoutingService> {
private static final String CLUSTER_UPDATE_TASK_SOURCE = "cluster_reroute";
final ThreadPool threadPool;
private final ClusterService clusterService;
private final AllocationService allocationService;
private AtomicBoolean rerouting = new AtomicBoolean();
private volatile long minDelaySettingAtLastSchedulingNanos = Long.MAX_VALUE;
private volatile ScheduledFuture registeredNextDelayFuture;
@Inject
public RoutingService(Settings settings, ThreadPool threadPool, ClusterService clusterService, AllocationService allocationService) {
public RoutingService(Settings settings, ClusterService clusterService, AllocationService allocationService) {
super(settings);
this.threadPool = threadPool;
this.clusterService = clusterService;
this.allocationService = allocationService;
if (clusterService != null) {
clusterService.addFirst(this);
}
}
@Override
@ -83,8 +70,6 @@ public class RoutingService extends AbstractLifecycleComponent<RoutingService> i
@Override
protected void doClose() {
FutureUtils.cancel(registeredNextDelayFuture);
clusterService.remove(this);
}
public AllocationService getAllocationService() {
@ -98,48 +83,6 @@ public class RoutingService extends AbstractLifecycleComponent<RoutingService> i
performReroute(reason);
}
@Override
public void clusterChanged(ClusterChangedEvent event) {
if (event.state().nodes().isLocalNodeElectedMaster()) {
// Figure out if an existing scheduled reroute is good enough or whether we need to cancel and reschedule.
// If the minimum of the currently relevant delay settings is larger than something we scheduled in the past,
// we are guaranteed that the planned schedule will happen before any of the current shard delays are expired.
long minDelaySetting = UnassignedInfo.findSmallestDelayedAllocationSettingNanos(settings, event.state());
if (minDelaySetting <= 0) {
logger.trace("no need to schedule reroute - no delayed unassigned shards, minDelaySetting [{}], scheduled [{}]", minDelaySetting, minDelaySettingAtLastSchedulingNanos);
minDelaySettingAtLastSchedulingNanos = Long.MAX_VALUE;
FutureUtils.cancel(registeredNextDelayFuture);
} else if (minDelaySetting < minDelaySettingAtLastSchedulingNanos) {
FutureUtils.cancel(registeredNextDelayFuture);
minDelaySettingAtLastSchedulingNanos = minDelaySetting;
TimeValue nextDelay = TimeValue.timeValueNanos(UnassignedInfo.findNextDelayedAllocationIn(event.state()));
assert nextDelay.nanos() > 0 : "next delay must be non 0 as minDelaySetting is [" + minDelaySetting + "]";
logger.info("delaying allocation for [{}] unassigned shards, next check in [{}]",
UnassignedInfo.getNumberOfDelayedUnassigned(event.state()), nextDelay);
registeredNextDelayFuture = threadPool.schedule(nextDelay, ThreadPool.Names.SAME, new AbstractRunnable() {
@Override
protected void doRun() throws Exception {
minDelaySettingAtLastSchedulingNanos = Long.MAX_VALUE;
reroute("assign delayed unassigned shards");
}
@Override
public void onFailure(Throwable t) {
logger.warn("failed to schedule/execute reroute post unassigned shard", t);
minDelaySettingAtLastSchedulingNanos = Long.MAX_VALUE;
}
});
} else {
logger.trace("no need to schedule reroute - current schedule reroute is enough. minDelaySetting [{}], scheduled [{}]", minDelaySetting, minDelaySettingAtLastSchedulingNanos);
}
}
}
// visible for testing
long getMinDelaySettingAtLastSchedulingNanos() {
return this.minDelaySettingAtLastSchedulingNanos;
}
// visible for testing
protected void performReroute(String reason) {
try {

View File

@ -316,6 +316,7 @@ public final class ShardRouting implements Writeable, ToXContent {
public ShardRouting updateUnassignedInfo(UnassignedInfo unassignedInfo) {
assert this.unassignedInfo != null : "can only update unassign info if they are already set";
assert this.unassignedInfo.isDelayed() || (unassignedInfo.isDelayed() == false) : "cannot transition from non-delayed to delayed";
return new ShardRouting(shardId, currentNodeId, relocatingNodeId, restoreSource, primary, state,
unassignedInfo, allocationId, expectedShardSize);
}

View File

@ -22,6 +22,7 @@ package org.elasticsearch.cluster.routing;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
@ -43,10 +44,9 @@ import java.io.IOException;
public final class UnassignedInfo implements ToXContent, Writeable {
public static final FormatDateTimeFormatter DATE_TIME_FORMATTER = Joda.forPattern("dateOptionalTime");
private static final TimeValue DEFAULT_DELAYED_NODE_LEFT_TIMEOUT = TimeValue.timeValueMinutes(1);
public static final Setting<TimeValue> INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING =
Setting.timeSetting("index.unassigned.node_left.delayed_timeout", DEFAULT_DELAYED_NODE_LEFT_TIMEOUT, Property.Dynamic,
Setting.timeSetting("index.unassigned.node_left.delayed_timeout", TimeValue.timeValueMinutes(1), Property.Dynamic,
Property.IndexScope);
/**
* Reason why the shard is in unassigned state.
@ -112,19 +112,19 @@ public final class UnassignedInfo implements ToXContent, Writeable {
private final Reason reason;
private final long unassignedTimeMillis; // used for display and log messages, in milliseconds
private final long unassignedTimeNanos; // in nanoseconds, used to calculate delay for delayed shard allocation
private final long lastComputedLeftDelayNanos; // how long to delay shard allocation, not serialized (always positive, 0 means no delay)
private final boolean delayed; // if allocation of this shard is delayed due to INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING
private final String message;
private final Throwable failure;
private final int failedAllocations;
/**
* creates an UnassingedInfo object based **current** time
* creates an UnassignedInfo object based on **current** time
*
* @param reason the cause for making this shard unassigned. See {@link Reason} for more information.
* @param message more information about cause.
**/
public UnassignedInfo(Reason reason, String message) {
this(reason, message, null, reason == Reason.ALLOCATION_FAILED ? 1 : 0, System.nanoTime(), System.currentTimeMillis());
this(reason, message, null, reason == Reason.ALLOCATION_FAILED ? 1 : 0, System.nanoTime(), System.currentTimeMillis(), false);
}
/**
@ -133,28 +133,21 @@ public final class UnassignedInfo implements ToXContent, Writeable {
* @param failure the shard level failure that caused this shard to be unassigned, if exists.
* @param unassignedTimeNanos the time to use as the base for any delayed re-assignment calculation
* @param unassignedTimeMillis the time of unassignment used to display to in our reporting.
* @param delayed if allocation of this shard is delayed due to INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.
*/
public UnassignedInfo(Reason reason, @Nullable String message, @Nullable Throwable failure, int failedAllocations, long unassignedTimeNanos, long unassignedTimeMillis) {
public UnassignedInfo(Reason reason, @Nullable String message, @Nullable Throwable failure, int failedAllocations,
long unassignedTimeNanos, long unassignedTimeMillis, boolean delayed) {
this.reason = reason;
this.unassignedTimeMillis = unassignedTimeMillis;
this.unassignedTimeNanos = unassignedTimeNanos;
this.lastComputedLeftDelayNanos = 0L;
this.delayed = delayed;
this.message = message;
this.failure = failure;
this.failedAllocations = failedAllocations;
assert (failedAllocations > 0) == (reason == Reason.ALLOCATION_FAILED):
assert (failedAllocations > 0) == (reason == Reason.ALLOCATION_FAILED) :
"failedAllocations: " + failedAllocations + " for reason " + reason;
assert !(message == null && failure != null) : "provide a message if a failure exception is provided";
}
public UnassignedInfo(UnassignedInfo unassignedInfo, long newComputedLeftDelayNanos) {
this.reason = unassignedInfo.reason;
this.unassignedTimeMillis = unassignedInfo.unassignedTimeMillis;
this.unassignedTimeNanos = unassignedInfo.unassignedTimeNanos;
this.lastComputedLeftDelayNanos = newComputedLeftDelayNanos;
this.message = unassignedInfo.message;
this.failure = unassignedInfo.failure;
this.failedAllocations = unassignedInfo.failedAllocations;
assert !(delayed && reason != Reason.NODE_LEFT) : "shard can only be delayed if it is unassigned due to a node leaving";
}
public UnassignedInfo(StreamInput in) throws IOException {
@ -163,7 +156,7 @@ public final class UnassignedInfo implements ToXContent, Writeable {
// As System.nanoTime() cannot be compared across different JVMs, reset it to now.
// This means that in master fail-over situations, elapsed delay time is forgotten.
this.unassignedTimeNanos = System.nanoTime();
this.lastComputedLeftDelayNanos = 0L;
this.delayed = in.readBoolean();
this.message = in.readOptionalString();
this.failure = in.readThrowable();
this.failedAllocations = in.readVInt();
@ -173,6 +166,7 @@ public final class UnassignedInfo implements ToXContent, Writeable {
out.writeByte((byte) reason.ordinal());
out.writeLong(unassignedTimeMillis);
// Do not serialize unassignedTimeNanos as System.nanoTime() cannot be compared across different JVMs
out.writeBoolean(delayed);
out.writeOptionalString(message);
out.writeThrowable(failure);
out.writeVInt(failedAllocations);
@ -185,7 +179,16 @@ public final class UnassignedInfo implements ToXContent, Writeable {
/**
* Returns the number of previously failed allocations of this shard.
*/
public int getNumFailedAllocations() { return failedAllocations; }
public int getNumFailedAllocations() {
return failedAllocations;
}
/**
* Returns true if allocation of this shard is delayed due to {@link #INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING}
*/
public boolean isDelayed() {
return delayed;
}
/**
* The reason why the shard is unassigned.
@ -239,50 +242,16 @@ public final class UnassignedInfo implements ToXContent, Writeable {
}
/**
* The allocation delay value in nano seconds associated with the index (defaulting to node settings if not set).
*/
public long getAllocationDelayTimeoutSettingNanos(Settings settings, Settings indexSettings) {
if (reason != Reason.NODE_LEFT) {
return 0;
}
TimeValue delayTimeout = INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.get(indexSettings, settings);
return Math.max(0L, delayTimeout.nanos());
}
/**
* The delay in nanoseconds until this unassigned shard can be reassigned. This value is cached and might be slightly out-of-date.
* See also the {@link #updateDelay(long, Settings, Settings)} method.
*/
public long getLastComputedLeftDelayNanos() {
return lastComputedLeftDelayNanos;
}
/**
* Calculates the delay left based on current time (in nanoseconds) and index/node settings.
* Calculates the delay left based on current time (in nanoseconds) and the delay defined by the index settings.
* Only relevant if shard is effectively delayed (see {@link #isDelayed()})
* Returns 0 if delay is negative
*
* @return calculated delay in nanoseconds
*/
public long getRemainingDelay(final long nanoTimeNow, final Settings settings, final Settings indexSettings) {
final long delayTimeoutNanos = getAllocationDelayTimeoutSettingNanos(settings, indexSettings);
if (delayTimeoutNanos == 0L) {
return 0L;
} else {
assert nanoTimeNow >= unassignedTimeNanos;
return Math.max(0L, delayTimeoutNanos - (nanoTimeNow - unassignedTimeNanos));
}
}
/**
* Creates new UnassignedInfo object if delay needs updating.
*
* @return new Unassigned with updated delay, or this if no change in delay
*/
public UnassignedInfo updateDelay(final long nanoTimeNow, final Settings settings, final Settings indexSettings) {
final long newComputedLeftDelayNanos = getRemainingDelay(nanoTimeNow, settings, indexSettings);
if (lastComputedLeftDelayNanos == newComputedLeftDelayNanos) {
return this;
}
return new UnassignedInfo(this, newComputedLeftDelayNanos);
public long getRemainingDelay(final long nanoTimeNow, final Settings indexSettings) {
long delayTimeoutNanos = INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.get(indexSettings).nanos();
assert nanoTimeNow >= unassignedTimeNanos;
return Math.max(0L, delayTimeoutNanos - (nanoTimeNow - unassignedTimeNanos));
}
/**
@ -291,49 +260,34 @@ public final class UnassignedInfo implements ToXContent, Writeable {
public static int getNumberOfDelayedUnassigned(ClusterState state) {
int count = 0;
for (ShardRouting shard : state.routingTable().shardsWithState(ShardRoutingState.UNASSIGNED)) {
if (shard.primary() == false) {
long delay = shard.unassignedInfo().getLastComputedLeftDelayNanos();
if (delay > 0) {
count++;
}
if (shard.unassignedInfo().isDelayed()) {
count++;
}
}
return count;
}
/**
* Finds the smallest delay expiration setting in nanos of all unassigned shards that are still delayed. Returns 0 if there are none.
* Finds the next (closest) delay expiration of an delayed shard in nanoseconds based on current time.
* Returns 0 if delay is negative.
* Returns -1 if no delayed shard is found.
*/
public static long findSmallestDelayedAllocationSettingNanos(Settings settings, ClusterState state) {
long minDelaySetting = Long.MAX_VALUE;
for (ShardRouting shard : state.routingTable().shardsWithState(ShardRoutingState.UNASSIGNED)) {
if (shard.primary() == false) {
IndexMetaData indexMetaData = state.metaData().index(shard.getIndexName());
boolean delayed = shard.unassignedInfo().getLastComputedLeftDelayNanos() > 0;
long delayTimeoutSetting = shard.unassignedInfo().getAllocationDelayTimeoutSettingNanos(settings, indexMetaData.getSettings());
if (delayed && delayTimeoutSetting > 0 && delayTimeoutSetting < minDelaySetting) {
minDelaySetting = delayTimeoutSetting;
public static long findNextDelayedAllocation(long currentNanoTime, ClusterState state) {
MetaData metaData = state.metaData();
RoutingTable routingTable = state.routingTable();
long nextDelayNanos = Long.MAX_VALUE;
for (ShardRouting shard : routingTable.shardsWithState(ShardRoutingState.UNASSIGNED)) {
UnassignedInfo unassignedInfo = shard.unassignedInfo();
if (unassignedInfo.isDelayed()) {
Settings indexSettings = metaData.index(shard.index()).getSettings();
// calculate next time to schedule
final long newComputedLeftDelayNanos = unassignedInfo.getRemainingDelay(currentNanoTime, indexSettings);
if (newComputedLeftDelayNanos < nextDelayNanos) {
nextDelayNanos = newComputedLeftDelayNanos;
}
}
}
return minDelaySetting == Long.MAX_VALUE ? 0L : minDelaySetting;
}
/**
* Finds the next (closest) delay expiration of an unassigned shard in nanoseconds. Returns 0 if there are none.
*/
public static long findNextDelayedAllocationIn(ClusterState state) {
long nextDelay = Long.MAX_VALUE;
for (ShardRouting shard : state.routingTable().shardsWithState(ShardRoutingState.UNASSIGNED)) {
if (shard.primary() == false) {
long nextShardDelay = shard.unassignedInfo().getLastComputedLeftDelayNanos();
if (nextShardDelay > 0 && nextShardDelay < nextDelay) {
nextDelay = nextShardDelay;
}
}
}
return nextDelay == Long.MAX_VALUE ? 0L : nextDelay;
return nextDelayNanos == Long.MAX_VALUE ? -1L : nextDelayNanos;
}
public String shortSummary() {
@ -343,6 +297,7 @@ public final class UnassignedInfo implements ToXContent, Writeable {
if (failedAllocations > 0) {
sb.append(", failed_attempts[").append(failedAllocations).append("]");
}
sb.append(", delayed=").append(delayed);
String details = getDetails();
if (details != null) {
@ -364,6 +319,7 @@ public final class UnassignedInfo implements ToXContent, Writeable {
if (failedAllocations > 0) {
builder.field("failed_attempts", failedAllocations);
}
builder.field("delayed", delayed);
String details = getDetails();
if (details != null) {
builder.field("details", details);
@ -386,6 +342,12 @@ public final class UnassignedInfo implements ToXContent, Writeable {
if (unassignedTimeMillis != that.unassignedTimeMillis) {
return false;
}
if (delayed != that.delayed) {
return false;
}
if (failedAllocations != that.failedAllocations) {
return false;
}
if (reason != that.reason) {
return false;
}
@ -393,12 +355,13 @@ public final class UnassignedInfo implements ToXContent, Writeable {
return false;
}
return !(failure != null ? !failure.equals(that.failure) : that.failure != null);
}
@Override
public int hashCode() {
int result = reason != null ? reason.hashCode() : 0;
result = 31 * result + Boolean.hashCode(delayed);
result = 31 * result + Integer.hashCode(failedAllocations);
result = 31 * result + Long.hashCode(unassignedTimeMillis);
result = 31 * result + (message != null ? message.hashCode() : 0);
result = 31 * result + (failure != null ? failure.hashCode() : 0);

View File

@ -53,6 +53,8 @@ import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import static org.elasticsearch.cluster.routing.UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING;
/**
* This service manages the node allocation of a cluster. For this reason the
@ -90,7 +92,7 @@ public class AllocationService extends AbstractComponent {
RoutingNodes routingNodes = getMutableRoutingNodes(clusterState);
// shuffle the unassigned nodes, just so we won't have things like poison failed shards
routingNodes.unassigned().shuffle();
StartedRerouteAllocation allocation = new StartedRerouteAllocation(allocationDeciders, routingNodes, clusterState, startedShards, clusterInfoService.getClusterInfo());
StartedRerouteAllocation allocation = new StartedRerouteAllocation(allocationDeciders, routingNodes, clusterState, startedShards, clusterInfoService.getClusterInfo(), currentNanoTime());
boolean changed = applyStartedShards(routingNodes, startedShards);
if (!changed) {
return new RoutingAllocation.Result(false, clusterState.routingTable(), clusterState.metaData());
@ -99,28 +101,27 @@ public class AllocationService extends AbstractComponent {
if (withReroute) {
reroute(allocation);
}
final RoutingAllocation.Result result = buildChangedResult(clusterState.metaData(), clusterState.routingTable(), routingNodes);
String startedShardsAsString = firstListElementsToCommaDelimitedString(startedShards, s -> s.shardId().toString());
logClusterHealthStateChange(
new ClusterStateHealth(clusterState),
new ClusterStateHealth(clusterState.metaData(), result.routingTable()),
"shards started [" + startedShardsAsString + "] ..."
);
return result;
return buildResultAndLogHealthChange(allocation, "shards started [" + startedShardsAsString + "] ...");
}
protected RoutingAllocation.Result buildResultAndLogHealthChange(RoutingAllocation allocation, String reason) {
return buildResultAndLogHealthChange(allocation, reason, new RoutingExplanations());
}
protected RoutingAllocation.Result buildChangedResult(MetaData oldMetaData, RoutingTable oldRoutingTable, RoutingNodes newRoutingNodes) {
return buildChangedResult(oldMetaData, oldRoutingTable, newRoutingNodes, new RoutingExplanations());
}
protected RoutingAllocation.Result buildChangedResult(MetaData oldMetaData, RoutingTable oldRoutingTable, RoutingNodes newRoutingNodes,
RoutingExplanations explanations) {
protected RoutingAllocation.Result buildResultAndLogHealthChange(RoutingAllocation allocation, String reason, RoutingExplanations explanations) {
MetaData oldMetaData = allocation.metaData();
RoutingTable oldRoutingTable = allocation.routingTable();
RoutingNodes newRoutingNodes = allocation.routingNodes();
final RoutingTable newRoutingTable = new RoutingTable.Builder().updateNodes(newRoutingNodes).build();
MetaData newMetaData = updateMetaDataWithRoutingTable(oldMetaData, oldRoutingTable, newRoutingTable);
assert newRoutingTable.validate(newMetaData); // validates the routing table is coherent with the cluster state metadata
logClusterHealthStateChange(
new ClusterStateHealth(allocation.metaData(), allocation.routingTable()),
new ClusterStateHealth(newMetaData, newRoutingTable),
reason
);
return new RoutingAllocation.Result(true, newRoutingTable, newMetaData, explanations);
}
@ -216,7 +217,8 @@ public class AllocationService extends AbstractComponent {
RoutingNodes routingNodes = getMutableRoutingNodes(clusterState);
// shuffle the unassigned nodes, just so we won't have things like poison failed shards
routingNodes.unassigned().shuffle();
FailedRerouteAllocation allocation = new FailedRerouteAllocation(allocationDeciders, routingNodes, clusterState, failedShards, clusterInfoService.getClusterInfo());
long currentNanoTime = currentNanoTime();
FailedRerouteAllocation allocation = new FailedRerouteAllocation(allocationDeciders, routingNodes, clusterState, failedShards, clusterInfoService.getClusterInfo(), currentNanoTime);
boolean changed = false;
// as failing primaries also fail associated replicas, we fail replicas first here so that their nodes are added to ignore list
List<FailedRerouteAllocation.FailedShard> orderedFailedShards = new ArrayList<>(failedShards);
@ -225,21 +227,38 @@ public class AllocationService extends AbstractComponent {
UnassignedInfo unassignedInfo = failedShard.shard.unassignedInfo();
final int failedAllocations = unassignedInfo != null ? unassignedInfo.getNumFailedAllocations() : 0;
changed |= applyFailedShard(allocation, failedShard.shard, true, new UnassignedInfo(UnassignedInfo.Reason.ALLOCATION_FAILED, failedShard.message, failedShard.failure,
failedAllocations + 1, System.nanoTime(), System.currentTimeMillis()));
failedAllocations + 1, currentNanoTime, System.currentTimeMillis(), false));
}
if (!changed) {
return new RoutingAllocation.Result(false, clusterState.routingTable(), clusterState.metaData());
}
gatewayAllocator.applyFailedShards(allocation);
reroute(allocation);
final RoutingAllocation.Result result = buildChangedResult(clusterState.metaData(), clusterState.routingTable(), routingNodes);
String failedShardsAsString = firstListElementsToCommaDelimitedString(failedShards, s -> s.shard.shardId().toString());
logClusterHealthStateChange(
new ClusterStateHealth(clusterState),
new ClusterStateHealth(clusterState.getMetaData(), result.routingTable()),
"shards failed [" + failedShardsAsString + "] ..."
);
return result;
return buildResultAndLogHealthChange(allocation, "shards failed [" + failedShardsAsString + "] ...");
}
/**
* Removes delay markers from unassigned shards based on current time stamp. Returns true if markers were removed.
*/
private boolean removeDelayMarkers(RoutingAllocation allocation) {
final RoutingNodes.UnassignedShards.UnassignedIterator unassignedIterator = allocation.routingNodes().unassigned().iterator();
final MetaData metaData = allocation.metaData();
boolean changed = false;
while (unassignedIterator.hasNext()) {
ShardRouting shardRouting = unassignedIterator.next();
UnassignedInfo unassignedInfo = shardRouting.unassignedInfo();
if (unassignedInfo.isDelayed()) {
final long newComputedLeftDelayNanos = unassignedInfo.getRemainingDelay(allocation.getCurrentNanoTime(),
metaData.getIndexSafe(shardRouting.index()).getSettings());
if (newComputedLeftDelayNanos == 0) {
changed = true;
unassignedIterator.updateUnassignedInfo(new UnassignedInfo(unassignedInfo.getReason(), unassignedInfo.getMessage(), unassignedInfo.getFailure(),
unassignedInfo.getNumFailedAllocations(), unassignedInfo.getUnassignedTimeInNanos(), unassignedInfo.getUnassignedTimeInMillis(), false));
}
}
}
return changed;
}
/**
@ -276,13 +295,7 @@ public class AllocationService extends AbstractComponent {
// the assumption is that commands will move / act on shards (or fail through exceptions)
// so, there will always be shard "movements", so no need to check on reroute
reroute(allocation);
RoutingAllocation.Result result = buildChangedResult(clusterState.metaData(), clusterState.routingTable(), routingNodes, explanations);
logClusterHealthStateChange(
new ClusterStateHealth(clusterState),
new ClusterStateHealth(clusterState.getMetaData(), result.routingTable()),
"reroute commands"
);
return result;
return buildResultAndLogHealthChange(allocation, "reroute commands", explanations);
}
@ -310,13 +323,7 @@ public class AllocationService extends AbstractComponent {
if (!reroute(allocation)) {
return new RoutingAllocation.Result(false, clusterState.routingTable(), clusterState.metaData());
}
RoutingAllocation.Result result = buildChangedResult(clusterState.metaData(), clusterState.routingTable(), routingNodes);
logClusterHealthStateChange(
new ClusterStateHealth(clusterState),
new ClusterStateHealth(clusterState.getMetaData(), result.routingTable()),
reason
);
return result;
return buildResultAndLogHealthChange(allocation, reason);
}
private void logClusterHealthStateChange(ClusterStateHealth previousStateHealth, ClusterStateHealth newStateHealth, String reason) {
@ -341,8 +348,7 @@ public class AllocationService extends AbstractComponent {
// now allocate all the unassigned to available nodes
if (allocation.routingNodes().unassigned().size() > 0) {
updateLeftDelayOfUnassignedShards(allocation, settings);
changed |= removeDelayMarkers(allocation);
changed |= gatewayAllocator.allocateUnassigned(allocation);
}
@ -351,22 +357,6 @@ public class AllocationService extends AbstractComponent {
return changed;
}
// public for testing
public static void updateLeftDelayOfUnassignedShards(RoutingAllocation allocation, Settings settings) {
final RoutingNodes.UnassignedShards.UnassignedIterator unassignedIterator = allocation.routingNodes().unassigned().iterator();
final MetaData metaData = allocation.metaData();
while (unassignedIterator.hasNext()) {
ShardRouting shardRouting = unassignedIterator.next();
final IndexMetaData indexMetaData = metaData.getIndexSafe(shardRouting.index());
UnassignedInfo previousUnassignedInfo = shardRouting.unassignedInfo();
UnassignedInfo updatedUnassignedInfo = previousUnassignedInfo.updateDelay(allocation.getCurrentNanoTime(), settings,
indexMetaData.getSettings());
if (updatedUnassignedInfo != previousUnassignedInfo) { // reference equality!
unassignedIterator.updateUnassignedInfo(updatedUnassignedInfo);
}
}
}
private boolean electPrimariesAndUnassignedDanglingReplicas(RoutingAllocation allocation) {
boolean changed = false;
final RoutingNodes routingNodes = allocation.routingNodes();
@ -436,8 +426,10 @@ public class AllocationService extends AbstractComponent {
changed = true;
// now, go over all the shards routing on the node, and fail them
for (ShardRouting shardRouting : node.copyShards()) {
UnassignedInfo unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.NODE_LEFT, "node_left[" + node.nodeId() + "]", null,
0, allocation.getCurrentNanoTime(), System.currentTimeMillis());
final IndexMetaData indexMetaData = allocation.metaData().getIndexSafe(shardRouting.index());
boolean delayed = INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.get(indexMetaData.getSettings()).nanos() > 0;
UnassignedInfo unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.NODE_LEFT, "node_left[" + node.nodeId() + "]",
null, 0, allocation.getCurrentNanoTime(), System.currentTimeMillis(), delayed);
applyFailedShard(allocation, shardRouting, false, unassignedInfo);
}
// its a dead node, remove it, note, its important to remove it *after* we apply failed shard
@ -458,7 +450,7 @@ public class AllocationService extends AbstractComponent {
for (ShardRouting routing : replicas) {
changed |= applyFailedShard(allocation, routing, false,
new UnassignedInfo(UnassignedInfo.Reason.PRIMARY_FAILED, "primary failed while replica initializing",
null, 0, allocation.getCurrentNanoTime(), System.currentTimeMillis()));
null, 0, allocation.getCurrentNanoTime(), System.currentTimeMillis(), false));
}
return changed;
}

View File

@ -57,8 +57,8 @@ public class FailedRerouteAllocation extends RoutingAllocation {
private final List<FailedShard> failedShards;
public FailedRerouteAllocation(AllocationDeciders deciders, RoutingNodes routingNodes, ClusterState clusterState, List<FailedShard> failedShards, ClusterInfo clusterInfo) {
super(deciders, routingNodes, clusterState, clusterInfo, System.nanoTime(), false);
public FailedRerouteAllocation(AllocationDeciders deciders, RoutingNodes routingNodes, ClusterState clusterState, List<FailedShard> failedShards, ClusterInfo clusterInfo, long currentNanoTime) {
super(deciders, routingNodes, clusterState, clusterInfo, currentNanoTime, false);
this.failedShards = failedShards;
}

View File

@ -56,7 +56,7 @@ public class RoutingAllocation {
private final MetaData metaData;
private RoutingExplanations explanations = new RoutingExplanations();
private final RoutingExplanations explanations;
/**
* Creates a new {@link RoutingAllocation.Result}
@ -65,9 +65,7 @@ public class RoutingAllocation {
* @param metaData the {@link MetaData} this Result references
*/
public Result(boolean changed, RoutingTable routingTable, MetaData metaData) {
this.changed = changed;
this.routingTable = routingTable;
this.metaData = metaData;
this(changed, routingTable, metaData, new RoutingExplanations());
}
/**

View File

@ -35,8 +35,8 @@ public class StartedRerouteAllocation extends RoutingAllocation {
private final List<? extends ShardRouting> startedShards;
public StartedRerouteAllocation(AllocationDeciders deciders, RoutingNodes routingNodes, ClusterState clusterState, List<? extends ShardRouting> startedShards, ClusterInfo clusterInfo) {
super(deciders, routingNodes, clusterState, clusterInfo, System.nanoTime(), false);
public StartedRerouteAllocation(AllocationDeciders deciders, RoutingNodes routingNodes, ClusterState clusterState, List<? extends ShardRouting> startedShards, ClusterInfo clusterInfo, long currentNanoTime) {
super(deciders, routingNodes, clusterState, clusterInfo, currentNanoTime, false);
this.startedShards = startedShards;
}

View File

@ -125,7 +125,7 @@ public class AllocateEmptyPrimaryAllocationCommand extends BasePrimaryAllocation
// we need to move the unassigned info back to treat it as if it was index creation
unassignedInfoToUpdate = new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED,
"force empty allocation from previous reason " + shardRouting.unassignedInfo().getReason() + ", " + shardRouting.unassignedInfo().getMessage(),
shardRouting.unassignedInfo().getFailure(), 0, System.nanoTime(), System.currentTimeMillis());
shardRouting.unassignedInfo().getFailure(), 0, System.nanoTime(), System.currentTimeMillis(), false);
}
initializeUnassignedShard(allocation, routingNodes, routingNode, shardRouting, unassignedInfoToUpdate);

View File

@ -455,7 +455,7 @@ public class ClusterService extends AbstractLifecycleComponent<ClusterService> {
}
/** asserts that the current thread is the cluster state update thread */
public boolean assertClusterStateThread() {
public static boolean assertClusterStateThread() {
assert Thread.currentThread().getName().contains(ClusterService.UPDATE_THREAD_NAME) :
"not called from the cluster state update thread";
return true;

View File

@ -89,7 +89,7 @@ public class NodeJoinController extends AbstractComponent {
assert accumulateJoins.get() : "waitToBeElectedAsMaster is called we are not accumulating joins";
final CountDownLatch done = new CountDownLatch(1);
final ElectionContext newContext = new ElectionContext(callback, requiredMasterJoins, clusterService) {
final ElectionContext newContext = new ElectionContext(callback, requiredMasterJoins) {
@Override
void onClose() {
if (electionContext.compareAndSet(this, null)) {
@ -307,16 +307,14 @@ public class NodeJoinController extends AbstractComponent {
static abstract class ElectionContext implements ElectionCallback {
private final ElectionCallback callback;
private final int requiredMasterJoins;
private final ClusterService clusterService;
/** set to true after enough joins have been seen and a cluster update task is submitted to become master */
final AtomicBoolean pendingSetAsMasterTask = new AtomicBoolean();
final AtomicBoolean closed = new AtomicBoolean();
ElectionContext(ElectionCallback callback, int requiredMasterJoins, ClusterService clusterService) {
ElectionContext(ElectionCallback callback, int requiredMasterJoins) {
this.callback = callback;
this.requiredMasterJoins = requiredMasterJoins;
this.clusterService = clusterService;
}
abstract void onClose();
@ -324,7 +322,7 @@ public class NodeJoinController extends AbstractComponent {
@Override
public void onElectedAsMaster(ClusterState state) {
assert pendingSetAsMasterTask.get() : "onElectedAsMaster called but pendingSetAsMasterTask is not set";
assertClusterStateThread();
ClusterService.assertClusterStateThread();
assert state.nodes().isLocalNodeElectedMaster() : "onElectedAsMaster called but local node is not master";
if (closed.compareAndSet(false, true)) {
try {
@ -337,7 +335,7 @@ public class NodeJoinController extends AbstractComponent {
@Override
public void onFailure(Throwable t) {
assertClusterStateThread();
ClusterService.assertClusterStateThread();
if (closed.compareAndSet(false, true)) {
try {
onClose();
@ -346,10 +344,6 @@ public class NodeJoinController extends AbstractComponent {
}
}
}
private void assertClusterStateThread() {
assert clusterService instanceof ClusterService == false || ((ClusterService) clusterService).assertClusterStateThread();
}
}

View File

@ -1145,14 +1145,14 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
/** cleans any running joining thread and calls {@link #rejoin} */
public ClusterState stopRunningThreadAndRejoin(ClusterState clusterState, String reason) {
assertClusterStateThread();
ClusterService.assertClusterStateThread();
currentJoinThread.set(null);
return rejoin(clusterState, reason);
}
/** starts a new joining thread if there is no currently active one and join thread controlling is started */
public void startNewThreadIfNotRunning() {
assertClusterStateThread();
ClusterService.assertClusterStateThread();
if (joinThreadActive()) {
return;
}
@ -1185,7 +1185,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
* If the given thread is not the currently running join thread, the command is ignored.
*/
public void markThreadAsDoneAndStartNew(Thread joinThread) {
assertClusterStateThread();
ClusterService.assertClusterStateThread();
if (!markThreadAsDone(joinThread)) {
return;
}
@ -1194,7 +1194,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
/** marks the given joinThread as completed. Returns false if the supplied thread is not the currently active join thread */
public boolean markThreadAsDone(Thread joinThread) {
assertClusterStateThread();
ClusterService.assertClusterStateThread();
return currentJoinThread.compareAndSet(joinThread, null);
}
@ -1210,9 +1210,5 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
running.set(true);
}
private void assertClusterStateThread() {
assert clusterService instanceof ClusterService == false || ((ClusterService) clusterService).assertClusterStateThread();
}
}
}

View File

@ -108,7 +108,7 @@ public abstract class ReplicaShardAllocator extends AbstractComponent {
currentNode, nodeWithHighestMatch);
it.moveToUnassigned(new UnassignedInfo(UnassignedInfo.Reason.REALLOCATED_REPLICA,
"existing allocation of replica to [" + currentNode + "] cancelled, sync id match found on node [" + nodeWithHighestMatch + "]",
null, 0, allocation.getCurrentNanoTime(), System.currentTimeMillis()));
null, 0, allocation.getCurrentNanoTime(), System.currentTimeMillis(), false));
changed = true;
}
}
@ -179,7 +179,7 @@ public abstract class ReplicaShardAllocator extends AbstractComponent {
}
} else if (matchingNodes.hasAnyData() == false) {
// if we didn't manage to find *any* data (regardless of matching sizes), check if the allocation of the replica shard needs to be delayed
changed |= ignoreUnassignedIfDelayed(unassignedIterator, shard);
ignoreUnassignedIfDelayed(unassignedIterator, shard);
}
}
return changed;
@ -195,21 +195,16 @@ public abstract class ReplicaShardAllocator extends AbstractComponent {
*
* @param unassignedIterator iterator over unassigned shards
* @param shard the shard which might be delayed
* @return true iff allocation is delayed for this shard
*/
public boolean ignoreUnassignedIfDelayed(RoutingNodes.UnassignedShards.UnassignedIterator unassignedIterator, ShardRouting shard) {
// calculate delay and store it in UnassignedInfo to be used by RoutingService
long delay = shard.unassignedInfo().getLastComputedLeftDelayNanos();
if (delay > 0) {
logger.debug("[{}][{}]: delaying allocation of [{}] for [{}]", shard.index(), shard.id(), shard, TimeValue.timeValueNanos(delay));
public void ignoreUnassignedIfDelayed(RoutingNodes.UnassignedShards.UnassignedIterator unassignedIterator, ShardRouting shard) {
if (shard.unassignedInfo().isDelayed()) {
logger.debug("{}: allocation of [{}] is delayed", shard.shardId(), shard);
/**
* mark it as changed, since we want to kick a publishing to schedule future allocation,
* see {@link org.elasticsearch.cluster.routing.RoutingService#clusterChanged(ClusterChangedEvent)}).
*/
unassignedIterator.removeAndIgnore();
return true;
}
return false;
}
/**

View File

@ -21,19 +21,18 @@ package org.elasticsearch.action.admin.cluster.allocation;
import org.apache.lucene.index.CorruptIndexException;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version;
import org.elasticsearch.action.admin.indices.shards.IndicesShardStoresResponse;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingHelper;
import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.cluster.routing.allocation.decider.Decision;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.DummyTransportAddress;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
@ -43,10 +42,8 @@ import org.elasticsearch.test.ESTestCase;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@ -61,13 +58,11 @@ public final class ClusterAllocationExplanationTests extends ESTestCase {
private Index i = new Index("foo", "uuid");
private ShardRouting primaryShard = ShardRouting.newUnassigned(new ShardId(i, 0), null, true,
new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "foo"));
private ShardRouting replicaShard = ShardRouting.newUnassigned(new ShardId(i, 0), null, false,
new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "foo"));
private IndexMetaData indexMetaData = IndexMetaData.builder("foo")
.settings(Settings.builder()
.put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
.put(IndexMetaData.SETTING_INDEX_UUID, "uuid"))
.putActiveAllocationIds(0, new HashSet<String>(Arrays.asList("aid1", "aid2")))
.putActiveAllocationIds(0, Sets.newHashSet("aid1", "aid2"))
.numberOfShards(1)
.numberOfReplicas(1)
.build();
@ -80,7 +75,6 @@ public final class ClusterAllocationExplanationTests extends ESTestCase {
noDecision.add(Decision.single(Decision.Type.NO, "no label", "no thanks"));
}
private void assertExplanations(NodeExplanation ne, String finalExplanation, ClusterAllocationExplanation.FinalDecision finalDecision,
ClusterAllocationExplanation.StoreCopy storeCopy) {
assertEquals(finalExplanation, ne.getFinalExplanation());
@ -195,6 +189,7 @@ public final class ClusterAllocationExplanationTests extends ESTestCase {
public void testExplanationSerialization() throws Exception {
ShardId shard = new ShardId("test", "uuid", 0);
long allocationDelay = randomIntBetween(0, 500);
long remainingDelay = randomIntBetween(0, 500);
Map<DiscoveryNode, NodeExplanation> nodeExplanations = new HashMap<>(1);
Float nodeWeight = randomFloat();
@ -207,7 +202,7 @@ public final class ClusterAllocationExplanationTests extends ESTestCase {
yesDecision, nodeWeight, storeStatus, "", activeAllocationIds, false);
nodeExplanations.put(ne.getNode(), ne);
ClusterAllocationExplanation cae = new ClusterAllocationExplanation(shard, true,
"assignedNode", remainingDelay, null, false, nodeExplanations);
"assignedNode", allocationDelay, remainingDelay, null, false, nodeExplanations);
BytesStreamOutput out = new BytesStreamOutput();
cae.writeTo(out);
StreamInput in = StreamInput.wrap(out.bytes());
@ -217,6 +212,7 @@ public final class ClusterAllocationExplanationTests extends ESTestCase {
assertTrue(cae2.isAssigned());
assertEquals("assignedNode", cae2.getAssignedNodeId());
assertNull(cae2.getUnassignedInfo());
assertEquals(allocationDelay, cae2.getAllocationDelayMillis());
assertEquals(remainingDelay, cae2.getRemainingDelayMillis());
for (Map.Entry<DiscoveryNode, NodeExplanation> entry : cae2.getNodeExplanations().entrySet()) {
DiscoveryNode node = entry.getKey();
@ -230,7 +226,6 @@ public final class ClusterAllocationExplanationTests extends ESTestCase {
public void testExplanationToXContent() throws Exception {
ShardId shardId = new ShardId("foo", "uuid", 0);
long remainingDelay = 42;
Decision.Multi d = new Decision.Multi();
d.add(Decision.single(Decision.Type.NO, "no label", "because I said no"));
d.add(Decision.single(Decision.Type.YES, "yes label", "yes please"));
@ -245,7 +240,7 @@ public final class ClusterAllocationExplanationTests extends ESTestCase {
Map<DiscoveryNode, NodeExplanation> nodeExplanations = new HashMap<>(1);
nodeExplanations.put(ne.getNode(), ne);
ClusterAllocationExplanation cae = new ClusterAllocationExplanation(shardId, true,
"assignedNode", remainingDelay, null, false, nodeExplanations);
"assignedNode", 42, 42, null, false, nodeExplanations);
XContentBuilder builder = XContentFactory.jsonBuilder();
cae.toXContent(builder, ToXContent.EMPTY_PARAMS);
assertEquals("{\"shard\":{\"index\":\"foo\",\"index_uuid\":\"uuid\",\"id\":0,\"primary\":true},\"assigned\":true," +

View File

@ -108,8 +108,6 @@ public class DelayedAllocationIT extends ESIntegTestCase {
* allocation to a very small value, it kicks the allocation of the unassigned shard
* even though the node it was hosted on will not come back.
*/
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/18293")
@TestLogging("_root:DEBUG,cluster.routing:TRACE")
public void testDelayedAllocationChangeWithSettingTo100ms() throws Exception {
internalCluster().startNodesAsync(3).get();
prepareCreate("test").setSettings(Settings.builder()
@ -145,12 +143,7 @@ public class DelayedAllocationIT extends ESIntegTestCase {
ensureGreen("test");
indexRandomData();
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(findNodeWithShard()));
assertBusy(new Runnable() {
@Override
public void run() {
assertThat(client().admin().cluster().prepareState().all().get().getState().getRoutingNodes().unassigned().size() > 0, equalTo(true));
}
});
assertBusy(() -> assertThat(client().admin().cluster().prepareState().all().get().getState().getRoutingNodes().unassigned().size() > 0, equalTo(true)));
assertThat(client().admin().cluster().prepareHealth().get().getDelayedUnassignedShards(), equalTo(1));
assertAcked(client().admin().indices().prepareUpdateSettings("test").setSettings(Settings.builder().put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), TimeValue.timeValueMillis(0))).get());
ensureGreen("test");

View File

@ -0,0 +1,510 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster.routing;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterInfoService;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.routing.allocation.allocator.ShardsAllocator;
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.gateway.GatewayAllocator;
import org.elasticsearch.test.ESAllocationTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.junit.After;
import org.junit.Before;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import static java.util.Collections.singleton;
import static org.elasticsearch.cluster.routing.DelayedAllocationService.CLUSTER_UPDATE_TASK_SOURCE;
import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING;
import static org.elasticsearch.cluster.routing.ShardRoutingState.STARTED;
import static org.elasticsearch.common.unit.TimeValue.timeValueMillis;
import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds;
import static org.hamcrest.Matchers.equalTo;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
/**
*/
public class DelayedAllocationServiceTests extends ESAllocationTestCase {
private TestDelayAllocationService delayedAllocationService;
private MockAllocationService allocationService;
private ClusterService clusterService;
private ThreadPool threadPool;
@Before
public void createDelayedAllocationService() {
threadPool = new ThreadPool(getTestName());
clusterService = mock(ClusterService.class);
allocationService = createAllocationService(Settings.EMPTY, new DelayedShardsMockGatewayAllocator());
delayedAllocationService = new TestDelayAllocationService(Settings.EMPTY, threadPool, clusterService, allocationService);
verify(clusterService).addFirst(delayedAllocationService);
}
@After
public void shutdownThreadPool() throws Exception {
terminate(threadPool);
}
public void testNoDelayedUnassigned() throws Exception {
MetaData metaData = MetaData.builder()
.put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)
.put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), "0"))
.numberOfShards(1).numberOfReplicas(1))
.build();
ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT)
.metaData(metaData)
.routingTable(RoutingTable.builder().addAsNew(metaData.index("test")).build()).build();
clusterState = ClusterState.builder(clusterState)
.nodes(DiscoveryNodes.builder().put(newNode("node1")).put(newNode("node2")).localNodeId("node1").masterNodeId("node1"))
.build();
clusterState = ClusterState.builder(clusterState).routingResult(allocationService.reroute(clusterState, "reroute")).build();
// starting primaries
clusterState = ClusterState.builder(clusterState)
.routingResult(allocationService.applyStartedShards(clusterState, clusterState.getRoutingNodes().shardsWithState(INITIALIZING)))
.build();
// starting replicas
clusterState = ClusterState.builder(clusterState)
.routingResult(allocationService.applyStartedShards(clusterState, clusterState.getRoutingNodes().shardsWithState(INITIALIZING)))
.build();
assertThat(clusterState.getRoutingNodes().unassigned().size() > 0, equalTo(false));
ClusterState prevState = clusterState;
// remove node2 and reroute
DiscoveryNodes.Builder nodes = DiscoveryNodes.builder(clusterState.nodes()).remove("node2");
boolean nodeAvailableForAllocation = randomBoolean();
if (nodeAvailableForAllocation) {
nodes.put(newNode("node3"));
}
clusterState = ClusterState.builder(clusterState).nodes(nodes).build();
clusterState = ClusterState.builder(clusterState).routingResult(allocationService.reroute(clusterState, "reroute")).build();
ClusterState newState = clusterState;
List<ShardRouting> unassignedShards = newState.getRoutingTable().shardsWithState(ShardRoutingState.UNASSIGNED);
if (nodeAvailableForAllocation) {
assertThat(unassignedShards.size(), equalTo(0));
} else {
assertThat(unassignedShards.size(), equalTo(1));
assertThat(unassignedShards.get(0).unassignedInfo().isDelayed(), equalTo(false));
}
delayedAllocationService.clusterChanged(new ClusterChangedEvent("test", newState, prevState));
verifyNoMoreInteractions(clusterService);
assertNull(delayedAllocationService.delayedRerouteTask.get());
}
public void testDelayedUnassignedScheduleReroute() throws Exception {
TimeValue delaySetting = timeValueMillis(100);
MetaData metaData = MetaData.builder()
.put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)
.put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), delaySetting))
.numberOfShards(1).numberOfReplicas(1))
.build();
ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT)
.metaData(metaData)
.routingTable(RoutingTable.builder().addAsNew(metaData.index("test")).build()).build();
clusterState = ClusterState.builder(clusterState)
.nodes(DiscoveryNodes.builder().put(newNode("node1")).put(newNode("node2")).localNodeId("node1").masterNodeId("node1"))
.build();
final long baseTimestampNanos = System.nanoTime();
allocationService.setNanoTimeOverride(baseTimestampNanos);
clusterState = ClusterState.builder(clusterState).routingResult(allocationService.reroute(clusterState, "reroute")).build();
// starting primaries
clusterState = ClusterState.builder(clusterState)
.routingResult(allocationService.applyStartedShards(clusterState, clusterState.getRoutingNodes().shardsWithState(INITIALIZING)))
.build();
// starting replicas
clusterState = ClusterState.builder(clusterState)
.routingResult(allocationService.applyStartedShards(clusterState, clusterState.getRoutingNodes().shardsWithState(INITIALIZING)))
.build();
assertFalse("no shards should be unassigned", clusterState.getRoutingNodes().unassigned().size() > 0);
String nodeId = null;
final List<ShardRouting> allShards = clusterState.getRoutingNodes().routingTable().allShards("test");
// we need to find the node with the replica otherwise we will not reroute
for (ShardRouting shardRouting : allShards) {
if (shardRouting.primary() == false) {
nodeId = shardRouting.currentNodeId();
break;
}
}
assertNotNull(nodeId);
// remove node that has replica and reroute
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder(clusterState.nodes()).remove(nodeId)).build();
clusterState = ClusterState.builder(clusterState).routingResult(allocationService.reroute(clusterState, "reroute")).build();
ClusterState stateWithDelayedShard = clusterState;
// make sure the replica is marked as delayed (i.e. not reallocated)
assertEquals(1, UnassignedInfo.getNumberOfDelayedUnassigned(stateWithDelayedShard));
ShardRouting delayedShard = stateWithDelayedShard.getRoutingNodes().unassigned().iterator().next();
assertEquals(baseTimestampNanos, delayedShard.unassignedInfo().getUnassignedTimeInNanos());
// mock ClusterService.submitStateUpdateTask() method
CountDownLatch latch = new CountDownLatch(1);
AtomicReference<ClusterStateUpdateTask> clusterStateUpdateTask = new AtomicReference<>();
doAnswer(invocationOnMock -> {
clusterStateUpdateTask.set((ClusterStateUpdateTask)invocationOnMock.getArguments()[1]);
latch.countDown();
return null;
}).when(clusterService).submitStateUpdateTask(eq(CLUSTER_UPDATE_TASK_SOURCE), any(ClusterStateUpdateTask.class));
assertNull(delayedAllocationService.delayedRerouteTask.get());
long delayUntilClusterChangeEvent = TimeValue.timeValueNanos(randomInt((int)delaySetting.nanos() - 1)).nanos();
long clusterChangeEventTimestampNanos = baseTimestampNanos + delayUntilClusterChangeEvent;
delayedAllocationService.setNanoTimeOverride(clusterChangeEventTimestampNanos);
delayedAllocationService.clusterChanged(new ClusterChangedEvent("fake node left", stateWithDelayedShard, clusterState));
// check that delayed reroute task was created and registered with the proper settings
DelayedAllocationService.DelayedRerouteTask delayedRerouteTask = delayedAllocationService.delayedRerouteTask.get();
assertNotNull(delayedRerouteTask);
assertFalse(delayedRerouteTask.cancelScheduling.get());
assertThat(delayedRerouteTask.baseTimestampNanos, equalTo(clusterChangeEventTimestampNanos));
assertThat(delayedRerouteTask.nextDelay.nanos(),
equalTo(delaySetting.nanos() - (clusterChangeEventTimestampNanos - baseTimestampNanos)));
// check that submitStateUpdateTask() was invoked on the cluster service mock
assertTrue(latch.await(30, TimeUnit.SECONDS));
verify(clusterService).submitStateUpdateTask(eq(CLUSTER_UPDATE_TASK_SOURCE), eq(clusterStateUpdateTask.get()));
// advance the time on the allocation service to a timestamp that happened after the delayed scheduling
long nanoTimeForReroute = clusterChangeEventTimestampNanos + delaySetting.nanos() + timeValueMillis(randomInt(200)).nanos();
allocationService.setNanoTimeOverride(nanoTimeForReroute);
// apply cluster state
ClusterState stateWithRemovedDelay = clusterStateUpdateTask.get().execute(stateWithDelayedShard);
// check that shard is not delayed anymore
assertEquals(0, UnassignedInfo.getNumberOfDelayedUnassigned(stateWithRemovedDelay));
// check that task is now removed
assertNull(delayedAllocationService.delayedRerouteTask.get());
// simulate calling listener (cluster change event)
delayedAllocationService.setNanoTimeOverride(nanoTimeForReroute + timeValueMillis(randomInt(200)).nanos());
delayedAllocationService.clusterChanged(
new ClusterChangedEvent(CLUSTER_UPDATE_TASK_SOURCE, stateWithRemovedDelay, stateWithDelayedShard));
// check that no new task is scheduled
assertNull(delayedAllocationService.delayedRerouteTask.get());
// check that no further cluster state update was submitted
verifyNoMoreInteractions(clusterService);
}
/**
* This tests that a new delayed reroute is scheduled right after a delayed reroute was run
*/
public void testDelayedUnassignedScheduleRerouteAfterDelayedReroute() throws Exception {
TimeValue shortDelaySetting = timeValueMillis(100);
TimeValue longDelaySetting = TimeValue.timeValueSeconds(1);
MetaData metaData = MetaData.builder()
.put(IndexMetaData.builder("short_delay")
.settings(settings(Version.CURRENT).put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), shortDelaySetting))
.numberOfShards(1).numberOfReplicas(1))
.put(IndexMetaData.builder("long_delay")
.settings(settings(Version.CURRENT).put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), longDelaySetting))
.numberOfShards(1).numberOfReplicas(1))
.build();
ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT).metaData(metaData)
.routingTable(RoutingTable.builder().addAsNew(metaData.index("short_delay")).addAsNew(metaData.index("long_delay")).build())
.nodes(DiscoveryNodes.builder()
.put(newNode("node0", singleton(DiscoveryNode.Role.MASTER))).localNodeId("node0").masterNodeId("node0")
.put(newNode("node1")).put(newNode("node2")).put(newNode("node3")).put(newNode("node4"))).build();
// allocate shards
clusterState = ClusterState.builder(clusterState).routingResult(allocationService.reroute(clusterState, "reroute")).build();
// start primaries
clusterState = ClusterState.builder(clusterState)
.routingResult(allocationService.applyStartedShards(clusterState, clusterState.getRoutingNodes().shardsWithState(INITIALIZING)))
.build();
// start replicas
clusterState = ClusterState.builder(clusterState)
.routingResult(allocationService.applyStartedShards(clusterState, clusterState.getRoutingNodes().shardsWithState(INITIALIZING)))
.build();
assertThat("all shards should be started", clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(4));
// find replica of short_delay
ShardRouting shortDelayReplica = null;
for (ShardRouting shardRouting : clusterState.getRoutingNodes().routingTable().allShards("short_delay")) {
if (shardRouting.primary() == false) {
shortDelayReplica = shardRouting;
break;
}
}
assertNotNull(shortDelayReplica);
// find replica of long_delay
ShardRouting longDelayReplica = null;
for (ShardRouting shardRouting : clusterState.getRoutingNodes().routingTable().allShards("long_delay")) {
if (shardRouting.primary() == false) {
longDelayReplica = shardRouting;
break;
}
}
assertNotNull(longDelayReplica);
final long baseTimestampNanos = System.nanoTime();
// remove node of shortDelayReplica and node of longDelayReplica and reroute
ClusterState clusterStateBeforeNodeLeft = clusterState;
clusterState = ClusterState.builder(clusterState)
.nodes(DiscoveryNodes.builder(clusterState.nodes())
.remove(shortDelayReplica.currentNodeId())
.remove(longDelayReplica.currentNodeId()))
.build();
// make sure both replicas are marked as delayed (i.e. not reallocated)
allocationService.setNanoTimeOverride(baseTimestampNanos);
clusterState = ClusterState.builder(clusterState).routingResult(allocationService.reroute(clusterState, "reroute")).build();
final ClusterState stateWithDelayedShards = clusterState;
assertEquals(2, UnassignedInfo.getNumberOfDelayedUnassigned(stateWithDelayedShards));
RoutingNodes.UnassignedShards.UnassignedIterator iter = stateWithDelayedShards.getRoutingNodes().unassigned().iterator();
assertEquals(baseTimestampNanos, iter.next().unassignedInfo().getUnassignedTimeInNanos());
assertEquals(baseTimestampNanos, iter.next().unassignedInfo().getUnassignedTimeInNanos());
// mock ClusterService.submitStateUpdateTask() method
CountDownLatch latch1 = new CountDownLatch(1);
AtomicReference<ClusterStateUpdateTask> clusterStateUpdateTask1 = new AtomicReference<>();
doAnswer(invocationOnMock -> {
clusterStateUpdateTask1.set((ClusterStateUpdateTask)invocationOnMock.getArguments()[1]);
latch1.countDown();
return null;
}).when(clusterService).submitStateUpdateTask(eq(CLUSTER_UPDATE_TASK_SOURCE), any(ClusterStateUpdateTask.class));
assertNull(delayedAllocationService.delayedRerouteTask.get());
long delayUntilClusterChangeEvent = TimeValue.timeValueNanos(randomInt((int)shortDelaySetting.nanos() - 1)).nanos();
long clusterChangeEventTimestampNanos = baseTimestampNanos + delayUntilClusterChangeEvent;
delayedAllocationService.setNanoTimeOverride(clusterChangeEventTimestampNanos);
delayedAllocationService.clusterChanged(
new ClusterChangedEvent("fake node left", stateWithDelayedShards, clusterStateBeforeNodeLeft));
// check that delayed reroute task was created and registered with the proper settings
DelayedAllocationService.DelayedRerouteTask firstDelayedRerouteTask = delayedAllocationService.delayedRerouteTask.get();
assertNotNull(firstDelayedRerouteTask);
assertFalse(firstDelayedRerouteTask.cancelScheduling.get());
assertThat(firstDelayedRerouteTask.baseTimestampNanos, equalTo(clusterChangeEventTimestampNanos));
assertThat(firstDelayedRerouteTask.nextDelay.nanos(),
equalTo(UnassignedInfo.findNextDelayedAllocation(clusterChangeEventTimestampNanos, stateWithDelayedShards)));
assertThat(firstDelayedRerouteTask.nextDelay.nanos(),
equalTo(shortDelaySetting.nanos() - (clusterChangeEventTimestampNanos - baseTimestampNanos)));
// check that submitStateUpdateTask() was invoked on the cluster service mock
assertTrue(latch1.await(30, TimeUnit.SECONDS));
verify(clusterService).submitStateUpdateTask(eq(CLUSTER_UPDATE_TASK_SOURCE), eq(clusterStateUpdateTask1.get()));
// advance the time on the allocation service to a timestamp that happened after the delayed scheduling
long nanoTimeForReroute = clusterChangeEventTimestampNanos + shortDelaySetting.nanos() + timeValueMillis(randomInt(50)).nanos();
allocationService.setNanoTimeOverride(nanoTimeForReroute);
// apply cluster state
ClusterState stateWithOnlyOneDelayedShard = clusterStateUpdateTask1.get().execute(stateWithDelayedShards);
// check that shard is not delayed anymore
assertEquals(1, UnassignedInfo.getNumberOfDelayedUnassigned(stateWithOnlyOneDelayedShard));
// check that task is now removed
assertNull(delayedAllocationService.delayedRerouteTask.get());
// mock ClusterService.submitStateUpdateTask() method again
CountDownLatch latch2 = new CountDownLatch(1);
AtomicReference<ClusterStateUpdateTask> clusterStateUpdateTask2 = new AtomicReference<>();
doAnswer(invocationOnMock -> {
clusterStateUpdateTask2.set((ClusterStateUpdateTask)invocationOnMock.getArguments()[1]);
latch2.countDown();
return null;
}).when(clusterService).submitStateUpdateTask(eq(CLUSTER_UPDATE_TASK_SOURCE), any(ClusterStateUpdateTask.class));
// simulate calling listener (cluster change event)
delayUntilClusterChangeEvent = timeValueMillis(randomInt(50)).nanos();
clusterChangeEventTimestampNanos = nanoTimeForReroute + delayUntilClusterChangeEvent;
delayedAllocationService.setNanoTimeOverride(clusterChangeEventTimestampNanos);
delayedAllocationService.clusterChanged(
new ClusterChangedEvent(CLUSTER_UPDATE_TASK_SOURCE, stateWithOnlyOneDelayedShard, stateWithDelayedShards));
// check that new delayed reroute task was created and registered with the proper settings
DelayedAllocationService.DelayedRerouteTask secondDelayedRerouteTask = delayedAllocationService.delayedRerouteTask.get();
assertNotNull(secondDelayedRerouteTask);
assertFalse(secondDelayedRerouteTask.cancelScheduling.get());
assertThat(secondDelayedRerouteTask.baseTimestampNanos, equalTo(clusterChangeEventTimestampNanos));
assertThat(secondDelayedRerouteTask.nextDelay.nanos(),
equalTo(UnassignedInfo.findNextDelayedAllocation(clusterChangeEventTimestampNanos, stateWithOnlyOneDelayedShard)));
assertThat(secondDelayedRerouteTask.nextDelay.nanos(),
equalTo(longDelaySetting.nanos() - (clusterChangeEventTimestampNanos - baseTimestampNanos)));
// check that submitStateUpdateTask() was invoked on the cluster service mock
assertTrue(latch2.await(30, TimeUnit.SECONDS));
verify(clusterService).submitStateUpdateTask(eq(CLUSTER_UPDATE_TASK_SOURCE), eq(clusterStateUpdateTask2.get()));
// advance the time on the allocation service to a timestamp that happened after the delayed scheduling
nanoTimeForReroute = clusterChangeEventTimestampNanos + longDelaySetting.nanos() + timeValueMillis(randomInt(50)).nanos();
allocationService.setNanoTimeOverride(nanoTimeForReroute);
// apply cluster state
ClusterState stateWithNoDelayedShards = clusterStateUpdateTask2.get().execute(stateWithOnlyOneDelayedShard);
// check that shard is not delayed anymore
assertEquals(0, UnassignedInfo.getNumberOfDelayedUnassigned(stateWithNoDelayedShards));
// check that task is now removed
assertNull(delayedAllocationService.delayedRerouteTask.get());
// simulate calling listener (cluster change event)
delayedAllocationService.setNanoTimeOverride(nanoTimeForReroute + timeValueMillis(randomInt(50)).nanos());
delayedAllocationService.clusterChanged(
new ClusterChangedEvent(CLUSTER_UPDATE_TASK_SOURCE, stateWithNoDelayedShards, stateWithOnlyOneDelayedShard));
// check that no new task is scheduled
assertNull(delayedAllocationService.delayedRerouteTask.get());
// check that no further cluster state update was submitted
verifyNoMoreInteractions(clusterService);
}
public void testDelayedUnassignedScheduleRerouteRescheduledOnShorterDelay() throws Exception {
TimeValue delaySetting = timeValueSeconds(30);
TimeValue shorterDelaySetting = timeValueMillis(100);
MetaData metaData = MetaData.builder()
.put(IndexMetaData.builder("foo").settings(settings(Version.CURRENT)
.put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), delaySetting))
.numberOfShards(1).numberOfReplicas(1))
.put(IndexMetaData.builder("bar").settings(settings(Version.CURRENT)
.put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), shorterDelaySetting))
.numberOfShards(1).numberOfReplicas(1))
.build();
ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT)
.metaData(metaData)
.routingTable(RoutingTable.builder()
.addAsNew(metaData.index("foo"))
.addAsNew(metaData.index("bar"))
.build()).build();
clusterState = ClusterState.builder(clusterState)
.nodes(DiscoveryNodes.builder()
.put(newNode("node1")).put(newNode("node2")).put(newNode("node3")).put(newNode("node4"))
.localNodeId("node1").masterNodeId("node1"))
.build();
final long nodeLeftTimestampNanos = System.nanoTime();
allocationService.setNanoTimeOverride(nodeLeftTimestampNanos);
clusterState = ClusterState.builder(clusterState).routingResult(allocationService.reroute(clusterState, "reroute")).build();
// starting primaries
clusterState = ClusterState.builder(clusterState)
.routingResult(allocationService.applyStartedShards(clusterState, clusterState.getRoutingNodes().shardsWithState(INITIALIZING)))
.build();
// starting replicas
clusterState = ClusterState.builder(clusterState)
.routingResult(allocationService.applyStartedShards(clusterState, clusterState.getRoutingNodes().shardsWithState(INITIALIZING)))
.build();
assertFalse("no shards should be unassigned", clusterState.getRoutingNodes().unassigned().size() > 0);
String nodeIdOfFooReplica = null;
for (ShardRouting shardRouting : clusterState.getRoutingNodes().routingTable().allShards("foo")) {
if (shardRouting.primary() == false) {
nodeIdOfFooReplica = shardRouting.currentNodeId();
break;
}
}
assertNotNull(nodeIdOfFooReplica);
// remove node that has replica and reroute
clusterState = ClusterState.builder(clusterState).nodes(
DiscoveryNodes.builder(clusterState.nodes()).remove(nodeIdOfFooReplica)).build();
clusterState = ClusterState.builder(clusterState).routingResult(allocationService.reroute(clusterState, "fake node left")).build();
ClusterState stateWithDelayedShard = clusterState;
// make sure the replica is marked as delayed (i.e. not reallocated)
assertEquals(1, UnassignedInfo.getNumberOfDelayedUnassigned(stateWithDelayedShard));
ShardRouting delayedShard = stateWithDelayedShard.getRoutingNodes().unassigned().iterator().next();
assertEquals(nodeLeftTimestampNanos, delayedShard.unassignedInfo().getUnassignedTimeInNanos());
assertNull(delayedAllocationService.delayedRerouteTask.get());
long delayUntilClusterChangeEvent = TimeValue.timeValueNanos(randomInt((int)shorterDelaySetting.nanos() - 1)).nanos();
long clusterChangeEventTimestampNanos = nodeLeftTimestampNanos + delayUntilClusterChangeEvent;
delayedAllocationService.setNanoTimeOverride(clusterChangeEventTimestampNanos);
delayedAllocationService.clusterChanged(new ClusterChangedEvent("fake node left", stateWithDelayedShard, clusterState));
// check that delayed reroute task was created and registered with the proper settings
DelayedAllocationService.DelayedRerouteTask delayedRerouteTask = delayedAllocationService.delayedRerouteTask.get();
assertNotNull(delayedRerouteTask);
assertFalse(delayedRerouteTask.cancelScheduling.get());
assertThat(delayedRerouteTask.baseTimestampNanos, equalTo(clusterChangeEventTimestampNanos));
assertThat(delayedRerouteTask.nextDelay.nanos(),
equalTo(delaySetting.nanos() - (clusterChangeEventTimestampNanos - nodeLeftTimestampNanos)));
if (randomBoolean()) {
// update settings with shorter delay
ClusterState stateWithShorterDelay = ClusterState.builder(stateWithDelayedShard).metaData(MetaData.builder(
stateWithDelayedShard.metaData()).updateSettings(Settings.builder().put(
UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), shorterDelaySetting).build(), "foo")).build();
delayedAllocationService.setNanoTimeOverride(clusterChangeEventTimestampNanos);
delayedAllocationService.clusterChanged(
new ClusterChangedEvent("apply shorter delay", stateWithShorterDelay, stateWithDelayedShard));
} else {
// node leaves with replica shard of index bar that has shorter delay
String nodeIdOfBarReplica = null;
for (ShardRouting shardRouting : stateWithDelayedShard.getRoutingNodes().routingTable().allShards("bar")) {
if (shardRouting.primary() == false) {
nodeIdOfBarReplica = shardRouting.currentNodeId();
break;
}
}
assertNotNull(nodeIdOfBarReplica);
// remove node that has replica and reroute
clusterState = ClusterState.builder(stateWithDelayedShard).nodes(
DiscoveryNodes.builder(stateWithDelayedShard.nodes()).remove(nodeIdOfBarReplica)).build();
ClusterState stateWithShorterDelay = ClusterState.builder(clusterState).routingResult(
allocationService.reroute(clusterState, "fake node left")).build();
delayedAllocationService.setNanoTimeOverride(clusterChangeEventTimestampNanos);
delayedAllocationService.clusterChanged(
new ClusterChangedEvent("fake node left", stateWithShorterDelay, stateWithDelayedShard));
}
// check that delayed reroute task was replaced by shorter reroute task
DelayedAllocationService.DelayedRerouteTask shorterDelayedRerouteTask = delayedAllocationService.delayedRerouteTask.get();
assertNotNull(shorterDelayedRerouteTask);
assertNotEquals(shorterDelayedRerouteTask, delayedRerouteTask);
assertTrue(delayedRerouteTask.cancelScheduling.get()); // existing task was cancelled
assertFalse(shorterDelayedRerouteTask.cancelScheduling.get());
assertThat(delayedRerouteTask.baseTimestampNanos, equalTo(clusterChangeEventTimestampNanos));
assertThat(shorterDelayedRerouteTask.nextDelay.nanos(),
equalTo(shorterDelaySetting.nanos() - (clusterChangeEventTimestampNanos - nodeLeftTimestampNanos)));
}
private static class TestDelayAllocationService extends DelayedAllocationService {
private volatile long nanoTimeOverride = -1L;
public TestDelayAllocationService(Settings settings, ThreadPool threadPool, ClusterService clusterService,
AllocationService allocationService) {
super(settings, threadPool, clusterService, allocationService);
}
@Override
protected void assertClusterStateThread() {
// do not check this in the unit tests
}
public void setNanoTimeOverride(long nanoTime) {
this.nanoTimeOverride = nanoTime;
}
@Override
protected long currentNanoTime() {
return nanoTimeOverride == -1L ? super.currentNanoTime() : nanoTimeOverride;
}
}
}

View File

@ -19,32 +19,12 @@
package org.elasticsearch.cluster.routing;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.test.ESAllocationTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.junit.After;
import org.junit.Before;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import static java.util.Collections.singleton;
import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING;
import static org.elasticsearch.cluster.routing.ShardRoutingState.STARTED;
import static org.elasticsearch.test.ClusterServiceUtils.createClusterService;
import static org.elasticsearch.test.ClusterServiceUtils.setState;
import static org.hamcrest.Matchers.equalTo;
/**
@ -58,191 +38,18 @@ public class RoutingServiceTests extends ESAllocationTestCase {
routingService = new TestRoutingService();
}
@After
public void shutdownRoutingService() throws Exception {
routingService.shutdown();
}
public void testReroute() {
assertThat(routingService.hasReroutedAndClear(), equalTo(false));
routingService.reroute("test");
assertThat(routingService.hasReroutedAndClear(), equalTo(true));
}
public void testNoDelayedUnassigned() throws Exception {
AllocationService allocation = createAllocationService(Settings.EMPTY, new DelayedShardsMockGatewayAllocator());
MetaData metaData = MetaData.builder()
.put(IndexMetaData.builder("test").settings(settings(Version.CURRENT).put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), "0"))
.numberOfShards(1).numberOfReplicas(1))
.build();
ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT)
.metaData(metaData)
.routingTable(RoutingTable.builder().addAsNew(metaData.index("test")).build()).build();
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder().put(newNode("node1")).put(newNode("node2")).localNodeId("node1").masterNodeId("node1")).build();
clusterState = ClusterState.builder(clusterState).routingResult(allocation.reroute(clusterState, "reroute")).build();
// starting primaries
clusterState = ClusterState.builder(clusterState).routingResult(allocation.applyStartedShards(clusterState, clusterState.getRoutingNodes().shardsWithState(INITIALIZING))).build();
// starting replicas
clusterState = ClusterState.builder(clusterState).routingResult(allocation.applyStartedShards(clusterState, clusterState.getRoutingNodes().shardsWithState(INITIALIZING))).build();
assertThat(clusterState.getRoutingNodes().unassigned().size() > 0, equalTo(false));
// remove node2 and reroute
ClusterState prevState = clusterState;
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder(clusterState.nodes()).remove("node2")).build();
clusterState = ClusterState.builder(clusterState).routingResult(allocation.reroute(clusterState, "reroute")).build();
ClusterState newState = clusterState;
assertThat(routingService.getMinDelaySettingAtLastSchedulingNanos(), equalTo(Long.MAX_VALUE));
routingService.clusterChanged(new ClusterChangedEvent("test", newState, prevState));
assertThat(routingService.getMinDelaySettingAtLastSchedulingNanos(), equalTo(Long.MAX_VALUE));
assertThat(routingService.hasReroutedAndClear(), equalTo(false));
}
public void testDelayedUnassignedScheduleReroute() throws Exception {
MockAllocationService allocation = createAllocationService(Settings.EMPTY, new DelayedShardsMockGatewayAllocator());
MetaData metaData = MetaData.builder()
.put(IndexMetaData.builder("test").settings(settings(Version.CURRENT).put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), "100ms"))
.numberOfShards(1).numberOfReplicas(1))
.build();
ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT)
.metaData(metaData)
.routingTable(RoutingTable.builder().addAsNew(metaData.index("test")).build()).build();
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder().put(newNode("node1")).put(newNode("node2")).localNodeId("node1").masterNodeId("node1")).build();
clusterState = ClusterState.builder(clusterState).routingResult(allocation.reroute(clusterState, "reroute")).build();
// starting primaries
clusterState = ClusterState.builder(clusterState).routingResult(allocation.applyStartedShards(clusterState, clusterState.getRoutingNodes().shardsWithState(INITIALIZING))).build();
// starting replicas
clusterState = ClusterState.builder(clusterState).routingResult(allocation.applyStartedShards(clusterState, clusterState.getRoutingNodes().shardsWithState(INITIALIZING))).build();
assertFalse("no shards should be unassigned", clusterState.getRoutingNodes().unassigned().size() > 0);
String nodeId = null;
final List<ShardRouting> allShards = clusterState.getRoutingNodes().routingTable().allShards("test");
// we need to find the node with the replica otherwise we will not reroute
for (ShardRouting shardRouting : allShards) {
if (shardRouting.primary() == false) {
nodeId = shardRouting.currentNodeId();
break;
}
}
assertNotNull(nodeId);
// remove nodeId and reroute
ClusterState prevState = clusterState;
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder(clusterState.nodes()).remove(nodeId)).build();
// make sure the replica is marked as delayed (i.e. not reallocated)
clusterState = ClusterState.builder(clusterState).routingResult(allocation.reroute(clusterState, "reroute")).build();
assertEquals(1, clusterState.getRoutingNodes().unassigned().size());
ClusterState newState = clusterState;
routingService.clusterChanged(new ClusterChangedEvent("test", newState, prevState));
assertBusy(() -> assertTrue("routing service should have run a reroute", routingService.hasReroutedAndClear()));
// verify the registration has been reset
assertThat(routingService.getMinDelaySettingAtLastSchedulingNanos(), equalTo(Long.MAX_VALUE));
}
/**
* This tests that a new delayed reroute is scheduled right after a delayed reroute was run
*/
public void testDelayedUnassignedScheduleRerouteAfterDelayedReroute() throws Exception {
final ThreadPool testThreadPool = new ThreadPool(getTestName());
ClusterService clusterService = createClusterService(testThreadPool);
try {
MockAllocationService allocation = createAllocationService(Settings.EMPTY, new DelayedShardsMockGatewayAllocator());
MetaData metaData = MetaData.builder()
.put(IndexMetaData.builder("short_delay").settings(settings(Version.CURRENT).put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), "100ms"))
.numberOfShards(1).numberOfReplicas(1))
.put(IndexMetaData.builder("long_delay").settings(settings(Version.CURRENT).put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), "10s"))
.numberOfShards(1).numberOfReplicas(1))
.build();
ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT).metaData(metaData)
.routingTable(RoutingTable.builder().addAsNew(metaData.index("short_delay")).addAsNew(metaData.index("long_delay")).build())
.nodes(DiscoveryNodes.builder()
.put(newNode("node0", singleton(DiscoveryNode.Role.MASTER))).localNodeId("node0").masterNodeId("node0")
.put(newNode("node1")).put(newNode("node2")).put(newNode("node3")).put(newNode("node4"))).build();
// allocate shards
clusterState = ClusterState.builder(clusterState).routingResult(allocation.reroute(clusterState, "reroute")).build();
// start primaries
clusterState = ClusterState.builder(clusterState).routingResult(allocation.applyStartedShards(clusterState, clusterState.getRoutingNodes().shardsWithState(INITIALIZING))).build();
// start replicas
clusterState = ClusterState.builder(clusterState).routingResult(allocation.applyStartedShards(clusterState, clusterState.getRoutingNodes().shardsWithState(INITIALIZING))).build();
assertThat("all shards should be started", clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(4));
// find replica of short_delay
ShardRouting shortDelayReplica = null;
for (ShardRouting shardRouting : clusterState.getRoutingNodes().routingTable().allShards("short_delay")) {
if (shardRouting.primary() == false) {
shortDelayReplica = shardRouting;
break;
}
}
assertNotNull(shortDelayReplica);
// find replica of long_delay
ShardRouting longDelayReplica = null;
for (ShardRouting shardRouting : clusterState.getRoutingNodes().routingTable().allShards("long_delay")) {
if (shardRouting.primary() == false) {
longDelayReplica = shardRouting;
break;
}
}
assertNotNull(longDelayReplica);
final long baseTime = System.nanoTime();
// remove node of shortDelayReplica and node of longDelayReplica and reroute
ClusterState prevState = clusterState;
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder(clusterState.nodes()).remove(shortDelayReplica.currentNodeId()).remove(longDelayReplica.currentNodeId())).build();
// make sure both replicas are marked as delayed (i.e. not reallocated)
allocation.setNanoTimeOverride(baseTime);
clusterState = ClusterState.builder(clusterState).routingResult(allocation.reroute(clusterState, "reroute")).build();
// check that shortDelayReplica and longDelayReplica have been marked unassigned
RoutingNodes.UnassignedShards unassigned = clusterState.getRoutingNodes().unassigned();
assertEquals(2, unassigned.size());
// update shortDelayReplica and longDelayReplica variables with new shard routing
ShardRouting shortDelayUnassignedReplica = null;
ShardRouting longDelayUnassignedReplica = null;
for (ShardRouting shr : unassigned) {
if (shr.getIndexName().equals("short_delay")) {
shortDelayUnassignedReplica = shr;
} else {
longDelayUnassignedReplica = shr;
}
}
assertTrue(shortDelayReplica.isSameShard(shortDelayUnassignedReplica));
assertTrue(longDelayReplica.isSameShard(longDelayUnassignedReplica));
// manually trigger a clusterChanged event on routingService
ClusterState newState = clusterState;
setState(clusterService, newState);
// create routing service, also registers listener on cluster service
RoutingService routingService = new RoutingService(Settings.EMPTY, testThreadPool, clusterService, allocation);
routingService.start(); // just so performReroute does not prematurely return
// next (delayed) reroute should only delay longDelayReplica/longDelayUnassignedReplica, simulate that we are now 1 second after shards became unassigned
allocation.setNanoTimeOverride(baseTime + TimeValue.timeValueSeconds(1).nanos());
// register listener on cluster state so we know when cluster state has been changed
CountDownLatch latch = new CountDownLatch(1);
clusterService.addLast(event -> latch.countDown());
// instead of clusterService calling clusterChanged, we call it directly here
routingService.clusterChanged(new ClusterChangedEvent("test", newState, prevState));
// cluster service should have updated state and called routingService with clusterChanged
latch.await();
// verify the registration has been set to the delay of longDelayReplica/longDelayUnassignedReplica
assertThat(routingService.getMinDelaySettingAtLastSchedulingNanos(), equalTo(TimeValue.timeValueSeconds(10).nanos()));
} finally {
clusterService.stop();
terminate(testThreadPool);
}
}
private class TestRoutingService extends RoutingService {
private AtomicBoolean rerouted = new AtomicBoolean();
public TestRoutingService() {
super(Settings.EMPTY, new ThreadPool(getTestName()), null, null);
}
void shutdown() throws Exception {
terminate(threadPool);
super(Settings.EMPTY, null, null);
}
public boolean hasReroutedAndClear() {

View File

@ -38,7 +38,6 @@ import org.elasticsearch.index.Index;
import org.elasticsearch.test.ESAllocationTestCase;
import java.util.Collections;
import java.util.EnumSet;
import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING;
import static org.elasticsearch.cluster.routing.ShardRoutingState.STARTED;
@ -75,7 +74,7 @@ public class UnassignedInfoTests extends ESAllocationTestCase {
public void testSerialization() throws Exception {
UnassignedInfo.Reason reason = RandomPicks.randomFrom(random(), UnassignedInfo.Reason.values());
UnassignedInfo meta = reason == UnassignedInfo.Reason.ALLOCATION_FAILED ?
new UnassignedInfo(reason, randomBoolean() ? randomAsciiOfLength(4) : null, null, randomIntBetween(1, 100), System.nanoTime(), System.currentTimeMillis()):
new UnassignedInfo(reason, randomBoolean() ? randomAsciiOfLength(4) : null, null, randomIntBetween(1, 100), System.nanoTime(), System.currentTimeMillis(), false):
new UnassignedInfo(reason, randomBoolean() ? randomAsciiOfLength(4) : null);
BytesStreamOutput out = new BytesStreamOutput();
meta.writeTo(out);
@ -262,59 +261,20 @@ public class UnassignedInfoTests extends ESAllocationTestCase {
/**
* Verifies that delayed allocation calculation are correct.
*/
public void testUnassignedDelayedOnlyOnNodeLeft() throws Exception {
UnassignedInfo unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.NODE_LEFT, null);
unassignedInfo = unassignedInfo.updateDelay(unassignedInfo.getUnassignedTimeInNanos() + 1, // add 1 tick delay
Settings.builder().put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), "10h").build(), Settings.EMPTY);
long delay = unassignedInfo.getLastComputedLeftDelayNanos();
long cachedDelay = unassignedInfo.getLastComputedLeftDelayNanos();
assertThat(delay, equalTo(cachedDelay));
assertThat(delay, equalTo(TimeValue.timeValueHours(10).nanos() - 1));
}
/**
* Verifies that delayed allocation is only computed when the reason is NODE_LEFT.
*/
public void testUnassignedDelayOnlyNodeLeftNonNodeLeftReason() throws Exception {
EnumSet<UnassignedInfo.Reason> reasons = EnumSet.allOf(UnassignedInfo.Reason.class);
reasons.remove(UnassignedInfo.Reason.NODE_LEFT);
UnassignedInfo.Reason reason = RandomPicks.randomFrom(random(), reasons);
UnassignedInfo unassignedInfo = reason == UnassignedInfo.Reason.ALLOCATION_FAILED ?
new UnassignedInfo(reason, null, null, 1, System.nanoTime(), System.currentTimeMillis()):
new UnassignedInfo(reason, null);
unassignedInfo = unassignedInfo.updateDelay(unassignedInfo.getUnassignedTimeInNanos() + 1, // add 1 tick delay
Settings.builder().put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), "10h").build(), Settings.EMPTY);
long delay = unassignedInfo.getLastComputedLeftDelayNanos();
assertThat(delay, equalTo(0L));
delay = unassignedInfo.getLastComputedLeftDelayNanos();
assertThat(delay, equalTo(0L));
}
/**
* Verifies that delayed allocation calculation are correct.
*/
public void testLeftDelayCalculation() throws Exception {
public void testRemainingDelayCalculation() throws Exception {
final long baseTime = System.nanoTime();
UnassignedInfo unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.NODE_LEFT, "test", null, 0, baseTime, System.currentTimeMillis());
UnassignedInfo unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.NODE_LEFT, "test", null, 0, baseTime, System.currentTimeMillis(), randomBoolean());
final long totalDelayNanos = TimeValue.timeValueMillis(10).nanos();
final Settings settings = Settings.builder().put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), TimeValue.timeValueNanos(totalDelayNanos)).build();
unassignedInfo = unassignedInfo.updateDelay(baseTime, settings, Settings.EMPTY);
long delay = unassignedInfo.getLastComputedLeftDelayNanos();
final Settings indexSettings = Settings.builder().put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), TimeValue.timeValueNanos(totalDelayNanos)).build();
long delay = unassignedInfo.getRemainingDelay(baseTime, indexSettings);
assertThat(delay, equalTo(totalDelayNanos));
assertThat(delay, equalTo(unassignedInfo.getLastComputedLeftDelayNanos()));
long delta1 = randomIntBetween(1, (int) (totalDelayNanos - 1));
unassignedInfo = unassignedInfo.updateDelay(baseTime + delta1, settings, Settings.EMPTY);
delay = unassignedInfo.getLastComputedLeftDelayNanos();
delay = unassignedInfo.getRemainingDelay(baseTime + delta1, indexSettings);
assertThat(delay, equalTo(totalDelayNanos - delta1));
assertThat(delay, equalTo(unassignedInfo.getLastComputedLeftDelayNanos()));
unassignedInfo = unassignedInfo.updateDelay(baseTime + totalDelayNanos, settings, Settings.EMPTY);
delay = unassignedInfo.getLastComputedLeftDelayNanos();
delay = unassignedInfo.getRemainingDelay(baseTime + totalDelayNanos, indexSettings);
assertThat(delay, equalTo(0L));
assertThat(delay, equalTo(unassignedInfo.getLastComputedLeftDelayNanos()));
unassignedInfo = unassignedInfo.updateDelay(baseTime + totalDelayNanos + randomIntBetween(1, 20), settings, Settings.EMPTY);
delay = unassignedInfo.getLastComputedLeftDelayNanos();
delay = unassignedInfo.getRemainingDelay(baseTime + totalDelayNanos + randomIntBetween(1, 20), indexSettings);
assertThat(delay, equalTo(0L));
assertThat(delay, equalTo(unassignedInfo.getLastComputedLeftDelayNanos()));
}
@ -344,8 +304,6 @@ public class UnassignedInfoTests extends ESAllocationTestCase {
public void testFindNextDelayedAllocation() {
MockAllocationService allocation = createAllocationService(Settings.EMPTY, new DelayedShardsMockGatewayAllocator());
final long baseTime = System.nanoTime();
allocation.setNanoTimeOverride(baseTime);
final TimeValue delayTest1 = TimeValue.timeValueMillis(randomIntBetween(1, 200));
final TimeValue delayTest2 = TimeValue.timeValueMillis(randomIntBetween(1, 200));
final long expectMinDelaySettingsNanos = Math.min(delayTest1.nanos(), delayTest2.nanos());
@ -366,20 +324,18 @@ public class UnassignedInfoTests extends ESAllocationTestCase {
clusterState = ClusterState.builder(clusterState).routingResult(allocation.applyStartedShards(clusterState, clusterState.getRoutingNodes().shardsWithState(INITIALIZING))).build();
assertThat(clusterState.getRoutingNodes().unassigned().size() > 0, equalTo(false));
// remove node2 and reroute
final long baseTime = System.nanoTime();
allocation.setNanoTimeOverride(baseTime);
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder(clusterState.nodes()).remove("node2")).build();
clusterState = ClusterState.builder(clusterState).routingResult(allocation.reroute(clusterState, "reroute")).build();
final long delta = randomBoolean() ? 0 : randomInt((int) expectMinDelaySettingsNanos);
final long delta = randomBoolean() ? 0 : randomInt((int) expectMinDelaySettingsNanos - 1);
if (delta > 0) {
allocation.setNanoTimeOverride(baseTime + delta);
clusterState = ClusterState.builder(clusterState).routingResult(allocation.reroute(clusterState, "time moved")).build();
}
long minDelaySetting = UnassignedInfo.findSmallestDelayedAllocationSettingNanos(Settings.EMPTY, clusterState);
assertThat(minDelaySetting, equalTo(expectMinDelaySettingsNanos));
long nextDelay = UnassignedInfo.findNextDelayedAllocationIn(clusterState);
assertThat(nextDelay, equalTo(expectMinDelaySettingsNanos - delta));
assertThat(UnassignedInfo.findNextDelayedAllocation(baseTime + delta, clusterState), equalTo(expectMinDelaySettingsNanos - delta));
}
}

View File

@ -521,7 +521,7 @@ public class NodeJoinControllerTests extends ESTestCase {
static class NoopRoutingService extends RoutingService {
public NoopRoutingService(Settings settings) {
super(settings, null, null, new NoopAllocationService(settings));
super(settings, null, new NoopAllocationService(settings));
}
@Override

View File

@ -233,16 +233,14 @@ public class ReplicaShardAllocatorTests extends ESAllocationTestCase {
// we sometime return empty list of files, make sure we test this as well
testAllocator.addData(node2, false, null);
}
AllocationService.updateLeftDelayOfUnassignedShards(allocation, Settings.EMPTY);
boolean changed = testAllocator.allocateUnassigned(allocation);
assertThat(changed, equalTo(true));
assertThat(changed, equalTo(false));
assertThat(allocation.routingNodes().unassigned().ignored().size(), equalTo(1));
assertThat(allocation.routingNodes().unassigned().ignored().get(0).shardId(), equalTo(shardId));
allocation = onePrimaryOnNode1And1Replica(yesAllocationDeciders(),
Settings.builder().put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), TimeValue.timeValueHours(1)).build(), UnassignedInfo.Reason.NODE_LEFT);
testAllocator.addData(node2, false, "MATCH", new StoreFileMetaData("file1", 10, "MATCH_CHECKSUM"));
AllocationService.updateLeftDelayOfUnassignedShards(allocation, Settings.EMPTY);
changed = testAllocator.allocateUnassigned(allocation);
assertThat(changed, equalTo(true));
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(1));
@ -290,11 +288,15 @@ public class ReplicaShardAllocatorTests extends ESAllocationTestCase {
.numberOfShards(1).numberOfReplicas(1)
.putActiveAllocationIds(0, Sets.newHashSet(primaryShard.allocationId().getId())))
.build();
// mark shard as delayed if reason is NODE_LEFT
boolean delayed = reason == UnassignedInfo.Reason.NODE_LEFT &&
UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.get(settings).nanos() > 0;
RoutingTable routingTable = RoutingTable.builder()
.add(IndexRoutingTable.builder(shardId.getIndex())
.addIndexShard(new IndexShardRoutingTable.Builder(shardId)
.addShard(primaryShard)
.addShard(ShardRouting.newUnassigned(shardId, null, false, new UnassignedInfo(reason, null)))
.addShard(ShardRouting.newUnassigned(shardId, null, false,
new UnassignedInfo(reason, null, null, 0, System.nanoTime(), System.currentTimeMillis(), delayed)))
.build())
)
.build();

View File

@ -196,7 +196,7 @@ public abstract class ESAllocationTestCase extends ESTestCase {
/** A lock {@link AllocationService} allowing tests to override time */
protected static class MockAllocationService extends AllocationService {
private Long nanoTimeOverride = null;
private volatile long nanoTimeOverride = -1L;
public MockAllocationService(Settings settings, AllocationDeciders allocationDeciders, GatewayAllocator gatewayAllocator,
ShardsAllocator shardsAllocator, ClusterInfoService clusterInfoService) {
@ -209,7 +209,7 @@ public abstract class ESAllocationTestCase extends ESTestCase {
@Override
protected long currentNanoTime() {
return nanoTimeOverride == null ? super.currentNanoTime() : nanoTimeOverride;
return nanoTimeOverride == -1L ? super.currentNanoTime() : nanoTimeOverride;
}
}
@ -238,16 +238,15 @@ public abstract class ESAllocationTestCase extends ESTestCase {
@Override
public boolean allocateUnassigned(RoutingAllocation allocation) {
final RoutingNodes.UnassignedShards.UnassignedIterator unassignedIterator = allocation.routingNodes().unassigned().iterator();
boolean changed = false;
while (unassignedIterator.hasNext()) {
ShardRouting shard = unassignedIterator.next();
IndexMetaData indexMetaData = allocation.metaData().index(shard.getIndexName());
if (shard.primary() || shard.allocatedPostIndexCreate(indexMetaData) == false) {
continue;
}
changed |= replicaShardAllocator.ignoreUnassignedIfDelayed(unassignedIterator, shard);
replicaShardAllocator.ignoreUnassignedIfDelayed(unassignedIterator, shard);
}
return changed;
return false;
}
}
}