Merge pull request #16470 from s1monw/make_indices_warmer_private
Make IndicesWarmer a private class of IndexService
This commit is contained in:
commit
f1d1c3c79f
|
@ -38,8 +38,8 @@ import org.elasticsearch.index.percolator.PercolatorQueriesRegistry;
|
|||
import org.elasticsearch.index.store.FsDirectoryService;
|
||||
import org.elasticsearch.index.store.IndexStore;
|
||||
import org.elasticsearch.index.store.Store;
|
||||
import org.elasticsearch.index.IndexWarmer;
|
||||
import org.elasticsearch.indices.cache.request.IndicesRequestCache;
|
||||
import org.elasticsearch.search.SearchService;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
|
@ -132,7 +132,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
|
|||
PrimaryShardAllocator.INDEX_RECOVERY_INITIAL_SHARDS_SETTING,
|
||||
FsDirectoryService.INDEX_LOCK_FACTOR_SETTING,
|
||||
EngineConfig.INDEX_CODEC_SETTING,
|
||||
SearchService.INDEX_NORMS_LOADING_SETTING,
|
||||
IndexWarmer.INDEX_NORMS_LOADING_SETTING,
|
||||
// this sucks but we can't really validate all the analyzers/similarity in here
|
||||
Setting.groupSetting("index.similarity.", false, Setting.Scope.INDEX), // this allows similarity settings to be passed
|
||||
Setting.groupSetting("index.analysis.", false, Setting.Scope.INDEX) // this allows analysis settings to be passed
|
||||
|
|
|
@ -54,6 +54,7 @@ import org.elasticsearch.index.analysis.AnalysisService;
|
|||
import org.elasticsearch.index.cache.IndexCache;
|
||||
import org.elasticsearch.index.cache.bitset.BitsetFilterCache;
|
||||
import org.elasticsearch.index.cache.query.QueryCache;
|
||||
import org.elasticsearch.index.engine.Engine;
|
||||
import org.elasticsearch.index.engine.EngineClosedException;
|
||||
import org.elasticsearch.index.engine.EngineFactory;
|
||||
import org.elasticsearch.index.fielddata.FieldDataType;
|
||||
|
@ -102,6 +103,7 @@ public final class IndexService extends AbstractIndexComponent implements IndexC
|
|||
private final MapperService mapperService;
|
||||
private final SimilarityService similarityService;
|
||||
private final EngineFactory engineFactory;
|
||||
private final IndexWarmer warmer;
|
||||
private volatile Map<Integer, IndexShard> shards = emptyMap();
|
||||
private final AtomicBoolean closed = new AtomicBoolean(false);
|
||||
private final AtomicBoolean deleted = new AtomicBoolean(false);
|
||||
|
@ -137,7 +139,8 @@ public final class IndexService extends AbstractIndexComponent implements IndexC
|
|||
this.nodeServicesProvider = nodeServicesProvider;
|
||||
this.indexStore = indexStore;
|
||||
indexFieldData.setListener(new FieldDataCacheListener(this));
|
||||
this.bitsetFilterCache = new BitsetFilterCache(indexSettings, nodeServicesProvider.getWarmer(), new BitsetCacheListener(this));
|
||||
this.bitsetFilterCache = new BitsetFilterCache(indexSettings, new BitsetCacheListener(this));
|
||||
this.warmer = new IndexWarmer(indexSettings.getSettings(), nodeServicesProvider.getThreadPool(), bitsetFilterCache.createListener(nodeServicesProvider.getThreadPool()));
|
||||
this.indexCache = new IndexCache(indexSettings, queryCache, bitsetFilterCache);
|
||||
this.engineFactory = engineFactory;
|
||||
// initialize this last -- otherwise if the wrapper requires any other member to be non-null we fail with an NPE
|
||||
|
@ -312,11 +315,18 @@ public final class IndexService extends AbstractIndexComponent implements IndexC
|
|||
// if we are on a shared FS we only own the shard (ie. we can safely delete it) if we are the primary.
|
||||
final boolean canDeleteShardContent = IndexMetaData.isOnSharedFilesystem(indexSettings) == false ||
|
||||
(primary && IndexMetaData.isOnSharedFilesystem(indexSettings));
|
||||
final Engine.Warmer engineWarmer = (searcher, toLevel) -> {
|
||||
IndexShard shard = getShardOrNull(shardId.getId());
|
||||
if (shard != null) {
|
||||
warmer.warm(searcher, shard, IndexService.this.indexSettings, toLevel);
|
||||
}
|
||||
};
|
||||
|
||||
store = new Store(shardId, this.indexSettings, indexStore.newDirectoryService(path), lock, new StoreCloseListener(shardId, canDeleteShardContent, () -> nodeServicesProvider.getIndicesQueryCache().onClose(shardId)));
|
||||
if (useShadowEngine(primary, indexSettings)) {
|
||||
indexShard = new ShadowIndexShard(shardId, this.indexSettings, path, store, indexCache, mapperService, similarityService, indexFieldData, engineFactory, eventListener, searcherWrapper, nodeServicesProvider, searchSlowLog); // no indexing listeners - shadow engines don't index
|
||||
indexShard = new ShadowIndexShard(shardId, this.indexSettings, path, store, indexCache, mapperService, similarityService, indexFieldData, engineFactory, eventListener, searcherWrapper, nodeServicesProvider, searchSlowLog, engineWarmer); // no indexing listeners - shadow engines don't index
|
||||
} else {
|
||||
indexShard = new IndexShard(shardId, this.indexSettings, path, store, indexCache, mapperService, similarityService, indexFieldData, engineFactory, eventListener, searcherWrapper, nodeServicesProvider, searchSlowLog, listeners);
|
||||
indexShard = new IndexShard(shardId, this.indexSettings, path, store, indexCache, mapperService, similarityService, indexFieldData, engineFactory, eventListener, searcherWrapper, nodeServicesProvider, searchSlowLog, engineWarmer, listeners);
|
||||
}
|
||||
eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created");
|
||||
eventListener.afterIndexShardCreated(indexShard);
|
||||
|
|
|
@ -0,0 +1,292 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.index;
|
||||
|
||||
import com.carrotsearch.hppc.ObjectHashSet;
|
||||
import com.carrotsearch.hppc.ObjectSet;
|
||||
import com.carrotsearch.hppc.cursors.ObjectCursor;
|
||||
import org.apache.lucene.index.IndexOptions;
|
||||
import org.apache.lucene.index.LeafReaderContext;
|
||||
import org.apache.lucene.index.NumericDocValues;
|
||||
import org.elasticsearch.common.component.AbstractComponent;
|
||||
import org.elasticsearch.common.settings.Setting;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.index.engine.Engine;
|
||||
import org.elasticsearch.index.fielddata.FieldDataType;
|
||||
import org.elasticsearch.index.fielddata.IndexFieldData;
|
||||
import org.elasticsearch.index.fielddata.IndexFieldDataService;
|
||||
import org.elasticsearch.index.mapper.DocumentMapper;
|
||||
import org.elasticsearch.index.mapper.FieldMapper;
|
||||
import org.elasticsearch.index.mapper.MappedFieldType;
|
||||
import org.elasticsearch.index.mapper.MapperService;
|
||||
import org.elasticsearch.index.shard.IndexShard;
|
||||
import org.elasticsearch.index.shard.IndexShardState;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
/**
|
||||
*/
|
||||
public final class IndexWarmer extends AbstractComponent {
|
||||
|
||||
public static final Setting<MappedFieldType.Loading> INDEX_NORMS_LOADING_SETTING = new Setting<>("index.norms.loading",
|
||||
MappedFieldType.Loading.LAZY.toString(), (s) -> MappedFieldType.Loading.parse(s, MappedFieldType.Loading.LAZY),
|
||||
false, Setting.Scope.INDEX);
|
||||
private final List<Listener> listeners;
|
||||
|
||||
IndexWarmer(Settings settings, ThreadPool threadPool, Listener... listeners) {
|
||||
super(settings);
|
||||
ArrayList<Listener> list = new ArrayList<>();
|
||||
final Executor executor = threadPool.executor(ThreadPool.Names.WARMER);
|
||||
list.add(new NormsWarmer(executor));
|
||||
list.add(new FieldDataWarmer(executor));
|
||||
for (Listener listener : listeners) {
|
||||
list.add(listener);
|
||||
}
|
||||
this.listeners = Collections.unmodifiableList(list);
|
||||
}
|
||||
|
||||
void warm(Engine.Searcher searcher, IndexShard shard, IndexSettings settings, boolean isTopReader) {
|
||||
if (shard.state() == IndexShardState.CLOSED) {
|
||||
return;
|
||||
}
|
||||
if (settings.isWarmerEnabled() == false) {
|
||||
return;
|
||||
}
|
||||
if (logger.isTraceEnabled()) {
|
||||
if (isTopReader) {
|
||||
logger.trace("{} top warming [{}]", shard.shardId(), searcher.reader());
|
||||
} else {
|
||||
logger.trace("{} warming [{}]", shard.shardId(), searcher.reader());
|
||||
}
|
||||
}
|
||||
shard.warmerService().onPreWarm();
|
||||
long time = System.nanoTime();
|
||||
final List<TerminationHandle> terminationHandles = new ArrayList<>();
|
||||
// get a handle on pending tasks
|
||||
for (final Listener listener : listeners) {
|
||||
if (isTopReader) {
|
||||
terminationHandles.add(listener.warmTopReader(shard, searcher));
|
||||
} else {
|
||||
terminationHandles.add(listener.warmNewReaders(shard, searcher));
|
||||
}
|
||||
}
|
||||
// wait for termination
|
||||
for (TerminationHandle terminationHandle : terminationHandles) {
|
||||
try {
|
||||
terminationHandle.awaitTermination();
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
if (isTopReader) {
|
||||
logger.warn("top warming has been interrupted", e);
|
||||
} else {
|
||||
logger.warn("warming has been interrupted", e);
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
long took = System.nanoTime() - time;
|
||||
shard.warmerService().onPostWarm(took);
|
||||
if (shard.warmerService().logger().isTraceEnabled()) {
|
||||
if (isTopReader) {
|
||||
shard.warmerService().logger().trace("top warming took [{}]", new TimeValue(took, TimeUnit.NANOSECONDS));
|
||||
} else {
|
||||
shard.warmerService().logger().trace("warming took [{}]", new TimeValue(took, TimeUnit.NANOSECONDS));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/** A handle on the execution of warm-up action. */
|
||||
public interface TerminationHandle {
|
||||
|
||||
TerminationHandle NO_WAIT = () -> {};
|
||||
|
||||
/** Wait until execution of the warm-up action completes. */
|
||||
void awaitTermination() throws InterruptedException;
|
||||
}
|
||||
public interface Listener {
|
||||
/** Queue tasks to warm-up the given segments and return handles that allow to wait for termination of the
|
||||
* execution of those tasks. */
|
||||
TerminationHandle warmNewReaders(IndexShard indexShard, Engine.Searcher searcher);
|
||||
|
||||
TerminationHandle warmTopReader(IndexShard indexShard, Engine.Searcher searcher);
|
||||
}
|
||||
|
||||
private static class NormsWarmer implements IndexWarmer.Listener {
|
||||
private final Executor executor;
|
||||
public NormsWarmer(Executor executor) {
|
||||
this.executor = executor;
|
||||
}
|
||||
@Override
|
||||
public TerminationHandle warmNewReaders(final IndexShard indexShard, final Engine.Searcher searcher) {
|
||||
final MappedFieldType.Loading defaultLoading = indexShard.indexSettings().getValue(INDEX_NORMS_LOADING_SETTING);
|
||||
final MapperService mapperService = indexShard.mapperService();
|
||||
final ObjectSet<String> warmUp = new ObjectHashSet<>();
|
||||
for (DocumentMapper docMapper : mapperService.docMappers(false)) {
|
||||
for (FieldMapper fieldMapper : docMapper.mappers()) {
|
||||
final String indexName = fieldMapper.fieldType().name();
|
||||
MappedFieldType.Loading normsLoading = fieldMapper.fieldType().normsLoading();
|
||||
if (normsLoading == null) {
|
||||
normsLoading = defaultLoading;
|
||||
}
|
||||
if (fieldMapper.fieldType().indexOptions() != IndexOptions.NONE && !fieldMapper.fieldType().omitNorms()
|
||||
&& normsLoading == MappedFieldType.Loading.EAGER) {
|
||||
warmUp.add(indexName);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
final CountDownLatch latch = new CountDownLatch(1);
|
||||
// Norms loading may be I/O intensive but is not CPU intensive, so we execute it in a single task
|
||||
executor.execute(() -> {
|
||||
try {
|
||||
for (ObjectCursor<String> stringObjectCursor : warmUp) {
|
||||
final String indexName = stringObjectCursor.value;
|
||||
final long start = System.nanoTime();
|
||||
for (final LeafReaderContext ctx : searcher.reader().leaves()) {
|
||||
final NumericDocValues values = ctx.reader().getNormValues(indexName);
|
||||
if (values != null) {
|
||||
values.get(0);
|
||||
}
|
||||
}
|
||||
if (indexShard.warmerService().logger().isTraceEnabled()) {
|
||||
indexShard.warmerService().logger().trace("warmed norms for [{}], took [{}]", indexName,
|
||||
TimeValue.timeValueNanos(System.nanoTime() - start));
|
||||
}
|
||||
}
|
||||
} catch (Throwable t) {
|
||||
indexShard.warmerService().logger().warn("failed to warm-up norms", t);
|
||||
} finally {
|
||||
latch.countDown();
|
||||
}
|
||||
});
|
||||
|
||||
return () -> latch.await();
|
||||
}
|
||||
|
||||
@Override
|
||||
public TerminationHandle warmTopReader(IndexShard indexShard, final Engine.Searcher searcher) {
|
||||
return TerminationHandle.NO_WAIT;
|
||||
}
|
||||
}
|
||||
|
||||
private static class FieldDataWarmer implements IndexWarmer.Listener {
|
||||
|
||||
private final Executor executor;
|
||||
public FieldDataWarmer(Executor executor) {
|
||||
this.executor = executor;
|
||||
}
|
||||
|
||||
@Override
|
||||
public TerminationHandle warmNewReaders(final IndexShard indexShard, final Engine.Searcher searcher) {
|
||||
final MapperService mapperService = indexShard.mapperService();
|
||||
final Map<String, MappedFieldType> warmUp = new HashMap<>();
|
||||
for (DocumentMapper docMapper : mapperService.docMappers(false)) {
|
||||
for (FieldMapper fieldMapper : docMapper.mappers()) {
|
||||
final FieldDataType fieldDataType = fieldMapper.fieldType().fieldDataType();
|
||||
final String indexName = fieldMapper.fieldType().name();
|
||||
if (fieldDataType == null) {
|
||||
continue;
|
||||
}
|
||||
if (fieldDataType.getLoading() == MappedFieldType.Loading.LAZY) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (warmUp.containsKey(indexName)) {
|
||||
continue;
|
||||
}
|
||||
warmUp.put(indexName, fieldMapper.fieldType());
|
||||
}
|
||||
}
|
||||
final IndexFieldDataService indexFieldDataService = indexShard.indexFieldDataService();
|
||||
final CountDownLatch latch = new CountDownLatch(searcher.reader().leaves().size() * warmUp.size());
|
||||
for (final LeafReaderContext ctx : searcher.reader().leaves()) {
|
||||
for (final MappedFieldType fieldType : warmUp.values()) {
|
||||
executor.execute(() -> {
|
||||
try {
|
||||
final long start = System.nanoTime();
|
||||
indexFieldDataService.getForField(fieldType).load(ctx);
|
||||
if (indexShard.warmerService().logger().isTraceEnabled()) {
|
||||
indexShard.warmerService().logger().trace("warmed fielddata for [{}], took [{}]", fieldType.name(),
|
||||
TimeValue.timeValueNanos(System.nanoTime() - start));
|
||||
}
|
||||
} catch (Throwable t) {
|
||||
indexShard.warmerService().logger().warn("failed to warm-up fielddata for [{}]", t, fieldType.name());
|
||||
} finally {
|
||||
latch.countDown();
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
return () -> latch.await();
|
||||
}
|
||||
|
||||
@Override
|
||||
public TerminationHandle warmTopReader(final IndexShard indexShard, final Engine.Searcher searcher) {
|
||||
final MapperService mapperService = indexShard.mapperService();
|
||||
final Map<String, MappedFieldType> warmUpGlobalOrdinals = new HashMap<>();
|
||||
for (DocumentMapper docMapper : mapperService.docMappers(false)) {
|
||||
for (FieldMapper fieldMapper : docMapper.mappers()) {
|
||||
final FieldDataType fieldDataType = fieldMapper.fieldType().fieldDataType();
|
||||
final String indexName = fieldMapper.fieldType().name();
|
||||
if (fieldDataType == null) {
|
||||
continue;
|
||||
}
|
||||
if (fieldDataType.getLoading() != MappedFieldType.Loading.EAGER_GLOBAL_ORDINALS) {
|
||||
continue;
|
||||
}
|
||||
if (warmUpGlobalOrdinals.containsKey(indexName)) {
|
||||
continue;
|
||||
}
|
||||
warmUpGlobalOrdinals.put(indexName, fieldMapper.fieldType());
|
||||
}
|
||||
}
|
||||
final IndexFieldDataService indexFieldDataService = indexShard.indexFieldDataService();
|
||||
final CountDownLatch latch = new CountDownLatch(warmUpGlobalOrdinals.size());
|
||||
for (final MappedFieldType fieldType : warmUpGlobalOrdinals.values()) {
|
||||
executor.execute(() -> {
|
||||
try {
|
||||
final long start = System.nanoTime();
|
||||
IndexFieldData.Global ifd = indexFieldDataService.getForField(fieldType);
|
||||
ifd.loadGlobal(searcher.getDirectoryReader());
|
||||
if (indexShard.warmerService().logger().isTraceEnabled()) {
|
||||
indexShard.warmerService().logger().trace("warmed global ordinals for [{}], took [{}]", fieldType.name(),
|
||||
TimeValue.timeValueNanos(System.nanoTime() - start));
|
||||
}
|
||||
} catch (Throwable t) {
|
||||
indexShard.warmerService().logger().warn("failed to warm-up global ordinals for [{}]", t, fieldType.name());
|
||||
} finally {
|
||||
latch.countDown();
|
||||
}
|
||||
});
|
||||
}
|
||||
return () -> latch.await();
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -20,10 +20,8 @@
|
|||
package org.elasticsearch.index;
|
||||
|
||||
import org.elasticsearch.client.Client;
|
||||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.util.BigArrays;
|
||||
import org.elasticsearch.indices.IndicesWarmer;
|
||||
import org.elasticsearch.indices.breaker.CircuitBreakerService;
|
||||
import org.elasticsearch.indices.cache.query.IndicesQueryCache;
|
||||
import org.elasticsearch.indices.query.IndicesQueriesRegistry;
|
||||
|
@ -39,7 +37,6 @@ public final class NodeServicesProvider {
|
|||
|
||||
private final ThreadPool threadPool;
|
||||
private final IndicesQueryCache indicesQueryCache;
|
||||
private final IndicesWarmer warmer;
|
||||
private final BigArrays bigArrays;
|
||||
private final Client client;
|
||||
private final IndicesQueriesRegistry indicesQueriesRegistry;
|
||||
|
@ -47,10 +44,9 @@ public final class NodeServicesProvider {
|
|||
private final CircuitBreakerService circuitBreakerService;
|
||||
|
||||
@Inject
|
||||
public NodeServicesProvider(ThreadPool threadPool, IndicesQueryCache indicesQueryCache, @Nullable IndicesWarmer warmer, BigArrays bigArrays, Client client, ScriptService scriptService, IndicesQueriesRegistry indicesQueriesRegistry, CircuitBreakerService circuitBreakerService) {
|
||||
public NodeServicesProvider(ThreadPool threadPool, IndicesQueryCache indicesQueryCache, BigArrays bigArrays, Client client, ScriptService scriptService, IndicesQueriesRegistry indicesQueriesRegistry, CircuitBreakerService circuitBreakerService) {
|
||||
this.threadPool = threadPool;
|
||||
this.indicesQueryCache = indicesQueryCache;
|
||||
this.warmer = warmer;
|
||||
this.bigArrays = bigArrays;
|
||||
this.client = client;
|
||||
this.indicesQueriesRegistry = indicesQueriesRegistry;
|
||||
|
@ -66,10 +62,6 @@ public final class NodeServicesProvider {
|
|||
return indicesQueryCache;
|
||||
}
|
||||
|
||||
public IndicesWarmer getWarmer() {
|
||||
return warmer;
|
||||
}
|
||||
|
||||
public BigArrays getBigArrays() { return bigArrays; }
|
||||
|
||||
public Client getClient() {
|
||||
|
|
|
@ -48,8 +48,9 @@ import org.elasticsearch.index.mapper.object.ObjectMapper;
|
|||
import org.elasticsearch.index.shard.IndexShard;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
import org.elasticsearch.index.shard.ShardUtils;
|
||||
import org.elasticsearch.indices.IndicesWarmer;
|
||||
import org.elasticsearch.indices.IndicesWarmer.TerminationHandle;
|
||||
import org.elasticsearch.index.IndexWarmer;
|
||||
import org.elasticsearch.index.IndexWarmer.TerminationHandle;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.io.IOException;
|
||||
|
@ -74,22 +75,20 @@ public final class BitsetFilterCache extends AbstractIndexComponent implements L
|
|||
private final boolean loadRandomAccessFiltersEagerly;
|
||||
private final Cache<Object, Cache<Query, Value>> loadedFilters;
|
||||
private final Listener listener;
|
||||
private final BitSetProducerWarmer warmer;
|
||||
private final IndicesWarmer indicesWarmer;
|
||||
|
||||
public BitsetFilterCache(IndexSettings indexSettings, IndicesWarmer indicesWarmer, Listener listener) {
|
||||
public BitsetFilterCache(IndexSettings indexSettings, Listener listener) {
|
||||
super(indexSettings);
|
||||
if (listener == null) {
|
||||
throw new IllegalArgumentException("listener must not be null");
|
||||
}
|
||||
this.loadRandomAccessFiltersEagerly = this.indexSettings.getValue(INDEX_LOAD_RANDOM_ACCESS_FILTERS_EAGERLY_SETTING);
|
||||
this.loadedFilters = CacheBuilder.<Object, Cache<Query, Value>>builder().removalListener(this).build();
|
||||
this.warmer = new BitSetProducerWarmer();
|
||||
this.indicesWarmer = indicesWarmer;
|
||||
indicesWarmer.addListener(warmer);
|
||||
this.listener = listener;
|
||||
}
|
||||
|
||||
public IndexWarmer.Listener createListener(ThreadPool threadPool) {
|
||||
return new BitSetProducerWarmer(threadPool);
|
||||
}
|
||||
|
||||
|
||||
public BitSetProducer getBitSetProducer(Query query) {
|
||||
|
@ -103,11 +102,7 @@ public final class BitsetFilterCache extends AbstractIndexComponent implements L
|
|||
|
||||
@Override
|
||||
public void close() {
|
||||
try {
|
||||
indicesWarmer.removeListener(warmer);
|
||||
} finally {
|
||||
clear("close");
|
||||
}
|
||||
clear("close");
|
||||
}
|
||||
|
||||
public void clear(String reason) {
|
||||
|
@ -210,10 +205,16 @@ public final class BitsetFilterCache extends AbstractIndexComponent implements L
|
|||
}
|
||||
}
|
||||
|
||||
final class BitSetProducerWarmer implements IndicesWarmer.Listener {
|
||||
final class BitSetProducerWarmer implements IndexWarmer.Listener {
|
||||
|
||||
private final Executor executor;
|
||||
|
||||
BitSetProducerWarmer(ThreadPool threadPool) {
|
||||
this.executor = threadPool.executor(ThreadPool.Names.WARMER);
|
||||
}
|
||||
|
||||
@Override
|
||||
public IndicesWarmer.TerminationHandle warmNewReaders(final IndexShard indexShard, final Engine.Searcher searcher) {
|
||||
public IndexWarmer.TerminationHandle warmNewReaders(final IndexShard indexShard, final Engine.Searcher searcher) {
|
||||
if (indexSettings.getIndex().equals(indexShard.getIndexSettings().getIndex()) == false) {
|
||||
// this is from a different index
|
||||
return TerminationHandle.NO_WAIT;
|
||||
|
@ -244,7 +245,6 @@ public final class BitsetFilterCache extends AbstractIndexComponent implements L
|
|||
warmUp.add(Queries.newNonNestedFilter());
|
||||
}
|
||||
|
||||
final Executor executor = indicesWarmer.getExecutor();
|
||||
final CountDownLatch latch = new CountDownLatch(searcher.reader().leaves().size() * warmUp.size());
|
||||
for (final LeafReaderContext ctx : searcher.reader().leaves()) {
|
||||
for (final Query filterToWarm : warmUp) {
|
||||
|
@ -277,7 +277,6 @@ public final class BitsetFilterCache extends AbstractIndexComponent implements L
|
|||
return loadedFilters;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* A listener interface that is executed for each onCache / onRemoval event
|
||||
*/
|
||||
|
@ -295,6 +294,4 @@ public final class BitsetFilterCache extends AbstractIndexComponent implements L
|
|||
*/
|
||||
void onRemoval(ShardId shardId, Accountable accountable);
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
|
|
@ -105,7 +105,6 @@ import org.elasticsearch.index.translog.TranslogConfig;
|
|||
import org.elasticsearch.index.translog.TranslogStats;
|
||||
import org.elasticsearch.index.warmer.ShardIndexWarmerService;
|
||||
import org.elasticsearch.index.warmer.WarmerStats;
|
||||
import org.elasticsearch.indices.IndicesWarmer;
|
||||
import org.elasticsearch.indices.cache.query.IndicesQueryCache;
|
||||
import org.elasticsearch.indices.IndexingMemoryController;
|
||||
import org.elasticsearch.indices.recovery.RecoveryFailedException;
|
||||
|
@ -151,7 +150,7 @@ public class IndexShard extends AbstractIndexShardComponent {
|
|||
private final Object mutex = new Object();
|
||||
private final String checkIndexOnStartup;
|
||||
private final CodecService codecService;
|
||||
private final IndicesWarmer warmer;
|
||||
private final Engine.Warmer warmer;
|
||||
private final SnapshotDeletionPolicy deletionPolicy;
|
||||
private final SimilarityService similarityService;
|
||||
private final EngineConfig engineConfig;
|
||||
|
@ -208,12 +207,12 @@ public class IndexShard extends AbstractIndexShardComponent {
|
|||
public IndexShard(ShardId shardId, IndexSettings indexSettings, ShardPath path, Store store, IndexCache indexCache,
|
||||
MapperService mapperService, SimilarityService similarityService, IndexFieldDataService indexFieldDataService,
|
||||
@Nullable EngineFactory engineFactory,
|
||||
IndexEventListener indexEventListener, IndexSearcherWrapper indexSearcherWrapper, NodeServicesProvider provider, SearchSlowLog slowLog, IndexingOperationListener... listeners) {
|
||||
IndexEventListener indexEventListener, IndexSearcherWrapper indexSearcherWrapper, NodeServicesProvider provider, SearchSlowLog slowLog, Engine.Warmer warmer, IndexingOperationListener... listeners) {
|
||||
super(shardId, indexSettings);
|
||||
final Settings settings = indexSettings.getSettings();
|
||||
this.idxSettings = indexSettings;
|
||||
this.codecService = new CodecService(mapperService, logger);
|
||||
this.warmer = provider.getWarmer();
|
||||
this.warmer = warmer;
|
||||
this.deletionPolicy = new SnapshotDeletionPolicy(new KeepOnlyLastCommitDeletionPolicy());
|
||||
this.similarityService = similarityService;
|
||||
Objects.requireNonNull(store, "Store must be provided to the index shard");
|
||||
|
@ -1389,9 +1388,8 @@ public class IndexShard extends AbstractIndexShardComponent {
|
|||
recoveryState.getTranslog().incrementRecoveredOperations();
|
||||
}
|
||||
};
|
||||
final Engine.Warmer engineWarmer = (searcher, toLevel) -> warmer.warm(searcher, this, idxSettings, toLevel);
|
||||
return new EngineConfig(shardId,
|
||||
threadPool, indexSettings, engineWarmer, store, deletionPolicy, indexSettings.getMergePolicy(),
|
||||
threadPool, indexSettings, warmer, store, deletionPolicy, indexSettings.getMergePolicy(),
|
||||
mapperService.indexAnalyzer(), similarityService.similarity(mapperService), codecService, shardEventListener, translogRecoveryPerformer, indexCache.query(), cachingPolicy, translogConfig,
|
||||
idxSettings.getSettings().getAsTime(IndexingMemoryController.SHARD_INACTIVE_TIME_SETTING, IndexingMemoryController.SHARD_DEFAULT_INACTIVE_TIME));
|
||||
}
|
||||
|
|
|
@ -45,8 +45,8 @@ import java.io.IOException;
|
|||
public final class ShadowIndexShard extends IndexShard {
|
||||
|
||||
public ShadowIndexShard(ShardId shardId, IndexSettings indexSettings, ShardPath path, Store store, IndexCache indexCache, MapperService mapperService, SimilarityService similarityService, IndexFieldDataService indexFieldDataService, @Nullable EngineFactory engineFactory,
|
||||
IndexEventListener indexEventListener, IndexSearcherWrapper wrapper, NodeServicesProvider provider, SearchSlowLog searchSlowLog) throws IOException {
|
||||
super(shardId, indexSettings, path, store, indexCache, mapperService, similarityService, indexFieldDataService, engineFactory, indexEventListener, wrapper, provider, searchSlowLog);
|
||||
IndexEventListener indexEventListener, IndexSearcherWrapper wrapper, NodeServicesProvider provider, SearchSlowLog searchSlowLog, Engine.Warmer engineWarmer) throws IOException {
|
||||
super(shardId, indexSettings, path, store, indexCache, mapperService, similarityService, indexFieldDataService, engineFactory, indexEventListener, wrapper, provider, searchSlowLog, engineWarmer);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -164,7 +164,6 @@ public class IndicesModule extends AbstractModule {
|
|||
bind(IndicesRequestCache.class).asEagerSingleton();
|
||||
bind(TransportNodesListShardStoreMetaData.class).asEagerSingleton();
|
||||
bind(IndicesTTLService.class).asEagerSingleton();
|
||||
bind(IndicesWarmer.class).asEagerSingleton();
|
||||
bind(UpdateHelper.class).asEagerSingleton();
|
||||
bind(MetaDataIndexUpgradeService.class).asEagerSingleton();
|
||||
bind(NodeServicesProvider.class).asEagerSingleton();
|
||||
|
|
|
@ -1,131 +0,0 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.indices;
|
||||
|
||||
import org.elasticsearch.common.component.AbstractComponent;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.index.IndexSettings;
|
||||
import org.elasticsearch.index.engine.Engine;
|
||||
import org.elasticsearch.index.shard.IndexShard;
|
||||
import org.elasticsearch.index.shard.IndexShardState;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.CopyOnWriteArrayList;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
/**
|
||||
*/
|
||||
public final class IndicesWarmer extends AbstractComponent {
|
||||
|
||||
private final ThreadPool threadPool;
|
||||
|
||||
private final CopyOnWriteArrayList<Listener> listeners = new CopyOnWriteArrayList<>();
|
||||
|
||||
@Inject
|
||||
public IndicesWarmer(Settings settings, ThreadPool threadPool) {
|
||||
super(settings);
|
||||
this.threadPool = threadPool;
|
||||
}
|
||||
|
||||
public void addListener(Listener listener) {
|
||||
listeners.add(listener);
|
||||
}
|
||||
public void removeListener(Listener listener) {
|
||||
listeners.remove(listener);
|
||||
}
|
||||
|
||||
public void warm(Engine.Searcher searcher, IndexShard shard, IndexSettings settings, boolean isTopReader) {
|
||||
if (shard.state() == IndexShardState.CLOSED) {
|
||||
return;
|
||||
}
|
||||
if (settings.isWarmerEnabled() == false) {
|
||||
return;
|
||||
}
|
||||
if (logger.isTraceEnabled()) {
|
||||
if (isTopReader) {
|
||||
logger.trace("{} top warming [{}]", shard.shardId(), searcher.reader());
|
||||
} else {
|
||||
logger.trace("{} warming [{}]", shard.shardId(), searcher.reader());
|
||||
}
|
||||
}
|
||||
shard.warmerService().onPreWarm();
|
||||
long time = System.nanoTime();
|
||||
final List<TerminationHandle> terminationHandles = new ArrayList<>();
|
||||
// get a handle on pending tasks
|
||||
for (final Listener listener : listeners) {
|
||||
if (isTopReader) {
|
||||
terminationHandles.add(listener.warmTopReader(shard, searcher));
|
||||
} else {
|
||||
terminationHandles.add(listener.warmNewReaders(shard, searcher));
|
||||
}
|
||||
}
|
||||
// wait for termination
|
||||
for (TerminationHandle terminationHandle : terminationHandles) {
|
||||
try {
|
||||
terminationHandle.awaitTermination();
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
if (isTopReader) {
|
||||
logger.warn("top warming has been interrupted", e);
|
||||
} else {
|
||||
logger.warn("warming has been interrupted", e);
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
long took = System.nanoTime() - time;
|
||||
shard.warmerService().onPostWarm(took);
|
||||
if (shard.warmerService().logger().isTraceEnabled()) {
|
||||
if (isTopReader) {
|
||||
shard.warmerService().logger().trace("top warming took [{}]", new TimeValue(took, TimeUnit.NANOSECONDS));
|
||||
} else {
|
||||
shard.warmerService().logger().trace("warming took [{}]", new TimeValue(took, TimeUnit.NANOSECONDS));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns an executor for async warmer tasks
|
||||
*/
|
||||
public Executor getExecutor() {
|
||||
return threadPool.executor(ThreadPool.Names.WARMER);
|
||||
}
|
||||
|
||||
/** A handle on the execution of warm-up action. */
|
||||
public interface TerminationHandle {
|
||||
|
||||
TerminationHandle NO_WAIT = () -> {};
|
||||
|
||||
/** Wait until execution of the warm-up action completes. */
|
||||
void awaitTermination() throws InterruptedException;
|
||||
}
|
||||
public interface Listener {
|
||||
/** Queue tasks to warm-up the given segments and return handles that allow to wait for termination of the execution of those tasks. */
|
||||
TerminationHandle warmNewReaders(IndexShard indexShard, Engine.Searcher searcher);
|
||||
|
||||
TerminationHandle warmTopReader(IndexShard indexShard, Engine.Searcher searcher);
|
||||
}
|
||||
|
||||
}
|
|
@ -20,13 +20,6 @@
|
|||
package org.elasticsearch.search;
|
||||
|
||||
import com.carrotsearch.hppc.ObjectFloatHashMap;
|
||||
import com.carrotsearch.hppc.ObjectHashSet;
|
||||
import com.carrotsearch.hppc.ObjectSet;
|
||||
import com.carrotsearch.hppc.cursors.ObjectCursor;
|
||||
|
||||
import org.apache.lucene.index.IndexOptions;
|
||||
import org.apache.lucene.index.LeafReaderContext;
|
||||
import org.apache.lucene.index.NumericDocValues;
|
||||
import org.apache.lucene.search.FieldDoc;
|
||||
import org.apache.lucene.search.TopDocs;
|
||||
import org.elasticsearch.ExceptionsHelper;
|
||||
|
@ -54,14 +47,6 @@ import org.elasticsearch.common.xcontent.XContentParser;
|
|||
import org.elasticsearch.index.Index;
|
||||
import org.elasticsearch.index.IndexService;
|
||||
import org.elasticsearch.index.engine.Engine;
|
||||
import org.elasticsearch.index.fielddata.FieldDataType;
|
||||
import org.elasticsearch.index.fielddata.IndexFieldData;
|
||||
import org.elasticsearch.index.fielddata.IndexFieldDataService;
|
||||
import org.elasticsearch.index.mapper.DocumentMapper;
|
||||
import org.elasticsearch.index.mapper.FieldMapper;
|
||||
import org.elasticsearch.index.mapper.MappedFieldType;
|
||||
import org.elasticsearch.index.mapper.MappedFieldType.Loading;
|
||||
import org.elasticsearch.index.mapper.MapperService;
|
||||
import org.elasticsearch.index.query.QueryParseContext;
|
||||
import org.elasticsearch.index.query.QueryShardContext;
|
||||
import org.elasticsearch.index.search.stats.ShardSearchStats;
|
||||
|
@ -69,8 +54,6 @@ import org.elasticsearch.index.search.stats.StatsGroupsParseElement;
|
|||
import org.elasticsearch.index.shard.IndexEventListener;
|
||||
import org.elasticsearch.index.shard.IndexShard;
|
||||
import org.elasticsearch.indices.IndicesService;
|
||||
import org.elasticsearch.indices.IndicesWarmer;
|
||||
import org.elasticsearch.indices.IndicesWarmer.TerminationHandle;
|
||||
import org.elasticsearch.indices.cache.request.IndicesRequestCache;
|
||||
import org.elasticsearch.script.ExecutableScript;
|
||||
import org.elasticsearch.script.ScriptContext;
|
||||
|
@ -109,9 +92,7 @@ import java.io.IOException;
|
|||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.ScheduledFuture;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
|
@ -124,7 +105,6 @@ import static org.elasticsearch.common.unit.TimeValue.timeValueMinutes;
|
|||
*/
|
||||
public class SearchService extends AbstractLifecycleComponent<SearchService> implements IndexEventListener {
|
||||
|
||||
public static final Setting<Loading> INDEX_NORMS_LOADING_SETTING = new Setting<>("index.norms.loading", Loading.LAZY.toString(), (s) -> Loading.parse(s, Loading.LAZY), false, Setting.Scope.INDEX);
|
||||
// we can have 5 minutes here, since we make sure to clean with search requests and when shard/index closes
|
||||
public static final Setting<TimeValue> DEFAULT_KEEPALIVE_SETTING = Setting.positiveTimeSetting("search.default_keep_alive", timeValueMinutes(5), false, Setting.Scope.CLUSTER);
|
||||
public static final Setting<TimeValue> KEEPALIVE_INTERVAL_SETTING = Setting.positiveTimeSetting("search.keep_alive_interval", timeValueMinutes(1), false, Setting.Scope.CLUSTER);
|
||||
|
@ -139,8 +119,6 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> imp
|
|||
|
||||
private final IndicesService indicesService;
|
||||
|
||||
private final IndicesWarmer indicesWarmer;
|
||||
|
||||
private final ScriptService scriptService;
|
||||
|
||||
private final PageCacheRecycler pageCacheRecycler;
|
||||
|
@ -170,7 +148,7 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> imp
|
|||
private final ParseFieldMatcher parseFieldMatcher;
|
||||
|
||||
@Inject
|
||||
public SearchService(Settings settings, ClusterSettings clusterSettings, ClusterService clusterService, IndicesService indicesService, IndicesWarmer indicesWarmer, ThreadPool threadPool,
|
||||
public SearchService(Settings settings, ClusterSettings clusterSettings, ClusterService clusterService, IndicesService indicesService, ThreadPool threadPool,
|
||||
ScriptService scriptService, PageCacheRecycler pageCacheRecycler, BigArrays bigArrays, DfsPhase dfsPhase, QueryPhase queryPhase, FetchPhase fetchPhase,
|
||||
IndicesRequestCache indicesQueryCache) {
|
||||
super(settings);
|
||||
|
@ -178,7 +156,6 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> imp
|
|||
this.threadPool = threadPool;
|
||||
this.clusterService = clusterService;
|
||||
this.indicesService = indicesService;
|
||||
this.indicesWarmer = indicesWarmer;
|
||||
this.scriptService = scriptService;
|
||||
this.pageCacheRecycler = pageCacheRecycler;
|
||||
this.bigArrays = bigArrays;
|
||||
|
@ -199,9 +176,6 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> imp
|
|||
|
||||
this.keepAliveReaper = threadPool.scheduleWithFixedDelay(new Reaper(), keepAliveInterval);
|
||||
|
||||
this.indicesWarmer.addListener(new NormsWarmer(indicesWarmer));
|
||||
this.indicesWarmer.addListener(new FieldDataWarmer(indicesWarmer));
|
||||
|
||||
defaultSearchTimeout = DEFAULT_SEARCH_TIMEOUT_SETTING.get(settings);
|
||||
clusterSettings.addSettingsUpdateConsumer(DEFAULT_SEARCH_TIMEOUT_SETTING, this::setDefaultSearchTimeout);
|
||||
}
|
||||
|
@ -946,184 +920,6 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> imp
|
|||
return this.activeContexts.size();
|
||||
}
|
||||
|
||||
static class NormsWarmer implements IndicesWarmer.Listener {
|
||||
private final IndicesWarmer indicesWarmer;
|
||||
|
||||
public NormsWarmer(IndicesWarmer indicesWarmer) {
|
||||
this.indicesWarmer = indicesWarmer;
|
||||
}
|
||||
@Override
|
||||
public TerminationHandle warmNewReaders(final IndexShard indexShard, final Engine.Searcher searcher) {
|
||||
final Loading defaultLoading = indexShard.indexSettings().getValue(INDEX_NORMS_LOADING_SETTING);
|
||||
final MapperService mapperService = indexShard.mapperService();
|
||||
final ObjectSet<String> warmUp = new ObjectHashSet<>();
|
||||
for (DocumentMapper docMapper : mapperService.docMappers(false)) {
|
||||
for (FieldMapper fieldMapper : docMapper.mappers()) {
|
||||
final String indexName = fieldMapper.fieldType().name();
|
||||
Loading normsLoading = fieldMapper.fieldType().normsLoading();
|
||||
if (normsLoading == null) {
|
||||
normsLoading = defaultLoading;
|
||||
}
|
||||
if (fieldMapper.fieldType().indexOptions() != IndexOptions.NONE && !fieldMapper.fieldType().omitNorms() && normsLoading == Loading.EAGER) {
|
||||
warmUp.add(indexName);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
final CountDownLatch latch = new CountDownLatch(1);
|
||||
// Norms loading may be I/O intensive but is not CPU intensive, so we execute it in a single task
|
||||
indicesWarmer.getExecutor().execute(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
for (ObjectCursor<String> stringObjectCursor : warmUp) {
|
||||
final String indexName = stringObjectCursor.value;
|
||||
final long start = System.nanoTime();
|
||||
for (final LeafReaderContext ctx : searcher.reader().leaves()) {
|
||||
final NumericDocValues values = ctx.reader().getNormValues(indexName);
|
||||
if (values != null) {
|
||||
values.get(0);
|
||||
}
|
||||
}
|
||||
if (indexShard.warmerService().logger().isTraceEnabled()) {
|
||||
indexShard.warmerService().logger().trace("warmed norms for [{}], took [{}]", indexName, TimeValue.timeValueNanos(System.nanoTime() - start));
|
||||
}
|
||||
}
|
||||
} catch (Throwable t) {
|
||||
indexShard.warmerService().logger().warn("failed to warm-up norms", t);
|
||||
} finally {
|
||||
latch.countDown();
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
return new TerminationHandle() {
|
||||
@Override
|
||||
public void awaitTermination() throws InterruptedException {
|
||||
latch.await();
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@Override
|
||||
public TerminationHandle warmTopReader(IndexShard indexShard, final Engine.Searcher searcher) {
|
||||
return TerminationHandle.NO_WAIT;
|
||||
}
|
||||
}
|
||||
|
||||
static class FieldDataWarmer implements IndicesWarmer.Listener {
|
||||
|
||||
private final IndicesWarmer indicesWarmer;
|
||||
|
||||
public FieldDataWarmer(IndicesWarmer indicesWarmer) {
|
||||
this.indicesWarmer = indicesWarmer;
|
||||
}
|
||||
|
||||
@Override
|
||||
public TerminationHandle warmNewReaders(final IndexShard indexShard, final Engine.Searcher searcher) {
|
||||
final MapperService mapperService = indexShard.mapperService();
|
||||
final Map<String, MappedFieldType> warmUp = new HashMap<>();
|
||||
for (DocumentMapper docMapper : mapperService.docMappers(false)) {
|
||||
for (FieldMapper fieldMapper : docMapper.mappers()) {
|
||||
final FieldDataType fieldDataType = fieldMapper.fieldType().fieldDataType();
|
||||
final String indexName = fieldMapper.fieldType().name();
|
||||
if (fieldDataType == null) {
|
||||
continue;
|
||||
}
|
||||
if (fieldDataType.getLoading() == Loading.LAZY) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (warmUp.containsKey(indexName)) {
|
||||
continue;
|
||||
}
|
||||
warmUp.put(indexName, fieldMapper.fieldType());
|
||||
}
|
||||
}
|
||||
final IndexFieldDataService indexFieldDataService = indexShard.indexFieldDataService();
|
||||
final Executor executor = indicesWarmer.getExecutor();
|
||||
final CountDownLatch latch = new CountDownLatch(searcher.reader().leaves().size() * warmUp.size());
|
||||
for (final LeafReaderContext ctx : searcher.reader().leaves()) {
|
||||
for (final MappedFieldType fieldType : warmUp.values()) {
|
||||
executor.execute(new Runnable() {
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
final long start = System.nanoTime();
|
||||
indexFieldDataService.getForField(fieldType).load(ctx);
|
||||
if (indexShard.warmerService().logger().isTraceEnabled()) {
|
||||
indexShard.warmerService().logger().trace("warmed fielddata for [{}], took [{}]", fieldType.name(), TimeValue.timeValueNanos(System.nanoTime() - start));
|
||||
}
|
||||
} catch (Throwable t) {
|
||||
indexShard.warmerService().logger().warn("failed to warm-up fielddata for [{}]", t, fieldType.name());
|
||||
} finally {
|
||||
latch.countDown();
|
||||
}
|
||||
}
|
||||
|
||||
});
|
||||
}
|
||||
}
|
||||
return new TerminationHandle() {
|
||||
@Override
|
||||
public void awaitTermination() throws InterruptedException {
|
||||
latch.await();
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@Override
|
||||
public TerminationHandle warmTopReader(final IndexShard indexShard, final Engine.Searcher searcher) {
|
||||
final MapperService mapperService = indexShard.mapperService();
|
||||
final Map<String, MappedFieldType> warmUpGlobalOrdinals = new HashMap<>();
|
||||
for (DocumentMapper docMapper : mapperService.docMappers(false)) {
|
||||
for (FieldMapper fieldMapper : docMapper.mappers()) {
|
||||
final FieldDataType fieldDataType = fieldMapper.fieldType().fieldDataType();
|
||||
final String indexName = fieldMapper.fieldType().name();
|
||||
if (fieldDataType == null) {
|
||||
continue;
|
||||
}
|
||||
if (fieldDataType.getLoading() != Loading.EAGER_GLOBAL_ORDINALS) {
|
||||
continue;
|
||||
}
|
||||
if (warmUpGlobalOrdinals.containsKey(indexName)) {
|
||||
continue;
|
||||
}
|
||||
warmUpGlobalOrdinals.put(indexName, fieldMapper.fieldType());
|
||||
}
|
||||
}
|
||||
final IndexFieldDataService indexFieldDataService = indexShard.indexFieldDataService();
|
||||
final Executor executor = indicesWarmer.getExecutor();
|
||||
final CountDownLatch latch = new CountDownLatch(warmUpGlobalOrdinals.size());
|
||||
for (final MappedFieldType fieldType : warmUpGlobalOrdinals.values()) {
|
||||
executor.execute(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
final long start = System.nanoTime();
|
||||
IndexFieldData.Global ifd = indexFieldDataService.getForField(fieldType);
|
||||
ifd.loadGlobal(searcher.getDirectoryReader());
|
||||
if (indexShard.warmerService().logger().isTraceEnabled()) {
|
||||
indexShard.warmerService().logger().trace("warmed global ordinals for [{}], took [{}]", fieldType.name(), TimeValue.timeValueNanos(System.nanoTime() - start));
|
||||
}
|
||||
} catch (Throwable t) {
|
||||
indexShard.warmerService().logger().warn("failed to warm-up global ordinals for [{}]", t, fieldType.name());
|
||||
} finally {
|
||||
latch.countDown();
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
return new TerminationHandle() {
|
||||
@Override
|
||||
public void awaitTermination() throws InterruptedException {
|
||||
latch.await();
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
class Reaper implements Runnable {
|
||||
@Override
|
||||
public void run() {
|
||||
|
|
|
@ -53,7 +53,6 @@ import org.elasticsearch.index.similarity.SimilarityService;
|
|||
import org.elasticsearch.index.store.IndexStore;
|
||||
import org.elasticsearch.index.store.IndexStoreConfig;
|
||||
import org.elasticsearch.indices.IndicesModule;
|
||||
import org.elasticsearch.indices.IndicesWarmer;
|
||||
import org.elasticsearch.indices.breaker.CircuitBreakerService;
|
||||
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
|
||||
import org.elasticsearch.indices.cache.query.IndicesQueryCache;
|
||||
|
@ -104,7 +103,6 @@ public class IndexModuleTests extends ESTestCase {
|
|||
static NodeServicesProvider newNodeServiceProvider(Settings settings, Environment environment, Client client, ScriptEngineService... scriptEngineServices) throws IOException {
|
||||
// TODO this can be used in other place too - lets first refactor the IndicesQueriesRegistry
|
||||
ThreadPool threadPool = new ThreadPool("test");
|
||||
IndicesWarmer warmer = new IndicesWarmer(settings, threadPool);
|
||||
IndicesQueryCache indicesQueryCache = new IndicesQueryCache(settings);
|
||||
CircuitBreakerService circuitBreakerService = new NoneCircuitBreakerService();
|
||||
PageCacheRecycler recycler = new PageCacheRecycler(settings, threadPool);
|
||||
|
@ -116,7 +114,7 @@ public class IndexModuleTests extends ESTestCase {
|
|||
ScriptSettings scriptSettings = new ScriptSettings(scriptEngineRegistry, scriptContextRegistry);
|
||||
ScriptService scriptService = new ScriptService(settings, environment, scriptEngines, new ResourceWatcherService(settings, threadPool), scriptEngineRegistry, scriptContextRegistry, scriptSettings);
|
||||
IndicesQueriesRegistry indicesQueriesRegistry = new IndicesQueriesRegistry(settings, emptyMap());
|
||||
return new NodeServicesProvider(threadPool, indicesQueryCache, warmer, bigArrays, client, scriptService, indicesQueriesRegistry, circuitBreakerService);
|
||||
return new NodeServicesProvider(threadPool, indicesQueryCache, bigArrays, client, scriptService, indicesQueriesRegistry, circuitBreakerService);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -41,10 +41,8 @@ import org.apache.lucene.util.BitSet;
|
|||
import org.apache.lucene.util.IOUtils;
|
||||
import org.elasticsearch.common.lucene.index.ElasticsearchDirectoryReader;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.index.Index;
|
||||
import org.elasticsearch.index.IndexSettings;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
import org.elasticsearch.indices.IndicesWarmer;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.test.IndexSettingsModule;
|
||||
|
||||
|
@ -57,8 +55,6 @@ import static org.hamcrest.Matchers.equalTo;
|
|||
public class BitSetFilterCacheTests extends ESTestCase {
|
||||
|
||||
private static final IndexSettings INDEX_SETTINGS = IndexSettingsModule.newIndexSettings("test", Settings.EMPTY);
|
||||
private final IndicesWarmer warmer = new IndicesWarmer(Settings.EMPTY, null);
|
||||
|
||||
|
||||
private static int matchCount(BitSetProducer producer, IndexReader reader) throws IOException {
|
||||
int count = 0;
|
||||
|
@ -95,7 +91,7 @@ public class BitSetFilterCacheTests extends ESTestCase {
|
|||
reader = ElasticsearchDirectoryReader.wrap(reader, new ShardId("test", "_na_", 0));
|
||||
IndexSearcher searcher = new IndexSearcher(reader);
|
||||
|
||||
BitsetFilterCache cache = new BitsetFilterCache(INDEX_SETTINGS, warmer, new BitsetFilterCache.Listener() {
|
||||
BitsetFilterCache cache = new BitsetFilterCache(INDEX_SETTINGS, new BitsetFilterCache.Listener() {
|
||||
@Override
|
||||
public void onCache(ShardId shardId, Accountable accountable) {
|
||||
|
||||
|
@ -149,7 +145,7 @@ public class BitSetFilterCacheTests extends ESTestCase {
|
|||
final AtomicInteger onCacheCalls = new AtomicInteger();
|
||||
final AtomicInteger onRemoveCalls = new AtomicInteger();
|
||||
|
||||
final BitsetFilterCache cache = new BitsetFilterCache(INDEX_SETTINGS, warmer, new BitsetFilterCache.Listener() {
|
||||
final BitsetFilterCache cache = new BitsetFilterCache(INDEX_SETTINGS, new BitsetFilterCache.Listener() {
|
||||
@Override
|
||||
public void onCache(ShardId shardId, Accountable accountable) {
|
||||
onCacheCalls.incrementAndGet();
|
||||
|
@ -188,7 +184,7 @@ public class BitSetFilterCacheTests extends ESTestCase {
|
|||
|
||||
public void testSetNullListener() {
|
||||
try {
|
||||
new BitsetFilterCache(INDEX_SETTINGS, warmer, null);
|
||||
new BitsetFilterCache(INDEX_SETTINGS, null);
|
||||
fail("listener can't be null");
|
||||
} catch (IllegalArgumentException ex) {
|
||||
assertEquals("listener must not be null", ex.getMessage());
|
||||
|
@ -197,7 +193,7 @@ public class BitSetFilterCacheTests extends ESTestCase {
|
|||
}
|
||||
|
||||
public void testRejectOtherIndex() throws IOException {
|
||||
BitsetFilterCache cache = new BitsetFilterCache(INDEX_SETTINGS, warmer, new BitsetFilterCache.Listener() {
|
||||
BitsetFilterCache cache = new BitsetFilterCache(INDEX_SETTINGS, new BitsetFilterCache.Listener() {
|
||||
@Override
|
||||
public void onCache(ShardId shardId, Accountable accountable) {
|
||||
|
||||
|
@ -208,7 +204,7 @@ public class BitSetFilterCacheTests extends ESTestCase {
|
|||
|
||||
}
|
||||
});
|
||||
|
||||
|
||||
Directory dir = newDirectory();
|
||||
IndexWriter writer = new IndexWriter(
|
||||
dir,
|
||||
|
@ -218,9 +214,9 @@ public class BitSetFilterCacheTests extends ESTestCase {
|
|||
DirectoryReader reader = DirectoryReader.open(writer, true);
|
||||
writer.close();
|
||||
reader = ElasticsearchDirectoryReader.wrap(reader, new ShardId("test2", "_na_", 0));
|
||||
|
||||
|
||||
BitSetProducer producer = cache.getBitSetProducer(new MatchAllDocsQuery());
|
||||
|
||||
|
||||
try {
|
||||
producer.getBitSet(reader.leaves().get(0));
|
||||
fail();
|
||||
|
|
|
@ -78,7 +78,6 @@ import org.elasticsearch.index.query.support.QueryParsers;
|
|||
import org.elasticsearch.index.shard.ShardId;
|
||||
import org.elasticsearch.index.similarity.SimilarityService;
|
||||
import org.elasticsearch.indices.IndicesModule;
|
||||
import org.elasticsearch.indices.IndicesWarmer;
|
||||
import org.elasticsearch.indices.breaker.CircuitBreakerService;
|
||||
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
|
||||
import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache;
|
||||
|
@ -275,7 +274,7 @@ public abstract class AbstractQueryTestCase<QB extends AbstractQueryBuilder<QB>>
|
|||
IndicesFieldDataCache indicesFieldDataCache = new IndicesFieldDataCache(settings, new IndexFieldDataCache.Listener() {
|
||||
});
|
||||
indexFieldDataService = new IndexFieldDataService(idxSettings, indicesFieldDataCache, injector.getInstance(CircuitBreakerService.class), mapperService);
|
||||
BitsetFilterCache bitsetFilterCache = new BitsetFilterCache(idxSettings, new IndicesWarmer(idxSettings.getNodeSettings(), null), new BitsetFilterCache.Listener() {
|
||||
BitsetFilterCache bitsetFilterCache = new BitsetFilterCache(idxSettings, new BitsetFilterCache.Listener() {
|
||||
@Override
|
||||
public void onCache(ShardId shardId, Accountable accountable) {
|
||||
|
||||
|
|
|
@ -1154,7 +1154,7 @@ public class IndexShardTests extends ESSingleNodeTestCase {
|
|||
ShardRouting routing = new ShardRouting(shard.routingEntry());
|
||||
shard.close("simon says", true);
|
||||
NodeServicesProvider indexServices = indexService.getIndexServices();
|
||||
IndexShard newShard = new IndexShard(shard.shardId(), indexService.getIndexSettings(), shard.shardPath(), shard.store(), indexService.cache(), indexService.mapperService(), indexService.similarityService(), indexService.fieldData(), shard.getEngineFactory(), indexService.getIndexEventListener(), wrapper, indexServices, indexService.getSearchSlowLog(), listeners);
|
||||
IndexShard newShard = new IndexShard(shard.shardId(), indexService.getIndexSettings(), shard.shardPath(), shard.store(), indexService.cache(), indexService.mapperService(), indexService.similarityService(), indexService.fieldData(), shard.getEngineFactory(), indexService.getIndexEventListener(), wrapper, indexServices, indexService.getSearchSlowLog(), null, listeners);
|
||||
ShardRoutingHelper.reinit(routing);
|
||||
newShard.updateRoutingEntry(routing, false);
|
||||
DiscoveryNode localNode = new DiscoveryNode("foo", DummyTransportAddress.INSTANCE, Version.CURRENT);
|
||||
|
|
|
@ -52,7 +52,6 @@ import org.elasticsearch.index.query.functionscore.ScoreFunctionParser;
|
|||
import org.elasticsearch.index.shard.ShardId;
|
||||
import org.elasticsearch.index.similarity.SimilarityService;
|
||||
import org.elasticsearch.indices.IndicesModule;
|
||||
import org.elasticsearch.indices.IndicesWarmer;
|
||||
import org.elasticsearch.indices.breaker.CircuitBreakerService;
|
||||
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
|
||||
import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache;
|
||||
|
@ -141,7 +140,7 @@ public class TemplateQueryParserTests extends ESTestCase {
|
|||
MapperService mapperService = new MapperService(idxSettings, analysisService, similarityService, mapperRegistry, () -> context);
|
||||
IndicesFieldDataCache cache = new IndicesFieldDataCache(settings, new IndexFieldDataCache.Listener() {});
|
||||
IndexFieldDataService indexFieldDataService =new IndexFieldDataService(idxSettings, cache, injector.getInstance(CircuitBreakerService.class), mapperService);
|
||||
BitsetFilterCache bitsetFilterCache = new BitsetFilterCache(idxSettings, new IndicesWarmer(idxSettings.getNodeSettings(), null), new BitsetFilterCache.Listener() {
|
||||
BitsetFilterCache bitsetFilterCache = new BitsetFilterCache(idxSettings, new BitsetFilterCache.Listener() {
|
||||
@Override
|
||||
public void onCache(ShardId shardId, Accountable accountable) {
|
||||
|
||||
|
|
|
@ -26,7 +26,6 @@ import org.elasticsearch.common.settings.ClusterSettings;
|
|||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.util.BigArrays;
|
||||
import org.elasticsearch.indices.IndicesService;
|
||||
import org.elasticsearch.indices.IndicesWarmer;
|
||||
import org.elasticsearch.indices.cache.request.IndicesRequestCache;
|
||||
import org.elasticsearch.plugins.Plugin;
|
||||
import org.elasticsearch.script.ScriptService;
|
||||
|
@ -67,10 +66,10 @@ public class MockSearchService extends SearchService {
|
|||
}
|
||||
|
||||
@Inject
|
||||
public MockSearchService(Settings settings, ClusterSettings clusterSettings, ClusterService clusterService, IndicesService indicesService, IndicesWarmer indicesWarmer,
|
||||
public MockSearchService(Settings settings, ClusterSettings clusterSettings, ClusterService clusterService, IndicesService indicesService,
|
||||
ThreadPool threadPool, ScriptService scriptService, PageCacheRecycler pageCacheRecycler, BigArrays bigArrays,
|
||||
DfsPhase dfsPhase, QueryPhase queryPhase, FetchPhase fetchPhase, IndicesRequestCache indicesQueryCache) {
|
||||
super(settings, clusterSettings, clusterService, indicesService, indicesWarmer, threadPool, scriptService, pageCacheRecycler, bigArrays, dfsPhase,
|
||||
super(settings, clusterSettings, clusterService, indicesService, threadPool, scriptService, pageCacheRecycler, bigArrays, dfsPhase,
|
||||
queryPhase, fetchPhase, indicesQueryCache);
|
||||
}
|
||||
|
||||
|
|
|
@ -47,7 +47,6 @@ import org.elasticsearch.action.admin.indices.segments.IndicesSegmentResponse;
|
|||
import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequestBuilder;
|
||||
import org.elasticsearch.action.bulk.BulkRequestBuilder;
|
||||
import org.elasticsearch.action.bulk.BulkResponse;
|
||||
import org.elasticsearch.action.delete.DeleteResponse;
|
||||
import org.elasticsearch.action.get.GetResponse;
|
||||
import org.elasticsearch.action.index.IndexRequestBuilder;
|
||||
import org.elasticsearch.action.index.IndexResponse;
|
||||
|
@ -108,6 +107,7 @@ import org.elasticsearch.index.MergePolicyConfig;
|
|||
import org.elasticsearch.index.MergeSchedulerConfig;
|
||||
import org.elasticsearch.index.translog.Translog;
|
||||
import org.elasticsearch.indices.IndicesService;
|
||||
import org.elasticsearch.index.IndexWarmer;
|
||||
import org.elasticsearch.indices.cache.request.IndicesRequestCache;
|
||||
import org.elasticsearch.indices.store.IndicesStore;
|
||||
import org.elasticsearch.node.NodeMocksPlugin;
|
||||
|
@ -115,7 +115,6 @@ import org.elasticsearch.plugins.Plugin;
|
|||
import org.elasticsearch.rest.RestStatus;
|
||||
import org.elasticsearch.search.MockSearchService;
|
||||
import org.elasticsearch.search.SearchHit;
|
||||
import org.elasticsearch.search.SearchService;
|
||||
import org.elasticsearch.test.client.RandomizingClient;
|
||||
import org.elasticsearch.test.disruption.ServiceDisruptionScheme;
|
||||
import org.elasticsearch.test.rest.client.http.HttpRequestBuilder;
|
||||
|
@ -129,7 +128,6 @@ import org.junit.AfterClass;
|
|||
import org.junit.Before;
|
||||
import org.junit.BeforeClass;
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.lang.annotation.Annotation;
|
||||
|
@ -510,7 +508,7 @@ public abstract class ESIntegTestCase extends ESTestCase {
|
|||
|
||||
private static Settings.Builder setRandomIndexNormsLoading(Random random, Settings.Builder builder) {
|
||||
if (random.nextBoolean()) {
|
||||
builder.put(SearchService.INDEX_NORMS_LOADING_SETTING.getKey(), RandomPicks.randomFrom(random, Arrays.asList(MappedFieldType.Loading.EAGER, MappedFieldType.Loading.LAZY)));
|
||||
builder.put(IndexWarmer.INDEX_NORMS_LOADING_SETTING.getKey(), RandomPicks.randomFrom(random, Arrays.asList(MappedFieldType.Loading.EAGER, MappedFieldType.Loading.LAZY)));
|
||||
}
|
||||
return builder;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue