Add counts of currently executing index, delete, query and fetch operations
parent eefbe52580
commit 62d67e6c2d
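Every counter added below follows the same bookkeeping: it is incremented when an operation starts (preIndex/preDelete, onPreQueryPhase, onPreFetchPhase) and decremented when the operation completes (postIndex/postDelete, onQueryPhase, onFetchPhase) or fails (failedIndex/failedDelete, onFailedQueryPhase, onFailedFetchPhase). That is also why the shard and search services below gain try/catch blocks: a failed operation must still decrement the gauge, otherwise the *_current values would drift upward. A minimal standalone sketch of the pattern, using AtomicLong in place of Elasticsearch's CounterMetric (class and method names here are illustrative, not part of this commit):

import java.util.concurrent.atomic.AtomicLong;

// Illustrative sketch: mirrors the inc-on-start / dec-on-finish-or-failure
// bookkeeping this commit applies to index, delete, query and fetch operations.
class CurrentOperationCounter {
    private final AtomicLong current = new AtomicLong();

    void onStart() {
        current.incrementAndGet();   // preIndex / preDelete / onPreQueryPhase / onPreFetchPhase
    }

    void onSuccess() {
        current.decrementAndGet();   // postIndex / postDelete / onQueryPhase / onFetchPhase
    }

    void onFailure() {
        current.decrementAndGet();   // failedIndex / failedDelete / onFailedQueryPhase / onFailedFetchPhase
    }

    long count() {
        return current.get();        // surfaced as the new *_current stats fields
    }
}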
@@ -40,27 +40,33 @@ public class IndexingStats implements Streamable, ToXContent {

        private long indexCount;
        private long indexTimeInMillis;
+        private long indexCurrent;

        private long deleteCount;
        private long deleteTimeInMillis;
+        private long deleteCurrent;

        Stats() {

        }

-        public Stats(long indexCount, long indexTimeInMillis, long deleteCount, long deleteTimeInMillis) {
+        public Stats(long indexCount, long indexTimeInMillis, long indexCurrent, long deleteCount, long deleteTimeInMillis, long deleteCurrent) {
            this.indexCount = indexCount;
            this.indexTimeInMillis = indexTimeInMillis;
+            this.indexCurrent = indexCurrent;
            this.deleteCount = deleteCount;
            this.deleteTimeInMillis = deleteTimeInMillis;
+            this.deleteCurrent = deleteCurrent;
        }

        public void add(Stats stats) {
            indexCount += stats.indexCount;
            indexTimeInMillis += stats.indexTimeInMillis;
+            indexCurrent += stats.indexCurrent;

            deleteCount += stats.deleteCount;
            deleteTimeInMillis += stats.deleteTimeInMillis;
+            deleteCurrent += stats.deleteCurrent;
        }

        public long indexCount() {

@@ -83,6 +89,14 @@ public class IndexingStats implements Streamable, ToXContent {
            return indexTimeInMillis;
        }

+        public long indexCurrent() {
+            return indexCurrent;
+        }
+
+        public long getIndexCurrent() {
+            return indexCurrent;
+        }
+
        public long deleteCount() {
            return deleteCount;
        }

@@ -103,6 +117,15 @@ public class IndexingStats implements Streamable, ToXContent {
            return deleteTimeInMillis;
        }

+
+        public long deleteCurrent() {
+            return deleteCurrent;
+        }
+
+        public long getDeleteCurrent() {
+            return deleteCurrent;
+        }
+
        public static Stats readStats(StreamInput in) throws IOException {
            Stats stats = new Stats();
            stats.readFrom(in);

@@ -112,27 +135,33 @@ public class IndexingStats implements Streamable, ToXContent {
        @Override public void readFrom(StreamInput in) throws IOException {
            indexCount = in.readVLong();
            indexTimeInMillis = in.readVLong();
+            indexCurrent = in.readVLong();

            deleteCount = in.readVLong();
            deleteTimeInMillis = in.readVLong();
+            deleteCurrent = in.readVLong();
        }

        @Override public void writeTo(StreamOutput out) throws IOException {
            out.writeVLong(indexCount);
            out.writeVLong(indexTimeInMillis);
+            out.writeVLong(indexCurrent);

            out.writeVLong(deleteCount);
            out.writeVLong(deleteTimeInMillis);
+            out.writeVLong(deleteCurrent);
        }

        @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
            builder.field(Fields.INDEX_TOTAL, indexCount);
            builder.field(Fields.INDEX_TIME, indexTime().toString());
            builder.field(Fields.INDEX_TIME_IN_MILLIS, indexTimeInMillis);
+            builder.field(Fields.INDEX_CURRENT, indexCurrent);

            builder.field(Fields.DELETE_TOTAL, deleteCount);
            builder.field(Fields.DELETE_TIME, deleteTime().toString());
            builder.field(Fields.DELETE_TIME_IN_MILLIS, deleteTimeInMillis);
+            builder.field(Fields.DELETE_CURRENT, deleteCurrent);

            return builder;
        }

@@ -205,9 +234,11 @@ public class IndexingStats implements Streamable, ToXContent {
        static final XContentBuilderString INDEX_TOTAL = new XContentBuilderString("index_total");
        static final XContentBuilderString INDEX_TIME = new XContentBuilderString("index_time");
        static final XContentBuilderString INDEX_TIME_IN_MILLIS = new XContentBuilderString("index_time_in_millis");
+        static final XContentBuilderString INDEX_CURRENT = new XContentBuilderString("index_current");
        static final XContentBuilderString DELETE_TOTAL = new XContentBuilderString("delete_total");
        static final XContentBuilderString DELETE_TIME = new XContentBuilderString("delete_time");
        static final XContentBuilderString DELETE_TIME_IN_MILLIS = new XContentBuilderString("delete_time_in_millis");
+        static final XContentBuilderString DELETE_CURRENT = new XContentBuilderString("delete_current");
    }

    public static IndexingStats readIndexingStats(StreamInput in) throws IOException {

@@ -22,6 +22,7 @@ package org.elasticsearch.index.indexing;
 import org.elasticsearch.common.collect.ImmutableMap;
 import org.elasticsearch.common.collect.MapBuilder;
 import org.elasticsearch.common.inject.Inject;
+import org.elasticsearch.common.metrics.CounterMetric;
 import org.elasticsearch.common.metrics.MeanMetric;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.index.engine.Engine;

@@ -113,6 +114,8 @@ public class ShardIndexingService extends AbstractIndexShardComponent {
    }

    public Engine.Index preIndex(Engine.Index index) {
+        totalStats.indexCurrent.inc();
+        typeStats(index.type()).indexCurrent.inc();
        if (listeners != null) {
            for (IndexingOperationListener listener : listeners) {
                index = listener.preIndex(index);

@@ -124,7 +127,10 @@ public class ShardIndexingService extends AbstractIndexShardComponent {
    public void postIndex(Engine.Index index) {
        long took = index.endTime() - index.startTime();
        totalStats.indexMetric.inc(took);
-        typeStats(index.type()).indexMetric.inc(took);
+        totalStats.indexCurrent.dec();
+        StatsHolder typeStats = typeStats(index.type());
+        typeStats.indexMetric.inc(took);
+        typeStats.indexCurrent.dec();
        if (listeners != null) {
            for (IndexingOperationListener listener : listeners) {
                listener.postIndex(index);

@@ -132,7 +138,14 @@ public class ShardIndexingService extends AbstractIndexShardComponent {
        }
    }

+    public void failedIndex(Engine.Index index) {
+        totalStats.indexCurrent.dec();
+        typeStats(index.type()).indexCurrent.dec();
+    }
+
    public Engine.Delete preDelete(Engine.Delete delete) {
+        totalStats.deleteCurrent.inc();
+        typeStats(delete.type()).deleteCurrent.inc();
        if (listeners != null) {
            for (IndexingOperationListener listener : listeners) {
                delete = listener.preDelete(delete);

@@ -144,7 +157,10 @@ public class ShardIndexingService extends AbstractIndexShardComponent {
    public void postDelete(Engine.Delete delete) {
        long took = delete.endTime() - delete.startTime();
        totalStats.deleteMetric.inc(took);
-        typeStats(delete.type()).deleteMetric.inc(took);
+        totalStats.deleteCurrent.dec();
+        StatsHolder typeStats = typeStats(delete.type());
+        typeStats.deleteMetric.inc(took);
+        typeStats.deleteCurrent.dec();
        if (listeners != null) {
            for (IndexingOperationListener listener : listeners) {
                listener.postDelete(delete);

@@ -152,6 +168,11 @@ public class ShardIndexingService extends AbstractIndexShardComponent {
        }
    }

+    public void failedDelete(Engine.Delete delete) {
+        totalStats.deleteCurrent.dec();
+        typeStats(delete.type()).deleteCurrent.dec();
+    }
+
    public Engine.DeleteByQuery preDeleteByQuery(Engine.DeleteByQuery deleteByQuery) {
        if (listeners != null) {
            for (IndexingOperationListener listener : listeners) {

@@ -172,7 +193,16 @@ public class ShardIndexingService extends AbstractIndexShardComponent {
    public void clear() {
        totalStats.clear();
        synchronized (this) {
-            typesStats = ImmutableMap.of();
+            if (!typesStats.isEmpty()) {
+                MapBuilder<String, StatsHolder> typesStatsBuilder = MapBuilder.newMapBuilder();
+                for (Map.Entry<String, StatsHolder> typeStats : typesStats.entrySet()) {
+                    if (typeStats.getValue().totalCurrent() > 0) {
+                        typeStats.getValue().clear();
+                        typesStatsBuilder.put(typeStats.getKey(), typeStats.getValue());
+                    }
+                }
+                typesStats = typesStatsBuilder.immutableMap();
+            }
        }
    }

@@ -193,11 +223,17 @@ public class ShardIndexingService extends AbstractIndexShardComponent {
    static class StatsHolder {
        public final MeanMetric indexMetric = new MeanMetric();
        public final MeanMetric deleteMetric = new MeanMetric();
+        public final CounterMetric indexCurrent = new CounterMetric();
+        public final CounterMetric deleteCurrent = new CounterMetric();

        public IndexingStats.Stats stats() {
            return new IndexingStats.Stats(
-                    indexMetric.count(), TimeUnit.NANOSECONDS.toMillis(indexMetric.sum()),
-                    deleteMetric.count(), TimeUnit.NANOSECONDS.toMillis(deleteMetric.sum()));
+                    indexMetric.count(), TimeUnit.NANOSECONDS.toMillis(indexMetric.sum()), indexCurrent.count(),
+                    deleteMetric.count(), TimeUnit.NANOSECONDS.toMillis(deleteMetric.sum()), deleteCurrent.count());
        }

+        public long totalCurrent() {
+            return indexCurrent.count() + deleteMetric.count();
+        }
+
        public void clear() {

@@ -40,27 +40,33 @@ public class SearchStats implements Streamable, ToXContent {

        private long queryCount;
        private long queryTimeInMillis;
+        private long queryCurrent;

        private long fetchCount;
        private long fetchTimeInMillis;
+        private long fetchCurrent;

        Stats() {

        }

-        public Stats(long queryCount, long queryTimeInMillis, long fetchCount, long fetchTimeInMillis) {
+        public Stats(long queryCount, long queryTimeInMillis, long queryCurrent, long fetchCount, long fetchTimeInMillis, long fetchCurrent) {
            this.queryCount = queryCount;
            this.queryTimeInMillis = queryTimeInMillis;
+            this.queryCurrent = queryCurrent;
            this.fetchCount = fetchCount;
            this.fetchTimeInMillis = fetchTimeInMillis;
+            this.fetchCurrent = fetchCurrent;
        }

        public void add(Stats stats) {
            queryCount += stats.queryCount;
            queryTimeInMillis += stats.queryTimeInMillis;
+            queryCurrent += stats.queryCurrent;

            fetchCount += stats.fetchCount;
            fetchTimeInMillis += stats.fetchTimeInMillis;
+            fetchCurrent += stats.fetchCurrent;
        }

        public long queryCount() {

@@ -83,6 +89,14 @@ public class SearchStats implements Streamable, ToXContent {
            return queryTimeInMillis;
        }

+        public long queryCurrent() {
+            return queryCurrent;
+        }
+
+        public long getQueryCurrent() {
+            return queryCurrent;
+        }
+
        public long fetchCount() {
            return fetchCount;
        }

@@ -103,6 +117,15 @@ public class SearchStats implements Streamable, ToXContent {
            return fetchTimeInMillis;
        }

+        public long fetchCurrent() {
+            return fetchCurrent;
+        }
+
+        public long getFetchCurrent() {
+            return fetchCurrent;
+        }
+
+
        public static Stats readStats(StreamInput in) throws IOException {
            Stats stats = new Stats();
            stats.readFrom(in);

@@ -112,27 +135,33 @@ public class SearchStats implements Streamable, ToXContent {
        @Override public void readFrom(StreamInput in) throws IOException {
            queryCount = in.readVLong();
            queryTimeInMillis = in.readVLong();
+            queryCurrent = in.readVLong();

            fetchCount = in.readVLong();
            fetchTimeInMillis = in.readVLong();
+            fetchCurrent = in.readVLong();
        }

        @Override public void writeTo(StreamOutput out) throws IOException {
            out.writeVLong(queryCount);
            out.writeVLong(queryTimeInMillis);
+            out.writeVLong(queryCurrent);

            out.writeVLong(fetchCount);
            out.writeVLong(fetchTimeInMillis);
+            out.writeVLong(fetchCurrent);
        }

        @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
            builder.field(Fields.QUERY_TOTAL, queryCount);
            builder.field(Fields.QUERY_TIME, queryTime().toString());
            builder.field(Fields.QUERY_TIME_IN_MILLIS, queryTimeInMillis);
+            builder.field(Fields.QUERY_CURRENT, queryCurrent);

            builder.field(Fields.FETCH_TOTAL, fetchCount);
            builder.field(Fields.FETCH_TIME, fetchTime().toString());
            builder.field(Fields.FETCH_TIME_IN_MILLIS, fetchTimeInMillis);
+            builder.field(Fields.FETCH_CURRENT, fetchCurrent);

            return builder;
        }

@@ -205,9 +234,11 @@ public class SearchStats implements Streamable, ToXContent {
        static final XContentBuilderString QUERY_TOTAL = new XContentBuilderString("query_total");
        static final XContentBuilderString QUERY_TIME = new XContentBuilderString("query_time");
        static final XContentBuilderString QUERY_TIME_IN_MILLIS = new XContentBuilderString("query_time_in_millis");
+        static final XContentBuilderString QUERY_CURRENT = new XContentBuilderString("query_current");
        static final XContentBuilderString FETCH_TOTAL = new XContentBuilderString("fetch_total");
        static final XContentBuilderString FETCH_TIME = new XContentBuilderString("fetch_time");
        static final XContentBuilderString FETCH_TIME_IN_MILLIS = new XContentBuilderString("fetch_time_in_millis");
+        static final XContentBuilderString FETCH_CURRENT = new XContentBuilderString("fetch_current");
    }

    public static SearchStats readSearchStats(StreamInput in) throws IOException {

@@ -22,6 +22,7 @@ package org.elasticsearch.index.search.stats;
 import org.elasticsearch.common.collect.ImmutableMap;
 import org.elasticsearch.common.collect.MapBuilder;
 import org.elasticsearch.common.inject.Inject;
+import org.elasticsearch.common.metrics.CounterMetric;
 import org.elasticsearch.common.metrics.MeanMetric;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.index.settings.IndexSettings;

@@ -72,20 +73,63 @@ public class ShardSearchService extends AbstractIndexShardComponent {
        return new SearchStats(total, groupsSt);
    }

-    public void onQueryPhase(SearchContext searchContext, long tookInNanos) {
-        totalStats.queryMetric.inc(tookInNanos);
+    public void onPreQueryPhase(SearchContext searchContext) {
+        totalStats.queryCurrent.inc();
        if (searchContext.groupStats() != null) {
            for (int i = 0; i < searchContext.groupStats().size(); i++) {
-                groupStats(searchContext.groupStats().get(i)).queryMetric.inc(tookInNanos);
+                groupStats(searchContext.groupStats().get(i)).queryCurrent.inc();
            }
        }
    }

-    public void onFetchPhase(SearchContext searchContext, long tookInNanos) {
-        totalStats.fetchMetric.inc(tookInNanos);
+    public void onFailedQueryPhase(SearchContext searchContext) {
+        totalStats.queryCurrent.dec();
        if (searchContext.groupStats() != null) {
            for (int i = 0; i < searchContext.groupStats().size(); i++) {
-                groupStats(searchContext.groupStats().get(i)).fetchMetric.inc(tookInNanos);
+                groupStats(searchContext.groupStats().get(i)).queryCurrent.dec();
            }
        }
    }
+
+    public void onQueryPhase(SearchContext searchContext, long tookInNanos) {
+        totalStats.queryMetric.inc(tookInNanos);
+        totalStats.queryCurrent.dec();
+        if (searchContext.groupStats() != null) {
+            for (int i = 0; i < searchContext.groupStats().size(); i++) {
+                StatsHolder statsHolder = groupStats(searchContext.groupStats().get(i));
+                statsHolder.queryMetric.inc(tookInNanos);
+                statsHolder.queryCurrent.dec();
+            }
+        }
+    }
+
+    public void onPreFetchPhase(SearchContext searchContext) {
+        totalStats.fetchCurrent.inc();
+        if (searchContext.groupStats() != null) {
+            for (int i = 0; i < searchContext.groupStats().size(); i++) {
+                groupStats(searchContext.groupStats().get(i)).fetchCurrent.inc();
+            }
+        }
+    }
+
+    public void onFailedFetchPhase(SearchContext searchContext) {
+        totalStats.fetchCurrent.dec();
+        if (searchContext.groupStats() != null) {
+            for (int i = 0; i < searchContext.groupStats().size(); i++) {
+                groupStats(searchContext.groupStats().get(i)).fetchCurrent.dec();
+            }
+        }
+    }
+
+
+    public void onFetchPhase(SearchContext searchContext, long tookInNanos) {
+        totalStats.fetchMetric.inc(tookInNanos);
+        totalStats.fetchCurrent.dec();
+        if (searchContext.groupStats() != null) {
+            for (int i = 0; i < searchContext.groupStats().size(); i++) {
+                StatsHolder statsHolder = groupStats(searchContext.groupStats().get(i));
+                statsHolder.fetchMetric.inc(tookInNanos);
+                statsHolder.fetchCurrent.dec();
+            }
+        }
+    }

@@ -93,7 +137,16 @@ public class ShardSearchService extends AbstractIndexShardComponent {
    public void clear() {
        totalStats.clear();
        synchronized (this) {
-            groupsStats = ImmutableMap.of();
+            if (!groupsStats.isEmpty()) {
+                MapBuilder<String, StatsHolder> typesStatsBuilder = MapBuilder.newMapBuilder();
+                for (Map.Entry<String, StatsHolder> typeStats : groupsStats.entrySet()) {
+                    if (typeStats.getValue().totalCurrent() > 0) {
+                        typeStats.getValue().clear();
+                        typesStatsBuilder.put(typeStats.getKey(), typeStats.getValue());
+                    }
+                }
+                groupsStats = typesStatsBuilder.immutableMap();
+            }
        }
    }

@@ -114,11 +167,17 @@ public class ShardSearchService extends AbstractIndexShardComponent {
    static class StatsHolder {
        public final MeanMetric queryMetric = new MeanMetric();
        public final MeanMetric fetchMetric = new MeanMetric();
+        public final CounterMetric queryCurrent = new CounterMetric();
+        public final CounterMetric fetchCurrent = new CounterMetric();

        public SearchStats.Stats stats() {
            return new SearchStats.Stats(
-                    queryMetric.count(), TimeUnit.NANOSECONDS.toMillis(queryMetric.sum()),
-                    fetchMetric.count(), TimeUnit.NANOSECONDS.toMillis(fetchMetric.sum()));
+                    queryMetric.count(), TimeUnit.NANOSECONDS.toMillis(queryMetric.sum()), queryCurrent.count(),
+                    fetchMetric.count(), TimeUnit.NANOSECONDS.toMillis(fetchMetric.sum()), fetchCurrent.count());
        }

+        public long totalCurrent() {
+            return queryCurrent.count() + fetchCurrent.count();
+        }
+
        public void clear() {

@@ -306,11 +306,16 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
    @Override public ParsedDocument index(Engine.Index index) throws ElasticSearchException {
        writeAllowed();
        index = indexingService.preIndex(index);
-        if (logger.isTraceEnabled()) {
-            logger.trace("index {}", index.docs());
+        try {
+            if (logger.isTraceEnabled()) {
+                logger.trace("index {}", index.docs());
+            }
+            engine.index(index);
+            index.endTime(System.nanoTime());
+        } catch (RuntimeException ex) {
+            indexingService.failedIndex(index);
+            throw ex;
        }
-        engine.index(index);
-        index.endTime(System.nanoTime());
        indexingService.postIndex(index);
        return index.parsedDoc();
    }

@@ -324,11 +329,16 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
    @Override public void delete(Engine.Delete delete) throws ElasticSearchException {
        writeAllowed();
        delete = indexingService.preDelete(delete);
-        if (logger.isTraceEnabled()) {
-            logger.trace("delete [{}]", delete.uid().text());
+        try {
+            if (logger.isTraceEnabled()) {
+                logger.trace("delete [{}]", delete.uid().text());
+            }
+            engine.delete(delete);
+            delete.endTime(System.nanoTime());
+        } catch (RuntimeException ex) {
+            indexingService.failedDelete(delete);
+            throw ex;
        }
-        engine.delete(delete);
-        delete.endTime(System.nanoTime());
        indexingService.postDelete(delete);
    }

@@ -232,6 +232,7 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {
        SearchContext context = createContext(request);
        activeContexts.put(context.id(), context);
        try {
+            context.indexShard().searchService().onPreQueryPhase(context);
            long time = System.nanoTime();
            contextProcessing(context);
            queryPhase.execute(context);

@@ -243,6 +244,7 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {
            context.indexShard().searchService().onQueryPhase(context, System.nanoTime() - time);
            return context.queryResult();
        } catch (RuntimeException e) {
+            context.indexShard().searchService().onFailedQueryPhase(context);
            logger.trace("Query phase failed", e);
            freeContext(context);
            throw e;

@@ -254,6 +256,7 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {
    public ScrollQuerySearchResult executeQueryPhase(InternalScrollSearchRequest request) throws ElasticSearchException {
        SearchContext context = findContext(request.id());
        try {
+            context.indexShard().searchService().onPreQueryPhase(context);
            long time = System.nanoTime();
            contextProcessing(context);
            processScroll(request, context);

@@ -262,6 +265,7 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {
            context.indexShard().searchService().onQueryPhase(context, System.nanoTime() - time);
            return new ScrollQuerySearchResult(context.queryResult(), context.shardTarget());
        } catch (RuntimeException e) {
+            context.indexShard().searchService().onFailedQueryPhase(context);
            logger.trace("Query phase failed", e);
            freeContext(context);
            throw e;

@@ -281,12 +285,14 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {
            throw new QueryPhaseExecutionException(context, "Failed to set aggregated df", e);
        }
        try {
+            context.indexShard().searchService().onPreQueryPhase(context);
            long time = System.nanoTime();
            queryPhase.execute(context);
            contextProcessedSuccessfully(context);
            context.indexShard().searchService().onQueryPhase(context, System.nanoTime() - time);
            return context.queryResult();
        } catch (RuntimeException e) {
+            context.indexShard().searchService().onFailedQueryPhase(context);
            logger.trace("Query phase failed", e);
            freeContext(context);
            throw e;

@@ -300,16 +306,28 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {
        activeContexts.put(context.id(), context);
        contextProcessing(context);
        try {
+            context.indexShard().searchService().onPreQueryPhase(context);
            long time = System.nanoTime();
-            queryPhase.execute(context);
+            try {
+                queryPhase.execute(context);
+            } catch (RuntimeException e) {
+                context.indexShard().searchService().onFailedQueryPhase(context);
+                throw e;
+            }
            long time2 = System.nanoTime();
            context.indexShard().searchService().onQueryPhase(context, time2 - time);
-            shortcutDocIdsToLoad(context);
-            fetchPhase.execute(context);
-            if (context.scroll() == null) {
-                freeContext(context.id());
-            } else {
-                contextProcessedSuccessfully(context);
+            context.indexShard().searchService().onPreFetchPhase(context);
+            try {
+                shortcutDocIdsToLoad(context);
+                fetchPhase.execute(context);
+                if (context.scroll() == null) {
+                    freeContext(context.id());
+                } else {
+                    contextProcessedSuccessfully(context);
+                }
+            } catch (RuntimeException e) {
+                context.indexShard().searchService().onFailedFetchPhase(context);
+                throw e;
            }
            context.indexShard().searchService().onFetchPhase(context, System.nanoTime() - time2);
            return new QueryFetchSearchResult(context.queryResult(), context.fetchResult());

@@ -333,16 +351,28 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {
            throw new QueryPhaseExecutionException(context, "Failed to set aggregated df", e);
        }
        try {
+            context.indexShard().searchService().onPreQueryPhase(context);
            long time = System.nanoTime();
-            queryPhase.execute(context);
+            try {
+                queryPhase.execute(context);
+            } catch (RuntimeException e) {
+                context.indexShard().searchService().onFailedQueryPhase(context);
+                throw e;
+            }
            long time2 = System.nanoTime();
            context.indexShard().searchService().onQueryPhase(context, time2 - time);
-            shortcutDocIdsToLoad(context);
-            fetchPhase.execute(context);
-            if (context.scroll() == null) {
-                freeContext(request.id());
-            } else {
-                contextProcessedSuccessfully(context);
+            context.indexShard().searchService().onPreFetchPhase(context);
+            try {
+                shortcutDocIdsToLoad(context);
+                fetchPhase.execute(context);
+                if (context.scroll() == null) {
+                    freeContext(request.id());
+                } else {
+                    contextProcessedSuccessfully(context);
+                }
+            } catch (RuntimeException e) {
+                context.indexShard().searchService().onFailedFetchPhase(context);
+                throw e;
            }
            context.indexShard().searchService().onFetchPhase(context, System.nanoTime() - time2);
            return new QueryFetchSearchResult(context.queryResult(), context.fetchResult());

@@ -360,16 +390,28 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {
        contextProcessing(context);
        try {
            processScroll(request, context);
+            context.indexShard().searchService().onPreQueryPhase(context);
            long time = System.nanoTime();
-            queryPhase.execute(context);
+            try {
+                queryPhase.execute(context);
+            } catch (RuntimeException e) {
+                context.indexShard().searchService().onFailedQueryPhase(context);
+                throw e;
+            }
            long time2 = System.nanoTime();
            context.indexShard().searchService().onQueryPhase(context, time2 - time);
-            shortcutDocIdsToLoad(context);
-            fetchPhase.execute(context);
-            if (context.scroll() == null) {
-                freeContext(request.id());
-            } else {
-                contextProcessedSuccessfully(context);
+            context.indexShard().searchService().onPreFetchPhase(context);
+            try {
+                shortcutDocIdsToLoad(context);
+                fetchPhase.execute(context);
+                if (context.scroll() == null) {
+                    freeContext(request.id());
+                } else {
+                    contextProcessedSuccessfully(context);
+                }
+            } catch (RuntimeException e) {
+                context.indexShard().searchService().onFailedFetchPhase(context);
+                throw e;
            }
            context.indexShard().searchService().onFetchPhase(context, System.nanoTime() - time2);
            return new ScrollQueryFetchSearchResult(new QueryFetchSearchResult(context.queryResult(), context.fetchResult()), context.shardTarget());

@@ -387,6 +429,7 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {
        contextProcessing(context);
        try {
            context.docIdsToLoad(request.docIds(), 0, request.docIdsSize());
+            context.indexShard().searchService().onPreFetchPhase(context);
            long time = System.nanoTime();
            fetchPhase.execute(context);
            if (context.scroll() == null) {

@@ -397,6 +440,7 @@ public class SearchService extends AbstractLifecycleComponent<SearchService> {
            context.indexShard().searchService().onFetchPhase(context, System.nanoTime() - time);
            return context.fetchResult();
        } catch (RuntimeException e) {
+            context.indexShard().searchService().onFailedFetchPhase(context);
            logger.trace("Fetch phase failed", e);
            freeContext(context);
            throw e;

@@ -88,6 +88,12 @@ public class SimpleIndexStatsTests extends AbstractNodesTests {
        assertThat(stats.index("test2").primaries().docs().count(), equalTo(1l));
        assertThat(stats.index("test2").total().docs().count(), equalTo(2l));

+        // make sure that number of requests in progress is 0
+        assertThat(stats.index("test1").total().indexing().total().indexCurrent(), equalTo(0l));
+        assertThat(stats.index("test1").total().indexing().total().deleteCurrent(), equalTo(0l));
+        assertThat(stats.index("test1").total().search().total().fetchCurrent(), equalTo(0l));
+        assertThat(stats.index("test1").total().search().total().queryCurrent(), equalTo(0l));
+
        // check flags
        stats = client.admin().indices().prepareStats()
                .setDocs(false)

@@ -110,6 +116,8 @@ public class SimpleIndexStatsTests extends AbstractNodesTests {
        assertThat(stats.primaries().indexing().typeStats().get("type1").indexCount(), equalTo(1l));
        assertThat(stats.primaries().indexing().typeStats().get("type").indexCount(), equalTo(1l));
        assertThat(stats.primaries().indexing().typeStats().get("type2"), nullValue());
+        assertThat(stats.primaries().indexing().typeStats().get("type1").indexCurrent(), equalTo(0l));
+        assertThat(stats.primaries().indexing().typeStats().get("type1").deleteCurrent(), equalTo(0l));

        assertThat(stats.total().get().count(), equalTo(0l));
        // check get
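The new gauges are read back through the same stats API the test above uses; a usage sketch, assuming a running client and an index named "test1" (the IndicesStats response type and import paths are assumptions based on this era of the codebase, not shown in the diff):

import org.elasticsearch.action.admin.indices.stats.IndicesStats;
import org.elasticsearch.client.Client;

// Sketch only: the accessor chain is copied from SimpleIndexStatsTests above;
// the surrounding class, type names and index name are assumed for illustration.
class CurrentOpsProbe {
    static void printCurrent(Client client, String index) {
        IndicesStats stats = client.admin().indices().prepareStats().execute().actionGet();
        System.out.println("index_current:  " + stats.index(index).total().indexing().total().indexCurrent());
        System.out.println("delete_current: " + stats.index(index).total().indexing().total().deleteCurrent());
        System.out.println("query_current:  " + stats.index(index).total().search().total().queryCurrent());
        System.out.println("fetch_current:  " + stats.index(index).total().search().total().fetchCurrent());
    }
}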