This closes #137

Fix for AMQ-5903 - thanks Heath and Paul
This commit is contained in:
Jeff Genender 2015-07-29 18:27:47 -06:00
commit de86f473f7
3 changed files with 105 additions and 106 deletions

View File

@ -16,17 +16,17 @@
*/
package org.apache.activemq.camel.component.broker;
import java.util.Map;
import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.camel.AsyncCallback;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.component.jms.JmsMessage;
import org.apache.camel.converter.ObjectConverter;
import org.apache.camel.impl.DefaultAsyncProducer;
import javax.jms.JMSException;
import java.util.Map;
public class BrokerProducer extends DefaultAsyncProducer {
private final BrokerEndpoint brokerEndpoint;
@ -53,6 +53,7 @@ public class BrokerProducer extends DefaultAsyncProducer {
protected boolean processInOnly(final Exchange exchange, final AsyncCallback callback) {
try {
ActiveMQMessage message = getMessage(exchange);
if (message != null) {
message.setDestination(brokerEndpoint.getDestination());
//if the ProducerBrokerExchange is null the broker will create it
@ -67,76 +68,48 @@ public class BrokerProducer extends DefaultAsyncProducer {
return true;
}
private ActiveMQMessage getMessage(Exchange exchange) throws Exception {
ActiveMQMessage result;
Message camelMessage;
if (exchange.hasOut()) {
camelMessage = exchange.getOut();
} else {
camelMessage = exchange.getIn();
}
Map<String, Object> headers = camelMessage.getHeaders();
/**
* We purposely don't want to support injecting messages half-way through
* broker processing - use the activemq camel component for that - but
* we will support changing message headers and destinations
*/
if (camelMessage instanceof JmsMessage) {
JmsMessage jmsMessage = (JmsMessage) camelMessage;
if (jmsMessage.getJmsMessage() instanceof ActiveMQMessage) {
result = (ActiveMQMessage) jmsMessage.getJmsMessage();
//lets apply any new message headers
setJmsHeaders(result, headers);
} else {
throw new IllegalStateException("Not the original message from the broker " + jmsMessage.getJmsMessage());
}
} else {
throw new IllegalStateException("Not the original message from the broker " + camelMessage);
}
private ActiveMQMessage getMessage(Exchange exchange) throws IllegalStateException, JMSException {
Message camelMessage = getMessageFromExchange(exchange);
checkOriginalMessage(camelMessage);
ActiveMQMessage result = (ActiveMQMessage) ((JmsMessage) camelMessage).getJmsMessage();
applyNewHeaders(result, camelMessage.getHeaders());
return result;
}
private void setJmsHeaders(ActiveMQMessage message, Map<String, Object> headers) {
message.setReadOnlyProperties(false);
for (Map.Entry<String, Object> entry : headers.entrySet()) {
if (entry.getKey().equalsIgnoreCase("JMSDeliveryMode")) {
Object value = entry.getValue();
if (value instanceof Number) {
Number number = (Number) value;
message.setJMSDeliveryMode(number.intValue());
}
}
if (entry.getKey().equalsIgnoreCase("JmsPriority")) {
Integer value = ObjectConverter.toInteger(entry.getValue());
if (value != null) {
message.setJMSPriority(value.intValue());
}
}
if (entry.getKey().equalsIgnoreCase("JMSTimestamp")) {
Long value = ObjectConverter.toLong(entry.getValue());
if (value != null) {
message.setJMSTimestamp(value.longValue());
}
}
if (entry.getKey().equalsIgnoreCase("JMSExpiration")) {
Long value = ObjectConverter.toLong(entry.getValue());
if (value != null) {
message.setJMSExpiration(value.longValue());
}
}
if (entry.getKey().equalsIgnoreCase("JMSRedelivered")) {
message.setJMSRedelivered(ObjectConverter.toBool(entry.getValue()));
}
if (entry.getKey().equalsIgnoreCase("JMSType")) {
Object value = entry.getValue();
if (value != null) {
message.setJMSType(value.toString());
}
}
private Message getMessageFromExchange(Exchange exchange) {
if (exchange.hasOut()) {
return exchange.getOut();
}
return exchange.getIn();
}
private void checkOriginalMessage(Message camelMessage) throws IllegalStateException {
/**
* We purposely don't want to support injecting messages half-way through
* broker processing - use the activemq camel component for that - but
* we will support changing message headers and destinations.
*/
if (!(camelMessage instanceof JmsMessage)) {
throw new IllegalStateException("Not the original message from the broker " + camelMessage);
}
javax.jms.Message message = ((JmsMessage) camelMessage).getJmsMessage();
if (!(message instanceof ActiveMQMessage)) {
throw new IllegalStateException("Not the original message from the broker " + message);
}
}
private void applyNewHeaders(ActiveMQMessage message, Map<String, Object> headers) throws JMSException {
for (Map.Entry<String, Object> entry : headers.entrySet()) {
String key = entry.getKey();
Object value = entry.getValue();
if(value == null) {
continue;
}
message.setObjectProperty(key, value.toString(), false);
}
}
}

View File

@ -16,22 +16,10 @@
*/
package org.apache.activemq.camel.component.broker;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import javax.jms.Connection;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.Topic;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerRegistry;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.xbean.BrokerFactoryBean;
import org.junit.After;
@ -41,6 +29,14 @@ import org.springframework.core.io.ClassPathResource;
import org.springframework.core.io.FileSystemResource;
import org.springframework.core.io.Resource;
import javax.jms.*;
import java.util.Enumeration;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
public class BrokerComponentXMLConfigTest {
protected static final String CONF_ROOT = "src/test/resources/org/apache/activemq/camel/component/broker/";
@ -70,7 +66,6 @@ public class BrokerComponentXMLConfigTest {
producerConnection = factory.createConnection();
producerConnection.start();
consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
}
@ -133,7 +128,6 @@ public class BrokerComponentXMLConfigTest {
latch.await(timeOutInSeconds, TimeUnit.SECONDS);
assertEquals(0, latch.getCount());
}
@Test
@ -179,4 +173,35 @@ public class BrokerComponentXMLConfigTest {
assertEquals(0, divertLatch.getCount());
}
@Test
public void testPreserveOriginalHeaders() throws Exception {
final ActiveMQQueue queue = new ActiveMQQueue(QUEUE_NAME);
Topic topic = consumerSession.createTopic(TOPIC_NAME);
final CountDownLatch latch = new CountDownLatch(messageCount);
MessageConsumer consumer = consumerSession.createConsumer(queue);
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(javax.jms.Message message) {
try {
assertEquals("321", message.getStringProperty("JMSXGroupID"));
assertEquals("custom", message.getStringProperty("CustomHeader"));
latch.countDown();
} catch (Throwable e) {
e.printStackTrace();
}
}
});
MessageProducer producer = producerSession.createProducer(topic);
for (int i = 0; i < messageCount; i++) {
javax.jms.Message message = producerSession.createTextMessage("test: " + i);
message.setStringProperty("JMSXGroupID", "123");
producer.send(message);
}
latch.await(timeOutInSeconds, TimeUnit.SECONDS);
assertEquals(0, latch.getCount());
}
}

View File

@ -15,45 +15,46 @@
limitations under the License.
-->
<beans
xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="
http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
<camelContext id="camel" trace="false" xmlns="http://camel.apache.org/schema/spring">
<!-- You can use Spring XML syntax to define the routes here using the <route> element -->
<route id="brokerComponentTest">
<from uri="broker:topic:test.broker.>"/>
<setHeader headerName="JMSPriority">
<constant>9</constant>
</setHeader>
<setHeader headerName="JMSXGroupID">
<constant>321</constant>
</setHeader>
<setHeader headerName="CustomHeader">
<constant>custom</constant>
</setHeader>
<to uri="broker:queue:test.broker.component.queue"/>
</route>
<route id="brokerComponentDLQAboveLimitTest">
<from uri="broker:queue:test.broker.component.route"/>
<choice>
<when>
<spel>#{@destinationView.enqueueCount >= 100}</spel>
<to uri="broker:queue:test.broker.component.ProcessLater"/>
</when>
<otherwise>
<to uri="broker:queue:test.broker.component.route"/>
</otherwise>
</choice>
<route id="brokerComponentDLQAboveLimitTest">
<from uri="broker:queue:test.broker.component.route"/>
<choice>
<when>
<spel>#{@destinationView.enqueueCount >= 100}</spel>
<to uri="broker:queue:test.broker.component.ProcessLater"/>
</when>
<otherwise>
<to uri="broker:queue:test.broker.component.route"/>
</otherwise>
</choice>
</route>
</camelContext>
<bean id="brokerView" class="org.apache.activemq.broker.view.MessageBrokerView">
<constructor-arg value="testBroker"/>
</bean>
<bean id="destinationView" factory-bean="brokerView" factory-method="getDestinationView">
<constructor-arg value="test.broker.component.route"/>
</bean>
</beans>