git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@910984 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Bosanac Dejan 2010-02-17 14:13:12 +00:00
parent 0567645ec6
commit 75a32e4ec7
4 changed files with 190 additions and 4 deletions

View File

@ -16,9 +16,13 @@
*/
package org.apache.activemq.camel.component;
import java.util.concurrent.CopyOnWriteArrayList;
import org.apache.activemq.Service;
import org.apache.camel.CamelContext;
import org.apache.camel.component.jms.JmsComponent;
import org.apache.camel.component.jms.JmsConfiguration;
import org.springframework.jms.connection.SingleConnectionFactory;
/**
* The <a href="http://activemq.apache.org/camel/activemq.html">ActiveMQ Component</a>
@ -26,6 +30,10 @@ import org.apache.camel.component.jms.JmsConfiguration;
* @version $Revision$
*/
public class ActiveMQComponent extends JmsComponent {
private final CopyOnWriteArrayList<SingleConnectionFactory> singleConnectionFactoryList =
new CopyOnWriteArrayList<SingleConnectionFactory>();
private final CopyOnWriteArrayList<Service> pooledConnectionFactoryServiceList =
new CopyOnWriteArrayList<Service>();
private boolean exposeAllQueues;
private CamelEndpointLoader endpointLoader;
@ -110,6 +118,14 @@ public class ActiveMQComponent extends JmsComponent {
}
}
protected void addPooledConnectionFactoryService(Service pooledConnectionFactoryService) {
pooledConnectionFactoryServiceList.add(pooledConnectionFactoryService);
}
protected void addSingleConnectionFactory(SingleConnectionFactory singleConnectionFactory) {
singleConnectionFactoryList.add(singleConnectionFactory);
}
@Override
protected void doStart() throws Exception {
super.doStart();
@ -119,18 +135,35 @@ public class ActiveMQComponent extends JmsComponent {
}
}
@Override
protected void doStop() throws Exception {
if (endpointLoader != null) {
endpointLoader.destroy();
endpointLoader = null;
}
for (Service s : pooledConnectionFactoryServiceList) {
s.stop();
}
pooledConnectionFactoryServiceList.clear();
for (SingleConnectionFactory s : singleConnectionFactoryList) {
s.destroy();
}
singleConnectionFactoryList.clear();
super.doStop();
}
@Override
public void setConfiguration(JmsConfiguration configuration) {
if (configuration instanceof ActiveMQConfiguration) {
((ActiveMQConfiguration) configuration).setActiveMQComponent(this);
}
super.setConfiguration(configuration);
}
@Override
protected JmsConfiguration createConfiguration() {
return new ActiveMQConfiguration();
ActiveMQConfiguration answer = new ActiveMQConfiguration();
answer.setActiveMQComponent(this);
return answer;
}
}

View File

@ -20,6 +20,7 @@ import java.lang.reflect.Constructor;
import javax.jms.ConnectionFactory;
import org.apache.activemq.Service;
import org.apache.activemq.spring.ActiveMQConnectionFactory;
import org.apache.camel.component.jms.JmsConfiguration;
import org.springframework.jms.connection.SingleConnectionFactory;
@ -36,6 +37,7 @@ public class ActiveMQConfiguration extends JmsConfiguration {
private boolean usePooledConnection = true;
private String userName;
private String password;
private ActiveMQComponent activeMQComponent;
public ActiveMQConfiguration() {
}
@ -134,6 +136,10 @@ public class ActiveMQConfiguration extends JmsConfiguration {
return answer;
}
protected void setActiveMQComponent(ActiveMQComponent activeMQComponent) {
this.activeMQComponent = activeMQComponent;
}
@Override
protected ConnectionFactory createConnectionFactory() {
ActiveMQConnectionFactory answer = new ActiveMQConnectionFactory();
@ -148,10 +154,18 @@ public class ActiveMQConfiguration extends JmsConfiguration {
}
answer.setBrokerURL(getBrokerURL());
if (isUseSingleConnection()) {
return new SingleConnectionFactory(answer);
SingleConnectionFactory scf = new SingleConnectionFactory(answer);
if (activeMQComponent != null) {
activeMQComponent.addSingleConnectionFactory(scf);
}
return scf;
}
else if (isUsePooledConnection()) {
return createPooledConnectionFactory(answer);
ConnectionFactory pcf = createPooledConnectionFactory(answer);
if (activeMQComponent != null) {
activeMQComponent.addPooledConnectionFactoryService((Service) pcf);
}
return pcf;
}
else {
return answer;

View File

@ -0,0 +1,104 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.camel;
import java.util.Timer;
import java.util.TimerTask;
import junit.framework.TestCase;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.camel.component.ActiveMQComponent;
import org.apache.camel.Body;
import org.apache.camel.CamelContext;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.DefaultCamelContext;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
public class AMQ2611Test extends TestCase {
private static final String BROKER_URL = "tcp://localhost:61616";
private static final String QUEUE_NAME = "test.queue";
private static final Log log = LogFactory.getLog(AMQ2611Test.class);
private BrokerService brokerService = null;
private Timer statisticsTimer = null;
private CamelContext camelContext = null;
public AMQ2611Test() {
}
private void createBroker() throws Exception {
brokerService = new BrokerService();
brokerService.addConnector(BROKER_URL);
brokerService.start();
}
public static class Consumer {
public void consume(@Body String message) {
log.info("consume message = " + message);
}
}
private void createCamelContext() throws Exception {
log.info("creating context and sending message");
camelContext = new DefaultCamelContext();
camelContext.addComponent("activemq", ActiveMQComponent
.activeMQComponent(BROKER_URL));
final String queueEndpointName = "activemq:queue" + QUEUE_NAME;
camelContext.addRoutes(new RouteBuilder() {
@Override
public void configure() throws Exception {
from(queueEndpointName).bean(Consumer.class, "consume");
}
});
camelContext.start();
final ProducerTemplate producerTemplate = camelContext
.createProducerTemplate();
producerTemplate.sendBody(queueEndpointName, "message");
}
private void destroyCamelContext() throws Exception {
log.info("destroying context");
camelContext.stop();
camelContext = null;
}
public void testConnections() {
try {
createBroker();
int i = 0;
while (i++ < 5) {
createCamelContext();
Thread.sleep(1000);
destroyCamelContext();
Thread.sleep(1000);
assertEquals(0, brokerService.getConnectorByName(BROKER_URL).getConnections().size());
}
} catch (Exception e) {
log.warn("run", e);
}
}
}

View File

@ -0,0 +1,35 @@
## ---------------------------------------------------------------------------
## Licensed to the Apache Software Foundation (ASF) under one or more
## contributor license agreements. See the NOTICE file distributed with
## this work for additional information regarding copyright ownership.
## The ASF licenses this file to You under the Apache License, Version 2.0
## (the "License"); you may not use this file except in compliance with
## the License. You may obtain a copy of the License at
##
## http://www.apache.org/licenses/LICENSE-2.0
##
## Unless required by applicable law or agreed to in writing, software
## distributed under the License is distributed on an "AS IS" BASIS,
## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
## See the License for the specific language governing permissions and
## limitations under the License.
## ---------------------------------------------------------------------------
#
# The logging properties used during tests..
#
log4j.rootLogger=INFO, out, stdout
#log4j.logger.org.apache.activemq=DEBUG
# CONSOLE appender not used by default
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d [%-15.15t] %-5p %-30.30c{1} - %m%n
# File appender
log4j.appender.out=org.apache.log4j.FileAppender
log4j.appender.out.layout=org.apache.log4j.PatternLayout
log4j.appender.out.layout.ConversionPattern=%d [%-15.15t] %-5p %-30.30c{1} - %m%n
log4j.appender.out.file=target/activemq-test.log
log4j.appender.out.append=true