Fix FieldDataWeighter generics to accept RamUsage instead of AtomicFieldData
The `FieldDataWeighter` allowed to use a concrete subclass of the caches generic type to be used that causes ClassCastException and also trips the CirciutBreaker to not be decremented appropriately. This was tripped by settings randomization also part of this commit. Closes #6260
This commit is contained in:
parent
03402c7ed8
commit
17d34d5c97
|
@ -26,6 +26,7 @@ import com.google.common.cache.RemovalNotification;
|
|||
import org.apache.lucene.index.AtomicReaderContext;
|
||||
import org.apache.lucene.index.IndexReader;
|
||||
import org.apache.lucene.index.SegmentReader;
|
||||
import org.elasticsearch.common.logging.ESLogger;
|
||||
import org.elasticsearch.common.lucene.SegmentReaderUtils;
|
||||
import org.elasticsearch.index.fielddata.ordinals.GlobalOrdinalsIndexFieldData;
|
||||
import org.elasticsearch.index.mapper.FieldMapper;
|
||||
|
@ -76,9 +77,11 @@ public interface IndexFieldDataCache {
|
|||
private final FieldDataType fieldDataType;
|
||||
private final Cache<Key, RamUsage> cache;
|
||||
private final IndicesFieldDataCacheListener indicesFieldDataCacheListener;
|
||||
private final ESLogger logger;
|
||||
|
||||
protected FieldBased(IndexService indexService, FieldMapper.Names fieldNames, FieldDataType fieldDataType, CacheBuilder cache, IndicesFieldDataCacheListener indicesFieldDataCacheListener) {
|
||||
protected FieldBased(ESLogger logger, IndexService indexService, FieldMapper.Names fieldNames, FieldDataType fieldDataType, CacheBuilder cache, IndicesFieldDataCacheListener indicesFieldDataCacheListener) {
|
||||
assert indexService != null;
|
||||
this.logger = logger;
|
||||
this.indexService = indexService;
|
||||
this.fieldNames = fieldNames;
|
||||
this.fieldDataType = fieldDataType;
|
||||
|
@ -100,7 +103,11 @@ public interface IndexFieldDataCache {
|
|||
sizeInBytes = value.getMemorySizeInBytes();
|
||||
}
|
||||
for (Listener listener : key.listeners) {
|
||||
try {
|
||||
listener.onUnload(fieldNames, fieldDataType, notification.wasEvicted(), sizeInBytes);
|
||||
} catch (Throwable e) {
|
||||
logger.error("Failed to call listener on field data cache unloading", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -112,8 +119,7 @@ public interface IndexFieldDataCache {
|
|||
@Override
|
||||
public AtomicFieldData call() throws Exception {
|
||||
SegmentReaderUtils.registerCoreListener(context.reader(), FieldBased.this);
|
||||
AtomicFieldData fieldData = indexFieldData.loadDirect(context);
|
||||
key.sizeInBytes = fieldData.getMemorySizeInBytes();
|
||||
|
||||
key.listeners.add(indicesFieldDataCacheListener);
|
||||
final ShardId shardId = ShardUtils.extractShardId(context.reader());
|
||||
if (shardId != null) {
|
||||
|
@ -122,8 +128,15 @@ public interface IndexFieldDataCache {
|
|||
key.listeners.add(shard.fieldData());
|
||||
}
|
||||
}
|
||||
final AtomicFieldData fieldData = indexFieldData.loadDirect(context);
|
||||
key.sizeInBytes = fieldData.getMemorySizeInBytes();
|
||||
for (Listener listener : key.listeners) {
|
||||
try {
|
||||
listener.onLoad(fieldNames, fieldDataType, fieldData);
|
||||
} catch (Throwable e) {
|
||||
// load anyway since listeners should not throw exceptions
|
||||
logger.error("Failed to call listener on atomic field data loading", e);
|
||||
}
|
||||
}
|
||||
return fieldData;
|
||||
}
|
||||
|
@ -137,8 +150,7 @@ public interface IndexFieldDataCache {
|
|||
@Override
|
||||
public GlobalOrdinalsIndexFieldData call() throws Exception {
|
||||
indexReader.addReaderClosedListener(FieldBased.this);
|
||||
GlobalOrdinalsIndexFieldData ifd = (GlobalOrdinalsIndexFieldData) indexFieldData.localGlobalDirect(indexReader);
|
||||
key.sizeInBytes = ifd.getMemorySizeInBytes();
|
||||
|
||||
key.listeners.add(indicesFieldDataCacheListener);
|
||||
final ShardId shardId = ShardUtils.extractShardId(indexReader);
|
||||
if (shardId != null) {
|
||||
|
@ -147,8 +159,15 @@ public interface IndexFieldDataCache {
|
|||
key.listeners.add(shard.fieldData());
|
||||
}
|
||||
}
|
||||
GlobalOrdinalsIndexFieldData ifd = (GlobalOrdinalsIndexFieldData) indexFieldData.localGlobalDirect(indexReader);
|
||||
key.sizeInBytes = ifd.getMemorySizeInBytes();
|
||||
for (Listener listener : key.listeners) {
|
||||
try {
|
||||
listener.onLoad(fieldNames, fieldDataType, ifd);
|
||||
} catch (Throwable e) {
|
||||
// load anyway since listeners should not throw exceptions
|
||||
logger.error("Failed to call listener on global ordinals loading", e);
|
||||
}
|
||||
}
|
||||
|
||||
return ifd;
|
||||
|
@ -207,15 +226,15 @@ public interface IndexFieldDataCache {
|
|||
|
||||
static class Resident extends FieldBased {
|
||||
|
||||
public Resident(IndexService indexService, FieldMapper.Names fieldNames, FieldDataType fieldDataType, IndicesFieldDataCacheListener indicesFieldDataCacheListener) {
|
||||
super(indexService, fieldNames, fieldDataType, CacheBuilder.newBuilder(), indicesFieldDataCacheListener);
|
||||
public Resident(ESLogger logger, IndexService indexService, FieldMapper.Names fieldNames, FieldDataType fieldDataType, IndicesFieldDataCacheListener indicesFieldDataCacheListener) {
|
||||
super(logger, indexService, fieldNames, fieldDataType, CacheBuilder.newBuilder(), indicesFieldDataCacheListener);
|
||||
}
|
||||
}
|
||||
|
||||
static class Soft extends FieldBased {
|
||||
|
||||
public Soft(IndexService indexService, FieldMapper.Names fieldNames, FieldDataType fieldDataType, IndicesFieldDataCacheListener indicesFieldDataCacheListener) {
|
||||
super(indexService, fieldNames, fieldDataType, CacheBuilder.newBuilder().softValues(), indicesFieldDataCacheListener);
|
||||
public Soft(ESLogger logger, IndexService indexService, FieldMapper.Names fieldNames, FieldDataType fieldDataType, IndicesFieldDataCacheListener indicesFieldDataCacheListener) {
|
||||
super(logger, indexService, fieldNames, fieldDataType, CacheBuilder.newBuilder().softValues(), indicesFieldDataCacheListener);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -246,9 +246,9 @@ public class IndexFieldDataService extends AbstractIndexComponent {
|
|||
// this means changing the node level settings is simple, just set the bounds there
|
||||
String cacheType = type.getSettings().get("cache", indexSettings.get("index.fielddata.cache", "node"));
|
||||
if ("resident".equals(cacheType)) {
|
||||
cache = new IndexFieldDataCache.Resident(indexService, fieldNames, type, indicesFieldDataCacheListener);
|
||||
cache = new IndexFieldDataCache.Resident(logger, indexService, fieldNames, type, indicesFieldDataCacheListener);
|
||||
} else if ("soft".equals(cacheType)) {
|
||||
cache = new IndexFieldDataCache.Soft(indexService, fieldNames, type, indicesFieldDataCacheListener);
|
||||
cache = new IndexFieldDataCache.Soft(logger, indexService, fieldNames, type, indicesFieldDataCacheListener);
|
||||
} else if ("node".equals(cacheType)) {
|
||||
cache = indicesFieldDataCache.buildIndexFieldDataCache(indexService, index, fieldNames, type);
|
||||
} else {
|
||||
|
|
|
@ -25,6 +25,7 @@ import org.apache.lucene.index.IndexReader;
|
|||
import org.apache.lucene.index.SegmentReader;
|
||||
import org.elasticsearch.common.component.AbstractComponent;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.logging.ESLogger;
|
||||
import org.elasticsearch.common.lucene.SegmentReaderUtils;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||
|
@ -76,7 +77,7 @@ public class IndicesFieldDataCache extends AbstractComponent implements RemovalL
|
|||
}
|
||||
|
||||
public IndexFieldDataCache buildIndexFieldDataCache(IndexService indexService, Index index, FieldMapper.Names fieldNames, FieldDataType fieldDataType) {
|
||||
return new IndexFieldCache(cache, indicesFieldDataCacheListener, indexService, index, fieldNames, fieldDataType);
|
||||
return new IndexFieldCache(logger, cache, indicesFieldDataCacheListener, indexService, index, fieldNames, fieldDataType);
|
||||
}
|
||||
|
||||
public Cache<Key, RamUsage> getCache() {
|
||||
|
@ -95,15 +96,20 @@ public class IndicesFieldDataCache extends AbstractComponent implements RemovalL
|
|||
sizeInBytes = value.getMemorySizeInBytes();
|
||||
}
|
||||
for (IndexFieldDataCache.Listener listener : key.listeners) {
|
||||
try {
|
||||
listener.onUnload(indexCache.fieldNames, indexCache.fieldDataType, notification.wasEvicted(), sizeInBytes);
|
||||
} catch (Throwable e) {
|
||||
// load anyway since listeners should not throw exceptions
|
||||
logger.error("Failed to call listener on field data cache unloading", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public static class FieldDataWeigher implements Weigher<Key, AtomicFieldData> {
|
||||
public static class FieldDataWeigher implements Weigher<Key, RamUsage> {
|
||||
|
||||
@Override
|
||||
public int weigh(Key key, AtomicFieldData fieldData) {
|
||||
int weight = (int) Math.min(fieldData.getMemorySizeInBytes(), Integer.MAX_VALUE);
|
||||
public int weigh(Key key, RamUsage ramUsage) {
|
||||
int weight = (int) Math.min(ramUsage.getMemorySizeInBytes(), Integer.MAX_VALUE);
|
||||
return weight == 0 ? 1 : weight;
|
||||
}
|
||||
}
|
||||
|
@ -112,14 +118,16 @@ public class IndicesFieldDataCache extends AbstractComponent implements RemovalL
|
|||
* A specific cache instance for the relevant parameters of it (index, fieldNames, fieldType).
|
||||
*/
|
||||
static class IndexFieldCache implements IndexFieldDataCache, SegmentReader.CoreClosedListener, IndexReader.ReaderClosedListener {
|
||||
|
||||
private final ESLogger logger;
|
||||
private final IndexService indexService;
|
||||
final Index index;
|
||||
final FieldMapper.Names fieldNames;
|
||||
final FieldDataType fieldDataType;
|
||||
private final Cache<Key, RamUsage> cache;
|
||||
private final IndicesFieldDataCacheListener indicesFieldDataCacheListener;
|
||||
|
||||
IndexFieldCache(final Cache<Key, RamUsage> cache, IndicesFieldDataCacheListener indicesFieldDataCacheListener, IndexService indexService, Index index, FieldMapper.Names fieldNames, FieldDataType fieldDataType) {
|
||||
IndexFieldCache(ESLogger logger,final Cache<Key, RamUsage> cache, IndicesFieldDataCacheListener indicesFieldDataCacheListener, IndexService indexService, Index index, FieldMapper.Names fieldNames, FieldDataType fieldDataType) {
|
||||
this.logger = logger;
|
||||
this.indexService = indexService;
|
||||
this.index = index;
|
||||
this.fieldNames = fieldNames;
|
||||
|
@ -129,8 +137,6 @@ public class IndicesFieldDataCache extends AbstractComponent implements RemovalL
|
|||
assert indexService != null;
|
||||
}
|
||||
|
||||
private final IndicesFieldDataCacheListener indicesFieldDataCacheListener;
|
||||
|
||||
@Override
|
||||
public <FD extends AtomicFieldData, IFD extends IndexFieldData<FD>> FD load(final AtomicReaderContext context, final IFD indexFieldData) throws Exception {
|
||||
final Key key = new Key(this, context.reader().getCoreCacheKey());
|
||||
|
@ -139,8 +145,7 @@ public class IndicesFieldDataCache extends AbstractComponent implements RemovalL
|
|||
@Override
|
||||
public AtomicFieldData call() throws Exception {
|
||||
SegmentReaderUtils.registerCoreListener(context.reader(), IndexFieldCache.this);
|
||||
AtomicFieldData fieldData = indexFieldData.loadDirect(context);
|
||||
key.sizeInBytes = fieldData.getMemorySizeInBytes();
|
||||
|
||||
key.listeners.add(indicesFieldDataCacheListener);
|
||||
final ShardId shardId = ShardUtils.extractShardId(context.reader());
|
||||
if (shardId != null) {
|
||||
|
@ -149,9 +154,16 @@ public class IndicesFieldDataCache extends AbstractComponent implements RemovalL
|
|||
key.listeners.add(shard.fieldData());
|
||||
}
|
||||
}
|
||||
final AtomicFieldData fieldData = indexFieldData.loadDirect(context);
|
||||
for (Listener listener : key.listeners) {
|
||||
try {
|
||||
listener.onLoad(fieldNames, fieldDataType, fieldData);
|
||||
} catch (Throwable e) {
|
||||
// load anyway since listeners should not throw exceptions
|
||||
logger.error("Failed to call listener on atomic field data loading", e);
|
||||
}
|
||||
}
|
||||
key.sizeInBytes = fieldData.getMemorySizeInBytes();
|
||||
return fieldData;
|
||||
}
|
||||
});
|
||||
|
@ -159,12 +171,12 @@ public class IndicesFieldDataCache extends AbstractComponent implements RemovalL
|
|||
|
||||
public <IFD extends IndexFieldData.WithOrdinals<?>> IFD load(final IndexReader indexReader, final IFD indexFieldData) throws Exception {
|
||||
final Key key = new Key(this, indexReader.getCoreCacheKey());
|
||||
|
||||
//noinspection unchecked
|
||||
return (IFD) cache.get(key, new Callable<RamUsage>() {
|
||||
@Override
|
||||
public RamUsage call() throws Exception {
|
||||
indexReader.addReaderClosedListener(IndexFieldCache.this);
|
||||
GlobalOrdinalsIndexFieldData ifd = (GlobalOrdinalsIndexFieldData) indexFieldData.localGlobalDirect(indexReader);
|
||||
key.listeners.add(indicesFieldDataCacheListener);
|
||||
final ShardId shardId = ShardUtils.extractShardId(indexReader);
|
||||
if (shardId != null) {
|
||||
|
@ -173,8 +185,14 @@ public class IndicesFieldDataCache extends AbstractComponent implements RemovalL
|
|||
key.listeners.add(shard.fieldData());
|
||||
}
|
||||
}
|
||||
final GlobalOrdinalsIndexFieldData ifd = (GlobalOrdinalsIndexFieldData) indexFieldData.localGlobalDirect(indexReader);
|
||||
for (Listener listener : key.listeners) {
|
||||
try {
|
||||
listener.onLoad(fieldNames, fieldDataType, ifd);
|
||||
} catch (Throwable e) {
|
||||
// load anyway since listeners should not throw exceptions
|
||||
logger.error("Failed to call listener on global ordinals loading", e);
|
||||
}
|
||||
}
|
||||
return ifd;
|
||||
}
|
||||
|
|
|
@ -25,7 +25,6 @@ import org.elasticsearch.action.search.type.ParsedScrollId;
|
|||
import org.elasticsearch.common.settings.ImmutableSettings;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.test.ElasticsearchIntegrationTest;
|
||||
import org.elasticsearch.test.ElasticsearchIntegrationTest;
|
||||
|
||||
/**
|
||||
*/
|
||||
|
|
|
@ -52,6 +52,7 @@ import org.elasticsearch.common.settings.ImmutableSettings.Builder;
|
|||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.transport.InetSocketTransportAddress;
|
||||
import org.elasticsearch.common.transport.TransportAddress;
|
||||
import org.elasticsearch.common.unit.ByteSizeUnit;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.util.BigArraysModule;
|
||||
import org.elasticsearch.common.util.concurrent.EsExecutors;
|
||||
|
@ -310,6 +311,15 @@ public final class TestCluster extends ImmutableTestCluster {
|
|||
} else {
|
||||
builder.put(EsExecutors.PROCESSORS, AbstractRandomizedTest.TESTS_PROCESSORS);
|
||||
}
|
||||
|
||||
if (random.nextBoolean()) {
|
||||
if (random.nextBoolean()) {
|
||||
builder.put("indices.fielddata.cache.size", 1 + random.nextInt(1000), ByteSizeUnit.MB);
|
||||
}
|
||||
if (random.nextBoolean()) {
|
||||
builder.put("indices.fielddata.cache.expire", TimeValue.timeValueMillis(1 + random.nextInt(10000)));
|
||||
}
|
||||
}
|
||||
return builder.build();
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue