Factor out slow logs into Search and IndexingOperationListeners

This commit introduces SearchOperationListeneres which allow to hook
into search operation lifecycle and execute operations like slow-logs
and statistic collection in a transparent way. SearchOperationListenrs
can be registered on the IndexModule just like IndexingOperationListeners.
The main consumers (slow log) have already been moved out of IndexService
into IndexModule which reduces the dependency on IndexService as well as
IndexShard and makes slowlogging transparent.

Closes #17398
This commit is contained in:
Simon Willnauer 2016-03-30 10:36:54 +02:00
parent fc47007e17
commit 81801451ad
14 changed files with 754 additions and 110 deletions

View File

@ -415,7 +415,6 @@
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]http[/\\]netty[/\\]NettyHttpServerTransport.java" checks="LineLength" /> <suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]http[/\\]netty[/\\]NettyHttpServerTransport.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]index[/\\]AlreadyExpiredException.java" checks="LineLength" /> <suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]index[/\\]AlreadyExpiredException.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]index[/\\]CompositeIndexEventListener.java" checks="LineLength" /> <suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]index[/\\]CompositeIndexEventListener.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]index[/\\]IndexModule.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]index[/\\]IndexSettings.java" checks="LineLength" /> <suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]index[/\\]IndexSettings.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]index[/\\]IndexingSlowLog.java" checks="LineLength" /> <suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]index[/\\]IndexingSlowLog.java" checks="LineLength" />
<suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]index[/\\]MergePolicyConfig.java" checks="LineLength" /> <suppress files="core[/\\]src[/\\]main[/\\]java[/\\]org[/\\]elasticsearch[/\\]index[/\\]MergePolicyConfig.java" checks="LineLength" />

View File

@ -33,6 +33,7 @@ import org.elasticsearch.index.engine.EngineFactory;
import org.elasticsearch.index.shard.IndexEventListener; import org.elasticsearch.index.shard.IndexEventListener;
import org.elasticsearch.index.shard.IndexSearcherWrapper; import org.elasticsearch.index.shard.IndexSearcherWrapper;
import org.elasticsearch.index.shard.IndexingOperationListener; import org.elasticsearch.index.shard.IndexingOperationListener;
import org.elasticsearch.index.shard.SearchOperationListener;
import org.elasticsearch.index.similarity.BM25SimilarityProvider; import org.elasticsearch.index.similarity.BM25SimilarityProvider;
import org.elasticsearch.index.similarity.SimilarityProvider; import org.elasticsearch.index.similarity.SimilarityProvider;
import org.elasticsearch.index.similarity.SimilarityService; import org.elasticsearch.index.similarity.SimilarityService;
@ -43,11 +44,14 @@ import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache;
import org.elasticsearch.indices.mapper.MapperRegistry; import org.elasticsearch.indices.mapper.MapperRegistry;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.List;
import java.util.Locale; import java.util.Locale;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiFunction; import java.util.function.BiFunction;
import java.util.function.Consumer; import java.util.function.Consumer;
import java.util.function.Function; import java.util.function.Function;
@ -55,13 +59,15 @@ import java.util.function.Function;
/** /**
* IndexModule represents the central extension point for index level custom implementations like: * IndexModule represents the central extension point for index level custom implementations like:
* <ul> * <ul>
* <li>{@link SimilarityProvider} - New {@link SimilarityProvider} implementations can be registered through {@link #addSimilarity(String, BiFunction)} * <li>{@link SimilarityProvider} - New {@link SimilarityProvider} implementations can be registered through
* while existing Providers can be referenced through Settings under the {@link IndexModule#SIMILARITY_SETTINGS_PREFIX} prefix * {@link #addSimilarity(String, BiFunction)}while existing Providers can be referenced through Settings under the
* along with the "type" value. For example, to reference the {@link BM25SimilarityProvider}, the configuration * {@link IndexModule#SIMILARITY_SETTINGS_PREFIX} prefix along with the "type" value. For example, to reference the
* <tt>"index.similarity.my_similarity.type : "BM25"</tt> can be used.</li> * {@link BM25SimilarityProvider}, the configuration <tt>"index.similarity.my_similarity.type : "BM25"</tt> can be used.</li>
* <li>{@link IndexStore} - Custom {@link IndexStore} instances can be registered via {@link #addIndexStore(String, BiFunction)}</li> * <li>{@link IndexStore} - Custom {@link IndexStore} instances can be registered via {@link #addIndexStore(String, BiFunction)}</li>
* <li>{@link IndexEventListener} - Custom {@link IndexEventListener} instances can be registered via {@link #addIndexEventListener(IndexEventListener)}</li> * <li>{@link IndexEventListener} - Custom {@link IndexEventListener} instances can be registered via
* <li>Settings update listener - Custom settings update listener can be registered via {@link #addSettingsUpdateConsumer(Setting, Consumer)}</li> * {@link #addIndexEventListener(IndexEventListener)}</li>
* <li>Settings update listener - Custom settings update listener can be registered via
* {@link #addSettingsUpdateConsumer(Setting, Consumer)}</li>
* </ul> * </ul>
*/ */
public final class IndexModule { public final class IndexModule {
@ -84,11 +90,13 @@ public final class IndexModule {
final SetOnce<EngineFactory> engineFactory = new SetOnce<>(); final SetOnce<EngineFactory> engineFactory = new SetOnce<>();
private SetOnce<IndexSearcherWrapperFactory> indexSearcherWrapper = new SetOnce<>(); private SetOnce<IndexSearcherWrapperFactory> indexSearcherWrapper = new SetOnce<>();
private final Set<IndexEventListener> indexEventListeners = new HashSet<>(); private final Set<IndexEventListener> indexEventListeners = new HashSet<>();
private IndexEventListener listener;
private final Map<String, BiFunction<String, Settings, SimilarityProvider>> similarities = new HashMap<>(); private final Map<String, BiFunction<String, Settings, SimilarityProvider>> similarities = new HashMap<>();
private final Map<String, BiFunction<IndexSettings, IndexStoreConfig, IndexStore>> storeTypes = new HashMap<>(); private final Map<String, BiFunction<IndexSettings, IndexStoreConfig, IndexStore>> storeTypes = new HashMap<>();
private final Map<String, BiFunction<IndexSettings, IndicesQueryCache, QueryCache>> queryCaches = new HashMap<>(); private final Map<String, BiFunction<IndexSettings, IndicesQueryCache, QueryCache>> queryCaches = new HashMap<>();
private final SetOnce<String> forceQueryCacheType = new SetOnce<>(); private final SetOnce<String> forceQueryCacheType = new SetOnce<>();
private final List<SearchOperationListener> searchOperationListeners = new ArrayList<>();
private final List<IndexingOperationListener> indexOperationListeners = new ArrayList<>();
private final AtomicBoolean frozen = new AtomicBoolean(false);
public IndexModule(IndexSettings indexSettings, IndexStoreConfig indexStoreConfig, AnalysisRegistry analysisRegistry) { public IndexModule(IndexSettings indexSettings, IndexStoreConfig indexStoreConfig, AnalysisRegistry analysisRegistry) {
this.indexStoreConfig = indexStoreConfig; this.indexStoreConfig = indexStoreConfig;
@ -96,12 +104,15 @@ public final class IndexModule {
this.analysisRegistry = analysisRegistry; this.analysisRegistry = analysisRegistry;
registerQueryCache(INDEX_QUERY_CACHE, IndexQueryCache::new); registerQueryCache(INDEX_QUERY_CACHE, IndexQueryCache::new);
registerQueryCache(NONE_QUERY_CACHE, (a, b) -> new NoneQueryCache(a)); registerQueryCache(NONE_QUERY_CACHE, (a, b) -> new NoneQueryCache(a));
this.searchOperationListeners.add(new SearchSlowLog(indexSettings));
this.indexOperationListeners.add(new IndexingSlowLog(indexSettings));
} }
/** /**
* Adds a Setting and it's consumer for this index. * Adds a Setting and it's consumer for this index.
*/ */
public <T> void addSettingsUpdateConsumer(Setting<T> setting, Consumer<T> consumer) { public <T> void addSettingsUpdateConsumer(Setting<T> setting, Consumer<T> consumer) {
ensureNotFrozen();
if (setting == null) { if (setting == null) {
throw new IllegalArgumentException("setting must not be null"); throw new IllegalArgumentException("setting must not be null");
} }
@ -134,9 +145,7 @@ public final class IndexModule {
* </p> * </p>
*/ */
public void addIndexEventListener(IndexEventListener listener) { public void addIndexEventListener(IndexEventListener listener) {
if (this.listener != null) { ensureNotFrozen();
throw new IllegalStateException("can't add listener after listeners are frozen");
}
if (listener == null) { if (listener == null) {
throw new IllegalArgumentException("listener must not be null"); throw new IllegalArgumentException("listener must not be null");
} }
@ -147,6 +156,52 @@ public final class IndexModule {
this.indexEventListeners.add(listener); this.indexEventListeners.add(listener);
} }
/**
* Adds an {@link SearchOperationListener} for this index. All listeners added here
* are maintained for the entire index lifecycle on this node. Once an index is closed or deleted these
* listeners go out of scope.
* <p>
* Note: an index might be created on a node multiple times. For instance if the last shard from an index is
* relocated to another node the internal representation will be destroyed which includes the registered listeners.
* Once the node holds at least one shard of an index all modules are reloaded and listeners are registered again.
* Listeners can't be unregistered they will stay alive for the entire time the index is allocated on a node.
* </p>
*/
public void addSearchOperationListener(SearchOperationListener listener) {
ensureNotFrozen();
if (listener == null) {
throw new IllegalArgumentException("listener must not be null");
}
if (searchOperationListeners.contains(listener)) {
throw new IllegalArgumentException("listener already added");
}
this.searchOperationListeners.add(listener);
}
/**
* Adds an {@link IndexingOperationListener} for this index. All listeners added here
* are maintained for the entire index lifecycle on this node. Once an index is closed or deleted these
* listeners go out of scope.
* <p>
* Note: an index might be created on a node multiple times. For instance if the last shard from an index is
* relocated to another node the internal representation will be destroyed which includes the registered listeners.
* Once the node holds at least one shard of an index all modules are reloaded and listeners are registered again.
* Listeners can't be unregistered they will stay alive for the entire time the index is allocated on a node.
* </p>
*/
public void addIndexOperationListener(IndexingOperationListener listener) {
ensureNotFrozen();
if (listener == null) {
throw new IllegalArgumentException("listener must not be null");
}
if (indexOperationListeners.contains(listener)) {
throw new IllegalArgumentException("listener already added");
}
this.indexOperationListeners.add(listener);
}
/** /**
* Adds an {@link IndexStore} type to this index module. Typically stores are registered with a reference to * Adds an {@link IndexStore} type to this index module. Typically stores are registered with a reference to
* it's constructor: * it's constructor:
@ -158,6 +213,7 @@ public final class IndexModule {
* @param provider the instance provider / factory method * @param provider the instance provider / factory method
*/ */
public void addIndexStore(String type, BiFunction<IndexSettings, IndexStoreConfig, IndexStore> provider) { public void addIndexStore(String type, BiFunction<IndexSettings, IndexStoreConfig, IndexStore> provider) {
ensureNotFrozen();
if (storeTypes.containsKey(type)) { if (storeTypes.containsKey(type)) {
throw new IllegalArgumentException("key [" + type +"] already registered"); throw new IllegalArgumentException("key [" + type +"] already registered");
} }
@ -172,6 +228,7 @@ public final class IndexModule {
* @param similarity SimilarityProvider to register * @param similarity SimilarityProvider to register
*/ */
public void addSimilarity(String name, BiFunction<String, Settings, SimilarityProvider> similarity) { public void addSimilarity(String name, BiFunction<String, Settings, SimilarityProvider> similarity) {
ensureNotFrozen();
if (similarities.containsKey(name) || SimilarityService.BUILT_IN.containsKey(name)) { if (similarities.containsKey(name) || SimilarityService.BUILT_IN.containsKey(name)) {
throw new IllegalArgumentException("similarity for name: [" + name + " is already registered"); throw new IllegalArgumentException("similarity for name: [" + name + " is already registered");
} }
@ -184,6 +241,7 @@ public final class IndexModule {
* @param provider the provider instance * @param provider the provider instance
*/ */
public void registerQueryCache(String name, BiFunction<IndexSettings, IndicesQueryCache, QueryCache> provider) { public void registerQueryCache(String name, BiFunction<IndexSettings, IndicesQueryCache, QueryCache> provider) {
ensureNotFrozen();
if (provider == null) { if (provider == null) {
throw new IllegalArgumentException("provider must not be null"); throw new IllegalArgumentException("provider must not be null");
} }
@ -194,19 +252,21 @@ public final class IndexModule {
} }
/** /**
* Sets a {@link org.elasticsearch.index.IndexModule.IndexSearcherWrapperFactory} that is called once the IndexService is fully constructed. * Sets a {@link org.elasticsearch.index.IndexModule.IndexSearcherWrapperFactory} that is called once the IndexService
* is fully constructed.
* Note: this method can only be called once per index. Multiple wrappers are not supported. * Note: this method can only be called once per index. Multiple wrappers are not supported.
*/ */
public void setSearcherWrapper(IndexSearcherWrapperFactory indexSearcherWrapperFactory) { public void setSearcherWrapper(IndexSearcherWrapperFactory indexSearcherWrapperFactory) {
ensureNotFrozen();
this.indexSearcherWrapper.set(indexSearcherWrapperFactory); this.indexSearcherWrapper.set(indexSearcherWrapperFactory);
} }
public IndexEventListener freeze() { IndexEventListener freeze() { // pkg private for testing
// TODO somehow we need to make this pkg private... if (this.frozen.compareAndSet(false, true)) {
if (listener == null) { return new CompositeIndexEventListener(indexSettings, indexEventListeners);
listener = new CompositeIndexEventListener(indexSettings, indexEventListeners); } else {
throw new IllegalStateException("already frozen");
} }
return listener;
} }
private static boolean isBuiltinType(String storeType) { private static boolean isBuiltinType(String storeType) {
@ -246,10 +306,13 @@ public final class IndexModule {
IndexSearcherWrapper newWrapper(final IndexService indexService); IndexSearcherWrapper newWrapper(final IndexService indexService);
} }
public IndexService newIndexService(NodeEnvironment environment, IndexService.ShardStoreDeleter shardStoreDeleter, NodeServicesProvider servicesProvider, IndicesQueryCache indicesQueryCache, MapperRegistry mapperRegistry, IndicesFieldDataCache indicesFieldDataCache, public IndexService newIndexService(NodeEnvironment environment, IndexService.ShardStoreDeleter shardStoreDeleter,
IndexingOperationListener... listeners) throws IOException { NodeServicesProvider servicesProvider, IndicesQueryCache indicesQueryCache,
IndexSearcherWrapperFactory searcherWrapperFactory = indexSearcherWrapper.get() == null ? (shard) -> null : indexSearcherWrapper.get(); MapperRegistry mapperRegistry, IndicesFieldDataCache indicesFieldDataCache) throws IOException {
IndexEventListener eventListener = freeze(); final IndexEventListener eventListener = freeze();
IndexSearcherWrapperFactory searcherWrapperFactory = indexSearcherWrapper.get() == null
? (shard) -> null : indexSearcherWrapper.get();
eventListener.beforeIndexCreated(indexSettings.getIndex(), indexSettings.getSettings());
final String storeType = indexSettings.getValue(INDEX_STORE_TYPE_SETTING); final String storeType = indexSettings.getValue(INDEX_STORE_TYPE_SETTING);
final IndexStore store; final IndexStore store;
if (Strings.isEmpty(storeType) || isBuiltinType(storeType)) { if (Strings.isEmpty(storeType) || isBuiltinType(storeType)) {
@ -265,12 +328,15 @@ public final class IndexModule {
} }
} }
indexSettings.getScopedSettings().addSettingsUpdateConsumer(IndexStore.INDEX_STORE_THROTTLE_TYPE_SETTING, store::setType); indexSettings.getScopedSettings().addSettingsUpdateConsumer(IndexStore.INDEX_STORE_THROTTLE_TYPE_SETTING, store::setType);
indexSettings.getScopedSettings().addSettingsUpdateConsumer(IndexStore.INDEX_STORE_THROTTLE_MAX_BYTES_PER_SEC_SETTING, store::setMaxRate); indexSettings.getScopedSettings().addSettingsUpdateConsumer(IndexStore.INDEX_STORE_THROTTLE_MAX_BYTES_PER_SEC_SETTING,
final String queryCacheType = forceQueryCacheType.get() != null ? forceQueryCacheType.get() : indexSettings.getValue(INDEX_QUERY_CACHE_TYPE_SETTING); store::setMaxRate);
final String queryCacheType = forceQueryCacheType.get() != null
? forceQueryCacheType.get() : indexSettings.getValue(INDEX_QUERY_CACHE_TYPE_SETTING);
final BiFunction<IndexSettings, IndicesQueryCache, QueryCache> queryCacheProvider = queryCaches.get(queryCacheType); final BiFunction<IndexSettings, IndicesQueryCache, QueryCache> queryCacheProvider = queryCaches.get(queryCacheType);
final QueryCache queryCache = queryCacheProvider.apply(indexSettings, indicesQueryCache); final QueryCache queryCache = queryCacheProvider.apply(indexSettings, indicesQueryCache);
return new IndexService(indexSettings, environment, new SimilarityService(indexSettings, similarities), shardStoreDeleter, analysisRegistry, engineFactory.get(), return new IndexService(indexSettings, environment, new SimilarityService(indexSettings, similarities), shardStoreDeleter,
servicesProvider, queryCache, store, eventListener, searcherWrapperFactory, mapperRegistry, indicesFieldDataCache, listeners); analysisRegistry, engineFactory.get(), servicesProvider, queryCache, store, eventListener, searcherWrapperFactory,
mapperRegistry, indicesFieldDataCache, searchOperationListeners, indexOperationListeners);
} }
/** /**
@ -282,7 +348,14 @@ public final class IndexModule {
* @see #INDEX_QUERY_CACHE_TYPE_SETTING * @see #INDEX_QUERY_CACHE_TYPE_SETTING
*/ */
public void forceQueryCacheType(String type) { public void forceQueryCacheType(String type) {
ensureNotFrozen();
this.forceQueryCacheType.set(type); this.forceQueryCacheType.set(type);
} }
private void ensureNotFrozen() {
if (this.frozen.get()) {
throw new IllegalStateException("Can't modify IndexModule once the index service has been created");
}
}
} }

View File

@ -56,6 +56,7 @@ import org.elasticsearch.index.shard.IndexEventListener;
import org.elasticsearch.index.shard.IndexSearcherWrapper; import org.elasticsearch.index.shard.IndexSearcherWrapper;
import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexingOperationListener; import org.elasticsearch.index.shard.IndexingOperationListener;
import org.elasticsearch.index.shard.SearchOperationListener;
import org.elasticsearch.index.shard.ShadowIndexShard; import org.elasticsearch.index.shard.ShadowIndexShard;
import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.ShardNotFoundException; import org.elasticsearch.index.shard.ShardNotFoundException;
@ -73,8 +74,10 @@ import org.elasticsearch.threadpool.ThreadPool;
import java.io.Closeable; import java.io.Closeable;
import java.io.IOException; import java.io.IOException;
import java.nio.file.Path; import java.nio.file.Path;
import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.Iterator; import java.util.Iterator;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Objects; import java.util.Objects;
import java.util.Set; import java.util.Set;
@ -109,11 +112,10 @@ public final class IndexService extends AbstractIndexComponent implements IndexC
private final AtomicBoolean closed = new AtomicBoolean(false); private final AtomicBoolean closed = new AtomicBoolean(false);
private final AtomicBoolean deleted = new AtomicBoolean(false); private final AtomicBoolean deleted = new AtomicBoolean(false);
private final IndexSettings indexSettings; private final IndexSettings indexSettings;
private final IndexingSlowLog slowLog; private final List<IndexingOperationListener> indexingOperationListeners;
private final IndexingOperationListener[] listeners; private final List<SearchOperationListener> searchOperationListeners;
private volatile AsyncRefreshTask refreshTask; private volatile AsyncRefreshTask refreshTask;
private volatile AsyncTranslogFSync fsyncTask; private volatile AsyncTranslogFSync fsyncTask;
private final SearchSlowLog searchSlowLog;
private final ThreadPool threadPool; private final ThreadPool threadPool;
private final BigArrays bigArrays; private final BigArrays bigArrays;
@ -129,7 +131,8 @@ public final class IndexService extends AbstractIndexComponent implements IndexC
IndexModule.IndexSearcherWrapperFactory wrapperFactory, IndexModule.IndexSearcherWrapperFactory wrapperFactory,
MapperRegistry mapperRegistry, MapperRegistry mapperRegistry,
IndicesFieldDataCache indicesFieldDataCache, IndicesFieldDataCache indicesFieldDataCache,
IndexingOperationListener... listenersIn) throws IOException { List<SearchOperationListener> searchOperationListeners,
List<IndexingOperationListener> indexingOperationListeners) throws IOException {
super(indexSettings); super(indexSettings);
this.indexSettings = indexSettings; this.indexSettings = indexSettings;
this.analysisService = registry.build(indexSettings); this.analysisService = registry.build(indexSettings);
@ -155,15 +158,10 @@ public final class IndexService extends AbstractIndexComponent implements IndexC
this.engineFactory = engineFactory; this.engineFactory = engineFactory;
// initialize this last -- otherwise if the wrapper requires any other member to be non-null we fail with an NPE // initialize this last -- otherwise if the wrapper requires any other member to be non-null we fail with an NPE
this.searcherWrapper = wrapperFactory.newWrapper(this); this.searcherWrapper = wrapperFactory.newWrapper(this);
this.slowLog = new IndexingSlowLog(indexSettings); this.indexingOperationListeners = Collections.unmodifiableList(indexingOperationListeners);
this.searchOperationListeners = Collections.unmodifiableList(searchOperationListeners);
// Add our slowLog to the incoming IndexingOperationListeners:
this.listeners = new IndexingOperationListener[1+listenersIn.length];
this.listeners[0] = slowLog;
System.arraycopy(listenersIn, 0, this.listeners, 1, listenersIn.length);
// kick off async ops for the first shard in this index // kick off async ops for the first shard in this index
this.refreshTask = new AsyncRefreshTask(this); this.refreshTask = new AsyncRefreshTask(this);
searchSlowLog = new SearchSlowLog(indexSettings);
rescheduleFsyncTask(indexSettings.getTranslogDurability()); rescheduleFsyncTask(indexSettings.getTranslogDurability());
} }
@ -338,12 +336,13 @@ public final class IndexService extends AbstractIndexComponent implements IndexC
new StoreCloseListener(shardId, canDeleteShardContent, () -> eventListener.onStoreClosed(shardId))); new StoreCloseListener(shardId, canDeleteShardContent, () -> eventListener.onStoreClosed(shardId)));
if (useShadowEngine(primary, indexSettings)) { if (useShadowEngine(primary, indexSettings)) {
indexShard = new ShadowIndexShard(shardId, this.indexSettings, path, store, indexCache, mapperService, similarityService, indexShard = new ShadowIndexShard(shardId, this.indexSettings, path, store, indexCache, mapperService, similarityService,
indexFieldData, engineFactory, eventListener, searcherWrapper, threadPool, bigArrays, searchSlowLog, engineWarmer); indexFieldData, engineFactory, eventListener, searcherWrapper, threadPool, bigArrays, engineWarmer,
searchOperationListeners);
// no indexing listeners - shadow engines don't index // no indexing listeners - shadow engines don't index
} else { } else {
indexShard = new IndexShard(shardId, this.indexSettings, path, store, indexCache, mapperService, similarityService, indexShard = new IndexShard(shardId, this.indexSettings, path, store, indexCache, mapperService, similarityService,
indexFieldData, engineFactory, eventListener, searcherWrapper, threadPool, bigArrays, searchSlowLog, engineWarmer, indexFieldData, engineFactory, eventListener, searcherWrapper, threadPool, bigArrays, engineWarmer,
listeners); searchOperationListeners, indexingOperationListeners);
} }
eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created"); eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created");
eventListener.afterIndexShardCreated(indexShard); eventListener.afterIndexShardCreated(indexShard);
@ -455,8 +454,12 @@ public final class IndexService extends AbstractIndexComponent implements IndexC
return bigArrays; return bigArrays;
} }
public SearchSlowLog getSearchSlowLog() { List<IndexingOperationListener> getIndexOperationListeners() { // pkg private for testing
return searchSlowLog; return indexingOperationListeners;
}
List<SearchOperationListener> getSearchOperationListener() { // pkg private for testing
return searchOperationListeners;
} }
private class StoreCloseListener implements Store.OnClose { private class StoreCloseListener implements Store.OnClose {

View File

@ -136,6 +136,7 @@ public final class IndexingSlowLog implements IndexingOperationListener {
} }
@Override
public void postIndex(Engine.Index index, boolean created) { public void postIndex(Engine.Index index, boolean created) {
final long took = index.endTime() - index.startTime(); final long took = index.endTime() - index.startTime();
postIndexing(index.parsedDoc(), took); postIndexing(index.parsedDoc(), took);

View File

@ -25,13 +25,14 @@ import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property; import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.shard.SearchOperationListener;
import org.elasticsearch.search.internal.SearchContext; import org.elasticsearch.search.internal.SearchContext;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
/** /**
*/ */
public final class SearchSlowLog { public final class SearchSlowLog implements SearchOperationListener {
private boolean reformat; private boolean reformat;
@ -116,7 +117,7 @@ public final class SearchSlowLog {
this.queryLogger.setLevel(level.name()); this.queryLogger.setLevel(level.name());
this.fetchLogger.setLevel(level.name()); this.fetchLogger.setLevel(level.name());
} }
@Override
public void onQueryPhase(SearchContext context, long tookInNanos) { public void onQueryPhase(SearchContext context, long tookInNanos) {
if (queryWarnThreshold >= 0 && tookInNanos > queryWarnThreshold) { if (queryWarnThreshold >= 0 && tookInNanos > queryWarnThreshold) {
queryLogger.warn("{}", new SlowLogSearchContextPrinter(context, tookInNanos, reformat)); queryLogger.warn("{}", new SlowLogSearchContextPrinter(context, tookInNanos, reformat));
@ -129,6 +130,7 @@ public final class SearchSlowLog {
} }
} }
@Override
public void onFetchPhase(SearchContext context, long tookInNanos) { public void onFetchPhase(SearchContext context, long tookInNanos) {
if (fetchWarnThreshold >= 0 && tookInNanos > fetchWarnThreshold) { if (fetchWarnThreshold >= 0 && tookInNanos > fetchWarnThreshold) {
fetchLogger.warn("{}", new SlowLogSearchContextPrinter(context, tookInNanos, reformat)); fetchLogger.warn("{}", new SlowLogSearchContextPrinter(context, tookInNanos, reformat));

View File

@ -23,7 +23,7 @@ import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.metrics.CounterMetric; import org.elasticsearch.common.metrics.CounterMetric;
import org.elasticsearch.common.metrics.MeanMetric; import org.elasticsearch.common.metrics.MeanMetric;
import org.elasticsearch.common.regex.Regex; import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.index.SearchSlowLog; import org.elasticsearch.index.shard.SearchOperationListener;
import org.elasticsearch.search.internal.SearchContext; import org.elasticsearch.search.internal.SearchContext;
import java.util.HashMap; import java.util.HashMap;
@ -35,17 +35,12 @@ import static java.util.Collections.emptyMap;
/** /**
*/ */
public final class ShardSearchStats { public final class ShardSearchStats implements SearchOperationListener {
private final SearchSlowLog slowLogSearchService;
private final StatsHolder totalStats = new StatsHolder(); private final StatsHolder totalStats = new StatsHolder();
private final CounterMetric openContexts = new CounterMetric(); private final CounterMetric openContexts = new CounterMetric();
private volatile Map<String, StatsHolder> groupsStats = emptyMap(); private volatile Map<String, StatsHolder> groupsStats = emptyMap();
public ShardSearchStats(SearchSlowLog searchSlowLog) {
this.slowLogSearchService = searchSlowLog;
}
/** /**
* Returns the stats, including group specific stats. If the groups are null/0 length, then nothing * Returns the stats, including group specific stats. If the groups are null/0 length, then nothing
* is returned for them. If they are set, then only groups provided will be returned, or * is returned for them. If they are set, then only groups provided will be returned, or
@ -71,6 +66,7 @@ public final class ShardSearchStats {
return new SearchStats(total, openContexts.count(), groupsSt); return new SearchStats(total, openContexts.count(), groupsSt);
} }
@Override
public void onPreQueryPhase(SearchContext searchContext) { public void onPreQueryPhase(SearchContext searchContext) {
computeStats(searchContext, statsHolder -> { computeStats(searchContext, statsHolder -> {
if (searchContext.hasOnlySuggest()) { if (searchContext.hasOnlySuggest()) {
@ -81,6 +77,7 @@ public final class ShardSearchStats {
}); });
} }
@Override
public void onFailedQueryPhase(SearchContext searchContext) { public void onFailedQueryPhase(SearchContext searchContext) {
computeStats(searchContext, statsHolder -> { computeStats(searchContext, statsHolder -> {
if (searchContext.hasOnlySuggest()) { if (searchContext.hasOnlySuggest()) {
@ -91,6 +88,7 @@ public final class ShardSearchStats {
}); });
} }
@Override
public void onQueryPhase(SearchContext searchContext, long tookInNanos) { public void onQueryPhase(SearchContext searchContext, long tookInNanos) {
computeStats(searchContext, statsHolder -> { computeStats(searchContext, statsHolder -> {
if (searchContext.hasOnlySuggest()) { if (searchContext.hasOnlySuggest()) {
@ -101,23 +99,24 @@ public final class ShardSearchStats {
statsHolder.queryCurrent.dec(); statsHolder.queryCurrent.dec();
} }
}); });
slowLogSearchService.onQueryPhase(searchContext, tookInNanos);
} }
@Override
public void onPreFetchPhase(SearchContext searchContext) { public void onPreFetchPhase(SearchContext searchContext) {
computeStats(searchContext, statsHolder -> statsHolder.fetchCurrent.inc()); computeStats(searchContext, statsHolder -> statsHolder.fetchCurrent.inc());
} }
@Override
public void onFailedFetchPhase(SearchContext searchContext) { public void onFailedFetchPhase(SearchContext searchContext) {
computeStats(searchContext, statsHolder -> statsHolder.fetchCurrent.dec()); computeStats(searchContext, statsHolder -> statsHolder.fetchCurrent.dec());
} }
@Override
public void onFetchPhase(SearchContext searchContext, long tookInNanos) { public void onFetchPhase(SearchContext searchContext, long tookInNanos) {
computeStats(searchContext, statsHolder -> { computeStats(searchContext, statsHolder -> {
statsHolder.fetchMetric.inc(tookInNanos); statsHolder.fetchMetric.inc(tookInNanos);
statsHolder.fetchCurrent.dec(); statsHolder.fetchCurrent.dec();
}); });
slowLogSearchService.onFetchPhase(searchContext, tookInNanos);
} }
public void clear() { public void clear() {
@ -159,18 +158,22 @@ public final class ShardSearchStats {
return stats; return stats;
} }
@Override
public void onNewContext(SearchContext context) { public void onNewContext(SearchContext context) {
openContexts.inc(); openContexts.inc();
} }
@Override
public void onFreeContext(SearchContext context) { public void onFreeContext(SearchContext context) {
openContexts.dec(); openContexts.dec();
} }
@Override
public void onNewScrollContext(SearchContext context) { public void onNewScrollContext(SearchContext context) {
totalStats.scrollCurrent.inc(); totalStats.scrollCurrent.inc();
} }
@Override
public void onFreeScrollContext(SearchContext context) { public void onFreeScrollContext(SearchContext context) {
totalStats.scrollCurrent.dec(); totalStats.scrollCurrent.dec();
totalStats.scrollMetric.inc(System.nanoTime() - context.getOriginNanoTime()); totalStats.scrollMetric.inc(System.nanoTime() - context.getOriginNanoTime());

View File

@ -128,7 +128,7 @@ public class IndexShard extends AbstractIndexShardComponent {
private final IndexCache indexCache; private final IndexCache indexCache;
private final Store store; private final Store store;
private final InternalIndexingStats internalIndexingStats; private final InternalIndexingStats internalIndexingStats;
private final ShardSearchStats searchService; private final ShardSearchStats searchStats = new ShardSearchStats();;
private final ShardGetService getService; private final ShardGetService getService;
private final ShardIndexWarmerService shardWarmerService; private final ShardIndexWarmerService shardWarmerService;
private final ShardRequestCache shardQueryCache; private final ShardRequestCache shardQueryCache;
@ -151,6 +151,7 @@ public class IndexShard extends AbstractIndexShardComponent {
* being indexed/deleted. * being indexed/deleted.
*/ */
private final AtomicLong writingBytes = new AtomicLong(); private final AtomicLong writingBytes = new AtomicLong();
private final SearchOperationListener searchOperationListener;
protected volatile ShardRouting shardRouting; protected volatile ShardRouting shardRouting;
protected volatile IndexShardState state; protected volatile IndexShardState state;
@ -195,7 +196,7 @@ public class IndexShard extends AbstractIndexShardComponent {
MapperService mapperService, SimilarityService similarityService, IndexFieldDataService indexFieldDataService, MapperService mapperService, SimilarityService similarityService, IndexFieldDataService indexFieldDataService,
@Nullable EngineFactory engineFactory, @Nullable EngineFactory engineFactory,
IndexEventListener indexEventListener, IndexSearcherWrapper indexSearcherWrapper, ThreadPool threadPool, BigArrays bigArrays, IndexEventListener indexEventListener, IndexSearcherWrapper indexSearcherWrapper, ThreadPool threadPool, BigArrays bigArrays,
SearchSlowLog slowLog, Engine.Warmer warmer, IndexingOperationListener... listeners) { Engine.Warmer warmer, List<SearchOperationListener> searchOperationListener, List<IndexingOperationListener> listeners) {
super(shardId, indexSettings); super(shardId, indexSettings);
final Settings settings = indexSettings.getSettings(); final Settings settings = indexSettings.getSettings();
this.codecService = new CodecService(mapperService, logger); this.codecService = new CodecService(mapperService, logger);
@ -210,11 +211,13 @@ public class IndexShard extends AbstractIndexShardComponent {
this.mapperService = mapperService; this.mapperService = mapperService;
this.indexCache = indexCache; this.indexCache = indexCache;
this.internalIndexingStats = new InternalIndexingStats(); this.internalIndexingStats = new InternalIndexingStats();
final List<IndexingOperationListener> listenersList = new ArrayList<>(Arrays.asList(listeners)); final List<IndexingOperationListener> listenersList = new ArrayList<>(listeners);
listenersList.add(internalIndexingStats); listenersList.add(internalIndexingStats);
this.indexingOperationListeners = new IndexingOperationListener.CompositeListener(listenersList, logger); this.indexingOperationListeners = new IndexingOperationListener.CompositeListener(listenersList, logger);
final List<SearchOperationListener> searchListenersList = new ArrayList<>(searchOperationListener);
searchListenersList.add(searchStats);
this.searchOperationListener = new SearchOperationListener.CompositeListener(searchListenersList, logger);
this.getService = new ShardGetService(indexSettings, this, mapperService); this.getService = new ShardGetService(indexSettings, this, mapperService);
this.searchService = new ShardSearchStats(slowLog);
this.shardWarmerService = new ShardIndexWarmerService(shardId, indexSettings); this.shardWarmerService = new ShardIndexWarmerService(shardId, indexSettings);
this.shardQueryCache = new ShardRequestCache(); this.shardQueryCache = new ShardRequestCache();
this.shardFieldData = new ShardFieldData(); this.shardFieldData = new ShardFieldData();
@ -270,8 +273,8 @@ public class IndexShard extends AbstractIndexShardComponent {
return mapperService; return mapperService;
} }
public ShardSearchStats searchService() { public SearchOperationListener getSearchOperationListener() {
return this.searchService; return this.searchOperationListener;
} }
public ShardIndexWarmerService warmerService() { public ShardIndexWarmerService warmerService() {
@ -613,7 +616,7 @@ public class IndexShard extends AbstractIndexShardComponent {
} }
public SearchStats searchStats(String... groups) { public SearchStats searchStats(String... groups) {
return searchService.stats(groups); return searchStats.stats(groups);
} }
public GetStats getStats() { public GetStats getStats() {

View File

@ -0,0 +1,227 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.shard;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.search.internal.SearchContext;
import java.util.List;
/**
* An listener for search, fetch and context events.
*/
public interface SearchOperationListener {
/**
* Executed before the query phase is executed
* @param searchContext the current search context
*/
default void onPreQueryPhase(SearchContext searchContext) {};
/**
* Executed if a query phased failed.
* @param searchContext the current search context
*/
default void onFailedQueryPhase(SearchContext searchContext) {};
/**
* Executed after the query phase successfully finished.
* Note: this is not invoked if the query phase execution failed.
* @param searchContext the current search context
* @param tookInNanos the number of nanoseconds the query execution took
*
* @see #onFailedQueryPhase(SearchContext)
*/
default void onQueryPhase(SearchContext searchContext, long tookInNanos) {};
/**
* Executed before the fetch phase is executed
* @param searchContext the current search context
*/
default void onPreFetchPhase(SearchContext searchContext) {};
/**
* Executed if a fetch phased failed.
* @param searchContext the current search context
*/
default void onFailedFetchPhase(SearchContext searchContext) {};
/**
* Executed after the fetch phase successfully finished.
* Note: this is not invoked if the fetch phase execution failed.
* @param searchContext the current search context
* @param tookInNanos the number of nanoseconds the fetch execution took
*
* @see #onFailedFetchPhase(SearchContext)
*/
default void onFetchPhase(SearchContext searchContext, long tookInNanos) {};
/**
* Executed when a new search context was created
* @param context the created context
*/
default void onNewContext(SearchContext context) {};
/**
* Executed when a previously created search context is freed.
* This happens either when the search execution finishes, if the
* execution failed or if the search context as idle for and needs to be
* cleaned up.
* @param context the freed search context
*/
default void onFreeContext(SearchContext context) {};
/**
* Executed when a new scroll search {@link SearchContext} was created
* @param context the created search context
*/
default void onNewScrollContext(SearchContext context) {};
/**
* Executed when a scroll search {@link SearchContext} is freed.
* This happens either when the scroll search execution finishes, if the
* execution failed or if the search context as idle for and needs to be
* cleaned up.
* @param context the freed search context
*/
default void onFreeScrollContext(SearchContext context) {};
/**
* A Composite listener that multiplexes calls to each of the listeners methods.
*/
final class CompositeListener implements SearchOperationListener {
private final List<SearchOperationListener> listeners;
private final ESLogger logger;
public CompositeListener(List<SearchOperationListener> listeners, ESLogger logger) {
this.listeners = listeners;
this.logger = logger;
}
@Override
public void onPreQueryPhase(SearchContext searchContext) {
for (SearchOperationListener listener : listeners) {
try {
listener.onPreQueryPhase(searchContext);
} catch (Throwable t) {
logger.warn("onPreQueryPhase listener [{}] failed", t, listener);
}
}
}
@Override
public void onFailedQueryPhase(SearchContext searchContext) {
for (SearchOperationListener listener : listeners) {
try {
listener.onFailedQueryPhase(searchContext);
} catch (Throwable t) {
logger.warn("onFailedQueryPhase listener [{}] failed", t, listener);
}
}
}
@Override
public void onQueryPhase(SearchContext searchContext, long tookInNanos) {
for (SearchOperationListener listener : listeners) {
try {
listener.onQueryPhase(searchContext, tookInNanos);
} catch (Throwable t) {
logger.warn("onQueryPhase listener [{}] failed", t, listener);
}
}
}
@Override
public void onPreFetchPhase(SearchContext searchContext) {
for (SearchOperationListener listener : listeners) {
try {
listener.onPreFetchPhase(searchContext);
} catch (Throwable t) {
logger.warn("onPreFetchPhase listener [{}] failed", t, listener);
}
}
}
@Override
public void onFailedFetchPhase(SearchContext searchContext) {
for (SearchOperationListener listener : listeners) {
try {
listener.onFailedFetchPhase(searchContext);
} catch (Throwable t) {
logger.warn("onFailedFetchPhase listener [{}] failed", t, listener);
}
}
}
@Override
public void onFetchPhase(SearchContext searchContext, long tookInNanos) {
for (SearchOperationListener listener : listeners) {
try {
listener.onFetchPhase(searchContext, tookInNanos);
} catch (Throwable t) {
logger.warn("onFetchPhase listener [{}] failed", t, listener);
}
}
}
@Override
public void onNewContext(SearchContext context) {
for (SearchOperationListener listener : listeners) {
try {
listener.onNewContext(context);
} catch (Throwable t) {
logger.warn("onNewContext listener [{}] failed", t, listener);
}
}
}
@Override
public void onFreeContext(SearchContext context) {
for (SearchOperationListener listener : listeners) {
try {
listener.onFreeContext(context);
} catch (Throwable t) {
logger.warn("onFreeContext listener [{}] failed", t, listener);
}
}
}
@Override
public void onNewScrollContext(SearchContext context) {
for (SearchOperationListener listener : listeners) {
try {
listener.onNewScrollContext(context);
} catch (Throwable t) {
logger.warn("onNewScrollContext listener [{}] failed", t, listener);
}
}
}
@Override
public void onFreeScrollContext(SearchContext context) {
for (SearchOperationListener listener : listeners) {
try {
listener.onFreeScrollContext(context);
} catch (Throwable t) {
logger.warn("onFreeScrollContext listener [{}] failed", t, listener);
}
}
}
}
}

View File

@ -36,6 +36,8 @@ import org.elasticsearch.index.translog.TranslogStats;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import java.io.IOException; import java.io.IOException;
import java.util.Collections;
import java.util.List;
/** /**
* ShadowIndexShard extends {@link IndexShard} to add file synchronization * ShadowIndexShard extends {@link IndexShard} to add file synchronization
@ -48,10 +50,10 @@ public final class ShadowIndexShard extends IndexShard {
public ShadowIndexShard(ShardId shardId, IndexSettings indexSettings, ShardPath path, Store store, IndexCache indexCache, public ShadowIndexShard(ShardId shardId, IndexSettings indexSettings, ShardPath path, Store store, IndexCache indexCache,
MapperService mapperService, SimilarityService similarityService, IndexFieldDataService indexFieldDataService, MapperService mapperService, SimilarityService similarityService, IndexFieldDataService indexFieldDataService,
@Nullable EngineFactory engineFactory, IndexEventListener indexEventListener, IndexSearcherWrapper wrapper, @Nullable EngineFactory engineFactory, IndexEventListener indexEventListener, IndexSearcherWrapper wrapper,
ThreadPool threadPool, BigArrays bigArrays, SearchSlowLog searchSlowLog, Engine.Warmer engineWarmer) ThreadPool threadPool, BigArrays bigArrays, Engine.Warmer engineWarmer,
throws IOException { List<SearchOperationListener> searchOperationListeners) throws IOException {
super(shardId, indexSettings, path, store, indexCache, mapperService, similarityService, indexFieldDataService, engineFactory, super(shardId, indexSettings, path, store, indexCache, mapperService, similarityService, indexFieldDataService, engineFactory,
indexEventListener, wrapper, threadPool, bigArrays, searchSlowLog, engineWarmer); indexEventListener, wrapper, threadPool, bigArrays, engineWarmer, searchOperationListeners, Collections.emptyList());
} }
/** /**

View File

@ -377,13 +377,14 @@ public class IndicesService extends AbstractLifecycleComponent<IndicesService> i
idxSettings.isShadowReplicaIndex() ? "s" : "", reason); idxSettings.isShadowReplicaIndex() ? "s" : "", reason);
final IndexModule indexModule = new IndexModule(idxSettings, indexStoreConfig, analysisRegistry); final IndexModule indexModule = new IndexModule(idxSettings, indexStoreConfig, analysisRegistry);
for (IndexingOperationListener operationListener : indexingOperationListeners) {
indexModule.addIndexOperationListener(operationListener);
}
pluginsService.onIndexModule(indexModule); pluginsService.onIndexModule(indexModule);
for (IndexEventListener listener : builtInListeners) { for (IndexEventListener listener : builtInListeners) {
indexModule.addIndexEventListener(listener); indexModule.addIndexEventListener(listener);
} }
final IndexEventListener listener = indexModule.freeze(); return indexModule.newIndexService(nodeEnv, this, nodeServicesProvider, indicesQueryCache, mapperRegistry, indicesFieldDataCache);
listener.beforeIndexCreated(index, idxSettings.getSettings());
return indexModule.newIndexService(nodeEnv, this, nodeServicesProvider, indicesQueryCache, mapperRegistry, indicesFieldDataCache, indexingOperationListeners);
} }
/** /**

View File

@ -51,10 +51,10 @@ import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.fieldstats.FieldStatsProvider; import org.elasticsearch.index.fieldstats.FieldStatsProvider;
import org.elasticsearch.index.query.QueryParseContext; import org.elasticsearch.index.query.QueryParseContext;
import org.elasticsearch.index.query.QueryShardContext; import org.elasticsearch.index.query.QueryShardContext;
import org.elasticsearch.index.search.stats.ShardSearchStats;
import org.elasticsearch.index.search.stats.StatsGroupsParseElement; import org.elasticsearch.index.search.stats.StatsGroupsParseElement;
import org.elasticsearch.index.shard.IndexEventListener; import org.elasticsearch.index.shard.IndexEventListener;
import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.SearchOperationListener;
import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.script.ExecutableScript; import org.elasticsearch.script.ExecutableScript;
import org.elasticsearch.script.ScriptContext; import org.elasticsearch.script.ScriptContext;
@ -274,9 +274,9 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> imp
public QuerySearchResultProvider executeQueryPhase(ShardSearchRequest request) throws IOException { public QuerySearchResultProvider executeQueryPhase(ShardSearchRequest request) throws IOException {
final SearchContext context = createAndPutContext(request); final SearchContext context = createAndPutContext(request);
final ShardSearchStats shardSearchStats = context.indexShard().searchService(); final SearchOperationListener operationListener = context.indexShard().getSearchOperationListener();
try { try {
shardSearchStats.onPreQueryPhase(context); operationListener.onPreQueryPhase(context);
long time = System.nanoTime(); long time = System.nanoTime();
contextProcessing(context); contextProcessing(context);
@ -287,7 +287,7 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> imp
} else { } else {
contextProcessedSuccessfully(context); contextProcessedSuccessfully(context);
} }
shardSearchStats.onQueryPhase(context, System.nanoTime() - time); operationListener.onQueryPhase(context, System.nanoTime() - time);
return context.queryResult(); return context.queryResult();
} catch (Throwable e) { } catch (Throwable e) {
@ -295,7 +295,7 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> imp
if (e instanceof ExecutionException) { if (e instanceof ExecutionException) {
e = e.getCause(); e = e.getCause();
} }
shardSearchStats.onFailedQueryPhase(context); operationListener.onFailedQueryPhase(context);
logger.trace("Query phase failed", e); logger.trace("Query phase failed", e);
processFailure(context, e); processFailure(context, e);
throw ExceptionsHelper.convertToRuntime(e); throw ExceptionsHelper.convertToRuntime(e);
@ -306,18 +306,18 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> imp
public ScrollQuerySearchResult executeQueryPhase(InternalScrollSearchRequest request) { public ScrollQuerySearchResult executeQueryPhase(InternalScrollSearchRequest request) {
final SearchContext context = findContext(request.id()); final SearchContext context = findContext(request.id());
ShardSearchStats shardSearchStats = context.indexShard().searchService(); SearchOperationListener operationListener = context.indexShard().getSearchOperationListener();
try { try {
shardSearchStats.onPreQueryPhase(context); operationListener.onPreQueryPhase(context);
long time = System.nanoTime(); long time = System.nanoTime();
contextProcessing(context); contextProcessing(context);
processScroll(request, context); processScroll(request, context);
queryPhase.execute(context); queryPhase.execute(context);
contextProcessedSuccessfully(context); contextProcessedSuccessfully(context);
shardSearchStats.onQueryPhase(context, System.nanoTime() - time); operationListener.onQueryPhase(context, System.nanoTime() - time);
return new ScrollQuerySearchResult(context.queryResult(), context.shardTarget()); return new ScrollQuerySearchResult(context.queryResult(), context.shardTarget());
} catch (Throwable e) { } catch (Throwable e) {
shardSearchStats.onFailedQueryPhase(context); operationListener.onFailedQueryPhase(context);
logger.trace("Query phase failed", e); logger.trace("Query phase failed", e);
processFailure(context, e); processFailure(context, e);
throw ExceptionsHelper.convertToRuntime(e); throw ExceptionsHelper.convertToRuntime(e);
@ -331,9 +331,9 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> imp
contextProcessing(context); contextProcessing(context);
context.searcher().setAggregatedDfs(request.dfs()); context.searcher().setAggregatedDfs(request.dfs());
IndexShard indexShard = context.indexShard(); IndexShard indexShard = context.indexShard();
ShardSearchStats shardSearchStats = indexShard.searchService(); SearchOperationListener operationListener = indexShard.getSearchOperationListener();
try { try {
shardSearchStats.onPreQueryPhase(context); operationListener.onPreQueryPhase(context);
long time = System.nanoTime(); long time = System.nanoTime();
queryPhase.execute(context); queryPhase.execute(context);
if (context.queryResult().topDocs().scoreDocs.length == 0 && context.scrollContext() == null) { if (context.queryResult().topDocs().scoreDocs.length == 0 && context.scrollContext() == null) {
@ -342,10 +342,10 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> imp
} else { } else {
contextProcessedSuccessfully(context); contextProcessedSuccessfully(context);
} }
shardSearchStats.onQueryPhase(context, System.nanoTime() - time); operationListener.onQueryPhase(context, System.nanoTime() - time);
return context.queryResult(); return context.queryResult();
} catch (Throwable e) { } catch (Throwable e) {
shardSearchStats.onFailedQueryPhase(context); operationListener.onFailedQueryPhase(context);
logger.trace("Query phase failed", e); logger.trace("Query phase failed", e);
processFailure(context, e); processFailure(context, e);
throw ExceptionsHelper.convertToRuntime(e); throw ExceptionsHelper.convertToRuntime(e);
@ -368,18 +368,18 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> imp
final SearchContext context = createAndPutContext(request); final SearchContext context = createAndPutContext(request);
contextProcessing(context); contextProcessing(context);
try { try {
ShardSearchStats shardSearchStats = context.indexShard().searchService(); SearchOperationListener operationListener = context.indexShard().getSearchOperationListener();
shardSearchStats.onPreQueryPhase(context); operationListener.onPreQueryPhase(context);
long time = System.nanoTime(); long time = System.nanoTime();
try { try {
loadOrExecuteQueryPhase(request, context, queryPhase); loadOrExecuteQueryPhase(request, context, queryPhase);
} catch (Throwable e) { } catch (Throwable e) {
shardSearchStats.onFailedQueryPhase(context); operationListener.onFailedQueryPhase(context);
throw ExceptionsHelper.convertToRuntime(e); throw ExceptionsHelper.convertToRuntime(e);
} }
long time2 = System.nanoTime(); long time2 = System.nanoTime();
shardSearchStats.onQueryPhase(context, time2 - time); operationListener.onQueryPhase(context, time2 - time);
shardSearchStats.onPreFetchPhase(context); operationListener.onPreFetchPhase(context);
try { try {
shortcutDocIdsToLoad(context); shortcutDocIdsToLoad(context);
fetchPhase.execute(context); fetchPhase.execute(context);
@ -389,10 +389,10 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> imp
contextProcessedSuccessfully(context); contextProcessedSuccessfully(context);
} }
} catch (Throwable e) { } catch (Throwable e) {
shardSearchStats.onFailedFetchPhase(context); operationListener.onFailedFetchPhase(context);
throw ExceptionsHelper.convertToRuntime(e); throw ExceptionsHelper.convertToRuntime(e);
} }
shardSearchStats.onFetchPhase(context, System.nanoTime() - time2); operationListener.onFetchPhase(context, System.nanoTime() - time2);
return new QueryFetchSearchResult(context.queryResult(), context.fetchResult()); return new QueryFetchSearchResult(context.queryResult(), context.fetchResult());
} catch (Throwable e) { } catch (Throwable e) {
logger.trace("Fetch phase failed", e); logger.trace("Fetch phase failed", e);
@ -408,18 +408,18 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> imp
contextProcessing(context); contextProcessing(context);
context.searcher().setAggregatedDfs(request.dfs()); context.searcher().setAggregatedDfs(request.dfs());
try { try {
ShardSearchStats shardSearchStats = context.indexShard().searchService(); SearchOperationListener operationListener = context.indexShard().getSearchOperationListener();
shardSearchStats.onPreQueryPhase(context); operationListener.onPreQueryPhase(context);
long time = System.nanoTime(); long time = System.nanoTime();
try { try {
queryPhase.execute(context); queryPhase.execute(context);
} catch (Throwable e) { } catch (Throwable e) {
shardSearchStats.onFailedQueryPhase(context); operationListener.onFailedQueryPhase(context);
throw ExceptionsHelper.convertToRuntime(e); throw ExceptionsHelper.convertToRuntime(e);
} }
long time2 = System.nanoTime(); long time2 = System.nanoTime();
shardSearchStats.onQueryPhase(context, time2 - time); operationListener.onQueryPhase(context, time2 - time);
shardSearchStats.onPreFetchPhase(context); operationListener.onPreFetchPhase(context);
try { try {
shortcutDocIdsToLoad(context); shortcutDocIdsToLoad(context);
fetchPhase.execute(context); fetchPhase.execute(context);
@ -429,10 +429,10 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> imp
contextProcessedSuccessfully(context); contextProcessedSuccessfully(context);
} }
} catch (Throwable e) { } catch (Throwable e) {
shardSearchStats.onFailedFetchPhase(context); operationListener.onFailedFetchPhase(context);
throw ExceptionsHelper.convertToRuntime(e); throw ExceptionsHelper.convertToRuntime(e);
} }
shardSearchStats.onFetchPhase(context, System.nanoTime() - time2); operationListener.onFetchPhase(context, System.nanoTime() - time2);
return new QueryFetchSearchResult(context.queryResult(), context.fetchResult()); return new QueryFetchSearchResult(context.queryResult(), context.fetchResult());
} catch (Throwable e) { } catch (Throwable e) {
logger.trace("Fetch phase failed", e); logger.trace("Fetch phase failed", e);
@ -447,19 +447,19 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> imp
final SearchContext context = findContext(request.id()); final SearchContext context = findContext(request.id());
contextProcessing(context); contextProcessing(context);
try { try {
ShardSearchStats shardSearchStats = context.indexShard().searchService(); SearchOperationListener operationListener = context.indexShard().getSearchOperationListener();
processScroll(request, context); processScroll(request, context);
shardSearchStats.onPreQueryPhase(context); operationListener.onPreQueryPhase(context);
long time = System.nanoTime(); long time = System.nanoTime();
try { try {
queryPhase.execute(context); queryPhase.execute(context);
} catch (Throwable e) { } catch (Throwable e) {
shardSearchStats.onFailedQueryPhase(context); operationListener.onFailedQueryPhase(context);
throw ExceptionsHelper.convertToRuntime(e); throw ExceptionsHelper.convertToRuntime(e);
} }
long time2 = System.nanoTime(); long time2 = System.nanoTime();
shardSearchStats.onQueryPhase(context, time2 - time); operationListener.onQueryPhase(context, time2 - time);
shardSearchStats.onPreFetchPhase(context); operationListener.onPreFetchPhase(context);
try { try {
shortcutDocIdsToLoad(context); shortcutDocIdsToLoad(context);
fetchPhase.execute(context); fetchPhase.execute(context);
@ -469,10 +469,10 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> imp
contextProcessedSuccessfully(context); contextProcessedSuccessfully(context);
} }
} catch (Throwable e) { } catch (Throwable e) {
shardSearchStats.onFailedFetchPhase(context); operationListener.onFailedFetchPhase(context);
throw ExceptionsHelper.convertToRuntime(e); throw ExceptionsHelper.convertToRuntime(e);
} }
shardSearchStats.onFetchPhase(context, System.nanoTime() - time2); operationListener.onFetchPhase(context, System.nanoTime() - time2);
return new ScrollQueryFetchSearchResult(new QueryFetchSearchResult(context.queryResult(), context.fetchResult()), context.shardTarget()); return new ScrollQueryFetchSearchResult(new QueryFetchSearchResult(context.queryResult(), context.fetchResult()), context.shardTarget());
} catch (Throwable e) { } catch (Throwable e) {
logger.trace("Fetch phase failed", e); logger.trace("Fetch phase failed", e);
@ -486,13 +486,13 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> imp
public FetchSearchResult executeFetchPhase(ShardFetchRequest request) { public FetchSearchResult executeFetchPhase(ShardFetchRequest request) {
final SearchContext context = findContext(request.id()); final SearchContext context = findContext(request.id());
contextProcessing(context); contextProcessing(context);
final ShardSearchStats shardSearchStats = context.indexShard().searchService(); final SearchOperationListener operationListener = context.indexShard().getSearchOperationListener();
try { try {
if (request.lastEmittedDoc() != null) { if (request.lastEmittedDoc() != null) {
context.scrollContext().lastEmittedDoc = request.lastEmittedDoc(); context.scrollContext().lastEmittedDoc = request.lastEmittedDoc();
} }
context.docIdsToLoad(request.docIds(), 0, request.docIdsSize()); context.docIdsToLoad(request.docIds(), 0, request.docIdsSize());
shardSearchStats.onPreFetchPhase(context); operationListener.onPreFetchPhase(context);
long time = System.nanoTime(); long time = System.nanoTime();
fetchPhase.execute(context); fetchPhase.execute(context);
if (fetchPhaseShouldFreeContext(context)) { if (fetchPhaseShouldFreeContext(context)) {
@ -500,10 +500,10 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> imp
} else { } else {
contextProcessedSuccessfully(context); contextProcessedSuccessfully(context);
} }
shardSearchStats.onFetchPhase(context, System.nanoTime() - time); operationListener.onFetchPhase(context, System.nanoTime() - time);
return context.fetchResult(); return context.fetchResult();
} catch (Throwable e) { } catch (Throwable e) {
shardSearchStats.onFailedFetchPhase(context); operationListener.onFailedFetchPhase(context);
logger.trace("Fetch phase failed", e); logger.trace("Fetch phase failed", e);
processFailure(context, e); processFailure(context, e);
throw ExceptionsHelper.convertToRuntime(e); throw ExceptionsHelper.convertToRuntime(e);
@ -527,9 +527,9 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> imp
try { try {
putContext(context); putContext(context);
if (request.scroll() != null) { if (request.scroll() != null) {
context.indexShard().searchService().onNewScrollContext(context); context.indexShard().getSearchOperationListener().onNewScrollContext(context);
} }
context.indexShard().searchService().onNewContext(context); context.indexShard().getSearchOperationListener().onNewContext(context);
success = true; success = true;
return context; return context;
} finally { } finally {
@ -617,9 +617,9 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> imp
final SearchContext context = removeContext(id); final SearchContext context = removeContext(id);
if (context != null) { if (context != null) {
try { try {
context.indexShard().searchService().onFreeContext(context); context.indexShard().getSearchOperationListener().onFreeContext(context);
if (context.scrollContext() != null) { if (context.scrollContext() != null) {
context.indexShard().searchService().onFreeScrollContext(context); context.indexShard().getSearchOperationListener().onFreeScrollContext(context);
} }
} finally { } finally {
context.close(); context.close();

View File

@ -22,6 +22,7 @@ import org.apache.lucene.index.AssertingDirectoryReader;
import org.apache.lucene.index.DirectoryReader; import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.FieldInvertState; import org.apache.lucene.index.FieldInvertState;
import org.apache.lucene.index.LeafReaderContext; import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.CollectionStatistics; import org.apache.lucene.search.CollectionStatistics;
import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.QueryCachingPolicy; import org.apache.lucene.search.QueryCachingPolicy;
@ -44,10 +45,13 @@ import org.elasticsearch.index.analysis.AnalysisRegistry;
import org.elasticsearch.index.cache.query.QueryCache; import org.elasticsearch.index.cache.query.QueryCache;
import org.elasticsearch.index.cache.query.index.IndexQueryCache; import org.elasticsearch.index.cache.query.index.IndexQueryCache;
import org.elasticsearch.index.cache.query.none.NoneQueryCache; import org.elasticsearch.index.cache.query.none.NoneQueryCache;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.EngineException; import org.elasticsearch.index.engine.EngineException;
import org.elasticsearch.index.fielddata.IndexFieldDataCache; import org.elasticsearch.index.fielddata.IndexFieldDataCache;
import org.elasticsearch.index.shard.IndexEventListener; import org.elasticsearch.index.shard.IndexEventListener;
import org.elasticsearch.index.shard.IndexSearcherWrapper; import org.elasticsearch.index.shard.IndexSearcherWrapper;
import org.elasticsearch.index.shard.IndexingOperationListener;
import org.elasticsearch.index.shard.SearchOperationListener;
import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.similarity.SimilarityProvider; import org.elasticsearch.index.similarity.SimilarityProvider;
import org.elasticsearch.index.similarity.SimilarityService; import org.elasticsearch.index.similarity.SimilarityService;
@ -65,8 +69,10 @@ import org.elasticsearch.script.ScriptEngineRegistry;
import org.elasticsearch.script.ScriptEngineService; import org.elasticsearch.script.ScriptEngineService;
import org.elasticsearch.script.ScriptService; import org.elasticsearch.script.ScriptService;
import org.elasticsearch.script.ScriptSettings; import org.elasticsearch.script.ScriptSettings;
import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.IndexSettingsModule; import org.elasticsearch.test.IndexSettingsModule;
import org.elasticsearch.test.TestSearchContext;
import org.elasticsearch.test.engine.MockEngineFactory; import org.elasticsearch.test.engine.MockEngineFactory;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.watcher.ResourceWatcherService; import org.elasticsearch.watcher.ResourceWatcherService;
@ -160,14 +166,15 @@ public class IndexModuleTests extends ESTestCase {
IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(index, settings); IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(index, settings);
IndexModule module = new IndexModule(indexSettings, null, new AnalysisRegistry(null, environment)); IndexModule module = new IndexModule(indexSettings, null, new AnalysisRegistry(null, environment));
module.addIndexStore("foo_store", FooStore::new); module.addIndexStore("foo_store", FooStore::new);
IndexService indexService = module.newIndexService(nodeEnvironment, deleter, nodeServicesProvider, indicesQueryCache, mapperRegistry, new IndicesFieldDataCache(settings, listener));
assertTrue(indexService.getIndexStore() instanceof FooStore);
try { try {
module.addIndexStore("foo_store", FooStore::new); module.addIndexStore("foo_store", FooStore::new);
fail("already registered"); fail("already registered");
} catch (IllegalArgumentException ex) { } catch (IllegalArgumentException ex) {
// fine // fine
} }
IndexService indexService = module.newIndexService(nodeEnvironment, deleter, nodeServicesProvider, indicesQueryCache, mapperRegistry, new IndicesFieldDataCache(settings, listener));
assertTrue(indexService.getIndexStore() instanceof FooStore);
indexService.close("simon says", false); indexService.close("simon says", false);
} }
@ -215,6 +222,65 @@ public class IndexModuleTests extends ESTestCase {
indexService.close("simon says", false); indexService.close("simon says", false);
} }
public void testAddIndexOperationListener() throws IOException {
IndexModule module = new IndexModule(IndexSettingsModule.newIndexSettings(index, settings), null, new AnalysisRegistry(null, environment));
AtomicBoolean executed = new AtomicBoolean(false);
IndexingOperationListener listener = new IndexingOperationListener() {
@Override
public Engine.Index preIndex(Engine.Index operation) {
executed.set(true);
return operation;
}
};
module.addIndexOperationListener(listener);
expectThrows(IllegalArgumentException.class, () -> module.addIndexOperationListener(listener));
expectThrows(IllegalArgumentException.class, () -> module.addIndexOperationListener(null));
IndexService indexService = module.newIndexService(nodeEnvironment, deleter, nodeServicesProvider, indicesQueryCache, mapperRegistry,
new IndicesFieldDataCache(settings, this.listener));
assertEquals(2, indexService.getIndexOperationListeners().size());
assertEquals(IndexingSlowLog.class, indexService.getIndexOperationListeners().get(0).getClass());
assertSame(listener, indexService.getIndexOperationListeners().get(1));
Engine.Index index = new Engine.Index(new Term("_uid", "1"), null);
for (IndexingOperationListener l : indexService.getIndexOperationListeners()) {
l.preIndex(index);
}
assertTrue(executed.get());
indexService.close("simon says", false);
}
public void testAddSearchOperationListener() throws IOException {
IndexModule module = new IndexModule(IndexSettingsModule.newIndexSettings(index, settings), null, new AnalysisRegistry(null, environment));
AtomicBoolean executed = new AtomicBoolean(false);
SearchOperationListener listener = new SearchOperationListener() {
@Override
public void onNewContext(SearchContext context) {
executed.set(true);
}
};
module.addSearchOperationListener(listener);
expectThrows(IllegalArgumentException.class, () -> module.addSearchOperationListener(listener));
expectThrows(IllegalArgumentException.class, () -> module.addSearchOperationListener(null));
IndexService indexService = module.newIndexService(nodeEnvironment, deleter, nodeServicesProvider, indicesQueryCache, mapperRegistry,
new IndicesFieldDataCache(settings, this.listener));
assertEquals(2, indexService.getSearchOperationListener().size());
assertEquals(SearchSlowLog.class, indexService.getSearchOperationListener().get(0).getClass());
assertSame(listener, indexService.getSearchOperationListener().get(1));
for (SearchOperationListener l : indexService.getSearchOperationListener()) {
l.onNewContext(new TestSearchContext(null));
}
assertTrue(executed.get());
indexService.close("simon says", false);
}
public void testAddSimilarity() throws IOException { public void testAddSimilarity() throws IOException {
Settings indexSettings = Settings.settingsBuilder() Settings indexSettings = Settings.settingsBuilder()
.put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT) .put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
@ -245,6 +311,20 @@ public class IndexModuleTests extends ESTestCase {
indexService.close("simon says", false); indexService.close("simon says", false);
} }
public void testFrozen() {
IndexModule module = new IndexModule(IndexSettingsModule.newIndexSettings(index, settings), null, new AnalysisRegistry(null, environment));
module.freeze();
String msg = "Can't modify IndexModule once the index service has been created";
assertEquals(msg, expectThrows(IllegalStateException.class, () -> module.addSearchOperationListener(null)).getMessage());
assertEquals(msg, expectThrows(IllegalStateException.class, () -> module.addIndexEventListener(null)).getMessage());
assertEquals(msg, expectThrows(IllegalStateException.class, () -> module.addIndexOperationListener(null)).getMessage());
assertEquals(msg, expectThrows(IllegalStateException.class, () -> module.addSimilarity(null, null)).getMessage());
assertEquals(msg, expectThrows(IllegalStateException.class, () -> module.setSearcherWrapper(null)).getMessage());
assertEquals(msg, expectThrows(IllegalStateException.class, () -> module.forceQueryCacheType("foo")).getMessage());
assertEquals(msg, expectThrows(IllegalStateException.class, () -> module.addIndexStore("foo", null)).getMessage());
assertEquals(msg, expectThrows(IllegalStateException.class, () -> module.registerQueryCache("foo", null)).getMessage());
}
public void testSetupUnknownSimilarity() throws IOException { public void testSetupUnknownSimilarity() throws IOException {
Settings indexSettings = Settings.settingsBuilder() Settings indexSettings = Settings.settingsBuilder()
.put("index.similarity.my_similarity.type", "test_similarity") .put("index.similarity.my_similarity.type", "test_similarity")

View File

@ -107,6 +107,7 @@ import java.nio.file.StandardCopyOption;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.Collections;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
@ -1293,7 +1294,7 @@ public class IndexShardTests extends ESSingleNodeTestCase {
IndexShard newShard = new IndexShard(shard.shardId(), indexService.getIndexSettings(), shard.shardPath(), IndexShard newShard = new IndexShard(shard.shardId(), indexService.getIndexSettings(), shard.shardPath(),
shard.store(), indexService.cache(), indexService.mapperService(), indexService.similarityService(), shard.store(), indexService.cache(), indexService.mapperService(), indexService.similarityService(),
indexService.fieldData(), shard.getEngineFactory(), indexService.getIndexEventListener(), wrapper, indexService.fieldData(), shard.getEngineFactory(), indexService.getIndexEventListener(), wrapper,
indexService.getThreadPool(), indexService.getBigArrays(), indexService.getSearchSlowLog(), null, listeners indexService.getThreadPool(), indexService.getBigArrays(), null, Collections.emptyList(), Arrays.asList(listeners)
); );
ShardRoutingHelper.reinit(routing); ShardRoutingHelper.reinit(routing);
newShard.updateRoutingEntry(routing, false); newShard.updateRoutingEntry(routing, false);

View File

@ -0,0 +1,249 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.shard;
import org.apache.lucene.index.Term;
import org.elasticsearch.client.Client;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.TestSearchContext;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
public class SearchOperationListenerTests extends ESTestCase {
// this test also tests if calls are correct if one or more listeners throw exceptions
public void testListenersAreExecuted() {
AtomicInteger preQuery = new AtomicInteger();
AtomicInteger failedQuery = new AtomicInteger();
AtomicInteger onQuery = new AtomicInteger();
AtomicInteger onFetch = new AtomicInteger();
AtomicInteger preFetch = new AtomicInteger();
AtomicInteger failedFetch = new AtomicInteger();
AtomicInteger newContext = new AtomicInteger();
AtomicInteger freeContext = new AtomicInteger();
AtomicInteger newScrollContext = new AtomicInteger();
AtomicInteger freeScrollContext = new AtomicInteger();
AtomicInteger timeInNanos = new AtomicInteger(randomIntBetween(0, 10));
SearchOperationListener listener = new SearchOperationListener() {
@Override
public void onPreQueryPhase(SearchContext searchContext) {
assertNotNull(searchContext);
preQuery.incrementAndGet();
}
@Override
public void onFailedQueryPhase(SearchContext searchContext) {
assertNotNull(searchContext);
failedQuery.incrementAndGet();
}
@Override
public void onQueryPhase(SearchContext searchContext, long tookInNanos) {
assertEquals(timeInNanos.get(), tookInNanos);
assertNotNull(searchContext);
onQuery.incrementAndGet();
}
@Override
public void onPreFetchPhase(SearchContext searchContext) {
assertNotNull(searchContext);
preFetch.incrementAndGet();
}
@Override
public void onFailedFetchPhase(SearchContext searchContext) {
assertNotNull(searchContext);
failedFetch.incrementAndGet();
}
@Override
public void onFetchPhase(SearchContext searchContext, long tookInNanos) {
assertEquals(timeInNanos.get(), tookInNanos);
onFetch.incrementAndGet();
}
@Override
public void onNewContext(SearchContext context) {
assertNotNull(context);
newContext.incrementAndGet();
}
@Override
public void onFreeContext(SearchContext context) {
assertNotNull(context);
freeContext.incrementAndGet();
}
@Override
public void onNewScrollContext(SearchContext context) {
assertNotNull(context);
newScrollContext.incrementAndGet();
}
@Override
public void onFreeScrollContext(SearchContext context) {
assertNotNull(context);
freeScrollContext.incrementAndGet();
}
};
SearchOperationListener throwingListener = (SearchOperationListener) Proxy.newProxyInstance(
SearchOperationListener.class.getClassLoader(),
new Class[]{SearchOperationListener.class},
(a,b,c) -> { throw new RuntimeException();});
final List<SearchOperationListener> indexingOperationListeners = new ArrayList<>(Arrays.asList(listener, listener));
if (randomBoolean()) {
indexingOperationListeners.add(throwingListener);
if (randomBoolean()) {
indexingOperationListeners.add(throwingListener);
}
}
Collections.shuffle(indexingOperationListeners, random());
SearchOperationListener.CompositeListener compositeListener =
new SearchOperationListener.CompositeListener(indexingOperationListeners, logger);
SearchContext ctx = new TestSearchContext(null);
compositeListener.onQueryPhase(ctx, timeInNanos.get());
assertEquals(0, preFetch.get());
assertEquals(0, preQuery.get());
assertEquals(0, failedFetch.get());
assertEquals(0, failedQuery.get());
assertEquals(2, onQuery.get());
assertEquals(0, onFetch.get());
assertEquals(0, newContext.get());
assertEquals(0, newScrollContext.get());
assertEquals(0, freeContext.get());
assertEquals(0, freeScrollContext.get());
compositeListener.onFetchPhase(ctx, timeInNanos.get());
assertEquals(0, preFetch.get());
assertEquals(0, preQuery.get());
assertEquals(0, failedFetch.get());
assertEquals(0, failedQuery.get());
assertEquals(2, onQuery.get());
assertEquals(2, onFetch.get());
assertEquals(0, newContext.get());
assertEquals(0, newScrollContext.get());
assertEquals(0, freeContext.get());
assertEquals(0, freeScrollContext.get());
compositeListener.onPreQueryPhase(ctx);
assertEquals(0, preFetch.get());
assertEquals(2, preQuery.get());
assertEquals(0, failedFetch.get());
assertEquals(0, failedQuery.get());
assertEquals(2, onQuery.get());
assertEquals(2, onFetch.get());
assertEquals(0, newContext.get());
assertEquals(0, newScrollContext.get());
assertEquals(0, freeContext.get());
assertEquals(0, freeScrollContext.get());
compositeListener.onPreFetchPhase(ctx);
assertEquals(2, preFetch.get());
assertEquals(2, preQuery.get());
assertEquals(0, failedFetch.get());
assertEquals(0, failedQuery.get());
assertEquals(2, onQuery.get());
assertEquals(2, onFetch.get());
assertEquals(0, newContext.get());
assertEquals(0, newScrollContext.get());
assertEquals(0, freeContext.get());
assertEquals(0, freeScrollContext.get());
compositeListener.onFailedFetchPhase(ctx);
assertEquals(2, preFetch.get());
assertEquals(2, preQuery.get());
assertEquals(2, failedFetch.get());
assertEquals(0, failedQuery.get());
assertEquals(2, onQuery.get());
assertEquals(2, onFetch.get());
assertEquals(0, newContext.get());
assertEquals(0, newScrollContext.get());
assertEquals(0, freeContext.get());
assertEquals(0, freeScrollContext.get());
compositeListener.onFailedQueryPhase(ctx);
assertEquals(2, preFetch.get());
assertEquals(2, preQuery.get());
assertEquals(2, failedFetch.get());
assertEquals(2, failedQuery.get());
assertEquals(2, onQuery.get());
assertEquals(2, onFetch.get());
assertEquals(0, newContext.get());
assertEquals(0, newScrollContext.get());
assertEquals(0, freeContext.get());
assertEquals(0, freeScrollContext.get());
compositeListener.onNewContext(ctx);
assertEquals(2, preFetch.get());
assertEquals(2, preQuery.get());
assertEquals(2, failedFetch.get());
assertEquals(2, failedQuery.get());
assertEquals(2, onQuery.get());
assertEquals(2, onFetch.get());
assertEquals(2, newContext.get());
assertEquals(0, newScrollContext.get());
assertEquals(0, freeContext.get());
assertEquals(0, freeScrollContext.get());
compositeListener.onNewScrollContext(ctx);
assertEquals(2, preFetch.get());
assertEquals(2, preQuery.get());
assertEquals(2, failedFetch.get());
assertEquals(2, failedQuery.get());
assertEquals(2, onQuery.get());
assertEquals(2, onFetch.get());
assertEquals(2, newContext.get());
assertEquals(2, newScrollContext.get());
assertEquals(0, freeContext.get());
assertEquals(0, freeScrollContext.get());
compositeListener.onFreeContext(ctx);
assertEquals(2, preFetch.get());
assertEquals(2, preQuery.get());
assertEquals(2, failedFetch.get());
assertEquals(2, failedQuery.get());
assertEquals(2, onQuery.get());
assertEquals(2, onFetch.get());
assertEquals(2, newContext.get());
assertEquals(2, newScrollContext.get());
assertEquals(2, freeContext.get());
assertEquals(0, freeScrollContext.get());
compositeListener.onFreeScrollContext(ctx);
assertEquals(2, preFetch.get());
assertEquals(2, preQuery.get());
assertEquals(2, failedFetch.get());
assertEquals(2, failedQuery.get());
assertEquals(2, onQuery.get());
assertEquals(2, onFetch.get());
assertEquals(2, newContext.get());
assertEquals(2, newScrollContext.get());
assertEquals(2, freeContext.get());
assertEquals(2, freeScrollContext.get());
}
}