diff --git a/activemq-core/src/main/java/org/apache/activemq/camel/component/ActiveMQComponent.java b/activemq-core/src/main/java/org/apache/activemq/camel/component/ActiveMQComponent.java index 17e5999450..6d2f9557fb 100644 --- a/activemq-core/src/main/java/org/apache/activemq/camel/component/ActiveMQComponent.java +++ b/activemq-core/src/main/java/org/apache/activemq/camel/component/ActiveMQComponent.java @@ -26,6 +26,9 @@ import org.apache.camel.component.jms.JmsConfiguration; * @version $Revision$ */ public class ActiveMQComponent extends JmsComponent { + private boolean exposeAllQueues; + private CamelEndpointLoader endpointLoader; + /** * Creates an ActiveMQ Component * @@ -68,6 +71,38 @@ public class ActiveMQComponent extends JmsComponent { getConfiguration().setBrokerURL(brokerURL); } + public boolean isExposeAllQueues() { + return exposeAllQueues; + } + + /** + * If enabled this will cause all Queues in the ActiveMQ broker to be eagerly populated into the CamelContext + * so that they can be easily browsed by any Camel tooling. This option is disabled by default. + * + * @param exposeAllQueues + */ + public void setExposeAllQueues(boolean exposeAllQueues) { + this.exposeAllQueues = exposeAllQueues; + } + + @Override + protected void doStart() throws Exception { + super.doStart(); + if (isExposeAllQueues()) { + endpointLoader = new CamelEndpointLoader(getCamelContext()); + endpointLoader.afterPropertiesSet(); + } + } + + + @Override + protected void doStop() throws Exception { + if (endpointLoader != null) { + endpointLoader.destroy(); + endpointLoader = null; + } + super.doStop(); + } @Override protected JmsConfiguration createConfiguration() { diff --git a/activemq-core/src/main/java/org/apache/activemq/camel/component/CamelEndpointLoader.java b/activemq-core/src/main/java/org/apache/activemq/camel/component/CamelEndpointLoader.java index 39db4d76a1..9f6e1b5dfd 100644 --- a/activemq-core/src/main/java/org/apache/activemq/camel/component/CamelEndpointLoader.java +++ b/activemq-core/src/main/java/org/apache/activemq/camel/component/CamelEndpointLoader.java @@ -31,18 +31,19 @@ import org.apache.activemq.command.ActiveMQQueue; import org.apache.camel.CamelContext; import org.apache.camel.CamelContextAware; import org.apache.camel.Endpoint; -import org.apache.camel.component.jms.JmsEndpoint; +import org.apache.camel.component.jms.JmsQueueEndpoint; import org.apache.camel.util.ObjectHelper; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.springframework.beans.factory.DisposableBean; import org.springframework.beans.factory.InitializingBean; /** * A helper bean which populates a {@link CamelContext} with ActiveMQ Queue endpoints - * + * * @version $Revision: 1.1 $ */ -public class CamelEndpointLoader implements InitializingBean, CamelContextAware { +public class CamelEndpointLoader implements InitializingBean, DisposableBean, CamelContextAware { private static final transient Log LOG = LogFactory.getLog(CamelEndpointLoader.class); private CamelContext camelContext; private ActiveMQConnection connection; @@ -56,6 +57,54 @@ public class CamelEndpointLoader implements InitializingBean, CamelContextAware this.camelContext = camelContext; } + public void afterPropertiesSet() throws Exception { + ObjectHelper.notNull(camelContext, "camelContext"); + if (connection == null) { + Connection value = getConnectionFactory().createConnection(); + if (value instanceof ActiveMQConnection) { + connection = (ActiveMQConnection) value; + } + else { + throw new IllegalArgumentException("Created JMS Connection is not an ActiveMQConnection: " + value); + } + } + connection.start(); + DestinationSource source = connection.getDestinationSource(); + source.setDestinationListener(new DestinationListener() { + public void onDestinationEvent(DestinationEvent event) { + try { + ActiveMQDestination destination = event.getDestination(); + if (destination instanceof ActiveMQQueue) { + ActiveMQQueue queue = (ActiveMQQueue) destination; + if (event.isAddOperation()) { + addQueue(queue); + } + else { + removeQueue(queue); + } + } + } + catch (Exception e) { + LOG.warn("Caught: " + e, e); + } + } + }); + + Set queues = source.getQueues(); + for (ActiveMQQueue queue : queues) { + addQueue(queue); + } + } + + public void destroy() throws Exception { + if (connection != null) { + connection.close(); + connection = null; + } + } + + // Properties + //------------------------------------------------------------------------- public CamelContext getCamelContext() { return camelContext; } @@ -90,48 +139,13 @@ public class CamelEndpointLoader implements InitializingBean, CamelContextAware this.component = component; } - public void afterPropertiesSet() throws Exception { - ObjectHelper.notNull(camelContext, "camelContext"); - if (connection == null) { - Connection value = getConnectionFactory().createConnection(); - if (value instanceof ActiveMQConnection) { - connection = (ActiveMQConnection) value; - } - else { - throw new IllegalArgumentException("Created JMS Connection is not an ActiveMQConnection: " + value); - } - } - DestinationSource source = connection.getDestinationSource(); - source.setDestinationListener(new DestinationListener() { - public void onDestinationEvent(DestinationEvent event) { - try { - ActiveMQDestination destination = event.getDestination(); - if (destination instanceof ActiveMQQueue) { - ActiveMQQueue queue = (ActiveMQQueue) destination; - if (event.isAddOperation()) { - addQueue(queue); - } - else { - removeQueue(queue); - } - } - } - catch (Exception e) { - LOG.warn("Caught: " + e, e); - } - } - }); - - Set queues = source.getQueues(); - for (ActiveMQQueue queue : queues) { - addQueue(queue); - } - } + // Implementation methods + //------------------------------------------------------------------------- protected void addQueue(ActiveMQQueue queue) throws Exception { String queueUri = getQueueUri(queue); ActiveMQComponent jmsComponent = getComponent(); - Endpoint endpoint = new JmsEndpoint(queueUri, jmsComponent, queue.getPhysicalName(), false, jmsComponent.getConfiguration()); + Endpoint endpoint = new JmsQueueEndpoint(queueUri, jmsComponent, queue.getPhysicalName(), jmsComponent.getConfiguration()); camelContext.addSingletonEndpoint(queueUri, endpoint); } diff --git a/activemq-core/src/test/java/org/apache/activemq/camel/component/AutoExposeQueuesInCamelTest.java b/activemq-core/src/test/java/org/apache/activemq/camel/component/AutoExposeQueuesInCamelTest.java new file mode 100644 index 0000000000..e02d68ffe5 --- /dev/null +++ b/activemq-core/src/test/java/org/apache/activemq/camel/component/AutoExposeQueuesInCamelTest.java @@ -0,0 +1,89 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.camel.component; + +import java.util.List; + +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.command.ActiveMQTopic; +import org.apache.activemq.EmbeddedBrokerTestSupport; +import org.apache.activemq.broker.BrokerService; +import org.apache.camel.CamelContext; +import org.apache.camel.CamelTemplate; +import org.apache.camel.impl.DefaultCamelContext; +import org.apache.camel.spi.BrowsableEndpoint; +import org.apache.camel.util.CamelContextHelper; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +/** + * Shows that we can see the queues inside ActiveMQ via Camel + * by enabling the {@link ActiveMQComponent#setExposeAllQueues(boolean)} flag + * + * @version $Revision$ + */ +public class AutoExposeQueuesInCamelTest extends EmbeddedBrokerTestSupport { + private static final transient Log LOG = LogFactory.getLog(AutoExposeQueuesInCamelTest.class); + + protected ActiveMQQueue sampleQueue = new ActiveMQQueue("foo.bar"); + protected ActiveMQTopic sampleTopic = new ActiveMQTopic("cheese"); + + protected CamelContext camelContext = new DefaultCamelContext(); + protected CamelTemplate template; + + public void testWorks() throws Exception { + Thread.sleep(2000); + LOG.debug("Looking for endpoints..."); + List endpoints = CamelContextHelper.getSingletonEndpoints(camelContext, BrowsableEndpoint.class); + for (BrowsableEndpoint endpoint : endpoints) { + LOG.debug("Endpoint: " + endpoint); + } + assertEquals("Should have found an endpoint: "+ endpoints, 1, endpoints.size()); + } + + @Override + protected void setUp() throws Exception { + super.setUp(); + + // lets configure the ActiveMQ component for Camel + ActiveMQComponent component = new ActiveMQComponent(); + component.setBrokerURL(bindAddress); + component.setExposeAllQueues(true); + + camelContext.addComponent("activemq", component); + camelContext.start(); + } + + @Override + protected void tearDown() throws Exception { + camelContext.stop(); + super.tearDown(); + } + + + @Override + protected BrokerService createBroker() throws Exception { + BrokerService broker = super.createBroker(); + broker.setDestinations(new ActiveMQDestination[]{ + sampleQueue, + sampleTopic + }); + return broker; + } + +} \ No newline at end of file diff --git a/pom.xml b/pom.xml index 08d0b40acb..8e373702ef 100755 --- a/pom.xml +++ b/pom.xml @@ -37,7 +37,11 @@ 1.0 1.0-M3-dev 1.2-RC1 - 1.2.0 + + 1.3-SNAPSHOT 2.0 1.6.1 3.1