mirror of https://github.com/apache/activemq.git
added support to be able to auto-expose ActiveMQ Queues into a CamelContext so that they are browsable by any Camel based tooling
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@634686 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
1a01d13731
commit
26eedea5fb
|
@ -26,6 +26,9 @@ import org.apache.camel.component.jms.JmsConfiguration;
|
||||||
* @version $Revision$
|
* @version $Revision$
|
||||||
*/
|
*/
|
||||||
public class ActiveMQComponent extends JmsComponent {
|
public class ActiveMQComponent extends JmsComponent {
|
||||||
|
private boolean exposeAllQueues;
|
||||||
|
private CamelEndpointLoader endpointLoader;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Creates an <a href="http://activemq.apache.org/camel/activemq.html">ActiveMQ Component</a>
|
* Creates an <a href="http://activemq.apache.org/camel/activemq.html">ActiveMQ Component</a>
|
||||||
*
|
*
|
||||||
|
@ -68,6 +71,38 @@ public class ActiveMQComponent extends JmsComponent {
|
||||||
getConfiguration().setBrokerURL(brokerURL);
|
getConfiguration().setBrokerURL(brokerURL);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public boolean isExposeAllQueues() {
|
||||||
|
return exposeAllQueues;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* If enabled this will cause all Queues in the ActiveMQ broker to be eagerly populated into the CamelContext
|
||||||
|
* so that they can be easily browsed by any Camel tooling. This option is disabled by default.
|
||||||
|
*
|
||||||
|
* @param exposeAllQueues
|
||||||
|
*/
|
||||||
|
public void setExposeAllQueues(boolean exposeAllQueues) {
|
||||||
|
this.exposeAllQueues = exposeAllQueues;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void doStart() throws Exception {
|
||||||
|
super.doStart();
|
||||||
|
if (isExposeAllQueues()) {
|
||||||
|
endpointLoader = new CamelEndpointLoader(getCamelContext());
|
||||||
|
endpointLoader.afterPropertiesSet();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void doStop() throws Exception {
|
||||||
|
if (endpointLoader != null) {
|
||||||
|
endpointLoader.destroy();
|
||||||
|
endpointLoader = null;
|
||||||
|
}
|
||||||
|
super.doStop();
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected JmsConfiguration createConfiguration() {
|
protected JmsConfiguration createConfiguration() {
|
||||||
|
|
|
@ -31,10 +31,11 @@ import org.apache.activemq.command.ActiveMQQueue;
|
||||||
import org.apache.camel.CamelContext;
|
import org.apache.camel.CamelContext;
|
||||||
import org.apache.camel.CamelContextAware;
|
import org.apache.camel.CamelContextAware;
|
||||||
import org.apache.camel.Endpoint;
|
import org.apache.camel.Endpoint;
|
||||||
import org.apache.camel.component.jms.JmsEndpoint;
|
import org.apache.camel.component.jms.JmsQueueEndpoint;
|
||||||
import org.apache.camel.util.ObjectHelper;
|
import org.apache.camel.util.ObjectHelper;
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.springframework.beans.factory.DisposableBean;
|
||||||
import org.springframework.beans.factory.InitializingBean;
|
import org.springframework.beans.factory.InitializingBean;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -42,7 +43,7 @@ import org.springframework.beans.factory.InitializingBean;
|
||||||
*
|
*
|
||||||
* @version $Revision: 1.1 $
|
* @version $Revision: 1.1 $
|
||||||
*/
|
*/
|
||||||
public class CamelEndpointLoader implements InitializingBean, CamelContextAware {
|
public class CamelEndpointLoader implements InitializingBean, DisposableBean, CamelContextAware {
|
||||||
private static final transient Log LOG = LogFactory.getLog(CamelEndpointLoader.class);
|
private static final transient Log LOG = LogFactory.getLog(CamelEndpointLoader.class);
|
||||||
private CamelContext camelContext;
|
private CamelContext camelContext;
|
||||||
private ActiveMQConnection connection;
|
private ActiveMQConnection connection;
|
||||||
|
@ -56,6 +57,54 @@ public class CamelEndpointLoader implements InitializingBean, CamelContextAware
|
||||||
this.camelContext = camelContext;
|
this.camelContext = camelContext;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void afterPropertiesSet() throws Exception {
|
||||||
|
ObjectHelper.notNull(camelContext, "camelContext");
|
||||||
|
if (connection == null) {
|
||||||
|
Connection value = getConnectionFactory().createConnection();
|
||||||
|
if (value instanceof ActiveMQConnection) {
|
||||||
|
connection = (ActiveMQConnection) value;
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
throw new IllegalArgumentException("Created JMS Connection is not an ActiveMQConnection: " + value);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
connection.start();
|
||||||
|
DestinationSource source = connection.getDestinationSource();
|
||||||
|
source.setDestinationListener(new DestinationListener() {
|
||||||
|
public void onDestinationEvent(DestinationEvent event) {
|
||||||
|
try {
|
||||||
|
ActiveMQDestination destination = event.getDestination();
|
||||||
|
if (destination instanceof ActiveMQQueue) {
|
||||||
|
ActiveMQQueue queue = (ActiveMQQueue) destination;
|
||||||
|
if (event.isAddOperation()) {
|
||||||
|
addQueue(queue);
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
removeQueue(queue);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
catch (Exception e) {
|
||||||
|
LOG.warn("Caught: " + e, e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
Set<ActiveMQQueue> queues = source.getQueues();
|
||||||
|
for (ActiveMQQueue queue : queues) {
|
||||||
|
addQueue(queue);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void destroy() throws Exception {
|
||||||
|
if (connection != null) {
|
||||||
|
connection.close();
|
||||||
|
connection = null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Properties
|
||||||
|
//-------------------------------------------------------------------------
|
||||||
public CamelContext getCamelContext() {
|
public CamelContext getCamelContext() {
|
||||||
return camelContext;
|
return camelContext;
|
||||||
}
|
}
|
||||||
|
@ -90,48 +139,13 @@ public class CamelEndpointLoader implements InitializingBean, CamelContextAware
|
||||||
this.component = component;
|
this.component = component;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void afterPropertiesSet() throws Exception {
|
// Implementation methods
|
||||||
ObjectHelper.notNull(camelContext, "camelContext");
|
//-------------------------------------------------------------------------
|
||||||
if (connection == null) {
|
|
||||||
Connection value = getConnectionFactory().createConnection();
|
|
||||||
if (value instanceof ActiveMQConnection) {
|
|
||||||
connection = (ActiveMQConnection) value;
|
|
||||||
}
|
|
||||||
else {
|
|
||||||
throw new IllegalArgumentException("Created JMS Connection is not an ActiveMQConnection: " + value);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
DestinationSource source = connection.getDestinationSource();
|
|
||||||
source.setDestinationListener(new DestinationListener() {
|
|
||||||
public void onDestinationEvent(DestinationEvent event) {
|
|
||||||
try {
|
|
||||||
ActiveMQDestination destination = event.getDestination();
|
|
||||||
if (destination instanceof ActiveMQQueue) {
|
|
||||||
ActiveMQQueue queue = (ActiveMQQueue) destination;
|
|
||||||
if (event.isAddOperation()) {
|
|
||||||
addQueue(queue);
|
|
||||||
}
|
|
||||||
else {
|
|
||||||
removeQueue(queue);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
catch (Exception e) {
|
|
||||||
LOG.warn("Caught: " + e, e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
Set<ActiveMQQueue> queues = source.getQueues();
|
|
||||||
for (ActiveMQQueue queue : queues) {
|
|
||||||
addQueue(queue);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
protected void addQueue(ActiveMQQueue queue) throws Exception {
|
protected void addQueue(ActiveMQQueue queue) throws Exception {
|
||||||
String queueUri = getQueueUri(queue);
|
String queueUri = getQueueUri(queue);
|
||||||
ActiveMQComponent jmsComponent = getComponent();
|
ActiveMQComponent jmsComponent = getComponent();
|
||||||
Endpoint endpoint = new JmsEndpoint(queueUri, jmsComponent, queue.getPhysicalName(), false, jmsComponent.getConfiguration());
|
Endpoint endpoint = new JmsQueueEndpoint(queueUri, jmsComponent, queue.getPhysicalName(), jmsComponent.getConfiguration());
|
||||||
camelContext.addSingletonEndpoint(queueUri, endpoint);
|
camelContext.addSingletonEndpoint(queueUri, endpoint);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,89 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.activemq.camel.component;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
import org.apache.activemq.command.ActiveMQDestination;
|
||||||
|
import org.apache.activemq.command.ActiveMQQueue;
|
||||||
|
import org.apache.activemq.command.ActiveMQTopic;
|
||||||
|
import org.apache.activemq.EmbeddedBrokerTestSupport;
|
||||||
|
import org.apache.activemq.broker.BrokerService;
|
||||||
|
import org.apache.camel.CamelContext;
|
||||||
|
import org.apache.camel.CamelTemplate;
|
||||||
|
import org.apache.camel.impl.DefaultCamelContext;
|
||||||
|
import org.apache.camel.spi.BrowsableEndpoint;
|
||||||
|
import org.apache.camel.util.CamelContextHelper;
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Shows that we can see the queues inside ActiveMQ via Camel
|
||||||
|
* by enabling the {@link ActiveMQComponent#setExposeAllQueues(boolean)} flag
|
||||||
|
*
|
||||||
|
* @version $Revision$
|
||||||
|
*/
|
||||||
|
public class AutoExposeQueuesInCamelTest extends EmbeddedBrokerTestSupport {
|
||||||
|
private static final transient Log LOG = LogFactory.getLog(AutoExposeQueuesInCamelTest.class);
|
||||||
|
|
||||||
|
protected ActiveMQQueue sampleQueue = new ActiveMQQueue("foo.bar");
|
||||||
|
protected ActiveMQTopic sampleTopic = new ActiveMQTopic("cheese");
|
||||||
|
|
||||||
|
protected CamelContext camelContext = new DefaultCamelContext();
|
||||||
|
protected CamelTemplate template;
|
||||||
|
|
||||||
|
public void testWorks() throws Exception {
|
||||||
|
Thread.sleep(2000);
|
||||||
|
LOG.debug("Looking for endpoints...");
|
||||||
|
List<BrowsableEndpoint> endpoints = CamelContextHelper.getSingletonEndpoints(camelContext, BrowsableEndpoint.class);
|
||||||
|
for (BrowsableEndpoint endpoint : endpoints) {
|
||||||
|
LOG.debug("Endpoint: " + endpoint);
|
||||||
|
}
|
||||||
|
assertEquals("Should have found an endpoint: "+ endpoints, 1, endpoints.size());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void setUp() throws Exception {
|
||||||
|
super.setUp();
|
||||||
|
|
||||||
|
// lets configure the ActiveMQ component for Camel
|
||||||
|
ActiveMQComponent component = new ActiveMQComponent();
|
||||||
|
component.setBrokerURL(bindAddress);
|
||||||
|
component.setExposeAllQueues(true);
|
||||||
|
|
||||||
|
camelContext.addComponent("activemq", component);
|
||||||
|
camelContext.start();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void tearDown() throws Exception {
|
||||||
|
camelContext.stop();
|
||||||
|
super.tearDown();
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected BrokerService createBroker() throws Exception {
|
||||||
|
BrokerService broker = super.createBroker();
|
||||||
|
broker.setDestinations(new ActiveMQDestination[]{
|
||||||
|
sampleQueue,
|
||||||
|
sampleTopic
|
||||||
|
});
|
||||||
|
return broker;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
6
pom.xml
6
pom.xml
|
@ -37,7 +37,11 @@
|
||||||
<aopalliance-version>1.0</aopalliance-version>
|
<aopalliance-version>1.0</aopalliance-version>
|
||||||
<axion-version>1.0-M3-dev</axion-version>
|
<axion-version>1.0-M3-dev</axion-version>
|
||||||
<axis-version>1.2-RC1</axis-version>
|
<axis-version>1.2-RC1</axis-version>
|
||||||
<camel-version>1.2.0</camel-version>
|
<!-- TODO switch to 1.3.0 when its released
|
||||||
|
needed for changes in the org.apache.activemq.camel.component package to automatically make
|
||||||
|
ActiveMQ queues available in Camel as endpoints and make them browsable
|
||||||
|
-->
|
||||||
|
<camel-version>1.3-SNAPSHOT</camel-version>
|
||||||
<cglib-version>2.0</cglib-version>
|
<cglib-version>2.0</cglib-version>
|
||||||
<commons-beanutils-version>1.6.1</commons-beanutils-version>
|
<commons-beanutils-version>1.6.1</commons-beanutils-version>
|
||||||
<commons-collections-version>3.1</commons-collections-version>
|
<commons-collections-version>3.1</commons-collections-version>
|
||||||
|
|
Loading…
Reference in New Issue