Make IndicesWarmer a private class of IndexService

There is no need for IndicesWarmer to be a global accessible class. All it needs
access to is inside IndexService. It also doesn't need to be mutable once it's not a per node
instance. This commit move IndicesWarmer to IndexWarmer and makes the default impls like field data and
norms warming an impl detail. Also the IndexShard doesn't depend on this class anymore, instead it accepts
an Engine.Warmer as a ctor argument which delegates to the actual warmer from the index.
This commit is contained in:
Simon Willnauer 2016-02-05 10:31:17 +01:00
parent 0c41a68690
commit b185209b63
17 changed files with 346 additions and 404 deletions

View File

@ -38,8 +38,8 @@ import org.elasticsearch.index.percolator.PercolatorQueriesRegistry;
import org.elasticsearch.index.store.FsDirectoryService; import org.elasticsearch.index.store.FsDirectoryService;
import org.elasticsearch.index.store.IndexStore; import org.elasticsearch.index.store.IndexStore;
import org.elasticsearch.index.store.Store; import org.elasticsearch.index.store.Store;
import org.elasticsearch.index.IndexWarmer;
import org.elasticsearch.indices.cache.request.IndicesRequestCache; import org.elasticsearch.indices.cache.request.IndicesRequestCache;
import org.elasticsearch.search.SearchService;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
@ -132,7 +132,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
PrimaryShardAllocator.INDEX_RECOVERY_INITIAL_SHARDS_SETTING, PrimaryShardAllocator.INDEX_RECOVERY_INITIAL_SHARDS_SETTING,
FsDirectoryService.INDEX_LOCK_FACTOR_SETTING, FsDirectoryService.INDEX_LOCK_FACTOR_SETTING,
EngineConfig.INDEX_CODEC_SETTING, EngineConfig.INDEX_CODEC_SETTING,
SearchService.INDEX_NORMS_LOADING_SETTING, IndexWarmer.INDEX_NORMS_LOADING_SETTING,
// this sucks but we can't really validate all the analyzers/similarity in here // this sucks but we can't really validate all the analyzers/similarity in here
Setting.groupSetting("index.similarity.", false, Setting.Scope.INDEX), // this allows similarity settings to be passed Setting.groupSetting("index.similarity.", false, Setting.Scope.INDEX), // this allows similarity settings to be passed
Setting.groupSetting("index.analysis.", false, Setting.Scope.INDEX) // this allows analysis settings to be passed Setting.groupSetting("index.analysis.", false, Setting.Scope.INDEX) // this allows analysis settings to be passed

View File

@ -54,6 +54,7 @@ import org.elasticsearch.index.analysis.AnalysisService;
import org.elasticsearch.index.cache.IndexCache; import org.elasticsearch.index.cache.IndexCache;
import org.elasticsearch.index.cache.bitset.BitsetFilterCache; import org.elasticsearch.index.cache.bitset.BitsetFilterCache;
import org.elasticsearch.index.cache.query.QueryCache; import org.elasticsearch.index.cache.query.QueryCache;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.EngineClosedException; import org.elasticsearch.index.engine.EngineClosedException;
import org.elasticsearch.index.engine.EngineFactory; import org.elasticsearch.index.engine.EngineFactory;
import org.elasticsearch.index.fielddata.FieldDataType; import org.elasticsearch.index.fielddata.FieldDataType;
@ -102,6 +103,7 @@ public final class IndexService extends AbstractIndexComponent implements IndexC
private final MapperService mapperService; private final MapperService mapperService;
private final SimilarityService similarityService; private final SimilarityService similarityService;
private final EngineFactory engineFactory; private final EngineFactory engineFactory;
private final IndexWarmer warmer;
private volatile Map<Integer, IndexShard> shards = emptyMap(); private volatile Map<Integer, IndexShard> shards = emptyMap();
private final AtomicBoolean closed = new AtomicBoolean(false); private final AtomicBoolean closed = new AtomicBoolean(false);
private final AtomicBoolean deleted = new AtomicBoolean(false); private final AtomicBoolean deleted = new AtomicBoolean(false);
@ -137,7 +139,8 @@ public final class IndexService extends AbstractIndexComponent implements IndexC
this.nodeServicesProvider = nodeServicesProvider; this.nodeServicesProvider = nodeServicesProvider;
this.indexStore = indexStore; this.indexStore = indexStore;
indexFieldData.setListener(new FieldDataCacheListener(this)); indexFieldData.setListener(new FieldDataCacheListener(this));
this.bitsetFilterCache = new BitsetFilterCache(indexSettings, nodeServicesProvider.getWarmer(), new BitsetCacheListener(this)); this.bitsetFilterCache = new BitsetFilterCache(indexSettings, new BitsetCacheListener(this));
this.warmer = new IndexWarmer(indexSettings.getSettings(), nodeServicesProvider.getThreadPool(), bitsetFilterCache.createListener(nodeServicesProvider.getThreadPool()));
this.indexCache = new IndexCache(indexSettings, queryCache, bitsetFilterCache); this.indexCache = new IndexCache(indexSettings, queryCache, bitsetFilterCache);
this.engineFactory = engineFactory; this.engineFactory = engineFactory;
// initialize this last -- otherwise if the wrapper requires any other member to be non-null we fail with an NPE // initialize this last -- otherwise if the wrapper requires any other member to be non-null we fail with an NPE
@ -312,11 +315,18 @@ public final class IndexService extends AbstractIndexComponent implements IndexC
// if we are on a shared FS we only own the shard (ie. we can safely delete it) if we are the primary. // if we are on a shared FS we only own the shard (ie. we can safely delete it) if we are the primary.
final boolean canDeleteShardContent = IndexMetaData.isOnSharedFilesystem(indexSettings) == false || final boolean canDeleteShardContent = IndexMetaData.isOnSharedFilesystem(indexSettings) == false ||
(primary && IndexMetaData.isOnSharedFilesystem(indexSettings)); (primary && IndexMetaData.isOnSharedFilesystem(indexSettings));
final Engine.Warmer engineWarmer = (searcher, toLevel) -> {
IndexShard shard = getShardOrNull(shardId.getId());
if (shard != null) {
warmer.warm(searcher, shard, IndexService.this.indexSettings, toLevel);
}
};
store = new Store(shardId, this.indexSettings, indexStore.newDirectoryService(path), lock, new StoreCloseListener(shardId, canDeleteShardContent, () -> nodeServicesProvider.getIndicesQueryCache().onClose(shardId))); store = new Store(shardId, this.indexSettings, indexStore.newDirectoryService(path), lock, new StoreCloseListener(shardId, canDeleteShardContent, () -> nodeServicesProvider.getIndicesQueryCache().onClose(shardId)));
if (useShadowEngine(primary, indexSettings)) { if (useShadowEngine(primary, indexSettings)) {
indexShard = new ShadowIndexShard(shardId, this.indexSettings, path, store, indexCache, mapperService, similarityService, indexFieldData, engineFactory, eventListener, searcherWrapper, nodeServicesProvider, searchSlowLog); // no indexing listeners - shadow engines don't index indexShard = new ShadowIndexShard(shardId, this.indexSettings, path, store, indexCache, mapperService, similarityService, indexFieldData, engineFactory, eventListener, searcherWrapper, nodeServicesProvider, searchSlowLog, engineWarmer); // no indexing listeners - shadow engines don't index
} else { } else {
indexShard = new IndexShard(shardId, this.indexSettings, path, store, indexCache, mapperService, similarityService, indexFieldData, engineFactory, eventListener, searcherWrapper, nodeServicesProvider, searchSlowLog, listeners); indexShard = new IndexShard(shardId, this.indexSettings, path, store, indexCache, mapperService, similarityService, indexFieldData, engineFactory, eventListener, searcherWrapper, nodeServicesProvider, searchSlowLog, engineWarmer, listeners);
} }
eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created"); eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created");
eventListener.afterIndexShardCreated(indexShard); eventListener.afterIndexShardCreated(indexShard);

View File

@ -0,0 +1,292 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index;
import com.carrotsearch.hppc.ObjectHashSet;
import com.carrotsearch.hppc.ObjectSet;
import com.carrotsearch.hppc.cursors.ObjectCursor;
import org.apache.lucene.index.IndexOptions;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.NumericDocValues;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.fielddata.FieldDataType;
import org.elasticsearch.index.fielddata.IndexFieldData;
import org.elasticsearch.index.fielddata.IndexFieldDataService;
import org.elasticsearch.index.mapper.DocumentMapper;
import org.elasticsearch.index.mapper.FieldMapper;
import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardState;
import org.elasticsearch.threadpool.ThreadPool;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
/**
*/
public final class IndexWarmer extends AbstractComponent {
public static final Setting<MappedFieldType.Loading> INDEX_NORMS_LOADING_SETTING = new Setting<>("index.norms.loading",
MappedFieldType.Loading.LAZY.toString(), (s) -> MappedFieldType.Loading.parse(s, MappedFieldType.Loading.LAZY),
false, Setting.Scope.INDEX);
private final List<Listener> listeners;
IndexWarmer(Settings settings, ThreadPool threadPool, Listener... listeners) {
super(settings);
ArrayList<Listener> list = new ArrayList<>();
final Executor executor = threadPool.executor(ThreadPool.Names.WARMER);
list.add(new NormsWarmer(executor));
list.add(new FieldDataWarmer(executor));
for (Listener listener : listeners) {
list.add(listener);
}
this.listeners = Collections.unmodifiableList(list);
}
void warm(Engine.Searcher searcher, IndexShard shard, IndexSettings settings, boolean isTopReader) {
if (shard.state() == IndexShardState.CLOSED) {
return;
}
if (settings.isWarmerEnabled() == false) {
return;
}
if (logger.isTraceEnabled()) {
if (isTopReader) {
logger.trace("{} top warming [{}]", shard.shardId(), searcher.reader());
} else {
logger.trace("{} warming [{}]", shard.shardId(), searcher.reader());
}
}
shard.warmerService().onPreWarm();
long time = System.nanoTime();
final List<TerminationHandle> terminationHandles = new ArrayList<>();
// get a handle on pending tasks
for (final Listener listener : listeners) {
if (isTopReader) {
terminationHandles.add(listener.warmTopReader(shard, searcher));
} else {
terminationHandles.add(listener.warmNewReaders(shard, searcher));
}
}
// wait for termination
for (TerminationHandle terminationHandle : terminationHandles) {
try {
terminationHandle.awaitTermination();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
if (isTopReader) {
logger.warn("top warming has been interrupted", e);
} else {
logger.warn("warming has been interrupted", e);
}
break;
}
}
long took = System.nanoTime() - time;
shard.warmerService().onPostWarm(took);
if (shard.warmerService().logger().isTraceEnabled()) {
if (isTopReader) {
shard.warmerService().logger().trace("top warming took [{}]", new TimeValue(took, TimeUnit.NANOSECONDS));
} else {
shard.warmerService().logger().trace("warming took [{}]", new TimeValue(took, TimeUnit.NANOSECONDS));
}
}
}
/** A handle on the execution of warm-up action. */
public interface TerminationHandle {
TerminationHandle NO_WAIT = () -> {};
/** Wait until execution of the warm-up action completes. */
void awaitTermination() throws InterruptedException;
}
public interface Listener {
/** Queue tasks to warm-up the given segments and return handles that allow to wait for termination of the
* execution of those tasks. */
TerminationHandle warmNewReaders(IndexShard indexShard, Engine.Searcher searcher);
TerminationHandle warmTopReader(IndexShard indexShard, Engine.Searcher searcher);
}
private static class NormsWarmer implements IndexWarmer.Listener {
private final Executor executor;
public NormsWarmer(Executor executor) {
this.executor = executor;
}
@Override
public TerminationHandle warmNewReaders(final IndexShard indexShard, final Engine.Searcher searcher) {
final MappedFieldType.Loading defaultLoading = indexShard.indexSettings().getValue(INDEX_NORMS_LOADING_SETTING);
final MapperService mapperService = indexShard.mapperService();
final ObjectSet<String> warmUp = new ObjectHashSet<>();
for (DocumentMapper docMapper : mapperService.docMappers(false)) {
for (FieldMapper fieldMapper : docMapper.mappers()) {
final String indexName = fieldMapper.fieldType().name();
MappedFieldType.Loading normsLoading = fieldMapper.fieldType().normsLoading();
if (normsLoading == null) {
normsLoading = defaultLoading;
}
if (fieldMapper.fieldType().indexOptions() != IndexOptions.NONE && !fieldMapper.fieldType().omitNorms()
&& normsLoading == MappedFieldType.Loading.EAGER) {
warmUp.add(indexName);
}
}
}
final CountDownLatch latch = new CountDownLatch(1);
// Norms loading may be I/O intensive but is not CPU intensive, so we execute it in a single task
executor.execute(() -> {
try {
for (ObjectCursor<String> stringObjectCursor : warmUp) {
final String indexName = stringObjectCursor.value;
final long start = System.nanoTime();
for (final LeafReaderContext ctx : searcher.reader().leaves()) {
final NumericDocValues values = ctx.reader().getNormValues(indexName);
if (values != null) {
values.get(0);
}
}
if (indexShard.warmerService().logger().isTraceEnabled()) {
indexShard.warmerService().logger().trace("warmed norms for [{}], took [{}]", indexName,
TimeValue.timeValueNanos(System.nanoTime() - start));
}
}
} catch (Throwable t) {
indexShard.warmerService().logger().warn("failed to warm-up norms", t);
} finally {
latch.countDown();
}
});
return () -> latch.await();
}
@Override
public TerminationHandle warmTopReader(IndexShard indexShard, final Engine.Searcher searcher) {
return TerminationHandle.NO_WAIT;
}
}
private static class FieldDataWarmer implements IndexWarmer.Listener {
private final Executor executor;
public FieldDataWarmer(Executor executor) {
this.executor = executor;
}
@Override
public TerminationHandle warmNewReaders(final IndexShard indexShard, final Engine.Searcher searcher) {
final MapperService mapperService = indexShard.mapperService();
final Map<String, MappedFieldType> warmUp = new HashMap<>();
for (DocumentMapper docMapper : mapperService.docMappers(false)) {
for (FieldMapper fieldMapper : docMapper.mappers()) {
final FieldDataType fieldDataType = fieldMapper.fieldType().fieldDataType();
final String indexName = fieldMapper.fieldType().name();
if (fieldDataType == null) {
continue;
}
if (fieldDataType.getLoading() == MappedFieldType.Loading.LAZY) {
continue;
}
if (warmUp.containsKey(indexName)) {
continue;
}
warmUp.put(indexName, fieldMapper.fieldType());
}
}
final IndexFieldDataService indexFieldDataService = indexShard.indexFieldDataService();
final CountDownLatch latch = new CountDownLatch(searcher.reader().leaves().size() * warmUp.size());
for (final LeafReaderContext ctx : searcher.reader().leaves()) {
for (final MappedFieldType fieldType : warmUp.values()) {
executor.execute(() -> {
try {
final long start = System.nanoTime();
indexFieldDataService.getForField(fieldType).load(ctx);
if (indexShard.warmerService().logger().isTraceEnabled()) {
indexShard.warmerService().logger().trace("warmed fielddata for [{}], took [{}]", fieldType.name(),
TimeValue.timeValueNanos(System.nanoTime() - start));
}
} catch (Throwable t) {
indexShard.warmerService().logger().warn("failed to warm-up fielddata for [{}]", t, fieldType.name());
} finally {
latch.countDown();
}
});
}
}
return () -> latch.await();
}
@Override
public TerminationHandle warmTopReader(final IndexShard indexShard, final Engine.Searcher searcher) {
final MapperService mapperService = indexShard.mapperService();
final Map<String, MappedFieldType> warmUpGlobalOrdinals = new HashMap<>();
for (DocumentMapper docMapper : mapperService.docMappers(false)) {
for (FieldMapper fieldMapper : docMapper.mappers()) {
final FieldDataType fieldDataType = fieldMapper.fieldType().fieldDataType();
final String indexName = fieldMapper.fieldType().name();
if (fieldDataType == null) {
continue;
}
if (fieldDataType.getLoading() != MappedFieldType.Loading.EAGER_GLOBAL_ORDINALS) {
continue;
}
if (warmUpGlobalOrdinals.containsKey(indexName)) {
continue;
}
warmUpGlobalOrdinals.put(indexName, fieldMapper.fieldType());
}
}
final IndexFieldDataService indexFieldDataService = indexShard.indexFieldDataService();
final CountDownLatch latch = new CountDownLatch(warmUpGlobalOrdinals.size());
for (final MappedFieldType fieldType : warmUpGlobalOrdinals.values()) {
executor.execute(() -> {
try {
final long start = System.nanoTime();
IndexFieldData.Global ifd = indexFieldDataService.getForField(fieldType);
ifd.loadGlobal(searcher.getDirectoryReader());
if (indexShard.warmerService().logger().isTraceEnabled()) {
indexShard.warmerService().logger().trace("warmed global ordinals for [{}], took [{}]", fieldType.name(),
TimeValue.timeValueNanos(System.nanoTime() - start));
}
} catch (Throwable t) {
indexShard.warmerService().logger().warn("failed to warm-up global ordinals for [{}]", t, fieldType.name());
} finally {
latch.countDown();
}
});
}
return () -> latch.await();
}
}
}

View File

@ -20,10 +20,8 @@
package org.elasticsearch.index; package org.elasticsearch.index;
import org.elasticsearch.client.Client; import org.elasticsearch.client.Client;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.indices.IndicesWarmer;
import org.elasticsearch.indices.breaker.CircuitBreakerService; import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.indices.cache.query.IndicesQueryCache; import org.elasticsearch.indices.cache.query.IndicesQueryCache;
import org.elasticsearch.indices.query.IndicesQueriesRegistry; import org.elasticsearch.indices.query.IndicesQueriesRegistry;
@ -39,7 +37,6 @@ public final class NodeServicesProvider {
private final ThreadPool threadPool; private final ThreadPool threadPool;
private final IndicesQueryCache indicesQueryCache; private final IndicesQueryCache indicesQueryCache;
private final IndicesWarmer warmer;
private final BigArrays bigArrays; private final BigArrays bigArrays;
private final Client client; private final Client client;
private final IndicesQueriesRegistry indicesQueriesRegistry; private final IndicesQueriesRegistry indicesQueriesRegistry;
@ -47,10 +44,9 @@ public final class NodeServicesProvider {
private final CircuitBreakerService circuitBreakerService; private final CircuitBreakerService circuitBreakerService;
@Inject @Inject
public NodeServicesProvider(ThreadPool threadPool, IndicesQueryCache indicesQueryCache, @Nullable IndicesWarmer warmer, BigArrays bigArrays, Client client, ScriptService scriptService, IndicesQueriesRegistry indicesQueriesRegistry, CircuitBreakerService circuitBreakerService) { public NodeServicesProvider(ThreadPool threadPool, IndicesQueryCache indicesQueryCache, BigArrays bigArrays, Client client, ScriptService scriptService, IndicesQueriesRegistry indicesQueriesRegistry, CircuitBreakerService circuitBreakerService) {
this.threadPool = threadPool; this.threadPool = threadPool;
this.indicesQueryCache = indicesQueryCache; this.indicesQueryCache = indicesQueryCache;
this.warmer = warmer;
this.bigArrays = bigArrays; this.bigArrays = bigArrays;
this.client = client; this.client = client;
this.indicesQueriesRegistry = indicesQueriesRegistry; this.indicesQueriesRegistry = indicesQueriesRegistry;
@ -66,10 +62,6 @@ public final class NodeServicesProvider {
return indicesQueryCache; return indicesQueryCache;
} }
public IndicesWarmer getWarmer() {
return warmer;
}
public BigArrays getBigArrays() { return bigArrays; } public BigArrays getBigArrays() { return bigArrays; }
public Client getClient() { public Client getClient() {

View File

@ -48,8 +48,9 @@ import org.elasticsearch.index.mapper.object.ObjectMapper;
import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.ShardUtils; import org.elasticsearch.index.shard.ShardUtils;
import org.elasticsearch.indices.IndicesWarmer; import org.elasticsearch.index.IndexWarmer;
import org.elasticsearch.indices.IndicesWarmer.TerminationHandle; import org.elasticsearch.index.IndexWarmer.TerminationHandle;
import org.elasticsearch.threadpool.ThreadPool;
import java.io.Closeable; import java.io.Closeable;
import java.io.IOException; import java.io.IOException;
@ -74,22 +75,20 @@ public final class BitsetFilterCache extends AbstractIndexComponent implements L
private final boolean loadRandomAccessFiltersEagerly; private final boolean loadRandomAccessFiltersEagerly;
private final Cache<Object, Cache<Query, Value>> loadedFilters; private final Cache<Object, Cache<Query, Value>> loadedFilters;
private final Listener listener; private final Listener listener;
private final BitSetProducerWarmer warmer;
private final IndicesWarmer indicesWarmer;
public BitsetFilterCache(IndexSettings indexSettings, IndicesWarmer indicesWarmer, Listener listener) { public BitsetFilterCache(IndexSettings indexSettings, Listener listener) {
super(indexSettings); super(indexSettings);
if (listener == null) { if (listener == null) {
throw new IllegalArgumentException("listener must not be null"); throw new IllegalArgumentException("listener must not be null");
} }
this.loadRandomAccessFiltersEagerly = this.indexSettings.getValue(INDEX_LOAD_RANDOM_ACCESS_FILTERS_EAGERLY_SETTING); this.loadRandomAccessFiltersEagerly = this.indexSettings.getValue(INDEX_LOAD_RANDOM_ACCESS_FILTERS_EAGERLY_SETTING);
this.loadedFilters = CacheBuilder.<Object, Cache<Query, Value>>builder().removalListener(this).build(); this.loadedFilters = CacheBuilder.<Object, Cache<Query, Value>>builder().removalListener(this).build();
this.warmer = new BitSetProducerWarmer();
this.indicesWarmer = indicesWarmer;
indicesWarmer.addListener(warmer);
this.listener = listener; this.listener = listener;
} }
public IndexWarmer.Listener createListener(ThreadPool threadPool) {
return new BitSetProducerWarmer(threadPool);
}
public BitSetProducer getBitSetProducer(Query query) { public BitSetProducer getBitSetProducer(Query query) {
@ -103,11 +102,7 @@ public final class BitsetFilterCache extends AbstractIndexComponent implements L
@Override @Override
public void close() { public void close() {
try { clear("close");
indicesWarmer.removeListener(warmer);
} finally {
clear("close");
}
} }
public void clear(String reason) { public void clear(String reason) {
@ -210,10 +205,16 @@ public final class BitsetFilterCache extends AbstractIndexComponent implements L
} }
} }
final class BitSetProducerWarmer implements IndicesWarmer.Listener { final class BitSetProducerWarmer implements IndexWarmer.Listener {
private final Executor executor;
BitSetProducerWarmer(ThreadPool threadPool) {
this.executor = threadPool.executor(ThreadPool.Names.WARMER);
}
@Override @Override
public IndicesWarmer.TerminationHandle warmNewReaders(final IndexShard indexShard, final Engine.Searcher searcher) { public IndexWarmer.TerminationHandle warmNewReaders(final IndexShard indexShard, final Engine.Searcher searcher) {
if (indexSettings.getIndex().equals(indexShard.getIndexSettings().getIndex()) == false) { if (indexSettings.getIndex().equals(indexShard.getIndexSettings().getIndex()) == false) {
// this is from a different index // this is from a different index
return TerminationHandle.NO_WAIT; return TerminationHandle.NO_WAIT;
@ -244,7 +245,6 @@ public final class BitsetFilterCache extends AbstractIndexComponent implements L
warmUp.add(Queries.newNonNestedFilter()); warmUp.add(Queries.newNonNestedFilter());
} }
final Executor executor = indicesWarmer.getExecutor();
final CountDownLatch latch = new CountDownLatch(searcher.reader().leaves().size() * warmUp.size()); final CountDownLatch latch = new CountDownLatch(searcher.reader().leaves().size() * warmUp.size());
for (final LeafReaderContext ctx : searcher.reader().leaves()) { for (final LeafReaderContext ctx : searcher.reader().leaves()) {
for (final Query filterToWarm : warmUp) { for (final Query filterToWarm : warmUp) {
@ -277,7 +277,6 @@ public final class BitsetFilterCache extends AbstractIndexComponent implements L
return loadedFilters; return loadedFilters;
} }
/** /**
* A listener interface that is executed for each onCache / onRemoval event * A listener interface that is executed for each onCache / onRemoval event
*/ */
@ -295,6 +294,4 @@ public final class BitsetFilterCache extends AbstractIndexComponent implements L
*/ */
void onRemoval(ShardId shardId, Accountable accountable); void onRemoval(ShardId shardId, Accountable accountable);
} }
} }

View File

@ -105,7 +105,6 @@ import org.elasticsearch.index.translog.TranslogConfig;
import org.elasticsearch.index.translog.TranslogStats; import org.elasticsearch.index.translog.TranslogStats;
import org.elasticsearch.index.warmer.ShardIndexWarmerService; import org.elasticsearch.index.warmer.ShardIndexWarmerService;
import org.elasticsearch.index.warmer.WarmerStats; import org.elasticsearch.index.warmer.WarmerStats;
import org.elasticsearch.indices.IndicesWarmer;
import org.elasticsearch.indices.cache.query.IndicesQueryCache; import org.elasticsearch.indices.cache.query.IndicesQueryCache;
import org.elasticsearch.indices.IndexingMemoryController; import org.elasticsearch.indices.IndexingMemoryController;
import org.elasticsearch.indices.recovery.RecoveryFailedException; import org.elasticsearch.indices.recovery.RecoveryFailedException;
@ -151,7 +150,7 @@ public class IndexShard extends AbstractIndexShardComponent {
private final Object mutex = new Object(); private final Object mutex = new Object();
private final String checkIndexOnStartup; private final String checkIndexOnStartup;
private final CodecService codecService; private final CodecService codecService;
private final IndicesWarmer warmer; private final Engine.Warmer warmer;
private final SnapshotDeletionPolicy deletionPolicy; private final SnapshotDeletionPolicy deletionPolicy;
private final SimilarityService similarityService; private final SimilarityService similarityService;
private final EngineConfig engineConfig; private final EngineConfig engineConfig;
@ -208,12 +207,12 @@ public class IndexShard extends AbstractIndexShardComponent {
public IndexShard(ShardId shardId, IndexSettings indexSettings, ShardPath path, Store store, IndexCache indexCache, public IndexShard(ShardId shardId, IndexSettings indexSettings, ShardPath path, Store store, IndexCache indexCache,
MapperService mapperService, SimilarityService similarityService, IndexFieldDataService indexFieldDataService, MapperService mapperService, SimilarityService similarityService, IndexFieldDataService indexFieldDataService,
@Nullable EngineFactory engineFactory, @Nullable EngineFactory engineFactory,
IndexEventListener indexEventListener, IndexSearcherWrapper indexSearcherWrapper, NodeServicesProvider provider, SearchSlowLog slowLog, IndexingOperationListener... listeners) { IndexEventListener indexEventListener, IndexSearcherWrapper indexSearcherWrapper, NodeServicesProvider provider, SearchSlowLog slowLog, Engine.Warmer warmer, IndexingOperationListener... listeners) {
super(shardId, indexSettings); super(shardId, indexSettings);
final Settings settings = indexSettings.getSettings(); final Settings settings = indexSettings.getSettings();
this.idxSettings = indexSettings; this.idxSettings = indexSettings;
this.codecService = new CodecService(mapperService, logger); this.codecService = new CodecService(mapperService, logger);
this.warmer = provider.getWarmer(); this.warmer = warmer;
this.deletionPolicy = new SnapshotDeletionPolicy(new KeepOnlyLastCommitDeletionPolicy()); this.deletionPolicy = new SnapshotDeletionPolicy(new KeepOnlyLastCommitDeletionPolicy());
this.similarityService = similarityService; this.similarityService = similarityService;
Objects.requireNonNull(store, "Store must be provided to the index shard"); Objects.requireNonNull(store, "Store must be provided to the index shard");
@ -1389,9 +1388,8 @@ public class IndexShard extends AbstractIndexShardComponent {
recoveryState.getTranslog().incrementRecoveredOperations(); recoveryState.getTranslog().incrementRecoveredOperations();
} }
}; };
final Engine.Warmer engineWarmer = (searcher, toLevel) -> warmer.warm(searcher, this, idxSettings, toLevel);
return new EngineConfig(shardId, return new EngineConfig(shardId,
threadPool, indexSettings, engineWarmer, store, deletionPolicy, indexSettings.getMergePolicy(), threadPool, indexSettings, warmer, store, deletionPolicy, indexSettings.getMergePolicy(),
mapperService.indexAnalyzer(), similarityService.similarity(mapperService), codecService, shardEventListener, translogRecoveryPerformer, indexCache.query(), cachingPolicy, translogConfig, mapperService.indexAnalyzer(), similarityService.similarity(mapperService), codecService, shardEventListener, translogRecoveryPerformer, indexCache.query(), cachingPolicy, translogConfig,
idxSettings.getSettings().getAsTime(IndexingMemoryController.SHARD_INACTIVE_TIME_SETTING, IndexingMemoryController.SHARD_DEFAULT_INACTIVE_TIME)); idxSettings.getSettings().getAsTime(IndexingMemoryController.SHARD_INACTIVE_TIME_SETTING, IndexingMemoryController.SHARD_DEFAULT_INACTIVE_TIME));
} }

View File

@ -45,8 +45,8 @@ import java.io.IOException;
public final class ShadowIndexShard extends IndexShard { public final class ShadowIndexShard extends IndexShard {
public ShadowIndexShard(ShardId shardId, IndexSettings indexSettings, ShardPath path, Store store, IndexCache indexCache, MapperService mapperService, SimilarityService similarityService, IndexFieldDataService indexFieldDataService, @Nullable EngineFactory engineFactory, public ShadowIndexShard(ShardId shardId, IndexSettings indexSettings, ShardPath path, Store store, IndexCache indexCache, MapperService mapperService, SimilarityService similarityService, IndexFieldDataService indexFieldDataService, @Nullable EngineFactory engineFactory,
IndexEventListener indexEventListener, IndexSearcherWrapper wrapper, NodeServicesProvider provider, SearchSlowLog searchSlowLog) throws IOException { IndexEventListener indexEventListener, IndexSearcherWrapper wrapper, NodeServicesProvider provider, SearchSlowLog searchSlowLog, Engine.Warmer engineWarmer) throws IOException {
super(shardId, indexSettings, path, store, indexCache, mapperService, similarityService, indexFieldDataService, engineFactory, indexEventListener, wrapper, provider, searchSlowLog); super(shardId, indexSettings, path, store, indexCache, mapperService, similarityService, indexFieldDataService, engineFactory, indexEventListener, wrapper, provider, searchSlowLog, engineWarmer);
} }
/** /**

View File

@ -164,7 +164,6 @@ public class IndicesModule extends AbstractModule {
bind(IndicesRequestCache.class).asEagerSingleton(); bind(IndicesRequestCache.class).asEagerSingleton();
bind(TransportNodesListShardStoreMetaData.class).asEagerSingleton(); bind(TransportNodesListShardStoreMetaData.class).asEagerSingleton();
bind(IndicesTTLService.class).asEagerSingleton(); bind(IndicesTTLService.class).asEagerSingleton();
bind(IndicesWarmer.class).asEagerSingleton();
bind(UpdateHelper.class).asEagerSingleton(); bind(UpdateHelper.class).asEagerSingleton();
bind(MetaDataIndexUpgradeService.class).asEagerSingleton(); bind(MetaDataIndexUpgradeService.class).asEagerSingleton();
bind(NodeServicesProvider.class).asEagerSingleton(); bind(NodeServicesProvider.class).asEagerSingleton();

View File

@ -1,131 +0,0 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.indices;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardState;
import org.elasticsearch.threadpool.ThreadPool;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
/**
*/
public final class IndicesWarmer extends AbstractComponent {
private final ThreadPool threadPool;
private final CopyOnWriteArrayList<Listener> listeners = new CopyOnWriteArrayList<>();
@Inject
public IndicesWarmer(Settings settings, ThreadPool threadPool) {
super(settings);
this.threadPool = threadPool;
}
public void addListener(Listener listener) {
listeners.add(listener);
}
public void removeListener(Listener listener) {
listeners.remove(listener);
}
public void warm(Engine.Searcher searcher, IndexShard shard, IndexSettings settings, boolean isTopReader) {
if (shard.state() == IndexShardState.CLOSED) {
return;
}
if (settings.isWarmerEnabled() == false) {
return;
}
if (logger.isTraceEnabled()) {
if (isTopReader) {
logger.trace("{} top warming [{}]", shard.shardId(), searcher.reader());
} else {
logger.trace("{} warming [{}]", shard.shardId(), searcher.reader());
}
}
shard.warmerService().onPreWarm();
long time = System.nanoTime();
final List<TerminationHandle> terminationHandles = new ArrayList<>();
// get a handle on pending tasks
for (final Listener listener : listeners) {
if (isTopReader) {
terminationHandles.add(listener.warmTopReader(shard, searcher));
} else {
terminationHandles.add(listener.warmNewReaders(shard, searcher));
}
}
// wait for termination
for (TerminationHandle terminationHandle : terminationHandles) {
try {
terminationHandle.awaitTermination();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
if (isTopReader) {
logger.warn("top warming has been interrupted", e);
} else {
logger.warn("warming has been interrupted", e);
}
break;
}
}
long took = System.nanoTime() - time;
shard.warmerService().onPostWarm(took);
if (shard.warmerService().logger().isTraceEnabled()) {
if (isTopReader) {
shard.warmerService().logger().trace("top warming took [{}]", new TimeValue(took, TimeUnit.NANOSECONDS));
} else {
shard.warmerService().logger().trace("warming took [{}]", new TimeValue(took, TimeUnit.NANOSECONDS));
}
}
}
/**
* Returns an executor for async warmer tasks
*/
public Executor getExecutor() {
return threadPool.executor(ThreadPool.Names.WARMER);
}
/** A handle on the execution of warm-up action. */
public interface TerminationHandle {
TerminationHandle NO_WAIT = () -> {};
/** Wait until execution of the warm-up action completes. */
void awaitTermination() throws InterruptedException;
}
public interface Listener {
/** Queue tasks to warm-up the given segments and return handles that allow to wait for termination of the execution of those tasks. */
TerminationHandle warmNewReaders(IndexShard indexShard, Engine.Searcher searcher);
TerminationHandle warmTopReader(IndexShard indexShard, Engine.Searcher searcher);
}
}

View File

@ -20,13 +20,6 @@
package org.elasticsearch.search; package org.elasticsearch.search;
import com.carrotsearch.hppc.ObjectFloatHashMap; import com.carrotsearch.hppc.ObjectFloatHashMap;
import com.carrotsearch.hppc.ObjectHashSet;
import com.carrotsearch.hppc.ObjectSet;
import com.carrotsearch.hppc.cursors.ObjectCursor;
import org.apache.lucene.index.IndexOptions;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.NumericDocValues;
import org.apache.lucene.search.FieldDoc; import org.apache.lucene.search.FieldDoc;
import org.apache.lucene.search.TopDocs; import org.apache.lucene.search.TopDocs;
import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.ExceptionsHelper;
@ -54,14 +47,6 @@ import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.Index; import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexService; import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.fielddata.FieldDataType;
import org.elasticsearch.index.fielddata.IndexFieldData;
import org.elasticsearch.index.fielddata.IndexFieldDataService;
import org.elasticsearch.index.mapper.DocumentMapper;
import org.elasticsearch.index.mapper.FieldMapper;
import org.elasticsearch.index.mapper.MappedFieldType;
import org.elasticsearch.index.mapper.MappedFieldType.Loading;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.query.QueryParseContext; import org.elasticsearch.index.query.QueryParseContext;
import org.elasticsearch.index.query.QueryShardContext; import org.elasticsearch.index.query.QueryShardContext;
import org.elasticsearch.index.search.stats.ShardSearchStats; import org.elasticsearch.index.search.stats.ShardSearchStats;
@ -69,8 +54,6 @@ import org.elasticsearch.index.search.stats.StatsGroupsParseElement;
import org.elasticsearch.index.shard.IndexEventListener; import org.elasticsearch.index.shard.IndexEventListener;
import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.IndicesWarmer;
import org.elasticsearch.indices.IndicesWarmer.TerminationHandle;
import org.elasticsearch.indices.cache.request.IndicesRequestCache; import org.elasticsearch.indices.cache.request.IndicesRequestCache;
import org.elasticsearch.script.ExecutableScript; import org.elasticsearch.script.ExecutableScript;
import org.elasticsearch.script.ScriptContext; import org.elasticsearch.script.ScriptContext;
@ -109,9 +92,7 @@ import java.io.IOException;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
@ -124,7 +105,6 @@ import static org.elasticsearch.common.unit.TimeValue.timeValueMinutes;
*/ */
public class SearchService extends AbstractLifecycleComponent<SearchService> implements IndexEventListener { public class SearchService extends AbstractLifecycleComponent<SearchService> implements IndexEventListener {
public static final Setting<Loading> INDEX_NORMS_LOADING_SETTING = new Setting<>("index.norms.loading", Loading.LAZY.toString(), (s) -> Loading.parse(s, Loading.LAZY), false, Setting.Scope.INDEX);
// we can have 5 minutes here, since we make sure to clean with search requests and when shard/index closes // we can have 5 minutes here, since we make sure to clean with search requests and when shard/index closes
public static final Setting<TimeValue> DEFAULT_KEEPALIVE_SETTING = Setting.positiveTimeSetting("search.default_keep_alive", timeValueMinutes(5), false, Setting.Scope.CLUSTER); public static final Setting<TimeValue> DEFAULT_KEEPALIVE_SETTING = Setting.positiveTimeSetting("search.default_keep_alive", timeValueMinutes(5), false, Setting.Scope.CLUSTER);
public static final Setting<TimeValue> KEEPALIVE_INTERVAL_SETTING = Setting.positiveTimeSetting("search.keep_alive_interval", timeValueMinutes(1), false, Setting.Scope.CLUSTER); public static final Setting<TimeValue> KEEPALIVE_INTERVAL_SETTING = Setting.positiveTimeSetting("search.keep_alive_interval", timeValueMinutes(1), false, Setting.Scope.CLUSTER);
@ -139,8 +119,6 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> imp
private final IndicesService indicesService; private final IndicesService indicesService;
private final IndicesWarmer indicesWarmer;
private final ScriptService scriptService; private final ScriptService scriptService;
private final PageCacheRecycler pageCacheRecycler; private final PageCacheRecycler pageCacheRecycler;
@ -170,7 +148,7 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> imp
private final ParseFieldMatcher parseFieldMatcher; private final ParseFieldMatcher parseFieldMatcher;
@Inject @Inject
public SearchService(Settings settings, ClusterSettings clusterSettings, ClusterService clusterService, IndicesService indicesService, IndicesWarmer indicesWarmer, ThreadPool threadPool, public SearchService(Settings settings, ClusterSettings clusterSettings, ClusterService clusterService, IndicesService indicesService, ThreadPool threadPool,
ScriptService scriptService, PageCacheRecycler pageCacheRecycler, BigArrays bigArrays, DfsPhase dfsPhase, QueryPhase queryPhase, FetchPhase fetchPhase, ScriptService scriptService, PageCacheRecycler pageCacheRecycler, BigArrays bigArrays, DfsPhase dfsPhase, QueryPhase queryPhase, FetchPhase fetchPhase,
IndicesRequestCache indicesQueryCache) { IndicesRequestCache indicesQueryCache) {
super(settings); super(settings);
@ -178,7 +156,6 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> imp
this.threadPool = threadPool; this.threadPool = threadPool;
this.clusterService = clusterService; this.clusterService = clusterService;
this.indicesService = indicesService; this.indicesService = indicesService;
this.indicesWarmer = indicesWarmer;
this.scriptService = scriptService; this.scriptService = scriptService;
this.pageCacheRecycler = pageCacheRecycler; this.pageCacheRecycler = pageCacheRecycler;
this.bigArrays = bigArrays; this.bigArrays = bigArrays;
@ -199,9 +176,6 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> imp
this.keepAliveReaper = threadPool.scheduleWithFixedDelay(new Reaper(), keepAliveInterval); this.keepAliveReaper = threadPool.scheduleWithFixedDelay(new Reaper(), keepAliveInterval);
this.indicesWarmer.addListener(new NormsWarmer(indicesWarmer));
this.indicesWarmer.addListener(new FieldDataWarmer(indicesWarmer));
defaultSearchTimeout = DEFAULT_SEARCH_TIMEOUT_SETTING.get(settings); defaultSearchTimeout = DEFAULT_SEARCH_TIMEOUT_SETTING.get(settings);
clusterSettings.addSettingsUpdateConsumer(DEFAULT_SEARCH_TIMEOUT_SETTING, this::setDefaultSearchTimeout); clusterSettings.addSettingsUpdateConsumer(DEFAULT_SEARCH_TIMEOUT_SETTING, this::setDefaultSearchTimeout);
} }
@ -946,184 +920,6 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> imp
return this.activeContexts.size(); return this.activeContexts.size();
} }
static class NormsWarmer implements IndicesWarmer.Listener {
private final IndicesWarmer indicesWarmer;
public NormsWarmer(IndicesWarmer indicesWarmer) {
this.indicesWarmer = indicesWarmer;
}
@Override
public TerminationHandle warmNewReaders(final IndexShard indexShard, final Engine.Searcher searcher) {
final Loading defaultLoading = indexShard.indexSettings().getValue(INDEX_NORMS_LOADING_SETTING);
final MapperService mapperService = indexShard.mapperService();
final ObjectSet<String> warmUp = new ObjectHashSet<>();
for (DocumentMapper docMapper : mapperService.docMappers(false)) {
for (FieldMapper fieldMapper : docMapper.mappers()) {
final String indexName = fieldMapper.fieldType().name();
Loading normsLoading = fieldMapper.fieldType().normsLoading();
if (normsLoading == null) {
normsLoading = defaultLoading;
}
if (fieldMapper.fieldType().indexOptions() != IndexOptions.NONE && !fieldMapper.fieldType().omitNorms() && normsLoading == Loading.EAGER) {
warmUp.add(indexName);
}
}
}
final CountDownLatch latch = new CountDownLatch(1);
// Norms loading may be I/O intensive but is not CPU intensive, so we execute it in a single task
indicesWarmer.getExecutor().execute(new Runnable() {
@Override
public void run() {
try {
for (ObjectCursor<String> stringObjectCursor : warmUp) {
final String indexName = stringObjectCursor.value;
final long start = System.nanoTime();
for (final LeafReaderContext ctx : searcher.reader().leaves()) {
final NumericDocValues values = ctx.reader().getNormValues(indexName);
if (values != null) {
values.get(0);
}
}
if (indexShard.warmerService().logger().isTraceEnabled()) {
indexShard.warmerService().logger().trace("warmed norms for [{}], took [{}]", indexName, TimeValue.timeValueNanos(System.nanoTime() - start));
}
}
} catch (Throwable t) {
indexShard.warmerService().logger().warn("failed to warm-up norms", t);
} finally {
latch.countDown();
}
}
});
return new TerminationHandle() {
@Override
public void awaitTermination() throws InterruptedException {
latch.await();
}
};
}
@Override
public TerminationHandle warmTopReader(IndexShard indexShard, final Engine.Searcher searcher) {
return TerminationHandle.NO_WAIT;
}
}
static class FieldDataWarmer implements IndicesWarmer.Listener {
private final IndicesWarmer indicesWarmer;
public FieldDataWarmer(IndicesWarmer indicesWarmer) {
this.indicesWarmer = indicesWarmer;
}
@Override
public TerminationHandle warmNewReaders(final IndexShard indexShard, final Engine.Searcher searcher) {
final MapperService mapperService = indexShard.mapperService();
final Map<String, MappedFieldType> warmUp = new HashMap<>();
for (DocumentMapper docMapper : mapperService.docMappers(false)) {
for (FieldMapper fieldMapper : docMapper.mappers()) {
final FieldDataType fieldDataType = fieldMapper.fieldType().fieldDataType();
final String indexName = fieldMapper.fieldType().name();
if (fieldDataType == null) {
continue;
}
if (fieldDataType.getLoading() == Loading.LAZY) {
continue;
}
if (warmUp.containsKey(indexName)) {
continue;
}
warmUp.put(indexName, fieldMapper.fieldType());
}
}
final IndexFieldDataService indexFieldDataService = indexShard.indexFieldDataService();
final Executor executor = indicesWarmer.getExecutor();
final CountDownLatch latch = new CountDownLatch(searcher.reader().leaves().size() * warmUp.size());
for (final LeafReaderContext ctx : searcher.reader().leaves()) {
for (final MappedFieldType fieldType : warmUp.values()) {
executor.execute(new Runnable() {
@Override
public void run() {
try {
final long start = System.nanoTime();
indexFieldDataService.getForField(fieldType).load(ctx);
if (indexShard.warmerService().logger().isTraceEnabled()) {
indexShard.warmerService().logger().trace("warmed fielddata for [{}], took [{}]", fieldType.name(), TimeValue.timeValueNanos(System.nanoTime() - start));
}
} catch (Throwable t) {
indexShard.warmerService().logger().warn("failed to warm-up fielddata for [{}]", t, fieldType.name());
} finally {
latch.countDown();
}
}
});
}
}
return new TerminationHandle() {
@Override
public void awaitTermination() throws InterruptedException {
latch.await();
}
};
}
@Override
public TerminationHandle warmTopReader(final IndexShard indexShard, final Engine.Searcher searcher) {
final MapperService mapperService = indexShard.mapperService();
final Map<String, MappedFieldType> warmUpGlobalOrdinals = new HashMap<>();
for (DocumentMapper docMapper : mapperService.docMappers(false)) {
for (FieldMapper fieldMapper : docMapper.mappers()) {
final FieldDataType fieldDataType = fieldMapper.fieldType().fieldDataType();
final String indexName = fieldMapper.fieldType().name();
if (fieldDataType == null) {
continue;
}
if (fieldDataType.getLoading() != Loading.EAGER_GLOBAL_ORDINALS) {
continue;
}
if (warmUpGlobalOrdinals.containsKey(indexName)) {
continue;
}
warmUpGlobalOrdinals.put(indexName, fieldMapper.fieldType());
}
}
final IndexFieldDataService indexFieldDataService = indexShard.indexFieldDataService();
final Executor executor = indicesWarmer.getExecutor();
final CountDownLatch latch = new CountDownLatch(warmUpGlobalOrdinals.size());
for (final MappedFieldType fieldType : warmUpGlobalOrdinals.values()) {
executor.execute(new Runnable() {
@Override
public void run() {
try {
final long start = System.nanoTime();
IndexFieldData.Global ifd = indexFieldDataService.getForField(fieldType);
ifd.loadGlobal(searcher.getDirectoryReader());
if (indexShard.warmerService().logger().isTraceEnabled()) {
indexShard.warmerService().logger().trace("warmed global ordinals for [{}], took [{}]", fieldType.name(), TimeValue.timeValueNanos(System.nanoTime() - start));
}
} catch (Throwable t) {
indexShard.warmerService().logger().warn("failed to warm-up global ordinals for [{}]", t, fieldType.name());
} finally {
latch.countDown();
}
}
});
}
return new TerminationHandle() {
@Override
public void awaitTermination() throws InterruptedException {
latch.await();
}
};
}
}
class Reaper implements Runnable { class Reaper implements Runnable {
@Override @Override
public void run() { public void run() {

View File

@ -53,7 +53,6 @@ import org.elasticsearch.index.similarity.SimilarityService;
import org.elasticsearch.index.store.IndexStore; import org.elasticsearch.index.store.IndexStore;
import org.elasticsearch.index.store.IndexStoreConfig; import org.elasticsearch.index.store.IndexStoreConfig;
import org.elasticsearch.indices.IndicesModule; import org.elasticsearch.indices.IndicesModule;
import org.elasticsearch.indices.IndicesWarmer;
import org.elasticsearch.indices.breaker.CircuitBreakerService; import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import org.elasticsearch.indices.cache.query.IndicesQueryCache; import org.elasticsearch.indices.cache.query.IndicesQueryCache;
@ -104,7 +103,6 @@ public class IndexModuleTests extends ESTestCase {
static NodeServicesProvider newNodeServiceProvider(Settings settings, Environment environment, Client client, ScriptEngineService... scriptEngineServices) throws IOException { static NodeServicesProvider newNodeServiceProvider(Settings settings, Environment environment, Client client, ScriptEngineService... scriptEngineServices) throws IOException {
// TODO this can be used in other place too - lets first refactor the IndicesQueriesRegistry // TODO this can be used in other place too - lets first refactor the IndicesQueriesRegistry
ThreadPool threadPool = new ThreadPool("test"); ThreadPool threadPool = new ThreadPool("test");
IndicesWarmer warmer = new IndicesWarmer(settings, threadPool);
IndicesQueryCache indicesQueryCache = new IndicesQueryCache(settings); IndicesQueryCache indicesQueryCache = new IndicesQueryCache(settings);
CircuitBreakerService circuitBreakerService = new NoneCircuitBreakerService(); CircuitBreakerService circuitBreakerService = new NoneCircuitBreakerService();
PageCacheRecycler recycler = new PageCacheRecycler(settings, threadPool); PageCacheRecycler recycler = new PageCacheRecycler(settings, threadPool);
@ -116,7 +114,7 @@ public class IndexModuleTests extends ESTestCase {
ScriptSettings scriptSettings = new ScriptSettings(scriptEngineRegistry, scriptContextRegistry); ScriptSettings scriptSettings = new ScriptSettings(scriptEngineRegistry, scriptContextRegistry);
ScriptService scriptService = new ScriptService(settings, environment, scriptEngines, new ResourceWatcherService(settings, threadPool), scriptEngineRegistry, scriptContextRegistry, scriptSettings); ScriptService scriptService = new ScriptService(settings, environment, scriptEngines, new ResourceWatcherService(settings, threadPool), scriptEngineRegistry, scriptContextRegistry, scriptSettings);
IndicesQueriesRegistry indicesQueriesRegistry = new IndicesQueriesRegistry(settings, emptyMap()); IndicesQueriesRegistry indicesQueriesRegistry = new IndicesQueriesRegistry(settings, emptyMap());
return new NodeServicesProvider(threadPool, indicesQueryCache, warmer, bigArrays, client, scriptService, indicesQueriesRegistry, circuitBreakerService); return new NodeServicesProvider(threadPool, indicesQueryCache, bigArrays, client, scriptService, indicesQueriesRegistry, circuitBreakerService);
} }
@Override @Override

View File

@ -41,10 +41,8 @@ import org.apache.lucene.util.BitSet;
import org.apache.lucene.util.IOUtils; import org.apache.lucene.util.IOUtils;
import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader; import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexSettings; import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndicesWarmer;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.IndexSettingsModule; import org.elasticsearch.test.IndexSettingsModule;
@ -57,8 +55,6 @@ import static org.hamcrest.Matchers.equalTo;
public class BitSetFilterCacheTests extends ESTestCase { public class BitSetFilterCacheTests extends ESTestCase {
private static final IndexSettings INDEX_SETTINGS = IndexSettingsModule.newIndexSettings("test", Settings.EMPTY); private static final IndexSettings INDEX_SETTINGS = IndexSettingsModule.newIndexSettings("test", Settings.EMPTY);
private final IndicesWarmer warmer = new IndicesWarmer(Settings.EMPTY, null);
private static int matchCount(BitSetProducer producer, IndexReader reader) throws IOException { private static int matchCount(BitSetProducer producer, IndexReader reader) throws IOException {
int count = 0; int count = 0;
@ -95,7 +91,7 @@ public class BitSetFilterCacheTests extends ESTestCase {
reader = ElasticsearchDirectoryReader.wrap(reader, new ShardId("test", "_na_", 0)); reader = ElasticsearchDirectoryReader.wrap(reader, new ShardId("test", "_na_", 0));
IndexSearcher searcher = new IndexSearcher(reader); IndexSearcher searcher = new IndexSearcher(reader);
BitsetFilterCache cache = new BitsetFilterCache(INDEX_SETTINGS, warmer, new BitsetFilterCache.Listener() { BitsetFilterCache cache = new BitsetFilterCache(INDEX_SETTINGS, new BitsetFilterCache.Listener() {
@Override @Override
public void onCache(ShardId shardId, Accountable accountable) { public void onCache(ShardId shardId, Accountable accountable) {
@ -149,7 +145,7 @@ public class BitSetFilterCacheTests extends ESTestCase {
final AtomicInteger onCacheCalls = new AtomicInteger(); final AtomicInteger onCacheCalls = new AtomicInteger();
final AtomicInteger onRemoveCalls = new AtomicInteger(); final AtomicInteger onRemoveCalls = new AtomicInteger();
final BitsetFilterCache cache = new BitsetFilterCache(INDEX_SETTINGS, warmer, new BitsetFilterCache.Listener() { final BitsetFilterCache cache = new BitsetFilterCache(INDEX_SETTINGS, new BitsetFilterCache.Listener() {
@Override @Override
public void onCache(ShardId shardId, Accountable accountable) { public void onCache(ShardId shardId, Accountable accountable) {
onCacheCalls.incrementAndGet(); onCacheCalls.incrementAndGet();
@ -188,7 +184,7 @@ public class BitSetFilterCacheTests extends ESTestCase {
public void testSetNullListener() { public void testSetNullListener() {
try { try {
new BitsetFilterCache(INDEX_SETTINGS, warmer, null); new BitsetFilterCache(INDEX_SETTINGS, null);
fail("listener can't be null"); fail("listener can't be null");
} catch (IllegalArgumentException ex) { } catch (IllegalArgumentException ex) {
assertEquals("listener must not be null", ex.getMessage()); assertEquals("listener must not be null", ex.getMessage());
@ -197,7 +193,7 @@ public class BitSetFilterCacheTests extends ESTestCase {
} }
public void testRejectOtherIndex() throws IOException { public void testRejectOtherIndex() throws IOException {
BitsetFilterCache cache = new BitsetFilterCache(INDEX_SETTINGS, warmer, new BitsetFilterCache.Listener() { BitsetFilterCache cache = new BitsetFilterCache(INDEX_SETTINGS, new BitsetFilterCache.Listener() {
@Override @Override
public void onCache(ShardId shardId, Accountable accountable) { public void onCache(ShardId shardId, Accountable accountable) {

View File

@ -78,7 +78,6 @@ import org.elasticsearch.index.query.support.QueryParsers;
import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.similarity.SimilarityService; import org.elasticsearch.index.similarity.SimilarityService;
import org.elasticsearch.indices.IndicesModule; import org.elasticsearch.indices.IndicesModule;
import org.elasticsearch.indices.IndicesWarmer;
import org.elasticsearch.indices.breaker.CircuitBreakerService; import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache; import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache;
@ -275,7 +274,7 @@ public abstract class AbstractQueryTestCase<QB extends AbstractQueryBuilder<QB>>
IndicesFieldDataCache indicesFieldDataCache = new IndicesFieldDataCache(settings, new IndexFieldDataCache.Listener() { IndicesFieldDataCache indicesFieldDataCache = new IndicesFieldDataCache(settings, new IndexFieldDataCache.Listener() {
}); });
indexFieldDataService = new IndexFieldDataService(idxSettings, indicesFieldDataCache, injector.getInstance(CircuitBreakerService.class), mapperService); indexFieldDataService = new IndexFieldDataService(idxSettings, indicesFieldDataCache, injector.getInstance(CircuitBreakerService.class), mapperService);
BitsetFilterCache bitsetFilterCache = new BitsetFilterCache(idxSettings, new IndicesWarmer(idxSettings.getNodeSettings(), null), new BitsetFilterCache.Listener() { BitsetFilterCache bitsetFilterCache = new BitsetFilterCache(idxSettings, new BitsetFilterCache.Listener() {
@Override @Override
public void onCache(ShardId shardId, Accountable accountable) { public void onCache(ShardId shardId, Accountable accountable) {

View File

@ -1154,7 +1154,7 @@ public class IndexShardTests extends ESSingleNodeTestCase {
ShardRouting routing = new ShardRouting(shard.routingEntry()); ShardRouting routing = new ShardRouting(shard.routingEntry());
shard.close("simon says", true); shard.close("simon says", true);
NodeServicesProvider indexServices = indexService.getIndexServices(); NodeServicesProvider indexServices = indexService.getIndexServices();
IndexShard newShard = new IndexShard(shard.shardId(), indexService.getIndexSettings(), shard.shardPath(), shard.store(), indexService.cache(), indexService.mapperService(), indexService.similarityService(), indexService.fieldData(), shard.getEngineFactory(), indexService.getIndexEventListener(), wrapper, indexServices, indexService.getSearchSlowLog(), listeners); IndexShard newShard = new IndexShard(shard.shardId(), indexService.getIndexSettings(), shard.shardPath(), shard.store(), indexService.cache(), indexService.mapperService(), indexService.similarityService(), indexService.fieldData(), shard.getEngineFactory(), indexService.getIndexEventListener(), wrapper, indexServices, indexService.getSearchSlowLog(), null, listeners);
ShardRoutingHelper.reinit(routing); ShardRoutingHelper.reinit(routing);
newShard.updateRoutingEntry(routing, false); newShard.updateRoutingEntry(routing, false);
DiscoveryNode localNode = new DiscoveryNode("foo", DummyTransportAddress.INSTANCE, Version.CURRENT); DiscoveryNode localNode = new DiscoveryNode("foo", DummyTransportAddress.INSTANCE, Version.CURRENT);

View File

@ -52,7 +52,6 @@ import org.elasticsearch.index.query.functionscore.ScoreFunctionParser;
import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.similarity.SimilarityService; import org.elasticsearch.index.similarity.SimilarityService;
import org.elasticsearch.indices.IndicesModule; import org.elasticsearch.indices.IndicesModule;
import org.elasticsearch.indices.IndicesWarmer;
import org.elasticsearch.indices.breaker.CircuitBreakerService; import org.elasticsearch.indices.breaker.CircuitBreakerService;
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache; import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache;
@ -141,7 +140,7 @@ public class TemplateQueryParserTests extends ESTestCase {
MapperService mapperService = new MapperService(idxSettings, analysisService, similarityService, mapperRegistry, () -> context); MapperService mapperService = new MapperService(idxSettings, analysisService, similarityService, mapperRegistry, () -> context);
IndicesFieldDataCache cache = new IndicesFieldDataCache(settings, new IndexFieldDataCache.Listener() {}); IndicesFieldDataCache cache = new IndicesFieldDataCache(settings, new IndexFieldDataCache.Listener() {});
IndexFieldDataService indexFieldDataService =new IndexFieldDataService(idxSettings, cache, injector.getInstance(CircuitBreakerService.class), mapperService); IndexFieldDataService indexFieldDataService =new IndexFieldDataService(idxSettings, cache, injector.getInstance(CircuitBreakerService.class), mapperService);
BitsetFilterCache bitsetFilterCache = new BitsetFilterCache(idxSettings, new IndicesWarmer(idxSettings.getNodeSettings(), null), new BitsetFilterCache.Listener() { BitsetFilterCache bitsetFilterCache = new BitsetFilterCache(idxSettings, new BitsetFilterCache.Listener() {
@Override @Override
public void onCache(ShardId shardId, Accountable accountable) { public void onCache(ShardId shardId, Accountable accountable) {

View File

@ -26,7 +26,6 @@ import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.IndicesWarmer;
import org.elasticsearch.indices.cache.request.IndicesRequestCache; import org.elasticsearch.indices.cache.request.IndicesRequestCache;
import org.elasticsearch.plugins.Plugin; import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.script.ScriptService; import org.elasticsearch.script.ScriptService;
@ -67,10 +66,10 @@ public class MockSearchService extends SearchService {
} }
@Inject @Inject
public MockSearchService(Settings settings, ClusterSettings clusterSettings, ClusterService clusterService, IndicesService indicesService, IndicesWarmer indicesWarmer, public MockSearchService(Settings settings, ClusterSettings clusterSettings, ClusterService clusterService, IndicesService indicesService,
ThreadPool threadPool, ScriptService scriptService, PageCacheRecycler pageCacheRecycler, BigArrays bigArrays, ThreadPool threadPool, ScriptService scriptService, PageCacheRecycler pageCacheRecycler, BigArrays bigArrays,
DfsPhase dfsPhase, QueryPhase queryPhase, FetchPhase fetchPhase, IndicesRequestCache indicesQueryCache) { DfsPhase dfsPhase, QueryPhase queryPhase, FetchPhase fetchPhase, IndicesRequestCache indicesQueryCache) {
super(settings, clusterSettings, clusterService, indicesService, indicesWarmer, threadPool, scriptService, pageCacheRecycler, bigArrays, dfsPhase, super(settings, clusterSettings, clusterService, indicesService, threadPool, scriptService, pageCacheRecycler, bigArrays, dfsPhase,
queryPhase, fetchPhase, indicesQueryCache); queryPhase, fetchPhase, indicesQueryCache);
} }

View File

@ -47,7 +47,6 @@ import org.elasticsearch.action.admin.indices.segments.IndicesSegmentResponse;
import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequestBuilder; import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequestBuilder;
import org.elasticsearch.action.bulk.BulkRequestBuilder; import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequestBuilder; import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.index.IndexResponse;
@ -108,6 +107,7 @@ import org.elasticsearch.index.MergePolicyConfig;
import org.elasticsearch.index.MergeSchedulerConfig; import org.elasticsearch.index.MergeSchedulerConfig;
import org.elasticsearch.index.translog.Translog; import org.elasticsearch.index.translog.Translog;
import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.index.IndexWarmer;
import org.elasticsearch.indices.cache.request.IndicesRequestCache; import org.elasticsearch.indices.cache.request.IndicesRequestCache;
import org.elasticsearch.indices.store.IndicesStore; import org.elasticsearch.indices.store.IndicesStore;
import org.elasticsearch.node.NodeMocksPlugin; import org.elasticsearch.node.NodeMocksPlugin;
@ -115,7 +115,6 @@ import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.rest.RestStatus; import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.MockSearchService; import org.elasticsearch.search.MockSearchService;
import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchService;
import org.elasticsearch.test.client.RandomizingClient; import org.elasticsearch.test.client.RandomizingClient;
import org.elasticsearch.test.disruption.ServiceDisruptionScheme; import org.elasticsearch.test.disruption.ServiceDisruptionScheme;
import org.elasticsearch.test.rest.client.http.HttpRequestBuilder; import org.elasticsearch.test.rest.client.http.HttpRequestBuilder;
@ -129,7 +128,6 @@ import org.junit.AfterClass;
import org.junit.Before; import org.junit.Before;
import org.junit.BeforeClass; import org.junit.BeforeClass;
import java.io.Closeable;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.lang.annotation.Annotation; import java.lang.annotation.Annotation;
@ -510,7 +508,7 @@ public abstract class ESIntegTestCase extends ESTestCase {
private static Settings.Builder setRandomIndexNormsLoading(Random random, Settings.Builder builder) { private static Settings.Builder setRandomIndexNormsLoading(Random random, Settings.Builder builder) {
if (random.nextBoolean()) { if (random.nextBoolean()) {
builder.put(SearchService.INDEX_NORMS_LOADING_SETTING.getKey(), RandomPicks.randomFrom(random, Arrays.asList(MappedFieldType.Loading.EAGER, MappedFieldType.Loading.LAZY))); builder.put(IndexWarmer.INDEX_NORMS_LOADING_SETTING.getKey(), RandomPicks.randomFrom(random, Arrays.asList(MappedFieldType.Loading.EAGER, MappedFieldType.Loading.LAZY)));
} }
return builder; return builder;
} }