diff --git a/activemq-core/src/main/java/org/apache/activemq/pool/ConnectionPool.java b/activemq-core/src/main/java/org/apache/activemq/pool/ConnectionPool.java index d53e68a78f..d20be6eef6 100644 --- a/activemq-core/src/main/java/org/apache/activemq/pool/ConnectionPool.java +++ b/activemq-core/src/main/java/org/apache/activemq/pool/ConnectionPool.java @@ -24,6 +24,11 @@ import java.util.Map; import javax.jms.JMSException; import javax.jms.Session; +import javax.transaction.RollbackException; +import javax.transaction.Status; +import javax.transaction.SystemException; +import javax.transaction.TransactionManager; +import javax.transaction.xa.XAResource; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.transport.TransportListener; @@ -38,17 +43,18 @@ import java.util.concurrent.atomic.AtomicBoolean; */ public class ConnectionPool { + private TransactionManager transactionManager; private ActiveMQConnection connection; private Map cache; private AtomicBoolean started = new AtomicBoolean(false); private int referenceCount; private ObjectPoolFactory poolFactory; - private long lastUsed; + private long lastUsed = System.currentTimeMillis(); private boolean hasFailed; private int idleTimeout = 30*1000; - public ConnectionPool(ActiveMQConnection connection, ObjectPoolFactory poolFactory) { - this(connection, new HashMap(), poolFactory); + public ConnectionPool(ActiveMQConnection connection, ObjectPoolFactory poolFactory, TransactionManager transactionManager) { + this(connection, new HashMap(), poolFactory, transactionManager); // Add a transport Listener so that we can notice if this connection should be expired due to // a connection failure. connection.addTransportListener(new TransportListener(){ @@ -66,10 +72,11 @@ public class ConnectionPool { }); } - public ConnectionPool(ActiveMQConnection connection, Map cache, ObjectPoolFactory poolFactory) { + public ConnectionPool(ActiveMQConnection connection, Map cache, ObjectPoolFactory poolFactory, TransactionManager transactionManager) { this.connection = connection; this.cache = cache; this.poolFactory = poolFactory; + this.transactionManager = transactionManager; } public void start() throws JMSException { @@ -83,13 +90,35 @@ public class ConnectionPool { } public Session createSession(boolean transacted, int ackMode) throws JMSException { - SessionKey key = new SessionKey(transacted, ackMode); - SessionPool pool = (SessionPool) cache.get(key); - if (pool == null) { - pool = new SessionPool(this, key, poolFactory.createPool()); - cache.put(key, pool); + try { + boolean isXa = (transactionManager != null && transactionManager.getStatus() != Status.STATUS_NO_TRANSACTION); + if (isXa) { + transacted = true; + ackMode = Session.SESSION_TRANSACTED; + } + SessionKey key = new SessionKey(transacted, ackMode); + SessionPool pool = (SessionPool) cache.get(key); + if (pool == null) { + pool = new SessionPool(this, key, poolFactory.createPool()); + cache.put(key, pool); + } + PooledSession session = pool.borrowSession(); + if (isXa) { + session.setIgnoreClose(true); + transactionManager.getTransaction().registerSynchronization(new Synchronization(session)); + incrementReferenceCount(); + transactionManager.getTransaction().enlistResource(createXaResource(session)); + } + return session; + } catch (RollbackException e) { + final JMSException jmsException = new JMSException("Rollback Exception"); + jmsException.initCause(e); + throw jmsException; + } catch (SystemException e) { + final JMSException jmsException = new JMSException("System Exception"); + jmsException.initCause(e); + throw jmsException; } - return pool.borrowSession(); } synchronized public void close() { @@ -117,8 +146,8 @@ public class ConnectionPool { synchronized public void decrementReferenceCount() { referenceCount--; + lastUsed = System.currentTimeMillis(); if( referenceCount == 0 ) { - lastUsed = System.currentTimeMillis(); expiredCheck(); } } @@ -129,7 +158,8 @@ public class ConnectionPool { synchronized public boolean expiredCheck() { if( connection == null ) return true; - if( hasFailed || idleTimeout> 0 && System.currentTimeMillis() > lastUsed+idleTimeout ) { + long t = System.currentTimeMillis(); + if( hasFailed || idleTimeout> 0 && t > lastUsed+idleTimeout ) { if( referenceCount == 0 ) { close(); } @@ -145,5 +175,31 @@ public class ConnectionPool { public void setIdleTimeout(int idleTimeout) { this.idleTimeout = idleTimeout; } + + protected XAResource createXaResource(PooledSession session) throws JMSException { + return session.getSession().getTransactionContext(); + } + protected class Synchronization implements javax.transaction.Synchronization { + private final PooledSession session; + + private Synchronization(PooledSession session) { + this.session = session; + } + + public void beforeCompletion() { + } + + public void afterCompletion(int status) { + try { + // This will return session to the pool. + session.setIgnoreClose(false); + session.close(); + decrementReferenceCount(); + } catch (JMSException e) { + throw new RuntimeException(e); + } + } + } + } diff --git a/activemq-core/src/main/java/org/apache/activemq/pool/PooledConnectionFactory.java b/activemq-core/src/main/java/org/apache/activemq/pool/PooledConnectionFactory.java index f53bda2f7b..cdb18e059b 100644 --- a/activemq-core/src/main/java/org/apache/activemq/pool/PooledConnectionFactory.java +++ b/activemq-core/src/main/java/org/apache/activemq/pool/PooledConnectionFactory.java @@ -19,11 +19,13 @@ package org.apache.activemq.pool; import java.util.HashMap; import java.util.Iterator; +import java.util.LinkedList; import java.util.Map; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.JMSException; +import javax.transaction.TransactionManager; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; @@ -45,10 +47,12 @@ import org.apache.commons.pool.impl.GenericObjectPoolFactory; * @version $Revision: 1.1 $ */ public class PooledConnectionFactory implements ConnectionFactory, Service { - private ActiveMQConnectionFactory connectionFactory; + private ConnectionFactory connectionFactory; private Map cache = new HashMap(); private ObjectPoolFactory poolFactory; - private int maximumActive = 5000; + private int maximumActive = 500; + private int maxConnections = 1; + private TransactionManager transactionManager; public PooledConnectionFactory() { this(new ActiveMQConnectionFactory()); @@ -62,21 +66,39 @@ public class PooledConnectionFactory implements ConnectionFactory, Service { this.connectionFactory = connectionFactory; } - public ActiveMQConnectionFactory getConnectionFactory() { + public ConnectionFactory getConnectionFactory() { return connectionFactory; } - public void setConnectionFactory(ActiveMQConnectionFactory connectionFactory) { + public void setConnectionFactory(ConnectionFactory connectionFactory) { this.connectionFactory = connectionFactory; } + public TransactionManager getTransactionManager() { + return transactionManager; + } + + public void setTransactionManager(TransactionManager transactionManager) { + this.transactionManager = transactionManager; + } + public Connection createConnection() throws JMSException { return createConnection(null, null); } public synchronized Connection createConnection(String userName, String password) throws JMSException { ConnectionKey key = new ConnectionKey(userName, password); - ConnectionPool connection = (ConnectionPool) cache.get(key); + LinkedList pools = (LinkedList) cache.get(key); + + if (pools == null) { + pools = new LinkedList(); + cache.put(key, pools); + } + + ConnectionPool connection = null; + if (pools.size() == maxConnections) { + connection = (ConnectionPool) pools.removeFirst(); + } // Now.. we might get a connection, but it might be that we need to // dump it.. @@ -86,11 +108,15 @@ public class PooledConnectionFactory implements ConnectionFactory, Service { if (connection == null) { ActiveMQConnection delegate = createConnection(key); - connection = new ConnectionPool(delegate, getPoolFactory()); - cache.put(key, connection); + connection = createConnectionPool(delegate); } + pools.add(connection); return new PooledConnection(connection); } + + protected ConnectionPool createConnectionPool(ActiveMQConnection connection) { + return new ConnectionPool(connection, getPoolFactory(), transactionManager); + } protected ActiveMQConnection createConnection(ConnectionKey key) throws JMSException { if (key.getUserName() == null && key.getPassword() == null) { @@ -146,6 +172,20 @@ public class PooledConnectionFactory implements ConnectionFactory, Service { this.maximumActive = maximumActive; } + /** + * @return the maxConnections + */ + public int getMaxConnections() { + return maxConnections; + } + + /** + * @param maxConnections the maxConnections to set + */ + public void setMaxConnections(int maxConnections) { + this.maxConnections = maxConnections; + } + protected ObjectPoolFactory createPoolFactory() { return new GenericObjectPoolFactory(null, maximumActive); } diff --git a/activemq-core/src/main/java/org/apache/activemq/pool/PooledSession.java b/activemq-core/src/main/java/org/apache/activemq/pool/PooledSession.java index 33828a8bd8..5c0a9ce30e 100644 --- a/activemq-core/src/main/java/org/apache/activemq/pool/PooledSession.java +++ b/activemq-core/src/main/java/org/apache/activemq/pool/PooledSession.java @@ -65,57 +65,68 @@ public class PooledSession implements TopicSession, QueueSession { private ActiveMQQueueSender queueSender; private ActiveMQTopicPublisher topicPublisher; private boolean transactional = true; + private boolean ignoreClose = false; private final CopyOnWriteArrayList consumers = new CopyOnWriteArrayList(); private final CopyOnWriteArrayList browsers = new CopyOnWriteArrayList(); + public PooledSession(ActiveMQSession aSession, SessionPool sessionPool) { this.session = aSession; this.sessionPool = sessionPool; this.transactional = session.isTransacted(); } + protected boolean isIgnoreClose() { + return ignoreClose; + } + + protected void setIgnoreClose(boolean ignoreClose) { + this.ignoreClose = ignoreClose; + } public void close() throws JMSException { - // TODO a cleaner way to reset?? - - // lets reset the session - getSession().setMessageListener(null); - - // Close any consumers and browsers that may have been created. - for (Iterator iter = consumers.iterator(); iter.hasNext();) { - MessageConsumer consumer = (MessageConsumer) iter.next(); - consumer.close(); - } - consumers.clear(); - - for (Iterator iter = browsers.iterator(); iter.hasNext();) { - QueueBrowser browser = (QueueBrowser) iter.next(); - browser.close(); - } - browsers.clear(); - - // maybe do a rollback? - if (transactional) { - try { - getSession().rollback(); + if (!ignoreClose) { + // TODO a cleaner way to reset?? + + // lets reset the session + getSession().setMessageListener(null); + + // Close any consumers and browsers that may have been created. + for (Iterator iter = consumers.iterator(); iter.hasNext();) { + MessageConsumer consumer = (MessageConsumer) iter.next(); + consumer.close(); } - catch (JMSException e) { - log.warn("Caught exception trying rollback() when putting session back into the pool: " + e, e); - - // lets close the session and not put the session back into the pool + consumers.clear(); + + for (Iterator iter = browsers.iterator(); iter.hasNext();) { + QueueBrowser browser = (QueueBrowser) iter.next(); + browser.close(); + } + browsers.clear(); + + // maybe do a rollback? + if (transactional) { try { - session.close(); + getSession().rollback(); } - catch (JMSException e1) { - log.trace("Ignoring exception as discarding session: " + e1, e1); + catch (JMSException e) { + log.warn("Caught exception trying rollback() when putting session back into the pool: " + e, e); + + // lets close the session and not put the session back into the pool + try { + session.close(); + } + catch (JMSException e1) { + log.trace("Ignoring exception as discarding session: " + e1, e1); + } + session = null; + return; } - session = null; - return; } + + sessionPool.returnSession(this); } - - sessionPool.returnSession(this); } public void commit() throws JMSException {