Found bad growing cache scenario

This commit is contained in:
Michael Buckley 2021-06-25 11:01:41 -04:00
parent 1367e61c93
commit b31e464a90
1 changed files with 153 additions and 72 deletions

View File

@ -5,7 +5,10 @@ import ca.uhn.fhir.jpa.model.entity.TagDefinition;
import ca.uhn.fhir.jpa.model.entity.TagTypeEnum;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -19,6 +22,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Predicate;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
@ -58,75 +62,143 @@ class MemoryCacheServiceTest {
assertThat(retVal, equalTo(tagDef));
}
@Nested
public static class CaffeineAbuseTest {
Cache<Integer, Integer> myCache;
List<SlowFastJob> mySlowJobs;
List<SlowFastJob> myFastJobs;
ExecutorService myExecutor;
final boolean[] canProceed = new boolean[]{ false };
@AfterEach
public void tearDown() {
myExecutor.shutdown();
}
void withCacheOfSize(int theMaxSize) {
myCache = Caffeine.newBuilder().expireAfterWrite(60, TimeUnit.MINUTES).maximumSize(theMaxSize).build();
}
void fillCacheWithRange(int theStart, int theEnd) {
for (int i = theStart; i < theEnd; i++) {
myCache.put(i, i);
}
}
@Test
public void ensureCaffeineHandlesSlowReaders() throws InterruptedException, ExecutionException {
public void fullCacheHandlesSlowReaders() throws InterruptedException, ExecutionException {
// mimic our tag cache under pathological unbounded tag usage
// given a full cache
Cache<Integer, Integer> cache = Caffeine.newBuilder().expireAfterWrite(60, TimeUnit.MINUTES).maximumSize(10000).build();
for (int i = 30000; i < 50000; i++) {
cache.put(i, i);
}
//assertThat(cache.estimatedSize(), greaterThan(9999L));
final int nThreads = 80;
ExecutorService executor = Executors.newFixedThreadPool(nThreads);
withCacheOfSize(10000);
fillCacheWithRange(30000, 30000 + 10000);
final int nThreads = 40;
withExecutorOfSize(nThreads);
// when we spill the cache, and have delayed calculation.
final boolean[] canProceed = new boolean[]{ false };
List<SlowFastJob> slowJobs = new ArrayList<>();
List<SlowFastJob> fastJobs = new ArrayList<>();
for (int i = 0; i < 1000; i++) {
// block all but 1 of the workers with a slow job
boolean slow = i < nThreads - 1;
SlowFastJob job = new SlowFastJob(i, slow, canProceed);
if (job.mySlowFlag) {
slowJobs.add(job);
} else {
fastJobs.add(job);
}
job.submit(executor);
}
startJobs(1000, (j) -> (j < nThreads - 1));
// wait for results to start appearing.
SlowFastJob firstFastJob = fastJobs.get(0);
SlowFastJob lastFastJob = fastJobs.get(fastJobs.size() - 1);
SlowFastJob firstFastJob = myFastJobs.get(0);
SlowFastJob lastFastJob = myFastJobs.get(myFastJobs.size() - 1);
firstFastJob.getOrTimeout("first future blocked by earlier waiters");
lastFastJob.getOrTimeout("last future blocked by earlier waiters");
firstFastJob.getOrTimeout("first fast job blocked by earlier waiters");
lastFastJob.getOrTimeout("last fast job blocked by earlier waiters");
for(SlowFastJob job: fastJobs) {
assertTrue(job.future.isDone());
}
// blocked items not done
for(SlowFastJob job: slowJobs) {
assertFalse(job.future.isDone());
}
// fast jobs done
myFastJobs.stream().forEach(SlowFastJob::assertDone);
// slow jobs still blocked
mySlowJobs.stream().forEach(SlowFastJob::assertNotDone);
// blocked items released
canProceed[0] = true;
for(SlowFastJob job: slowJobs) {
for(SlowFastJob job: mySlowJobs) {
job.getOrTimeout("released job doesn't complete");
}
}
executor.shutdown();
/**
* Identifies our problem with Caffeine.
*
* computeIfAbsent locks the hash node.
* This is not a problem with a full cache, since it will only block colliding entries.
* But it prevents growing the map, so until we hit the full 10k in Tags, it was single-threading ingestion.
*
*/
@Test
@Disabled("Fails with empty cache")
public void emptyCacheHandlesSlowReaders() throws InterruptedException, ExecutionException {
// mimic our tag cache under pathological unbounded tag usage
// given an empty cache
withCacheOfSize(10000);
final int nThreads = 10;
withExecutorOfSize(nThreads);
// when we spill the cache, and have delayed calculation.
// block only a single thread
startJobs(1000, (j) -> (j == nThreads));
// wait for results to start appearing.
SlowFastJob firstFastJob = myFastJobs.get(0);
SlowFastJob lastFastJob = myFastJobs.get(myFastJobs.size() - 1);
firstFastJob.getOrTimeout("first fast job blocked by earlier waiters");
lastFastJob.getOrTimeout("last fast job blocked by earlier waiters");
// blocked items released
canProceed[0] = true;
for(SlowFastJob job: mySlowJobs) {
job.getOrTimeout("released job doesn't complete");
}
}
private void startJobs(int jobCount, Predicate<Integer> slowPredicate) {
mySlowJobs = new ArrayList<>();
myFastJobs = new ArrayList<>();
for (int i = 0; i < jobCount; i++) {
boolean slow = slowPredicate.test(i);
//boolean slow = i == 0;
SlowFastJob job = new SlowFastJob(i, slow, myCache, canProceed);
if (job.mySlowFlag) {
mySlowJobs.add(job);
} else {
myFastJobs.add(job);
}
job.submit(myExecutor);
}
}
private void withExecutorOfSize(int nThreads) {
myExecutor = Executors.newFixedThreadPool(nThreads);
}
static class SlowFastJob implements Callable<Integer> {
final boolean mySlowFlag;
final int myValue;
final boolean[] myProceedFlag;
final Cache<Integer, Integer> myCache;
Future<Integer> future;
SlowFastJob(int theValue, boolean theSlowFlag, boolean[] theProceedFlag) {
SlowFastJob(int theValue, boolean theSlowFlag, Cache<Integer, Integer> theCache, boolean[] theProceedFlag) {
this.mySlowFlag = theSlowFlag;
this.myValue = theValue;
this.myProceedFlag = theProceedFlag;
this.myCache = theCache;
}
@Override
public Integer call() throws Exception {
return myCache.get(myValue, i -> computeValue());
}
private int computeValue() {
if (mySlowFlag) {
while(!myProceedFlag[0]) {
try {
@ -142,7 +214,7 @@ class MemoryCacheServiceTest {
public Integer getOrTimeout(String theMessage) throws InterruptedException, ExecutionException {
try {
return future.get(100, TimeUnit.MILLISECONDS);
return future.get(1, TimeUnit.SECONDS);
} catch (TimeoutException e) {
fail(theMessage);
return null;
@ -152,6 +224,15 @@ class MemoryCacheServiceTest {
public void submit(ExecutorService executor) {
future = executor.submit(this);
}
void assertDone() {
assertTrue(future.isDone());
}
void assertNotDone() {
assertFalse(future.isDone(), "job " + myValue + " not done");
}
}
}