Added a modified version of the patch donated to (LINGO-22) submitted by Jim Beattie which tests that we leave no threads around after shutting down the JMS client and broker, together with making the Session Executor / TaskRunnerFactory part of the connection. We could still make it a singleton if required and just use reference counting to ensure its shutdown properly after all connections are closed

git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@428339 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
James Strachan 2006-08-03 11:16:18 +00:00
parent b62e5cd91e
commit 6cf2169533
4 changed files with 115 additions and 9 deletions

View File

@ -97,7 +97,7 @@ import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
public class ActiveMQConnection implements Connection, TopicConnection, QueueConnection, StatsCapable, Closeable, StreamConnection, TransportListener {
public static final TaskRunnerFactory SESSION_TASK_RUNNER = new TaskRunnerFactory("ActiveMQ Session Task",ThreadPriorities.INBOUND_CLIENT_SESSION,true,1000);
private TaskRunnerFactory sessionTaskRunner = new TaskRunnerFactory("ActiveMQ Session Task",ThreadPriorities.INBOUND_CLIENT_SESSION,true,1000);
private final ThreadPoolExecutor asyncConnectionThread;
private static final Log log = LogFactory.getLog(ActiveMQConnection.class);
@ -572,9 +572,10 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
started.set(false);
// TODO : ActiveMQConnectionFactory.onConnectionClose() not
// yet implemented.
// TODO if we move the TaskRunnerFactory to the connection factory
// then we may need to call
// factory.onConnectionClose(this);
sessionTaskRunner.shutdown();
closed.set(true);
closing.set(false);
@ -857,6 +858,15 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
transportListeners.remove(transportListener);
}
public TaskRunnerFactory getSessionTaskRunner() {
return sessionTaskRunner;
}
public void setSessionTaskRunner(TaskRunnerFactory sessionTaskRunner) {
this.sessionTaskRunner = sessionTaskRunner;
}
// Implementation methods
// -------------------------------------------------------------------------

View File

@ -102,7 +102,7 @@ public class ActiveMQSessionExecutor implements Task {
if( !messageQueue.isRunning() ) {
messageQueue.start();
if( session.isSessionAsyncDispatch() || dispatchedBySessionPool ) {
taskRunner = ActiveMQConnection.SESSION_TASK_RUNNER.createTaskRunner(this, "ActiveMQ Session: "+session.getSessionId());
taskRunner = session.connection.getSessionTaskRunner().createTaskRunner(this, "ActiveMQ Session: "+session.getSessionId());
}
wakeup();
}

View File

@ -18,6 +18,7 @@
package org.apache.activemq.thread;
import edu.emory.mathcs.backport.java.util.concurrent.Executor;
import edu.emory.mathcs.backport.java.util.concurrent.ExecutorService;
import edu.emory.mathcs.backport.java.util.concurrent.SynchronousQueue;
import edu.emory.mathcs.backport.java.util.concurrent.ThreadFactory;
import edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor;
@ -37,7 +38,7 @@ import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
*/
public class TaskRunnerFactory {
private Executor executor;
private ExecutorService executor;
private int maxIterationsPerRun;
private String name;
private int priority;
@ -61,7 +62,12 @@ public class TaskRunnerFactory {
} else {
executor = createDefaultExecutor();
}
}
public void shutdown() {
if (executor != null) {
executor.shutdownNow();
}
}
public TaskRunner createTaskRunner(Task task, String name) {
@ -72,8 +78,7 @@ public class TaskRunnerFactory {
}
}
protected Executor createDefaultExecutor() {
protected ExecutorService createDefaultExecutor() {
ThreadPoolExecutor rc = new ThreadPoolExecutor(1, Integer.MAX_VALUE, 10, TimeUnit.SECONDS, new SynchronousQueue(), new ThreadFactory() {
public Thread newThread(Runnable runnable) {
Thread thread = new Thread(runnable, name);
@ -84,7 +89,6 @@ public class TaskRunnerFactory {
});
rc.allowCoreThreadTimeOut(true);
return rc;
}
}

View File

@ -0,0 +1,92 @@
/*
* Copyright 2005-2006 The Apache Software Foundation.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.usecases;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.spring.ConsumerBean;
import javax.jms.Connection;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import junit.framework.TestCase;
/**
*
* @version $Revision: $
*/
public class StartAndStopClientAndBrokerDoesNotLeaveThreadsRunningTest extends TestCase {
public static interface Task {
public void execute() throws Exception;
}
public void setUp() throws Exception {
}
public void testStartAndStopClientAndBrokerAndCheckNoThreadsAreLeft() throws Exception {
runTest(new Task() {
public void execute() throws Exception {
BrokerService broker = new BrokerService();
broker.setPersistent(false);
broker.start();
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");
Connection connection = factory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue destination = session.createQueue(getName());
// consumer
MessageConsumer consumer = session.createConsumer(destination);
ConsumerBean listener = new ConsumerBean();
consumer.setMessageListener(listener);
// producer
MessageProducer producer = session.createProducer(destination);
TextMessage message = session.createTextMessage("Hello World!");
producer.send(message);
producer.close();
listener.assertMessagesArrived(1);
consumer.close();
session.close();
connection.close();
broker.stop();
}
});
}
public void runTest(Task task) throws Exception {
int numThreads = Thread.currentThread().getThreadGroup().activeCount();
Thread.currentThread().getThreadGroup().list();
task.execute();
Thread.yield();
Thread.sleep(2000); // Wait for the threads to exit on their own
Thread.currentThread().getThreadGroup().list();
assertEquals(numThreads, Thread.currentThread().getThreadGroup().activeCount());
}
}