diff --git a/hibernate-infinispan/src/main/java/org/hibernate/cache/infinispan/query/QueryResultsRegionImpl.java b/hibernate-infinispan/src/main/java/org/hibernate/cache/infinispan/query/QueryResultsRegionImpl.java index 4982466265..e8add76bde 100644 --- a/hibernate-infinispan/src/main/java/org/hibernate/cache/infinispan/query/QueryResultsRegionImpl.java +++ b/hibernate-infinispan/src/main/java/org/hibernate/cache/infinispan/query/QueryResultsRegionImpl.java @@ -23,7 +23,12 @@ */ package org.hibernate.cache.infinispan.query; +import javax.transaction.RollbackException; +import javax.transaction.SystemException; import javax.transaction.Transaction; +import javax.transaction.TransactionManager; +import javax.transaction.Status; +import javax.transaction.Synchronization; import org.hibernate.cache.CacheException; import org.hibernate.cache.infinispan.impl.BaseTransactionalDataRegion; @@ -32,7 +37,16 @@ import org.hibernate.cache.spi.QueryResultsRegion; import org.hibernate.cache.spi.RegionFactory; import org.infinispan.AdvancedCache; +import org.infinispan.configuration.cache.TransactionConfiguration; import org.infinispan.context.Flag; +import org.infinispan.transaction.TransactionMode; +import org.infinispan.util.logging.Log; +import org.infinispan.util.logging.LogFactory; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; /** * Region for caching query results. @@ -42,10 +56,14 @@ import org.infinispan.context.Flag; * @since 3.5 */ public class QueryResultsRegionImpl extends BaseTransactionalDataRegion implements QueryResultsRegion { + private static final Log log = LogFactory.getLog( QueryResultsRegionImpl.class ); - private final AdvancedCache evictCache; - private final AdvancedCache putCache; - private final AdvancedCache getCache; + private final AdvancedCache evictCache; + private final AdvancedCache putCache; + private final AdvancedCache getCache; + private final ConcurrentMap > transactionContext + = new ConcurrentHashMap >(); + private final boolean putCacheRequiresTransaction; /** * Query region constructor @@ -54,87 +72,211 @@ public class QueryResultsRegionImpl extends BaseTransactionalDataRegion implemen * @param name of the query region * @param factory for the query region */ - public QueryResultsRegionImpl(AdvancedCache cache, String name, RegionFactory factory) { - super( cache, name, null, factory ); - // If Infinispan is using INVALIDATION for query cache, we don't want to propagate changes. - // We use the Timestamps cache to manage invalidation - final boolean localOnly = Caches.isInvalidationCache( cache ); + public QueryResultsRegionImpl(AdvancedCache cache, String name, RegionFactory factory) { + super( cache, name, null, factory ); + // If Infinispan is using INVALIDATION for query cache, we don't want to propagate changes. + // We use the Timestamps cache to manage invalidation + final boolean localOnly = Caches.isInvalidationCache( cache ); - this.evictCache = localOnly ? Caches.localCache( cache ) : cache; + this.evictCache = localOnly ? Caches.localCache( cache ) : cache; - this.putCache = localOnly ? - Caches.failSilentWriteCache( cache, Flag.CACHE_MODE_LOCAL ) : - Caches.failSilentWriteCache( cache ); + this.putCache = localOnly ? + Caches.failSilentWriteCache( cache, Flag.CACHE_MODE_LOCAL ) : + Caches.failSilentWriteCache( cache ); - this.getCache = Caches.failSilentReadCache( cache ); - } + this.getCache = Caches.failSilentReadCache( cache ); - @Override - public void evict(Object key) throws CacheException { - evictCache.remove( key ); - } + TransactionConfiguration transactionConfiguration = putCache.getCacheConfiguration().transaction(); + boolean transactional = transactionConfiguration.transactionMode() != TransactionMode.NON_TRANSACTIONAL; + this.putCacheRequiresTransaction = transactional && !transactionConfiguration.autoCommit(); + } - @Override - public void evictAll() throws CacheException { - final Transaction tx = suspend(); - try { - // Invalidate the local region and then go remote - invalidateRegion(); - Caches.broadcastEvictAll( cache ); - } - finally { - resume( tx ); - } - } + @Override + public void evict(Object key) throws CacheException { + for (Map map : transactionContext.values()) { + PostTransactionQueryUpdate update = map.remove(key); + update.setValue(null); + } + evictCache.remove( key ); + } - @Override - public Object get(Object key) throws CacheException { - // If the region is not valid, skip cache store to avoid going remote to retrieve the query. - // The aim of this is to maintain same logic/semantics as when state transfer was configured. - // TODO: Once https://issues.jboss.org/browse/ISPN-835 has been resolved, revert to state transfer and remove workaround - boolean skipCacheStore = false; - if ( !isValid() ) { - skipCacheStore = true; - } + @Override + public void evictAll() throws CacheException { + transactionContext.clear(); + final Transaction tx = suspend(); + try { + // Invalidate the local region and then go remote + invalidateRegion(); + Caches.broadcastEvictAll( cache ); + } + finally { + resume( tx ); + } + } - if ( !checkValid() ) { - return null; - } + @Override + public Object get(Object key) throws CacheException { + // If the region is not valid, skip cache store to avoid going remote to retrieve the query. + // The aim of this is to maintain same logic/semantics as when state transfer was configured. + // TODO: Once https://issues.jboss.org/browse/ISPN-835 has been resolved, revert to state transfer and remove workaround + boolean skipCacheStore = false; + if ( !isValid() ) { + skipCacheStore = true; + } - // In Infinispan get doesn't acquire any locks, so no need to suspend the tx. - // In the past, when get operations acquired locks, suspending the tx was a way - // to avoid holding locks that would prevent updates. - // Add a zero (or low) timeout option so we don't block - // waiting for tx's that did a put to commit - if ( skipCacheStore ) { - return getCache.withFlags( Flag.SKIP_CACHE_STORE ).get( key ); - } - else { - return getCache.get( key ); - } - } + if ( !checkValid() ) { + return null; + } - @Override - @SuppressWarnings("unchecked") - public void put(Object key, Object value) throws CacheException { - if ( checkValid() ) { - // Here we don't want to suspend the tx. If we do: - // 1) We might be caching query results that reflect uncommitted - // changes. No tx == no WL on cache node, so other threads - // can prematurely see those query results - // 2) No tx == immediate replication. More overhead, plus we - // spread issue #1 above around the cluster + // In Infinispan get doesn't acquire any locks, so no need to suspend the tx. + // In the past, when get operations acquired locks, suspending the tx was a way + // to avoid holding locks that would prevent updates. + // Add a zero (or low) timeout option so we don't block + // waiting for tx's that did a put to commit + TransactionManager tm = getTransactionManager(); + try { + if (tm != null && tm.getStatus() == Status.STATUS_ACTIVE) { + Transaction transaction = tm.getTransaction(); + if (transaction != null) { + Map map = transactionContext.get(transaction); + if (map != null) { + PostTransactionQueryUpdate update = map.get(key); + if (update != null) { + return update.getValue(); + } + } + } + } + } catch (SystemException e) { + log.trace("Failed to retrieve current transaction status.", e); + } + if ( skipCacheStore ) { + return getCache.withFlags( Flag.SKIP_CACHE_STORE ).get( key ); + } + else { + return getCache.get( key ); + } + } - // Add a zero (or quite low) timeout option so we don't block. - // Ignore any TimeoutException. Basically we forego caching the - // query result in order to avoid blocking. - // Reads are done with suspended tx, so they should not hold the - // lock for long. Not caching the query result is OK, since - // any subsequent read will just see the old result with its - // out-of-date timestamp; that result will be discarded and the - // db query performed again. - putCache.put( key, value ); - } - } + @Override + @SuppressWarnings("unchecked") + public void put(Object key, Object value) throws CacheException { + if ( checkValid() ) { + // See HHH-7898: Even with FAIL_SILENTLY flag, failure to write in transaction + // fails the whole transaction. It is an Infinispan quirk that cannot be fixed + // ISPN-5356 tracks that. This is because if the transaction continued the + // value could be committed on backup owners, including the failed operation, + // and the result would not be consistent. + TransactionManager tm = getTransactionManager(); + Transaction transaction = null; + try { + transaction = tm != null && tm.getStatus() == Status.STATUS_ACTIVE ? tm.getTransaction() : null; + if (transaction != null) { + // no need to synchronize as the transaction will be accessed by only one thread + Map map = transactionContext.get(transaction); + if (map == null) { + transactionContext.put(transaction, map = new HashMap()); + } + PostTransactionQueryUpdate update = map.get(key); + if (update == null) { + update = new PostTransactionQueryUpdate(transaction, key, value); + transaction.registerSynchronization(update); + map.put(key, update); + } else { + update.setValue(value); + } + return; + } + } catch (SystemException e) { + log.trace(e); + } catch (RollbackException e) { + log.error("Cannot register synchronization to rolled back transaction", e); + } + // Here we don't want to suspend the tx. If we do: + // 1) We might be caching query results that reflect uncommitted + // changes. No tx == no WL on cache node, so other threads + // can prematurely see those query results + // 2) No tx == immediate replication. More overhead, plus we + // spread issue #1 above around the cluster + + // Add a zero (or quite low) timeout option so we don't block. + // Ignore any TimeoutException. Basically we forego caching the + // query result in order to avoid blocking. + // Reads are done with suspended tx, so they should not hold the + // lock for long. Not caching the query result is OK, since + // any subsequent read will just see the old result with its + // out-of-date timestamp; that result will be discarded and the + // db query performed again. + putCache.put( key, value ); + } + } + + private class PostTransactionQueryUpdate implements Synchronization { + private final Transaction transaction; + private final Object key; + private Object value; + + public PostTransactionQueryUpdate(Transaction transaction, Object key, Object value) { + this.transaction = transaction; + this.key = key; + this.value = value; + } + + public Object getValue() { + return value; + } + + public void setValue(Object value) { + this.value = value; + } + + @Override + public void beforeCompletion() { + } + + @Override + public void afterCompletion(int status) { + transactionContext.remove(transaction); + if (value == null) { + return; + } + switch (status) { + case Status.STATUS_COMMITTING: + case Status.STATUS_COMMITTED: + TransactionManager tm = getTransactionManager(); + Transaction suspended = null; + try { + suspended = tm.suspend(); + if (putCacheRequiresTransaction) { + tm.begin(); + try { + putCache.put(key, value); + } finally { + tm.commit(); + } + } else { + putCache.put(key, value); + } + } + catch (Exception e) { + // silently fail any exceptions + if (log.isTraceEnabled()) { + log.trace("Exception during query cache update", e); + } + } finally { + if (suspended != null) { + try { + tm.resume(suspended); + } catch (Exception e) { + log.error("Failed to resume suspended transaction " + suspended, e); + } + } + } + break; + default: + break; + } + } + } } diff --git a/hibernate-infinispan/src/main/java/org/hibernate/cache/infinispan/util/Caches.java b/hibernate-infinispan/src/main/java/org/hibernate/cache/infinispan/util/Caches.java index 04a2e3fcfb..8001b850ed 100644 --- a/hibernate-infinispan/src/main/java/org/hibernate/cache/infinispan/util/Caches.java +++ b/hibernate-infinispan/src/main/java/org/hibernate/cache/infinispan/util/Caches.java @@ -103,6 +103,16 @@ public class Caches { } } + public static void withinTx(TransactionManager tm, final Runnable runnable) throws Exception { + withinTx(tm, new Callable() { + @Override + public Void call() throws Exception { + runnable.run(); + return null; + } + }); + } + /** * Transform a given cache into a local cache * diff --git a/hibernate-infinispan/src/main/resources/org/hibernate/cache/infinispan/builder/infinispan-configs.xml b/hibernate-infinispan/src/main/resources/org/hibernate/cache/infinispan/builder/infinispan-configs.xml index beb7811dc8..46d74a5432 100644 --- a/hibernate-infinispan/src/main/resources/org/hibernate/cache/infinispan/builder/infinispan-configs.xml +++ b/hibernate-infinispan/src/main/resources/org/hibernate/cache/infinispan/builder/infinispan-configs.xml @@ -103,11 +103,6 @@ - - - - - - - - \ No newline at end of file diff --git a/hibernate-infinispan/src/test/java/org/hibernate/test/cache/infinispan/AbstractGeneralDataRegionTestCase.java b/hibernate-infinispan/src/test/java/org/hibernate/test/cache/infinispan/AbstractGeneralDataRegionTestCase.java index 54ce8aedc9..cf480c3751 100644 --- a/hibernate-infinispan/src/test/java/org/hibernate/test/cache/infinispan/AbstractGeneralDataRegionTestCase.java +++ b/hibernate-infinispan/src/test/java/org/hibernate/test/cache/infinispan/AbstractGeneralDataRegionTestCase.java @@ -24,9 +24,11 @@ package org.hibernate.test.cache.infinispan; import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.TimeUnit; +import org.hibernate.cfg.Configuration; import org.infinispan.AdvancedCache; -import org.infinispan.transaction.tm.BatchModeTransactionManager; import org.jboss.logging.Logger; import org.junit.Test; @@ -35,10 +37,9 @@ import org.hibernate.cache.infinispan.InfinispanRegionFactory; import org.hibernate.cache.spi.GeneralDataRegion; import org.hibernate.cache.spi.QueryResultsRegion; import org.hibernate.cache.spi.Region; -import org.hibernate.cfg.Configuration; - import org.hibernate.test.cache.infinispan.util.CacheTestUtil; +import static org.hibernate.test.cache.infinispan.util.CacheTestUtil.assertEqualsEventually; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; @@ -55,6 +56,7 @@ public abstract class AbstractGeneralDataRegionTestCase extends AbstractRegionIm protected static final String VALUE1 = "value1"; protected static final String VALUE2 = "value2"; + protected static final String VALUE3 = "value3"; protected Configuration createConfiguration() { return CacheTestUtil.buildConfiguration( "test", InfinispanRegionFactory.class, false, true ); @@ -87,7 +89,7 @@ public abstract class AbstractGeneralDataRegionTestCase extends AbstractRegionIm // Sleep a bit to avoid concurrent FLUSH problem avoidConcurrentFlush(); - GeneralDataRegion localRegion = (GeneralDataRegion) createRegion( + final GeneralDataRegion localRegion = (GeneralDataRegion) createRegion( regionFactory, getStandardRegionName( REGION_PREFIX ), cfg.getProperties(), null ); @@ -99,7 +101,7 @@ public abstract class AbstractGeneralDataRegionTestCase extends AbstractRegionIm getCacheTestSupport() ); - GeneralDataRegion remoteRegion = (GeneralDataRegion) createRegion( + final GeneralDataRegion remoteRegion = (GeneralDataRegion) createRegion( regionFactory, getStandardRegionName( REGION_PREFIX ), cfg.getProperties(), @@ -109,32 +111,43 @@ public abstract class AbstractGeneralDataRegionTestCase extends AbstractRegionIm assertNull( "local is clean", localRegion.get( KEY ) ); assertNull( "remote is clean", remoteRegion.get( KEY ) ); - regionPut(localRegion); - sleep( 1000 ); - assertEquals( VALUE1, localRegion.get( KEY ) ); + regionPut(localRegion, KEY, VALUE1); - // allow async propagation - sleep( 250 ); - Object expected = invalidation ? null : VALUE1; - assertEquals( expected, remoteRegion.get( KEY ) ); + Callable getFromLocalRegion = new Callable() { + @Override + public Object call() throws Exception { + return regionGet(localRegion, KEY); + } + }; + Callable getFromRemoteRegion = new Callable() { + @Override + public Object call() throws Exception { + return regionGet(remoteRegion, KEY); + } + }; - regionEvict(localRegion); + assertEqualsEventually(VALUE1, getFromLocalRegion, 10, TimeUnit.SECONDS); + assertEqualsEventually(VALUE1, getFromRemoteRegion, 10, TimeUnit.SECONDS); - // allow async propagation - sleep( 250 ); - assertEquals( null, localRegion.get( KEY ) ); - assertEquals( null, remoteRegion.get( KEY ) ); + regionEvict(localRegion, KEY); + + assertEqualsEventually(null, getFromLocalRegion, 10, TimeUnit.SECONDS); + assertEqualsEventually(null, getFromRemoteRegion, 10, TimeUnit.SECONDS); } - protected void regionEvict(GeneralDataRegion region) throws Exception { - region.evict(KEY); - } + protected void regionEvict(GeneralDataRegion region, String key) throws Exception { + region.evict(key); + } - protected void regionPut(GeneralDataRegion region) throws Exception { - region.put(KEY, VALUE1); - } + protected void regionPut(GeneralDataRegion region, String key, String value) throws Exception { + region.put(key, value); + } - protected abstract String getStandardRegionName(String regionPrefix); + protected Object regionGet(GeneralDataRegion region, String key) throws Exception { + return region.get(key); + } + + protected abstract String getStandardRegionName(String regionPrefix); /** * Test method for {@link QueryResultsRegion#evictAll()}. @@ -171,7 +184,7 @@ public abstract class AbstractGeneralDataRegionTestCase extends AbstractRegionIm cfg, getCacheTestSupport() ); - AdvancedCache remoteCache = getInfinispanCache( regionFactory ); + AdvancedCache remoteCache = getInfinispanCache( regionFactory ); // Sleep a bit to avoid concurrent FLUSH problem avoidConcurrentFlush(); @@ -192,14 +205,14 @@ public abstract class AbstractGeneralDataRegionTestCase extends AbstractRegionIm assertNull( "local is clean", localRegion.get( KEY ) ); assertNull( "remote is clean", remoteRegion.get( KEY ) ); - regionPut(localRegion); - assertEquals( VALUE1, localRegion.get( KEY ) ); + regionPut(localRegion, KEY, VALUE1); + assertEquals( VALUE1, localRegion.get( KEY ) ); // Allow async propagation sleep( 250 ); - regionPut(remoteRegion); - assertEquals( VALUE1, remoteRegion.get( KEY ) ); + regionPut(remoteRegion, KEY, VALUE1); + assertEquals( VALUE1, remoteRegion.get( KEY ) ); // Allow async propagation sleep( 250 ); @@ -221,13 +234,4 @@ public abstract class AbstractGeneralDataRegionTestCase extends AbstractRegionIm assertEquals( "local is clean", null, localRegion.get( KEY ) ); assertEquals( "remote is clean", null, remoteRegion.get( KEY ) ); } - - protected void rollback() { - try { - BatchModeTransactionManager.getInstance().rollback(); - } - catch (Exception e) { - log.error( e.getMessage(), e ); - } - } -} \ No newline at end of file +} diff --git a/hibernate-infinispan/src/test/java/org/hibernate/test/cache/infinispan/query/QueryRegionImplTestCase.java b/hibernate-infinispan/src/test/java/org/hibernate/test/cache/infinispan/query/QueryRegionImplTestCase.java index 0587c14eaf..8b49f5b54b 100644 --- a/hibernate-infinispan/src/test/java/org/hibernate/test/cache/infinispan/query/QueryRegionImplTestCase.java +++ b/hibernate-infinispan/src/test/java/org/hibernate/test/cache/infinispan/query/QueryRegionImplTestCase.java @@ -23,16 +23,22 @@ */ package org.hibernate.test.cache.infinispan.query; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; import java.util.Properties; import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.CyclicBarrier; import java.util.concurrent.TimeUnit; import junit.framework.AssertionFailedError; -import org.hibernate.cache.infinispan.util.Caches; +import org.hibernate.testing.TestForIssue; import org.infinispan.AdvancedCache; import org.infinispan.notifications.Listener; +import org.infinispan.notifications.cachelistener.annotation.CacheEntryModified; import org.infinispan.notifications.cachelistener.annotation.CacheEntryVisited; +import org.infinispan.notifications.cachelistener.event.CacheEntryModifiedEvent; import org.infinispan.notifications.cachelistener.event.CacheEntryVisitedEvent; import org.infinispan.transaction.tm.BatchModeTransactionManager; import org.infinispan.util.concurrent.IsolationLevel; @@ -49,7 +55,9 @@ import org.hibernate.cfg.Configuration; import org.hibernate.test.cache.infinispan.AbstractGeneralDataRegionTestCase; import org.hibernate.test.cache.infinispan.util.CacheTestUtil; +import org.junit.Test; +import static org.hibernate.cache.infinispan.util.Caches.withinTx; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; @@ -62,6 +70,7 @@ import static org.junit.Assert.assertTrue; */ public class QueryRegionImplTestCase extends AbstractGeneralDataRegionTestCase { private static final Logger log = Logger.getLogger( QueryRegionImplTestCase.class ); + private final BatchModeTransactionManager tm = BatchModeTransactionManager.getInstance(); @Override protected Region createRegion( @@ -77,31 +86,41 @@ public class QueryRegionImplTestCase extends AbstractGeneralDataRegionTestCase { return regionPrefix + "/" + StandardQueryCache.class.getName(); } - @Override - protected void regionPut(final GeneralDataRegion region) throws Exception { - Caches.withinTx(BatchModeTransactionManager.getInstance(), new Callable() { - @Override - public Void call() throws Exception { - region.put(KEY, VALUE1); - return null; - } - }); - } + @Override + protected void regionPut(final GeneralDataRegion region, final String key, final String value) throws Exception { + withinTx(tm, new Callable() { + @Override + public Void call() throws Exception { + region.put(key, value); + return null; + } + }); + } - @Override - protected void regionEvict(final GeneralDataRegion region) throws Exception { - Caches.withinTx(BatchModeTransactionManager.getInstance(), new Callable() { - @Override - public Void call() throws Exception { - region.evict(KEY); - return null; - } - }); - } + @Override + protected void regionEvict(final GeneralDataRegion region, final String key) throws Exception { + withinTx(tm, new Callable() { + @Override + public Void call() throws Exception { + region.evict(key); + return null; + } + }); + } - @Override + @Override + protected Object regionGet(final GeneralDataRegion region, final String key) throws Exception { + return withinTx(tm, new Callable() { + @Override + public Object call() throws Exception { + return region.get(key); + } + }); + } + + @Override protected AdvancedCache getInfinispanCache(InfinispanRegionFactory regionFactory) { - return regionFactory.getCacheManager().getCache( "local-query" ).getAdvancedCache(); + return regionFactory.getCacheManager().getCache( getStandardRegionName( REGION_PREFIX ) ).getAdvancedCache(); } @Override @@ -109,7 +128,8 @@ public class QueryRegionImplTestCase extends AbstractGeneralDataRegionTestCase { return CacheTestUtil.buildCustomQueryCacheConfiguration( "test", "replicated-query" ); } - private void putDoesNotBlockGetTest() throws Exception { + @Test + public void testPutDoesNotBlockGet() throws Exception { Configuration cfg = createConfiguration(); InfinispanRegionFactory regionFactory = CacheTestUtil.startRegionFactory( new StandardServiceRegistryBuilder().applySettings( cfg.getProperties() ).build(), @@ -125,7 +145,7 @@ public class QueryRegionImplTestCase extends AbstractGeneralDataRegionTestCase { cfg.getProperties() ); - region.put( KEY, VALUE1 ); + regionPut(region, KEY, VALUE1); assertEquals( VALUE1, region.get( KEY ) ); final CountDownLatch readerLatch = new CountDownLatch( 1 ); @@ -137,18 +157,19 @@ public class QueryRegionImplTestCase extends AbstractGeneralDataRegionTestCase { @Override public void run() { try { - BatchModeTransactionManager.getInstance().begin(); - log.debug( "Transaction began, get value for key" ); - assertTrue( VALUE2.equals( region.get( KEY ) ) == false ); - BatchModeTransactionManager.getInstance().commit(); + withinTx(tm, new Callable() { + @Override + public Object call() throws Exception { + assertTrue( VALUE2.equals( region.get( KEY ) ) == false ); + return null; + } + }); } catch (AssertionFailedError e) { - holder.a1 = e; - rollback(); + holder.addAssertionFailure(e); } catch (Exception e) { - holder.e1 = e; - rollback(); + holder.addException(e); } finally { readerLatch.countDown(); @@ -160,18 +181,17 @@ public class QueryRegionImplTestCase extends AbstractGeneralDataRegionTestCase { @Override public void run() { try { - BatchModeTransactionManager.getInstance().begin(); - log.debug( "Put value2" ); - region.put( KEY, VALUE2 ); - log.debug( "Put finished for value2, await writer latch" ); - writerLatch.await(); - log.debug( "Writer latch finished" ); - BatchModeTransactionManager.getInstance().commit(); - log.debug( "Transaction committed" ); + withinTx(tm, new Callable() { + @Override + public Object call() throws Exception { + region.put( KEY, VALUE2 ); + writerLatch.await(); + return null; + } + }); } catch (Exception e) { - holder.e2 = e; - rollback(); + holder.addException(e); } finally { completionLatch.countDown(); @@ -187,29 +207,18 @@ public class QueryRegionImplTestCase extends AbstractGeneralDataRegionTestCase { // Start the reader reader.start(); - assertTrue( "Reader finished promptly", readerLatch.await( 1000000000, TimeUnit.MILLISECONDS ) ); + assertTrue( "Reader finished promptly", readerLatch.await( 100, TimeUnit.MILLISECONDS ) ); writerLatch.countDown(); assertTrue( "Reader finished promptly", completionLatch.await( 100, TimeUnit.MILLISECONDS ) ); assertEquals( VALUE2, region.get( KEY ) ); - if ( holder.a1 != null ) { - throw holder.a1; - } - else if ( holder.a2 != null ) { - throw holder.a2; - } - - assertEquals( "writer saw no exceptions", null, holder.e1 ); - assertEquals( "reader saw no exceptions", null, holder.e2 ); + holder.checkExceptions(); } + @Test public void testGetDoesNotBlockPut() throws Exception { - getDoesNotBlockPutTest(); - } - - private void getDoesNotBlockPutTest() throws Exception { Configuration cfg = createConfiguration(); InfinispanRegionFactory regionFactory = CacheTestUtil.startRegionFactory( new StandardServiceRegistryBuilder().applySettings( cfg.getProperties() ).build(), @@ -225,12 +234,11 @@ public class QueryRegionImplTestCase extends AbstractGeneralDataRegionTestCase { cfg.getProperties() ); - region.put( KEY, VALUE1 ); + regionPut(region, KEY, VALUE1); assertEquals( VALUE1, region.get( KEY ) ); // final Fqn rootFqn = getRegionFqn(getStandardRegionName(REGION_PREFIX), REGION_PREFIX); - final AdvancedCache jbc = getInfinispanCache(regionFactory); - + final AdvancedCache cache = getInfinispanCache(regionFactory); final CountDownLatch blockerLatch = new CountDownLatch( 1 ); final CountDownLatch writerLatch = new CountDownLatch( 1 ); final CountDownLatch completionLatch = new CountDownLatch( 1 ); @@ -243,18 +251,19 @@ public class QueryRegionImplTestCase extends AbstractGeneralDataRegionTestCase { // Fqn toBlock = new Fqn(rootFqn, KEY); GetBlocker blocker = new GetBlocker( blockerLatch, KEY ); try { - jbc.addListener( blocker ); - - BatchModeTransactionManager.getInstance().begin(); - region.get( KEY ); - BatchModeTransactionManager.getInstance().commit(); + cache.addListener( blocker ); + withinTx(tm, new Callable() { + @Override + public Object call() throws Exception { + return region.get( KEY ); + } + }); } catch (Exception e) { - holder.e1 = e; - rollback(); + holder.addException(e); } finally { - jbc.removeListener( blocker ); + cache.removeListener( blocker ); } } }; @@ -265,14 +274,10 @@ public class QueryRegionImplTestCase extends AbstractGeneralDataRegionTestCase { public void run() { try { writerLatch.await(); - - BatchModeTransactionManager.getInstance().begin(); - region.put( KEY, VALUE2 ); - BatchModeTransactionManager.getInstance().commit(); + regionPut(region, KEY, VALUE2); } catch (Exception e) { - holder.e2 = e; - rollback(); + holder.addException(e); } finally { completionLatch.countDown(); @@ -283,7 +288,6 @@ public class QueryRegionImplTestCase extends AbstractGeneralDataRegionTestCase { blocker.setDaemon( true ); writer.setDaemon( true ); - boolean unblocked = false; try { blocker.start(); writer.start(); @@ -294,43 +298,186 @@ public class QueryRegionImplTestCase extends AbstractGeneralDataRegionTestCase { assertTrue( "Writer finished promptly", completionLatch.await( 100, TimeUnit.MILLISECONDS ) ); blockerLatch.countDown(); - unblocked = true; - if ( IsolationLevel.REPEATABLE_READ.equals( jbc.getCacheConfiguration().locking().isolationLevel() ) ) { + if ( IsolationLevel.REPEATABLE_READ.equals( cache.getCacheConfiguration().locking().isolationLevel() ) ) { assertEquals( VALUE1, region.get( KEY ) ); } else { assertEquals( VALUE2, region.get( KEY ) ); } - if ( holder.a1 != null ) { - throw holder.a1; - } - else if ( holder.a2 != null ) { - throw holder.a2; - } - - assertEquals( "blocker saw no exceptions", null, holder.e1 ); - assertEquals( "writer saw no exceptions", null, holder.e2 ); + holder.checkExceptions(); } finally { - if ( !unblocked ) { - blockerLatch.countDown(); - } + blockerLatch.countDown(); } } + @Test + @TestForIssue(jiraKey = "HHH-7898") + public void testPutDuringPut() throws Exception { + Configuration cfg = createConfiguration(); + InfinispanRegionFactory regionFactory = CacheTestUtil.startRegionFactory( + new StandardServiceRegistryBuilder().applySettings( cfg.getProperties() ).build(), + cfg, + getCacheTestSupport() + ); + + // Sleep a bit to avoid concurrent FLUSH problem + avoidConcurrentFlush(); + + final QueryResultsRegion region = regionFactory.buildQueryResultsRegion( + getStandardRegionName( REGION_PREFIX ), + cfg.getProperties() + ); + + regionPut(region, KEY, VALUE1); + assertEquals( VALUE1, region.get( KEY ) ); + + final AdvancedCache cache = getInfinispanCache(regionFactory); + final CountDownLatch blockerLatch = new CountDownLatch(1); + final CountDownLatch triggerLatch = new CountDownLatch(1); + final ExceptionHolder holder = new ExceptionHolder(); + + Thread blocking = new Thread() { + @Override + public void run() { + PutBlocker blocker = null; + try { + blocker = new PutBlocker(blockerLatch, triggerLatch, KEY); + cache.addListener(blocker); + regionPut(region, KEY, VALUE2); + } catch (Exception e) { + holder.addException(e); + } finally { + if (blocker != null) { + cache.removeListener(blocker); + } + if (triggerLatch.getCount() > 0) { + triggerLatch.countDown(); + } + } + } + }; + + Thread blocked = new Thread() { + @Override + public void run() { + try { + triggerLatch.await(); + // this should silently fail + regionPut(region, KEY, VALUE3); + } catch (Exception e) { + holder.addException(e); + } + } + }; + + blocking.setName("blocking-thread"); + blocking.start(); + blocked.setName("blocked-thread"); + blocked.start(); + blocked.join(); + blockerLatch.countDown(); + blocking.join(); + + holder.checkExceptions(); + + assertEquals(VALUE2, region.get(KEY)); + } + + @Test + public void testQueryUpdate() throws Exception { + Configuration cfg = createConfiguration(); + InfinispanRegionFactory regionFactory = CacheTestUtil.startRegionFactory( + new StandardServiceRegistryBuilder().applySettings( cfg.getProperties() ).build(), + cfg, + getCacheTestSupport() + ); + + // Sleep a bit to avoid concurrent FLUSH problem + avoidConcurrentFlush(); + + final QueryResultsRegion region = regionFactory.buildQueryResultsRegion( + getStandardRegionName( REGION_PREFIX ), + cfg.getProperties() + ); + + final ExceptionHolder holder = new ExceptionHolder(); + final CyclicBarrier barrier = new CyclicBarrier(2); + regionPut(region, KEY, VALUE1); + + Thread updater = new Thread() { + @Override + public void run() { + try { + withinTx(tm, new Callable() { + @Override + public Void call() throws Exception { + assertEquals(VALUE1, region.get(KEY)); + region.put(KEY, VALUE2); + assertEquals(VALUE2, region.get(KEY)); + barrier.await(5, TimeUnit.SECONDS); + barrier.await(5, TimeUnit.SECONDS); + region.put(KEY, VALUE3); + assertEquals(VALUE3, region.get(KEY)); + barrier.await(5, TimeUnit.SECONDS); + barrier.await(5, TimeUnit.SECONDS); + return null; + } + }); + } catch (AssertionFailedError e) { + holder.addAssertionFailure(e); + barrier.reset(); + } catch (Exception e) { + holder.addException(e); + barrier.reset(); + } + } + }; + + Thread reader = new Thread() { + @Override + public void run() { + try { + withinTx(tm, new Callable() { + @Override + public Void call() throws Exception { + assertEquals(VALUE1, region.get(KEY)); + barrier.await(5, TimeUnit.SECONDS); + assertEquals(VALUE1, region.get(KEY)); + barrier.await(5, TimeUnit.SECONDS); + barrier.await(5, TimeUnit.SECONDS); + assertEquals(VALUE1, region.get(KEY)); + barrier.await(5, TimeUnit.SECONDS); + return null; + } + }); + } catch (AssertionFailedError e) { + holder.addAssertionFailure(e); + barrier.reset(); + } catch (Exception e) { + holder.addException(e); + barrier.reset(); + } + } + }; + + updater.start(); + reader.start(); + updater.join(); + reader.join(); + holder.checkExceptions(); + + assertEquals(VALUE3, regionGet(region, KEY)); + } + @Listener public class GetBlocker { + private final CountDownLatch latch; + private final Object key; - private CountDownLatch latch; - // private Fqn fqn; - private Object key; - - GetBlocker( - CountDownLatch latch, - Object key - ) { + GetBlocker(CountDownLatch latch, Object key) { this.latch = latch; this.key = key; } @@ -348,10 +495,57 @@ public class QueryRegionImplTestCase extends AbstractGeneralDataRegionTestCase { } } + @Listener + public class PutBlocker { + private final CountDownLatch blockLatch, triggerLatch; + private final Object key; + private boolean enabled = true; + + PutBlocker(CountDownLatch blockLatch, CountDownLatch triggerLatch, Object key) { + this.blockLatch = blockLatch; + this.triggerLatch = triggerLatch; + this.key = key; + } + + @CacheEntryModified + public void nodeVisisted(CacheEntryModifiedEvent event) { + // we need isPre since lock is acquired in the commit phase + if ( !event.isPre() && event.getKey().equals( key ) ) { + try { + synchronized (this) { + if (enabled) { + triggerLatch.countDown(); + enabled = false; + blockLatch.await(); + } + } + } + catch (InterruptedException e) { + log.error( "Interrupted waiting for latch", e ); + } + } + } + } + private class ExceptionHolder { - Exception e1; - Exception e2; - AssertionFailedError a1; - AssertionFailedError a2; + private final List exceptions = Collections.synchronizedList(new ArrayList()); + private final List assertionFailures = Collections.synchronizedList(new ArrayList()); + + public void addException(Exception e) { + exceptions.add(e); + } + + public void addAssertionFailure(AssertionFailedError e) { + assertionFailures.add(e); + } + + public void checkExceptions() throws Exception { + for (AssertionFailedError a : assertionFailures) { + throw a; + } + for (Exception e : exceptions) { + throw e; + } + } } } diff --git a/hibernate-infinispan/src/test/java/org/hibernate/test/cache/infinispan/util/CacheTestUtil.java b/hibernate-infinispan/src/test/java/org/hibernate/test/cache/infinispan/util/CacheTestUtil.java index 7803dbcec7..57ffaf085e 100644 --- a/hibernate-infinispan/src/test/java/org/hibernate/test/cache/infinispan/util/CacheTestUtil.java +++ b/hibernate-infinispan/src/test/java/org/hibernate/test/cache/infinispan/util/CacheTestUtil.java @@ -24,12 +24,16 @@ package org.hibernate.test.cache.infinispan.util; import java.util.Properties; +import java.util.concurrent.Callable; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import org.hibernate.cache.infinispan.InfinispanRegionFactory; import org.hibernate.cfg.AvailableSettings; import org.hibernate.cfg.Configuration; import org.hibernate.cfg.Environment; import org.hibernate.cfg.Settings; +import org.hibernate.internal.util.compare.EqualsHelper; import org.hibernate.service.ServiceRegistry; import org.hibernate.test.cache.infinispan.functional.SingleNodeTestCase; @@ -96,6 +100,39 @@ public class CacheTestUtil { testSupport.unregisterFactory(factory); } + /** + * Executes {@link #assertEqualsEventually(Object, Callable, long, TimeUnit)} without time limit. + * @param expected + * @param callable + * @param + */ + public static void assertEqualsEventually(T expected, Callable callable) throws Exception { + assertEqualsEventually(expected, callable, -1, TimeUnit.SECONDS); + } + + /** + * Periodically calls callable and compares returned value with expected value. If the value matches to expected, + * the method returns. If callable throws an exception, this is propagated. If the returned value does not match to + * expected before timeout, {@link TimeoutException} is thrown. + * @param expected + * @param callable + * @param timeout If non-positive, there is no limit. + * @param timeUnit + * @param + */ + public static void assertEqualsEventually(T expected, Callable callable, long timeout, TimeUnit timeUnit) throws Exception { + long now, deadline = timeout <= 0 ? Long.MAX_VALUE : System.currentTimeMillis() + timeUnit.toMillis(timeout); + for (;;) { + T value = callable.call(); + if (EqualsHelper.equals(value, expected)) return; + now = System.currentTimeMillis(); + if (now < deadline) { + Thread.sleep(Math.min(100, deadline - now)); + } else break; + } + throw new TimeoutException(); + } + /** * Prevent instantiation. */