Filter Cache: Introduce new `node` level filter cache and make it default, closes #959.
This commit is contained in:
parent
e948c366f9
commit
b61735d8ea
|
@ -22,7 +22,7 @@ package org.elasticsearch.index.cache.filter;
|
|||
import org.elasticsearch.common.inject.AbstractModule;
|
||||
import org.elasticsearch.common.inject.Scopes;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.index.cache.filter.soft.SoftFilterCache;
|
||||
import org.elasticsearch.index.cache.filter.node.NodeFilterCache;
|
||||
|
||||
/**
|
||||
* @author kimchy (Shay Banon)
|
||||
|
@ -41,7 +41,7 @@ public class FilterCacheModule extends AbstractModule {
|
|||
|
||||
@Override protected void configure() {
|
||||
bind(FilterCache.class)
|
||||
.to(settings.getAsClass(FilterCacheSettings.FILTER_CACHE_TYPE, SoftFilterCache.class, "org.elasticsearch.index.cache.filter.", "FilterCache"))
|
||||
.to(settings.getAsClass(FilterCacheSettings.FILTER_CACHE_TYPE, NodeFilterCache.class, "org.elasticsearch.index.cache.filter.", "FilterCache"))
|
||||
.in(Scopes.SINGLETON);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,57 @@
|
|||
/*
|
||||
* Licensed to Elastic Search and Shay Banon under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. Elastic Search licenses this
|
||||
* file to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.index.cache.filter.node;
|
||||
|
||||
import org.elasticsearch.ElasticSearchException;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.lucene.docset.DocSet;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.index.Index;
|
||||
import org.elasticsearch.index.cache.filter.support.AbstractWeightedFilterCache;
|
||||
import org.elasticsearch.index.cache.filter.support.FilterCacheValue;
|
||||
import org.elasticsearch.index.settings.IndexSettings;
|
||||
import org.elasticsearch.indices.cache.filter.IndicesNodeFilterCache;
|
||||
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
|
||||
public class NodeFilterCache extends AbstractWeightedFilterCache {
|
||||
|
||||
private final IndicesNodeFilterCache indicesNodeFilterCache;
|
||||
|
||||
@Inject public NodeFilterCache(Index index, @IndexSettings Settings indexSettings, IndicesNodeFilterCache indicesNodeFilterCache) {
|
||||
super(index, indexSettings);
|
||||
this.indicesNodeFilterCache = indicesNodeFilterCache;
|
||||
|
||||
indicesNodeFilterCache.addEvictionListener(this);
|
||||
}
|
||||
|
||||
@Override public void close() throws ElasticSearchException {
|
||||
indicesNodeFilterCache.removeEvictionListener(this);
|
||||
super.close();
|
||||
}
|
||||
|
||||
@Override protected ConcurrentMap<FilterCacheKey, FilterCacheValue<DocSet>> cache() {
|
||||
return indicesNodeFilterCache.cache();
|
||||
}
|
||||
|
||||
@Override public String type() {
|
||||
return "node";
|
||||
}
|
||||
}
|
|
@ -21,15 +21,10 @@ package org.elasticsearch.index.cache.filter.support;
|
|||
|
||||
import org.apache.lucene.index.IndexReader;
|
||||
import org.apache.lucene.search.DocIdSet;
|
||||
import org.apache.lucene.search.DocIdSetIterator;
|
||||
import org.apache.lucene.search.Filter;
|
||||
import org.apache.lucene.util.OpenBitSet;
|
||||
import org.elasticsearch.common.RamUsage;
|
||||
import org.elasticsearch.common.lab.LongsLAB;
|
||||
import org.elasticsearch.common.lucene.docset.DocSet;
|
||||
import org.elasticsearch.common.lucene.docset.DocSets;
|
||||
import org.elasticsearch.common.lucene.docset.OpenBitDocSet;
|
||||
import org.elasticsearch.common.lucene.docset.SlicedOpenBitSet;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.ByteSizeUnit;
|
||||
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||
|
@ -51,7 +46,7 @@ import static org.elasticsearch.common.util.concurrent.ConcurrentCollections.*;
|
|||
*/
|
||||
public abstract class AbstractConcurrentMapFilterCache extends AbstractIndexComponent implements FilterCache, IndexReader.ReaderFinishedListener {
|
||||
|
||||
final ConcurrentMap<Object, ReaderValue> cache;
|
||||
final ConcurrentMap<Object, FilterCacheValue<ConcurrentMap<Filter, DocSet>>> cache;
|
||||
|
||||
final boolean labEnabled;
|
||||
final ByteSizeValue labMaxAlloc;
|
||||
|
@ -62,8 +57,6 @@ public abstract class AbstractConcurrentMapFilterCache extends AbstractIndexComp
|
|||
|
||||
protected AbstractConcurrentMapFilterCache(Index index, @IndexSettings Settings indexSettings) {
|
||||
super(index, indexSettings);
|
||||
// weak keys is fine, it will only be cleared once IndexReader references will be removed
|
||||
// (assuming clear(...) will not be called)
|
||||
this.cache = buildCache();
|
||||
|
||||
// The LAB is stored per reader, so whole chunks will be cleared once reader is discarded.
|
||||
|
@ -81,8 +74,8 @@ public abstract class AbstractConcurrentMapFilterCache extends AbstractIndexComp
|
|||
this.labChunkSizeBytes = (int) (labChunkSize.bytes() / RamUsage.NUM_BYTES_LONG);
|
||||
}
|
||||
|
||||
protected ConcurrentMap<Object, ReaderValue> buildCache() {
|
||||
return new ConcurrentHashMap<Object, ReaderValue>();
|
||||
protected ConcurrentMap<Object, FilterCacheValue<ConcurrentMap<Filter, DocSet>>> buildCache() {
|
||||
return new ConcurrentHashMap<Object, FilterCacheValue<ConcurrentMap<Filter, DocSet>>>();
|
||||
}
|
||||
|
||||
protected ConcurrentMap<Filter, DocSet> buildFilterMap() {
|
||||
|
@ -98,18 +91,18 @@ public abstract class AbstractConcurrentMapFilterCache extends AbstractIndexComp
|
|||
}
|
||||
|
||||
@Override public void finished(IndexReader reader) {
|
||||
ReaderValue readerValue = cache.remove(reader.getCoreCacheKey());
|
||||
FilterCacheValue<ConcurrentMap<Filter, DocSet>> readerValue = cache.remove(reader.getCoreCacheKey());
|
||||
// help soft/weak handling GC
|
||||
if (readerValue != null) {
|
||||
readerValue.filters().clear();
|
||||
readerValue.value().clear();
|
||||
}
|
||||
}
|
||||
|
||||
@Override public void clear(IndexReader reader) {
|
||||
ReaderValue readerValue = cache.remove(reader.getCoreCacheKey());
|
||||
FilterCacheValue<ConcurrentMap<Filter, DocSet>> readerValue = cache.remove(reader.getCoreCacheKey());
|
||||
// help soft/weak handling GC
|
||||
if (readerValue != null) {
|
||||
readerValue.filters().clear();
|
||||
readerValue.value().clear();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -117,9 +110,9 @@ public abstract class AbstractConcurrentMapFilterCache extends AbstractIndexComp
|
|||
long sizeInBytes = 0;
|
||||
long totalCount = 0;
|
||||
int segmentsCount = 0;
|
||||
for (ReaderValue readerValue : cache.values()) {
|
||||
for (FilterCacheValue<ConcurrentMap<Filter, DocSet>> readerValue : cache.values()) {
|
||||
segmentsCount++;
|
||||
for (DocSet docSet : readerValue.filters().values()) {
|
||||
for (DocSet docSet : readerValue.value().values()) {
|
||||
sizeInBytes += docSet.sizeInBytes();
|
||||
totalCount++;
|
||||
}
|
||||
|
@ -154,27 +147,27 @@ public abstract class AbstractConcurrentMapFilterCache extends AbstractIndexComp
|
|||
}
|
||||
|
||||
@Override public DocIdSet getDocIdSet(IndexReader reader) throws IOException {
|
||||
ReaderValue readerValue = cache.cache.get(reader.getCoreCacheKey());
|
||||
if (readerValue == null) {
|
||||
FilterCacheValue<ConcurrentMap<Filter, DocSet>> cacheValue = cache.cache.get(reader.getCoreCacheKey());
|
||||
if (cacheValue == null) {
|
||||
LongsLAB longsLAB = null;
|
||||
if (cache.labEnabled) {
|
||||
longsLAB = new LongsLAB(cache.labChunkSizeBytes, cache.labMaxAllocBytes);
|
||||
}
|
||||
readerValue = new ReaderValue(cache.buildFilterMap(), longsLAB);
|
||||
ReaderValue prev = cache.cache.putIfAbsent(reader.getCoreCacheKey(), readerValue);
|
||||
cacheValue = new FilterCacheValue<ConcurrentMap<Filter, DocSet>>(cache.buildFilterMap(), longsLAB);
|
||||
FilterCacheValue<ConcurrentMap<Filter, DocSet>> prev = cache.cache.putIfAbsent(reader.getCoreCacheKey(), cacheValue);
|
||||
if (prev != null) {
|
||||
readerValue = prev;
|
||||
cacheValue = prev;
|
||||
} else {
|
||||
reader.addReaderFinishedListener(cache);
|
||||
}
|
||||
}
|
||||
DocSet docSet = readerValue.filters().get(filter);
|
||||
DocSet docSet = cacheValue.value().get(filter);
|
||||
if (docSet != null) {
|
||||
return docSet;
|
||||
}
|
||||
DocIdSet docIdSet = filter.getDocIdSet(reader);
|
||||
docSet = cacheable(reader, readerValue, docIdSet);
|
||||
DocSet prev = readerValue.filters().putIfAbsent(filter, docSet);
|
||||
docSet = FilterCacheValue.cacheable(reader, cacheValue.longsLAB(), docIdSet);
|
||||
DocSet prev = cacheValue.value().putIfAbsent(filter, docSet);
|
||||
if (prev != null) {
|
||||
docSet = prev;
|
||||
}
|
||||
|
@ -193,65 +186,5 @@ public abstract class AbstractConcurrentMapFilterCache extends AbstractIndexComp
|
|||
public int hashCode() {
|
||||
return filter.hashCode() ^ 0x1117BF25;
|
||||
}
|
||||
|
||||
private DocSet cacheable(IndexReader reader, ReaderValue readerValue, DocIdSet set) throws IOException {
|
||||
if (set == null) {
|
||||
return DocSet.EMPTY_DOC_SET;
|
||||
}
|
||||
if (set == DocIdSet.EMPTY_DOCIDSET) {
|
||||
return DocSet.EMPTY_DOC_SET;
|
||||
}
|
||||
|
||||
DocIdSetIterator it = set.iterator();
|
||||
if (it == null) {
|
||||
return DocSet.EMPTY_DOC_SET;
|
||||
}
|
||||
int doc = it.nextDoc();
|
||||
if (doc == DocIdSetIterator.NO_MORE_DOCS) {
|
||||
return DocSet.EMPTY_DOC_SET;
|
||||
}
|
||||
|
||||
// we have a LAB, check if can be used...
|
||||
if (readerValue.longsLAB() == null) {
|
||||
return DocSets.cacheable(reader, set);
|
||||
}
|
||||
|
||||
int numOfWords = OpenBitSet.bits2words(reader.maxDoc());
|
||||
LongsLAB.Allocation allocation = readerValue.longsLAB().allocateLongs(numOfWords);
|
||||
if (allocation == null) {
|
||||
return DocSets.cacheable(reader, set);
|
||||
}
|
||||
// we have an allocation, use it to create SlicedOpenBitSet
|
||||
if (set instanceof OpenBitSet) {
|
||||
return new SlicedOpenBitSet(allocation.getData(), allocation.getOffset(), (OpenBitSet) set);
|
||||
} else if (set instanceof OpenBitDocSet) {
|
||||
return new SlicedOpenBitSet(allocation.getData(), allocation.getOffset(), ((OpenBitDocSet) set).set());
|
||||
} else {
|
||||
SlicedOpenBitSet slicedSet = new SlicedOpenBitSet(allocation.getData(), numOfWords, allocation.getOffset());
|
||||
slicedSet.fastSet(doc); // we already have an open iterator, so use it, and don't forget to set the initial one
|
||||
while ((doc = it.nextDoc()) != DocIdSetIterator.NO_MORE_DOCS) {
|
||||
slicedSet.fastSet(doc);
|
||||
}
|
||||
return slicedSet;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public static class ReaderValue {
|
||||
private final ConcurrentMap<Filter, DocSet> filters;
|
||||
private final LongsLAB longsLAB;
|
||||
|
||||
public ReaderValue(ConcurrentMap<Filter, DocSet> filters, LongsLAB longsLAB) {
|
||||
this.filters = filters;
|
||||
this.longsLAB = longsLAB;
|
||||
}
|
||||
|
||||
public ConcurrentMap<Filter, DocSet> filters() {
|
||||
return filters;
|
||||
}
|
||||
|
||||
public LongsLAB longsLAB() {
|
||||
return longsLAB;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,248 @@
|
|||
/*
|
||||
* Licensed to Elastic Search and Shay Banon under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. Elastic Search licenses this
|
||||
* file to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.index.cache.filter.support;
|
||||
|
||||
import org.apache.lucene.index.IndexReader;
|
||||
import org.apache.lucene.search.DocIdSet;
|
||||
import org.apache.lucene.search.Filter;
|
||||
import org.elasticsearch.ElasticSearchException;
|
||||
import org.elasticsearch.common.RamUsage;
|
||||
import org.elasticsearch.common.collect.Sets;
|
||||
import org.elasticsearch.common.concurrentlinkedhashmap.EvictionListener;
|
||||
import org.elasticsearch.common.concurrentlinkedhashmap.Weigher;
|
||||
import org.elasticsearch.common.lab.LongsLAB;
|
||||
import org.elasticsearch.common.lucene.docset.DocSet;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.ByteSizeUnit;
|
||||
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
|
||||
import org.elasticsearch.index.AbstractIndexComponent;
|
||||
import org.elasticsearch.index.Index;
|
||||
import org.elasticsearch.index.cache.filter.FilterCache;
|
||||
import org.elasticsearch.index.settings.IndexSettings;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
public abstract class AbstractWeightedFilterCache extends AbstractIndexComponent implements FilterCache, IndexReader.ReaderFinishedListener, EvictionListener<AbstractWeightedFilterCache.FilterCacheKey, FilterCacheValue<DocSet>> {
|
||||
|
||||
final ConcurrentMap<Object, Boolean> seenReaders = ConcurrentCollections.newConcurrentMap();
|
||||
|
||||
final boolean labEnabled;
|
||||
final ByteSizeValue labMaxAlloc;
|
||||
final ByteSizeValue labChunkSize;
|
||||
|
||||
final int labMaxAllocBytes;
|
||||
final int labChunkSizeBytes;
|
||||
|
||||
protected final AtomicLong evictions = new AtomicLong();
|
||||
|
||||
protected AbstractWeightedFilterCache(Index index, @IndexSettings Settings indexSettings) {
|
||||
super(index, indexSettings);
|
||||
|
||||
// The LAB is stored per reader, so whole chunks will be cleared once reader is discarded.
|
||||
// This means that with filter entry specific based eviction, like access time
|
||||
// we might get into cases where the LAB is held by a puny filter and other filters have been released.
|
||||
// This usually will not be that bad, compared to the GC benefit of using a LAB, but, that is why
|
||||
// the soft filter cache is recommended.
|
||||
this.labEnabled = componentSettings.getAsBoolean("lab", false);
|
||||
// These values should not be too high, basically we want to cached the small readers and use the LAB for
|
||||
// them, 1M docs on OpenBitSet is around 110kb.
|
||||
this.labMaxAlloc = componentSettings.getAsBytesSize("lab.max_alloc", new ByteSizeValue(128, ByteSizeUnit.KB));
|
||||
this.labChunkSize = componentSettings.getAsBytesSize("lab.chunk_size", new ByteSizeValue(1, ByteSizeUnit.MB));
|
||||
|
||||
this.labMaxAllocBytes = (int) (labMaxAlloc.bytes() / RamUsage.NUM_BYTES_LONG);
|
||||
this.labChunkSizeBytes = (int) (labChunkSize.bytes() / RamUsage.NUM_BYTES_LONG);
|
||||
}
|
||||
|
||||
protected abstract ConcurrentMap<FilterCacheKey, FilterCacheValue<DocSet>> cache();
|
||||
|
||||
@Override public void close() throws ElasticSearchException {
|
||||
clear();
|
||||
}
|
||||
|
||||
@Override public void clear() {
|
||||
for (Object readerKey : seenReaders.keySet()) {
|
||||
Boolean removed = seenReaders.remove(readerKey);
|
||||
if (removed == null) {
|
||||
return;
|
||||
}
|
||||
ConcurrentMap<FilterCacheKey, FilterCacheValue<DocSet>> cache = cache();
|
||||
for (FilterCacheKey key : cache.keySet()) {
|
||||
if (key.readerKey() == readerKey) {
|
||||
cache.remove(key);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override public void finished(IndexReader reader) {
|
||||
clear(reader);
|
||||
}
|
||||
|
||||
@Override public void clear(IndexReader reader) {
|
||||
// we add the seen reader before we add the first cache entry for this reader
|
||||
// so, if we don't see it here, its won't be in the cache
|
||||
Boolean removed = seenReaders.remove(reader.getCoreCacheKey());
|
||||
if (removed == null) {
|
||||
return;
|
||||
}
|
||||
ConcurrentMap<FilterCacheKey, FilterCacheValue<DocSet>> cache = cache();
|
||||
for (FilterCacheKey key : cache.keySet()) {
|
||||
if (key.readerKey() == reader.getCoreCacheKey()) {
|
||||
cache.remove(key);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override public EntriesStats entriesStats() {
|
||||
long sizeInBytes = 0;
|
||||
long totalCount = 0;
|
||||
Set<Object> segmentsCount = Sets.newHashSet();
|
||||
for (Map.Entry<FilterCacheKey, FilterCacheValue<DocSet>> entry : cache().entrySet()) {
|
||||
// if its not a reader associated with this index, then don't aggreagte stats for it
|
||||
if (!seenReaders.containsKey(entry.getKey().readerKey())) {
|
||||
continue;
|
||||
}
|
||||
if (!segmentsCount.contains(entry.getKey().readerKey())) {
|
||||
segmentsCount.add(entry.getKey().readerKey());
|
||||
}
|
||||
sizeInBytes += entry.getValue().value().sizeInBytes();
|
||||
totalCount++;
|
||||
}
|
||||
return new EntriesStats(sizeInBytes, segmentsCount.size() == 0 ? 0 : totalCount / segmentsCount.size());
|
||||
}
|
||||
|
||||
@Override public long evictions() {
|
||||
return evictions.get();
|
||||
}
|
||||
|
||||
@Override public Filter cache(Filter filterToCache) {
|
||||
if (isCached(filterToCache)) {
|
||||
return filterToCache;
|
||||
}
|
||||
return new FilterCacheFilterWrapper(filterToCache, this);
|
||||
}
|
||||
|
||||
@Override public boolean isCached(Filter filter) {
|
||||
return filter instanceof FilterCacheFilterWrapper;
|
||||
}
|
||||
|
||||
static class FilterCacheFilterWrapper extends Filter {
|
||||
|
||||
private final Filter filter;
|
||||
|
||||
private final AbstractWeightedFilterCache cache;
|
||||
|
||||
FilterCacheFilterWrapper(Filter filter, AbstractWeightedFilterCache cache) {
|
||||
this.filter = filter;
|
||||
this.cache = cache;
|
||||
}
|
||||
|
||||
@Override public DocIdSet getDocIdSet(IndexReader reader) throws IOException {
|
||||
FilterCacheKey cacheKey = new FilterCacheKey(reader.getCoreCacheKey(), filter);
|
||||
ConcurrentMap<FilterCacheKey, FilterCacheValue<DocSet>> innerCache = cache.cache();
|
||||
|
||||
FilterCacheValue<DocSet> cacheValue = innerCache.get(cacheKey);
|
||||
if (cacheValue == null) {
|
||||
if (!cache.seenReaders.containsKey(reader.getCoreCacheKey())) {
|
||||
reader.addReaderFinishedListener(cache);
|
||||
cache.seenReaders.put(reader.getCoreCacheKey(), Boolean.TRUE);
|
||||
}
|
||||
|
||||
LongsLAB longsLAB = null;
|
||||
if (cache.labEnabled) {
|
||||
longsLAB = new LongsLAB(cache.labChunkSizeBytes, cache.labMaxAllocBytes);
|
||||
}
|
||||
DocIdSet docIdSet = filter.getDocIdSet(reader);
|
||||
DocSet docSet = FilterCacheValue.cacheable(reader, longsLAB, docIdSet);
|
||||
cacheValue = new FilterCacheValue<DocSet>(docSet, longsLAB);
|
||||
innerCache.putIfAbsent(cacheKey, cacheValue);
|
||||
}
|
||||
|
||||
return cacheValue.value();
|
||||
}
|
||||
|
||||
public String toString() {
|
||||
return "FilterCacheFilterWrapper(" + filter + ")";
|
||||
}
|
||||
|
||||
public boolean equals(Object o) {
|
||||
if (!(o instanceof FilterCacheFilterWrapper)) return false;
|
||||
return this.filter.equals(((FilterCacheFilterWrapper) o).filter);
|
||||
}
|
||||
|
||||
public int hashCode() {
|
||||
return filter.hashCode() ^ 0x1117BF25;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
// factored by 10
|
||||
public static class FilterCacheValueWeigher implements Weigher<FilterCacheValue<DocSet>> {
|
||||
|
||||
public static final long FACTOR = 10l;
|
||||
|
||||
@Override public int weightOf(FilterCacheValue<DocSet> value) {
|
||||
int weight = (int) Math.min(value.value().sizeInBytes() / 10, Integer.MAX_VALUE);
|
||||
return weight == 0 ? 1 : weight;
|
||||
}
|
||||
}
|
||||
|
||||
@Override public void onEviction(FilterCacheKey filterCacheKey, FilterCacheValue<DocSet> docSetFilterCacheValue) {
|
||||
if (filterCacheKey != null) {
|
||||
if (seenReaders.containsKey(filterCacheKey.readerKey())) {
|
||||
evictions.incrementAndGet();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public static class FilterCacheKey {
|
||||
private final Object readerKey;
|
||||
private final Filter filter;
|
||||
|
||||
public FilterCacheKey(Object readerKey, Filter filter) {
|
||||
this.readerKey = readerKey;
|
||||
this.filter = filter;
|
||||
}
|
||||
|
||||
public Object readerKey() {
|
||||
return readerKey;
|
||||
}
|
||||
|
||||
public Filter filter() {
|
||||
return filter;
|
||||
}
|
||||
|
||||
@Override public boolean equals(Object o) {
|
||||
if (this == o) return true;
|
||||
// if (o == null || getClass() != o.getClass()) return false;
|
||||
FilterCacheKey that = (FilterCacheKey) o;
|
||||
return (readerKey == that.readerKey && filter.equals(that.filter));
|
||||
}
|
||||
|
||||
@Override public int hashCode() {
|
||||
return readerKey.hashCode() + 31 * filter().hashCode();
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,23 @@
|
|||
/*
|
||||
* Licensed to Elastic Search and Shay Banon under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. Elastic Search licenses this
|
||||
* file to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.index.cache.filter.support;
|
||||
|
||||
public class FilterCacheHelper {
|
||||
}
|
|
@ -0,0 +1,95 @@
|
|||
/*
|
||||
* Licensed to Elastic Search and Shay Banon under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. Elastic Search licenses this
|
||||
* file to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.index.cache.filter.support;
|
||||
|
||||
import org.apache.lucene.index.IndexReader;
|
||||
import org.apache.lucene.search.DocIdSet;
|
||||
import org.apache.lucene.search.DocIdSetIterator;
|
||||
import org.apache.lucene.util.OpenBitSet;
|
||||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.common.lab.LongsLAB;
|
||||
import org.elasticsearch.common.lucene.docset.DocSet;
|
||||
import org.elasticsearch.common.lucene.docset.DocSets;
|
||||
import org.elasticsearch.common.lucene.docset.OpenBitDocSet;
|
||||
import org.elasticsearch.common.lucene.docset.SlicedOpenBitSet;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
public class FilterCacheValue<T> {
|
||||
|
||||
private final T value;
|
||||
private final LongsLAB longsLAB;
|
||||
|
||||
public FilterCacheValue(T value, LongsLAB longsLAB) {
|
||||
this.value = value;
|
||||
this.longsLAB = longsLAB;
|
||||
}
|
||||
|
||||
public T value() {
|
||||
return value;
|
||||
}
|
||||
|
||||
public LongsLAB longsLAB() {
|
||||
return longsLAB;
|
||||
}
|
||||
|
||||
|
||||
public static DocSet cacheable(IndexReader reader, @Nullable LongsLAB longsLAB, DocIdSet set) throws IOException {
|
||||
if (set == null) {
|
||||
return DocSet.EMPTY_DOC_SET;
|
||||
}
|
||||
if (set == DocIdSet.EMPTY_DOCIDSET) {
|
||||
return DocSet.EMPTY_DOC_SET;
|
||||
}
|
||||
|
||||
DocIdSetIterator it = set.iterator();
|
||||
if (it == null) {
|
||||
return DocSet.EMPTY_DOC_SET;
|
||||
}
|
||||
int doc = it.nextDoc();
|
||||
if (doc == DocIdSetIterator.NO_MORE_DOCS) {
|
||||
return DocSet.EMPTY_DOC_SET;
|
||||
}
|
||||
|
||||
// we have a LAB, check if can be used...
|
||||
if (longsLAB == null) {
|
||||
return DocSets.cacheable(reader, set);
|
||||
}
|
||||
|
||||
int numOfWords = OpenBitSet.bits2words(reader.maxDoc());
|
||||
LongsLAB.Allocation allocation = longsLAB.allocateLongs(numOfWords);
|
||||
if (allocation == null) {
|
||||
return DocSets.cacheable(reader, set);
|
||||
}
|
||||
// we have an allocation, use it to create SlicedOpenBitSet
|
||||
if (set instanceof OpenBitSet) {
|
||||
return new SlicedOpenBitSet(allocation.getData(), allocation.getOffset(), (OpenBitSet) set);
|
||||
} else if (set instanceof OpenBitDocSet) {
|
||||
return new SlicedOpenBitSet(allocation.getData(), allocation.getOffset(), ((OpenBitDocSet) set).set());
|
||||
} else {
|
||||
SlicedOpenBitSet slicedSet = new SlicedOpenBitSet(allocation.getData(), numOfWords, allocation.getOffset());
|
||||
slicedSet.fastSet(doc); // we already have an open iterator, so use it, and don't forget to set the initial one
|
||||
while ((doc = it.nextDoc()) != DocIdSetIterator.NO_MORE_DOCS) {
|
||||
slicedSet.fastSet(doc);
|
||||
}
|
||||
return slicedSet;
|
||||
}
|
||||
}
|
||||
}
|
|
@ -24,6 +24,7 @@ import org.elasticsearch.common.settings.Settings;
|
|||
import org.elasticsearch.index.shard.recovery.RecoverySource;
|
||||
import org.elasticsearch.index.shard.recovery.RecoveryTarget;
|
||||
import org.elasticsearch.indices.analysis.IndicesAnalysisService;
|
||||
import org.elasticsearch.indices.cache.filter.IndicesNodeFilterCache;
|
||||
import org.elasticsearch.indices.cluster.IndicesClusterStateService;
|
||||
import org.elasticsearch.indices.memory.IndexingMemoryBufferController;
|
||||
import org.elasticsearch.indices.store.TransportNodesListShardStoreMetaData;
|
||||
|
@ -49,6 +50,7 @@ public class IndicesModule extends AbstractModule {
|
|||
|
||||
bind(IndicesClusterStateService.class).asEagerSingleton();
|
||||
bind(IndexingMemoryBufferController.class).asEagerSingleton();
|
||||
bind(IndicesNodeFilterCache.class).asEagerSingleton();
|
||||
bind(IndicesAnalysisService.class).asEagerSingleton();
|
||||
bind(TransportNodesListShardStoreMetaData.class).asEagerSingleton();
|
||||
}
|
||||
|
|
|
@ -0,0 +1,93 @@
|
|||
/*
|
||||
* Licensed to Elastic Search and Shay Banon under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. Elastic Search licenses this
|
||||
* file to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.indices.cache.filter;
|
||||
|
||||
import org.elasticsearch.common.component.AbstractComponent;
|
||||
import org.elasticsearch.common.concurrentlinkedhashmap.ConcurrentLinkedHashMap;
|
||||
import org.elasticsearch.common.concurrentlinkedhashmap.EvictionListener;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.lucene.docset.DocSet;
|
||||
import org.elasticsearch.common.settings.ImmutableSettings;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||
import org.elasticsearch.index.cache.filter.support.AbstractWeightedFilterCache;
|
||||
import org.elasticsearch.index.cache.filter.support.FilterCacheValue;
|
||||
import org.elasticsearch.monitor.jvm.JvmInfo;
|
||||
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.CopyOnWriteArrayList;
|
||||
|
||||
public class IndicesNodeFilterCache extends AbstractComponent implements EvictionListener<AbstractWeightedFilterCache.FilterCacheKey, FilterCacheValue<DocSet>> {
|
||||
|
||||
private ConcurrentMap<AbstractWeightedFilterCache.FilterCacheKey, FilterCacheValue<DocSet>> cache;
|
||||
|
||||
private volatile long sizeInBytes;
|
||||
|
||||
private final CopyOnWriteArrayList<EvictionListener<AbstractWeightedFilterCache.FilterCacheKey, FilterCacheValue<DocSet>>> evictionListeners =
|
||||
new CopyOnWriteArrayList<EvictionListener<AbstractWeightedFilterCache.FilterCacheKey, FilterCacheValue<DocSet>>>();
|
||||
|
||||
public IndicesNodeFilterCache() {
|
||||
this(ImmutableSettings.Builder.EMPTY_SETTINGS);
|
||||
}
|
||||
|
||||
@Inject public IndicesNodeFilterCache(Settings settings) {
|
||||
super(settings);
|
||||
|
||||
String size = componentSettings.get("size", "20%");
|
||||
if (size.endsWith("%")) {
|
||||
double percent = Double.parseDouble(size.substring(0, size.length() - 1));
|
||||
sizeInBytes = (long) ((percent / 100) * JvmInfo.jvmInfo().getMem().getHeapMax().bytes());
|
||||
} else {
|
||||
sizeInBytes = ByteSizeValue.parseBytesSizeValue(size).bytes();
|
||||
}
|
||||
|
||||
int weightedSize = (int) Math.min(sizeInBytes / AbstractWeightedFilterCache.FilterCacheValueWeigher.FACTOR, Integer.MAX_VALUE);
|
||||
|
||||
cache = new ConcurrentLinkedHashMap.Builder<AbstractWeightedFilterCache.FilterCacheKey, FilterCacheValue<DocSet>>()
|
||||
.maximumWeightedCapacity(weightedSize)
|
||||
.weigher(new AbstractWeightedFilterCache.FilterCacheValueWeigher())
|
||||
.listener(this)
|
||||
.build();
|
||||
|
||||
logger.debug("using [node] filter cache with size [{}]", new ByteSizeValue(sizeInBytes));
|
||||
}
|
||||
|
||||
public void addEvictionListener(EvictionListener<AbstractWeightedFilterCache.FilterCacheKey, FilterCacheValue<DocSet>> listener) {
|
||||
evictionListeners.add(listener);
|
||||
}
|
||||
|
||||
public void removeEvictionListener(EvictionListener<AbstractWeightedFilterCache.FilterCacheKey, FilterCacheValue<DocSet>> listener) {
|
||||
evictionListeners.remove(listener);
|
||||
}
|
||||
|
||||
public void close() {
|
||||
cache.clear();
|
||||
}
|
||||
|
||||
public ConcurrentMap<AbstractWeightedFilterCache.FilterCacheKey, FilterCacheValue<DocSet>> cache() {
|
||||
return this.cache;
|
||||
}
|
||||
|
||||
@Override public void onEviction(AbstractWeightedFilterCache.FilterCacheKey filterCacheKey, FilterCacheValue<DocSet> docSetFilterCacheValue) {
|
||||
for (EvictionListener<AbstractWeightedFilterCache.FilterCacheKey, FilterCacheValue<DocSet>> listener : evictionListeners) {
|
||||
listener.onEviction(filterCacheKey, docSetFilterCacheValue);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -58,6 +58,7 @@ import org.elasticsearch.http.HttpServer;
|
|||
import org.elasticsearch.http.HttpServerModule;
|
||||
import org.elasticsearch.indices.IndicesModule;
|
||||
import org.elasticsearch.indices.IndicesService;
|
||||
import org.elasticsearch.indices.cache.filter.IndicesNodeFilterCache;
|
||||
import org.elasticsearch.indices.cluster.IndicesClusterStateService;
|
||||
import org.elasticsearch.indices.memory.IndexingMemoryBufferController;
|
||||
import org.elasticsearch.jmx.JmxModule;
|
||||
|
@ -257,6 +258,7 @@ public final class InternalNode implements Node {
|
|||
stopWatch.stop().start("indices_cluster");
|
||||
injector.getInstance(IndicesClusterStateService.class).close();
|
||||
stopWatch.stop().start("indices");
|
||||
injector.getInstance(IndicesNodeFilterCache.class).close();
|
||||
injector.getInstance(IndexingMemoryBufferController.class).close();
|
||||
injector.getInstance(IndicesService.class).close();
|
||||
stopWatch.stop().start("routing");
|
||||
|
|
|
@ -9,6 +9,7 @@ dependencies {
|
|||
runtime 'com.google.inject.extensions:guice-assisted-inject:2.0'
|
||||
runtime 'com.google.inject.extensions:guice-multibindings:2.0'
|
||||
|
||||
runtime 'com.googlecode.concurrentlinkedhashmap:concurrentlinkedhashmap-lru:1.2'
|
||||
runtime 'com.google.guava:guava:r09'
|
||||
runtime 'org.elasticsearch:es-trove:3.0.0rc1'
|
||||
runtime 'org.elasticsearch:es-jsr166y:20110209'
|
||||
|
@ -40,12 +41,13 @@ jar << {
|
|||
jarjar(jarfile: jarjarArchivePath) {
|
||||
zipfileset(src: jar.archivePath)
|
||||
configurations.runtime.files.findAll {file ->
|
||||
['inject', 'codec', 'mvel', 'jackson', 'joda', 'snakeyaml', 'netty', 'guice', 'javax.inject', 'aopalliance', 'guava', 'trove', 'jsr166y'].any { file.name.contains(it) }
|
||||
['inject', 'codec', 'mvel', 'jackson', 'joda', 'snakeyaml', 'netty', 'guice', 'javax.inject', 'aopalliance', 'guava', 'concurrentlinkedhashmap', 'trove', 'jsr166y'].any { file.name.contains(it) }
|
||||
}.each { jarjarFile ->
|
||||
zipfileset(src: jarjarFile) {
|
||||
exclude(name: "META-INF/**")
|
||||
}
|
||||
}
|
||||
rule pattern: "com.googlecode.concurrentlinkedhashmap.**", result: "org.elasticsearch.common.concurrentlinkedhashmap.@1"
|
||||
rule pattern: "com.google.common.**", result: "org.elasticsearch.common.@1"
|
||||
rule pattern: "gnu.trove.**", result: "org.elasticsearch.common.trove.@1"
|
||||
rule pattern: "jsr166y.**", result: "org.elasticsearch.common.util.concurrent.jsr166y.@1"
|
||||
|
|
|
@ -199,6 +199,7 @@ public class DocumentActionsTests extends AbstractNodesTests {
|
|||
for (int i = 0; i < 5; i++) {
|
||||
// test successful
|
||||
CountResponse countResponse = client1.prepareCount("test").setQuery(termQuery("_type", "type1")).setOperationThreading(BroadcastOperationThreading.NO_THREADS).execute().actionGet();
|
||||
assertThat("Failures " + countResponse.shardFailures(), countResponse.shardFailures().size(), equalTo(0));
|
||||
assertThat(countResponse.count(), equalTo(2l));
|
||||
assertThat(countResponse.successfulShards(), equalTo(5));
|
||||
assertThat(countResponse.failedShards(), equalTo(0));
|
||||
|
|
Loading…
Reference in New Issue