HHH-9881 Pending put needs to be invalidated on update on remote node

* This could lead to performance degradation since new EndInvalidatingCommand
  needs to be send after transaction is committed
This commit is contained in:
Radim Vansa 2015-08-04 11:06:41 +02:00 committed by Galder Zamarreño
parent 4b2a78785e
commit fa7265ff0e
16 changed files with 1043 additions and 358 deletions

View File

@ -214,6 +214,20 @@ public class InfinispanRegionFactory implements RegionFactory {
*/
public static final String PENDING_PUTS_CACHE_NAME = "pending-puts";
/**
* A local, lightweight cache for pending puts, which is
* non-transactional and has aggressive expiration settings.
* Locking is still required since the putFromLoad validator
* code uses conditional operations (i.e. putIfAbsent)
*/
public static final Configuration PENDING_PUTS_CACHE_CONFIGURATION = new ConfigurationBuilder()
.clustering().cacheMode(CacheMode.LOCAL)
.transaction().transactionMode(TransactionMode.NON_TRANSACTIONAL)
.expiration().maxIdle(TimeUnit.SECONDS.toMillis(60))
.storeAsBinary().enabled(false)
.locking().isolationLevel(IsolationLevel.READ_COMMITTED)
.jmxStatistics().disable().build();
private EmbeddedCacheManager manager;
private final Map<String, TypeOverrides> typeOverrides = new HashMap<String, TypeOverrides>();
@ -345,7 +359,7 @@ public class InfinispanRegionFactory implements RegionFactory {
@Override
public long nextTimestamp() {
return System.currentTimeMillis() / 100;
return System.currentTimeMillis();
}
public void setCacheManager(EmbeddedCacheManager manager) {
@ -374,7 +388,7 @@ public class InfinispanRegionFactory implements RegionFactory {
}
}
defineGenericDataTypeCacheConfigurations( properties );
definePendingPutsCache();
manager.defineConfiguration( PENDING_PUTS_CACHE_NAME, PENDING_PUTS_CACHE_CONFIGURATION );
}
catch (CacheException ce) {
throw ce;
@ -384,22 +398,6 @@ public class InfinispanRegionFactory implements RegionFactory {
}
}
private void definePendingPutsCache() {
final ConfigurationBuilder builder = new ConfigurationBuilder();
// A local, lightweight cache for pending puts, which is
// non-transactional and has aggressive expiration settings.
// Locking is still required since the putFromLoad validator
// code uses conditional operations (i.e. putIfAbsent).
builder.clustering().cacheMode( CacheMode.LOCAL )
.transaction().transactionMode( TransactionMode.NON_TRANSACTIONAL )
.expiration().maxIdle( TimeUnit.SECONDS.toMillis( 60 ) )
.storeAsBinary().enabled( false )
.locking().isolationLevel( IsolationLevel.READ_COMMITTED )
.jmxStatistics().disable();
manager.defineConfiguration( PENDING_PUTS_CACHE_NAME, builder.build() );
}
protected org.infinispan.transaction.lookup.TransactionManagerLookup createTransactionManagerLookup(
SessionFactoryOptions settings, Properties properties) {
return new HibernateTransactionManagerLookup( settings, properties );

View File

@ -6,25 +6,42 @@
*/
package org.hibernate.cache.infinispan.access;
import javax.transaction.Status;
import javax.transaction.SystemException;
import javax.transaction.Transaction;
import javax.transaction.TransactionManager;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.locks.ReentrantLock;
import org.hibernate.cache.CacheException;
import org.hibernate.cache.infinispan.InfinispanRegionFactory;
import org.hibernate.cache.infinispan.util.CacheCommandInitializer;
import org.hibernate.cache.infinispan.util.EndInvalidationCommand;
import org.hibernate.cache.spi.RegionFactory;
import org.infinispan.AdvancedCache;
import org.infinispan.commands.tx.CommitCommand;
import org.infinispan.commands.tx.PrepareCommand;
import org.infinispan.commands.write.InvalidateCommand;
import org.infinispan.commands.write.WriteCommand;
import org.infinispan.configuration.cache.Configuration;
import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.context.impl.TxInvocationContext;
import org.infinispan.interceptors.EntryWrappingInterceptor;
import org.infinispan.interceptors.base.BaseRpcInterceptor;
import org.infinispan.interceptors.base.CommandInterceptor;
import org.infinispan.manager.EmbeddedCacheManager;
import org.infinispan.remoting.inboundhandler.DeliverOrder;
import org.infinispan.remoting.rpc.RpcManager;
import org.infinispan.transaction.TransactionMode;
import org.infinispan.util.concurrent.ConcurrentHashSet;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;
/**
* Encapsulates logic to allow a {@link TransactionalAccessDelegate} to determine
@ -38,9 +55,9 @@ import org.infinispan.manager.EmbeddedCacheManager;
* not find data is:
* <p/>
* <ol>
* <li> Call {@link #registerPendingPut(Object)}</li>
* <li> Call {@link #registerPendingPut(Object, long)}</li>
* <li> Read the database</li>
* <li> Call {@link #acquirePutFromLoadLock(Object)}
* <li> Call {@link #acquirePutFromLoadLock(Object, long)}
* <li> if above returns <code>null</code>, the thread should not cache the data;
* only if above returns instance of <code>AcquiredLock</code>, put data in the cache and...</li>
* <li> then call {@link #releasePutFromLoadLock(Object, Lock)}</li>
@ -54,7 +71,8 @@ import org.infinispan.manager.EmbeddedCacheManager;
* <p/>
* <ul>
* <li> {@link #beginInvalidatingKey(Object)} (for a single key invalidation)</li>
* <li>or {@link #invalidateRegion()} (for a general invalidation all pending puts)</li>
* <li>or {@link #beginInvalidatingRegion()} followed by {@link #endInvalidatingRegion()}
* (for a general invalidation all pending puts)</li>
* </ul>
* After transaction commit (when the DB is updated) {@link #endInvalidatingKey(Object)} should
* be called in order to allow further attempts to cache entry.
@ -62,30 +80,31 @@ import org.infinispan.manager.EmbeddedCacheManager;
* <p/>
* <p>
* This class also supports the concept of "naked puts", which are calls to
* {@link #acquirePutFromLoadLock(Object)} without a preceding {@link #registerPendingPut(Object)}.
* Besides not acquiring lock in {@link #registerPendingPut(Object)} this can happen when collection
* {@link #acquirePutFromLoadLock(Object, long)} without a preceding {@link #registerPendingPut(Object, long)}.
* Besides not acquiring lock in {@link #registerPendingPut(Object, long)} this can happen when collection
* elements are loaded after the collection has not been found in the cache, where the elements
* don't have their own table but can be listed as 'select ... from Element where collection_id = ...'.
* Naked puts are handled according to txTimestamp obtained by calling {@link RegionFactory#nextTimestamp()}
* before the transaction is started. The timestamp is compared with timestamp of last invalidation end time
* and the write to the cache is denied if it is lower or equal.
* </p>
*
* @author Brian Stansberry
* @version $Revision: $
*/
public class PutFromLoadValidator {
/**
* Period (in ms) after a removal during which a call to
* {@link #acquirePutFromLoadLock(Object)} that hasn't been
* {@link #registerPendingPut(Object) pre-registered} (aka a "naked put")
* will return false.
*/
public static final long NAKED_PUT_INVALIDATION_PERIOD = TimeUnit.SECONDS.toMillis( 20 );
private static final Log log = LogFactory.getLog(PutFromLoadValidator.class);
private static final boolean trace = log.isTraceEnabled();
/**
* Used to determine whether the owner of a pending put is a thread or a transaction
*/
private final TransactionManager transactionManager;
private final long nakedPutInvalidationPeriod;
/**
* Period after which ongoing invalidation is removed. Value is retrieved from cache configuration.
*/
private final long expirationPeriod;
/**
* Registry of expected, future, isPutValid calls. If a key+owner is registered in this map, it
@ -94,14 +113,26 @@ public class PutFromLoadValidator {
private final ConcurrentMap<Object, PendingPutMap> pendingPuts;
/**
* The time of the last call to {@link #invalidateRegion()}, plus NAKED_PUT_INVALIDATION_PERIOD. All naked
* puts will be rejected until the current time is greater than this value.
* NOTE: update only through {@link #invalidationUpdater}!
* Main cache where the entities/collections are stored. This is not modified from within this class.
*/
private volatile long invalidationTimestamp = Long.MIN_VALUE;
private final AdvancedCache cache;
/**
* The time of the last call to {@link #endInvalidatingRegion()}. Puts from transactions started after
* this timestamp are denied.
*/
private volatile long regionInvalidationTimestamp = Long.MIN_VALUE;
/**
* Number of ongoing concurrent invalidations.
*/
private int regionInvalidations = 0;
/**
* Transactions that invalidate the region. Entries are removed during next invalidation based on transaction status.
*/
private final ConcurrentHashSet<Transaction> regionInvalidators = new ConcurrentHashSet<Transaction>();
private static final AtomicLongFieldUpdater<PutFromLoadValidator> invalidationUpdater
= AtomicLongFieldUpdater.newUpdater(PutFromLoadValidator.class, "invalidationTimestamp");
/**
* Creates a new put from load validator instance.
@ -110,23 +141,7 @@ public class PutFromLoadValidator {
* @param transactionManager Transaction manager
*/
public PutFromLoadValidator(AdvancedCache cache, TransactionManager transactionManager) {
this( cache, transactionManager, NAKED_PUT_INVALIDATION_PERIOD );
}
/**
* Constructor variant for use by unit tests; allows control of various timeouts by the test.
*
* @param cache Cache instance on which to store pending put information.
* @param transactionManager Transaction manager
* @param nakedPutInvalidationPeriod Period (in ms) after a removal during which a call to
* {@link #acquirePutFromLoadLock(Object)} that hasn't been
* {@link #registerPendingPut(Object) pre-registered} (aka a "naked put")
* will return false.
*/
public PutFromLoadValidator(
AdvancedCache cache, TransactionManager transactionManager,
long nakedPutInvalidationPeriod) {
this(cache, cache.getCacheManager(), transactionManager, nakedPutInvalidationPeriod);
this( cache, cache.getCacheManager(), transactionManager);
}
/**
@ -135,37 +150,68 @@ public class PutFromLoadValidator {
* @param cache Cache instance on which to store pending put information.
* @param cacheManager where to find a cache to store pending put information
* @param tm transaction manager
* @param nakedPutInvalidationPeriod Period (in ms) after a removal during which a call to
* {@link #acquirePutFromLoadLock(Object)} that hasn't been
* {@link #registerPendingPut(Object) pre-registered} (aka a "naked put")
* will return false.
*/
public PutFromLoadValidator(AdvancedCache cache,
EmbeddedCacheManager cacheManager,
TransactionManager tm, long nakedPutInvalidationPeriod) {
EmbeddedCacheManager cacheManager, TransactionManager tm) {
Configuration cacheConfiguration = cache.getCacheConfiguration();
Configuration pendingPutsConfiguration = cacheManager.getCacheConfiguration(InfinispanRegionFactory.PENDING_PUTS_CACHE_NAME);
ConfigurationBuilder configurationBuilder = new ConfigurationBuilder();
if (pendingPutsConfiguration != null) {
configurationBuilder.read(pendingPutsConfiguration);
}
configurationBuilder.dataContainer().keyEquivalence(cacheConfiguration.dataContainer().keyEquivalence());
String pendingPutsName = cache.getName() + "-" + InfinispanRegionFactory.PENDING_PUTS_CACHE_NAME;
cacheManager.defineConfiguration(pendingPutsName, configurationBuilder.build());
if (pendingPutsConfiguration.expiration() != null && pendingPutsConfiguration.expiration().maxIdle() > 0) {
this.expirationPeriod = pendingPutsConfiguration.expiration().maxIdle();
}
else {
throw new IllegalArgumentException("Pending puts cache needs to have maxIdle expiration set!");
}
// Since we need to intercept both invalidations of entries that are in the cache and those
// that are not, we need to use custom interceptor, not listeners (which fire only for present entries).
if (cacheConfiguration.clustering().cacheMode().isClustered()) {
RpcManager rpcManager = cache.getComponentRegistry().getComponent(RpcManager.class);
CacheCommandInitializer cacheCommandInitializer = cache.getComponentRegistry().getComponent(CacheCommandInitializer.class);
// Note that invalidation does *NOT* acquire locks; therefore, we have to start invalidating before
// wrapping the entry, since if putFromLoad was invoked between wrap and beginInvalidatingKey, the invalidation
// would not commit the entry removal (as during wrap the entry was not in cache)
cache.addInterceptorBefore(new PutFromLoadInterceptor(cache.getName(), rpcManager, cacheCommandInitializer), EntryWrappingInterceptor.class);
cacheCommandInitializer.addPutFromLoadValidator(cache.getName(), this);
}
this.cache = cache;
this.pendingPuts = cacheManager.getCache(pendingPutsName);
this.transactionManager = tm;
this.nakedPutInvalidationPeriod = nakedPutInvalidationPeriod;
}
/**
* This methods should be called only from tests; it removes existing validator from the cache structures
* in order to replace it with new one.
*
* @param cache
*/
public static void removeFromCache(AdvancedCache cache) {
List<CommandInterceptor> interceptorChain = cache.getInterceptorChain();
int index = 0;
for (; index < interceptorChain.size(); ++index) {
if (interceptorChain.get(index).getClass().getName().startsWith(PutFromLoadValidator.class.getName())) {
cache.removeInterceptor(index);
break;
}
}
CacheCommandInitializer cci = cache.getComponentRegistry().getComponent(CacheCommandInitializer.class);
cci.removePutFromLoadValidator(cache.getName());
}
// ----------------------------------------------------------------- Public
/**
* Marker for lock acquired in {@link #acquirePutFromLoadLock(Object)}
* Marker for lock acquired in {@link #acquirePutFromLoadLock(Object, long)}
*/
public static class Lock {
protected Lock() {}
public static abstract class Lock {
private Lock() {}
}
/**
@ -178,13 +224,16 @@ public class PutFromLoadValidator {
*
* @param key the key
*
* @param txTimestamp
* @return <code>AcquiredLock</code> if the lock is acquired and the cache put
* can proceed; <code>null</code> if the data should not be cached
*/
public Lock acquirePutFromLoadLock(Object key) {
public Lock acquirePutFromLoadLock(Object key, long txTimestamp) {
if (trace) {
log.tracef("acquirePutFromLoadLock(%s#%s, %d)", cache.getName(), key, txTimestamp);
}
boolean valid = false;
boolean locked = false;
long now = Long.MIN_VALUE;
PendingPutMap pending = pendingPuts.get( key );
for (;;) {
@ -197,40 +246,43 @@ public class PutFromLoadValidator {
if (toCancel != null) {
valid = !toCancel.completed;
toCancel.completed = true;
} else {
}
else {
// this is a naked put
if (pending.hasInvalidator()) {
valid = false;
} else {
if (now == Long.MIN_VALUE) {
now = System.currentTimeMillis();
}
valid = now > pending.nakedPutsDeadline;
else {
// if this transaction started after last invalidation we can continue
valid = txTimestamp > pending.lastInvalidationEnd;
}
}
return valid ? pending : null;
} finally {
}
finally {
if (!valid) {
pending.releaseLock();
locked = false;
}
if (trace) {
log.tracef("acquirePutFromLoadLock(%s#%s, %d) ended with %s", cache.getName(), key, txTimestamp, pending);
}
}
}
else {
if (trace) {
log.tracef("acquirePutFromLoadLock(%s#%s, %d) failed to lock", cache.getName(), key, txTimestamp);
}
} else {
// oops, we have leaked record for this owner, but we don't want to wait here
return null;
}
} else {
// Key wasn't in pendingPuts, so either this is a "naked put"
// or regionRemoved has been called. Check if we can proceed
long invalidationTimestamp = this.invalidationTimestamp;
if (invalidationTimestamp != Long.MIN_VALUE) {
now = System.currentTimeMillis();
if (now > invalidationTimestamp) {
// time is +- monotonic se don't let other threads do the expensive currentTimeMillis()
invalidationUpdater.compareAndSet(this, invalidationTimestamp, Long.MIN_VALUE);
} else {
return null;
}
else {
if (txTimestamp <= regionInvalidationTimestamp) {
if (trace) {
log.tracef("acquirePutFromLoadLock(%s#%s, %d) failed due to invalidated region", cache.getName(), key, txTimestamp);
}
return null;
}
PendingPut pendingPut = new PendingPut(getOwnerForPut());
@ -241,16 +293,19 @@ public class PutFromLoadValidator {
}
// continue in next loop with lock acquisition
}
} catch (Throwable t) {
}
catch (Throwable t) {
if (locked) {
pending.releaseLock();
}
if (t instanceof RuntimeException) {
throw (RuntimeException) t;
} else if (t instanceof Error) {
}
else if (t instanceof Error) {
throw (Error) t;
} else {
}
else {
throw new RuntimeException(t);
}
}
@ -259,11 +314,14 @@ public class PutFromLoadValidator {
/**
* Releases the lock previously obtained by a call to
* {@link #acquirePutFromLoadLock(Object)} that returned <code>true</code>.
* {@link #acquirePutFromLoadLock(Object, long)}.
*
* @param key the key
*/
public void releasePutFromLoadLock(Object key, Lock lock) {
if (trace) {
log.tracef("releasePutFromLoadLock(%s#%s, %s)", cache.getName(), key, lock);
}
final PendingPutMap pending = (PendingPutMap) lock;
if ( pending != null ) {
if ( pending.canRemove() ) {
@ -274,29 +332,65 @@ public class PutFromLoadValidator {
}
/**
* Invalidates all {@link #registerPendingPut(Object) previously registered pending puts} ensuring a subsequent call to
* {@link #acquirePutFromLoadLock(Object)} will return <code>false</code>. <p> This method will block until any
* concurrent thread that has {@link #acquirePutFromLoadLock(Object) acquired the putFromLoad lock} for the any key has
* Invalidates all {@link #registerPendingPut(Object, long) previously registered pending puts} ensuring a subsequent call to
* {@link #acquirePutFromLoadLock(Object, long)} will return <code>false</code>. <p> This method will block until any
* concurrent thread that has {@link #acquirePutFromLoadLock(Object, long) acquired the putFromLoad lock} for the any key has
* released the lock. This allows the caller to be certain the putFromLoad will not execute after this method returns,
* possibly caching stale data. </p>
*
* @return <code>true</code> if the invalidation was successful; <code>false</code> if a problem occured (which the
* caller should treat as an exception condition)
*/
public boolean invalidateRegion() {
// TODO: not sure what happens with locks acquired *after* calling this method but before
// the actual invalidation
public boolean beginInvalidatingRegion() {
if (trace) {
log.trace("Started invalidating region " + cache.getName());
}
boolean ok = true;
invalidationUpdater.set(this, System.currentTimeMillis() + nakedPutInvalidationPeriod);
long now = System.currentTimeMillis();
// deny all puts until endInvalidatingRegion is called; at that time the region should be already
// in INVALID state, therefore all new requests should be blocked and ongoing should fail by timestamp
synchronized (this) {
regionInvalidationTimestamp = Long.MAX_VALUE;
regionInvalidations++;
}
if (transactionManager != null) {
// cleanup old transactions
for (Iterator<Transaction> it = regionInvalidators.iterator(); it.hasNext(); ) {
Transaction tx = it.next();
try {
switch (tx.getStatus()) {
case Status.STATUS_COMMITTED:
case Status.STATUS_ROLLEDBACK:
case Status.STATUS_UNKNOWN:
case Status.STATUS_NO_TRANSACTION:
it.remove();
}
}
catch (SystemException e) {
log.error("Cannot retrieve transaction status", e);
}
}
// add this transaction
try {
Transaction tx = transactionManager.getTransaction();
if (tx != null) {
regionInvalidators.add(tx);
}
}
catch (SystemException e) {
log.error("TransactionManager failed to provide transaction", e);
return false;
}
}
try {
// Acquire the lock for each entry to ensure any ongoing
// work associated with it is completed before we return
for ( Iterator<PendingPutMap> it = pendingPuts.values().iterator(); it.hasNext(); ) {
for (Iterator<PendingPutMap> it = pendingPuts.values().iterator(); it.hasNext(); ) {
PendingPutMap entry = it.next();
if ( entry.acquireLock( 60, TimeUnit.SECONDS ) ) {
if (entry.acquireLock(60, TimeUnit.SECONDS)) {
try {
entry.invalidate();
entry.invalidate(now, expirationPeriod);
}
finally {
entry.releaseLock();
@ -307,24 +401,66 @@ public class PutFromLoadValidator {
ok = false;
}
}
} catch (Exception e) {
}
catch (Exception e) {
ok = false;
}
return ok;
}
/**
* Called when the region invalidation is finished.
*/
public void endInvalidatingRegion() {
synchronized (this) {
if (--regionInvalidations == 0) {
regionInvalidationTimestamp = System.currentTimeMillis();
}
}
if (trace) {
log.trace("Finished invalidating region " + cache.getName());
}
}
/**
* Notifies this validator that it is expected that a database read followed by a subsequent {@link
* #acquirePutFromLoadLock(Object)} call will occur. The intent is this method would be called following a cache miss
* #acquirePutFromLoadLock(Object, long)} call will occur. The intent is this method would be called following a cache miss
* wherein it is expected that a database read plus cache put will occur. Calling this method allows the validator to
* treat the subsequent <code>acquirePutFromLoadLock</code> as if the database read occurred when this method was
* invoked. This allows the validator to compare the timestamp of this call against the timestamp of subsequent removal
* notifications.
*
* @param key key that will be used for subsequent cache put
* @param txTimestamp
*/
public void registerPendingPut(Object key) {
public void registerPendingPut(Object key, long txTimestamp) {
long invalidationTimestamp = this.regionInvalidationTimestamp;
if (txTimestamp <= invalidationTimestamp) {
boolean skip;
if (invalidationTimestamp == Long.MAX_VALUE) {
// there is ongoing invalidation of pending puts
skip = true;
}
else {
Transaction tx = null;
if (transactionManager != null) {
try {
tx = transactionManager.getTransaction();
}
catch (SystemException e) {
log.error("TransactionManager failed to provide transaction", e);
}
}
skip = tx == null || !regionInvalidators.contains(tx);
}
if (skip) {
if (trace) {
log.tracef("registerPendingPut(%s#%s, %d) skipped due to region invalidation (%d)", cache.getName(), key, txTimestamp, invalidationTimestamp);
}
return;
}
}
final PendingPut pendingPut = new PendingPut( getOwnerForPut() );
final PendingPutMap pendingForKey = new PendingPutMap( pendingPut );
@ -335,21 +471,42 @@ public class PutFromLoadValidator {
if ( !existing.hasInvalidator() ) {
existing.put(pendingPut);
}
} finally {
}
finally {
existing.releaseLock();
}
if (trace) {
log.tracef("registerPendingPut(%s#%s, %d) ended with %s", cache.getName(), key, txTimestamp, existing);
}
}
else {
if (trace) {
log.tracef("registerPendingPut(%s#%s, %d) failed to acquire lock", cache.getName(), key, txTimestamp);
}
// Can't get the lock; when we come back we'll be a "naked put"
}
}
else {
if (trace) {
log.tracef("registerPendingPut(%s#%s, %d) registered using putIfAbsent: %s", cache.getName(), key, txTimestamp, pendingForKey);
}
}
}
/**
* Invalidates any {@link #registerPendingPut(Object) previously registered pending puts}
* and disables further registrations ensuring a subsequent call to {@link #acquirePutFromLoadLock(Object)}
* Calls {@link #beginInvalidatingKey(Object, Object)} with current transaction or thread.
* @param key
* @return
*/
public boolean beginInvalidatingKey(Object key) {
return beginInvalidatingKey(key, getOwnerForPut());
}
/**
* Invalidates any {@link #registerPendingPut(Object, long) previously registered pending puts}
* and disables further registrations ensuring a subsequent call to {@link #acquirePutFromLoadLock(Object, long)}
* will return <code>false</code>. <p> This method will block until any concurrent thread that has
* {@link #acquirePutFromLoadLock(Object) acquired the putFromLoad lock} for the given key
* {@link #acquirePutFromLoadLock(Object, long) acquired the putFromLoad lock} for the given key
* has released the lock. This allows the caller to be certain the putFromLoad will not execute after this method
* returns, possibly caching stale data. </p>
* After this transaction completes, {@link #endInvalidatingKey(Object)} needs to be called }
@ -359,7 +516,7 @@ public class PutFromLoadValidator {
* @return <code>true</code> if the invalidation was successful; <code>false</code> if a problem occured (which the
* caller should treat as an exception condition)
*/
public boolean beginInvalidatingKey(Object key) {
public boolean beginInvalidatingKey(Object key, Object lockOwner) {
PendingPutMap pending = new PendingPutMap(null);
PendingPutMap prev = pendingPuts.putIfAbsent(key, pending);
if (prev != null) {
@ -367,39 +524,68 @@ public class PutFromLoadValidator {
}
if (pending.acquireLock(60, TimeUnit.SECONDS)) {
try {
pending.invalidate();
pending.addInvalidator(getOwnerForPut(), System.currentTimeMillis() + nakedPutInvalidationPeriod);
} finally {
long now = System.currentTimeMillis();
pending.invalidate(now, expirationPeriod);
pending.addInvalidator(lockOwner, now, expirationPeriod);
}
finally {
pending.releaseLock();
}
if (trace) {
log.tracef("beginInvalidatingKey(%s#%s, %s) ends with %s", cache.getName(), key, lockOwner, pending);
}
return true;
} else {
}
else {
log.tracef("beginInvalidatingKey(%s#%s, %s) failed to acquire lock", cache.getName(), key);
return false;
}
}
/**
* Called after the transaction completes, allowing caching of entries. It is possible that this method
* is called without previous invocation of {@link #beginInvalidatingKey(Object)}, then it should be noop.
*
* Calls {@link #endInvalidatingKey(Object, Object)} with current transaction or thread.
* @param key
* @return
*/
public boolean endInvalidatingKey(Object key) {
return endInvalidatingKey(key, getOwnerForPut());
}
/**
* Called after the transaction completes, allowing caching of entries. It is possible that this method
* is called without previous invocation of {@link #beginInvalidatingKey(Object)}, then it should be a no-op.
*
* @param key
* @param lockOwner owner of the invalidation - transaction or thread
* @return
*/
public boolean endInvalidatingKey(Object key, Object lockOwner) {
PendingPutMap pending = pendingPuts.get(key);
if (pending == null) {
if (trace) {
log.tracef("endInvalidatingKey(%s#%s, %s) could not find pending puts", cache.getName(), key, lockOwner);
}
return true;
}
if (pending.acquireLock(60, TimeUnit.SECONDS)) {
try {
pending.removeInvalidator(getOwnerForPut());
long now = System.currentTimeMillis();
pending.removeInvalidator(lockOwner, now);
// we can't remove the pending put yet because we wait for naked puts
// pendingPuts should be configured with maxIdle time so won't have memory leak
return true;
} finally {
pending.releaseLock();
}
} else {
finally {
pending.releaseLock();
if (trace) {
log.tracef("endInvalidatingKey(%s#%s, %s) ends with %s", cache.getName(), key, lockOwner, pending);
}
}
}
else {
if (trace) {
log.tracef("endInvalidatingKey(%s#%s, %s) failed to acquire lock", cache.getName(), key, lockOwner);
}
return false;
}
}
@ -431,14 +617,61 @@ public class PutFromLoadValidator {
private PendingPut singlePendingPut;
private Map<Object, PendingPut> fullMap;
private final java.util.concurrent.locks.Lock lock = new ReentrantLock();
private Object singleInvalidator;
private Set<Object> invalidators;
private long nakedPutsDeadline = Long.MIN_VALUE;
private Invalidator singleInvalidator;
private Map<Object, Invalidator> invalidators;
private long lastInvalidationEnd = Long.MIN_VALUE;
PendingPutMap(PendingPut singleItem) {
this.singlePendingPut = singleItem;
}
// toString should be called only for debugging purposes
public String toString() {
if (lock.tryLock()) {
try {
StringBuilder sb = new StringBuilder();
sb.append("{ PendingPuts=");
if (singlePendingPut == null) {
if (fullMap == null) {
sb.append("[]");
}
else {
sb.append(fullMap.values());
}
}
else {
sb.append('[').append(singlePendingPut).append(']');
}
sb.append(", Invalidators=");
if (singleInvalidator == null) {
if (invalidators == null) {
sb.append("[]");
}
else {
sb.append(invalidators);
}
}
else {
sb.append('[').append(singleInvalidator).append(']');
}
sb.append(", LastInvalidationEnd=");
if (lastInvalidationEnd == Long.MIN_VALUE) {
sb.append("<none>");
}
else {
sb.append(lastInvalidationEnd);
}
return sb.append("}").toString();
}
finally {
lock.unlock();
}
}
else {
return "PendingPutMap: <locked>";
}
}
public void put(PendingPut pendingPut) {
if ( singlePendingPut == null ) {
if ( fullMap == null ) {
@ -492,63 +725,167 @@ public class PutFromLoadValidator {
lock.unlock();
}
public void invalidate() {
public void invalidate(long now, long expirationPeriod) {
if ( singlePendingPut != null ) {
singlePendingPut.completed = true;
// Nullify to avoid leaking completed pending puts
if (singlePendingPut.invalidate(now, expirationPeriod)) {
singlePendingPut = null;
}
else if ( fullMap != null ) {
for ( PendingPut pp : fullMap.values() ) {
pp.completed = true;
}
// Nullify to avoid leaking completed pending puts
fullMap = null;
else if ( fullMap != null ) {
for ( Iterator<PendingPut> it = fullMap.values().iterator(); it.hasNext(); ) {
PendingPut pp = it.next();
if (pp.invalidate(now, expirationPeriod)) {
it.remove();
}
}
}
}
public void addInvalidator(Object invalidator, long deadline) {
public void addInvalidator(Object owner, long now, long invalidatorTimeout) {
assert owner != null;
if (invalidators == null) {
if (singleInvalidator == null) {
singleInvalidator = invalidator;
} else {
invalidators = new HashSet<Object>();
invalidators.add(singleInvalidator);
invalidators.add(invalidator);
singleInvalidator = new Invalidator(owner, now);
}
else {
if (singleInvalidator.registeredTimestamp + invalidatorTimeout < now) {
// remove leaked invalidator
singleInvalidator = new Invalidator(owner, now);
}
invalidators = new HashMap<Object, Invalidator>();
invalidators.put(singleInvalidator.owner, singleInvalidator);
invalidators.put(owner, new Invalidator(owner, now));
singleInvalidator = null;
}
} else {
invalidators.add(invalidator);
}
nakedPutsDeadline = Math.max(nakedPutsDeadline, deadline);
else {
long allowedRegistration = now - invalidatorTimeout;
// remove leaked invalidators
for (Iterator<Invalidator> it = invalidators.values().iterator(); it.hasNext(); ) {
if (it.next().registeredTimestamp < allowedRegistration) {
it.remove();
}
}
invalidators.put(owner, new Invalidator(owner, now));
}
}
public boolean hasInvalidator() {
return singleInvalidator != null || (invalidators != null && !invalidators.isEmpty());
}
public void removeInvalidator(Object invalidator) {
public void removeInvalidator(Object owner, long now) {
if (invalidators == null) {
if (singleInvalidator != null && singleInvalidator.equals(invalidator)) {
if (singleInvalidator != null && singleInvalidator.owner.equals(owner)) {
singleInvalidator = null;
}
} else {
invalidators.remove(invalidator);
}
else {
invalidators.remove(owner);
}
lastInvalidationEnd = Math.max(lastInvalidationEnd, now);
}
public boolean canRemove() {
return size() == 0 && !hasInvalidator() &&
(nakedPutsDeadline == Long.MIN_VALUE || nakedPutsDeadline < System.currentTimeMillis());
return size() == 0 && !hasInvalidator() && lastInvalidationEnd == Long.MIN_VALUE;
}
}
private static class PendingPut {
private final Object owner;
private volatile boolean completed;
private boolean completed;
// the timestamp is not filled during registration in order to avoid expensive currentTimeMillis() calls
private long registeredTimestamp = Long.MIN_VALUE;
private PendingPut(Object owner) {
this.owner = owner;
}
public String toString() {
return (completed ? "C@" : "R@") + owner;
}
public boolean invalidate(long now, long expirationPeriod) {
completed = true;
if (registeredTimestamp == Long.MIN_VALUE) {
registeredTimestamp = now;
}
else if (registeredTimestamp + expirationPeriod < now){
return true; // this is a leaked pending put
}
return false;
}
}
private static class Invalidator {
private final Object owner;
private final long registeredTimestamp;
private Invalidator(Object owner, long registeredTimestamp) {
this.owner = owner;
this.registeredTimestamp = registeredTimestamp;
}
@Override
public String toString() {
final StringBuilder sb = new StringBuilder("{");
sb.append("Owner=").append(owner);
sb.append(", Timestamp=").append(registeredTimestamp);
sb.append('}');
return sb.toString();
}
}
private class PutFromLoadInterceptor extends BaseRpcInterceptor {
private final String cacheName;
private final RpcManager rpcManager;
private final CacheCommandInitializer cacheCommandInitializer;
public PutFromLoadInterceptor(String cacheName, RpcManager rpcManager, CacheCommandInitializer cacheCommandInitializer) {
this.cacheName = cacheName;
this.rpcManager = rpcManager;
this.cacheCommandInitializer = cacheCommandInitializer;
}
// We need to intercept PrepareCommand, not InvalidateCommand since the interception takes
// place before EntryWrappingInterceptor and the PrepareCommand is multiplexed into InvalidateCommands
// as part of EntryWrappingInterceptor
@Override
public Object visitPrepareCommand(TxInvocationContext ctx, PrepareCommand command) throws Throwable {
if (!ctx.isOriginLocal()) {
for (WriteCommand wc : command.getModifications()) {
if (wc instanceof InvalidateCommand) {
// InvalidateCommand does not correctly implement getAffectedKeys()
for (Object key : ((InvalidateCommand) wc).getKeys()) {
beginInvalidatingKey(key, ctx.getLockOwner());
}
}
else {
for (Object key : wc.getAffectedKeys()) {
beginInvalidatingKey(key, ctx.getLockOwner());
}
}
}
}
return invokeNextInterceptor(ctx, command);
}
@Override
public Object visitCommitCommand(TxInvocationContext ctx, CommitCommand command) throws Throwable {
try {
if (ctx.isOriginLocal()) {
// send async Commit
Set<Object> affectedKeys = ctx.getAffectedKeys();
if (!affectedKeys.isEmpty()) {
EndInvalidationCommand commitCommand = cacheCommandInitializer.buildEndInvalidationCommand(
cacheName, affectedKeys.toArray(), ctx.getGlobalTransaction());
rpcManager.invokeRemotely(null, commitCommand, rpcManager.getDefaultRpcOptions(false, DeliverOrder.NONE));
}
}
}
finally {
return invokeNextInterceptor(ctx, command);
}
}
}
}

View File

@ -61,7 +61,7 @@ public class TransactionalAccessDelegate {
}
final Object val = cache.get( key );
if ( val == null ) {
putValidator.registerPendingPut( key );
putValidator.registerPendingPut( key, txTimestamp );
}
return val;
}
@ -110,7 +110,7 @@ public class TransactionalAccessDelegate {
return false;
}
PutFromLoadValidator.Lock lock = putValidator.acquirePutFromLoadLock(key);
PutFromLoadValidator.Lock lock = putValidator.acquirePutFromLoadLock(key, txTimestamp);
if ( lock == null) {
if ( TRACE_ENABLED ) {
log.tracef( "Put from load lock not acquired for key %s", key );
@ -202,10 +202,15 @@ public class TransactionalAccessDelegate {
* @throws CacheException if eviction the region fails
*/
public void removeAll() throws CacheException {
if ( !putValidator.invalidateRegion() ) {
throw new CacheException( "Failed to invalidate pending putFromLoad calls for region " + region.getName() );
try {
if (!putValidator.beginInvalidatingRegion()) {
throw new CacheException("Failed to invalidate pending putFromLoad calls for region " + region.getName());
}
Caches.removeAll(cache);
}
finally {
putValidator.endInvalidatingRegion();
}
Caches.removeAll( cache );
}
/**
@ -231,13 +236,18 @@ public class TransactionalAccessDelegate {
* @throws CacheException if evicting items fails
*/
public void evictAll() throws CacheException {
if ( !putValidator.invalidateRegion() ) {
throw new CacheException( "Failed to invalidate pending putFromLoad calls for region " + region.getName() );
try {
if (!putValidator.beginInvalidatingRegion()) {
throw new CacheException("Failed to invalidate pending putFromLoad calls for region " + region.getName());
}
// Invalidate the local region and then go remote
region.invalidateRegion();
Caches.broadcastEvictAll( cache );
Caches.broadcastEvictAll(cache);
}
finally {
putValidator.endInvalidatingRegion();
}
}
/**

View File

@ -58,6 +58,7 @@ public class CacheCommandFactory implements ExtendedModuleCommandFactory {
public Map<Byte, Class<? extends ReplicableCommand>> getModuleCommands() {
final Map<Byte, Class<? extends ReplicableCommand>> map = new HashMap<Byte, Class<? extends ReplicableCommand>>( 3 );
map.put( CacheCommandIds.EVICT_ALL, EvictAllCommand.class );
map.put( CacheCommandIds.END_INVALIDATION, EndInvalidationCommand.class );
return map;
}
@ -68,6 +69,9 @@ public class CacheCommandFactory implements ExtendedModuleCommandFactory {
case CacheCommandIds.EVICT_ALL:
c = new EvictAllCommand( cacheName, allRegions.get( cacheName ) );
break;
case CacheCommandIds.END_INVALIDATION:
c = new EndInvalidationCommand(cacheName);
break;
default:
throw new IllegalArgumentException( "Not registered to handle command id " + commandId );
}

View File

@ -14,7 +14,12 @@ package org.hibernate.cache.infinispan.util;
*/
public interface CacheCommandIds {
/**
* The "evict all" command id
* {@link EvictAllCommand} id
*/
public static final byte EVICT_ALL = 120;
/**
* {@link EndInvalidationCommand} id
*/
public static final byte END_INVALIDATION = 121;
}

View File

@ -6,9 +6,12 @@
*/
package org.hibernate.cache.infinispan.util;
import org.hibernate.cache.infinispan.access.PutFromLoadValidator;
import org.infinispan.commands.ReplicableCommand;
import org.infinispan.commands.module.ModuleCommandInitializer;
import java.util.concurrent.ConcurrentHashMap;
/**
* Command initializer
*
@ -17,6 +20,21 @@ import org.infinispan.commands.module.ModuleCommandInitializer;
*/
public class CacheCommandInitializer implements ModuleCommandInitializer {
private final ConcurrentHashMap<String, PutFromLoadValidator> putFromLoadValidators
= new ConcurrentHashMap<String, PutFromLoadValidator>();
public void addPutFromLoadValidator(String cacheName, PutFromLoadValidator putFromLoadValidator) {
// there could be two instances of PutFromLoadValidator bound to the same cache when
// there are two JndiInfinispanRegionFactories bound to the same cacheManager via JNDI.
// In that case, as putFromLoadValidator does not really own the pendingPuts cache,
// it's safe to have more instances.
putFromLoadValidators.put(cacheName, putFromLoadValidator);
}
public PutFromLoadValidator removePutFromLoadValidator(String cacheName) {
return putFromLoadValidators.remove(cacheName);
}
/**
* Build an instance of {@link EvictAllCommand} for a given region.
*
@ -31,9 +49,17 @@ public class CacheCommandInitializer implements ModuleCommandInitializer {
return new EvictAllCommand( regionName );
}
@Override
public void initializeReplicableCommand(ReplicableCommand c, boolean isRemote) {
// No need to initialize...
public EndInvalidationCommand buildEndInvalidationCommand(String cacheName, Object[] keys, Object lockOwner) {
return new EndInvalidationCommand( cacheName, keys, lockOwner );
}
@Override
public void initializeReplicableCommand(ReplicableCommand c, boolean isRemote) {
switch (c.getCommandId()) {
case CacheCommandIds.END_INVALIDATION:
EndInvalidationCommand endInvalidationCommand = (EndInvalidationCommand) c;
endInvalidationCommand.setPutFromLoadValidator(putFromLoadValidators.get(endInvalidationCommand.getCacheName()));
break;
}
}
}

View File

@ -0,0 +1,86 @@
/*
* Hibernate, Relational Persistence for Idiomatic Java
*
* License: GNU Lesser General Public License (LGPL), version 2.1 or later.
* See the lgpl.txt file in the root directory or <http://www.gnu.org/licenses/lgpl-2.1.html>.
*/
package org.hibernate.cache.infinispan.util;
import org.hibernate.cache.infinispan.access.PutFromLoadValidator;
import org.infinispan.commands.remote.BaseRpcCommand;
import org.infinispan.context.InvocationContext;
import java.util.Arrays;
/**
* Sent in commit phase (after DB commit) to remote nodes in order to stop invalidating
* putFromLoads.
*
* @author Radim Vansa &lt;rvansa@redhat.com&gt;
*/
public class EndInvalidationCommand extends BaseRpcCommand {
private Object[] keys;
private Object lockOwner;
private PutFromLoadValidator putFromLoadValidator;
public EndInvalidationCommand(String cacheName) {
this(cacheName, null, null);
}
/**
* @param cacheName name of the cache to evict
*/
public EndInvalidationCommand(String cacheName, Object[] keys, Object lockOwner) {
super(cacheName);
this.keys = keys;
this.lockOwner = lockOwner;
}
@Override
public Object perform(InvocationContext ctx) throws Throwable {
for (Object key : keys) {
putFromLoadValidator.endInvalidatingKey(key, lockOwner);
}
return null;
}
@Override
public byte getCommandId() {
return CacheCommandIds.END_INVALIDATION;
}
@Override
public Object[] getParameters() {
return new Object[] { keys, lockOwner };
}
@Override
public void setParameters(int commandId, Object[] parameters) {
keys = (Object[]) parameters[0];
lockOwner = parameters[1];
}
@Override
public boolean isReturnValueExpected() {
return false;
}
@Override
public boolean canBlock() {
return true;
}
public void setPutFromLoadValidator(PutFromLoadValidator putFromLoadValidator) {
this.putFromLoadValidator = putFromLoadValidator;
}
@Override
public String toString() {
final StringBuilder sb = new StringBuilder("EndInvalidationCommand{");
sb.append("cacheName=").append(cacheName);
sb.append(", keys=").append(Arrays.toString(keys));
sb.append(", lockOwner=").append(lockOwner);
sb.append('}');
return sb.toString();
}
}

View File

@ -17,9 +17,11 @@ import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.hibernate.cache.infinispan.InfinispanRegionFactory;
import org.hibernate.cache.infinispan.access.PutFromLoadValidator;
import org.hibernate.test.cache.infinispan.functional.cluster.DualNodeJtaTransactionManagerImpl;
import org.hibernate.test.cache.infinispan.util.InfinispanTestingSetup;
import org.infinispan.manager.EmbeddedCacheManager;
import org.infinispan.test.CacheManagerCallable;
import org.infinispan.test.fwk.TestCacheManagerFactory;
import org.infinispan.util.logging.Log;
@ -72,6 +74,13 @@ public class PutFromLoadValidatorUnitTestCase {
}
}
private static EmbeddedCacheManager createCacheManager() {
EmbeddedCacheManager cacheManager = TestCacheManagerFactory.createCacheManager(false);
cacheManager.defineConfiguration(InfinispanRegionFactory.PENDING_PUTS_CACHE_NAME,
InfinispanRegionFactory.PENDING_PUTS_CACHE_CONFIGURATION);
return cacheManager;
}
@Test
public void testNakedPut() throws Exception {
nakedPutTest(false);
@ -82,12 +91,11 @@ public class PutFromLoadValidatorUnitTestCase {
}
private void nakedPutTest(final boolean transactional) throws Exception {
withCacheManager(new CacheManagerCallable(
TestCacheManagerFactory.createCacheManager(false)) {
withCacheManager(new CacheManagerCallable(createCacheManager()) {
@Override
public void call() {
PutFromLoadValidator testee = new PutFromLoadValidator(cm.getCache().getAdvancedCache(), cm,
transactional ? tm : null, PutFromLoadValidator.NAKED_PUT_INVALIDATION_PERIOD);
transactional ? tm : null);
exec(transactional, new NakedPut(testee, true));
}
});
@ -103,12 +111,11 @@ public class PutFromLoadValidatorUnitTestCase {
}
private void registeredPutTest(final boolean transactional) throws Exception {
withCacheManager(new CacheManagerCallable(
TestCacheManagerFactory.createCacheManager(false)) {
withCacheManager(new CacheManagerCallable(createCacheManager()) {
@Override
public void call() {
PutFromLoadValidator testee = new PutFromLoadValidator(cm.getCache().getAdvancedCache(), cm,
transactional ? tm : null, PutFromLoadValidator.NAKED_PUT_INVALIDATION_PERIOD);
transactional ? tm : null);
exec(transactional, new RegularPut(testee));
}
});
@ -133,14 +140,14 @@ public class PutFromLoadValidatorUnitTestCase {
private void nakedPutAfterRemovalTest(final boolean transactional,
final boolean removeRegion) throws Exception {
withCacheManager(new CacheManagerCallable(
TestCacheManagerFactory.createCacheManager(false)) {
withCacheManager(new CacheManagerCallable(createCacheManager()) {
@Override
public void call() {
PutFromLoadValidator testee = new PutFromLoadValidator(cm.getCache().getAdvancedCache(), cm,
transactional ? tm : null, PutFromLoadValidator.NAKED_PUT_INVALIDATION_PERIOD);
transactional ? tm : null);
Invalidation invalidation = new Invalidation(testee, removeRegion);
NakedPut nakedPut = new NakedPut(testee, false);
// the naked put can succeed because it has txTimestamp after invalidation
NakedPut nakedPut = new NakedPut(testee, true);
exec(transactional, invalidation, nakedPut);
}
});
@ -166,12 +173,11 @@ public class PutFromLoadValidatorUnitTestCase {
private void registeredPutAfterRemovalTest(final boolean transactional,
final boolean removeRegion) throws Exception {
withCacheManager(new CacheManagerCallable(
TestCacheManagerFactory.createCacheManager(false)) {
withCacheManager(new CacheManagerCallable(createCacheManager()) {
@Override
public void call() {
PutFromLoadValidator testee = new PutFromLoadValidator(cm.getCache().getAdvancedCache(), cm,
transactional ? tm : null, PutFromLoadValidator.NAKED_PUT_INVALIDATION_PERIOD);
transactional ? tm : null);
Invalidation invalidation = new Invalidation(testee, removeRegion);
RegularPut regularPut = new RegularPut(testee);
exec(transactional, invalidation, regularPut);
@ -199,24 +205,24 @@ public class PutFromLoadValidatorUnitTestCase {
private void registeredPutWithInterveningRemovalTest(
final boolean transactional, final boolean removeRegion)
throws Exception {
withCacheManager(new CacheManagerCallable(
TestCacheManagerFactory.createCacheManager(false)) {
withCacheManager(new CacheManagerCallable(createCacheManager()) {
@Override
public void call() {
PutFromLoadValidator testee = new PutFromLoadValidator(cm.getCache().getAdvancedCache(), cm,
transactional ? tm : null, PutFromLoadValidator.NAKED_PUT_INVALIDATION_PERIOD);
transactional ? tm : null);
try {
long txTimestamp = System.currentTimeMillis();
if (transactional) {
tm.begin();
}
testee.registerPendingPut(KEY1);
testee.registerPendingPut(KEY1, txTimestamp);
if (removeRegion) {
testee.invalidateRegion();
testee.beginInvalidatingRegion();
} else {
testee.beginInvalidatingKey(KEY1);
}
PutFromLoadValidator.Lock lock = testee.acquirePutFromLoadLock(KEY1);
PutFromLoadValidator.Lock lock = testee.acquirePutFromLoadLock(KEY1, txTimestamp);
try {
assertNull(lock);
}
@ -224,60 +230,11 @@ public class PutFromLoadValidatorUnitTestCase {
if (lock != null) {
testee.releasePutFromLoadLock(KEY1, lock);
}
testee.endInvalidatingKey(KEY1);
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
});
}
@Test
public void testDelayedNakedPutAfterKeyRemoval() throws Exception {
delayedNakedPutAfterRemovalTest(false, false);
}
@Test
public void testDelayedNakedPutAfterKeyRemovalTransactional() throws Exception {
delayedNakedPutAfterRemovalTest(true, false);
}
@Test
public void testDelayedNakedPutAfterRegionRemoval() throws Exception {
delayedNakedPutAfterRemovalTest(false, true);
}
@Test
public void testDelayedNakedPutAfterRegionRemovalTransactional() throws Exception {
delayedNakedPutAfterRemovalTest(true, true);
}
private void delayedNakedPutAfterRemovalTest(
final boolean transactional, final boolean removeRegion)
throws Exception {
withCacheManager(new CacheManagerCallable(
TestCacheManagerFactory.createCacheManager(false)) {
@Override
public void call() {
PutFromLoadValidator testee = new PutFromLoadValidator(cm.getCache().getAdvancedCache(), cm,
transactional ? tm : null, 100);
if (removeRegion) {
testee.invalidateRegion();
testee.endInvalidatingRegion();
} else {
testee.beginInvalidatingKey(KEY1);
testee.endInvalidatingKey(KEY1);
}
try {
if (transactional) {
tm.begin();
}
Thread.sleep(110);
PutFromLoadValidator.Lock lock = testee.acquirePutFromLoadLock(KEY1);
try {
assertNotNull(lock);
} finally {
if (lock != null) {
testee.releasePutFromLoadLock(KEY1, null);
}
}
} catch (Exception e) {
throw new RuntimeException(e);
@ -297,13 +254,11 @@ public class PutFromLoadValidatorUnitTestCase {
}
private void multipleRegistrationtest(final boolean transactional) throws Exception {
withCacheManager(new CacheManagerCallable(
TestCacheManagerFactory.createCacheManager(false)) {
withCacheManager(new CacheManagerCallable(createCacheManager()) {
@Override
public void call() {
final PutFromLoadValidator testee = new PutFromLoadValidator(cm.getCache().getAdvancedCache(), cm,
transactional ? tm : null,
PutFromLoadValidator.NAKED_PUT_INVALIDATION_PERIOD);
transactional ? tm : null);
final CountDownLatch registeredLatch = new CountDownLatch(3);
final CountDownLatch finishedLatch = new CountDownLatch(3);
@ -312,13 +267,14 @@ public class PutFromLoadValidatorUnitTestCase {
Runnable r = new Runnable() {
public void run() {
try {
long txTimestamp = System.currentTimeMillis();
if (transactional) {
tm.begin();
}
testee.registerPendingPut(KEY1);
testee.registerPendingPut(KEY1, txTimestamp);
registeredLatch.countDown();
registeredLatch.await(5, TimeUnit.SECONDS);
PutFromLoadValidator.Lock lock = testee.acquirePutFromLoadLock(KEY1);
PutFromLoadValidator.Lock lock = testee.acquirePutFromLoadLock(KEY1, txTimestamp);
if (lock != null) {
try {
log.trace("Put from load lock acquired for key = " + KEY1);
@ -341,7 +297,13 @@ public class PutFromLoadValidatorUnitTestCase {
// Start with a removal so the "isPutValid" calls will fail if
// any of the concurrent activity isn't handled properly
testee.invalidateRegion();
testee.beginInvalidatingRegion();
testee.endInvalidatingRegion();
try {
Thread.sleep(10);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
// Do the registration + isPutValid calls
executor.execute(r);
@ -370,20 +332,20 @@ public class PutFromLoadValidatorUnitTestCase {
}
private void invalidationBlocksForInProgressPutTest(final boolean keyOnly) throws Exception {
withCacheManager(new CacheManagerCallable(
TestCacheManagerFactory.createCacheManager(false)) {
withCacheManager(new CacheManagerCallable(createCacheManager()) {
@Override
public void call() {
final PutFromLoadValidator testee = new PutFromLoadValidator(cm.getCache().getAdvancedCache(),
cm, null, PutFromLoadValidator.NAKED_PUT_INVALIDATION_PERIOD);
cm, null);
final CountDownLatch removeLatch = new CountDownLatch(1);
final CountDownLatch pferLatch = new CountDownLatch(1);
final AtomicReference<Object> cache = new AtomicReference<Object>("INITIAL");
Callable<Boolean> pferCallable = new Callable<Boolean>() {
public Boolean call() throws Exception {
testee.registerPendingPut(KEY1);
PutFromLoadValidator.Lock lock = testee.acquirePutFromLoadLock(KEY1);
long txTimestamp = System.currentTimeMillis();
testee.registerPendingPut(KEY1, txTimestamp);
PutFromLoadValidator.Lock lock = testee.acquirePutFromLoadLock(KEY1, txTimestamp);
if (lock != null) {
try {
removeLatch.countDown();
@ -405,7 +367,7 @@ public class PutFromLoadValidatorUnitTestCase {
if (keyOnly) {
testee.beginInvalidatingKey(KEY1);
} else {
testee.invalidateRegion();
testee.beginInvalidatingRegion();
}
cache.set(null);
return null;
@ -466,14 +428,18 @@ public class PutFromLoadValidatorUnitTestCase {
@Override
public Void call() throws Exception {
if (removeRegion) {
boolean success = putFromLoadValidator.invalidateRegion();
boolean success = putFromLoadValidator.beginInvalidatingRegion();
assertTrue(success);
putFromLoadValidator.endInvalidatingRegion();;
} else {
boolean success = putFromLoadValidator.beginInvalidatingKey(KEY1);
assertTrue(success);
success = putFromLoadValidator.endInvalidatingKey(KEY1);
assertTrue(success);
}
// if we go for the timestamp-based approach, invalidation in the same millisecond
// as the registerPendingPut/acquirePutFromLoad lock results in failure.
Thread.sleep(10);
return null;
}
}
@ -488,9 +454,10 @@ public class PutFromLoadValidatorUnitTestCase {
@Override
public Void call() throws Exception {
try {
putFromLoadValidator.registerPendingPut(KEY1);
long txTimestamp = System.currentTimeMillis(); // this should be acquired before UserTransaction.begin()
putFromLoadValidator.registerPendingPut(KEY1, txTimestamp);
PutFromLoadValidator.Lock lock = putFromLoadValidator.acquirePutFromLoadLock(KEY1);
PutFromLoadValidator.Lock lock = putFromLoadValidator.acquirePutFromLoadLock(KEY1, txTimestamp);
try {
assertNotNull(lock);
} finally {
@ -517,7 +484,8 @@ public class PutFromLoadValidatorUnitTestCase {
@Override
public Void call() throws Exception {
try {
PutFromLoadValidator.Lock lock = testee.acquirePutFromLoadLock(KEY1);
long txTimestamp = System.currentTimeMillis(); // this should be acquired before UserTransaction.begin()
PutFromLoadValidator.Lock lock = testee.acquirePutFromLoadLock(KEY1, txTimestamp);
try {
if (expectSuccess) {
assertNotNull(lock);

View File

@ -7,6 +7,7 @@
package org.hibernate.test.cache.infinispan.collection;
import javax.transaction.TransactionManager;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
@ -31,6 +32,8 @@ import org.hibernate.test.cache.infinispan.NodeEnvironment;
import org.hibernate.test.cache.infinispan.util.CacheTestUtil;
import org.hibernate.test.cache.infinispan.util.InfinispanTestingSetup;
import org.hibernate.test.cache.infinispan.util.TestingKeyFactory;
import org.infinispan.AdvancedCache;
import org.infinispan.manager.EmbeddedCacheManager;
import org.infinispan.test.CacheManagerCallable;
import org.infinispan.test.fwk.TestCacheManagerFactory;
import org.infinispan.transaction.tm.BatchModeTransactionManager;
@ -155,29 +158,10 @@ public abstract class AbstractCollectionRegionAccessStrategyTestCase extends Abs
final CountDownLatch pferLatch = new CountDownLatch( 1 );
final CountDownLatch removeLatch = new CountDownLatch( 1 );
final TransactionManager remoteTm = remoteCollectionRegion.getTransactionManager();
withCacheManager(new CacheManagerCallable(TestCacheManagerFactory.createCacheManager(false)) {
withCacheManager(new CacheManagerCallable(createCacheManager()) {
@Override
public void call() {
PutFromLoadValidator validator = new PutFromLoadValidator(remoteCollectionRegion.getCache(), cm,
remoteTm, 20000) {
@Override
public Lock acquirePutFromLoadLock(Object key) {
Lock lock = super.acquirePutFromLoadLock( key );
try {
removeLatch.countDown();
pferLatch.await( 2, TimeUnit.SECONDS );
}
catch (InterruptedException e) {
log.debug( "Interrupted" );
Thread.currentThread().interrupt();
}
catch (Exception e) {
log.error( "Error", e );
throw new RuntimeException( "Error", e );
}
return lock;
}
};
PutFromLoadValidator validator = getPutFromLoadValidator(remoteCollectionRegion.getCache(), cm, remoteTm, removeLatch, pferLatch);
final TransactionalAccessDelegate delegate =
new TransactionalAccessDelegate(localCollectionRegion, validator);
@ -221,6 +205,39 @@ public abstract class AbstractCollectionRegionAccessStrategyTestCase extends Abs
});
}
private static EmbeddedCacheManager createCacheManager() {
EmbeddedCacheManager cacheManager = TestCacheManagerFactory.createCacheManager(false);
cacheManager.defineConfiguration(InfinispanRegionFactory.PENDING_PUTS_CACHE_NAME,
InfinispanRegionFactory.PENDING_PUTS_CACHE_CONFIGURATION);
return cacheManager;
}
protected PutFromLoadValidator getPutFromLoadValidator(AdvancedCache cache, EmbeddedCacheManager cm,
TransactionManager tm,
CountDownLatch removeLatch, CountDownLatch pferLatch) {
// remove the interceptor inserted by default PutFromLoadValidator, we're using different one
PutFromLoadValidator.removeFromCache(cache);
return new PutFromLoadValidator(cache, cm, tm) {
@Override
public Lock acquirePutFromLoadLock(Object key, long txTimestamp) {
Lock lock = super.acquirePutFromLoadLock( key, txTimestamp);
try {
removeLatch.countDown();
pferLatch.await( 2, TimeUnit.SECONDS );
}
catch (InterruptedException e) {
log.debug( "Interrupted" );
Thread.currentThread().interrupt();
}
catch (Exception e) {
log.error( "Error", e );
throw new RuntimeException( "Error", e );
}
return lock;
}
};
}
@Test
public void testPutFromLoad() throws Exception {
putFromLoadTest( false );
@ -455,6 +472,9 @@ public abstract class AbstractCollectionRegionAccessStrategyTestCase extends Abs
assertEquals( 0, remoteCollectionRegion.getCache().size() );
// Wait for async propagation of EndInvalidationCommand
sleep( 250 );
// Test whether the get above messes up the optimistic version
remoteAccessStrategy.putFromLoad(null, KEY, VALUE1, System.currentTimeMillis(), new Integer( 1 ) );
assertEquals( VALUE1, remoteAccessStrategy.get(null, KEY, System.currentTimeMillis() ) );

View File

@ -289,8 +289,8 @@ public abstract class AbstractEntityRegionAccessStrategyTestCase extends Abstrac
assertEquals("Correct node1 value", VALUE2, localAccessStrategy.get(null, KEY, txTimestamp));
if (isUsingInvalidation()) {
// no data version to prevent the PFER; we count on db locks preventing this
assertEquals("Expected node2 value", VALUE1, remoteAccessStrategy.get(null, KEY, txTimestamp));
// invalidation command invalidates pending put
assertEquals("Expected node2 value", null, remoteAccessStrategy.get(null, KEY, txTimestamp));
} else {
// The node1 update is replicated, preventing the node2 PFER
assertEquals("Correct node2 value", VALUE2, remoteAccessStrategy.get(null, KEY, txTimestamp));
@ -571,9 +571,12 @@ public abstract class AbstractEntityRegionAccessStrategyTestCase extends Abstrac
// Re-establishing the region root on the local node doesn't
// propagate it to other nodes. Do a get on the remote node to re-establish
assertEquals(null, remoteAccessStrategy.get(null, KEY, System.currentTimeMillis()));
assertNull(remoteAccessStrategy.get(null, KEY, System.currentTimeMillis()));
assertEquals(0, remoteEntityRegion.getCache().size());
// Wait for async propagation of EndInvalidationCommand before executing naked put
sleep(250);
// Test whether the get above messes up the optimistic version
remoteAccessStrategy.putFromLoad(null, KEY, VALUE1, System.currentTimeMillis(), new Integer(1));
assertEquals(VALUE1, remoteAccessStrategy.get(null, KEY, System.currentTimeMillis()));

View File

@ -117,6 +117,7 @@ public abstract class AbstractFunctionalTestCase extends SingleNodeTestCase {
log.info("Entry persisted, let's load and delete it.");
cleanupCache();
Thread.sleep(10);
withTx(tm, new Callable<Void>() {
@Override

View File

@ -861,6 +861,7 @@ public class BasicTransactionalTestCase extends AbstractFunctionalTestCase {
// Clear the cache before the transaction begins
BasicTransactionalTestCase.this.cleanupCache();
Thread.sleep(10);
withTx(tm, new Callable<Void>() {
@Override
@ -951,6 +952,7 @@ public class BasicTransactionalTestCase extends AbstractFunctionalTestCase {
// TODO: Clear caches manually via cache manager (it's faster!!)
this.cleanupCache();
Thread.sleep(10);
stats.setStatisticsEnabled( true );
stats.clear();
@ -1028,6 +1030,7 @@ public class BasicTransactionalTestCase extends AbstractFunctionalTestCase {
assertEquals(2, slcStats.getPutCount());
cache.evictEntityRegions();
Thread.sleep(10);
assertEquals(0, slcStats.getElementCountInMemory());
assertFalse("2lc entity cache is expected to not contain Citizen id = " + citizens.get(0).getId(),
@ -1052,6 +1055,46 @@ public class BasicTransactionalTestCase extends AbstractFunctionalTestCase {
});
}
@Test
public void testMultipleEvictAll() throws Exception {
final List<Citizen> citizens = saveSomeCitizens();
withTx(tm, new Callable<Void>() {
@Override
public Void call() throws Exception {
Session s = openSession();
Transaction tx = s.beginTransaction();
Cache cache = s.getSessionFactory().getCache();
cache.evictEntityRegions();
cache.evictEntityRegions();
// cleanup
tx.commit();
s.close();
return null;
}
});
withTx(tm, new Callable<Void>() {
@Override
public Void call() throws Exception {
Session s = openSession();
Transaction tx = s.beginTransaction();
Cache cache = s.getSessionFactory().getCache();
cache.evictEntityRegions();
s.delete(s.load(Citizen.class, citizens.get(0).getId()));
s.delete(s.load(Citizen.class, citizens.get(1).getId()));
// cleanup
tx.commit();
s.close();
return null;
}
});
}
private List<Citizen> saveSomeCitizens() throws Exception {
final Citizen c1 = new Citizen();
c1.setFirstname( "Emmanuel" );

View File

@ -25,6 +25,9 @@ import org.hibernate.service.spi.Configurable;
public class DualNodeJtaPlatformImpl implements JtaPlatform, Configurable {
private String nodeId;
public DualNodeJtaPlatformImpl() {
}
@Override
public void configure(Map configurationValues) {
nodeId = (String) configurationValues.get( DualNodeTestCase.NODE_ID_PROP );

View File

@ -20,11 +20,10 @@ import org.hibernate.resource.transaction.backend.jta.internal.JtaTransactionCoo
import org.hibernate.test.cache.infinispan.util.InfinispanTestingSetup;
import org.hibernate.testing.junit4.BaseNonConfigCoreFunctionalTestCase;
import org.junit.After;
import org.junit.Before;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;
import org.junit.Before;
import org.junit.ClassRule;
/**
@ -78,18 +77,20 @@ public abstract class DualNodeTestCase extends BaseNonConfigCoreFunctionalTestCa
DualNodeJtaTransactionManagerImpl.cleanupTransactionManagers();
}
@Before
public void prepare() throws Exception {
@Override
public void startUp() {
super.startUp();
// In some cases tests are multi-threaded, so they have to join the group
infinispanTestIdentifier.joinContext();
secondNodeEnvironment = new SecondNodeEnvironment();
}
@After
public void unPrepare() {
@Override
public void shutDown() {
if ( secondNodeEnvironment != null ) {
secondNodeEnvironment.shutDown();
}
super.shutDown();
}
protected SecondNodeEnvironment secondNodeEnvironment() {

View File

@ -10,13 +10,24 @@ import javax.transaction.TransactionManager;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import org.hibernate.Session;
import org.hibernate.SessionFactory;
import org.hibernate.cache.infinispan.InfinispanRegionFactory;
import org.hibernate.test.cache.infinispan.functional.Contact;
import org.hibernate.test.cache.infinispan.functional.Customer;
import org.hibernate.testing.TestForIssue;
import org.infinispan.AdvancedCache;
import org.infinispan.Cache;
import org.infinispan.manager.CacheContainer;
import org.infinispan.commands.read.GetKeyValueCommand;
import org.infinispan.commons.util.Util;
import org.infinispan.context.InvocationContext;
import org.infinispan.interceptors.base.BaseCustomInterceptor;
import org.infinispan.manager.EmbeddedCacheManager;
import org.infinispan.notifications.Listener;
import org.infinispan.notifications.cachelistener.annotation.CacheEntryVisited;
import org.infinispan.notifications.cachelistener.event.CacheEntryVisitedEvent;
@ -25,7 +36,9 @@ import org.infinispan.util.logging.LogFactory;
import org.jboss.util.collection.ConcurrentSet;
import org.junit.Test;
import static org.infinispan.test.TestingUtil.withTx;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
/**
@ -42,38 +55,62 @@ public class EntityCollectionInvalidationTestCase extends DualNodeTestCase {
static int test = 0;
@Test
public void testAll() throws Exception {
log.info( "*** testAll()" );
private EmbeddedCacheManager localManager, remoteManager;
private Cache localCustomerCache, remoteCustomerCache;
private Cache localContactCache, remoteContactCache;
private Cache localCollectionCache, remoteCollectionCache;
private MyListener localListener, remoteListener;
private TransactionManager localTM, remoteTM;
private SessionFactory localFactory, remoteFactory;
@Override
public void startUp() {
super.startUp();
// Bind a listener to the "local" cache
// Our region factory makes its CacheManager available to us
CacheContainer localManager = ClusterAwareRegionFactory.getCacheManager( DualNodeTestCase.LOCAL );
localManager = ClusterAwareRegionFactory.getCacheManager( DualNodeTestCase.LOCAL );
// Cache localCache = localManager.getCache("entity");
Cache localCustomerCache = localManager.getCache( Customer.class.getName() );
Cache localContactCache = localManager.getCache( Contact.class.getName() );
Cache localCollectionCache = localManager.getCache( Customer.class.getName() + ".contacts" );
MyListener localListener = new MyListener( "local" );
localCustomerCache = localManager.getCache( Customer.class.getName() );
localContactCache = localManager.getCache( Contact.class.getName() );
localCollectionCache = localManager.getCache( Customer.class.getName() + ".contacts" );
localListener = new MyListener( "local" );
localCustomerCache.addListener( localListener );
localContactCache.addListener( localListener );
localCollectionCache.addListener( localListener );
TransactionManager localTM = DualNodeJtaTransactionManagerImpl.getInstance( DualNodeTestCase.LOCAL );
// Bind a listener to the "remote" cache
CacheContainer remoteManager = ClusterAwareRegionFactory.getCacheManager( DualNodeTestCase.REMOTE );
Cache remoteCustomerCache = remoteManager.getCache( Customer.class.getName() );
Cache remoteContactCache = remoteManager.getCache( Contact.class.getName() );
Cache remoteCollectionCache = remoteManager.getCache( Customer.class.getName() + ".contacts" );
MyListener remoteListener = new MyListener( "remote" );
remoteManager = ClusterAwareRegionFactory.getCacheManager( DualNodeTestCase.REMOTE );
remoteCustomerCache = remoteManager.getCache( Customer.class.getName() );
remoteContactCache = remoteManager.getCache( Contact.class.getName() );
remoteCollectionCache = remoteManager.getCache( Customer.class.getName() + ".contacts" );
remoteListener = new MyListener( "remote" );
remoteCustomerCache.addListener( remoteListener );
remoteContactCache.addListener( remoteListener );
remoteCollectionCache.addListener( remoteListener );
TransactionManager remoteTM = DualNodeJtaTransactionManagerImpl.getInstance( DualNodeTestCase.REMOTE );
SessionFactory localFactory = sessionFactory();
SessionFactory remoteFactory = secondNodeEnvironment().getSessionFactory();
localFactory = sessionFactory();
remoteFactory = secondNodeEnvironment().getSessionFactory();
try {
localTM = DualNodeJtaTransactionManagerImpl.getInstance( DualNodeTestCase.LOCAL );
remoteTM = DualNodeJtaTransactionManagerImpl.getInstance( DualNodeTestCase.REMOTE );
}
@Override
public void shutDown() {
cleanupTransactionManagement();
}
@Override
protected void cleanupTest() throws Exception {
cleanup(localFactory, localTM);
localListener.clear();
remoteListener.clear();
// do not call super.cleanupTest becasue we would clean transaction managers
}
@Test
public void testAll() throws Exception {
assertEmptyCaches();
assertTrue( remoteListener.isEmpty() );
assertTrue( localListener.isEmpty() );
@ -127,11 +164,95 @@ public class EntityCollectionInvalidationTestCase extends DualNodeTestCase {
assertEquals( 0, localCollectionCache.size() );
assertEquals( 0, localCustomerCache.size() );
}
finally {
// cleanup the db
log.debug( "Cleaning up" );
cleanup( localFactory, localTM );
@TestForIssue(jiraKey = "HHH-9881")
@Test
public void testConcurrentLoadAndRemoval() throws Exception {
AtomicReference<Exception> getException = new AtomicReference<>();
AtomicReference<Exception> deleteException = new AtomicReference<>();
Phaser getPhaser = new Phaser(2);
HookInterceptor hookInterceptor = new HookInterceptor(getException);
AdvancedCache remotePPCache = remoteCustomerCache.getCacheManager().getCache(
remoteCustomerCache.getName() + "-" + InfinispanRegionFactory.PENDING_PUTS_CACHE_NAME).getAdvancedCache();
remotePPCache.getAdvancedCache().addInterceptor(hookInterceptor, 0);
IdContainer idContainer = new IdContainer();
withTx(localTM, () -> {
Session s = localFactory.getCurrentSession();
s.getTransaction().begin();
Customer customer = new Customer();
customer.setName( "JBoss" );
s.persist(customer);
s.getTransaction().commit();
s.close();
idContainer.customerId = customer.getId();
return null;
});
// start loading
Thread getThread = new Thread(() -> {
try {
withTx(remoteTM, () -> {
Session s = remoteFactory.getCurrentSession();
s.getTransaction().begin();
s.get(Customer.class, idContainer.customerId);
s.getTransaction().commit();
s.close();
return null;
});
} catch (Exception e) {
log.error("Failure to get customer", e);
getException.set(e);
}
}, "get-thread");
Thread deleteThread = new Thread(() -> {
try {
withTx(localTM, () -> {
Session s = localFactory.getCurrentSession();
s.getTransaction().begin();
Customer customer = s.get(Customer.class, idContainer.customerId);
s.delete(customer);
s.getTransaction().commit();
return null;
});
} catch (Exception e) {
log.error("Failure to delete customer", e);
deleteException.set(e);
}
}, "delete-thread");
// get thread should block on the beginning of PutFromLoadValidator#acquirePutFromLoadLock
hookInterceptor.block(getPhaser, getThread);
getThread.start();
arriveAndAwait(getPhaser);
deleteThread.start();
deleteThread.join();
hookInterceptor.unblock();
arriveAndAwait(getPhaser);
getThread.join();
if (getException.get() != null) {
throw new IllegalStateException("get-thread failed", getException.get());
}
if (deleteException.get() != null) {
throw new IllegalStateException("delete-thread failed", deleteException.get());
}
Customer localCustomer = getCustomer(idContainer.customerId, localFactory, localTM);
assertNull(localCustomer);
Customer remoteCustomer = getCustomer(idContainer.customerId, remoteFactory, remoteTM);
assertNull(remoteCustomer);
assertTrue(remoteCustomerCache.isEmpty());
}
protected void assertEmptyCaches() {
assertTrue( localCustomerCache.isEmpty() );
assertTrue( localContactCache.isEmpty() );
assertTrue( localCollectionCache.isEmpty() );
assertTrue( remoteCustomerCache.isEmpty() );
assertTrue( remoteContactCache.isEmpty() );
assertTrue( remoteCollectionCache.isEmpty() );
}
private IdContainer createCustomer(SessionFactory sessionFactory, TransactionManager tm)
@ -211,11 +332,17 @@ public class EntityCollectionInvalidationTestCase extends DualNodeTestCase {
}
private Customer doGetCustomer(Integer id, Session session, TransactionManager tm) throws Exception {
Customer customer = (Customer) session.get( Customer.class, id );
Customer customer = session.get( Customer.class, id );
if (customer == null) {
return null;
}
// Access all the contacts
for ( Iterator it = customer.getContacts().iterator(); it.hasNext(); ) {
Set<Contact> contacts = customer.getContacts();
if (contacts != null) {
for (Iterator it = contacts.iterator(); it.hasNext(); ) {
((Contact) it.next()).getName();
}
}
return customer;
}
@ -271,7 +398,10 @@ public class EntityCollectionInvalidationTestCase extends DualNodeTestCase {
c.setContacts( null );
session.delete( c );
}
// since we don't use orphan removal, some contacts may persist
for (Object contact : session.createCriteria(Contact.class).list()) {
session.delete(contact);
}
tm.commit();
}
catch (Exception e) {
@ -313,6 +443,15 @@ public class EntityCollectionInvalidationTestCase extends DualNodeTestCase {
);
}
protected static void arriveAndAwait(Phaser phaser) throws TimeoutException, InterruptedException {
try {
phaser.awaitAdvanceInterruptibly(phaser.arrive(), 10, TimeUnit.SECONDS);
} catch (TimeoutException e) {
log.error("Failed to progress: " + Util.threadDump());
throw e;
}
}
@Listener
public static class MyListener {
private static final Log log = LogFactory.getLog( MyListener.class );
@ -355,4 +494,45 @@ public class EntityCollectionInvalidationTestCase extends DualNodeTestCase {
Set<Integer> contactIds;
}
private static class HookInterceptor extends BaseCustomInterceptor {
final AtomicReference<Exception> failure;
Phaser phaser;
Thread thread;
private HookInterceptor(AtomicReference<Exception> failure) {
this.failure = failure;
}
public synchronized void block(Phaser phaser, Thread thread) {
this.phaser = phaser;
this.thread = thread;
}
public synchronized void unblock() {
phaser = null;
thread = null;
}
@Override
public Object visitGetKeyValueCommand(InvocationContext ctx, GetKeyValueCommand command) throws Throwable {
try {
Phaser phaser;
Thread thread;
synchronized (this) {
phaser = this.phaser;
thread = this.thread;
}
if (phaser != null && Thread.currentThread() == thread) {
arriveAndAwait(phaser);
arriveAndAwait(phaser);
}
} catch (Exception e) {
failure.set(e);
throw e;
} finally {
return super.visitGetKeyValueCommand(ctx, command);
}
}
}
}