https://issues.apache.org/jira/browse/AMQ-4757 activemq-jms-pool a generic jms xa pool derived from activemq-pool which activemq-pool now extends with amq specifics

This commit is contained in:
gtully 2013-09-30 23:08:40 +01:00
parent 6b9662440e
commit b66559ee07
55 changed files with 2233 additions and 1335 deletions

View File

@ -45,7 +45,7 @@
<overwrite>true</overwrite>
<resources>
<resource>
<directory>${jboss.home}</directory>
<directory>${ee.install.home}</directory>
<excludes>
<exclude>standalone/data</exclude>
<exclude>standalone/log</exclude>
@ -74,4 +74,4 @@
</plugin>
</plugins>
</build>
</project>
</project>

View File

@ -48,6 +48,10 @@
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>org.jboss.arquillian.junit</groupId>
<artifactId>arquillian-junit-container</artifactId>
</dependency>
<dependency>
<groupId>org.jboss.spec</groupId>
<artifactId>jboss-javaee-6.0</artifactId>

View File

@ -22,7 +22,6 @@ import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.pool.PooledConnectionFactory;
import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangePattern;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;

View File

@ -61,7 +61,7 @@
</bean>
<!-- only for jta - not jms tm
bean id="resourceManager" class="org.apache.activemq.pool.ActiveMQResourceManager" init-method="recoverResource">
bean id="resourceManager" class="org.apache.activemq.jms.pool.GenericResourceManager" init-method="recoverResource">
<property name="transactionManager" ref="transactionManager" />
<property name="connectionFactory" ref="activemqConnectionFactory" />
<property name="resourceName" value="activemq.default" />

View File

@ -122,6 +122,7 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements Conne
private int maxThreadPoolSize = ActiveMQConnection.DEFAULT_THREAD_POOL_SIZE;
private TaskRunnerFactory sessionTaskRunner;
private RejectedExecutionHandler rejectedTaskHandler = null;
protected int xaAckMode = -1; // ensure default init before setting via brokerUrl introspection in sub class
// /////////////////////////////////////////////
//

View File

@ -50,6 +50,8 @@ import org.apache.activemq.util.IdGenerator;
*/
public class ActiveMQXAConnection extends ActiveMQConnection implements XATopicConnection, XAQueueConnection, XAConnection {
private int xaAckMode;
protected ActiveMQXAConnection(Transport transport, IdGenerator clientIdGenerator,
IdGenerator connectionIdGenerator, JMSStatsImpl factoryStats) throws Exception {
super(transport, clientIdGenerator, connectionIdGenerator, factoryStats);
@ -70,6 +72,18 @@ public class ActiveMQXAConnection extends ActiveMQConnection implements XATopicC
public Session createSession(boolean transacted, int acknowledgeMode) throws JMSException {
checkClosedOrFailed();
ensureConnectionInfoSent();
return new ActiveMQXASession(this, getNextSessionId(), Session.SESSION_TRANSACTED, isDispatchAsync());
return new ActiveMQXASession(this, getNextSessionId(), getAckMode(), isDispatchAsync());
}
private int getAckMode() {
return xaAckMode > 0 ? xaAckMode : Session.SESSION_TRANSACTED;
}
public void setXaAckMode(int xaAckMode) {
this.xaAckMode = xaAckMode;
}
public int getXaAckMode() {
return xaAckMode;
}
}

View File

@ -17,6 +17,7 @@
package org.apache.activemq;
import java.net.URI;
import java.util.Properties;
import javax.jms.JMSException;
import javax.jms.XAConnection;
@ -81,6 +82,25 @@ public class ActiveMQXAConnectionFactory extends ActiveMQConnectionFactory imple
protected ActiveMQConnection createActiveMQConnection(Transport transport, JMSStatsImpl stats) throws Exception {
ActiveMQXAConnection connection = new ActiveMQXAConnection(transport, getClientIdGenerator(), getConnectionIdGenerator(), stats);
configureXAConnection(connection);
return connection;
}
private void configureXAConnection(ActiveMQXAConnection connection) {
connection.setXaAckMode(xaAckMode);
}
public int getXaAckMode() {
return xaAckMode;
}
public void setXaAckMode(int xaAckMode) {
this.xaAckMode = xaAckMode;
}
@Override
public void populateProperties(Properties props) {
super.populateProperties(props);
props.put("xaAckMode", Integer.toString(xaAckMode));
}
}

108
activemq-jms-pool/pom.xml Executable file
View File

@ -0,0 +1,108 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-parent</artifactId>
<version>5.9-SNAPSHOT</version>
</parent>
<artifactId>activemq-jms-pool</artifactId>
<name>ActiveMQ :: Generic JMS Pool</name>
<description>Generic JMS Pooled ConnectionFactory</description>
<properties>
<activemq.osgi.import.pkg>
javax.transaction*;resolution:=optional,
org.apache.geronimo.transaction.manager*;resolution:=optional,
*
</activemq.osgi.import.pkg>
<activemq.osgi.export>
org.apache.activemq.jms.pool*;version=${project.version};-noimport:=true,
</activemq.osgi.export>
</properties>
<dependencies>
<!-- =============================== -->
<!-- Required Dependencies -->
<!-- =============================== -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.geronimo.specs</groupId>
<artifactId>geronimo-jms_1.1_spec</artifactId>
</dependency>
<dependency>
<groupId>org.apache.geronimo.components</groupId>
<artifactId>geronimo-transaction</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.apache.geronimo.specs</groupId>
<artifactId>geronimo-jta_1.0.1B_spec</artifactId>
</dependency>
<dependency>
<groupId>org.apache.geronimo.specs</groupId>
<artifactId>geronimo-annotation_1.0_spec</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>commons-pool</groupId>
<artifactId>commons-pool</artifactId>
</dependency>
<!-- for testing use amq -->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-broker</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-kahadb-store</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>activemq-broker</artifactId>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>

View File

@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.pool;
package org.apache.activemq.jms.pool;
/**
* A cache key for the connection details

View File

@ -15,20 +15,17 @@
* limitations under the License.
*/
package org.apache.activemq.pool;
package org.apache.activemq.jms.pool;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Session;
import javax.jms.IllegalStateException;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.transport.TransportListener;
import org.apache.activemq.util.JMSExceptionSupport;
import org.apache.commons.pool.KeyedPoolableObjectFactory;
import org.apache.commons.pool.impl.GenericKeyedObjectPool;
import org.apache.commons.pool.impl.GenericObjectPool;
@ -47,11 +44,10 @@ public class ConnectionPool {
private static final transient Logger LOG = LoggerFactory.getLogger(ConnectionPool.class);
private ActiveMQConnection connection;
protected Connection connection;
private int referenceCount;
private long lastUsed = System.currentTimeMillis();
private final long firstUsed = lastUsed;
private boolean hasFailed;
private boolean hasExpired;
private int idleTimeout = 30 * 1000;
private long expiryTimeout = 0l;
@ -60,38 +56,9 @@ public class ConnectionPool {
private final GenericKeyedObjectPool<SessionKey, PooledSession> sessionPool;
private final List<PooledSession> loanedSessions = new CopyOnWriteArrayList<PooledSession>();
public ConnectionPool(ActiveMQConnection connection) {
public ConnectionPool(Connection connection) {
this.connection = connection;
// Add a transport Listener so that we can notice if this connection
// should be expired due to a connection failure.
connection.addTransportListener(new TransportListener() {
@Override
public void onCommand(Object command) {
}
@Override
public void onException(IOException error) {
synchronized (ConnectionPool.this) {
hasFailed = true;
}
}
@Override
public void transportInterupted() {
}
@Override
public void transportResumed() {
}
});
// make sure that we set the hasFailed flag, in case the transport already failed
// prior to the addition of our new TransportListener
if(connection.isTransportFailed()) {
hasFailed = true;
}
this.connection = wrap(connection);
// Create our internal Pool of session instances.
this.sessionPool = new GenericKeyedObjectPool<SessionKey, PooledSession>(
@ -110,9 +77,8 @@ public class ConnectionPool {
@Override
public PooledSession makeObject(SessionKey key) throws Exception {
ActiveMQSession session = (ActiveMQSession)
ConnectionPool.this.connection.createSession(key.isTransacted(), key.getAckMode());
return new PooledSession(key, session, sessionPool);
Session session = makeSession(key);
return new PooledSession(key, session, sessionPool, key.isTransacted());
}
@Override
@ -128,6 +94,22 @@ public class ConnectionPool {
);
}
// useful when external failure needs to force expiry
public void setHasExpired(boolean val) {
hasExpired = val;
}
protected Session makeSession(SessionKey key) throws JMSException {
return connection.createSession(key.isTransacted(), key.getAckMode());
}
protected Connection wrap(Connection connection) {
return connection;
}
protected void unWrap(Connection connection) {
}
public void start() throws JMSException {
if (started.compareAndSet(false, true)) {
try {
@ -139,7 +121,7 @@ public class ConnectionPool {
}
}
public synchronized ActiveMQConnection getConnection() {
public synchronized Connection getConnection() {
return connection;
}
@ -149,7 +131,9 @@ public class ConnectionPool {
try {
session = sessionPool.borrowObject(key);
} catch (Exception e) {
throw JMSExceptionSupport.create(e);
IllegalStateException illegalStateException = new IllegalStateException(e.toString());
illegalStateException.initCause(e);
throw illegalStateException;
}
return session;
}
@ -191,11 +175,7 @@ public class ConnectionPool {
}
this.loanedSessions.clear();
// We only clean up temporary destinations when all users of this
// connection have called close.
if (getConnection() != null) {
getConnection().cleanUpTempDestinations();
}
unWrap(getConnection());
expiredCheck();
}
@ -218,7 +198,7 @@ public class ConnectionPool {
return true;
}
if (hasExpired || hasFailed) {
if (hasExpired) {
if (referenceCount == 0) {
close();
expired = true;

View File

@ -0,0 +1,194 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.jms.pool;
import java.io.IOException;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.XAConnection;
import javax.jms.XAConnectionFactory;
import javax.jms.XASession;
import javax.transaction.SystemException;
import javax.transaction.TransactionManager;
import javax.transaction.xa.XAResource;
import org.apache.geronimo.transaction.manager.NamedXAResourceFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.geronimo.transaction.manager.RecoverableTransactionManager;
import org.apache.geronimo.transaction.manager.NamedXAResource;
import org.apache.geronimo.transaction.manager.WrapperNamedXAResource;
/**
* This class allows wiring the ActiveMQ broker and the Geronimo transaction manager
* in a way that will allow the transaction manager to correctly recover XA transactions.
*
* For example, it can be used the following way:
* <pre>
* <bean id="activemqConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
* <property name="brokerURL" value="tcp://localhost:61616" />
* </bean>
*
* <bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactoryFactoryBean">
* <property name="maxConnections" value="8" />
* <property name="transactionManager" ref="transactionManager" />
* <property name="connectionFactory" ref="activemqConnectionFactory" />
* <property name="resourceName" value="activemq.broker" />
* </bean>
*
* <bean id="resourceManager" class="org.apache.activemq.jms.pool.GenericResourceManager" init-method="recoverResource">
* <property name="transactionManager" ref="transactionManager" />
* <property name="connectionFactory" ref="activemqConnectionFactory" />
* <property name="resourceName" value="activemq.broker" />
* </bean>
* </pre>
*/
public class GenericResourceManager {
private static final Logger LOGGER = LoggerFactory.getLogger(GenericResourceManager.class);
private String resourceName;
private String userName;
private String password;
private TransactionManager transactionManager;
private ConnectionFactory connectionFactory;
public void recoverResource() {
try {
if (!Recovery.recover(this)) {
LOGGER.info("Resource manager is unrecoverable");
}
} catch (NoClassDefFoundError e) {
LOGGER.info("Resource manager is unrecoverable due to missing classes: " + e);
} catch (Throwable e) {
LOGGER.warn("Error while recovering resource manager", e);
}
}
public String getPassword() {
return password;
}
public void setPassword(String password) {
this.password = password;
}
public String getUserName() {
return userName;
}
public void setUserName(String userName) {
this.userName = userName;
}
public String getResourceName() {
return resourceName;
}
public void setResourceName(String resourceName) {
this.resourceName = resourceName;
}
public TransactionManager getTransactionManager() {
return transactionManager;
}
public void setTransactionManager(TransactionManager transactionManager) {
this.transactionManager = transactionManager;
}
public ConnectionFactory getConnectionFactory() {
return connectionFactory;
}
public void setConnectionFactory(ConnectionFactory connectionFactory) {
this.connectionFactory = connectionFactory;
}
/**
* This class will ensure the broker is properly recovered when wired with
* the Geronimo transaction manager.
*/
public static class Recovery {
public static boolean isRecoverable(GenericResourceManager rm) {
return rm.getConnectionFactory() instanceof XAConnectionFactory &&
rm.getTransactionManager() instanceof RecoverableTransactionManager &&
rm.getResourceName() != null && !"".equals(rm.getResourceName());
}
public static boolean recover(final GenericResourceManager rm) throws IOException {
if (isRecoverable(rm)) {
final XAConnectionFactory connFactory = (XAConnectionFactory) rm.getConnectionFactory();
RecoverableTransactionManager rtxManager = (RecoverableTransactionManager) rm.getTransactionManager();
rtxManager.registerNamedXAResourceFactory(new NamedXAResourceFactory() {
@Override
public String getName() {
return rm.getResourceName();
}
@Override
public NamedXAResource getNamedXAResource() throws SystemException {
try {
final XAConnection xaConnection = connFactory.createXAConnection(rm.getUserName(), rm.getPassword());
final XASession session = xaConnection.createXASession();
xaConnection.start();
LOGGER.debug("new namedXAResource's connection: " + xaConnection);
return new ConnectionAndWrapperNamedXAResource(session.getXAResource(), getName(), xaConnection);
} catch (Exception e) {
SystemException se = new SystemException("Failed to create ConnectionAndWrapperNamedXAResource, " + e.getLocalizedMessage());
se.initCause(e);
LOGGER.error(se.getLocalizedMessage(), se);
throw se;
}
}
@Override
public void returnNamedXAResource(NamedXAResource namedXaResource) {
if (namedXaResource instanceof ConnectionAndWrapperNamedXAResource) {
try {
LOGGER.debug("closing returned namedXAResource's connection: " + ((ConnectionAndWrapperNamedXAResource)namedXaResource).connection);
((ConnectionAndWrapperNamedXAResource)namedXaResource).connection.close();
} catch (Exception ignored) {
LOGGER.debug("failed to close returned namedXAResource: " + namedXaResource, ignored);
}
}
}
});
return true;
} else {
return false;
}
}
}
public static class ConnectionAndWrapperNamedXAResource extends WrapperNamedXAResource {
final Connection connection;
public ConnectionAndWrapperNamedXAResource(XAResource xaResource, String name, Connection connection) {
super(xaResource, name);
this.connection = connection;
}
}
}

View File

@ -0,0 +1,118 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.jms.pool;
import java.lang.reflect.Method;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import javax.net.ssl.SSLServerSocket;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public final class IntrospectionSupport {
private static final Logger LOG = LoggerFactory.getLogger(IntrospectionSupport.class);
private IntrospectionSupport() {
}
public static boolean setProperties(Object target, Map props) {
boolean rc = false;
if (target == null) {
throw new IllegalArgumentException("target was null.");
}
if (props == null) {
throw new IllegalArgumentException("props was null.");
}
for (Iterator<?> iter = props.entrySet().iterator(); iter.hasNext();) {
Entry<?,?> entry = (Entry<?,?>)iter.next();
if (setProperty(target, (String)entry.getKey(), entry.getValue())) {
iter.remove();
rc = true;
}
}
return rc;
}
public static boolean setProperty(Object target, String name, Object value) {
try {
Class<?> clazz = target.getClass();
if (target instanceof SSLServerSocket) {
// overcome illegal access issues with internal implementation class
clazz = SSLServerSocket.class;
}
Method setter = findSetterMethod(clazz, name);
if (setter == null) {
return false;
}
// If the type is null or it matches the needed type, just use the
// value directly
if (value == null || value.getClass() == setter.getParameterTypes()[0]) {
setter.invoke(target, value);
} else {
// We need to convert it
setter.invoke(target, convert(value, setter.getParameterTypes()[0]));
}
return true;
} catch (Exception e) {
LOG.error(String.format("Could not set property %s on %s", name, target), e);
return false;
}
}
private static Object convert(Object value, Class to) {
if (value == null) {
// lets avoid NullPointerException when converting to boolean for null values
if (boolean.class.isAssignableFrom(to)) {
return Boolean.FALSE;
}
return null;
}
// eager same instance type test to avoid the overhead of invoking the type converter
// if already same type
if (to.isAssignableFrom(value.getClass())) {
return to.cast(value);
}
if (boolean.class.isAssignableFrom(to) && value instanceof String) {
return Boolean.valueOf((String)value);
}
throw new IllegalArgumentException("Cannot convert from " + value.getClass()
+ " to " + to + " with value " + value);
}
private static Method findSetterMethod(Class clazz, String name) {
// Build the method name.
name = "set" + Character.toUpperCase(name.charAt(0)) + name.substring(1);
Method[] methods = clazz.getMethods();
for (Method method : methods) {
Class<?> params[] = method.getParameterTypes();
if (method.getName().equals(name) && params.length == 1 ) {
return method;
}
}
return null;
}
}

View File

@ -13,28 +13,28 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.pool;
package org.apache.activemq.jms.pool;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.XASession;
import javax.transaction.TransactionManager;
import javax.transaction.xa.XAResource;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ra.LocalAndXATransaction;
import org.apache.geronimo.transaction.manager.WrapperNamedXAResource;
public class JcaConnectionPool extends XaConnectionPool {
private final String name;
public JcaConnectionPool(ActiveMQConnection connection, TransactionManager transactionManager, String name) {
public JcaConnectionPool(Connection connection, TransactionManager transactionManager, String name) {
super(connection, transactionManager);
this.name = name;
}
@Override
protected XAResource createXaResource(PooledSession session) throws JMSException {
XAResource xares = new LocalAndXATransaction(session.getInternalSession().getTransactionContext());
XAResource xares = ((XASession)session.getInternalSession()).getXAResource();
if (name != null) {
xares = new WrapperNamedXAResource(xares, name);
}

View File

@ -0,0 +1,35 @@
/*
* Copyright 2006 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.jms.pool;
import javax.jms.Connection;
public class JcaPooledConnectionFactory extends XaPooledConnectionFactory {
private String name;
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
protected ConnectionPool createConnectionPool(Connection connection) {
return new JcaConnectionPool(connection, getTransactionManager(), getName());
}
}

View File

@ -0,0 +1,286 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.jms.pool;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import javax.jms.Connection;
import javax.jms.ConnectionConsumer;
import javax.jms.ConnectionMetaData;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueSession;
import javax.jms.ServerSessionPool;
import javax.jms.Session;
import javax.jms.TemporaryQueue;
import javax.jms.TemporaryTopic;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicSession;
import javax.jms.IllegalStateException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Represents a proxy {@link Connection} which is-a {@link TopicConnection} and
* {@link QueueConnection} which is pooled and on {@link #close()} will return
* its reference to the ConnectionPool backing it.
*
* <b>NOTE</b> this implementation is only intended for use when sending
* messages. It does not deal with pooling of consumers; for that look at a
* library like <a href="http://jencks.org/">Jencks</a> such as in <a
* href="http://jencks.org/Message+Driven+POJOs">this example</a>
*
*/
public class PooledConnection implements TopicConnection, QueueConnection, PooledSessionEventListener {
private static final transient Logger LOG = LoggerFactory.getLogger(PooledConnection.class);
protected ConnectionPool pool;
private volatile boolean stopped;
private final List<TemporaryQueue> connTempQueues = new CopyOnWriteArrayList<TemporaryQueue>();
private final List<TemporaryTopic> connTempTopics = new CopyOnWriteArrayList<TemporaryTopic>();
private final List<PooledSession> loanedSessions = new CopyOnWriteArrayList<PooledSession>();
/**
* Creates a new PooledConnection instance that uses the given ConnectionPool to create
* and manage its resources. The ConnectionPool instance can be shared amongst many
* PooledConnection instances.
*
* @param pool
* The connection and pool manager backing this proxy connection object.
*/
public PooledConnection(ConnectionPool pool) {
this.pool = pool;
}
/**
* Factory method to create a new instance.
*/
public PooledConnection newInstance() {
return new PooledConnection(pool);
}
@Override
public void close() throws JMSException {
this.cleanupConnectionTemporaryDestinations();
this.cleanupAllLoanedSessions();
if (this.pool != null) {
this.pool.decrementReferenceCount();
this.pool = null;
}
}
@Override
public void start() throws JMSException {
assertNotClosed();
pool.start();
}
@Override
public void stop() throws JMSException {
stopped = true;
}
@Override
public ConnectionConsumer createConnectionConsumer(Destination destination, String selector, ServerSessionPool serverSessionPool, int maxMessages) throws JMSException {
return getConnection().createConnectionConsumer(destination, selector, serverSessionPool, maxMessages);
}
@Override
public ConnectionConsumer createConnectionConsumer(Topic topic, String s, ServerSessionPool serverSessionPool, int maxMessages) throws JMSException {
return getConnection().createConnectionConsumer(topic, s, serverSessionPool, maxMessages);
}
@Override
public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String selector, String s1, ServerSessionPool serverSessionPool, int i) throws JMSException {
return getConnection().createDurableConnectionConsumer(topic, selector, s1, serverSessionPool, i);
}
@Override
public String getClientID() throws JMSException {
return getConnection().getClientID();
}
@Override
public ExceptionListener getExceptionListener() throws JMSException {
return getConnection().getExceptionListener();
}
@Override
public ConnectionMetaData getMetaData() throws JMSException {
return getConnection().getMetaData();
}
@Override
public void setExceptionListener(ExceptionListener exceptionListener) throws JMSException {
getConnection().setExceptionListener(exceptionListener);
}
@Override
public void setClientID(String clientID) throws JMSException {
// ignore repeated calls to setClientID() with the same client id
// this could happen when a JMS component such as Spring that uses a
// PooledConnectionFactory shuts down and reinitializes.
if (this.getConnection().getClientID() == null || !this.getClientID().equals(clientID)) {
getConnection().setClientID(clientID);
}
}
@Override
public ConnectionConsumer createConnectionConsumer(Queue queue, String selector, ServerSessionPool serverSessionPool, int maxMessages) throws JMSException {
return getConnection().createConnectionConsumer(queue, selector, serverSessionPool, maxMessages);
}
// Session factory methods
// -------------------------------------------------------------------------
@Override
public QueueSession createQueueSession(boolean transacted, int ackMode) throws JMSException {
return (QueueSession) createSession(transacted, ackMode);
}
@Override
public TopicSession createTopicSession(boolean transacted, int ackMode) throws JMSException {
return (TopicSession) createSession(transacted, ackMode);
}
@Override
public Session createSession(boolean transacted, int ackMode) throws JMSException {
PooledSession result;
result = (PooledSession) pool.createSession(transacted, ackMode);
// Store the session so we can close the sessions that this PooledConnection
// created in order to ensure that consumers etc are closed per the JMS contract.
loanedSessions.add(result);
// Add a event listener to the session that notifies us when the session
// creates / destroys temporary destinations and closes etc.
result.addSessionEventListener(this);
return result;
}
// Implementation methods
// -------------------------------------------------------------------------
@Override
public void onTemporaryQueueCreate(TemporaryQueue tempQueue) {
connTempQueues.add(tempQueue);
}
@Override
public void onTemporaryTopicCreate(TemporaryTopic tempTopic) {
connTempTopics.add(tempTopic);
}
@Override
public void onSessionClosed(PooledSession session) {
if (session != null) {
this.loanedSessions.remove(session);
}
}
public Connection getConnection() throws JMSException {
assertNotClosed();
return pool.getConnection();
}
protected void assertNotClosed() throws javax.jms.IllegalStateException {
if (stopped || pool == null) {
throw new IllegalStateException("Connection closed");
}
}
protected Session createSession(SessionKey key) throws JMSException {
return getConnection().createSession(key.isTransacted(), key.getAckMode());
}
@Override
public String toString() {
return "PooledConnection { " + pool + " }";
}
/**
* Remove all of the temporary destinations created for this connection.
* This is important since the underlying connection may be reused over a
* long period of time, accumulating all of the temporary destinations from
* each use. However, from the perspective of the lifecycle from the
* client's view, close() closes the connection and, therefore, deletes all
* of the temporary destinations created.
*/
protected void cleanupConnectionTemporaryDestinations() {
for (TemporaryQueue tempQueue : connTempQueues) {
try {
tempQueue.delete();
} catch (JMSException ex) {
LOG.info("failed to delete Temporary Queue \"" + tempQueue.toString() + "\" on closing pooled connection: " + ex.getMessage());
}
}
connTempQueues.clear();
for (TemporaryTopic tempTopic : connTempTopics) {
try {
tempTopic.delete();
} catch (JMSException ex) {
LOG.info("failed to delete Temporary Topic \"" + tempTopic.toString() + "\" on closing pooled connection: " + ex.getMessage());
}
}
connTempTopics.clear();
}
/**
* The PooledSession tracks all Sessions that it created and now we close them. Closing the
* PooledSession will return the internal Session to the Pool of Session after cleaning up
* all the resources that the Session had allocated for this PooledConnection.
*/
protected void cleanupAllLoanedSessions() {
for (PooledSession session : loanedSessions) {
try {
session.close();
} catch (JMSException ex) {
LOG.info("failed to close laoned Session \"" + session + "\" on closing pooled connection: " + ex.getMessage());
}
}
loanedSessions.clear();
}
/**
* @return the total number of Pooled session including idle sessions that are not
* currently loaned out to any client.
*/
public int getNumSessions() {
return this.pool.getNumSessions();
}
/**
* @return the number of Sessions that are currently checked out of this Connection's session pool.
*/
public int getNumActiveSessions() {
return this.pool.getNumActiveSessions();
}
/**
* @return the number of Sessions that are idle in this Connection's sessions pool.
*/
public int getNumtIdleSessions() {
return this.pool.getNumIdleSessions();
}
}

View File

@ -0,0 +1,479 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.jms.pool;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.XAConnectionFactory;
import org.apache.commons.pool.KeyedPoolableObjectFactory;
import org.apache.commons.pool.impl.GenericKeyedObjectPool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* A JMS provider which pools Connection, Session and MessageProducer instances
* so it can be used with tools like <a href="http://camel.apache.org/activemq.html">Camel</a> and Spring's
* <a href="http://activemq.apache.org/spring-support.html">JmsTemplate and MessagListenerContainer</a>.
* Connections, sessions and producers are returned to a pool after use so that they can be reused later
* without having to undergo the cost of creating them again.
*
* b>NOTE:</b> while this implementation does allow the creation of a collection of active consumers,
* it does not 'pool' consumers. Pooling makes sense for connections, sessions and producers, which
* are expensive to create and can remain idle a minimal cost. Consumers, on the other hand, are usually
* just created at startup and left active, handling incoming messages as they come. When a consumer is
* complete, it is best to close it rather than return it to a pool for later reuse: this is because,
* even if a consumer is idle, ActiveMQ will keep delivering messages to the consumer's prefetch buffer,
* where they'll get held until the consumer is active again.
*
* If you are creating a collection of consumers (for example, for multi-threaded message consumption), you
* might want to consider using a lower prefetch value for each consumer (e.g. 10 or 20), to ensure that
* all messages don't end up going to just one of the consumers. See this FAQ entry for more detail:
* http://activemq.apache.org/i-do-not-receive-messages-in-my-second-consumer.html
*
* Optionally, one may configure the pool to examine and possibly evict objects as they sit idle in the
* pool. This is performed by an "idle object eviction" thread, which runs asynchronously. Caution should
* be used when configuring this optional feature. Eviction runs contend with client threads for access
* to objects in the pool, so if they run too frequently performance issues may result. The idle object
* eviction thread may be configured using the {@link org.apache.activemq.jms.pool.PooledConnectionFactory#setTimeBetweenExpirationCheckMillis} method. By
* default the value is -1 which means no eviction thread will be run. Set to a non-negative value to
* configure the idle eviction thread to run.
*
*/
public class PooledConnectionFactory implements ConnectionFactory {
private static final transient Logger LOG = LoggerFactory.getLogger(PooledConnectionFactory.class);
protected final AtomicBoolean stopped = new AtomicBoolean(false);
private GenericKeyedObjectPool<ConnectionKey, ConnectionPool> connectionsPool;
private ConnectionFactory connectionFactory;
private int maximumActiveSessionPerConnection = 500;
private int idleTimeout = 30 * 1000;
private boolean blockIfSessionPoolIsFull = true;
private long expiryTimeout = 0l;
private boolean createConnectionOnStartup = true;
public void initConnectionsPool() {
if (this.connectionsPool == null) {
this.connectionsPool = new GenericKeyedObjectPool<ConnectionKey, ConnectionPool>(
new KeyedPoolableObjectFactory<ConnectionKey, ConnectionPool>() {
@Override
public void activateObject(ConnectionKey key, ConnectionPool connection) throws Exception {
}
@Override
public void destroyObject(ConnectionKey key, ConnectionPool connection) throws Exception {
try {
if (LOG.isTraceEnabled()) {
LOG.trace("Destroying connection: {}", connection);
}
connection.close();
} catch (Exception e) {
LOG.warn("Close connection failed for connection: " + connection + ". This exception will be ignored.",e);
}
}
@Override
public ConnectionPool makeObject(ConnectionKey key) throws Exception {
Connection delegate = createConnection(key);
ConnectionPool connection = createConnectionPool(delegate);
connection.setIdleTimeout(getIdleTimeout());
connection.setExpiryTimeout(getExpiryTimeout());
connection.setMaximumActiveSessionPerConnection(getMaximumActiveSessionPerConnection());
connection.setBlockIfSessionPoolIsFull(isBlockIfSessionPoolIsFull());
if (LOG.isTraceEnabled()) {
LOG.trace("Created new connection: {}", connection);
}
return connection;
}
@Override
public void passivateObject(ConnectionKey key, ConnectionPool connection) throws Exception {
}
@Override
public boolean validateObject(ConnectionKey key, ConnectionPool connection) {
if (connection != null && connection.expiredCheck()) {
if (LOG.isTraceEnabled()) {
LOG.trace("Connection has expired: {} and will be destroyed", connection);
}
return false;
}
return true;
}
});
// Set max idle (not max active) since our connections always idle in the pool.
this.connectionsPool.setMaxIdle(1);
// We always want our validate method to control when idle objects are evicted.
this.connectionsPool.setTestOnBorrow(true);
this.connectionsPool.setTestWhileIdle(true);
}
}
/**
* @return the currently configured ConnectionFactory used to create the pooled Connections.
*/
public ConnectionFactory getConnectionFactory() {
return connectionFactory;
}
/**
* Sets the ConnectionFactory used to create new pooled Connections.
* <p/>
* Updates to this value do not affect Connections that were previously created and placed
* into the pool. In order to allocate new Connections based off this new ConnectionFactory
* it is first necessary to {@link clear} the pooled Connections.
*
* @param toUse
* The factory to use to create pooled Connections.
*/
public void setConnectionFactory(final ConnectionFactory toUse) {
if (toUse instanceof XAConnectionFactory) {
connectionFactory = new ConnectionFactory() {
public Connection createConnection() throws JMSException {
return ((XAConnectionFactory)toUse).createXAConnection();
}
public Connection createConnection(String userName, String password) throws JMSException {
return ((XAConnectionFactory)toUse).createXAConnection(userName, password);
}
};
} else {
this.connectionFactory = toUse;
}
}
@Override
public Connection createConnection() throws JMSException {
return createConnection(null, null);
}
@Override
public synchronized Connection createConnection(String userName, String password) throws JMSException {
if (stopped.get()) {
LOG.debug("PooledConnectionFactory is stopped, skip create new connection.");
return null;
}
ConnectionPool connection = null;
ConnectionKey key = new ConnectionKey(userName, password);
// This will either return an existing non-expired ConnectionPool or it
// will create a new one to meet the demand.
if (getConnectionsPool().getNumIdle(key) < getMaxConnections()) {
try {
// we want borrowObject to return the one we added.
connectionsPool.setLifo(true);
connectionsPool.addObject(key);
} catch (Exception e) {
throw createJmsException("Error while attempting to add new Connection to the pool", e);
}
} else {
// now we want the oldest one in the pool.
connectionsPool.setLifo(false);
}
try {
// We can race against other threads returning the connection when there is an
// expiration or idle timeout. We keep pulling out ConnectionPool instances until
// we win and get a non-closed instance and then increment the reference count
// under lock to prevent another thread from triggering an expiration check and
// pulling the rug out from under us.
while (connection == null) {
connection = connectionsPool.borrowObject(key);
synchronized (connection) {
if (connection.getConnection() != null) {
connection.incrementReferenceCount();
break;
}
// Return the bad one to the pool and let if get destroyed as normal.
connectionsPool.returnObject(key, connection);
connection = null;
}
}
} catch (Exception e) {
throw createJmsException("Error while attempting to retrieve a connection from the pool", e);
}
try {
connectionsPool.returnObject(key, connection);
} catch (Exception e) {
throw createJmsException("Error when returning connection to the pool", e);
}
return newPooledConnection(connection);
}
protected Connection newPooledConnection(ConnectionPool connection) {
return new PooledConnection(connection);
}
private JMSException createJmsException(String msg, Exception cause) {
JMSException exception = new JMSException(msg);
exception.setLinkedException(cause);
exception.initCause(cause);
return exception;
}
protected Connection createConnection(ConnectionKey key) throws JMSException {
if (key.getUserName() == null && key.getPassword() == null) {
return connectionFactory.createConnection();
} else {
return connectionFactory.createConnection(key.getUserName(), key.getPassword());
}
}
public void start() {
LOG.debug("Staring the PooledConnectionFactory: create on start = {}", isCreateConnectionOnStartup());
stopped.set(false);
if (isCreateConnectionOnStartup()) {
try {
// warm the pool by creating a connection during startup
createConnection();
} catch (JMSException e) {
LOG.warn("Create pooled connection during start failed. This exception will be ignored.", e);
}
}
}
public void stop() {
if (stopped.compareAndSet(false, true)) {
LOG.debug("Stopping the PooledConnectionFactory, number of connections in cache: {}",
connectionsPool != null ? connectionsPool.getNumActive() : 0);
try {
if (connectionsPool != null) {
connectionsPool.close();
}
} catch (Exception e) {
}
}
}
/**
* Clears all connections from the pool. Each connection that is currently in the pool is
* closed and removed from the pool. A new connection will be created on the next call to
* {@link createConnection}. Care should be taken when using this method as Connections that
* are in use be client's will be closed.
*/
public void clear() {
if (stopped.get()) {
return;
}
getConnectionsPool().clear();
}
/**
* Returns the currently configured maximum number of sessions a pooled Connection will
* create before it either blocks or throws an exception when a new session is requested,
* depending on configuration.
*
* @return the number of session instances that can be taken from a pooled connection.
*/
public int getMaximumActiveSessionPerConnection() {
return maximumActiveSessionPerConnection;
}
/**
* Sets the maximum number of active sessions per connection
*
* @param maximumActiveSessionPerConnection
* The maximum number of active session per connection in the pool.
*/
public void setMaximumActiveSessionPerConnection(int maximumActiveSessionPerConnection) {
this.maximumActiveSessionPerConnection = maximumActiveSessionPerConnection;
}
/**
* Controls the behavior of the internal session pool. By default the call to
* Connection.getSession() will block if the session pool is full. If the
* argument false is given, it will change the default behavior and instead the
* call to getSession() will throw a JMSException.
*
* The size of the session pool is controlled by the @see #maximumActive
* property.
*
* @param block - if true, the call to getSession() blocks if the pool is full
* until a session object is available. defaults to true.
*/
public void setBlockIfSessionPoolIsFull(boolean block) {
this.blockIfSessionPoolIsFull = block;
}
/**
* Returns whether a pooled Connection will enter a blocked state or will throw an Exception
* once the maximum number of sessions has been borrowed from the the Session Pool.
*
* @return true if the pooled Connection createSession method will block when the limit is hit.
* @see setBlockIfSessionPoolIsFull
*/
public boolean isBlockIfSessionPoolIsFull() {
return this.blockIfSessionPoolIsFull;
}
/**
* Returns the maximum number to pooled Connections that this factory will allow before it
* begins to return connections from the pool on calls to ({@link createConnection}.
*
* @return the maxConnections that will be created for this pool.
*/
public int getMaxConnections() {
return getConnectionsPool().getMaxIdle();
}
/**
* Sets the maximum number of pooled Connections (defaults to one). Each call to
* {@link createConnection} will result in a new Connection being create up to the max
* connections value.
*
* @param maxConnections the maxConnections to set
*/
public void setMaxConnections(int maxConnections) {
getConnectionsPool().setMaxIdle(maxConnections);
}
/**
* Gets the Idle timeout value applied to new Connection's that are created by this pool.
* <p/>
* The idle timeout is used determine if a Connection instance has sat to long in the pool unused
* and if so is closed and removed from the pool. The default value is 30 seconds.
*
* @return idle timeout value (milliseconds)
*/
public int getIdleTimeout() {
return idleTimeout;
}
/**
* Sets the idle timeout value for Connection's that are created by this pool in Milliseconds,
* defaults to 30 seconds.
* <p/>
* For a Connection that is in the pool but has no current users the idle timeout determines how
* long the Connection can live before it is eligible for removal from the pool. Normally the
* connections are tested when an attempt to check one out occurs so a Connection instance can sit
* in the pool much longer than its idle timeout if connections are used infrequently.
*
* @param idleTimeout
* The maximum time a pooled Connection can sit unused before it is eligible for removal.
*/
public void setIdleTimeout(int idleTimeout) {
this.idleTimeout = idleTimeout;
}
/**
* allow connections to expire, irrespective of load or idle time. This is useful with failover
* to force a reconnect from the pool, to reestablish load balancing or use of the master post recovery
*
* @param expiryTimeout non zero in milliseconds
*/
public void setExpiryTimeout(long expiryTimeout) {
this.expiryTimeout = expiryTimeout;
}
/**
* @return the configured expiration timeout for connections in the pool.
*/
public long getExpiryTimeout() {
return expiryTimeout;
}
/**
* @return true if a Connection is created immediately on a call to {@link start}.
*/
public boolean isCreateConnectionOnStartup() {
return createConnectionOnStartup;
}
/**
* Whether to create a connection on starting this {@link PooledConnectionFactory}.
* <p/>
* This can be used to warm-up the pool on startup. Notice that any kind of exception
* happens during startup is logged at WARN level and ignored.
*
* @param createConnectionOnStartup <tt>true</tt> to create a connection on startup
*/
public void setCreateConnectionOnStartup(boolean createConnectionOnStartup) {
this.createConnectionOnStartup = createConnectionOnStartup;
}
/**
* Gets the Pool of ConnectionPool instances which are keyed by different ConnectionKeys.
*
* @return this factories pool of ConnectionPool instances.
*/
protected GenericKeyedObjectPool<ConnectionKey, ConnectionPool> getConnectionsPool() {
initConnectionsPool();
return this.connectionsPool;
}
/**
* Sets the number of milliseconds to sleep between runs of the idle Connection eviction thread.
* When non-positive, no idle object eviction thread will be run, and Connections will only be
* checked on borrow to determine if they have sat idle for too long or have failed for some
* other reason.
* <p/>
* By default this value is set to -1 and no expiration thread ever runs.
*
* @param timeBetweenExpirationCheckMillis
* The time to wait between runs of the idle Connection eviction thread.
*/
public void setTimeBetweenExpirationCheckMillis(long timeBetweenExpirationCheckMillis) {
getConnectionsPool().setTimeBetweenEvictionRunsMillis(timeBetweenExpirationCheckMillis);
}
/**
* @return the number of milliseconds to sleep between runs of the idle connection eviction thread.
*/
public long getTimeBetweenExpirationCheckMillis() {
return getConnectionsPool().getTimeBetweenEvictionRunsMillis();
}
/**
* @return the number of Connections currently in the Pool
*/
public int getNumConnections() {
return getConnectionsPool().getNumIdle();
}
/**
* Delegate that creates each instance of an ConnectionPool object. Subclasses can override
* this method to customize the type of connection pool returned.
*
* @param connection
*
* @return instance of a new ConnectionPool.
*/
protected ConnectionPool createConnectionPool(Connection connection) {
return new ConnectionPool(connection);
}
}

View File

@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.pool;
package org.apache.activemq.jms.pool;
import javax.jms.JMSException;
import javax.jms.Message;

View File

@ -14,21 +14,19 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.pool;
package org.apache.activemq.jms.pool;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageProducer;
import org.apache.activemq.ActiveMQMessageProducer;
/**
* A pooled {@link MessageProducer}
*/
public class PooledProducer implements MessageProducer {
private final ActiveMQMessageProducer messageProducer;
private final MessageProducer messageProducer;
private final Destination destination;
private int deliveryMode;
@ -37,7 +35,7 @@ public class PooledProducer implements MessageProducer {
private int priority;
private long timeToLive;
public PooledProducer(ActiveMQMessageProducer messageProducer, Destination destination) throws JMSException {
public PooledProducer(MessageProducer messageProducer, Destination destination) throws JMSException {
this.messageProducer = messageProducer;
this.destination = destination;
@ -72,7 +70,7 @@ public class PooledProducer implements MessageProducer {
if (destination == null) {
destination = this.destination;
}
ActiveMQMessageProducer messageProducer = getMessageProducer();
MessageProducer messageProducer = getMessageProducer();
// just in case let only one thread send at once
synchronized (messageProducer) {
@ -137,7 +135,7 @@ public class PooledProducer implements MessageProducer {
// Implementation methods
// -------------------------------------------------------------------------
protected ActiveMQMessageProducer getMessageProducer() {
protected MessageProducer getMessageProducer() {
return messageProducer;
}

View File

@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.pool;
package org.apache.activemq.jms.pool;
import javax.jms.Destination;
import javax.jms.JMSException;
@ -22,14 +22,12 @@ import javax.jms.Message;
import javax.jms.Queue;
import javax.jms.QueueSender;
import org.apache.activemq.ActiveMQQueueSender;
/**
* {@link QueueSender} instance that is created and managed by the PooledConnection.
*/
public class PooledQueueSender extends PooledProducer implements QueueSender {
public PooledQueueSender(ActiveMQQueueSender messageProducer, Destination destination) throws JMSException {
public PooledQueueSender(QueueSender messageProducer, Destination destination) throws JMSException {
super(messageProducer, destination);
}
@ -48,7 +46,7 @@ public class PooledQueueSender extends PooledProducer implements QueueSender {
return (Queue) getDestination();
}
protected ActiveMQQueueSender getQueueSender() {
return (ActiveMQQueueSender) getMessageProducer();
protected QueueSender getQueueSender() {
return (QueueSender) getMessageProducer();
}
}

View File

@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.pool;
package org.apache.activemq.jms.pool;
import java.io.Serializable;
import java.util.Iterator;
@ -46,12 +46,6 @@ import javax.jms.TopicSubscriber;
import javax.jms.XASession;
import javax.transaction.xa.XAResource;
import org.apache.activemq.ActiveMQMessageProducer;
import org.apache.activemq.ActiveMQQueueSender;
import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.ActiveMQTopicPublisher;
import org.apache.activemq.AlreadyClosedException;
import org.apache.activemq.util.JMSExceptionSupport;
import org.apache.commons.pool.KeyedObjectPool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -66,19 +60,19 @@ public class PooledSession implements Session, TopicSession, QueueSession, XASes
private final CopyOnWriteArrayList<PooledSessionEventListener> sessionEventListeners =
new CopyOnWriteArrayList<PooledSessionEventListener>();
private ActiveMQSession session;
private ActiveMQMessageProducer messageProducer;
private ActiveMQQueueSender queueSender;
private ActiveMQTopicPublisher topicPublisher;
private Session session;
private MessageProducer messageProducer;
private QueueSender queueSender;
private TopicPublisher topicPublisher;
private boolean transactional = true;
private boolean ignoreClose;
private boolean isXa;
public PooledSession(SessionKey key, ActiveMQSession session, KeyedObjectPool<SessionKey, PooledSession> sessionPool) {
public PooledSession(SessionKey key, Session session, KeyedObjectPool<SessionKey, PooledSession> sessionPool, boolean transactional) {
this.key = key;
this.session = session;
this.sessionPool = sessionPool;
this.transactional = session.isTransacted();
this.transactional = transactional;
}
public void addSessionEventListener(PooledSessionEventListener listener) {
@ -155,7 +149,9 @@ public class PooledSession implements Session, TopicSession, QueueSession, XASes
try {
sessionPool.returnObject(key, this);
} catch (Exception e) {
throw JMSExceptionSupport.create(e);
javax.jms.IllegalStateException illegalStateException = new javax.jms.IllegalStateException(e.toString());
illegalStateException.initCause(e);
throw illegalStateException;
}
}
}
@ -271,10 +267,10 @@ public class PooledSession implements Session, TopicSession, QueueSession, XASes
@Override
public XAResource getXAResource() {
if (session == null) {
throw new IllegalStateException("Session is closed");
if (session instanceof XASession) {
return ((XASession)session).getXAResource();
}
return session.getTransactionContext();
return null;
}
@Override
@ -338,22 +334,22 @@ public class PooledSession implements Session, TopicSession, QueueSession, XASes
@Override
public TopicSubscriber createSubscriber(Topic topic) throws JMSException {
return addTopicSubscriber(getInternalSession().createSubscriber(topic));
return addTopicSubscriber(((TopicSession)getInternalSession()).createSubscriber(topic));
}
@Override
public TopicSubscriber createSubscriber(Topic topic, String selector, boolean local) throws JMSException {
return addTopicSubscriber(getInternalSession().createSubscriber(topic, selector, local));
return addTopicSubscriber(((TopicSession)getInternalSession()).createSubscriber(topic, selector, local));
}
@Override
public QueueReceiver createReceiver(Queue queue) throws JMSException {
return addQueueReceiver(getInternalSession().createReceiver(queue));
return addQueueReceiver(((QueueSession)getInternalSession()).createReceiver(queue));
}
@Override
public QueueReceiver createReceiver(Queue queue, String selector) throws JMSException {
return addQueueReceiver(getInternalSession().createReceiver(queue, selector));
return addQueueReceiver(((QueueSession)getInternalSession()).createReceiver(queue, selector));
}
// Producer related methods
@ -387,30 +383,30 @@ public class PooledSession implements Session, TopicSession, QueueSession, XASes
consumers.remove(consumer);
}
public ActiveMQSession getInternalSession() throws AlreadyClosedException {
public Session getInternalSession() throws IllegalStateException {
if (session == null) {
throw new AlreadyClosedException("The session has already been closed");
throw new IllegalStateException("The session has already been closed");
}
return session;
}
public ActiveMQMessageProducer getMessageProducer() throws JMSException {
public MessageProducer getMessageProducer() throws JMSException {
if (messageProducer == null) {
messageProducer = (ActiveMQMessageProducer) getInternalSession().createProducer(null);
messageProducer = getInternalSession().createProducer(null);
}
return messageProducer;
}
public ActiveMQQueueSender getQueueSender() throws JMSException {
public QueueSender getQueueSender() throws JMSException {
if (queueSender == null) {
queueSender = (ActiveMQQueueSender) getInternalSession().createSender(null);
queueSender = ((QueueSession)getInternalSession()).createSender(null);
}
return queueSender;
}
public ActiveMQTopicPublisher getTopicPublisher() throws JMSException {
public TopicPublisher getTopicPublisher() throws JMSException {
if (topicPublisher == null) {
topicPublisher = (ActiveMQTopicPublisher) getInternalSession().createPublisher(null);
topicPublisher = ((TopicSession)getInternalSession()).createPublisher(null);
}
return topicPublisher;
}

View File

@ -15,7 +15,7 @@
* limitations under the License.
*/
package org.apache.activemq.pool;
package org.apache.activemq.jms.pool;
import javax.jms.TemporaryQueue;
import javax.jms.TemporaryTopic;

View File

@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.pool;
package org.apache.activemq.jms.pool;
import javax.jms.Destination;
import javax.jms.JMSException;
@ -22,14 +22,12 @@ import javax.jms.Message;
import javax.jms.Topic;
import javax.jms.TopicPublisher;
import org.apache.activemq.ActiveMQTopicPublisher;
/**
* A {@link TopicPublisher} instance that is created and managed by a PooledConnection.
*/
public class PooledTopicPublisher extends PooledProducer implements TopicPublisher {
public PooledTopicPublisher(ActiveMQTopicPublisher messageProducer, Destination destination) throws JMSException {
public PooledTopicPublisher(TopicPublisher messageProducer, Destination destination) throws JMSException {
super(messageProducer, destination);
}
@ -58,7 +56,7 @@ public class PooledTopicPublisher extends PooledProducer implements TopicPublish
getTopicPublisher().publish(topic, message, i, i1, l);
}
protected ActiveMQTopicPublisher getTopicPublisher() {
return (ActiveMQTopicPublisher) getMessageProducer();
protected TopicPublisher getTopicPublisher() {
return (TopicPublisher) getMessageProducer();
}
}

View File

@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.pool;
package org.apache.activemq.jms.pool;
/**
* A cache key for the session details used to locate PooledSession intances.

View File

@ -14,18 +14,18 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.pool;
package org.apache.activemq.jms.pool;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Session;
import javax.jms.XAConnection;
import javax.transaction.RollbackException;
import javax.transaction.Status;
import javax.transaction.SystemException;
import javax.transaction.TransactionManager;
import javax.transaction.xa.XAResource;
import org.apache.activemq.ActiveMQConnection;
/**
* An XA-aware connection pool. When a session is created and an xa transaction is active,
* the session will automatically be enlisted in the current transaction.
@ -36,11 +36,16 @@ public class XaConnectionPool extends ConnectionPool {
private final TransactionManager transactionManager;
public XaConnectionPool(ActiveMQConnection connection, TransactionManager transactionManager) {
public XaConnectionPool(Connection connection, TransactionManager transactionManager) {
super(connection);
this.transactionManager = transactionManager;
}
@Override
protected Session makeSession(SessionKey key) throws JMSException {
return ((XAConnection)connection).createXASession();
}
@Override
public Session createSession(boolean transacted, int ackMode) throws JMSException {
try {

View File

@ -0,0 +1,145 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.jms.pool;
import java.io.Serializable;
import java.util.Hashtable;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.XAConnectionFactory;
import javax.naming.Binding;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.Name;
import javax.naming.NamingEnumeration;
import javax.naming.spi.ObjectFactory;
import javax.transaction.TransactionManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* A pooled connection factory that automatically enlists
* sessions in the current active XA transaction if any.
*/
public class XaPooledConnectionFactory extends PooledConnectionFactory implements ObjectFactory,
Serializable, QueueConnectionFactory, TopicConnectionFactory {
private static final transient Logger LOG = LoggerFactory.getLogger(XaPooledConnectionFactory.class);
private TransactionManager transactionManager;
private boolean tmFromJndi = false;
private String tmJndiName = "java:/TransactionManager";
public TransactionManager getTransactionManager() {
if (transactionManager == null && tmFromJndi) {
try {
transactionManager = (TransactionManager) new InitialContext().lookup(getTmJndiName());
} catch (Throwable ignored) {
if (LOG.isTraceEnabled()) {
LOG.trace("exception on tmFromJndi: " + getTmJndiName(), ignored);
}
}
}
return transactionManager;
}
public void setTransactionManager(TransactionManager transactionManager) {
this.transactionManager = transactionManager;
}
@Override
protected ConnectionPool createConnectionPool(Connection connection) {
return new XaConnectionPool(connection, getTransactionManager());
}
@Override
public Object getObjectInstance(Object obj, Name name, Context nameCtx, Hashtable<?, ?> environment) throws Exception {
setTmFromJndi(true);
configFromJndiConf(obj);
if (environment != null) {
IntrospectionSupport.setProperties(this, environment);
}
return this;
}
private void configFromJndiConf(Object rootContextName) {
if (rootContextName instanceof String) {
String name = (String) rootContextName;
name = name.substring(0, name.lastIndexOf('/')) + "/conf" + name.substring(name.lastIndexOf('/'));
try {
InitialContext ctx = new InitialContext();
NamingEnumeration bindings = ctx.listBindings(name);
while (bindings.hasMore()) {
Binding bd = (Binding)bindings.next();
IntrospectionSupport.setProperty(this, bd.getName(), bd.getObject());
}
} catch (Exception ignored) {
if (LOG.isTraceEnabled()) {
LOG.trace("exception on config from jndi: " + name, ignored);
}
}
}
}
public String getTmJndiName() {
return tmJndiName;
}
public void setTmJndiName(String tmJndiName) {
this.tmJndiName = tmJndiName;
}
public boolean isTmFromJndi() {
return tmFromJndi;
}
/**
* Allow transaction manager resolution from JNDI (ee deployment)
* @param tmFromJndi
*/
public void setTmFromJndi(boolean tmFromJndi) {
this.tmFromJndi = tmFromJndi;
}
@Override
public QueueConnection createQueueConnection() throws JMSException {
return (QueueConnection) createConnection();
}
@Override
public QueueConnection createQueueConnection(String userName, String password) throws JMSException {
return (QueueConnection) createConnection(userName, password);
}
@Override
public TopicConnection createTopicConnection() throws JMSException {
return (TopicConnection) createConnection();
}
@Override
public TopicConnection createTopicConnection(String userName, String password) throws JMSException {
return (TopicConnection) createConnection(userName, password);
}
}

View File

@ -20,9 +20,6 @@
<body>
A JMS provider which pools Connection, Session and MessageProducer instances so it can be used with tools like
Spring's <a href="http://activemq.org/Spring+Support">JmsTemplate</a>. <b>Note</b> that this package
does not deal with pooling of consumers; for that look at a library like <a href="http://jencks.org/">Jencks</a>
such as in <a href="http://jencks.org/Message+Driven+POJOs">this example</a>
Spring's <a href="http://activemq.org/Spring+Support">JmsTemplate</a>.
</body>
</html>

View File

@ -14,14 +14,13 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.pool;
package org.apache.activemq.jms.pool;
import java.util.concurrent.TimeUnit;
import javax.jms.Connection;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
@ -41,35 +40,36 @@ public class ConnectionExpiryEvictsFromPoolTest extends TestSupport {
TransportConnector connector = broker.addConnector("tcp://localhost:0");
broker.start();
factory = new ActiveMQConnectionFactory("mock:" + connector.getConnectUri());
pooledFactory = new PooledConnectionFactory(factory);
pooledFactory = new PooledConnectionFactory();
pooledFactory.setConnectionFactory(factory);
pooledFactory.setMaxConnections(1);
}
public void testEvictionOfIdle() throws Exception {
pooledFactory.setIdleTimeout(10);
PooledConnection connection = (PooledConnection) pooledFactory.createConnection();
ActiveMQConnection amq1 = connection.getConnection();
Connection amq1 = connection.getConnection();
connection.close();
// let it idle timeout
TimeUnit.SECONDS.sleep(1);
PooledConnection connection2 = (PooledConnection) pooledFactory.createConnection();
ActiveMQConnection amq2 = connection2.getConnection();
Connection amq2 = connection2.getConnection();
assertTrue("not equal", !amq1.equals(amq2));
}
public void testEvictionOfExpired() throws Exception {
pooledFactory.setExpiryTimeout(10);
Connection connection = pooledFactory.createConnection();
ActiveMQConnection amq1 = ((PooledConnection) connection).getConnection();
Connection amq1 = ((PooledConnection) connection).getConnection();
// let it expire while in use
TimeUnit.SECONDS.sleep(1);
connection.close();
Connection connection2 = pooledFactory.createConnection();
ActiveMQConnection amq2 = ((PooledConnection) connection2).getConnection();
Connection amq2 = ((PooledConnection) connection2).getConnection();
assertTrue("not equal", !amq1.equals(amq2));
}
@ -94,7 +94,7 @@ public class ConnectionExpiryEvictsFromPoolTest extends TestSupport {
assertTrue("Session should be fine, instead: " + e.getMessage(), false);
}
ActiveMQConnection original = connection.getConnection();
Connection original = connection.getConnection();
connection.close();
connection2.close();

View File

@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.pool;
package org.apache.activemq.jms.pool;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
@ -78,7 +78,8 @@ public class PooledConnectionFactoryMaximumActiveTest extends TestCase {
public void testApp() throws Exception {
// Initialize JMS connection
ActiveMQConnectionFactory amq = new ActiveMQConnectionFactory("vm://broker1?marshal=false&broker.persistent=false");
PooledConnectionFactory cf = new PooledConnectionFactory(amq);
PooledConnectionFactory cf = new PooledConnectionFactory();
cf.setConnectionFactory(amq);
cf.setMaxConnections(3);
cf.setMaximumActiveSessionPerConnection(1);
cf.setBlockIfSessionPoolIsFull(true);

View File

@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.pool;
package org.apache.activemq.jms.pool;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedQueue;
@ -31,7 +31,6 @@ import junit.framework.Test;
import junit.framework.TestCase;
import junit.framework.TestSuite;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.util.Wait;
import org.apache.log4j.Logger;
@ -69,7 +68,8 @@ public class PooledConnectionFactoryTest extends TestCase {
public void testClearAllConnections() throws Exception {
ActiveMQConnectionFactory amq = new ActiveMQConnectionFactory("vm://broker1?marshal=false&broker.persistent=false");
PooledConnectionFactory cf = new PooledConnectionFactory(amq);
PooledConnectionFactory cf = new PooledConnectionFactory();
cf.setConnectionFactory(amq);
cf.setMaxConnections(3);
PooledConnection conn1 = (PooledConnection) cf.createConnection();
@ -98,7 +98,8 @@ public class PooledConnectionFactoryTest extends TestCase {
public void testMaxConnectionsAreCreated() throws Exception {
ActiveMQConnectionFactory amq = new ActiveMQConnectionFactory("vm://broker1?marshal=false&broker.persistent=false");
PooledConnectionFactory cf = new PooledConnectionFactory(amq);
PooledConnectionFactory cf = new PooledConnectionFactory();
cf.setConnectionFactory(amq);
cf.setMaxConnections(3);
PooledConnection conn1 = (PooledConnection) cf.createConnection();
@ -115,10 +116,11 @@ public class PooledConnectionFactoryTest extends TestCase {
public void testConnectionsAreRotated() throws Exception {
ActiveMQConnectionFactory amq = new ActiveMQConnectionFactory("vm://broker1?marshal=false&broker.persistent=false");
PooledConnectionFactory cf = new PooledConnectionFactory(amq);
PooledConnectionFactory cf = new PooledConnectionFactory();
cf.setConnectionFactory(amq);
cf.setMaxConnections(10);
ActiveMQConnection previous = null;
Connection previous = null;
// Front load the pool.
for (int i = 0; i < 10; ++i) {
@ -126,7 +128,7 @@ public class PooledConnectionFactoryTest extends TestCase {
}
for (int i = 0; i < 100; ++i) {
ActiveMQConnection current = ((PooledConnection) cf.createConnection()).getConnection();
Connection current = ((PooledConnection) cf.createConnection()).getConnection();
assertNotSame(previous, current);
previous = current;
}
@ -135,7 +137,8 @@ public class PooledConnectionFactoryTest extends TestCase {
public void testConnectionsArePooled() throws Exception {
ActiveMQConnectionFactory amq = new ActiveMQConnectionFactory("vm://broker1?marshal=false&broker.persistent=false");
PooledConnectionFactory cf = new PooledConnectionFactory(amq);
PooledConnectionFactory cf = new PooledConnectionFactory();
cf.setConnectionFactory(amq);
cf.setMaxConnections(1);
PooledConnection conn1 = (PooledConnection) cf.createConnection();
@ -152,7 +155,8 @@ public class PooledConnectionFactoryTest extends TestCase {
public void testConnectionsArePooledAsyncCreate() throws Exception {
final ActiveMQConnectionFactory amq = new ActiveMQConnectionFactory("vm://broker1?marshal=false&broker.persistent=false");
final PooledConnectionFactory cf = new PooledConnectionFactory(amq);
final PooledConnectionFactory cf = new PooledConnectionFactory();
cf.setConnectionFactory(amq);
cf.setMaxConnections(1);
final ConcurrentLinkedQueue<PooledConnection> connections = new ConcurrentLinkedQueue<PooledConnection>();
@ -233,7 +237,8 @@ public class PooledConnectionFactoryTest extends TestCase {
// wait at most 5 seconds for the call to createSession
try {
ActiveMQConnectionFactory amq = new ActiveMQConnectionFactory("vm://broker1?marshal=false&broker.persistent=false");
PooledConnectionFactory cf = new PooledConnectionFactory(amq);
PooledConnectionFactory cf = new PooledConnectionFactory();
cf.setConnectionFactory(amq);
cf.setMaxConnections(3);
cf.setMaximumActiveSessionPerConnection(1);
cf.setBlockIfSessionPoolIsFull(false);

View File

@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.pool;
package org.apache.activemq.jms.pool;
import javax.jms.Queue;
import javax.jms.Session;
@ -47,7 +47,8 @@ public class PooledConnectionFactoryWithTemporaryDestinationsTest extends TestSu
TransportConnector connector = broker.addConnector("tcp://localhost:0");
broker.start();
factory = new ActiveMQConnectionFactory("mock:" + connector.getConnectUri() + "?closeAsync=false");
pooledFactory = new PooledConnectionFactory(factory);
pooledFactory = new PooledConnectionFactory();
pooledFactory.setConnectionFactory(factory);
}
protected void tearDown() throws Exception {

View File

@ -15,7 +15,7 @@
* limitations under the License.
*/
package org.apache.activemq.pool;
package org.apache.activemq.jms.pool;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
@ -78,7 +78,8 @@ public class PooledConnectionSessionCleanupTest {
// Set a long idle timeout on the pooled connections to better show the
// problem of holding onto created resources on close.
directConnFact = new ActiveMQConnectionFactory(service.getVmConnectorURI());
pooledConnFact = new PooledConnectionFactory(directConnFact);
pooledConnFact = new PooledConnectionFactory();
pooledConnFact.setConnectionFactory(directConnFact);
pooledConnFact.setIdleTimeout((int)TimeUnit.MINUTES.toMillis(60));
pooledConnFact.setMaxConnections(1);

View File

@ -15,7 +15,7 @@
* limitations under the License.
*/
package org.apache.activemq.pool;
package org.apache.activemq.jms.pool;
import static org.junit.Assert.assertTrue;
@ -73,7 +73,8 @@ public class PooledConnectionTempDestCleanupTest {
// Create the ActiveMQConnectionFactory and the PooledConnectionFactory.
directConnFact = new ActiveMQConnectionFactory(embeddedBroker.getVmConnectorURI());
pooledConnFact = new PooledConnectionFactory(directConnFact);
pooledConnFact = new PooledConnectionFactory();
pooledConnFact.setConnectionFactory(directConnFact);
// Prepare the connections
directConn1 = directConnFact.createConnection();

View File

@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.pool;
package org.apache.activemq.jms.pool;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
@ -22,6 +22,7 @@ import javax.jms.IllegalStateException;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -104,8 +105,9 @@ public class PooledConnectionTest extends TestCase {
}
protected ConnectionFactory createPooledConnectionFactory() {
ConnectionFactory cf = new PooledConnectionFactory("vm://localhost?broker.persistent=false");
((PooledConnectionFactory)cf).setMaxConnections(1);
PooledConnectionFactory cf = new PooledConnectionFactory();
cf.setConnectionFactory(new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false"));
cf.setMaxConnections(1);
log.debug("ConnectionFactory initialized.");
return cf;
}

View File

@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.pool;
package org.apache.activemq.jms.pool;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
@ -53,7 +53,8 @@ public class PooledSessionExhaustionTest extends TestCase {
broker.start();
connectionUri = connector.getPublishableConnectString();
factory = new ActiveMQConnectionFactory(connectionUri);
pooledFactory = new PooledConnectionFactory(factory);
pooledFactory = new PooledConnectionFactory();
pooledFactory.setConnectionFactory(factory);
pooledFactory.setMaxConnections(1);
pooledFactory.setBlockIfSessionPoolIsFull(false);
}

View File

@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.pool;
package org.apache.activemq.jms.pool;
import static org.junit.Assert.assertEquals;
@ -43,7 +43,8 @@ public class PooledSessionTest {
broker.start();
connectionUri = connector.getPublishableConnectString();
factory = new ActiveMQConnectionFactory(connectionUri);
pooledFactory = new PooledConnectionFactory(factory);
pooledFactory = new PooledConnectionFactory();
pooledFactory.setConnectionFactory(factory);
pooledFactory.setMaxConnections(1);
pooledFactory.setBlockIfSessionPoolIsFull(false);
}

View File

@ -14,11 +14,12 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.pool;
package org.apache.activemq.jms.pool;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import javax.jms.Connection;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Session;
@ -26,7 +27,6 @@ import javax.jms.TopicConnection;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
@ -80,7 +80,7 @@ public class PooledTopicPublisherTest extends TestSupport {
pcf.setConnectionFactory(new ActiveMQConnectionFactory(uri));
PooledConnection conn = (PooledConnection) pcf.createConnection();
ActiveMQConnection amq = conn.getConnection();
Connection amq = conn.getConnection();
assertNotNull(amq);
final CountDownLatch gotException = new CountDownLatch(1);
conn.setExceptionListener(new ExceptionListener() {

View File

@ -0,0 +1,348 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.jms.pool;
import java.util.Hashtable;
import java.util.Vector;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;
import javax.naming.spi.ObjectFactory;
import javax.transaction.HeuristicMixedException;
import javax.transaction.HeuristicRollbackException;
import javax.transaction.InvalidTransactionException;
import javax.transaction.NotSupportedException;
import javax.transaction.RollbackException;
import javax.transaction.Status;
import javax.transaction.Synchronization;
import javax.transaction.SystemException;
import javax.transaction.Transaction;
import javax.transaction.TransactionManager;
import javax.transaction.xa.XAResource;
import org.apache.activemq.ActiveMQXAConnectionFactory;
import org.apache.activemq.ActiveMQXASession;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.test.TestSupport;
public class XAConnectionPoolTest extends TestSupport {
// https://issues.apache.org/jira/browse/AMQ-3251
public void testAfterCompletionCanClose() throws Exception {
final Vector<Synchronization> syncs = new Vector<Synchronization>();
ActiveMQTopic topic = new ActiveMQTopic("test");
XaPooledConnectionFactory pcf = new XaPooledConnectionFactory();
pcf.setConnectionFactory(new ActiveMQXAConnectionFactory("vm://test?broker.persistent=false"));
// simple TM that is in a tx and will track syncs
pcf.setTransactionManager(new TransactionManager(){
@Override
public void begin() throws NotSupportedException, SystemException {
}
@Override
public void commit() throws HeuristicMixedException, HeuristicRollbackException, IllegalStateException, RollbackException, SecurityException, SystemException {
}
@Override
public int getStatus() throws SystemException {
return Status.STATUS_ACTIVE;
}
@Override
public Transaction getTransaction() throws SystemException {
return new Transaction() {
@Override
public void commit() throws HeuristicMixedException, HeuristicRollbackException, RollbackException, SecurityException, SystemException {
}
@Override
public boolean delistResource(XAResource xaRes, int flag) throws IllegalStateException, SystemException {
return false;
}
@Override
public boolean enlistResource(XAResource xaRes) throws IllegalStateException, RollbackException, SystemException {
return false;
}
@Override
public int getStatus() throws SystemException {
return 0;
}
@Override
public void registerSynchronization(Synchronization synch) throws IllegalStateException, RollbackException, SystemException {
syncs.add(synch);
}
@Override
public void rollback() throws IllegalStateException, SystemException {
}
@Override
public void setRollbackOnly() throws IllegalStateException, SystemException {
}
};
}
@Override
public void resume(Transaction tobj) throws IllegalStateException, InvalidTransactionException, SystemException {
}
@Override
public void rollback() throws IllegalStateException, SecurityException, SystemException {
}
@Override
public void setRollbackOnly() throws IllegalStateException, SystemException {
}
@Override
public void setTransactionTimeout(int seconds) throws SystemException {
}
@Override
public Transaction suspend() throws SystemException {
return null;
}
});
TopicConnection connection = (TopicConnection) pcf.createConnection();
TopicSession session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
assertTrue(session instanceof PooledSession);
PooledSession pooledSession = (PooledSession) session;
assertTrue(pooledSession.getInternalSession() instanceof ActiveMQXASession);
TopicPublisher publisher = session.createPublisher(topic);
publisher.publish(session.createMessage());
// simulate a commit
for (Synchronization sync : syncs) {
sync.beforeCompletion();
}
for (Synchronization sync : syncs) {
sync.afterCompletion(1);
}
connection.close();
}
public void testAckModeOfPoolNonXAWithTM() throws Exception {
final Vector<Synchronization> syncs = new Vector<Synchronization>();
ActiveMQTopic topic = new ActiveMQTopic("test");
XaPooledConnectionFactory pcf = new XaPooledConnectionFactory();
pcf.setConnectionFactory(new ActiveMQXAConnectionFactory("vm://test?broker.persistent=false&jms.xaAckMode=" + Session.CLIENT_ACKNOWLEDGE));
// simple TM that is in a tx and will track syncs
pcf.setTransactionManager(new TransactionManager(){
@Override
public void begin() throws NotSupportedException, SystemException {
}
@Override
public void commit() throws HeuristicMixedException, HeuristicRollbackException, IllegalStateException, RollbackException, SecurityException, SystemException {
}
@Override
public int getStatus() throws SystemException {
return Status.STATUS_ACTIVE;
}
@Override
public Transaction getTransaction() throws SystemException {
return new Transaction() {
@Override
public void commit() throws HeuristicMixedException, HeuristicRollbackException, RollbackException, SecurityException, SystemException {
}
@Override
public boolean delistResource(XAResource xaRes, int flag) throws IllegalStateException, SystemException {
return false;
}
@Override
public boolean enlistResource(XAResource xaRes) throws IllegalStateException, RollbackException, SystemException {
return false;
}
@Override
public int getStatus() throws SystemException {
return 0;
}
@Override
public void registerSynchronization(Synchronization synch) throws IllegalStateException, RollbackException, SystemException {
syncs.add(synch);
}
@Override
public void rollback() throws IllegalStateException, SystemException {
}
@Override
public void setRollbackOnly() throws IllegalStateException, SystemException {
}
};
}
@Override
public void resume(Transaction tobj) throws IllegalStateException, InvalidTransactionException, SystemException {
}
@Override
public void rollback() throws IllegalStateException, SecurityException, SystemException {
}
@Override
public void setRollbackOnly() throws IllegalStateException, SystemException {
}
@Override
public void setTransactionTimeout(int seconds) throws SystemException {
}
@Override
public Transaction suspend() throws SystemException {
return null;
}
});
TopicConnection connection = (TopicConnection) pcf.createConnection();
TopicSession session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
assertEquals("client ack is enforce", Session.CLIENT_ACKNOWLEDGE, session.getAcknowledgeMode());
TopicPublisher publisher = session.createPublisher(topic);
publisher.publish(session.createMessage());
// simulate a commit
for (Synchronization sync : syncs) {
sync.beforeCompletion();
}
for (Synchronization sync : syncs) {
sync.afterCompletion(1);
}
connection.close();
}
public void testInstanceOf() throws Exception {
XaPooledConnectionFactory pcf = new XaPooledConnectionFactory();
assertTrue(pcf instanceof QueueConnectionFactory);
assertTrue(pcf instanceof TopicConnectionFactory);
}
public void testBindable() throws Exception {
XaPooledConnectionFactory pcf = new XaPooledConnectionFactory();
assertTrue(pcf instanceof ObjectFactory);
assertTrue(((ObjectFactory)pcf).getObjectInstance(null, null, null, null) instanceof XaPooledConnectionFactory);
assertTrue(pcf.isTmFromJndi());
}
public void testBindableEnvOverrides() throws Exception {
XaPooledConnectionFactory pcf = new XaPooledConnectionFactory();
assertTrue(pcf instanceof ObjectFactory);
Hashtable<String, String> environment = new Hashtable<String, String>();
environment.put("tmFromJndi", String.valueOf(Boolean.FALSE));
assertTrue(((ObjectFactory) pcf).getObjectInstance(null, null, null, environment) instanceof XaPooledConnectionFactory);
assertFalse(pcf.isTmFromJndi());
}
public void testSenderAndPublisherDest() throws Exception {
XaPooledConnectionFactory pcf = new XaPooledConnectionFactory();
pcf.setConnectionFactory(new ActiveMQXAConnectionFactory("vm://test?broker.persistent=false"));
QueueConnection connection = pcf.createQueueConnection();
QueueSession session = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
QueueSender sender = session.createSender(session.createQueue("AA"));
assertNotNull(sender.getQueue().getQueueName());
connection.close();
TopicConnection topicConnection = pcf.createTopicConnection();
TopicSession topicSession = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
TopicPublisher topicPublisher = topicSession.createPublisher(topicSession.createTopic("AA"));
assertNotNull(topicPublisher.getTopic().getTopicName());
topicConnection.close();
}
public void testSessionArgsIgnoredWithTm() throws Exception {
XaPooledConnectionFactory pcf = new XaPooledConnectionFactory();
pcf.setConnectionFactory(new ActiveMQXAConnectionFactory("vm://test?broker.persistent=false"));
// simple TM that with no tx
pcf.setTransactionManager(new TransactionManager() {
@Override
public void begin() throws NotSupportedException, SystemException {
throw new SystemException("NoTx");
}
@Override
public void commit() throws HeuristicMixedException, HeuristicRollbackException, IllegalStateException, RollbackException, SecurityException, SystemException {
throw new IllegalStateException("NoTx");
}
@Override
public int getStatus() throws SystemException {
return Status.STATUS_NO_TRANSACTION;
}
@Override
public Transaction getTransaction() throws SystemException {
throw new SystemException("NoTx");
}
@Override
public void resume(Transaction tobj) throws IllegalStateException, InvalidTransactionException, SystemException {
throw new IllegalStateException("NoTx");
}
@Override
public void rollback() throws IllegalStateException, SecurityException, SystemException {
throw new IllegalStateException("NoTx");
}
@Override
public void setRollbackOnly() throws IllegalStateException, SystemException {
throw new IllegalStateException("NoTx");
}
@Override
public void setTransactionTimeout(int seconds) throws SystemException {
}
@Override
public Transaction suspend() throws SystemException {
throw new SystemException("NoTx");
}
});
QueueConnection connection = pcf.createQueueConnection();
// like ee tck
assertNotNull("can create session(false, 0)", connection.createQueueSession(false, 0));
connection.close();
}
}

View File

@ -1,4 +1,4 @@
package org.apache.activemq.pool.bugs;
package org.apache.activemq.jms.pool.bugs;
import static org.junit.Assert.fail;
@ -8,9 +8,10 @@ import java.util.concurrent.atomic.AtomicBoolean;
import javax.jms.JMSException;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.pool.PooledConnection;
import org.apache.activemq.pool.PooledConnectionFactory;
import org.apache.activemq.jms.pool.PooledConnection;
import org.apache.activemq.jms.pool.PooledConnectionFactory;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@ -42,11 +43,12 @@ public class AMQ4441Test {
public void demo() throws JMSException, InterruptedException {
final CountDownLatch latch = new CountDownLatch(1);
final AtomicBoolean done = new AtomicBoolean(false);
final PooledConnectionFactory pooled = new PooledConnectionFactory("vm://localhost?create=false");
final PooledConnectionFactory pooled = new PooledConnectionFactory();
pooled.setConnectionFactory(new ActiveMQConnectionFactory("vm://localhost?create=false"));
pooled.setMaxConnections(2);
pooled.setExpiryTimeout(10L);
pooled.start();
//pooled.start();
Thread[] threads = new Thread[10];
for (int i = 0; i < threads.length; i++) {
threads[i] = new Thread(new Runnable() {

View File

@ -121,7 +121,7 @@
<property name="connectionFactory" ref="activemqConnectionFactory" />
</bean>
<bean id="resourceManager" class="org.apache.activemq.pool.ActiveMQResourceManager" init-method="recoverResource">
<bean id="resourceManager" class="org.apache.activemq.jms.pool.GenericResourceManager" init-method="recoverResource">
<property name="transactionManager" ref="transactionManager" />
<property name="connectionFactory" ref="activemqConnectionFactory" />
<property name="resourceName" value="activemq.${name}" />

View File

@ -32,6 +32,7 @@
<properties>
<activemq.osgi.import.pkg>
javax.transaction*;resolution:=optional,
org.apache.activemq.jms.pool*;resolution:=optional,
org.apache.activemq.ra*;resolution:=optional,
org.apache.geronimo.transaction.manager*;resolution:=optional,
*
@ -52,12 +53,11 @@
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>activemq-client</artifactId>
<artifactId>activemq-jms-pool</artifactId>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>activemq-ra</artifactId>
<optional>true</optional>
<artifactId>activemq-client</artifactId>
</dependency>
<dependency>
<groupId>org.apache.geronimo.components</groupId>
@ -77,6 +77,11 @@
<groupId>commons-pool</groupId>
<artifactId>commons-pool</artifactId>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>activemq-broker</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>activemq-broker</artifactId>

View File

@ -16,169 +16,10 @@
*/
package org.apache.activemq.pool;
import java.io.IOException;
import javax.jms.ConnectionFactory;
import javax.jms.Session;
import javax.jms.JMSException;
import javax.transaction.SystemException;
import javax.transaction.TransactionManager;
import javax.transaction.xa.XAResource;
import org.apache.geronimo.transaction.manager.NamedXAResourceFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.geronimo.transaction.manager.RecoverableTransactionManager;
import org.apache.geronimo.transaction.manager.NamedXAResource;
import org.apache.geronimo.transaction.manager.WrapperNamedXAResource;
import org.apache.activemq.jms.pool.GenericResourceManager;
/**
* This class allows wiring the ActiveMQ broker and the Geronimo transaction manager
* in a way that will allow the transaction manager to correctly recover XA transactions.
*
* For example, it can be used the following way:
* <pre>
* <bean id="activemqConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
* <property name="brokerURL" value="tcp://localhost:61616" />
* </bean>
*
* <bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactoryFactoryBean">
* <property name="maxConnections" value="8" />
* <property name="transactionManager" ref="transactionManager" />
* <property name="connectionFactory" ref="activemqConnectionFactory" />
* <property name="resourceName" value="activemq.broker" />
* </bean>
*
* <bean id="resourceManager" class="org.apache.activemq.pool.ActiveMQResourceManager" init-method="recoverResource">
* <property name="transactionManager" ref="transactionManager" />
* <property name="connectionFactory" ref="activemqConnectionFactory" />
* <property name="resourceName" value="activemq.broker" />
* </bean>
* </pre>
* @Deprecated use {@link org.apache.activemq.jms.pool.GenericResourceManager}
*/
public class ActiveMQResourceManager {
private static final Logger LOGGER = LoggerFactory.getLogger(ActiveMQResourceManager.class);
private String resourceName;
private TransactionManager transactionManager;
private ConnectionFactory connectionFactory;
public void recoverResource() {
try {
if (!Recovery.recover(this)) {
LOGGER.info("Resource manager is unrecoverable");
}
} catch (NoClassDefFoundError e) {
LOGGER.info("Resource manager is unrecoverable due to missing classes: " + e);
} catch (Throwable e) {
LOGGER.warn("Error while recovering resource manager", e);
}
}
public String getResourceName() {
return resourceName;
}
public void setResourceName(String resourceName) {
this.resourceName = resourceName;
}
public TransactionManager getTransactionManager() {
return transactionManager;
}
public void setTransactionManager(TransactionManager transactionManager) {
this.transactionManager = transactionManager;
}
public ConnectionFactory getConnectionFactory() {
return connectionFactory;
}
public void setConnectionFactory(ConnectionFactory connectionFactory) {
this.connectionFactory = connectionFactory;
}
/**
* This class will ensure the broker is properly recovered when wired with
* the Geronimo transaction manager.
*/
public static class Recovery {
public static boolean isRecoverable(ActiveMQResourceManager rm) {
return rm.getConnectionFactory() instanceof ActiveMQConnectionFactory &&
rm.getTransactionManager() instanceof RecoverableTransactionManager &&
rm.getResourceName() != null && !"".equals(rm.getResourceName());
}
public static boolean recover(final ActiveMQResourceManager rm) throws IOException {
if (isRecoverable(rm)) {
try {
final ActiveMQConnectionFactory connFactory = (ActiveMQConnectionFactory) rm.getConnectionFactory();
ActiveMQConnection activeConn = (ActiveMQConnection)connFactory.createConnection();
final ActiveMQSession session = (ActiveMQSession)activeConn.createSession(true, Session.SESSION_TRANSACTED);
NamedXAResource namedXaResource = new WrapperNamedXAResource(session.getTransactionContext(), rm.getResourceName());
RecoverableTransactionManager rtxManager = (RecoverableTransactionManager) rm.getTransactionManager();
rtxManager.registerNamedXAResourceFactory(new NamedXAResourceFactory() {
@Override
public String getName() {
return rm.getResourceName();
}
@Override
public NamedXAResource getNamedXAResource() throws SystemException {
try {
final ActiveMQConnection activeConn = (ActiveMQConnection)connFactory.createConnection();
final ActiveMQSession session = (ActiveMQSession)activeConn.createSession(true, Session.SESSION_TRANSACTED);
activeConn.start();
LOGGER.debug("new namedXAResource's connection: " + activeConn);
return new ConnectionAndWrapperNamedXAResource(session.getTransactionContext(), getName(), activeConn);
} catch (Exception e) {
SystemException se = new SystemException("Failed to create ConnectionAndWrapperNamedXAResource, " + e.getLocalizedMessage());
se.initCause(e);
LOGGER.error(se.getLocalizedMessage(), se);
throw se;
}
}
@Override
public void returnNamedXAResource(NamedXAResource namedXaResource) {
if (namedXaResource instanceof ConnectionAndWrapperNamedXAResource) {
try {
LOGGER.debug("closing returned namedXAResource's connection: " + ((ConnectionAndWrapperNamedXAResource)namedXaResource).connection);
((ConnectionAndWrapperNamedXAResource)namedXaResource).connection.close();
} catch (Exception ignored) {
LOGGER.debug("failed to close returned namedXAResource: " + namedXaResource, ignored);
}
}
}
});
return true;
} catch (JMSException e) {
throw IOExceptionSupport.create(e);
}
} else {
return false;
}
}
}
public static class ConnectionAndWrapperNamedXAResource extends WrapperNamedXAResource {
final ActiveMQConnection connection;
public ConnectionAndWrapperNamedXAResource(XAResource xaResource, String name, ActiveMQConnection connection) {
super(xaResource, name);
this.connection = connection;
}
}
public class ActiveMQResourceManager extends GenericResourceManager {
}

View File

@ -1,113 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.pool;
import java.util.Properties;
import javax.naming.NamingException;
import javax.naming.Reference;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.jndi.JNDIReferenceFactory;
import org.apache.activemq.jndi.JNDIStorableInterface;
/**
* AmqJNDIPooledConnectionFactory.java
* Created by linus on 2008-03-07.
*/
public class AmqJNDIPooledConnectionFactory extends PooledConnectionFactory
implements JNDIStorableInterface {
private Properties properties;
public AmqJNDIPooledConnectionFactory() {
super();
}
public AmqJNDIPooledConnectionFactory(String brokerURL) {
super(brokerURL);
}
public AmqJNDIPooledConnectionFactory(ActiveMQConnectionFactory connectionFactory) {
super(connectionFactory);
}
/**
* set the properties for this instance as retrieved from JNDI
*
* @param props
*/
public synchronized void setProperties(Properties props) {
this.properties = props;
buildFromProperties(props);
}
/**
* Get the properties from this instance for storing in JNDI
*
* @return the properties
*/
public synchronized Properties getProperties() {
if (this.properties == null) {
this.properties = new Properties();
}
populateProperties(this.properties);
return this.properties;
}
/**
* Retrive a Reference for this instance to store in JNDI
*
* @return the built Reference
* @throws NamingException
* if error on building Reference
*/
public Reference getReference() throws NamingException {
return JNDIReferenceFactory.createReference(this.getClass().getName(),
this);
}
public void buildFromProperties(Properties properties) {
if (properties == null) {
properties = new Properties();
}
((ActiveMQConnectionFactory) getConnectionFactory())
.buildFromProperties(properties);
String temp = properties.getProperty("maximumActive");
if (temp != null && temp.length() > 0) {
setMaximumActiveSessionPerConnection(Integer.parseInt(temp));
}
temp = properties.getProperty("maximumActiveSessionPerConnection");
if (temp != null && temp.length() > 0) {
setMaximumActiveSessionPerConnection(Integer.parseInt(temp));
}
temp = properties.getProperty("maxConnections");
if (temp != null && temp.length() > 0) {
setMaxConnections(Integer.parseInt(temp));
}
}
public void populateProperties(Properties props) {
((ActiveMQConnectionFactory) getConnectionFactory())
.populateProperties(props);
props.setProperty("maximumActive", Integer
.toString(getMaximumActiveSessionPerConnection()));
props.setProperty("maximumActiveSessionPerConnection", Integer
.toString(getMaximumActiveSessionPerConnection()));
props.setProperty("maxConnections", Integer
.toString(getMaxConnections()));
}
}

View File

@ -15,28 +15,20 @@
*/
package org.apache.activemq.pool;
import java.io.IOException;
import javax.jms.Connection;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.jms.pool.ConnectionPool;
import org.apache.activemq.jms.pool.JcaConnectionPool;
import org.apache.activemq.transport.TransportListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
*
*/
public class JcaPooledConnectionFactory extends XaPooledConnectionFactory {
private static final transient Logger LOG = LoggerFactory.getLogger(JcaPooledConnectionFactory.class);
private String name;
public JcaPooledConnectionFactory() {
super();
}
public JcaPooledConnectionFactory(ActiveMQConnectionFactory connectionFactory) {
super(connectionFactory);
}
public JcaPooledConnectionFactory(String brokerURL) {
super(brokerURL);
}
public String getName() {
return name;
}
@ -45,7 +37,50 @@ public class JcaPooledConnectionFactory extends XaPooledConnectionFactory {
this.name = name;
}
protected ConnectionPool createConnectionPool(ActiveMQConnection connection) {
return new JcaConnectionPool(connection, getTransactionManager(), getName());
protected ConnectionPool createConnectionPool(Connection connection) {
return new JcaConnectionPool(connection, getTransactionManager(), getName()) {
@Override
protected Connection wrap(final Connection connection) {
// Add a transport Listener so that we can notice if this connection
// should be expired due to a connection failure.
((ActiveMQConnection)connection).addTransportListener(new TransportListener() {
@Override
public void onCommand(Object command) {
}
@Override
public void onException(IOException error) {
synchronized (this) {
setHasExpired(true);
LOG.info("Expiring connection " + connection + " on IOException: " + error);
LOG.debug("Expiring connection on IOException", error);
}
}
@Override
public void transportInterupted() {
}
@Override
public void transportResumed() {
}
});
// make sure that we set the hasFailed flag, in case the transport already failed
// prior to the addition of our new TransportListener
setHasExpired(((ActiveMQConnection) connection).isTransportFailed());
// may want to return an amq EnhancedConnection
return connection;
}
@Override
protected void unWrap(Connection connection) {
if (connection != null) {
((ActiveMQConnection)connection).cleanUpTempDestinations();
}
}
};
}
}

View File

@ -16,284 +16,19 @@
*/
package org.apache.activemq.pool;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import javax.jms.Connection;
import javax.jms.ConnectionConsumer;
import javax.jms.ConnectionMetaData;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueSession;
import javax.jms.ServerSessionPool;
import javax.jms.Session;
import javax.jms.TemporaryQueue;
import javax.jms.TemporaryTopic;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicSession;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.AlreadyClosedException;
import org.apache.activemq.EnhancedConnection;
import org.apache.activemq.advisory.DestinationSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.activemq.jms.pool.ConnectionPool;
/**
* Represents a proxy {@link Connection} which is-a {@link TopicConnection} and
* {@link QueueConnection} which is pooled and on {@link #close()} will return
* its reference to the ConnectionPool backing it.
*
* <b>NOTE</b> this implementation is only intended for use when sending
* messages. It does not deal with pooling of consumers; for that look at a
* library like <a href="http://jencks.org/">Jencks</a> such as in <a
* href="http://jencks.org/Message+Driven+POJOs">this example</a>
*
*/
public class PooledConnection implements TopicConnection, QueueConnection, EnhancedConnection, PooledSessionEventListener {
private static final transient Logger LOG = LoggerFactory.getLogger(PooledConnection.class);
private ConnectionPool pool;
private volatile boolean stopped;
private final List<TemporaryQueue> connTempQueues = new CopyOnWriteArrayList<TemporaryQueue>();
private final List<TemporaryTopic> connTempTopics = new CopyOnWriteArrayList<TemporaryTopic>();
private final List<PooledSession> loanedSessions = new CopyOnWriteArrayList<PooledSession>();
/**
* Creates a new PooledConnection instance that uses the given ConnectionPool to create
* and manage its resources. The ConnectionPool instance can be shared amongst many
* PooledConnection instances.
*
* @param pool
* The connection and pool manager backing this proxy connection object.
*/
public PooledConnection(ConnectionPool pool) {
this.pool = pool;
public class PooledConnection extends org.apache.activemq.jms.pool.PooledConnection implements EnhancedConnection {
public PooledConnection(ConnectionPool connection) {
super(connection);
}
/**
* Factory method to create a new instance.
*/
public PooledConnection newInstance() {
return new PooledConnection(pool);
}
@Override
public void close() throws JMSException {
this.cleanupConnectionTemporaryDestinations();
this.cleanupAllLoanedSessions();
if (this.pool != null) {
this.pool.decrementReferenceCount();
this.pool = null;
}
}
@Override
public void start() throws JMSException {
assertNotClosed();
pool.start();
}
@Override
public void stop() throws JMSException {
stopped = true;
}
@Override
public ConnectionConsumer createConnectionConsumer(Destination destination, String selector, ServerSessionPool serverSessionPool, int maxMessages) throws JMSException {
return getConnection().createConnectionConsumer(destination, selector, serverSessionPool, maxMessages);
}
@Override
public ConnectionConsumer createConnectionConsumer(Topic topic, String s, ServerSessionPool serverSessionPool, int maxMessages) throws JMSException {
return getConnection().createConnectionConsumer(topic, s, serverSessionPool, maxMessages);
}
@Override
public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String selector, String s1, ServerSessionPool serverSessionPool, int i) throws JMSException {
return getConnection().createDurableConnectionConsumer(topic, selector, s1, serverSessionPool, i);
}
@Override
public String getClientID() throws JMSException {
return getConnection().getClientID();
}
@Override
public ExceptionListener getExceptionListener() throws JMSException {
return getConnection().getExceptionListener();
}
@Override
public ConnectionMetaData getMetaData() throws JMSException {
return getConnection().getMetaData();
}
@Override
public void setExceptionListener(ExceptionListener exceptionListener) throws JMSException {
getConnection().setExceptionListener(exceptionListener);
}
@Override
public void setClientID(String clientID) throws JMSException {
// ignore repeated calls to setClientID() with the same client id
// this could happen when a JMS component such as Spring that uses a
// PooledConnectionFactory shuts down and reinitializes.
if (this.getConnection().getClientID() == null || !this.getClientID().equals(clientID)) {
getConnection().setClientID(clientID);
}
}
@Override
public ConnectionConsumer createConnectionConsumer(Queue queue, String selector, ServerSessionPool serverSessionPool, int maxMessages) throws JMSException {
return getConnection().createConnectionConsumer(queue, selector, serverSessionPool, maxMessages);
}
// Session factory methods
// -------------------------------------------------------------------------
@Override
public QueueSession createQueueSession(boolean transacted, int ackMode) throws JMSException {
return (QueueSession) createSession(transacted, ackMode);
}
@Override
public TopicSession createTopicSession(boolean transacted, int ackMode) throws JMSException {
return (TopicSession) createSession(transacted, ackMode);
}
@Override
public Session createSession(boolean transacted, int ackMode) throws JMSException {
PooledSession result;
result = (PooledSession) pool.createSession(transacted, ackMode);
// Store the session so we can close the sessions that this PooledConnection
// created in order to ensure that consumers etc are closed per the JMS contract.
loanedSessions.add(result);
// Add a event listener to the session that notifies us when the session
// creates / destroys temporary destinations and closes etc.
result.addSessionEventListener(this);
return result;
}
// EnhancedCollection API
// -------------------------------------------------------------------------
@Override
public DestinationSource getDestinationSource() throws JMSException {
return getConnection().getDestinationSource();
}
// Implementation methods
// -------------------------------------------------------------------------
@Override
public void onTemporaryQueueCreate(TemporaryQueue tempQueue) {
connTempQueues.add(tempQueue);
}
@Override
public void onTemporaryTopicCreate(TemporaryTopic tempTopic) {
connTempTopics.add(tempTopic);
}
@Override
public void onSessionClosed(PooledSession session) {
if (session != null) {
this.loanedSessions.remove(session);
}
}
public ActiveMQConnection getConnection() throws JMSException {
assertNotClosed();
return pool.getConnection();
}
protected void assertNotClosed() throws AlreadyClosedException {
if (stopped || pool == null) {
throw new AlreadyClosedException();
}
}
protected ActiveMQSession createSession(SessionKey key) throws JMSException {
return (ActiveMQSession) getConnection().createSession(key.isTransacted(), key.getAckMode());
}
@Override
public String toString() {
return "PooledConnection { " + pool + " }";
}
/**
* Remove all of the temporary destinations created for this connection.
* This is important since the underlying connection may be reused over a
* long period of time, accumulating all of the temporary destinations from
* each use. However, from the perspective of the lifecycle from the
* client's view, close() closes the connection and, therefore, deletes all
* of the temporary destinations created.
*/
protected void cleanupConnectionTemporaryDestinations() {
for (TemporaryQueue tempQueue : connTempQueues) {
try {
tempQueue.delete();
} catch (JMSException ex) {
LOG.info("failed to delete Temporary Queue \"" + tempQueue.toString() + "\" on closing pooled connection: " + ex.getMessage());
}
}
connTempQueues.clear();
for (TemporaryTopic tempTopic : connTempTopics) {
try {
tempTopic.delete();
} catch (JMSException ex) {
LOG.info("failed to delete Temporary Topic \"" + tempTopic.toString() + "\" on closing pooled connection: " + ex.getMessage());
}
}
connTempTopics.clear();
}
/**
* The PooledSession tracks all Sessions that it created and now we close them. Closing the
* PooledSession will return the internal Session to the Pool of Session after cleaning up
* all the resources that the Session had allocated for this PooledConnection.
*/
protected void cleanupAllLoanedSessions() {
for (PooledSession session : loanedSessions) {
try {
session.close();
} catch (JMSException ex) {
LOG.info("failed to close laoned Session \"" + session + "\" on closing pooled connection: " + ex.getMessage());
}
}
loanedSessions.clear();
}
/**
* @return the total number of Pooled session including idle sessions that are not
* currently loaned out to any client.
*/
public int getNumSessions() {
return this.pool.getNumSessions();
}
/**
* @return the number of Sessions that are currently checked out of this Connection's session pool.
*/
public int getNumActiveSessions() {
return this.pool.getNumActiveSessions();
}
/**
* @return the number of Sessions that are idle in this Connection's sessions pool.
*/
public int getNumtIdleSessions() {
return this.pool.getNumIdleSessions();
return ((ActiveMQConnection)getConnection()).getDestinationSource();
}
}

View File

@ -16,486 +16,126 @@
*/
package org.apache.activemq.pool;
import java.io.IOException;
import java.util.HashMap;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.naming.NamingException;
import javax.naming.Reference;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.Service;
import org.apache.activemq.jndi.JNDIBaseStorable;
import org.apache.activemq.util.JMSExceptionSupport;
import org.apache.commons.pool.KeyedObjectPool;
import org.apache.commons.pool.KeyedPoolableObjectFactory;
import org.apache.commons.pool.impl.GenericKeyedObjectPool;
import org.apache.activemq.jms.pool.ConnectionPool;
import org.apache.activemq.jndi.JNDIReferenceFactory;
import org.apache.activemq.jndi.JNDIStorableInterface;
import org.apache.activemq.transport.TransportListener;
import org.apache.activemq.util.IntrospectionSupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* A JMS provider which pools Connection, Session and MessageProducer instances
* so it can be used with tools like <a href="http://camel.apache.org/activemq.html">Camel</a> and Spring's
* <a href="http://activemq.apache.org/spring-support.html">JmsTemplate and MessagListenerContainer</a>.
* Connections, sessions and producers are returned to a pool after use so that they can be reused later
* without having to undergo the cost of creating them again.
*
* b>NOTE:</b> while this implementation does allow the creation of a collection of active consumers,
* it does not 'pool' consumers. Pooling makes sense for connections, sessions and producers, which
* are expensive to create and can remain idle a minimal cost. Consumers, on the other hand, are usually
* just created at startup and left active, handling incoming messages as they come. When a consumer is
* complete, it is best to close it rather than return it to a pool for later reuse: this is because,
* even if a consumer is idle, ActiveMQ will keep delivering messages to the consumer's prefetch buffer,
* where they'll get held until the consumer is active again.
*
* If you are creating a collection of consumers (for example, for multi-threaded message consumption), you
* might want to consider using a lower prefetch value for each consumer (e.g. 10 or 20), to ensure that
* all messages don't end up going to just one of the consumers. See this FAQ entry for more detail:
* http://activemq.apache.org/i-do-not-receive-messages-in-my-second-consumer.html
*
* Optionally, one may configure the pool to examine and possibly evict objects as they sit idle in the
* pool. This is performed by an "idle object eviction" thread, which runs asynchronously. Caution should
* be used when configuring this optional feature. Eviction runs contend with client threads for access
* to objects in the pool, so if they run too frequently performance issues may result. The idle object
* eviction thread may be configured using the {@link setTimeBetweenExpirationCheckMillis} method. By
* default the value is -1 which means no eviction thread will be run. Set to a non-negative value to
* configure the idle eviction thread to run.
* Add Service and Referenceable and TransportListener to @link{org.apache.activemq.jms.pool.PooledConnectionFactory}
*
* @org.apache.xbean.XBean element="pooledConnectionFactory"
*/
public class PooledConnectionFactory extends JNDIBaseStorable implements ConnectionFactory, Service {
private static final transient Logger LOG = LoggerFactory.getLogger(PooledConnectionFactory.class);
public class PooledConnectionFactory extends org.apache.activemq.jms.pool.PooledConnectionFactory implements JNDIStorableInterface, Service {
public static final String POOL_PROPS_PREFIX = "pool";
private final AtomicBoolean stopped = new AtomicBoolean(false);
private final GenericKeyedObjectPool<ConnectionKey, ConnectionPool> connectionsPool;
private static final transient Logger LOG = LoggerFactory.getLogger(org.apache.activemq.jms.pool.PooledConnectionFactory.class);
private ConnectionFactory connectionFactory;
private int maximumActiveSessionPerConnection = 500;
private int idleTimeout = 30 * 1000;
private boolean blockIfSessionPoolIsFull = true;
private long expiryTimeout = 0l;
private boolean createConnectionOnStartup = true;
/**
* Creates new PooledConnectionFactory with a default ActiveMQConnectionFactory instance.
* <p/>
* The URI used to connect to ActiveMQ comes from the default value of ActiveMQConnectionFactory.
*/
public PooledConnectionFactory() {
this(new ActiveMQConnectionFactory());
super();
}
public PooledConnectionFactory(ActiveMQConnectionFactory activeMQConnectionFactory) {
setConnectionFactory(activeMQConnectionFactory);
}
/**
* Creates a new PooledConnectionFactory that will use the given broker URI to connect to
* ActiveMQ.
*
* @param brokerURL
* The URI to use to configure the internal ActiveMQConnectionFactory.
*/
public PooledConnectionFactory(String brokerURL) {
this(new ActiveMQConnectionFactory(brokerURL));
setConnectionFactory(new ActiveMQConnectionFactory(brokerURL));
}
/**
* Creates a new PooledConnectionFactory that will use the given ActiveMQConnectionFactory to
* create new ActiveMQConnection instances that will be pooled.
*
* @param connectionFactory
* The ActiveMQConnectionFactory to create new Connections for this pool.
*/
public PooledConnectionFactory(ActiveMQConnectionFactory connectionFactory) {
this.connectionFactory = connectionFactory;
this.connectionsPool = new GenericKeyedObjectPool<ConnectionKey, ConnectionPool>(
new KeyedPoolableObjectFactory<ConnectionKey, ConnectionPool>() {
@Override
public void activateObject(ConnectionKey key, ConnectionPool connection) throws Exception {
}
@Override
public void destroyObject(ConnectionKey key, ConnectionPool connection) throws Exception {
try {
if (LOG.isTraceEnabled()) {
LOG.trace("Destroying connection: {}", connection);
}
connection.close();
} catch (Exception e) {
LOG.warn("Close connection failed for connection: " + connection + ". This exception will be ignored.",e);
}
}
@Override
public ConnectionPool makeObject(ConnectionKey key) throws Exception {
ActiveMQConnection delegate = createConnection(key);
ConnectionPool connection = createConnectionPool(delegate);
connection.setIdleTimeout(getIdleTimeout());
connection.setExpiryTimeout(getExpiryTimeout());
connection.setMaximumActiveSessionPerConnection(getMaximumActiveSessionPerConnection());
connection.setBlockIfSessionPoolIsFull(isBlockIfSessionPoolIsFull());
if (LOG.isTraceEnabled()) {
LOG.trace("Created new connection: {}", connection);
}
return connection;
}
@Override
public void passivateObject(ConnectionKey key, ConnectionPool connection) throws Exception {
}
@Override
public boolean validateObject(ConnectionKey key, ConnectionPool connection) {
if (connection != null && connection.expiredCheck()) {
if (LOG.isTraceEnabled()) {
LOG.trace("Connection has expired: {} and will be destroyed", connection);
}
return false;
}
return true;
}
});
// Set max idle (not max active) since our connections always idle in the pool.
this.connectionsPool.setMaxIdle(1);
// We always want our validate method to control when idle objects are evicted.
this.connectionsPool.setTestOnBorrow(true);
this.connectionsPool.setTestWhileIdle(true);
}
/**
* @return the currently configured ConnectionFactory used to create the pooled Connections.
*/
public ConnectionFactory getConnectionFactory() {
return connectionFactory;
}
/**
* Sets the ConnectionFactory used to create new pooled Connections.
* <p/>
* Updates to this value do not affect Connections that were previously created and placed
* into the pool. In order to allocate new Connections based off this new ConnectionFactory
* it is first necessary to {@link clear} the pooled Connections.
*
* @param connectionFactory
* The factory to use to create pooled Connections.
*/
public void setConnectionFactory(ConnectionFactory connectionFactory) {
this.connectionFactory = connectionFactory;
}
@Override
public Connection createConnection() throws JMSException {
return createConnection(null, null);
}
@Override
public synchronized Connection createConnection(String userName, String password) throws JMSException {
if (stopped.get()) {
LOG.debug("PooledConnectionFactory is stopped, skip create new connection.");
return null;
}
ConnectionPool connection = null;
ConnectionKey key = new ConnectionKey(userName, password);
// This will either return an existing non-expired ConnectionPool or it
// will create a new one to meet the demand.
if (connectionsPool.getNumIdle(key) < getMaxConnections()) {
try {
// we want borrowObject to return the one we added.
connectionsPool.setLifo(true);
connectionsPool.addObject(key);
} catch (Exception e) {
throw JMSExceptionSupport.create("Error while attempting to add new Connection to the pool", e);
}
} else {
// now we want the oldest one in the pool.
connectionsPool.setLifo(false);
}
try {
// We can race against other threads returning the connection when there is an
// expiration or idle timeout. We keep pulling out ConnectionPool instances until
// we win and get a non-closed instance and then increment the reference count
// under lock to prevent another thread from triggering an expiration check and
// pulling the rug out from under us.
while (connection == null) {
connection = connectionsPool.borrowObject(key);
synchronized (connection) {
if (connection.getConnection() != null) {
connection.incrementReferenceCount();
break;
}
// Return the bad one to the pool and let if get destroyed as normal.
connectionsPool.returnObject(key, connection);
connection = null;
}
}
} catch (Exception e) {
throw JMSExceptionSupport.create("Error while attempting to retrieve a connection from the pool", e);
}
try {
connectionsPool.returnObject(key, connection);
} catch (Exception e) {
throw JMSExceptionSupport.create("Error when returning connection to the pool", e);
}
return new PooledConnection(connection);
}
protected ActiveMQConnection createConnection(ConnectionKey key) throws JMSException {
if (key.getUserName() == null && key.getPassword() == null) {
return (ActiveMQConnection)connectionFactory.createConnection();
} else {
return (ActiveMQConnection)connectionFactory.createConnection(key.getUserName(), key.getPassword());
}
}
@Override
public void start() {
LOG.debug("Staring the PooledConnectionFactory: create on start = {}", isCreateConnectionOnStartup());
stopped.set(false);
if (isCreateConnectionOnStartup()) {
try {
// warm the pool by creating a connection during startup
createConnection();
} catch (JMSException e) {
LOG.warn("Create pooled connection during start failed. This exception will be ignored.", e);
}
}
}
@Override
public void stop() {
LOG.debug("Stopping the PooledConnectionFactory, number of connections in cache: {}",
connectionsPool.getNumActive());
if (stopped.compareAndSet(false, true)) {
try {
connectionsPool.close();
} catch (Exception e) {
}
}
}
/**
* Clears all connections from the pool. Each connection that is currently in the pool is
* closed and removed from the pool. A new connection will be created on the next call to
* {@link createConnection}. Care should be taken when using this method as Connections that
* are in use be client's will be closed.
*/
public void clear() {
if (stopped.get()) {
return;
}
this.connectionsPool.clear();
}
/**
* Returns the currently configured maximum number of sessions a pooled Connection will
* create before it either blocks or throws an exception when a new session is requested,
* depending on configuration.
*
* @return the number of session instances that can be taken from a pooled connection.
*/
public int getMaximumActiveSessionPerConnection() {
return maximumActiveSessionPerConnection;
}
/**
* Sets the maximum number of active sessions per connection
*
* @param maximumActiveSessionPerConnection
* The maximum number of active session per connection in the pool.
*/
public void setMaximumActiveSessionPerConnection(int maximumActiveSessionPerConnection) {
this.maximumActiveSessionPerConnection = maximumActiveSessionPerConnection;
}
/**
* Controls the behavior of the internal session pool. By default the call to
* Connection.getSession() will block if the session pool is full. If the
* argument false is given, it will change the default behavior and instead the
* call to getSession() will throw a JMSException.
*
* The size of the session pool is controlled by the @see #maximumActive
* property.
*
* @param block - if true, the call to getSession() blocks if the pool is full
* until a session object is available. defaults to true.
*/
public void setBlockIfSessionPoolIsFull(boolean block) {
this.blockIfSessionPoolIsFull = block;
}
/**
* Returns whether a pooled Connection will enter a blocked state or will throw an Exception
* once the maximum number of sessions has been borrowed from the the Session Pool.
*
* @return true if the pooled Connection createSession method will block when the limit is hit.
* @see setBlockIfSessionPoolIsFull
*/
public boolean isBlockIfSessionPoolIsFull() {
return this.blockIfSessionPoolIsFull;
}
/**
* Returns the maximum number to pooled Connections that this factory will allow before it
* begins to return connections from the pool on calls to ({@link createConnection}.
*
* @return the maxConnections that will be created for this pool.
*/
public int getMaxConnections() {
return connectionsPool.getMaxIdle();
}
/**
* Sets the maximum number of pooled Connections (defaults to one). Each call to
* {@link createConnection} will result in a new Connection being create up to the max
* connections value.
*
* @param maxConnections the maxConnections to set
*/
public void setMaxConnections(int maxConnections) {
this.connectionsPool.setMaxIdle(maxConnections);
}
/**
* Gets the Idle timeout value applied to new Connection's that are created by this pool.
* <p/>
* The idle timeout is used determine if a Connection instance has sat to long in the pool unused
* and if so is closed and removed from the pool. The default value is 30 seconds.
*
* @return idle timeout value (milliseconds)
*/
public int getIdleTimeout() {
return idleTimeout;
}
/**
* Sets the idle timeout value for Connection's that are created by this pool in Milliseconds,
* defaults to 30 seconds.
* <p/>
* For a Connection that is in the pool but has no current users the idle timeout determines how
* long the Connection can live before it is eligible for removal from the pool. Normally the
* connections are tested when an attempt to check one out occurs so a Connection instance can sit
* in the pool much longer than its idle timeout if connections are used infrequently.
*
* @param idleTimeout
* The maximum time a pooled Connection can sit unused before it is eligible for removal.
*/
public void setIdleTimeout(int idleTimeout) {
this.idleTimeout = idleTimeout;
}
/**
* allow connections to expire, irrespective of load or idle time. This is useful with failover
* to force a reconnect from the pool, to reestablish load balancing or use of the master post recovery
*
* @param expiryTimeout non zero in milliseconds
*/
public void setExpiryTimeout(long expiryTimeout) {
this.expiryTimeout = expiryTimeout;
}
/**
* @return the configured expiration timeout for connections in the pool.
*/
public long getExpiryTimeout() {
return expiryTimeout;
}
/**
* @return true if a Connection is created immediately on a call to {@link start}.
*/
public boolean isCreateConnectionOnStartup() {
return createConnectionOnStartup;
}
/**
* Whether to create a connection on starting this {@link PooledConnectionFactory}.
* <p/>
* This can be used to warm-up the pool on startup. Notice that any kind of exception
* happens during startup is logged at WARN level and ignored.
*
* @param createConnectionOnStartup <tt>true</tt> to create a connection on startup
*/
public void setCreateConnectionOnStartup(boolean createConnectionOnStartup) {
this.createConnectionOnStartup = createConnectionOnStartup;
}
/**
* Gets the Pool of ConnectionPool instances which are keyed by different ConnectionKeys.
*
* @return this factories pool of ConnectionPool instances.
*/
KeyedObjectPool<ConnectionKey, ConnectionPool> getConnectionsPool() {
return this.connectionsPool;
}
/**
* Sets the number of milliseconds to sleep between runs of the idle Connection eviction thread.
* When non-positive, no idle object eviction thread will be run, and Connections will only be
* checked on borrow to determine if they have sat idle for too long or have failed for some
* other reason.
* <p/>
* By default this value is set to -1 and no expiration thread ever runs.
*
* @param timeBetweenExpirationCheckMillis
* The time to wait between runs of the idle Connection eviction thread.
*/
public void setTimeBetweenExpirationCheckMillis(long timeBetweenExpirationCheckMillis) {
this.connectionsPool.setTimeBetweenEvictionRunsMillis(timeBetweenExpirationCheckMillis);
}
/**
* @return the number of milliseconds to sleep between runs of the idle connection eviction thread.
*/
public long getTimeBetweenExpirationCheckMillis() {
return this.connectionsPool.getTimeBetweenEvictionRunsMillis();
}
/**
* @return the number of Connections currently in the Pool
*/
public int getNumConnections() {
return this.connectionsPool.getNumIdle();
}
/**
* Delegate that creates each instance of an ConnectionPool object. Subclasses can override
* this method to customize the type of connection pool returned.
*
* @param connection
*
* @return instance of a new ConnectionPool.
*/
protected ConnectionPool createConnectionPool(ActiveMQConnection connection) {
return new ConnectionPool(connection);
}
@Override
protected void buildFromProperties(Properties props) {
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory();
activeMQConnectionFactory.buildFromProperties(props);
connectionFactory = activeMQConnectionFactory;
setConnectionFactory(activeMQConnectionFactory);
IntrospectionSupport.setProperties(this, new HashMap(props), POOL_PROPS_PREFIX);
}
protected void populateProperties(Properties props) {
((ActiveMQConnectionFactory)getConnectionFactory()).populateProperties(props);
IntrospectionSupport.getProperties(this, props, POOL_PROPS_PREFIX);
}
@Override
protected void populateProperties(Properties props) {
((ActiveMQConnectionFactory)connectionFactory).populateProperties(props);
public void setProperties(Properties properties) {
buildFromProperties(properties);
}
@Override
public Properties getProperties() {
Properties properties = new Properties();
populateProperties(properties);
return properties;
}
@Override
public Reference getReference() throws NamingException {
return JNDIReferenceFactory.createReference(this.getClass().getName(), this);
}
@Override
protected Connection newPooledConnection(ConnectionPool connection) {
return new PooledConnection(connection);
}
@Override
protected org.apache.activemq.jms.pool.ConnectionPool createConnectionPool(Connection connection) {
return new ConnectionPool(connection) {
@Override
protected Connection wrap(final Connection connection) {
// Add a transport Listener so that we can notice if this connection
// should be expired due to a connection failure.
((ActiveMQConnection)connection).addTransportListener(new TransportListener() {
@Override
public void onCommand(Object command) {
}
@Override
public void onException(IOException error) {
synchronized (this) {
setHasExpired(true);
LOG.info("Expiring connection {} on IOException: {}" , connection, error);
LOG.debug("Expiring connection on IOException", error);
}
}
@Override
public void transportInterupted() {
}
@Override
public void transportResumed() {
}
});
// make sure that we set the hasFailed flag, in case the transport already failed
// prior to the addition of our new TransportListener
setHasExpired(((ActiveMQConnection)connection).isTransportFailed());
// may want to return an amq EnhancedConnection
return connection;
}
@Override
protected void unWrap(Connection connection) {
if (connection != null) {
((ActiveMQConnection)connection).cleanUpTempDestinations();
}
}
};
}
}

View File

@ -16,167 +16,143 @@
*/
package org.apache.activemq.pool;
import java.io.Serializable;
import java.util.Hashtable;
import java.io.IOException;
import java.util.HashMap;
import java.util.Properties;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.naming.Binding;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.Name;
import javax.naming.NamingEnumeration;
import javax.naming.spi.ObjectFactory;
import javax.transaction.TransactionManager;
import javax.jms.Session;
import javax.jms.XAConnection;
import javax.jms.XASession;
import javax.naming.NamingException;
import javax.naming.Reference;
import javax.transaction.xa.XAResource;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.ActiveMQXAConnectionFactory;
import org.apache.activemq.Service;
import org.apache.activemq.jms.pool.PooledSession;
import org.apache.activemq.jms.pool.SessionKey;
import org.apache.activemq.jms.pool.XaConnectionPool;
import org.apache.activemq.jndi.JNDIReferenceFactory;
import org.apache.activemq.jndi.JNDIStorableInterface;
import org.apache.activemq.transport.TransportListener;
import org.apache.activemq.util.IntrospectionSupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* A pooled connection factory that automatically enlists
* sessions in the current active XA transaction if any.
*/
public class XaPooledConnectionFactory extends PooledConnectionFactory implements ObjectFactory,
Serializable, QueueConnectionFactory, TopicConnectionFactory {
private static final transient Logger LOG = LoggerFactory.getLogger(XaPooledConnectionFactory.class);
private TransactionManager transactionManager;
private boolean tmFromJndi = false;
private String tmJndiName = "java:/TransactionManager";
private String brokerUrl = null;
* Add Service and Referenceable and TransportListener to @link{org.apache.activemq.jms.pool.XaPooledConnectionFactory}
*
* @org.apache.xbean.XBean element=xaPooledConnectionFactory"
*/
public class XaPooledConnectionFactory extends org.apache.activemq.jms.pool.XaPooledConnectionFactory implements JNDIStorableInterface, Service {
public static final String POOL_PROPS_PREFIX = "pool";
private static final transient Logger LOG = LoggerFactory.getLogger(org.apache.activemq.jms.pool.XaPooledConnectionFactory.class);
public XaPooledConnectionFactory() {
super();
}
public XaPooledConnectionFactory(ActiveMQConnectionFactory connectionFactory) {
super(connectionFactory);
public XaPooledConnectionFactory(ActiveMQXAConnectionFactory connectionFactory) {
setConnectionFactory(connectionFactory);
}
public XaPooledConnectionFactory(String brokerURL) {
super(brokerURL);
}
@Override
protected org.apache.activemq.jms.pool.ConnectionPool createConnectionPool(Connection connection) {
return new XaConnectionPool(connection, getTransactionManager()) {
public TransactionManager getTransactionManager() {
if (transactionManager == null && tmFromJndi) {
try {
transactionManager = (TransactionManager) new InitialContext().lookup(getTmJndiName());
} catch (Throwable ignored) {
if (LOG.isTraceEnabled()) {
LOG.trace("exception on tmFromJndi: " + getTmJndiName(), ignored);
@Override
protected Session makeSession(SessionKey key) throws JMSException {
if (connection instanceof XAConnection) {
return ((XAConnection)connection).createXASession();
} else {
return connection.createSession(key.isTransacted(), key.getAckMode());
}
}
}
return transactionManager;
}
public void setTransactionManager(TransactionManager transactionManager) {
this.transactionManager = transactionManager;
}
@Override
protected ConnectionPool createConnectionPool(ActiveMQConnection connection) {
return new XaConnectionPool(connection, getTransactionManager());
}
@Override
public Object getObjectInstance(Object obj, Name name, Context nameCtx, Hashtable<?, ?> environment) throws Exception {
setTmFromJndi(true);
configFromJndiConf(obj);
if (environment != null) {
IntrospectionSupport.setProperties(this, environment);
}
return this;
}
private void configFromJndiConf(Object rootContextName) {
if (rootContextName instanceof String) {
String name = (String) rootContextName;
name = name.substring(0, name.lastIndexOf('/')) + "/conf" + name.substring(name.lastIndexOf('/'));
try {
InitialContext ctx = new InitialContext();
NamingEnumeration bindings = ctx.listBindings(name);
while (bindings.hasMore()) {
Binding bd = (Binding)bindings.next();
IntrospectionSupport.setProperty(this, bd.getName(), bd.getObject());
}
} catch (Exception ignored) {
if (LOG.isTraceEnabled()) {
LOG.trace("exception on config from jndi: " + name, ignored);
@Override
protected XAResource createXaResource(PooledSession session) throws JMSException {
if (session.getInternalSession() instanceof XASession) {
return ((XASession)session.getInternalSession()).getXAResource();
} else {
return ((ActiveMQSession)session.getInternalSession()).getTransactionContext();
}
}
}
@Override
protected Connection wrap(final Connection connection) {
// Add a transport Listener so that we can notice if this connection
// should be expired due to a connection failure.
((ActiveMQConnection)connection).addTransportListener(new TransportListener() {
@Override
public void onCommand(Object command) {
}
@Override
public void onException(IOException error) {
synchronized (this) {
setHasExpired(true);
LOG.info("Expiring connection " + connection + " on IOException: " + error);
LOG.debug("Expiring connection on IOException", error);
}
}
@Override
public void transportInterupted() {
}
@Override
public void transportResumed() {
}
});
// make sure that we set the hasFailed flag, in case the transport already failed
// prior to the addition of our new TransportListener
setHasExpired(((ActiveMQConnection) connection).isTransportFailed());
// may want to return an amq EnhancedConnection
return connection;
}
@Override
protected void unWrap(Connection connection) {
if (connection != null) {
((ActiveMQConnection)connection).cleanUpTempDestinations();
}
}
};
}
public void setBrokerUrl(String url) {
if (brokerUrl == null || !brokerUrl.equals(url)) {
brokerUrl = url;
setConnectionFactory(new ActiveMQConnectionFactory(brokerUrl));
}
}
public String getTmJndiName() {
return tmJndiName;
}
public void setTmJndiName(String tmJndiName) {
this.tmJndiName = tmJndiName;
}
public boolean isTmFromJndi() {
return tmFromJndi;
}
/**
* Allow transaction manager resolution from JNDI (ee deployment)
* @param tmFromJndi
*/
public void setTmFromJndi(boolean tmFromJndi) {
this.tmFromJndi = tmFromJndi;
}
@Override
public QueueConnection createQueueConnection() throws JMSException {
return (QueueConnection) createConnection();
}
@Override
public QueueConnection createQueueConnection(String userName, String password) throws JMSException {
return (QueueConnection) createConnection(userName, password);
}
@Override
public TopicConnection createTopicConnection() throws JMSException {
return (TopicConnection) createConnection();
}
@Override
public TopicConnection createTopicConnection(String userName, String password) throws JMSException {
return (TopicConnection) createConnection(userName, password);
}
@Override
protected void buildFromProperties(Properties props) {
super.buildFromProperties(props);
for (String v : new String[]{"tmFromJndi", "tmJndiName"}) {
if (props.containsKey(v)) {
IntrospectionSupport.setProperty(this, v, props.getProperty(v));
}
}
ActiveMQConnectionFactory activeMQConnectionFactory = props.containsKey("xaAckMode") ?
new ActiveMQXAConnectionFactory() : new ActiveMQConnectionFactory();
activeMQConnectionFactory.buildFromProperties(props);
setConnectionFactory(activeMQConnectionFactory);
IntrospectionSupport.setProperties(this, new HashMap(props), POOL_PROPS_PREFIX);
}
protected void populateProperties(Properties props) {
((ActiveMQConnectionFactory)getConnectionFactory()).populateProperties(props);
IntrospectionSupport.getProperties(this, props, POOL_PROPS_PREFIX);
}
@Override
protected void populateProperties(Properties props) {
super.populateProperties(props);
props.setProperty("tmFromJndi", String.valueOf(isTmFromJndi()));
props.setProperty("tmJndiName", tmJndiName);
public void setProperties(Properties properties) {
buildFromProperties(properties);
}
@Override
public Properties getProperties() {
Properties properties = new Properties();
populateProperties(properties);
return properties;
}
@Override
public Reference getReference() throws NamingException {
return JNDIReferenceFactory.createReference(this.getClass().getName(), this);
}
}

View File

@ -21,15 +21,21 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQXAConnectionFactory;
import org.apache.activemq.EnhancedConnection;
import org.apache.activemq.advisory.DestinationSource;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.jms.pool.PooledConnection;
import org.apache.activemq.test.TestSupport;
import org.apache.activemq.transport.TransportListener;
import org.apache.activemq.transport.mock.MockTransport;
@ -39,22 +45,44 @@ import org.slf4j.LoggerFactory;
public class ConnectionFailureEvictsFromPoolTest extends TestSupport {
private static final Logger LOG = LoggerFactory.getLogger(ConnectionFailureEvictsFromPoolTest.class);
private BrokerService broker;
private ActiveMQConnectionFactory factory;
private PooledConnectionFactory pooledFactory;
TransportConnector connector;
protected void setUp() throws Exception {
broker = new BrokerService();
broker.setUseJmx(false);
broker.setPersistent(false);
TransportConnector connector = broker.addConnector("tcp://localhost:0");
connector = broker.addConnector("tcp://localhost:0");
broker.start();
factory = new ActiveMQConnectionFactory("mock:" + connector.getConnectUri() + "?closeAsync=false");
pooledFactory = new PooledConnectionFactory(factory);
}
public void testEnhancedConnection() throws Exception {
XaPooledConnectionFactory pooledFactory =
new XaPooledConnectionFactory(new ActiveMQXAConnectionFactory("mock:" + connector.getConnectUri() + "?closeAsync=false"));
PooledConnection connection = (PooledConnection) pooledFactory.createConnection();
EnhancedConnection enhancedConnection = (EnhancedConnection)connection.getConnection();
DestinationSource destinationSource = enhancedConnection.getDestinationSource();
assertNotNull(destinationSource);
}
public void testEvictionXA() throws Exception {
XaPooledConnectionFactory pooledFactory =
new XaPooledConnectionFactory(new ActiveMQXAConnectionFactory("mock:" + connector.getConnectUri() + "?closeAsync=false"));
doTestEviction(pooledFactory);
}
public void testEviction() throws Exception {
PooledConnectionFactory pooledFactory =
new PooledConnectionFactory(new ActiveMQConnectionFactory("mock:" + connector.getConnectUri() + "?closeAsync=false"));
doTestEviction(pooledFactory);
}
public void doTestEviction(ConnectionFactory pooledFactory) throws Exception {
PooledConnection connection = (PooledConnection) pooledFactory.createConnection();
ActiveMQConnection amqC = connection.getConnection();
ActiveMQConnection amqC = (ActiveMQConnection) connection.getConnection();
final CountDownLatch gotExceptionEvent = new CountDownLatch(1);
amqC.addTransportListener(new TransportListener() {
public void onCommand(Object command) {
@ -75,12 +103,12 @@ public class ConnectionFailureEvictsFromPoolTest extends TestSupport {
createConnectionFailure(connection);
try {
sendMessage(connection);
fail("Expected Error");
TestCase.fail("Expected Error");
} catch (JMSException e) {
} finally {
connection.close();
}
assertTrue("exception event propagated ok", gotExceptionEvent.await(5, TimeUnit.SECONDS));
TestCase.assertTrue("exception event propagated ok", gotExceptionEvent.await(5, TimeUnit.SECONDS));
// If we get another connection now it should be a new connection that
// works.
LOG.info("expect new connection after failure");
@ -89,7 +117,7 @@ public class ConnectionFailureEvictsFromPoolTest extends TestSupport {
}
private void createConnectionFailure(Connection connection) throws Exception {
ActiveMQConnection c = ((PooledConnection)connection).getConnection();
ActiveMQConnection c = (ActiveMQConnection) ((PooledConnection)connection).getConnection();
MockTransport t = (MockTransport)c.getTransportChannel().narrow(MockTransport.class);
t.onException(new IOException("forcing exception for " + getName() + " to force pool eviction"));
LOG.info("arranged for failure, chucked exception");

View File

@ -18,7 +18,6 @@ package org.apache.activemq.pool;
import java.util.Hashtable;
import java.util.Vector;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueSender;
@ -40,11 +39,11 @@ import javax.transaction.SystemException;
import javax.transaction.Transaction;
import javax.transaction.TransactionManager;
import javax.transaction.xa.XAResource;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQXAConnectionFactory;
import org.apache.activemq.ActiveMQXASession;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.jms.pool.PooledSession;
import org.apache.activemq.test.TestSupport;
public class XAConnectionPoolTest extends TestSupport {
@ -54,7 +53,7 @@ public class XAConnectionPoolTest extends TestSupport {
final Vector<Synchronization> syncs = new Vector<Synchronization>();
ActiveMQTopic topic = new ActiveMQTopic("test");
XaPooledConnectionFactory pcf = new XaPooledConnectionFactory();
pcf.setConnectionFactory(new ActiveMQXAConnectionFactory("vm://test?broker.persistent=false"));
pcf.setConnectionFactory(new ActiveMQConnectionFactory("vm://test?broker.persistent=false"));
// simple TM that is in a tx and will track syncs
pcf.setTransactionManager(new TransactionManager(){
@ -135,8 +134,8 @@ public class XAConnectionPoolTest extends TestSupport {
TopicSession session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
assertTrue(session instanceof PooledSession);
PooledSession pooledSession = (PooledSession) session;
assertTrue(pooledSession.getInternalSession() instanceof ActiveMQXASession);
// PooledSession pooledSession = (PooledSession) session;
// assertTrue(pooledSession.getInternalSession() instanceof ActiveMQXASession);
TopicPublisher publisher = session.createPublisher(topic);
publisher.publish(session.createMessage());
@ -155,7 +154,7 @@ public class XAConnectionPoolTest extends TestSupport {
final Vector<Synchronization> syncs = new Vector<Synchronization>();
ActiveMQTopic topic = new ActiveMQTopic("test");
XaPooledConnectionFactory pcf = new XaPooledConnectionFactory();
pcf.setConnectionFactory(new ActiveMQConnectionFactory("vm://test?broker.persistent=false"));
pcf.setConnectionFactory(new ActiveMQXAConnectionFactory("vm://test?broker.persistent=false&jms.xaAckMode=" + Session.CLIENT_ACKNOWLEDGE));
// simple TM that is in a tx and will track syncs
pcf.setTransactionManager(new TransactionManager(){

View File

@ -20,7 +20,7 @@ import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.jms.ConnectionFactory;
import javax.transaction.TransactionManager;
import org.apache.activemq.jms.pool.PooledConnectionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.FactoryBean;
@ -38,7 +38,7 @@ import org.springframework.beans.factory.FactoryBean;
* </bean>
* </pre>
*
* The <code>resourceName</code> property should be used along with the {@link ActiveMQResourceManager} and have
* The <code>resourceName</code> property should be used along with the {@link org.apache.activemq.jms.pool.GenericResourceManager} and have
* the same value than its <code>resourceName</code> property. This will make sure the transaction manager
* maps correctly the connection factory to the recovery process.
*

View File

@ -39,7 +39,6 @@ import org.apache.activemq.store.memory.MemoryPersistenceAdapter;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.jms.listener.DefaultMessageListenerContainer;
//import org.apache.activemq.pool.PooledConnectionFactory;
public class AMQ2754Test extends TestCase {

View File

@ -150,6 +150,14 @@
<groupId>${project.groupId}</groupId>
<artifactId>activemq-pool</artifactId>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>activemq-jms-pool</artifactId>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>activemq-pool</artifactId>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>activemq-web</artifactId>

View File

@ -177,6 +177,7 @@
<include>${pom.groupId}:activemq-jdbc-store</include>
<include>${pom.groupId}:activemq-leveldb-store</include>
<include>${pom.groupId}:activemq-log4j-appender</include>
<include>${pom.groupId}:activemq-jms-pool</include>
<include>${pom.groupId}:activemq-pool</include>
<include>${pom.groupId}:activeio-core</include>
<include>commons-beanutils:commons-beanutils</include>

View File

@ -241,6 +241,7 @@
<module>activemq-jaas</module>
<module>activemq-karaf</module>
<module>activemq-openwire-generator</module>
<module>activemq-jms-pool</module>
<module>activemq-pool</module>
<module>activemq-ra</module>
<module>activemq-rar</module>
@ -364,6 +365,11 @@
<artifactId>activemq-jaas</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-jms-pool</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-pool</artifactId>