This commit is contained in:
Rob Davies 2014-09-09 16:52:21 +01:00
parent b2e6a41661
commit 7ca25965db
6 changed files with 540 additions and 0 deletions

View File

@ -0,0 +1,294 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.camel.camelplugin;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.BrokerContext;
import org.apache.activemq.broker.BrokerFilter;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.ConsumerBrokerExchange;
import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.command.ConsumerControl;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.MessagePull;
import org.apache.activemq.command.Response;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.spring.Utils;
import org.apache.activemq.usage.Usage;
import org.apache.camel.impl.DefaultCamelContext;
import org.apache.camel.model.RouteDefinition;
import org.apache.camel.model.RoutesDefinition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.io.Resource;
import java.io.File;
import java.io.InputStream;
import java.util.List;
import java.util.concurrent.CountDownLatch;
/**
* A StatisticsBroker You can retrieve a Map Message for a Destination - or
* Broker containing statistics as key-value pairs The message must contain a
* replyTo Destination - else its ignored
*
*/
public class CamelRoutesBroker extends BrokerFilter {
private static Logger LOG = LoggerFactory.getLogger(CamelRoutesBroker.class);
private String routesFile = "";
private int checkPeriod = 1000;
private Resource theRoutes;
private DefaultCamelContext camelContext;
private long lastRoutesModified = -1;
private CountDownLatch countDownLatch;
/**
* Overide methods to pause the broker whilst camel routes are loaded
*/
@Override
public void send(ProducerBrokerExchange producerExchange, Message message) throws Exception {
blockWhileLoadingCamelRoutes();
super.send(producerExchange, message);
}
@Override
public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception {
blockWhileLoadingCamelRoutes();
super.acknowledge(consumerExchange, ack);
}
@Override
public Response messagePull(ConnectionContext context, MessagePull pull) throws Exception {
blockWhileLoadingCamelRoutes();
return super.messagePull(context, pull);
}
@Override
public void processConsumerControl(ConsumerBrokerExchange consumerExchange, ConsumerControl control) {
blockWhileLoadingCamelRoutes();
super.processConsumerControl(consumerExchange, control);
}
@Override
public void reapplyInterceptor() {
blockWhileLoadingCamelRoutes();
super.reapplyInterceptor();
}
@Override
public void beginTransaction(ConnectionContext context, TransactionId xid) throws Exception {
blockWhileLoadingCamelRoutes();
super.beginTransaction(context, xid);
}
@Override
public int prepareTransaction(ConnectionContext context, TransactionId xid) throws Exception {
blockWhileLoadingCamelRoutes();
return super.prepareTransaction(context, xid);
}
@Override
public void rollbackTransaction(ConnectionContext context, TransactionId xid) throws Exception {
blockWhileLoadingCamelRoutes();
super.rollbackTransaction(context, xid);
}
@Override
public void commitTransaction(ConnectionContext context, TransactionId xid, boolean onePhase) throws Exception {
blockWhileLoadingCamelRoutes();
super.commitTransaction(context, xid, onePhase);
}
@Override
public void forgetTransaction(ConnectionContext context, TransactionId transactionId) throws Exception {
blockWhileLoadingCamelRoutes();
super.forgetTransaction(context, transactionId);
}
@Override
public void preProcessDispatch(MessageDispatch messageDispatch) {
blockWhileLoadingCamelRoutes();
super.preProcessDispatch(messageDispatch);
}
@Override
public void postProcessDispatch(MessageDispatch messageDispatch) {
blockWhileLoadingCamelRoutes();
super.postProcessDispatch(messageDispatch);
}
@Override
public boolean sendToDeadLetterQueue(ConnectionContext context, MessageReference messageReference, Subscription subscription, Throwable poisonCause) {
blockWhileLoadingCamelRoutes();
return super.sendToDeadLetterQueue(context, messageReference, subscription, poisonCause);
}
@Override
public void messageConsumed(ConnectionContext context, MessageReference messageReference) {
blockWhileLoadingCamelRoutes();
super.messageConsumed(context, messageReference);
}
@Override
public void messageDelivered(ConnectionContext context, MessageReference messageReference) {
blockWhileLoadingCamelRoutes();
super.messageDelivered(context, messageReference);
}
@Override
public void messageDiscarded(ConnectionContext context, Subscription sub, MessageReference messageReference) {
blockWhileLoadingCamelRoutes();
super.messageDiscarded(context, sub, messageReference);
}
@Override
public void isFull(ConnectionContext context, Destination destination, Usage usage) {
blockWhileLoadingCamelRoutes();
super.isFull(context, destination, usage);
}
@Override
public void nowMasterBroker() {
blockWhileLoadingCamelRoutes();
super.nowMasterBroker();
}
/*
* Properties
*/
public String getRoutesFile() {
return routesFile;
}
public void setRoutesFile(String routesFile) {
this.routesFile = routesFile;
}
public int getCheckPeriod() {
return checkPeriod;
}
public void setCheckPeriod(int checkPeriod) {
this.checkPeriod = checkPeriod;
}
public CamelRoutesBroker(Broker next) {
super(next);
}
@Override
public void start() throws Exception {
super.start();
LOG.info("Starting CamelRoutesBroker");
camelContext = new DefaultCamelContext();
camelContext.setName("EmbeddedCamel-" + getBrokerName());
camelContext.start();
getBrokerService().getScheduler().executePeriodically(new Runnable() {
@Override
public void run() {
try {
loadCamelRoutes();
} catch (Throwable e) {
LOG.error("Failed to load Camel Routes", e);
}
}
}, getCheckPeriod());
}
@Override
public void stop() throws Exception {
CountDownLatch latch = this.countDownLatch;
if (latch != null){
latch.countDown();
}
if (camelContext != null){
camelContext.stop();
}
super.stop();
}
private void loadCamelRoutes() throws Exception{
if (theRoutes == null) {
String fileToUse = getRoutesFile();
if (fileToUse == null || fileToUse.trim().isEmpty()) {
BrokerContext brokerContext = getBrokerService().getBrokerContext();
if (brokerContext != null) {
String uri = brokerContext.getConfigurationUrl();
Resource resource = Utils.resourceFromString(uri);
if (resource.exists()) {
fileToUse = resource.getFile().getParent();
fileToUse += File.separator;
fileToUse += "routes.xml";
}
}
}
if (fileToUse != null && !fileToUse.isEmpty()){
theRoutes = Utils.resourceFromString(fileToUse);
setRoutesFile(theRoutes.getFile().getAbsolutePath());
}
}
if (!isStopped() && camelContext != null && theRoutes != null && theRoutes.exists()){
long lastModified = theRoutes.lastModified();
if (lastModified != lastRoutesModified){
CountDownLatch latch = new CountDownLatch(1);
this.countDownLatch = latch;
lastRoutesModified = lastModified;
List<RouteDefinition> currentRoutes = camelContext.getRouteDefinitions();
for (RouteDefinition rd:currentRoutes){
camelContext.stopRoute(rd);
camelContext.removeRouteDefinition(rd);
}
InputStream is = theRoutes.getInputStream();
RoutesDefinition routesDefinition = camelContext.loadRoutesDefinition(is);
for (RouteDefinition rd: routesDefinition.getRoutes()){
camelContext.startRoute(rd);
}
is.close();
latch.countDown();
this.countDownLatch=null;
}
}
}
private void blockWhileLoadingCamelRoutes(){
CountDownLatch latch = this.countDownLatch;
if (latch != null){
try {
latch.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
}

View File

@ -0,0 +1,67 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.camel.camelplugin;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.BrokerPlugin;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* A CamelRoutesBrokerPlugin
*
* load camel routes dynamically from a routes.xml file located in same directory as ActiveMQ.xml
*
* @org.apache.xbean.XBean element="camelRoutesBrokerPlugin"
*
*/
public class CamelRoutesBrokerPlugin implements BrokerPlugin {
private static Logger LOG = LoggerFactory.getLogger(CamelRoutesBrokerPlugin.class);
private String routesFile = "";
private int checkPeriod =1000;
public String getRoutesFile() {
return routesFile;
}
public void setRoutesFile(String routesFile) {
this.routesFile = routesFile;
}
public int getCheckPeriod() {
return checkPeriod;
}
public void setCheckPeriod(int checkPeriod) {
this.checkPeriod = checkPeriod;
}
/**
* @param broker
* @return the plug-in
* @throws Exception
* @see org.apache.activemq.broker.BrokerPlugin#installPlugin(org.apache.activemq.broker.Broker)
*/
public Broker installPlugin(Broker broker) throws Exception {
CamelRoutesBroker answer = new CamelRoutesBroker(broker);
answer.setCheckPeriod(getCheckPeriod());
answer.setRoutesFile(getRoutesFile());
LOG.info("Installing CamelRoutesBroker");
return answer;
}
}

View File

@ -0,0 +1,127 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.camel.camelplugin;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerRegistry;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.xbean.XBeanBrokerFactory;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.springframework.core.io.ClassPathResource;
import org.springframework.core.io.FileSystemResource;
import org.springframework.core.io.Resource;
import javax.jms.Connection;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.Topic;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertEquals;
public class CamelPluginConfigTest {
protected static final String CONF_ROOT = "src/test/resources/org/apache/activemq/camel/camelplugin/";
protected static final String TOPIC_NAME = "test.topic";
protected static final String QUEUE_NAME = "test.queue";
protected BrokerService brokerService;
protected ActiveMQConnectionFactory factory;
protected Connection producerConnection;
protected Connection consumerConnection;
protected Session consumerSession;
protected Session producerSession;
protected int messageCount = 1000;
protected int timeOutInSeconds = 10;
@Before
public void setUp() throws Exception {
brokerService = createBroker(new FileSystemResource(CONF_ROOT + "camel-routes-activemq.xml"));
factory = new ActiveMQConnectionFactory(BrokerRegistry.getInstance().findFirst().getVmConnectorURI());
consumerConnection = factory.createConnection();
consumerConnection.start();
producerConnection = factory.createConnection();
producerConnection.start();
consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
}
protected BrokerService createBroker(String resource) throws Exception {
return createBroker(new ClassPathResource(resource));
}
protected BrokerService createBroker(Resource resource) throws Exception {
XBeanBrokerFactory factory = new XBeanBrokerFactory();
BrokerService broker = factory.createBroker(resource.getURI());
return broker;
}
@After
public void tearDown() throws Exception {
if (producerConnection != null) {
producerConnection.close();
}
if (consumerConnection != null) {
consumerConnection.close();
}
if (brokerService != null) {
brokerService.stop();
}
}
@Test
public void testReRouteAll() throws Exception {
Thread.sleep(2000);
final ActiveMQQueue queue = new ActiveMQQueue(QUEUE_NAME);
Topic topic = consumerSession.createTopic(TOPIC_NAME);
final CountDownLatch latch = new CountDownLatch(messageCount);
MessageConsumer consumer = consumerSession.createConsumer(queue);
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(javax.jms.Message message) {
try {
latch.countDown();
} catch (Throwable e) {
e.printStackTrace();
}
}
});
MessageProducer producer = producerSession.createProducer(topic);
for (int i = 0; i < messageCount; i++) {
javax.jms.Message message = producerSession.createTextMessage("test: " + i);
producer.send(message);
}
latch.await(timeOutInSeconds, TimeUnit.SECONDS);
assertEquals(0, latch.getCount());
}
}

View File

@ -0,0 +1,29 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<beans
xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
<broker xmlns="http://activemq.apache.org/schema/core" persistent="false">
<plugins>
<camelRoutesBrokerPlugin checkPeriod="100" />
</plugins>
</broker>
</beans>

View File

@ -0,0 +1,22 @@
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<routes xmlns="http://camel.apache.org/schema/spring">
<route id="test">
<from uri="broker:topic:test.topic"/>
<to uri="broker:queue:test.queue"/>
</route>
</routes>

View File

@ -238,6 +238,7 @@
<includes> <includes>
<include>${basedir}/../activemq-client/src/main/java</include> <include>${basedir}/../activemq-client/src/main/java</include>
<include>${basedir}/../activemq-broker/src/main/java</include> <include>${basedir}/../activemq-broker/src/main/java</include>
<include>${basedir}/../activemq-camel/src/main/java</include>
<include>${basedir}/../activemq-leveldb-store/src/main/java</include> <include>${basedir}/../activemq-leveldb-store/src/main/java</include>
<include>${basedir}/../activemq-jdbc-store/src/main/java</include> <include>${basedir}/../activemq-jdbc-store/src/main/java</include>
<include>${basedir}/../activemq-kahadb-store/src/main/java</include> <include>${basedir}/../activemq-kahadb-store/src/main/java</include>