diff --git a/src/main/java/org/elasticsearch/cluster/TimeoutClusterStateUpdateTask.java b/src/main/java/org/elasticsearch/cluster/TimeoutClusterStateUpdateTask.java
new file mode 100644
index 00000000000..4b3fe131d3b
--- /dev/null
+++ b/src/main/java/org/elasticsearch/cluster/TimeoutClusterStateUpdateTask.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to ElasticSearch and Shay Banon under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. ElasticSearch licenses this
+ * file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.cluster;
+
+import org.elasticsearch.common.unit.TimeValue;
+
+/**
+ * An extension interface to {@link org.elasticsearch.cluster.ClusterStateUpdateTask} that allows associating
+ * a timeout with the update task.
+ */
+public interface TimeoutClusterStateUpdateTask extends ClusterStateUpdateTask {
+
+    /**
+     * If the cluster state update task isn't processed within the returned timeout,
+     * {@link #onTimeout(String)} will be called.
+     */
+    TimeValue timeout();
+
+    /**
+     * Called when the cluster state update task wasn't processed by the provided
+     * {@link #timeout()}.
+     */
+    void onTimeout(String source);
+}
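// Illustrative usage sketch (not part of the patch): how a caller might submit a timeout-aware
// update task through ClusterService. It mirrors the testTimeoutUpdateTask() case added to
// ClusterServiceTests further down; the "example-source" label and the 30s timeout are made-up
// values. Per that test, if onTimeout() fires first the task is dropped and execute() never runs.
clusterService.submitStateUpdateTask("example-source", new TimeoutClusterStateUpdateTask() {
    @Override
    public TimeValue timeout() {
        // how long the task may sit in the pending queue before it is abandoned
        return TimeValue.timeValueSeconds(30);
    }

    @Override
    public void onTimeout(String source) {
        // the task was still pending when the timeout elapsed; react here (e.g. fail the caller)
    }

    @Override
    public ClusterState execute(ClusterState currentState) {
        // regular cluster state update logic; only reached if the task ran before the timeout
        return currentState;
    }
});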
diff --git a/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java b/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java
index a5a4dd1fb4d..1efe7ab290e 100644
--- a/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java
+++ b/src/main/java/org/elasticsearch/cluster/service/InternalClusterService.java
@@ -36,6 +36,7 @@ import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
 import org.elasticsearch.common.util.concurrent.EsExecutors;
+import org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor;
 import org.elasticsearch.common.util.concurrent.PrioritizedRunnable;
 import org.elasticsearch.discovery.Discovery;
 import org.elasticsearch.discovery.DiscoveryService;
@@ -69,7 +70,7 @@ public class InternalClusterService extends AbstractLifecycleComponent
     priorityClusterStateListeners = new CopyOnWriteArrayList();
     private final List clusterStateListeners = new CopyOnWriteArrayList();
@@ -195,7 +196,7 @@ public class InternalClusterService extends AbstractLifecycleComponent
-                    if (summary.length() > 0) {
-                        logger.info("{}, reason: {}", summary, source);
-                    }
-                }
-
-                // TODO, do this in parallel (and wait)
-                for (DiscoveryNode node : nodesDelta.addedNodes()) {
-                    if (!nodeRequiresConnection(node)) {
-                        continue;
-                    }
-                    try {
-                        transportService.connectToNode(node);
-                    } catch (Exception e) {
-                        // the fault detection will detect it as failed as well
-                        logger.warn("failed to connect to node [" + node + "]", e);
-                    }
-                }
-
-                // if we are the master, publish the new state to all nodes
-                // we publish here before we send a notification to all the listeners, since if it fails
-                // we don't want to notify
-                if (newClusterState.nodes().localNodeMaster()) {
-                    discoveryService.publish(newClusterState);
-                }
-
-                // update the current cluster state
-                clusterState = newClusterState;
-
-                for (ClusterStateListener listener : priorityClusterStateListeners) {
-                    listener.clusterChanged(clusterChangedEvent);
-                }
-                for (ClusterStateListener listener : clusterStateListeners) {
-                    listener.clusterChanged(clusterChangedEvent);
-                }
-                for (ClusterStateListener listener : lastClusterStateListeners) {
-                    listener.clusterChanged(clusterChangedEvent);
-                }
-
-                if (!nodesDelta.removedNodes().isEmpty()) {
-                    threadPool.generic().execute(new Runnable() {
-                        @Override
-                        public void run() {
-                            for (DiscoveryNode node : nodesDelta.removedNodes()) {
-                                transportService.disconnectFromNode(node);
-                            }
-                        }
-                    });
-                }
-
-
-                if (updateTask instanceof ProcessedClusterStateUpdateTask) {
-                    ((ProcessedClusterStateUpdateTask) updateTask).clusterStateProcessed(newClusterState);
-                }
-
-                logger.debug("processing [{}]: done applying updated cluster_state", source);
-            } catch (Exception e) {
-                StringBuilder sb = new StringBuilder("failed to apply updated cluster state:\nversion [").append(newClusterState.version()).append("], source [").append(source).append("]\n");
+                if (logger.isTraceEnabled()) {
+                    StringBuilder sb = new StringBuilder("cluster state updated:\nversion [").append(newClusterState.version()).append("], source [").append(source).append("]\n");
                     sb.append(newClusterState.nodes().prettyPrint());
                     sb.append(newClusterState.routingTable().prettyPrint());
                     sb.append(newClusterState.readOnlyRoutingNodes().prettyPrint());
-                logger.warn(sb.toString(), e);
+                    logger.trace(sb.toString());
+                } else if (logger.isDebugEnabled()) {
+                    logger.debug("cluster state updated, version [{}], source [{}]", newClusterState.version(), source);
                 }
+
+                ClusterChangedEvent clusterChangedEvent = new ClusterChangedEvent(source, newClusterState, previousClusterState);
+                // new cluster state, notify all listeners
+                final DiscoveryNodes.Delta nodesDelta = clusterChangedEvent.nodesDelta();
+                if (nodesDelta.hasChanges() && logger.isInfoEnabled()) {
+                    String summary = nodesDelta.shortSummary();
+                    if (summary.length() > 0) {
+                        logger.info("{}, reason: {}", summary, source);
+                    }
+                }
+
+                // TODO, do this in parallel (and wait)
+                for (DiscoveryNode node : nodesDelta.addedNodes()) {
+                    if (!nodeRequiresConnection(node)) {
+                        continue;
+                    }
+                    try {
+                        transportService.connectToNode(node);
+                    } catch (Exception e) {
+                        // the fault detection will detect it as failed as well
+                        logger.warn("failed to connect to node [" + node + "]", e);
+                    }
+                }
+
+                // if we are the master, publish the new state to all nodes
+                // we publish here before we send a notification to all the listeners, since if it fails
+                // we don't want to notify
+                if (newClusterState.nodes().localNodeMaster()) {
+                    discoveryService.publish(newClusterState);
+                }
+
+                // update the current cluster state
+                clusterState = newClusterState;
+
+                for (ClusterStateListener listener : priorityClusterStateListeners) {
+                    listener.clusterChanged(clusterChangedEvent);
+                }
+                for (ClusterStateListener listener : clusterStateListeners) {
+                    listener.clusterChanged(clusterChangedEvent);
+                }
+                for (ClusterStateListener listener : lastClusterStateListeners) {
+                    listener.clusterChanged(clusterChangedEvent);
+                }
+
+                if (!nodesDelta.removedNodes().isEmpty()) {
+                    threadPool.generic().execute(new Runnable() {
+                        @Override
+                        public void run() {
+                            for (DiscoveryNode node : nodesDelta.removedNodes()) {
+                                transportService.disconnectFromNode(node);
+                            }
+                        }
                     });
+                }
+
+
+                if (updateTask instanceof ProcessedClusterStateUpdateTask) {
+                    ((ProcessedClusterStateUpdateTask) updateTask).clusterStateProcessed(newClusterState);
+                }
+
+                logger.debug("processing [{}]: done applying updated cluster_state", source);
+            } catch (Exception e) {
+                StringBuilder sb = new StringBuilder("failed to apply updated cluster state:\nversion [").append(newClusterState.version()).append("], source [").append(source).append("]\n");
+                sb.append(newClusterState.nodes().prettyPrint());
+                sb.append(newClusterState.routingTable().prettyPrint());
+                sb.append(newClusterState.readOnlyRoutingNodes().prettyPrint());
+                logger.warn(sb.toString(), e);
             }
-        });
+        }
     }

     class NotifyTimeout implements Runnable {
diff --git a/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java b/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java
index 6138d6d129c..8c4f7c257e0 100644
--- a/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java
+++ b/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java
@@ -31,7 +31,7 @@ import java.util.concurrent.atomic.AtomicInteger;
  */
 public class EsExecutors {

-    public static EsThreadPoolExecutor newSinglePrioritizingThreadExecutor(ThreadFactory threadFactory) {
+    public static PrioritizedEsThreadPoolExecutor newSinglePrioritizingThreadExecutor(ThreadFactory threadFactory) {
         return new PrioritizedEsThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, threadFactory);
     }

diff --git a/src/main/java/org/elasticsearch/common/util/concurrent/PrioritizedEsThreadPoolExecutor.java b/src/main/java/org/elasticsearch/common/util/concurrent/PrioritizedEsThreadPoolExecutor.java
index 546875fd2c7..8fa74d714ba 100644
--- a/src/main/java/org/elasticsearch/common/util/concurrent/PrioritizedEsThreadPoolExecutor.java
+++ b/src/main/java/org/elasticsearch/common/util/concurrent/PrioritizedEsThreadPoolExecutor.java
@@ -20,6 +20,7 @@ package org.elasticsearch.common.util.concurrent;

 import org.elasticsearch.common.Priority;
+import org.elasticsearch.common.unit.TimeValue;

 import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicLong;
@@ -28,12 +29,12 @@ import java.util.concurrent.atomic.AtomicLong;
  * A prioritizing executor which uses a priority queue as a work queue. The jobs that will be submitted will be treated
  * as {@link PrioritizedRunnable} and/or {@link PrioritizedCallable}, those tasks that are not instances of these two will
  * be wrapped and assign a default {@link Priority#NORMAL} priority.
- *
+ * <p/>
  * Note, if two tasks have the same priority, the first to arrive will be executed first (FIFO style).
  */
 public class PrioritizedEsThreadPoolExecutor extends EsThreadPoolExecutor {

-    private AtomicLong tieBreaker = new AtomicLong(Long.MIN_VALUE);
+    private AtomicLong insertionOrder = new AtomicLong();

     public PrioritizedEsThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory) {
         super(corePoolSize, maximumPoolSize, keepAliveTime, unit, new PriorityBlockingQueue(), threadFactory);
@@ -47,14 +48,49 @@ public class PrioritizedEsThreadPoolExecutor extends EsThreadPoolExecutor {
         super(corePoolSize, maximumPoolSize, keepAliveTime, unit, new PriorityBlockingQueue(initialWorkQueuSize), threadFactory, handler);
     }

+    public Pending[] getPending() {
+        Object[] objects = getQueue().toArray();
+        Pending[] infos = new Pending[objects.length];
+        for (int i = 0; i < objects.length; i++) {
+            Object obj = objects[i];
+            if (obj instanceof TieBreakingPrioritizedRunnable) {
+                TieBreakingPrioritizedRunnable t = (TieBreakingPrioritizedRunnable) obj;
+                infos[i] = new Pending(t.runnable, t.priority(), t.insertionOrder);
+            } else if (obj instanceof PrioritizedFutureTask) {
+                PrioritizedFutureTask t = (PrioritizedFutureTask) obj;
+                infos[i] = new Pending(t.task, t.priority, t.insertionOrder);
+            }
+        }
+        return infos;
+    }
+
+    public void execute(Runnable command, final ScheduledExecutorService timer, final TimeValue timeout, final Runnable timeoutCallback) {
+        if (command instanceof PrioritizedRunnable) {
+            command = new TieBreakingPrioritizedRunnable((PrioritizedRunnable) command, insertionOrder.incrementAndGet());
+        } else if (!(command instanceof PrioritizedFutureTask)) { // it might be a callable wrapper...
+            command = new TieBreakingPrioritizedRunnable(command, Priority.NORMAL, insertionOrder.incrementAndGet());
+        }
+        if (timeout.nanos() >= 0) {
+            final Runnable fCommand = command;
+            timer.schedule(new Runnable() {
+                @Override
+                public void run() {
+                    boolean removed = getQueue().remove(fCommand);
+                    if (removed) {
+                        timeoutCallback.run();
+                    }
+                }
+            }, timeout.nanos(), TimeUnit.NANOSECONDS);
+        }
+        super.execute(command);
+    }
+
     @Override
     public void execute(Runnable command) {
         if (command instanceof PrioritizedRunnable) {
-            super.execute(new TieBreakingPrioritizedRunnable((PrioritizedRunnable) command, tieBreaker.incrementAndGet()));
-            return;
-        }
-        if (!(command instanceof Comparable)) {
-            command = new TieBreakingPrioritizedRunnable(command, Priority.NORMAL, tieBreaker.incrementAndGet());
+            command = new TieBreakingPrioritizedRunnable((PrioritizedRunnable) command, insertionOrder.incrementAndGet());
+        } else if (!(command instanceof PrioritizedFutureTask)) { // it might be a callable wrapper...
+            command = new TieBreakingPrioritizedRunnable(command, Priority.NORMAL, insertionOrder.incrementAndGet());
         }
         super.execute(command);
     }
@@ -64,7 +100,7 @@ public class PrioritizedEsThreadPoolExecutor extends EsThreadPoolExecutor {
         if (!(runnable instanceof PrioritizedRunnable)) {
             runnable = PrioritizedRunnable.wrap(runnable, Priority.NORMAL);
         }
-        return new PrioritizedFutureTask((PrioritizedRunnable) runnable, value, tieBreaker.incrementAndGet());
+        return new PrioritizedFutureTask((PrioritizedRunnable) runnable, value, insertionOrder.incrementAndGet());
     }

     @Override
@@ -72,22 +108,34 @@ public class PrioritizedEsThreadPoolExecutor extends EsThreadPoolExecutor {
         if (!(callable instanceof PrioritizedCallable)) {
             callable = PrioritizedCallable.wrap(callable, Priority.NORMAL);
         }
-        return new PrioritizedFutureTask((PrioritizedCallable) callable, tieBreaker.incrementAndGet());
+        return new PrioritizedFutureTask((PrioritizedCallable) callable, insertionOrder.incrementAndGet());
+    }
+
+    public static class Pending {
+        public final Object task;
+        public final Priority priority;
+        public final long insertionOrder;
+
+        public Pending(Object task, Priority priority, long insertionOrder) {
+            this.task = task;
+            this.priority = priority;
+            this.insertionOrder = insertionOrder;
+        }
     }

     static class TieBreakingPrioritizedRunnable extends PrioritizedRunnable {

-        private final Runnable runnable;
-        private final long tieBreaker;
+        final Runnable runnable;
+        final long insertionOrder;

-        TieBreakingPrioritizedRunnable(PrioritizedRunnable runnable, long tieBreaker) {
-            this(runnable, runnable.priority(), tieBreaker);
+        TieBreakingPrioritizedRunnable(PrioritizedRunnable runnable, long insertionOrder) {
+            this(runnable, runnable.priority(), insertionOrder);
         }

-        TieBreakingPrioritizedRunnable(Runnable runnable, Priority priority, long tieBreaker) {
+        TieBreakingPrioritizedRunnable(Runnable runnable, Priority priority, long insertionOrder) {
             super(priority);
             this.runnable = runnable;
-            this.tieBreaker = tieBreaker;
+            this.insertionOrder = insertionOrder;
         }

         @Override
@@ -101,28 +149,28 @@ public class PrioritizedEsThreadPoolExecutor extends EsThreadPoolExecutor {
             if (res != 0 || !(pr instanceof TieBreakingPrioritizedRunnable)) {
                 return res;
             }
-            return tieBreaker < ((TieBreakingPrioritizedRunnable)pr).tieBreaker ? -1 : 1;
+            return insertionOrder < ((TieBreakingPrioritizedRunnable) pr).insertionOrder ? -1 : 1;
         }
     }

-    /**
-     *
-     */
     static class PrioritizedFutureTask extends FutureTask implements Comparable {

-        private final Priority priority;
-        private final long tieBreaker;
+        final Object task;
+        final Priority priority;
+        final long insertionOrder;

-        public PrioritizedFutureTask(PrioritizedRunnable runnable, T value, long tieBreaker) {
+        public PrioritizedFutureTask(PrioritizedRunnable runnable, T value, long insertionOrder) {
             super(runnable, value);
+            this.task = runnable;
             this.priority = runnable.priority();
-            this.tieBreaker = tieBreaker;
+            this.insertionOrder = insertionOrder;
         }

-        public PrioritizedFutureTask(PrioritizedCallable callable, long tieBreaker) {
+        public PrioritizedFutureTask(PrioritizedCallable callable, long insertionOrder) {
             super(callable);
+            this.task = callable;
             this.priority = callable.priority();
-            this.tieBreaker = tieBreaker;
+            this.insertionOrder = insertionOrder;
         }

         @Override
@@ -131,7 +179,7 @@ public class PrioritizedEsThreadPoolExecutor extends EsThreadPoolExecutor {
             if (res != 0) {
                 return res;
             }
-            return tieBreaker < pft.tieBreaker ? -1 : 1;
+            return insertionOrder < pft.insertionOrder ? -1 : 1;
         }
     }
 }
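// Illustrative sketch (not part of the patch) of the timeout-aware execute(...) overload added above.
// The single-thread timer, the 10s timeout and the runnables are placeholder values; the authoritative
// usage is the testTimeout() case added to PrioritizedExecutorsTests below. The timeout callback only
// runs if the task is still sitting in the work queue when the timeout fires, in which case the task
// is removed and never executed.
ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
PrioritizedEsThreadPoolExecutor executor = EsExecutors.newSinglePrioritizingThreadExecutor(Executors.defaultThreadFactory());
executor.execute(new Runnable() {
    @Override
    public void run() {
        // normal work; runs only if the task left the queue before the timeout
    }
}, timer, TimeValue.timeValueSeconds(10), new Runnable() {
    @Override
    public void run() {
        // timed out while still queued: the task has been removed and will not run
    }
});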
diff --git a/src/test/java/org/elasticsearch/test/integration/cluster/LocalNodeMasterListenerTests.java b/src/test/java/org/elasticsearch/test/integration/cluster/ClusterServiceTests.java
similarity index 77%
rename from src/test/java/org/elasticsearch/test/integration/cluster/LocalNodeMasterListenerTests.java
rename to src/test/java/org/elasticsearch/test/integration/cluster/ClusterServiceTests.java
index 3c2a351c443..859ba30585c 100644
--- a/src/test/java/org/elasticsearch/test/integration/cluster/LocalNodeMasterListenerTests.java
+++ b/src/test/java/org/elasticsearch/test/integration/cluster/ClusterServiceTests.java
@@ -21,14 +21,14 @@ package org.elasticsearch.test.integration.cluster;

 import org.elasticsearch.ElasticSearchException;
 import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
-import org.elasticsearch.cluster.ClusterService;
-import org.elasticsearch.cluster.LocalNodeMasterListener;
+import org.elasticsearch.cluster.*;
 import org.elasticsearch.common.Priority;
 import org.elasticsearch.common.component.AbstractLifecycleComponent;
 import org.elasticsearch.common.component.LifecycleComponent;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.inject.Singleton;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.node.internal.InternalNode;
 import org.elasticsearch.plugins.AbstractPlugin;
 import org.elasticsearch.threadpool.ThreadPool;
@@ -38,6 +38,9 @@ import org.testng.annotations.Test;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;

 import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -46,7 +49,7 @@ import static org.hamcrest.Matchers.*;
 /**
  *
  */
-public class LocalNodeMasterListenerTests extends AbstractZenNodesTests {
+public class ClusterServiceTests extends AbstractZenNodesTests {

     @AfterMethod
     public void closeNodes() {
@@ -54,8 +57,56 @@ public class LocalNodeMasterListenerTests extends AbstractZenNodesTests {
     }

     @Test
-    public void testListenerCallbacks() throws Exception {
+    public void testTimeoutUpdateTask() throws Exception {
+        Settings settings = settingsBuilder()
+                .put("discovery.zen.minimum_master_nodes", 1)
+                .put("discovery.zen.ping_timeout", "200ms")
+                .put("discovery.initial_state_timeout", "500ms")
+                .build();
+        InternalNode node1 = (InternalNode) startNode("node1", settings);
+        ClusterService clusterService1 = node1.injector().getInstance(ClusterService.class);
+        final CountDownLatch block = new CountDownLatch(1);
+        clusterService1.submitStateUpdateTask("test1", new ClusterStateUpdateTask() {
+            @Override
+            public ClusterState execute(ClusterState currentState) {
+                try {
+                    block.await();
+                } catch (InterruptedException e) {
+                    assert false;
+                }
+                return currentState;
+            }
+        });
+
+        final CountDownLatch timedOut = new CountDownLatch(1);
+        final AtomicBoolean executeCalled = new AtomicBoolean();
+        clusterService1.submitStateUpdateTask("test2", new TimeoutClusterStateUpdateTask() {
+            @Override
+            public TimeValue timeout() {
+                return TimeValue.timeValueMillis(2);
+            }
+
+            @Override
+            public void onTimeout(String source) {
+                timedOut.countDown();
+            }
+
+            @Override
+            public ClusterState execute(ClusterState currentState) {
+                executeCalled.set(true);
+                return currentState;
+            }
+        });
+
+        assertThat(timedOut.await(500, TimeUnit.MILLISECONDS), equalTo(true));
+        block.countDown();
+        Thread.sleep(100); // sleep a bit to double check that execute on the timed out update task is not called...
+        assertThat(executeCalled.get(), equalTo(false));
+    }
+
+    @Test
+    public void testListenerCallbacks() throws Exception {
         Settings settings = settingsBuilder()
                 .put("discovery.zen.minimum_master_nodes", 1)
                 .put("discovery.zen.ping_timeout", "200ms")
diff --git a/src/test/java/org/elasticsearch/test/unit/common/util/concurrent/PrioritizedExecutorsTests.java b/src/test/java/org/elasticsearch/test/unit/common/util/concurrent/PrioritizedExecutorsTests.java
index c22920a81bb..d4924ad99eb 100644
--- a/src/test/java/org/elasticsearch/test/unit/common/util/concurrent/PrioritizedExecutorsTests.java
+++ b/src/test/java/org/elasticsearch/test/unit/common/util/concurrent/PrioritizedExecutorsTests.java
@@ -20,17 +20,17 @@ package org.elasticsearch.test.unit.common.util.concurrent;

 import org.elasticsearch.common.Priority;
+import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.util.concurrent.EsExecutors;
 import org.elasticsearch.common.util.concurrent.PrioritizedCallable;
+import org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor;
 import org.elasticsearch.common.util.concurrent.PrioritizedRunnable;
 import org.testng.annotations.Test;

 import java.util.ArrayList;
 import java.util.List;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.PriorityBlockingQueue;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicBoolean;

 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.equalTo;
@@ -61,7 +61,7 @@ public class PrioritizedExecutorsTests {
         ExecutorService executor = EsExecutors.newSinglePrioritizingThreadExecutor(Executors.defaultThreadFactory());
         List results = new ArrayList(7);
         CountDownLatch awaitingLatch = new CountDownLatch(1);
-        CountDownLatch finishedLatch = new CountDownLatch(7);
+        CountDownLatch finishedLatch = new CountDownLatch(7);
         executor.submit(new AwaitingJob(awaitingLatch));
         executor.submit(new Job(6, Priority.LANGUID, results, finishedLatch));
         executor.submit(new Job(4, Priority.LOW, results, finishedLatch));
@@ -88,7 +88,7 @@ public class PrioritizedExecutorsTests {
         ExecutorService executor = EsExecutors.newSinglePrioritizingThreadExecutor(Executors.defaultThreadFactory());
         List results = new ArrayList(7);
         CountDownLatch awaitingLatch = new CountDownLatch(1);
-        CountDownLatch finishedLatch = new CountDownLatch(7);
+        CountDownLatch finishedLatch = new CountDownLatch(7);
         executor.execute(new AwaitingJob(awaitingLatch));
         executor.execute(new Job(6, Priority.LANGUID, results, finishedLatch));
         executor.execute(new Job(4, Priority.LOW, results, finishedLatch));
@@ -115,7 +115,7 @@ public class PrioritizedExecutorsTests {
         ExecutorService executor = EsExecutors.newSinglePrioritizingThreadExecutor(Executors.defaultThreadFactory());
         List results = new ArrayList(7);
         CountDownLatch awaitingLatch = new CountDownLatch(1);
-        CountDownLatch finishedLatch = new CountDownLatch(7);
+        CountDownLatch finishedLatch = new CountDownLatch(7);
         executor.submit(new AwaitingJob(awaitingLatch));
         executor.submit(new CallableJob(6, Priority.LANGUID, results, finishedLatch));
         executor.submit(new CallableJob(4, Priority.LOW, results, finishedLatch));
@@ -142,7 +142,7 @@ public class PrioritizedExecutorsTests {
         ExecutorService executor = EsExecutors.newSinglePrioritizingThreadExecutor(Executors.defaultThreadFactory());
         List results = new ArrayList(7);
         CountDownLatch awaitingLatch = new CountDownLatch(1);
-        CountDownLatch finishedLatch = new CountDownLatch(7);
+        CountDownLatch finishedLatch = new CountDownLatch(7);
         executor.submit(new AwaitingJob(awaitingLatch));
         executor.submit(new CallableJob(6, Priority.LANGUID, results, finishedLatch));
         executor.submit(new Job(4, Priority.LOW, results, finishedLatch));
@@ -164,6 +164,59 @@ public class PrioritizedExecutorsTests {
         assertThat(results.get(6), equalTo(6));
     }

+    @Test
+    public void testTimeout() throws Exception {
+        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
+        PrioritizedEsThreadPoolExecutor executor = EsExecutors.newSinglePrioritizingThreadExecutor(Executors.defaultThreadFactory());
+        final CountDownLatch block = new CountDownLatch(1);
+        executor.execute(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    block.await();
+                } catch (InterruptedException e) {
+                    assert false;
+                }
+            }
+
+            @Override
+            public String toString() {
+                return "the blocking";
+            }
+        });
+
+        final AtomicBoolean executeCalled = new AtomicBoolean();
+        final CountDownLatch timedOut = new CountDownLatch(1);
+        executor.execute(new Runnable() {
+                    @Override
+                    public void run() {
+                        executeCalled.set(true);
+                    }
+
+                    @Override
+                    public String toString() {
+                        return "the waiting";
+                    }
+                }, timer, TimeValue.timeValueMillis(5), new Runnable() {
+                    @Override
+                    public void run() {
+                        timedOut.countDown();
+                    }
+                }
+        );
+
+        PrioritizedEsThreadPoolExecutor.Pending[] pending = executor.getPending();
+        assertThat(pending.length, equalTo(1));
+        assertThat(pending[0].task.toString(), equalTo("the waiting"));
+
+        assertThat(timedOut.await(500, TimeUnit.MILLISECONDS), equalTo(true));
+        block.countDown();
+        Thread.sleep(100); // sleep a bit to double check that execute on the timed out update task is not called...
+        assertThat(executeCalled.get(), equalTo(false));
+
+        timer.shutdownNow();
+        executor.shutdownNow();
+    }

     static class AwaitingJob extends PrioritizedRunnable {