diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java b/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java index 0d634d58df..02c0063f24 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java @@ -65,6 +65,7 @@ public class TcpTransport extends TransportThreadSupport implements Transport, S protected int soTimeout; protected int socketBufferSize = 64 * 1024; protected int ioBufferSize = 8 * 1024; + protected boolean closeAsync=true; protected Socket socket; protected DataOutputStream dataOut; protected DataInputStream dataIn; @@ -335,6 +336,20 @@ public class TcpTransport extends TransportThreadSupport implements Transport, S public void setIoBufferSize(int ioBufferSize) { this.ioBufferSize = ioBufferSize; } + + /** + * @return the closeAsync + */ + public boolean isCloseAsync() { + return closeAsync; + } + + /** + * @param closeAsync the closeAsync to set + */ + public void setCloseAsync(boolean closeAsync) { + this.closeAsync = closeAsync; + } // Implementation methods // ------------------------------------------------------------------------- @@ -441,22 +456,33 @@ public class TcpTransport extends TransportThreadSupport implements Transport, S // is hung.. then this hangs the close. // closeStreams(); if (socket != null) { - //closing the socket can hang also - final CountDownLatch latch = new CountDownLatch(1); - SOCKET_CLOSE.execute(new Runnable() { - - public void run() { - try { - socket.close(); - } catch (IOException e) { - LOG.debug("Caught exception closing socket",e); - }finally { - latch.countDown(); - } - } + if (closeAsync) { + //closing the socket can hang also + final CountDownLatch latch = new CountDownLatch(1); - }); - latch.await(1,TimeUnit.SECONDS); + SOCKET_CLOSE.execute(new Runnable() { + + public void run() { + try { + socket.shutdownInput(); + socket.shutdownOutput(); + socket.close(); + } catch (IOException e) { + LOG.debug("Caught exception closing socket",e); + }finally { + latch.countDown(); + } + } + + }); + latch.await(1,TimeUnit.SECONDS); + }else { + try { + socket.close(); + } catch (IOException e) { + LOG.debug("Caught exception closing socket",e); + } + } } } @@ -512,6 +538,7 @@ public class TcpTransport extends TransportThreadSupport implements Transport, S SOCKET_CLOSE = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 10, TimeUnit.SECONDS, new SynchronousQueue(), new ThreadFactory() { public Thread newThread(Runnable runnable) { Thread thread = new Thread(runnable, "TcpSocketClose: "+runnable); + thread.setPriority(Thread.MAX_PRIORITY); thread.setDaemon(true); return thread; } diff --git a/activemq-core/src/test/java/org/apache/activemq/perf/ConnectionChurnTest.java b/activemq-core/src/test/java/org/apache/activemq/perf/ConnectionChurnTest.java new file mode 100644 index 0000000000..9f6411f03a --- /dev/null +++ b/activemq-core/src/test/java/org/apache/activemq/perf/ConnectionChurnTest.java @@ -0,0 +1,100 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.perf; + +import java.util.ArrayList; +import java.util.List; +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.JMSException; +import junit.framework.TestCase; +import org.apache.activemq.ActiveMQConnection; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.TransportConnector; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +/** + * @version $Revision: 1.3 $ + */ +public class ConnectionChurnTest extends TestCase { + protected static final int CONNECTION_COUNT = 200; + private static final Log LOG = LogFactory.getLog(ConnectionChurnTest.class); + protected BrokerService broker; + protected String bindAddress = ActiveMQConnection.DEFAULT_BROKER_BIND_URL+"?transport.closeAsync=false"; + protected int topicCount; + + public void testPerformance() throws Exception { + ConnectionFactory factory = createConnectionFactory(); + List list = new ArrayList(); + for (int i = 0; i < CONNECTION_COUNT; i++) { + Connection connection = factory.createConnection(); + connection.start(); + list.add(connection); + LOG.info("Created " + i); + if (i % 100 == 0) { + closeConnections(list); + } + } + closeConnections(list); + } + + protected void closeConnections(List list) throws JMSException { + for (Connection c : list) { + c.close(); + } + for (TransportConnector tc : broker.getTransportConnectors()) { + System.out.println(tc.getConnections().size()); + } + list.clear(); + } + + protected void setUp() throws Exception { + if (broker == null) { + broker = createBroker(); + } + super.setUp(); + } + + protected void tearDown() throws Exception { + super.tearDown(); + if (broker != null) { + broker.stop(); + } + } + + protected ActiveMQConnectionFactory createConnectionFactory() + throws Exception { + ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory( + ActiveMQConnection.DEFAULT_BROKER_URL); + return cf; + } + + protected BrokerService createBroker() throws Exception { + BrokerService answer = new BrokerService(); + configureBroker(answer); + answer.start(); + return answer; + } + + protected void configureBroker(BrokerService answer) throws Exception { + answer.setPersistent(false); + answer.addConnector(bindAddress); + answer.setDeleteAllMessagesOnStartup(true); + } +}