diff --git a/activemq-pool/src/main/java/org/apache/activemq/pool/ConnectionPool.java b/activemq-pool/src/main/java/org/apache/activemq/pool/ConnectionPool.java index 5e1e1b4040..be935dd314 100644 --- a/activemq-pool/src/main/java/org/apache/activemq/pool/ConnectionPool.java +++ b/activemq-pool/src/main/java/org/apache/activemq/pool/ConnectionPool.java @@ -20,6 +20,7 @@ package org.apache.activemq.pool; import java.io.IOException; import java.util.Iterator; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicBoolean; import javax.jms.JMSException; @@ -38,6 +39,7 @@ public class ConnectionPool { private ActiveMQConnection connection; private ConcurrentHashMap cache; + private ConcurrentLinkedQueue loanedSessions = new ConcurrentLinkedQueue(); private AtomicBoolean started = new AtomicBoolean(false); private int referenceCount; private ObjectPoolFactory poolFactory; @@ -118,6 +120,7 @@ public class ConnectionPool { pool = cache.get(key); // this will return a non-null value... } PooledSession session = pool.borrowSession(); + this.loanedSessions.add(session); return session; } @@ -155,6 +158,14 @@ public class ConnectionPool { if (referenceCount == 0) { expiredCheck(); + for (PooledSession session : this.loanedSessions) { + try { + session.close(); + } catch (Exception e) { + } + } + this.loanedSessions.clear(); + // only clean up temp destinations when all users // of this connection have called close if (getConnection() != null) { @@ -208,4 +219,11 @@ public class ConnectionPool { return expiryTimeout; } + void onSessionReturned(PooledSession session) { + this.loanedSessions.remove(session); + } + + void onSessionInvalidated(PooledSession session) { + this.loanedSessions.remove(session); + } } diff --git a/activemq-pool/src/main/java/org/apache/activemq/pool/PooledSession.java b/activemq-pool/src/main/java/org/apache/activemq/pool/PooledSession.java index 7a7fd90369..bafaf16966 100644 --- a/activemq-pool/src/main/java/org/apache/activemq/pool/PooledSession.java +++ b/activemq-pool/src/main/java/org/apache/activemq/pool/PooledSession.java @@ -55,7 +55,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * + * */ public class PooledSession implements Session, TopicSession, QueueSession, XASession { private static final transient Logger LOG = LoggerFactory.getLogger(PooledSession.class); diff --git a/activemq-pool/src/main/java/org/apache/activemq/pool/SessionPool.java b/activemq-pool/src/main/java/org/apache/activemq/pool/SessionPool.java index 626056a2f2..b6baabfa1c 100644 --- a/activemq-pool/src/main/java/org/apache/activemq/pool/SessionPool.java +++ b/activemq-pool/src/main/java/org/apache/activemq/pool/SessionPool.java @@ -27,8 +27,8 @@ import org.apache.commons.pool.PoolableObjectFactory; /** * Represents the session pool for a given JMS connection. - * - * + * + * */ public class SessionPool implements PoolableObjectFactory { private ConnectionPool connectionPool; @@ -64,20 +64,21 @@ public class SessionPool implements PoolableObjectFactory { // lets check if we are already closed getConnection(); try { + connectionPool.onSessionReturned(session); getSessionPool().returnObject(session); } catch (Exception e) { throw JMSExceptionSupport.create("Failed to return session to pool: " + e, e); } } - + public void invalidateSession(PooledSession session) throws JMSException { try { + connectionPool.onSessionInvalidated(session); getSessionPool().invalidateObject(session); } catch (Exception e) { throw JMSExceptionSupport.create("Failed to invalidate session: " + e, e); } } - // PoolableObjectFactory methods // ------------------------------------------------------------------------- diff --git a/activemq-pool/src/test/java/org/apache/activemq/pool/PooledSessionExhaustionTest.java b/activemq-pool/src/test/java/org/apache/activemq/pool/PooledSessionExhaustionTest.java new file mode 100644 index 0000000000..725f436d46 --- /dev/null +++ b/activemq-pool/src/test/java/org/apache/activemq/pool/PooledSessionExhaustionTest.java @@ -0,0 +1,88 @@ +package org.apache.activemq.pool; + +import org.apache.activemq.*; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.TransportConnector; + +import junit.framework.*; +import javax.jms.*; +import javax.jms.Message; + +import org.apache.log4j.Logger; + +public class PooledSessionExhaustionTest extends TestCase { + private static final String QUEUE = "FOO"; + private static final int NUM_MESSAGES = 700; + + private Logger logger = Logger.getLogger(getClass()); + + private BrokerService broker; + private ActiveMQConnectionFactory factory; + private PooledConnectionFactory pooledFactory; + private String connectionUri; + private int numReceived = 0; + + protected void setUp() throws Exception { + broker = new BrokerService(); + broker.setPersistent(false); + TransportConnector connector = broker.addConnector("tcp://localhost:0"); + broker.start(); + connectionUri = connector.getPublishableConnectString(); + factory = new ActiveMQConnectionFactory(connectionUri); + pooledFactory = new PooledConnectionFactory(factory); + pooledFactory.setMaxConnections(1); + pooledFactory.setBlockIfSessionPoolIsFull(false); + } + + public void sendMessages(ConnectionFactory connectionFactory) throws Exception { + for (int i = 0; i < NUM_MESSAGES; i++) { + Connection connection = connectionFactory.createConnection(); + connection.start(); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Destination destination = session.createQueue(QUEUE); + MessageProducer producer = session.createProducer(destination); + + String msgTo = "hello"; + TextMessage message = session.createTextMessage(msgTo); + producer.send(message); + connection.close(); + logger.debug("sent " + i + " messages using " + connectionFactory.getClass()); + } + } + + public void testCanExhaustSessions() throws Exception { + Thread thread = new Thread(new Runnable() { + public void run() { + try { + ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(connectionUri); + Connection connection = connectionFactory.createConnection(); + connection.start(); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Destination destination = session.createQueue(QUEUE); + MessageConsumer consumer = session.createConsumer(destination); + for (int i = 0; i < NUM_MESSAGES; ++i) { + Message msg = consumer.receive(5000); + if (msg == null) { + return; + } + numReceived++; + if (numReceived % 20 == 0) { + logger.debug("received " + numReceived + " messages "); + System.runFinalization(); + } + } + } catch (Exception e) { + throw new RuntimeException(e); + } + } + }); + thread.start(); + + sendMessages(pooledFactory); + thread.join(); + + assertEquals(NUM_MESSAGES, numReceived); + } +}