From 31b0777c911a87b7ebae0efd7d5b14d1e06f3879 Mon Sep 17 00:00:00 2001
From: Yannick Welsch
Date: Thu, 26 May 2016 13:39:55 +0200
Subject: [PATCH] Simplify delayed shard allocation (#18351)
This commit simplifies the delayed shard allocation implementation by assigning clear responsibilities to the various components that are affected by delayed shard allocation:
- UnassignedInfo gets a boolean flag delayed which determines whether assignment of the shard should be delayed. The flag gets persisted in the cluster state and is thus available across nodes, i.e. each node knows whether a shard was delayed-unassigned in a specific cluster state. Before, nodes other than the current master were unaware of that information.
- This flag is initially set as true if the shard becomes unassigned due to a node leaving and the index setting index.unassigned.node_left.delayed_timeout being strictly positive. From then on, unassigned shards can only transition from delayed to non-delayed, never in the other direction.
- The reroute step is in charge of removing the delay marker (comparing timestamp when node left to current timestamp).
- A dedicated service DelayedAllocationService, reacting to cluster change events, has the responsibility to schedule reroutes to remove the delay marker.
Closes #18293
---
.../ClusterAllocationExplanation.java | 18 +-
...ansportClusterAllocationExplainAction.java | 13 +-
.../elasticsearch/cluster/ClusterModule.java | 2 +
.../routing/DelayedAllocationService.java | 225 ++++++++
.../cluster/routing/RoutingService.java | 61 +--
.../cluster/routing/ShardRouting.java | 1 +
.../cluster/routing/UnassignedInfo.java | 153 ++----
.../routing/allocation/AllocationService.java | 112 ++--
.../allocation/FailedRerouteAllocation.java | 4 +-
.../routing/allocation/RoutingAllocation.java | 6 +-
.../allocation/StartedRerouteAllocation.java | 4 +-
...AllocateEmptyPrimaryAllocationCommand.java | 2 +-
.../cluster/service/ClusterService.java | 2 +-
.../discovery/zen/NodeJoinController.java | 14 +-
.../discovery/zen/ZenDiscovery.java | 12 +-
.../gateway/ReplicaShardAllocator.java | 15 +-
.../ClusterAllocationExplanationTests.java | 17 +-
.../cluster/routing/DelayedAllocationIT.java | 9 +-
.../DelayedAllocationServiceTests.java | 510 ++++++++++++++++++
.../cluster/routing/RoutingServiceTests.java | 195 +------
.../cluster/routing/UnassignedInfoTests.java | 68 +--
.../zen/NodeJoinControllerTests.java | 2 +-
.../gateway/ReplicaShardAllocatorTests.java | 10 +-
.../test/ESAllocationTestCase.java | 9 +-
24 files changed, 923 insertions(+), 541 deletions(-)
create mode 100644 core/src/main/java/org/elasticsearch/cluster/routing/DelayedAllocationService.java
create mode 100644 core/src/test/java/org/elasticsearch/cluster/routing/DelayedAllocationServiceTests.java
diff --git a/core/src/main/java/org/elasticsearch/action/admin/cluster/allocation/ClusterAllocationExplanation.java b/core/src/main/java/org/elasticsearch/action/admin/cluster/allocation/ClusterAllocationExplanation.java
index 9ca0efd6c56..e007929faf2 100644
--- a/core/src/main/java/org/elasticsearch/action/admin/cluster/allocation/ClusterAllocationExplanation.java
+++ b/core/src/main/java/org/elasticsearch/action/admin/cluster/allocation/ClusterAllocationExplanation.java
@@ -45,17 +45,19 @@ public final class ClusterAllocationExplanation implements ToXContent, Writeable
private final boolean hasPendingAsyncFetch;
private final String assignedNodeId;
private final UnassignedInfo unassignedInfo;
+ private final long allocationDelayMillis;
private final long remainingDelayMillis;
private final Map nodeExplanations;
- public ClusterAllocationExplanation(ShardId shard, boolean primary, @Nullable String assignedNodeId, long remainingDelayMillis,
- @Nullable UnassignedInfo unassignedInfo, boolean hasPendingAsyncFetch,
+ public ClusterAllocationExplanation(ShardId shard, boolean primary, @Nullable String assignedNodeId, long allocationDelayMillis,
+ long remainingDelayMillis, @Nullable UnassignedInfo unassignedInfo, boolean hasPendingAsyncFetch,
Map nodeExplanations) {
this.shard = shard;
this.primary = primary;
this.hasPendingAsyncFetch = hasPendingAsyncFetch;
this.assignedNodeId = assignedNodeId;
this.unassignedInfo = unassignedInfo;
+ this.allocationDelayMillis = allocationDelayMillis;
this.remainingDelayMillis = remainingDelayMillis;
this.nodeExplanations = nodeExplanations;
}
@@ -66,6 +68,7 @@ public final class ClusterAllocationExplanation implements ToXContent, Writeable
this.hasPendingAsyncFetch = in.readBoolean();
this.assignedNodeId = in.readOptionalString();
this.unassignedInfo = in.readOptionalWriteable(UnassignedInfo::new);
+ this.allocationDelayMillis = in.readVLong();
this.remainingDelayMillis = in.readVLong();
int mapSize = in.readVInt();
@@ -84,6 +87,7 @@ public final class ClusterAllocationExplanation implements ToXContent, Writeable
out.writeBoolean(this.isStillFetchingShardData());
out.writeOptionalString(this.getAssignedNodeId());
out.writeOptionalWriteable(this.getUnassignedInfo());
+ out.writeVLong(allocationDelayMillis);
out.writeVLong(remainingDelayMillis);
out.writeVInt(this.nodeExplanations.size());
@@ -124,7 +128,12 @@ public final class ClusterAllocationExplanation implements ToXContent, Writeable
return this.unassignedInfo;
}
- /** Return the remaining allocation delay for this shard in millisocends */
+ /** Return the configured delay before the shard can be allocated in milliseconds */
+ public long getAllocationDelayMillis() {
+ return this.allocationDelayMillis;
+ }
+
+ /** Return the remaining allocation delay for this shard in milliseconds */
public long getRemainingDelayMillis() {
return this.remainingDelayMillis;
}
@@ -152,8 +161,7 @@ public final class ClusterAllocationExplanation implements ToXContent, Writeable
// If we have unassigned info, show that
if (unassignedInfo != null) {
unassignedInfo.toXContent(builder, params);
- long delay = unassignedInfo.getLastComputedLeftDelayNanos();
- builder.timeValueField("allocation_delay_in_millis", "allocation_delay", TimeValue.timeValueNanos(delay));
+ builder.timeValueField("allocation_delay_in_millis", "allocation_delay", TimeValue.timeValueMillis(allocationDelayMillis));
builder.timeValueField("remaining_delay_in_millis", "remaining_delay", TimeValue.timeValueMillis(remainingDelayMillis));
}
builder.startObject("nodes");
diff --git a/core/src/main/java/org/elasticsearch/action/admin/cluster/allocation/TransportClusterAllocationExplainAction.java b/core/src/main/java/org/elasticsearch/action/admin/cluster/allocation/TransportClusterAllocationExplainAction.java
index 46a4d1795ba..c5e1f6cc2a1 100644
--- a/core/src/main/java/org/elasticsearch/action/admin/cluster/allocation/TransportClusterAllocationExplainAction.java
+++ b/core/src/main/java/org/elasticsearch/action/admin/cluster/allocation/TransportClusterAllocationExplainAction.java
@@ -59,6 +59,8 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
+import static org.elasticsearch.cluster.routing.UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING;
+
/**
* The {@code TransportClusterAllocationExplainAction} is responsible for actually executing the explanation of a shard's allocation on the
* master node in the cluster.
@@ -237,9 +239,9 @@ public class TransportClusterAllocationExplainAction
long remainingDelayMillis = 0;
final MetaData metadata = allocation.metaData();
final IndexMetaData indexMetaData = metadata.index(shard.index());
- if (ui != null) {
- final Settings indexSettings = indexMetaData.getSettings();
- long remainingDelayNanos = ui.getRemainingDelay(System.nanoTime(), metadata.settings(), indexSettings);
+ long allocationDelayMillis = INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.get(indexMetaData.getSettings()).getMillis();
+ if (ui != null && ui.isDelayed()) {
+ long remainingDelayNanos = ui.getRemainingDelay(System.nanoTime(), indexMetaData.getSettings());
remainingDelayMillis = TimeValue.timeValueNanos(remainingDelayNanos).millis();
}
@@ -262,8 +264,9 @@ public class TransportClusterAllocationExplainAction
allocation.hasPendingAsyncFetch());
explanations.put(node, nodeExplanation);
}
- return new ClusterAllocationExplanation(shard.shardId(), shard.primary(), shard.currentNodeId(),
- remainingDelayMillis, ui, gatewayAllocator.hasFetchPending(shard.shardId(), shard.primary()), explanations);
+ return new ClusterAllocationExplanation(shard.shardId(), shard.primary(),
+ shard.currentNodeId(), allocationDelayMillis, remainingDelayMillis, ui,
+ gatewayAllocator.hasFetchPending(shard.shardId(), shard.primary()), explanations);
}
@Override
diff --git a/core/src/main/java/org/elasticsearch/cluster/ClusterModule.java b/core/src/main/java/org/elasticsearch/cluster/ClusterModule.java
index a02e399ac0c..707521b2cfe 100644
--- a/core/src/main/java/org/elasticsearch/cluster/ClusterModule.java
+++ b/core/src/main/java/org/elasticsearch/cluster/ClusterModule.java
@@ -33,6 +33,7 @@ import org.elasticsearch.cluster.metadata.MetaDataIndexTemplateService;
import org.elasticsearch.cluster.metadata.MetaDataMappingService;
import org.elasticsearch.cluster.metadata.MetaDataUpdateSettingsService;
import org.elasticsearch.cluster.node.DiscoveryNodeService;
+import org.elasticsearch.cluster.routing.DelayedAllocationService;
import org.elasticsearch.cluster.routing.OperationRouting;
import org.elasticsearch.cluster.routing.RoutingService;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
@@ -151,6 +152,7 @@ public class ClusterModule extends AbstractModule {
bind(MetaDataIndexTemplateService.class).asEagerSingleton();
bind(IndexNameExpressionResolver.class).asEagerSingleton();
bind(RoutingService.class).asEagerSingleton();
+ bind(DelayedAllocationService.class).asEagerSingleton();
bind(ShardStateAction.class).asEagerSingleton();
bind(NodeIndexDeletedAction.class).asEagerSingleton();
bind(NodeMappingRefreshAction.class).asEagerSingleton();
diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/DelayedAllocationService.java b/core/src/main/java/org/elasticsearch/cluster/routing/DelayedAllocationService.java
new file mode 100644
index 00000000000..e79884cf9c9
--- /dev/null
+++ b/core/src/main/java/org/elasticsearch/cluster/routing/DelayedAllocationService.java
@@ -0,0 +1,225 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.cluster.routing;
+
+import org.elasticsearch.cluster.ClusterChangedEvent;
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.ClusterStateListener;
+import org.elasticsearch.cluster.ClusterStateUpdateTask;
+import org.elasticsearch.cluster.routing.allocation.AllocationService;
+import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
+import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.component.AbstractLifecycleComponent;
+import org.elasticsearch.common.inject.Inject;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.common.util.concurrent.AbstractRunnable;
+import org.elasticsearch.common.util.concurrent.FutureUtils;
+import org.elasticsearch.threadpool.ThreadPool;
+
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * The {@link DelayedAllocationService} listens to cluster state changes and checks
+ * if there are unassigned shards with delayed allocation (unassigned shards that have
+ * the delay marker). These are shards that have become unassigned due to a node leaving
+ * and which were assigned the delay marker based on the index delay setting
+ * {@link UnassignedInfo#INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING}
+ * (see {@link AllocationService#deassociateDeadNodes(RoutingAllocation)}).
+ * This class is responsible for choosing the next (closest) delay expiration of a
+ * delayed shard to schedule a reroute to remove the delay marker.
+ * The actual removal of the delay marker happens in
+ * {@link AllocationService#removeDelayMarkers(RoutingAllocation)}, triggering yet
+ * another cluster change event.
+ */
+public class DelayedAllocationService extends AbstractLifecycleComponent implements ClusterStateListener {
+
+ static final String CLUSTER_UPDATE_TASK_SOURCE = "delayed_allocation_reroute";
+
+ final ThreadPool threadPool;
+ private final ClusterService clusterService;
+ private final AllocationService allocationService;
+
+ AtomicReference delayedRerouteTask = new AtomicReference<>(); // package private to access from tests
+
+ /**
+ * represents a delayed scheduling of the reroute action that can be cancelled.
+ */
+ class DelayedRerouteTask extends ClusterStateUpdateTask {
+ final TimeValue nextDelay; // delay until submitting the reroute command
+ final long baseTimestampNanos; // timestamp (in nanos) upon which delay was calculated
+ volatile ScheduledFuture future;
+ final AtomicBoolean cancelScheduling = new AtomicBoolean();
+
+ DelayedRerouteTask(TimeValue nextDelay, long baseTimestampNanos) {
+ this.nextDelay = nextDelay;
+ this.baseTimestampNanos = baseTimestampNanos;
+ }
+
+ public long scheduledTimeToRunInNanos() {
+ return baseTimestampNanos + nextDelay.nanos();
+ }
+
+ public void cancelScheduling() {
+ cancelScheduling.set(true);
+ FutureUtils.cancel(future);
+ removeIfSameTask(this);
+ }
+
+ public void schedule() {
+ future = threadPool.schedule(nextDelay, ThreadPool.Names.SAME, new AbstractRunnable() {
+ @Override
+ protected void doRun() throws Exception {
+ if (cancelScheduling.get()) {
+ return;
+ }
+ clusterService.submitStateUpdateTask(CLUSTER_UPDATE_TASK_SOURCE, DelayedRerouteTask.this);
+ }
+
+ @Override
+ public void onFailure(Throwable t) {
+ logger.warn("failed to submit schedule/execute reroute post unassigned shard", t);
+ removeIfSameTask(DelayedRerouteTask.this);
+ }
+ });
+ }
+
+ @Override
+ public ClusterState execute(ClusterState currentState) throws Exception {
+ removeIfSameTask(this);
+ RoutingAllocation.Result routingResult = allocationService.reroute(currentState, "assign delayed unassigned shards");
+ if (routingResult.changed()) {
+ return ClusterState.builder(currentState).routingResult(routingResult).build();
+ } else {
+ return currentState;
+ }
+ }
+
+ @Override
+ public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
+ if (oldState == newState) {
+ // no state changed, check when we should remove the delay flag from the shards the next time.
+ // if cluster state changed, we can leave the scheduling of the next delay up to the clusterChangedEvent
+ // this should not be needed, but we want to be extra safe here
+ scheduleIfNeeded(currentNanoTime(), newState);
+ }
+ }
+
+ @Override
+ public void onFailure(String source, Throwable t) {
+ removeIfSameTask(this);
+ logger.warn("failed to schedule/execute reroute post unassigned shard", t);
+ }
+ }
+
+ @Inject
+ public DelayedAllocationService(Settings settings, ThreadPool threadPool, ClusterService clusterService,
+ AllocationService allocationService) {
+ super(settings);
+ this.threadPool = threadPool;
+ this.clusterService = clusterService;
+ this.allocationService = allocationService;
+ clusterService.addFirst(this);
+ }
+
+ @Override
+ protected void doStart() {
+ }
+
+ @Override
+ protected void doStop() {
+ }
+
+ @Override
+ protected void doClose() {
+ clusterService.remove(this);
+ removeTaskAndCancel();
+ }
+
+ /** override this to control time based decisions during delayed allocation */
+ protected long currentNanoTime() {
+ return System.nanoTime();
+ }
+
+ @Override
+ public void clusterChanged(ClusterChangedEvent event) {
+ long currentNanoTime = currentNanoTime();
+ if (event.state().nodes().isLocalNodeElectedMaster()) {
+ scheduleIfNeeded(currentNanoTime, event.state());
+ }
+ }
+
+ private void removeTaskAndCancel() {
+ DelayedRerouteTask existingTask = delayedRerouteTask.getAndSet(null);
+ if (existingTask != null) {
+ logger.trace("cancelling existing delayed reroute task");
+ existingTask.cancelScheduling();
+ }
+ }
+
+ private void removeIfSameTask(DelayedRerouteTask expectedTask) {
+ delayedRerouteTask.compareAndSet(expectedTask, null);
+ }
+
+ /**
+ * Figure out if an existing scheduled reroute is good enough or whether we need to cancel and reschedule.
+ */
+ private void scheduleIfNeeded(long currentNanoTime, ClusterState state) {
+ assertClusterStateThread();
+ long nextDelayNanos = UnassignedInfo.findNextDelayedAllocation(currentNanoTime, state);
+ if (nextDelayNanos < 0) {
+ logger.trace("no need to schedule reroute - no delayed unassigned shards");
+ removeTaskAndCancel();
+ } else {
+ TimeValue nextDelay = TimeValue.timeValueNanos(nextDelayNanos);
+ final boolean earlierRerouteNeeded;
+ DelayedRerouteTask existingTask = delayedRerouteTask.get();
+ DelayedRerouteTask newTask = new DelayedRerouteTask(nextDelay, currentNanoTime);
+ if (existingTask == null) {
+ earlierRerouteNeeded = true;
+ } else if (newTask.scheduledTimeToRunInNanos() < existingTask.scheduledTimeToRunInNanos()) {
+ // we need an earlier delayed reroute
+ logger.trace("cancelling existing delayed reroute task as delayed reroute has to happen [{}] earlier",
+ TimeValue.timeValueNanos(existingTask.scheduledTimeToRunInNanos() - newTask.scheduledTimeToRunInNanos()));
+ existingTask.cancelScheduling();
+ earlierRerouteNeeded = true;
+ } else {
+ earlierRerouteNeeded = false;
+ }
+
+ if (earlierRerouteNeeded) {
+ logger.info("scheduling reroute for delayed shards in [{}] ({} delayed shards)", nextDelay,
+ UnassignedInfo.getNumberOfDelayedUnassigned(state));
+ DelayedRerouteTask currentTask = delayedRerouteTask.getAndSet(newTask);
+ assert existingTask == currentTask || currentTask == null;
+ newTask.schedule();
+ } else {
+ logger.trace("no need to reschedule delayed reroute - currently scheduled delayed reroute in [{}] is enough", nextDelay);
+ }
+ }
+ }
+
+ // protected so that it can be overridden (and disabled) by unit tests
+ protected void assertClusterStateThread() {
+ ClusterService.assertClusterStateThread();
+ }
+}
diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/RoutingService.java b/core/src/main/java/org/elasticsearch/cluster/routing/RoutingService.java
index 1ebd4699d1a..78e7e15d389 100644
--- a/core/src/main/java/org/elasticsearch/cluster/routing/RoutingService.java
+++ b/core/src/main/java/org/elasticsearch/cluster/routing/RoutingService.java
@@ -21,7 +21,6 @@ package org.elasticsearch.cluster.routing;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
-import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
@@ -30,12 +29,7 @@ import org.elasticsearch.common.Priority;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.unit.TimeValue;
-import org.elasticsearch.common.util.concurrent.AbstractRunnable;
-import org.elasticsearch.common.util.concurrent.FutureUtils;
-import org.elasticsearch.threadpool.ThreadPool;
-import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicBoolean;
/**
@@ -50,27 +44,20 @@ import java.util.concurrent.atomic.AtomicBoolean;
* actions.
*
*/
-public class RoutingService extends AbstractLifecycleComponent implements ClusterStateListener {
+public class RoutingService extends AbstractLifecycleComponent {
private static final String CLUSTER_UPDATE_TASK_SOURCE = "cluster_reroute";
- final ThreadPool threadPool;
private final ClusterService clusterService;
private final AllocationService allocationService;
private AtomicBoolean rerouting = new AtomicBoolean();
- private volatile long minDelaySettingAtLastSchedulingNanos = Long.MAX_VALUE;
- private volatile ScheduledFuture registeredNextDelayFuture;
@Inject
- public RoutingService(Settings settings, ThreadPool threadPool, ClusterService clusterService, AllocationService allocationService) {
+ public RoutingService(Settings settings, ClusterService clusterService, AllocationService allocationService) {
super(settings);
- this.threadPool = threadPool;
this.clusterService = clusterService;
this.allocationService = allocationService;
- if (clusterService != null) {
- clusterService.addFirst(this);
- }
}
@Override
@@ -83,8 +70,6 @@ public class RoutingService extends AbstractLifecycleComponent i
@Override
protected void doClose() {
- FutureUtils.cancel(registeredNextDelayFuture);
- clusterService.remove(this);
}
public AllocationService getAllocationService() {
@@ -98,48 +83,6 @@ public class RoutingService extends AbstractLifecycleComponent i
performReroute(reason);
}
- @Override
- public void clusterChanged(ClusterChangedEvent event) {
- if (event.state().nodes().isLocalNodeElectedMaster()) {
- // Figure out if an existing scheduled reroute is good enough or whether we need to cancel and reschedule.
- // If the minimum of the currently relevant delay settings is larger than something we scheduled in the past,
- // we are guaranteed that the planned schedule will happen before any of the current shard delays are expired.
- long minDelaySetting = UnassignedInfo.findSmallestDelayedAllocationSettingNanos(settings, event.state());
- if (minDelaySetting <= 0) {
- logger.trace("no need to schedule reroute - no delayed unassigned shards, minDelaySetting [{}], scheduled [{}]", minDelaySetting, minDelaySettingAtLastSchedulingNanos);
- minDelaySettingAtLastSchedulingNanos = Long.MAX_VALUE;
- FutureUtils.cancel(registeredNextDelayFuture);
- } else if (minDelaySetting < minDelaySettingAtLastSchedulingNanos) {
- FutureUtils.cancel(registeredNextDelayFuture);
- minDelaySettingAtLastSchedulingNanos = minDelaySetting;
- TimeValue nextDelay = TimeValue.timeValueNanos(UnassignedInfo.findNextDelayedAllocationIn(event.state()));
- assert nextDelay.nanos() > 0 : "next delay must be non 0 as minDelaySetting is [" + minDelaySetting + "]";
- logger.info("delaying allocation for [{}] unassigned shards, next check in [{}]",
- UnassignedInfo.getNumberOfDelayedUnassigned(event.state()), nextDelay);
- registeredNextDelayFuture = threadPool.schedule(nextDelay, ThreadPool.Names.SAME, new AbstractRunnable() {
- @Override
- protected void doRun() throws Exception {
- minDelaySettingAtLastSchedulingNanos = Long.MAX_VALUE;
- reroute("assign delayed unassigned shards");
- }
-
- @Override
- public void onFailure(Throwable t) {
- logger.warn("failed to schedule/execute reroute post unassigned shard", t);
- minDelaySettingAtLastSchedulingNanos = Long.MAX_VALUE;
- }
- });
- } else {
- logger.trace("no need to schedule reroute - current schedule reroute is enough. minDelaySetting [{}], scheduled [{}]", minDelaySetting, minDelaySettingAtLastSchedulingNanos);
- }
- }
- }
-
- // visible for testing
- long getMinDelaySettingAtLastSchedulingNanos() {
- return this.minDelaySettingAtLastSchedulingNanos;
- }
-
// visible for testing
protected void performReroute(String reason) {
try {
diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/ShardRouting.java b/core/src/main/java/org/elasticsearch/cluster/routing/ShardRouting.java
index 60bb455dfe6..810f8c183da 100644
--- a/core/src/main/java/org/elasticsearch/cluster/routing/ShardRouting.java
+++ b/core/src/main/java/org/elasticsearch/cluster/routing/ShardRouting.java
@@ -316,6 +316,7 @@ public final class ShardRouting implements Writeable, ToXContent {
public ShardRouting updateUnassignedInfo(UnassignedInfo unassignedInfo) {
assert this.unassignedInfo != null : "can only update unassign info if they are already set";
+ assert this.unassignedInfo.isDelayed() || (unassignedInfo.isDelayed() == false) : "cannot transition from non-delayed to delayed";
return new ShardRouting(shardId, currentNodeId, relocatingNodeId, restoreSource, primary, state,
unassignedInfo, allocationId, expectedShardSize);
}
diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/UnassignedInfo.java b/core/src/main/java/org/elasticsearch/cluster/routing/UnassignedInfo.java
index bc44cd1701c..5d6333972b3 100644
--- a/core/src/main/java/org/elasticsearch/cluster/routing/UnassignedInfo.java
+++ b/core/src/main/java/org/elasticsearch/cluster/routing/UnassignedInfo.java
@@ -22,6 +22,7 @@ package org.elasticsearch.cluster.routing;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
+import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
@@ -43,10 +44,9 @@ import java.io.IOException;
public final class UnassignedInfo implements ToXContent, Writeable {
public static final FormatDateTimeFormatter DATE_TIME_FORMATTER = Joda.forPattern("dateOptionalTime");
- private static final TimeValue DEFAULT_DELAYED_NODE_LEFT_TIMEOUT = TimeValue.timeValueMinutes(1);
public static final Setting INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING =
- Setting.timeSetting("index.unassigned.node_left.delayed_timeout", DEFAULT_DELAYED_NODE_LEFT_TIMEOUT, Property.Dynamic,
+ Setting.timeSetting("index.unassigned.node_left.delayed_timeout", TimeValue.timeValueMinutes(1), Property.Dynamic,
Property.IndexScope);
/**
* Reason why the shard is in unassigned state.
@@ -112,19 +112,19 @@ public final class UnassignedInfo implements ToXContent, Writeable {
private final Reason reason;
private final long unassignedTimeMillis; // used for display and log messages, in milliseconds
private final long unassignedTimeNanos; // in nanoseconds, used to calculate delay for delayed shard allocation
- private final long lastComputedLeftDelayNanos; // how long to delay shard allocation, not serialized (always positive, 0 means no delay)
+ private final boolean delayed; // if allocation of this shard is delayed due to INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING
private final String message;
private final Throwable failure;
private final int failedAllocations;
/**
- * creates an UnassingedInfo object based **current** time
+ * creates an UnassignedInfo object based on **current** time
*
* @param reason the cause for making this shard unassigned. See {@link Reason} for more information.
* @param message more information about cause.
**/
public UnassignedInfo(Reason reason, String message) {
- this(reason, message, null, reason == Reason.ALLOCATION_FAILED ? 1 : 0, System.nanoTime(), System.currentTimeMillis());
+ this(reason, message, null, reason == Reason.ALLOCATION_FAILED ? 1 : 0, System.nanoTime(), System.currentTimeMillis(), false);
}
/**
@@ -133,28 +133,21 @@ public final class UnassignedInfo implements ToXContent, Writeable {
* @param failure the shard level failure that caused this shard to be unassigned, if exists.
* @param unassignedTimeNanos the time to use as the base for any delayed re-assignment calculation
* @param unassignedTimeMillis the time of unassignment used to display to in our reporting.
+ * @param delayed if allocation of this shard is delayed due to INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.
*/
- public UnassignedInfo(Reason reason, @Nullable String message, @Nullable Throwable failure, int failedAllocations, long unassignedTimeNanos, long unassignedTimeMillis) {
+ public UnassignedInfo(Reason reason, @Nullable String message, @Nullable Throwable failure, int failedAllocations,
+ long unassignedTimeNanos, long unassignedTimeMillis, boolean delayed) {
this.reason = reason;
this.unassignedTimeMillis = unassignedTimeMillis;
this.unassignedTimeNanos = unassignedTimeNanos;
- this.lastComputedLeftDelayNanos = 0L;
+ this.delayed = delayed;
this.message = message;
this.failure = failure;
this.failedAllocations = failedAllocations;
- assert (failedAllocations > 0) == (reason == Reason.ALLOCATION_FAILED):
+ assert (failedAllocations > 0) == (reason == Reason.ALLOCATION_FAILED) :
"failedAllocations: " + failedAllocations + " for reason " + reason;
assert !(message == null && failure != null) : "provide a message if a failure exception is provided";
- }
-
- public UnassignedInfo(UnassignedInfo unassignedInfo, long newComputedLeftDelayNanos) {
- this.reason = unassignedInfo.reason;
- this.unassignedTimeMillis = unassignedInfo.unassignedTimeMillis;
- this.unassignedTimeNanos = unassignedInfo.unassignedTimeNanos;
- this.lastComputedLeftDelayNanos = newComputedLeftDelayNanos;
- this.message = unassignedInfo.message;
- this.failure = unassignedInfo.failure;
- this.failedAllocations = unassignedInfo.failedAllocations;
+ assert !(delayed && reason != Reason.NODE_LEFT) : "shard can only be delayed if it is unassigned due to a node leaving";
}
public UnassignedInfo(StreamInput in) throws IOException {
@@ -163,7 +156,7 @@ public final class UnassignedInfo implements ToXContent, Writeable {
// As System.nanoTime() cannot be compared across different JVMs, reset it to now.
// This means that in master fail-over situations, elapsed delay time is forgotten.
this.unassignedTimeNanos = System.nanoTime();
- this.lastComputedLeftDelayNanos = 0L;
+ this.delayed = in.readBoolean();
this.message = in.readOptionalString();
this.failure = in.readThrowable();
this.failedAllocations = in.readVInt();
@@ -173,6 +166,7 @@ public final class UnassignedInfo implements ToXContent, Writeable {
out.writeByte((byte) reason.ordinal());
out.writeLong(unassignedTimeMillis);
// Do not serialize unassignedTimeNanos as System.nanoTime() cannot be compared across different JVMs
+ out.writeBoolean(delayed);
out.writeOptionalString(message);
out.writeThrowable(failure);
out.writeVInt(failedAllocations);
@@ -185,7 +179,16 @@ public final class UnassignedInfo implements ToXContent, Writeable {
/**
* Returns the number of previously failed allocations of this shard.
*/
- public int getNumFailedAllocations() { return failedAllocations; }
+ public int getNumFailedAllocations() {
+ return failedAllocations;
+ }
+
+ /**
+ * Returns true if allocation of this shard is delayed due to {@link #INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING}
+ */
+ public boolean isDelayed() {
+ return delayed;
+ }
/**
* The reason why the shard is unassigned.
@@ -239,50 +242,16 @@ public final class UnassignedInfo implements ToXContent, Writeable {
}
/**
- * The allocation delay value in nano seconds associated with the index (defaulting to node settings if not set).
- */
- public long getAllocationDelayTimeoutSettingNanos(Settings settings, Settings indexSettings) {
- if (reason != Reason.NODE_LEFT) {
- return 0;
- }
- TimeValue delayTimeout = INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.get(indexSettings, settings);
- return Math.max(0L, delayTimeout.nanos());
- }
-
- /**
- * The delay in nanoseconds until this unassigned shard can be reassigned. This value is cached and might be slightly out-of-date.
- * See also the {@link #updateDelay(long, Settings, Settings)} method.
- */
- public long getLastComputedLeftDelayNanos() {
- return lastComputedLeftDelayNanos;
- }
-
- /**
- * Calculates the delay left based on current time (in nanoseconds) and index/node settings.
+ * Calculates the delay left based on current time (in nanoseconds) and the delay defined by the index settings.
+ * Only relevant if shard is effectively delayed (see {@link #isDelayed()})
+ * Returns 0 if delay is negative
*
* @return calculated delay in nanoseconds
*/
- public long getRemainingDelay(final long nanoTimeNow, final Settings settings, final Settings indexSettings) {
- final long delayTimeoutNanos = getAllocationDelayTimeoutSettingNanos(settings, indexSettings);
- if (delayTimeoutNanos == 0L) {
- return 0L;
- } else {
- assert nanoTimeNow >= unassignedTimeNanos;
- return Math.max(0L, delayTimeoutNanos - (nanoTimeNow - unassignedTimeNanos));
- }
- }
-
- /**
- * Creates new UnassignedInfo object if delay needs updating.
- *
- * @return new Unassigned with updated delay, or this if no change in delay
- */
- public UnassignedInfo updateDelay(final long nanoTimeNow, final Settings settings, final Settings indexSettings) {
- final long newComputedLeftDelayNanos = getRemainingDelay(nanoTimeNow, settings, indexSettings);
- if (lastComputedLeftDelayNanos == newComputedLeftDelayNanos) {
- return this;
- }
- return new UnassignedInfo(this, newComputedLeftDelayNanos);
+ public long getRemainingDelay(final long nanoTimeNow, final Settings indexSettings) {
+ long delayTimeoutNanos = INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.get(indexSettings).nanos();
+ assert nanoTimeNow >= unassignedTimeNanos;
+ return Math.max(0L, delayTimeoutNanos - (nanoTimeNow - unassignedTimeNanos));
}
/**
@@ -291,49 +260,34 @@ public final class UnassignedInfo implements ToXContent, Writeable {
public static int getNumberOfDelayedUnassigned(ClusterState state) {
int count = 0;
for (ShardRouting shard : state.routingTable().shardsWithState(ShardRoutingState.UNASSIGNED)) {
- if (shard.primary() == false) {
- long delay = shard.unassignedInfo().getLastComputedLeftDelayNanos();
- if (delay > 0) {
- count++;
- }
+ if (shard.unassignedInfo().isDelayed()) {
+ count++;
}
}
return count;
}
/**
- * Finds the smallest delay expiration setting in nanos of all unassigned shards that are still delayed. Returns 0 if there are none.
+ * Finds the next (closest) delay expiration of an delayed shard in nanoseconds based on current time.
+ * Returns 0 if delay is negative.
+ * Returns -1 if no delayed shard is found.
*/
- public static long findSmallestDelayedAllocationSettingNanos(Settings settings, ClusterState state) {
- long minDelaySetting = Long.MAX_VALUE;
- for (ShardRouting shard : state.routingTable().shardsWithState(ShardRoutingState.UNASSIGNED)) {
- if (shard.primary() == false) {
- IndexMetaData indexMetaData = state.metaData().index(shard.getIndexName());
- boolean delayed = shard.unassignedInfo().getLastComputedLeftDelayNanos() > 0;
- long delayTimeoutSetting = shard.unassignedInfo().getAllocationDelayTimeoutSettingNanos(settings, indexMetaData.getSettings());
- if (delayed && delayTimeoutSetting > 0 && delayTimeoutSetting < minDelaySetting) {
- minDelaySetting = delayTimeoutSetting;
+ public static long findNextDelayedAllocation(long currentNanoTime, ClusterState state) {
+ MetaData metaData = state.metaData();
+ RoutingTable routingTable = state.routingTable();
+ long nextDelayNanos = Long.MAX_VALUE;
+ for (ShardRouting shard : routingTable.shardsWithState(ShardRoutingState.UNASSIGNED)) {
+ UnassignedInfo unassignedInfo = shard.unassignedInfo();
+ if (unassignedInfo.isDelayed()) {
+ Settings indexSettings = metaData.index(shard.index()).getSettings();
+ // calculate next time to schedule
+ final long newComputedLeftDelayNanos = unassignedInfo.getRemainingDelay(currentNanoTime, indexSettings);
+ if (newComputedLeftDelayNanos < nextDelayNanos) {
+ nextDelayNanos = newComputedLeftDelayNanos;
}
}
}
- return minDelaySetting == Long.MAX_VALUE ? 0L : minDelaySetting;
- }
-
-
- /**
- * Finds the next (closest) delay expiration of an unassigned shard in nanoseconds. Returns 0 if there are none.
- */
- public static long findNextDelayedAllocationIn(ClusterState state) {
- long nextDelay = Long.MAX_VALUE;
- for (ShardRouting shard : state.routingTable().shardsWithState(ShardRoutingState.UNASSIGNED)) {
- if (shard.primary() == false) {
- long nextShardDelay = shard.unassignedInfo().getLastComputedLeftDelayNanos();
- if (nextShardDelay > 0 && nextShardDelay < nextDelay) {
- nextDelay = nextShardDelay;
- }
- }
- }
- return nextDelay == Long.MAX_VALUE ? 0L : nextDelay;
+ return nextDelayNanos == Long.MAX_VALUE ? -1L : nextDelayNanos;
}
public String shortSummary() {
@@ -343,6 +297,7 @@ public final class UnassignedInfo implements ToXContent, Writeable {
if (failedAllocations > 0) {
sb.append(", failed_attempts[").append(failedAllocations).append("]");
}
+ sb.append(", delayed=").append(delayed);
String details = getDetails();
if (details != null) {
@@ -364,6 +319,7 @@ public final class UnassignedInfo implements ToXContent, Writeable {
if (failedAllocations > 0) {
builder.field("failed_attempts", failedAllocations);
}
+ builder.field("delayed", delayed);
String details = getDetails();
if (details != null) {
builder.field("details", details);
@@ -386,6 +342,12 @@ public final class UnassignedInfo implements ToXContent, Writeable {
if (unassignedTimeMillis != that.unassignedTimeMillis) {
return false;
}
+ if (delayed != that.delayed) {
+ return false;
+ }
+ if (failedAllocations != that.failedAllocations) {
+ return false;
+ }
if (reason != that.reason) {
return false;
}
@@ -393,12 +355,13 @@ public final class UnassignedInfo implements ToXContent, Writeable {
return false;
}
return !(failure != null ? !failure.equals(that.failure) : that.failure != null);
-
}
@Override
public int hashCode() {
int result = reason != null ? reason.hashCode() : 0;
+ result = 31 * result + Boolean.hashCode(delayed);
+ result = 31 * result + Integer.hashCode(failedAllocations);
result = 31 * result + Long.hashCode(unassignedTimeMillis);
result = 31 * result + (message != null ? message.hashCode() : 0);
result = 31 * result + (failure != null ? failure.hashCode() : 0);
diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java
index d59113675d8..7dd9b1f8fec 100644
--- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java
+++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/AllocationService.java
@@ -53,6 +53,8 @@ import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
+import static org.elasticsearch.cluster.routing.UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING;
+
/**
* This service manages the node allocation of a cluster. For this reason the
@@ -90,7 +92,7 @@ public class AllocationService extends AbstractComponent {
RoutingNodes routingNodes = getMutableRoutingNodes(clusterState);
// shuffle the unassigned nodes, just so we won't have things like poison failed shards
routingNodes.unassigned().shuffle();
- StartedRerouteAllocation allocation = new StartedRerouteAllocation(allocationDeciders, routingNodes, clusterState, startedShards, clusterInfoService.getClusterInfo());
+ StartedRerouteAllocation allocation = new StartedRerouteAllocation(allocationDeciders, routingNodes, clusterState, startedShards, clusterInfoService.getClusterInfo(), currentNanoTime());
boolean changed = applyStartedShards(routingNodes, startedShards);
if (!changed) {
return new RoutingAllocation.Result(false, clusterState.routingTable(), clusterState.metaData());
@@ -99,28 +101,27 @@ public class AllocationService extends AbstractComponent {
if (withReroute) {
reroute(allocation);
}
- final RoutingAllocation.Result result = buildChangedResult(clusterState.metaData(), clusterState.routingTable(), routingNodes);
-
String startedShardsAsString = firstListElementsToCommaDelimitedString(startedShards, s -> s.shardId().toString());
- logClusterHealthStateChange(
- new ClusterStateHealth(clusterState),
- new ClusterStateHealth(clusterState.metaData(), result.routingTable()),
- "shards started [" + startedShardsAsString + "] ..."
- );
- return result;
+ return buildResultAndLogHealthChange(allocation, "shards started [" + startedShardsAsString + "] ...");
+ }
+
+ protected RoutingAllocation.Result buildResultAndLogHealthChange(RoutingAllocation allocation, String reason) {
+ return buildResultAndLogHealthChange(allocation, reason, new RoutingExplanations());
}
- protected RoutingAllocation.Result buildChangedResult(MetaData oldMetaData, RoutingTable oldRoutingTable, RoutingNodes newRoutingNodes) {
- return buildChangedResult(oldMetaData, oldRoutingTable, newRoutingNodes, new RoutingExplanations());
-
- }
-
- protected RoutingAllocation.Result buildChangedResult(MetaData oldMetaData, RoutingTable oldRoutingTable, RoutingNodes newRoutingNodes,
- RoutingExplanations explanations) {
+ protected RoutingAllocation.Result buildResultAndLogHealthChange(RoutingAllocation allocation, String reason, RoutingExplanations explanations) {
+ MetaData oldMetaData = allocation.metaData();
+ RoutingTable oldRoutingTable = allocation.routingTable();
+ RoutingNodes newRoutingNodes = allocation.routingNodes();
final RoutingTable newRoutingTable = new RoutingTable.Builder().updateNodes(newRoutingNodes).build();
MetaData newMetaData = updateMetaDataWithRoutingTable(oldMetaData, oldRoutingTable, newRoutingTable);
assert newRoutingTable.validate(newMetaData); // validates the routing table is coherent with the cluster state metadata
+ logClusterHealthStateChange(
+ new ClusterStateHealth(allocation.metaData(), allocation.routingTable()),
+ new ClusterStateHealth(newMetaData, newRoutingTable),
+ reason
+ );
return new RoutingAllocation.Result(true, newRoutingTable, newMetaData, explanations);
}
@@ -216,7 +217,8 @@ public class AllocationService extends AbstractComponent {
RoutingNodes routingNodes = getMutableRoutingNodes(clusterState);
// shuffle the unassigned nodes, just so we won't have things like poison failed shards
routingNodes.unassigned().shuffle();
- FailedRerouteAllocation allocation = new FailedRerouteAllocation(allocationDeciders, routingNodes, clusterState, failedShards, clusterInfoService.getClusterInfo());
+ long currentNanoTime = currentNanoTime();
+ FailedRerouteAllocation allocation = new FailedRerouteAllocation(allocationDeciders, routingNodes, clusterState, failedShards, clusterInfoService.getClusterInfo(), currentNanoTime);
boolean changed = false;
// as failing primaries also fail associated replicas, we fail replicas first here so that their nodes are added to ignore list
List orderedFailedShards = new ArrayList<>(failedShards);
@@ -225,21 +227,38 @@ public class AllocationService extends AbstractComponent {
UnassignedInfo unassignedInfo = failedShard.shard.unassignedInfo();
final int failedAllocations = unassignedInfo != null ? unassignedInfo.getNumFailedAllocations() : 0;
changed |= applyFailedShard(allocation, failedShard.shard, true, new UnassignedInfo(UnassignedInfo.Reason.ALLOCATION_FAILED, failedShard.message, failedShard.failure,
- failedAllocations + 1, System.nanoTime(), System.currentTimeMillis()));
+ failedAllocations + 1, currentNanoTime, System.currentTimeMillis(), false));
}
if (!changed) {
return new RoutingAllocation.Result(false, clusterState.routingTable(), clusterState.metaData());
}
gatewayAllocator.applyFailedShards(allocation);
reroute(allocation);
- final RoutingAllocation.Result result = buildChangedResult(clusterState.metaData(), clusterState.routingTable(), routingNodes);
String failedShardsAsString = firstListElementsToCommaDelimitedString(failedShards, s -> s.shard.shardId().toString());
- logClusterHealthStateChange(
- new ClusterStateHealth(clusterState),
- new ClusterStateHealth(clusterState.getMetaData(), result.routingTable()),
- "shards failed [" + failedShardsAsString + "] ..."
- );
- return result;
+ return buildResultAndLogHealthChange(allocation, "shards failed [" + failedShardsAsString + "] ...");
+ }
+
+ /**
+ * Removes delay markers from unassigned shards based on current time stamp. Returns true if markers were removed.
+ */
+ private boolean removeDelayMarkers(RoutingAllocation allocation) {
+ final RoutingNodes.UnassignedShards.UnassignedIterator unassignedIterator = allocation.routingNodes().unassigned().iterator();
+ final MetaData metaData = allocation.metaData();
+ boolean changed = false;
+ while (unassignedIterator.hasNext()) {
+ ShardRouting shardRouting = unassignedIterator.next();
+ UnassignedInfo unassignedInfo = shardRouting.unassignedInfo();
+ if (unassignedInfo.isDelayed()) {
+ final long newComputedLeftDelayNanos = unassignedInfo.getRemainingDelay(allocation.getCurrentNanoTime(),
+ metaData.getIndexSafe(shardRouting.index()).getSettings());
+ if (newComputedLeftDelayNanos == 0) {
+ changed = true;
+ unassignedIterator.updateUnassignedInfo(new UnassignedInfo(unassignedInfo.getReason(), unassignedInfo.getMessage(), unassignedInfo.getFailure(),
+ unassignedInfo.getNumFailedAllocations(), unassignedInfo.getUnassignedTimeInNanos(), unassignedInfo.getUnassignedTimeInMillis(), false));
+ }
+ }
+ }
+ return changed;
}
/**
@@ -276,13 +295,7 @@ public class AllocationService extends AbstractComponent {
// the assumption is that commands will move / act on shards (or fail through exceptions)
// so, there will always be shard "movements", so no need to check on reroute
reroute(allocation);
- RoutingAllocation.Result result = buildChangedResult(clusterState.metaData(), clusterState.routingTable(), routingNodes, explanations);
- logClusterHealthStateChange(
- new ClusterStateHealth(clusterState),
- new ClusterStateHealth(clusterState.getMetaData(), result.routingTable()),
- "reroute commands"
- );
- return result;
+ return buildResultAndLogHealthChange(allocation, "reroute commands", explanations);
}
@@ -310,13 +323,7 @@ public class AllocationService extends AbstractComponent {
if (!reroute(allocation)) {
return new RoutingAllocation.Result(false, clusterState.routingTable(), clusterState.metaData());
}
- RoutingAllocation.Result result = buildChangedResult(clusterState.metaData(), clusterState.routingTable(), routingNodes);
- logClusterHealthStateChange(
- new ClusterStateHealth(clusterState),
- new ClusterStateHealth(clusterState.getMetaData(), result.routingTable()),
- reason
- );
- return result;
+ return buildResultAndLogHealthChange(allocation, reason);
}
private void logClusterHealthStateChange(ClusterStateHealth previousStateHealth, ClusterStateHealth newStateHealth, String reason) {
@@ -341,8 +348,7 @@ public class AllocationService extends AbstractComponent {
// now allocate all the unassigned to available nodes
if (allocation.routingNodes().unassigned().size() > 0) {
- updateLeftDelayOfUnassignedShards(allocation, settings);
-
+ changed |= removeDelayMarkers(allocation);
changed |= gatewayAllocator.allocateUnassigned(allocation);
}
@@ -351,22 +357,6 @@ public class AllocationService extends AbstractComponent {
return changed;
}
- // public for testing
- public static void updateLeftDelayOfUnassignedShards(RoutingAllocation allocation, Settings settings) {
- final RoutingNodes.UnassignedShards.UnassignedIterator unassignedIterator = allocation.routingNodes().unassigned().iterator();
- final MetaData metaData = allocation.metaData();
- while (unassignedIterator.hasNext()) {
- ShardRouting shardRouting = unassignedIterator.next();
- final IndexMetaData indexMetaData = metaData.getIndexSafe(shardRouting.index());
- UnassignedInfo previousUnassignedInfo = shardRouting.unassignedInfo();
- UnassignedInfo updatedUnassignedInfo = previousUnassignedInfo.updateDelay(allocation.getCurrentNanoTime(), settings,
- indexMetaData.getSettings());
- if (updatedUnassignedInfo != previousUnassignedInfo) { // reference equality!
- unassignedIterator.updateUnassignedInfo(updatedUnassignedInfo);
- }
- }
- }
-
private boolean electPrimariesAndUnassignedDanglingReplicas(RoutingAllocation allocation) {
boolean changed = false;
final RoutingNodes routingNodes = allocation.routingNodes();
@@ -436,8 +426,10 @@ public class AllocationService extends AbstractComponent {
changed = true;
// now, go over all the shards routing on the node, and fail them
for (ShardRouting shardRouting : node.copyShards()) {
- UnassignedInfo unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.NODE_LEFT, "node_left[" + node.nodeId() + "]", null,
- 0, allocation.getCurrentNanoTime(), System.currentTimeMillis());
+ final IndexMetaData indexMetaData = allocation.metaData().getIndexSafe(shardRouting.index());
+ boolean delayed = INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.get(indexMetaData.getSettings()).nanos() > 0;
+ UnassignedInfo unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.NODE_LEFT, "node_left[" + node.nodeId() + "]",
+ null, 0, allocation.getCurrentNanoTime(), System.currentTimeMillis(), delayed);
applyFailedShard(allocation, shardRouting, false, unassignedInfo);
}
// its a dead node, remove it, note, its important to remove it *after* we apply failed shard
@@ -458,7 +450,7 @@ public class AllocationService extends AbstractComponent {
for (ShardRouting routing : replicas) {
changed |= applyFailedShard(allocation, routing, false,
new UnassignedInfo(UnassignedInfo.Reason.PRIMARY_FAILED, "primary failed while replica initializing",
- null, 0, allocation.getCurrentNanoTime(), System.currentTimeMillis()));
+ null, 0, allocation.getCurrentNanoTime(), System.currentTimeMillis(), false));
}
return changed;
}
diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/FailedRerouteAllocation.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/FailedRerouteAllocation.java
index ef2e42eed76..b1b0dfce1fe 100644
--- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/FailedRerouteAllocation.java
+++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/FailedRerouteAllocation.java
@@ -57,8 +57,8 @@ public class FailedRerouteAllocation extends RoutingAllocation {
private final List failedShards;
- public FailedRerouteAllocation(AllocationDeciders deciders, RoutingNodes routingNodes, ClusterState clusterState, List failedShards, ClusterInfo clusterInfo) {
- super(deciders, routingNodes, clusterState, clusterInfo, System.nanoTime(), false);
+ public FailedRerouteAllocation(AllocationDeciders deciders, RoutingNodes routingNodes, ClusterState clusterState, List failedShards, ClusterInfo clusterInfo, long currentNanoTime) {
+ super(deciders, routingNodes, clusterState, clusterInfo, currentNanoTime, false);
this.failedShards = failedShards;
}
diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/RoutingAllocation.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/RoutingAllocation.java
index 0df8074e14c..584b141913f 100644
--- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/RoutingAllocation.java
+++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/RoutingAllocation.java
@@ -56,7 +56,7 @@ public class RoutingAllocation {
private final MetaData metaData;
- private RoutingExplanations explanations = new RoutingExplanations();
+ private final RoutingExplanations explanations;
/**
* Creates a new {@link RoutingAllocation.Result}
@@ -65,9 +65,7 @@ public class RoutingAllocation {
* @param metaData the {@link MetaData} this Result references
*/
public Result(boolean changed, RoutingTable routingTable, MetaData metaData) {
- this.changed = changed;
- this.routingTable = routingTable;
- this.metaData = metaData;
+ this(changed, routingTable, metaData, new RoutingExplanations());
}
/**
diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/StartedRerouteAllocation.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/StartedRerouteAllocation.java
index 0f55ab4fda1..4d1ac1408a2 100644
--- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/StartedRerouteAllocation.java
+++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/StartedRerouteAllocation.java
@@ -35,8 +35,8 @@ public class StartedRerouteAllocation extends RoutingAllocation {
private final List extends ShardRouting> startedShards;
- public StartedRerouteAllocation(AllocationDeciders deciders, RoutingNodes routingNodes, ClusterState clusterState, List extends ShardRouting> startedShards, ClusterInfo clusterInfo) {
- super(deciders, routingNodes, clusterState, clusterInfo, System.nanoTime(), false);
+ public StartedRerouteAllocation(AllocationDeciders deciders, RoutingNodes routingNodes, ClusterState clusterState, List extends ShardRouting> startedShards, ClusterInfo clusterInfo, long currentNanoTime) {
+ super(deciders, routingNodes, clusterState, clusterInfo, currentNanoTime, false);
this.startedShards = startedShards;
}
diff --git a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/command/AllocateEmptyPrimaryAllocationCommand.java b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/command/AllocateEmptyPrimaryAllocationCommand.java
index c80afde3086..08be17a8e98 100644
--- a/core/src/main/java/org/elasticsearch/cluster/routing/allocation/command/AllocateEmptyPrimaryAllocationCommand.java
+++ b/core/src/main/java/org/elasticsearch/cluster/routing/allocation/command/AllocateEmptyPrimaryAllocationCommand.java
@@ -125,7 +125,7 @@ public class AllocateEmptyPrimaryAllocationCommand extends BasePrimaryAllocation
// we need to move the unassigned info back to treat it as if it was index creation
unassignedInfoToUpdate = new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED,
"force empty allocation from previous reason " + shardRouting.unassignedInfo().getReason() + ", " + shardRouting.unassignedInfo().getMessage(),
- shardRouting.unassignedInfo().getFailure(), 0, System.nanoTime(), System.currentTimeMillis());
+ shardRouting.unassignedInfo().getFailure(), 0, System.nanoTime(), System.currentTimeMillis(), false);
}
initializeUnassignedShard(allocation, routingNodes, routingNode, shardRouting, unassignedInfoToUpdate);
diff --git a/core/src/main/java/org/elasticsearch/cluster/service/ClusterService.java b/core/src/main/java/org/elasticsearch/cluster/service/ClusterService.java
index 15c2b5c3939..ac4830fe0ec 100644
--- a/core/src/main/java/org/elasticsearch/cluster/service/ClusterService.java
+++ b/core/src/main/java/org/elasticsearch/cluster/service/ClusterService.java
@@ -455,7 +455,7 @@ public class ClusterService extends AbstractLifecycleComponent {
}
/** asserts that the current thread is the cluster state update thread */
- public boolean assertClusterStateThread() {
+ public static boolean assertClusterStateThread() {
assert Thread.currentThread().getName().contains(ClusterService.UPDATE_THREAD_NAME) :
"not called from the cluster state update thread";
return true;
diff --git a/core/src/main/java/org/elasticsearch/discovery/zen/NodeJoinController.java b/core/src/main/java/org/elasticsearch/discovery/zen/NodeJoinController.java
index ee96335c2b2..fdf084639d1 100644
--- a/core/src/main/java/org/elasticsearch/discovery/zen/NodeJoinController.java
+++ b/core/src/main/java/org/elasticsearch/discovery/zen/NodeJoinController.java
@@ -89,7 +89,7 @@ public class NodeJoinController extends AbstractComponent {
assert accumulateJoins.get() : "waitToBeElectedAsMaster is called we are not accumulating joins";
final CountDownLatch done = new CountDownLatch(1);
- final ElectionContext newContext = new ElectionContext(callback, requiredMasterJoins, clusterService) {
+ final ElectionContext newContext = new ElectionContext(callback, requiredMasterJoins) {
@Override
void onClose() {
if (electionContext.compareAndSet(this, null)) {
@@ -307,16 +307,14 @@ public class NodeJoinController extends AbstractComponent {
static abstract class ElectionContext implements ElectionCallback {
private final ElectionCallback callback;
private final int requiredMasterJoins;
- private final ClusterService clusterService;
/** set to true after enough joins have been seen and a cluster update task is submitted to become master */
final AtomicBoolean pendingSetAsMasterTask = new AtomicBoolean();
final AtomicBoolean closed = new AtomicBoolean();
- ElectionContext(ElectionCallback callback, int requiredMasterJoins, ClusterService clusterService) {
+ ElectionContext(ElectionCallback callback, int requiredMasterJoins) {
this.callback = callback;
this.requiredMasterJoins = requiredMasterJoins;
- this.clusterService = clusterService;
}
abstract void onClose();
@@ -324,7 +322,7 @@ public class NodeJoinController extends AbstractComponent {
@Override
public void onElectedAsMaster(ClusterState state) {
assert pendingSetAsMasterTask.get() : "onElectedAsMaster called but pendingSetAsMasterTask is not set";
- assertClusterStateThread();
+ ClusterService.assertClusterStateThread();
assert state.nodes().isLocalNodeElectedMaster() : "onElectedAsMaster called but local node is not master";
if (closed.compareAndSet(false, true)) {
try {
@@ -337,7 +335,7 @@ public class NodeJoinController extends AbstractComponent {
@Override
public void onFailure(Throwable t) {
- assertClusterStateThread();
+ ClusterService.assertClusterStateThread();
if (closed.compareAndSet(false, true)) {
try {
onClose();
@@ -346,10 +344,6 @@ public class NodeJoinController extends AbstractComponent {
}
}
}
-
- private void assertClusterStateThread() {
- assert clusterService instanceof ClusterService == false || ((ClusterService) clusterService).assertClusterStateThread();
- }
}
diff --git a/core/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java b/core/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java
index 9f0b194241b..4d2b9e7c5e7 100644
--- a/core/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java
+++ b/core/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java
@@ -1145,14 +1145,14 @@ public class ZenDiscovery extends AbstractLifecycleComponent implemen
/** cleans any running joining thread and calls {@link #rejoin} */
public ClusterState stopRunningThreadAndRejoin(ClusterState clusterState, String reason) {
- assertClusterStateThread();
+ ClusterService.assertClusterStateThread();
currentJoinThread.set(null);
return rejoin(clusterState, reason);
}
/** starts a new joining thread if there is no currently active one and join thread controlling is started */
public void startNewThreadIfNotRunning() {
- assertClusterStateThread();
+ ClusterService.assertClusterStateThread();
if (joinThreadActive()) {
return;
}
@@ -1185,7 +1185,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implemen
* If the given thread is not the currently running join thread, the command is ignored.
*/
public void markThreadAsDoneAndStartNew(Thread joinThread) {
- assertClusterStateThread();
+ ClusterService.assertClusterStateThread();
if (!markThreadAsDone(joinThread)) {
return;
}
@@ -1194,7 +1194,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implemen
/** marks the given joinThread as completed. Returns false if the supplied thread is not the currently active join thread */
public boolean markThreadAsDone(Thread joinThread) {
- assertClusterStateThread();
+ ClusterService.assertClusterStateThread();
return currentJoinThread.compareAndSet(joinThread, null);
}
@@ -1210,9 +1210,5 @@ public class ZenDiscovery extends AbstractLifecycleComponent implemen
running.set(true);
}
- private void assertClusterStateThread() {
- assert clusterService instanceof ClusterService == false || ((ClusterService) clusterService).assertClusterStateThread();
- }
-
}
}
diff --git a/core/src/main/java/org/elasticsearch/gateway/ReplicaShardAllocator.java b/core/src/main/java/org/elasticsearch/gateway/ReplicaShardAllocator.java
index 8b6e425c26a..cf8a54dfc64 100644
--- a/core/src/main/java/org/elasticsearch/gateway/ReplicaShardAllocator.java
+++ b/core/src/main/java/org/elasticsearch/gateway/ReplicaShardAllocator.java
@@ -108,7 +108,7 @@ public abstract class ReplicaShardAllocator extends AbstractComponent {
currentNode, nodeWithHighestMatch);
it.moveToUnassigned(new UnassignedInfo(UnassignedInfo.Reason.REALLOCATED_REPLICA,
"existing allocation of replica to [" + currentNode + "] cancelled, sync id match found on node [" + nodeWithHighestMatch + "]",
- null, 0, allocation.getCurrentNanoTime(), System.currentTimeMillis()));
+ null, 0, allocation.getCurrentNanoTime(), System.currentTimeMillis(), false));
changed = true;
}
}
@@ -179,7 +179,7 @@ public abstract class ReplicaShardAllocator extends AbstractComponent {
}
} else if (matchingNodes.hasAnyData() == false) {
// if we didn't manage to find *any* data (regardless of matching sizes), check if the allocation of the replica shard needs to be delayed
- changed |= ignoreUnassignedIfDelayed(unassignedIterator, shard);
+ ignoreUnassignedIfDelayed(unassignedIterator, shard);
}
}
return changed;
@@ -195,21 +195,16 @@ public abstract class ReplicaShardAllocator extends AbstractComponent {
*
* @param unassignedIterator iterator over unassigned shards
* @param shard the shard which might be delayed
- * @return true iff allocation is delayed for this shard
*/
- public boolean ignoreUnassignedIfDelayed(RoutingNodes.UnassignedShards.UnassignedIterator unassignedIterator, ShardRouting shard) {
- // calculate delay and store it in UnassignedInfo to be used by RoutingService
- long delay = shard.unassignedInfo().getLastComputedLeftDelayNanos();
- if (delay > 0) {
- logger.debug("[{}][{}]: delaying allocation of [{}] for [{}]", shard.index(), shard.id(), shard, TimeValue.timeValueNanos(delay));
+ public void ignoreUnassignedIfDelayed(RoutingNodes.UnassignedShards.UnassignedIterator unassignedIterator, ShardRouting shard) {
+ if (shard.unassignedInfo().isDelayed()) {
+ logger.debug("{}: allocation of [{}] is delayed", shard.shardId(), shard);
/**
* mark it as changed, since we want to kick a publishing to schedule future allocation,
* see {@link org.elasticsearch.cluster.routing.RoutingService#clusterChanged(ClusterChangedEvent)}).
*/
unassignedIterator.removeAndIgnore();
- return true;
}
- return false;
}
/**
diff --git a/core/src/test/java/org/elasticsearch/action/admin/cluster/allocation/ClusterAllocationExplanationTests.java b/core/src/test/java/org/elasticsearch/action/admin/cluster/allocation/ClusterAllocationExplanationTests.java
index 511c547f233..d0e8ef14d01 100644
--- a/core/src/test/java/org/elasticsearch/action/admin/cluster/allocation/ClusterAllocationExplanationTests.java
+++ b/core/src/test/java/org/elasticsearch/action/admin/cluster/allocation/ClusterAllocationExplanationTests.java
@@ -21,19 +21,18 @@ package org.elasticsearch.action.admin.cluster.allocation;
import org.apache.lucene.index.CorruptIndexException;
import org.elasticsearch.ElasticsearchException;
-import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version;
import org.elasticsearch.action.admin.indices.shards.IndicesShardStoresResponse;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.ShardRouting;
-import org.elasticsearch.cluster.routing.ShardRoutingHelper;
import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.cluster.routing.allocation.decider.Decision;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.DummyTransportAddress;
+import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
@@ -43,10 +42,8 @@ import org.elasticsearch.test.ESTestCase;
import java.io.IOException;
import java.util.Arrays;
-import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -61,13 +58,11 @@ public final class ClusterAllocationExplanationTests extends ESTestCase {
private Index i = new Index("foo", "uuid");
private ShardRouting primaryShard = ShardRouting.newUnassigned(new ShardId(i, 0), null, true,
new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "foo"));
- private ShardRouting replicaShard = ShardRouting.newUnassigned(new ShardId(i, 0), null, false,
- new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "foo"));
private IndexMetaData indexMetaData = IndexMetaData.builder("foo")
.settings(Settings.builder()
.put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
.put(IndexMetaData.SETTING_INDEX_UUID, "uuid"))
- .putActiveAllocationIds(0, new HashSet(Arrays.asList("aid1", "aid2")))
+ .putActiveAllocationIds(0, Sets.newHashSet("aid1", "aid2"))
.numberOfShards(1)
.numberOfReplicas(1)
.build();
@@ -80,7 +75,6 @@ public final class ClusterAllocationExplanationTests extends ESTestCase {
noDecision.add(Decision.single(Decision.Type.NO, "no label", "no thanks"));
}
-
private void assertExplanations(NodeExplanation ne, String finalExplanation, ClusterAllocationExplanation.FinalDecision finalDecision,
ClusterAllocationExplanation.StoreCopy storeCopy) {
assertEquals(finalExplanation, ne.getFinalExplanation());
@@ -195,6 +189,7 @@ public final class ClusterAllocationExplanationTests extends ESTestCase {
public void testExplanationSerialization() throws Exception {
ShardId shard = new ShardId("test", "uuid", 0);
+ long allocationDelay = randomIntBetween(0, 500);
long remainingDelay = randomIntBetween(0, 500);
Map nodeExplanations = new HashMap<>(1);
Float nodeWeight = randomFloat();
@@ -207,7 +202,7 @@ public final class ClusterAllocationExplanationTests extends ESTestCase {
yesDecision, nodeWeight, storeStatus, "", activeAllocationIds, false);
nodeExplanations.put(ne.getNode(), ne);
ClusterAllocationExplanation cae = new ClusterAllocationExplanation(shard, true,
- "assignedNode", remainingDelay, null, false, nodeExplanations);
+ "assignedNode", allocationDelay, remainingDelay, null, false, nodeExplanations);
BytesStreamOutput out = new BytesStreamOutput();
cae.writeTo(out);
StreamInput in = StreamInput.wrap(out.bytes());
@@ -217,6 +212,7 @@ public final class ClusterAllocationExplanationTests extends ESTestCase {
assertTrue(cae2.isAssigned());
assertEquals("assignedNode", cae2.getAssignedNodeId());
assertNull(cae2.getUnassignedInfo());
+ assertEquals(allocationDelay, cae2.getAllocationDelayMillis());
assertEquals(remainingDelay, cae2.getRemainingDelayMillis());
for (Map.Entry entry : cae2.getNodeExplanations().entrySet()) {
DiscoveryNode node = entry.getKey();
@@ -230,7 +226,6 @@ public final class ClusterAllocationExplanationTests extends ESTestCase {
public void testExplanationToXContent() throws Exception {
ShardId shardId = new ShardId("foo", "uuid", 0);
- long remainingDelay = 42;
Decision.Multi d = new Decision.Multi();
d.add(Decision.single(Decision.Type.NO, "no label", "because I said no"));
d.add(Decision.single(Decision.Type.YES, "yes label", "yes please"));
@@ -245,7 +240,7 @@ public final class ClusterAllocationExplanationTests extends ESTestCase {
Map nodeExplanations = new HashMap<>(1);
nodeExplanations.put(ne.getNode(), ne);
ClusterAllocationExplanation cae = new ClusterAllocationExplanation(shardId, true,
- "assignedNode", remainingDelay, null, false, nodeExplanations);
+ "assignedNode", 42, 42, null, false, nodeExplanations);
XContentBuilder builder = XContentFactory.jsonBuilder();
cae.toXContent(builder, ToXContent.EMPTY_PARAMS);
assertEquals("{\"shard\":{\"index\":\"foo\",\"index_uuid\":\"uuid\",\"id\":0,\"primary\":true},\"assigned\":true," +
diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/DelayedAllocationIT.java b/core/src/test/java/org/elasticsearch/cluster/routing/DelayedAllocationIT.java
index eda6bca80f7..88f09a55c2e 100644
--- a/core/src/test/java/org/elasticsearch/cluster/routing/DelayedAllocationIT.java
+++ b/core/src/test/java/org/elasticsearch/cluster/routing/DelayedAllocationIT.java
@@ -108,8 +108,6 @@ public class DelayedAllocationIT extends ESIntegTestCase {
* allocation to a very small value, it kicks the allocation of the unassigned shard
* even though the node it was hosted on will not come back.
*/
- @AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/18293")
- @TestLogging("_root:DEBUG,cluster.routing:TRACE")
public void testDelayedAllocationChangeWithSettingTo100ms() throws Exception {
internalCluster().startNodesAsync(3).get();
prepareCreate("test").setSettings(Settings.builder()
@@ -145,12 +143,7 @@ public class DelayedAllocationIT extends ESIntegTestCase {
ensureGreen("test");
indexRandomData();
internalCluster().stopRandomNode(InternalTestCluster.nameFilter(findNodeWithShard()));
- assertBusy(new Runnable() {
- @Override
- public void run() {
- assertThat(client().admin().cluster().prepareState().all().get().getState().getRoutingNodes().unassigned().size() > 0, equalTo(true));
- }
- });
+ assertBusy(() -> assertThat(client().admin().cluster().prepareState().all().get().getState().getRoutingNodes().unassigned().size() > 0, equalTo(true)));
assertThat(client().admin().cluster().prepareHealth().get().getDelayedUnassignedShards(), equalTo(1));
assertAcked(client().admin().indices().prepareUpdateSettings("test").setSettings(Settings.builder().put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), TimeValue.timeValueMillis(0))).get());
ensureGreen("test");
diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/DelayedAllocationServiceTests.java b/core/src/test/java/org/elasticsearch/cluster/routing/DelayedAllocationServiceTests.java
new file mode 100644
index 00000000000..be8e85c0fa8
--- /dev/null
+++ b/core/src/test/java/org/elasticsearch/cluster/routing/DelayedAllocationServiceTests.java
@@ -0,0 +1,510 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.cluster.routing;
+
+import org.elasticsearch.Version;
+import org.elasticsearch.cluster.ClusterChangedEvent;
+import org.elasticsearch.cluster.ClusterInfoService;
+import org.elasticsearch.cluster.ClusterName;
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.ClusterStateUpdateTask;
+import org.elasticsearch.cluster.metadata.IndexMetaData;
+import org.elasticsearch.cluster.metadata.MetaData;
+import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.cluster.node.DiscoveryNodes;
+import org.elasticsearch.cluster.routing.allocation.AllocationService;
+import org.elasticsearch.cluster.routing.allocation.allocator.ShardsAllocator;
+import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders;
+import org.elasticsearch.cluster.service.ClusterService;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.gateway.GatewayAllocator;
+import org.elasticsearch.test.ESAllocationTestCase;
+import org.elasticsearch.threadpool.ThreadPool;
+import org.junit.After;
+import org.junit.Before;
+
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static java.util.Collections.singleton;
+import static org.elasticsearch.cluster.routing.DelayedAllocationService.CLUSTER_UPDATE_TASK_SOURCE;
+import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING;
+import static org.elasticsearch.cluster.routing.ShardRoutingState.STARTED;
+import static org.elasticsearch.common.unit.TimeValue.timeValueMillis;
+import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds;
+import static org.hamcrest.Matchers.equalTo;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+
+/**
+ */
+public class DelayedAllocationServiceTests extends ESAllocationTestCase {
+
+ private TestDelayAllocationService delayedAllocationService;
+ private MockAllocationService allocationService;
+ private ClusterService clusterService;
+ private ThreadPool threadPool;
+
+ @Before
+ public void createDelayedAllocationService() {
+ threadPool = new ThreadPool(getTestName());
+ clusterService = mock(ClusterService.class);
+ allocationService = createAllocationService(Settings.EMPTY, new DelayedShardsMockGatewayAllocator());
+ delayedAllocationService = new TestDelayAllocationService(Settings.EMPTY, threadPool, clusterService, allocationService);
+ verify(clusterService).addFirst(delayedAllocationService);
+ }
+
+ @After
+ public void shutdownThreadPool() throws Exception {
+ terminate(threadPool);
+ }
+
+ public void testNoDelayedUnassigned() throws Exception {
+ MetaData metaData = MetaData.builder()
+ .put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)
+ .put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), "0"))
+ .numberOfShards(1).numberOfReplicas(1))
+ .build();
+ ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT)
+ .metaData(metaData)
+ .routingTable(RoutingTable.builder().addAsNew(metaData.index("test")).build()).build();
+ clusterState = ClusterState.builder(clusterState)
+ .nodes(DiscoveryNodes.builder().put(newNode("node1")).put(newNode("node2")).localNodeId("node1").masterNodeId("node1"))
+ .build();
+ clusterState = ClusterState.builder(clusterState).routingResult(allocationService.reroute(clusterState, "reroute")).build();
+ // starting primaries
+ clusterState = ClusterState.builder(clusterState)
+ .routingResult(allocationService.applyStartedShards(clusterState, clusterState.getRoutingNodes().shardsWithState(INITIALIZING)))
+ .build();
+ // starting replicas
+ clusterState = ClusterState.builder(clusterState)
+ .routingResult(allocationService.applyStartedShards(clusterState, clusterState.getRoutingNodes().shardsWithState(INITIALIZING)))
+ .build();
+ assertThat(clusterState.getRoutingNodes().unassigned().size() > 0, equalTo(false));
+ ClusterState prevState = clusterState;
+ // remove node2 and reroute
+ DiscoveryNodes.Builder nodes = DiscoveryNodes.builder(clusterState.nodes()).remove("node2");
+ boolean nodeAvailableForAllocation = randomBoolean();
+ if (nodeAvailableForAllocation) {
+ nodes.put(newNode("node3"));
+ }
+ clusterState = ClusterState.builder(clusterState).nodes(nodes).build();
+ clusterState = ClusterState.builder(clusterState).routingResult(allocationService.reroute(clusterState, "reroute")).build();
+ ClusterState newState = clusterState;
+ List unassignedShards = newState.getRoutingTable().shardsWithState(ShardRoutingState.UNASSIGNED);
+ if (nodeAvailableForAllocation) {
+ assertThat(unassignedShards.size(), equalTo(0));
+ } else {
+ assertThat(unassignedShards.size(), equalTo(1));
+ assertThat(unassignedShards.get(0).unassignedInfo().isDelayed(), equalTo(false));
+ }
+
+ delayedAllocationService.clusterChanged(new ClusterChangedEvent("test", newState, prevState));
+ verifyNoMoreInteractions(clusterService);
+ assertNull(delayedAllocationService.delayedRerouteTask.get());
+ }
+
+ public void testDelayedUnassignedScheduleReroute() throws Exception {
+ TimeValue delaySetting = timeValueMillis(100);
+ MetaData metaData = MetaData.builder()
+ .put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)
+ .put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), delaySetting))
+ .numberOfShards(1).numberOfReplicas(1))
+ .build();
+ ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT)
+ .metaData(metaData)
+ .routingTable(RoutingTable.builder().addAsNew(metaData.index("test")).build()).build();
+ clusterState = ClusterState.builder(clusterState)
+ .nodes(DiscoveryNodes.builder().put(newNode("node1")).put(newNode("node2")).localNodeId("node1").masterNodeId("node1"))
+ .build();
+ final long baseTimestampNanos = System.nanoTime();
+ allocationService.setNanoTimeOverride(baseTimestampNanos);
+ clusterState = ClusterState.builder(clusterState).routingResult(allocationService.reroute(clusterState, "reroute")).build();
+ // starting primaries
+ clusterState = ClusterState.builder(clusterState)
+ .routingResult(allocationService.applyStartedShards(clusterState, clusterState.getRoutingNodes().shardsWithState(INITIALIZING)))
+ .build();
+ // starting replicas
+ clusterState = ClusterState.builder(clusterState)
+ .routingResult(allocationService.applyStartedShards(clusterState, clusterState.getRoutingNodes().shardsWithState(INITIALIZING)))
+ .build();
+ assertFalse("no shards should be unassigned", clusterState.getRoutingNodes().unassigned().size() > 0);
+ String nodeId = null;
+ final List allShards = clusterState.getRoutingNodes().routingTable().allShards("test");
+ // we need to find the node with the replica otherwise we will not reroute
+ for (ShardRouting shardRouting : allShards) {
+ if (shardRouting.primary() == false) {
+ nodeId = shardRouting.currentNodeId();
+ break;
+ }
+ }
+ assertNotNull(nodeId);
+
+ // remove node that has replica and reroute
+ clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder(clusterState.nodes()).remove(nodeId)).build();
+ clusterState = ClusterState.builder(clusterState).routingResult(allocationService.reroute(clusterState, "reroute")).build();
+ ClusterState stateWithDelayedShard = clusterState;
+ // make sure the replica is marked as delayed (i.e. not reallocated)
+ assertEquals(1, UnassignedInfo.getNumberOfDelayedUnassigned(stateWithDelayedShard));
+ ShardRouting delayedShard = stateWithDelayedShard.getRoutingNodes().unassigned().iterator().next();
+ assertEquals(baseTimestampNanos, delayedShard.unassignedInfo().getUnassignedTimeInNanos());
+
+ // mock ClusterService.submitStateUpdateTask() method
+ CountDownLatch latch = new CountDownLatch(1);
+ AtomicReference clusterStateUpdateTask = new AtomicReference<>();
+ doAnswer(invocationOnMock -> {
+ clusterStateUpdateTask.set((ClusterStateUpdateTask)invocationOnMock.getArguments()[1]);
+ latch.countDown();
+ return null;
+ }).when(clusterService).submitStateUpdateTask(eq(CLUSTER_UPDATE_TASK_SOURCE), any(ClusterStateUpdateTask.class));
+ assertNull(delayedAllocationService.delayedRerouteTask.get());
+ long delayUntilClusterChangeEvent = TimeValue.timeValueNanos(randomInt((int)delaySetting.nanos() - 1)).nanos();
+ long clusterChangeEventTimestampNanos = baseTimestampNanos + delayUntilClusterChangeEvent;
+ delayedAllocationService.setNanoTimeOverride(clusterChangeEventTimestampNanos);
+ delayedAllocationService.clusterChanged(new ClusterChangedEvent("fake node left", stateWithDelayedShard, clusterState));
+
+ // check that delayed reroute task was created and registered with the proper settings
+ DelayedAllocationService.DelayedRerouteTask delayedRerouteTask = delayedAllocationService.delayedRerouteTask.get();
+ assertNotNull(delayedRerouteTask);
+ assertFalse(delayedRerouteTask.cancelScheduling.get());
+ assertThat(delayedRerouteTask.baseTimestampNanos, equalTo(clusterChangeEventTimestampNanos));
+ assertThat(delayedRerouteTask.nextDelay.nanos(),
+ equalTo(delaySetting.nanos() - (clusterChangeEventTimestampNanos - baseTimestampNanos)));
+
+ // check that submitStateUpdateTask() was invoked on the cluster service mock
+ assertTrue(latch.await(30, TimeUnit.SECONDS));
+ verify(clusterService).submitStateUpdateTask(eq(CLUSTER_UPDATE_TASK_SOURCE), eq(clusterStateUpdateTask.get()));
+
+ // advance the time on the allocation service to a timestamp that happened after the delayed scheduling
+ long nanoTimeForReroute = clusterChangeEventTimestampNanos + delaySetting.nanos() + timeValueMillis(randomInt(200)).nanos();
+ allocationService.setNanoTimeOverride(nanoTimeForReroute);
+ // apply cluster state
+ ClusterState stateWithRemovedDelay = clusterStateUpdateTask.get().execute(stateWithDelayedShard);
+ // check that shard is not delayed anymore
+ assertEquals(0, UnassignedInfo.getNumberOfDelayedUnassigned(stateWithRemovedDelay));
+ // check that task is now removed
+ assertNull(delayedAllocationService.delayedRerouteTask.get());
+
+ // simulate calling listener (cluster change event)
+ delayedAllocationService.setNanoTimeOverride(nanoTimeForReroute + timeValueMillis(randomInt(200)).nanos());
+ delayedAllocationService.clusterChanged(
+ new ClusterChangedEvent(CLUSTER_UPDATE_TASK_SOURCE, stateWithRemovedDelay, stateWithDelayedShard));
+ // check that no new task is scheduled
+ assertNull(delayedAllocationService.delayedRerouteTask.get());
+ // check that no further cluster state update was submitted
+ verifyNoMoreInteractions(clusterService);
+ }
+
+ /**
+ * This tests that a new delayed reroute is scheduled right after a delayed reroute was run
+ */
+ public void testDelayedUnassignedScheduleRerouteAfterDelayedReroute() throws Exception {
+ TimeValue shortDelaySetting = timeValueMillis(100);
+ TimeValue longDelaySetting = TimeValue.timeValueSeconds(1);
+ MetaData metaData = MetaData.builder()
+ .put(IndexMetaData.builder("short_delay")
+ .settings(settings(Version.CURRENT).put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), shortDelaySetting))
+ .numberOfShards(1).numberOfReplicas(1))
+ .put(IndexMetaData.builder("long_delay")
+ .settings(settings(Version.CURRENT).put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), longDelaySetting))
+ .numberOfShards(1).numberOfReplicas(1))
+ .build();
+ ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT).metaData(metaData)
+ .routingTable(RoutingTable.builder().addAsNew(metaData.index("short_delay")).addAsNew(metaData.index("long_delay")).build())
+ .nodes(DiscoveryNodes.builder()
+ .put(newNode("node0", singleton(DiscoveryNode.Role.MASTER))).localNodeId("node0").masterNodeId("node0")
+ .put(newNode("node1")).put(newNode("node2")).put(newNode("node3")).put(newNode("node4"))).build();
+ // allocate shards
+ clusterState = ClusterState.builder(clusterState).routingResult(allocationService.reroute(clusterState, "reroute")).build();
+ // start primaries
+ clusterState = ClusterState.builder(clusterState)
+ .routingResult(allocationService.applyStartedShards(clusterState, clusterState.getRoutingNodes().shardsWithState(INITIALIZING)))
+ .build();
+ // start replicas
+ clusterState = ClusterState.builder(clusterState)
+ .routingResult(allocationService.applyStartedShards(clusterState, clusterState.getRoutingNodes().shardsWithState(INITIALIZING)))
+ .build();
+ assertThat("all shards should be started", clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(4));
+
+ // find replica of short_delay
+ ShardRouting shortDelayReplica = null;
+ for (ShardRouting shardRouting : clusterState.getRoutingNodes().routingTable().allShards("short_delay")) {
+ if (shardRouting.primary() == false) {
+ shortDelayReplica = shardRouting;
+ break;
+ }
+ }
+ assertNotNull(shortDelayReplica);
+
+ // find replica of long_delay
+ ShardRouting longDelayReplica = null;
+ for (ShardRouting shardRouting : clusterState.getRoutingNodes().routingTable().allShards("long_delay")) {
+ if (shardRouting.primary() == false) {
+ longDelayReplica = shardRouting;
+ break;
+ }
+ }
+ assertNotNull(longDelayReplica);
+
+ final long baseTimestampNanos = System.nanoTime();
+
+ // remove node of shortDelayReplica and node of longDelayReplica and reroute
+ ClusterState clusterStateBeforeNodeLeft = clusterState;
+ clusterState = ClusterState.builder(clusterState)
+ .nodes(DiscoveryNodes.builder(clusterState.nodes())
+ .remove(shortDelayReplica.currentNodeId())
+ .remove(longDelayReplica.currentNodeId()))
+ .build();
+ // make sure both replicas are marked as delayed (i.e. not reallocated)
+ allocationService.setNanoTimeOverride(baseTimestampNanos);
+ clusterState = ClusterState.builder(clusterState).routingResult(allocationService.reroute(clusterState, "reroute")).build();
+ final ClusterState stateWithDelayedShards = clusterState;
+ assertEquals(2, UnassignedInfo.getNumberOfDelayedUnassigned(stateWithDelayedShards));
+ RoutingNodes.UnassignedShards.UnassignedIterator iter = stateWithDelayedShards.getRoutingNodes().unassigned().iterator();
+ assertEquals(baseTimestampNanos, iter.next().unassignedInfo().getUnassignedTimeInNanos());
+ assertEquals(baseTimestampNanos, iter.next().unassignedInfo().getUnassignedTimeInNanos());
+
+ // mock ClusterService.submitStateUpdateTask() method
+ CountDownLatch latch1 = new CountDownLatch(1);
+ AtomicReference clusterStateUpdateTask1 = new AtomicReference<>();
+ doAnswer(invocationOnMock -> {
+ clusterStateUpdateTask1.set((ClusterStateUpdateTask)invocationOnMock.getArguments()[1]);
+ latch1.countDown();
+ return null;
+ }).when(clusterService).submitStateUpdateTask(eq(CLUSTER_UPDATE_TASK_SOURCE), any(ClusterStateUpdateTask.class));
+ assertNull(delayedAllocationService.delayedRerouteTask.get());
+ long delayUntilClusterChangeEvent = TimeValue.timeValueNanos(randomInt((int)shortDelaySetting.nanos() - 1)).nanos();
+ long clusterChangeEventTimestampNanos = baseTimestampNanos + delayUntilClusterChangeEvent;
+ delayedAllocationService.setNanoTimeOverride(clusterChangeEventTimestampNanos);
+ delayedAllocationService.clusterChanged(
+ new ClusterChangedEvent("fake node left", stateWithDelayedShards, clusterStateBeforeNodeLeft));
+
+ // check that delayed reroute task was created and registered with the proper settings
+ DelayedAllocationService.DelayedRerouteTask firstDelayedRerouteTask = delayedAllocationService.delayedRerouteTask.get();
+ assertNotNull(firstDelayedRerouteTask);
+ assertFalse(firstDelayedRerouteTask.cancelScheduling.get());
+ assertThat(firstDelayedRerouteTask.baseTimestampNanos, equalTo(clusterChangeEventTimestampNanos));
+ assertThat(firstDelayedRerouteTask.nextDelay.nanos(),
+ equalTo(UnassignedInfo.findNextDelayedAllocation(clusterChangeEventTimestampNanos, stateWithDelayedShards)));
+ assertThat(firstDelayedRerouteTask.nextDelay.nanos(),
+ equalTo(shortDelaySetting.nanos() - (clusterChangeEventTimestampNanos - baseTimestampNanos)));
+
+ // check that submitStateUpdateTask() was invoked on the cluster service mock
+ assertTrue(latch1.await(30, TimeUnit.SECONDS));
+ verify(clusterService).submitStateUpdateTask(eq(CLUSTER_UPDATE_TASK_SOURCE), eq(clusterStateUpdateTask1.get()));
+
+ // advance the time on the allocation service to a timestamp that happened after the delayed scheduling
+ long nanoTimeForReroute = clusterChangeEventTimestampNanos + shortDelaySetting.nanos() + timeValueMillis(randomInt(50)).nanos();
+ allocationService.setNanoTimeOverride(nanoTimeForReroute);
+ // apply cluster state
+ ClusterState stateWithOnlyOneDelayedShard = clusterStateUpdateTask1.get().execute(stateWithDelayedShards);
+ // check that shard is not delayed anymore
+ assertEquals(1, UnassignedInfo.getNumberOfDelayedUnassigned(stateWithOnlyOneDelayedShard));
+ // check that task is now removed
+ assertNull(delayedAllocationService.delayedRerouteTask.get());
+
+ // mock ClusterService.submitStateUpdateTask() method again
+ CountDownLatch latch2 = new CountDownLatch(1);
+ AtomicReference clusterStateUpdateTask2 = new AtomicReference<>();
+ doAnswer(invocationOnMock -> {
+ clusterStateUpdateTask2.set((ClusterStateUpdateTask)invocationOnMock.getArguments()[1]);
+ latch2.countDown();
+ return null;
+ }).when(clusterService).submitStateUpdateTask(eq(CLUSTER_UPDATE_TASK_SOURCE), any(ClusterStateUpdateTask.class));
+ // simulate calling listener (cluster change event)
+ delayUntilClusterChangeEvent = timeValueMillis(randomInt(50)).nanos();
+ clusterChangeEventTimestampNanos = nanoTimeForReroute + delayUntilClusterChangeEvent;
+ delayedAllocationService.setNanoTimeOverride(clusterChangeEventTimestampNanos);
+ delayedAllocationService.clusterChanged(
+ new ClusterChangedEvent(CLUSTER_UPDATE_TASK_SOURCE, stateWithOnlyOneDelayedShard, stateWithDelayedShards));
+
+ // check that new delayed reroute task was created and registered with the proper settings
+ DelayedAllocationService.DelayedRerouteTask secondDelayedRerouteTask = delayedAllocationService.delayedRerouteTask.get();
+ assertNotNull(secondDelayedRerouteTask);
+ assertFalse(secondDelayedRerouteTask.cancelScheduling.get());
+ assertThat(secondDelayedRerouteTask.baseTimestampNanos, equalTo(clusterChangeEventTimestampNanos));
+ assertThat(secondDelayedRerouteTask.nextDelay.nanos(),
+ equalTo(UnassignedInfo.findNextDelayedAllocation(clusterChangeEventTimestampNanos, stateWithOnlyOneDelayedShard)));
+ assertThat(secondDelayedRerouteTask.nextDelay.nanos(),
+ equalTo(longDelaySetting.nanos() - (clusterChangeEventTimestampNanos - baseTimestampNanos)));
+
+ // check that submitStateUpdateTask() was invoked on the cluster service mock
+ assertTrue(latch2.await(30, TimeUnit.SECONDS));
+ verify(clusterService).submitStateUpdateTask(eq(CLUSTER_UPDATE_TASK_SOURCE), eq(clusterStateUpdateTask2.get()));
+
+ // advance the time on the allocation service to a timestamp that happened after the delayed scheduling
+ nanoTimeForReroute = clusterChangeEventTimestampNanos + longDelaySetting.nanos() + timeValueMillis(randomInt(50)).nanos();
+ allocationService.setNanoTimeOverride(nanoTimeForReroute);
+ // apply cluster state
+ ClusterState stateWithNoDelayedShards = clusterStateUpdateTask2.get().execute(stateWithOnlyOneDelayedShard);
+ // check that shard is not delayed anymore
+ assertEquals(0, UnassignedInfo.getNumberOfDelayedUnassigned(stateWithNoDelayedShards));
+ // check that task is now removed
+ assertNull(delayedAllocationService.delayedRerouteTask.get());
+
+ // simulate calling listener (cluster change event)
+ delayedAllocationService.setNanoTimeOverride(nanoTimeForReroute + timeValueMillis(randomInt(50)).nanos());
+ delayedAllocationService.clusterChanged(
+ new ClusterChangedEvent(CLUSTER_UPDATE_TASK_SOURCE, stateWithNoDelayedShards, stateWithOnlyOneDelayedShard));
+ // check that no new task is scheduled
+ assertNull(delayedAllocationService.delayedRerouteTask.get());
+ // check that no further cluster state update was submitted
+ verifyNoMoreInteractions(clusterService);
+ }
+
+ public void testDelayedUnassignedScheduleRerouteRescheduledOnShorterDelay() throws Exception {
+ TimeValue delaySetting = timeValueSeconds(30);
+ TimeValue shorterDelaySetting = timeValueMillis(100);
+ MetaData metaData = MetaData.builder()
+ .put(IndexMetaData.builder("foo").settings(settings(Version.CURRENT)
+ .put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), delaySetting))
+ .numberOfShards(1).numberOfReplicas(1))
+ .put(IndexMetaData.builder("bar").settings(settings(Version.CURRENT)
+ .put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), shorterDelaySetting))
+ .numberOfShards(1).numberOfReplicas(1))
+ .build();
+ ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT)
+ .metaData(metaData)
+ .routingTable(RoutingTable.builder()
+ .addAsNew(metaData.index("foo"))
+ .addAsNew(metaData.index("bar"))
+ .build()).build();
+ clusterState = ClusterState.builder(clusterState)
+ .nodes(DiscoveryNodes.builder()
+ .put(newNode("node1")).put(newNode("node2")).put(newNode("node3")).put(newNode("node4"))
+ .localNodeId("node1").masterNodeId("node1"))
+ .build();
+ final long nodeLeftTimestampNanos = System.nanoTime();
+ allocationService.setNanoTimeOverride(nodeLeftTimestampNanos);
+ clusterState = ClusterState.builder(clusterState).routingResult(allocationService.reroute(clusterState, "reroute")).build();
+ // starting primaries
+ clusterState = ClusterState.builder(clusterState)
+ .routingResult(allocationService.applyStartedShards(clusterState, clusterState.getRoutingNodes().shardsWithState(INITIALIZING)))
+ .build();
+ // starting replicas
+ clusterState = ClusterState.builder(clusterState)
+ .routingResult(allocationService.applyStartedShards(clusterState, clusterState.getRoutingNodes().shardsWithState(INITIALIZING)))
+ .build();
+ assertFalse("no shards should be unassigned", clusterState.getRoutingNodes().unassigned().size() > 0);
+ String nodeIdOfFooReplica = null;
+ for (ShardRouting shardRouting : clusterState.getRoutingNodes().routingTable().allShards("foo")) {
+ if (shardRouting.primary() == false) {
+ nodeIdOfFooReplica = shardRouting.currentNodeId();
+ break;
+ }
+ }
+ assertNotNull(nodeIdOfFooReplica);
+
+ // remove node that has replica and reroute
+ clusterState = ClusterState.builder(clusterState).nodes(
+ DiscoveryNodes.builder(clusterState.nodes()).remove(nodeIdOfFooReplica)).build();
+ clusterState = ClusterState.builder(clusterState).routingResult(allocationService.reroute(clusterState, "fake node left")).build();
+ ClusterState stateWithDelayedShard = clusterState;
+ // make sure the replica is marked as delayed (i.e. not reallocated)
+ assertEquals(1, UnassignedInfo.getNumberOfDelayedUnassigned(stateWithDelayedShard));
+ ShardRouting delayedShard = stateWithDelayedShard.getRoutingNodes().unassigned().iterator().next();
+ assertEquals(nodeLeftTimestampNanos, delayedShard.unassignedInfo().getUnassignedTimeInNanos());
+
+ assertNull(delayedAllocationService.delayedRerouteTask.get());
+ long delayUntilClusterChangeEvent = TimeValue.timeValueNanos(randomInt((int)shorterDelaySetting.nanos() - 1)).nanos();
+ long clusterChangeEventTimestampNanos = nodeLeftTimestampNanos + delayUntilClusterChangeEvent;
+ delayedAllocationService.setNanoTimeOverride(clusterChangeEventTimestampNanos);
+ delayedAllocationService.clusterChanged(new ClusterChangedEvent("fake node left", stateWithDelayedShard, clusterState));
+
+ // check that delayed reroute task was created and registered with the proper settings
+ DelayedAllocationService.DelayedRerouteTask delayedRerouteTask = delayedAllocationService.delayedRerouteTask.get();
+ assertNotNull(delayedRerouteTask);
+ assertFalse(delayedRerouteTask.cancelScheduling.get());
+ assertThat(delayedRerouteTask.baseTimestampNanos, equalTo(clusterChangeEventTimestampNanos));
+ assertThat(delayedRerouteTask.nextDelay.nanos(),
+ equalTo(delaySetting.nanos() - (clusterChangeEventTimestampNanos - nodeLeftTimestampNanos)));
+
+ if (randomBoolean()) {
+ // update settings with shorter delay
+ ClusterState stateWithShorterDelay = ClusterState.builder(stateWithDelayedShard).metaData(MetaData.builder(
+ stateWithDelayedShard.metaData()).updateSettings(Settings.builder().put(
+ UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), shorterDelaySetting).build(), "foo")).build();
+ delayedAllocationService.setNanoTimeOverride(clusterChangeEventTimestampNanos);
+ delayedAllocationService.clusterChanged(
+ new ClusterChangedEvent("apply shorter delay", stateWithShorterDelay, stateWithDelayedShard));
+ } else {
+ // node leaves with replica shard of index bar that has shorter delay
+ String nodeIdOfBarReplica = null;
+ for (ShardRouting shardRouting : stateWithDelayedShard.getRoutingNodes().routingTable().allShards("bar")) {
+ if (shardRouting.primary() == false) {
+ nodeIdOfBarReplica = shardRouting.currentNodeId();
+ break;
+ }
+ }
+ assertNotNull(nodeIdOfBarReplica);
+
+ // remove node that has replica and reroute
+ clusterState = ClusterState.builder(stateWithDelayedShard).nodes(
+ DiscoveryNodes.builder(stateWithDelayedShard.nodes()).remove(nodeIdOfBarReplica)).build();
+ ClusterState stateWithShorterDelay = ClusterState.builder(clusterState).routingResult(
+ allocationService.reroute(clusterState, "fake node left")).build();
+ delayedAllocationService.setNanoTimeOverride(clusterChangeEventTimestampNanos);
+ delayedAllocationService.clusterChanged(
+ new ClusterChangedEvent("fake node left", stateWithShorterDelay, stateWithDelayedShard));
+ }
+
+ // check that delayed reroute task was replaced by shorter reroute task
+ DelayedAllocationService.DelayedRerouteTask shorterDelayedRerouteTask = delayedAllocationService.delayedRerouteTask.get();
+ assertNotNull(shorterDelayedRerouteTask);
+ assertNotEquals(shorterDelayedRerouteTask, delayedRerouteTask);
+ assertTrue(delayedRerouteTask.cancelScheduling.get()); // existing task was cancelled
+ assertFalse(shorterDelayedRerouteTask.cancelScheduling.get());
+ assertThat(delayedRerouteTask.baseTimestampNanos, equalTo(clusterChangeEventTimestampNanos));
+ assertThat(shorterDelayedRerouteTask.nextDelay.nanos(),
+ equalTo(shorterDelaySetting.nanos() - (clusterChangeEventTimestampNanos - nodeLeftTimestampNanos)));
+ }
+
+ private static class TestDelayAllocationService extends DelayedAllocationService {
+ private volatile long nanoTimeOverride = -1L;
+
+ public TestDelayAllocationService(Settings settings, ThreadPool threadPool, ClusterService clusterService,
+ AllocationService allocationService) {
+ super(settings, threadPool, clusterService, allocationService);
+ }
+
+ @Override
+ protected void assertClusterStateThread() {
+ // do not check this in the unit tests
+ }
+
+ public void setNanoTimeOverride(long nanoTime) {
+ this.nanoTimeOverride = nanoTime;
+ }
+
+ @Override
+ protected long currentNanoTime() {
+ return nanoTimeOverride == -1L ? super.currentNanoTime() : nanoTimeOverride;
+ }
+ }
+}
diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/RoutingServiceTests.java b/core/src/test/java/org/elasticsearch/cluster/routing/RoutingServiceTests.java
index 68027312581..4006ed0e1db 100644
--- a/core/src/test/java/org/elasticsearch/cluster/routing/RoutingServiceTests.java
+++ b/core/src/test/java/org/elasticsearch/cluster/routing/RoutingServiceTests.java
@@ -19,32 +19,12 @@
package org.elasticsearch.cluster.routing;
-import org.elasticsearch.Version;
-import org.elasticsearch.cluster.ClusterChangedEvent;
-import org.elasticsearch.cluster.ClusterName;
-import org.elasticsearch.cluster.ClusterState;
-import org.elasticsearch.cluster.metadata.IndexMetaData;
-import org.elasticsearch.cluster.metadata.MetaData;
-import org.elasticsearch.cluster.node.DiscoveryNode;
-import org.elasticsearch.cluster.node.DiscoveryNodes;
-import org.elasticsearch.cluster.routing.allocation.AllocationService;
-import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.test.ESAllocationTestCase;
-import org.elasticsearch.threadpool.ThreadPool;
-import org.junit.After;
import org.junit.Before;
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
-import static java.util.Collections.singleton;
-import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING;
-import static org.elasticsearch.cluster.routing.ShardRoutingState.STARTED;
-import static org.elasticsearch.test.ClusterServiceUtils.createClusterService;
-import static org.elasticsearch.test.ClusterServiceUtils.setState;
import static org.hamcrest.Matchers.equalTo;
/**
@@ -58,191 +38,18 @@ public class RoutingServiceTests extends ESAllocationTestCase {
routingService = new TestRoutingService();
}
- @After
- public void shutdownRoutingService() throws Exception {
- routingService.shutdown();
- }
-
public void testReroute() {
assertThat(routingService.hasReroutedAndClear(), equalTo(false));
routingService.reroute("test");
assertThat(routingService.hasReroutedAndClear(), equalTo(true));
}
- public void testNoDelayedUnassigned() throws Exception {
- AllocationService allocation = createAllocationService(Settings.EMPTY, new DelayedShardsMockGatewayAllocator());
- MetaData metaData = MetaData.builder()
- .put(IndexMetaData.builder("test").settings(settings(Version.CURRENT).put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), "0"))
- .numberOfShards(1).numberOfReplicas(1))
- .build();
- ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT)
- .metaData(metaData)
- .routingTable(RoutingTable.builder().addAsNew(metaData.index("test")).build()).build();
- clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder().put(newNode("node1")).put(newNode("node2")).localNodeId("node1").masterNodeId("node1")).build();
- clusterState = ClusterState.builder(clusterState).routingResult(allocation.reroute(clusterState, "reroute")).build();
- // starting primaries
- clusterState = ClusterState.builder(clusterState).routingResult(allocation.applyStartedShards(clusterState, clusterState.getRoutingNodes().shardsWithState(INITIALIZING))).build();
- // starting replicas
- clusterState = ClusterState.builder(clusterState).routingResult(allocation.applyStartedShards(clusterState, clusterState.getRoutingNodes().shardsWithState(INITIALIZING))).build();
- assertThat(clusterState.getRoutingNodes().unassigned().size() > 0, equalTo(false));
- // remove node2 and reroute
- ClusterState prevState = clusterState;
- clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder(clusterState.nodes()).remove("node2")).build();
- clusterState = ClusterState.builder(clusterState).routingResult(allocation.reroute(clusterState, "reroute")).build();
- ClusterState newState = clusterState;
-
- assertThat(routingService.getMinDelaySettingAtLastSchedulingNanos(), equalTo(Long.MAX_VALUE));
- routingService.clusterChanged(new ClusterChangedEvent("test", newState, prevState));
- assertThat(routingService.getMinDelaySettingAtLastSchedulingNanos(), equalTo(Long.MAX_VALUE));
- assertThat(routingService.hasReroutedAndClear(), equalTo(false));
- }
-
- public void testDelayedUnassignedScheduleReroute() throws Exception {
- MockAllocationService allocation = createAllocationService(Settings.EMPTY, new DelayedShardsMockGatewayAllocator());
- MetaData metaData = MetaData.builder()
- .put(IndexMetaData.builder("test").settings(settings(Version.CURRENT).put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), "100ms"))
- .numberOfShards(1).numberOfReplicas(1))
- .build();
- ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT)
- .metaData(metaData)
- .routingTable(RoutingTable.builder().addAsNew(metaData.index("test")).build()).build();
- clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder().put(newNode("node1")).put(newNode("node2")).localNodeId("node1").masterNodeId("node1")).build();
- clusterState = ClusterState.builder(clusterState).routingResult(allocation.reroute(clusterState, "reroute")).build();
- // starting primaries
- clusterState = ClusterState.builder(clusterState).routingResult(allocation.applyStartedShards(clusterState, clusterState.getRoutingNodes().shardsWithState(INITIALIZING))).build();
- // starting replicas
- clusterState = ClusterState.builder(clusterState).routingResult(allocation.applyStartedShards(clusterState, clusterState.getRoutingNodes().shardsWithState(INITIALIZING))).build();
- assertFalse("no shards should be unassigned", clusterState.getRoutingNodes().unassigned().size() > 0);
- String nodeId = null;
- final List allShards = clusterState.getRoutingNodes().routingTable().allShards("test");
- // we need to find the node with the replica otherwise we will not reroute
- for (ShardRouting shardRouting : allShards) {
- if (shardRouting.primary() == false) {
- nodeId = shardRouting.currentNodeId();
- break;
- }
- }
- assertNotNull(nodeId);
-
- // remove nodeId and reroute
- ClusterState prevState = clusterState;
- clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder(clusterState.nodes()).remove(nodeId)).build();
- // make sure the replica is marked as delayed (i.e. not reallocated)
- clusterState = ClusterState.builder(clusterState).routingResult(allocation.reroute(clusterState, "reroute")).build();
- assertEquals(1, clusterState.getRoutingNodes().unassigned().size());
-
- ClusterState newState = clusterState;
- routingService.clusterChanged(new ClusterChangedEvent("test", newState, prevState));
- assertBusy(() -> assertTrue("routing service should have run a reroute", routingService.hasReroutedAndClear()));
- // verify the registration has been reset
- assertThat(routingService.getMinDelaySettingAtLastSchedulingNanos(), equalTo(Long.MAX_VALUE));
- }
-
- /**
- * This tests that a new delayed reroute is scheduled right after a delayed reroute was run
- */
- public void testDelayedUnassignedScheduleRerouteAfterDelayedReroute() throws Exception {
- final ThreadPool testThreadPool = new ThreadPool(getTestName());
- ClusterService clusterService = createClusterService(testThreadPool);
- try {
- MockAllocationService allocation = createAllocationService(Settings.EMPTY, new DelayedShardsMockGatewayAllocator());
- MetaData metaData = MetaData.builder()
- .put(IndexMetaData.builder("short_delay").settings(settings(Version.CURRENT).put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), "100ms"))
- .numberOfShards(1).numberOfReplicas(1))
- .put(IndexMetaData.builder("long_delay").settings(settings(Version.CURRENT).put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), "10s"))
- .numberOfShards(1).numberOfReplicas(1))
- .build();
- ClusterState clusterState = ClusterState.builder(ClusterName.DEFAULT).metaData(metaData)
- .routingTable(RoutingTable.builder().addAsNew(metaData.index("short_delay")).addAsNew(metaData.index("long_delay")).build())
- .nodes(DiscoveryNodes.builder()
- .put(newNode("node0", singleton(DiscoveryNode.Role.MASTER))).localNodeId("node0").masterNodeId("node0")
- .put(newNode("node1")).put(newNode("node2")).put(newNode("node3")).put(newNode("node4"))).build();
- // allocate shards
- clusterState = ClusterState.builder(clusterState).routingResult(allocation.reroute(clusterState, "reroute")).build();
- // start primaries
- clusterState = ClusterState.builder(clusterState).routingResult(allocation.applyStartedShards(clusterState, clusterState.getRoutingNodes().shardsWithState(INITIALIZING))).build();
- // start replicas
- clusterState = ClusterState.builder(clusterState).routingResult(allocation.applyStartedShards(clusterState, clusterState.getRoutingNodes().shardsWithState(INITIALIZING))).build();
- assertThat("all shards should be started", clusterState.getRoutingNodes().shardsWithState(STARTED).size(), equalTo(4));
-
- // find replica of short_delay
- ShardRouting shortDelayReplica = null;
- for (ShardRouting shardRouting : clusterState.getRoutingNodes().routingTable().allShards("short_delay")) {
- if (shardRouting.primary() == false) {
- shortDelayReplica = shardRouting;
- break;
- }
- }
- assertNotNull(shortDelayReplica);
-
- // find replica of long_delay
- ShardRouting longDelayReplica = null;
- for (ShardRouting shardRouting : clusterState.getRoutingNodes().routingTable().allShards("long_delay")) {
- if (shardRouting.primary() == false) {
- longDelayReplica = shardRouting;
- break;
- }
- }
- assertNotNull(longDelayReplica);
-
- final long baseTime = System.nanoTime();
-
- // remove node of shortDelayReplica and node of longDelayReplica and reroute
- ClusterState prevState = clusterState;
- clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder(clusterState.nodes()).remove(shortDelayReplica.currentNodeId()).remove(longDelayReplica.currentNodeId())).build();
- // make sure both replicas are marked as delayed (i.e. not reallocated)
- allocation.setNanoTimeOverride(baseTime);
- clusterState = ClusterState.builder(clusterState).routingResult(allocation.reroute(clusterState, "reroute")).build();
-
- // check that shortDelayReplica and longDelayReplica have been marked unassigned
- RoutingNodes.UnassignedShards unassigned = clusterState.getRoutingNodes().unassigned();
- assertEquals(2, unassigned.size());
- // update shortDelayReplica and longDelayReplica variables with new shard routing
- ShardRouting shortDelayUnassignedReplica = null;
- ShardRouting longDelayUnassignedReplica = null;
- for (ShardRouting shr : unassigned) {
- if (shr.getIndexName().equals("short_delay")) {
- shortDelayUnassignedReplica = shr;
- } else {
- longDelayUnassignedReplica = shr;
- }
- }
- assertTrue(shortDelayReplica.isSameShard(shortDelayUnassignedReplica));
- assertTrue(longDelayReplica.isSameShard(longDelayUnassignedReplica));
-
- // manually trigger a clusterChanged event on routingService
- ClusterState newState = clusterState;
- setState(clusterService, newState);
- // create routing service, also registers listener on cluster service
- RoutingService routingService = new RoutingService(Settings.EMPTY, testThreadPool, clusterService, allocation);
- routingService.start(); // just so performReroute does not prematurely return
- // next (delayed) reroute should only delay longDelayReplica/longDelayUnassignedReplica, simulate that we are now 1 second after shards became unassigned
- allocation.setNanoTimeOverride(baseTime + TimeValue.timeValueSeconds(1).nanos());
- // register listener on cluster state so we know when cluster state has been changed
- CountDownLatch latch = new CountDownLatch(1);
- clusterService.addLast(event -> latch.countDown());
- // instead of clusterService calling clusterChanged, we call it directly here
- routingService.clusterChanged(new ClusterChangedEvent("test", newState, prevState));
- // cluster service should have updated state and called routingService with clusterChanged
- latch.await();
- // verify the registration has been set to the delay of longDelayReplica/longDelayUnassignedReplica
- assertThat(routingService.getMinDelaySettingAtLastSchedulingNanos(), equalTo(TimeValue.timeValueSeconds(10).nanos()));
- } finally {
- clusterService.stop();
- terminate(testThreadPool);
- }
- }
-
private class TestRoutingService extends RoutingService {
private AtomicBoolean rerouted = new AtomicBoolean();
public TestRoutingService() {
- super(Settings.EMPTY, new ThreadPool(getTestName()), null, null);
- }
-
- void shutdown() throws Exception {
- terminate(threadPool);
+ super(Settings.EMPTY, null, null);
}
public boolean hasReroutedAndClear() {
diff --git a/core/src/test/java/org/elasticsearch/cluster/routing/UnassignedInfoTests.java b/core/src/test/java/org/elasticsearch/cluster/routing/UnassignedInfoTests.java
index f46224570b0..a0ef14ee98d 100644
--- a/core/src/test/java/org/elasticsearch/cluster/routing/UnassignedInfoTests.java
+++ b/core/src/test/java/org/elasticsearch/cluster/routing/UnassignedInfoTests.java
@@ -38,7 +38,6 @@ import org.elasticsearch.index.Index;
import org.elasticsearch.test.ESAllocationTestCase;
import java.util.Collections;
-import java.util.EnumSet;
import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING;
import static org.elasticsearch.cluster.routing.ShardRoutingState.STARTED;
@@ -75,7 +74,7 @@ public class UnassignedInfoTests extends ESAllocationTestCase {
public void testSerialization() throws Exception {
UnassignedInfo.Reason reason = RandomPicks.randomFrom(random(), UnassignedInfo.Reason.values());
UnassignedInfo meta = reason == UnassignedInfo.Reason.ALLOCATION_FAILED ?
- new UnassignedInfo(reason, randomBoolean() ? randomAsciiOfLength(4) : null, null, randomIntBetween(1, 100), System.nanoTime(), System.currentTimeMillis()):
+ new UnassignedInfo(reason, randomBoolean() ? randomAsciiOfLength(4) : null, null, randomIntBetween(1, 100), System.nanoTime(), System.currentTimeMillis(), false):
new UnassignedInfo(reason, randomBoolean() ? randomAsciiOfLength(4) : null);
BytesStreamOutput out = new BytesStreamOutput();
meta.writeTo(out);
@@ -262,59 +261,20 @@ public class UnassignedInfoTests extends ESAllocationTestCase {
/**
* Verifies that delayed allocation calculation are correct.
*/
- public void testUnassignedDelayedOnlyOnNodeLeft() throws Exception {
- UnassignedInfo unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.NODE_LEFT, null);
- unassignedInfo = unassignedInfo.updateDelay(unassignedInfo.getUnassignedTimeInNanos() + 1, // add 1 tick delay
- Settings.builder().put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), "10h").build(), Settings.EMPTY);
- long delay = unassignedInfo.getLastComputedLeftDelayNanos();
- long cachedDelay = unassignedInfo.getLastComputedLeftDelayNanos();
- assertThat(delay, equalTo(cachedDelay));
- assertThat(delay, equalTo(TimeValue.timeValueHours(10).nanos() - 1));
- }
-
- /**
- * Verifies that delayed allocation is only computed when the reason is NODE_LEFT.
- */
- public void testUnassignedDelayOnlyNodeLeftNonNodeLeftReason() throws Exception {
- EnumSet reasons = EnumSet.allOf(UnassignedInfo.Reason.class);
- reasons.remove(UnassignedInfo.Reason.NODE_LEFT);
- UnassignedInfo.Reason reason = RandomPicks.randomFrom(random(), reasons);
- UnassignedInfo unassignedInfo = reason == UnassignedInfo.Reason.ALLOCATION_FAILED ?
- new UnassignedInfo(reason, null, null, 1, System.nanoTime(), System.currentTimeMillis()):
- new UnassignedInfo(reason, null);
- unassignedInfo = unassignedInfo.updateDelay(unassignedInfo.getUnassignedTimeInNanos() + 1, // add 1 tick delay
- Settings.builder().put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), "10h").build(), Settings.EMPTY);
- long delay = unassignedInfo.getLastComputedLeftDelayNanos();
- assertThat(delay, equalTo(0L));
- delay = unassignedInfo.getLastComputedLeftDelayNanos();
- assertThat(delay, equalTo(0L));
- }
-
- /**
- * Verifies that delayed allocation calculation are correct.
- */
- public void testLeftDelayCalculation() throws Exception {
+ public void testRemainingDelayCalculation() throws Exception {
final long baseTime = System.nanoTime();
- UnassignedInfo unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.NODE_LEFT, "test", null, 0, baseTime, System.currentTimeMillis());
+ UnassignedInfo unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.NODE_LEFT, "test", null, 0, baseTime, System.currentTimeMillis(), randomBoolean());
final long totalDelayNanos = TimeValue.timeValueMillis(10).nanos();
- final Settings settings = Settings.builder().put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), TimeValue.timeValueNanos(totalDelayNanos)).build();
- unassignedInfo = unassignedInfo.updateDelay(baseTime, settings, Settings.EMPTY);
- long delay = unassignedInfo.getLastComputedLeftDelayNanos();
+ final Settings indexSettings = Settings.builder().put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), TimeValue.timeValueNanos(totalDelayNanos)).build();
+ long delay = unassignedInfo.getRemainingDelay(baseTime, indexSettings);
assertThat(delay, equalTo(totalDelayNanos));
- assertThat(delay, equalTo(unassignedInfo.getLastComputedLeftDelayNanos()));
long delta1 = randomIntBetween(1, (int) (totalDelayNanos - 1));
- unassignedInfo = unassignedInfo.updateDelay(baseTime + delta1, settings, Settings.EMPTY);
- delay = unassignedInfo.getLastComputedLeftDelayNanos();
+ delay = unassignedInfo.getRemainingDelay(baseTime + delta1, indexSettings);
assertThat(delay, equalTo(totalDelayNanos - delta1));
- assertThat(delay, equalTo(unassignedInfo.getLastComputedLeftDelayNanos()));
- unassignedInfo = unassignedInfo.updateDelay(baseTime + totalDelayNanos, settings, Settings.EMPTY);
- delay = unassignedInfo.getLastComputedLeftDelayNanos();
+ delay = unassignedInfo.getRemainingDelay(baseTime + totalDelayNanos, indexSettings);
assertThat(delay, equalTo(0L));
- assertThat(delay, equalTo(unassignedInfo.getLastComputedLeftDelayNanos()));
- unassignedInfo = unassignedInfo.updateDelay(baseTime + totalDelayNanos + randomIntBetween(1, 20), settings, Settings.EMPTY);
- delay = unassignedInfo.getLastComputedLeftDelayNanos();
+ delay = unassignedInfo.getRemainingDelay(baseTime + totalDelayNanos + randomIntBetween(1, 20), indexSettings);
assertThat(delay, equalTo(0L));
- assertThat(delay, equalTo(unassignedInfo.getLastComputedLeftDelayNanos()));
}
@@ -344,8 +304,6 @@ public class UnassignedInfoTests extends ESAllocationTestCase {
public void testFindNextDelayedAllocation() {
MockAllocationService allocation = createAllocationService(Settings.EMPTY, new DelayedShardsMockGatewayAllocator());
- final long baseTime = System.nanoTime();
- allocation.setNanoTimeOverride(baseTime);
final TimeValue delayTest1 = TimeValue.timeValueMillis(randomIntBetween(1, 200));
final TimeValue delayTest2 = TimeValue.timeValueMillis(randomIntBetween(1, 200));
final long expectMinDelaySettingsNanos = Math.min(delayTest1.nanos(), delayTest2.nanos());
@@ -366,20 +324,18 @@ public class UnassignedInfoTests extends ESAllocationTestCase {
clusterState = ClusterState.builder(clusterState).routingResult(allocation.applyStartedShards(clusterState, clusterState.getRoutingNodes().shardsWithState(INITIALIZING))).build();
assertThat(clusterState.getRoutingNodes().unassigned().size() > 0, equalTo(false));
// remove node2 and reroute
+ final long baseTime = System.nanoTime();
+ allocation.setNanoTimeOverride(baseTime);
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder(clusterState.nodes()).remove("node2")).build();
clusterState = ClusterState.builder(clusterState).routingResult(allocation.reroute(clusterState, "reroute")).build();
- final long delta = randomBoolean() ? 0 : randomInt((int) expectMinDelaySettingsNanos);
+ final long delta = randomBoolean() ? 0 : randomInt((int) expectMinDelaySettingsNanos - 1);
if (delta > 0) {
allocation.setNanoTimeOverride(baseTime + delta);
clusterState = ClusterState.builder(clusterState).routingResult(allocation.reroute(clusterState, "time moved")).build();
}
- long minDelaySetting = UnassignedInfo.findSmallestDelayedAllocationSettingNanos(Settings.EMPTY, clusterState);
- assertThat(minDelaySetting, equalTo(expectMinDelaySettingsNanos));
-
- long nextDelay = UnassignedInfo.findNextDelayedAllocationIn(clusterState);
- assertThat(nextDelay, equalTo(expectMinDelaySettingsNanos - delta));
+ assertThat(UnassignedInfo.findNextDelayedAllocation(baseTime + delta, clusterState), equalTo(expectMinDelaySettingsNanos - delta));
}
}
diff --git a/core/src/test/java/org/elasticsearch/discovery/zen/NodeJoinControllerTests.java b/core/src/test/java/org/elasticsearch/discovery/zen/NodeJoinControllerTests.java
index 9013d99678d..651eab421ad 100644
--- a/core/src/test/java/org/elasticsearch/discovery/zen/NodeJoinControllerTests.java
+++ b/core/src/test/java/org/elasticsearch/discovery/zen/NodeJoinControllerTests.java
@@ -521,7 +521,7 @@ public class NodeJoinControllerTests extends ESTestCase {
static class NoopRoutingService extends RoutingService {
public NoopRoutingService(Settings settings) {
- super(settings, null, null, new NoopAllocationService(settings));
+ super(settings, null, new NoopAllocationService(settings));
}
@Override
diff --git a/core/src/test/java/org/elasticsearch/gateway/ReplicaShardAllocatorTests.java b/core/src/test/java/org/elasticsearch/gateway/ReplicaShardAllocatorTests.java
index 20eb6286813..82004448658 100644
--- a/core/src/test/java/org/elasticsearch/gateway/ReplicaShardAllocatorTests.java
+++ b/core/src/test/java/org/elasticsearch/gateway/ReplicaShardAllocatorTests.java
@@ -233,16 +233,14 @@ public class ReplicaShardAllocatorTests extends ESAllocationTestCase {
// we sometime return empty list of files, make sure we test this as well
testAllocator.addData(node2, false, null);
}
- AllocationService.updateLeftDelayOfUnassignedShards(allocation, Settings.EMPTY);
boolean changed = testAllocator.allocateUnassigned(allocation);
- assertThat(changed, equalTo(true));
+ assertThat(changed, equalTo(false));
assertThat(allocation.routingNodes().unassigned().ignored().size(), equalTo(1));
assertThat(allocation.routingNodes().unassigned().ignored().get(0).shardId(), equalTo(shardId));
allocation = onePrimaryOnNode1And1Replica(yesAllocationDeciders(),
Settings.builder().put(UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.getKey(), TimeValue.timeValueHours(1)).build(), UnassignedInfo.Reason.NODE_LEFT);
testAllocator.addData(node2, false, "MATCH", new StoreFileMetaData("file1", 10, "MATCH_CHECKSUM"));
- AllocationService.updateLeftDelayOfUnassignedShards(allocation, Settings.EMPTY);
changed = testAllocator.allocateUnassigned(allocation);
assertThat(changed, equalTo(true));
assertThat(allocation.routingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(1));
@@ -290,11 +288,15 @@ public class ReplicaShardAllocatorTests extends ESAllocationTestCase {
.numberOfShards(1).numberOfReplicas(1)
.putActiveAllocationIds(0, Sets.newHashSet(primaryShard.allocationId().getId())))
.build();
+ // mark shard as delayed if reason is NODE_LEFT
+ boolean delayed = reason == UnassignedInfo.Reason.NODE_LEFT &&
+ UnassignedInfo.INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.get(settings).nanos() > 0;
RoutingTable routingTable = RoutingTable.builder()
.add(IndexRoutingTable.builder(shardId.getIndex())
.addIndexShard(new IndexShardRoutingTable.Builder(shardId)
.addShard(primaryShard)
- .addShard(ShardRouting.newUnassigned(shardId, null, false, new UnassignedInfo(reason, null)))
+ .addShard(ShardRouting.newUnassigned(shardId, null, false,
+ new UnassignedInfo(reason, null, null, 0, System.nanoTime(), System.currentTimeMillis(), delayed)))
.build())
)
.build();
diff --git a/test/framework/src/main/java/org/elasticsearch/test/ESAllocationTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/ESAllocationTestCase.java
index d7d5df54240..7cbb7e819c4 100644
--- a/test/framework/src/main/java/org/elasticsearch/test/ESAllocationTestCase.java
+++ b/test/framework/src/main/java/org/elasticsearch/test/ESAllocationTestCase.java
@@ -196,7 +196,7 @@ public abstract class ESAllocationTestCase extends ESTestCase {
/** A lock {@link AllocationService} allowing tests to override time */
protected static class MockAllocationService extends AllocationService {
- private Long nanoTimeOverride = null;
+ private volatile long nanoTimeOverride = -1L;
public MockAllocationService(Settings settings, AllocationDeciders allocationDeciders, GatewayAllocator gatewayAllocator,
ShardsAllocator shardsAllocator, ClusterInfoService clusterInfoService) {
@@ -209,7 +209,7 @@ public abstract class ESAllocationTestCase extends ESTestCase {
@Override
protected long currentNanoTime() {
- return nanoTimeOverride == null ? super.currentNanoTime() : nanoTimeOverride;
+ return nanoTimeOverride == -1L ? super.currentNanoTime() : nanoTimeOverride;
}
}
@@ -238,16 +238,15 @@ public abstract class ESAllocationTestCase extends ESTestCase {
@Override
public boolean allocateUnassigned(RoutingAllocation allocation) {
final RoutingNodes.UnassignedShards.UnassignedIterator unassignedIterator = allocation.routingNodes().unassigned().iterator();
- boolean changed = false;
while (unassignedIterator.hasNext()) {
ShardRouting shard = unassignedIterator.next();
IndexMetaData indexMetaData = allocation.metaData().index(shard.getIndexName());
if (shard.primary() || shard.allocatedPostIndexCreate(indexMetaData) == false) {
continue;
}
- changed |= replicaShardAllocator.ignoreUnassignedIfDelayed(unassignedIterator, shard);
+ replicaShardAllocator.ignoreUnassignedIfDelayed(unassignedIterator, shard);
}
- return changed;
+ return false;
}
}
}