From 238761262881afcf034a9f649625908cbad8b42b Mon Sep 17 00:00:00 2001 From: David Jencks Date: Wed, 7 Jan 2009 20:57:56 +0000 Subject: [PATCH] AMQ-2034 move delay-session-close-in-xa-tx code to plain session so it works in managed environments. Also prevent duplicate synchronization registrations git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@732489 13f79535-47bb-0310-9956-ffa450edef68 --- .../org/apache/activemq/ActiveMQSession.java | 30 +++++++++++++++++-- .../apache/activemq/ActiveMQXASession.java | 19 ------------ 2 files changed, 27 insertions(+), 22 deletions(-) diff --git a/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java b/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java index 186c2f82d4..dea1d241a0 100755 --- a/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java +++ b/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java @@ -139,7 +139,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta * acknowledges all messages consumed by a session at when acknowledge() * is called */ - public static final int INDIVIDUAL_ACKNOWLEDGE=4; + public static final int INDIVIDUAL_ACKNOWLEDGE = 4; public static interface DeliveryListener { void beforeDelivery(ActiveMQSession session, Message msg); @@ -163,6 +163,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta protected final CopyOnWriteArrayList producers = new CopyOnWriteArrayList(); protected boolean closed; + private volatile boolean synchronizationRegistered; protected boolean asyncDispatch; protected boolean sessionAsyncDispatch; protected final boolean debug; @@ -553,11 +554,34 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta */ public void close() throws JMSException { if (!closed) { - dispose(); - connection.asyncSendPacket(info.createRemoveCommand()); + if (getTransacted()) { + if (!synchronizationRegistered) { + synchronizationRegistered = true; + getTransactionContext().addSynchronization(new Synchronization() { + + public void afterCommit() throws Exception { + doClose(); + synchronizationRegistered = false; + } + + public void afterRollback() throws Exception { + doClose(); + synchronizationRegistered = false; + } + }); + } + + } else { + doClose(); + } } } + private void doClose() throws JMSException { + dispose(); + connection.asyncSendPacket(info.createRemoveCommand()); + } + void clearMessagesInProgress() { executor.clearMessagesInProgress(); for (Iterator iter = consumers.iterator(); iter.hasNext();) { diff --git a/activemq-core/src/main/java/org/apache/activemq/ActiveMQXASession.java b/activemq-core/src/main/java/org/apache/activemq/ActiveMQXASession.java index ed8e541e6c..c4ba17dbde 100755 --- a/activemq-core/src/main/java/org/apache/activemq/ActiveMQXASession.java +++ b/activemq-core/src/main/java/org/apache/activemq/ActiveMQXASession.java @@ -27,7 +27,6 @@ import javax.jms.XATopicSession; import javax.transaction.xa.XAResource; import org.apache.activemq.command.SessionId; -import org.apache.activemq.transaction.Synchronization; /** * The XASession interface extends the capability of Session by adding access @@ -97,24 +96,6 @@ public class ActiveMQXASession extends ActiveMQSession implements QueueSession, return new ActiveMQTopicSession(this); } - @Override - public void close() throws JMSException { - if (getTransactionContext().isInXATransaction()) { - getTransactionContext().addSynchronization(new Synchronization() { - public void afterCommit() throws Exception { - doClose(); - } - - public void afterRollback() throws Exception { - doClose(); - } - }); - } - } - - void doClose() throws JMSException { - super.close(); - } /** * This is called before transacted work is done by * the session. XA Work can only be done when this