AMQ-5637: support mapping between the AMQP Subject field and JMSType header

https://issues.apache.org/jira/browse/AMQ-5637
This commit is contained in:
Robert Gemmell 2015-03-04 18:40:52 +00:00
parent cad1a2a8cd
commit 6b18857b53
4 changed files with 47 additions and 12 deletions

View File

@ -138,11 +138,12 @@ public abstract class InboundTransformer {
if (ma != null) { if (ma != null) {
for (Map.Entry<?, ?> entry : ma.getValue().entrySet()) { for (Map.Entry<?, ?> entry : ma.getValue().entrySet()) {
String key = entry.getKey().toString(); String key = entry.getKey().toString();
if ("x-opt-jms-type".equals(key.toString()) && entry.getValue() != null) { if ("x-opt-jms-type".equals(key) && entry.getValue() != null) {
// Legacy annotation, JMSType value will be replaced by Subject further down if also present.
jms.setJMSType(entry.getValue().toString()); jms.setJMSType(entry.getValue().toString());
} else {
setProperty(jms, prefixVendor + prefixMessageAnnotations + key, entry.getValue());
} }
setProperty(jms, prefixVendor + prefixMessageAnnotations + key, entry.getValue());
} }
} }
@ -175,7 +176,7 @@ public abstract class InboundTransformer {
jms.setJMSDestination(vendor.createDestination(properties.getTo())); jms.setJMSDestination(vendor.createDestination(properties.getTo()));
} }
if (properties.getSubject() != null) { if (properties.getSubject() != null) {
jms.setStringProperty(prefixVendor + "Subject", properties.getSubject()); jms.setJMSType(properties.getSubject());
} }
if (properties.getReplyTo() != null) { if (properties.getReplyTo() != null) {
jms.setJMSReplyTo(vendor.createDestination(properties.getReplyTo())); jms.setJMSReplyTo(vendor.createDestination(properties.getReplyTo()));

View File

@ -153,10 +153,7 @@ public class JMSMappingOutboundTransformer extends OutboundTransformer {
header.setDurable(msg.getJMSDeliveryMode() == DeliveryMode.PERSISTENT ? true : false); header.setDurable(msg.getJMSDeliveryMode() == DeliveryMode.PERSISTENT ? true : false);
header.setPriority(new UnsignedByte((byte) msg.getJMSPriority())); header.setPriority(new UnsignedByte((byte) msg.getJMSPriority()));
if (msg.getJMSType() != null) { if (msg.getJMSType() != null) {
if (maMap == null) { props.setSubject(msg.getJMSType());
maMap = new HashMap<Symbol, Object>();
}
maMap.put(Symbol.valueOf("x-opt-jms-type"), msg.getJMSType());
} }
if (msg.getJMSMessageID() != null) { if (msg.getJMSMessageID() != null) {
props.setMessageId(msg.getJMSMessageID()); props.setMessageId(msg.getJMSMessageID());
@ -234,8 +231,6 @@ public class JMSMappingOutboundTransformer extends OutboundTransformer {
} }
String name = key.substring(prefixMessageAnnotationsKey.length()); String name = key.substring(prefixMessageAnnotationsKey.length());
maMap.put(Symbol.valueOf(name), msg.getObjectProperty(key)); maMap.put(Symbol.valueOf(name), msg.getObjectProperty(key));
} else if (key.equals(subjectKey)) {
props.setSubject(msg.getStringProperty(key));
} else if (key.equals(contentTypeKey)) { } else if (key.equals(contentTypeKey)) {
props.setContentType(Symbol.getSymbol(msg.getStringProperty(key))); props.setContentType(Symbol.getSymbol(msg.getStringProperty(key)));
} else if (key.equals(contentEncodingKey)) { } else if (key.equals(contentEncodingKey)) {

View File

@ -32,7 +32,6 @@ public abstract class OutboundTransformer {
String firstAcquirerKey; String firstAcquirerKey;
String prefixDeliveryAnnotationsKey; String prefixDeliveryAnnotationsKey;
String prefixMessageAnnotationsKey; String prefixMessageAnnotationsKey;
String subjectKey;
String contentTypeKey; String contentTypeKey;
String contentEncodingKey; String contentEncodingKey;
String replyToGroupIDKey; String replyToGroupIDKey;
@ -57,7 +56,6 @@ public abstract class OutboundTransformer {
firstAcquirerKey = prefixVendor + "FirstAcquirer"; firstAcquirerKey = prefixVendor + "FirstAcquirer";
prefixDeliveryAnnotationsKey = prefixVendor + prefixDeliveryAnnotations; prefixDeliveryAnnotationsKey = prefixVendor + prefixDeliveryAnnotations;
prefixMessageAnnotationsKey = prefixVendor + prefixMessageAnnotations; prefixMessageAnnotationsKey = prefixVendor + prefixMessageAnnotations;
subjectKey = prefixVendor +"Subject";
contentTypeKey = prefixVendor +"ContentType"; contentTypeKey = prefixVendor +"ContentType";
contentEncodingKey = prefixVendor +"ContentEncoding"; contentEncodingKey = prefixVendor +"ContentEncoding";
replyToGroupIDKey = prefixVendor +"ReplyToGroupID"; replyToGroupIDKey = prefixVendor +"ReplyToGroupID";

View File

@ -322,6 +322,47 @@ public class JMSClientTest extends JMSClientTestSupport {
} }
} }
@SuppressWarnings("rawtypes")
@Test(timeout=30000)
public void testSelectorsWithJMSType() throws Exception{
ActiveMQAdmin.enableJMSFrameTracing();
connection = createConnection();
{
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(getDestinationName());
MessageProducer p = session.createProducer(queue);
TextMessage message = session.createTextMessage();
message.setText("text");
p.send(message, DeliveryMode.NON_PERSISTENT, Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE);
TextMessage message2 = session.createTextMessage();
String type = "myJMSType";
message2.setJMSType(type);
message2.setText("text + type");
p.send(message2, DeliveryMode.NON_PERSISTENT, Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE);
QueueBrowser browser = session.createBrowser(queue);
Enumeration enumeration = browser.getEnumeration();
int count = 0;
while (enumeration.hasMoreElements()) {
Message m = (Message) enumeration.nextElement();
assertTrue(m instanceof TextMessage);
count ++;
}
assertEquals(2, count);
MessageConsumer consumer = session.createConsumer(queue, "JMSType = '"+ type +"'");
Message msg = consumer.receive(TestConfig.TIMEOUT);
assertNotNull(msg);
assertTrue(msg instanceof TextMessage);
assertEquals("Unexpected JMSType value", type, msg.getJMSType());
assertEquals("Unexpected message content", "text + type", ((TextMessage) msg).getText());
}
}
abstract class Testable implements Runnable { abstract class Testable implements Runnable {
protected String msg; protected String msg;
synchronized boolean passed() { synchronized boolean passed() {