mirror of https://github.com/apache/activemq.git
Applied patch from AMQ-1686 thanks Gary
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@651316 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
44e0eaf1f4
commit
d8be50b617
|
@ -127,24 +127,23 @@ class PooledTaskRunner implements TaskRunner {
|
|||
} finally {
|
||||
synchronized( runable ) {
|
||||
iterating = false;
|
||||
}
|
||||
}
|
||||
|
||||
synchronized (runable) {
|
||||
if (shutdown) {
|
||||
queued = false;
|
||||
runable.notifyAll();
|
||||
return;
|
||||
}
|
||||
if (shutdown) {
|
||||
queued = false;
|
||||
runable.notifyAll();
|
||||
return;
|
||||
}
|
||||
|
||||
// If we could not iterate all the items
|
||||
// then we need to re-queue.
|
||||
if (!done) {
|
||||
queued = true;
|
||||
}
|
||||
// If we could not iterate all the items
|
||||
// then we need to re-queue.
|
||||
if (!done) {
|
||||
queued = true;
|
||||
}
|
||||
|
||||
if (queued) {
|
||||
executor.execute(runable);
|
||||
}
|
||||
|
||||
if (queued) {
|
||||
executor.execute(runable);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -21,8 +21,12 @@ import java.util.concurrent.CountDownLatch;
|
|||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.SynchronousQueue;
|
||||
import java.util.concurrent.ThreadFactory;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import junit.framework.TestCase;
|
||||
|
||||
|
@ -60,6 +64,62 @@ public class PooledTaskRunnerTest extends TestCase {
|
|||
runner.shutdown();
|
||||
}
|
||||
|
||||
|
||||
public void testWakeupResultsInThreadSafeCalls() throws Exception {
|
||||
|
||||
ThreadPoolExecutor executor = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 10, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new ThreadFactory() {
|
||||
public Thread newThread(Runnable runnable) {
|
||||
Thread thread = new Thread(runnable, getName());
|
||||
thread.setDaemon(true);
|
||||
thread.setPriority(Thread.NORM_PRIORITY);
|
||||
return thread;
|
||||
}
|
||||
});
|
||||
final CountDownLatch doneLatch = new CountDownLatch( 100 );
|
||||
final AtomicInteger clashCount = new AtomicInteger();
|
||||
final AtomicInteger count = new AtomicInteger();
|
||||
|
||||
|
||||
final PooledTaskRunner runner = new PooledTaskRunner(executor, new Task() {
|
||||
String threadUnSafeVal = null;
|
||||
public boolean iterate() {
|
||||
if (threadUnSafeVal != null) {
|
||||
clashCount.incrementAndGet();
|
||||
}
|
||||
threadUnSafeVal = Thread.currentThread().getName();
|
||||
count.incrementAndGet();
|
||||
doneLatch.countDown();
|
||||
if (!threadUnSafeVal.equals(Thread.currentThread().getName())) {
|
||||
clashCount.incrementAndGet();
|
||||
}
|
||||
threadUnSafeVal = null;
|
||||
return false;
|
||||
}
|
||||
}, 1 );
|
||||
|
||||
Runnable doWakeup = new Runnable() {
|
||||
public void run() {
|
||||
try {
|
||||
runner.wakeup();
|
||||
} catch (InterruptedException ignored) {
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
final int iterations = 1000;
|
||||
for (int i=0; i< iterations; i++) {
|
||||
if (i%100 == 0) {
|
||||
Thread.sleep(10);
|
||||
}
|
||||
executor.execute(doWakeup);
|
||||
}
|
||||
|
||||
doneLatch.await(20, TimeUnit.SECONDS);
|
||||
assertEquals("thread safety clash", 0, clashCount.get());
|
||||
assertTrue("called more than once", count.get() > 1);
|
||||
runner.shutdown();
|
||||
}
|
||||
|
||||
public void testShutsDownAfterRunnerFailure() throws Exception {
|
||||
Future<Object> future = executor.submit( new Callable<Object>() {
|
||||
public Object call() throws Exception {
|
||||
|
|
Loading…
Reference in New Issue