[HHH-4944] (putFromLoad calls could store stale data) Fixed.

git-svn-id: https://svn.jboss.org/repos/hibernate/core/trunk@18857 1b8cb986-b30d-0410-93ca-fae66ebed9b2
This commit is contained in:
Galder Zamarreno 2010-02-23 12:11:34 +00:00
parent 011ef60c10
commit ddd1d72e5a
6 changed files with 615 additions and 166 deletions

View File

@ -30,6 +30,7 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@ -46,6 +47,36 @@ import org.hibernate.cache.CacheException;
* the potential to store stale data, since the data may have been removed from the
* database and the cache between the time when the data was read from the database
* and the actual call to <code>putFromLoad</code>.
* <p>
* The expected usage of this class by a thread that read the cache and did
* not find data is:
*
* <ol>
* <li> Call {@link #registerPendingPut(Object)}</li>
* <li> Read the database</li>
* <li> Call {@link #acquirePutFromLoadLock(Object)}
* <li> if above returns <code>false</code>, the thread should not cache the data;
* only if above returns <code>true</code>, put data in the cache and...</li>
* <li> then call {@link #releasePutFromLoadLock(Object)}</li>
* </ol>
* </p>
*
* <p>
* The expected usage by a thread that is taking an action such that any pending
* <code>putFromLoad</code> may have stale data and should not cache it is to either
* call
*
* <ul>
* <li> {@link #invalidateKey(Object)} (for a single key invalidation)</li>
* <li>or {@link #invalidateRegion()} (for a general invalidation all pending puts)</li>
* </ul>
* </p>
*
* <p>
* This class also supports the concept of "naked puts", which are calls to
* {@link #acquirePutFromLoadLock(Object)} without a preceding {@link #registerPendingPut(Object)}
* call.
* </p>
*
* @author Brian Stansberry
*
@ -53,21 +84,22 @@ import org.hibernate.cache.CacheException;
*/
public class PutFromLoadValidator {
/**
* Period in ms after a removal during which a call to {@link #isPutValid(Object)} that hasn't
* been {@link #registerPendingPut(Object) pre-registered} (aka a "naked put") will return false.
* Period (in ms) after a removal during which a call to
* {@link #acquirePutFromLoadLock(Object)} that hasn't been
* {@link #registerPendingPut(Object) pre-registered} (aka a "naked put")
* will return false.
* will return false.
*/
public static final long NAKED_PUT_INVALIDATION_PERIOD = 10 * 1000;
public static final long NAKED_PUT_INVALIDATION_PERIOD = TimeUnit.SECONDS.toMillis(20);
/** Period after which a pending put is placed in the over-age queue */
private static final long PENDING_PUT_OVERAGE_PERIOD = 5 * 1000;
/** Period (in ms) after which a pending put is placed in the over-age queue */
private static final long PENDING_PUT_OVERAGE_PERIOD = TimeUnit.SECONDS.toMillis(5);
/** Period before which we stop trying to clean out pending puts */
private static final long PENDING_PUT_RECENT_PERIOD = 2 * 1000;
/** Period (in ms) before which we stop trying to clean out pending puts */
private static final long PENDING_PUT_RECENT_PERIOD = TimeUnit.SECONDS.toMillis(2);
/**
* Period after which a pending put is never expected to come in and should be cleaned
*/
private static final long MAX_PENDING_PUT_DELAY = 2 * 60 * 1000;
/** Period (in ms) after which a pending put is never expected to come in and should be cleaned */
private static final long MAX_PENDING_PUT_DELAY = TimeUnit.SECONDS.toMillis(2 * 60);
/**
* Used to determine whether the owner of a pending put is a thread or a transaction
@ -119,7 +151,7 @@ public class PutFromLoadValidator {
* Creates a new PutFromLoadValidator.
*
* @param transactionManager
* transaction manager to use to associated changes with a transaction; may be
* transaction manager to use to associate changes with a transaction; may be
* <code>null</code>
*/
public PutFromLoadValidator(TransactionManager transactionManager) {
@ -142,41 +174,136 @@ public class PutFromLoadValidator {
// ----------------------------------------------------------------- Public
public boolean isPutValid(Object key) {
/**
* Acquire a lock giving the calling thread the right to put data in the
* cache for the given key.
* <p>
* <strong>NOTE:</strong> A call to this method that returns <code>true</code>
* should always be matched with a call to {@link #releasePutFromLoadLock(Object)}.
* </p>
*
* @param key the key
*
* @return <code>true</code> if the lock is acquired and the cache put
* can proceed; <code>false</code> if the data should not be cached
*/
public boolean acquirePutFromLoadLock(Object key) {
boolean valid = false;
boolean locked = false;
long now = System.currentTimeMillis();
// Important: Do cleanup before we acquire any locks so we
// don't deadlock with invalidateRegion
cleanOutdatedPendingPuts(now, true);
try {
PendingPutMap pending = pendingPuts.get(key);
if (pending != null) {
synchronized (pending) {
locked = pending.acquireLock(100, TimeUnit.MILLISECONDS);
if (locked) {
try {
PendingPut toCancel = pending.remove(getOwnerForPut());
valid = toCancel != null;
if (valid) {
if (toCancel != null) {
valid = !toCancel.completed;
toCancel.completed = true;
if (pending.size() == 0) {
pendingPuts.remove(key);
}
}
}
}
finally {
if (!valid) {
pending.releaseLock();
locked = false;
}
}
}
}
else {
// Key wasn't in pendingPuts, so either this is a "naked put"
// or regionRemoved has been called. Check if we can proceed
if (now > invalidationTimestamp) {
Long removedTime = recentRemovals.get(key);
if (removedTime == null || now > removedTime.longValue()) {
valid = true;
// It's legal to proceed. But we have to record this key
// in pendingPuts so releasePutFromLoadLock can find it.
// To do this we basically simulate a normal "register
// then acquire lock" pattern
registerPendingPut(key);
locked = acquirePutFromLoadLock(key);
valid = locked;
}
}
}
}
catch (Throwable t) {
valid = false;
if (locked) {
PendingPutMap toRelease = pendingPuts.get(key);
if (toRelease != null) {
toRelease.releaseLock();
}
}
cleanOutdatedPendingPuts(now, true);
if (t instanceof RuntimeException) {
throw (RuntimeException) t;
} else if (t instanceof Error) {
throw (Error) t;
} else {
throw new RuntimeException(t);
}
}
return valid;
}
public void keyRemoved(Object key) {
// Invalidate any pending puts
/**
* Releases the lock previously obtained by a call to
* {@link #acquirePutFromLoadLock(Object)} that returned <code>true</code>.
*
* @param key the key
*/
public void releasePutFromLoadLock(Object key) {
PendingPutMap pending = pendingPuts.get(key);
if (pending != null) {
if (pending.size() == 0) {
pendingPuts.remove(key);
}
pending.releaseLock();
}
}
/**
* Invalidates any {@link #registerPendingPut(Object) previously registered pending puts} ensuring a subsequent call to
* {@link #acquirePutFromLoadLock(Object)} will return <code>false</code>. <p> This method will block until any
* concurrent thread that has {@link #acquirePutFromLoadLock(Object) acquired the putFromLoad lock} for the given key
* has released the lock. This allows the caller to be certain the putFromLoad will not execute after this method
* returns, possibly caching stale data. </p>
*
* @param key key identifying data whose pending puts should be invalidated
* @return <code>true</code> if the invalidation was successful; <code>false</code> if a problem occured (which the
* caller should treat as an exception condition)
*/
public boolean invalidateKey(Object key) {
boolean success = true;
// Invalidate any pending puts
PendingPutMap pending = pendingPuts.get(key);
if (pending != null) {
// This lock should be available very quickly, but we'll be
// very patient waiting for it as callers should treat not
// acquiring it as an exception condition
if (pending.acquireLock(60, TimeUnit.SECONDS)) {
try {
pending.invalidate();
}
finally {
pending.releaseLock();
}
} else {
success = false;
}
}
// Record when this occurred to invalidate later naked puts
RecentRemoval removal = new RecentRemoval(key, this.nakedPutInvalidationPeriod);
@ -210,51 +337,83 @@ public class PutFromLoadValidator {
}
}
}
return success;
}
public void cleared() {
/**
* Invalidates all {@link #registerPendingPut(Object) previously registered pending puts} ensuring a subsequent call to
* {@link #acquirePutFromLoadLock(Object)} will return <code>false</code>. <p> This method will block until any
* concurrent thread that has {@link #acquirePutFromLoadLock(Object) acquired the putFromLoad lock} for the any key has
* released the lock. This allows the caller to be certain the putFromLoad will not execute after this method returns,
* possibly caching stale data. </p>
*
* @return <code>true</code> if the invalidation was successful; <code>false</code> if a problem occured (which the
* caller should treat as an exception condition)
*/
public boolean invalidateRegion() {
boolean ok = false;
invalidationTimestamp = System.currentTimeMillis() + this.nakedPutInvalidationPeriod;
pendingLock.lock();
try {
// Acquire the lock for each entry to ensure any ongoing
// work associated with it is completed before we return
for (PendingPutMap entry : pendingPuts.values()) {
if (entry.acquireLock(60, TimeUnit.SECONDS)) {
try {
entry.invalidate();
}
finally {
entry.releaseLock();
}
} else {
ok = false;
}
}
removalsLock.lock();
try {
pendingPuts.clear();
pendingQueue.clear();
overagePendingQueue.clear();
recentRemovals.clear();
removalsQueue.clear();
earliestRemovalTimestamp = invalidationTimestamp;
ok = true;
} finally {
removalsLock.unlock();
}
} finally {
pendingLock.unlock();
}
catch (Exception e) {
ok = false;
}
finally {
earliestRemovalTimestamp = invalidationTimestamp;
}
return ok;
}
/**
* Notifies this validator that it is expected that a database read followed by a subsequent
* {@link #isPutValid(Object)} call will occur. The intent is this method would be called
* following a cache miss wherein it is expected that a database read plus cache put will occur.
* Calling this method allows the validator to treat the subsequent <code>isPutValid</code> as if
* the database read occurred when this method was invoked. This allows the validator to compare
* the timestamp of this call against the timestamp of subsequent removal notifications. A put
* that occurs without this call preceding it is "naked"; i.e the validator must assume the put
* is not valid if any relevant removal has occurred within
* {@link #NAKED_PUT_INVALIDATION_PERIOD} milliseconds.
* Notifies this validator that it is expected that a database read followed by a subsequent {@link
* #acquirePutFromLoadLock(Object)} call will occur. The intent is this method would be called following a cache miss
* wherein it is expected that a database read plus cache put will occur. Calling this method allows the validator to
* treat the subsequent <code>acquirePutFromLoadLock</code> as if the database read occurred when this method was
* invoked. This allows the validator to compare the timestamp of this call against the timestamp of subsequent removal
* notifications. A put that occurs without this call preceding it is "naked"; i.e the validator must assume the put is
* not valid if any relevant removal has occurred within {@link #NAKED_PUT_INVALIDATION_PERIOD} milliseconds.
*
* @param key
* key that will be used for subsequent put
* @param key key that will be used for subsequent cache put
*/
public void registerPendingPut(Object key) {
PendingPut pendingPut = new PendingPut(key, getOwnerForPut());
PendingPutMap pendingForKey = new PendingPutMap();
synchronized (pendingForKey) {
PendingPutMap pendingForKey = new PendingPutMap(pendingPut);
for (;;) {
PendingPutMap existing = pendingPuts.putIfAbsent(key, pendingForKey);
if (existing != null && existing != pendingForKey) {
synchronized (existing) {
if (existing != null) {
if (existing.acquireLock(10, TimeUnit.SECONDS)) {
try {
existing.put(pendingPut);
PendingPutMap doublecheck = pendingPuts.putIfAbsent(key, existing);
if (doublecheck == null || doublecheck == existing) {
@ -262,10 +421,16 @@ public class PutFromLoadValidator {
}
// else we hit a race and need to loop to try again
}
finally {
existing.releaseLock();
}
} else {
pendingForKey.put(pendingPut);
// Can't get the lock; when we come back we'll be a "naked put"
break;
}
} else {
// normal case
break;
}
}
@ -324,7 +489,9 @@ public class PutFromLoadValidator {
pendingLock.lock();
try {
pendingQueue.add(new WeakReference<PendingPut>(pendingPut));
if (pendingQueue.size() > 1) {
cleanOutdatedPendingPuts(pendingPut.timestamp, false);
}
} finally {
pendingLock.unlock();
}
@ -337,9 +504,7 @@ public class PutFromLoadValidator {
pendingLock.lock();
}
try {
// Clean items out of the basic queue
long overaged = now - this.pendingPutOveragePeriod;
long recent = now - this.pendingPutRecentPeriod;
@ -392,7 +557,8 @@ public class PutFromLoadValidator {
if (toClean != null) {
PendingPutMap map = pendingPuts.get(toClean.key);
if (map != null) {
synchronized (map) {
if (map.acquireLock(100, TimeUnit.MILLISECONDS)) {
try {
PendingPut cleaned = map.remove(toClean.owner);
if (toClean.equals(cleaned) == false) {
// Oops. Restore it.
@ -401,21 +567,49 @@ public class PutFromLoadValidator {
pendingPuts.remove(toClean.key);
}
}
finally {
map.releaseLock();
}
} else {
// Something's gone wrong and the lock isn't being released.
// We removed toClean from the queue and need to restore it
// TODO this is pretty dodgy
restorePendingPut(toClean);
}
}
}
}
private void restorePendingPut(PendingPut toRestore) {
pendingLock.lock();
try {
// Give it a new lease on life so it's not out of order. We could
// scan the queue and put toRestore back at the front, but then
// we'll just immediately try removing it again; instead we
// let it cycle through the queue again
toRestore.refresh();
pendingQueue.add(new WeakReference<PendingPut>(toRestore));
}
finally {
pendingLock.unlock();
}
}
/**
* Lazy-initialization map for PendingPut. Optimized for the expected usual case where only a
* single put is pending for a given key.
*
* This class is NOT THREAD SAFE. All operations on it must be performed with the object monitor
* held.
* This class is NOT THREAD SAFE. All operations on it must be performed with the lock held.
*/
private static class PendingPutMap {
private PendingPut singlePendingPut;
private Map<Object, PendingPut> fullMap;
private final Lock lock = new ReentrantLock();
PendingPutMap(PendingPut singleItem) {
this.singlePendingPut = singleItem;
}
public void put(PendingPut pendingPut) {
if (singlePendingPut == null) {
@ -437,7 +631,8 @@ public class PutFromLoadValidator {
public PendingPut remove(Object ownerForPut) {
PendingPut removed = null;
if (fullMap == null) {
if (singlePendingPut != null && singlePendingPut.owner.equals(ownerForPut)) {
if (singlePendingPut != null
&& singlePendingPut.owner.equals(ownerForPut)) {
removed = singlePendingPut;
singlePendingPut = null;
}
@ -448,14 +643,38 @@ public class PutFromLoadValidator {
}
public int size() {
return fullMap == null ? (singlePendingPut == null ? 0 : 1) : fullMap.size();
return fullMap == null ? (singlePendingPut == null ? 0 : 1)
: fullMap.size();
}
public boolean acquireLock(long time, TimeUnit unit) {
try {
return lock.tryLock(time, unit);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return false;
}
}
public void releaseLock() {
lock.unlock();
}
public void invalidate() {
if (singlePendingPut != null) {
singlePendingPut.completed = true;
} else if (fullMap != null) {
for (PendingPut pp : fullMap.values()) {
pp.completed = true;
}
}
}
}
private static class PendingPut {
private final Object key;
private final Object owner;
private final long timestamp = System.currentTimeMillis();
private long timestamp = System.currentTimeMillis();
private volatile boolean completed;
private PendingPut(Object key, Object owner) {
@ -463,6 +682,9 @@ public class PutFromLoadValidator {
this.owner = owner;
}
private void refresh() {
timestamp = System.currentTimeMillis();
}
}
private static class RecentRemoval {

View File

@ -68,29 +68,27 @@ public class TransactionalAccessDelegate {
}
public boolean putFromLoad(Object key, Object value, long txTimestamp, Object version) throws CacheException {
if (!region.checkValid()) {
if (!region.checkValid())
return false;
}
if (!putValidator.isPutValid(key)) {
if (!putValidator.acquirePutFromLoadLock(key))
return false;
}
try {
cacheAdapter.putForExternalRead(key, value);
} finally {
putValidator.releasePutFromLoadLock(key);
}
return true;
}
public boolean putFromLoad(Object key, Object value, long txTimestamp, Object version, boolean minimalPutOverride)
throws CacheException {
boolean trace = log.isTraceEnabled();
if (!region.checkValid()) {
if (trace) log.trace("Region not valid");
return false;
}
if (!putValidator.isPutValid(key)) {
if (trace) log.trace("Put {0} not valid", key);
return false;
}
cacheAdapter.putForExternalRead(key, value);
return true;
// We ignore minimalPutOverride. Infinispan putForExternalRead is
// already about as minimal as we can get; it will promptly return
// if it discovers that the node we want to write to already exists
return putFromLoad(key, value, txTimestamp, version);
}
public SoftLock lockItem(Object key, Object version) throws CacheException {
@ -137,25 +135,33 @@ public class TransactionalAccessDelegate {
}
public void remove(Object key) throws CacheException {
if (!putValidator.invalidateKey(key)) {
throw new CacheException("Failed to invalidate pending putFromLoad calls for key " + key + " from region " + region.getName());
}
// We update whether or not the region is valid. Other nodes
// may have already restored the region so they need to
// be informed of the change.
putValidator.keyRemoved(key);
cacheAdapter.remove(key);
}
public void removeAll() throws CacheException {
putValidator.cleared();
if (!putValidator.invalidateRegion()) {
throw new CacheException("Failed to invalidate pending putFromLoad calls for region " + region.getName());
}
cacheAdapter.clear();
}
public void evict(Object key) throws CacheException {
putValidator.keyRemoved(key);
if (!putValidator.invalidateKey(key)) {
throw new CacheException("Failed to invalidate pending putFromLoad calls for key " + key + " from region " + region.getName());
}
cacheAdapter.remove(key);
}
public void evictAll() throws CacheException {
putValidator.cleared();
if (!putValidator.invalidateRegion()) {
throw new CacheException("Failed to invalidate pending putFromLoad calls for region " + region.getName());
}
Transaction tx = region.suspend();
try {
CacheHelper.sendEvictAllNotification(cacheAdapter, region.getAddress());

View File

@ -23,11 +23,15 @@
*/
package org.hibernate.test.cache.infinispan.access;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import javax.transaction.Transaction;
import javax.transaction.TransactionManager;
@ -42,7 +46,6 @@ import junit.framework.TestCase;
*
* @author Brian Stansberry
* @author Galder Zamarreño
*
* @version $Revision: $
*/
public class PutFromLoadValidatorUnitTestCase extends TestCase {
@ -87,7 +90,15 @@ public class PutFromLoadValidatorUnitTestCase extends TestCase {
if (transactional) {
tm.begin();
}
assertTrue(testee.isPutValid(KEY1));
boolean lockable = testee.acquirePutFromLoadLock(KEY1);
try {
assertTrue(lockable);
}
finally {
if (lockable) {
testee.releasePutFromLoadLock(KEY1);
}
}
}
public void testRegisteredPut() throws Exception {
@ -99,12 +110,22 @@ public class PutFromLoadValidatorUnitTestCase extends TestCase {
}
private void registeredPutTest(boolean transactional) throws Exception {
PutFromLoadValidator testee = new PutFromLoadValidator(transactional ? tm : null);
PutFromLoadValidator testee = new PutFromLoadValidator(
transactional ? tm : null);
if (transactional) {
tm.begin();
}
testee.registerPendingPut(KEY1);
assertTrue(testee.isPutValid(KEY1));
boolean lockable = testee.acquirePutFromLoadLock(KEY1);
try {
assertTrue(lockable);
}
finally {
if (lockable) {
testee.releasePutFromLoadLock(KEY1);
}
}
}
public void testNakedPutAfterKeyRemoval() throws Exception {
@ -125,17 +146,26 @@ public class PutFromLoadValidatorUnitTestCase extends TestCase {
private void nakedPutAfterRemovalTest(boolean transactional, boolean removeRegion)
throws Exception {
PutFromLoadValidator testee = new PutFromLoadValidator(transactional ? tm : null);
PutFromLoadValidator testee = new PutFromLoadValidator(
transactional ? tm : null);
if (removeRegion) {
testee.cleared();
testee.invalidateRegion();
} else {
testee.keyRemoved(KEY1);
testee.invalidateKey(KEY1);
}
if (transactional) {
tm.begin();
}
assertFalse(testee.isPutValid(KEY1));
boolean lockable = testee.acquirePutFromLoadLock(KEY1);
try {
assertFalse(lockable);
}
finally {
if (lockable) {
testee.releasePutFromLoadLock(KEY1);
}
}
}
public void testRegisteredPutAfterKeyRemoval() throws Exception {
@ -156,17 +186,27 @@ public class PutFromLoadValidatorUnitTestCase extends TestCase {
private void registeredPutAfterRemovalTest(boolean transactional, boolean removeRegion)
throws Exception {
PutFromLoadValidator testee = new PutFromLoadValidator(transactional ? tm : null);
PutFromLoadValidator testee = new PutFromLoadValidator(
transactional ? tm : null);
if (removeRegion) {
testee.cleared();
testee.invalidateRegion();
} else {
testee.keyRemoved(KEY1);
testee.invalidateKey(KEY1);
}
if (transactional) {
tm.begin();
}
testee.registerPendingPut(KEY1);
assertTrue(testee.isPutValid(KEY1));
boolean lockable = testee.acquirePutFromLoadLock(KEY1);
try {
assertTrue(lockable);
}
finally {
if (lockable) {
testee.releasePutFromLoadLock(KEY1);
}
}
}
public void testRegisteredPutWithInterveningKeyRemoval() throws Exception {
@ -187,17 +227,27 @@ public class PutFromLoadValidatorUnitTestCase extends TestCase {
private void registeredPutWithInterveningRemovalTest(boolean transactional, boolean removeRegion)
throws Exception {
PutFromLoadValidator testee = new PutFromLoadValidator(transactional ? tm : null);
PutFromLoadValidator testee = new PutFromLoadValidator(
transactional ? tm : null);
if (transactional) {
tm.begin();
}
testee.registerPendingPut(KEY1);
if (removeRegion) {
testee.cleared();
testee.invalidateRegion();
} else {
testee.keyRemoved(KEY1);
testee.invalidateKey(KEY1);
}
boolean lockable = testee.acquirePutFromLoadLock(KEY1);
try {
assertFalse(lockable);
}
finally {
if (lockable) {
testee.releasePutFromLoadLock(KEY1);
}
}
assertFalse(testee.isPutValid(KEY1));
}
public void testDelayedNakedPutAfterKeyRemoval() throws Exception {
@ -218,19 +268,26 @@ public class PutFromLoadValidatorUnitTestCase extends TestCase {
private void delayedNakedPutAfterRemovalTest(boolean transactional, boolean removeRegion)
throws Exception {
PutFromLoadValidator testee = new TestValidator(transactional ? tm : null, 100, 1000, 500,
10000);
PutFromLoadValidator testee = new TestValidator(transactional ? tm : null, 100, 1000, 500, 10000);
if (removeRegion) {
testee.cleared();
testee.invalidateRegion();
} else {
testee.keyRemoved(KEY1);
testee.invalidateKey(KEY1);
}
if (transactional) {
tm.begin();
}
Thread.sleep(110);
assertTrue(testee.isPutValid(KEY1));
boolean lockable = testee.acquirePutFromLoadLock(KEY1);
try {
assertTrue(lockable);
}
finally {
if (lockable) {
testee.releasePutFromLoadLock(KEY1);
}
}
}
public void testMultipleRegistrations() throws Exception {
@ -257,11 +314,17 @@ public class PutFromLoadValidatorUnitTestCase extends TestCase {
testee.registerPendingPut(KEY1);
registeredLatch.countDown();
registeredLatch.await(5, TimeUnit.SECONDS);
if (testee.isPutValid(KEY1)) {
if (testee.acquirePutFromLoadLock(KEY1)) {
try {
success.incrementAndGet();
}
finally {
testee.releasePutFromLoadLock(KEY1);
}
}
finishedLatch.countDown();
} catch (Exception e) {
}
catch (Exception e) {
e.printStackTrace();
}
}
@ -272,7 +335,7 @@ public class PutFromLoadValidatorUnitTestCase extends TestCase {
// Start with a removal so the "isPutValid" calls will fail if
// any of the concurrent activity isn't handled properly
testee.cleared();
testee.invalidateRegion();
// Do the registration + isPutValid calls
executor.execute(r);
@ -285,19 +348,20 @@ public class PutFromLoadValidatorUnitTestCase extends TestCase {
}
/**
* White box test for ensuring key removals get cleaned up.
* White box test for ensuring key removals get cleaned up. <b>Note</b>: Since this test is test sensitive, if you
* add trace logging, it might fail
*
* @throws Exception
*/
public void testRemovalCleanup() throws Exception {
TestValidator testee = new TestValidator(null, 200, 1000, 500, 10000);
testee.keyRemoved("KEY1");
testee.keyRemoved("KEY2");
testee.invalidateKey("KEY1");
testee.invalidateKey("KEY2");
Thread.sleep(210);
assertEquals(2, testee.getRemovalQueueLength());
testee.keyRemoved("KEY1");
testee.invalidateKey("KEY1");
assertEquals(2, testee.getRemovalQueueLength());
testee.keyRemoved("KEY2");
testee.invalidateKey("KEY2");
assertEquals(2, testee.getRemovalQueueLength());
}
@ -311,7 +375,7 @@ public class PutFromLoadValidatorUnitTestCase extends TestCase {
// Start with a regionRemoval so we can confirm at the end that all
// registrations have been cleaned out
testee.cleared();
testee.invalidateRegion();
testee.registerPendingPut("1");
testee.registerPendingPut("2");
@ -319,8 +383,10 @@ public class PutFromLoadValidatorUnitTestCase extends TestCase {
testee.registerPendingPut("4");
testee.registerPendingPut("5");
testee.registerPendingPut("6");
testee.isPutValid("6");
testee.isPutValid("2");
testee.acquirePutFromLoadLock("6");
testee.releasePutFromLoadLock("6");
testee.acquirePutFromLoadLock("2");
testee.releasePutFromLoadLock("2");
// ppq = [1,2(c),3,4,5,6(c)]
assertEquals(6, testee.getPendingPutQueueLength());
assertEquals(0, testee.getOveragePendingPutQueueLength());
@ -345,7 +411,8 @@ public class PutFromLoadValidatorUnitTestCase extends TestCase {
// Sleep past "maxPendingPutDelay"
Thread.sleep(310);
testee.isPutValid("3");
testee.acquirePutFromLoadLock("3");
testee.releasePutFromLoadLock("3");
// White box -- should have cleaned out 1 (overage) and
// moved 7 to overage queue
// oppq = [3(c),4,5,7] ppq=[8]
@ -367,20 +434,114 @@ public class PutFromLoadValidatorUnitTestCase extends TestCase {
// Validate that only expected items can do puts, thus indirectly
// proving the others have been cleaned out of pendingPuts map
assertFalse(testee.isPutValid("1"));
boolean locked = testee.acquirePutFromLoadLock("1");
if (locked) {
testee.releasePutFromLoadLock("1");
}
assertFalse(locked);
// 5 was overage, so should have been cleaned
assertEquals(2, testee.getOveragePendingPutQueueLength());
assertFalse(testee.isPutValid("2"));
locked = testee.acquirePutFromLoadLock("2");
if (locked) {
testee.releasePutFromLoadLock("1");
}
assertFalse(locked);
// 7 was overage, so should have been cleaned
assertEquals(1, testee.getOveragePendingPutQueueLength());
assertFalse(testee.isPutValid("3"));
assertFalse(testee.isPutValid("4"));
assertFalse(testee.isPutValid("5"));
assertFalse(testee.isPutValid("6"));
assertFalse(testee.isPutValid("7"));
assertTrue(testee.isPutValid("8"));
locked = testee.acquirePutFromLoadLock("3");
if (locked) {
testee.releasePutFromLoadLock("1");
}
assertFalse(locked);
locked = testee.acquirePutFromLoadLock("4");
if (locked) {
testee.releasePutFromLoadLock("1");
}
assertFalse(locked);
locked = testee.acquirePutFromLoadLock("5");
if (locked) {
testee.releasePutFromLoadLock("1");
}
assertFalse(locked);
locked = testee.acquirePutFromLoadLock("1");
if (locked) {
testee.releasePutFromLoadLock("1");
}
assertFalse(testee.acquirePutFromLoadLock("6"));
locked = testee.acquirePutFromLoadLock("7");
if (locked) {
testee.releasePutFromLoadLock("1");
}
assertFalse(locked);
assertTrue(testee.acquirePutFromLoadLock("8"));
testee.releasePutFromLoadLock("8");
tm.resume(tx);
assertTrue(testee.isPutValid("7"));
assertTrue(testee.acquirePutFromLoadLock("7"));
testee.releasePutFromLoadLock("7");
}
public void testInvalidateKeyBlocksForInProgressPut() throws Exception {
invalidationBlocksForInProgressPutTest(true);
}
public void testInvalidateRegionBlocksForInProgressPut() throws Exception {
invalidationBlocksForInProgressPutTest(false);
}
private void invalidationBlocksForInProgressPutTest(final boolean keyOnly) throws Exception {
final PutFromLoadValidator testee = new PutFromLoadValidator(null);
final CountDownLatch removeLatch = new CountDownLatch(1);
final CountDownLatch pferLatch = new CountDownLatch(1);
final AtomicReference<Object> cache = new AtomicReference<Object>("INITIAL");
Callable<Boolean> pferCallable = new Callable<Boolean>() {
public Boolean call() throws Exception {
testee.registerPendingPut(KEY1);
if (testee.acquirePutFromLoadLock(KEY1)) {
try {
removeLatch.countDown();
pferLatch.await();
cache.set("PFER");
return Boolean.TRUE;
}
finally {
testee.releasePutFromLoadLock(KEY1);
}
}
return Boolean.FALSE;
}
};
Callable<Void> invalidateCallable = new Callable<Void>() {
public Void call() throws Exception {
removeLatch.await();
if (keyOnly) {
testee.invalidateKey(KEY1);
} else {
testee.invalidateRegion();
}
cache.set(null);
return null;
}
};
ExecutorService executorService = Executors.newCachedThreadPool();
Future<Boolean> pferFuture = executorService.submit(pferCallable);
Future<Void> invalidateFuture = executorService.submit(invalidateCallable);
try {
invalidateFuture.get(1, TimeUnit.SECONDS);
fail("invalidateFuture did not block");
}
catch (TimeoutException good) {}
pferLatch.countDown();
assertTrue(pferFuture.get(5, TimeUnit.SECONDS));
invalidateFuture.get(5, TimeUnit.SECONDS);
assertNull(cache.get());
}
private static class TestValidator extends PutFromLoadValidator {

View File

@ -23,8 +23,15 @@
*/
package org.hibernate.test.cache.infinispan.collection;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import junit.extensions.TestSetup;
import junit.framework.AssertionFailedError;
@ -32,20 +39,29 @@ import junit.framework.Test;
import junit.framework.TestSuite;
import org.hibernate.cache.CacheDataDescription;
import org.hibernate.cache.CacheException;
import org.hibernate.cache.CollectionRegion;
import org.hibernate.cache.access.AccessType;
import org.hibernate.cache.access.CollectionRegionAccessStrategy;
import org.hibernate.cache.impl.CacheDataDescriptionImpl;
import org.hibernate.cache.infinispan.InfinispanRegionFactory;
import org.hibernate.cache.infinispan.access.PutFromLoadValidator;
import org.hibernate.cache.infinispan.access.TransactionalAccessDelegate;
import org.hibernate.cache.infinispan.collection.CollectionRegionImpl;
import org.hibernate.cache.infinispan.impl.BaseRegion;
import org.hibernate.cache.infinispan.util.CacheAdapter;
import org.hibernate.cache.infinispan.util.CacheAdapterImpl;
import org.hibernate.cache.infinispan.util.FlagAdapter;
import org.hibernate.cfg.Configuration;
import org.hibernate.test.cache.infinispan.AbstractNonFunctionalTestCase;
import org.hibernate.test.cache.infinispan.functional.cluster.DualNodeJtaTransactionManagerImpl;
import org.hibernate.test.cache.infinispan.util.CacheTestUtil;
import org.hibernate.util.ComparableComparator;
import org.infinispan.Cache;
import org.infinispan.transaction.tm.BatchModeTransactionManager;
import javax.transaction.TransactionManager;
/**
* Base class for tests of CollectionRegionAccessStrategy impls.
*
@ -183,15 +199,64 @@ public abstract class AbstractCollectionRegionAccessStrategyTestCase extends Abs
public abstract void testCacheConfiguration();
/**
* Test method for {@link TransactionalAccess#getRegion()}.
* Test method for {@link CollectionRegionAccessStrategy#getRegion()}.
*/
public void testGetRegion() {
assertEquals("Correct region", localCollectionRegion, localAccessStrategy.getRegion());
}
public void testPutFromLoadRemoveDoesNotProduceStaleData() throws Exception {
final CountDownLatch pferLatch = new CountDownLatch(1);
final CountDownLatch removeLatch = new CountDownLatch(1);
TransactionManager tm = DualNodeJtaTransactionManagerImpl.getInstance("test1234");
PutFromLoadValidator validator = new PutFromLoadValidator(tm) {
@Override
public boolean acquirePutFromLoadLock(Object key) {
boolean acquired = super.acquirePutFromLoadLock(key);
try {
removeLatch.countDown();
pferLatch.await(2, TimeUnit.SECONDS);
} catch (InterruptedException e) {
log.debug("Interrupted");
Thread.currentThread().interrupt();
} catch (Exception e) {
log.error("Error", e);
throw new RuntimeException("Error", e);
}
return acquired;
}
};
final TransactionalAccessDelegate delegate = new TransactionalAccessDelegate((CollectionRegionImpl) localCollectionRegion, validator);
Callable<Void> pferCallable = new Callable<Void>() {
public Void call() throws Exception {
delegate.putFromLoad("k1", "v1", 0, null);
return null;
}
};
Callable<Void> removeCallable = new Callable<Void>() {
public Void call() throws Exception {
removeLatch.await();
delegate.remove("k1");
pferLatch.countDown();
return null;
}
};
ExecutorService executorService = Executors.newCachedThreadPool();
Future<Void> pferFuture = executorService.submit(pferCallable);
Future<Void> removeFuture = executorService.submit(removeCallable);
pferFuture.get();
removeFuture.get();
assertFalse(localCache.containsKey("k1"));
}
/**
* Test method for
* {@link TransactionalAccess#putFromLoad(java.lang.Object, java.lang.Object, long, java.lang.Object)}
* {@link CollectionRegionAccessStrategy#putFromLoad(java.lang.Object, java.lang.Object, long, java.lang.Object)}
* .
*/
public void testPutFromLoad() throws Exception {
@ -200,7 +265,7 @@ public abstract class AbstractCollectionRegionAccessStrategyTestCase extends Abs
/**
* Test method for
* {@link TransactionalAccess#putFromLoad(java.lang.Object, java.lang.Object, long, java.lang.Object, boolean)}
* {@link CollectionRegionAccessStrategy#putFromLoad(java.lang.Object, java.lang.Object, long, java.lang.Object, boolean)}
* .
*/
public void testPutFromLoadMinimal() throws Exception {
@ -339,21 +404,21 @@ public abstract class AbstractCollectionRegionAccessStrategyTestCase extends Abs
}
/**
* Test method for {@link TransactionalAccess#remove(java.lang.Object)}.
* Test method for {@link CollectionRegionAccessStrategy#remove(java.lang.Object)}.
*/
public void testRemove() {
evictOrRemoveTest(false);
}
/**
* Test method for {@link TransactionalAccess#removeAll()}.
* Test method for {@link CollectionRegionAccessStrategy#removeAll()}.
*/
public void testRemoveAll() {
evictOrRemoveAllTest(false);
}
/**
* Test method for {@link TransactionalAccess#evict(java.lang.Object)}.
* Test method for {@link CollectionRegionAccessStrategy#evict(java.lang.Object)}.
*
* FIXME add testing of the "immediately without regard for transaction isolation" bit in the
* CollectionRegionAccessStrategy API.
@ -363,7 +428,7 @@ public abstract class AbstractCollectionRegionAccessStrategyTestCase extends Abs
}
/**
* Test method for {@link TransactionalAccess#evictAll()}.
* Test method for {@link CollectionRegionAccessStrategy#evictAll()}.
*
* FIXME add testing of the "immediately without regard for transaction isolation" bit in the
* CollectionRegionAccessStrategy API.

View File

@ -340,8 +340,6 @@ public class ConcurrentWriteTest extends SingleNodeTestCase {
/**
* remove existing 'contact' from customer's list of contacts
*
* @param contact
* contact to remove from customer's contacts
* @param customerId
* @throws IllegalStateException
* if customer does not own a contact
@ -421,14 +419,12 @@ public class ConcurrentWriteTest extends SingleNodeTestCase {
try {
// barrier.await();
for (int i = 0; i < ITERATION_COUNT && !TERMINATE_ALL_USERS; i++) {
if (contactExists())
throw new IllegalStateException("contact already exists before add, customerId=" + customerId);
contactExists();
if (trace) log.trace("Add contact for customer " + customerId);
addContact(customerId);
if (trace) log.trace("Added contact");
thinkRandomTime();
if (!contactExists())
throw new IllegalStateException("contact missing after successful add, customerId=" + customerId);
contactExists();
thinkRandomTime();
if (trace) log.trace("Read all customers' first contact");
// read everyone's contacts
@ -438,8 +434,7 @@ public class ConcurrentWriteTest extends SingleNodeTestCase {
if (trace) log.trace("Remove contact of customer" + customerId);
removeContact(customerId);
if (trace) log.trace("Removed contact");
if (contactExists())
throw new IllegalStateException("contact still exists after successful remove call, customerId=" + customerId);
contactExists();
thinkRandomTime();
++completedIterations;
if (log.isTraceEnabled()) log.trace("Iteration completed {0}", completedIterations);

View File

@ -30,8 +30,8 @@ log4j.appender.stdout.layout.ConversionPattern=%d{ABSOLUTE} %5p [%t] %c{1}:%L -
log4j.rootLogger=info, stdout
#log4j.logger.org.hibernate.test=info
log4j.logger.org.hibernate.test=trace
log4j.logger.org.hibernate.cache=trace
log4j.logger.org.hibernate.SQL=debug
log4j.logger.org.hibernate.test=info
log4j.logger.org.hibernate.cache=info
log4j.logger.org.hibernate.SQL=info
#log4j.logger.org.jgroups=info
#log4j.logger.org.infinispan=trace