HHH-7898 Regression on org.hibernate.cache.infinispan.query.QueryResultsRegionImpl.put(Object, Object)

* Moved query cache update to second phase of transaction commit
* Query caches are now recommended to be non-transactional (transactional ones will be slower)
This commit is contained in:
Radim Vansa 2015-07-28 16:56:45 +02:00 committed by Andrea Boriero
parent 4fd7680191
commit 0cb00db3b9
5 changed files with 496 additions and 354 deletions

View File

@ -7,15 +7,33 @@
package org.hibernate.cache.infinispan.query;
import javax.transaction.Transaction;
import javax.transaction.TransactionManager;
import javax.transaction.Status;
import javax.transaction.Synchronization;
import org.hibernate.HibernateException;
import org.hibernate.cache.CacheException;
import org.hibernate.cache.infinispan.impl.BaseTransactionalDataRegion;
import org.hibernate.cache.infinispan.util.Caches;
import org.hibernate.cache.spi.QueryResultsRegion;
import org.hibernate.cache.spi.RegionFactory;
import org.hibernate.engine.spi.SessionImplementor;
import org.hibernate.jdbc.WorkExecutor;
import org.hibernate.jdbc.WorkExecutorVisitable;
import org.hibernate.resource.transaction.TransactionCoordinator;
import org.infinispan.AdvancedCache;
import org.infinispan.configuration.cache.TransactionConfiguration;
import org.infinispan.context.Flag;
import org.infinispan.transaction.TransactionMode;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
/**
* Region for caching query results.
@ -25,10 +43,13 @@ import org.infinispan.context.Flag;
* @since 3.5
*/
public class QueryResultsRegionImpl extends BaseTransactionalDataRegion implements QueryResultsRegion {
private static final Log log = LogFactory.getLog( QueryResultsRegionImpl.class );
private final AdvancedCache evictCache;
private final AdvancedCache putCache;
private final AdvancedCache getCache;
private final ConcurrentMap<SessionImplementor, Map> transactionContext = new ConcurrentHashMap<SessionImplementor, Map>();
private final boolean putCacheRequiresTransaction;
/**
* Query region constructor
@ -50,15 +71,28 @@ public class QueryResultsRegionImpl extends BaseTransactionalDataRegion implemen
Caches.failSilentWriteCache( cache );
this.getCache = Caches.failSilentReadCache( cache );
TransactionConfiguration transactionConfiguration = putCache.getCacheConfiguration().transaction();
boolean transactional = transactionConfiguration.transactionMode() != TransactionMode.NON_TRANSACTIONAL;
this.putCacheRequiresTransaction = transactional && !transactionConfiguration.autoCommit();
// Since we execute the query update explicitly form transaction synchronization, the putCache does not need
// to be transactional anymore (it had to be in the past to prevent revealing uncommitted changes).
if (transactional) {
log.warn("Use non-transactional query caches for best performance!");
}
}
@Override
public void evict(Object key) throws CacheException {
for (Map map : transactionContext.values()) {
map.remove(key);
}
evictCache.remove( key );
}
@Override
public void evictAll() throws CacheException {
transactionContext.clear();
final Transaction tx = suspend();
try {
// Invalidate the local region and then go remote
@ -89,18 +123,42 @@ public class QueryResultsRegionImpl extends BaseTransactionalDataRegion implemen
// to avoid holding locks that would prevent updates.
// Add a zero (or low) timeout option so we don't block
// waiting for tx's that did a put to commit
Object result;
if ( skipCacheStore ) {
return getCache.withFlags( Flag.SKIP_CACHE_STORE ).get( key );
result = getCache.withFlags( Flag.SKIP_CACHE_STORE ).get( key );
}
else {
return getCache.get( key );
result = getCache.get( key );
}
if (result == null) {
Map map = transactionContext.get(session);
if (map != null) {
result = map.get(key);
}
}
return result;
}
@Override
@SuppressWarnings("unchecked")
public void put(SessionImplementor session, Object key, Object value) throws CacheException {
if ( checkValid() ) {
// See HHH-7898: Even with FAIL_SILENTLY flag, failure to write in transaction
// fails the whole transaction. It is an Infinispan quirk that cannot be fixed
// ISPN-5356 tracks that. This is because if the transaction continued the
// value could be committed on backup owners, including the failed operation,
// and the result would not be consistent.
TransactionCoordinator tc = session.getTransactionCoordinator();
if (tc != null && tc.isJoined()) {
tc.getLocalSynchronizations().registerSynchronization(new PostTransactionQueryUpdate(tc, session, key, value));
// no need to synchronize as the transaction will be accessed by only one thread
Map map = transactionContext.get(session);
if (map == null) {
transactionContext.put(session, map = new HashMap());
}
map.put(key, value);
return;
}
// Here we don't want to suspend the tx. If we do:
// 1) We might be caching query results that reflect uncommitted
// changes. No tx == no WL on cache node, so other threads
@ -120,4 +178,52 @@ public class QueryResultsRegionImpl extends BaseTransactionalDataRegion implemen
}
}
private class PostTransactionQueryUpdate implements Synchronization {
private final TransactionCoordinator tc;
private final SessionImplementor session;
private final Object key;
private final Object value;
public PostTransactionQueryUpdate(TransactionCoordinator tc, SessionImplementor session, Object key, Object value) {
this.tc = tc;
this.session = session;
this.key = key;
this.value = value;
}
@Override
public void beforeCompletion() {
}
@Override
public void afterCompletion(int status) {
transactionContext.remove(session);
switch (status) {
case Status.STATUS_COMMITTING:
case Status.STATUS_COMMITTED:
try {
// TODO: isolation without obtaining Connection
tc.createIsolationDelegate().delegateWork(new WorkExecutorVisitable<Void>() {
@Override
public Void accept(WorkExecutor<Void> executor, Connection connection) throws SQLException {
putCache.put(key, value);
return null;
}
}
, putCacheRequiresTransaction);
}
catch (HibernateException e) {
// silently fail any exceptions
if (log.isTraceEnabled()) {
log.trace("Exception during query cache update", e);
}
}
break;
default:
// it would be nicer to react only on ROLLING_BACK and ROLLED_BACK statuses
// but TransactionCoordinator gives us UNKNOWN on rollback
break;
}
}
}
}

View File

@ -87,6 +87,16 @@ public class Caches {
}
}
public static void withinTx(TransactionManager tm, final Runnable runnable) throws Exception {
withinTx(tm, new Callable<Void>() {
@Override
public Void call() throws Exception {
runnable.run();
return null;
}
});
}
/**
* Transform a given cache into a local cache
*

View File

@ -54,7 +54,7 @@
<!-- A config appropriate for query caching. Does not replicate queries. -->
<local-cache name="local-query">
<locking isolation="READ_COMMITTED" concurrency-level="1000" acquire-timeout="15000" striping="false"/>
<transaction mode="NON_XA" locking="OPTIMISTIC" auto-commit="false"/>
<transaction mode="NONE" />
<eviction max-entries="10000" strategy="LRU"/>
<expiration max-idle="100000" interval="5000"/>
</local-cache>
@ -62,7 +62,7 @@
<!-- A query cache that replicates queries. Replication is asynchronous. -->
<replicated-cache name="replicated-query" mode="ASYNC">
<locking isolation="READ_COMMITTED" concurrency-level="1000" acquire-timeout="15000" striping="false"/>
<transaction mode="NON_XA" locking="OPTIMISTIC" auto-commit="false"/>
<transaction mode="NONE" />
<eviction max-entries="10000" strategy="LRU"/>
<expiration max-idle="100000" interval="5000"/>
</replicated-cache>

View File

@ -6,23 +6,39 @@
*/
package org.hibernate.test.cache.infinispan;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import org.hibernate.Session;
import org.hibernate.SessionFactory;
import org.hibernate.Transaction;
import org.hibernate.boot.MetadataSources;
import org.hibernate.boot.registry.StandardServiceRegistry;
import org.hibernate.boot.registry.StandardServiceRegistryBuilder;
import org.hibernate.cache.infinispan.InfinispanRegionFactory;
import org.hibernate.cache.infinispan.impl.BaseGeneralDataRegion;
import org.hibernate.cache.spi.GeneralDataRegion;
import org.hibernate.cache.spi.QueryResultsRegion;
import org.hibernate.cache.spi.Region;
import org.hibernate.cache.spi.RegionFactory;
import org.hibernate.cfg.AvailableSettings;
import org.hibernate.engine.spi.SessionImplementor;
import org.hibernate.resource.transaction.backend.jta.internal.JtaTransactionCoordinatorBuilderImpl;
import org.hibernate.test.cache.infinispan.functional.SingleNodeTestCase;
import org.hibernate.test.cache.infinispan.util.BatchModeJtaPlatform;
import org.hibernate.test.cache.infinispan.util.CacheTestUtil;
import org.infinispan.AdvancedCache;
import org.infinispan.transaction.tm.BatchModeTransactionManager;
import org.jboss.logging.Logger;
import org.junit.Test;
import javax.transaction.TransactionManager;
import static org.hibernate.test.cache.infinispan.util.CacheTestUtil.assertEqualsEventually;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
@ -40,6 +56,9 @@ public abstract class AbstractGeneralDataRegionTestCase extends AbstractRegionIm
protected static final String VALUE1 = "value1";
protected static final String VALUE2 = "value2";
protected static final String VALUE3 = "value3";
protected TransactionManager tm = BatchModeTransactionManager.getInstance();
protected StandardServiceRegistryBuilder createStandardServiceRegistryBuilder() {
return CacheTestUtil.buildBaselineStandardServiceRegistryBuilder(
@ -65,81 +84,88 @@ public abstract class AbstractGeneralDataRegionTestCase extends AbstractRegionIm
evictOrRemoveTest();
}
private void evictOrRemoveTest() throws Exception {
final StandardServiceRegistryBuilder ssrb = createStandardServiceRegistryBuilder();
StandardServiceRegistry registry1 = ssrb.build();
StandardServiceRegistry registry2 = ssrb.build();
protected interface SFRConsumer {
void accept(List<SessionFactory> sessionFactories, List<GeneralDataRegion> regions) throws Exception;
}
protected void withSessionFactoriesAndRegions(int num, SFRConsumer consumer) throws Exception {
StandardServiceRegistryBuilder ssrb = createStandardServiceRegistryBuilder()
.applySetting(AvailableSettings.CACHE_REGION_FACTORY, SingleNodeTestCase.TestInfinispanRegionFactory.class.getName())
.applySetting(AvailableSettings.JTA_PLATFORM, BatchModeJtaPlatform.class.getName())
.applySetting(AvailableSettings.TRANSACTION_COORDINATOR_STRATEGY, JtaTransactionCoordinatorBuilderImpl.class.getName());
Properties properties = CacheTestUtil.toProperties( ssrb.getSettings() );
List<StandardServiceRegistry> registries = new ArrayList<>();
List<SessionFactory> sessionFactories = new ArrayList<>();
List<GeneralDataRegion> regions = new ArrayList<>();
for (int i = 0; i < num; ++i) {
StandardServiceRegistry registry = ssrb.build();
registries.add(registry);
SessionFactory sessionFactory = new MetadataSources(registry).buildMetadata().buildSessionFactory();
sessionFactories.add(sessionFactory);
InfinispanRegionFactory regionFactory = (InfinispanRegionFactory) registry.getService(RegionFactory.class);
GeneralDataRegion region = (GeneralDataRegion) createRegion(
regionFactory,
getStandardRegionName( REGION_PREFIX ),
properties,
null
);
regions.add(region);
}
try {
InfinispanRegionFactory regionFactory = CacheTestUtil.startRegionFactory(
registry1,
getCacheTestSupport()
);
final Properties properties = CacheTestUtil.toProperties( ssrb.getSettings() );
boolean invalidation = false;
// Sleep a bit to avoid concurrent FLUSH problem
avoidConcurrentFlush();
final GeneralDataRegion localRegion = (GeneralDataRegion) createRegion(
regionFactory,
getStandardRegionName( REGION_PREFIX ),
properties,
null
);
regionFactory = CacheTestUtil.startRegionFactory(
registry2,
getCacheTestSupport()
);
final GeneralDataRegion remoteRegion = (GeneralDataRegion) createRegion(
regionFactory,
getStandardRegionName( REGION_PREFIX ),
properties,
null
);
assertNull( "local is clean", localRegion.get(null, KEY ) );
assertNull( "remote is clean", remoteRegion.get(null, KEY ) );
regionPut( localRegion );
Callable<Object> getFromLocalRegion = new Callable<Object>() {
@Override
public Object call() throws Exception {
return localRegion.get(null, KEY);
consumer.accept(sessionFactories, regions);
} finally {
for (SessionFactory sessionFactory : sessionFactories) {
sessionFactory.close();
}
};
Callable<Object> getFromRemoteRegion = new Callable<Object>() {
@Override
public Object call() throws Exception {
return remoteRegion.get(null, KEY);
for (StandardServiceRegistry registry : registries) {
StandardServiceRegistryBuilder.destroy( registry );
}
};
}
}
private void evictOrRemoveTest() throws Exception {
withSessionFactoriesAndRegions(2, ((sessionFactories, regions) -> {
GeneralDataRegion localRegion = regions.get(0);
GeneralDataRegion remoteRegion = regions.get(1);
SessionImplementor localSession = (SessionImplementor) sessionFactories.get(0).openSession();
SessionImplementor remoteSession = (SessionImplementor) sessionFactories.get(1).openSession();
try {
assertNull("local is clean", localRegion.get(localSession, KEY));
assertNull("remote is clean", remoteRegion.get(remoteSession, KEY));
Transaction tx = ((Session) localSession).getTransaction();
tx.begin();
try {
localRegion.put(localSession, KEY, VALUE1);
tx.commit();
} catch (Exception e) {
tx.rollback();
throw e;
}
Callable<Object> getFromLocalRegion = () -> localRegion.get(localSession, KEY);
Callable<Object> getFromRemoteRegion = () -> remoteRegion.get(remoteSession, KEY);
assertEqualsEventually(VALUE1, getFromLocalRegion, 10, TimeUnit.SECONDS);
Object expected = invalidation ? null : VALUE1;
assertEqualsEventually(expected, getFromRemoteRegion, 10, TimeUnit.SECONDS);
assertEqualsEventually(VALUE1, getFromRemoteRegion, 10, TimeUnit.SECONDS);
regionEvict(localRegion);
assertEqualsEventually(null, getFromLocalRegion, 10, TimeUnit.SECONDS);
assertEqualsEventually(null, getFromRemoteRegion, 10, TimeUnit.SECONDS);
} finally {
StandardServiceRegistryBuilder.destroy( registry1 );
StandardServiceRegistryBuilder.destroy( registry2 );
( (Session) localSession).close();
( (Session) remoteSession).close();
}
}));
}
protected void regionEvict(GeneralDataRegion region) throws Exception {
region.evict(KEY);
}
protected void regionPut(GeneralDataRegion region) throws Exception {
region.put(null, KEY, VALUE1);
}
protected abstract String getStandardRegionName(String regionPrefix);
/**
@ -153,45 +179,15 @@ public abstract class AbstractGeneralDataRegionTestCase extends AbstractRegionIm
}
private void evictOrRemoveAllTest(String configName) throws Exception {
final StandardServiceRegistryBuilder ssrb = createStandardServiceRegistryBuilder();
StandardServiceRegistry registry1 = ssrb.build();
StandardServiceRegistry registry2 = ssrb.build();
withSessionFactoriesAndRegions(2, (sessionFactories, regions) -> {
GeneralDataRegion localRegion = regions.get(0);
GeneralDataRegion remoteRegion = regions.get(1);
AdvancedCache localCache = ((BaseGeneralDataRegion) localRegion).getCache();
AdvancedCache remoteCache = ((BaseGeneralDataRegion) remoteRegion).getCache();
SessionImplementor localSession = (SessionImplementor) sessionFactories.get(0).openSession();
SessionImplementor remoteSession = (SessionImplementor) sessionFactories.get(1).openSession();
try {
final Properties properties = CacheTestUtil.toProperties( ssrb.getSettings() );
InfinispanRegionFactory regionFactory = CacheTestUtil.startRegionFactory(
registry1,
getCacheTestSupport()
);
AdvancedCache localCache = getInfinispanCache( regionFactory );
// Sleep a bit to avoid concurrent FLUSH problem
avoidConcurrentFlush();
GeneralDataRegion localRegion = (GeneralDataRegion) createRegion(
regionFactory,
getStandardRegionName( REGION_PREFIX ),
properties,
null
);
regionFactory = CacheTestUtil.startRegionFactory(
registry2,
getCacheTestSupport()
);
AdvancedCache remoteCache = getInfinispanCache( regionFactory );
// Sleep a bit to avoid concurrent FLUSH problem
avoidConcurrentFlush();
GeneralDataRegion remoteRegion = (GeneralDataRegion) createRegion(
regionFactory,
getStandardRegionName( REGION_PREFIX ),
properties,
null
);
Set keys = localCache.keySet();
assertEquals( "No valid children in " + keys, 0, getValidKeyCount( keys ) );
@ -201,13 +197,13 @@ public abstract class AbstractGeneralDataRegionTestCase extends AbstractRegionIm
assertNull( "local is clean", localRegion.get(null, KEY ) );
assertNull( "remote is clean", remoteRegion.get(null, KEY ) );
regionPut(localRegion);
localRegion.put(localSession, KEY, VALUE1);
assertEquals( VALUE1, localRegion.get(null, KEY ) );
// Allow async propagation
sleep( 250 );
regionPut(remoteRegion);
remoteRegion.put(remoteSession, KEY, VALUE1);
assertEquals( VALUE1, remoteRegion.get(null, KEY ) );
// Allow async propagation
@ -229,19 +225,11 @@ public abstract class AbstractGeneralDataRegionTestCase extends AbstractRegionIm
assertEquals( "local is clean", null, localRegion.get(null, KEY ) );
assertEquals( "remote is clean", null, remoteRegion.get(null, KEY ) );
}
finally {
StandardServiceRegistryBuilder.destroy( registry1 );
StandardServiceRegistryBuilder.destroy( registry2 );
}
} finally {
( (Session) localSession).close();
( (Session) remoteSession).close();
}
protected void rollback() {
try {
BatchModeTransactionManager.getInstance().rollback();
}
catch (Exception e) {
log.error( e.getMessage(), e );
}
});
}
}

View File

@ -6,36 +6,44 @@
*/
package org.hibernate.test.cache.infinispan.query;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.hibernate.boot.registry.StandardServiceRegistry;
import org.hibernate.Session;
import org.hibernate.SessionFactory;
import org.hibernate.Transaction;
import org.hibernate.boot.registry.StandardServiceRegistryBuilder;
import org.hibernate.cache.infinispan.InfinispanRegionFactory;
import org.hibernate.cache.infinispan.util.Caches;
import org.hibernate.cache.infinispan.query.QueryResultsRegionImpl;
import org.hibernate.cache.internal.StandardQueryCache;
import org.hibernate.cache.spi.CacheDataDescription;
import org.hibernate.cache.spi.GeneralDataRegion;
import org.hibernate.cache.spi.QueryResultsRegion;
import org.hibernate.cache.spi.Region;
import org.hibernate.engine.spi.SessionImplementor;
import org.hibernate.test.cache.infinispan.AbstractGeneralDataRegionTestCase;
import org.hibernate.test.cache.infinispan.util.CacheTestUtil;
import junit.framework.AssertionFailedError;
import org.hibernate.testing.TestForIssue;
import org.infinispan.AdvancedCache;
import org.infinispan.notifications.Listener;
import org.infinispan.notifications.cachelistener.annotation.CacheEntryModified;
import org.infinispan.notifications.cachelistener.annotation.CacheEntryVisited;
import org.infinispan.notifications.cachelistener.event.CacheEntryModifiedEvent;
import org.infinispan.notifications.cachelistener.event.CacheEntryVisitedEvent;
import org.infinispan.transaction.tm.BatchModeTransactionManager;
import org.infinispan.util.concurrent.IsolationLevel;
import org.jboss.logging.Logger;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;
/**
@ -61,83 +69,45 @@ public class QueryRegionImplTestCase extends AbstractGeneralDataRegionTestCase {
return regionPrefix + "/" + StandardQueryCache.class.getName();
}
@Override
protected void regionPut(final GeneralDataRegion region) throws Exception {
Caches.withinTx(BatchModeTransactionManager.getInstance(), new Callable<Void>() {
@Override
public Void call() throws Exception {
region.put(null, KEY, VALUE1);
return null;
}
});
}
@Override
protected void regionEvict(final GeneralDataRegion region) throws Exception {
Caches.withinTx(BatchModeTransactionManager.getInstance(), new Callable<Void>() {
@Override
public Void call() throws Exception {
region.evict(KEY);
return null;
}
});
}
@Override
protected AdvancedCache getInfinispanCache(InfinispanRegionFactory regionFactory) {
return regionFactory.getCacheManager().getCache( "local-query" ).getAdvancedCache();
return regionFactory.getCacheManager().getCache( getStandardRegionName( REGION_PREFIX ) ).getAdvancedCache();
}
@Override
protected StandardServiceRegistryBuilder createStandardServiceRegistryBuilder() {
return CacheTestUtil.buildCustomQueryCacheStandardServiceRegistryBuilder( "test", "replicated-query" );
return CacheTestUtil.buildCustomQueryCacheStandardServiceRegistryBuilder( REGION_PREFIX, "replicated-query" );
}
private void putDoesNotBlockGetTest() throws Exception {
StandardServiceRegistryBuilder ssrb = createStandardServiceRegistryBuilder();
StandardServiceRegistry registry = ssrb.build();
try {
final Properties properties = CacheTestUtil.toProperties( ssrb.getSettings() );
private interface RegionConsumer {
void accept(SessionFactory sessionFactory, QueryResultsRegion region) throws Exception;
}
InfinispanRegionFactory regionFactory = CacheTestUtil.startRegionFactory(
registry,
getCacheTestSupport()
);
private void withQueryRegion(RegionConsumer callable) throws Exception {
withSessionFactoriesAndRegions(1, (sessionFactories, regions) -> callable.accept(sessionFactories.get(0), (QueryResultsRegion) regions.get(0)));
}
// Sleep a bit to avoid concurrent FLUSH problem
avoidConcurrentFlush();
@Test
public void testPutDoesNotBlockGet() throws Exception {
withQueryRegion((sessionFactory, region) -> {
withSession(sessionFactory, session -> region.put(session, KEY, VALUE1));
assertEquals(VALUE1, callWithSession(sessionFactory, session -> region.get(session, KEY)));
final QueryResultsRegion region = regionFactory.buildQueryResultsRegion(
getStandardRegionName( REGION_PREFIX ),
properties
);
region.put(null, KEY, VALUE1 );
assertEquals( VALUE1, region.get(null, KEY ) );
final CountDownLatch readerLatch = new CountDownLatch( 1 );
final CountDownLatch writerLatch = new CountDownLatch( 1 );
final CountDownLatch completionLatch = new CountDownLatch( 1 );
final CountDownLatch readerLatch = new CountDownLatch(1);
final CountDownLatch writerLatch = new CountDownLatch(1);
final CountDownLatch completionLatch = new CountDownLatch(1);
final ExceptionHolder holder = new ExceptionHolder();
Thread reader = new Thread() {
@Override
public void run() {
try {
BatchModeTransactionManager.getInstance().begin();
log.debug( "Transaction began, get value for key" );
assertTrue( VALUE2.equals( region.get(null, KEY ) ) == false );
BatchModeTransactionManager.getInstance().commit();
}
catch (AssertionFailedError e) {
holder.a1 = e;
rollback();
}
catch (Exception e) {
holder.e1 = e;
rollback();
}
finally {
assertNotEquals(VALUE2, callWithSession(sessionFactory, session-> region.get(session, KEY)));
} catch (AssertionFailedError e) {
holder.addAssertionFailure(e);
} catch (Exception e) {
holder.addException(e);
} finally {
readerLatch.countDown();
}
}
@ -147,18 +117,74 @@ public class QueryRegionImplTestCase extends AbstractGeneralDataRegionTestCase {
@Override
public void run() {
try {
BatchModeTransactionManager.getInstance().begin();
log.debug( "Put value2" );
region.put(null, KEY, VALUE2 );
log.debug( "Put finished for value2, await writer latch" );
withSession(sessionFactory, session -> {
region.put((SessionImplementor) session, KEY, VALUE2);
writerLatch.await();
log.debug( "Writer latch finished" );
BatchModeTransactionManager.getInstance().commit();
log.debug( "Transaction committed" );
});
} catch (Exception e) {
holder.addException(e);
} finally {
completionLatch.countDown();
}
}
};
reader.setDaemon(true);
writer.setDaemon(true);
writer.start();
assertFalse("Writer is blocking", completionLatch.await(100, TimeUnit.MILLISECONDS));
// Start the reader
reader.start();
assertTrue("Reader finished promptly", readerLatch.await(1000000000, TimeUnit.MILLISECONDS));
writerLatch.countDown();
assertTrue("Reader finished promptly", completionLatch.await(100, TimeUnit.MILLISECONDS));
assertEquals(VALUE2, region.get(null, KEY));
});
}
@Test
public void testGetDoesNotBlockPut() throws Exception {
withQueryRegion((sessionFactory, region) -> {
withSession(sessionFactory, session -> region.put( session, KEY, VALUE1 ));
assertEquals(VALUE1, callWithSession(sessionFactory, session -> region.get( session, KEY )));
final AdvancedCache cache = ((QueryResultsRegionImpl) region).getCache();
final CountDownLatch blockerLatch = new CountDownLatch( 1 );
final CountDownLatch writerLatch = new CountDownLatch( 1 );
final CountDownLatch completionLatch = new CountDownLatch( 1 );
final ExceptionHolder holder = new ExceptionHolder();
Thread reader = new Thread() {
@Override
public void run() {
GetBlocker blocker = new GetBlocker( blockerLatch, KEY );
try {
cache.addListener( blocker );
withSession(sessionFactory, session -> region.get(session, KEY ));
}
catch (Exception e) {
holder.e2 = e;
rollback();
holder.addException(e);
}
finally {
cache.removeListener( blocker );
}
}
};
Thread writer = new Thread() {
@Override
public void run() {
try {
writerLatch.await();
withSession(sessionFactory, session -> region.put( session, KEY, VALUE2 ));
}
catch (Exception e) {
holder.addException(e);
}
finally {
completionLatch.countDown();
@ -169,119 +195,12 @@ public class QueryRegionImplTestCase extends AbstractGeneralDataRegionTestCase {
reader.setDaemon( true );
writer.setDaemon( true );
writer.start();
assertFalse( "Writer is blocking", completionLatch.await( 100, TimeUnit.MILLISECONDS ) );
// Start the reader
reader.start();
assertTrue( "Reader finished promptly", readerLatch.await( 1000000000, TimeUnit.MILLISECONDS ) );
writerLatch.countDown();
assertTrue( "Reader finished promptly", completionLatch.await( 100, TimeUnit.MILLISECONDS ) );
assertEquals( VALUE2, region.get(null, KEY ) );
if ( holder.a1 != null ) {
throw holder.a1;
}
else if ( holder.a2 != null ) {
throw holder.a2;
}
assertEquals( "writer saw no exceptions", null, holder.e1 );
assertEquals( "reader saw no exceptions", null, holder.e2 );
}
finally {
StandardServiceRegistryBuilder.destroy( registry );
}
}
public void testGetDoesNotBlockPut() throws Exception {
getDoesNotBlockPutTest();
}
private void getDoesNotBlockPutTest() throws Exception {
StandardServiceRegistryBuilder ssrb = createStandardServiceRegistryBuilder();
StandardServiceRegistry registry = ssrb.build();
try {
final Properties properties = CacheTestUtil.toProperties( ssrb.getSettings() );
InfinispanRegionFactory regionFactory = CacheTestUtil.startRegionFactory(
registry,
getCacheTestSupport()
);
// Sleep a bit to avoid concurrent FLUSH problem
avoidConcurrentFlush();
final QueryResultsRegion region = regionFactory.buildQueryResultsRegion(
getStandardRegionName( REGION_PREFIX ),
properties
);
region.put(null, KEY, VALUE1 );
assertEquals( VALUE1, region.get(null, KEY ) );
// final Fqn rootFqn = getRegionFqn(getStandardRegionName(REGION_PREFIX), REGION_PREFIX);
final AdvancedCache jbc = getInfinispanCache(regionFactory);
final CountDownLatch blockerLatch = new CountDownLatch( 1 );
final CountDownLatch writerLatch = new CountDownLatch( 1 );
final CountDownLatch completionLatch = new CountDownLatch( 1 );
final ExceptionHolder holder = new ExceptionHolder();
Thread blocker = new Thread() {
@Override
public void run() {
// Fqn toBlock = new Fqn(rootFqn, KEY);
GetBlocker blocker = new GetBlocker( blockerLatch, KEY );
try {
jbc.addListener( blocker );
BatchModeTransactionManager.getInstance().begin();
region.get(null, KEY );
BatchModeTransactionManager.getInstance().commit();
}
catch (Exception e) {
holder.e1 = e;
rollback();
}
finally {
jbc.removeListener( blocker );
}
}
};
Thread writer = new Thread() {
@Override
public void run() {
try {
writerLatch.await();
BatchModeTransactionManager.getInstance().begin();
region.put(null, KEY, VALUE2 );
BatchModeTransactionManager.getInstance().commit();
}
catch (Exception e) {
holder.e2 = e;
rollback();
}
finally {
completionLatch.countDown();
}
}
};
blocker.setDaemon( true );
writer.setDaemon( true );
boolean unblocked = false;
try {
blocker.start();
reader.start();
writer.start();
assertFalse( "Blocker is blocking", completionLatch.await( 100, TimeUnit.MILLISECONDS ) );
assertFalse( "Reader is blocking", completionLatch.await( 100, TimeUnit.MILLISECONDS ) );
// Start the writer
writerLatch.countDown();
assertTrue( "Writer finished promptly", completionLatch.await( 100, TimeUnit.MILLISECONDS ) );
@ -289,45 +208,117 @@ public class QueryRegionImplTestCase extends AbstractGeneralDataRegionTestCase {
blockerLatch.countDown();
unblocked = true;
if ( IsolationLevel.REPEATABLE_READ.equals( jbc.getCacheConfiguration().locking().isolationLevel() ) ) {
assertEquals( VALUE1, region.get(null, KEY ) );
if ( IsolationLevel.REPEATABLE_READ.equals( cache.getCacheConfiguration().locking().isolationLevel() ) ) {
assertEquals( VALUE1, callWithSession(sessionFactory, session -> region.get( session, KEY )) );
}
else {
assertEquals( VALUE2, region.get(null, KEY ) );
assertEquals( VALUE2, callWithSession(sessionFactory, session -> region.get( session, KEY )) );
}
if ( holder.a1 != null ) {
throw holder.a1;
}
else if ( holder.a2 != null ) {
throw holder.a2;
}
assertEquals( "blocker saw no exceptions", null, holder.e1 );
assertEquals( "writer saw no exceptions", null, holder.e2 );
holder.checkExceptions();
}
finally {
if ( !unblocked ) {
blockerLatch.countDown();
}
}
});
}
finally {
StandardServiceRegistryBuilder.destroy( registry );
protected interface SessionConsumer {
void accept(SessionImplementor session) throws Exception;
}
protected interface SessionCallable<T> {
T call(SessionImplementor session) throws Exception;
}
protected <T> T callWithSession(SessionFactory sessionFactory, SessionCallable<T> callable) throws Exception {
Session session = sessionFactory.openSession();
Transaction tx = session.getTransaction();
tx.begin();
try {
T retval = callable.call((SessionImplementor) session);
tx.commit();
return retval;
} catch (Exception e) {
tx.rollback();
throw e;
} finally {
session.close();
}
}
protected void withSession(SessionFactory sessionFactory, SessionConsumer consumer) throws Exception {
callWithSession(sessionFactory, session -> { consumer.accept(session); return null;} );
}
@Test
@TestForIssue(jiraKey = "HHH-7898")
public void testPutDuringPut() throws Exception {
withQueryRegion((sessionFactory, region) -> {
withSession(sessionFactory, session -> region.put(session, KEY, VALUE1));
assertEquals(VALUE1, callWithSession(sessionFactory, session -> region.get(session, KEY) ));
final AdvancedCache cache = ((QueryResultsRegionImpl) region).getCache();
CountDownLatch blockerLatch = new CountDownLatch(1);
CountDownLatch triggerLatch = new CountDownLatch(1);
ExceptionHolder holder = new ExceptionHolder();
Thread blocking = new Thread() {
@Override
public void run() {
PutBlocker blocker = null;
try {
blocker = new PutBlocker(blockerLatch, triggerLatch, KEY);
cache.addListener(blocker);
withSession(sessionFactory, session -> region.put(session, KEY, VALUE2));
} catch (Exception e) {
holder.addException(e);
} finally {
if (blocker != null) {
cache.removeListener(blocker);
}
if (triggerLatch.getCount() > 0) {
triggerLatch.countDown();
}
}
}
};
Thread blocked = new Thread() {
@Override
public void run() {
try {
triggerLatch.await();
// this should silently fail
withSession(sessionFactory, session -> region.put(session, KEY, VALUE3));
} catch (Exception e) {
holder.addException(e);
}
}
};
blocking.setName("blocking-thread");
blocking.start();
blocked.setName("blocked-thread");
blocked.start();
blocked.join();
blockerLatch.countDown();
blocking.join();
holder.checkExceptions();
assertEquals(VALUE2, callWithSession(sessionFactory, session -> region.get(session, KEY)));
});
}
@Listener
public class GetBlocker {
private final CountDownLatch latch;
private final Object key;
private CountDownLatch latch;
// private Fqn fqn;
private Object key;
GetBlocker(
CountDownLatch latch,
Object key
) {
GetBlocker(CountDownLatch latch, Object key) {
this.latch = latch;
this.key = key;
}
@ -345,10 +336,57 @@ public class QueryRegionImplTestCase extends AbstractGeneralDataRegionTestCase {
}
}
@Listener
public class PutBlocker {
private final CountDownLatch blockLatch, triggerLatch;
private final Object key;
private boolean enabled = true;
PutBlocker(CountDownLatch blockLatch, CountDownLatch triggerLatch, Object key) {
this.blockLatch = blockLatch;
this.triggerLatch = triggerLatch;
this.key = key;
}
@CacheEntryModified
public void nodeVisisted(CacheEntryModifiedEvent event) {
// we need isPre since lock is acquired in the commit phase
if ( !event.isPre() && event.getKey().equals( key ) ) {
try {
synchronized (this) {
if (enabled) {
triggerLatch.countDown();
enabled = false;
blockLatch.await();
}
}
}
catch (InterruptedException e) {
log.error( "Interrupted waiting for latch", e );
}
}
}
}
private class ExceptionHolder {
Exception e1;
Exception e2;
AssertionFailedError a1;
AssertionFailedError a2;
private final List<Exception> exceptions = Collections.synchronizedList(new ArrayList<>());
private final List<AssertionFailedError> assertionFailures = Collections.synchronizedList(new ArrayList<>());
public void addException(Exception e) {
exceptions.add(e);
}
public void addAssertionFailure(AssertionFailedError e) {
assertionFailures.add(e);
}
public void checkExceptions() throws Exception {
for (AssertionFailedError a : assertionFailures) {
throw a;
}
for (Exception e : exceptions) {
throw e;
}
}
}
}