Refactor the way sessions are pooled.  We don't need to keep the
PooledSession instances around since the state is unique to the session
it wraps we only need to keep the Session instances in the SessionPool
and create a new PooledSession on borrow to manage that session.  This
allows the PooledSession to have a real closed state that protects
against multiple close calls placing duplicate PooledSession instances
into the SessionPool.  This also simplifies the code in the
XaConnectionPool since it doesn't need to try and reset state in
PouledSessions before placing them back as it gets a fresh wrapper each
time with the correct state.
This commit is contained in:
Timothy Bish 2014-06-12 19:12:56 -04:00
parent 91a0041915
commit f395c70608
5 changed files with 153 additions and 52 deletions

View File

@ -25,12 +25,12 @@ import javax.jms.Connection;
import javax.jms.IllegalStateException;
import javax.jms.JMSException;
import javax.jms.Session;
import javax.jms.TemporaryQueue;
import javax.jms.TemporaryTopic;
import org.apache.commons.pool.KeyedPoolableObjectFactory;
import org.apache.commons.pool.impl.GenericKeyedObjectPool;
import org.apache.commons.pool.impl.GenericObjectPool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Holds a real JMS connection along with the session pools associated with it.
@ -42,8 +42,6 @@ import org.slf4j.LoggerFactory;
*/
public class ConnectionPool {
private static final transient Logger LOG = LoggerFactory.getLogger(ConnectionPool.class);
protected Connection connection;
private int referenceCount;
private long lastUsed = System.currentTimeMillis();
@ -54,7 +52,7 @@ public class ConnectionPool {
private boolean useAnonymousProducers = true;
private final AtomicBoolean started = new AtomicBoolean(false);
private final GenericKeyedObjectPool<SessionKey, PooledSession> sessionPool;
private final GenericKeyedObjectPool<SessionKey, Session> sessionPool;
private final List<PooledSession> loanedSessions = new CopyOnWriteArrayList<PooledSession>();
public ConnectionPool(Connection connection) {
@ -62,33 +60,29 @@ public class ConnectionPool {
this.connection = wrap(connection);
// Create our internal Pool of session instances.
this.sessionPool = new GenericKeyedObjectPool<SessionKey, PooledSession>(
new KeyedPoolableObjectFactory<SessionKey, PooledSession>() {
this.sessionPool = new GenericKeyedObjectPool<SessionKey, Session>(
new KeyedPoolableObjectFactory<SessionKey, Session>() {
@Override
public void activateObject(SessionKey key, PooledSession session) throws Exception {
ConnectionPool.this.loanedSessions.add(session);
public void activateObject(SessionKey key, Session session) throws Exception {
}
@Override
public void destroyObject(SessionKey key, PooledSession session) throws Exception {
ConnectionPool.this.loanedSessions.remove(session);
session.getInternalSession().close();
public void destroyObject(SessionKey key, Session session) throws Exception {
session.close();
}
@Override
public PooledSession makeObject(SessionKey key) throws Exception {
Session session = makeSession(key);
return new PooledSession(key, session, sessionPool, key.isTransacted(), useAnonymousProducers);
public Session makeObject(SessionKey key) throws Exception {
return makeSession(key);
}
@Override
public void passivateObject(SessionKey key, PooledSession session) throws Exception {
ConnectionPool.this.loanedSessions.remove(session);
public void passivateObject(SessionKey key, Session session) throws Exception {
}
@Override
public boolean validateObject(SessionKey key, PooledSession session) {
public boolean validateObject(SessionKey key, Session session) {
return true;
}
}
@ -130,7 +124,23 @@ public class ConnectionPool {
SessionKey key = new SessionKey(transacted, ackMode);
PooledSession session;
try {
session = sessionPool.borrowObject(key);
session = new PooledSession(key, sessionPool.borrowObject(key), sessionPool, key.isTransacted(), useAnonymousProducers);
session.addSessionEventListener(new PooledSessionEventListener() {
@Override
public void onTemporaryTopicCreate(TemporaryTopic tempTopic) {
}
@Override
public void onTemporaryQueueCreate(TemporaryQueue tempQueue) {
}
@Override
public void onSessionClosed(PooledSession session) {
ConnectionPool.this.loanedSessions.remove(session);
}
});
this.loanedSessions.add(session);
} catch (Exception e) {
IllegalStateException illegalStateException = new IllegalStateException(e.toString());
illegalStateException.initCause(e);

View File

@ -19,6 +19,7 @@ package org.apache.activemq.jms.pool;
import java.io.Serializable;
import java.util.Iterator;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.jms.BytesMessage;
import javax.jms.Destination;
@ -54,10 +55,11 @@ public class PooledSession implements Session, TopicSession, QueueSession, XASes
private static final transient Logger LOG = LoggerFactory.getLogger(PooledSession.class);
private final SessionKey key;
private final KeyedObjectPool<SessionKey, PooledSession> sessionPool;
private final KeyedObjectPool<SessionKey, Session> sessionPool;
private final CopyOnWriteArrayList<MessageConsumer> consumers = new CopyOnWriteArrayList<MessageConsumer>();
private final CopyOnWriteArrayList<QueueBrowser> browsers = new CopyOnWriteArrayList<QueueBrowser>();
private final CopyOnWriteArrayList<PooledSessionEventListener> sessionEventListeners = new CopyOnWriteArrayList<PooledSessionEventListener>();
private final AtomicBoolean closed = new AtomicBoolean();
private MessageProducer producer;
private TopicPublisher publisher;
@ -69,7 +71,7 @@ public class PooledSession implements Session, TopicSession, QueueSession, XASes
private boolean isXa;
private boolean useAnonymousProducers = true;
public PooledSession(SessionKey key, Session session, KeyedObjectPool<SessionKey, PooledSession> sessionPool, boolean transactional, boolean anonymous) {
public PooledSession(SessionKey key, Session session, KeyedObjectPool<SessionKey, Session> sessionPool, boolean transactional, boolean anonymous) {
this.key = key;
this.session = session;
this.sessionPool = sessionPool;
@ -94,7 +96,11 @@ public class PooledSession implements Session, TopicSession, QueueSession, XASes
@Override
public void close() throws JMSException {
if (!ignoreClose) {
if (ignoreClose) {
return;
}
if (closed.compareAndSet(false, true)) {
boolean invalidate = false;
try {
// lets reset the session
@ -140,22 +146,23 @@ public class PooledSession implements Session, TopicSession, QueueSession, XASes
} catch (JMSException e1) {
LOG.trace("Ignoring exception on close as discarding session: " + e1, e1);
}
session = null;
}
try {
sessionPool.invalidateObject(key, this);
sessionPool.invalidateObject(key, session);
} catch (Exception e) {
LOG.trace("Ignoring exception on invalidateObject as discarding session: " + e, e);
}
} else {
try {
sessionPool.returnObject(key, this);
sessionPool.returnObject(key, session);
} catch (Exception e) {
javax.jms.IllegalStateException illegalStateException = new javax.jms.IllegalStateException(e.toString());
illegalStateException.initCause(e);
throw illegalStateException;
}
}
session = null;
}
}

View File

@ -19,8 +19,6 @@ package org.apache.activemq.jms.pool;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Session;
import javax.jms.TemporaryQueue;
import javax.jms.TemporaryTopic;
import javax.jms.XAConnection;
import javax.transaction.RollbackException;
import javax.transaction.Status;
@ -65,22 +63,6 @@ public class XaConnectionPool extends ConnectionPool {
}
PooledSession session = (PooledSession) super.createSession(transacted, ackMode);
if (isXa) {
session.addSessionEventListener(new PooledSessionEventListener() {
@Override
public void onTemporaryQueueCreate(TemporaryQueue tempQueue) {
}
@Override
public void onTemporaryTopicCreate(TemporaryTopic tempTopic) {
}
@Override
public void onSessionClosed(PooledSession session) {
session.setIgnoreClose(true);
session.setIsXa(false);
}
});
session.setIgnoreClose(true);
session.setIsXa(true);
transactionManager.getTransaction().registerSynchronization(new Synchronization(session));

View File

@ -18,14 +18,13 @@ package org.apache.activemq.jms.pool;
import java.io.Serializable;
import java.util.Hashtable;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.XAConnectionFactory;
import javax.naming.Binding;
import javax.naming.Context;
import javax.naming.InitialContext;
@ -38,13 +37,14 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* A pooled connection factory that automatically enlists
* sessions in the current active XA transaction if any.
* A pooled connection factory that automatically enlists sessions in the
* current active XA transaction if any.
*/
public class XaPooledConnectionFactory extends PooledConnectionFactory implements ObjectFactory,
Serializable, QueueConnectionFactory, TopicConnectionFactory {
public class XaPooledConnectionFactory extends PooledConnectionFactory implements ObjectFactory, Serializable, QueueConnectionFactory, TopicConnectionFactory {
private static final transient Logger LOG = LoggerFactory.getLogger(XaPooledConnectionFactory.class);
private static final long serialVersionUID = -6545688026350913005L;
private TransactionManager transactionManager;
private boolean tmFromJndi = false;
private String tmJndiName = "java:/TransactionManager";
@ -87,10 +87,10 @@ public class XaPooledConnectionFactory extends PooledConnectionFactory implement
name = name.substring(0, name.lastIndexOf('/')) + "/conf" + name.substring(name.lastIndexOf('/'));
try {
InitialContext ctx = new InitialContext();
NamingEnumeration bindings = ctx.listBindings(name);
NamingEnumeration<Binding> bindings = ctx.listBindings(name);
while (bindings.hasMore()) {
Binding bd = (Binding)bindings.next();
Binding bd = bindings.next();
IntrospectionSupport.setProperty(this, bd.getName(), bd.getObject());
}
@ -116,6 +116,7 @@ public class XaPooledConnectionFactory extends PooledConnectionFactory implement
/**
* Allow transaction manager resolution from JNDI (ee deployment)
*
* @param tmFromJndi
*/
public void setTmFromJndi(boolean tmFromJndi) {
@ -141,5 +142,4 @@ public class XaPooledConnectionFactory extends PooledConnectionFactory implement
public TopicConnection createTopicConnection(String userName, String password) throws JMSException {
return (TopicConnection) createConnection(userName, password);
}
}

View File

@ -0,0 +1,102 @@
package org.apache.activemq.jms.pool;
import java.util.concurrent.Executors;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TemporaryQueue;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class PooledConnectionTempQueueTest {
private final Logger LOG = LoggerFactory.getLogger(PooledConnectionTempQueueTest.class);
protected static final String SERVICE_QUEUE = "queue1";
@Test
public void testTempQueueIssue() throws JMSException, InterruptedException {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
final PooledConnectionFactory cf = new PooledConnectionFactory();
cf.setConnectionFactory(factory);
Connection connection = cf.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
LOG.info("First connection was {}", connection);
// This order seems to matter to reproduce the issue
connection.close();
session.close();
Executors.newSingleThreadExecutor().execute(new Runnable() {
@Override
public void run() {
try {
receiveAndRespondWithMessageIdAsCorrelationId(cf, SERVICE_QUEUE);
} catch (JMSException e) {
e.printStackTrace();
}
}
});
sendWithReplyToTemp(cf, SERVICE_QUEUE);
}
private void sendWithReplyToTemp(ConnectionFactory cf, String serviceQueue) throws JMSException,
InterruptedException {
Connection con = cf.createConnection();
con.start();
Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
TemporaryQueue tempQueue = session.createTemporaryQueue();
TextMessage msg = session.createTextMessage("Request");
msg.setJMSReplyTo(tempQueue);
MessageProducer producer = session.createProducer(session.createQueue(serviceQueue));
producer.send(msg);
// This sleep also seems to matter
Thread.sleep(5000);
MessageConsumer consumer = session.createConsumer(tempQueue);
Message replyMsg = consumer.receive();
System.out.println(replyMsg.getJMSCorrelationID());
consumer.close();
producer.close();
session.close();
con.close();
}
public void receiveAndRespondWithMessageIdAsCorrelationId(ConnectionFactory connectionFactory,
String queueName) throws JMSException {
Connection con = connectionFactory.createConnection();
Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer = session.createConsumer(session.createQueue(queueName));
final javax.jms.Message inMessage = consumer.receive();
String requestMessageId = inMessage.getJMSMessageID();
System.out.println("Received message " + requestMessageId);
final TextMessage replyMessage = session.createTextMessage("Result");
replyMessage.setJMSCorrelationID(inMessage.getJMSMessageID());
final MessageProducer producer = session.createProducer(inMessage.getJMSReplyTo());
System.out.println("Sending reply to " + inMessage.getJMSReplyTo());
producer.send(replyMessage);
producer.close();
consumer.close();
session.close();
con.close();
}
}