mirror of https://github.com/apache/activemq.git
https://issues.apache.org/jira/browse/AMQ-3497 - send subscription receipt early such that it cannot be preceeded by a message dispatch
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1170523 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
68a8b6d62a
commit
ba0c1e59b2
|
@ -539,8 +539,9 @@ public class ProtocolConverter {
|
|||
subscriptions.put(subscriptionId, stompSubscription);
|
||||
}
|
||||
|
||||
sendToActiveMQ(consumerInfo, null);
|
||||
// dispatch can beat the receipt so send it early
|
||||
sendReceipt(command);
|
||||
sendToActiveMQ(consumerInfo, null);
|
||||
}
|
||||
|
||||
protected void onStompUnsubscribe(StompFrame command) throws ProtocolException {
|
||||
|
|
|
@ -113,6 +113,7 @@ public class StompTest extends CombinationTestSupport {
|
|||
+ "}}";
|
||||
}
|
||||
broker = BrokerFactory.createBroker(new URI(confUri));
|
||||
broker.setDeleteAllMessagesOnStartup(true);
|
||||
broker.start();
|
||||
broker.waitUntilStarted();
|
||||
|
||||
|
@ -375,6 +376,51 @@ public class StompTest extends CombinationTestSupport {
|
|||
stompConnection.sendFrame(frame);
|
||||
}
|
||||
|
||||
|
||||
public void testSubscriptionReceipts() throws Exception {
|
||||
final int done = 500;
|
||||
int count = 0;
|
||||
|
||||
URI connectUri = new URI(bindAddress);
|
||||
|
||||
do {
|
||||
|
||||
StompConnection sender = new StompConnection();
|
||||
sender.open(createSocket(connectUri));
|
||||
String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL;
|
||||
sender.sendFrame(frame);
|
||||
|
||||
frame = sender.receiveFrame();
|
||||
assertTrue(frame.startsWith("CONNECTED"));
|
||||
|
||||
frame = "SEND\n" + "destination:/queue/" + getQueueName() + "\n\n" + "Hello World:" + count + Stomp.NULL;
|
||||
sender.sendFrame(frame);
|
||||
|
||||
sender.disconnect();
|
||||
|
||||
StompConnection receiver = new StompConnection();
|
||||
receiver.open(createSocket(connectUri));
|
||||
|
||||
frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL;
|
||||
receiver.sendFrame(frame);
|
||||
|
||||
frame = receiver.receiveFrame();
|
||||
assertTrue(frame.startsWith("CONNECTED"));
|
||||
|
||||
frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" + "receipt: " + (count++) + "\n\n" + Stomp.NULL;
|
||||
receiver.sendFrame(frame);
|
||||
|
||||
frame = receiver.receiveFrame();
|
||||
assertTrue("" + frame, frame.startsWith("RECEIPT"));
|
||||
assertTrue("Receipt contains receipt-id", frame.indexOf(Stomp.Headers.Response.RECEIPT_ID) >= 0);
|
||||
LOG.info("received: " + frame.substring(frame.indexOf(Stomp.Headers.Response.RECEIPT_ID)));
|
||||
frame = receiver.receiveFrame();
|
||||
assertTrue("" + frame, frame.startsWith("MESSAGE"));
|
||||
receiver.disconnect();
|
||||
} while (count < done);
|
||||
|
||||
}
|
||||
|
||||
public void testSubscribeWithAutoAck() throws Exception {
|
||||
|
||||
String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL;
|
||||
|
|
Loading…
Reference in New Issue