diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/InboundTransformer.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/InboundTransformer.java index e8ac7403ed..9ceb09605e 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/InboundTransformer.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/InboundTransformer.java @@ -138,11 +138,12 @@ public abstract class InboundTransformer { if (ma != null) { for (Map.Entry entry : ma.getValue().entrySet()) { String key = entry.getKey().toString(); - if ("x-opt-jms-type".equals(key.toString()) && entry.getValue() != null) { + if ("x-opt-jms-type".equals(key) && entry.getValue() != null) { + // Legacy annotation, JMSType value will be replaced by Subject further down if also present. jms.setJMSType(entry.getValue().toString()); - } else { - setProperty(jms, prefixVendor + prefixMessageAnnotations + key, entry.getValue()); } + + setProperty(jms, prefixVendor + prefixMessageAnnotations + key, entry.getValue()); } } @@ -175,7 +176,7 @@ public abstract class InboundTransformer { jms.setJMSDestination(vendor.createDestination(properties.getTo())); } if (properties.getSubject() != null) { - jms.setStringProperty(prefixVendor + "Subject", properties.getSubject()); + jms.setJMSType(properties.getSubject()); } if (properties.getReplyTo() != null) { jms.setJMSReplyTo(vendor.createDestination(properties.getReplyTo())); diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformer.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformer.java index 1a837aed80..de1bbda566 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformer.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/JMSMappingOutboundTransformer.java @@ -153,10 +153,7 @@ public class JMSMappingOutboundTransformer extends OutboundTransformer { header.setDurable(msg.getJMSDeliveryMode() == DeliveryMode.PERSISTENT ? true : false); header.setPriority(new UnsignedByte((byte) msg.getJMSPriority())); if (msg.getJMSType() != null) { - if (maMap == null) { - maMap = new HashMap(); - } - maMap.put(Symbol.valueOf("x-opt-jms-type"), msg.getJMSType()); + props.setSubject(msg.getJMSType()); } if (msg.getJMSMessageID() != null) { props.setMessageId(msg.getJMSMessageID()); @@ -234,8 +231,6 @@ public class JMSMappingOutboundTransformer extends OutboundTransformer { } String name = key.substring(prefixMessageAnnotationsKey.length()); maMap.put(Symbol.valueOf(name), msg.getObjectProperty(key)); - } else if (key.equals(subjectKey)) { - props.setSubject(msg.getStringProperty(key)); } else if (key.equals(contentTypeKey)) { props.setContentType(Symbol.getSymbol(msg.getStringProperty(key))); } else if (key.equals(contentEncodingKey)) { diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/OutboundTransformer.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/OutboundTransformer.java index 61749d15e8..1d28a07b9d 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/OutboundTransformer.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/OutboundTransformer.java @@ -32,7 +32,6 @@ public abstract class OutboundTransformer { String firstAcquirerKey; String prefixDeliveryAnnotationsKey; String prefixMessageAnnotationsKey; - String subjectKey; String contentTypeKey; String contentEncodingKey; String replyToGroupIDKey; @@ -57,7 +56,6 @@ public abstract class OutboundTransformer { firstAcquirerKey = prefixVendor + "FirstAcquirer"; prefixDeliveryAnnotationsKey = prefixVendor + prefixDeliveryAnnotations; prefixMessageAnnotationsKey = prefixVendor + prefixMessageAnnotations; - subjectKey = prefixVendor +"Subject"; contentTypeKey = prefixVendor +"ContentType"; contentEncodingKey = prefixVendor +"ContentEncoding"; replyToGroupIDKey = prefixVendor +"ReplyToGroupID"; diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java index 552d8287ae..6799a83df7 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java @@ -322,6 +322,47 @@ public class JMSClientTest extends JMSClientTestSupport { } } + @SuppressWarnings("rawtypes") + @Test(timeout=30000) + public void testSelectorsWithJMSType() throws Exception{ + ActiveMQAdmin.enableJMSFrameTracing(); + + connection = createConnection(); + { + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue(getDestinationName()); + MessageProducer p = session.createProducer(queue); + + TextMessage message = session.createTextMessage(); + message.setText("text"); + p.send(message, DeliveryMode.NON_PERSISTENT, Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE); + + TextMessage message2 = session.createTextMessage(); + String type = "myJMSType"; + message2.setJMSType(type); + message2.setText("text + type"); + p.send(message2, DeliveryMode.NON_PERSISTENT, Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE); + + QueueBrowser browser = session.createBrowser(queue); + Enumeration enumeration = browser.getEnumeration(); + int count = 0; + while (enumeration.hasMoreElements()) { + Message m = (Message) enumeration.nextElement(); + assertTrue(m instanceof TextMessage); + count ++; + } + + assertEquals(2, count); + + MessageConsumer consumer = session.createConsumer(queue, "JMSType = '"+ type +"'"); + Message msg = consumer.receive(TestConfig.TIMEOUT); + assertNotNull(msg); + assertTrue(msg instanceof TextMessage); + assertEquals("Unexpected JMSType value", type, msg.getJMSType()); + assertEquals("Unexpected message content", "text + type", ((TextMessage) msg).getText()); + } + } + abstract class Testable implements Runnable { protected String msg; synchronized boolean passed() {