Temporary fix until the SPIs can be reworked. Remove the transaction observer on Session close and added some checks to ensure the session is still open when the listeners fire.

This commit is contained in:
Shawn Clowater 2012-04-30 13:06:08 -06:00 committed by Steve Ebersole
parent 779e70df4d
commit 287c0eba88
4 changed files with 51 additions and 22 deletions

View File

@ -284,6 +284,11 @@ public class TransactionCoordinatorImpl implements TransactionCoordinator {
observers.add( observer ); observers.add( observer );
} }
@Override
public void removeObserver(TransactionObserver observer) {
observers.remove( observer );
}
@Override @Override
@SuppressWarnings( {"unchecked"}) @SuppressWarnings( {"unchecked"})
public boolean isTransactionJoinable() { public boolean isTransactionJoinable() {
@ -329,7 +334,7 @@ public class TransactionCoordinatorImpl implements TransactionCoordinator {
@Override @Override
public void sendAfterTransactionCompletionNotifications(TransactionImplementor hibernateTransaction, int status) { public void sendAfterTransactionCompletionNotifications(TransactionImplementor hibernateTransaction, int status) {
final boolean successful = JtaStatusHelper.isCommitted( status ); final boolean successful = JtaStatusHelper.isCommitted( status );
for ( TransactionObserver observer : observers ) { for ( TransactionObserver observer : new ArrayList<TransactionObserver>( observers ) ) {
observer.afterCompletion( successful, hibernateTransaction ); observer.afterCompletion( successful, hibernateTransaction );
} }
synchronizationRegistry.notifySynchronizationsAfterTransactionCompletion( status ); synchronizationRegistry.notifySynchronizationsAfterTransactionCompletion( status );

View File

@ -73,6 +73,13 @@ public interface TransactionCoordinator extends Serializable {
*/ */
public void addObserver(TransactionObserver observer); public void addObserver(TransactionObserver observer);
/**
* Removed an observer from the coordinator.
*
* @param observer The observer to remove.
*/
public void removeObserver(TransactionObserver observer);
/** /**
* Can we join to the underlying transaction? * Can we join to the underlying transaction?
* *

View File

@ -200,6 +200,7 @@ public final class SessionImpl extends AbstractSessionImpl implements EventSourc
private transient LoadQueryInfluencers loadQueryInfluencers; private transient LoadQueryInfluencers loadQueryInfluencers;
private final transient boolean isTransactionCoordinatorShared; private final transient boolean isTransactionCoordinatorShared;
private transient TransactionObserver transactionObserver;
/** /**
* Constructor used for openSession(...) processing, as well as construction * Constructor used for openSession(...) processing, as well as construction
@ -270,27 +271,42 @@ public final class SessionImpl extends AbstractSessionImpl implements EventSourc
// add a transaction observer so that we can handle delegating managed actions back to THIS session // add a transaction observer so that we can handle delegating managed actions back to THIS session
// versus the session that created (and therefore "owns") the transaction coordinator // versus the session that created (and therefore "owns") the transaction coordinator
transactionCoordinator.addObserver( transactionObserver = new TransactionObserver() {
new TransactionObserver() {
@Override @Override
public void afterBegin(TransactionImplementor transaction) { public void afterBegin(TransactionImplementor transaction) {
} }
@Override @Override
public void beforeCompletion(TransactionImplementor transaction) { public void beforeCompletion(TransactionImplementor transaction) {
if ( SessionImpl.this.flushBeforeCompletionEnabled ) { if ( isOpen() ) {
if ( flushBeforeCompletionEnabled ){
SessionImpl.this.managedFlush(); SessionImpl.this.managedFlush();
} }
getActionQueue().beforeTransactionCompletion();
}
else {
if (actionQueue.hasAfterTransactionActions()){
LOG.log( Logger.Level.DEBUG, "Session had after transaction actions that were not processed");
}
}
} }
@Override @Override
public void afterCompletion(boolean successful, TransactionImplementor transaction) { public void afterCompletion(boolean successful, TransactionImplementor transaction) {
if ( SessionImpl.this.autoCloseSessionEnabled ) { if ( isOpen() ) {
SessionImpl.this.managedClose(); getActionQueue().afterTransactionCompletion( successful );
if ( autoCloseSessionEnabled ) {
managedClose();
}
}
else {
if (actionQueue.hasAfterTransactionActions()){
LOG.log( Logger.Level.DEBUG, "Session had after transaction actions that were not processed");
} }
} }
} }
); };
transactionCoordinator.addObserver( transactionObserver );
} }
loadQueryInfluencers = new LoadQueryInfluencers( factory ); loadQueryInfluencers = new LoadQueryInfluencers( factory );
@ -335,7 +351,11 @@ public final class SessionImpl extends AbstractSessionImpl implements EventSourc
return transactionCoordinator.close(); return transactionCoordinator.close();
} }
else { else {
return null; // ??? if ( !getActionQueue().hasAfterTransactionActions() ){
//remove the session as a transaction observer if it has no after transaction completion actions
transactionCoordinator.removeObserver( transactionObserver );
}
return null;
} }
} }
finally { finally {

View File

@ -191,7 +191,6 @@ public class SessionWithSharedConnectionTest extends BaseCoreFunctionalTestCase
@Test @Test
@TestForIssue( jiraKey = "HHH-7239" ) @TestForIssue( jiraKey = "HHH-7239" )
@FailureExpected(jiraKey = "HHH-7239" )
public void testSessionRemovedFromObserversOnClose() throws Exception { public void testSessionRemovedFromObserversOnClose() throws Exception {
Session session = sessionFactory().openSession(); Session session = sessionFactory().openSession();
session.getTransaction().begin(); session.getTransaction().begin();
@ -235,7 +234,6 @@ public class SessionWithSharedConnectionTest extends BaseCoreFunctionalTestCase
@Test @Test
@TestForIssue( jiraKey = "HHH-7239" ) @TestForIssue( jiraKey = "HHH-7239" )
@FailureExpected(jiraKey = "HHH-7239" )
public void testChildSessionCallsAfterTransactionAction() throws Exception { public void testChildSessionCallsAfterTransactionAction() throws Exception {
Session session = openSession(); Session session = openSession();
@ -276,7 +274,6 @@ public class SessionWithSharedConnectionTest extends BaseCoreFunctionalTestCase
@Test @Test
@TestForIssue( jiraKey = "HHH-7239" ) @TestForIssue( jiraKey = "HHH-7239" )
@FailureExpected(jiraKey = "HHH-7239" )
public void testChildSessionTwoTransactions() throws Exception { public void testChildSessionTwoTransactions() throws Exception {
Session session = openSession(); Session session = openSession();