diff --git a/hibernate-infinispan/src/main/java/org/hibernate/cache/infinispan/access/PutFromLoadValidator.java b/hibernate-infinispan/src/main/java/org/hibernate/cache/infinispan/access/PutFromLoadValidator.java index 0229a85b1d..18801f6076 100644 --- a/hibernate-infinispan/src/main/java/org/hibernate/cache/infinispan/access/PutFromLoadValidator.java +++ b/hibernate-infinispan/src/main/java/org/hibernate/cache/infinispan/access/PutFromLoadValidator.java @@ -10,13 +10,13 @@ import javax.transaction.SystemException; import javax.transaction.Transaction; import javax.transaction.TransactionManager; import java.util.HashMap; -import java.util.LinkedList; -import java.util.List; +import java.util.HashSet; +import java.util.Iterator; import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; +import java.util.Set; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.Lock; +import java.util.concurrent.atomic.AtomicLongFieldUpdater; import java.util.concurrent.locks.ReentrantLock; import org.hibernate.cache.CacheException; @@ -41,9 +41,9 @@ import org.infinispan.manager.EmbeddedCacheManager; *
  • Call {@link #registerPendingPut(Object)}
  • *
  • Read the database
  • *
  • Call {@link #acquirePutFromLoadLock(Object)} - *
  • if above returns false, the thread should not cache the data; - * only if above returns true, put data in the cache and...
  • - *
  • then call {@link #releasePutFromLoadLock(Object)}
  • + *
  • if above returns null, the thread should not cache the data; + * only if above returns instance of AcquiredLock, put data in the cache and...
  • + *
  • then call {@link #releasePutFromLoadLock(Object, Lock)}
  • * *

    *

    @@ -53,15 +53,19 @@ import org.infinispan.manager.EmbeddedCacheManager; * call *

    *

    + * After transaction commit (when the DB is updated) {@link #endInvalidatingKey(Object)} should + * be called in order to allow further attempts to cache entry. *

    *

    *

    * This class also supports the concept of "naked puts", which are calls to - * {@link #acquirePutFromLoadLock(Object)} without a preceding {@link #registerPendingPut(Object)} - * call. + * {@link #acquirePutFromLoadLock(Object)} without a preceding {@link #registerPendingPut(Object)}. + * Besides not acquiring lock in {@link #registerPendingPut(Object)} this can happen when collection + * elements are loaded after the collection has not been found in the cache, where the elements + * don't have their own table but can be listed as 'select ... from Element where collection_id = ...'. *

    * * @author Brian Stansberry @@ -89,26 +93,15 @@ public class PutFromLoadValidator { */ private final ConcurrentMap pendingPuts; - private final ConcurrentMap recentRemovals = new ConcurrentHashMap(); /** - * List of recent removals. Used to ensure we don't leak memory via the recentRemovals map - */ - private final List removalsQueue = new LinkedList(); - /** - * The time when the first element in removalsQueue will expire. No reason to do housekeeping on - * the queue before this time. - */ - private volatile long earliestRemovalTimestamp; - /** - * Lock controlling access to removalsQueue - */ - private final Lock removalsLock = new ReentrantLock(); - - /** - * The time of the last call to regionRemoved(), plus NAKED_PUT_INVALIDATION_PERIOD. All naked + * The time of the last call to {@link #invalidateRegion()}, plus NAKED_PUT_INVALIDATION_PERIOD. All naked * puts will be rejected until the current time is greater than this value. + * NOTE: update only through {@link #invalidationUpdater}! */ - private volatile long invalidationTimestamp; + private volatile long invalidationTimestamp = Long.MIN_VALUE; + + private static final AtomicLongFieldUpdater invalidationUpdater + = AtomicLongFieldUpdater.newUpdater(PutFromLoadValidator.class, "invalidationTimestamp"); /** * Creates a new put from load validator instance. @@ -133,9 +126,7 @@ public class PutFromLoadValidator { public PutFromLoadValidator( AdvancedCache cache, TransactionManager transactionManager, long nakedPutInvalidationPeriod) { - this(cache, cache.getCacheManager(), transactionManager, - nakedPutInvalidationPeriod - ); + this(cache, cache.getCacheManager(), transactionManager, nakedPutInvalidationPeriod); } /** @@ -170,81 +161,100 @@ public class PutFromLoadValidator { // ----------------------------------------------------------------- Public + /** + * Marker for lock acquired in {@link #acquirePutFromLoadLock(Object)} + */ + public static class Lock { + protected Lock() {} + } + /** * Acquire a lock giving the calling thread the right to put data in the * cache for the given key. *

    * NOTE: A call to this method that returns true - * should always be matched with a call to {@link #releasePutFromLoadLock(Object)}. + * should always be matched with a call to {@link #releasePutFromLoadLock(Object, Lock)}. *

    * * @param key the key * - * @return true if the lock is acquired and the cache put - * can proceed; false if the data should not be cached + * @return AcquiredLock if the lock is acquired and the cache put + * can proceed; null if the data should not be cached */ - public boolean acquirePutFromLoadLock(Object key) { + public Lock acquirePutFromLoadLock(Object key) { boolean valid = false; boolean locked = false; - final long now = System.currentTimeMillis(); + long now = Long.MIN_VALUE; - try { - final PendingPutMap pending = pendingPuts.get( key ); - if ( pending != null ) { - locked = pending.acquireLock( 100, TimeUnit.MILLISECONDS ); - if ( locked ) { - try { - final PendingPut toCancel = pending.remove( getOwnerForPut() ); - if ( toCancel != null ) { - valid = !toCancel.completed; - toCancel.completed = true; + PendingPutMap pending = pendingPuts.get( key ); + for (;;) { + try { + if (pending != null) { + locked = pending.acquireLock(100, TimeUnit.MILLISECONDS); + if (locked) { + try { + final PendingPut toCancel = pending.remove(getOwnerForPut()); + if (toCancel != null) { + valid = !toCancel.completed; + toCancel.completed = true; + } else { + // this is a naked put + if (pending.hasInvalidator()) { + valid = false; + } else { + if (now == Long.MIN_VALUE) { + now = System.currentTimeMillis(); + } + valid = now > pending.nakedPutsDeadline; + } + } + return valid ? pending : null; + } finally { + if (!valid) { + pending.releaseLock(); + locked = false; + } + } + } else { + // oops, we have leaked record for this owner, but we don't want to wait here + return null; + } + } else { + // Key wasn't in pendingPuts, so either this is a "naked put" + // or regionRemoved has been called. Check if we can proceed + long invalidationTimestamp = this.invalidationTimestamp; + if (invalidationTimestamp != Long.MIN_VALUE) { + now = System.currentTimeMillis(); + if (now > invalidationTimestamp) { + // time is +- monotonic se don't let other threads do the expensive currentTimeMillis() + invalidationUpdater.compareAndSet(this, invalidationTimestamp, Long.MIN_VALUE); + } else { + return null; } } - finally { - if ( !valid ) { - pending.releaseLock(); - locked = false; - } + + PendingPut pendingPut = new PendingPut(getOwnerForPut()); + pending = new PendingPutMap(pendingPut); + PendingPutMap existing = pendingPuts.putIfAbsent(key, pending); + if (existing != null) { + pending = existing; } + // continue in next loop with lock acquisition } - } - else { - // Key wasn't in pendingPuts, so either this is a "naked put" - // or regionRemoved has been called. Check if we can proceed - if ( now > invalidationTimestamp ) { - final Long removedTime = recentRemovals.get( key ); - if ( removedTime == null || now > removedTime ) { - // It's legal to proceed. But we have to record this key - // in pendingPuts so releasePutFromLoadLock can find it. - // To do this we basically simulate a normal "register - // then acquire lock" pattern - registerPendingPut( key ); - locked = acquirePutFromLoadLock( key ); - valid = locked; - } + } catch (Throwable t) { + if (locked) { + pending.releaseLock(); + } + + if (t instanceof RuntimeException) { + throw (RuntimeException) t; + } else if (t instanceof Error) { + throw (Error) t; + } else { + throw new RuntimeException(t); } } } - catch (Throwable t) { - if ( locked ) { - final PendingPutMap toRelease = pendingPuts.get( key ); - if ( toRelease != null ) { - toRelease.releaseLock(); - } - } - - if ( t instanceof RuntimeException ) { - throw (RuntimeException) t; - } - else if ( t instanceof Error ) { - throw (Error) t; - } - else { - throw new RuntimeException( t ); - } - } - - return valid; } /** @@ -253,87 +263,16 @@ public class PutFromLoadValidator { * * @param key the key */ - public void releasePutFromLoadLock(Object key) { - final PendingPutMap pending = pendingPuts.get( key ); + public void releasePutFromLoadLock(Object key, Lock lock) { + final PendingPutMap pending = (PendingPutMap) lock; if ( pending != null ) { - if ( pending.size() == 0 ) { + if ( pending.canRemove() ) { pendingPuts.remove( key, pending ); } pending.releaseLock(); } } - /** - * Invalidates any {@link #registerPendingPut(Object) previously registered pending puts} ensuring a subsequent call to - * {@link #acquirePutFromLoadLock(Object)} will return false.

    This method will block until any - * concurrent thread that has {@link #acquirePutFromLoadLock(Object) acquired the putFromLoad lock} for the given key - * has released the lock. This allows the caller to be certain the putFromLoad will not execute after this method - * returns, possibly caching stale data.

    - * - * @param key key identifying data whose pending puts should be invalidated - * - * @return true if the invalidation was successful; false if a problem occured (which the - * caller should treat as an exception condition) - */ - public boolean invalidateKey(Object key) { - boolean success = true; - - // Invalidate any pending puts - final PendingPutMap pending = pendingPuts.get( key ); - if ( pending != null ) { - // This lock should be available very quickly, but we'll be - // very patient waiting for it as callers should treat not - // acquiring it as an exception condition - if ( pending.acquireLock( 60, TimeUnit.SECONDS ) ) { - try { - pending.invalidate(); - } - finally { - pending.releaseLock(); - } - } - else { - success = false; - } - } - - // Record when this occurred to invalidate later naked puts - final RecentRemoval removal = new RecentRemoval( key, this.nakedPutInvalidationPeriod ); - recentRemovals.put( key, removal.timestamp ); - - // Don't let recentRemovals map become a memory leak - RecentRemoval toClean = null; - final boolean attemptClean = removal.timestamp > earliestRemovalTimestamp; - removalsLock.lock(); - try { - removalsQueue.add( removal ); - - if ( attemptClean ) { - if ( removalsQueue.size() > 1 ) { - // we have at least one as we just added it - toClean = removalsQueue.remove( 0 ); - } - earliestRemovalTimestamp = removalsQueue.get( 0 ).timestamp; - } - } - finally { - removalsLock.unlock(); - } - - if ( toClean != null ) { - Long cleaned = recentRemovals.get( toClean.key ); - if ( cleaned != null && cleaned.equals( toClean.timestamp ) ) { - cleaned = recentRemovals.remove( toClean.key ); - if ( cleaned != null && !cleaned.equals( toClean.timestamp ) ) { - // Oops; removed the wrong timestamp; restore it - recentRemovals.putIfAbsent( toClean.key, cleaned ); - } - } - } - - return success; - } - /** * Invalidates all {@link #registerPendingPut(Object) previously registered pending puts} ensuring a subsequent call to * {@link #acquirePutFromLoadLock(Object)} will return false.

    This method will block until any @@ -345,15 +284,16 @@ public class PutFromLoadValidator { * caller should treat as an exception condition) */ public boolean invalidateRegion() { - - boolean ok = false; - invalidationTimestamp = System.currentTimeMillis() + this.nakedPutInvalidationPeriod; - + // TODO: not sure what happens with locks acquired *after* calling this method but before + // the actual invalidation + boolean ok = true; + invalidationUpdater.set(this, System.currentTimeMillis() + nakedPutInvalidationPeriod); try { // Acquire the lock for each entry to ensure any ongoing // work associated with it is completed before we return - for ( PendingPutMap entry : pendingPuts.values() ) { + for ( Iterator it = pendingPuts.values().iterator(); it.hasNext(); ) { + PendingPutMap entry = it.next(); if ( entry.acquireLock( 60, TimeUnit.SECONDS ) ) { try { entry.invalidate(); @@ -361,30 +301,15 @@ public class PutFromLoadValidator { finally { entry.releaseLock(); } + it.remove(); } else { ok = false; } } - - removalsLock.lock(); - try { - recentRemovals.clear(); - removalsQueue.clear(); - - ok = true; - - } - finally { - removalsLock.unlock(); - } - } - catch (Exception e) { + } catch (Exception e) { ok = false; } - finally { - earliestRemovalTimestamp = invalidationTimestamp; - } return ok; } @@ -395,8 +320,7 @@ public class PutFromLoadValidator { * wherein it is expected that a database read plus cache put will occur. Calling this method allows the validator to * treat the subsequent acquirePutFromLoadLock as if the database read occurred when this method was * invoked. This allows the validator to compare the timestamp of this call against the timestamp of subsequent removal - * notifications. A put that occurs without this call preceding it is "naked"; i.e the validator must assume the put is - * not valid if any relevant removal has occurred within {@link #NAKED_PUT_INVALIDATION_PERIOD} milliseconds. + * notifications. * * @param key key that will be used for subsequent cache put */ @@ -404,50 +328,83 @@ public class PutFromLoadValidator { final PendingPut pendingPut = new PendingPut( getOwnerForPut() ); final PendingPutMap pendingForKey = new PendingPutMap( pendingPut ); - for (; ; ) { - final PendingPutMap existing = pendingPuts.putIfAbsent( key, pendingForKey ); - if ( existing != null ) { - if ( existing.acquireLock( 10, TimeUnit.SECONDS ) ) { - - try { - existing.put( pendingPut ); - final PendingPutMap doublecheck = pendingPuts.putIfAbsent( key, existing ); - if ( doublecheck == null || doublecheck == existing ) { - break; - } - // else we hit a race and need to loop to try again + final PendingPutMap existing = pendingPuts.putIfAbsent( key, pendingForKey ); + if ( existing != null ) { + if ( existing.acquireLock( 10, TimeUnit.SECONDS ) ) { + try { + if ( !existing.hasInvalidator() ) { + existing.put(pendingPut); } - finally { - existing.releaseLock(); - } - } - else { - // Can't get the lock; when we come back we'll be a "naked put" - break; + } finally { + existing.releaseLock(); } } else { - // normal case - break; + // Can't get the lock; when we come back we'll be a "naked put" } } } - // -------------------------------------------------------------- Protected - /** - * Only for use by unit tests; may be removed at any time + * Invalidates any {@link #registerPendingPut(Object) previously registered pending puts} + * and disables further registrations ensuring a subsequent call to {@link #acquirePutFromLoadLock(Object)} + * will return false.

    This method will block until any concurrent thread that has + * {@link #acquirePutFromLoadLock(Object) acquired the putFromLoad lock} for the given key + * has released the lock. This allows the caller to be certain the putFromLoad will not execute after this method + * returns, possibly caching stale data.

    + * After this transaction completes, {@link #endInvalidatingKey(Object)} needs to be called } + * + * @param key key identifying data whose pending puts should be invalidated + * + * @return true if the invalidation was successful; false if a problem occured (which the + * caller should treat as an exception condition) */ - protected int getRemovalQueueLength() { - removalsLock.lock(); - try { - return removalsQueue.size(); + public boolean beginInvalidatingKey(Object key) { + PendingPutMap pending = new PendingPutMap(null); + PendingPutMap prev = pendingPuts.putIfAbsent(key, pending); + if (prev != null) { + pending = prev; } - finally { - removalsLock.unlock(); + if (pending.acquireLock(60, TimeUnit.SECONDS)) { + try { + pending.invalidate(); + pending.addInvalidator(getOwnerForPut(), System.currentTimeMillis() + nakedPutInvalidationPeriod); + } finally { + pending.releaseLock(); + } + return true; + } else { + return false; } } + /** + * Called after the transaction completes, allowing caching of entries. It is possible that this method + * is called without previous invocation of {@link #beginInvalidatingKey(Object)}, then it should be noop. + * + * @param key + * @return + */ + public boolean endInvalidatingKey(Object key) { + PendingPutMap pending = pendingPuts.get(key); + if (pending == null) { + return true; + } + if (pending.acquireLock(60, TimeUnit.SECONDS)) { + try { + pending.removeInvalidator(getOwnerForPut()); + // we can't remove the pending put yet because we wait for naked puts + // pendingPuts should be configured with maxIdle time so won't have memory leak + return true; + } finally { + pending.releaseLock(); + } + } else { + return false; + } + } + + // ---------------------------------------------------------------- Private private Object getOwnerForPut() { @@ -470,10 +427,13 @@ public class PutFromLoadValidator { *

    * This class is NOT THREAD SAFE. All operations on it must be performed with the lock held. */ - private static class PendingPutMap { + private static class PendingPutMap extends Lock { private PendingPut singlePendingPut; private Map fullMap; - private final Lock lock = new ReentrantLock(); + private final java.util.concurrent.locks.Lock lock = new ReentrantLock(); + private Object singleInvalidator; + private Set invalidators; + private long nakedPutsDeadline = Long.MIN_VALUE; PendingPutMap(PendingPut singleItem) { this.singlePendingPut = singleItem; @@ -546,6 +506,41 @@ public class PutFromLoadValidator { fullMap = null; } } + + public void addInvalidator(Object invalidator, long deadline) { + if (invalidators == null) { + if (singleInvalidator == null) { + singleInvalidator = invalidator; + } else { + invalidators = new HashSet(); + invalidators.add(singleInvalidator); + invalidators.add(invalidator); + singleInvalidator = null; + } + } else { + invalidators.add(invalidator); + } + nakedPutsDeadline = Math.max(nakedPutsDeadline, deadline); + } + + public boolean hasInvalidator() { + return singleInvalidator != null || (invalidators != null && !invalidators.isEmpty()); + } + + public void removeInvalidator(Object invalidator) { + if (invalidators == null) { + if (singleInvalidator != null && singleInvalidator.equals(invalidator)) { + singleInvalidator = null; + } + } else { + invalidators.remove(invalidator); + } + } + + public boolean canRemove() { + return size() == 0 && !hasInvalidator() && + (nakedPutsDeadline == Long.MIN_VALUE || nakedPutsDeadline < System.currentTimeMillis()); + } } private static class PendingPut { @@ -556,15 +551,4 @@ public class PutFromLoadValidator { this.owner = owner; } } - - private static class RecentRemoval { - private final Object key; - private final Long timestamp; - - private RecentRemoval(Object key, long nakedPutInvalidationPeriod) { - this.key = key; - timestamp = System.currentTimeMillis() + nakedPutInvalidationPeriod; - } - } - } diff --git a/hibernate-infinispan/src/main/java/org/hibernate/cache/infinispan/access/TransactionalAccessDelegate.java b/hibernate-infinispan/src/main/java/org/hibernate/cache/infinispan/access/TransactionalAccessDelegate.java index bafe2d9800..3c196784e7 100755 --- a/hibernate-infinispan/src/main/java/org/hibernate/cache/infinispan/access/TransactionalAccessDelegate.java +++ b/hibernate-infinispan/src/main/java/org/hibernate/cache/infinispan/access/TransactionalAccessDelegate.java @@ -9,7 +9,6 @@ package org.hibernate.cache.infinispan.access; import org.hibernate.cache.CacheException; import org.hibernate.cache.infinispan.impl.BaseRegion; import org.hibernate.cache.infinispan.util.Caches; - import org.infinispan.AdvancedCache; import org.infinispan.util.logging.Log; import org.infinispan.util.logging.LogFactory; @@ -111,7 +110,8 @@ public class TransactionalAccessDelegate { return false; } - if ( !putValidator.acquirePutFromLoadLock( key ) ) { + PutFromLoadValidator.Lock lock = putValidator.acquirePutFromLoadLock(key); + if ( lock == null) { if ( TRACE_ENABLED ) { log.tracef( "Put from load lock not acquired for key %s", key ); } @@ -131,7 +131,7 @@ public class TransactionalAccessDelegate { } } finally { - putValidator.releasePutFromLoadLock( key ); + putValidator.releasePutFromLoadLock( key, lock); } return true; @@ -185,7 +185,7 @@ public class TransactionalAccessDelegate { * @throws CacheException if removing the cached item fails */ public void remove(Object key) throws CacheException { - if ( !putValidator.invalidateKey( key ) ) { + if ( !putValidator.beginInvalidatingKey(key)) { throw new CacheException( "Failed to invalidate pending putFromLoad calls for key " + key + " from region " + region.getName() ); @@ -216,7 +216,7 @@ public class TransactionalAccessDelegate { * @throws CacheException if evicting the item fails */ public void evict(Object key) throws CacheException { - if ( !putValidator.invalidateKey( key ) ) { + if ( !putValidator.beginInvalidatingKey(key)) { throw new CacheException( "Failed to invalidate pending putFromLoad calls for key " + key + " from region " + region.getName() ); @@ -240,4 +240,19 @@ public class TransactionalAccessDelegate { Caches.broadcastEvictAll( cache ); } + /** + * Called when we have finished the attempted update/delete (which may or + * may not have been successful), after transaction completion. This method + * is used by "asynchronous" concurrency strategies. + * + * @param key The item key + * @throws org.hibernate.cache.CacheException Propogated from underlying {@link org.hibernate.cache.spi.Region} + */ + public void unlockItem(Object key) throws CacheException { + if ( !putValidator.endInvalidatingKey(key) ) { + // TODO: localization + log.warn("Failed to end invalidating pending putFromLoad calls for key " + key + " from region " + + region.getName() + "; the key won't be cached in the future."); + } + } } diff --git a/hibernate-infinispan/src/main/java/org/hibernate/cache/infinispan/collection/TransactionalAccess.java b/hibernate-infinispan/src/main/java/org/hibernate/cache/infinispan/collection/TransactionalAccess.java index b3791254bd..3850ff66f5 100644 --- a/hibernate-infinispan/src/main/java/org/hibernate/cache/infinispan/collection/TransactionalAccess.java +++ b/hibernate-infinispan/src/main/java/org/hibernate/cache/infinispan/collection/TransactionalAccess.java @@ -75,6 +75,7 @@ class TransactionalAccess implements CollectionRegionAccessStrategy { } public void unlockItem(SessionImplementor session, Object key, SoftLock lock) throws CacheException { + delegate.unlockItem(key); } public void unlockRegion(SoftLock lock) throws CacheException { diff --git a/hibernate-infinispan/src/main/java/org/hibernate/cache/infinispan/entity/TransactionalAccess.java b/hibernate-infinispan/src/main/java/org/hibernate/cache/infinispan/entity/TransactionalAccess.java index 6c5ff83969..f06c1f3840 100644 --- a/hibernate-infinispan/src/main/java/org/hibernate/cache/infinispan/entity/TransactionalAccess.java +++ b/hibernate-infinispan/src/main/java/org/hibernate/cache/infinispan/entity/TransactionalAccess.java @@ -84,6 +84,7 @@ class TransactionalAccess implements EntityRegionAccessStrategy { } public void unlockItem(SessionImplementor session, Object key, SoftLock lock) throws CacheException { + delegate.unlockItem(key); } public void unlockRegion(SoftLock lock) throws CacheException { diff --git a/hibernate-infinispan/src/main/java/org/hibernate/cache/infinispan/naturalid/TransactionalAccess.java b/hibernate-infinispan/src/main/java/org/hibernate/cache/infinispan/naturalid/TransactionalAccess.java index 80bd761870..df7d927933 100644 --- a/hibernate-infinispan/src/main/java/org/hibernate/cache/infinispan/naturalid/TransactionalAccess.java +++ b/hibernate-infinispan/src/main/java/org/hibernate/cache/infinispan/naturalid/TransactionalAccess.java @@ -89,6 +89,7 @@ class TransactionalAccess implements NaturalIdRegionAccessStrategy { @Override public void unlockItem(SessionImplementor session, Object key, SoftLock lock) throws CacheException { + delegate.unlockItem(key); } @Override diff --git a/hibernate-infinispan/src/test/java/org/hibernate/test/cache/infinispan/access/PutFromLoadValidatorUnitTestCase.java b/hibernate-infinispan/src/test/java/org/hibernate/test/cache/infinispan/access/PutFromLoadValidatorUnitTestCase.java index 5d7875c03d..1ca0070aad 100644 --- a/hibernate-infinispan/src/test/java/org/hibernate/test/cache/infinispan/access/PutFromLoadValidatorUnitTestCase.java +++ b/hibernate-infinispan/src/test/java/org/hibernate/test/cache/infinispan/access/PutFromLoadValidatorUnitTestCase.java @@ -20,8 +20,6 @@ import java.util.concurrent.atomic.AtomicReference; import org.hibernate.cache.infinispan.access.PutFromLoadValidator; import org.hibernate.test.cache.infinispan.functional.cluster.DualNodeJtaTransactionManagerImpl; import org.hibernate.test.cache.infinispan.util.InfinispanTestingSetup; -import org.infinispan.AdvancedCache; -import org.infinispan.manager.EmbeddedCacheManager; import org.infinispan.test.CacheManagerCallable; import org.infinispan.test.fwk.TestCacheManagerFactory; import org.infinispan.util.logging.Log; @@ -32,8 +30,9 @@ import org.junit.Rule; import org.junit.Test; import static org.infinispan.test.TestingUtil.withCacheManager; +import static org.infinispan.test.TestingUtil.withTx; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -87,28 +86,13 @@ public class PutFromLoadValidatorUnitTestCase { TestCacheManagerFactory.createCacheManager(false)) { @Override public void call() { - try { - PutFromLoadValidator testee = new PutFromLoadValidator(cm.getCache().getAdvancedCache(), cm, - transactional ? tm : null, - PutFromLoadValidator.NAKED_PUT_INVALIDATION_PERIOD); - if (transactional) { - tm.begin(); - } - boolean lockable = testee.acquirePutFromLoadLock(KEY1); - try { - assertTrue(lockable); - } - finally { - if (lockable) { - testee.releasePutFromLoadLock(KEY1); - } - } - } catch (Exception e) { - throw new RuntimeException(e); - } + PutFromLoadValidator testee = new PutFromLoadValidator(cm.getCache().getAdvancedCache(), cm, + transactional ? tm : null, PutFromLoadValidator.NAKED_PUT_INVALIDATION_PERIOD); + exec(transactional, new NakedPut(testee, true)); } }); } + @Test public void testRegisteredPut() throws Exception { registeredPutTest(false); @@ -124,29 +108,12 @@ public class PutFromLoadValidatorUnitTestCase { @Override public void call() { PutFromLoadValidator testee = new PutFromLoadValidator(cm.getCache().getAdvancedCache(), cm, - transactional ? tm : null, - PutFromLoadValidator.NAKED_PUT_INVALIDATION_PERIOD); - try { - if (transactional) { - tm.begin(); - } - testee.registerPendingPut(KEY1); - - boolean lockable = testee.acquirePutFromLoadLock(KEY1); - try { - assertTrue(lockable); - } - finally { - if (lockable) { - testee.releasePutFromLoadLock(KEY1); - } - } - } catch (Exception e) { - throw new RuntimeException(e); - } + transactional ? tm : null, PutFromLoadValidator.NAKED_PUT_INVALIDATION_PERIOD); + exec(transactional, new RegularPut(testee)); } }); } + @Test public void testNakedPutAfterKeyRemoval() throws Exception { nakedPutAfterRemovalTest(false, false); @@ -171,34 +138,15 @@ public class PutFromLoadValidatorUnitTestCase { @Override public void call() { PutFromLoadValidator testee = new PutFromLoadValidator(cm.getCache().getAdvancedCache(), cm, - transactional ? tm : null, - PutFromLoadValidator.NAKED_PUT_INVALIDATION_PERIOD); - if (removeRegion) { - testee.invalidateRegion(); - } else { - testee.invalidateKey(KEY1); - } - try { - if (transactional) { - tm.begin(); - } - - boolean lockable = testee.acquirePutFromLoadLock(KEY1); - try { - assertFalse(lockable); - } - finally { - if (lockable) { - testee.releasePutFromLoadLock(KEY1); - } - } - } catch (Exception e) { - throw new RuntimeException(e); - } + transactional ? tm : null, PutFromLoadValidator.NAKED_PUT_INVALIDATION_PERIOD); + Invalidation invalidation = new Invalidation(testee, removeRegion); + NakedPut nakedPut = new NakedPut(testee, false); + exec(transactional, invalidation, nakedPut); } }); } + @Test public void testRegisteredPutAfterKeyRemoval() throws Exception { registeredPutAfterRemovalTest(false, false); @@ -223,31 +171,10 @@ public class PutFromLoadValidatorUnitTestCase { @Override public void call() { PutFromLoadValidator testee = new PutFromLoadValidator(cm.getCache().getAdvancedCache(), cm, - transactional ? tm : null, - PutFromLoadValidator.NAKED_PUT_INVALIDATION_PERIOD); - if (removeRegion) { - testee.invalidateRegion(); - } else { - testee.invalidateKey(KEY1); - } - try { - if (transactional) { - tm.begin(); - } - testee.registerPendingPut(KEY1); - - boolean lockable = testee.acquirePutFromLoadLock(KEY1); - try { - assertTrue(lockable); - } - finally { - if (lockable) { - testee.releasePutFromLoadLock(KEY1); - } - } - } catch (Exception e) { - throw new RuntimeException(e); - } + transactional ? tm : null, PutFromLoadValidator.NAKED_PUT_INVALIDATION_PERIOD); + Invalidation invalidation = new Invalidation(testee, removeRegion); + RegularPut regularPut = new RegularPut(testee); + exec(transactional, invalidation, regularPut); } }); @@ -277,8 +204,7 @@ public class PutFromLoadValidatorUnitTestCase { @Override public void call() { PutFromLoadValidator testee = new PutFromLoadValidator(cm.getCache().getAdvancedCache(), cm, - transactional ? tm : null, - PutFromLoadValidator.NAKED_PUT_INVALIDATION_PERIOD); + transactional ? tm : null, PutFromLoadValidator.NAKED_PUT_INVALIDATION_PERIOD); try { if (transactional) { tm.begin(); @@ -287,17 +213,18 @@ public class PutFromLoadValidatorUnitTestCase { if (removeRegion) { testee.invalidateRegion(); } else { - testee.invalidateKey(KEY1); + testee.beginInvalidatingKey(KEY1); } - boolean lockable = testee.acquirePutFromLoadLock(KEY1); + PutFromLoadValidator.Lock lock = testee.acquirePutFromLoadLock(KEY1); try { - assertFalse(lockable); + assertNull(lock); } finally { - if (lockable) { - testee.releasePutFromLoadLock(KEY1); + if (lock != null) { + testee.releasePutFromLoadLock(KEY1, lock); } + testee.endInvalidatingKey(KEY1); } } catch (Exception e) { throw new RuntimeException(e); @@ -305,6 +232,7 @@ public class PutFromLoadValidatorUnitTestCase { } }); } + @Test public void testDelayedNakedPutAfterKeyRemoval() throws Exception { delayedNakedPutAfterRemovalTest(false, false); @@ -329,12 +257,13 @@ public class PutFromLoadValidatorUnitTestCase { TestCacheManagerFactory.createCacheManager(false)) { @Override public void call() { - PutFromLoadValidator testee = new TestValidator(cm.getCache().getAdvancedCache(), cm, + PutFromLoadValidator testee = new PutFromLoadValidator(cm.getCache().getAdvancedCache(), cm, transactional ? tm : null, 100); if (removeRegion) { testee.invalidateRegion(); } else { - testee.invalidateKey(KEY1); + testee.beginInvalidatingKey(KEY1); + testee.endInvalidatingKey(KEY1); } try { if (transactional) { @@ -342,12 +271,12 @@ public class PutFromLoadValidatorUnitTestCase { } Thread.sleep(110); - boolean lockable = testee.acquirePutFromLoadLock(KEY1); + PutFromLoadValidator.Lock lock = testee.acquirePutFromLoadLock(KEY1); try { - assertTrue(lockable); + assertNotNull(lock); } finally { - if (lockable) { - testee.releasePutFromLoadLock(KEY1); + if (lock != null) { + testee.releasePutFromLoadLock(KEY1, null); } } } catch (Exception e) { @@ -389,12 +318,13 @@ public class PutFromLoadValidatorUnitTestCase { testee.registerPendingPut(KEY1); registeredLatch.countDown(); registeredLatch.await(5, TimeUnit.SECONDS); - if (testee.acquirePutFromLoadLock(KEY1)) { + PutFromLoadValidator.Lock lock = testee.acquirePutFromLoadLock(KEY1); + if (lock != null) { try { log.trace("Put from load lock acquired for key = " + KEY1); success.incrementAndGet(); } finally { - testee.releasePutFromLoadLock(KEY1); + testee.releasePutFromLoadLock(KEY1, lock); } } else { log.trace("Unable to acquired putFromLoad lock for key = " + KEY1); @@ -453,7 +383,8 @@ public class PutFromLoadValidatorUnitTestCase { Callable pferCallable = new Callable() { public Boolean call() throws Exception { testee.registerPendingPut(KEY1); - if (testee.acquirePutFromLoadLock(KEY1)) { + PutFromLoadValidator.Lock lock = testee.acquirePutFromLoadLock(KEY1); + if (lock != null) { try { removeLatch.countDown(); pferLatch.await(); @@ -461,7 +392,7 @@ public class PutFromLoadValidatorUnitTestCase { return Boolean.TRUE; } finally { - testee.releasePutFromLoadLock(KEY1); + testee.releasePutFromLoadLock(KEY1, lock); } } return Boolean.FALSE; @@ -472,7 +403,7 @@ public class PutFromLoadValidatorUnitTestCase { public Void call() throws Exception { removeLatch.await(); if (keyOnly) { - testee.invalidateKey(KEY1); + testee.beginInvalidatingKey(KEY1); } else { testee.invalidateRegion(); } @@ -505,18 +436,104 @@ public class PutFromLoadValidatorUnitTestCase { }); } - private static class TestValidator extends PutFromLoadValidator { + protected void exec(boolean transactional, Callable... callables) { + try { + if (transactional) { + for (Callable c : callables) { + withTx(tm, c); + } + } else { + for (Callable c : callables) { + c.call(); + } + } + } catch (RuntimeException e) { + throw e; + } catch (Exception e) { + throw new RuntimeException(e); + } + } - protected TestValidator(AdvancedCache cache, EmbeddedCacheManager cm, - TransactionManager transactionManager, - long nakedPutInvalidationPeriod) { - super(cache, cm, transactionManager, nakedPutInvalidationPeriod); + private class Invalidation implements Callable { + private PutFromLoadValidator putFromLoadValidator; + private boolean removeRegion; + + public Invalidation(PutFromLoadValidator putFromLoadValidator, boolean removeRegion) { + this.putFromLoadValidator = putFromLoadValidator; + this.removeRegion = removeRegion; } @Override - public int getRemovalQueueLength() { - return super.getRemovalQueueLength(); + public Void call() throws Exception { + if (removeRegion) { + boolean success = putFromLoadValidator.invalidateRegion(); + assertTrue(success); + } else { + boolean success = putFromLoadValidator.beginInvalidatingKey(KEY1); + assertTrue(success); + success = putFromLoadValidator.endInvalidatingKey(KEY1); + assertTrue(success); + } + return null; + } + } + + private class RegularPut implements Callable { + private PutFromLoadValidator putFromLoadValidator; + + public RegularPut(PutFromLoadValidator putFromLoadValidator) { + this.putFromLoadValidator = putFromLoadValidator; } + @Override + public Void call() throws Exception { + try { + putFromLoadValidator.registerPendingPut(KEY1); + + PutFromLoadValidator.Lock lock = putFromLoadValidator.acquirePutFromLoadLock(KEY1); + try { + assertNotNull(lock); + } finally { + if (lock != null) { + putFromLoadValidator.releasePutFromLoadLock(KEY1, lock); + } + } + } catch (Exception e) { + throw new RuntimeException(e); + } + return null; + } + } + + private class NakedPut implements Callable { + private final PutFromLoadValidator testee; + private final boolean expectSuccess; + + public NakedPut(PutFromLoadValidator testee, boolean expectSuccess) { + this.testee = testee; + this.expectSuccess = expectSuccess; + } + + @Override + public Void call() throws Exception { + try { + PutFromLoadValidator.Lock lock = testee.acquirePutFromLoadLock(KEY1); + try { + if (expectSuccess) { + assertNotNull(lock); + } else { + assertNull(lock); + } + } + finally { + if (lock != null) { + testee.releasePutFromLoadLock(KEY1, lock); + } + } + } catch (Exception e) { + throw new RuntimeException(e); + } + return null; + } } } diff --git a/hibernate-infinispan/src/test/java/org/hibernate/test/cache/infinispan/collection/AbstractCollectionRegionAccessStrategyTestCase.java b/hibernate-infinispan/src/test/java/org/hibernate/test/cache/infinispan/collection/AbstractCollectionRegionAccessStrategyTestCase.java index c7f234ba82..4341a82bd5 100644 --- a/hibernate-infinispan/src/test/java/org/hibernate/test/cache/infinispan/collection/AbstractCollectionRegionAccessStrategyTestCase.java +++ b/hibernate-infinispan/src/test/java/org/hibernate/test/cache/infinispan/collection/AbstractCollectionRegionAccessStrategyTestCase.java @@ -161,8 +161,8 @@ public abstract class AbstractCollectionRegionAccessStrategyTestCase extends Abs PutFromLoadValidator validator = new PutFromLoadValidator(remoteCollectionRegion.getCache(), cm, remoteTm, 20000) { @Override - public boolean acquirePutFromLoadLock(Object key) { - boolean acquired = super.acquirePutFromLoadLock( key ); + public Lock acquirePutFromLoadLock(Object key) { + Lock lock = super.acquirePutFromLoadLock( key ); try { removeLatch.countDown(); pferLatch.await( 2, TimeUnit.SECONDS ); @@ -175,7 +175,7 @@ public abstract class AbstractCollectionRegionAccessStrategyTestCase extends Abs log.error( "Error", e ); throw new RuntimeException( "Error", e ); } - return acquired; + return lock; } }; diff --git a/hibernate-infinispan/src/test/java/org/hibernate/test/cache/infinispan/functional/AbstractFunctionalTestCase.java b/hibernate-infinispan/src/test/java/org/hibernate/test/cache/infinispan/functional/AbstractFunctionalTestCase.java index bf70454436..c21b493d93 100644 --- a/hibernate-infinispan/src/test/java/org/hibernate/test/cache/infinispan/functional/AbstractFunctionalTestCase.java +++ b/hibernate-infinispan/src/test/java/org/hibernate/test/cache/infinispan/functional/AbstractFunctionalTestCase.java @@ -6,18 +6,33 @@ */ package org.hibernate.test.cache.infinispan.functional; +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.Phaser; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import org.hibernate.PessimisticLockException; import org.hibernate.Session; +import org.hibernate.cache.infinispan.InfinispanRegionFactory; +import org.hibernate.cache.infinispan.entity.EntityRegionImpl; +import org.hibernate.cache.spi.Region; import org.hibernate.stat.SecondLevelCacheStatistics; import org.hibernate.stat.Statistics; +import org.hibernate.testing.TestForIssue; +import org.infinispan.AdvancedCache; +import org.infinispan.commands.read.GetKeyValueCommand; +import org.infinispan.context.InvocationContext; +import org.infinispan.interceptors.base.BaseCustomInterceptor; import org.infinispan.util.logging.Log; import org.infinispan.util.logging.LogFactory; import org.junit.Test; -import java.util.Map; -import java.util.concurrent.Callable; - import static org.infinispan.test.TestingUtil.withTx; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; /** * Parent tests for both transactional and @@ -122,4 +137,152 @@ public abstract class AbstractFunctionalTestCase extends SingleNodeTestCase { }); } + @Test + @TestForIssue(jiraKey = "HHH-9868") + public void testConcurrentRemoveAndPutFromLoad() throws Exception { + final Item item = new Item( "chris", "Chris's Item" ); + + withTx(tm, () -> { + Session s = openSession(); + s.getTransaction().begin(); + s.persist(item); + s.getTransaction().commit(); + s.close(); + return null; + }); + Region region = sessionFactory().getSecondLevelCacheRegion(Item.class.getName()); + + Phaser deletePhaser = new Phaser(2); + Phaser getPhaser = new Phaser(2); + HookInterceptor hook = new HookInterceptor(); + + AdvancedCache entityCache = ((EntityRegionImpl) region).getCache(); + AdvancedCache pendingPutsCache = entityCache.getCacheManager().getCache( + entityCache.getName() + "-" + InfinispanRegionFactory.PENDING_PUTS_CACHE_NAME).getAdvancedCache(); + pendingPutsCache.addInterceptor(hook, 0); + + Thread deleteThread = new Thread(() -> { + try { + withTx(tm, () -> { + Session s = openSession(); + log.trace("Session opened"); + s.getTransaction().begin(); + log.trace("TX started"); + Item loadedItem = s.get(Item.class, item.getId()); + assertNotNull(loadedItem); + arriveAndAwait(deletePhaser); + arriveAndAwait(deletePhaser); + log.trace("Item loaded"); + s.delete(loadedItem); + log.trace("Item deleted"); + s.getTransaction().commit(); + log.trace("TX committed"); + // start get-thread here + arriveAndAwait(deletePhaser); + arriveAndAwait(deletePhaser); + s.close(); + log.trace("Session closed"); + return null; + }); + } catch (Exception e) { + throw new RuntimeException(e); + } + }, "delete-thread"); + Thread getThread = new Thread(() -> { + try { + withTx(tm, () -> { + Session s = openSession(); + log.trace("Session opened"); + s.getTransaction().begin(); + log.trace("TX started"); + // DB load should happen before the record is deleted, + // putFromLoad should happen after deleteThread ends + Item loadedItem = s.get(Item.class, item.getId()); + assertNotNull(loadedItem); + s.close(); + log.trace("Session closed"); + return null; + }); + } catch (PessimisticLockException e) { + // If we end up here, database locks guard us against situation tested + // in this case and HHH-9868 cannot happen. + // (delete-thread has ITEMS table write-locked and we try to acquire read-lock) + try { + arriveAndAwait(getPhaser); + arriveAndAwait(getPhaser); + } catch (Exception e1) { + throw new RuntimeException(e1); + } + } catch (Exception e) { + throw new RuntimeException(e); + } + }, "get-thread"); + + deleteThread.start(); + // deleteThread loads the entity + arriveAndAwait(deletePhaser); + withTx(tm, () -> { + sessionFactory().getCache().evictEntity(Item.class, item.getId()); + assertFalse(sessionFactory().getCache().containsEntity(Item.class, item.getId())); + return null; + }); + arriveAndAwait(deletePhaser); + // delete thread invalidates PFER + arriveAndAwait(deletePhaser); + // get thread gets the entity from DB + hook.block(getPhaser, getThread); + getThread.start(); + arriveAndAwait(getPhaser); + arriveAndAwait(deletePhaser); + // delete thread finishes the remove from DB and cache + deleteThread.join(); + hook.unblock(); + arriveAndAwait(getPhaser); + // get thread puts the entry into cache + getThread.join(); + + withTx(tm, () -> { + Session s = openSession(); + s.getTransaction().begin(); + Item loadedItem = s.get(Item.class, item.getId()); + assertNull(loadedItem); + s.getTransaction().commit(); + s.close(); + return null; + }); + } + + protected static void arriveAndAwait(Phaser phaser) throws TimeoutException, InterruptedException { + phaser.awaitAdvanceInterruptibly(phaser.arrive(), 10, TimeUnit.SECONDS); + } + + private static class HookInterceptor extends BaseCustomInterceptor { + Phaser phaser; + Thread thread; + + public synchronized void block(Phaser phaser, Thread thread) { + this.phaser = phaser; + this.thread = thread; + } + + public synchronized void unblock() { + phaser = null; + thread = null; + } + + @Override + public Object visitGetKeyValueCommand(InvocationContext ctx, GetKeyValueCommand command) throws Throwable { + Phaser phaser; + Thread thread; + synchronized (this) { + phaser = this.phaser; + thread = this.thread; + } + if (phaser != null && Thread.currentThread() == thread) { + arriveAndAwait(phaser); + arriveAndAwait(phaser); + } + return super.visitGetKeyValueCommand(ctx, command); + } + } } diff --git a/hibernate-infinispan/src/test/java/org/hibernate/test/cache/infinispan/functional/BasicTransactionalTestCase.java b/hibernate-infinispan/src/test/java/org/hibernate/test/cache/infinispan/functional/BasicTransactionalTestCase.java index ccdecf4bdd..6fca629277 100644 --- a/hibernate-infinispan/src/test/java/org/hibernate/test/cache/infinispan/functional/BasicTransactionalTestCase.java +++ b/hibernate-infinispan/src/test/java/org/hibernate/test/cache/infinispan/functional/BasicTransactionalTestCase.java @@ -11,7 +11,6 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.concurrent.Callable; -import java.util.concurrent.TimeUnit; import org.hibernate.Cache; import org.hibernate.Criteria; @@ -19,12 +18,10 @@ import org.hibernate.Hibernate; import org.hibernate.NaturalIdLoadAccess; import org.hibernate.Session; import org.hibernate.Transaction; -import org.hibernate.cache.infinispan.access.PutFromLoadValidator; import org.hibernate.cache.spi.entry.CacheEntry; import org.hibernate.criterion.Restrictions; import org.hibernate.stat.SecondLevelCacheStatistics; import org.hibernate.stat.Statistics; - import org.hibernate.testing.TestForIssue; import org.junit.After; import org.junit.Test; @@ -954,7 +951,6 @@ public class BasicTransactionalTestCase extends AbstractFunctionalTestCase { // TODO: Clear caches manually via cache manager (it's faster!!) this.cleanupCache(); - Thread.sleep(PutFromLoadValidator.NAKED_PUT_INVALIDATION_PERIOD + TimeUnit.SECONDS.toMillis(1)); stats.setStatisticsEnabled( true ); stats.clear(); diff --git a/hibernate-infinispan/src/test/java/org/hibernate/test/cache/infinispan/functional/cluster/DualNodeJtaTransactionImpl.java b/hibernate-infinispan/src/test/java/org/hibernate/test/cache/infinispan/functional/cluster/DualNodeJtaTransactionImpl.java index 73d68de78b..83656d776f 100644 --- a/hibernate-infinispan/src/test/java/org/hibernate/test/cache/infinispan/functional/cluster/DualNodeJtaTransactionImpl.java +++ b/hibernate-infinispan/src/test/java/org/hibernate/test/cache/infinispan/functional/cluster/DualNodeJtaTransactionImpl.java @@ -6,13 +6,6 @@ */ package org.hibernate.test.cache.infinispan.functional.cluster; -import java.sql.Connection; -import java.sql.SQLException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.LinkedList; -import java.util.List; -import java.util.concurrent.atomic.AtomicInteger; import javax.transaction.HeuristicMixedException; import javax.transaction.HeuristicRollbackException; import javax.transaction.RollbackException; @@ -23,6 +16,13 @@ import javax.transaction.Transaction; import javax.transaction.xa.XAException; import javax.transaction.xa.XAResource; import javax.transaction.xa.Xid; +import java.sql.Connection; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; import org.infinispan.util.logging.Log; import org.infinispan.util.logging.LogFactory; @@ -62,9 +62,11 @@ public class DualNodeJtaTransactionImpl implements Transaction { } else { status = Status.STATUS_PREPARING; - for (int i = 0; i < synchronizations.size(); i++) { - Synchronization s = (Synchronization) synchronizations.get(i); - s.beforeCompletion(); + if (synchronizations != null) { + for (int i = 0; i < synchronizations.size(); i++) { + Synchronization s = (Synchronization) synchronizations.get(i); + s.beforeCompletion(); + } } if (!runXaResourcePrepare()) { @@ -89,9 +91,11 @@ public class DualNodeJtaTransactionImpl implements Transaction { status = Status.STATUS_COMMITTED; - for (int i = 0; i < synchronizations.size(); i++) { - Synchronization s = (Synchronization) synchronizations.get(i); - s.afterCompletion(status); + if (synchronizations != null) { + for (int i = 0; i < synchronizations.size(); i++) { + Synchronization s = (Synchronization) synchronizations.get(i); + s.afterCompletion(status); + } } // status = Status.STATUS_NO_TRANSACTION; diff --git a/hibernate-infinispan/src/test/java/org/hibernate/test/cache/infinispan/tm/XaTransactionManagerImpl.java b/hibernate-infinispan/src/test/java/org/hibernate/test/cache/infinispan/tm/XaTransactionManagerImpl.java index a02943159d..17ae60596c 100644 --- a/hibernate-infinispan/src/test/java/org/hibernate/test/cache/infinispan/tm/XaTransactionManagerImpl.java +++ b/hibernate-infinispan/src/test/java/org/hibernate/test/cache/infinispan/tm/XaTransactionManagerImpl.java @@ -23,41 +23,44 @@ import javax.transaction.TransactionManager; */ public class XaTransactionManagerImpl implements TransactionManager { private static final XaTransactionManagerImpl INSTANCE = new XaTransactionManagerImpl(); - private XaTransactionImpl currentTransaction; + private final ThreadLocal currentTransaction = new ThreadLocal<>(); public static XaTransactionManagerImpl getInstance() { return INSTANCE; } public int getStatus() throws SystemException { + XaTransactionImpl currentTransaction = this.currentTransaction.get(); return currentTransaction == null ? Status.STATUS_NO_TRANSACTION : currentTransaction.getStatus(); } public Transaction getTransaction() throws SystemException { - return currentTransaction; + return currentTransaction.get(); } public XaTransactionImpl getCurrentTransaction() { - return currentTransaction; + return currentTransaction.get(); } public void begin() throws NotSupportedException, SystemException { - currentTransaction = new XaTransactionImpl(this); + if (currentTransaction.get() != null) throw new IllegalStateException("Transaction already started."); + currentTransaction.set(new XaTransactionImpl(this)); } public Transaction suspend() throws SystemException { - Transaction suspended = currentTransaction; - currentTransaction = null; + Transaction suspended = currentTransaction.get(); + currentTransaction.remove(); return suspended; } public void resume(Transaction transaction) throws InvalidTransactionException, IllegalStateException, SystemException { - currentTransaction = (XaTransactionImpl) transaction; + currentTransaction.set((XaTransactionImpl) transaction); } public void commit() throws RollbackException, HeuristicMixedException, HeuristicRollbackException, SecurityException, IllegalStateException, SystemException { + XaTransactionImpl currentTransaction = this.currentTransaction.get(); if (currentTransaction == null) { throw new IllegalStateException("no current transaction to commit"); } @@ -65,6 +68,7 @@ public class XaTransactionManagerImpl implements TransactionManager { } public void rollback() throws IllegalStateException, SecurityException, SystemException { + XaTransactionImpl currentTransaction = this.currentTransaction.get(); if (currentTransaction == null) { throw new IllegalStateException("no current transaction"); } @@ -72,6 +76,7 @@ public class XaTransactionManagerImpl implements TransactionManager { } public void setRollbackOnly() throws IllegalStateException, SystemException { + XaTransactionImpl currentTransaction = this.currentTransaction.get(); if (currentTransaction == null) { throw new IllegalStateException("no current transaction"); } @@ -82,8 +87,6 @@ public class XaTransactionManagerImpl implements TransactionManager { } void endCurrent(Transaction transaction) { - if (transaction == currentTransaction) { - currentTransaction = null; - } + currentTransaction.remove(); } }