HHH-10182 Backport HHH-7898 for JTA environment only

* this fix works only for JTA transactions! (JDBC transactions behavior
  was not altered)
* removed cluster loader from default configuration since this can
  insert already evicted entries to the cache, and with current version
  of Infinispan its use is not justified (configuration was outdated)
This commit is contained in:
Radim Vansa 2015-10-12 15:46:59 +02:00
parent 5912f65120
commit a42d9444fc
6 changed files with 602 additions and 225 deletions

View File

@ -23,7 +23,12 @@
*/
package org.hibernate.cache.infinispan.query;
import javax.transaction.RollbackException;
import javax.transaction.SystemException;
import javax.transaction.Transaction;
import javax.transaction.TransactionManager;
import javax.transaction.Status;
import javax.transaction.Synchronization;
import org.hibernate.cache.CacheException;
import org.hibernate.cache.infinispan.impl.BaseTransactionalDataRegion;
@ -32,7 +37,16 @@ import org.hibernate.cache.spi.QueryResultsRegion;
import org.hibernate.cache.spi.RegionFactory;
import org.infinispan.AdvancedCache;
import org.infinispan.configuration.cache.TransactionConfiguration;
import org.infinispan.context.Flag;
import org.infinispan.transaction.TransactionMode;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
/**
* Region for caching query results.
@ -42,10 +56,14 @@ import org.infinispan.context.Flag;
* @since 3.5
*/
public class QueryResultsRegionImpl extends BaseTransactionalDataRegion implements QueryResultsRegion {
private static final Log log = LogFactory.getLog( QueryResultsRegionImpl.class );
private final AdvancedCache evictCache;
private final AdvancedCache putCache;
private final AdvancedCache getCache;
private final ConcurrentMap<Transaction, Map<Object, PostTransactionQueryUpdate> > transactionContext
= new ConcurrentHashMap<Transaction, Map<Object, PostTransactionQueryUpdate> >();
private final boolean putCacheRequiresTransaction;
/**
* Query region constructor
@ -67,15 +85,24 @@ public class QueryResultsRegionImpl extends BaseTransactionalDataRegion implemen
Caches.failSilentWriteCache( cache );
this.getCache = Caches.failSilentReadCache( cache );
TransactionConfiguration transactionConfiguration = putCache.getCacheConfiguration().transaction();
boolean transactional = transactionConfiguration.transactionMode() != TransactionMode.NON_TRANSACTIONAL;
this.putCacheRequiresTransaction = transactional && !transactionConfiguration.autoCommit();
}
@Override
public void evict(Object key) throws CacheException {
for (Map<Object, PostTransactionQueryUpdate> map : transactionContext.values()) {
PostTransactionQueryUpdate update = map.remove(key);
update.setValue(null);
}
evictCache.remove( key );
}
@Override
public void evictAll() throws CacheException {
transactionContext.clear();
final Transaction tx = suspend();
try {
// Invalidate the local region and then go remote
@ -106,6 +133,23 @@ public class QueryResultsRegionImpl extends BaseTransactionalDataRegion implemen
// to avoid holding locks that would prevent updates.
// Add a zero (or low) timeout option so we don't block
// waiting for tx's that did a put to commit
TransactionManager tm = getTransactionManager();
try {
if (tm != null && tm.getStatus() == Status.STATUS_ACTIVE) {
Transaction transaction = tm.getTransaction();
if (transaction != null) {
Map<Object, PostTransactionQueryUpdate> map = transactionContext.get(transaction);
if (map != null) {
PostTransactionQueryUpdate update = map.get(key);
if (update != null) {
return update.getValue();
}
}
}
}
} catch (SystemException e) {
log.trace("Failed to retrieve current transaction status.", e);
}
if ( skipCacheStore ) {
return getCache.withFlags( Flag.SKIP_CACHE_STORE ).get( key );
}
@ -118,6 +162,37 @@ public class QueryResultsRegionImpl extends BaseTransactionalDataRegion implemen
@SuppressWarnings("unchecked")
public void put(Object key, Object value) throws CacheException {
if ( checkValid() ) {
// See HHH-7898: Even with FAIL_SILENTLY flag, failure to write in transaction
// fails the whole transaction. It is an Infinispan quirk that cannot be fixed
// ISPN-5356 tracks that. This is because if the transaction continued the
// value could be committed on backup owners, including the failed operation,
// and the result would not be consistent.
TransactionManager tm = getTransactionManager();
Transaction transaction = null;
try {
transaction = tm != null && tm.getStatus() == Status.STATUS_ACTIVE ? tm.getTransaction() : null;
if (transaction != null) {
// no need to synchronize as the transaction will be accessed by only one thread
Map<Object, PostTransactionQueryUpdate> map = transactionContext.get(transaction);
if (map == null) {
transactionContext.put(transaction, map = new HashMap());
}
PostTransactionQueryUpdate update = map.get(key);
if (update == null) {
update = new PostTransactionQueryUpdate(transaction, key, value);
transaction.registerSynchronization(update);
map.put(key, update);
} else {
update.setValue(value);
}
return;
}
} catch (SystemException e) {
log.trace(e);
} catch (RollbackException e) {
log.error("Cannot register synchronization to rolled back transaction", e);
}
// Here we don't want to suspend the tx. If we do:
// 1) We might be caching query results that reflect uncommitted
// changes. No tx == no WL on cache node, so other threads
@ -137,4 +212,71 @@ public class QueryResultsRegionImpl extends BaseTransactionalDataRegion implemen
}
}
private class PostTransactionQueryUpdate implements Synchronization {
private final Transaction transaction;
private final Object key;
private Object value;
public PostTransactionQueryUpdate(Transaction transaction, Object key, Object value) {
this.transaction = transaction;
this.key = key;
this.value = value;
}
public Object getValue() {
return value;
}
public void setValue(Object value) {
this.value = value;
}
@Override
public void beforeCompletion() {
}
@Override
public void afterCompletion(int status) {
transactionContext.remove(transaction);
if (value == null) {
return;
}
switch (status) {
case Status.STATUS_COMMITTING:
case Status.STATUS_COMMITTED:
TransactionManager tm = getTransactionManager();
Transaction suspended = null;
try {
suspended = tm.suspend();
if (putCacheRequiresTransaction) {
tm.begin();
try {
putCache.put(key, value);
} finally {
tm.commit();
}
} else {
putCache.put(key, value);
}
}
catch (Exception e) {
// silently fail any exceptions
if (log.isTraceEnabled()) {
log.trace("Exception during query cache update", e);
}
} finally {
if (suspended != null) {
try {
tm.resume(suspended);
} catch (Exception e) {
log.error("Failed to resume suspended transaction " + suspended, e);
}
}
}
break;
default:
break;
}
}
}
}

View File

@ -103,6 +103,16 @@ public class Caches {
}
}
public static void withinTx(TransactionManager tm, final Runnable runnable) throws Exception {
withinTx(tm, new Callable<Void>() {
@Override
public Void call() throws Exception {
runnable.run();
return null;
}
});
}
/**
* Transform a given cache into a local cache
*

View File

@ -103,11 +103,6 @@
<expiration maxIdle="100000" wakeUpInterval="5000"/>
<transaction transactionMode="TRANSACTIONAL" autoCommit="false"
lockingMode="OPTIMISTIC"/>
<!-- State transfer forces all replication calls to be synchronous,
so for calls to remain async, use a cluster cache loader instead -->
<persistence passivation="false">
<cluster remoteCallTimeout="20000" />
</persistence>
</namedCache>
<!-- Optimized for timestamp caching. A clustered timestamp cache
@ -124,11 +119,6 @@
<expiration wakeUpInterval="0"/>
<!-- Explicitly non transactional -->
<transaction transactionMode="NON_TRANSACTIONAL"/>
<!-- State transfer forces all replication calls to be synchronous,
so for calls to remain async, use a cluster cache loader instead -->
<persistence passivation="false">
<cluster remoteCallTimeout="20000" />
</persistence>
</namedCache>
</infinispan>

View File

@ -24,9 +24,11 @@
package org.hibernate.test.cache.infinispan;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import org.hibernate.cfg.Configuration;
import org.infinispan.AdvancedCache;
import org.infinispan.transaction.tm.BatchModeTransactionManager;
import org.jboss.logging.Logger;
import org.junit.Test;
@ -35,10 +37,9 @@ import org.hibernate.cache.infinispan.InfinispanRegionFactory;
import org.hibernate.cache.spi.GeneralDataRegion;
import org.hibernate.cache.spi.QueryResultsRegion;
import org.hibernate.cache.spi.Region;
import org.hibernate.cfg.Configuration;
import org.hibernate.test.cache.infinispan.util.CacheTestUtil;
import static org.hibernate.test.cache.infinispan.util.CacheTestUtil.assertEqualsEventually;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
@ -55,6 +56,7 @@ public abstract class AbstractGeneralDataRegionTestCase extends AbstractRegionIm
protected static final String VALUE1 = "value1";
protected static final String VALUE2 = "value2";
protected static final String VALUE3 = "value3";
protected Configuration createConfiguration() {
return CacheTestUtil.buildConfiguration( "test", InfinispanRegionFactory.class, false, true );
@ -87,7 +89,7 @@ public abstract class AbstractGeneralDataRegionTestCase extends AbstractRegionIm
// Sleep a bit to avoid concurrent FLUSH problem
avoidConcurrentFlush();
GeneralDataRegion localRegion = (GeneralDataRegion) createRegion(
final GeneralDataRegion localRegion = (GeneralDataRegion) createRegion(
regionFactory,
getStandardRegionName( REGION_PREFIX ), cfg.getProperties(), null
);
@ -99,7 +101,7 @@ public abstract class AbstractGeneralDataRegionTestCase extends AbstractRegionIm
getCacheTestSupport()
);
GeneralDataRegion remoteRegion = (GeneralDataRegion) createRegion(
final GeneralDataRegion remoteRegion = (GeneralDataRegion) createRegion(
regionFactory,
getStandardRegionName( REGION_PREFIX ),
cfg.getProperties(),
@ -109,29 +111,40 @@ public abstract class AbstractGeneralDataRegionTestCase extends AbstractRegionIm
assertNull( "local is clean", localRegion.get( KEY ) );
assertNull( "remote is clean", remoteRegion.get( KEY ) );
regionPut(localRegion);
sleep( 1000 );
assertEquals( VALUE1, localRegion.get( KEY ) );
regionPut(localRegion, KEY, VALUE1);
// allow async propagation
sleep( 250 );
Object expected = invalidation ? null : VALUE1;
assertEquals( expected, remoteRegion.get( KEY ) );
Callable<Object> getFromLocalRegion = new Callable<Object>() {
@Override
public Object call() throws Exception {
return regionGet(localRegion, KEY);
}
};
Callable<Object> getFromRemoteRegion = new Callable<Object>() {
@Override
public Object call() throws Exception {
return regionGet(remoteRegion, KEY);
}
};
regionEvict(localRegion);
assertEqualsEventually(VALUE1, getFromLocalRegion, 10, TimeUnit.SECONDS);
assertEqualsEventually(VALUE1, getFromRemoteRegion, 10, TimeUnit.SECONDS);
// allow async propagation
sleep( 250 );
assertEquals( null, localRegion.get( KEY ) );
assertEquals( null, remoteRegion.get( KEY ) );
regionEvict(localRegion, KEY);
assertEqualsEventually(null, getFromLocalRegion, 10, TimeUnit.SECONDS);
assertEqualsEventually(null, getFromRemoteRegion, 10, TimeUnit.SECONDS);
}
protected void regionEvict(GeneralDataRegion region) throws Exception {
region.evict(KEY);
protected void regionEvict(GeneralDataRegion region, String key) throws Exception {
region.evict(key);
}
protected void regionPut(GeneralDataRegion region) throws Exception {
region.put(KEY, VALUE1);
protected void regionPut(GeneralDataRegion region, String key, String value) throws Exception {
region.put(key, value);
}
protected Object regionGet(GeneralDataRegion region, String key) throws Exception {
return region.get(key);
}
protected abstract String getStandardRegionName(String regionPrefix);
@ -192,13 +205,13 @@ public abstract class AbstractGeneralDataRegionTestCase extends AbstractRegionIm
assertNull( "local is clean", localRegion.get( KEY ) );
assertNull( "remote is clean", remoteRegion.get( KEY ) );
regionPut(localRegion);
regionPut(localRegion, KEY, VALUE1);
assertEquals( VALUE1, localRegion.get( KEY ) );
// Allow async propagation
sleep( 250 );
regionPut(remoteRegion);
regionPut(remoteRegion, KEY, VALUE1);
assertEquals( VALUE1, remoteRegion.get( KEY ) );
// Allow async propagation
@ -221,13 +234,4 @@ public abstract class AbstractGeneralDataRegionTestCase extends AbstractRegionIm
assertEquals( "local is clean", null, localRegion.get( KEY ) );
assertEquals( "remote is clean", null, remoteRegion.get( KEY ) );
}
protected void rollback() {
try {
BatchModeTransactionManager.getInstance().rollback();
}
catch (Exception e) {
log.error( e.getMessage(), e );
}
}
}

View File

@ -23,16 +23,22 @@
*/
package org.hibernate.test.cache.infinispan.query;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import junit.framework.AssertionFailedError;
import org.hibernate.cache.infinispan.util.Caches;
import org.hibernate.testing.TestForIssue;
import org.infinispan.AdvancedCache;
import org.infinispan.notifications.Listener;
import org.infinispan.notifications.cachelistener.annotation.CacheEntryModified;
import org.infinispan.notifications.cachelistener.annotation.CacheEntryVisited;
import org.infinispan.notifications.cachelistener.event.CacheEntryModifiedEvent;
import org.infinispan.notifications.cachelistener.event.CacheEntryVisitedEvent;
import org.infinispan.transaction.tm.BatchModeTransactionManager;
import org.infinispan.util.concurrent.IsolationLevel;
@ -49,7 +55,9 @@ import org.hibernate.cfg.Configuration;
import org.hibernate.test.cache.infinispan.AbstractGeneralDataRegionTestCase;
import org.hibernate.test.cache.infinispan.util.CacheTestUtil;
import org.junit.Test;
import static org.hibernate.cache.infinispan.util.Caches.withinTx;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
@ -62,6 +70,7 @@ import static org.junit.Assert.assertTrue;
*/
public class QueryRegionImplTestCase extends AbstractGeneralDataRegionTestCase {
private static final Logger log = Logger.getLogger( QueryRegionImplTestCase.class );
private final BatchModeTransactionManager tm = BatchModeTransactionManager.getInstance();
@Override
protected Region createRegion(
@ -78,30 +87,40 @@ public class QueryRegionImplTestCase extends AbstractGeneralDataRegionTestCase {
}
@Override
protected void regionPut(final GeneralDataRegion region) throws Exception {
Caches.withinTx(BatchModeTransactionManager.getInstance(), new Callable<Void>() {
protected void regionPut(final GeneralDataRegion region, final String key, final String value) throws Exception {
withinTx(tm, new Callable<Void>() {
@Override
public Void call() throws Exception {
region.put(KEY, VALUE1);
region.put(key, value);
return null;
}
});
}
@Override
protected void regionEvict(final GeneralDataRegion region) throws Exception {
Caches.withinTx(BatchModeTransactionManager.getInstance(), new Callable<Void>() {
protected void regionEvict(final GeneralDataRegion region, final String key) throws Exception {
withinTx(tm, new Callable<Void>() {
@Override
public Void call() throws Exception {
region.evict(KEY);
region.evict(key);
return null;
}
});
}
@Override
protected Object regionGet(final GeneralDataRegion region, final String key) throws Exception {
return withinTx(tm, new Callable<Object>() {
@Override
public Object call() throws Exception {
return region.get(key);
}
});
}
@Override
protected AdvancedCache getInfinispanCache(InfinispanRegionFactory regionFactory) {
return regionFactory.getCacheManager().getCache( "local-query" ).getAdvancedCache();
return regionFactory.getCacheManager().getCache( getStandardRegionName( REGION_PREFIX ) ).getAdvancedCache();
}
@Override
@ -109,7 +128,8 @@ public class QueryRegionImplTestCase extends AbstractGeneralDataRegionTestCase {
return CacheTestUtil.buildCustomQueryCacheConfiguration( "test", "replicated-query" );
}
private void putDoesNotBlockGetTest() throws Exception {
@Test
public void testPutDoesNotBlockGet() throws Exception {
Configuration cfg = createConfiguration();
InfinispanRegionFactory regionFactory = CacheTestUtil.startRegionFactory(
new StandardServiceRegistryBuilder().applySettings( cfg.getProperties() ).build(),
@ -125,7 +145,7 @@ public class QueryRegionImplTestCase extends AbstractGeneralDataRegionTestCase {
cfg.getProperties()
);
region.put( KEY, VALUE1 );
regionPut(region, KEY, VALUE1);
assertEquals( VALUE1, region.get( KEY ) );
final CountDownLatch readerLatch = new CountDownLatch( 1 );
@ -137,18 +157,19 @@ public class QueryRegionImplTestCase extends AbstractGeneralDataRegionTestCase {
@Override
public void run() {
try {
BatchModeTransactionManager.getInstance().begin();
log.debug( "Transaction began, get value for key" );
withinTx(tm, new Callable() {
@Override
public Object call() throws Exception {
assertTrue( VALUE2.equals( region.get( KEY ) ) == false );
BatchModeTransactionManager.getInstance().commit();
return null;
}
});
}
catch (AssertionFailedError e) {
holder.a1 = e;
rollback();
holder.addAssertionFailure(e);
}
catch (Exception e) {
holder.e1 = e;
rollback();
holder.addException(e);
}
finally {
readerLatch.countDown();
@ -160,18 +181,17 @@ public class QueryRegionImplTestCase extends AbstractGeneralDataRegionTestCase {
@Override
public void run() {
try {
BatchModeTransactionManager.getInstance().begin();
log.debug( "Put value2" );
withinTx(tm, new Callable() {
@Override
public Object call() throws Exception {
region.put( KEY, VALUE2 );
log.debug( "Put finished for value2, await writer latch" );
writerLatch.await();
log.debug( "Writer latch finished" );
BatchModeTransactionManager.getInstance().commit();
log.debug( "Transaction committed" );
return null;
}
});
}
catch (Exception e) {
holder.e2 = e;
rollback();
holder.addException(e);
}
finally {
completionLatch.countDown();
@ -187,29 +207,18 @@ public class QueryRegionImplTestCase extends AbstractGeneralDataRegionTestCase {
// Start the reader
reader.start();
assertTrue( "Reader finished promptly", readerLatch.await( 1000000000, TimeUnit.MILLISECONDS ) );
assertTrue( "Reader finished promptly", readerLatch.await( 100, TimeUnit.MILLISECONDS ) );
writerLatch.countDown();
assertTrue( "Reader finished promptly", completionLatch.await( 100, TimeUnit.MILLISECONDS ) );
assertEquals( VALUE2, region.get( KEY ) );
if ( holder.a1 != null ) {
throw holder.a1;
}
else if ( holder.a2 != null ) {
throw holder.a2;
}
assertEquals( "writer saw no exceptions", null, holder.e1 );
assertEquals( "reader saw no exceptions", null, holder.e2 );
holder.checkExceptions();
}
@Test
public void testGetDoesNotBlockPut() throws Exception {
getDoesNotBlockPutTest();
}
private void getDoesNotBlockPutTest() throws Exception {
Configuration cfg = createConfiguration();
InfinispanRegionFactory regionFactory = CacheTestUtil.startRegionFactory(
new StandardServiceRegistryBuilder().applySettings( cfg.getProperties() ).build(),
@ -225,12 +234,11 @@ public class QueryRegionImplTestCase extends AbstractGeneralDataRegionTestCase {
cfg.getProperties()
);
region.put( KEY, VALUE1 );
regionPut(region, KEY, VALUE1);
assertEquals( VALUE1, region.get( KEY ) );
// final Fqn rootFqn = getRegionFqn(getStandardRegionName(REGION_PREFIX), REGION_PREFIX);
final AdvancedCache jbc = getInfinispanCache(regionFactory);
final AdvancedCache cache = getInfinispanCache(regionFactory);
final CountDownLatch blockerLatch = new CountDownLatch( 1 );
final CountDownLatch writerLatch = new CountDownLatch( 1 );
final CountDownLatch completionLatch = new CountDownLatch( 1 );
@ -243,18 +251,19 @@ public class QueryRegionImplTestCase extends AbstractGeneralDataRegionTestCase {
// Fqn toBlock = new Fqn(rootFqn, KEY);
GetBlocker blocker = new GetBlocker( blockerLatch, KEY );
try {
jbc.addListener( blocker );
BatchModeTransactionManager.getInstance().begin();
region.get( KEY );
BatchModeTransactionManager.getInstance().commit();
cache.addListener( blocker );
withinTx(tm, new Callable() {
@Override
public Object call() throws Exception {
return region.get( KEY );
}
});
}
catch (Exception e) {
holder.e1 = e;
rollback();
holder.addException(e);
}
finally {
jbc.removeListener( blocker );
cache.removeListener( blocker );
}
}
};
@ -265,14 +274,10 @@ public class QueryRegionImplTestCase extends AbstractGeneralDataRegionTestCase {
public void run() {
try {
writerLatch.await();
BatchModeTransactionManager.getInstance().begin();
region.put( KEY, VALUE2 );
BatchModeTransactionManager.getInstance().commit();
regionPut(region, KEY, VALUE2);
}
catch (Exception e) {
holder.e2 = e;
rollback();
holder.addException(e);
}
finally {
completionLatch.countDown();
@ -283,7 +288,6 @@ public class QueryRegionImplTestCase extends AbstractGeneralDataRegionTestCase {
blocker.setDaemon( true );
writer.setDaemon( true );
boolean unblocked = false;
try {
blocker.start();
writer.start();
@ -294,43 +298,186 @@ public class QueryRegionImplTestCase extends AbstractGeneralDataRegionTestCase {
assertTrue( "Writer finished promptly", completionLatch.await( 100, TimeUnit.MILLISECONDS ) );
blockerLatch.countDown();
unblocked = true;
if ( IsolationLevel.REPEATABLE_READ.equals( jbc.getCacheConfiguration().locking().isolationLevel() ) ) {
if ( IsolationLevel.REPEATABLE_READ.equals( cache.getCacheConfiguration().locking().isolationLevel() ) ) {
assertEquals( VALUE1, region.get( KEY ) );
}
else {
assertEquals( VALUE2, region.get( KEY ) );
}
if ( holder.a1 != null ) {
throw holder.a1;
}
else if ( holder.a2 != null ) {
throw holder.a2;
}
assertEquals( "blocker saw no exceptions", null, holder.e1 );
assertEquals( "writer saw no exceptions", null, holder.e2 );
holder.checkExceptions();
}
finally {
if ( !unblocked ) {
blockerLatch.countDown();
}
}
@Test
@TestForIssue(jiraKey = "HHH-7898")
public void testPutDuringPut() throws Exception {
Configuration cfg = createConfiguration();
InfinispanRegionFactory regionFactory = CacheTestUtil.startRegionFactory(
new StandardServiceRegistryBuilder().applySettings( cfg.getProperties() ).build(),
cfg,
getCacheTestSupport()
);
// Sleep a bit to avoid concurrent FLUSH problem
avoidConcurrentFlush();
final QueryResultsRegion region = regionFactory.buildQueryResultsRegion(
getStandardRegionName( REGION_PREFIX ),
cfg.getProperties()
);
regionPut(region, KEY, VALUE1);
assertEquals( VALUE1, region.get( KEY ) );
final AdvancedCache cache = getInfinispanCache(regionFactory);
final CountDownLatch blockerLatch = new CountDownLatch(1);
final CountDownLatch triggerLatch = new CountDownLatch(1);
final ExceptionHolder holder = new ExceptionHolder();
Thread blocking = new Thread() {
@Override
public void run() {
PutBlocker blocker = null;
try {
blocker = new PutBlocker(blockerLatch, triggerLatch, KEY);
cache.addListener(blocker);
regionPut(region, KEY, VALUE2);
} catch (Exception e) {
holder.addException(e);
} finally {
if (blocker != null) {
cache.removeListener(blocker);
}
if (triggerLatch.getCount() > 0) {
triggerLatch.countDown();
}
}
}
};
Thread blocked = new Thread() {
@Override
public void run() {
try {
triggerLatch.await();
// this should silently fail
regionPut(region, KEY, VALUE3);
} catch (Exception e) {
holder.addException(e);
}
}
};
blocking.setName("blocking-thread");
blocking.start();
blocked.setName("blocked-thread");
blocked.start();
blocked.join();
blockerLatch.countDown();
blocking.join();
holder.checkExceptions();
assertEquals(VALUE2, region.get(KEY));
}
@Test
public void testQueryUpdate() throws Exception {
Configuration cfg = createConfiguration();
InfinispanRegionFactory regionFactory = CacheTestUtil.startRegionFactory(
new StandardServiceRegistryBuilder().applySettings( cfg.getProperties() ).build(),
cfg,
getCacheTestSupport()
);
// Sleep a bit to avoid concurrent FLUSH problem
avoidConcurrentFlush();
final QueryResultsRegion region = regionFactory.buildQueryResultsRegion(
getStandardRegionName( REGION_PREFIX ),
cfg.getProperties()
);
final ExceptionHolder holder = new ExceptionHolder();
final CyclicBarrier barrier = new CyclicBarrier(2);
regionPut(region, KEY, VALUE1);
Thread updater = new Thread() {
@Override
public void run() {
try {
withinTx(tm, new Callable<Void>() {
@Override
public Void call() throws Exception {
assertEquals(VALUE1, region.get(KEY));
region.put(KEY, VALUE2);
assertEquals(VALUE2, region.get(KEY));
barrier.await(5, TimeUnit.SECONDS);
barrier.await(5, TimeUnit.SECONDS);
region.put(KEY, VALUE3);
assertEquals(VALUE3, region.get(KEY));
barrier.await(5, TimeUnit.SECONDS);
barrier.await(5, TimeUnit.SECONDS);
return null;
}
});
} catch (AssertionFailedError e) {
holder.addAssertionFailure(e);
barrier.reset();
} catch (Exception e) {
holder.addException(e);
barrier.reset();
}
}
};
Thread reader = new Thread() {
@Override
public void run() {
try {
withinTx(tm, new Callable<Void>() {
@Override
public Void call() throws Exception {
assertEquals(VALUE1, region.get(KEY));
barrier.await(5, TimeUnit.SECONDS);
assertEquals(VALUE1, region.get(KEY));
barrier.await(5, TimeUnit.SECONDS);
barrier.await(5, TimeUnit.SECONDS);
assertEquals(VALUE1, region.get(KEY));
barrier.await(5, TimeUnit.SECONDS);
return null;
}
});
} catch (AssertionFailedError e) {
holder.addAssertionFailure(e);
barrier.reset();
} catch (Exception e) {
holder.addException(e);
barrier.reset();
}
}
};
updater.start();
reader.start();
updater.join();
reader.join();
holder.checkExceptions();
assertEquals(VALUE3, regionGet(region, KEY));
}
@Listener
public class GetBlocker {
private final CountDownLatch latch;
private final Object key;
private CountDownLatch latch;
// private Fqn fqn;
private Object key;
GetBlocker(
CountDownLatch latch,
Object key
) {
GetBlocker(CountDownLatch latch, Object key) {
this.latch = latch;
this.key = key;
}
@ -348,10 +495,57 @@ public class QueryRegionImplTestCase extends AbstractGeneralDataRegionTestCase {
}
}
@Listener
public class PutBlocker {
private final CountDownLatch blockLatch, triggerLatch;
private final Object key;
private boolean enabled = true;
PutBlocker(CountDownLatch blockLatch, CountDownLatch triggerLatch, Object key) {
this.blockLatch = blockLatch;
this.triggerLatch = triggerLatch;
this.key = key;
}
@CacheEntryModified
public void nodeVisisted(CacheEntryModifiedEvent event) {
// we need isPre since lock is acquired in the commit phase
if ( !event.isPre() && event.getKey().equals( key ) ) {
try {
synchronized (this) {
if (enabled) {
triggerLatch.countDown();
enabled = false;
blockLatch.await();
}
}
}
catch (InterruptedException e) {
log.error( "Interrupted waiting for latch", e );
}
}
}
}
private class ExceptionHolder {
Exception e1;
Exception e2;
AssertionFailedError a1;
AssertionFailedError a2;
private final List<Exception> exceptions = Collections.synchronizedList(new ArrayList<Exception>());
private final List<AssertionFailedError> assertionFailures = Collections.synchronizedList(new ArrayList<AssertionFailedError>());
public void addException(Exception e) {
exceptions.add(e);
}
public void addAssertionFailure(AssertionFailedError e) {
assertionFailures.add(e);
}
public void checkExceptions() throws Exception {
for (AssertionFailedError a : assertionFailures) {
throw a;
}
for (Exception e : exceptions) {
throw e;
}
}
}
}

View File

@ -24,12 +24,16 @@
package org.hibernate.test.cache.infinispan.util;
import java.util.Properties;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.hibernate.cache.infinispan.InfinispanRegionFactory;
import org.hibernate.cfg.AvailableSettings;
import org.hibernate.cfg.Configuration;
import org.hibernate.cfg.Environment;
import org.hibernate.cfg.Settings;
import org.hibernate.internal.util.compare.EqualsHelper;
import org.hibernate.service.ServiceRegistry;
import org.hibernate.test.cache.infinispan.functional.SingleNodeTestCase;
@ -96,6 +100,39 @@ public class CacheTestUtil {
testSupport.unregisterFactory(factory);
}
/**
* Executes {@link #assertEqualsEventually(Object, Callable, long, TimeUnit)} without time limit.
* @param expected
* @param callable
* @param <T>
*/
public static <T> void assertEqualsEventually(T expected, Callable<T> callable) throws Exception {
assertEqualsEventually(expected, callable, -1, TimeUnit.SECONDS);
}
/**
* Periodically calls callable and compares returned value with expected value. If the value matches to expected,
* the method returns. If callable throws an exception, this is propagated. If the returned value does not match to
* expected before timeout, {@link TimeoutException} is thrown.
* @param expected
* @param callable
* @param timeout If non-positive, there is no limit.
* @param timeUnit
* @param <T>
*/
public static <T> void assertEqualsEventually(T expected, Callable<T> callable, long timeout, TimeUnit timeUnit) throws Exception {
long now, deadline = timeout <= 0 ? Long.MAX_VALUE : System.currentTimeMillis() + timeUnit.toMillis(timeout);
for (;;) {
T value = callable.call();
if (EqualsHelper.equals(value, expected)) return;
now = System.currentTimeMillis();
if (now < deadline) {
Thread.sleep(Math.min(100, deadline - now));
} else break;
}
throw new TimeoutException();
}
/**
* Prevent instantiation.
*/