mirror of https://github.com/apache/activemq.git
Merge branch 'master' into amqp-sasl-auth
This commit is contained in:
commit
fe9e38622d
|
@ -138,11 +138,12 @@ public abstract class InboundTransformer {
|
|||
if (ma != null) {
|
||||
for (Map.Entry<?, ?> entry : ma.getValue().entrySet()) {
|
||||
String key = entry.getKey().toString();
|
||||
if ("x-opt-jms-type".equals(key.toString()) && entry.getValue() != null) {
|
||||
if ("x-opt-jms-type".equals(key) && entry.getValue() != null) {
|
||||
// Legacy annotation, JMSType value will be replaced by Subject further down if also present.
|
||||
jms.setJMSType(entry.getValue().toString());
|
||||
} else {
|
||||
setProperty(jms, prefixVendor + prefixMessageAnnotations + key, entry.getValue());
|
||||
}
|
||||
|
||||
setProperty(jms, prefixVendor + prefixMessageAnnotations + key, entry.getValue());
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -175,7 +176,7 @@ public abstract class InboundTransformer {
|
|||
jms.setJMSDestination(vendor.createDestination(properties.getTo()));
|
||||
}
|
||||
if (properties.getSubject() != null) {
|
||||
jms.setStringProperty(prefixVendor + "Subject", properties.getSubject());
|
||||
jms.setJMSType(properties.getSubject());
|
||||
}
|
||||
if (properties.getReplyTo() != null) {
|
||||
jms.setJMSReplyTo(vendor.createDestination(properties.getReplyTo()));
|
||||
|
|
|
@ -153,10 +153,7 @@ public class JMSMappingOutboundTransformer extends OutboundTransformer {
|
|||
header.setDurable(msg.getJMSDeliveryMode() == DeliveryMode.PERSISTENT ? true : false);
|
||||
header.setPriority(new UnsignedByte((byte) msg.getJMSPriority()));
|
||||
if (msg.getJMSType() != null) {
|
||||
if (maMap == null) {
|
||||
maMap = new HashMap<Symbol, Object>();
|
||||
}
|
||||
maMap.put(Symbol.valueOf("x-opt-jms-type"), msg.getJMSType());
|
||||
props.setSubject(msg.getJMSType());
|
||||
}
|
||||
if (msg.getJMSMessageID() != null) {
|
||||
props.setMessageId(msg.getJMSMessageID());
|
||||
|
@ -234,8 +231,6 @@ public class JMSMappingOutboundTransformer extends OutboundTransformer {
|
|||
}
|
||||
String name = key.substring(prefixMessageAnnotationsKey.length());
|
||||
maMap.put(Symbol.valueOf(name), msg.getObjectProperty(key));
|
||||
} else if (key.equals(subjectKey)) {
|
||||
props.setSubject(msg.getStringProperty(key));
|
||||
} else if (key.equals(contentTypeKey)) {
|
||||
props.setContentType(Symbol.getSymbol(msg.getStringProperty(key)));
|
||||
} else if (key.equals(contentEncodingKey)) {
|
||||
|
|
|
@ -32,7 +32,6 @@ public abstract class OutboundTransformer {
|
|||
String firstAcquirerKey;
|
||||
String prefixDeliveryAnnotationsKey;
|
||||
String prefixMessageAnnotationsKey;
|
||||
String subjectKey;
|
||||
String contentTypeKey;
|
||||
String contentEncodingKey;
|
||||
String replyToGroupIDKey;
|
||||
|
@ -57,7 +56,6 @@ public abstract class OutboundTransformer {
|
|||
firstAcquirerKey = prefixVendor + "FirstAcquirer";
|
||||
prefixDeliveryAnnotationsKey = prefixVendor + prefixDeliveryAnnotations;
|
||||
prefixMessageAnnotationsKey = prefixVendor + prefixMessageAnnotations;
|
||||
subjectKey = prefixVendor +"Subject";
|
||||
contentTypeKey = prefixVendor +"ContentType";
|
||||
contentEncodingKey = prefixVendor +"ContentEncoding";
|
||||
replyToGroupIDKey = prefixVendor +"ReplyToGroupID";
|
||||
|
|
|
@ -322,6 +322,47 @@ public class JMSClientTest extends JMSClientTestSupport {
|
|||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings("rawtypes")
|
||||
@Test(timeout=30000)
|
||||
public void testSelectorsWithJMSType() throws Exception{
|
||||
ActiveMQAdmin.enableJMSFrameTracing();
|
||||
|
||||
connection = createConnection();
|
||||
{
|
||||
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
Queue queue = session.createQueue(getDestinationName());
|
||||
MessageProducer p = session.createProducer(queue);
|
||||
|
||||
TextMessage message = session.createTextMessage();
|
||||
message.setText("text");
|
||||
p.send(message, DeliveryMode.NON_PERSISTENT, Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE);
|
||||
|
||||
TextMessage message2 = session.createTextMessage();
|
||||
String type = "myJMSType";
|
||||
message2.setJMSType(type);
|
||||
message2.setText("text + type");
|
||||
p.send(message2, DeliveryMode.NON_PERSISTENT, Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE);
|
||||
|
||||
QueueBrowser browser = session.createBrowser(queue);
|
||||
Enumeration enumeration = browser.getEnumeration();
|
||||
int count = 0;
|
||||
while (enumeration.hasMoreElements()) {
|
||||
Message m = (Message) enumeration.nextElement();
|
||||
assertTrue(m instanceof TextMessage);
|
||||
count ++;
|
||||
}
|
||||
|
||||
assertEquals(2, count);
|
||||
|
||||
MessageConsumer consumer = session.createConsumer(queue, "JMSType = '"+ type +"'");
|
||||
Message msg = consumer.receive(TestConfig.TIMEOUT);
|
||||
assertNotNull(msg);
|
||||
assertTrue(msg instanceof TextMessage);
|
||||
assertEquals("Unexpected JMSType value", type, msg.getJMSType());
|
||||
assertEquals("Unexpected message content", "text + type", ((TextMessage) msg).getText());
|
||||
}
|
||||
}
|
||||
|
||||
abstract class Testable implements Runnable {
|
||||
protected String msg;
|
||||
synchronized boolean passed() {
|
||||
|
|
|
@ -32,7 +32,7 @@
|
|||
|
||||
<properties>
|
||||
<activemq.osgi.import.pkg>
|
||||
org.apache.xbean*;version="[3.13,4)",
|
||||
org.apache.xbean*;version="[3.13,5)",
|
||||
org.apache.aries.blueprint.*;version="[1.0,2)",
|
||||
*
|
||||
</activemq.osgi.import.pkg>
|
||||
|
|
|
@ -142,6 +142,7 @@
|
|||
!org.apache.activemq.transport.ws*;version=${project.version},
|
||||
!org.apache.activemq.transport.xstream;version=${project.version},
|
||||
!org.apache.activemq.transport.util;version=${project.version},
|
||||
org.apache.xbean*;version="[3.13,5)",
|
||||
org.apache.activemq*;version=${project.version};resolution:=optional
|
||||
</Import-Package>
|
||||
</instructions>
|
||||
|
|
|
@ -63,7 +63,8 @@
|
|||
scala*;resolution:=optional,
|
||||
org.springframework*;version="[3,4]";resolution:=optional,
|
||||
org.springframework.osgi*;version="[1,4]";resolution:=optional,
|
||||
org.apache.xbean.spring*;resolution:=optional,
|
||||
org.apache.xbean.spring*;resolution:=optional;version="[3.13,5)",
|
||||
org.apache.xbean*;version="[3.13,5)",
|
||||
javax.annotation*,
|
||||
javax.jms*,
|
||||
javax.management*,
|
||||
|
|
|
@ -152,6 +152,7 @@
|
|||
javax.net.ssl,
|
||||
org.osgi.framework;version="[1.5,2)",
|
||||
org.osgi.service.cm,
|
||||
org.apache.xbean*;version="[3.13,5)",
|
||||
org.apache.commons.logging;version="[1.1,2)";resolution:=optional,
|
||||
org.slf4j;version="[1.6,2)";resolution:=optional,
|
||||
org.slf4j.spi;version="[1.6,2)";resolution:=optional,
|
||||
|
|
|
@ -124,6 +124,7 @@
|
|||
org.osgi.framework,
|
||||
org.osgi.service.cm,
|
||||
org.apache.commons.logging;version="[1.1,2)";resolution:=optional,
|
||||
org.apache.xbean*;version="[3.13,5)",
|
||||
org.slf4j;version="[1.6,2)";resolution:=optional,
|
||||
org.slf4j.spi;version="[1.6,2)";resolution:=optional,
|
||||
org.apache.log4j;version="[1.2.14,2)";resolution:=optional,
|
||||
|
|
Loading…
Reference in New Issue