From 631c057d458d8c10fe59b1f9ffc4facebe23afd8 Mon Sep 17 00:00:00 2001 From: Radim Vansa Date: Fri, 31 Jul 2015 11:05:21 +0200 Subject: [PATCH] HHH-9928 Pending put leaks when the entity is not found in DB --- .../infinispan/InfinispanRegionFactory.java | 35 +++++----- .../access/PutFromLoadValidator.java | 28 ++++++-- .../PutFromLoadValidatorUnitTestCase.java | 65 +++++++++++++++++-- .../cluster/DualNodeJtaTransactionImpl.java | 16 +++-- 4 files changed, 111 insertions(+), 33 deletions(-) diff --git a/hibernate-infinispan/src/main/java/org/hibernate/cache/infinispan/InfinispanRegionFactory.java b/hibernate-infinispan/src/main/java/org/hibernate/cache/infinispan/InfinispanRegionFactory.java index fca7b120c2..d96c1270a6 100644 --- a/hibernate-infinispan/src/main/java/org/hibernate/cache/infinispan/InfinispanRegionFactory.java +++ b/hibernate-infinispan/src/main/java/org/hibernate/cache/infinispan/InfinispanRegionFactory.java @@ -188,6 +188,11 @@ public class InfinispanRegionFactory implements RegionFactory { * Name of the pending puts cache. */ public static final String PENDING_PUTS_CACHE_NAME = "pending-puts"; + // A local, lightweight cache for pending puts, which is + // non-transactional and has aggressive expiration settings. + // Locking is still required since the putFromLoad validator + // code uses conditional operations (i.e. putIfAbsent). + public static final org.infinispan.configuration.cache.Configuration PENDING_PUTS_CACHE_CONFIGURATION; private EmbeddedCacheManager manager; @@ -199,6 +204,18 @@ public class InfinispanRegionFactory implements RegionFactory { private List regionNames = new ArrayList(); + static { + ConfigurationBuilder cb = new ConfigurationBuilder(); + cb + .clustering().cacheMode(CacheMode.LOCAL) + .transaction().transactionMode(TransactionMode.NON_TRANSACTIONAL) + .expiration().maxIdle(TimeUnit.SECONDS.toMillis(60)) + .storeAsBinary().enabled(false) + .locking().isolationLevel(IsolationLevel.READ_COMMITTED) + .jmxStatistics().disable().build(); + PENDING_PUTS_CACHE_CONFIGURATION = cb.build(); + } + /** * Create a new instance using the default configuration. */ @@ -331,7 +348,7 @@ public class InfinispanRegionFactory implements RegionFactory { } } defineGenericDataTypeCacheConfigurations( properties ); - definePendingPutsCache(); + manager.defineConfiguration( PENDING_PUTS_CACHE_NAME, PENDING_PUTS_CACHE_CONFIGURATION ); } catch (CacheException ce) { throw ce; @@ -341,22 +358,6 @@ public class InfinispanRegionFactory implements RegionFactory { } } - private void definePendingPutsCache() { - final ConfigurationBuilder builder = new ConfigurationBuilder(); - // A local, lightweight cache for pending puts, which is - // non-transactional and has aggressive expiration settings. - // Locking is still required since the putFromLoad validator - // code uses conditional operations (i.e. putIfAbsent). - builder.clustering().cacheMode( CacheMode.LOCAL ) - .transaction().transactionMode( TransactionMode.NON_TRANSACTIONAL ) - .expiration().maxIdle( TimeUnit.SECONDS.toMillis( 60 ) ) - .storeAsBinary().enabled( false ) - .locking().isolationLevel( IsolationLevel.READ_COMMITTED ) - .jmxStatistics().disable(); - - manager.defineConfiguration( PENDING_PUTS_CACHE_NAME, builder.build() ); - } - protected org.infinispan.transaction.lookup.TransactionManagerLookup createTransactionManagerLookup( Settings settings, Properties properties) { return new HibernateTransactionManagerLookup( settings, properties ); diff --git a/hibernate-infinispan/src/main/java/org/hibernate/cache/infinispan/access/PutFromLoadValidator.java b/hibernate-infinispan/src/main/java/org/hibernate/cache/infinispan/access/PutFromLoadValidator.java index ea6909e430..413721ea36 100644 --- a/hibernate-infinispan/src/main/java/org/hibernate/cache/infinispan/access/PutFromLoadValidator.java +++ b/hibernate-infinispan/src/main/java/org/hibernate/cache/infinispan/access/PutFromLoadValidator.java @@ -24,6 +24,7 @@ package org.hibernate.cache.infinispan.access; import java.util.HashMap; +import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -40,6 +41,7 @@ import org.hibernate.cache.CacheException; import org.hibernate.cache.infinispan.InfinispanRegionFactory; import org.infinispan.AdvancedCache; +import org.infinispan.Cache; import org.infinispan.manager.EmbeddedCacheManager; /** @@ -126,6 +128,11 @@ public class PutFromLoadValidator { */ private volatile long invalidationTimestamp; + private static final int GC_THRESHOLD = 10; + + private final long expirationTimeout; + + /** * Creates a new put from load validator instance. * @@ -166,8 +173,10 @@ public class PutFromLoadValidator { public PutFromLoadValidator( EmbeddedCacheManager cacheManager, TransactionManager tm, long nakedPutInvalidationPeriod) { - this.pendingPuts = cacheManager - .getCache( InfinispanRegionFactory.PENDING_PUTS_CACHE_NAME ); + Cache cache = cacheManager + .getCache(InfinispanRegionFactory.PENDING_PUTS_CACHE_NAME); + this.pendingPuts = cache; + this.expirationTimeout = cache.getCacheConfiguration().expiration().maxIdle(); this.transactionManager = tm; this.nakedPutInvalidationPeriod = nakedPutInvalidationPeriod; } @@ -474,7 +483,7 @@ public class PutFromLoadValidator { *

* This class is NOT THREAD SAFE. All operations on it must be performed with the lock held. */ - private static class PendingPutMap { + private class PendingPutMap { private PendingPut singlePendingPut; private Map fullMap; private final Lock lock = new ReentrantLock(); @@ -491,6 +500,17 @@ public class PutFromLoadValidator { } else { fullMap.put( pendingPut.owner, pendingPut ); + if (fullMap.size() > GC_THRESHOLD) { + long now = System.currentTimeMillis(); + for (Iterator iterator = fullMap.values().iterator(); iterator.hasNext(); ) { + PendingPut pp = iterator.next(); + if (pp.timestamp == Long.MIN_VALUE) { + pp.timestamp = now; + } else if (now - pp.timestamp >= expirationTimeout) { + iterator.remove(); + } + } + } } } else { @@ -555,6 +575,7 @@ public class PutFromLoadValidator { private static class PendingPut { private final Object owner; private volatile boolean completed; + private long timestamp = Long.MIN_VALUE; private PendingPut(Object owner) { this.owner = owner; @@ -570,5 +591,4 @@ public class PutFromLoadValidator { timestamp = System.currentTimeMillis() + nakedPutInvalidationPeriod; } } - } diff --git a/hibernate-infinispan/src/test/java/org/hibernate/test/cache/infinispan/access/PutFromLoadValidatorUnitTestCase.java b/hibernate-infinispan/src/test/java/org/hibernate/test/cache/infinispan/access/PutFromLoadValidatorUnitTestCase.java index 1a7f0cc8b5..d364a2fd3c 100644 --- a/hibernate-infinispan/src/test/java/org/hibernate/test/cache/infinispan/access/PutFromLoadValidatorUnitTestCase.java +++ b/hibernate-infinispan/src/test/java/org/hibernate/test/cache/infinispan/access/PutFromLoadValidatorUnitTestCase.java @@ -23,6 +23,8 @@ */ package org.hibernate.test.cache.infinispan.access; +import java.lang.reflect.Method; +import java.util.Map; import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; @@ -34,8 +36,11 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import javax.transaction.TransactionManager; +import org.hibernate.cache.infinispan.InfinispanRegionFactory; +import org.infinispan.configuration.cache.ConfigurationBuilder; import org.infinispan.manager.EmbeddedCacheManager; import org.infinispan.test.CacheManagerCallable; +import org.infinispan.test.TestingUtil; import org.infinispan.test.fwk.TestCacheManagerFactory; import org.infinispan.util.logging.Log; import org.infinispan.util.logging.LogFactory; @@ -47,11 +52,7 @@ import org.hibernate.cache.infinispan.access.PutFromLoadValidator; import org.hibernate.test.cache.infinispan.functional.cluster.DualNodeJtaTransactionManagerImpl; import static org.infinispan.test.TestingUtil.withCacheManager; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; +import static org.junit.Assert.*; /** * Tests of {@link PutFromLoadValidator}. @@ -425,7 +426,7 @@ public class PutFromLoadValidatorUnitTestCase { testee.invalidateRegion(); - // Do the registration + isPutValid calls + // Do the registration isPutValid calls executor.execute(r); executor.execute(r); executor.execute(r); @@ -531,4 +532,56 @@ public class PutFromLoadValidatorUnitTestCase { } } + + @Test + public void testGetForNullReleasePuts() { + EmbeddedCacheManager cm = TestCacheManagerFactory.createCacheManager(false); + ConfigurationBuilder cb = new ConfigurationBuilder().read(InfinispanRegionFactory.PENDING_PUTS_CACHE_CONFIGURATION); + cb.expiration().maxIdle(500); + cm.defineConfiguration(InfinispanRegionFactory.PENDING_PUTS_CACHE_NAME, cb.build()); + withCacheManager(new CacheManagerCallable(cm) { + @Override + public void call() { + final PutFromLoadValidator testee = new PutFromLoadValidator(cm.getCache().getAdvancedCache()); + long lastInsert = Long.MAX_VALUE; + for (int i = 0; i < 100; i++) { + lastInsert = System.currentTimeMillis(); + try { + TestingUtil.withTx(tm, new Callable() { + @Override + public Object call() throws Exception { + testee.registerPendingPut(KEY1); + return null; + } + }); + Thread.sleep(10); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + String ppName = InfinispanRegionFactory.PENDING_PUTS_CACHE_NAME; + Map ppCache = cm.getCache(ppName, false); + assertNotNull(ppCache); + Object pendingPutMap = ppCache.get(KEY1); + long end = System.currentTimeMillis(); + if (end - lastInsert > 500) { + log.warn("Test took too long"); + return; + } + assertNotNull(pendingPutMap); + int size; + try { + Method sizeMethod = pendingPutMap.getClass().getMethod("size"); + sizeMethod.setAccessible(true); + size = (Integer) sizeMethod.invoke(pendingPutMap); + } catch (Exception e) { + throw new RuntimeException(e); + } + // some of the pending puts need to be expired by now + assertTrue(size < 100); + // but some are still registered + assertTrue(size > 0); + } + }); + } } diff --git a/hibernate-infinispan/src/test/java/org/hibernate/test/cache/infinispan/functional/cluster/DualNodeJtaTransactionImpl.java b/hibernate-infinispan/src/test/java/org/hibernate/test/cache/infinispan/functional/cluster/DualNodeJtaTransactionImpl.java index c02101263a..46a74ef484 100644 --- a/hibernate-infinispan/src/test/java/org/hibernate/test/cache/infinispan/functional/cluster/DualNodeJtaTransactionImpl.java +++ b/hibernate-infinispan/src/test/java/org/hibernate/test/cache/infinispan/functional/cluster/DualNodeJtaTransactionImpl.java @@ -79,9 +79,11 @@ public class DualNodeJtaTransactionImpl implements Transaction { } else { status = Status.STATUS_PREPARING; - for (int i = 0; i < synchronizations.size(); i++) { - Synchronization s = (Synchronization) synchronizations.get(i); - s.beforeCompletion(); + if (synchronizations != null) { + for (int i = 0; i < synchronizations.size(); i++) { + Synchronization s = (Synchronization) synchronizations.get(i); + s.beforeCompletion(); + } } if (!runXaResourcePrepare()) { @@ -106,9 +108,11 @@ public class DualNodeJtaTransactionImpl implements Transaction { status = Status.STATUS_COMMITTED; - for (int i = 0; i < synchronizations.size(); i++) { - Synchronization s = (Synchronization) synchronizations.get(i); - s.afterCompletion(status); + if (synchronizations != null) { + for (int i = 0; i < synchronizations.size(); i++) { + Synchronization s = (Synchronization) synchronizations.get(i); + s.afterCompletion(status); + } } // status = Status.STATUS_NO_TRANSACTION;