Remove cyclic dependencies between IndexService and FieldData / BitSet caches

Adds a listener to each of the caches, which allows us to remove their
dependency on IndexService. That dependency is cyclic, since IndexService
itself depends on both of these caches. The cyclic dependency makes testing
the individual parts very hard and exists only for the sake of incrementing
some stats.
Simon Willnauer 2015-09-07 17:22:23 +02:00
parent a2f94f2e2f
commit ed5f6e5a0c
17 changed files with 427 additions and 185 deletions
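
The shape of the change is a small listener indirection: each cache exposes a
Listener hook with a no-op default, and IndexService injects a stats-updating
listener instead of being reached into by the caches. A minimal sketch of the
pattern, using hypothetical simplified types rather than the actual
Elasticsearch classes:

    // Hypothetical, simplified illustration of the pattern in this commit.
    // Before: the cache called back into IndexService to update per-shard stats,
    // while IndexService also owned the cache -- a cyclic dependency.
    interface CacheListener {
        void onCache(long ramBytesUsed);
        void onRemoval(long ramBytesUsed);
    }

    class SomeCache {
        // default no-op listener, so the cache works without any wiring
        private volatile CacheListener listener = new CacheListener() {
            @Override public void onCache(long ramBytesUsed) {}
            @Override public void onRemoval(long ramBytesUsed) {}
        };

        // the cache no longer knows about IndexService; its owner injects
        // a stats-updating listener after construction
        void setListener(CacheListener listener) {
            this.listener = listener;
        }

        void put(String key, long ramBytesUsed) {
            // ... store the entry ...
            listener.onCache(ramBytesUsed);
        }
    }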

View File

@@ -23,6 +23,7 @@ import com.google.common.base.Function;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterators;
import org.apache.lucene.util.Accountable;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.cluster.metadata.IndexMetaData;
@@ -38,7 +39,10 @@ import org.elasticsearch.index.analysis.AnalysisService;
import org.elasticsearch.index.cache.IndexCache;
import org.elasticsearch.index.cache.bitset.BitsetFilterCache;
import org.elasticsearch.index.deletionpolicy.DeletionPolicyModule;
import org.elasticsearch.index.fielddata.FieldDataType;
import org.elasticsearch.index.fielddata.IndexFieldDataCache;
import org.elasticsearch.index.fielddata.IndexFieldDataService;
import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.query.IndexQueryParserService;
import org.elasticsearch.index.settings.IndexSettings;
@@ -150,8 +154,8 @@ public class IndexService extends AbstractIndexComponent implements IndexCompone
this.indicesLifecycle = (InternalIndicesLifecycle) injector.getInstance(IndicesLifecycle.class);
// inject workarounds for cyclic dep
indexFieldData.setIndexService(this);
bitSetFilterCache.setIndexService(this);
indexFieldData.setListener(new FieldDataCacheListener(this));
bitSetFilterCache.setListener(new BitsetCacheListener(this));
this.nodeEnv = nodeEnv;
}
@@ -537,4 +541,62 @@ public class IndexService extends AbstractIndexComponent implements IndexCompone
public Settings getIndexSettings() {
return indexSettings;
}
private static final class BitsetCacheListener implements BitsetFilterCache.Listener {
final IndexService indexService;
private BitsetCacheListener(IndexService indexService) {
this.indexService = indexService;
}
@Override
public void onCache(ShardId shardId, Accountable accountable) {
if (shardId != null) {
final IndexShard shard = indexService.shard(shardId.id());
if (shard != null) {
long ramBytesUsed = accountable != null ? accountable.ramBytesUsed() : 0l;
shard.shardBitsetFilterCache().onCached(ramBytesUsed);
}
}
}
@Override
public void onRemoval(ShardId shardId, Accountable accountable) {
if (shardId != null) {
final IndexShard shard = indexService.shard(shardId.id());
if (shard != null) {
long ramBytesUsed = accountable != null ? accountable.ramBytesUsed() : 0l;
shard.shardBitsetFilterCache().onRemoval(ramBytesUsed);
}
}
}
}
private final class FieldDataCacheListener implements IndexFieldDataCache.Listener {
final IndexService indexService;
public FieldDataCacheListener(IndexService indexService) {
this.indexService = indexService;
}
@Override
public void onCache(ShardId shardId, MappedFieldType.Names fieldNames, FieldDataType fieldDataType, Accountable ramUsage) {
if (shardId != null) {
final IndexShard shard = indexService.shard(shardId.id());
if (shard != null) {
shard.fieldData().onCache(shardId, fieldNames, fieldDataType, ramUsage);
}
}
}
@Override
public void onRemoval(ShardId shardId, MappedFieldType.Names fieldNames, FieldDataType fieldDataType, boolean wasEvicted, long sizeInBytes) {
if (shardId != null) {
final IndexShard shard = indexService.shard(shardId.id());
if (shard != null) {
shard.fieldData().onRemoval(shardId, fieldNames, fieldDataType, wasEvicted, sizeInBytes);
}
}
}
}
}

View File

@@ -33,6 +33,7 @@ import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.Weight;
import org.apache.lucene.search.join.BitSetProducer;
import org.apache.lucene.util.Accountable;
import org.apache.lucene.util.BitDocIdSet;
import org.apache.lucene.util.BitSet;
import org.elasticsearch.ExceptionsHelper;
@@ -43,7 +44,6 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.AbstractIndexComponent;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.mapper.DocumentMapper;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.mapper.object.ObjectMapper;
@@ -61,10 +61,7 @@ import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.*;
/**
* This is a cache for {@link BitDocIdSet} based filters and is unbounded by size or time.
@@ -76,12 +73,21 @@ import java.util.concurrent.Executor;
public class BitsetFilterCache extends AbstractIndexComponent implements LeafReader.CoreClosedListener, RemovalListener<Object, Cache<Query, BitsetFilterCache.Value>>, Closeable {
public static final String LOAD_RANDOM_ACCESS_FILTERS_EAGERLY = "index.load_fixed_bitset_filters_eagerly";
private static final Listener DEFAULT_NOOP_LISTENER = new Listener() {
@Override
public void onCache(ShardId shardId, Accountable accountable) {
}
@Override
public void onRemoval(ShardId shardId, Accountable accountable) {
}
};
private final boolean loadRandomAccessFiltersEagerly;
private final Cache<Object, Cache<Query, Value>> loadedFilters;
private volatile Listener listener = DEFAULT_NOOP_LISTENER;
private final BitSetProducerWarmer warmer;
private IndexService indexService;
private IndicesWarmer indicesWarmer;
@Inject
@@ -95,16 +101,24 @@ public class BitsetFilterCache extends AbstractIndexComponent implements LeafRea
@Inject(optional = true)
public void setIndicesWarmer(IndicesWarmer indicesWarmer) {
this.indicesWarmer = indicesWarmer;
}
public void setIndexService(IndexService indexService) {
this.indexService = indexService;
// First the indicesWarmer is set and then the indexService is set; because of this there is a small window of
// time where indexService is null. This is why the warmer should only be registered after indexService has been set.
// Otherwise there is a small chance of the warmer running into an NPE, since it uses the indexService
indicesWarmer.addListener(warmer);
}
/**
* Sets a listener that is invoked for all subsequent cache and removal events.
* @throws IllegalStateException if the listener is set more than once
*/
public void setListener(Listener listener) {
if (listener == null) {
throw new IllegalArgumentException("listener must not be null");
}
if (this.listener != DEFAULT_NOOP_LISTENER) {
throw new IllegalStateException("can't set listener more than once");
}
this.listener = listener;
}
public BitSetProducer getBitSetProducer(Query query) {
return new QueryWrapperBitSetProducer(query);
}
@@ -116,7 +130,9 @@ public class BitsetFilterCache extends AbstractIndexComponent implements LeafRea
@Override
public void close() {
if (indicesWarmer != null) {
indicesWarmer.removeListener(warmer);
}
clear("close");
}
@@ -135,9 +151,7 @@ public class BitsetFilterCache extends AbstractIndexComponent implements LeafRea
return CacheBuilder.newBuilder().build();
}
});
return filterToFbs.get(query, new Callable<Value>() {
@Override
public Value call() throws Exception {
return filterToFbs.get(query, () -> {
final IndexReaderContext topLevelContext = ReaderUtil.getTopLevelContext(context);
final IndexSearcher searcher = new IndexSearcher(topLevelContext);
searcher.setQueryCache(null);
@@ -151,15 +165,8 @@ public class BitsetFilterCache extends AbstractIndexComponent implements LeafRea
}
Value value = new Value(bitSet, shardId);
if (shardId != null) {
IndexShard shard = indexService.shard(shardId.id());
if (shard != null) {
long ramBytesUsed = value.bitset != null ? value.bitset.ramBytesUsed() : 0l;
shard.shardBitsetFilterCache().onCached(ramBytesUsed);
}
}
listener.onCache(shardId, value.bitset);
return value;
}
}).bitset;
}
@@ -170,20 +177,13 @@ public class BitsetFilterCache extends AbstractIndexComponent implements LeafRea
return;
}
Cache<Query, Value> value = notification.getValue();
if (value == null) {
Cache<Query, Value> valueCache = notification.getValue();
if (valueCache == null) {
return;
}
for (Map.Entry<Query, Value> entry : value.asMap().entrySet()) {
if (entry.getValue().shardId == null) {
continue;
}
IndexShard shard = indexService.shard(entry.getValue().shardId.id());
if (shard != null) {
ShardBitsetFilterCache shardBitsetFilterCache = shard.shardBitsetFilterCache();
shardBitsetFilterCache.onRemoval(entry.getValue().bitset.ramBytesUsed());
}
for (Value value : valueCache.asMap().values()) {
listener.onRemoval(value.shardId, value.bitset);
// a null shardId means the shard has already been removed; its stats are 0 anyway for the shard this key belongs to
}
}
@@ -266,10 +266,7 @@ public class BitsetFilterCache extends AbstractIndexComponent implements LeafRea
final CountDownLatch latch = new CountDownLatch(context.searcher().reader().leaves().size() * warmUp.size());
for (final LeafReaderContext ctx : context.searcher().reader().leaves()) {
for (final Query filterToWarm : warmUp) {
executor.execute(new Runnable() {
@Override
public void run() {
executor.execute(() -> {
try {
final long start = System.nanoTime();
getAndLoadIfNotPresent(filterToWarm, ctx);
@@ -281,17 +278,10 @@ public class BitsetFilterCache extends AbstractIndexComponent implements LeafRea
} finally {
latch.countDown();
}
}
});
}
}
return new TerminationHandle() {
@Override
public void awaitTermination() throws InterruptedException {
latch.await();
}
};
return () -> latch.await();
}
@Override
@@ -304,4 +294,25 @@ public class BitsetFilterCache extends AbstractIndexComponent implements LeafRea
Cache<Object, Cache<Query, Value>> getLoadedFilters() {
return loadedFilters;
}
/**
* A listener interface that is executed for each onCache / onRemoval event
*/
public interface Listener {
/**
* Called for each bitset added to the cache.
* @param shardId the shard id the bitset was cached for. This can be <code>null</code>
* @param accountable the bitset's RAM representation
*/
void onCache(ShardId shardId, Accountable accountable);
/**
* Called for each bitset removed from the cache.
* @param shardId the shard id the bitset was cached for. This can be <code>null</code>
* @param accountable the bitset's RAM representation
*/
void onRemoval(ShardId shardId, Accountable accountable);
}
}
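
Wiring the listener is a one-shot operation; a short usage sketch against the API
above (assuming an index named "test" and empty settings):

    // Sketch: attach a stats listener exactly once, using the types in this diff.
    BitsetFilterCache cache = new BitsetFilterCache(new Index("test"), Settings.EMPTY);
    cache.setListener(new BitsetFilterCache.Listener() {
        @Override
        public void onCache(ShardId shardId, Accountable accountable) {
            // shardId can be null when the reader is not associated with a shard
        }
        @Override
        public void onRemoval(ShardId shardId, Accountable accountable) {
        }
    });
    // a second setListener(...) call throws IllegalStateException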

View File

@@ -19,7 +19,6 @@
package org.elasticsearch.index.cache.bitset;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.metrics.CounterMetric;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.settings.IndexSettings;
@@ -47,4 +46,5 @@ public class ShardBitsetFilterCache extends AbstractIndexShardComponent {
public long getMemorySizeInBytes() {
return totalMetric.count();
}
}

View File

@@ -24,6 +24,7 @@ import org.apache.lucene.index.IndexReader;
import org.apache.lucene.util.Accountable;
import org.elasticsearch.index.mapper.FieldMapper;
import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.index.shard.ShardId;
/**
* A simple field data cache abstraction on the *index* level.
@@ -44,13 +45,19 @@ public interface IndexFieldDataCache {
*/
void clear(String fieldName);
void clear(Object coreCacheKey);
void clear(IndexReader reader);
interface Listener {
void onLoad(MappedFieldType.Names fieldNames, FieldDataType fieldDataType, Accountable ramUsage);
/**
* Called after the fielddata is loaded during the cache phase
*/
void onCache(ShardId shardId, MappedFieldType.Names fieldNames, FieldDataType fieldDataType, Accountable ramUsage);
void onUnload(MappedFieldType.Names fieldNames, FieldDataType fieldDataType, boolean wasEvicted, long sizeInBytes);
/**
* Called after the fielddata is unloaded
*/
void onRemoval(ShardId shardId, MappedFieldType.Names fieldNames, FieldDataType fieldDataType, boolean wasEvicted, long sizeInBytes);
}
class None implements IndexFieldDataCache {
@@ -75,8 +82,7 @@ public interface IndexFieldDataCache {
}
@Override
public void clear(Object coreCacheKey) {
public void clear(IndexReader reader) {
}
}
}

View File

@@ -22,6 +22,7 @@ package org.elasticsearch.index.fielddata;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import org.apache.lucene.util.Accountable;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version;
import org.elasticsearch.common.collect.MapBuilder;
@@ -32,11 +33,12 @@ import org.elasticsearch.index.AbstractIndexComponent;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.fielddata.plain.*;
import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.mapper.core.BooleanFieldMapper;
import org.elasticsearch.index.mapper.internal.IndexFieldMapper;
import org.elasticsearch.index.mapper.internal.ParentFieldMapper;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache;
@@ -139,8 +141,18 @@ public class IndexFieldDataService extends AbstractIndexComponent {
private final IndicesFieldDataCache indicesFieldDataCache;
// the below map needs to be modified under a lock
private final Map<String, IndexFieldDataCache> fieldDataCaches = Maps.newHashMap();
private final MapperService mapperService;
private static final IndexFieldDataCache.Listener DEFAULT_NOOP_LISTENER = new IndexFieldDataCache.Listener() {
@Override
public void onCache(ShardId shardId, Names fieldNames, FieldDataType fieldDataType, Accountable ramUsage) {
}
@Override
public void onRemoval(ShardId shardId, Names fieldNames, FieldDataType fieldDataType, boolean wasEvicted, long sizeInBytes) {
}
};
private volatile IndexFieldDataCache.Listener listener = DEFAULT_NOOP_LISTENER;
IndexService indexService;
// We need to cache fielddata on the _parent field because of 1.x indices.
// When we don't support 1.x anymore (3.0) then remove this caching
@@ -149,15 +161,11 @@ public class IndexFieldDataService extends AbstractIndexComponent {
@Inject
public IndexFieldDataService(Index index, @IndexSettings Settings indexSettings, IndicesFieldDataCache indicesFieldDataCache,
CircuitBreakerService circuitBreakerService) {
CircuitBreakerService circuitBreakerService, MapperService mapperService) {
super(index, indexSettings);
this.indicesFieldDataCache = indicesFieldDataCache;
this.circuitBreakerService = circuitBreakerService;
}
// we need to "inject" the index service to not create cyclic dep
public void setIndexService(IndexService indexService) {
this.indexService = indexService;
this.mapperService = mapperService;
}
public synchronized void clear() {
@@ -229,7 +237,7 @@ public class IndexFieldDataService extends AbstractIndexComponent {
// this means changing the node level settings is simple, just set the bounds there
String cacheType = type.getSettings().get("cache", indexSettings.get(FIELDDATA_CACHE_KEY, FIELDDATA_CACHE_VALUE_NODE));
if (FIELDDATA_CACHE_VALUE_NODE.equals(cacheType)) {
cache = indicesFieldDataCache.buildIndexFieldDataCache(indexService, index, fieldNames, type);
cache = indicesFieldDataCache.buildIndexFieldDataCache(listener, index, fieldNames, type);
} else if ("none".equals(cacheType)){
cache = new IndexFieldDataCache.None();
} else {
@@ -243,13 +251,29 @@ public class IndexFieldDataService extends AbstractIndexComponent {
&& Version.indexCreated(indexSettings).before(Version.V_2_0_0_beta1);
if (isOldParentField) {
if (parentIndexFieldData == null) {
parentIndexFieldData = builder.build(index, indexSettings, fieldType, cache, circuitBreakerService, indexService.mapperService());
parentIndexFieldData = builder.build(index, indexSettings, fieldType, cache, circuitBreakerService, mapperService);
}
return (IFD) parentIndexFieldData;
}
}
return (IFD) builder.build(index, indexSettings, fieldType, cache, circuitBreakerService, indexService.mapperService());
return (IFD) builder.build(index, indexSettings, fieldType, cache, circuitBreakerService, mapperService);
}
/**
* Sets a {@link org.elasticsearch.index.fielddata.IndexFieldDataCache.Listener} passed to each {@link IndexFieldData}
* creation to capture onCache and onRemoval events. The listener can only be set once.
* @throws IllegalStateException if the listener is set more than once
*/
public void setListener(IndexFieldDataCache.Listener listener) {
if (listener == null) {
throw new IllegalArgumentException("listener must not be null");
}
if (this.listener != DEFAULT_NOOP_LISTENER) {
throw new IllegalStateException("can't set listener more than once");
}
this.listener = listener;
}
}
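
The field data service follows the same set-once contract. As a hedged
illustration, a listener that keeps a RAM counter consistent across cache and
removal events (ifdService and the AtomicLong are hypothetical stand-ins; the
real ShardFieldData uses CounterMetric):

    final AtomicLong fieldDataBytes = new AtomicLong(); // stand-in for CounterMetric
    ifdService.setListener(new IndexFieldDataCache.Listener() {
        @Override
        public void onCache(ShardId shardId, MappedFieldType.Names fieldNames,
                            FieldDataType fieldDataType, Accountable ramUsage) {
            fieldDataBytes.addAndGet(ramUsage.ramBytesUsed());
        }
        @Override
        public void onRemoval(ShardId shardId, MappedFieldType.Names fieldNames,
                              FieldDataType fieldDataType, boolean wasEvicted, long sizeInBytes) {
            fieldDataBytes.addAndGet(-sizeInBytes);
        }
    });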

View File

@@ -56,7 +56,7 @@ public class ShardFieldData implements IndexFieldDataCache.Listener {
}
@Override
public void onLoad(MappedFieldType.Names fieldNames, FieldDataType fieldDataType, Accountable ramUsage) {
public void onCache(ShardId shardId, MappedFieldType.Names fieldNames, FieldDataType fieldDataType, Accountable ramUsage) {
totalMetric.inc(ramUsage.ramBytesUsed());
String keyFieldName = fieldNames.indexName();
CounterMetric total = perFieldTotals.get(keyFieldName);
@@ -73,7 +73,7 @@ public class ShardFieldData implements IndexFieldDataCache.Listener {
}
@Override
public void onUnload(MappedFieldType.Names fieldNames, FieldDataType fieldDataType, boolean wasEvicted, long sizeInBytes) {
public void onRemoval(ShardId shardId, MappedFieldType.Names fieldNames, FieldDataType fieldDataType, boolean wasEvicted, long sizeInBytes) {
if (wasEvicted) {
evictionsMetric.inc();
}

View File

@@ -32,6 +32,7 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.cache.bitset.BitsetFilterCache;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.fielddata.IndexFieldDataService;
import org.elasticsearch.index.indexing.IndexingOperationListener;
@@ -82,7 +83,6 @@ public class PercolatorQueriesRegistry extends AbstractIndexShardComponent imple
private final RealTimePercolatorOperationListener realTimePercolatorOperationListener = new RealTimePercolatorOperationListener();
private final PercolateTypeListener percolateTypeListener = new PercolateTypeListener();
private final AtomicBoolean realTimePercolatorEnabled = new AtomicBoolean(false);
private boolean mapUnmappedFieldsAsString;
public PercolatorQueriesRegistry(ShardId shardId, @IndexSettings Settings indexSettings, IndexQueryParserService queryParserService,

View File

@@ -395,5 +395,4 @@ public class QueryParseContext {
public Version indexVersionCreated() {
return indexVersionCreated;
}
}

View File

@@ -24,6 +24,7 @@ import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.SegmentReader;
import org.apache.lucene.util.Accountable;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.logging.ESLogger;
@@ -35,17 +36,13 @@ import org.elasticsearch.index.fielddata.AtomicFieldData;
import org.elasticsearch.index.fielddata.FieldDataType;
import org.elasticsearch.index.fielddata.IndexFieldData;
import org.elasticsearch.index.fielddata.IndexFieldDataCache;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.ShardUtils;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.threadpool.ThreadPool;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
/**
*/
@@ -95,8 +92,8 @@ public class IndicesFieldDataCache extends AbstractComponent implements RemovalL
this.closed = true;
}
public IndexFieldDataCache buildIndexFieldDataCache(IndexService indexService, Index index, MappedFieldType.Names fieldNames, FieldDataType fieldDataType) {
return new IndexFieldCache(logger, cache, indicesFieldDataCacheListener, indexService, index, fieldNames, fieldDataType);
public IndexFieldDataCache buildIndexFieldDataCache(IndexFieldDataCache.Listener listener, Index index, MappedFieldType.Names fieldNames, FieldDataType fieldDataType) {
return new IndexFieldCache(logger, cache, index, fieldNames, fieldDataType, indicesFieldDataCacheListener, listener);
}
public Cache<Key, Accountable> getCache() {
@@ -111,7 +108,7 @@ public class IndicesFieldDataCache extends AbstractComponent implements RemovalL
final Accountable value = notification.getValue();
for (IndexFieldDataCache.Listener listener : key.listeners) {
try {
listener.onUnload(indexCache.fieldNames, indexCache.fieldDataType, notification.wasEvicted(), value.ramBytesUsed());
listener.onRemoval(key.shardId, indexCache.fieldNames, indexCache.fieldDataType, notification.wasEvicted(), value.ramBytesUsed());
} catch (Throwable e) {
// proceed anyway since listeners should not throw exceptions
logger.error("Failed to call listener on field data cache unloading", e);
@@ -133,96 +130,78 @@ public class IndicesFieldDataCache extends AbstractComponent implements RemovalL
*/
static class IndexFieldCache implements IndexFieldDataCache, SegmentReader.CoreClosedListener, IndexReader.ReaderClosedListener {
private final ESLogger logger;
private final IndexService indexService;
final Index index;
final MappedFieldType.Names fieldNames;
final FieldDataType fieldDataType;
private final Cache<Key, Accountable> cache;
private final IndicesFieldDataCacheListener indicesFieldDataCacheListener;
private final Listener[] listeners;
IndexFieldCache(ESLogger logger, final Cache<Key, Accountable> cache, IndicesFieldDataCacheListener indicesFieldDataCacheListener, IndexService indexService, Index index, MappedFieldType.Names fieldNames, FieldDataType fieldDataType) {
IndexFieldCache(ESLogger logger, final Cache<Key, Accountable> cache, Index index, MappedFieldType.Names fieldNames, FieldDataType fieldDataType, Listener... listeners) {
this.logger = logger;
this.indexService = indexService;
this.listeners = listeners;
this.index = index;
this.fieldNames = fieldNames;
this.fieldDataType = fieldDataType;
this.cache = cache;
this.indicesFieldDataCacheListener = indicesFieldDataCacheListener;
assert indexService != null;
}
@Override
public <FD extends AtomicFieldData, IFD extends IndexFieldData<FD>> FD load(final LeafReaderContext context, final IFD indexFieldData) throws Exception {
final Key key = new Key(this, context.reader().getCoreCacheKey());
//noinspection unchecked
final Accountable accountable = cache.get(key, new Callable<AtomicFieldData>() {
@Override
public AtomicFieldData call() throws Exception {
context.reader().addCoreClosedListener(IndexFieldCache.this);
key.listeners.add(indicesFieldDataCacheListener);
final ShardId shardId = ShardUtils.extractShardId(context.reader());
if (shardId != null) {
final IndexShard shard = indexService.shard(shardId.id());
if (shard != null) {
key.listeners.add(shard.fieldData());
}
final Key key = new Key(this, context.reader().getCoreCacheKey(), shardId);
//noinspection unchecked
final Accountable accountable = cache.get(key, () -> {
context.reader().addCoreClosedListener(IndexFieldCache.this);
for (Listener listener : this.listeners) {
key.listeners.add(listener);
}
final AtomicFieldData fieldData = indexFieldData.loadDirect(context);
for (Listener listener : key.listeners) {
try {
listener.onLoad(fieldNames, fieldDataType, fieldData);
listener.onCache(shardId, fieldNames, fieldDataType, fieldData);
} catch (Throwable e) {
// load anyway since listeners should not throw exceptions
logger.error("Failed to call listener on atomic field data loading", e);
}
}
return fieldData;
}
});
return (FD) accountable;
}
@Override
public <FD extends AtomicFieldData, IFD extends IndexFieldData.Global<FD>> IFD load(final IndexReader indexReader, final IFD indexFieldData) throws Exception {
final Key key = new Key(this, indexReader.getCoreCacheKey());
//noinspection unchecked
final Accountable accountable = cache.get(key, new Callable<Accountable>() {
@Override
public Accountable call() throws Exception {
indexReader.addReaderClosedListener(IndexFieldCache.this);
key.listeners.add(indicesFieldDataCacheListener);
final ShardId shardId = ShardUtils.extractShardId(indexReader);
if (shardId != null) {
IndexShard shard = indexService.shard(shardId.id());
if (shard != null) {
key.listeners.add(shard.fieldData());
}
final Key key = new Key(this, indexReader.getCoreCacheKey(), shardId);
//noinspection unchecked
final Accountable accountable = cache.get(key, () -> {
indexReader.addReaderClosedListener(IndexFieldCache.this);
for (Listener listener : this.listeners) {
key.listeners.add(listener);
}
final Accountable ifd = (Accountable) indexFieldData.localGlobalDirect(indexReader);
for (Listener listener : key.listeners) {
try {
listener.onLoad(fieldNames, fieldDataType, ifd);
listener.onCache(shardId, fieldNames, fieldDataType, ifd);
} catch (Throwable e) {
// load anyway since listeners should not throw exceptions
logger.error("Failed to call listener on global ordinals loading", e);
}
}
return ifd;
}
});
return (IFD) accountable;
}
@Override
public void onClose(Object coreKey) {
cache.invalidate(new Key(this, coreKey));
cache.invalidate(new Key(this, coreKey, null));
// don't call cache.cleanUp here as it would have bad performance implications
}
@Override
public void onClose(IndexReader reader) {
cache.invalidate(new Key(this, reader.getCoreCacheKey()));
cache.invalidate(new Key(this, reader.getCoreCacheKey(), null));
// don't call cache.cleanUp here as it would have bad performance implications
}
@@ -263,8 +242,8 @@ public class IndicesFieldDataCache extends AbstractComponent implements RemovalL
}
@Override
public void clear(Object coreCacheKey) {
cache.invalidate(new Key(this, coreCacheKey));
public void clear(IndexReader indexReader) {
cache.invalidate(new Key(this, indexReader.getCoreCacheKey(), null));
// don't call cache.cleanUp here as it would have bad performance implications
}
}
@@ -272,13 +251,14 @@ public class IndicesFieldDataCache extends AbstractComponent implements RemovalL
public static class Key {
public final IndexFieldCache indexCache;
public final Object readerKey;
public final ShardId shardId;
public final List<IndexFieldDataCache.Listener> listeners = new ArrayList<>();
Key(IndexFieldCache indexCache, Object readerKey) {
Key(IndexFieldCache indexCache, Object readerKey, @Nullable ShardId shardId) {
this.indexCache = indexCache;
this.readerKey = readerKey;
this.shardId = shardId;
}
@Override

View File

@@ -19,6 +19,7 @@
package org.elasticsearch.indices.fielddata.cache;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.util.Accountable;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.common.inject.Inject;
@@ -26,6 +27,7 @@ import org.elasticsearch.index.fielddata.FieldDataType;
import org.elasticsearch.index.fielddata.IndexFieldDataCache;
import org.elasticsearch.index.mapper.FieldMapper;
import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
/**
@@ -44,11 +46,11 @@ public class IndicesFieldDataCacheListener implements IndexFieldDataCache.Listen
}
@Override
public void onLoad(MappedFieldType.Names fieldNames, FieldDataType fieldDataType, Accountable fieldData) {
public void onCache(ShardId shardId, MappedFieldType.Names fieldNames, FieldDataType fieldDataType, Accountable fieldData) {
}
@Override
public void onUnload(MappedFieldType.Names fieldNames, FieldDataType fieldDataType, boolean wasEvicted, long sizeInBytes) {
public void onRemoval(ShardId shardId, MappedFieldType.Names fieldNames, FieldDataType fieldDataType, boolean wasEvicted, long sizeInBytes) {
assert sizeInBytes >= 0 : "When reducing circuit breaker, it should be adjusted with a number higher or equal to 0 and not [" + sizeInBytes + "]";
circuitBreakerService.getBreaker(CircuitBreaker.FIELDDATA).addWithoutBreaking(-sizeInBytes);
}

View File

@@ -34,14 +34,21 @@ import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.QueryWrapperFilter;
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.search.join.BitSetProducer;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.RAMDirectory;
import org.apache.lucene.util.Accountable;
import org.apache.lucene.util.BitSet;
import org.apache.lucene.util.IOUtils;
import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.test.ESTestCase;
import org.junit.Test;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import static org.hamcrest.Matchers.equalTo;
@@ -109,4 +116,91 @@ public class BitSetFilterCacheTests extends ESTestCase {
assertThat(cache.getLoadedFilters().size(), equalTo(0l));
}
public void testListener() throws IOException {
IndexWriter writer = new IndexWriter(
new RAMDirectory(),
new IndexWriterConfig(new StandardAnalyzer()).setMergePolicy(new LogByteSizeMergePolicy())
);
Document document = new Document();
document.add(new StringField("field", "value", Field.Store.NO));
writer.addDocument(document);
writer.commit();
final DirectoryReader writerReader = DirectoryReader.open(writer, false);
final IndexReader reader = randomBoolean() ? writerReader : ElasticsearchDirectoryReader.wrap(writerReader, new ShardId("test", 0));
final AtomicLong stats = new AtomicLong();
final AtomicInteger onCacheCalls = new AtomicInteger();
final AtomicInteger onRemoveCalls = new AtomicInteger();
final BitsetFilterCache cache = new BitsetFilterCache(new Index("test"), Settings.EMPTY);
cache.setListener(new BitsetFilterCache.Listener() {
@Override
public void onCache(ShardId shardId, Accountable accountable) {
onCacheCalls.incrementAndGet();
stats.addAndGet(accountable.ramBytesUsed());
if (writerReader != reader) {
assertNotNull(shardId);
assertEquals("test", shardId.index().name());
assertEquals(0, shardId.id());
} else {
assertNull(shardId);
}
}
@Override
public void onRemoval(ShardId shardId, Accountable accountable) {
onRemoveCalls.incrementAndGet();
stats.addAndGet(-accountable.ramBytesUsed());
if (writerReader != reader) {
assertNotNull(shardId);
assertEquals("test", shardId.index().name());
assertEquals(0, shardId.id());
} else {
assertNull(shardId);
}
}
});
BitSetProducer filter = cache.getBitSetProducer(new QueryWrapperFilter(new TermQuery(new Term("field", "value"))));
assertThat(matchCount(filter, reader), equalTo(1));
assertTrue(stats.get() > 0);
assertEquals(1, onCacheCalls.get());
assertEquals(0, onRemoveCalls.get());
IOUtils.close(reader, writer);
assertEquals(1, onRemoveCalls.get());
assertEquals(0, stats.get());
}
public void testSetListenerTwice() {
final BitsetFilterCache cache = new BitsetFilterCache(new Index("test"), Settings.EMPTY);
cache.setListener(new BitsetFilterCache.Listener() {
@Override
public void onCache(ShardId shardId, Accountable accountable) {
}
@Override
public void onRemoval(ShardId shardId, Accountable accountable) {
}
});
try {
cache.setListener(new BitsetFilterCache.Listener() {
@Override
public void onCache(ShardId shardId, Accountable accountable) {
}
@Override
public void onRemoval(ShardId shardId, Accountable accountable) {
}
});
fail("can't set it twice");
} catch (IllegalStateException ex) {
// all is well
}
}
}

View File

@@ -19,8 +19,8 @@
package org.elasticsearch.index.fielddata;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.search.*;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.Strings;
@@ -273,5 +273,4 @@ public abstract class AbstractFieldDataImplTestCase extends AbstractFieldDataTes
}
protected abstract void fillExtendedMvSet() throws Exception;
}

View File

@@ -54,7 +54,6 @@ public abstract class AbstractFieldDataTestCase extends ESSingleNodeTestCase {
protected LeafReaderContext readerContext;
protected IndexReader topLevelReader;
protected IndicesFieldDataCache indicesFieldDataCache;
protected abstract FieldDataType getFieldDataType();
protected boolean hasDocValues() {
@@ -109,11 +108,12 @@ public abstract class AbstractFieldDataTestCase extends ESSingleNodeTestCase {
writer = new IndexWriter(new RAMDirectory(), new IndexWriterConfig(new StandardAnalyzer()).setMergePolicy(new LogByteSizeMergePolicy()));
}
protected LeafReaderContext refreshReader() throws Exception {
protected final LeafReaderContext refreshReader() throws Exception {
if (readerContext != null) {
readerContext.reader().close();
}
LeafReader reader = SlowCompositeReaderWrapper.wrap(topLevelReader = DirectoryReader.open(writer, true));
topLevelReader = DirectoryReader.open(writer, true);
LeafReader reader = SlowCompositeReaderWrapper.wrap(topLevelReader);
readerContext = reader.getContext();
return readerContext;
}
@@ -150,8 +150,5 @@ public abstract class AbstractFieldDataTestCase extends ESSingleNodeTestCase {
}
previous = current;
}
}
}

View File

@@ -26,11 +26,7 @@ import org.apache.lucene.document.Field;
import org.apache.lucene.document.Field.Store;
import org.apache.lucene.document.SortedSetDocValuesField;
import org.apache.lucene.document.StringField;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.RandomAccessOrds;
import org.apache.lucene.index.Term;
import org.apache.lucene.index.TermsEnum;
import org.apache.lucene.index.*;
import org.apache.lucene.search.Filter;
import org.apache.lucene.search.FilteredQuery;
import org.apache.lucene.search.IndexSearcher;

View File

@@ -25,6 +25,9 @@ import org.apache.lucene.document.Field.Store;
import org.apache.lucene.document.StringField;
import org.apache.lucene.index.*;
import org.apache.lucene.store.RAMDirectory;
import org.apache.lucene.util.Accountable;
import org.elasticsearch.common.lucene.index.ESDirectoryReaderTests;
import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.fielddata.plain.*;
import org.elasticsearch.index.mapper.ContentPath;
@@ -33,12 +36,16 @@ import org.elasticsearch.index.mapper.Mapper.BuilderContext;
import org.elasticsearch.index.mapper.MapperBuilders;
import org.elasticsearch.index.mapper.core.*;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache;
import org.elasticsearch.test.ESSingleNodeTestCase;
import java.util.Arrays;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import static org.hamcrest.Matchers.instanceOf;
@@ -158,4 +165,77 @@ public class IndexFieldDataServiceTests extends ESSingleNodeTestCase {
writer.getDirectory().close();
}
public void testFieldDataCacheListener() throws Exception {
final IndexService indexService = createIndex("test");
IndexFieldDataService shardPrivateService = indexService.fieldData();
// copy the ifdService since we can set the listener only once.
final IndexFieldDataService ifdService = new IndexFieldDataService(shardPrivateService.index(), shardPrivateService.indexSettings(),
getInstanceFromNode(IndicesFieldDataCache.class), getInstanceFromNode(CircuitBreakerService.class), indexService.mapperService());
final BuilderContext ctx = new BuilderContext(indexService.settingsService().getSettings(), new ContentPath(1));
final MappedFieldType mapper1 = MapperBuilders.stringField("s").tokenized(false).docValues(true).fieldDataSettings(Settings.builder().put(FieldDataType.FORMAT_KEY, "paged_bytes").build()).build(ctx).fieldType();
final IndexWriter writer = new IndexWriter(new RAMDirectory(), new IndexWriterConfig(new KeywordAnalyzer()));
Document doc = new Document();
doc.add(new StringField("s", "thisisastring", Store.NO));
writer.addDocument(doc);
DirectoryReader open = DirectoryReader.open(writer, true);
final boolean wrap = randomBoolean();
final IndexReader reader = wrap ? ElasticsearchDirectoryReader.wrap(open, new ShardId("test", 1)) : open;
final AtomicInteger onCacheCalled = new AtomicInteger();
final AtomicInteger onRemovalCalled = new AtomicInteger();
ifdService.setListener(new IndexFieldDataCache.Listener() {
@Override
public void onCache(ShardId shardId, MappedFieldType.Names fieldNames, FieldDataType fieldDataType, Accountable ramUsage) {
if (wrap) {
assertEquals(new ShardId("test", 1), shardId);
} else {
assertNull(shardId);
}
onCacheCalled.incrementAndGet();
}
@Override
public void onRemoval(ShardId shardId, MappedFieldType.Names fieldNames, FieldDataType fieldDataType, boolean wasEvicted, long sizeInBytes) {
if (wrap) {
assertEquals(new ShardId("test", 1), shardId);
} else {
assertNull(shardId);
}
onRemovalCalled.incrementAndGet();
}
});
IndexFieldData<?> ifd = ifdService.getForField(mapper1);
LeafReaderContext leafReaderContext = reader.getContext().leaves().get(0);
AtomicFieldData load = ifd.load(leafReaderContext);
assertEquals(1, onCacheCalled.get());
assertEquals(0, onRemovalCalled.get());
reader.close();
load.close();
writer.close();
assertEquals(1, onCacheCalled.get());
assertEquals(1, onRemovalCalled.get());
ifdService.clear();
}
public void testSetCacheListenerTwice() {
final IndexService indexService = createIndex("test");
IndexFieldDataService shardPrivateService = indexService.fieldData();
try {
shardPrivateService.setListener(new IndexFieldDataCache.Listener() {
@Override
public void onCache(ShardId shardId, MappedFieldType.Names fieldNames, FieldDataType fieldDataType, Accountable ramUsage) {
}
@Override
public void onRemoval(ShardId shardId, MappedFieldType.Names fieldNames, FieldDataType fieldDataType, boolean wasEvicted, long sizeInBytes) {
}
});
fail("listener already set");
} catch (IllegalStateException ex) {
// all is well
}
}
}

View File

@@ -25,16 +25,7 @@ import org.apache.lucene.analysis.MockAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.StringField;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.index.PostingsEnum;
import org.apache.lucene.index.RandomIndexWriter;
import org.apache.lucene.index.SlowCompositeReaderWrapper;
import org.apache.lucene.index.Term;
import org.apache.lucene.index.Terms;
import org.apache.lucene.index.TermsEnum;
import org.apache.lucene.index.*;
import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.search.Filter;
import org.apache.lucene.search.IndexSearcher;

View File

@@ -38,6 +38,7 @@ import org.elasticsearch.common.lucene.search.Queries;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.mapper.internal.TypeFieldMapper;
import org.elasticsearch.index.mapper.internal.UidFieldMapper;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.AggregatorFactories;
import org.elasticsearch.search.aggregations.BucketCollector;