diff --git a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/util/MemoryCacheService.java b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/util/MemoryCacheService.java index a91d4488f65..ef27a1a5b0b 100644 --- a/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/util/MemoryCacheService.java +++ b/hapi-fhir-jpaserver-base/src/main/java/ca/uhn/fhir/jpa/util/MemoryCacheService.java @@ -41,6 +41,8 @@ import java.util.Map; import java.util.concurrent.TimeUnit; import java.util.function.Function; +import static java.util.concurrent.TimeUnit.MINUTES; +import static java.util.concurrent.TimeUnit.SECONDS; import static org.apache.commons.lang3.StringUtils.isNotBlank; /** @@ -52,7 +54,7 @@ import static org.apache.commons.lang3.StringUtils.isNotBlank; public class MemoryCacheService { @Autowired - private DaoConfig myDaoConfig; + DaoConfig myDaoConfig; private EnumMap> myCaches; @@ -69,16 +71,16 @@ public class MemoryCacheService { switch (next) { case CONCEPT_TRANSLATION: case CONCEPT_TRANSLATION_REVERSE: - timeoutSeconds = myDaoConfig.getTranslationCachesExpireAfterWriteInMinutes() * 1000; + timeoutSeconds = SECONDS.convert(myDaoConfig.getTranslationCachesExpireAfterWriteInMinutes(), MINUTES); maximumSize = 10000; break; case PID_TO_FORCED_ID: case FORCED_ID_TO_PID: case MATCH_URL: - timeoutSeconds = 60; + timeoutSeconds = SECONDS.convert(1, MINUTES); maximumSize = 10000; if (myDaoConfig.isMassIngestionMode()) { - timeoutSeconds = 3000; + timeoutSeconds = SECONDS.convert(50, MINUTES); maximumSize = 100000; } break; @@ -87,12 +89,16 @@ public class MemoryCacheService { case RESOURCE_LOOKUP: case RESOURCE_CONDITIONAL_CREATE_VERSION: default: - timeoutSeconds = 60; + timeoutSeconds = SECONDS.convert(1, MINUTES); maximumSize = 10000; break; } - Cache nextCache = Caffeine.newBuilder().expireAfterWrite(timeoutSeconds, TimeUnit.MINUTES).maximumSize(maximumSize).build(); + Cache nextCache = Caffeine.newBuilder() + .expireAfterWrite(timeoutSeconds, SECONDS) + .maximumSize(maximumSize) + .build(); + myCaches.put(next, nextCache); } diff --git a/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/util/MemoryCacheServiceTest.java b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/util/MemoryCacheServiceTest.java new file mode 100644 index 00000000000..87bf06bfba6 --- /dev/null +++ b/hapi-fhir-jpaserver-base/src/test/java/ca/uhn/fhir/jpa/util/MemoryCacheServiceTest.java @@ -0,0 +1,158 @@ +package ca.uhn.fhir.jpa.util; + +import ca.uhn.fhir.jpa.api.config.DaoConfig; +import ca.uhn.fhir.jpa.model.entity.TagDefinition; +import ca.uhn.fhir.jpa.model.entity.TagTypeEnum; +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.nullValue; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; + +class MemoryCacheServiceTest { + private static final Logger ourLog = LoggerFactory.getLogger(MemoryCacheServiceTest.class); + MemoryCacheService mySvc; + + @BeforeEach + public void setUp() { + DaoConfig daoConfig = new DaoConfig(); + daoConfig.setMassIngestionMode(false); + mySvc = new MemoryCacheService(); + mySvc.myDaoConfig = daoConfig; + } + + @Test + public void simpleTagCacheRetrieve() { + String system = "http://example.com"; + TagTypeEnum type = TagTypeEnum.TAG; + String code = "t"; + + MemoryCacheService.TagDefinitionCacheKey cacheKey = new MemoryCacheService.TagDefinitionCacheKey(type, system, code); + mySvc.start(); + + TagDefinition retVal = mySvc.getIfPresent(MemoryCacheService.CacheEnum.TAG_DEFINITION, cacheKey); + assertThat(retVal, nullValue()); + + TagDefinition tagDef = new TagDefinition(type, system, code, "theLabel"); + mySvc.put(MemoryCacheService.CacheEnum.TAG_DEFINITION, cacheKey, tagDef); + + retVal = mySvc.getIfPresent(MemoryCacheService.CacheEnum.TAG_DEFINITION, cacheKey); + assertThat(retVal, equalTo(tagDef)); + } + + @Test + public void ensureCaffeineHandlesSlowReaders() throws InterruptedException, ExecutionException { + // mimic our tag cache under pathological unbounded tag usage + + // given a full cache + Cache cache = Caffeine.newBuilder().expireAfterWrite(60, TimeUnit.MINUTES).maximumSize(10000).build(); + for (int i = 30000; i < 50000; i++) { + cache.put(i, i); + } + //assertThat(cache.estimatedSize(), greaterThan(9999L)); + final int nThreads = 80; + ExecutorService executor = Executors.newFixedThreadPool(nThreads); + + // when we spill the cache, and have delayed calculation. + final boolean[] canProceed = new boolean[]{ false }; + List jobs = new ArrayList<>(); + List slowJobs = new ArrayList<>(); + List fastJobs = new ArrayList<>(); + for (int i = 0; i < 1000; i++) { + // block all but 1 of the workers with a slow job + boolean slow = i < nThreads - 1; + SlowFastJob job = new SlowFastJob(i, slow, canProceed); + if (job.mySlowFlag) { + slowJobs.add(job); + } else { + fastJobs.add(job); + } + jobs.add(job); + job.submit(executor); + } + + // wait for results to start appearing. + SlowFastJob firstFastJob = fastJobs.get(0); + SlowFastJob lastFastJob = fastJobs.get(fastJobs.size() - 1); + + firstFastJob.getOrTimeout("first future blocked by earlier waiters"); + lastFastJob.getOrTimeout("last future blocked by earlier waiters"); + + for(SlowFastJob job: fastJobs) { + assertTrue(job.future.isDone()); + } + + // blocked items not done + for(SlowFastJob job: slowJobs) { + assertFalse(job.future.isDone()); + } + + // blocked items released + canProceed[0] = true; + + for(SlowFastJob job: slowJobs) { + job.getOrTimeout("released job doesn't complete"); + } + + executor.shutdown(); + } + + static class SlowFastJob implements Callable { + final boolean mySlowFlag; + final int myValue; + final boolean[] myProceedFlag; + Future future; + + SlowFastJob(int theValue, boolean theSlowFlag, boolean[] theProceedFlag) { + this.mySlowFlag = theSlowFlag; + this.myValue = theValue; + this.myProceedFlag = theProceedFlag; + } + + @Override + public Integer call() throws Exception { + if (mySlowFlag) { + while(!myProceedFlag[0]) { + try { + Thread.sleep(100); + ourLog.debug("yawn " + myValue); + } catch (InterruptedException e) { } + } + } + return myValue; + } + + public Integer getOrTimeout(String theMessage) throws InterruptedException, ExecutionException { + try { + return future.get(100, TimeUnit.MILLISECONDS); + } catch (TimeoutException e) { + fail(theMessage); + return null; + } + } + + public void submit(ExecutorService executor) { + future = executor.submit(this); + } + } + + +}