diff --git a/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/ConnectionPool.java b/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/ConnectionPool.java index eced58821f..26995ea80d 100644 --- a/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/ConnectionPool.java +++ b/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/ConnectionPool.java @@ -25,12 +25,12 @@ import javax.jms.Connection; import javax.jms.IllegalStateException; import javax.jms.JMSException; import javax.jms.Session; +import javax.jms.TemporaryQueue; +import javax.jms.TemporaryTopic; import org.apache.commons.pool.KeyedPoolableObjectFactory; import org.apache.commons.pool.impl.GenericKeyedObjectPool; import org.apache.commons.pool.impl.GenericObjectPool; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * Holds a real JMS connection along with the session pools associated with it. @@ -42,8 +42,6 @@ import org.slf4j.LoggerFactory; */ public class ConnectionPool { - private static final transient Logger LOG = LoggerFactory.getLogger(ConnectionPool.class); - protected Connection connection; private int referenceCount; private long lastUsed = System.currentTimeMillis(); @@ -54,7 +52,7 @@ public class ConnectionPool { private boolean useAnonymousProducers = true; private final AtomicBoolean started = new AtomicBoolean(false); - private final GenericKeyedObjectPool sessionPool; + private final GenericKeyedObjectPool sessionPool; private final List loanedSessions = new CopyOnWriteArrayList(); public ConnectionPool(Connection connection) { @@ -62,33 +60,29 @@ public class ConnectionPool { this.connection = wrap(connection); // Create our internal Pool of session instances. - this.sessionPool = new GenericKeyedObjectPool( - new KeyedPoolableObjectFactory() { + this.sessionPool = new GenericKeyedObjectPool( + new KeyedPoolableObjectFactory() { @Override - public void activateObject(SessionKey key, PooledSession session) throws Exception { - ConnectionPool.this.loanedSessions.add(session); + public void activateObject(SessionKey key, Session session) throws Exception { } @Override - public void destroyObject(SessionKey key, PooledSession session) throws Exception { - ConnectionPool.this.loanedSessions.remove(session); - session.getInternalSession().close(); + public void destroyObject(SessionKey key, Session session) throws Exception { + session.close(); } @Override - public PooledSession makeObject(SessionKey key) throws Exception { - Session session = makeSession(key); - return new PooledSession(key, session, sessionPool, key.isTransacted(), useAnonymousProducers); + public Session makeObject(SessionKey key) throws Exception { + return makeSession(key); } @Override - public void passivateObject(SessionKey key, PooledSession session) throws Exception { - ConnectionPool.this.loanedSessions.remove(session); + public void passivateObject(SessionKey key, Session session) throws Exception { } @Override - public boolean validateObject(SessionKey key, PooledSession session) { + public boolean validateObject(SessionKey key, Session session) { return true; } } @@ -130,7 +124,23 @@ public class ConnectionPool { SessionKey key = new SessionKey(transacted, ackMode); PooledSession session; try { - session = sessionPool.borrowObject(key); + session = new PooledSession(key, sessionPool.borrowObject(key), sessionPool, key.isTransacted(), useAnonymousProducers); + session.addSessionEventListener(new PooledSessionEventListener() { + + @Override + public void onTemporaryTopicCreate(TemporaryTopic tempTopic) { + } + + @Override + public void onTemporaryQueueCreate(TemporaryQueue tempQueue) { + } + + @Override + public void onSessionClosed(PooledSession session) { + ConnectionPool.this.loanedSessions.remove(session); + } + }); + this.loanedSessions.add(session); } catch (Exception e) { IllegalStateException illegalStateException = new IllegalStateException(e.toString()); illegalStateException.initCause(e); diff --git a/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledSession.java b/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledSession.java index 1d3fc2f6c1..3a2e698b7b 100644 --- a/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledSession.java +++ b/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledSession.java @@ -19,6 +19,7 @@ package org.apache.activemq.jms.pool; import java.io.Serializable; import java.util.Iterator; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.atomic.AtomicBoolean; import javax.jms.BytesMessage; import javax.jms.Destination; @@ -54,10 +55,11 @@ public class PooledSession implements Session, TopicSession, QueueSession, XASes private static final transient Logger LOG = LoggerFactory.getLogger(PooledSession.class); private final SessionKey key; - private final KeyedObjectPool sessionPool; + private final KeyedObjectPool sessionPool; private final CopyOnWriteArrayList consumers = new CopyOnWriteArrayList(); private final CopyOnWriteArrayList browsers = new CopyOnWriteArrayList(); private final CopyOnWriteArrayList sessionEventListeners = new CopyOnWriteArrayList(); + private final AtomicBoolean closed = new AtomicBoolean(); private MessageProducer producer; private TopicPublisher publisher; @@ -69,7 +71,7 @@ public class PooledSession implements Session, TopicSession, QueueSession, XASes private boolean isXa; private boolean useAnonymousProducers = true; - public PooledSession(SessionKey key, Session session, KeyedObjectPool sessionPool, boolean transactional, boolean anonymous) { + public PooledSession(SessionKey key, Session session, KeyedObjectPool sessionPool, boolean transactional, boolean anonymous) { this.key = key; this.session = session; this.sessionPool = sessionPool; @@ -94,7 +96,11 @@ public class PooledSession implements Session, TopicSession, QueueSession, XASes @Override public void close() throws JMSException { - if (!ignoreClose) { + if (ignoreClose) { + return; + } + + if (closed.compareAndSet(false, true)) { boolean invalidate = false; try { // lets reset the session @@ -140,22 +146,23 @@ public class PooledSession implements Session, TopicSession, QueueSession, XASes } catch (JMSException e1) { LOG.trace("Ignoring exception on close as discarding session: " + e1, e1); } - session = null; } try { - sessionPool.invalidateObject(key, this); + sessionPool.invalidateObject(key, session); } catch (Exception e) { LOG.trace("Ignoring exception on invalidateObject as discarding session: " + e, e); } } else { try { - sessionPool.returnObject(key, this); + sessionPool.returnObject(key, session); } catch (Exception e) { javax.jms.IllegalStateException illegalStateException = new javax.jms.IllegalStateException(e.toString()); illegalStateException.initCause(e); throw illegalStateException; } } + + session = null; } } diff --git a/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/XaConnectionPool.java b/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/XaConnectionPool.java index 4f8715358f..0f86b55ca1 100644 --- a/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/XaConnectionPool.java +++ b/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/XaConnectionPool.java @@ -19,8 +19,6 @@ package org.apache.activemq.jms.pool; import javax.jms.Connection; import javax.jms.JMSException; import javax.jms.Session; -import javax.jms.TemporaryQueue; -import javax.jms.TemporaryTopic; import javax.jms.XAConnection; import javax.transaction.RollbackException; import javax.transaction.Status; @@ -65,22 +63,6 @@ public class XaConnectionPool extends ConnectionPool { } PooledSession session = (PooledSession) super.createSession(transacted, ackMode); if (isXa) { - session.addSessionEventListener(new PooledSessionEventListener() { - - @Override - public void onTemporaryQueueCreate(TemporaryQueue tempQueue) { - } - - @Override - public void onTemporaryTopicCreate(TemporaryTopic tempTopic) { - } - - @Override - public void onSessionClosed(PooledSession session) { - session.setIgnoreClose(true); - session.setIsXa(false); - } - }); session.setIgnoreClose(true); session.setIsXa(true); transactionManager.getTransaction().registerSynchronization(new Synchronization(session)); diff --git a/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/XaPooledConnectionFactory.java b/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/XaPooledConnectionFactory.java index 05675094dc..5e44be22d5 100644 --- a/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/XaPooledConnectionFactory.java +++ b/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/XaPooledConnectionFactory.java @@ -18,14 +18,13 @@ package org.apache.activemq.jms.pool; import java.io.Serializable; import java.util.Hashtable; + import javax.jms.Connection; -import javax.jms.ConnectionFactory; import javax.jms.JMSException; import javax.jms.QueueConnection; import javax.jms.QueueConnectionFactory; import javax.jms.TopicConnection; import javax.jms.TopicConnectionFactory; -import javax.jms.XAConnectionFactory; import javax.naming.Binding; import javax.naming.Context; import javax.naming.InitialContext; @@ -38,13 +37,14 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * A pooled connection factory that automatically enlists - * sessions in the current active XA transaction if any. + * A pooled connection factory that automatically enlists sessions in the + * current active XA transaction if any. */ -public class XaPooledConnectionFactory extends PooledConnectionFactory implements ObjectFactory, - Serializable, QueueConnectionFactory, TopicConnectionFactory { +public class XaPooledConnectionFactory extends PooledConnectionFactory implements ObjectFactory, Serializable, QueueConnectionFactory, TopicConnectionFactory { private static final transient Logger LOG = LoggerFactory.getLogger(XaPooledConnectionFactory.class); + private static final long serialVersionUID = -6545688026350913005L; + private TransactionManager transactionManager; private boolean tmFromJndi = false; private String tmJndiName = "java:/TransactionManager"; @@ -87,10 +87,10 @@ public class XaPooledConnectionFactory extends PooledConnectionFactory implement name = name.substring(0, name.lastIndexOf('/')) + "/conf" + name.substring(name.lastIndexOf('/')); try { InitialContext ctx = new InitialContext(); - NamingEnumeration bindings = ctx.listBindings(name); + NamingEnumeration bindings = ctx.listBindings(name); while (bindings.hasMore()) { - Binding bd = (Binding)bindings.next(); + Binding bd = bindings.next(); IntrospectionSupport.setProperty(this, bd.getName(), bd.getObject()); } @@ -116,6 +116,7 @@ public class XaPooledConnectionFactory extends PooledConnectionFactory implement /** * Allow transaction manager resolution from JNDI (ee deployment) + * * @param tmFromJndi */ public void setTmFromJndi(boolean tmFromJndi) { @@ -141,5 +142,4 @@ public class XaPooledConnectionFactory extends PooledConnectionFactory implement public TopicConnection createTopicConnection(String userName, String password) throws JMSException { return (TopicConnection) createConnection(userName, password); } - } diff --git a/activemq-jms-pool/src/test/java/org/apache/activemq/jms/pool/PooledConnectionTempQueueTest.java b/activemq-jms-pool/src/test/java/org/apache/activemq/jms/pool/PooledConnectionTempQueueTest.java new file mode 100644 index 0000000000..fc0fbf4084 --- /dev/null +++ b/activemq-jms-pool/src/test/java/org/apache/activemq/jms/pool/PooledConnectionTempQueueTest.java @@ -0,0 +1,102 @@ +package org.apache.activemq.jms.pool; + +import java.util.concurrent.Executors; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TemporaryQueue; +import javax.jms.TextMessage; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class PooledConnectionTempQueueTest { + + private final Logger LOG = LoggerFactory.getLogger(PooledConnectionTempQueueTest.class); + + protected static final String SERVICE_QUEUE = "queue1"; + + @Test + public void testTempQueueIssue() throws JMSException, InterruptedException { + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false"); + final PooledConnectionFactory cf = new PooledConnectionFactory(); + cf.setConnectionFactory(factory); + + Connection connection = cf.createConnection(); + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + LOG.info("First connection was {}", connection); + + // This order seems to matter to reproduce the issue + connection.close(); + session.close(); + + Executors.newSingleThreadExecutor().execute(new Runnable() { + @Override + public void run() { + try { + receiveAndRespondWithMessageIdAsCorrelationId(cf, SERVICE_QUEUE); + } catch (JMSException e) { + e.printStackTrace(); + } + } + }); + + sendWithReplyToTemp(cf, SERVICE_QUEUE); + } + + private void sendWithReplyToTemp(ConnectionFactory cf, String serviceQueue) throws JMSException, + InterruptedException { + Connection con = cf.createConnection(); + con.start(); + Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); + TemporaryQueue tempQueue = session.createTemporaryQueue(); + TextMessage msg = session.createTextMessage("Request"); + msg.setJMSReplyTo(tempQueue); + MessageProducer producer = session.createProducer(session.createQueue(serviceQueue)); + producer.send(msg); + + // This sleep also seems to matter + Thread.sleep(5000); + + MessageConsumer consumer = session.createConsumer(tempQueue); + Message replyMsg = consumer.receive(); + System.out.println(replyMsg.getJMSCorrelationID()); + + consumer.close(); + + producer.close(); + session.close(); + con.close(); + } + + public void receiveAndRespondWithMessageIdAsCorrelationId(ConnectionFactory connectionFactory, + String queueName) throws JMSException { + Connection con = connectionFactory.createConnection(); + Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer = session.createConsumer(session.createQueue(queueName)); + final javax.jms.Message inMessage = consumer.receive(); + + String requestMessageId = inMessage.getJMSMessageID(); + System.out.println("Received message " + requestMessageId); + final TextMessage replyMessage = session.createTextMessage("Result"); + replyMessage.setJMSCorrelationID(inMessage.getJMSMessageID()); + final MessageProducer producer = session.createProducer(inMessage.getJMSReplyTo()); + System.out.println("Sending reply to " + inMessage.getJMSReplyTo()); + producer.send(replyMessage); + + producer.close(); + consumer.close(); + session.close(); + con.close(); + } + +}