diff --git a/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java b/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java index ab0688f034..bec56d9115 100755 --- a/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java +++ b/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java @@ -97,7 +97,7 @@ import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean; public class ActiveMQConnection implements Connection, TopicConnection, QueueConnection, StatsCapable, Closeable, StreamConnection, TransportListener { - public static final TaskRunnerFactory SESSION_TASK_RUNNER = new TaskRunnerFactory("ActiveMQ Session Task",ThreadPriorities.INBOUND_CLIENT_SESSION,true,1000); + private TaskRunnerFactory sessionTaskRunner = new TaskRunnerFactory("ActiveMQ Session Task",ThreadPriorities.INBOUND_CLIENT_SESSION,true,1000); private final ThreadPoolExecutor asyncConnectionThread; private static final Log log = LogFactory.getLog(ActiveMQConnection.class); @@ -572,9 +572,10 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon started.set(false); - // TODO : ActiveMQConnectionFactory.onConnectionClose() not - // yet implemented. + // TODO if we move the TaskRunnerFactory to the connection factory + // then we may need to call // factory.onConnectionClose(this); + sessionTaskRunner.shutdown(); closed.set(true); closing.set(false); @@ -857,6 +858,15 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon transportListeners.remove(transportListener); } + public TaskRunnerFactory getSessionTaskRunner() { + return sessionTaskRunner; + } + + public void setSessionTaskRunner(TaskRunnerFactory sessionTaskRunner) { + this.sessionTaskRunner = sessionTaskRunner; + } + + // Implementation methods // ------------------------------------------------------------------------- diff --git a/activemq-core/src/main/java/org/apache/activemq/ActiveMQSessionExecutor.java b/activemq-core/src/main/java/org/apache/activemq/ActiveMQSessionExecutor.java index 856bb13396..02aac882b6 100755 --- a/activemq-core/src/main/java/org/apache/activemq/ActiveMQSessionExecutor.java +++ b/activemq-core/src/main/java/org/apache/activemq/ActiveMQSessionExecutor.java @@ -102,7 +102,7 @@ public class ActiveMQSessionExecutor implements Task { if( !messageQueue.isRunning() ) { messageQueue.start(); if( session.isSessionAsyncDispatch() || dispatchedBySessionPool ) { - taskRunner = ActiveMQConnection.SESSION_TASK_RUNNER.createTaskRunner(this, "ActiveMQ Session: "+session.getSessionId()); + taskRunner = session.connection.getSessionTaskRunner().createTaskRunner(this, "ActiveMQ Session: "+session.getSessionId()); } wakeup(); } diff --git a/activemq-core/src/main/java/org/apache/activemq/thread/TaskRunnerFactory.java b/activemq-core/src/main/java/org/apache/activemq/thread/TaskRunnerFactory.java index 7c3bda61fe..94dc772de3 100755 --- a/activemq-core/src/main/java/org/apache/activemq/thread/TaskRunnerFactory.java +++ b/activemq-core/src/main/java/org/apache/activemq/thread/TaskRunnerFactory.java @@ -18,6 +18,7 @@ package org.apache.activemq.thread; import edu.emory.mathcs.backport.java.util.concurrent.Executor; +import edu.emory.mathcs.backport.java.util.concurrent.ExecutorService; import edu.emory.mathcs.backport.java.util.concurrent.SynchronousQueue; import edu.emory.mathcs.backport.java.util.concurrent.ThreadFactory; import edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor; @@ -37,7 +38,7 @@ import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit; */ public class TaskRunnerFactory { - private Executor executor; + private ExecutorService executor; private int maxIterationsPerRun; private String name; private int priority; @@ -61,9 +62,14 @@ public class TaskRunnerFactory { } else { executor = createDefaultExecutor(); } - } + public void shutdown() { + if (executor != null) { + executor.shutdownNow(); + } + } + public TaskRunner createTaskRunner(Task task, String name) { if( executor!=null ) { return new PooledTaskRunner(executor, task, maxIterationsPerRun); @@ -72,8 +78,7 @@ public class TaskRunnerFactory { } } - protected Executor createDefaultExecutor() { - + protected ExecutorService createDefaultExecutor() { ThreadPoolExecutor rc = new ThreadPoolExecutor(1, Integer.MAX_VALUE, 10, TimeUnit.SECONDS, new SynchronousQueue(), new ThreadFactory() { public Thread newThread(Runnable runnable) { Thread thread = new Thread(runnable, name); @@ -84,7 +89,6 @@ public class TaskRunnerFactory { }); rc.allowCoreThreadTimeOut(true); return rc; - } } diff --git a/activemq-core/src/test/java/org/apache/activemq/usecases/StartAndStopClientAndBrokerDoesNotLeaveThreadsRunningTest.java b/activemq-core/src/test/java/org/apache/activemq/usecases/StartAndStopClientAndBrokerDoesNotLeaveThreadsRunningTest.java new file mode 100644 index 0000000000..bcf6ee3155 --- /dev/null +++ b/activemq-core/src/test/java/org/apache/activemq/usecases/StartAndStopClientAndBrokerDoesNotLeaveThreadsRunningTest.java @@ -0,0 +1,92 @@ +/* + * Copyright 2005-2006 The Apache Software Foundation. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.usecases; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.spring.ConsumerBean; + +import javax.jms.Connection; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.TextMessage; + +import junit.framework.TestCase; + +/** + * + * @version $Revision: $ + */ +public class StartAndStopClientAndBrokerDoesNotLeaveThreadsRunningTest extends TestCase { + + public static interface Task { + public void execute() throws Exception; + } + + public void setUp() throws Exception { + } + + public void testStartAndStopClientAndBrokerAndCheckNoThreadsAreLeft() throws Exception { + runTest(new Task() { + + public void execute() throws Exception { + BrokerService broker = new BrokerService(); + broker.setPersistent(false); + broker.start(); + + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost"); + Connection connection = factory.createConnection(); + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue destination = session.createQueue(getName()); + + // consumer + MessageConsumer consumer = session.createConsumer(destination); + ConsumerBean listener = new ConsumerBean(); + consumer.setMessageListener(listener); + + // producer + MessageProducer producer = session.createProducer(destination); + TextMessage message = session.createTextMessage("Hello World!"); + producer.send(message); + producer.close(); + + listener.assertMessagesArrived(1); + + consumer.close(); + session.close(); + connection.close(); + + broker.stop(); + } + }); + } + + public void runTest(Task task) throws Exception { + int numThreads = Thread.currentThread().getThreadGroup().activeCount(); + Thread.currentThread().getThreadGroup().list(); + + task.execute(); + + Thread.yield(); + Thread.sleep(2000); // Wait for the threads to exit on their own + + Thread.currentThread().getThreadGroup().list(); + assertEquals(numThreads, Thread.currentThread().getThreadGroup().activeCount()); + } +}