From 71e13cdf25939b5a18cd79e648e64cdc53642c3e Mon Sep 17 00:00:00 2001 From: Radim Vansa Date: Wed, 14 Oct 2015 11:45:24 +0200 Subject: [PATCH] HHH-10185 In nonstrict-read-write mode the remove may be not applied --- .../access/NonStrictAccessDelegate.java | 16 ++- .../access/RemovalSynchronization.java | 11 +- .../access/VersionedCallInterceptor.java | 3 - .../AbstractNonInvalidationTest.java | 9 +- .../infinispan/functional/VersionedTest.java | 106 ++++++++++++++++++ 5 files changed, 123 insertions(+), 22 deletions(-) diff --git a/hibernate-infinispan/src/main/java/org/hibernate/cache/infinispan/access/NonStrictAccessDelegate.java b/hibernate-infinispan/src/main/java/org/hibernate/cache/infinispan/access/NonStrictAccessDelegate.java index a16f33d801..009aabb21d 100644 --- a/hibernate-infinispan/src/main/java/org/hibernate/cache/infinispan/access/NonStrictAccessDelegate.java +++ b/hibernate-infinispan/src/main/java/org/hibernate/cache/infinispan/access/NonStrictAccessDelegate.java @@ -32,6 +32,7 @@ import java.util.concurrent.TimeUnit; */ public class NonStrictAccessDelegate implements AccessDelegate { private static final Log log = LogFactory.getLog( NonStrictAccessDelegate.class ); + private static final boolean trace = log.isTraceEnabled(); private final BaseTransactionalDataRegion region; private final AdvancedCache cache; @@ -90,10 +91,17 @@ public class NonStrictAccessDelegate implements AccessDelegate { Object oldVersion = getVersion(prev); if (oldVersion != null) { if (versionComparator.compare(version, oldVersion) <= 0) { + if (trace) { + log.tracef("putFromLoad not executed since version(%s) <= oldVersion(%s)", version, oldVersion); + } return false; } } else if (prev instanceof VersionedEntry && txTimestamp <= ((VersionedEntry) prev).getTimestamp()) { + if (trace) { + log.tracef("putFromLoad not executed since tx started at %d and entry was invalidated at %d", + txTimestamp, ((VersionedEntry) prev).getTimestamp()); + } return false; } } @@ -119,11 +127,13 @@ public class NonStrictAccessDelegate implements AccessDelegate { @Override public void remove(SessionImplementor session, Object key) throws CacheException { - Object value = cache.get(key); - Object version = getVersion(value); // there's no 'afterRemove', so we have to use our own synchronization + // the API does not provide version of removed item but we can't load it from the cache + // as that would be prone to race conditions - if the entry was updated in the meantime + // the remove could be discarded and we would end up with stale record + // See VersionedTest#testCollectionUpdate for such situation TransactionCoordinator transactionCoordinator = session.getTransactionCoordinator(); - RemovalSynchronization sync = new RemovalSynchronization(transactionCoordinator, writeCache, false, region, key, version); + RemovalSynchronization sync = new RemovalSynchronization(transactionCoordinator, writeCache, false, region, key); transactionCoordinator.getLocalSynchronizations().registerSynchronization(sync); } diff --git a/hibernate-infinispan/src/main/java/org/hibernate/cache/infinispan/access/RemovalSynchronization.java b/hibernate-infinispan/src/main/java/org/hibernate/cache/infinispan/access/RemovalSynchronization.java index fba130faa1..3129e8736f 100644 --- a/hibernate-infinispan/src/main/java/org/hibernate/cache/infinispan/access/RemovalSynchronization.java +++ b/hibernate-infinispan/src/main/java/org/hibernate/cache/infinispan/access/RemovalSynchronization.java @@ -20,24 +20,17 @@ import java.util.concurrent.TimeUnit; public class RemovalSynchronization extends InvocationAfterCompletion { private final BaseTransactionalDataRegion region; private final Object key; - private final Object version; - public RemovalSynchronization(TransactionCoordinator tc, AdvancedCache cache, boolean requiresTransaction, BaseTransactionalDataRegion region, Object key, Object version) { + public RemovalSynchronization(TransactionCoordinator tc, AdvancedCache cache, boolean requiresTransaction, BaseTransactionalDataRegion region, Object key) { super(tc, cache, requiresTransaction); this.region = region; this.key = key; - this.version = version; } @Override protected void invoke(boolean success, AdvancedCache cache) { if (success) { - if (version == null) { - cache.put(key, new VersionedEntry(null, null, region.nextTimestamp()), region.getTombstoneExpiration(), TimeUnit.MILLISECONDS); - } - else { - cache.put(key, new VersionedEntry(null, version, Long.MIN_VALUE), region.getTombstoneExpiration(), TimeUnit.MILLISECONDS); - } + cache.put(key, new VersionedEntry(null, null, region.nextTimestamp()), region.getTombstoneExpiration(), TimeUnit.MILLISECONDS); } } } diff --git a/hibernate-infinispan/src/main/java/org/hibernate/cache/infinispan/access/VersionedCallInterceptor.java b/hibernate-infinispan/src/main/java/org/hibernate/cache/infinispan/access/VersionedCallInterceptor.java index c8bdc5a67a..9cf79b93de 100644 --- a/hibernate-infinispan/src/main/java/org/hibernate/cache/infinispan/access/VersionedCallInterceptor.java +++ b/hibernate-infinispan/src/main/java/org/hibernate/cache/infinispan/access/VersionedCallInterceptor.java @@ -18,12 +18,9 @@ import org.infinispan.context.InvocationContext; import org.infinispan.factories.annotations.Inject; import org.infinispan.filter.NullValueConverter; import org.infinispan.interceptors.CallInterceptor; -import org.infinispan.util.logging.Log; -import org.infinispan.util.logging.LogFactory; import java.util.Comparator; import java.util.Set; -import java.util.UUID; /** * Note that this does not implement all commands, only those appropriate for {@link TombstoneAccessDelegate} diff --git a/hibernate-infinispan/src/test/java/org/hibernate/test/cache/infinispan/functional/AbstractNonInvalidationTest.java b/hibernate-infinispan/src/test/java/org/hibernate/test/cache/infinispan/functional/AbstractNonInvalidationTest.java index d321a23de5..6a92e9996f 100644 --- a/hibernate-infinispan/src/test/java/org/hibernate/test/cache/infinispan/functional/AbstractNonInvalidationTest.java +++ b/hibernate-infinispan/src/test/java/org/hibernate/test/cache/infinispan/functional/AbstractNonInvalidationTest.java @@ -64,15 +64,10 @@ public abstract class AbstractNonInvalidationTest extends SingleNodeTest { executor.shutdown(); } - @Override - protected void startUp() { - super.startUp(); - region = sessionFactory().getSecondLevelCacheRegion(Item.class.getName()); - entityCache = ((EntityRegionImpl) region).getCache(); - } - @Before public void insertAndClearCache() throws Exception { + region = sessionFactory().getSecondLevelCacheRegion(Item.class.getName()); + entityCache = ((EntityRegionImpl) region).getCache(); Item item = new Item("my item", "Original item"); withTxSession(s -> s.persist(item)); entityCache.clear(); diff --git a/hibernate-infinispan/src/test/java/org/hibernate/test/cache/infinispan/functional/VersionedTest.java b/hibernate-infinispan/src/test/java/org/hibernate/test/cache/infinispan/functional/VersionedTest.java index 2ac90cc984..a3d27f78f1 100644 --- a/hibernate-infinispan/src/test/java/org/hibernate/test/cache/infinispan/functional/VersionedTest.java +++ b/hibernate-infinispan/src/test/java/org/hibernate/test/cache/infinispan/functional/VersionedTest.java @@ -3,12 +3,19 @@ package org.hibernate.test.cache.infinispan.functional; import org.hibernate.PessimisticLockException; import org.hibernate.Session; import org.hibernate.StaleStateException; +import org.hibernate.cache.infinispan.impl.BaseTransactionalDataRegion; import org.hibernate.cache.infinispan.util.Caches; import org.hibernate.cache.infinispan.util.VersionedEntry; import org.hibernate.cache.spi.entry.CacheEntry; import org.hibernate.engine.spi.SessionImplementor; import org.hibernate.test.cache.infinispan.functional.entities.Item; +import org.hibernate.test.cache.infinispan.functional.entities.OtherItem; +import org.infinispan.AdvancedCache; +import org.infinispan.commands.write.PutKeyValueCommand; import org.infinispan.commons.util.ByRef; +import org.infinispan.context.Flag; +import org.infinispan.context.InvocationContext; +import org.infinispan.interceptors.base.BaseCustomInterceptor; import org.junit.Test; import javax.transaction.Synchronization; @@ -17,10 +24,12 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; @@ -41,6 +50,11 @@ public class VersionedTest extends AbstractNonInvalidationTest { return Arrays.asList(NONSTRICT_REPLICATED, NONSTRICT_DISTRIBUTED); } + @Override + protected boolean getUseQueryCache() { + return false; + } + @Test public void testTwoRemoves() throws Exception { CyclicBarrier loadBarrier = new CyclicBarrier(2); @@ -220,6 +234,98 @@ public class VersionedTest extends AbstractNonInvalidationTest { assertSingleCacheEntry(); } + @Test + public void testCollectionUpdate() throws Exception { + // the first insert puts VersionedEntry(null, null, timestamp), so we have to wait a while to cache the entry + TIME_SERVICE.advance(1); + + withTxSession(s -> { + Item item = s.load(Item.class, itemId); + OtherItem otherItem = new OtherItem(); + otherItem.setName("Other 1"); + s.persist(otherItem); + item.addOtherItem(otherItem); + }); + withTxSession(s -> { + Item item = s.load(Item.class, itemId); + Set otherItems = item.getOtherItems(); + assertFalse(otherItems.isEmpty()); + otherItems.remove(otherItems.iterator().next()); + }); + + AdvancedCache collectionCache = ((BaseTransactionalDataRegion) sessionFactory().getSecondLevelCacheRegion(Item.class.getName() + ".otherItems")).getCache(); + CountDownLatch putFromLoadLatch = new CountDownLatch(1); + AtomicBoolean committing = new AtomicBoolean(false); + CollectionUpdateTestInterceptor collectionUpdateTestInterceptor = new CollectionUpdateTestInterceptor(putFromLoadLatch); + AnotherCollectionUpdateTestInterceptor anotherInterceptor = new AnotherCollectionUpdateTestInterceptor(putFromLoadLatch, committing); + collectionCache.addInterceptor(collectionUpdateTestInterceptor, collectionCache.getInterceptorChain().size() - 1); + collectionCache.addInterceptor(anotherInterceptor, 0); + + TIME_SERVICE.advance(1); + Future addFuture = executor.submit(() -> withTxSessionApply(s -> { + collectionUpdateTestInterceptor.updateLatch.await(); + Item item = s.load(Item.class, itemId); + OtherItem otherItem = new OtherItem(); + otherItem.setName("Other 2"); + s.persist(otherItem); + item.addOtherItem(otherItem); + committing.set(true); + return true; + })); + + Future readFuture = executor.submit(() -> withTxSessionApply(s -> { + Item item = s.load(Item.class, itemId); + assertTrue(item.getOtherItems().isEmpty()); + return true; + })); + + addFuture.get(); + readFuture.get(); + collectionCache.removeInterceptor(CollectionUpdateTestInterceptor.class); + collectionCache.removeInterceptor(AnotherCollectionUpdateTestInterceptor.class); + + withTxSession(s -> assertFalse(s.load(Item.class, itemId).getOtherItems().isEmpty())); + } + + private class CollectionUpdateTestInterceptor extends BaseCustomInterceptor { + final AtomicBoolean firstPutFromLoad = new AtomicBoolean(true); + final CountDownLatch putFromLoadLatch; + final CountDownLatch updateLatch = new CountDownLatch(1); + + public CollectionUpdateTestInterceptor(CountDownLatch putFromLoadLatch) { + this.putFromLoadLatch = putFromLoadLatch; + } + + @Override + public Object visitPutKeyValueCommand(InvocationContext ctx, PutKeyValueCommand command) throws Throwable { + if (command.hasFlag(Flag.ZERO_LOCK_ACQUISITION_TIMEOUT)) { + if (firstPutFromLoad.compareAndSet(true, false)) { + updateLatch.countDown(); + putFromLoadLatch.await(); + } + } + return super.visitPutKeyValueCommand(ctx, command); + } + } + + private class AnotherCollectionUpdateTestInterceptor extends BaseCustomInterceptor { + final CountDownLatch putFromLoadLatch; + final AtomicBoolean committing; + + public AnotherCollectionUpdateTestInterceptor(CountDownLatch putFromLoadLatch, AtomicBoolean committing) { + this.putFromLoadLatch = putFromLoadLatch; + this.committing = committing; + } + + @Override + public Object visitPutKeyValueCommand(InvocationContext ctx, PutKeyValueCommand command) throws Throwable { + if (committing.get() && !command.hasFlag(Flag.ZERO_LOCK_ACQUISITION_TIMEOUT)) { + putFromLoadLatch.countDown(); + } + return super.visitPutKeyValueCommand(ctx, command); + } + } + protected void assertSingleEmpty() { Map contents = Caches.entrySet(entityCache).toMap(); Object value;