diff --git a/activemq-core/src/main/java/org/apache/activemq/thread/PooledTaskRunner.java b/activemq-core/src/main/java/org/apache/activemq/thread/PooledTaskRunner.java index 1f22e56237..71e8b3a994 100644 --- a/activemq-core/src/main/java/org/apache/activemq/thread/PooledTaskRunner.java +++ b/activemq-core/src/main/java/org/apache/activemq/thread/PooledTaskRunner.java @@ -30,7 +30,7 @@ class PooledTaskRunner implements TaskRunner { private boolean queued; private boolean shutdown; private boolean iterating; - private Thread runningThread; + private volatile Thread runningThread; public PooledTaskRunner(Executor executor, Task task, int maxIterationsPerRun) { this.executor = executor; @@ -39,8 +39,11 @@ class PooledTaskRunner implements TaskRunner { runable = new Runnable() { public void run() { runningThread = Thread.currentThread(); - runTask(); - runningThread = null; + try { + runTask(); + } finally { + runningThread = null; + } } }; } @@ -77,7 +80,7 @@ class PooledTaskRunner implements TaskRunner { /** * shut down the task - * + * * @throws InterruptedException */ public void shutdown(long timeout) throws InterruptedException { @@ -114,15 +117,20 @@ class PooledTaskRunner implements TaskRunner { // Don't synchronize while we are iterating so that // multiple wakeup() calls can be executed concurrently. boolean done = false; - for (int i = 0; i < maxIterationsPerRun; i++) { - if (!task.iterate()) { - done = true; - break; + try { + for (int i = 0; i < maxIterationsPerRun; i++) { + if (!task.iterate()) { + done = true; + break; + } + } + } finally { + synchronized( runable ) { + iterating = false; } } synchronized (runable) { - iterating = false; if (shutdown) { queued = false; runable.notifyAll(); diff --git a/activemq-core/src/test/java/org/apache/activemq/thread/PooledTaskRunnerTest.java b/activemq-core/src/test/java/org/apache/activemq/thread/PooledTaskRunnerTest.java new file mode 100644 index 0000000000..3a9dfb04b3 --- /dev/null +++ b/activemq-core/src/test/java/org/apache/activemq/thread/PooledTaskRunnerTest.java @@ -0,0 +1,92 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.thread; + +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import junit.framework.TestCase; + +public class PooledTaskRunnerTest extends TestCase { + private ExecutorService executor; + + @Override + protected void setUp() throws Exception { + super.setUp(); + executor = Executors.newCachedThreadPool(); + } + + @Override + protected void tearDown() throws Exception { + executor.shutdownNow(); + + super.tearDown(); + } + + public void testNormalBehavior() throws Exception { + final CountDownLatch latch = new CountDownLatch( 1 ); + + PooledTaskRunner runner = new PooledTaskRunner( executor, new Task() { + public boolean iterate() { + latch.countDown(); + + return false; + } + }, 1 ); + + runner.wakeup(); + + assertTrue( latch.await( 1, TimeUnit.SECONDS ) ); + + runner.shutdown(); + } + + public void testShutsDownAfterRunnerFailure() throws Exception { + Future future = executor.submit( new Callable() { + public Object call() throws Exception { + final CountDownLatch latch = new CountDownLatch( 1 ); + + PooledTaskRunner runner = new PooledTaskRunner( executor, new Task() { + public boolean iterate() { + latch.countDown(); + + throw new RuntimeException(); + } + }, 1 ); + + runner.wakeup(); + + assertTrue( latch.await( 1, TimeUnit.SECONDS ) ); + + runner.shutdown(); + + return null; + } + } ); + + try { + future.get( 5, TimeUnit.SECONDS ); + } catch( TimeoutException e ) { + fail( "TaskRunner did not shut down cleanly" ); + } + } +} \ No newline at end of file