HHH-7710 Provide an Infinispan 5.1 and 5.2 compatible implementation

* Upgraded to Infinispan 5.2.0.Beta3.
* Used a workaround to apply global statistics configuration.
* Infinispan 5.1.x compatibility is still required, which is why a CI
job will be created to verify it works fine.
This commit is contained in:
Galder Zamarreño 2012-10-23 16:54:54 +02:00
parent 1d9e45bc32
commit 0ef3a25d15
9 changed files with 540 additions and 291 deletions

View File

@ -24,7 +24,7 @@ test {
systemProperties['jgroups.udp.enable_bundling'] = false
systemProperties['jgroups.bind_addr'] = 'localhost'
// Use Infinispan's test JGroups stack that uses TEST_PING
systemProperties['hibernate.cache.infinispan.jgroups_cfg'] = 'stacks/tcp.xml'
systemProperties['hibernate.cache.infinispan.jgroups_cfg'] = '2lc-test-tcp.xml'
// systemProperties['log4j.configuration'] = 'file:/log4j/log4j-infinispan.xml'
enabled = true
}

View File

@ -19,6 +19,8 @@
import org.infinispan.config.Configuration;
import org.infinispan.configuration.cache.CacheMode;
import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.configuration.global.GlobalConfigurationBuilder;
import org.infinispan.configuration.parsing.ConfigurationBuilderHolder;
import org.infinispan.factories.GlobalComponentRegistry;
import org.infinispan.manager.DefaultCacheManager;
import org.infinispan.manager.EmbeddedCacheManager;
@ -382,11 +384,34 @@ public Set<String> getDefinedConfigurations() {
protected EmbeddedCacheManager createCacheManager(Properties properties) throws CacheException {
try {
String configLoc = ConfigurationHelper.getString(INFINISPAN_CONFIG_RESOURCE_PROP, properties, DEF_INFINISPAN_CONFIG_RESOURCE);
EmbeddedCacheManager manager = new DefaultCacheManager(configLoc, false);
String globalStats = extractProperty(INFINISPAN_GLOBAL_STATISTICS_PROP, properties);
if (globalStats != null) {
manager.getGlobalConfiguration().setExposeGlobalJmxStatistics(Boolean.parseBoolean(globalStats));
// Hack to enable global JMX stats being enabled in both 5.1 and 5.2
// 1. Create a configuration builder holder
ConfigurationBuilderHolder holder = new ConfigurationBuilderHolder();
// 2. Build global configuration with custom settings
GlobalConfigurationBuilder globalBuilder = holder.getGlobalConfigurationBuilder();
globalBuilder.read(manager.getCacheManagerConfiguration());
globalBuilder.globalJmxStatistics().enabled(Boolean.parseBoolean(globalStats));
// 3. Build default configuration
holder.getDefaultConfigurationBuilder().read(manager.getDefaultCacheConfiguration());
// 4. Build all defined caches
for (String cacheName : manager.getCacheNames()){
ConfigurationBuilder builder = holder.newConfigurationBuilder(cacheName);
builder.read(manager.getCacheConfiguration(cacheName));
}
// 5. Discard existing cache manager and create a brand new one
manager.stop();
manager = new DefaultCacheManager(holder, false);
}
manager.start();
return manager;
} catch (IOException e) {

View File

@ -16,6 +16,8 @@
</properties>
<!-- See the JGroupsTransport javadocs for more flags -->
</transport>
<globalJmxStatistics allowDuplicateDomains="true" />
</global>
<default>

View File

@ -56,7 +56,7 @@ public void prepareCacheSupport() throws Exception {
preferIPv4Stack = System.getProperty(PREFER_IPV4STACK);
System.setProperty(PREFER_IPV4STACK, "true");
jgroupsCfgFile = System.getProperty(JGROUPS_CFG_FILE);
System.setProperty(JGROUPS_CFG_FILE, "stacks/tcp.xml");
System.setProperty(JGROUPS_CFG_FILE, "2lc-test-tcp.xml");
testSupport.setUp();
}

View File

@ -30,6 +30,8 @@
import org.infinispan.eviction.EvictionStrategy;
import org.infinispan.manager.DefaultCacheManager;
import org.infinispan.manager.EmbeddedCacheManager;
import org.infinispan.test.CacheManagerCallable;
import org.infinispan.test.fwk.TestCacheManagerFactory;
import org.junit.Test;
import org.hibernate.cache.CacheException;
@ -44,6 +46,7 @@
import org.hibernate.service.jta.platform.internal.JBossStandAloneJtaPlatform;
import org.hibernate.testing.ServiceRegistryBuilder;
import static org.infinispan.test.TestingUtil.withCacheManager;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
@ -279,18 +282,23 @@ public void testBuildEntityRegionPersonPlusEntityOverridesWithoutCfg() {
}
}
@Test
public void testTimestampValidation() {
Properties p = new Properties();
final DefaultCacheManager manager = new DefaultCacheManager();
InfinispanRegionFactory factory = createRegionFactory(manager, p);
Configuration config = new Configuration();
config.setCacheMode(CacheMode.INVALIDATION_SYNC);
manager.defineConfiguration("timestamps", config);
try {
factory.start(null, p);
fail("Should have failed saying that invalidation is not allowed for timestamp caches.");
} catch(CacheException ce) {
}
public void testTimestampValidation() throws Exception {
final Properties p = new Properties();
withCacheManager(new CacheManagerCallable(
TestCacheManagerFactory.createLocalCacheManager(false)) {
@Override
public void call() {
InfinispanRegionFactory factory = createRegionFactory(cm, p);
Configuration config = new Configuration();
config.setCacheMode(CacheMode.INVALIDATION_SYNC);
cm.defineConfiguration("timestamps", config);
try {
factory.start(null, p);
fail("Should have failed saying that invalidation is not allowed for timestamp caches.");
} catch(CacheException ce) {
}
}
});
}
@Test
public void testBuildDefaultTimestampsRegion() {

View File

@ -3,7 +3,7 @@
*
* Copyright (c) 2009, Red Hat, Inc or third-party contributors as
* indicated by the @author tags or express copyright attribution
* statements applied by the authors.  All third-party contributions are
* statements applied by the authors. All third-party contributions are
* distributed under license by Red Hat Middleware LLC.
*
* This copyrighted material is made available to anyone wishing to use, modify,
@ -34,7 +34,11 @@
import java.util.concurrent.atomic.AtomicReference;
import javax.transaction.TransactionManager;
import org.infinispan.manager.DefaultCacheManager;
import org.infinispan.manager.EmbeddedCacheManager;
import org.infinispan.test.CacheManagerCallable;
import org.infinispan.test.fwk.TestCacheManagerFactory;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@ -42,6 +46,7 @@
import org.hibernate.cache.infinispan.access.PutFromLoadValidator;
import org.hibernate.test.cache.infinispan.functional.cluster.DualNodeJtaTransactionManagerImpl;
import static org.infinispan.test.TestingUtil.withCacheManager;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
@ -56,6 +61,10 @@
* @version $Revision: $
*/
public class PutFromLoadValidatorUnitTestCase {
private static final Log log = LogFactory.getLog(
PutFromLoadValidatorUnitTestCase.class);
private Object KEY1 = "KEY1";
private TransactionManager tm;
@ -74,7 +83,7 @@ public void tearDown() throws Exception {
finally {
DualNodeJtaTransactionManagerImpl.cleanupTransactionManagers();
}
}
}
@Test
public void testNakedPut() throws Exception {
@ -85,22 +94,32 @@ public void testNakedPutTransactional() throws Exception {
nakedPutTest(true);
}
private void nakedPutTest(boolean transactional) throws Exception {
PutFromLoadValidator testee = new PutFromLoadValidator(
new DefaultCacheManager(), transactional ? tm : null,
PutFromLoadValidator.NAKED_PUT_INVALIDATION_PERIOD);
if (transactional) {
tm.begin();
}
boolean lockable = testee.acquirePutFromLoadLock(KEY1);
try {
assertTrue(lockable);
}
finally {
if (lockable) {
testee.releasePutFromLoadLock(KEY1);
private void nakedPutTest(final boolean transactional) throws Exception {
withCacheManager(new CacheManagerCallable(
TestCacheManagerFactory.createLocalCacheManager(false)) {
@Override
public void call() {
try {
PutFromLoadValidator testee = new PutFromLoadValidator(cm,
transactional ? tm : null,
PutFromLoadValidator.NAKED_PUT_INVALIDATION_PERIOD);
if (transactional) {
tm.begin();
}
boolean lockable = testee.acquirePutFromLoadLock(KEY1);
try {
assertTrue(lockable);
}
finally {
if (lockable) {
testee.releasePutFromLoadLock(KEY1);
}
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
});
}
@Test
public void testRegisteredPut() throws Exception {
@ -111,23 +130,34 @@ public void testRegisteredPutTransactional() throws Exception {
registeredPutTest(true);
}
private void registeredPutTest(boolean transactional) throws Exception {
PutFromLoadValidator testee = new PutFromLoadValidator(new DefaultCacheManager(),
transactional ? tm : null, PutFromLoadValidator.NAKED_PUT_INVALIDATION_PERIOD);
if (transactional) {
tm.begin();
}
testee.registerPendingPut(KEY1);
private void registeredPutTest(final boolean transactional) throws Exception {
withCacheManager(new CacheManagerCallable(
TestCacheManagerFactory.createLocalCacheManager(false)) {
@Override
public void call() {
PutFromLoadValidator testee = new PutFromLoadValidator(cm,
transactional ? tm : null,
PutFromLoadValidator.NAKED_PUT_INVALIDATION_PERIOD);
try {
if (transactional) {
tm.begin();
}
testee.registerPendingPut(KEY1);
boolean lockable = testee.acquirePutFromLoadLock(KEY1);
try {
assertTrue(lockable);
}
finally {
if (lockable) {
testee.releasePutFromLoadLock(KEY1);
boolean lockable = testee.acquirePutFromLoadLock(KEY1);
try {
assertTrue(lockable);
}
finally {
if (lockable) {
testee.releasePutFromLoadLock(KEY1);
}
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
});
}
@Test
public void testNakedPutAfterKeyRemoval() throws Exception {
@ -146,28 +176,40 @@ public void testNakedPutAfterRegionRemovalTransactional() throws Exception {
nakedPutAfterRemovalTest(true, true);
}
private void nakedPutAfterRemovalTest(boolean transactional, boolean removeRegion)
throws Exception {
PutFromLoadValidator testee = new PutFromLoadValidator(new DefaultCacheManager(),
transactional ? tm : null, PutFromLoadValidator.NAKED_PUT_INVALIDATION_PERIOD);
if (removeRegion) {
testee.invalidateRegion();
} else {
testee.invalidateKey(KEY1);
}
if (transactional) {
tm.begin();
}
private void nakedPutAfterRemovalTest(final boolean transactional,
final boolean removeRegion) throws Exception {
withCacheManager(new CacheManagerCallable(
TestCacheManagerFactory.createLocalCacheManager(false)) {
@Override
public void call() {
PutFromLoadValidator testee = new PutFromLoadValidator(cm,
transactional ? tm : null,
PutFromLoadValidator.NAKED_PUT_INVALIDATION_PERIOD);
if (removeRegion) {
testee.invalidateRegion();
} else {
testee.invalidateKey(KEY1);
}
try {
if (transactional) {
tm.begin();
}
boolean lockable = testee.acquirePutFromLoadLock(KEY1);
try {
assertFalse(lockable);
}
finally {
if (lockable) {
testee.releasePutFromLoadLock(KEY1);
boolean lockable = testee.acquirePutFromLoadLock(KEY1);
try {
assertFalse(lockable);
}
finally {
if (lockable) {
testee.releasePutFromLoadLock(KEY1);
}
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
});
}
@Test
public void testRegisteredPutAfterKeyRemoval() throws Exception {
@ -177,79 +219,103 @@ public void testRegisteredPutAfterKeyRemoval() throws Exception {
public void testRegisteredPutAfterKeyRemovalTransactional() throws Exception {
registeredPutAfterRemovalTest(true, false);
}
@Test
@Test
public void testRegisteredPutAfterRegionRemoval() throws Exception {
registeredPutAfterRemovalTest(false, true);
}
@Test
@Test
public void testRegisteredPutAfterRegionRemovalTransactional() throws Exception {
registeredPutAfterRemovalTest(true, true);
}
private void registeredPutAfterRemovalTest(boolean transactional, boolean removeRegion)
throws Exception {
PutFromLoadValidator testee = new PutFromLoadValidator(new DefaultCacheManager(),
transactional ? tm : null, PutFromLoadValidator.NAKED_PUT_INVALIDATION_PERIOD);
if (removeRegion) {
testee.invalidateRegion();
} else {
testee.invalidateKey(KEY1);
}
if (transactional) {
tm.begin();
}
testee.registerPendingPut(KEY1);
private void registeredPutAfterRemovalTest(final boolean transactional,
final boolean removeRegion) throws Exception {
withCacheManager(new CacheManagerCallable(
TestCacheManagerFactory.createLocalCacheManager(false)) {
@Override
public void call() {
PutFromLoadValidator testee = new PutFromLoadValidator(cm,
transactional ? tm : null,
PutFromLoadValidator.NAKED_PUT_INVALIDATION_PERIOD);
if (removeRegion) {
testee.invalidateRegion();
} else {
testee.invalidateKey(KEY1);
}
try {
if (transactional) {
tm.begin();
}
testee.registerPendingPut(KEY1);
boolean lockable = testee.acquirePutFromLoadLock(KEY1);
try {
assertTrue(lockable);
}
finally {
if (lockable) {
testee.releasePutFromLoadLock(KEY1);
boolean lockable = testee.acquirePutFromLoadLock(KEY1);
try {
assertTrue(lockable);
}
finally {
if (lockable) {
testee.releasePutFromLoadLock(KEY1);
}
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
});
}
@Test
@Test
public void testRegisteredPutWithInterveningKeyRemoval() throws Exception {
registeredPutWithInterveningRemovalTest(false, false);
}
@Test
@Test
public void testRegisteredPutWithInterveningKeyRemovalTransactional() throws Exception {
registeredPutWithInterveningRemovalTest(true, false);
}
@Test
@Test
public void testRegisteredPutWithInterveningRegionRemoval() throws Exception {
registeredPutWithInterveningRemovalTest(false, true);
}
@Test
@Test
public void testRegisteredPutWithInterveningRegionRemovalTransactional() throws Exception {
registeredPutWithInterveningRemovalTest(true, true);
}
private void registeredPutWithInterveningRemovalTest(boolean transactional, boolean removeRegion)
private void registeredPutWithInterveningRemovalTest(
final boolean transactional, final boolean removeRegion)
throws Exception {
PutFromLoadValidator testee = new PutFromLoadValidator(new DefaultCacheManager(),
transactional ? tm : null, PutFromLoadValidator.NAKED_PUT_INVALIDATION_PERIOD);
if (transactional) {
tm.begin();
}
testee.registerPendingPut(KEY1);
if (removeRegion) {
testee.invalidateRegion();
} else {
testee.invalidateKey(KEY1);
}
withCacheManager(new CacheManagerCallable(
TestCacheManagerFactory.createLocalCacheManager(false)) {
@Override
public void call() {
PutFromLoadValidator testee = new PutFromLoadValidator(cm,
transactional ? tm : null,
PutFromLoadValidator.NAKED_PUT_INVALIDATION_PERIOD);
try {
if (transactional) {
tm.begin();
}
testee.registerPendingPut(KEY1);
if (removeRegion) {
testee.invalidateRegion();
} else {
testee.invalidateKey(KEY1);
}
boolean lockable = testee.acquirePutFromLoadLock(KEY1);
try {
assertFalse(lockable);
}
finally {
if (lockable) {
testee.releasePutFromLoadLock(KEY1);
boolean lockable = testee.acquirePutFromLoadLock(KEY1);
try {
assertFalse(lockable);
}
finally {
if (lockable) {
testee.releasePutFromLoadLock(KEY1);
}
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
});
}
@Test
public void testDelayedNakedPutAfterKeyRemoval() throws Exception {
@ -259,7 +325,7 @@ public void testDelayedNakedPutAfterKeyRemoval() throws Exception {
public void testDelayedNakedPutAfterKeyRemovalTransactional() throws Exception {
delayedNakedPutAfterRemovalTest(true, false);
}
@Test
@Test
public void testDelayedNakedPutAfterRegionRemoval() throws Exception {
delayedNakedPutAfterRemovalTest(false, true);
}
@ -268,87 +334,114 @@ public void testDelayedNakedPutAfterRegionRemovalTransactional() throws Exceptio
delayedNakedPutAfterRemovalTest(true, true);
}
private void delayedNakedPutAfterRemovalTest(boolean transactional, boolean removeRegion)
private void delayedNakedPutAfterRemovalTest(
final boolean transactional, final boolean removeRegion)
throws Exception {
PutFromLoadValidator testee = new TestValidator(transactional ? tm : null, 100);
if (removeRegion) {
testee.invalidateRegion();
} else {
testee.invalidateKey(KEY1);
}
if (transactional) {
tm.begin();
}
Thread.sleep(110);
withCacheManager(new CacheManagerCallable(
TestCacheManagerFactory.createLocalCacheManager(false)) {
@Override
public void call() {
PutFromLoadValidator testee = new TestValidator(cm,
transactional ? tm : null, 100);
if (removeRegion) {
testee.invalidateRegion();
} else {
testee.invalidateKey(KEY1);
}
try {
if (transactional) {
tm.begin();
}
Thread.sleep(110);
boolean lockable = testee.acquirePutFromLoadLock(KEY1);
try {
assertTrue(lockable);
}
finally {
if (lockable) {
testee.releasePutFromLoadLock(KEY1);
boolean lockable = testee.acquirePutFromLoadLock(KEY1);
try {
assertTrue(lockable);
}
finally {
if (lockable) {
testee.releasePutFromLoadLock(KEY1);
}
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
});
}
@Test
public void testMultipleRegistrations() throws Exception {
multipleRegistrationtest(false);
}
@Test
public void testMultipleRegistrationsTransactional() throws Exception {
multipleRegistrationtest(true);
}
private void multipleRegistrationtest(final boolean transactional) throws Exception {
final PutFromLoadValidator testee = new PutFromLoadValidator(
new DefaultCacheManager(), transactional ? tm : null,
PutFromLoadValidator.NAKED_PUT_INVALIDATION_PERIOD);
withCacheManager(new CacheManagerCallable(
TestCacheManagerFactory.createLocalCacheManager(false)) {
@Override
public void call() {
final PutFromLoadValidator testee = new PutFromLoadValidator(cm,
transactional ? tm : null,
PutFromLoadValidator.NAKED_PUT_INVALIDATION_PERIOD);
final CountDownLatch registeredLatch = new CountDownLatch(3);
final CountDownLatch finishedLatch = new CountDownLatch(3);
final AtomicInteger success = new AtomicInteger();
final CountDownLatch registeredLatch = new CountDownLatch(3);
final CountDownLatch finishedLatch = new CountDownLatch(3);
final AtomicInteger success = new AtomicInteger();
Runnable r = new Runnable() {
public void run() {
try {
if (transactional) {
tm.begin();
}
testee.registerPendingPut(KEY1);
registeredLatch.countDown();
registeredLatch.await(5, TimeUnit.SECONDS);
if (testee.acquirePutFromLoadLock(KEY1)) {
Runnable r = new Runnable() {
public void run() {
try {
success.incrementAndGet();
if (transactional) {
tm.begin();
}
testee.registerPendingPut(KEY1);
registeredLatch.countDown();
registeredLatch.await(5, TimeUnit.SECONDS);
if (testee.acquirePutFromLoadLock(KEY1)) {
try {
log.trace("Put from load lock acquired for key = " + KEY1);
success.incrementAndGet();
}
finally {
testee.releasePutFromLoadLock(KEY1);
}
} else {
log.trace("Unable to acquired putFromLoad lock for key = " + KEY1);
}
finishedLatch.countDown();
}
finally {
testee.releasePutFromLoadLock(KEY1);
catch (Exception e) {
e.printStackTrace();
}
}
finishedLatch.countDown();
}
catch (Exception e) {
e.printStackTrace();
};
ExecutorService executor = Executors.newFixedThreadPool(3);
// Start with a removal so the "isPutValid" calls will fail if
// any of the concurrent activity isn't handled properly
testee.invalidateRegion();
// Do the registration + isPutValid calls
executor.execute(r);
executor.execute(r);
executor.execute(r);
try {
finishedLatch.await(5, TimeUnit.SECONDS);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
assertEquals("All threads succeeded", 3, success.get());
}
};
ExecutorService executor = Executors.newFixedThreadPool(3);
// Start with a removal so the "isPutValid" calls will fail if
// any of the concurrent activity isn't handled properly
testee.invalidateRegion();
// Do the registration + isPutValid calls
executor.execute(r);
executor.execute(r);
executor.execute(r);
finishedLatch.await(5, TimeUnit.SECONDS);
assertEquals("All threads succeeded", 3, success.get());
});
}
/**
@ -359,17 +452,23 @@ public void run() {
*/
@Test
public void testRemovalCleanup() throws Exception {
TestValidator testee = new TestValidator(null, 200);
testee.invalidateKey("KEY1");
testee.invalidateKey("KEY2");
expectRemovalLenth(2, testee, 60000l);
assertEquals(2, testee.getRemovalQueueLength());
expectRemovalLenth(2, testee, 60000l);
assertEquals(2, testee.getRemovalQueueLength());
expectRemovalLenth( 2, testee, 60000l );
withCacheManager(new CacheManagerCallable(
TestCacheManagerFactory.createLocalCacheManager(false)) {
@Override
public void call() {
TestValidator testee = new TestValidator(cm, null, 200);
testee.invalidateKey("KEY1");
testee.invalidateKey("KEY2");
expectRemovalLenth(2, testee, 60000l);
assertEquals(2, testee.getRemovalQueueLength());
expectRemovalLenth(2, testee, 60000l);
assertEquals(2, testee.getRemovalQueueLength());
expectRemovalLenth(2, testee, 60000l);
}
});
}
private void expectRemovalLenth(int expectedLength, TestValidator testee, long timeout) throws InterruptedException {
private void expectRemovalLenth(int expectedLength, TestValidator testee, long timeout) {
long timeoutMilestone = System.currentTimeMillis() + timeout;
while ( true ) {
int queueLength = testee.getRemovalQueueLength();
@ -381,7 +480,11 @@ private void expectRemovalLenth(int expectedLength, TestValidator testee, long t
if ( System.currentTimeMillis() > timeoutMilestone ) {
fail( "condition not reached after " + timeout + " milliseconds. giving up!" );
}
Thread.sleep(20);
try {
Thread.sleep(20);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
}
@ -397,76 +500,83 @@ public void testInvalidateRegionBlocksForInProgressPut() throws Exception {
}
private void invalidationBlocksForInProgressPutTest(final boolean keyOnly) throws Exception {
final PutFromLoadValidator testee = new PutFromLoadValidator(
new DefaultCacheManager(), null,
PutFromLoadValidator.NAKED_PUT_INVALIDATION_PERIOD);
final CountDownLatch removeLatch = new CountDownLatch(1);
final CountDownLatch pferLatch = new CountDownLatch(1);
final AtomicReference<Object> cache = new AtomicReference<Object>("INITIAL");
withCacheManager(new CacheManagerCallable(
TestCacheManagerFactory.createLocalCacheManager(false)) {
@Override
public void call() {
final PutFromLoadValidator testee = new PutFromLoadValidator(
cm, null, PutFromLoadValidator.NAKED_PUT_INVALIDATION_PERIOD);
final CountDownLatch removeLatch = new CountDownLatch(1);
final CountDownLatch pferLatch = new CountDownLatch(1);
final AtomicReference<Object> cache = new AtomicReference<Object>("INITIAL");
Callable<Boolean> pferCallable = new Callable<Boolean>() {
public Boolean call() throws Exception {
testee.registerPendingPut(KEY1);
if (testee.acquirePutFromLoadLock(KEY1)) {
Callable<Boolean> pferCallable = new Callable<Boolean>() {
public Boolean call() throws Exception {
testee.registerPendingPut(KEY1);
if (testee.acquirePutFromLoadLock(KEY1)) {
try {
removeLatch.countDown();
pferLatch.await();
cache.set("PFER");
return Boolean.TRUE;
}
finally {
testee.releasePutFromLoadLock(KEY1);
}
}
return Boolean.FALSE;
}
};
Callable<Void> invalidateCallable = new Callable<Void>() {
public Void call() throws Exception {
removeLatch.await();
if (keyOnly) {
testee.invalidateKey(KEY1);
} else {
testee.invalidateRegion();
}
cache.set(null);
return null;
}
};
ExecutorService executorService = Executors.newCachedThreadPool();
Future<Boolean> pferFuture = executorService.submit(pferCallable);
Future<Void> invalidateFuture = executorService.submit(invalidateCallable);
try {
try {
removeLatch.countDown();
pferLatch.await();
cache.set("PFER");
return Boolean.TRUE;
}
finally {
testee.releasePutFromLoadLock(KEY1);
invalidateFuture.get(1, TimeUnit.SECONDS);
fail("invalidateFuture did not block");
}
catch (TimeoutException good) {}
pferLatch.countDown();
assertTrue(pferFuture.get(5, TimeUnit.SECONDS));
invalidateFuture.get(5, TimeUnit.SECONDS);
assertNull(cache.get());
} catch (Exception e) {
throw new RuntimeException(e);
}
return Boolean.FALSE;
}
};
Callable<Void> invalidateCallable = new Callable<Void>() {
public Void call() throws Exception {
removeLatch.await();
if (keyOnly) {
testee.invalidateKey(KEY1);
} else {
testee.invalidateRegion();
}
cache.set(null);
return null;
}
};
ExecutorService executorService = Executors.newCachedThreadPool();
Future<Boolean> pferFuture = executorService.submit(pferCallable);
Future<Void> invalidateFuture = executorService.submit(invalidateCallable);
try {
invalidateFuture.get(1, TimeUnit.SECONDS);
fail("invalidateFuture did not block");
}
catch (TimeoutException good) {}
pferLatch.countDown();
assertTrue(pferFuture.get(5, TimeUnit.SECONDS));
invalidateFuture.get(5, TimeUnit.SECONDS);
assertNull(cache.get());
});
}
private static class TestValidator extends PutFromLoadValidator {
protected TestValidator(TransactionManager transactionManager,
long nakedPutInvalidationPeriod) {
super(new DefaultCacheManager(),
transactionManager, nakedPutInvalidationPeriod);
protected TestValidator(EmbeddedCacheManager cm,
TransactionManager transactionManager,
long nakedPutInvalidationPeriod) {
super(cm, transactionManager, nakedPutInvalidationPeriod);
}
@Override
public int getRemovalQueueLength() {
// TODO Auto-generated method stub
return super.getRemovalQueueLength();
}
}
}
}

View File

@ -33,7 +33,8 @@
import junit.framework.AssertionFailedError;
import org.hibernate.cache.infinispan.util.Caches;
import org.infinispan.manager.DefaultCacheManager;
import org.infinispan.test.CacheManagerCallable;
import org.infinispan.test.fwk.TestCacheManagerFactory;
import org.infinispan.transaction.tm.BatchModeTransactionManager;
import org.jboss.logging.Logger;
import org.junit.After;
@ -54,6 +55,7 @@
import org.hibernate.test.cache.infinispan.NodeEnvironment;
import org.hibernate.test.cache.infinispan.util.CacheTestUtil;
import static org.infinispan.test.TestingUtil.withCacheManager;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
@ -162,61 +164,71 @@ public void testPutFromLoadRemoveDoesNotProduceStaleData() throws Exception {
final CountDownLatch pferLatch = new CountDownLatch( 1 );
final CountDownLatch removeLatch = new CountDownLatch( 1 );
final TransactionManager remoteTm = remoteCollectionRegion.getTransactionManager();
PutFromLoadValidator validator = new PutFromLoadValidator(
new DefaultCacheManager(), remoteTm, 20000) {
@Override
public boolean acquirePutFromLoadLock(Object key) {
boolean acquired = super.acquirePutFromLoadLock( key );
try {
removeLatch.countDown();
pferLatch.await( 2, TimeUnit.SECONDS );
}
catch (InterruptedException e) {
log.debug( "Interrupted" );
Thread.currentThread().interrupt();
}
catch (Exception e) {
log.error( "Error", e );
throw new RuntimeException( "Error", e );
}
return acquired;
}
};
final TransactionalAccessDelegate delegate =
new TransactionalAccessDelegate(localCollectionRegion, validator);
final TransactionManager localTm = localCollectionRegion.getTransactionManager();
Callable<Void> pferCallable = new Callable<Void>() {
public Void call() throws Exception {
delegate.putFromLoad( "k1", "v1", 0, null );
return null;
}
};
Callable<Void> removeCallable = new Callable<Void>() {
public Void call() throws Exception {
removeLatch.await();
Caches.withinTx(localTm, new Callable<Void>() {
withCacheManager(new CacheManagerCallable(
TestCacheManagerFactory.createLocalCacheManager(false)) {
@Override
public void call() {
PutFromLoadValidator validator = new PutFromLoadValidator(
cm, remoteTm, 20000) {
@Override
public boolean acquirePutFromLoadLock(Object key) {
boolean acquired = super.acquirePutFromLoadLock(key);
try {
removeLatch.countDown();
pferLatch.await(2, TimeUnit.SECONDS);
}
catch (InterruptedException e) {
log.debug("Interrupted");
Thread.currentThread().interrupt();
}
catch (Exception e) {
log.error("Error", e);
throw new RuntimeException("Error", e);
}
return acquired;
}
};
final TransactionalAccessDelegate delegate =
new TransactionalAccessDelegate(localCollectionRegion, validator);
final TransactionManager localTm = localCollectionRegion.getTransactionManager();
Callable<Void> pferCallable = new Callable<Void>() {
public Void call() throws Exception {
delegate.remove("k1");
delegate.putFromLoad("k1", "v1", 0, null);
return null;
}
});
pferLatch.countDown();
return null;
}
};
};
ExecutorService executorService = Executors.newCachedThreadPool();
Future<Void> pferFuture = executorService.submit( pferCallable );
Future<Void> removeFuture = executorService.submit( removeCallable );
Callable<Void> removeCallable = new Callable<Void>() {
public Void call() throws Exception {
removeLatch.await();
Caches.withinTx(localTm, new Callable<Void>() {
@Override
public Void call() throws Exception {
delegate.remove("k1");
return null;
}
});
pferLatch.countDown();
return null;
}
};
pferFuture.get();
removeFuture.get();
ExecutorService executorService = Executors.newCachedThreadPool();
Future<Void> pferFuture = executorService.submit(pferCallable);
Future<Void> removeFuture = executorService.submit(removeCallable);
assertFalse(localCollectionRegion.getCache().containsKey("k1"));
try {
pferFuture.get();
removeFuture.get();
} catch (Exception e) {
throw new RuntimeException(e);
}
assertFalse(localCollectionRegion.getCache().containsKey("k1"));
}
});
}
@Test

View File

@ -0,0 +1,92 @@
<!--
~ JBoss, Home of Professional Open Source
~ Copyright 2009 Red Hat Inc. and/or its affiliates and other
~ contributors as indicated by the @author tags. All rights reserved.
~ See the copyright.txt in the distribution for a full listing of
~ individual contributors.
~
~ This is free software; you can redistribute it and/or modify it
~ under the terms of the GNU Lesser General Public License as
~ published by the Free Software Foundation; either version 2.1 of
~ the License, or (at your option) any later version.
~
~ This software is distributed in the hope that it will be useful,
~ but WITHOUT ANY WARRANTY; without even the implied warranty of
~ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
~ Lesser General Public License for more details.
~
~ You should have received a copy of the GNU Lesser General Public
~ License along with this software; if not, write to the Free
~ Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
~ 02110-1301 USA, or see the FSF site: http://www.fsf.org.
-->
<config xmlns="urn:org:jgroups"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="urn:org:jgroups file:schema/JGroups-3.0.xsd">
<TCP bind_port="7800"
loopback="true"
port_range="30"
recv_buf_size="20000000"
send_buf_size="640000"
discard_incompatible_packets="true"
max_bundle_size="64000"
max_bundle_timeout="30"
enable_bundling="true"
use_send_queues="true"
sock_conn_timeout="300"
enable_diagnostics="false"
bundler_type="old"
send_queue_size="0"
thread_pool.enabled="true"
thread_pool.min_threads="1"
thread_pool.max_threads="8"
thread_pool.keep_alive_time="5000"
thread_pool.queue_enabled="false"
thread_pool.queue_max_size="100"
thread_pool.rejection_policy="Run"
oob_thread_pool.enabled="true"
oob_thread_pool.min_threads="1"
oob_thread_pool.max_threads="8"
oob_thread_pool.keep_alive_time="5000"
oob_thread_pool.queue_enabled="false"
oob_thread_pool.queue_max_size="100"
oob_thread_pool.rejection_policy="Run"/>
<org.infinispan.test.fwk.TEST_PING ergonomics="false" testName=""/>
<!-- Ergonomics, new in JGroups 2.11, are disabled by default until JGRP-1253 is resolved -->
<!--<TCPPING timeout="3000"-->
<!--initial_hosts="localhost[7800]"-->
<!--port_range="3"-->
<!--ergonomics="false"-->
<!--/>-->
<!--<MPING bind_addr="127.0.0.1" break_on_coord_rsp="true"-->
<!--mcast_addr="${jgroups.mping.mcast_addr:228.2.4.6}"-->
<!--mcast_port="${jgroups.mping.mcast_port:43366}"-->
<!--ip_ttl="2" num_initial_members="3"/>-->
<MERGE2 max_interval="10000"
min_interval="3000"/>
<FD_SOCK/>
<!--
Note that this is an atypically short timeout and a small number of retries
configured this way to speed up unit testing, since we know all nodes run in the same JVM
and hence failure detections will be very quick.
-->
<FD timeout="3000" max_tries="3"/>
<VERIFY_SUSPECT timeout="1500"/>
<pbcast.NAKACK
use_mcast_xmit="false"
retransmit_timeout="300,600,1200,2400,4800"
discard_delivered_msgs="false"/>
<UNICAST timeout="300,600,1200"/>
<pbcast.STABLE stability_delay="1000" desired_avg_gossip="50000"
max_bytes="400000"/>
<pbcast.GMS print_local_addr="false" join_timeout="7000" view_bundling="true"/>
<FC max_credits="2000000"
min_threshold="0.10"/>
<FRAG2 frag_size="60000"/>
</config>

View File

@ -6,7 +6,7 @@ slf4jVersion = '1.6.1'
junitVersion = '4.10'
h2Version = '1.2.145'
bytemanVersion = '1.5.2'
infinispanVersion = '5.1.6.FINAL'
infinispanVersion = '5.2.0.Beta3'
jnpVersion = '5.0.6.CR1'
libraries = [