CAMEL-8522: Enrich AMQ message with original destination (where Camel received the message) which for example allows to keep that information when sending messages to other queues or DLQs etc.

This commit is contained in:
Claus Ibsen 2016-02-20 10:22:11 +01:00
parent 49974279a7
commit f490ab549d
4 changed files with 138 additions and 1 deletions

View File

@ -206,6 +206,11 @@ public class ActiveMQComponent extends JmsComponent implements EndpointCompleter
endpointLoader = new CamelEndpointLoader(getCamelContext(), source); endpointLoader = new CamelEndpointLoader(getCamelContext(), source);
endpointLoader.afterPropertiesSet(); endpointLoader.afterPropertiesSet();
} }
// use OriginalDestinationPropagateStrategy by default if no custom stategy has been set
if (getMessageCreatedStrategy() == null) {
setMessageCreatedStrategy(new OriginalDestinationPropagateStrategy());
}
} }
protected void createDestinationSource() { protected void createDestinationSource() {

View File

@ -0,0 +1,56 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.camel.component;
import javax.jms.Message;
import javax.jms.Session;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.camel.Exchange;
import org.apache.camel.component.jms.JmsMessage;
import org.apache.camel.component.jms.MessageCreatedStrategy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* A strategy to enrich JMS message with their original destination if the Camel
* route originates from a JMS destination.
*/
public class OriginalDestinationPropagateStrategy implements MessageCreatedStrategy {
private static final transient Logger LOG = LoggerFactory.getLogger(OriginalDestinationPropagateStrategy.class);
@Override
public void onMessageCreated(Message message, Session session, Exchange exchange, Throwable cause) {
if (exchange.getIn() instanceof JmsMessage) {
JmsMessage msg = exchange.getIn(JmsMessage.class);
Message jms = msg.getJmsMessage();
if (message instanceof ActiveMQMessage) {
ActiveMQMessage amq = (ActiveMQMessage) jms;
if (amq.getOriginalDestination() == null) {
ActiveMQDestination from = amq.getDestination();
if (from != null) {
LOG.trace("Setting OriginalDestination: {} on {}", from, message);
((ActiveMQMessage) message).setOriginalDestination(from);
}
}
}
}
}
}

View File

@ -0,0 +1,76 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.camel.component;
import javax.jms.Message;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.jms.JmsMessage;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.test.junit4.CamelTestSupport;
import org.junit.Test;
import static org.apache.activemq.camel.component.ActiveMQComponent.activeMQComponent;
public class ActiveMQOriginalDestinationTest extends CamelTestSupport {
@Test
public void testActiveMQOriginalDestination() throws Exception {
MockEndpoint mock = getMockEndpoint("mock:result");
mock.expectedMessageCount(1);
template.sendBody("activemq:queue:foo", "Hello World");
assertMockEndpointsSatisfied();
// consume from bar
Exchange out = consumer.receive("activemq:queue:bar", 5000);
assertNotNull(out);
// and we should have foo as the original destination
JmsMessage msg = out.getIn(JmsMessage.class);
Message jms = msg.getJmsMessage();
ActiveMQMessage amq = assertIsInstanceOf(ActiveMQMessage.class, jms);
ActiveMQDestination original = amq.getOriginalDestination();
assertNotNull(original);
assertEquals("foo", original.getPhysicalName());
assertEquals("Queue", original.getDestinationTypeAsString());
}
protected CamelContext createCamelContext() throws Exception {
CamelContext camelContext = super.createCamelContext();
camelContext.addComponent("activemq", activeMQComponent("vm://localhost?broker.persistent=false"));
return camelContext;
}
@Override
protected RouteBuilder createRouteBuilder() throws Exception {
return new RouteBuilder() {
@Override
public void configure() throws Exception {
from("activemq:queue:foo")
.to("activemq:queue:bar")
.to("mock:result");
}
};
}
}

View File

@ -48,7 +48,7 @@
<aries-transaction-version>1.1.1</aries-transaction-version> <aries-transaction-version>1.1.1</aries-transaction-version>
<axion-version>1.0-M3-dev</axion-version> <axion-version>1.0-M3-dev</axion-version>
<camel-version>2.16.2</camel-version> <camel-version>2.16.2</camel-version>
<camel-version-range>[2.15,3)</camel-version-range> <camel-version-range>[2.16,3)</camel-version-range>
<cglib-version>2.2</cglib-version> <cglib-version>2.2</cglib-version>
<commons-beanutils-version>1.8.3</commons-beanutils-version> <commons-beanutils-version>1.8.3</commons-beanutils-version>
<commons-collections-version>3.2.2</commons-collections-version> <commons-collections-version>3.2.2</commons-collections-version>