diff --git a/hibernate-infinispan/src/main/java/org/hibernate/cache/infinispan/InfinispanRegionFactory.java b/hibernate-infinispan/src/main/java/org/hibernate/cache/infinispan/InfinispanRegionFactory.java index 509373ce00..9d767b1877 100644 --- a/hibernate-infinispan/src/main/java/org/hibernate/cache/infinispan/InfinispanRegionFactory.java +++ b/hibernate-infinispan/src/main/java/org/hibernate/cache/infinispan/InfinispanRegionFactory.java @@ -214,6 +214,20 @@ public class InfinispanRegionFactory implements RegionFactory { */ public static final String PENDING_PUTS_CACHE_NAME = "pending-puts"; + /** + * A local, lightweight cache for pending puts, which is + * non-transactional and has aggressive expiration settings. + * Locking is still required since the putFromLoad validator + * code uses conditional operations (i.e. putIfAbsent) + */ + public static final Configuration PENDING_PUTS_CACHE_CONFIGURATION = new ConfigurationBuilder() + .clustering().cacheMode(CacheMode.LOCAL) + .transaction().transactionMode(TransactionMode.NON_TRANSACTIONAL) + .expiration().maxIdle(TimeUnit.SECONDS.toMillis(60)) + .storeAsBinary().enabled(false) + .locking().isolationLevel(IsolationLevel.READ_COMMITTED) + .jmxStatistics().disable().build(); + private EmbeddedCacheManager manager; private final Map typeOverrides = new HashMap(); @@ -345,7 +359,7 @@ public class InfinispanRegionFactory implements RegionFactory { @Override public long nextTimestamp() { - return System.currentTimeMillis() / 100; + return System.currentTimeMillis(); } public void setCacheManager(EmbeddedCacheManager manager) { @@ -374,7 +388,7 @@ public class InfinispanRegionFactory implements RegionFactory { } } defineGenericDataTypeCacheConfigurations( properties ); - definePendingPutsCache(); + manager.defineConfiguration( PENDING_PUTS_CACHE_NAME, PENDING_PUTS_CACHE_CONFIGURATION ); } catch (CacheException ce) { throw ce; @@ -384,22 +398,6 @@ public class InfinispanRegionFactory implements RegionFactory { } } - private void definePendingPutsCache() { - final ConfigurationBuilder builder = new ConfigurationBuilder(); - // A local, lightweight cache for pending puts, which is - // non-transactional and has aggressive expiration settings. - // Locking is still required since the putFromLoad validator - // code uses conditional operations (i.e. putIfAbsent). - builder.clustering().cacheMode( CacheMode.LOCAL ) - .transaction().transactionMode( TransactionMode.NON_TRANSACTIONAL ) - .expiration().maxIdle( TimeUnit.SECONDS.toMillis( 60 ) ) - .storeAsBinary().enabled( false ) - .locking().isolationLevel( IsolationLevel.READ_COMMITTED ) - .jmxStatistics().disable(); - - manager.defineConfiguration( PENDING_PUTS_CACHE_NAME, builder.build() ); - } - protected org.infinispan.transaction.lookup.TransactionManagerLookup createTransactionManagerLookup( SessionFactoryOptions settings, Properties properties) { return new HibernateTransactionManagerLookup( settings, properties ); diff --git a/hibernate-infinispan/src/main/java/org/hibernate/cache/infinispan/access/PutFromLoadValidator.java b/hibernate-infinispan/src/main/java/org/hibernate/cache/infinispan/access/PutFromLoadValidator.java index 18801f6076..08bf1c42af 100644 --- a/hibernate-infinispan/src/main/java/org/hibernate/cache/infinispan/access/PutFromLoadValidator.java +++ b/hibernate-infinispan/src/main/java/org/hibernate/cache/infinispan/access/PutFromLoadValidator.java @@ -6,25 +6,42 @@ */ package org.hibernate.cache.infinispan.access; +import javax.transaction.Status; import javax.transaction.SystemException; import javax.transaction.Transaction; import javax.transaction.TransactionManager; import java.util.HashMap; -import java.util.HashSet; import java.util.Iterator; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLongFieldUpdater; import java.util.concurrent.locks.ReentrantLock; import org.hibernate.cache.CacheException; import org.hibernate.cache.infinispan.InfinispanRegionFactory; +import org.hibernate.cache.infinispan.util.CacheCommandInitializer; +import org.hibernate.cache.infinispan.util.EndInvalidationCommand; +import org.hibernate.cache.spi.RegionFactory; import org.infinispan.AdvancedCache; +import org.infinispan.commands.tx.CommitCommand; +import org.infinispan.commands.tx.PrepareCommand; +import org.infinispan.commands.write.InvalidateCommand; +import org.infinispan.commands.write.WriteCommand; import org.infinispan.configuration.cache.Configuration; import org.infinispan.configuration.cache.ConfigurationBuilder; +import org.infinispan.context.impl.TxInvocationContext; +import org.infinispan.interceptors.EntryWrappingInterceptor; +import org.infinispan.interceptors.base.BaseRpcInterceptor; +import org.infinispan.interceptors.base.CommandInterceptor; import org.infinispan.manager.EmbeddedCacheManager; +import org.infinispan.remoting.inboundhandler.DeliverOrder; +import org.infinispan.remoting.rpc.RpcManager; +import org.infinispan.transaction.TransactionMode; +import org.infinispan.util.concurrent.ConcurrentHashSet; +import org.infinispan.util.logging.Log; +import org.infinispan.util.logging.LogFactory; /** * Encapsulates logic to allow a {@link TransactionalAccessDelegate} to determine @@ -38,9 +55,9 @@ import org.infinispan.manager.EmbeddedCacheManager; * not find data is: *

*

    - *
  1. Call {@link #registerPendingPut(Object)}
  2. + *
  3. Call {@link #registerPendingPut(Object, long)}
  4. *
  5. Read the database
  6. - *
  7. Call {@link #acquirePutFromLoadLock(Object)} + *
  8. Call {@link #acquirePutFromLoadLock(Object, long)} *
  9. if above returns null, the thread should not cache the data; * only if above returns instance of AcquiredLock, put data in the cache and...
  10. *
  11. then call {@link #releasePutFromLoadLock(Object, Lock)}
  12. @@ -54,7 +71,8 @@ import org.infinispan.manager.EmbeddedCacheManager; *

    *

      *
    • {@link #beginInvalidatingKey(Object)} (for a single key invalidation)
    • - *
    • or {@link #invalidateRegion()} (for a general invalidation all pending puts)
    • + *
    • or {@link #beginInvalidatingRegion()} followed by {@link #endInvalidatingRegion()} + * (for a general invalidation all pending puts)
    • *
    * After transaction commit (when the DB is updated) {@link #endInvalidatingKey(Object)} should * be called in order to allow further attempts to cache entry. @@ -62,30 +80,31 @@ import org.infinispan.manager.EmbeddedCacheManager; *

    *

    * This class also supports the concept of "naked puts", which are calls to - * {@link #acquirePutFromLoadLock(Object)} without a preceding {@link #registerPendingPut(Object)}. - * Besides not acquiring lock in {@link #registerPendingPut(Object)} this can happen when collection + * {@link #acquirePutFromLoadLock(Object, long)} without a preceding {@link #registerPendingPut(Object, long)}. + * Besides not acquiring lock in {@link #registerPendingPut(Object, long)} this can happen when collection * elements are loaded after the collection has not been found in the cache, where the elements * don't have their own table but can be listed as 'select ... from Element where collection_id = ...'. + * Naked puts are handled according to txTimestamp obtained by calling {@link RegionFactory#nextTimestamp()} + * before the transaction is started. The timestamp is compared with timestamp of last invalidation end time + * and the write to the cache is denied if it is lower or equal. *

    * * @author Brian Stansberry * @version $Revision: $ */ public class PutFromLoadValidator { - /** - * Period (in ms) after a removal during which a call to - * {@link #acquirePutFromLoadLock(Object)} that hasn't been - * {@link #registerPendingPut(Object) pre-registered} (aka a "naked put") - * will return false. - */ - public static final long NAKED_PUT_INVALIDATION_PERIOD = TimeUnit.SECONDS.toMillis( 20 ); + private static final Log log = LogFactory.getLog(PutFromLoadValidator.class); + private static final boolean trace = log.isTraceEnabled(); /** * Used to determine whether the owner of a pending put is a thread or a transaction */ private final TransactionManager transactionManager; - private final long nakedPutInvalidationPeriod; + /** + * Period after which ongoing invalidation is removed. Value is retrieved from cache configuration. + */ + private final long expirationPeriod; /** * Registry of expected, future, isPutValid calls. If a key+owner is registered in this map, it @@ -94,14 +113,26 @@ public class PutFromLoadValidator { private final ConcurrentMap pendingPuts; /** - * The time of the last call to {@link #invalidateRegion()}, plus NAKED_PUT_INVALIDATION_PERIOD. All naked - * puts will be rejected until the current time is greater than this value. - * NOTE: update only through {@link #invalidationUpdater}! + * Main cache where the entities/collections are stored. This is not modified from within this class. */ - private volatile long invalidationTimestamp = Long.MIN_VALUE; + private final AdvancedCache cache; + + /** + * The time of the last call to {@link #endInvalidatingRegion()}. Puts from transactions started after + * this timestamp are denied. + */ + private volatile long regionInvalidationTimestamp = Long.MIN_VALUE; + + /** + * Number of ongoing concurrent invalidations. + */ + private int regionInvalidations = 0; + + /** + * Transactions that invalidate the region. Entries are removed during next invalidation based on transaction status. + */ + private final ConcurrentHashSet regionInvalidators = new ConcurrentHashSet(); - private static final AtomicLongFieldUpdater invalidationUpdater - = AtomicLongFieldUpdater.newUpdater(PutFromLoadValidator.class, "invalidationTimestamp"); /** * Creates a new put from load validator instance. @@ -110,23 +141,7 @@ public class PutFromLoadValidator { * @param transactionManager Transaction manager */ public PutFromLoadValidator(AdvancedCache cache, TransactionManager transactionManager) { - this( cache, transactionManager, NAKED_PUT_INVALIDATION_PERIOD ); - } - - /** - * Constructor variant for use by unit tests; allows control of various timeouts by the test. - * - * @param cache Cache instance on which to store pending put information. - * @param transactionManager Transaction manager - * @param nakedPutInvalidationPeriod Period (in ms) after a removal during which a call to - * {@link #acquirePutFromLoadLock(Object)} that hasn't been - * {@link #registerPendingPut(Object) pre-registered} (aka a "naked put") - * will return false. - */ - public PutFromLoadValidator( - AdvancedCache cache, TransactionManager transactionManager, - long nakedPutInvalidationPeriod) { - this(cache, cache.getCacheManager(), transactionManager, nakedPutInvalidationPeriod); + this( cache, cache.getCacheManager(), transactionManager); } /** @@ -135,37 +150,68 @@ public class PutFromLoadValidator { * @param cache Cache instance on which to store pending put information. * @param cacheManager where to find a cache to store pending put information * @param tm transaction manager - * @param nakedPutInvalidationPeriod Period (in ms) after a removal during which a call to - * {@link #acquirePutFromLoadLock(Object)} that hasn't been - * {@link #registerPendingPut(Object) pre-registered} (aka a "naked put") - * will return false. */ public PutFromLoadValidator(AdvancedCache cache, - EmbeddedCacheManager cacheManager, - TransactionManager tm, long nakedPutInvalidationPeriod) { + EmbeddedCacheManager cacheManager, TransactionManager tm) { Configuration cacheConfiguration = cache.getCacheConfiguration(); Configuration pendingPutsConfiguration = cacheManager.getCacheConfiguration(InfinispanRegionFactory.PENDING_PUTS_CACHE_NAME); ConfigurationBuilder configurationBuilder = new ConfigurationBuilder(); - if (pendingPutsConfiguration != null) { - configurationBuilder.read(pendingPutsConfiguration); - } + configurationBuilder.read(pendingPutsConfiguration); configurationBuilder.dataContainer().keyEquivalence(cacheConfiguration.dataContainer().keyEquivalence()); String pendingPutsName = cache.getName() + "-" + InfinispanRegionFactory.PENDING_PUTS_CACHE_NAME; cacheManager.defineConfiguration(pendingPutsName, configurationBuilder.build()); + if (pendingPutsConfiguration.expiration() != null && pendingPutsConfiguration.expiration().maxIdle() > 0) { + this.expirationPeriod = pendingPutsConfiguration.expiration().maxIdle(); + } + else { + throw new IllegalArgumentException("Pending puts cache needs to have maxIdle expiration set!"); + } + + // Since we need to intercept both invalidations of entries that are in the cache and those + // that are not, we need to use custom interceptor, not listeners (which fire only for present entries). + if (cacheConfiguration.clustering().cacheMode().isClustered()) { + RpcManager rpcManager = cache.getComponentRegistry().getComponent(RpcManager.class); + CacheCommandInitializer cacheCommandInitializer = cache.getComponentRegistry().getComponent(CacheCommandInitializer.class); + // Note that invalidation does *NOT* acquire locks; therefore, we have to start invalidating before + // wrapping the entry, since if putFromLoad was invoked between wrap and beginInvalidatingKey, the invalidation + // would not commit the entry removal (as during wrap the entry was not in cache) + cache.addInterceptorBefore(new PutFromLoadInterceptor(cache.getName(), rpcManager, cacheCommandInitializer), EntryWrappingInterceptor.class); + cacheCommandInitializer.addPutFromLoadValidator(cache.getName(), this); + } + + this.cache = cache; this.pendingPuts = cacheManager.getCache(pendingPutsName); this.transactionManager = tm; - this.nakedPutInvalidationPeriod = nakedPutInvalidationPeriod; + } + + /** + * This methods should be called only from tests; it removes existing validator from the cache structures + * in order to replace it with new one. + * + * @param cache + */ + public static void removeFromCache(AdvancedCache cache) { + List interceptorChain = cache.getInterceptorChain(); + int index = 0; + for (; index < interceptorChain.size(); ++index) { + if (interceptorChain.get(index).getClass().getName().startsWith(PutFromLoadValidator.class.getName())) { + cache.removeInterceptor(index); + break; + } + } + CacheCommandInitializer cci = cache.getComponentRegistry().getComponent(CacheCommandInitializer.class); + cci.removePutFromLoadValidator(cache.getName()); } // ----------------------------------------------------------------- Public /** - * Marker for lock acquired in {@link #acquirePutFromLoadLock(Object)} + * Marker for lock acquired in {@link #acquirePutFromLoadLock(Object, long)} */ - public static class Lock { - protected Lock() {} + public static abstract class Lock { + private Lock() {} } /** @@ -178,13 +224,16 @@ public class PutFromLoadValidator { * * @param key the key * + * @param txTimestamp * @return AcquiredLock if the lock is acquired and the cache put * can proceed; null if the data should not be cached */ - public Lock acquirePutFromLoadLock(Object key) { + public Lock acquirePutFromLoadLock(Object key, long txTimestamp) { + if (trace) { + log.tracef("acquirePutFromLoadLock(%s#%s, %d)", cache.getName(), key, txTimestamp); + } boolean valid = false; boolean locked = false; - long now = Long.MIN_VALUE; PendingPutMap pending = pendingPuts.get( key ); for (;;) { @@ -197,40 +246,43 @@ public class PutFromLoadValidator { if (toCancel != null) { valid = !toCancel.completed; toCancel.completed = true; - } else { + } + else { // this is a naked put if (pending.hasInvalidator()) { valid = false; - } else { - if (now == Long.MIN_VALUE) { - now = System.currentTimeMillis(); - } - valid = now > pending.nakedPutsDeadline; + } + else { + // if this transaction started after last invalidation we can continue + valid = txTimestamp > pending.lastInvalidationEnd; } } return valid ? pending : null; - } finally { + } + finally { if (!valid) { pending.releaseLock(); locked = false; } + if (trace) { + log.tracef("acquirePutFromLoadLock(%s#%s, %d) ended with %s", cache.getName(), key, txTimestamp, pending); + } + } + } + else { + if (trace) { + log.tracef("acquirePutFromLoadLock(%s#%s, %d) failed to lock", cache.getName(), key, txTimestamp); } - } else { // oops, we have leaked record for this owner, but we don't want to wait here return null; } - } else { - // Key wasn't in pendingPuts, so either this is a "naked put" - // or regionRemoved has been called. Check if we can proceed - long invalidationTimestamp = this.invalidationTimestamp; - if (invalidationTimestamp != Long.MIN_VALUE) { - now = System.currentTimeMillis(); - if (now > invalidationTimestamp) { - // time is +- monotonic se don't let other threads do the expensive currentTimeMillis() - invalidationUpdater.compareAndSet(this, invalidationTimestamp, Long.MIN_VALUE); - } else { - return null; + } + else { + if (txTimestamp <= regionInvalidationTimestamp) { + if (trace) { + log.tracef("acquirePutFromLoadLock(%s#%s, %d) failed due to invalidated region", cache.getName(), key, txTimestamp); } + return null; } PendingPut pendingPut = new PendingPut(getOwnerForPut()); @@ -241,16 +293,19 @@ public class PutFromLoadValidator { } // continue in next loop with lock acquisition } - } catch (Throwable t) { + } + catch (Throwable t) { if (locked) { pending.releaseLock(); } if (t instanceof RuntimeException) { throw (RuntimeException) t; - } else if (t instanceof Error) { + } + else if (t instanceof Error) { throw (Error) t; - } else { + } + else { throw new RuntimeException(t); } } @@ -259,11 +314,14 @@ public class PutFromLoadValidator { /** * Releases the lock previously obtained by a call to - * {@link #acquirePutFromLoadLock(Object)} that returned true. + * {@link #acquirePutFromLoadLock(Object, long)}. * * @param key the key */ public void releasePutFromLoadLock(Object key, Lock lock) { + if (trace) { + log.tracef("releasePutFromLoadLock(%s#%s, %s)", cache.getName(), key, lock); + } final PendingPutMap pending = (PendingPutMap) lock; if ( pending != null ) { if ( pending.canRemove() ) { @@ -274,29 +332,65 @@ public class PutFromLoadValidator { } /** - * Invalidates all {@link #registerPendingPut(Object) previously registered pending puts} ensuring a subsequent call to - * {@link #acquirePutFromLoadLock(Object)} will return false.

    This method will block until any - * concurrent thread that has {@link #acquirePutFromLoadLock(Object) acquired the putFromLoad lock} for the any key has + * Invalidates all {@link #registerPendingPut(Object, long) previously registered pending puts} ensuring a subsequent call to + * {@link #acquirePutFromLoadLock(Object, long)} will return false.

    This method will block until any + * concurrent thread that has {@link #acquirePutFromLoadLock(Object, long) acquired the putFromLoad lock} for the any key has * released the lock. This allows the caller to be certain the putFromLoad will not execute after this method returns, * possibly caching stale data.

    * * @return true if the invalidation was successful; false if a problem occured (which the * caller should treat as an exception condition) */ - public boolean invalidateRegion() { - // TODO: not sure what happens with locks acquired *after* calling this method but before - // the actual invalidation + public boolean beginInvalidatingRegion() { + if (trace) { + log.trace("Started invalidating region " + cache.getName()); + } boolean ok = true; - invalidationUpdater.set(this, System.currentTimeMillis() + nakedPutInvalidationPeriod); - try { + long now = System.currentTimeMillis(); + // deny all puts until endInvalidatingRegion is called; at that time the region should be already + // in INVALID state, therefore all new requests should be blocked and ongoing should fail by timestamp + synchronized (this) { + regionInvalidationTimestamp = Long.MAX_VALUE; + regionInvalidations++; + } + if (transactionManager != null) { + // cleanup old transactions + for (Iterator it = regionInvalidators.iterator(); it.hasNext(); ) { + Transaction tx = it.next(); + try { + switch (tx.getStatus()) { + case Status.STATUS_COMMITTED: + case Status.STATUS_ROLLEDBACK: + case Status.STATUS_UNKNOWN: + case Status.STATUS_NO_TRANSACTION: + it.remove(); + } + } + catch (SystemException e) { + log.error("Cannot retrieve transaction status", e); + } + } + // add this transaction + try { + Transaction tx = transactionManager.getTransaction(); + if (tx != null) { + regionInvalidators.add(tx); + } + } + catch (SystemException e) { + log.error("TransactionManager failed to provide transaction", e); + return false; + } + } + try { // Acquire the lock for each entry to ensure any ongoing // work associated with it is completed before we return - for ( Iterator it = pendingPuts.values().iterator(); it.hasNext(); ) { + for (Iterator it = pendingPuts.values().iterator(); it.hasNext(); ) { PendingPutMap entry = it.next(); - if ( entry.acquireLock( 60, TimeUnit.SECONDS ) ) { + if (entry.acquireLock(60, TimeUnit.SECONDS)) { try { - entry.invalidate(); + entry.invalidate(now, expirationPeriod); } finally { entry.releaseLock(); @@ -307,24 +401,66 @@ public class PutFromLoadValidator { ok = false; } } - } catch (Exception e) { + } + catch (Exception e) { ok = false; } - return ok; } + /** + * Called when the region invalidation is finished. + */ + public void endInvalidatingRegion() { + synchronized (this) { + if (--regionInvalidations == 0) { + regionInvalidationTimestamp = System.currentTimeMillis(); + } + } + if (trace) { + log.trace("Finished invalidating region " + cache.getName()); + } + } + /** * Notifies this validator that it is expected that a database read followed by a subsequent {@link - * #acquirePutFromLoadLock(Object)} call will occur. The intent is this method would be called following a cache miss + * #acquirePutFromLoadLock(Object, long)} call will occur. The intent is this method would be called following a cache miss * wherein it is expected that a database read plus cache put will occur. Calling this method allows the validator to * treat the subsequent acquirePutFromLoadLock as if the database read occurred when this method was * invoked. This allows the validator to compare the timestamp of this call against the timestamp of subsequent removal * notifications. * * @param key key that will be used for subsequent cache put + * @param txTimestamp */ - public void registerPendingPut(Object key) { + public void registerPendingPut(Object key, long txTimestamp) { + long invalidationTimestamp = this.regionInvalidationTimestamp; + if (txTimestamp <= invalidationTimestamp) { + boolean skip; + if (invalidationTimestamp == Long.MAX_VALUE) { + // there is ongoing invalidation of pending puts + skip = true; + } + else { + Transaction tx = null; + if (transactionManager != null) { + try { + tx = transactionManager.getTransaction(); + } + catch (SystemException e) { + log.error("TransactionManager failed to provide transaction", e); + } + } + skip = tx == null || !regionInvalidators.contains(tx); + } + if (skip) { + if (trace) { + log.tracef("registerPendingPut(%s#%s, %d) skipped due to region invalidation (%d)", cache.getName(), key, txTimestamp, invalidationTimestamp); + } + return; + } + } + final PendingPut pendingPut = new PendingPut( getOwnerForPut() ); final PendingPutMap pendingForKey = new PendingPutMap( pendingPut ); @@ -335,21 +471,42 @@ public class PutFromLoadValidator { if ( !existing.hasInvalidator() ) { existing.put(pendingPut); } - } finally { + } + finally { existing.releaseLock(); } + if (trace) { + log.tracef("registerPendingPut(%s#%s, %d) ended with %s", cache.getName(), key, txTimestamp, existing); + } } else { + if (trace) { + log.tracef("registerPendingPut(%s#%s, %d) failed to acquire lock", cache.getName(), key, txTimestamp); + } // Can't get the lock; when we come back we'll be a "naked put" } } + else { + if (trace) { + log.tracef("registerPendingPut(%s#%s, %d) registered using putIfAbsent: %s", cache.getName(), key, txTimestamp, pendingForKey); + } + } } /** - * Invalidates any {@link #registerPendingPut(Object) previously registered pending puts} - * and disables further registrations ensuring a subsequent call to {@link #acquirePutFromLoadLock(Object)} + * Calls {@link #beginInvalidatingKey(Object, Object)} with current transaction or thread. + * @param key + * @return + */ + public boolean beginInvalidatingKey(Object key) { + return beginInvalidatingKey(key, getOwnerForPut()); + } + + /** + * Invalidates any {@link #registerPendingPut(Object, long) previously registered pending puts} + * and disables further registrations ensuring a subsequent call to {@link #acquirePutFromLoadLock(Object, long)} * will return false.

    This method will block until any concurrent thread that has - * {@link #acquirePutFromLoadLock(Object) acquired the putFromLoad lock} for the given key + * {@link #acquirePutFromLoadLock(Object, long) acquired the putFromLoad lock} for the given key * has released the lock. This allows the caller to be certain the putFromLoad will not execute after this method * returns, possibly caching stale data.

    * After this transaction completes, {@link #endInvalidatingKey(Object)} needs to be called } @@ -359,7 +516,7 @@ public class PutFromLoadValidator { * @return true if the invalidation was successful; false if a problem occured (which the * caller should treat as an exception condition) */ - public boolean beginInvalidatingKey(Object key) { + public boolean beginInvalidatingKey(Object key, Object lockOwner) { PendingPutMap pending = new PendingPutMap(null); PendingPutMap prev = pendingPuts.putIfAbsent(key, pending); if (prev != null) { @@ -367,39 +524,68 @@ public class PutFromLoadValidator { } if (pending.acquireLock(60, TimeUnit.SECONDS)) { try { - pending.invalidate(); - pending.addInvalidator(getOwnerForPut(), System.currentTimeMillis() + nakedPutInvalidationPeriod); - } finally { + long now = System.currentTimeMillis(); + pending.invalidate(now, expirationPeriod); + pending.addInvalidator(lockOwner, now, expirationPeriod); + } + finally { pending.releaseLock(); } + if (trace) { + log.tracef("beginInvalidatingKey(%s#%s, %s) ends with %s", cache.getName(), key, lockOwner, pending); + } return true; - } else { + } + else { + log.tracef("beginInvalidatingKey(%s#%s, %s) failed to acquire lock", cache.getName(), key); return false; } } /** - * Called after the transaction completes, allowing caching of entries. It is possible that this method - * is called without previous invocation of {@link #beginInvalidatingKey(Object)}, then it should be noop. - * + * Calls {@link #endInvalidatingKey(Object, Object)} with current transaction or thread. * @param key * @return */ public boolean endInvalidatingKey(Object key) { + return endInvalidatingKey(key, getOwnerForPut()); + } + + /** + * Called after the transaction completes, allowing caching of entries. It is possible that this method + * is called without previous invocation of {@link #beginInvalidatingKey(Object)}, then it should be a no-op. + * + * @param key + * @param lockOwner owner of the invalidation - transaction or thread + * @return + */ + public boolean endInvalidatingKey(Object key, Object lockOwner) { PendingPutMap pending = pendingPuts.get(key); if (pending == null) { + if (trace) { + log.tracef("endInvalidatingKey(%s#%s, %s) could not find pending puts", cache.getName(), key, lockOwner); + } return true; } if (pending.acquireLock(60, TimeUnit.SECONDS)) { try { - pending.removeInvalidator(getOwnerForPut()); + long now = System.currentTimeMillis(); + pending.removeInvalidator(lockOwner, now); // we can't remove the pending put yet because we wait for naked puts // pendingPuts should be configured with maxIdle time so won't have memory leak return true; - } finally { - pending.releaseLock(); } - } else { + finally { + pending.releaseLock(); + if (trace) { + log.tracef("endInvalidatingKey(%s#%s, %s) ends with %s", cache.getName(), key, lockOwner, pending); + } + } + } + else { + if (trace) { + log.tracef("endInvalidatingKey(%s#%s, %s) failed to acquire lock", cache.getName(), key, lockOwner); + } return false; } } @@ -431,14 +617,61 @@ public class PutFromLoadValidator { private PendingPut singlePendingPut; private Map fullMap; private final java.util.concurrent.locks.Lock lock = new ReentrantLock(); - private Object singleInvalidator; - private Set invalidators; - private long nakedPutsDeadline = Long.MIN_VALUE; + private Invalidator singleInvalidator; + private Map invalidators; + private long lastInvalidationEnd = Long.MIN_VALUE; PendingPutMap(PendingPut singleItem) { this.singlePendingPut = singleItem; } + // toString should be called only for debugging purposes + public String toString() { + if (lock.tryLock()) { + try { + StringBuilder sb = new StringBuilder(); + sb.append("{ PendingPuts="); + if (singlePendingPut == null) { + if (fullMap == null) { + sb.append("[]"); + } + else { + sb.append(fullMap.values()); + } + } + else { + sb.append('[').append(singlePendingPut).append(']'); + } + sb.append(", Invalidators="); + if (singleInvalidator == null) { + if (invalidators == null) { + sb.append("[]"); + } + else { + sb.append(invalidators); + } + } + else { + sb.append('[').append(singleInvalidator).append(']'); + } + sb.append(", LastInvalidationEnd="); + if (lastInvalidationEnd == Long.MIN_VALUE) { + sb.append(""); + } + else { + sb.append(lastInvalidationEnd); + } + return sb.append("}").toString(); + } + finally { + lock.unlock(); + } + } + else { + return "PendingPutMap: "; + } + } + public void put(PendingPut pendingPut) { if ( singlePendingPut == null ) { if ( fullMap == null ) { @@ -492,63 +725,167 @@ public class PutFromLoadValidator { lock.unlock(); } - public void invalidate() { + public void invalidate(long now, long expirationPeriod) { if ( singlePendingPut != null ) { - singlePendingPut.completed = true; - // Nullify to avoid leaking completed pending puts - singlePendingPut = null; + if (singlePendingPut.invalidate(now, expirationPeriod)) { + singlePendingPut = null; + } } else if ( fullMap != null ) { - for ( PendingPut pp : fullMap.values() ) { - pp.completed = true; + for ( Iterator it = fullMap.values().iterator(); it.hasNext(); ) { + PendingPut pp = it.next(); + if (pp.invalidate(now, expirationPeriod)) { + it.remove(); + } } - // Nullify to avoid leaking completed pending puts - fullMap = null; } } - public void addInvalidator(Object invalidator, long deadline) { + public void addInvalidator(Object owner, long now, long invalidatorTimeout) { + assert owner != null; if (invalidators == null) { if (singleInvalidator == null) { - singleInvalidator = invalidator; - } else { - invalidators = new HashSet(); - invalidators.add(singleInvalidator); - invalidators.add(invalidator); + singleInvalidator = new Invalidator(owner, now); + } + else { + if (singleInvalidator.registeredTimestamp + invalidatorTimeout < now) { + // remove leaked invalidator + singleInvalidator = new Invalidator(owner, now); + } + invalidators = new HashMap(); + invalidators.put(singleInvalidator.owner, singleInvalidator); + invalidators.put(owner, new Invalidator(owner, now)); singleInvalidator = null; } - } else { - invalidators.add(invalidator); } - nakedPutsDeadline = Math.max(nakedPutsDeadline, deadline); + else { + long allowedRegistration = now - invalidatorTimeout; + // remove leaked invalidators + for (Iterator it = invalidators.values().iterator(); it.hasNext(); ) { + if (it.next().registeredTimestamp < allowedRegistration) { + it.remove(); + } + } + invalidators.put(owner, new Invalidator(owner, now)); + } } public boolean hasInvalidator() { return singleInvalidator != null || (invalidators != null && !invalidators.isEmpty()); } - public void removeInvalidator(Object invalidator) { + public void removeInvalidator(Object owner, long now) { if (invalidators == null) { - if (singleInvalidator != null && singleInvalidator.equals(invalidator)) { + if (singleInvalidator != null && singleInvalidator.owner.equals(owner)) { singleInvalidator = null; } - } else { - invalidators.remove(invalidator); } + else { + invalidators.remove(owner); + } + lastInvalidationEnd = Math.max(lastInvalidationEnd, now); } public boolean canRemove() { - return size() == 0 && !hasInvalidator() && - (nakedPutsDeadline == Long.MIN_VALUE || nakedPutsDeadline < System.currentTimeMillis()); + return size() == 0 && !hasInvalidator() && lastInvalidationEnd == Long.MIN_VALUE; } } private static class PendingPut { private final Object owner; - private volatile boolean completed; + private boolean completed; + // the timestamp is not filled during registration in order to avoid expensive currentTimeMillis() calls + private long registeredTimestamp = Long.MIN_VALUE; private PendingPut(Object owner) { this.owner = owner; } + + public String toString() { + return (completed ? "C@" : "R@") + owner; + } + + public boolean invalidate(long now, long expirationPeriod) { + completed = true; + if (registeredTimestamp == Long.MIN_VALUE) { + registeredTimestamp = now; + } + else if (registeredTimestamp + expirationPeriod < now){ + return true; // this is a leaked pending put + } + return false; + } + } + + private static class Invalidator { + private final Object owner; + private final long registeredTimestamp; + + private Invalidator(Object owner, long registeredTimestamp) { + this.owner = owner; + this.registeredTimestamp = registeredTimestamp; + } + + @Override + public String toString() { + final StringBuilder sb = new StringBuilder("{"); + sb.append("Owner=").append(owner); + sb.append(", Timestamp=").append(registeredTimestamp); + sb.append('}'); + return sb.toString(); + } + } + + private class PutFromLoadInterceptor extends BaseRpcInterceptor { + private final String cacheName; + private final RpcManager rpcManager; + private final CacheCommandInitializer cacheCommandInitializer; + + public PutFromLoadInterceptor(String cacheName, RpcManager rpcManager, CacheCommandInitializer cacheCommandInitializer) { + this.cacheName = cacheName; + this.rpcManager = rpcManager; + this.cacheCommandInitializer = cacheCommandInitializer; + } + + // We need to intercept PrepareCommand, not InvalidateCommand since the interception takes + // place before EntryWrappingInterceptor and the PrepareCommand is multiplexed into InvalidateCommands + // as part of EntryWrappingInterceptor + @Override + public Object visitPrepareCommand(TxInvocationContext ctx, PrepareCommand command) throws Throwable { + if (!ctx.isOriginLocal()) { + for (WriteCommand wc : command.getModifications()) { + if (wc instanceof InvalidateCommand) { + // InvalidateCommand does not correctly implement getAffectedKeys() + for (Object key : ((InvalidateCommand) wc).getKeys()) { + beginInvalidatingKey(key, ctx.getLockOwner()); + } + } + else { + for (Object key : wc.getAffectedKeys()) { + beginInvalidatingKey(key, ctx.getLockOwner()); + } + } + } + } + return invokeNextInterceptor(ctx, command); + } + + @Override + public Object visitCommitCommand(TxInvocationContext ctx, CommitCommand command) throws Throwable { + try { + if (ctx.isOriginLocal()) { + // send async Commit + Set affectedKeys = ctx.getAffectedKeys(); + if (!affectedKeys.isEmpty()) { + EndInvalidationCommand commitCommand = cacheCommandInitializer.buildEndInvalidationCommand( + cacheName, affectedKeys.toArray(), ctx.getGlobalTransaction()); + rpcManager.invokeRemotely(null, commitCommand, rpcManager.getDefaultRpcOptions(false, DeliverOrder.NONE)); + } + } + } + finally { + return invokeNextInterceptor(ctx, command); + } + } } } diff --git a/hibernate-infinispan/src/main/java/org/hibernate/cache/infinispan/access/TransactionalAccessDelegate.java b/hibernate-infinispan/src/main/java/org/hibernate/cache/infinispan/access/TransactionalAccessDelegate.java index 3c196784e7..0cc01c4aaa 100755 --- a/hibernate-infinispan/src/main/java/org/hibernate/cache/infinispan/access/TransactionalAccessDelegate.java +++ b/hibernate-infinispan/src/main/java/org/hibernate/cache/infinispan/access/TransactionalAccessDelegate.java @@ -61,7 +61,7 @@ public class TransactionalAccessDelegate { } final Object val = cache.get( key ); if ( val == null ) { - putValidator.registerPendingPut( key ); + putValidator.registerPendingPut( key, txTimestamp ); } return val; } @@ -110,7 +110,7 @@ public class TransactionalAccessDelegate { return false; } - PutFromLoadValidator.Lock lock = putValidator.acquirePutFromLoadLock(key); + PutFromLoadValidator.Lock lock = putValidator.acquirePutFromLoadLock(key, txTimestamp); if ( lock == null) { if ( TRACE_ENABLED ) { log.tracef( "Put from load lock not acquired for key %s", key ); @@ -202,10 +202,15 @@ public class TransactionalAccessDelegate { * @throws CacheException if eviction the region fails */ public void removeAll() throws CacheException { - if ( !putValidator.invalidateRegion() ) { - throw new CacheException( "Failed to invalidate pending putFromLoad calls for region " + region.getName() ); + try { + if (!putValidator.beginInvalidatingRegion()) { + throw new CacheException("Failed to invalidate pending putFromLoad calls for region " + region.getName()); + } + Caches.removeAll(cache); + } + finally { + putValidator.endInvalidatingRegion(); } - Caches.removeAll( cache ); } /** @@ -231,13 +236,18 @@ public class TransactionalAccessDelegate { * @throws CacheException if evicting items fails */ public void evictAll() throws CacheException { - if ( !putValidator.invalidateRegion() ) { - throw new CacheException( "Failed to invalidate pending putFromLoad calls for region " + region.getName() ); - } + try { + if (!putValidator.beginInvalidatingRegion()) { + throw new CacheException("Failed to invalidate pending putFromLoad calls for region " + region.getName()); + } - // Invalidate the local region and then go remote - region.invalidateRegion(); - Caches.broadcastEvictAll( cache ); + // Invalidate the local region and then go remote + region.invalidateRegion(); + Caches.broadcastEvictAll(cache); + } + finally { + putValidator.endInvalidatingRegion(); + } } /** diff --git a/hibernate-infinispan/src/main/java/org/hibernate/cache/infinispan/util/CacheCommandFactory.java b/hibernate-infinispan/src/main/java/org/hibernate/cache/infinispan/util/CacheCommandFactory.java index c5beedc809..751a1a3285 100644 --- a/hibernate-infinispan/src/main/java/org/hibernate/cache/infinispan/util/CacheCommandFactory.java +++ b/hibernate-infinispan/src/main/java/org/hibernate/cache/infinispan/util/CacheCommandFactory.java @@ -58,6 +58,7 @@ public class CacheCommandFactory implements ExtendedModuleCommandFactory { public Map> getModuleCommands() { final Map> map = new HashMap>( 3 ); map.put( CacheCommandIds.EVICT_ALL, EvictAllCommand.class ); + map.put( CacheCommandIds.END_INVALIDATION, EndInvalidationCommand.class ); return map; } @@ -68,6 +69,9 @@ public class CacheCommandFactory implements ExtendedModuleCommandFactory { case CacheCommandIds.EVICT_ALL: c = new EvictAllCommand( cacheName, allRegions.get( cacheName ) ); break; + case CacheCommandIds.END_INVALIDATION: + c = new EndInvalidationCommand(cacheName); + break; default: throw new IllegalArgumentException( "Not registered to handle command id " + commandId ); } diff --git a/hibernate-infinispan/src/main/java/org/hibernate/cache/infinispan/util/CacheCommandIds.java b/hibernate-infinispan/src/main/java/org/hibernate/cache/infinispan/util/CacheCommandIds.java index cdbfd3d303..7eb1df2d0e 100644 --- a/hibernate-infinispan/src/main/java/org/hibernate/cache/infinispan/util/CacheCommandIds.java +++ b/hibernate-infinispan/src/main/java/org/hibernate/cache/infinispan/util/CacheCommandIds.java @@ -14,7 +14,12 @@ package org.hibernate.cache.infinispan.util; */ public interface CacheCommandIds { /** - * The "evict all" command id + * {@link EvictAllCommand} id */ public static final byte EVICT_ALL = 120; + + /** + * {@link EndInvalidationCommand} id + */ + public static final byte END_INVALIDATION = 121; } diff --git a/hibernate-infinispan/src/main/java/org/hibernate/cache/infinispan/util/CacheCommandInitializer.java b/hibernate-infinispan/src/main/java/org/hibernate/cache/infinispan/util/CacheCommandInitializer.java index 41b09bbd1c..0cf73c6dc4 100644 --- a/hibernate-infinispan/src/main/java/org/hibernate/cache/infinispan/util/CacheCommandInitializer.java +++ b/hibernate-infinispan/src/main/java/org/hibernate/cache/infinispan/util/CacheCommandInitializer.java @@ -6,9 +6,12 @@ */ package org.hibernate.cache.infinispan.util; +import org.hibernate.cache.infinispan.access.PutFromLoadValidator; import org.infinispan.commands.ReplicableCommand; import org.infinispan.commands.module.ModuleCommandInitializer; +import java.util.concurrent.ConcurrentHashMap; + /** * Command initializer * @@ -17,7 +20,22 @@ import org.infinispan.commands.module.ModuleCommandInitializer; */ public class CacheCommandInitializer implements ModuleCommandInitializer { - /** + private final ConcurrentHashMap putFromLoadValidators + = new ConcurrentHashMap(); + + public void addPutFromLoadValidator(String cacheName, PutFromLoadValidator putFromLoadValidator) { + // there could be two instances of PutFromLoadValidator bound to the same cache when + // there are two JndiInfinispanRegionFactories bound to the same cacheManager via JNDI. + // In that case, as putFromLoadValidator does not really own the pendingPuts cache, + // it's safe to have more instances. + putFromLoadValidators.put(cacheName, putFromLoadValidator); + } + + public PutFromLoadValidator removePutFromLoadValidator(String cacheName) { + return putFromLoadValidators.remove(cacheName); + } + + /** * Build an instance of {@link EvictAllCommand} for a given region. * * @param regionName name of region for {@link EvictAllCommand} @@ -31,9 +49,17 @@ public class CacheCommandInitializer implements ModuleCommandInitializer { return new EvictAllCommand( regionName ); } - @Override - public void initializeReplicableCommand(ReplicableCommand c, boolean isRemote) { - // No need to initialize... + public EndInvalidationCommand buildEndInvalidationCommand(String cacheName, Object[] keys, Object lockOwner) { + return new EndInvalidationCommand( cacheName, keys, lockOwner ); } + @Override + public void initializeReplicableCommand(ReplicableCommand c, boolean isRemote) { + switch (c.getCommandId()) { + case CacheCommandIds.END_INVALIDATION: + EndInvalidationCommand endInvalidationCommand = (EndInvalidationCommand) c; + endInvalidationCommand.setPutFromLoadValidator(putFromLoadValidators.get(endInvalidationCommand.getCacheName())); + break; + } + } } diff --git a/hibernate-infinispan/src/main/java/org/hibernate/cache/infinispan/util/EndInvalidationCommand.java b/hibernate-infinispan/src/main/java/org/hibernate/cache/infinispan/util/EndInvalidationCommand.java new file mode 100644 index 0000000000..8fd753d415 --- /dev/null +++ b/hibernate-infinispan/src/main/java/org/hibernate/cache/infinispan/util/EndInvalidationCommand.java @@ -0,0 +1,86 @@ +/* + * Hibernate, Relational Persistence for Idiomatic Java + * + * License: GNU Lesser General Public License (LGPL), version 2.1 or later. + * See the lgpl.txt file in the root directory or . + */ +package org.hibernate.cache.infinispan.util; + +import org.hibernate.cache.infinispan.access.PutFromLoadValidator; +import org.infinispan.commands.remote.BaseRpcCommand; +import org.infinispan.context.InvocationContext; + +import java.util.Arrays; + +/** + * Sent in commit phase (after DB commit) to remote nodes in order to stop invalidating + * putFromLoads. + * + * @author Radim Vansa <rvansa@redhat.com> + */ +public class EndInvalidationCommand extends BaseRpcCommand { + private Object[] keys; + private Object lockOwner; + private PutFromLoadValidator putFromLoadValidator; + + public EndInvalidationCommand(String cacheName) { + this(cacheName, null, null); + } + + /** + * @param cacheName name of the cache to evict + */ + public EndInvalidationCommand(String cacheName, Object[] keys, Object lockOwner) { + super(cacheName); + this.keys = keys; + this.lockOwner = lockOwner; + } + + @Override + public Object perform(InvocationContext ctx) throws Throwable { + for (Object key : keys) { + putFromLoadValidator.endInvalidatingKey(key, lockOwner); + } + return null; + } + + @Override + public byte getCommandId() { + return CacheCommandIds.END_INVALIDATION; + } + + @Override + public Object[] getParameters() { + return new Object[] { keys, lockOwner }; + } + + @Override + public void setParameters(int commandId, Object[] parameters) { + keys = (Object[]) parameters[0]; + lockOwner = parameters[1]; + } + + @Override + public boolean isReturnValueExpected() { + return false; + } + + @Override + public boolean canBlock() { + return true; + } + + public void setPutFromLoadValidator(PutFromLoadValidator putFromLoadValidator) { + this.putFromLoadValidator = putFromLoadValidator; + } + + @Override + public String toString() { + final StringBuilder sb = new StringBuilder("EndInvalidationCommand{"); + sb.append("cacheName=").append(cacheName); + sb.append(", keys=").append(Arrays.toString(keys)); + sb.append(", lockOwner=").append(lockOwner); + sb.append('}'); + return sb.toString(); + } +} diff --git a/hibernate-infinispan/src/test/java/org/hibernate/test/cache/infinispan/AbstractGeneralDataRegionTestCase.java b/hibernate-infinispan/src/test/java/org/hibernate/test/cache/infinispan/AbstractGeneralDataRegionTestCase.java index 9f57e7de74..e118f0327d 100644 --- a/hibernate-infinispan/src/test/java/org/hibernate/test/cache/infinispan/AbstractGeneralDataRegionTestCase.java +++ b/hibernate-infinispan/src/test/java/org/hibernate/test/cache/infinispan/AbstractGeneralDataRegionTestCase.java @@ -164,9 +164,9 @@ public abstract class AbstractGeneralDataRegionTestCase extends AbstractRegionIm })); } - protected void regionEvict(GeneralDataRegion region) throws Exception { + protected void regionEvict(GeneralDataRegion region) throws Exception { region.evict(KEY); - } + } protected abstract String getStandardRegionName(String regionPrefix); diff --git a/hibernate-infinispan/src/test/java/org/hibernate/test/cache/infinispan/access/PutFromLoadValidatorUnitTestCase.java b/hibernate-infinispan/src/test/java/org/hibernate/test/cache/infinispan/access/PutFromLoadValidatorUnitTestCase.java index 1ca0070aad..0511ab8760 100644 --- a/hibernate-infinispan/src/test/java/org/hibernate/test/cache/infinispan/access/PutFromLoadValidatorUnitTestCase.java +++ b/hibernate-infinispan/src/test/java/org/hibernate/test/cache/infinispan/access/PutFromLoadValidatorUnitTestCase.java @@ -17,9 +17,11 @@ import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import org.hibernate.cache.infinispan.InfinispanRegionFactory; import org.hibernate.cache.infinispan.access.PutFromLoadValidator; import org.hibernate.test.cache.infinispan.functional.cluster.DualNodeJtaTransactionManagerImpl; import org.hibernate.test.cache.infinispan.util.InfinispanTestingSetup; +import org.infinispan.manager.EmbeddedCacheManager; import org.infinispan.test.CacheManagerCallable; import org.infinispan.test.fwk.TestCacheManagerFactory; import org.infinispan.util.logging.Log; @@ -70,7 +72,14 @@ public class PutFromLoadValidatorUnitTestCase { finally { DualNodeJtaTransactionManagerImpl.cleanupTransactionManagers(); } - } + } + + private static EmbeddedCacheManager createCacheManager() { + EmbeddedCacheManager cacheManager = TestCacheManagerFactory.createCacheManager(false); + cacheManager.defineConfiguration(InfinispanRegionFactory.PENDING_PUTS_CACHE_NAME, + InfinispanRegionFactory.PENDING_PUTS_CACHE_CONFIGURATION); + return cacheManager; + } @Test public void testNakedPut() throws Exception { @@ -82,12 +91,11 @@ public class PutFromLoadValidatorUnitTestCase { } private void nakedPutTest(final boolean transactional) throws Exception { - withCacheManager(new CacheManagerCallable( - TestCacheManagerFactory.createCacheManager(false)) { + withCacheManager(new CacheManagerCallable(createCacheManager()) { @Override public void call() { PutFromLoadValidator testee = new PutFromLoadValidator(cm.getCache().getAdvancedCache(), cm, - transactional ? tm : null, PutFromLoadValidator.NAKED_PUT_INVALIDATION_PERIOD); + transactional ? tm : null); exec(transactional, new NakedPut(testee, true)); } }); @@ -103,12 +111,11 @@ public class PutFromLoadValidatorUnitTestCase { } private void registeredPutTest(final boolean transactional) throws Exception { - withCacheManager(new CacheManagerCallable( - TestCacheManagerFactory.createCacheManager(false)) { + withCacheManager(new CacheManagerCallable(createCacheManager()) { @Override public void call() { PutFromLoadValidator testee = new PutFromLoadValidator(cm.getCache().getAdvancedCache(), cm, - transactional ? tm : null, PutFromLoadValidator.NAKED_PUT_INVALIDATION_PERIOD); + transactional ? tm : null); exec(transactional, new RegularPut(testee)); } }); @@ -133,14 +140,14 @@ public class PutFromLoadValidatorUnitTestCase { private void nakedPutAfterRemovalTest(final boolean transactional, final boolean removeRegion) throws Exception { - withCacheManager(new CacheManagerCallable( - TestCacheManagerFactory.createCacheManager(false)) { + withCacheManager(new CacheManagerCallable(createCacheManager()) { @Override public void call() { PutFromLoadValidator testee = new PutFromLoadValidator(cm.getCache().getAdvancedCache(), cm, - transactional ? tm : null, PutFromLoadValidator.NAKED_PUT_INVALIDATION_PERIOD); + transactional ? tm : null); Invalidation invalidation = new Invalidation(testee, removeRegion); - NakedPut nakedPut = new NakedPut(testee, false); + // the naked put can succeed because it has txTimestamp after invalidation + NakedPut nakedPut = new NakedPut(testee, true); exec(transactional, invalidation, nakedPut); } }); @@ -166,12 +173,11 @@ public class PutFromLoadValidatorUnitTestCase { private void registeredPutAfterRemovalTest(final boolean transactional, final boolean removeRegion) throws Exception { - withCacheManager(new CacheManagerCallable( - TestCacheManagerFactory.createCacheManager(false)) { + withCacheManager(new CacheManagerCallable(createCacheManager()) { @Override public void call() { PutFromLoadValidator testee = new PutFromLoadValidator(cm.getCache().getAdvancedCache(), cm, - transactional ? tm : null, PutFromLoadValidator.NAKED_PUT_INVALIDATION_PERIOD); + transactional ? tm : null); Invalidation invalidation = new Invalidation(testee, removeRegion); RegularPut regularPut = new RegularPut(testee); exec(transactional, invalidation, regularPut); @@ -199,24 +205,24 @@ public class PutFromLoadValidatorUnitTestCase { private void registeredPutWithInterveningRemovalTest( final boolean transactional, final boolean removeRegion) throws Exception { - withCacheManager(new CacheManagerCallable( - TestCacheManagerFactory.createCacheManager(false)) { + withCacheManager(new CacheManagerCallable(createCacheManager()) { @Override public void call() { PutFromLoadValidator testee = new PutFromLoadValidator(cm.getCache().getAdvancedCache(), cm, - transactional ? tm : null, PutFromLoadValidator.NAKED_PUT_INVALIDATION_PERIOD); + transactional ? tm : null); try { + long txTimestamp = System.currentTimeMillis(); if (transactional) { tm.begin(); } - testee.registerPendingPut(KEY1); + testee.registerPendingPut(KEY1, txTimestamp); if (removeRegion) { - testee.invalidateRegion(); + testee.beginInvalidatingRegion(); } else { testee.beginInvalidatingKey(KEY1); } - PutFromLoadValidator.Lock lock = testee.acquirePutFromLoadLock(KEY1); + PutFromLoadValidator.Lock lock = testee.acquirePutFromLoadLock(KEY1, txTimestamp); try { assertNull(lock); } @@ -224,59 +230,10 @@ public class PutFromLoadValidatorUnitTestCase { if (lock != null) { testee.releasePutFromLoadLock(KEY1, lock); } - testee.endInvalidatingKey(KEY1); - } - } catch (Exception e) { - throw new RuntimeException(e); - } - } - }); - } - - @Test - public void testDelayedNakedPutAfterKeyRemoval() throws Exception { - delayedNakedPutAfterRemovalTest(false, false); - } - @Test - public void testDelayedNakedPutAfterKeyRemovalTransactional() throws Exception { - delayedNakedPutAfterRemovalTest(true, false); - } - @Test - public void testDelayedNakedPutAfterRegionRemoval() throws Exception { - delayedNakedPutAfterRemovalTest(false, true); - } - @Test - public void testDelayedNakedPutAfterRegionRemovalTransactional() throws Exception { - delayedNakedPutAfterRemovalTest(true, true); - } - - private void delayedNakedPutAfterRemovalTest( - final boolean transactional, final boolean removeRegion) - throws Exception { - withCacheManager(new CacheManagerCallable( - TestCacheManagerFactory.createCacheManager(false)) { - @Override - public void call() { - PutFromLoadValidator testee = new PutFromLoadValidator(cm.getCache().getAdvancedCache(), cm, - transactional ? tm : null, 100); - if (removeRegion) { - testee.invalidateRegion(); - } else { - testee.beginInvalidatingKey(KEY1); - testee.endInvalidatingKey(KEY1); - } - try { - if (transactional) { - tm.begin(); - } - Thread.sleep(110); - - PutFromLoadValidator.Lock lock = testee.acquirePutFromLoadLock(KEY1); - try { - assertNotNull(lock); - } finally { - if (lock != null) { - testee.releasePutFromLoadLock(KEY1, null); + if (removeRegion) { + testee.endInvalidatingRegion(); + } else { + testee.endInvalidatingKey(KEY1); } } } catch (Exception e) { @@ -297,13 +254,11 @@ public class PutFromLoadValidatorUnitTestCase { } private void multipleRegistrationtest(final boolean transactional) throws Exception { - withCacheManager(new CacheManagerCallable( - TestCacheManagerFactory.createCacheManager(false)) { + withCacheManager(new CacheManagerCallable(createCacheManager()) { @Override public void call() { final PutFromLoadValidator testee = new PutFromLoadValidator(cm.getCache().getAdvancedCache(), cm, - transactional ? tm : null, - PutFromLoadValidator.NAKED_PUT_INVALIDATION_PERIOD); + transactional ? tm : null); final CountDownLatch registeredLatch = new CountDownLatch(3); final CountDownLatch finishedLatch = new CountDownLatch(3); @@ -312,13 +267,14 @@ public class PutFromLoadValidatorUnitTestCase { Runnable r = new Runnable() { public void run() { try { + long txTimestamp = System.currentTimeMillis(); if (transactional) { tm.begin(); } - testee.registerPendingPut(KEY1); + testee.registerPendingPut(KEY1, txTimestamp); registeredLatch.countDown(); registeredLatch.await(5, TimeUnit.SECONDS); - PutFromLoadValidator.Lock lock = testee.acquirePutFromLoadLock(KEY1); + PutFromLoadValidator.Lock lock = testee.acquirePutFromLoadLock(KEY1, txTimestamp); if (lock != null) { try { log.trace("Put from load lock acquired for key = " + KEY1); @@ -341,7 +297,13 @@ public class PutFromLoadValidatorUnitTestCase { // Start with a removal so the "isPutValid" calls will fail if // any of the concurrent activity isn't handled properly - testee.invalidateRegion(); + testee.beginInvalidatingRegion(); + testee.endInvalidatingRegion(); + try { + Thread.sleep(10); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } // Do the registration + isPutValid calls executor.execute(r); @@ -370,20 +332,20 @@ public class PutFromLoadValidatorUnitTestCase { } private void invalidationBlocksForInProgressPutTest(final boolean keyOnly) throws Exception { - withCacheManager(new CacheManagerCallable( - TestCacheManagerFactory.createCacheManager(false)) { + withCacheManager(new CacheManagerCallable(createCacheManager()) { @Override public void call() { final PutFromLoadValidator testee = new PutFromLoadValidator(cm.getCache().getAdvancedCache(), - cm, null, PutFromLoadValidator.NAKED_PUT_INVALIDATION_PERIOD); + cm, null); final CountDownLatch removeLatch = new CountDownLatch(1); final CountDownLatch pferLatch = new CountDownLatch(1); final AtomicReference cache = new AtomicReference("INITIAL"); Callable pferCallable = new Callable() { public Boolean call() throws Exception { - testee.registerPendingPut(KEY1); - PutFromLoadValidator.Lock lock = testee.acquirePutFromLoadLock(KEY1); + long txTimestamp = System.currentTimeMillis(); + testee.registerPendingPut(KEY1, txTimestamp); + PutFromLoadValidator.Lock lock = testee.acquirePutFromLoadLock(KEY1, txTimestamp); if (lock != null) { try { removeLatch.countDown(); @@ -405,7 +367,7 @@ public class PutFromLoadValidatorUnitTestCase { if (keyOnly) { testee.beginInvalidatingKey(KEY1); } else { - testee.invalidateRegion(); + testee.beginInvalidatingRegion(); } cache.set(null); return null; @@ -466,14 +428,18 @@ public class PutFromLoadValidatorUnitTestCase { @Override public Void call() throws Exception { if (removeRegion) { - boolean success = putFromLoadValidator.invalidateRegion(); + boolean success = putFromLoadValidator.beginInvalidatingRegion(); assertTrue(success); + putFromLoadValidator.endInvalidatingRegion();; } else { boolean success = putFromLoadValidator.beginInvalidatingKey(KEY1); assertTrue(success); success = putFromLoadValidator.endInvalidatingKey(KEY1); assertTrue(success); } + // if we go for the timestamp-based approach, invalidation in the same millisecond + // as the registerPendingPut/acquirePutFromLoad lock results in failure. + Thread.sleep(10); return null; } } @@ -488,9 +454,10 @@ public class PutFromLoadValidatorUnitTestCase { @Override public Void call() throws Exception { try { - putFromLoadValidator.registerPendingPut(KEY1); + long txTimestamp = System.currentTimeMillis(); // this should be acquired before UserTransaction.begin() + putFromLoadValidator.registerPendingPut(KEY1, txTimestamp); - PutFromLoadValidator.Lock lock = putFromLoadValidator.acquirePutFromLoadLock(KEY1); + PutFromLoadValidator.Lock lock = putFromLoadValidator.acquirePutFromLoadLock(KEY1, txTimestamp); try { assertNotNull(lock); } finally { @@ -517,7 +484,8 @@ public class PutFromLoadValidatorUnitTestCase { @Override public Void call() throws Exception { try { - PutFromLoadValidator.Lock lock = testee.acquirePutFromLoadLock(KEY1); + long txTimestamp = System.currentTimeMillis(); // this should be acquired before UserTransaction.begin() + PutFromLoadValidator.Lock lock = testee.acquirePutFromLoadLock(KEY1, txTimestamp); try { if (expectSuccess) { assertNotNull(lock); diff --git a/hibernate-infinispan/src/test/java/org/hibernate/test/cache/infinispan/collection/AbstractCollectionRegionAccessStrategyTestCase.java b/hibernate-infinispan/src/test/java/org/hibernate/test/cache/infinispan/collection/AbstractCollectionRegionAccessStrategyTestCase.java index 4341a82bd5..9326633152 100644 --- a/hibernate-infinispan/src/test/java/org/hibernate/test/cache/infinispan/collection/AbstractCollectionRegionAccessStrategyTestCase.java +++ b/hibernate-infinispan/src/test/java/org/hibernate/test/cache/infinispan/collection/AbstractCollectionRegionAccessStrategyTestCase.java @@ -7,6 +7,7 @@ package org.hibernate.test.cache.infinispan.collection; import javax.transaction.TransactionManager; + import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; @@ -31,6 +32,8 @@ import org.hibernate.test.cache.infinispan.NodeEnvironment; import org.hibernate.test.cache.infinispan.util.CacheTestUtil; import org.hibernate.test.cache.infinispan.util.InfinispanTestingSetup; import org.hibernate.test.cache.infinispan.util.TestingKeyFactory; +import org.infinispan.AdvancedCache; +import org.infinispan.manager.EmbeddedCacheManager; import org.infinispan.test.CacheManagerCallable; import org.infinispan.test.fwk.TestCacheManagerFactory; import org.infinispan.transaction.tm.BatchModeTransactionManager; @@ -155,29 +158,10 @@ public abstract class AbstractCollectionRegionAccessStrategyTestCase extends Abs final CountDownLatch pferLatch = new CountDownLatch( 1 ); final CountDownLatch removeLatch = new CountDownLatch( 1 ); final TransactionManager remoteTm = remoteCollectionRegion.getTransactionManager(); - withCacheManager(new CacheManagerCallable(TestCacheManagerFactory.createCacheManager(false)) { + withCacheManager(new CacheManagerCallable(createCacheManager()) { @Override public void call() { - PutFromLoadValidator validator = new PutFromLoadValidator(remoteCollectionRegion.getCache(), cm, - remoteTm, 20000) { - @Override - public Lock acquirePutFromLoadLock(Object key) { - Lock lock = super.acquirePutFromLoadLock( key ); - try { - removeLatch.countDown(); - pferLatch.await( 2, TimeUnit.SECONDS ); - } - catch (InterruptedException e) { - log.debug( "Interrupted" ); - Thread.currentThread().interrupt(); - } - catch (Exception e) { - log.error( "Error", e ); - throw new RuntimeException( "Error", e ); - } - return lock; - } - }; + PutFromLoadValidator validator = getPutFromLoadValidator(remoteCollectionRegion.getCache(), cm, remoteTm, removeLatch, pferLatch); final TransactionalAccessDelegate delegate = new TransactionalAccessDelegate(localCollectionRegion, validator); @@ -218,7 +202,40 @@ public abstract class AbstractCollectionRegionAccessStrategyTestCase extends Abs assertFalse(localCollectionRegion.getCache().containsKey("k1")); } - }); + }); + } + + private static EmbeddedCacheManager createCacheManager() { + EmbeddedCacheManager cacheManager = TestCacheManagerFactory.createCacheManager(false); + cacheManager.defineConfiguration(InfinispanRegionFactory.PENDING_PUTS_CACHE_NAME, + InfinispanRegionFactory.PENDING_PUTS_CACHE_CONFIGURATION); + return cacheManager; + } + + protected PutFromLoadValidator getPutFromLoadValidator(AdvancedCache cache, EmbeddedCacheManager cm, + TransactionManager tm, + CountDownLatch removeLatch, CountDownLatch pferLatch) { + // remove the interceptor inserted by default PutFromLoadValidator, we're using different one + PutFromLoadValidator.removeFromCache(cache); + return new PutFromLoadValidator(cache, cm, tm) { + @Override + public Lock acquirePutFromLoadLock(Object key, long txTimestamp) { + Lock lock = super.acquirePutFromLoadLock( key, txTimestamp); + try { + removeLatch.countDown(); + pferLatch.await( 2, TimeUnit.SECONDS ); + } + catch (InterruptedException e) { + log.debug( "Interrupted" ); + Thread.currentThread().interrupt(); + } + catch (Exception e) { + log.error( "Error", e ); + throw new RuntimeException( "Error", e ); + } + return lock; + } + }; } @Test @@ -455,6 +472,9 @@ public abstract class AbstractCollectionRegionAccessStrategyTestCase extends Abs assertEquals( 0, remoteCollectionRegion.getCache().size() ); + // Wait for async propagation of EndInvalidationCommand + sleep( 250 ); + // Test whether the get above messes up the optimistic version remoteAccessStrategy.putFromLoad(null, KEY, VALUE1, System.currentTimeMillis(), new Integer( 1 ) ); assertEquals( VALUE1, remoteAccessStrategy.get(null, KEY, System.currentTimeMillis() ) ); diff --git a/hibernate-infinispan/src/test/java/org/hibernate/test/cache/infinispan/entity/AbstractEntityRegionAccessStrategyTestCase.java b/hibernate-infinispan/src/test/java/org/hibernate/test/cache/infinispan/entity/AbstractEntityRegionAccessStrategyTestCase.java index 790e2c99ab..dd11a9275f 100644 --- a/hibernate-infinispan/src/test/java/org/hibernate/test/cache/infinispan/entity/AbstractEntityRegionAccessStrategyTestCase.java +++ b/hibernate-infinispan/src/test/java/org/hibernate/test/cache/infinispan/entity/AbstractEntityRegionAccessStrategyTestCase.java @@ -289,8 +289,8 @@ public abstract class AbstractEntityRegionAccessStrategyTestCase extends Abstrac assertEquals("Correct node1 value", VALUE2, localAccessStrategy.get(null, KEY, txTimestamp)); if (isUsingInvalidation()) { - // no data version to prevent the PFER; we count on db locks preventing this - assertEquals("Expected node2 value", VALUE1, remoteAccessStrategy.get(null, KEY, txTimestamp)); + // invalidation command invalidates pending put + assertEquals("Expected node2 value", null, remoteAccessStrategy.get(null, KEY, txTimestamp)); } else { // The node1 update is replicated, preventing the node2 PFER assertEquals("Correct node2 value", VALUE2, remoteAccessStrategy.get(null, KEY, txTimestamp)); @@ -571,9 +571,12 @@ public abstract class AbstractEntityRegionAccessStrategyTestCase extends Abstrac // Re-establishing the region root on the local node doesn't // propagate it to other nodes. Do a get on the remote node to re-establish - assertEquals(null, remoteAccessStrategy.get(null, KEY, System.currentTimeMillis())); + assertNull(remoteAccessStrategy.get(null, KEY, System.currentTimeMillis())); assertEquals(0, remoteEntityRegion.getCache().size()); + // Wait for async propagation of EndInvalidationCommand before executing naked put + sleep(250); + // Test whether the get above messes up the optimistic version remoteAccessStrategy.putFromLoad(null, KEY, VALUE1, System.currentTimeMillis(), new Integer(1)); assertEquals(VALUE1, remoteAccessStrategy.get(null, KEY, System.currentTimeMillis())); diff --git a/hibernate-infinispan/src/test/java/org/hibernate/test/cache/infinispan/functional/AbstractFunctionalTestCase.java b/hibernate-infinispan/src/test/java/org/hibernate/test/cache/infinispan/functional/AbstractFunctionalTestCase.java index c21b493d93..ed9436e10e 100644 --- a/hibernate-infinispan/src/test/java/org/hibernate/test/cache/infinispan/functional/AbstractFunctionalTestCase.java +++ b/hibernate-infinispan/src/test/java/org/hibernate/test/cache/infinispan/functional/AbstractFunctionalTestCase.java @@ -117,6 +117,7 @@ public abstract class AbstractFunctionalTestCase extends SingleNodeTestCase { log.info("Entry persisted, let's load and delete it."); cleanupCache(); + Thread.sleep(10); withTx(tm, new Callable() { @Override diff --git a/hibernate-infinispan/src/test/java/org/hibernate/test/cache/infinispan/functional/BasicTransactionalTestCase.java b/hibernate-infinispan/src/test/java/org/hibernate/test/cache/infinispan/functional/BasicTransactionalTestCase.java index 6fca629277..dfba841191 100644 --- a/hibernate-infinispan/src/test/java/org/hibernate/test/cache/infinispan/functional/BasicTransactionalTestCase.java +++ b/hibernate-infinispan/src/test/java/org/hibernate/test/cache/infinispan/functional/BasicTransactionalTestCase.java @@ -861,6 +861,7 @@ public class BasicTransactionalTestCase extends AbstractFunctionalTestCase { // Clear the cache before the transaction begins BasicTransactionalTestCase.this.cleanupCache(); + Thread.sleep(10); withTx(tm, new Callable() { @Override @@ -951,6 +952,7 @@ public class BasicTransactionalTestCase extends AbstractFunctionalTestCase { // TODO: Clear caches manually via cache manager (it's faster!!) this.cleanupCache(); + Thread.sleep(10); stats.setStatisticsEnabled( true ); stats.clear(); @@ -1028,6 +1030,7 @@ public class BasicTransactionalTestCase extends AbstractFunctionalTestCase { assertEquals(2, slcStats.getPutCount()); cache.evictEntityRegions(); + Thread.sleep(10); assertEquals(0, slcStats.getElementCountInMemory()); assertFalse("2lc entity cache is expected to not contain Citizen id = " + citizens.get(0).getId(), @@ -1052,6 +1055,46 @@ public class BasicTransactionalTestCase extends AbstractFunctionalTestCase { }); } + @Test + public void testMultipleEvictAll() throws Exception { + final List citizens = saveSomeCitizens(); + + withTx(tm, new Callable() { + @Override + public Void call() throws Exception { + Session s = openSession(); + Transaction tx = s.beginTransaction(); + Cache cache = s.getSessionFactory().getCache(); + + cache.evictEntityRegions(); + cache.evictEntityRegions(); + + // cleanup + tx.commit(); + s.close(); + return null; + } + }); + withTx(tm, new Callable() { + @Override + public Void call() throws Exception { + Session s = openSession(); + Transaction tx = s.beginTransaction(); + Cache cache = s.getSessionFactory().getCache(); + + cache.evictEntityRegions(); + + s.delete(s.load(Citizen.class, citizens.get(0).getId())); + s.delete(s.load(Citizen.class, citizens.get(1).getId())); + + // cleanup + tx.commit(); + s.close(); + return null; + } + }); + } + private List saveSomeCitizens() throws Exception { final Citizen c1 = new Citizen(); c1.setFirstname( "Emmanuel" ); diff --git a/hibernate-infinispan/src/test/java/org/hibernate/test/cache/infinispan/functional/cluster/DualNodeJtaPlatformImpl.java b/hibernate-infinispan/src/test/java/org/hibernate/test/cache/infinispan/functional/cluster/DualNodeJtaPlatformImpl.java index bad1fa55aa..a15121de18 100644 --- a/hibernate-infinispan/src/test/java/org/hibernate/test/cache/infinispan/functional/cluster/DualNodeJtaPlatformImpl.java +++ b/hibernate-infinispan/src/test/java/org/hibernate/test/cache/infinispan/functional/cluster/DualNodeJtaPlatformImpl.java @@ -25,6 +25,9 @@ import org.hibernate.service.spi.Configurable; public class DualNodeJtaPlatformImpl implements JtaPlatform, Configurable { private String nodeId; + public DualNodeJtaPlatformImpl() { + } + @Override public void configure(Map configurationValues) { nodeId = (String) configurationValues.get( DualNodeTestCase.NODE_ID_PROP ); diff --git a/hibernate-infinispan/src/test/java/org/hibernate/test/cache/infinispan/functional/cluster/DualNodeTestCase.java b/hibernate-infinispan/src/test/java/org/hibernate/test/cache/infinispan/functional/cluster/DualNodeTestCase.java index 3a5d8e2968..886e240426 100644 --- a/hibernate-infinispan/src/test/java/org/hibernate/test/cache/infinispan/functional/cluster/DualNodeTestCase.java +++ b/hibernate-infinispan/src/test/java/org/hibernate/test/cache/infinispan/functional/cluster/DualNodeTestCase.java @@ -20,11 +20,10 @@ import org.hibernate.resource.transaction.backend.jta.internal.JtaTransactionCoo import org.hibernate.test.cache.infinispan.util.InfinispanTestingSetup; import org.hibernate.testing.junit4.BaseNonConfigCoreFunctionalTestCase; -import org.junit.After; -import org.junit.Before; import org.infinispan.util.logging.Log; import org.infinispan.util.logging.LogFactory; +import org.junit.Before; import org.junit.ClassRule; /** @@ -78,18 +77,20 @@ public abstract class DualNodeTestCase extends BaseNonConfigCoreFunctionalTestCa DualNodeJtaTransactionManagerImpl.cleanupTransactionManagers(); } - @Before - public void prepare() throws Exception { + @Override + public void startUp() { + super.startUp(); // In some cases tests are multi-threaded, so they have to join the group infinispanTestIdentifier.joinContext(); secondNodeEnvironment = new SecondNodeEnvironment(); } - @After - public void unPrepare() { + @Override + public void shutDown() { if ( secondNodeEnvironment != null ) { secondNodeEnvironment.shutDown(); } + super.shutDown(); } protected SecondNodeEnvironment secondNodeEnvironment() { diff --git a/hibernate-infinispan/src/test/java/org/hibernate/test/cache/infinispan/functional/cluster/EntityCollectionInvalidationTestCase.java b/hibernate-infinispan/src/test/java/org/hibernate/test/cache/infinispan/functional/cluster/EntityCollectionInvalidationTestCase.java index 2e92009fe0..64a22cf7b1 100644 --- a/hibernate-infinispan/src/test/java/org/hibernate/test/cache/infinispan/functional/cluster/EntityCollectionInvalidationTestCase.java +++ b/hibernate-infinispan/src/test/java/org/hibernate/test/cache/infinispan/functional/cluster/EntityCollectionInvalidationTestCase.java @@ -10,13 +10,24 @@ import javax.transaction.TransactionManager; import java.util.HashSet; import java.util.Iterator; import java.util.Set; +import java.util.concurrent.Phaser; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicReference; import org.hibernate.Session; import org.hibernate.SessionFactory; +import org.hibernate.cache.infinispan.InfinispanRegionFactory; import org.hibernate.test.cache.infinispan.functional.Contact; import org.hibernate.test.cache.infinispan.functional.Customer; +import org.hibernate.testing.TestForIssue; +import org.infinispan.AdvancedCache; import org.infinispan.Cache; -import org.infinispan.manager.CacheContainer; +import org.infinispan.commands.read.GetKeyValueCommand; +import org.infinispan.commons.util.Util; +import org.infinispan.context.InvocationContext; +import org.infinispan.interceptors.base.BaseCustomInterceptor; +import org.infinispan.manager.EmbeddedCacheManager; import org.infinispan.notifications.Listener; import org.infinispan.notifications.cachelistener.annotation.CacheEntryVisited; import org.infinispan.notifications.cachelistener.event.CacheEntryVisitedEvent; @@ -25,7 +36,9 @@ import org.infinispan.util.logging.LogFactory; import org.jboss.util.collection.ConcurrentSet; import org.junit.Test; +import static org.infinispan.test.TestingUtil.withTx; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; /** @@ -42,96 +55,204 @@ public class EntityCollectionInvalidationTestCase extends DualNodeTestCase { static int test = 0; - @Test - public void testAll() throws Exception { - log.info( "*** testAll()" ); + private EmbeddedCacheManager localManager, remoteManager; + private Cache localCustomerCache, remoteCustomerCache; + private Cache localContactCache, remoteContactCache; + private Cache localCollectionCache, remoteCollectionCache; + private MyListener localListener, remoteListener; + private TransactionManager localTM, remoteTM; + private SessionFactory localFactory, remoteFactory; + @Override + public void startUp() { + super.startUp(); // Bind a listener to the "local" cache // Our region factory makes its CacheManager available to us - CacheContainer localManager = ClusterAwareRegionFactory.getCacheManager( DualNodeTestCase.LOCAL ); + localManager = ClusterAwareRegionFactory.getCacheManager( DualNodeTestCase.LOCAL ); // Cache localCache = localManager.getCache("entity"); - Cache localCustomerCache = localManager.getCache( Customer.class.getName() ); - Cache localContactCache = localManager.getCache( Contact.class.getName() ); - Cache localCollectionCache = localManager.getCache( Customer.class.getName() + ".contacts" ); - MyListener localListener = new MyListener( "local" ); + localCustomerCache = localManager.getCache( Customer.class.getName() ); + localContactCache = localManager.getCache( Contact.class.getName() ); + localCollectionCache = localManager.getCache( Customer.class.getName() + ".contacts" ); + localListener = new MyListener( "local" ); localCustomerCache.addListener( localListener ); localContactCache.addListener( localListener ); localCollectionCache.addListener( localListener ); - TransactionManager localTM = DualNodeJtaTransactionManagerImpl.getInstance( DualNodeTestCase.LOCAL ); // Bind a listener to the "remote" cache - CacheContainer remoteManager = ClusterAwareRegionFactory.getCacheManager( DualNodeTestCase.REMOTE ); - Cache remoteCustomerCache = remoteManager.getCache( Customer.class.getName() ); - Cache remoteContactCache = remoteManager.getCache( Contact.class.getName() ); - Cache remoteCollectionCache = remoteManager.getCache( Customer.class.getName() + ".contacts" ); - MyListener remoteListener = new MyListener( "remote" ); + remoteManager = ClusterAwareRegionFactory.getCacheManager( DualNodeTestCase.REMOTE ); + remoteCustomerCache = remoteManager.getCache( Customer.class.getName() ); + remoteContactCache = remoteManager.getCache( Contact.class.getName() ); + remoteCollectionCache = remoteManager.getCache( Customer.class.getName() + ".contacts" ); + remoteListener = new MyListener( "remote" ); remoteCustomerCache.addListener( remoteListener ); remoteContactCache.addListener( remoteListener ); remoteCollectionCache.addListener( remoteListener ); - TransactionManager remoteTM = DualNodeJtaTransactionManagerImpl.getInstance( DualNodeTestCase.REMOTE ); - SessionFactory localFactory = sessionFactory(); - SessionFactory remoteFactory = secondNodeEnvironment().getSessionFactory(); + localFactory = sessionFactory(); + remoteFactory = secondNodeEnvironment().getSessionFactory(); - try { - assertTrue( remoteListener.isEmpty() ); - assertTrue( localListener.isEmpty() ); + localTM = DualNodeJtaTransactionManagerImpl.getInstance( DualNodeTestCase.LOCAL ); + remoteTM = DualNodeJtaTransactionManagerImpl.getInstance( DualNodeTestCase.REMOTE ); + } - log.debug( "Create node 0" ); - IdContainer ids = createCustomer( localFactory, localTM ); + @Override + public void shutDown() { + cleanupTransactionManagement(); + } - assertTrue( remoteListener.isEmpty() ); - assertTrue( localListener.isEmpty() ); + @Override + protected void cleanupTest() throws Exception { + cleanup(localFactory, localTM); + localListener.clear(); + remoteListener.clear(); + // do not call super.cleanupTest becasue we would clean transaction managers + } - // Sleep a bit to let async commit propagate. Really just to - // help keep the logs organized for debugging any issues - sleep( SLEEP_TIME ); + @Test + public void testAll() throws Exception { + assertEmptyCaches(); + assertTrue( remoteListener.isEmpty() ); + assertTrue( localListener.isEmpty() ); - log.debug( "Find node 0" ); - // This actually brings the collection into the cache - getCustomer( ids.customerId, localFactory, localTM ); + log.debug( "Create node 0" ); + IdContainer ids = createCustomer( localFactory, localTM ); - sleep( SLEEP_TIME ); + assertTrue( remoteListener.isEmpty() ); + assertTrue( localListener.isEmpty() ); - // Now the collection is in the cache so, the 2nd "get" - // should read everything from the cache - log.debug( "Find(2) node 0" ); - localListener.clear(); - getCustomer( ids.customerId, localFactory, localTM ); + // Sleep a bit to let async commit propagate. Really just to + // help keep the logs organized for debugging any issues + sleep( SLEEP_TIME ); - // Check the read came from the cache - log.debug( "Check cache 0" ); - assertLoadedFromCache( localListener, ids.customerId, ids.contactIds ); + log.debug( "Find node 0" ); + // This actually brings the collection into the cache + getCustomer( ids.customerId, localFactory, localTM ); - log.debug( "Find node 1" ); - // This actually brings the collection into the cache since invalidation is in use - getCustomer( ids.customerId, remoteFactory, remoteTM ); + sleep( SLEEP_TIME ); - // Now the collection is in the cache so, the 2nd "get" - // should read everything from the cache - log.debug( "Find(2) node 1" ); - remoteListener.clear(); - getCustomer( ids.customerId, remoteFactory, remoteTM ); + // Now the collection is in the cache so, the 2nd "get" + // should read everything from the cache + log.debug( "Find(2) node 0" ); + localListener.clear(); + getCustomer( ids.customerId, localFactory, localTM ); - // Check the read came from the cache - log.debug( "Check cache 1" ); - assertLoadedFromCache( remoteListener, ids.customerId, ids.contactIds ); + // Check the read came from the cache + log.debug( "Check cache 0" ); + assertLoadedFromCache( localListener, ids.customerId, ids.contactIds ); - // Modify customer in remote - remoteListener.clear(); - ids = modifyCustomer( ids.customerId, remoteFactory, remoteTM ); - sleep( 250 ); - assertLoadedFromCache( remoteListener, ids.customerId, ids.contactIds ); + log.debug( "Find node 1" ); + // This actually brings the collection into the cache since invalidation is in use + getCustomer( ids.customerId, remoteFactory, remoteTM ); - // After modification, local cache should have been invalidated and hence should be empty - assertEquals( 0, localCollectionCache.size() ); - assertEquals( 0, localCustomerCache.size() ); + // Now the collection is in the cache so, the 2nd "get" + // should read everything from the cache + log.debug( "Find(2) node 1" ); + remoteListener.clear(); + getCustomer( ids.customerId, remoteFactory, remoteTM ); + + // Check the read came from the cache + log.debug( "Check cache 1" ); + assertLoadedFromCache( remoteListener, ids.customerId, ids.contactIds ); + + // Modify customer in remote + remoteListener.clear(); + ids = modifyCustomer( ids.customerId, remoteFactory, remoteTM ); + sleep( 250 ); + assertLoadedFromCache( remoteListener, ids.customerId, ids.contactIds ); + + // After modification, local cache should have been invalidated and hence should be empty + assertEquals( 0, localCollectionCache.size() ); + assertEquals( 0, localCustomerCache.size() ); + } + + @TestForIssue(jiraKey = "HHH-9881") + @Test + public void testConcurrentLoadAndRemoval() throws Exception { + AtomicReference getException = new AtomicReference<>(); + AtomicReference deleteException = new AtomicReference<>(); + + Phaser getPhaser = new Phaser(2); + HookInterceptor hookInterceptor = new HookInterceptor(getException); + AdvancedCache remotePPCache = remoteCustomerCache.getCacheManager().getCache( + remoteCustomerCache.getName() + "-" + InfinispanRegionFactory.PENDING_PUTS_CACHE_NAME).getAdvancedCache(); + remotePPCache.getAdvancedCache().addInterceptor(hookInterceptor, 0); + + IdContainer idContainer = new IdContainer(); + withTx(localTM, () -> { + Session s = localFactory.getCurrentSession(); + s.getTransaction().begin(); + Customer customer = new Customer(); + customer.setName( "JBoss" ); + s.persist(customer); + s.getTransaction().commit(); + s.close(); + idContainer.customerId = customer.getId(); + return null; + }); + // start loading + + Thread getThread = new Thread(() -> { + try { + withTx(remoteTM, () -> { + Session s = remoteFactory.getCurrentSession(); + s.getTransaction().begin(); + s.get(Customer.class, idContainer.customerId); + s.getTransaction().commit(); + s.close(); + return null; + }); + } catch (Exception e) { + log.error("Failure to get customer", e); + getException.set(e); + } + }, "get-thread"); + Thread deleteThread = new Thread(() -> { + try { + withTx(localTM, () -> { + Session s = localFactory.getCurrentSession(); + s.getTransaction().begin(); + Customer customer = s.get(Customer.class, idContainer.customerId); + s.delete(customer); + s.getTransaction().commit(); + return null; + }); + } catch (Exception e) { + log.error("Failure to delete customer", e); + deleteException.set(e); + } + }, "delete-thread"); + // get thread should block on the beginning of PutFromLoadValidator#acquirePutFromLoadLock + hookInterceptor.block(getPhaser, getThread); + getThread.start(); + + arriveAndAwait(getPhaser); + deleteThread.start(); + deleteThread.join(); + hookInterceptor.unblock(); + arriveAndAwait(getPhaser); + getThread.join(); + + if (getException.get() != null) { + throw new IllegalStateException("get-thread failed", getException.get()); } - finally { - // cleanup the db - log.debug( "Cleaning up" ); - cleanup( localFactory, localTM ); + if (deleteException.get() != null) { + throw new IllegalStateException("delete-thread failed", deleteException.get()); } + + Customer localCustomer = getCustomer(idContainer.customerId, localFactory, localTM); + assertNull(localCustomer); + Customer remoteCustomer = getCustomer(idContainer.customerId, remoteFactory, remoteTM); + assertNull(remoteCustomer); + assertTrue(remoteCustomerCache.isEmpty()); + } + + protected void assertEmptyCaches() { + assertTrue( localCustomerCache.isEmpty() ); + assertTrue( localContactCache.isEmpty() ); + assertTrue( localCollectionCache.isEmpty() ); + assertTrue( remoteCustomerCache.isEmpty() ); + assertTrue( remoteContactCache.isEmpty() ); + assertTrue( remoteCollectionCache.isEmpty() ); } private IdContainer createCustomer(SessionFactory sessionFactory, TransactionManager tm) @@ -211,10 +332,16 @@ public class EntityCollectionInvalidationTestCase extends DualNodeTestCase { } private Customer doGetCustomer(Integer id, Session session, TransactionManager tm) throws Exception { - Customer customer = (Customer) session.get( Customer.class, id ); + Customer customer = session.get( Customer.class, id ); + if (customer == null) { + return null; + } // Access all the contacts - for ( Iterator it = customer.getContacts().iterator(); it.hasNext(); ) { - ((Contact) it.next()).getName(); + Set contacts = customer.getContacts(); + if (contacts != null) { + for (Iterator it = contacts.iterator(); it.hasNext(); ) { + ((Contact) it.next()).getName(); + } } return customer; } @@ -271,7 +398,10 @@ public class EntityCollectionInvalidationTestCase extends DualNodeTestCase { c.setContacts( null ); session.delete( c ); } - + // since we don't use orphan removal, some contacts may persist + for (Object contact : session.createCriteria(Contact.class).list()) { + session.delete(contact); + } tm.commit(); } catch (Exception e) { @@ -313,6 +443,15 @@ public class EntityCollectionInvalidationTestCase extends DualNodeTestCase { ); } + protected static void arriveAndAwait(Phaser phaser) throws TimeoutException, InterruptedException { + try { + phaser.awaitAdvanceInterruptibly(phaser.arrive(), 10, TimeUnit.SECONDS); + } catch (TimeoutException e) { + log.error("Failed to progress: " + Util.threadDump()); + throw e; + } + } + @Listener public static class MyListener { private static final Log log = LogFactory.getLog( MyListener.class ); @@ -355,4 +494,45 @@ public class EntityCollectionInvalidationTestCase extends DualNodeTestCase { Set contactIds; } + private static class HookInterceptor extends BaseCustomInterceptor { + final AtomicReference failure; + Phaser phaser; + Thread thread; + + private HookInterceptor(AtomicReference failure) { + this.failure = failure; + } + + public synchronized void block(Phaser phaser, Thread thread) { + this.phaser = phaser; + this.thread = thread; + } + + public synchronized void unblock() { + phaser = null; + thread = null; + } + + @Override + public Object visitGetKeyValueCommand(InvocationContext ctx, GetKeyValueCommand command) throws Throwable { + try { + Phaser phaser; + Thread thread; + synchronized (this) { + phaser = this.phaser; + thread = this.thread; + } + if (phaser != null && Thread.currentThread() == thread) { + arriveAndAwait(phaser); + arriveAndAwait(phaser); + } + } catch (Exception e) { + failure.set(e); + throw e; + } finally { + return super.visitGetKeyValueCommand(ctx, command); + } + } + } + }