From 8eab5ec5286475b5c0602d8c230947c781db6596 Mon Sep 17 00:00:00 2001
From: kimchy
Date: Sat, 9 Apr 2011 20:21:23 +0300
Subject: [PATCH] experiment with lab for filter cache, disabled by default for
 now, also, move to use soft reference on the whole reader key value, and not
 per filter (less load)

---
 .../elasticsearch/common/lab/LongsLAB.java    | 249 ++++++++++++++++++
 .../elasticsearch/index/cache/CacheStats.java |  17 +-
 .../elasticsearch/index/cache/IndexCache.java |   2 +-
 .../index/cache/filter/FilterCache.java       |   2 +
 .../cache/filter/none/NoneFilterCache.java    |   4 +
 .../filter/resident/ResidentFilterCache.java  |   4 +
 .../cache/filter/soft/SoftFilterCache.java    |  14 +-
 .../AbstractConcurrentMapFilterCache.java     | 160 +++++++++--
 .../cache/filter/weak/WeakFilterCache.java    |  16 +-
 .../org/elasticsearch/jmx/JmxService.java     |   6 +-
 .../gcbehavior/FilterCacheGcStress.java       |   2 +-
 11 files changed, 444 insertions(+), 32 deletions(-)
 create mode 100644 modules/elasticsearch/src/main/java/org/elasticsearch/common/lab/LongsLAB.java

diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/common/lab/LongsLAB.java b/modules/elasticsearch/src/main/java/org/elasticsearch/common/lab/LongsLAB.java
new file mode 100644
index 00000000000..639350c862e
--- /dev/null
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/common/lab/LongsLAB.java
@@ -0,0 +1,249 @@
+/*
+ * Licensed to Elastic Search and Shay Banon under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Elastic Search licenses this
+ * file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.common.lab;
+
+import org.elasticsearch.common.Preconditions;
+
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class LongsLAB {
+
+    private AtomicReference<Chunk> curChunk = new AtomicReference<Chunk>();
+
+    final int chunkSize;
+    final int maxAlloc;
+
+    public LongsLAB(int chunkSize, int maxAlloc) {
+        this.chunkSize = chunkSize;
+        this.maxAlloc = maxAlloc;
+        // if we don't exclude allocations >CHUNK_SIZE, we'd infinite-loop on one!
+        Preconditions.checkArgument(maxAlloc <= chunkSize);
+    }
+
+    /**
+     * Allocate a slice of the given length.
+     *
+     * If the size is larger than the maximum size specified for this
+     * allocator, returns null.
+     */
+    public Allocation allocateLongs(int size) {
+        Preconditions.checkArgument(size >= 0, "negative size");
+
+        // Callers should satisfy large allocations directly from the JVM since they
+        // don't cause fragmentation as badly.
+        if (size > maxAlloc) {
+            return null;
+        }
+
+        while (true) {
+            Chunk c = getOrMakeChunk();
+
+            // Try to allocate from this chunk
+            int allocOffset = c.alloc(size);
+            if (allocOffset != -1) {
+                // We succeeded - this is the common case - small alloc
+                // from a big buffer
+                return new Allocation(c.data, allocOffset);
+            }
+
+            // not enough space!
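+            // (either the chunk is genuinely full, or concurrent allocations
+            // claimed the remaining space while we were trying)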
+            // try to retire this chunk
+            tryRetireChunk(c);
+        }
+    }
+
+    /**
+     * Try to retire the current chunk if it is still
+     * c. Postcondition is that curChunk.get()
+     * != c
+     */
+    private void tryRetireChunk(Chunk c) {
+        @SuppressWarnings("unused")
+        boolean weRetiredIt = curChunk.compareAndSet(c, null);
+        // If the CAS succeeds, that means that we won the race
+        // to retire the chunk. We could use this opportunity to
+        // update metrics on external fragmentation.
+        //
+        // If the CAS fails, that means that someone else already
+        // retired the chunk for us.
+    }
+
+    /**
+     * Get the current chunk, or, if there is no current chunk,
+     * allocate a new one from the JVM.
+     */
+    private Chunk getOrMakeChunk() {
+        while (true) {
+            // Try to get the chunk
+            Chunk c = curChunk.get();
+            if (c != null) {
+                return c;
+            }
+
+            // No current chunk, so we want to allocate one. We race
+            // against other allocators to CAS in an uninitialized chunk
+            // (which is cheap to allocate)
+            c = new Chunk(chunkSize);
+            if (curChunk.compareAndSet(null, c)) {
+                // we won race - now we need to actually do the expensive
+                // allocation step
+                c.init();
+                return c;
+            }
+            // someone else won race - that's fine, we'll try to grab theirs
+            // in the next iteration of the loop.
+        }
+    }
+
+    /**
+     * A chunk of memory out of which allocations are sliced.
+     */
+    private static class Chunk {
+        /**
+         * Actual underlying data
+         */
+        private long[] data;
+
+        private static final int UNINITIALIZED = -1;
+        private static final int OOM = -2;
+        /**
+         * Offset for the next allocation, or the sentinel value -1
+         * which implies that the chunk is still uninitialized.
+         */
+        private AtomicInteger nextFreeOffset = new AtomicInteger(UNINITIALIZED);
+
+        /**
+         * Total number of allocations satisfied from this buffer
+         */
+        private AtomicInteger allocCount = new AtomicInteger();
+
+        /**
+         * Size of chunk in longs
+         */
+        private final int size;
+
+        /**
+         * Create an uninitialized chunk. Note that memory is not allocated yet, so
+         * this is cheap.
+         *
+         * @param size in longs
+         */
+        private Chunk(int size) {
+            this.size = size;
+        }
+
+        /**
+         * Actually claim the memory for this chunk. This should only be called from
+         * the thread that constructed the chunk. It is thread-safe against other
+         * threads calling alloc(), who will block until the allocation is complete.
+         */
+        public void init() {
+            assert nextFreeOffset.get() == UNINITIALIZED;
+            try {
+                data = new long[size];
+            } catch (OutOfMemoryError e) {
+                boolean failInit = nextFreeOffset.compareAndSet(UNINITIALIZED, OOM);
+                assert failInit; // should be true.
+                throw e;
+            }
+            // Mark that it's ready for use
+            boolean initted = nextFreeOffset.compareAndSet(
+                    UNINITIALIZED, 0);
+            // We should always succeed the above CAS since only one thread
+            // calls init()!
+            Preconditions.checkState(initted,
+                    "Multiple threads tried to init same chunk");
+        }
+
+        /**
+         * Try to allocate size longs from the chunk.
+         *
+         * @return the offset of the successful allocation, or -1 to indicate not-enough-space
+         */
+        public int alloc(int size) {
+            while (true) {
+                int oldOffset = nextFreeOffset.get();
+                if (oldOffset == UNINITIALIZED) {
+                    // The chunk doesn't have its data allocated yet.
+                    // Since we found this in curChunk, we know that whoever
+                    // CAS-ed it there is allocating it right now. So spin-loop
+                    // shouldn't spin long!
+                    Thread.yield();
+                    continue;
+                }
+                if (oldOffset == OOM) {
+                    // doh we ran out of ram. return -1 to chuck this away.
+                    return -1;
+                }
+
+                if (oldOffset + size > data.length) {
+                    return -1; // alloc doesn't fit
+                }
+
+                // Try to atomically claim this slice of the chunk
+                if (nextFreeOffset.compareAndSet(oldOffset, oldOffset + size)) {
+                    // we got the alloc
+                    allocCount.incrementAndGet();
+                    return oldOffset;
+                }
+                // we raced and lost alloc, try again
+            }
+        }
+
+        @Override
+        public String toString() {
+            return "Chunk@" + System.identityHashCode(this) +
+                    " allocs=" + allocCount.get() + " waste=" +
+                    (data.length - nextFreeOffset.get());
+        }
+    }
+
+    /**
+     * The result of a single allocation. Contains the chunk that the
+     * allocation points into, and the offset in this array where the
+     * slice begins.
+     */
+    public static class Allocation {
+        private final long[] data;
+        private final int offset;
+
+        private Allocation(long[] data, int off) {
+            this.data = data;
+            this.offset = off;
+        }
+
+        @Override
+        public String toString() {
+            return "Allocation(data=" + data +
+                    " with capacity=" + data.length +
+                    ", off=" + offset + ")";
+        }
+
+        public long[] getData() {
+            return data;
+        }
+
+        public int getOffset() {
+            return offset;
+        }
+    }
+
+}
\ No newline at end of file
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/cache/CacheStats.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/cache/CacheStats.java
index e5b24e0490e..cc7ab96ee3a 100644
--- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/cache/CacheStats.java
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/cache/CacheStats.java
@@ -36,6 +36,7 @@ public class CacheStats implements Streamable, ToXContent {
 
     long fieldEvictions;
     long filterEvictions;
+    long filterMemEvictions;
     long filterCount;
     long fieldSize = 0;
     long filterSize = 0;
@@ -44,9 +45,10 @@ public class CacheStats implements Streamable, ToXContent {
     public CacheStats() {
     }
 
-    public CacheStats(long fieldEvictions, long filterEvictions, long fieldSize, long filterSize, long filterCount, long bloomSize) {
+    public CacheStats(long fieldEvictions, long filterEvictions, long filterMemEvictions, long fieldSize, long filterSize, long filterCount, long bloomSize) {
         this.fieldEvictions = fieldEvictions;
         this.filterEvictions = filterEvictions;
+        this.filterMemEvictions = filterMemEvictions;
         this.fieldSize = fieldSize;
         this.filterSize = filterSize;
         this.filterCount = filterCount;
@@ -56,6 +58,7 @@ public class CacheStats implements Streamable, ToXContent {
     public void add(CacheStats stats) {
         this.fieldEvictions += stats.fieldEvictions;
         this.filterEvictions += stats.filterEvictions;
+        this.filterMemEvictions += stats.filterMemEvictions;
         this.fieldSize += stats.fieldSize;
         this.filterSize += stats.filterSize;
         this.filterCount += stats.filterCount;
@@ -78,6 +81,14 @@ public class CacheStats implements Streamable, ToXContent {
         return this.filterEvictions;
     }
 
+    public long filterMemEvictions() {
+        return this.filterMemEvictions;
+    }
+
+    public long getFilterMemEvictions() {
+        return this.filterMemEvictions;
+    }
+
     public long filterCount() {
         return this.filterCount;
     }
@@ -141,6 +152,7 @@ public class CacheStats implements Streamable, ToXContent {
         builder.field(Fields.FIELD_SIZE_IN_BYTES, fieldSize);
         builder.field(Fields.FILTER_COUNT, filterCount);
         builder.field(Fields.FILTER_EVICTIONS, filterEvictions);
+        builder.field(Fields.FILTER_MEM_EVICTIONS, filterMemEvictions);
         builder.field(Fields.FILTER_SIZE, filterSize().toString());
         builder.field(Fields.FILTER_SIZE_IN_BYTES, filterSize);
         builder.endObject();
@@ -153,6 +165,7 @@ public class CacheStats implements Streamable, ToXContent {
         static final XContentBuilderString FIELD_SIZE_IN_BYTES = new XContentBuilderString("field_size_in_bytes");
         static final XContentBuilderString FIELD_EVICTIONS = new XContentBuilderString("field_evictions");
         static final XContentBuilderString FILTER_EVICTIONS = new XContentBuilderString("filter_evictions");
+        static final XContentBuilderString FILTER_MEM_EVICTIONS = new XContentBuilderString("filter_mem_evictions");
         static final XContentBuilderString FILTER_COUNT = new XContentBuilderString("filter_count");
         static final XContentBuilderString FILTER_SIZE = new XContentBuilderString("filter_size");
         static final XContentBuilderString FILTER_SIZE_IN_BYTES = new XContentBuilderString("filter_size_in_bytes");
@@ -167,6 +180,7 @@ public class CacheStats implements Streamable, ToXContent {
     @Override public void readFrom(StreamInput in) throws IOException {
         fieldEvictions = in.readVLong();
         filterEvictions = in.readVLong();
+        filterMemEvictions = in.readVLong();
         fieldSize = in.readVLong();
         filterSize = in.readVLong();
         filterCount = in.readVLong();
@@ -176,6 +190,7 @@ public class CacheStats implements Streamable, ToXContent {
     @Override public void writeTo(StreamOutput out) throws IOException {
         out.writeVLong(fieldEvictions);
         out.writeVLong(filterEvictions);
+        out.writeVLong(filterMemEvictions);
         out.writeVLong(fieldSize);
         out.writeVLong(filterSize);
         out.writeVLong(filterCount);
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/cache/IndexCache.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/cache/IndexCache.java
index 47968f8023f..117fa0b6b7e 100644
--- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/cache/IndexCache.java
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/cache/IndexCache.java
@@ -73,7 +73,7 @@ public class IndexCache extends AbstractIndexComponent implements CloseableCompo
     }
 
     public CacheStats stats() {
-        return new CacheStats(fieldDataCache.evictions(), filterCache.evictions(), fieldDataCache.sizeInBytes(), filterCache.sizeInBytes(), filterCache.count(), bloomCache.sizeInBytes());
+        return new CacheStats(fieldDataCache.evictions(), filterCache.evictions(), filterCache.memEvictions(), fieldDataCache.sizeInBytes(), filterCache.sizeInBytes(), filterCache.count(), bloomCache.sizeInBytes());
     }
 
     public FilterCache filter() {
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/cache/filter/FilterCache.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/cache/filter/FilterCache.java
index 52b7392ce6d..bb227488e59 100644
--- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/cache/filter/FilterCache.java
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/cache/filter/FilterCache.java
@@ -44,4 +44,6 @@ public interface FilterCache extends IndexComponent, CloseableComponent {
     long sizeInBytes();
 
     long evictions();
+
+    long memEvictions();
 }
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/cache/filter/none/NoneFilterCache.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/cache/filter/none/NoneFilterCache.java
index 0092249a238..c8deb0e2aeb 100644
--- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/cache/filter/none/NoneFilterCache.java
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/cache/filter/none/NoneFilterCache.java
@@ -73,4 +73,8 @@ public class NoneFilterCache extends AbstractIndexComponent implements FilterCac
     @Override public long evictions() {
         return 0;
     }
+
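+    // the none cache never stores entries, so nothing can be evicted under memory pressure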
+    @Override public long memEvictions() {
+        return 0;
+    }
 }
\ No newline at end of file
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/cache/filter/resident/ResidentFilterCache.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/cache/filter/resident/ResidentFilterCache.java
index 56a04e84b3c..82c15ed49d2 100644
--- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/cache/filter/resident/ResidentFilterCache.java
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/cache/filter/resident/ResidentFilterCache.java
@@ -74,6 +74,10 @@ public class ResidentFilterCache extends AbstractConcurrentMapFilterCache implem
         return evictions.get();
     }
 
+    @Override public long memEvictions() {
+        return 0;
+    }
+
     @Override public void onEviction(Filter filter, DocSet docSet) {
         evictions.incrementAndGet();
     }
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/cache/filter/soft/SoftFilterCache.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/cache/filter/soft/SoftFilterCache.java
index f6818d245b9..f62ba235233 100644
--- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/cache/filter/soft/SoftFilterCache.java
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/cache/filter/soft/SoftFilterCache.java
@@ -46,6 +46,7 @@ public class SoftFilterCache extends AbstractConcurrentMapFilterCache implements
     private final TimeValue expire;
 
     private final AtomicLong evictions = new AtomicLong();
+    private final AtomicLong memEvictions = new AtomicLong();
 
     @Inject public SoftFilterCache(Index index, @IndexSettings Settings indexSettings) {
         super(index, indexSettings);
@@ -53,10 +54,17 @@ public class SoftFilterCache extends AbstractConcurrentMapFilterCache implements
         this.expire = componentSettings.getAsTime("expire", null);
     }
 
+    @Override protected ConcurrentMap<Object, ReaderValue> buildCache() {
+        // better to have soft on the whole ReaderValue, simpler on the GC to clean it
+        MapMaker mapMaker = new MapMaker().weakKeys().softValues();
+        mapMaker.evictionListener(new CacheMapEvictionListener(memEvictions));
+        return mapMaker.makeMap();
+    }
+
     @Override protected ConcurrentMap<Filter, DocSet> buildFilterMap() {
         // DocSet are not really stored with strong reference only when searching on them...
         // Filter might be stored in query cache
-        MapMaker mapMaker = new MapMaker().softValues();
+        MapMaker mapMaker = new MapMaker();
         if (maxSize != -1) {
             mapMaker.maximumSize(maxSize);
         }
@@ -75,6 +83,10 @@ public class SoftFilterCache extends AbstractConcurrentMapFilterCache implements
         return evictions.get();
     }
 
+    @Override public long memEvictions() {
+        return memEvictions.get();
+    }
+
     @Override public void onEviction(Filter filter, DocSet docSet) {
         evictions.incrementAndGet();
     }
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/cache/filter/support/AbstractConcurrentMapFilterCache.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/cache/filter/support/AbstractConcurrentMapFilterCache.java
index d03323623dd..f73a73ce83b 100644
--- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/cache/filter/support/AbstractConcurrentMapFilterCache.java
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/cache/filter/support/AbstractConcurrentMapFilterCache.java
@@ -21,10 +21,20 @@ package org.elasticsearch.index.cache.filter.support;
 
 import org.apache.lucene.index.IndexReader;
 import org.apache.lucene.search.DocIdSet;
+import org.apache.lucene.search.DocIdSetIterator;
 import org.apache.lucene.search.Filter;
+import org.apache.lucene.util.OpenBitSet;
+import org.elasticsearch.common.RamUsage;
+import org.elasticsearch.common.collect.MapEvictionListener;
 import org.elasticsearch.common.collect.MapMaker;
+import org.elasticsearch.common.lab.LongsLAB;
 import org.elasticsearch.common.lucene.docset.DocSet;
+import org.elasticsearch.common.lucene.docset.DocSets;
+import org.elasticsearch.common.lucene.docset.OpenBitDocSet;
+import org.elasticsearch.common.lucene.docset.SlicedOpenBitSet;
 import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.unit.ByteSizeUnit;
+import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.index.AbstractIndexComponent;
 import org.elasticsearch.index.Index;
 import org.elasticsearch.index.cache.filter.FilterCache;
@@ -32,8 +42,8 @@ import org.elasticsearch.index.settings.IndexSettings;
 
 import java.io.IOException;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicLong;
 
-import static org.elasticsearch.common.lucene.docset.DocSets.*;
 import static org.elasticsearch.common.util.concurrent.ConcurrentCollections.*;
 
 /**
@@ -43,13 +53,42 @@ import static org.elasticsearch.common.util.concurrent.ConcurrentCollections.*;
  */
 public abstract class AbstractConcurrentMapFilterCache extends AbstractIndexComponent implements FilterCache {
 
-    final ConcurrentMap<Object, ConcurrentMap<Filter, DocSet>> cache;
+    final ConcurrentMap<Object, ReaderValue> cache;
+
+    final boolean labEnabled;
+    final ByteSizeValue labMaxAlloc;
+    final ByteSizeValue labChunkSize;
+
+    final int labMaxAllocBytes;
+    final int labChunkSizeBytes;
 
     protected AbstractConcurrentMapFilterCache(Index index, @IndexSettings Settings indexSettings) {
         super(index, indexSettings);
         // weak keys is fine, it will only be cleared once the IndexReader references are removed
         // (assuming clear(...) will not be called)
-        this.cache = new MapMaker().weakKeys().makeMap();
+        this.cache = buildCache();
+
+        // The LAB is stored per reader, so whole chunks will be cleared once the reader is discarded.
+        // This means that with filter-entry-specific eviction, like access time,
+        // we might get into cases where the LAB is held by a puny filter and other filters have been released.
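+        // (any single live DocSet sliced out of a chunk keeps that chunk's entire long[] reachable)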
+        // This usually will not be that bad, compared to the GC benefit of using a LAB, but that is why
+        // the soft filter cache is recommended.
+        this.labEnabled = componentSettings.getAsBoolean("lab", false);
+        // These values should not be too high, basically we want to cache the small readers and use the LAB for
+        // them, 1M docs on OpenBitSet is around 122kb.
+        this.labMaxAlloc = componentSettings.getAsBytesSize("lab.max_alloc", new ByteSizeValue(128, ByteSizeUnit.KB));
+        this.labChunkSize = componentSettings.getAsBytesSize("lab.chunk_size", new ByteSizeValue(1, ByteSizeUnit.MB));
+
+        this.labMaxAllocBytes = (int) (labMaxAlloc.bytes() / RamUsage.NUM_BYTES_LONG);
+        this.labChunkSizeBytes = (int) (labChunkSize.bytes() / RamUsage.NUM_BYTES_LONG);
+    }
+
+    protected ConcurrentMap<Object, ReaderValue> buildCache() {
+        return new MapMaker().weakKeys().makeMap();
+    }
+
+    protected ConcurrentMap<Filter, DocSet> buildFilterMap() {
+        return newConcurrentMap();
+    }
 
     @Override public void close() {
@@ -61,17 +100,17 @@ public abstract class AbstractConcurrentMapFilterCache extends AbstractIndexComp
     }
 
     @Override public void clear(IndexReader reader) {
-        ConcurrentMap<Filter, DocSet> map = cache.remove(reader.getCoreCacheKey());
+        ReaderValue readerValue = cache.remove(reader.getCoreCacheKey());
         // help soft/weak handling GC
-        if (map != null) {
-            map.clear();
+        if (readerValue != null) {
+            readerValue.filters().clear();
         }
     }
 
     @Override public long sizeInBytes() {
         long sizeInBytes = 0;
-        for (ConcurrentMap<Filter, DocSet> map : cache.values()) {
-            for (DocSet docSet : map.values()) {
+        for (ReaderValue readerValue : cache.values()) {
+            for (DocSet docSet : readerValue.filters().values()) {
                 sizeInBytes += docSet.sizeInBytes();
             }
         }
@@ -80,8 +119,8 @@ public abstract class AbstractConcurrentMapFilterCache extends AbstractIndexComp
 
     @Override public long count() {
         long entries = 0;
-        for (ConcurrentMap<Filter, DocSet> map : cache.values()) {
-            entries += map.size();
+        for (ReaderValue readerValue : cache.values()) {
+            entries += readerValue.filters().size();
         }
         return entries;
     }
@@ -97,10 +136,6 @@ public abstract class AbstractConcurrentMapFilterCache extends AbstractIndexComp
         return filter instanceof FilterCacheFilterWrapper;
     }
 
-    protected ConcurrentMap<Filter, DocSet> buildFilterMap() {
-        return newConcurrentMap();
-    }
-
     // LUCENE MONITOR: Check next version Lucene for CachingWrapperFilter, consider using that logic
     // and not use the DeletableConstantScoreQuery, instead pass the DeletesMode enum to the cache method
     // see: https://issues.apache.org/jira/browse/LUCENE-2468
@@ -117,21 +152,25 @@ public abstract class AbstractConcurrentMapFilterCache extends AbstractIndexComp
         }
 
         @Override public DocIdSet getDocIdSet(IndexReader reader) throws IOException {
-            ConcurrentMap<Filter, DocSet> cachedFilters = cache.cache.get(reader.getCoreCacheKey());
-            if (cachedFilters == null) {
-                cachedFilters = cache.buildFilterMap();
-                ConcurrentMap<Filter, DocSet> prev = cache.cache.putIfAbsent(reader.getCoreCacheKey(), cachedFilters);
+            ReaderValue readerValue = cache.cache.get(reader.getCoreCacheKey());
+            if (readerValue == null) {
+                LongsLAB longsLAB = null;
+                if (cache.labEnabled) {
+                    longsLAB = new LongsLAB(cache.labChunkSizeBytes, cache.labMaxAllocBytes);
+                }
+                readerValue = new ReaderValue(cache.buildFilterMap(), longsLAB);
+                ReaderValue prev = cache.cache.putIfAbsent(reader.getCoreCacheKey(), readerValue);
                 if (prev != null) {
-                    cachedFilters = prev;
+                    readerValue = prev;
                 }
             }
-            DocSet docSet = cachedFilters.get(filter);
+            DocSet docSet = readerValue.filters().get(filter);
             if (docSet != null) {
                 return docSet;
             }
             DocIdSet docIdSet = filter.getDocIdSet(reader);
-            docSet = cacheable(reader, docIdSet);
-            DocSet prev = cachedFilters.putIfAbsent(filter, docSet);
+            docSet = cacheable(reader, readerValue, docIdSet);
+            DocSet prev = readerValue.filters().putIfAbsent(filter, docSet);
             if (prev != null) {
                 docSet = prev;
             }
@@ -150,5 +189,82 @@ public abstract class AbstractConcurrentMapFilterCache extends AbstractIndexComp
         public int hashCode() {
             return filter.hashCode() ^ 0x1117BF25;
         }
+
+        private DocSet cacheable(IndexReader reader, ReaderValue readerValue, DocIdSet set) throws IOException {
+            if (set == null) {
+                return DocSet.EMPTY_DOC_SET;
+            }
+            if (set == DocIdSet.EMPTY_DOCIDSET) {
+                return DocSet.EMPTY_DOC_SET;
+            }
+
+            DocIdSetIterator it = set.iterator();
+            if (it == null) {
+                return DocSet.EMPTY_DOC_SET;
+            }
+            int doc = it.nextDoc();
+            if (doc == DocIdSetIterator.NO_MORE_DOCS) {
+                return DocSet.EMPTY_DOC_SET;
+            }
+
+            // we have a LAB, check if it can be used...
+            if (readerValue.longsLAB() == null) {
+                return DocSets.cacheable(reader, set);
+            }
+
+            int numOfWords = OpenBitSet.bits2words(reader.maxDoc());
+            LongsLAB.Allocation allocation = readerValue.longsLAB().allocateLongs(numOfWords);
+            if (allocation == null) {
+                return DocSets.cacheable(reader, set);
+            }
+            // we have an allocation, use it to create SlicedOpenBitSet
+            if (set instanceof OpenBitSet) {
+                return new SlicedOpenBitSet(allocation.getData(), allocation.getOffset(), (OpenBitSet) set);
+            } else if (set instanceof OpenBitDocSet) {
+                return new SlicedOpenBitSet(allocation.getData(), allocation.getOffset(), ((OpenBitDocSet) set).set());
+            } else {
+                SlicedOpenBitSet slicedSet = new SlicedOpenBitSet(allocation.getData(), numOfWords, allocation.getOffset());
+                slicedSet.fastSet(doc); // we already have an open iterator, so use it, and don't forget to set the initial one
+                while ((doc = it.nextDoc()) != DocIdSetIterator.NO_MORE_DOCS) {
+                    slicedSet.fastSet(doc);
+                }
+                return slicedSet;
+            }
+        }
     }
 
+    public static class ReaderValue {
+        private final ConcurrentMap<Filter, DocSet> filters;
+        private final LongsLAB longsLAB;
+
+        public ReaderValue(ConcurrentMap<Filter, DocSet> filters, LongsLAB longsLAB) {
+            this.filters = filters;
+            this.longsLAB = longsLAB;
+        }
+
+        public ConcurrentMap<Filter, DocSet> filters() {
+            return filters;
+        }
+
+        public LongsLAB longsLAB() {
+            return longsLAB;
+        }
+    }
+
+    public static class CacheMapEvictionListener implements MapEvictionListener<Object, ReaderValue> {
+
+        private final AtomicLong evictions;
+
+        public CacheMapEvictionListener(AtomicLong evictions) {
+            this.evictions = evictions;
+        }
+
+        @Override public void onEviction(Object o, ReaderValue readerValue) {
+            evictions.incrementAndGet();
+            if (readerValue != null) {
+                // extra clean the map
+                readerValue.filters().clear();
+            }
+        }
+    }
 }
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/cache/filter/weak/WeakFilterCache.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/cache/filter/weak/WeakFilterCache.java
index 6d4d363c616..efbc153b4bd 100644
--- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/cache/filter/weak/WeakFilterCache.java
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/cache/filter/weak/WeakFilterCache.java
@@ -46,6 +46,7 @@ public class WeakFilterCache extends AbstractConcurrentMapFilterCache implements
     private final TimeValue expire;
 
     private final AtomicLong evictions = new AtomicLong();
+    private final AtomicLong memEvictions = new AtomicLong();
 
     @Inject public WeakFilterCache(Index index, @IndexSettings Settings indexSettings) {
         super(index, indexSettings);
@@ -53,10 +54,15 @@ public class WeakFilterCache extends AbstractConcurrentMapFilterCache implements
         this.expire = componentSettings.getAsTime("expire", null);
     }
 
+    @Override protected ConcurrentMap<Object, ReaderValue> buildCache() {
+        // better to have soft on the whole ReaderValue, simpler on the GC to clean it
+        MapMaker mapMaker = new MapMaker().weakKeys().softValues();
+        mapMaker.evictionListener(new CacheMapEvictionListener(memEvictions));
+        return mapMaker.makeMap();
+    }
+
     @Override protected ConcurrentMap<Filter, DocSet> buildFilterMap() {
-        // DocSet are not really stored with strong reference only when searching on them...
-        // Filter might be stored in query cache
-        MapMaker mapMaker = new MapMaker().weakValues();
+        MapMaker mapMaker = new MapMaker();
         if (maxSize != -1) {
             mapMaker.maximumSize(maxSize);
         }
@@ -75,6 +81,10 @@ public class WeakFilterCache extends AbstractConcurrentMapFilterCache implements
         return evictions.get();
     }
 
+    @Override public long memEvictions() {
+        return memEvictions.get();
+    }
+
     @Override public void onEviction(Filter filter, DocSet docSet) {
         evictions.incrementAndGet();
     }
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/jmx/JmxService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/jmx/JmxService.java
index 59326ef38b6..3075d4e2f98 100644
--- a/modules/elasticsearch/src/main/java/org/elasticsearch/jmx/JmxService.java
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/jmx/JmxService.java
@@ -222,13 +222,13 @@ public class JmxService {
                 }
             } catch (InstanceAlreadyExistsException e) {
                 // this might happen if multiple instances are trying to concurrently register the same objectName
-                logger.warn("Could not register object with name:" + objectName + "(" + e.getMessage() + ")");
+                logger.debug("Could not register object with name: " + objectName + " (" + e.getMessage() + ")");
             }
         } else {
-            logger.warn("Could not register object with name: " + objectName);
+            logger.debug("Could not register object with name: " + objectName + ", already registered");
         }
     } catch (Exception e) {
-        logger.warn("Could not register object with name: " + resourceDMBean.getFullObjectName());
+        logger.warn("Could not register object with name: " + resourceDMBean.getFullObjectName() + " (" + e.getMessage() + ")");
     }
 }
diff --git a/modules/test/integration/src/test/java/org/elasticsearch/test/stress/gcbehavior/FilterCacheGcStress.java b/modules/test/integration/src/test/java/org/elasticsearch/test/stress/gcbehavior/FilterCacheGcStress.java
index 58ba0b9bc0b..c0650d73c45 100644
--- a/modules/test/integration/src/test/java/org/elasticsearch/test/stress/gcbehavior/FilterCacheGcStress.java
+++ b/modules/test/integration/src/test/java/org/elasticsearch/test/stress/gcbehavior/FilterCacheGcStress.java
@@ -59,7 +59,7 @@ public class FilterCacheGcStress {
         @Override public void run() {
             while (!stop.get()) {
                 client.prepareSearch()
-                        .setQuery(filteredQuery(matchAllQuery(), rangeFilter("field").from(System.currentTimeMillis())))
+                        .setQuery(filteredQuery(matchAllQuery(), rangeFilter("field").from(System.currentTimeMillis() - 1000000)))
                         .execute().actionGet();
             }
         }
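--
Reviewer note, not part of the patch: a minimal, self-contained sketch of the
LongsLAB allocation contract that cacheable(...) above relies on. Class and
method names are the ones introduced in LongsLAB.java; the setting names assume
the filter cache component prefix resolves to "index.cache.filter", as it does
for the existing "max_size" and "expire" options, so the experiment would be
switched on with "index.cache.filter.lab: true" (it stays off by default).

    import org.elasticsearch.common.lab.LongsLAB;

    public class LongsLABContractSketch {

        public static void main(String[] args) {
            // mirror the defaults above: 1mb chunks, 128kb max alloc, both expressed in longs
            int chunkSizeInLongs = (1024 * 1024) / 8; // 131072 longs
            int maxAllocInLongs = (128 * 1024) / 8;   // 16384 longs
            LongsLAB lab = new LongsLAB(chunkSizeInLongs, maxAllocInLongs);

            // small bitsets are bump-pointer sliced out of a shared chunk
            LongsLAB.Allocation a = lab.allocateLongs(1024);
            LongsLAB.Allocation b = lab.allocateLongs(2048);
            assert a.getData() == b.getData();            // same backing long[]
            assert b.getOffset() == a.getOffset() + 1024; // contiguous slices

            // anything above max_alloc is refused; the filter cache then falls
            // back to DocSets.cacheable, i.e. a plain JVM allocation
            assert lab.allocateLongs(maxAllocInLongs + 1) == null;
        }
    }

The trade-off this encodes: many small cached DocSets share a few long-lived
long[] chunks, so the old generation sees a handful of large arrays instead of
thousands of short-lived bitsets, at the cost that one live slice pins its
whole chunk until the reader (and with it the ReaderValue) is discarded.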