AMQ-1084: Allow o.a.a.pool to support XA transactions

Also enhance the pooled ConnectionFactory to reuse several connections.
Fix the lastUsed timestamp on the ConnectionPool to avoid discarding reusable connections.

git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@481742 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Guillaume Nodet 2006-12-03 09:24:55 +00:00
parent 4554b5d22f
commit 870401217a
3 changed files with 159 additions and 52 deletions

View File

@ -24,6 +24,11 @@ import java.util.Map;
import javax.jms.JMSException;
import javax.jms.Session;
import javax.transaction.RollbackException;
import javax.transaction.Status;
import javax.transaction.SystemException;
import javax.transaction.TransactionManager;
import javax.transaction.xa.XAResource;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.transport.TransportListener;
@ -38,17 +43,18 @@ import java.util.concurrent.atomic.AtomicBoolean;
*/
public class ConnectionPool {
private TransactionManager transactionManager;
private ActiveMQConnection connection;
private Map cache;
private AtomicBoolean started = new AtomicBoolean(false);
private int referenceCount;
private ObjectPoolFactory poolFactory;
private long lastUsed;
private long lastUsed = System.currentTimeMillis();
private boolean hasFailed;
private int idleTimeout = 30*1000;
public ConnectionPool(ActiveMQConnection connection, ObjectPoolFactory poolFactory) {
this(connection, new HashMap(), poolFactory);
public ConnectionPool(ActiveMQConnection connection, ObjectPoolFactory poolFactory, TransactionManager transactionManager) {
this(connection, new HashMap(), poolFactory, transactionManager);
// Add a transport Listener so that we can notice if this connection should be expired due to
// a connection failure.
connection.addTransportListener(new TransportListener(){
@ -66,10 +72,11 @@ public class ConnectionPool {
});
}
public ConnectionPool(ActiveMQConnection connection, Map cache, ObjectPoolFactory poolFactory) {
public ConnectionPool(ActiveMQConnection connection, Map cache, ObjectPoolFactory poolFactory, TransactionManager transactionManager) {
this.connection = connection;
this.cache = cache;
this.poolFactory = poolFactory;
this.transactionManager = transactionManager;
}
public void start() throws JMSException {
@ -83,13 +90,35 @@ public class ConnectionPool {
}
public Session createSession(boolean transacted, int ackMode) throws JMSException {
SessionKey key = new SessionKey(transacted, ackMode);
SessionPool pool = (SessionPool) cache.get(key);
if (pool == null) {
pool = new SessionPool(this, key, poolFactory.createPool());
cache.put(key, pool);
try {
boolean isXa = (transactionManager != null && transactionManager.getStatus() != Status.STATUS_NO_TRANSACTION);
if (isXa) {
transacted = true;
ackMode = Session.SESSION_TRANSACTED;
}
SessionKey key = new SessionKey(transacted, ackMode);
SessionPool pool = (SessionPool) cache.get(key);
if (pool == null) {
pool = new SessionPool(this, key, poolFactory.createPool());
cache.put(key, pool);
}
PooledSession session = pool.borrowSession();
if (isXa) {
session.setIgnoreClose(true);
transactionManager.getTransaction().registerSynchronization(new Synchronization(session));
incrementReferenceCount();
transactionManager.getTransaction().enlistResource(createXaResource(session));
}
return session;
} catch (RollbackException e) {
final JMSException jmsException = new JMSException("Rollback Exception");
jmsException.initCause(e);
throw jmsException;
} catch (SystemException e) {
final JMSException jmsException = new JMSException("System Exception");
jmsException.initCause(e);
throw jmsException;
}
return pool.borrowSession();
}
synchronized public void close() {
@ -117,8 +146,8 @@ public class ConnectionPool {
synchronized public void decrementReferenceCount() {
referenceCount--;
lastUsed = System.currentTimeMillis();
if( referenceCount == 0 ) {
lastUsed = System.currentTimeMillis();
expiredCheck();
}
}
@ -129,7 +158,8 @@ public class ConnectionPool {
synchronized public boolean expiredCheck() {
if( connection == null )
return true;
if( hasFailed || idleTimeout> 0 && System.currentTimeMillis() > lastUsed+idleTimeout ) {
long t = System.currentTimeMillis();
if( hasFailed || idleTimeout> 0 && t > lastUsed+idleTimeout ) {
if( referenceCount == 0 ) {
close();
}
@ -145,5 +175,31 @@ public class ConnectionPool {
public void setIdleTimeout(int idleTimeout) {
this.idleTimeout = idleTimeout;
}
protected XAResource createXaResource(PooledSession session) throws JMSException {
return session.getSession().getTransactionContext();
}
protected class Synchronization implements javax.transaction.Synchronization {
private final PooledSession session;
private Synchronization(PooledSession session) {
this.session = session;
}
public void beforeCompletion() {
}
public void afterCompletion(int status) {
try {
// This will return session to the pool.
session.setIgnoreClose(false);
session.close();
decrementReferenceCount();
} catch (JMSException e) {
throw new RuntimeException(e);
}
}
}
}

View File

@ -19,11 +19,13 @@ package org.apache.activemq.pool;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.transaction.TransactionManager;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
@ -45,10 +47,12 @@ import org.apache.commons.pool.impl.GenericObjectPoolFactory;
* @version $Revision: 1.1 $
*/
public class PooledConnectionFactory implements ConnectionFactory, Service {
private ActiveMQConnectionFactory connectionFactory;
private ConnectionFactory connectionFactory;
private Map cache = new HashMap();
private ObjectPoolFactory poolFactory;
private int maximumActive = 5000;
private int maximumActive = 500;
private int maxConnections = 1;
private TransactionManager transactionManager;
public PooledConnectionFactory() {
this(new ActiveMQConnectionFactory());
@ -62,21 +66,39 @@ public class PooledConnectionFactory implements ConnectionFactory, Service {
this.connectionFactory = connectionFactory;
}
public ActiveMQConnectionFactory getConnectionFactory() {
public ConnectionFactory getConnectionFactory() {
return connectionFactory;
}
public void setConnectionFactory(ActiveMQConnectionFactory connectionFactory) {
public void setConnectionFactory(ConnectionFactory connectionFactory) {
this.connectionFactory = connectionFactory;
}
public TransactionManager getTransactionManager() {
return transactionManager;
}
public void setTransactionManager(TransactionManager transactionManager) {
this.transactionManager = transactionManager;
}
public Connection createConnection() throws JMSException {
return createConnection(null, null);
}
public synchronized Connection createConnection(String userName, String password) throws JMSException {
ConnectionKey key = new ConnectionKey(userName, password);
ConnectionPool connection = (ConnectionPool) cache.get(key);
LinkedList pools = (LinkedList) cache.get(key);
if (pools == null) {
pools = new LinkedList();
cache.put(key, pools);
}
ConnectionPool connection = null;
if (pools.size() == maxConnections) {
connection = (ConnectionPool) pools.removeFirst();
}
// Now.. we might get a connection, but it might be that we need to
// dump it..
@ -86,11 +108,15 @@ public class PooledConnectionFactory implements ConnectionFactory, Service {
if (connection == null) {
ActiveMQConnection delegate = createConnection(key);
connection = new ConnectionPool(delegate, getPoolFactory());
cache.put(key, connection);
connection = createConnectionPool(delegate);
}
pools.add(connection);
return new PooledConnection(connection);
}
protected ConnectionPool createConnectionPool(ActiveMQConnection connection) {
return new ConnectionPool(connection, getPoolFactory(), transactionManager);
}
protected ActiveMQConnection createConnection(ConnectionKey key) throws JMSException {
if (key.getUserName() == null && key.getPassword() == null) {
@ -146,6 +172,20 @@ public class PooledConnectionFactory implements ConnectionFactory, Service {
this.maximumActive = maximumActive;
}
/**
* @return the maxConnections
*/
public int getMaxConnections() {
return maxConnections;
}
/**
* @param maxConnections the maxConnections to set
*/
public void setMaxConnections(int maxConnections) {
this.maxConnections = maxConnections;
}
protected ObjectPoolFactory createPoolFactory() {
return new GenericObjectPoolFactory(null, maximumActive);
}

View File

@ -65,57 +65,68 @@ public class PooledSession implements TopicSession, QueueSession {
private ActiveMQQueueSender queueSender;
private ActiveMQTopicPublisher topicPublisher;
private boolean transactional = true;
private boolean ignoreClose = false;
private final CopyOnWriteArrayList consumers = new CopyOnWriteArrayList();
private final CopyOnWriteArrayList browsers = new CopyOnWriteArrayList();
public PooledSession(ActiveMQSession aSession, SessionPool sessionPool) {
this.session = aSession;
this.sessionPool = sessionPool;
this.transactional = session.isTransacted();
}
protected boolean isIgnoreClose() {
return ignoreClose;
}
protected void setIgnoreClose(boolean ignoreClose) {
this.ignoreClose = ignoreClose;
}
public void close() throws JMSException {
// TODO a cleaner way to reset??
// lets reset the session
getSession().setMessageListener(null);
// Close any consumers and browsers that may have been created.
for (Iterator iter = consumers.iterator(); iter.hasNext();) {
MessageConsumer consumer = (MessageConsumer) iter.next();
consumer.close();
}
consumers.clear();
for (Iterator iter = browsers.iterator(); iter.hasNext();) {
QueueBrowser browser = (QueueBrowser) iter.next();
browser.close();
}
browsers.clear();
// maybe do a rollback?
if (transactional) {
try {
getSession().rollback();
if (!ignoreClose) {
// TODO a cleaner way to reset??
// lets reset the session
getSession().setMessageListener(null);
// Close any consumers and browsers that may have been created.
for (Iterator iter = consumers.iterator(); iter.hasNext();) {
MessageConsumer consumer = (MessageConsumer) iter.next();
consumer.close();
}
catch (JMSException e) {
log.warn("Caught exception trying rollback() when putting session back into the pool: " + e, e);
// lets close the session and not put the session back into the pool
consumers.clear();
for (Iterator iter = browsers.iterator(); iter.hasNext();) {
QueueBrowser browser = (QueueBrowser) iter.next();
browser.close();
}
browsers.clear();
// maybe do a rollback?
if (transactional) {
try {
session.close();
getSession().rollback();
}
catch (JMSException e1) {
log.trace("Ignoring exception as discarding session: " + e1, e1);
catch (JMSException e) {
log.warn("Caught exception trying rollback() when putting session back into the pool: " + e, e);
// lets close the session and not put the session back into the pool
try {
session.close();
}
catch (JMSException e1) {
log.trace("Ignoring exception as discarding session: " + e1, e1);
}
session = null;
return;
}
session = null;
return;
}
sessionPool.returnSession(this);
}
sessionPool.returnSession(this);
}
public void commit() throws JMSException {