add flush stats

This commit is contained in:
Shay Banon 2011-08-21 04:54:27 +03:00
parent 6fd6965bdf
commit 67e161f710
10 changed files with 204 additions and 6 deletions

View File

@ -21,6 +21,7 @@ package org.elasticsearch.action.admin.indices.status;
import org.elasticsearch.common.collect.Iterators; import org.elasticsearch.common.collect.Iterators;
import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.index.flush.FlushStats;
import org.elasticsearch.index.merge.MergeStats; import org.elasticsearch.index.merge.MergeStats;
import org.elasticsearch.index.refresh.RefreshStats; import org.elasticsearch.index.refresh.RefreshStats;
import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardId;
@ -194,6 +195,18 @@ public class IndexShardStatus implements Iterable<ShardStatus> {
return refreshStats(); return refreshStats();
} }
public FlushStats flushStats() {
FlushStats flushStats = new FlushStats();
for (ShardStatus shard : shards) {
flushStats.add(shard.flushStats);
}
return flushStats;
}
public FlushStats getFlushStats() {
return flushStats();
}
@Override public Iterator<ShardStatus> iterator() { @Override public Iterator<ShardStatus> iterator() {
return Iterators.forArray(shards); return Iterators.forArray(shards);
} }

View File

@ -21,6 +21,7 @@ package org.elasticsearch.action.admin.indices.status;
import org.elasticsearch.common.collect.Maps; import org.elasticsearch.common.collect.Maps;
import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.index.flush.FlushStats;
import org.elasticsearch.index.merge.MergeStats; import org.elasticsearch.index.merge.MergeStats;
import org.elasticsearch.index.refresh.RefreshStats; import org.elasticsearch.index.refresh.RefreshStats;
@ -202,6 +203,18 @@ public class IndexStatus implements Iterable<IndexShardStatus> {
return refreshStats(); return refreshStats();
} }
public FlushStats flushStats() {
FlushStats flushStats = new FlushStats();
for (IndexShardStatus shard : this) {
flushStats.add(shard.flushStats());
}
return flushStats;
}
public FlushStats getFlushStats() {
return flushStats();
}
@Override public Iterator<IndexShardStatus> iterator() { @Override public Iterator<IndexShardStatus> iterator() {
return indexShards.values().iterator(); return indexShards.values().iterator();
} }

View File

@ -30,6 +30,7 @@ import org.elasticsearch.common.settings.SettingsFilter;
import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentBuilderString; import org.elasticsearch.common.xcontent.XContentBuilderString;
import org.elasticsearch.index.flush.FlushStats;
import org.elasticsearch.index.merge.MergeStats; import org.elasticsearch.index.merge.MergeStats;
import org.elasticsearch.index.refresh.RefreshStats; import org.elasticsearch.index.refresh.RefreshStats;
@ -158,6 +159,10 @@ public class IndicesStatusResponse extends BroadcastOperationResponse implements
if (refreshStats != null) { if (refreshStats != null) {
refreshStats.toXContent(builder, params); refreshStats.toXContent(builder, params);
} }
FlushStats flushStats = indexStatus.flushStats();
if (flushStats != null) {
flushStats.toXContent(builder, params);
}
builder.startObject(Fields.SHARDS); builder.startObject(Fields.SHARDS);
for (IndexShardStatus indexShardStatus : indexStatus) { for (IndexShardStatus indexShardStatus : indexStatus) {
@ -205,6 +210,10 @@ public class IndicesStatusResponse extends BroadcastOperationResponse implements
if (refreshStats != null) { if (refreshStats != null) {
refreshStats.toXContent(builder, params); refreshStats.toXContent(builder, params);
} }
flushStats = shardStatus.flushStats();
if (flushStats != null) {
flushStats.toXContent(builder, params);
}
if (shardStatus.peerRecoveryStatus() != null) { if (shardStatus.peerRecoveryStatus() != null) {
PeerRecoveryStatus peerRecoveryStatus = shardStatus.peerRecoveryStatus(); PeerRecoveryStatus peerRecoveryStatus = shardStatus.peerRecoveryStatus();

View File

@ -24,6 +24,7 @@ import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.index.flush.FlushStats;
import org.elasticsearch.index.merge.MergeStats; import org.elasticsearch.index.merge.MergeStats;
import org.elasticsearch.index.refresh.RefreshStats; import org.elasticsearch.index.refresh.RefreshStats;
import org.elasticsearch.index.shard.IndexShardState; import org.elasticsearch.index.shard.IndexShardState;
@ -56,6 +57,8 @@ public class ShardStatus extends BroadcastShardOperationResponse {
RefreshStats refreshStats; RefreshStats refreshStats;
FlushStats flushStats;
PeerRecoveryStatus peerRecoveryStatus; PeerRecoveryStatus peerRecoveryStatus;
GatewayRecoveryStatus gatewayRecoveryStatus; GatewayRecoveryStatus gatewayRecoveryStatus;
@ -182,6 +185,14 @@ public class ShardStatus extends BroadcastShardOperationResponse {
return refreshStats(); return refreshStats();
} }
public FlushStats flushStats() {
return this.flushStats;
}
public FlushStats getFlushStats() {
return this.flushStats;
}
/** /**
* Peer recovery status (<tt>null</tt> if not applicable). Both real time if an on going recovery * Peer recovery status (<tt>null</tt> if not applicable). Both real time if an on going recovery
* is in progress and summary once it is done. * is in progress and summary once it is done.
@ -303,6 +314,12 @@ public class ShardStatus extends BroadcastShardOperationResponse {
out.writeBoolean(true); out.writeBoolean(true);
refreshStats.writeTo(out); refreshStats.writeTo(out);
} }
if (flushStats == null) {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
flushStats.writeTo(out);
}
} }
@Override public void readFrom(StreamInput in) throws IOException { @Override public void readFrom(StreamInput in) throws IOException {
@ -341,5 +358,8 @@ public class ShardStatus extends BroadcastShardOperationResponse {
if (in.readBoolean()) { if (in.readBoolean()) {
refreshStats = RefreshStats.readRefreshStats(in); refreshStats = RefreshStats.readRefreshStats(in);
} }
if (in.readBoolean()) {
flushStats = FlushStats.readFlushStats(in);
}
} }
} }

View File

@ -156,6 +156,7 @@ public class TransportIndicesStatusAction extends TransportBroadcastOperationAct
shardStatus.mergeStats = indexShard.mergeScheduler().stats(); shardStatus.mergeStats = indexShard.mergeScheduler().stats();
shardStatus.refreshStats = indexShard.refreshStats(); shardStatus.refreshStats = indexShard.refreshStats();
shardStatus.flushStats = indexShard.flushStats();
} }
if (request.recovery) { if (request.recovery) {

View File

@ -0,0 +1,112 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.index.flush;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentBuilderString;
import java.io.IOException;
public class FlushStats implements Streamable, ToXContent {
private long total;
private long totalTimeInMillis;
public FlushStats() {
}
public FlushStats(long total, long totalTimeInMillis) {
this.total = total;
this.totalTimeInMillis = totalTimeInMillis;
}
public void add(long total, long totalTimeInMillis) {
this.total += total;
this.totalTimeInMillis += totalTimeInMillis;
}
public void add(FlushStats flushStats) {
if (flushStats == null) {
return;
}
this.total += flushStats.total;
this.totalTimeInMillis += flushStats.totalTimeInMillis;
}
/**
* The total number of flush executed.
*/
public long total() {
return this.total;
}
/**
* The total time merges have been executed (in milliseconds).
*/
public long totalTimeInMillis() {
return this.totalTimeInMillis;
}
/**
* The total time merges have been executed.
*/
public TimeValue totalTime() {
return new TimeValue(totalTimeInMillis);
}
public static FlushStats readFlushStats(StreamInput in) throws IOException {
FlushStats flushStats = new FlushStats();
flushStats.readFrom(in);
return flushStats;
}
@Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject(Fields.FLUSH);
builder.field(Fields.TOTAL, total);
builder.field(Fields.TOTAL_TIME, totalTime().toString());
builder.field(Fields.TOTAL_TIME_IN_MILLIS, totalTimeInMillis);
builder.endObject();
return builder;
}
static final class Fields {
static final XContentBuilderString FLUSH = new XContentBuilderString("flush");
static final XContentBuilderString TOTAL = new XContentBuilderString("total");
static final XContentBuilderString TOTAL_TIME = new XContentBuilderString("total_time");
static final XContentBuilderString TOTAL_TIME_IN_MILLIS = new XContentBuilderString("total_time_in_millis");
}
@Override public void readFrom(StreamInput in) throws IOException {
total = in.readVLong();
totalTimeInMillis = in.readVLong();
}
@Override public void writeTo(StreamOutput out) throws IOException {
out.writeVLong(total);
out.writeVLong(totalTimeInMillis);
}
}

View File

@ -25,6 +25,7 @@ import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.util.concurrent.ThreadSafe; import org.elasticsearch.common.util.concurrent.ThreadSafe;
import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.EngineException; import org.elasticsearch.index.engine.EngineException;
import org.elasticsearch.index.flush.FlushStats;
import org.elasticsearch.index.mapper.ParsedDocument; import org.elasticsearch.index.mapper.ParsedDocument;
import org.elasticsearch.index.mapper.SourceToParse; import org.elasticsearch.index.mapper.SourceToParse;
import org.elasticsearch.index.refresh.RefreshStats; import org.elasticsearch.index.refresh.RefreshStats;
@ -45,6 +46,8 @@ public interface IndexShard extends IndexShardComponent {
RefreshStats refreshStats(); RefreshStats refreshStats();
FlushStats flushStats();
IndexShardState state(); IndexShardState state();
Engine.Create prepareCreate(SourceToParse source) throws ElasticSearchException; Engine.Create prepareCreate(SourceToParse source) throws ElasticSearchException;

View File

@ -46,6 +46,7 @@ import org.elasticsearch.index.engine.EngineClosedException;
import org.elasticsearch.index.engine.EngineException; import org.elasticsearch.index.engine.EngineException;
import org.elasticsearch.index.engine.OptimizeFailedEngineException; import org.elasticsearch.index.engine.OptimizeFailedEngineException;
import org.elasticsearch.index.engine.RefreshFailedEngineException; import org.elasticsearch.index.engine.RefreshFailedEngineException;
import org.elasticsearch.index.flush.FlushStats;
import org.elasticsearch.index.mapper.DocumentMapper; import org.elasticsearch.index.mapper.DocumentMapper;
import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.mapper.ParsedDocument; import org.elasticsearch.index.mapper.ParsedDocument;
@ -69,6 +70,7 @@ import java.io.PrintStream;
import java.nio.channels.ClosedByInterruptException; import java.nio.channels.ClosedByInterruptException;
import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import static org.elasticsearch.index.mapper.SourceToParse.*; import static org.elasticsearch.index.mapper.SourceToParse.*;
@ -122,7 +124,8 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
private ApplyRefreshSettings applyRefreshSettings = new ApplyRefreshSettings(); private ApplyRefreshSettings applyRefreshSettings = new ApplyRefreshSettings();
private final MeanMetric totalRefreshMetric = new MeanMetric(); private final MeanMetric refreshMetric = new MeanMetric();
private final MeanMetric flushMetric = new MeanMetric();
@Inject public InternalIndexShard(ShardId shardId, @IndexSettings Settings indexSettings, IndexSettingsService indexSettingsService, IndicesLifecycle indicesLifecycle, Store store, Engine engine, MergeSchedulerProvider mergeScheduler, Translog translog, @Inject public InternalIndexShard(ShardId shardId, @IndexSettings Settings indexSettings, IndexSettingsService indexSettingsService, IndicesLifecycle indicesLifecycle, Store store, Engine engine, MergeSchedulerProvider mergeScheduler, Translog translog,
ThreadPool threadPool, MapperService mapperService, IndexQueryParserService queryParserService, IndexCache indexCache, IndexAliasesService indexAliasesService) { ThreadPool threadPool, MapperService mapperService, IndexQueryParserService queryParserService, IndexCache indexCache, IndexAliasesService indexAliasesService) {
@ -379,13 +382,17 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
if (logger.isTraceEnabled()) { if (logger.isTraceEnabled()) {
logger.trace("refresh with {}", refresh); logger.trace("refresh with {}", refresh);
} }
long time = System.currentTimeMillis(); long time = System.nanoTime();
engine.refresh(refresh); engine.refresh(refresh);
totalRefreshMetric.inc(System.currentTimeMillis() - time); refreshMetric.inc(System.nanoTime() - time);
} }
@Override public RefreshStats refreshStats() { @Override public RefreshStats refreshStats() {
return new RefreshStats(totalRefreshMetric.count(), totalRefreshMetric.sum()); return new RefreshStats(refreshMetric.count(), TimeUnit.NANOSECONDS.toMillis(refreshMetric.sum()));
}
@Override public FlushStats flushStats() {
return new FlushStats(flushMetric.count(), TimeUnit.NANOSECONDS.toMillis(flushMetric.sum()));
} }
@Override public void flush(Engine.Flush flush) throws ElasticSearchException { @Override public void flush(Engine.Flush flush) throws ElasticSearchException {
@ -393,7 +400,9 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I
if (logger.isTraceEnabled()) { if (logger.isTraceEnabled()) {
logger.trace("flush with {}", flush); logger.trace("flush with {}", flush);
} }
long time = System.nanoTime();
engine.flush(flush); engine.flush(flush);
flushMetric.inc(System.nanoTime() - time);
} }
@Override public void optimize(Engine.Optimize optimize) throws ElasticSearchException { @Override public void optimize(Engine.Optimize optimize) throws ElasticSearchException {

View File

@ -53,6 +53,7 @@ import org.elasticsearch.index.cache.IndexCacheModule;
import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.IndexEngine; import org.elasticsearch.index.engine.IndexEngine;
import org.elasticsearch.index.engine.IndexEngineModule; import org.elasticsearch.index.engine.IndexEngineModule;
import org.elasticsearch.index.flush.FlushStats;
import org.elasticsearch.index.gateway.IndexGateway; import org.elasticsearch.index.gateway.IndexGateway;
import org.elasticsearch.index.gateway.IndexGatewayModule; import org.elasticsearch.index.gateway.IndexGatewayModule;
import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.mapper.MapperService;
@ -176,6 +177,7 @@ public class InternalIndicesService extends AbstractLifecycleComponent<IndicesSe
CacheStats cacheStats = new CacheStats(); CacheStats cacheStats = new CacheStats();
MergeStats mergeStats = new MergeStats(); MergeStats mergeStats = new MergeStats();
RefreshStats refreshStats = new RefreshStats(); RefreshStats refreshStats = new RefreshStats();
FlushStats flushStats = new FlushStats();
for (IndexService indexService : indices.values()) { for (IndexService indexService : indices.values()) {
for (IndexShard indexShard : indexService) { for (IndexShard indexShard : indexService) {
try { try {
@ -194,10 +196,11 @@ public class InternalIndicesService extends AbstractLifecycleComponent<IndicesSe
} }
mergeStats.add(((InternalIndexShard) indexShard).mergeScheduler().stats()); mergeStats.add(((InternalIndexShard) indexShard).mergeScheduler().stats());
refreshStats.add(indexShard.refreshStats()); refreshStats.add(indexShard.refreshStats());
flushStats.add(indexShard.flushStats());
} }
cacheStats.add(indexService.cache().stats()); cacheStats.add(indexService.cache().stats());
} }
return new NodeIndicesStats(new ByteSizeValue(storeTotalSize), numberOfDocs, cacheStats, mergeStats, refreshStats); return new NodeIndicesStats(new ByteSizeValue(storeTotalSize), numberOfDocs, cacheStats, mergeStats, refreshStats, flushStats);
} }
/** /**

View File

@ -27,6 +27,7 @@ import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentBuilderString; import org.elasticsearch.common.xcontent.XContentBuilderString;
import org.elasticsearch.index.cache.CacheStats; import org.elasticsearch.index.cache.CacheStats;
import org.elasticsearch.index.flush.FlushStats;
import org.elasticsearch.index.merge.MergeStats; import org.elasticsearch.index.merge.MergeStats;
import org.elasticsearch.index.refresh.RefreshStats; import org.elasticsearch.index.refresh.RefreshStats;
@ -50,15 +51,18 @@ public class NodeIndicesStats implements Streamable, Serializable, ToXContent {
private RefreshStats refreshStats; private RefreshStats refreshStats;
private FlushStats flushStats;
NodeIndicesStats() { NodeIndicesStats() {
} }
public NodeIndicesStats(ByteSizeValue storeSize, long numDocs, CacheStats cacheStats, MergeStats mergeStats, RefreshStats refreshStats) { public NodeIndicesStats(ByteSizeValue storeSize, long numDocs, CacheStats cacheStats, MergeStats mergeStats, RefreshStats refreshStats, FlushStats flushStats) {
this.storeSize = storeSize; this.storeSize = storeSize;
this.numDocs = numDocs; this.numDocs = numDocs;
this.cacheStats = cacheStats; this.cacheStats = cacheStats;
this.mergeStats = mergeStats; this.mergeStats = mergeStats;
this.refreshStats = refreshStats; this.refreshStats = refreshStats;
this.flushStats = flushStats;
} }
/** /**
@ -113,6 +117,14 @@ public class NodeIndicesStats implements Streamable, Serializable, ToXContent {
return this.refresh(); return this.refresh();
} }
public FlushStats flush() {
return this.flushStats;
}
public FlushStats getFlush() {
return this.flushStats;
}
public static NodeIndicesStats readIndicesStats(StreamInput in) throws IOException { public static NodeIndicesStats readIndicesStats(StreamInput in) throws IOException {
NodeIndicesStats stats = new NodeIndicesStats(); NodeIndicesStats stats = new NodeIndicesStats();
stats.readFrom(in); stats.readFrom(in);
@ -125,6 +137,7 @@ public class NodeIndicesStats implements Streamable, Serializable, ToXContent {
cacheStats = CacheStats.readCacheStats(in); cacheStats = CacheStats.readCacheStats(in);
mergeStats = MergeStats.readMergeStats(in); mergeStats = MergeStats.readMergeStats(in);
refreshStats = RefreshStats.readRefreshStats(in); refreshStats = RefreshStats.readRefreshStats(in);
flushStats = FlushStats.readFlushStats(in);
} }
@Override public void writeTo(StreamOutput out) throws IOException { @Override public void writeTo(StreamOutput out) throws IOException {
@ -133,6 +146,7 @@ public class NodeIndicesStats implements Streamable, Serializable, ToXContent {
cacheStats.writeTo(out); cacheStats.writeTo(out);
mergeStats.writeTo(out); mergeStats.writeTo(out);
refreshStats.writeTo(out); refreshStats.writeTo(out);
flushStats.writeTo(out);
} }
@Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
@ -148,6 +162,7 @@ public class NodeIndicesStats implements Streamable, Serializable, ToXContent {
cacheStats.toXContent(builder, params); cacheStats.toXContent(builder, params);
mergeStats.toXContent(builder, params); mergeStats.toXContent(builder, params);
refreshStats.toXContent(builder, params); refreshStats.toXContent(builder, params);
flushStats.toXContent(builder, params);
builder.endObject(); builder.endObject();
return builder; return builder;