fix for AMQ-1134 so that stomp connections are cleared up by the broker if a stomp client is killed without disconnecting properly

git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@497898 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
James Strachan 2007-01-19 18:36:41 +00:00
parent 6e26195cd8
commit 0aac7f5d5c
5 changed files with 64 additions and 8 deletions
activemq-core/src
main/java/org/apache/activemq/transport
test/java/org/apache/activemq
broker
transport/stomp

View File

@ -38,4 +38,9 @@ public class StompTransportFactory extends TcpTransportFactory {
transport = new StompTransportFilter(transport, new LegacyFrameTranslator()); transport = new StompTransportFilter(transport, new LegacyFrameTranslator());
return super.compositeConfigure(transport, format, options); return super.compositeConfigure(transport, format, options);
} }
protected boolean isUseInactivityMonitor(Transport transport) {
// lets disable the inactivity monitor as stomp does not use keep alive packets
return false;
}
} }

View File

@ -356,6 +356,10 @@ public class TcpTransport extends TransportThreadSupport implements Transport, S
} }
protected void doStop(ServiceStopper stopper) throws Exception { protected void doStop(ServiceStopper stopper) throws Exception {
if (log.isDebugEnabled()) {
log.debug("Stopping transport " + this);
}
// Closing the streams flush the sockets before closing.. if the socket // Closing the streams flush the sockets before closing.. if the socket
// is hung.. then this hangs the close. // is hung.. then this hangs the close.
// closeStreams(); // closeStreams();

View File

@ -87,7 +87,9 @@ public class TcpTransportFactory extends TransportFactory {
transport = new TransportLogger(transport); transport = new TransportLogger(transport);
} }
transport = new InactivityMonitor(transport); if (isUseInactivityMonitor(transport)) {
transport = new InactivityMonitor(transport);
}
// Only need the WireFormatNegotiator if using openwire // Only need the WireFormatNegotiator if using openwire
if( format instanceof OpenWireFormat ) { if( format instanceof OpenWireFormat ) {
@ -97,6 +99,13 @@ public class TcpTransportFactory extends TransportFactory {
return transport; return transport;
} }
/**
* Returns true if the inactivity monitor should be used on the transport
*/
protected boolean isUseInactivityMonitor(Transport transport) {
return true;
}
protected Transport createTransport(URI location,WireFormat wf) throws UnknownHostException,IOException{ protected Transport createTransport(URI location,WireFormat wf) throws UnknownHostException,IOException{
URI localLocation=null; URI localLocation=null;
String path=location.getPath(); String path=location.getPath();

View File

@ -20,6 +20,7 @@ package org.apache.activemq.broker;
import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerPlugin; import org.apache.activemq.broker.BrokerPlugin;
import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.jmx.ManagementContext;
import org.apache.activemq.broker.util.UDPTraceBrokerPlugin; import org.apache.activemq.broker.util.UDPTraceBrokerPlugin;
import org.apache.activemq.broker.view.ConnectionDotFilePlugin; import org.apache.activemq.broker.view.ConnectionDotFilePlugin;
import org.apache.activemq.broker.view.DestinationDotFilePlugin; import org.apache.activemq.broker.view.DestinationDotFilePlugin;
@ -52,8 +53,15 @@ public class Main {
// URI(brokerURI)); // URI(brokerURI));
BrokerService broker = new BrokerService(); BrokerService broker = new BrokerService();
broker.setPersistent(false); broker.setPersistent(false);
// for running on Java 5 without mx4j
ManagementContext managementContext = broker.getManagementContext();
managementContext.setFindTigerMbeanServer(true);
managementContext.setUseMBeanServer(true);
managementContext.setCreateConnector(false);
broker.setUseJmx(true); broker.setUseJmx(true);
broker.setPlugins(new BrokerPlugin[] { new ConnectionDotFilePlugin(), new UDPTraceBrokerPlugin() }); //broker.setPlugins(new BrokerPlugin[] { new ConnectionDotFilePlugin(), new UDPTraceBrokerPlugin() });
broker.addConnector("tcp://localhost:61616"); broker.addConnector("tcp://localhost:61616");
broker.addConnector("stomp://localhost:61613"); broker.addConnector("stomp://localhost:61613");
broker.start(); broker.start();

View File

@ -19,14 +19,13 @@ package org.apache.activemq.transport.stomp;
import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.CombinationTestSupport; import org.apache.activemq.CombinationTestSupport;
import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.*;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTextMessage; import org.apache.activemq.command.ActiveMQTextMessage;
import org.apache.activemq.transport.stomp.Stomp; import org.apache.activemq.transport.stomp.Stomp;
import javax.jms.*; import javax.jms.*;
import javax.jms.Connection;
import java.io.ByteArrayOutputStream; import java.io.ByteArrayOutputStream;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
@ -64,8 +63,6 @@ public class StompTest extends CombinationTestSupport {
session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE); session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
queue = new ActiveMQQueue(getQueueName()); queue = new ActiveMQQueue(getQueueName());
connection.start(); connection.start();
} }
protected Socket createSocket(URI connectUri) throws IOException { protected Socket createSocket(URI connectUri) throws IOException {
@ -78,7 +75,9 @@ public class StompTest extends CombinationTestSupport {
protected void tearDown() throws Exception { protected void tearDown() throws Exception {
connection.close(); connection.close();
stompSocket.close(); if (stompSocket != null) {
stompSocket.close();
}
broker.stop(); broker.stop();
} }
@ -681,6 +680,37 @@ public class StompTest extends CombinationTestSupport {
assertEquals("second message", message.getText().trim()); assertEquals("second message", message.getText().trim());
} }
public void testDisconnectedClientsAreRemovedFromTheBroker() throws Exception {
assertClients(1);
String frame =
"CONNECT\n" +
"login: brianm\n" +
"passcode: wombats\n\n"+
Stomp.NULL;
sendFrame(frame);
// This test case is currently failing
waitForFrameToTakeEffect();
assertClients(2);
// now lets kill the socket
stompSocket.close();
stompSocket = null;
Thread.sleep(2000);
assertClients(1);
}
protected void assertClients(int expected) throws Exception {
org.apache.activemq.broker.Connection[] clients = broker.getBroker().getClients();
int actual = clients.length;
assertEquals("Number of clients", expected, actual);
}
protected void waitForFrameToTakeEffect() throws InterruptedException { protected void waitForFrameToTakeEffect() throws InterruptedException {
// bit of a dirty hack :) // bit of a dirty hack :)
// another option would be to force some kind of receipt to be returned // another option would be to force some kind of receipt to be returned