HHH-9928 Pending put leaks when the entity is not found in DB
This commit is contained in:
parent
93d39fa470
commit
6191489ca3
|
@ -416,7 +416,7 @@ public class PutFromLoadValidator {
|
||||||
PendingPutMap entry = it.next();
|
PendingPutMap entry = it.next();
|
||||||
if (entry.acquireLock(60, TimeUnit.SECONDS)) {
|
if (entry.acquireLock(60, TimeUnit.SECONDS)) {
|
||||||
try {
|
try {
|
||||||
entry.invalidate(now, expirationPeriod);
|
entry.invalidate(now);
|
||||||
}
|
}
|
||||||
finally {
|
finally {
|
||||||
entry.releaseLock();
|
entry.releaseLock();
|
||||||
|
@ -548,7 +548,7 @@ public class PutFromLoadValidator {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
long now = System.currentTimeMillis();
|
long now = System.currentTimeMillis();
|
||||||
pending.invalidate(now, expirationPeriod);
|
pending.invalidate(now);
|
||||||
pending.addInvalidator(lockOwner, valueForPFER, now);
|
pending.addInvalidator(lockOwner, valueForPFER, now);
|
||||||
}
|
}
|
||||||
finally {
|
finally {
|
||||||
|
@ -638,6 +638,8 @@ public class PutFromLoadValidator {
|
||||||
* This class is NOT THREAD SAFE. All operations on it must be performed with the lock held.
|
* This class is NOT THREAD SAFE. All operations on it must be performed with the lock held.
|
||||||
*/
|
*/
|
||||||
private class PendingPutMap extends Lock {
|
private class PendingPutMap extends Lock {
|
||||||
|
// Number of pending puts which trigger garbage collection
|
||||||
|
private static final int GC_THRESHOLD = 10;
|
||||||
private PendingPut singlePendingPut;
|
private PendingPut singlePendingPut;
|
||||||
private Map<Object, PendingPut> fullMap;
|
private Map<Object, PendingPut> fullMap;
|
||||||
private final java.util.concurrent.locks.Lock lock = new ReentrantLock();
|
private final java.util.concurrent.locks.Lock lock = new ReentrantLock();
|
||||||
|
@ -705,6 +707,9 @@ public class PutFromLoadValidator {
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
fullMap.put( pendingPut.owner, pendingPut );
|
fullMap.put( pendingPut.owner, pendingPut );
|
||||||
|
if (fullMap.size() >= GC_THRESHOLD) {
|
||||||
|
gc();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
|
@ -750,7 +755,7 @@ public class PutFromLoadValidator {
|
||||||
lock.unlock();
|
lock.unlock();
|
||||||
}
|
}
|
||||||
|
|
||||||
public void invalidate(long now, long expirationPeriod) {
|
public void invalidate(long now) {
|
||||||
if ( singlePendingPut != null ) {
|
if ( singlePendingPut != null ) {
|
||||||
if (singlePendingPut.invalidate(now, expirationPeriod)) {
|
if (singlePendingPut.invalidate(now, expirationPeriod)) {
|
||||||
singlePendingPut = null;
|
singlePendingPut = null;
|
||||||
|
@ -766,6 +771,27 @@ public class PutFromLoadValidator {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Running {@link #gc()} is important when the key is regularly queried but it is not
|
||||||
|
* present in DB. In such case, the putFromLoad would not be called at all and we would
|
||||||
|
* leak pending puts. Cache expiration should handle the case when the pending puts
|
||||||
|
* are not accessed frequently; when these are accessed, we have to do the housekeeping
|
||||||
|
* internally to prevent unlimited growth of the map.
|
||||||
|
* The pending puts will get their timestamps when the map reaches {@link #GC_THRESHOLD}
|
||||||
|
* entries; after expiration period these will be removed completely either through
|
||||||
|
* invalidation or when we try to register next pending put.
|
||||||
|
*/
|
||||||
|
private void gc() {
|
||||||
|
assert fullMap != null;
|
||||||
|
long now = System.currentTimeMillis();
|
||||||
|
for ( Iterator<PendingPut> it = fullMap.values().iterator(); it.hasNext(); ) {
|
||||||
|
PendingPut pp = it.next();
|
||||||
|
if (pp.gc(now, expirationPeriod)) {
|
||||||
|
it.remove();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
public void addInvalidator(Object owner, Object valueForPFER, long now) {
|
public void addInvalidator(Object owner, Object valueForPFER, long now) {
|
||||||
assert owner != null;
|
assert owner != null;
|
||||||
if (invalidators == null) {
|
if (invalidators == null) {
|
||||||
|
@ -885,6 +911,10 @@ public class PutFromLoadValidator {
|
||||||
|
|
||||||
public boolean invalidate(long now, long expirationPeriod) {
|
public boolean invalidate(long now, long expirationPeriod) {
|
||||||
completed = true;
|
completed = true;
|
||||||
|
return gc(now, expirationPeriod);
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean gc(long now, long expirationPeriod) {
|
||||||
if (registeredTimestamp == Long.MIN_VALUE) {
|
if (registeredTimestamp == Long.MIN_VALUE) {
|
||||||
registeredTimestamp = now;
|
registeredTimestamp = now;
|
||||||
}
|
}
|
||||||
|
|
|
@ -8,6 +8,8 @@ package org.hibernate.test.cache.infinispan.access;
|
||||||
|
|
||||||
import javax.transaction.TransactionManager;
|
import javax.transaction.TransactionManager;
|
||||||
|
|
||||||
|
import java.lang.reflect.Method;
|
||||||
|
import java.util.Map;
|
||||||
import java.util.concurrent.Callable;
|
import java.util.concurrent.Callable;
|
||||||
import java.util.concurrent.CountDownLatch;
|
import java.util.concurrent.CountDownLatch;
|
||||||
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.ExecutorService;
|
||||||
|
@ -23,6 +25,8 @@ import org.hibernate.cache.infinispan.access.PutFromLoadValidator;
|
||||||
import org.hibernate.engine.spi.SessionImplementor;
|
import org.hibernate.engine.spi.SessionImplementor;
|
||||||
import org.hibernate.test.cache.infinispan.functional.cluster.DualNodeJtaTransactionManagerImpl;
|
import org.hibernate.test.cache.infinispan.functional.cluster.DualNodeJtaTransactionManagerImpl;
|
||||||
import org.hibernate.test.cache.infinispan.util.InfinispanTestingSetup;
|
import org.hibernate.test.cache.infinispan.util.InfinispanTestingSetup;
|
||||||
|
import org.hibernate.testing.TestForIssue;
|
||||||
|
import org.infinispan.configuration.cache.ConfigurationBuilder;
|
||||||
import org.infinispan.manager.EmbeddedCacheManager;
|
import org.infinispan.manager.EmbeddedCacheManager;
|
||||||
import org.infinispan.test.CacheManagerCallable;
|
import org.infinispan.test.CacheManagerCallable;
|
||||||
import org.infinispan.test.fwk.TestCacheManagerFactory;
|
import org.infinispan.test.fwk.TestCacheManagerFactory;
|
||||||
|
@ -35,12 +39,8 @@ import org.junit.Test;
|
||||||
|
|
||||||
import static org.infinispan.test.TestingUtil.withCacheManager;
|
import static org.infinispan.test.TestingUtil.withCacheManager;
|
||||||
import static org.infinispan.test.TestingUtil.withTx;
|
import static org.infinispan.test.TestingUtil.withTx;
|
||||||
import static org.junit.Assert.assertEquals;
|
|
||||||
import static org.junit.Assert.assertNotNull;
|
|
||||||
import static org.junit.Assert.assertNull;
|
|
||||||
import static org.junit.Assert.assertTrue;
|
|
||||||
import static org.junit.Assert.fail;
|
|
||||||
import static org.mockito.Mockito.mock;
|
import static org.mockito.Mockito.mock;
|
||||||
|
import static org.junit.Assert.*;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Tests of {@link PutFromLoadValidator}.
|
* Tests of {@link PutFromLoadValidator}.
|
||||||
|
@ -508,4 +508,58 @@ public class PutFromLoadValidatorUnitTestCase {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
@TestForIssue(jiraKey = "HHH-9928")
|
||||||
|
public void testGetForNullReleasePuts() {
|
||||||
|
EmbeddedCacheManager cm = TestCacheManagerFactory.createCacheManager(false);
|
||||||
|
ConfigurationBuilder cb = new ConfigurationBuilder().read(InfinispanRegionFactory.PENDING_PUTS_CACHE_CONFIGURATION);
|
||||||
|
cb.expiration().maxIdle(500);
|
||||||
|
cm.defineConfiguration(InfinispanRegionFactory.PENDING_PUTS_CACHE_NAME, cb.build());
|
||||||
|
withCacheManager(new CacheManagerCallable(cm) {
|
||||||
|
@Override
|
||||||
|
public void call() {
|
||||||
|
PutFromLoadValidator testee = new PutFromLoadValidator(cm.getCache().getAdvancedCache(), cm);
|
||||||
|
long lastInsert = Long.MAX_VALUE;
|
||||||
|
for (int i = 0; i < 100; ++i) {
|
||||||
|
lastInsert = System.currentTimeMillis();
|
||||||
|
try {
|
||||||
|
withTx(tm, new Callable<Object>() {
|
||||||
|
@Override
|
||||||
|
public Object call() throws Exception {
|
||||||
|
SessionImplementor session = mock (SessionImplementor.class);
|
||||||
|
testee.registerPendingPut(session, KEY1, 0);
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
Thread.sleep(10);
|
||||||
|
} catch (Exception e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
String ppName = cm.getCache().getName() + "-" + InfinispanRegionFactory.PENDING_PUTS_CACHE_NAME;
|
||||||
|
Map ppCache = cm.getCache(ppName, false);
|
||||||
|
assertNotNull(ppCache);
|
||||||
|
Object pendingPutMap = ppCache.get(KEY1);
|
||||||
|
long end = System.currentTimeMillis();
|
||||||
|
if (end - lastInsert > 500) {
|
||||||
|
log.warn("Test took too long");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
assertNotNull(pendingPutMap);
|
||||||
|
int size;
|
||||||
|
try {
|
||||||
|
Method sizeMethod = pendingPutMap.getClass().getMethod("size");
|
||||||
|
sizeMethod.setAccessible(true);
|
||||||
|
size = (Integer) sizeMethod.invoke(pendingPutMap);
|
||||||
|
} catch (Exception e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
// some of the pending puts need to be expired by now
|
||||||
|
assertTrue(size < 100);
|
||||||
|
// but some are still registered
|
||||||
|
assertTrue(size > 0);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue