diff --git a/src/main/java/org/elasticsearch/action/admin/indices/stats/CommonStats.java b/src/main/java/org/elasticsearch/action/admin/indices/stats/CommonStats.java index a87da496f85..cd67a02941d 100644 --- a/src/main/java/org/elasticsearch/action/admin/indices/stats/CommonStats.java +++ b/src/main/java/org/elasticsearch/action/admin/indices/stats/CommonStats.java @@ -32,6 +32,7 @@ import org.elasticsearch.index.flush.FlushStats; import org.elasticsearch.index.get.GetStats; import org.elasticsearch.index.indexing.IndexingStats; import org.elasticsearch.index.merge.MergeStats; +import org.elasticsearch.index.percolator.stats.PercolateStats; import org.elasticsearch.index.refresh.RefreshStats; import org.elasticsearch.index.search.stats.SearchStats; import org.elasticsearch.index.shard.DocsStats; @@ -80,6 +81,9 @@ public class CommonStats implements Streamable, ToXContent { @Nullable public FieldDataStats fieldData; + @Nullable + public PercolateStats percolate; + public void add(CommonStats stats) { if (docs == null) { if (stats.getDocs() != null) { @@ -179,6 +183,14 @@ public class CommonStats implements Streamable, ToXContent { } else { fieldData.add(stats.getFieldData()); } + if (percolate == null) { + if (stats.getPercolate() != null) { + percolate = new PercolateStats(); + percolate.add(stats.getPercolate()); + } + } else { + percolate.add(stats.getPercolate()); + } } @Nullable @@ -241,6 +253,11 @@ public class CommonStats implements Streamable, ToXContent { return this.fieldData; } + @Nullable + public PercolateStats getPercolate() { + return percolate; + } + public static CommonStats readCommonStats(StreamInput in) throws IOException { CommonStats stats = new CommonStats(); stats.readFrom(in); @@ -285,6 +302,9 @@ public class CommonStats implements Streamable, ToXContent { if (in.readBoolean()) { fieldData = FieldDataStats.readFieldDataStats(in); } + if (in.readBoolean()) { + percolate = PercolateStats.readPercolateStats(in); + } } @Override @@ -361,6 +381,12 @@ public class CommonStats implements Streamable, ToXContent { out.writeBoolean(true); fieldData.writeTo(out); } + if (percolate == null) { + out.writeBoolean(false); + } else { + out.writeBoolean(true); + percolate.writeTo(out); + } } // note, requires a wrapping object @@ -402,6 +428,9 @@ public class CommonStats implements Streamable, ToXContent { if (fieldData != null) { fieldData.toXContent(builder, params); } + if (percolate != null) { + percolate.toXContent(builder, params); + } return builder; } } diff --git a/src/main/java/org/elasticsearch/action/admin/indices/stats/CommonStatsFlags.java b/src/main/java/org/elasticsearch/action/admin/indices/stats/CommonStatsFlags.java index 1f1f75881ea..56043289712 100644 --- a/src/main/java/org/elasticsearch/action/admin/indices/stats/CommonStatsFlags.java +++ b/src/main/java/org/elasticsearch/action/admin/indices/stats/CommonStatsFlags.java @@ -29,7 +29,7 @@ import java.util.EnumSet; /** */ public class CommonStatsFlags implements Streamable, Cloneable { - private EnumSet flags = EnumSet.of(Flag.Docs, Flag.Store, Flag.Indexing, Flag.Get, Flag.Search); + private EnumSet flags = EnumSet.of(Flag.Docs, Flag.Store, Flag.Indexing, Flag.Get, Flag.Search, Flag.Percolate); private String[] types = null; private String[] groups = null; private String[] fieldDataFields = null; @@ -187,7 +187,8 @@ public class CommonStatsFlags implements Streamable, Cloneable { IdCache("id_cache"), FieldData("fielddata"), Docs("docs"), - Warmer("warmer"); + Warmer("warmer"), + Percolate("percolate"); private final String restName; diff --git a/src/main/java/org/elasticsearch/action/admin/indices/stats/IndicesStatsRequest.java b/src/main/java/org/elasticsearch/action/admin/indices/stats/IndicesStatsRequest.java index 28f315a39c2..18488f43ff5 100644 --- a/src/main/java/org/elasticsearch/action/admin/indices/stats/IndicesStatsRequest.java +++ b/src/main/java/org/elasticsearch/action/admin/indices/stats/IndicesStatsRequest.java @@ -194,6 +194,15 @@ public class IndicesStatsRequest extends BroadcastOperationRequest listener) { ((IndicesAdminClient) client).stats(request, listener); diff --git a/src/main/java/org/elasticsearch/action/admin/indices/stats/TransportIndicesStatsAction.java b/src/main/java/org/elasticsearch/action/admin/indices/stats/TransportIndicesStatsAction.java index eba11cdf069..82c9278cabd 100644 --- a/src/main/java/org/elasticsearch/action/admin/indices/stats/TransportIndicesStatsAction.java +++ b/src/main/java/org/elasticsearch/action/admin/indices/stats/TransportIndicesStatsAction.java @@ -181,6 +181,9 @@ public class TransportIndicesStatsAction extends TransportBroadcastOperationActi if (request.request.fieldData()) { stats.stats.fieldData = indexShard.fieldDataStats(request.request.fieldDataFields()); } + if (request.request.percolate()) { + stats.stats.percolate = indexShard.shardPercolateService().stats(); + } return stats; } diff --git a/src/main/java/org/elasticsearch/index/percolator/PercolatorService.java b/src/main/java/org/elasticsearch/index/percolator/PercolatorService.java index 59515e3158d..72c0a57b4de 100644 --- a/src/main/java/org/elasticsearch/index/percolator/PercolatorService.java +++ b/src/main/java/org/elasticsearch/index/percolator/PercolatorService.java @@ -50,6 +50,7 @@ import org.elasticsearch.index.mapper.DocumentMapper; import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.mapper.ParsedDocument; import org.elasticsearch.index.mapper.internal.UidFieldMapper; +import org.elasticsearch.index.percolator.stats.ShardPercolateService; import org.elasticsearch.index.service.IndexService; import org.elasticsearch.index.shard.service.IndexShard; import org.elasticsearch.indices.IndicesService; @@ -86,78 +87,85 @@ public class PercolatorService extends AbstractComponent { IndexService percolateIndexService = indicesService.indexServiceSafe(request.index()); IndexShard indexShard = percolateIndexService.shardSafe(request.shardId()); - ConcurrentMap percolateQueries = indexShard.percolateRegistry().percolateQueries(); - if (percolateQueries.isEmpty()) { - return new PercolateShardResponse(StringText.EMPTY_ARRAY, request.index(), request.shardId()); - } - - Tuple parseResult = parsePercolate(percolateIndexService, request.documentType(), request.documentSource()); - ParsedDocument parsedDocument = parseResult.v1(); - Query query = parseResult.v2(); - - // first, parse the source doc into a MemoryIndex - final MemoryIndex memoryIndex = cache.get(); + ShardPercolateService shardPercolateService = indexShard.shardPercolateService(); + shardPercolateService.prePercolate(); + long startTime = System.nanoTime(); try { - // TODO: This means percolation does not support nested docs... - for (IndexableField field : parsedDocument.rootDoc().getFields()) { - if (!field.fieldType().indexed()) { - continue; - } - // no need to index the UID field - if (field.name().equals(UidFieldMapper.NAME)) { - continue; - } - TokenStream tokenStream; - try { - tokenStream = field.tokenStream(parsedDocument.analyzer()); - if (tokenStream != null) { - memoryIndex.addField(field.name(), tokenStream, field.boost()); - } - } catch (IOException e) { - throw new ElasticSearchException("Failed to create token stream", e); - } + ConcurrentMap percolateQueries = indexShard.percolateRegistry().percolateQueries(); + if (percolateQueries.isEmpty()) { + return new PercolateShardResponse(StringText.EMPTY_ARRAY, request.index(), request.shardId()); } - final IndexSearcher searcher = memoryIndex.createSearcher(); - List matches = new ArrayList(); + Tuple parseResult = parsePercolate(percolateIndexService, request.documentType(), request.documentSource()); + ParsedDocument parsedDocument = parseResult.v1(); + Query query = parseResult.v2(); - IndexFieldDataService fieldDataService = percolateIndexService.fieldData(); - IndexCache indexCache = percolateIndexService.cache(); + // first, parse the source doc into a MemoryIndex + final MemoryIndex memoryIndex = cache.get(); try { - if (query == null) { - Lucene.ExistsCollector collector = new Lucene.ExistsCollector(); - for (Map.Entry entry : percolateQueries.entrySet()) { - collector.reset(); - try { - searcher.search(entry.getValue(), collector); - } catch (IOException e) { - logger.warn("[" + entry.getKey() + "] failed to execute query", e); - } - - if (collector.exists()) { - matches.add(entry.getKey()); - } + // TODO: This means percolation does not support nested docs... + for (IndexableField field : parsedDocument.rootDoc().getFields()) { + if (!field.fieldType().indexed()) { + continue; } - } else { - Engine.Searcher percolatorSearcher = indexShard.searcher(); + // no need to index the UID field + if (field.name().equals(UidFieldMapper.NAME)) { + continue; + } + TokenStream tokenStream; try { - percolatorSearcher.searcher().search( - query, new QueryCollector(logger, percolateQueries, searcher, fieldDataService, matches) - ); + tokenStream = field.tokenStream(parsedDocument.analyzer()); + if (tokenStream != null) { + memoryIndex.addField(field.name(), tokenStream, field.boost()); + } } catch (IOException e) { - logger.warn("failed to execute", e); - } finally { - percolatorSearcher.release(); + throw new ElasticSearchException("Failed to create token stream", e); } } + + final IndexSearcher searcher = memoryIndex.createSearcher(); + List matches = new ArrayList(); + + IndexFieldDataService fieldDataService = percolateIndexService.fieldData(); + IndexCache indexCache = percolateIndexService.cache(); + try { + if (query == null) { + Lucene.ExistsCollector collector = new Lucene.ExistsCollector(); + for (Map.Entry entry : percolateQueries.entrySet()) { + collector.reset(); + try { + searcher.search(entry.getValue(), collector); + } catch (IOException e) { + logger.warn("[" + entry.getKey() + "] failed to execute query", e); + } + + if (collector.exists()) { + matches.add(entry.getKey()); + } + } + } else { + Engine.Searcher percolatorSearcher = indexShard.searcher(); + try { + percolatorSearcher.searcher().search( + query, new QueryCollector(logger, percolateQueries, searcher, fieldDataService, matches) + ); + } catch (IOException e) { + logger.warn("failed to execute", e); + } finally { + percolatorSearcher.release(); + } + } + } finally { + // explicitly clear the reader, since we can only register on callback on SegmentReader + indexCache.clear(searcher.getIndexReader()); + fieldDataService.clear(searcher.getIndexReader()); + } + return new PercolateShardResponse(matches.toArray(new Text[matches.size()]), request.index(), request.shardId()); } finally { - // explicitly clear the reader, since we can only register on callback on SegmentReader - indexCache.clear(searcher.getIndexReader()); - fieldDataService.clear(searcher.getIndexReader()); + memoryIndex.reset(); } - return new PercolateShardResponse(matches.toArray(new Text[matches.size()]), request.index(), request.shardId()); } finally { - memoryIndex.reset(); + shardPercolateService.postPercolate(System.nanoTime() - startTime); } } diff --git a/src/main/java/org/elasticsearch/index/percolator/PercolatorShardModule.java b/src/main/java/org/elasticsearch/index/percolator/PercolatorShardModule.java index 6f260258a81..b8cb5c1f523 100644 --- a/src/main/java/org/elasticsearch/index/percolator/PercolatorShardModule.java +++ b/src/main/java/org/elasticsearch/index/percolator/PercolatorShardModule.java @@ -19,6 +19,7 @@ package org.elasticsearch.index.percolator; import org.elasticsearch.common.inject.AbstractModule; +import org.elasticsearch.index.percolator.stats.ShardPercolateService; /** * @@ -28,5 +29,6 @@ public class PercolatorShardModule extends AbstractModule { @Override protected void configure() { bind(PercolatorQueriesRegistry.class).asEagerSingleton(); + bind(ShardPercolateService.class).asEagerSingleton(); } } diff --git a/src/main/java/org/elasticsearch/index/percolator/stats/PercolateStats.java b/src/main/java/org/elasticsearch/index/percolator/stats/PercolateStats.java new file mode 100644 index 00000000000..13cf7410549 --- /dev/null +++ b/src/main/java/org/elasticsearch/index/percolator/stats/PercolateStats.java @@ -0,0 +1,94 @@ +package org.elasticsearch.index.percolator.stats; + +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Streamable; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentBuilderString; + +import java.io.IOException; + +/** + */ +public class PercolateStats implements Streamable, ToXContent { + + private long percolateCount; + private long percolateTimeInMillis; + private long current; + + public PercolateStats() { + } + + public PercolateStats(long percolateCount, long percolateTimeInMillis, long current) { + this.percolateCount = percolateCount; + this.percolateTimeInMillis = percolateTimeInMillis; + this.current = current; + } + + public long getCount() { + return percolateCount; + } + + public long getTimeInMillis() { + return percolateTimeInMillis; + } + + public TimeValue getTime() { + return new TimeValue(getTimeInMillis()); + } + + public long getCurrent() { + return current; + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(Fields.PERCOLATE); + builder.field(Fields.TOTAL, percolateCount); + builder.field(Fields.TIME, getTime().toString()); + builder.field(Fields.TIME_IN_MILLIS, percolateTimeInMillis); + builder.field(Fields.CURRENT, current); + builder.endObject(); + return builder; + } + + public void add(PercolateStats percolate) { + if (percolate == null) { + return; + } + + percolateCount += percolate.getCount(); + percolateTimeInMillis += percolate.getTimeInMillis(); + current += percolate.getCurrent(); + } + + static final class Fields { + static final XContentBuilderString PERCOLATE = new XContentBuilderString("percolate"); + static final XContentBuilderString TOTAL = new XContentBuilderString("total"); + static final XContentBuilderString TIME = new XContentBuilderString("getTime"); + static final XContentBuilderString TIME_IN_MILLIS = new XContentBuilderString("time_in_millis"); + static final XContentBuilderString CURRENT = new XContentBuilderString("current"); + } + + public static PercolateStats readPercolateStats(StreamInput in) throws IOException { + PercolateStats stats = new PercolateStats(); + stats.readFrom(in); + return stats; + } + + @Override + public void readFrom(StreamInput in) throws IOException { + percolateCount = in.readVLong(); + percolateTimeInMillis = in.readVLong(); + current = in.readVLong(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeVLong(percolateCount); + out.writeVLong(percolateTimeInMillis); + out.writeVLong(current); + } +} diff --git a/src/main/java/org/elasticsearch/index/percolator/stats/ShardPercolateService.java b/src/main/java/org/elasticsearch/index/percolator/stats/ShardPercolateService.java new file mode 100644 index 00000000000..79131ea3617 --- /dev/null +++ b/src/main/java/org/elasticsearch/index/percolator/stats/ShardPercolateService.java @@ -0,0 +1,57 @@ +/* + * Licensed to ElasticSearch and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. ElasticSearch licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.index.percolator.stats; + +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.metrics.CounterMetric; +import org.elasticsearch.common.metrics.MeanMetric; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.settings.IndexSettings; +import org.elasticsearch.index.shard.AbstractIndexShardComponent; +import org.elasticsearch.index.shard.ShardId; + +import java.util.concurrent.TimeUnit; + +/** + */ +public class ShardPercolateService extends AbstractIndexShardComponent { + + @Inject + public ShardPercolateService(ShardId shardId, @IndexSettings Settings indexSettings) { + super(shardId, indexSettings); + } + + private final MeanMetric percolateMetric = new MeanMetric(); + private final CounterMetric currentMetric = new CounterMetric(); + + public void prePercolate() { + currentMetric.inc(); + } + + public void postPercolate(long tookInNanos) { + currentMetric.dec(); + percolateMetric.inc(tookInNanos); + } + + public PercolateStats stats() { + return new PercolateStats(percolateMetric.count(), TimeUnit.NANOSECONDS.toMillis(percolateMetric.sum()), currentMetric.count()); + } + +} diff --git a/src/main/java/org/elasticsearch/index/shard/service/IndexShard.java b/src/main/java/org/elasticsearch/index/shard/service/IndexShard.java index 0f55a454968..de223670118 100644 --- a/src/main/java/org/elasticsearch/index/shard/service/IndexShard.java +++ b/src/main/java/org/elasticsearch/index/shard/service/IndexShard.java @@ -40,6 +40,7 @@ import org.elasticsearch.index.mapper.ParsedDocument; import org.elasticsearch.index.mapper.SourceToParse; import org.elasticsearch.index.merge.MergeStats; import org.elasticsearch.index.percolator.PercolatorQueriesRegistry; +import org.elasticsearch.index.percolator.stats.ShardPercolateService; import org.elasticsearch.index.refresh.RefreshStats; import org.elasticsearch.index.search.stats.SearchStats; import org.elasticsearch.index.search.stats.ShardSearchService; @@ -97,6 +98,8 @@ public interface IndexShard extends IndexShardComponent { PercolatorQueriesRegistry percolateRegistry(); + ShardPercolateService shardPercolateService(); + IndexShardState state(); Engine.Create prepareCreate(SourceToParse source) throws ElasticSearchException; diff --git a/src/main/java/org/elasticsearch/index/shard/service/InternalIndexShard.java b/src/main/java/org/elasticsearch/index/shard/service/InternalIndexShard.java index e4c962fc51f..4cec1be2126 100644 --- a/src/main/java/org/elasticsearch/index/shard/service/InternalIndexShard.java +++ b/src/main/java/org/elasticsearch/index/shard/service/InternalIndexShard.java @@ -57,6 +57,7 @@ import org.elasticsearch.index.mapper.*; import org.elasticsearch.index.merge.MergeStats; import org.elasticsearch.index.merge.scheduler.MergeSchedulerProvider; import org.elasticsearch.index.percolator.PercolatorQueriesRegistry; +import org.elasticsearch.index.percolator.stats.ShardPercolateService; import org.elasticsearch.index.query.IndexQueryParserService; import org.elasticsearch.index.refresh.RefreshStats; import org.elasticsearch.index.search.nested.NonNestedDocsFilter; @@ -106,7 +107,8 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I private final ShardFilterCache shardFilterCache; private final ShardIdCache shardIdCache; private final ShardFieldData shardFieldData; - private final PercolatorQueriesRegistry shardPercolator; + private final PercolatorQueriesRegistry percolatorQueriesRegistry; + private final ShardPercolateService shardPercolateService; private final Object mutex = new Object(); private final String checkIndexOnStartup; @@ -131,7 +133,7 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I public InternalIndexShard(ShardId shardId, @IndexSettings Settings indexSettings, IndexSettingsService indexSettingsService, IndicesLifecycle indicesLifecycle, Store store, Engine engine, MergeSchedulerProvider mergeScheduler, Translog translog, ThreadPool threadPool, MapperService mapperService, IndexQueryParserService queryParserService, IndexCache indexCache, IndexAliasesService indexAliasesService, ShardIndexingService indexingService, ShardGetService getService, ShardSearchService searchService, ShardIndexWarmerService shardWarmerService, ShardFilterCache shardFilterCache, ShardIdCache shardIdCache, ShardFieldData shardFieldData, - PercolatorQueriesRegistry shardPercolator) { + PercolatorQueriesRegistry percolatorQueriesRegistry, ShardPercolateService shardPercolateService) { super(shardId, indexSettings); this.indicesLifecycle = (InternalIndicesLifecycle) indicesLifecycle; this.indexSettingsService = indexSettingsService; @@ -151,7 +153,8 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I this.shardFilterCache = shardFilterCache; this.shardIdCache = shardIdCache; this.shardFieldData = shardFieldData; - this.shardPercolator = shardPercolator; + this.percolatorQueriesRegistry = percolatorQueriesRegistry; + this.shardPercolateService = shardPercolateService; state = IndexShardState.CREATED; this.refreshInterval = indexSettings.getAsTime("engine.robin.refresh_interval", indexSettings.getAsTime(INDEX_REFRESH_INTERVAL, engine.defaultRefreshInterval())); @@ -487,7 +490,12 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I @Override public PercolatorQueriesRegistry percolateRegistry() { - return shardPercolator; + return percolatorQueriesRegistry; + } + + @Override + public ShardPercolateService shardPercolateService() { + return shardPercolateService; } @Override diff --git a/src/main/java/org/elasticsearch/indices/InternalIndicesService.java b/src/main/java/org/elasticsearch/indices/InternalIndicesService.java index 3632d0f079f..18956c1472e 100644 --- a/src/main/java/org/elasticsearch/indices/InternalIndicesService.java +++ b/src/main/java/org/elasticsearch/indices/InternalIndicesService.java @@ -56,6 +56,7 @@ import org.elasticsearch.index.indexing.IndexingStats; import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.mapper.MapperServiceModule; import org.elasticsearch.index.merge.MergeStats; +import org.elasticsearch.index.percolator.stats.PercolateStats; import org.elasticsearch.index.query.IndexQueryParserModule; import org.elasticsearch.index.query.IndexQueryParserService; import org.elasticsearch.index.refresh.RefreshStats; @@ -246,6 +247,9 @@ public class InternalIndicesService extends AbstractLifecycleComponent() { @Override @@ -618,4 +622,43 @@ public class RestIndicesStatsAction extends BaseRestHandler { }); } } + + class RestPercolateStatsHandler implements RestHandler { + + @Override + public void handleRequest(final RestRequest request, final RestChannel channel) { + IndicesStatsRequest indicesStatsRequest = new IndicesStatsRequest(); + indicesStatsRequest.listenerThreaded(false); + indicesStatsRequest.clear().percolate(true); + indicesStatsRequest.indices(splitIndices(request.param("index"))); + indicesStatsRequest.types(splitTypes(request.param("types"))); + + client.admin().indices().stats(indicesStatsRequest, new ActionListener() { + @Override + public void onResponse(IndicesStatsResponse response) { + try { + XContentBuilder builder = RestXContentBuilder.restContentBuilder(request); + builder.startObject(); + builder.field("ok", true); + buildBroadcastShardsHeader(builder, response); + response.toXContent(builder, request); + builder.endObject(); + channel.sendResponse(new XContentRestResponse(request, OK, builder)); + } catch (Throwable e) { + onFailure(e); + } + } + + @Override + public void onFailure(Throwable e) { + try { + channel.sendResponse(new XContentThrowableRestResponse(request, e)); + } catch (IOException e1) { + logger.error("Failed to send failure response", e1); + } + } + }); + } + } + } diff --git a/src/test/java/org/elasticsearch/test/integration/percolator/SimplePercolatorTests.java b/src/test/java/org/elasticsearch/test/integration/percolator/SimplePercolatorTests.java index fae40009f64..a12097c6308 100644 --- a/src/test/java/org/elasticsearch/test/integration/percolator/SimplePercolatorTests.java +++ b/src/test/java/org/elasticsearch/test/integration/percolator/SimplePercolatorTests.java @@ -19,6 +19,9 @@ package org.elasticsearch.test.integration.percolator; +import org.elasticsearch.action.admin.cluster.node.stats.NodeStats; +import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse; +import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse; import org.elasticsearch.action.count.CountResponse; import org.elasticsearch.action.percolate.PercolateResponse; import org.elasticsearch.action.search.SearchResponse; @@ -491,6 +494,62 @@ public class SimplePercolatorTests extends AbstractSharedClusterTest { assertThat(convertFromTextArray(percolate.getMatches()), arrayContaining("kuku")); } + @Test + public void testPercolateStatistics() throws Exception { + client().admin().indices().prepareCreate("test").execute().actionGet(); + ensureGreen(); + + logger.info("--> register a query"); + client().prepareIndex("test", "_percolator", "1") + .setSource(jsonBuilder().startObject().field("query", matchAllQuery()).endObject()) + .execute().actionGet(); + client().admin().indices().prepareRefresh("test").execute().actionGet(); + + logger.info("--> First percolate request"); + PercolateResponse response = client().preparePercolate("test", "type") + .setSource(jsonBuilder().startObject().startObject("doc").field("field", "val").endObject().endObject()) + .execute().actionGet(); + assertThat(response.getMatches(), arrayWithSize(1)); + assertThat(convertFromTextArray(response.getMatches()), arrayContaining("1")); + + IndicesStatsResponse indicesResponse = client().admin().indices().prepareStats("test").execute().actionGet(); + assertThat(indicesResponse.getTotal().getPercolate().getCount(), equalTo(5l)); // We have 5 partitions + assertThat(indicesResponse.getTotal().getPercolate().getTimeInMillis(), greaterThan(0l)); + assertThat(indicesResponse.getTotal().getPercolate().getCurrent(), equalTo(0l)); + + NodesStatsResponse nodesResponse = client().admin().cluster().prepareNodesStats().execute().actionGet(); + long percolateCount = 0; + long percolateSumTime = 0; + for (NodeStats nodeStats : nodesResponse) { + percolateCount += nodeStats.getIndices().getPercolate().getCount(); + percolateSumTime += nodeStats.getIndices().getPercolate().getTimeInMillis(); + } + assertThat(percolateCount, equalTo(5l)); // We have 5 partitions + assertThat(percolateSumTime, greaterThan(0l)); + + logger.info("--> Second percolate request"); + response = client().preparePercolate("test", "type") + .setSource(jsonBuilder().startObject().startObject("doc").field("field", "val").endObject().endObject()) + .execute().actionGet(); + assertThat(response.getMatches(), arrayWithSize(1)); + assertThat(convertFromTextArray(response.getMatches()), arrayContaining("1")); + + indicesResponse = client().admin().indices().prepareStats().setPercolate(true).execute().actionGet(); + assertThat(indicesResponse.getTotal().getPercolate().getCount(), equalTo(10l)); + assertThat(indicesResponse.getTotal().getPercolate().getTimeInMillis(), greaterThan(0l)); + assertThat(indicesResponse.getTotal().getPercolate().getCurrent(), equalTo(0l)); + + nodesResponse = client().admin().cluster().prepareNodesStats().execute().actionGet(); + percolateCount = 0; + percolateSumTime = 0; + for (NodeStats nodeStats : nodesResponse) { + percolateCount += nodeStats.getIndices().getPercolate().getCount(); + percolateSumTime += nodeStats.getIndices().getPercolate().getTimeInMillis(); + } + assertThat(percolateCount, equalTo(10l)); + assertThat(percolateSumTime, greaterThan(0l)); + } + public static String[] convertFromTextArray(Text[] texts) { if (texts.length == 0) { return Strings.EMPTY_ARRAY;