[HHH-3817] Create PutFromLoadValidator class to control whether puts are allowed after removes/evicts

git-svn-id: https://svn.jboss.org/repos/hibernate/core/trunk@17643 1b8cb986-b30d-0410-93ca-fae66ebed9b2
This commit is contained in:
Brian Stansberry 2009-10-07 23:30:50 +00:00
parent c8873dff76
commit d9adc4bc12
11 changed files with 734 additions and 84 deletions

View File

@ -446,20 +446,6 @@ public abstract class BasicRegionAdapter implements Region {
} }
} }
public Object getOwnerForPut()
{
Transaction tx = null;
try {
if (transactionManager != null) {
tx = transactionManager.getTransaction();
}
} catch (SystemException se) {
throw new CacheException("Could not obtain transaction", se);
}
return tx == null ? Thread.currentThread() : tx;
}
/** /**
* Tell the TransactionManager to suspend any ongoing transaction. * Tell the TransactionManager to suspend any ongoing transaction.
* *

View File

@ -34,7 +34,6 @@ import org.hibernate.cache.jbc.util.CacheHelper;
import org.hibernate.cache.jbc.util.DataVersionAdapter; import org.hibernate.cache.jbc.util.DataVersionAdapter;
import org.hibernate.cache.jbc.util.NonLockingDataVersion; import org.hibernate.cache.jbc.util.NonLockingDataVersion;
import org.jboss.cache.config.Option; import org.jboss.cache.config.Option;
import org.jboss.cache.optimistic.DataVersion;
/** /**
* Defines the strategy for transactional access to entity or collection data in * Defines the strategy for transactional access to entity or collection data in
@ -52,8 +51,8 @@ public class OptimisticTransactionalAccessDelegate extends TransactionalAccessDe
protected final CacheDataDescription dataDescription; protected final CacheDataDescription dataDescription;
public OptimisticTransactionalAccessDelegate(TransactionalDataRegionAdapter region) { public OptimisticTransactionalAccessDelegate(TransactionalDataRegionAdapter region, PutFromLoadValidator validator) {
super(region); super(region, validator);
this.dataDescription = region.getCacheDataDescription(); this.dataDescription = region.getCacheDataDescription();
} }
@ -64,7 +63,7 @@ public class OptimisticTransactionalAccessDelegate extends TransactionalAccessDe
*/ */
@Override @Override
public void evict(Object key) throws CacheException { public void evict(Object key) throws CacheException {
pendingPuts.remove(key); putValidator.keyRemoved(key);
region.ensureRegionRootExists(); region.ensureRegionRootExists();
Option opt = NonLockingDataVersion.getInvocationOption(); Option opt = NonLockingDataVersion.getInvocationOption();
@ -76,7 +75,8 @@ public class OptimisticTransactionalAccessDelegate extends TransactionalAccessDe
@Override @Override
public void evictAll() throws CacheException public void evictAll() throws CacheException
{ {
pendingPuts.clear(); putValidator.regionRemoved();
Transaction tx = region.suspend(); Transaction tx = region.suspend();
try { try {
region.ensureRegionRootExists(); region.ensureRegionRootExists();
@ -96,8 +96,6 @@ public class OptimisticTransactionalAccessDelegate extends TransactionalAccessDe
@Override @Override
public boolean insert(Object key, Object value, Object version) throws CacheException { public boolean insert(Object key, Object value, Object version) throws CacheException {
pendingPuts.remove(key);
if (!region.checkValid()) if (!region.checkValid())
return false; return false;
@ -115,7 +113,7 @@ public class OptimisticTransactionalAccessDelegate extends TransactionalAccessDe
if (!region.checkValid()) if (!region.checkValid())
return false; return false;
if (!isPutValid(key)) if (!putValidator.isPutValid(key))
return false; return false;
region.ensureRegionRootExists(); region.ensureRegionRootExists();
@ -133,7 +131,7 @@ public class OptimisticTransactionalAccessDelegate extends TransactionalAccessDe
if (!region.checkValid()) if (!region.checkValid())
return false; return false;
if (!isPutValid(key)) if (!putValidator.isPutValid(key))
return false; return false;
region.ensureRegionRootExists(); region.ensureRegionRootExists();
@ -145,7 +143,7 @@ public class OptimisticTransactionalAccessDelegate extends TransactionalAccessDe
@Override @Override
public void remove(Object key) throws CacheException { public void remove(Object key) throws CacheException {
pendingPuts.remove(key); putValidator.keyRemoved(key);
// We remove whether or not the region is valid. Other nodes // We remove whether or not the region is valid. Other nodes
// may have already restored the region so they need to // may have already restored the region so they need to
@ -159,7 +157,7 @@ public class OptimisticTransactionalAccessDelegate extends TransactionalAccessDe
@Override @Override
public void removeAll() throws CacheException { public void removeAll() throws CacheException {
pendingPuts.clear(); putValidator.regionRemoved();
Option opt = NonLockingDataVersion.getInvocationOption(); Option opt = NonLockingDataVersion.getInvocationOption();
CacheHelper.removeAll(cache, regionFqn, opt); CacheHelper.removeAll(cache, regionFqn, opt);
} }
@ -168,8 +166,6 @@ public class OptimisticTransactionalAccessDelegate extends TransactionalAccessDe
public boolean update(Object key, Object value, Object currentVersion, Object previousVersion) public boolean update(Object key, Object value, Object currentVersion, Object previousVersion)
throws CacheException { throws CacheException {
pendingPuts.remove(key);
// We update whether or not the region is valid. Other nodes // We update whether or not the region is valid. Other nodes
// may have already restored the region so they need to // may have already restored the region so they need to
// be informed of the change. // be informed of the change.
@ -181,9 +177,10 @@ public class OptimisticTransactionalAccessDelegate extends TransactionalAccessDe
return true; return true;
} }
@SuppressWarnings("deprecation")
private Option getDataVersionOption(Object currentVersion, Object previousVersion) { private Option getDataVersionOption(Object currentVersion, Object previousVersion) {
DataVersion dv = (dataDescription != null && dataDescription.isVersioned()) ? new DataVersionAdapter( org.jboss.cache.optimistic.DataVersion dv = (dataDescription != null && dataDescription.isVersioned()) ? new DataVersionAdapter(
currentVersion, previousVersion, dataDescription.getVersionComparator(), dataDescription.toString()) currentVersion, previousVersion, dataDescription.getVersionComparator(), dataDescription.toString())
: NonLockingDataVersion.INSTANCE; : NonLockingDataVersion.INSTANCE;
Option opt = new Option(); Option opt = new Option();

View File

@ -0,0 +1,453 @@
/*
* Hibernate, Relational Persistence for Idiomatic Java
*
* Copyright (c) 2009, Red Hat, Inc or third-party contributors as
* indicated by the @author tags or express copyright attribution
* statements applied by the authors.  All third-party contributions are
* distributed under license by Red Hat Middleware LLC.
*
* This copyrighted material is made available to anyone wishing to use, modify,
* copy, or redistribute it subject to the terms and conditions of the GNU
* Lesser General Public License, as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
* or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License
* for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this distribution; if not, write to:
* Free Software Foundation, Inc.
* 51 Franklin Street, Fifth Floor
* Boston, MA 02110-1301 USA
*/
package org.hibernate.cache.jbc.access;
import java.lang.ref.WeakReference;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import javax.transaction.SystemException;
import javax.transaction.Transaction;
import javax.transaction.TransactionManager;
import org.hibernate.cache.CacheException;
/**
* Encapsulates logic to allow a {@link TransactionalAccessDelegate} to determine
* whether a {@link TransactionalAccessDelegate#putFromLoad(Object, Object, long, Object, boolean)
* call should be allowed to update the cache. A <code>putFromLoad</code> has
* the potential to store stale data, since the data may have been removed from the
* database and the cache between the time when the data was read from the database
* and the actual call to <code>putFromLoad</code>.
*
* @author Brian Stansberry
*
* @version $Revision: $
*/
public class PutFromLoadValidator
{
/**
* Period in ms after a removal during which a call to {@link #isPutValid(Object)}
* that hasn't been {@link #registerPendingPut(Object) pre-registered}
* (aka a "naked put") will return false.
*/
public static final long NAKED_PUT_INVALIDATION_PERIOD = 10 * 1000;
/** Period after which a pending put is placed in the over-age queue */
private static final long PENDING_PUT_OVERAGE_PERIOD = 5 * 1000;
/** Period before which we stop trying to clean out pending puts */
private static final long PENDING_PUT_RECENT_PERIOD = 2 * 1000;
/** Period after which a pending put is never expected to come in
* and should be cleaned */
private static final long MAX_PENDING_PUT_DELAY = 2 * 60 * 1000;
/** Used to determine whether the owner of a pending put is a thread or a transaction */
private final TransactionManager transactionManager;
private final long nakedPutInvalidationPeriod;
private final long pendingPutOveragePeriod;
private final long pendingPutRecentPeriod;
private final long maxPendingPutDelay;
/**
* Registry of expected, future, isPutValid calls. If a key+owner is registered
* in this map, it is not a "naked put" and is allowed to proceed.
*/
private final ConcurrentMap<Object, PendingPutMap> pendingPuts =
new ConcurrentHashMap<Object, PendingPutMap>();
/** List of pending puts. Used to ensure we don't leak memory via the pendingPuts map */
private final List<WeakReference<PendingPut>> pendingQueue = new LinkedList<WeakReference<PendingPut>>();
/** Separate list of pending puts that haven't been resolved within PENDING_PUT_OVERAGE_PERIOD.
* Used to ensure we don't leak memory via the pendingPuts map.
* Tracked separately from more recent pending puts for efficiency reasons. */
private final List<WeakReference<PendingPut>> overagePendingQueue = new LinkedList<WeakReference<PendingPut>>();
/** Lock controlling access to pending put queues */
private final Lock pendingLock = new ReentrantLock();
private final ConcurrentMap<Object, Long> recentRemovals = new ConcurrentHashMap<Object, Long>();
/** List of recent removals. Used to ensure we don't leak memory via the recentRemovals map */
private final List<RecentRemoval> removalsQueue = new LinkedList<RecentRemoval>();
/** The time when the first element in removalsQueue will expire. No reason to do
* housekeeping on the queue before this time. */
private volatile long earliestRemovalTimestamp;
/** Lock controlling access to removalsQueue */
private final Lock removalsLock = new ReentrantLock();
/**
* The time of the last call to regionRemoved(), plus NAKED_PUT_INVALIDATION_PERIOD.
* All naked puts will be rejected until the current time is greater than this value.
*/
private volatile long invalidationTimestamp;
/**
* Creates a new PutFromLoadValidator.
*
* @param transactionManager transaction manager to use to associated changes with a transaction;
* may be <code>null</code>
*/
public PutFromLoadValidator(TransactionManager transactionManager) {
this(transactionManager, NAKED_PUT_INVALIDATION_PERIOD, PENDING_PUT_OVERAGE_PERIOD, PENDING_PUT_RECENT_PERIOD, MAX_PENDING_PUT_DELAY);
}
/** Constructor variant for use by unit tests; allows control of variouts timeouts by the test. */
protected PutFromLoadValidator(TransactionManager transactionManager, long nakedPutInvalidationPeriod, long pendingPutOveragePeriod, long pendingPutRecentPeriod, long maxPendingPutDelay) {
this.transactionManager = transactionManager;
this.nakedPutInvalidationPeriod = nakedPutInvalidationPeriod;
this.pendingPutOveragePeriod = pendingPutOveragePeriod;
this.pendingPutRecentPeriod = pendingPutRecentPeriod;
this.maxPendingPutDelay = maxPendingPutDelay;
}
public boolean isPutValid(Object key)
{
boolean valid = false;
long now = System.currentTimeMillis();
PendingPutMap pending = pendingPuts.get(key);
if (pending != null) {
synchronized (pending) {
PendingPut toCancel = pending.remove(getOwnerForPut());
valid = toCancel != null;
if (valid) {
toCancel.completed = true;
if (pending.size() == 0) {
pendingPuts.remove(key);
}
}
}
}
if (!valid) {
if (now > invalidationTimestamp) {
Long removedTime = recentRemovals.get(key);
if (removedTime == null || now > removedTime.longValue()) {
valid = true;
}
}
}
cleanOutdatedPendingPuts(now, true);
return valid;
}
public void keyRemoved(Object key)
{
// Invalidate any pending puts
pendingPuts.remove(key);
// Record when this occurred to invalidate later naked puts
RecentRemoval removal = new RecentRemoval(key, this.nakedPutInvalidationPeriod);
recentRemovals.put(key, removal.timestamp);
// Don't let recentRemovals map become a memory leak
RecentRemoval toClean = null;
boolean attemptClean = removal.timestamp.longValue() > earliestRemovalTimestamp;
removalsLock.lock();
try {
removalsQueue.add(removal);
if (attemptClean) {
if (removalsQueue.size() > 1) { // we have at least one as we just added it
toClean = removalsQueue.remove(0);
}
earliestRemovalTimestamp = removalsQueue.get(0).timestamp.longValue();
}
}
finally {
removalsLock.unlock();
}
if (toClean != null) {
Long cleaned = recentRemovals.get(toClean.key);
if (cleaned != null && cleaned.equals(toClean.timestamp)) {
cleaned = recentRemovals.remove(toClean.key);
if (cleaned != null && cleaned.equals(toClean.timestamp) == false) {
// Oops; removed the wrong timestamp; restore it
recentRemovals.putIfAbsent(toClean.key, cleaned);
}
}
}
}
public void regionRemoved()
{
invalidationTimestamp = System.currentTimeMillis() + this.nakedPutInvalidationPeriod;
pendingLock.lock();
try {
removalsLock.lock();
try {
pendingPuts.clear();
pendingQueue.clear();
overagePendingQueue.clear();
recentRemovals.clear();
removalsQueue.clear();
earliestRemovalTimestamp = invalidationTimestamp;
}
finally {
removalsLock.unlock();
}
}
finally {
pendingLock.unlock();
}
}
/**
* Notifies this validator that it is expected that a database read followed by a
* subsequent {@link #isPutValid(Object)} call will occur. The intent is this method
* would be called following a cache miss wherein it is expected that a database
* read plus cache put will occur. Calling this method allows the validator to treat
* the subsequent <code>isPutValid</code> as if the database read occurred when
* this method was invoked. This allows the validator to compare the timestamp of
* this call against the timestamp of subsequent removal notifications. A put that
* occurs without this call preceding it is "naked"; i.e the validator must assume
* the put is not valid if any relevant removal has occurred within
* {@link #NAKED_PUT_INVALIDATION_PERIOD} milliseconds.
*
* @param key key that will be used for subsequent put
*/
public void registerPendingPut(Object key)
{
PendingPut pendingPut = new PendingPut(key, getOwnerForPut());
PendingPutMap pendingForKey = new PendingPutMap();
synchronized (pendingForKey) {
for (;;) {
PendingPutMap existing = pendingPuts.putIfAbsent(key, pendingForKey);
if (existing != null && existing != pendingForKey) {
synchronized (existing) {
existing.put(pendingPut);
PendingPutMap doublecheck = pendingPuts.putIfAbsent(key, existing);
if (doublecheck == null || doublecheck == existing) {
break;
}
// else we hit a race and need to loop to try again
}
}
else {
pendingForKey.put(pendingPut);
break;
}
}
}
// Guard against memory leaks
preventOutdatedPendingPuts(pendingPut);
}
private Object getOwnerForPut()
{
Transaction tx = null;
try {
if (transactionManager != null) {
tx = transactionManager.getTransaction();
}
} catch (SystemException se) {
throw new CacheException("Could not obtain transaction", se);
}
return tx == null ? Thread.currentThread() : tx;
}
private void preventOutdatedPendingPuts(PendingPut pendingPut)
{
pendingLock.lock();
try {
pendingQueue.add(new WeakReference<PendingPut>(pendingPut));
cleanOutdatedPendingPuts(pendingPut.timestamp, false);
}
finally {
pendingLock.unlock();
}
}
private void cleanOutdatedPendingPuts(long now, boolean lock)
{
PendingPut toClean = null;
if (lock) {
pendingLock.lock();
}
try {
// Clean items out of the basic queue
long overaged = now - this.pendingPutOveragePeriod;
long recent = now - this.pendingPutRecentPeriod;
int pos = 0;
while (pendingQueue.size() > pos) {
WeakReference<PendingPut> ref = pendingQueue.get(0);
PendingPut item = ref.get();
if (item == null || item.completed) {
pendingQueue.remove(pos);
}
else if (item.timestamp < overaged) {
// Potential leak; move to the overaged queued
pendingQueue.remove(pos);
overagePendingQueue.add(ref);
}
else if (item.timestamp >= recent) {
// Don't waste time on very recent items
break;
}
else if (pos > 2) {
// Don't spend too much time getting nowhere
break;
}
else {
// Move on to the next item
pos++;
}
}
// Process the overage queue until we find an item to clean
// or an incomplete item that hasn't aged out
long mustCleanTime = now - this.maxPendingPutDelay;
while (overagePendingQueue.size() > 0) {
WeakReference<PendingPut> ref = overagePendingQueue.get(0);
PendingPut item = ref.get();
if (item == null || item.completed) {
overagePendingQueue.remove(0);
}
else {
if (item.timestamp < mustCleanTime) {
toClean = item;
}
break;
}
}
}
finally {
if (lock) {
pendingLock.unlock();
}
}
// We've found a pendingPut that never happened; clean it up
if (toClean != null) {
PendingPutMap map = pendingPuts.get(toClean.key);
if (map != null) {
synchronized (map) {
PendingPut cleaned = map.remove(toClean.owner);
if (toClean.equals(cleaned) == false) {
// Oops. Restore it.
map.put(cleaned);
}
else if (map.size() == 0) {
pendingPuts.remove(toClean.key);
}
}
}
}
}
/**
* Lazy-initialization map for PendingPut. Optimized
* for the expected usual case where only a single put
* is pending for a given key.
*
* This class is NOT THREAD SAFE. All operations on it
* must be performed with the object monitor held.
*/
private static class PendingPutMap {
private PendingPut singlePendingPut;
private Map<Object, PendingPut> fullMap;
public void put(PendingPut pendingPut)
{
if (singlePendingPut == null) {
if (fullMap == null) {
// initial put
singlePendingPut = pendingPut;
}
else {
fullMap.put(pendingPut.owner, pendingPut);
}
}
else {
// 2nd put; need a map
fullMap = new HashMap<Object, PendingPut>(4);
fullMap.put(singlePendingPut.owner, singlePendingPut);
singlePendingPut = null;
fullMap.put(pendingPut.owner, pendingPut);
}
}
public PendingPut remove(Object ownerForPut)
{
PendingPut removed = null;
if (fullMap == null) {
if (singlePendingPut != null && singlePendingPut.owner.equals(ownerForPut)) {
removed = singlePendingPut;
singlePendingPut = null;
}
}
else {
removed = fullMap.remove(ownerForPut);
}
return removed;
}
public int size()
{
return fullMap == null ? (singlePendingPut == null ? 0 : 1) : fullMap.size();
}
}
private static class PendingPut {
private final Object key;
private final Object owner;
private final long timestamp = System.currentTimeMillis();
private volatile boolean completed;
private PendingPut(Object key, Object owner) {
this.key = key;
this.owner = owner;
}
}
private static class RecentRemoval {
private final Object key;
private final Long timestamp;
private RecentRemoval(Object key, long nakedPutInvalidationPeriod) {
this.key = key;
timestamp = Long.valueOf(System.currentTimeMillis() + nakedPutInvalidationPeriod);
}
}
}

View File

@ -23,11 +23,6 @@
*/ */
package org.hibernate.cache.jbc.access; package org.hibernate.cache.jbc.access;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import javax.transaction.Transaction; import javax.transaction.Transaction;
import org.hibernate.cache.CacheException; import org.hibernate.cache.CacheException;
@ -55,13 +50,13 @@ public class TransactionalAccessDelegate {
protected final Cache cache; protected final Cache cache;
protected final Fqn regionFqn; protected final Fqn regionFqn;
protected final BasicRegionAdapter region; protected final BasicRegionAdapter region;
protected final ConcurrentMap<Object, Set<Object>> pendingPuts = protected final PutFromLoadValidator putValidator;
new ConcurrentHashMap<Object, Set<Object>>();
public TransactionalAccessDelegate(BasicRegionAdapter adapter) { public TransactionalAccessDelegate(BasicRegionAdapter adapter, PutFromLoadValidator validator) {
this.region = adapter; this.region = adapter;
this.cache = adapter.getCacheInstance(); this.cache = adapter.getCacheInstance();
this.regionFqn = adapter.getRegionFqn(); this.regionFqn = adapter.getRegionFqn();
this.putValidator = validator;
} }
public Object get(Object key, long txTimestamp) throws CacheException { public Object get(Object key, long txTimestamp) throws CacheException {
@ -74,7 +69,7 @@ public class TransactionalAccessDelegate {
Object val = CacheHelper.get(cache, regionFqn, key); Object val = CacheHelper.get(cache, regionFqn, key);
if (val == null) { if (val == null) {
registerPendingPut(key); putValidator.registerPendingPut(key);
} }
return val; return val;
@ -85,7 +80,7 @@ public class TransactionalAccessDelegate {
if (!region.checkValid()) if (!region.checkValid())
return false; return false;
if (!isPutValid(key)) if (!putValidator.isPutValid(key))
return false; return false;
region.ensureRegionRootExists(); region.ensureRegionRootExists();
@ -99,7 +94,7 @@ public class TransactionalAccessDelegate {
if (!region.checkValid()) if (!region.checkValid())
return false; return false;
if (!isPutValid(key)) if (!putValidator.isPutValid(key))
return false; return false;
region.ensureRegionRootExists(); region.ensureRegionRootExists();
@ -126,8 +121,6 @@ public class TransactionalAccessDelegate {
public boolean insert(Object key, Object value, Object version) throws CacheException { public boolean insert(Object key, Object value, Object version) throws CacheException {
pendingPuts.remove(key);
if (!region.checkValid()) if (!region.checkValid())
return false; return false;
@ -144,8 +137,6 @@ public class TransactionalAccessDelegate {
public boolean update(Object key, Object value, Object currentVersion, Object previousVersion) public boolean update(Object key, Object value, Object currentVersion, Object previousVersion)
throws CacheException { throws CacheException {
pendingPuts.remove(key);
// We update whether or not the region is valid. Other nodes // We update whether or not the region is valid. Other nodes
// may have already restored the region so they need to // may have already restored the region so they need to
// be informed of the change. // be informed of the change.
@ -163,7 +154,7 @@ public class TransactionalAccessDelegate {
public void remove(Object key) throws CacheException { public void remove(Object key) throws CacheException {
pendingPuts.remove(key); putValidator.keyRemoved(key);
// We remove whether or not the region is valid. Other nodes // We remove whether or not the region is valid. Other nodes
// may have already restored the region so they need to // may have already restored the region so they need to
@ -175,13 +166,13 @@ public class TransactionalAccessDelegate {
} }
public void removeAll() throws CacheException { public void removeAll() throws CacheException {
pendingPuts.clear(); putValidator.regionRemoved();
CacheHelper.removeAll(cache, regionFqn); CacheHelper.removeAll(cache, regionFqn);
} }
public void evict(Object key) throws CacheException { public void evict(Object key) throws CacheException {
pendingPuts.remove(key); putValidator.keyRemoved(key);
region.ensureRegionRootExists(); region.ensureRegionRootExists();
@ -189,7 +180,7 @@ public class TransactionalAccessDelegate {
} }
public void evictAll() throws CacheException { public void evictAll() throws CacheException {
pendingPuts.clear(); putValidator.regionRemoved();
Transaction tx = region.suspend(); Transaction tx = region.suspend();
try { try {
region.ensureRegionRootExists(); region.ensureRegionRootExists();
@ -200,37 +191,4 @@ public class TransactionalAccessDelegate {
region.resume(tx); region.resume(tx);
} }
} }
protected void registerPendingPut(Object key)
{
Set<Object> pending = pendingPuts.get(key);
if (pending == null) {
pending = new HashSet<Object>();
}
synchronized (pending) {
Object owner = region.getOwnerForPut();
pending.add(owner);
Set<Object> existing = pendingPuts.putIfAbsent(key, pending);
if (existing != pending) {
// try again
registerPendingPut(key);
}
}
}
protected boolean isPutValid(Object key)
{
boolean valid = false;
Set<Object> pending = pendingPuts.get(key);
if (pending != null) {
synchronized (pending) {
valid = pending.remove(region.getOwnerForPut());
if (valid && pending.size() == 0) {
pendingPuts.remove(key);
}
}
}
return valid;
}
} }

View File

@ -34,6 +34,7 @@ import org.hibernate.cache.CollectionRegion;
import org.hibernate.cache.access.AccessType; import org.hibernate.cache.access.AccessType;
import org.hibernate.cache.access.CollectionRegionAccessStrategy; import org.hibernate.cache.access.CollectionRegionAccessStrategy;
import org.hibernate.cache.jbc.TransactionalDataRegionAdapter; import org.hibernate.cache.jbc.TransactionalDataRegionAdapter;
import org.hibernate.cache.jbc.access.PutFromLoadValidator;
/** /**
* Defines the behavior of the collection cache regions for JBossCache 2.x. * Defines the behavior of the collection cache regions for JBossCache 2.x.
@ -68,4 +69,8 @@ public class CollectionRegionImpl extends TransactionalDataRegionAdapter impleme
protected Fqn<String> createRegionFqn(String regionName, String regionPrefix) { protected Fqn<String> createRegionFqn(String regionName, String regionPrefix) {
return getTypeLastRegionFqn(regionName, regionPrefix, TYPE); return getTypeLastRegionFqn(regionName, regionPrefix, TYPE);
} }
public PutFromLoadValidator getPutFromLoadValidator() {
return new PutFromLoadValidator(transactionManager);
}
} }

View File

@ -42,7 +42,7 @@ public class OptimisticTransactionalAccess extends TransactionalAccess {
public OptimisticTransactionalAccess(CollectionRegionImpl region) { public OptimisticTransactionalAccess(CollectionRegionImpl region) {
// We use a different delegate than the non-optimistic superclass default // We use a different delegate than the non-optimistic superclass default
super(region, new OptimisticTransactionalAccessDelegate(region)); super(region, new OptimisticTransactionalAccessDelegate(region, region.getPutFromLoadValidator()));
} }
} }

View File

@ -52,7 +52,7 @@ public class TransactionalAccess implements CollectionRegionAccessStrategy {
* @param region the region to which this provides access * @param region the region to which this provides access
*/ */
public TransactionalAccess(CollectionRegionImpl region) { public TransactionalAccess(CollectionRegionImpl region) {
this(region, new TransactionalAccessDelegate(region)); this(region, new TransactionalAccessDelegate(region, region.getPutFromLoadValidator()));
} }
/** /**

View File

@ -34,6 +34,7 @@ import org.hibernate.cache.EntityRegion;
import org.hibernate.cache.access.AccessType; import org.hibernate.cache.access.AccessType;
import org.hibernate.cache.access.EntityRegionAccessStrategy; import org.hibernate.cache.access.EntityRegionAccessStrategy;
import org.hibernate.cache.jbc.TransactionalDataRegionAdapter; import org.hibernate.cache.jbc.TransactionalDataRegionAdapter;
import org.hibernate.cache.jbc.access.PutFromLoadValidator;
/** /**
* Defines the behavior of the entity cache regions for JBossCache. * Defines the behavior of the entity cache regions for JBossCache.
@ -73,4 +74,8 @@ public class EntityRegionImpl extends TransactionalDataRegionAdapter implements
return getTypeLastRegionFqn(regionName, regionPrefix, TYPE); return getTypeLastRegionFqn(regionName, regionPrefix, TYPE);
} }
public PutFromLoadValidator getPutFromLoadValidator() {
return new PutFromLoadValidator(transactionManager);
}
} }

View File

@ -41,6 +41,6 @@ public class OptimisticTransactionalAccess extends TransactionalAccess {
* @param region The region\ to which this is providing access * @param region The region\ to which this is providing access
*/ */
public OptimisticTransactionalAccess(EntityRegionImpl region) { public OptimisticTransactionalAccess(EntityRegionImpl region) {
super(region, new OptimisticTransactionalAccessDelegate(region)); super(region, new OptimisticTransactionalAccessDelegate(region, region.getPutFromLoadValidator()));
} }
} }

View File

@ -46,7 +46,7 @@ public class TransactionalAccess implements EntityRegionAccessStrategy {
private final TransactionalAccessDelegate delegate; private final TransactionalAccessDelegate delegate;
public TransactionalAccess(EntityRegionImpl region) { public TransactionalAccess(EntityRegionImpl region) {
this(region, new TransactionalAccessDelegate(region)); this(region, new TransactionalAccessDelegate(region, region.getPutFromLoadValidator()));
} }
protected TransactionalAccess(EntityRegionImpl region, TransactionalAccessDelegate delegate) { protected TransactionalAccess(EntityRegionImpl region, TransactionalAccessDelegate delegate) {

View File

@ -0,0 +1,246 @@
/*
* Hibernate, Relational Persistence for Idiomatic Java
*
* Copyright (c) 2009, Red Hat, Inc or third-party contributors as
* indicated by the @author tags or express copyright attribution
* statements applied by the authors.  All third-party contributions are
* distributed under license by Red Hat Middleware LLC.
*
* This copyrighted material is made available to anyone wishing to use, modify,
* copy, or redistribute it subject to the terms and conditions of the GNU
* Lesser General Public License, as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
* or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License
* for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this distribution; if not, write to:
* Free Software Foundation, Inc.
* 51 Franklin Street, Fifth Floor
* Boston, MA 02110-1301 USA
*/
package org.hibernate.test.cache.jbc.access;
import javax.transaction.TransactionManager;
import org.hibernate.cache.jbc.access.PutFromLoadValidator;
import org.hibernate.test.cache.jbc.functional.util.DualNodeJtaTransactionManagerImpl;
import junit.framework.TestCase;
/**
* Tests of {@link PutFromLoadValidator}.
*
* @author Brian Stansberry
*
* @version $Revision: $
*/
public class PutFromLoadValidatorUnitTestCase extends TestCase
{
private Object KEY1= "KEY1";
private TransactionManager tm;
public PutFromLoadValidatorUnitTestCase(String name) {
super(name);
}
@Override
protected void setUp() throws Exception
{
super.setUp();
tm = DualNodeJtaTransactionManagerImpl.getInstance("test");
}
@Override
protected void tearDown() throws Exception
{
try {
super.tearDown();
}
finally {
tm = null;
try {
DualNodeJtaTransactionManagerImpl.cleanupTransactions();
}
finally {
DualNodeJtaTransactionManagerImpl.cleanupTransactionManagers();
}
}
}
public void testNakedPut() throws Exception {
nakedPutTest(false);
}
public void testNakedPutTransactional() throws Exception {
nakedPutTest(true);
}
private void nakedPutTest(boolean transactional) throws Exception {
PutFromLoadValidator testee = new PutFromLoadValidator(transactional ? tm : null);
if (transactional) {
tm.begin();
}
assertTrue(testee.isPutValid(KEY1));
}
public void testRegisteredPut() throws Exception {
registeredPutTest(false);
}
public void testRegisteredPutTransactional() throws Exception {
registeredPutTest(true);
}
private void registeredPutTest(boolean transactional) throws Exception {
PutFromLoadValidator testee = new PutFromLoadValidator(transactional ? tm : null);
if (transactional) {
tm.begin();
}
testee.registerPendingPut(KEY1);
assertTrue(testee.isPutValid(KEY1));
}
public void testNakedPutAfterKeyRemoval() throws Exception {
nakedPutAfterRemovalTest(false, false);
}
public void testNakedPutAfterKeyRemovalTransactional() throws Exception {
nakedPutAfterRemovalTest(true, false);
}
public void testNakedPutAfterRegionRemoval() throws Exception {
nakedPutAfterRemovalTest(false, true);
}
public void testNakedPutAfterRegionRemovalTransactional() throws Exception {
nakedPutAfterRemovalTest(true, true);
}
private void nakedPutAfterRemovalTest(boolean transactional, boolean removeRegion) throws Exception
{
PutFromLoadValidator testee = new PutFromLoadValidator(transactional ? tm : null);
if (removeRegion) {
testee.regionRemoved();
}
else {
testee.keyRemoved(KEY1);
}
if (transactional) {
tm.begin();
}
assertFalse(testee.isPutValid(KEY1));
}
public void testRegisteredPutAfterKeyRemoval() throws Exception {
registeredPutAfterRemovalTest(false, false);
}
public void testRegisteredPutAfterKeyRemovalTransactional() throws Exception {
registeredPutAfterRemovalTest(true, false);
}
public void testRegisteredPutAfterRegionRemoval() throws Exception {
registeredPutAfterRemovalTest(false, true);
}
public void testRegisteredPutAfterRegionRemovalTransactional() throws Exception {
registeredPutAfterRemovalTest(true, true);
}
private void registeredPutAfterRemovalTest(boolean transactional, boolean removeRegion) throws Exception
{
PutFromLoadValidator testee = new PutFromLoadValidator(transactional ? tm : null);
if (removeRegion) {
testee.regionRemoved();
}
else {
testee.keyRemoved(KEY1);
}
if (transactional) {
tm.begin();
}
testee.registerPendingPut(KEY1);
assertTrue(testee.isPutValid(KEY1));
}
public void testRegisteredPutWithInterveningKeyRemoval() throws Exception {
registeredPutWithInterveningRemovalTest(false, false);
}
public void testRegisteredPutWithInterveningKeyRemovalTransactional() throws Exception {
registeredPutWithInterveningRemovalTest(true, false);
}
public void testRegisteredPutWithInterveningRegionRemoval() throws Exception {
registeredPutWithInterveningRemovalTest(false, true);
}
public void testRegisteredPutWithInterveningRegionRemovalTransactional() throws Exception {
registeredPutWithInterveningRemovalTest(true, true);
}
private void registeredPutWithInterveningRemovalTest(boolean transactional, boolean removeRegion) throws Exception
{
PutFromLoadValidator testee = new PutFromLoadValidator(transactional ? tm : null);
if (transactional) {
tm.begin();
}
testee.registerPendingPut(KEY1);
if (removeRegion) {
testee.regionRemoved();
}
else {
testee.keyRemoved(KEY1);
}
assertFalse(testee.isPutValid(KEY1));
}
public void testDelayedNakedPutAfterKeyRemoval() throws Exception {
delayedNakedPutAfterRemovalTest(false, false);
}
public void testDelayedNakedPutAfterKeyRemovalTransactional() throws Exception {
delayedNakedPutAfterRemovalTest(true, false);
}
public void testDelayedNakedPutAfterRegionRemoval() throws Exception {
delayedNakedPutAfterRemovalTest(false, true);
}
public void testDelayedNakedPutAfterRegionRemovalTransactional() throws Exception {
delayedNakedPutAfterRemovalTest(true, true);
}
private void delayedNakedPutAfterRemovalTest(boolean transactional, boolean removeRegion) throws Exception
{
PutFromLoadValidator testee = new TestValidator(transactional ? tm : null, 100, 1000, 500, 10000);
if (removeRegion) {
testee.regionRemoved();
}
else {
testee.keyRemoved(KEY1);
}
if (transactional) {
tm.begin();
}
Thread.sleep(110);
assertTrue(testee.isPutValid(KEY1));
}
private static class TestValidator extends PutFromLoadValidator {
protected TestValidator(TransactionManager transactionManager, long nakedPutInvalidationPeriod,
long pendingPutOveragePeriod, long pendingPutRecentPeriod, long maxPendingPutDelay)
{
super(transactionManager, nakedPutInvalidationPeriod, pendingPutOveragePeriod, pendingPutRecentPeriod,
maxPendingPutDelay);
}
}
}