HHH-10185 In nonstrict-read-write mode the remove may be not applied

This commit is contained in:
Radim Vansa 2015-10-14 11:45:24 +02:00 committed by Galder Zamarreño
parent c835cf11ec
commit 71e13cdf25
5 changed files with 123 additions and 22 deletions

View File

@ -32,6 +32,7 @@ import java.util.concurrent.TimeUnit;
*/
public class NonStrictAccessDelegate implements AccessDelegate {
private static final Log log = LogFactory.getLog( NonStrictAccessDelegate.class );
private static final boolean trace = log.isTraceEnabled();
private final BaseTransactionalDataRegion region;
private final AdvancedCache cache;
@ -90,10 +91,17 @@ public class NonStrictAccessDelegate implements AccessDelegate {
Object oldVersion = getVersion(prev);
if (oldVersion != null) {
if (versionComparator.compare(version, oldVersion) <= 0) {
if (trace) {
log.tracef("putFromLoad not executed since version(%s) <= oldVersion(%s)", version, oldVersion);
}
return false;
}
}
else if (prev instanceof VersionedEntry && txTimestamp <= ((VersionedEntry) prev).getTimestamp()) {
if (trace) {
log.tracef("putFromLoad not executed since tx started at %d and entry was invalidated at %d",
txTimestamp, ((VersionedEntry) prev).getTimestamp());
}
return false;
}
}
@ -119,11 +127,13 @@ public class NonStrictAccessDelegate implements AccessDelegate {
@Override
public void remove(SessionImplementor session, Object key) throws CacheException {
Object value = cache.get(key);
Object version = getVersion(value);
// there's no 'afterRemove', so we have to use our own synchronization
// the API does not provide version of removed item but we can't load it from the cache
// as that would be prone to race conditions - if the entry was updated in the meantime
// the remove could be discarded and we would end up with stale record
// See VersionedTest#testCollectionUpdate for such situation
TransactionCoordinator transactionCoordinator = session.getTransactionCoordinator();
RemovalSynchronization sync = new RemovalSynchronization(transactionCoordinator, writeCache, false, region, key, version);
RemovalSynchronization sync = new RemovalSynchronization(transactionCoordinator, writeCache, false, region, key);
transactionCoordinator.getLocalSynchronizations().registerSynchronization(sync);
}

View File

@ -20,24 +20,17 @@ import java.util.concurrent.TimeUnit;
public class RemovalSynchronization extends InvocationAfterCompletion {
private final BaseTransactionalDataRegion region;
private final Object key;
private final Object version;
public RemovalSynchronization(TransactionCoordinator tc, AdvancedCache cache, boolean requiresTransaction, BaseTransactionalDataRegion region, Object key, Object version) {
public RemovalSynchronization(TransactionCoordinator tc, AdvancedCache cache, boolean requiresTransaction, BaseTransactionalDataRegion region, Object key) {
super(tc, cache, requiresTransaction);
this.region = region;
this.key = key;
this.version = version;
}
@Override
protected void invoke(boolean success, AdvancedCache cache) {
if (success) {
if (version == null) {
cache.put(key, new VersionedEntry(null, null, region.nextTimestamp()), region.getTombstoneExpiration(), TimeUnit.MILLISECONDS);
}
else {
cache.put(key, new VersionedEntry(null, version, Long.MIN_VALUE), region.getTombstoneExpiration(), TimeUnit.MILLISECONDS);
}
cache.put(key, new VersionedEntry(null, null, region.nextTimestamp()), region.getTombstoneExpiration(), TimeUnit.MILLISECONDS);
}
}
}

View File

@ -18,12 +18,9 @@ import org.infinispan.context.InvocationContext;
import org.infinispan.factories.annotations.Inject;
import org.infinispan.filter.NullValueConverter;
import org.infinispan.interceptors.CallInterceptor;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;
import java.util.Comparator;
import java.util.Set;
import java.util.UUID;
/**
* Note that this does not implement all commands, only those appropriate for {@link TombstoneAccessDelegate}

View File

@ -64,15 +64,10 @@ public abstract class AbstractNonInvalidationTest extends SingleNodeTest {
executor.shutdown();
}
@Override
protected void startUp() {
super.startUp();
region = sessionFactory().getSecondLevelCacheRegion(Item.class.getName());
entityCache = ((EntityRegionImpl) region).getCache();
}
@Before
public void insertAndClearCache() throws Exception {
region = sessionFactory().getSecondLevelCacheRegion(Item.class.getName());
entityCache = ((EntityRegionImpl) region).getCache();
Item item = new Item("my item", "Original item");
withTxSession(s -> s.persist(item));
entityCache.clear();

View File

@ -3,12 +3,19 @@ package org.hibernate.test.cache.infinispan.functional;
import org.hibernate.PessimisticLockException;
import org.hibernate.Session;
import org.hibernate.StaleStateException;
import org.hibernate.cache.infinispan.impl.BaseTransactionalDataRegion;
import org.hibernate.cache.infinispan.util.Caches;
import org.hibernate.cache.infinispan.util.VersionedEntry;
import org.hibernate.cache.spi.entry.CacheEntry;
import org.hibernate.engine.spi.SessionImplementor;
import org.hibernate.test.cache.infinispan.functional.entities.Item;
import org.hibernate.test.cache.infinispan.functional.entities.OtherItem;
import org.infinispan.AdvancedCache;
import org.infinispan.commands.write.PutKeyValueCommand;
import org.infinispan.commons.util.ByRef;
import org.infinispan.context.Flag;
import org.infinispan.context.InvocationContext;
import org.infinispan.interceptors.base.BaseCustomInterceptor;
import org.junit.Test;
import javax.transaction.Synchronization;
@ -17,10 +24,12 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
@ -41,6 +50,11 @@ public class VersionedTest extends AbstractNonInvalidationTest {
return Arrays.asList(NONSTRICT_REPLICATED, NONSTRICT_DISTRIBUTED);
}
@Override
protected boolean getUseQueryCache() {
return false;
}
@Test
public void testTwoRemoves() throws Exception {
CyclicBarrier loadBarrier = new CyclicBarrier(2);
@ -220,6 +234,98 @@ public class VersionedTest extends AbstractNonInvalidationTest {
assertSingleCacheEntry();
}
@Test
public void testCollectionUpdate() throws Exception {
// the first insert puts VersionedEntry(null, null, timestamp), so we have to wait a while to cache the entry
TIME_SERVICE.advance(1);
withTxSession(s -> {
Item item = s.load(Item.class, itemId);
OtherItem otherItem = new OtherItem();
otherItem.setName("Other 1");
s.persist(otherItem);
item.addOtherItem(otherItem);
});
withTxSession(s -> {
Item item = s.load(Item.class, itemId);
Set<OtherItem> otherItems = item.getOtherItems();
assertFalse(otherItems.isEmpty());
otherItems.remove(otherItems.iterator().next());
});
AdvancedCache collectionCache = ((BaseTransactionalDataRegion) sessionFactory().getSecondLevelCacheRegion(Item.class.getName() + ".otherItems")).getCache();
CountDownLatch putFromLoadLatch = new CountDownLatch(1);
AtomicBoolean committing = new AtomicBoolean(false);
CollectionUpdateTestInterceptor collectionUpdateTestInterceptor = new CollectionUpdateTestInterceptor(putFromLoadLatch);
AnotherCollectionUpdateTestInterceptor anotherInterceptor = new AnotherCollectionUpdateTestInterceptor(putFromLoadLatch, committing);
collectionCache.addInterceptor(collectionUpdateTestInterceptor, collectionCache.getInterceptorChain().size() - 1);
collectionCache.addInterceptor(anotherInterceptor, 0);
TIME_SERVICE.advance(1);
Future<Boolean> addFuture = executor.submit(() -> withTxSessionApply(s -> {
collectionUpdateTestInterceptor.updateLatch.await();
Item item = s.load(Item.class, itemId);
OtherItem otherItem = new OtherItem();
otherItem.setName("Other 2");
s.persist(otherItem);
item.addOtherItem(otherItem);
committing.set(true);
return true;
}));
Future<Boolean> readFuture = executor.submit(() -> withTxSessionApply(s -> {
Item item = s.load(Item.class, itemId);
assertTrue(item.getOtherItems().isEmpty());
return true;
}));
addFuture.get();
readFuture.get();
collectionCache.removeInterceptor(CollectionUpdateTestInterceptor.class);
collectionCache.removeInterceptor(AnotherCollectionUpdateTestInterceptor.class);
withTxSession(s -> assertFalse(s.load(Item.class, itemId).getOtherItems().isEmpty()));
}
private class CollectionUpdateTestInterceptor extends BaseCustomInterceptor {
final AtomicBoolean firstPutFromLoad = new AtomicBoolean(true);
final CountDownLatch putFromLoadLatch;
final CountDownLatch updateLatch = new CountDownLatch(1);
public CollectionUpdateTestInterceptor(CountDownLatch putFromLoadLatch) {
this.putFromLoadLatch = putFromLoadLatch;
}
@Override
public Object visitPutKeyValueCommand(InvocationContext ctx, PutKeyValueCommand command) throws Throwable {
if (command.hasFlag(Flag.ZERO_LOCK_ACQUISITION_TIMEOUT)) {
if (firstPutFromLoad.compareAndSet(true, false)) {
updateLatch.countDown();
putFromLoadLatch.await();
}
}
return super.visitPutKeyValueCommand(ctx, command);
}
}
private class AnotherCollectionUpdateTestInterceptor extends BaseCustomInterceptor {
final CountDownLatch putFromLoadLatch;
final AtomicBoolean committing;
public AnotherCollectionUpdateTestInterceptor(CountDownLatch putFromLoadLatch, AtomicBoolean committing) {
this.putFromLoadLatch = putFromLoadLatch;
this.committing = committing;
}
@Override
public Object visitPutKeyValueCommand(InvocationContext ctx, PutKeyValueCommand command) throws Throwable {
if (committing.get() && !command.hasFlag(Flag.ZERO_LOCK_ACQUISITION_TIMEOUT)) {
putFromLoadLatch.countDown();
}
return super.visitPutKeyValueCommand(ctx, command);
}
}
protected void assertSingleEmpty() {
Map contents = Caches.entrySet(entityCache).toMap();
Object value;