diff --git a/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/FrameTranslator.java b/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/FrameTranslator.java index 63819513e9..d37d364545 100644 --- a/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/FrameTranslator.java +++ b/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/FrameTranslator.java @@ -16,15 +16,16 @@ */ package org.apache.activemq.transport.stomp; -import org.apache.activemq.command.ActiveMQDestination; -import org.apache.activemq.command.ActiveMQMessage; - -import javax.jms.Destination; -import javax.jms.JMSException; import java.io.IOException; import java.util.HashMap; import java.util.Map; +import javax.jms.Destination; +import javax.jms.JMSException; + +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ActiveMQMessage; + /** * Implementations of this interface are used to map back and forth from Stomp * to ActiveMQ. There are several standard mappings which are semantically the @@ -107,6 +108,13 @@ public interface FrameTranslator { msg.setJMSExpiration(Long.parseLong((String)o)); } + o = headers.remove(Stomp.Headers.Message.TIMESTAMP); + if (o != null) { + msg.setJMSTimestamp(Long.parseLong((String)o)); + } else { + msg.setJMSTimestamp(System.currentTimeMillis()); + } + o = headers.remove(Stomp.Headers.Send.PRIORITY); if (o != null) { msg.setJMSPriority(Integer.parseInt((String)o)); @@ -141,7 +149,6 @@ public interface FrameTranslator { // be sent back to a STOMP consumer we need to sanitize anything which could be in // Stomp.Headers.Message and might get passed through to the consumer headers.remove(Stomp.Headers.Message.MESSAGE_ID); - headers.remove(Stomp.Headers.Message.TIMESTAMP); headers.remove(Stomp.Headers.Message.REDELIVERED); headers.remove(Stomp.Headers.Message.SUBSCRIPTION); headers.remove(Stomp.Headers.Message.USERID); diff --git a/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java b/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java index f31aad1b61..c0fc66bce5 100644 --- a/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java +++ b/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java @@ -128,8 +128,8 @@ public class ProtocolConverter { private static class AckEntry { - private String messageId; - private StompSubscription subscription; + private final String messageId; + private final StompSubscription subscription; public AckEntry(String messageId, StompSubscription subscription) { this.messageId = messageId; @@ -148,6 +148,7 @@ public class ProtocolConverter { return this.messageId; } + @SuppressWarnings("unused") public StompSubscription getSubscription() { return this.subscription; } @@ -168,6 +169,7 @@ public class ProtocolConverter { final String receiptId = command.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED); if (receiptId != null) { return new ResponseHandler() { + @Override public void onResponse(ProtocolConverter converter, Response response) throws IOException { if (response.isException()) { // Generally a command can fail.. but that does not invalidate the connection. @@ -317,7 +319,6 @@ public class ProtocolConverter { message.setProducerId(producerId); MessageId id = new MessageId(producerId, messageIdGenerator.getNextSequenceId()); message.setMessageId(id); - message.setJMSTimestamp(System.currentTimeMillis()); if (stompTx != null) { TransactionId activemqTx = transactions.get(stompTx); @@ -634,6 +635,7 @@ public class ProtocolConverter { consumerInfo.setPrefetchSize(0); final ResponseHandler handler = new ResponseHandler() { + @Override public void onResponse(ProtocolConverter converter, Response response) throws IOException { if (response.isException()) { // Generally a command can fail.. but that does not invalidate the connection. @@ -761,6 +763,7 @@ public class ProtocolConverter { connectionInfo.setTransportContext(command.getTransportContext()); sendToActiveMQ(connectionInfo, new ResponseHandler() { + @Override public void onResponse(ProtocolConverter converter, Response response) throws IOException { if (response.isException()) { @@ -776,6 +779,7 @@ public class ProtocolConverter { final ProducerInfo producerInfo = new ProducerInfo(producerId); sendToActiveMQ(producerInfo, new ResponseHandler() { + @Override public void onResponse(ProtocolConverter converter, Response response) throws IOException { if (response.isException()) { diff --git a/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompTest.java b/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompTest.java index 0846d59ed8..08ce3694f3 100644 --- a/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompTest.java +++ b/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompTest.java @@ -1843,7 +1843,7 @@ public class StompTest extends StompTestSupport { assertFalse("Thisisnotallowed".equals(mess_headers.get(Stomp.Headers.Message.MESSAGE_ID) )); - assertFalse("1234".equals(mess_headers.get(Stomp.Headers.Message.TIMESTAMP))); + assertTrue("1234".equals(mess_headers.get(Stomp.Headers.Message.TIMESTAMP))); assertNull(mess_headers.get(Stomp.Headers.Message.REDELIVERED)); assertNull(mess_headers.get(Stomp.Headers.Message.SUBSCRIPTION)); assertEquals("system", mess_headers.get(Stomp.Headers.Message.USERID));