Index Stats: Add support for segments stats

closes #4101
This commit is contained in:
Shay Banon 2013-11-06 00:06:16 +01:00 committed by Boaz Leskes
parent e1b6988886
commit c95c7096e5
14 changed files with 271 additions and 7 deletions

View File

@ -28,6 +28,7 @@ import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.cache.filter.FilterCacheStats;
import org.elasticsearch.index.cache.id.IdCacheStats;
import org.elasticsearch.index.engine.SegmentsStats;
import org.elasticsearch.index.fielddata.FieldDataStats;
import org.elasticsearch.index.flush.FlushStats;
import org.elasticsearch.index.get.GetStats;
@ -96,6 +97,9 @@ public class CommonStats implements Streamable, ToXContent {
case Completion:
completion = new CompletionStats();
break;
case Segments:
segments = new SegmentsStats();
break;
case Percolate:
percolate = new PercolateStats();
break;
@ -150,6 +154,9 @@ public class CommonStats implements Streamable, ToXContent {
case Completion:
completion = indexShard.completionStats(flags.completionDataFields());
break;
case Segments:
segments = indexShard.segmentStats();
break;
case Percolate:
percolate = indexShard.shardPercolateService().stats();
break;
@ -201,6 +208,9 @@ public class CommonStats implements Streamable, ToXContent {
@Nullable
public CompletionStats completion;
@Nullable
public SegmentsStats segments;
public void add(CommonStats stats) {
if (docs == null) {
if (stats.getDocs() != null) {
@ -316,6 +326,14 @@ public class CommonStats implements Streamable, ToXContent {
} else {
completion.add(stats.getCompletion());
}
if (segments == null) {
if (stats.getSegments() != null) {
segments = new SegmentsStats();
segments.add(stats.getSegments());
}
} else {
segments.add(stats.getSegments());
}
}
@Nullable
@ -388,6 +406,11 @@ public class CommonStats implements Streamable, ToXContent {
return completion;
}
@Nullable
public SegmentsStats getSegments() {
return segments;
}
public static CommonStats readCommonStats(StreamInput in) throws IOException {
CommonStats stats = new CommonStats();
stats.readFrom(in);
@ -440,6 +463,11 @@ public class CommonStats implements Streamable, ToXContent {
completion = CompletionStats.readCompletionStats(in);
}
}
if (in.getVersion().after(Version.V_0_90_6)) {
if (in.readBoolean()) {
segments = SegmentsStats.readSegmentsStats(in);
}
}
}
@Override
@ -530,6 +558,14 @@ public class CommonStats implements Streamable, ToXContent {
completion.writeTo(out);
}
}
if (out.getVersion().after(Version.V_0_90_6)) {
if (segments == null) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
segments.writeTo(out);
}
}
}
// note, requires a wrapping object
@ -577,6 +613,9 @@ public class CommonStats implements Streamable, ToXContent {
if (completion != null) {
completion.toXContent(builder, params);
}
if (segments != null) {
segments.toXContent(builder, params);
}
return builder;
}
}

View File

@ -211,7 +211,8 @@ public class CommonStatsFlags implements Streamable, Cloneable {
Docs("docs"),
Warmer("warmer"),
Percolate("percolate"),
Completion("completion");
Completion("completion"),
Segments("segments");
private final String restName;

View File

@ -203,6 +203,15 @@ public class IndicesStatsRequest extends BroadcastOperationRequest<IndicesStatsR
return flags.isSet(Flag.Percolate);
}
public IndicesStatsRequest segments(boolean segments) {
flags.set(Flag.Segments, segments);
return this;
}
public boolean segments() {
return flags.isSet(Flag.Segments);
}
public IndicesStatsRequest fieldDataFields(String... fieldDataFields) {
flags.fieldDataFields(fieldDataFields);
return this;
@ -221,7 +230,7 @@ public class IndicesStatsRequest extends BroadcastOperationRequest<IndicesStatsR
return flags.isSet(Flag.Completion);
}
public IndicesStatsRequest completionFields(String ... completionDataFields) {
public IndicesStatsRequest completionFields(String... completionDataFields) {
flags.completionDataFields(completionDataFields);
return this;
}

View File

@ -139,6 +139,11 @@ public class IndicesStatsRequestBuilder extends BroadcastOperationRequestBuilder
return this;
}
public IndicesStatsRequestBuilder setSegments(boolean segments) {
request.segments(segments);
return this;
}
public IndicesStatsRequestBuilder setCompletion(boolean completion) {
request.completion(completion);
return this;

View File

@ -183,6 +183,9 @@ public class TransportIndicesStatsAction extends TransportBroadcastOperationActi
if (request.request.percolate()) {
flags.set(CommonStatsFlags.Flag.Percolate);
}
if (request.request.segments()) {
flags.set(CommonStatsFlags.Flag.Segments);
}
if (request.request.completion()) {
flags.set(CommonStatsFlags.Flag.Completion);
flags.completionDataFields(request.request.completionFields());

View File

@ -90,6 +90,14 @@ public interface Engine extends IndexShardComponent, CloseableComponent {
*/
Searcher acquireSearcher(String source) throws EngineException;
/**
* Global stats on segments.
*/
SegmentsStats segmentsStats();
/**
* The list of segments in the engine.
*/
List<Segment> segments();
/**

View File

@ -0,0 +1,88 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.engine;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentBuilderString;
import java.io.IOException;
/**
*
*/
public class SegmentsStats implements Streamable, ToXContent {
private long count;
public SegmentsStats() {
}
public void add(long count) {
this.count += count;
}
public void add(SegmentsStats mergeStats) {
if (mergeStats == null) {
return;
}
this.count += mergeStats.count;
}
/**
* The the segments count.
*/
public long getCount() {
return this.count;
}
public static SegmentsStats readSegmentsStats(StreamInput in) throws IOException {
SegmentsStats stats = new SegmentsStats();
stats.readFrom(in);
return stats;
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject(Fields.SEGMENTS);
builder.field(Fields.COUNT, count);
builder.endObject();
return builder;
}
static final class Fields {
static final XContentBuilderString SEGMENTS = new XContentBuilderString("segments");
static final XContentBuilderString COUNT = new XContentBuilderString("count");
}
@Override
public void readFrom(StreamInput in) throws IOException {
count = in.readVLong();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVLong(count);
}
}

View File

@ -1118,6 +1118,24 @@ public class RobinEngine extends AbstractIndexShardComponent implements Engine {
}
}
@Override
public SegmentsStats segmentsStats() {
rwl.readLock().lock();
try {
ensureOpen();
Searcher searcher = acquireSearcher("segments_stats");
try {
SegmentsStats stats = new SegmentsStats();
stats.add(searcher.reader().leaves().size());
return stats;
} finally {
searcher.release();
}
} finally {
rwl.readLock().unlock();
}
}
@Override
public List<Segment> segments() {
rwl.readLock().lock();

View File

@ -29,6 +29,7 @@ import org.elasticsearch.index.cache.id.IdCacheStats;
import org.elasticsearch.index.cache.id.ShardIdCache;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.EngineException;
import org.elasticsearch.index.engine.SegmentsStats;
import org.elasticsearch.index.fielddata.FieldDataStats;
import org.elasticsearch.index.fielddata.IndexFieldDataService;
import org.elasticsearch.index.fielddata.ShardFieldData;
@ -89,6 +90,8 @@ public interface IndexShard extends IndexShardComponent {
MergeStats mergeStats();
SegmentsStats segmentStats();
RefreshStats refreshStats();
FlushStats flushStats();

View File

@ -520,6 +520,11 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
return mergeScheduler.stats();
}
@Override
public SegmentsStats segmentStats() {
return engine.segmentsStats();
}
@Override
public WarmerStats warmerStats() {
return shardWarmerService.stats();

View File

@ -29,6 +29,7 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentBuilderString;
import org.elasticsearch.index.cache.filter.FilterCacheStats;
import org.elasticsearch.index.cache.id.IdCacheStats;
import org.elasticsearch.index.engine.SegmentsStats;
import org.elasticsearch.index.fielddata.FieldDataStats;
import org.elasticsearch.index.flush.FlushStats;
import org.elasticsearch.index.get.GetStats;
@ -39,6 +40,7 @@ import org.elasticsearch.index.refresh.RefreshStats;
import org.elasticsearch.index.search.stats.SearchStats;
import org.elasticsearch.index.shard.DocsStats;
import org.elasticsearch.index.store.StoreStats;
import org.elasticsearch.search.suggest.completion.CompletionStats;
import java.io.IOException;
import java.io.Serializable;
@ -117,6 +119,16 @@ public class NodeIndicesStats implements Streamable, Serializable, ToXContent {
return stats.getIdCache();
}
@Nullable
public CompletionStats getCompletion() {
return stats.getCompletion();
}
@Nullable
public SegmentsStats getSegments() {
return stats.getSegments();
}
public static NodeIndicesStats readIndicesStats(StreamInput in) throws IOException {
NodeIndicesStats stats = new NodeIndicesStats();
stats.readFrom(in);

View File

@ -96,6 +96,9 @@ public class RestIndicesStatsAction extends BaseRestHandler {
controller.registerHandler(GET, "/_stats/percolate", new RestPercolateStatsHandler());
controller.registerHandler(GET, "/{index}/_stats/percolate", new RestPercolateStatsHandler());
controller.registerHandler(GET, "/_stats/segments", new RestSegmentsStatsHandler());
controller.registerHandler(GET, "/{index}/_stats/segments", new RestSegmentsStatsHandler());
}
@Override
@ -120,7 +123,7 @@ public class RestIndicesStatsAction extends BaseRestHandler {
}
/* We use "fields" as the default field list for stats that support field inclusion filters and further down
* a more specific list of fields that overrides this list.*/
final String[] defaultIncludedFields = request.paramAsStringArray("fields", null);
final String[] defaultIncludedFields = request.paramAsStringArray("fields", null);
indicesStatsRequest.docs(request.paramAsBoolean("docs", indicesStatsRequest.docs()));
indicesStatsRequest.store(request.paramAsBoolean("store", indicesStatsRequest.store()));
indicesStatsRequest.indexing(request.paramAsBoolean("indexing", indicesStatsRequest.indexing()));
@ -135,6 +138,7 @@ public class RestIndicesStatsAction extends BaseRestHandler {
indicesStatsRequest.fieldData(request.paramAsBoolean("fielddata", indicesStatsRequest.fieldData()));
indicesStatsRequest.fieldDataFields(request.paramAsStringArray("fielddata_fields", defaultIncludedFields));
indicesStatsRequest.percolate(request.paramAsBoolean("percolate", indicesStatsRequest.percolate()));
indicesStatsRequest.segments(request.paramAsBoolean("segments", indicesStatsRequest.segments()));
indicesStatsRequest.completion(request.paramAsBoolean("completion", indicesStatsRequest.completion()));
indicesStatsRequest.completionFields(request.paramAsStringArray("completion_fields", defaultIncludedFields));
@ -710,4 +714,40 @@ public class RestIndicesStatsAction extends BaseRestHandler {
}
}
class RestSegmentsStatsHandler implements RestHandler {
@Override
public void handleRequest(final RestRequest request, final RestChannel channel) {
IndicesStatsRequest indicesStatsRequest = new IndicesStatsRequest();
indicesStatsRequest.listenerThreaded(false);
indicesStatsRequest.clear().segments(true);
indicesStatsRequest.indices(Strings.splitStringByCommaToArray(request.param("index")));
client.admin().indices().stats(indicesStatsRequest, new ActionListener<IndicesStatsResponse>() {
@Override
public void onResponse(IndicesStatsResponse response) {
try {
XContentBuilder builder = RestXContentBuilder.restContentBuilder(request);
builder.startObject();
builder.field("ok", true);
buildBroadcastShardsHeader(builder, response);
response.toXContent(builder, request);
builder.endObject();
channel.sendResponse(new XContentRestResponse(request, OK, builder));
} catch (Throwable e) {
onFailure(e);
}
}
@Override
public void onFailure(Throwable e) {
try {
channel.sendResponse(new XContentThrowableRestResponse(request, e));
} catch (IOException e1) {
logger.error("Failed to send failure response", e1);
}
}
});
}
}
}

View File

@ -206,6 +206,7 @@ public class RobinEngineTests extends ElasticsearchTestCase {
public void testSegments() throws Exception {
List<Segment> segments = engine.segments();
assertThat(segments.isEmpty(), equalTo(true));
assertThat(engine.segmentsStats().getCount(), equalTo(0l));
final boolean defaultCompound = defaultSettings.getAsBoolean(RobinEngine.INDEX_COMPOUND_ON_FLUSH, true);
// create a doc and refresh
@ -218,6 +219,7 @@ public class RobinEngineTests extends ElasticsearchTestCase {
segments = engine.segments();
assertThat(segments.size(), equalTo(1));
assertThat(engine.segmentsStats().getCount(), equalTo(1l));
assertThat(segments.get(0).isCommitted(), equalTo(false));
assertThat(segments.get(0).isSearch(), equalTo(true));
assertThat(segments.get(0).getNumDocs(), equalTo(2));
@ -228,6 +230,7 @@ public class RobinEngineTests extends ElasticsearchTestCase {
segments = engine.segments();
assertThat(segments.size(), equalTo(1));
assertThat(engine.segmentsStats().getCount(), equalTo(1l));
assertThat(segments.get(0).isCommitted(), equalTo(true));
assertThat(segments.get(0).isSearch(), equalTo(true));
assertThat(segments.get(0).getNumDocs(), equalTo(2));
@ -242,6 +245,7 @@ public class RobinEngineTests extends ElasticsearchTestCase {
segments = engine.segments();
assertThat(segments.size(), equalTo(2));
assertThat(engine.segmentsStats().getCount(), equalTo(2l));
assertThat(segments.get(0).getGeneration() < segments.get(1).getGeneration(), equalTo(true));
assertThat(segments.get(0).isCommitted(), equalTo(true));
assertThat(segments.get(0).isSearch(), equalTo(true));
@ -262,6 +266,7 @@ public class RobinEngineTests extends ElasticsearchTestCase {
segments = engine.segments();
assertThat(segments.size(), equalTo(2));
assertThat(engine.segmentsStats().getCount(), equalTo(2l));
assertThat(segments.get(0).getGeneration() < segments.get(1).getGeneration(), equalTo(true));
assertThat(segments.get(0).isCommitted(), equalTo(true));
assertThat(segments.get(0).isSearch(), equalTo(true));
@ -282,6 +287,7 @@ public class RobinEngineTests extends ElasticsearchTestCase {
segments = engine.segments();
assertThat(segments.size(), equalTo(3));
assertThat(engine.segmentsStats().getCount(), equalTo(3l));
assertThat(segments.get(0).getGeneration() < segments.get(1).getGeneration(), equalTo(true));
assertThat(segments.get(0).isCommitted(), equalTo(true));
assertThat(segments.get(0).isSearch(), equalTo(true));

View File

@ -20,8 +20,11 @@
package org.elasticsearch.indices.stats;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.indices.stats.*;
import org.elasticsearch.action.admin.indices.stats.CommonStats;
import org.elasticsearch.action.admin.indices.stats.CommonStatsFlags;
import org.elasticsearch.action.admin.indices.stats.CommonStatsFlags.Flag;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequestBuilder;
import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.bytes.BytesReference;
@ -41,7 +44,7 @@ import static org.hamcrest.Matchers.*;
/**
*
*/
@ClusterScope(scope=Scope.SUITE, numNodes=2)
@ClusterScope(scope = Scope.SUITE, numNodes = 2)
public class SimpleIndexStatsTests extends AbstractIntegrationTest {
@Test
@ -187,13 +190,32 @@ public class SimpleIndexStatsTests extends AbstractIntegrationTest {
assertThat(stats.getTotal().getMerge().getTotal(), greaterThan(0l));
}
@Test
public void testSegmentsStasts() {
prepareCreate("test1", 2).setSettings("index.number_of_shards", 5, "index.number_of_replicas", 1).get();
ClusterHealthResponse clusterHealthResponse = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().get();
assertThat(clusterHealthResponse.isTimedOut(), equalTo(false));
for (int i = 0; i < 20; i++) {
index("test1", "type1", Integer.toString(i), "field", "value");
index("test1", "type2", Integer.toString(i), "field", "value");
client().admin().indices().prepareFlush().get();
}
client().admin().indices().prepareOptimize().setWaitForMerge(true).setMaxNumSegments(1).execute().actionGet();
IndicesStatsResponse stats = client().admin().indices().prepareStats().setSegments(true).get();
assertThat(stats.getTotal().getSegments(), notNullValue());
assertThat(stats.getTotal().getSegments().getCount(), equalTo(10l));
}
@Test
public void testAllFlags() throws Exception {
// rely on 1 replica for this tests
createIndex("test1");
createIndex("test2");
ClusterHealthResponse clusterHealthResponse = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().execute().actionGet();
ClusterHealthResponse clusterHealthResponse = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().get();
assertThat(clusterHealthResponse.isTimedOut(), equalTo(false));
client().prepareIndex("test1", "type1", Integer.toString(1)).setSource("field", "value").execute().actionGet();
@ -297,7 +319,7 @@ public class SimpleIndexStatsTests extends AbstractIntegrationTest {
@Test
public void testFlagOrdinalOrder() {
Flag[] flags = new Flag[]{Flag.Store, Flag.Indexing, Flag.Get, Flag.Search, Flag.Merge, Flag.Flush, Flag.Refresh,
Flag.FilterCache, Flag.IdCache, Flag.FieldData, Flag.Docs, Flag.Warmer, Flag.Percolate, Flag.Completion};
Flag.FilterCache, Flag.IdCache, Flag.FieldData, Flag.Docs, Flag.Warmer, Flag.Percolate, Flag.Completion, Flag.Segments};
assertThat(flags.length, equalTo(Flag.values().length));
for (int i = 0; i < flags.length; i++) {
@ -349,6 +371,9 @@ public class SimpleIndexStatsTests extends AbstractIntegrationTest {
case Completion:
builder.setCompletion(set);
break;
case Segments:
builder.setSegments(set);
break;
default:
assert false : "new flag? " + flag;
break;
@ -385,6 +410,8 @@ public class SimpleIndexStatsTests extends AbstractIntegrationTest {
return response.getPercolate() != null;
case Completion:
return response.getCompletion() != null;
case Segments:
return response.getSegments() != null;
default:
assert false : "new flag? " + flag;
return false;