HHH-11339 Use UnorderedDistributionInterceptor for async replication

This commit is contained in:
Radim Vansa 2016-12-16 11:36:34 +01:00
parent 160f5ba863
commit f59807554a
9 changed files with 255 additions and 100 deletions

View File

@ -27,14 +27,13 @@ public class FutureUpdateSynchronization extends InvocationAfterCompletion {
private final Object value;
private final BaseTransactionalDataRegion region;
private final long sessionTimestamp;
private final AdvancedCache localCache;
private final AdvancedCache asyncCache;
private final AdvancedCache cache;
public FutureUpdateSynchronization(TransactionCoordinator tc, AdvancedCache cache, boolean requiresTransaction,
Object key, Object value, BaseTransactionalDataRegion region, long sessionTimestamp) {
public FutureUpdateSynchronization(TransactionCoordinator tc, AdvancedCache localCache, AdvancedCache asyncCache,
boolean requiresTransaction, Object key, Object value, BaseTransactionalDataRegion region, long sessionTimestamp) {
super(tc, requiresTransaction);
this.localCache = localCache;
this.asyncCache = asyncCache;
this.cache = cache;
this.key = key;
this.value = value;
this.region = region;
@ -57,13 +56,7 @@ public class FutureUpdateSynchronization extends InvocationAfterCompletion {
FutureUpdate futureUpdate = new FutureUpdate(uuid, region.nextTimestamp(), success ? this.value : null);
for (;;) {
try {
// Similar to putFromLoad, we have to update this node synchronously because after transaction
// is committed it is expected that we'll retrieve cached instance until next invalidation,
// but the replication this node -> primary -> this node can take a while
// We need to first execute the async update and then local one, because if we're on the primary
// owner the local future update, would fail the async one.
asyncCache.put(key, futureUpdate);
localCache.put(key, futureUpdate);
cache.put(key, futureUpdate);
return;
}
catch (Exception e) {

View File

@ -7,10 +7,11 @@
package org.hibernate.cache.infinispan.access;
import org.infinispan.commands.write.DataWriteCommand;
import org.infinispan.context.Flag;
import org.infinispan.context.InvocationContext;
import org.infinispan.interceptors.locking.NonTransactionalLockingInterceptor;
import org.infinispan.util.concurrent.locks.LockUtil;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
/**
* With regular {@link org.infinispan.interceptors.locking.NonTransactionalLockingInterceptor},
@ -21,39 +22,33 @@ import org.infinispan.util.concurrent.locks.LockUtil;
* Similar issue threatens consistency when the command has {@link org.infinispan.context.Flag#CACHE_MODE_LOCAL}
* - these commands don't acquire locks either.
*
* Therefore, this interceptor locks the entry in all situations but when it is sending message to primary owner
* (locking then could lead to deadlocks).
* Therefore, this interceptor locks the entry all the time. {@link UnorderedDistributionInterceptor} does not forward
* the message from non-origin to any other node, and the distribution interceptor won't block on RPC but will return
* {@link CompletableFuture} and we'll wait for it here.
*/
public class LockingInterceptor extends NonTransactionalLockingInterceptor {
@Override
protected Object visitDataWriteCommand(InvocationContext ctx, DataWriteCommand command) throws Throwable {
Object returnValue = null;
try {
// Clear any metadata; we'll set them as appropriate in TombstoneCallInterceptor
command.setMetadata(null);
boolean shouldLock;
if (hasSkipLocking(command)) {
shouldLock = false;
}
else if (command.hasFlag(Flag.CACHE_MODE_LOCAL)) {
shouldLock = true;
}
else if (!ctx.isOriginLocal()) {
shouldLock = true;
}
else if (LockUtil.getLockOwnership(command.getKey(), cdl) == LockUtil.LockOwnership.PRIMARY) {
shouldLock = true;
}
else {
shouldLock = false;
}
if (shouldLock) {
lockAndRecord(ctx, command.getKey(), getLockTimeoutMillis(command));
}
return invokeNextInterceptor(ctx, command);
lockAndRecord(ctx, command.getKey(), getLockTimeoutMillis(command));
returnValue = invokeNextInterceptor(ctx, command);
return returnValue;
}
finally {
lockManager.unlockAll(ctx);
if (returnValue instanceof CompletableFuture) {
try {
((CompletableFuture) returnValue).join();
}
catch (CompletionException e) {
throw e.getCause();
}
}
}
}
}

View File

@ -37,8 +37,7 @@ public class NonStrictAccessDelegate implements AccessDelegate {
private final BaseTransactionalDataRegion region;
private final AdvancedCache cache;
private final AdvancedCache writeCache;
private final AdvancedCache putFromLoadCacheLocal;
private final AdvancedCache putFromLoadCacheAsync;
private final AdvancedCache putFromLoadCache;
private final Comparator versionComparator;
@ -47,8 +46,7 @@ public class NonStrictAccessDelegate implements AccessDelegate {
this.cache = region.getCache();
this.writeCache = Caches.ignoreReturnValuesCache(cache);
// Note that correct behaviour of local and async writes depends on LockingInterceptor (see there for details)
this.putFromLoadCacheLocal = writeCache.withFlags( Flag.ZERO_LOCK_ACQUISITION_TIMEOUT, Flag.FAIL_SILENTLY, Flag.CACHE_MODE_LOCAL );
this.putFromLoadCacheAsync = writeCache.withFlags( Flag.ZERO_LOCK_ACQUISITION_TIMEOUT, Flag.FAIL_SILENTLY, Flag.FORCE_ASYNCHRONOUS );
this.putFromLoadCache = writeCache.withFlags( Flag.ZERO_LOCK_ACQUISITION_TIMEOUT, Flag.FAIL_SILENTLY, Flag.FORCE_ASYNCHRONOUS );
Configuration configuration = cache.getCacheConfiguration();
if (configuration.clustering().cacheMode().isInvalidation()) {
throw new IllegalArgumentException("Nonstrict-read-write mode cannot use invalidation.");
@ -116,8 +114,7 @@ public class NonStrictAccessDelegate implements AccessDelegate {
}
// Apply the update locally first - if we're the backup owner, async propagation wouldn't change the value
// for the subsequent operation soon enough as it goes through primary owner
putFromLoadCacheAsync.put(key, value);
putFromLoadCacheLocal.put(key, value);
putFromLoadCache.put(key, value);
return true;
}

View File

@ -6,8 +6,6 @@
*/
package org.hibernate.cache.infinispan.access;
import java.util.concurrent.TimeUnit;
import org.hibernate.cache.CacheException;
import org.hibernate.cache.infinispan.impl.BaseTransactionalDataRegion;
import org.hibernate.cache.infinispan.util.Caches;
@ -32,10 +30,8 @@ public class TombstoneAccessDelegate implements AccessDelegate {
protected final BaseTransactionalDataRegion region;
protected final AdvancedCache cache;
protected final AdvancedCache writeCache;
protected final AdvancedCache localWriteCache;
protected final AdvancedCache asyncWriteCache;
protected final AdvancedCache putFromLoadCacheLocal;
protected final AdvancedCache putFromLoadCacheAsync;
protected final AdvancedCache putFromLoadCache;
protected final boolean requiresTransaction;
public TombstoneAccessDelegate(BaseTransactionalDataRegion region) {
@ -43,10 +39,8 @@ public class TombstoneAccessDelegate implements AccessDelegate {
this.cache = region.getCache();
this.writeCache = Caches.ignoreReturnValuesCache(cache);
// Note that correct behaviour of local and async writes depends on LockingInterceptor (see there for details)
this.localWriteCache = Caches.localCache(writeCache);
this.asyncWriteCache = Caches.asyncWriteCache(writeCache, Flag.IGNORE_RETURN_VALUES);
this.putFromLoadCacheLocal = localWriteCache.withFlags( Flag.ZERO_LOCK_ACQUISITION_TIMEOUT, Flag.FAIL_SILENTLY );
this.putFromLoadCacheAsync = asyncWriteCache.withFlags( Flag.ZERO_LOCK_ACQUISITION_TIMEOUT, Flag.FAIL_SILENTLY );
this.asyncWriteCache = writeCache.withFlags(Flag.FORCE_ASYNCHRONOUS);
this.putFromLoadCache = asyncWriteCache.withFlags(Flag.ZERO_LOCK_ACQUISITION_TIMEOUT, Flag.FAIL_SILENTLY);
Configuration configuration = cache.getCacheConfiguration();
if (configuration.clustering().cacheMode().isInvalidation()) {
throw new IllegalArgumentException("For tombstone-based caching, invalidation cache is not allowed.");
@ -104,11 +98,7 @@ public class TombstoneAccessDelegate implements AccessDelegate {
}
// we can't use putForExternalRead since the PFER flag means that entry is not wrapped into context
// when it is present in the container. TombstoneCallInterceptor will deal with this.
TombstoneUpdate update = new TombstoneUpdate(session.getTimestamp(), value);
// If we're the backup owner, async propagation wouldn't change the value soon enough as it goes
// through primary owner - therefore we'll synchronously update it locally.
putFromLoadCacheAsync.put(key, update);
putFromLoadCacheLocal.put(key, update);
putFromLoadCache.put(key, new TombstoneUpdate(session.getTimestamp(), value));
return true;
}
@ -131,7 +121,7 @@ public class TombstoneAccessDelegate implements AccessDelegate {
protected void write(SharedSessionContractImplementor session, Object key, Object value) {
TransactionCoordinator tc = session.getTransactionCoordinator();
FutureUpdateSynchronization sync = new FutureUpdateSynchronization(tc, localWriteCache, asyncWriteCache, requiresTransaction, key, value, region, session.getTimestamp());
FutureUpdateSynchronization sync = new FutureUpdateSynchronization(tc, asyncWriteCache, requiresTransaction, key, value, region, session.getTimestamp());
// The update will be invalidating all putFromLoads for the duration of expiration or until removed by the synchronization
Tombstone tombstone = new Tombstone(sync.getUuid(), region.nextTimestamp() + region.getTombstoneExpiration());
// The outcome of this operation is actually defined in TombstoneCallInterceptor

View File

@ -0,0 +1,90 @@
/*
* Hibernate, Relational Persistence for Idiomatic Java
*
* License: GNU Lesser General Public License (LGPL), version 2.1 or later.
* See the lgpl.txt file in the root directory or <http://www.gnu.org/licenses/lgpl-2.1.html>.
*/
package org.hibernate.cache.infinispan.access;
import org.infinispan.commands.write.PutKeyValueCommand;
import org.infinispan.context.Flag;
import org.infinispan.context.InvocationContext;
import org.infinispan.distribution.DistributionManager;
import org.infinispan.distribution.ch.ConsistentHash;
import org.infinispan.factories.annotations.Inject;
import org.infinispan.factories.annotations.Start;
import org.infinispan.interceptors.distribution.NonTxDistributionInterceptor;
import org.infinispan.remoting.inboundhandler.DeliverOrder;
import org.infinispan.remoting.rpc.ResponseMode;
import org.infinispan.remoting.rpc.RpcOptions;
import org.infinispan.remoting.transport.Address;
import org.infinispan.statetransfer.OutdatedTopologyException;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;
import java.util.List;
/**
* Since the data handled in {@link TombstoneCallInterceptor} or {@link VersionedCallInterceptor}
* does not rely on the order how these are applied (the updates are commutative), this interceptor
* simply sends any command to all other owners without ordering them through primary owner.
* Note that {@link LockingInterceptor} is required in the stack as locking on backup is not guaranteed
* by primary owner.
*/
public class UnorderedDistributionInterceptor extends NonTxDistributionInterceptor {
private static Log log = LogFactory.getLog(UnorderedDistributionInterceptor.class);
private static final boolean trace = log.isTraceEnabled();
private DistributionManager distributionManager;
private RpcOptions syncRpcOptions, asyncRpcOptions;
@Inject
public void inject(DistributionManager distributionManager) {
this.distributionManager = distributionManager;
}
@Start
public void start() {
syncRpcOptions = rpcManager.getRpcOptionsBuilder(ResponseMode.SYNCHRONOUS_IGNORE_LEAVERS, DeliverOrder.NONE).build();
// We don't have to guarantee ordering even for asynchronous messages
asyncRpcOptions = rpcManager.getRpcOptionsBuilder(ResponseMode.ASYNCHRONOUS, DeliverOrder.NONE).build();
}
@Override
public Object visitPutKeyValueCommand(InvocationContext ctx, PutKeyValueCommand command) throws Throwable {
if (command.hasFlag(Flag.CACHE_MODE_LOCAL)) {
// for state-transfer related writes
return invokeNextInterceptor(ctx, command);
}
int commandTopologyId = command.getTopologyId();
int currentTopologyId = stateTransferManager.getCacheTopology().getTopologyId();
if (commandTopologyId != -1 && currentTopologyId != commandTopologyId) {
throw new OutdatedTopologyException("Cache topology changed while the command was executing: expected " +
commandTopologyId + ", got " + currentTopologyId);
}
ConsistentHash writeCH = distributionManager.getWriteConsistentHash();
List<Address> owners = null;
if (writeCH.isReplicated()) {
// local result is always ignored
invokeNextInterceptor(ctx, command);
}
else {
owners = writeCH.locateOwners(command.getKey());
if (owners.contains(rpcManager.getAddress())) {
invokeNextInterceptor(ctx, command);
}
else {
log.tracef("Not invoking %s on %s since it is not an owner", command, rpcManager.getAddress());
}
}
if (ctx.isOriginLocal() && command.isSuccessful()) {
// This is called with the entry locked. In order to avoid deadlocks we must not wait for RPC while
// holding the lock, therefore we'll return a future and wait for it in LockingInterceptor after
// unlocking (and committing) the entry.
return rpcManager.invokeRemotelyAsync(owners, command, isSynchronous(command) ? syncRpcOptions : asyncRpcOptions);
}
return null;
}
}

View File

@ -15,6 +15,7 @@ import org.hibernate.cache.infinispan.access.PutFromLoadValidator;
import org.hibernate.cache.infinispan.access.TombstoneAccessDelegate;
import org.hibernate.cache.infinispan.access.TombstoneCallInterceptor;
import org.hibernate.cache.infinispan.access.TxInvalidationCacheAccessDelegate;
import org.hibernate.cache.infinispan.access.UnorderedDistributionInterceptor;
import org.hibernate.cache.infinispan.access.VersionedCallInterceptor;
import org.hibernate.cache.infinispan.util.Caches;
import org.hibernate.cache.infinispan.util.FutureUpdate;
@ -33,11 +34,14 @@ import org.infinispan.configuration.cache.Configuration;
import org.infinispan.container.entries.CacheEntry;
import org.infinispan.filter.KeyValueFilter;
import org.infinispan.interceptors.CallInterceptor;
import org.infinispan.interceptors.EntryWrappingInterceptor;
import org.infinispan.interceptors.base.CommandInterceptor;
import org.infinispan.interceptors.distribution.NonTxDistributionInterceptor;
import org.infinispan.interceptors.locking.NonTransactionalLockingInterceptor;
import javax.transaction.TransactionManager;
import java.lang.reflect.Field;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@ -151,7 +155,7 @@ public abstract class BaseTransactionalDataRegion
return;
}
replaceLockingInterceptor();
replaceCommonInterceptors();
cache.removeInterceptor(CallInterceptor.class);
VersionedCallInterceptor tombstoneCallInterceptor = new VersionedCallInterceptor(this, metadata.getVersionComparator());
@ -172,7 +176,7 @@ public abstract class BaseTransactionalDataRegion
log.evictionWithTombstones();
}
replaceLockingInterceptor();
replaceCommonInterceptors();
cache.removeInterceptor(CallInterceptor.class);
TombstoneCallInterceptor tombstoneCallInterceptor = new TombstoneCallInterceptor(this);
@ -183,13 +187,35 @@ public abstract class BaseTransactionalDataRegion
strategy = Strategy.TOMBSTONES;
}
private void replaceLockingInterceptor() {
private void replaceCommonInterceptors() {
CacheMode cacheMode = cache.getCacheConfiguration().clustering().cacheMode();
if (!cacheMode.isReplicated() && !cacheMode.isDistributed()) {
return;
}
LockingInterceptor lockingInterceptor = new LockingInterceptor();
cache.getComponentRegistry().registerComponent(lockingInterceptor, LockingInterceptor.class);
if (!cache.addInterceptorBefore(lockingInterceptor, NonTransactionalLockingInterceptor.class)) {
throw new IllegalStateException("Misconfigured cache, interceptor chain is " + cache.getInterceptorChain());
}
cache.removeInterceptor(NonTransactionalLockingInterceptor.class);
UnorderedDistributionInterceptor distributionInterceptor = new UnorderedDistributionInterceptor();
cache.getComponentRegistry().registerComponent(distributionInterceptor, UnorderedDistributionInterceptor.class);
if (!cache.addInterceptorBefore(distributionInterceptor, NonTxDistributionInterceptor.class)) {
throw new IllegalStateException("Misconfigured cache, interceptor chain is " + cache.getInterceptorChain());
}
cache.removeInterceptor(NonTxDistributionInterceptor.class);
EntryWrappingInterceptor ewi = cache.getComponentRegistry().getComponent(EntryWrappingInterceptor.class);
try {
Field isUsingLockDelegation = EntryWrappingInterceptor.class.getDeclaredField("isUsingLockDelegation");
isUsingLockDelegation.setAccessible(true);
isUsingLockDelegation.set(ewi, false);
}
catch (NoSuchFieldException | IllegalAccessException e) {
throw new IllegalStateException(e);
}
}
public long getTombstoneExpiration() {

View File

@ -247,10 +247,10 @@ public abstract class AbstractRegionAccessStrategyTest<R extends BaseRegion, S e
protected CountDownLatch setupExpectPutWithValue(Predicate<Object> valuePredicate) {
if (!isUsingInvalidation() && accessType != AccessType.NONSTRICT_READ_WRITE) {
CountDownLatch latch = new CountDownLatch(1);
remoteRegion.getCache().addInterceptor(new ExpectingInterceptor(latch,
cmd -> cmd instanceof PutKeyValueCommand && valuePredicate.test(((PutKeyValueCommand) cmd).getValue()),
null), 0);
cleanup.add(() -> remoteRegion.getCache().removeInterceptor(ExpectingInterceptor.class));
ExpectingInterceptor.get(remoteRegion.getCache())
.when((ctx, cmd) -> cmd instanceof PutKeyValueCommand && valuePredicate.test(((PutKeyValueCommand) cmd).getValue()))
.countDown(latch);
cleanup.add(() -> ExpectingInterceptor.cleanup(remoteRegion.getCache()));
return latch;
} else {
return new CountDownLatch(0);
@ -373,8 +373,8 @@ public abstract class AbstractRegionAccessStrategyTest<R extends BaseRegion, S e
assertEquals(0, localRegion.getCache().size());
assertEquals(0, remoteRegion.getCache().size());
CountDownLatch localPutFromLoadLatch = expectRemotePutFromLoad(localRegion.getCache());
CountDownLatch remotePutFromLoadLatch = expectRemotePutFromLoad(remoteRegion.getCache());
CountDownLatch localPutFromLoadLatch = expectRemotePutFromLoad(remoteRegion.getCache(), localRegion.getCache());
CountDownLatch remotePutFromLoadLatch = expectRemotePutFromLoad(localRegion.getCache(), remoteRegion.getCache());
SharedSessionContractImplementor s1 = mockedSession();
assertNull("local is clean", localAccessStrategy.get(s1, KEY, s1.getTimestamp()));
@ -391,8 +391,6 @@ public abstract class AbstractRegionAccessStrategyTest<R extends BaseRegion, S e
// before the update is fully applied.
assertTrue(localPutFromLoadLatch.await(1, TimeUnit.SECONDS));
assertTrue(remotePutFromLoadLatch.await(1, TimeUnit.SECONDS));
localRegion.getCache().removeInterceptor(ExpectingInterceptor.class);
remoteRegion.getCache().removeInterceptor(ExpectingInterceptor.class);
SharedSessionContractImplementor s4 = mockedSession();
assertEquals(VALUE1, localAccessStrategy.get(s4, KEY, s4.getTimestamp()));
@ -465,8 +463,8 @@ public abstract class AbstractRegionAccessStrategyTest<R extends BaseRegion, S e
SharedSessionContractImplementor s2 = mockedSession();
assertNull("remote is clean", remoteAccessStrategy.get(s2, KEY, s2.getTimestamp()));
CountDownLatch localPutFromLoadLatch = expectRemotePutFromLoad(localRegion.getCache());
CountDownLatch remotePutFromLoadLatch = expectRemotePutFromLoad(remoteRegion.getCache());
CountDownLatch localPutFromLoadLatch = expectRemotePutFromLoad(remoteRegion.getCache(), localRegion.getCache());
CountDownLatch remotePutFromLoadLatch = expectRemotePutFromLoad(localRegion.getCache(), remoteRegion.getCache());
SharedSessionContractImplementor s3 = mockedSession();
localAccessStrategy.putFromLoad(s3, KEY, VALUE1, s3.getTimestamp(), 1);
@ -478,8 +476,6 @@ public abstract class AbstractRegionAccessStrategyTest<R extends BaseRegion, S e
// before the update is fully applied.
assertTrue(localPutFromLoadLatch.await(1, TimeUnit.SECONDS));
assertTrue(remotePutFromLoadLatch.await(1, TimeUnit.SECONDS));
localRegion.getCache().removeInterceptor(ExpectingInterceptor.class);
remoteRegion.getCache().removeInterceptor(ExpectingInterceptor.class);
SharedSessionContractImplementor s4 = mockedSession();
SharedSessionContractImplementor s6 = mockedSession();
@ -506,8 +502,10 @@ public abstract class AbstractRegionAccessStrategyTest<R extends BaseRegion, S e
PutFromLoadValidator.addToCache(remoteRegion.getCache(), originalValidator);
});
} else {
ExpectingInterceptor ei = new ExpectingInterceptor(endInvalidationLatch, InvalidateCommand.class, null);
remoteRegion.getCache().addInterceptor(ei, 0);
ExpectingInterceptor.get(remoteRegion.getCache())
.when((ctx, cmd) -> cmd instanceof InvalidateCommand)
.countDown(endInvalidationLatch);
cleanup.add(() -> ExpectingInterceptor.cleanup(remoteRegion.getCache()));
}
} else {
endInvalidationLatch = new CountDownLatch(0);
@ -536,9 +534,8 @@ public abstract class AbstractRegionAccessStrategyTest<R extends BaseRegion, S e
// Wait for async propagation of EndInvalidationCommand before executing naked put
assertTrue(endInvalidationLatch.await(1, TimeUnit.SECONDS));
remoteRegion.getCache().removeInterceptor(ExpectingInterceptor.class);
CountDownLatch lastPutFromLoadLatch = expectRemotePutFromLoad(localRegion.getCache());
CountDownLatch lastPutFromLoadLatch = expectRemotePutFromLoad(remoteRegion.getCache(), localRegion.getCache());
// Test whether the get above messes up the optimistic version
SharedSessionContractImplementor s9 = mockedSession();
@ -548,7 +545,6 @@ public abstract class AbstractRegionAccessStrategyTest<R extends BaseRegion, S e
assertEquals(1, remoteRegion.getCache().size());
assertTrue(lastPutFromLoadLatch.await(1, TimeUnit.SECONDS));
localRegion.getCache().removeInterceptor(ExpectingInterceptor.class);
SharedSessionContractImplementor s11 = mockedSession();
assertEquals((isUsingInvalidation() ? null : VALUE1), localAccessStrategy.get(s11, KEY, s11.getTimestamp()));
@ -556,12 +552,25 @@ public abstract class AbstractRegionAccessStrategyTest<R extends BaseRegion, S e
assertEquals(VALUE1, remoteAccessStrategy.get(s12, KEY, s12.getTimestamp()));
}
private CountDownLatch expectRemotePutFromLoad(AdvancedCache cache) {
private CountDownLatch expectRemotePutFromLoad(AdvancedCache localCache, AdvancedCache remoteCache) {
CountDownLatch putFromLoadLatch;
if (!isUsingInvalidation()) {
putFromLoadLatch = new CountDownLatch(1);
cache.addInterceptor(new ExpectingInterceptor(putFromLoadLatch,
(ctx, cmd) -> !ctx.isOriginLocal() && cmd instanceof PutKeyValueCommand, null), 0);
// The command may fail to replicate if it can't acquire lock locally
ExpectingInterceptor.Condition remoteCondition = ExpectingInterceptor.get(remoteCache)
.when((ctx, cmd) -> !ctx.isOriginLocal() && cmd instanceof PutKeyValueCommand);
ExpectingInterceptor.Condition localCondition = ExpectingInterceptor.get(localCache)
.whenFails((ctx, cmd) -> ctx.isOriginLocal() && cmd instanceof PutKeyValueCommand);
remoteCondition.run(() -> {
localCondition.cancel();
putFromLoadLatch.countDown();
});
localCondition.run(() -> {
remoteCondition.cancel();
putFromLoadLatch.countDown();
});
// just for case the test fails and does not remove the interceptor itself
cleanup.add(() -> ExpectingInterceptor.cleanup(localCache, remoteCache));
} else {
putFromLoadLatch = new CountDownLatch(0);
}

View File

@ -181,16 +181,20 @@ public class TombstoneTest extends AbstractNonInvalidationTest {
public void testUpdateEvictExpiration() throws Exception {
CyclicBarrier loadBarrier = new CyclicBarrier(2);
CountDownLatch preEvictLatch = new CountDownLatch(1);
CountDownLatch postEvictLatch = new CountDownLatch(1);
CountDownLatch flushLatch = new CountDownLatch(1);
CountDownLatch commitLatch = new CountDownLatch(1);
Future<Boolean> first = updateFlushWait(itemId, loadBarrier, null, flushLatch, commitLatch);
Future<Boolean> second = evictWait(itemId, loadBarrier, preEvictLatch, null);
Future<Boolean> second = evictWait(itemId, loadBarrier, preEvictLatch, postEvictLatch);
awaitOrThrow(flushLatch);
assertTombstone(1);
preEvictLatch.countDown();
awaitOrThrow(postEvictLatch);
assertTombstone(1);
commitLatch.countDown();
first.get(WAIT_TIMEOUT, TimeUnit.SECONDS);
second.get(WAIT_TIMEOUT, TimeUnit.SECONDS);

View File

@ -1,47 +1,98 @@
package org.hibernate.test.cache.infinispan.util;
import org.infinispan.AdvancedCache;
import org.infinispan.commands.VisitableCommand;
import org.infinispan.context.InvocationContext;
import org.infinispan.interceptors.InvocationContextInterceptor;
import org.infinispan.interceptors.base.BaseCustomInterceptor;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiPredicate;
import java.util.function.Predicate;
public class ExpectingInterceptor extends BaseCustomInterceptor {
private final static Log log = LogFactory.getLog(ExpectingInterceptor.class);
private final CountDownLatch latch;
private final BiPredicate<InvocationContext, VisitableCommand> predicate;
private final AtomicBoolean enabled;
private final List<Condition> conditions = new LinkedList<>();
public ExpectingInterceptor(CountDownLatch latch, Class<? extends VisitableCommand> commandClazz, AtomicBoolean enabled) {
this(latch, cmd -> commandClazz.isInstance(cmd), enabled);
public static ExpectingInterceptor get(AdvancedCache cache) {
Optional<ExpectingInterceptor> self = cache.getInterceptorChain().stream().filter(ExpectingInterceptor.class::isInstance).findFirst();
if (self.isPresent()) {
return self.get();
}
ExpectingInterceptor ei = new ExpectingInterceptor();
// We are adding this after ICI because we want to handle silent failures, too
cache.addInterceptorAfter(ei, InvocationContextInterceptor.class);
return ei;
}
public ExpectingInterceptor(CountDownLatch latch, Predicate<VisitableCommand> predicate, AtomicBoolean enabled) {
this(latch, (ctx, cmd) -> predicate.test(cmd), enabled);
public static void cleanup(AdvancedCache... caches) {
for (AdvancedCache c : caches) c.removeInterceptor(ExpectingInterceptor.class);
}
public ExpectingInterceptor(CountDownLatch latch, BiPredicate<InvocationContext, VisitableCommand> predicate, AtomicBoolean enabled) {
this.latch = latch;
this.predicate = predicate;
this.enabled = enabled;
public synchronized Condition when(BiPredicate<InvocationContext, VisitableCommand> predicate) {
Condition condition = new Condition(predicate, null);
conditions.add(condition);
return condition;
}
public synchronized Condition whenFails(BiPredicate<InvocationContext, VisitableCommand> predicate) {
Condition condition = new Condition(predicate, Boolean.FALSE);
conditions.add(condition);
return condition;
}
@Override
protected Object handleDefault(InvocationContext ctx, VisitableCommand command) throws Throwable {
boolean succeeded = false;
try {
log.tracef("Before command %s", command);
return super.handleDefault(ctx, command);
Object retval = super.handleDefault(ctx, command);
succeeded = true;
return retval;
} finally {
log.tracef("After command %s, enabled? %s", command, enabled);
if ((enabled == null || enabled.get()) && predicate.test(ctx, command)) {
latch.countDown();
log.trace("Decremented the latch");
log.tracef("After command %s", command);
synchronized (this) {
for (Iterator<Condition> iterator = conditions.iterator(); iterator.hasNext(); ) {
Condition condition = iterator.next();
if ((condition.success == null || condition.success == succeeded) && condition.predicate.test(ctx, command)) {
assert condition.action != null;
condition.action.run();
iterator.remove();
}
}
}
}
}
public class Condition {
private final BiPredicate<InvocationContext, VisitableCommand> predicate;
private final Boolean success;
private Runnable action;
public Condition(BiPredicate<InvocationContext, VisitableCommand> predicate, Boolean success) {
this.predicate = predicate;
this.success = success;
}
public void run(Runnable action) {
assert this.action == null;
this.action = action;
}
public void countDown(CountDownLatch latch) {
assert action == null;
action = () -> latch.countDown();
}
public void cancel() {
synchronized (ExpectingInterceptor.class) {
conditions.remove(this);
}
}
}