diff --git a/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQResourceAdapter.java b/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQResourceAdapter.java index fd16603865..b17674e88a 100644 --- a/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQResourceAdapter.java +++ b/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQResourceAdapter.java @@ -355,8 +355,18 @@ public class ActiveMQResourceAdapter extends ActiveMQConnectionSupport implement } private ActiveMQConnection newConnection() throws JMSException { - ActiveMQConnection connection = makeConnection(); - connection.start(); + ActiveMQConnection connection = null; + try { + connection = makeConnection(); + connection.start(); + } catch (JMSException ex) { + if (connection != null) { + try { + connection.close(); + } catch (JMSException ignore) { } + } + throw ex; + } return connection; } diff --git a/activemq-ra/src/test/java/org/apache/activemq/ra/ActiveMQConnectionExecutorThreadCleanUpTest.java b/activemq-ra/src/test/java/org/apache/activemq/ra/ActiveMQConnectionExecutorThreadCleanUpTest.java new file mode 100644 index 0000000000..bec57d35ff --- /dev/null +++ b/activemq-ra/src/test/java/org/apache/activemq/ra/ActiveMQConnectionExecutorThreadCleanUpTest.java @@ -0,0 +1,122 @@ +package org.apache.activemq.ra; + +import java.util.Set; +import java.util.Iterator; +import javax.transaction.xa.XAResource; + +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.ra.ActiveMQResourceAdapter; + +import org.junit.Assert; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Test for AMQ-6700. + * Will fail to connect to embedded broker using JCA and uses + * "ActiveMQ Connection Executor" thread to deal with low + * level exception. This tests verifies if this thread gets + * cleared up correctly after use. + */ +public class ActiveMQConnectionExecutorThreadCleanUpTest { + + protected static Logger LOG = + LoggerFactory.getLogger(ActiveMQConnectionExecutorThreadCleanUpTest.class); + protected static final String AMQ_CONN_EXECUTOR_THREAD_NAME = + "ActiveMQ Connection Executor"; + private BrokerService broker = null; + + + @Before + public void setUp() throws Exception { + LOG.info("Configuring broker programmatically."); + broker = new BrokerService(); + broker.setPersistent(false); + + // explicitly limiting to 0 connections so that test is unable + // to connect + broker.addConnector("tcp://localhost:0?maximumConnections=0"); + broker.start(); + broker.waitUntilStarted(5000); + } + + + @After + public void shutDown() throws Exception { + if (broker != null) { + if (broker.isStarted()) { + broker.stop(); + broker.waitUntilStopped(); + } + } + } + + + /** + * This test tries to create connections into the broker using the + * resource adapter's transaction recovery functionality. + * If the broker does not accept the connection, the connection's + * thread pool executor is used to deal with the error. + * This has lead to race conditions where the thread was not shutdown + * but got leaked. + * @throws Exception + */ + @Test + public void testAMQConnectionExecutorThreadCleanUp() throws Exception { + LOG.info("testAMQConnectionExecutorThreadCleanUp() started."); + + ActiveMQResourceAdapter ra = new ActiveMQResourceAdapter(); + ra.setServerUrl(broker.getTransportConnectorByScheme("tcp").getPublishableConnectString()); + LOG.info("Using brokerUrl " + ra.getServerUrl()); + + // running in a small loop as very occasionally the call to + // ActiveMQResourceAdapter.$2.makeConnection() raises an exception + // rather than using the connection's executor task to deal with the + // connection error. + for (int i=0; i<10; i++) { + LOG.debug("Iteration " + i); + try { + XAResource[] resources = ra.getXAResources(null); + resources[0].recover(100); + } catch (Exception ex) { + LOG.error(ex.getMessage()); + } + // allow some small time for thread cleanup to happen + Thread.sleep(300); + + // check if thread exists + Assert.assertFalse("Thread named \"" + + AMQ_CONN_EXECUTOR_THREAD_NAME + + "\" not cleared up with ActiveMQConnection.", + hasActiveMQConnectionExceutorThread()); + } + ra.stop(); + } + + + /** + * Retrieves all threads from JVM and checks if any thread names contain + * AMQ_CONN_EXECUTOR_THREAD_NAME. + * + * @return true if such thread exists, otherwise false + */ + public boolean hasActiveMQConnectionExceutorThread() { + // retrieve all threads + Set threadSet = Thread.getAllStackTraces().keySet(); + Iterator iter = threadSet.iterator(); + while (iter.hasNext()) { + Thread thread = (Thread)iter.next(); + if (thread.getName().startsWith(AMQ_CONN_EXECUTOR_THREAD_NAME )) { + LOG.error("Thread with name {} found.", thread.getName()); + return true; + } + } + LOG.debug("Thread with name {} not found.", AMQ_CONN_EXECUTOR_THREAD_NAME); + return false; + } +}