HHH-11350 Intermittent failure in testEvictAll/testRemoveAll

This commit is contained in:
Radim Vansa 2016-12-21 16:04:35 +01:00
parent f59807554a
commit 2a4efd46ca
2 changed files with 36 additions and 15 deletions

View File

@ -6,6 +6,8 @@
*/ */
package org.hibernate.cache.infinispan.access; package org.hibernate.cache.infinispan.access;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
@ -146,26 +148,29 @@ class TxPutFromLoadInterceptor extends BaseRpcInterceptor {
return endInvalidationAndInvokeNextInterceptor(ctx, command); return endInvalidationAndInvokeNextInterceptor(ctx, command);
} }
protected Object endInvalidationAndInvokeNextInterceptor(TxInvocationContext ctx, VisitableCommand command) throws Throwable { protected Object endInvalidationAndInvokeNextInterceptor(TxInvocationContext<?> ctx, VisitableCommand command) throws Throwable {
try { try {
if (ctx.isOriginLocal()) { if (ctx.isOriginLocal()) {
// send async Commit // We cannot use directly ctx.getAffectedKeys() and that includes keys from local-only operations.
Set<Object> affectedKeys = ctx.getAffectedKeys(); // During evictAll inside transaction this would cause unnecessary invalidate command
if (!ctx.getModifications().isEmpty()) {
Object[] keys = ctx.getModifications().stream()
.flatMap(mod -> mod.getAffectedKeys().stream()).distinct().toArray();
if (log.isTraceEnabled()) { if (log.isTraceEnabled()) {
log.tracef( "Sending end invalidation for keys %s asynchronously, modifications are %s", affectedKeys, ctx.getCacheTransaction().getModifications()); log.tracef( "Sending end invalidation for keys %s asynchronously, modifications are %s",
Arrays.toString(keys), ctx.getCacheTransaction().getModifications());
} }
if (!affectedKeys.isEmpty()) {
GlobalTransaction globalTransaction = ctx.getGlobalTransaction(); GlobalTransaction globalTransaction = ctx.getGlobalTransaction();
EndInvalidationCommand commitCommand = cacheCommandInitializer.buildEndInvalidationCommand( EndInvalidationCommand commitCommand = cacheCommandInitializer.buildEndInvalidationCommand(
cacheName, affectedKeys.toArray(), globalTransaction); cacheName, keys, globalTransaction);
List<Address> members = stateTransferManager.getCacheTopology().getMembers(); List<Address> members = stateTransferManager.getCacheTopology().getMembers();
rpcManager.invokeRemotely(members, commitCommand, asyncUnordered); rpcManager.invokeRemotely(members, commitCommand, asyncUnordered);
// If the transaction is not successful, *RegionAccessStrategy would not be called, therefore // If the transaction is not successful, *RegionAccessStrategy would not be called, therefore
// we have to end invalidation from here manually (in successful case as well) // we have to end invalidation from here manually (in successful case as well)
for (Object key : affectedKeys) { for (Object key : keys) {
putFromLoadValidator.endInvalidatingKey(globalTransaction, key); putFromLoadValidator.endInvalidatingKey(globalTransaction, key);
} }
} }

View File

@ -45,7 +45,13 @@ import org.hibernate.test.cache.infinispan.util.BatchModeTransactionCoordinator;
import org.hibernate.test.cache.infinispan.util.InfinispanTestingSetup; import org.hibernate.test.cache.infinispan.util.InfinispanTestingSetup;
import org.hibernate.test.cache.infinispan.util.ExpectingInterceptor; import org.hibernate.test.cache.infinispan.util.ExpectingInterceptor;
import org.hibernate.test.cache.infinispan.util.JdbcResourceTransactionMock; import org.hibernate.test.cache.infinispan.util.JdbcResourceTransactionMock;
import org.hibernate.test.cache.infinispan.util.TestInfinispanRegionFactory;
import org.hibernate.test.cache.infinispan.util.TestSynchronization; import org.hibernate.test.cache.infinispan.util.TestSynchronization;
import org.hibernate.test.cache.infinispan.util.TestTimeService;
import org.hibernate.testing.AfterClassOnce;
import org.hibernate.testing.BeforeClassOnce;
import org.infinispan.commands.write.InvalidateCommand;
import org.infinispan.test.fwk.TestResourceTracker;
import org.infinispan.AdvancedCache; import org.infinispan.AdvancedCache;
import org.infinispan.commands.write.InvalidateCommand; import org.infinispan.commands.write.InvalidateCommand;
import org.infinispan.commands.write.PutKeyValueCommand; import org.infinispan.commands.write.PutKeyValueCommand;
@ -86,6 +92,8 @@ public abstract class AbstractRegionAccessStrategyTest<R extends BaseRegion, S e
public static final CacheDataDescription CACHE_DATA_DESCRIPTION public static final CacheDataDescription CACHE_DATA_DESCRIPTION
= new CacheDataDescriptionImpl(true, true, ComparableComparator.INSTANCE, null); = new CacheDataDescriptionImpl(true, true, ComparableComparator.INSTANCE, null);
protected static final TestTimeService TIME_SERVICE = new TestTimeService();
protected NodeEnvironment localEnvironment; protected NodeEnvironment localEnvironment;
protected R localRegion; protected R localRegion;
protected S localAccessStrategy; protected S localAccessStrategy;
@ -135,6 +143,13 @@ public abstract class AbstractRegionAccessStrategyTest<R extends BaseRegion, S e
waitForClusterToForm(localRegion.getCache(), remoteRegion.getCache()); waitForClusterToForm(localRegion.getCache(), remoteRegion.getCache());
} }
@Override
protected StandardServiceRegistryBuilder createStandardServiceRegistryBuilder() {
StandardServiceRegistryBuilder ssrb = super.createStandardServiceRegistryBuilder();
ssrb.applySetting(TestInfinispanRegionFactory.TIME_SERVICE, TIME_SERVICE);
return ssrb;
}
/** /**
* Simulate 2 nodes, both start, tx do a get, experience a cache miss, then * Simulate 2 nodes, both start, tx do a get, experience a cache miss, then
* 'read from db.' First does a putFromLoad, then an update (or removal if it is a collection). * 'read from db.' First does a putFromLoad, then an update (or removal if it is a collection).
@ -272,7 +287,7 @@ public abstract class AbstractRegionAccessStrategyTest<R extends BaseRegion, S e
protected SharedSessionContractImplementor mockedSession() { protected SharedSessionContractImplementor mockedSession() {
SessionMock session = mock(SessionMock.class); SessionMock session = mock(SessionMock.class);
when(session.isClosed()).thenReturn(false); when(session.isClosed()).thenReturn(false);
when(session.getTimestamp()).thenReturn(System.currentTimeMillis()); when(session.getTimestamp()).thenReturn(TIME_SERVICE.wallClockTime());
if (jtaPlatform == BatchModeJtaPlatform.class) { if (jtaPlatform == BatchModeJtaPlatform.class) {
BatchModeTransactionCoordinator txCoord = new BatchModeTransactionCoordinator(); BatchModeTransactionCoordinator txCoord = new BatchModeTransactionCoordinator();
when(session.getTransactionCoordinator()).thenReturn(txCoord); when(session.getTransactionCoordinator()).thenReturn(txCoord);
@ -493,8 +508,11 @@ public abstract class AbstractRegionAccessStrategyTest<R extends BaseRegion, S e
assertEquals(PutFromLoadValidator.class, originalValidator.getClass()); assertEquals(PutFromLoadValidator.class, originalValidator.getClass());
PutFromLoadValidator mockValidator = spy(originalValidator); PutFromLoadValidator mockValidator = spy(originalValidator);
doAnswer(invocation -> { doAnswer(invocation -> {
endInvalidationLatch.countDown(); try {
return invocation.callRealMethod(); return invocation.callRealMethod();
} finally {
endInvalidationLatch.countDown();
}
}).when(mockValidator).endInvalidatingKey(any(), any()); }).when(mockValidator).endInvalidatingKey(any(), any());
PutFromLoadValidator.addToCache(remoteRegion.getCache(), mockValidator); PutFromLoadValidator.addToCache(remoteRegion.getCache(), mockValidator);
cleanup.add(() -> { cleanup.add(() -> {
@ -521,19 +539,17 @@ public abstract class AbstractRegionAccessStrategyTest<R extends BaseRegion, S e
} }
return null; return null;
}); });
// This should re-establish the region root node in the optimistic case
SharedSessionContractImplementor s7 = mockedSession(); SharedSessionContractImplementor s7 = mockedSession();
assertNull(localAccessStrategy.get(s7, KEY, s7.getTimestamp())); assertNull(localAccessStrategy.get(s7, KEY, s7.getTimestamp()));
assertEquals(0, localRegion.getCache().size()); assertEquals(0, localRegion.getCache().size());
// Re-establishing the region root on the local node doesn't
// propagate it to other nodes. Do a get on the remote node to re-establish
SharedSessionContractImplementor s8 = mockedSession(); SharedSessionContractImplementor s8 = mockedSession();
assertNull(remoteAccessStrategy.get(s8, KEY, s8.getTimestamp())); assertNull(remoteAccessStrategy.get(s8, KEY, s8.getTimestamp()));
assertEquals(0, remoteRegion.getCache().size()); assertEquals(0, remoteRegion.getCache().size());
// Wait for async propagation of EndInvalidationCommand before executing naked put // Wait for async propagation of EndInvalidationCommand before executing naked put
assertTrue(endInvalidationLatch.await(1, TimeUnit.SECONDS)); assertTrue(endInvalidationLatch.await(1, TimeUnit.SECONDS));
TIME_SERVICE.advance(1);
CountDownLatch lastPutFromLoadLatch = expectRemotePutFromLoad(remoteRegion.getCache(), localRegion.getCache()); CountDownLatch lastPutFromLoadLatch = expectRemotePutFromLoad(remoteRegion.getCache(), localRegion.getCache());