mirror of https://github.com/honeymoose/OpenSearch.git
synced 2025-02-25 14:26:27 +00:00
experiment with a LAB for the filter cache, disabled by default for now; also, move to a soft reference on the whole reader key value, and not per filter (less load)
This commit is contained in:
parent b658fba22e
commit 8eab5ec528
@@ -0,0 +1,249 @@
/*
 * Licensed to Elastic Search and Shay Banon under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. Elastic Search licenses this
 * file to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.elasticsearch.common.lab;

import org.elasticsearch.common.Preconditions;

import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

public class LongsLAB {

    private AtomicReference<Chunk> curChunk = new AtomicReference<Chunk>();

    final int chunkSize;
    final int maxAlloc;

    public LongsLAB(int chunkSize, int maxAlloc) {
        this.chunkSize = chunkSize;
        this.maxAlloc = maxAlloc;
        // if we don't exclude allocations > chunkSize, we'd infinite-loop on one!
        Preconditions.checkArgument(maxAlloc <= chunkSize);
    }

    /**
     * Allocate a slice of the given length.
     *
     * If the size is larger than the maximum size specified for this
     * allocator, returns null.
     */
    public Allocation allocateLongs(int size) {
        Preconditions.checkArgument(size >= 0, "negative size");

        // Callers should satisfy large allocations directly from JVM since they
        // don't cause fragmentation as badly.
        if (size > maxAlloc) {
            return null;
        }

        while (true) {
            Chunk c = getOrMakeChunk();

            // Try to allocate from this chunk
            int allocOffset = c.alloc(size);
            if (allocOffset != -1) {
                // We succeeded - this is the common case - small alloc
                // from a big buffer
                return new Allocation(c.data, allocOffset);
            }

            // not enough space!
            // try to retire this chunk
            tryRetireChunk(c);
        }
    }

    /**
     * Try to retire the current chunk if it is still
     * <code>c</code>. Postcondition is that curChunk.get()
     * != c
     */
    private void tryRetireChunk(Chunk c) {
        @SuppressWarnings("unused")
        boolean weRetiredIt = curChunk.compareAndSet(c, null);
        // If the CAS succeeds, that means that we won the race
        // to retire the chunk. We could use this opportunity to
        // update metrics on external fragmentation.
        //
        // If the CAS fails, that means that someone else already
        // retired the chunk for us.
    }

    /**
     * Get the current chunk, or, if there is no current chunk,
     * allocate a new one from the JVM.
     */
    private Chunk getOrMakeChunk() {
        while (true) {
            // Try to get the chunk
            Chunk c = curChunk.get();
            if (c != null) {
                return c;
            }

            // No current chunk, so we want to allocate one. We race
            // against other allocators to CAS in an uninitialized chunk
            // (which is cheap to allocate)
            c = new Chunk(chunkSize);
            if (curChunk.compareAndSet(null, c)) {
                // we won race - now we need to actually do the expensive
                // allocation step
                c.init();
                return c;
            }
            // someone else won race - that's fine, we'll try to grab theirs
            // in the next iteration of the loop.
        }
    }

    /**
     * A chunk of memory out of which allocations are sliced.
     */
    private static class Chunk {
        /**
         * Actual underlying data
         */
        private long[] data;

        private static final int UNINITIALIZED = -1;
        private static final int OOM = -2;
        /**
         * Offset for the next allocation, or the sentinel value -1
         * which implies that the chunk is still uninitialized.
         */
        private AtomicInteger nextFreeOffset = new AtomicInteger(UNINITIALIZED);

        /**
         * Total number of allocations satisfied from this buffer
         */
        private AtomicInteger allocCount = new AtomicInteger();

        /**
         * Size of chunk in longs
         */
        private final int size;

        /**
         * Create an uninitialized chunk. Note that memory is not allocated yet, so
         * this is cheap.
         *
         * @param size in longs
         */
        private Chunk(int size) {
            this.size = size;
        }

        /**
         * Actually claim the memory for this chunk. This should only be called from
         * the thread that constructed the chunk. It is thread-safe against other
         * threads calling alloc(), who will block until the allocation is complete.
         */
        public void init() {
            assert nextFreeOffset.get() == UNINITIALIZED;
            try {
                data = new long[size];
            } catch (OutOfMemoryError e) {
                boolean failInit = nextFreeOffset.compareAndSet(UNINITIALIZED, OOM);
                assert failInit; // should be true.
                throw e;
            }
            // Mark that it's ready for use
            boolean initted = nextFreeOffset.compareAndSet(
                    UNINITIALIZED, 0);
            // We should always succeed the above CAS since only one thread
            // calls init()!
            Preconditions.checkState(initted,
                    "Multiple threads tried to init same chunk");
        }

        /**
         * Try to allocate <code>size</code> longs from the chunk.
         *
         * @return the offset of the successful allocation, or -1 to indicate not-enough-space
         */
        public int alloc(int size) {
            while (true) {
                int oldOffset = nextFreeOffset.get();
                if (oldOffset == UNINITIALIZED) {
                    // The chunk doesn't have its data allocated yet.
                    // Since we found this in curChunk, we know that whoever
                    // CAS-ed it there is allocating it right now. So spin-loop
                    // shouldn't spin long!
                    Thread.yield();
                    continue;
                }
                if (oldOffset == OOM) {
                    // doh we ran out of ram. return -1 to chuck this away.
                    return -1;
                }

                if (oldOffset + size > data.length) {
                    return -1; // alloc doesn't fit
                }

                // Try to atomically claim this chunk
                if (nextFreeOffset.compareAndSet(oldOffset, oldOffset + size)) {
                    // we got the alloc
                    allocCount.incrementAndGet();
                    return oldOffset;
                }
                // we raced and lost alloc, try again
            }
        }

        @Override
        public String toString() {
            return "Chunk@" + System.identityHashCode(this) +
                    " allocs=" + allocCount.get() + " waste=" +
                    (data.length - nextFreeOffset.get());
        }
    }

    /**
     * The result of a single allocation. Contains the chunk that the
     * allocation points into, and the offset in this array where the
     * slice begins.
     */
    public static class Allocation {
        private final long[] data;
        private final int offset;

        private Allocation(long[] data, int off) {
            this.data = data;
            this.offset = off;
        }

        @Override
        public String toString() {
            return "Allocation(data=" + data +
                    " with capacity=" + data.length +
                    ", off=" + offset + ")";
        }

        public long[] getData() {
            return data;
        }

        public int getOffset() {
            return offset;
        }
    }

}
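For context, a minimal usage sketch of the allocator above (illustrative only, not part of the commit): callers ask for a slice of longs and either get back a shared chunk array plus an offset, or null and fall back to a plain JVM allocation, which is how the filter cache consumes it further down in this change.

// Illustrative sketch only (not part of the commit): how a caller consumes LongsLAB.
import org.elasticsearch.common.lab.LongsLAB;

public class LongsLABUsageSketch {
    public static void main(String[] args) {
        // 1 MB chunks of longs, individual allocations capped at 128 KB (both expressed in longs)
        LongsLAB lab = new LongsLAB(1024 * 1024 / 8, 128 * 1024 / 8);

        int numLongs = 1000; // size of the slice we want
        LongsLAB.Allocation allocation = lab.allocateLongs(numLongs);
        if (allocation == null) {
            // too large for the LAB - fall back to a plain array
            long[] standalone = new long[numLongs];
            System.out.println("fallback allocation of " + standalone.length + " longs");
        } else {
            // write into the shared chunk, starting at the returned offset
            long[] data = allocation.getData();
            int offset = allocation.getOffset();
            for (int i = 0; i < numLongs; i++) {
                data[offset + i] = i;
            }
            System.out.println(allocation);
        }
    }
}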
@@ -36,6 +36,7 @@ public class CacheStats implements Streamable, ToXContent {

    long fieldEvictions;
    long filterEvictions;
    long filterMemEvictions;
    long filterCount;
    long fieldSize = 0;
    long filterSize = 0;
@@ -44,9 +45,10 @@ public class CacheStats implements Streamable, ToXContent {
    public CacheStats() {
    }

    public CacheStats(long fieldEvictions, long filterEvictions, long fieldSize, long filterSize, long filterCount, long bloomSize) {
    public CacheStats(long fieldEvictions, long filterEvictions, long filterMemEvictions, long fieldSize, long filterSize, long filterCount, long bloomSize) {
        this.fieldEvictions = fieldEvictions;
        this.filterEvictions = filterEvictions;
        this.filterMemEvictions = filterMemEvictions;
        this.fieldSize = fieldSize;
        this.filterSize = filterSize;
        this.filterCount = filterCount;
@@ -56,6 +58,7 @@ public class CacheStats implements Streamable, ToXContent {
    public void add(CacheStats stats) {
        this.fieldEvictions += stats.fieldEvictions;
        this.filterEvictions += stats.filterEvictions;
        this.filterMemEvictions += stats.filterMemEvictions;
        this.fieldSize += stats.fieldSize;
        this.filterSize += stats.filterSize;
        this.filterCount += stats.filterCount;
@@ -78,6 +81,14 @@ public class CacheStats implements Streamable, ToXContent {
        return this.filterEvictions;
    }

    public long filterMemEvictions() {
        return this.filterMemEvictions;
    }

    public long getFilterMemEvictions() {
        return this.filterMemEvictions;
    }

    public long filterCount() {
        return this.filterCount;
    }
@@ -141,6 +152,7 @@ public class CacheStats implements Streamable, ToXContent {
        builder.field(Fields.FIELD_SIZE_IN_BYTES, fieldSize);
        builder.field(Fields.FILTER_COUNT, filterCount);
        builder.field(Fields.FILTER_EVICTIONS, filterEvictions);
        builder.field(Fields.FILTER_MEM_EVICTIONS, filterMemEvictions);
        builder.field(Fields.FILTER_SIZE, filterSize().toString());
        builder.field(Fields.FILTER_SIZE_IN_BYTES, filterSize);
        builder.endObject();
@@ -153,6 +165,7 @@ public class CacheStats implements Streamable, ToXContent {
        static final XContentBuilderString FIELD_SIZE_IN_BYTES = new XContentBuilderString("field_size_in_bytes");
        static final XContentBuilderString FIELD_EVICTIONS = new XContentBuilderString("field_evictions");
        static final XContentBuilderString FILTER_EVICTIONS = new XContentBuilderString("filter_evictions");
        static final XContentBuilderString FILTER_MEM_EVICTIONS = new XContentBuilderString("filter_mem_evictions");
        static final XContentBuilderString FILTER_COUNT = new XContentBuilderString("filter_count");
        static final XContentBuilderString FILTER_SIZE = new XContentBuilderString("filter_size");
        static final XContentBuilderString FILTER_SIZE_IN_BYTES = new XContentBuilderString("filter_size_in_bytes");
@@ -167,6 +180,7 @@ public class CacheStats implements Streamable, ToXContent {
    @Override public void readFrom(StreamInput in) throws IOException {
        fieldEvictions = in.readVLong();
        filterEvictions = in.readVLong();
        filterMemEvictions = in.readVLong();
        fieldSize = in.readVLong();
        filterSize = in.readVLong();
        filterCount = in.readVLong();
@@ -176,6 +190,7 @@ public class CacheStats implements Streamable, ToXContent {
    @Override public void writeTo(StreamOutput out) throws IOException {
        out.writeVLong(fieldEvictions);
        out.writeVLong(filterEvictions);
        out.writeVLong(filterMemEvictions);
        out.writeVLong(fieldSize);
        out.writeVLong(filterSize);
        out.writeVLong(filterCount);
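To make the new counter concrete, here is a small illustrative fragment (mine, not from the commit) that exercises the constructor and add() shown above; the argument order follows the new seven-argument signature (fieldEvictions, filterEvictions, filterMemEvictions, fieldSize, filterSize, filterCount, bloomSize).

// Illustrative fragment only (not part of the commit): merging per-shard cache stats.
CacheStats shard1 = new CacheStats(0, 10, 2, 1024, 2048, 5, 0);
CacheStats shard2 = new CacheStats(1, 3, 1, 512, 4096, 7, 0);

CacheStats total = new CacheStats();
total.add(shard1);
total.add(shard2);

// total.filterEvictions() == 13 count/expiry evictions,
// total.filterMemEvictions() == 3 memory (soft reference) evictions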
@@ -73,7 +73,7 @@ public class IndexCache extends AbstractIndexComponent implements CloseableCompo
    }

    public CacheStats stats() {
        return new CacheStats(fieldDataCache.evictions(), filterCache.evictions(), fieldDataCache.sizeInBytes(), filterCache.sizeInBytes(), filterCache.count(), bloomCache.sizeInBytes());
        return new CacheStats(fieldDataCache.evictions(), filterCache.evictions(), filterCache.memEvictions(), fieldDataCache.sizeInBytes(), filterCache.sizeInBytes(), filterCache.count(), bloomCache.sizeInBytes());
    }

    public FilterCache filter() {
@@ -44,4 +44,6 @@ public interface FilterCache extends IndexComponent, CloseableComponent {
    long sizeInBytes();

    long evictions();

    long memEvictions();
}
@@ -73,4 +73,8 @@ public class NoneFilterCache extends AbstractIndexComponent implements FilterCac
    @Override public long evictions() {
        return 0;
    }

    @Override public long memEvictions() {
        return 0;
    }
}
@@ -74,6 +74,10 @@ public class ResidentFilterCache extends AbstractConcurrentMapFilterCache implem
        return evictions.get();
    }

    @Override public long memEvictions() {
        return 0;
    }

    @Override public void onEviction(Filter filter, DocSet docSet) {
        evictions.incrementAndGet();
    }
@@ -46,6 +46,7 @@ public class SoftFilterCache extends AbstractConcurrentMapFilterCache implements
    private final TimeValue expire;

    private final AtomicLong evictions = new AtomicLong();
    private final AtomicLong memEvictions = new AtomicLong();

    @Inject public SoftFilterCache(Index index, @IndexSettings Settings indexSettings) {
        super(index, indexSettings);
@@ -53,10 +54,17 @@ public class SoftFilterCache extends AbstractConcurrentMapFilterCache implements
        this.expire = componentSettings.getAsTime("expire", null);
    }

    @Override protected ConcurrentMap<Object, ReaderValue> buildCache() {
        // better to have soft on the whole ReaderValue, simpler on the GC to clean it
        MapMaker mapMaker = new MapMaker().weakKeys().softValues();
        mapMaker.evictionListener(new CacheMapEvictionListener(memEvictions));
        return mapMaker.makeMap();
    }

    @Override protected ConcurrentMap<Filter, DocSet> buildFilterMap() {
        // DocSet are not really stored with strong reference only when searching on them...
        // Filter might be stored in query cache
        MapMaker mapMaker = new MapMaker().softValues();
        MapMaker mapMaker = new MapMaker();
        if (maxSize != -1) {
            mapMaker.maximumSize(maxSize);
        }
@@ -75,6 +83,10 @@ public class SoftFilterCache extends AbstractConcurrentMapFilterCache implements
        return evictions.get();
    }

    @Override public long memEvictions() {
        return memEvictions.get();
    }

    @Override public void onEviction(Filter filter, DocSet docSet) {
        evictions.incrementAndGet();
    }
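The buildCache() override above is the "soft reference on the whole reader key value" part of the commit message. A rough, illustrative sketch of the behaviour follows (my own example, not code from this change): with softValues() on the outer map, the GC can reclaim a reader's entire cached value in one decision, and the eviction listener is what feeds the new memEvictions counter.

// Illustrative sketch only (not part of the commit): soft references let the GC drop a
// whole cached value under memory pressure, which is what softValues() gives the outer
// reader -> ReaderValue map; the filter cache counts such drops as memEvictions.
import java.lang.ref.SoftReference;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class SoftValueSketch {
    public static void main(String[] args) {
        Map<String, SoftReference<long[]>> cache = new ConcurrentHashMap<String, SoftReference<long[]>>();
        cache.put("reader-1", new SoftReference<long[]>(new long[16 * 1024]));

        long[] value = cache.get("reader-1").get();
        if (value == null) {
            // reclaimed under memory pressure - everything cached for this reader is gone at once
            System.out.println("value was reclaimed");
        } else {
            System.out.println("value still cached, length=" + value.length);
        }
    }
}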
@@ -21,10 +21,20 @@ package org.elasticsearch.index.cache.filter.support;

import org.apache.lucene.index.IndexReader;
import org.apache.lucene.search.DocIdSet;
import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.search.Filter;
import org.apache.lucene.util.OpenBitSet;
import org.elasticsearch.common.RamUsage;
import org.elasticsearch.common.collect.MapEvictionListener;
import org.elasticsearch.common.collect.MapMaker;
import org.elasticsearch.common.lab.LongsLAB;
import org.elasticsearch.common.lucene.docset.DocSet;
import org.elasticsearch.common.lucene.docset.DocSets;
import org.elasticsearch.common.lucene.docset.OpenBitDocSet;
import org.elasticsearch.common.lucene.docset.SlicedOpenBitSet;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.index.AbstractIndexComponent;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.cache.filter.FilterCache;
@@ -32,8 +42,8 @@ import org.elasticsearch.index.settings.IndexSettings;

import java.io.IOException;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;

import static org.elasticsearch.common.lucene.docset.DocSets.*;
import static org.elasticsearch.common.util.concurrent.ConcurrentCollections.*;

/**
@@ -43,13 +53,42 @@ import static org.elasticsearch.common.util.concurrent.ConcurrentCollections.*;
 */
public abstract class AbstractConcurrentMapFilterCache extends AbstractIndexComponent implements FilterCache {

    final ConcurrentMap<Object, ConcurrentMap<Filter, DocSet>> cache;
    final ConcurrentMap<Object, ReaderValue> cache;

    final boolean labEnabled;
    final ByteSizeValue labMaxAlloc;
    final ByteSizeValue labChunkSize;

    final int labMaxAllocBytes;
    final int labChunkSizeBytes;

    protected AbstractConcurrentMapFilterCache(Index index, @IndexSettings Settings indexSettings) {
        super(index, indexSettings);
        // weak keys is fine, it will only be cleared once IndexReader references are removed
        // (assuming clear(...) will not be called)
        this.cache = new MapMaker().weakKeys().makeMap();
        this.cache = buildCache();

        // The LAB is stored per reader, so whole chunks will be cleared once the reader is discarded.
        // This means that with filter-entry-specific eviction, like access time,
        // we might get into cases where the LAB is held by a puny filter and other filters have been released.
        // This usually will not be that bad, compared to the GC benefit of using a LAB, but that is why
        // the soft filter cache is recommended.
        this.labEnabled = componentSettings.getAsBoolean("lab", false);
        // These values should not be too high; basically we want to cache the small readers and use the LAB for
        // them, 1M docs on OpenBitSet is around 110kb.
        this.labMaxAlloc = componentSettings.getAsBytesSize("lab.max_alloc", new ByteSizeValue(128, ByteSizeUnit.KB));
        this.labChunkSize = componentSettings.getAsBytesSize("lab.chunk_size", new ByteSizeValue(1, ByteSizeUnit.MB));

        this.labMaxAllocBytes = (int) (labMaxAlloc.bytes() / RamUsage.NUM_BYTES_LONG);
        this.labChunkSizeBytes = (int) (labChunkSize.bytes() / RamUsage.NUM_BYTES_LONG);
    }

    protected ConcurrentMap<Object, ReaderValue> buildCache() {
        return new MapMaker().weakKeys().makeMap();
    }

    protected ConcurrentMap<Filter, DocSet> buildFilterMap() {
        return newConcurrentMap();
    }

    @Override public void close() {
@@ -61,17 +100,17 @@ public abstract class AbstractConcurrentMapFilterCache extends AbstractIndexComp
    }

    @Override public void clear(IndexReader reader) {
        ConcurrentMap<Filter, DocSet> map = cache.remove(reader.getCoreCacheKey());
        ReaderValue readerValue = cache.remove(reader.getCoreCacheKey());
        // help soft/weak handling GC
        if (map != null) {
            map.clear();
        if (readerValue != null) {
            readerValue.filters().clear();
        }
    }

    @Override public long sizeInBytes() {
        long sizeInBytes = 0;
        for (ConcurrentMap<Filter, DocSet> map : cache.values()) {
            for (DocSet docSet : map.values()) {
        for (ReaderValue readerValue : cache.values()) {
            for (DocSet docSet : readerValue.filters().values()) {
                sizeInBytes += docSet.sizeInBytes();
            }
        }
@@ -80,8 +119,8 @@ public abstract class AbstractConcurrentMapFilterCache extends AbstractIndexComp

    @Override public long count() {
        long entries = 0;
        for (ConcurrentMap<Filter, DocSet> map : cache.values()) {
            entries += map.size();
        for (ReaderValue readerValue : cache.values()) {
            entries += readerValue.filters().size();
        }
        return entries;
    }
@@ -97,10 +136,6 @@ public abstract class AbstractConcurrentMapFilterCache extends AbstractIndexComp
        return filter instanceof FilterCacheFilterWrapper;
    }

    protected ConcurrentMap<Filter, DocSet> buildFilterMap() {
        return newConcurrentMap();
    }

    // LUCENE MONITOR: Check next version Lucene for CachingWrapperFilter, consider using that logic
    // and not use the DeletableConstantScoreQuery, instead pass the DeletesMode enum to the cache method
    // see: https://issues.apache.org/jira/browse/LUCENE-2468
@@ -117,21 +152,25 @@ public abstract class AbstractConcurrentMapFilterCache extends AbstractIndexComp
        }

        @Override public DocIdSet getDocIdSet(IndexReader reader) throws IOException {
            ConcurrentMap<Filter, DocSet> cachedFilters = cache.cache.get(reader.getCoreCacheKey());
            if (cachedFilters == null) {
                cachedFilters = cache.buildFilterMap();
                ConcurrentMap<Filter, DocSet> prev = cache.cache.putIfAbsent(reader.getCoreCacheKey(), cachedFilters);
            ReaderValue readerValue = cache.cache.get(reader.getCoreCacheKey());
            if (readerValue == null) {
                LongsLAB longsLAB = null;
                if (cache.labEnabled) {
                    longsLAB = new LongsLAB(cache.labChunkSizeBytes, cache.labMaxAllocBytes);
                }
                readerValue = new ReaderValue(cache.buildFilterMap(), longsLAB);
                ReaderValue prev = cache.cache.putIfAbsent(reader.getCoreCacheKey(), readerValue);
                if (prev != null) {
                    cachedFilters = prev;
                    readerValue = prev;
                }
            }
            DocSet docSet = cachedFilters.get(filter);
            DocSet docSet = readerValue.filters().get(filter);
            if (docSet != null) {
                return docSet;
            }
            DocIdSet docIdSet = filter.getDocIdSet(reader);
            docSet = cacheable(reader, docIdSet);
            DocSet prev = cachedFilters.putIfAbsent(filter, docSet);
            docSet = cacheable(reader, readerValue, docIdSet);
            DocSet prev = readerValue.filters().putIfAbsent(filter, docSet);
            if (prev != null) {
                docSet = prev;
            }
@@ -150,5 +189,82 @@ public abstract class AbstractConcurrentMapFilterCache extends AbstractIndexComp
        public int hashCode() {
            return filter.hashCode() ^ 0x1117BF25;
        }

        private DocSet cacheable(IndexReader reader, ReaderValue readerValue, DocIdSet set) throws IOException {
            if (set == null) {
                return DocSet.EMPTY_DOC_SET;
            }
            if (set == DocIdSet.EMPTY_DOCIDSET) {
                return DocSet.EMPTY_DOC_SET;
            }

            DocIdSetIterator it = set.iterator();
            if (it == null) {
                return DocSet.EMPTY_DOC_SET;
            }
            int doc = it.nextDoc();
            if (doc == DocIdSetIterator.NO_MORE_DOCS) {
                return DocSet.EMPTY_DOC_SET;
            }

            // check if we have a LAB that can be used...
            if (readerValue.longsLAB() == null) {
                return DocSets.cacheable(reader, set);
            }

            int numOfWords = OpenBitSet.bits2words(reader.maxDoc());
            LongsLAB.Allocation allocation = readerValue.longsLAB().allocateLongs(numOfWords);
            if (allocation == null) {
                return DocSets.cacheable(reader, set);
            }
            // we have an allocation, use it to create SlicedOpenBitSet
            if (set instanceof OpenBitSet) {
                return new SlicedOpenBitSet(allocation.getData(), allocation.getOffset(), (OpenBitSet) set);
            } else if (set instanceof OpenBitDocSet) {
                return new SlicedOpenBitSet(allocation.getData(), allocation.getOffset(), ((OpenBitDocSet) set).set());
            } else {
                SlicedOpenBitSet slicedSet = new SlicedOpenBitSet(allocation.getData(), numOfWords, allocation.getOffset());
                slicedSet.fastSet(doc); // we already have an open iterator, so use it, and don't forget to set the initial one
                while ((doc = it.nextDoc()) != DocIdSetIterator.NO_MORE_DOCS) {
                    slicedSet.fastSet(doc);
                }
                return slicedSet;
            }
        }
    }

    public static class ReaderValue {
        private final ConcurrentMap<Filter, DocSet> filters;
        private final LongsLAB longsLAB;

        public ReaderValue(ConcurrentMap<Filter, DocSet> filters, LongsLAB longsLAB) {
            this.filters = filters;
            this.longsLAB = longsLAB;
        }

        public ConcurrentMap<Filter, DocSet> filters() {
            return filters;
        }

        public LongsLAB longsLAB() {
            return longsLAB;
        }
    }

    public static class CacheMapEvictionListener implements MapEvictionListener<Object, ReaderValue> {

        private final AtomicLong evictions;

        public CacheMapEvictionListener(AtomicLong evictions) {
            this.evictions = evictions;
        }

        @Override public void onEviction(Object o, ReaderValue readerValue) {
            evictions.incrementAndGet();
            if (readerValue != null) {
                // extra clean the map
                readerValue.filters().clear();
            }
        }
    }
}
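For orientation, the constructor above wires three component settings: a lab on/off flag (default false, matching the "disabled by default" in the commit message), lab.max_alloc (default 128kb) and lab.chunk_size (default 1mb), both converted from bytes to a number of longs. The 128kb default lines up with the bitset math used in cacheable(): OpenBitSet.bits2words(1,000,000) is 15,625 longs, i.e. 125,000 bytes, so a filter over a segment of roughly a million docs still fits in a single LAB allocation. Below is a hedged sketch of turning the experiment on; the full setting keys are an assumption on my part (componentSettings resolves the prefix for the filter cache component, something like index.cache.filter.*), so treat them as illustrative rather than authoritative.

// Illustrative sketch only: enabling the (disabled-by-default) filter cache LAB via index
// settings. The exact key prefix is an assumption - componentSettings supplies it for the
// filter cache component - so the "index.cache.filter." part below is hypothetical.
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;

public class FilterCacheLabSettingsSketch {
    public static void main(String[] args) {
        Settings settings = ImmutableSettings.settingsBuilder()
                .put("index.cache.filter.lab", true)              // labEnabled, default false
                .put("index.cache.filter.lab.max_alloc", "128kb") // largest single allocation served from a chunk
                .put("index.cache.filter.lab.chunk_size", "1mb")  // size of each shared long[] chunk
                .build();
        System.out.println(settings.getAsMap());
    }
}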
@@ -46,6 +46,7 @@ public class WeakFilterCache extends AbstractConcurrentMapFilterCache implements
    private final TimeValue expire;

    private final AtomicLong evictions = new AtomicLong();
    private final AtomicLong memEvictions = new AtomicLong();

    @Inject public WeakFilterCache(Index index, @IndexSettings Settings indexSettings) {
        super(index, indexSettings);
@@ -53,10 +54,15 @@ public class WeakFilterCache extends AbstractConcurrentMapFilterCache implements
        this.expire = componentSettings.getAsTime("expire", null);
    }

    @Override protected ConcurrentMap<Object, ReaderValue> buildCache() {
        // better to have weak on the whole ReaderValue, simpler on the GC to clean it
        MapMaker mapMaker = new MapMaker().weakKeys().weakValues();
        mapMaker.evictionListener(new CacheMapEvictionListener(memEvictions));
        return mapMaker.makeMap();
    }

    @Override protected ConcurrentMap<Filter, DocSet> buildFilterMap() {
        // DocSet are not really stored with strong reference only when searching on them...
        // Filter might be stored in query cache
        MapMaker mapMaker = new MapMaker().weakValues();
        MapMaker mapMaker = new MapMaker();
        if (maxSize != -1) {
            mapMaker.maximumSize(maxSize);
        }
@@ -75,6 +81,10 @@ public class WeakFilterCache extends AbstractConcurrentMapFilterCache implements
        return evictions.get();
    }

    @Override public long memEvictions() {
        return memEvictions.get();
    }

    @Override public void onEviction(Filter filter, DocSet docSet) {
        evictions.incrementAndGet();
    }
@@ -222,13 +222,13 @@ public class JmxService {
                }
            } catch (InstanceAlreadyExistsException e) {
                // this might happen if multiple instances are trying to concurrently register same objectName
                logger.warn("Could not register object with name:" + objectName + "(" + e.getMessage() + ")");
                logger.debug("Could not register object with name:" + objectName + "(" + e.getMessage() + ")");
            }
        } else {
            logger.warn("Could not register object with name: " + objectName);
            logger.debug("Could not register object with name: " + objectName + ", already registered");
        }
    } catch (Exception e) {
        logger.warn("Could not register object with name: " + resourceDMBean.getFullObjectName());
        logger.warn("Could not register object with name: " + resourceDMBean.getFullObjectName() + "(" + e.getMessage() + ")");
    }
}

@@ -59,7 +59,7 @@ public class FilterCacheGcStress {
        @Override public void run() {
            while (!stop.get()) {
                client.prepareSearch()
                        .setQuery(filteredQuery(matchAllQuery(), rangeFilter("field").from(System.currentTimeMillis())))
                        .setQuery(filteredQuery(matchAllQuery(), rangeFilter("field").from(System.currentTimeMillis() - 1000000)))
                        .execute().actionGet();
            }
        }