diff --git a/hibernate-infinispan/src/main/java/org/hibernate/cache/infinispan/access/PutFromLoadValidator.java b/hibernate-infinispan/src/main/java/org/hibernate/cache/infinispan/access/PutFromLoadValidator.java index 4f43650b03..f23f9abda4 100644 --- a/hibernate-infinispan/src/main/java/org/hibernate/cache/infinispan/access/PutFromLoadValidator.java +++ b/hibernate-infinispan/src/main/java/org/hibernate/cache/infinispan/access/PutFromLoadValidator.java @@ -416,7 +416,7 @@ public class PutFromLoadValidator { PendingPutMap entry = it.next(); if (entry.acquireLock(60, TimeUnit.SECONDS)) { try { - entry.invalidate(now, expirationPeriod); + entry.invalidate(now); } finally { entry.releaseLock(); @@ -548,7 +548,7 @@ public class PutFromLoadValidator { continue; } long now = System.currentTimeMillis(); - pending.invalidate(now, expirationPeriod); + pending.invalidate(now); pending.addInvalidator(lockOwner, valueForPFER, now); } finally { @@ -638,6 +638,8 @@ public class PutFromLoadValidator { * This class is NOT THREAD SAFE. All operations on it must be performed with the lock held. */ private class PendingPutMap extends Lock { + // Number of pending puts which trigger garbage collection + private static final int GC_THRESHOLD = 10; private PendingPut singlePendingPut; private Map fullMap; private final java.util.concurrent.locks.Lock lock = new ReentrantLock(); @@ -705,6 +707,9 @@ public class PutFromLoadValidator { } else { fullMap.put( pendingPut.owner, pendingPut ); + if (fullMap.size() >= GC_THRESHOLD) { + gc(); + } } } else { @@ -750,7 +755,7 @@ public class PutFromLoadValidator { lock.unlock(); } - public void invalidate(long now, long expirationPeriod) { + public void invalidate(long now) { if ( singlePendingPut != null ) { if (singlePendingPut.invalidate(now, expirationPeriod)) { singlePendingPut = null; @@ -766,6 +771,27 @@ public class PutFromLoadValidator { } } + /** + * Running {@link #gc()} is important when the key is regularly queried but it is not + * present in DB. In such case, the putFromLoad would not be called at all and we would + * leak pending puts. Cache expiration should handle the case when the pending puts + * are not accessed frequently; when these are accessed, we have to do the housekeeping + * internally to prevent unlimited growth of the map. + * The pending puts will get their timestamps when the map reaches {@link #GC_THRESHOLD} + * entries; after expiration period these will be removed completely either through + * invalidation or when we try to register next pending put. + */ + private void gc() { + assert fullMap != null; + long now = System.currentTimeMillis(); + for ( Iterator it = fullMap.values().iterator(); it.hasNext(); ) { + PendingPut pp = it.next(); + if (pp.gc(now, expirationPeriod)) { + it.remove(); + } + } + } + public void addInvalidator(Object owner, Object valueForPFER, long now) { assert owner != null; if (invalidators == null) { @@ -885,6 +911,10 @@ public class PutFromLoadValidator { public boolean invalidate(long now, long expirationPeriod) { completed = true; + return gc(now, expirationPeriod); + } + + public boolean gc(long now, long expirationPeriod) { if (registeredTimestamp == Long.MIN_VALUE) { registeredTimestamp = now; } diff --git a/hibernate-infinispan/src/test/java/org/hibernate/test/cache/infinispan/access/PutFromLoadValidatorUnitTestCase.java b/hibernate-infinispan/src/test/java/org/hibernate/test/cache/infinispan/access/PutFromLoadValidatorUnitTestCase.java index 902f7fa567..b38ffb88e8 100644 --- a/hibernate-infinispan/src/test/java/org/hibernate/test/cache/infinispan/access/PutFromLoadValidatorUnitTestCase.java +++ b/hibernate-infinispan/src/test/java/org/hibernate/test/cache/infinispan/access/PutFromLoadValidatorUnitTestCase.java @@ -8,6 +8,8 @@ package org.hibernate.test.cache.infinispan.access; import javax.transaction.TransactionManager; +import java.lang.reflect.Method; +import java.util.Map; import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; @@ -23,6 +25,8 @@ import org.hibernate.cache.infinispan.access.PutFromLoadValidator; import org.hibernate.engine.spi.SessionImplementor; import org.hibernate.test.cache.infinispan.functional.cluster.DualNodeJtaTransactionManagerImpl; import org.hibernate.test.cache.infinispan.util.InfinispanTestingSetup; +import org.hibernate.testing.TestForIssue; +import org.infinispan.configuration.cache.ConfigurationBuilder; import org.infinispan.manager.EmbeddedCacheManager; import org.infinispan.test.CacheManagerCallable; import org.infinispan.test.fwk.TestCacheManagerFactory; @@ -35,12 +39,8 @@ import org.junit.Test; import static org.infinispan.test.TestingUtil.withCacheManager; import static org.infinispan.test.TestingUtil.withTx; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; import static org.mockito.Mockito.mock; +import static org.junit.Assert.*; /** * Tests of {@link PutFromLoadValidator}. @@ -508,4 +508,58 @@ public class PutFromLoadValidatorUnitTestCase { return null; } } + + @Test + @TestForIssue(jiraKey = "HHH-9928") + public void testGetForNullReleasePuts() { + EmbeddedCacheManager cm = TestCacheManagerFactory.createCacheManager(false); + ConfigurationBuilder cb = new ConfigurationBuilder().read(InfinispanRegionFactory.PENDING_PUTS_CACHE_CONFIGURATION); + cb.expiration().maxIdle(500); + cm.defineConfiguration(InfinispanRegionFactory.PENDING_PUTS_CACHE_NAME, cb.build()); + withCacheManager(new CacheManagerCallable(cm) { + @Override + public void call() { + PutFromLoadValidator testee = new PutFromLoadValidator(cm.getCache().getAdvancedCache(), cm); + long lastInsert = Long.MAX_VALUE; + for (int i = 0; i < 100; ++i) { + lastInsert = System.currentTimeMillis(); + try { + withTx(tm, new Callable() { + @Override + public Object call() throws Exception { + SessionImplementor session = mock (SessionImplementor.class); + testee.registerPendingPut(session, KEY1, 0); + return null; + } + }); + Thread.sleep(10); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + String ppName = cm.getCache().getName() + "-" + InfinispanRegionFactory.PENDING_PUTS_CACHE_NAME; + Map ppCache = cm.getCache(ppName, false); + assertNotNull(ppCache); + Object pendingPutMap = ppCache.get(KEY1); + long end = System.currentTimeMillis(); + if (end - lastInsert > 500) { + log.warn("Test took too long"); + return; + } + assertNotNull(pendingPutMap); + int size; + try { + Method sizeMethod = pendingPutMap.getClass().getMethod("size"); + sizeMethod.setAccessible(true); + size = (Integer) sizeMethod.invoke(pendingPutMap); + } catch (Exception e) { + throw new RuntimeException(e); + } + // some of the pending puts need to be expired by now + assertTrue(size < 100); + // but some are still registered + assertTrue(size > 0); + } + }); + } }