parent
eb75a815db
commit
32a96aea71
|
@ -32,6 +32,7 @@ import org.elasticsearch.index.flush.FlushStats;
|
||||||
import org.elasticsearch.index.get.GetStats;
|
import org.elasticsearch.index.get.GetStats;
|
||||||
import org.elasticsearch.index.indexing.IndexingStats;
|
import org.elasticsearch.index.indexing.IndexingStats;
|
||||||
import org.elasticsearch.index.merge.MergeStats;
|
import org.elasticsearch.index.merge.MergeStats;
|
||||||
|
import org.elasticsearch.index.percolator.stats.PercolateStats;
|
||||||
import org.elasticsearch.index.refresh.RefreshStats;
|
import org.elasticsearch.index.refresh.RefreshStats;
|
||||||
import org.elasticsearch.index.search.stats.SearchStats;
|
import org.elasticsearch.index.search.stats.SearchStats;
|
||||||
import org.elasticsearch.index.shard.DocsStats;
|
import org.elasticsearch.index.shard.DocsStats;
|
||||||
|
@ -80,6 +81,9 @@ public class CommonStats implements Streamable, ToXContent {
|
||||||
@Nullable
|
@Nullable
|
||||||
public FieldDataStats fieldData;
|
public FieldDataStats fieldData;
|
||||||
|
|
||||||
|
@Nullable
|
||||||
|
public PercolateStats percolate;
|
||||||
|
|
||||||
public void add(CommonStats stats) {
|
public void add(CommonStats stats) {
|
||||||
if (docs == null) {
|
if (docs == null) {
|
||||||
if (stats.getDocs() != null) {
|
if (stats.getDocs() != null) {
|
||||||
|
@ -179,6 +183,14 @@ public class CommonStats implements Streamable, ToXContent {
|
||||||
} else {
|
} else {
|
||||||
fieldData.add(stats.getFieldData());
|
fieldData.add(stats.getFieldData());
|
||||||
}
|
}
|
||||||
|
if (percolate == null) {
|
||||||
|
if (stats.getPercolate() != null) {
|
||||||
|
percolate = new PercolateStats();
|
||||||
|
percolate.add(stats.getPercolate());
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
percolate.add(stats.getPercolate());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Nullable
|
@Nullable
|
||||||
|
@ -241,6 +253,11 @@ public class CommonStats implements Streamable, ToXContent {
|
||||||
return this.fieldData;
|
return this.fieldData;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Nullable
|
||||||
|
public PercolateStats getPercolate() {
|
||||||
|
return percolate;
|
||||||
|
}
|
||||||
|
|
||||||
public static CommonStats readCommonStats(StreamInput in) throws IOException {
|
public static CommonStats readCommonStats(StreamInput in) throws IOException {
|
||||||
CommonStats stats = new CommonStats();
|
CommonStats stats = new CommonStats();
|
||||||
stats.readFrom(in);
|
stats.readFrom(in);
|
||||||
|
@ -285,6 +302,9 @@ public class CommonStats implements Streamable, ToXContent {
|
||||||
if (in.readBoolean()) {
|
if (in.readBoolean()) {
|
||||||
fieldData = FieldDataStats.readFieldDataStats(in);
|
fieldData = FieldDataStats.readFieldDataStats(in);
|
||||||
}
|
}
|
||||||
|
if (in.readBoolean()) {
|
||||||
|
percolate = PercolateStats.readPercolateStats(in);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -361,6 +381,12 @@ public class CommonStats implements Streamable, ToXContent {
|
||||||
out.writeBoolean(true);
|
out.writeBoolean(true);
|
||||||
fieldData.writeTo(out);
|
fieldData.writeTo(out);
|
||||||
}
|
}
|
||||||
|
if (percolate == null) {
|
||||||
|
out.writeBoolean(false);
|
||||||
|
} else {
|
||||||
|
out.writeBoolean(true);
|
||||||
|
percolate.writeTo(out);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// note, requires a wrapping object
|
// note, requires a wrapping object
|
||||||
|
@ -402,6 +428,9 @@ public class CommonStats implements Streamable, ToXContent {
|
||||||
if (fieldData != null) {
|
if (fieldData != null) {
|
||||||
fieldData.toXContent(builder, params);
|
fieldData.toXContent(builder, params);
|
||||||
}
|
}
|
||||||
|
if (percolate != null) {
|
||||||
|
percolate.toXContent(builder, params);
|
||||||
|
}
|
||||||
return builder;
|
return builder;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -29,7 +29,7 @@ import java.util.EnumSet;
|
||||||
/**
|
/**
|
||||||
*/
|
*/
|
||||||
public class CommonStatsFlags implements Streamable, Cloneable {
|
public class CommonStatsFlags implements Streamable, Cloneable {
|
||||||
private EnumSet<Flag> flags = EnumSet.of(Flag.Docs, Flag.Store, Flag.Indexing, Flag.Get, Flag.Search);
|
private EnumSet<Flag> flags = EnumSet.of(Flag.Docs, Flag.Store, Flag.Indexing, Flag.Get, Flag.Search, Flag.Percolate);
|
||||||
private String[] types = null;
|
private String[] types = null;
|
||||||
private String[] groups = null;
|
private String[] groups = null;
|
||||||
private String[] fieldDataFields = null;
|
private String[] fieldDataFields = null;
|
||||||
|
@ -187,7 +187,8 @@ public class CommonStatsFlags implements Streamable, Cloneable {
|
||||||
IdCache("id_cache"),
|
IdCache("id_cache"),
|
||||||
FieldData("fielddata"),
|
FieldData("fielddata"),
|
||||||
Docs("docs"),
|
Docs("docs"),
|
||||||
Warmer("warmer");
|
Warmer("warmer"),
|
||||||
|
Percolate("percolate");
|
||||||
|
|
||||||
private final String restName;
|
private final String restName;
|
||||||
|
|
||||||
|
|
|
@ -194,6 +194,15 @@ public class IndicesStatsRequest extends BroadcastOperationRequest<IndicesStatsR
|
||||||
return flags.isSet(Flag.FieldData);
|
return flags.isSet(Flag.FieldData);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public IndicesStatsRequest percolate(boolean percolate) {
|
||||||
|
flags.set(Flag.Percolate, percolate);
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean percolate() {
|
||||||
|
return flags.isSet(Flag.Percolate);
|
||||||
|
}
|
||||||
|
|
||||||
public IndicesStatsRequest fieldDataFields(String... fieldDataFields) {
|
public IndicesStatsRequest fieldDataFields(String... fieldDataFields) {
|
||||||
flags.fieldDataFields(fieldDataFields);
|
flags.fieldDataFields(fieldDataFields);
|
||||||
return this;
|
return this;
|
||||||
|
|
|
@ -134,6 +134,11 @@ public class IndicesStatsRequestBuilder extends BroadcastOperationRequestBuilder
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public IndicesStatsRequestBuilder setPercolate(boolean percolate) {
|
||||||
|
request.percolate(percolate);
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void doExecute(ActionListener<IndicesStatsResponse> listener) {
|
protected void doExecute(ActionListener<IndicesStatsResponse> listener) {
|
||||||
((IndicesAdminClient) client).stats(request, listener);
|
((IndicesAdminClient) client).stats(request, listener);
|
||||||
|
|
|
@ -181,6 +181,9 @@ public class TransportIndicesStatsAction extends TransportBroadcastOperationActi
|
||||||
if (request.request.fieldData()) {
|
if (request.request.fieldData()) {
|
||||||
stats.stats.fieldData = indexShard.fieldDataStats(request.request.fieldDataFields());
|
stats.stats.fieldData = indexShard.fieldDataStats(request.request.fieldDataFields());
|
||||||
}
|
}
|
||||||
|
if (request.request.percolate()) {
|
||||||
|
stats.stats.percolate = indexShard.shardPercolateService().stats();
|
||||||
|
}
|
||||||
|
|
||||||
return stats;
|
return stats;
|
||||||
}
|
}
|
||||||
|
|
|
@ -50,6 +50,7 @@ import org.elasticsearch.index.mapper.DocumentMapper;
|
||||||
import org.elasticsearch.index.mapper.MapperService;
|
import org.elasticsearch.index.mapper.MapperService;
|
||||||
import org.elasticsearch.index.mapper.ParsedDocument;
|
import org.elasticsearch.index.mapper.ParsedDocument;
|
||||||
import org.elasticsearch.index.mapper.internal.UidFieldMapper;
|
import org.elasticsearch.index.mapper.internal.UidFieldMapper;
|
||||||
|
import org.elasticsearch.index.percolator.stats.ShardPercolateService;
|
||||||
import org.elasticsearch.index.service.IndexService;
|
import org.elasticsearch.index.service.IndexService;
|
||||||
import org.elasticsearch.index.shard.service.IndexShard;
|
import org.elasticsearch.index.shard.service.IndexShard;
|
||||||
import org.elasticsearch.indices.IndicesService;
|
import org.elasticsearch.indices.IndicesService;
|
||||||
|
@ -86,78 +87,85 @@ public class PercolatorService extends AbstractComponent {
|
||||||
IndexService percolateIndexService = indicesService.indexServiceSafe(request.index());
|
IndexService percolateIndexService = indicesService.indexServiceSafe(request.index());
|
||||||
IndexShard indexShard = percolateIndexService.shardSafe(request.shardId());
|
IndexShard indexShard = percolateIndexService.shardSafe(request.shardId());
|
||||||
|
|
||||||
ConcurrentMap<Text, Query> percolateQueries = indexShard.percolateRegistry().percolateQueries();
|
ShardPercolateService shardPercolateService = indexShard.shardPercolateService();
|
||||||
if (percolateQueries.isEmpty()) {
|
shardPercolateService.prePercolate();
|
||||||
return new PercolateShardResponse(StringText.EMPTY_ARRAY, request.index(), request.shardId());
|
long startTime = System.nanoTime();
|
||||||
}
|
|
||||||
|
|
||||||
Tuple<ParsedDocument, Query> parseResult = parsePercolate(percolateIndexService, request.documentType(), request.documentSource());
|
|
||||||
ParsedDocument parsedDocument = parseResult.v1();
|
|
||||||
Query query = parseResult.v2();
|
|
||||||
|
|
||||||
// first, parse the source doc into a MemoryIndex
|
|
||||||
final MemoryIndex memoryIndex = cache.get();
|
|
||||||
try {
|
try {
|
||||||
// TODO: This means percolation does not support nested docs...
|
ConcurrentMap<Text, Query> percolateQueries = indexShard.percolateRegistry().percolateQueries();
|
||||||
for (IndexableField field : parsedDocument.rootDoc().getFields()) {
|
if (percolateQueries.isEmpty()) {
|
||||||
if (!field.fieldType().indexed()) {
|
return new PercolateShardResponse(StringText.EMPTY_ARRAY, request.index(), request.shardId());
|
||||||
continue;
|
|
||||||
}
|
|
||||||
// no need to index the UID field
|
|
||||||
if (field.name().equals(UidFieldMapper.NAME)) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
TokenStream tokenStream;
|
|
||||||
try {
|
|
||||||
tokenStream = field.tokenStream(parsedDocument.analyzer());
|
|
||||||
if (tokenStream != null) {
|
|
||||||
memoryIndex.addField(field.name(), tokenStream, field.boost());
|
|
||||||
}
|
|
||||||
} catch (IOException e) {
|
|
||||||
throw new ElasticSearchException("Failed to create token stream", e);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
final IndexSearcher searcher = memoryIndex.createSearcher();
|
Tuple<ParsedDocument, Query> parseResult = parsePercolate(percolateIndexService, request.documentType(), request.documentSource());
|
||||||
List<Text> matches = new ArrayList<Text>();
|
ParsedDocument parsedDocument = parseResult.v1();
|
||||||
|
Query query = parseResult.v2();
|
||||||
|
|
||||||
IndexFieldDataService fieldDataService = percolateIndexService.fieldData();
|
// first, parse the source doc into a MemoryIndex
|
||||||
IndexCache indexCache = percolateIndexService.cache();
|
final MemoryIndex memoryIndex = cache.get();
|
||||||
try {
|
try {
|
||||||
if (query == null) {
|
// TODO: This means percolation does not support nested docs...
|
||||||
Lucene.ExistsCollector collector = new Lucene.ExistsCollector();
|
for (IndexableField field : parsedDocument.rootDoc().getFields()) {
|
||||||
for (Map.Entry<Text, Query> entry : percolateQueries.entrySet()) {
|
if (!field.fieldType().indexed()) {
|
||||||
collector.reset();
|
continue;
|
||||||
try {
|
|
||||||
searcher.search(entry.getValue(), collector);
|
|
||||||
} catch (IOException e) {
|
|
||||||
logger.warn("[" + entry.getKey() + "] failed to execute query", e);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (collector.exists()) {
|
|
||||||
matches.add(entry.getKey());
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
} else {
|
// no need to index the UID field
|
||||||
Engine.Searcher percolatorSearcher = indexShard.searcher();
|
if (field.name().equals(UidFieldMapper.NAME)) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
TokenStream tokenStream;
|
||||||
try {
|
try {
|
||||||
percolatorSearcher.searcher().search(
|
tokenStream = field.tokenStream(parsedDocument.analyzer());
|
||||||
query, new QueryCollector(logger, percolateQueries, searcher, fieldDataService, matches)
|
if (tokenStream != null) {
|
||||||
);
|
memoryIndex.addField(field.name(), tokenStream, field.boost());
|
||||||
|
}
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
logger.warn("failed to execute", e);
|
throw new ElasticSearchException("Failed to create token stream", e);
|
||||||
} finally {
|
|
||||||
percolatorSearcher.release();
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
final IndexSearcher searcher = memoryIndex.createSearcher();
|
||||||
|
List<Text> matches = new ArrayList<Text>();
|
||||||
|
|
||||||
|
IndexFieldDataService fieldDataService = percolateIndexService.fieldData();
|
||||||
|
IndexCache indexCache = percolateIndexService.cache();
|
||||||
|
try {
|
||||||
|
if (query == null) {
|
||||||
|
Lucene.ExistsCollector collector = new Lucene.ExistsCollector();
|
||||||
|
for (Map.Entry<Text, Query> entry : percolateQueries.entrySet()) {
|
||||||
|
collector.reset();
|
||||||
|
try {
|
||||||
|
searcher.search(entry.getValue(), collector);
|
||||||
|
} catch (IOException e) {
|
||||||
|
logger.warn("[" + entry.getKey() + "] failed to execute query", e);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (collector.exists()) {
|
||||||
|
matches.add(entry.getKey());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
Engine.Searcher percolatorSearcher = indexShard.searcher();
|
||||||
|
try {
|
||||||
|
percolatorSearcher.searcher().search(
|
||||||
|
query, new QueryCollector(logger, percolateQueries, searcher, fieldDataService, matches)
|
||||||
|
);
|
||||||
|
} catch (IOException e) {
|
||||||
|
logger.warn("failed to execute", e);
|
||||||
|
} finally {
|
||||||
|
percolatorSearcher.release();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
// explicitly clear the reader, since we can only register on callback on SegmentReader
|
||||||
|
indexCache.clear(searcher.getIndexReader());
|
||||||
|
fieldDataService.clear(searcher.getIndexReader());
|
||||||
|
}
|
||||||
|
return new PercolateShardResponse(matches.toArray(new Text[matches.size()]), request.index(), request.shardId());
|
||||||
} finally {
|
} finally {
|
||||||
// explicitly clear the reader, since we can only register on callback on SegmentReader
|
memoryIndex.reset();
|
||||||
indexCache.clear(searcher.getIndexReader());
|
|
||||||
fieldDataService.clear(searcher.getIndexReader());
|
|
||||||
}
|
}
|
||||||
return new PercolateShardResponse(matches.toArray(new Text[matches.size()]), request.index(), request.shardId());
|
|
||||||
} finally {
|
} finally {
|
||||||
memoryIndex.reset();
|
shardPercolateService.postPercolate(System.nanoTime() - startTime);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -19,6 +19,7 @@
|
||||||
package org.elasticsearch.index.percolator;
|
package org.elasticsearch.index.percolator;
|
||||||
|
|
||||||
import org.elasticsearch.common.inject.AbstractModule;
|
import org.elasticsearch.common.inject.AbstractModule;
|
||||||
|
import org.elasticsearch.index.percolator.stats.ShardPercolateService;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
*
|
*
|
||||||
|
@ -28,5 +29,6 @@ public class PercolatorShardModule extends AbstractModule {
|
||||||
@Override
|
@Override
|
||||||
protected void configure() {
|
protected void configure() {
|
||||||
bind(PercolatorQueriesRegistry.class).asEagerSingleton();
|
bind(PercolatorQueriesRegistry.class).asEagerSingleton();
|
||||||
|
bind(ShardPercolateService.class).asEagerSingleton();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,94 @@
|
||||||
|
package org.elasticsearch.index.percolator.stats;
|
||||||
|
|
||||||
|
import org.elasticsearch.common.io.stream.StreamInput;
|
||||||
|
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||||
|
import org.elasticsearch.common.io.stream.Streamable;
|
||||||
|
import org.elasticsearch.common.unit.TimeValue;
|
||||||
|
import org.elasticsearch.common.xcontent.ToXContent;
|
||||||
|
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||||
|
import org.elasticsearch.common.xcontent.XContentBuilderString;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
*/
|
||||||
|
public class PercolateStats implements Streamable, ToXContent {
|
||||||
|
|
||||||
|
private long percolateCount;
|
||||||
|
private long percolateTimeInMillis;
|
||||||
|
private long current;
|
||||||
|
|
||||||
|
public PercolateStats() {
|
||||||
|
}
|
||||||
|
|
||||||
|
public PercolateStats(long percolateCount, long percolateTimeInMillis, long current) {
|
||||||
|
this.percolateCount = percolateCount;
|
||||||
|
this.percolateTimeInMillis = percolateTimeInMillis;
|
||||||
|
this.current = current;
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getCount() {
|
||||||
|
return percolateCount;
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getTimeInMillis() {
|
||||||
|
return percolateTimeInMillis;
|
||||||
|
}
|
||||||
|
|
||||||
|
public TimeValue getTime() {
|
||||||
|
return new TimeValue(getTimeInMillis());
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getCurrent() {
|
||||||
|
return current;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
|
||||||
|
builder.startObject(Fields.PERCOLATE);
|
||||||
|
builder.field(Fields.TOTAL, percolateCount);
|
||||||
|
builder.field(Fields.TIME, getTime().toString());
|
||||||
|
builder.field(Fields.TIME_IN_MILLIS, percolateTimeInMillis);
|
||||||
|
builder.field(Fields.CURRENT, current);
|
||||||
|
builder.endObject();
|
||||||
|
return builder;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void add(PercolateStats percolate) {
|
||||||
|
if (percolate == null) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
percolateCount += percolate.getCount();
|
||||||
|
percolateTimeInMillis += percolate.getTimeInMillis();
|
||||||
|
current += percolate.getCurrent();
|
||||||
|
}
|
||||||
|
|
||||||
|
static final class Fields {
|
||||||
|
static final XContentBuilderString PERCOLATE = new XContentBuilderString("percolate");
|
||||||
|
static final XContentBuilderString TOTAL = new XContentBuilderString("total");
|
||||||
|
static final XContentBuilderString TIME = new XContentBuilderString("getTime");
|
||||||
|
static final XContentBuilderString TIME_IN_MILLIS = new XContentBuilderString("time_in_millis");
|
||||||
|
static final XContentBuilderString CURRENT = new XContentBuilderString("current");
|
||||||
|
}
|
||||||
|
|
||||||
|
public static PercolateStats readPercolateStats(StreamInput in) throws IOException {
|
||||||
|
PercolateStats stats = new PercolateStats();
|
||||||
|
stats.readFrom(in);
|
||||||
|
return stats;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void readFrom(StreamInput in) throws IOException {
|
||||||
|
percolateCount = in.readVLong();
|
||||||
|
percolateTimeInMillis = in.readVLong();
|
||||||
|
current = in.readVLong();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void writeTo(StreamOutput out) throws IOException {
|
||||||
|
out.writeVLong(percolateCount);
|
||||||
|
out.writeVLong(percolateTimeInMillis);
|
||||||
|
out.writeVLong(current);
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,57 @@
|
||||||
|
/*
|
||||||
|
* Licensed to ElasticSearch and Shay Banon under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. ElasticSearch licenses this
|
||||||
|
* file to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.elasticsearch.index.percolator.stats;
|
||||||
|
|
||||||
|
import org.elasticsearch.common.inject.Inject;
|
||||||
|
import org.elasticsearch.common.metrics.CounterMetric;
|
||||||
|
import org.elasticsearch.common.metrics.MeanMetric;
|
||||||
|
import org.elasticsearch.common.settings.Settings;
|
||||||
|
import org.elasticsearch.index.settings.IndexSettings;
|
||||||
|
import org.elasticsearch.index.shard.AbstractIndexShardComponent;
|
||||||
|
import org.elasticsearch.index.shard.ShardId;
|
||||||
|
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
/**
|
||||||
|
*/
|
||||||
|
public class ShardPercolateService extends AbstractIndexShardComponent {
|
||||||
|
|
||||||
|
@Inject
|
||||||
|
public ShardPercolateService(ShardId shardId, @IndexSettings Settings indexSettings) {
|
||||||
|
super(shardId, indexSettings);
|
||||||
|
}
|
||||||
|
|
||||||
|
private final MeanMetric percolateMetric = new MeanMetric();
|
||||||
|
private final CounterMetric currentMetric = new CounterMetric();
|
||||||
|
|
||||||
|
public void prePercolate() {
|
||||||
|
currentMetric.inc();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void postPercolate(long tookInNanos) {
|
||||||
|
currentMetric.dec();
|
||||||
|
percolateMetric.inc(tookInNanos);
|
||||||
|
}
|
||||||
|
|
||||||
|
public PercolateStats stats() {
|
||||||
|
return new PercolateStats(percolateMetric.count(), TimeUnit.NANOSECONDS.toMillis(percolateMetric.sum()), currentMetric.count());
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -40,6 +40,7 @@ import org.elasticsearch.index.mapper.ParsedDocument;
|
||||||
import org.elasticsearch.index.mapper.SourceToParse;
|
import org.elasticsearch.index.mapper.SourceToParse;
|
||||||
import org.elasticsearch.index.merge.MergeStats;
|
import org.elasticsearch.index.merge.MergeStats;
|
||||||
import org.elasticsearch.index.percolator.PercolatorQueriesRegistry;
|
import org.elasticsearch.index.percolator.PercolatorQueriesRegistry;
|
||||||
|
import org.elasticsearch.index.percolator.stats.ShardPercolateService;
|
||||||
import org.elasticsearch.index.refresh.RefreshStats;
|
import org.elasticsearch.index.refresh.RefreshStats;
|
||||||
import org.elasticsearch.index.search.stats.SearchStats;
|
import org.elasticsearch.index.search.stats.SearchStats;
|
||||||
import org.elasticsearch.index.search.stats.ShardSearchService;
|
import org.elasticsearch.index.search.stats.ShardSearchService;
|
||||||
|
@ -97,6 +98,8 @@ public interface IndexShard extends IndexShardComponent {
|
||||||
|
|
||||||
PercolatorQueriesRegistry percolateRegistry();
|
PercolatorQueriesRegistry percolateRegistry();
|
||||||
|
|
||||||
|
ShardPercolateService shardPercolateService();
|
||||||
|
|
||||||
IndexShardState state();
|
IndexShardState state();
|
||||||
|
|
||||||
Engine.Create prepareCreate(SourceToParse source) throws ElasticSearchException;
|
Engine.Create prepareCreate(SourceToParse source) throws ElasticSearchException;
|
||||||
|
|
|
@ -57,6 +57,7 @@ import org.elasticsearch.index.mapper.*;
|
||||||
import org.elasticsearch.index.merge.MergeStats;
|
import org.elasticsearch.index.merge.MergeStats;
|
||||||
import org.elasticsearch.index.merge.scheduler.MergeSchedulerProvider;
|
import org.elasticsearch.index.merge.scheduler.MergeSchedulerProvider;
|
||||||
import org.elasticsearch.index.percolator.PercolatorQueriesRegistry;
|
import org.elasticsearch.index.percolator.PercolatorQueriesRegistry;
|
||||||
|
import org.elasticsearch.index.percolator.stats.ShardPercolateService;
|
||||||
import org.elasticsearch.index.query.IndexQueryParserService;
|
import org.elasticsearch.index.query.IndexQueryParserService;
|
||||||
import org.elasticsearch.index.refresh.RefreshStats;
|
import org.elasticsearch.index.refresh.RefreshStats;
|
||||||
import org.elasticsearch.index.search.nested.NonNestedDocsFilter;
|
import org.elasticsearch.index.search.nested.NonNestedDocsFilter;
|
||||||
|
@ -106,7 +107,8 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
|
||||||
private final ShardFilterCache shardFilterCache;
|
private final ShardFilterCache shardFilterCache;
|
||||||
private final ShardIdCache shardIdCache;
|
private final ShardIdCache shardIdCache;
|
||||||
private final ShardFieldData shardFieldData;
|
private final ShardFieldData shardFieldData;
|
||||||
private final PercolatorQueriesRegistry shardPercolator;
|
private final PercolatorQueriesRegistry percolatorQueriesRegistry;
|
||||||
|
private final ShardPercolateService shardPercolateService;
|
||||||
|
|
||||||
private final Object mutex = new Object();
|
private final Object mutex = new Object();
|
||||||
private final String checkIndexOnStartup;
|
private final String checkIndexOnStartup;
|
||||||
|
@ -131,7 +133,7 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
|
||||||
public InternalIndexShard(ShardId shardId, @IndexSettings Settings indexSettings, IndexSettingsService indexSettingsService, IndicesLifecycle indicesLifecycle, Store store, Engine engine, MergeSchedulerProvider mergeScheduler, Translog translog,
|
public InternalIndexShard(ShardId shardId, @IndexSettings Settings indexSettings, IndexSettingsService indexSettingsService, IndicesLifecycle indicesLifecycle, Store store, Engine engine, MergeSchedulerProvider mergeScheduler, Translog translog,
|
||||||
ThreadPool threadPool, MapperService mapperService, IndexQueryParserService queryParserService, IndexCache indexCache, IndexAliasesService indexAliasesService, ShardIndexingService indexingService, ShardGetService getService, ShardSearchService searchService, ShardIndexWarmerService shardWarmerService,
|
ThreadPool threadPool, MapperService mapperService, IndexQueryParserService queryParserService, IndexCache indexCache, IndexAliasesService indexAliasesService, ShardIndexingService indexingService, ShardGetService getService, ShardSearchService searchService, ShardIndexWarmerService shardWarmerService,
|
||||||
ShardFilterCache shardFilterCache, ShardIdCache shardIdCache, ShardFieldData shardFieldData,
|
ShardFilterCache shardFilterCache, ShardIdCache shardIdCache, ShardFieldData shardFieldData,
|
||||||
PercolatorQueriesRegistry shardPercolator) {
|
PercolatorQueriesRegistry percolatorQueriesRegistry, ShardPercolateService shardPercolateService) {
|
||||||
super(shardId, indexSettings);
|
super(shardId, indexSettings);
|
||||||
this.indicesLifecycle = (InternalIndicesLifecycle) indicesLifecycle;
|
this.indicesLifecycle = (InternalIndicesLifecycle) indicesLifecycle;
|
||||||
this.indexSettingsService = indexSettingsService;
|
this.indexSettingsService = indexSettingsService;
|
||||||
|
@ -151,7 +153,8 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
|
||||||
this.shardFilterCache = shardFilterCache;
|
this.shardFilterCache = shardFilterCache;
|
||||||
this.shardIdCache = shardIdCache;
|
this.shardIdCache = shardIdCache;
|
||||||
this.shardFieldData = shardFieldData;
|
this.shardFieldData = shardFieldData;
|
||||||
this.shardPercolator = shardPercolator;
|
this.percolatorQueriesRegistry = percolatorQueriesRegistry;
|
||||||
|
this.shardPercolateService = shardPercolateService;
|
||||||
state = IndexShardState.CREATED;
|
state = IndexShardState.CREATED;
|
||||||
|
|
||||||
this.refreshInterval = indexSettings.getAsTime("engine.robin.refresh_interval", indexSettings.getAsTime(INDEX_REFRESH_INTERVAL, engine.defaultRefreshInterval()));
|
this.refreshInterval = indexSettings.getAsTime("engine.robin.refresh_interval", indexSettings.getAsTime(INDEX_REFRESH_INTERVAL, engine.defaultRefreshInterval()));
|
||||||
|
@ -487,7 +490,12 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public PercolatorQueriesRegistry percolateRegistry() {
|
public PercolatorQueriesRegistry percolateRegistry() {
|
||||||
return shardPercolator;
|
return percolatorQueriesRegistry;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ShardPercolateService shardPercolateService() {
|
||||||
|
return shardPercolateService;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -56,6 +56,7 @@ import org.elasticsearch.index.indexing.IndexingStats;
|
||||||
import org.elasticsearch.index.mapper.MapperService;
|
import org.elasticsearch.index.mapper.MapperService;
|
||||||
import org.elasticsearch.index.mapper.MapperServiceModule;
|
import org.elasticsearch.index.mapper.MapperServiceModule;
|
||||||
import org.elasticsearch.index.merge.MergeStats;
|
import org.elasticsearch.index.merge.MergeStats;
|
||||||
|
import org.elasticsearch.index.percolator.stats.PercolateStats;
|
||||||
import org.elasticsearch.index.query.IndexQueryParserModule;
|
import org.elasticsearch.index.query.IndexQueryParserModule;
|
||||||
import org.elasticsearch.index.query.IndexQueryParserService;
|
import org.elasticsearch.index.query.IndexQueryParserService;
|
||||||
import org.elasticsearch.index.refresh.RefreshStats;
|
import org.elasticsearch.index.refresh.RefreshStats;
|
||||||
|
@ -246,6 +247,9 @@ public class InternalIndicesService extends AbstractLifecycleComponent<IndicesSe
|
||||||
case FilterCache:
|
case FilterCache:
|
||||||
stats.filterCache = new FilterCacheStats();
|
stats.filterCache = new FilterCacheStats();
|
||||||
break;
|
break;
|
||||||
|
case Percolate:
|
||||||
|
stats.percolate = new PercolateStats();
|
||||||
|
break;
|
||||||
default:
|
default:
|
||||||
throw new IllegalStateException("Unknown Flag: " + flag);
|
throw new IllegalStateException("Unknown Flag: " + flag);
|
||||||
}
|
}
|
||||||
|
@ -292,6 +296,9 @@ public class InternalIndicesService extends AbstractLifecycleComponent<IndicesSe
|
||||||
case Warmer:
|
case Warmer:
|
||||||
stats.warmer.add(indexShard.warmerStats());
|
stats.warmer.add(indexShard.warmerStats());
|
||||||
break;
|
break;
|
||||||
|
case Percolate:
|
||||||
|
stats.percolate.add(indexShard.shardPercolateService().stats());
|
||||||
|
break;
|
||||||
default:
|
default:
|
||||||
throw new IllegalStateException("Unknown Flag: " + flag);
|
throw new IllegalStateException("Unknown Flag: " + flag);
|
||||||
}
|
}
|
||||||
|
|
|
@ -34,6 +34,7 @@ import org.elasticsearch.index.flush.FlushStats;
|
||||||
import org.elasticsearch.index.get.GetStats;
|
import org.elasticsearch.index.get.GetStats;
|
||||||
import org.elasticsearch.index.indexing.IndexingStats;
|
import org.elasticsearch.index.indexing.IndexingStats;
|
||||||
import org.elasticsearch.index.merge.MergeStats;
|
import org.elasticsearch.index.merge.MergeStats;
|
||||||
|
import org.elasticsearch.index.percolator.stats.PercolateStats;
|
||||||
import org.elasticsearch.index.refresh.RefreshStats;
|
import org.elasticsearch.index.refresh.RefreshStats;
|
||||||
import org.elasticsearch.index.search.stats.SearchStats;
|
import org.elasticsearch.index.search.stats.SearchStats;
|
||||||
import org.elasticsearch.index.shard.DocsStats;
|
import org.elasticsearch.index.shard.DocsStats;
|
||||||
|
@ -81,6 +82,11 @@ public class NodeIndicesStats implements Streamable, Serializable, ToXContent {
|
||||||
return stats.getSearch();
|
return stats.getSearch();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Nullable
|
||||||
|
public PercolateStats getPercolate() {
|
||||||
|
return stats.getPercolate();
|
||||||
|
}
|
||||||
|
|
||||||
@Nullable
|
@Nullable
|
||||||
public MergeStats getMerge() {
|
public MergeStats getMerge() {
|
||||||
return stats.getMerge();
|
return stats.getMerge();
|
||||||
|
|
|
@ -88,6 +88,9 @@ public class RestIndicesStatsAction extends BaseRestHandler {
|
||||||
controller.registerHandler(GET, "/{index}/_stats/fielddata", new RestFieldDataStatsHandler());
|
controller.registerHandler(GET, "/{index}/_stats/fielddata", new RestFieldDataStatsHandler());
|
||||||
controller.registerHandler(GET, "/_stats/fielddata/{fields}", new RestFieldDataStatsHandler());
|
controller.registerHandler(GET, "/_stats/fielddata/{fields}", new RestFieldDataStatsHandler());
|
||||||
controller.registerHandler(GET, "/{index}/_stats/fielddata/{fields}", new RestFieldDataStatsHandler());
|
controller.registerHandler(GET, "/{index}/_stats/fielddata/{fields}", new RestFieldDataStatsHandler());
|
||||||
|
|
||||||
|
controller.registerHandler(GET, "/_stats/percolate", new RestPercolateStatsHandler());
|
||||||
|
controller.registerHandler(GET, "/{index}/_stats/percolate", new RestPercolateStatsHandler());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -123,6 +126,7 @@ public class RestIndicesStatsAction extends BaseRestHandler {
|
||||||
indicesStatsRequest.idCache(request.paramAsBoolean("id_cache", indicesStatsRequest.idCache()));
|
indicesStatsRequest.idCache(request.paramAsBoolean("id_cache", indicesStatsRequest.idCache()));
|
||||||
indicesStatsRequest.fieldData(request.paramAsBoolean("fielddata", indicesStatsRequest.fieldData()));
|
indicesStatsRequest.fieldData(request.paramAsBoolean("fielddata", indicesStatsRequest.fieldData()));
|
||||||
indicesStatsRequest.fieldDataFields(request.paramAsStringArray("fields", null));
|
indicesStatsRequest.fieldDataFields(request.paramAsStringArray("fields", null));
|
||||||
|
indicesStatsRequest.percolate(request.paramAsBoolean("percolate", indicesStatsRequest.percolate()));
|
||||||
|
|
||||||
client.admin().indices().stats(indicesStatsRequest, new ActionListener<IndicesStatsResponse>() {
|
client.admin().indices().stats(indicesStatsRequest, new ActionListener<IndicesStatsResponse>() {
|
||||||
@Override
|
@Override
|
||||||
|
@ -618,4 +622,43 @@ public class RestIndicesStatsAction extends BaseRestHandler {
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
class RestPercolateStatsHandler implements RestHandler {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void handleRequest(final RestRequest request, final RestChannel channel) {
|
||||||
|
IndicesStatsRequest indicesStatsRequest = new IndicesStatsRequest();
|
||||||
|
indicesStatsRequest.listenerThreaded(false);
|
||||||
|
indicesStatsRequest.clear().percolate(true);
|
||||||
|
indicesStatsRequest.indices(splitIndices(request.param("index")));
|
||||||
|
indicesStatsRequest.types(splitTypes(request.param("types")));
|
||||||
|
|
||||||
|
client.admin().indices().stats(indicesStatsRequest, new ActionListener<IndicesStatsResponse>() {
|
||||||
|
@Override
|
||||||
|
public void onResponse(IndicesStatsResponse response) {
|
||||||
|
try {
|
||||||
|
XContentBuilder builder = RestXContentBuilder.restContentBuilder(request);
|
||||||
|
builder.startObject();
|
||||||
|
builder.field("ok", true);
|
||||||
|
buildBroadcastShardsHeader(builder, response);
|
||||||
|
response.toXContent(builder, request);
|
||||||
|
builder.endObject();
|
||||||
|
channel.sendResponse(new XContentRestResponse(request, OK, builder));
|
||||||
|
} catch (Throwable e) {
|
||||||
|
onFailure(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onFailure(Throwable e) {
|
||||||
|
try {
|
||||||
|
channel.sendResponse(new XContentThrowableRestResponse(request, e));
|
||||||
|
} catch (IOException e1) {
|
||||||
|
logger.error("Failed to send failure response", e1);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -19,6 +19,9 @@
|
||||||
|
|
||||||
package org.elasticsearch.test.integration.percolator;
|
package org.elasticsearch.test.integration.percolator;
|
||||||
|
|
||||||
|
import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
|
||||||
|
import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse;
|
||||||
|
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
|
||||||
import org.elasticsearch.action.count.CountResponse;
|
import org.elasticsearch.action.count.CountResponse;
|
||||||
import org.elasticsearch.action.percolate.PercolateResponse;
|
import org.elasticsearch.action.percolate.PercolateResponse;
|
||||||
import org.elasticsearch.action.search.SearchResponse;
|
import org.elasticsearch.action.search.SearchResponse;
|
||||||
|
@ -491,6 +494,62 @@ public class SimplePercolatorTests extends AbstractSharedClusterTest {
|
||||||
assertThat(convertFromTextArray(percolate.getMatches()), arrayContaining("kuku"));
|
assertThat(convertFromTextArray(percolate.getMatches()), arrayContaining("kuku"));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testPercolateStatistics() throws Exception {
|
||||||
|
client().admin().indices().prepareCreate("test").execute().actionGet();
|
||||||
|
ensureGreen();
|
||||||
|
|
||||||
|
logger.info("--> register a query");
|
||||||
|
client().prepareIndex("test", "_percolator", "1")
|
||||||
|
.setSource(jsonBuilder().startObject().field("query", matchAllQuery()).endObject())
|
||||||
|
.execute().actionGet();
|
||||||
|
client().admin().indices().prepareRefresh("test").execute().actionGet();
|
||||||
|
|
||||||
|
logger.info("--> First percolate request");
|
||||||
|
PercolateResponse response = client().preparePercolate("test", "type")
|
||||||
|
.setSource(jsonBuilder().startObject().startObject("doc").field("field", "val").endObject().endObject())
|
||||||
|
.execute().actionGet();
|
||||||
|
assertThat(response.getMatches(), arrayWithSize(1));
|
||||||
|
assertThat(convertFromTextArray(response.getMatches()), arrayContaining("1"));
|
||||||
|
|
||||||
|
IndicesStatsResponse indicesResponse = client().admin().indices().prepareStats("test").execute().actionGet();
|
||||||
|
assertThat(indicesResponse.getTotal().getPercolate().getCount(), equalTo(5l)); // We have 5 partitions
|
||||||
|
assertThat(indicesResponse.getTotal().getPercolate().getTimeInMillis(), greaterThan(0l));
|
||||||
|
assertThat(indicesResponse.getTotal().getPercolate().getCurrent(), equalTo(0l));
|
||||||
|
|
||||||
|
NodesStatsResponse nodesResponse = client().admin().cluster().prepareNodesStats().execute().actionGet();
|
||||||
|
long percolateCount = 0;
|
||||||
|
long percolateSumTime = 0;
|
||||||
|
for (NodeStats nodeStats : nodesResponse) {
|
||||||
|
percolateCount += nodeStats.getIndices().getPercolate().getCount();
|
||||||
|
percolateSumTime += nodeStats.getIndices().getPercolate().getTimeInMillis();
|
||||||
|
}
|
||||||
|
assertThat(percolateCount, equalTo(5l)); // We have 5 partitions
|
||||||
|
assertThat(percolateSumTime, greaterThan(0l));
|
||||||
|
|
||||||
|
logger.info("--> Second percolate request");
|
||||||
|
response = client().preparePercolate("test", "type")
|
||||||
|
.setSource(jsonBuilder().startObject().startObject("doc").field("field", "val").endObject().endObject())
|
||||||
|
.execute().actionGet();
|
||||||
|
assertThat(response.getMatches(), arrayWithSize(1));
|
||||||
|
assertThat(convertFromTextArray(response.getMatches()), arrayContaining("1"));
|
||||||
|
|
||||||
|
indicesResponse = client().admin().indices().prepareStats().setPercolate(true).execute().actionGet();
|
||||||
|
assertThat(indicesResponse.getTotal().getPercolate().getCount(), equalTo(10l));
|
||||||
|
assertThat(indicesResponse.getTotal().getPercolate().getTimeInMillis(), greaterThan(0l));
|
||||||
|
assertThat(indicesResponse.getTotal().getPercolate().getCurrent(), equalTo(0l));
|
||||||
|
|
||||||
|
nodesResponse = client().admin().cluster().prepareNodesStats().execute().actionGet();
|
||||||
|
percolateCount = 0;
|
||||||
|
percolateSumTime = 0;
|
||||||
|
for (NodeStats nodeStats : nodesResponse) {
|
||||||
|
percolateCount += nodeStats.getIndices().getPercolate().getCount();
|
||||||
|
percolateSumTime += nodeStats.getIndices().getPercolate().getTimeInMillis();
|
||||||
|
}
|
||||||
|
assertThat(percolateCount, equalTo(10l));
|
||||||
|
assertThat(percolateSumTime, greaterThan(0l));
|
||||||
|
}
|
||||||
|
|
||||||
public static String[] convertFromTextArray(Text[] texts) {
|
public static String[] convertFromTextArray(Text[] texts) {
|
||||||
if (texts.length == 0) {
|
if (texts.length == 0) {
|
||||||
return Strings.EMPTY_ARRAY;
|
return Strings.EMPTY_ARRAY;
|
||||||
|
|
Loading…
Reference in New Issue