HHH-8709 - SynchronizationCallbackCoordinator is calling Thread.currentThread way too often
This commit is contained in:
parent
fe6263936e
commit
8580b29161
|
@ -47,7 +47,8 @@ import org.hibernate.engine.transaction.spi.TransactionFactory;
|
|||
import org.hibernate.engine.transaction.spi.TransactionImplementor;
|
||||
import org.hibernate.engine.transaction.spi.TransactionObserver;
|
||||
import org.hibernate.engine.transaction.synchronization.internal.RegisteredSynchronization;
|
||||
import org.hibernate.engine.transaction.synchronization.internal.SynchronizationCallbackCoordinatorImpl;
|
||||
import org.hibernate.engine.transaction.synchronization.internal.SynchronizationCallbackCoordinatorNonTrackingImpl;
|
||||
import org.hibernate.engine.transaction.synchronization.internal.SynchronizationCallbackCoordinatorTrackingImpl;
|
||||
import org.hibernate.engine.transaction.synchronization.spi.SynchronizationCallbackCoordinator;
|
||||
import org.hibernate.internal.CoreMessageLogger;
|
||||
import org.hibernate.internal.util.collections.CollectionHelper;
|
||||
|
@ -74,7 +75,7 @@ public class TransactionCoordinatorImpl implements TransactionCoordinator {
|
|||
|
||||
private transient TransactionImplementor currentHibernateTransaction;
|
||||
|
||||
private transient SynchronizationCallbackCoordinatorImpl callbackCoordinator;
|
||||
private transient SynchronizationCallbackCoordinator callbackCoordinator;
|
||||
|
||||
private transient boolean open = true;
|
||||
private transient boolean synchronizationRegistered;
|
||||
|
@ -261,7 +262,9 @@ public class TransactionCoordinatorImpl implements TransactionCoordinator {
|
|||
@Override
|
||||
public SynchronizationCallbackCoordinator getSynchronizationCallbackCoordinator() {
|
||||
if ( callbackCoordinator == null ) {
|
||||
callbackCoordinator = new SynchronizationCallbackCoordinatorImpl( this );
|
||||
callbackCoordinator = transactionEnvironment.getSessionFactory().getSettings().isJtaTrackByThread()
|
||||
? new SynchronizationCallbackCoordinatorTrackingImpl( this )
|
||||
: new SynchronizationCallbackCoordinatorNonTrackingImpl( this );
|
||||
}
|
||||
return callbackCoordinator;
|
||||
}
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
/*
|
||||
* Hibernate, Relational Persistence for Idiomatic Java
|
||||
*
|
||||
* Copyright (c) 2011, Red Hat Inc. or third-party contributors as
|
||||
* Copyright (c) 2013, Red Hat Inc. or third-party contributors as
|
||||
* indicated by the @author tags or express copyright attribution
|
||||
* statements applied by the authors. All third-party contributions are
|
||||
* distributed under license by Red Hat Inc.
|
||||
|
@ -25,9 +25,9 @@ package org.hibernate.engine.transaction.synchronization.internal;
|
|||
|
||||
import javax.transaction.SystemException;
|
||||
|
||||
import org.hibernate.HibernateException;
|
||||
import org.jboss.logging.Logger;
|
||||
|
||||
import org.hibernate.TransactionException;
|
||||
import org.hibernate.cfg.Settings;
|
||||
import org.hibernate.engine.transaction.internal.jta.JtaStatusHelper;
|
||||
import org.hibernate.engine.transaction.spi.TransactionContext;
|
||||
import org.hibernate.engine.transaction.spi.TransactionCoordinator;
|
||||
|
@ -35,35 +35,27 @@ import org.hibernate.engine.transaction.synchronization.spi.AfterCompletionActio
|
|||
import org.hibernate.engine.transaction.synchronization.spi.ExceptionMapper;
|
||||
import org.hibernate.engine.transaction.synchronization.spi.ManagedFlushChecker;
|
||||
import org.hibernate.engine.transaction.synchronization.spi.SynchronizationCallbackCoordinator;
|
||||
import org.hibernate.internal.CoreLogging;
|
||||
import org.hibernate.internal.CoreMessageLogger;
|
||||
import org.jboss.logging.Logger;
|
||||
|
||||
/**
|
||||
* Manages callbacks from the {@link javax.transaction.Synchronization} registered by Hibernate.
|
||||
*
|
||||
* @author Steve Ebersole
|
||||
* @author Brett Meyer
|
||||
*/
|
||||
public class SynchronizationCallbackCoordinatorImpl implements SynchronizationCallbackCoordinator {
|
||||
|
||||
private static final CoreMessageLogger LOG = Logger.getMessageLogger( CoreMessageLogger.class,
|
||||
SynchronizationCallbackCoordinatorImpl.class.getName() );
|
||||
public class SynchronizationCallbackCoordinatorNonTrackingImpl implements SynchronizationCallbackCoordinator {
|
||||
private static final CoreMessageLogger LOG = CoreLogging.messageLogger(
|
||||
SynchronizationCallbackCoordinatorNonTrackingImpl.class
|
||||
);
|
||||
|
||||
private final TransactionCoordinator transactionCoordinator;
|
||||
private final Settings settings;
|
||||
|
||||
private ManagedFlushChecker managedFlushChecker;
|
||||
private AfterCompletionAction afterCompletionAction;
|
||||
private ExceptionMapper exceptionMapper;
|
||||
|
||||
private volatile long registrationThreadId;
|
||||
private final int NO_STATUS = -1;
|
||||
private volatile int delayedCompletionHandlingStatus;
|
||||
|
||||
public SynchronizationCallbackCoordinatorImpl(TransactionCoordinator transactionCoordinator) {
|
||||
public SynchronizationCallbackCoordinatorNonTrackingImpl(TransactionCoordinator transactionCoordinator) {
|
||||
this.transactionCoordinator = transactionCoordinator;
|
||||
this.settings = transactionCoordinator.getTransactionContext()
|
||||
.getTransactionEnvironment().getSessionFactory().getSettings();
|
||||
reset();
|
||||
pulse();
|
||||
}
|
||||
|
@ -72,7 +64,10 @@ public class SynchronizationCallbackCoordinatorImpl implements SynchronizationCa
|
|||
managedFlushChecker = STANDARD_MANAGED_FLUSH_CHECKER;
|
||||
exceptionMapper = STANDARD_EXCEPTION_MAPPER;
|
||||
afterCompletionAction = STANDARD_AFTER_COMPLETION_ACTION;
|
||||
delayedCompletionHandlingStatus = NO_STATUS;
|
||||
}
|
||||
|
||||
private TransactionContext transactionContext() {
|
||||
return transactionCoordinator.getTransactionContext();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -92,6 +87,7 @@ public class SynchronizationCallbackCoordinatorImpl implements SynchronizationCa
|
|||
|
||||
// sync callbacks ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||
|
||||
@Override
|
||||
public void beforeCompletion() {
|
||||
LOG.trace( "Transaction before completion callback" );
|
||||
|
||||
|
@ -131,38 +127,12 @@ public class SynchronizationCallbackCoordinatorImpl implements SynchronizationCa
|
|||
transactionCoordinator.setRollbackOnly();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void afterCompletion(int status) {
|
||||
if ( settings.isJtaTrackByThread() && !isRegistrationThread()
|
||||
&& JtaStatusHelper.isRollback( status ) ) {
|
||||
// The transaction was rolled back by another thread -- not the
|
||||
// original application. Examples of this include a JTA transaction
|
||||
// timeout getting cleaned up by a reaper thread. If this happens,
|
||||
// afterCompletion must be handled by the original thread in order
|
||||
// to prevent non-threadsafe session use. Set the flag here and
|
||||
// check for it in SessionImpl. See HHH-7910.
|
||||
LOG.warnv( "Transaction afterCompletion called by a background thread! Delaying action until the original thread can handle it. [status={0}]", status );
|
||||
delayedCompletionHandlingStatus = status;
|
||||
}
|
||||
else {
|
||||
doAfterCompletion( status );
|
||||
}
|
||||
}
|
||||
|
||||
public void pulse() {
|
||||
if ( settings.isJtaTrackByThread() ) {
|
||||
registrationThreadId = Thread.currentThread().getId();
|
||||
}
|
||||
doAfterCompletion( status );
|
||||
}
|
||||
|
||||
public void delayedAfterCompletion() {
|
||||
if ( delayedCompletionHandlingStatus != NO_STATUS ) {
|
||||
doAfterCompletion( delayedCompletionHandlingStatus );
|
||||
delayedCompletionHandlingStatus = NO_STATUS;
|
||||
throw new HibernateException("Transaction was rolled back in a different thread!");
|
||||
}
|
||||
}
|
||||
|
||||
private void doAfterCompletion(int status) {
|
||||
protected void doAfterCompletion(int status) {
|
||||
LOG.tracef( "Starting transaction afterCompletion callback [status=%s]", status );
|
||||
if ( !transactionCoordinator.isActive() ) {
|
||||
return;
|
||||
|
@ -181,12 +151,12 @@ public class SynchronizationCallbackCoordinatorImpl implements SynchronizationCa
|
|||
}
|
||||
}
|
||||
|
||||
private boolean isRegistrationThread() {
|
||||
return Thread.currentThread().getId() == registrationThreadId;
|
||||
@Override
|
||||
public void pulse() {
|
||||
}
|
||||
|
||||
private TransactionContext transactionContext() {
|
||||
return transactionCoordinator.getTransactionContext();
|
||||
@Override
|
||||
public void processAnyDelayedAfterCompletion() {
|
||||
}
|
||||
|
||||
private static final ManagedFlushChecker STANDARD_MANAGED_FLUSH_CHECKER = new ManagedFlushChecker() {
|
||||
|
@ -200,12 +170,14 @@ public class SynchronizationCallbackCoordinatorImpl implements SynchronizationCa
|
|||
};
|
||||
|
||||
private static final ExceptionMapper STANDARD_EXCEPTION_MAPPER = new ExceptionMapper() {
|
||||
@Override
|
||||
public RuntimeException mapStatusCheckFailure(String message, SystemException systemException) {
|
||||
LOG.error( LOG.unableToDetermineTransactionStatus(), systemException );
|
||||
return new TransactionException( "could not determine transaction status in beforeCompletion()",
|
||||
systemException );
|
||||
}
|
||||
|
||||
@Override
|
||||
public RuntimeException mapManagedFlushFailure(String message, RuntimeException failure) {
|
||||
LOG.unableToPerformManagedFlush( failure.getMessage() );
|
||||
return failure;
|
|
@ -0,0 +1,110 @@
|
|||
/*
|
||||
* Hibernate, Relational Persistence for Idiomatic Java
|
||||
*
|
||||
* Copyright (c) 2011, Red Hat Inc. or third-party contributors as
|
||||
* indicated by the @author tags or express copyright attribution
|
||||
* statements applied by the authors. All third-party contributions are
|
||||
* distributed under license by Red Hat Inc.
|
||||
*
|
||||
* This copyrighted material is made available to anyone wishing to use, modify,
|
||||
* copy, or redistribute it subject to the terms and conditions of the GNU
|
||||
* Lesser General Public License, as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
|
||||
* or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License
|
||||
* for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU Lesser General Public License
|
||||
* along with this distribution; if not, write to:
|
||||
* Free Software Foundation, Inc.
|
||||
* 51 Franklin Street, Fifth Floor
|
||||
* Boston, MA 02110-1301 USA
|
||||
*/
|
||||
package org.hibernate.engine.transaction.synchronization.internal;
|
||||
|
||||
import org.hibernate.HibernateException;
|
||||
import org.hibernate.engine.transaction.internal.jta.JtaStatusHelper;
|
||||
import org.hibernate.engine.transaction.spi.TransactionCoordinator;
|
||||
import org.hibernate.internal.CoreLogging;
|
||||
import org.hibernate.internal.CoreMessageLogger;
|
||||
|
||||
/**
|
||||
* Extension of SynchronizationCallbackCoordinatorNonTrackingImpl that adds checking of whether a rollback comes from
|
||||
* a thread other than the application thread (thread used to register the Synchronization)
|
||||
*
|
||||
* @author Steve Ebersole
|
||||
* @author Brett Meyer
|
||||
*/
|
||||
public class SynchronizationCallbackCoordinatorTrackingImpl extends SynchronizationCallbackCoordinatorNonTrackingImpl {
|
||||
private static final CoreMessageLogger log = CoreLogging.messageLogger( SynchronizationCallbackCoordinatorTrackingImpl.class );
|
||||
|
||||
// magic numbers :(
|
||||
private static final int NO_STATUS = -1;
|
||||
private static final long NO_THREAD_ID = Long.MIN_VALUE;
|
||||
|
||||
private volatile long registrationThreadId;
|
||||
private volatile int delayedCompletionHandlingStatus;
|
||||
|
||||
public SynchronizationCallbackCoordinatorTrackingImpl(TransactionCoordinator transactionCoordinator) {
|
||||
// super ctor calls reset() followed by pulse()
|
||||
super( transactionCoordinator );
|
||||
}
|
||||
|
||||
@Override
|
||||
public void reset() {
|
||||
super.reset();
|
||||
// NOTE : reset is typically called:
|
||||
// 1) on initialization, and
|
||||
// 2) after "after completion" handling is finished.
|
||||
//
|
||||
// Here we use that to "clear out" all 'delayed after-completion" state. The registrationThreadId will
|
||||
// "lazily" be re-populated on the next pulse call to allow for the potential of the next Session transaction
|
||||
// occurring on a different thread (though that transaction would need to completely operate on that thread).
|
||||
delayedCompletionHandlingStatus = NO_STATUS;
|
||||
registrationThreadId = NO_THREAD_ID;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void afterCompletion(int status) {
|
||||
// The whole concept of "tracking" comes down to this code block..
|
||||
// Essentially we need to see if we can process the callback immediately. So here we check whether the
|
||||
// current call is happening on the same thread as the thread under which we registered the Synchronization.
|
||||
// As far as we know, this can only ever happen in the rollback case where the transaction had been rolled
|
||||
// back on a separate "reaper" thread. Since we know the transaction status and that check is not as heavy
|
||||
// as accessing the current thread, we check that first
|
||||
if ( JtaStatusHelper.isRollback( status ) ) {
|
||||
// we are processing a rollback, see if it is the same thread
|
||||
final long currentThreadId = Thread.currentThread().getId();
|
||||
final boolean isRegistrationThread = currentThreadId == registrationThreadId;
|
||||
if ( ! isRegistrationThread ) {
|
||||
// so we do have the condition of a rollback initiated from a separate thread. Set the flag here and
|
||||
// check for it in SessionImpl. See HHH-7910.
|
||||
delayedCompletionHandlingStatus = status;
|
||||
|
||||
log.rollbackFromBackgroundThread( status );
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
// otherwise, do the callback immediately
|
||||
doAfterCompletion( status );
|
||||
}
|
||||
|
||||
@Override
|
||||
public void pulse() {
|
||||
// If this is the first call to pulse since an earlier call to reset, capture the current thread id
|
||||
if ( registrationThreadId == NO_THREAD_ID ) {
|
||||
registrationThreadId = Thread.currentThread().getId();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void processAnyDelayedAfterCompletion() {
|
||||
if ( delayedCompletionHandlingStatus != NO_STATUS ) {
|
||||
doAfterCompletion( delayedCompletionHandlingStatus );
|
||||
delayedCompletionHandlingStatus = NO_STATUS;
|
||||
throw new HibernateException("Transaction was rolled back in a different thread!");
|
||||
}
|
||||
}
|
||||
}
|
|
@ -28,10 +28,10 @@ import javax.transaction.Synchronization;
|
|||
/**
|
||||
* @author Steve Ebersole
|
||||
*/
|
||||
public interface SynchronizationCallbackCoordinator extends Synchronization{
|
||||
public interface SynchronizationCallbackCoordinator extends Synchronization {
|
||||
public void setExceptionMapper(ExceptionMapper exceptionMapper);
|
||||
public void setManagedFlushChecker(ManagedFlushChecker managedFlushChecker);
|
||||
public void setAfterCompletionAction(AfterCompletionAction afterCompletionAction);
|
||||
public void pulse();
|
||||
public void delayedAfterCompletion();
|
||||
public void setExceptionMapper(ExceptionMapper exceptionMapper);
|
||||
public void processAnyDelayedAfterCompletion();
|
||||
}
|
||||
|
|
|
@ -1637,4 +1637,13 @@ public interface CoreMessageLogger extends BasicLogger {
|
|||
value = "Encountered request for Service by non-primary service role [%s -> %s]; please update usage"
|
||||
)
|
||||
void alternateServiceRole(String requestedRole, String targetRole);
|
||||
|
||||
@LogMessage(level = WARN)
|
||||
@Message(
|
||||
id = 451,
|
||||
value = "Transaction afterCompletion called by a background thread; " +
|
||||
"delaying afterCompletion processing until the original thread can handle it. [status=%s]"
|
||||
)
|
||||
void rollbackFromBackgroundThread(int status);
|
||||
|
||||
}
|
||||
|
|
|
@ -23,8 +23,6 @@
|
|||
*/
|
||||
package org.hibernate.internal;
|
||||
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.ObjectInputStream;
|
||||
|
@ -158,7 +156,6 @@ import org.hibernate.proxy.HibernateProxy;
|
|||
import org.hibernate.proxy.LazyInitializer;
|
||||
import org.hibernate.stat.SessionStatistics;
|
||||
import org.hibernate.stat.internal.SessionStatisticsImpl;
|
||||
import org.hibernate.type.SerializationException;
|
||||
import org.hibernate.type.Type;
|
||||
import org.jboss.logging.Logger;
|
||||
|
||||
|
@ -644,7 +641,7 @@ public final class SessionImpl extends AbstractSessionImpl implements EventSourc
|
|||
}
|
||||
|
||||
private void delayedAfterCompletion() {
|
||||
transactionCoordinator.getSynchronizationCallbackCoordinator().delayedAfterCompletion();
|
||||
transactionCoordinator.getSynchronizationCallbackCoordinator().processAnyDelayedAfterCompletion();
|
||||
}
|
||||
|
||||
// saveOrUpdate() operations ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||
|
@ -701,7 +698,7 @@ public final class SessionImpl extends AbstractSessionImpl implements EventSourc
|
|||
// update() operations ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||
|
||||
public void update(Object obj) throws HibernateException {
|
||||
update(null, obj);
|
||||
update( null, obj );
|
||||
}
|
||||
|
||||
public void update(String entityName, Object object) throws HibernateException {
|
||||
|
@ -730,7 +727,7 @@ public final class SessionImpl extends AbstractSessionImpl implements EventSourc
|
|||
}
|
||||
|
||||
public void lock(Object object, LockMode lockMode) throws HibernateException {
|
||||
fireLock( new LockEvent(object, lockMode, this) );
|
||||
fireLock( new LockEvent( object, lockMode, this ) );
|
||||
}
|
||||
|
||||
private void fireLock(String entityName, Object object, LockOptions options) {
|
||||
|
@ -952,7 +949,7 @@ public final class SessionImpl extends AbstractSessionImpl implements EventSourc
|
|||
? LoadEventListener.INTERNAL_LOAD_EAGER
|
||||
: LoadEventListener.INTERNAL_LOAD_LAZY;
|
||||
LoadEvent event = new LoadEvent(id, entityName, true, this);
|
||||
fireLoad(event, type);
|
||||
fireLoad( event, type );
|
||||
if ( !nullable ) {
|
||||
UnresolvableObjectException.throwIfNull( event.getResult(), id, entityName );
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue