From b847a835e9b1d2d4b1f5329a1667aa5776da58a6 Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Thu, 6 Oct 2016 16:11:27 +0200 Subject: [PATCH] Improve scheduling fairness when batching cluster state changes with equal priority (#20775) As the wise man @ywelsch said: currently when we batch cluster state update tasks by the same executor, the batch is triggered by the first task un-queued from the pending task queue. That means that other tasks for the same executor are left in the queue. When those are dequeued, they will trigger another run for the same executor. This can give unfair precedence to future tasks of the same executor, even if they weren't batched in the first run. Take this queue for example (all with equal priority) ``` T1 (executor 1) T2 (executor 1) T3 (executor 2) T4 (executor 2) T5 (executor 1) T6 (executor 1) ``` If T1 & T2 are picked up first (when T5 & T6 are not yet queued), one would expect T3 & T4 to run second. However, since T2 is still in the queue, it will trigger execution of T5 & T6. The fix is easy - ignore processed tasks when extracting them from the queue. Closes #20768 --- .../cluster/service/ClusterService.java | 6 +- .../cluster/service/ClusterServiceTests.java | 63 ++++++++++++++++++- 2 files changed, 67 insertions(+), 2 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/cluster/service/ClusterService.java b/core/src/main/java/org/elasticsearch/cluster/service/ClusterService.java index e981313f402..38d42a0b0f9 100644 --- a/core/src/main/java/org/elasticsearch/cluster/service/ClusterService.java +++ b/core/src/main/java/org/elasticsearch/cluster/service/ClusterService.java @@ -891,7 +891,11 @@ public class ClusterService extends AbstractLifecycleComponent { @Override public void run() { - runTasksForExecutor(executor); + // if this task is already processed, the executor shouldn't execute other tasks (that arrived later), + // to give other executors a chance to execute their tasks. 
+ if (processed.get() == false) { + runTasksForExecutor(executor); + } } public String toString(ClusterStateTaskExecutor executor) { diff --git a/core/src/test/java/org/elasticsearch/cluster/service/ClusterServiceTests.java b/core/src/test/java/org/elasticsearch/cluster/service/ClusterServiceTests.java index af5dc422e66..a0d6bc8e407 100644 --- a/core/src/test/java/org/elasticsearch/cluster/service/ClusterServiceTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/service/ClusterServiceTests.java @@ -19,7 +19,6 @@ package org.elasticsearch.cluster.service; import org.apache.logging.log4j.Level; -import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.logging.log4j.util.Supplier; @@ -54,6 +53,8 @@ import org.junit.Before; import org.junit.BeforeClass; import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -301,6 +302,66 @@ public class ClusterServiceTests extends ESTestCase { assertTrue(published.get()); } + public void testOneExecutorDontStarveAnother() throws InterruptedException { + final List executionOrder = Collections.synchronizedList(new ArrayList<>()); + final Semaphore allowProcessing = new Semaphore(0); + final Semaphore startedProcessing = new Semaphore(0); + + class TaskExecutor implements ClusterStateTaskExecutor { + + @Override + public BatchResult execute(ClusterState currentState, List tasks) throws Exception { + executionOrder.addAll(tasks); // do this first, so startedProcessing can be used as a notification that this is done. 
+ startedProcessing.release(tasks.size()); + allowProcessing.acquire(tasks.size()); + return BatchResult.builder().successes(tasks).build(ClusterState.builder(currentState).build()); + } + + @Override + public boolean runOnlyOnMaster() { + return false; + } + } + + TaskExecutor executorA = new TaskExecutor(); + TaskExecutor executorB = new TaskExecutor(); + + final ClusterStateTaskConfig config = ClusterStateTaskConfig.build(Priority.NORMAL); + final ClusterStateTaskListener noopListener = (source, e) -> { throw new AssertionError(source, e); }; + // this blocks the cluster state queue, so we can set it up right + clusterService.submitStateUpdateTask("0", "A0", config, executorA, noopListener); + // wait to be processed + startedProcessing.acquire(1); + assertThat(executionOrder, equalTo(Arrays.asList("A0"))); + + + // these will be the first batch + clusterService.submitStateUpdateTask("1", "A1", config, executorA, noopListener); + clusterService.submitStateUpdateTask("2", "A2", config, executorA, noopListener); + + // release the first 0 task, but not the second + allowProcessing.release(1); + startedProcessing.acquire(2); + assertThat(executionOrder, equalTo(Arrays.asList("A0", "A1", "A2"))); + + // setup the queue with pending tasks for another executor same priority + clusterService.submitStateUpdateTask("3", "B3", config, executorB, noopListener); + clusterService.submitStateUpdateTask("4", "B4", config, executorB, noopListener); + + + clusterService.submitStateUpdateTask("5", "A5", config, executorA, noopListener); + clusterService.submitStateUpdateTask("6", "A6", config, executorA, noopListener); + + // now release the processing + allowProcessing.release(6); + + // wait for last task to be processed + startedProcessing.acquire(4); + + assertThat(executionOrder, equalTo(Arrays.asList("A0", "A1", "A2", "B3", "B4", "A5", "A6"))); + + } + // test that for a single thread, tasks are executed in the order // that they are submitted public void 
testClusterStateUpdateTasksAreExecutedInOrder() throws BrokenBarrierException, InterruptedException {