diff --git a/hibernate-core/src/main/java/org/hibernate/engine/transaction/internal/TransactionCoordinatorImpl.java b/hibernate-core/src/main/java/org/hibernate/engine/transaction/internal/TransactionCoordinatorImpl.java index 03bc4e6c51..896ef34caa 100644 --- a/hibernate-core/src/main/java/org/hibernate/engine/transaction/internal/TransactionCoordinatorImpl.java +++ b/hibernate-core/src/main/java/org/hibernate/engine/transaction/internal/TransactionCoordinatorImpl.java @@ -47,7 +47,8 @@ import org.hibernate.engine.transaction.spi.TransactionImplementor; import org.hibernate.engine.transaction.spi.TransactionObserver; import org.hibernate.engine.transaction.synchronization.internal.RegisteredSynchronization; -import org.hibernate.engine.transaction.synchronization.internal.SynchronizationCallbackCoordinatorImpl; +import org.hibernate.engine.transaction.synchronization.internal.SynchronizationCallbackCoordinatorNonTrackingImpl; +import org.hibernate.engine.transaction.synchronization.internal.SynchronizationCallbackCoordinatorTrackingImpl; import org.hibernate.engine.transaction.synchronization.spi.SynchronizationCallbackCoordinator; import org.hibernate.internal.CoreMessageLogger; import org.hibernate.internal.util.collections.CollectionHelper; @@ -74,7 +75,7 @@ public class TransactionCoordinatorImpl implements TransactionCoordinator { private transient TransactionImplementor currentHibernateTransaction; - private transient SynchronizationCallbackCoordinatorImpl callbackCoordinator; + private transient SynchronizationCallbackCoordinator callbackCoordinator; private transient boolean open = true; private transient boolean synchronizationRegistered; @@ -261,7 +262,9 @@ private void attemptToRegisterJtaSync() { @Override public SynchronizationCallbackCoordinator getSynchronizationCallbackCoordinator() { if ( callbackCoordinator == null ) { - callbackCoordinator = new SynchronizationCallbackCoordinatorImpl( this ); + callbackCoordinator = transactionEnvironment.getSessionFactory().getSettings().isJtaTrackByThread() + ? new SynchronizationCallbackCoordinatorTrackingImpl( this ) + : new SynchronizationCallbackCoordinatorNonTrackingImpl( this ); } return callbackCoordinator; } diff --git a/hibernate-core/src/main/java/org/hibernate/engine/transaction/synchronization/internal/SynchronizationCallbackCoordinatorImpl.java b/hibernate-core/src/main/java/org/hibernate/engine/transaction/synchronization/internal/SynchronizationCallbackCoordinatorNonTrackingImpl.java similarity index 73% rename from hibernate-core/src/main/java/org/hibernate/engine/transaction/synchronization/internal/SynchronizationCallbackCoordinatorImpl.java rename to hibernate-core/src/main/java/org/hibernate/engine/transaction/synchronization/internal/SynchronizationCallbackCoordinatorNonTrackingImpl.java index 4912484078..6467fe8120 100644 --- a/hibernate-core/src/main/java/org/hibernate/engine/transaction/synchronization/internal/SynchronizationCallbackCoordinatorImpl.java +++ b/hibernate-core/src/main/java/org/hibernate/engine/transaction/synchronization/internal/SynchronizationCallbackCoordinatorNonTrackingImpl.java @@ -1,7 +1,7 @@ /* * Hibernate, Relational Persistence for Idiomatic Java * - * Copyright (c) 2011, Red Hat Inc. or third-party contributors as + * Copyright (c) 2013, Red Hat Inc. or third-party contributors as * indicated by the @author tags or express copyright attribution * statements applied by the authors. All third-party contributions are * distributed under license by Red Hat Inc. @@ -25,9 +25,9 @@ import javax.transaction.SystemException; -import org.hibernate.HibernateException; +import org.jboss.logging.Logger; + import org.hibernate.TransactionException; -import org.hibernate.cfg.Settings; import org.hibernate.engine.transaction.internal.jta.JtaStatusHelper; import org.hibernate.engine.transaction.spi.TransactionContext; import org.hibernate.engine.transaction.spi.TransactionCoordinator; @@ -35,35 +35,27 @@ import org.hibernate.engine.transaction.synchronization.spi.ExceptionMapper; import org.hibernate.engine.transaction.synchronization.spi.ManagedFlushChecker; import org.hibernate.engine.transaction.synchronization.spi.SynchronizationCallbackCoordinator; +import org.hibernate.internal.CoreLogging; import org.hibernate.internal.CoreMessageLogger; -import org.jboss.logging.Logger; /** * Manages callbacks from the {@link javax.transaction.Synchronization} registered by Hibernate. * * @author Steve Ebersole - * @author Brett Meyer */ -public class SynchronizationCallbackCoordinatorImpl implements SynchronizationCallbackCoordinator { - - private static final CoreMessageLogger LOG = Logger.getMessageLogger( CoreMessageLogger.class, - SynchronizationCallbackCoordinatorImpl.class.getName() ); +public class SynchronizationCallbackCoordinatorNonTrackingImpl implements SynchronizationCallbackCoordinator { + private static final CoreMessageLogger LOG = CoreLogging.messageLogger( + SynchronizationCallbackCoordinatorNonTrackingImpl.class + ); private final TransactionCoordinator transactionCoordinator; - private final Settings settings; private ManagedFlushChecker managedFlushChecker; private AfterCompletionAction afterCompletionAction; private ExceptionMapper exceptionMapper; - private volatile long registrationThreadId; - private final int NO_STATUS = -1; - private volatile int delayedCompletionHandlingStatus; - - public SynchronizationCallbackCoordinatorImpl(TransactionCoordinator transactionCoordinator) { + public SynchronizationCallbackCoordinatorNonTrackingImpl(TransactionCoordinator transactionCoordinator) { this.transactionCoordinator = transactionCoordinator; - this.settings = transactionCoordinator.getTransactionContext() - .getTransactionEnvironment().getSessionFactory().getSettings(); reset(); pulse(); } @@ -72,7 +64,10 @@ public void reset() { managedFlushChecker = STANDARD_MANAGED_FLUSH_CHECKER; exceptionMapper = STANDARD_EXCEPTION_MAPPER; afterCompletionAction = STANDARD_AFTER_COMPLETION_ACTION; - delayedCompletionHandlingStatus = NO_STATUS; + } + + private TransactionContext transactionContext() { + return transactionCoordinator.getTransactionContext(); } @Override @@ -92,6 +87,7 @@ public void setAfterCompletionAction(AfterCompletionAction afterCompletionAction // sync callbacks ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + @Override public void beforeCompletion() { LOG.trace( "Transaction before completion callback" ); @@ -131,38 +127,12 @@ private void setRollbackOnly() { transactionCoordinator.setRollbackOnly(); } + @Override public void afterCompletion(int status) { - if ( settings.isJtaTrackByThread() && !isRegistrationThread() - && JtaStatusHelper.isRollback( status ) ) { - // The transaction was rolled back by another thread -- not the - // original application. Examples of this include a JTA transaction - // timeout getting cleaned up by a reaper thread. If this happens, - // afterCompletion must be handled by the original thread in order - // to prevent non-threadsafe session use. Set the flag here and - // check for it in SessionImpl. See HHH-7910. - LOG.warnv( "Transaction afterCompletion called by a background thread! Delaying action until the original thread can handle it. [status={0}]", status ); - delayedCompletionHandlingStatus = status; - } - else { - doAfterCompletion( status ); - } - } - - public void pulse() { - if ( settings.isJtaTrackByThread() ) { - registrationThreadId = Thread.currentThread().getId(); - } + doAfterCompletion( status ); } - public void delayedAfterCompletion() { - if ( delayedCompletionHandlingStatus != NO_STATUS ) { - doAfterCompletion( delayedCompletionHandlingStatus ); - delayedCompletionHandlingStatus = NO_STATUS; - throw new HibernateException("Transaction was rolled back in a different thread!"); - } - } - - private void doAfterCompletion(int status) { + protected void doAfterCompletion(int status) { LOG.tracef( "Starting transaction afterCompletion callback [status=%s]", status ); if ( !transactionCoordinator.isActive() ) { return; @@ -181,12 +151,12 @@ private void doAfterCompletion(int status) { } } - private boolean isRegistrationThread() { - return Thread.currentThread().getId() == registrationThreadId; + @Override + public void pulse() { } - private TransactionContext transactionContext() { - return transactionCoordinator.getTransactionContext(); + @Override + public void processAnyDelayedAfterCompletion() { } private static final ManagedFlushChecker STANDARD_MANAGED_FLUSH_CHECKER = new ManagedFlushChecker() { @@ -200,12 +170,14 @@ public boolean shouldDoManagedFlush(TransactionCoordinator coordinator, int jtaS }; private static final ExceptionMapper STANDARD_EXCEPTION_MAPPER = new ExceptionMapper() { + @Override public RuntimeException mapStatusCheckFailure(String message, SystemException systemException) { LOG.error( LOG.unableToDetermineTransactionStatus(), systemException ); return new TransactionException( "could not determine transaction status in beforeCompletion()", systemException ); } + @Override public RuntimeException mapManagedFlushFailure(String message, RuntimeException failure) { LOG.unableToPerformManagedFlush( failure.getMessage() ); return failure; diff --git a/hibernate-core/src/main/java/org/hibernate/engine/transaction/synchronization/internal/SynchronizationCallbackCoordinatorTrackingImpl.java b/hibernate-core/src/main/java/org/hibernate/engine/transaction/synchronization/internal/SynchronizationCallbackCoordinatorTrackingImpl.java new file mode 100644 index 0000000000..306361b054 --- /dev/null +++ b/hibernate-core/src/main/java/org/hibernate/engine/transaction/synchronization/internal/SynchronizationCallbackCoordinatorTrackingImpl.java @@ -0,0 +1,110 @@ +/* + * Hibernate, Relational Persistence for Idiomatic Java + * + * Copyright (c) 2011, Red Hat Inc. or third-party contributors as + * indicated by the @author tags or express copyright attribution + * statements applied by the authors. All third-party contributions are + * distributed under license by Red Hat Inc. + * + * This copyrighted material is made available to anyone wishing to use, modify, + * copy, or redistribute it subject to the terms and conditions of the GNU + * Lesser General Public License, as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY + * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License + * for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this distribution; if not, write to: + * Free Software Foundation, Inc. + * 51 Franklin Street, Fifth Floor + * Boston, MA 02110-1301 USA + */ +package org.hibernate.engine.transaction.synchronization.internal; + +import org.hibernate.HibernateException; +import org.hibernate.engine.transaction.internal.jta.JtaStatusHelper; +import org.hibernate.engine.transaction.spi.TransactionCoordinator; +import org.hibernate.internal.CoreLogging; +import org.hibernate.internal.CoreMessageLogger; + +/** + * Extension of SynchronizationCallbackCoordinatorNonTrackingImpl that adds checking of whether a rollback comes from + * a thread other than the application thread (thread used to register the Synchronization) + * + * @author Steve Ebersole + * @author Brett Meyer + */ +public class SynchronizationCallbackCoordinatorTrackingImpl extends SynchronizationCallbackCoordinatorNonTrackingImpl { + private static final CoreMessageLogger log = CoreLogging.messageLogger( SynchronizationCallbackCoordinatorTrackingImpl.class ); + + // magic numbers :( + private static final int NO_STATUS = -1; + private static final long NO_THREAD_ID = Long.MIN_VALUE; + + private volatile long registrationThreadId; + private volatile int delayedCompletionHandlingStatus; + + public SynchronizationCallbackCoordinatorTrackingImpl(TransactionCoordinator transactionCoordinator) { + // super ctor calls reset() followed by pulse() + super( transactionCoordinator ); + } + + @Override + public void reset() { + super.reset(); + // NOTE : reset is typically called: + // 1) on initialization, and + // 2) after "after completion" handling is finished. + // + // Here we use that to "clear out" all 'delayed after-completion" state. The registrationThreadId will + // "lazily" be re-populated on the next pulse call to allow for the potential of the next Session transaction + // occurring on a different thread (though that transaction would need to completely operate on that thread). + delayedCompletionHandlingStatus = NO_STATUS; + registrationThreadId = NO_THREAD_ID; + } + + @Override + public void afterCompletion(int status) { + // The whole concept of "tracking" comes down to this code block.. + // Essentially we need to see if we can process the callback immediately. So here we check whether the + // current call is happening on the same thread as the thread under which we registered the Synchronization. + // As far as we know, this can only ever happen in the rollback case where the transaction had been rolled + // back on a separate "reaper" thread. Since we know the transaction status and that check is not as heavy + // as accessing the current thread, we check that first + if ( JtaStatusHelper.isRollback( status ) ) { + // we are processing a rollback, see if it is the same thread + final long currentThreadId = Thread.currentThread().getId(); + final boolean isRegistrationThread = currentThreadId == registrationThreadId; + if ( ! isRegistrationThread ) { + // so we do have the condition of a rollback initiated from a separate thread. Set the flag here and + // check for it in SessionImpl. See HHH-7910. + delayedCompletionHandlingStatus = status; + + log.rollbackFromBackgroundThread( status ); + return; + } + } + + // otherwise, do the callback immediately + doAfterCompletion( status ); + } + + @Override + public void pulse() { + // If this is the first call to pulse since an earlier call to reset, capture the current thread id + if ( registrationThreadId == NO_THREAD_ID ) { + registrationThreadId = Thread.currentThread().getId(); + } + } + + @Override + public void processAnyDelayedAfterCompletion() { + if ( delayedCompletionHandlingStatus != NO_STATUS ) { + doAfterCompletion( delayedCompletionHandlingStatus ); + delayedCompletionHandlingStatus = NO_STATUS; + throw new HibernateException("Transaction was rolled back in a different thread!"); + } + } +} diff --git a/hibernate-core/src/main/java/org/hibernate/engine/transaction/synchronization/spi/SynchronizationCallbackCoordinator.java b/hibernate-core/src/main/java/org/hibernate/engine/transaction/synchronization/spi/SynchronizationCallbackCoordinator.java index 723aa9d95a..8f67b6391c 100644 --- a/hibernate-core/src/main/java/org/hibernate/engine/transaction/synchronization/spi/SynchronizationCallbackCoordinator.java +++ b/hibernate-core/src/main/java/org/hibernate/engine/transaction/synchronization/spi/SynchronizationCallbackCoordinator.java @@ -28,10 +28,10 @@ /** * @author Steve Ebersole */ -public interface SynchronizationCallbackCoordinator extends Synchronization{ +public interface SynchronizationCallbackCoordinator extends Synchronization { + public void setExceptionMapper(ExceptionMapper exceptionMapper); public void setManagedFlushChecker(ManagedFlushChecker managedFlushChecker); public void setAfterCompletionAction(AfterCompletionAction afterCompletionAction); public void pulse(); - public void delayedAfterCompletion(); - public void setExceptionMapper(ExceptionMapper exceptionMapper); + public void processAnyDelayedAfterCompletion(); } diff --git a/hibernate-core/src/main/java/org/hibernate/internal/CoreMessageLogger.java b/hibernate-core/src/main/java/org/hibernate/internal/CoreMessageLogger.java index 956fa3ec35..e5156dbf47 100644 --- a/hibernate-core/src/main/java/org/hibernate/internal/CoreMessageLogger.java +++ b/hibernate-core/src/main/java/org/hibernate/internal/CoreMessageLogger.java @@ -1637,4 +1637,13 @@ void cannotResolveNonNullableTransientDependencies(String transientEntityString, value = "Encountered request for Service by non-primary service role [%s -> %s]; please update usage" ) void alternateServiceRole(String requestedRole, String targetRole); + + @LogMessage(level = WARN) + @Message( + id = 451, + value = "Transaction afterCompletion called by a background thread; " + + "delaying afterCompletion processing until the original thread can handle it. [status=%s]" + ) + void rollbackFromBackgroundThread(int status); + } diff --git a/hibernate-core/src/main/java/org/hibernate/internal/SessionImpl.java b/hibernate-core/src/main/java/org/hibernate/internal/SessionImpl.java index b685694614..06a7cd7678 100644 --- a/hibernate-core/src/main/java/org/hibernate/internal/SessionImpl.java +++ b/hibernate-core/src/main/java/org/hibernate/internal/SessionImpl.java @@ -23,8 +23,6 @@ */ package org.hibernate.internal; -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.ObjectInputStream; @@ -158,7 +156,6 @@ import org.hibernate.proxy.LazyInitializer; import org.hibernate.stat.SessionStatistics; import org.hibernate.stat.internal.SessionStatisticsImpl; -import org.hibernate.type.SerializationException; import org.hibernate.type.Type; import org.jboss.logging.Logger; @@ -644,7 +641,7 @@ private void checkNoUnresolvedActionsAfterOperation() { } private void delayedAfterCompletion() { - transactionCoordinator.getSynchronizationCallbackCoordinator().delayedAfterCompletion(); + transactionCoordinator.getSynchronizationCallbackCoordinator().processAnyDelayedAfterCompletion(); } // saveOrUpdate() operations ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ @@ -701,7 +698,7 @@ private Serializable fireSave(SaveOrUpdateEvent event) { // update() operations ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ public void update(Object obj) throws HibernateException { - update(null, obj); + update( null, obj ); } public void update(String entityName, Object object) throws HibernateException { @@ -730,7 +727,7 @@ public LockRequest buildLockRequest(LockOptions lockOptions) { } public void lock(Object object, LockMode lockMode) throws HibernateException { - fireLock( new LockEvent(object, lockMode, this) ); + fireLock( new LockEvent( object, lockMode, this ) ); } private void fireLock(String entityName, Object object, LockOptions options) { @@ -952,7 +949,7 @@ public Object internalLoad(String entityName, Serializable id, boolean eager, bo ? LoadEventListener.INTERNAL_LOAD_EAGER : LoadEventListener.INTERNAL_LOAD_LAZY; LoadEvent event = new LoadEvent(id, entityName, true, this); - fireLoad(event, type); + fireLoad( event, type ); if ( !nullable ) { UnresolvableObjectException.throwIfNull( event.getResult(), id, entityName ); }