diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java index bd923fc4a3..961127e25f 100644 --- a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java @@ -44,6 +44,7 @@ import org.apache.activemq.command.CommandTypes; import org.apache.activemq.command.ConnectionError; import org.apache.activemq.command.ConnectionId; import org.apache.activemq.command.ConnectionInfo; +import org.apache.activemq.command.ConsumerControl; import org.apache.activemq.command.ConsumerId; import org.apache.activemq.command.ConsumerInfo; import org.apache.activemq.command.DestinationInfo; @@ -485,13 +486,13 @@ public class ProtocolConverter { throw new ProtocolException("SUBSCRIBE received without a subscription id!"); } - ActiveMQDestination actualDest = translator.convertDestination(this, destination, true); + final ActiveMQDestination actualDest = translator.convertDestination(this, destination, true); if (actualDest == null) { throw new ProtocolException("Invalid Destination."); } - ConsumerId id = new ConsumerId(sessionId, consumerIdGenerator.getNextSequenceId()); + final ConsumerId id = new ConsumerId(sessionId, consumerIdGenerator.getNextSequenceId()); ConsumerInfo consumerInfo = new ConsumerInfo(id); consumerInfo.setPrefetchSize(1000); consumerInfo.setDispatchAsync(true); @@ -540,9 +541,45 @@ public class ProtocolConverter { subscriptions.put(subscriptionId, stompSubscription); } - // dispatch can beat the receipt so send it early - sendReceipt(command); - sendToActiveMQ(consumerInfo, null); + final String receiptId = command.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED); + if (receiptId != null && consumerInfo.getPrefetchSize() > 0) { + + final StompFrame cmd = command; + final int prefetch = consumerInfo.getPrefetchSize(); + + // Since dispatch could beat the receipt we set prefetch to zero to start and then + // once we've sent our Receipt we are safe to turn on dispatch if the response isn't + // an error message. + consumerInfo.setPrefetchSize(0); + + final ResponseHandler handler = new ResponseHandler() { + public void onResponse(ProtocolConverter converter, Response response) throws IOException { + if (response.isException()) { + // Generally a command can fail.. but that does not invalidate the connection. + // We report back the failure but we don't close the connection. + Throwable exception = ((ExceptionResponse)response).getException(); + handleException(exception, cmd); + } else { + StompFrame sc = new StompFrame(); + sc.setAction(Stomp.Responses.RECEIPT); + sc.setHeaders(new HashMap(1)); + sc.getHeaders().put(Stomp.Headers.Response.RECEIPT_ID, receiptId); + stompTransport.sendToStomp(sc); + + ConsumerControl control = new ConsumerControl(); + control.setPrefetch(prefetch); + control.setDestination(actualDest); + control.setConsumerId(id); + + sendToActiveMQ(control, null); + } + } + }; + + sendToActiveMQ(consumerInfo, handler); + } else { + sendToActiveMQ(consumerInfo, createResponseHandler(command)); + } } protected void onStompUnsubscribe(StompFrame command) throws ProtocolException { diff --git a/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java b/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java index c9f478536d..20195010ca 100644 --- a/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java @@ -194,7 +194,6 @@ public class StompTest extends CombinationTestSupport { String f = stompConnection.receiveFrame(); assertTrue(f.startsWith("CONNECTED")); assertTrue(f.indexOf("response-id:1") >= 0); - } public void testSendMessage() throws Exception { @@ -383,7 +382,6 @@ public class StompTest extends CombinationTestSupport { stompConnection.sendFrame(frame); } - public void testSubscriptionReceipts() throws Exception { final int done = 500; int count = 0; @@ -392,7 +390,6 @@ public class StompTest extends CombinationTestSupport { URI connectUri = new URI(bindAddress); do { - StompConnection sender = new StompConnection(); sender.open(createSocket(connectUri)); String frame = "CONNECT\n" + "login:system\n" + "passcode:manager\n\n" + Stomp.NULL; @@ -434,7 +431,6 @@ public class StompTest extends CombinationTestSupport { receiver.disconnect(); } while (count < done); - } public void testSubscribeWithAutoAck() throws Exception { @@ -570,7 +566,6 @@ public class StompTest extends CombinationTestSupport { frame = stompConnection.receiveFrame(); assertTrue(frame.startsWith("MESSAGE")); - frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL; stompConnection.sendFrame(frame); } @@ -756,7 +751,6 @@ public class StompTest extends CombinationTestSupport { sendMessage(getName()); StompFrame msg = stompConnection.receive(); - assertTrue(msg.getAction().equals("MESSAGE")); HashMap ackHeaders = new HashMap(); @@ -938,9 +932,43 @@ public class StompTest extends CombinationTestSupport { frame = "SUBSCRIBE\n" + "destination:/queue/USERS." + getQueueName() + "\n" + "ack:auto\n\n" + Stomp.NULL; stompConnection.sendFrame(frame); - String f = stompConnection.receiveFrame(); - assertTrue(f.startsWith("ERROR")); + frame = stompConnection.receiveFrame(); + assertTrue(frame.startsWith("ERROR")); + } + public void testSubscribeWithReceiptNotAuthorized() throws Exception { + + String frame = "CONNECT\n" + "login:guest\n" + "passcode:password\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + frame = stompConnection.receiveFrame(); + assertTrue(frame.startsWith("CONNECTED")); + + frame = "SUBSCRIBE\n" + "destination:/queue/USERS." + getQueueName() + "\n" + + "ack:auto\n" + "receipt:1\n" + "\n" + Stomp.NULL; + + stompConnection.sendFrame(frame); + frame = stompConnection.receiveFrame(); + assertTrue(frame.startsWith("ERROR")); + assertTrue("Error Frame did not contain receipt-id", frame.indexOf(Stomp.Headers.Response.RECEIPT_ID) >= 0); + } + + public void testSubscribeWithInvalidSelector() throws Exception { + + String frame = "CONNECT\n" + "login:system\n" + "passcode:manager\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + frame = stompConnection.receiveFrame(); + assertTrue(frame.startsWith("CONNECTED")); + + frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" + "selector:foo.bar = 1\n" + "ack:auto\n\n" + Stomp.NULL; + + stompConnection.sendFrame(frame); + frame = stompConnection.receiveFrame(); + assertTrue(frame.startsWith("ERROR")); + + frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); } public void testTransformationUnknownTranslator() throws Exception { @@ -1991,7 +2019,6 @@ public class StompTest extends CombinationTestSupport { assertTrue("Receipt Frame: " + frame, frame.trim().startsWith("RECEIPT")); assertTrue("Receipt contains correct receipt-id " + frame, frame.indexOf(Stomp.Headers.Response.RECEIPT_ID) >= 0); - // The subscription should receive a response with the ReplyTo property set. StompFrame received = responder.receive(); assertNotNull(received);