Indices Status API: Add refresh stats, closes #811.

This commit is contained in:
kimchy 2011-03-29 17:54:00 +02:00
parent 2a032f8718
commit 3138269573
9 changed files with 224 additions and 31 deletions

View File

@ -22,6 +22,7 @@ package org.elasticsearch.action.admin.indices.status;
import org.elasticsearch.common.collect.Iterators; import org.elasticsearch.common.collect.Iterators;
import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.index.merge.MergeStats; import org.elasticsearch.index.merge.MergeStats;
import org.elasticsearch.index.refresh.RefreshStats;
import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardId;
import java.util.Iterator; import java.util.Iterator;
@ -181,6 +182,18 @@ public class IndexShardStatus implements Iterable<ShardStatus> {
return this.mergeStats(); return this.mergeStats();
} }
public RefreshStats refreshStats() {
RefreshStats refreshStats = new RefreshStats();
for (ShardStatus shard : shards) {
refreshStats.add(shard.refreshStats());
}
return refreshStats;
}
public RefreshStats getRefreshStats() {
return refreshStats();
}
@Override public Iterator<ShardStatus> iterator() { @Override public Iterator<ShardStatus> iterator() {
return Iterators.forArray(shards); return Iterators.forArray(shards);
} }

View File

@ -22,6 +22,7 @@ package org.elasticsearch.action.admin.indices.status;
import org.elasticsearch.common.collect.Maps; import org.elasticsearch.common.collect.Maps;
import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.index.merge.MergeStats; import org.elasticsearch.index.merge.MergeStats;
import org.elasticsearch.index.refresh.RefreshStats;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
@ -189,6 +190,18 @@ public class IndexStatus implements Iterable<IndexShardStatus> {
return this.mergeStats(); return this.mergeStats();
} }
public RefreshStats refreshStats() {
RefreshStats refreshStats = new RefreshStats();
for (IndexShardStatus shard : this) {
refreshStats.add(shard.refreshStats());
}
return refreshStats;
}
public RefreshStats getRefreshStats() {
return refreshStats();
}
@Override public Iterator<IndexShardStatus> iterator() { @Override public Iterator<IndexShardStatus> iterator() {
return indexShards.values().iterator(); return indexShards.values().iterator();
} }

View File

@ -31,6 +31,7 @@ import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentBuilderString; import org.elasticsearch.common.xcontent.XContentBuilderString;
import org.elasticsearch.index.merge.MergeStats; import org.elasticsearch.index.merge.MergeStats;
import org.elasticsearch.index.refresh.RefreshStats;
import java.io.IOException; import java.io.IOException;
import java.util.List; import java.util.List;
@ -153,6 +154,10 @@ public class IndicesStatusResponse extends BroadcastOperationResponse implements
if (mergeStats != null) { if (mergeStats != null) {
mergeStats.toXContent(builder, params); mergeStats.toXContent(builder, params);
} }
RefreshStats refreshStats = indexStatus.refreshStats();
if (refreshStats != null) {
refreshStats.toXContent(builder, params);
}
builder.startObject(Fields.SHARDS); builder.startObject(Fields.SHARDS);
for (IndexShardStatus indexShardStatus : indexStatus) { for (IndexShardStatus indexShardStatus : indexStatus) {
@ -196,6 +201,11 @@ public class IndicesStatusResponse extends BroadcastOperationResponse implements
mergeStats.toXContent(builder, params); mergeStats.toXContent(builder, params);
} }
refreshStats = shardStatus.refreshStats();
if (refreshStats != null) {
refreshStats.toXContent(builder, params);
}
if (shardStatus.peerRecoveryStatus() != null) { if (shardStatus.peerRecoveryStatus() != null) {
PeerRecoveryStatus peerRecoveryStatus = shardStatus.peerRecoveryStatus(); PeerRecoveryStatus peerRecoveryStatus = shardStatus.peerRecoveryStatus();
builder.startObject(Fields.PEER_RECOVERY); builder.startObject(Fields.PEER_RECOVERY);

View File

@ -25,6 +25,7 @@ import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.index.merge.MergeStats; import org.elasticsearch.index.merge.MergeStats;
import org.elasticsearch.index.refresh.RefreshStats;
import org.elasticsearch.index.shard.IndexShardState; import org.elasticsearch.index.shard.IndexShardState;
import java.io.IOException; import java.io.IOException;
@ -53,6 +54,8 @@ public class ShardStatus extends BroadcastShardOperationResponse {
MergeStats mergeStats; MergeStats mergeStats;
RefreshStats refreshStats;
PeerRecoveryStatus peerRecoveryStatus; PeerRecoveryStatus peerRecoveryStatus;
GatewayRecoveryStatus gatewayRecoveryStatus; GatewayRecoveryStatus gatewayRecoveryStatus;
@ -165,6 +168,20 @@ public class ShardStatus extends BroadcastShardOperationResponse {
return this.mergeStats; return this.mergeStats;
} }
/**
* Refresh stats.
*/
public RefreshStats refreshStats() {
return this.refreshStats;
}
/**
* Refresh stats.
*/
public RefreshStats getRefreshStats() {
return refreshStats();
}
/** /**
* Peer recovery status (<tt>null</tt> if not applicable). Both real time if an on going recovery * Peer recovery status (<tt>null</tt> if not applicable). Both real time if an on going recovery
* is in progress and summary once it is done. * is in progress and summary once it is done.
@ -280,6 +297,12 @@ public class ShardStatus extends BroadcastShardOperationResponse {
out.writeBoolean(true); out.writeBoolean(true);
mergeStats.writeTo(out); mergeStats.writeTo(out);
} }
if (refreshStats == null) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
refreshStats.writeTo(out);
}
} }
@Override public void readFrom(StreamInput in) throws IOException { @Override public void readFrom(StreamInput in) throws IOException {
@ -315,5 +338,8 @@ public class ShardStatus extends BroadcastShardOperationResponse {
if (in.readBoolean()) { if (in.readBoolean()) {
mergeStats = MergeStats.readMergeStats(in); mergeStats = MergeStats.readMergeStats(in);
} }
if (in.readBoolean()) {
refreshStats = RefreshStats.readRefreshStats(in);
}
} }
} }

View File

@ -170,6 +170,7 @@ public class TransportIndicesStatusAction extends TransportBroadcastOperationAct
} }
shardStatus.mergeStats = indexShard.mergeScheduler().stats(); shardStatus.mergeStats = indexShard.mergeScheduler().stats();
shardStatus.refreshStats = indexShard.refreshStats();
} }
if (request.recovery) { if (request.recovery) {

View File

@ -34,60 +34,63 @@ import java.io.IOException;
*/ */
public class MergeStats implements Streamable, ToXContent { public class MergeStats implements Streamable, ToXContent {
private long totalMerges; private long total;
private long currentMerges; private long current;
private long totalMergeTime; private long totalTimeInMillis;
public MergeStats() { public MergeStats() {
} }
public MergeStats(long totalMerges, long currentMerges, long totalMergeTime) { public MergeStats(long total, long current, long totalTimeInMillis) {
this.totalMerges = totalMerges; this.total = total;
this.currentMerges = currentMerges; this.current = current;
this.totalMergeTime = totalMergeTime; this.totalTimeInMillis = totalTimeInMillis;
} }
public void add(long totalMerges, long currentMerges, long totalMergeTime) { public void add(long totalMerges, long currentMerges, long totalMergeTime) {
this.totalMerges += totalMerges; this.total += totalMerges;
this.currentMerges += currentMerges; this.current += currentMerges;
this.totalMergeTime += totalMergeTime; this.totalTimeInMillis += totalMergeTime;
} }
public void add(MergeStats mergeStats) { public void add(MergeStats mergeStats) {
this.totalMerges += mergeStats.totalMerges; if (mergeStats == null) {
this.currentMerges += mergeStats.currentMerges; return;
this.totalMergeTime += mergeStats.totalMergeTime; }
this.total += mergeStats.total;
this.current += mergeStats.current;
this.totalTimeInMillis += mergeStats.totalTimeInMillis;
} }
/** /**
* The total number of merges executed. * The total number of merges executed.
*/ */
public long totalMerges() { public long total() {
return this.totalMerges; return this.total;
} }
/** /**
* The current number of merges executing. * The current number of merges executing.
*/ */
public long currentMerges() { public long current() {
return this.currentMerges; return this.current;
} }
/** /**
* The total time merges have been executed (in milliseconds). * The total time merges have been executed (in milliseconds).
*/ */
public long totalMergeTimeInMillis() { public long totalTimeInMillis() {
return this.totalMergeTime; return this.totalTimeInMillis;
} }
/** /**
* The total time merges have been executed. * The total time merges have been executed.
*/ */
public TimeValue totalMergeTime() { public TimeValue totalTime() {
return new TimeValue(totalMergeTime); return new TimeValue(totalTimeInMillis);
} }
public static MergeStats readMergeStats(StreamInput in) throws IOException { public static MergeStats readMergeStats(StreamInput in) throws IOException {
@ -98,10 +101,10 @@ public class MergeStats implements Streamable, ToXContent {
@Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject(Fields.MERGES); builder.startObject(Fields.MERGES);
builder.field(Fields.CURRENT, currentMerges); builder.field(Fields.CURRENT, current);
builder.field(Fields.TOTAL, totalMerges); builder.field(Fields.TOTAL, total);
builder.field(Fields.TOTAL_TIME, totalMergeTime().toString()); builder.field(Fields.TOTAL_TIME, totalTime().toString());
builder.field(Fields.TOTAL_TIME_IN_MILLIS, totalMergeTime); builder.field(Fields.TOTAL_TIME_IN_MILLIS, totalTimeInMillis);
builder.endObject(); builder.endObject();
return builder; return builder;
} }
@ -115,14 +118,14 @@ public class MergeStats implements Streamable, ToXContent {
} }
@Override public void readFrom(StreamInput in) throws IOException { @Override public void readFrom(StreamInput in) throws IOException {
totalMerges = in.readVLong(); total = in.readVLong();
currentMerges = in.readVLong(); current = in.readVLong();
totalMergeTime = in.readVLong(); totalTimeInMillis = in.readVLong();
} }
@Override public void writeTo(StreamOutput out) throws IOException { @Override public void writeTo(StreamOutput out) throws IOException {
out.writeVLong(totalMerges); out.writeVLong(total);
out.writeVLong(currentMerges); out.writeVLong(current);
out.writeVLong(totalMergeTime); out.writeVLong(totalTimeInMillis);
} }
} }

View File

@ -0,0 +1,112 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.refresh;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentBuilderString;
import java.io.IOException;
public class RefreshStats implements Streamable, ToXContent {
private long total;
private long totalTimeInMillis;
public RefreshStats() {
}
public RefreshStats(long total, long totalTimeInMillis) {
this.total = total;
this.totalTimeInMillis = totalTimeInMillis;
}
public void add(long total, long totalTimeInMillis) {
this.total += total;
this.totalTimeInMillis += totalTimeInMillis;
}
public void add(RefreshStats refreshStats) {
if (refreshStats == null) {
return;
}
this.total += refreshStats.total;
this.totalTimeInMillis += refreshStats.totalTimeInMillis;
}
/**
* The total number of refresh executed.
*/
public long total() {
return this.total;
}
/**
* The total time merges have been executed (in milliseconds).
*/
public long totalTimeInMillis() {
return this.totalTimeInMillis;
}
/**
* The total time merges have been executed.
*/
public TimeValue totalTime() {
return new TimeValue(totalTimeInMillis);
}
public static RefreshStats readRefreshStats(StreamInput in) throws IOException {
RefreshStats refreshStats = new RefreshStats();
refreshStats.readFrom(in);
return refreshStats;
}
@Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject(Fields.REFRESH);
builder.field(Fields.TOTAL, total);
builder.field(Fields.TOTAL_TIME, totalTime().toString());
builder.field(Fields.TOTAL_TIME_IN_MILLIS, totalTimeInMillis);
builder.endObject();
return builder;
}
static final class Fields {
static final XContentBuilderString REFRESH = new XContentBuilderString("refresh");
static final XContentBuilderString TOTAL = new XContentBuilderString("total");
static final XContentBuilderString TOTAL_TIME = new XContentBuilderString("total_time");
static final XContentBuilderString TOTAL_TIME_IN_MILLIS = new XContentBuilderString("total_time_in_millis");
}
@Override public void readFrom(StreamInput in) throws IOException {
total = in.readVLong();
totalTimeInMillis = in.readVLong();
}
@Override public void writeTo(StreamOutput out) throws IOException {
out.writeVLong(total);
out.writeVLong(totalTimeInMillis);
}
}

View File

@ -28,6 +28,7 @@ import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.EngineException; import org.elasticsearch.index.engine.EngineException;
import org.elasticsearch.index.mapper.ParsedDocument; import org.elasticsearch.index.mapper.ParsedDocument;
import org.elasticsearch.index.mapper.SourceToParse; import org.elasticsearch.index.mapper.SourceToParse;
import org.elasticsearch.index.refresh.RefreshStats;
import org.elasticsearch.index.shard.IndexShardComponent; import org.elasticsearch.index.shard.IndexShardComponent;
import org.elasticsearch.index.shard.IndexShardState; import org.elasticsearch.index.shard.IndexShardState;
@ -43,6 +44,8 @@ public interface IndexShard extends IndexShardComponent {
ShardRouting routingEntry(); ShardRouting routingEntry();
RefreshStats refreshStats();
IndexShardState state(); IndexShardState state();
/** /**

View File

@ -45,6 +45,7 @@ import org.elasticsearch.index.merge.scheduler.MergeSchedulerProvider;
import org.elasticsearch.index.query.IndexQueryParser; import org.elasticsearch.index.query.IndexQueryParser;
import org.elasticsearch.index.query.IndexQueryParserMissingException; import org.elasticsearch.index.query.IndexQueryParserMissingException;
import org.elasticsearch.index.query.IndexQueryParserService; import org.elasticsearch.index.query.IndexQueryParserService;
import org.elasticsearch.index.refresh.RefreshStats;
import org.elasticsearch.index.settings.IndexSettings; import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.settings.IndexSettingsService; import org.elasticsearch.index.settings.IndexSettingsService;
import org.elasticsearch.index.shard.*; import org.elasticsearch.index.shard.*;
@ -60,6 +61,7 @@ import java.io.PrintStream;
import java.nio.channels.ClosedByInterruptException; import java.nio.channels.ClosedByInterruptException;
import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicLong;
import static org.elasticsearch.index.mapper.SourceToParse.*; import static org.elasticsearch.index.mapper.SourceToParse.*;
@ -111,6 +113,9 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
private ApplyRefreshSettings applyRefreshSettings = new ApplyRefreshSettings(); private ApplyRefreshSettings applyRefreshSettings = new ApplyRefreshSettings();
private final AtomicLong totalRefresh = new AtomicLong();
private final AtomicLong totalRefreshTime = new AtomicLong();
@Inject public InternalIndexShard(ShardId shardId, @IndexSettings Settings indexSettings, IndexSettingsService indexSettingsService, IndicesLifecycle indicesLifecycle, Store store, Engine engine, MergeSchedulerProvider mergeScheduler, Translog translog, @Inject public InternalIndexShard(ShardId shardId, @IndexSettings Settings indexSettings, IndexSettingsService indexSettingsService, IndicesLifecycle indicesLifecycle, Store store, Engine engine, MergeSchedulerProvider mergeScheduler, Translog translog,
ThreadPool threadPool, MapperService mapperService, IndexQueryParserService queryParserService, IndexCache indexCache) { ThreadPool threadPool, MapperService mapperService, IndexQueryParserService queryParserService, IndexCache indexCache) {
super(shardId, indexSettings); super(shardId, indexSettings);
@ -403,7 +408,14 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
if (logger.isTraceEnabled()) { if (logger.isTraceEnabled()) {
logger.trace("refresh with {}", refresh); logger.trace("refresh with {}", refresh);
} }
long time = System.currentTimeMillis();
engine.refresh(refresh); engine.refresh(refresh);
totalRefresh.incrementAndGet();
totalRefreshTime.addAndGet(System.currentTimeMillis() - time);
}
@Override public RefreshStats refreshStats() {
return new RefreshStats(totalRefresh.get(), totalRefreshTime.get());
} }
@Override public void flush(Engine.Flush flush) throws ElasticSearchException { @Override public void flush(Engine.Flush flush) throws ElasticSearchException {