From 17d34d5c9728c27e3b9117eb6e177c56a90c879b Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Wed, 21 May 2014 16:34:37 +0200 Subject: [PATCH] Fix FieldDataWeighter generics to accept RamUsage instead of AtomicFieldData The `FieldDataWeighter` allowed to use a concrete subclass of the caches generic type to be used that causes ClassCastException and also trips the CirciutBreaker to not be decremented appropriately. This was tripped by settings randomization also part of this commit. Closes #6260 --- .../index/fielddata/IndexFieldDataCache.java | 43 ++++++++++++----- .../fielddata/IndexFieldDataService.java | 4 +- .../cache/IndicesFieldDataCache.java | 46 +++++++++++++------ .../search/scroll/SlowSearchScrollTests.java | 1 - .../org/elasticsearch/test/TestCluster.java | 10 ++++ 5 files changed, 75 insertions(+), 29 deletions(-) diff --git a/src/main/java/org/elasticsearch/index/fielddata/IndexFieldDataCache.java b/src/main/java/org/elasticsearch/index/fielddata/IndexFieldDataCache.java index 5553cecb8be..dc0261dd4d9 100644 --- a/src/main/java/org/elasticsearch/index/fielddata/IndexFieldDataCache.java +++ b/src/main/java/org/elasticsearch/index/fielddata/IndexFieldDataCache.java @@ -26,6 +26,7 @@ import com.google.common.cache.RemovalNotification; import org.apache.lucene.index.AtomicReaderContext; import org.apache.lucene.index.IndexReader; import org.apache.lucene.index.SegmentReader; +import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.lucene.SegmentReaderUtils; import org.elasticsearch.index.fielddata.ordinals.GlobalOrdinalsIndexFieldData; import org.elasticsearch.index.mapper.FieldMapper; @@ -76,9 +77,11 @@ public interface IndexFieldDataCache { private final FieldDataType fieldDataType; private final Cache cache; private final IndicesFieldDataCacheListener indicesFieldDataCacheListener; + private final ESLogger logger; - protected FieldBased(IndexService indexService, FieldMapper.Names fieldNames, FieldDataType fieldDataType, CacheBuilder cache, IndicesFieldDataCacheListener indicesFieldDataCacheListener) { + protected FieldBased(ESLogger logger, IndexService indexService, FieldMapper.Names fieldNames, FieldDataType fieldDataType, CacheBuilder cache, IndicesFieldDataCacheListener indicesFieldDataCacheListener) { assert indexService != null; + this.logger = logger; this.indexService = indexService; this.fieldNames = fieldNames; this.fieldDataType = fieldDataType; @@ -100,7 +103,11 @@ public interface IndexFieldDataCache { sizeInBytes = value.getMemorySizeInBytes(); } for (Listener listener : key.listeners) { - listener.onUnload(fieldNames, fieldDataType, notification.wasEvicted(), sizeInBytes); + try { + listener.onUnload(fieldNames, fieldDataType, notification.wasEvicted(), sizeInBytes); + } catch (Throwable e) { + logger.error("Failed to call listener on field data cache unloading", e); + } } } @@ -112,8 +119,7 @@ public interface IndexFieldDataCache { @Override public AtomicFieldData call() throws Exception { SegmentReaderUtils.registerCoreListener(context.reader(), FieldBased.this); - AtomicFieldData fieldData = indexFieldData.loadDirect(context); - key.sizeInBytes = fieldData.getMemorySizeInBytes(); + key.listeners.add(indicesFieldDataCacheListener); final ShardId shardId = ShardUtils.extractShardId(context.reader()); if (shardId != null) { @@ -122,8 +128,15 @@ public interface IndexFieldDataCache { key.listeners.add(shard.fieldData()); } } + final AtomicFieldData fieldData = indexFieldData.loadDirect(context); + key.sizeInBytes = fieldData.getMemorySizeInBytes(); for (Listener listener : key.listeners) { - listener.onLoad(fieldNames, fieldDataType, fieldData); + try { + listener.onLoad(fieldNames, fieldDataType, fieldData); + } catch (Throwable e) { + // load anyway since listeners should not throw exceptions + logger.error("Failed to call listener on atomic field data loading", e); + } } return fieldData; } @@ -137,8 +150,7 @@ public interface IndexFieldDataCache { @Override public GlobalOrdinalsIndexFieldData call() throws Exception { indexReader.addReaderClosedListener(FieldBased.this); - GlobalOrdinalsIndexFieldData ifd = (GlobalOrdinalsIndexFieldData) indexFieldData.localGlobalDirect(indexReader); - key.sizeInBytes = ifd.getMemorySizeInBytes(); + key.listeners.add(indicesFieldDataCacheListener); final ShardId shardId = ShardUtils.extractShardId(indexReader); if (shardId != null) { @@ -147,8 +159,15 @@ public interface IndexFieldDataCache { key.listeners.add(shard.fieldData()); } } + GlobalOrdinalsIndexFieldData ifd = (GlobalOrdinalsIndexFieldData) indexFieldData.localGlobalDirect(indexReader); + key.sizeInBytes = ifd.getMemorySizeInBytes(); for (Listener listener : key.listeners) { - listener.onLoad(fieldNames, fieldDataType, ifd); + try { + listener.onLoad(fieldNames, fieldDataType, ifd); + } catch (Throwable e) { + // load anyway since listeners should not throw exceptions + logger.error("Failed to call listener on global ordinals loading", e); + } } return ifd; @@ -207,15 +226,15 @@ public interface IndexFieldDataCache { static class Resident extends FieldBased { - public Resident(IndexService indexService, FieldMapper.Names fieldNames, FieldDataType fieldDataType, IndicesFieldDataCacheListener indicesFieldDataCacheListener) { - super(indexService, fieldNames, fieldDataType, CacheBuilder.newBuilder(), indicesFieldDataCacheListener); + public Resident(ESLogger logger, IndexService indexService, FieldMapper.Names fieldNames, FieldDataType fieldDataType, IndicesFieldDataCacheListener indicesFieldDataCacheListener) { + super(logger, indexService, fieldNames, fieldDataType, CacheBuilder.newBuilder(), indicesFieldDataCacheListener); } } static class Soft extends FieldBased { - public Soft(IndexService indexService, FieldMapper.Names fieldNames, FieldDataType fieldDataType, IndicesFieldDataCacheListener indicesFieldDataCacheListener) { - super(indexService, fieldNames, fieldDataType, CacheBuilder.newBuilder().softValues(), indicesFieldDataCacheListener); + public Soft(ESLogger logger, IndexService indexService, FieldMapper.Names fieldNames, FieldDataType fieldDataType, IndicesFieldDataCacheListener indicesFieldDataCacheListener) { + super(logger, indexService, fieldNames, fieldDataType, CacheBuilder.newBuilder().softValues(), indicesFieldDataCacheListener); } } } diff --git a/src/main/java/org/elasticsearch/index/fielddata/IndexFieldDataService.java b/src/main/java/org/elasticsearch/index/fielddata/IndexFieldDataService.java index 7c4dd8cae4f..af1bf2e0a48 100644 --- a/src/main/java/org/elasticsearch/index/fielddata/IndexFieldDataService.java +++ b/src/main/java/org/elasticsearch/index/fielddata/IndexFieldDataService.java @@ -246,9 +246,9 @@ public class IndexFieldDataService extends AbstractIndexComponent { // this means changing the node level settings is simple, just set the bounds there String cacheType = type.getSettings().get("cache", indexSettings.get("index.fielddata.cache", "node")); if ("resident".equals(cacheType)) { - cache = new IndexFieldDataCache.Resident(indexService, fieldNames, type, indicesFieldDataCacheListener); + cache = new IndexFieldDataCache.Resident(logger, indexService, fieldNames, type, indicesFieldDataCacheListener); } else if ("soft".equals(cacheType)) { - cache = new IndexFieldDataCache.Soft(indexService, fieldNames, type, indicesFieldDataCacheListener); + cache = new IndexFieldDataCache.Soft(logger, indexService, fieldNames, type, indicesFieldDataCacheListener); } else if ("node".equals(cacheType)) { cache = indicesFieldDataCache.buildIndexFieldDataCache(indexService, index, fieldNames, type); } else { diff --git a/src/main/java/org/elasticsearch/indices/fielddata/cache/IndicesFieldDataCache.java b/src/main/java/org/elasticsearch/indices/fielddata/cache/IndicesFieldDataCache.java index de58896e78e..522e4c5d05e 100644 --- a/src/main/java/org/elasticsearch/indices/fielddata/cache/IndicesFieldDataCache.java +++ b/src/main/java/org/elasticsearch/indices/fielddata/cache/IndicesFieldDataCache.java @@ -25,6 +25,7 @@ import org.apache.lucene.index.IndexReader; import org.apache.lucene.index.SegmentReader; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.lucene.SegmentReaderUtils; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeValue; @@ -76,7 +77,7 @@ public class IndicesFieldDataCache extends AbstractComponent implements RemovalL } public IndexFieldDataCache buildIndexFieldDataCache(IndexService indexService, Index index, FieldMapper.Names fieldNames, FieldDataType fieldDataType) { - return new IndexFieldCache(cache, indicesFieldDataCacheListener, indexService, index, fieldNames, fieldDataType); + return new IndexFieldCache(logger, cache, indicesFieldDataCacheListener, indexService, index, fieldNames, fieldDataType); } public Cache getCache() { @@ -95,15 +96,20 @@ public class IndicesFieldDataCache extends AbstractComponent implements RemovalL sizeInBytes = value.getMemorySizeInBytes(); } for (IndexFieldDataCache.Listener listener : key.listeners) { - listener.onUnload(indexCache.fieldNames, indexCache.fieldDataType, notification.wasEvicted(), sizeInBytes); + try { + listener.onUnload(indexCache.fieldNames, indexCache.fieldDataType, notification.wasEvicted(), sizeInBytes); + } catch (Throwable e) { + // load anyway since listeners should not throw exceptions + logger.error("Failed to call listener on field data cache unloading", e); + } } } - public static class FieldDataWeigher implements Weigher { + public static class FieldDataWeigher implements Weigher { @Override - public int weigh(Key key, AtomicFieldData fieldData) { - int weight = (int) Math.min(fieldData.getMemorySizeInBytes(), Integer.MAX_VALUE); + public int weigh(Key key, RamUsage ramUsage) { + int weight = (int) Math.min(ramUsage.getMemorySizeInBytes(), Integer.MAX_VALUE); return weight == 0 ? 1 : weight; } } @@ -112,14 +118,16 @@ public class IndicesFieldDataCache extends AbstractComponent implements RemovalL * A specific cache instance for the relevant parameters of it (index, fieldNames, fieldType). */ static class IndexFieldCache implements IndexFieldDataCache, SegmentReader.CoreClosedListener, IndexReader.ReaderClosedListener { - + private final ESLogger logger; private final IndexService indexService; final Index index; final FieldMapper.Names fieldNames; final FieldDataType fieldDataType; private final Cache cache; + private final IndicesFieldDataCacheListener indicesFieldDataCacheListener; - IndexFieldCache(final Cache cache, IndicesFieldDataCacheListener indicesFieldDataCacheListener, IndexService indexService, Index index, FieldMapper.Names fieldNames, FieldDataType fieldDataType) { + IndexFieldCache(ESLogger logger,final Cache cache, IndicesFieldDataCacheListener indicesFieldDataCacheListener, IndexService indexService, Index index, FieldMapper.Names fieldNames, FieldDataType fieldDataType) { + this.logger = logger; this.indexService = indexService; this.index = index; this.fieldNames = fieldNames; @@ -129,8 +137,6 @@ public class IndicesFieldDataCache extends AbstractComponent implements RemovalL assert indexService != null; } - private final IndicesFieldDataCacheListener indicesFieldDataCacheListener; - @Override public > FD load(final AtomicReaderContext context, final IFD indexFieldData) throws Exception { final Key key = new Key(this, context.reader().getCoreCacheKey()); @@ -139,8 +145,7 @@ public class IndicesFieldDataCache extends AbstractComponent implements RemovalL @Override public AtomicFieldData call() throws Exception { SegmentReaderUtils.registerCoreListener(context.reader(), IndexFieldCache.this); - AtomicFieldData fieldData = indexFieldData.loadDirect(context); - key.sizeInBytes = fieldData.getMemorySizeInBytes(); + key.listeners.add(indicesFieldDataCacheListener); final ShardId shardId = ShardUtils.extractShardId(context.reader()); if (shardId != null) { @@ -149,9 +154,16 @@ public class IndicesFieldDataCache extends AbstractComponent implements RemovalL key.listeners.add(shard.fieldData()); } } + final AtomicFieldData fieldData = indexFieldData.loadDirect(context); for (Listener listener : key.listeners) { - listener.onLoad(fieldNames, fieldDataType, fieldData); + try { + listener.onLoad(fieldNames, fieldDataType, fieldData); + } catch (Throwable e) { + // load anyway since listeners should not throw exceptions + logger.error("Failed to call listener on atomic field data loading", e); + } } + key.sizeInBytes = fieldData.getMemorySizeInBytes(); return fieldData; } }); @@ -159,12 +171,12 @@ public class IndicesFieldDataCache extends AbstractComponent implements RemovalL public > IFD load(final IndexReader indexReader, final IFD indexFieldData) throws Exception { final Key key = new Key(this, indexReader.getCoreCacheKey()); + //noinspection unchecked return (IFD) cache.get(key, new Callable() { @Override public RamUsage call() throws Exception { indexReader.addReaderClosedListener(IndexFieldCache.this); - GlobalOrdinalsIndexFieldData ifd = (GlobalOrdinalsIndexFieldData) indexFieldData.localGlobalDirect(indexReader); key.listeners.add(indicesFieldDataCacheListener); final ShardId shardId = ShardUtils.extractShardId(indexReader); if (shardId != null) { @@ -173,8 +185,14 @@ public class IndicesFieldDataCache extends AbstractComponent implements RemovalL key.listeners.add(shard.fieldData()); } } + final GlobalOrdinalsIndexFieldData ifd = (GlobalOrdinalsIndexFieldData) indexFieldData.localGlobalDirect(indexReader); for (Listener listener : key.listeners) { - listener.onLoad(fieldNames, fieldDataType, ifd); + try { + listener.onLoad(fieldNames, fieldDataType, ifd); + } catch (Throwable e) { + // load anyway since listeners should not throw exceptions + logger.error("Failed to call listener on global ordinals loading", e); + } } return ifd; } diff --git a/src/test/java/org/elasticsearch/search/scroll/SlowSearchScrollTests.java b/src/test/java/org/elasticsearch/search/scroll/SlowSearchScrollTests.java index 67ba3ac39e9..542838e12e2 100644 --- a/src/test/java/org/elasticsearch/search/scroll/SlowSearchScrollTests.java +++ b/src/test/java/org/elasticsearch/search/scroll/SlowSearchScrollTests.java @@ -25,7 +25,6 @@ import org.elasticsearch.action.search.type.ParsedScrollId; import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.test.ElasticsearchIntegrationTest; -import org.elasticsearch.test.ElasticsearchIntegrationTest; /** */ diff --git a/src/test/java/org/elasticsearch/test/TestCluster.java b/src/test/java/org/elasticsearch/test/TestCluster.java index 3b335c79f35..8c81b5b6b5c 100644 --- a/src/test/java/org/elasticsearch/test/TestCluster.java +++ b/src/test/java/org/elasticsearch/test/TestCluster.java @@ -52,6 +52,7 @@ import org.elasticsearch.common.settings.ImmutableSettings.Builder; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.InetSocketTransportAddress; import org.elasticsearch.common.transport.TransportAddress; +import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.BigArraysModule; import org.elasticsearch.common.util.concurrent.EsExecutors; @@ -310,6 +311,15 @@ public final class TestCluster extends ImmutableTestCluster { } else { builder.put(EsExecutors.PROCESSORS, AbstractRandomizedTest.TESTS_PROCESSORS); } + + if (random.nextBoolean()) { + if (random.nextBoolean()) { + builder.put("indices.fielddata.cache.size", 1 + random.nextInt(1000), ByteSizeUnit.MB); + } + if (random.nextBoolean()) { + builder.put("indices.fielddata.cache.expire", TimeValue.timeValueMillis(1 + random.nextInt(10000))); + } + } return builder.build(); }