mirror of https://github.com/apache/activemq.git
added a plugin to support invocation of MessageListener classes as a bean in Camel; for more information see: http://cwiki.apache.org/CAMEL/activemq.html
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@583184 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
48e8c053fc
commit
810375af5f
|
@ -15,7 +15,7 @@
|
|||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.camel.component;
|
||||
package org.apache.activemq.camel.converter;
|
||||
|
||||
import org.apache.activemq.command.ActiveMQDestination;
|
||||
import org.apache.camel.Converter;
|
|
@ -0,0 +1,102 @@
|
|||
/**
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.camel.converter;
|
||||
|
||||
import java.io.Serializable;
|
||||
|
||||
import javax.jms.JMSException;
|
||||
import javax.jms.MessageListener;
|
||||
import javax.jms.Message;
|
||||
|
||||
import org.apache.activemq.command.ActiveMQMessage;
|
||||
import org.apache.activemq.command.ActiveMQObjectMessage;
|
||||
import org.apache.activemq.command.ActiveMQTextMessage;
|
||||
import org.apache.camel.Converter;
|
||||
import org.apache.camel.Exchange;
|
||||
import org.apache.camel.Processor;
|
||||
import org.apache.camel.component.jms.JmsBinding;
|
||||
|
||||
/**
|
||||
* @version $Revision$
|
||||
*/
|
||||
@Converter
|
||||
public class ActiveMQMessageConverter {
|
||||
private JmsBinding binding = new JmsBinding();
|
||||
|
||||
/**
|
||||
* Converts the inbound message exchange to an ActiveMQ JMS message
|
||||
*
|
||||
* @return the ActiveMQ message
|
||||
*/
|
||||
@Converter
|
||||
public ActiveMQMessage toMessage(Exchange exchange) throws JMSException {
|
||||
ActiveMQMessage message = createActiveMQMessage(exchange);
|
||||
getBinding().appendJmsProperties(message, exchange);
|
||||
return message;
|
||||
}
|
||||
|
||||
/**
|
||||
* Allows a JMS {@link MessageListener} to be converted to a Camel {@link Processor}
|
||||
* so that we can provide better
|
||||
* <a href="">Bean Integration</a> so that we can use any JMS MessageListener in
|
||||
* in Camel as a bean
|
||||
* @param listener the JMS message listener
|
||||
* @return a newly created Camel Processor which when invoked will invoke
|
||||
* {@link MessageListener#onMessage(Message)}
|
||||
*/
|
||||
@Converter
|
||||
public Processor toProcessor(final MessageListener listener) {
|
||||
return new Processor() {
|
||||
public void process(Exchange exchange) throws Exception {
|
||||
Message message = toMessage(exchange);
|
||||
listener.onMessage(message);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "Processor of MessageListener: " + listener;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
private static ActiveMQMessage createActiveMQMessage(Exchange exchange) throws JMSException {
|
||||
Object body = exchange.getIn().getBody();
|
||||
if (body instanceof String) {
|
||||
ActiveMQTextMessage answer = new ActiveMQTextMessage();
|
||||
answer.setText((String) body);
|
||||
return answer;
|
||||
} else if (body instanceof Serializable) {
|
||||
ActiveMQObjectMessage answer = new ActiveMQObjectMessage();
|
||||
answer.setObject((Serializable) body);
|
||||
return answer;
|
||||
} else {
|
||||
return new ActiveMQMessage();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
// Properties
|
||||
//-------------------------------------------------------------------------
|
||||
public JmsBinding getBinding() {
|
||||
return binding;
|
||||
}
|
||||
|
||||
public void setBinding(JmsBinding binding) {
|
||||
this.binding = binding;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,28 @@
|
|||
<!--
|
||||
Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
contributor license agreements. See the NOTICE file distributed with
|
||||
this work for additional information regarding copyright ownership.
|
||||
The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
(the "License"); you may not use this file except in compliance with
|
||||
the License. You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
-->
|
||||
<html>
|
||||
<head>
|
||||
</head>
|
||||
<body>
|
||||
|
||||
Defines the <a href="http://activemq.apache.org/camel/type-converter.html">Type Converters</a> for working
|
||||
with JMS and ActiveMQ with <a href="http://activemq.apache.org/camel/">Camel</a>
|
||||
<a href="http://activemq.apache.org/camel/enterprise-integration-patterns.html">Enterprise Integration Patterns</a>
|
||||
|
||||
|
||||
</body>
|
||||
</html>
|
|
@ -15,4 +15,4 @@
|
|||
# limitations under the License.
|
||||
#
|
||||
|
||||
org.apache.activemq.camel.component
|
||||
org.apache.activemq.camel.converter
|
|
@ -16,13 +16,13 @@
|
|||
*/
|
||||
package org.apache.activemq.camel.component;
|
||||
|
||||
import org.apache.activemq.command.ActiveMQQueue;
|
||||
import org.apache.camel.CamelContext;
|
||||
import org.apache.camel.ContextTestSupport;
|
||||
import org.apache.camel.Exchange;
|
||||
import org.apache.camel.Message;
|
||||
import org.apache.camel.builder.RouteBuilder;
|
||||
import static org.apache.activemq.camel.component.ActiveMQComponent.activeMQComponent;
|
||||
import org.apache.activemq.camel.converter.ActiveMQConverter;
|
||||
import org.apache.camel.component.mock.AssertionClause;
|
||||
import org.apache.camel.component.mock.MockEndpoint;
|
||||
import org.apache.commons.logging.Log;
|
||||
|
|
|
@ -0,0 +1,69 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.camel.converter;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
import javax.jms.Message;
|
||||
import javax.jms.TextMessage;
|
||||
|
||||
import static org.apache.activemq.camel.component.ActiveMQComponent.activeMQComponent;
|
||||
import org.apache.activemq.spring.ConsumerBean;
|
||||
import org.apache.camel.CamelContext;
|
||||
import org.apache.camel.ContextTestSupport;
|
||||
import org.apache.camel.builder.RouteBuilder;
|
||||
import org.apache.camel.component.mock.MockEndpoint;
|
||||
|
||||
/**
|
||||
* @version $Revision$
|
||||
*/
|
||||
public class InvokeMessageListenerTest extends ContextTestSupport { protected MockEndpoint resultEndpoint;
|
||||
protected String startEndpointUri = "activemq:queue:test.a";
|
||||
protected ConsumerBean listener = new ConsumerBean();
|
||||
|
||||
public void testSendTextMessage() throws Exception {
|
||||
String expectedBody = "Hello there!";
|
||||
|
||||
template.sendBodyAndHeader(startEndpointUri, expectedBody, "cheese", 123);
|
||||
|
||||
listener.assertMessagesArrived(1);
|
||||
|
||||
List<Message> list = listener.flushMessages();
|
||||
assertTrue("Should have received some messages!", !list.isEmpty());
|
||||
Message message = list.get(0);
|
||||
|
||||
log.debug("Received: " + message);
|
||||
|
||||
TextMessage textMessage = assertIsInstanceOf(TextMessage.class, message);
|
||||
assertEquals("Text mesage body: " + textMessage, expectedBody, textMessage.getText());
|
||||
}
|
||||
|
||||
protected CamelContext createCamelContext() throws Exception {
|
||||
CamelContext camelContext = super.createCamelContext();
|
||||
camelContext.addComponent("activemq",
|
||||
activeMQComponent("vm://localhost?broker.persistent=false"));
|
||||
return camelContext;
|
||||
}
|
||||
|
||||
protected RouteBuilder createRouteBuilder() throws Exception {
|
||||
return new RouteBuilder() {
|
||||
public void configure() throws Exception {
|
||||
from(startEndpointUri).bean(listener);
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue