mirror of https://github.com/apache/activemq.git
patched the pooled JMS provider to make it easier to configure the size and implementation of the underlying session pool
git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@399702 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
3fc0546936
commit
2fd7ccbdf5
|
@ -20,6 +20,7 @@ import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
|
||||||
import org.apache.activemq.ActiveMQConnection;
|
import org.apache.activemq.ActiveMQConnection;
|
||||||
import org.apache.activemq.util.JMSExceptionSupport;
|
import org.apache.activemq.util.JMSExceptionSupport;
|
||||||
|
import org.apache.commons.pool.ObjectPoolFactory;
|
||||||
|
|
||||||
import javax.jms.JMSException;
|
import javax.jms.JMSException;
|
||||||
import javax.jms.Session;
|
import javax.jms.Session;
|
||||||
|
@ -37,14 +38,16 @@ public class ConnectionPool {
|
||||||
private ActiveMQConnection connection;
|
private ActiveMQConnection connection;
|
||||||
private Map cache;
|
private Map cache;
|
||||||
private AtomicBoolean started = new AtomicBoolean(false);
|
private AtomicBoolean started = new AtomicBoolean(false);
|
||||||
|
private ObjectPoolFactory poolFactory;
|
||||||
|
|
||||||
public ConnectionPool(ActiveMQConnection connection) {
|
public ConnectionPool(ActiveMQConnection connection, ObjectPoolFactory poolFactory) {
|
||||||
this(connection, new HashMap());
|
this(connection, new HashMap(), poolFactory);
|
||||||
}
|
}
|
||||||
|
|
||||||
public ConnectionPool(ActiveMQConnection connection, Map cache) {
|
public ConnectionPool(ActiveMQConnection connection, Map cache, ObjectPoolFactory poolFactory) {
|
||||||
this.connection = connection;
|
this.connection = connection;
|
||||||
this.cache = cache;
|
this.cache = cache;
|
||||||
|
this.poolFactory = poolFactory;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void start() throws JMSException {
|
public void start() throws JMSException {
|
||||||
|
@ -61,7 +64,7 @@ public class ConnectionPool {
|
||||||
SessionKey key = new SessionKey(transacted, ackMode);
|
SessionKey key = new SessionKey(transacted, ackMode);
|
||||||
SessionPool pool = (SessionPool) cache.get(key);
|
SessionPool pool = (SessionPool) cache.get(key);
|
||||||
if (pool == null) {
|
if (pool == null) {
|
||||||
pool = new SessionPool(this, key);
|
pool = new SessionPool(this, key, poolFactory.createPool());
|
||||||
cache.put(key, pool);
|
cache.put(key, pool);
|
||||||
}
|
}
|
||||||
return pool.borrowSession();
|
return pool.borrowSession();
|
||||||
|
|
|
@ -21,6 +21,9 @@ import org.apache.activemq.ActiveMQConnectionFactory;
|
||||||
import org.apache.activemq.Service;
|
import org.apache.activemq.Service;
|
||||||
import org.apache.activemq.util.IOExceptionSupport;
|
import org.apache.activemq.util.IOExceptionSupport;
|
||||||
import org.apache.activemq.util.ServiceStopper;
|
import org.apache.activemq.util.ServiceStopper;
|
||||||
|
import org.apache.commons.pool.ObjectPoolFactory;
|
||||||
|
import org.apache.commons.pool.impl.GenericObjectPoolFactory;
|
||||||
|
import org.apache.commons.pool.impl.GenericObjectPool.Config;
|
||||||
|
|
||||||
import javax.jms.Connection;
|
import javax.jms.Connection;
|
||||||
import javax.jms.ConnectionFactory;
|
import javax.jms.ConnectionFactory;
|
||||||
|
@ -36,16 +39,17 @@ import java.util.Map;
|
||||||
* href="http://activemq.org/Spring+Support">JmsTemplate</a>.
|
* href="http://activemq.org/Spring+Support">JmsTemplate</a>.
|
||||||
*
|
*
|
||||||
* <b>NOTE</b> this implementation is only intended for use when sending
|
* <b>NOTE</b> this implementation is only intended for use when sending
|
||||||
* messages.
|
* messages. It does not deal with pooling of consumers; for that look at a
|
||||||
* It does not deal with pooling of consumers; for that look at a library like
|
* library like <a href="http://jencks.org/">Jencks</a> such as in <a
|
||||||
* <a href="http://jencks.org/">Jencks</a> such as in
|
* href="http://jencks.org/Message+Driven+POJOs">this example</a>
|
||||||
* <a href="http://jencks.org/Message+Driven+POJOs">this example</a>
|
|
||||||
*
|
*
|
||||||
* @version $Revision: 1.1 $
|
* @version $Revision: 1.1 $
|
||||||
*/
|
*/
|
||||||
public class PooledConnectionFactory implements ConnectionFactory, Service {
|
public class PooledConnectionFactory implements ConnectionFactory, Service {
|
||||||
private ActiveMQConnectionFactory connectionFactory;
|
private ActiveMQConnectionFactory connectionFactory;
|
||||||
private Map cache = new HashMap();
|
private Map cache = new HashMap();
|
||||||
|
private ObjectPoolFactory poolFactory;
|
||||||
|
private int maximumActive = 5000;
|
||||||
|
|
||||||
public PooledConnectionFactory() {
|
public PooledConnectionFactory() {
|
||||||
this(new ActiveMQConnectionFactory());
|
this(new ActiveMQConnectionFactory());
|
||||||
|
@ -76,7 +80,7 @@ public class PooledConnectionFactory implements ConnectionFactory, Service {
|
||||||
ConnectionPool connection = (ConnectionPool) cache.get(key);
|
ConnectionPool connection = (ConnectionPool) cache.get(key);
|
||||||
if (connection == null) {
|
if (connection == null) {
|
||||||
ActiveMQConnection delegate = createConnection(key);
|
ActiveMQConnection delegate = createConnection(key);
|
||||||
connection = new ConnectionPool(delegate);
|
connection = new ConnectionPool(delegate, getPoolFactory());
|
||||||
cache.put(key, connection);
|
cache.put(key, connection);
|
||||||
}
|
}
|
||||||
return new PooledConnection(connection);
|
return new PooledConnection(connection);
|
||||||
|
@ -116,4 +120,34 @@ public class PooledConnectionFactory implements ConnectionFactory, Service {
|
||||||
}
|
}
|
||||||
stopper.throwFirstException();
|
stopper.throwFirstException();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public ObjectPoolFactory getPoolFactory() {
|
||||||
|
if (poolFactory == null) {
|
||||||
|
poolFactory = createPoolFactory();
|
||||||
|
}
|
||||||
|
return poolFactory;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Sets the object pool factory used to create individual session pools for
|
||||||
|
* each connection
|
||||||
|
*/
|
||||||
|
public void setPoolFactory(ObjectPoolFactory poolFactory) {
|
||||||
|
this.poolFactory = poolFactory;
|
||||||
|
}
|
||||||
|
|
||||||
|
public int getMaximumActive() {
|
||||||
|
return maximumActive;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Sets the maximum number of active sessions per connection
|
||||||
|
*/
|
||||||
|
public void setMaximumActive(int maximumActive) {
|
||||||
|
this.maximumActive = maximumActive;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected ObjectPoolFactory createPoolFactory() {
|
||||||
|
return new GenericObjectPoolFactory(null, maximumActive);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -36,10 +36,6 @@ public class SessionPool implements PoolableObjectFactory {
|
||||||
private SessionKey key;
|
private SessionKey key;
|
||||||
private ObjectPool sessionPool;
|
private ObjectPool sessionPool;
|
||||||
|
|
||||||
public SessionPool(ConnectionPool connectionPool, SessionKey key) {
|
|
||||||
this(connectionPool, key, new GenericObjectPool());
|
|
||||||
}
|
|
||||||
|
|
||||||
public SessionPool(ConnectionPool connectionPool, SessionKey key, ObjectPool sessionPool) {
|
public SessionPool(ConnectionPool connectionPool, SessionKey key, ObjectPool sessionPool) {
|
||||||
this.connectionPool = connectionPool;
|
this.connectionPool = connectionPool;
|
||||||
this.key = key;
|
this.key = key;
|
||||||
|
|
Loading…
Reference in New Issue