mirror of https://github.com/apache/activemq.git
Added patch for https://issues.apache.org/activemq/browse/AMQ-1890
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@692181 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
eed43a8f05
commit
4c92244758
|
@ -45,6 +45,7 @@ import org.apache.activemq.command.MessageDispatch;
|
||||||
import org.apache.activemq.command.MessageId;
|
import org.apache.activemq.command.MessageId;
|
||||||
import org.apache.activemq.command.ProducerId;
|
import org.apache.activemq.command.ProducerId;
|
||||||
import org.apache.activemq.command.ProducerInfo;
|
import org.apache.activemq.command.ProducerInfo;
|
||||||
|
import org.apache.activemq.command.RemoveSubscriptionInfo;
|
||||||
import org.apache.activemq.command.Response;
|
import org.apache.activemq.command.Response;
|
||||||
import org.apache.activemq.command.SessionId;
|
import org.apache.activemq.command.SessionId;
|
||||||
import org.apache.activemq.command.SessionInfo;
|
import org.apache.activemq.command.SessionInfo;
|
||||||
|
@ -412,6 +413,17 @@ public class ProtocolConverter {
|
||||||
if (subscriptionId == null && destination == null) {
|
if (subscriptionId == null && destination == null) {
|
||||||
throw new ProtocolException("Must specify the subscriptionId or the destination you are unsubscribing from");
|
throw new ProtocolException("Must specify the subscriptionId or the destination you are unsubscribing from");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// check if it is a durable subscription
|
||||||
|
String durable = command.getHeaders().get("activemq.subscriptionName");
|
||||||
|
if (durable != null) {
|
||||||
|
RemoveSubscriptionInfo info = new RemoveSubscriptionInfo();
|
||||||
|
info.setClientId(durable);
|
||||||
|
info.setSubscriptionName(durable);
|
||||||
|
info.setConnectionId(connectionId);
|
||||||
|
sendToActiveMQ(info, createResponseHandler(command));
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
// TODO: Unsubscribing using a destination is a bit wierd if multiple
|
// TODO: Unsubscribing using a destination is a bit wierd if multiple
|
||||||
// subscriptions
|
// subscriptions
|
||||||
|
@ -426,7 +438,7 @@ public class ProtocolConverter {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
throw new ProtocolException("No subscription matched.");
|
throw new ProtocolException("No subscription matched.");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -35,11 +35,15 @@ import javax.jms.MessageProducer;
|
||||||
import javax.jms.ObjectMessage;
|
import javax.jms.ObjectMessage;
|
||||||
import javax.jms.Session;
|
import javax.jms.Session;
|
||||||
import javax.jms.TextMessage;
|
import javax.jms.TextMessage;
|
||||||
|
import javax.management.MBeanServer;
|
||||||
|
import javax.management.MBeanServerInvocationHandler;
|
||||||
|
import javax.management.ObjectName;
|
||||||
|
|
||||||
import org.apache.activemq.ActiveMQConnectionFactory;
|
import org.apache.activemq.ActiveMQConnectionFactory;
|
||||||
import org.apache.activemq.CombinationTestSupport;
|
import org.apache.activemq.CombinationTestSupport;
|
||||||
import org.apache.activemq.broker.BrokerFactory;
|
import org.apache.activemq.broker.BrokerFactory;
|
||||||
import org.apache.activemq.broker.BrokerService;
|
import org.apache.activemq.broker.BrokerService;
|
||||||
|
import org.apache.activemq.broker.jmx.BrokerViewMBean;
|
||||||
import org.apache.activemq.command.ActiveMQQueue;
|
import org.apache.activemq.command.ActiveMQQueue;
|
||||||
import org.apache.activemq.command.ActiveMQTextMessage;
|
import org.apache.activemq.command.ActiveMQTextMessage;
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
|
@ -857,6 +861,58 @@ public class StompTest extends CombinationTestSupport {
|
||||||
stompConnection.sendFrame(frame);
|
stompConnection.sendFrame(frame);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void testDurableUnsub() throws Exception {
|
||||||
|
// get broker JMX view
|
||||||
|
MBeanServer mbeanServer = broker.getManagementContext().getMBeanServer();
|
||||||
|
|
||||||
|
String domain = "org.apache.activemq";
|
||||||
|
ObjectName brokerName = new ObjectName(domain + ":Type=Broker,BrokerName=localhost");
|
||||||
|
|
||||||
|
BrokerViewMBean view = (BrokerViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, brokerName, BrokerViewMBean.class, true);
|
||||||
|
|
||||||
|
// connect
|
||||||
|
String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\nclient-id:test\n\n" + Stomp.NULL;
|
||||||
|
stompConnection.sendFrame(frame);
|
||||||
|
|
||||||
|
frame = stompConnection.receiveFrame();
|
||||||
|
assertTrue(frame.startsWith("CONNECTED"));
|
||||||
|
assertEquals(view.getDurableTopicSubscribers().length, 0);
|
||||||
|
|
||||||
|
// subscribe
|
||||||
|
frame = "SUBSCRIBE\n" + "destination:/topic/" + getQueueName() + "\n" + "ack:auto\nactivemq.subscriptionName:test\n\n" + Stomp.NULL;
|
||||||
|
stompConnection.sendFrame(frame);
|
||||||
|
// wait a bit for MBean to get refreshed
|
||||||
|
try {
|
||||||
|
Thread.sleep(100);
|
||||||
|
} catch (InterruptedException e){}
|
||||||
|
|
||||||
|
assertEquals(view.getDurableTopicSubscribers().length, 1);
|
||||||
|
// disconnect
|
||||||
|
frame = "DISCONNECT\nclient-id:test\n\n" + Stomp.NULL;
|
||||||
|
stompConnection.sendFrame(frame);
|
||||||
|
try {
|
||||||
|
Thread.sleep(100);
|
||||||
|
} catch (InterruptedException e){}
|
||||||
|
|
||||||
|
//reconnect
|
||||||
|
stompConnect();
|
||||||
|
// connect
|
||||||
|
frame = "CONNECT\n" + "login: system\n" + "passcode: manager\nclient-id:test\n\n" + Stomp.NULL;
|
||||||
|
stompConnection.sendFrame(frame);
|
||||||
|
frame = stompConnection.receiveFrame();
|
||||||
|
assertTrue(frame.startsWith("CONNECTED"));
|
||||||
|
|
||||||
|
// unsubscribe
|
||||||
|
frame = "UNSUBSCRIBE\n" + "destination:/topic/" + getQueueName() + "\n" + "ack:auto\nactivemq.subscriptionName:test\n\n" + Stomp.NULL;
|
||||||
|
stompConnection.sendFrame(frame);
|
||||||
|
frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
|
||||||
|
stompConnection.sendFrame(frame);
|
||||||
|
try {
|
||||||
|
Thread.sleep(100);
|
||||||
|
} catch (InterruptedException e){}
|
||||||
|
assertEquals(view.getDurableTopicSubscribers().length, 0);
|
||||||
|
}
|
||||||
|
|
||||||
protected void assertClients(int expected) throws Exception {
|
protected void assertClients(int expected) throws Exception {
|
||||||
org.apache.activemq.broker.Connection[] clients = broker.getBroker().getClients();
|
org.apache.activemq.broker.Connection[] clients = broker.getBroker().getClients();
|
||||||
int actual = clients.length;
|
int actual = clients.length;
|
||||||
|
|
Loading…
Reference in New Issue