invalidationUpdater
+ = AtomicLongFieldUpdater.newUpdater(PutFromLoadValidator.class, "invalidationTimestamp");
/**
* Creates a new put from load validator instance.
@@ -133,9 +126,7 @@ public class PutFromLoadValidator {
public PutFromLoadValidator(
AdvancedCache cache, TransactionManager transactionManager,
long nakedPutInvalidationPeriod) {
- this(cache, cache.getCacheManager(), transactionManager,
- nakedPutInvalidationPeriod
- );
+ this(cache, cache.getCacheManager(), transactionManager, nakedPutInvalidationPeriod);
}
/**
@@ -170,81 +161,100 @@ public class PutFromLoadValidator {
// ----------------------------------------------------------------- Public
+ /**
+ * Marker for lock acquired in {@link #acquirePutFromLoadLock(Object)}
+ */
+ public static class Lock {
+ protected Lock() {}
+ }
+
/**
* Acquire a lock giving the calling thread the right to put data in the
* cache for the given key.
*
* NOTE: A call to this method that returns true
- * should always be matched with a call to {@link #releasePutFromLoadLock(Object)}.
+ * should always be matched with a call to {@link #releasePutFromLoadLock(Object, Lock)}.
*
*
* @param key the key
*
- * @return true
if the lock is acquired and the cache put
- * can proceed; false
if the data should not be cached
+ * @return AcquiredLock
if the lock is acquired and the cache put
+ * can proceed; null
if the data should not be cached
*/
- public boolean acquirePutFromLoadLock(Object key) {
+ public Lock acquirePutFromLoadLock(Object key) {
boolean valid = false;
boolean locked = false;
- final long now = System.currentTimeMillis();
+ long now = Long.MIN_VALUE;
- try {
- final PendingPutMap pending = pendingPuts.get( key );
- if ( pending != null ) {
- locked = pending.acquireLock( 100, TimeUnit.MILLISECONDS );
- if ( locked ) {
- try {
- final PendingPut toCancel = pending.remove( getOwnerForPut() );
- if ( toCancel != null ) {
- valid = !toCancel.completed;
- toCancel.completed = true;
+ PendingPutMap pending = pendingPuts.get( key );
+ for (;;) {
+ try {
+ if (pending != null) {
+ locked = pending.acquireLock(100, TimeUnit.MILLISECONDS);
+ if (locked) {
+ try {
+ final PendingPut toCancel = pending.remove(getOwnerForPut());
+ if (toCancel != null) {
+ valid = !toCancel.completed;
+ toCancel.completed = true;
+ } else {
+ // this is a naked put
+ if (pending.hasInvalidator()) {
+ valid = false;
+ } else {
+ if (now == Long.MIN_VALUE) {
+ now = System.currentTimeMillis();
+ }
+ valid = now > pending.nakedPutsDeadline;
+ }
+ }
+ return valid ? pending : null;
+ } finally {
+ if (!valid) {
+ pending.releaseLock();
+ locked = false;
+ }
+ }
+ } else {
+ // oops, we have leaked record for this owner, but we don't want to wait here
+ return null;
+ }
+ } else {
+ // Key wasn't in pendingPuts, so either this is a "naked put"
+ // or regionRemoved has been called. Check if we can proceed
+ long invalidationTimestamp = this.invalidationTimestamp;
+ if (invalidationTimestamp != Long.MIN_VALUE) {
+ now = System.currentTimeMillis();
+ if (now > invalidationTimestamp) {
+ // time is +- monotonic se don't let other threads do the expensive currentTimeMillis()
+ invalidationUpdater.compareAndSet(this, invalidationTimestamp, Long.MIN_VALUE);
+ } else {
+ return null;
}
}
- finally {
- if ( !valid ) {
- pending.releaseLock();
- locked = false;
- }
+
+ PendingPut pendingPut = new PendingPut(getOwnerForPut());
+ pending = new PendingPutMap(pendingPut);
+ PendingPutMap existing = pendingPuts.putIfAbsent(key, pending);
+ if (existing != null) {
+ pending = existing;
}
+ // continue in next loop with lock acquisition
}
- }
- else {
- // Key wasn't in pendingPuts, so either this is a "naked put"
- // or regionRemoved has been called. Check if we can proceed
- if ( now > invalidationTimestamp ) {
- final Long removedTime = recentRemovals.get( key );
- if ( removedTime == null || now > removedTime ) {
- // It's legal to proceed. But we have to record this key
- // in pendingPuts so releasePutFromLoadLock can find it.
- // To do this we basically simulate a normal "register
- // then acquire lock" pattern
- registerPendingPut( key );
- locked = acquirePutFromLoadLock( key );
- valid = locked;
- }
+ } catch (Throwable t) {
+ if (locked) {
+ pending.releaseLock();
+ }
+
+ if (t instanceof RuntimeException) {
+ throw (RuntimeException) t;
+ } else if (t instanceof Error) {
+ throw (Error) t;
+ } else {
+ throw new RuntimeException(t);
}
}
}
- catch (Throwable t) {
- if ( locked ) {
- final PendingPutMap toRelease = pendingPuts.get( key );
- if ( toRelease != null ) {
- toRelease.releaseLock();
- }
- }
-
- if ( t instanceof RuntimeException ) {
- throw (RuntimeException) t;
- }
- else if ( t instanceof Error ) {
- throw (Error) t;
- }
- else {
- throw new RuntimeException( t );
- }
- }
-
- return valid;
}
/**
@@ -253,87 +263,16 @@ public class PutFromLoadValidator {
*
* @param key the key
*/
- public void releasePutFromLoadLock(Object key) {
- final PendingPutMap pending = pendingPuts.get( key );
+ public void releasePutFromLoadLock(Object key, Lock lock) {
+ final PendingPutMap pending = (PendingPutMap) lock;
if ( pending != null ) {
- if ( pending.size() == 0 ) {
+ if ( pending.canRemove() ) {
pendingPuts.remove( key, pending );
}
pending.releaseLock();
}
}
- /**
- * Invalidates any {@link #registerPendingPut(Object) previously registered pending puts} ensuring a subsequent call to
- * {@link #acquirePutFromLoadLock(Object)} will return false
. This method will block until any
- * concurrent thread that has {@link #acquirePutFromLoadLock(Object) acquired the putFromLoad lock} for the given key
- * has released the lock. This allows the caller to be certain the putFromLoad will not execute after this method
- * returns, possibly caching stale data.
- *
- * @param key key identifying data whose pending puts should be invalidated
- *
- * @return true
if the invalidation was successful; false
if a problem occured (which the
- * caller should treat as an exception condition)
- */
- public boolean invalidateKey(Object key) {
- boolean success = true;
-
- // Invalidate any pending puts
- final PendingPutMap pending = pendingPuts.get( key );
- if ( pending != null ) {
- // This lock should be available very quickly, but we'll be
- // very patient waiting for it as callers should treat not
- // acquiring it as an exception condition
- if ( pending.acquireLock( 60, TimeUnit.SECONDS ) ) {
- try {
- pending.invalidate();
- }
- finally {
- pending.releaseLock();
- }
- }
- else {
- success = false;
- }
- }
-
- // Record when this occurred to invalidate later naked puts
- final RecentRemoval removal = new RecentRemoval( key, this.nakedPutInvalidationPeriod );
- recentRemovals.put( key, removal.timestamp );
-
- // Don't let recentRemovals map become a memory leak
- RecentRemoval toClean = null;
- final boolean attemptClean = removal.timestamp > earliestRemovalTimestamp;
- removalsLock.lock();
- try {
- removalsQueue.add( removal );
-
- if ( attemptClean ) {
- if ( removalsQueue.size() > 1 ) {
- // we have at least one as we just added it
- toClean = removalsQueue.remove( 0 );
- }
- earliestRemovalTimestamp = removalsQueue.get( 0 ).timestamp;
- }
- }
- finally {
- removalsLock.unlock();
- }
-
- if ( toClean != null ) {
- Long cleaned = recentRemovals.get( toClean.key );
- if ( cleaned != null && cleaned.equals( toClean.timestamp ) ) {
- cleaned = recentRemovals.remove( toClean.key );
- if ( cleaned != null && !cleaned.equals( toClean.timestamp ) ) {
- // Oops; removed the wrong timestamp; restore it
- recentRemovals.putIfAbsent( toClean.key, cleaned );
- }
- }
- }
-
- return success;
- }
-
/**
* Invalidates all {@link #registerPendingPut(Object) previously registered pending puts} ensuring a subsequent call to
* {@link #acquirePutFromLoadLock(Object)} will return false
. This method will block until any
@@ -345,15 +284,16 @@ public class PutFromLoadValidator {
* caller should treat as an exception condition)
*/
public boolean invalidateRegion() {
-
- boolean ok = false;
- invalidationTimestamp = System.currentTimeMillis() + this.nakedPutInvalidationPeriod;
-
+ // TODO: not sure what happens with locks acquired *after* calling this method but before
+ // the actual invalidation
+ boolean ok = true;
+ invalidationUpdater.set(this, System.currentTimeMillis() + nakedPutInvalidationPeriod);
try {
// Acquire the lock for each entry to ensure any ongoing
// work associated with it is completed before we return
- for ( PendingPutMap entry : pendingPuts.values() ) {
+ for ( Iterator it = pendingPuts.values().iterator(); it.hasNext(); ) {
+ PendingPutMap entry = it.next();
if ( entry.acquireLock( 60, TimeUnit.SECONDS ) ) {
try {
entry.invalidate();
@@ -361,30 +301,15 @@ public class PutFromLoadValidator {
finally {
entry.releaseLock();
}
+ it.remove();
}
else {
ok = false;
}
}
-
- removalsLock.lock();
- try {
- recentRemovals.clear();
- removalsQueue.clear();
-
- ok = true;
-
- }
- finally {
- removalsLock.unlock();
- }
- }
- catch (Exception e) {
+ } catch (Exception e) {
ok = false;
}
- finally {
- earliestRemovalTimestamp = invalidationTimestamp;
- }
return ok;
}
@@ -395,8 +320,7 @@ public class PutFromLoadValidator {
* wherein it is expected that a database read plus cache put will occur. Calling this method allows the validator to
* treat the subsequent acquirePutFromLoadLock
as if the database read occurred when this method was
* invoked. This allows the validator to compare the timestamp of this call against the timestamp of subsequent removal
- * notifications. A put that occurs without this call preceding it is "naked"; i.e the validator must assume the put is
- * not valid if any relevant removal has occurred within {@link #NAKED_PUT_INVALIDATION_PERIOD} milliseconds.
+ * notifications.
*
* @param key key that will be used for subsequent cache put
*/
@@ -404,50 +328,83 @@ public class PutFromLoadValidator {
final PendingPut pendingPut = new PendingPut( getOwnerForPut() );
final PendingPutMap pendingForKey = new PendingPutMap( pendingPut );
- for (; ; ) {
- final PendingPutMap existing = pendingPuts.putIfAbsent( key, pendingForKey );
- if ( existing != null ) {
- if ( existing.acquireLock( 10, TimeUnit.SECONDS ) ) {
-
- try {
- existing.put( pendingPut );
- final PendingPutMap doublecheck = pendingPuts.putIfAbsent( key, existing );
- if ( doublecheck == null || doublecheck == existing ) {
- break;
- }
- // else we hit a race and need to loop to try again
+ final PendingPutMap existing = pendingPuts.putIfAbsent( key, pendingForKey );
+ if ( existing != null ) {
+ if ( existing.acquireLock( 10, TimeUnit.SECONDS ) ) {
+ try {
+ if ( !existing.hasInvalidator() ) {
+ existing.put(pendingPut);
}
- finally {
- existing.releaseLock();
- }
- }
- else {
- // Can't get the lock; when we come back we'll be a "naked put"
- break;
+ } finally {
+ existing.releaseLock();
}
}
else {
- // normal case
- break;
+ // Can't get the lock; when we come back we'll be a "naked put"
}
}
}
- // -------------------------------------------------------------- Protected
-
/**
- * Only for use by unit tests; may be removed at any time
+ * Invalidates any {@link #registerPendingPut(Object) previously registered pending puts}
+ * and disables further registrations ensuring a subsequent call to {@link #acquirePutFromLoadLock(Object)}
+ * will return false
. This method will block until any concurrent thread that has
+ * {@link #acquirePutFromLoadLock(Object) acquired the putFromLoad lock} for the given key
+ * has released the lock. This allows the caller to be certain the putFromLoad will not execute after this method
+ * returns, possibly caching stale data.
+ * After this transaction completes, {@link #endInvalidatingKey(Object)} needs to be called }
+ *
+ * @param key key identifying data whose pending puts should be invalidated
+ *
+ * @return true
if the invalidation was successful; false
if a problem occured (which the
+ * caller should treat as an exception condition)
*/
- protected int getRemovalQueueLength() {
- removalsLock.lock();
- try {
- return removalsQueue.size();
+ public boolean beginInvalidatingKey(Object key) {
+ PendingPutMap pending = new PendingPutMap(null);
+ PendingPutMap prev = pendingPuts.putIfAbsent(key, pending);
+ if (prev != null) {
+ pending = prev;
}
- finally {
- removalsLock.unlock();
+ if (pending.acquireLock(60, TimeUnit.SECONDS)) {
+ try {
+ pending.invalidate();
+ pending.addInvalidator(getOwnerForPut(), System.currentTimeMillis() + nakedPutInvalidationPeriod);
+ } finally {
+ pending.releaseLock();
+ }
+ return true;
+ } else {
+ return false;
}
}
+ /**
+ * Called after the transaction completes, allowing caching of entries. It is possible that this method
+ * is called without previous invocation of {@link #beginInvalidatingKey(Object)}, then it should be noop.
+ *
+ * @param key
+ * @return
+ */
+ public boolean endInvalidatingKey(Object key) {
+ PendingPutMap pending = pendingPuts.get(key);
+ if (pending == null) {
+ return true;
+ }
+ if (pending.acquireLock(60, TimeUnit.SECONDS)) {
+ try {
+ pending.removeInvalidator(getOwnerForPut());
+ // we can't remove the pending put yet because we wait for naked puts
+ // pendingPuts should be configured with maxIdle time so won't have memory leak
+ return true;
+ } finally {
+ pending.releaseLock();
+ }
+ } else {
+ return false;
+ }
+ }
+
+
// ---------------------------------------------------------------- Private
private Object getOwnerForPut() {
@@ -470,10 +427,13 @@ public class PutFromLoadValidator {
*
* This class is NOT THREAD SAFE. All operations on it must be performed with the lock held.
*/
- private static class PendingPutMap {
+ private static class PendingPutMap extends Lock {
private PendingPut singlePendingPut;
private Map fullMap;
- private final Lock lock = new ReentrantLock();
+ private final java.util.concurrent.locks.Lock lock = new ReentrantLock();
+ private Object singleInvalidator;
+ private Set invalidators;
+ private long nakedPutsDeadline = Long.MIN_VALUE;
PendingPutMap(PendingPut singleItem) {
this.singlePendingPut = singleItem;
@@ -546,6 +506,41 @@ public class PutFromLoadValidator {
fullMap = null;
}
}
+
+ public void addInvalidator(Object invalidator, long deadline) {
+ if (invalidators == null) {
+ if (singleInvalidator == null) {
+ singleInvalidator = invalidator;
+ } else {
+ invalidators = new HashSet();
+ invalidators.add(singleInvalidator);
+ invalidators.add(invalidator);
+ singleInvalidator = null;
+ }
+ } else {
+ invalidators.add(invalidator);
+ }
+ nakedPutsDeadline = Math.max(nakedPutsDeadline, deadline);
+ }
+
+ public boolean hasInvalidator() {
+ return singleInvalidator != null || (invalidators != null && !invalidators.isEmpty());
+ }
+
+ public void removeInvalidator(Object invalidator) {
+ if (invalidators == null) {
+ if (singleInvalidator != null && singleInvalidator.equals(invalidator)) {
+ singleInvalidator = null;
+ }
+ } else {
+ invalidators.remove(invalidator);
+ }
+ }
+
+ public boolean canRemove() {
+ return size() == 0 && !hasInvalidator() &&
+ (nakedPutsDeadline == Long.MIN_VALUE || nakedPutsDeadline < System.currentTimeMillis());
+ }
}
private static class PendingPut {
@@ -556,15 +551,4 @@ public class PutFromLoadValidator {
this.owner = owner;
}
}
-
- private static class RecentRemoval {
- private final Object key;
- private final Long timestamp;
-
- private RecentRemoval(Object key, long nakedPutInvalidationPeriod) {
- this.key = key;
- timestamp = System.currentTimeMillis() + nakedPutInvalidationPeriod;
- }
- }
-
}
diff --git a/hibernate-infinispan/src/main/java/org/hibernate/cache/infinispan/access/TransactionalAccessDelegate.java b/hibernate-infinispan/src/main/java/org/hibernate/cache/infinispan/access/TransactionalAccessDelegate.java
index bafe2d9800..3c196784e7 100755
--- a/hibernate-infinispan/src/main/java/org/hibernate/cache/infinispan/access/TransactionalAccessDelegate.java
+++ b/hibernate-infinispan/src/main/java/org/hibernate/cache/infinispan/access/TransactionalAccessDelegate.java
@@ -9,7 +9,6 @@ package org.hibernate.cache.infinispan.access;
import org.hibernate.cache.CacheException;
import org.hibernate.cache.infinispan.impl.BaseRegion;
import org.hibernate.cache.infinispan.util.Caches;
-
import org.infinispan.AdvancedCache;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;
@@ -111,7 +110,8 @@ public class TransactionalAccessDelegate {
return false;
}
- if ( !putValidator.acquirePutFromLoadLock( key ) ) {
+ PutFromLoadValidator.Lock lock = putValidator.acquirePutFromLoadLock(key);
+ if ( lock == null) {
if ( TRACE_ENABLED ) {
log.tracef( "Put from load lock not acquired for key %s", key );
}
@@ -131,7 +131,7 @@ public class TransactionalAccessDelegate {
}
}
finally {
- putValidator.releasePutFromLoadLock( key );
+ putValidator.releasePutFromLoadLock( key, lock);
}
return true;
@@ -185,7 +185,7 @@ public class TransactionalAccessDelegate {
* @throws CacheException if removing the cached item fails
*/
public void remove(Object key) throws CacheException {
- if ( !putValidator.invalidateKey( key ) ) {
+ if ( !putValidator.beginInvalidatingKey(key)) {
throw new CacheException(
"Failed to invalidate pending putFromLoad calls for key " + key + " from region " + region.getName()
);
@@ -216,7 +216,7 @@ public class TransactionalAccessDelegate {
* @throws CacheException if evicting the item fails
*/
public void evict(Object key) throws CacheException {
- if ( !putValidator.invalidateKey( key ) ) {
+ if ( !putValidator.beginInvalidatingKey(key)) {
throw new CacheException(
"Failed to invalidate pending putFromLoad calls for key " + key + " from region " + region.getName()
);
@@ -240,4 +240,19 @@ public class TransactionalAccessDelegate {
Caches.broadcastEvictAll( cache );
}
+ /**
+ * Called when we have finished the attempted update/delete (which may or
+ * may not have been successful), after transaction completion. This method
+ * is used by "asynchronous" concurrency strategies.
+ *
+ * @param key The item key
+ * @throws org.hibernate.cache.CacheException Propogated from underlying {@link org.hibernate.cache.spi.Region}
+ */
+ public void unlockItem(Object key) throws CacheException {
+ if ( !putValidator.endInvalidatingKey(key) ) {
+ // TODO: localization
+ log.warn("Failed to end invalidating pending putFromLoad calls for key " + key + " from region "
+ + region.getName() + "; the key won't be cached in the future.");
+ }
+ }
}
diff --git a/hibernate-infinispan/src/main/java/org/hibernate/cache/infinispan/collection/TransactionalAccess.java b/hibernate-infinispan/src/main/java/org/hibernate/cache/infinispan/collection/TransactionalAccess.java
index b3791254bd..3850ff66f5 100644
--- a/hibernate-infinispan/src/main/java/org/hibernate/cache/infinispan/collection/TransactionalAccess.java
+++ b/hibernate-infinispan/src/main/java/org/hibernate/cache/infinispan/collection/TransactionalAccess.java
@@ -75,6 +75,7 @@ class TransactionalAccess implements CollectionRegionAccessStrategy {
}
public void unlockItem(SessionImplementor session, Object key, SoftLock lock) throws CacheException {
+ delegate.unlockItem(key);
}
public void unlockRegion(SoftLock lock) throws CacheException {
diff --git a/hibernate-infinispan/src/main/java/org/hibernate/cache/infinispan/entity/TransactionalAccess.java b/hibernate-infinispan/src/main/java/org/hibernate/cache/infinispan/entity/TransactionalAccess.java
index 6c5ff83969..f06c1f3840 100644
--- a/hibernate-infinispan/src/main/java/org/hibernate/cache/infinispan/entity/TransactionalAccess.java
+++ b/hibernate-infinispan/src/main/java/org/hibernate/cache/infinispan/entity/TransactionalAccess.java
@@ -84,6 +84,7 @@ class TransactionalAccess implements EntityRegionAccessStrategy {
}
public void unlockItem(SessionImplementor session, Object key, SoftLock lock) throws CacheException {
+ delegate.unlockItem(key);
}
public void unlockRegion(SoftLock lock) throws CacheException {
diff --git a/hibernate-infinispan/src/main/java/org/hibernate/cache/infinispan/naturalid/TransactionalAccess.java b/hibernate-infinispan/src/main/java/org/hibernate/cache/infinispan/naturalid/TransactionalAccess.java
index 80bd761870..df7d927933 100644
--- a/hibernate-infinispan/src/main/java/org/hibernate/cache/infinispan/naturalid/TransactionalAccess.java
+++ b/hibernate-infinispan/src/main/java/org/hibernate/cache/infinispan/naturalid/TransactionalAccess.java
@@ -89,6 +89,7 @@ class TransactionalAccess implements NaturalIdRegionAccessStrategy {
@Override
public void unlockItem(SessionImplementor session, Object key, SoftLock lock) throws CacheException {
+ delegate.unlockItem(key);
}
@Override
diff --git a/hibernate-infinispan/src/test/java/org/hibernate/test/cache/infinispan/access/PutFromLoadValidatorUnitTestCase.java b/hibernate-infinispan/src/test/java/org/hibernate/test/cache/infinispan/access/PutFromLoadValidatorUnitTestCase.java
index 5d7875c03d..1ca0070aad 100644
--- a/hibernate-infinispan/src/test/java/org/hibernate/test/cache/infinispan/access/PutFromLoadValidatorUnitTestCase.java
+++ b/hibernate-infinispan/src/test/java/org/hibernate/test/cache/infinispan/access/PutFromLoadValidatorUnitTestCase.java
@@ -20,8 +20,6 @@ import java.util.concurrent.atomic.AtomicReference;
import org.hibernate.cache.infinispan.access.PutFromLoadValidator;
import org.hibernate.test.cache.infinispan.functional.cluster.DualNodeJtaTransactionManagerImpl;
import org.hibernate.test.cache.infinispan.util.InfinispanTestingSetup;
-import org.infinispan.AdvancedCache;
-import org.infinispan.manager.EmbeddedCacheManager;
import org.infinispan.test.CacheManagerCallable;
import org.infinispan.test.fwk.TestCacheManagerFactory;
import org.infinispan.util.logging.Log;
@@ -32,8 +30,9 @@ import org.junit.Rule;
import org.junit.Test;
import static org.infinispan.test.TestingUtil.withCacheManager;
+import static org.infinispan.test.TestingUtil.withTx;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -87,28 +86,13 @@ public class PutFromLoadValidatorUnitTestCase {
TestCacheManagerFactory.createCacheManager(false)) {
@Override
public void call() {
- try {
- PutFromLoadValidator testee = new PutFromLoadValidator(cm.getCache().getAdvancedCache(), cm,
- transactional ? tm : null,
- PutFromLoadValidator.NAKED_PUT_INVALIDATION_PERIOD);
- if (transactional) {
- tm.begin();
- }
- boolean lockable = testee.acquirePutFromLoadLock(KEY1);
- try {
- assertTrue(lockable);
- }
- finally {
- if (lockable) {
- testee.releasePutFromLoadLock(KEY1);
- }
- }
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
+ PutFromLoadValidator testee = new PutFromLoadValidator(cm.getCache().getAdvancedCache(), cm,
+ transactional ? tm : null, PutFromLoadValidator.NAKED_PUT_INVALIDATION_PERIOD);
+ exec(transactional, new NakedPut(testee, true));
}
});
}
+
@Test
public void testRegisteredPut() throws Exception {
registeredPutTest(false);
@@ -124,29 +108,12 @@ public class PutFromLoadValidatorUnitTestCase {
@Override
public void call() {
PutFromLoadValidator testee = new PutFromLoadValidator(cm.getCache().getAdvancedCache(), cm,
- transactional ? tm : null,
- PutFromLoadValidator.NAKED_PUT_INVALIDATION_PERIOD);
- try {
- if (transactional) {
- tm.begin();
- }
- testee.registerPendingPut(KEY1);
-
- boolean lockable = testee.acquirePutFromLoadLock(KEY1);
- try {
- assertTrue(lockable);
- }
- finally {
- if (lockable) {
- testee.releasePutFromLoadLock(KEY1);
- }
- }
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
+ transactional ? tm : null, PutFromLoadValidator.NAKED_PUT_INVALIDATION_PERIOD);
+ exec(transactional, new RegularPut(testee));
}
});
}
+
@Test
public void testNakedPutAfterKeyRemoval() throws Exception {
nakedPutAfterRemovalTest(false, false);
@@ -171,34 +138,15 @@ public class PutFromLoadValidatorUnitTestCase {
@Override
public void call() {
PutFromLoadValidator testee = new PutFromLoadValidator(cm.getCache().getAdvancedCache(), cm,
- transactional ? tm : null,
- PutFromLoadValidator.NAKED_PUT_INVALIDATION_PERIOD);
- if (removeRegion) {
- testee.invalidateRegion();
- } else {
- testee.invalidateKey(KEY1);
- }
- try {
- if (transactional) {
- tm.begin();
- }
-
- boolean lockable = testee.acquirePutFromLoadLock(KEY1);
- try {
- assertFalse(lockable);
- }
- finally {
- if (lockable) {
- testee.releasePutFromLoadLock(KEY1);
- }
- }
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
+ transactional ? tm : null, PutFromLoadValidator.NAKED_PUT_INVALIDATION_PERIOD);
+ Invalidation invalidation = new Invalidation(testee, removeRegion);
+ NakedPut nakedPut = new NakedPut(testee, false);
+ exec(transactional, invalidation, nakedPut);
}
});
}
+
@Test
public void testRegisteredPutAfterKeyRemoval() throws Exception {
registeredPutAfterRemovalTest(false, false);
@@ -223,31 +171,10 @@ public class PutFromLoadValidatorUnitTestCase {
@Override
public void call() {
PutFromLoadValidator testee = new PutFromLoadValidator(cm.getCache().getAdvancedCache(), cm,
- transactional ? tm : null,
- PutFromLoadValidator.NAKED_PUT_INVALIDATION_PERIOD);
- if (removeRegion) {
- testee.invalidateRegion();
- } else {
- testee.invalidateKey(KEY1);
- }
- try {
- if (transactional) {
- tm.begin();
- }
- testee.registerPendingPut(KEY1);
-
- boolean lockable = testee.acquirePutFromLoadLock(KEY1);
- try {
- assertTrue(lockable);
- }
- finally {
- if (lockable) {
- testee.releasePutFromLoadLock(KEY1);
- }
- }
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
+ transactional ? tm : null, PutFromLoadValidator.NAKED_PUT_INVALIDATION_PERIOD);
+ Invalidation invalidation = new Invalidation(testee, removeRegion);
+ RegularPut regularPut = new RegularPut(testee);
+ exec(transactional, invalidation, regularPut);
}
});
@@ -277,8 +204,7 @@ public class PutFromLoadValidatorUnitTestCase {
@Override
public void call() {
PutFromLoadValidator testee = new PutFromLoadValidator(cm.getCache().getAdvancedCache(), cm,
- transactional ? tm : null,
- PutFromLoadValidator.NAKED_PUT_INVALIDATION_PERIOD);
+ transactional ? tm : null, PutFromLoadValidator.NAKED_PUT_INVALIDATION_PERIOD);
try {
if (transactional) {
tm.begin();
@@ -287,17 +213,18 @@ public class PutFromLoadValidatorUnitTestCase {
if (removeRegion) {
testee.invalidateRegion();
} else {
- testee.invalidateKey(KEY1);
+ testee.beginInvalidatingKey(KEY1);
}
- boolean lockable = testee.acquirePutFromLoadLock(KEY1);
+ PutFromLoadValidator.Lock lock = testee.acquirePutFromLoadLock(KEY1);
try {
- assertFalse(lockable);
+ assertNull(lock);
}
finally {
- if (lockable) {
- testee.releasePutFromLoadLock(KEY1);
+ if (lock != null) {
+ testee.releasePutFromLoadLock(KEY1, lock);
}
+ testee.endInvalidatingKey(KEY1);
}
} catch (Exception e) {
throw new RuntimeException(e);
@@ -305,6 +232,7 @@ public class PutFromLoadValidatorUnitTestCase {
}
});
}
+
@Test
public void testDelayedNakedPutAfterKeyRemoval() throws Exception {
delayedNakedPutAfterRemovalTest(false, false);
@@ -329,12 +257,13 @@ public class PutFromLoadValidatorUnitTestCase {
TestCacheManagerFactory.createCacheManager(false)) {
@Override
public void call() {
- PutFromLoadValidator testee = new TestValidator(cm.getCache().getAdvancedCache(), cm,
+ PutFromLoadValidator testee = new PutFromLoadValidator(cm.getCache().getAdvancedCache(), cm,
transactional ? tm : null, 100);
if (removeRegion) {
testee.invalidateRegion();
} else {
- testee.invalidateKey(KEY1);
+ testee.beginInvalidatingKey(KEY1);
+ testee.endInvalidatingKey(KEY1);
}
try {
if (transactional) {
@@ -342,12 +271,12 @@ public class PutFromLoadValidatorUnitTestCase {
}
Thread.sleep(110);
- boolean lockable = testee.acquirePutFromLoadLock(KEY1);
+ PutFromLoadValidator.Lock lock = testee.acquirePutFromLoadLock(KEY1);
try {
- assertTrue(lockable);
+ assertNotNull(lock);
} finally {
- if (lockable) {
- testee.releasePutFromLoadLock(KEY1);
+ if (lock != null) {
+ testee.releasePutFromLoadLock(KEY1, null);
}
}
} catch (Exception e) {
@@ -389,12 +318,13 @@ public class PutFromLoadValidatorUnitTestCase {
testee.registerPendingPut(KEY1);
registeredLatch.countDown();
registeredLatch.await(5, TimeUnit.SECONDS);
- if (testee.acquirePutFromLoadLock(KEY1)) {
+ PutFromLoadValidator.Lock lock = testee.acquirePutFromLoadLock(KEY1);
+ if (lock != null) {
try {
log.trace("Put from load lock acquired for key = " + KEY1);
success.incrementAndGet();
} finally {
- testee.releasePutFromLoadLock(KEY1);
+ testee.releasePutFromLoadLock(KEY1, lock);
}
} else {
log.trace("Unable to acquired putFromLoad lock for key = " + KEY1);
@@ -453,7 +383,8 @@ public class PutFromLoadValidatorUnitTestCase {
Callable pferCallable = new Callable() {
public Boolean call() throws Exception {
testee.registerPendingPut(KEY1);
- if (testee.acquirePutFromLoadLock(KEY1)) {
+ PutFromLoadValidator.Lock lock = testee.acquirePutFromLoadLock(KEY1);
+ if (lock != null) {
try {
removeLatch.countDown();
pferLatch.await();
@@ -461,7 +392,7 @@ public class PutFromLoadValidatorUnitTestCase {
return Boolean.TRUE;
}
finally {
- testee.releasePutFromLoadLock(KEY1);
+ testee.releasePutFromLoadLock(KEY1, lock);
}
}
return Boolean.FALSE;
@@ -472,7 +403,7 @@ public class PutFromLoadValidatorUnitTestCase {
public Void call() throws Exception {
removeLatch.await();
if (keyOnly) {
- testee.invalidateKey(KEY1);
+ testee.beginInvalidatingKey(KEY1);
} else {
testee.invalidateRegion();
}
@@ -505,18 +436,104 @@ public class PutFromLoadValidatorUnitTestCase {
});
}
- private static class TestValidator extends PutFromLoadValidator {
+ protected void exec(boolean transactional, Callable>... callables) {
+ try {
+ if (transactional) {
+ for (Callable> c : callables) {
+ withTx(tm, c);
+ }
+ } else {
+ for (Callable> c : callables) {
+ c.call();
+ }
+ }
+ } catch (RuntimeException e) {
+ throw e;
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
- protected TestValidator(AdvancedCache cache, EmbeddedCacheManager cm,
- TransactionManager transactionManager,
- long nakedPutInvalidationPeriod) {
- super(cache, cm, transactionManager, nakedPutInvalidationPeriod);
+ private class Invalidation implements Callable {
+ private PutFromLoadValidator putFromLoadValidator;
+ private boolean removeRegion;
+
+ public Invalidation(PutFromLoadValidator putFromLoadValidator, boolean removeRegion) {
+ this.putFromLoadValidator = putFromLoadValidator;
+ this.removeRegion = removeRegion;
}
@Override
- public int getRemovalQueueLength() {
- return super.getRemovalQueueLength();
+ public Void call() throws Exception {
+ if (removeRegion) {
+ boolean success = putFromLoadValidator.invalidateRegion();
+ assertTrue(success);
+ } else {
+ boolean success = putFromLoadValidator.beginInvalidatingKey(KEY1);
+ assertTrue(success);
+ success = putFromLoadValidator.endInvalidatingKey(KEY1);
+ assertTrue(success);
+ }
+ return null;
+ }
+ }
+
+ private class RegularPut implements Callable {
+ private PutFromLoadValidator putFromLoadValidator;
+
+ public RegularPut(PutFromLoadValidator putFromLoadValidator) {
+ this.putFromLoadValidator = putFromLoadValidator;
}
+ @Override
+ public Void call() throws Exception {
+ try {
+ putFromLoadValidator.registerPendingPut(KEY1);
+
+ PutFromLoadValidator.Lock lock = putFromLoadValidator.acquirePutFromLoadLock(KEY1);
+ try {
+ assertNotNull(lock);
+ } finally {
+ if (lock != null) {
+ putFromLoadValidator.releasePutFromLoadLock(KEY1, lock);
+ }
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ return null;
+ }
+ }
+
+ private class NakedPut implements Callable {
+ private final PutFromLoadValidator testee;
+ private final boolean expectSuccess;
+
+ public NakedPut(PutFromLoadValidator testee, boolean expectSuccess) {
+ this.testee = testee;
+ this.expectSuccess = expectSuccess;
+ }
+
+ @Override
+ public Void call() throws Exception {
+ try {
+ PutFromLoadValidator.Lock lock = testee.acquirePutFromLoadLock(KEY1);
+ try {
+ if (expectSuccess) {
+ assertNotNull(lock);
+ } else {
+ assertNull(lock);
+ }
+ }
+ finally {
+ if (lock != null) {
+ testee.releasePutFromLoadLock(KEY1, lock);
+ }
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ return null;
+ }
}
}
diff --git a/hibernate-infinispan/src/test/java/org/hibernate/test/cache/infinispan/collection/AbstractCollectionRegionAccessStrategyTestCase.java b/hibernate-infinispan/src/test/java/org/hibernate/test/cache/infinispan/collection/AbstractCollectionRegionAccessStrategyTestCase.java
index c7f234ba82..4341a82bd5 100644
--- a/hibernate-infinispan/src/test/java/org/hibernate/test/cache/infinispan/collection/AbstractCollectionRegionAccessStrategyTestCase.java
+++ b/hibernate-infinispan/src/test/java/org/hibernate/test/cache/infinispan/collection/AbstractCollectionRegionAccessStrategyTestCase.java
@@ -161,8 +161,8 @@ public abstract class AbstractCollectionRegionAccessStrategyTestCase extends Abs
PutFromLoadValidator validator = new PutFromLoadValidator(remoteCollectionRegion.getCache(), cm,
remoteTm, 20000) {
@Override
- public boolean acquirePutFromLoadLock(Object key) {
- boolean acquired = super.acquirePutFromLoadLock( key );
+ public Lock acquirePutFromLoadLock(Object key) {
+ Lock lock = super.acquirePutFromLoadLock( key );
try {
removeLatch.countDown();
pferLatch.await( 2, TimeUnit.SECONDS );
@@ -175,7 +175,7 @@ public abstract class AbstractCollectionRegionAccessStrategyTestCase extends Abs
log.error( "Error", e );
throw new RuntimeException( "Error", e );
}
- return acquired;
+ return lock;
}
};
diff --git a/hibernate-infinispan/src/test/java/org/hibernate/test/cache/infinispan/functional/AbstractFunctionalTestCase.java b/hibernate-infinispan/src/test/java/org/hibernate/test/cache/infinispan/functional/AbstractFunctionalTestCase.java
index bf70454436..c21b493d93 100644
--- a/hibernate-infinispan/src/test/java/org/hibernate/test/cache/infinispan/functional/AbstractFunctionalTestCase.java
+++ b/hibernate-infinispan/src/test/java/org/hibernate/test/cache/infinispan/functional/AbstractFunctionalTestCase.java
@@ -6,18 +6,33 @@
*/
package org.hibernate.test.cache.infinispan.functional;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Phaser;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.hibernate.PessimisticLockException;
import org.hibernate.Session;
+import org.hibernate.cache.infinispan.InfinispanRegionFactory;
+import org.hibernate.cache.infinispan.entity.EntityRegionImpl;
+import org.hibernate.cache.spi.Region;
import org.hibernate.stat.SecondLevelCacheStatistics;
import org.hibernate.stat.Statistics;
+import org.hibernate.testing.TestForIssue;
+import org.infinispan.AdvancedCache;
+import org.infinispan.commands.read.GetKeyValueCommand;
+import org.infinispan.context.InvocationContext;
+import org.infinispan.interceptors.base.BaseCustomInterceptor;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;
import org.junit.Test;
-import java.util.Map;
-import java.util.concurrent.Callable;
-
import static org.infinispan.test.TestingUtil.withTx;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
/**
* Parent tests for both transactional and
@@ -122,4 +137,152 @@ public abstract class AbstractFunctionalTestCase extends SingleNodeTestCase {
});
}
+ @Test
+ @TestForIssue(jiraKey = "HHH-9868")
+ public void testConcurrentRemoveAndPutFromLoad() throws Exception {
+ final Item item = new Item( "chris", "Chris's Item" );
+
+ withTx(tm, () -> {
+ Session s = openSession();
+ s.getTransaction().begin();
+ s.persist(item);
+ s.getTransaction().commit();
+ s.close();
+ return null;
+ });
+ Region region = sessionFactory().getSecondLevelCacheRegion(Item.class.getName());
+
+ Phaser deletePhaser = new Phaser(2);
+ Phaser getPhaser = new Phaser(2);
+ HookInterceptor hook = new HookInterceptor();
+
+ AdvancedCache entityCache = ((EntityRegionImpl) region).getCache();
+ AdvancedCache pendingPutsCache = entityCache.getCacheManager().getCache(
+ entityCache.getName() + "-" + InfinispanRegionFactory.PENDING_PUTS_CACHE_NAME).getAdvancedCache();
+ pendingPutsCache.addInterceptor(hook, 0);
+
+ Thread deleteThread = new Thread(() -> {
+ try {
+ withTx(tm, () -> {
+ Session s = openSession();
+ log.trace("Session opened");
+ s.getTransaction().begin();
+ log.trace("TX started");
+ Item loadedItem = s.get(Item.class, item.getId());
+ assertNotNull(loadedItem);
+ arriveAndAwait(deletePhaser);
+ arriveAndAwait(deletePhaser);
+ log.trace("Item loaded");
+ s.delete(loadedItem);
+ log.trace("Item deleted");
+ s.getTransaction().commit();
+ log.trace("TX committed");
+ // start get-thread here
+ arriveAndAwait(deletePhaser);
+ arriveAndAwait(deletePhaser);
+ s.close();
+ log.trace("Session closed");
+ return null;
+ });
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }, "delete-thread");
+ Thread getThread = new Thread(() -> {
+ try {
+ withTx(tm, () -> {
+ Session s = openSession();
+ log.trace("Session opened");
+ s.getTransaction().begin();
+ log.trace("TX started");
+ // DB load should happen before the record is deleted,
+ // putFromLoad should happen after deleteThread ends
+ Item loadedItem = s.get(Item.class, item.getId());
+ assertNotNull(loadedItem);
+ s.close();
+ log.trace("Session closed");
+ return null;
+ });
+ } catch (PessimisticLockException e) {
+ // If we end up here, database locks guard us against situation tested
+ // in this case and HHH-9868 cannot happen.
+ // (delete-thread has ITEMS table write-locked and we try to acquire read-lock)
+ try {
+ arriveAndAwait(getPhaser);
+ arriveAndAwait(getPhaser);
+ } catch (Exception e1) {
+ throw new RuntimeException(e1);
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }, "get-thread");
+
+ deleteThread.start();
+ // deleteThread loads the entity
+ arriveAndAwait(deletePhaser);
+ withTx(tm, () -> {
+ sessionFactory().getCache().evictEntity(Item.class, item.getId());
+ assertFalse(sessionFactory().getCache().containsEntity(Item.class, item.getId()));
+ return null;
+ });
+ arriveAndAwait(deletePhaser);
+ // delete thread invalidates PFER
+ arriveAndAwait(deletePhaser);
+ // get thread gets the entity from DB
+ hook.block(getPhaser, getThread);
+ getThread.start();
+ arriveAndAwait(getPhaser);
+ arriveAndAwait(deletePhaser);
+ // delete thread finishes the remove from DB and cache
+ deleteThread.join();
+ hook.unblock();
+ arriveAndAwait(getPhaser);
+ // get thread puts the entry into cache
+ getThread.join();
+
+ withTx(tm, () -> {
+ Session s = openSession();
+ s.getTransaction().begin();
+ Item loadedItem = s.get(Item.class, item.getId());
+ assertNull(loadedItem);
+ s.getTransaction().commit();
+ s.close();
+ return null;
+ });
+ }
+
+ protected static void arriveAndAwait(Phaser phaser) throws TimeoutException, InterruptedException {
+ phaser.awaitAdvanceInterruptibly(phaser.arrive(), 10, TimeUnit.SECONDS);
+ }
+
+ private static class HookInterceptor extends BaseCustomInterceptor {
+ Phaser phaser;
+ Thread thread;
+
+ public synchronized void block(Phaser phaser, Thread thread) {
+ this.phaser = phaser;
+ this.thread = thread;
+ }
+
+ public synchronized void unblock() {
+ phaser = null;
+ thread = null;
+ }
+
+ @Override
+ public Object visitGetKeyValueCommand(InvocationContext ctx, GetKeyValueCommand command) throws Throwable {
+ Phaser phaser;
+ Thread thread;
+ synchronized (this) {
+ phaser = this.phaser;
+ thread = this.thread;
+ }
+ if (phaser != null && Thread.currentThread() == thread) {
+ arriveAndAwait(phaser);
+ arriveAndAwait(phaser);
+ }
+ return super.visitGetKeyValueCommand(ctx, command);
+ }
+ }
}
diff --git a/hibernate-infinispan/src/test/java/org/hibernate/test/cache/infinispan/functional/BasicTransactionalTestCase.java b/hibernate-infinispan/src/test/java/org/hibernate/test/cache/infinispan/functional/BasicTransactionalTestCase.java
index ccdecf4bdd..6fca629277 100644
--- a/hibernate-infinispan/src/test/java/org/hibernate/test/cache/infinispan/functional/BasicTransactionalTestCase.java
+++ b/hibernate-infinispan/src/test/java/org/hibernate/test/cache/infinispan/functional/BasicTransactionalTestCase.java
@@ -11,7 +11,6 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
-import java.util.concurrent.TimeUnit;
import org.hibernate.Cache;
import org.hibernate.Criteria;
@@ -19,12 +18,10 @@ import org.hibernate.Hibernate;
import org.hibernate.NaturalIdLoadAccess;
import org.hibernate.Session;
import org.hibernate.Transaction;
-import org.hibernate.cache.infinispan.access.PutFromLoadValidator;
import org.hibernate.cache.spi.entry.CacheEntry;
import org.hibernate.criterion.Restrictions;
import org.hibernate.stat.SecondLevelCacheStatistics;
import org.hibernate.stat.Statistics;
-
import org.hibernate.testing.TestForIssue;
import org.junit.After;
import org.junit.Test;
@@ -954,7 +951,6 @@ public class BasicTransactionalTestCase extends AbstractFunctionalTestCase {
// TODO: Clear caches manually via cache manager (it's faster!!)
this.cleanupCache();
- Thread.sleep(PutFromLoadValidator.NAKED_PUT_INVALIDATION_PERIOD + TimeUnit.SECONDS.toMillis(1));
stats.setStatisticsEnabled( true );
stats.clear();
diff --git a/hibernate-infinispan/src/test/java/org/hibernate/test/cache/infinispan/functional/cluster/DualNodeJtaTransactionImpl.java b/hibernate-infinispan/src/test/java/org/hibernate/test/cache/infinispan/functional/cluster/DualNodeJtaTransactionImpl.java
index 73d68de78b..83656d776f 100644
--- a/hibernate-infinispan/src/test/java/org/hibernate/test/cache/infinispan/functional/cluster/DualNodeJtaTransactionImpl.java
+++ b/hibernate-infinispan/src/test/java/org/hibernate/test/cache/infinispan/functional/cluster/DualNodeJtaTransactionImpl.java
@@ -6,13 +6,6 @@
*/
package org.hibernate.test.cache.infinispan.functional.cluster;
-import java.sql.Connection;
-import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicInteger;
import javax.transaction.HeuristicMixedException;
import javax.transaction.HeuristicRollbackException;
import javax.transaction.RollbackException;
@@ -23,6 +16,13 @@ import javax.transaction.Transaction;
import javax.transaction.xa.XAException;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;
@@ -62,9 +62,11 @@ public class DualNodeJtaTransactionImpl implements Transaction {
} else {
status = Status.STATUS_PREPARING;
- for (int i = 0; i < synchronizations.size(); i++) {
- Synchronization s = (Synchronization) synchronizations.get(i);
- s.beforeCompletion();
+ if (synchronizations != null) {
+ for (int i = 0; i < synchronizations.size(); i++) {
+ Synchronization s = (Synchronization) synchronizations.get(i);
+ s.beforeCompletion();
+ }
}
if (!runXaResourcePrepare()) {
@@ -89,9 +91,11 @@ public class DualNodeJtaTransactionImpl implements Transaction {
status = Status.STATUS_COMMITTED;
- for (int i = 0; i < synchronizations.size(); i++) {
- Synchronization s = (Synchronization) synchronizations.get(i);
- s.afterCompletion(status);
+ if (synchronizations != null) {
+ for (int i = 0; i < synchronizations.size(); i++) {
+ Synchronization s = (Synchronization) synchronizations.get(i);
+ s.afterCompletion(status);
+ }
}
// status = Status.STATUS_NO_TRANSACTION;
diff --git a/hibernate-infinispan/src/test/java/org/hibernate/test/cache/infinispan/tm/XaTransactionManagerImpl.java b/hibernate-infinispan/src/test/java/org/hibernate/test/cache/infinispan/tm/XaTransactionManagerImpl.java
index a02943159d..17ae60596c 100644
--- a/hibernate-infinispan/src/test/java/org/hibernate/test/cache/infinispan/tm/XaTransactionManagerImpl.java
+++ b/hibernate-infinispan/src/test/java/org/hibernate/test/cache/infinispan/tm/XaTransactionManagerImpl.java
@@ -23,41 +23,44 @@ import javax.transaction.TransactionManager;
*/
public class XaTransactionManagerImpl implements TransactionManager {
private static final XaTransactionManagerImpl INSTANCE = new XaTransactionManagerImpl();
- private XaTransactionImpl currentTransaction;
+ private final ThreadLocal currentTransaction = new ThreadLocal<>();
public static XaTransactionManagerImpl getInstance() {
return INSTANCE;
}
public int getStatus() throws SystemException {
+ XaTransactionImpl currentTransaction = this.currentTransaction.get();
return currentTransaction == null ? Status.STATUS_NO_TRANSACTION : currentTransaction.getStatus();
}
public Transaction getTransaction() throws SystemException {
- return currentTransaction;
+ return currentTransaction.get();
}
public XaTransactionImpl getCurrentTransaction() {
- return currentTransaction;
+ return currentTransaction.get();
}
public void begin() throws NotSupportedException, SystemException {
- currentTransaction = new XaTransactionImpl(this);
+ if (currentTransaction.get() != null) throw new IllegalStateException("Transaction already started.");
+ currentTransaction.set(new XaTransactionImpl(this));
}
public Transaction suspend() throws SystemException {
- Transaction suspended = currentTransaction;
- currentTransaction = null;
+ Transaction suspended = currentTransaction.get();
+ currentTransaction.remove();
return suspended;
}
public void resume(Transaction transaction) throws InvalidTransactionException, IllegalStateException,
SystemException {
- currentTransaction = (XaTransactionImpl) transaction;
+ currentTransaction.set((XaTransactionImpl) transaction);
}
public void commit() throws RollbackException, HeuristicMixedException, HeuristicRollbackException,
SecurityException, IllegalStateException, SystemException {
+ XaTransactionImpl currentTransaction = this.currentTransaction.get();
if (currentTransaction == null) {
throw new IllegalStateException("no current transaction to commit");
}
@@ -65,6 +68,7 @@ public class XaTransactionManagerImpl implements TransactionManager {
}
public void rollback() throws IllegalStateException, SecurityException, SystemException {
+ XaTransactionImpl currentTransaction = this.currentTransaction.get();
if (currentTransaction == null) {
throw new IllegalStateException("no current transaction");
}
@@ -72,6 +76,7 @@ public class XaTransactionManagerImpl implements TransactionManager {
}
public void setRollbackOnly() throws IllegalStateException, SystemException {
+ XaTransactionImpl currentTransaction = this.currentTransaction.get();
if (currentTransaction == null) {
throw new IllegalStateException("no current transaction");
}
@@ -82,8 +87,6 @@ public class XaTransactionManagerImpl implements TransactionManager {
}
void endCurrent(Transaction transaction) {
- if (transaction == currentTransaction) {
- currentTransaction = null;
- }
+ currentTransaction.remove();
}
}