Merge pull request #15202 from jpountz/fix/clear_empty_query_cache

Fix NPE when a segment with an empty cache gets closed.
This commit is contained in:
Adrien Grand 2015-12-03 09:38:08 +01:00
commit 79ff2ddd20
2 changed files with 325 additions and 12 deletions

View File

@ -150,18 +150,23 @@ public class IndicesQueryCache extends AbstractComponent implements QueryCache,
protected void onDocIdSetEviction(Object readerCoreKey, int numEntries, long sumRamBytesUsed) {
assert Thread.holdsLock(this);
super.onDocIdSetEviction(readerCoreKey, numEntries, sumRamBytesUsed);
// We can't use ShardCoreKeyMap here because its core closed
// listener is called before the listener of the cache which
// triggers this eviction. So instead we use use stats2 that
// we only evict when nothing is cached anymore on the segment
// instead of relying on close listeners
final StatsAndCount statsAndCount = stats2.get(readerCoreKey);
final Stats shardStats = statsAndCount.stats;
shardStats.cacheSize -= numEntries;
shardStats.ramBytesUsed -= sumRamBytesUsed;
statsAndCount.count -= numEntries;
if (statsAndCount.count == 0) {
stats2.remove(readerCoreKey);
// onDocIdSetEviction might sometimes be called with a number
// of entries equal to zero if the cache for the given segment
// was already empty when the close listener was called
if (numEntries > 0) {
// We can't use ShardCoreKeyMap here because its core closed
// listener is called before the listener of the cache which
// triggers this eviction. So instead we use use stats2 that
// we only evict when nothing is cached anymore on the segment
// instead of relying on close listeners
final StatsAndCount statsAndCount = stats2.get(readerCoreKey);
final Stats shardStats = statsAndCount.stats;
shardStats.cacheSize -= numEntries;
shardStats.ramBytesUsed -= sumRamBytesUsed;
statsAndCount.count -= numEntries;
if (statsAndCount.count == 0) {
stats2.remove(readerCoreKey);
}
}
}

View File

@ -0,0 +1,308 @@
package org.elasticsearch.indices.cache.query;
import java.io.IOException;
import org.apache.lucene.document.Document;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.search.ConstantScoreScorer;
import org.apache.lucene.search.ConstantScoreWeight;
import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.QueryCachingPolicy;
import org.apache.lucene.search.Scorer;
import org.apache.lucene.search.Weight;
import org.apache.lucene.store.Directory;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.cache.query.QueryCacheStats;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.test.ESTestCase;
public class IndicesQueryCacheTests extends ESTestCase {
private static class DummyQuery extends Query {
private final int id;
DummyQuery(int id) {
this.id = id;
}
@Override
public boolean equals(Object obj) {
return super.equals(obj) && id == ((DummyQuery) obj).id;
}
@Override
public int hashCode() {
return 31 * super.hashCode() + id;
}
@Override
public String toString(String field) {
return "dummy";
}
@Override
public Weight createWeight(IndexSearcher searcher, boolean needsScores)
throws IOException {
return new ConstantScoreWeight(this) {
@Override
public Scorer scorer(LeafReaderContext context) throws IOException {
return new ConstantScoreScorer(this, score(), DocIdSetIterator.all(context.reader().maxDoc()));
}
};
}
}
public void testBasics() throws IOException {
Directory dir = newDirectory();
IndexWriter w = new IndexWriter(dir, newIndexWriterConfig());
w.addDocument(new Document());
DirectoryReader r = DirectoryReader.open(w, false);
w.close();
ShardId shard = new ShardId(new Index("index"), 0);
r = ElasticsearchDirectoryReader.wrap(r, shard);
IndexSearcher s = new IndexSearcher(r);
s.setQueryCachingPolicy(QueryCachingPolicy.ALWAYS_CACHE);
Settings settings = Settings.builder()
.put(IndicesQueryCache.INDICES_CACHE_QUERY_COUNT, 10)
.build();
IndicesQueryCache cache = new IndicesQueryCache(settings);
s.setQueryCache(cache);
QueryCacheStats stats = cache.getStats(shard);
assertEquals(0L, stats.getCacheSize());
assertEquals(0L, stats.getCacheCount());
assertEquals(0L, stats.getHitCount());
assertEquals(0L, stats.getMissCount());
assertEquals(1, s.count(new DummyQuery(0)));
stats = cache.getStats(shard);
assertEquals(1L, stats.getCacheSize());
assertEquals(1L, stats.getCacheCount());
assertEquals(0L, stats.getHitCount());
assertEquals(1L, stats.getMissCount());
for (int i = 1; i < 20; ++i) {
assertEquals(1, s.count(new DummyQuery(i)));
}
stats = cache.getStats(shard);
assertEquals(10L, stats.getCacheSize());
assertEquals(20L, stats.getCacheCount());
assertEquals(0L, stats.getHitCount());
assertEquals(20L, stats.getMissCount());
s.count(new DummyQuery(10));
stats = cache.getStats(shard);
assertEquals(10L, stats.getCacheSize());
assertEquals(20L, stats.getCacheCount());
assertEquals(1L, stats.getHitCount());
assertEquals(20L, stats.getMissCount());
IOUtils.close(r, dir);
// got emptied, but no changes to other metrics
stats = cache.getStats(shard);
assertEquals(0L, stats.getCacheSize());
assertEquals(20L, stats.getCacheCount());
assertEquals(1L, stats.getHitCount());
assertEquals(20L, stats.getMissCount());
cache.onClose(shard);
// forgot everything
stats = cache.getStats(shard);
assertEquals(0L, stats.getCacheSize());
assertEquals(0L, stats.getCacheCount());
assertEquals(0L, stats.getHitCount());
assertEquals(0L, stats.getMissCount());
cache.close(); // this triggers some assertions
}
public void testTwoShards() throws IOException {
Directory dir1 = newDirectory();
IndexWriter w1 = new IndexWriter(dir1, newIndexWriterConfig());
w1.addDocument(new Document());
DirectoryReader r1 = DirectoryReader.open(w1, false);
w1.close();
ShardId shard1 = new ShardId(new Index("index"), 0);
r1 = ElasticsearchDirectoryReader.wrap(r1, shard1);
IndexSearcher s1 = new IndexSearcher(r1);
s1.setQueryCachingPolicy(QueryCachingPolicy.ALWAYS_CACHE);
Directory dir2 = newDirectory();
IndexWriter w2 = new IndexWriter(dir2, newIndexWriterConfig());
w2.addDocument(new Document());
DirectoryReader r2 = DirectoryReader.open(w2, false);
w2.close();
ShardId shard2 = new ShardId(new Index("index"), 1);
r2 = ElasticsearchDirectoryReader.wrap(r2, shard2);
IndexSearcher s2 = new IndexSearcher(r2);
s2.setQueryCachingPolicy(QueryCachingPolicy.ALWAYS_CACHE);
Settings settings = Settings.builder()
.put(IndicesQueryCache.INDICES_CACHE_QUERY_COUNT, 10)
.build();
IndicesQueryCache cache = new IndicesQueryCache(settings);
s1.setQueryCache(cache);
s2.setQueryCache(cache);
assertEquals(1, s1.count(new DummyQuery(0)));
QueryCacheStats stats1 = cache.getStats(shard1);
assertEquals(1L, stats1.getCacheSize());
assertEquals(1L, stats1.getCacheCount());
assertEquals(0L, stats1.getHitCount());
assertEquals(1L, stats1.getMissCount());
QueryCacheStats stats2 = cache.getStats(shard2);
assertEquals(0L, stats2.getCacheSize());
assertEquals(0L, stats2.getCacheCount());
assertEquals(0L, stats2.getHitCount());
assertEquals(0L, stats2.getMissCount());
assertEquals(1, s2.count(new DummyQuery(0)));
stats1 = cache.getStats(shard1);
assertEquals(1L, stats1.getCacheSize());
assertEquals(1L, stats1.getCacheCount());
assertEquals(0L, stats1.getHitCount());
assertEquals(1L, stats1.getMissCount());
stats2 = cache.getStats(shard2);
assertEquals(1L, stats2.getCacheSize());
assertEquals(1L, stats2.getCacheCount());
assertEquals(0L, stats2.getHitCount());
assertEquals(1L, stats2.getMissCount());
for (int i = 0; i < 20; ++i) {
assertEquals(1, s2.count(new DummyQuery(i)));
}
stats1 = cache.getStats(shard1);
assertEquals(0L, stats1.getCacheSize()); // evicted
assertEquals(1L, stats1.getCacheCount());
assertEquals(0L, stats1.getHitCount());
assertEquals(1L, stats1.getMissCount());
stats2 = cache.getStats(shard2);
assertEquals(10L, stats2.getCacheSize());
assertEquals(20L, stats2.getCacheCount());
assertEquals(1L, stats2.getHitCount());
assertEquals(20L, stats2.getMissCount());
IOUtils.close(r1, dir1);
// no changes
stats1 = cache.getStats(shard1);
assertEquals(0L, stats1.getCacheSize());
assertEquals(1L, stats1.getCacheCount());
assertEquals(0L, stats1.getHitCount());
assertEquals(1L, stats1.getMissCount());
stats2 = cache.getStats(shard2);
assertEquals(10L, stats2.getCacheSize());
assertEquals(20L, stats2.getCacheCount());
assertEquals(1L, stats2.getHitCount());
assertEquals(20L, stats2.getMissCount());
cache.onClose(shard1);
// forgot everything about shard1
stats1 = cache.getStats(shard1);
assertEquals(0L, stats1.getCacheSize());
assertEquals(0L, stats1.getCacheCount());
assertEquals(0L, stats1.getHitCount());
assertEquals(0L, stats1.getMissCount());
stats2 = cache.getStats(shard2);
assertEquals(10L, stats2.getCacheSize());
assertEquals(20L, stats2.getCacheCount());
assertEquals(1L, stats2.getHitCount());
assertEquals(20L, stats2.getMissCount());
IOUtils.close(r2, dir2);
cache.onClose(shard2);
// forgot everything about shard2
stats1 = cache.getStats(shard1);
assertEquals(0L, stats1.getCacheSize());
assertEquals(0L, stats1.getCacheCount());
assertEquals(0L, stats1.getHitCount());
assertEquals(0L, stats1.getMissCount());
stats2 = cache.getStats(shard2);
assertEquals(0L, stats2.getCacheSize());
assertEquals(0L, stats2.getCacheCount());
assertEquals(0L, stats2.getHitCount());
assertEquals(0L, stats2.getMissCount());
cache.close(); // this triggers some assertions
}
// Make sure the cache behaves correctly when a segment that is associated
// with an empty cache gets closed. In that particular case, the eviction
// callback is called with a number of evicted entries equal to 0
// see https://github.com/elastic/elasticsearch/issues/15043
public void testStatsOnEviction() throws IOException {
Directory dir1 = newDirectory();
IndexWriter w1 = new IndexWriter(dir1, newIndexWriterConfig());
w1.addDocument(new Document());
DirectoryReader r1 = DirectoryReader.open(w1, false);
w1.close();
ShardId shard1 = new ShardId(new Index("index"), 0);
r1 = ElasticsearchDirectoryReader.wrap(r1, shard1);
IndexSearcher s1 = new IndexSearcher(r1);
s1.setQueryCachingPolicy(QueryCachingPolicy.ALWAYS_CACHE);
Directory dir2 = newDirectory();
IndexWriter w2 = new IndexWriter(dir2, newIndexWriterConfig());
w2.addDocument(new Document());
DirectoryReader r2 = DirectoryReader.open(w2, false);
w2.close();
ShardId shard2 = new ShardId(new Index("index"), 1);
r2 = ElasticsearchDirectoryReader.wrap(r2, shard2);
IndexSearcher s2 = new IndexSearcher(r2);
s2.setQueryCachingPolicy(QueryCachingPolicy.ALWAYS_CACHE);
Settings settings = Settings.builder()
.put(IndicesQueryCache.INDICES_CACHE_QUERY_COUNT, 10)
.build();
IndicesQueryCache cache = new IndicesQueryCache(settings);
s1.setQueryCache(cache);
s2.setQueryCache(cache);
assertEquals(1, s1.count(new DummyQuery(0)));
for (int i = 1; i <= 20; ++i) {
assertEquals(1, s2.count(new DummyQuery(i)));
}
QueryCacheStats stats1 = cache.getStats(shard1);
assertEquals(0L, stats1.getCacheSize());
assertEquals(1L, stats1.getCacheCount());
// this used to fail because we were evicting an empty cache on
// the segment from r1
IOUtils.close(r1, dir1);
cache.onClose(shard1);
IOUtils.close(r2, dir2);
cache.onClose(shard2);
cache.close(); // this triggers some assertions
}
}