HHH-11304 Replace PutFromLoadValidator properly in CollectionRegionAccessStrategyTest
This commit is contained in:
parent
294ba74c76
commit
c94df359d6
|
@ -105,7 +105,7 @@ public class PutFromLoadValidator {
|
|||
/**
|
||||
* Injected interceptor
|
||||
*/
|
||||
private final NonTxPutFromLoadInterceptor nonTxPutFromLoadInterceptor;
|
||||
private NonTxPutFromLoadInterceptor nonTxPutFromLoadInterceptor;
|
||||
|
||||
/**
|
||||
* The time of the last call to {@link #endInvalidatingRegion()}. Puts from transactions started after
|
||||
|
@ -161,54 +161,61 @@ public class PutFromLoadValidator {
|
|||
if (!cacheMode.isInvalidation()) {
|
||||
throw new IllegalArgumentException("PutFromLoadValidator in clustered caches requires invalidation mode.");
|
||||
}
|
||||
List<CommandInterceptor> interceptorChain = cache.getInterceptorChain();
|
||||
log.debug("Interceptor chain was: " + interceptorChain);
|
||||
int position = 0;
|
||||
// add interceptor before uses exact match, not instanceof match
|
||||
int invalidationPosition = 0;
|
||||
int entryWrappingPosition = 0;
|
||||
for (CommandInterceptor ci : interceptorChain) {
|
||||
if (ci instanceof InvalidationInterceptor) {
|
||||
invalidationPosition = position;
|
||||
}
|
||||
if (ci instanceof EntryWrappingInterceptor) {
|
||||
entryWrappingPosition = position;
|
||||
}
|
||||
position++;
|
||||
}
|
||||
boolean transactional = cache.getCacheConfiguration().transaction().transactionMode().isTransactional();
|
||||
if (transactional) {
|
||||
cache.removeInterceptor(invalidationPosition);
|
||||
TxInvalidationInterceptor txInvalidationInterceptor = new TxInvalidationInterceptor();
|
||||
cache.getComponentRegistry().registerComponent(txInvalidationInterceptor, TxInvalidationInterceptor.class);
|
||||
cache.addInterceptor(txInvalidationInterceptor, invalidationPosition);
|
||||
|
||||
// Note that invalidation does *NOT* acquire locks; therefore, we have to start invalidating before
|
||||
// wrapping the entry, since if putFromLoad was invoked between wrap and beginInvalidatingKey, the invalidation
|
||||
// would not commit the entry removal (as during wrap the entry was not in cache)
|
||||
TxPutFromLoadInterceptor txPutFromLoadInterceptor = new TxPutFromLoadInterceptor(this, cache.getName());
|
||||
cache.getComponentRegistry().registerComponent(txPutFromLoadInterceptor, TxPutFromLoadInterceptor.class);
|
||||
cache.addInterceptor(txPutFromLoadInterceptor, entryWrappingPosition);
|
||||
}
|
||||
else {
|
||||
cache.removeInterceptor(invalidationPosition);
|
||||
NonTxInvalidationInterceptor nonTxInvalidationInterceptor = new NonTxInvalidationInterceptor(this);
|
||||
cache.getComponentRegistry().registerComponent(nonTxInvalidationInterceptor, NonTxInvalidationInterceptor.class);
|
||||
cache.addInterceptor(nonTxInvalidationInterceptor, invalidationPosition);
|
||||
|
||||
nonTxPutFromLoadInterceptor = new NonTxPutFromLoadInterceptor(this, cache.getName());
|
||||
cache.getComponentRegistry().registerComponent(nonTxPutFromLoadInterceptor, NonTxPutFromLoadInterceptor.class);
|
||||
cache.addInterceptor(nonTxPutFromLoadInterceptor, entryWrappingPosition);
|
||||
}
|
||||
log.debug("New interceptor chain is: " + cache.getInterceptorChain());
|
||||
|
||||
CacheCommandInitializer cacheCommandInitializer = cache.getComponentRegistry().getComponent(CacheCommandInitializer.class);
|
||||
cacheCommandInitializer.addPutFromLoadValidator(cache.getName(), this);
|
||||
addToCache(cache, this);
|
||||
}
|
||||
|
||||
this.cache = cache;
|
||||
this.pendingPuts = cacheManager.getCache(pendingPutsName);
|
||||
this.nonTxPutFromLoadInterceptor = nonTxPutFromLoadInterceptor;
|
||||
}
|
||||
|
||||
/**
|
||||
* Besides the call from constructor, this should be called only from tests when mocking the validator.
|
||||
*/
|
||||
public static void addToCache(AdvancedCache cache, PutFromLoadValidator validator) {
|
||||
List<CommandInterceptor> interceptorChain = cache.getInterceptorChain();
|
||||
log.debug("Interceptor chain was: " + interceptorChain);
|
||||
int position = 0;
|
||||
// add interceptor before uses exact match, not instanceof match
|
||||
int invalidationPosition = 0;
|
||||
int entryWrappingPosition = 0;
|
||||
for (CommandInterceptor ci : interceptorChain) {
|
||||
if (ci instanceof InvalidationInterceptor) {
|
||||
invalidationPosition = position;
|
||||
}
|
||||
if (ci instanceof EntryWrappingInterceptor) {
|
||||
entryWrappingPosition = position;
|
||||
}
|
||||
position++;
|
||||
}
|
||||
boolean transactional = cache.getCacheConfiguration().transaction().transactionMode().isTransactional();
|
||||
if (transactional) {
|
||||
cache.removeInterceptor(invalidationPosition);
|
||||
TxInvalidationInterceptor txInvalidationInterceptor = new TxInvalidationInterceptor();
|
||||
cache.getComponentRegistry().registerComponent(txInvalidationInterceptor, TxInvalidationInterceptor.class);
|
||||
cache.addInterceptor(txInvalidationInterceptor, invalidationPosition);
|
||||
|
||||
// Note that invalidation does *NOT* acquire locks; therefore, we have to start invalidating before
|
||||
// wrapping the entry, since if putFromLoad was invoked between wrap and beginInvalidatingKey, the invalidation
|
||||
// would not commit the entry removal (as during wrap the entry was not in cache)
|
||||
TxPutFromLoadInterceptor txPutFromLoadInterceptor = new TxPutFromLoadInterceptor(validator, cache.getName());
|
||||
cache.getComponentRegistry().registerComponent(txPutFromLoadInterceptor, TxPutFromLoadInterceptor.class);
|
||||
cache.addInterceptor(txPutFromLoadInterceptor, entryWrappingPosition);
|
||||
}
|
||||
else {
|
||||
cache.removeInterceptor(invalidationPosition);
|
||||
NonTxInvalidationInterceptor nonTxInvalidationInterceptor = new NonTxInvalidationInterceptor(validator);
|
||||
cache.getComponentRegistry().registerComponent(nonTxInvalidationInterceptor, NonTxInvalidationInterceptor.class);
|
||||
cache.addInterceptor(nonTxInvalidationInterceptor, invalidationPosition);
|
||||
|
||||
NonTxPutFromLoadInterceptor nonTxPutFromLoadInterceptor = new NonTxPutFromLoadInterceptor(validator, cache.getName());
|
||||
cache.getComponentRegistry().registerComponent(nonTxPutFromLoadInterceptor, NonTxPutFromLoadInterceptor.class);
|
||||
cache.addInterceptor(nonTxPutFromLoadInterceptor, entryWrappingPosition);
|
||||
validator.nonTxPutFromLoadInterceptor = nonTxPutFromLoadInterceptor;
|
||||
}
|
||||
log.debug("New interceptor chain is: " + cache.getInterceptorChain());
|
||||
|
||||
CacheCommandInitializer cacheCommandInitializer = cache.getComponentRegistry().getComponent(CacheCommandInitializer.class);
|
||||
cacheCommandInitializer.addPutFromLoadValidator(cache.getName(), validator);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -217,7 +224,7 @@ public class PutFromLoadValidator {
|
|||
*
|
||||
* @param cache
|
||||
*/
|
||||
public static void removeFromCache(AdvancedCache cache) {
|
||||
public static PutFromLoadValidator removeFromCache(AdvancedCache cache) {
|
||||
cache.removeInterceptor(TxPutFromLoadInterceptor.class);
|
||||
cache.removeInterceptor(NonTxPutFromLoadInterceptor.class);
|
||||
for (Object i : cache.getInterceptorChain()) {
|
||||
|
@ -237,7 +244,7 @@ public class PutFromLoadValidator {
|
|||
}
|
||||
}
|
||||
CacheCommandInitializer cci = cache.getComponentRegistry().getComponent(CacheCommandInitializer.class);
|
||||
cci.removePutFromLoadValidator(cache.getName());
|
||||
return cci.removePutFromLoadValidator(cache.getName());
|
||||
}
|
||||
|
||||
public void setCurrentSession(SharedSessionContractImplementor session) {
|
||||
|
|
|
@ -6,7 +6,6 @@
|
|||
*/
|
||||
package org.hibernate.test.cache.infinispan.collection;
|
||||
|
||||
import java.util.Properties;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
|
@ -14,7 +13,6 @@ import java.util.concurrent.Executors;
|
|||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.hibernate.cache.infinispan.InfinispanRegionFactory;
|
||||
import org.hibernate.cache.infinispan.access.AccessDelegate;
|
||||
import org.hibernate.cache.infinispan.access.NonTxInvalidationCacheAccessDelegate;
|
||||
import org.hibernate.cache.infinispan.access.PutFromLoadValidator;
|
||||
|
@ -25,21 +23,18 @@ import org.hibernate.engine.spi.SharedSessionContractImplementor;
|
|||
|
||||
import org.hibernate.test.cache.infinispan.AbstractRegionAccessStrategyTest;
|
||||
import org.hibernate.test.cache.infinispan.NodeEnvironment;
|
||||
import org.hibernate.test.cache.infinispan.util.CacheTestUtil;
|
||||
import org.hibernate.test.cache.infinispan.util.TestingKeyFactory;
|
||||
import org.junit.Test;
|
||||
import junit.framework.AssertionFailedError;
|
||||
|
||||
import org.infinispan.AdvancedCache;
|
||||
import org.infinispan.manager.EmbeddedCacheManager;
|
||||
import org.infinispan.test.CacheManagerCallable;
|
||||
import org.infinispan.test.fwk.TestCacheManagerFactory;
|
||||
|
||||
import static org.infinispan.test.TestingUtil.withCacheManager;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.mockito.Matchers.any;
|
||||
import static org.mockito.Matchers.anyLong;
|
||||
import static org.mockito.Mockito.doAnswer;
|
||||
import static org.mockito.Mockito.spy;
|
||||
|
||||
/**
|
||||
* Base class for tests of CollectionRegionAccessStrategy impls.
|
||||
|
@ -73,79 +68,18 @@ public class CollectionRegionAccessStrategyTest extends
|
|||
|
||||
@Test
|
||||
public void testPutFromLoadRemoveDoesNotProduceStaleData() throws Exception {
|
||||
if (cacheMode.isInvalidation()) {
|
||||
doPutFromLoadRemoveDoesNotProduceStaleDataInvalidation();
|
||||
if (!cacheMode.isInvalidation()) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
public void doPutFromLoadRemoveDoesNotProduceStaleDataInvalidation() {
|
||||
final CountDownLatch pferLatch = new CountDownLatch( 1 );
|
||||
final CountDownLatch removeLatch = new CountDownLatch( 1 );
|
||||
withCacheManager(new CacheManagerCallable(createCacheManager(localRegion.getRegionFactory())) {
|
||||
@Override
|
||||
public void call() {
|
||||
PutFromLoadValidator validator = getPutFromLoadValidator(remoteRegion.getCache(), cm, removeLatch, pferLatch);
|
||||
|
||||
final AccessDelegate delegate = localRegion.getCache().getCacheConfiguration().transaction().transactionMode().isTransactional() ?
|
||||
new TxInvalidationCacheAccessDelegate(localRegion, validator) :
|
||||
new NonTxInvalidationCacheAccessDelegate(localRegion, validator);
|
||||
|
||||
Callable<Void> pferCallable = new Callable<Void>() {
|
||||
public Void call() throws Exception {
|
||||
SharedSessionContractImplementor session = mockedSession();
|
||||
delegate.putFromLoad(session, "k1", "v1", session.getTimestamp(), null );
|
||||
return null;
|
||||
}
|
||||
};
|
||||
|
||||
Callable<Void> removeCallable = new Callable<Void>() {
|
||||
public Void call() throws Exception {
|
||||
removeLatch.await();
|
||||
SharedSessionContractImplementor session = mockedSession();
|
||||
withTx(localEnvironment, session, new Callable<Void>() {
|
||||
@Override
|
||||
public Void call() throws Exception {
|
||||
delegate.remove(session, "k1");
|
||||
return null;
|
||||
}
|
||||
});
|
||||
pferLatch.countDown();
|
||||
return null;
|
||||
}
|
||||
};
|
||||
|
||||
ExecutorService executorService = Executors.newCachedThreadPool();
|
||||
Future<Void> pferFuture = executorService.submit( pferCallable );
|
||||
Future<Void> removeFuture = executorService.submit( removeCallable );
|
||||
|
||||
try {
|
||||
pferFuture.get();
|
||||
removeFuture.get();
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
|
||||
assertFalse(localRegion.getCache().containsKey("k1"));
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private static EmbeddedCacheManager createCacheManager(InfinispanRegionFactory regionFactory) {
|
||||
EmbeddedCacheManager cacheManager = TestCacheManagerFactory.createCacheManager(false);
|
||||
return cacheManager;
|
||||
}
|
||||
|
||||
protected PutFromLoadValidator getPutFromLoadValidator(AdvancedCache cache, EmbeddedCacheManager cm,
|
||||
CountDownLatch removeLatch, CountDownLatch pferLatch) {
|
||||
// remove the interceptor inserted by default PutFromLoadValidator, we're using different one
|
||||
PutFromLoadValidator.removeFromCache(cache);
|
||||
InfinispanRegionFactory regionFactory = new InfinispanRegionFactory();
|
||||
regionFactory.setCacheManager(cm);
|
||||
regionFactory.start(CacheTestUtil.sfOptionsForStart(), new Properties());
|
||||
return new PutFromLoadValidator(cache, regionFactory, cm) {
|
||||
@Override
|
||||
public Lock acquirePutFromLoadLock(SharedSessionContractImplementor session, Object key, long txTimestamp) {
|
||||
Lock lock = super.acquirePutFromLoadLock(session, key, txTimestamp);
|
||||
PutFromLoadValidator originalValidator = PutFromLoadValidator.removeFromCache(localRegion.getCache());
|
||||
PutFromLoadValidator mockValidator = spy(originalValidator);
|
||||
doAnswer(invocation -> {
|
||||
try {
|
||||
return invocation.callRealMethod();
|
||||
} finally {
|
||||
try {
|
||||
removeLatch.countDown();
|
||||
// the remove should be blocked because the putFromLoad has been acquired
|
||||
|
@ -160,9 +94,44 @@ public class CollectionRegionAccessStrategyTest extends
|
|||
log.error( "Error", e );
|
||||
throw new RuntimeException( "Error", e );
|
||||
}
|
||||
return lock;
|
||||
}
|
||||
};
|
||||
}).when(mockValidator).acquirePutFromLoadLock(any(), any(), anyLong());
|
||||
PutFromLoadValidator.addToCache(localRegion.getCache(), mockValidator);
|
||||
|
||||
try {
|
||||
final AccessDelegate delegate = localRegion.getCache().getCacheConfiguration().transaction().transactionMode().isTransactional() ?
|
||||
new TxInvalidationCacheAccessDelegate(localRegion, mockValidator) :
|
||||
new NonTxInvalidationCacheAccessDelegate(localRegion, mockValidator);
|
||||
|
||||
ExecutorService executorService = Executors.newCachedThreadPool();
|
||||
|
||||
final String KEY = "k1";
|
||||
Future<Void> pferFuture = executorService.submit(() -> {
|
||||
SharedSessionContractImplementor session = mockedSession();
|
||||
delegate.putFromLoad(session, KEY, "v1", session.getTimestamp(), null);
|
||||
return null;
|
||||
});
|
||||
|
||||
Future<Void> removeFuture = executorService.submit(() -> {
|
||||
removeLatch.await();
|
||||
SharedSessionContractImplementor session = mockedSession();
|
||||
withTx(localEnvironment, session, () -> {
|
||||
delegate.remove(session, KEY);
|
||||
return null;
|
||||
});
|
||||
pferLatch.countDown();
|
||||
return null;
|
||||
});
|
||||
|
||||
pferFuture.get();
|
||||
removeFuture.get();
|
||||
|
||||
assertFalse(localRegion.getCache().containsKey(KEY));
|
||||
assertFalse(remoteRegion.getCache().containsKey(KEY));
|
||||
} finally {
|
||||
PutFromLoadValidator.removeFromCache(localRegion.getCache());
|
||||
PutFromLoadValidator.addToCache(localRegion.getCache(), originalValidator);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
Loading…
Reference in New Issue