HHH-11350 Intermittent failure in EntityCollectionInvalidationTest.testAll
(cherry picked from commit 20daac6ea7
)
This commit is contained in:
parent
8ca291fe20
commit
1ecc926fa1
|
@ -6,27 +6,37 @@
|
|||
*/
|
||||
package org.hibernate.test.cache.infinispan.functional.cluster;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashSet;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.Phaser;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.function.BiPredicate;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import org.hibernate.Session;
|
||||
import org.hibernate.SessionFactory;
|
||||
import org.hibernate.cache.infinispan.InfinispanRegionFactory;
|
||||
import org.hibernate.cache.infinispan.access.PutFromLoadValidator;
|
||||
import org.hibernate.cache.infinispan.util.FutureUpdate;
|
||||
import org.hibernate.cache.infinispan.util.InfinispanMessageLogger;
|
||||
import org.hibernate.cache.spi.access.AccessType;
|
||||
import org.hibernate.test.cache.infinispan.functional.entities.Contact;
|
||||
import org.hibernate.test.cache.infinispan.functional.entities.Customer;
|
||||
import org.hibernate.test.cache.infinispan.util.ExpectingInterceptor;
|
||||
import org.hibernate.test.cache.infinispan.util.TestInfinispanRegionFactory;
|
||||
import org.hibernate.testing.TestForIssue;
|
||||
import org.infinispan.AdvancedCache;
|
||||
import org.infinispan.Cache;
|
||||
import org.infinispan.commands.VisitableCommand;
|
||||
import org.infinispan.commands.read.GetKeyValueCommand;
|
||||
import org.infinispan.commands.write.PutKeyValueCommand;
|
||||
import org.infinispan.commons.util.Util;
|
||||
import org.infinispan.context.InvocationContext;
|
||||
import org.infinispan.interceptors.base.BaseCustomInterceptor;
|
||||
|
@ -40,6 +50,9 @@ import org.junit.Test;
|
|||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.mockito.Matchers.any;
|
||||
import static org.mockito.Mockito.doAnswer;
|
||||
import static org.mockito.Mockito.spy;
|
||||
|
||||
/**
|
||||
* EntityCollectionInvalidationTestCase.
|
||||
|
@ -54,9 +67,9 @@ public class EntityCollectionInvalidationTest extends DualNodeTest {
|
|||
private static final Integer CUSTOMER_ID = new Integer( 1 );
|
||||
|
||||
private EmbeddedCacheManager localManager, remoteManager;
|
||||
private Cache localCustomerCache, remoteCustomerCache;
|
||||
private Cache localContactCache, remoteContactCache;
|
||||
private Cache localCollectionCache, remoteCollectionCache;
|
||||
private AdvancedCache localCustomerCache, remoteCustomerCache;
|
||||
private AdvancedCache localContactCache, remoteContactCache;
|
||||
private AdvancedCache localCollectionCache, remoteCollectionCache;
|
||||
private MyListener localListener, remoteListener;
|
||||
private SessionFactory localFactory, remoteFactory;
|
||||
|
||||
|
@ -72,9 +85,9 @@ public class EntityCollectionInvalidationTest extends DualNodeTest {
|
|||
// Our region factory makes its CacheManager available to us
|
||||
localManager = ClusterAwareRegionFactory.getCacheManager( DualNodeTest.LOCAL );
|
||||
// Cache localCache = localManager.getCache("entity");
|
||||
localCustomerCache = localManager.getCache( Customer.class.getName() );
|
||||
localContactCache = localManager.getCache( Contact.class.getName() );
|
||||
localCollectionCache = localManager.getCache( Customer.class.getName() + ".contacts" );
|
||||
localCustomerCache = localManager.getCache( Customer.class.getName() ).getAdvancedCache();
|
||||
localContactCache = localManager.getCache( Contact.class.getName() ).getAdvancedCache();
|
||||
localCollectionCache = localManager.getCache( Customer.class.getName() + ".contacts" ).getAdvancedCache();
|
||||
localListener = new MyListener( "local" );
|
||||
localCustomerCache.addListener( localListener );
|
||||
localContactCache.addListener( localListener );
|
||||
|
@ -82,9 +95,9 @@ public class EntityCollectionInvalidationTest extends DualNodeTest {
|
|||
|
||||
// Bind a listener to the "remote" cache
|
||||
remoteManager = ClusterAwareRegionFactory.getCacheManager( DualNodeTest.REMOTE );
|
||||
remoteCustomerCache = remoteManager.getCache( Customer.class.getName() );
|
||||
remoteContactCache = remoteManager.getCache( Contact.class.getName() );
|
||||
remoteCollectionCache = remoteManager.getCache( Customer.class.getName() + ".contacts" );
|
||||
remoteCustomerCache = remoteManager.getCache( Customer.class.getName() ).getAdvancedCache();
|
||||
remoteContactCache = remoteManager.getCache( Contact.class.getName() ).getAdvancedCache();
|
||||
remoteCollectionCache = remoteManager.getCache( Customer.class.getName() + ".contacts" ).getAdvancedCache();
|
||||
remoteListener = new MyListener( "remote" );
|
||||
remoteCustomerCache.addListener( remoteListener );
|
||||
remoteContactCache.addListener( remoteListener );
|
||||
|
@ -161,12 +174,24 @@ public class EntityCollectionInvalidationTest extends DualNodeTest {
|
|||
|
||||
// Modify customer in remote
|
||||
remoteListener.clear();
|
||||
|
||||
CountDownLatch modifyLatch = null;
|
||||
if (!cacheMode.isInvalidation() && accessType != AccessType.NONSTRICT_READ_WRITE) {
|
||||
modifyLatch = new CountDownLatch(1);
|
||||
ExpectingInterceptor.get(localCustomerCache).when(this::isFutureUpdate).countDown(modifyLatch);
|
||||
}
|
||||
|
||||
ids = modifyCustomer( ids.customerId, remoteFactory );
|
||||
sleep( 250 );
|
||||
assertLoadedFromCache( remoteListener, ids.customerId, ids.contactIds );
|
||||
|
||||
if (modifyLatch != null) {
|
||||
assertTrue(modifyLatch.await(2, TimeUnit.SECONDS));
|
||||
ExpectingInterceptor.cleanup(localCustomerCache);
|
||||
}
|
||||
|
||||
assertEquals( 0, localCollectionCache.size() );
|
||||
if (localCustomerCache.getCacheConfiguration().clustering().cacheMode().isInvalidation()) {
|
||||
if (cacheMode.isInvalidation()) {
|
||||
// After modification, local cache should have been invalidated and hence should be empty
|
||||
assertEquals(0, localCustomerCache.size());
|
||||
} else {
|
||||
|
@ -277,8 +302,32 @@ public class EntityCollectionInvalidationTest extends DualNodeTest {
|
|||
|
||||
customer.setContacts(contacts);
|
||||
|
||||
ArrayList<Runnable> cleanup = new ArrayList<>();
|
||||
CountDownLatch customerLatch = new CountDownLatch(1);
|
||||
CountDownLatch collectionLatch = new CountDownLatch(1);
|
||||
CountDownLatch contactsLatch = new CountDownLatch(2);
|
||||
|
||||
if (cacheMode.isInvalidation()) {
|
||||
cleanup.add(mockValidator(remoteCustomerCache, customerLatch));
|
||||
cleanup.add(mockValidator(remoteCollectionCache, collectionLatch));
|
||||
cleanup.add(mockValidator(remoteContactCache, contactsLatch));
|
||||
} else if (accessType == AccessType.NONSTRICT_READ_WRITE) {
|
||||
// ATM nonstrict mode has sync after-invalidation update
|
||||
Stream.of(customerLatch, collectionLatch, contactsLatch, contactsLatch).forEach(l -> l.countDown());
|
||||
} else {
|
||||
ExpectingInterceptor.get(remoteCustomerCache).when(this::isFutureUpdate).countDown(collectionLatch);
|
||||
ExpectingInterceptor.get(remoteCollectionCache).when(this::isFutureUpdate).countDown(customerLatch);
|
||||
ExpectingInterceptor.get(remoteContactCache).when(this::isFutureUpdate).countDown(contactsLatch);
|
||||
cleanup.add(() -> ExpectingInterceptor.cleanup(remoteCustomerCache, remoteCollectionCache, remoteContactCache));
|
||||
}
|
||||
|
||||
withTxSession(sessionFactory, session -> session.save(customer));
|
||||
|
||||
assertTrue(customerLatch.await(2, TimeUnit.SECONDS));
|
||||
assertTrue(collectionLatch.await(2, TimeUnit.SECONDS));
|
||||
assertTrue(contactsLatch.await(2, TimeUnit.SECONDS));
|
||||
cleanup.forEach(Runnable::run);
|
||||
|
||||
IdContainer ids = new IdContainer();
|
||||
ids.customerId = customer.getId();
|
||||
Set contactIds = new HashSet();
|
||||
|
@ -290,6 +339,27 @@ public class EntityCollectionInvalidationTest extends DualNodeTest {
|
|||
return ids;
|
||||
}
|
||||
|
||||
private boolean isFutureUpdate(InvocationContext ctx, VisitableCommand cmd) {
|
||||
return cmd instanceof PutKeyValueCommand && ((PutKeyValueCommand) cmd).getValue() instanceof FutureUpdate;
|
||||
}
|
||||
|
||||
private Runnable mockValidator(AdvancedCache cache, CountDownLatch latch) {
|
||||
PutFromLoadValidator originalValidator = PutFromLoadValidator.removeFromCache(cache);
|
||||
PutFromLoadValidator mockValidator = spy(originalValidator);
|
||||
doAnswer(invocation -> {
|
||||
try {
|
||||
return invocation.callRealMethod();
|
||||
} finally {
|
||||
latch.countDown();
|
||||
}
|
||||
}).when(mockValidator).endInvalidatingKey(any(), any());
|
||||
PutFromLoadValidator.addToCache(cache, mockValidator);
|
||||
return () -> {
|
||||
PutFromLoadValidator.removeFromCache(cache);
|
||||
PutFromLoadValidator.addToCache(cache, originalValidator);
|
||||
};
|
||||
}
|
||||
|
||||
private Customer getCustomer(Integer id, SessionFactory sessionFactory) throws Exception {
|
||||
log.debug( "Find customer with id=" + id );
|
||||
return withTxSessionApply(sessionFactory, session -> doGetCustomer(id, session));
|
||||
|
|
Loading…
Reference in New Issue