Fix for AMQ-4889, potential leak of ProxyConnectors

This commit is contained in:
Kevin Earls 2013-12-16 18:13:27 +01:00
parent cb5c29d02d
commit 257710ba1a
3 changed files with 210 additions and 23 deletions

View File

@ -16,9 +16,6 @@
*/
package org.apache.activemq.proxy;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.Service;
import org.apache.activemq.command.ShutdownInfo;
import org.apache.activemq.command.WireFormatInfo;
@ -29,14 +26,17 @@ import org.apache.activemq.util.ServiceStopper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
class ProxyConnection implements Service {
private static final Logger LOG = LoggerFactory.getLogger(ProxyConnection.class);
private final Transport localTransport;
private final Transport remoteTransport;
private AtomicBoolean shuttingDown = new AtomicBoolean(false);
private AtomicBoolean running = new AtomicBoolean(false);
protected final Transport localTransport;
protected final Transport remoteTransport;
private final AtomicBoolean shuttingDown = new AtomicBoolean(false);
private final AtomicBoolean running = new AtomicBoolean(false);
public ProxyConnection(Transport localTransport, Transport remoteTransport) {
this.localTransport = localTransport;
@ -53,12 +53,14 @@ class ProxyConnection implements Service {
}
}
@Override
public void start() throws Exception {
if (!running.compareAndSet(false, true)) {
return;
}
this.localTransport.setTransportListener(new DefaultTransportListener() {
@Override
public void onCommand(Object command) {
boolean shutdown = false;
if (command.getClass() == ShutdownInfo.class) {
@ -81,12 +83,14 @@ class ProxyConnection implements Service {
}
}
@Override
public void onException(IOException error) {
onFailure(error);
}
});
this.remoteTransport.setTransportListener(new DefaultTransportListener() {
@Override
public void onCommand(Object command) {
try {
// skipping WireFormat infos
@ -99,6 +103,7 @@ class ProxyConnection implements Service {
}
}
@Override
public void onException(IOException error) {
onFailure(error);
}
@ -108,15 +113,68 @@ class ProxyConnection implements Service {
remoteTransport.start();
}
@Override
public void stop() throws Exception {
if (!running.compareAndSet(true, false)) {
return;
}
shuttingDown.set(true);
ServiceStopper ss = new ServiceStopper();
ss.stop(localTransport);
ss.stop(remoteTransport);
ss.stop(localTransport);
ss.throwFirstException();
}
@Override
public boolean equals(Object arg) {
if (arg == null || !(arg instanceof ProxyConnection)) {
return false;
} else {
ProxyConnection other = (ProxyConnection) arg;
String otherRemote = "";
String otherLocal = "";
String thisRemote = "";
String thisLocal = "";
if (other.localTransport != null && other.localTransport.getRemoteAddress() != null) {
otherLocal = other.localTransport.getRemoteAddress();
}
if (other.remoteTransport != null && other.remoteTransport.getRemoteAddress() != null) {
otherRemote = other.remoteTransport.getRemoteAddress();
}
if (this.remoteTransport != null && this.remoteTransport.getRemoteAddress() != null) {
thisRemote = this.remoteTransport.getRemoteAddress();
}
if (this.localTransport != null && this.localTransport.getRemoteAddress() != null) {
thisLocal = this.localTransport.getRemoteAddress();
}
if (otherRemote.equals(thisRemote) && otherLocal.equals(thisLocal)) {
return true;
} else {
return false;
}
}
}
@Override
public int hashCode() {
int hash = 17;
if (localTransport != null && localTransport.getRemoteAddress() != null) {
hash += 31 * hash + localTransport.getRemoteAddress().hashCode();
}
if (remoteTransport != null && remoteTransport.getRemoteAddress() != null) {
hash = 31 * hash + remoteTransport.hashCode();
}
return hash;
}
@Override
public String toString() {
return "ProxyConnection [localTransport=" + localTransport
+ ", remoteTransport=" + remoteTransport + ", shuttingDown="
+ shuttingDown.get() + ", running=" + running.get() + "]";
}
}

View File

@ -16,11 +16,6 @@
*/
package org.apache.activemq.proxy;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Iterator;
import java.util.concurrent.CopyOnWriteArrayList;
import org.apache.activemq.Service;
import org.apache.activemq.transport.CompositeTransport;
import org.apache.activemq.transport.Transport;
@ -32,10 +27,14 @@ import org.apache.activemq.util.ServiceStopper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Iterator;
import java.util.concurrent.CopyOnWriteArrayList;
/**
* @org.apache.xbean.XBean
*
*
*/
public class ProxyConnector implements Service {
@ -45,45 +44,59 @@ public class ProxyConnector implements Service {
private URI remote;
private URI localUri;
private String name;
/**
* Should we proxy commands to the local broker using VM transport as well?
*/
private boolean proxyToLocalBroker = true;
private final CopyOnWriteArrayList<ProxyConnection> connections = new CopyOnWriteArrayList<ProxyConnection>();
@Override
public void start() throws Exception {
this.getServer().setAcceptListener(new TransportAcceptListener() {
@Override
public void onAccept(Transport localTransport) {
ProxyConnection connection = null;
try {
Transport remoteTransport = createRemoteTransport();
ProxyConnection connection = new ProxyConnection(localTransport, remoteTransport);
connections.add(connection);
Transport remoteTransport = createRemoteTransport(localTransport);
connection = new ProxyConnection(localTransport, remoteTransport);
connection.start();
connections.add(connection);
} catch (Exception e) {
onAcceptError(e);
try {
if (connection != null) {
connection.stop();
}
} catch (Exception eoc) {
LOG.error("Could not close broken connection: ", eoc);
}
}
}
@Override
public void onAcceptError(Exception error) {
LOG.error("Could not accept connection: ", error);
}
});
getServer().start();
LOG.info("Proxy Connector {} started", getName());
}
@Override
public void stop() throws Exception {
ServiceStopper ss = new ServiceStopper();
if (this.server != null) {
ss.stop(this.server);
}
for (Iterator<ProxyConnection> iter = connections.iterator(); iter.hasNext();) {
LOG.info("Connector stopped: Stopping proxy.");
ss.stop(iter.next());
}
connections.clear();
ss.throwFirstException();
LOG.info("Proxy Connector {} stopped", getName());
}
@ -133,11 +146,11 @@ public class ProxyConnector implements Service {
return TransportFactory.bind(bind);
}
private Transport createRemoteTransport() throws Exception {
private Transport createRemoteTransport(final Transport local) throws Exception {
Transport transport = TransportFactory.compositeConnect(remote);
CompositeTransport ct = transport.narrow(CompositeTransport.class);
if (ct != null && localUri != null && proxyToLocalBroker) {
ct.add(false,new URI[] {localUri});
ct.add(false, new URI[] { localUri });
}
// Add a transport filter so that we can track the transport life cycle
@ -146,7 +159,9 @@ public class ProxyConnector implements Service {
public void stop() throws Exception {
LOG.info("Stopping proxy.");
super.stop();
connections.remove(this);
ProxyConnection dummy = new ProxyConnection(local, this);
LOG.debug("Removing proxyConnection {}", dummy.toString());
connections.remove(dummy);
}
};
return transport;
@ -175,4 +190,7 @@ public class ProxyConnector implements Service {
this.proxyToLocalBroker = proxyToLocalBroker;
}
protected Integer getConnectionCount() {
return connections.size();
}
}

View File

@ -0,0 +1,111 @@
package org.apache.activemq.proxy;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerPlugin;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.security.AuthenticationUser;
import org.apache.activemq.security.SimpleAuthenticationPlugin;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSSecurityException;
import javax.jms.Session;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
public class AMQ4889Test {
protected static final Logger LOG = LoggerFactory.getLogger(AMQ4889Test.class);
public static final String USER = "user";
public static final String GOOD_USER_PASSWORD = "password";
public static final String WRONG_PASSWORD = "wrongPassword";
public static final String PROXY_URI = "tcp://localhost:6002";
public static final String LOCAL_URI = "tcp://localhost:6001";
protected BrokerService brokerService;
private ProxyConnector proxyConnector;
protected TransportConnector transportConnector;
protected ConnectionFactory connectionFactory;
private static final Integer ITERATIONS = 100;
protected BrokerService createBroker() throws Exception {
brokerService = new BrokerService();
brokerService.setPersistent(false);
ArrayList<BrokerPlugin> plugins = new ArrayList<BrokerPlugin>();
BrokerPlugin authenticationPlugin = configureAuthentication();
plugins.add(authenticationPlugin);
BrokerPlugin[] array = new BrokerPlugin[plugins.size()];
brokerService.setPlugins(plugins.toArray(array));
transportConnector = brokerService.addConnector(LOCAL_URI);
proxyConnector = new ProxyConnector();
proxyConnector.setName("proxy"); // TODO rename
proxyConnector.setBind(new URI(PROXY_URI));
proxyConnector.setRemote(new URI(LOCAL_URI));
brokerService.addProxyConnector(proxyConnector);
brokerService.start();
brokerService.waitUntilStarted();
return brokerService;
}
protected BrokerPlugin configureAuthentication() throws Exception {
List<AuthenticationUser> users = new ArrayList<AuthenticationUser>();
users.add(new AuthenticationUser(USER, GOOD_USER_PASSWORD, "users"));
SimpleAuthenticationPlugin authenticationPlugin = new SimpleAuthenticationPlugin(users);
return authenticationPlugin;
}
@Before
public void setUp() throws Exception {
brokerService = createBroker();
connectionFactory = new ActiveMQConnectionFactory(PROXY_URI);
}
@After
public void tearDown() throws Exception {
brokerService.stop();
brokerService.waitUntilStopped();
}
@Test(timeout = 1 * 60 * 1000)
public void testForConnectionLeak() throws Exception {
Integer expectedConnectionCount = 0;
for (int i=0; i < ITERATIONS; i++) {
try {
if (i % 2 == 0) {
LOG.debug("Iteration {} adding bad connection", i);
Connection connection = connectionFactory.createConnection(USER, WRONG_PASSWORD); // TODO change to debug
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
fail("createSession should fail");
} else {
LOG.debug("Iteration {} adding good connection", i);
Connection connection = connectionFactory.createConnection(USER, GOOD_USER_PASSWORD);
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
expectedConnectionCount++;
}
//
} catch (JMSSecurityException e) {
}
LOG.debug("Iteration {} Connections? {}", i, proxyConnector.getConnectionCount());
Thread.sleep(50); // Need to wait for remove to finish
assertEquals(expectedConnectionCount, proxyConnector.getConnectionCount());
}
}
}