Refactoring the pools code to use commons-pool for more of the work and allowing us to
expose more stats info as well as enabling async connection pool cleanup of expired 
connections if so desired.  Added some additional tests.  We should continue to add some
more tests and perhaps add some more functionality to the Session pool in ConnectionPool
to let the commons-pool clean up Idle sessions after a time or to maintain the certain 
number of idle sessions etc.

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1383746 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Timothy A. Bish 2012-09-12 03:44:48 +00:00
parent 215dd18196
commit a59c9baaf5
19 changed files with 876 additions and 425 deletions

View File

@ -17,12 +17,13 @@
package org.apache.activemq.pool;
import java.util.Properties;
import javax.naming.NamingException;
import javax.naming.Reference;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.jndi.JNDIReferenceFactory;
import org.apache.activemq.jndi.JNDIStorableInterface;
import org.apache.activemq.pool.PooledConnectionFactory;
/**
* AmqJNDIPooledConnectionFactory.java
@ -35,7 +36,7 @@ public class AmqJNDIPooledConnectionFactory extends PooledConnectionFactory
public AmqJNDIPooledConnectionFactory() {
super();
}
public AmqJNDIPooledConnectionFactory(String brokerURL) {
super(brokerURL);
}
@ -46,7 +47,7 @@ public class AmqJNDIPooledConnectionFactory extends PooledConnectionFactory
/**
* set the properties for this instance as retrieved from JNDI
*
*
* @param props
*/
public synchronized void setProperties(Properties props) {
@ -56,7 +57,7 @@ public class AmqJNDIPooledConnectionFactory extends PooledConnectionFactory
/**
* Get the properties from this instance for storing in JNDI
*
*
* @return the properties
*/
public synchronized Properties getProperties() {
@ -69,7 +70,7 @@ public class AmqJNDIPooledConnectionFactory extends PooledConnectionFactory
/**
* Retrive a Reference for this instance to store in JNDI
*
*
* @return the built Reference
* @throws NamingException
* if error on building Reference
@ -87,7 +88,7 @@ public class AmqJNDIPooledConnectionFactory extends PooledConnectionFactory
.buildFromProperties(properties);
String temp = properties.getProperty("maximumActive");
if (temp != null && temp.length() > 0) {
setMaximumActive(Integer.parseInt(temp));
setMaximumActiveSessionPerConnection(Integer.parseInt(temp));
}
temp = properties.getProperty("maximumActiveSessionPerConnection");
if (temp != null && temp.length() > 0) {
@ -103,7 +104,7 @@ public class AmqJNDIPooledConnectionFactory extends PooledConnectionFactory
((ActiveMQConnectionFactory) getConnectionFactory())
.populateProperties(props);
props.setProperty("maximumActive", Integer
.toString(getMaximumActive()));
.toString(getMaximumActiveSessionPerConnection()));
props.setProperty("maximumActiveSessionPerConnection", Integer
.toString(getMaximumActiveSessionPerConnection()));
props.setProperty("maxConnections", Integer

View File

@ -18,12 +18,11 @@ package org.apache.activemq.pool;
/**
* A cache key for the connection details
*
*
*/
public class ConnectionKey {
private String userName;
private String password;
private final String userName;
private final String password;
private int hash;
public ConnectionKey(String userName, String password) {

View File

@ -18,31 +18,33 @@
package org.apache.activemq.pool;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.jms.JMSException;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.transport.TransportListener;
import org.apache.commons.pool.ObjectPoolFactory;
import org.apache.activemq.util.JMSExceptionSupport;
import org.apache.commons.pool.KeyedPoolableObjectFactory;
import org.apache.commons.pool.impl.GenericKeyedObjectPool;
import org.apache.commons.pool.impl.GenericObjectPool;
/**
* Holds a real JMS connection along with the session pools associated with it.
*
*
* <p/>
* Instances of this class are shared amongst one or more PooledConnection object and must
* track the session objects that are loaned out for cleanup on close as well as ensuring
* that the temporary destinations of the managed Connection are purged when all references
* to this ConnectionPool are released.
*/
public class ConnectionPool {
private ActiveMQConnection connection;
private ConcurrentHashMap<SessionKey, SessionPool> cache;
private List<PooledSession> loanedSessions = new CopyOnWriteArrayList<PooledSession>();
private AtomicBoolean started = new AtomicBoolean(false);
private int referenceCount;
private ObjectPoolFactory poolFactory;
private long lastUsed = System.currentTimeMillis();
private long firstUsed = lastUsed;
private boolean hasFailed;
@ -50,8 +52,14 @@ public class ConnectionPool {
private int idleTimeout = 30 * 1000;
private long expiryTimeout = 0l;
public ConnectionPool(ActiveMQConnection connection, ObjectPoolFactory poolFactory) {
this(connection, new ConcurrentHashMap<SessionKey, SessionPool>(), poolFactory);
private final AtomicBoolean started = new AtomicBoolean(false);
private final GenericKeyedObjectPool<SessionKey, PooledSession> sessionPool;
private final List<PooledSession> loanedSessions = new CopyOnWriteArrayList<PooledSession>();
public ConnectionPool(ActiveMQConnection connection) {
this.connection = connection;
// Add a transport Listener so that we can notice if this connection
// should be expired due to a connection failure.
connection.addTransportListener(new TransportListener() {
@ -76,12 +84,40 @@ public class ConnectionPool {
if(connection.isTransportFailed()) {
hasFailed = true;
}
}
public ConnectionPool(ActiveMQConnection connection, ConcurrentHashMap<SessionKey, SessionPool> cache, ObjectPoolFactory poolFactory) {
this.connection = connection;
this.cache = cache;
this.poolFactory = poolFactory;
// Create our internal Pool of session instances.
this.sessionPool = new GenericKeyedObjectPool<SessionKey, PooledSession>(
new KeyedPoolableObjectFactory<SessionKey, PooledSession>() {
@Override
public void activateObject(SessionKey key, PooledSession session) throws Exception {
ConnectionPool.this.loanedSessions.add(session);
}
@Override
public void destroyObject(SessionKey key, PooledSession session) throws Exception {
ConnectionPool.this.loanedSessions.remove(session);
session.getInternalSession().close();
}
@Override
public PooledSession makeObject(SessionKey key) throws Exception {
ActiveMQSession session = (ActiveMQSession)
ConnectionPool.this.connection.createSession(key.isTransacted(), key.getAckMode());
return new PooledSession(key, session, sessionPool);
}
@Override
public void passivateObject(SessionKey key, PooledSession session) throws Exception {
ConnectionPool.this.loanedSessions.remove(session);
}
@Override
public boolean validateObject(SessionKey key, PooledSession session) {
return true;
}
}
);
}
public void start() throws JMSException {
@ -101,39 +137,20 @@ public class ConnectionPool {
public Session createSession(boolean transacted, int ackMode) throws JMSException {
SessionKey key = new SessionKey(transacted, ackMode);
SessionPool pool = null;
pool = cache.get(key);
if (pool == null) {
SessionPool newPool = createSessionPool(key);
SessionPool prevPool = cache.putIfAbsent(key, newPool);
if (prevPool != null && prevPool != newPool) {
// newPool was not the first one to be associated with this
// key... close created session pool
try {
newPool.close();
} catch (Exception e) {
throw new JMSException(e.getMessage());
}
}
pool = cache.get(key); // this will return a non-null value...
PooledSession session;
try {
session = sessionPool.borrowObject(key);
} catch (Exception e) {
throw JMSExceptionSupport.create(e);
}
PooledSession session = pool.borrowSession();
this.loanedSessions.add(session);
return session;
}
public synchronized void close() {
if (connection != null) {
try {
Iterator<SessionPool> i = cache.values().iterator();
while (i.hasNext()) {
SessionPool pool = i.next();
i.remove();
try {
pool.close();
} catch (Exception e) {
}
}
sessionPool.close();
} catch (Exception e) {
} finally {
try {
connection.close();
@ -156,6 +173,10 @@ public class ConnectionPool {
if (referenceCount == 0) {
expiredCheck();
// Loaned sessions are those that are active in the sessionPool and
// have not been closed by the client before closing the connection.
// These need to be closed so that all session's reflect the fact
// that the parent Connection is closed.
for (PooledSession session : this.loanedSessions) {
try {
session.close();
@ -164,8 +185,8 @@ public class ConnectionPool {
}
this.loanedSessions.clear();
// only clean up temp destinations when all users
// of this connection have called close
// We only clean up temporary destinations when all users of this
// connection have called close.
if (getConnection() != null) {
getConnection().cleanUpTempDestinations();
}
@ -173,21 +194,30 @@ public class ConnectionPool {
}
/**
* Determines if this Connection has expired.
* <p/>
* A ConnectionPool is considered expired when all references to it are released AND either
* the configured idleTimeout has elapsed OR the configured expiryTimeout has elapsed.
* Once a ConnectionPool is determined to have expired its underlying Connection is closed.
*
* @return true if this connection has expired.
*/
public synchronized boolean expiredCheck() {
if (connection == null) {
return true;
}
if (hasExpired) {
if (referenceCount == 0) {
close();
}
return true;
}
if (hasFailed
|| (idleTimeout > 0 && System.currentTimeMillis() > lastUsed + idleTimeout)
|| expiryTimeout > 0 && System.currentTimeMillis() > firstUsed + expiryTimeout) {
hasExpired = true;
if (referenceCount == 0) {
close();
@ -205,24 +235,59 @@ public class ConnectionPool {
this.idleTimeout = idleTimeout;
}
protected SessionPool createSessionPool(SessionKey key) {
return new SessionPool(this, key, poolFactory.createPool());
}
public void setExpiryTimeout(long expiryTimeout) {
this.expiryTimeout = expiryTimeout;
this.expiryTimeout = expiryTimeout;
}
public long getExpiryTimeout() {
return expiryTimeout;
}
void onSessionReturned(PooledSession session) {
this.loanedSessions.remove(session);
public int getMaximumActiveSessionPerConnection() {
return this.sessionPool.getMaxActive();
}
void onSessionInvalidated(PooledSession session) {
this.loanedSessions.remove(session);
public void setMaximumActiveSessionPerConnection(int maximumActiveSessionPerConnection) {
this.sessionPool.setMaxActive(maximumActiveSessionPerConnection);
}
/**
* @return the total number of Pooled session including idle sessions that are not
* currently loaned out to any client.
*/
public int getNumSessions() {
return this.sessionPool.getNumIdle() + this.sessionPool.getNumActive();
}
/**
* @return the total number of Sessions that are in the Session pool but not loaned out.
*/
public int getNumIdleSessions() {
return this.sessionPool.getNumIdle();
}
/**
* @return the total number of Session's that have been loaned to PooledConnection instances.
*/
public int getNumActiveSessions() {
return this.sessionPool.getNumActive();
}
/**
* Configure whether the createSession method should block when there are no more idle sessions and the
* pool already contains the maximum number of active sessions. If false the create method will fail
* and throw an exception.
*
* @param block
* Indicates whether blocking should be used to wait for more space to create a session.
*/
public void setBlockIfSessionPoolIsFull(boolean block) {
this.sessionPool.setWhenExhaustedAction(
(block ? GenericObjectPool.WHEN_EXHAUSTED_BLOCK : GenericObjectPool.WHEN_EXHAUSTED_FAIL));
}
public boolean isBlockIfSessionPoolIsFull() {
return this.sessionPool.getWhenExhaustedAction() == GenericObjectPool.WHEN_EXHAUSTED_BLOCK;
}
@Override

View File

@ -28,11 +28,18 @@ public class JcaConnectionPool extends XaConnectionPool {
private String name;
public JcaConnectionPool(ActiveMQConnection connection, ObjectPoolFactory poolFactory, TransactionManager transactionManager, String name) {
super(connection, poolFactory, transactionManager);
public JcaConnectionPool(ActiveMQConnection connection, TransactionManager transactionManager, String name) {
super(connection, transactionManager);
this.name = name;
}
/**
* @deprecated
*/
public JcaConnectionPool(ActiveMQConnection connection, ObjectPoolFactory poolFactory, TransactionManager transactionManager, String name) {
this(connection, transactionManager, name);
}
protected XAResource createXaResource(PooledSession session) throws JMSException {
XAResource xares = new LocalAndXATransaction(session.getInternalSession().getTransactionContext());
if (name != null) {

View File

@ -19,7 +19,7 @@ import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
*
*
*/
public class JcaPooledConnectionFactory extends XaPooledConnectionFactory {
@ -46,7 +46,6 @@ public class JcaPooledConnectionFactory extends XaPooledConnectionFactory {
}
protected ConnectionPool createConnectionPool(ActiveMQConnection connection) {
return new JcaConnectionPool(connection, getPoolFactory(), getTransactionManager(), getName());
return new JcaConnectionPool(connection, getTransactionManager(), getName());
}
}

View File

@ -47,7 +47,7 @@ import org.slf4j.LoggerFactory;
/**
* Represents a proxy {@link Connection} which is-a {@link TopicConnection} and
* {@link QueueConnection} which is pooled and on {@link #close()} will return
* itself to the sessionPool.
* its reference to the ConnectionPool backing it.
*
* <b>NOTE</b> this implementation is only intended for use when sending
* messages. It does not deal with pooling of consumers; for that look at a
@ -63,6 +63,14 @@ public class PooledConnection implements TopicConnection, QueueConnection, Enhan
private final List<TemporaryQueue> connTempQueues = new CopyOnWriteArrayList<TemporaryQueue>();
private final List<TemporaryTopic> connTempTopics = new CopyOnWriteArrayList<TemporaryTopic>();
/**
* Creates a new PooledConnection instance that uses the given ConnectionPool to create
* and manage its resources. The ConnectionPool instance can be shared amongst many
* PooledConnection instances.
*
* @param pool
* The connection and pool manager backing this proxy connection object.
*/
public PooledConnection(ConnectionPool pool) {
this.pool = pool;
this.pool.incrementReferenceCount();
@ -75,6 +83,7 @@ public class PooledConnection implements TopicConnection, QueueConnection, Enhan
return new PooledConnection(pool);
}
@Override
public void close() throws JMSException {
this.cleanupConnectionTemporaryDestinations();
if (this.pool != null) {
@ -83,45 +92,55 @@ public class PooledConnection implements TopicConnection, QueueConnection, Enhan
}
}
@Override
public void start() throws JMSException {
assertNotClosed();
pool.start();
}
@Override
public void stop() throws JMSException {
stopped = true;
}
@Override
public ConnectionConsumer createConnectionConsumer(Destination destination, String selector, ServerSessionPool serverSessionPool, int maxMessages)
throws JMSException {
return getConnection().createConnectionConsumer(destination, selector, serverSessionPool, maxMessages);
}
@Override
public ConnectionConsumer createConnectionConsumer(Topic topic, String s, ServerSessionPool serverSessionPool, int maxMessages) throws JMSException {
return getConnection().createConnectionConsumer(topic, s, serverSessionPool, maxMessages);
}
@Override
public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String selector, String s1, ServerSessionPool serverSessionPool, int i)
throws JMSException {
return getConnection().createDurableConnectionConsumer(topic, selector, s1, serverSessionPool, i);
}
@Override
public String getClientID() throws JMSException {
return getConnection().getClientID();
}
@Override
public ExceptionListener getExceptionListener() throws JMSException {
return getConnection().getExceptionListener();
}
@Override
public ConnectionMetaData getMetaData() throws JMSException {
return getConnection().getMetaData();
}
@Override
public void setExceptionListener(ExceptionListener exceptionListener) throws JMSException {
getConnection().setExceptionListener(exceptionListener);
}
@Override
public void setClientID(String clientID) throws JMSException {
// ignore repeated calls to setClientID() with the same client id
@ -132,20 +151,24 @@ public class PooledConnection implements TopicConnection, QueueConnection, Enhan
}
}
@Override
public ConnectionConsumer createConnectionConsumer(Queue queue, String selector, ServerSessionPool serverSessionPool, int maxMessages) throws JMSException {
return getConnection().createConnectionConsumer(queue, selector, serverSessionPool, maxMessages);
}
// Session factory methods
// -------------------------------------------------------------------------
@Override
public QueueSession createQueueSession(boolean transacted, int ackMode) throws JMSException {
return (QueueSession) createSession(transacted, ackMode);
}
@Override
public TopicSession createTopicSession(boolean transacted, int ackMode) throws JMSException {
return (TopicSession) createSession(transacted, ackMode);
}
@Override
public Session createSession(boolean transacted, int ackMode) throws JMSException {
PooledSession result;
result = (PooledSession) pool.createSession(transacted, ackMode);
@ -156,6 +179,16 @@ public class PooledConnection implements TopicConnection, QueueConnection, Enhan
return (Session) result;
}
// EnhancedCollection API
// -------------------------------------------------------------------------
@Override
public DestinationSource getDestinationSource() throws JMSException {
return getConnection().getDestinationSource();
}
// Implementation methods
// -------------------------------------------------------------------------
public void onTemporaryQueueCreate(TemporaryQueue tempQueue) {
connTempQueues.add(tempQueue);
@ -165,16 +198,6 @@ public class PooledConnection implements TopicConnection, QueueConnection, Enhan
connTempTopics.add(tempTopic);
}
// EnhancedCollection API
// -------------------------------------------------------------------------
public DestinationSource getDestinationSource() throws JMSException {
return getConnection().getDestinationSource();
}
// Implementation methods
// -------------------------------------------------------------------------
public ActiveMQConnection getConnection() throws JMSException {
assertNotClosed();
return pool.getConnection();
@ -222,4 +245,26 @@ public class PooledConnection implements TopicConnection, QueueConnection, Enhan
}
connTempTopics.clear();
}
/**
* @return the total number of Pooled session including idle sessions that are not
* currently loaned out to any client.
*/
public int getNumSessions() {
return this.pool.getNumSessions();
}
/**
* @return the number of Sessions that are currently checked out of this Connection's session pool.
*/
public int getNumActiveSessions() {
return this.pool.getNumActiveSessions();
}
/**
* @return the number of Sessions that are idle in this Connection's sessions pool.
*/
public int getNumtIdleSessions() {
return this.pool.getNumIdleSessions();
}
}

View File

@ -16,28 +16,27 @@
*/
package org.apache.activemq.pool;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.Service;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.JMSExceptionSupport;
import org.apache.commons.pool.KeyedObjectPool;
import org.apache.commons.pool.KeyedPoolableObjectFactory;
import org.apache.commons.pool.ObjectPoolFactory;
import org.apache.commons.pool.impl.GenericKeyedObjectPool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.commons.pool.ObjectPoolFactory;
import org.apache.commons.pool.impl.GenericObjectPool;
import org.apache.commons.pool.impl.GenericObjectPoolFactory;
/**
* A JMS provider which pools Connection, Session and MessageProducer instances
* so it can be used with tools like <a href="http://camel.apache.org/activemq.html">Camel</a> and Spring's <a
* href="http://activemq.apache.org/spring-support.html">JmsTemplate and MessagListenerContainer</a>.
* so it can be used with tools like <a href="http://camel.apache.org/activemq.html">Camel</a> and Spring's
* <a href="http://activemq.apache.org/spring-support.html">JmsTemplate and MessagListenerContainer</a>.
* Connections, sessions and producers are returned to a pool after use so that they can be reused later
* without having to undergo the cost of creating them again.
*
@ -54,92 +53,193 @@ import org.apache.commons.pool.impl.GenericObjectPoolFactory;
* all messages don't end up going to just one of the consumers. See this FAQ entry for more detail:
* http://activemq.apache.org/i-do-not-receive-messages-in-my-second-consumer.html
*
* Optionally, one may configure the pool to examine and possibly evict objects as they sit idle in the
* pool. This is performed by an "idle object eviction" thread, which runs asynchronously. Caution should
* be used when configuring this optional feature. Eviction runs contend with client threads for access
* to objects in the pool, so if they run too frequently performance issues may result. The idle object
* eviction thread may be configured using the {@link setTimeBetweenExpirationCheckMillis} method. By
* default the value is -1 which means no eviction thread will be run. Set to a non-negative value to
* configure the idle eviction thread to run.
*
* @org.apache.xbean.XBean element="pooledConnectionFactory"
*
*
*/
public class PooledConnectionFactory implements ConnectionFactory, Service {
private static final transient Logger LOG = LoggerFactory.getLogger(PooledConnectionFactory.class);
private final AtomicBoolean stopped = new AtomicBoolean(false);
private final GenericKeyedObjectPool<ConnectionKey, ConnectionPool> connectionsPool;
private ConnectionFactory connectionFactory;
private final Map<ConnectionKey, LinkedList<ConnectionPool>> cache = new HashMap<ConnectionKey, LinkedList<ConnectionPool>>();
private ObjectPoolFactory poolFactory;
private int maximumActiveSessionPerConnection = 500;
private int maxConnections = 1;
private int idleTimeout = 30 * 1000;
private boolean blockIfSessionPoolIsFull = true;
private final AtomicBoolean stopped = new AtomicBoolean(false);
private long expiryTimeout = 0l;
private boolean createConnectionOnStartup = true;
/**
* Creates new PooledConnectionFactory with a default ActiveMQConnectionFactory instance.
* <p/>
* The URI used to connect to ActiveMQ comes from the default value of ActiveMQConnectionFactory.
*/
public PooledConnectionFactory() {
this(new ActiveMQConnectionFactory());
}
/**
* Creates a new PooledConnectionFactory that will use the given broker URI to connect to
* ActiveMQ.
*
* @param brokerURL
* The URI to use to configure the internal ActiveMQConnectionFactory.
*/
public PooledConnectionFactory(String brokerURL) {
this(new ActiveMQConnectionFactory(brokerURL));
}
/**
* Creates a new PooledConnectionFactory that will use the given ActiveMQConnectionFactory to
* create new ActiveMQConnection instances that will be pooled.
*
* @param connectionFactory
* The ActiveMQConnectionFactory to create new Connections for this pool.
*/
public PooledConnectionFactory(ActiveMQConnectionFactory connectionFactory) {
this.connectionFactory = connectionFactory;
this.connectionsPool = new GenericKeyedObjectPool<ConnectionKey, ConnectionPool>(
new KeyedPoolableObjectFactory<ConnectionKey, ConnectionPool>() {
@Override
public void activateObject(ConnectionKey key, ConnectionPool connection) throws Exception {
}
@Override
public void destroyObject(ConnectionKey key, ConnectionPool connection) throws Exception {
try {
if (LOG.isTraceEnabled()) {
LOG.trace("Destroying connection: {}", connection);
}
connection.close();
} catch (Exception e) {
LOG.warn("Close connection failed for connection: " + connection + ". This exception will be ignored.",e);
}
}
@Override
public ConnectionPool makeObject(ConnectionKey key) throws Exception {
ActiveMQConnection delegate = createConnection(key);
ConnectionPool connection = new ConnectionPool(delegate);
connection.setIdleTimeout(getIdleTimeout());
connection.setExpiryTimeout(getExpiryTimeout());
connection.setMaximumActiveSessionPerConnection(getMaximumActiveSessionPerConnection());
connection.setBlockIfSessionPoolIsFull(isBlockIfSessionPoolIsFull());
if (LOG.isTraceEnabled()) {
LOG.trace("Created new connection: {}", connection);
}
return connection;
}
@Override
public void passivateObject(ConnectionKey key, ConnectionPool connection) throws Exception {
}
@Override
public boolean validateObject(ConnectionKey key, ConnectionPool connection) {
if (connection != null && connection.expiredCheck()) {
if (LOG.isTraceEnabled()) {
LOG.trace("Connection has expired: {} and will be destroyed", connection);
}
return false;
}
return true;
}
});
// Set max idle (not max active) since our connections always idle in the pool.
this.connectionsPool.setMaxIdle(1);
// We always want our validate method to control when idle objects are evicted.
this.connectionsPool.setTestOnBorrow(true);
this.connectionsPool.setTestWhileIdle(true);
}
/**
* @return the currently configured ConnectionFactory used to create the pooled Connections.
*/
public ConnectionFactory getConnectionFactory() {
return connectionFactory;
}
/**
* Sets the ConnectionFactory used to create new pooled Connections.
* <p/>
* Updates to this value do not affect Connections that were previously created and placed
* into the pool. In order to allocate new Connections based off this new ConnectionFactory
* it is first necessary to {@link clear} the pooled Connections.
*
* @param connectionFactory
* The factory to use to create pooled Connections.
*/
public void setConnectionFactory(ConnectionFactory connectionFactory) {
this.connectionFactory = connectionFactory;
}
@Override
public Connection createConnection() throws JMSException {
return createConnection(null, null);
}
@Override
public synchronized Connection createConnection(String userName, String password) throws JMSException {
if (stopped.get()) {
LOG.debug("PooledConnectionFactory is stopped, skip create new connection.");
return null;
}
ConnectionKey key = new ConnectionKey(userName, password);
LinkedList<ConnectionPool> pools = cache.get(key);
if (pools == null) {
pools = new LinkedList<ConnectionPool>();
cache.put(key, pools);
}
ConnectionPool connection = null;
if (pools.size() == maxConnections) {
connection = pools.removeFirst();
}
ConnectionKey key = new ConnectionKey(userName, password);
// Now.. we might get a connection, but it might be that we need to
// dump it..
if (connection != null && connection.expiredCheck()) {
if (LOG.isTraceEnabled()) {
LOG.trace("Connection has expired: {}", connection);
// This will either return an existing non-expired ConnectionPool or it
// will create a new one to meet the demand.
if (connectionsPool.getNumIdle(key) < getMaxConnections()) {
try {
// we want borrowObject to return the one we added.
connectionsPool.setLifo(true);
connectionsPool.addObject(key);
} catch (Exception e) {
throw JMSExceptionSupport.create("Error while attempting to add new Connection to the pool", e);
}
connection = null;
} else {
// now we want the oldest one in the pool.
connectionsPool.setLifo(false);
}
if (connection == null) {
ActiveMQConnection delegate = createConnection(key);
connection = createConnectionPool(delegate);
if (LOG.isTraceEnabled()) {
LOG.trace("Created new connection: {}", connection);
}
try {
connection = connectionsPool.borrowObject(key);
} catch (Exception e) {
throw JMSExceptionSupport.create("Error while attempting to retrieve a connection from the pool", e);
}
pools.add(connection);
try {
connectionsPool.returnObject(key, connection);
} catch (Exception e) {
throw JMSExceptionSupport.create("Error when returning connection to the pool", e);
}
return new PooledConnection(connection);
}
protected ConnectionPool createConnectionPool(ActiveMQConnection connection) {
ConnectionPool result = new ConnectionPool(connection, getPoolFactory());
result.setIdleTimeout(getIdleTimeout());
result.setExpiryTimeout(getExpiryTimeout());
return result;
/**
* @deprecated
*/
public ObjectPoolFactory<?> getPoolFactory() {
return null;
}
protected ActiveMQConnection createConnection(ConnectionKey key) throws JMSException {
@ -150,8 +250,9 @@ public class PooledConnectionFactory implements ConnectionFactory, Service {
}
}
@Override
public void start() {
LOG.debug("Staring the PooledConnectionFactory");
LOG.debug("Staring the PooledConnectionFactory: create on start = {}", isCreateConnectionOnStartup());
stopped.set(false);
if (isCreateConnectionOnStartup()) {
try {
@ -163,34 +264,32 @@ public class PooledConnectionFactory implements ConnectionFactory, Service {
}
}
@Override
public void stop() {
LOG.debug("Stopping the PooledConnectionFactory, number of connections in cache: {}", cache.size());
stopped.set(true);
for (Iterator<LinkedList<ConnectionPool>> iter = cache.values().iterator(); iter.hasNext();) {
for (ConnectionPool connection : iter.next()) {
try {
connection.close();
} catch (Exception e) {
LOG.warn("Close connection failed for connection: " + connection + ". This exception will be ignored.",e);
}
LOG.debug("Stopping the PooledConnectionFactory, number of connections in cache: {}",
connectionsPool.getNumActive());
if (stopped.compareAndSet(false, true)) {
try {
connectionsPool.close();
} catch (Exception e) {
}
}
cache.clear();
}
public ObjectPoolFactory getPoolFactory() {
if (poolFactory == null) {
poolFactory = createPoolFactory();
}
return poolFactory;
}
/**
* Sets the object pool factory used to create individual session pools for
* each connection
* Clears all connections from the pool. Each connection that is currently in the pool is
* closed and removed from the pool. A new connection will be created on the next call to
* {@link createConnection}. Care should be taken when using this method as Connections that
* are in use be client's will be closed.
*/
public void setPoolFactory(ObjectPoolFactory poolFactory) {
this.poolFactory = poolFactory;
public void clear() {
if (stopped.get()) {
return;
}
this.connectionsPool.clear();
}
/**
@ -209,12 +308,22 @@ public class PooledConnectionFactory implements ConnectionFactory, Service {
setMaximumActiveSessionPerConnection(maximumActive);
}
/**
* Returns the currently configured maximum number of sessions a pooled Connection will
* create before it either blocks or throws an exception when a new session is requested,
* depending on configuration.
*
* @return the number of session instances that can be taken from a pooled connection.
*/
public int getMaximumActiveSessionPerConnection() {
return maximumActiveSessionPerConnection;
}
/**
* Sets the maximum number of active sessions per connection
*
* @param maximumActiveSessionPerConnection
* The maximum number of active session per connection in the pool.
*/
public void setMaximumActiveSessionPerConnection(int maximumActiveSessionPerConnection) {
this.maximumActiveSessionPerConnection = maximumActiveSessionPerConnection;
@ -237,40 +346,61 @@ public class PooledConnectionFactory implements ConnectionFactory, Service {
}
/**
* @return the maxConnections
* Returns whether a pooled Connection will enter a blocked state or will throw an Exception
* once the maximum number of sessions has been borrowed from the the Session Pool.
*
* @return true if the pooled Connection createSession method will block when the limit is hit.
* @see setBlockIfSessionPoolIsFull
*/
public int getMaxConnections() {
return maxConnections;
public boolean isBlockIfSessionPoolIsFull() {
return this.blockIfSessionPoolIsFull;
}
/**
* Returns the maximum number to pooled Connections that this factory will allow before it
* begins to return connections from the pool on calls to ({@link createConnection}.
*
* @return the maxConnections that will be created for this pool.
*/
public int getMaxConnections() {
return connectionsPool.getMaxIdle();
}
/**
* Sets the maximum number of pooled Connections (defaults to one). Each call to
* {@link createConnection} will result in a new Connection being create up to the max
* connections value.
*
* @param maxConnections the maxConnections to set
*/
public void setMaxConnections(int maxConnections) {
this.maxConnections = maxConnections;
this.connectionsPool.setMaxIdle(maxConnections);
}
/**
* Creates an ObjectPoolFactory. Its behavior is controlled by the two
* properties @see #maximumActive and @see #blockIfSessionPoolIsFull.
* Gets the Idle timeout value applied to new Connection's that are created by this pool.
* <p/>
* The idle timeout is used determine if a Connection instance has sat to long in the pool unused
* and if so is closed and removed from the pool. The default value is 30 seconds.
*
* @return the newly created but empty ObjectPoolFactory
* @return
*/
protected ObjectPoolFactory createPoolFactory() {
if (blockIfSessionPoolIsFull) {
return new GenericObjectPoolFactory(null, maximumActiveSessionPerConnection);
} else {
return new GenericObjectPoolFactory(null,
maximumActiveSessionPerConnection,
GenericObjectPool.WHEN_EXHAUSTED_FAIL,
GenericObjectPool.DEFAULT_MAX_WAIT);
}
}
public int getIdleTimeout() {
return idleTimeout;
}
/**
* Sets the idle timeout value for Connection's that are created by this pool, defaults to 30 seconds.
* <p/>
* For a Connection that is in the pool but has no current users the idle timeout determines how
* long the Connection can live before it is eligible for removal from the pool. Normally the
* connections are tested when an attempt to check one out occurs so a Connection instance can sit
* in the pool much longer than its idle timeout if connections are used infrequently.
*
*
* @param idleTimeout
* The maximum time a pooled Connection can sit unused before it is eligible for removal.
*/
public void setIdleTimeout(int idleTimeout) {
this.idleTimeout = idleTimeout;
}
@ -285,10 +415,16 @@ public class PooledConnectionFactory implements ConnectionFactory, Service {
this.expiryTimeout = expiryTimeout;
}
/**
* @return the configured expiration timeout for connections in the pool.
*/
public long getExpiryTimeout() {
return expiryTimeout;
}
/**
* @return true if a Connection is created immediately on a call to {@link start}.
*/
public boolean isCreateConnectionOnStartup() {
return createConnectionOnStartup;
}
@ -296,7 +432,7 @@ public class PooledConnectionFactory implements ConnectionFactory, Service {
/**
* Whether to create a connection on starting this {@link PooledConnectionFactory}.
* <p/>
* This can be used to warmup the pool on startup. Notice that any kind of exception
* This can be used to warm-up the pool on startup. Notice that any kind of exception
* happens during startup is logged at WARN level and ignored.
*
* @param createConnectionOnStartup <tt>true</tt> to create a connection on startup
@ -304,4 +440,48 @@ public class PooledConnectionFactory implements ConnectionFactory, Service {
public void setCreateConnectionOnStartup(boolean createConnectionOnStartup) {
this.createConnectionOnStartup = createConnectionOnStartup;
}
/**
* Gets the Pool of ConnectionPool instances which are keyed by different ConnectionKeys.
*
* @return this factories pool of ConnectionPool instances.
*/
KeyedObjectPool<ConnectionKey, ConnectionPool> getConnectionsPool() {
return this.connectionsPool;
}
/**
* Sets the number of milliseconds to sleep between runs of the idle Connection eviction thread.
* When non-positive, no idle object eviction thread will be run, and Connections will only be
* checked on borrow to determine if they have sat idle for too long or have failed for some
* other reason.
* <p/>
* By default this value is set to -1 and no expiration thread ever runs.
*
* @param timeBetweenExpirationCheckMillis
* The time to wait between runs of the idle Connection eviction thread.
*/
public void setTimeBetweenExpirationCheckMillis(long timeBetweenExpirationCheckMillis) {
this.connectionsPool.setTimeBetweenEvictionRunsMillis(timeBetweenExpirationCheckMillis);
}
/**
* @return the number of milliseconds to sleep between runs of the idle connection eviction thread.
*/
public long setTimeBetweenExpirationCheckMillis() {
return this.connectionsPool.getTimeBetweenEvictionRunsMillis();
}
/**
* @return the number of Connections currently in the Pool
*/
public int getNumConnections() {
return this.connectionsPool.getNumIdle();
}
/**
* @deprecated
*/
public void setPoolFactory(ObjectPoolFactory<?> factory) {
}
}

View File

@ -79,6 +79,6 @@ public class PooledMessageConsumer implements MessageConsumer {
@Override
public String toString() {
return delegate.toString();
return "PooledMessageConsumer { " + delegate + " }";
}
}

View File

@ -25,12 +25,12 @@ import org.apache.activemq.ActiveMQMessageProducer;
/**
* A pooled {@link MessageProducer}
*
*
*/
public class PooledProducer implements MessageProducer {
private ActiveMQMessageProducer messageProducer;
private Destination destination;
private final ActiveMQMessageProducer messageProducer;
private final Destination destination;
private int deliveryMode;
private boolean disableMessageID;
private boolean disableMessageTimestamp;
@ -48,21 +48,26 @@ public class PooledProducer implements MessageProducer {
this.timeToLive = messageProducer.getTimeToLive();
}
@Override
public void close() throws JMSException {
}
@Override
public void send(Destination destination, Message message) throws JMSException {
send(destination, message, getDeliveryMode(), getPriority(), getTimeToLive());
}
@Override
public void send(Message message) throws JMSException {
send(destination, message, getDeliveryMode(), getPriority(), getTimeToLive());
}
@Override
public void send(Message message, int deliveryMode, int priority, long timeToLive) throws JMSException {
send(destination, message, deliveryMode, priority, timeToLive);
}
@Override
public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive) throws JMSException {
if (destination == null) {
destination = this.destination;
@ -75,46 +80,57 @@ public class PooledProducer implements MessageProducer {
}
}
@Override
public Destination getDestination() {
return destination;
}
@Override
public int getDeliveryMode() {
return deliveryMode;
}
@Override
public void setDeliveryMode(int deliveryMode) {
this.deliveryMode = deliveryMode;
}
@Override
public boolean getDisableMessageID() {
return disableMessageID;
}
@Override
public void setDisableMessageID(boolean disableMessageID) {
this.disableMessageID = disableMessageID;
}
@Override
public boolean getDisableMessageTimestamp() {
return disableMessageTimestamp;
}
@Override
public void setDisableMessageTimestamp(boolean disableMessageTimestamp) {
this.disableMessageTimestamp = disableMessageTimestamp;
}
@Override
public int getPriority() {
return priority;
}
@Override
public void setPriority(int priority) {
this.priority = priority;
}
@Override
public long getTimeToLive() {
return timeToLive;
}
@Override
public void setTimeToLive(long timeToLive) {
this.timeToLive = timeToLive;
}
@ -125,8 +141,8 @@ public class PooledProducer implements MessageProducer {
return messageProducer;
}
@Override
public String toString() {
return "PooledProducer { " + messageProducer + " }";
}
}

View File

@ -25,7 +25,7 @@ import javax.jms.QueueSender;
import org.apache.activemq.ActiveMQQueueSender;
/**
*
* {@link QueueSender} instance that is created and managed by the PooledConnection.
*/
public class PooledQueueSender extends PooledProducer implements QueueSender {
@ -33,21 +33,22 @@ public class PooledQueueSender extends PooledProducer implements QueueSender {
super(messageProducer, destination);
}
@Override
public void send(Queue queue, Message message, int i, int i1, long l) throws JMSException {
getQueueSender().send(queue, message, i, i1, l);
}
@Override
public void send(Queue queue, Message message) throws JMSException {
getQueueSender().send(queue, message);
}
@Override
public Queue getQueue() throws JMSException {
return getQueueSender().getQueue();
}
protected ActiveMQQueueSender getQueueSender() {
return (ActiveMQQueueSender) getMessageProducer();
}
}

View File

@ -51,28 +51,32 @@ import org.apache.activemq.ActiveMQQueueSender;
import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.ActiveMQTopicPublisher;
import org.apache.activemq.AlreadyClosedException;
import org.apache.activemq.util.JMSExceptionSupport;
import org.apache.commons.pool.KeyedObjectPool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class PooledSession implements Session, TopicSession, QueueSession, XASession {
private static final transient Logger LOG = LoggerFactory.getLogger(PooledSession.class);
private final SessionKey key;
private final KeyedObjectPool<SessionKey, PooledSession> sessionPool;
private final CopyOnWriteArrayList<MessageConsumer> consumers = new CopyOnWriteArrayList<MessageConsumer>();
private final CopyOnWriteArrayList<QueueBrowser> browsers = new CopyOnWriteArrayList<QueueBrowser>();
private final CopyOnWriteArrayList<PooledSessionEventListener> tempDestEventListeners =
new CopyOnWriteArrayList<PooledSessionEventListener>();
private ActiveMQSession session;
private SessionPool sessionPool;
private ActiveMQMessageProducer messageProducer;
private ActiveMQQueueSender queueSender;
private ActiveMQTopicPublisher topicPublisher;
private boolean transactional = true;
private boolean ignoreClose;
private final CopyOnWriteArrayList<MessageConsumer> consumers = new CopyOnWriteArrayList<MessageConsumer>();
private final CopyOnWriteArrayList<QueueBrowser> browsers = new CopyOnWriteArrayList<QueueBrowser>();
private final CopyOnWriteArrayList<PooledSessionEventListener> tempDestEventListeners =
new CopyOnWriteArrayList<PooledSessionEventListener>();
private boolean isXa;
public PooledSession(ActiveMQSession aSession, SessionPool sessionPool) {
this.session = aSession;
public PooledSession(SessionKey key, ActiveMQSession session, KeyedObjectPool<SessionKey, PooledSession> sessionPool) {
this.key = key;
this.session = session;
this.sessionPool = sessionPool;
this.transactional = session.isTransacted();
}
@ -92,10 +96,9 @@ public class PooledSession implements Session, TopicSession, QueueSession, XASes
this.ignoreClose = ignoreClose;
}
@Override
public void close() throws JMSException {
if (!ignoreClose) {
// TODO a cleaner way to reset??
boolean invalidate = false;
try {
// lets reset the session
@ -130,8 +133,8 @@ public class PooledSession implements Session, TopicSession, QueueSession, XASes
}
if (invalidate) {
// lets close the session and not put the session back into
// the pool
// lets close the session and not put the session back into the pool
// instead invalidate it so the pool can create a new one on demand.
if (session != null) {
try {
session.close();
@ -140,45 +143,62 @@ public class PooledSession implements Session, TopicSession, QueueSession, XASes
}
session = null;
}
sessionPool.invalidateSession(this);
try {
sessionPool.invalidateObject(key, this);
} catch (Exception e) {
throw JMSExceptionSupport.create(e);
}
} else {
sessionPool.returnSession(this);
try {
sessionPool.returnObject(key, this);
} catch (Exception e) {
throw JMSExceptionSupport.create(e);
}
}
}
}
@Override
public void commit() throws JMSException {
getInternalSession().commit();
}
@Override
public BytesMessage createBytesMessage() throws JMSException {
return getInternalSession().createBytesMessage();
}
@Override
public MapMessage createMapMessage() throws JMSException {
return getInternalSession().createMapMessage();
}
@Override
public Message createMessage() throws JMSException {
return getInternalSession().createMessage();
}
@Override
public ObjectMessage createObjectMessage() throws JMSException {
return getInternalSession().createObjectMessage();
}
@Override
public ObjectMessage createObjectMessage(Serializable serializable) throws JMSException {
return getInternalSession().createObjectMessage(serializable);
}
@Override
public Queue createQueue(String s) throws JMSException {
return getInternalSession().createQueue(s);
}
@Override
public StreamMessage createStreamMessage() throws JMSException {
return getInternalSession().createStreamMessage();
}
@Override
public TemporaryQueue createTemporaryQueue() throws JMSException {
TemporaryQueue result;
@ -192,6 +212,7 @@ public class PooledSession implements Session, TopicSession, QueueSession, XASes
return result;
}
@Override
public TemporaryTopic createTemporaryTopic() throws JMSException {
TemporaryTopic result;
@ -205,38 +226,47 @@ public class PooledSession implements Session, TopicSession, QueueSession, XASes
return result;
}
@Override
public void unsubscribe(String s) throws JMSException {
getInternalSession().unsubscribe(s);
}
@Override
public TextMessage createTextMessage() throws JMSException {
return getInternalSession().createTextMessage();
}
@Override
public TextMessage createTextMessage(String s) throws JMSException {
return getInternalSession().createTextMessage(s);
}
@Override
public Topic createTopic(String s) throws JMSException {
return getInternalSession().createTopic(s);
}
@Override
public int getAcknowledgeMode() throws JMSException {
return getInternalSession().getAcknowledgeMode();
}
@Override
public boolean getTransacted() throws JMSException {
return getInternalSession().getTransacted();
}
@Override
public void recover() throws JMSException {
getInternalSession().recover();
}
@Override
public void rollback() throws JMSException {
getInternalSession().rollback();
}
@Override
public XAResource getXAResource() {
if (session == null) {
throw new IllegalStateException("Session is closed");
@ -244,10 +274,12 @@ public class PooledSession implements Session, TopicSession, QueueSession, XASes
return session.getTransactionContext();
}
@Override
public Session getSession() {
return this;
}
@Override
public void run() {
if (session != null) {
session.run();
@ -256,68 +288,84 @@ public class PooledSession implements Session, TopicSession, QueueSession, XASes
// Consumer related methods
// -------------------------------------------------------------------------
@Override
public QueueBrowser createBrowser(Queue queue) throws JMSException {
return addQueueBrowser(getInternalSession().createBrowser(queue));
}
@Override
public QueueBrowser createBrowser(Queue queue, String selector) throws JMSException {
return addQueueBrowser(getInternalSession().createBrowser(queue, selector));
}
@Override
public MessageConsumer createConsumer(Destination destination) throws JMSException {
return addConsumer(getInternalSession().createConsumer(destination));
}
@Override
public MessageConsumer createConsumer(Destination destination, String selector) throws JMSException {
return addConsumer(getInternalSession().createConsumer(destination, selector));
}
@Override
public MessageConsumer createConsumer(Destination destination, String selector, boolean noLocal) throws JMSException {
return addConsumer(getInternalSession().createConsumer(destination, selector, noLocal));
}
@Override
public TopicSubscriber createDurableSubscriber(Topic topic, String selector) throws JMSException {
return addTopicSubscriber(getInternalSession().createDurableSubscriber(topic, selector));
}
@Override
public TopicSubscriber createDurableSubscriber(Topic topic, String name, String selector, boolean noLocal) throws JMSException {
return addTopicSubscriber(getInternalSession().createDurableSubscriber(topic, name, selector, noLocal));
}
@Override
public MessageListener getMessageListener() throws JMSException {
return getInternalSession().getMessageListener();
}
@Override
public void setMessageListener(MessageListener messageListener) throws JMSException {
getInternalSession().setMessageListener(messageListener);
}
@Override
public TopicSubscriber createSubscriber(Topic topic) throws JMSException {
return addTopicSubscriber(getInternalSession().createSubscriber(topic));
}
@Override
public TopicSubscriber createSubscriber(Topic topic, String selector, boolean local) throws JMSException {
return addTopicSubscriber(getInternalSession().createSubscriber(topic, selector, local));
}
@Override
public QueueReceiver createReceiver(Queue queue) throws JMSException {
return addQueueReceiver(getInternalSession().createReceiver(queue));
}
@Override
public QueueReceiver createReceiver(Queue queue, String selector) throws JMSException {
return addQueueReceiver(getInternalSession().createReceiver(queue, selector));
}
// Producer related methods
// -------------------------------------------------------------------------
@Override
public MessageProducer createProducer(Destination destination) throws JMSException {
return new PooledProducer(getMessageProducer(), destination);
}
@Override
public QueueSender createSender(Queue queue) throws JMSException {
return new PooledQueueSender(getQueueSender(), queue);
}
@Override
public TopicPublisher createPublisher(Topic topic) throws JMSException {
return new PooledTopicPublisher(getTopicPublisher(), topic);
}
@ -371,11 +419,9 @@ public class PooledSession implements Session, TopicSession, QueueSession, XASes
private MessageConsumer addConsumer(MessageConsumer consumer) {
consumers.add(consumer);
// must wrap in PooledMessageConsumer to ensure the onConsumerClose
// method is invoked
// when the returned consumer is closed, to avoid memory leak in this
// session class
// in case many consumers is created
// must wrap in PooledMessageConsumer to ensure the onConsumerClose method is
// invoked when the returned consumer is closed, to avoid memory leak in this
// session class in case many consumers is created
return new PooledMessageConsumer(this, consumer);
}
@ -393,6 +439,7 @@ public class PooledSession implements Session, TopicSession, QueueSession, XASes
this.isXa = isXa;
}
@Override
public String toString() {
return "PooledSession { " + session + " }";
}

View File

@ -25,7 +25,7 @@ import javax.jms.TopicPublisher;
import org.apache.activemq.ActiveMQTopicPublisher;
/**
*
* A {@link TopicPublisher} instance that is created and managed by a PooledConnection.
*/
public class PooledTopicPublisher extends PooledProducer implements TopicPublisher {
@ -33,22 +33,27 @@ public class PooledTopicPublisher extends PooledProducer implements TopicPublish
super(messageProducer, destination);
}
@Override
public Topic getTopic() throws JMSException {
return getTopicPublisher().getTopic();
}
@Override
public void publish(Message message) throws JMSException {
getTopicPublisher().publish((Topic) getDestination(), message);
}
@Override
public void publish(Message message, int i, int i1, long l) throws JMSException {
getTopicPublisher().publish((Topic) getDestination(), message, i, i1, l);
}
@Override
public void publish(Topic topic, Message message) throws JMSException {
getTopicPublisher().publish(topic, message);
}
@Override
public void publish(Topic topic, Message message, int i, int i1, long l) throws JMSException {
getTopicPublisher().publish(topic, message, i, i1, l);
}

View File

@ -17,13 +17,13 @@
package org.apache.activemq.pool;
/**
* A cache key for the session details
*
*
* A cache key for the session details used to locate PooledSession intances.
*/
public class SessionKey {
private boolean transacted;
private int ackMode;
private final boolean transacted;
private final int ackMode;
private int hash;
public SessionKey(boolean transacted, int ackMode) {

View File

@ -1,121 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.pool;
import javax.jms.JMSException;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.AlreadyClosedException;
import org.apache.activemq.util.JMSExceptionSupport;
import org.apache.commons.pool.ObjectPool;
import org.apache.commons.pool.PoolableObjectFactory;
/**
* Represents the session pool for a given JMS connection.
*
*
*/
public class SessionPool implements PoolableObjectFactory {
private ConnectionPool connectionPool;
private SessionKey key;
private ObjectPool sessionPool;
public SessionPool(ConnectionPool connectionPool, SessionKey key, ObjectPool sessionPool) {
this.connectionPool = connectionPool;
this.key = key;
this.sessionPool = sessionPool;
sessionPool.setFactory(this);
}
public void close() throws Exception {
if (sessionPool != null) {
sessionPool.close();
}
sessionPool = null;
}
public PooledSession borrowSession() throws JMSException {
try {
Object object = getSessionPool().borrowObject();
return (PooledSession)object;
} catch (JMSException e) {
throw e;
} catch (Exception e) {
throw JMSExceptionSupport.create(e);
}
}
public void returnSession(PooledSession session) throws JMSException {
// lets check if we are already closed
getConnection();
try {
connectionPool.onSessionReturned(session);
getSessionPool().returnObject(session);
} catch (Exception e) {
throw JMSExceptionSupport.create("Failed to return session to pool: " + e, e);
}
}
public void invalidateSession(PooledSession session) throws JMSException {
try {
connectionPool.onSessionInvalidated(session);
getSessionPool().invalidateObject(session);
} catch (Exception e) {
throw JMSExceptionSupport.create("Failed to invalidate session: " + e, e);
}
}
// PoolableObjectFactory methods
// -------------------------------------------------------------------------
public Object makeObject() throws Exception {
return new PooledSession(createSession(), this);
}
public void destroyObject(Object o) throws Exception {
PooledSession session = (PooledSession)o;
session.getInternalSession().close();
}
public boolean validateObject(Object o) {
return true;
}
public void activateObject(Object o) throws Exception {
}
public void passivateObject(Object o) throws Exception {
}
// Implemention methods
// -------------------------------------------------------------------------
protected ObjectPool getSessionPool() throws AlreadyClosedException {
if (sessionPool == null) {
throw new AlreadyClosedException();
}
return sessionPool;
}
protected ActiveMQConnection getConnection() throws JMSException {
return connectionPool.getConnection();
}
protected ActiveMQSession createSession() throws JMSException {
return (ActiveMQSession)getConnection().createSession(key.isTransacted(), key.getAckMode());
}
}

View File

@ -16,9 +16,6 @@
*/
package org.apache.activemq.pool;
import org.apache.activemq.ActiveMQConnection;
import org.apache.commons.pool.ObjectPoolFactory;
import javax.jms.JMSException;
import javax.jms.Session;
import javax.transaction.RollbackException;
@ -27,21 +24,32 @@ import javax.transaction.SystemException;
import javax.transaction.TransactionManager;
import javax.transaction.xa.XAResource;
import org.apache.activemq.ActiveMQConnection;
import org.apache.commons.pool.ObjectPoolFactory;
/**
* An XA-aware connection pool. When a session is created and an xa transaction is active,
* the session will automatically be enlisted in the current transaction.
*
*
* @author gnodet
*/
public class XaConnectionPool extends ConnectionPool {
private TransactionManager transactionManager;
public XaConnectionPool(ActiveMQConnection connection, ObjectPoolFactory poolFactory, TransactionManager transactionManager) {
super(connection, poolFactory);
public XaConnectionPool(ActiveMQConnection connection, TransactionManager transactionManager) {
super(connection);
this.transactionManager = transactionManager;
}
/**
* @deprecated
*/
public XaConnectionPool(ActiveMQConnection connection, ObjectPoolFactory poolFactory, TransactionManager transactionManager) {
this(connection, transactionManager);
}
@Override
public Session createSession(boolean transacted, int ackMode) throws JMSException {
try {
boolean isXa = (transactionManager != null && transactionManager.getStatus() != Status.STATUS_NO_TRANSACTION);
@ -74,8 +82,7 @@ public class XaConnectionPool extends ConnectionPool {
protected XAResource createXaResource(PooledSession session) throws JMSException {
return session.getXAResource();
}
protected class Synchronization implements javax.transaction.Synchronization {
private final PooledSession session;
@ -85,7 +92,7 @@ public class XaConnectionPool extends ConnectionPool {
public void beforeCompletion() {
}
public void afterCompletion(int status) {
try {
// This will return session to the pool.
@ -99,5 +106,4 @@ public class XaConnectionPool extends ConnectionPool {
}
}
}
}

View File

@ -28,7 +28,7 @@ import org.apache.activemq.ActiveMQConnectionFactory;
public class XaPooledConnectionFactory extends PooledConnectionFactory {
private TransactionManager transactionManager;
public XaPooledConnectionFactory() {
super();
}
@ -50,7 +50,6 @@ public class XaPooledConnectionFactory extends PooledConnectionFactory {
}
protected ConnectionPool createConnectionPool(ActiveMQConnection connection) {
return new XaConnectionPool(connection, getPoolFactory(), getTransactionManager());
return new XaConnectionPool(connection, getTransactionManager());
}
}

View File

@ -16,60 +16,188 @@
*/
package org.apache.activemq.pool;
import junit.framework.Assert;
import junit.framework.Test;
import junit.framework.TestCase;
import junit.framework.TestSuite;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.log4j.Logger;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Session;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import junit.framework.Assert;
import junit.framework.Test;
import junit.framework.TestCase;
import junit.framework.TestSuite;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.util.Wait;
import org.apache.log4j.Logger;
/**
* Checks the behavior of the PooledConnectionFactory when the maximum amount
* of sessions is being reached.
* Older versions simply block in the call to Connection.getSession(), which isn't good.
* An exception being returned is the better option, so JMS clients don't block.
* This test succeeds if an exception is returned and fails if the call to getSession()
* blocks.
* Checks the behavior of the PooledConnectionFactory when the maximum amount of
* sessions is being reached.
*
* Older versions simply block in the call to Connection.getSession(), which
* isn't good. An exception being returned is the better option, so JMS clients
* don't block. This test succeeds if an exception is returned and fails if the
* call to getSession() blocks.
*/
public class PooledConnectionFactoryTest extends TestCase
{
public final static Logger LOG = Logger.getLogger(PooledConnectionFactoryTest.class);
public class PooledConnectionFactoryTest extends TestCase {
public final static Logger LOG = Logger.getLogger(PooledConnectionFactoryTest.class);
/**
* Create the test case
*
* @param testName name of the test case
* @param testName
* name of the test case
*/
public PooledConnectionFactoryTest( String testName )
{
super( testName );
public PooledConnectionFactoryTest(String testName) {
super(testName);
}
/**
* @return the suite of tests being tested
*/
public static Test suite()
{
return new TestSuite( PooledConnectionFactoryTest.class );
public static Test suite() {
return new TestSuite(PooledConnectionFactoryTest.class);
}
public void testClearAllConnections() throws Exception {
ActiveMQConnectionFactory amq = new ActiveMQConnectionFactory("vm://broker1?marshal=false&broker.persistent=false");
PooledConnectionFactory cf = new PooledConnectionFactory(amq);
cf.setMaxConnections(3);
PooledConnection conn1 = (PooledConnection) cf.createConnection();
PooledConnection conn2 = (PooledConnection) cf.createConnection();
PooledConnection conn3 = (PooledConnection) cf.createConnection();
assertNotSame(conn1.getConnection(), conn2.getConnection());
assertNotSame(conn1.getConnection(), conn3.getConnection());
assertNotSame(conn2.getConnection(), conn3.getConnection());
assertEquals(3, cf.getNumConnections());
cf.clear();
assertEquals(0, cf.getNumConnections());
conn1 = (PooledConnection) cf.createConnection();
conn2 = (PooledConnection) cf.createConnection();
conn3 = (PooledConnection) cf.createConnection();
assertNotSame(conn1.getConnection(), conn2.getConnection());
assertNotSame(conn1.getConnection(), conn3.getConnection());
assertNotSame(conn2.getConnection(), conn3.getConnection());
}
public void testMaxConnectionsAreCreated() throws Exception {
ActiveMQConnectionFactory amq = new ActiveMQConnectionFactory("vm://broker1?marshal=false&broker.persistent=false");
PooledConnectionFactory cf = new PooledConnectionFactory(amq);
cf.setMaxConnections(3);
PooledConnection conn1 = (PooledConnection) cf.createConnection();
PooledConnection conn2 = (PooledConnection) cf.createConnection();
PooledConnection conn3 = (PooledConnection) cf.createConnection();
assertNotSame(conn1.getConnection(), conn2.getConnection());
assertNotSame(conn1.getConnection(), conn3.getConnection());
assertNotSame(conn2.getConnection(), conn3.getConnection());
assertEquals(3, cf.getNumConnections());
}
public void testConnectionsAreRotated() throws Exception {
ActiveMQConnectionFactory amq = new ActiveMQConnectionFactory("vm://broker1?marshal=false&broker.persistent=false");
PooledConnectionFactory cf = new PooledConnectionFactory(amq);
cf.setMaxConnections(10);
ActiveMQConnection previous = null;
// Front load the pool.
for (int i = 0; i < 10; ++i) {
cf.createConnection();
}
for (int i = 0; i < 100; ++i) {
ActiveMQConnection current = ((PooledConnection) cf.createConnection()).getConnection();
assertNotSame(previous, current);
previous = current;
}
}
public void testConnectionsArePooled() throws Exception {
ActiveMQConnectionFactory amq = new ActiveMQConnectionFactory("vm://broker1?marshal=false&broker.persistent=false");
PooledConnectionFactory cf = new PooledConnectionFactory(amq);
cf.setMaxConnections(1);
PooledConnection conn1 = (PooledConnection) cf.createConnection();
PooledConnection conn2 = (PooledConnection) cf.createConnection();
PooledConnection conn3 = (PooledConnection) cf.createConnection();
assertSame(conn1.getConnection(), conn2.getConnection());
assertSame(conn1.getConnection(), conn3.getConnection());
assertSame(conn2.getConnection(), conn3.getConnection());
assertEquals(1, cf.getNumConnections());
}
public void testConnectionsArePooledAsyncCreate() throws Exception {
final ActiveMQConnectionFactory amq = new ActiveMQConnectionFactory("vm://broker1?marshal=false&broker.persistent=false");
final PooledConnectionFactory cf = new PooledConnectionFactory(amq);
cf.setMaxConnections(1);
final ConcurrentLinkedQueue<PooledConnection> connections = new ConcurrentLinkedQueue<PooledConnection>();
final PooledConnection primary = (PooledConnection) cf.createConnection();
final ExecutorService executor = Executors.newFixedThreadPool(10);
final int numConnections = 100;
for (int i = 0; i < numConnections; ++i) {
executor.execute(new Runnable() {
@Override
public void run() {
try {
connections.add((PooledConnection) cf.createConnection());
} catch (JMSException e) {
}
}
});
}
assertTrue("", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return connections.size() == numConnections;
}
}));
executor.shutdown();
assertTrue(executor.awaitTermination(5, TimeUnit.SECONDS));
for(PooledConnection connection : connections) {
assertSame(primary.getConnection(), connection.getConnection());
}
connections.clear();
}
/**
* Tests the behavior of the sessionPool of the PooledConnectionFactory
* when maximum number of sessions are reached.
* Tests the behavior of the sessionPool of the PooledConnectionFactory when
* maximum number of sessions are reached.
*/
public void testApp() throws Exception
{
public void testApp() throws Exception {
// using separate thread for testing so that we can interrupt the test
// if the call to get a new session blocks.
@ -78,74 +206,74 @@ public class PooledConnectionFactoryTest extends TestCase
Future<Boolean> result = (Future<Boolean>) executor.submit(new TestRunner());
// test should not take > 5secs, so test fails i
Thread.sleep(5*1000);
Thread.sleep(5 * 1000);
if (!result.isDone() || !result.get().booleanValue()) {
PooledConnectionFactoryTest.LOG.error("2nd call to createSession()" +
" is blocking but should have returned an error instead.");
PooledConnectionFactoryTest.LOG.error("2nd call to createSession()" + " is blocking but should have returned an error instead.");
executor.shutdownNow();
Assert.fail("SessionPool inside PooledConnectionFactory is blocking if " +
"limit is exceeded but should return an exception instead.");
Assert.fail("SessionPool inside PooledConnectionFactory is blocking if " + "limit is exceeded but should return an exception instead.");
}
}
}
class TestRunner implements Callable<Boolean> {
static class TestRunner implements Callable<Boolean> {
public final static Logger LOG = Logger.getLogger(TestRunner.class);
public final static Logger LOG = Logger.getLogger(TestRunner.class);
/**
* @return true if test succeeded, false otherwise
*/
public Boolean call() {
/**
* @return true if test succeeded, false otherwise
*/
public Boolean call() {
Connection conn = null;
Session one = null;
Connection conn = null;
Session one = null;
// wait at most 5 seconds for the call to createSession
try {
ActiveMQConnectionFactory amq = new ActiveMQConnectionFactory("vm://broker1?marshal=false&broker.persistent=false");
PooledConnectionFactory cf = new PooledConnectionFactory(amq);
cf.setMaxConnections(3);
cf.setMaximumActiveSessionPerConnection(1);
cf.setBlockIfSessionPoolIsFull(false);
conn = cf.createConnection();
one = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
Session two = null;
// wait at most 5 seconds for the call to createSession
try {
// this should raise an exception as we called setMaximumActive(1)
two = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
two.close();
ActiveMQConnectionFactory amq = new ActiveMQConnectionFactory("vm://broker1?marshal=false&broker.persistent=false");
PooledConnectionFactory cf = new PooledConnectionFactory(amq);
cf.setMaxConnections(3);
cf.setMaximumActiveSessionPerConnection(1);
cf.setBlockIfSessionPoolIsFull(false);
LOG.error("Expected JMSException wasn't thrown.");
Assert.fail("seconds call to Connection.createSession() was supposed" +
"to raise an JMSException as internal session pool" +
"is exhausted. This did not happen and indiates a problem");
return new Boolean(false);
} catch (JMSException ex) {
if (ex.getCause().getClass() == java.util.NoSuchElementException.class) {
//expected, ignore but log
LOG.info("Caught expected " + ex);
} else {
LOG.error(ex);
conn = cf.createConnection();
one = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
Session two = null;
try {
// this should raise an exception as we called
// setMaximumActive(1)
two = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
two.close();
LOG.error("Expected JMSException wasn't thrown.");
Assert.fail("seconds call to Connection.createSession() was supposed" + "to raise an JMSException as internal session pool"
+ "is exhausted. This did not happen and indiates a problem");
return new Boolean(false);
} catch (JMSException ex) {
if (ex.getCause().getClass() == java.util.NoSuchElementException.class) {
// expected, ignore but log
LOG.info("Caught expected " + ex);
} else {
LOG.error(ex);
return new Boolean(false);
}
} finally {
if (one != null)
one.close();
if (conn != null)
conn.close();
}
} finally {
if (one != null)
one.close();
if (conn != null)
conn.close();
} catch (Exception ex) {
LOG.error(ex.getMessage());
return new Boolean(false);
}
} catch (Exception ex) {
LOG.error(ex.getMessage());
return new Boolean(false);
}
// all good, test succeeded
return new Boolean(true);
// all good, test succeeded
return new Boolean(true);
}
}
}

View File

@ -0,0 +1,74 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.pool;
import static org.junit.Assert.assertEquals;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.log4j.Logger;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
public class PooledSessionTest {
private Logger LOG = Logger.getLogger(getClass());
private BrokerService broker;
private ActiveMQConnectionFactory factory;
private PooledConnectionFactory pooledFactory;
private String connectionUri;
@Before
public void setUp() throws Exception {
broker = new BrokerService();
broker.setPersistent(false);
broker.setUseJmx(false);
TransportConnector connector = broker.addConnector("tcp://localhost:0");
broker.start();
connectionUri = connector.getPublishableConnectString();
factory = new ActiveMQConnectionFactory(connectionUri);
pooledFactory = new PooledConnectionFactory(factory);
pooledFactory.setMaxConnections(1);
pooledFactory.setBlockIfSessionPoolIsFull(false);
}
@After
public void tearDown() throws Exception {
broker.stop();
broker.waitUntilStopped();
broker = null;
}
@Test
public void testPooledSessionStats() throws Exception {
PooledConnection connection = (PooledConnection) pooledFactory.createConnection();
assertEquals(0, connection.getNumActiveSessions());
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
assertEquals(1, connection.getNumActiveSessions());
session.close();
assertEquals(0, connection.getNumActiveSessions());
assertEquals(1, connection.getNumtIdleSessions());
assertEquals(1, connection.getNumSessions());
}
}

View File

@ -35,7 +35,7 @@ import org.apache.activemq.test.TestSupport;
import org.apache.activemq.util.SocketProxy;
/**
*
*
*/
public class PooledTopicPublisherTest extends TestSupport {
@ -52,7 +52,7 @@ public class PooledTopicPublisherTest extends TestSupport {
publisher.publish(session.createMessage());
}
public void testSetGetExceptionListener() throws Exception {
PooledConnectionFactory pcf = new PooledConnectionFactory();
pcf.setConnectionFactory(new ActiveMQConnectionFactory("vm://test"));
@ -65,37 +65,37 @@ public class PooledTopicPublisherTest extends TestSupport {
connection.setExceptionListener(listener);
assertEquals(listener, connection.getExceptionListener());
}
public void testPooledConnectionAfterInactivity() throws Exception {
BrokerService broker = new BrokerService();
TransportConnector networkConnector = broker.addConnector("tcp://localhost:0");
broker.setPersistent(false);
broker.setUseJmx(false);
broker.start();
SocketProxy proxy = new SocketProxy(networkConnector.getConnectUri());
PooledConnectionFactory pcf = new PooledConnectionFactory();
String uri = proxy.getUrl().toString() + "?trace=true&wireFormat.maxInactivityDuration=500&wireFormat.maxInactivityDurationInitalDelay=500";
pcf.setConnectionFactory(new ActiveMQConnectionFactory(uri));
PooledConnection conn = (PooledConnection) pcf.createConnection();
ActiveMQConnection amq = conn.getConnection();
assertNotNull(amq);
final CountDownLatch gotException = new CountDownLatch(1);
//amq.set
conn.setExceptionListener(new ExceptionListener() {
public void onException(JMSException exception) {
gotException.countDown();
}});
conn.setClientID(getName());
// let it hang, simulate a server hang so inactivity timeout kicks in
proxy.pause();
//assertTrue("got an exception", gotException.await(5, TimeUnit.SECONDS));
TimeUnit.SECONDS.sleep(2);
conn.close();
}
@Override
protected void tearDown() throws Exception {
if (connection != null) {