HHH-11350 Intermittent failure in testEvictAll/testRemoveAll

(cherry picked from commit 2a4efd46ca)
This commit is contained in:
Radim Vansa 2016-12-21 16:04:35 +01:00 committed by Gail Badner
parent f2f4c1a954
commit 8ca291fe20
2 changed files with 35 additions and 19 deletions

View File

@ -6,6 +6,10 @@
*/ */
package org.hibernate.cache.infinispan.access; package org.hibernate.cache.infinispan.access;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import org.hibernate.cache.infinispan.util.CacheCommandInitializer; import org.hibernate.cache.infinispan.util.CacheCommandInitializer;
import org.hibernate.cache.infinispan.util.EndInvalidationCommand; import org.hibernate.cache.infinispan.util.EndInvalidationCommand;
import org.hibernate.cache.infinispan.util.InfinispanMessageLogger; import org.hibernate.cache.infinispan.util.InfinispanMessageLogger;
@ -31,9 +35,6 @@ import org.infinispan.remoting.transport.Address;
import org.infinispan.statetransfer.StateTransferManager; import org.infinispan.statetransfer.StateTransferManager;
import org.infinispan.transaction.xa.GlobalTransaction; import org.infinispan.transaction.xa.GlobalTransaction;
import java.util.Set;
import java.util.List;
/** /**
* Intercepts transactions in Infinispan, calling {@link PutFromLoadValidator#beginInvalidatingKey(Object, Object)} * Intercepts transactions in Infinispan, calling {@link PutFromLoadValidator#beginInvalidatingKey(Object, Object)}
* before locks are acquired (and the entry is invalidated) and sends {@link EndInvalidationCommand} to release * before locks are acquired (and the entry is invalidated) and sends {@link EndInvalidationCommand} to release
@ -145,26 +146,29 @@ class TxPutFromLoadInterceptor extends BaseRpcInterceptor {
return endInvalidationAndInvokeNextInterceptor(ctx, command); return endInvalidationAndInvokeNextInterceptor(ctx, command);
} }
protected Object endInvalidationAndInvokeNextInterceptor(TxInvocationContext ctx, VisitableCommand command) throws Throwable { protected Object endInvalidationAndInvokeNextInterceptor(TxInvocationContext<?> ctx, VisitableCommand command) throws Throwable {
try { try {
if (ctx.isOriginLocal()) { if (ctx.isOriginLocal()) {
// send async Commit // We cannot use directly ctx.getAffectedKeys() and that includes keys from local-only operations.
Set<Object> affectedKeys = ctx.getAffectedKeys(); // During evictAll inside transaction this would cause unnecessary invalidate command
if (!ctx.getModifications().isEmpty()) {
Object[] keys = ctx.getModifications().stream()
.flatMap(mod -> mod.getAffectedKeys().stream()).distinct().toArray();
if (log.isTraceEnabled()) { if (log.isTraceEnabled()) {
log.tracef( "Sending end invalidation for keys %s asynchronously, modifications are %s", affectedKeys, ctx.getCacheTransaction().getModifications()); log.tracef( "Sending end invalidation for keys %s asynchronously, modifications are %s",
} Arrays.toString(keys), ctx.getCacheTransaction().getModifications());
}
if (!affectedKeys.isEmpty()) {
GlobalTransaction globalTransaction = ctx.getGlobalTransaction(); GlobalTransaction globalTransaction = ctx.getGlobalTransaction();
EndInvalidationCommand commitCommand = cacheCommandInitializer.buildEndInvalidationCommand( EndInvalidationCommand commitCommand = cacheCommandInitializer.buildEndInvalidationCommand(
cacheName, affectedKeys.toArray(), globalTransaction); cacheName, keys, globalTransaction);
List<Address> members = stateTransferManager.getCacheTopology().getMembers(); List<Address> members = stateTransferManager.getCacheTopology().getMembers();
rpcManager.invokeRemotely(members, commitCommand, asyncUnordered); rpcManager.invokeRemotely(members, commitCommand, asyncUnordered);
// If the transaction is not successful, *RegionAccessStrategy would not be called, therefore // If the transaction is not successful, *RegionAccessStrategy would not be called, therefore
// we have to end invalidation from here manually (in successful case as well) // we have to end invalidation from here manually (in successful case as well)
for (Object key : affectedKeys) { for (Object key : keys) {
putFromLoadValidator.endInvalidatingKey(globalTransaction, key); putFromLoadValidator.endInvalidatingKey(globalTransaction, key);
} }
} }

View File

@ -45,12 +45,14 @@ import org.hibernate.test.cache.infinispan.util.BatchModeTransactionCoordinator;
import org.hibernate.test.cache.infinispan.util.InfinispanTestingSetup; import org.hibernate.test.cache.infinispan.util.InfinispanTestingSetup;
import org.hibernate.test.cache.infinispan.util.ExpectingInterceptor; import org.hibernate.test.cache.infinispan.util.ExpectingInterceptor;
import org.hibernate.test.cache.infinispan.util.JdbcResourceTransactionMock; import org.hibernate.test.cache.infinispan.util.JdbcResourceTransactionMock;
import org.hibernate.test.cache.infinispan.util.TestInfinispanRegionFactory;
import org.hibernate.test.cache.infinispan.util.TestSynchronization; import org.hibernate.test.cache.infinispan.util.TestSynchronization;
import org.infinispan.Cache; import org.infinispan.Cache;
import org.infinispan.test.TestingUtil; import org.infinispan.test.TestingUtil;
import org.jboss.logging.Logger; import org.jboss.logging.Logger;
import org.infinispan.AdvancedCache; import org.hibernate.test.cache.infinispan.util.TestTimeService;
import org.infinispan.commands.write.InvalidateCommand; import org.infinispan.commands.write.InvalidateCommand;
import org.infinispan.AdvancedCache;
import org.infinispan.commands.write.PutKeyValueCommand; import org.infinispan.commands.write.PutKeyValueCommand;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
@ -83,6 +85,8 @@ public abstract class AbstractRegionAccessStrategyTest<R extends BaseRegion, S e
public static final CacheDataDescription CACHE_DATA_DESCRIPTION public static final CacheDataDescription CACHE_DATA_DESCRIPTION
= new CacheDataDescriptionImpl(true, true, ComparableComparator.INSTANCE, null); = new CacheDataDescriptionImpl(true, true, ComparableComparator.INSTANCE, null);
protected static final TestTimeService TIME_SERVICE = new TestTimeService();
protected NodeEnvironment localEnvironment; protected NodeEnvironment localEnvironment;
protected R localRegion; protected R localRegion;
protected S localAccessStrategy; protected S localAccessStrategy;
@ -132,6 +136,13 @@ public abstract class AbstractRegionAccessStrategyTest<R extends BaseRegion, S e
waitForClusterToForm(localRegion.getCache(), remoteRegion.getCache()); waitForClusterToForm(localRegion.getCache(), remoteRegion.getCache());
} }
@Override
protected StandardServiceRegistryBuilder createStandardServiceRegistryBuilder() {
StandardServiceRegistryBuilder ssrb = super.createStandardServiceRegistryBuilder();
ssrb.applySetting(TestInfinispanRegionFactory.TIME_SERVICE, TIME_SERVICE);
return ssrb;
}
/** /**
* Simulate 2 nodes, both start, tx do a get, experience a cache miss, then * Simulate 2 nodes, both start, tx do a get, experience a cache miss, then
* 'read from db.' First does a putFromLoad, then an update (or removal if it is a collection). * 'read from db.' First does a putFromLoad, then an update (or removal if it is a collection).
@ -269,7 +280,7 @@ public abstract class AbstractRegionAccessStrategyTest<R extends BaseRegion, S e
protected SessionImplementor mockedSession() { protected SessionImplementor mockedSession() {
SessionMock session = mock(SessionMock.class); SessionMock session = mock(SessionMock.class);
when(session.isClosed()).thenReturn(false); when(session.isClosed()).thenReturn(false);
when(session.getTimestamp()).thenReturn(System.currentTimeMillis()); when(session.getTimestamp()).thenReturn(TIME_SERVICE.wallClockTime());
if (jtaPlatform == BatchModeJtaPlatform.class) { if (jtaPlatform == BatchModeJtaPlatform.class) {
BatchModeTransactionCoordinator txCoord = new BatchModeTransactionCoordinator(); BatchModeTransactionCoordinator txCoord = new BatchModeTransactionCoordinator();
when(session.getTransactionCoordinator()).thenReturn(txCoord); when(session.getTransactionCoordinator()).thenReturn(txCoord);
@ -489,8 +500,11 @@ public abstract class AbstractRegionAccessStrategyTest<R extends BaseRegion, S e
assertEquals(PutFromLoadValidator.class, originalValidator.getClass()); assertEquals(PutFromLoadValidator.class, originalValidator.getClass());
PutFromLoadValidator mockValidator = spy(originalValidator); PutFromLoadValidator mockValidator = spy(originalValidator);
doAnswer(invocation -> { doAnswer(invocation -> {
endInvalidationLatch.countDown(); try {
return invocation.callRealMethod(); return invocation.callRealMethod();
} finally {
endInvalidationLatch.countDown();
}
}).when(mockValidator).endInvalidatingKey(any(), any()); }).when(mockValidator).endInvalidatingKey(any(), any());
PutFromLoadValidator.addToCache(remoteRegion.getCache(), mockValidator); PutFromLoadValidator.addToCache(remoteRegion.getCache(), mockValidator);
cleanup.add(() -> { cleanup.add(() -> {
@ -517,19 +531,17 @@ public abstract class AbstractRegionAccessStrategyTest<R extends BaseRegion, S e
} }
return null; return null;
}); });
// This should re-establish the region root node in the optimistic case
SessionImplementor s7 = mockedSession(); SessionImplementor s7 = mockedSession();
assertNull(localAccessStrategy.get(s7, KEY, s7.getTimestamp())); assertNull(localAccessStrategy.get(s7, KEY, s7.getTimestamp()));
assertEquals(0, localRegion.getCache().size()); assertEquals(0, localRegion.getCache().size());
// Re-establishing the region root on the local node doesn't
// propagate it to other nodes. Do a get on the remote node to re-establish
SessionImplementor s8 = mockedSession(); SessionImplementor s8 = mockedSession();
assertNull(remoteAccessStrategy.get(s8, KEY, s8.getTimestamp())); assertNull(remoteAccessStrategy.get(s8, KEY, s8.getTimestamp()));
assertEquals(0, remoteRegion.getCache().size()); assertEquals(0, remoteRegion.getCache().size());
// Wait for async propagation of EndInvalidationCommand before executing naked put // Wait for async propagation of EndInvalidationCommand before executing naked put
assertTrue(endInvalidationLatch.await(1, TimeUnit.SECONDS)); assertTrue(endInvalidationLatch.await(1, TimeUnit.SECONDS));
TIME_SERVICE.advance(1);
CountDownLatch lastPutFromLoadLatch = expectRemotePutFromLoad(remoteRegion.getCache(), localRegion.getCache()); CountDownLatch lastPutFromLoadLatch = expectRemotePutFromLoad(remoteRegion.getCache(), localRegion.getCache());