HHH-9868 Infinispan 2LC can store stale data

* invalidation blocks putFromLoads until the transaction with invalidation
  is committed
* rewritten the naked puts support: timestamp is stored in the pendingPutMap
  and removal of the record relies on pending puts' idle expiration or
  piggy-backs on release from putFromLoad
This commit is contained in:
Radim Vansa 2015-08-04 10:56:48 +02:00 committed by Galder Zamarreño
parent fa8e94071f
commit 4b2a78785e
11 changed files with 565 additions and 380 deletions

View File

@ -10,13 +10,13 @@ import javax.transaction.SystemException;
import javax.transaction.Transaction;
import javax.transaction.TransactionManager;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.locks.ReentrantLock;
import org.hibernate.cache.CacheException;
@ -41,9 +41,9 @@ import org.infinispan.manager.EmbeddedCacheManager;
* <li> Call {@link #registerPendingPut(Object)}</li>
* <li> Read the database</li>
* <li> Call {@link #acquirePutFromLoadLock(Object)}
* <li> if above returns <code>false</code>, the thread should not cache the data;
* only if above returns <code>true</code>, put data in the cache and...</li>
* <li> then call {@link #releasePutFromLoadLock(Object)}</li>
* <li> if above returns <code>null</code>, the thread should not cache the data;
* only if above returns instance of <code>AcquiredLock</code>, put data in the cache and...</li>
* <li> then call {@link #releasePutFromLoadLock(Object, Lock)}</li>
* </ol>
* </p>
* <p/>
@ -53,15 +53,19 @@ import org.infinispan.manager.EmbeddedCacheManager;
* call
* <p/>
* <ul>
* <li> {@link #invalidateKey(Object)} (for a single key invalidation)</li>
* <li> {@link #beginInvalidatingKey(Object)} (for a single key invalidation)</li>
* <li>or {@link #invalidateRegion()} (for a general invalidation all pending puts)</li>
* </ul>
* After transaction commit (when the DB is updated) {@link #endInvalidatingKey(Object)} should
* be called in order to allow further attempts to cache entry.
* </p>
* <p/>
* <p>
* This class also supports the concept of "naked puts", which are calls to
* {@link #acquirePutFromLoadLock(Object)} without a preceding {@link #registerPendingPut(Object)}
* call.
* {@link #acquirePutFromLoadLock(Object)} without a preceding {@link #registerPendingPut(Object)}.
* Besides not acquiring lock in {@link #registerPendingPut(Object)} this can happen when collection
* elements are loaded after the collection has not been found in the cache, where the elements
* don't have their own table but can be listed as 'select ... from Element where collection_id = ...'.
* </p>
*
* @author Brian Stansberry
@ -89,26 +93,15 @@ public class PutFromLoadValidator {
*/
private final ConcurrentMap<Object, PendingPutMap> pendingPuts;
private final ConcurrentMap<Object, Long> recentRemovals = new ConcurrentHashMap<Object, Long>();
/**
* List of recent removals. Used to ensure we don't leak memory via the recentRemovals map
*/
private final List<RecentRemoval> removalsQueue = new LinkedList<RecentRemoval>();
/**
* The time when the first element in removalsQueue will expire. No reason to do housekeeping on
* the queue before this time.
*/
private volatile long earliestRemovalTimestamp;
/**
* Lock controlling access to removalsQueue
*/
private final Lock removalsLock = new ReentrantLock();
/**
* The time of the last call to regionRemoved(), plus NAKED_PUT_INVALIDATION_PERIOD. All naked
* The time of the last call to {@link #invalidateRegion()}, plus NAKED_PUT_INVALIDATION_PERIOD. All naked
* puts will be rejected until the current time is greater than this value.
* NOTE: update only through {@link #invalidationUpdater}!
*/
private volatile long invalidationTimestamp;
private volatile long invalidationTimestamp = Long.MIN_VALUE;
private static final AtomicLongFieldUpdater<PutFromLoadValidator> invalidationUpdater
= AtomicLongFieldUpdater.newUpdater(PutFromLoadValidator.class, "invalidationTimestamp");
/**
* Creates a new put from load validator instance.
@ -133,9 +126,7 @@ public class PutFromLoadValidator {
public PutFromLoadValidator(
AdvancedCache cache, TransactionManager transactionManager,
long nakedPutInvalidationPeriod) {
this(cache, cache.getCacheManager(), transactionManager,
nakedPutInvalidationPeriod
);
this(cache, cache.getCacheManager(), transactionManager, nakedPutInvalidationPeriod);
}
/**
@ -170,81 +161,100 @@ public class PutFromLoadValidator {
// ----------------------------------------------------------------- Public
/**
* Marker for lock acquired in {@link #acquirePutFromLoadLock(Object)}
*/
public static class Lock {
protected Lock() {}
}
/**
* Acquire a lock giving the calling thread the right to put data in the
* cache for the given key.
* <p>
* <strong>NOTE:</strong> A call to this method that returns <code>true</code>
* should always be matched with a call to {@link #releasePutFromLoadLock(Object)}.
* should always be matched with a call to {@link #releasePutFromLoadLock(Object, Lock)}.
* </p>
*
* @param key the key
*
* @return <code>true</code> if the lock is acquired and the cache put
* can proceed; <code>false</code> if the data should not be cached
* @return <code>AcquiredLock</code> if the lock is acquired and the cache put
* can proceed; <code>null</code> if the data should not be cached
*/
public boolean acquirePutFromLoadLock(Object key) {
public Lock acquirePutFromLoadLock(Object key) {
boolean valid = false;
boolean locked = false;
final long now = System.currentTimeMillis();
long now = Long.MIN_VALUE;
PendingPutMap pending = pendingPuts.get( key );
for (;;) {
try {
final PendingPutMap pending = pendingPuts.get( key );
if ( pending != null ) {
locked = pending.acquireLock( 100, TimeUnit.MILLISECONDS );
if ( locked ) {
if (pending != null) {
locked = pending.acquireLock(100, TimeUnit.MILLISECONDS);
if (locked) {
try {
final PendingPut toCancel = pending.remove( getOwnerForPut() );
if ( toCancel != null ) {
final PendingPut toCancel = pending.remove(getOwnerForPut());
if (toCancel != null) {
valid = !toCancel.completed;
toCancel.completed = true;
} else {
// this is a naked put
if (pending.hasInvalidator()) {
valid = false;
} else {
if (now == Long.MIN_VALUE) {
now = System.currentTimeMillis();
}
valid = now > pending.nakedPutsDeadline;
}
}
finally {
if ( !valid ) {
return valid ? pending : null;
} finally {
if (!valid) {
pending.releaseLock();
locked = false;
}
}
} else {
// oops, we have leaked record for this owner, but we don't want to wait here
return null;
}
}
else {
} else {
// Key wasn't in pendingPuts, so either this is a "naked put"
// or regionRemoved has been called. Check if we can proceed
if ( now > invalidationTimestamp ) {
final Long removedTime = recentRemovals.get( key );
if ( removedTime == null || now > removedTime ) {
// It's legal to proceed. But we have to record this key
// in pendingPuts so releasePutFromLoadLock can find it.
// To do this we basically simulate a normal "register
// then acquire lock" pattern
registerPendingPut( key );
locked = acquirePutFromLoadLock( key );
valid = locked;
}
}
}
}
catch (Throwable t) {
if ( locked ) {
final PendingPutMap toRelease = pendingPuts.get( key );
if ( toRelease != null ) {
toRelease.releaseLock();
long invalidationTimestamp = this.invalidationTimestamp;
if (invalidationTimestamp != Long.MIN_VALUE) {
now = System.currentTimeMillis();
if (now > invalidationTimestamp) {
// time is +- monotonic se don't let other threads do the expensive currentTimeMillis()
invalidationUpdater.compareAndSet(this, invalidationTimestamp, Long.MIN_VALUE);
} else {
return null;
}
}
if ( t instanceof RuntimeException ) {
PendingPut pendingPut = new PendingPut(getOwnerForPut());
pending = new PendingPutMap(pendingPut);
PendingPutMap existing = pendingPuts.putIfAbsent(key, pending);
if (existing != null) {
pending = existing;
}
// continue in next loop with lock acquisition
}
} catch (Throwable t) {
if (locked) {
pending.releaseLock();
}
if (t instanceof RuntimeException) {
throw (RuntimeException) t;
}
else if ( t instanceof Error ) {
} else if (t instanceof Error) {
throw (Error) t;
}
else {
throw new RuntimeException( t );
} else {
throw new RuntimeException(t);
}
}
}
return valid;
}
/**
@ -253,87 +263,16 @@ public class PutFromLoadValidator {
*
* @param key the key
*/
public void releasePutFromLoadLock(Object key) {
final PendingPutMap pending = pendingPuts.get( key );
public void releasePutFromLoadLock(Object key, Lock lock) {
final PendingPutMap pending = (PendingPutMap) lock;
if ( pending != null ) {
if ( pending.size() == 0 ) {
if ( pending.canRemove() ) {
pendingPuts.remove( key, pending );
}
pending.releaseLock();
}
}
/**
* Invalidates any {@link #registerPendingPut(Object) previously registered pending puts} ensuring a subsequent call to
* {@link #acquirePutFromLoadLock(Object)} will return <code>false</code>. <p> This method will block until any
* concurrent thread that has {@link #acquirePutFromLoadLock(Object) acquired the putFromLoad lock} for the given key
* has released the lock. This allows the caller to be certain the putFromLoad will not execute after this method
* returns, possibly caching stale data. </p>
*
* @param key key identifying data whose pending puts should be invalidated
*
* @return <code>true</code> if the invalidation was successful; <code>false</code> if a problem occured (which the
* caller should treat as an exception condition)
*/
public boolean invalidateKey(Object key) {
boolean success = true;
// Invalidate any pending puts
final PendingPutMap pending = pendingPuts.get( key );
if ( pending != null ) {
// This lock should be available very quickly, but we'll be
// very patient waiting for it as callers should treat not
// acquiring it as an exception condition
if ( pending.acquireLock( 60, TimeUnit.SECONDS ) ) {
try {
pending.invalidate();
}
finally {
pending.releaseLock();
}
}
else {
success = false;
}
}
// Record when this occurred to invalidate later naked puts
final RecentRemoval removal = new RecentRemoval( key, this.nakedPutInvalidationPeriod );
recentRemovals.put( key, removal.timestamp );
// Don't let recentRemovals map become a memory leak
RecentRemoval toClean = null;
final boolean attemptClean = removal.timestamp > earliestRemovalTimestamp;
removalsLock.lock();
try {
removalsQueue.add( removal );
if ( attemptClean ) {
if ( removalsQueue.size() > 1 ) {
// we have at least one as we just added it
toClean = removalsQueue.remove( 0 );
}
earliestRemovalTimestamp = removalsQueue.get( 0 ).timestamp;
}
}
finally {
removalsLock.unlock();
}
if ( toClean != null ) {
Long cleaned = recentRemovals.get( toClean.key );
if ( cleaned != null && cleaned.equals( toClean.timestamp ) ) {
cleaned = recentRemovals.remove( toClean.key );
if ( cleaned != null && !cleaned.equals( toClean.timestamp ) ) {
// Oops; removed the wrong timestamp; restore it
recentRemovals.putIfAbsent( toClean.key, cleaned );
}
}
}
return success;
}
/**
* Invalidates all {@link #registerPendingPut(Object) previously registered pending puts} ensuring a subsequent call to
* {@link #acquirePutFromLoadLock(Object)} will return <code>false</code>. <p> This method will block until any
@ -345,15 +284,16 @@ public class PutFromLoadValidator {
* caller should treat as an exception condition)
*/
public boolean invalidateRegion() {
boolean ok = false;
invalidationTimestamp = System.currentTimeMillis() + this.nakedPutInvalidationPeriod;
// TODO: not sure what happens with locks acquired *after* calling this method but before
// the actual invalidation
boolean ok = true;
invalidationUpdater.set(this, System.currentTimeMillis() + nakedPutInvalidationPeriod);
try {
// Acquire the lock for each entry to ensure any ongoing
// work associated with it is completed before we return
for ( PendingPutMap entry : pendingPuts.values() ) {
for ( Iterator<PendingPutMap> it = pendingPuts.values().iterator(); it.hasNext(); ) {
PendingPutMap entry = it.next();
if ( entry.acquireLock( 60, TimeUnit.SECONDS ) ) {
try {
entry.invalidate();
@ -361,30 +301,15 @@ public class PutFromLoadValidator {
finally {
entry.releaseLock();
}
it.remove();
}
else {
ok = false;
}
}
removalsLock.lock();
try {
recentRemovals.clear();
removalsQueue.clear();
ok = true;
}
finally {
removalsLock.unlock();
}
}
catch (Exception e) {
} catch (Exception e) {
ok = false;
}
finally {
earliestRemovalTimestamp = invalidationTimestamp;
}
return ok;
}
@ -395,8 +320,7 @@ public class PutFromLoadValidator {
* wherein it is expected that a database read plus cache put will occur. Calling this method allows the validator to
* treat the subsequent <code>acquirePutFromLoadLock</code> as if the database read occurred when this method was
* invoked. This allows the validator to compare the timestamp of this call against the timestamp of subsequent removal
* notifications. A put that occurs without this call preceding it is "naked"; i.e the validator must assume the put is
* not valid if any relevant removal has occurred within {@link #NAKED_PUT_INVALIDATION_PERIOD} milliseconds.
* notifications.
*
* @param key key that will be used for subsequent cache put
*/
@ -404,50 +328,83 @@ public class PutFromLoadValidator {
final PendingPut pendingPut = new PendingPut( getOwnerForPut() );
final PendingPutMap pendingForKey = new PendingPutMap( pendingPut );
for (; ; ) {
final PendingPutMap existing = pendingPuts.putIfAbsent( key, pendingForKey );
if ( existing != null ) {
if ( existing.acquireLock( 10, TimeUnit.SECONDS ) ) {
try {
existing.put( pendingPut );
final PendingPutMap doublecheck = pendingPuts.putIfAbsent( key, existing );
if ( doublecheck == null || doublecheck == existing ) {
break;
if ( !existing.hasInvalidator() ) {
existing.put(pendingPut);
}
// else we hit a race and need to loop to try again
}
finally {
} finally {
existing.releaseLock();
}
}
else {
// Can't get the lock; when we come back we'll be a "naked put"
break;
}
}
else {
// normal case
break;
}
}
}
// -------------------------------------------------------------- Protected
/**
* Only for use by unit tests; may be removed at any time
* Invalidates any {@link #registerPendingPut(Object) previously registered pending puts}
* and disables further registrations ensuring a subsequent call to {@link #acquirePutFromLoadLock(Object)}
* will return <code>false</code>. <p> This method will block until any concurrent thread that has
* {@link #acquirePutFromLoadLock(Object) acquired the putFromLoad lock} for the given key
* has released the lock. This allows the caller to be certain the putFromLoad will not execute after this method
* returns, possibly caching stale data. </p>
* After this transaction completes, {@link #endInvalidatingKey(Object)} needs to be called }
*
* @param key key identifying data whose pending puts should be invalidated
*
* @return <code>true</code> if the invalidation was successful; <code>false</code> if a problem occured (which the
* caller should treat as an exception condition)
*/
protected int getRemovalQueueLength() {
removalsLock.lock();
public boolean beginInvalidatingKey(Object key) {
PendingPutMap pending = new PendingPutMap(null);
PendingPutMap prev = pendingPuts.putIfAbsent(key, pending);
if (prev != null) {
pending = prev;
}
if (pending.acquireLock(60, TimeUnit.SECONDS)) {
try {
return removalsQueue.size();
pending.invalidate();
pending.addInvalidator(getOwnerForPut(), System.currentTimeMillis() + nakedPutInvalidationPeriod);
} finally {
pending.releaseLock();
}
finally {
removalsLock.unlock();
return true;
} else {
return false;
}
}
/**
* Called after the transaction completes, allowing caching of entries. It is possible that this method
* is called without previous invocation of {@link #beginInvalidatingKey(Object)}, then it should be noop.
*
* @param key
* @return
*/
public boolean endInvalidatingKey(Object key) {
PendingPutMap pending = pendingPuts.get(key);
if (pending == null) {
return true;
}
if (pending.acquireLock(60, TimeUnit.SECONDS)) {
try {
pending.removeInvalidator(getOwnerForPut());
// we can't remove the pending put yet because we wait for naked puts
// pendingPuts should be configured with maxIdle time so won't have memory leak
return true;
} finally {
pending.releaseLock();
}
} else {
return false;
}
}
// ---------------------------------------------------------------- Private
private Object getOwnerForPut() {
@ -470,10 +427,13 @@ public class PutFromLoadValidator {
* <p/>
* This class is NOT THREAD SAFE. All operations on it must be performed with the lock held.
*/
private static class PendingPutMap {
private static class PendingPutMap extends Lock {
private PendingPut singlePendingPut;
private Map<Object, PendingPut> fullMap;
private final Lock lock = new ReentrantLock();
private final java.util.concurrent.locks.Lock lock = new ReentrantLock();
private Object singleInvalidator;
private Set<Object> invalidators;
private long nakedPutsDeadline = Long.MIN_VALUE;
PendingPutMap(PendingPut singleItem) {
this.singlePendingPut = singleItem;
@ -546,6 +506,41 @@ public class PutFromLoadValidator {
fullMap = null;
}
}
public void addInvalidator(Object invalidator, long deadline) {
if (invalidators == null) {
if (singleInvalidator == null) {
singleInvalidator = invalidator;
} else {
invalidators = new HashSet<Object>();
invalidators.add(singleInvalidator);
invalidators.add(invalidator);
singleInvalidator = null;
}
} else {
invalidators.add(invalidator);
}
nakedPutsDeadline = Math.max(nakedPutsDeadline, deadline);
}
public boolean hasInvalidator() {
return singleInvalidator != null || (invalidators != null && !invalidators.isEmpty());
}
public void removeInvalidator(Object invalidator) {
if (invalidators == null) {
if (singleInvalidator != null && singleInvalidator.equals(invalidator)) {
singleInvalidator = null;
}
} else {
invalidators.remove(invalidator);
}
}
public boolean canRemove() {
return size() == 0 && !hasInvalidator() &&
(nakedPutsDeadline == Long.MIN_VALUE || nakedPutsDeadline < System.currentTimeMillis());
}
}
private static class PendingPut {
@ -556,15 +551,4 @@ public class PutFromLoadValidator {
this.owner = owner;
}
}
private static class RecentRemoval {
private final Object key;
private final Long timestamp;
private RecentRemoval(Object key, long nakedPutInvalidationPeriod) {
this.key = key;
timestamp = System.currentTimeMillis() + nakedPutInvalidationPeriod;
}
}
}

View File

@ -9,7 +9,6 @@ package org.hibernate.cache.infinispan.access;
import org.hibernate.cache.CacheException;
import org.hibernate.cache.infinispan.impl.BaseRegion;
import org.hibernate.cache.infinispan.util.Caches;
import org.infinispan.AdvancedCache;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;
@ -111,7 +110,8 @@ public class TransactionalAccessDelegate {
return false;
}
if ( !putValidator.acquirePutFromLoadLock( key ) ) {
PutFromLoadValidator.Lock lock = putValidator.acquirePutFromLoadLock(key);
if ( lock == null) {
if ( TRACE_ENABLED ) {
log.tracef( "Put from load lock not acquired for key %s", key );
}
@ -131,7 +131,7 @@ public class TransactionalAccessDelegate {
}
}
finally {
putValidator.releasePutFromLoadLock( key );
putValidator.releasePutFromLoadLock( key, lock);
}
return true;
@ -185,7 +185,7 @@ public class TransactionalAccessDelegate {
* @throws CacheException if removing the cached item fails
*/
public void remove(Object key) throws CacheException {
if ( !putValidator.invalidateKey( key ) ) {
if ( !putValidator.beginInvalidatingKey(key)) {
throw new CacheException(
"Failed to invalidate pending putFromLoad calls for key " + key + " from region " + region.getName()
);
@ -216,7 +216,7 @@ public class TransactionalAccessDelegate {
* @throws CacheException if evicting the item fails
*/
public void evict(Object key) throws CacheException {
if ( !putValidator.invalidateKey( key ) ) {
if ( !putValidator.beginInvalidatingKey(key)) {
throw new CacheException(
"Failed to invalidate pending putFromLoad calls for key " + key + " from region " + region.getName()
);
@ -240,4 +240,19 @@ public class TransactionalAccessDelegate {
Caches.broadcastEvictAll( cache );
}
/**
* Called when we have finished the attempted update/delete (which may or
* may not have been successful), after transaction completion. This method
* is used by "asynchronous" concurrency strategies.
*
* @param key The item key
* @throws org.hibernate.cache.CacheException Propogated from underlying {@link org.hibernate.cache.spi.Region}
*/
public void unlockItem(Object key) throws CacheException {
if ( !putValidator.endInvalidatingKey(key) ) {
// TODO: localization
log.warn("Failed to end invalidating pending putFromLoad calls for key " + key + " from region "
+ region.getName() + "; the key won't be cached in the future.");
}
}
}

View File

@ -75,6 +75,7 @@ class TransactionalAccess implements CollectionRegionAccessStrategy {
}
public void unlockItem(SessionImplementor session, Object key, SoftLock lock) throws CacheException {
delegate.unlockItem(key);
}
public void unlockRegion(SoftLock lock) throws CacheException {

View File

@ -84,6 +84,7 @@ class TransactionalAccess implements EntityRegionAccessStrategy {
}
public void unlockItem(SessionImplementor session, Object key, SoftLock lock) throws CacheException {
delegate.unlockItem(key);
}
public void unlockRegion(SoftLock lock) throws CacheException {

View File

@ -89,6 +89,7 @@ class TransactionalAccess implements NaturalIdRegionAccessStrategy {
@Override
public void unlockItem(SessionImplementor session, Object key, SoftLock lock) throws CacheException {
delegate.unlockItem(key);
}
@Override

View File

@ -20,8 +20,6 @@ import java.util.concurrent.atomic.AtomicReference;
import org.hibernate.cache.infinispan.access.PutFromLoadValidator;
import org.hibernate.test.cache.infinispan.functional.cluster.DualNodeJtaTransactionManagerImpl;
import org.hibernate.test.cache.infinispan.util.InfinispanTestingSetup;
import org.infinispan.AdvancedCache;
import org.infinispan.manager.EmbeddedCacheManager;
import org.infinispan.test.CacheManagerCallable;
import org.infinispan.test.fwk.TestCacheManagerFactory;
import org.infinispan.util.logging.Log;
@ -32,8 +30,9 @@ import org.junit.Rule;
import org.junit.Test;
import static org.infinispan.test.TestingUtil.withCacheManager;
import static org.infinispan.test.TestingUtil.withTx;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@ -87,28 +86,13 @@ public class PutFromLoadValidatorUnitTestCase {
TestCacheManagerFactory.createCacheManager(false)) {
@Override
public void call() {
try {
PutFromLoadValidator testee = new PutFromLoadValidator(cm.getCache().getAdvancedCache(), cm,
transactional ? tm : null,
PutFromLoadValidator.NAKED_PUT_INVALIDATION_PERIOD);
if (transactional) {
tm.begin();
}
boolean lockable = testee.acquirePutFromLoadLock(KEY1);
try {
assertTrue(lockable);
}
finally {
if (lockable) {
testee.releasePutFromLoadLock(KEY1);
}
}
} catch (Exception e) {
throw new RuntimeException(e);
}
transactional ? tm : null, PutFromLoadValidator.NAKED_PUT_INVALIDATION_PERIOD);
exec(transactional, new NakedPut(testee, true));
}
});
}
@Test
public void testRegisteredPut() throws Exception {
registeredPutTest(false);
@ -124,29 +108,12 @@ public class PutFromLoadValidatorUnitTestCase {
@Override
public void call() {
PutFromLoadValidator testee = new PutFromLoadValidator(cm.getCache().getAdvancedCache(), cm,
transactional ? tm : null,
PutFromLoadValidator.NAKED_PUT_INVALIDATION_PERIOD);
try {
if (transactional) {
tm.begin();
}
testee.registerPendingPut(KEY1);
boolean lockable = testee.acquirePutFromLoadLock(KEY1);
try {
assertTrue(lockable);
}
finally {
if (lockable) {
testee.releasePutFromLoadLock(KEY1);
}
}
} catch (Exception e) {
throw new RuntimeException(e);
}
transactional ? tm : null, PutFromLoadValidator.NAKED_PUT_INVALIDATION_PERIOD);
exec(transactional, new RegularPut(testee));
}
});
}
@Test
public void testNakedPutAfterKeyRemoval() throws Exception {
nakedPutAfterRemovalTest(false, false);
@ -171,34 +138,15 @@ public class PutFromLoadValidatorUnitTestCase {
@Override
public void call() {
PutFromLoadValidator testee = new PutFromLoadValidator(cm.getCache().getAdvancedCache(), cm,
transactional ? tm : null,
PutFromLoadValidator.NAKED_PUT_INVALIDATION_PERIOD);
if (removeRegion) {
testee.invalidateRegion();
} else {
testee.invalidateKey(KEY1);
}
try {
if (transactional) {
tm.begin();
}
boolean lockable = testee.acquirePutFromLoadLock(KEY1);
try {
assertFalse(lockable);
}
finally {
if (lockable) {
testee.releasePutFromLoadLock(KEY1);
}
}
} catch (Exception e) {
throw new RuntimeException(e);
}
transactional ? tm : null, PutFromLoadValidator.NAKED_PUT_INVALIDATION_PERIOD);
Invalidation invalidation = new Invalidation(testee, removeRegion);
NakedPut nakedPut = new NakedPut(testee, false);
exec(transactional, invalidation, nakedPut);
}
});
}
@Test
public void testRegisteredPutAfterKeyRemoval() throws Exception {
registeredPutAfterRemovalTest(false, false);
@ -223,31 +171,10 @@ public class PutFromLoadValidatorUnitTestCase {
@Override
public void call() {
PutFromLoadValidator testee = new PutFromLoadValidator(cm.getCache().getAdvancedCache(), cm,
transactional ? tm : null,
PutFromLoadValidator.NAKED_PUT_INVALIDATION_PERIOD);
if (removeRegion) {
testee.invalidateRegion();
} else {
testee.invalidateKey(KEY1);
}
try {
if (transactional) {
tm.begin();
}
testee.registerPendingPut(KEY1);
boolean lockable = testee.acquirePutFromLoadLock(KEY1);
try {
assertTrue(lockable);
}
finally {
if (lockable) {
testee.releasePutFromLoadLock(KEY1);
}
}
} catch (Exception e) {
throw new RuntimeException(e);
}
transactional ? tm : null, PutFromLoadValidator.NAKED_PUT_INVALIDATION_PERIOD);
Invalidation invalidation = new Invalidation(testee, removeRegion);
RegularPut regularPut = new RegularPut(testee);
exec(transactional, invalidation, regularPut);
}
});
@ -277,8 +204,7 @@ public class PutFromLoadValidatorUnitTestCase {
@Override
public void call() {
PutFromLoadValidator testee = new PutFromLoadValidator(cm.getCache().getAdvancedCache(), cm,
transactional ? tm : null,
PutFromLoadValidator.NAKED_PUT_INVALIDATION_PERIOD);
transactional ? tm : null, PutFromLoadValidator.NAKED_PUT_INVALIDATION_PERIOD);
try {
if (transactional) {
tm.begin();
@ -287,17 +213,18 @@ public class PutFromLoadValidatorUnitTestCase {
if (removeRegion) {
testee.invalidateRegion();
} else {
testee.invalidateKey(KEY1);
testee.beginInvalidatingKey(KEY1);
}
boolean lockable = testee.acquirePutFromLoadLock(KEY1);
PutFromLoadValidator.Lock lock = testee.acquirePutFromLoadLock(KEY1);
try {
assertFalse(lockable);
assertNull(lock);
}
finally {
if (lockable) {
testee.releasePutFromLoadLock(KEY1);
if (lock != null) {
testee.releasePutFromLoadLock(KEY1, lock);
}
testee.endInvalidatingKey(KEY1);
}
} catch (Exception e) {
throw new RuntimeException(e);
@ -305,6 +232,7 @@ public class PutFromLoadValidatorUnitTestCase {
}
});
}
@Test
public void testDelayedNakedPutAfterKeyRemoval() throws Exception {
delayedNakedPutAfterRemovalTest(false, false);
@ -329,12 +257,13 @@ public class PutFromLoadValidatorUnitTestCase {
TestCacheManagerFactory.createCacheManager(false)) {
@Override
public void call() {
PutFromLoadValidator testee = new TestValidator(cm.getCache().getAdvancedCache(), cm,
PutFromLoadValidator testee = new PutFromLoadValidator(cm.getCache().getAdvancedCache(), cm,
transactional ? tm : null, 100);
if (removeRegion) {
testee.invalidateRegion();
} else {
testee.invalidateKey(KEY1);
testee.beginInvalidatingKey(KEY1);
testee.endInvalidatingKey(KEY1);
}
try {
if (transactional) {
@ -342,12 +271,12 @@ public class PutFromLoadValidatorUnitTestCase {
}
Thread.sleep(110);
boolean lockable = testee.acquirePutFromLoadLock(KEY1);
PutFromLoadValidator.Lock lock = testee.acquirePutFromLoadLock(KEY1);
try {
assertTrue(lockable);
assertNotNull(lock);
} finally {
if (lockable) {
testee.releasePutFromLoadLock(KEY1);
if (lock != null) {
testee.releasePutFromLoadLock(KEY1, null);
}
}
} catch (Exception e) {
@ -389,12 +318,13 @@ public class PutFromLoadValidatorUnitTestCase {
testee.registerPendingPut(KEY1);
registeredLatch.countDown();
registeredLatch.await(5, TimeUnit.SECONDS);
if (testee.acquirePutFromLoadLock(KEY1)) {
PutFromLoadValidator.Lock lock = testee.acquirePutFromLoadLock(KEY1);
if (lock != null) {
try {
log.trace("Put from load lock acquired for key = " + KEY1);
success.incrementAndGet();
} finally {
testee.releasePutFromLoadLock(KEY1);
testee.releasePutFromLoadLock(KEY1, lock);
}
} else {
log.trace("Unable to acquired putFromLoad lock for key = " + KEY1);
@ -453,7 +383,8 @@ public class PutFromLoadValidatorUnitTestCase {
Callable<Boolean> pferCallable = new Callable<Boolean>() {
public Boolean call() throws Exception {
testee.registerPendingPut(KEY1);
if (testee.acquirePutFromLoadLock(KEY1)) {
PutFromLoadValidator.Lock lock = testee.acquirePutFromLoadLock(KEY1);
if (lock != null) {
try {
removeLatch.countDown();
pferLatch.await();
@ -461,7 +392,7 @@ public class PutFromLoadValidatorUnitTestCase {
return Boolean.TRUE;
}
finally {
testee.releasePutFromLoadLock(KEY1);
testee.releasePutFromLoadLock(KEY1, lock);
}
}
return Boolean.FALSE;
@ -472,7 +403,7 @@ public class PutFromLoadValidatorUnitTestCase {
public Void call() throws Exception {
removeLatch.await();
if (keyOnly) {
testee.invalidateKey(KEY1);
testee.beginInvalidatingKey(KEY1);
} else {
testee.invalidateRegion();
}
@ -505,18 +436,104 @@ public class PutFromLoadValidatorUnitTestCase {
});
}
private static class TestValidator extends PutFromLoadValidator {
protected void exec(boolean transactional, Callable<?>... callables) {
try {
if (transactional) {
for (Callable<?> c : callables) {
withTx(tm, c);
}
} else {
for (Callable<?> c : callables) {
c.call();
}
}
} catch (RuntimeException e) {
throw e;
} catch (Exception e) {
throw new RuntimeException(e);
}
}
protected TestValidator(AdvancedCache cache, EmbeddedCacheManager cm,
TransactionManager transactionManager,
long nakedPutInvalidationPeriod) {
super(cache, cm, transactionManager, nakedPutInvalidationPeriod);
private class Invalidation implements Callable<Void> {
private PutFromLoadValidator putFromLoadValidator;
private boolean removeRegion;
public Invalidation(PutFromLoadValidator putFromLoadValidator, boolean removeRegion) {
this.putFromLoadValidator = putFromLoadValidator;
this.removeRegion = removeRegion;
}
@Override
public int getRemovalQueueLength() {
return super.getRemovalQueueLength();
public Void call() throws Exception {
if (removeRegion) {
boolean success = putFromLoadValidator.invalidateRegion();
assertTrue(success);
} else {
boolean success = putFromLoadValidator.beginInvalidatingKey(KEY1);
assertTrue(success);
success = putFromLoadValidator.endInvalidatingKey(KEY1);
assertTrue(success);
}
return null;
}
}
private class RegularPut implements Callable<Void> {
private PutFromLoadValidator putFromLoadValidator;
public RegularPut(PutFromLoadValidator putFromLoadValidator) {
this.putFromLoadValidator = putFromLoadValidator;
}
@Override
public Void call() throws Exception {
try {
putFromLoadValidator.registerPendingPut(KEY1);
PutFromLoadValidator.Lock lock = putFromLoadValidator.acquirePutFromLoadLock(KEY1);
try {
assertNotNull(lock);
} finally {
if (lock != null) {
putFromLoadValidator.releasePutFromLoadLock(KEY1, lock);
}
}
} catch (Exception e) {
throw new RuntimeException(e);
}
return null;
}
}
private class NakedPut implements Callable<Void> {
private final PutFromLoadValidator testee;
private final boolean expectSuccess;
public NakedPut(PutFromLoadValidator testee, boolean expectSuccess) {
this.testee = testee;
this.expectSuccess = expectSuccess;
}
@Override
public Void call() throws Exception {
try {
PutFromLoadValidator.Lock lock = testee.acquirePutFromLoadLock(KEY1);
try {
if (expectSuccess) {
assertNotNull(lock);
} else {
assertNull(lock);
}
}
finally {
if (lock != null) {
testee.releasePutFromLoadLock(KEY1, lock);
}
}
} catch (Exception e) {
throw new RuntimeException(e);
}
return null;
}
}
}

View File

@ -161,8 +161,8 @@ public abstract class AbstractCollectionRegionAccessStrategyTestCase extends Abs
PutFromLoadValidator validator = new PutFromLoadValidator(remoteCollectionRegion.getCache(), cm,
remoteTm, 20000) {
@Override
public boolean acquirePutFromLoadLock(Object key) {
boolean acquired = super.acquirePutFromLoadLock( key );
public Lock acquirePutFromLoadLock(Object key) {
Lock lock = super.acquirePutFromLoadLock( key );
try {
removeLatch.countDown();
pferLatch.await( 2, TimeUnit.SECONDS );
@ -175,7 +175,7 @@ public abstract class AbstractCollectionRegionAccessStrategyTestCase extends Abs
log.error( "Error", e );
throw new RuntimeException( "Error", e );
}
return acquired;
return lock;
}
};

View File

@ -6,18 +6,33 @@
*/
package org.hibernate.test.cache.infinispan.functional;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.hibernate.PessimisticLockException;
import org.hibernate.Session;
import org.hibernate.cache.infinispan.InfinispanRegionFactory;
import org.hibernate.cache.infinispan.entity.EntityRegionImpl;
import org.hibernate.cache.spi.Region;
import org.hibernate.stat.SecondLevelCacheStatistics;
import org.hibernate.stat.Statistics;
import org.hibernate.testing.TestForIssue;
import org.infinispan.AdvancedCache;
import org.infinispan.commands.read.GetKeyValueCommand;
import org.infinispan.context.InvocationContext;
import org.infinispan.interceptors.base.BaseCustomInterceptor;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;
import org.junit.Test;
import java.util.Map;
import java.util.concurrent.Callable;
import static org.infinispan.test.TestingUtil.withTx;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
/**
* Parent tests for both transactional and
@ -122,4 +137,152 @@ public abstract class AbstractFunctionalTestCase extends SingleNodeTestCase {
});
}
@Test
@TestForIssue(jiraKey = "HHH-9868")
public void testConcurrentRemoveAndPutFromLoad() throws Exception {
final Item item = new Item( "chris", "Chris's Item" );
withTx(tm, () -> {
Session s = openSession();
s.getTransaction().begin();
s.persist(item);
s.getTransaction().commit();
s.close();
return null;
});
Region region = sessionFactory().getSecondLevelCacheRegion(Item.class.getName());
Phaser deletePhaser = new Phaser(2);
Phaser getPhaser = new Phaser(2);
HookInterceptor hook = new HookInterceptor();
AdvancedCache entityCache = ((EntityRegionImpl) region).getCache();
AdvancedCache pendingPutsCache = entityCache.getCacheManager().getCache(
entityCache.getName() + "-" + InfinispanRegionFactory.PENDING_PUTS_CACHE_NAME).getAdvancedCache();
pendingPutsCache.addInterceptor(hook, 0);
Thread deleteThread = new Thread(() -> {
try {
withTx(tm, () -> {
Session s = openSession();
log.trace("Session opened");
s.getTransaction().begin();
log.trace("TX started");
Item loadedItem = s.get(Item.class, item.getId());
assertNotNull(loadedItem);
arriveAndAwait(deletePhaser);
arriveAndAwait(deletePhaser);
log.trace("Item loaded");
s.delete(loadedItem);
log.trace("Item deleted");
s.getTransaction().commit();
log.trace("TX committed");
// start get-thread here
arriveAndAwait(deletePhaser);
arriveAndAwait(deletePhaser);
s.close();
log.trace("Session closed");
return null;
});
} catch (Exception e) {
throw new RuntimeException(e);
}
}, "delete-thread");
Thread getThread = new Thread(() -> {
try {
withTx(tm, () -> {
Session s = openSession();
log.trace("Session opened");
s.getTransaction().begin();
log.trace("TX started");
// DB load should happen before the record is deleted,
// putFromLoad should happen after deleteThread ends
Item loadedItem = s.get(Item.class, item.getId());
assertNotNull(loadedItem);
s.close();
log.trace("Session closed");
return null;
});
} catch (PessimisticLockException e) {
// If we end up here, database locks guard us against situation tested
// in this case and HHH-9868 cannot happen.
// (delete-thread has ITEMS table write-locked and we try to acquire read-lock)
try {
arriveAndAwait(getPhaser);
arriveAndAwait(getPhaser);
} catch (Exception e1) {
throw new RuntimeException(e1);
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}, "get-thread");
deleteThread.start();
// deleteThread loads the entity
arriveAndAwait(deletePhaser);
withTx(tm, () -> {
sessionFactory().getCache().evictEntity(Item.class, item.getId());
assertFalse(sessionFactory().getCache().containsEntity(Item.class, item.getId()));
return null;
});
arriveAndAwait(deletePhaser);
// delete thread invalidates PFER
arriveAndAwait(deletePhaser);
// get thread gets the entity from DB
hook.block(getPhaser, getThread);
getThread.start();
arriveAndAwait(getPhaser);
arriveAndAwait(deletePhaser);
// delete thread finishes the remove from DB and cache
deleteThread.join();
hook.unblock();
arriveAndAwait(getPhaser);
// get thread puts the entry into cache
getThread.join();
withTx(tm, () -> {
Session s = openSession();
s.getTransaction().begin();
Item loadedItem = s.get(Item.class, item.getId());
assertNull(loadedItem);
s.getTransaction().commit();
s.close();
return null;
});
}
protected static void arriveAndAwait(Phaser phaser) throws TimeoutException, InterruptedException {
phaser.awaitAdvanceInterruptibly(phaser.arrive(), 10, TimeUnit.SECONDS);
}
private static class HookInterceptor extends BaseCustomInterceptor {
Phaser phaser;
Thread thread;
public synchronized void block(Phaser phaser, Thread thread) {
this.phaser = phaser;
this.thread = thread;
}
public synchronized void unblock() {
phaser = null;
thread = null;
}
@Override
public Object visitGetKeyValueCommand(InvocationContext ctx, GetKeyValueCommand command) throws Throwable {
Phaser phaser;
Thread thread;
synchronized (this) {
phaser = this.phaser;
thread = this.thread;
}
if (phaser != null && Thread.currentThread() == thread) {
arriveAndAwait(phaser);
arriveAndAwait(phaser);
}
return super.visitGetKeyValueCommand(ctx, command);
}
}
}

View File

@ -11,7 +11,6 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import org.hibernate.Cache;
import org.hibernate.Criteria;
@ -19,12 +18,10 @@ import org.hibernate.Hibernate;
import org.hibernate.NaturalIdLoadAccess;
import org.hibernate.Session;
import org.hibernate.Transaction;
import org.hibernate.cache.infinispan.access.PutFromLoadValidator;
import org.hibernate.cache.spi.entry.CacheEntry;
import org.hibernate.criterion.Restrictions;
import org.hibernate.stat.SecondLevelCacheStatistics;
import org.hibernate.stat.Statistics;
import org.hibernate.testing.TestForIssue;
import org.junit.After;
import org.junit.Test;
@ -954,7 +951,6 @@ public class BasicTransactionalTestCase extends AbstractFunctionalTestCase {
// TODO: Clear caches manually via cache manager (it's faster!!)
this.cleanupCache();
Thread.sleep(PutFromLoadValidator.NAKED_PUT_INVALIDATION_PERIOD + TimeUnit.SECONDS.toMillis(1));
stats.setStatisticsEnabled( true );
stats.clear();

View File

@ -6,13 +6,6 @@
*/
package org.hibernate.test.cache.infinispan.functional.cluster;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import javax.transaction.HeuristicMixedException;
import javax.transaction.HeuristicRollbackException;
import javax.transaction.RollbackException;
@ -23,6 +16,13 @@ import javax.transaction.Transaction;
import javax.transaction.xa.XAException;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;
@ -62,10 +62,12 @@ public class DualNodeJtaTransactionImpl implements Transaction {
} else {
status = Status.STATUS_PREPARING;
if (synchronizations != null) {
for (int i = 0; i < synchronizations.size(); i++) {
Synchronization s = (Synchronization) synchronizations.get(i);
s.beforeCompletion();
}
}
if (!runXaResourcePrepare()) {
status = Status.STATUS_ROLLING_BACK;
@ -89,10 +91,12 @@ public class DualNodeJtaTransactionImpl implements Transaction {
status = Status.STATUS_COMMITTED;
if (synchronizations != null) {
for (int i = 0; i < synchronizations.size(); i++) {
Synchronization s = (Synchronization) synchronizations.get(i);
s.afterCompletion(status);
}
}
// status = Status.STATUS_NO_TRANSACTION;
jtaTransactionManager.endCurrent(this);

View File

@ -23,41 +23,44 @@ import javax.transaction.TransactionManager;
*/
public class XaTransactionManagerImpl implements TransactionManager {
private static final XaTransactionManagerImpl INSTANCE = new XaTransactionManagerImpl();
private XaTransactionImpl currentTransaction;
private final ThreadLocal<XaTransactionImpl> currentTransaction = new ThreadLocal<>();
public static XaTransactionManagerImpl getInstance() {
return INSTANCE;
}
public int getStatus() throws SystemException {
XaTransactionImpl currentTransaction = this.currentTransaction.get();
return currentTransaction == null ? Status.STATUS_NO_TRANSACTION : currentTransaction.getStatus();
}
public Transaction getTransaction() throws SystemException {
return currentTransaction;
return currentTransaction.get();
}
public XaTransactionImpl getCurrentTransaction() {
return currentTransaction;
return currentTransaction.get();
}
public void begin() throws NotSupportedException, SystemException {
currentTransaction = new XaTransactionImpl(this);
if (currentTransaction.get() != null) throw new IllegalStateException("Transaction already started.");
currentTransaction.set(new XaTransactionImpl(this));
}
public Transaction suspend() throws SystemException {
Transaction suspended = currentTransaction;
currentTransaction = null;
Transaction suspended = currentTransaction.get();
currentTransaction.remove();
return suspended;
}
public void resume(Transaction transaction) throws InvalidTransactionException, IllegalStateException,
SystemException {
currentTransaction = (XaTransactionImpl) transaction;
currentTransaction.set((XaTransactionImpl) transaction);
}
public void commit() throws RollbackException, HeuristicMixedException, HeuristicRollbackException,
SecurityException, IllegalStateException, SystemException {
XaTransactionImpl currentTransaction = this.currentTransaction.get();
if (currentTransaction == null) {
throw new IllegalStateException("no current transaction to commit");
}
@ -65,6 +68,7 @@ public class XaTransactionManagerImpl implements TransactionManager {
}
public void rollback() throws IllegalStateException, SecurityException, SystemException {
XaTransactionImpl currentTransaction = this.currentTransaction.get();
if (currentTransaction == null) {
throw new IllegalStateException("no current transaction");
}
@ -72,6 +76,7 @@ public class XaTransactionManagerImpl implements TransactionManager {
}
public void setRollbackOnly() throws IllegalStateException, SystemException {
XaTransactionImpl currentTransaction = this.currentTransaction.get();
if (currentTransaction == null) {
throw new IllegalStateException("no current transaction");
}
@ -82,8 +87,6 @@ public class XaTransactionManagerImpl implements TransactionManager {
}
void endCurrent(Transaction transaction) {
if (transaction == currentTransaction) {
currentTransaction = null;
}
currentTransaction.remove();
}
}