From 7ca25965db5b9ffdfba11b914dafbb36a3504162 Mon Sep 17 00:00:00 2001 From: Rob Davies Date: Tue, 9 Sep 2014 16:52:21 +0100 Subject: [PATCH] added CamelRoutesBrokerPlugin for https://issues.apache.org/jira/browse/AMQ-5351 --- .../camel/camelplugin/CamelRoutesBroker.java | 294 ++++++++++++++++++ .../camelplugin/CamelRoutesBrokerPlugin.java | 67 ++++ .../camelplugin/CamelPluginConfigTest.java | 127 ++++++++ .../camelplugin/camel-routes-activemq.xml | 29 ++ .../activemq/camel/camelplugin/routes.xml | 22 ++ activemq-spring/pom.xml | 1 + 6 files changed, 540 insertions(+) create mode 100644 activemq-camel/src/main/java/org/apache/activemq/camel/camelplugin/CamelRoutesBroker.java create mode 100644 activemq-camel/src/main/java/org/apache/activemq/camel/camelplugin/CamelRoutesBrokerPlugin.java create mode 100644 activemq-camel/src/test/java/org/apache/activemq/camel/camelplugin/CamelPluginConfigTest.java create mode 100644 activemq-camel/src/test/resources/org/apache/activemq/camel/camelplugin/camel-routes-activemq.xml create mode 100644 activemq-camel/src/test/resources/org/apache/activemq/camel/camelplugin/routes.xml diff --git a/activemq-camel/src/main/java/org/apache/activemq/camel/camelplugin/CamelRoutesBroker.java b/activemq-camel/src/main/java/org/apache/activemq/camel/camelplugin/CamelRoutesBroker.java new file mode 100644 index 0000000000..4d5bbb2d41 --- /dev/null +++ b/activemq-camel/src/main/java/org/apache/activemq/camel/camelplugin/CamelRoutesBroker.java @@ -0,0 +1,294 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.camel.camelplugin; + +import org.apache.activemq.broker.Broker; +import org.apache.activemq.broker.BrokerContext; +import org.apache.activemq.broker.BrokerFilter; +import org.apache.activemq.broker.ConnectionContext; +import org.apache.activemq.broker.ConsumerBrokerExchange; +import org.apache.activemq.broker.ProducerBrokerExchange; +import org.apache.activemq.broker.region.Destination; +import org.apache.activemq.broker.region.MessageReference; +import org.apache.activemq.broker.region.Subscription; +import org.apache.activemq.command.ConsumerControl; +import org.apache.activemq.command.Message; +import org.apache.activemq.command.MessageAck; +import org.apache.activemq.command.MessageDispatch; +import org.apache.activemq.command.MessagePull; +import org.apache.activemq.command.Response; +import org.apache.activemq.command.TransactionId; +import org.apache.activemq.spring.Utils; +import org.apache.activemq.usage.Usage; +import org.apache.camel.impl.DefaultCamelContext; +import org.apache.camel.model.RouteDefinition; +import org.apache.camel.model.RoutesDefinition; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.core.io.Resource; + +import java.io.File; +import java.io.InputStream; +import java.util.List; +import java.util.concurrent.CountDownLatch; + +/** + * A StatisticsBroker You can retrieve a Map Message for a Destination - or + * Broker containing statistics as key-value pairs The message must contain a + * replyTo Destination - else its ignored + * + */ +public class CamelRoutesBroker extends BrokerFilter { + private static Logger LOG = LoggerFactory.getLogger(CamelRoutesBroker.class); + private String routesFile = ""; + private int checkPeriod = 1000; + private Resource theRoutes; + private DefaultCamelContext camelContext; + private long lastRoutesModified = -1; + private CountDownLatch countDownLatch; + + /** + * Overide methods to pause the broker whilst camel routes are loaded + */ + @Override + public void send(ProducerBrokerExchange producerExchange, Message message) throws Exception { + blockWhileLoadingCamelRoutes(); + super.send(producerExchange, message); + } + + @Override + public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception { + blockWhileLoadingCamelRoutes(); + super.acknowledge(consumerExchange, ack); + } + + @Override + public Response messagePull(ConnectionContext context, MessagePull pull) throws Exception { + blockWhileLoadingCamelRoutes(); + return super.messagePull(context, pull); + } + + @Override + public void processConsumerControl(ConsumerBrokerExchange consumerExchange, ConsumerControl control) { + blockWhileLoadingCamelRoutes(); + super.processConsumerControl(consumerExchange, control); + } + + @Override + public void reapplyInterceptor() { + blockWhileLoadingCamelRoutes(); + super.reapplyInterceptor(); + } + + @Override + public void beginTransaction(ConnectionContext context, TransactionId xid) throws Exception { + blockWhileLoadingCamelRoutes(); + super.beginTransaction(context, xid); + } + + @Override + public int prepareTransaction(ConnectionContext context, TransactionId xid) throws Exception { + blockWhileLoadingCamelRoutes(); + return super.prepareTransaction(context, xid); + } + + @Override + public void rollbackTransaction(ConnectionContext context, TransactionId xid) throws Exception { + blockWhileLoadingCamelRoutes(); + super.rollbackTransaction(context, xid); + } + + @Override + public void commitTransaction(ConnectionContext context, TransactionId xid, boolean onePhase) throws Exception { + blockWhileLoadingCamelRoutes(); + super.commitTransaction(context, xid, onePhase); + } + + @Override + public void forgetTransaction(ConnectionContext context, TransactionId transactionId) throws Exception { + blockWhileLoadingCamelRoutes(); + super.forgetTransaction(context, transactionId); + } + + @Override + public void preProcessDispatch(MessageDispatch messageDispatch) { + blockWhileLoadingCamelRoutes(); + super.preProcessDispatch(messageDispatch); + } + + @Override + public void postProcessDispatch(MessageDispatch messageDispatch) { + blockWhileLoadingCamelRoutes(); + super.postProcessDispatch(messageDispatch); + } + + @Override + public boolean sendToDeadLetterQueue(ConnectionContext context, MessageReference messageReference, Subscription subscription, Throwable poisonCause) { + blockWhileLoadingCamelRoutes(); + return super.sendToDeadLetterQueue(context, messageReference, subscription, poisonCause); + } + + @Override + public void messageConsumed(ConnectionContext context, MessageReference messageReference) { + blockWhileLoadingCamelRoutes(); + super.messageConsumed(context, messageReference); + } + + @Override + public void messageDelivered(ConnectionContext context, MessageReference messageReference) { + blockWhileLoadingCamelRoutes(); + super.messageDelivered(context, messageReference); + } + + @Override + public void messageDiscarded(ConnectionContext context, Subscription sub, MessageReference messageReference) { + blockWhileLoadingCamelRoutes(); + super.messageDiscarded(context, sub, messageReference); + } + + @Override + public void isFull(ConnectionContext context, Destination destination, Usage usage) { + blockWhileLoadingCamelRoutes(); + super.isFull(context, destination, usage); + } + + @Override + public void nowMasterBroker() { + blockWhileLoadingCamelRoutes(); + super.nowMasterBroker(); + } + + /* + * Properties + */ + + public String getRoutesFile() { + return routesFile; + } + + public void setRoutesFile(String routesFile) { + this.routesFile = routesFile; + } + + public int getCheckPeriod() { + return checkPeriod; + } + + public void setCheckPeriod(int checkPeriod) { + this.checkPeriod = checkPeriod; + } + + public CamelRoutesBroker(Broker next) { + super(next); + } + + @Override + public void start() throws Exception { + super.start(); + LOG.info("Starting CamelRoutesBroker"); + + camelContext = new DefaultCamelContext(); + camelContext.setName("EmbeddedCamel-" + getBrokerName()); + camelContext.start(); + + getBrokerService().getScheduler().executePeriodically(new Runnable() { + @Override + public void run() { + try { + loadCamelRoutes(); + } catch (Throwable e) { + LOG.error("Failed to load Camel Routes", e); + } + + } + }, getCheckPeriod()); + } + + + + @Override + public void stop() throws Exception { + CountDownLatch latch = this.countDownLatch; + if (latch != null){ + latch.countDown(); + } + if (camelContext != null){ + camelContext.stop(); + } + super.stop(); + } + + private void loadCamelRoutes() throws Exception{ + if (theRoutes == null) { + String fileToUse = getRoutesFile(); + if (fileToUse == null || fileToUse.trim().isEmpty()) { + BrokerContext brokerContext = getBrokerService().getBrokerContext(); + if (brokerContext != null) { + String uri = brokerContext.getConfigurationUrl(); + Resource resource = Utils.resourceFromString(uri); + if (resource.exists()) { + fileToUse = resource.getFile().getParent(); + fileToUse += File.separator; + fileToUse += "routes.xml"; + } + } + } + if (fileToUse != null && !fileToUse.isEmpty()){ + theRoutes = Utils.resourceFromString(fileToUse); + setRoutesFile(theRoutes.getFile().getAbsolutePath()); + } + } + if (!isStopped() && camelContext != null && theRoutes != null && theRoutes.exists()){ + long lastModified = theRoutes.lastModified(); + if (lastModified != lastRoutesModified){ + CountDownLatch latch = new CountDownLatch(1); + this.countDownLatch = latch; + lastRoutesModified = lastModified; + + List currentRoutes = camelContext.getRouteDefinitions(); + for (RouteDefinition rd:currentRoutes){ + camelContext.stopRoute(rd); + camelContext.removeRouteDefinition(rd); + } + InputStream is = theRoutes.getInputStream(); + RoutesDefinition routesDefinition = camelContext.loadRoutesDefinition(is); + + for (RouteDefinition rd: routesDefinition.getRoutes()){ + camelContext.startRoute(rd); + } + is.close(); + latch.countDown(); + this.countDownLatch=null; + } + + + } + } + + private void blockWhileLoadingCamelRoutes(){ + CountDownLatch latch = this.countDownLatch; + if (latch != null){ + try { + latch.await(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + } + +} diff --git a/activemq-camel/src/main/java/org/apache/activemq/camel/camelplugin/CamelRoutesBrokerPlugin.java b/activemq-camel/src/main/java/org/apache/activemq/camel/camelplugin/CamelRoutesBrokerPlugin.java new file mode 100644 index 0000000000..2e9833fdc7 --- /dev/null +++ b/activemq-camel/src/main/java/org/apache/activemq/camel/camelplugin/CamelRoutesBrokerPlugin.java @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.camel.camelplugin; + +import org.apache.activemq.broker.Broker; +import org.apache.activemq.broker.BrokerPlugin; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A CamelRoutesBrokerPlugin + * + * load camel routes dynamically from a routes.xml file located in same directory as ActiveMQ.xml + * + * @org.apache.xbean.XBean element="camelRoutesBrokerPlugin" + * + */ +public class CamelRoutesBrokerPlugin implements BrokerPlugin { + private static Logger LOG = LoggerFactory.getLogger(CamelRoutesBrokerPlugin.class); + private String routesFile = ""; + private int checkPeriod =1000; + + public String getRoutesFile() { + return routesFile; + } + + public void setRoutesFile(String routesFile) { + this.routesFile = routesFile; + } + + public int getCheckPeriod() { + return checkPeriod; + } + + public void setCheckPeriod(int checkPeriod) { + this.checkPeriod = checkPeriod; + } + + /** + * @param broker + * @return the plug-in + * @throws Exception + * @see org.apache.activemq.broker.BrokerPlugin#installPlugin(org.apache.activemq.broker.Broker) + */ + public Broker installPlugin(Broker broker) throws Exception { + CamelRoutesBroker answer = new CamelRoutesBroker(broker); + answer.setCheckPeriod(getCheckPeriod()); + answer.setRoutesFile(getRoutesFile()); + LOG.info("Installing CamelRoutesBroker"); + return answer; + } +} diff --git a/activemq-camel/src/test/java/org/apache/activemq/camel/camelplugin/CamelPluginConfigTest.java b/activemq-camel/src/test/java/org/apache/activemq/camel/camelplugin/CamelPluginConfigTest.java new file mode 100644 index 0000000000..5db3cc23c5 --- /dev/null +++ b/activemq-camel/src/test/java/org/apache/activemq/camel/camelplugin/CamelPluginConfigTest.java @@ -0,0 +1,127 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.camel.camelplugin; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerRegistry; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.xbean.XBeanBrokerFactory; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.springframework.core.io.ClassPathResource; +import org.springframework.core.io.FileSystemResource; +import org.springframework.core.io.Resource; + +import javax.jms.Connection; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.Topic; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.assertEquals; + +public class CamelPluginConfigTest { + + protected static final String CONF_ROOT = "src/test/resources/org/apache/activemq/camel/camelplugin/"; + protected static final String TOPIC_NAME = "test.topic"; + protected static final String QUEUE_NAME = "test.queue"; + + protected BrokerService brokerService; + protected ActiveMQConnectionFactory factory; + protected Connection producerConnection; + protected Connection consumerConnection; + protected Session consumerSession; + protected Session producerSession; + + protected int messageCount = 1000; + protected int timeOutInSeconds = 10; + + @Before + public void setUp() throws Exception { + brokerService = createBroker(new FileSystemResource(CONF_ROOT + "camel-routes-activemq.xml")); + + factory = new ActiveMQConnectionFactory(BrokerRegistry.getInstance().findFirst().getVmConnectorURI()); + consumerConnection = factory.createConnection(); + consumerConnection.start(); + producerConnection = factory.createConnection(); + producerConnection.start(); + consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + } + + protected BrokerService createBroker(String resource) throws Exception { + return createBroker(new ClassPathResource(resource)); + } + + protected BrokerService createBroker(Resource resource) throws Exception { + + XBeanBrokerFactory factory = new XBeanBrokerFactory(); + BrokerService broker = factory.createBroker(resource.getURI()); + return broker; + } + + @After + public void tearDown() throws Exception { + if (producerConnection != null) { + producerConnection.close(); + } + if (consumerConnection != null) { + consumerConnection.close(); + } + if (brokerService != null) { + brokerService.stop(); + } + } + + @Test + public void testReRouteAll() throws Exception { + Thread.sleep(2000); + final ActiveMQQueue queue = new ActiveMQQueue(QUEUE_NAME); + + Topic topic = consumerSession.createTopic(TOPIC_NAME); + + final CountDownLatch latch = new CountDownLatch(messageCount); + MessageConsumer consumer = consumerSession.createConsumer(queue); + consumer.setMessageListener(new MessageListener() { + @Override + public void onMessage(javax.jms.Message message) { + try { + latch.countDown(); + } catch (Throwable e) { + e.printStackTrace(); + } + } + }); + MessageProducer producer = producerSession.createProducer(topic); + + for (int i = 0; i < messageCount; i++) { + javax.jms.Message message = producerSession.createTextMessage("test: " + i); + producer.send(message); + } + + latch.await(timeOutInSeconds, TimeUnit.SECONDS); + assertEquals(0, latch.getCount()); + + } +} diff --git a/activemq-camel/src/test/resources/org/apache/activemq/camel/camelplugin/camel-routes-activemq.xml b/activemq-camel/src/test/resources/org/apache/activemq/camel/camelplugin/camel-routes-activemq.xml new file mode 100644 index 0000000000..a6be02b0c8 --- /dev/null +++ b/activemq-camel/src/test/resources/org/apache/activemq/camel/camelplugin/camel-routes-activemq.xml @@ -0,0 +1,29 @@ + + + + + + + + + + diff --git a/activemq-camel/src/test/resources/org/apache/activemq/camel/camelplugin/routes.xml b/activemq-camel/src/test/resources/org/apache/activemq/camel/camelplugin/routes.xml new file mode 100644 index 0000000000..d44ae06711 --- /dev/null +++ b/activemq-camel/src/test/resources/org/apache/activemq/camel/camelplugin/routes.xml @@ -0,0 +1,22 @@ + + + + + + + diff --git a/activemq-spring/pom.xml b/activemq-spring/pom.xml index afe1a69aac..6efa019086 100755 --- a/activemq-spring/pom.xml +++ b/activemq-spring/pom.xml @@ -238,6 +238,7 @@ ${basedir}/../activemq-client/src/main/java ${basedir}/../activemq-broker/src/main/java + ${basedir}/../activemq-camel/src/main/java ${basedir}/../activemq-leveldb-store/src/main/java ${basedir}/../activemq-jdbc-store/src/main/java ${basedir}/../activemq-kahadb-store/src/main/java