mirror of https://github.com/apache/activemq.git
fixed failing camel tests; made the CamelEndpointLoader dependent on the EnhancedConnection interface so it works with a naked ActiveMQConnection or a PooledConnection. Also changed the defaults to use a PooledConnectionFactory by default when using the ActiveMQ camel component
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@643458 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
80722e4c9d
commit
53756f3c19
|
@ -98,7 +98,7 @@ import org.apache.activemq.advisory.DestinationSource;
|
|||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
||||
public class ActiveMQConnection implements Connection, TopicConnection, QueueConnection, StatsCapable, Closeable, StreamConnection, TransportListener {
|
||||
public class ActiveMQConnection implements Connection, TopicConnection, QueueConnection, StatsCapable, Closeable, StreamConnection, TransportListener, EnhancedConnection {
|
||||
|
||||
public static final String DEFAULT_USER = ActiveMQConnectionFactory.DEFAULT_USER;
|
||||
public static final String DEFAULT_PASSWORD = ActiveMQConnectionFactory.DEFAULT_PASSWORD;
|
||||
|
|
|
@ -0,0 +1,41 @@
|
|||
/**
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq;
|
||||
|
||||
import javax.jms.TopicConnection;
|
||||
import javax.jms.QueueConnection;
|
||||
import javax.jms.JMSException;
|
||||
|
||||
import org.apache.activemq.advisory.DestinationSource;
|
||||
|
||||
/**
|
||||
* A set of enhanced APIs for a JMS provider
|
||||
*
|
||||
* @version $Revision: 1.1 $
|
||||
*/
|
||||
public interface EnhancedConnection extends TopicConnection, QueueConnection, Closeable {
|
||||
|
||||
/**
|
||||
* Returns the {@link DestinationSource} object which can be used to listen to destinations
|
||||
* being created or destroyed or to enquire about the current destinations available on the broker
|
||||
*
|
||||
* @return a lazily created destination source
|
||||
* @throws JMSException
|
||||
*/
|
||||
DestinationSource getDestinationSource() throws JMSException;
|
||||
}
|
|
@ -17,7 +17,6 @@
|
|||
package org.apache.activemq.camel.component;
|
||||
|
||||
import java.lang.reflect.Constructor;
|
||||
import java.lang.reflect.InvocationTargetException;
|
||||
|
||||
import javax.jms.ConnectionFactory;
|
||||
|
||||
|
@ -31,8 +30,8 @@ import org.springframework.jms.core.JmsTemplate;
|
|||
*/
|
||||
public class ActiveMQConfiguration extends JmsConfiguration {
|
||||
private String brokerURL = ActiveMQConnectionFactory.DEFAULT_BROKER_URL;
|
||||
private boolean useSingleConnection = true;
|
||||
private boolean usePooledConnection = false;
|
||||
private boolean useSingleConnection = false;
|
||||
private boolean usePooledConnection = true;
|
||||
|
||||
public ActiveMQConfiguration() {
|
||||
}
|
||||
|
@ -92,12 +91,12 @@ public class ActiveMQConfiguration extends JmsConfiguration {
|
|||
answer.setBeanName("Camel");
|
||||
}
|
||||
answer.setBrokerURL(getBrokerURL());
|
||||
if (isUsePooledConnection()) {
|
||||
return createPooledConnectionFactory(answer);
|
||||
}
|
||||
else if (isUseSingleConnection()) {
|
||||
if (isUseSingleConnection()) {
|
||||
return new SingleConnectionFactory(answer);
|
||||
}
|
||||
else if (isUsePooledConnection()) {
|
||||
return createPooledConnectionFactory(answer);
|
||||
}
|
||||
else {
|
||||
return answer;
|
||||
}
|
||||
|
|
|
@ -23,6 +23,7 @@ import javax.jms.Connection;
|
|||
import javax.jms.ConnectionFactory;
|
||||
|
||||
import org.apache.activemq.ActiveMQConnection;
|
||||
import org.apache.activemq.EnhancedConnection;
|
||||
import org.apache.activemq.advisory.DestinationEvent;
|
||||
import org.apache.activemq.advisory.DestinationListener;
|
||||
import org.apache.activemq.advisory.DestinationSource;
|
||||
|
@ -46,7 +47,7 @@ import org.springframework.beans.factory.InitializingBean;
|
|||
public class CamelEndpointLoader implements InitializingBean, DisposableBean, CamelContextAware {
|
||||
private static final transient Log LOG = LogFactory.getLog(CamelEndpointLoader.class);
|
||||
private CamelContext camelContext;
|
||||
private ActiveMQConnection connection;
|
||||
private EnhancedConnection connection;
|
||||
private ConnectionFactory connectionFactory;
|
||||
private ActiveMQComponent component;
|
||||
|
||||
|
@ -61,11 +62,11 @@ public class CamelEndpointLoader implements InitializingBean, DisposableBean, Ca
|
|||
ObjectHelper.notNull(camelContext, "camelContext");
|
||||
if (connection == null) {
|
||||
Connection value = getConnectionFactory().createConnection();
|
||||
if (value instanceof ActiveMQConnection) {
|
||||
connection = (ActiveMQConnection) value;
|
||||
if (value instanceof EnhancedConnection) {
|
||||
connection = (EnhancedConnection) value;
|
||||
}
|
||||
else {
|
||||
throw new IllegalArgumentException("Created JMS Connection is not an ActiveMQConnection: " + value);
|
||||
throw new IllegalArgumentException("Created JMS Connection is not an EnhancedConnection: " + value);
|
||||
}
|
||||
}
|
||||
connection.start();
|
||||
|
@ -113,7 +114,7 @@ public class CamelEndpointLoader implements InitializingBean, DisposableBean, Ca
|
|||
this.camelContext = camelContext;
|
||||
}
|
||||
|
||||
public ActiveMQConnection getConnection() {
|
||||
public EnhancedConnection getConnection() {
|
||||
return connection;
|
||||
}
|
||||
|
||||
|
|
|
@ -34,6 +34,8 @@ import javax.jms.TopicSession;
|
|||
import org.apache.activemq.ActiveMQConnection;
|
||||
import org.apache.activemq.ActiveMQSession;
|
||||
import org.apache.activemq.AlreadyClosedException;
|
||||
import org.apache.activemq.EnhancedConnection;
|
||||
import org.apache.activemq.advisory.DestinationSource;
|
||||
|
||||
/**
|
||||
* Represents a proxy {@link Connection} which is-a {@link TopicConnection} and
|
||||
|
@ -47,7 +49,7 @@ import org.apache.activemq.AlreadyClosedException;
|
|||
*
|
||||
* @version $Revision: 1.1.1.1 $
|
||||
*/
|
||||
public class PooledConnection implements TopicConnection, QueueConnection {
|
||||
public class PooledConnection implements TopicConnection, QueueConnection, EnhancedConnection {
|
||||
|
||||
private ConnectionPool pool;
|
||||
private boolean stopped;
|
||||
|
@ -139,6 +141,13 @@ public class PooledConnection implements TopicConnection, QueueConnection {
|
|||
return pool.createSession(transacted, ackMode);
|
||||
}
|
||||
|
||||
// EnhancedCollection API
|
||||
// -------------------------------------------------------------------------
|
||||
|
||||
public DestinationSource getDestinationSource() throws JMSException {
|
||||
return getConnection().getDestinationSource();
|
||||
}
|
||||
|
||||
// Implementation methods
|
||||
// -------------------------------------------------------------------------
|
||||
|
||||
|
|
|
@ -34,7 +34,7 @@ import org.springframework.jms.connection.SingleConnectionFactory;
|
|||
public class ActiveMQConfigureTest extends ContextTestSupport {
|
||||
|
||||
public void testJmsTemplateUsesPoolingConnectionFactory() throws Exception {
|
||||
JmsEndpoint endpoint = resolveMandatoryEndpoint("activemq:test.foo?usePooledConnection=true");
|
||||
JmsEndpoint endpoint = resolveMandatoryEndpoint("activemq:test.foo");
|
||||
JmsProducer producer = endpoint.createProducer();
|
||||
|
||||
JmsTemplate template = assertIsInstanceOf(JmsTemplate.class, producer.getTemplate());
|
||||
|
@ -43,7 +43,7 @@ public class ActiveMQConfigureTest extends ContextTestSupport {
|
|||
}
|
||||
|
||||
public void testJmsTemplateUsesSingleConnectionFactory() throws Exception {
|
||||
JmsEndpoint endpoint = resolveMandatoryEndpoint("activemq:test.foo");
|
||||
JmsEndpoint endpoint = resolveMandatoryEndpoint("activemq:test.foo?useSingleConnection=true");
|
||||
JmsProducer producer = endpoint.createProducer();
|
||||
|
||||
JmsTemplate template = assertIsInstanceOf(JmsTemplate.class, producer.getTemplate());
|
||||
|
@ -53,7 +53,7 @@ public class ActiveMQConfigureTest extends ContextTestSupport {
|
|||
}
|
||||
|
||||
public void testJmsTemplateDoesNotUsePoolingConnectionFactory() throws Exception {
|
||||
JmsEndpoint endpoint = resolveMandatoryEndpoint("activemq:test.foo?useSingleConnection=false");
|
||||
JmsEndpoint endpoint = resolveMandatoryEndpoint("activemq:test.foo?usePooledConnection=false");
|
||||
JmsProducer producer = endpoint.createProducer();
|
||||
|
||||
JmsTemplate template = assertIsInstanceOf(JmsTemplate.class, producer.getTemplate());
|
||||
|
@ -67,8 +67,7 @@ public class ActiveMQConfigureTest extends ContextTestSupport {
|
|||
|
||||
AbstractMessageListenerContainer listenerContainer = consumer.getListenerContainer();
|
||||
assertEquals("pubSubDomain", true, listenerContainer.isPubSubDomain());
|
||||
SingleConnectionFactory connectionFactory = assertIsInstanceOf(SingleConnectionFactory.class, listenerContainer.getConnectionFactory());
|
||||
assertIsInstanceOf(ActiveMQConnectionFactory.class, connectionFactory.getTargetConnectionFactory());
|
||||
assertIsInstanceOf(PooledConnectionFactory.class, listenerContainer.getConnectionFactory());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
Loading…
Reference in New Issue