From b31e464a905d7020470dbe6b126147ec44776251 Mon Sep 17 00:00:00 2001
From: Michael Buckley
Date: Fri, 25 Jun 2021 11:01:41 -0400
Subject: [PATCH] Found bad growing cache scenario

---
 .../fhir/jpa/util/MemoryCacheServiceTest.java | 225 ++++++++++++------
 1 file changed, 153 insertions(+), 72 deletions(-)

diff --git a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/util/MemoryCacheServiceTest.java b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/util/MemoryCacheServiceTest.java
index b3c45bdd65a..f281bdb9906 100644
--- a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/util/MemoryCacheServiceTest.java
+++ b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/util/MemoryCacheServiceTest.java
@@ -5,7 +5,10 @@ import ca.uhn.fhir.jpa.model.entity.TagDefinition;
 import ca.uhn.fhir.jpa.model.entity.TagTypeEnum;
 import com.github.benmanes.caffeine.cache.Cache;
 import com.github.benmanes.caffeine.cache.Caffeine;
+import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Nested;
 import org.junit.jupiter.api.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -19,6 +22,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.function.Predicate;
 
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.equalTo;
@@ -58,99 +62,176 @@ class MemoryCacheServiceTest {
 		assertThat(retVal, equalTo(tagDef));
 	}
 
-	@Test
-	public void ensureCaffeineHandlesSlowReaders() throws InterruptedException, ExecutionException {
-		// mimic our tag cache under pathological unbounded tag usage
+	@Nested
+	public static class CaffeineAbuseTest {
 
-		// given a full cache
-		Cache<Integer, Integer> cache = Caffeine.newBuilder().expireAfterWrite(60, TimeUnit.MINUTES).maximumSize(10000).build();
-		for (int i = 30000; i < 50000; i++) {
-			cache.put(i, i);
-		}
-		//assertThat(cache.estimatedSize(), greaterThan(9999L));
-		final int nThreads = 80;
-		ExecutorService executor = Executors.newFixedThreadPool(nThreads);
-
-		// when we spill the cache, and have delayed calculation.
+		Cache<Integer, Integer> myCache;
+		List<SlowFastJob> mySlowJobs;
+		List<SlowFastJob> myFastJobs;
+		ExecutorService myExecutor;
 		final boolean[] canProceed = new boolean[]{ false };
-		List<SlowFastJob> slowJobs = new ArrayList<>();
-		List<SlowFastJob> fastJobs = new ArrayList<>();
-		for (int i = 0; i < 1000; i++) {
-			// block all but 1 of the workers with a slow job
-			boolean slow = i < nThreads - 1;
-			SlowFastJob job = new SlowFastJob(i, slow, canProceed);
-			if (job.mySlowFlag) {
-				slowJobs.add(job);
-			} else {
-				fastJobs.add(job);
+
+		@AfterEach
+		public void tearDown() {
+			myExecutor.shutdown();
+		}
+
+		void withCacheOfSize(int theMaxSize) {
+			myCache = Caffeine.newBuilder().expireAfterWrite(60, TimeUnit.MINUTES).maximumSize(theMaxSize).build();
+		}
+
+		void fillCacheWithRange(int theStart, int theEnd) {
+			for (int i = theStart; i < theEnd; i++) {
+				myCache.put(i, i);
 			}
-			job.submit(executor);
 		}
 
-		// wait for results to start appearing.
-		SlowFastJob firstFastJob = fastJobs.get(0);
-		SlowFastJob lastFastJob = fastJobs.get(fastJobs.size() - 1);
+		@Test
+		public void fullCacheHandlesSlowReaders() throws InterruptedException, ExecutionException {
+			// mimic our tag cache under pathological unbounded tag usage
 
-		firstFastJob.getOrTimeout("first future blocked by earlier waiters");
-		lastFastJob.getOrTimeout("last future blocked by earlier waiters");
+			// given a full cache
+			withCacheOfSize(10000);
+			fillCacheWithRange(30000, 30000 + 10000);
 
-		for(SlowFastJob job: fastJobs) {
-			assertTrue(job.future.isDone());
+			final int nThreads = 40;
+			withExecutorOfSize(nThreads);
+
+			// when we spill the cache, and have delayed calculation.
+			// block all but 1 of the workers with a slow job
+			startJobs(1000, (j) -> (j < nThreads - 1));
+
+			// wait for results to start appearing.
+			SlowFastJob firstFastJob = myFastJobs.get(0);
+			SlowFastJob lastFastJob = myFastJobs.get(myFastJobs.size() - 1);
+
+			firstFastJob.getOrTimeout("first fast job blocked by earlier waiters");
+			lastFastJob.getOrTimeout("last fast job blocked by earlier waiters");
+
+			// fast jobs done
+			myFastJobs.stream().forEach(SlowFastJob::assertDone);
+			// slow jobs still blocked
+			mySlowJobs.stream().forEach(SlowFastJob::assertNotDone);
+
+			// blocked items released
+			canProceed[0] = true;
+
+			for(SlowFastJob job: mySlowJobs) {
+				job.getOrTimeout("released job doesn't complete");
+			}
 		}
 
-		// blocked items not done
-		for(SlowFastJob job: slowJobs) {
-			assertFalse(job.future.isDone());
+		/**
+		 * Identifies our problem with Caffeine.
+		 *
+		 * computeIfAbsent locks the hash node.
+		 * This is not a problem with a full cache, since it will only block colliding entries.
+		 * But it prevents growing the map, so until we hit the full 10k in Tags, it was single-threading ingestion.
+		 *
+		 */
+		@Test
+		@Disabled("Fails with empty cache")
+		public void emptyCacheHandlesSlowReaders() throws InterruptedException, ExecutionException {
+			// mimic our tag cache under pathological unbounded tag usage
+
+			// given an empty cache
+			withCacheOfSize(10000);
+
+			final int nThreads = 10;
+			withExecutorOfSize(nThreads);
+
+			// when we spill the cache, and have delayed calculation.
+			// block only a single thread
+			startJobs(1000, (j) -> (j == nThreads));
+
+			// wait for results to start appearing.
+			SlowFastJob firstFastJob = myFastJobs.get(0);
+			SlowFastJob lastFastJob = myFastJobs.get(myFastJobs.size() - 1);
+
+			firstFastJob.getOrTimeout("first fast job blocked by earlier waiters");
+			lastFastJob.getOrTimeout("last fast job blocked by earlier waiters");
+
+			// blocked items released
+			canProceed[0] = true;
+
+			for(SlowFastJob job: mySlowJobs) {
+				job.getOrTimeout("released job doesn't complete");
+			}
 		}
 
-		// blocked items released
-		canProceed[0] = true;
-
-		for(SlowFastJob job: slowJobs) {
-			job.getOrTimeout("released job doesn't complete");
+		private void startJobs(int jobCount, Predicate<Integer> slowPredicate) {
+			mySlowJobs = new ArrayList<>();
+			myFastJobs = new ArrayList<>();
+			for (int i = 0; i < jobCount; i++) {
+				boolean slow = slowPredicate.test(i);
+				//boolean slow = i == 0;
+				SlowFastJob job = new SlowFastJob(i, slow, myCache, canProceed);
+				if (job.mySlowFlag) {
+					mySlowJobs.add(job);
+				} else {
+					myFastJobs.add(job);
+				}
+				job.submit(myExecutor);
+			}
 		}
 
-		executor.shutdown();
-	}
-
-	static class SlowFastJob implements Callable<Integer> {
-		final boolean mySlowFlag;
-		final int myValue;
-		final boolean[] myProceedFlag;
-		Future<Integer> future;
-
-		SlowFastJob(int theValue, boolean theSlowFlag, boolean[] theProceedFlag) {
-			this.mySlowFlag = theSlowFlag;
-			this.myValue = theValue;
-			this.myProceedFlag = theProceedFlag;
-		}
-
-		@Override
-		public Integer call() throws Exception {
-			if (mySlowFlag) {
-				while(!myProceedFlag[0]) {
-					try {
-						Thread.sleep(100);
-						ourLog.debug("yawn " + myValue);
-					} catch (InterruptedException e) {
-						// empty
-					}
-				}
-			}
-			return myValue;
-		}
-
-		public Integer getOrTimeout(String theMessage) throws InterruptedException, ExecutionException {
-			try {
-				return future.get(100, TimeUnit.MILLISECONDS);
-			} catch (TimeoutException e) {
-				fail(theMessage);
-				return null;
-			}
-		}
-
-		public void submit(ExecutorService executor) {
-			future = executor.submit(this);
+		private void withExecutorOfSize(int nThreads) {
+			myExecutor = Executors.newFixedThreadPool(nThreads);
+		}
+
+		static class SlowFastJob implements Callable<Integer> {
+			final boolean mySlowFlag;
+			final int myValue;
+			final boolean[] myProceedFlag;
+			final Cache<Integer, Integer> myCache;
+			Future<Integer> future;
+
+			SlowFastJob(int theValue, boolean theSlowFlag, Cache<Integer, Integer> theCache, boolean[] theProceedFlag) {
+				this.mySlowFlag = theSlowFlag;
+				this.myValue = theValue;
+				this.myProceedFlag = theProceedFlag;
+				this.myCache = theCache;
+			}
+
+			@Override
+			public Integer call() throws Exception {
+				return myCache.get(myValue, i -> computeValue());
+			}
+
+			private int computeValue() {
+				if (mySlowFlag) {
+					while(!myProceedFlag[0]) {
+						try {
+							Thread.sleep(100);
+							ourLog.debug("yawn " + myValue);
+						} catch (InterruptedException e) {
+							// empty
+						}
+					}
+				}
+				return myValue;
+			}
+
+			public Integer getOrTimeout(String theMessage) throws InterruptedException, ExecutionException {
+				try {
+					return future.get(1, TimeUnit.SECONDS);
+				} catch (TimeoutException e) {
+					fail(theMessage);
+					return null;
+				}
+			}
+
+			public void submit(ExecutorService executor) {
+				future = executor.submit(this);
+			}
+
+			void assertDone() {
+				assertTrue(future.isDone());
+			}
+
+			void assertNotDone() {
+				assertFalse(future.isDone(), "job " + myValue + " not done");
+			}
 		}
 	}
 }
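
Note (not part of the patch): the Javadoc on the disabled test is the heart of the finding, so below is a minimal standalone probe of the same behaviour, assuming only Caffeine on the classpath. It parks one loader inside Cache.get(key, mappingFunction) and then times loads of unrelated keys on a second thread, so the cold-cache versus pre-filled-cache difference can be observed directly. The class name, the PREFILL flag, and the timing thresholds are illustrative and are not taken from the HAPI FHIR code; a CountDownLatch stands in for the test's boolean[] flag.

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class CaffeineColdCacheProbe {

	// Flip to true to approximate the "full cache" case that the enabled test covers.
	static final boolean PREFILL = false;

	public static void main(String[] args) throws Exception {
		Cache<Integer, Integer> cache = Caffeine.newBuilder().maximumSize(10_000).build();
		if (PREFILL) {
			for (int i = 30_000; i < 40_000; i++) {
				cache.put(i, i);
			}
		}

		CountDownLatch release = new CountDownLatch(1);
		ExecutorService executor = Executors.newFixedThreadPool(2);

		// One "slow" loader parks inside the mapping function, holding whatever
		// internal lock the underlying map takes while computing the entry.
		executor.submit(() -> cache.get(-1, key -> {
			try {
				release.await();
			} catch (InterruptedException e) {
				Thread.currentThread().interrupt();
			}
			return key;
		}));
		Thread.sleep(200); // give the slow loader time to park

		// Many "fast" loads of unrelated keys on a second thread. On a cold cache
		// these can queue behind the parked loader while the map is still growing;
		// on a pre-filled cache they should complete immediately.
		Future<Long> fastLoads = executor.submit(() -> {
			long start = System.nanoTime();
			for (int i = 0; i < 1_000; i++) {
				cache.get(i, key -> key);
			}
			return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
		});

		try {
			System.out.println("1000 fast loads took " + fastLoads.get(2, TimeUnit.SECONDS) + " ms");
		} catch (TimeoutException e) {
			System.out.println("fast loads still blocked behind the parked loader after 2s");
		}

		release.countDown();
		executor.shutdown();
	}
}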