From f395c706089bc0e38b2af133fb7bcfbc8d3010f0 Mon Sep 17 00:00:00 2001 From: Timothy Bish Date: Thu, 12 Jun 2014 19:12:56 -0400 Subject: [PATCH] https://issues.apache.org/jira/browse/AMQ-5015 Refactor the way sessions are pooled. We don't need to keep the PooledSession instances around since the state is unique to the session it wraps we only need to keep the Session instances in the SessionPool and create a new PooledSession on borrow to manage that session. This allows the PooledSession to have a real closed state that protects against multiple close calls placing duplicate PooledSession instances into the SessionPool. This also simplifies the code in the XaConnectionPool since it doesn't need to try and reset state in PouledSessions before placing them back as it gets a fresh wrapper each time with the correct state. --- .../activemq/jms/pool/ConnectionPool.java | 48 +++++---- .../activemq/jms/pool/PooledSession.java | 19 ++-- .../activemq/jms/pool/XaConnectionPool.java | 18 ---- .../jms/pool/XaPooledConnectionFactory.java | 18 ++-- .../pool/PooledConnectionTempQueueTest.java | 102 ++++++++++++++++++ 5 files changed, 153 insertions(+), 52 deletions(-) create mode 100644 activemq-jms-pool/src/test/java/org/apache/activemq/jms/pool/PooledConnectionTempQueueTest.java diff --git a/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/ConnectionPool.java b/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/ConnectionPool.java index eced58821f..26995ea80d 100644 --- a/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/ConnectionPool.java +++ b/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/ConnectionPool.java @@ -25,12 +25,12 @@ import javax.jms.Connection; import javax.jms.IllegalStateException; import javax.jms.JMSException; import javax.jms.Session; +import javax.jms.TemporaryQueue; +import javax.jms.TemporaryTopic; import org.apache.commons.pool.KeyedPoolableObjectFactory; import org.apache.commons.pool.impl.GenericKeyedObjectPool; import org.apache.commons.pool.impl.GenericObjectPool; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * Holds a real JMS connection along with the session pools associated with it. @@ -42,8 +42,6 @@ import org.slf4j.LoggerFactory; */ public class ConnectionPool { - private static final transient Logger LOG = LoggerFactory.getLogger(ConnectionPool.class); - protected Connection connection; private int referenceCount; private long lastUsed = System.currentTimeMillis(); @@ -54,7 +52,7 @@ public class ConnectionPool { private boolean useAnonymousProducers = true; private final AtomicBoolean started = new AtomicBoolean(false); - private final GenericKeyedObjectPool sessionPool; + private final GenericKeyedObjectPool sessionPool; private final List loanedSessions = new CopyOnWriteArrayList(); public ConnectionPool(Connection connection) { @@ -62,33 +60,29 @@ public class ConnectionPool { this.connection = wrap(connection); // Create our internal Pool of session instances. - this.sessionPool = new GenericKeyedObjectPool( - new KeyedPoolableObjectFactory() { + this.sessionPool = new GenericKeyedObjectPool( + new KeyedPoolableObjectFactory() { @Override - public void activateObject(SessionKey key, PooledSession session) throws Exception { - ConnectionPool.this.loanedSessions.add(session); + public void activateObject(SessionKey key, Session session) throws Exception { } @Override - public void destroyObject(SessionKey key, PooledSession session) throws Exception { - ConnectionPool.this.loanedSessions.remove(session); - session.getInternalSession().close(); + public void destroyObject(SessionKey key, Session session) throws Exception { + session.close(); } @Override - public PooledSession makeObject(SessionKey key) throws Exception { - Session session = makeSession(key); - return new PooledSession(key, session, sessionPool, key.isTransacted(), useAnonymousProducers); + public Session makeObject(SessionKey key) throws Exception { + return makeSession(key); } @Override - public void passivateObject(SessionKey key, PooledSession session) throws Exception { - ConnectionPool.this.loanedSessions.remove(session); + public void passivateObject(SessionKey key, Session session) throws Exception { } @Override - public boolean validateObject(SessionKey key, PooledSession session) { + public boolean validateObject(SessionKey key, Session session) { return true; } } @@ -130,7 +124,23 @@ public class ConnectionPool { SessionKey key = new SessionKey(transacted, ackMode); PooledSession session; try { - session = sessionPool.borrowObject(key); + session = new PooledSession(key, sessionPool.borrowObject(key), sessionPool, key.isTransacted(), useAnonymousProducers); + session.addSessionEventListener(new PooledSessionEventListener() { + + @Override + public void onTemporaryTopicCreate(TemporaryTopic tempTopic) { + } + + @Override + public void onTemporaryQueueCreate(TemporaryQueue tempQueue) { + } + + @Override + public void onSessionClosed(PooledSession session) { + ConnectionPool.this.loanedSessions.remove(session); + } + }); + this.loanedSessions.add(session); } catch (Exception e) { IllegalStateException illegalStateException = new IllegalStateException(e.toString()); illegalStateException.initCause(e); diff --git a/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledSession.java b/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledSession.java index 1d3fc2f6c1..3a2e698b7b 100644 --- a/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledSession.java +++ b/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledSession.java @@ -19,6 +19,7 @@ package org.apache.activemq.jms.pool; import java.io.Serializable; import java.util.Iterator; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.atomic.AtomicBoolean; import javax.jms.BytesMessage; import javax.jms.Destination; @@ -54,10 +55,11 @@ public class PooledSession implements Session, TopicSession, QueueSession, XASes private static final transient Logger LOG = LoggerFactory.getLogger(PooledSession.class); private final SessionKey key; - private final KeyedObjectPool sessionPool; + private final KeyedObjectPool sessionPool; private final CopyOnWriteArrayList consumers = new CopyOnWriteArrayList(); private final CopyOnWriteArrayList browsers = new CopyOnWriteArrayList(); private final CopyOnWriteArrayList sessionEventListeners = new CopyOnWriteArrayList(); + private final AtomicBoolean closed = new AtomicBoolean(); private MessageProducer producer; private TopicPublisher publisher; @@ -69,7 +71,7 @@ public class PooledSession implements Session, TopicSession, QueueSession, XASes private boolean isXa; private boolean useAnonymousProducers = true; - public PooledSession(SessionKey key, Session session, KeyedObjectPool sessionPool, boolean transactional, boolean anonymous) { + public PooledSession(SessionKey key, Session session, KeyedObjectPool sessionPool, boolean transactional, boolean anonymous) { this.key = key; this.session = session; this.sessionPool = sessionPool; @@ -94,7 +96,11 @@ public class PooledSession implements Session, TopicSession, QueueSession, XASes @Override public void close() throws JMSException { - if (!ignoreClose) { + if (ignoreClose) { + return; + } + + if (closed.compareAndSet(false, true)) { boolean invalidate = false; try { // lets reset the session @@ -140,22 +146,23 @@ public class PooledSession implements Session, TopicSession, QueueSession, XASes } catch (JMSException e1) { LOG.trace("Ignoring exception on close as discarding session: " + e1, e1); } - session = null; } try { - sessionPool.invalidateObject(key, this); + sessionPool.invalidateObject(key, session); } catch (Exception e) { LOG.trace("Ignoring exception on invalidateObject as discarding session: " + e, e); } } else { try { - sessionPool.returnObject(key, this); + sessionPool.returnObject(key, session); } catch (Exception e) { javax.jms.IllegalStateException illegalStateException = new javax.jms.IllegalStateException(e.toString()); illegalStateException.initCause(e); throw illegalStateException; } } + + session = null; } } diff --git a/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/XaConnectionPool.java b/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/XaConnectionPool.java index 4f8715358f..0f86b55ca1 100644 --- a/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/XaConnectionPool.java +++ b/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/XaConnectionPool.java @@ -19,8 +19,6 @@ package org.apache.activemq.jms.pool; import javax.jms.Connection; import javax.jms.JMSException; import javax.jms.Session; -import javax.jms.TemporaryQueue; -import javax.jms.TemporaryTopic; import javax.jms.XAConnection; import javax.transaction.RollbackException; import javax.transaction.Status; @@ -65,22 +63,6 @@ public class XaConnectionPool extends ConnectionPool { } PooledSession session = (PooledSession) super.createSession(transacted, ackMode); if (isXa) { - session.addSessionEventListener(new PooledSessionEventListener() { - - @Override - public void onTemporaryQueueCreate(TemporaryQueue tempQueue) { - } - - @Override - public void onTemporaryTopicCreate(TemporaryTopic tempTopic) { - } - - @Override - public void onSessionClosed(PooledSession session) { - session.setIgnoreClose(true); - session.setIsXa(false); - } - }); session.setIgnoreClose(true); session.setIsXa(true); transactionManager.getTransaction().registerSynchronization(new Synchronization(session)); diff --git a/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/XaPooledConnectionFactory.java b/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/XaPooledConnectionFactory.java index 05675094dc..5e44be22d5 100644 --- a/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/XaPooledConnectionFactory.java +++ b/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/XaPooledConnectionFactory.java @@ -18,14 +18,13 @@ package org.apache.activemq.jms.pool; import java.io.Serializable; import java.util.Hashtable; + import javax.jms.Connection; -import javax.jms.ConnectionFactory; import javax.jms.JMSException; import javax.jms.QueueConnection; import javax.jms.QueueConnectionFactory; import javax.jms.TopicConnection; import javax.jms.TopicConnectionFactory; -import javax.jms.XAConnectionFactory; import javax.naming.Binding; import javax.naming.Context; import javax.naming.InitialContext; @@ -38,13 +37,14 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * A pooled connection factory that automatically enlists - * sessions in the current active XA transaction if any. + * A pooled connection factory that automatically enlists sessions in the + * current active XA transaction if any. */ -public class XaPooledConnectionFactory extends PooledConnectionFactory implements ObjectFactory, - Serializable, QueueConnectionFactory, TopicConnectionFactory { +public class XaPooledConnectionFactory extends PooledConnectionFactory implements ObjectFactory, Serializable, QueueConnectionFactory, TopicConnectionFactory { private static final transient Logger LOG = LoggerFactory.getLogger(XaPooledConnectionFactory.class); + private static final long serialVersionUID = -6545688026350913005L; + private TransactionManager transactionManager; private boolean tmFromJndi = false; private String tmJndiName = "java:/TransactionManager"; @@ -87,10 +87,10 @@ public class XaPooledConnectionFactory extends PooledConnectionFactory implement name = name.substring(0, name.lastIndexOf('/')) + "/conf" + name.substring(name.lastIndexOf('/')); try { InitialContext ctx = new InitialContext(); - NamingEnumeration bindings = ctx.listBindings(name); + NamingEnumeration bindings = ctx.listBindings(name); while (bindings.hasMore()) { - Binding bd = (Binding)bindings.next(); + Binding bd = bindings.next(); IntrospectionSupport.setProperty(this, bd.getName(), bd.getObject()); } @@ -116,6 +116,7 @@ public class XaPooledConnectionFactory extends PooledConnectionFactory implement /** * Allow transaction manager resolution from JNDI (ee deployment) + * * @param tmFromJndi */ public void setTmFromJndi(boolean tmFromJndi) { @@ -141,5 +142,4 @@ public class XaPooledConnectionFactory extends PooledConnectionFactory implement public TopicConnection createTopicConnection(String userName, String password) throws JMSException { return (TopicConnection) createConnection(userName, password); } - } diff --git a/activemq-jms-pool/src/test/java/org/apache/activemq/jms/pool/PooledConnectionTempQueueTest.java b/activemq-jms-pool/src/test/java/org/apache/activemq/jms/pool/PooledConnectionTempQueueTest.java new file mode 100644 index 0000000000..fc0fbf4084 --- /dev/null +++ b/activemq-jms-pool/src/test/java/org/apache/activemq/jms/pool/PooledConnectionTempQueueTest.java @@ -0,0 +1,102 @@ +package org.apache.activemq.jms.pool; + +import java.util.concurrent.Executors; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TemporaryQueue; +import javax.jms.TextMessage; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class PooledConnectionTempQueueTest { + + private final Logger LOG = LoggerFactory.getLogger(PooledConnectionTempQueueTest.class); + + protected static final String SERVICE_QUEUE = "queue1"; + + @Test + public void testTempQueueIssue() throws JMSException, InterruptedException { + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false"); + final PooledConnectionFactory cf = new PooledConnectionFactory(); + cf.setConnectionFactory(factory); + + Connection connection = cf.createConnection(); + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + LOG.info("First connection was {}", connection); + + // This order seems to matter to reproduce the issue + connection.close(); + session.close(); + + Executors.newSingleThreadExecutor().execute(new Runnable() { + @Override + public void run() { + try { + receiveAndRespondWithMessageIdAsCorrelationId(cf, SERVICE_QUEUE); + } catch (JMSException e) { + e.printStackTrace(); + } + } + }); + + sendWithReplyToTemp(cf, SERVICE_QUEUE); + } + + private void sendWithReplyToTemp(ConnectionFactory cf, String serviceQueue) throws JMSException, + InterruptedException { + Connection con = cf.createConnection(); + con.start(); + Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); + TemporaryQueue tempQueue = session.createTemporaryQueue(); + TextMessage msg = session.createTextMessage("Request"); + msg.setJMSReplyTo(tempQueue); + MessageProducer producer = session.createProducer(session.createQueue(serviceQueue)); + producer.send(msg); + + // This sleep also seems to matter + Thread.sleep(5000); + + MessageConsumer consumer = session.createConsumer(tempQueue); + Message replyMsg = consumer.receive(); + System.out.println(replyMsg.getJMSCorrelationID()); + + consumer.close(); + + producer.close(); + session.close(); + con.close(); + } + + public void receiveAndRespondWithMessageIdAsCorrelationId(ConnectionFactory connectionFactory, + String queueName) throws JMSException { + Connection con = connectionFactory.createConnection(); + Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer = session.createConsumer(session.createQueue(queueName)); + final javax.jms.Message inMessage = consumer.receive(); + + String requestMessageId = inMessage.getJMSMessageID(); + System.out.println("Received message " + requestMessageId); + final TextMessage replyMessage = session.createTextMessage("Result"); + replyMessage.setJMSCorrelationID(inMessage.getJMSMessageID()); + final MessageProducer producer = session.createProducer(inMessage.getJMSReplyTo()); + System.out.println("Sending reply to " + inMessage.getJMSReplyTo()); + producer.send(replyMessage); + + producer.close(); + consumer.close(); + session.close(); + con.close(); + } + +}