HHH-11344 Testsuite speed-up

* reduce number of setups (@Before -> @BeforeClassOnce)
* remove sleeps related to JGroups flush (issue in a 6+ years old version)
* do not create new cache manager in CollectionRegionAccessStrategyTest#doPutFromLoadRemoveDoesNotProduceStaleDataInvalidation
* Share cache manager in some tests
* Replace system time with mocked time service where possible
* Replace sleeps with synchronization
* Disabled ConcurrentWriteTest.testMany (this is a stress test)
This commit is contained in:
Radim Vansa 2016-11-30 10:48:46 +01:00
parent 4ceb71f08a
commit a21706bf02
24 changed files with 474 additions and 536 deletions

View File

@ -102,6 +102,8 @@ public class PutFromLoadValidator {
*/
private final AdvancedCache cache;
private final InfinispanRegionFactory regionFactory;
/**
* Injected interceptor
*/
@ -139,6 +141,7 @@ public class PutFromLoadValidator {
* @param cacheManager where to find a cache to store pending put information
*/
public PutFromLoadValidator(AdvancedCache cache, InfinispanRegionFactory regionFactory, EmbeddedCacheManager cacheManager) {
this.regionFactory = regionFactory;
Configuration cacheConfiguration = cache.getCacheConfiguration();
Configuration pendingPutsConfiguration = regionFactory.getPendingPutsCacheConfiguration();
ConfigurationBuilder configurationBuilder = new ConfigurationBuilder();
@ -418,7 +421,7 @@ public class PutFromLoadValidator {
log.trace("Started invalidating region " + cache.getName());
}
boolean ok = true;
long now = System.currentTimeMillis();
long now = regionFactory.nextTimestamp();
// deny all puts until endInvalidatingRegion is called; at that time the region should be already
// in INVALID state, therefore all new requests should be blocked and ongoing should fail by timestamp
synchronized (this) {
@ -459,7 +462,7 @@ public class PutFromLoadValidator {
public void endInvalidatingRegion() {
synchronized (this) {
if (--regionInvalidations == 0) {
regionInvalidationTimestamp = System.currentTimeMillis();
regionInvalidationTimestamp = regionFactory.nextTimestamp();
if (trace) {
log.tracef("Finished invalidating region %s at %d", cache.getName(), regionInvalidationTimestamp);
}
@ -567,7 +570,7 @@ public class PutFromLoadValidator {
}
continue;
}
long now = System.currentTimeMillis();
long now = regionFactory.nextTimestamp();
pending.invalidate(now);
pending.addInvalidator(lockOwner, valueForPFER, now);
}
@ -608,7 +611,7 @@ public class PutFromLoadValidator {
}
if (pending.acquireLock(60, TimeUnit.SECONDS)) {
try {
long now = System.currentTimeMillis();
long now = regionFactory.nextTimestamp();
pending.removeInvalidator(lockOwner, key, now, doPFER);
// we can't remove the pending put yet because we wait for naked puts
// pendingPuts should be configured with maxIdle time so won't have memory leak
@ -803,7 +806,8 @@ public class PutFromLoadValidator {
*/
private void gc() {
assert fullMap != null;
long now = System.currentTimeMillis();
long now = regionFactory.nextTimestamp();
log.tracef("Contains %d, doing GC at %d, expiration %d", size(), now, expirationPeriod);
for ( Iterator<PendingPut> it = fullMap.values().iterator(); it.hasNext(); ) {
PendingPut pp = it.next();
if (pp.gc(now, expirationPeriod)) {

View File

@ -7,11 +7,10 @@ import org.hibernate.cache.spi.access.SoftLock;
import org.hibernate.engine.spi.SharedSessionContractImplementor;
import org.hibernate.internal.util.compare.ComparableComparator;
import org.hibernate.test.cache.infinispan.util.InfinispanTestingSetup;
import org.hibernate.test.cache.infinispan.util.TestingKeyFactory;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.hibernate.testing.AfterClassOnce;
import org.hibernate.testing.BeforeClassOnce;
import org.infinispan.test.fwk.TestResourceTracker;
import org.junit.Test;
import static org.junit.Assert.assertNull;
@ -21,8 +20,6 @@ import static org.mockito.Mockito.mock;
* @author Radim Vansa &lt;rvansa@redhat.com&gt;
*/
public abstract class AbstractExtraAPITest<S extends RegionAccessStrategy> extends AbstractNonFunctionalTest {
@Rule
public InfinispanTestingSetup infinispanTestIdentifier = new InfinispanTestingSetup();
public static final String REGION_NAME = "test/com.foo.test";
public static final Object KEY = TestingKeyFactory.generateCollectionCacheKey( "KEY" );
@ -33,24 +30,23 @@ public abstract class AbstractExtraAPITest<S extends RegionAccessStrategy> exten
protected S accessStrategy;
protected NodeEnvironment environment;
@Before
@BeforeClassOnce
public final void prepareLocalAccessStrategy() throws Exception {
TestResourceTracker.testStarted(getClass().getSimpleName());
environment = new NodeEnvironment( createStandardServiceRegistryBuilder() );
environment.prepare();
// Sleep a bit to avoid concurrent FLUSH problem
avoidConcurrentFlush();
accessStrategy = getAccessStrategy();
}
protected abstract S getAccessStrategy();
@After
@AfterClassOnce
public final void releaseLocalAccessStrategy() throws Exception {
if ( environment != null ) {
environment.release();
}
TestResourceTracker.testFinished(getClass().getSimpleName());
}
@Test

View File

@ -7,13 +7,13 @@
package org.hibernate.test.cache.infinispan;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.hibernate.Session;
import org.hibernate.SessionFactory;
import org.hibernate.Transaction;
import org.hibernate.boot.MetadataSources;
@ -21,24 +21,28 @@ import org.hibernate.boot.registry.StandardServiceRegistry;
import org.hibernate.boot.registry.StandardServiceRegistryBuilder;
import org.hibernate.cache.infinispan.InfinispanRegionFactory;
import org.hibernate.cache.infinispan.impl.BaseGeneralDataRegion;
import org.hibernate.cache.infinispan.impl.BaseRegion;
import org.hibernate.cache.spi.GeneralDataRegion;
import org.hibernate.cache.spi.QueryResultsRegion;
import org.hibernate.cache.spi.Region;
import org.hibernate.cache.spi.RegionFactory;
import org.hibernate.cache.spi.access.AccessType;
import org.hibernate.cfg.AvailableSettings;
import org.hibernate.engine.spi.SharedSessionContractImplementor;
import org.hibernate.test.cache.infinispan.util.CacheTestUtil;
import org.hibernate.test.cache.infinispan.util.ExpectingInterceptor;
import org.hibernate.test.cache.infinispan.util.TestInfinispanRegionFactory;
import org.infinispan.commands.write.PutKeyValueCommand;
import org.infinispan.commands.write.RemoveCommand;
import org.infinispan.configuration.cache.CacheMode;
import org.junit.Test;
import org.infinispan.AdvancedCache;
import org.jboss.logging.Logger;
import static org.hibernate.test.cache.infinispan.util.CacheTestUtil.assertEqualsEventually;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
/**
* Base class for tests of QueryResultsRegion and TimestampsRegion.
@ -47,14 +51,18 @@ import static org.junit.Assert.assertNull;
* @since 3.5
*/
public abstract class AbstractGeneralDataRegionTest extends AbstractRegionImplTest {
private static final Logger log = Logger.getLogger( AbstractGeneralDataRegionTest.class );
protected static final String KEY = "Key";
protected static final String VALUE1 = "value1";
protected static final String VALUE2 = "value2";
protected static final String VALUE3 = "value3";
@Override
public List<Object[]> getCacheModeParameters() {
// the actual cache mode and access type is irrelevant for the general data regions
return Arrays.<Object[]>asList(new Object[]{ CacheMode.INVALIDATION_SYNC, AccessType.TRANSACTIONAL });
}
@Override
protected void putInRegion(Region region, Object key, Object value) {
((GeneralDataRegion) region).put(null, key, value );
@ -65,11 +73,6 @@ public abstract class AbstractGeneralDataRegionTest extends AbstractRegionImplTe
((GeneralDataRegion) region).evict( key );
}
@Test
public void testEvict() throws Exception {
evictOrRemoveTest();
}
protected interface SFRConsumer {
void accept(List<SessionFactory> sessionFactories, List<GeneralDataRegion> regions) throws Exception;
}
@ -109,17 +112,26 @@ public abstract class AbstractGeneralDataRegionTest extends AbstractRegionImplTe
}
}
private void evictOrRemoveTest() throws Exception {
@Test
public void testEvict() throws Exception {
withSessionFactoriesAndRegions(2, ((sessionFactories, regions) -> {
GeneralDataRegion localRegion = regions.get(0);
GeneralDataRegion remoteRegion = regions.get(1);
SharedSessionContractImplementor localSession = (SharedSessionContractImplementor) sessionFactories.get(0).openSession();
SharedSessionContractImplementor remoteSession = (SharedSessionContractImplementor) sessionFactories.get(1).openSession();
AdvancedCache localCache = ((BaseRegion) localRegion).getCache();
AdvancedCache remoteCache = ((BaseRegion) remoteRegion).getCache();
try {
assertNull("local is clean", localRegion.get(localSession, KEY));
assertNull("remote is clean", remoteRegion.get(remoteSession, KEY));
Transaction tx = ((Session) localSession).getTransaction();
// If this node is backup owner, it will see the update once as originator and then when getting the value from primary
boolean isLocalNodeBackupOwner = localCache.getDistributionManager().locate(KEY).indexOf(localCache.getCacheManager().getAddress()) > 0;
CountDownLatch insertLatch = new CountDownLatch(isLocalNodeBackupOwner ? 3 : 2);
ExpectingInterceptor.get(localCache).when((ctx, cmd) -> cmd instanceof PutKeyValueCommand).countDown(insertLatch);
ExpectingInterceptor.get(remoteCache).when((ctx, cmd) -> cmd instanceof PutKeyValueCommand).countDown(insertLatch);
Transaction tx = localSession.getTransaction();
tx.begin();
try {
localRegion.put(localSession, KEY, VALUE1);
@ -129,19 +141,24 @@ public abstract class AbstractGeneralDataRegionTest extends AbstractRegionImplTe
throw e;
}
Callable<Object> getFromLocalRegion = () -> localRegion.get(localSession, KEY);
Callable<Object> getFromRemoteRegion = () -> remoteRegion.get(remoteSession, KEY);
assertTrue(insertLatch.await(2, TimeUnit.SECONDS));
assertEquals(VALUE1, localRegion.get(localSession, KEY));
assertEquals(VALUE1, remoteRegion.get(remoteSession, KEY));
assertEqualsEventually(VALUE1, getFromLocalRegion, 10, TimeUnit.SECONDS);
assertEqualsEventually(VALUE1, getFromRemoteRegion, 10, TimeUnit.SECONDS);
CountDownLatch removeLatch = new CountDownLatch(isLocalNodeBackupOwner ? 3 : 2);
ExpectingInterceptor.get(localCache).when((ctx, cmd) -> cmd instanceof RemoveCommand).countDown(removeLatch);
ExpectingInterceptor.get(remoteCache).when((ctx, cmd) -> cmd instanceof RemoveCommand).countDown(removeLatch);
regionEvict(localRegion);
assertEqualsEventually(null, getFromLocalRegion, 10, TimeUnit.SECONDS);
assertEqualsEventually(null, getFromRemoteRegion, 10, TimeUnit.SECONDS);
assertTrue(removeLatch.await(2, TimeUnit.SECONDS));
assertEquals(null, localRegion.get(localSession, KEY));
assertEquals(null, remoteRegion.get(remoteSession, KEY));
} finally {
( (Session) localSession).close();
( (Session) remoteSession).close();
localSession.close();
remoteSession.close();
ExpectingInterceptor.cleanup(localCache, remoteCache);
}
}));
}
@ -159,10 +176,6 @@ public abstract class AbstractGeneralDataRegionTest extends AbstractRegionImplTe
* CollectionRegionAccessStrategy API.
*/
public void testEvictAll() throws Exception {
evictOrRemoveAllTest( "entity" );
}
private void evictOrRemoveAllTest(String configName) throws Exception {
withSessionFactoriesAndRegions(2, (sessionFactories, regions) -> {
GeneralDataRegion localRegion = regions.get(0);
GeneralDataRegion remoteRegion = regions.get(1);
@ -184,19 +197,11 @@ public abstract class AbstractGeneralDataRegionTest extends AbstractRegionImplTe
localRegion.put(localSession, KEY, VALUE1);
assertEquals( VALUE1, localRegion.get(null, KEY ) );
// Allow async propagation
sleep( 250 );
remoteRegion.put(remoteSession, KEY, VALUE1);
assertEquals( VALUE1, remoteRegion.get(null, KEY ) );
// Allow async propagation
sleep( 250 );
localRegion.evictAll();
// allow async propagation
sleep( 250 );
// This should re-establish the region root node in the optimistic case
assertNull( localRegion.get(null, KEY ) );
localKeys = localCache.keySet();
@ -212,8 +217,8 @@ public abstract class AbstractGeneralDataRegionTest extends AbstractRegionImplTe
assertEquals( "local is clean", null, localRegion.get(null, KEY ) );
assertEquals( "remote is clean", null, remoteRegion.get(null, KEY ) );
} finally {
( (Session) localSession).close();
( (Session) remoteSession).close();
localSession.close();
remoteSession.close();
}
});

View File

@ -189,18 +189,6 @@ public abstract class AbstractNonFunctionalTest extends org.hibernate.testing.ju
return testSupport;
}
protected void sleep(long ms) {
try {
Thread.sleep(ms);
} catch (InterruptedException e) {
log.warn("Interrupted during sleep", e);
}
}
protected void avoidConcurrentFlush() {
testSupport.avoidConcurrentFlush();
}
protected StandardServiceRegistryBuilder createStandardServiceRegistryBuilder() {
final StandardServiceRegistryBuilder ssrb = CacheTestUtil.buildBaselineStandardServiceRegistryBuilder(
REGION_PREFIX, getRegionFactoryClass(), true, false, jtaPlatform);

View File

@ -11,7 +11,6 @@ import java.util.function.Predicate;
import javax.transaction.RollbackException;
import javax.transaction.SystemException;
import javax.transaction.TransactionManager;
import org.hibernate.Session;
import org.hibernate.Transaction;
@ -42,7 +41,6 @@ import org.hibernate.service.ServiceRegistry;
import org.hibernate.test.cache.infinispan.util.BatchModeJtaPlatform;
import org.hibernate.test.cache.infinispan.util.BatchModeTransactionCoordinator;
import org.hibernate.test.cache.infinispan.util.InfinispanTestingSetup;
import org.hibernate.test.cache.infinispan.util.ExpectingInterceptor;
import org.hibernate.test.cache.infinispan.util.JdbcResourceTransactionMock;
import org.hibernate.test.cache.infinispan.util.TestInfinispanRegionFactory;
@ -53,11 +51,8 @@ import org.hibernate.testing.BeforeClassOnce;
import org.infinispan.commands.write.InvalidateCommand;
import org.infinispan.test.fwk.TestResourceTracker;
import org.infinispan.AdvancedCache;
import org.infinispan.commands.write.InvalidateCommand;
import org.infinispan.commands.write.PutKeyValueCommand;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import junit.framework.AssertionFailedError;
@ -82,9 +77,6 @@ public abstract class AbstractRegionAccessStrategyTest<R extends BaseRegion, S e
extends AbstractNonFunctionalTest {
protected final Logger log = Logger.getLogger(getClass());
@Rule
public InfinispanTestingSetup infinispanTestIdentifier = new InfinispanTestingSetup();
public static final String REGION_NAME = "test/com.foo.test";
public static final String KEY_BASE = "KEY";
public static final String VALUE1 = "VALUE1";
@ -117,8 +109,9 @@ public abstract class AbstractRegionAccessStrategyTest<R extends BaseRegion, S e
return false;
}
@Before
@BeforeClassOnce
public void prepareResources() throws Exception {
TestResourceTracker.testStarted( getClass().getSimpleName() );
// to mimic exactly the old code results, both environments here are exactly the same...
StandardServiceRegistryBuilder ssrb = createStandardServiceRegistryBuilder();
localEnvironment = new NodeEnvironment( ssrb );
@ -131,9 +124,6 @@ public abstract class AbstractRegionAccessStrategyTest<R extends BaseRegion, S e
invalidation = Caches.isInvalidationCache(localRegion.getCache());
synchronous = Caches.isSynchronousCache(localRegion.getCache());
// Sleep a bit to avoid concurrent FLUSH problem
avoidConcurrentFlush();
remoteEnvironment = new NodeEnvironment( ssrb );
remoteEnvironment.prepare();
@ -143,6 +133,29 @@ public abstract class AbstractRegionAccessStrategyTest<R extends BaseRegion, S e
waitForClusterToForm(localRegion.getCache(), remoteRegion.getCache());
}
@After
public void cleanup() {
cleanup.forEach(Runnable::run);
cleanup.clear();
if (localRegion != null) localRegion.getCache().clear();
if (remoteRegion != null) remoteRegion.getCache().clear();
}
@AfterClassOnce
public void releaseResources() throws Exception {
try {
if (localEnvironment != null) {
localEnvironment.release();
}
}
finally {
if (remoteEnvironment != null) {
remoteEnvironment.release();
}
}
TestResourceTracker.testFinished(getClass().getSimpleName());
}
@Override
protected StandardServiceRegistryBuilder createStandardServiceRegistryBuilder() {
StandardServiceRegistryBuilder ssrb = super.createStandardServiceRegistryBuilder();
@ -231,7 +244,7 @@ public abstract class AbstractRegionAccessStrategyTest<R extends BaseRegion, S e
node1.setDaemon(true);
node2.setDaemon(true);
CountDownLatch remoteUpdate = setupExpectAfterUpdate();
CountDownLatch remoteUpdate = expectAfterUpdate();
node1.start();
node2.start();
@ -255,11 +268,11 @@ public abstract class AbstractRegionAccessStrategyTest<R extends BaseRegion, S e
}
}
protected CountDownLatch setupExpectAfterUpdate() {
return setupExpectPutWithValue(value -> value instanceof FutureUpdate);
protected CountDownLatch expectAfterUpdate() {
return expectPutWithValue(value -> value instanceof FutureUpdate);
}
protected CountDownLatch setupExpectPutWithValue(Predicate<Object> valuePredicate) {
protected CountDownLatch expectPutWithValue(Predicate<Object> valuePredicate) {
if (!isUsingInvalidation() && accessType != AccessType.NONSTRICT_READ_WRITE) {
CountDownLatch latch = new CountDownLatch(1);
ExpectingInterceptor.get(remoteRegion.getCache())
@ -272,8 +285,8 @@ public abstract class AbstractRegionAccessStrategyTest<R extends BaseRegion, S e
}
}
protected CountDownLatch setupExpectPutFromLoad() {
return setupExpectPutWithValue(value -> value instanceof TombstoneUpdate);
protected CountDownLatch expectPutFromLoad() {
return expectPutWithValue(value -> value instanceof TombstoneUpdate);
}
protected abstract void doUpdate(S strategy, SharedSessionContractImplementor session, Object key, Object value, Object version) throws RollbackException, SystemException;
@ -350,27 +363,6 @@ public abstract class AbstractRegionAccessStrategyTest<R extends BaseRegion, S e
TestingUtil.blockUntilViewsReceived(10000, Arrays.asList(caches));
}
@After
public void cleanup() throws Exception {
for (Runnable runnable : cleanup) {
runnable.run();
}
cleanup.clear();
if (localRegion != null) localRegion.getCache().clear();
if (remoteRegion != null) remoteRegion.getCache().clear();
try {
if (localEnvironment != null) {
localEnvironment.release();
}
}
finally {
if (remoteEnvironment != null) {
remoteEnvironment.release();
}
}
}
protected boolean isTransactional() {
return transactional;
}
@ -399,7 +391,7 @@ public abstract class AbstractRegionAccessStrategyTest<R extends BaseRegion, S e
SharedSessionContractImplementor s3 = mockedSession();
localAccessStrategy.putFromLoad(s3, KEY, VALUE1, s3.getTimestamp(), 1);
SharedSessionContractImplementor s5 = mockedSession();
remoteAccessStrategy.putFromLoad(s5, KEY, VALUE1, s5.getTimestamp(), new Integer(1));
remoteAccessStrategy.putFromLoad(s5, KEY, VALUE1, s5.getTimestamp(), 1);
// putFromLoad is applied on local node synchronously, but if there's a concurrent update
// from the other node it can silently fail when acquiring the loc . Then we could try to read
@ -418,7 +410,7 @@ public abstract class AbstractRegionAccessStrategyTest<R extends BaseRegion, S e
localAccessStrategy.evict(KEY);
}
else {
doRemove(localRegion.getTransactionManager(), localAccessStrategy, session, KEY);
doRemove(localAccessStrategy, session, KEY);
}
return null;
});
@ -431,7 +423,7 @@ public abstract class AbstractRegionAccessStrategyTest<R extends BaseRegion, S e
assertEquals(0, remoteRegion.getCache().size());
}
protected void doRemove(TransactionManager tm, S strategy, SharedSessionContractImplementor session, Object key) throws SystemException, RollbackException {
protected void doRemove(S strategy, SharedSessionContractImplementor session, Object key) throws SystemException, RollbackException {
SoftLock softLock = strategy.lockItem(session, key, null);
strategy.remove(session, key);
session.getTransactionCoordinator().getLocalSynchronizations().registerSynchronization(
@ -501,7 +493,7 @@ public abstract class AbstractRegionAccessStrategyTest<R extends BaseRegion, S e
if (invalidation && !evict) {
// removeAll causes transactional remove commands which trigger EndInvalidationCommands on the remote side
// if the cache is non-transactional, PutFromLoadValidator.registerRemoteInvalidations cannot find
// current session nor register tx synchronization, so it falls back to simpe InvalidationCommand.
// current session nor register tx synchronization, so it falls back to simple InvalidationCommand.
endInvalidationLatch = new CountDownLatch(1);
if (transactional) {
PutFromLoadValidator originalValidator = PutFromLoadValidator.removeFromCache(remoteRegion.getCache());

View File

@ -6,6 +6,7 @@
*/
package org.hibernate.test.cache.infinispan;
import java.io.InputStream;
import java.util.Properties;
import java.util.function.BiConsumer;
import javax.transaction.TransactionManager;
@ -28,6 +29,9 @@ import org.hibernate.service.ServiceRegistry;
import org.hibernate.testing.ServiceRegistryBuilder;
import org.hibernate.test.cache.infinispan.util.CacheTestUtil;
import org.hibernate.test.cache.infinispan.util.InfinispanTestingSetup;
import org.infinispan.commons.util.FileLookupFactory;
import org.infinispan.configuration.parsing.ConfigurationBuilderHolder;
import org.infinispan.configuration.parsing.ParserRegistry;
import org.junit.Rule;
import org.junit.Test;
@ -306,7 +310,9 @@ public class InfinispanRegionFactoryTestCase {
public void testTimestampValidation() {
final String timestamps = "org.hibernate.cache.spi.UpdateTimestampsCache";
Properties p = createProperties();
final DefaultCacheManager manager = new DefaultCacheManager(GlobalConfigurationBuilder.defaultClusteredBuilder().build());
InputStream configStream = FileLookupFactory.newInstance().lookupFile(InfinispanRegionFactory.DEF_INFINISPAN_CONFIG_RESOURCE, getClass().getClassLoader());
ConfigurationBuilderHolder cbh = new ParserRegistry().parse(configStream);
DefaultCacheManager manager = new DefaultCacheManager(cbh, true);
ConfigurationBuilder builder = new ConfigurationBuilder();
builder.clustering().cacheMode(CacheMode.INVALIDATION_SYNC);
manager.defineConfiguration( DEF_TIMESTAMPS_RESOURCE, builder.build() );

View File

@ -7,6 +7,8 @@
package org.hibernate.test.cache.infinispan.access;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Callable;
@ -25,28 +27,32 @@ import org.hibernate.cache.infinispan.access.PutFromLoadValidator;
import org.hibernate.cache.infinispan.util.InfinispanMessageLogger;
import org.hibernate.engine.spi.SharedSessionContractImplementor;
import org.hibernate.test.cache.infinispan.util.TestInfinispanRegionFactory;
import org.hibernate.test.cache.infinispan.util.TestTimeService;
import org.hibernate.testing.AfterClassOnce;
import org.hibernate.testing.BeforeClassOnce;
import org.hibernate.testing.TestForIssue;
import org.hibernate.test.cache.infinispan.functional.cluster.DualNodeJtaTransactionManagerImpl;
import org.hibernate.test.cache.infinispan.util.CacheTestUtil;
import org.hibernate.test.cache.infinispan.util.InfinispanTestingSetup;
import org.hibernate.testing.junit4.CustomRunner;
import org.infinispan.AdvancedCache;
import org.infinispan.test.fwk.TestResourceTracker;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.infinispan.configuration.cache.Configuration;
import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.manager.EmbeddedCacheManager;
import org.infinispan.test.CacheManagerCallable;
import org.infinispan.test.fwk.TestCacheManagerFactory;
import org.junit.runner.RunWith;
import static org.infinispan.test.TestingUtil.withCacheManager;
import static org.infinispan.test.Exceptions.expectException;
import static org.infinispan.test.TestingUtil.withTx;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
@ -57,43 +63,55 @@ import static org.mockito.Mockito.mock;
* @author Galder Zamarreño
* @version $Revision: $
*/
@RunWith(CustomRunner.class)
public class PutFromLoadValidatorUnitTest {
private static final InfinispanMessageLogger log = InfinispanMessageLogger.Provider.getLog(
PutFromLoadValidatorUnitTest.class);
@Rule
public InfinispanTestingSetup infinispanTestIdentifier = new InfinispanTestingSetup();
private static final TestTimeService TIME_SERVICE = new TestTimeService();
private Object KEY1 = "KEY1";
private TransactionManager tm;
private EmbeddedCacheManager cm;
private AdvancedCache<Object, Object> cache;
private List<Runnable> cleanup = new ArrayList<>();
@Before
@BeforeClassOnce
public void setUp() throws Exception {
TestResourceTracker.testStarted(getClass().getSimpleName());
tm = DualNodeJtaTransactionManagerImpl.getInstance("test");
cm = TestCacheManagerFactory.createCacheManager(true);
cache = cm.getCache().getAdvancedCache();
}
@AfterClassOnce
public void stop() {
tm = null;
cm.stop();
TestResourceTracker.testFinished(getClass().getSimpleName());
}
@After
public void tearDown() throws Exception {
tm = null;
cleanup.forEach(Runnable::run);
cleanup.clear();
try {
DualNodeJtaTransactionManagerImpl.cleanupTransactions();
}
finally {
DualNodeJtaTransactionManagerImpl.cleanupTransactionManagers();
}
}
private static EmbeddedCacheManager createCacheManager() {
EmbeddedCacheManager cacheManager = TestCacheManagerFactory.createCacheManager(false);
return cacheManager;
cache.clear();
cm.getCache(cache.getName() + "-" + InfinispanRegionFactory.DEF_PENDING_PUTS_RESOURCE).clear();
}
private static InfinispanRegionFactory regionFactory(EmbeddedCacheManager cm) {
InfinispanRegionFactory regionFactory = new InfinispanRegionFactory();
Properties properties = new Properties();
properties.put(TestInfinispanRegionFactory.TIME_SERVICE, TIME_SERVICE);
InfinispanRegionFactory regionFactory = new TestInfinispanRegionFactory(properties);
regionFactory.setCacheManager(cm);
regionFactory.start(CacheTestUtil.sfOptionsForStart(), new Properties());
regionFactory.start(CacheTestUtil.sfOptionsForStart(), properties);
return regionFactory;
}
@ -107,13 +125,8 @@ public class PutFromLoadValidatorUnitTest {
}
private void nakedPutTest(final boolean transactional) throws Exception {
withCacheManager(new CacheManagerCallable(createCacheManager()) {
@Override
public void call() {
PutFromLoadValidator testee = new PutFromLoadValidator(cm.getCache().getAdvancedCache(), regionFactory(cm));
exec(transactional, new NakedPut(testee, true));
}
});
PutFromLoadValidator testee = new PutFromLoadValidator(cache, regionFactory(cm));
exec(transactional, new NakedPut(testee, true));
}
@Test
@ -126,13 +139,8 @@ public class PutFromLoadValidatorUnitTest {
}
private void registeredPutTest(final boolean transactional) throws Exception {
withCacheManager(new CacheManagerCallable(createCacheManager()) {
@Override
public void call() {
PutFromLoadValidator testee = new PutFromLoadValidator(cm.getCache().getAdvancedCache(), regionFactory(cm));
exec(transactional, new RegularPut(testee));
}
});
PutFromLoadValidator testee = new PutFromLoadValidator(cache, regionFactory(cm));
exec(transactional, new RegularPut(testee));
}
@Test
@ -154,17 +162,11 @@ public class PutFromLoadValidatorUnitTest {
private void nakedPutAfterRemovalTest(final boolean transactional,
final boolean removeRegion) throws Exception {
withCacheManager(new CacheManagerCallable(createCacheManager()) {
@Override
public void call() {
PutFromLoadValidator testee = new PutFromLoadValidator(cm.getCache().getAdvancedCache(), regionFactory(cm));
Invalidation invalidation = new Invalidation(testee, removeRegion);
// the naked put can succeed because it has txTimestamp after invalidation
NakedPut nakedPut = new NakedPut(testee, true);
exec(transactional, invalidation, nakedPut);
}
});
PutFromLoadValidator testee = new PutFromLoadValidator(cache, regionFactory(cm));
Invalidation invalidation = new Invalidation(testee, removeRegion);
// the naked put can succeed because it has txTimestamp after invalidation
NakedPut nakedPut = new NakedPut(testee, true);
exec(transactional, invalidation, nakedPut);
}
@Test
@ -186,16 +188,10 @@ public class PutFromLoadValidatorUnitTest {
private void registeredPutAfterRemovalTest(final boolean transactional,
final boolean removeRegion) throws Exception {
withCacheManager(new CacheManagerCallable(createCacheManager()) {
@Override
public void call() {
PutFromLoadValidator testee = new PutFromLoadValidator(cm.getCache().getAdvancedCache(), regionFactory(cm));
Invalidation invalidation = new Invalidation(testee, removeRegion);
RegularPut regularPut = new RegularPut(testee);
exec(transactional, invalidation, regularPut);
}
});
PutFromLoadValidator testee = new PutFromLoadValidator(cache, regionFactory(cm));
Invalidation invalidation = new Invalidation(testee, removeRegion);
RegularPut regularPut = new RegularPut(testee);
exec(transactional, invalidation, regularPut);
}
@Test
public void testRegisteredPutWithInterveningKeyRemoval() throws Exception {
@ -217,43 +213,38 @@ public class PutFromLoadValidatorUnitTest {
private void registeredPutWithInterveningRemovalTest(
final boolean transactional, final boolean removeRegion)
throws Exception {
withCacheManager(new CacheManagerCallable(createCacheManager()) {
@Override
public void call() {
PutFromLoadValidator testee = new PutFromLoadValidator(cm.getCache().getAdvancedCache(), regionFactory(cm));
try {
long txTimestamp = System.currentTimeMillis();
if (transactional) {
tm.begin();
}
SharedSessionContractImplementor session1 = mock(SharedSessionContractImplementor.class);
SharedSessionContractImplementor session2 = mock(SharedSessionContractImplementor.class);
testee.registerPendingPut(session1, KEY1, txTimestamp);
if (removeRegion) {
testee.beginInvalidatingRegion();
} else {
testee.beginInvalidatingKey(session2, KEY1);
}
PutFromLoadValidator testee = new PutFromLoadValidator(cache, regionFactory(cm));
try {
long txTimestamp = TIME_SERVICE.wallClockTime();
if (transactional) {
tm.begin();
}
SharedSessionContractImplementor session1 = mock(SharedSessionContractImplementor.class);
SharedSessionContractImplementor session2 = mock(SharedSessionContractImplementor.class);
testee.registerPendingPut(session1, KEY1, txTimestamp);
if (removeRegion) {
testee.beginInvalidatingRegion();
} else {
testee.beginInvalidatingKey(session2, KEY1);
}
PutFromLoadValidator.Lock lock = testee.acquirePutFromLoadLock(session1, KEY1, txTimestamp);
try {
assertNull(lock);
}
finally {
if (lock != null) {
testee.releasePutFromLoadLock(KEY1, lock);
}
if (removeRegion) {
testee.endInvalidatingRegion();
} else {
testee.endInvalidatingKey(session2, KEY1);
}
}
} catch (Exception e) {
throw new RuntimeException(e);
PutFromLoadValidator.Lock lock = testee.acquirePutFromLoadLock(session1, KEY1, txTimestamp);
try {
assertNull(lock);
}
finally {
if (lock != null) {
testee.releasePutFromLoadLock(KEY1, lock);
}
if (removeRegion) {
testee.endInvalidatingRegion();
} else {
testee.endInvalidatingKey(session2, KEY1);
}
}
});
} catch (Exception e) {
throw new RuntimeException(e);
}
}
@Test
@ -267,71 +258,57 @@ public class PutFromLoadValidatorUnitTest {
}
private void multipleRegistrationtest(final boolean transactional) throws Exception {
withCacheManager(new CacheManagerCallable(createCacheManager()) {
@Override
public void call() {
final PutFromLoadValidator testee = new PutFromLoadValidator(cm.getCache().getAdvancedCache(), regionFactory(cm));
final PutFromLoadValidator testee = new PutFromLoadValidator(cache, regionFactory(cm));
final CountDownLatch registeredLatch = new CountDownLatch(3);
final CountDownLatch finishedLatch = new CountDownLatch(3);
final AtomicInteger success = new AtomicInteger();
final CountDownLatch registeredLatch = new CountDownLatch(3);
final CountDownLatch finishedLatch = new CountDownLatch(3);
final AtomicInteger success = new AtomicInteger();
Runnable r = new Runnable() {
public void run() {
try {
long txTimestamp = System.currentTimeMillis();
if (transactional) {
tm.begin();
}
SharedSessionContractImplementor session = mock (SharedSessionContractImplementor.class);
testee.registerPendingPut(session, KEY1, txTimestamp);
registeredLatch.countDown();
registeredLatch.await(5, TimeUnit.SECONDS);
PutFromLoadValidator.Lock lock = testee.acquirePutFromLoadLock(session, KEY1, txTimestamp);
if (lock != null) {
try {
log.trace("Put from load lock acquired for key = " + KEY1);
success.incrementAndGet();
} finally {
testee.releasePutFromLoadLock(KEY1, lock);
}
} else {
log.trace("Unable to acquired putFromLoad lock for key = " + KEY1);
}
finishedLatch.countDown();
} catch (Exception e) {
e.printStackTrace();
}
Runnable r = () -> {
try {
long txTimestamp = TIME_SERVICE.wallClockTime();
if (transactional) {
tm.begin();
}
SharedSessionContractImplementor session = mock (SharedSessionContractImplementor.class);
testee.registerPendingPut(session, KEY1, txTimestamp);
registeredLatch.countDown();
registeredLatch.await(5, TimeUnit.SECONDS);
PutFromLoadValidator.Lock lock = testee.acquirePutFromLoadLock(session, KEY1, txTimestamp);
if (lock != null) {
try {
log.trace("Put from load lock acquired for key = " + KEY1);
success.incrementAndGet();
} finally {
testee.releasePutFromLoadLock(KEY1, lock);
}
};
ExecutorService executor = Executors.newFixedThreadPool(3);
// Start with a removal so the "isPutValid" calls will fail if
// any of the concurrent activity isn't handled properly
testee.beginInvalidatingRegion();
testee.endInvalidatingRegion();
try {
Thread.sleep(10);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} else {
log.trace("Unable to acquired putFromLoad lock for key = " + KEY1);
}
// Do the registration + isPutValid calls
executor.execute(r);
executor.execute(r);
executor.execute(r);
try {
finishedLatch.await(5, TimeUnit.SECONDS);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
assertEquals("All threads succeeded", 3, success.get());
finishedLatch.countDown();
} catch (Exception e) {
e.printStackTrace();
}
});
};
ExecutorService executor = Executors.newFixedThreadPool(3);
cleanup.add(() -> executor.shutdownNow());
// Start with a removal so the "isPutValid" calls will fail if
// any of the concurrent activity isn't handled properly
testee.beginInvalidatingRegion();
testee.endInvalidatingRegion();
TIME_SERVICE.advance(1);
// Do the registration + isPutValid calls
executor.execute(r);
executor.execute(r);
executor.execute(r);
assertTrue(finishedLatch.await(5, TimeUnit.SECONDS));
assertEquals("All threads succeeded", 3, success.get());
}
@Test
@ -345,71 +322,55 @@ public class PutFromLoadValidatorUnitTest {
}
private void invalidationBlocksForInProgressPutTest(final boolean keyOnly) throws Exception {
withCacheManager(new CacheManagerCallable(createCacheManager()) {
@Override
public void call() {
final PutFromLoadValidator testee = new PutFromLoadValidator(cm.getCache().getAdvancedCache(), regionFactory(cm));
final CountDownLatch removeLatch = new CountDownLatch(1);
final CountDownLatch pferLatch = new CountDownLatch(1);
final AtomicReference<Object> cache = new AtomicReference<Object>("INITIAL");
Callable<Boolean> pferCallable = new Callable<Boolean>() {
public Boolean call() throws Exception {
long txTimestamp = System.currentTimeMillis();
SharedSessionContractImplementor session = mock (SharedSessionContractImplementor.class);
testee.registerPendingPut(session, KEY1, txTimestamp);
PutFromLoadValidator.Lock lock = testee.acquirePutFromLoadLock(session, KEY1, txTimestamp);
if (lock != null) {
try {
removeLatch.countDown();
pferLatch.await();
cache.set("PFER");
return Boolean.TRUE;
}
finally {
testee.releasePutFromLoadLock(KEY1, lock);
}
}
return Boolean.FALSE;
}
};
Callable<Void> invalidateCallable = new Callable<Void>() {
public Void call() throws Exception {
removeLatch.await();
if (keyOnly) {
SharedSessionContractImplementor session = mock (SharedSessionContractImplementor.class);
testee.beginInvalidatingKey(session, KEY1);
} else {
testee.beginInvalidatingRegion();
}
cache.set(null);
return null;
}
};
ExecutorService executorService = Executors.newCachedThreadPool();
Future<Boolean> pferFuture = executorService.submit(pferCallable);
Future<Void> invalidateFuture = executorService.submit(invalidateCallable);
final PutFromLoadValidator testee = new PutFromLoadValidator(cache, regionFactory(cm));
final CountDownLatch removeLatch = new CountDownLatch(1);
final CountDownLatch pferLatch = new CountDownLatch(1);
final AtomicReference<Object> cache = new AtomicReference<>("INITIAL");
Callable<Boolean> pferCallable = () -> {
long txTimestamp = TIME_SERVICE.wallClockTime();
SharedSessionContractImplementor session = mock (SharedSessionContractImplementor.class);
testee.registerPendingPut(session, KEY1, txTimestamp);
PutFromLoadValidator.Lock lock = testee.acquirePutFromLoadLock(session, KEY1, txTimestamp);
if (lock != null) {
try {
try {
invalidateFuture.get(1, TimeUnit.SECONDS);
fail("invalidateFuture did not block");
}
catch (TimeoutException good) {}
pferLatch.countDown();
assertTrue(pferFuture.get(5, TimeUnit.SECONDS));
invalidateFuture.get(5, TimeUnit.SECONDS);
assertNull(cache.get());
} catch (Exception e) {
throw new RuntimeException(e);
removeLatch.countDown();
pferLatch.await();
cache.set("PFER");
return Boolean.TRUE;
}
finally {
testee.releasePutFromLoadLock(KEY1, lock);
}
}
});
return Boolean.FALSE;
};
Callable<Void> invalidateCallable = () -> {
removeLatch.await();
if (keyOnly) {
SharedSessionContractImplementor session = mock (SharedSessionContractImplementor.class);
testee.beginInvalidatingKey(session, KEY1);
} else {
testee.beginInvalidatingRegion();
}
cache.set(null);
return null;
};
ExecutorService executor = Executors.newCachedThreadPool();
cleanup.add(() -> executor.shutdownNow());
Future<Boolean> pferFuture = executor.submit(pferCallable);
Future<Void> invalidateFuture = executor.submit(invalidateCallable);
expectException(TimeoutException.class, () -> invalidateFuture.get(1, TimeUnit.SECONDS));
pferLatch.countDown();
assertTrue(pferFuture.get(5, TimeUnit.SECONDS));
invalidateFuture.get(5, TimeUnit.SECONDS);
assertNull(cache.get());
}
protected void exec(boolean transactional, Callable<?>... callables) {
@ -454,7 +415,7 @@ public class PutFromLoadValidatorUnitTest {
}
// if we go for the timestamp-based approach, invalidation in the same millisecond
// as the registerPendingPut/acquirePutFromLoad lock results in failure.
Thread.sleep(10);
TIME_SERVICE.advance(1);
return null;
}
}
@ -469,7 +430,7 @@ public class PutFromLoadValidatorUnitTest {
@Override
public Void call() throws Exception {
try {
long txTimestamp = System.currentTimeMillis(); // this should be acquired before UserTransaction.begin()
long txTimestamp = TIME_SERVICE.wallClockTime(); // this should be acquired before UserTransaction.begin()
SharedSessionContractImplementor session = mock (SharedSessionContractImplementor.class);
putFromLoadValidator.registerPendingPut(session, KEY1, txTimestamp);
@ -500,7 +461,7 @@ public class PutFromLoadValidatorUnitTest {
@Override
public Void call() throws Exception {
try {
long txTimestamp = System.currentTimeMillis(); // this should be acquired before UserTransaction.begin()
long txTimestamp = TIME_SERVICE.wallClockTime(); // this should be acquired before UserTransaction.begin()
SharedSessionContractImplementor session = mock (SharedSessionContractImplementor.class);
PutFromLoadValidator.Lock lock = testee.acquirePutFromLoadLock(session, KEY1, txTimestamp);
try {
@ -525,59 +486,44 @@ public class PutFromLoadValidatorUnitTest {
@Test
@TestForIssue(jiraKey = "HHH-9928")
public void testGetForNullReleasePuts() {
EmbeddedCacheManager cm = createCacheManager();
InfinispanRegionFactory tmp = new InfinispanRegionFactory();
tmp.setCacheManager(cm);
ConfigurationBuilder cb = new ConfigurationBuilder();
cb.simpleCache(true).expiration().maxIdle(500);
Configuration ppCfg = cb.build();
cm.defineConfiguration(InfinispanRegionFactory.DEF_PENDING_PUTS_RESOURCE, cb.build());
InfinispanRegionFactory regionFactory = mock(InfinispanRegionFactory.class);
doReturn(ppCfg).when(regionFactory).getPendingPutsCacheConfiguration();
withCacheManager(new CacheManagerCallable(cm) {
@Override
public void call() {
PutFromLoadValidator testee = new PutFromLoadValidator(cm.getCache().getAdvancedCache(), regionFactory, cm);
long lastInsert = Long.MAX_VALUE;
for (int i = 0; i < 100; ++i) {
lastInsert = System.currentTimeMillis();
try {
withTx(tm, new Callable<Object>() {
@Override
public Object call() throws Exception {
SharedSessionContractImplementor session = mock (SharedSessionContractImplementor.class);
testee.registerPendingPut(session, KEY1, 0);
return null;
}
});
Thread.sleep(10);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
String ppName = cm.getCache().getName() + "-" + InfinispanRegionFactory.DEF_PENDING_PUTS_RESOURCE;
Map ppCache = cm.getCache(ppName, false);
assertNotNull(ppCache);
Object pendingPutMap = ppCache.get(KEY1);
long end = System.currentTimeMillis();
if (end - lastInsert > 500) {
log.warn("Test took too long");
return;
}
assertNotNull(pendingPutMap);
int size;
try {
Method sizeMethod = pendingPutMap.getClass().getMethod("size");
sizeMethod.setAccessible(true);
size = (Integer) sizeMethod.invoke(pendingPutMap);
} catch (Exception e) {
throw new RuntimeException(e);
}
// some of the pending puts need to be expired by now
assertTrue(size < 100);
// but some are still registered
assertTrue(size > 0);
doAnswer(invocation -> TIME_SERVICE.wallClockTime()).when(regionFactory).nextTimestamp();
PutFromLoadValidator testee = new PutFromLoadValidator(cache, regionFactory, cm);
for (int i = 0; i < 100; ++i) {
try {
withTx(tm, () -> {
SharedSessionContractImplementor session = mock (SharedSessionContractImplementor.class);
testee.registerPendingPut(session, KEY1, 0);
return null;
});
TIME_SERVICE.advance(10);
} catch (Exception e) {
throw new RuntimeException(e);
}
});
}
String ppName = cm.getCache().getName() + "-" + InfinispanRegionFactory.DEF_PENDING_PUTS_RESOURCE;
Map ppCache = cm.getCache(ppName, false);
assertNotNull(ppCache);
Object pendingPutMap = ppCache.get(KEY1);
assertNotNull(pendingPutMap);
int size;
try {
Method sizeMethod = pendingPutMap.getClass().getMethod("size");
sizeMethod.setAccessible(true);
size = (Integer) sizeMethod.invoke(pendingPutMap);
} catch (Exception e) {
throw new RuntimeException(e);
}
// some of the pending puts need to be expired by now
assertTrue(size < 100);
// but some are still registered
assertTrue(size > 0);
}
}

View File

@ -95,41 +95,41 @@ public class CollectionRegionAccessStrategyTest extends
}
}).when(mockValidator).acquirePutFromLoadLock(any(), any(), anyLong());
PutFromLoadValidator.addToCache(localRegion.getCache(), mockValidator);
try {
final AccessDelegate delegate = localRegion.getCache().getCacheConfiguration().transaction().transactionMode().isTransactional() ?
new TxInvalidationCacheAccessDelegate(localRegion, mockValidator) :
new NonTxInvalidationCacheAccessDelegate(localRegion, mockValidator);
ExecutorService executorService = Executors.newCachedThreadPool();
final String KEY = "k1";
Future<Void> pferFuture = executorService.submit(() -> {
SharedSessionContractImplementor session = mockedSession();
delegate.putFromLoad(session, KEY, "v1", session.getTimestamp(), null);
return null;
});
Future<Void> removeFuture = executorService.submit(() -> {
removeLatch.await();
SharedSessionContractImplementor session = mockedSession();
withTx(localEnvironment, session, () -> {
delegate.remove(session, KEY);
return null;
});
pferLatch.countDown();
return null;
});
pferFuture.get();
removeFuture.get();
assertFalse(localRegion.getCache().containsKey(KEY));
assertFalse(remoteRegion.getCache().containsKey(KEY));
} finally {
cleanup.add(() -> {
PutFromLoadValidator.removeFromCache(localRegion.getCache());
PutFromLoadValidator.addToCache(localRegion.getCache(), originalValidator);
}
});
final AccessDelegate delegate = localRegion.getCache().getCacheConfiguration().transaction().transactionMode().isTransactional() ?
new TxInvalidationCacheAccessDelegate(localRegion, mockValidator) :
new NonTxInvalidationCacheAccessDelegate(localRegion, mockValidator);
ExecutorService executorService = Executors.newCachedThreadPool();
cleanup.add(() -> executorService.shutdownNow());
final String KEY = "k1";
Future<Void> pferFuture = executorService.submit(() -> {
SharedSessionContractImplementor session = mockedSession();
delegate.putFromLoad(session, KEY, "v1", session.getTimestamp(), null);
return null;
});
Future<Void> removeFuture = executorService.submit(() -> {
removeLatch.await();
SharedSessionContractImplementor session = mockedSession();
withTx(localEnvironment, session, () -> {
delegate.remove(session, KEY);
return null;
});
pferLatch.countDown();
return null;
});
pferFuture.get();
removeFuture.get();
assertFalse(localRegion.getCache().containsKey(KEY));
assertFalse(remoteRegion.getCache().containsKey(KEY));
}
@Test

View File

@ -46,7 +46,7 @@ public class CollectionRegionImplTest extends AbstractEntityCollectionRegionTest
@Override
protected void putInRegion(Region region, Object key, Object value) {
CollectionRegionAccessStrategy strategy = ((CollectionRegion) region).buildAccessStrategy(AccessType.TRANSACTIONAL);
strategy.putFromLoad(null, key, value, System.currentTimeMillis(), new Integer(1));
strategy.putFromLoad(null, key, value, region.nextTimestamp(), new Integer(1));
}
@Override

View File

@ -8,11 +8,8 @@ package org.hibernate.test.cache.infinispan.entity;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import org.hibernate.cache.infinispan.entity.EntityRegionImpl;
import org.hibernate.cache.infinispan.util.FutureUpdate;
import org.hibernate.cache.infinispan.util.TombstoneUpdate;
import org.hibernate.cache.spi.access.AccessType;
import org.hibernate.cache.spi.access.EntityRegionAccessStrategy;
import org.hibernate.cache.spi.access.SoftLock;
@ -20,10 +17,8 @@ import org.hibernate.engine.spi.SharedSessionContractImplementor;
import org.hibernate.test.cache.infinispan.AbstractRegionAccessStrategyTest;
import org.hibernate.test.cache.infinispan.NodeEnvironment;
import org.hibernate.test.cache.infinispan.util.ExpectingInterceptor;
import org.hibernate.test.cache.infinispan.util.TestSynchronization;
import org.hibernate.test.cache.infinispan.util.TestingKeyFactory;
import org.infinispan.commands.write.PutKeyValueCommand;
import org.junit.Ignore;
import org.junit.Test;
import junit.framework.AssertionFailedError;
@ -88,7 +83,7 @@ public class EntityRegionAccessStrategyTest extends
final CountDownLatch commitLatch = new CountDownLatch(1);
final CountDownLatch completionLatch = new CountDownLatch(2);
CountDownLatch asyncInsertLatch = setupExpectAfterUpdate();
CountDownLatch asyncInsertLatch = expectAfterUpdate();
Thread inserter = new Thread(() -> {
try {
@ -160,7 +155,7 @@ public class EntityRegionAccessStrategyTest extends
protected void putFromLoadTestReadOnly(boolean minimal) throws Exception {
final Object KEY = TestingKeyFactory.generateEntityCacheKey( KEY_BASE + testCount++ );
CountDownLatch remotePutFromLoadLatch = setupExpectPutFromLoad();
CountDownLatch remotePutFromLoadLatch = expectPutFromLoad();
SharedSessionContractImplementor session = mockedSession();
withTx(localEnvironment, session, () -> {
@ -202,7 +197,7 @@ public class EntityRegionAccessStrategyTest extends
remoteAccessStrategy.putFromLoad(s2, KEY, VALUE1, s2.getTimestamp(), 1);
// both nodes are updated, we don't have to wait for any async replication of putFromLoad
CountDownLatch asyncUpdateLatch = setupExpectAfterUpdate();
CountDownLatch asyncUpdateLatch = expectAfterUpdate();
final CountDownLatch readLatch = new CountDownLatch(1);
final CountDownLatch commitLatch = new CountDownLatch(1);

View File

@ -9,10 +9,14 @@ package org.hibernate.test.cache.infinispan.functional;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.function.Predicate;
import org.hibernate.Session;
import org.hibernate.boot.Metadata;
import org.hibernate.boot.spi.MetadataImplementor;
import org.hibernate.cache.infinispan.util.FutureUpdate;
import org.hibernate.cache.infinispan.util.TombstoneUpdate;
import org.hibernate.cache.internal.SimpleCacheKeysFactory;
import org.hibernate.cache.spi.RegionFactory;
import org.hibernate.cache.spi.access.AccessType;
@ -29,6 +33,7 @@ import org.hibernate.resource.transaction.backend.jdbc.internal.JdbcResourceLoca
import org.hibernate.resource.transaction.backend.jta.internal.JtaTransactionCoordinatorBuilderImpl;
import org.hibernate.resource.transaction.spi.TransactionCoordinatorBuilder;
import org.hibernate.test.cache.infinispan.util.ExpectingInterceptor;
import org.hibernate.testing.BeforeClassOnce;
import org.hibernate.testing.junit4.BaseNonConfigCoreFunctionalTestCase;
import org.hibernate.testing.junit4.CustomParameterized;
@ -37,6 +42,9 @@ import org.hibernate.test.cache.infinispan.tm.XaConnectionProvider;
import org.hibernate.test.cache.infinispan.util.InfinispanTestingSetup;
import org.hibernate.test.cache.infinispan.util.TestInfinispanRegionFactory;
import org.hibernate.test.cache.infinispan.util.TxUtil;
import org.infinispan.AdvancedCache;
import org.infinispan.commands.write.PutKeyValueCommand;
import org.junit.After;
import org.junit.ClassRule;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@ -89,6 +97,7 @@ public abstract class AbstractFunctionalTest extends BaseNonConfigCoreFunctional
public boolean addVersions;
protected boolean useJta;
protected List<Runnable> cleanup = new ArrayList<>();
@CustomParameterized.Order(0)
@Parameterized.Parameters(name = "{0}, {6}")
@ -121,6 +130,12 @@ public abstract class AbstractFunctionalTest extends BaseNonConfigCoreFunctional
useJta = jtaPlatformClass != null;
}
@After
public void runCleanup() {
cleanup.forEach(Runnable::run);
cleanup.clear();
}
@Override
public String[] getMappings() {
return new String[] {
@ -198,4 +213,25 @@ public abstract class AbstractFunctionalTest extends BaseNonConfigCoreFunctional
protected void markRollbackOnly(Session session) {
TxUtil.markRollbackOnly(useJta, session);
}
protected CountDownLatch expectAfterUpdate(AdvancedCache cache, int numUpdates) {
return expectPutWithValue(cache, value -> value instanceof FutureUpdate, numUpdates);
}
protected CountDownLatch expectEvict(AdvancedCache cache, int numUpdates) {
return expectPutWithValue(cache, value -> value instanceof TombstoneUpdate && ((TombstoneUpdate) value).getValue() == null, numUpdates);
}
protected CountDownLatch expectPutWithValue(AdvancedCache cache, Predicate<Object> valuePredicate, int numUpdates) {
if (!cacheMode.isInvalidation() && accessType != AccessType.NONSTRICT_READ_WRITE) {
CountDownLatch latch = new CountDownLatch(numUpdates);
ExpectingInterceptor.get(cache)
.when((ctx, cmd) -> cmd instanceof PutKeyValueCommand && valuePredicate.test(((PutKeyValueCommand) cmd).getValue()))
.countDown(latch);
cleanup.add(() -> ExpectingInterceptor.cleanup(cache));
return latch;
} else {
return new CountDownLatch(0);
}
}
}

View File

@ -110,9 +110,8 @@ public abstract class AbstractNonInvalidationTest extends SingleNodeTest {
@After
public void cleanup() throws Exception {
for (Runnable runnable : cleanup) {
runnable.run();
}
cleanup.forEach(Runnable::run);
cleanup.clear();
withTxSession(s -> {
s.createQuery("delete from Item").executeUpdate();
});

View File

@ -11,6 +11,7 @@ import java.io.StringWriter;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.Callable;
@ -27,6 +28,9 @@ import org.hibernate.stat.SecondLevelCacheStatistics;
import org.hibernate.test.cache.infinispan.functional.entities.Contact;
import org.hibernate.test.cache.infinispan.functional.entities.Customer;
import org.hibernate.test.cache.infinispan.util.TestInfinispanRegionFactory;
import org.hibernate.test.cache.infinispan.util.TestTimeService;
import org.junit.Ignore;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
@ -48,6 +52,7 @@ public class ConcurrentWriteTest extends SingleNodeTest {
private static final int THINK_TIME_MILLIS = 10;
private static final long LAUNCH_INTERVAL_MILLIS = 10;
private static final Random random = new Random();
private static final TestTimeService TIME_SERVICE = new TestTimeService();
/**
* kill switch used to stop all users when one fails
@ -70,6 +75,12 @@ public class ConcurrentWriteTest extends SingleNodeTest {
TERMINATE_ALL_USERS = false;
}
@Override
protected void addSettings(Map settings) {
super.addSettings(settings);
settings.put(TestInfinispanRegionFactory.TIME_SERVICE, TIME_SERVICE);
}
@Override
protected void cleanupTest() throws Exception {
try {
@ -90,14 +101,14 @@ public class ConcurrentWriteTest extends SingleNodeTest {
// setup
sessionFactory().getStatistics().clear();
// wait a while to make sure that timestamp comparison works after invalidateRegion
Thread.sleep(1);
TIME_SERVICE.advance(1);
Customer customer = createCustomer( 0 );
final Integer customerId = customer.getId();
getCustomerIDs().add( customerId );
// wait a while to make sure that timestamp comparison works after collection remove (during insert)
Thread.sleep(1);
TIME_SERVICE.advance(1);
assertNull( "contact exists despite not being added", getFirstContact( customerId ) );
@ -134,6 +145,8 @@ public class ConcurrentWriteTest extends SingleNodeTest {
}
// Ignoring the test as it's more of a stress-test: this should be enabled manually
@Ignore
@Test
public void testManyUsers() throws Throwable {
try {
@ -153,7 +166,6 @@ public class ConcurrentWriteTest extends SingleNodeTest {
futures.add( future );
Thread.sleep( LAUNCH_INTERVAL_MILLIS ); // rampup
}
// barrier.await(); // wait for all threads to be ready
barrier.await( 2, TimeUnit.MINUTES ); // wait for all threads to finish
log.info( "All threads finished, let's shutdown the executor and check whether any exceptions were reported" );
for ( Future<Void> future : futures ) {
@ -367,7 +379,7 @@ public class ConcurrentWriteTest extends SingleNodeTest {
thinkRandomTime();
++completedIterations;
if ( trace ) {
log.tracef( "Iteration completed {0}", completedIterations );
log.tracef( "Iteration completed %d", completedIterations );
}
}
}

View File

@ -13,6 +13,8 @@ import org.hibernate.cache.infinispan.util.InfinispanMessageLogger;
import org.hibernate.stat.SecondLevelCacheStatistics;
import org.hibernate.stat.Statistics;
import org.hibernate.test.cache.infinispan.functional.entities.Item;
import org.hibernate.test.cache.infinispan.util.TestInfinispanRegionFactory;
import org.hibernate.test.cache.infinispan.util.TestTimeService;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
@ -25,7 +27,8 @@ import static org.junit.Assert.assertEquals;
* @since 4.1
*/
public class ReadOnlyTest extends SingleNodeTest {
static final InfinispanMessageLogger log = InfinispanMessageLogger.Provider.getLog(ReadOnlyTest.class);
protected static final InfinispanMessageLogger log = InfinispanMessageLogger.Provider.getLog(ReadOnlyTest.class);
protected static final TestTimeService TIME_SERVICE = new TestTimeService();
@Override
public List<Object[]> getParameters() {
@ -76,7 +79,7 @@ public class ReadOnlyTest extends SingleNodeTest {
log.info("Entry persisted, let's load and delete it.");
cleanupCache();
Thread.sleep(10);
TIME_SERVICE.advance(1);
withTxSession(s -> {
Item found = s.load(Item.class, item.getId());
@ -88,4 +91,10 @@ public class ReadOnlyTest extends SingleNodeTest {
s.delete(found);
});
}
@Override
protected void addSettings(Map settings) {
super.addSettings(settings);
settings.put(TestInfinispanRegionFactory.TIME_SERVICE, TIME_SERVICE);
}
}

View File

@ -85,7 +85,7 @@ public class ReadWriteTest extends ReadOnlyTest {
s.persist( another );
});
// The collection has been removed, but we can't add it again immediately using putFromLoad
Thread.sleep(1);
TIME_SERVICE.advance(1);
withTxSession(s -> {
Item loaded = s.load( Item.class, item.getId() );
@ -388,7 +388,7 @@ public class ReadWriteTest extends ReadOnlyTest {
SecondLevelCacheStatistics slcs = stats.getSecondLevelCacheStatistics( Item.class.getName() );
sessionFactory().getCache().evictEntityRegion( Item.class.getName() );
Thread.sleep(1);
TIME_SERVICE.advance(1);
assertEquals(0, slcs.getPutCount());
assertEquals( 0, slcs.getElementCountInMemory() );
@ -438,8 +438,8 @@ public class ReadWriteTest extends ReadOnlyTest {
// Delay added to guarantee that query cache results won't be considered
// as not up to date due to persist session and query results from first
// query happening within same 100ms gap.
Thread.sleep( 100 );
// query happening simultaneously.
TIME_SERVICE.advance(1);
withTxSession(s -> s.createQuery( "from Item" ).setCacheable( true ).list());
@ -461,8 +461,8 @@ public class ReadWriteTest extends ReadOnlyTest {
// Delay added to guarantee that query cache results won't be considered
// as not up to date due to persist session and query results from first
// query happening within same 100ms gap.
Thread.sleep( 100 );
// query happening simultaneously.
TIME_SERVICE.advance(1);
withTxSession(s -> {
s.createQuery("from Item").setCacheable(true).list();
@ -478,8 +478,8 @@ public class ReadWriteTest extends ReadOnlyTest {
saveSomeCitizens();
// Clear the cache before the transaction begins
ReadWriteTest.this.cleanupCache();
Thread.sleep(10);
cleanupCache();
TIME_SERVICE.advance(1);
withTxSession(s -> {
State france = ReadWriteTest.this.getState(s, "Ile de France");
@ -556,8 +556,8 @@ public class ReadWriteTest extends ReadOnlyTest {
});
// TODO: Clear caches manually via cache manager (it's faster!!)
this.cleanupCache();
Thread.sleep(10);
cleanupCache();
TIME_SERVICE.advance(1);
stats.setStatisticsEnabled( true );
stats.clear();
@ -616,7 +616,7 @@ public class ReadWriteTest extends ReadOnlyTest {
assertEquals(2, slcStats.getPutCount());
cache.evictEntityRegions();
Thread.sleep(10);
TIME_SERVICE.advance(1);
assertEquals(0, slcStats.getElementCountInMemory());
assertFalse("2lc entity cache is expected to not contain Citizen id = " + citizens.get(0).getId(),

View File

@ -115,15 +115,6 @@ public abstract class DualNodeTest extends AbstractFunctionalTest {
return JtaTransactionCoordinatorBuilderImpl.class;
}
protected void sleep(long ms) {
try {
Thread.sleep( ms );
}
catch (InterruptedException e) {
log.warn( "Interrupted during sleep", e );
}
}
protected void configureSecondNode(StandardServiceRegistryBuilder ssrb) {
}

View File

@ -63,7 +63,6 @@ import static org.mockito.Mockito.spy;
public class EntityCollectionInvalidationTest extends DualNodeTest {
private static final InfinispanMessageLogger log = InfinispanMessageLogger.Provider.getLog( EntityCollectionInvalidationTest.class );
private static final long SLEEP_TIME = 50l;
private static final Integer CUSTOMER_ID = new Integer( 1 );
private EmbeddedCacheManager localManager, remoteManager;
@ -138,16 +137,10 @@ public class EntityCollectionInvalidationTest extends DualNodeTest {
assertTrue( remoteListener.isEmpty() );
assertTrue( localListener.isEmpty() );
// Sleep a bit to let async commit propagate. Really just to
// help keep the logs organized for debugging any issues
sleep( SLEEP_TIME );
log.debug( "Find node 0" );
// This actually brings the collection into the cache
getCustomer( ids.customerId, localFactory );
sleep( SLEEP_TIME );
// Now the collection is in the cache so, the 2nd "get"
// should read everything from the cache
log.debug( "Find(2) node 0" );
@ -182,7 +175,6 @@ public class EntityCollectionInvalidationTest extends DualNodeTest {
}
ids = modifyCustomer( ids.customerId, remoteFactory );
sleep( 250 );
assertLoadedFromCache( remoteListener, ids.customerId, ids.contactIds );
if (modifyLatch != null) {

View File

@ -8,6 +8,8 @@ package org.hibernate.test.cache.infinispan.functional.cluster;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.hibernate.Criteria;
import org.hibernate.Session;
@ -38,8 +40,6 @@ public class NaturalIdInvalidationTest extends DualNodeTest {
private static final InfinispanMessageLogger log = InfinispanMessageLogger.Provider.getLog(NaturalIdInvalidationTest.class);
private static final long SLEEP_TIME = 50l;
@Override
public List<Object[]> getParameters() {
return getParameters(true, true, true, true);
@ -77,20 +77,18 @@ public class NaturalIdInvalidationTest extends DualNodeTest {
assertTrue(remoteListener.isEmpty());
assertTrue(localListener.isEmpty());
CountDownLatch remoteUpdateLatch = expectAfterUpdate(remoteNaturalIdCache.getAdvancedCache(), 2);
saveSomeCitizens(localFactory);
assertTrue(remoteUpdateLatch.await(2, TimeUnit.SECONDS));
assertTrue(remoteListener.isEmpty());
assertTrue(localListener.isEmpty());
// Sleep a bit to let async commit propagate. Really just to
// help keep the logs organized for debugging any issues
sleep( SLEEP_TIME );
log.debug("Find node 0");
// This actually brings the collection into the cache
getCitizenWithCriteria(localFactory);
sleep( SLEEP_TIME );
// Now the collection is in the cache so, the 2nd "get"
// should read everything from the cache
log.debug( "Find(2) node 0" );
@ -117,8 +115,9 @@ public class NaturalIdInvalidationTest extends DualNodeTest {
// Modify customer in remote
remoteListener.clear();
CountDownLatch localUpdate = expectEvict(localNaturalIdCache.getAdvancedCache(), 1);
deleteCitizenWithCriteria(remoteFactory);
sleep(250);
assertTrue(localUpdate.await(2, TimeUnit.SECONDS));
Set localKeys = localNaturalIdCache.keySet();
assertEquals(1, localKeys.size());
@ -222,18 +221,6 @@ public class NaturalIdInvalidationTest extends DualNodeTest {
log.debug( event.toString() );
if ( !event.isPre() ) {
visited.add(event.getKey().toString());
// Integer primKey = (Integer) cacheKey.getKey();
// String key = (String) cacheKey.getEntityOrRoleName() + '#' + primKey;
// log.debug( "MyListener[" + name + "] - Visiting key " + key );
// // String name = fqn.toString();
// String token = ".functional.";
// int index = key.indexOf( token );
// if ( index > -1 ) {
// index += token.length();
// key = key.substring( index );
// log.debug( "MyListener[" + name + "] - recording visit to " + key );
// visited.add( key );
// }
}
}
}

View File

@ -30,6 +30,7 @@ import java.util.NavigableMap;
import java.util.Properties;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ExecutorService;
@ -37,6 +38,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@ -160,7 +162,7 @@ public abstract class CorrectnessTestCase {
return new HashMap<>();
}
};
private List<Exception> exceptions = Collections.synchronizedList(new ArrayList<>());
private BlockingDeque<Exception> exceptions = new LinkedBlockingDeque<>();
public String getDbName() {
return getClass().getName().replaceAll("\\W", "_");
@ -408,13 +410,8 @@ public abstract class CorrectnessTestCase {
}));
}
}
long testEnd = System.currentTimeMillis() + EXECUTION_TIME;
while (System.currentTimeMillis() < testEnd) {
if (!exceptions.isEmpty()) {
break;
}
Thread.sleep(1000);
}
Exception failure = exceptions.poll(EXECUTION_TIME, TimeUnit.SECONDS);
if (failure != null) exceptions.addFirst(failure);
running = false;
exec.shutdown();
if (!exec.awaitTermination(1000, TimeUnit.SECONDS)) throw new IllegalStateException();

View File

@ -73,15 +73,11 @@ public class TimestampsRegionImplTest extends AbstractGeneralDataRegionTest {
registry,
getCacheTestSupport()
);
// Sleep a bit to avoid concurrent FLUSH problem
avoidConcurrentFlush();
InfinispanRegionFactory regionFactory2 = CacheTestUtil.startRegionFactory(
registry2,
getCacheTestSupport()
);
// Sleep a bit to avoid concurrent FLUSH problem
avoidConcurrentFlush();
TimestampsRegionImpl region = (TimestampsRegionImpl) regionFactory.buildTimestampsRegion(
getStandardRegionName(REGION_PREFIX),

View File

@ -65,21 +65,6 @@ public class CacheTestSupport {
throwStoredException();
}
public void avoidConcurrentFlush() {
// JG 2.6.1 has a problem where calling flush more than once too quickly
// can result in several second delays
sleep( 100 );
}
private void sleep(long ms) {
try {
Thread.sleep(ms);
}
catch (InterruptedException e) {
log.warn("Interrupted during sleep", e);
}
}
private void cleanUp() {
for (Iterator it = factories.iterator(); it.hasNext(); ) {
try {
@ -105,7 +90,6 @@ public class CacheTestSupport {
finally {
it.remove();
}
avoidConcurrentFlush();
}
caches.clear();
}

View File

@ -173,39 +173,6 @@ public class CacheTestUtil {
return properties;
}
/**
* Executes {@link #assertEqualsEventually(Object, Callable, long, TimeUnit)} without time limit.
* @param expected
* @param callable
* @param <T>
*/
public static <T> void assertEqualsEventually(T expected, Callable<T> callable) throws Exception {
assertEqualsEventually(expected, callable, -1, TimeUnit.SECONDS);
}
/**
* Periodically calls callable and compares returned value with expected value. If the value matches to expected,
* the method returns. If callable throws an exception, this is propagated. If the returned value does not match to
* expected before timeout, {@link TimeoutException} is thrown.
* @param expected
* @param callable
* @param timeout If non-positive, there is no limit.
* @param timeUnit
* @param <T>
*/
public static <T> void assertEqualsEventually(T expected, Callable<T> callable, long timeout, TimeUnit timeUnit) throws Exception {
long now, deadline = timeout <= 0 ? Long.MAX_VALUE : System.currentTimeMillis() + timeUnit.toMillis(timeout);
for (;;) {
T value = callable.call();
if (EqualsHelper.equals(value, expected)) return;
now = System.currentTimeMillis();
if (now < deadline) {
Thread.sleep(Math.min(100, deadline - now));
} else break;
}
throw new TimeoutException();
}
public static SessionFactoryOptions sfOptionsForStart() {
return new SessionFactoryOptionsImpl(
new SessionFactoryBuilderImpl.SessionFactoryOptionsStateStandardImpl(

View File

@ -8,12 +8,14 @@ import org.infinispan.interceptors.base.BaseCustomInterceptor;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.function.BiPredicate;
import java.util.function.BooleanSupplier;
public class ExpectingInterceptor extends BaseCustomInterceptor {
@ -56,23 +58,36 @@ public class ExpectingInterceptor extends BaseCustomInterceptor {
succeeded = true;
return retval;
} finally {
log.tracef("After command %s", command);
log.tracef("After command(successful=%s) %s", succeeded, command);
List<Runnable> toExecute = new ArrayList<>();
synchronized (this) {
for (Iterator<Condition> iterator = conditions.iterator(); iterator.hasNext(); ) {
Condition condition = iterator.next();
log.tracef("Testing condition %s", condition);
if ((condition.success == null || condition.success == succeeded) && condition.predicate.test(ctx, command)) {
assert condition.action != null;
condition.action.run();
iterator.remove();
log.trace("Condition succeeded");
toExecute.add(condition.action);
if (condition.removeCheck == null || condition.removeCheck.getAsBoolean()) {
iterator.remove();
}
} else {
log.trace("Condition test failed");
}
}
}
// execute without holding the lock
for (Runnable runnable : toExecute) {
log.tracef("Executing %s", runnable);
runnable.run();
}
}
}
public class Condition {
private final BiPredicate<InvocationContext, VisitableCommand> predicate;
private final Boolean success;
private BooleanSupplier removeCheck;
private Runnable action;
public Condition(BiPredicate<InvocationContext, VisitableCommand> predicate, Boolean success) {
@ -80,20 +95,36 @@ public class ExpectingInterceptor extends BaseCustomInterceptor {
this.success = success;
}
public void run(Runnable action) {
public Condition run(Runnable action) {
assert this.action == null;
this.action = action;
return this;
}
public void countDown(CountDownLatch latch) {
assert action == null;
action = () -> latch.countDown();
public Condition countDown(CountDownLatch latch) {
return run(() -> latch.countDown()).removeWhen(() -> latch.getCount() == 0);
}
public Condition removeWhen(BooleanSupplier check) {
assert this.removeCheck == null;
this.removeCheck = check;
return this;
}
public void cancel() {
synchronized (ExpectingInterceptor.class) {
synchronized (ExpectingInterceptor.this) {
conditions.remove(this);
}
}
@Override
public String toString() {
final StringBuilder sb = new StringBuilder("Condition{");
sb.append("predicate=").append(predicate);
sb.append(", success=").append(success);
sb.append(", action=").append(action);
sb.append('}');
return sb.toString();
}
}
}

View File

@ -41,8 +41,13 @@ public class TestInfinispanRegionFactory extends InfinispanRegionFactory {
@Override
protected EmbeddedCacheManager createCacheManager(ConfigurationBuilderHolder holder) {
// If the cache manager has been provided by calling setCacheManager, don't create a new one
EmbeddedCacheManager cacheManager = getCacheManager();
if (cacheManager != null) {
return cacheManager;
}
amendConfiguration(holder);
DefaultCacheManager cacheManager = new DefaultCacheManager(holder, true);
cacheManager = new DefaultCacheManager(holder, true);
if (timeService != null) {
cacheManager.getGlobalComponentRegistry().registerComponent(timeService, TimeService.class);
cacheManager.getGlobalComponentRegistry().rewire();