diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/indexing/IndexingStats.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/indexing/IndexingStats.java index f19efb1fb76..6965df46ded 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/indexing/IndexingStats.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/indexing/IndexingStats.java @@ -40,27 +40,33 @@ public class IndexingStats implements Streamable, ToXContent { private long indexCount; private long indexTimeInMillis; + private long indexCurrent; private long deleteCount; private long deleteTimeInMillis; + private long deleteCurrent; Stats() { } - public Stats(long indexCount, long indexTimeInMillis, long deleteCount, long deleteTimeInMillis) { + public Stats(long indexCount, long indexTimeInMillis, long indexCurrent, long deleteCount, long deleteTimeInMillis, long deleteCurrent) { this.indexCount = indexCount; this.indexTimeInMillis = indexTimeInMillis; + this.indexCurrent = indexCurrent; this.deleteCount = deleteCount; this.deleteTimeInMillis = deleteTimeInMillis; + this.deleteCurrent = deleteCurrent; } public void add(Stats stats) { indexCount += stats.indexCount; indexTimeInMillis += stats.indexTimeInMillis; + indexCurrent += stats.indexCurrent; deleteCount += stats.deleteCount; deleteTimeInMillis += stats.deleteTimeInMillis; + deleteCurrent += stats.deleteCurrent; } public long indexCount() { @@ -83,6 +89,14 @@ public class IndexingStats implements Streamable, ToXContent { return indexTimeInMillis; } + public long indexCurrent() { + return indexCurrent; + } + + public long getIndexCurrent() { + return indexCurrent; + } + public long deleteCount() { return deleteCount; } @@ -103,6 +117,15 @@ public class IndexingStats implements Streamable, ToXContent { return deleteTimeInMillis; } + + public long deleteCurrent() { + return deleteCurrent; + } + + public long getDeleteCurrent() { + return deleteCurrent; + } + public static Stats readStats(StreamInput in) throws IOException { Stats stats = new Stats(); stats.readFrom(in); @@ -112,27 +135,33 @@ public class IndexingStats implements Streamable, ToXContent { @Override public void readFrom(StreamInput in) throws IOException { indexCount = in.readVLong(); indexTimeInMillis = in.readVLong(); + indexCurrent = in.readVLong(); deleteCount = in.readVLong(); deleteTimeInMillis = in.readVLong(); + deleteCurrent = in.readVLong(); } @Override public void writeTo(StreamOutput out) throws IOException { out.writeVLong(indexCount); out.writeVLong(indexTimeInMillis); + out.writeVLong(indexCurrent); out.writeVLong(deleteCount); out.writeVLong(deleteTimeInMillis); + out.writeVLong(deleteCurrent); } @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.field(Fields.INDEX_TOTAL, indexCount); builder.field(Fields.INDEX_TIME, indexTime().toString()); builder.field(Fields.INDEX_TIME_IN_MILLIS, indexTimeInMillis); + builder.field(Fields.INDEX_CURRENT, indexCurrent); builder.field(Fields.DELETE_TOTAL, deleteCount); builder.field(Fields.DELETE_TIME, deleteTime().toString()); builder.field(Fields.DELETE_TIME_IN_MILLIS, deleteTimeInMillis); + builder.field(Fields.DELETE_CURRENT, deleteCurrent); return builder; } @@ -205,9 +234,11 @@ public class IndexingStats implements Streamable, ToXContent { static final XContentBuilderString INDEX_TOTAL = new XContentBuilderString("index_total"); static final XContentBuilderString INDEX_TIME = new XContentBuilderString("index_time"); static final XContentBuilderString INDEX_TIME_IN_MILLIS = new XContentBuilderString("index_time_in_millis"); + static final XContentBuilderString INDEX_CURRENT = new XContentBuilderString("index_current"); static final XContentBuilderString DELETE_TOTAL = new XContentBuilderString("delete_total"); static final XContentBuilderString DELETE_TIME = new XContentBuilderString("delete_time"); static final XContentBuilderString DELETE_TIME_IN_MILLIS = new XContentBuilderString("delete_time_in_millis"); + static final XContentBuilderString DELETE_CURRENT = new XContentBuilderString("delete_current"); } public static IndexingStats readIndexingStats(StreamInput in) throws IOException { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/indexing/ShardIndexingService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/indexing/ShardIndexingService.java index 74827cc63cf..e5d5229af31 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/indexing/ShardIndexingService.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/indexing/ShardIndexingService.java @@ -22,6 +22,7 @@ package org.elasticsearch.index.indexing; import org.elasticsearch.common.collect.ImmutableMap; import org.elasticsearch.common.collect.MapBuilder; import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.metrics.CounterMetric; import org.elasticsearch.common.metrics.MeanMetric; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.engine.Engine; @@ -113,6 +114,8 @@ public class ShardIndexingService extends AbstractIndexShardComponent { } public Engine.Index preIndex(Engine.Index index) { + totalStats.indexCurrent.inc(); + typeStats(index.type()).indexCurrent.inc(); if (listeners != null) { for (IndexingOperationListener listener : listeners) { index = listener.preIndex(index); @@ -124,7 +127,10 @@ public class ShardIndexingService extends AbstractIndexShardComponent { public void postIndex(Engine.Index index) { long took = index.endTime() - index.startTime(); totalStats.indexMetric.inc(took); - typeStats(index.type()).indexMetric.inc(took); + totalStats.indexCurrent.dec(); + StatsHolder typeStats = typeStats(index.type()); + typeStats.indexMetric.inc(took); + typeStats.indexCurrent.dec(); if (listeners != null) { for (IndexingOperationListener listener : listeners) { listener.postIndex(index); @@ -132,7 +138,14 @@ public class ShardIndexingService extends AbstractIndexShardComponent { } } + public void failedIndex(Engine.Index index) { + totalStats.indexCurrent.dec(); + typeStats(index.type()).indexCurrent.dec(); + } + public Engine.Delete preDelete(Engine.Delete delete) { + totalStats.deleteCurrent.inc(); + typeStats(delete.type()).deleteCurrent.inc(); if (listeners != null) { for (IndexingOperationListener listener : listeners) { delete = listener.preDelete(delete); @@ -144,7 +157,10 @@ public class ShardIndexingService extends AbstractIndexShardComponent { public void postDelete(Engine.Delete delete) { long took = delete.endTime() - delete.startTime(); totalStats.deleteMetric.inc(took); - typeStats(delete.type()).deleteMetric.inc(took); + totalStats.deleteCurrent.dec(); + StatsHolder typeStats = typeStats(delete.type()); + typeStats.deleteMetric.inc(took); + typeStats.deleteCurrent.dec(); if (listeners != null) { for (IndexingOperationListener listener : listeners) { listener.postDelete(delete); @@ -152,6 +168,11 @@ public class ShardIndexingService extends AbstractIndexShardComponent { } } + public void failedDelete(Engine.Delete delete) { + totalStats.deleteCurrent.dec(); + typeStats(delete.type()).deleteCurrent.dec(); + } + public Engine.DeleteByQuery preDeleteByQuery(Engine.DeleteByQuery deleteByQuery) { if (listeners != null) { for (IndexingOperationListener listener : listeners) { @@ -172,7 +193,16 @@ public class ShardIndexingService extends AbstractIndexShardComponent { public void clear() { totalStats.clear(); synchronized (this) { - typesStats = ImmutableMap.of(); + if (!typesStats.isEmpty()) { + MapBuilder typesStatsBuilder = MapBuilder.newMapBuilder(); + for (Map.Entry typeStats : typesStats.entrySet()) { + if (typeStats.getValue().totalCurrent() > 0) { + typeStats.getValue().clear(); + typesStatsBuilder.put(typeStats.getKey(), typeStats.getValue()); + } + } + typesStats = typesStatsBuilder.immutableMap(); + } } } @@ -193,11 +223,17 @@ public class ShardIndexingService extends AbstractIndexShardComponent { static class StatsHolder { public final MeanMetric indexMetric = new MeanMetric(); public final MeanMetric deleteMetric = new MeanMetric(); + public final CounterMetric indexCurrent = new CounterMetric(); + public final CounterMetric deleteCurrent = new CounterMetric(); public IndexingStats.Stats stats() { return new IndexingStats.Stats( - indexMetric.count(), TimeUnit.NANOSECONDS.toMillis(indexMetric.sum()), - deleteMetric.count(), TimeUnit.NANOSECONDS.toMillis(deleteMetric.sum())); + indexMetric.count(), TimeUnit.NANOSECONDS.toMillis(indexMetric.sum()), indexCurrent.count(), + deleteMetric.count(), TimeUnit.NANOSECONDS.toMillis(deleteMetric.sum()), deleteCurrent.count()); + } + + public long totalCurrent() { + return indexCurrent.count() + deleteMetric.count(); } public void clear() { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/search/stats/SearchStats.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/search/stats/SearchStats.java index 5f961e9e1b8..7e8912ed0b7 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/search/stats/SearchStats.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/search/stats/SearchStats.java @@ -40,27 +40,33 @@ public class SearchStats implements Streamable, ToXContent { private long queryCount; private long queryTimeInMillis; + private long queryCurrent; private long fetchCount; private long fetchTimeInMillis; + private long fetchCurrent; Stats() { } - public Stats(long queryCount, long queryTimeInMillis, long fetchCount, long fetchTimeInMillis) { + public Stats(long queryCount, long queryTimeInMillis, long queryCurrent, long fetchCount, long fetchTimeInMillis, long fetchCurrent) { this.queryCount = queryCount; this.queryTimeInMillis = queryTimeInMillis; + this.queryCurrent = queryCurrent; this.fetchCount = fetchCount; this.fetchTimeInMillis = fetchTimeInMillis; + this.fetchCurrent = fetchCurrent; } public void add(Stats stats) { queryCount += stats.queryCount; queryTimeInMillis += stats.queryTimeInMillis; + queryCurrent += stats.queryCurrent; fetchCount += stats.fetchCount; fetchTimeInMillis += stats.fetchTimeInMillis; + fetchCurrent += stats.fetchCurrent; } public long queryCount() { @@ -83,6 +89,14 @@ public class SearchStats implements Streamable, ToXContent { return queryTimeInMillis; } + public long queryCurrent() { + return queryCurrent; + } + + public long getQueryCurrent() { + return queryCurrent; + } + public long fetchCount() { return fetchCount; } @@ -103,6 +117,15 @@ public class SearchStats implements Streamable, ToXContent { return fetchTimeInMillis; } + public long fetchCurrent() { + return fetchCurrent; + } + + public long getFetchCurrent() { + return fetchCurrent; + } + + public static Stats readStats(StreamInput in) throws IOException { Stats stats = new Stats(); stats.readFrom(in); @@ -112,27 +135,33 @@ public class SearchStats implements Streamable, ToXContent { @Override public void readFrom(StreamInput in) throws IOException { queryCount = in.readVLong(); queryTimeInMillis = in.readVLong(); + queryCurrent = in.readVLong(); fetchCount = in.readVLong(); fetchTimeInMillis = in.readVLong(); + fetchCurrent = in.readVLong(); } @Override public void writeTo(StreamOutput out) throws IOException { out.writeVLong(queryCount); out.writeVLong(queryTimeInMillis); + out.writeVLong(queryCurrent); out.writeVLong(fetchCount); out.writeVLong(fetchTimeInMillis); + out.writeVLong(fetchCurrent); } @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.field(Fields.QUERY_TOTAL, queryCount); builder.field(Fields.QUERY_TIME, queryTime().toString()); builder.field(Fields.QUERY_TIME_IN_MILLIS, queryTimeInMillis); + builder.field(Fields.QUERY_CURRENT, queryCurrent); builder.field(Fields.FETCH_TOTAL, fetchCount); builder.field(Fields.FETCH_TIME, fetchTime().toString()); builder.field(Fields.FETCH_TIME_IN_MILLIS, fetchTimeInMillis); + builder.field(Fields.FETCH_CURRENT, fetchCurrent); return builder; } @@ -205,9 +234,11 @@ public class SearchStats implements Streamable, ToXContent { static final XContentBuilderString QUERY_TOTAL = new XContentBuilderString("query_total"); static final XContentBuilderString QUERY_TIME = new XContentBuilderString("query_time"); static final XContentBuilderString QUERY_TIME_IN_MILLIS = new XContentBuilderString("query_time_in_millis"); + static final XContentBuilderString QUERY_CURRENT = new XContentBuilderString("query_current"); static final XContentBuilderString FETCH_TOTAL = new XContentBuilderString("fetch_total"); static final XContentBuilderString FETCH_TIME = new XContentBuilderString("fetch_time"); static final XContentBuilderString FETCH_TIME_IN_MILLIS = new XContentBuilderString("fetch_time_in_millis"); + static final XContentBuilderString FETCH_CURRENT = new XContentBuilderString("fetch_current"); } public static SearchStats readSearchStats(StreamInput in) throws IOException { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/search/stats/ShardSearchService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/search/stats/ShardSearchService.java index 408aa2cb60a..0a1c9ec8071 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/search/stats/ShardSearchService.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/search/stats/ShardSearchService.java @@ -22,6 +22,7 @@ package org.elasticsearch.index.search.stats; import org.elasticsearch.common.collect.ImmutableMap; import org.elasticsearch.common.collect.MapBuilder; import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.metrics.CounterMetric; import org.elasticsearch.common.metrics.MeanMetric; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.settings.IndexSettings; @@ -72,20 +73,63 @@ public class ShardSearchService extends AbstractIndexShardComponent { return new SearchStats(total, groupsSt); } - public void onQueryPhase(SearchContext searchContext, long tookInNanos) { - totalStats.queryMetric.inc(tookInNanos); + public void onPreQueryPhase(SearchContext searchContext) { + totalStats.queryCurrent.inc(); if (searchContext.groupStats() != null) { for (int i = 0; i < searchContext.groupStats().size(); i++) { - groupStats(searchContext.groupStats().get(i)).queryMetric.inc(tookInNanos); + groupStats(searchContext.groupStats().get(i)).queryCurrent.inc(); } } } - public void onFetchPhase(SearchContext searchContext, long tookInNanos) { - totalStats.fetchMetric.inc(tookInNanos); + public void onFailedQueryPhase(SearchContext searchContext) { + totalStats.queryCurrent.dec(); if (searchContext.groupStats() != null) { for (int i = 0; i < searchContext.groupStats().size(); i++) { - groupStats(searchContext.groupStats().get(i)).fetchMetric.inc(tookInNanos); + groupStats(searchContext.groupStats().get(i)).queryCurrent.dec(); + } + } + } + + public void onQueryPhase(SearchContext searchContext, long tookInNanos) { + totalStats.queryMetric.inc(tookInNanos); + totalStats.queryCurrent.dec(); + if (searchContext.groupStats() != null) { + for (int i = 0; i < searchContext.groupStats().size(); i++) { + StatsHolder statsHolder = groupStats(searchContext.groupStats().get(i)); + statsHolder.queryMetric.inc(tookInNanos); + statsHolder.queryCurrent.dec(); + } + } + } + + public void onPreFetchPhase(SearchContext searchContext) { + totalStats.fetchCurrent.inc(); + if (searchContext.groupStats() != null) { + for (int i = 0; i < searchContext.groupStats().size(); i++) { + groupStats(searchContext.groupStats().get(i)).fetchCurrent.inc(); + } + } + } + + public void onFailedFetchPhase(SearchContext searchContext) { + totalStats.fetchCurrent.dec(); + if (searchContext.groupStats() != null) { + for (int i = 0; i < searchContext.groupStats().size(); i++) { + groupStats(searchContext.groupStats().get(i)).fetchCurrent.dec(); + } + } + } + + + public void onFetchPhase(SearchContext searchContext, long tookInNanos) { + totalStats.fetchMetric.inc(tookInNanos); + totalStats.fetchCurrent.dec(); + if (searchContext.groupStats() != null) { + for (int i = 0; i < searchContext.groupStats().size(); i++) { + StatsHolder statsHolder = groupStats(searchContext.groupStats().get(i)); + statsHolder.fetchMetric.inc(tookInNanos); + statsHolder.fetchCurrent.dec(); } } } @@ -93,7 +137,16 @@ public class ShardSearchService extends AbstractIndexShardComponent { public void clear() { totalStats.clear(); synchronized (this) { - groupsStats = ImmutableMap.of(); + if (!groupsStats.isEmpty()) { + MapBuilder typesStatsBuilder = MapBuilder.newMapBuilder(); + for (Map.Entry typeStats : groupsStats.entrySet()) { + if (typeStats.getValue().totalCurrent() > 0) { + typeStats.getValue().clear(); + typesStatsBuilder.put(typeStats.getKey(), typeStats.getValue()); + } + } + groupsStats = typesStatsBuilder.immutableMap(); + } } } @@ -114,11 +167,17 @@ public class ShardSearchService extends AbstractIndexShardComponent { static class StatsHolder { public final MeanMetric queryMetric = new MeanMetric(); public final MeanMetric fetchMetric = new MeanMetric(); + public final CounterMetric queryCurrent = new CounterMetric(); + public final CounterMetric fetchCurrent = new CounterMetric(); public SearchStats.Stats stats() { return new SearchStats.Stats( - queryMetric.count(), TimeUnit.NANOSECONDS.toMillis(queryMetric.sum()), - fetchMetric.count(), TimeUnit.NANOSECONDS.toMillis(fetchMetric.sum())); + queryMetric.count(), TimeUnit.NANOSECONDS.toMillis(queryMetric.sum()), queryCurrent.count(), + fetchMetric.count(), TimeUnit.NANOSECONDS.toMillis(fetchMetric.sum()), fetchCurrent.count()); + } + + public long totalCurrent() { + return queryCurrent.count() + fetchCurrent.count(); } public void clear() { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/service/InternalIndexShard.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/service/InternalIndexShard.java index fd43e96e1dc..4bb205a08a7 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/service/InternalIndexShard.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/service/InternalIndexShard.java @@ -306,11 +306,16 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I @Override public ParsedDocument index(Engine.Index index) throws ElasticSearchException { writeAllowed(); index = indexingService.preIndex(index); - if (logger.isTraceEnabled()) { - logger.trace("index {}", index.docs()); + try { + if (logger.isTraceEnabled()) { + logger.trace("index {}", index.docs()); + } + engine.index(index); + index.endTime(System.nanoTime()); + } catch (RuntimeException ex) { + indexingService.failedIndex(index); + throw ex; } - engine.index(index); - index.endTime(System.nanoTime()); indexingService.postIndex(index); return index.parsedDoc(); } @@ -324,11 +329,16 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I @Override public void delete(Engine.Delete delete) throws ElasticSearchException { writeAllowed(); delete = indexingService.preDelete(delete); - if (logger.isTraceEnabled()) { - logger.trace("delete [{}]", delete.uid().text()); + try { + if (logger.isTraceEnabled()) { + logger.trace("delete [{}]", delete.uid().text()); + } + engine.delete(delete); + delete.endTime(System.nanoTime()); + } catch (RuntimeException ex) { + indexingService.failedDelete(delete); + throw ex; } - engine.delete(delete); - delete.endTime(System.nanoTime()); indexingService.postDelete(delete); } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/search/SearchService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/search/SearchService.java index b2db301f826..9500203a707 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/search/SearchService.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/search/SearchService.java @@ -232,6 +232,7 @@ public class SearchService extends AbstractLifecycleComponent { SearchContext context = createContext(request); activeContexts.put(context.id(), context); try { + context.indexShard().searchService().onPreQueryPhase(context); long time = System.nanoTime(); contextProcessing(context); queryPhase.execute(context); @@ -243,6 +244,7 @@ public class SearchService extends AbstractLifecycleComponent { context.indexShard().searchService().onQueryPhase(context, System.nanoTime() - time); return context.queryResult(); } catch (RuntimeException e) { + context.indexShard().searchService().onFailedQueryPhase(context); logger.trace("Query phase failed", e); freeContext(context); throw e; @@ -254,6 +256,7 @@ public class SearchService extends AbstractLifecycleComponent { public ScrollQuerySearchResult executeQueryPhase(InternalScrollSearchRequest request) throws ElasticSearchException { SearchContext context = findContext(request.id()); try { + context.indexShard().searchService().onPreQueryPhase(context); long time = System.nanoTime(); contextProcessing(context); processScroll(request, context); @@ -262,6 +265,7 @@ public class SearchService extends AbstractLifecycleComponent { context.indexShard().searchService().onQueryPhase(context, System.nanoTime() - time); return new ScrollQuerySearchResult(context.queryResult(), context.shardTarget()); } catch (RuntimeException e) { + context.indexShard().searchService().onFailedQueryPhase(context); logger.trace("Query phase failed", e); freeContext(context); throw e; @@ -281,12 +285,14 @@ public class SearchService extends AbstractLifecycleComponent { throw new QueryPhaseExecutionException(context, "Failed to set aggregated df", e); } try { + context.indexShard().searchService().onPreQueryPhase(context); long time = System.nanoTime(); queryPhase.execute(context); contextProcessedSuccessfully(context); context.indexShard().searchService().onQueryPhase(context, System.nanoTime() - time); return context.queryResult(); } catch (RuntimeException e) { + context.indexShard().searchService().onFailedQueryPhase(context); logger.trace("Query phase failed", e); freeContext(context); throw e; @@ -300,16 +306,28 @@ public class SearchService extends AbstractLifecycleComponent { activeContexts.put(context.id(), context); contextProcessing(context); try { + context.indexShard().searchService().onPreQueryPhase(context); long time = System.nanoTime(); - queryPhase.execute(context); + try { + queryPhase.execute(context); + } catch (RuntimeException e) { + context.indexShard().searchService().onFailedQueryPhase(context); + throw e; + } long time2 = System.nanoTime(); context.indexShard().searchService().onQueryPhase(context, time2 - time); - shortcutDocIdsToLoad(context); - fetchPhase.execute(context); - if (context.scroll() == null) { - freeContext(context.id()); - } else { - contextProcessedSuccessfully(context); + context.indexShard().searchService().onPreFetchPhase(context); + try { + shortcutDocIdsToLoad(context); + fetchPhase.execute(context); + if (context.scroll() == null) { + freeContext(context.id()); + } else { + contextProcessedSuccessfully(context); + } + } catch (RuntimeException e) { + context.indexShard().searchService().onFailedFetchPhase(context); + throw e; } context.indexShard().searchService().onFetchPhase(context, System.nanoTime() - time2); return new QueryFetchSearchResult(context.queryResult(), context.fetchResult()); @@ -333,16 +351,28 @@ public class SearchService extends AbstractLifecycleComponent { throw new QueryPhaseExecutionException(context, "Failed to set aggregated df", e); } try { + context.indexShard().searchService().onPreQueryPhase(context); long time = System.nanoTime(); - queryPhase.execute(context); + try { + queryPhase.execute(context); + } catch (RuntimeException e) { + context.indexShard().searchService().onFailedQueryPhase(context); + throw e; + } long time2 = System.nanoTime(); context.indexShard().searchService().onQueryPhase(context, time2 - time); - shortcutDocIdsToLoad(context); - fetchPhase.execute(context); - if (context.scroll() == null) { - freeContext(request.id()); - } else { - contextProcessedSuccessfully(context); + context.indexShard().searchService().onPreFetchPhase(context); + try { + shortcutDocIdsToLoad(context); + fetchPhase.execute(context); + if (context.scroll() == null) { + freeContext(request.id()); + } else { + contextProcessedSuccessfully(context); + } + } catch (RuntimeException e) { + context.indexShard().searchService().onFailedFetchPhase(context); + throw e; } context.indexShard().searchService().onFetchPhase(context, System.nanoTime() - time2); return new QueryFetchSearchResult(context.queryResult(), context.fetchResult()); @@ -360,16 +390,28 @@ public class SearchService extends AbstractLifecycleComponent { contextProcessing(context); try { processScroll(request, context); + context.indexShard().searchService().onPreQueryPhase(context); long time = System.nanoTime(); - queryPhase.execute(context); + try { + queryPhase.execute(context); + } catch (RuntimeException e) { + context.indexShard().searchService().onFailedQueryPhase(context); + throw e; + } long time2 = System.nanoTime(); context.indexShard().searchService().onQueryPhase(context, time2 - time); - shortcutDocIdsToLoad(context); - fetchPhase.execute(context); - if (context.scroll() == null) { - freeContext(request.id()); - } else { - contextProcessedSuccessfully(context); + context.indexShard().searchService().onPreFetchPhase(context); + try { + shortcutDocIdsToLoad(context); + fetchPhase.execute(context); + if (context.scroll() == null) { + freeContext(request.id()); + } else { + contextProcessedSuccessfully(context); + } + } catch (RuntimeException e) { + context.indexShard().searchService().onFailedFetchPhase(context); + throw e; } context.indexShard().searchService().onFetchPhase(context, System.nanoTime() - time2); return new ScrollQueryFetchSearchResult(new QueryFetchSearchResult(context.queryResult(), context.fetchResult()), context.shardTarget()); @@ -387,6 +429,7 @@ public class SearchService extends AbstractLifecycleComponent { contextProcessing(context); try { context.docIdsToLoad(request.docIds(), 0, request.docIdsSize()); + context.indexShard().searchService().onPreFetchPhase(context); long time = System.nanoTime(); fetchPhase.execute(context); if (context.scroll() == null) { @@ -397,6 +440,7 @@ public class SearchService extends AbstractLifecycleComponent { context.indexShard().searchService().onFetchPhase(context, System.nanoTime() - time); return context.fetchResult(); } catch (RuntimeException e) { + context.indexShard().searchService().onFailedFetchPhase(context); logger.trace("Fetch phase failed", e); freeContext(context); throw e; diff --git a/modules/test/integration/src/test/java/org/elasticsearch/test/integration/indices/stats/SimpleIndexStatsTests.java b/modules/test/integration/src/test/java/org/elasticsearch/test/integration/indices/stats/SimpleIndexStatsTests.java index 2717b35900b..dc317e9b217 100644 --- a/modules/test/integration/src/test/java/org/elasticsearch/test/integration/indices/stats/SimpleIndexStatsTests.java +++ b/modules/test/integration/src/test/java/org/elasticsearch/test/integration/indices/stats/SimpleIndexStatsTests.java @@ -88,6 +88,12 @@ public class SimpleIndexStatsTests extends AbstractNodesTests { assertThat(stats.index("test2").primaries().docs().count(), equalTo(1l)); assertThat(stats.index("test2").total().docs().count(), equalTo(2l)); + // make sure that number of requests in progress is 0 + assertThat(stats.index("test1").total().indexing().total().indexCurrent(), equalTo(0l)); + assertThat(stats.index("test1").total().indexing().total().deleteCurrent(), equalTo(0l)); + assertThat(stats.index("test1").total().search().total().fetchCurrent(), equalTo(0l)); + assertThat(stats.index("test1").total().search().total().queryCurrent(), equalTo(0l)); + // check flags stats = client.admin().indices().prepareStats() .setDocs(false) @@ -110,6 +116,8 @@ public class SimpleIndexStatsTests extends AbstractNodesTests { assertThat(stats.primaries().indexing().typeStats().get("type1").indexCount(), equalTo(1l)); assertThat(stats.primaries().indexing().typeStats().get("type").indexCount(), equalTo(1l)); assertThat(stats.primaries().indexing().typeStats().get("type2"), nullValue()); + assertThat(stats.primaries().indexing().typeStats().get("type1").indexCurrent(), equalTo(0l)); + assertThat(stats.primaries().indexing().typeStats().get("type1").deleteCurrent(), equalTo(0l)); assertThat(stats.total().get().count(), equalTo(0l)); // check get