diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java index 515a0f0a13..f63a85b449 100644 --- a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java @@ -45,6 +45,7 @@ import org.apache.activemq.command.MessageDispatch; import org.apache.activemq.command.MessageId; import org.apache.activemq.command.ProducerId; import org.apache.activemq.command.ProducerInfo; +import org.apache.activemq.command.RemoveSubscriptionInfo; import org.apache.activemq.command.Response; import org.apache.activemq.command.SessionId; import org.apache.activemq.command.SessionInfo; @@ -412,6 +413,17 @@ public class ProtocolConverter { if (subscriptionId == null && destination == null) { throw new ProtocolException("Must specify the subscriptionId or the destination you are unsubscribing from"); } + + // check if it is a durable subscription + String durable = command.getHeaders().get("activemq.subscriptionName"); + if (durable != null) { + RemoveSubscriptionInfo info = new RemoveSubscriptionInfo(); + info.setClientId(durable); + info.setSubscriptionName(durable); + info.setConnectionId(connectionId); + sendToActiveMQ(info, createResponseHandler(command)); + return; + } // TODO: Unsubscribing using a destination is a bit wierd if multiple // subscriptions @@ -426,7 +438,7 @@ public class ProtocolConverter { return; } } - + throw new ProtocolException("No subscription matched."); } diff --git a/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java b/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java index b5364724d7..8dcfd6b417 100644 --- a/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java @@ -35,11 +35,15 @@ import javax.jms.MessageProducer; import javax.jms.ObjectMessage; import javax.jms.Session; import javax.jms.TextMessage; +import javax.management.MBeanServer; +import javax.management.MBeanServerInvocationHandler; +import javax.management.ObjectName; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.CombinationTestSupport; import org.apache.activemq.broker.BrokerFactory; import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.jmx.BrokerViewMBean; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQTextMessage; import org.apache.commons.logging.Log; @@ -857,6 +861,58 @@ public class StompTest extends CombinationTestSupport { stompConnection.sendFrame(frame); } + public void testDurableUnsub() throws Exception { + // get broker JMX view + MBeanServer mbeanServer = broker.getManagementContext().getMBeanServer(); + + String domain = "org.apache.activemq"; + ObjectName brokerName = new ObjectName(domain + ":Type=Broker,BrokerName=localhost"); + + BrokerViewMBean view = (BrokerViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, brokerName, BrokerViewMBean.class, true); + + // connect + String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\nclient-id:test\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + frame = stompConnection.receiveFrame(); + assertTrue(frame.startsWith("CONNECTED")); + assertEquals(view.getDurableTopicSubscribers().length, 0); + + // subscribe + frame = "SUBSCRIBE\n" + "destination:/topic/" + getQueueName() + "\n" + "ack:auto\nactivemq.subscriptionName:test\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + // wait a bit for MBean to get refreshed + try { + Thread.sleep(100); + } catch (InterruptedException e){} + + assertEquals(view.getDurableTopicSubscribers().length, 1); + // disconnect + frame = "DISCONNECT\nclient-id:test\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + try { + Thread.sleep(100); + } catch (InterruptedException e){} + + //reconnect + stompConnect(); + // connect + frame = "CONNECT\n" + "login: system\n" + "passcode: manager\nclient-id:test\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + frame = stompConnection.receiveFrame(); + assertTrue(frame.startsWith("CONNECTED")); + + // unsubscribe + frame = "UNSUBSCRIBE\n" + "destination:/topic/" + getQueueName() + "\n" + "ack:auto\nactivemq.subscriptionName:test\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + try { + Thread.sleep(100); + } catch (InterruptedException e){} + assertEquals(view.getDurableTopicSubscribers().length, 0); + } + protected void assertClients(int expected) throws Exception { org.apache.activemq.broker.Connection[] clients = broker.getBroker().getClients(); int actual = clients.length;