HHH-10185 In nonstrict-read-write mode the remove may be not applied
This commit is contained in:
parent
9a9fb43ca5
commit
cc61914f0f
|
@ -32,6 +32,7 @@ import java.util.concurrent.TimeUnit;
|
||||||
*/
|
*/
|
||||||
public class NonStrictAccessDelegate implements AccessDelegate {
|
public class NonStrictAccessDelegate implements AccessDelegate {
|
||||||
private static final Log log = LogFactory.getLog( NonStrictAccessDelegate.class );
|
private static final Log log = LogFactory.getLog( NonStrictAccessDelegate.class );
|
||||||
|
private static final boolean trace = log.isTraceEnabled();
|
||||||
|
|
||||||
private final BaseTransactionalDataRegion region;
|
private final BaseTransactionalDataRegion region;
|
||||||
private final AdvancedCache cache;
|
private final AdvancedCache cache;
|
||||||
|
@ -90,10 +91,17 @@ public class NonStrictAccessDelegate implements AccessDelegate {
|
||||||
Object oldVersion = getVersion(prev);
|
Object oldVersion = getVersion(prev);
|
||||||
if (oldVersion != null) {
|
if (oldVersion != null) {
|
||||||
if (versionComparator.compare(version, oldVersion) <= 0) {
|
if (versionComparator.compare(version, oldVersion) <= 0) {
|
||||||
|
if (trace) {
|
||||||
|
log.tracef("putFromLoad not executed since version(%s) <= oldVersion(%s)", version, oldVersion);
|
||||||
|
}
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
else if (prev instanceof VersionedEntry && txTimestamp <= ((VersionedEntry) prev).getTimestamp()) {
|
else if (prev instanceof VersionedEntry && txTimestamp <= ((VersionedEntry) prev).getTimestamp()) {
|
||||||
|
if (trace) {
|
||||||
|
log.tracef("putFromLoad not executed since tx started at %d and entry was invalidated at %d",
|
||||||
|
txTimestamp, ((VersionedEntry) prev).getTimestamp());
|
||||||
|
}
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -119,11 +127,13 @@ public class NonStrictAccessDelegate implements AccessDelegate {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void remove(SessionImplementor session, Object key) throws CacheException {
|
public void remove(SessionImplementor session, Object key) throws CacheException {
|
||||||
Object value = cache.get(key);
|
|
||||||
Object version = getVersion(value);
|
|
||||||
// there's no 'afterRemove', so we have to use our own synchronization
|
// there's no 'afterRemove', so we have to use our own synchronization
|
||||||
|
// the API does not provide version of removed item but we can't load it from the cache
|
||||||
|
// as that would be prone to race conditions - if the entry was updated in the meantime
|
||||||
|
// the remove could be discarded and we would end up with stale record
|
||||||
|
// See VersionedTest#testCollectionUpdate for such situation
|
||||||
TransactionCoordinator transactionCoordinator = session.getTransactionCoordinator();
|
TransactionCoordinator transactionCoordinator = session.getTransactionCoordinator();
|
||||||
RemovalSynchronization sync = new RemovalSynchronization(transactionCoordinator, writeCache, false, region, key, version);
|
RemovalSynchronization sync = new RemovalSynchronization(transactionCoordinator, writeCache, false, region, key);
|
||||||
transactionCoordinator.getLocalSynchronizations().registerSynchronization(sync);
|
transactionCoordinator.getLocalSynchronizations().registerSynchronization(sync);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -20,24 +20,17 @@ import java.util.concurrent.TimeUnit;
|
||||||
public class RemovalSynchronization extends InvocationAfterCompletion {
|
public class RemovalSynchronization extends InvocationAfterCompletion {
|
||||||
private final BaseTransactionalDataRegion region;
|
private final BaseTransactionalDataRegion region;
|
||||||
private final Object key;
|
private final Object key;
|
||||||
private final Object version;
|
|
||||||
|
|
||||||
public RemovalSynchronization(TransactionCoordinator tc, AdvancedCache cache, boolean requiresTransaction, BaseTransactionalDataRegion region, Object key, Object version) {
|
public RemovalSynchronization(TransactionCoordinator tc, AdvancedCache cache, boolean requiresTransaction, BaseTransactionalDataRegion region, Object key) {
|
||||||
super(tc, cache, requiresTransaction);
|
super(tc, cache, requiresTransaction);
|
||||||
this.region = region;
|
this.region = region;
|
||||||
this.key = key;
|
this.key = key;
|
||||||
this.version = version;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void invoke(boolean success, AdvancedCache cache) {
|
protected void invoke(boolean success, AdvancedCache cache) {
|
||||||
if (success) {
|
if (success) {
|
||||||
if (version == null) {
|
|
||||||
cache.put(key, new VersionedEntry(null, null, region.nextTimestamp()), region.getTombstoneExpiration(), TimeUnit.MILLISECONDS);
|
cache.put(key, new VersionedEntry(null, null, region.nextTimestamp()), region.getTombstoneExpiration(), TimeUnit.MILLISECONDS);
|
||||||
}
|
}
|
||||||
else {
|
|
||||||
cache.put(key, new VersionedEntry(null, version, Long.MIN_VALUE), region.getTombstoneExpiration(), TimeUnit.MILLISECONDS);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,12 +18,9 @@ import org.infinispan.context.InvocationContext;
|
||||||
import org.infinispan.factories.annotations.Inject;
|
import org.infinispan.factories.annotations.Inject;
|
||||||
import org.infinispan.filter.NullValueConverter;
|
import org.infinispan.filter.NullValueConverter;
|
||||||
import org.infinispan.interceptors.CallInterceptor;
|
import org.infinispan.interceptors.CallInterceptor;
|
||||||
import org.infinispan.util.logging.Log;
|
|
||||||
import org.infinispan.util.logging.LogFactory;
|
|
||||||
|
|
||||||
import java.util.Comparator;
|
import java.util.Comparator;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.UUID;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Note that this does not implement all commands, only those appropriate for {@link TombstoneAccessDelegate}
|
* Note that this does not implement all commands, only those appropriate for {@link TombstoneAccessDelegate}
|
||||||
|
|
|
@ -73,6 +73,8 @@ public abstract class AbstractNonInvalidationTest extends SingleNodeTest {
|
||||||
|
|
||||||
@Before
|
@Before
|
||||||
public void insertAndClearCache() throws Exception {
|
public void insertAndClearCache() throws Exception {
|
||||||
|
region = sessionFactory().getSecondLevelCacheRegion(Item.class.getName());
|
||||||
|
entityCache = ((EntityRegionImpl) region).getCache();
|
||||||
Item item = new Item("my item", "Original item");
|
Item item = new Item("my item", "Original item");
|
||||||
withTxSession(s -> s.persist(item));
|
withTxSession(s -> s.persist(item));
|
||||||
entityCache.clear();
|
entityCache.clear();
|
||||||
|
|
|
@ -3,12 +3,19 @@ package org.hibernate.test.cache.infinispan.functional;
|
||||||
import org.hibernate.PessimisticLockException;
|
import org.hibernate.PessimisticLockException;
|
||||||
import org.hibernate.Session;
|
import org.hibernate.Session;
|
||||||
import org.hibernate.StaleStateException;
|
import org.hibernate.StaleStateException;
|
||||||
|
import org.hibernate.cache.infinispan.impl.BaseTransactionalDataRegion;
|
||||||
import org.hibernate.cache.infinispan.util.Caches;
|
import org.hibernate.cache.infinispan.util.Caches;
|
||||||
import org.hibernate.cache.infinispan.util.VersionedEntry;
|
import org.hibernate.cache.infinispan.util.VersionedEntry;
|
||||||
import org.hibernate.cache.spi.entry.CacheEntry;
|
import org.hibernate.cache.spi.entry.CacheEntry;
|
||||||
import org.hibernate.engine.spi.SessionImplementor;
|
import org.hibernate.engine.spi.SessionImplementor;
|
||||||
import org.hibernate.test.cache.infinispan.functional.entities.Item;
|
import org.hibernate.test.cache.infinispan.functional.entities.Item;
|
||||||
|
import org.hibernate.test.cache.infinispan.functional.entities.OtherItem;
|
||||||
|
import org.infinispan.AdvancedCache;
|
||||||
|
import org.infinispan.commands.write.PutKeyValueCommand;
|
||||||
import org.infinispan.commons.util.ByRef;
|
import org.infinispan.commons.util.ByRef;
|
||||||
|
import org.infinispan.context.Flag;
|
||||||
|
import org.infinispan.context.InvocationContext;
|
||||||
|
import org.infinispan.interceptors.base.BaseCustomInterceptor;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
import javax.transaction.Synchronization;
|
import javax.transaction.Synchronization;
|
||||||
|
@ -17,10 +24,12 @@ import java.util.Arrays;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.Set;
|
||||||
import java.util.concurrent.CountDownLatch;
|
import java.util.concurrent.CountDownLatch;
|
||||||
import java.util.concurrent.CyclicBarrier;
|
import java.util.concurrent.CyclicBarrier;
|
||||||
import java.util.concurrent.Future;
|
import java.util.concurrent.Future;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
import java.util.concurrent.atomic.AtomicReference;
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
import java.util.function.BiConsumer;
|
import java.util.function.BiConsumer;
|
||||||
|
|
||||||
|
@ -41,6 +50,11 @@ public class VersionedTest extends AbstractNonInvalidationTest {
|
||||||
return Arrays.asList(NONSTRICT_REPLICATED, NONSTRICT_DISTRIBUTED);
|
return Arrays.asList(NONSTRICT_REPLICATED, NONSTRICT_DISTRIBUTED);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected boolean getUseQueryCache() {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testTwoRemoves() throws Exception {
|
public void testTwoRemoves() throws Exception {
|
||||||
CyclicBarrier loadBarrier = new CyclicBarrier(2);
|
CyclicBarrier loadBarrier = new CyclicBarrier(2);
|
||||||
|
@ -220,6 +234,98 @@ public class VersionedTest extends AbstractNonInvalidationTest {
|
||||||
assertSingleCacheEntry();
|
assertSingleCacheEntry();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testCollectionUpdate() throws Exception {
|
||||||
|
// the first insert puts VersionedEntry(null, null, timestamp), so we have to wait a while to cache the entry
|
||||||
|
TIME_SERVICE.advance(1);
|
||||||
|
|
||||||
|
withTxSession(s -> {
|
||||||
|
Item item = s.load(Item.class, itemId);
|
||||||
|
OtherItem otherItem = new OtherItem();
|
||||||
|
otherItem.setName("Other 1");
|
||||||
|
s.persist(otherItem);
|
||||||
|
item.addOtherItem(otherItem);
|
||||||
|
});
|
||||||
|
withTxSession(s -> {
|
||||||
|
Item item = s.load(Item.class, itemId);
|
||||||
|
Set<OtherItem> otherItems = item.getOtherItems();
|
||||||
|
assertFalse(otherItems.isEmpty());
|
||||||
|
otherItems.remove(otherItems.iterator().next());
|
||||||
|
});
|
||||||
|
|
||||||
|
AdvancedCache collectionCache = ((BaseTransactionalDataRegion) sessionFactory().getSecondLevelCacheRegion(Item.class.getName() + ".otherItems")).getCache();
|
||||||
|
CountDownLatch putFromLoadLatch = new CountDownLatch(1);
|
||||||
|
AtomicBoolean committing = new AtomicBoolean(false);
|
||||||
|
CollectionUpdateTestInterceptor collectionUpdateTestInterceptor = new CollectionUpdateTestInterceptor(putFromLoadLatch);
|
||||||
|
AnotherCollectionUpdateTestInterceptor anotherInterceptor = new AnotherCollectionUpdateTestInterceptor(putFromLoadLatch, committing);
|
||||||
|
collectionCache.addInterceptor(collectionUpdateTestInterceptor, collectionCache.getInterceptorChain().size() - 1);
|
||||||
|
collectionCache.addInterceptor(anotherInterceptor, 0);
|
||||||
|
|
||||||
|
TIME_SERVICE.advance(1);
|
||||||
|
Future<Boolean> addFuture = executor.submit(() -> withTxSessionApply(s -> {
|
||||||
|
collectionUpdateTestInterceptor.updateLatch.await();
|
||||||
|
Item item = s.load(Item.class, itemId);
|
||||||
|
OtherItem otherItem = new OtherItem();
|
||||||
|
otherItem.setName("Other 2");
|
||||||
|
s.persist(otherItem);
|
||||||
|
item.addOtherItem(otherItem);
|
||||||
|
committing.set(true);
|
||||||
|
return true;
|
||||||
|
}));
|
||||||
|
|
||||||
|
Future<Boolean> readFuture = executor.submit(() -> withTxSessionApply(s -> {
|
||||||
|
Item item = s.load(Item.class, itemId);
|
||||||
|
assertTrue(item.getOtherItems().isEmpty());
|
||||||
|
return true;
|
||||||
|
}));
|
||||||
|
|
||||||
|
addFuture.get();
|
||||||
|
readFuture.get();
|
||||||
|
collectionCache.removeInterceptor(CollectionUpdateTestInterceptor.class);
|
||||||
|
collectionCache.removeInterceptor(AnotherCollectionUpdateTestInterceptor.class);
|
||||||
|
|
||||||
|
withTxSession(s -> assertFalse(s.load(Item.class, itemId).getOtherItems().isEmpty()));
|
||||||
|
}
|
||||||
|
|
||||||
|
private class CollectionUpdateTestInterceptor extends BaseCustomInterceptor {
|
||||||
|
final AtomicBoolean firstPutFromLoad = new AtomicBoolean(true);
|
||||||
|
final CountDownLatch putFromLoadLatch;
|
||||||
|
final CountDownLatch updateLatch = new CountDownLatch(1);
|
||||||
|
|
||||||
|
public CollectionUpdateTestInterceptor(CountDownLatch putFromLoadLatch) {
|
||||||
|
this.putFromLoadLatch = putFromLoadLatch;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Object visitPutKeyValueCommand(InvocationContext ctx, PutKeyValueCommand command) throws Throwable {
|
||||||
|
if (command.hasFlag(Flag.ZERO_LOCK_ACQUISITION_TIMEOUT)) {
|
||||||
|
if (firstPutFromLoad.compareAndSet(true, false)) {
|
||||||
|
updateLatch.countDown();
|
||||||
|
putFromLoadLatch.await();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return super.visitPutKeyValueCommand(ctx, command);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private class AnotherCollectionUpdateTestInterceptor extends BaseCustomInterceptor {
|
||||||
|
final CountDownLatch putFromLoadLatch;
|
||||||
|
final AtomicBoolean committing;
|
||||||
|
|
||||||
|
public AnotherCollectionUpdateTestInterceptor(CountDownLatch putFromLoadLatch, AtomicBoolean committing) {
|
||||||
|
this.putFromLoadLatch = putFromLoadLatch;
|
||||||
|
this.committing = committing;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Object visitPutKeyValueCommand(InvocationContext ctx, PutKeyValueCommand command) throws Throwable {
|
||||||
|
if (committing.get() && !command.hasFlag(Flag.ZERO_LOCK_ACQUISITION_TIMEOUT)) {
|
||||||
|
putFromLoadLatch.countDown();
|
||||||
|
}
|
||||||
|
return super.visitPutKeyValueCommand(ctx, command);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
protected void assertSingleEmpty() {
|
protected void assertSingleEmpty() {
|
||||||
Map contents = Caches.entrySet(entityCache).toMap();
|
Map contents = Caches.entrySet(entityCache).toMap();
|
||||||
Object value;
|
Object value;
|
||||||
|
|
Loading…
Reference in New Issue