Introduced a new IMMEDIATE priority - higher than URGENT

* applied to cluster update settings, reroute, node join/leave events, node failure

Closes #5062
This commit is contained in:
uboness 2014-02-10 14:21:40 +01:00
parent 5da883799a
commit da938a659d
8 changed files with 204 additions and 88 deletions

View File

@ -159,7 +159,7 @@ public class ClusterHealthRequest extends MasterNodeReadOperationRequest<Cluster
waitForNodes = in.readString();
readLocal(in);
if (in.readBoolean()) {
waitForEvents = Priority.fromByte(in.readByte());
waitForEvents = Priority.readFrom(in);
}
}
@ -189,7 +189,7 @@ public class ClusterHealthRequest extends MasterNodeReadOperationRequest<Cluster
out.writeBoolean(false);
} else {
out.writeBoolean(true);
out.writeByte(waitForEvents.value());
Priority.writeTo(waitForEvents, out);
}
}
}

View File

@ -73,7 +73,7 @@ public class TransportClusterRerouteAction extends TransportMasterNodeOperationA
@Override
protected void masterOperation(final ClusterRerouteRequest request, final ClusterState state, final ActionListener<ClusterRerouteResponse> listener) throws ElasticsearchException {
clusterService.submitStateUpdateTask("cluster_reroute (api)", Priority.URGENT, new AckedClusterStateUpdateTask() {
clusterService.submitStateUpdateTask("cluster_reroute (api)", Priority.IMMEDIATE, new AckedClusterStateUpdateTask() {
private volatile ClusterState clusterStateToSend;
private volatile RoutingExplanations explanations;

View File

@ -87,7 +87,7 @@ public class TransportClusterUpdateSettingsAction extends TransportMasterNodeOpe
final ImmutableSettings.Builder transientUpdates = ImmutableSettings.settingsBuilder();
final ImmutableSettings.Builder persistentUpdates = ImmutableSettings.settingsBuilder();
clusterService.submitStateUpdateTask("cluster_update_settings", Priority.URGENT, new AckedClusterStateUpdateTask() {
clusterService.submitStateUpdateTask("cluster_update_settings", Priority.IMMEDIATE, new AckedClusterStateUpdateTask() {
private volatile boolean changed = false;

View File

@ -86,7 +86,7 @@ public class PendingClusterTask implements Streamable {
@Override
public void readFrom(StreamInput in) throws IOException {
insertOrder = in.readVLong();
priority = Priority.fromByte(in.readByte());
priority = Priority.readFrom(in);
source = in.readText();
timeInQueue = in.readVLong();
}
@ -94,7 +94,7 @@ public class PendingClusterTask implements Streamable {
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVLong(insertOrder);
out.writeByte(priority.value());
Priority.writeTo(priority, out);
out.writeText(source);
out.writeVLong(timeInQueue);
}

View File

@ -19,34 +19,49 @@
package org.elasticsearch.common;
import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.Version;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import java.io.IOException;
/**
*
*/
public final class Priority implements Comparable<Priority> {
public static Priority readFrom(StreamInput input) throws IOException {
return fromByte(input.readByte());
}
public static void writeTo(Priority priority, StreamOutput output) throws IOException {
byte b = priority.value;
if (output.getVersion().before(Version.V_1_1_0)) {
b = (byte) Math.max(URGENT.value, b);
}
output.writeByte(b);
}
public static Priority fromByte(byte b) {
switch (b) {
case 0:
return URGENT;
case 1:
return HIGH;
case 2:
return NORMAL;
case 3:
return LOW;
case 4:
return LANGUID;
case -1: return IMMEDIATE;
case 0: return URGENT;
case 1: return HIGH;
case 2: return NORMAL;
case 3: return LOW;
case 4: return LANGUID;
default:
throw new ElasticsearchIllegalArgumentException("can't find priority for [" + b + "]");
}
}
public static Priority URGENT = new Priority((byte) 0);
public static Priority HIGH = new Priority((byte) 1);
public static Priority NORMAL = new Priority((byte) 2);
public static Priority LOW = new Priority((byte) 3);
public static Priority LANGUID = new Priority((byte) 4);
public static final Priority IMMEDIATE = new Priority((byte) -1);
public static final Priority URGENT = new Priority((byte) 0);
public static final Priority HIGH = new Priority((byte) 1);
public static final Priority NORMAL = new Priority((byte) 2);
public static final Priority LOW = new Priority((byte) 3);
public static final Priority LANGUID = new Priority((byte) 4);
private static final Priority[] values = new Priority[] { IMMEDIATE, URGENT, HIGH, NORMAL, LOW, LANGUID };
private final byte value;
@ -54,12 +69,23 @@ public final class Priority implements Comparable<Priority> {
this.value = value;
}
public byte value() {
return this.value;
/**
* @return an array of all available priorities, sorted from the highest to the lowest.
*/
public static Priority[] values() {
return values;
}
public int compareTo(Priority p) {
return this.value - p.value;
return (this.value < p.value) ? -1 : ((this.value > p.value) ? 1 : 0);
}
public boolean after(Priority p) {
return value > p.value;
}
public boolean sameOrAfter(Priority p) {
return value >= p.value;
}
@Override
@ -82,14 +108,11 @@ public final class Priority implements Comparable<Priority> {
@Override
public String toString() {
switch (value) {
case (byte) 0:
return "URGENT";
case (byte) 1:
return "HIGH";
case (byte) 2:
return "NORMAL";
case (byte) 3:
return "LOW";
case (byte) -1: return "IMMEDIATE";
case (byte) 0: return "URGENT";
case (byte) 1: return "HIGH";
case (byte) 2: return "NORMAL";
case (byte) 3: return "LOW";
default:
return "LANGUID";
}

View File

@ -370,7 +370,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
return;
}
if (master) {
clusterService.submitStateUpdateTask("zen-disco-node_left(" + node + ")", Priority.URGENT, new ClusterStateUpdateTask() {
clusterService.submitStateUpdateTask("zen-disco-node_left(" + node + ")", Priority.IMMEDIATE, new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
DiscoveryNodes.Builder builder = DiscoveryNodes.builder(currentState.nodes()).remove(node.id());
@ -404,7 +404,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
// nothing to do here...
return;
}
clusterService.submitStateUpdateTask("zen-disco-node_failed(" + node + "), reason " + reason, Priority.URGENT, new ProcessedClusterStateUpdateTask() {
clusterService.submitStateUpdateTask("zen-disco-node_failed(" + node + "), reason " + reason, Priority.IMMEDIATE, new ProcessedClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
DiscoveryNodes.Builder builder = DiscoveryNodes.builder(currentState.nodes())
@ -441,7 +441,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
// nothing to do here...
return;
}
clusterService.submitStateUpdateTask("zen-disco-minimum_master_nodes_changed", Priority.URGENT, new ProcessedClusterStateUpdateTask() {
clusterService.submitStateUpdateTask("zen-disco-minimum_master_nodes_changed", Priority.IMMEDIATE, new ProcessedClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
final int prevMinimumMasterNode = ZenDiscovery.this.electMaster.minimumMasterNodes();
@ -477,7 +477,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
logger.info("master_left [{}], reason [{}]", masterNode, reason);
clusterService.submitStateUpdateTask("zen-disco-master_failed (" + masterNode + ")", Priority.URGENT, new ProcessedClusterStateUpdateTask() {
clusterService.submitStateUpdateTask("zen-disco-master_failed (" + masterNode + ")", Priority.IMMEDIATE, new ProcessedClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
if (!masterNode.id().equals(currentState.nodes().masterNodeId())) {
@ -707,7 +707,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent<Discovery> implemen
// node calling the join request
membership.sendValidateJoinRequestBlocking(node, state, pingTimeout);
clusterService.submitStateUpdateTask("zen-disco-receive(join from node[" + node + "])", Priority.URGENT, new ClusterStateUpdateTask() {
clusterService.submitStateUpdateTask("zen-disco-receive(join from node[" + node + "])", Priority.IMMEDIATE, new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
if (currentState.nodes().nodeExists(node.id())) {

View File

@ -580,7 +580,86 @@ public class ClusterServiceTests extends ElasticsearchIntegrationTest {
assertThat(testService1.master(), is(false));
assertThat(testService2.master(), is(true));
}
}
/**
* Note, this test can only work as long as we have a single thread executor executing the state update tasks!
*/
@Test
public void testPriorizedTasks() throws Exception {
Settings settings = settingsBuilder()
.put("discovery.type", "local")
.build();
cluster().startNode(settings);
ClusterService clusterService = cluster().getInstance(ClusterService.class);
BlockingTask block = new BlockingTask();
clusterService.submitStateUpdateTask("test", Priority.IMMEDIATE, block);
int taskCount = randomIntBetween(5, 20);
Priority[] priorities = Priority.values();
// will hold all the tasks in the order in which they were executed
List<PrioritiezedTask> tasks = new ArrayList<PrioritiezedTask>(taskCount);
CountDownLatch latch = new CountDownLatch(taskCount);
for (int i = 0; i < taskCount; i++) {
Priority priority = priorities[randomIntBetween(0, priorities.length - 1)];
clusterService.submitStateUpdateTask("test", priority, new PrioritiezedTask(priority, latch, tasks));
}
block.release();
latch.await();
Priority prevPriority = null;
for (PrioritiezedTask task : tasks) {
if (prevPriority == null) {
prevPriority = task.priority;
} else {
assertThat(task.priority.sameOrAfter(prevPriority), is(true));
}
}
}
private static class BlockingTask implements ClusterStateUpdateTask {
private final CountDownLatch latch = new CountDownLatch(1);
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
latch.await();
return currentState;
}
@Override
public void onFailure(String source, Throwable t) {
}
public void release() {
latch.countDown();
}
}
private static class PrioritiezedTask implements ClusterStateUpdateTask {
private final Priority priority;
private final CountDownLatch latch;
private final List<PrioritiezedTask> tasks;
private PrioritiezedTask(Priority priority, CountDownLatch latch, List<PrioritiezedTask> tasks) {
this.priority = priority;
this.latch = latch;
this.tasks = tasks;
}
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
tasks.add(this);
latch.countDown();
return currentState;
}
@Override
public void onFailure(String source, Throwable t) {
latch.countDown();
}
}
public static class TestPlugin extends AbstractPlugin {

View File

@ -18,18 +18,20 @@
*/
package org.elasticsearch.common.util.concurrent;
import com.google.common.collect.Lists;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.test.ElasticsearchTestCase;
import org.junit.Test;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
/**
*
@ -39,37 +41,42 @@ public class PrioritizedExecutorsTests extends ElasticsearchTestCase {
@Test
public void testPriorityQueue() throws Exception {
PriorityBlockingQueue<Priority> queue = new PriorityBlockingQueue<Priority>();
queue.add(Priority.LANGUID);
queue.add(Priority.NORMAL);
queue.add(Priority.HIGH);
queue.add(Priority.LOW);
queue.add(Priority.URGENT);
List<Priority> priorities = Lists.newArrayList(Priority.values());
Collections.shuffle(priorities);
assertThat(queue.poll(), equalTo(Priority.URGENT));
assertThat(queue.poll(), equalTo(Priority.HIGH));
assertThat(queue.poll(), equalTo(Priority.NORMAL));
assertThat(queue.poll(), equalTo(Priority.LOW));
assertThat(queue.poll(), equalTo(Priority.LANGUID));
for (Priority priority : priorities) {
queue.add(priority);
}
Priority prevPriority = null;
while (!queue.isEmpty()) {
if (prevPriority == null) {
prevPriority = queue.poll();
} else {
assertThat(queue.poll().after(prevPriority), is(true));
}
}
}
@Test
public void testSubmitPrioritizedExecutorWithRunnables() throws Exception {
ExecutorService executor = EsExecutors.newSinglePrioritizing(Executors.defaultThreadFactory());
List<Integer> results = new ArrayList<Integer>(7);
List<Integer> results = new ArrayList<Integer>(8);
CountDownLatch awaitingLatch = new CountDownLatch(1);
CountDownLatch finishedLatch = new CountDownLatch(7);
CountDownLatch finishedLatch = new CountDownLatch(8);
executor.submit(new AwaitingJob(awaitingLatch));
executor.submit(new Job(6, Priority.LANGUID, results, finishedLatch));
executor.submit(new Job(4, Priority.LOW, results, finishedLatch));
executor.submit(new Job(1, Priority.HIGH, results, finishedLatch));
executor.submit(new Job(5, Priority.LOW, results, finishedLatch)); // will execute after the first LOW (fifo)
executor.submit(new Job(0, Priority.URGENT, results, finishedLatch));
executor.submit(new Job(3, Priority.NORMAL, results, finishedLatch));
executor.submit(new Job(2, Priority.HIGH, results, finishedLatch)); // will execute after the first HIGH (fifo)
executor.submit(new Job(7, Priority.LANGUID, results, finishedLatch));
executor.submit(new Job(5, Priority.LOW, results, finishedLatch));
executor.submit(new Job(2, Priority.HIGH, results, finishedLatch));
executor.submit(new Job(6, Priority.LOW, results, finishedLatch)); // will execute after the first LOW (fifo)
executor.submit(new Job(1, Priority.URGENT, results, finishedLatch));
executor.submit(new Job(4, Priority.NORMAL, results, finishedLatch));
executor.submit(new Job(3, Priority.HIGH, results, finishedLatch)); // will execute after the first HIGH (fifo)
executor.submit(new Job(0, Priority.IMMEDIATE, results, finishedLatch));
awaitingLatch.countDown();
finishedLatch.await();
assertThat(results.size(), equalTo(7));
assertThat(results.size(), equalTo(8));
assertThat(results.get(0), equalTo(0));
assertThat(results.get(1), equalTo(1));
assertThat(results.get(2), equalTo(2));
@ -77,26 +84,28 @@ public class PrioritizedExecutorsTests extends ElasticsearchTestCase {
assertThat(results.get(4), equalTo(4));
assertThat(results.get(5), equalTo(5));
assertThat(results.get(6), equalTo(6));
assertThat(results.get(7), equalTo(7));
}
@Test
public void testExecutePrioritizedExecutorWithRunnables() throws Exception {
ExecutorService executor = EsExecutors.newSinglePrioritizing(Executors.defaultThreadFactory());
List<Integer> results = new ArrayList<Integer>(7);
List<Integer> results = new ArrayList<Integer>(8);
CountDownLatch awaitingLatch = new CountDownLatch(1);
CountDownLatch finishedLatch = new CountDownLatch(7);
CountDownLatch finishedLatch = new CountDownLatch(8);
executor.execute(new AwaitingJob(awaitingLatch));
executor.execute(new Job(6, Priority.LANGUID, results, finishedLatch));
executor.execute(new Job(4, Priority.LOW, results, finishedLatch));
executor.execute(new Job(1, Priority.HIGH, results, finishedLatch));
executor.execute(new Job(5, Priority.LOW, results, finishedLatch)); // will execute after the first LOW (fifo)
executor.execute(new Job(0, Priority.URGENT, results, finishedLatch));
executor.execute(new Job(3, Priority.NORMAL, results, finishedLatch));
executor.execute(new Job(2, Priority.HIGH, results, finishedLatch)); // will execute after the first HIGH (fifo)
executor.execute(new Job(7, Priority.LANGUID, results, finishedLatch));
executor.execute(new Job(5, Priority.LOW, results, finishedLatch));
executor.execute(new Job(2, Priority.HIGH, results, finishedLatch));
executor.execute(new Job(6, Priority.LOW, results, finishedLatch)); // will execute after the first LOW (fifo)
executor.execute(new Job(1, Priority.URGENT, results, finishedLatch));
executor.execute(new Job(4, Priority.NORMAL, results, finishedLatch));
executor.execute(new Job(3, Priority.HIGH, results, finishedLatch)); // will execute after the first HIGH (fifo)
executor.execute(new Job(0, Priority.IMMEDIATE, results, finishedLatch));
awaitingLatch.countDown();
finishedLatch.await();
assertThat(results.size(), equalTo(7));
assertThat(results.size(), equalTo(8));
assertThat(results.get(0), equalTo(0));
assertThat(results.get(1), equalTo(1));
assertThat(results.get(2), equalTo(2));
@ -104,26 +113,28 @@ public class PrioritizedExecutorsTests extends ElasticsearchTestCase {
assertThat(results.get(4), equalTo(4));
assertThat(results.get(5), equalTo(5));
assertThat(results.get(6), equalTo(6));
assertThat(results.get(7), equalTo(7));
}
@Test
public void testSubmitPrioritizedExecutorWithCallables() throws Exception {
ExecutorService executor = EsExecutors.newSinglePrioritizing(Executors.defaultThreadFactory());
List<Integer> results = new ArrayList<Integer>(7);
List<Integer> results = new ArrayList<Integer>(8);
CountDownLatch awaitingLatch = new CountDownLatch(1);
CountDownLatch finishedLatch = new CountDownLatch(7);
CountDownLatch finishedLatch = new CountDownLatch(8);
executor.submit(new AwaitingJob(awaitingLatch));
executor.submit(new CallableJob(6, Priority.LANGUID, results, finishedLatch));
executor.submit(new CallableJob(4, Priority.LOW, results, finishedLatch));
executor.submit(new CallableJob(1, Priority.HIGH, results, finishedLatch));
executor.submit(new CallableJob(5, Priority.LOW, results, finishedLatch)); // will execute after the first LOW (fifo)
executor.submit(new CallableJob(0, Priority.URGENT, results, finishedLatch));
executor.submit(new CallableJob(3, Priority.NORMAL, results, finishedLatch));
executor.submit(new CallableJob(2, Priority.HIGH, results, finishedLatch)); // will execute after the first HIGH (fifo)
executor.submit(new CallableJob(7, Priority.LANGUID, results, finishedLatch));
executor.submit(new CallableJob(5, Priority.LOW, results, finishedLatch));
executor.submit(new CallableJob(2, Priority.HIGH, results, finishedLatch));
executor.submit(new CallableJob(6, Priority.LOW, results, finishedLatch)); // will execute after the first LOW (fifo)
executor.submit(new CallableJob(1, Priority.URGENT, results, finishedLatch));
executor.submit(new CallableJob(4, Priority.NORMAL, results, finishedLatch));
executor.submit(new CallableJob(3, Priority.HIGH, results, finishedLatch)); // will execute after the first HIGH (fifo)
executor.submit(new CallableJob(0, Priority.IMMEDIATE, results, finishedLatch));
awaitingLatch.countDown();
finishedLatch.await();
assertThat(results.size(), equalTo(7));
assertThat(results.size(), equalTo(8));
assertThat(results.get(0), equalTo(0));
assertThat(results.get(1), equalTo(1));
assertThat(results.get(2), equalTo(2));
@ -131,26 +142,28 @@ public class PrioritizedExecutorsTests extends ElasticsearchTestCase {
assertThat(results.get(4), equalTo(4));
assertThat(results.get(5), equalTo(5));
assertThat(results.get(6), equalTo(6));
assertThat(results.get(7), equalTo(7));
}
@Test
public void testSubmitPrioritizedExecutorWithMixed() throws Exception {
ExecutorService executor = EsExecutors.newSinglePrioritizing(Executors.defaultThreadFactory());
List<Integer> results = new ArrayList<Integer>(7);
List<Integer> results = new ArrayList<Integer>(8);
CountDownLatch awaitingLatch = new CountDownLatch(1);
CountDownLatch finishedLatch = new CountDownLatch(7);
CountDownLatch finishedLatch = new CountDownLatch(8);
executor.submit(new AwaitingJob(awaitingLatch));
executor.submit(new CallableJob(6, Priority.LANGUID, results, finishedLatch));
executor.submit(new Job(4, Priority.LOW, results, finishedLatch));
executor.submit(new CallableJob(1, Priority.HIGH, results, finishedLatch));
executor.submit(new Job(5, Priority.LOW, results, finishedLatch)); // will execute after the first LOW (fifo)
executor.submit(new CallableJob(0, Priority.URGENT, results, finishedLatch));
executor.submit(new Job(3, Priority.NORMAL, results, finishedLatch));
executor.submit(new CallableJob(2, Priority.HIGH, results, finishedLatch)); // will execute after the first HIGH (fifo)
executor.submit(new CallableJob(7, Priority.LANGUID, results, finishedLatch));
executor.submit(new Job(5, Priority.LOW, results, finishedLatch));
executor.submit(new CallableJob(2, Priority.HIGH, results, finishedLatch));
executor.submit(new Job(6, Priority.LOW, results, finishedLatch)); // will execute after the first LOW (fifo)
executor.submit(new CallableJob(1, Priority.URGENT, results, finishedLatch));
executor.submit(new Job(4, Priority.NORMAL, results, finishedLatch));
executor.submit(new CallableJob(3, Priority.HIGH, results, finishedLatch)); // will execute after the first HIGH (fifo)
executor.submit(new Job(0, Priority.IMMEDIATE, results, finishedLatch));
awaitingLatch.countDown();
finishedLatch.await();
assertThat(results.size(), equalTo(7));
assertThat(results.size(), equalTo(8));
assertThat(results.get(0), equalTo(0));
assertThat(results.get(1), equalTo(1));
assertThat(results.get(2), equalTo(2));
@ -158,6 +171,7 @@ public class PrioritizedExecutorsTests extends ElasticsearchTestCase {
assertThat(results.get(4), equalTo(4));
assertThat(results.get(5), equalTo(5));
assertThat(results.get(6), equalTo(6));
assertThat(results.get(7), equalTo(7));
}
@Test