HHH-11344 Testsuite speed-up
* reduce number of setups (@Before -> @BeforeClassOnce)
* remove sleeps related to JGroups flush (issue in a 6+ years old version)
* do not create new cache manager in CollectionRegionAccessStrategyTest#doPutFromLoadRemoveDoesNotProduceStaleDataInvalidation
* Share cache manager in some tests
* Replace system time with mocked time service where possible
* Replace sleeps with synchronization
* Disabled ConcurrentWriteTest.testMany (this is a stress test)
(cherry picked from commit a21706bf02
)
This commit is contained in:
parent
4cc22a679d
commit
4a6c46dd8e
|
@ -101,6 +101,8 @@ public class PutFromLoadValidator {
|
|||
*/
|
||||
private final AdvancedCache cache;
|
||||
|
||||
private final InfinispanRegionFactory regionFactory;
|
||||
|
||||
/**
|
||||
* Injected interceptor
|
||||
*/
|
||||
|
@ -138,6 +140,7 @@ public class PutFromLoadValidator {
|
|||
* @param cacheManager where to find a cache to store pending put information
|
||||
*/
|
||||
public PutFromLoadValidator(AdvancedCache cache, InfinispanRegionFactory regionFactory, EmbeddedCacheManager cacheManager) {
|
||||
this.regionFactory = regionFactory;
|
||||
Configuration cacheConfiguration = cache.getCacheConfiguration();
|
||||
Configuration pendingPutsConfiguration = regionFactory.getPendingPutsCacheConfiguration();
|
||||
ConfigurationBuilder configurationBuilder = new ConfigurationBuilder();
|
||||
|
@ -417,7 +420,7 @@ public class PutFromLoadValidator {
|
|||
log.trace("Started invalidating region " + cache.getName());
|
||||
}
|
||||
boolean ok = true;
|
||||
long now = System.currentTimeMillis();
|
||||
long now = regionFactory.nextTimestamp();
|
||||
// deny all puts until endInvalidatingRegion is called; at that time the region should be already
|
||||
// in INVALID state, therefore all new requests should be blocked and ongoing should fail by timestamp
|
||||
synchronized (this) {
|
||||
|
@ -458,7 +461,7 @@ public class PutFromLoadValidator {
|
|||
public void endInvalidatingRegion() {
|
||||
synchronized (this) {
|
||||
if (--regionInvalidations == 0) {
|
||||
regionInvalidationTimestamp = System.currentTimeMillis();
|
||||
regionInvalidationTimestamp = regionFactory.nextTimestamp();
|
||||
if (trace) {
|
||||
log.tracef("Finished invalidating region %s at %d", cache.getName(), regionInvalidationTimestamp);
|
||||
}
|
||||
|
@ -566,7 +569,7 @@ public class PutFromLoadValidator {
|
|||
}
|
||||
continue;
|
||||
}
|
||||
long now = System.currentTimeMillis();
|
||||
long now = regionFactory.nextTimestamp();
|
||||
pending.invalidate(now);
|
||||
pending.addInvalidator(lockOwner, valueForPFER, now);
|
||||
}
|
||||
|
@ -607,7 +610,7 @@ public class PutFromLoadValidator {
|
|||
}
|
||||
if (pending.acquireLock(60, TimeUnit.SECONDS)) {
|
||||
try {
|
||||
long now = System.currentTimeMillis();
|
||||
long now = regionFactory.nextTimestamp();
|
||||
pending.removeInvalidator(lockOwner, key, now, doPFER);
|
||||
// we can't remove the pending put yet because we wait for naked puts
|
||||
// pendingPuts should be configured with maxIdle time so won't have memory leak
|
||||
|
@ -802,7 +805,8 @@ public class PutFromLoadValidator {
|
|||
*/
|
||||
private void gc() {
|
||||
assert fullMap != null;
|
||||
long now = System.currentTimeMillis();
|
||||
long now = regionFactory.nextTimestamp();
|
||||
log.tracef("Contains %d, doing GC at %d, expiration %d", size(), now, expirationPeriod);
|
||||
for ( Iterator<PendingPut> it = fullMap.values().iterator(); it.hasNext(); ) {
|
||||
PendingPut pp = it.next();
|
||||
if (pp.gc(now, expirationPeriod)) {
|
||||
|
|
|
@ -2,16 +2,15 @@ package org.hibernate.test.cache.infinispan;
|
|||
|
||||
import org.hibernate.cache.internal.CacheDataDescriptionImpl;
|
||||
import org.hibernate.cache.spi.CacheDataDescription;
|
||||
import org.hibernate.cache.spi.access.AccessType;
|
||||
import org.hibernate.cache.spi.access.RegionAccessStrategy;
|
||||
import org.hibernate.cache.spi.access.SoftLock;
|
||||
import org.hibernate.engine.spi.SessionImplementor;
|
||||
import org.hibernate.internal.util.compare.ComparableComparator;
|
||||
import org.hibernate.test.cache.infinispan.util.InfinispanTestingSetup;
|
||||
|
||||
import org.hibernate.test.cache.infinispan.util.TestingKeyFactory;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Rule;
|
||||
import org.hibernate.testing.AfterClassOnce;
|
||||
import org.hibernate.testing.BeforeClassOnce;
|
||||
import org.infinispan.test.fwk.TestResourceTracker;
|
||||
import org.junit.Test;
|
||||
|
||||
import static org.junit.Assert.assertNull;
|
||||
|
@ -21,8 +20,6 @@ import static org.mockito.Mockito.mock;
|
|||
* @author Radim Vansa <rvansa@redhat.com>
|
||||
*/
|
||||
public abstract class AbstractExtraAPITest<S extends RegionAccessStrategy> extends AbstractNonFunctionalTest {
|
||||
@Rule
|
||||
public InfinispanTestingSetup infinispanTestIdentifier = new InfinispanTestingSetup();
|
||||
|
||||
public static final String REGION_NAME = "test/com.foo.test";
|
||||
public static final Object KEY = TestingKeyFactory.generateCollectionCacheKey( "KEY" );
|
||||
|
@ -33,24 +30,23 @@ public abstract class AbstractExtraAPITest<S extends RegionAccessStrategy> exten
|
|||
protected S accessStrategy;
|
||||
protected NodeEnvironment environment;
|
||||
|
||||
@Before
|
||||
@BeforeClassOnce
|
||||
public final void prepareLocalAccessStrategy() throws Exception {
|
||||
TestResourceTracker.testStarted(getClass().getSimpleName());
|
||||
environment = new NodeEnvironment( createStandardServiceRegistryBuilder() );
|
||||
environment.prepare();
|
||||
|
||||
// Sleep a bit to avoid concurrent FLUSH problem
|
||||
avoidConcurrentFlush();
|
||||
|
||||
accessStrategy = getAccessStrategy();
|
||||
}
|
||||
|
||||
protected abstract S getAccessStrategy();
|
||||
|
||||
@After
|
||||
@AfterClassOnce
|
||||
public final void releaseLocalAccessStrategy() throws Exception {
|
||||
if ( environment != null ) {
|
||||
environment.release();
|
||||
}
|
||||
TestResourceTracker.testFinished(getClass().getSimpleName());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
|
@ -7,36 +7,44 @@
|
|||
package org.hibernate.test.cache.infinispan;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.Properties;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.hibernate.Session;
|
||||
import org.hibernate.SessionFactory;
|
||||
import org.hibernate.SharedSessionContract;
|
||||
import org.hibernate.Transaction;
|
||||
import org.hibernate.boot.MetadataSources;
|
||||
import org.hibernate.boot.registry.StandardServiceRegistry;
|
||||
import org.hibernate.boot.registry.StandardServiceRegistryBuilder;
|
||||
import org.hibernate.cache.infinispan.InfinispanRegionFactory;
|
||||
import org.hibernate.cache.infinispan.impl.BaseGeneralDataRegion;
|
||||
import org.hibernate.cache.infinispan.util.Caches;
|
||||
import org.hibernate.cache.infinispan.impl.BaseRegion;
|
||||
import org.hibernate.cache.spi.GeneralDataRegion;
|
||||
import org.hibernate.cache.spi.QueryResultsRegion;
|
||||
import org.hibernate.cache.spi.Region;
|
||||
import org.hibernate.cache.spi.RegionFactory;
|
||||
import org.hibernate.cache.spi.access.AccessType;
|
||||
import org.hibernate.cfg.AvailableSettings;
|
||||
import org.hibernate.engine.spi.SessionImplementor;
|
||||
import org.hibernate.test.cache.infinispan.util.CacheTestUtil;
|
||||
import org.hibernate.test.cache.infinispan.util.ExpectingInterceptor;
|
||||
import org.hibernate.test.cache.infinispan.util.TestInfinispanRegionFactory;
|
||||
import org.infinispan.AdvancedCache;
|
||||
import org.jboss.logging.Logger;
|
||||
|
||||
import org.infinispan.commands.write.PutKeyValueCommand;
|
||||
import org.infinispan.commands.write.RemoveCommand;
|
||||
import org.infinispan.configuration.cache.CacheMode;
|
||||
import org.junit.Test;
|
||||
|
||||
import static org.hibernate.test.cache.infinispan.util.CacheTestUtil.assertEqualsEventually;
|
||||
import org.infinispan.AdvancedCache;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
/**
|
||||
* Base class for tests of QueryResultsRegion and TimestampsRegion.
|
||||
|
@ -45,14 +53,18 @@ import static org.junit.Assert.assertNull;
|
|||
* @since 3.5
|
||||
*/
|
||||
public abstract class AbstractGeneralDataRegionTest extends AbstractRegionImplTest {
|
||||
private static final Logger log = Logger.getLogger( AbstractGeneralDataRegionTest.class );
|
||||
|
||||
protected static final String KEY = "Key";
|
||||
|
||||
protected static final String VALUE1 = "value1";
|
||||
protected static final String VALUE2 = "value2";
|
||||
protected static final String VALUE3 = "value3";
|
||||
|
||||
@Override
|
||||
public List<Object[]> getCacheModeParameters() {
|
||||
// the actual cache mode and access type is irrelevant for the general data regions
|
||||
return Arrays.<Object[]>asList(new Object[]{ CacheMode.INVALIDATION_SYNC, AccessType.TRANSACTIONAL });
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void putInRegion(Region region, Object key, Object value) {
|
||||
((GeneralDataRegion) region).put(null, key, value );
|
||||
|
@ -63,11 +75,6 @@ public abstract class AbstractGeneralDataRegionTest extends AbstractRegionImplTe
|
|||
((GeneralDataRegion) region).evict( key );
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testEvict() throws Exception {
|
||||
evictOrRemoveTest();
|
||||
}
|
||||
|
||||
protected interface SFRConsumer {
|
||||
void accept(List<SessionFactory> sessionFactories, List<GeneralDataRegion> regions) throws Exception;
|
||||
}
|
||||
|
@ -107,17 +114,28 @@ public abstract class AbstractGeneralDataRegionTest extends AbstractRegionImplTe
|
|||
}
|
||||
}
|
||||
|
||||
private void evictOrRemoveTest() throws Exception {
|
||||
@Test
|
||||
public void testEvict() throws Exception {
|
||||
withSessionFactoriesAndRegions(2, ((sessionFactories, regions) -> {
|
||||
GeneralDataRegion localRegion = regions.get(0);
|
||||
GeneralDataRegion remoteRegion = regions.get(1);
|
||||
SessionImplementor localSession = (SessionImplementor) sessionFactories.get(0).openSession();
|
||||
SessionImplementor remoteSession = (SessionImplementor) sessionFactories.get(1).openSession();
|
||||
AdvancedCache localCache = ((BaseRegion) localRegion).getCache();
|
||||
AdvancedCache remoteCache = ((BaseRegion) remoteRegion).getCache();
|
||||
try {
|
||||
assertNull("local is clean", localRegion.get(localSession, KEY));
|
||||
assertNull("remote is clean", remoteRegion.get(remoteSession, KEY));
|
||||
|
||||
Transaction tx = ((Session) localSession).getTransaction();
|
||||
// If this node is backup owner, it will see the update once as originator and then when getting the value from primary
|
||||
boolean isLocalNodeBackupOwner = localCache.getDistributionManager().locate(KEY).indexOf(localCache.getCacheManager().getAddress()) > 0;
|
||||
CountDownLatch insertLatch = new CountDownLatch(isLocalNodeBackupOwner ? 3 : 2);
|
||||
ExpectingInterceptor.get(localCache).when((ctx, cmd) -> cmd instanceof PutKeyValueCommand).countDown(insertLatch);
|
||||
ExpectingInterceptor.get(remoteCache).when((ctx, cmd) -> cmd instanceof PutKeyValueCommand).countDown(
|
||||
insertLatch
|
||||
);
|
||||
|
||||
Transaction tx = ( (SharedSessionContract) localSession ).getTransaction();
|
||||
tx.begin();
|
||||
try {
|
||||
localRegion.put(localSession, KEY, VALUE1);
|
||||
|
@ -127,19 +145,24 @@ public abstract class AbstractGeneralDataRegionTest extends AbstractRegionImplTe
|
|||
throw e;
|
||||
}
|
||||
|
||||
Callable<Object> getFromLocalRegion = () -> localRegion.get(localSession, KEY);
|
||||
Callable<Object> getFromRemoteRegion = () -> remoteRegion.get(remoteSession, KEY);
|
||||
assertTrue(insertLatch.await(2, TimeUnit.SECONDS));
|
||||
assertEquals(VALUE1, localRegion.get(localSession, KEY));
|
||||
assertEquals(VALUE1, remoteRegion.get(remoteSession, KEY));
|
||||
|
||||
assertEqualsEventually(VALUE1, getFromLocalRegion, 10, TimeUnit.SECONDS);
|
||||
assertEqualsEventually(VALUE1, getFromRemoteRegion, 10, TimeUnit.SECONDS);
|
||||
CountDownLatch removeLatch = new CountDownLatch(isLocalNodeBackupOwner ? 3 : 2);
|
||||
ExpectingInterceptor.get(localCache).when((ctx, cmd) -> cmd instanceof RemoveCommand).countDown(removeLatch);
|
||||
ExpectingInterceptor.get(remoteCache).when((ctx, cmd) -> cmd instanceof RemoveCommand).countDown(removeLatch);
|
||||
|
||||
regionEvict(localRegion);
|
||||
|
||||
assertEqualsEventually(null, getFromLocalRegion, 10, TimeUnit.SECONDS);
|
||||
assertEqualsEventually(null, getFromRemoteRegion, 10, TimeUnit.SECONDS);
|
||||
assertTrue(removeLatch.await(2, TimeUnit.SECONDS));
|
||||
assertEquals(null, localRegion.get(localSession, KEY));
|
||||
assertEquals(null, remoteRegion.get(remoteSession, KEY));
|
||||
} finally {
|
||||
( (Session) localSession).close();
|
||||
( (Session) remoteSession).close();
|
||||
( (Session) localSession ).close();
|
||||
( (Session) remoteSession ).close();
|
||||
|
||||
ExpectingInterceptor.cleanup(localCache, remoteCache);
|
||||
}
|
||||
}));
|
||||
}
|
||||
|
@ -157,10 +180,6 @@ public abstract class AbstractGeneralDataRegionTest extends AbstractRegionImplTe
|
|||
* CollectionRegionAccessStrategy API.
|
||||
*/
|
||||
public void testEvictAll() throws Exception {
|
||||
evictOrRemoveAllTest( "entity" );
|
||||
}
|
||||
|
||||
private void evictOrRemoveAllTest(String configName) throws Exception {
|
||||
withSessionFactoriesAndRegions(2, (sessionFactories, regions) -> {
|
||||
GeneralDataRegion localRegion = regions.get(0);
|
||||
GeneralDataRegion remoteRegion = regions.get(1);
|
||||
|
@ -182,19 +201,11 @@ public abstract class AbstractGeneralDataRegionTest extends AbstractRegionImplTe
|
|||
localRegion.put(localSession, KEY, VALUE1);
|
||||
assertEquals( VALUE1, localRegion.get(null, KEY ) );
|
||||
|
||||
// Allow async propagation
|
||||
sleep( 250 );
|
||||
|
||||
remoteRegion.put(remoteSession, KEY, VALUE1);
|
||||
assertEquals( VALUE1, remoteRegion.get(null, KEY ) );
|
||||
|
||||
// Allow async propagation
|
||||
sleep( 250 );
|
||||
|
||||
localRegion.evictAll();
|
||||
|
||||
// allow async propagation
|
||||
sleep( 250 );
|
||||
// This should re-establish the region root node in the optimistic case
|
||||
assertNull( localRegion.get(null, KEY ) );
|
||||
localKeys = localCache.keySet();
|
||||
|
@ -210,8 +221,8 @@ public abstract class AbstractGeneralDataRegionTest extends AbstractRegionImplTe
|
|||
assertEquals( "local is clean", null, localRegion.get(null, KEY ) );
|
||||
assertEquals( "remote is clean", null, remoteRegion.get(null, KEY ) );
|
||||
} finally {
|
||||
( (Session) localSession).close();
|
||||
( (Session) remoteSession).close();
|
||||
( (Session) localSession ).close();
|
||||
( (Session) remoteSession ).close();
|
||||
}
|
||||
|
||||
});
|
||||
|
|
|
@ -188,18 +188,6 @@ public abstract class AbstractNonFunctionalTest extends org.hibernate.testing.ju
|
|||
return testSupport;
|
||||
}
|
||||
|
||||
protected void sleep(long ms) {
|
||||
try {
|
||||
Thread.sleep(ms);
|
||||
} catch (InterruptedException e) {
|
||||
log.warn("Interrupted during sleep", e);
|
||||
}
|
||||
}
|
||||
|
||||
protected void avoidConcurrentFlush() {
|
||||
testSupport.avoidConcurrentFlush();
|
||||
}
|
||||
|
||||
protected StandardServiceRegistryBuilder createStandardServiceRegistryBuilder() {
|
||||
final StandardServiceRegistryBuilder ssrb = CacheTestUtil.buildBaselineStandardServiceRegistryBuilder(
|
||||
REGION_PREFIX, getRegionFactoryClass(), true, false, jtaPlatform);
|
||||
|
|
|
@ -12,7 +12,6 @@ import java.util.function.Predicate;
|
|||
|
||||
import javax.transaction.RollbackException;
|
||||
import javax.transaction.SystemException;
|
||||
import javax.transaction.TransactionManager;
|
||||
|
||||
import org.hibernate.Session;
|
||||
import org.hibernate.Transaction;
|
||||
|
@ -40,9 +39,11 @@ import org.hibernate.resource.transaction.backend.jdbc.internal.JdbcResourceLoca
|
|||
import org.hibernate.resource.transaction.backend.jdbc.spi.JdbcResourceTransactionAccess;
|
||||
import org.hibernate.resource.transaction.spi.TransactionCoordinatorOwner;
|
||||
import org.hibernate.service.ServiceRegistry;
|
||||
|
||||
import org.hibernate.testing.AfterClassOnce;
|
||||
import org.hibernate.testing.BeforeClassOnce;
|
||||
import org.hibernate.test.cache.infinispan.util.BatchModeJtaPlatform;
|
||||
import org.hibernate.test.cache.infinispan.util.BatchModeTransactionCoordinator;
|
||||
import org.hibernate.test.cache.infinispan.util.InfinispanTestingSetup;
|
||||
import org.hibernate.test.cache.infinispan.util.ExpectingInterceptor;
|
||||
import org.hibernate.test.cache.infinispan.util.JdbcResourceTransactionMock;
|
||||
import org.hibernate.test.cache.infinispan.util.TestInfinispanRegionFactory;
|
||||
|
@ -54,9 +55,9 @@ import org.hibernate.test.cache.infinispan.util.TestTimeService;
|
|||
import org.infinispan.commands.write.InvalidateCommand;
|
||||
import org.infinispan.AdvancedCache;
|
||||
import org.infinispan.commands.write.PutKeyValueCommand;
|
||||
import org.infinispan.test.fwk.TestResourceTracker;
|
||||
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
@ -75,9 +76,6 @@ public abstract class AbstractRegionAccessStrategyTest<R extends BaseRegion, S e
|
|||
extends AbstractNonFunctionalTest {
|
||||
protected final Logger log = Logger.getLogger(getClass());
|
||||
|
||||
@Rule
|
||||
public InfinispanTestingSetup infinispanTestIdentifier = new InfinispanTestingSetup();
|
||||
|
||||
public static final String REGION_NAME = "test/com.foo.test";
|
||||
public static final String KEY_BASE = "KEY";
|
||||
public static final String VALUE1 = "VALUE1";
|
||||
|
@ -110,23 +108,21 @@ public abstract class AbstractRegionAccessStrategyTest<R extends BaseRegion, S e
|
|||
return false;
|
||||
}
|
||||
|
||||
@Before
|
||||
@BeforeClassOnce
|
||||
public void prepareResources() throws Exception {
|
||||
TestResourceTracker.testStarted( getClass().getSimpleName() );
|
||||
// to mimic exactly the old code results, both environments here are exactly the same...
|
||||
StandardServiceRegistryBuilder ssrb = createStandardServiceRegistryBuilder();
|
||||
localEnvironment = new NodeEnvironment( ssrb );
|
||||
localEnvironment.prepare();
|
||||
|
||||
localRegion = getRegion(localEnvironment);
|
||||
localAccessStrategy = getAccessStrategy(localRegion);
|
||||
localRegion = getRegion( localEnvironment );
|
||||
localAccessStrategy = getAccessStrategy( localRegion );
|
||||
|
||||
transactional = Caches.isTransactionalCache(localRegion.getCache());
|
||||
invalidation = Caches.isInvalidationCache(localRegion.getCache());
|
||||
transactional = Caches.isTransactionalCache( localRegion.getCache() );
|
||||
invalidation = Caches.isInvalidationCache( localRegion.getCache() );
|
||||
synchronous = Caches.isSynchronousCache(localRegion.getCache());
|
||||
|
||||
// Sleep a bit to avoid concurrent FLUSH problem
|
||||
avoidConcurrentFlush();
|
||||
|
||||
remoteEnvironment = new NodeEnvironment( ssrb );
|
||||
remoteEnvironment.prepare();
|
||||
|
||||
|
@ -136,6 +132,29 @@ public abstract class AbstractRegionAccessStrategyTest<R extends BaseRegion, S e
|
|||
waitForClusterToForm(localRegion.getCache(), remoteRegion.getCache());
|
||||
}
|
||||
|
||||
@After
|
||||
public void cleanup() {
|
||||
cleanup.forEach(Runnable::run);
|
||||
cleanup.clear();
|
||||
if (localRegion != null) localRegion.getCache().clear();
|
||||
if (remoteRegion != null) remoteRegion.getCache().clear();
|
||||
}
|
||||
|
||||
@AfterClassOnce
|
||||
public void releaseResources() throws Exception {
|
||||
try {
|
||||
if (localEnvironment != null) {
|
||||
localEnvironment.release();
|
||||
}
|
||||
}
|
||||
finally {
|
||||
if (remoteEnvironment != null) {
|
||||
remoteEnvironment.release();
|
||||
}
|
||||
}
|
||||
TestResourceTracker.testFinished(getClass().getSimpleName());
|
||||
}
|
||||
|
||||
@Override
|
||||
protected StandardServiceRegistryBuilder createStandardServiceRegistryBuilder() {
|
||||
StandardServiceRegistryBuilder ssrb = super.createStandardServiceRegistryBuilder();
|
||||
|
@ -224,7 +243,7 @@ public abstract class AbstractRegionAccessStrategyTest<R extends BaseRegion, S e
|
|||
node1.setDaemon(true);
|
||||
node2.setDaemon( true );
|
||||
|
||||
CountDownLatch remoteUpdate = setupExpectAfterUpdate();
|
||||
CountDownLatch remoteUpdate = expectAfterUpdate();
|
||||
|
||||
node1.start();
|
||||
node2.start();
|
||||
|
@ -248,11 +267,11 @@ public abstract class AbstractRegionAccessStrategyTest<R extends BaseRegion, S e
|
|||
}
|
||||
}
|
||||
|
||||
protected CountDownLatch setupExpectAfterUpdate() {
|
||||
return setupExpectPutWithValue(value -> value instanceof FutureUpdate);
|
||||
protected CountDownLatch expectAfterUpdate() {
|
||||
return expectPutWithValue( value -> value instanceof FutureUpdate );
|
||||
}
|
||||
|
||||
protected CountDownLatch setupExpectPutWithValue(Predicate<Object> valuePredicate) {
|
||||
protected CountDownLatch expectPutWithValue(Predicate<Object> valuePredicate) {
|
||||
if (!isUsingInvalidation() && accessType != AccessType.NONSTRICT_READ_WRITE) {
|
||||
CountDownLatch latch = new CountDownLatch(1);
|
||||
ExpectingInterceptor.get(remoteRegion.getCache())
|
||||
|
@ -265,8 +284,8 @@ public abstract class AbstractRegionAccessStrategyTest<R extends BaseRegion, S e
|
|||
}
|
||||
}
|
||||
|
||||
protected CountDownLatch setupExpectPutFromLoad() {
|
||||
return setupExpectPutWithValue(value -> value instanceof TombstoneUpdate);
|
||||
protected CountDownLatch expectPutFromLoad() {
|
||||
return expectPutWithValue(value -> value instanceof TombstoneUpdate);
|
||||
}
|
||||
|
||||
protected abstract void doUpdate(S strategy, SessionImplementor session, Object key, Object value, Object version) throws RollbackException, SystemException;
|
||||
|
@ -342,27 +361,6 @@ public abstract class AbstractRegionAccessStrategyTest<R extends BaseRegion, S e
|
|||
TestingUtil.blockUntilViewsReceived(10000, Arrays.asList(caches));
|
||||
}
|
||||
|
||||
@After
|
||||
public void cleanup() throws Exception {
|
||||
for (Runnable runnable : cleanup) {
|
||||
runnable.run();
|
||||
}
|
||||
cleanup.clear();
|
||||
if (localRegion != null) localRegion.getCache().clear();
|
||||
if (remoteRegion != null) remoteRegion.getCache().clear();
|
||||
|
||||
try {
|
||||
if (localEnvironment != null) {
|
||||
localEnvironment.release();
|
||||
}
|
||||
}
|
||||
finally {
|
||||
if (remoteEnvironment != null) {
|
||||
remoteEnvironment.release();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
protected boolean isTransactional() {
|
||||
return transactional;
|
||||
}
|
||||
|
@ -391,7 +389,7 @@ public abstract class AbstractRegionAccessStrategyTest<R extends BaseRegion, S e
|
|||
SessionImplementor s3 = mockedSession();
|
||||
localAccessStrategy.putFromLoad(s3, KEY, VALUE1, s3.getTimestamp(), 1);
|
||||
SessionImplementor s5 = mockedSession();
|
||||
remoteAccessStrategy.putFromLoad(s5, KEY, VALUE1, s5.getTimestamp(), new Integer(1));
|
||||
remoteAccessStrategy.putFromLoad(s5, KEY, VALUE1, s5.getTimestamp(), 1);
|
||||
|
||||
// putFromLoad is applied on local node synchronously, but if there's a concurrent update
|
||||
// from the other node it can silently fail when acquiring the loc . Then we could try to read
|
||||
|
@ -410,7 +408,7 @@ public abstract class AbstractRegionAccessStrategyTest<R extends BaseRegion, S e
|
|||
localAccessStrategy.evict(KEY);
|
||||
}
|
||||
else {
|
||||
doRemove(localRegion.getTransactionManager(), localAccessStrategy, session, KEY);
|
||||
doRemove(localAccessStrategy, session, KEY);
|
||||
}
|
||||
return null;
|
||||
});
|
||||
|
@ -423,7 +421,7 @@ public abstract class AbstractRegionAccessStrategyTest<R extends BaseRegion, S e
|
|||
assertEquals(0, remoteRegion.getCache().size());
|
||||
}
|
||||
|
||||
protected void doRemove(TransactionManager tm, S strategy, SessionImplementor session, Object key) throws SystemException, RollbackException {
|
||||
protected void doRemove(S strategy, SessionImplementor session, Object key) throws SystemException, RollbackException {
|
||||
SoftLock softLock = strategy.lockItem(session, key, null);
|
||||
strategy.remove(session, key);
|
||||
session.getTransactionCoordinator().getLocalSynchronizations().registerSynchronization(
|
||||
|
@ -493,7 +491,7 @@ public abstract class AbstractRegionAccessStrategyTest<R extends BaseRegion, S e
|
|||
if (invalidation && !evict) {
|
||||
// removeAll causes transactional remove commands which trigger EndInvalidationCommands on the remote side
|
||||
// if the cache is non-transactional, PutFromLoadValidator.registerRemoteInvalidations cannot find
|
||||
// current session nor register tx synchronization, so it falls back to simpe InvalidationCommand.
|
||||
// current session nor register tx synchronization, so it falls back to simple InvalidationCommand.
|
||||
endInvalidationLatch = new CountDownLatch(1);
|
||||
if (transactional) {
|
||||
PutFromLoadValidator originalValidator = PutFromLoadValidator.removeFromCache(remoteRegion.getCache());
|
||||
|
|
|
@ -6,6 +6,7 @@
|
|||
*/
|
||||
package org.hibernate.test.cache.infinispan;
|
||||
|
||||
import java.io.InputStream;
|
||||
import java.util.Properties;
|
||||
import java.util.function.BiConsumer;
|
||||
|
||||
|
@ -30,6 +31,9 @@ import org.hibernate.test.cache.infinispan.util.CacheTestUtil;
|
|||
import org.hibernate.test.cache.infinispan.util.InfinispanTestingSetup;
|
||||
import org.hibernate.testing.ServiceRegistryBuilder;
|
||||
import org.infinispan.configuration.cache.ClusteringConfigurationBuilder;
|
||||
import org.infinispan.commons.util.FileLookupFactory;
|
||||
import org.infinispan.configuration.parsing.ConfigurationBuilderHolder;
|
||||
import org.infinispan.configuration.parsing.ParserRegistry;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
|
||||
|
@ -37,7 +41,6 @@ import org.infinispan.AdvancedCache;
|
|||
import org.infinispan.configuration.cache.CacheMode;
|
||||
import org.infinispan.configuration.cache.Configuration;
|
||||
import org.infinispan.configuration.cache.ConfigurationBuilder;
|
||||
import org.infinispan.configuration.global.GlobalConfigurationBuilder;
|
||||
import org.infinispan.eviction.EvictionStrategy;
|
||||
import org.infinispan.manager.DefaultCacheManager;
|
||||
import org.infinispan.manager.EmbeddedCacheManager;
|
||||
|
@ -298,7 +301,9 @@ public class InfinispanRegionFactoryTestCase {
|
|||
public void testTimestampValidation() {
|
||||
final String timestamps = "org.hibernate.cache.spi.UpdateTimestampsCache";
|
||||
Properties p = createProperties();
|
||||
final DefaultCacheManager manager = new DefaultCacheManager(GlobalConfigurationBuilder.defaultClusteredBuilder().build());
|
||||
InputStream configStream = FileLookupFactory.newInstance().lookupFile(InfinispanRegionFactory.DEF_INFINISPAN_CONFIG_RESOURCE, getClass().getClassLoader());
|
||||
ConfigurationBuilderHolder cbh = new ParserRegistry().parse(configStream);
|
||||
DefaultCacheManager manager = new DefaultCacheManager(cbh, true);
|
||||
ConfigurationBuilder builder = new ConfigurationBuilder();
|
||||
builder.clustering().cacheMode(CacheMode.INVALIDATION_SYNC);
|
||||
manager.defineConfiguration( DEF_TIMESTAMPS_RESOURCE, builder.build() );
|
||||
|
|
|
@ -9,6 +9,8 @@ package org.hibernate.test.cache.infinispan.access;
|
|||
import javax.transaction.TransactionManager;
|
||||
|
||||
import java.lang.reflect.Method;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Properties;
|
||||
import java.util.concurrent.Callable;
|
||||
|
@ -25,22 +27,33 @@ import org.hibernate.cache.infinispan.InfinispanRegionFactory;
|
|||
import org.hibernate.cache.infinispan.access.PutFromLoadValidator;
|
||||
import org.hibernate.cache.infinispan.util.InfinispanMessageLogger;
|
||||
import org.hibernate.engine.spi.SessionImplementor;
|
||||
|
||||
import org.hibernate.test.cache.infinispan.util.TestInfinispanRegionFactory;
|
||||
import org.hibernate.test.cache.infinispan.util.TestTimeService;
|
||||
import org.hibernate.testing.AfterClassOnce;
|
||||
import org.hibernate.testing.BeforeClassOnce;
|
||||
import org.hibernate.testing.TestForIssue;
|
||||
import org.hibernate.test.cache.infinispan.functional.cluster.DualNodeJtaTransactionManagerImpl;
|
||||
import org.hibernate.test.cache.infinispan.util.CacheTestUtil;
|
||||
import org.hibernate.test.cache.infinispan.util.InfinispanTestingSetup;
|
||||
import org.hibernate.testing.TestForIssue;
|
||||
import org.hibernate.testing.junit4.CustomRunner;
|
||||
import org.infinispan.AdvancedCache;
|
||||
import org.infinispan.test.fwk.TestResourceTracker;
|
||||
import org.junit.After;
|
||||
import org.junit.Test;
|
||||
|
||||
import org.infinispan.configuration.cache.Configuration;
|
||||
import org.infinispan.configuration.cache.ConfigurationBuilder;
|
||||
import org.infinispan.manager.EmbeddedCacheManager;
|
||||
import org.infinispan.test.CacheManagerCallable;
|
||||
import org.infinispan.test.fwk.TestCacheManagerFactory;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
|
||||
import static org.infinispan.test.TestingUtil.withCacheManager;
|
||||
import static org.infinispan.test.Exceptions.expectException;
|
||||
import static org.infinispan.test.TestingUtil.withTx;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.mockito.Mockito.doAnswer;
|
||||
import static org.mockito.Mockito.doReturn;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.junit.Assert.*;
|
||||
|
@ -52,95 +65,97 @@ import static org.junit.Assert.*;
|
|||
* @author Galder Zamarreño
|
||||
* @version $Revision: $
|
||||
*/
|
||||
@RunWith(CustomRunner.class)
|
||||
public class PutFromLoadValidatorUnitTest {
|
||||
|
||||
private static final InfinispanMessageLogger log = InfinispanMessageLogger.Provider.getLog(
|
||||
PutFromLoadValidatorUnitTest.class);
|
||||
|
||||
@Rule
|
||||
public InfinispanTestingSetup infinispanTestIdentifier = new InfinispanTestingSetup();
|
||||
private static final TestTimeService TIME_SERVICE = new TestTimeService();
|
||||
|
||||
private Object KEY1 = "KEY1";
|
||||
|
||||
private TransactionManager tm;
|
||||
private EmbeddedCacheManager cm;
|
||||
private AdvancedCache<Object, Object> cache;
|
||||
private List<Runnable> cleanup = new ArrayList<>();
|
||||
|
||||
@Before
|
||||
@BeforeClassOnce
|
||||
public void setUp() throws Exception {
|
||||
TestResourceTracker.testStarted(getClass().getSimpleName());
|
||||
tm = DualNodeJtaTransactionManagerImpl.getInstance("test");
|
||||
cm = TestCacheManagerFactory.createCacheManager(true);
|
||||
cache = cm.getCache().getAdvancedCache();
|
||||
}
|
||||
|
||||
@AfterClassOnce
|
||||
public void stop() {
|
||||
tm = null;
|
||||
cm.stop();
|
||||
TestResourceTracker.testFinished(getClass().getSimpleName());
|
||||
}
|
||||
|
||||
@After
|
||||
public void tearDown() throws Exception {
|
||||
tm = null;
|
||||
cleanup.forEach(Runnable::run);
|
||||
cleanup.clear();
|
||||
try {
|
||||
DualNodeJtaTransactionManagerImpl.cleanupTransactions();
|
||||
}
|
||||
finally {
|
||||
DualNodeJtaTransactionManagerImpl.cleanupTransactionManagers();
|
||||
}
|
||||
}
|
||||
|
||||
private static EmbeddedCacheManager createCacheManager() {
|
||||
EmbeddedCacheManager cacheManager = TestCacheManagerFactory.createCacheManager(false);
|
||||
return cacheManager;
|
||||
cache.clear();
|
||||
cm.getCache(cache.getName() + "-" + InfinispanRegionFactory.DEF_PENDING_PUTS_RESOURCE).clear();
|
||||
}
|
||||
|
||||
private static InfinispanRegionFactory regionFactory(EmbeddedCacheManager cm) {
|
||||
InfinispanRegionFactory regionFactory = new InfinispanRegionFactory();
|
||||
Properties properties = new Properties();
|
||||
properties.put(TestInfinispanRegionFactory.TIME_SERVICE, TIME_SERVICE);
|
||||
InfinispanRegionFactory regionFactory = new TestInfinispanRegionFactory(properties);
|
||||
regionFactory.setCacheManager(cm);
|
||||
regionFactory.start(CacheTestUtil.sfOptionsForStart(), new Properties());
|
||||
regionFactory.start(CacheTestUtil.sfOptionsForStart(), properties);
|
||||
return regionFactory;
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNakedPut() throws Exception {
|
||||
nakedPutTest(false);
|
||||
nakedPutTest( false );
|
||||
}
|
||||
@Test
|
||||
public void testNakedPutTransactional() throws Exception {
|
||||
nakedPutTest(true);
|
||||
nakedPutTest( true );
|
||||
}
|
||||
|
||||
private void nakedPutTest(final boolean transactional) throws Exception {
|
||||
withCacheManager(new CacheManagerCallable(createCacheManager()) {
|
||||
@Override
|
||||
public void call() {
|
||||
PutFromLoadValidator testee = new PutFromLoadValidator(cm.getCache().getAdvancedCache(), regionFactory(cm));
|
||||
exec(transactional, new NakedPut(testee, true));
|
||||
}
|
||||
});
|
||||
PutFromLoadValidator testee = new PutFromLoadValidator(cache, regionFactory(cm));
|
||||
exec( transactional, new NakedPut( testee, true ) );
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRegisteredPut() throws Exception {
|
||||
registeredPutTest(false);
|
||||
registeredPutTest( false );
|
||||
}
|
||||
@Test
|
||||
public void testRegisteredPutTransactional() throws Exception {
|
||||
registeredPutTest(true);
|
||||
registeredPutTest( true );
|
||||
}
|
||||
|
||||
private void registeredPutTest(final boolean transactional) throws Exception {
|
||||
withCacheManager(new CacheManagerCallable(createCacheManager()) {
|
||||
@Override
|
||||
public void call() {
|
||||
PutFromLoadValidator testee = new PutFromLoadValidator(cm.getCache().getAdvancedCache(), regionFactory(cm));
|
||||
exec(transactional, new RegularPut(testee));
|
||||
}
|
||||
});
|
||||
PutFromLoadValidator testee = new PutFromLoadValidator(cache, regionFactory(cm));
|
||||
exec(transactional, new RegularPut(testee));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNakedPutAfterKeyRemoval() throws Exception {
|
||||
nakedPutAfterRemovalTest(false, false);
|
||||
nakedPutAfterRemovalTest( false, false );
|
||||
}
|
||||
@Test
|
||||
public void testNakedPutAfterKeyRemovalTransactional() throws Exception {
|
||||
nakedPutAfterRemovalTest(true, false);
|
||||
nakedPutAfterRemovalTest( true, false );
|
||||
}
|
||||
@Test
|
||||
public void testNakedPutAfterRegionRemoval() throws Exception {
|
||||
nakedPutAfterRemovalTest(false, true);
|
||||
nakedPutAfterRemovalTest( false, true );
|
||||
}
|
||||
@Test
|
||||
public void testNakedPutAfterRegionRemovalTransactional() throws Exception {
|
||||
|
@ -149,30 +164,24 @@ public class PutFromLoadValidatorUnitTest {
|
|||
|
||||
private void nakedPutAfterRemovalTest(final boolean transactional,
|
||||
final boolean removeRegion) throws Exception {
|
||||
withCacheManager(new CacheManagerCallable(createCacheManager()) {
|
||||
@Override
|
||||
public void call() {
|
||||
PutFromLoadValidator testee = new PutFromLoadValidator(cm.getCache().getAdvancedCache(), regionFactory(cm));
|
||||
Invalidation invalidation = new Invalidation(testee, removeRegion);
|
||||
// the naked put can succeed because it has txTimestamp after invalidation
|
||||
NakedPut nakedPut = new NakedPut(testee, true);
|
||||
exec(transactional, invalidation, nakedPut);
|
||||
}
|
||||
});
|
||||
|
||||
PutFromLoadValidator testee = new PutFromLoadValidator(cache, regionFactory(cm));
|
||||
Invalidation invalidation = new Invalidation(testee, removeRegion);
|
||||
// the naked put can succeed because it has txTimestamp after invalidation
|
||||
NakedPut nakedPut = new NakedPut(testee, true);
|
||||
exec( transactional, invalidation, nakedPut );
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRegisteredPutAfterKeyRemoval() throws Exception {
|
||||
registeredPutAfterRemovalTest(false, false);
|
||||
registeredPutAfterRemovalTest( false, false );
|
||||
}
|
||||
@Test
|
||||
public void testRegisteredPutAfterKeyRemovalTransactional() throws Exception {
|
||||
registeredPutAfterRemovalTest(true, false);
|
||||
registeredPutAfterRemovalTest( true, false );
|
||||
}
|
||||
@Test
|
||||
public void testRegisteredPutAfterRegionRemoval() throws Exception {
|
||||
registeredPutAfterRemovalTest(false, true);
|
||||
registeredPutAfterRemovalTest( false, true );
|
||||
}
|
||||
@Test
|
||||
public void testRegisteredPutAfterRegionRemovalTransactional() throws Exception {
|
||||
|
@ -181,28 +190,22 @@ public class PutFromLoadValidatorUnitTest {
|
|||
|
||||
private void registeredPutAfterRemovalTest(final boolean transactional,
|
||||
final boolean removeRegion) throws Exception {
|
||||
withCacheManager(new CacheManagerCallable(createCacheManager()) {
|
||||
@Override
|
||||
public void call() {
|
||||
PutFromLoadValidator testee = new PutFromLoadValidator(cm.getCache().getAdvancedCache(), regionFactory(cm));
|
||||
Invalidation invalidation = new Invalidation(testee, removeRegion);
|
||||
RegularPut regularPut = new RegularPut(testee);
|
||||
exec(transactional, invalidation, regularPut);
|
||||
}
|
||||
});
|
||||
|
||||
PutFromLoadValidator testee = new PutFromLoadValidator(cache, regionFactory(cm));
|
||||
Invalidation invalidation = new Invalidation(testee, removeRegion);
|
||||
RegularPut regularPut = new RegularPut(testee);
|
||||
exec( transactional, invalidation, regularPut );
|
||||
}
|
||||
@Test
|
||||
public void testRegisteredPutWithInterveningKeyRemoval() throws Exception {
|
||||
registeredPutWithInterveningRemovalTest(false, false);
|
||||
registeredPutWithInterveningRemovalTest( false, false );
|
||||
}
|
||||
@Test
|
||||
public void testRegisteredPutWithInterveningKeyRemovalTransactional() throws Exception {
|
||||
registeredPutWithInterveningRemovalTest(true, false);
|
||||
registeredPutWithInterveningRemovalTest( true, false );
|
||||
}
|
||||
@Test
|
||||
public void testRegisteredPutWithInterveningRegionRemoval() throws Exception {
|
||||
registeredPutWithInterveningRemovalTest(false, true);
|
||||
registeredPutWithInterveningRemovalTest( false, true );
|
||||
}
|
||||
@Test
|
||||
public void testRegisteredPutWithInterveningRegionRemovalTransactional() throws Exception {
|
||||
|
@ -212,121 +215,102 @@ public class PutFromLoadValidatorUnitTest {
|
|||
private void registeredPutWithInterveningRemovalTest(
|
||||
final boolean transactional, final boolean removeRegion)
|
||||
throws Exception {
|
||||
withCacheManager(new CacheManagerCallable(createCacheManager()) {
|
||||
@Override
|
||||
public void call() {
|
||||
PutFromLoadValidator testee = new PutFromLoadValidator(cm.getCache().getAdvancedCache(), regionFactory(cm));
|
||||
try {
|
||||
long txTimestamp = System.currentTimeMillis();
|
||||
if (transactional) {
|
||||
tm.begin();
|
||||
}
|
||||
SessionImplementor session1 = mock(SessionImplementor.class);
|
||||
SessionImplementor session2 = mock(SessionImplementor.class);
|
||||
testee.registerPendingPut(session1, KEY1, txTimestamp);
|
||||
if (removeRegion) {
|
||||
testee.beginInvalidatingRegion();
|
||||
} else {
|
||||
testee.beginInvalidatingKey(session2, KEY1);
|
||||
}
|
||||
PutFromLoadValidator testee = new PutFromLoadValidator(cache, regionFactory(cm));
|
||||
try {
|
||||
long txTimestamp = TIME_SERVICE.wallClockTime();
|
||||
if (transactional) {
|
||||
tm.begin();
|
||||
}
|
||||
SessionImplementor session1 = mock(SessionImplementor.class);
|
||||
SessionImplementor session2 = mock(SessionImplementor.class);
|
||||
testee.registerPendingPut(session1, KEY1, txTimestamp);
|
||||
if (removeRegion) {
|
||||
testee.beginInvalidatingRegion();
|
||||
} else {
|
||||
testee.beginInvalidatingKey(session2, KEY1);
|
||||
}
|
||||
|
||||
PutFromLoadValidator.Lock lock = testee.acquirePutFromLoadLock(session1, KEY1, txTimestamp);
|
||||
try {
|
||||
assertNull(lock);
|
||||
}
|
||||
finally {
|
||||
if (lock != null) {
|
||||
testee.releasePutFromLoadLock(KEY1, lock);
|
||||
}
|
||||
if (removeRegion) {
|
||||
testee.endInvalidatingRegion();
|
||||
} else {
|
||||
testee.endInvalidatingKey(session2, KEY1);
|
||||
}
|
||||
}
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
PutFromLoadValidator.Lock lock = testee.acquirePutFromLoadLock(session1, KEY1, txTimestamp);
|
||||
try {
|
||||
assertNull(lock);
|
||||
}
|
||||
finally {
|
||||
if (lock != null) {
|
||||
testee.releasePutFromLoadLock(KEY1, lock);
|
||||
}
|
||||
if (removeRegion) {
|
||||
testee.endInvalidatingRegion();
|
||||
} else {
|
||||
testee.endInvalidatingKey(session2, KEY1);
|
||||
}
|
||||
}
|
||||
});
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMultipleRegistrations() throws Exception {
|
||||
multipleRegistrationtest(false);
|
||||
multipleRegistrationtest( false );
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMultipleRegistrationsTransactional() throws Exception {
|
||||
multipleRegistrationtest(true);
|
||||
multipleRegistrationtest( true );
|
||||
}
|
||||
|
||||
private void multipleRegistrationtest(final boolean transactional) throws Exception {
|
||||
withCacheManager(new CacheManagerCallable(createCacheManager()) {
|
||||
@Override
|
||||
public void call() {
|
||||
final PutFromLoadValidator testee = new PutFromLoadValidator(cm.getCache().getAdvancedCache(), regionFactory(cm));
|
||||
final PutFromLoadValidator testee = new PutFromLoadValidator(cache, regionFactory(cm));
|
||||
|
||||
final CountDownLatch registeredLatch = new CountDownLatch(3);
|
||||
final CountDownLatch finishedLatch = new CountDownLatch(3);
|
||||
final AtomicInteger success = new AtomicInteger();
|
||||
final CountDownLatch registeredLatch = new CountDownLatch(3);
|
||||
final CountDownLatch finishedLatch = new CountDownLatch(3);
|
||||
final AtomicInteger success = new AtomicInteger();
|
||||
|
||||
Runnable r = new Runnable() {
|
||||
public void run() {
|
||||
try {
|
||||
long txTimestamp = System.currentTimeMillis();
|
||||
if (transactional) {
|
||||
tm.begin();
|
||||
}
|
||||
SessionImplementor session = mock (SessionImplementor.class);
|
||||
testee.registerPendingPut(session, KEY1, txTimestamp);
|
||||
registeredLatch.countDown();
|
||||
registeredLatch.await(5, TimeUnit.SECONDS);
|
||||
PutFromLoadValidator.Lock lock = testee.acquirePutFromLoadLock(session, KEY1, txTimestamp);
|
||||
if (lock != null) {
|
||||
try {
|
||||
log.trace("Put from load lock acquired for key = " + KEY1);
|
||||
success.incrementAndGet();
|
||||
} finally {
|
||||
testee.releasePutFromLoadLock(KEY1, lock);
|
||||
}
|
||||
} else {
|
||||
log.trace("Unable to acquired putFromLoad lock for key = " + KEY1);
|
||||
}
|
||||
finishedLatch.countDown();
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
Runnable r = () -> {
|
||||
try {
|
||||
long txTimestamp = TIME_SERVICE.wallClockTime();
|
||||
if (transactional) {
|
||||
tm.begin();
|
||||
}
|
||||
SessionImplementor session = mock (SessionImplementor.class);
|
||||
testee.registerPendingPut(session, KEY1, txTimestamp);
|
||||
registeredLatch.countDown();
|
||||
registeredLatch.await(5, TimeUnit.SECONDS);
|
||||
PutFromLoadValidator.Lock lock = testee.acquirePutFromLoadLock(session, KEY1, txTimestamp);
|
||||
if (lock != null) {
|
||||
try {
|
||||
log.trace("Put from load lock acquired for key = " + KEY1);
|
||||
success.incrementAndGet();
|
||||
} finally {
|
||||
testee.releasePutFromLoadLock(KEY1, lock);
|
||||
}
|
||||
};
|
||||
|
||||
ExecutorService executor = Executors.newFixedThreadPool(3);
|
||||
|
||||
// Start with a removal so the "isPutValid" calls will fail if
|
||||
// any of the concurrent activity isn't handled properly
|
||||
|
||||
testee.beginInvalidatingRegion();
|
||||
testee.endInvalidatingRegion();
|
||||
try {
|
||||
Thread.sleep(10);
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
} else {
|
||||
log.trace("Unable to acquired putFromLoad lock for key = " + KEY1);
|
||||
}
|
||||
|
||||
// Do the registration + isPutValid calls
|
||||
executor.execute(r);
|
||||
executor.execute(r);
|
||||
executor.execute(r);
|
||||
|
||||
try {
|
||||
finishedLatch.await(5, TimeUnit.SECONDS);
|
||||
} catch (InterruptedException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
|
||||
assertEquals("All threads succeeded", 3, success.get());
|
||||
finishedLatch.countDown();
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
}
|
||||
});
|
||||
};
|
||||
|
||||
ExecutorService executor = Executors.newFixedThreadPool(3);
|
||||
cleanup.add(() -> executor.shutdownNow());
|
||||
|
||||
// Start with a removal so the "isPutValid" calls will fail if
|
||||
// any of the concurrent activity isn't handled properly
|
||||
|
||||
testee.beginInvalidatingRegion();
|
||||
testee.endInvalidatingRegion();
|
||||
TIME_SERVICE.advance(1);
|
||||
|
||||
// Do the registration + isPutValid calls
|
||||
executor.execute(r);
|
||||
executor.execute(r);
|
||||
executor.execute(r);
|
||||
|
||||
assertTrue(finishedLatch.await(5, TimeUnit.SECONDS));
|
||||
|
||||
assertEquals("All threads succeeded", 3, success.get());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -340,71 +324,55 @@ public class PutFromLoadValidatorUnitTest {
|
|||
}
|
||||
|
||||
private void invalidationBlocksForInProgressPutTest(final boolean keyOnly) throws Exception {
|
||||
withCacheManager(new CacheManagerCallable(createCacheManager()) {
|
||||
@Override
|
||||
public void call() {
|
||||
final PutFromLoadValidator testee = new PutFromLoadValidator(cm.getCache().getAdvancedCache(), regionFactory(cm));
|
||||
final CountDownLatch removeLatch = new CountDownLatch(1);
|
||||
final CountDownLatch pferLatch = new CountDownLatch(1);
|
||||
final AtomicReference<Object> cache = new AtomicReference<Object>("INITIAL");
|
||||
|
||||
Callable<Boolean> pferCallable = new Callable<Boolean>() {
|
||||
public Boolean call() throws Exception {
|
||||
long txTimestamp = System.currentTimeMillis();
|
||||
SessionImplementor session = mock (SessionImplementor.class);
|
||||
testee.registerPendingPut(session, KEY1, txTimestamp);
|
||||
PutFromLoadValidator.Lock lock = testee.acquirePutFromLoadLock(session, KEY1, txTimestamp);
|
||||
if (lock != null) {
|
||||
try {
|
||||
removeLatch.countDown();
|
||||
pferLatch.await();
|
||||
cache.set("PFER");
|
||||
return Boolean.TRUE;
|
||||
}
|
||||
finally {
|
||||
testee.releasePutFromLoadLock(KEY1, lock);
|
||||
}
|
||||
}
|
||||
return Boolean.FALSE;
|
||||
}
|
||||
};
|
||||
|
||||
Callable<Void> invalidateCallable = new Callable<Void>() {
|
||||
public Void call() throws Exception {
|
||||
removeLatch.await();
|
||||
if (keyOnly) {
|
||||
SessionImplementor session = mock (SessionImplementor.class);
|
||||
testee.beginInvalidatingKey(session, KEY1);
|
||||
} else {
|
||||
testee.beginInvalidatingRegion();
|
||||
}
|
||||
cache.set(null);
|
||||
return null;
|
||||
}
|
||||
};
|
||||
|
||||
ExecutorService executorService = Executors.newCachedThreadPool();
|
||||
Future<Boolean> pferFuture = executorService.submit(pferCallable);
|
||||
Future<Void> invalidateFuture = executorService.submit(invalidateCallable);
|
||||
final PutFromLoadValidator testee = new PutFromLoadValidator(cache, regionFactory(cm));
|
||||
final CountDownLatch removeLatch = new CountDownLatch(1);
|
||||
final CountDownLatch pferLatch = new CountDownLatch(1);
|
||||
final AtomicReference<Object> cache = new AtomicReference<>("INITIAL");
|
||||
|
||||
Callable<Boolean> pferCallable = () -> {
|
||||
long txTimestamp = TIME_SERVICE.wallClockTime();
|
||||
SessionImplementor session = mock (SessionImplementor.class);
|
||||
testee.registerPendingPut(session, KEY1, txTimestamp);
|
||||
PutFromLoadValidator.Lock lock = testee.acquirePutFromLoadLock(session, KEY1, txTimestamp);
|
||||
if (lock != null) {
|
||||
try {
|
||||
try {
|
||||
invalidateFuture.get(1, TimeUnit.SECONDS);
|
||||
fail("invalidateFuture did not block");
|
||||
}
|
||||
catch (TimeoutException good) {}
|
||||
|
||||
pferLatch.countDown();
|
||||
|
||||
assertTrue(pferFuture.get(5, TimeUnit.SECONDS));
|
||||
invalidateFuture.get(5, TimeUnit.SECONDS);
|
||||
|
||||
assertNull(cache.get());
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
removeLatch.countDown();
|
||||
pferLatch.await();
|
||||
cache.set("PFER");
|
||||
return Boolean.TRUE;
|
||||
}
|
||||
finally {
|
||||
testee.releasePutFromLoadLock(KEY1, lock);
|
||||
}
|
||||
}
|
||||
});
|
||||
return Boolean.FALSE;
|
||||
};
|
||||
|
||||
Callable<Void> invalidateCallable = () -> {
|
||||
removeLatch.await();
|
||||
if (keyOnly) {
|
||||
SessionImplementor session = mock (SessionImplementor.class);
|
||||
testee.beginInvalidatingKey(session, KEY1);
|
||||
} else {
|
||||
testee.beginInvalidatingRegion();
|
||||
}
|
||||
cache.set(null);
|
||||
return null;
|
||||
};
|
||||
|
||||
ExecutorService executor = Executors.newCachedThreadPool();
|
||||
cleanup.add(() -> executor.shutdownNow());
|
||||
Future<Boolean> pferFuture = executor.submit(pferCallable);
|
||||
Future<Void> invalidateFuture = executor.submit(invalidateCallable);
|
||||
|
||||
expectException(TimeoutException.class, () -> invalidateFuture.get(1, TimeUnit.SECONDS));
|
||||
|
||||
pferLatch.countDown();
|
||||
|
||||
assertTrue(pferFuture.get(5, TimeUnit.SECONDS));
|
||||
invalidateFuture.get(5, TimeUnit.SECONDS);
|
||||
|
||||
assertNull(cache.get());
|
||||
}
|
||||
|
||||
protected void exec(boolean transactional, Callable<?>... callables) {
|
||||
|
@ -449,7 +417,7 @@ public class PutFromLoadValidatorUnitTest {
|
|||
}
|
||||
// if we go for the timestamp-based approach, invalidation in the same millisecond
|
||||
// as the registerPendingPut/acquirePutFromLoad lock results in failure.
|
||||
Thread.sleep(10);
|
||||
TIME_SERVICE.advance(1);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
@ -464,7 +432,7 @@ public class PutFromLoadValidatorUnitTest {
|
|||
@Override
|
||||
public Void call() throws Exception {
|
||||
try {
|
||||
long txTimestamp = System.currentTimeMillis(); // this should be acquired before UserTransaction.begin()
|
||||
long txTimestamp = TIME_SERVICE.wallClockTime(); // this should be acquired before UserTransaction.begin()
|
||||
SessionImplementor session = mock (SessionImplementor.class);
|
||||
putFromLoadValidator.registerPendingPut(session, KEY1, txTimestamp);
|
||||
|
||||
|
@ -495,7 +463,7 @@ public class PutFromLoadValidatorUnitTest {
|
|||
@Override
|
||||
public Void call() throws Exception {
|
||||
try {
|
||||
long txTimestamp = System.currentTimeMillis(); // this should be acquired before UserTransaction.begin()
|
||||
long txTimestamp = TIME_SERVICE.wallClockTime(); // this should be acquired before UserTransaction.begin()
|
||||
SessionImplementor session = mock (SessionImplementor.class);
|
||||
PutFromLoadValidator.Lock lock = testee.acquirePutFromLoadLock(session, KEY1, txTimestamp);
|
||||
try {
|
||||
|
@ -520,59 +488,44 @@ public class PutFromLoadValidatorUnitTest {
|
|||
@Test
|
||||
@TestForIssue(jiraKey = "HHH-9928")
|
||||
public void testGetForNullReleasePuts() {
|
||||
EmbeddedCacheManager cm = createCacheManager();
|
||||
InfinispanRegionFactory tmp = new InfinispanRegionFactory();
|
||||
tmp.setCacheManager(cm);
|
||||
ConfigurationBuilder cb = new ConfigurationBuilder();
|
||||
cb.simpleCache(true).expiration().maxIdle(500);
|
||||
Configuration ppCfg = cb.build();
|
||||
cm.defineConfiguration(InfinispanRegionFactory.DEF_PENDING_PUTS_RESOURCE, cb.build());
|
||||
|
||||
InfinispanRegionFactory regionFactory = mock(InfinispanRegionFactory.class);
|
||||
doReturn(ppCfg).when(regionFactory).getPendingPutsCacheConfiguration();
|
||||
withCacheManager(new CacheManagerCallable(cm) {
|
||||
@Override
|
||||
public void call() {
|
||||
PutFromLoadValidator testee = new PutFromLoadValidator(cm.getCache().getAdvancedCache(), regionFactory, cm);
|
||||
long lastInsert = Long.MAX_VALUE;
|
||||
for (int i = 0; i < 100; ++i) {
|
||||
lastInsert = System.currentTimeMillis();
|
||||
try {
|
||||
withTx(tm, new Callable<Object>() {
|
||||
@Override
|
||||
public Object call() throws Exception {
|
||||
SessionImplementor session = mock (SessionImplementor.class);
|
||||
testee.registerPendingPut(session, KEY1, 0);
|
||||
return null;
|
||||
}
|
||||
});
|
||||
Thread.sleep(10);
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
String ppName = cm.getCache().getName() + "-" + InfinispanRegionFactory.DEF_PENDING_PUTS_RESOURCE;
|
||||
Map ppCache = cm.getCache(ppName, false);
|
||||
assertNotNull(ppCache);
|
||||
Object pendingPutMap = ppCache.get(KEY1);
|
||||
long end = System.currentTimeMillis();
|
||||
if (end - lastInsert > 500) {
|
||||
log.warn("Test took too long");
|
||||
return;
|
||||
}
|
||||
assertNotNull(pendingPutMap);
|
||||
int size;
|
||||
try {
|
||||
Method sizeMethod = pendingPutMap.getClass().getMethod("size");
|
||||
sizeMethod.setAccessible(true);
|
||||
size = (Integer) sizeMethod.invoke(pendingPutMap);
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
// some of the pending puts need to be expired by now
|
||||
assertTrue(size < 100);
|
||||
// but some are still registered
|
||||
assertTrue(size > 0);
|
||||
doAnswer(invocation -> TIME_SERVICE.wallClockTime()).when(regionFactory).nextTimestamp();
|
||||
|
||||
PutFromLoadValidator testee = new PutFromLoadValidator(cache, regionFactory, cm);
|
||||
|
||||
for (int i = 0; i < 100; ++i) {
|
||||
try {
|
||||
withTx(tm, () -> {
|
||||
SessionImplementor session = mock (SessionImplementor.class);
|
||||
testee.registerPendingPut(session, KEY1, 0);
|
||||
return null;
|
||||
});
|
||||
TIME_SERVICE.advance(10);
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
});
|
||||
}
|
||||
String ppName = cm.getCache().getName() + "-" + InfinispanRegionFactory.DEF_PENDING_PUTS_RESOURCE;
|
||||
Map ppCache = cm.getCache(ppName, false);
|
||||
assertNotNull(ppCache);
|
||||
Object pendingPutMap = ppCache.get(KEY1);
|
||||
assertNotNull(pendingPutMap);
|
||||
int size;
|
||||
try {
|
||||
Method sizeMethod = pendingPutMap.getClass().getMethod("size");
|
||||
sizeMethod.setAccessible(true);
|
||||
size = (Integer) sizeMethod.invoke(pendingPutMap);
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
// some of the pending puts need to be expired by now
|
||||
assertTrue(size < 100);
|
||||
// but some are still registered
|
||||
assertTrue(size > 0);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -95,41 +95,41 @@ public class CollectionRegionAccessStrategyTest extends
|
|||
}
|
||||
}).when(mockValidator).acquirePutFromLoadLock(any(), any(), anyLong());
|
||||
PutFromLoadValidator.addToCache(localRegion.getCache(), mockValidator);
|
||||
|
||||
try {
|
||||
final AccessDelegate delegate = localRegion.getCache().getCacheConfiguration().transaction().transactionMode().isTransactional() ?
|
||||
new TxInvalidationCacheAccessDelegate(localRegion, mockValidator) :
|
||||
new NonTxInvalidationCacheAccessDelegate(localRegion, mockValidator);
|
||||
|
||||
ExecutorService executorService = Executors.newCachedThreadPool();
|
||||
|
||||
final String KEY = "k1";
|
||||
Future<Void> pferFuture = executorService.submit(() -> {
|
||||
SessionImplementor session = mockedSession();
|
||||
delegate.putFromLoad(session, KEY, "v1", session.getTimestamp(), null);
|
||||
return null;
|
||||
});
|
||||
|
||||
Future<Void> removeFuture = executorService.submit(() -> {
|
||||
removeLatch.await();
|
||||
SessionImplementor session = mockedSession();
|
||||
withTx(localEnvironment, session, () -> {
|
||||
delegate.remove(session, KEY);
|
||||
return null;
|
||||
});
|
||||
pferLatch.countDown();
|
||||
return null;
|
||||
});
|
||||
|
||||
pferFuture.get();
|
||||
removeFuture.get();
|
||||
|
||||
assertFalse(localRegion.getCache().containsKey(KEY));
|
||||
assertFalse(remoteRegion.getCache().containsKey(KEY));
|
||||
} finally {
|
||||
cleanup.add(() -> {
|
||||
PutFromLoadValidator.removeFromCache(localRegion.getCache());
|
||||
PutFromLoadValidator.addToCache(localRegion.getCache(), originalValidator);
|
||||
}
|
||||
});
|
||||
|
||||
final AccessDelegate delegate = localRegion.getCache().getCacheConfiguration().transaction().transactionMode().isTransactional() ?
|
||||
new TxInvalidationCacheAccessDelegate(localRegion, mockValidator) :
|
||||
new NonTxInvalidationCacheAccessDelegate(localRegion, mockValidator);
|
||||
|
||||
ExecutorService executorService = Executors.newCachedThreadPool();
|
||||
cleanup.add(() -> executorService.shutdownNow());
|
||||
|
||||
final String KEY = "k1";
|
||||
Future<Void> pferFuture = executorService.submit(() -> {
|
||||
SessionImplementor session = mockedSession();
|
||||
delegate.putFromLoad(session, KEY, "v1", session.getTimestamp(), null);
|
||||
return null;
|
||||
});
|
||||
|
||||
Future<Void> removeFuture = executorService.submit(() -> {
|
||||
removeLatch.await();
|
||||
SessionImplementor session = mockedSession();
|
||||
withTx(localEnvironment, session, () -> {
|
||||
delegate.remove(session, KEY);
|
||||
return null;
|
||||
});
|
||||
pferLatch.countDown();
|
||||
return null;
|
||||
});
|
||||
|
||||
pferFuture.get();
|
||||
removeFuture.get();
|
||||
|
||||
assertFalse(localRegion.getCache().containsKey(KEY));
|
||||
assertFalse(remoteRegion.getCache().containsKey(KEY));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
|
@ -46,7 +46,7 @@ public class CollectionRegionImplTest extends AbstractEntityCollectionRegionTest
|
|||
@Override
|
||||
protected void putInRegion(Region region, Object key, Object value) {
|
||||
CollectionRegionAccessStrategy strategy = ((CollectionRegion) region).buildAccessStrategy(AccessType.TRANSACTIONAL);
|
||||
strategy.putFromLoad(null, key, value, System.currentTimeMillis(), new Integer(1));
|
||||
strategy.putFromLoad(null, key, value, region.nextTimestamp(), new Integer(1));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -82,7 +82,7 @@ public class EntityRegionAccessStrategyTest extends
|
|||
final CountDownLatch commitLatch = new CountDownLatch(1);
|
||||
final CountDownLatch completionLatch = new CountDownLatch(2);
|
||||
|
||||
CountDownLatch asyncInsertLatch = setupExpectAfterUpdate();
|
||||
CountDownLatch asyncInsertLatch = expectAfterUpdate();
|
||||
|
||||
Thread inserter = new Thread(() -> {
|
||||
try {
|
||||
|
@ -154,7 +154,7 @@ public class EntityRegionAccessStrategyTest extends
|
|||
protected void putFromLoadTestReadOnly(boolean minimal) throws Exception {
|
||||
final Object KEY = TestingKeyFactory.generateEntityCacheKey( KEY_BASE + testCount++ );
|
||||
|
||||
CountDownLatch remotePutFromLoadLatch = setupExpectPutFromLoad();
|
||||
CountDownLatch remotePutFromLoadLatch = expectPutFromLoad();
|
||||
|
||||
SessionImplementor session = mockedSession();
|
||||
withTx(localEnvironment, session, () -> {
|
||||
|
@ -196,7 +196,7 @@ public class EntityRegionAccessStrategyTest extends
|
|||
remoteAccessStrategy.putFromLoad(s2, KEY, VALUE1, s2.getTimestamp(), 1);
|
||||
|
||||
// both nodes are updated, we don't have to wait for any async replication of putFromLoad
|
||||
CountDownLatch asyncUpdateLatch = setupExpectAfterUpdate();
|
||||
CountDownLatch asyncUpdateLatch = expectAfterUpdate();
|
||||
|
||||
final CountDownLatch readLatch = new CountDownLatch(1);
|
||||
final CountDownLatch commitLatch = new CountDownLatch(1);
|
||||
|
|
|
@ -9,10 +9,14 @@ package org.hibernate.test.cache.infinispan.functional;
|
|||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.function.Predicate;
|
||||
|
||||
import org.hibernate.Session;
|
||||
import org.hibernate.boot.Metadata;
|
||||
import org.hibernate.boot.spi.MetadataImplementor;
|
||||
import org.hibernate.cache.infinispan.util.FutureUpdate;
|
||||
import org.hibernate.cache.infinispan.util.TombstoneUpdate;
|
||||
import org.hibernate.cache.internal.SimpleCacheKeysFactory;
|
||||
import org.hibernate.cache.spi.RegionFactory;
|
||||
import org.hibernate.cache.spi.access.AccessType;
|
||||
|
@ -29,16 +33,19 @@ import org.hibernate.resource.transaction.TransactionCoordinatorBuilder;
|
|||
import org.hibernate.resource.transaction.backend.jdbc.internal.JdbcResourceLocalTransactionCoordinatorBuilderImpl;
|
||||
import org.hibernate.resource.transaction.backend.jta.internal.JtaTransactionCoordinatorBuilderImpl;
|
||||
|
||||
import org.hibernate.test.cache.infinispan.util.ExpectingInterceptor;
|
||||
import org.hibernate.testing.BeforeClassOnce;
|
||||
import org.hibernate.testing.junit4.BaseNonConfigCoreFunctionalTestCase;
|
||||
import org.hibernate.testing.junit4.CustomParameterized;
|
||||
import org.hibernate.test.cache.infinispan.tm.JtaPlatformImpl;
|
||||
import org.hibernate.test.cache.infinispan.tm.XaConnectionProvider;
|
||||
import org.hibernate.test.cache.infinispan.util.InfinispanTestingSetup;
|
||||
import org.hibernate.test.cache.infinispan.util.TestInfinispanRegionFactory;
|
||||
import org.hibernate.test.cache.infinispan.util.TxUtil;
|
||||
import org.hibernate.testing.BeforeClassOnce;
|
||||
import org.hibernate.testing.junit4.BaseNonConfigCoreFunctionalTestCase;
|
||||
import org.hibernate.test.cache.infinispan.tm.JtaPlatformImpl;
|
||||
|
||||
import org.hibernate.testing.junit4.CustomParameterized;
|
||||
import org.infinispan.configuration.cache.CacheMode;
|
||||
import org.infinispan.AdvancedCache;
|
||||
import org.infinispan.commands.write.PutKeyValueCommand;
|
||||
import org.junit.After;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.junit.runners.Parameterized;
|
||||
|
@ -89,6 +96,7 @@ public abstract class AbstractFunctionalTest extends BaseNonConfigCoreFunctional
|
|||
public boolean addVersions;
|
||||
|
||||
protected boolean useJta;
|
||||
protected List<Runnable> cleanup = new ArrayList<>();
|
||||
|
||||
@CustomParameterized.Order(0)
|
||||
@Parameterized.Parameters(name = "{0}, {6}")
|
||||
|
@ -121,6 +129,12 @@ public abstract class AbstractFunctionalTest extends BaseNonConfigCoreFunctional
|
|||
useJta = jtaPlatformClass != null;
|
||||
}
|
||||
|
||||
@After
|
||||
public void runCleanup() {
|
||||
cleanup.forEach(Runnable::run);
|
||||
cleanup.clear();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String[] getMappings() {
|
||||
return new String[] {
|
||||
|
@ -198,4 +212,25 @@ public abstract class AbstractFunctionalTest extends BaseNonConfigCoreFunctional
|
|||
protected void markRollbackOnly(Session session) {
|
||||
TxUtil.markRollbackOnly(useJta, session);
|
||||
}
|
||||
|
||||
protected CountDownLatch expectAfterUpdate(AdvancedCache cache, int numUpdates) {
|
||||
return expectPutWithValue(cache, value -> value instanceof FutureUpdate, numUpdates);
|
||||
}
|
||||
|
||||
protected CountDownLatch expectEvict(AdvancedCache cache, int numUpdates) {
|
||||
return expectPutWithValue(cache, value -> value instanceof TombstoneUpdate && ((TombstoneUpdate) value).getValue() == null, numUpdates);
|
||||
}
|
||||
|
||||
protected CountDownLatch expectPutWithValue(AdvancedCache cache, Predicate<Object> valuePredicate, int numUpdates) {
|
||||
if (!cacheMode.isInvalidation() && accessType != AccessType.NONSTRICT_READ_WRITE) {
|
||||
CountDownLatch latch = new CountDownLatch(numUpdates);
|
||||
ExpectingInterceptor.get(cache)
|
||||
.when((ctx, cmd) -> cmd instanceof PutKeyValueCommand && valuePredicate.test(((PutKeyValueCommand) cmd).getValue()))
|
||||
.countDown(latch);
|
||||
cleanup.add(() -> ExpectingInterceptor.cleanup(cache));
|
||||
return latch;
|
||||
} else {
|
||||
return new CountDownLatch(0);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -111,9 +111,8 @@ public abstract class AbstractNonInvalidationTest extends SingleNodeTest {
|
|||
|
||||
@After
|
||||
public void cleanup() throws Exception {
|
||||
for (Runnable runnable : cleanup) {
|
||||
runnable.run();
|
||||
}
|
||||
cleanup.forEach(Runnable::run);
|
||||
cleanup.clear();
|
||||
withTxSession(s -> {
|
||||
s.createQuery("delete from Item").executeUpdate();
|
||||
});
|
||||
|
|
|
@ -11,6 +11,7 @@ import java.io.StringWriter;
|
|||
import java.util.ArrayList;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Random;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.Callable;
|
||||
|
@ -27,6 +28,9 @@ import org.hibernate.stat.SecondLevelCacheStatistics;
|
|||
|
||||
import org.hibernate.test.cache.infinispan.functional.entities.Contact;
|
||||
import org.hibernate.test.cache.infinispan.functional.entities.Customer;
|
||||
import org.hibernate.test.cache.infinispan.util.TestInfinispanRegionFactory;
|
||||
import org.hibernate.test.cache.infinispan.util.TestTimeService;
|
||||
import org.junit.Ignore;
|
||||
import org.junit.Test;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
@ -48,6 +52,7 @@ public class ConcurrentWriteTest extends SingleNodeTest {
|
|||
private static final int THINK_TIME_MILLIS = 10;
|
||||
private static final long LAUNCH_INTERVAL_MILLIS = 10;
|
||||
private static final Random random = new Random();
|
||||
private static final TestTimeService TIME_SERVICE = new TestTimeService();
|
||||
|
||||
/**
|
||||
* kill switch used to stop all users when one fails
|
||||
|
@ -70,6 +75,12 @@ public class ConcurrentWriteTest extends SingleNodeTest {
|
|||
TERMINATE_ALL_USERS = false;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void addSettings(Map settings) {
|
||||
super.addSettings(settings);
|
||||
settings.put(TestInfinispanRegionFactory.TIME_SERVICE, TIME_SERVICE);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void cleanupTest() throws Exception {
|
||||
try {
|
||||
|
@ -90,14 +101,14 @@ public class ConcurrentWriteTest extends SingleNodeTest {
|
|||
// setup
|
||||
sessionFactory().getStatistics().clear();
|
||||
// wait a while to make sure that timestamp comparison works after invalidateRegion
|
||||
Thread.sleep(1);
|
||||
TIME_SERVICE.advance(1);
|
||||
|
||||
Customer customer = createCustomer( 0 );
|
||||
final Integer customerId = customer.getId();
|
||||
getCustomerIDs().add( customerId );
|
||||
|
||||
// wait a while to make sure that timestamp comparison works after collection remove (during insert)
|
||||
Thread.sleep(1);
|
||||
TIME_SERVICE.advance(1);
|
||||
|
||||
assertNull( "contact exists despite not being added", getFirstContact( customerId ) );
|
||||
|
||||
|
@ -134,6 +145,8 @@ public class ConcurrentWriteTest extends SingleNodeTest {
|
|||
|
||||
}
|
||||
|
||||
// Ignoring the test as it's more of a stress-test: this should be enabled manually
|
||||
@Ignore
|
||||
@Test
|
||||
public void testManyUsers() throws Throwable {
|
||||
try {
|
||||
|
@ -153,7 +166,6 @@ public class ConcurrentWriteTest extends SingleNodeTest {
|
|||
futures.add( future );
|
||||
Thread.sleep( LAUNCH_INTERVAL_MILLIS ); // rampup
|
||||
}
|
||||
// barrier.await(); // wait for all threads to be ready
|
||||
barrier.await( 2, TimeUnit.MINUTES ); // wait for all threads to finish
|
||||
log.info( "All threads finished, let's shutdown the executor and check whether any exceptions were reported" );
|
||||
for ( Future<Void> future : futures ) {
|
||||
|
@ -367,7 +379,7 @@ public class ConcurrentWriteTest extends SingleNodeTest {
|
|||
thinkRandomTime();
|
||||
++completedIterations;
|
||||
if ( trace ) {
|
||||
log.tracef( "Iteration completed {0}", completedIterations );
|
||||
log.tracef( "Iteration completed %d", completedIterations );
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -13,6 +13,8 @@ import org.hibernate.cache.infinispan.util.InfinispanMessageLogger;
|
|||
import org.hibernate.stat.SecondLevelCacheStatistics;
|
||||
import org.hibernate.stat.Statistics;
|
||||
import org.hibernate.test.cache.infinispan.functional.entities.Item;
|
||||
import org.hibernate.test.cache.infinispan.util.TestInfinispanRegionFactory;
|
||||
import org.hibernate.test.cache.infinispan.util.TestTimeService;
|
||||
import org.junit.Test;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
@ -25,7 +27,8 @@ import static org.junit.Assert.assertEquals;
|
|||
* @since 4.1
|
||||
*/
|
||||
public class ReadOnlyTest extends SingleNodeTest {
|
||||
static final InfinispanMessageLogger log = InfinispanMessageLogger.Provider.getLog(ReadOnlyTest.class);
|
||||
protected static final InfinispanMessageLogger log = InfinispanMessageLogger.Provider.getLog(ReadOnlyTest.class);
|
||||
protected static final TestTimeService TIME_SERVICE = new TestTimeService();
|
||||
|
||||
@Override
|
||||
public List<Object[]> getParameters() {
|
||||
|
@ -76,7 +79,7 @@ public class ReadOnlyTest extends SingleNodeTest {
|
|||
log.info("Entry persisted, let's load and delete it.");
|
||||
|
||||
cleanupCache();
|
||||
Thread.sleep(10);
|
||||
TIME_SERVICE.advance(1);
|
||||
|
||||
withTxSession(s -> {
|
||||
Item found = s.load(Item.class, item.getId());
|
||||
|
@ -88,4 +91,10 @@ public class ReadOnlyTest extends SingleNodeTest {
|
|||
s.delete(found);
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void addSettings(Map settings) {
|
||||
super.addSettings(settings);
|
||||
settings.put(TestInfinispanRegionFactory.TIME_SERVICE, TIME_SERVICE);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -83,7 +83,7 @@ public class ReadWriteTest extends ReadOnlyTest {
|
|||
s.persist( another );
|
||||
});
|
||||
// The collection has been removed, but we can't add it again immediately using putFromLoad
|
||||
Thread.sleep(1);
|
||||
TIME_SERVICE.advance(1);
|
||||
|
||||
withTxSession(s -> {
|
||||
Item loaded = s.load( Item.class, item.getId() );
|
||||
|
@ -386,7 +386,7 @@ public class ReadWriteTest extends ReadOnlyTest {
|
|||
SecondLevelCacheStatistics slcs = stats.getSecondLevelCacheStatistics( Item.class.getName() );
|
||||
sessionFactory().getCache().evictEntityRegion( Item.class.getName() );
|
||||
|
||||
Thread.sleep(1);
|
||||
TIME_SERVICE.advance(1);
|
||||
|
||||
assertEquals(0, slcs.getPutCount());
|
||||
assertEquals( 0, slcs.getElementCountInMemory() );
|
||||
|
@ -436,8 +436,8 @@ public class ReadWriteTest extends ReadOnlyTest {
|
|||
|
||||
// Delay added to guarantee that query cache results won't be considered
|
||||
// as not up to date due to persist session and query results from first
|
||||
// query happening within same 100ms gap.
|
||||
Thread.sleep( 100 );
|
||||
// query happening simultaneously.
|
||||
TIME_SERVICE.advance(1);
|
||||
|
||||
withTxSession(s -> s.createQuery( "from Item" ).setCacheable( true ).list());
|
||||
|
||||
|
@ -459,8 +459,8 @@ public class ReadWriteTest extends ReadOnlyTest {
|
|||
|
||||
// Delay added to guarantee that query cache results won't be considered
|
||||
// as not up to date due to persist session and query results from first
|
||||
// query happening within same 100ms gap.
|
||||
Thread.sleep( 100 );
|
||||
// query happening simultaneously.
|
||||
TIME_SERVICE.advance(1);
|
||||
|
||||
withTxSession(s -> {
|
||||
s.createQuery("from Item").setCacheable(true).list();
|
||||
|
@ -476,8 +476,8 @@ public class ReadWriteTest extends ReadOnlyTest {
|
|||
saveSomeCitizens();
|
||||
|
||||
// Clear the cache before the transaction begins
|
||||
ReadWriteTest.this.cleanupCache();
|
||||
Thread.sleep(10);
|
||||
cleanupCache();
|
||||
TIME_SERVICE.advance(1);
|
||||
|
||||
withTxSession(s -> {
|
||||
State france = ReadWriteTest.this.getState(s, "Ile de France");
|
||||
|
@ -554,8 +554,8 @@ public class ReadWriteTest extends ReadOnlyTest {
|
|||
});
|
||||
|
||||
// TODO: Clear caches manually via cache manager (it's faster!!)
|
||||
this.cleanupCache();
|
||||
Thread.sleep(10);
|
||||
cleanupCache();
|
||||
TIME_SERVICE.advance(1);
|
||||
stats.setStatisticsEnabled( true );
|
||||
stats.clear();
|
||||
|
||||
|
@ -614,7 +614,7 @@ public class ReadWriteTest extends ReadOnlyTest {
|
|||
assertEquals(2, slcStats.getPutCount());
|
||||
|
||||
cache.evictEntityRegions();
|
||||
Thread.sleep(10);
|
||||
TIME_SERVICE.advance(1);
|
||||
|
||||
assertEquals(0, slcStats.getElementCountInMemory());
|
||||
assertFalse("2lc entity cache is expected to not contain Citizen id = " + citizens.get(0).getId(),
|
||||
|
|
|
@ -116,15 +116,6 @@ public abstract class DualNodeTest extends AbstractFunctionalTest {
|
|||
return JtaTransactionCoordinatorBuilderImpl.class;
|
||||
}
|
||||
|
||||
protected void sleep(long ms) {
|
||||
try {
|
||||
Thread.sleep( ms );
|
||||
}
|
||||
catch (InterruptedException e) {
|
||||
log.warn( "Interrupted during sleep", e );
|
||||
}
|
||||
}
|
||||
|
||||
protected void configureSecondNode(StandardServiceRegistryBuilder ssrb) {
|
||||
}
|
||||
|
||||
|
|
|
@ -63,7 +63,6 @@ import static org.mockito.Mockito.spy;
|
|||
public class EntityCollectionInvalidationTest extends DualNodeTest {
|
||||
private static final InfinispanMessageLogger log = InfinispanMessageLogger.Provider.getLog( EntityCollectionInvalidationTest.class );
|
||||
|
||||
private static final long SLEEP_TIME = 50l;
|
||||
private static final Integer CUSTOMER_ID = new Integer( 1 );
|
||||
|
||||
private EmbeddedCacheManager localManager, remoteManager;
|
||||
|
@ -138,16 +137,10 @@ public class EntityCollectionInvalidationTest extends DualNodeTest {
|
|||
assertTrue( remoteListener.isEmpty() );
|
||||
assertTrue( localListener.isEmpty() );
|
||||
|
||||
// Sleep a bit to let async commit propagate. Really just to
|
||||
// help keep the logs organized for debugging any issues
|
||||
sleep( SLEEP_TIME );
|
||||
|
||||
log.debug( "Find node 0" );
|
||||
// This actually brings the collection into the cache
|
||||
getCustomer( ids.customerId, localFactory );
|
||||
|
||||
sleep( SLEEP_TIME );
|
||||
|
||||
// Now the collection is in the cache so, the 2nd "get"
|
||||
// should read everything from the cache
|
||||
log.debug( "Find(2) node 0" );
|
||||
|
@ -182,7 +175,6 @@ public class EntityCollectionInvalidationTest extends DualNodeTest {
|
|||
}
|
||||
|
||||
ids = modifyCustomer( ids.customerId, remoteFactory );
|
||||
sleep( 250 );
|
||||
assertLoadedFromCache( remoteListener, ids.customerId, ids.contactIds );
|
||||
|
||||
if (modifyLatch != null) {
|
||||
|
|
|
@ -8,6 +8,8 @@ package org.hibernate.test.cache.infinispan.functional.cluster;
|
|||
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.hibernate.Criteria;
|
||||
import org.hibernate.Session;
|
||||
|
@ -38,8 +40,6 @@ public class NaturalIdInvalidationTest extends DualNodeTest {
|
|||
|
||||
private static final InfinispanMessageLogger log = InfinispanMessageLogger.Provider.getLog(NaturalIdInvalidationTest.class);
|
||||
|
||||
private static final long SLEEP_TIME = 50l;
|
||||
|
||||
@Override
|
||||
public List<Object[]> getParameters() {
|
||||
return getParameters(true, true, true, true);
|
||||
|
@ -77,20 +77,18 @@ public class NaturalIdInvalidationTest extends DualNodeTest {
|
|||
assertTrue(remoteListener.isEmpty());
|
||||
assertTrue(localListener.isEmpty());
|
||||
|
||||
CountDownLatch remoteUpdateLatch = expectAfterUpdate(remoteNaturalIdCache.getAdvancedCache(), 2);
|
||||
saveSomeCitizens(localFactory);
|
||||
|
||||
assertTrue(remoteUpdateLatch.await(2, TimeUnit.SECONDS));
|
||||
|
||||
assertTrue(remoteListener.isEmpty());
|
||||
assertTrue(localListener.isEmpty());
|
||||
|
||||
// Sleep a bit to let async commit propagate. Really just to
|
||||
// help keep the logs organized for debugging any issues
|
||||
sleep( SLEEP_TIME );
|
||||
|
||||
log.debug("Find node 0");
|
||||
// This actually brings the collection into the cache
|
||||
getCitizenWithCriteria(localFactory);
|
||||
|
||||
sleep( SLEEP_TIME );
|
||||
// Now the collection is in the cache so, the 2nd "get"
|
||||
// should read everything from the cache
|
||||
log.debug( "Find(2) node 0" );
|
||||
|
@ -117,8 +115,9 @@ public class NaturalIdInvalidationTest extends DualNodeTest {
|
|||
|
||||
// Modify customer in remote
|
||||
remoteListener.clear();
|
||||
CountDownLatch localUpdate = expectEvict(localNaturalIdCache.getAdvancedCache(), 1);
|
||||
deleteCitizenWithCriteria(remoteFactory);
|
||||
sleep(250);
|
||||
assertTrue(localUpdate.await(2, TimeUnit.SECONDS));
|
||||
|
||||
Set localKeys = localNaturalIdCache.keySet();
|
||||
assertEquals(1, localKeys.size());
|
||||
|
@ -222,18 +221,6 @@ public class NaturalIdInvalidationTest extends DualNodeTest {
|
|||
log.debug( event.toString() );
|
||||
if ( !event.isPre() ) {
|
||||
visited.add(event.getKey().toString());
|
||||
// Integer primKey = (Integer) cacheKey.getKey();
|
||||
// String key = (String) cacheKey.getEntityOrRoleName() + '#' + primKey;
|
||||
// log.debug( "MyListener[" + name + "] - Visiting key " + key );
|
||||
// // String name = fqn.toString();
|
||||
// String token = ".functional.";
|
||||
// int index = key.indexOf( token );
|
||||
// if ( index > -1 ) {
|
||||
// index += token.length();
|
||||
// key = key.substring( index );
|
||||
// log.debug( "MyListener[" + name + "] - recording visit to " + key );
|
||||
// visited.add( key );
|
||||
// }
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -30,6 +30,7 @@ import java.util.NavigableMap;
|
|||
import java.util.Properties;
|
||||
import java.util.Set;
|
||||
import java.util.TreeMap;
|
||||
import java.util.concurrent.BlockingDeque;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.ConcurrentSkipListMap;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
|
@ -37,6 +38,7 @@ import java.util.concurrent.Executors;
|
|||
import java.util.concurrent.ForkJoinPool;
|
||||
import java.util.concurrent.ForkJoinTask;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.LinkedBlockingDeque;
|
||||
import java.util.concurrent.ThreadLocalRandom;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
@ -159,7 +161,7 @@ public abstract class CorrectnessTestCase {
|
|||
return new HashMap<>();
|
||||
}
|
||||
};
|
||||
private List<Exception> exceptions = Collections.synchronizedList(new ArrayList<>());
|
||||
private BlockingDeque<Exception> exceptions = new LinkedBlockingDeque<>();
|
||||
|
||||
public String getDbName() {
|
||||
return getClass().getName().replaceAll("\\W", "_");
|
||||
|
@ -407,13 +409,8 @@ public abstract class CorrectnessTestCase {
|
|||
}));
|
||||
}
|
||||
}
|
||||
long testEnd = System.currentTimeMillis() + EXECUTION_TIME;
|
||||
while (System.currentTimeMillis() < testEnd) {
|
||||
if (!exceptions.isEmpty()) {
|
||||
break;
|
||||
}
|
||||
Thread.sleep(1000);
|
||||
}
|
||||
Exception failure = exceptions.poll(EXECUTION_TIME, TimeUnit.SECONDS);
|
||||
if (failure != null) exceptions.addFirst(failure);
|
||||
running = false;
|
||||
exec.shutdown();
|
||||
if (!exec.awaitTermination(1000, TimeUnit.SECONDS)) throw new IllegalStateException();
|
||||
|
|
|
@ -73,15 +73,11 @@ public class TimestampsRegionImplTest extends AbstractGeneralDataRegionTest {
|
|||
registry,
|
||||
getCacheTestSupport()
|
||||
);
|
||||
// Sleep a bit to avoid concurrent FLUSH problem
|
||||
avoidConcurrentFlush();
|
||||
|
||||
InfinispanRegionFactory regionFactory2 = CacheTestUtil.startRegionFactory(
|
||||
registry2,
|
||||
getCacheTestSupport()
|
||||
);
|
||||
// Sleep a bit to avoid concurrent FLUSH problem
|
||||
avoidConcurrentFlush();
|
||||
|
||||
TimestampsRegionImpl region = (TimestampsRegionImpl) regionFactory.buildTimestampsRegion(
|
||||
getStandardRegionName(REGION_PREFIX),
|
||||
|
|
|
@ -65,21 +65,6 @@ public class CacheTestSupport {
|
|||
throwStoredException();
|
||||
}
|
||||
|
||||
public void avoidConcurrentFlush() {
|
||||
// JG 2.6.1 has a problem where calling flush more than once too quickly
|
||||
// can result in several second delays
|
||||
sleep( 100 );
|
||||
}
|
||||
|
||||
private void sleep(long ms) {
|
||||
try {
|
||||
Thread.sleep(ms);
|
||||
}
|
||||
catch (InterruptedException e) {
|
||||
log.warn("Interrupted during sleep", e);
|
||||
}
|
||||
}
|
||||
|
||||
private void cleanUp() {
|
||||
for (Iterator it = factories.iterator(); it.hasNext(); ) {
|
||||
try {
|
||||
|
@ -105,7 +90,6 @@ public class CacheTestSupport {
|
|||
finally {
|
||||
it.remove();
|
||||
}
|
||||
avoidConcurrentFlush();
|
||||
}
|
||||
caches.clear();
|
||||
}
|
||||
|
|
|
@ -172,39 +172,6 @@ public class CacheTestUtil {
|
|||
return properties;
|
||||
}
|
||||
|
||||
/**
|
||||
* Executes {@link #assertEqualsEventually(Object, Callable, long, TimeUnit)} without time limit.
|
||||
* @param expected
|
||||
* @param callable
|
||||
* @param <T>
|
||||
*/
|
||||
public static <T> void assertEqualsEventually(T expected, Callable<T> callable) throws Exception {
|
||||
assertEqualsEventually(expected, callable, -1, TimeUnit.SECONDS);
|
||||
}
|
||||
|
||||
/**
|
||||
* Periodically calls callable and compares returned value with expected value. If the value matches to expected,
|
||||
* the method returns. If callable throws an exception, this is propagated. If the returned value does not match to
|
||||
* expected before timeout, {@link TimeoutException} is thrown.
|
||||
* @param expected
|
||||
* @param callable
|
||||
* @param timeout If non-positive, there is no limit.
|
||||
* @param timeUnit
|
||||
* @param <T>
|
||||
*/
|
||||
public static <T> void assertEqualsEventually(T expected, Callable<T> callable, long timeout, TimeUnit timeUnit) throws Exception {
|
||||
long now, deadline = timeout <= 0 ? Long.MAX_VALUE : System.currentTimeMillis() + timeUnit.toMillis(timeout);
|
||||
for (;;) {
|
||||
T value = callable.call();
|
||||
if (EqualsHelper.equals(value, expected)) return;
|
||||
now = System.currentTimeMillis();
|
||||
if (now < deadline) {
|
||||
Thread.sleep(Math.min(100, deadline - now));
|
||||
} else break;
|
||||
}
|
||||
throw new TimeoutException();
|
||||
}
|
||||
|
||||
public static SessionFactoryOptions sfOptionsForStart() {
|
||||
return new SessionFactoryOptionsImpl(
|
||||
new SessionFactoryBuilderImpl.SessionFactoryOptionsStateStandardImpl(
|
||||
|
|
|
@ -8,12 +8,14 @@ import org.infinispan.interceptors.base.BaseCustomInterceptor;
|
|||
import org.infinispan.util.logging.Log;
|
||||
import org.infinispan.util.logging.LogFactory;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Iterator;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.function.BiPredicate;
|
||||
import java.util.function.BooleanSupplier;
|
||||
|
||||
|
||||
public class ExpectingInterceptor extends BaseCustomInterceptor {
|
||||
|
@ -56,23 +58,36 @@ public class ExpectingInterceptor extends BaseCustomInterceptor {
|
|||
succeeded = true;
|
||||
return retval;
|
||||
} finally {
|
||||
log.tracef("After command %s", command);
|
||||
log.tracef("After command(successful=%s) %s", succeeded, command);
|
||||
List<Runnable> toExecute = new ArrayList<>();
|
||||
synchronized (this) {
|
||||
for (Iterator<Condition> iterator = conditions.iterator(); iterator.hasNext(); ) {
|
||||
Condition condition = iterator.next();
|
||||
log.tracef("Testing condition %s", condition);
|
||||
if ((condition.success == null || condition.success == succeeded) && condition.predicate.test(ctx, command)) {
|
||||
assert condition.action != null;
|
||||
condition.action.run();
|
||||
iterator.remove();
|
||||
log.trace("Condition succeeded");
|
||||
toExecute.add(condition.action);
|
||||
if (condition.removeCheck == null || condition.removeCheck.getAsBoolean()) {
|
||||
iterator.remove();
|
||||
}
|
||||
} else {
|
||||
log.trace("Condition test failed");
|
||||
}
|
||||
}
|
||||
}
|
||||
// execute without holding the lock
|
||||
for (Runnable runnable : toExecute) {
|
||||
log.tracef("Executing %s", runnable);
|
||||
runnable.run();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public class Condition {
|
||||
private final BiPredicate<InvocationContext, VisitableCommand> predicate;
|
||||
private final Boolean success;
|
||||
private BooleanSupplier removeCheck;
|
||||
private Runnable action;
|
||||
|
||||
public Condition(BiPredicate<InvocationContext, VisitableCommand> predicate, Boolean success) {
|
||||
|
@ -80,20 +95,36 @@ public class ExpectingInterceptor extends BaseCustomInterceptor {
|
|||
this.success = success;
|
||||
}
|
||||
|
||||
public void run(Runnable action) {
|
||||
public Condition run(Runnable action) {
|
||||
assert this.action == null;
|
||||
this.action = action;
|
||||
return this;
|
||||
}
|
||||
|
||||
public void countDown(CountDownLatch latch) {
|
||||
assert action == null;
|
||||
action = () -> latch.countDown();
|
||||
public Condition countDown(CountDownLatch latch) {
|
||||
return run(() -> latch.countDown()).removeWhen(() -> latch.getCount() == 0);
|
||||
}
|
||||
|
||||
public Condition removeWhen(BooleanSupplier check) {
|
||||
assert this.removeCheck == null;
|
||||
this.removeCheck = check;
|
||||
return this;
|
||||
}
|
||||
|
||||
public void cancel() {
|
||||
synchronized (ExpectingInterceptor.class) {
|
||||
synchronized (ExpectingInterceptor.this) {
|
||||
conditions.remove(this);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
final StringBuilder sb = new StringBuilder("Condition{");
|
||||
sb.append("predicate=").append(predicate);
|
||||
sb.append(", success=").append(success);
|
||||
sb.append(", action=").append(action);
|
||||
sb.append('}');
|
||||
return sb.toString();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -41,8 +41,13 @@ public class TestInfinispanRegionFactory extends InfinispanRegionFactory {
|
|||
|
||||
@Override
|
||||
protected EmbeddedCacheManager createCacheManager(ConfigurationBuilderHolder holder) {
|
||||
// If the cache manager has been provided by calling setCacheManager, don't create a new one
|
||||
EmbeddedCacheManager cacheManager = getCacheManager();
|
||||
if (cacheManager != null) {
|
||||
return cacheManager;
|
||||
}
|
||||
amendConfiguration(holder);
|
||||
DefaultCacheManager cacheManager = new DefaultCacheManager(holder, true);
|
||||
cacheManager = new DefaultCacheManager(holder, true);
|
||||
if (timeService != null) {
|
||||
cacheManager.getGlobalComponentRegistry().registerComponent(timeService, TimeService.class);
|
||||
cacheManager.getGlobalComponentRegistry().rewire();
|
||||
|
|
Loading…
Reference in New Issue