HHH-11381 In nonstrict mode, putFromLoad after evict can behave incorrectly
* piggybacking minor improvements for size command, too
(cherry picked from commit 3d712b0a6e
)
This commit is contained in:
parent
f87a66e32d
commit
4f4f1073e5
|
@ -108,12 +108,9 @@ public class NonStrictAccessDelegate implements AccessDelegate {
|
|||
}
|
||||
// we can't use putForExternalRead since the PFER flag means that entry is not wrapped into context
|
||||
// when it is present in the container. TombstoneCallInterceptor will deal with this.
|
||||
if (!(value instanceof CacheEntry)) {
|
||||
value = new VersionedEntry(value, version, txTimestamp);
|
||||
}
|
||||
// Apply the update locally first - if we're the backup owner, async propagation wouldn't change the value
|
||||
// for the subsequent operation soon enough as it goes through primary owner
|
||||
putFromLoadCache.put(key, value);
|
||||
// Even if value is instanceof CacheEntry, we have to wrap it in VersionedEntry and add transaction timestamp.
|
||||
// Otherwise, old eviction record wouldn't be overwritten.
|
||||
putFromLoadCache.put(key, new VersionedEntry(value, version, txTimestamp));
|
||||
return true;
|
||||
}
|
||||
|
||||
|
|
|
@ -23,6 +23,7 @@ import org.infinispan.context.Flag;
|
|||
import org.infinispan.context.InvocationContext;
|
||||
import org.infinispan.factories.annotations.Inject;
|
||||
import org.infinispan.factories.annotations.Start;
|
||||
import org.infinispan.filter.NullValueConverter;
|
||||
import org.infinispan.interceptors.CallInterceptor;
|
||||
import org.infinispan.metadata.EmbeddedMetadata;
|
||||
import org.infinispan.metadata.Metadata;
|
||||
|
@ -190,14 +191,16 @@ public class TombstoneCallInterceptor extends CallInterceptor {
|
|||
public Object visitSizeCommand(InvocationContext ctx, SizeCommand command) throws Throwable {
|
||||
Set<Flag> flags = command.getFlags();
|
||||
int size = 0;
|
||||
Map<Object, CacheEntry> contextEntries = ctx.getLookedUpEntries();
|
||||
AdvancedCache decoratedCache = cache.getAdvancedCache().withFlags(flags != null ? flags.toArray(new Flag[flags.size()]) : null);
|
||||
AdvancedCache decoratedCache = cache.getAdvancedCache();
|
||||
if (flags != null) {
|
||||
decoratedCache = decoratedCache.withFlags(flags.toArray(new Flag[flags.size()]));
|
||||
}
|
||||
// In non-transactional caches we don't care about context
|
||||
CloseableIterable<CacheEntry<Object, Object>> iterable = decoratedCache
|
||||
.filterEntries(Tombstone.EXCLUDE_TOMBSTONES);
|
||||
.filterEntries(Tombstone.EXCLUDE_TOMBSTONES).converter(NullValueConverter.getInstance());
|
||||
try {
|
||||
for (CacheEntry<Object, Object> entry : iterable) {
|
||||
if (entry.getValue() != null && size++ == Integer.MAX_VALUE) {
|
||||
if (size++ == Integer.MAX_VALUE) {
|
||||
return Integer.MAX_VALUE;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -78,8 +78,8 @@ public class VersionedCallInterceptor extends CallInterceptor {
|
|||
}
|
||||
|
||||
Object newValue = command.getValue();
|
||||
Object newVersion = null;
|
||||
long newTimestamp = Long.MIN_VALUE;
|
||||
Object newVersion;
|
||||
long newTimestamp;
|
||||
Object actualNewValue = newValue;
|
||||
boolean isRemoval = false;
|
||||
if (newValue instanceof VersionedEntry) {
|
||||
|
@ -93,8 +93,8 @@ public class VersionedCallInterceptor extends CallInterceptor {
|
|||
actualNewValue = ve.getValue();
|
||||
}
|
||||
}
|
||||
else if (newValue instanceof org.hibernate.cache.spi.entry.CacheEntry) {
|
||||
newVersion = ((org.hibernate.cache.spi.entry.CacheEntry) newValue).getVersion();
|
||||
else {
|
||||
throw new IllegalArgumentException(String.valueOf(newValue));
|
||||
}
|
||||
|
||||
if (newVersion == null) {
|
||||
|
@ -104,24 +104,20 @@ public class VersionedCallInterceptor extends CallInterceptor {
|
|||
}
|
||||
if (oldVersion == null) {
|
||||
assert oldValue == null || oldTimestamp != Long.MIN_VALUE;
|
||||
if (newTimestamp == Long.MIN_VALUE) {
|
||||
// remove, knowing the version
|
||||
setValue(e, newValue, expiringMetadata);
|
||||
}
|
||||
else if (newTimestamp <= oldTimestamp) {
|
||||
if (newTimestamp <= oldTimestamp) {
|
||||
// either putFromLoad or regular update/insert - in either case this update might come
|
||||
// when it was evicted/region-invalidated. In both cases, with old timestamp we'll leave
|
||||
// the invalid value
|
||||
assert oldValue == null;
|
||||
}
|
||||
else {
|
||||
setValue(e, newValue, defaultMetadata);
|
||||
setValue(e, actualNewValue, defaultMetadata);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
int compareResult = versionComparator.compare(newVersion, oldVersion);
|
||||
if (isRemoval && compareResult >= 0) {
|
||||
setValue(e, newValue, expiringMetadata);
|
||||
setValue(e, actualNewValue, expiringMetadata);
|
||||
}
|
||||
else if (compareResult > 0) {
|
||||
setValue(e, actualNewValue, defaultMetadata);
|
||||
|
@ -146,7 +142,10 @@ public class VersionedCallInterceptor extends CallInterceptor {
|
|||
public Object visitSizeCommand(InvocationContext ctx, SizeCommand command) throws Throwable {
|
||||
Set<Flag> flags = command.getFlags();
|
||||
int size = 0;
|
||||
AdvancedCache decoratedCache = cache.getAdvancedCache().withFlags(flags != null ? flags.toArray(new Flag[flags.size()]) : null);
|
||||
AdvancedCache decoratedCache = cache.getAdvancedCache();
|
||||
if (flags != null) {
|
||||
decoratedCache = decoratedCache.withFlags(flags.toArray(new Flag[flags.size()]));
|
||||
}
|
||||
// In non-transactional caches we don't care about context
|
||||
CloseableIterable<CacheEntry<Object, Void>> iterable = decoratedCache
|
||||
.filterEntries(VersionedEntry.EXCLUDE_EMPTY_EXTRACT_VALUE).converter(NullValueConverter.getInstance());
|
||||
|
|
|
@ -217,11 +217,7 @@ public class VersionedTest extends AbstractNonInvalidationTest {
|
|||
public void testEvictUpdateExpiration() throws Exception {
|
||||
// since the timestamp for update is based on session open/tx begin time, we have to do this sequentially
|
||||
sessionFactory().getCache().evictEntity(Item.class, itemId);
|
||||
|
||||
Map contents = Caches.entrySet(entityCache).toMap();
|
||||
assertEquals(1, contents.size());
|
||||
assertEquals(VersionedEntry.class, contents.get(itemId).getClass());
|
||||
|
||||
assertSingleEmpty();
|
||||
TIME_SERVICE.advance(1);
|
||||
|
||||
withTxSession(s -> {
|
||||
|
@ -235,6 +231,22 @@ public class VersionedTest extends AbstractNonInvalidationTest {
|
|||
assertSingleCacheEntry();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testEvictAndPutFromLoad() throws Exception {
|
||||
sessionFactory().getCache().evictEntity(Item.class, itemId);
|
||||
assertSingleEmpty();
|
||||
TIME_SERVICE.advance(1);
|
||||
|
||||
withTxSession(s -> {
|
||||
Item item = s.load(Item.class, itemId);
|
||||
assertEquals("Original item", item.getDescription());
|
||||
});
|
||||
|
||||
assertSingleCacheEntry();
|
||||
TIME_SERVICE.advance(TIMEOUT + 1);
|
||||
assertSingleCacheEntry();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCollectionUpdate() throws Exception {
|
||||
// the first insert puts VersionedEntry(null, null, timestamp), so we have to wait a while to cache the entry
|
||||
|
|
Loading…
Reference in New Issue