diff --git a/activemq-core/src/main/java/org/apache/activemq/pool/ConnectionPool.java b/activemq-core/src/main/java/org/apache/activemq/pool/ConnectionPool.java index 020ef7a769..f9ff84c28f 100644 --- a/activemq-core/src/main/java/org/apache/activemq/pool/ConnectionPool.java +++ b/activemq-core/src/main/java/org/apache/activemq/pool/ConnectionPool.java @@ -20,6 +20,7 @@ import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.util.JMSExceptionSupport; +import org.apache.commons.pool.ObjectPoolFactory; import javax.jms.JMSException; import javax.jms.Session; @@ -37,14 +38,16 @@ public class ConnectionPool { private ActiveMQConnection connection; private Map cache; private AtomicBoolean started = new AtomicBoolean(false); + private ObjectPoolFactory poolFactory; - public ConnectionPool(ActiveMQConnection connection) { - this(connection, new HashMap()); + public ConnectionPool(ActiveMQConnection connection, ObjectPoolFactory poolFactory) { + this(connection, new HashMap(), poolFactory); } - public ConnectionPool(ActiveMQConnection connection, Map cache) { + public ConnectionPool(ActiveMQConnection connection, Map cache, ObjectPoolFactory poolFactory) { this.connection = connection; this.cache = cache; + this.poolFactory = poolFactory; } public void start() throws JMSException { @@ -61,7 +64,7 @@ public class ConnectionPool { SessionKey key = new SessionKey(transacted, ackMode); SessionPool pool = (SessionPool) cache.get(key); if (pool == null) { - pool = new SessionPool(this, key); + pool = new SessionPool(this, key, poolFactory.createPool()); cache.put(key, pool); } return pool.borrowSession(); diff --git a/activemq-core/src/main/java/org/apache/activemq/pool/PooledConnectionFactory.java b/activemq-core/src/main/java/org/apache/activemq/pool/PooledConnectionFactory.java index 8f197e107e..9bc083162f 100644 --- a/activemq-core/src/main/java/org/apache/activemq/pool/PooledConnectionFactory.java +++ b/activemq-core/src/main/java/org/apache/activemq/pool/PooledConnectionFactory.java @@ -21,6 +21,9 @@ import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.Service; import org.apache.activemq.util.IOExceptionSupport; import org.apache.activemq.util.ServiceStopper; +import org.apache.commons.pool.ObjectPoolFactory; +import org.apache.commons.pool.impl.GenericObjectPoolFactory; +import org.apache.commons.pool.impl.GenericObjectPool.Config; import javax.jms.Connection; import javax.jms.ConnectionFactory; @@ -36,16 +39,17 @@ import java.util.Map; * href="http://activemq.org/Spring+Support">JmsTemplate. * * NOTE this implementation is only intended for use when sending - * messages. - * It does not deal with pooling of consumers; for that look at a library like - * Jencks such as in - * this example + * messages. It does not deal with pooling of consumers; for that look at a + * library like Jencks such as in this example * * @version $Revision: 1.1 $ */ public class PooledConnectionFactory implements ConnectionFactory, Service { private ActiveMQConnectionFactory connectionFactory; private Map cache = new HashMap(); + private ObjectPoolFactory poolFactory; + private int maximumActive = 5000; public PooledConnectionFactory() { this(new ActiveMQConnectionFactory()); @@ -76,7 +80,7 @@ public class PooledConnectionFactory implements ConnectionFactory, Service { ConnectionPool connection = (ConnectionPool) cache.get(key); if (connection == null) { ActiveMQConnection delegate = createConnection(key); - connection = new ConnectionPool(delegate); + connection = new ConnectionPool(delegate, getPoolFactory()); cache.put(key, connection); } return new PooledConnection(connection); @@ -116,4 +120,34 @@ public class PooledConnectionFactory implements ConnectionFactory, Service { } stopper.throwFirstException(); } + + public ObjectPoolFactory getPoolFactory() { + if (poolFactory == null) { + poolFactory = createPoolFactory(); + } + return poolFactory; + } + + /** + * Sets the object pool factory used to create individual session pools for + * each connection + */ + public void setPoolFactory(ObjectPoolFactory poolFactory) { + this.poolFactory = poolFactory; + } + + public int getMaximumActive() { + return maximumActive; + } + + /** + * Sets the maximum number of active sessions per connection + */ + public void setMaximumActive(int maximumActive) { + this.maximumActive = maximumActive; + } + + protected ObjectPoolFactory createPoolFactory() { + return new GenericObjectPoolFactory(null, maximumActive); + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/pool/SessionPool.java b/activemq-core/src/main/java/org/apache/activemq/pool/SessionPool.java index 189693b5ef..f66a909130 100644 --- a/activemq-core/src/main/java/org/apache/activemq/pool/SessionPool.java +++ b/activemq-core/src/main/java/org/apache/activemq/pool/SessionPool.java @@ -36,10 +36,6 @@ public class SessionPool implements PoolableObjectFactory { private SessionKey key; private ObjectPool sessionPool; - public SessionPool(ConnectionPool connectionPool, SessionKey key) { - this(connectionPool, key, new GenericObjectPool()); - } - public SessionPool(ConnectionPool connectionPool, SessionKey key, ObjectPool sessionPool) { this.connectionPool = connectionPool; this.key = key;