mirror of https://github.com/apache/activemq.git
https://issues.apache.org/jira/browse/AMQ-3481 - stomp deadlock
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1163940 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
cf914d16af
commit
603b215841
|
@ -538,7 +538,9 @@ public class ProtocolConverter {
|
||||||
if (subscriptionId != null) {
|
if (subscriptionId != null) {
|
||||||
subscriptions.put(subscriptionId, stompSubscription);
|
subscriptions.put(subscriptionId, stompSubscription);
|
||||||
}
|
}
|
||||||
sendToActiveMQ(consumerInfo, createResponseHandler(command));
|
|
||||||
|
sendToActiveMQ(consumerInfo, null);
|
||||||
|
sendReceipt(command);
|
||||||
}
|
}
|
||||||
|
|
||||||
protected void onStompUnsubscribe(StompFrame command) throws ProtocolException {
|
protected void onStompUnsubscribe(StompFrame command) throws ProtocolException {
|
||||||
|
@ -840,4 +842,19 @@ public class ProtocolConverter {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected void sendReceipt(StompFrame command) {
|
||||||
|
final String receiptId = command.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED);
|
||||||
|
if (receiptId != null) {
|
||||||
|
StompFrame sc = new StompFrame();
|
||||||
|
sc.setAction(Stomp.Responses.RECEIPT);
|
||||||
|
sc.setHeaders(new HashMap<String, String>(1));
|
||||||
|
sc.getHeaders().put(Stomp.Headers.Response.RECEIPT_ID, receiptId);
|
||||||
|
try {
|
||||||
|
sendToStomp(sc);
|
||||||
|
} catch (IOException e) {
|
||||||
|
LOG.warn("Could not send a receipt for " + command, e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1559,6 +1559,44 @@ public class StompTest extends CombinationTestSupport {
|
||||||
assertNotNull(stompMessage);
|
assertNotNull(stompMessage);
|
||||||
assertNull(stompMessage.getHeaders().get(Stomp.Headers.Message.PERSISTENT));
|
assertNull(stompMessage.getHeaders().get(Stomp.Headers.Message.PERSISTENT));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void testReceiptNewQueue() throws Exception {
|
||||||
|
|
||||||
|
String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL;
|
||||||
|
stompConnection.sendFrame(frame);
|
||||||
|
|
||||||
|
frame = stompConnection.receiveFrame();
|
||||||
|
assertTrue(frame.startsWith("CONNECTED"));
|
||||||
|
|
||||||
|
frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + 1234 + "\n" + "id:8fee4b8-4e5c9f66-4703-e936-3" + "\n" + "receipt:8fee4b8-4e5c9f66-4703-e936-2" + "\n\n" + Stomp.NULL;
|
||||||
|
stompConnection.sendFrame(frame);
|
||||||
|
|
||||||
|
StompFrame receipt = stompConnection.receive();
|
||||||
|
assertTrue(receipt.getAction().startsWith("RECEIPT"));
|
||||||
|
assertEquals("8fee4b8-4e5c9f66-4703-e936-2", receipt.getHeaders().get("receipt-id"));
|
||||||
|
|
||||||
|
|
||||||
|
frame = "SEND\n destination:/queue/" + getQueueName() + 123 + "\ncontent-length:0" + " \n\n" + Stomp.NULL;
|
||||||
|
stompConnection.sendFrame(frame);
|
||||||
|
|
||||||
|
|
||||||
|
frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + 123 + "\n" + "id:8fee4b8-4e5c9f66-4703-e936-2" + "\n" + "receipt:8fee4b8-4e5c9f66-4703-e936-1" + "\n\n" + Stomp.NULL;
|
||||||
|
stompConnection.sendFrame(frame);
|
||||||
|
|
||||||
|
receipt = stompConnection.receive();
|
||||||
|
assertTrue(receipt.getAction().startsWith("RECEIPT"));
|
||||||
|
assertEquals("8fee4b8-4e5c9f66-4703-e936-1", receipt.getHeaders().get("receipt-id"));
|
||||||
|
|
||||||
|
StompFrame message = stompConnection.receive();
|
||||||
|
assertTrue(message.getAction().startsWith("MESSAGE"));
|
||||||
|
|
||||||
|
String length = message.getHeaders().get("content-length");
|
||||||
|
assertEquals("0", length);
|
||||||
|
assertEquals(0, message.getContent().length);
|
||||||
|
|
||||||
|
frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
|
||||||
|
stompConnection.sendFrame(frame);
|
||||||
|
}
|
||||||
|
|
||||||
protected void assertClients(int expected) throws Exception {
|
protected void assertClients(int expected) throws Exception {
|
||||||
org.apache.activemq.broker.Connection[] clients = broker.getBroker().getClients();
|
org.apache.activemq.broker.Connection[] clients = broker.getBroker().getClients();
|
||||||
|
|
Loading…
Reference in New Issue