Filter / Id Cache Stats: Add to Indices Stats API, revise node stats API

closes #2862
This commit is contained in:
Shay Banon 2013-04-05 20:02:22 +02:00
parent 5e7ad9832c
commit 84670212a6
31 changed files with 790 additions and 276 deletions

View File

@ -158,7 +158,6 @@ public class TransportClearIndicesCacheAction extends TransportBroadcastOperatio
termsFilterCache.clear("api");
}
}
service.cache().invalidateStatsCache();
}
return new ShardClearIndicesCacheResponse(request.index(), request.shardId());
}

View File

@ -25,6 +25,8 @@ import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.cache.filter.FilterCacheStats;
import org.elasticsearch.index.cache.id.IdCacheStats;
import org.elasticsearch.index.flush.FlushStats;
import org.elasticsearch.index.get.GetStats;
import org.elasticsearch.index.indexing.IndexingStats;
@ -68,6 +70,12 @@ public class CommonStats implements Streamable, ToXContent {
@Nullable
WarmerStats warmer;
@Nullable
FilterCacheStats filterCache;
@Nullable
IdCacheStats idCache;
public void add(CommonStats stats) {
if (docs == null) {
if (stats.getDocs() != null) {
@ -141,6 +149,23 @@ public class CommonStats implements Streamable, ToXContent {
} else {
warmer.add(stats.getWarmer());
}
if (filterCache == null) {
if (stats.getFilterCache() != null) {
filterCache = new FilterCacheStats();
filterCache.add(stats.getFilterCache());
}
} else {
filterCache.add(stats.getFilterCache());
}
if (idCache == null) {
if (stats.getIdCache() != null) {
idCache = new IdCacheStats();
idCache.add(stats.getIdCache());
}
} else {
idCache.add(stats.getIdCache());
}
}
@Nullable
@ -188,6 +213,16 @@ public class CommonStats implements Streamable, ToXContent {
return this.warmer;
}
@Nullable
public FilterCacheStats getFilterCache() {
return this.filterCache;
}
@Nullable
public IdCacheStats getIdCache() {
return this.idCache;
}
public static CommonStats readCommonStats(StreamInput in) throws IOException {
CommonStats stats = new CommonStats();
stats.readFrom(in);
@ -223,6 +258,12 @@ public class CommonStats implements Streamable, ToXContent {
if (in.readBoolean()) {
warmer = WarmerStats.readWarmerStats(in);
}
if (in.readBoolean()) {
filterCache = FilterCacheStats.readFilterCacheStats(in);
}
if (in.readBoolean()) {
idCache = IdCacheStats.readIdCacheStats(in);
}
}
@Override
@ -281,6 +322,19 @@ public class CommonStats implements Streamable, ToXContent {
out.writeBoolean(true);
warmer.writeTo(out);
}
if (filterCache == null) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
filterCache.writeTo(out);
}
if (idCache == null) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
idCache.writeTo(out);
}
}
// note, requires a wrapping object
@ -313,6 +367,12 @@ public class CommonStats implements Streamable, ToXContent {
if (warmer != null) {
warmer.toXContent(builder, params);
}
if (filterCache != null) {
filterCache.toXContent(builder, params);
}
if (idCache != null) {
idCache.toXContent(builder, params);
}
return builder;
}
}

View File

@ -45,6 +45,8 @@ public class IndicesStatsRequest extends BroadcastOperationRequest<IndicesStatsR
private boolean refresh = false;
private boolean flush = false;
private boolean warmer = false;
private boolean filterCache = false;
private boolean idCache = false;
private String[] types = null;
private String[] groups = null;
@ -61,6 +63,8 @@ public class IndicesStatsRequest extends BroadcastOperationRequest<IndicesStatsR
refresh = true;
flush = true;
warmer = true;
filterCache = true;
idCache = true;
types = null;
groups = null;
return this;
@ -79,6 +83,8 @@ public class IndicesStatsRequest extends BroadcastOperationRequest<IndicesStatsR
refresh = false;
flush = false;
warmer = false;
filterCache = false;
idCache = false;
types = null;
groups = null;
return this;
@ -195,6 +201,24 @@ public class IndicesStatsRequest extends BroadcastOperationRequest<IndicesStatsR
return this.warmer;
}
public IndicesStatsRequest filterCache(boolean filterCache) {
this.filterCache = filterCache;
return this;
}
public boolean filterCache() {
return this.filterCache;
}
public IndicesStatsRequest idCache(boolean idCache) {
this.idCache = idCache;
return this;
}
public boolean idCache() {
return this.idCache;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
@ -207,6 +231,8 @@ public class IndicesStatsRequest extends BroadcastOperationRequest<IndicesStatsR
out.writeBoolean(flush);
out.writeBoolean(refresh);
out.writeBoolean(warmer);
out.writeBoolean(filterCache);
out.writeBoolean(idCache);
if (types == null) {
out.writeVInt(0);
} else {
@ -237,6 +263,8 @@ public class IndicesStatsRequest extends BroadcastOperationRequest<IndicesStatsR
flush = in.readBoolean();
refresh = in.readBoolean();
warmer = in.readBoolean();
filterCache = in.readBoolean();
idCache = in.readBoolean();
int size = in.readVInt();
if (size > 0) {
types = new String[size];

View File

@ -114,6 +114,16 @@ public class IndicesStatsRequestBuilder extends BroadcastOperationRequestBuilder
return this;
}
public IndicesStatsRequestBuilder setFilterCache(boolean filterCache) {
request.filterCache(filterCache);
return this;
}
public IndicesStatsRequestBuilder setIdCache(boolean idCache) {
request.idCache(idCache);
return this;
}
@Override
protected void doExecute(ActionListener<IndicesStatsResponse> listener) {
((IndicesAdminClient) client).stats(request, listener);

View File

@ -149,6 +149,8 @@ public class IndicesStatsResponse extends BroadcastOperationResponse implements
getTotal().toXContent(builder, params);
builder.endObject();
builder.endObject();
builder.startObject(Fields.INDICES);
for (IndexStats indexStats : getIndices().values()) {
builder.startObject(indexStats.getIndex(), XContentBuilder.FieldCaseConversion.NONE);
@ -188,7 +190,6 @@ public class IndicesStatsResponse extends BroadcastOperationResponse implements
}
builder.endObject();
builder.endObject();
return builder;
}

View File

@ -172,6 +172,12 @@ public class TransportIndicesStatsAction extends TransportBroadcastOperationActi
if (request.request.warmer()) {
stats.stats.warmer = indexShard.warmerStats();
}
if (request.request.filterCache()) {
stats.stats.filterCache = indexShard.filterCacheStats();
}
if (request.request.idCache()) {
stats.stats.idCache = indexShard.idCacheStats();
}
return stats;
}

View File

@ -1,131 +0,0 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.cache;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentBuilderString;
import java.io.IOException;
/**
*
*/
public class CacheStats implements Streamable, ToXContent {
long filterEvictions;
long filterCount;
long filterSize;
long idCacheSize;
public CacheStats() {
}
public CacheStats(long filterEvictions, long filterSize, long filterCount, long idCacheSize) {
this.filterEvictions = filterEvictions;
this.filterSize = filterSize;
this.filterCount = filterCount;
this.idCacheSize = idCacheSize;
}
public void add(CacheStats stats) {
this.filterEvictions += stats.filterEvictions;
this.filterSize += stats.filterSize;
this.filterCount += stats.filterCount;
this.idCacheSize += stats.idCacheSize;
}
public long getFilterEvictions() {
return this.filterEvictions;
}
public long getFilterMemEvictions() {
return this.filterEvictions;
}
public long getFilterCount() {
return this.filterCount;
}
public long getFilterSizeInBytes() {
return this.filterSize;
}
public ByteSizeValue getFilterSize() {
return new ByteSizeValue(filterSize);
}
public long getIdCacheSizeInBytes() {
return idCacheSize;
}
public ByteSizeValue getIdCacheSize() {
return new ByteSizeValue(idCacheSize);
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject(Fields.CACHE);
builder.field(Fields.FILTER_COUNT, filterCount);
builder.field(Fields.FILTER_EVICTIONS, filterEvictions);
builder.field(Fields.FILTER_SIZE, getFilterSize().toString());
builder.field(Fields.FILTER_SIZE_IN_BYTES, filterSize);
builder.field(Fields.ID_CACHE_SIZE, getIdCacheSize().toString());
builder.field(Fields.ID_CACHE_SIZE_IN_BYTES, idCacheSize);
builder.endObject();
return builder;
}
static final class Fields {
static final XContentBuilderString CACHE = new XContentBuilderString("cache");
static final XContentBuilderString FILTER_EVICTIONS = new XContentBuilderString("filter_evictions");
static final XContentBuilderString FILTER_COUNT = new XContentBuilderString("filter_count");
static final XContentBuilderString FILTER_SIZE = new XContentBuilderString("filter_size");
static final XContentBuilderString FILTER_SIZE_IN_BYTES = new XContentBuilderString("filter_size_in_bytes");
static final XContentBuilderString ID_CACHE_SIZE = new XContentBuilderString("id_cache_size");
static final XContentBuilderString ID_CACHE_SIZE_IN_BYTES = new XContentBuilderString("id_cache_size_in_bytes");
}
public static CacheStats readCacheStats(StreamInput in) throws IOException {
CacheStats stats = new CacheStats();
stats.readFrom(in);
return stats;
}
@Override
public void readFrom(StreamInput in) throws IOException {
filterEvictions = in.readVLong();
filterSize = in.readVLong();
filterCount = in.readVLong();
idCacheSize = in.readVLong();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVLong(filterEvictions);
out.writeVLong(filterSize);
out.writeVLong(filterCount);
out.writeVLong(idCacheSize);
}
}

View File

@ -28,7 +28,6 @@ import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.component.CloseableComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.AbstractIndexComponent;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.cache.docset.DocSetCache;
@ -47,12 +46,8 @@ public class IndexCache extends AbstractIndexComponent implements CloseableCompo
private final IdCache idCache;
private final DocSetCache docSetCache;
private final TimeValue refreshInterval;
private ClusterService clusterService;
private long latestCacheStatsTimestamp = -1;
private CacheStats latestCacheStats;
@Inject
public IndexCache(Index index, @IndexSettings Settings indexSettings, FilterCache filterCache, QueryParserCache queryParserCache, IdCache idCache,
DocSetCache docSetCache) {
@ -61,10 +56,6 @@ public class IndexCache extends AbstractIndexComponent implements CloseableCompo
this.queryParserCache = queryParserCache;
this.idCache = idCache;
this.docSetCache = docSetCache;
this.refreshInterval = componentSettings.getAsTime("stats.refresh_interval", TimeValue.timeValueSeconds(1));
logger.debug("Using stats.refresh_interval [{}]", refreshInterval);
}
@Inject(optional = true)
@ -75,22 +66,6 @@ public class IndexCache extends AbstractIndexComponent implements CloseableCompo
}
}
public synchronized void invalidateStatsCache() {
FilterCache.EntriesStats filterEntriesStats = filterCache.entriesStats();
latestCacheStats = new CacheStats(filterCache.evictions(), filterEntriesStats.sizeInBytes, filterEntriesStats.count, idCache.sizeInBytes());
latestCacheStatsTimestamp = System.currentTimeMillis();
}
public synchronized CacheStats stats() {
long timestamp = System.currentTimeMillis();
if ((timestamp - latestCacheStatsTimestamp) > refreshInterval.millis()) {
FilterCache.EntriesStats filterEntriesStats = filterCache.entriesStats();
latestCacheStats = new CacheStats(filterCache.evictions(), filterEntriesStats.sizeInBytes, filterEntriesStats.count, idCache.sizeInBytes());
latestCacheStatsTimestamp = timestamp;
}
return latestCacheStats;
}
public FilterCache filter() {
return filterCache;
}

View File

@ -23,6 +23,7 @@ import org.apache.lucene.index.IndexReader;
import org.apache.lucene.search.Filter;
import org.elasticsearch.common.component.CloseableComponent;
import org.elasticsearch.index.IndexComponent;
import org.elasticsearch.index.service.IndexService;
/**
*
@ -39,6 +40,9 @@ public interface FilterCache extends IndexComponent, CloseableComponent {
}
}
// we need to "inject" the index service to not create cyclic dep
void setIndexService(IndexService indexService);
String type();
Filter cache(Filter filterToCache);
@ -48,8 +52,4 @@ public interface FilterCache extends IndexComponent, CloseableComponent {
void clear(String reason);
void clear(String reason, String[] keys);
EntriesStats entriesStats();
long evictions();
}

View File

@ -0,0 +1,98 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.cache.filter;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentBuilderString;
import java.io.IOException;
/**
*/
public class FilterCacheStats implements Streamable, ToXContent {
long memorySize;
long evictions;
public FilterCacheStats() {
}
public FilterCacheStats(long memorySize, long evictions) {
this.memorySize = memorySize;
this.evictions = evictions;
}
public void add(FilterCacheStats stats) {
this.memorySize += stats.memorySize;
this.evictions += stats.evictions;
}
public long getMemorySizeInBytes() {
return this.memorySize;
}
public ByteSizeValue getMemorySize() {
return new ByteSizeValue(memorySize);
}
public long getEvictions() {
return this.evictions;
}
public static FilterCacheStats readFilterCacheStats(StreamInput in) throws IOException {
FilterCacheStats stats = new FilterCacheStats();
stats.readFrom(in);
return stats;
}
@Override
public void readFrom(StreamInput in) throws IOException {
memorySize = in.readVLong();
evictions = in.readVLong();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVLong(memorySize);
out.writeVLong(evictions);
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException {
builder.startObject(Fields.FILTER_CACHE);
builder.field(Fields.MEMORY_SIZE, getMemorySize().toString());
builder.field(Fields.MEMORY_SIZE_IN_BYTES, memorySize);
builder.field(Fields.EVICTIONS, getEvictions());
builder.endObject();
return builder;
}
static final class Fields {
static final XContentBuilderString FILTER_CACHE = new XContentBuilderString("filter_cache");
static final XContentBuilderString MEMORY_SIZE = new XContentBuilderString("memory_size");
static final XContentBuilderString MEMORY_SIZE_IN_BYTES = new XContentBuilderString("memory_size_in_bytes");
static final XContentBuilderString EVICTIONS = new XContentBuilderString("evictions");
}
}

View File

@ -0,0 +1,56 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.cache.filter;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.metrics.CounterMetric;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.cache.filter.weighted.WeightedFilterCache;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.shard.AbstractIndexShardComponent;
import org.elasticsearch.index.shard.ShardId;
/**
*/
public class ShardFilterCache extends AbstractIndexShardComponent {
final CounterMetric evictionsMetric = new CounterMetric();
final CounterMetric totalMetric = new CounterMetric();
@Inject
public ShardFilterCache(ShardId shardId, @IndexSettings Settings indexSettings) {
super(shardId, indexSettings);
}
public FilterCacheStats stats() {
return new FilterCacheStats(totalMetric.count(), evictionsMetric.count());
}
public void onCached(WeightedFilterCache.FilterCacheKey cacheKey, long sizeInBytes) {
totalMetric.inc(sizeInBytes);
}
public void onRemoval(WeightedFilterCache.FilterCacheKey cacheKey, boolean evicted, long sizeInBytes) {
if (evicted) {
evictionsMetric.inc();
}
totalMetric.dec(sizeInBytes);
}
}

View File

@ -0,0 +1,32 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.cache.filter;
import org.elasticsearch.common.inject.AbstractModule;
/**
*/
public class ShardFilterCacheModule extends AbstractModule {
@Override
protected void configure() {
bind(ShardFilterCache.class).asEagerSingleton();
}
}

View File

@ -26,6 +26,7 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.AbstractIndexComponent;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.cache.filter.FilterCache;
import org.elasticsearch.index.service.IndexService;
import org.elasticsearch.index.settings.IndexSettings;
/**
@ -39,6 +40,11 @@ public class NoneFilterCache extends AbstractIndexComponent implements FilterCac
logger.debug("Using no filter cache");
}
@Override
public void setIndexService(IndexService indexService) {
// nothing to do here...
}
@Override
public String type() {
return "none";
@ -68,14 +74,4 @@ public class NoneFilterCache extends AbstractIndexComponent implements FilterCac
public void clear(IndexReader reader) {
// nothing to do here
}
@Override
public EntriesStats entriesStats() {
return new EntriesStats(0, 0);
}
@Override
public long evictions() {
return 0;
}
}

View File

@ -35,15 +35,17 @@ import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.lucene.docset.DocIdSets;
import org.elasticsearch.common.lucene.search.CachedFilter;
import org.elasticsearch.common.lucene.search.NoCacheFilter;
import org.elasticsearch.common.metrics.CounterMetric;
import org.elasticsearch.common.metrics.MeanMetric;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.index.AbstractIndexComponent;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.cache.filter.FilterCache;
import org.elasticsearch.index.cache.filter.support.CacheKeyFilter;
import org.elasticsearch.index.service.IndexService;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.ShardUtils;
import org.elasticsearch.index.shard.service.IndexShard;
import org.elasticsearch.indices.cache.filter.IndicesFilterCache;
import java.io.IOException;
@ -52,12 +54,9 @@ import java.util.concurrent.ConcurrentMap;
public class WeightedFilterCache extends AbstractIndexComponent implements FilterCache, SegmentReader.CoreClosedListener, RemovalListener<WeightedFilterCache.FilterCacheKey, DocIdSet> {
final IndicesFilterCache indicesFilterCache;
IndexService indexService;
final ConcurrentMap<Object, Boolean> seenReaders = ConcurrentCollections.newConcurrentMap();
final CounterMetric seenReadersCount = new CounterMetric();
final CounterMetric evictionsMetric = new CounterMetric();
final MeanMetric totalMetric = new MeanMetric();
final ConcurrentMap<IndexReader, Boolean> seenReaders = ConcurrentCollections.newConcurrentMap();
@Inject
public WeightedFilterCache(Index index, @IndexSettings Settings indexSettings, IndicesFilterCache indicesFilterCache) {
@ -65,6 +64,11 @@ public class WeightedFilterCache extends AbstractIndexComponent implements Filte
this.indicesFilterCache = indicesFilterCache;
}
@Override
public void setIndexService(IndexService indexService) {
this.indexService = indexService;
}
@Override
public String type() {
return "weighted";
@ -83,7 +87,6 @@ public class WeightedFilterCache extends AbstractIndexComponent implements Filte
if (removed == null) {
return;
}
seenReadersCount.dec();
indicesFilterCache.addReaderKeyToClean(readerKey);
}
}
@ -92,8 +95,8 @@ public class WeightedFilterCache extends AbstractIndexComponent implements Filte
public void clear(String reason, String[] keys) {
logger.debug("clear keys [], reason [{}]", reason, keys);
for (String key : keys) {
for (Object readerKey : seenReaders.keySet()) {
indicesFilterCache.cache().invalidate(new FilterCacheKey(this, readerKey, new CacheKeyFilter.Key(key)));
for (IndexReader reader : seenReaders.keySet()) {
indicesFilterCache.cache().invalidate(new FilterCacheKey(this, reader, new CacheKeyFilter.Key(key)));
}
}
}
@ -111,21 +114,9 @@ public class WeightedFilterCache extends AbstractIndexComponent implements Filte
if (removed == null) {
return;
}
seenReadersCount.dec();
indicesFilterCache.addReaderKeyToClean(reader.getCoreCacheKey());
}
@Override
public EntriesStats entriesStats() {
long seenReadersCount = this.seenReadersCount.count();
return new EntriesStats(totalMetric.sum(), seenReadersCount == 0 ? 0 : totalMetric.count() / seenReadersCount);
}
@Override
public long evictions() {
return evictionsMetric.count();
}
@Override
public Filter cache(Filter filterToCache) {
if (filterToCache instanceof NoCacheFilter) {
@ -155,29 +146,27 @@ public class WeightedFilterCache extends AbstractIndexComponent implements Filte
if (filter instanceof CacheKeyFilter) {
filterKey = ((CacheKeyFilter) filter).cacheKey();
}
FilterCacheKey cacheKey = new FilterCacheKey(this.cache, context.reader().getCoreCacheKey(), filterKey);
FilterCacheKey cacheKey = new FilterCacheKey(this.cache, context.reader(), filterKey);
Cache<FilterCacheKey, DocIdSet> innerCache = cache.indicesFilterCache.cache();
DocIdSet cacheValue = innerCache.getIfPresent(cacheKey);
if (cacheValue == null) {
if (!cache.seenReaders.containsKey(context.reader().getCoreCacheKey())) {
Boolean previous = cache.seenReaders.putIfAbsent(context.reader().getCoreCacheKey(), Boolean.TRUE);
Boolean previous = cache.seenReaders.putIfAbsent(context.reader(), Boolean.TRUE);
if (previous == null) {
cache.seenReadersCount.inc();
// we add a core closed listener only, for non core IndexReaders we rely on clear being called (percolator for example)
if (context.reader() instanceof SegmentReader) {
((SegmentReader) context.reader()).addCoreClosedListener(cache);
}
}
}
// we can't pass down acceptedDocs provided, because we are caching the result, and acceptedDocs
// might be specific to a query AST, we do pass down the live docs to make sure we optimize the execution
cacheValue = DocIdSets.toCacheable(context.reader(), filter.getDocIdSet(context, context.reader().getLiveDocs()));
// we might put the same one concurrently, that's fine, it will be replaced and the removal
// will be called
cache.totalMetric.inc(sizeInBytes(cacheValue));
innerCache.put(cacheKey, cacheValue);
cache.onCached(cacheKey, cacheValue);
}
// note, we don't wrap the return value with a BitsFilteredDocIdSet.wrap(docIdSet, acceptDocs) because
@ -214,11 +203,24 @@ public class WeightedFilterCache extends AbstractIndexComponent implements Filte
// index we register the listener with
@Override
public void onRemoval(RemovalNotification<FilterCacheKey, DocIdSet> removalNotification) {
if (removalNotification.wasEvicted()) {
evictionsMetric.inc();
if (removalNotification.getKey() != null && removalNotification.getValue() != null) {
ShardId shardId = ShardUtils.extractShardId(removalNotification.getKey().reader());
if (shardId != null) {
IndexShard shard = indexService.shard(shardId.id());
if (shard != null) {
shard.filterCache().onRemoval(removalNotification.getKey(), removalNotification.wasEvicted(), sizeInBytes(removalNotification.getValue()));
}
}
}
if (removalNotification.getValue() != null) {
totalMetric.dec(sizeInBytes(removalNotification.getValue()));
}
void onCached(FilterCacheKey cacheKey, DocIdSet cacheValue) {
ShardId shardId = ShardUtils.extractShardId(cacheKey.reader());
if (shardId != null) {
IndexShard shard = indexService.shard(shardId.id());
if (shard != null) {
shard.filterCache().onCached(cacheKey, sizeInBytes(cacheValue));
}
}
}
@ -232,12 +234,12 @@ public class WeightedFilterCache extends AbstractIndexComponent implements Filte
public static class FilterCacheKey {
private final RemovalListener<WeightedFilterCache.FilterCacheKey, DocIdSet> removalListener;
private final Object readerKey;
private final IndexReader reader;
private final Object filterKey;
public FilterCacheKey(RemovalListener<WeightedFilterCache.FilterCacheKey, DocIdSet> removalListener, Object readerKey, Object filterKey) {
public FilterCacheKey(RemovalListener<WeightedFilterCache.FilterCacheKey, DocIdSet> removalListener, IndexReader reader, Object filterKey) {
this.removalListener = removalListener;
this.readerKey = readerKey;
this.reader = reader;
this.filterKey = filterKey;
}
@ -245,8 +247,12 @@ public class WeightedFilterCache extends AbstractIndexComponent implements Filte
return removalListener;
}
public IndexReader reader() {
return this.reader;
}
public Object readerKey() {
return readerKey;
return reader.getCoreCacheKey();
}
public Object filterKey() {
@ -258,12 +264,12 @@ public class WeightedFilterCache extends AbstractIndexComponent implements Filte
if (this == o) return true;
// if (o == null || getClass() != o.getClass()) return false;
FilterCacheKey that = (FilterCacheKey) o;
return (readerKey.equals(that.readerKey) && filterKey.equals(that.filterKey));
return (readerKey().equals(that.readerKey()) && filterKey.equals(that.filterKey));
}
@Override
public int hashCode() {
return readerKey.hashCode() + 31 * filterKey.hashCode();
return readerKey().hashCode() + 31 * filterKey.hashCode();
}
}
}

View File

@ -24,6 +24,7 @@ import org.apache.lucene.index.AtomicReaderContext;
import org.apache.lucene.index.IndexReader;
import org.elasticsearch.common.component.CloseableComponent;
import org.elasticsearch.index.IndexComponent;
import org.elasticsearch.index.service.IndexService;
import java.util.List;
@ -32,6 +33,9 @@ import java.util.List;
*/
public interface IdCache extends IndexComponent, CloseableComponent, Iterable<IdReaderCache> {
// we need to "inject" the index service to not create cyclic dep
void setIndexService(IndexService indexService);
void clear();
void clear(IndexReader reader);
@ -39,7 +43,4 @@ public interface IdCache extends IndexComponent, CloseableComponent, Iterable<Id
void refresh(List<AtomicReaderContext> readers) throws Exception;
IdReaderCache reader(AtomicReader reader);
long sizeInBytes();
}

View File

@ -0,0 +1,87 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.cache.id;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentBuilderString;
import java.io.IOException;
/**
*/
public class IdCacheStats implements Streamable, ToXContent {
long memorySize;
public IdCacheStats() {
}
public IdCacheStats(long memorySize) {
this.memorySize = memorySize;
}
public void add(IdCacheStats stats) {
this.memorySize += stats.memorySize;
}
public long getMemorySizeInBytes() {
return this.memorySize;
}
public ByteSizeValue getMemorySize() {
return new ByteSizeValue(memorySize);
}
public static IdCacheStats readIdCacheStats(StreamInput in) throws IOException {
IdCacheStats stats = new IdCacheStats();
stats.readFrom(in);
return stats;
}
@Override
public void readFrom(StreamInput in) throws IOException {
memorySize = in.readVLong();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVLong(memorySize);
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject(Fields.ID_CACHE);
builder.field(Fields.MEMORY_SIZE, getMemorySize().toString());
builder.field(Fields.MEMORY_SIZE_IN_BYTES, memorySize);
builder.endObject();
return builder;
}
static final class Fields {
static final XContentBuilderString ID_CACHE = new XContentBuilderString("id_cache");
static final XContentBuilderString MEMORY_SIZE = new XContentBuilderString("memory_size");
static final XContentBuilderString MEMORY_SIZE_IN_BYTES = new XContentBuilderString("memory_size_in_bytes");
}
}

View File

@ -19,6 +19,7 @@
package org.elasticsearch.index.cache.id;
import org.apache.lucene.index.IndexReader;
import org.elasticsearch.common.bytes.HashedBytesArray;
/**
@ -26,6 +27,8 @@ import org.elasticsearch.common.bytes.HashedBytesArray;
*/
public interface IdReaderCache {
IndexReader reader();
Object readerCacheKey();
IdReaderTypeCache type(String type);

View File

@ -0,0 +1,51 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.cache.id;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.metrics.CounterMetric;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.shard.AbstractIndexShardComponent;
import org.elasticsearch.index.shard.ShardId;
/**
*/
public class ShardIdCache extends AbstractIndexShardComponent {
final CounterMetric totalMetric = new CounterMetric();
@Inject
public ShardIdCache(ShardId shardId, @IndexSettings Settings indexSettings) {
super(shardId, indexSettings);
}
public IdCacheStats stats() {
return new IdCacheStats(totalMetric.count());
}
public void onCached(long sizeInBytes) {
totalMetric.inc(sizeInBytes);
}
public void onRemoval(long sizeInBytes) {
totalMetric.dec(sizeInBytes);
}
}

View File

@ -0,0 +1,32 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.cache.id;
import org.elasticsearch.common.inject.AbstractModule;
/**
*/
public class ShardIdCacheModule extends AbstractModule {
@Override
protected void configure() {
bind(ShardIdCache.class).asEagerSingleton();
}
}

View File

@ -36,7 +36,11 @@ import org.elasticsearch.index.cache.id.IdReaderCache;
import org.elasticsearch.index.mapper.Uid;
import org.elasticsearch.index.mapper.internal.ParentFieldMapper;
import org.elasticsearch.index.mapper.internal.UidFieldMapper;
import org.elasticsearch.index.service.IndexService;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.ShardUtils;
import org.elasticsearch.index.shard.service.IndexShard;
import java.util.*;
import java.util.concurrent.ConcurrentMap;
@ -49,6 +53,8 @@ public class SimpleIdCache extends AbstractIndexComponent implements IdCache, Se
private final ConcurrentMap<Object, SimpleIdReaderCache> idReaders;
private final boolean reuse;
IndexService indexService;
@Inject
public SimpleIdCache(Index index, @IndexSettings Settings indexSettings) {
super(index, indexSettings);
@ -56,6 +62,11 @@ public class SimpleIdCache extends AbstractIndexComponent implements IdCache, Se
this.reuse = componentSettings.getAsBoolean("reuse", false);
}
@Override
public void setIndexService(IndexService indexService) {
this.indexService = indexService;
}
@Override
public void close() throws ElasticSearchException {
clear();
@ -63,7 +74,11 @@ public class SimpleIdCache extends AbstractIndexComponent implements IdCache, Se
@Override
public void clear() {
idReaders.clear();
for (Iterator<SimpleIdReaderCache> it = idReaders.values().iterator(); it.hasNext(); ) {
SimpleIdReaderCache idReaderCache = it.next();
it.remove();
onRemoval(idReaderCache);
}
}
@Override
@ -73,7 +88,8 @@ public class SimpleIdCache extends AbstractIndexComponent implements IdCache, Se
@Override
public void clear(IndexReader reader) {
idReaders.remove(reader.getCoreCacheKey());
SimpleIdReaderCache removed = idReaders.remove(reader.getCoreCacheKey());
if (removed != null) onRemoval(removed);
}
@Override
@ -99,6 +115,7 @@ public class SimpleIdCache extends AbstractIndexComponent implements IdCache, Se
// do the refresh
Map<Object, Map<String, TypeBuilder>> builders = new HashMap<Object, Map<String, TypeBuilder>>();
Map<Object, IndexReader> cacheToReader = new HashMap<Object, IndexReader>();
// first, go over and load all the id->doc map for all types
for (AtomicReaderContext context : atomicReaderContexts) {
@ -113,6 +130,7 @@ public class SimpleIdCache extends AbstractIndexComponent implements IdCache, Se
}
Map<String, TypeBuilder> readerBuilder = new HashMap<String, TypeBuilder>();
builders.put(reader.getCoreCacheKey(), readerBuilder);
cacheToReader.put(reader.getCoreCacheKey(), context.reader());
Terms terms = reader.terms(UidFieldMapper.NAME);
@ -191,19 +209,32 @@ public class SimpleIdCache extends AbstractIndexComponent implements IdCache, Se
typeBuilderEntry.getValue().parentIdsValues.toArray(new HashedBytesArray[typeBuilderEntry.getValue().parentIdsValues.size()]),
typeBuilderEntry.getValue().parentIdsOrdinals));
}
SimpleIdReaderCache readerCache = new SimpleIdReaderCache(entry.getKey(), types.immutableMap());
SimpleIdReaderCache readerCache = new SimpleIdReaderCache(cacheToReader.get(entry.getKey()), types.immutableMap());
idReaders.put(readerCache.readerCacheKey(), readerCache);
onCached(readerCache);
}
}
}
}
public long sizeInBytes() {
long sizeInBytes = 0;
for (SimpleIdReaderCache idReaderCache : idReaders.values()) {
sizeInBytes += idReaderCache.sizeInBytes();
void onCached(SimpleIdReaderCache readerCache) {
ShardId shardId = ShardUtils.extractShardId(readerCache.reader());
if (shardId != null) {
IndexShard shard = indexService.shard(shardId.id());
if (shard != null) {
shard.idCache().onCached(readerCache.sizeInBytes());
}
}
}
void onRemoval(SimpleIdReaderCache readerCache) {
ShardId shardId = ShardUtils.extractShardId(readerCache.reader());
if (shardId != null) {
IndexShard shard = indexService.shard(shardId.id());
if (shard != null) {
shard.idCache().onCached(readerCache.sizeInBytes());
}
}
return sizeInBytes;
}
private HashedBytesArray checkIfCanReuse(Map<Object, Map<String, TypeBuilder>> builders, HashedBytesArray idAsBytes) {

View File

@ -20,6 +20,7 @@
package org.elasticsearch.index.cache.id.simple;
import com.google.common.collect.ImmutableMap;
import org.apache.lucene.index.IndexReader;
import org.elasticsearch.common.bytes.HashedBytesArray;
import org.elasticsearch.index.cache.id.IdReaderCache;
import org.elasticsearch.index.cache.id.IdReaderTypeCache;
@ -29,18 +30,22 @@ import org.elasticsearch.index.cache.id.IdReaderTypeCache;
*/
public class SimpleIdReaderCache implements IdReaderCache {
private final Object readerCacheKey;
private final IndexReader reader;
private final ImmutableMap<String, SimpleIdReaderTypeCache> types;
public SimpleIdReaderCache(Object readerCacheKey, ImmutableMap<String, SimpleIdReaderTypeCache> types) {
this.readerCacheKey = readerCacheKey;
public SimpleIdReaderCache(IndexReader reader, ImmutableMap<String, SimpleIdReaderTypeCache> types) {
this.reader = reader;
this.types = types;
}
@Override
public IndexReader reader() {
return this.reader;
}
@Override
public Object readerCacheKey() {
return this.readerCacheKey;
return this.reader.getCoreCacheKey();
}
@Override

View File

@ -33,6 +33,8 @@ import org.elasticsearch.index.*;
import org.elasticsearch.index.aliases.IndexAliasesService;
import org.elasticsearch.index.analysis.AnalysisService;
import org.elasticsearch.index.cache.IndexCache;
import org.elasticsearch.index.cache.filter.ShardFilterCacheModule;
import org.elasticsearch.index.cache.id.ShardIdCacheModule;
import org.elasticsearch.index.deletionpolicy.DeletionPolicyModule;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.EngineModule;
@ -151,6 +153,10 @@ public class InternalIndexService extends AbstractIndexComponent implements Inde
this.pluginsService = injector.getInstance(PluginsService.class);
this.indicesLifecycle = (InternalIndicesLifecycle) injector.getInstance(IndicesLifecycle.class);
// inject workarounds for cyclic dep
indexCache.filter().setIndexService(this);
indexCache.idCache().setIndexService(this);
}
@Override
@ -318,6 +324,8 @@ public class InternalIndexService extends AbstractIndexComponent implements Inde
modules.add(new DeletionPolicyModule(indexSettings));
modules.add(new MergePolicyModule(indexSettings));
modules.add(new MergeSchedulerModule(indexSettings));
modules.add(new ShardFilterCacheModule());
modules.add(new ShardIdCacheModule());
modules.add(new TranslogModule(indexSettings));
modules.add(new EngineModule(indexSettings));
modules.add(new IndexShardGatewayModule(injector.getInstance(IndexGateway.class)));

View File

@ -0,0 +1,47 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.shard;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.SegmentReader;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.index.store.Store;
/**
*/
public class ShardUtils {
/**
* Tries to extract the shard id from a reader if possible, when its not possible,
* will return null. This method requires the reader to be a {@link SegmentReader}
* and the directory backing it to be {@link org.elasticsearch.index.store.Store.StoreDirectory}.
* This will be the case in almost all cases, except for percolator currently.
*/
@Nullable
public static ShardId extractShardId(IndexReader reader) {
if (reader instanceof SegmentReader) {
SegmentReader sReader = (SegmentReader) reader;
if (sReader.directory() instanceof Store.StoreDirectory) {
return ((Store.StoreDirectory) sReader.directory()).shardId();
}
}
return null;
}
}

View File

@ -23,6 +23,10 @@ import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.index.cache.filter.FilterCacheStats;
import org.elasticsearch.index.cache.filter.ShardFilterCache;
import org.elasticsearch.index.cache.id.IdCacheStats;
import org.elasticsearch.index.cache.id.ShardIdCache;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.EngineException;
import org.elasticsearch.index.flush.FlushStats;
@ -56,6 +60,10 @@ public interface IndexShard extends IndexShardComponent {
ShardIndexWarmerService warmerService();
ShardFilterCache filterCache();
ShardIdCache idCache();
ShardRouting routingEntry();
DocsStats docStats();
@ -76,6 +84,10 @@ public interface IndexShard extends IndexShardComponent {
WarmerStats warmerStats();
FilterCacheStats filterCacheStats();
IdCacheStats idCacheStats();
IndexShardState state();
Engine.Create prepareCreate(SourceToParse source) throws ElasticSearchException;

View File

@ -21,7 +21,6 @@ package org.elasticsearch.index.shard.service;
import com.google.common.base.Charsets;
import org.apache.lucene.index.CheckIndex;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.search.Filter;
import org.apache.lucene.search.Query;
import org.apache.lucene.util.ThreadInterruptedException;
@ -42,6 +41,10 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.aliases.IndexAliasesService;
import org.elasticsearch.index.cache.IndexCache;
import org.elasticsearch.index.cache.filter.FilterCacheStats;
import org.elasticsearch.index.cache.filter.ShardFilterCache;
import org.elasticsearch.index.cache.id.IdCacheStats;
import org.elasticsearch.index.cache.id.ShardIdCache;
import org.elasticsearch.index.engine.*;
import org.elasticsearch.index.flush.FlushStats;
import org.elasticsearch.index.get.GetStats;
@ -83,50 +86,33 @@ import static org.elasticsearch.index.mapper.SourceToParse.source;
public class InternalIndexShard extends AbstractIndexShardComponent implements IndexShard {
private final ThreadPool threadPool;
private final IndexSettingsService indexSettingsService;
private final MapperService mapperService;
private final IndexQueryParserService queryParserService;
private final IndexCache indexCache;
private final InternalIndicesLifecycle indicesLifecycle;
private final Store store;
private final MergeSchedulerProvider mergeScheduler;
private final Engine engine;
private final Translog translog;
private final IndexAliasesService indexAliasesService;
private final ShardIndexingService indexingService;
private final ShardSearchService searchService;
private final ShardGetService getService;
private final ShardIndexWarmerService shardWarmerService;
private final ShardFilterCache shardFilterCache;
private final ShardIdCache shardIdCache;
private final Object mutex = new Object();
private final String checkIndexOnStartup;
private long checkIndexTook = 0;
private volatile IndexShardState state;
private TimeValue refreshInterval;
private final TimeValue mergeInterval;
private volatile ScheduledFuture refreshScheduledFuture;
private volatile ScheduledFuture mergeScheduleFuture;
private volatile ShardRouting shardRouting;
private RecoveryStatus peerRecoveryStatus;
@ -138,7 +124,8 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
@Inject
public InternalIndexShard(ShardId shardId, @IndexSettings Settings indexSettings, IndexSettingsService indexSettingsService, IndicesLifecycle indicesLifecycle, Store store, Engine engine, MergeSchedulerProvider mergeScheduler, Translog translog,
ThreadPool threadPool, MapperService mapperService, IndexQueryParserService queryParserService, IndexCache indexCache, IndexAliasesService indexAliasesService, ShardIndexingService indexingService, ShardGetService getService, ShardSearchService searchService, ShardIndexWarmerService shardWarmerService) {
ThreadPool threadPool, MapperService mapperService, IndexQueryParserService queryParserService, IndexCache indexCache, IndexAliasesService indexAliasesService, ShardIndexingService indexingService, ShardGetService getService, ShardSearchService searchService, ShardIndexWarmerService shardWarmerService,
ShardFilterCache shardFilterCache, ShardIdCache shardIdCache) {
super(shardId, indexSettings);
this.indicesLifecycle = (InternalIndicesLifecycle) indicesLifecycle;
this.indexSettingsService = indexSettingsService;
@ -155,6 +142,8 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
this.getService = getService.setIndexShard(this);
this.searchService = searchService;
this.shardWarmerService = shardWarmerService;
this.shardFilterCache = shardFilterCache;
this.shardIdCache = shardIdCache;
state = IndexShardState.CREATED;
this.refreshInterval = indexSettings.getAsTime("engine.robin.refresh_interval", indexSettings.getAsTime(INDEX_REFRESH_INTERVAL, engine.defaultRefreshInterval()));
@ -202,6 +191,16 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
return this.shardWarmerService;
}
@Override
public ShardFilterCache filterCache() {
return this.shardFilterCache;
}
@Override
public ShardIdCache idCache() {
return this.shardIdCache;
}
@Override
public ShardRouting routingEntry() {
return this.shardRouting;
@ -463,6 +462,16 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
return shardWarmerService.stats();
}
@Override
public FilterCacheStats filterCacheStats() {
return shardFilterCache.stats();
}
@Override
public IdCacheStats idCacheStats() {
return shardIdCache.stats();
}
@Override
public void flush(Engine.Flush flush) throws ElasticSearchException {
// we allows flush while recovering, since we allow for operations to happen

View File

@ -296,7 +296,7 @@ public class Store extends AbstractIndexShardComponent implements CloseableIndex
/**
* The idea of the store directory is to cache file level meta data, as well as md5 of it
*/
class StoreDirectory extends Directory implements ForceSyncDirectory {
public class StoreDirectory extends Directory implements ForceSyncDirectory {
private final Distributor distributor;
@ -316,6 +316,10 @@ public class Store extends AbstractIndexShardComponent implements CloseableIndex
}
}
public ShardId shardId() {
return Store.this.shardId();
}
public Directory[] delegates() {
return distributor.all();
}

View File

@ -35,9 +35,10 @@ import org.elasticsearch.index.*;
import org.elasticsearch.index.aliases.IndexAliasesServiceModule;
import org.elasticsearch.index.analysis.AnalysisModule;
import org.elasticsearch.index.analysis.AnalysisService;
import org.elasticsearch.index.cache.CacheStats;
import org.elasticsearch.index.cache.IndexCache;
import org.elasticsearch.index.cache.IndexCacheModule;
import org.elasticsearch.index.cache.filter.FilterCacheStats;
import org.elasticsearch.index.cache.id.IdCacheStats;
import org.elasticsearch.index.codec.CodecModule;
import org.elasticsearch.index.engine.IndexEngine;
import org.elasticsearch.index.engine.IndexEngineModule;
@ -185,8 +186,9 @@ public class InternalIndicesService extends AbstractLifecycleComponent<IndicesSe
IndexingStats indexingStats = new IndexingStats();
GetStats getStats = new GetStats();
SearchStats searchStats = new SearchStats();
CacheStats cacheStats = new CacheStats();
FieldDataStats fieldDataStats = new FieldDataStats();
FilterCacheStats filterCacheStats = new FilterCacheStats();
IdCacheStats idCacheStats = new IdCacheStats();
MergeStats mergeStats = new MergeStats();
RefreshStats refreshStats = new RefreshStats();
FlushStats flushStats = new FlushStats();
@ -210,11 +212,12 @@ public class InternalIndicesService extends AbstractLifecycleComponent<IndicesSe
mergeStats.add(indexShard.mergeStats());
refreshStats.add(indexShard.refreshStats());
flushStats.add(indexShard.flushStats());
filterCacheStats.add(indexShard.filterCacheStats());
idCacheStats.add(indexShard.idCacheStats());
}
cacheStats.add(indexService.cache().stats());
fieldDataStats.add(indexService.fieldData().stats());
}
return new NodeIndicesStats(storeStats, docsStats, indexingStats, getStats, searchStats, cacheStats, fieldDataStats, mergeStats, refreshStats, flushStats);
return new NodeIndicesStats(storeStats, docsStats, indexingStats, getStats, searchStats, fieldDataStats, mergeStats, refreshStats, flushStats, filterCacheStats, idCacheStats);
}
/**

View File

@ -25,7 +25,8 @@ import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentBuilderString;
import org.elasticsearch.index.cache.CacheStats;
import org.elasticsearch.index.cache.filter.FilterCacheStats;
import org.elasticsearch.index.cache.id.IdCacheStats;
import org.elasticsearch.index.fielddata.FieldDataStats;
import org.elasticsearch.index.flush.FlushStats;
import org.elasticsearch.index.get.GetStats;
@ -45,39 +46,32 @@ import java.io.Serializable;
public class NodeIndicesStats implements Streamable, Serializable, ToXContent {
private StoreStats storeStats;
private DocsStats docsStats;
private IndexingStats indexingStats;
private GetStats getStats;
private SearchStats searchStats;
private CacheStats cacheStats;
private FieldDataStats fieldDataStats;
private MergeStats mergeStats;
private RefreshStats refreshStats;
private FlushStats flushStats;
private FilterCacheStats filterCacheStats;
private IdCacheStats idCacheStats;
NodeIndicesStats() {
}
public NodeIndicesStats(StoreStats storeStats, DocsStats docsStats, IndexingStats indexingStats, GetStats getStats, SearchStats searchStats, CacheStats cacheStats, FieldDataStats fieldDataStats, MergeStats mergeStats, RefreshStats refreshStats, FlushStats flushStats) {
public NodeIndicesStats(StoreStats storeStats, DocsStats docsStats, IndexingStats indexingStats, GetStats getStats, SearchStats searchStats, FieldDataStats fieldDataStats, MergeStats mergeStats, RefreshStats refreshStats, FlushStats flushStats, FilterCacheStats filterCacheStats, IdCacheStats idCacheStats) {
this.storeStats = storeStats;
this.docsStats = docsStats;
this.indexingStats = indexingStats;
this.getStats = getStats;
this.searchStats = searchStats;
this.cacheStats = cacheStats;
this.fieldDataStats = fieldDataStats;
this.mergeStats = mergeStats;
this.refreshStats = refreshStats;
this.flushStats = flushStats;
this.filterCacheStats = filterCacheStats;
this.idCacheStats = idCacheStats;
}
/**
@ -103,10 +97,6 @@ public class NodeIndicesStats implements Streamable, Serializable, ToXContent {
return this.searchStats;
}
public CacheStats getCache() {
return this.cacheStats;
}
public MergeStats getMerge() {
return this.mergeStats;
}
@ -119,6 +109,14 @@ public class NodeIndicesStats implements Streamable, Serializable, ToXContent {
return this.flushStats;
}
public FilterCacheStats getFilterCache() {
return this.filterCacheStats;
}
public IdCacheStats getIdCache() {
return this.idCacheStats;
}
public static NodeIndicesStats readIndicesStats(StreamInput in) throws IOException {
NodeIndicesStats stats = new NodeIndicesStats();
stats.readFrom(in);
@ -132,11 +130,12 @@ public class NodeIndicesStats implements Streamable, Serializable, ToXContent {
indexingStats = IndexingStats.readIndexingStats(in);
getStats = GetStats.readGetStats(in);
searchStats = SearchStats.readSearchStats(in);
cacheStats = CacheStats.readCacheStats(in);
fieldDataStats = FieldDataStats.readFieldDataStats(in);
mergeStats = MergeStats.readMergeStats(in);
refreshStats = RefreshStats.readRefreshStats(in);
flushStats = FlushStats.readFlushStats(in);
filterCacheStats = FilterCacheStats.readFilterCacheStats(in);
idCacheStats = IdCacheStats.readIdCacheStats(in);
}
@Override
@ -146,11 +145,12 @@ public class NodeIndicesStats implements Streamable, Serializable, ToXContent {
indexingStats.writeTo(out);
getStats.writeTo(out);
searchStats.writeTo(out);
cacheStats.writeTo(out);
fieldDataStats.writeTo(out);
mergeStats.writeTo(out);
refreshStats.writeTo(out);
flushStats.writeTo(out);
filterCacheStats.writeTo(out);
idCacheStats.writeTo(out);
}
@Override
@ -162,7 +162,8 @@ public class NodeIndicesStats implements Streamable, Serializable, ToXContent {
indexingStats.toXContent(builder, params);
getStats.toXContent(builder, params);
searchStats.toXContent(builder, params);
cacheStats.toXContent(builder, params);
filterCacheStats.toXContent(builder, params);
idCacheStats.toXContent(builder, params);
fieldDataStats.toXContent(builder, params);
mergeStats.toXContent(builder, params);
refreshStats.toXContent(builder, params);

View File

@ -77,6 +77,12 @@ public class RestIndicesStatsAction extends BaseRestHandler {
controller.registerHandler(GET, "/_stats/warmer", new RestWarmerStatsHandler());
controller.registerHandler(GET, "/{index}/_stats/warmer", new RestWarmerStatsHandler());
controller.registerHandler(GET, "/_stats/filter_cache", new RestFilterCacheStatsHandler());
controller.registerHandler(GET, "/{index}/_stats/filter_cache", new RestFilterCacheStatsHandler());
controller.registerHandler(GET, "/_stats/id_cache", new RestIdCacheStatsHandler());
controller.registerHandler(GET, "/{index}/_stats/id_cache", new RestIdCacheStatsHandler());
}
@Override
@ -108,6 +114,8 @@ public class RestIndicesStatsAction extends BaseRestHandler {
indicesStatsRequest.refresh(request.paramAsBoolean("refresh", indicesStatsRequest.refresh()));
indicesStatsRequest.flush(request.paramAsBoolean("flush", indicesStatsRequest.flush()));
indicesStatsRequest.warmer(request.paramAsBoolean("warmer", indicesStatsRequest.warmer()));
indicesStatsRequest.filterCache(request.paramAsBoolean("filter_cache", indicesStatsRequest.filterCache()));
indicesStatsRequest.idCache(request.paramAsBoolean("id_cache", indicesStatsRequest.idCache()));
client.admin().indices().stats(indicesStatsRequest, new ActionListener<IndicesStatsResponse>() {
@Override
@ -451,6 +459,82 @@ public class RestIndicesStatsAction extends BaseRestHandler {
}
}
class RestFilterCacheStatsHandler implements RestHandler {
@Override
public void handleRequest(final RestRequest request, final RestChannel channel) {
IndicesStatsRequest indicesStatsRequest = new IndicesStatsRequest();
indicesStatsRequest.listenerThreaded(false);
indicesStatsRequest.clear().filterCache(true);
indicesStatsRequest.indices(splitIndices(request.param("index")));
indicesStatsRequest.types(splitTypes(request.param("types")));
client.admin().indices().stats(indicesStatsRequest, new ActionListener<IndicesStatsResponse>() {
@Override
public void onResponse(IndicesStatsResponse response) {
try {
XContentBuilder builder = RestXContentBuilder.restContentBuilder(request);
builder.startObject();
builder.field("ok", true);
buildBroadcastShardsHeader(builder, response);
response.toXContent(builder, request);
builder.endObject();
channel.sendResponse(new XContentRestResponse(request, OK, builder));
} catch (Throwable e) {
onFailure(e);
}
}
@Override
public void onFailure(Throwable e) {
try {
channel.sendResponse(new XContentThrowableRestResponse(request, e));
} catch (IOException e1) {
logger.error("Failed to send failure response", e1);
}
}
});
}
}
class RestIdCacheStatsHandler implements RestHandler {
@Override
public void handleRequest(final RestRequest request, final RestChannel channel) {
IndicesStatsRequest indicesStatsRequest = new IndicesStatsRequest();
indicesStatsRequest.listenerThreaded(false);
indicesStatsRequest.clear().idCache(true);
indicesStatsRequest.indices(splitIndices(request.param("index")));
indicesStatsRequest.types(splitTypes(request.param("types")));
client.admin().indices().stats(indicesStatsRequest, new ActionListener<IndicesStatsResponse>() {
@Override
public void onResponse(IndicesStatsResponse response) {
try {
XContentBuilder builder = RestXContentBuilder.restContentBuilder(request);
builder.startObject();
builder.field("ok", true);
buildBroadcastShardsHeader(builder, response);
response.toXContent(builder, request);
builder.endObject();
channel.sendResponse(new XContentRestResponse(request, OK, builder));
} catch (Throwable e) {
onFailure(e);
}
}
@Override
public void onFailure(Throwable e) {
try {
channel.sendResponse(new XContentThrowableRestResponse(request, e));
} catch (IOException e1) {
logger.error("Failed to send failure response", e1);
}
}
});
}
}
class RestRefreshStatsHandler implements RestHandler {
@Override

View File

@ -303,7 +303,7 @@ public class ChildSearchBenchmark {
statsResponse = client.admin().cluster().prepareNodesStats()
.setJvm(true).setIndices(true).execute().actionGet();
System.out.println("--> Id cache size: " + statsResponse.getNodes()[0].getIndices().getCache().getIdCacheSize());
System.out.println("--> Id cache size: " + statsResponse.getNodes()[0].getIndices().getIdCache().getMemorySize());
System.out.println("--> Used heap size: " + statsResponse.getNodes()[0].getJvm().getMem().getHeapUsed());
System.out.println("--> Running has_child query with score type");
@ -369,7 +369,7 @@ public class ChildSearchBenchmark {
statsResponse = client.admin().cluster().prepareNodesStats()
.setJvm(true).setIndices(true).execute().actionGet();
System.out.println("--> Id cache size: " + statsResponse.getNodes()[0].getIndices().getCache().getIdCacheSize());
System.out.println("--> Id cache size: " + statsResponse.getNodes()[0].getIndices().getIdCache().getMemorySize());
System.out.println("--> Used heap size: " + statsResponse.getNodes()[0].getJvm().getMem().getHeapUsed());
client.close();

View File

@ -65,15 +65,15 @@ public class ClearCacheTests extends AbstractNodesTests {
client.admin().indices().prepareRefresh().execute().actionGet();
NodesStatsResponse nodesStats = client.admin().cluster().prepareNodesStats().setIndices(true).execute().actionGet();
assertThat(nodesStats.getNodes()[0].getIndices().getCache().getFilterSizeInBytes(), equalTo(0l));
assertThat(nodesStats.getNodes()[0].getIndices().getFilterCache().getMemorySizeInBytes(), equalTo(0l));
SearchResponse searchResponse = client.prepareSearch().setQuery(filteredQuery(matchAllQuery(), FilterBuilders.termFilter("field", "value").cacheKey("test_key"))).execute().actionGet();
assertThat(searchResponse.getHits().getHits().length, equalTo(1));
nodesStats = client.admin().cluster().prepareNodesStats().setIndices(true).execute().actionGet();
assertThat(nodesStats.getNodes()[0].getIndices().getCache().getFilterSizeInBytes(), greaterThan(0l));
assertThat(nodesStats.getNodes()[0].getIndices().getFilterCache().getMemorySizeInBytes(), greaterThan(0l));
client.admin().indices().prepareClearCache().setFilterKeys("test_key").execute().actionGet();
nodesStats = client.admin().cluster().prepareNodesStats().setIndices(true).execute().actionGet();
assertThat(nodesStats.getNodes()[0].getIndices().getCache().getFilterSizeInBytes(), equalTo(0l));
assertThat(nodesStats.getNodes()[0].getIndices().getFilterCache().getMemorySizeInBytes(), equalTo(0l));
}
}