diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransportFactory.java b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransportFactory.java index e5a51f9770..0c21c33d69 100644 --- a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransportFactory.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransportFactory.java @@ -38,4 +38,9 @@ public class StompTransportFactory extends TcpTransportFactory { transport = new StompTransportFilter(transport, new LegacyFrameTranslator()); return super.compositeConfigure(transport, format, options); } + + protected boolean isUseInactivityMonitor(Transport transport) { + // lets disable the inactivity monitor as stomp does not use keep alive packets + return false; + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java b/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java index 7a3125840a..7e1dd6c2e4 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java @@ -356,6 +356,10 @@ public class TcpTransport extends TransportThreadSupport implements Transport, S } protected void doStop(ServiceStopper stopper) throws Exception { + if (log.isDebugEnabled()) { + log.debug("Stopping transport " + this); + } + // Closing the streams flush the sockets before closing.. if the socket // is hung.. then this hangs the close. // closeStreams(); diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java b/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java index 881af0f089..98ae0d9247 100755 --- a/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java @@ -87,7 +87,9 @@ public class TcpTransportFactory extends TransportFactory { transport = new TransportLogger(transport); } - transport = new InactivityMonitor(transport); + if (isUseInactivityMonitor(transport)) { + transport = new InactivityMonitor(transport); + } // Only need the WireFormatNegotiator if using openwire if( format instanceof OpenWireFormat ) { @@ -97,6 +99,13 @@ public class TcpTransportFactory extends TransportFactory { return transport; } + /** + * Returns true if the inactivity monitor should be used on the transport + */ + protected boolean isUseInactivityMonitor(Transport transport) { + return true; + } + protected Transport createTransport(URI location,WireFormat wf) throws UnknownHostException,IOException{ URI localLocation=null; String path=location.getPath(); diff --git a/activemq-core/src/test/java/org/apache/activemq/broker/Main.java b/activemq-core/src/test/java/org/apache/activemq/broker/Main.java index ba3781662c..f26d650b7b 100644 --- a/activemq-core/src/test/java/org/apache/activemq/broker/Main.java +++ b/activemq-core/src/test/java/org/apache/activemq/broker/Main.java @@ -20,6 +20,7 @@ package org.apache.activemq.broker; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.broker.BrokerPlugin; import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.jmx.ManagementContext; import org.apache.activemq.broker.util.UDPTraceBrokerPlugin; import org.apache.activemq.broker.view.ConnectionDotFilePlugin; import org.apache.activemq.broker.view.DestinationDotFilePlugin; @@ -52,8 +53,15 @@ public class Main { // URI(brokerURI)); BrokerService broker = new BrokerService(); broker.setPersistent(false); + + // for running on Java 5 without mx4j + ManagementContext managementContext = broker.getManagementContext(); + managementContext.setFindTigerMbeanServer(true); + managementContext.setUseMBeanServer(true); + managementContext.setCreateConnector(false); + broker.setUseJmx(true); - broker.setPlugins(new BrokerPlugin[] { new ConnectionDotFilePlugin(), new UDPTraceBrokerPlugin() }); + //broker.setPlugins(new BrokerPlugin[] { new ConnectionDotFilePlugin(), new UDPTraceBrokerPlugin() }); broker.addConnector("tcp://localhost:61616"); broker.addConnector("stomp://localhost:61613"); broker.start(); diff --git a/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java b/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java index 8ca159f8a7..cabcf6a851 100644 --- a/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java @@ -19,14 +19,13 @@ package org.apache.activemq.transport.stomp; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.CombinationTestSupport; -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.broker.TransportConnector; +import org.apache.activemq.broker.*; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQTextMessage; import org.apache.activemq.transport.stomp.Stomp; import javax.jms.*; - +import javax.jms.Connection; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStream; @@ -64,8 +63,6 @@ public class StompTest extends CombinationTestSupport { session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE); queue = new ActiveMQQueue(getQueueName()); connection.start(); - - } protected Socket createSocket(URI connectUri) throws IOException { @@ -78,7 +75,9 @@ public class StompTest extends CombinationTestSupport { protected void tearDown() throws Exception { connection.close(); - stompSocket.close(); + if (stompSocket != null) { + stompSocket.close(); + } broker.stop(); } @@ -681,6 +680,37 @@ public class StompTest extends CombinationTestSupport { assertEquals("second message", message.getText().trim()); } + public void testDisconnectedClientsAreRemovedFromTheBroker() throws Exception { + assertClients(1); + String frame = + "CONNECT\n" + + "login: brianm\n" + + "passcode: wombats\n\n"+ + Stomp.NULL; + + sendFrame(frame); + + // This test case is currently failing + waitForFrameToTakeEffect(); + + assertClients(2); + + // now lets kill the socket + stompSocket.close(); + stompSocket = null; + + Thread.sleep(2000); + + assertClients(1); + } + + protected void assertClients(int expected) throws Exception { + org.apache.activemq.broker.Connection[] clients = broker.getBroker().getClients(); + int actual = clients.length; + + assertEquals("Number of clients", expected, actual); + } + protected void waitForFrameToTakeEffect() throws InterruptedException { // bit of a dirty hack :) // another option would be to force some kind of receipt to be returned