SOLR-10141: add test for underlying cache

This commit is contained in:
yonik 2017-02-17 21:21:05 -05:00
parent 6804f36942
commit 33e398c021
1 changed files with 121 additions and 6 deletions

View File

@ -21,11 +21,11 @@ import java.util.Random;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import com.github.benmanes.caffeine.cache.*;
import org.apache.lucene.util.LuceneTestCase; import org.apache.lucene.util.LuceneTestCase;
import org.junit.Test; import org.junit.Test;
public class BlockCacheTest extends LuceneTestCase { public class BlockCacheTest extends LuceneTestCase {
@Test @Test
public void testBlockCache() { public void testBlockCache() {
int blocksInTest = 2000000; int blocksInTest = 2000000;
@ -104,11 +104,11 @@ public class BlockCacheTest extends LuceneTestCase {
final long totalMemory = 2 * slabSize; // 2 slabs of memory, so only half of what is needed for all blocks final long totalMemory = 2 * slabSize; // 2 slabs of memory, so only half of what is needed for all blocks
/*** /***
final int blocksInTest = 16384; // pick something bigger than 256, since that would lead to a slab size of 64 blocks and the bitset locks would consist of a single word. final int blocksInTest = 16384; // pick something bigger than 256, since that would lead to a slab size of 64 blocks and the bitset locks would consist of a single word.
final int blockSize = 1024; final int blockSize = 1024;
final int slabSize = blocksInTest * blockSize / 4; final int slabSize = blocksInTest * blockSize / 4;
final long totalMemory = 2 * slabSize; // 2 slabs of memory, so only half of what is needed for all blocks final long totalMemory = 2 * slabSize; // 2 slabs of memory, so only half of what is needed for all blocks
***/ ***/
final int nThreads=64; final int nThreads=64;
final int nReads=1000000; final int nReads=1000000;
@ -219,4 +219,119 @@ public class BlockCacheTest extends LuceneTestCase {
assertFalse( failed.get() ); assertFalse( failed.get() );
} }
static class Val {
long key;
AtomicBoolean live = new AtomicBoolean(true);
}
// Sanity test the underlying concurrent map that BlockCache is using, in the same way that we use it.
@Test
@AwaitsFix(bugUrl = "https://issues.apache.org/jira/browse/SOLR-10141")
public void testCacheConcurrent() throws Exception {
Random rnd = random();
final int blocksInTest = 400;
final int maxEntries = blocksInTest/2;
final int nThreads=64;
final int nReads=10000000;
final int readsPerThread=nReads/nThreads;
final int readLastBlockOdds=10; // odds (1 in N) of the next block operation being on the same block as the previous operation... helps flush concurrency issues
final boolean updateAnyway = true; // sometimes insert a new entry for the key even if one was found
final AtomicLong hits = new AtomicLong();
final AtomicLong removals = new AtomicLong();
final AtomicLong inserts = new AtomicLong();
RemovalListener<Long,Val> listener = (k, v, removalCause) -> {
assert v.key == k;
if (!v.live.compareAndSet(true, false)) {
throw new RuntimeException("listener called more than once! k=" + k + " v=" + v + " removalCause=" + removalCause);
// return; // use this variant if listeners may be called more than once
}
removals.incrementAndGet();
};
com.github.benmanes.caffeine.cache.Cache<Long,Val> cache = Caffeine.newBuilder()
.removalListener(listener)
.maximumSize(maxEntries)
.executor(Runnable::run)
.build();
final AtomicBoolean failed = new AtomicBoolean(false);
final AtomicLong lastBlock = new AtomicLong();
final AtomicLong maxObservedSize = new AtomicLong();
Thread[] threads = new Thread[nThreads];
for (int i=0; i<threads.length; i++) {
final long seed = rnd.nextLong();
threads[i] = new Thread() {
Random r;
@Override
public void run() {
try {
r = new Random(seed);
test(readsPerThread);
} catch (Throwable e) {
failed.set(true);
e.printStackTrace();
}
}
public void test(int iter) {
for (int i=0; i<iter; i++) {
test();
}
}
public void test() {
long block = r.nextInt(blocksInTest);
if (readLastBlockOdds > 0 && r.nextInt(readLastBlockOdds) == 0) block = lastBlock.get(); // some percent of the time, try to read the last block another thread was just reading/writing
lastBlock.set(block);
Long k = block;
Val v = cache.getIfPresent(k);
if (v != null) {
hits.incrementAndGet();
assert k.equals(v.key);
}
if (v == null || updateAnyway && r.nextBoolean()) {
v = new Val();
v.key = k;
cache.put(k, v);
inserts.incrementAndGet();
}
long sz = cache.asMap().size();
if (sz > maxObservedSize.get()) maxObservedSize.set(sz); // race condition here, but an estimate is OK
}
};
}
for (Thread thread : threads) {
thread.start();
}
for (Thread thread : threads) {
thread.join();
}
// Thread.sleep(1000); // need to wait if executor is used for listener?
long cacheSize = cache.asMap().size();
System.out.println("Done! # of Elements = " + cacheSize + " inserts=" + inserts.get() + " removals=" + removals.get() + " hits=" + hits.get() + " maxObservedSize=" + maxObservedSize);
assert inserts.get() - removals.get() == cacheSize;
assertFalse( failed.get() );
}
} }