Add new destiantion type annotation on outbound messages through the JMS
transformer.
This commit is contained in:
Timothy Bish 2015-03-06 11:49:30 -05:00
parent 7777744dc2
commit 528c25c6d8
2 changed files with 69 additions and 19 deletions

View File

@ -60,6 +60,24 @@ import org.apache.qpid.proton.message.ProtonJMessage;
public class JMSMappingOutboundTransformer extends OutboundTransformer {
public static final Symbol JMS_DEST_TYPE_MSG_ANNOTATION = Symbol.valueOf("x-opt-jms-dest");
public static final Symbol JMS_REPLY_TO_TYPE_MSG_ANNOTATION = Symbol.valueOf("x-opt-jms-reply-to");
public static final byte QUEUE_TYPE = 0x00;
public static final byte TOPIC_TYPE = 0x01;
public static final byte TEMP_QUEUE_TYPE = 0x02;
public static final byte TEMP_TOPIC_TYPE = 0x03;
// Deprecated legacy values used by old QPid AMQP 1.0 JMS client.
public static final Symbol LEGACY_JMS_DEST_TYPE_MSG_ANNOTATION = Symbol.valueOf("x-opt-to-type");
public static final Symbol LEGACY_JMS_REPLY_TO_TYPE_MSG_ANNOTATION = Symbol.valueOf("x-opt-reply-type");
public static final String LEGACY_QUEUE_TYPE = "queue";
public static final String LEGACY_TOPIC_TYPE = "topic";
public static final String LEGACY_TEMP_QUEUE_TYPE = "temporary,queue";
public static final String LEGACY_TEMP_TOPIC_TYPE = "temporary,topic";
public JMSMappingOutboundTransformer(JMSVendor vendor) {
super(vendor);
}
@ -163,14 +181,20 @@ public class JMSMappingOutboundTransformer extends OutboundTransformer {
if (maMap == null) {
maMap = new HashMap<Symbol, Object>();
}
maMap.put(Symbol.valueOf("x-opt-to-type"), destinationAttributes(msg.getJMSDestination()));
maMap.put(JMS_DEST_TYPE_MSG_ANNOTATION, destinationType(msg.getJMSDestination()));
// Deprecated: used by legacy QPid AMQP 1.0 JMS client
maMap.put(LEGACY_JMS_DEST_TYPE_MSG_ANNOTATION, destinationAttributes(msg.getJMSDestination()));
}
if (msg.getJMSReplyTo() != null) {
props.setReplyTo(vendor.toAddress(msg.getJMSReplyTo()));
if (maMap == null) {
maMap = new HashMap<Symbol, Object>();
}
maMap.put(Symbol.valueOf("x-opt-reply-type"), destinationAttributes(msg.getJMSReplyTo()));
maMap.put(JMS_REPLY_TO_TYPE_MSG_ANNOTATION, destinationType(msg.getJMSReplyTo()));
// Deprecated: used by legacy QPid AMQP 1.0 JMS client
maMap.put(LEGACY_JMS_REPLY_TO_TYPE_MSG_ANNOTATION, destinationAttributes(msg.getJMSReplyTo()));
}
if (msg.getJMSCorrelationID() != null) {
props.setCorrelationId(msg.getJMSCorrelationID());
@ -271,21 +295,41 @@ public class JMSMappingOutboundTransformer extends OutboundTransformer {
return (ProtonJMessage) org.apache.qpid.proton.message.Message.Factory.create(header, da, ma, props, ap, body, footer);
}
private static byte destinationType(Destination destination) {
if (destination instanceof Queue) {
if (destination instanceof TemporaryQueue) {
return TEMP_QUEUE_TYPE;
} else {
return QUEUE_TYPE;
}
} else if (destination instanceof Topic) {
if (destination instanceof TemporaryTopic) {
return TEMP_TOPIC_TYPE;
} else {
return TOPIC_TYPE;
}
}
throw new IllegalArgumentException("Unknown Destination Type passed to JMS Transformer.");
}
// Used by legacy QPid AMQP 1.0 JMS client.
@Deprecated
private static String destinationAttributes(Destination destination) {
if (destination instanceof Queue) {
if (destination instanceof TemporaryQueue) {
return "temporary,queue";
return LEGACY_TEMP_QUEUE_TYPE;
} else {
return "queue";
return LEGACY_QUEUE_TYPE;
}
}
if (destination instanceof Topic) {
} else if (destination instanceof Topic) {
if (destination instanceof TemporaryTopic) {
return "temporary,topic";
return LEGACY_TEMP_TOPIC_TYPE;
} else {
return "topic";
return LEGACY_TOPIC_TYPE;
}
}
return "";
throw new IllegalArgumentException("Unknown Destination Type passed to JMS Transformer.");
}
}

View File

@ -16,6 +16,12 @@
*/
package org.apache.activemq.transport.amqp.message;
import static org.apache.activemq.transport.amqp.message.JMSMappingOutboundTransformer.JMS_DEST_TYPE_MSG_ANNOTATION;
import static org.apache.activemq.transport.amqp.message.JMSMappingOutboundTransformer.JMS_REPLY_TO_TYPE_MSG_ANNOTATION;
import static org.apache.activemq.transport.amqp.message.JMSMappingOutboundTransformer.QUEUE_TYPE;
import static org.apache.activemq.transport.amqp.message.JMSMappingOutboundTransformer.TEMP_QUEUE_TYPE;
import static org.apache.activemq.transport.amqp.message.JMSMappingOutboundTransformer.TEMP_TOPIC_TYPE;
import static org.apache.activemq.transport.amqp.message.JMSMappingOutboundTransformer.TOPIC_TYPE;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
@ -67,28 +73,28 @@ public class JMSMappingOutboundTransformerTest {
public void testConvertMessageWithJMSDestinationQueue() throws Exception {
Queue mockDest = Mockito.mock(Queue.class);
doTestConvertMessageWithJMSDestination(mockDest, "queue");
doTestConvertMessageWithJMSDestination(mockDest, QUEUE_TYPE);
}
@Test
public void testConvertMessageWithJMSDestinationTemporaryQueue() throws Exception {
TemporaryQueue mockDest = Mockito.mock(TemporaryQueue.class);
doTestConvertMessageWithJMSDestination(mockDest, "temporary,queue");
doTestConvertMessageWithJMSDestination(mockDest, TEMP_QUEUE_TYPE);
}
@Test
public void testConvertMessageWithJMSDestinationTopic() throws Exception {
Topic mockDest = Mockito.mock(Topic.class);
doTestConvertMessageWithJMSDestination(mockDest, "topic");
doTestConvertMessageWithJMSDestination(mockDest, TOPIC_TYPE);
}
@Test
public void testConvertMessageWithJMSDestinationTemporaryTopic() throws Exception {
TemporaryTopic mockDest = Mockito.mock(TemporaryTopic.class);
doTestConvertMessageWithJMSDestination(mockDest, "temporary,topic");
doTestConvertMessageWithJMSDestination(mockDest, TEMP_TOPIC_TYPE);
}
private void doTestConvertMessageWithJMSDestination(Destination jmsDestination, Object expectedAnnotationValue) throws Exception {
@ -108,7 +114,7 @@ public class JMSMappingOutboundTransformerTest {
MessageAnnotations ma = amqp.getMessageAnnotations();
Map<Symbol, Object> maMap = ma == null ? null : ma.getValue();
if (maMap != null) {
Object actualValue = maMap.get(Symbol.valueOf("x-opt-to-type"));
Object actualValue = maMap.get(JMS_DEST_TYPE_MSG_ANNOTATION);
assertEquals("Unexpected annotation value", expectedAnnotationValue, actualValue);
} else if (expectedAnnotationValue != null) {
fail("Expected annotation value, but there were no annotations");
@ -130,28 +136,28 @@ public class JMSMappingOutboundTransformerTest {
public void testConvertMessageWithJMSReplyToQueue() throws Exception {
Queue mockDest = Mockito.mock(Queue.class);
doTestConvertMessageWithJMSReplyTo(mockDest, "queue");
doTestConvertMessageWithJMSReplyTo(mockDest, QUEUE_TYPE);
}
@Test
public void testConvertMessageWithJMSReplyToTemporaryQueue() throws Exception {
TemporaryQueue mockDest = Mockito.mock(TemporaryQueue.class);
doTestConvertMessageWithJMSReplyTo(mockDest, "temporary,queue");
doTestConvertMessageWithJMSReplyTo(mockDest, TEMP_QUEUE_TYPE);
}
@Test
public void testConvertMessageWithJMSReplyToTopic() throws Exception {
Topic mockDest = Mockito.mock(Topic.class);
doTestConvertMessageWithJMSReplyTo(mockDest, "topic");
doTestConvertMessageWithJMSReplyTo(mockDest, TOPIC_TYPE);
}
@Test
public void testConvertMessageWithJMSReplyToTemporaryTopic() throws Exception {
TemporaryTopic mockDest = Mockito.mock(TemporaryTopic.class);
doTestConvertMessageWithJMSReplyTo(mockDest, "temporary,topic");
doTestConvertMessageWithJMSReplyTo(mockDest, TEMP_TOPIC_TYPE);
}
private void doTestConvertMessageWithJMSReplyTo(Destination jmsReplyTo, Object expectedAnnotationValue) throws Exception {
@ -171,7 +177,7 @@ public class JMSMappingOutboundTransformerTest {
MessageAnnotations ma = amqp.getMessageAnnotations();
Map<Symbol, Object> maMap = ma == null ? null : ma.getValue();
if (maMap != null) {
Object actualValue = maMap.get(Symbol.valueOf("x-opt-reply-type"));
Object actualValue = maMap.get(JMS_REPLY_TO_TYPE_MSG_ANNOTATION);
assertEquals("Unexpected annotation value", expectedAnnotationValue, actualValue);
} else if (expectedAnnotationValue != null) {
fail("Expected annotation value, but there were no annotations");