diff --git a/activemq-camel/src/main/java/org/apache/activemq/camel/component/CamelEndpointLoader.java b/activemq-camel/src/main/java/org/apache/activemq/camel/component/CamelEndpointLoader.java index 0c39c70fc1..a10e1499f6 100644 --- a/activemq-camel/src/main/java/org/apache/activemq/camel/component/CamelEndpointLoader.java +++ b/activemq-camel/src/main/java/org/apache/activemq/camel/component/CamelEndpointLoader.java @@ -28,9 +28,11 @@ import org.apache.activemq.advisory.DestinationListener; import org.apache.activemq.advisory.DestinationSource; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.command.ActiveMQTopic; import org.apache.camel.CamelContext; import org.apache.camel.CamelContextAware; import org.apache.camel.Endpoint; +import org.apache.camel.component.jms.JmsEndpoint; import org.apache.camel.component.jms.JmsQueueEndpoint; import org.apache.camel.util.ObjectHelper; import org.apache.commons.logging.Log; @@ -83,6 +85,15 @@ public class CamelEndpointLoader implements InitializingBean, DisposableBean, Ca removeQueue(queue); } } + else if (destination instanceof ActiveMQTopic) { + ActiveMQTopic topic = (ActiveMQTopic) destination; + if (event.isAddOperation()) { + addTopic(topic); + } + else { + removeTopic(topic); + } + } } catch (Exception e) { LOG.warn("Caught: " + e, e); @@ -94,6 +105,11 @@ public class CamelEndpointLoader implements InitializingBean, DisposableBean, Ca for (ActiveMQQueue queue : queues) { addQueue(queue); } + + Set topics = source.getTopics(); + for (ActiveMQTopic topic : topics) { + addTopic(topic); + } } public void destroy() throws Exception { @@ -159,4 +175,20 @@ public class CamelEndpointLoader implements InitializingBean, DisposableBean, Ca String queueUri = getQueueUri(queue); camelContext.removeEndpoints(queueUri); } + + protected void addTopic(ActiveMQTopic topic) throws Exception { + String topicUri = getTopicUri(topic); + ActiveMQComponent jmsComponent = getComponent(); + Endpoint endpoint = new JmsEndpoint(topicUri, jmsComponent, topic.getPhysicalName(), true, jmsComponent.getConfiguration()); + camelContext.addEndpoint(topicUri, endpoint); + } + + protected String getTopicUri(ActiveMQTopic topic) { + return "activemq:topic:" + topic.getPhysicalName(); + } + + protected void removeTopic(ActiveMQTopic topic) throws Exception { + String topicUri = getTopicUri(topic); + camelContext.removeEndpoints(topicUri); + } }