From 81801451ad803a8ebae3a587a2029a8c4b049a30 Mon Sep 17 00:00:00 2001
From: Simon Willnauer
Date: Wed, 30 Mar 2016 10:36:54 +0200
Subject: [PATCH] Factor out slow logs into Search and
IndexingOperationListeners
This commit introduces SearchOperationListeneres which allow to hook
into search operation lifecycle and execute operations like slow-logs
and statistic collection in a transparent way. SearchOperationListenrs
can be registered on the IndexModule just like IndexingOperationListeners.
The main consumers (slow log) have already been moved out of IndexService
into IndexModule which reduces the dependency on IndexService as well as
IndexShard and makes slowlogging transparent.
Closes #17398
---
.../resources/checkstyle_suppressions.xml | 1 -
.../org/elasticsearch/index/IndexModule.java | 121 +++++++--
.../org/elasticsearch/index/IndexService.java | 35 +--
.../elasticsearch/index/IndexingSlowLog.java | 1 +
.../elasticsearch/index/SearchSlowLog.java | 6 +-
.../index/search/stats/ShardSearchStats.java | 21 +-
.../elasticsearch/index/shard/IndexShard.java | 17 +-
.../index/shard/SearchOperationListener.java | 227 ++++++++++++++++
.../index/shard/ShadowIndexShard.java | 8 +-
.../elasticsearch/indices/IndicesService.java | 7 +-
.../elasticsearch/search/SearchService.java | 84 +++---
.../elasticsearch/index/IndexModuleTests.java | 84 +++++-
.../index/shard/IndexShardTests.java | 3 +-
.../shard/SearchOperationListenerTests.java | 249 ++++++++++++++++++
14 files changed, 754 insertions(+), 110 deletions(-)
create mode 100644 core/src/main/java/org/elasticsearch/index/shard/SearchOperationListener.java
create mode 100644 core/src/test/java/org/elasticsearch/index/shard/SearchOperationListenerTests.java
diff --git a/buildSrc/src/main/resources/checkstyle_suppressions.xml b/buildSrc/src/main/resources/checkstyle_suppressions.xml
index 915c6492712..dabbf43b235 100644
--- a/buildSrc/src/main/resources/checkstyle_suppressions.xml
+++ b/buildSrc/src/main/resources/checkstyle_suppressions.xml
@@ -415,7 +415,6 @@
-
diff --git a/core/src/main/java/org/elasticsearch/index/IndexModule.java b/core/src/main/java/org/elasticsearch/index/IndexModule.java
index 48230e6ec1e..3bc1346e884 100644
--- a/core/src/main/java/org/elasticsearch/index/IndexModule.java
+++ b/core/src/main/java/org/elasticsearch/index/IndexModule.java
@@ -33,6 +33,7 @@ import org.elasticsearch.index.engine.EngineFactory;
import org.elasticsearch.index.shard.IndexEventListener;
import org.elasticsearch.index.shard.IndexSearcherWrapper;
import org.elasticsearch.index.shard.IndexingOperationListener;
+import org.elasticsearch.index.shard.SearchOperationListener;
import org.elasticsearch.index.similarity.BM25SimilarityProvider;
import org.elasticsearch.index.similarity.SimilarityProvider;
import org.elasticsearch.index.similarity.SimilarityService;
@@ -43,11 +44,14 @@ import org.elasticsearch.indices.fielddata.cache.IndicesFieldDataCache;
import org.elasticsearch.indices.mapper.MapperRegistry;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
@@ -55,13 +59,15 @@ import java.util.function.Function;
/**
* IndexModule represents the central extension point for index level custom implementations like:
*
- * - {@link SimilarityProvider} - New {@link SimilarityProvider} implementations can be registered through {@link #addSimilarity(String, BiFunction)}
- * while existing Providers can be referenced through Settings under the {@link IndexModule#SIMILARITY_SETTINGS_PREFIX} prefix
- * along with the "type" value. For example, to reference the {@link BM25SimilarityProvider}, the configuration
- * "index.similarity.my_similarity.type : "BM25" can be used.
+ * - {@link SimilarityProvider} - New {@link SimilarityProvider} implementations can be registered through
+ * {@link #addSimilarity(String, BiFunction)}while existing Providers can be referenced through Settings under the
+ * {@link IndexModule#SIMILARITY_SETTINGS_PREFIX} prefix along with the "type" value. For example, to reference the
+ * {@link BM25SimilarityProvider}, the configuration "index.similarity.my_similarity.type : "BM25" can be used.
* - {@link IndexStore} - Custom {@link IndexStore} instances can be registered via {@link #addIndexStore(String, BiFunction)}
- * - {@link IndexEventListener} - Custom {@link IndexEventListener} instances can be registered via {@link #addIndexEventListener(IndexEventListener)}
- * - Settings update listener - Custom settings update listener can be registered via {@link #addSettingsUpdateConsumer(Setting, Consumer)}
+ * - {@link IndexEventListener} - Custom {@link IndexEventListener} instances can be registered via
+ * {@link #addIndexEventListener(IndexEventListener)}
+ * - Settings update listener - Custom settings update listener can be registered via
+ * {@link #addSettingsUpdateConsumer(Setting, Consumer)}
*
*/
public final class IndexModule {
@@ -84,11 +90,13 @@ public final class IndexModule {
final SetOnce engineFactory = new SetOnce<>();
private SetOnce indexSearcherWrapper = new SetOnce<>();
private final Set indexEventListeners = new HashSet<>();
- private IndexEventListener listener;
private final Map> similarities = new HashMap<>();
private final Map> storeTypes = new HashMap<>();
private final Map> queryCaches = new HashMap<>();
private final SetOnce forceQueryCacheType = new SetOnce<>();
+ private final List searchOperationListeners = new ArrayList<>();
+ private final List indexOperationListeners = new ArrayList<>();
+ private final AtomicBoolean frozen = new AtomicBoolean(false);
public IndexModule(IndexSettings indexSettings, IndexStoreConfig indexStoreConfig, AnalysisRegistry analysisRegistry) {
this.indexStoreConfig = indexStoreConfig;
@@ -96,12 +104,15 @@ public final class IndexModule {
this.analysisRegistry = analysisRegistry;
registerQueryCache(INDEX_QUERY_CACHE, IndexQueryCache::new);
registerQueryCache(NONE_QUERY_CACHE, (a, b) -> new NoneQueryCache(a));
+ this.searchOperationListeners.add(new SearchSlowLog(indexSettings));
+ this.indexOperationListeners.add(new IndexingSlowLog(indexSettings));
}
/**
* Adds a Setting and it's consumer for this index.
*/
public void addSettingsUpdateConsumer(Setting setting, Consumer consumer) {
+ ensureNotFrozen();
if (setting == null) {
throw new IllegalArgumentException("setting must not be null");
}
@@ -134,9 +145,7 @@ public final class IndexModule {
*
*/
public void addIndexEventListener(IndexEventListener listener) {
- if (this.listener != null) {
- throw new IllegalStateException("can't add listener after listeners are frozen");
- }
+ ensureNotFrozen();
if (listener == null) {
throw new IllegalArgumentException("listener must not be null");
}
@@ -147,6 +156,52 @@ public final class IndexModule {
this.indexEventListeners.add(listener);
}
+ /**
+ * Adds an {@link SearchOperationListener} for this index. All listeners added here
+ * are maintained for the entire index lifecycle on this node. Once an index is closed or deleted these
+ * listeners go out of scope.
+ *
+ * Note: an index might be created on a node multiple times. For instance if the last shard from an index is
+ * relocated to another node the internal representation will be destroyed which includes the registered listeners.
+ * Once the node holds at least one shard of an index all modules are reloaded and listeners are registered again.
+ * Listeners can't be unregistered they will stay alive for the entire time the index is allocated on a node.
+ *
+ */
+ public void addSearchOperationListener(SearchOperationListener listener) {
+ ensureNotFrozen();
+ if (listener == null) {
+ throw new IllegalArgumentException("listener must not be null");
+ }
+ if (searchOperationListeners.contains(listener)) {
+ throw new IllegalArgumentException("listener already added");
+ }
+
+ this.searchOperationListeners.add(listener);
+ }
+
+ /**
+ * Adds an {@link IndexingOperationListener} for this index. All listeners added here
+ * are maintained for the entire index lifecycle on this node. Once an index is closed or deleted these
+ * listeners go out of scope.
+ *
+ * Note: an index might be created on a node multiple times. For instance if the last shard from an index is
+ * relocated to another node the internal representation will be destroyed which includes the registered listeners.
+ * Once the node holds at least one shard of an index all modules are reloaded and listeners are registered again.
+ * Listeners can't be unregistered they will stay alive for the entire time the index is allocated on a node.
+ *
+ */
+ public void addIndexOperationListener(IndexingOperationListener listener) {
+ ensureNotFrozen();
+ if (listener == null) {
+ throw new IllegalArgumentException("listener must not be null");
+ }
+ if (indexOperationListeners.contains(listener)) {
+ throw new IllegalArgumentException("listener already added");
+ }
+
+ this.indexOperationListeners.add(listener);
+ }
+
/**
* Adds an {@link IndexStore} type to this index module. Typically stores are registered with a reference to
* it's constructor:
@@ -158,6 +213,7 @@ public final class IndexModule {
* @param provider the instance provider / factory method
*/
public void addIndexStore(String type, BiFunction provider) {
+ ensureNotFrozen();
if (storeTypes.containsKey(type)) {
throw new IllegalArgumentException("key [" + type +"] already registered");
}
@@ -172,6 +228,7 @@ public final class IndexModule {
* @param similarity SimilarityProvider to register
*/
public void addSimilarity(String name, BiFunction similarity) {
+ ensureNotFrozen();
if (similarities.containsKey(name) || SimilarityService.BUILT_IN.containsKey(name)) {
throw new IllegalArgumentException("similarity for name: [" + name + " is already registered");
}
@@ -184,6 +241,7 @@ public final class IndexModule {
* @param provider the provider instance
*/
public void registerQueryCache(String name, BiFunction provider) {
+ ensureNotFrozen();
if (provider == null) {
throw new IllegalArgumentException("provider must not be null");
}
@@ -194,19 +252,21 @@ public final class IndexModule {
}
/**
- * Sets a {@link org.elasticsearch.index.IndexModule.IndexSearcherWrapperFactory} that is called once the IndexService is fully constructed.
+ * Sets a {@link org.elasticsearch.index.IndexModule.IndexSearcherWrapperFactory} that is called once the IndexService
+ * is fully constructed.
* Note: this method can only be called once per index. Multiple wrappers are not supported.
*/
public void setSearcherWrapper(IndexSearcherWrapperFactory indexSearcherWrapperFactory) {
+ ensureNotFrozen();
this.indexSearcherWrapper.set(indexSearcherWrapperFactory);
}
- public IndexEventListener freeze() {
- // TODO somehow we need to make this pkg private...
- if (listener == null) {
- listener = new CompositeIndexEventListener(indexSettings, indexEventListeners);
+ IndexEventListener freeze() { // pkg private for testing
+ if (this.frozen.compareAndSet(false, true)) {
+ return new CompositeIndexEventListener(indexSettings, indexEventListeners);
+ } else {
+ throw new IllegalStateException("already frozen");
}
- return listener;
}
private static boolean isBuiltinType(String storeType) {
@@ -246,10 +306,13 @@ public final class IndexModule {
IndexSearcherWrapper newWrapper(final IndexService indexService);
}
- public IndexService newIndexService(NodeEnvironment environment, IndexService.ShardStoreDeleter shardStoreDeleter, NodeServicesProvider servicesProvider, IndicesQueryCache indicesQueryCache, MapperRegistry mapperRegistry, IndicesFieldDataCache indicesFieldDataCache,
- IndexingOperationListener... listeners) throws IOException {
- IndexSearcherWrapperFactory searcherWrapperFactory = indexSearcherWrapper.get() == null ? (shard) -> null : indexSearcherWrapper.get();
- IndexEventListener eventListener = freeze();
+ public IndexService newIndexService(NodeEnvironment environment, IndexService.ShardStoreDeleter shardStoreDeleter,
+ NodeServicesProvider servicesProvider, IndicesQueryCache indicesQueryCache,
+ MapperRegistry mapperRegistry, IndicesFieldDataCache indicesFieldDataCache) throws IOException {
+ final IndexEventListener eventListener = freeze();
+ IndexSearcherWrapperFactory searcherWrapperFactory = indexSearcherWrapper.get() == null
+ ? (shard) -> null : indexSearcherWrapper.get();
+ eventListener.beforeIndexCreated(indexSettings.getIndex(), indexSettings.getSettings());
final String storeType = indexSettings.getValue(INDEX_STORE_TYPE_SETTING);
final IndexStore store;
if (Strings.isEmpty(storeType) || isBuiltinType(storeType)) {
@@ -265,12 +328,15 @@ public final class IndexModule {
}
}
indexSettings.getScopedSettings().addSettingsUpdateConsumer(IndexStore.INDEX_STORE_THROTTLE_TYPE_SETTING, store::setType);
- indexSettings.getScopedSettings().addSettingsUpdateConsumer(IndexStore.INDEX_STORE_THROTTLE_MAX_BYTES_PER_SEC_SETTING, store::setMaxRate);
- final String queryCacheType = forceQueryCacheType.get() != null ? forceQueryCacheType.get() : indexSettings.getValue(INDEX_QUERY_CACHE_TYPE_SETTING);
+ indexSettings.getScopedSettings().addSettingsUpdateConsumer(IndexStore.INDEX_STORE_THROTTLE_MAX_BYTES_PER_SEC_SETTING,
+ store::setMaxRate);
+ final String queryCacheType = forceQueryCacheType.get() != null
+ ? forceQueryCacheType.get() : indexSettings.getValue(INDEX_QUERY_CACHE_TYPE_SETTING);
final BiFunction queryCacheProvider = queryCaches.get(queryCacheType);
final QueryCache queryCache = queryCacheProvider.apply(indexSettings, indicesQueryCache);
- return new IndexService(indexSettings, environment, new SimilarityService(indexSettings, similarities), shardStoreDeleter, analysisRegistry, engineFactory.get(),
- servicesProvider, queryCache, store, eventListener, searcherWrapperFactory, mapperRegistry, indicesFieldDataCache, listeners);
+ return new IndexService(indexSettings, environment, new SimilarityService(indexSettings, similarities), shardStoreDeleter,
+ analysisRegistry, engineFactory.get(), servicesProvider, queryCache, store, eventListener, searcherWrapperFactory,
+ mapperRegistry, indicesFieldDataCache, searchOperationListeners, indexOperationListeners);
}
/**
@@ -282,7 +348,14 @@ public final class IndexModule {
* @see #INDEX_QUERY_CACHE_TYPE_SETTING
*/
public void forceQueryCacheType(String type) {
+ ensureNotFrozen();
this.forceQueryCacheType.set(type);
}
+ private void ensureNotFrozen() {
+ if (this.frozen.get()) {
+ throw new IllegalStateException("Can't modify IndexModule once the index service has been created");
+ }
+ }
+
}
diff --git a/core/src/main/java/org/elasticsearch/index/IndexService.java b/core/src/main/java/org/elasticsearch/index/IndexService.java
index 815f257a45d..1f1037ea18e 100644
--- a/core/src/main/java/org/elasticsearch/index/IndexService.java
+++ b/core/src/main/java/org/elasticsearch/index/IndexService.java
@@ -56,6 +56,7 @@ import org.elasticsearch.index.shard.IndexEventListener;
import org.elasticsearch.index.shard.IndexSearcherWrapper;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexingOperationListener;
+import org.elasticsearch.index.shard.SearchOperationListener;
import org.elasticsearch.index.shard.ShadowIndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.ShardNotFoundException;
@@ -73,8 +74,10 @@ import org.elasticsearch.threadpool.ThreadPool;
import java.io.Closeable;
import java.io.IOException;
import java.nio.file.Path;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
+import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
@@ -109,11 +112,10 @@ public final class IndexService extends AbstractIndexComponent implements IndexC
private final AtomicBoolean closed = new AtomicBoolean(false);
private final AtomicBoolean deleted = new AtomicBoolean(false);
private final IndexSettings indexSettings;
- private final IndexingSlowLog slowLog;
- private final IndexingOperationListener[] listeners;
+ private final List indexingOperationListeners;
+ private final List searchOperationListeners;
private volatile AsyncRefreshTask refreshTask;
private volatile AsyncTranslogFSync fsyncTask;
- private final SearchSlowLog searchSlowLog;
private final ThreadPool threadPool;
private final BigArrays bigArrays;
@@ -129,7 +131,8 @@ public final class IndexService extends AbstractIndexComponent implements IndexC
IndexModule.IndexSearcherWrapperFactory wrapperFactory,
MapperRegistry mapperRegistry,
IndicesFieldDataCache indicesFieldDataCache,
- IndexingOperationListener... listenersIn) throws IOException {
+ List searchOperationListeners,
+ List indexingOperationListeners) throws IOException {
super(indexSettings);
this.indexSettings = indexSettings;
this.analysisService = registry.build(indexSettings);
@@ -155,15 +158,10 @@ public final class IndexService extends AbstractIndexComponent implements IndexC
this.engineFactory = engineFactory;
// initialize this last -- otherwise if the wrapper requires any other member to be non-null we fail with an NPE
this.searcherWrapper = wrapperFactory.newWrapper(this);
- this.slowLog = new IndexingSlowLog(indexSettings);
-
- // Add our slowLog to the incoming IndexingOperationListeners:
- this.listeners = new IndexingOperationListener[1+listenersIn.length];
- this.listeners[0] = slowLog;
- System.arraycopy(listenersIn, 0, this.listeners, 1, listenersIn.length);
+ this.indexingOperationListeners = Collections.unmodifiableList(indexingOperationListeners);
+ this.searchOperationListeners = Collections.unmodifiableList(searchOperationListeners);
// kick off async ops for the first shard in this index
this.refreshTask = new AsyncRefreshTask(this);
- searchSlowLog = new SearchSlowLog(indexSettings);
rescheduleFsyncTask(indexSettings.getTranslogDurability());
}
@@ -338,12 +336,13 @@ public final class IndexService extends AbstractIndexComponent implements IndexC
new StoreCloseListener(shardId, canDeleteShardContent, () -> eventListener.onStoreClosed(shardId)));
if (useShadowEngine(primary, indexSettings)) {
indexShard = new ShadowIndexShard(shardId, this.indexSettings, path, store, indexCache, mapperService, similarityService,
- indexFieldData, engineFactory, eventListener, searcherWrapper, threadPool, bigArrays, searchSlowLog, engineWarmer);
+ indexFieldData, engineFactory, eventListener, searcherWrapper, threadPool, bigArrays, engineWarmer,
+ searchOperationListeners);
// no indexing listeners - shadow engines don't index
} else {
indexShard = new IndexShard(shardId, this.indexSettings, path, store, indexCache, mapperService, similarityService,
- indexFieldData, engineFactory, eventListener, searcherWrapper, threadPool, bigArrays, searchSlowLog, engineWarmer,
- listeners);
+ indexFieldData, engineFactory, eventListener, searcherWrapper, threadPool, bigArrays, engineWarmer,
+ searchOperationListeners, indexingOperationListeners);
}
eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created");
eventListener.afterIndexShardCreated(indexShard);
@@ -455,8 +454,12 @@ public final class IndexService extends AbstractIndexComponent implements IndexC
return bigArrays;
}
- public SearchSlowLog getSearchSlowLog() {
- return searchSlowLog;
+ List getIndexOperationListeners() { // pkg private for testing
+ return indexingOperationListeners;
+ }
+
+ List getSearchOperationListener() { // pkg private for testing
+ return searchOperationListeners;
}
private class StoreCloseListener implements Store.OnClose {
diff --git a/core/src/main/java/org/elasticsearch/index/IndexingSlowLog.java b/core/src/main/java/org/elasticsearch/index/IndexingSlowLog.java
index 723ebd62c5b..a145012dd2b 100644
--- a/core/src/main/java/org/elasticsearch/index/IndexingSlowLog.java
+++ b/core/src/main/java/org/elasticsearch/index/IndexingSlowLog.java
@@ -136,6 +136,7 @@ public final class IndexingSlowLog implements IndexingOperationListener {
}
+ @Override
public void postIndex(Engine.Index index, boolean created) {
final long took = index.endTime() - index.startTime();
postIndexing(index.parsedDoc(), took);
diff --git a/core/src/main/java/org/elasticsearch/index/SearchSlowLog.java b/core/src/main/java/org/elasticsearch/index/SearchSlowLog.java
index cfa779d64aa..57d879ab584 100644
--- a/core/src/main/java/org/elasticsearch/index/SearchSlowLog.java
+++ b/core/src/main/java/org/elasticsearch/index/SearchSlowLog.java
@@ -25,13 +25,14 @@ import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.index.shard.SearchOperationListener;
import org.elasticsearch.search.internal.SearchContext;
import java.util.concurrent.TimeUnit;
/**
*/
-public final class SearchSlowLog {
+public final class SearchSlowLog implements SearchOperationListener {
private boolean reformat;
@@ -116,7 +117,7 @@ public final class SearchSlowLog {
this.queryLogger.setLevel(level.name());
this.fetchLogger.setLevel(level.name());
}
-
+ @Override
public void onQueryPhase(SearchContext context, long tookInNanos) {
if (queryWarnThreshold >= 0 && tookInNanos > queryWarnThreshold) {
queryLogger.warn("{}", new SlowLogSearchContextPrinter(context, tookInNanos, reformat));
@@ -129,6 +130,7 @@ public final class SearchSlowLog {
}
}
+ @Override
public void onFetchPhase(SearchContext context, long tookInNanos) {
if (fetchWarnThreshold >= 0 && tookInNanos > fetchWarnThreshold) {
fetchLogger.warn("{}", new SlowLogSearchContextPrinter(context, tookInNanos, reformat));
diff --git a/core/src/main/java/org/elasticsearch/index/search/stats/ShardSearchStats.java b/core/src/main/java/org/elasticsearch/index/search/stats/ShardSearchStats.java
index 748bb01bd54..13d72a74b9c 100644
--- a/core/src/main/java/org/elasticsearch/index/search/stats/ShardSearchStats.java
+++ b/core/src/main/java/org/elasticsearch/index/search/stats/ShardSearchStats.java
@@ -23,7 +23,7 @@ import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.metrics.CounterMetric;
import org.elasticsearch.common.metrics.MeanMetric;
import org.elasticsearch.common.regex.Regex;
-import org.elasticsearch.index.SearchSlowLog;
+import org.elasticsearch.index.shard.SearchOperationListener;
import org.elasticsearch.search.internal.SearchContext;
import java.util.HashMap;
@@ -35,17 +35,12 @@ import static java.util.Collections.emptyMap;
/**
*/
-public final class ShardSearchStats {
+public final class ShardSearchStats implements SearchOperationListener {
- private final SearchSlowLog slowLogSearchService;
private final StatsHolder totalStats = new StatsHolder();
private final CounterMetric openContexts = new CounterMetric();
private volatile Map groupsStats = emptyMap();
- public ShardSearchStats(SearchSlowLog searchSlowLog) {
- this.slowLogSearchService = searchSlowLog;
- }
-
/**
* Returns the stats, including group specific stats. If the groups are null/0 length, then nothing
* is returned for them. If they are set, then only groups provided will be returned, or
@@ -71,6 +66,7 @@ public final class ShardSearchStats {
return new SearchStats(total, openContexts.count(), groupsSt);
}
+ @Override
public void onPreQueryPhase(SearchContext searchContext) {
computeStats(searchContext, statsHolder -> {
if (searchContext.hasOnlySuggest()) {
@@ -81,6 +77,7 @@ public final class ShardSearchStats {
});
}
+ @Override
public void onFailedQueryPhase(SearchContext searchContext) {
computeStats(searchContext, statsHolder -> {
if (searchContext.hasOnlySuggest()) {
@@ -91,6 +88,7 @@ public final class ShardSearchStats {
});
}
+ @Override
public void onQueryPhase(SearchContext searchContext, long tookInNanos) {
computeStats(searchContext, statsHolder -> {
if (searchContext.hasOnlySuggest()) {
@@ -101,23 +99,24 @@ public final class ShardSearchStats {
statsHolder.queryCurrent.dec();
}
});
- slowLogSearchService.onQueryPhase(searchContext, tookInNanos);
}
+ @Override
public void onPreFetchPhase(SearchContext searchContext) {
computeStats(searchContext, statsHolder -> statsHolder.fetchCurrent.inc());
}
+ @Override
public void onFailedFetchPhase(SearchContext searchContext) {
computeStats(searchContext, statsHolder -> statsHolder.fetchCurrent.dec());
}
+ @Override
public void onFetchPhase(SearchContext searchContext, long tookInNanos) {
computeStats(searchContext, statsHolder -> {
statsHolder.fetchMetric.inc(tookInNanos);
statsHolder.fetchCurrent.dec();
});
- slowLogSearchService.onFetchPhase(searchContext, tookInNanos);
}
public void clear() {
@@ -159,18 +158,22 @@ public final class ShardSearchStats {
return stats;
}
+ @Override
public void onNewContext(SearchContext context) {
openContexts.inc();
}
+ @Override
public void onFreeContext(SearchContext context) {
openContexts.dec();
}
+ @Override
public void onNewScrollContext(SearchContext context) {
totalStats.scrollCurrent.inc();
}
+ @Override
public void onFreeScrollContext(SearchContext context) {
totalStats.scrollCurrent.dec();
totalStats.scrollMetric.inc(System.nanoTime() - context.getOriginNanoTime());
diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java
index 3404f79e6da..10ef37683b2 100644
--- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java
+++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java
@@ -128,7 +128,7 @@ public class IndexShard extends AbstractIndexShardComponent {
private final IndexCache indexCache;
private final Store store;
private final InternalIndexingStats internalIndexingStats;
- private final ShardSearchStats searchService;
+ private final ShardSearchStats searchStats = new ShardSearchStats();;
private final ShardGetService getService;
private final ShardIndexWarmerService shardWarmerService;
private final ShardRequestCache shardQueryCache;
@@ -151,6 +151,7 @@ public class IndexShard extends AbstractIndexShardComponent {
* being indexed/deleted.
*/
private final AtomicLong writingBytes = new AtomicLong();
+ private final SearchOperationListener searchOperationListener;
protected volatile ShardRouting shardRouting;
protected volatile IndexShardState state;
@@ -195,7 +196,7 @@ public class IndexShard extends AbstractIndexShardComponent {
MapperService mapperService, SimilarityService similarityService, IndexFieldDataService indexFieldDataService,
@Nullable EngineFactory engineFactory,
IndexEventListener indexEventListener, IndexSearcherWrapper indexSearcherWrapper, ThreadPool threadPool, BigArrays bigArrays,
- SearchSlowLog slowLog, Engine.Warmer warmer, IndexingOperationListener... listeners) {
+ Engine.Warmer warmer, List searchOperationListener, List listeners) {
super(shardId, indexSettings);
final Settings settings = indexSettings.getSettings();
this.codecService = new CodecService(mapperService, logger);
@@ -210,11 +211,13 @@ public class IndexShard extends AbstractIndexShardComponent {
this.mapperService = mapperService;
this.indexCache = indexCache;
this.internalIndexingStats = new InternalIndexingStats();
- final List listenersList = new ArrayList<>(Arrays.asList(listeners));
+ final List listenersList = new ArrayList<>(listeners);
listenersList.add(internalIndexingStats);
this.indexingOperationListeners = new IndexingOperationListener.CompositeListener(listenersList, logger);
+ final List searchListenersList = new ArrayList<>(searchOperationListener);
+ searchListenersList.add(searchStats);
+ this.searchOperationListener = new SearchOperationListener.CompositeListener(searchListenersList, logger);
this.getService = new ShardGetService(indexSettings, this, mapperService);
- this.searchService = new ShardSearchStats(slowLog);
this.shardWarmerService = new ShardIndexWarmerService(shardId, indexSettings);
this.shardQueryCache = new ShardRequestCache();
this.shardFieldData = new ShardFieldData();
@@ -270,8 +273,8 @@ public class IndexShard extends AbstractIndexShardComponent {
return mapperService;
}
- public ShardSearchStats searchService() {
- return this.searchService;
+ public SearchOperationListener getSearchOperationListener() {
+ return this.searchOperationListener;
}
public ShardIndexWarmerService warmerService() {
@@ -613,7 +616,7 @@ public class IndexShard extends AbstractIndexShardComponent {
}
public SearchStats searchStats(String... groups) {
- return searchService.stats(groups);
+ return searchStats.stats(groups);
}
public GetStats getStats() {
diff --git a/core/src/main/java/org/elasticsearch/index/shard/SearchOperationListener.java b/core/src/main/java/org/elasticsearch/index/shard/SearchOperationListener.java
new file mode 100644
index 00000000000..012617edcd0
--- /dev/null
+++ b/core/src/main/java/org/elasticsearch/index/shard/SearchOperationListener.java
@@ -0,0 +1,227 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.elasticsearch.index.shard;
+
+import org.elasticsearch.common.logging.ESLogger;
+import org.elasticsearch.search.internal.SearchContext;
+
+import java.util.List;
+
+/**
+ * An listener for search, fetch and context events.
+ */
+public interface SearchOperationListener {
+
+ /**
+ * Executed before the query phase is executed
+ * @param searchContext the current search context
+ */
+ default void onPreQueryPhase(SearchContext searchContext) {};
+
+ /**
+ * Executed if a query phased failed.
+ * @param searchContext the current search context
+ */
+ default void onFailedQueryPhase(SearchContext searchContext) {};
+
+ /**
+ * Executed after the query phase successfully finished.
+ * Note: this is not invoked if the query phase execution failed.
+ * @param searchContext the current search context
+ * @param tookInNanos the number of nanoseconds the query execution took
+ *
+ * @see #onFailedQueryPhase(SearchContext)
+ */
+ default void onQueryPhase(SearchContext searchContext, long tookInNanos) {};
+
+ /**
+ * Executed before the fetch phase is executed
+ * @param searchContext the current search context
+ */
+ default void onPreFetchPhase(SearchContext searchContext) {};
+
+ /**
+ * Executed if a fetch phased failed.
+ * @param searchContext the current search context
+ */
+ default void onFailedFetchPhase(SearchContext searchContext) {};
+
+ /**
+ * Executed after the fetch phase successfully finished.
+ * Note: this is not invoked if the fetch phase execution failed.
+ * @param searchContext the current search context
+ * @param tookInNanos the number of nanoseconds the fetch execution took
+ *
+ * @see #onFailedFetchPhase(SearchContext)
+ */
+ default void onFetchPhase(SearchContext searchContext, long tookInNanos) {};
+
+ /**
+ * Executed when a new search context was created
+ * @param context the created context
+ */
+ default void onNewContext(SearchContext context) {};
+
+ /**
+ * Executed when a previously created search context is freed.
+ * This happens either when the search execution finishes, if the
+ * execution failed or if the search context as idle for and needs to be
+ * cleaned up.
+ * @param context the freed search context
+ */
+ default void onFreeContext(SearchContext context) {};
+
+ /**
+ * Executed when a new scroll search {@link SearchContext} was created
+ * @param context the created search context
+ */
+ default void onNewScrollContext(SearchContext context) {};
+
+ /**
+ * Executed when a scroll search {@link SearchContext} is freed.
+ * This happens either when the scroll search execution finishes, if the
+ * execution failed or if the search context as idle for and needs to be
+ * cleaned up.
+ * @param context the freed search context
+ */
+ default void onFreeScrollContext(SearchContext context) {};
+
+ /**
+ * A Composite listener that multiplexes calls to each of the listeners methods.
+ */
+ final class CompositeListener implements SearchOperationListener {
+ private final List listeners;
+ private final ESLogger logger;
+
+ public CompositeListener(List listeners, ESLogger logger) {
+ this.listeners = listeners;
+ this.logger = logger;
+ }
+
+ @Override
+ public void onPreQueryPhase(SearchContext searchContext) {
+ for (SearchOperationListener listener : listeners) {
+ try {
+ listener.onPreQueryPhase(searchContext);
+ } catch (Throwable t) {
+ logger.warn("onPreQueryPhase listener [{}] failed", t, listener);
+ }
+ }
+ }
+
+ @Override
+ public void onFailedQueryPhase(SearchContext searchContext) {
+ for (SearchOperationListener listener : listeners) {
+ try {
+ listener.onFailedQueryPhase(searchContext);
+ } catch (Throwable t) {
+ logger.warn("onFailedQueryPhase listener [{}] failed", t, listener);
+ }
+ }
+ }
+
+ @Override
+ public void onQueryPhase(SearchContext searchContext, long tookInNanos) {
+ for (SearchOperationListener listener : listeners) {
+ try {
+ listener.onQueryPhase(searchContext, tookInNanos);
+ } catch (Throwable t) {
+ logger.warn("onQueryPhase listener [{}] failed", t, listener);
+ }
+ }
+ }
+
+ @Override
+ public void onPreFetchPhase(SearchContext searchContext) {
+ for (SearchOperationListener listener : listeners) {
+ try {
+ listener.onPreFetchPhase(searchContext);
+ } catch (Throwable t) {
+ logger.warn("onPreFetchPhase listener [{}] failed", t, listener);
+ }
+ }
+ }
+
+ @Override
+ public void onFailedFetchPhase(SearchContext searchContext) {
+ for (SearchOperationListener listener : listeners) {
+ try {
+ listener.onFailedFetchPhase(searchContext);
+ } catch (Throwable t) {
+ logger.warn("onFailedFetchPhase listener [{}] failed", t, listener);
+ }
+ }
+ }
+
+ @Override
+ public void onFetchPhase(SearchContext searchContext, long tookInNanos) {
+ for (SearchOperationListener listener : listeners) {
+ try {
+ listener.onFetchPhase(searchContext, tookInNanos);
+ } catch (Throwable t) {
+ logger.warn("onFetchPhase listener [{}] failed", t, listener);
+ }
+ }
+ }
+
+ @Override
+ public void onNewContext(SearchContext context) {
+ for (SearchOperationListener listener : listeners) {
+ try {
+ listener.onNewContext(context);
+ } catch (Throwable t) {
+ logger.warn("onNewContext listener [{}] failed", t, listener);
+ }
+ }
+ }
+
+ @Override
+ public void onFreeContext(SearchContext context) {
+ for (SearchOperationListener listener : listeners) {
+ try {
+ listener.onFreeContext(context);
+ } catch (Throwable t) {
+ logger.warn("onFreeContext listener [{}] failed", t, listener);
+ }
+ }
+ }
+
+ @Override
+ public void onNewScrollContext(SearchContext context) {
+ for (SearchOperationListener listener : listeners) {
+ try {
+ listener.onNewScrollContext(context);
+ } catch (Throwable t) {
+ logger.warn("onNewScrollContext listener [{}] failed", t, listener);
+ }
+ }
+ }
+
+ @Override
+ public void onFreeScrollContext(SearchContext context) {
+ for (SearchOperationListener listener : listeners) {
+ try {
+ listener.onFreeScrollContext(context);
+ } catch (Throwable t) {
+ logger.warn("onFreeScrollContext listener [{}] failed", t, listener);
+ }
+ }
+ }
+ }
+}
diff --git a/core/src/main/java/org/elasticsearch/index/shard/ShadowIndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/ShadowIndexShard.java
index 774052b3a5f..54fdd8de5f9 100644
--- a/core/src/main/java/org/elasticsearch/index/shard/ShadowIndexShard.java
+++ b/core/src/main/java/org/elasticsearch/index/shard/ShadowIndexShard.java
@@ -36,6 +36,8 @@ import org.elasticsearch.index.translog.TranslogStats;
import org.elasticsearch.threadpool.ThreadPool;
import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
/**
* ShadowIndexShard extends {@link IndexShard} to add file synchronization
@@ -48,10 +50,10 @@ public final class ShadowIndexShard extends IndexShard {
public ShadowIndexShard(ShardId shardId, IndexSettings indexSettings, ShardPath path, Store store, IndexCache indexCache,
MapperService mapperService, SimilarityService similarityService, IndexFieldDataService indexFieldDataService,
@Nullable EngineFactory engineFactory, IndexEventListener indexEventListener, IndexSearcherWrapper wrapper,
- ThreadPool threadPool, BigArrays bigArrays, SearchSlowLog searchSlowLog, Engine.Warmer engineWarmer)
- throws IOException {
+ ThreadPool threadPool, BigArrays bigArrays, Engine.Warmer engineWarmer,
+ List searchOperationListeners) throws IOException {
super(shardId, indexSettings, path, store, indexCache, mapperService, similarityService, indexFieldDataService, engineFactory,
- indexEventListener, wrapper, threadPool, bigArrays, searchSlowLog, engineWarmer);
+ indexEventListener, wrapper, threadPool, bigArrays, engineWarmer, searchOperationListeners, Collections.emptyList());
}
/**
diff --git a/core/src/main/java/org/elasticsearch/indices/IndicesService.java b/core/src/main/java/org/elasticsearch/indices/IndicesService.java
index 2719075a5c7..1e268c88c97 100644
--- a/core/src/main/java/org/elasticsearch/indices/IndicesService.java
+++ b/core/src/main/java/org/elasticsearch/indices/IndicesService.java
@@ -377,13 +377,14 @@ public class IndicesService extends AbstractLifecycleComponent i
idxSettings.isShadowReplicaIndex() ? "s" : "", reason);
final IndexModule indexModule = new IndexModule(idxSettings, indexStoreConfig, analysisRegistry);
+ for (IndexingOperationListener operationListener : indexingOperationListeners) {
+ indexModule.addIndexOperationListener(operationListener);
+ }
pluginsService.onIndexModule(indexModule);
for (IndexEventListener listener : builtInListeners) {
indexModule.addIndexEventListener(listener);
}
- final IndexEventListener listener = indexModule.freeze();
- listener.beforeIndexCreated(index, idxSettings.getSettings());
- return indexModule.newIndexService(nodeEnv, this, nodeServicesProvider, indicesQueryCache, mapperRegistry, indicesFieldDataCache, indexingOperationListeners);
+ return indexModule.newIndexService(nodeEnv, this, nodeServicesProvider, indicesQueryCache, mapperRegistry, indicesFieldDataCache);
}
/**
diff --git a/core/src/main/java/org/elasticsearch/search/SearchService.java b/core/src/main/java/org/elasticsearch/search/SearchService.java
index 98085bfed35..b45242c7944 100644
--- a/core/src/main/java/org/elasticsearch/search/SearchService.java
+++ b/core/src/main/java/org/elasticsearch/search/SearchService.java
@@ -51,10 +51,10 @@ import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.fieldstats.FieldStatsProvider;
import org.elasticsearch.index.query.QueryParseContext;
import org.elasticsearch.index.query.QueryShardContext;
-import org.elasticsearch.index.search.stats.ShardSearchStats;
import org.elasticsearch.index.search.stats.StatsGroupsParseElement;
import org.elasticsearch.index.shard.IndexEventListener;
import org.elasticsearch.index.shard.IndexShard;
+import org.elasticsearch.index.shard.SearchOperationListener;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.script.ExecutableScript;
import org.elasticsearch.script.ScriptContext;
@@ -274,9 +274,9 @@ public class SearchService extends AbstractLifecycleComponent imp
public QuerySearchResultProvider executeQueryPhase(ShardSearchRequest request) throws IOException {
final SearchContext context = createAndPutContext(request);
- final ShardSearchStats shardSearchStats = context.indexShard().searchService();
+ final SearchOperationListener operationListener = context.indexShard().getSearchOperationListener();
try {
- shardSearchStats.onPreQueryPhase(context);
+ operationListener.onPreQueryPhase(context);
long time = System.nanoTime();
contextProcessing(context);
@@ -287,7 +287,7 @@ public class SearchService extends AbstractLifecycleComponent imp
} else {
contextProcessedSuccessfully(context);
}
- shardSearchStats.onQueryPhase(context, System.nanoTime() - time);
+ operationListener.onQueryPhase(context, System.nanoTime() - time);
return context.queryResult();
} catch (Throwable e) {
@@ -295,7 +295,7 @@ public class SearchService extends AbstractLifecycleComponent imp
if (e instanceof ExecutionException) {
e = e.getCause();
}
- shardSearchStats.onFailedQueryPhase(context);
+ operationListener.onFailedQueryPhase(context);
logger.trace("Query phase failed", e);
processFailure(context, e);
throw ExceptionsHelper.convertToRuntime(e);
@@ -306,18 +306,18 @@ public class SearchService extends AbstractLifecycleComponent imp
public ScrollQuerySearchResult executeQueryPhase(InternalScrollSearchRequest request) {
final SearchContext context = findContext(request.id());
- ShardSearchStats shardSearchStats = context.indexShard().searchService();
+ SearchOperationListener operationListener = context.indexShard().getSearchOperationListener();
try {
- shardSearchStats.onPreQueryPhase(context);
+ operationListener.onPreQueryPhase(context);
long time = System.nanoTime();
contextProcessing(context);
processScroll(request, context);
queryPhase.execute(context);
contextProcessedSuccessfully(context);
- shardSearchStats.onQueryPhase(context, System.nanoTime() - time);
+ operationListener.onQueryPhase(context, System.nanoTime() - time);
return new ScrollQuerySearchResult(context.queryResult(), context.shardTarget());
} catch (Throwable e) {
- shardSearchStats.onFailedQueryPhase(context);
+ operationListener.onFailedQueryPhase(context);
logger.trace("Query phase failed", e);
processFailure(context, e);
throw ExceptionsHelper.convertToRuntime(e);
@@ -331,9 +331,9 @@ public class SearchService extends AbstractLifecycleComponent imp
contextProcessing(context);
context.searcher().setAggregatedDfs(request.dfs());
IndexShard indexShard = context.indexShard();
- ShardSearchStats shardSearchStats = indexShard.searchService();
+ SearchOperationListener operationListener = indexShard.getSearchOperationListener();
try {
- shardSearchStats.onPreQueryPhase(context);
+ operationListener.onPreQueryPhase(context);
long time = System.nanoTime();
queryPhase.execute(context);
if (context.queryResult().topDocs().scoreDocs.length == 0 && context.scrollContext() == null) {
@@ -342,10 +342,10 @@ public class SearchService extends AbstractLifecycleComponent imp
} else {
contextProcessedSuccessfully(context);
}
- shardSearchStats.onQueryPhase(context, System.nanoTime() - time);
+ operationListener.onQueryPhase(context, System.nanoTime() - time);
return context.queryResult();
} catch (Throwable e) {
- shardSearchStats.onFailedQueryPhase(context);
+ operationListener.onFailedQueryPhase(context);
logger.trace("Query phase failed", e);
processFailure(context, e);
throw ExceptionsHelper.convertToRuntime(e);
@@ -368,18 +368,18 @@ public class SearchService extends AbstractLifecycleComponent imp
final SearchContext context = createAndPutContext(request);
contextProcessing(context);
try {
- ShardSearchStats shardSearchStats = context.indexShard().searchService();
- shardSearchStats.onPreQueryPhase(context);
+ SearchOperationListener operationListener = context.indexShard().getSearchOperationListener();
+ operationListener.onPreQueryPhase(context);
long time = System.nanoTime();
try {
loadOrExecuteQueryPhase(request, context, queryPhase);
} catch (Throwable e) {
- shardSearchStats.onFailedQueryPhase(context);
+ operationListener.onFailedQueryPhase(context);
throw ExceptionsHelper.convertToRuntime(e);
}
long time2 = System.nanoTime();
- shardSearchStats.onQueryPhase(context, time2 - time);
- shardSearchStats.onPreFetchPhase(context);
+ operationListener.onQueryPhase(context, time2 - time);
+ operationListener.onPreFetchPhase(context);
try {
shortcutDocIdsToLoad(context);
fetchPhase.execute(context);
@@ -389,10 +389,10 @@ public class SearchService extends AbstractLifecycleComponent imp
contextProcessedSuccessfully(context);
}
} catch (Throwable e) {
- shardSearchStats.onFailedFetchPhase(context);
+ operationListener.onFailedFetchPhase(context);
throw ExceptionsHelper.convertToRuntime(e);
}
- shardSearchStats.onFetchPhase(context, System.nanoTime() - time2);
+ operationListener.onFetchPhase(context, System.nanoTime() - time2);
return new QueryFetchSearchResult(context.queryResult(), context.fetchResult());
} catch (Throwable e) {
logger.trace("Fetch phase failed", e);
@@ -408,18 +408,18 @@ public class SearchService extends AbstractLifecycleComponent imp
contextProcessing(context);
context.searcher().setAggregatedDfs(request.dfs());
try {
- ShardSearchStats shardSearchStats = context.indexShard().searchService();
- shardSearchStats.onPreQueryPhase(context);
+ SearchOperationListener operationListener = context.indexShard().getSearchOperationListener();
+ operationListener.onPreQueryPhase(context);
long time = System.nanoTime();
try {
queryPhase.execute(context);
} catch (Throwable e) {
- shardSearchStats.onFailedQueryPhase(context);
+ operationListener.onFailedQueryPhase(context);
throw ExceptionsHelper.convertToRuntime(e);
}
long time2 = System.nanoTime();
- shardSearchStats.onQueryPhase(context, time2 - time);
- shardSearchStats.onPreFetchPhase(context);
+ operationListener.onQueryPhase(context, time2 - time);
+ operationListener.onPreFetchPhase(context);
try {
shortcutDocIdsToLoad(context);
fetchPhase.execute(context);
@@ -429,10 +429,10 @@ public class SearchService extends AbstractLifecycleComponent imp
contextProcessedSuccessfully(context);
}
} catch (Throwable e) {
- shardSearchStats.onFailedFetchPhase(context);
+ operationListener.onFailedFetchPhase(context);
throw ExceptionsHelper.convertToRuntime(e);
}
- shardSearchStats.onFetchPhase(context, System.nanoTime() - time2);
+ operationListener.onFetchPhase(context, System.nanoTime() - time2);
return new QueryFetchSearchResult(context.queryResult(), context.fetchResult());
} catch (Throwable e) {
logger.trace("Fetch phase failed", e);
@@ -447,19 +447,19 @@ public class SearchService extends AbstractLifecycleComponent imp
final SearchContext context = findContext(request.id());
contextProcessing(context);
try {
- ShardSearchStats shardSearchStats = context.indexShard().searchService();
+ SearchOperationListener operationListener = context.indexShard().getSearchOperationListener();
processScroll(request, context);
- shardSearchStats.onPreQueryPhase(context);
+ operationListener.onPreQueryPhase(context);
long time = System.nanoTime();
try {
queryPhase.execute(context);
} catch (Throwable e) {
- shardSearchStats.onFailedQueryPhase(context);
+ operationListener.onFailedQueryPhase(context);
throw ExceptionsHelper.convertToRuntime(e);
}
long time2 = System.nanoTime();
- shardSearchStats.onQueryPhase(context, time2 - time);
- shardSearchStats.onPreFetchPhase(context);
+ operationListener.onQueryPhase(context, time2 - time);
+ operationListener.onPreFetchPhase(context);
try {
shortcutDocIdsToLoad(context);
fetchPhase.execute(context);
@@ -469,10 +469,10 @@ public class SearchService extends AbstractLifecycleComponent imp
contextProcessedSuccessfully(context);
}
} catch (Throwable e) {
- shardSearchStats.onFailedFetchPhase(context);
+ operationListener.onFailedFetchPhase(context);
throw ExceptionsHelper.convertToRuntime(e);
}
- shardSearchStats.onFetchPhase(context, System.nanoTime() - time2);
+ operationListener.onFetchPhase(context, System.nanoTime() - time2);
return new ScrollQueryFetchSearchResult(new QueryFetchSearchResult(context.queryResult(), context.fetchResult()), context.shardTarget());
} catch (Throwable e) {
logger.trace("Fetch phase failed", e);
@@ -486,13 +486,13 @@ public class SearchService extends AbstractLifecycleComponent imp
public FetchSearchResult executeFetchPhase(ShardFetchRequest request) {
final SearchContext context = findContext(request.id());
contextProcessing(context);
- final ShardSearchStats shardSearchStats = context.indexShard().searchService();
+ final SearchOperationListener operationListener = context.indexShard().getSearchOperationListener();
try {
if (request.lastEmittedDoc() != null) {
context.scrollContext().lastEmittedDoc = request.lastEmittedDoc();
}
context.docIdsToLoad(request.docIds(), 0, request.docIdsSize());
- shardSearchStats.onPreFetchPhase(context);
+ operationListener.onPreFetchPhase(context);
long time = System.nanoTime();
fetchPhase.execute(context);
if (fetchPhaseShouldFreeContext(context)) {
@@ -500,10 +500,10 @@ public class SearchService extends AbstractLifecycleComponent imp
} else {
contextProcessedSuccessfully(context);
}
- shardSearchStats.onFetchPhase(context, System.nanoTime() - time);
+ operationListener.onFetchPhase(context, System.nanoTime() - time);
return context.fetchResult();
} catch (Throwable e) {
- shardSearchStats.onFailedFetchPhase(context);
+ operationListener.onFailedFetchPhase(context);
logger.trace("Fetch phase failed", e);
processFailure(context, e);
throw ExceptionsHelper.convertToRuntime(e);
@@ -527,9 +527,9 @@ public class SearchService extends AbstractLifecycleComponent imp
try {
putContext(context);
if (request.scroll() != null) {
- context.indexShard().searchService().onNewScrollContext(context);
+ context.indexShard().getSearchOperationListener().onNewScrollContext(context);
}
- context.indexShard().searchService().onNewContext(context);
+ context.indexShard().getSearchOperationListener().onNewContext(context);
success = true;
return context;
} finally {
@@ -617,9 +617,9 @@ public class SearchService extends AbstractLifecycleComponent imp
final SearchContext context = removeContext(id);
if (context != null) {
try {
- context.indexShard().searchService().onFreeContext(context);
+ context.indexShard().getSearchOperationListener().onFreeContext(context);
if (context.scrollContext() != null) {
- context.indexShard().searchService().onFreeScrollContext(context);
+ context.indexShard().getSearchOperationListener().onFreeScrollContext(context);
}
} finally {
context.close();
diff --git a/core/src/test/java/org/elasticsearch/index/IndexModuleTests.java b/core/src/test/java/org/elasticsearch/index/IndexModuleTests.java
index b7c2c29eb07..b59a54bc4b8 100644
--- a/core/src/test/java/org/elasticsearch/index/IndexModuleTests.java
+++ b/core/src/test/java/org/elasticsearch/index/IndexModuleTests.java
@@ -22,6 +22,7 @@ import org.apache.lucene.index.AssertingDirectoryReader;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.FieldInvertState;
import org.apache.lucene.index.LeafReaderContext;
+import org.apache.lucene.index.Term;
import org.apache.lucene.search.CollectionStatistics;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.QueryCachingPolicy;
@@ -44,10 +45,13 @@ import org.elasticsearch.index.analysis.AnalysisRegistry;
import org.elasticsearch.index.cache.query.QueryCache;
import org.elasticsearch.index.cache.query.index.IndexQueryCache;
import org.elasticsearch.index.cache.query.none.NoneQueryCache;
+import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.EngineException;
import org.elasticsearch.index.fielddata.IndexFieldDataCache;
import org.elasticsearch.index.shard.IndexEventListener;
import org.elasticsearch.index.shard.IndexSearcherWrapper;
+import org.elasticsearch.index.shard.IndexingOperationListener;
+import org.elasticsearch.index.shard.SearchOperationListener;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.similarity.SimilarityProvider;
import org.elasticsearch.index.similarity.SimilarityService;
@@ -65,8 +69,10 @@ import org.elasticsearch.script.ScriptEngineRegistry;
import org.elasticsearch.script.ScriptEngineService;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.script.ScriptSettings;
+import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.IndexSettingsModule;
+import org.elasticsearch.test.TestSearchContext;
import org.elasticsearch.test.engine.MockEngineFactory;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.watcher.ResourceWatcherService;
@@ -160,14 +166,15 @@ public class IndexModuleTests extends ESTestCase {
IndexSettings indexSettings = IndexSettingsModule.newIndexSettings(index, settings);
IndexModule module = new IndexModule(indexSettings, null, new AnalysisRegistry(null, environment));
module.addIndexStore("foo_store", FooStore::new);
- IndexService indexService = module.newIndexService(nodeEnvironment, deleter, nodeServicesProvider, indicesQueryCache, mapperRegistry, new IndicesFieldDataCache(settings, listener));
- assertTrue(indexService.getIndexStore() instanceof FooStore);
try {
module.addIndexStore("foo_store", FooStore::new);
fail("already registered");
} catch (IllegalArgumentException ex) {
// fine
}
+ IndexService indexService = module.newIndexService(nodeEnvironment, deleter, nodeServicesProvider, indicesQueryCache, mapperRegistry, new IndicesFieldDataCache(settings, listener));
+ assertTrue(indexService.getIndexStore() instanceof FooStore);
+
indexService.close("simon says", false);
}
@@ -215,6 +222,65 @@ public class IndexModuleTests extends ESTestCase {
indexService.close("simon says", false);
}
+ public void testAddIndexOperationListener() throws IOException {
+ IndexModule module = new IndexModule(IndexSettingsModule.newIndexSettings(index, settings), null, new AnalysisRegistry(null, environment));
+ AtomicBoolean executed = new AtomicBoolean(false);
+ IndexingOperationListener listener = new IndexingOperationListener() {
+ @Override
+ public Engine.Index preIndex(Engine.Index operation) {
+ executed.set(true);
+ return operation;
+ }
+ };
+ module.addIndexOperationListener(listener);
+
+ expectThrows(IllegalArgumentException.class, () -> module.addIndexOperationListener(listener));
+ expectThrows(IllegalArgumentException.class, () -> module.addIndexOperationListener(null));
+
+
+ IndexService indexService = module.newIndexService(nodeEnvironment, deleter, nodeServicesProvider, indicesQueryCache, mapperRegistry,
+ new IndicesFieldDataCache(settings, this.listener));
+ assertEquals(2, indexService.getIndexOperationListeners().size());
+ assertEquals(IndexingSlowLog.class, indexService.getIndexOperationListeners().get(0).getClass());
+ assertSame(listener, indexService.getIndexOperationListeners().get(1));
+
+ Engine.Index index = new Engine.Index(new Term("_uid", "1"), null);
+ for (IndexingOperationListener l : indexService.getIndexOperationListeners()) {
+ l.preIndex(index);
+ }
+ assertTrue(executed.get());
+ indexService.close("simon says", false);
+ }
+
+ public void testAddSearchOperationListener() throws IOException {
+ IndexModule module = new IndexModule(IndexSettingsModule.newIndexSettings(index, settings), null, new AnalysisRegistry(null, environment));
+ AtomicBoolean executed = new AtomicBoolean(false);
+ SearchOperationListener listener = new SearchOperationListener() {
+
+ @Override
+ public void onNewContext(SearchContext context) {
+ executed.set(true);
+ }
+ };
+ module.addSearchOperationListener(listener);
+
+ expectThrows(IllegalArgumentException.class, () -> module.addSearchOperationListener(listener));
+ expectThrows(IllegalArgumentException.class, () -> module.addSearchOperationListener(null));
+
+
+ IndexService indexService = module.newIndexService(nodeEnvironment, deleter, nodeServicesProvider, indicesQueryCache, mapperRegistry,
+ new IndicesFieldDataCache(settings, this.listener));
+ assertEquals(2, indexService.getSearchOperationListener().size());
+ assertEquals(SearchSlowLog.class, indexService.getSearchOperationListener().get(0).getClass());
+ assertSame(listener, indexService.getSearchOperationListener().get(1));
+
+ for (SearchOperationListener l : indexService.getSearchOperationListener()) {
+ l.onNewContext(new TestSearchContext(null));
+ }
+ assertTrue(executed.get());
+ indexService.close("simon says", false);
+ }
+
public void testAddSimilarity() throws IOException {
Settings indexSettings = Settings.settingsBuilder()
.put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)
@@ -245,6 +311,20 @@ public class IndexModuleTests extends ESTestCase {
indexService.close("simon says", false);
}
+ public void testFrozen() {
+ IndexModule module = new IndexModule(IndexSettingsModule.newIndexSettings(index, settings), null, new AnalysisRegistry(null, environment));
+ module.freeze();
+ String msg = "Can't modify IndexModule once the index service has been created";
+ assertEquals(msg, expectThrows(IllegalStateException.class, () -> module.addSearchOperationListener(null)).getMessage());
+ assertEquals(msg, expectThrows(IllegalStateException.class, () -> module.addIndexEventListener(null)).getMessage());
+ assertEquals(msg, expectThrows(IllegalStateException.class, () -> module.addIndexOperationListener(null)).getMessage());
+ assertEquals(msg, expectThrows(IllegalStateException.class, () -> module.addSimilarity(null, null)).getMessage());
+ assertEquals(msg, expectThrows(IllegalStateException.class, () -> module.setSearcherWrapper(null)).getMessage());
+ assertEquals(msg, expectThrows(IllegalStateException.class, () -> module.forceQueryCacheType("foo")).getMessage());
+ assertEquals(msg, expectThrows(IllegalStateException.class, () -> module.addIndexStore("foo", null)).getMessage());
+ assertEquals(msg, expectThrows(IllegalStateException.class, () -> module.registerQueryCache("foo", null)).getMessage());
+ }
+
public void testSetupUnknownSimilarity() throws IOException {
Settings indexSettings = Settings.settingsBuilder()
.put("index.similarity.my_similarity.type", "test_similarity")
diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java
index ddb3e484152..2231b9ecc08 100644
--- a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java
+++ b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java
@@ -107,6 +107,7 @@ import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
@@ -1293,7 +1294,7 @@ public class IndexShardTests extends ESSingleNodeTestCase {
IndexShard newShard = new IndexShard(shard.shardId(), indexService.getIndexSettings(), shard.shardPath(),
shard.store(), indexService.cache(), indexService.mapperService(), indexService.similarityService(),
indexService.fieldData(), shard.getEngineFactory(), indexService.getIndexEventListener(), wrapper,
- indexService.getThreadPool(), indexService.getBigArrays(), indexService.getSearchSlowLog(), null, listeners
+ indexService.getThreadPool(), indexService.getBigArrays(), null, Collections.emptyList(), Arrays.asList(listeners)
);
ShardRoutingHelper.reinit(routing);
newShard.updateRoutingEntry(routing, false);
diff --git a/core/src/test/java/org/elasticsearch/index/shard/SearchOperationListenerTests.java b/core/src/test/java/org/elasticsearch/index/shard/SearchOperationListenerTests.java
new file mode 100644
index 00000000000..1721e5f5e5d
--- /dev/null
+++ b/core/src/test/java/org/elasticsearch/index/shard/SearchOperationListenerTests.java
@@ -0,0 +1,249 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.elasticsearch.index.shard;
+
+import org.apache.lucene.index.Term;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.index.engine.Engine;
+import org.elasticsearch.search.internal.SearchContext;
+import org.elasticsearch.test.ESTestCase;
+import org.elasticsearch.test.TestSearchContext;
+
+import java.lang.reflect.Proxy;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class SearchOperationListenerTests extends ESTestCase {
+
+ // this test also tests if calls are correct if one or more listeners throw exceptions
+ public void testListenersAreExecuted() {
+ AtomicInteger preQuery = new AtomicInteger();
+ AtomicInteger failedQuery = new AtomicInteger();
+ AtomicInteger onQuery = new AtomicInteger();
+ AtomicInteger onFetch = new AtomicInteger();
+ AtomicInteger preFetch = new AtomicInteger();
+ AtomicInteger failedFetch = new AtomicInteger();
+ AtomicInteger newContext = new AtomicInteger();
+ AtomicInteger freeContext = new AtomicInteger();
+ AtomicInteger newScrollContext = new AtomicInteger();
+ AtomicInteger freeScrollContext = new AtomicInteger();
+ AtomicInteger timeInNanos = new AtomicInteger(randomIntBetween(0, 10));
+ SearchOperationListener listener = new SearchOperationListener() {
+ @Override
+ public void onPreQueryPhase(SearchContext searchContext) {
+ assertNotNull(searchContext);
+ preQuery.incrementAndGet();
+ }
+
+ @Override
+ public void onFailedQueryPhase(SearchContext searchContext) {
+ assertNotNull(searchContext);
+ failedQuery.incrementAndGet();
+ }
+
+ @Override
+ public void onQueryPhase(SearchContext searchContext, long tookInNanos) {
+ assertEquals(timeInNanos.get(), tookInNanos);
+ assertNotNull(searchContext);
+ onQuery.incrementAndGet();
+ }
+
+ @Override
+ public void onPreFetchPhase(SearchContext searchContext) {
+ assertNotNull(searchContext);
+ preFetch.incrementAndGet();
+
+ }
+
+ @Override
+ public void onFailedFetchPhase(SearchContext searchContext) {
+ assertNotNull(searchContext);
+ failedFetch.incrementAndGet();
+ }
+
+ @Override
+ public void onFetchPhase(SearchContext searchContext, long tookInNanos) {
+ assertEquals(timeInNanos.get(), tookInNanos);
+ onFetch.incrementAndGet();
+ }
+
+ @Override
+ public void onNewContext(SearchContext context) {
+ assertNotNull(context);
+ newContext.incrementAndGet();
+ }
+
+ @Override
+ public void onFreeContext(SearchContext context) {
+ assertNotNull(context);
+ freeContext.incrementAndGet();
+ }
+
+ @Override
+ public void onNewScrollContext(SearchContext context) {
+ assertNotNull(context);
+ newScrollContext.incrementAndGet();
+ }
+
+ @Override
+ public void onFreeScrollContext(SearchContext context) {
+ assertNotNull(context);
+ freeScrollContext.incrementAndGet();
+ }
+ };
+
+ SearchOperationListener throwingListener = (SearchOperationListener) Proxy.newProxyInstance(
+ SearchOperationListener.class.getClassLoader(),
+ new Class[]{SearchOperationListener.class},
+ (a,b,c) -> { throw new RuntimeException();});
+ final List indexingOperationListeners = new ArrayList<>(Arrays.asList(listener, listener));
+ if (randomBoolean()) {
+ indexingOperationListeners.add(throwingListener);
+ if (randomBoolean()) {
+ indexingOperationListeners.add(throwingListener);
+ }
+ }
+ Collections.shuffle(indexingOperationListeners, random());
+ SearchOperationListener.CompositeListener compositeListener =
+ new SearchOperationListener.CompositeListener(indexingOperationListeners, logger);
+ SearchContext ctx = new TestSearchContext(null);
+ compositeListener.onQueryPhase(ctx, timeInNanos.get());
+ assertEquals(0, preFetch.get());
+ assertEquals(0, preQuery.get());
+ assertEquals(0, failedFetch.get());
+ assertEquals(0, failedQuery.get());
+ assertEquals(2, onQuery.get());
+ assertEquals(0, onFetch.get());
+ assertEquals(0, newContext.get());
+ assertEquals(0, newScrollContext.get());
+ assertEquals(0, freeContext.get());
+ assertEquals(0, freeScrollContext.get());
+
+ compositeListener.onFetchPhase(ctx, timeInNanos.get());
+ assertEquals(0, preFetch.get());
+ assertEquals(0, preQuery.get());
+ assertEquals(0, failedFetch.get());
+ assertEquals(0, failedQuery.get());
+ assertEquals(2, onQuery.get());
+ assertEquals(2, onFetch.get());
+ assertEquals(0, newContext.get());
+ assertEquals(0, newScrollContext.get());
+ assertEquals(0, freeContext.get());
+ assertEquals(0, freeScrollContext.get());
+
+ compositeListener.onPreQueryPhase(ctx);
+ assertEquals(0, preFetch.get());
+ assertEquals(2, preQuery.get());
+ assertEquals(0, failedFetch.get());
+ assertEquals(0, failedQuery.get());
+ assertEquals(2, onQuery.get());
+ assertEquals(2, onFetch.get());
+ assertEquals(0, newContext.get());
+ assertEquals(0, newScrollContext.get());
+ assertEquals(0, freeContext.get());
+ assertEquals(0, freeScrollContext.get());
+
+ compositeListener.onPreFetchPhase(ctx);
+ assertEquals(2, preFetch.get());
+ assertEquals(2, preQuery.get());
+ assertEquals(0, failedFetch.get());
+ assertEquals(0, failedQuery.get());
+ assertEquals(2, onQuery.get());
+ assertEquals(2, onFetch.get());
+ assertEquals(0, newContext.get());
+ assertEquals(0, newScrollContext.get());
+ assertEquals(0, freeContext.get());
+ assertEquals(0, freeScrollContext.get());
+
+ compositeListener.onFailedFetchPhase(ctx);
+ assertEquals(2, preFetch.get());
+ assertEquals(2, preQuery.get());
+ assertEquals(2, failedFetch.get());
+ assertEquals(0, failedQuery.get());
+ assertEquals(2, onQuery.get());
+ assertEquals(2, onFetch.get());
+ assertEquals(0, newContext.get());
+ assertEquals(0, newScrollContext.get());
+ assertEquals(0, freeContext.get());
+ assertEquals(0, freeScrollContext.get());
+
+ compositeListener.onFailedQueryPhase(ctx);
+ assertEquals(2, preFetch.get());
+ assertEquals(2, preQuery.get());
+ assertEquals(2, failedFetch.get());
+ assertEquals(2, failedQuery.get());
+ assertEquals(2, onQuery.get());
+ assertEquals(2, onFetch.get());
+ assertEquals(0, newContext.get());
+ assertEquals(0, newScrollContext.get());
+ assertEquals(0, freeContext.get());
+ assertEquals(0, freeScrollContext.get());
+
+ compositeListener.onNewContext(ctx);
+ assertEquals(2, preFetch.get());
+ assertEquals(2, preQuery.get());
+ assertEquals(2, failedFetch.get());
+ assertEquals(2, failedQuery.get());
+ assertEquals(2, onQuery.get());
+ assertEquals(2, onFetch.get());
+ assertEquals(2, newContext.get());
+ assertEquals(0, newScrollContext.get());
+ assertEquals(0, freeContext.get());
+ assertEquals(0, freeScrollContext.get());
+
+ compositeListener.onNewScrollContext(ctx);
+ assertEquals(2, preFetch.get());
+ assertEquals(2, preQuery.get());
+ assertEquals(2, failedFetch.get());
+ assertEquals(2, failedQuery.get());
+ assertEquals(2, onQuery.get());
+ assertEquals(2, onFetch.get());
+ assertEquals(2, newContext.get());
+ assertEquals(2, newScrollContext.get());
+ assertEquals(0, freeContext.get());
+ assertEquals(0, freeScrollContext.get());
+
+ compositeListener.onFreeContext(ctx);
+ assertEquals(2, preFetch.get());
+ assertEquals(2, preQuery.get());
+ assertEquals(2, failedFetch.get());
+ assertEquals(2, failedQuery.get());
+ assertEquals(2, onQuery.get());
+ assertEquals(2, onFetch.get());
+ assertEquals(2, newContext.get());
+ assertEquals(2, newScrollContext.get());
+ assertEquals(2, freeContext.get());
+ assertEquals(0, freeScrollContext.get());
+
+ compositeListener.onFreeScrollContext(ctx);
+ assertEquals(2, preFetch.get());
+ assertEquals(2, preQuery.get());
+ assertEquals(2, failedFetch.get());
+ assertEquals(2, failedQuery.get());
+ assertEquals(2, onQuery.get());
+ assertEquals(2, onFetch.get());
+ assertEquals(2, newContext.get());
+ assertEquals(2, newScrollContext.get());
+ assertEquals(2, freeContext.get());
+ assertEquals(2, freeScrollContext.get());
+ }
+}