HHH-9928 Pending put leaks when the entity is not found in DB

This commit is contained in:
Radim Vansa 2015-07-31 11:05:21 +02:00
parent 3fcf745967
commit 631c057d45
4 changed files with 111 additions and 33 deletions

View File

@ -188,6 +188,11 @@ public class InfinispanRegionFactory implements RegionFactory {
* Name of the pending puts cache. * Name of the pending puts cache.
*/ */
public static final String PENDING_PUTS_CACHE_NAME = "pending-puts"; public static final String PENDING_PUTS_CACHE_NAME = "pending-puts";
// A local, lightweight cache for pending puts, which is
// non-transactional and has aggressive expiration settings.
// Locking is still required since the putFromLoad validator
// code uses conditional operations (i.e. putIfAbsent).
public static final org.infinispan.configuration.cache.Configuration PENDING_PUTS_CACHE_CONFIGURATION;
private EmbeddedCacheManager manager; private EmbeddedCacheManager manager;
@ -199,6 +204,18 @@ public class InfinispanRegionFactory implements RegionFactory {
private List<String> regionNames = new ArrayList<String>(); private List<String> regionNames = new ArrayList<String>();
static {
ConfigurationBuilder cb = new ConfigurationBuilder();
cb
.clustering().cacheMode(CacheMode.LOCAL)
.transaction().transactionMode(TransactionMode.NON_TRANSACTIONAL)
.expiration().maxIdle(TimeUnit.SECONDS.toMillis(60))
.storeAsBinary().enabled(false)
.locking().isolationLevel(IsolationLevel.READ_COMMITTED)
.jmxStatistics().disable().build();
PENDING_PUTS_CACHE_CONFIGURATION = cb.build();
}
/** /**
* Create a new instance using the default configuration. * Create a new instance using the default configuration.
*/ */
@ -331,7 +348,7 @@ public class InfinispanRegionFactory implements RegionFactory {
} }
} }
defineGenericDataTypeCacheConfigurations( properties ); defineGenericDataTypeCacheConfigurations( properties );
definePendingPutsCache(); manager.defineConfiguration( PENDING_PUTS_CACHE_NAME, PENDING_PUTS_CACHE_CONFIGURATION );
} }
catch (CacheException ce) { catch (CacheException ce) {
throw ce; throw ce;
@ -341,22 +358,6 @@ public class InfinispanRegionFactory implements RegionFactory {
} }
} }
private void definePendingPutsCache() {
final ConfigurationBuilder builder = new ConfigurationBuilder();
// A local, lightweight cache for pending puts, which is
// non-transactional and has aggressive expiration settings.
// Locking is still required since the putFromLoad validator
// code uses conditional operations (i.e. putIfAbsent).
builder.clustering().cacheMode( CacheMode.LOCAL )
.transaction().transactionMode( TransactionMode.NON_TRANSACTIONAL )
.expiration().maxIdle( TimeUnit.SECONDS.toMillis( 60 ) )
.storeAsBinary().enabled( false )
.locking().isolationLevel( IsolationLevel.READ_COMMITTED )
.jmxStatistics().disable();
manager.defineConfiguration( PENDING_PUTS_CACHE_NAME, builder.build() );
}
protected org.infinispan.transaction.lookup.TransactionManagerLookup createTransactionManagerLookup( protected org.infinispan.transaction.lookup.TransactionManagerLookup createTransactionManagerLookup(
Settings settings, Properties properties) { Settings settings, Properties properties) {
return new HibernateTransactionManagerLookup( settings, properties ); return new HibernateTransactionManagerLookup( settings, properties );

View File

@ -24,6 +24,7 @@
package org.hibernate.cache.infinispan.access; package org.hibernate.cache.infinispan.access;
import java.util.HashMap; import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -40,6 +41,7 @@ import org.hibernate.cache.CacheException;
import org.hibernate.cache.infinispan.InfinispanRegionFactory; import org.hibernate.cache.infinispan.InfinispanRegionFactory;
import org.infinispan.AdvancedCache; import org.infinispan.AdvancedCache;
import org.infinispan.Cache;
import org.infinispan.manager.EmbeddedCacheManager; import org.infinispan.manager.EmbeddedCacheManager;
/** /**
@ -126,6 +128,11 @@ public class PutFromLoadValidator {
*/ */
private volatile long invalidationTimestamp; private volatile long invalidationTimestamp;
private static final int GC_THRESHOLD = 10;
private final long expirationTimeout;
/** /**
* Creates a new put from load validator instance. * Creates a new put from load validator instance.
* *
@ -166,8 +173,10 @@ public class PutFromLoadValidator {
public PutFromLoadValidator( public PutFromLoadValidator(
EmbeddedCacheManager cacheManager, EmbeddedCacheManager cacheManager,
TransactionManager tm, long nakedPutInvalidationPeriod) { TransactionManager tm, long nakedPutInvalidationPeriod) {
this.pendingPuts = cacheManager Cache<Object, PendingPutMap> cache = cacheManager
.getCache( InfinispanRegionFactory.PENDING_PUTS_CACHE_NAME ); .getCache(InfinispanRegionFactory.PENDING_PUTS_CACHE_NAME);
this.pendingPuts = cache;
this.expirationTimeout = cache.getCacheConfiguration().expiration().maxIdle();
this.transactionManager = tm; this.transactionManager = tm;
this.nakedPutInvalidationPeriod = nakedPutInvalidationPeriod; this.nakedPutInvalidationPeriod = nakedPutInvalidationPeriod;
} }
@ -474,7 +483,7 @@ public class PutFromLoadValidator {
* <p/> * <p/>
* This class is NOT THREAD SAFE. All operations on it must be performed with the lock held. * This class is NOT THREAD SAFE. All operations on it must be performed with the lock held.
*/ */
private static class PendingPutMap { private class PendingPutMap {
private PendingPut singlePendingPut; private PendingPut singlePendingPut;
private Map<Object, PendingPut> fullMap; private Map<Object, PendingPut> fullMap;
private final Lock lock = new ReentrantLock(); private final Lock lock = new ReentrantLock();
@ -491,6 +500,17 @@ public class PutFromLoadValidator {
} }
else { else {
fullMap.put( pendingPut.owner, pendingPut ); fullMap.put( pendingPut.owner, pendingPut );
if (fullMap.size() > GC_THRESHOLD) {
long now = System.currentTimeMillis();
for (Iterator<PendingPut> iterator = fullMap.values().iterator(); iterator.hasNext(); ) {
PendingPut pp = iterator.next();
if (pp.timestamp == Long.MIN_VALUE) {
pp.timestamp = now;
} else if (now - pp.timestamp >= expirationTimeout) {
iterator.remove();
}
}
}
} }
} }
else { else {
@ -555,6 +575,7 @@ public class PutFromLoadValidator {
private static class PendingPut { private static class PendingPut {
private final Object owner; private final Object owner;
private volatile boolean completed; private volatile boolean completed;
private long timestamp = Long.MIN_VALUE;
private PendingPut(Object owner) { private PendingPut(Object owner) {
this.owner = owner; this.owner = owner;
@ -570,5 +591,4 @@ public class PutFromLoadValidator {
timestamp = System.currentTimeMillis() + nakedPutInvalidationPeriod; timestamp = System.currentTimeMillis() + nakedPutInvalidationPeriod;
} }
} }
} }

View File

@ -23,6 +23,8 @@
*/ */
package org.hibernate.test.cache.infinispan.access; package org.hibernate.test.cache.infinispan.access;
import java.lang.reflect.Method;
import java.util.Map;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
@ -34,8 +36,11 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import javax.transaction.TransactionManager; import javax.transaction.TransactionManager;
import org.hibernate.cache.infinispan.InfinispanRegionFactory;
import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.manager.EmbeddedCacheManager; import org.infinispan.manager.EmbeddedCacheManager;
import org.infinispan.test.CacheManagerCallable; import org.infinispan.test.CacheManagerCallable;
import org.infinispan.test.TestingUtil;
import org.infinispan.test.fwk.TestCacheManagerFactory; import org.infinispan.test.fwk.TestCacheManagerFactory;
import org.infinispan.util.logging.Log; import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory; import org.infinispan.util.logging.LogFactory;
@ -47,11 +52,7 @@ import org.hibernate.cache.infinispan.access.PutFromLoadValidator;
import org.hibernate.test.cache.infinispan.functional.cluster.DualNodeJtaTransactionManagerImpl; import org.hibernate.test.cache.infinispan.functional.cluster.DualNodeJtaTransactionManagerImpl;
import static org.infinispan.test.TestingUtil.withCacheManager; import static org.infinispan.test.TestingUtil.withCacheManager;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.*;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
/** /**
* Tests of {@link PutFromLoadValidator}. * Tests of {@link PutFromLoadValidator}.
@ -425,7 +426,7 @@ public class PutFromLoadValidatorUnitTestCase {
testee.invalidateRegion(); testee.invalidateRegion();
// Do the registration + isPutValid calls // Do the registration isPutValid calls
executor.execute(r); executor.execute(r);
executor.execute(r); executor.execute(r);
executor.execute(r); executor.execute(r);
@ -531,4 +532,56 @@ public class PutFromLoadValidatorUnitTestCase {
} }
} }
@Test
public void testGetForNullReleasePuts() {
EmbeddedCacheManager cm = TestCacheManagerFactory.createCacheManager(false);
ConfigurationBuilder cb = new ConfigurationBuilder().read(InfinispanRegionFactory.PENDING_PUTS_CACHE_CONFIGURATION);
cb.expiration().maxIdle(500);
cm.defineConfiguration(InfinispanRegionFactory.PENDING_PUTS_CACHE_NAME, cb.build());
withCacheManager(new CacheManagerCallable(cm) {
@Override
public void call() {
final PutFromLoadValidator testee = new PutFromLoadValidator(cm.getCache().getAdvancedCache());
long lastInsert = Long.MAX_VALUE;
for (int i = 0; i < 100; i++) {
lastInsert = System.currentTimeMillis();
try {
TestingUtil.withTx(tm, new Callable<Object>() {
@Override
public Object call() throws Exception {
testee.registerPendingPut(KEY1);
return null;
}
});
Thread.sleep(10);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
String ppName = InfinispanRegionFactory.PENDING_PUTS_CACHE_NAME;
Map ppCache = cm.getCache(ppName, false);
assertNotNull(ppCache);
Object pendingPutMap = ppCache.get(KEY1);
long end = System.currentTimeMillis();
if (end - lastInsert > 500) {
log.warn("Test took too long");
return;
}
assertNotNull(pendingPutMap);
int size;
try {
Method sizeMethod = pendingPutMap.getClass().getMethod("size");
sizeMethod.setAccessible(true);
size = (Integer) sizeMethod.invoke(pendingPutMap);
} catch (Exception e) {
throw new RuntimeException(e);
}
// some of the pending puts need to be expired by now
assertTrue(size < 100);
// but some are still registered
assertTrue(size > 0);
}
});
}
} }

View File

@ -79,10 +79,12 @@ public class DualNodeJtaTransactionImpl implements Transaction {
} else { } else {
status = Status.STATUS_PREPARING; status = Status.STATUS_PREPARING;
if (synchronizations != null) {
for (int i = 0; i < synchronizations.size(); i++) { for (int i = 0; i < synchronizations.size(); i++) {
Synchronization s = (Synchronization) synchronizations.get(i); Synchronization s = (Synchronization) synchronizations.get(i);
s.beforeCompletion(); s.beforeCompletion();
} }
}
if (!runXaResourcePrepare()) { if (!runXaResourcePrepare()) {
status = Status.STATUS_ROLLING_BACK; status = Status.STATUS_ROLLING_BACK;
@ -106,10 +108,12 @@ public class DualNodeJtaTransactionImpl implements Transaction {
status = Status.STATUS_COMMITTED; status = Status.STATUS_COMMITTED;
if (synchronizations != null) {
for (int i = 0; i < synchronizations.size(); i++) { for (int i = 0; i < synchronizations.size(); i++) {
Synchronization s = (Synchronization) synchronizations.get(i); Synchronization s = (Synchronization) synchronizations.get(i);
s.afterCompletion(status); s.afterCompletion(status);
} }
}
// status = Status.STATUS_NO_TRANSACTION; // status = Status.STATUS_NO_TRANSACTION;
jtaTransactionManager.endCurrent(this); jtaTransactionManager.endCurrent(this);