HHH-7763 No need to clear caches when these are going to be stopped

* Make sure failures in stop won't affect other crucial stop operations
that could leave nodes unstopped.
This commit is contained in:
Galder Zamarreño 2012-11-07 16:53:01 +01:00
parent 4294e0faee
commit 49ea5d65df
2 changed files with 515 additions and 533 deletions

View File

@ -25,18 +25,15 @@ package org.hibernate.test.cache.infinispan;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.concurrent.Callable;
import org.hibernate.boot.registry.StandardServiceRegistryBuilder; import org.hibernate.boot.registry.StandardServiceRegistryBuilder;
import org.hibernate.cache.infinispan.InfinispanRegionFactory; import org.hibernate.cache.infinispan.InfinispanRegionFactory;
import org.hibernate.cache.infinispan.collection.CollectionRegionImpl; import org.hibernate.cache.infinispan.collection.CollectionRegionImpl;
import org.hibernate.cache.infinispan.entity.EntityRegionImpl; import org.hibernate.cache.infinispan.entity.EntityRegionImpl;
import org.hibernate.cache.infinispan.util.Caches;
import org.hibernate.cache.spi.CacheDataDescription; import org.hibernate.cache.spi.CacheDataDescription;
import org.hibernate.cfg.Configuration; import org.hibernate.cfg.Configuration;
import org.hibernate.boot.registry.internal.StandardServiceRegistryImpl; import org.hibernate.boot.registry.internal.StandardServiceRegistryImpl;
import org.hibernate.test.cache.infinispan.util.CacheTestUtil; import org.hibernate.test.cache.infinispan.util.CacheTestUtil;
import org.infinispan.context.Flag;
/** /**
* Defines the environment for a node. * Defines the environment for a node.
@ -44,111 +41,113 @@ import org.infinispan.context.Flag;
* @author Steve Ebersole * @author Steve Ebersole
*/ */
public class NodeEnvironment { public class NodeEnvironment {
private final Configuration configuration;
private StandardServiceRegistryImpl serviceRegistry; private final Configuration configuration;
private InfinispanRegionFactory regionFactory;
private Map<String,EntityRegionImpl> entityRegionMap; private StandardServiceRegistryImpl serviceRegistry;
private Map<String,CollectionRegionImpl> collectionRegionMap; private InfinispanRegionFactory regionFactory;
public NodeEnvironment(Configuration configuration) { private Map<String, EntityRegionImpl> entityRegionMap;
this.configuration = configuration; private Map<String, CollectionRegionImpl> collectionRegionMap;
}
public Configuration getConfiguration() { public NodeEnvironment(Configuration configuration) {
return configuration; this.configuration = configuration;
} }
public StandardServiceRegistryImpl getServiceRegistry() { public Configuration getConfiguration() {
return serviceRegistry; return configuration;
} }
public EntityRegionImpl getEntityRegion(String name, CacheDataDescription cacheDataDescription) { public StandardServiceRegistryImpl getServiceRegistry() {
if ( entityRegionMap == null ) { return serviceRegistry;
entityRegionMap = new HashMap<String, EntityRegionImpl>(); }
return buildAndStoreEntityRegion( name, cacheDataDescription );
}
EntityRegionImpl region = entityRegionMap.get( name );
if ( region == null ) {
region = buildAndStoreEntityRegion( name, cacheDataDescription );
}
return region;
}
private EntityRegionImpl buildAndStoreEntityRegion(String name, CacheDataDescription cacheDataDescription) { public EntityRegionImpl getEntityRegion(String name, CacheDataDescription cacheDataDescription) {
EntityRegionImpl region = (EntityRegionImpl) regionFactory.buildEntityRegion( if (entityRegionMap == null) {
name, entityRegionMap = new HashMap<String, EntityRegionImpl>();
configuration.getProperties(), return buildAndStoreEntityRegion(name, cacheDataDescription);
cacheDataDescription }
); EntityRegionImpl region = entityRegionMap.get(name);
entityRegionMap.put( name, region ); if (region == null) {
return region; region = buildAndStoreEntityRegion(name, cacheDataDescription);
} }
return region;
}
public CollectionRegionImpl getCollectionRegion(String name, CacheDataDescription cacheDataDescription) { private EntityRegionImpl buildAndStoreEntityRegion(String name, CacheDataDescription cacheDataDescription) {
if ( collectionRegionMap == null ) { EntityRegionImpl region = (EntityRegionImpl) regionFactory.buildEntityRegion(
collectionRegionMap = new HashMap<String, CollectionRegionImpl>(); name,
return buildAndStoreCollectionRegion( name, cacheDataDescription ); configuration.getProperties(),
} cacheDataDescription
CollectionRegionImpl region = collectionRegionMap.get( name ); );
if ( region == null ) { entityRegionMap.put(name, region);
region = buildAndStoreCollectionRegion( name, cacheDataDescription ); return region;
collectionRegionMap.put( name, region ); }
}
return region;
}
private CollectionRegionImpl buildAndStoreCollectionRegion(String name, CacheDataDescription cacheDataDescription) { public CollectionRegionImpl getCollectionRegion(String name, CacheDataDescription cacheDataDescription) {
CollectionRegionImpl region; if (collectionRegionMap == null) {
region = (CollectionRegionImpl) regionFactory.buildCollectionRegion( collectionRegionMap = new HashMap<String, CollectionRegionImpl>();
name, return buildAndStoreCollectionRegion(name, cacheDataDescription);
configuration.getProperties(), }
cacheDataDescription CollectionRegionImpl region = collectionRegionMap.get(name);
); if (region == null) {
return region; region = buildAndStoreCollectionRegion(name, cacheDataDescription);
} collectionRegionMap.put(name, region);
}
return region;
}
public void prepare() throws Exception { private CollectionRegionImpl buildAndStoreCollectionRegion(String name, CacheDataDescription cacheDataDescription) {
serviceRegistry = (StandardServiceRegistryImpl) new StandardServiceRegistryBuilder() CollectionRegionImpl region;
.applySettings( configuration.getProperties() ) region = (CollectionRegionImpl) regionFactory.buildCollectionRegion(
.buildServiceRegistry(); name,
regionFactory = CacheTestUtil.startRegionFactory( serviceRegistry, configuration ); configuration.getProperties(),
} cacheDataDescription
);
return region;
}
public void release() throws Exception { public void prepare() throws Exception {
if ( entityRegionMap != null ) { serviceRegistry = (StandardServiceRegistryImpl) new StandardServiceRegistryBuilder()
for ( final EntityRegionImpl region : entityRegionMap.values() ) { .applySettings(configuration.getProperties())
Caches.withinTx(region.getTransactionManager(), new Callable<Void>() { .buildServiceRegistry();
@Override regionFactory = CacheTestUtil.startRegionFactory(serviceRegistry, configuration);
public Void call() throws Exception { }
region.getCache().withFlags(Flag.CACHE_MODE_LOCAL).clear();
return null; public void release() throws Exception {
try {
if (entityRegionMap != null) {
for (EntityRegionImpl region : entityRegionMap.values()) {
try {
region.getCache().stop();
} catch (Exception e) {
// Ignore...
} }
}); }
region.getCache().stop(); entityRegionMap.clear();
} }
entityRegionMap.clear(); if (collectionRegionMap != null) {
} for (CollectionRegionImpl reg : collectionRegionMap.values()) {
if ( collectionRegionMap != null ) { try {
for ( final CollectionRegionImpl collectionRegion : collectionRegionMap.values() ) { reg.getCache().stop();
Caches.withinTx(collectionRegion.getTransactionManager(), new Callable<Void>() { } catch (Exception e) {
@Override // Ignore...
public Void call() throws Exception {
collectionRegion.getCache().withFlags(Flag.CACHE_MODE_LOCAL).clear();
return null;
} }
}); }
collectionRegion.getCache().stop(); collectionRegionMap.clear();
} }
collectionRegionMap.clear(); } finally {
} try {
if ( regionFactory != null ) { if (regionFactory != null) {
// Currently the RegionFactory is shutdown by its registration with the CacheTestSetup from CacheTestUtil when built // Currently the RegionFactory is shutdown by its registration
regionFactory.stop(); // with the CacheTestSetup from CacheTestUtil when built
} regionFactory.stop();
if ( serviceRegistry != null ) { }
serviceRegistry.destroy(); } finally {
} if (serviceRegistry != null) {
} serviceRegistry.destroy();
}
}
}
}
} }

View File

@ -61,524 +61,508 @@ import static org.junit.Assert.assertTrue;
* @since 3.5 * @since 3.5
*/ */
public abstract class AbstractEntityRegionAccessStrategyTestCase extends AbstractNonFunctionalTestCase { public abstract class AbstractEntityRegionAccessStrategyTestCase extends AbstractNonFunctionalTestCase {
private static final Logger log = Logger.getLogger( AbstractEntityRegionAccessStrategyTestCase.class );
public static final String REGION_NAME = "test/com.foo.test"; private static final Logger log = Logger.getLogger(AbstractEntityRegionAccessStrategyTestCase.class);
public static final String KEY_BASE = "KEY";
public static final String VALUE1 = "VALUE1";
public static final String VALUE2 = "VALUE2";
protected static int testCount; public static final String REGION_NAME = "test/com.foo.test";
public static final String KEY_BASE = "KEY";
public static final String VALUE1 = "VALUE1";
public static final String VALUE2 = "VALUE2";
protected NodeEnvironment localEnvironment; protected static int testCount;
protected EntityRegionImpl localEntityRegion;
protected EntityRegionAccessStrategy localAccessStrategy;
protected NodeEnvironment remoteEnvironment; protected NodeEnvironment localEnvironment;
protected EntityRegionImpl remoteEntityRegion; protected EntityRegionImpl localEntityRegion;
protected EntityRegionAccessStrategy remoteAccessStrategy; protected EntityRegionAccessStrategy localAccessStrategy;
protected boolean invalidation; protected NodeEnvironment remoteEnvironment;
protected boolean synchronous; protected EntityRegionImpl remoteEntityRegion;
protected EntityRegionAccessStrategy remoteAccessStrategy;
protected Exception node1Exception; protected boolean invalidation;
protected Exception node2Exception; protected boolean synchronous;
protected AssertionFailedError node1Failure; protected Exception node1Exception;
protected AssertionFailedError node2Failure; protected Exception node2Exception;
@Before protected AssertionFailedError node1Failure;
public void prepareResources() throws Exception { protected AssertionFailedError node2Failure;
// to mimic exactly the old code results, both environments here are exactly the same...
Configuration cfg = createConfiguration( getConfigurationName() );
localEnvironment = new NodeEnvironment( cfg );
localEnvironment.prepare();
localEntityRegion = localEnvironment.getEntityRegion( REGION_NAME, getCacheDataDescription() ); @Before
localAccessStrategy = localEntityRegion.buildAccessStrategy( getAccessType() ); public void prepareResources() throws Exception {
// to mimic exactly the old code results, both environments here are exactly the same...
Configuration cfg = createConfiguration(getConfigurationName());
localEnvironment = new NodeEnvironment(cfg);
localEnvironment.prepare();
invalidation = Caches.isInvalidationCache(localEntityRegion.getCache()); localEntityRegion = localEnvironment.getEntityRegion(REGION_NAME, getCacheDataDescription());
synchronous = Caches.isSynchronousCache(localEntityRegion.getCache()); localAccessStrategy = localEntityRegion.buildAccessStrategy(getAccessType());
// Sleep a bit to avoid concurrent FLUSH problem invalidation = Caches.isInvalidationCache(localEntityRegion.getCache());
avoidConcurrentFlush(); synchronous = Caches.isSynchronousCache(localEntityRegion.getCache());
remoteEnvironment = new NodeEnvironment( cfg ); // Sleep a bit to avoid concurrent FLUSH problem
remoteEnvironment.prepare(); avoidConcurrentFlush();
remoteEntityRegion = remoteEnvironment.getEntityRegion( REGION_NAME, getCacheDataDescription() ); remoteEnvironment = new NodeEnvironment(cfg);
remoteAccessStrategy = remoteEntityRegion.buildAccessStrategy( getAccessType() ); remoteEnvironment.prepare();
remoteEntityRegion = remoteEnvironment.getEntityRegion(REGION_NAME, getCacheDataDescription());
remoteAccessStrategy = remoteEntityRegion.buildAccessStrategy(getAccessType());
waitForClusterToForm(localEntityRegion.getCache(), waitForClusterToForm(localEntityRegion.getCache(),
remoteEntityRegion.getCache()); remoteEntityRegion.getCache());
} }
protected void waitForClusterToForm(Cache... caches) { protected void waitForClusterToForm(Cache... caches) {
TestingUtil.blockUntilViewsReceived(10000, Arrays.asList(caches)); TestingUtil.blockUntilViewsReceived(10000, Arrays.asList(caches));
} }
protected abstract String getConfigurationName(); protected abstract String getConfigurationName();
protected static Configuration createConfiguration(String configName) { protected static Configuration createConfiguration(String configName) {
Configuration cfg = CacheTestUtil.buildConfiguration( Configuration cfg = CacheTestUtil.buildConfiguration(
REGION_PREFIX, REGION_PREFIX,
InfinispanRegionFactory.class, InfinispanRegionFactory.class,
true, true,
false false
); );
cfg.setProperty( InfinispanRegionFactory.ENTITY_CACHE_RESOURCE_PROP, configName ); cfg.setProperty(InfinispanRegionFactory.ENTITY_CACHE_RESOURCE_PROP, configName);
return cfg; return cfg;
} }
protected CacheDataDescription getCacheDataDescription() { protected CacheDataDescription getCacheDataDescription() {
return new CacheDataDescriptionImpl( true, true, ComparableComparator.INSTANCE ); return new CacheDataDescriptionImpl(true, true, ComparableComparator.INSTANCE);
} }
@After @After
public void releaseResources() throws Exception { public void releaseResources() throws Exception {
if ( localEnvironment != null ) { try {
localEnvironment.release(); if (localEnvironment != null) {
} localEnvironment.release();
if ( remoteEnvironment != null ) { }
remoteEnvironment.release(); } finally {
} if (remoteEnvironment != null) {
} remoteEnvironment.release();
}
}
}
protected abstract AccessType getAccessType(); protected abstract AccessType getAccessType();
protected boolean isUsingInvalidation() { protected boolean isUsingInvalidation() {
return invalidation; return invalidation;
} }
protected boolean isSynchronous() { protected boolean isSynchronous() {
return synchronous; return synchronous;
} }
protected void assertThreadsRanCleanly() { protected void assertThreadsRanCleanly() {
if ( node1Failure != null ) { if (node1Failure != null) {
throw node1Failure; throw node1Failure;
} }
if ( node2Failure != null ) { if (node2Failure != null) {
throw node2Failure; throw node2Failure;
} }
if ( node1Exception != null ) { if (node1Exception != null) {
log.error("node1 saw an exception", node1Exception); log.error("node1 saw an exception", node1Exception);
assertEquals( "node1 saw no exceptions", null, node1Exception ); assertEquals("node1 saw no exceptions", null, node1Exception);
} }
if ( node2Exception != null ) { if (node2Exception != null) {
log.error("node2 saw an exception", node2Exception); log.error("node2 saw an exception", node2Exception);
assertEquals( "node2 saw no exceptions", null, node2Exception ); assertEquals("node2 saw no exceptions", null, node2Exception);
} }
} }
@Test @Test
public abstract void testCacheConfiguration(); public abstract void testCacheConfiguration();
@Test @Test
public void testGetRegion() { public void testGetRegion() {
assertEquals( "Correct region", localEntityRegion, localAccessStrategy.getRegion() ); assertEquals("Correct region", localEntityRegion, localAccessStrategy.getRegion());
} }
@Test @Test
public void testPutFromLoad() throws Exception { public void testPutFromLoad() throws Exception {
putFromLoadTest( false ); putFromLoadTest(false);
} }
@Test @Test
public void testPutFromLoadMinimal() throws Exception { public void testPutFromLoadMinimal() throws Exception {
putFromLoadTest( true ); putFromLoadTest(true);
} }
/** /**
* Simulate 2 nodes, both start, tx do a get, experience a cache miss, then 'read from db.' First * Simulate 2 nodes, both start, tx do a get, experience a cache miss, then
* does a putFromLoad, then an update. Second tries to do a putFromLoad with stale data (i.e. it * 'read from db.' First does a putFromLoad, then an update. Second tries to
* took longer to read from the db). Both commit their tx. Then both start a new tx and get. * do a putFromLoad with stale data (i.e. it took longer to read from the db).
* First should see the updated data; second should either see the updated data (isInvalidation() * Both commit their tx. Then both start a new tx and get. First should see
* == false) or null (isInvalidation() == true). * the updated data; second should either see the updated data
* * (isInvalidation() == false) or null (isInvalidation() == true).
* @param useMinimalAPI *
* @throws Exception * @param useMinimalAPI
*/ * @throws Exception
private void putFromLoadTest(final boolean useMinimalAPI) throws Exception { */
private void putFromLoadTest(final boolean useMinimalAPI) throws Exception {
final String KEY = KEY_BASE + testCount++; final String KEY = KEY_BASE + testCount++;
final CountDownLatch writeLatch1 = new CountDownLatch( 1 ); final CountDownLatch writeLatch1 = new CountDownLatch(1);
final CountDownLatch writeLatch2 = new CountDownLatch( 1 ); final CountDownLatch writeLatch2 = new CountDownLatch(1);
final CountDownLatch completionLatch = new CountDownLatch( 2 ); final CountDownLatch completionLatch = new CountDownLatch(2);
Thread node1 = new Thread() { Thread node1 = new Thread() {
@Override @Override
public void run() { public void run() {
try { try {
long txTimestamp = System.currentTimeMillis(); long txTimestamp = System.currentTimeMillis();
BatchModeTransactionManager.getInstance().begin(); BatchModeTransactionManager.getInstance().begin();
assertNull( "node1 starts clean", localAccessStrategy.get( KEY, txTimestamp ) ); assertNull("node1 starts clean", localAccessStrategy.get(KEY, txTimestamp));
writeLatch1.await(); writeLatch1.await();
if ( useMinimalAPI ) { if (useMinimalAPI) {
localAccessStrategy.putFromLoad( KEY, VALUE1, txTimestamp, new Integer( 1 ), true ); localAccessStrategy.putFromLoad(KEY, VALUE1, txTimestamp, new Integer(1), true);
} } else {
else { localAccessStrategy.putFromLoad(KEY, VALUE1, txTimestamp, new Integer(1));
localAccessStrategy.putFromLoad( KEY, VALUE1, txTimestamp, new Integer( 1 ) ); }
}
localAccessStrategy.update( KEY, VALUE2, new Integer( 2 ), new Integer( 1 ) ); localAccessStrategy.update(KEY, VALUE2, new Integer(2), new Integer(1));
BatchModeTransactionManager.getInstance().commit(); BatchModeTransactionManager.getInstance().commit();
} } catch (Exception e) {
catch (Exception e) { log.error("node1 caught exception", e);
log.error("node1 caught exception", e); node1Exception = e;
node1Exception = e; rollback();
rollback(); } catch (AssertionFailedError e) {
} node1Failure = e;
catch (AssertionFailedError e) { rollback();
node1Failure = e; } finally {
rollback(); // Let node2 write
} writeLatch2.countDown();
finally { completionLatch.countDown();
// Let node2 write }
writeLatch2.countDown(); }
completionLatch.countDown(); };
}
}
};
Thread node2 = new Thread() { Thread node2 = new Thread() {
@Override @Override
public void run() { public void run() {
try { try {
long txTimestamp = System.currentTimeMillis(); long txTimestamp = System.currentTimeMillis();
BatchModeTransactionManager.getInstance().begin(); BatchModeTransactionManager.getInstance().begin();
assertNull( "node1 starts clean", remoteAccessStrategy.get( KEY, txTimestamp ) ); assertNull("node1 starts clean", remoteAccessStrategy.get(KEY, txTimestamp));
// Let node1 write // Let node1 write
writeLatch1.countDown(); writeLatch1.countDown();
// Wait for node1 to finish // Wait for node1 to finish
writeLatch2.await(); writeLatch2.await();
if ( useMinimalAPI ) { if (useMinimalAPI) {
remoteAccessStrategy.putFromLoad( KEY, VALUE1, txTimestamp, new Integer( 1 ), true ); remoteAccessStrategy.putFromLoad(KEY, VALUE1, txTimestamp, new Integer(1), true);
} } else {
else { remoteAccessStrategy.putFromLoad(KEY, VALUE1, txTimestamp, new Integer(1));
remoteAccessStrategy.putFromLoad( KEY, VALUE1, txTimestamp, new Integer( 1 ) ); }
}
BatchModeTransactionManager.getInstance().commit(); BatchModeTransactionManager.getInstance().commit();
} } catch (Exception e) {
catch (Exception e) { log.error("node2 caught exception", e);
log.error("node2 caught exception", e); node2Exception = e;
node2Exception = e; rollback();
rollback(); } catch (AssertionFailedError e) {
} node2Failure = e;
catch (AssertionFailedError e) { rollback();
node2Failure = e; } finally {
rollback(); completionLatch.countDown();
} }
finally { }
completionLatch.countDown(); };
}
}
};
node1.setDaemon( true ); node1.setDaemon(true);
node2.setDaemon( true ); node2.setDaemon(true);
node1.start(); node1.start();
node2.start(); node2.start();
assertTrue( "Threads completed", completionLatch.await( 2, TimeUnit.SECONDS ) ); assertTrue("Threads completed", completionLatch.await(2, TimeUnit.SECONDS));
assertThreadsRanCleanly(); assertThreadsRanCleanly();
long txTimestamp = System.currentTimeMillis(); long txTimestamp = System.currentTimeMillis();
assertEquals( "Correct node1 value", VALUE2, localAccessStrategy.get( KEY, txTimestamp ) ); assertEquals("Correct node1 value", VALUE2, localAccessStrategy.get(KEY, txTimestamp));
if ( isUsingInvalidation() ) { if (isUsingInvalidation()) {
// no data version to prevent the PFER; we count on db locks preventing this // no data version to prevent the PFER; we count on db locks preventing this
assertEquals( "Expected node2 value", VALUE1, remoteAccessStrategy.get( KEY, txTimestamp ) ); assertEquals("Expected node2 value", VALUE1, remoteAccessStrategy.get(KEY, txTimestamp));
} } else {
else { // The node1 update is replicated, preventing the node2 PFER
// The node1 update is replicated, preventing the node2 PFER assertEquals("Correct node2 value", VALUE2, remoteAccessStrategy.get(KEY, txTimestamp));
assertEquals( "Correct node2 value", VALUE2, remoteAccessStrategy.get( KEY, txTimestamp ) ); }
} }
}
@Test @Test
public void testInsert() throws Exception { public void testInsert() throws Exception {
final String KEY = KEY_BASE + testCount++; final String KEY = KEY_BASE + testCount++;
final CountDownLatch readLatch = new CountDownLatch( 1 ); final CountDownLatch readLatch = new CountDownLatch(1);
final CountDownLatch commitLatch = new CountDownLatch( 1 ); final CountDownLatch commitLatch = new CountDownLatch(1);
final CountDownLatch completionLatch = new CountDownLatch( 2 ); final CountDownLatch completionLatch = new CountDownLatch(2);
Thread inserter = new Thread() { Thread inserter = new Thread() {
@Override @Override
public void run() { public void run() {
try { try {
long txTimestamp = System.currentTimeMillis(); long txTimestamp = System.currentTimeMillis();
BatchModeTransactionManager.getInstance().begin(); BatchModeTransactionManager.getInstance().begin();
assertNull( "Correct initial value", localAccessStrategy.get( KEY, txTimestamp ) ); assertNull("Correct initial value", localAccessStrategy.get(KEY, txTimestamp));
localAccessStrategy.insert( KEY, VALUE1, new Integer( 1 ) ); localAccessStrategy.insert(KEY, VALUE1, new Integer(1));
readLatch.countDown(); readLatch.countDown();
commitLatch.await(); commitLatch.await();
BatchModeTransactionManager.getInstance().commit(); BatchModeTransactionManager.getInstance().commit();
} } catch (Exception e) {
catch (Exception e) { log.error("node1 caught exception", e);
log.error("node1 caught exception", e); node1Exception = e;
node1Exception = e; rollback();
rollback(); } catch (AssertionFailedError e) {
} node1Failure = e;
catch (AssertionFailedError e) { rollback();
node1Failure = e; } finally {
rollback(); completionLatch.countDown();
} }
finally { }
completionLatch.countDown(); };
}
}
};
Thread reader = new Thread() { Thread reader = new Thread() {
@Override @Override
public void run() { public void run() {
try { try {
long txTimestamp = System.currentTimeMillis(); long txTimestamp = System.currentTimeMillis();
BatchModeTransactionManager.getInstance().begin(); BatchModeTransactionManager.getInstance().begin();
readLatch.await(); readLatch.await();
// Object expected = !isBlockingReads() ? null : VALUE1; // Object expected = !isBlockingReads() ? null : VALUE1;
Object expected = null; Object expected = null;
assertEquals( assertEquals(
"Correct initial value", expected, localAccessStrategy.get( "Correct initial value", expected, localAccessStrategy.get(
KEY, KEY,
txTimestamp txTimestamp
) )
); );
BatchModeTransactionManager.getInstance().commit(); BatchModeTransactionManager.getInstance().commit();
} } catch (Exception e) {
catch (Exception e) { log.error("node1 caught exception", e);
log.error("node1 caught exception", e); node1Exception = e;
node1Exception = e; rollback();
rollback(); } catch (AssertionFailedError e) {
} node1Failure = e;
catch (AssertionFailedError e) { rollback();
node1Failure = e; } finally {
rollback(); commitLatch.countDown();
} completionLatch.countDown();
finally { }
commitLatch.countDown(); }
completionLatch.countDown(); };
}
}
};
inserter.setDaemon( true ); inserter.setDaemon(true);
reader.setDaemon( true ); reader.setDaemon(true);
inserter.start(); inserter.start();
reader.start(); reader.start();
assertTrue( "Threads completed", completionLatch.await( 1, TimeUnit.SECONDS ) ); assertTrue("Threads completed", completionLatch.await(1, TimeUnit.SECONDS));
assertThreadsRanCleanly(); assertThreadsRanCleanly();
long txTimestamp = System.currentTimeMillis(); long txTimestamp = System.currentTimeMillis();
assertEquals( "Correct node1 value", VALUE1, localAccessStrategy.get( KEY, txTimestamp ) ); assertEquals("Correct node1 value", VALUE1, localAccessStrategy.get(KEY, txTimestamp));
Object expected = isUsingInvalidation() ? null : VALUE1; Object expected = isUsingInvalidation() ? null : VALUE1;
assertEquals( "Correct node2 value", expected, remoteAccessStrategy.get( KEY, txTimestamp ) ); assertEquals("Correct node2 value", expected, remoteAccessStrategy.get(KEY, txTimestamp));
} }
@Test @Test
public void testUpdate() throws Exception { public void testUpdate() throws Exception {
final String KEY = KEY_BASE + testCount++; final String KEY = KEY_BASE + testCount++;
// Set up initial state // Set up initial state
localAccessStrategy.putFromLoad( KEY, VALUE1, System.currentTimeMillis(), new Integer( 1 ) ); localAccessStrategy.putFromLoad(KEY, VALUE1, System.currentTimeMillis(), new Integer(1));
remoteAccessStrategy.putFromLoad( KEY, VALUE1, System.currentTimeMillis(), new Integer( 1 ) ); remoteAccessStrategy.putFromLoad(KEY, VALUE1, System.currentTimeMillis(), new Integer(1));
// Let the async put propagate // Let the async put propagate
sleep( 250 ); sleep(250);
final CountDownLatch readLatch = new CountDownLatch( 1 ); final CountDownLatch readLatch = new CountDownLatch(1);
final CountDownLatch commitLatch = new CountDownLatch( 1 ); final CountDownLatch commitLatch = new CountDownLatch(1);
final CountDownLatch completionLatch = new CountDownLatch( 2 ); final CountDownLatch completionLatch = new CountDownLatch(2);
Thread updater = new Thread( "testUpdate-updater" ) { Thread updater = new Thread("testUpdate-updater") {
@Override @Override
public void run() { public void run() {
boolean readerUnlocked = false; boolean readerUnlocked = false;
try { try {
long txTimestamp = System.currentTimeMillis(); long txTimestamp = System.currentTimeMillis();
BatchModeTransactionManager.getInstance().begin(); BatchModeTransactionManager.getInstance().begin();
log.debug("Transaction began, get initial value"); log.debug("Transaction began, get initial value");
assertEquals( "Correct initial value", VALUE1, localAccessStrategy.get( KEY, txTimestamp ) ); assertEquals("Correct initial value", VALUE1, localAccessStrategy.get(KEY, txTimestamp));
log.debug("Now update value"); log.debug("Now update value");
localAccessStrategy.update( KEY, VALUE2, new Integer( 2 ), new Integer( 1 ) ); localAccessStrategy.update(KEY, VALUE2, new Integer(2), new Integer(1));
log.debug("Notify the read latch"); log.debug("Notify the read latch");
readLatch.countDown(); readLatch.countDown();
readerUnlocked = true; readerUnlocked = true;
log.debug("Await commit"); log.debug("Await commit");
commitLatch.await(); commitLatch.await();
BatchModeTransactionManager.getInstance().commit(); BatchModeTransactionManager.getInstance().commit();
} } catch (Exception e) {
catch (Exception e) { log.error("node1 caught exception", e);
log.error("node1 caught exception", e); node1Exception = e;
node1Exception = e; rollback();
rollback(); } catch (AssertionFailedError e) {
} node1Failure = e;
catch (AssertionFailedError e) { rollback();
node1Failure = e; } finally {
rollback(); if (!readerUnlocked) {
} readLatch.countDown();
finally { }
if ( !readerUnlocked ) { log.debug("Completion latch countdown");
readLatch.countDown(); completionLatch.countDown();
} }
log.debug("Completion latch countdown"); }
completionLatch.countDown(); };
}
}
};
Thread reader = new Thread( "testUpdate-reader" ) { Thread reader = new Thread("testUpdate-reader") {
@Override @Override
public void run() { public void run() {
try { try {
long txTimestamp = System.currentTimeMillis(); long txTimestamp = System.currentTimeMillis();
BatchModeTransactionManager.getInstance().begin(); BatchModeTransactionManager.getInstance().begin();
log.debug("Transaction began, await read latch"); log.debug("Transaction began, await read latch");
readLatch.await(); readLatch.await();
log.debug("Read latch acquired, verify local access strategy"); log.debug("Read latch acquired, verify local access strategy");
// This won't block w/ mvc and will read the old value // This won't block w/ mvc and will read the old value
Object expected = VALUE1; Object expected = VALUE1;
assertEquals( "Correct value", expected, localAccessStrategy.get( KEY, txTimestamp ) ); assertEquals("Correct value", expected, localAccessStrategy.get(KEY, txTimestamp));
BatchModeTransactionManager.getInstance().commit(); BatchModeTransactionManager.getInstance().commit();
} } catch (Exception e) {
catch (Exception e) { log.error("node1 caught exception", e);
log.error("node1 caught exception", e); node1Exception = e;
node1Exception = e; rollback();
rollback(); } catch (AssertionFailedError e) {
} node1Failure = e;
catch (AssertionFailedError e) { rollback();
node1Failure = e; } finally {
rollback(); commitLatch.countDown();
} log.debug("Completion latch countdown");
finally { completionLatch.countDown();
commitLatch.countDown(); }
log.debug("Completion latch countdown"); }
completionLatch.countDown(); };
}
}
};
updater.setDaemon( true ); updater.setDaemon(true);
reader.setDaemon( true ); reader.setDaemon(true);
updater.start(); updater.start();
reader.start(); reader.start();
// Should complete promptly // Should complete promptly
assertTrue( completionLatch.await( 2, TimeUnit.SECONDS ) ); assertTrue(completionLatch.await(2, TimeUnit.SECONDS));
assertThreadsRanCleanly(); assertThreadsRanCleanly();
long txTimestamp = System.currentTimeMillis(); long txTimestamp = System.currentTimeMillis();
assertEquals( "Correct node1 value", VALUE2, localAccessStrategy.get( KEY, txTimestamp ) ); assertEquals("Correct node1 value", VALUE2, localAccessStrategy.get(KEY, txTimestamp));
Object expected = isUsingInvalidation() ? null : VALUE2; Object expected = isUsingInvalidation() ? null : VALUE2;
assertEquals( "Correct node2 value", expected, remoteAccessStrategy.get( KEY, txTimestamp ) ); assertEquals("Correct node2 value", expected, remoteAccessStrategy.get(KEY, txTimestamp));
} }
@Test @Test
public void testRemove() throws Exception { public void testRemove() throws Exception {
evictOrRemoveTest( false ); evictOrRemoveTest(false);
} }
@Test @Test
public void testRemoveAll() throws Exception { public void testRemoveAll() throws Exception {
evictOrRemoveAllTest( false ); evictOrRemoveAllTest(false);
} }
@Test @Test
public void testEvict() throws Exception { public void testEvict() throws Exception {
evictOrRemoveTest( true ); evictOrRemoveTest(true);
} }
@Test @Test
public void testEvictAll() throws Exception { public void testEvictAll() throws Exception {
evictOrRemoveAllTest( true ); evictOrRemoveAllTest(true);
} }
private void evictOrRemoveTest(final boolean evict) throws Exception { private void evictOrRemoveTest(final boolean evict) throws Exception {
final String KEY = KEY_BASE + testCount++; final String KEY = KEY_BASE + testCount++;
assertEquals( 0, getValidKeyCount( localEntityRegion.getCache().keySet() ) ); assertEquals(0, getValidKeyCount(localEntityRegion.getCache().keySet()));
assertEquals( 0, getValidKeyCount( remoteEntityRegion.getCache().keySet() ) ); assertEquals(0, getValidKeyCount(remoteEntityRegion.getCache().keySet()));
assertNull( "local is clean", localAccessStrategy.get( KEY, System.currentTimeMillis() ) ); assertNull("local is clean", localAccessStrategy.get(KEY, System.currentTimeMillis()));
assertNull( "remote is clean", remoteAccessStrategy.get( KEY, System.currentTimeMillis() ) ); assertNull("remote is clean", remoteAccessStrategy.get(KEY, System.currentTimeMillis()));
localAccessStrategy.putFromLoad( KEY, VALUE1, System.currentTimeMillis(), new Integer( 1 ) ); localAccessStrategy.putFromLoad(KEY, VALUE1, System.currentTimeMillis(), new Integer(1));
assertEquals( VALUE1, localAccessStrategy.get( KEY, System.currentTimeMillis() ) ); assertEquals(VALUE1, localAccessStrategy.get(KEY, System.currentTimeMillis()));
remoteAccessStrategy.putFromLoad( KEY, VALUE1, System.currentTimeMillis(), new Integer( 1 ) ); remoteAccessStrategy.putFromLoad(KEY, VALUE1, System.currentTimeMillis(), new Integer(1));
assertEquals( VALUE1, remoteAccessStrategy.get( KEY, System.currentTimeMillis() ) ); assertEquals(VALUE1, remoteAccessStrategy.get(KEY, System.currentTimeMillis()));
Caches.withinTx(localEntityRegion.getTransactionManager(), new Callable<Void>() { Caches.withinTx(localEntityRegion.getTransactionManager(), new Callable<Void>() {
@Override @Override
public Void call() throws Exception { public Void call() throws Exception {
if ( evict ) if (evict)
localAccessStrategy.evict( KEY ); localAccessStrategy.evict(KEY);
else else
localAccessStrategy.remove( KEY ); localAccessStrategy.remove(KEY);
return null; return null;
} }
}); });
assertEquals(null, localAccessStrategy.get(KEY, System.currentTimeMillis())); assertEquals(null, localAccessStrategy.get(KEY, System.currentTimeMillis()));
assertEquals( 0, getValidKeyCount( localEntityRegion.getCache().keySet() ) ); assertEquals(0, getValidKeyCount(localEntityRegion.getCache().keySet()));
assertEquals( null, remoteAccessStrategy.get( KEY, System.currentTimeMillis() ) ); assertEquals(null, remoteAccessStrategy.get(KEY, System.currentTimeMillis()));
assertEquals( 0, getValidKeyCount( remoteEntityRegion.getCache().keySet() ) ); assertEquals(0, getValidKeyCount(remoteEntityRegion.getCache().keySet()));
} }
private void evictOrRemoveAllTest(final boolean evict) throws Exception { private void evictOrRemoveAllTest(final boolean evict) throws Exception {
final String KEY = KEY_BASE + testCount++; final String KEY = KEY_BASE + testCount++;
assertEquals( 0, getValidKeyCount( localEntityRegion.getCache().keySet() ) ); assertEquals(0, getValidKeyCount(localEntityRegion.getCache().keySet()));
assertEquals( 0, getValidKeyCount( remoteEntityRegion.getCache().keySet() ) ); assertEquals(0, getValidKeyCount(remoteEntityRegion.getCache().keySet()));
assertNull( "local is clean", localAccessStrategy.get( KEY, System.currentTimeMillis() ) ); assertNull("local is clean", localAccessStrategy.get(KEY, System.currentTimeMillis()));
assertNull( "remote is clean", remoteAccessStrategy.get( KEY, System.currentTimeMillis() ) ); assertNull("remote is clean", remoteAccessStrategy.get(KEY, System.currentTimeMillis()));
localAccessStrategy.putFromLoad( KEY, VALUE1, System.currentTimeMillis(), new Integer( 1 ) ); localAccessStrategy.putFromLoad(KEY, VALUE1, System.currentTimeMillis(), new Integer(1));
assertEquals( VALUE1, localAccessStrategy.get( KEY, System.currentTimeMillis() ) ); assertEquals(VALUE1, localAccessStrategy.get(KEY, System.currentTimeMillis()));
// Wait for async propagation // Wait for async propagation
sleep( 250 ); sleep(250);
remoteAccessStrategy.putFromLoad( KEY, VALUE1, System.currentTimeMillis(), new Integer( 1 ) ); remoteAccessStrategy.putFromLoad(KEY, VALUE1, System.currentTimeMillis(), new Integer(1));
assertEquals( VALUE1, remoteAccessStrategy.get( KEY, System.currentTimeMillis() ) ); assertEquals(VALUE1, remoteAccessStrategy.get(KEY, System.currentTimeMillis()));
// Wait for async propagation // Wait for async propagation
sleep( 250 ); sleep(250);
Caches.withinTx(localEntityRegion.getTransactionManager(), new Callable<Void>() { Caches.withinTx(localEntityRegion.getTransactionManager(), new Callable<Void>() {
@Override @Override
@ -593,41 +577,40 @@ public abstract class AbstractEntityRegionAccessStrategyTestCase extends Abstrac
} }
}); });
// This should re-establish the region root node in the optimistic case // This should re-establish the region root node in the optimistic case
assertNull(localAccessStrategy.get(KEY, System.currentTimeMillis())); assertNull(localAccessStrategy.get(KEY, System.currentTimeMillis()));
assertEquals( 0, getValidKeyCount( localEntityRegion.getCache().keySet() ) ); assertEquals(0, getValidKeyCount(localEntityRegion.getCache().keySet()));
// Re-establishing the region root on the local node doesn't // Re-establishing the region root on the local node doesn't
// propagate it to other nodes. Do a get on the remote node to re-establish // propagate it to other nodes. Do a get on the remote node to re-establish
assertEquals( null, remoteAccessStrategy.get( KEY, System.currentTimeMillis() ) ); assertEquals(null, remoteAccessStrategy.get(KEY, System.currentTimeMillis()));
assertEquals( 0, getValidKeyCount( remoteEntityRegion.getCache().keySet() ) ); assertEquals(0, getValidKeyCount(remoteEntityRegion.getCache().keySet()));
// Test whether the get above messes up the optimistic version // Test whether the get above messes up the optimistic version
remoteAccessStrategy.putFromLoad( KEY, VALUE1, System.currentTimeMillis(), new Integer( 1 ) ); remoteAccessStrategy.putFromLoad(KEY, VALUE1, System.currentTimeMillis(), new Integer(1));
assertEquals( VALUE1, remoteAccessStrategy.get( KEY, System.currentTimeMillis() ) ); assertEquals(VALUE1, remoteAccessStrategy.get(KEY, System.currentTimeMillis()));
assertEquals( 1, getValidKeyCount( remoteEntityRegion.getCache().keySet() ) ); assertEquals(1, getValidKeyCount(remoteEntityRegion.getCache().keySet()));
// Wait for async propagation // Wait for async propagation
sleep( 250 ); sleep(250);
assertEquals( assertEquals(
"local is correct", (isUsingInvalidation() ? null : VALUE1), localAccessStrategy "local is correct", (isUsingInvalidation() ? null : VALUE1), localAccessStrategy
.get( KEY, System.currentTimeMillis() ) .get(KEY, System.currentTimeMillis())
); );
assertEquals( assertEquals(
"remote is correct", VALUE1, remoteAccessStrategy.get( "remote is correct", VALUE1, remoteAccessStrategy.get(
KEY, System KEY, System
.currentTimeMillis() .currentTimeMillis()
) )
); );
} }
protected void rollback() { protected void rollback() {
try { try {
BatchModeTransactionManager.getInstance().rollback(); BatchModeTransactionManager.getInstance().rollback();
} } catch (Exception e) {
catch (Exception e) { log.error(e.getMessage(), e);
log.error(e.getMessage(), e); }
} }
}
} }