diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java index b25d4e49fd..4c5e74971d 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java @@ -18,6 +18,7 @@ package org.apache.activemq.transport.amqp; import java.io.File; import java.security.SecureRandom; +import java.util.Set; import java.util.Vector; import javax.jms.Connection; @@ -36,6 +37,7 @@ import org.apache.activemq.AutoFailTestSupport; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.TransportConnector; import org.apache.activemq.broker.jmx.BrokerViewMBean; +import org.apache.activemq.broker.jmx.ConnectorViewMBean; import org.apache.activemq.broker.jmx.QueueViewMBean; import org.apache.activemq.spring.SpringSslContext; import org.junit.After; @@ -78,6 +80,7 @@ public class AmqpTestSupport { brokerService.setPersistent(false); brokerService.setAdvisorySupport(false); brokerService.setDeleteAllMessagesOnStartup(deleteAllMessages); + brokerService.setUseJmx(true); SSLContext ctx = SSLContext.getInstance("TLS"); ctx.init(new KeyManager[0], new TrustManager[]{new DefaultTrustManager()}, new SecureRandom()); @@ -164,6 +167,21 @@ public class AmqpTestSupport { return proxy; } + protected ConnectorViewMBean getProxyToConnectionView(String connectionType) throws Exception { + ObjectName connectorQuery = new ObjectName( + "org.apache.activemq:type=Broker,brokerName=localhost,connector=clientConnectors,connectorName="+connectionType+"_//*"); + + Set results = brokerService.getManagementContext().queryNames(connectorQuery, null); + + if (results == null || results.isEmpty() || results.size() > 1) { + throw new Exception("Unable to find the exact Connector instance."); + } + + ConnectorViewMBean proxy = (ConnectorViewMBean) brokerService.getManagementContext() + .newProxyInstance(results.iterator().next(), ConnectorViewMBean.class, true); + return proxy; + } + protected QueueViewMBean getProxyToQueue(String name) throws MalformedObjectNameException, JMSException { ObjectName queueViewMBeanName = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName="+name); QueueViewMBean proxy = (QueueViewMBean) brokerService.getManagementContext() diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java index 5dc670e134..4813514056 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java @@ -22,6 +22,7 @@ import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import java.util.ArrayList; import java.util.Enumeration; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -39,6 +40,7 @@ import javax.jms.QueueBrowser; import javax.jms.Session; import javax.jms.TextMessage; +import org.apache.activemq.broker.jmx.ConnectorViewMBean; import org.apache.activemq.broker.jmx.QueueViewMBean; import org.apache.activemq.transport.amqp.joram.ActiveMQAdmin; import org.apache.activemq.util.Wait; @@ -519,10 +521,45 @@ public class JMSClientTest extends AmqpTestSupport { connection.close(); } + @Test(timeout=60000) + public void testConnectionsAreClosed() throws Exception { + ActiveMQAdmin.enableJMSFrameTracing(); + + final ConnectorViewMBean connector = getProxyToConnectionView("amqp"); + LOG.info("Current number of Connections is: {}", connector.connectionCount()); + + ArrayList connections = new ArrayList(); + + for (int i = 0; i < 10; i++) { + connections.add(createConnection(null)); + } + + LOG.info("Current number of Connections is: {}", connector.connectionCount()); + + for (Connection connection : connections) { + connection.close(); + } + + assertTrue("Should have no connections left.", Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisified() throws Exception { + LOG.info("Current number of Connections is: {}", connector.connectionCount()); + return connector.connectionCount() == 0; + } + })); + } + private Connection createConnection() throws JMSException { + return createConnection(name.toString()); + } + + private Connection createConnection(String clientId) throws JMSException { final ConnectionFactoryImpl factory = new ConnectionFactoryImpl("localhost", port, "admin", "password"); final Connection connection = factory.createConnection(); - connection.setClientID(name.toString()); + if (clientId != null && !clientId.isEmpty()) { + connection.setClientID(clientId); + } connection.setExceptionListener(new ExceptionListener() { @Override public void onException(JMSException exception) {