From b2a91b96c48f47507d00cbee319a28052a538f4b Mon Sep 17 00:00:00 2001 From: Bosanac Dejan Date: Wed, 5 Jun 2013 12:56:55 +0000 Subject: [PATCH] https://issues.apache.org/jira/browse/AMQ-4574 - EndpointCompleter functionality for camel component git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1489843 13f79535-47bb-0310-9956-ffa450edef68 --- .../camel/component/ActiveMQComponent.java | 67 +++++++++++++++++-- .../camel/component/CamelEndpointLoader.java | 49 +------------- .../AutoExposeQueuesInCamelTest.java | 48 ++++++++++--- pom.xml | 2 +- 4 files changed, 104 insertions(+), 62 deletions(-) diff --git a/activemq-camel/src/main/java/org/apache/activemq/camel/component/ActiveMQComponent.java b/activemq-camel/src/main/java/org/apache/activemq/camel/component/ActiveMQComponent.java index 38491740d0..a1e4a909dc 100644 --- a/activemq-camel/src/main/java/org/apache/activemq/camel/component/ActiveMQComponent.java +++ b/activemq-camel/src/main/java/org/apache/activemq/camel/component/ActiveMQComponent.java @@ -17,25 +17,33 @@ package org.apache.activemq.camel.component; import java.net.URISyntaxException; -import java.util.Map; +import java.util.*; import java.util.concurrent.CopyOnWriteArrayList; import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.EnhancedConnection; import org.apache.activemq.Service; +import org.apache.activemq.advisory.DestinationSource; +import org.apache.activemq.command.ActiveMQDestination; import org.apache.camel.CamelContext; +import org.apache.camel.ComponentConfiguration; import org.apache.camel.component.jms.JmsComponent; import org.apache.camel.component.jms.JmsConfiguration; +import org.apache.camel.spi.EndpointCompleter; import org.apache.camel.util.IntrospectionSupport; import org.apache.camel.util.ObjectHelper; import org.apache.camel.util.URISupport; import org.springframework.jms.connection.SingleConnectionFactory; +import javax.jms.Connection; +import javax.jms.ConnectionFactory; + /** * The ActiveMQ Component * * */ -public class ActiveMQComponent extends JmsComponent { +public class ActiveMQComponent extends JmsComponent implements EndpointCompleter { private final CopyOnWriteArrayList singleConnectionFactoryList = new CopyOnWriteArrayList(); private final CopyOnWriteArrayList pooledConnectionFactoryServiceList = @@ -43,6 +51,10 @@ public class ActiveMQComponent extends JmsComponent { private boolean exposeAllQueues; private CamelEndpointLoader endpointLoader; + private EnhancedConnection connection; + private ConnectionFactory connectionFactory; + DestinationSource source; + /** * Creates an ActiveMQ Component * @@ -160,17 +172,30 @@ public class ActiveMQComponent extends JmsComponent { @Override protected void doStart() throws Exception { super.doStart(); + + if (connection == null) { + Connection value = getConnectionFactory().createConnection(); + if (value instanceof EnhancedConnection) { + connection = (EnhancedConnection) value; + } + else { + throw new IllegalArgumentException("Created JMS Connection is not an EnhancedConnection: " + value); + } + } + connection.start(); + source = connection.getDestinationSource(); + if (isExposeAllQueues()) { - endpointLoader = new CamelEndpointLoader(getCamelContext()); + endpointLoader = new CamelEndpointLoader(getCamelContext(), source); endpointLoader.afterPropertiesSet(); } } @Override protected void doStop() throws Exception { - if (endpointLoader != null) { - endpointLoader.destroy(); - endpointLoader = null; + if (connection != null) { + connection.close(); + connection = null; } for (Service s : pooledConnectionFactoryServiceList) { s.stop(); @@ -197,4 +222,34 @@ public class ActiveMQComponent extends JmsComponent { answer.setActiveMQComponent(this); return answer; } + + public ConnectionFactory getConnectionFactory() { + if (connectionFactory == null + && getConfiguration() instanceof ActiveMQConfiguration) { + connectionFactory = ((ActiveMQConfiguration)getConfiguration()).createConnectionFactory(); + } + return connectionFactory; + } + + @Override + public List completeEndpointPath(ComponentConfiguration componentConfiguration, String completionText) { + Set candidates = source.getQueues(); + String destinationName = completionText; + if (completionText.startsWith("topic:")) { + candidates = source.getTopics(); + destinationName = completionText.substring(6); + } else if (completionText.startsWith("queue:")) { + destinationName = completionText.substring(6); + } + + Iterator it = candidates.iterator(); + ArrayList answer = new ArrayList(); + while (it.hasNext()) { + ActiveMQDestination destination = (ActiveMQDestination)it.next(); + if (destination.getPhysicalName().startsWith(destinationName)) { + answer.add(destination.getPhysicalName()); + } + } + return answer; + } } diff --git a/activemq-camel/src/main/java/org/apache/activemq/camel/component/CamelEndpointLoader.java b/activemq-camel/src/main/java/org/apache/activemq/camel/component/CamelEndpointLoader.java index 92049de5d5..1bf48d54d1 100644 --- a/activemq-camel/src/main/java/org/apache/activemq/camel/component/CamelEndpointLoader.java +++ b/activemq-camel/src/main/java/org/apache/activemq/camel/component/CamelEndpointLoader.java @@ -49,15 +49,15 @@ import org.slf4j.LoggerFactory; public class CamelEndpointLoader implements CamelContextAware { private static final transient Logger LOG = LoggerFactory.getLogger(CamelEndpointLoader.class); private CamelContext camelContext; - private EnhancedConnection connection; - private ConnectionFactory connectionFactory; private ActiveMQComponent component; + DestinationSource source; public CamelEndpointLoader() { } - public CamelEndpointLoader(CamelContext camelContext) { + public CamelEndpointLoader(CamelContext camelContext, DestinationSource source) { this.camelContext = camelContext; + this.source = source; } /** @@ -67,18 +67,6 @@ public class CamelEndpointLoader implements CamelContextAware { */ @PostConstruct public void afterPropertiesSet() throws Exception { - ObjectHelper.notNull(camelContext, "camelContext"); - if (connection == null) { - Connection value = getConnectionFactory().createConnection(); - if (value instanceof EnhancedConnection) { - connection = (EnhancedConnection) value; - } - else { - throw new IllegalArgumentException("Created JMS Connection is not an EnhancedConnection: " + value); - } - } - connection.start(); - DestinationSource source = connection.getDestinationSource(); source.setDestinationListener(new DestinationListener() { public void onDestinationEvent(DestinationEvent event) { try { @@ -119,20 +107,6 @@ public class CamelEndpointLoader implements CamelContextAware { } } - - /** - * - * @throws Exception - * @org.apache.xbean.DestroyMethod - */ - @PreDestroy - public void destroy() throws Exception { - if (connection != null) { - connection.close(); - connection = null; - } - } - // Properties //------------------------------------------------------------------------- public CamelContext getCamelContext() { @@ -143,23 +117,6 @@ public class CamelEndpointLoader implements CamelContextAware { this.camelContext = camelContext; } - public EnhancedConnection getConnection() { - return connection; - } - - public ConnectionFactory getConnectionFactory() { - if (connectionFactory == null - && getComponent().getConfiguration() instanceof ActiveMQConfiguration) { - connectionFactory = ((ActiveMQConfiguration) getComponent() - .getConfiguration()).createConnectionFactory(); - } - return connectionFactory; - } - - public void setConnectionFactory(ConnectionFactory connectionFactory) { - this.connectionFactory = connectionFactory; - } - public ActiveMQComponent getComponent() { if (component == null) { component = camelContext.getComponent("activemq", ActiveMQComponent.class); diff --git a/activemq-camel/src/test/java/org/apache/activemq/camel/component/AutoExposeQueuesInCamelTest.java b/activemq-camel/src/test/java/org/apache/activemq/camel/component/AutoExposeQueuesInCamelTest.java index 03a3fcac89..77a687ad65 100644 --- a/activemq-camel/src/test/java/org/apache/activemq/camel/component/AutoExposeQueuesInCamelTest.java +++ b/activemq-camel/src/test/java/org/apache/activemq/camel/component/AutoExposeQueuesInCamelTest.java @@ -16,23 +16,26 @@ */ package org.apache.activemq.camel.component; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; - +import org.apache.activemq.EmbeddedBrokerTestSupport; +import org.apache.activemq.broker.BrokerService; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQTopic; -import org.apache.activemq.EmbeddedBrokerTestSupport; -import org.apache.activemq.broker.BrokerService; import org.apache.camel.CamelContext; import org.apache.camel.Endpoint; import org.apache.camel.impl.DefaultCamelContext; import org.apache.camel.spi.BrowsableEndpoint; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; + +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertThat; + /** * Shows that we can see the queues inside ActiveMQ via Camel * by enabling the {@link ActiveMQComponent#setExposeAllQueues(boolean)} flag @@ -46,17 +49,44 @@ public class AutoExposeQueuesInCamelTest extends EmbeddedBrokerTestSupport { protected ActiveMQTopic sampleTopic = new ActiveMQTopic("cheese"); protected CamelContext camelContext = new DefaultCamelContext(); + ActiveMQComponent component; public void testWorks() throws Exception { Thread.sleep(2000); LOG.debug("Looking for endpoints..."); + broker.getAdminView().addQueue("runtime"); + + Thread.sleep(1000); // Changed from using CamelContextHelper.getSingletonEndpoints here because JMS Endpoints in Camel // are always non-singleton List endpoints = getEndpoints(camelContext, BrowsableEndpoint.class); for (BrowsableEndpoint endpoint : endpoints) { LOG.debug("Endpoint: " + endpoint); } - assertEquals("Should have found an endpoint: "+ endpoints, 1, endpoints.size()); + assertEquals("Should have found an endpoint: "+ endpoints, 2, endpoints.size()); + } + + public void testCompleter() throws Exception { + Thread.sleep(1000); + List result = component.completeEndpointPath(null, "foo"); + assertThat(result, is(Arrays.asList("foo.bar"))); + result = component.completeEndpointPath(null, "queue:foo"); + assertThat(result, is(Arrays.asList("foo.bar"))); + result = component.completeEndpointPath(null, "topic:ch"); + assertThat(result, is(Arrays.asList("cheese"))); + result = component.completeEndpointPath(null, "ch"); + assertTrue(result.isEmpty()); + result = component.completeEndpointPath(null, "queue:ch"); + assertTrue(result.isEmpty()); + result = component.completeEndpointPath(null, "topic:foo"); + assertTrue(result.isEmpty()); + + broker.getAdminView().addQueue("runtime"); + + Thread.sleep(1000); + + result = component.completeEndpointPath(null, "run"); + assertThat(result, is(Arrays.asList("runtime"))); } public List getEndpoints(CamelContext camelContext, Class type) { @@ -76,7 +106,7 @@ public class AutoExposeQueuesInCamelTest extends EmbeddedBrokerTestSupport { super.setUp(); // lets configure the ActiveMQ component for Camel - ActiveMQComponent component = new ActiveMQComponent(); + component = new ActiveMQComponent(); component.setBrokerURL(bindAddress); component.setExposeAllQueues(true); diff --git a/pom.xml b/pom.xml index a58534c94b..15010c71f8 100755 --- a/pom.xml +++ b/pom.xml @@ -46,7 +46,7 @@ 1.0 1.0.0 1.0-M3-dev - 2.10.4 + 2.12-SNAPSHOT [2.10,3) 2.0 1.8.3