diff --git a/activemq-broker/src/main/java/org/apache/activemq/proxy/ProxyConnection.java b/activemq-broker/src/main/java/org/apache/activemq/proxy/ProxyConnection.java index 789dbbf90a..3cc2c18819 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/proxy/ProxyConnection.java +++ b/activemq-broker/src/main/java/org/apache/activemq/proxy/ProxyConnection.java @@ -16,9 +16,6 @@ */ package org.apache.activemq.proxy; -import java.io.IOException; -import java.util.concurrent.atomic.AtomicBoolean; - import org.apache.activemq.Service; import org.apache.activemq.command.ShutdownInfo; import org.apache.activemq.command.WireFormatInfo; @@ -29,14 +26,17 @@ import org.apache.activemq.util.ServiceStopper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.util.concurrent.atomic.AtomicBoolean; + class ProxyConnection implements Service { private static final Logger LOG = LoggerFactory.getLogger(ProxyConnection.class); - private final Transport localTransport; - private final Transport remoteTransport; - private AtomicBoolean shuttingDown = new AtomicBoolean(false); - private AtomicBoolean running = new AtomicBoolean(false); + protected final Transport localTransport; + protected final Transport remoteTransport; + private final AtomicBoolean shuttingDown = new AtomicBoolean(false); + private final AtomicBoolean running = new AtomicBoolean(false); public ProxyConnection(Transport localTransport, Transport remoteTransport) { this.localTransport = localTransport; @@ -53,12 +53,14 @@ class ProxyConnection implements Service { } } + @Override public void start() throws Exception { if (!running.compareAndSet(false, true)) { return; } this.localTransport.setTransportListener(new DefaultTransportListener() { + @Override public void onCommand(Object command) { boolean shutdown = false; if (command.getClass() == ShutdownInfo.class) { @@ -81,12 +83,14 @@ class ProxyConnection implements Service { } } + @Override public void onException(IOException error) { onFailure(error); } }); this.remoteTransport.setTransportListener(new DefaultTransportListener() { + @Override public void onCommand(Object command) { try { // skipping WireFormat infos @@ -99,6 +103,7 @@ class ProxyConnection implements Service { } } + @Override public void onException(IOException error) { onFailure(error); } @@ -108,15 +113,68 @@ class ProxyConnection implements Service { remoteTransport.start(); } + @Override public void stop() throws Exception { if (!running.compareAndSet(true, false)) { return; } shuttingDown.set(true); ServiceStopper ss = new ServiceStopper(); - ss.stop(localTransport); ss.stop(remoteTransport); + ss.stop(localTransport); ss.throwFirstException(); } + + @Override + public boolean equals(Object arg) { + if (arg == null || !(arg instanceof ProxyConnection)) { + return false; + } else { + ProxyConnection other = (ProxyConnection) arg; + String otherRemote = ""; + String otherLocal = ""; + String thisRemote = ""; + String thisLocal = ""; + + if (other.localTransport != null && other.localTransport.getRemoteAddress() != null) { + otherLocal = other.localTransport.getRemoteAddress(); + } + if (other.remoteTransport != null && other.remoteTransport.getRemoteAddress() != null) { + otherRemote = other.remoteTransport.getRemoteAddress(); + } + if (this.remoteTransport != null && this.remoteTransport.getRemoteAddress() != null) { + thisRemote = this.remoteTransport.getRemoteAddress(); + } + if (this.localTransport != null && this.localTransport.getRemoteAddress() != null) { + thisLocal = this.localTransport.getRemoteAddress(); + } + + if (otherRemote.equals(thisRemote) && otherLocal.equals(thisLocal)) { + return true; + } else { + return false; + } + } + } + + + @Override + public int hashCode() { + int hash = 17; + if (localTransport != null && localTransport.getRemoteAddress() != null) { + hash += 31 * hash + localTransport.getRemoteAddress().hashCode(); + } + if (remoteTransport != null && remoteTransport.getRemoteAddress() != null) { + hash = 31 * hash + remoteTransport.hashCode(); + } + return hash; + } + + @Override + public String toString() { + return "ProxyConnection [localTransport=" + localTransport + + ", remoteTransport=" + remoteTransport + ", shuttingDown=" + + shuttingDown.get() + ", running=" + running.get() + "]"; + } } diff --git a/activemq-broker/src/main/java/org/apache/activemq/proxy/ProxyConnector.java b/activemq-broker/src/main/java/org/apache/activemq/proxy/ProxyConnector.java index d82911b55a..b36faafc25 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/proxy/ProxyConnector.java +++ b/activemq-broker/src/main/java/org/apache/activemq/proxy/ProxyConnector.java @@ -16,11 +16,6 @@ */ package org.apache.activemq.proxy; -import java.io.IOException; -import java.net.URI; -import java.net.URISyntaxException; -import java.util.Iterator; -import java.util.concurrent.CopyOnWriteArrayList; import org.apache.activemq.Service; import org.apache.activemq.transport.CompositeTransport; import org.apache.activemq.transport.Transport; @@ -32,10 +27,14 @@ import org.apache.activemq.util.ServiceStopper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.Iterator; +import java.util.concurrent.CopyOnWriteArrayList; + /** * @org.apache.xbean.XBean - * - * */ public class ProxyConnector implements Service { @@ -45,45 +44,59 @@ public class ProxyConnector implements Service { private URI remote; private URI localUri; private String name; + /** * Should we proxy commands to the local broker using VM transport as well? */ private boolean proxyToLocalBroker = true; - + private final CopyOnWriteArrayList connections = new CopyOnWriteArrayList(); + @Override public void start() throws Exception { this.getServer().setAcceptListener(new TransportAcceptListener() { + @Override public void onAccept(Transport localTransport) { + ProxyConnection connection = null; try { - Transport remoteTransport = createRemoteTransport(); - ProxyConnection connection = new ProxyConnection(localTransport, remoteTransport); - connections.add(connection); + Transport remoteTransport = createRemoteTransport(localTransport); + connection = new ProxyConnection(localTransport, remoteTransport); connection.start(); + connections.add(connection); } catch (Exception e) { onAcceptError(e); + try { + if (connection != null) { + connection.stop(); + } + } catch (Exception eoc) { + LOG.error("Could not close broken connection: ", eoc); + } } } + @Override public void onAcceptError(Exception error) { LOG.error("Could not accept connection: ", error); } }); getServer().start(); LOG.info("Proxy Connector {} started", getName()); - } + @Override public void stop() throws Exception { ServiceStopper ss = new ServiceStopper(); if (this.server != null) { ss.stop(this.server); } + for (Iterator iter = connections.iterator(); iter.hasNext();) { LOG.info("Connector stopped: Stopping proxy."); ss.stop(iter.next()); } + connections.clear(); ss.throwFirstException(); LOG.info("Proxy Connector {} stopped", getName()); } @@ -133,11 +146,11 @@ public class ProxyConnector implements Service { return TransportFactory.bind(bind); } - private Transport createRemoteTransport() throws Exception { + private Transport createRemoteTransport(final Transport local) throws Exception { Transport transport = TransportFactory.compositeConnect(remote); CompositeTransport ct = transport.narrow(CompositeTransport.class); if (ct != null && localUri != null && proxyToLocalBroker) { - ct.add(false,new URI[] {localUri}); + ct.add(false, new URI[] { localUri }); } // Add a transport filter so that we can track the transport life cycle @@ -146,7 +159,9 @@ public class ProxyConnector implements Service { public void stop() throws Exception { LOG.info("Stopping proxy."); super.stop(); - connections.remove(this); + ProxyConnection dummy = new ProxyConnection(local, this); + LOG.debug("Removing proxyConnection {}", dummy.toString()); + connections.remove(dummy); } }; return transport; @@ -175,4 +190,7 @@ public class ProxyConnector implements Service { this.proxyToLocalBroker = proxyToLocalBroker; } + protected Integer getConnectionCount() { + return connections.size(); + } } diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/proxy/AMQ4889Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/proxy/AMQ4889Test.java new file mode 100644 index 0000000000..bfe73ec2ea --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/proxy/AMQ4889Test.java @@ -0,0 +1,111 @@ +package org.apache.activemq.proxy; + + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerPlugin; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.TransportConnector; +import org.apache.activemq.security.AuthenticationUser; +import org.apache.activemq.security.SimpleAuthenticationPlugin; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.JMSSecurityException; +import javax.jms.Session; +import java.net.URI; +import java.util.ArrayList; +import java.util.List; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +public class AMQ4889Test { + protected static final Logger LOG = LoggerFactory.getLogger(AMQ4889Test.class); + + public static final String USER = "user"; + public static final String GOOD_USER_PASSWORD = "password"; + public static final String WRONG_PASSWORD = "wrongPassword"; + public static final String PROXY_URI = "tcp://localhost:6002"; + public static final String LOCAL_URI = "tcp://localhost:6001"; + + protected BrokerService brokerService; + private ProxyConnector proxyConnector; + protected TransportConnector transportConnector; + protected ConnectionFactory connectionFactory; + + private static final Integer ITERATIONS = 100; + + protected BrokerService createBroker() throws Exception { + brokerService = new BrokerService(); + brokerService.setPersistent(false); + + ArrayList plugins = new ArrayList(); + BrokerPlugin authenticationPlugin = configureAuthentication(); + plugins.add(authenticationPlugin); + BrokerPlugin[] array = new BrokerPlugin[plugins.size()]; + brokerService.setPlugins(plugins.toArray(array)); + + transportConnector = brokerService.addConnector(LOCAL_URI); + proxyConnector = new ProxyConnector(); + proxyConnector.setName("proxy"); // TODO rename + proxyConnector.setBind(new URI(PROXY_URI)); + proxyConnector.setRemote(new URI(LOCAL_URI)); + brokerService.addProxyConnector(proxyConnector); + + brokerService.start(); + brokerService.waitUntilStarted(); + + return brokerService; + } + + protected BrokerPlugin configureAuthentication() throws Exception { + List users = new ArrayList(); + users.add(new AuthenticationUser(USER, GOOD_USER_PASSWORD, "users")); + SimpleAuthenticationPlugin authenticationPlugin = new SimpleAuthenticationPlugin(users); + + return authenticationPlugin; + } + + @Before + public void setUp() throws Exception { + brokerService = createBroker(); + connectionFactory = new ActiveMQConnectionFactory(PROXY_URI); + } + + @After + public void tearDown() throws Exception { + brokerService.stop(); + brokerService.waitUntilStopped(); + } + + + @Test(timeout = 1 * 60 * 1000) + public void testForConnectionLeak() throws Exception { + Integer expectedConnectionCount = 0; + for (int i=0; i < ITERATIONS; i++) { + try { + if (i % 2 == 0) { + LOG.debug("Iteration {} adding bad connection", i); + Connection connection = connectionFactory.createConnection(USER, WRONG_PASSWORD); // TODO change to debug + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + fail("createSession should fail"); + } else { + LOG.debug("Iteration {} adding good connection", i); + Connection connection = connectionFactory.createConnection(USER, GOOD_USER_PASSWORD); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + expectedConnectionCount++; + } + // + } catch (JMSSecurityException e) { + } + LOG.debug("Iteration {} Connections? {}", i, proxyConnector.getConnectionCount()); + Thread.sleep(50); // Need to wait for remove to finish + assertEquals(expectedConnectionCount, proxyConnector.getConnectionCount()); + } + } +}