Fix some issues with STOMP v1.2 protocol support.
This commit is contained in:
Timothy Bish 2015-03-02 16:12:44 -05:00
parent bb83bf5746
commit 4fe2bd534a
2 changed files with 106 additions and 7 deletions

View File

@ -557,7 +557,7 @@ public class ProtocolConverter {
String subscriptionId = headers.get(Stomp.Headers.Subscribe.ID); String subscriptionId = headers.get(Stomp.Headers.Subscribe.ID);
String destination = headers.get(Stomp.Headers.Subscribe.DESTINATION); String destination = headers.get(Stomp.Headers.Subscribe.DESTINATION);
if (this.version.equals(Stomp.V1_1) && subscriptionId == null) { if (!this.version.equals(Stomp.V1_0) && subscriptionId == null) {
throw new ProtocolException("SUBSCRIBE received without a subscription id!"); throw new ProtocolException("SUBSCRIBE received without a subscription id!");
} }
@ -675,7 +675,7 @@ public class ProtocolConverter {
} }
String subscriptionId = headers.get(Stomp.Headers.Unsubscribe.ID); String subscriptionId = headers.get(Stomp.Headers.Unsubscribe.ID);
if (this.version.equals(Stomp.V1_1) && subscriptionId == null) { if (!this.version.equals(Stomp.V1_0) && subscriptionId == null) {
throw new ProtocolException("UNSUBSCRIBE received without a subscription id!"); throw new ProtocolException("UNSUBSCRIBE received without a subscription id!");
} }
@ -686,7 +686,7 @@ public class ProtocolConverter {
// check if it is a durable subscription // check if it is a durable subscription
String durable = command.getHeaders().get("activemq.subscriptionName"); String durable = command.getHeaders().get("activemq.subscriptionName");
String clientId = durable; String clientId = durable;
if (this.version.equals(Stomp.V1_1)) { if (!this.version.equals(Stomp.V1_0)) {
clientId = connectionInfo.getClientId(); clientId = connectionInfo.getClientId();
} }

View File

@ -26,6 +26,7 @@ import java.util.concurrent.TimeUnit;
import javax.jms.Connection; import javax.jms.Connection;
import org.apache.activemq.broker.TransportConnector; import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.jmx.BrokerViewMBean;
import org.apache.activemq.util.Wait; import org.apache.activemq.util.Wait;
import org.junit.Test; import org.junit.Test;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -319,9 +320,13 @@ public class Stomp12Test extends StompTestSupport {
frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL; frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame); stompConnection.sendFrame(frame);
try { Wait.waitFor(new Wait.Condition() {
Thread.sleep(400);
} catch (InterruptedException e){} @Override
public boolean isSatisified() throws Exception {
return getProxyToBroker().getCurrentConnectionsCount() <= 1;
}
}, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(25));
// reconnect and send some messages to the offline subscribers and then try to get // reconnect and send some messages to the offline subscribers and then try to get
// them after subscribing again. // them after subscribing again.
@ -411,7 +416,7 @@ public class Stomp12Test extends StompTestSupport {
assertTrue(browseDone.getHeaders().get(Stomp.Headers.Message.DESTINATION) != null); assertTrue(browseDone.getHeaders().get(Stomp.Headers.Message.DESTINATION) != null);
String unsub = "UNSUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" + String unsub = "UNSUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" +
"receipt:1" + "id:12345\n\n" + Stomp.NULL; "receipt:1\n" + "id:12345\n\n" + Stomp.NULL;
stompConnection.sendFrame(unsub); stompConnection.sendFrame(unsub);
StompFrame stompFrame = stompConnection.receive(); StompFrame stompFrame = stompConnection.receive();
@ -466,4 +471,98 @@ public class Stomp12Test extends StompTestSupport {
String frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL; String frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame); stompConnection.sendFrame(frame);
} }
@Test(timeout = 60000)
public void testDurableSubAndUnSub() throws Exception {
BrokerViewMBean view = getProxyToBroker();
String connectFrame = "STOMP\n" +
"login:system\n" +
"passcode:manager\n" +
"accept-version:1.2\n" +
"host:localhost\n" +
"client-id:durableSubTest\n" +
"\n" + Stomp.NULL;
stompConnection.sendFrame(connectFrame);
String frame = stompConnection.receiveFrame();
LOG.debug("Broker sent: " + frame);
assertTrue(frame.startsWith("CONNECTED"));
assertEquals(view.getDurableTopicSubscribers().length, 0);
// subscribe to destination durably
frame = "SUBSCRIBE\n" +
"destination:/topic/" + getQueueName() + "1" + "\n" +
"ack:auto\n" + "receipt:1\n" + "id:durablesub-1\n" +
"activemq.subscriptionName:test1\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
StompFrame receipt = stompConnection.receive();
LOG.debug("Broker sent: " + receipt);
assertTrue(receipt.getAction().startsWith("RECEIPT"));
assertEquals("1", receipt.getHeaders().get("receipt-id"));
assertEquals(view.getDurableTopicSubscribers().length, 1);
frame = "DISCONNECT\nclient-id:test\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
stompConnection.close();
Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return getProxyToBroker().getCurrentConnectionsCount() <= 1;
}
}, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(25));
stompConnect();
stompConnection.sendFrame(connectFrame);
frame = stompConnection.receiveFrame();
LOG.debug("Broker sent: " + frame);
assertTrue(frame.startsWith("CONNECTED"));
assertEquals(view.getDurableTopicSubscribers().length, 0);
assertEquals(view.getInactiveDurableTopicSubscribers().length, 1);
// unsubscribe from topic
frame = "UNSUBSCRIBE\n" + "destination:/topic/" + getQueueName() + "1\n" +
"id:durablesub-1\n" + "receipt:3\n" +
"activemq.subscriptionName:test1\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
receipt = stompConnection.receive();
LOG.debug("Broker sent: " + frame);
assertTrue(receipt.getAction().startsWith("RECEIPT"));
assertEquals("3", receipt.getHeaders().get("receipt-id"));
assertEquals(view.getInactiveDurableTopicSubscribers().length, 0);
frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
}
@Test(timeout = 60000)
public void testSubscribeWithNoId() throws Exception {
String connectFrame = "STOMP\n" +
"login:system\n" +
"passcode:manager\n" +
"accept-version:1.2\n" +
"host:localhost\n" +
"\n" + Stomp.NULL;
stompConnection.sendFrame(connectFrame);
String f = stompConnection.receiveFrame();
LOG.debug("Broker sent: " + f);
assertTrue(f.startsWith("CONNECTED"));
String frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" +
"ack:auto\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
frame = stompConnection.receiveFrame();
assertTrue(frame.startsWith("ERROR"));
frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
}
} }