LUCENE-8213: Asynchronous Caching in LRUQueryCache (#916)

* LUCENE-8213: Introduce Asynchronous Caching in LRUQueryCache
This commit is contained in:
Atri Sharma 2019-11-26 12:02:03 +05:30 committed by GitHub
parent 6c7a095f33
commit 2d98a619dd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 630 additions and 82 deletions

View File

@ -55,6 +55,9 @@ Improvements
* LUCENE-8984: MoreLikeThis MLT is biased for uncommon fields (Andy Hind via Anshum Gupta) * LUCENE-8984: MoreLikeThis MLT is biased for uncommon fields (Andy Hind via Anshum Gupta)
* LUCENE-8213: LRUQueryCache#doCache now uses IndexSearcher's Executor (if present)
to asynchronously cache heavy queries (Atri Sharma)
Bug fixes Bug fixes
* LUCENE-8663: NRTCachingDirectory.slowFileExists may open a file while * LUCENE-8663: NRTCachingDirectory.slowFileExists may open a file while

View File

@ -181,7 +181,13 @@ public class IndexSearcher {
} }
/** Runs searches for each segment separately, using the /** Runs searches for each segment separately, using the
* provided Executor. NOTE: * provided Executor. The passed in Executor will also be
* used by LRUQueryCache (if enabled) to perform asynchronous
* query caching.
* If a task is rejected by the host Executor, the failed task
* will then be executed on the caller thread. This is done to
* ensure that a query succeeds, albeit with a higher latency.
* NOTE:
* if you are using {@link NIOFSDirectory}, do not use * if you are using {@link NIOFSDirectory}, do not use
* the shutdownNow method of ExecutorService as this uses * the shutdownNow method of ExecutorService as this uses
* Thread.interrupt under-the-hood which can silently * Thread.interrupt under-the-hood which can silently
@ -843,7 +849,7 @@ public class IndexSearcher {
final QueryCache queryCache = this.queryCache; final QueryCache queryCache = this.queryCache;
Weight weight = query.createWeight(this, scoreMode, boost); Weight weight = query.createWeight(this, scoreMode, boost);
if (scoreMode.needsScores() == false && queryCache != null) { if (scoreMode.needsScores() == false && queryCache != null) {
weight = queryCache.doCache(weight, queryCachingPolicy); weight = queryCache.doCache(weight, queryCachingPolicy, executor);
} }
return weight; return weight;
} }

View File

@ -28,6 +28,9 @@ import java.util.LinkedHashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Predicate; import java.util.function.Predicate;
@ -88,7 +91,6 @@ import static org.apache.lucene.util.RamUsageEstimator.QUERY_DEFAULT_RAM_BYTES_U
* @lucene.experimental * @lucene.experimental
*/ */
public class LRUQueryCache implements QueryCache, Accountable { public class LRUQueryCache implements QueryCache, Accountable {
private final int maxSize; private final int maxSize;
private final long maxRamBytesUsed; private final long maxRamBytesUsed;
private final Predicate<LeafReaderContext> leavesToCache; private final Predicate<LeafReaderContext> leavesToCache;
@ -271,6 +273,7 @@ public class LRUQueryCache implements QueryCache, Accountable {
assert lock.isHeldByCurrentThread(); assert lock.isHeldByCurrentThread();
assert key instanceof BoostQuery == false; assert key instanceof BoostQuery == false;
assert key instanceof ConstantScoreQuery == false; assert key instanceof ConstantScoreQuery == false;
final IndexReader.CacheKey readerKey = cacheHelper.getKey(); final IndexReader.CacheKey readerKey = cacheHelper.getKey();
final LeafCache leafCache = cache.get(readerKey); final LeafCache leafCache = cache.get(readerKey);
if (leafCache == null) { if (leafCache == null) {
@ -404,6 +407,15 @@ public class LRUQueryCache implements QueryCache, Accountable {
} }
} }
/**
 * Strips any {@code CachingWrapperWeight} layers from the given weight and
 * returns the underlying, uncached weight.
 */
private Weight getOriginalWeight(Weight weight) {
  // Recursively unwrap until the weight is no longer cache-wrapped.
  return weight instanceof CachingWrapperWeight
      ? getOriginalWeight(((CachingWrapperWeight) weight).in)
      : weight;
}
// pkg-private for testing // pkg-private for testing
void assertConsistent() { void assertConsistent() {
lock.lock(); lock.lock();
@ -458,12 +470,10 @@ public class LRUQueryCache implements QueryCache, Accountable {
} }
@Override @Override
public Weight doCache(Weight weight, QueryCachingPolicy policy) { public Weight doCache(final Weight weight, QueryCachingPolicy policy, Executor executor) {
while (weight instanceof CachingWrapperWeight) { Weight originalWeight = getOriginalWeight(weight);
weight = ((CachingWrapperWeight) weight).in;
}
return new CachingWrapperWeight(weight, policy); return new CachingWrapperWeight(originalWeight, policy, executor);
} }
@Override @Override
@ -665,10 +675,13 @@ public class LRUQueryCache implements QueryCache, Accountable {
// threads when IndexSearcher is created with threads // threads when IndexSearcher is created with threads
private final AtomicBoolean used; private final AtomicBoolean used;
CachingWrapperWeight(Weight in, QueryCachingPolicy policy) { private final Executor executor;
CachingWrapperWeight(Weight in, QueryCachingPolicy policy, Executor executor) {
super(in.getQuery(), 1f); super(in.getQuery(), 1f);
this.in = in; this.in = in;
this.policy = policy; this.policy = policy;
this.executor = executor;
used = new AtomicBoolean(false); used = new AtomicBoolean(false);
} }
@ -756,6 +769,17 @@ public class LRUQueryCache implements QueryCache, Accountable {
return supplier.get(leadCost); return supplier.get(leadCost);
} }
boolean cacheSynchronously = executor == null;
if (cacheSynchronously == false) {
boolean asyncCachingSucceeded = cacheAsynchronously(context, cacheHelper);
// If async caching failed, synchronous caching will
// be performed, hence do not return the uncached value
if (asyncCachingSucceeded) {
return supplier.get(leadCost);
}
}
Scorer scorer = supplier.get(Long.MAX_VALUE); Scorer scorer = supplier.get(Long.MAX_VALUE);
DocIdSet docIdSet = cacheImpl(new DefaultBulkScorer(scorer), context.reader().maxDoc()); DocIdSet docIdSet = cacheImpl(new DefaultBulkScorer(scorer), context.reader().maxDoc());
putIfAbsent(in.getQuery(), docIdSet, cacheHelper); putIfAbsent(in.getQuery(), docIdSet, cacheHelper);
@ -852,6 +876,19 @@ public class LRUQueryCache implements QueryCache, Accountable {
if (docIdSet == null) { if (docIdSet == null) {
if (policy.shouldCache(in.getQuery())) { if (policy.shouldCache(in.getQuery())) {
boolean cacheSynchronously = executor == null;
// If asynchronous caching is requested, perform the same and return
// the uncached iterator
if (cacheSynchronously == false) {
boolean asyncCachingSucceeded = cacheAsynchronously(context, cacheHelper);
// If async caching failed, we will perform synchronous caching
// hence do not return the uncached value here
if (asyncCachingSucceeded) {
return in.bulkScorer(context);
}
}
docIdSet = cache(context); docIdSet = cache(context);
putIfAbsent(in.getQuery(), docIdSet, cacheHelper); putIfAbsent(in.getQuery(), docIdSet, cacheHelper);
} else { } else {
@ -871,5 +908,30 @@ public class LRUQueryCache implements QueryCache, Accountable {
return new DefaultBulkScorer(new ConstantScoreScorer(this, 0f, ScoreMode.COMPLETE_NO_SCORES, disi)); return new DefaultBulkScorer(new ConstantScoreScorer(this, 0f, ScoreMode.COMPLETE_NO_SCORES, disi));
} }
/**
 * Attempts to perform the cache load on the configured executor instead of
 * the caller thread.
 *
 * @return {@code true} if the executor accepted the task, {@code false} if it
 *         rejected it — in which case the caller falls back to synchronous
 *         caching.
 */
private boolean cacheAsynchronously(LeafReaderContext context, IndexReader.CacheHelper cacheHelper) {
  final FutureTask<Void> task = new FutureTask<>(() -> {
    // Skip the load entirely if the reader is already being closed.
    if (context.reader().tryIncRef() == false) {
      return null;
    }
    try {
      putIfAbsent(in.getQuery(), cache(context), cacheHelper);
    } finally {
      context.reader().decRef();
    }
    return null;
  });
  try {
    executor.execute(task);
    return true;
  } catch (RejectedExecutionException e) {
    // The executor refused the task; signal the caller to cache synchronously.
    return false;
  }
}
} }
} }

View File

@ -17,6 +17,8 @@
package org.apache.lucene.search; package org.apache.lucene.search;
import java.util.concurrent.Executor;
/** /**
* A cache for queries. * A cache for queries.
* *
@ -30,7 +32,8 @@ public interface QueryCache {
* matching docs per-segment accordingly to the given <code>policy</code>. * matching docs per-segment accordingly to the given <code>policy</code>.
* NOTE: The returned weight will only be equivalent if scores are not needed. * NOTE: The returned weight will only be equivalent if scores are not needed.
* @see Collector#scoreMode() * @see Collector#scoreMode()
* If the Executor is not null, it will be used to perform asynchronous caching
*/ */
Weight doCache(Weight weight, QueryCachingPolicy policy); Weight doCache(Weight weight, QueryCachingPolicy policy, Executor executor);
} }

View File

@ -24,6 +24,7 @@ import java.util.Collection;
import java.util.List; import java.util.List;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
@ -34,8 +35,8 @@ import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import org.apache.lucene.document.Document; import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field.Store;
import org.apache.lucene.document.Field; import org.apache.lucene.document.Field;
import org.apache.lucene.document.Field.Store;
import org.apache.lucene.document.SortedDocValuesField; import org.apache.lucene.document.SortedDocValuesField;
import org.apache.lucene.document.StringField; import org.apache.lucene.document.StringField;
import org.apache.lucene.index.IndexReader; import org.apache.lucene.index.IndexReader;
@ -188,7 +189,7 @@ public class TestIndexSearcher extends LuceneTestCase {
assertEquals(IndexSearcher.getDefaultQueryCache(), searcher.getQueryCache()); assertEquals(IndexSearcher.getDefaultQueryCache(), searcher.getQueryCache());
QueryCache dummyCache = new QueryCache() { QueryCache dummyCache = new QueryCache() {
@Override @Override
public Weight doCache(Weight weight, QueryCachingPolicy policy) { public Weight doCache(Weight weight, QueryCachingPolicy policy, Executor executor) {
return weight; return weight;
} }
}; };
@ -294,7 +295,7 @@ public class TestIndexSearcher extends LuceneTestCase {
service.shutdown(); service.shutdown();
} }
private static class RejectingMockExecutor implements ExecutorService { public static class RejectingMockExecutor implements ExecutorService {
public void shutdown() { public void shutdown() {
} }

View File

@ -29,8 +29,18 @@ import java.util.HashSet;
import java.util.LinkedHashMap; import java.util.LinkedHashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
@ -62,6 +72,7 @@ import org.apache.lucene.store.Directory;
import org.apache.lucene.util.Constants; import org.apache.lucene.util.Constants;
import org.apache.lucene.util.IOUtils; import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.LuceneTestCase; import org.apache.lucene.util.LuceneTestCase;
import org.apache.lucene.util.NamedThreadFactory;
import org.apache.lucene.util.RamUsageTester; import org.apache.lucene.util.RamUsageTester;
import org.apache.lucene.util.TestUtil; import org.apache.lucene.util.TestUtil;
@ -99,6 +110,9 @@ public class TestLRUQueryCache extends LuceneTestCase {
final LRUQueryCache queryCache = new LRUQueryCache(1 + random().nextInt(20), 1 + random().nextInt(10000), context -> random().nextBoolean(), Float.POSITIVE_INFINITY); final LRUQueryCache queryCache = new LRUQueryCache(1 + random().nextInt(20), 1 + random().nextInt(10000), context -> random().nextBoolean(), Float.POSITIVE_INFINITY);
Directory dir = newDirectory(); Directory dir = newDirectory();
final RandomIndexWriter w = new RandomIndexWriter(random(), dir); final RandomIndexWriter w = new RandomIndexWriter(random(), dir);
ExecutorService service = new ThreadPoolExecutor(4, 4, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
new NamedThreadFactory("TestLRUQueryCache"));
final SearcherFactory searcherFactory = new SearcherFactory() { final SearcherFactory searcherFactory = new SearcherFactory() {
@Override @Override
public IndexSearcher newSearcher(IndexReader reader, IndexReader previous) throws IOException { public IndexSearcher newSearcher(IndexReader reader, IndexReader previous) throws IOException {
@ -108,87 +122,103 @@ public class TestLRUQueryCache extends LuceneTestCase {
return searcher; return searcher;
} }
}; };
final boolean applyDeletes = random().nextBoolean(); final SearcherFactory concurrentSearcherFactory = new SearcherFactory() {
final SearcherManager mgr = new SearcherManager(w.w, applyDeletes, false, searcherFactory); @Override
final AtomicBoolean indexing = new AtomicBoolean(true); public IndexSearcher newSearcher(IndexReader reader, IndexReader previous) throws IOException {
final AtomicReference<Throwable> error = new AtomicReference<>(); IndexSearcher searcher = new IndexSearcher(reader, service);
final int numDocs = atLeast(10000); searcher.setQueryCachingPolicy(MAYBE_CACHE_POLICY);
Thread[] threads = new Thread[3]; searcher.setQueryCache(queryCache);
threads[0] = new Thread() { return searcher;
public void run() {
Document doc = new Document();
StringField f = new StringField("color", "", Store.NO);
doc.add(f);
for (int i = 0; indexing.get() && i < numDocs; ++i) {
f.setStringValue(RandomPicks.randomFrom(random(), new String[] {"blue", "red", "yellow"}));
try {
w.addDocument(doc);
if ((i & 63) == 0) {
mgr.maybeRefresh();
if (rarely()) {
queryCache.clear();
}
if (rarely()) {
final String color = RandomPicks.randomFrom(random(), new String[] {"blue", "red", "yellow"});
w.deleteDocuments(new Term("color", color));
}
}
} catch (Throwable t) {
error.compareAndSet(null, t);
break;
}
}
indexing.set(false);
} }
}; };
for (int i = 1; i < threads.length; ++i) {
threads[i] = new Thread() { final SearcherFactory[] searcherFactories = {searcherFactory, concurrentSearcherFactory};
@Override
for (SearcherFactory currentSearcherFactory : searcherFactories) {
final boolean applyDeletes = random().nextBoolean();
final SearcherManager mgr = new SearcherManager(w.w, applyDeletes, false, currentSearcherFactory);
final AtomicBoolean indexing = new AtomicBoolean(true);
final AtomicReference<Throwable> error = new AtomicReference<>();
final int numDocs = atLeast(10000);
Thread[] threads = new Thread[3];
threads[0] = new Thread() {
public void run() { public void run() {
while (indexing.get()) { Document doc = new Document();
StringField f = new StringField("color", "", Store.NO);
doc.add(f);
for (int i = 0; indexing.get() && i < numDocs; ++i) {
f.setStringValue(RandomPicks.randomFrom(random(), new String[]{"blue", "red", "yellow"}));
try { try {
final IndexSearcher searcher = mgr.acquire(); w.addDocument(doc);
try { if ((i & 63) == 0) {
final String value = RandomPicks.randomFrom(random(), new String[] {"blue", "red", "yellow", "green"}); mgr.maybeRefresh();
final Query q = new TermQuery(new Term("color", value)); if (rarely()) {
TotalHitCountCollector collector = new TotalHitCountCollector(); queryCache.clear();
searcher.search(q, collector); // will use the cache }
final int totalHits1 = collector.getTotalHits(); if (rarely()) {
TotalHitCountCollector collector2 = new TotalHitCountCollector(); final String color = RandomPicks.randomFrom(random(), new String[]{"blue", "red", "yellow"});
searcher.search(q, new FilterCollector(collector2) { w.deleteDocuments(new Term("color", color));
public ScoreMode scoreMode() { }
return ScoreMode.COMPLETE; // will not use the cache because of scores
}
});
final long totalHits2 = collector2.getTotalHits();
assertEquals(totalHits2, totalHits1);
} finally {
mgr.release(searcher);
} }
} catch (Throwable t) { } catch (Throwable t) {
error.compareAndSet(null, t); error.compareAndSet(null, t);
break;
} }
} }
indexing.set(false);
} }
}; };
for (int i = 1; i < threads.length; ++i) {
threads[i] = new Thread() {
@Override
public void run() {
while (indexing.get()) {
try {
final IndexSearcher searcher = mgr.acquire();
try {
final String value = RandomPicks.randomFrom(random(), new String[]{"blue", "red", "yellow", "green"});
final Query q = new TermQuery(new Term("color", value));
TotalHitCountCollector collector = new TotalHitCountCollector();
searcher.search(q, collector); // will use the cache
final int totalHits1 = collector.getTotalHits();
TotalHitCountCollector collector2 = new TotalHitCountCollector();
searcher.search(q, new FilterCollector(collector2) {
public ScoreMode scoreMode() {
return ScoreMode.COMPLETE; // will not use the cache because of scores
}
});
final long totalHits2 = collector2.getTotalHits();
assertEquals(totalHits2, totalHits1);
} finally {
mgr.release(searcher);
}
} catch (Throwable t) {
error.compareAndSet(null, t);
}
}
}
};
}
for (Thread thread : threads) {
thread.start();
}
for (Thread thread : threads) {
thread.join();
}
if (error.get() != null) {
throw error.get();
}
queryCache.assertConsistent();
mgr.close();
} }
for (Thread thread : threads) {
thread.start();
}
for (Thread thread : threads) {
thread.join();
}
if (error.get() != null) {
throw error.get();
}
queryCache.assertConsistent();
mgr.close();
w.close(); w.close();
dir.close(); dir.close();
queryCache.assertConsistent(); queryCache.assertConsistent();
service.shutdown();
} }
public void testLRUEviction() throws Exception { public void testLRUEviction() throws Exception {
@ -275,6 +305,222 @@ public class TestLRUQueryCache extends LuceneTestCase {
dir.close(); dir.close();
} }
// Exercises the full asynchronous cache lifecycle on a 3-doc index with a
// capacity-2 cache: miss -> async load -> hit, LRU reordering on access, and
// eviction once a third query is cached.
public void testLRUConcurrentLoadAndEviction() throws Exception {
Directory dir = newDirectory();
final RandomIndexWriter w = new RandomIndexWriter(random(), dir);
Document doc = new Document();
StringField f = new StringField("color", "blue", Store.NO);
doc.add(f);
w.addDocument(doc);
f.setStringValue("red");
w.addDocument(doc);
f.setStringValue("green");
w.addDocument(doc);
final DirectoryReader reader = w.getReader();
ExecutorService service = new ThreadPoolExecutor(4, 4, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
new NamedThreadFactory("TestLRUQueryCache"));
IndexSearcher searcher = new IndexSearcher(reader, service);
// Single-element array so the anonymous cache subclass below can observe a
// latch that the test swaps out between phases.
final CountDownLatch[] latch = {new CountDownLatch(1)};
final LRUQueryCache queryCache = new LRUQueryCache(2, 100000, context -> true, Float.POSITIVE_INFINITY) {
@Override
protected void onDocIdSetCache(Object readerCoreKey, long ramBytesUsed) {
super.onDocIdSetCache(readerCoreKey, ramBytesUsed);
// Signal that the asynchronous cache load has completed.
latch[0].countDown();
}
};
final Query blue = new TermQuery(new Term("color", "blue"));
final Query red = new TermQuery(new Term("color", "red"));
final Query green = new TermQuery(new Term("color", "green"));
assertEquals(Collections.emptyList(), queryCache.cachedQueries());
searcher.setQueryCache(queryCache);
// the filter is not cached on any segment: no changes
searcher.setQueryCachingPolicy(NEVER_CACHE);
searcher.search(new ConstantScoreQuery(green), 1);
assertEquals(Collections.emptyList(), queryCache.cachedQueries());
searcher.setQueryCachingPolicy(ALWAYS_CACHE);
// First read should miss
searcher.search(new ConstantScoreQuery(red), 1);
// Let the cache load be completed
latch[0].await();
assertEquals(Collections.singletonList(red), queryCache.cachedQueries());
// Second read should hit
searcher.search(new ConstantScoreQuery(red), 1);
assertEquals(Collections.singletonList(red), queryCache.cachedQueries());
assertEquals(queryCache.getHitCount(), 1);
// Fresh latch for the next asynchronous load.
latch[0] = new CountDownLatch(1);
searcher.search(new ConstantScoreQuery(green), 1);
// Let the cache load be completed
latch[0].await();
assertEquals(Arrays.asList(red, green), queryCache.cachedQueries());
// Accessing red again moves it to the most-recently-used position.
searcher.search(new ConstantScoreQuery(red), 1);
assertEquals(Arrays.asList(green, red), queryCache.cachedQueries());
assertEquals(2, queryCache.getCacheCount());
latch[0] = new CountDownLatch(1);
// Caching blue overflows capacity 2 and evicts the LRU entry (green).
searcher.search(new ConstantScoreQuery(blue), 1);
// Let the cache load be completed
latch[0].await();
assertEquals(Arrays.asList(red, blue), queryCache.cachedQueries());
searcher.search(new ConstantScoreQuery(blue), 1);
assertEquals(Arrays.asList(red, blue), queryCache.cachedQueries());
assertEquals(3, queryCache.getCacheCount());
latch[0] = new CountDownLatch(1);
// Re-caching green evicts red this time.
searcher.search(new ConstantScoreQuery(green), 1);
// Let the cache load be completed
latch[0].await();
assertEquals(Arrays.asList(blue, green), queryCache.cachedQueries());
// After the executor shuts down, a NEVER_CACHE search must leave the
// cache contents unchanged.
service.shutdown();
service.awaitTermination(300, TimeUnit.MILLISECONDS);
searcher.setQueryCachingPolicy(NEVER_CACHE);
searcher.search(new ConstantScoreQuery(red), 1);
assertEquals(Arrays.asList(blue, green), queryCache.cachedQueries());
reader.close();
w.close();
dir.close();
}
// Stresses concurrent caching of the same query: several threads search the
// same term simultaneously and exactly one cache entry must result.
//
// Fix: the original submitted one shared FutureTask five times, but a
// FutureTask runs at most once, so only a single search ever executed and the
// concurrency aspect was never exercised. We now submit five independent
// tasks and join their Futures (surfacing any failure) before closing the
// reader, so no background search can race with reader.close().
public void testLRUConcurrentLoadsOfSameQuery() throws Exception {
  Directory dir = newDirectory();
  final RandomIndexWriter w = new RandomIndexWriter(random(), dir);
  Document doc = new Document();
  StringField f = new StringField("color", "blue", Store.NO);
  doc.add(f);
  w.addDocument(doc);
  f.setStringValue("red");
  w.addDocument(doc);
  f.setStringValue("green");
  w.addDocument(doc);
  final DirectoryReader reader = w.getReader();
  // Executor used by the searcher for async caching.
  ExecutorService service = new ThreadPoolExecutor(4, 4, 0L, TimeUnit.MILLISECONDS,
      new LinkedBlockingQueue<Runnable>(),
      new NamedThreadFactory("TestLRUQueryCache"));
  // Executor used to fire the concurrent searches.
  ExecutorService stressService = new ThreadPoolExecutor(15, 15, 0L, TimeUnit.MILLISECONDS,
      new LinkedBlockingQueue<Runnable>(),
      new NamedThreadFactory("TestLRUQueryCache2"));
  IndexSearcher searcher = new IndexSearcher(reader, service);
  // Counted down by the cache once the async load completes.
  final CountDownLatch latch = new CountDownLatch(1);
  final LRUQueryCache queryCache = new LRUQueryCache(2, 100000, context -> true, Float.POSITIVE_INFINITY) {
    @Override
    protected void onDocIdSetCache(Object readerCoreKey, long ramBytesUsed) {
      super.onDocIdSetCache(readerCoreKey, ramBytesUsed);
      latch.countDown();
    }
  };
  final Query green = new TermQuery(new Term("color", "green"));
  assertEquals(Collections.emptyList(), queryCache.cachedQueries());
  searcher.setQueryCache(queryCache);
  searcher.setQueryCachingPolicy(ALWAYS_CACHE);
  // All searchers block on this latch so they start as close together as
  // possible.
  CountDownLatch startLatch = new CountDownLatch(1);
  Callable<Void> searchTask = () -> {
    startLatch.await();
    searcher.search(new ConstantScoreQuery(green), 1);
    return null;
  };
  List<Future<Void>> futures = new ArrayList<>();
  for (int i = 0; i < 5; i++) {
    // Submit a distinct task per iteration — a single FutureTask would only
    // run once regardless of how many times it is submitted.
    futures.add(stressService.submit(searchTask));
  }
  startLatch.countDown();
  latch.await();
  // Join all searches before closing the reader, and surface any failure
  // that happened on the stress threads.
  for (Future<Void> future : futures) {
    future.get();
  }
  assertEquals(Arrays.asList(green), queryCache.cachedQueries());
  reader.close();
  w.close();
  dir.close();
  service.shutdown();
  stressService.shutdown();
}
// Builds a multi-segment index, forces one slice per segment so leaves are
// searched concurrently, and verifies that concurrent async caching of the
// same query across segments completes without error.
//
// Fix: the slice-count precondition used a bare Java `assert`, which is
// silently skipped unless the JVM runs with -ea; use assertTrue so the test
// fails loudly if the index unexpectedly ends up with a single slice.
public void testLRUConcurrentCachingAcrossSegments() throws Exception {
  Directory dir = newDirectory();
  final RandomIndexWriter w = new RandomIndexWriter(random(), dir);
  int numDocs = atLeast(150);
  int numIterations = atLeast(3);
  // Commit between batches so the index contains several segments.
  for (int i = 0; i < numIterations; i++) {
    for (int j = 0; j < numDocs; j++) {
      Document doc = new Document();
      StringField f = new StringField("color", "blue", Store.NO);
      doc.add(f);
      w.addDocument(doc);
      w.addDocument(doc);
      w.addDocument(doc);
    }
    w.commit();
  }
  final DirectoryReader reader = w.getReader();
  ExecutorService service = new ThreadPoolExecutor(4, 4, 0L, TimeUnit.MILLISECONDS,
      new LinkedBlockingQueue<Runnable>(),
      new NamedThreadFactory("TestLRUQueryCache"));
  // Force exactly one slice per leaf so segments are searched concurrently.
  IndexSearcher searcher = new IndexSearcher(reader, service) {
    @Override
    protected LeafSlice[] slices(List<LeafReaderContext> leaves) {
      ArrayList<LeafSlice> slices = new ArrayList<>();
      for (LeafReaderContext ctx : leaves) {
        slices.add(new LeafSlice(Arrays.asList(ctx)));
      }
      return slices.toArray(new LeafSlice[0]);
    }
  };
  final LRUQueryCache queryCache = new LRUQueryCache(2, 100000, context -> true, Float.POSITIVE_INFINITY);
  final Query blue = new TermQuery(new Term("color", "blue"));
  assertEquals(Collections.emptyList(), queryCache.cachedQueries());
  searcher.setQueryCache(queryCache);
  searcher.setQueryCachingPolicy(ALWAYS_CACHE);
  // JUnit assertion instead of `assert`: must hold even without -ea.
  assertTrue(searcher.getSlices().length > 1);
  searcher.search(new ConstantScoreQuery(blue), 1);
  reader.close();
  w.close();
  dir.close();
  service.shutdown();
}
public void testClearFilter() throws IOException { public void testClearFilter() throws IOException {
Directory dir = newDirectory(); Directory dir = newDirectory();
final RandomIndexWriter w = new RandomIndexWriter(random(), dir); final RandomIndexWriter w = new RandomIndexWriter(random(), dir);
@ -1201,7 +1447,7 @@ public class TestLRUQueryCache extends LuceneTestCase {
// test that the bulk scorer is propagated when a scorer should not be cached // test that the bulk scorer is propagated when a scorer should not be cached
Weight weight = searcher.createWeight(new MatchAllDocsQuery(), ScoreMode.COMPLETE_NO_SCORES, 1); Weight weight = searcher.createWeight(new MatchAllDocsQuery(), ScoreMode.COMPLETE_NO_SCORES, 1);
weight = new WeightWrapper(weight, scorerCalled, bulkScorerCalled); weight = new WeightWrapper(weight, scorerCalled, bulkScorerCalled);
weight = cache.doCache(weight, NEVER_CACHE); weight = cache.doCache(weight, NEVER_CACHE, null /* executor */);
weight.bulkScorer(leaf); weight.bulkScorer(leaf);
assertEquals(true, bulkScorerCalled.get()); assertEquals(true, bulkScorerCalled.get());
assertEquals(false, scorerCalled.get()); assertEquals(false, scorerCalled.get());
@ -1238,7 +1484,7 @@ public class TestLRUQueryCache extends LuceneTestCase {
dir.close(); dir.close();
} }
public void testMinSegmentSizePredicate() throws IOException { public void testMinSegmentSizePredicate() throws IOException, InterruptedException {
Directory dir = newDirectory(); Directory dir = newDirectory();
IndexWriterConfig iwc = newIndexWriterConfig().setMergePolicy(NoMergePolicy.INSTANCE); IndexWriterConfig iwc = newIndexWriterConfig().setMergePolicy(NoMergePolicy.INSTANCE);
RandomIndexWriter w = new RandomIndexWriter(random(), dir, iwc); RandomIndexWriter w = new RandomIndexWriter(random(), dir, iwc);
@ -1250,16 +1496,41 @@ public class TestLRUQueryCache extends LuceneTestCase {
LRUQueryCache cache = new LRUQueryCache(2, 10000, new LRUQueryCache.MinSegmentSizePredicate(2, 0f), Float.POSITIVE_INFINITY); LRUQueryCache cache = new LRUQueryCache(2, 10000, new LRUQueryCache.MinSegmentSizePredicate(2, 0f), Float.POSITIVE_INFINITY);
searcher.setQueryCache(cache); searcher.setQueryCache(cache);
searcher.count(new DummyQuery()); searcher.count(new DummyQuery());
assertEquals(0, cache.getCacheCount()); assertEquals(0, cache.getCacheCount());
cache = new LRUQueryCache(2, 10000, new LRUQueryCache.MinSegmentSizePredicate(1, 0f), Float.POSITIVE_INFINITY); final CountDownLatch[] latch = { new CountDownLatch(1)};
cache = new LRUQueryCache(2, 10000,
new LRUQueryCache.MinSegmentSizePredicate(1, 0f), Float.POSITIVE_INFINITY) {
@Override
protected void onDocIdSetCache(Object readerCoreKey, long ramBytesUsed) {
super.onDocIdSetCache(readerCoreKey, ramBytesUsed);
latch[0].countDown();
}
};
searcher.setQueryCache(cache); searcher.setQueryCache(cache);
searcher.count(new DummyQuery()); searcher.count(new DummyQuery());
latch[0].await();
assertEquals(1, cache.getCacheCount()); assertEquals(1, cache.getCacheCount());
cache = new LRUQueryCache(2, 10000, new LRUQueryCache.MinSegmentSizePredicate(0, .6f), Float.POSITIVE_INFINITY); latch[0] = new CountDownLatch(1);
cache = new LRUQueryCache(2, 10000,
new LRUQueryCache.MinSegmentSizePredicate(0, .6f), Float.POSITIVE_INFINITY) {
@Override
protected void onDocIdSetCache(Object readerCoreKey, long ramBytesUsed) {
super.onDocIdSetCache(readerCoreKey, ramBytesUsed);
latch[0].countDown();
}
};
searcher.setQueryCache(cache); searcher.setQueryCache(cache);
searcher.count(new DummyQuery()); searcher.count(new DummyQuery());
latch[0].await();
assertEquals(1, cache.getCacheCount()); assertEquals(1, cache.getCacheCount());
w.addDocument(new Document()); w.addDocument(new Document());
@ -1713,6 +1984,208 @@ public class TestLRUQueryCache extends LuceneTestCase {
t.join(); t.join();
} }
// Verifies that an executor which rejects every task does not break search:
// the cache must fall back to synchronous caching on the caller thread, so
// the query both succeeds and ends up cached.
public void testRejectedExecution() throws IOException {
ExecutorService service = new TestIndexSearcher.RejectingMockExecutor();
Directory dir = newDirectory();
final RandomIndexWriter w = new RandomIndexWriter(random(), dir);
Document doc = new Document();
StringField f = new StringField("color", "blue", Store.NO);
doc.add(f);
w.addDocument(doc);
f.setStringValue("red");
w.addDocument(doc);
f.setStringValue("green");
w.addDocument(doc);
final DirectoryReader reader = w.getReader();
final Query red = new TermQuery(new Term("color", "red"));
IndexSearcher searcher = new IndexSearcher(reader, service);
final LRUQueryCache queryCache = new LRUQueryCache(2, 100000, context -> true, Float.POSITIVE_INFINITY);
searcher.setQueryCache(queryCache);
searcher.setQueryCachingPolicy(ALWAYS_CACHE);
// To ensure that failing ExecutorService still allows query to run
// successfully
searcher.search(new ConstantScoreQuery(red), 1);
// Synchronous fallback means the entry is present immediately — no latch
// needed.
assertEquals(Collections.singletonList(red), queryCache.cachedQueries());
reader.close();
w.close();
dir.close();
service.shutdown();
}
// Same rejection scenario as testRejectedExecution, but with a committed
// single-segment index: async caching fails, synchronous caching takes over,
// and the query is cached by the time search returns.
public void testFailedAsyncCaching() throws IOException {
ExecutorService service = new TestIndexSearcher.RejectingMockExecutor();
Directory dir = newDirectory();
final RandomIndexWriter w = new RandomIndexWriter(random(), dir);
for (int i = 0; i < 5; i++) {
Document doc = new Document();
StringField f = new StringField("color", "blue", Store.NO);
doc.add(f);
w.addDocument(doc);
f.setStringValue("red");
w.addDocument(doc);
f.setStringValue("green");
w.addDocument(doc);
}
w.commit();
final DirectoryReader reader = w.getReader();
final Query red = new TermQuery(new Term("color", "red"));
IndexSearcher searcher = new IndexSearcher(reader, service);
// Precondition: a single segment, so the search itself runs on the caller
// thread and only caching would have used the (rejecting) executor.
assertEquals(1, searcher.leafContexts.size());
final LRUQueryCache queryCache = new LRUQueryCache(2, 100000, context -> true, Float.POSITIVE_INFINITY);
searcher.setQueryCache(queryCache);
searcher.setQueryCachingPolicy(ALWAYS_CACHE);
searcher.search(new ConstantScoreQuery(red), 1);
// Caching happened synchronously after the rejection, so the entry is
// visible immediately.
assertEquals(Collections.singletonList(red), queryCache.cachedQueries());
w.close();
reader.close();
dir.close();
service.shutdown();
}
// Verifies that an asynchronous cache load which only gets to run after the
// reader has been closed is a safe no-op (tryIncRef fails and nothing is
// cached) instead of an error.
public void testAsyncCachingHitsClosedReader() throws Exception {
Directory dir = newDirectory();
final RandomIndexWriter w = new RandomIndexWriter(random(), dir);
Document doc = new Document();
doc.add(new StringField("color", "red", Store.NO));
w.addDocument(doc);
IndexReader reader = w.getReader();
w.close();
// Because our index has a single document, it also has a single segment. In that
// case, IndexSearcher will use the current thread for searching, so the only thing
// that runs via the executor is the caching of queries.
final CountDownLatch[] awaitCaching = new CountDownLatch[1];
awaitCaching[0] = new CountDownLatch(1);
final CountDownLatch[] cachingRan = new CountDownLatch[1];
cachingRan[0] = new CountDownLatch(1);
final AtomicBoolean success = new AtomicBoolean(false);
// Executor that parks each caching task until the test releases it via
// awaitCaching, and reports completion via cachingRan/success.
Executor executor = runnable -> {
new Thread(() -> {
try {
awaitCaching[0].await();
runnable.run();
success.set(true);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
cachingRan[0].countDown();
}
}).start();
};
final IndexSearcher searcher = new IndexSearcher(reader, executor);
final LRUQueryCache queryCache = new LRUQueryCache(2, 100000, context -> true, Float.POSITIVE_INFINITY);
searcher.setQueryCache(queryCache);
searcher.setQueryCachingPolicy(ALWAYS_CACHE);
// Phase 1: caching is blocked, so nothing is cached yet when search returns.
searcher.search(new ConstantScoreQuery(new TermQuery(new Term("color", "red"))), 1);
assertEquals(Collections.emptyList(), queryCache.cachedQueries());
// Release the caching task while the reader is still open: it must succeed.
awaitCaching[0].countDown();
cachingRan[0].await();
assertTrue(success.get());
assertEquals(Collections.singletonList(new TermQuery(new Term("color", "red"))), queryCache.cachedQueries());
// Phase 2: reset the latches and cache, then close the reader BEFORE
// releasing the caching task.
awaitCaching[0] = new CountDownLatch(1);
cachingRan[0] = new CountDownLatch(1);
success.set(false);
queryCache.clear();
searcher.search(new ConstantScoreQuery(new TermQuery(new Term("color", "red"))), 1);
assertEquals(Collections.emptyList(), queryCache.cachedQueries());
reader.close();
awaitCaching[0].countDown();
cachingRan[0].await();
// The task ran cleanly (success == true) but cached nothing, because the
// reader was already closed when it executed.
assertTrue(success.get());
assertEquals(Collections.emptyList(), queryCache.cachedQueries());
dir.close();
}
/**
 * An {@link ExecutorService} stub whose {@link #execute(Runnable)} blocks on
 * the supplied latch before running the task inline on the caller thread;
 * every other operation is unsupported.
 *
 * <p>Fix: on {@link InterruptedException}, the original discarded the cause
 * ({@code new RuntimeException(e.getMessage())}) and did not restore the
 * thread's interrupt status. We now re-interrupt and chain the cause.
 */
public static class BlockedMockExecutor implements ExecutorService {

  private final CountDownLatch countDownLatch;

  public BlockedMockExecutor(final CountDownLatch latch) {
    this.countDownLatch = latch;
  }

  @Override
  public void shutdown() {
  }

  @Override
  public List<Runnable> shutdownNow() {
    throw new UnsupportedOperationException();
  }

  @Override
  public boolean isShutdown() {
    throw new UnsupportedOperationException();
  }

  @Override
  public boolean isTerminated() {
    throw new UnsupportedOperationException();
  }

  @Override
  public boolean awaitTermination(final long l, final TimeUnit timeUnit) throws InterruptedException {
    throw new UnsupportedOperationException();
  }

  @Override
  public <T> Future<T> submit(final Callable<T> tCallable) {
    throw new UnsupportedOperationException();
  }

  @Override
  public <T> Future<T> submit(final Runnable runnable, final T t) {
    throw new UnsupportedOperationException();
  }

  @Override
  public Future<?> submit(final Runnable runnable) {
    throw new UnsupportedOperationException();
  }

  @Override
  public <T> List<Future<T>> invokeAll(final Collection<? extends Callable<T>> callables) throws InterruptedException {
    throw new UnsupportedOperationException();
  }

  @Override
  public <T> List<Future<T>> invokeAll(final Collection<? extends Callable<T>> callables, final long l, final TimeUnit timeUnit) throws InterruptedException {
    throw new UnsupportedOperationException();
  }

  @Override
  public <T> T invokeAny(final Collection<? extends Callable<T>> callables) throws InterruptedException, ExecutionException {
    throw new UnsupportedOperationException();
  }

  @Override
  public <T> T invokeAny(final Collection<? extends Callable<T>> callables, final long l, final TimeUnit timeUnit) throws InterruptedException, ExecutionException, TimeoutException {
    throw new UnsupportedOperationException();
  }

  @Override
  public void execute(final Runnable runnable) {
    try {
      // Block until the test releases the latch, then run the task inline.
      countDownLatch.await();
      runnable.run();
    } catch (InterruptedException e) {
      // Restore the interrupt status and preserve the original exception as
      // the cause instead of flattening it to a message string.
      Thread.currentThread().interrupt();
      throw new RuntimeException(e);
    }
  }
}
public void testSkipCachingForRangeQuery() throws IOException { public void testSkipCachingForRangeQuery() throws IOException {
Directory dir = newDirectory(); Directory dir = newDirectory();
final RandomIndexWriter w = new RandomIndexWriter(random(), dir); final RandomIndexWriter w = new RandomIndexWriter(random(), dir);