migrated the Camel destination code from the camel project into ActiveMQ as it makes more sense to host it here - and avoids a circular dependency issue when releasing ActiveMQ 5.

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@563609 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
James Strachan 2007-08-07 18:36:58 +00:00
parent 2728ccbb88
commit 4102accade
15 changed files with 1261 additions and 49 deletions

View File

@ -39,28 +39,24 @@
<!-- =============================== -->
<dependency>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
<optional>false</optional>
</dependency>
<dependency>
<groupId>${pom.groupId}</groupId>
<artifactId>activeio-core</artifactId>
<optional>false</optional>
<artifactId>commons-logging-api</artifactId>
</dependency>
<dependency>
<groupId>${pom.groupId}</groupId>
<artifactId>activeio-core</artifactId>
<optional>false</optional>
<type>test-jar</type>
<groupId>org.apache.camel</groupId>
<artifactId>camel-core</artifactId>
</dependency>
<dependency>
<groupId>org.apache.geronimo.specs</groupId>
<artifactId>geronimo-jms_1.1_spec</artifactId>
</dependency>
<dependency>
<groupId>${pom.groupId}</groupId>
<artifactId>activeio-core</artifactId>
<optional>false</optional>
</dependency>
<!-- =============================== -->
<!-- Optional Dependencies -->
<!-- =============================== -->
@ -69,6 +65,11 @@
<artifactId>activemq-jaas</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-jms</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.apache.geronimo.specs</groupId>
@ -84,44 +85,29 @@
<artifactId>geronimo-j2ee-jacc_1.0_spec</artifactId>
<optional>true</optional>
</dependency>
<!-- commons -->
<dependency>
<groupId>commons-collections</groupId>
<artifactId>commons-collections</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>commons-primitives</groupId>
<artifactId>commons-primitives</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>commons-pool</groupId>
<artifactId>commons-pool</artifactId>
<optional>true</optional>
</dependency>
<!-- for XML parsing -->
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<scope>test</scope>
<groupId>org.apache.xbean</groupId>
<artifactId>xbean-spring</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.apache.derby</groupId>
<artifactId>derby</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>axion</groupId>
<artifactId>axion</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>regexp</groupId>
<artifactId>regexp</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>activemq</groupId>
<artifactId>jmdns</artifactId>
@ -132,25 +118,74 @@
<artifactId>xalan</artifactId>
<optional>true</optional>
</dependency>
<!--- not really a dependency at all - just added optionally to get the generator working -->
<dependency>
<groupId>org.apache.xbean</groupId>
<artifactId>xbean-spring</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring</artifactId>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-openwire-generator</artifactId>
<optional>true</optional>
</dependency>
<!-- =============================== -->
<!-- Testing Dependencies -->
<!-- =============================== -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-openwire-generator</artifactId>
<optional>true</optional>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>${pom.groupId}</groupId>
<artifactId>activeio-core</artifactId>
<optional>false</optional>
<type>test-jar</type>
</dependency>
<!-- testing camel helpers -->
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-core</artifactId>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-spring</artifactId>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<!-- database testing -->
<dependency>
<groupId>commons-collections</groupId>
<artifactId>commons-collections</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>commons-primitives</groupId>
<artifactId>commons-primitives</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>axion</groupId>
<artifactId>axion</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>regexp</groupId>
<artifactId>regexp</artifactId>
<scope>test</scope>
</dependency>
<!-- LDAP tests -->

View File

@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.camel;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.management.JMSStatsImpl;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.util.IdGenerator;
import org.apache.camel.CamelContext;
import org.apache.camel.CamelContextAware;
/**
* @version $Revision: $
*/
public class CamelConnection extends ActiveMQConnection implements CamelContextAware {
private CamelContext camelContext;
protected CamelConnection(Transport transport, IdGenerator clientIdGenerator, JMSStatsImpl factoryStats) throws Exception {
super(transport, clientIdGenerator, factoryStats);
}
public CamelContext getCamelContext() {
return camelContext;
}
public void setCamelContext(CamelContext camelContext) {
this.camelContext = camelContext;
}
}

View File

@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.camel;
import org.apache.activemq.management.JMSStatsImpl;
import org.apache.activemq.spring.ActiveMQConnectionFactory;
import org.apache.activemq.transport.Transport;
import org.apache.camel.CamelContext;
import org.apache.camel.CamelContextAware;
/**
* A JMS ConnectionFactory which resolves non-JMS destinations or instances of
* {@link CamelDestination} to use the {@link CamelContext} to perform smart routing etc
*
* @version $Revision: $
*/
public class CamelConnectionFactory extends ActiveMQConnectionFactory implements CamelContextAware {
private CamelContext camelContext;
public CamelConnectionFactory() {
}
public CamelContext getCamelContext() {
return camelContext;
}
public void setCamelContext(CamelContext camelContext) {
this.camelContext = camelContext;
}
// Implementation methods
//-----------------------------------------------------------------------
protected CamelConnection createActiveMQConnection(Transport transport, JMSStatsImpl stats) throws Exception {
CamelConnection connection = new CamelConnection(transport, getClientIdGenerator(), stats);
CamelContext context = getCamelContext();
if (context != null) {
connection.setCamelContext(context);
}
return connection;
}
}

View File

@ -0,0 +1,159 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.camel;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.CustomDestination;
import org.apache.camel.CamelContext;
import org.apache.camel.CamelContextAware;
import org.apache.camel.Endpoint;
import org.apache.camel.component.jms.JmsBinding;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.QueueReceiver;
import javax.jms.QueueSender;
import javax.jms.TopicPublisher;
import javax.jms.TopicSubscriber;
/**
* @version $Revision: $
*/
public class CamelDestination implements CustomDestination, CamelContextAware {
private String uri;
private Endpoint endpoint;
private CamelContext camelContext;
private JmsBinding binding = new JmsBinding();
public CamelDestination() {
}
public CamelDestination(String uri) {
this.uri = uri;
}
public String toString() {
return uri.toString();
}
// CustomDestination interface
//-----------------------------------------------------------------------
public MessageConsumer createConsumer(ActiveMQSession session, String messageSelector) {
return createConsumer(session, messageSelector, false);
}
public MessageConsumer createConsumer(ActiveMQSession session, String messageSelector, boolean noLocal) {
return new CamelMessageConsumer(this, resolveEndpoint(session), session, messageSelector, noLocal);
}
public TopicSubscriber createSubscriber(ActiveMQSession session, String messageSelector, boolean noLocal) {
return createDurableSubscriber(session, null, messageSelector, noLocal);
}
public TopicSubscriber createDurableSubscriber(ActiveMQSession session, String name, String messageSelector, boolean noLocal) {
throw new UnsupportedOperationException("This destination is not a Topic: " + this);
}
public QueueReceiver createReceiver(ActiveMQSession session, String messageSelector) {
throw new UnsupportedOperationException("This destination is not a Queue: " + this);
}
// Producers
//-----------------------------------------------------------------------
public MessageProducer createProducer(ActiveMQSession session) throws JMSException {
return new CamelMessageProducer(this, resolveEndpoint(session), session);
}
public TopicPublisher createPublisher(ActiveMQSession session) throws JMSException {
throw new UnsupportedOperationException("This destination is not a Topic: " + this);
}
public QueueSender createSender(ActiveMQSession session) throws JMSException {
throw new UnsupportedOperationException("This destination is not a Queue: " + this);
}
// Properties
//-----------------------------------------------------------------------
public String getUri() {
return uri;
}
public void setUri(String uri) {
this.uri = uri;
}
public Endpoint getEndpoint() {
return endpoint;
}
public void setEndpoint(Endpoint endpoint) {
this.endpoint = endpoint;
}
public CamelContext getCamelContext() {
return camelContext;
}
public void setCamelContext(CamelContext camelContext) {
this.camelContext = camelContext;
}
public JmsBinding getBinding() {
return binding;
}
public void setBinding(JmsBinding binding) {
this.binding = binding;
}
// Implementation methods
//-----------------------------------------------------------------------
/**
* Resolves the Camel Endpoint for this destination
*
* @return
*/
protected Endpoint resolveEndpoint(ActiveMQSession session) {
Endpoint answer = getEndpoint();
if (answer == null) {
answer = resolveCamelContext(session).getEndpoint(getUri());
if (answer == null) {
throw new IllegalArgumentException("No endpoint could be found for URI: " + getUri());
}
}
return answer;
}
protected CamelContext resolveCamelContext(ActiveMQSession session) {
CamelContext answer = getCamelContext();
if (answer == null) {
ActiveMQConnection connection = session.getConnection();
if (connection instanceof CamelConnection) {
CamelConnection camelConnection = (CamelConnection) connection;
answer = camelConnection.getCamelContext();
}
}
if (answer == null) {
throw new IllegalArgumentException("No CamelContext has been configured");
}
return answer;
}
}

View File

@ -0,0 +1,177 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.camel;
import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.util.JMSExceptionSupport;
import org.apache.camel.Consumer;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.PollingConsumer;
import org.apache.camel.Processor;
import javax.jms.*;
import javax.jms.IllegalStateException;
/**
* A JMS {@link javax.jms.MessageConsumer} which consumes message exchanges from a
* Camel {@link Endpoint}
*
* @version $Revision: $
*/
public class CamelMessageConsumer implements MessageConsumer {
private final CamelDestination destination;
private final Endpoint endpoint;
private final ActiveMQSession session;
private final String messageSelector;
private final boolean noLocal;
private MessageListener messageListener;
private Consumer consumer;
private PollingConsumer pollingConsumer;
private boolean closed;
public CamelMessageConsumer(CamelDestination destination, Endpoint endpoint, ActiveMQSession session, String messageSelector, boolean noLocal) {
this.destination = destination;
this.endpoint = endpoint;
this.session = session;
this.messageSelector = messageSelector;
this.noLocal = noLocal;
}
public void close() throws JMSException {
if (!closed) {
closed = true;
try {
if (consumer != null) {
consumer.stop();
}
if (pollingConsumer != null) {
pollingConsumer.stop();
}
}
catch (JMSException e) {
throw e;
}
catch (Exception e) {
throw JMSExceptionSupport.create(e);
}
}
}
public MessageListener getMessageListener() throws JMSException {
return messageListener;
}
public void setMessageListener(MessageListener messageListener) throws JMSException {
this.messageListener = messageListener;
if (messageListener != null && consumer == null) {
consumer = createConsumer();
}
}
public Message receive() throws JMSException {
Exchange exchange = getPollingConsumer().receive();
return createMessage(exchange);
}
public Message receive(long timeoutMillis) throws JMSException {
Exchange exchange = getPollingConsumer().receive(timeoutMillis);
return createMessage(exchange);
}
public Message receiveNoWait() throws JMSException {
Exchange exchange = getPollingConsumer().receiveNoWait();
return createMessage(exchange);
}
// Properties
//-----------------------------------------------------------------------
public CamelDestination getDestination() {
return destination;
}
public Endpoint getEndpoint() {
return endpoint;
}
public String getMessageSelector() {
return messageSelector;
}
public boolean isNoLocal() {
return noLocal;
}
public ActiveMQSession getSession() {
return session;
}
// Implementation methods
//-----------------------------------------------------------------------
protected PollingConsumer getPollingConsumer() throws JMSException {
try {
if (pollingConsumer == null) {
pollingConsumer = endpoint.createPollingConsumer();
pollingConsumer.start();
}
return pollingConsumer;
}
catch (JMSException e) {
throw e;
}
catch (Exception e) {
throw JMSExceptionSupport.create(e);
}
}
protected Message createMessage(Exchange exchange) throws JMSException {
if (exchange != null) {
Message message = destination.getBinding().makeJmsMessage(exchange, session);
return message;
}
else {
return null;
}
}
protected Consumer createConsumer() throws JMSException {
try {
Consumer answer = endpoint.createConsumer(new Processor() {
public void process(Exchange exchange) throws Exception {
Message message = createMessage(exchange);
getMessageListener().onMessage(message);
}
});
answer.start();
return answer;
}
catch (JMSException e) {
throw e;
}
catch (Exception e) {
throw JMSExceptionSupport.create(e);
}
}
protected void checkClosed() throws javax.jms.IllegalStateException {
if (closed) {
throw new IllegalStateException("The producer is closed");
}
}
}

View File

@ -0,0 +1,108 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.camel;
import org.apache.activemq.ActiveMQMessageProducerSupport;
import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.util.JMSExceptionSupport;
import org.apache.camel.Endpoint;
import org.apache.camel.Producer;
import org.apache.camel.component.jms.JmsExchange;
import org.apache.camel.util.ObjectHelper;
import javax.jms.Destination;
import javax.jms.IllegalStateException;
import javax.jms.JMSException;
import javax.jms.Message;
/**
* A JMS {@link javax.jms.MessageProducer} which sends message exchanges to a
* Camel {@link Endpoint}
*
* @version $Revision: $
*/
public class CamelMessageProducer extends ActiveMQMessageProducerSupport {
private final CamelDestination destination;
private final Endpoint endpoint;
protected Producer producer;
private boolean closed;
public CamelMessageProducer(CamelDestination destination, Endpoint endpoint, ActiveMQSession session) throws JMSException {
super(session);
this.destination = destination;
this.endpoint = endpoint;
try {
this.producer = endpoint.createProducer();
}
catch (JMSException e) {
throw e;
}
catch (Exception e) {
throw JMSExceptionSupport.create(e);
}
}
public CamelDestination getDestination() throws JMSException {
return destination;
}
public Endpoint getEndpoint() {
return endpoint;
}
public void close() throws JMSException {
if (!closed) {
closed = true;
try {
producer.stop();
}
catch (JMSException e) {
throw e;
}
catch (Exception e) {
throw JMSExceptionSupport.create(e);
}
}
}
public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive) throws JMSException {
CamelDestination camelDestination = null;
if (ObjectHelper.equals(destination, this.destination)) {
camelDestination = this.destination;
}
else {
// TODO support any CamelDestination?
throw new IllegalArgumentException("Invalid destination setting: " + destination + " when expected: " + this.destination);
}
try {
JmsExchange exchange = new JmsExchange(endpoint.getContext(), camelDestination.getBinding(), message);
producer.process(exchange);
}
catch (JMSException e) {
throw e;
}
catch (Exception e) {
throw JMSExceptionSupport.create(e);
}
}
protected void checkClosed() throws IllegalStateException {
if (closed) {
throw new IllegalStateException("The producer is closed");
}
}
}

View File

@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.camel;
import org.apache.activemq.ActiveMQSession;
import javax.jms.JMSException;
import javax.jms.Queue;
import javax.jms.QueueSender;
import javax.jms.QueueReceiver;
/**
* A JMS {@link Queue} object which refers to a Camel endpoint
*
* @version $Revision: $
*/
public class CamelQueue extends CamelDestination implements Queue {
public CamelQueue(String uri) {
super(uri);
}
public String getQueueName() throws JMSException {
return getUri();
}
public QueueSender createSender(ActiveMQSession session) throws JMSException {
return new CamelQueueSender(this, resolveEndpoint(session), session);
}
public QueueReceiver createReceiver(ActiveMQSession session, String messageSelector) {
return new CamelQueueReceiver(this, resolveEndpoint(session), session, messageSelector);
}
}

View File

@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.camel;
import org.apache.activemq.ActiveMQSession;
import org.apache.camel.Endpoint;
import javax.jms.JMSException;
import javax.jms.Queue;
import javax.jms.QueueReceiver;
/**
* A JMS {@link javax.jms.QueueReceiver} which consumes message exchanges from a
* Camel {@link org.apache.camel.Endpoint}
*
* @version $Revision: $
*/
public class CamelQueueReceiver extends CamelMessageConsumer implements QueueReceiver {
public CamelQueueReceiver(CamelQueue destination, Endpoint endpoint, ActiveMQSession session, String name) {
super(destination, endpoint, session, null, false);
}
/**
* Gets the <CODE>Queue</CODE> associated with this queue receiver.
*
* @return this receiver's <CODE>Queue</CODE>
* @throws JMSException if the JMS provider fails to get the queue for this queue
* receiver due to some internal error.
*/
public Queue getQueue() throws JMSException {
checkClosed();
return (Queue) super.getDestination();
}
}

View File

@ -0,0 +1,101 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.camel;
import org.apache.activemq.ActiveMQSession;
import org.apache.camel.Endpoint;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Queue;
import javax.jms.QueueSender;
/**
* A JMS {@link javax.jms.QueueSender} which sends message exchanges to a
* Camel {@link org.apache.camel.Endpoint}
*
* @version $Revision: $
*/
public class CamelQueueSender extends CamelMessageProducer implements QueueSender {
public CamelQueueSender(CamelQueue destination, Endpoint endpoint, ActiveMQSession session) throws JMSException {
super(destination, endpoint, session);
}
/**
* Gets the queue associated with this <CODE>QueueSender</CODE>.
*
* @return this sender's queue
* @throws JMSException if the JMS provider fails to get the queue for this
* <CODE>QueueSender</CODE> due to some internal error.
*/
public Queue getQueue() throws JMSException {
return (Queue) super.getDestination();
}
/**
* Sends a message to a queue for an unidentified message producer. Uses
* the <CODE>QueueSender</CODE>'s default delivery mode, priority, and
* time to live.
* <p/>
* <p/>
* Typically, a message producer is assigned a queue at creation time;
* however, the JMS API also supports unidentified message producers, which
* require that the queue be supplied every time a message is sent.
*
* @param queue the queue to send this message to
* @param message the message to send
* @throws JMSException if the JMS provider fails to send the message due to some
* internal error.
* @see javax.jms.MessageProducer#getDeliveryMode()
* @see javax.jms.MessageProducer#getTimeToLive()
* @see javax.jms.MessageProducer#getPriority()
*/
public void send(Queue queue, Message message) throws JMSException {
super.send(queue, message);
}
/**
* Sends a message to a queue for an unidentified message producer,
* specifying delivery mode, priority and time to live.
* <p/>
* <p/>
* Typically, a message producer is assigned a queue at creation time;
* however, the JMS API also supports unidentified message producers, which
* require that the queue be supplied every time a message is sent.
*
* @param queue the queue to send this message to
* @param message the message to send
* @param deliveryMode the delivery mode to use
* @param priority the priority for this message
* @param timeToLive the message's lifetime (in milliseconds)
* @throws JMSException if the JMS provider fails to send the message due to some
* internal error.
*/
public void send(Queue queue, Message message, int deliveryMode, int priority, long timeToLive)
throws JMSException {
super.send(queue,
message,
deliveryMode,
priority,
timeToLive);
}
}

View File

@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.camel;
import org.apache.activemq.ActiveMQSession;
import javax.jms.JMSException;
import javax.jms.Topic;
import javax.jms.TopicPublisher;
import javax.jms.TopicSubscriber;
/**
* A JMS {@link javax.jms.Topic} object which refers to a Camel endpoint
*
* @version $Revision: $
*/
public class CamelTopic extends CamelDestination implements Topic {
public CamelTopic(String uri) {
super(uri);
}
public String getTopicName() throws JMSException {
return getUri();
}
public TopicPublisher createPublisher(ActiveMQSession session) throws JMSException {
return new CamelTopicPublisher(this, resolveEndpoint(session), session);
}
public TopicSubscriber createDurableSubscriber(ActiveMQSession session, String name, String messageSelector, boolean noLocal) {
return new CamelTopicSubscriber(this, resolveEndpoint(session), session, name, messageSelector, noLocal);
}
}

View File

@ -0,0 +1,146 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.camel;
import org.apache.activemq.ActiveMQSession;
import org.apache.camel.Endpoint;
import javax.jms.JMSException;
import javax.jms.TopicPublisher;
import javax.jms.Topic;
import javax.jms.Message;
/**
* A JMS {@link javax.jms.TopicPublisher} which sends message exchanges to a
* Camel {@link Endpoint}
*
* @version $Revision: $
*/
public class CamelTopicPublisher extends CamelMessageProducer implements TopicPublisher {
public CamelTopicPublisher(CamelTopic destination, Endpoint endpoint, ActiveMQSession session) throws JMSException {
super(destination, endpoint, session);
}
/**
* Gets the topic associated with this <CODE>TopicPublisher</CODE>.
*
* @return this publisher's topic
* @throws JMSException if the JMS provider fails to get the topic for this
* <CODE>TopicPublisher</CODE> due to some internal error.
*/
public Topic getTopic() throws JMSException {
return (Topic) super.getDestination();
}
/**
* Publishes a message to the topic. Uses the <CODE>TopicPublisher</CODE>'s
* default delivery mode, priority, and time to live.
*
* @param message the message to publish
* @throws JMSException if the JMS provider fails to publish the message due to
* some internal error.
* @throws javax.jms.MessageFormatException if an invalid message is specified.
* @throws javax.jms.InvalidDestinationException if a client uses this method with a <CODE>TopicPublisher
* </CODE> with an invalid topic.
* @throws java.lang.UnsupportedOperationException
* if a client uses this method with a <CODE>TopicPublisher
* </CODE> that did not specify a topic at creation time.
* @see javax.jms.MessageProducer#getDeliveryMode()
* @see javax.jms.MessageProducer#getTimeToLive()
* @see javax.jms.MessageProducer#getPriority()
*/
public void publish(Message message) throws JMSException {
super.send(message);
}
/**
* Publishes a message to the topic, specifying delivery mode, priority,
* and time to live.
*
* @param message the message to publish
* @param deliveryMode the delivery mode to use
* @param priority the priority for this message
* @param timeToLive the message's lifetime (in milliseconds)
* @throws JMSException if the JMS provider fails to publish the message due to
* some internal error.
* @throws javax.jms.MessageFormatException if an invalid message is specified.
* @throws javax.jms.InvalidDestinationException if a client uses this method with a <CODE>TopicPublisher
* </CODE> with an invalid topic.
* @throws java.lang.UnsupportedOperationException
* if a client uses this method with a <CODE>TopicPublisher
* </CODE> that did not specify a topic at creation time.
*/
public void publish(Message message, int deliveryMode, int priority,
long timeToLive) throws JMSException {
super.send(message, deliveryMode, priority, timeToLive);
}
/**
* Publishes a message to a topic for an unidentified message producer.
* Uses the <CODE>TopicPublisher</CODE>'s default delivery mode,
* priority, and time to live.
* <p/>
* <P>
* Typically, a message producer is assigned a topic at creation time;
* however, the JMS API also supports unidentified message producers, which
* require that the topic be supplied every time a message is published.
*
* @param topic the topic to publish this message to
* @param message the message to publish
* @throws JMSException if the JMS provider fails to publish the message due to
* some internal error.
* @throws javax.jms.MessageFormatException if an invalid message is specified.
* @throws javax.jms.InvalidDestinationException if a client uses this method with an invalid topic.
* @see javax.jms.MessageProducer#getDeliveryMode()
* @see javax.jms.MessageProducer#getTimeToLive()
* @see javax.jms.MessageProducer#getPriority()
*/
public void publish(Topic topic, Message message) throws JMSException {
super.send(topic, message);
}
/**
* Publishes a message to a topic for an unidentified message producer,
* specifying delivery mode, priority and time to live.
* <p/>
* <P>
* Typically, a message producer is assigned a topic at creation time;
* however, the JMS API also supports unidentified message producers, which
* require that the topic be supplied every time a message is published.
*
* @param topic the topic to publish this message to
* @param message the message to publish
* @param deliveryMode the delivery mode to use
* @param priority the priority for this message
* @param timeToLive the message's lifetime (in milliseconds)
* @throws JMSException if the JMS provider fails to publish the message due to
* some internal error.
* @throws javax.jms.MessageFormatException if an invalid message is specified.
* @throws javax.jms.InvalidDestinationException if a client uses this method with an invalid topic.
*/
public void publish(Topic topic, Message message, int deliveryMode,
int priority, long timeToLive) throws JMSException {
super.send(topic, message, deliveryMode, priority, timeToLive);
}
}

View File

@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.camel;
import org.apache.activemq.ActiveMQSession;
import org.apache.camel.Endpoint;
import javax.jms.JMSException;
import javax.jms.Topic;
import javax.jms.TopicSubscriber;
/**
* A JMS {@link javax.jms.TopicSubscriber} which consumes message exchanges from a
* Camel {@link Endpoint}
*
* @version $Revision: $
*/
public class CamelTopicSubscriber extends CamelMessageConsumer implements TopicSubscriber {
public CamelTopicSubscriber(CamelTopic destination, Endpoint endpoint, ActiveMQSession session, String name, String messageSelector, boolean noLocal) {
super(destination, endpoint, session, messageSelector, noLocal);
}
/**
* Gets the <CODE>Topic</CODE> associated with this subscriber.
*
* @return this subscriber's <CODE>Topic</CODE>
* @throws javax.jms.JMSException if the JMS provider fails to get the topic for this topic
* subscriber due to some internal error.
*/
public Topic getTopic() throws JMSException {
checkClosed();
return (Topic) super.getDestination();
}
/**
* Gets the <CODE>NoLocal</CODE> attribute for this subscriber. The
* default value for this attribute is false.
*
* @return true if locally published messages are being inhibited
* @throws JMSException if the JMS provider fails to get the <CODE>NoLocal
* </CODE> attribute for this topic subscriber due to some
* internal error.
*/
public boolean getNoLocal() throws JMSException {
checkClosed();
return super.isNoLocal();
}
}

View File

@ -0,0 +1,25 @@
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<html>
<head>
</head>
<body>
Defines a JMS client which is capable of sending and receiving messages to Camel endpoints
</body>
</html>

View File

@ -0,0 +1,97 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.camel;
import junit.framework.Assert;
import org.apache.camel.CamelTemplate;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.spring.SpringTestSupport;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import javax.jms.TextMessage;
/**
* @version $Revision: $
*/
public class CamelJmsTest extends SpringTestSupport {
protected String expectedBody = "<hello>world!</hello>";
public void testSendingViaJmsIsReceivedByCamel() throws Exception {
MockEndpoint result = resolveMandatoryEndpoint("mock:result", MockEndpoint.class);
result.expectedBodiesReceived(expectedBody);
result.message(0).header("foo").isEqualTo("bar");
// lets create a message
Destination destination = getMandatoryBean(Destination.class, "sendTo");
ConnectionFactory factory = getMandatoryBean(ConnectionFactory.class, "connectionFactory");
Connection connection = factory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(destination);
// now lets send a message
ObjectMessage message = session.createObjectMessage(expectedBody);
message.setStringProperty("foo", "bar");
producer.send(message);
result.assertIsSatisfied();
log.info("Received message: " + result.getReceivedExchanges());
}
public void testConsumingViaJMSReceivesMessageFromCamel() throws Exception {
// lets create a message
Destination destination = getMandatoryBean(Destination.class, "consumeFrom");
ConnectionFactory factory = getMandatoryBean(ConnectionFactory.class, "connectionFactory");
CamelTemplate template = getMandatoryBean(CamelTemplate.class, "camelTemplate");
Connection connection = factory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
log.info("Consuming from: " + destination);
MessageConsumer consumer = session.createConsumer(destination);
// now lets send a message
template.sendBody("seda:consumer", expectedBody);
Message message = consumer.receive(5000);
Assert.assertNotNull("Should have received a message from destination: " + destination, message);
TextMessage textMessage = assertIsInstanceOf(TextMessage.class, message);
Assert.assertEquals("Message body", expectedBody, textMessage.getText());
log.info("Received message: " + message);
}
protected int getExpectedRouteCount() {
return 0;
}
protected ClassPathXmlApplicationContext createApplicationContext() {
return new ClassPathXmlApplicationContext("org/apache/activemq/camel/spring.xml");
}
}

View File

@ -0,0 +1,52 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
http://activemq.apache.org/camel/schema/spring http://activemq.apache.org/camel/schema/spring/camel-spring.xsd
">
<!-- START SNIPPET: example -->
<camelContext id="camel" xmlns="http://activemq.apache.org/camel/schema/spring">
<beanPostProcessor/>
</camelContext>
<bean id="connectionFactory" class="org.apache.activemq.spring.ActiveMQConnectionFactory">
<property name="brokerURL" value="vm://localhost?broker.persistent=false"/>
</bean>
<bean id="sendTo" class="org.apache.activemq.camel.CamelDestination">
<property name="uri" value="mock:result"/>
</bean>
<bean id="consumeFrom" class="org.apache.activemq.camel.CamelDestination">
<property name="uri" value="seda:consumer"/>
</bean>
<bean id="camelTemplate" class="org.apache.camel.spring.CamelTemplateFactoryBean"/>
<!-- END SNIPPET: example -->
<!--
<bean id="connectionFactory" class="org.apache.camel.jms.CamelConnectionFactory">
<property name="brokerURL" value="vm://localhost?broker.persistent=false"/>
</bean>
-->
</beans>