Allow RerouteService to reroute at lower priority (#44338)

Today the `BatchedRerouteService` submits its delayed reroute task at `HIGH`
priority, but in some cases a lower priority would be more appropriate. This
commit adds the facility to submit delayed reroute tasks at different
priorities, such that each submitted reroute task runs at a priority no lower
than the one requested. It does not change the fact that all delayed reroute
tasks are submitted at `HIGH` priority, but at least it makes this explicit.
This commit is contained in:
David Turner 2019-07-15 17:37:31 +01:00
parent 6c7f7d4a10
commit 86ee8eab3f
15 changed files with 199 additions and 53 deletions

View File

@ -383,7 +383,7 @@ public class ShardStateAction {
// assign it again, even if that means putting it back on the node on which it previously failed: // assign it again, even if that means putting it back on the node on which it previously failed:
final String reason = String.format(Locale.ROOT, "[%d] unassigned shards after failing shards", numberOfUnassignedShards); final String reason = String.format(Locale.ROOT, "[%d] unassigned shards after failing shards", numberOfUnassignedShards);
logger.trace("{}, scheduling a reroute", reason); logger.trace("{}, scheduling a reroute", reason);
rerouteService.reroute(reason, ActionListener.wrap( rerouteService.reroute(reason, Priority.HIGH, ActionListener.wrap(
r -> logger.trace("{}, reroute completed", reason), r -> logger.trace("{}, reroute completed", reason),
e -> logger.debug(new ParameterizedMessage("{}, reroute failed", reason), e))); e -> logger.debug(new ParameterizedMessage("{}, reroute failed", reason), e)));
} }

View File

@ -31,6 +31,7 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.RerouteService; import org.elasticsearch.cluster.routing.RerouteService;
import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.discovery.zen.ElectMasterService; import org.elasticsearch.discovery.zen.ElectMasterService;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData; import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
@ -155,7 +156,7 @@ public class JoinTaskExecutor implements ClusterStateTaskExecutor<JoinTaskExecut
results.success(joinTask); results.success(joinTask);
} }
if (nodesChanged) { if (nodesChanged) {
rerouteService.reroute("post-join reroute", ActionListener.wrap( rerouteService.reroute("post-join reroute", Priority.HIGH, ActionListener.wrap(
r -> logger.trace("post-join reroute completed"), r -> logger.trace("post-join reroute completed"),
e -> logger.debug("post-join reroute failed", e))); e -> logger.debug("post-join reroute failed", e)));

View File

@ -24,7 +24,6 @@ import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.PlainListenableActionFuture;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.NotMasterException; import org.elasticsearch.cluster.NotMasterException;
@ -32,6 +31,8 @@ import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Priority; import org.elasticsearch.common.Priority;
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction; import java.util.function.BiFunction;
/** /**
@ -49,7 +50,8 @@ public class BatchedRerouteService implements RerouteService {
private final Object mutex = new Object(); private final Object mutex = new Object();
@Nullable // null if no reroute is currently pending @Nullable // null if no reroute is currently pending
private PlainListenableActionFuture<Void> pendingRerouteListeners; private List<ActionListener<Void>> pendingRerouteListeners;
private Priority pendingTaskPriority = Priority.LANGUID;
/** /**
* @param reroute Function that computes the updated cluster state after it has been rerouted. * @param reroute Function that computes the updated cluster state after it has been rerouted.
@ -63,29 +65,55 @@ public class BatchedRerouteService implements RerouteService {
* Initiates a reroute. * Initiates a reroute.
*/ */
@Override @Override
public final void reroute(String reason, ActionListener<Void> listener) { public final void reroute(String reason, Priority priority, ActionListener<Void> listener) {
final PlainListenableActionFuture<Void> currentListeners; final List<ActionListener<Void>> currentListeners;
synchronized (mutex) { synchronized (mutex) {
if (pendingRerouteListeners != null) { if (pendingRerouteListeners != null) {
logger.trace("already has pending reroute, adding [{}] to batch", reason); if (priority.sameOrAfter(pendingTaskPriority)) {
pendingRerouteListeners.addListener(listener); logger.trace("already has pending reroute at priority [{}], adding [{}] with priority [{}] to batch",
return; pendingTaskPriority, reason, priority);
pendingRerouteListeners.add(listener);
return;
} else {
logger.trace("already has pending reroute at priority [{}], promoting batch to [{}] and adding [{}]",
pendingTaskPriority, priority, reason);
currentListeners = new ArrayList<>(1 + pendingRerouteListeners.size());
currentListeners.add(listener);
currentListeners.addAll(pendingRerouteListeners);
pendingRerouteListeners.clear();
pendingRerouteListeners = currentListeners;
pendingTaskPriority = priority;
}
} else {
logger.trace("no pending reroute, scheduling reroute [{}] at priority [{}]", reason, priority);
currentListeners = new ArrayList<>(1);
currentListeners.add(listener);
pendingRerouteListeners = currentListeners;
pendingTaskPriority = priority;
} }
currentListeners = PlainListenableActionFuture.newListenableFuture();
currentListeners.addListener(listener);
pendingRerouteListeners = currentListeners;
} }
logger.trace("rerouting [{}]", reason);
try { try {
clusterService.submitStateUpdateTask(CLUSTER_UPDATE_TASK_SOURCE + "(" + reason + ")", clusterService.submitStateUpdateTask(CLUSTER_UPDATE_TASK_SOURCE + "(" + reason + ")",
new ClusterStateUpdateTask(Priority.HIGH) { new ClusterStateUpdateTask(priority) {
@Override @Override
public ClusterState execute(ClusterState currentState) { public ClusterState execute(ClusterState currentState) {
final boolean currentListenersArePending;
synchronized (mutex) { synchronized (mutex) {
assert pendingRerouteListeners == currentListeners; assert currentListeners.isEmpty() == (pendingRerouteListeners != currentListeners)
pendingRerouteListeners = null; : "currentListeners=" + currentListeners + ", pendingRerouteListeners=" + pendingRerouteListeners;
currentListenersArePending = pendingRerouteListeners == currentListeners;
if (currentListenersArePending) {
pendingRerouteListeners = null;
}
}
if (currentListenersArePending) {
logger.trace("performing batched reroute [{}]", reason);
return reroute.apply(currentState, reason);
} else {
logger.trace("batched reroute [{}] was promoted", reason);
return currentState;
} }
return reroute.apply(currentState, reason);
} }
@Override @Override
@ -95,7 +123,7 @@ public class BatchedRerouteService implements RerouteService {
pendingRerouteListeners = null; pendingRerouteListeners = null;
} }
} }
currentListeners.onFailure(new NotMasterException("delayed reroute [" + reason + "] cancelled")); ActionListener.onFailure(currentListeners, new NotMasterException("delayed reroute [" + reason + "] cancelled"));
// no big deal, the new master will reroute again // no big deal, the new master will reroute again
} }
@ -114,22 +142,26 @@ public class BatchedRerouteService implements RerouteService {
logger.error(() -> new ParameterizedMessage("unexpected failure during [{}], current state version [{}]", logger.error(() -> new ParameterizedMessage("unexpected failure during [{}], current state version [{}]",
source, state.version()), e); source, state.version()), e);
} }
currentListeners.onFailure(new ElasticsearchException("delayed reroute [" + reason + "] failed", e)); ActionListener.onFailure(currentListeners,
new ElasticsearchException("delayed reroute [" + reason + "] failed", e));
} }
@Override @Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
currentListeners.onResponse(null); ActionListener.onResponse(currentListeners, null);
} }
}); });
} catch (Exception e) { } catch (Exception e) {
synchronized (mutex) { synchronized (mutex) {
assert pendingRerouteListeners == currentListeners; assert currentListeners.isEmpty() == (pendingRerouteListeners != currentListeners);
pendingRerouteListeners = null; if (pendingRerouteListeners == currentListeners) {
pendingRerouteListeners = null;
}
} }
ClusterState state = clusterService.state(); ClusterState state = clusterService.state();
logger.warn(() -> new ParameterizedMessage("failed to reroute routing table, current state:\n{}", state), e); logger.warn(() -> new ParameterizedMessage("failed to reroute routing table, current state:\n{}", state), e);
currentListeners.onFailure(new ElasticsearchException("delayed reroute [" + reason + "] could not be submitted", e)); ActionListener.onFailure(currentListeners,
new ElasticsearchException("delayed reroute [" + reason + "] could not be submitted", e));
} }
} }
} }

View File

@ -19,11 +19,19 @@
package org.elasticsearch.cluster.routing; package org.elasticsearch.cluster.routing;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.Priority;
/** /**
* Asynchronously performs a cluster reroute, updating any shard states and rebalancing the cluster if appropriate. * Asynchronously performs a cluster reroute, updating any shard states and rebalancing the cluster if appropriate.
*/ */
@FunctionalInterface @FunctionalInterface
public interface RerouteService { public interface RerouteService {
void reroute(String reason, ActionListener<Void> listener);
/**
* Schedule a cluster reroute.
* @param priority the (minimum) priority at which to run this reroute. If there is already a pending reroute at the same or a higher
* priority then this reroute is batched with the pending one; if there is already a pending reroute at a lower priority
* then the priority of the pending batch is raised to the given priority.
*/
void reroute(String reason, Priority priority, ActionListener<Void> listener);
} }

View File

@ -34,6 +34,7 @@ import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.routing.RerouteService; import org.elasticsearch.cluster.routing.RerouteService;
import org.elasticsearch.cluster.routing.RoutingNode; import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.Strings; import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.ClusterSettings;
@ -185,7 +186,7 @@ public class DiskThresholdMonitor {
if (reroute) { if (reroute) {
logger.info("rerouting shards: [{}]", explanation); logger.info("rerouting shards: [{}]", explanation);
rerouteService.reroute("disk threshold monitor", ActionListener.wrap(r -> { rerouteService.reroute("disk threshold monitor", Priority.HIGH, ActionListener.wrap(r -> {
setLastRunTimeMillis(); setLastRunTimeMillis();
listener.onResponse(r); listener.onResponse(r);
}, e -> { }, e -> {

View File

@ -60,10 +60,18 @@ public enum Priority {
this.value = value; this.value = value;
} }
/**
* @return whether tasks of {@code this} priority will run after those of priority {@code p}.
* For instance, {@code Priority.URGENT.after(Priority.IMMEDIATE)} returns {@code true}.
*/
public boolean after(Priority p) { public boolean after(Priority p) {
return this.compareTo(p) > 0; return this.compareTo(p) > 0;
} }
/**
* @return whether tasks of {@code this} priority will run no earlier than those of priority {@code p}.
* For instance, {@code Priority.URGENT.sameOrAfter(Priority.IMMEDIATE)} returns {@code true}.
*/
public boolean sameOrAfter(Priority p) { public boolean sameOrAfter(Priority p) {
return this.compareTo(p) >= 0; return this.compareTo(p) >= 0;
} }

View File

@ -31,6 +31,7 @@ import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.allocation.AllocateUnassignedDecision; import org.elasticsearch.cluster.routing.allocation.AllocateUnassignedDecision;
import org.elasticsearch.cluster.routing.allocation.FailedShard; import org.elasticsearch.cluster.routing.allocation.FailedShard;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.lease.Releasables; import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
@ -103,6 +104,8 @@ public class GatewayAllocator {
} }
public void allocateUnassigned(final RoutingAllocation allocation) { public void allocateUnassigned(final RoutingAllocation allocation) {
assert primaryShardAllocator != null;
assert replicaShardAllocator != null;
innerAllocatedUnassigned(allocation, primaryShardAllocator, replicaShardAllocator); innerAllocatedUnassigned(allocation, primaryShardAllocator, replicaShardAllocator);
} }
@ -124,8 +127,10 @@ public class GatewayAllocator {
*/ */
public AllocateUnassignedDecision decideUnassignedShardAllocation(ShardRouting unassignedShard, RoutingAllocation routingAllocation) { public AllocateUnassignedDecision decideUnassignedShardAllocation(ShardRouting unassignedShard, RoutingAllocation routingAllocation) {
if (unassignedShard.primary()) { if (unassignedShard.primary()) {
assert primaryShardAllocator != null;
return primaryShardAllocator.makeAllocationDecision(unassignedShard, routingAllocation, logger); return primaryShardAllocator.makeAllocationDecision(unassignedShard, routingAllocation, logger);
} else { } else {
assert replicaShardAllocator != null;
return replicaShardAllocator.makeAllocationDecision(unassignedShard, routingAllocation, logger); return replicaShardAllocator.makeAllocationDecision(unassignedShard, routingAllocation, logger);
} }
} }
@ -139,7 +144,8 @@ public class GatewayAllocator {
@Override @Override
protected void reroute(ShardId shardId, String reason) { protected void reroute(ShardId shardId, String reason) {
logger.trace("{} scheduling reroute for {}", shardId, reason); logger.trace("{} scheduling reroute for {}", shardId, reason);
rerouteService.reroute("async_shard_fetch", ActionListener.wrap( assert rerouteService != null;
rerouteService.reroute("async_shard_fetch", Priority.HIGH, ActionListener.wrap(
r -> logger.trace("{} scheduled reroute completed for {}", shardId, reason), r -> logger.trace("{} scheduled reroute completed for {}", shardId, reason),
e -> logger.debug(new ParameterizedMessage("{} scheduled reroute failed for {}", shardId, reason), e))); e -> logger.debug(new ParameterizedMessage("{} scheduled reroute failed for {}", shardId, reason), e)));
} }

View File

@ -58,7 +58,7 @@ public class JoinHelperTests extends ESTestCase {
x -> localNode, null, Collections.emptySet()); x -> localNode, null, Collections.emptySet());
JoinHelper joinHelper = new JoinHelper(Settings.EMPTY, null, null, transportService, () -> 0L, () -> null, JoinHelper joinHelper = new JoinHelper(Settings.EMPTY, null, null, transportService, () -> 0L, () -> null,
(joinRequest, joinCallback) -> { throw new AssertionError(); }, startJoinRequest -> { throw new AssertionError(); }, (joinRequest, joinCallback) -> { throw new AssertionError(); }, startJoinRequest -> { throw new AssertionError(); },
Collections.emptyList(), (s, r) -> {}); Collections.emptyList(), (s, p, r) -> {});
transportService.start(); transportService.start();
DiscoveryNode node1 = new DiscoveryNode("node1", buildNewFakeTransportAddress(), Version.CURRENT); DiscoveryNode node1 = new DiscoveryNode("node1", buildNewFakeTransportAddress(), Version.CURRENT);
@ -164,7 +164,7 @@ public class JoinHelperTests extends ESTestCase {
x -> localNode, null, Collections.emptySet()); x -> localNode, null, Collections.emptySet());
new JoinHelper(Settings.EMPTY, null, null, transportService, () -> 0L, () -> localClusterState, new JoinHelper(Settings.EMPTY, null, null, transportService, () -> 0L, () -> localClusterState,
(joinRequest, joinCallback) -> { throw new AssertionError(); }, startJoinRequest -> { throw new AssertionError(); }, (joinRequest, joinCallback) -> { throw new AssertionError(); }, startJoinRequest -> { throw new AssertionError(); },
Collections.emptyList(), (s, r) -> {}); // registers request handler Collections.emptyList(), (s, p, r) -> {}); // registers request handler
transportService.start(); transportService.start();
transportService.acceptIncomingRequests(); transportService.acceptIncomingRequests();

View File

@ -174,7 +174,7 @@ public class NodeJoinTests extends ESTestCase {
() -> new InMemoryPersistedState(term, initialState), r -> emptyList(), () -> new InMemoryPersistedState(term, initialState), r -> emptyList(),
new NoOpClusterApplier(), new NoOpClusterApplier(),
Collections.emptyList(), Collections.emptyList(),
random, (s, r) -> {}, ElectionStrategy.DEFAULT_INSTANCE); random, (s, p, r) -> {}, ElectionStrategy.DEFAULT_INSTANCE);
transportService.start(); transportService.start();
transportService.acceptIncomingRequests(); transportService.acceptIncomingRequests();
transport = capturingTransport; transport = capturingTransport;

View File

@ -25,6 +25,8 @@ import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.coordination.FailedToCommitClusterStateException; import org.elasticsearch.cluster.coordination.FailedToCommitClusterStateException;
import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.Randomness;
import org.elasticsearch.test.ClusterServiceUtils; import org.elasticsearch.test.ClusterServiceUtils;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.TestThreadPool;
@ -32,12 +34,16 @@ import org.elasticsearch.threadpool.ThreadPool;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier; import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import static org.hamcrest.Matchers.lessThan; import static org.hamcrest.Matchers.lessThan;
@ -70,13 +76,14 @@ public class BatchedRerouteServiceTests extends ESTestCase {
final CountDownLatch countDownLatch = new CountDownLatch(iterations); final CountDownLatch countDownLatch = new CountDownLatch(iterations);
for (int i = 0; i < iterations; i++) { for (int i = 0; i < iterations; i++) {
rerouteCountBeforeReroute = Math.max(rerouteCountBeforeReroute, rerouteCount.get()); rerouteCountBeforeReroute = Math.max(rerouteCountBeforeReroute, rerouteCount.get());
batchedRerouteService.reroute("iteration " + i, ActionListener.wrap(countDownLatch::countDown)); batchedRerouteService.reroute("iteration " + i, randomFrom(EnumSet.allOf(Priority.class)),
ActionListener.wrap(countDownLatch::countDown));
} }
countDownLatch.await(10, TimeUnit.SECONDS); countDownLatch.await(10, TimeUnit.SECONDS);
assertThat(rerouteCountBeforeReroute, lessThan(rerouteCount.get())); assertThat(rerouteCountBeforeReroute, lessThan(rerouteCount.get()));
} }
public void testBatchesReroutesTogether() throws BrokenBarrierException, InterruptedException { public void testBatchesReroutesTogetherAtPriorityOfHighestSubmittedReroute() throws BrokenBarrierException, InterruptedException {
final CyclicBarrier cyclicBarrier = new CyclicBarrier(2); final CyclicBarrier cyclicBarrier = new CyclicBarrier(2);
clusterService.submitStateUpdateTask("block master service", new ClusterStateUpdateTask() { clusterService.submitStateUpdateTask("block master service", new ClusterStateUpdateTask() {
@Override @Override
@ -100,14 +107,77 @@ public class BatchedRerouteServiceTests extends ESTestCase {
return s; return s;
}); });
final int iterations = between(1, 100); final int iterations = scaledRandomIntBetween(1, 100);
final CountDownLatch countDownLatch = new CountDownLatch(iterations); final CountDownLatch tasksSubmittedCountDown = new CountDownLatch(iterations);
for (int i = 0; i < iterations; i++) { final CountDownLatch tasksCompletedCountDown = new CountDownLatch(iterations);
batchedRerouteService.reroute("iteration " + i, ActionListener.wrap(countDownLatch::countDown)); final List<Runnable> actions = new ArrayList<>(iterations);
final Function<Priority, Runnable> rerouteFromPriority = priority -> () -> {
final AtomicBoolean alreadyRun = new AtomicBoolean();
batchedRerouteService.reroute("reroute at " + priority, priority, ActionListener.wrap(() -> {
assertTrue(alreadyRun.compareAndSet(false, true));
tasksCompletedCountDown.countDown();
}));
tasksSubmittedCountDown.countDown();
};
actions.add(rerouteFromPriority.apply(Priority.URGENT)); // ensure at least one URGENT priority reroute
for (int i = 1; i < iterations; i++) {
final int iteration = i;
if (randomBoolean()) {
actions.add(rerouteFromPriority.apply(randomFrom(Priority.LOW, Priority.NORMAL, Priority.HIGH, Priority.URGENT)));
} else {
final Priority priority = randomFrom(Priority.NORMAL, Priority.HIGH, Priority.URGENT, Priority.IMMEDIATE);
final boolean submittedConcurrentlyWithReroute = randomBoolean();
if (submittedConcurrentlyWithReroute == false) {
tasksSubmittedCountDown.countDown(); // this task might be submitted later
}
actions.add(() -> {
clusterService.submitStateUpdateTask("other task " + iteration + " at " + priority,
new ClusterStateUpdateTask(priority) {
@Override
public ClusterState execute(ClusterState currentState) {
switch (priority) {
case IMMEDIATE:
if (submittedConcurrentlyWithReroute) {
assertFalse("should have rerouted after " + priority + " priority task", rerouteExecuted.get());
} // else this task might be submitted too late to precede the reroute
break;
case URGENT:
// may run either before or after reroute
break;
case HIGH:
case NORMAL:
assertTrue("should have rerouted before " + priority + " priority task", rerouteExecuted.get());
break;
default:
fail("unexpected priority: " + priority);
break;
}
return currentState;
}
@Override
public void onFailure(String source, Exception e) {
throw new AssertionError(source, e);
}
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
tasksCompletedCountDown.countDown();
}
});
if (submittedConcurrentlyWithReroute) {
tasksSubmittedCountDown.countDown();
}
});
}
} }
Randomness.shuffle(actions);
actions.forEach(threadPool.generic()::execute);
assertTrue(tasksSubmittedCountDown.await(10, TimeUnit.SECONDS));
cyclicBarrier.await(); // allow master thread to continue; cyclicBarrier.await(); // allow master thread to continue;
countDownLatch.await(); // wait for reroute to complete assertTrue(tasksCompletedCountDown.await(10, TimeUnit.SECONDS)); // wait for reroute to complete
assertTrue(rerouteExecuted.get()); // see above for assertion that it's only called once assertTrue(rerouteExecuted.get()); // see above for assertion that it's only called once
} }
@ -123,7 +193,19 @@ public class BatchedRerouteServiceTests extends ESTestCase {
final int iterations = between(1, 100); final int iterations = between(1, 100);
final CountDownLatch countDownLatch = new CountDownLatch(iterations); final CountDownLatch countDownLatch = new CountDownLatch(iterations);
for (int i = 0; i < iterations; i++) { for (int i = 0; i < iterations; i++) {
batchedRerouteService.reroute("iteration " + i, ActionListener.wrap(countDownLatch::countDown)); batchedRerouteService.reroute("iteration " + i,
randomFrom(EnumSet.allOf(Priority.class)), ActionListener.wrap(
r -> {
countDownLatch.countDown();
if (rarely()) {
throw new ElasticsearchException("failure during notification");
}
}, e -> {
countDownLatch.countDown();
if (randomBoolean()) {
throw new ElasticsearchException("failure during failure notification", e);
}
}));
if (rarely()) { if (rarely()) {
clusterService.getMasterService().setClusterStatePublisher( clusterService.getMasterService().setClusterStatePublisher(
randomBoolean() randomBoolean()

View File

@ -31,6 +31,7 @@ import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.RoutingTable; import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
@ -44,6 +45,7 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING; import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING;
import static org.hamcrest.Matchers.equalTo;
public class DiskThresholdMonitorTests extends ESAllocationTestCase { public class DiskThresholdMonitorTests extends ESAllocationTestCase {
@ -77,10 +79,12 @@ public class DiskThresholdMonitorTests extends ESAllocationTestCase {
AtomicReference<Set<String>> indices = new AtomicReference<>(); AtomicReference<Set<String>> indices = new AtomicReference<>();
AtomicLong currentTime = new AtomicLong(); AtomicLong currentTime = new AtomicLong();
DiskThresholdMonitor monitor = new DiskThresholdMonitor(settings, () -> finalState, DiskThresholdMonitor monitor = new DiskThresholdMonitor(settings, () -> finalState,
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null, currentTime::get, (reason, listener) -> { new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null, currentTime::get,
assertTrue(reroute.compareAndSet(false, true)); (reason, priority, listener) -> {
listener.onResponse(null); assertTrue(reroute.compareAndSet(false, true));
}) { assertThat(priority, equalTo(Priority.HIGH));
listener.onResponse(null);
}) {
@Override @Override
protected void markIndicesReadOnly(Set<String> indicesToMarkReadOnly, ActionListener<Void> listener) { protected void markIndicesReadOnly(Set<String> indicesToMarkReadOnly, ActionListener<Void> listener) {
assertTrue(indices.compareAndSet(null, indicesToMarkReadOnly)); assertTrue(indices.compareAndSet(null, indicesToMarkReadOnly));
@ -117,10 +121,12 @@ public class DiskThresholdMonitorTests extends ESAllocationTestCase {
assertTrue(anotherFinalClusterState.blocks().indexBlocked(ClusterBlockLevel.WRITE, "test_2")); assertTrue(anotherFinalClusterState.blocks().indexBlocked(ClusterBlockLevel.WRITE, "test_2"));
monitor = new DiskThresholdMonitor(settings, () -> anotherFinalClusterState, monitor = new DiskThresholdMonitor(settings, () -> anotherFinalClusterState,
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null, currentTime::get, (reason, listener) -> { new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null, currentTime::get,
assertTrue(reroute.compareAndSet(false, true)); (reason, priority, listener) -> {
listener.onResponse(null); assertTrue(reroute.compareAndSet(false, true));
}) { assertThat(priority, equalTo(Priority.HIGH));
listener.onResponse(null);
}) {
@Override @Override
protected void markIndicesReadOnly(Set<String> indicesToMarkReadOnly, ActionListener<Void> listener) { protected void markIndicesReadOnly(Set<String> indicesToMarkReadOnly, ActionListener<Void> listener) {
assertTrue(indices.compareAndSet(null, indicesToMarkReadOnly)); assertTrue(indices.compareAndSet(null, indicesToMarkReadOnly));
@ -144,10 +150,12 @@ public class DiskThresholdMonitorTests extends ESAllocationTestCase {
AtomicLong currentTime = new AtomicLong(); AtomicLong currentTime = new AtomicLong();
AtomicReference<ActionListener<Void>> listenerReference = new AtomicReference<>(); AtomicReference<ActionListener<Void>> listenerReference = new AtomicReference<>();
DiskThresholdMonitor monitor = new DiskThresholdMonitor(Settings.EMPTY, () -> clusterState, DiskThresholdMonitor monitor = new DiskThresholdMonitor(Settings.EMPTY, () -> clusterState,
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null, currentTime::get, (reason, listener) -> { new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null, currentTime::get,
assertNotNull(listener); (reason, priority, listener) -> {
assertTrue(listenerReference.compareAndSet(null, listener)); assertNotNull(listener);
}) { assertThat(priority, equalTo(Priority.HIGH));
assertTrue(listenerReference.compareAndSet(null, listener));
}) {
@Override @Override
protected void markIndicesReadOnly(Set<String> indicesToMarkReadOnly, ActionListener<Void> listener) { protected void markIndicesReadOnly(Set<String> indicesToMarkReadOnly, ActionListener<Void> listener) {
throw new AssertionError("unexpected"); throw new AssertionError("unexpected");

View File

@ -139,7 +139,7 @@ public class NodeJoinControllerTests extends ESTestCase {
} }
masterService = ClusterServiceUtils.createMasterService(threadPool, initialState); masterService = ClusterServiceUtils.createMasterService(threadPool, initialState);
nodeJoinController = new NodeJoinController(Settings.EMPTY, masterService, createAllocationService(Settings.EMPTY), nodeJoinController = new NodeJoinController(Settings.EMPTY, masterService, createAllocationService(Settings.EMPTY),
new ElectMasterService(Settings.EMPTY), (s, r) -> {}); new ElectMasterService(Settings.EMPTY), (s, p, r) -> {});
} }
public void testSimpleJoinAccumulation() throws InterruptedException, ExecutionException { public void testSimpleJoinAccumulation() throws InterruptedException, ExecutionException {

View File

@ -370,7 +370,7 @@ public class ZenDiscoveryUnitTests extends ESTestCase {
new NamedWriteableRegistry(ClusterModule.getNamedWriteables()), new NamedWriteableRegistry(ClusterModule.getNamedWriteables()),
masterService, clusterApplier, clusterSettings, hostsResolver -> Collections.emptyList(), masterService, clusterApplier, clusterSettings, hostsResolver -> Collections.emptyList(),
ESAllocationTestCase.createAllocationService(), ESAllocationTestCase.createAllocationService(),
Collections.emptyList(), mock(GatewayMetaState.class), (s, r) -> {}); Collections.emptyList(), mock(GatewayMetaState.class), (s, p, r) -> {});
zenDiscovery.start(); zenDiscovery.start();
return zenDiscovery; return zenDiscovery;
} }

View File

@ -213,7 +213,7 @@ public class ClusterStateChanges {
transportService, clusterService, threadPool, createIndexService, actionFilters, indexNameExpressionResolver); transportService, clusterService, threadPool, createIndexService, actionFilters, indexNameExpressionResolver);
nodeRemovalExecutor = new NodeRemovalClusterStateTaskExecutor(allocationService, logger); nodeRemovalExecutor = new NodeRemovalClusterStateTaskExecutor(allocationService, logger);
joinTaskExecutor = new JoinTaskExecutor(Settings.EMPTY, allocationService, logger, (s, r) -> {}); joinTaskExecutor = new JoinTaskExecutor(Settings.EMPTY, allocationService, logger, (s, p, r) -> {});
} }
public ClusterState createIndex(ClusterState state, CreateIndexRequest request) { public ClusterState createIndex(ClusterState state, CreateIndexRequest request) {

View File

@ -887,7 +887,7 @@ public class AbstractCoordinatorTestCase extends ESTestCase {
final AllocationService allocationService = ESAllocationTestCase.createAllocationService(Settings.EMPTY); final AllocationService allocationService = ESAllocationTestCase.createAllocationService(Settings.EMPTY);
coordinator = new Coordinator("test_node", settings, clusterSettings, transportService, writableRegistry(), coordinator = new Coordinator("test_node", settings, clusterSettings, transportService, writableRegistry(),
allocationService, masterService, this::getPersistedState, allocationService, masterService, this::getPersistedState,
Cluster.this::provideSeedHosts, clusterApplierService, onJoinValidators, Randomness.get(), (s, r) -> {}, Cluster.this::provideSeedHosts, clusterApplierService, onJoinValidators, Randomness.get(), (s, p, r) -> {},
getElectionStrategy()); getElectionStrategy());
masterService.setClusterStatePublisher(coordinator); masterService.setClusterStatePublisher(coordinator);
final GatewayService gatewayService = new GatewayService(settings, allocationService, clusterService, final GatewayService gatewayService = new GatewayService(settings, allocationService, clusterService,