Added an indices level field data cache listener that always gets invoked and updates indices statistics and services about field data loading and unloading.

Moved the circuit breaker memory reducing logic to the IndicesFieldDataCacheListener, so it always reduces the memory used when field data gets unloaded,
this fixes a issue where the circuit breaker didn't get reduced when segments where no shardId could be resolved get cleared up.

Also made sure that exceptions in the percolator service are bubbled up properly.

Closes #5588
This commit is contained in:
Martijn van Groningen 2014-04-01 16:18:56 +07:00
parent 42b20d601f
commit b7451533c8
8 changed files with 130 additions and 49 deletions

View File

@ -32,7 +32,10 @@ import org.elasticsearch.index.service.IndexService;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.ShardUtils;
import org.elasticsearch.index.shard.service.IndexShard;
import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCacheListener;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
/**
@ -70,11 +73,13 @@ public interface IndexFieldDataCache {
private final FieldMapper.Names fieldNames;
private final FieldDataType fieldDataType;
private final Cache<Key, AtomicFieldData> cache;
private final IndicesFieldDataCacheListener indicesFieldDataCacheListener;
protected FieldBased(@Nullable IndexService indexService, FieldMapper.Names fieldNames, FieldDataType fieldDataType, CacheBuilder cache) {
protected FieldBased(@Nullable IndexService indexService, FieldMapper.Names fieldNames, FieldDataType fieldDataType, CacheBuilder cache, IndicesFieldDataCacheListener indicesFieldDataCacheListener) {
this.indexService = indexService;
this.fieldNames = fieldNames;
this.fieldDataType = fieldDataType;
this.indicesFieldDataCacheListener = indicesFieldDataCacheListener;
cache.removalListener(this);
//noinspection unchecked
this.cache = cache.build();
@ -83,15 +88,16 @@ public interface IndexFieldDataCache {
@Override
public void onRemoval(RemovalNotification<Key, AtomicFieldData> notification) {
Key key = notification.getKey();
if (key == null || key.listener == null) {
return; // we can't do anything here...
}
assert key != null && key.listeners != null;
AtomicFieldData value = notification.getValue();
long sizeInBytes = key.sizeInBytes;
if (sizeInBytes == -1 && value != null) {
sizeInBytes = value.getMemorySizeInBytes();
}
key.listener.onUnload(fieldNames, fieldDataType, notification.wasEvicted(), sizeInBytes, value);
for (Listener listener : key.listeners) {
listener.onUnload(fieldNames, fieldDataType, notification.wasEvicted(), sizeInBytes, value);
}
}
@Override
@ -104,21 +110,20 @@ public interface IndexFieldDataCache {
SegmentReaderUtils.registerCoreListener(context.reader(), FieldBased.this);
AtomicFieldData fieldData = indexFieldData.loadDirect(context);
key.sizeInBytes = fieldData.getMemorySizeInBytes();
key.listeners.add(indicesFieldDataCacheListener);
if (indexService != null) {
ShardId shardId = ShardUtils.extractShardId(context.reader());
if (shardId != null) {
IndexShard shard = indexService.shard(shardId.id());
if (shard != null) {
key.listener = shard.fieldData();
key.listeners.add(shard.fieldData());
}
}
}
if (key.listener != null) {
key.listener.onLoad(fieldNames, fieldDataType, fieldData);
for (Listener listener : key.listeners) {
listener.onLoad(fieldNames, fieldDataType, fieldData);
}
return fieldData;
}
});
@ -146,8 +151,7 @@ public interface IndexFieldDataCache {
static class Key {
final Object readerKey;
@Nullable
Listener listener; // optional stats listener
final List<Listener> listeners = new ArrayList<>(); // optional stats listener
long sizeInBytes = -1; // optional size in bytes (we keep it here in case the values are soft references)
Key(Object readerKey) {
@ -171,15 +175,15 @@ public interface IndexFieldDataCache {
static class Resident extends FieldBased {
public Resident(@Nullable IndexService indexService, FieldMapper.Names fieldNames, FieldDataType fieldDataType) {
super(indexService, fieldNames, fieldDataType, CacheBuilder.newBuilder());
public Resident(@Nullable IndexService indexService, FieldMapper.Names fieldNames, FieldDataType fieldDataType, IndicesFieldDataCacheListener indicesFieldDataCacheListener) {
super(indexService, fieldNames, fieldDataType, CacheBuilder.newBuilder(), indicesFieldDataCacheListener);
}
}
static class Soft extends FieldBased {
public Soft(@Nullable IndexService indexService, FieldMapper.Names fieldNames, FieldDataType fieldDataType) {
super(indexService, fieldNames, fieldDataType, CacheBuilder.newBuilder().softValues());
public Soft(@Nullable IndexService indexService, FieldMapper.Names fieldNames, FieldDataType fieldDataType, IndicesFieldDataCacheListener indicesFieldDataCacheListener) {
super(indexService, fieldNames, fieldDataType, CacheBuilder.newBuilder().softValues(), indicesFieldDataCacheListener);
}
}
}

View File

@ -37,6 +37,7 @@ import org.elasticsearch.index.mapper.FieldMapper;
import org.elasticsearch.index.mapper.internal.ParentFieldMapper;
import org.elasticsearch.index.service.IndexService;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCacheListener;
import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache;
import org.elasticsearch.indices.fielddata.breaker.CircuitBreakerService;
@ -58,6 +59,7 @@ public class IndexFieldDataService extends AbstractIndexComponent {
private final static ImmutableMap<String, IndexFieldData.Builder> docValuesBuildersByType;
private final static ImmutableMap<Tuple<String, String>, IndexFieldData.Builder> buildersByTypeAndFormat;
private final CircuitBreakerService circuitBreakerService;
private final IndicesFieldDataCacheListener indicesFieldDataCacheListener;
static {
buildersByType = MapBuilder.<String, IndexFieldData.Builder>newMapBuilder()
@ -129,15 +131,16 @@ public class IndexFieldDataService extends AbstractIndexComponent {
// public for testing
public IndexFieldDataService(Index index, CircuitBreakerService circuitBreakerService) {
this(index, ImmutableSettings.Builder.EMPTY_SETTINGS, new IndicesFieldDataCache(ImmutableSettings.Builder.EMPTY_SETTINGS), circuitBreakerService);
this(index, ImmutableSettings.Builder.EMPTY_SETTINGS, new IndicesFieldDataCache(ImmutableSettings.Builder.EMPTY_SETTINGS, new IndicesFieldDataCacheListener(circuitBreakerService)), circuitBreakerService, new IndicesFieldDataCacheListener(circuitBreakerService));
}
@Inject
public IndexFieldDataService(Index index, @IndexSettings Settings indexSettings, IndicesFieldDataCache indicesFieldDataCache,
CircuitBreakerService circuitBreakerService) {
CircuitBreakerService circuitBreakerService, IndicesFieldDataCacheListener indicesFieldDataCacheListener) {
super(index, indexSettings);
this.indicesFieldDataCache = indicesFieldDataCache;
this.circuitBreakerService = circuitBreakerService;
this.indicesFieldDataCacheListener = indicesFieldDataCacheListener;
}
// we need to "inject" the index service to not create cyclic dep
@ -227,9 +230,9 @@ public class IndexFieldDataService extends AbstractIndexComponent {
// this means changing the node level settings is simple, just set the bounds there
String cacheType = type.getSettings().get("cache", indexSettings.get("index.fielddata.cache", "node"));
if ("resident".equals(cacheType)) {
cache = new IndexFieldDataCache.Resident(indexService, fieldNames, type);
cache = new IndexFieldDataCache.Resident(indexService, fieldNames, type, indicesFieldDataCacheListener);
} else if ("soft".equals(cacheType)) {
cache = new IndexFieldDataCache.Soft(indexService, fieldNames, type);
cache = new IndexFieldDataCache.Soft(indexService, fieldNames, type, indicesFieldDataCacheListener);
} else if ("node".equals(cacheType)) {
cache = indicesFieldDataCache.buildIndexFieldDataCache(indexService, index, fieldNames, type);
} else {

View File

@ -31,7 +31,6 @@ import org.elasticsearch.index.mapper.internal.ParentFieldMapper;
import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.shard.AbstractIndexShardComponent;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.fielddata.breaker.CircuitBreakerService;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
@ -45,12 +44,9 @@ public class ShardFieldData extends AbstractIndexShardComponent implements Index
final ConcurrentMap<String, CounterMetric> perFieldTotals = ConcurrentCollections.newConcurrentMap();
private final CircuitBreakerService breakerService;
@Inject
public ShardFieldData(ShardId shardId, @IndexSettings Settings indexSettings, CircuitBreakerService breakerService) {
public ShardFieldData(ShardId shardId, @IndexSettings Settings indexSettings) {
super(shardId, indexSettings);
this.breakerService = breakerService;
}
public FieldDataStats stats(String... fields) {
@ -101,10 +97,6 @@ public class ShardFieldData extends AbstractIndexShardComponent implements Index
evictionsMetric.inc();
}
if (sizeInBytes != -1) {
// Since field data is being unloaded (due to expiration or manual
// clearing), we also need to decrement the used bytes in the breaker
breakerService.getBreaker().addWithoutBreaking(-sizeInBytes);
totalMetric.dec(sizeInBytes);
String keyFieldName = fieldNames.indexName();

View File

@ -31,6 +31,7 @@ import org.elasticsearch.indices.cache.filter.terms.IndicesTermsFilterCache;
import org.elasticsearch.indices.cluster.IndicesClusterStateService;
import org.elasticsearch.indices.fielddata.breaker.CircuitBreakerService;
import org.elasticsearch.indices.fielddata.breaker.InternalCircuitBreakerService;
import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCacheListener;
import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache;
import org.elasticsearch.indices.memory.IndexingMemoryController;
import org.elasticsearch.indices.query.IndicesQueriesModule;
@ -81,5 +82,6 @@ public class IndicesModule extends AbstractModule implements SpawnModules {
bind(UpdateHelper.class).asEagerSingleton();
bind(CircuitBreakerService.class).to(InternalCircuitBreakerService.class).asEagerSingleton();
bind(IndicesFieldDataCacheListener.class).asEagerSingleton();
}
}

View File

@ -40,6 +40,8 @@ import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.ShardUtils;
import org.elasticsearch.index.shard.service.IndexShard;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
@ -47,6 +49,8 @@ import java.util.concurrent.TimeUnit;
*/
public class IndicesFieldDataCache extends AbstractComponent implements RemovalListener<IndicesFieldDataCache.Key, AtomicFieldData> {
private final IndicesFieldDataCacheListener indicesFieldDataCacheListener;
Cache<Key, AtomicFieldData> cache;
private volatile String size;
@ -55,8 +59,9 @@ public class IndicesFieldDataCache extends AbstractComponent implements RemovalL
@Inject
public IndicesFieldDataCache(Settings settings) {
public IndicesFieldDataCache(Settings settings, IndicesFieldDataCacheListener indicesFieldDataCacheListener) {
super(settings);
this.indicesFieldDataCacheListener = indicesFieldDataCacheListener;
this.size = componentSettings.get("size", "-1");
this.sizeInBytes = componentSettings.getAsMemory("size", "-1").bytes();
this.expire = componentSettings.getAsTime("expire", null);
@ -89,16 +94,17 @@ public class IndicesFieldDataCache extends AbstractComponent implements RemovalL
@Override
public void onRemoval(RemovalNotification<Key, AtomicFieldData> notification) {
Key key = notification.getKey();
if (key == null || key.listener == null) {
return; // nothing to do here really...
}
assert key != null && key.listeners != null;
IndexFieldCache indexCache = key.indexCache;
long sizeInBytes = key.sizeInBytes;
AtomicFieldData value = notification.getValue();
if (sizeInBytes == -1 && value != null) {
sizeInBytes = value.getMemorySizeInBytes();
}
key.listener.onUnload(indexCache.fieldNames, indexCache.fieldDataType, notification.wasEvicted(), sizeInBytes, value);
for (IndexFieldDataCache.Listener listener : key.listeners) {
listener.onUnload(indexCache.fieldNames, indexCache.fieldDataType, notification.wasEvicted(), sizeInBytes, value);
}
}
public static class FieldDataWeigher implements Weigher<Key, AtomicFieldData> {
@ -137,21 +143,20 @@ public class IndicesFieldDataCache extends AbstractComponent implements RemovalL
public AtomicFieldData call() throws Exception {
SegmentReaderUtils.registerCoreListener(context.reader(), IndexFieldCache.this);
AtomicFieldData fieldData = indexFieldData.loadDirect(context);
key.listeners.add(indicesFieldDataCacheListener);
if (indexService != null) {
ShardId shardId = ShardUtils.extractShardId(context.reader());
if (shardId != null) {
IndexShard shard = indexService.shard(shardId.id());
if (shard != null) {
key.listener = shard.fieldData();
key.listeners.add(shard.fieldData());
}
}
}
if (key.listener != null) {
key.listener.onLoad(fieldNames, fieldDataType, fieldData);
for (Listener listener : key.listeners) {
listener.onLoad(fieldNames, fieldDataType, fieldData);
}
return fieldData;
}
});
@ -192,8 +197,7 @@ public class IndicesFieldDataCache extends AbstractComponent implements RemovalL
public final IndexFieldCache indexCache;
public final Object readerKey;
@Nullable
public IndexFieldDataCache.Listener listener; // optional stats listener
public final List<IndexFieldDataCache.Listener> listeners = new ArrayList<>(); // optional stats listener
long sizeInBytes = -1; // optional size in bytes (we keep it here in case the values are soft references)

View File

@ -0,0 +1,55 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.indices.fielddata.cache;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.index.fielddata.AtomicFieldData;
import org.elasticsearch.index.fielddata.FieldDataType;
import org.elasticsearch.index.fielddata.IndexFieldDataCache;
import org.elasticsearch.index.mapper.FieldMapper;
import org.elasticsearch.indices.fielddata.breaker.CircuitBreakerService;
/**
* A {@link IndexFieldDataCache.Listener} implementation that updates indices (node) level statistics / service about
* field data entries being loaded and unloaded.
*
* Currently it only decrements the memory used in the {@link CircuitBreakerService}.
*/
public class IndicesFieldDataCacheListener implements IndexFieldDataCache.Listener {
private final CircuitBreakerService circuitBreakerService;
@Inject
public IndicesFieldDataCacheListener(CircuitBreakerService circuitBreakerService) {
this.circuitBreakerService = circuitBreakerService;
}
@Override
public void onLoad(FieldMapper.Names fieldNames, FieldDataType fieldDataType, AtomicFieldData fieldData) {
}
@Override
public void onUnload(FieldMapper.Names fieldNames, FieldDataType fieldDataType, boolean wasEvicted, long sizeInBytes, @Nullable AtomicFieldData fieldData) {
assert sizeInBytes > 0 : "When reducing circuit breaker, it should be adjusted with a positive number and not [" + sizeInBytes + "]";
circuitBreakerService.getBreaker().addWithoutBreaking(-sizeInBytes);
}
}

View File

@ -70,10 +70,7 @@ import org.elasticsearch.index.query.ParsedQuery;
import org.elasticsearch.index.service.IndexService;
import org.elasticsearch.index.shard.service.IndexShard;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.percolator.QueryCollector.Count;
import org.elasticsearch.percolator.QueryCollector.Match;
import org.elasticsearch.percolator.QueryCollector.MatchAndScore;
import org.elasticsearch.percolator.QueryCollector.MatchAndSort;
import org.elasticsearch.percolator.QueryCollector.*;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.search.SearchParseElement;
import org.elasticsearch.search.SearchShardTarget;
@ -441,8 +438,9 @@ public class PercolatorService extends AbstractComponent {
collector.reset();
try {
context.docSearcher().search(entry.getValue(), collector);
} catch (IOException e) {
logger.warn("[" + entry.getKey() + "] failed to execute query", e);
} catch (Throwable e) {
logger.debug("[" + entry.getKey() + "] failed to execute query", e);
throw new PercolateException(context.indexShard().shardId(), "failed to execute", e);
}
if (collector.exists()) {
@ -539,7 +537,8 @@ public class PercolatorService extends AbstractComponent {
try {
context.docSearcher().search(entry.getValue(), collector);
} catch (Throwable e) {
logger.warn("[" + entry.getKey() + "] failed to execute query", e);
logger.debug("[" + entry.getKey() + "] failed to execute query", e);
throw new PercolateException(context.indexShard().shardId(), "failed to execute", e);
}
if (collector.exists()) {

View File

@ -217,7 +217,7 @@ public class PercolatorTests extends ElasticsearchIntegrationTest {
client().prepareIndex("test1", PercolatorService.TYPE_NAME)
.setSource(
XContentFactory.jsonBuilder().startObject().field("query",
constantScoreQuery(FilterBuilders.rangeFilter("field2").from("value").includeLower(true))
constantScoreQuery(FilterBuilders.rangeFilter("field2").from(1).to(5).includeLower(true).setExecution("fielddata"))
).endObject()
)
.execute().actionGet();
@ -230,6 +230,28 @@ public class PercolatorTests extends ElasticsearchIntegrationTest {
assertThat(convertFromTextArray(response.getMatches(), "test"), arrayContaining("test1"));
}
@Test
public void testRangeFilterThatUsesFD() throws Exception {
client().admin().indices().prepareCreate("test")
.addMapping("type1", "field1", "type=long")
.get();
client().prepareIndex("test", PercolatorService.TYPE_NAME, "1")
.setSource(
XContentFactory.jsonBuilder().startObject().field("query",
constantScoreQuery(FilterBuilders.rangeFilter("field1").from(1).to(5).setExecution("fielddata"))
).endObject()
).get();
PercolateResponse response = client().preparePercolate()
.setIndices("test").setDocumentType("type1")
.setPercolateDoc(PercolateSourceBuilder.docBuilder().setDoc("field1", 3)).get();
assertMatchCount(response, 1l);
assertThat(response.getMatches(), arrayWithSize(1));
assertThat(convertFromTextArray(response.getMatches(), "test"), arrayContaining("1"));
}
@Test
public void testPercolateQueriesWithRouting() throws Exception {
client().admin().indices().prepareCreate("test")