AMQ-2034 move delay-session-close-in-xa-tx code to plain session so it works in managed environments. Also prevent duplicate synchronization registrations

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@732489 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
David Jencks 2009-01-07 20:57:56 +00:00
parent 07b6a38336
commit 2387612628
2 changed files with 27 additions and 22 deletions

View File

@ -139,7 +139,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
* acknowledges all messages consumed by a session at when acknowledge()
* is called
*/
public static final int INDIVIDUAL_ACKNOWLEDGE=4;
public static final int INDIVIDUAL_ACKNOWLEDGE = 4;
public static interface DeliveryListener {
void beforeDelivery(ActiveMQSession session, Message msg);
@ -163,6 +163,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
protected final CopyOnWriteArrayList<ActiveMQMessageProducer> producers = new CopyOnWriteArrayList<ActiveMQMessageProducer>();
protected boolean closed;
private volatile boolean synchronizationRegistered;
protected boolean asyncDispatch;
protected boolean sessionAsyncDispatch;
protected final boolean debug;
@ -553,10 +554,33 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
*/
public void close() throws JMSException {
if (!closed) {
if (getTransacted()) {
if (!synchronizationRegistered) {
synchronizationRegistered = true;
getTransactionContext().addSynchronization(new Synchronization() {
public void afterCommit() throws Exception {
doClose();
synchronizationRegistered = false;
}
public void afterRollback() throws Exception {
doClose();
synchronizationRegistered = false;
}
});
}
} else {
doClose();
}
}
}
private void doClose() throws JMSException {
dispose();
connection.asyncSendPacket(info.createRemoveCommand());
}
}
void clearMessagesInProgress() {
executor.clearMessagesInProgress();

View File

@ -27,7 +27,6 @@ import javax.jms.XATopicSession;
import javax.transaction.xa.XAResource;
import org.apache.activemq.command.SessionId;
import org.apache.activemq.transaction.Synchronization;
/**
* The XASession interface extends the capability of Session by adding access
@ -97,24 +96,6 @@ public class ActiveMQXASession extends ActiveMQSession implements QueueSession,
return new ActiveMQTopicSession(this);
}
@Override
public void close() throws JMSException {
if (getTransactionContext().isInXATransaction()) {
getTransactionContext().addSynchronization(new Synchronization() {
public void afterCommit() throws Exception {
doClose();
}
public void afterRollback() throws Exception {
doClose();
}
});
}
}
void doClose() throws JMSException {
super.close();
}
/**
* This is called before transacted work is done by
* the session. XA Work can only be done when this