diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java index f129e8fc08..4004d3f1af 100644 --- a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java @@ -256,7 +256,7 @@ public class ProtocolConverter { checkConnected(); - if (this.version.equals(Stomp.V1_1)) { + if (this.version.equals(Stomp.V1_0)) { throw new ProtocolException("NACK received but connection is in v1.0 mode."); } diff --git a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java index 6389f97cfe..5f2f0a930c 100644 --- a/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java +++ b/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java @@ -16,6 +16,9 @@ */ package org.apache.activemq.transport.stomp; +import org.apache.activemq.command.*; + +import javax.jms.JMSException; import java.io.IOException; import java.util.Iterator; import java.util.LinkedHashMap; @@ -23,17 +26,6 @@ import java.util.LinkedList; import java.util.Map; import java.util.Map.Entry; -import javax.jms.JMSException; - -import org.apache.activemq.command.ActiveMQBytesMessage; -import org.apache.activemq.command.ActiveMQDestination; -import org.apache.activemq.command.ActiveMQMessage; -import org.apache.activemq.command.ConsumerInfo; -import org.apache.activemq.command.MessageAck; -import org.apache.activemq.command.MessageDispatch; -import org.apache.activemq.command.MessageId; -import org.apache.activemq.command.TransactionId; - /** * Keeps track of the STOMP subscription so that acking is correctly done. * @@ -197,7 +189,7 @@ public class StompSubscription { } dispatchedMessage.remove(msgId); - return null; + return ack; } public String getAckMode() { diff --git a/activemq-core/src/test/java/org/apache/activemq/transport/stomp/Stomp11Test.java b/activemq-core/src/test/java/org/apache/activemq/transport/stomp/Stomp11Test.java index 4c90470c24..c61347bfb6 100644 --- a/activemq-core/src/test/java/org/apache/activemq/transport/stomp/Stomp11Test.java +++ b/activemq-core/src/test/java/org/apache/activemq/transport/stomp/Stomp11Test.java @@ -16,20 +16,6 @@ */ package org.apache.activemq.transport.stomp; -import java.io.DataInputStream; -import java.io.IOException; -import java.net.Socket; -import java.net.SocketTimeoutException; -import java.net.URI; -import java.net.URISyntaxException; -import java.net.UnknownHostException; -import java.util.concurrent.TimeUnit; - -import javax.jms.Connection; -import javax.jms.MessageConsumer; -import javax.jms.MessageProducer; -import javax.jms.Session; -import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.CombinationTestSupport; import org.apache.activemq.broker.BrokerFactory; @@ -39,6 +25,12 @@ import org.apache.activemq.command.ActiveMQTextMessage; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.jms.*; +import java.io.DataInputStream; +import java.io.IOException; +import java.net.*; +import java.util.concurrent.TimeUnit; + public class Stomp11Test extends CombinationTestSupport { private static final Logger LOG = LoggerFactory.getLogger(StompTest.class); @@ -650,4 +642,60 @@ public class Stomp11Test extends CombinationTestSupport { stompConnection.sendFrame(frame); } + public void testNackMessage() throws Exception { + + String connectFrame = "STOMP\n" + + "login: system\n" + + "passcode: manager\n" + + "accept-version:1.1\n" + + "host:localhost\n" + + "\n" + Stomp.NULL; + stompConnection.sendFrame(connectFrame); + + String f = stompConnection.receiveFrame(); + LOG.debug("Broker sent: " + f); + + assertTrue(f.startsWith("CONNECTED")); + + String message = "SEND\n" + "destination:/queue/" + getQueueName() + "\npersistent:true\n\n" + "Hello World" + Stomp.NULL; + + stompConnection.sendFrame(message); + + String frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" + + "id:12345\n" + "ack:client\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + StompFrame received = stompConnection.receive(); + assertTrue(received.getAction().equals("MESSAGE")); + + // nack it + frame = "NACK\n" + "subscription:12345\n" + "message-id:" + + received.getHeaders().get("message-id") + "\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + frame = "UNSUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" + + "id:12345\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + //consume it from dlq + + frame = "SUBSCRIBE\n" + "destination:/queue/ActiveMQ.DLQ\n" + + "id:12345\n" + "ack:client\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + StompFrame receivedDLQ = stompConnection.receive(200); + assertEquals(receivedDLQ.getHeaders().get("message-id"), received.getHeaders().get("message-id")); + + frame = "ACK\n" + "subscription:12345\n" + "message-id:" + + received.getHeaders().get("message-id") + "\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + frame = "UNSUBSCRIBE\n" + "destination:/queue/ActiveMQ.DLQ\n" + + "id:12345\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + + + frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL; + stompConnection.sendFrame(frame); + } + }