diff --git a/hibernate-infinispan/src/test/java/org/hibernate/test/cache/infinispan/functional/cluster/EntityCollectionInvalidationTest.java b/hibernate-infinispan/src/test/java/org/hibernate/test/cache/infinispan/functional/cluster/EntityCollectionInvalidationTest.java index 3a947daad7..4515fc5499 100644 --- a/hibernate-infinispan/src/test/java/org/hibernate/test/cache/infinispan/functional/cluster/EntityCollectionInvalidationTest.java +++ b/hibernate-infinispan/src/test/java/org/hibernate/test/cache/infinispan/functional/cluster/EntityCollectionInvalidationTest.java @@ -6,27 +6,37 @@ */ package org.hibernate.test.cache.infinispan.functional.cluster; +import java.util.ArrayList; import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.Phaser; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.BiPredicate; +import java.util.stream.Stream; import org.hibernate.Session; import org.hibernate.SessionFactory; import org.hibernate.cache.infinispan.InfinispanRegionFactory; +import org.hibernate.cache.infinispan.access.PutFromLoadValidator; +import org.hibernate.cache.infinispan.util.FutureUpdate; import org.hibernate.cache.infinispan.util.InfinispanMessageLogger; +import org.hibernate.cache.spi.access.AccessType; import org.hibernate.test.cache.infinispan.functional.entities.Contact; import org.hibernate.test.cache.infinispan.functional.entities.Customer; +import org.hibernate.test.cache.infinispan.util.ExpectingInterceptor; import org.hibernate.test.cache.infinispan.util.TestInfinispanRegionFactory; import org.hibernate.testing.TestForIssue; import org.infinispan.AdvancedCache; import org.infinispan.Cache; +import org.infinispan.commands.VisitableCommand; import org.infinispan.commands.read.GetKeyValueCommand; +import org.infinispan.commands.write.PutKeyValueCommand; import org.infinispan.commons.util.Util; import org.infinispan.context.InvocationContext; import org.infinispan.interceptors.base.BaseCustomInterceptor; @@ -40,6 +50,9 @@ import org.junit.Test; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.spy; /** * EntityCollectionInvalidationTestCase. @@ -54,9 +67,9 @@ public class EntityCollectionInvalidationTest extends DualNodeTest { private static final Integer CUSTOMER_ID = new Integer( 1 ); private EmbeddedCacheManager localManager, remoteManager; - private Cache localCustomerCache, remoteCustomerCache; - private Cache localContactCache, remoteContactCache; - private Cache localCollectionCache, remoteCollectionCache; + private AdvancedCache localCustomerCache, remoteCustomerCache; + private AdvancedCache localContactCache, remoteContactCache; + private AdvancedCache localCollectionCache, remoteCollectionCache; private MyListener localListener, remoteListener; private SessionFactory localFactory, remoteFactory; @@ -72,9 +85,9 @@ public class EntityCollectionInvalidationTest extends DualNodeTest { // Our region factory makes its CacheManager available to us localManager = ClusterAwareRegionFactory.getCacheManager( DualNodeTest.LOCAL ); // Cache localCache = localManager.getCache("entity"); - localCustomerCache = localManager.getCache( Customer.class.getName() ); - localContactCache = localManager.getCache( Contact.class.getName() ); - localCollectionCache = localManager.getCache( Customer.class.getName() + ".contacts" ); + localCustomerCache = localManager.getCache( Customer.class.getName() ).getAdvancedCache(); + localContactCache = localManager.getCache( Contact.class.getName() ).getAdvancedCache(); + localCollectionCache = localManager.getCache( Customer.class.getName() + ".contacts" ).getAdvancedCache(); localListener = new MyListener( "local" ); localCustomerCache.addListener( localListener ); localContactCache.addListener( localListener ); @@ -82,9 +95,9 @@ public class EntityCollectionInvalidationTest extends DualNodeTest { // Bind a listener to the "remote" cache remoteManager = ClusterAwareRegionFactory.getCacheManager( DualNodeTest.REMOTE ); - remoteCustomerCache = remoteManager.getCache( Customer.class.getName() ); - remoteContactCache = remoteManager.getCache( Contact.class.getName() ); - remoteCollectionCache = remoteManager.getCache( Customer.class.getName() + ".contacts" ); + remoteCustomerCache = remoteManager.getCache( Customer.class.getName() ).getAdvancedCache(); + remoteContactCache = remoteManager.getCache( Contact.class.getName() ).getAdvancedCache(); + remoteCollectionCache = remoteManager.getCache( Customer.class.getName() + ".contacts" ).getAdvancedCache(); remoteListener = new MyListener( "remote" ); remoteCustomerCache.addListener( remoteListener ); remoteContactCache.addListener( remoteListener ); @@ -161,12 +174,24 @@ public class EntityCollectionInvalidationTest extends DualNodeTest { // Modify customer in remote remoteListener.clear(); + + CountDownLatch modifyLatch = null; + if (!cacheMode.isInvalidation() && accessType != AccessType.NONSTRICT_READ_WRITE) { + modifyLatch = new CountDownLatch(1); + ExpectingInterceptor.get(localCustomerCache).when(this::isFutureUpdate).countDown(modifyLatch); + } + ids = modifyCustomer( ids.customerId, remoteFactory ); sleep( 250 ); assertLoadedFromCache( remoteListener, ids.customerId, ids.contactIds ); + if (modifyLatch != null) { + assertTrue(modifyLatch.await(2, TimeUnit.SECONDS)); + ExpectingInterceptor.cleanup(localCustomerCache); + } + assertEquals( 0, localCollectionCache.size() ); - if (localCustomerCache.getCacheConfiguration().clustering().cacheMode().isInvalidation()) { + if (cacheMode.isInvalidation()) { // After modification, local cache should have been invalidated and hence should be empty assertEquals(0, localCustomerCache.size()); } else { @@ -277,8 +302,32 @@ public class EntityCollectionInvalidationTest extends DualNodeTest { customer.setContacts(contacts); + ArrayList cleanup = new ArrayList<>(); + CountDownLatch customerLatch = new CountDownLatch(1); + CountDownLatch collectionLatch = new CountDownLatch(1); + CountDownLatch contactsLatch = new CountDownLatch(2); + + if (cacheMode.isInvalidation()) { + cleanup.add(mockValidator(remoteCustomerCache, customerLatch)); + cleanup.add(mockValidator(remoteCollectionCache, collectionLatch)); + cleanup.add(mockValidator(remoteContactCache, contactsLatch)); + } else if (accessType == AccessType.NONSTRICT_READ_WRITE) { + // ATM nonstrict mode has sync after-invalidation update + Stream.of(customerLatch, collectionLatch, contactsLatch, contactsLatch).forEach(l -> l.countDown()); + } else { + ExpectingInterceptor.get(remoteCustomerCache).when(this::isFutureUpdate).countDown(collectionLatch); + ExpectingInterceptor.get(remoteCollectionCache).when(this::isFutureUpdate).countDown(customerLatch); + ExpectingInterceptor.get(remoteContactCache).when(this::isFutureUpdate).countDown(contactsLatch); + cleanup.add(() -> ExpectingInterceptor.cleanup(remoteCustomerCache, remoteCollectionCache, remoteContactCache)); + } + withTxSession(sessionFactory, session -> session.save(customer)); + assertTrue(customerLatch.await(2, TimeUnit.SECONDS)); + assertTrue(collectionLatch.await(2, TimeUnit.SECONDS)); + assertTrue(contactsLatch.await(2, TimeUnit.SECONDS)); + cleanup.forEach(Runnable::run); + IdContainer ids = new IdContainer(); ids.customerId = customer.getId(); Set contactIds = new HashSet(); @@ -290,6 +339,27 @@ public class EntityCollectionInvalidationTest extends DualNodeTest { return ids; } + private boolean isFutureUpdate(InvocationContext ctx, VisitableCommand cmd) { + return cmd instanceof PutKeyValueCommand && ((PutKeyValueCommand) cmd).getValue() instanceof FutureUpdate; + } + + private Runnable mockValidator(AdvancedCache cache, CountDownLatch latch) { + PutFromLoadValidator originalValidator = PutFromLoadValidator.removeFromCache(cache); + PutFromLoadValidator mockValidator = spy(originalValidator); + doAnswer(invocation -> { + try { + return invocation.callRealMethod(); + } finally { + latch.countDown(); + } + }).when(mockValidator).endInvalidatingKey(any(), any()); + PutFromLoadValidator.addToCache(cache, mockValidator); + return () -> { + PutFromLoadValidator.removeFromCache(cache); + PutFromLoadValidator.addToCache(cache, originalValidator); + }; + } + private Customer getCustomer(Integer id, SessionFactory sessionFactory) throws Exception { log.debug( "Find customer with id=" + id ); return withTxSessionApply(sessionFactory, session -> doGetCustomer(id, session));