Improve scheduling fairness when batching cluster state changes with equal priority (#20775)

As the wise man @ywelsch said: currently when we batch cluster state update tasks by the same executor, we the first task un-queued from the pending task queue. That means that other tasks for the same executor are left in the queue. When those are dequeued, they will trigger another run for the same executor. This can give unfair precedence to future tasks of the same executor, even if they weren't batched in the first run. Take this queue for example (all with equal priority)

 ```
 T1 (executor 1)
 T2 (executor 1)
 T3 (executor 2)
 T4 (executor 2)
 T5 (executor 1)
 T6 (executor 1)
 ```

 If T1 & T2 are picked up first (when T5 & T6 are not yet queued), one would expect T3 & T4 to run second. However, since T2 is still in the queue, it will trigger execution of T5 & T6.

 The fix is easy - ignore processed tasks when extracting them from the queue.

Closes #20768
This commit is contained in:
Boaz Leskes 2016-10-06 16:11:27 +02:00 committed by GitHub
parent 591a8d4ec6
commit b847a835e9
2 changed files with 67 additions and 2 deletions

View File

@ -891,8 +891,12 @@ public class ClusterService extends AbstractLifecycleComponent {
@Override @Override
public void run() { public void run() {
// if this task is already processed, the executor shouldn't execute other tasks (that arrived later),
// to give other executors a chance to execute their tasks.
if (processed.get() == false) {
runTasksForExecutor(executor); runTasksForExecutor(executor);
} }
}
public String toString(ClusterStateTaskExecutor<T> executor) { public String toString(ClusterStateTaskExecutor<T> executor) {
String taskDescription = executor.describeTasks(Collections.singletonList(task)); String taskDescription = executor.describeTasks(Collections.singletonList(task));

View File

@ -19,7 +19,6 @@
package org.elasticsearch.cluster.service; package org.elasticsearch.cluster.service;
import org.apache.logging.log4j.Level; import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier; import org.apache.logging.log4j.util.Supplier;
@ -54,6 +53,8 @@ import org.junit.Before;
import org.junit.BeforeClass; import org.junit.BeforeClass;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
@ -301,6 +302,66 @@ public class ClusterServiceTests extends ESTestCase {
assertTrue(published.get()); assertTrue(published.get());
} }
public void testOneExecutorDontStarveAnother() throws InterruptedException {
final List<String> executionOrder = Collections.synchronizedList(new ArrayList<>());
final Semaphore allowProcessing = new Semaphore(0);
final Semaphore startedProcessing = new Semaphore(0);
class TaskExecutor implements ClusterStateTaskExecutor<String> {
@Override
public BatchResult<String> execute(ClusterState currentState, List<String> tasks) throws Exception {
executionOrder.addAll(tasks); // do this first, so startedProcessing can be used as a notification that this is done.
startedProcessing.release(tasks.size());
allowProcessing.acquire(tasks.size());
return BatchResult.<String>builder().successes(tasks).build(ClusterState.builder(currentState).build());
}
@Override
public boolean runOnlyOnMaster() {
return false;
}
}
TaskExecutor executorA = new TaskExecutor();
TaskExecutor executorB = new TaskExecutor();
final ClusterStateTaskConfig config = ClusterStateTaskConfig.build(Priority.NORMAL);
final ClusterStateTaskListener noopListener = (source, e) -> { throw new AssertionError(source, e); };
// this blocks the cluster state queue, so we can set it up right
clusterService.submitStateUpdateTask("0", "A0", config, executorA, noopListener);
// wait to be processed
startedProcessing.acquire(1);
assertThat(executionOrder, equalTo(Arrays.asList("A0")));
// these will be the first batch
clusterService.submitStateUpdateTask("1", "A1", config, executorA, noopListener);
clusterService.submitStateUpdateTask("2", "A2", config, executorA, noopListener);
// release the first 0 task, but not the second
allowProcessing.release(1);
startedProcessing.acquire(2);
assertThat(executionOrder, equalTo(Arrays.asList("A0", "A1", "A2")));
// setup the queue with pending tasks for another executor same priority
clusterService.submitStateUpdateTask("3", "B3", config, executorB, noopListener);
clusterService.submitStateUpdateTask("4", "B4", config, executorB, noopListener);
clusterService.submitStateUpdateTask("5", "A5", config, executorA, noopListener);
clusterService.submitStateUpdateTask("6", "A6", config, executorA, noopListener);
// now release the processing
allowProcessing.release(6);
// wait for last task to be processed
startedProcessing.acquire(4);
assertThat(executionOrder, equalTo(Arrays.asList("A0", "A1", "A2", "B3", "B4", "A5", "A6")));
}
// test that for a single thread, tasks are executed in the order // test that for a single thread, tasks are executed in the order
// that they are submitted // that they are submitted
public void testClusterStateUpdateTasksAreExecutedInOrder() throws BrokenBarrierException, InterruptedException { public void testClusterStateUpdateTasksAreExecutedInOrder() throws BrokenBarrierException, InterruptedException {