Set all session properties back using a session listener close event so
that the returned session is not modified after it's already been
re-dispatched from the pool.
This commit is contained in:
Timothy Bish 2014-05-29 10:33:09 -04:00
parent f2653e6936
commit 9f78f82378
1 changed files with 24 additions and 8 deletions

View File

@ -19,6 +19,8 @@ package org.apache.activemq.jms.pool;
import javax.jms.Connection; import javax.jms.Connection;
import javax.jms.JMSException; import javax.jms.JMSException;
import javax.jms.Session; import javax.jms.Session;
import javax.jms.TemporaryQueue;
import javax.jms.TemporaryTopic;
import javax.jms.XAConnection; import javax.jms.XAConnection;
import javax.transaction.RollbackException; import javax.transaction.RollbackException;
import javax.transaction.Status; import javax.transaction.Status;
@ -27,10 +29,9 @@ import javax.transaction.TransactionManager;
import javax.transaction.xa.XAResource; import javax.transaction.xa.XAResource;
/** /**
* An XA-aware connection pool. When a session is created and an xa transaction is active, * An XA-aware connection pool. When a session is created and an xa transaction
* the session will automatically be enlisted in the current transaction. * is active, the session will automatically be enlisted in the current
* * transaction.
* @author gnodet
*/ */
public class XaConnectionPool extends ConnectionPool { public class XaConnectionPool extends ConnectionPool {
@ -51,7 +52,8 @@ public class XaConnectionPool extends ConnectionPool {
try { try {
boolean isXa = (transactionManager != null && transactionManager.getStatus() != Status.STATUS_NO_TRANSACTION); boolean isXa = (transactionManager != null && transactionManager.getStatus() != Status.STATUS_NO_TRANSACTION);
if (isXa) { if (isXa) {
// if the xa tx aborts inflight we don't want to auto create a local transaction or auto ack // if the xa tx aborts inflight we don't want to auto create a
// local transaction or auto ack
transacted = false; transacted = false;
ackMode = Session.CLIENT_ACKNOWLEDGE; ackMode = Session.CLIENT_ACKNOWLEDGE;
} else if (transactionManager != null) { } else if (transactionManager != null) {
@ -63,6 +65,22 @@ public class XaConnectionPool extends ConnectionPool {
} }
PooledSession session = (PooledSession) super.createSession(transacted, ackMode); PooledSession session = (PooledSession) super.createSession(transacted, ackMode);
if (isXa) { if (isXa) {
session.addSessionEventListener(new PooledSessionEventListener() {
@Override
public void onTemporaryQueueCreate(TemporaryQueue tempQueue) {
}
@Override
public void onTemporaryTopicCreate(TemporaryTopic tempTopic) {
}
@Override
public void onSessionClosed(PooledSession session) {
session.setIgnoreClose(true);
session.setIsXa(false);
}
});
session.setIgnoreClose(true); session.setIgnoreClose(true);
session.setIsXa(true); session.setIsXa(true);
transactionManager.getTransaction().registerSynchronization(new Synchronization(session)); transactionManager.getTransaction().registerSynchronization(new Synchronization(session));
@ -104,8 +122,6 @@ public class XaConnectionPool extends ConnectionPool {
// This will return session to the pool. // This will return session to the pool.
session.setIgnoreClose(false); session.setIgnoreClose(false);
session.close(); session.close();
session.setIgnoreClose(true);
session.setIsXa(false);
decrementReferenceCount(); decrementReferenceCount();
} catch (JMSException e) { } catch (JMSException e) {
throw new RuntimeException(e); throw new RuntimeException(e);