diff --git a/activemq-camel/src/main/java/org/apache/activemq/camel/component/broker/BrokerProducer.java b/activemq-camel/src/main/java/org/apache/activemq/camel/component/broker/BrokerProducer.java index fcf12564ef..82adad4691 100644 --- a/activemq-camel/src/main/java/org/apache/activemq/camel/component/broker/BrokerProducer.java +++ b/activemq-camel/src/main/java/org/apache/activemq/camel/component/broker/BrokerProducer.java @@ -16,17 +16,17 @@ */ package org.apache.activemq.camel.component.broker; -import java.util.Map; - import org.apache.activemq.broker.ProducerBrokerExchange; import org.apache.activemq.command.ActiveMQMessage; import org.apache.camel.AsyncCallback; import org.apache.camel.Exchange; import org.apache.camel.Message; import org.apache.camel.component.jms.JmsMessage; -import org.apache.camel.converter.ObjectConverter; import org.apache.camel.impl.DefaultAsyncProducer; +import javax.jms.JMSException; +import java.util.Map; + public class BrokerProducer extends DefaultAsyncProducer { private final BrokerEndpoint brokerEndpoint; @@ -53,6 +53,7 @@ public class BrokerProducer extends DefaultAsyncProducer { protected boolean processInOnly(final Exchange exchange, final AsyncCallback callback) { try { ActiveMQMessage message = getMessage(exchange); + if (message != null) { message.setDestination(brokerEndpoint.getDestination()); //if the ProducerBrokerExchange is null the broker will create it @@ -67,76 +68,48 @@ public class BrokerProducer extends DefaultAsyncProducer { return true; } - private ActiveMQMessage getMessage(Exchange exchange) throws Exception { - ActiveMQMessage result; - Message camelMessage; - if (exchange.hasOut()) { - camelMessage = exchange.getOut(); - } else { - camelMessage = exchange.getIn(); - } - - Map headers = camelMessage.getHeaders(); - - /** - * We purposely don't want to support injecting messages half-way through - * broker processing - use the activemq camel component for that - but - * we will support changing message headers and destinations - */ - if (camelMessage instanceof JmsMessage) { - JmsMessage jmsMessage = (JmsMessage) camelMessage; - if (jmsMessage.getJmsMessage() instanceof ActiveMQMessage) { - result = (ActiveMQMessage) jmsMessage.getJmsMessage(); - //lets apply any new message headers - setJmsHeaders(result, headers); - } else { - throw new IllegalStateException("Not the original message from the broker " + jmsMessage.getJmsMessage()); - } - } else { - throw new IllegalStateException("Not the original message from the broker " + camelMessage); - } - + private ActiveMQMessage getMessage(Exchange exchange) throws IllegalStateException, JMSException { + Message camelMessage = getMessageFromExchange(exchange); + checkOriginalMessage(camelMessage); + ActiveMQMessage result = (ActiveMQMessage) ((JmsMessage) camelMessage).getJmsMessage(); + applyNewHeaders(result, camelMessage.getHeaders()); return result; } - private void setJmsHeaders(ActiveMQMessage message, Map headers) { - message.setReadOnlyProperties(false); - for (Map.Entry entry : headers.entrySet()) { - if (entry.getKey().equalsIgnoreCase("JMSDeliveryMode")) { - Object value = entry.getValue(); - if (value instanceof Number) { - Number number = (Number) value; - message.setJMSDeliveryMode(number.intValue()); - } - } - if (entry.getKey().equalsIgnoreCase("JmsPriority")) { - Integer value = ObjectConverter.toInteger(entry.getValue()); - if (value != null) { - message.setJMSPriority(value.intValue()); - } - } - if (entry.getKey().equalsIgnoreCase("JMSTimestamp")) { - Long value = ObjectConverter.toLong(entry.getValue()); - if (value != null) { - message.setJMSTimestamp(value.longValue()); - } - } - if (entry.getKey().equalsIgnoreCase("JMSExpiration")) { - Long value = ObjectConverter.toLong(entry.getValue()); - if (value != null) { - message.setJMSExpiration(value.longValue()); - } - } - if (entry.getKey().equalsIgnoreCase("JMSRedelivered")) { - message.setJMSRedelivered(ObjectConverter.toBool(entry.getValue())); - } - if (entry.getKey().equalsIgnoreCase("JMSType")) { - Object value = entry.getValue(); - if (value != null) { - message.setJMSType(value.toString()); - } - } + private Message getMessageFromExchange(Exchange exchange) { + if (exchange.hasOut()) { + return exchange.getOut(); + } + + return exchange.getIn(); + } + + private void checkOriginalMessage(Message camelMessage) throws IllegalStateException { + /** + * We purposely don't want to support injecting messages half-way through + * broker processing - use the activemq camel component for that - but + * we will support changing message headers and destinations. + */ + + if (!(camelMessage instanceof JmsMessage)) { + throw new IllegalStateException("Not the original message from the broker " + camelMessage); + } + + javax.jms.Message message = ((JmsMessage) camelMessage).getJmsMessage(); + + if (!(message instanceof ActiveMQMessage)) { + throw new IllegalStateException("Not the original message from the broker " + message); } } + private void applyNewHeaders(ActiveMQMessage message, Map headers) throws JMSException { + for (Map.Entry entry : headers.entrySet()) { + String key = entry.getKey(); + Object value = entry.getValue(); + if(value == null) { + continue; + } + message.setObjectProperty(key, value.toString(), false); + } + } } diff --git a/activemq-camel/src/test/java/org/apache/activemq/camel/component/broker/BrokerComponentXMLConfigTest.java b/activemq-camel/src/test/java/org/apache/activemq/camel/component/broker/BrokerComponentXMLConfigTest.java index c2fc3f6ee8..2773baac22 100644 --- a/activemq-camel/src/test/java/org/apache/activemq/camel/component/broker/BrokerComponentXMLConfigTest.java +++ b/activemq-camel/src/test/java/org/apache/activemq/camel/component/broker/BrokerComponentXMLConfigTest.java @@ -16,22 +16,10 @@ */ package org.apache.activemq.camel.component.broker; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; - -import javax.jms.Connection; -import javax.jms.MessageConsumer; -import javax.jms.MessageListener; -import javax.jms.MessageProducer; -import javax.jms.Session; -import javax.jms.Topic; - import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.broker.BrokerRegistry; import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.command.ActiveMQMessage; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.xbean.BrokerFactoryBean; import org.junit.After; @@ -41,6 +29,14 @@ import org.springframework.core.io.ClassPathResource; import org.springframework.core.io.FileSystemResource; import org.springframework.core.io.Resource; +import javax.jms.*; +import java.util.Enumeration; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + public class BrokerComponentXMLConfigTest { protected static final String CONF_ROOT = "src/test/resources/org/apache/activemq/camel/component/broker/"; @@ -70,7 +66,6 @@ public class BrokerComponentXMLConfigTest { producerConnection = factory.createConnection(); producerConnection.start(); consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); - producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); } @@ -133,7 +128,6 @@ public class BrokerComponentXMLConfigTest { latch.await(timeOutInSeconds, TimeUnit.SECONDS); assertEquals(0, latch.getCount()); - } @Test @@ -179,4 +173,35 @@ public class BrokerComponentXMLConfigTest { assertEquals(0, divertLatch.getCount()); } + @Test + public void testPreserveOriginalHeaders() throws Exception { + final ActiveMQQueue queue = new ActiveMQQueue(QUEUE_NAME); + + Topic topic = consumerSession.createTopic(TOPIC_NAME); + + final CountDownLatch latch = new CountDownLatch(messageCount); + MessageConsumer consumer = consumerSession.createConsumer(queue); + consumer.setMessageListener(new MessageListener() { + @Override + public void onMessage(javax.jms.Message message) { + try { + assertEquals("321", message.getStringProperty("JMSXGroupID")); + assertEquals("custom", message.getStringProperty("CustomHeader")); + latch.countDown(); + } catch (Throwable e) { + e.printStackTrace(); + } + } + }); + MessageProducer producer = producerSession.createProducer(topic); + + for (int i = 0; i < messageCount; i++) { + javax.jms.Message message = producerSession.createTextMessage("test: " + i); + message.setStringProperty("JMSXGroupID", "123"); + producer.send(message); + } + + latch.await(timeOutInSeconds, TimeUnit.SECONDS); + assertEquals(0, latch.getCount()); + } } diff --git a/activemq-camel/src/test/resources/org/apache/activemq/camel/component/broker/camel.xml b/activemq-camel/src/test/resources/org/apache/activemq/camel/component/broker/camel.xml index 750c134815..b84350b9aa 100644 --- a/activemq-camel/src/test/resources/org/apache/activemq/camel/component/broker/camel.xml +++ b/activemq-camel/src/test/resources/org/apache/activemq/camel/component/broker/camel.xml @@ -15,45 +15,46 @@ limitations under the License. --> - + - - 9 + + 321 + + + custom + - - - - - #{@destinationView.enqueueCount >= 100} - - - - - - + + + + + #{@destinationView.enqueueCount >= 100} + + + + + + - - + + -