git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1478976 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Timothy A. Bish 2013-05-03 20:54:31 +00:00
parent bf071376b5
commit 436f892c16
3 changed files with 21 additions and 10 deletions

View File

@ -16,15 +16,16 @@
*/
package org.apache.activemq.transport.stomp;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage;
import javax.jms.Destination;
import javax.jms.JMSException;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import javax.jms.Destination;
import javax.jms.JMSException;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage;
/**
* Implementations of this interface are used to map back and forth from Stomp
* to ActiveMQ. There are several standard mappings which are semantically the
@ -107,6 +108,13 @@ public interface FrameTranslator {
msg.setJMSExpiration(Long.parseLong((String)o));
}
o = headers.remove(Stomp.Headers.Message.TIMESTAMP);
if (o != null) {
msg.setJMSTimestamp(Long.parseLong((String)o));
} else {
msg.setJMSTimestamp(System.currentTimeMillis());
}
o = headers.remove(Stomp.Headers.Send.PRIORITY);
if (o != null) {
msg.setJMSPriority(Integer.parseInt((String)o));
@ -141,7 +149,6 @@ public interface FrameTranslator {
// be sent back to a STOMP consumer we need to sanitize anything which could be in
// Stomp.Headers.Message and might get passed through to the consumer
headers.remove(Stomp.Headers.Message.MESSAGE_ID);
headers.remove(Stomp.Headers.Message.TIMESTAMP);
headers.remove(Stomp.Headers.Message.REDELIVERED);
headers.remove(Stomp.Headers.Message.SUBSCRIPTION);
headers.remove(Stomp.Headers.Message.USERID);

View File

@ -128,8 +128,8 @@ public class ProtocolConverter {
private static class AckEntry {
private String messageId;
private StompSubscription subscription;
private final String messageId;
private final StompSubscription subscription;
public AckEntry(String messageId, StompSubscription subscription) {
this.messageId = messageId;
@ -148,6 +148,7 @@ public class ProtocolConverter {
return this.messageId;
}
@SuppressWarnings("unused")
public StompSubscription getSubscription() {
return this.subscription;
}
@ -168,6 +169,7 @@ public class ProtocolConverter {
final String receiptId = command.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED);
if (receiptId != null) {
return new ResponseHandler() {
@Override
public void onResponse(ProtocolConverter converter, Response response) throws IOException {
if (response.isException()) {
// Generally a command can fail.. but that does not invalidate the connection.
@ -317,7 +319,6 @@ public class ProtocolConverter {
message.setProducerId(producerId);
MessageId id = new MessageId(producerId, messageIdGenerator.getNextSequenceId());
message.setMessageId(id);
message.setJMSTimestamp(System.currentTimeMillis());
if (stompTx != null) {
TransactionId activemqTx = transactions.get(stompTx);
@ -634,6 +635,7 @@ public class ProtocolConverter {
consumerInfo.setPrefetchSize(0);
final ResponseHandler handler = new ResponseHandler() {
@Override
public void onResponse(ProtocolConverter converter, Response response) throws IOException {
if (response.isException()) {
// Generally a command can fail.. but that does not invalidate the connection.
@ -761,6 +763,7 @@ public class ProtocolConverter {
connectionInfo.setTransportContext(command.getTransportContext());
sendToActiveMQ(connectionInfo, new ResponseHandler() {
@Override
public void onResponse(ProtocolConverter converter, Response response) throws IOException {
if (response.isException()) {
@ -776,6 +779,7 @@ public class ProtocolConverter {
final ProducerInfo producerInfo = new ProducerInfo(producerId);
sendToActiveMQ(producerInfo, new ResponseHandler() {
@Override
public void onResponse(ProtocolConverter converter, Response response) throws IOException {
if (response.isException()) {

View File

@ -1843,7 +1843,7 @@ public class StompTest extends StompTestSupport {
assertFalse("Thisisnotallowed".equals(mess_headers.get(Stomp.Headers.Message.MESSAGE_ID)
));
assertFalse("1234".equals(mess_headers.get(Stomp.Headers.Message.TIMESTAMP)));
assertTrue("1234".equals(mess_headers.get(Stomp.Headers.Message.TIMESTAMP)));
assertNull(mess_headers.get(Stomp.Headers.Message.REDELIVERED));
assertNull(mess_headers.get(Stomp.Headers.Message.SUBSCRIPTION));
assertEquals("system", mess_headers.get(Stomp.Headers.Message.USERID));