[HHH-3817] Better handle race between putFromLoad and removal

git-svn-id: https://svn.jboss.org/repos/hibernate/core/trunk@18855 1b8cb986-b30d-0410-93ca-fae66ebed9b2
This commit is contained in:
Brian Stansberry 2010-02-23 02:11:31 +00:00
parent f84f262013
commit 2b72b05522
4 changed files with 563 additions and 133 deletions

View File

@ -63,7 +63,9 @@ public class OptimisticTransactionalAccessDelegate extends TransactionalAccessDe
*/
@Override
public void evict(Object key) throws CacheException {
putValidator.keyRemoved(key);
if (!putValidator.invalidateKey(key)) {
throw new CacheException("Failed to invalidate pending putFromLoad calls for key " + key + " from region " + region.getName());
}
region.ensureRegionRootExists();
Option opt = NonLockingDataVersion.getInvocationOption();
@ -75,7 +77,9 @@ public class OptimisticTransactionalAccessDelegate extends TransactionalAccessDe
@Override
public void evictAll() throws CacheException
{
putValidator.regionRemoved();
if (!putValidator.invalidateRegion()) {
throw new CacheException("Failed to invalidate pending putFromLoad calls for region " + region.getName());
}
Transaction tx = region.suspend();
try {
@ -116,16 +120,21 @@ public class OptimisticTransactionalAccessDelegate extends TransactionalAccessDe
if (!region.checkValid())
return false;
if (!putValidator.isPutValid(key))
if (!putValidator.acquirePutFromLoadLock(key))
return false;
region.ensureRegionRootExists();
// We ignore minimalPutOverride. JBossCache putForExternalRead is
// already about as minimal as we can get; it will promptly return
// if it discovers that the node we want to write to already exists
Option opt = getDataVersionOption(version, version);
return CacheHelper.putForExternalRead(cache, regionFqn, key, value, opt);
try {
region.ensureRegionRootExists();
// We ignore minimalPutOverride. JBossCache putForExternalRead is
// already about as minimal as we can get; it will promptly return
// if it discovers that the node we want to write to already exists
Option opt = getDataVersionOption(version, version);
return CacheHelper.putForExternalRead(cache, regionFqn, key, value, opt);
}
finally {
putValidator.releasePutFromLoadLock(key);
}
}
@Override
@ -134,19 +143,26 @@ public class OptimisticTransactionalAccessDelegate extends TransactionalAccessDe
if (!region.checkValid())
return false;
if (!putValidator.isPutValid(key))
if (!putValidator.acquirePutFromLoadLock(key))
return false;
region.ensureRegionRootExists();
Option opt = getDataVersionOption(version, version);
return CacheHelper.putForExternalRead(cache, regionFqn, key, value, opt);
try {
region.ensureRegionRootExists();
Option opt = getDataVersionOption(version, version);
return CacheHelper.putForExternalRead(cache, regionFqn, key, value, opt);
}
finally {
putValidator.releasePutFromLoadLock(key);
}
}
@Override
public void remove(Object key) throws CacheException {
putValidator.keyRemoved(key);
if (!putValidator.invalidateKey(key)) {
throw new CacheException("Failed to invalidate pending putFromLoad calls for key " + key + " from region " + region.getName());
}
// We remove whether or not the region is valid. Other nodes
// may have already restored the region so they need to
@ -160,7 +176,9 @@ public class OptimisticTransactionalAccessDelegate extends TransactionalAccessDe
@Override
public void removeAll() throws CacheException {
putValidator.regionRemoved();
if (!putValidator.invalidateRegion()) {
throw new CacheException("Failed to invalidate pending putFromLoad calls for region " + region.getName());
}
Option opt = NonLockingDataVersion.getInvocationOption();
CacheHelper.removeAll(cache, regionFqn, opt);
}

View File

@ -30,6 +30,7 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@ -46,6 +47,36 @@ import org.hibernate.cache.CacheException;
* the potential to store stale data, since the data may have been removed from the
* database and the cache between the time when the data was read from the database
* and the actual call to <code>putFromLoad</code>.
* <p>
* The expected usage of this class by a thread that read the cache and did
* not find data is:
*
* <ol>
* <li> Call {@link #registerPendingPut(Object)}</li>
* <li> Read the database</li>
* <li> Call {@link #acquirePutFromLoadLock(Object)}
* <li> if above returns <code>false</code>, the thread should not cache the data;
* only if above returns <code>true</code>, put data in the cache and...</li>
* <li> then call {@link #releasePutFromLoadLock(Object)}</li>
* </ol>
* </p>
*
* <p>
* The expected usage by a thread that is taking an action such that any pending
* <code>putFromLoad</code> may have stale data and should not cache it is to either
* call
*
* <ul>
* <li> {@link #invalidateKey(Object)} (for a single key invalidation)</li>
* <li>or {@link #invalidateRegion()} (for a general invalidation all pending puts)</li>
* </ul>
* </p>
*
* <p>
* This class also supports the concept of "naked puts", which are calls to
* {@link #acquirePutFromLoadLock(Object)} without a preceding {@link #registerPendingPut(Object)}
* call.
* </p>
*
* @author Brian Stansberry
*
@ -54,21 +85,21 @@ import org.hibernate.cache.CacheException;
public class PutFromLoadValidator {
/**
* Period in ms after a removal during which a call to
* {@link #isPutValid(Object)} that hasn't been
* {@link #acquirePutFromLoadLock(Object)} that hasn't been
* {@link #registerPendingPut(Object) pre-registered} (aka a "naked put")
* will return false.
*/
public static final long NAKED_PUT_INVALIDATION_PERIOD = 10 * 1000;
public static final long NAKED_PUT_INVALIDATION_PERIOD = 20 * 1000;
/** Period after which a pending put is placed in the over-age queue */
/** Period (in ms) after which a pending put is placed in the over-age queue */
private static final long PENDING_PUT_OVERAGE_PERIOD = 5 * 1000;
/** Period before which we stop trying to clean out pending puts */
/** Period (in ms) before which we stop trying to clean out pending puts */
private static final long PENDING_PUT_RECENT_PERIOD = 2 * 1000;
/**
* Period after which a pending put is never expected to come in and should
* be cleaned
* Period (in ms) after which a pending put is never expected to come in
* and should be cleaned
*/
private static final long MAX_PENDING_PUT_DELAY = 2 * 60 * 1000;
@ -128,7 +159,7 @@ public class PutFromLoadValidator {
* Creates a new PutFromLoadValidator.
*
* @param transactionManager
* transaction manager to use to associated changes with a
* transaction manager to use to associate changes with a
* transaction; may be <code>null</code>
*/
public PutFromLoadValidator(TransactionManager transactionManager) {
@ -153,41 +184,147 @@ public class PutFromLoadValidator {
// ----------------------------------------------------------------- Public
public boolean isPutValid(Object key) {
/**
* Acquire a lock giving the calling thread the right to put data in the
* cache for the given key.
* <p>
* <strong>NOTE:</strong> A call to this method that returns <code>true</code>
* should always be matched with a call to {@link #releasePutFromLoadLock(Object)}.
* </p>
*
* @param key the key
*
* @return <code>true</code> if the lock is acquired and the cache put
* can proceed; <code>false</code> if the data should not be cached
*/
public boolean acquirePutFromLoadLock(Object key) {
boolean valid = false;
boolean locked = false;
long now = System.currentTimeMillis();
PendingPutMap pending = pendingPuts.get(key);
if (pending != null) {
synchronized (pending) {
PendingPut toCancel = pending.remove(getOwnerForPut());
valid = toCancel != null;
if (valid) {
toCancel.completed = true;
if (pending.size() == 0) {
pendingPuts.remove(key);
// Important: Do cleanup before we acquire any locks so we
// don't deadlock with invalidateRegion
cleanOutdatedPendingPuts(now, true);
try {
PendingPutMap pending = pendingPuts.get(key);
if (pending != null) {
locked = pending.acquireLock(100, TimeUnit.MILLISECONDS);
if (locked) {
try {
PendingPut toCancel = pending.remove(getOwnerForPut());
if (toCancel != null) {
valid = !toCancel.completed;
toCancel.completed = true;
}
}
finally {
if (!valid) {
pending.releaseLock();
locked = false;
}
}
}
}
else {
// Key wasn't in pendingPuts, so either this is a "naked put"
// or regionRemoved has been called. Check if we can proceed
if (now > invalidationTimestamp) {
Long removedTime = recentRemovals.get(key);
if (removedTime == null || now > removedTime.longValue()) {
// It's legal to proceed. But we have to record this key
// in pendingPuts so releasePutFromLoadLock can find it.
// To do this we basically simulate a normal "register
// then acquire lock" pattern
registerPendingPut(key);
locked = acquirePutFromLoadLock(key);
valid = locked;
}
}
}
}
if (!valid) {
if (now > invalidationTimestamp) {
Long removedTime = recentRemovals.get(key);
if (removedTime == null || now > removedTime.longValue()) {
valid = true;
catch (Throwable t) {
valid = false;
if (locked) {
PendingPutMap toRelease = pendingPuts.get(key);
if (toRelease != null) {
toRelease.releaseLock();
}
}
if (t instanceof RuntimeException) {
throw (RuntimeException) t;
}
else if (t instanceof Error) {
throw (Error) t;
}
else {
throw new RuntimeException(t);
}
}
cleanOutdatedPendingPuts(now, true);
return valid;
}
/**
* Releases the lock previously obtained by a call to
* {@link #acquirePutFromLoadLock(Object)} that returned <code>true</code>.
*
* @param key the key
*/
public void releasePutFromLoadLock(Object key) {
PendingPutMap pending = pendingPuts.get(key);
if (pending != null) {
if (pending.size() == 0) {
pendingPuts.remove(key);
}
pending.releaseLock();
}
}
public void keyRemoved(Object key) {
/**
* Invalidates any {@link #registerPendingPut(Object) previously registered pending puts}
* ensuring a subsequent call to {@link #acquirePutFromLoadLock(Object)} will
* return <code>false</code>.
* <p>
* This method will block until any concurrent thread that has
* {@link #acquirePutFromLoadLock(Object) acquired the putFromLoad lock} for
* the given key has released the lock. This allows the caller to be certain
* the putFromLoad will not execute after this method returns, possibly
* caching stale data.
* </p>
*
* @param key key identifying data whose pending puts should be invalidated
*
* @return <code>true</code> if the invalidation was successful; <code>false</code>
* if a problem occured (which the caller should treat as an
* exception condition)
*/
public boolean invalidateKey(Object key) {
boolean success = true;
// Invalidate any pending puts
pendingPuts.remove(key);
PendingPutMap pending = pendingPuts.get(key);
if (pending != null) {
// This lock should be available very quickly, but we'll be
// very patient waiting for it as callers should treat not
// acquiring it as an exception condition
if (pending.acquireLock(60, TimeUnit.SECONDS)) {
try {
pending.invalidate();
}
finally {
pending.releaseLock();
}
}
else {
success = false;
}
}
// Record when this occurred to invalidate later naked puts
RecentRemoval removal = new RecentRemoval(key,
@ -224,55 +361,96 @@ public class PutFromLoadValidator {
}
}
}
return success;
}
public void regionRemoved() {
/**
* Invalidates all {@link #registerPendingPut(Object) previously registered pending puts}
* ensuring a subsequent call to {@link #acquirePutFromLoadLock(Object)} will
* return <code>false</code>.
* <p>
* This method will block until any concurrent thread that has
* {@link #acquirePutFromLoadLock(Object) acquired the putFromLoad lock} for
* the any key has released the lock. This allows the caller to be certain
* the putFromLoad will not execute after this method returns, possibly
* caching stale data.
* </p>
*
* @return <code>true</code> if the invalidation was successful; <code>false</code>
* if a problem occured (which the caller should treat as an
* exception condition)
*/
public boolean invalidateRegion() {
boolean ok = false;
invalidationTimestamp = System.currentTimeMillis()
+ this.nakedPutInvalidationPeriod;
pendingLock.lock();
try {
// Acquire the lock for each entry to ensure any ongoing
// work associated with it is completed before we return
for (PendingPutMap entry : pendingPuts.values()) {
if (entry.acquireLock(60, TimeUnit.SECONDS)) {
try {
entry.invalidate();
}
finally {
entry.releaseLock();
}
}
else {
ok = false;
}
}
removalsLock.lock();
try {
pendingPuts.clear();
pendingQueue.clear();
overagePendingQueue.clear();
recentRemovals.clear();
removalsQueue.clear();
earliestRemovalTimestamp = invalidationTimestamp;
removalsQueue.clear();
ok = true;
} finally {
removalsLock.unlock();
}
} finally {
pendingLock.unlock();
}
catch (Exception e) {
ok = false;
}
finally {
earliestRemovalTimestamp = invalidationTimestamp;
}
return ok;
}
/**
* Notifies this validator that it is expected that a database read followed
* by a subsequent {@link #isPutValid(Object)} call will occur. The intent
* by a subsequent {@link #acquirePutFromLoadLock(Object)} call will occur. The intent
* is this method would be called following a cache miss wherein it is
* expected that a database read plus cache put will occur. Calling this
* method allows the validator to treat the subsequent
* <code>isPutValid</code> as if the database read occurred when this method
* <code>acquirePutFromLoadLock</code> as if the database read occurred when this method
* was invoked. This allows the validator to compare the timestamp of this
* call against the timestamp of subsequent removal notifications. A put
* that occurs without this call preceding it is "naked"; i.e the validator
* must assume the put is not valid if any relevant removal has occurred
* within {@link #NAKED_PUT_INVALIDATION_PERIOD} milliseconds.
*
* @param key
* key that will be used for subsequent put
* @param key key that will be used for subsequent cache put
*/
public void registerPendingPut(Object key) {
PendingPut pendingPut = new PendingPut(key, getOwnerForPut());
PendingPutMap pendingForKey = new PendingPutMap();
synchronized (pendingForKey) {
for (;;) {
PendingPutMap existing = pendingPuts.putIfAbsent(key,
pendingForKey);
if (existing != null && existing != pendingForKey) {
synchronized (existing) {
PendingPutMap pendingForKey = new PendingPutMap(pendingPut);
for (;;) {
PendingPutMap existing = pendingPuts.putIfAbsent(key,
pendingForKey);
if (existing != null) {
if (existing.acquireLock(10, TimeUnit.SECONDS)) {
try {
existing.put(pendingPut);
PendingPutMap doublecheck = pendingPuts.putIfAbsent(
key, existing);
@ -281,10 +459,17 @@ public class PutFromLoadValidator {
}
// else we hit a race and need to loop to try again
}
} else {
pendingForKey.put(pendingPut);
finally {
existing.releaseLock();
}
}
else {
// Can't get the lock; when we come back we'll be a "naked put"
break;
}
} else {
// normal case
break;
}
}
@ -343,7 +528,9 @@ public class PutFromLoadValidator {
pendingLock.lock();
try {
pendingQueue.add(new WeakReference<PendingPut>(pendingPut));
cleanOutdatedPendingPuts(pendingPut.timestamp, false);
if (pendingQueue.size() > 1) {
cleanOutdatedPendingPuts(pendingPut.timestamp, false);
}
} finally {
pendingLock.unlock();
}
@ -356,9 +543,7 @@ public class PutFromLoadValidator {
pendingLock.lock();
}
try {
// Clean items out of the basic queue
long overaged = now - this.pendingPutOveragePeriod;
long recent = now - this.pendingPutRecentPeriod;
@ -411,31 +596,62 @@ public class PutFromLoadValidator {
if (toClean != null) {
PendingPutMap map = pendingPuts.get(toClean.key);
if (map != null) {
synchronized (map) {
PendingPut cleaned = map.remove(toClean.owner);
if (toClean.equals(cleaned) == false) {
// Oops. Restore it.
map.put(cleaned);
} else if (map.size() == 0) {
pendingPuts.remove(toClean.key);
if (map.acquireLock(100, TimeUnit.MILLISECONDS)) {
try {
PendingPut cleaned = map.remove(toClean.owner);
if (toClean.equals(cleaned) == false) {
// Oops. Restore it.
map.put(cleaned);
} else if (map.size() == 0) {
pendingPuts.remove(toClean.key);
}
}
finally {
map.releaseLock();
}
}
else {
// Something's gone wrong and the lock isn't being released.
// We removed toClean from the queue and need to restore it
// TODO this is pretty dodgy
restorePendingPut(toClean);
}
}
}
}
private void restorePendingPut(PendingPut toRestore) {
pendingLock.lock();
try {
// Give it a new lease on life so it's not out of order. We could
// scan the queue and put toRestore back at the front, but then
// we'll just immediately try removing it again; instead we
// let it cycle through the queue again
toRestore.refresh();
pendingQueue.add(new WeakReference<PendingPut>(toRestore));
}
finally {
pendingLock.unlock();
}
}
/**
* Lazy-initialization map for PendingPut. Optimized for the expected usual
* case where only a single put is pending for a given key.
*
* This class is NOT THREAD SAFE. All operations on it must be performed
* with the object monitor held.
* with the lock held.
*/
private static class PendingPutMap {
private PendingPut singlePendingPut;
private Map<Object, PendingPut> fullMap;
private final Lock lock = new ReentrantLock();
PendingPutMap(PendingPut singleItem) {
this.singlePendingPut = singleItem;
}
public void put(PendingPut pendingPut) {
if (singlePendingPut == null) {
if (fullMap == null) {
@ -471,18 +687,46 @@ public class PutFromLoadValidator {
return fullMap == null ? (singlePendingPut == null ? 0 : 1)
: fullMap.size();
}
public boolean acquireLock(long time, TimeUnit unit) {
try {
return lock.tryLock(time, unit);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return false;
}
}
public void releaseLock() {
lock.unlock();
}
public void invalidate() {
if (singlePendingPut != null) {
singlePendingPut.completed = true;
}
else if (fullMap != null) {
for (PendingPut pp : fullMap.values()) {
pp.completed = true;
}
}
}
}
private static class PendingPut {
private final Object key;
private final Object owner;
private final long timestamp = System.currentTimeMillis();
private long timestamp = System.currentTimeMillis();
private volatile boolean completed;
private PendingPut(Object key, Object owner) {
this.key = key;
this.owner = owner;
}
private void refresh() {
timestamp = System.currentTimeMillis();
}
}

View File

@ -83,12 +83,17 @@ public class TransactionalAccessDelegate {
if (!region.checkValid())
return false;
if (!putValidator.isPutValid(key))
if (!putValidator.acquirePutFromLoadLock(key))
return false;
region.ensureRegionRootExists();
return CacheHelper.putForExternalRead(cache, regionFqn, key, value);
try {
region.ensureRegionRootExists();
return CacheHelper.putForExternalRead(cache, regionFqn, key, value);
}
finally {
putValidator.releasePutFromLoadLock(key);
}
}
public boolean putFromLoad(Object key, Object value, long txTimestamp, Object version, boolean minimalPutOverride)
@ -97,15 +102,20 @@ public class TransactionalAccessDelegate {
if (!region.checkValid())
return false;
if (!putValidator.isPutValid(key))
if (!putValidator.acquirePutFromLoadLock(key))
return false;
region.ensureRegionRootExists();
// We ignore minimalPutOverride. JBossCache putForExternalRead is
// already about as minimal as we can get; it will promptly return
// if it discovers that the node we want to write to already exists
return CacheHelper.putForExternalRead(cache, regionFqn, key, value);
try {
region.ensureRegionRootExists();
// We ignore minimalPutOverride. JBossCache putForExternalRead is
// already about as minimal as we can get; it will promptly return
// if it discovers that the node we want to write to already exists
return CacheHelper.putForExternalRead(cache, regionFqn, key, value);
}
finally {
putValidator.releasePutFromLoadLock(key);
}
}
public SoftLock lockItem(Object key, Object version) throws CacheException {
@ -163,7 +173,9 @@ public class TransactionalAccessDelegate {
public void remove(Object key) throws CacheException {
putValidator.keyRemoved(key);
if (!putValidator.invalidateKey(key)) {
throw new CacheException("Failed to invalidate pending putFromLoad calls for key " + key + " from region " + region.getName());
}
// We remove whether or not the region is valid. Other nodes
// may have already restored the region so they need to
@ -175,13 +187,17 @@ public class TransactionalAccessDelegate {
}
public void removeAll() throws CacheException {
putValidator.regionRemoved();
if (!putValidator.invalidateRegion()) {
throw new CacheException("Failed to invalidate pending putFromLoad calls for region " + region.getName());
}
CacheHelper.removeAll(cache, regionFqn);
}
public void evict(Object key) throws CacheException {
putValidator.keyRemoved(key);
if (!putValidator.invalidateKey(key)) {
throw new CacheException("Failed to invalidate pending putFromLoad calls for key " + key + " from region " + region.getName());
}
region.ensureRegionRootExists();
@ -189,7 +205,10 @@ public class TransactionalAccessDelegate {
}
public void evictAll() throws CacheException {
putValidator.regionRemoved();
if (!putValidator.invalidateRegion()) {
throw new CacheException("Failed to invalidate pending putFromLoad calls for region " + region.getName());
}
Transaction tx = region.suspend();
try {
region.ensureRegionRootExists();

View File

@ -23,11 +23,15 @@
*/
package org.hibernate.test.cache.jbc.access;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import javax.transaction.Transaction;
import javax.transaction.TransactionManager;
@ -87,7 +91,7 @@ public class PutFromLoadValidatorUnitTestCase extends TestCase {
if (transactional) {
tm.begin();
}
assertTrue(testee.isPutValid(KEY1));
assertTrue(testee.acquirePutFromLoadLock(KEY1));
}
public void testRegisteredPut() throws Exception {
@ -105,7 +109,16 @@ public class PutFromLoadValidatorUnitTestCase extends TestCase {
tm.begin();
}
testee.registerPendingPut(KEY1);
assertTrue(testee.isPutValid(KEY1));
boolean lockable = testee.acquirePutFromLoadLock(KEY1);
try {
assertTrue(lockable);
}
finally {
if (lockable) {
testee.releasePutFromLoadLock(KEY1);
}
}
}
public void testNakedPutAfterKeyRemoval() throws Exception {
@ -129,14 +142,23 @@ public class PutFromLoadValidatorUnitTestCase extends TestCase {
PutFromLoadValidator testee = new PutFromLoadValidator(
transactional ? tm : null);
if (removeRegion) {
testee.regionRemoved();
testee.invalidateRegion();
} else {
testee.keyRemoved(KEY1);
testee.invalidateKey(KEY1);
}
if (transactional) {
tm.begin();
}
assertFalse(testee.isPutValid(KEY1));
boolean lockable = testee.acquirePutFromLoadLock(KEY1);
try {
assertFalse(lockable);
}
finally {
if (lockable) {
testee.releasePutFromLoadLock(KEY1);
}
}
}
@ -163,15 +185,24 @@ public class PutFromLoadValidatorUnitTestCase extends TestCase {
PutFromLoadValidator testee = new PutFromLoadValidator(
transactional ? tm : null);
if (removeRegion) {
testee.regionRemoved();
testee.invalidateRegion();
} else {
testee.keyRemoved(KEY1);
testee.invalidateKey(KEY1);
}
if (transactional) {
tm.begin();
}
testee.registerPendingPut(KEY1);
assertTrue(testee.isPutValid(KEY1));
boolean lockable = testee.acquirePutFromLoadLock(KEY1);
try {
assertTrue(lockable);
}
finally {
if (lockable) {
testee.releasePutFromLoadLock(KEY1);
}
}
}
public void testRegisteredPutWithInterveningKeyRemoval() throws Exception {
@ -202,11 +233,20 @@ public class PutFromLoadValidatorUnitTestCase extends TestCase {
}
testee.registerPendingPut(KEY1);
if (removeRegion) {
testee.regionRemoved();
testee.invalidateRegion();
} else {
testee.keyRemoved(KEY1);
testee.invalidateKey(KEY1);
}
boolean lockable = testee.acquirePutFromLoadLock(KEY1);
try {
assertFalse(lockable);
}
finally {
if (lockable) {
testee.releasePutFromLoadLock(KEY1);
}
}
assertFalse(testee.isPutValid(KEY1));
}
public void testDelayedNakedPutAfterKeyRemoval() throws Exception {
@ -232,15 +272,24 @@ public class PutFromLoadValidatorUnitTestCase extends TestCase {
PutFromLoadValidator testee = new TestValidator(transactional ? tm
: null, 100, 1000, 500, 10000);
if (removeRegion) {
testee.regionRemoved();
testee.invalidateRegion();
} else {
testee.keyRemoved(KEY1);
testee.invalidateKey(KEY1);
}
if (transactional) {
tm.begin();
}
Thread.sleep(110);
assertTrue(testee.isPutValid(KEY1));
boolean lockable = testee.acquirePutFromLoadLock(KEY1);
try {
assertTrue(lockable);
}
finally {
if (lockable) {
testee.releasePutFromLoadLock(KEY1);
}
}
}
@ -268,8 +317,13 @@ public class PutFromLoadValidatorUnitTestCase extends TestCase {
testee.registerPendingPut(KEY1);
registeredLatch.countDown();
registeredLatch.await(5, TimeUnit.SECONDS);
if (testee.isPutValid(KEY1)) {
success.incrementAndGet();
if (testee.acquirePutFromLoadLock(KEY1)) {
try {
success.incrementAndGet();
}
finally {
testee.releasePutFromLoadLock(KEY1);
}
}
finishedLatch.countDown();
}
@ -284,7 +338,7 @@ public class PutFromLoadValidatorUnitTestCase extends TestCase {
// Start with a removal so the "isPutValid" calls will fail if
// any of the concurrent activity isn't handled properly
testee.regionRemoved();
testee.invalidateRegion();
// Do the registration + isPutValid calls
executor.execute(r);
@ -303,13 +357,13 @@ public class PutFromLoadValidatorUnitTestCase extends TestCase {
*/
public void testRemovalCleanup() throws Exception {
TestValidator testee = new TestValidator(null, 200, 1000, 500, 10000);
testee.keyRemoved("KEY1");
testee.keyRemoved("KEY2");
testee.invalidateKey("KEY1");
testee.invalidateKey("KEY2");
Thread.sleep(210);
assertEquals(2, testee.getRemovalQueueLength());
testee.keyRemoved("KEY1");
testee.invalidateKey("KEY1");
assertEquals(2, testee.getRemovalQueueLength());
testee.keyRemoved("KEY2");
testee.invalidateKey("KEY2");
assertEquals(2, testee.getRemovalQueueLength());
}
@ -324,7 +378,7 @@ public class PutFromLoadValidatorUnitTestCase extends TestCase {
// Start with a regionRemoval so we can confirm at the end that all
// registrations have been cleaned out
testee.regionRemoved();
testee.invalidateRegion();
testee.registerPendingPut("1");
testee.registerPendingPut("2");
@ -332,8 +386,10 @@ public class PutFromLoadValidatorUnitTestCase extends TestCase {
testee.registerPendingPut("4");
testee.registerPendingPut("5");
testee.registerPendingPut("6");
testee.isPutValid("6");
testee.isPutValid("2");
testee.acquirePutFromLoadLock("6");
testee.releasePutFromLoadLock("6");
testee.acquirePutFromLoadLock("2");
testee.releasePutFromLoadLock("2");
// ppq = [1,2(c),3,4,5,6(c)]
assertEquals(6, testee.getPendingPutQueueLength());
assertEquals(0, testee.getOveragePendingPutQueueLength());
@ -358,7 +414,8 @@ public class PutFromLoadValidatorUnitTestCase extends TestCase {
// Sleep past "maxPendingPutDelay"
Thread.sleep(310);
testee.isPutValid("3");
testee.acquirePutFromLoadLock("3");
testee.releasePutFromLoadLock("3");
// White box -- should have cleaned out 1 (overage) and
// moved 7 to overage queue
// oppq = [3(c),4,5,7] ppq=[8]
@ -380,20 +437,115 @@ public class PutFromLoadValidatorUnitTestCase extends TestCase {
// Validate that only expected items can do puts, thus indirectly
// proving the others have been cleaned out of pendingPuts map
assertFalse(testee.isPutValid("1"));
boolean locked = testee.acquirePutFromLoadLock("1");
if (locked) {
testee.releasePutFromLoadLock("1");
}
assertFalse(locked);
// 5 was overage, so should have been cleaned
assertEquals(2, testee.getOveragePendingPutQueueLength());
assertFalse(testee.isPutValid("2"));
locked = testee.acquirePutFromLoadLock("2");
if (locked) {
testee.releasePutFromLoadLock("1");
}
assertFalse(locked);
// 7 was overage, so should have been cleaned
assertEquals(1, testee.getOveragePendingPutQueueLength());
assertFalse(testee.isPutValid("3"));
assertFalse(testee.isPutValid("4"));
assertFalse(testee.isPutValid("5"));
assertFalse(testee.isPutValid("6"));
assertFalse(testee.isPutValid("7"));
assertTrue(testee.isPutValid("8"));
locked = testee.acquirePutFromLoadLock("3");
if (locked) {
testee.releasePutFromLoadLock("1");
}
assertFalse(locked);
locked = testee.acquirePutFromLoadLock("4");
if (locked) {
testee.releasePutFromLoadLock("1");
}
assertFalse(locked);
locked = testee.acquirePutFromLoadLock("5");
if (locked) {
testee.releasePutFromLoadLock("1");
}
assertFalse(locked);
locked = testee.acquirePutFromLoadLock("1");
if (locked) {
testee.releasePutFromLoadLock("1");
}
assertFalse(testee.acquirePutFromLoadLock("6"));
locked = testee.acquirePutFromLoadLock("7");
if (locked) {
testee.releasePutFromLoadLock("1");
}
assertFalse(locked);
assertTrue(testee.acquirePutFromLoadLock("8"));
testee.releasePutFromLoadLock("8");
tm.resume(tx);
assertTrue(testee.isPutValid("7"));
assertTrue(testee.acquirePutFromLoadLock("7"));
testee.releasePutFromLoadLock("7");
}
public void testInvalidateKeyBlocksForInProgressPut() throws Exception {
invalidationBlocksForInProgressPutTest(true);
}
public void testInvalidateRegionBlocksForInProgressPut() throws Exception {
invalidationBlocksForInProgressPutTest(false);
}
private void invalidationBlocksForInProgressPutTest(final boolean keyOnly) throws Exception {
final PutFromLoadValidator testee = new PutFromLoadValidator(null);
final CountDownLatch removeLatch = new CountDownLatch(1);
final CountDownLatch pferLatch = new CountDownLatch(1);
final AtomicReference<Object> cache = new AtomicReference<Object>("INITIAL");
Callable<Boolean> pferCallable = new Callable<Boolean>() {
public Boolean call() throws Exception {
testee.registerPendingPut(KEY1);
if (testee.acquirePutFromLoadLock(KEY1)) {
try {
removeLatch.countDown();
pferLatch.await();
cache.set("PFER");
return Boolean.TRUE;
}
finally {
testee.releasePutFromLoadLock(KEY1);
}
}
return Boolean.FALSE;
}
};
Callable<Void> invalidateCallable = new Callable<Void>() {
public Void call() throws Exception {
removeLatch.await();
if (keyOnly) {
testee.invalidateKey(KEY1);
}
else {
testee.invalidateRegion();
}
cache.set(null);
return null;
}
};
ExecutorService executorService = Executors.newCachedThreadPool();
Future<Boolean> pferFuture = executorService.submit(pferCallable);
Future<Void> invalidateFuture = executorService.submit(invalidateCallable);
try {
invalidateFuture.get(1, TimeUnit.SECONDS);
fail("invalidateFuture did not block");
}
catch (TimeoutException good) {}
pferLatch.countDown();
assertTrue(pferFuture.get(5, TimeUnit.SECONDS));
invalidateFuture.get(5, TimeUnit.SECONDS);
assertNull(cache.get());
}
private static class TestValidator extends PutFromLoadValidator {
@ -408,19 +560,16 @@ public class PutFromLoadValidatorUnitTestCase extends TestCase {
@Override
public int getOveragePendingPutQueueLength() {
// TODO Auto-generated method stub
return super.getOveragePendingPutQueueLength();
}
@Override
public int getPendingPutQueueLength() {
// TODO Auto-generated method stub
return super.getPendingPutQueueLength();
}
@Override
public int getRemovalQueueLength() {
// TODO Auto-generated method stub
return super.getRemovalQueueLength();
}