diff --git a/src/main/java/org/elasticsearch/action/admin/cluster/health/ClusterHealthRequest.java b/src/main/java/org/elasticsearch/action/admin/cluster/health/ClusterHealthRequest.java index 3fe6efd43ba..28f94b19e75 100644 --- a/src/main/java/org/elasticsearch/action/admin/cluster/health/ClusterHealthRequest.java +++ b/src/main/java/org/elasticsearch/action/admin/cluster/health/ClusterHealthRequest.java @@ -159,7 +159,7 @@ public class ClusterHealthRequest extends MasterNodeReadOperationRequest listener) throws ElasticsearchException { - clusterService.submitStateUpdateTask("cluster_reroute (api)", Priority.URGENT, new AckedClusterStateUpdateTask() { + clusterService.submitStateUpdateTask("cluster_reroute (api)", Priority.IMMEDIATE, new AckedClusterStateUpdateTask() { private volatile ClusterState clusterStateToSend; private volatile RoutingExplanations explanations; diff --git a/src/main/java/org/elasticsearch/action/admin/cluster/settings/TransportClusterUpdateSettingsAction.java b/src/main/java/org/elasticsearch/action/admin/cluster/settings/TransportClusterUpdateSettingsAction.java index 9ce5f54e60d..6c39001e7e4 100644 --- a/src/main/java/org/elasticsearch/action/admin/cluster/settings/TransportClusterUpdateSettingsAction.java +++ b/src/main/java/org/elasticsearch/action/admin/cluster/settings/TransportClusterUpdateSettingsAction.java @@ -87,7 +87,7 @@ public class TransportClusterUpdateSettingsAction extends TransportMasterNodeOpe final ImmutableSettings.Builder transientUpdates = ImmutableSettings.settingsBuilder(); final ImmutableSettings.Builder persistentUpdates = ImmutableSettings.settingsBuilder(); - clusterService.submitStateUpdateTask("cluster_update_settings", Priority.URGENT, new AckedClusterStateUpdateTask() { + clusterService.submitStateUpdateTask("cluster_update_settings", Priority.IMMEDIATE, new AckedClusterStateUpdateTask() { private volatile boolean changed = false; diff --git a/src/main/java/org/elasticsearch/cluster/service/PendingClusterTask.java b/src/main/java/org/elasticsearch/cluster/service/PendingClusterTask.java index 3efd396d9d0..5b90bed745d 100644 --- a/src/main/java/org/elasticsearch/cluster/service/PendingClusterTask.java +++ b/src/main/java/org/elasticsearch/cluster/service/PendingClusterTask.java @@ -86,7 +86,7 @@ public class PendingClusterTask implements Streamable { @Override public void readFrom(StreamInput in) throws IOException { insertOrder = in.readVLong(); - priority = Priority.fromByte(in.readByte()); + priority = Priority.readFrom(in); source = in.readText(); timeInQueue = in.readVLong(); } @@ -94,7 +94,7 @@ public class PendingClusterTask implements Streamable { @Override public void writeTo(StreamOutput out) throws IOException { out.writeVLong(insertOrder); - out.writeByte(priority.value()); + Priority.writeTo(priority, out); out.writeText(source); out.writeVLong(timeInQueue); } diff --git a/src/main/java/org/elasticsearch/common/Priority.java b/src/main/java/org/elasticsearch/common/Priority.java index 6d7dd10b8fb..f7a3f23b262 100644 --- a/src/main/java/org/elasticsearch/common/Priority.java +++ b/src/main/java/org/elasticsearch/common/Priority.java @@ -19,34 +19,49 @@ package org.elasticsearch.common; import org.elasticsearch.ElasticsearchIllegalArgumentException; +import org.elasticsearch.Version; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; + +import java.io.IOException; /** * */ public final class Priority implements Comparable { + public static Priority readFrom(StreamInput input) throws IOException { + return fromByte(input.readByte()); + } + + public static void writeTo(Priority priority, StreamOutput output) throws IOException { + byte b = priority.value; + if (output.getVersion().before(Version.V_1_1_0)) { + b = (byte) Math.max(URGENT.value, b); + } + output.writeByte(b); + } + public static Priority fromByte(byte b) { switch (b) { - case 0: - return URGENT; - case 1: - return HIGH; - case 2: - return NORMAL; - case 3: - return LOW; - case 4: - return LANGUID; + case -1: return IMMEDIATE; + case 0: return URGENT; + case 1: return HIGH; + case 2: return NORMAL; + case 3: return LOW; + case 4: return LANGUID; default: throw new ElasticsearchIllegalArgumentException("can't find priority for [" + b + "]"); } } - public static Priority URGENT = new Priority((byte) 0); - public static Priority HIGH = new Priority((byte) 1); - public static Priority NORMAL = new Priority((byte) 2); - public static Priority LOW = new Priority((byte) 3); - public static Priority LANGUID = new Priority((byte) 4); + public static final Priority IMMEDIATE = new Priority((byte) -1); + public static final Priority URGENT = new Priority((byte) 0); + public static final Priority HIGH = new Priority((byte) 1); + public static final Priority NORMAL = new Priority((byte) 2); + public static final Priority LOW = new Priority((byte) 3); + public static final Priority LANGUID = new Priority((byte) 4); + private static final Priority[] values = new Priority[] { IMMEDIATE, URGENT, HIGH, NORMAL, LOW, LANGUID }; private final byte value; @@ -54,12 +69,23 @@ public final class Priority implements Comparable { this.value = value; } - public byte value() { - return this.value; + /** + * @return an array of all available priorities, sorted from the highest to the lowest. + */ + public static Priority[] values() { + return values; } public int compareTo(Priority p) { - return this.value - p.value; + return (this.value < p.value) ? -1 : ((this.value > p.value) ? 1 : 0); + } + + public boolean after(Priority p) { + return value > p.value; + } + + public boolean sameOrAfter(Priority p) { + return value >= p.value; } @Override @@ -82,14 +108,11 @@ public final class Priority implements Comparable { @Override public String toString() { switch (value) { - case (byte) 0: - return "URGENT"; - case (byte) 1: - return "HIGH"; - case (byte) 2: - return "NORMAL"; - case (byte) 3: - return "LOW"; + case (byte) -1: return "IMMEDIATE"; + case (byte) 0: return "URGENT"; + case (byte) 1: return "HIGH"; + case (byte) 2: return "NORMAL"; + case (byte) 3: return "LOW"; default: return "LANGUID"; } diff --git a/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java b/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java index ca59bfad0bf..16bcb18b194 100644 --- a/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java +++ b/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java @@ -370,7 +370,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implemen return; } if (master) { - clusterService.submitStateUpdateTask("zen-disco-node_left(" + node + ")", Priority.URGENT, new ClusterStateUpdateTask() { + clusterService.submitStateUpdateTask("zen-disco-node_left(" + node + ")", Priority.IMMEDIATE, new ClusterStateUpdateTask() { @Override public ClusterState execute(ClusterState currentState) { DiscoveryNodes.Builder builder = DiscoveryNodes.builder(currentState.nodes()).remove(node.id()); @@ -404,7 +404,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implemen // nothing to do here... return; } - clusterService.submitStateUpdateTask("zen-disco-node_failed(" + node + "), reason " + reason, Priority.URGENT, new ProcessedClusterStateUpdateTask() { + clusterService.submitStateUpdateTask("zen-disco-node_failed(" + node + "), reason " + reason, Priority.IMMEDIATE, new ProcessedClusterStateUpdateTask() { @Override public ClusterState execute(ClusterState currentState) { DiscoveryNodes.Builder builder = DiscoveryNodes.builder(currentState.nodes()) @@ -441,7 +441,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implemen // nothing to do here... return; } - clusterService.submitStateUpdateTask("zen-disco-minimum_master_nodes_changed", Priority.URGENT, new ProcessedClusterStateUpdateTask() { + clusterService.submitStateUpdateTask("zen-disco-minimum_master_nodes_changed", Priority.IMMEDIATE, new ProcessedClusterStateUpdateTask() { @Override public ClusterState execute(ClusterState currentState) { final int prevMinimumMasterNode = ZenDiscovery.this.electMaster.minimumMasterNodes(); @@ -477,7 +477,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implemen logger.info("master_left [{}], reason [{}]", masterNode, reason); - clusterService.submitStateUpdateTask("zen-disco-master_failed (" + masterNode + ")", Priority.URGENT, new ProcessedClusterStateUpdateTask() { + clusterService.submitStateUpdateTask("zen-disco-master_failed (" + masterNode + ")", Priority.IMMEDIATE, new ProcessedClusterStateUpdateTask() { @Override public ClusterState execute(ClusterState currentState) { if (!masterNode.id().equals(currentState.nodes().masterNodeId())) { @@ -707,7 +707,7 @@ public class ZenDiscovery extends AbstractLifecycleComponent implemen // node calling the join request membership.sendValidateJoinRequestBlocking(node, state, pingTimeout); - clusterService.submitStateUpdateTask("zen-disco-receive(join from node[" + node + "])", Priority.URGENT, new ClusterStateUpdateTask() { + clusterService.submitStateUpdateTask("zen-disco-receive(join from node[" + node + "])", Priority.IMMEDIATE, new ClusterStateUpdateTask() { @Override public ClusterState execute(ClusterState currentState) { if (currentState.nodes().nodeExists(node.id())) { diff --git a/src/test/java/org/elasticsearch/cluster/ClusterServiceTests.java b/src/test/java/org/elasticsearch/cluster/ClusterServiceTests.java index bcea79970a6..5f214cb796b 100644 --- a/src/test/java/org/elasticsearch/cluster/ClusterServiceTests.java +++ b/src/test/java/org/elasticsearch/cluster/ClusterServiceTests.java @@ -580,7 +580,86 @@ public class ClusterServiceTests extends ElasticsearchIntegrationTest { assertThat(testService1.master(), is(false)); assertThat(testService2.master(), is(true)); } + } + /** + * Note, this test can only work as long as we have a single thread executor executing the state update tasks! + */ + @Test + public void testPriorizedTasks() throws Exception { + Settings settings = settingsBuilder() + .put("discovery.type", "local") + .build(); + cluster().startNode(settings); + ClusterService clusterService = cluster().getInstance(ClusterService.class); + BlockingTask block = new BlockingTask(); + clusterService.submitStateUpdateTask("test", Priority.IMMEDIATE, block); + int taskCount = randomIntBetween(5, 20); + Priority[] priorities = Priority.values(); + + // will hold all the tasks in the order in which they were executed + List tasks = new ArrayList(taskCount); + CountDownLatch latch = new CountDownLatch(taskCount); + for (int i = 0; i < taskCount; i++) { + Priority priority = priorities[randomIntBetween(0, priorities.length - 1)]; + clusterService.submitStateUpdateTask("test", priority, new PrioritiezedTask(priority, latch, tasks)); + } + + block.release(); + latch.await(); + + Priority prevPriority = null; + for (PrioritiezedTask task : tasks) { + if (prevPriority == null) { + prevPriority = task.priority; + } else { + assertThat(task.priority.sameOrAfter(prevPriority), is(true)); + } + } + } + + private static class BlockingTask implements ClusterStateUpdateTask { + private final CountDownLatch latch = new CountDownLatch(1); + + @Override + public ClusterState execute(ClusterState currentState) throws Exception { + latch.await(); + return currentState; + } + + @Override + public void onFailure(String source, Throwable t) { + } + + public void release() { + latch.countDown(); + } + + } + + private static class PrioritiezedTask implements ClusterStateUpdateTask { + + private final Priority priority; + private final CountDownLatch latch; + private final List tasks; + + private PrioritiezedTask(Priority priority, CountDownLatch latch, List tasks) { + this.priority = priority; + this.latch = latch; + this.tasks = tasks; + } + + @Override + public ClusterState execute(ClusterState currentState) throws Exception { + tasks.add(this); + latch.countDown(); + return currentState; + } + + @Override + public void onFailure(String source, Throwable t) { + latch.countDown(); + } } public static class TestPlugin extends AbstractPlugin { diff --git a/src/test/java/org/elasticsearch/common/util/concurrent/PrioritizedExecutorsTests.java b/src/test/java/org/elasticsearch/common/util/concurrent/PrioritizedExecutorsTests.java index bd172175090..bfc4ee7b50c 100644 --- a/src/test/java/org/elasticsearch/common/util/concurrent/PrioritizedExecutorsTests.java +++ b/src/test/java/org/elasticsearch/common/util/concurrent/PrioritizedExecutorsTests.java @@ -18,18 +18,20 @@ */ package org.elasticsearch.common.util.concurrent; +import com.google.common.collect.Lists; import org.elasticsearch.common.Priority; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.test.ElasticsearchTestCase; import org.junit.Test; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; -import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; /** * @@ -39,37 +41,42 @@ public class PrioritizedExecutorsTests extends ElasticsearchTestCase { @Test public void testPriorityQueue() throws Exception { PriorityBlockingQueue queue = new PriorityBlockingQueue(); - queue.add(Priority.LANGUID); - queue.add(Priority.NORMAL); - queue.add(Priority.HIGH); - queue.add(Priority.LOW); - queue.add(Priority.URGENT); + List priorities = Lists.newArrayList(Priority.values()); + Collections.shuffle(priorities); - assertThat(queue.poll(), equalTo(Priority.URGENT)); - assertThat(queue.poll(), equalTo(Priority.HIGH)); - assertThat(queue.poll(), equalTo(Priority.NORMAL)); - assertThat(queue.poll(), equalTo(Priority.LOW)); - assertThat(queue.poll(), equalTo(Priority.LANGUID)); + for (Priority priority : priorities) { + queue.add(priority); + } + + Priority prevPriority = null; + while (!queue.isEmpty()) { + if (prevPriority == null) { + prevPriority = queue.poll(); + } else { + assertThat(queue.poll().after(prevPriority), is(true)); + } + } } @Test public void testSubmitPrioritizedExecutorWithRunnables() throws Exception { ExecutorService executor = EsExecutors.newSinglePrioritizing(Executors.defaultThreadFactory()); - List results = new ArrayList(7); + List results = new ArrayList(8); CountDownLatch awaitingLatch = new CountDownLatch(1); - CountDownLatch finishedLatch = new CountDownLatch(7); + CountDownLatch finishedLatch = new CountDownLatch(8); executor.submit(new AwaitingJob(awaitingLatch)); - executor.submit(new Job(6, Priority.LANGUID, results, finishedLatch)); - executor.submit(new Job(4, Priority.LOW, results, finishedLatch)); - executor.submit(new Job(1, Priority.HIGH, results, finishedLatch)); - executor.submit(new Job(5, Priority.LOW, results, finishedLatch)); // will execute after the first LOW (fifo) - executor.submit(new Job(0, Priority.URGENT, results, finishedLatch)); - executor.submit(new Job(3, Priority.NORMAL, results, finishedLatch)); - executor.submit(new Job(2, Priority.HIGH, results, finishedLatch)); // will execute after the first HIGH (fifo) + executor.submit(new Job(7, Priority.LANGUID, results, finishedLatch)); + executor.submit(new Job(5, Priority.LOW, results, finishedLatch)); + executor.submit(new Job(2, Priority.HIGH, results, finishedLatch)); + executor.submit(new Job(6, Priority.LOW, results, finishedLatch)); // will execute after the first LOW (fifo) + executor.submit(new Job(1, Priority.URGENT, results, finishedLatch)); + executor.submit(new Job(4, Priority.NORMAL, results, finishedLatch)); + executor.submit(new Job(3, Priority.HIGH, results, finishedLatch)); // will execute after the first HIGH (fifo) + executor.submit(new Job(0, Priority.IMMEDIATE, results, finishedLatch)); awaitingLatch.countDown(); finishedLatch.await(); - assertThat(results.size(), equalTo(7)); + assertThat(results.size(), equalTo(8)); assertThat(results.get(0), equalTo(0)); assertThat(results.get(1), equalTo(1)); assertThat(results.get(2), equalTo(2)); @@ -77,26 +84,28 @@ public class PrioritizedExecutorsTests extends ElasticsearchTestCase { assertThat(results.get(4), equalTo(4)); assertThat(results.get(5), equalTo(5)); assertThat(results.get(6), equalTo(6)); + assertThat(results.get(7), equalTo(7)); } @Test public void testExecutePrioritizedExecutorWithRunnables() throws Exception { ExecutorService executor = EsExecutors.newSinglePrioritizing(Executors.defaultThreadFactory()); - List results = new ArrayList(7); + List results = new ArrayList(8); CountDownLatch awaitingLatch = new CountDownLatch(1); - CountDownLatch finishedLatch = new CountDownLatch(7); + CountDownLatch finishedLatch = new CountDownLatch(8); executor.execute(new AwaitingJob(awaitingLatch)); - executor.execute(new Job(6, Priority.LANGUID, results, finishedLatch)); - executor.execute(new Job(4, Priority.LOW, results, finishedLatch)); - executor.execute(new Job(1, Priority.HIGH, results, finishedLatch)); - executor.execute(new Job(5, Priority.LOW, results, finishedLatch)); // will execute after the first LOW (fifo) - executor.execute(new Job(0, Priority.URGENT, results, finishedLatch)); - executor.execute(new Job(3, Priority.NORMAL, results, finishedLatch)); - executor.execute(new Job(2, Priority.HIGH, results, finishedLatch)); // will execute after the first HIGH (fifo) + executor.execute(new Job(7, Priority.LANGUID, results, finishedLatch)); + executor.execute(new Job(5, Priority.LOW, results, finishedLatch)); + executor.execute(new Job(2, Priority.HIGH, results, finishedLatch)); + executor.execute(new Job(6, Priority.LOW, results, finishedLatch)); // will execute after the first LOW (fifo) + executor.execute(new Job(1, Priority.URGENT, results, finishedLatch)); + executor.execute(new Job(4, Priority.NORMAL, results, finishedLatch)); + executor.execute(new Job(3, Priority.HIGH, results, finishedLatch)); // will execute after the first HIGH (fifo) + executor.execute(new Job(0, Priority.IMMEDIATE, results, finishedLatch)); awaitingLatch.countDown(); finishedLatch.await(); - assertThat(results.size(), equalTo(7)); + assertThat(results.size(), equalTo(8)); assertThat(results.get(0), equalTo(0)); assertThat(results.get(1), equalTo(1)); assertThat(results.get(2), equalTo(2)); @@ -104,26 +113,28 @@ public class PrioritizedExecutorsTests extends ElasticsearchTestCase { assertThat(results.get(4), equalTo(4)); assertThat(results.get(5), equalTo(5)); assertThat(results.get(6), equalTo(6)); + assertThat(results.get(7), equalTo(7)); } @Test public void testSubmitPrioritizedExecutorWithCallables() throws Exception { ExecutorService executor = EsExecutors.newSinglePrioritizing(Executors.defaultThreadFactory()); - List results = new ArrayList(7); + List results = new ArrayList(8); CountDownLatch awaitingLatch = new CountDownLatch(1); - CountDownLatch finishedLatch = new CountDownLatch(7); + CountDownLatch finishedLatch = new CountDownLatch(8); executor.submit(new AwaitingJob(awaitingLatch)); - executor.submit(new CallableJob(6, Priority.LANGUID, results, finishedLatch)); - executor.submit(new CallableJob(4, Priority.LOW, results, finishedLatch)); - executor.submit(new CallableJob(1, Priority.HIGH, results, finishedLatch)); - executor.submit(new CallableJob(5, Priority.LOW, results, finishedLatch)); // will execute after the first LOW (fifo) - executor.submit(new CallableJob(0, Priority.URGENT, results, finishedLatch)); - executor.submit(new CallableJob(3, Priority.NORMAL, results, finishedLatch)); - executor.submit(new CallableJob(2, Priority.HIGH, results, finishedLatch)); // will execute after the first HIGH (fifo) + executor.submit(new CallableJob(7, Priority.LANGUID, results, finishedLatch)); + executor.submit(new CallableJob(5, Priority.LOW, results, finishedLatch)); + executor.submit(new CallableJob(2, Priority.HIGH, results, finishedLatch)); + executor.submit(new CallableJob(6, Priority.LOW, results, finishedLatch)); // will execute after the first LOW (fifo) + executor.submit(new CallableJob(1, Priority.URGENT, results, finishedLatch)); + executor.submit(new CallableJob(4, Priority.NORMAL, results, finishedLatch)); + executor.submit(new CallableJob(3, Priority.HIGH, results, finishedLatch)); // will execute after the first HIGH (fifo) + executor.submit(new CallableJob(0, Priority.IMMEDIATE, results, finishedLatch)); awaitingLatch.countDown(); finishedLatch.await(); - assertThat(results.size(), equalTo(7)); + assertThat(results.size(), equalTo(8)); assertThat(results.get(0), equalTo(0)); assertThat(results.get(1), equalTo(1)); assertThat(results.get(2), equalTo(2)); @@ -131,26 +142,28 @@ public class PrioritizedExecutorsTests extends ElasticsearchTestCase { assertThat(results.get(4), equalTo(4)); assertThat(results.get(5), equalTo(5)); assertThat(results.get(6), equalTo(6)); + assertThat(results.get(7), equalTo(7)); } @Test public void testSubmitPrioritizedExecutorWithMixed() throws Exception { ExecutorService executor = EsExecutors.newSinglePrioritizing(Executors.defaultThreadFactory()); - List results = new ArrayList(7); + List results = new ArrayList(8); CountDownLatch awaitingLatch = new CountDownLatch(1); - CountDownLatch finishedLatch = new CountDownLatch(7); + CountDownLatch finishedLatch = new CountDownLatch(8); executor.submit(new AwaitingJob(awaitingLatch)); - executor.submit(new CallableJob(6, Priority.LANGUID, results, finishedLatch)); - executor.submit(new Job(4, Priority.LOW, results, finishedLatch)); - executor.submit(new CallableJob(1, Priority.HIGH, results, finishedLatch)); - executor.submit(new Job(5, Priority.LOW, results, finishedLatch)); // will execute after the first LOW (fifo) - executor.submit(new CallableJob(0, Priority.URGENT, results, finishedLatch)); - executor.submit(new Job(3, Priority.NORMAL, results, finishedLatch)); - executor.submit(new CallableJob(2, Priority.HIGH, results, finishedLatch)); // will execute after the first HIGH (fifo) + executor.submit(new CallableJob(7, Priority.LANGUID, results, finishedLatch)); + executor.submit(new Job(5, Priority.LOW, results, finishedLatch)); + executor.submit(new CallableJob(2, Priority.HIGH, results, finishedLatch)); + executor.submit(new Job(6, Priority.LOW, results, finishedLatch)); // will execute after the first LOW (fifo) + executor.submit(new CallableJob(1, Priority.URGENT, results, finishedLatch)); + executor.submit(new Job(4, Priority.NORMAL, results, finishedLatch)); + executor.submit(new CallableJob(3, Priority.HIGH, results, finishedLatch)); // will execute after the first HIGH (fifo) + executor.submit(new Job(0, Priority.IMMEDIATE, results, finishedLatch)); awaitingLatch.countDown(); finishedLatch.await(); - assertThat(results.size(), equalTo(7)); + assertThat(results.size(), equalTo(8)); assertThat(results.get(0), equalTo(0)); assertThat(results.get(1), equalTo(1)); assertThat(results.get(2), equalTo(2)); @@ -158,6 +171,7 @@ public class PrioritizedExecutorsTests extends ElasticsearchTestCase { assertThat(results.get(4), equalTo(4)); assertThat(results.get(5), equalTo(5)); assertThat(results.get(6), equalTo(6)); + assertThat(results.get(7), equalTo(7)); } @Test