From 2d98a619dd96abfa716098bbc5cac2bc3a1b7d49 Mon Sep 17 00:00:00 2001 From: Atri Sharma Date: Tue, 26 Nov 2019 12:02:03 +0530 Subject: [PATCH] LUCENE-8213: Asynchronous Caching in LRUQueryCache (#916) * LUCENE-8213: Introduce Asynchronous Caching in LRUQueryCache --- lucene/CHANGES.txt | 3 + .../apache/lucene/search/IndexSearcher.java | 10 +- .../apache/lucene/search/LRUQueryCache.java | 76 ++- .../org/apache/lucene/search/QueryCache.java | 5 +- .../lucene/search/TestIndexSearcher.java | 7 +- .../lucene/search/TestLRUQueryCache.java | 611 ++++++++++++++++-- 6 files changed, 630 insertions(+), 82 deletions(-) diff --git a/lucene/CHANGES.txt b/lucene/CHANGES.txt index 2c7c5c3f3ef..77794676397 100644 --- a/lucene/CHANGES.txt +++ b/lucene/CHANGES.txt @@ -55,6 +55,9 @@ Improvements * LUCENE-8984: MoreLikeThis MLT is biased for uncommon fields (Andy Hind via Anshum Gupta) +* LUCENE-8213: LRUQueryCache#doCache now uses IndexSearcher's Executor (if present) + to asynchronously cache heavy queries (Atri Sharma) + Bug fixes * LUCENE-8663: NRTCachingDirectory.slowFileExists may open a file while diff --git a/lucene/core/src/java/org/apache/lucene/search/IndexSearcher.java b/lucene/core/src/java/org/apache/lucene/search/IndexSearcher.java index 2c2d6691171..b620b3dc6de 100644 --- a/lucene/core/src/java/org/apache/lucene/search/IndexSearcher.java +++ b/lucene/core/src/java/org/apache/lucene/search/IndexSearcher.java @@ -181,7 +181,13 @@ public class IndexSearcher { } /** Runs searches for each segment separately, using the - * provided Executor. NOTE: + * provided Executor. The passed in Executor will also be + * used by LRUQueryCache (if enabled) to perform asynchronous + * query caching. + * If a task is rejected by the host Executor, the failed task + * will then be executed on the caller thread. This is done to + * ensure that a query succeeds, albeit with a higher latency. + * NOTE: * if you are using {@link NIOFSDirectory}, do not use * the shutdownNow method of ExecutorService as this uses * Thread.interrupt under-the-hood which can silently @@ -843,7 +849,7 @@ public class IndexSearcher { final QueryCache queryCache = this.queryCache; Weight weight = query.createWeight(this, scoreMode, boost); if (scoreMode.needsScores() == false && queryCache != null) { - weight = queryCache.doCache(weight, queryCachingPolicy); + weight = queryCache.doCache(weight, queryCachingPolicy, executor); } return weight; } diff --git a/lucene/core/src/java/org/apache/lucene/search/LRUQueryCache.java b/lucene/core/src/java/org/apache/lucene/search/LRUQueryCache.java index da881421bf0..20798766c31 100644 --- a/lucene/core/src/java/org/apache/lucene/search/LRUQueryCache.java +++ b/lucene/core/src/java/org/apache/lucene/search/LRUQueryCache.java @@ -28,6 +28,9 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.Executor; +import java.util.concurrent.FutureTask; +import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.ReentrantLock; import java.util.function.Predicate; @@ -88,7 +91,6 @@ import static org.apache.lucene.util.RamUsageEstimator.QUERY_DEFAULT_RAM_BYTES_U * @lucene.experimental */ public class LRUQueryCache implements QueryCache, Accountable { - private final int maxSize; private final long maxRamBytesUsed; private final Predicate leavesToCache; @@ -271,6 +273,7 @@ public class LRUQueryCache implements QueryCache, Accountable { assert lock.isHeldByCurrentThread(); assert key instanceof BoostQuery == false; assert key instanceof ConstantScoreQuery == false; + final IndexReader.CacheKey readerKey = cacheHelper.getKey(); final LeafCache leafCache = cache.get(readerKey); if (leafCache == null) { @@ -404,6 +407,15 @@ public class LRUQueryCache implements QueryCache, Accountable { } } + // Get original weight from the cached weight + private Weight getOriginalWeight(Weight weight) { + while (weight instanceof CachingWrapperWeight) { + weight = ((CachingWrapperWeight) weight).in; + } + + return weight; + } + // pkg-private for testing void assertConsistent() { lock.lock(); @@ -458,12 +470,10 @@ public class LRUQueryCache implements QueryCache, Accountable { } @Override - public Weight doCache(Weight weight, QueryCachingPolicy policy) { - while (weight instanceof CachingWrapperWeight) { - weight = ((CachingWrapperWeight) weight).in; - } + public Weight doCache(final Weight weight, QueryCachingPolicy policy, Executor executor) { + Weight originalWeight = getOriginalWeight(weight); - return new CachingWrapperWeight(weight, policy); + return new CachingWrapperWeight(originalWeight, policy, executor); } @Override @@ -665,10 +675,13 @@ public class LRUQueryCache implements QueryCache, Accountable { // threads when IndexSearcher is created with threads private final AtomicBoolean used; - CachingWrapperWeight(Weight in, QueryCachingPolicy policy) { + private final Executor executor; + + CachingWrapperWeight(Weight in, QueryCachingPolicy policy, Executor executor) { super(in.getQuery(), 1f); this.in = in; this.policy = policy; + this.executor = executor; used = new AtomicBoolean(false); } @@ -756,6 +769,17 @@ public class LRUQueryCache implements QueryCache, Accountable { return supplier.get(leadCost); } + boolean cacheSynchronously = executor == null; + if (cacheSynchronously == false) { + boolean asyncCachingSucceeded = cacheAsynchronously(context, cacheHelper); + + // If async caching failed, synchronous caching will + // be performed, hence do not return the uncached value + if (asyncCachingSucceeded) { + return supplier.get(leadCost); + } + } + Scorer scorer = supplier.get(Long.MAX_VALUE); DocIdSet docIdSet = cacheImpl(new DefaultBulkScorer(scorer), context.reader().maxDoc()); putIfAbsent(in.getQuery(), docIdSet, cacheHelper); @@ -852,6 +876,19 @@ public class LRUQueryCache implements QueryCache, Accountable { if (docIdSet == null) { if (policy.shouldCache(in.getQuery())) { + boolean cacheSynchronously = executor == null; + // If asynchronous caching is requested, perform the same and return + // the uncached iterator + if (cacheSynchronously == false) { + boolean asyncCachingSucceeded = cacheAsynchronously(context, cacheHelper); + + // If async caching failed, we will perform synchronous caching + // hence do not return the uncached value here + if (asyncCachingSucceeded) { + return in.bulkScorer(context); + } + } + docIdSet = cache(context); putIfAbsent(in.getQuery(), docIdSet, cacheHelper); } else { @@ -871,5 +908,30 @@ public class LRUQueryCache implements QueryCache, Accountable { return new DefaultBulkScorer(new ConstantScoreScorer(this, 0f, ScoreMode.COMPLETE_NO_SCORES, disi)); } + // Perform a cache load asynchronously + // @return true if asynchronous caching succeeded, false otherwise + private boolean cacheAsynchronously(LeafReaderContext context, IndexReader.CacheHelper cacheHelper) { + FutureTask task = new FutureTask<>(() -> { + // If the reader is being closed -- do nothing + if (context.reader().tryIncRef()) { + try { + DocIdSet localDocIdSet = cache(context); + putIfAbsent(in.getQuery(), localDocIdSet, cacheHelper); + } finally { + context.reader().decRef(); + } + } + + return null; + }); + try { + executor.execute(task); + } catch (RejectedExecutionException e) { + // Trigger synchronous caching + return false; + } + + return true; + } } } diff --git a/lucene/core/src/java/org/apache/lucene/search/QueryCache.java b/lucene/core/src/java/org/apache/lucene/search/QueryCache.java index 94e34ee9a1e..21716edc803 100644 --- a/lucene/core/src/java/org/apache/lucene/search/QueryCache.java +++ b/lucene/core/src/java/org/apache/lucene/search/QueryCache.java @@ -17,6 +17,8 @@ package org.apache.lucene.search; +import java.util.concurrent.Executor; + /** * A cache for queries. * @@ -30,7 +32,8 @@ public interface QueryCache { * matching docs per-segment accordingly to the given policy. * NOTE: The returned weight will only be equivalent if scores are not needed. * @see Collector#scoreMode() + * If the Executor is not null, it will be used to perform asynchronous caching */ - Weight doCache(Weight weight, QueryCachingPolicy policy); + Weight doCache(Weight weight, QueryCachingPolicy policy, Executor executor); } diff --git a/lucene/core/src/test/org/apache/lucene/search/TestIndexSearcher.java b/lucene/core/src/test/org/apache/lucene/search/TestIndexSearcher.java index 6617c60e919..1f6f9d4a73d 100644 --- a/lucene/core/src/test/org/apache/lucene/search/TestIndexSearcher.java +++ b/lucene/core/src/test/org/apache/lucene/search/TestIndexSearcher.java @@ -24,6 +24,7 @@ import java.util.Collection; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; @@ -34,8 +35,8 @@ import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import org.apache.lucene.document.Document; -import org.apache.lucene.document.Field.Store; import org.apache.lucene.document.Field; +import org.apache.lucene.document.Field.Store; import org.apache.lucene.document.SortedDocValuesField; import org.apache.lucene.document.StringField; import org.apache.lucene.index.IndexReader; @@ -188,7 +189,7 @@ public class TestIndexSearcher extends LuceneTestCase { assertEquals(IndexSearcher.getDefaultQueryCache(), searcher.getQueryCache()); QueryCache dummyCache = new QueryCache() { @Override - public Weight doCache(Weight weight, QueryCachingPolicy policy) { + public Weight doCache(Weight weight, QueryCachingPolicy policy, Executor executor) { return weight; } }; @@ -294,7 +295,7 @@ public class TestIndexSearcher extends LuceneTestCase { service.shutdown(); } - private static class RejectingMockExecutor implements ExecutorService { + public static class RejectingMockExecutor implements ExecutorService { public void shutdown() { } diff --git a/lucene/core/src/test/org/apache/lucene/search/TestLRUQueryCache.java b/lucene/core/src/test/org/apache/lucene/search/TestLRUQueryCache.java index 68bc8c9a504..83811cb8279 100644 --- a/lucene/core/src/test/org/apache/lucene/search/TestLRUQueryCache.java +++ b/lucene/core/src/test/org/apache/lucene/search/TestLRUQueryCache.java @@ -29,8 +29,18 @@ import java.util.HashSet; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; import java.util.Set; import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.FutureTask; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -62,6 +72,7 @@ import org.apache.lucene.store.Directory; import org.apache.lucene.util.Constants; import org.apache.lucene.util.IOUtils; import org.apache.lucene.util.LuceneTestCase; +import org.apache.lucene.util.NamedThreadFactory; import org.apache.lucene.util.RamUsageTester; import org.apache.lucene.util.TestUtil; @@ -99,6 +110,9 @@ public class TestLRUQueryCache extends LuceneTestCase { final LRUQueryCache queryCache = new LRUQueryCache(1 + random().nextInt(20), 1 + random().nextInt(10000), context -> random().nextBoolean(), Float.POSITIVE_INFINITY); Directory dir = newDirectory(); final RandomIndexWriter w = new RandomIndexWriter(random(), dir); + ExecutorService service = new ThreadPoolExecutor(4, 4, 0L, TimeUnit.MILLISECONDS, + new LinkedBlockingQueue(), + new NamedThreadFactory("TestLRUQueryCache")); final SearcherFactory searcherFactory = new SearcherFactory() { @Override public IndexSearcher newSearcher(IndexReader reader, IndexReader previous) throws IOException { @@ -108,87 +122,103 @@ public class TestLRUQueryCache extends LuceneTestCase { return searcher; } }; - final boolean applyDeletes = random().nextBoolean(); - final SearcherManager mgr = new SearcherManager(w.w, applyDeletes, false, searcherFactory); - final AtomicBoolean indexing = new AtomicBoolean(true); - final AtomicReference error = new AtomicReference<>(); - final int numDocs = atLeast(10000); - Thread[] threads = new Thread[3]; - threads[0] = new Thread() { - public void run() { - Document doc = new Document(); - StringField f = new StringField("color", "", Store.NO); - doc.add(f); - for (int i = 0; indexing.get() && i < numDocs; ++i) { - f.setStringValue(RandomPicks.randomFrom(random(), new String[] {"blue", "red", "yellow"})); - try { - w.addDocument(doc); - if ((i & 63) == 0) { - mgr.maybeRefresh(); - if (rarely()) { - queryCache.clear(); - } - if (rarely()) { - final String color = RandomPicks.randomFrom(random(), new String[] {"blue", "red", "yellow"}); - w.deleteDocuments(new Term("color", color)); - } - } - } catch (Throwable t) { - error.compareAndSet(null, t); - break; - } - } - indexing.set(false); + final SearcherFactory concurrentSearcherFactory = new SearcherFactory() { + @Override + public IndexSearcher newSearcher(IndexReader reader, IndexReader previous) throws IOException { + IndexSearcher searcher = new IndexSearcher(reader, service); + searcher.setQueryCachingPolicy(MAYBE_CACHE_POLICY); + searcher.setQueryCache(queryCache); + return searcher; } }; - for (int i = 1; i < threads.length; ++i) { - threads[i] = new Thread() { - @Override + + final SearcherFactory[] searcherFactories = {searcherFactory, concurrentSearcherFactory}; + + for (SearcherFactory currentSearcherFactory : searcherFactories) { + final boolean applyDeletes = random().nextBoolean(); + final SearcherManager mgr = new SearcherManager(w.w, applyDeletes, false, currentSearcherFactory); + final AtomicBoolean indexing = new AtomicBoolean(true); + final AtomicReference error = new AtomicReference<>(); + final int numDocs = atLeast(10000); + Thread[] threads = new Thread[3]; + threads[0] = new Thread() { public void run() { - while (indexing.get()) { + Document doc = new Document(); + StringField f = new StringField("color", "", Store.NO); + doc.add(f); + for (int i = 0; indexing.get() && i < numDocs; ++i) { + f.setStringValue(RandomPicks.randomFrom(random(), new String[]{"blue", "red", "yellow"})); try { - final IndexSearcher searcher = mgr.acquire(); - try { - final String value = RandomPicks.randomFrom(random(), new String[] {"blue", "red", "yellow", "green"}); - final Query q = new TermQuery(new Term("color", value)); - TotalHitCountCollector collector = new TotalHitCountCollector(); - searcher.search(q, collector); // will use the cache - final int totalHits1 = collector.getTotalHits(); - TotalHitCountCollector collector2 = new TotalHitCountCollector(); - searcher.search(q, new FilterCollector(collector2) { - public ScoreMode scoreMode() { - return ScoreMode.COMPLETE; // will not use the cache because of scores - } - }); - final long totalHits2 = collector2.getTotalHits(); - assertEquals(totalHits2, totalHits1); - } finally { - mgr.release(searcher); + w.addDocument(doc); + if ((i & 63) == 0) { + mgr.maybeRefresh(); + if (rarely()) { + queryCache.clear(); + } + if (rarely()) { + final String color = RandomPicks.randomFrom(random(), new String[]{"blue", "red", "yellow"}); + w.deleteDocuments(new Term("color", color)); + } } } catch (Throwable t) { error.compareAndSet(null, t); + break; } } + indexing.set(false); } }; + for (int i = 1; i < threads.length; ++i) { + threads[i] = new Thread() { + @Override + public void run() { + while (indexing.get()) { + try { + final IndexSearcher searcher = mgr.acquire(); + try { + final String value = RandomPicks.randomFrom(random(), new String[]{"blue", "red", "yellow", "green"}); + final Query q = new TermQuery(new Term("color", value)); + TotalHitCountCollector collector = new TotalHitCountCollector(); + searcher.search(q, collector); // will use the cache + final int totalHits1 = collector.getTotalHits(); + TotalHitCountCollector collector2 = new TotalHitCountCollector(); + searcher.search(q, new FilterCollector(collector2) { + public ScoreMode scoreMode() { + return ScoreMode.COMPLETE; // will not use the cache because of scores + } + }); + final long totalHits2 = collector2.getTotalHits(); + assertEquals(totalHits2, totalHits1); + } finally { + mgr.release(searcher); + } + } catch (Throwable t) { + error.compareAndSet(null, t); + } + } + } + }; + } + + for (Thread thread : threads) { + thread.start(); + } + + for (Thread thread : threads) { + thread.join(); + } + + if (error.get() != null) { + throw error.get(); + } + queryCache.assertConsistent(); + mgr.close(); } - for (Thread thread : threads) { - thread.start(); - } - - for (Thread thread : threads) { - thread.join(); - } - - if (error.get() != null) { - throw error.get(); - } - queryCache.assertConsistent(); - mgr.close(); w.close(); dir.close(); queryCache.assertConsistent(); + service.shutdown(); } public void testLRUEviction() throws Exception { @@ -275,6 +305,222 @@ public class TestLRUQueryCache extends LuceneTestCase { dir.close(); } + public void testLRUConcurrentLoadAndEviction() throws Exception { + Directory dir = newDirectory(); + final RandomIndexWriter w = new RandomIndexWriter(random(), dir); + + Document doc = new Document(); + StringField f = new StringField("color", "blue", Store.NO); + doc.add(f); + w.addDocument(doc); + f.setStringValue("red"); + w.addDocument(doc); + f.setStringValue("green"); + w.addDocument(doc); + final DirectoryReader reader = w.getReader(); + ExecutorService service = new ThreadPoolExecutor(4, 4, 0L, TimeUnit.MILLISECONDS, + new LinkedBlockingQueue(), + new NamedThreadFactory("TestLRUQueryCache")); + + IndexSearcher searcher = new IndexSearcher(reader, service); + + final CountDownLatch[] latch = {new CountDownLatch(1)}; + + final LRUQueryCache queryCache = new LRUQueryCache(2, 100000, context -> true, Float.POSITIVE_INFINITY) { + @Override + protected void onDocIdSetCache(Object readerCoreKey, long ramBytesUsed) { + super.onDocIdSetCache(readerCoreKey, ramBytesUsed); + latch[0].countDown(); + } + }; + + final Query blue = new TermQuery(new Term("color", "blue")); + final Query red = new TermQuery(new Term("color", "red")); + final Query green = new TermQuery(new Term("color", "green")); + + assertEquals(Collections.emptyList(), queryCache.cachedQueries()); + + searcher.setQueryCache(queryCache); + // the filter is not cached on any segment: no changes + searcher.setQueryCachingPolicy(NEVER_CACHE); + searcher.search(new ConstantScoreQuery(green), 1); + assertEquals(Collections.emptyList(), queryCache.cachedQueries()); + + searcher.setQueryCachingPolicy(ALWAYS_CACHE); + + // First read should miss + searcher.search(new ConstantScoreQuery(red), 1); + + // Let the cache load be completed + latch[0].await(); + assertEquals(Collections.singletonList(red), queryCache.cachedQueries()); + + // Second read should hit + searcher.search(new ConstantScoreQuery(red), 1); + assertEquals(Collections.singletonList(red), queryCache.cachedQueries()); + assertEquals(queryCache.getHitCount(), 1); + + latch[0] = new CountDownLatch(1); + searcher.search(new ConstantScoreQuery(green), 1); + + // Let the cache load be completed + latch[0].await(); + assertEquals(Arrays.asList(red, green), queryCache.cachedQueries()); + + searcher.search(new ConstantScoreQuery(red), 1); + assertEquals(Arrays.asList(green, red), queryCache.cachedQueries()); + assertEquals(2, queryCache.getCacheCount()); + + latch[0] = new CountDownLatch(1); + + searcher.search(new ConstantScoreQuery(blue), 1); + + // Let the cache load be completed + latch[0].await(); + assertEquals(Arrays.asList(red, blue), queryCache.cachedQueries()); + + searcher.search(new ConstantScoreQuery(blue), 1); + assertEquals(Arrays.asList(red, blue), queryCache.cachedQueries()); + assertEquals(3, queryCache.getCacheCount()); + + latch[0] = new CountDownLatch(1); + + searcher.search(new ConstantScoreQuery(green), 1); + + // Let the cache load be completed + latch[0].await(); + assertEquals(Arrays.asList(blue, green), queryCache.cachedQueries()); + + service.shutdown(); + service.awaitTermination(300, TimeUnit.MILLISECONDS); + + searcher.setQueryCachingPolicy(NEVER_CACHE); + searcher.search(new ConstantScoreQuery(red), 1); + assertEquals(Arrays.asList(blue, green), queryCache.cachedQueries()); + + reader.close(); + w.close(); + dir.close(); + } + + public void testLRUConcurrentLoadsOfSameQuery() throws Exception { + Directory dir = newDirectory(); + final RandomIndexWriter w = new RandomIndexWriter(random(), dir); + + Document doc = new Document(); + StringField f = new StringField("color", "blue", Store.NO); + doc.add(f); + w.addDocument(doc); + f.setStringValue("red"); + w.addDocument(doc); + f.setStringValue("green"); + w.addDocument(doc); + final DirectoryReader reader = w.getReader(); + ExecutorService service = new ThreadPoolExecutor(4, 4, 0L, TimeUnit.MILLISECONDS, + new LinkedBlockingQueue(), + new NamedThreadFactory("TestLRUQueryCache")); + + ExecutorService stressService = new ThreadPoolExecutor(15, 15, 0L, TimeUnit.MILLISECONDS, + new LinkedBlockingQueue(), + new NamedThreadFactory("TestLRUQueryCache2")); + + IndexSearcher searcher = new IndexSearcher(reader, service); + + final CountDownLatch latch = new CountDownLatch(1); + + final LRUQueryCache queryCache = new LRUQueryCache(2, 100000, context -> true, Float.POSITIVE_INFINITY) { + @Override + protected void onDocIdSetCache(Object readerCoreKey, long ramBytesUsed) { + super.onDocIdSetCache(readerCoreKey, ramBytesUsed); + latch.countDown(); + } + }; + + final Query green = new TermQuery(new Term("color", "green")); + + assertEquals(Collections.emptyList(), queryCache.cachedQueries()); + + searcher.setQueryCache(queryCache); + searcher.setQueryCachingPolicy(ALWAYS_CACHE); + + CountDownLatch startLatch = new CountDownLatch(1); + + FutureTask task = new FutureTask<>(() -> { + startLatch.await(); + searcher.search(new ConstantScoreQuery(green), 1); + return null; + }); + + for (int i = 0; i < 5; i++) { + stressService.submit(task); + } + + startLatch.countDown(); + + latch.await(); + assertEquals(Arrays.asList(green), queryCache.cachedQueries()); + + reader.close(); + w.close(); + dir.close(); + service.shutdown(); + stressService.shutdown(); + } + + public void testLRUConcurrentCachingAcrossSegments() throws Exception { + Directory dir = newDirectory(); + final RandomIndexWriter w = new RandomIndexWriter(random(), dir); + + int numDocs = atLeast(150); + int numIterations = atLeast(3); + + for (int i = 0; i < numIterations; i++) { + for (int j = 0; j < numDocs; j++) { + Document doc = new Document(); + StringField f = new StringField("color", "blue", Store.NO); + doc.add(f); + w.addDocument(doc); + w.addDocument(doc); + w.addDocument(doc); + } + w.commit(); + } + + final DirectoryReader reader = w.getReader(); + + ExecutorService service = new ThreadPoolExecutor(4, 4, 0L, TimeUnit.MILLISECONDS, + new LinkedBlockingQueue(), + new NamedThreadFactory("TestLRUQueryCache")); + + IndexSearcher searcher = new IndexSearcher(reader, service) { + @Override + protected LeafSlice[] slices(List leaves) { + ArrayList slices = new ArrayList<>(); + for (LeafReaderContext ctx : leaves) { + slices.add(new LeafSlice(Arrays.asList(ctx))); + } + return slices.toArray(new LeafSlice[0]); + } + }; + + final LRUQueryCache queryCache = new LRUQueryCache(2, 100000, context -> true, Float.POSITIVE_INFINITY); + + final Query blue = new TermQuery(new Term("color", "blue")); + + assertEquals(Collections.emptyList(), queryCache.cachedQueries()); + + searcher.setQueryCache(queryCache); + searcher.setQueryCachingPolicy(ALWAYS_CACHE); + assert searcher.getSlices().length > 1; + + searcher.search(new ConstantScoreQuery(blue), 1); + + reader.close(); + w.close(); + dir.close(); + service.shutdown(); + } + public void testClearFilter() throws IOException { Directory dir = newDirectory(); final RandomIndexWriter w = new RandomIndexWriter(random(), dir); @@ -1201,7 +1447,7 @@ public class TestLRUQueryCache extends LuceneTestCase { // test that the bulk scorer is propagated when a scorer should not be cached Weight weight = searcher.createWeight(new MatchAllDocsQuery(), ScoreMode.COMPLETE_NO_SCORES, 1); weight = new WeightWrapper(weight, scorerCalled, bulkScorerCalled); - weight = cache.doCache(weight, NEVER_CACHE); + weight = cache.doCache(weight, NEVER_CACHE, null /* executor */); weight.bulkScorer(leaf); assertEquals(true, bulkScorerCalled.get()); assertEquals(false, scorerCalled.get()); @@ -1238,7 +1484,7 @@ public class TestLRUQueryCache extends LuceneTestCase { dir.close(); } - public void testMinSegmentSizePredicate() throws IOException { + public void testMinSegmentSizePredicate() throws IOException, InterruptedException { Directory dir = newDirectory(); IndexWriterConfig iwc = newIndexWriterConfig().setMergePolicy(NoMergePolicy.INSTANCE); RandomIndexWriter w = new RandomIndexWriter(random(), dir, iwc); @@ -1250,16 +1496,41 @@ public class TestLRUQueryCache extends LuceneTestCase { LRUQueryCache cache = new LRUQueryCache(2, 10000, new LRUQueryCache.MinSegmentSizePredicate(2, 0f), Float.POSITIVE_INFINITY); searcher.setQueryCache(cache); searcher.count(new DummyQuery()); + assertEquals(0, cache.getCacheCount()); - cache = new LRUQueryCache(2, 10000, new LRUQueryCache.MinSegmentSizePredicate(1, 0f), Float.POSITIVE_INFINITY); + final CountDownLatch[] latch = { new CountDownLatch(1)}; + cache = new LRUQueryCache(2, 10000, + new LRUQueryCache.MinSegmentSizePredicate(1, 0f), Float.POSITIVE_INFINITY) { + @Override + protected void onDocIdSetCache(Object readerCoreKey, long ramBytesUsed) { + super.onDocIdSetCache(readerCoreKey, ramBytesUsed); + latch[0].countDown(); + } + }; + searcher.setQueryCache(cache); + searcher.count(new DummyQuery()); + + latch[0].await(); assertEquals(1, cache.getCacheCount()); - cache = new LRUQueryCache(2, 10000, new LRUQueryCache.MinSegmentSizePredicate(0, .6f), Float.POSITIVE_INFINITY); + latch[0] = new CountDownLatch(1); + cache = new LRUQueryCache(2, 10000, + new LRUQueryCache.MinSegmentSizePredicate(0, .6f), Float.POSITIVE_INFINITY) { + @Override + protected void onDocIdSetCache(Object readerCoreKey, long ramBytesUsed) { + super.onDocIdSetCache(readerCoreKey, ramBytesUsed); + latch[0].countDown(); + } + }; + searcher.setQueryCache(cache); + searcher.count(new DummyQuery()); + + latch[0].await(); assertEquals(1, cache.getCacheCount()); w.addDocument(new Document()); @@ -1713,6 +1984,208 @@ public class TestLRUQueryCache extends LuceneTestCase { t.join(); } + public void testRejectedExecution() throws IOException { + ExecutorService service = new TestIndexSearcher.RejectingMockExecutor(); + Directory dir = newDirectory(); + final RandomIndexWriter w = new RandomIndexWriter(random(), dir); + + Document doc = new Document(); + StringField f = new StringField("color", "blue", Store.NO); + doc.add(f); + w.addDocument(doc); + f.setStringValue("red"); + w.addDocument(doc); + f.setStringValue("green"); + w.addDocument(doc); + final DirectoryReader reader = w.getReader(); + + final Query red = new TermQuery(new Term("color", "red")); + + IndexSearcher searcher = new IndexSearcher(reader, service); + + final LRUQueryCache queryCache = new LRUQueryCache(2, 100000, context -> true, Float.POSITIVE_INFINITY); + + searcher.setQueryCache(queryCache); + searcher.setQueryCachingPolicy(ALWAYS_CACHE); + + // To ensure that failing ExecutorService still allows query to run + // successfully + + searcher.search(new ConstantScoreQuery(red), 1); + assertEquals(Collections.singletonList(red), queryCache.cachedQueries()); + + reader.close(); + w.close(); + dir.close(); + service.shutdown(); + } + + public void testFailedAsyncCaching() throws IOException { + ExecutorService service = new TestIndexSearcher.RejectingMockExecutor(); + + Directory dir = newDirectory(); + final RandomIndexWriter w = new RandomIndexWriter(random(), dir); + + for (int i = 0; i < 5; i++) { + Document doc = new Document(); + StringField f = new StringField("color", "blue", Store.NO); + doc.add(f); + w.addDocument(doc); + f.setStringValue("red"); + w.addDocument(doc); + f.setStringValue("green"); + w.addDocument(doc); + } + + w.commit(); + + final DirectoryReader reader = w.getReader(); + + final Query red = new TermQuery(new Term("color", "red")); + + IndexSearcher searcher = new IndexSearcher(reader, service); + + assertEquals(1, searcher.leafContexts.size()); + + final LRUQueryCache queryCache = new LRUQueryCache(2, 100000, context -> true, Float.POSITIVE_INFINITY); + + searcher.setQueryCache(queryCache); + searcher.setQueryCachingPolicy(ALWAYS_CACHE); + + searcher.search(new ConstantScoreQuery(red), 1); + + assertEquals(Collections.singletonList(red), queryCache.cachedQueries()); + + w.close(); + reader.close(); + dir.close(); + service.shutdown(); + } + + public void testAsyncCachingHitsClosedReader() throws Exception { + Directory dir = newDirectory(); + final RandomIndexWriter w = new RandomIndexWriter(random(), dir); + Document doc = new Document(); + doc.add(new StringField("color", "red", Store.NO)); + w.addDocument(doc); + IndexReader reader = w.getReader(); + w.close(); + + // Because our index has a single document, it also has a single segment. In that + // case, IndexSearcher will use the current thread for searching, so the only thing + // that runs via the executor is the caching of queries. + final CountDownLatch[] awaitCaching = new CountDownLatch[1]; + awaitCaching[0] = new CountDownLatch(1); + final CountDownLatch[] cachingRan = new CountDownLatch[1]; + cachingRan[0] = new CountDownLatch(1); + final AtomicBoolean success = new AtomicBoolean(false); + Executor executor = runnable -> { + new Thread(() -> { + try { + awaitCaching[0].await(); + runnable.run(); + success.set(true); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } finally { + cachingRan[0].countDown(); + } + }).start(); + }; + + final IndexSearcher searcher = new IndexSearcher(reader, executor); + final LRUQueryCache queryCache = new LRUQueryCache(2, 100000, context -> true, Float.POSITIVE_INFINITY); + searcher.setQueryCache(queryCache); + searcher.setQueryCachingPolicy(ALWAYS_CACHE); + + searcher.search(new ConstantScoreQuery(new TermQuery(new Term("color", "red"))), 1); + assertEquals(Collections.emptyList(), queryCache.cachedQueries()); + awaitCaching[0].countDown(); + cachingRan[0].await(); + assertTrue(success.get()); + assertEquals(Collections.singletonList(new TermQuery(new Term("color", "red"))), queryCache.cachedQueries()); + + awaitCaching[0] = new CountDownLatch(1); + cachingRan[0] = new CountDownLatch(1); + success.set(false); + queryCache.clear(); + + searcher.search(new ConstantScoreQuery(new TermQuery(new Term("color", "red"))), 1); + assertEquals(Collections.emptyList(), queryCache.cachedQueries()); + reader.close(); + awaitCaching[0].countDown(); + cachingRan[0].await(); + assertTrue(success.get()); + assertEquals(Collections.emptyList(), queryCache.cachedQueries()); + + dir.close(); + } + + public static class BlockedMockExecutor implements ExecutorService { + + private final CountDownLatch countDownLatch; + + public BlockedMockExecutor(final CountDownLatch latch) { + this.countDownLatch = latch; + } + + public void shutdown() { + } + + public List shutdownNow() { + throw new UnsupportedOperationException(); + } + + public boolean isShutdown() { + throw new UnsupportedOperationException(); + } + + public boolean isTerminated() { + throw new UnsupportedOperationException(); + } + + public boolean awaitTermination(final long l, final TimeUnit timeUnit) throws InterruptedException { + throw new UnsupportedOperationException(); + } + + public Future submit(final Callable tCallable) { + throw new UnsupportedOperationException(); + } + + public Future submit(final Runnable runnable, final T t) { + throw new UnsupportedOperationException(); + } + + public Future submit(final Runnable runnable) { + throw new UnsupportedOperationException(); + } + + public List> invokeAll(final Collection> callables) throws InterruptedException { + throw new UnsupportedOperationException(); + } + + public List> invokeAll(final Collection> callables, final long l, final TimeUnit timeUnit) throws InterruptedException { + throw new UnsupportedOperationException(); + } + + public T invokeAny(final Collection> callables) throws InterruptedException, ExecutionException { + throw new UnsupportedOperationException(); + } + + public T invokeAny(final Collection> callables, final long l, final TimeUnit timeUnit) throws InterruptedException, ExecutionException, TimeoutException { + throw new UnsupportedOperationException(); + } + + public void execute(final Runnable runnable) { + try { + countDownLatch.await(); + runnable.run(); + } catch (InterruptedException e) { + throw new RuntimeException(e.getMessage()); + } + } + } + public void testSkipCachingForRangeQuery() throws IOException { Directory dir = newDirectory(); final RandomIndexWriter w = new RandomIndexWriter(random(), dir);