git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1481984 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Timothy A. Bish 2013-05-13 17:45:18 +00:00
parent 2bd0e5703e
commit e731c39af7
2 changed files with 183 additions and 52 deletions

View File

@ -60,7 +60,7 @@ import org.slf4j.LoggerFactory;
*
*/
public class TcpTransportServer extends TransportServerThreadSupport implements ServiceListener{
public class TcpTransportServer extends TransportServerThreadSupport implements ServiceListener {
private static final Logger LOG = LoggerFactory.getLogger(TcpTransportServer.class);
protected ServerSocket serverSocket;
@ -70,41 +70,36 @@ public class TcpTransportServer extends TransportServerThreadSupport implements
protected long maxInactivityDuration = 30000;
protected long maxInactivityDurationInitalDelay = 10000;
protected int minmumWireFormatVersion;
protected boolean useQueueForAccept=true;
protected boolean useQueueForAccept = true;
/**
* trace=true -> the Transport stack where this TcpTransport
* object will be, will have a TransportLogger layer
* trace=false -> the Transport stack where this TcpTransport
* object will be, will NOT have a TransportLogger layer, and therefore
* will never be able to print logging messages.
* This parameter is most probably set in Connection or TransportConnector URIs.
* trace=true -> the Transport stack where this TcpTransport object will be, will have a TransportLogger layer
* trace=false -> the Transport stack where this TcpTransport object will be, will NOT have a TransportLogger layer,
* and therefore will never be able to print logging messages. This parameter is most probably set in Connection or
* TransportConnector URIs.
*/
protected boolean trace = false;
protected int soTimeout = 0;
protected int socketBufferSize = 64 * 1024;
protected int connectionTimeout = 30000;
protected int connectionTimeout = 30000;
/**
* Name of the LogWriter implementation to use.
* Names are mapped to classes in the resources/META-INF/services/org/apache/activemq/transport/logwriters directory.
* This parameter is most probably set in Connection or TransportConnector URIs.
* Name of the LogWriter implementation to use. Names are mapped to classes in the
* resources/META-INF/services/org/apache/activemq/transport/logwriters directory. This parameter is most probably
* set in Connection or TransportConnector URIs.
*/
protected String logWriterName = TransportLoggerSupport.defaultLogWriterName;
/**
* Specifies if the TransportLogger will be manageable by JMX or not.
* Also, as long as there is at least 1 TransportLogger which is manageable,
* a TransportLoggerControl MBean will me created.
* Specifies if the TransportLogger will be manageable by JMX or not. Also, as long as there is at least 1
* TransportLogger which is manageable, a TransportLoggerControl MBean will me created.
*/
protected boolean dynamicManagement = false;
/**
* startLogging=true -> the TransportLogger object of the Transport stack
* will initially write messages to the log.
* startLogging=false -> the TransportLogger object of the Transport stack
* will initially NOT write messages to the log.
* This parameter only has an effect if trace == true.
* This parameter is most probably set in Connection or TransportConnector URIs.
* startLogging=true -> the TransportLogger object of the Transport stack will initially write messages to the log.
* startLogging=false -> the TransportLogger object of the Transport stack will initially NOT write messages to the
* log. This parameter only has an effect if trace == true. This parameter is most probably set in Connection or
* TransportConnector URIs.
*/
protected boolean startLogging = true;
protected final ServerSocketFactory serverSocketFactory;
@ -116,7 +111,8 @@ public class TcpTransportServer extends TransportServerThreadSupport implements
protected int maximumConnections = Integer.MAX_VALUE;
protected AtomicInteger currentTransportCount = new AtomicInteger();
public TcpTransportServer(TcpTransportFactory transportFactory, URI location, ServerSocketFactory serverSocketFactory) throws IOException, URISyntaxException {
public TcpTransportServer(TcpTransportFactory transportFactory, URI location, ServerSocketFactory serverSocketFactory) throws IOException,
URISyntaxException {
super(location);
this.transportFactory = transportFactory;
this.serverSocketFactory = serverSocketFactory;
@ -136,15 +132,16 @@ public class TcpTransportServer extends TransportServerThreadSupport implements
throw IOExceptionSupport.create("Failed to bind to server socket: " + bind + " due to: " + e, e);
}
try {
setConnectURI(new URI(bind.getScheme(), bind.getUserInfo(), resolveHostName(serverSocket, addr), serverSocket.getLocalPort(), bind.getPath(), bind.getQuery(), bind
.getFragment()));
setConnectURI(new URI(bind.getScheme(), bind.getUserInfo(), resolveHostName(serverSocket, addr), serverSocket.getLocalPort(), bind.getPath(),
bind.getQuery(), bind.getFragment()));
} catch (URISyntaxException e) {
// it could be that the host name contains invalid characters such
// as _ on unix platforms
// so lets try use the IP address instead
try {
setConnectURI(new URI(bind.getScheme(), bind.getUserInfo(), addr.getHostAddress(), serverSocket.getLocalPort(), bind.getPath(), bind.getQuery(), bind.getFragment()));
setConnectURI(new URI(bind.getScheme(), bind.getUserInfo(), addr.getHostAddress(), serverSocket.getLocalPort(), bind.getPath(),
bind.getQuery(), bind.getFragment()));
} catch (URISyntaxException e2) {
throw IOExceptionSupport.create(e2);
}
@ -166,15 +163,16 @@ public class TcpTransportServer extends TransportServerThreadSupport implements
}
/**
* @param wireFormatFactory The wireFormatFactory to set.
* @param wireFormatFactory
* The wireFormatFactory to set.
*/
public void setWireFormatFactory(WireFormatFactory wireFormatFactory) {
this.wireFormatFactory = wireFormatFactory;
}
/**
* Associates a broker info with the transport server so that the transport
* can do discovery advertisements of the broker.
* Associates a broker info with the transport server so that the transport can do discovery advertisements of the
* broker.
*
* @param brokerInfo
*/
@ -246,7 +244,8 @@ public class TcpTransportServer extends TransportServerThreadSupport implements
}
/**
* @param backlog the backlog to set
* @param backlog
* the backlog to set
*/
public void setBacklog(int backlog) {
this.backlog = backlog;
@ -260,7 +259,8 @@ public class TcpTransportServer extends TransportServerThreadSupport implements
}
/**
* @param useQueueForAccept the useQueueForAccept to set
* @param useQueueForAccept
* the useQueueForAccept to set
*/
public void setUseQueueForAccept(boolean useQueueForAccept) {
this.useQueueForAccept = useQueueForAccept;
@ -281,7 +281,7 @@ public class TcpTransportServer extends TransportServerThreadSupport implements
} else {
if (useQueueForAccept) {
socketQueue.put(socket);
}else {
} else {
handleSocket(socket);
}
}
@ -300,15 +300,14 @@ public class TcpTransportServer extends TransportServerThreadSupport implements
}
/**
* Allow derived classes to override the Transport implementation that this
* transport server creates.
* Allow derived classes to override the Transport implementation that this transport server creates.
*
* @param socket
* @param format
* @return
* @throws IOException
*/
protected Transport createTransport(Socket socket, WireFormat format) throws IOException {
protected Transport createTransport(Socket socket, WireFormat format) throws IOException {
return new TcpTransport(format, socket);
}
@ -343,7 +342,7 @@ public class TcpTransportServer extends TransportServerThreadSupport implements
@Override
protected void doStart() throws Exception {
if(useQueueForAccept) {
if (useQueueForAccept) {
Runnable run = new Runnable() {
@Override
public void run() {
@ -363,11 +362,9 @@ public class TcpTransportServer extends TransportServerThreadSupport implements
}
}
};
socketHandlerThread = new Thread(null, run,
"ActiveMQ Transport Server Thread Handler: " + toString(),
getStackSize());
socketHandlerThread = new Thread(null, run, "ActiveMQ Transport Server Thread Handler: " + toString(), getStackSize());
socketHandlerThread.setDaemon(true);
socketHandlerThread.setPriority(ThreadPriorities.BROKER_MANAGEMENT-1);
socketHandlerThread.setPriority(ThreadPriorities.BROKER_MANAGEMENT - 1);
socketHandlerThread.start();
}
super.doStart();
@ -383,24 +380,22 @@ public class TcpTransportServer extends TransportServerThreadSupport implements
@Override
public InetSocketAddress getSocketAddress() {
return (InetSocketAddress)serverSocket.getLocalSocketAddress();
return (InetSocketAddress) serverSocket.getLocalSocketAddress();
}
protected final void handleSocket(Socket socket) {
boolean closeSocket = true;
try {
if (this.currentTransportCount.get() >= this.maximumConnections) {
throw new ExceededMaximumConnectionsException("Exceeded the maximum " +
"number of allowed client connections. See the 'maximumConnections' " +
"property on the TCP transport configuration URI in the ActiveMQ " +
"configuration file (e.g., activemq.xml)");
throw new ExceededMaximumConnectionsException(
"Exceeded the maximum number of allowed client connections. See the '" +
"maximumConnections' property on the TCP transport configuration URI " +
"in the ActiveMQ configuration file (e.g., activemq.xml)");
} else {
HashMap<String, Object> options = new HashMap<String, Object>();
options.put("maxInactivityDuration", Long.valueOf(maxInactivityDuration));
options.put("maxInactivityDurationInitalDelay",
Long.valueOf(maxInactivityDurationInitalDelay));
options.put("minmumWireFormatVersion",
Integer.valueOf(minmumWireFormatVersion));
options.put("maxInactivityDurationInitalDelay", Long.valueOf(maxInactivityDurationInitalDelay));
options.put("minmumWireFormatVersion", Integer.valueOf(minmumWireFormatVersion));
options.put("trace", Boolean.valueOf(trace));
options.put("soTimeout", Integer.valueOf(soTimeout));
options.put("socketBufferSize", Integer.valueOf(socketBufferSize));
@ -412,13 +407,13 @@ public class TcpTransportServer extends TransportServerThreadSupport implements
WireFormat format = wireFormatFactory.createWireFormat();
Transport transport = createTransport(socket, format);
closeSocket = false;
if (transport instanceof ServiceSupport) {
((ServiceSupport) transport).addServiceListener(this);
}
Transport configuredTransport =
transportFactory.serverConfigure( transport, format, options);
Transport configuredTransport = transportFactory.serverConfigure(transport, format, options);
getAcceptListener().onAccept(configuredTransport);
currentTransportCount.incrementAndGet();
@ -426,6 +421,13 @@ public class TcpTransportServer extends TransportServerThreadSupport implements
} catch (SocketTimeoutException ste) {
// expect this to happen
} catch (Exception e) {
if (closeSocket) {
try {
socket.close();
} catch (Exception ignore) {
}
}
if (!isStopping()) {
onAcceptError(e);
} else if (!isStopped()) {
@ -467,7 +469,8 @@ public class TcpTransportServer extends TransportServerThreadSupport implements
}
/**
* @param maximumConnections the maximumConnections to set
* @param maximumConnections
* the maximumConnections to set
*/
public void setMaximumConnections(int maximumConnections) {
this.maximumConnections = maximumConnections;

View File

@ -0,0 +1,128 @@
package org.apache.activemq.bugs;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.io.Writer;
import java.lang.management.ManagementFactory;
import java.util.concurrent.CountDownLatch;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import junit.framework.Test;
import junit.framework.TestCase;
import junit.framework.TestSuite;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.util.Wait;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Unit test for simple App.
*/
public class AMQ4531Test extends TestCase {
private final Logger LOG = LoggerFactory.getLogger(AMQ4531Test.class);
private String connectionURI;
private MBeanServer mbeanServer;
private BrokerService broker;
@Override
protected void setUp() throws Exception {
super.setUp();
broker = new BrokerService();
connectionURI = broker.addConnector("tcp://0.0.0.0:0?maximumConnections=1").getPublishableConnectString();
broker.setPersistent(false);
broker.start();
mbeanServer = ManagementFactory.getPlatformMBeanServer();
}
@Override
protected void tearDown() throws Exception {
broker.stop();
super.tearDown();
}
/**
* Create the test case
*
* @param testName
* name of the test case
*/
public AMQ4531Test(String testName) {
super(testName);
}
/**
* @return the suite of tests being tested
*/
public static Test suite() {
return new TestSuite(AMQ4531Test.class);
}
public void testFDSLeak() throws Exception {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connectionURI);
ActiveMQConnection connection = (ActiveMQConnection) factory.createConnection();
connection.start();
int connections = 100;
final long original = openFileDescriptorCount();
LOG.info("FD count: " + original);
final CountDownLatch done = new CountDownLatch(connections);
for (int i = 0; i < connections; i++) {
new Thread("worker: " + i) {
@Override
public void run() {
ActiveMQConnection connection = null;
try {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connectionURI);
connection = (ActiveMQConnection) factory.createConnection();
connection.start();
} catch (Exception e) {
LOG.debug(getStack(e));
} finally {
try {
connection.close();
} catch (Exception e) {
LOG.debug(getStack(e));
}
done.countDown();
LOG.debug("Latch count down called.");
}
}
}.start();
}
// Wait for all the clients to finish
LOG.info("Waiting for latch...");
done.await();
LOG.info("Latch complete.");
LOG.info("FD count: " + openFileDescriptorCount());
assertTrue("Too many open file descriptors: " + openFileDescriptorCount(), Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
long openFDs = openFileDescriptorCount();
LOG.info("Current FD count [{}], original FD count[{}]", openFDs, original);
return (openFDs - original) < 10;
}
}));
}
private long openFileDescriptorCount() throws Exception {
return ((Long) mbeanServer.getAttribute(new ObjectName("java.lang:type=OperatingSystem"), "OpenFileDescriptorCount")).longValue();
}
private String getStack(Throwable aThrowable) {
final Writer result = new StringWriter();
final PrintWriter printWriter = new PrintWriter(result);
aThrowable.printStackTrace(printWriter);
return result.toString();
}
}