add ability to associate a timeout with a priority executor

enhancement that can be used later to timeout tasks that are pending, also added the ability to get the pending task list from the executor
This commit is contained in:
Shay Banon 2013-07-22 09:16:47 +02:00
parent e8ff7de6b8
commit e2961c0c7a
6 changed files with 387 additions and 165 deletions

View File

@ -0,0 +1,41 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster;
import org.elasticsearch.common.unit.TimeValue;
/**
* An extension interface to {@link org.elasticsearch.cluster.ClusterStateUpdateTask} that allows to associate
* a timeout.
*/
public interface TimeoutClusterStateUpdateTask extends ClusterStateUpdateTask {
/**
* If the cluster state update task wasn't processed by the provided timeout, call
* {@link #onTimeout(String)}.
*/
TimeValue timeout();
/**
* Called when the cluster sate update task wasn't processed by the provided
* {@link #timeout()}.
*/
void onTimeout(String source);
}

View File

@ -36,6 +36,7 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor;
import org.elasticsearch.common.util.concurrent.PrioritizedRunnable; import org.elasticsearch.common.util.concurrent.PrioritizedRunnable;
import org.elasticsearch.discovery.Discovery; import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.discovery.DiscoveryService; import org.elasticsearch.discovery.DiscoveryService;
@ -69,7 +70,7 @@ public class InternalClusterService extends AbstractLifecycleComponent<ClusterSe
private final TimeValue reconnectInterval; private final TimeValue reconnectInterval;
private volatile ExecutorService updateTasksExecutor; private volatile PrioritizedEsThreadPoolExecutor updateTasksExecutor;
private final List<ClusterStateListener> priorityClusterStateListeners = new CopyOnWriteArrayList<ClusterStateListener>(); private final List<ClusterStateListener> priorityClusterStateListeners = new CopyOnWriteArrayList<ClusterStateListener>();
private final List<ClusterStateListener> clusterStateListeners = new CopyOnWriteArrayList<ClusterStateListener>(); private final List<ClusterStateListener> clusterStateListeners = new CopyOnWriteArrayList<ClusterStateListener>();
@ -195,7 +196,7 @@ public class InternalClusterService extends AbstractLifecycleComponent<ClusterSe
return; return;
} }
// call the post added notification on the same event thread // call the post added notification on the same event thread
updateTasksExecutor.execute(new Runnable() { updateTasksExecutor.execute(new PrioritizedRunnable(Priority.HIGH) {
@Override @Override
public void run() { public void run() {
NotifyTimeout notifyTimeout = new NotifyTimeout(listener, timeout); NotifyTimeout notifyTimeout = new NotifyTimeout(listener, timeout);
@ -215,7 +216,36 @@ public class InternalClusterService extends AbstractLifecycleComponent<ClusterSe
if (!lifecycle.started()) { if (!lifecycle.started()) {
return; return;
} }
updateTasksExecutor.execute(new PrioritizedRunnable(priority) { final UpdateTask task = new UpdateTask(source, priority, updateTask);
if (updateTask instanceof TimeoutClusterStateUpdateTask) {
final TimeoutClusterStateUpdateTask timeoutUpdateTask = (TimeoutClusterStateUpdateTask) updateTask;
updateTasksExecutor.execute(task, threadPool.scheduler(), timeoutUpdateTask.timeout(), new Runnable() {
@Override
public void run() {
threadPool.generic().execute(new Runnable() {
@Override
public void run() {
timeoutUpdateTask.onTimeout(task.source);
}
});
}
});
} else {
updateTasksExecutor.execute(task);
}
}
class UpdateTask extends PrioritizedRunnable {
public final String source;
public final ClusterStateUpdateTask updateTask;
UpdateTask(String source, Priority priority, ClusterStateUpdateTask updateTask) {
super(priority);
this.source = source;
this.updateTask = updateTask;
}
@Override @Override
public void run() { public void run() {
if (!lifecycle.started()) { if (!lifecycle.started()) {
@ -349,7 +379,6 @@ public class InternalClusterService extends AbstractLifecycleComponent<ClusterSe
logger.warn(sb.toString(), e); logger.warn(sb.toString(), e);
} }
} }
});
} }
class NotifyTimeout implements Runnable { class NotifyTimeout implements Runnable {

View File

@ -31,7 +31,7 @@ import java.util.concurrent.atomic.AtomicInteger;
*/ */
public class EsExecutors { public class EsExecutors {
public static EsThreadPoolExecutor newSinglePrioritizingThreadExecutor(ThreadFactory threadFactory) { public static PrioritizedEsThreadPoolExecutor newSinglePrioritizingThreadExecutor(ThreadFactory threadFactory) {
return new PrioritizedEsThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, threadFactory); return new PrioritizedEsThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, threadFactory);
} }

View File

@ -20,6 +20,7 @@
package org.elasticsearch.common.util.concurrent; package org.elasticsearch.common.util.concurrent;
import org.elasticsearch.common.Priority; import org.elasticsearch.common.Priority;
import org.elasticsearch.common.unit.TimeValue;
import java.util.concurrent.*; import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
@ -28,12 +29,12 @@ import java.util.concurrent.atomic.AtomicLong;
* A prioritizing executor which uses a priority queue as a work queue. The jobs that will be submitted will be treated * A prioritizing executor which uses a priority queue as a work queue. The jobs that will be submitted will be treated
* as {@link PrioritizedRunnable} and/or {@link PrioritizedCallable}, those tasks that are not instances of these two will * as {@link PrioritizedRunnable} and/or {@link PrioritizedCallable}, those tasks that are not instances of these two will
* be wrapped and assign a default {@link Priority#NORMAL} priority. * be wrapped and assign a default {@link Priority#NORMAL} priority.
* * <p/>
* Note, if two tasks have the same priority, the first to arrive will be executed first (FIFO style). * Note, if two tasks have the same priority, the first to arrive will be executed first (FIFO style).
*/ */
public class PrioritizedEsThreadPoolExecutor extends EsThreadPoolExecutor { public class PrioritizedEsThreadPoolExecutor extends EsThreadPoolExecutor {
private AtomicLong tieBreaker = new AtomicLong(Long.MIN_VALUE); private AtomicLong insertionOrder = new AtomicLong();
public PrioritizedEsThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory) { public PrioritizedEsThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, new PriorityBlockingQueue<Runnable>(), threadFactory); super(corePoolSize, maximumPoolSize, keepAliveTime, unit, new PriorityBlockingQueue<Runnable>(), threadFactory);
@ -47,14 +48,49 @@ public class PrioritizedEsThreadPoolExecutor extends EsThreadPoolExecutor {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, new PriorityBlockingQueue<Runnable>(initialWorkQueuSize), threadFactory, handler); super(corePoolSize, maximumPoolSize, keepAliveTime, unit, new PriorityBlockingQueue<Runnable>(initialWorkQueuSize), threadFactory, handler);
} }
public Pending[] getPending() {
Object[] objects = getQueue().toArray();
Pending[] infos = new Pending[objects.length];
for (int i = 0; i < objects.length; i++) {
Object obj = objects[i];
if (obj instanceof TieBreakingPrioritizedRunnable) {
TieBreakingPrioritizedRunnable t = (TieBreakingPrioritizedRunnable) obj;
infos[i] = new Pending(t.runnable, t.priority(), t.insertionOrder);
} else if (obj instanceof PrioritizedFutureTask) {
PrioritizedFutureTask t = (PrioritizedFutureTask) obj;
infos[i] = new Pending(t.task, t.priority, t.insertionOrder);
}
}
return infos;
}
public void execute(Runnable command, final ScheduledExecutorService timer, final TimeValue timeout, final Runnable timeoutCallback) {
if (command instanceof PrioritizedRunnable) {
command = new TieBreakingPrioritizedRunnable((PrioritizedRunnable) command, insertionOrder.incrementAndGet());
} else if (!(command instanceof PrioritizedFutureTask)) { // it might be a callable wrapper...
command = new TieBreakingPrioritizedRunnable(command, Priority.NORMAL, insertionOrder.incrementAndGet());
}
if (timeout.nanos() >= 0) {
final Runnable fCommand = command;
timer.schedule(new Runnable() {
@Override
public void run() {
boolean removed = getQueue().remove(fCommand);
if (removed) {
timeoutCallback.run();
}
}
}, timeout.nanos(), TimeUnit.NANOSECONDS);
}
super.execute(command);
}
@Override @Override
public void execute(Runnable command) { public void execute(Runnable command) {
if (command instanceof PrioritizedRunnable) { if (command instanceof PrioritizedRunnable) {
super.execute(new TieBreakingPrioritizedRunnable((PrioritizedRunnable) command, tieBreaker.incrementAndGet())); command = new TieBreakingPrioritizedRunnable((PrioritizedRunnable) command, insertionOrder.incrementAndGet());
return; } else if (!(command instanceof PrioritizedFutureTask)) { // it might be a callable wrapper...
} command = new TieBreakingPrioritizedRunnable(command, Priority.NORMAL, insertionOrder.incrementAndGet());
if (!(command instanceof Comparable)) {
command = new TieBreakingPrioritizedRunnable(command, Priority.NORMAL, tieBreaker.incrementAndGet());
} }
super.execute(command); super.execute(command);
} }
@ -64,7 +100,7 @@ public class PrioritizedEsThreadPoolExecutor extends EsThreadPoolExecutor {
if (!(runnable instanceof PrioritizedRunnable)) { if (!(runnable instanceof PrioritizedRunnable)) {
runnable = PrioritizedRunnable.wrap(runnable, Priority.NORMAL); runnable = PrioritizedRunnable.wrap(runnable, Priority.NORMAL);
} }
return new PrioritizedFutureTask<T>((PrioritizedRunnable) runnable, value, tieBreaker.incrementAndGet()); return new PrioritizedFutureTask<T>((PrioritizedRunnable) runnable, value, insertionOrder.incrementAndGet());
} }
@Override @Override
@ -72,22 +108,34 @@ public class PrioritizedEsThreadPoolExecutor extends EsThreadPoolExecutor {
if (!(callable instanceof PrioritizedCallable)) { if (!(callable instanceof PrioritizedCallable)) {
callable = PrioritizedCallable.wrap(callable, Priority.NORMAL); callable = PrioritizedCallable.wrap(callable, Priority.NORMAL);
} }
return new PrioritizedFutureTask<T>((PrioritizedCallable<T>) callable, tieBreaker.incrementAndGet()); return new PrioritizedFutureTask<T>((PrioritizedCallable<T>) callable, insertionOrder.incrementAndGet());
}
public static class Pending {
public final Object task;
public final Priority priority;
public final long insertionOrder;
public Pending(Object task, Priority priority, long insertionOrder) {
this.task = task;
this.priority = priority;
this.insertionOrder = insertionOrder;
}
} }
static class TieBreakingPrioritizedRunnable extends PrioritizedRunnable { static class TieBreakingPrioritizedRunnable extends PrioritizedRunnable {
private final Runnable runnable; final Runnable runnable;
private final long tieBreaker; final long insertionOrder;
TieBreakingPrioritizedRunnable(PrioritizedRunnable runnable, long tieBreaker) { TieBreakingPrioritizedRunnable(PrioritizedRunnable runnable, long insertionOrder) {
this(runnable, runnable.priority(), tieBreaker); this(runnable, runnable.priority(), insertionOrder);
} }
TieBreakingPrioritizedRunnable(Runnable runnable, Priority priority, long tieBreaker) { TieBreakingPrioritizedRunnable(Runnable runnable, Priority priority, long insertionOrder) {
super(priority); super(priority);
this.runnable = runnable; this.runnable = runnable;
this.tieBreaker = tieBreaker; this.insertionOrder = insertionOrder;
} }
@Override @Override
@ -101,28 +149,28 @@ public class PrioritizedEsThreadPoolExecutor extends EsThreadPoolExecutor {
if (res != 0 || !(pr instanceof TieBreakingPrioritizedRunnable)) { if (res != 0 || !(pr instanceof TieBreakingPrioritizedRunnable)) {
return res; return res;
} }
return tieBreaker < ((TieBreakingPrioritizedRunnable)pr).tieBreaker ? -1 : 1; return insertionOrder < ((TieBreakingPrioritizedRunnable) pr).insertionOrder ? -1 : 1;
} }
} }
/**
*
*/
static class PrioritizedFutureTask<T> extends FutureTask<T> implements Comparable<PrioritizedFutureTask> { static class PrioritizedFutureTask<T> extends FutureTask<T> implements Comparable<PrioritizedFutureTask> {
private final Priority priority; final Object task;
private final long tieBreaker; final Priority priority;
final long insertionOrder;
public PrioritizedFutureTask(PrioritizedRunnable runnable, T value, long tieBreaker) { public PrioritizedFutureTask(PrioritizedRunnable runnable, T value, long insertionOrder) {
super(runnable, value); super(runnable, value);
this.task = runnable;
this.priority = runnable.priority(); this.priority = runnable.priority();
this.tieBreaker = tieBreaker; this.insertionOrder = insertionOrder;
} }
public PrioritizedFutureTask(PrioritizedCallable<T> callable, long tieBreaker) { public PrioritizedFutureTask(PrioritizedCallable<T> callable, long insertionOrder) {
super(callable); super(callable);
this.task = callable;
this.priority = callable.priority(); this.priority = callable.priority();
this.tieBreaker = tieBreaker; this.insertionOrder = insertionOrder;
} }
@Override @Override
@ -131,7 +179,7 @@ public class PrioritizedEsThreadPoolExecutor extends EsThreadPoolExecutor {
if (res != 0) { if (res != 0) {
return res; return res;
} }
return tieBreaker < pft.tieBreaker ? -1 : 1; return insertionOrder < pft.insertionOrder ? -1 : 1;
} }
} }
} }

View File

@ -21,14 +21,14 @@ package org.elasticsearch.test.integration.cluster;
import org.elasticsearch.ElasticSearchException; import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.*;
import org.elasticsearch.cluster.LocalNodeMasterListener;
import org.elasticsearch.common.Priority; import org.elasticsearch.common.Priority;
import org.elasticsearch.common.component.AbstractLifecycleComponent; import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.component.LifecycleComponent; import org.elasticsearch.common.component.LifecycleComponent;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.inject.Singleton; import org.elasticsearch.common.inject.Singleton;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.node.internal.InternalNode; import org.elasticsearch.node.internal.InternalNode;
import org.elasticsearch.plugins.AbstractPlugin; import org.elasticsearch.plugins.AbstractPlugin;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
@ -38,6 +38,9 @@ import org.testng.annotations.Test;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.List; import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder; import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.MatcherAssert.assertThat;
@ -46,7 +49,7 @@ import static org.hamcrest.Matchers.*;
/** /**
* *
*/ */
public class LocalNodeMasterListenerTests extends AbstractZenNodesTests { public class ClusterServiceTests extends AbstractZenNodesTests {
@AfterMethod @AfterMethod
public void closeNodes() { public void closeNodes() {
@ -54,8 +57,56 @@ public class LocalNodeMasterListenerTests extends AbstractZenNodesTests {
} }
@Test @Test
public void testListenerCallbacks() throws Exception { public void testTimeoutUpdateTask() throws Exception {
Settings settings = settingsBuilder()
.put("discovery.zen.minimum_master_nodes", 1)
.put("discovery.zen.ping_timeout", "200ms")
.put("discovery.initial_state_timeout", "500ms")
.build();
InternalNode node1 = (InternalNode) startNode("node1", settings);
ClusterService clusterService1 = node1.injector().getInstance(ClusterService.class);
final CountDownLatch block = new CountDownLatch(1);
clusterService1.submitStateUpdateTask("test1", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
try {
block.await();
} catch (InterruptedException e) {
assert false;
}
return currentState;
}
});
final CountDownLatch timedOut = new CountDownLatch(1);
final AtomicBoolean executeCalled = new AtomicBoolean();
clusterService1.submitStateUpdateTask("test2", new TimeoutClusterStateUpdateTask() {
@Override
public TimeValue timeout() {
return TimeValue.timeValueMillis(2);
}
@Override
public void onTimeout(String source) {
timedOut.countDown();
}
@Override
public ClusterState execute(ClusterState currentState) {
executeCalled.set(true);
return currentState;
}
});
assertThat(timedOut.await(500, TimeUnit.MILLISECONDS), equalTo(true));
block.countDown();
Thread.sleep(100); // sleep a bit to double check that execute on the timed out update task is not called...
assertThat(executeCalled.get(), equalTo(false));
}
@Test
public void testListenerCallbacks() throws Exception {
Settings settings = settingsBuilder() Settings settings = settingsBuilder()
.put("discovery.zen.minimum_master_nodes", 1) .put("discovery.zen.minimum_master_nodes", 1)
.put("discovery.zen.ping_timeout", "200ms") .put("discovery.zen.ping_timeout", "200ms")

View File

@ -20,17 +20,17 @@
package org.elasticsearch.test.unit.common.util.concurrent; package org.elasticsearch.test.unit.common.util.concurrent;
import org.elasticsearch.common.Priority; import org.elasticsearch.common.Priority;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.common.util.concurrent.PrioritizedCallable; import org.elasticsearch.common.util.concurrent.PrioritizedCallable;
import org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor;
import org.elasticsearch.common.util.concurrent.PrioritizedRunnable; import org.elasticsearch.common.util.concurrent.PrioritizedRunnable;
import org.testng.annotations.Test; import org.testng.annotations.Test;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.*;
import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.Executors;
import java.util.concurrent.PriorityBlockingQueue;
import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.equalTo;
@ -164,6 +164,59 @@ public class PrioritizedExecutorsTests {
assertThat(results.get(6), equalTo(6)); assertThat(results.get(6), equalTo(6));
} }
@Test
public void testTimeout() throws Exception {
ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
PrioritizedEsThreadPoolExecutor executor = EsExecutors.newSinglePrioritizingThreadExecutor(Executors.defaultThreadFactory());
final CountDownLatch block = new CountDownLatch(1);
executor.execute(new Runnable() {
@Override
public void run() {
try {
block.await();
} catch (InterruptedException e) {
assert false;
}
}
@Override
public String toString() {
return "the blocking";
}
});
final AtomicBoolean executeCalled = new AtomicBoolean();
final CountDownLatch timedOut = new CountDownLatch(1);
executor.execute(new Runnable() {
@Override
public void run() {
executeCalled.set(true);
}
@Override
public String toString() {
return "the waiting";
}
}, timer, TimeValue.timeValueMillis(5), new Runnable() {
@Override
public void run() {
timedOut.countDown();
}
}
);
PrioritizedEsThreadPoolExecutor.Pending[] pending = executor.getPending();
assertThat(pending.length, equalTo(1));
assertThat(pending[0].task.toString(), equalTo("the waiting"));
assertThat(timedOut.await(500, TimeUnit.MILLISECONDS), equalTo(true));
block.countDown();
Thread.sleep(100); // sleep a bit to double check that execute on the timed out update task is not called...
assertThat(executeCalled.get(), equalTo(false));
timer.shutdownNow();
executor.shutdownNow();
}
static class AwaitingJob extends PrioritizedRunnable { static class AwaitingJob extends PrioritizedRunnable {