track pooled sessions in the parent ConnectionPool and ensure 
they get closed and returned to the pool.

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1180038 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Timothy A. Bish 2011-10-07 13:51:44 +00:00
parent 617c436d24
commit bfd1be561d
4 changed files with 112 additions and 5 deletions

View File

@ -20,6 +20,7 @@ package org.apache.activemq.pool;
import java.io.IOException;
import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.jms.JMSException;
@ -38,6 +39,7 @@ public class ConnectionPool {
private ActiveMQConnection connection;
private ConcurrentHashMap<SessionKey, SessionPool> cache;
private ConcurrentLinkedQueue<PooledSession> loanedSessions = new ConcurrentLinkedQueue<PooledSession>();
private AtomicBoolean started = new AtomicBoolean(false);
private int referenceCount;
private ObjectPoolFactory poolFactory;
@ -118,6 +120,7 @@ public class ConnectionPool {
pool = cache.get(key); // this will return a non-null value...
}
PooledSession session = pool.borrowSession();
this.loanedSessions.add(session);
return session;
}
@ -155,6 +158,14 @@ public class ConnectionPool {
if (referenceCount == 0) {
expiredCheck();
for (PooledSession session : this.loanedSessions) {
try {
session.close();
} catch (Exception e) {
}
}
this.loanedSessions.clear();
// only clean up temp destinations when all users
// of this connection have called close
if (getConnection() != null) {
@ -208,4 +219,11 @@ public class ConnectionPool {
return expiryTimeout;
}
void onSessionReturned(PooledSession session) {
this.loanedSessions.remove(session);
}
void onSessionInvalidated(PooledSession session) {
this.loanedSessions.remove(session);
}
}

View File

@ -55,7 +55,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
*
*
*/
public class PooledSession implements Session, TopicSession, QueueSession, XASession {
private static final transient Logger LOG = LoggerFactory.getLogger(PooledSession.class);

View File

@ -27,8 +27,8 @@ import org.apache.commons.pool.PoolableObjectFactory;
/**
* Represents the session pool for a given JMS connection.
*
*
*
*
*/
public class SessionPool implements PoolableObjectFactory {
private ConnectionPool connectionPool;
@ -64,20 +64,21 @@ public class SessionPool implements PoolableObjectFactory {
// lets check if we are already closed
getConnection();
try {
connectionPool.onSessionReturned(session);
getSessionPool().returnObject(session);
} catch (Exception e) {
throw JMSExceptionSupport.create("Failed to return session to pool: " + e, e);
}
}
public void invalidateSession(PooledSession session) throws JMSException {
try {
connectionPool.onSessionInvalidated(session);
getSessionPool().invalidateObject(session);
} catch (Exception e) {
throw JMSExceptionSupport.create("Failed to invalidate session: " + e, e);
}
}
// PoolableObjectFactory methods
// -------------------------------------------------------------------------

View File

@ -0,0 +1,88 @@
package org.apache.activemq.pool;
import org.apache.activemq.*;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import junit.framework.*;
import javax.jms.*;
import javax.jms.Message;
import org.apache.log4j.Logger;
public class PooledSessionExhaustionTest extends TestCase {
private static final String QUEUE = "FOO";
private static final int NUM_MESSAGES = 700;
private Logger logger = Logger.getLogger(getClass());
private BrokerService broker;
private ActiveMQConnectionFactory factory;
private PooledConnectionFactory pooledFactory;
private String connectionUri;
private int numReceived = 0;
protected void setUp() throws Exception {
broker = new BrokerService();
broker.setPersistent(false);
TransportConnector connector = broker.addConnector("tcp://localhost:0");
broker.start();
connectionUri = connector.getPublishableConnectString();
factory = new ActiveMQConnectionFactory(connectionUri);
pooledFactory = new PooledConnectionFactory(factory);
pooledFactory.setMaxConnections(1);
pooledFactory.setBlockIfSessionPoolIsFull(false);
}
public void sendMessages(ConnectionFactory connectionFactory) throws Exception {
for (int i = 0; i < NUM_MESSAGES; i++) {
Connection connection = connectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue(QUEUE);
MessageProducer producer = session.createProducer(destination);
String msgTo = "hello";
TextMessage message = session.createTextMessage(msgTo);
producer.send(message);
connection.close();
logger.debug("sent " + i + " messages using " + connectionFactory.getClass());
}
}
public void testCanExhaustSessions() throws Exception {
Thread thread = new Thread(new Runnable() {
public void run() {
try {
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(connectionUri);
Connection connection = connectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue(QUEUE);
MessageConsumer consumer = session.createConsumer(destination);
for (int i = 0; i < NUM_MESSAGES; ++i) {
Message msg = consumer.receive(5000);
if (msg == null) {
return;
}
numReceived++;
if (numReceived % 20 == 0) {
logger.debug("received " + numReceived + " messages ");
System.runFinalization();
}
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
});
thread.start();
sendMessages(pooledFactory);
thread.join();
assertEquals(NUM_MESSAGES, numReceived);
}
}