diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/status/IndexShardStatus.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/status/IndexShardStatus.java index 820ac0c22e3..a86a4e5167d 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/status/IndexShardStatus.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/status/IndexShardStatus.java @@ -21,6 +21,7 @@ package org.elasticsearch.action.admin.indices.status; import org.elasticsearch.common.collect.Iterators; import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.index.flush.FlushStats; import org.elasticsearch.index.merge.MergeStats; import org.elasticsearch.index.refresh.RefreshStats; import org.elasticsearch.index.shard.ShardId; @@ -194,6 +195,18 @@ public class IndexShardStatus implements Iterable { return refreshStats(); } + public FlushStats flushStats() { + FlushStats flushStats = new FlushStats(); + for (ShardStatus shard : shards) { + flushStats.add(shard.flushStats); + } + return flushStats; + } + + public FlushStats getFlushStats() { + return flushStats(); + } + @Override public Iterator iterator() { return Iterators.forArray(shards); } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/status/IndexStatus.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/status/IndexStatus.java index a8e33a61c9f..3b85a28ce7c 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/status/IndexStatus.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/status/IndexStatus.java @@ -21,6 +21,7 @@ package org.elasticsearch.action.admin.indices.status; import org.elasticsearch.common.collect.Maps; import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.index.flush.FlushStats; import org.elasticsearch.index.merge.MergeStats; import org.elasticsearch.index.refresh.RefreshStats; @@ -202,6 +203,18 @@ public class IndexStatus implements Iterable { return refreshStats(); } + public FlushStats flushStats() { + FlushStats flushStats = new FlushStats(); + for (IndexShardStatus shard : this) { + flushStats.add(shard.flushStats()); + } + return flushStats; + } + + public FlushStats getFlushStats() { + return flushStats(); + } + @Override public Iterator iterator() { return indexShards.values().iterator(); } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/status/IndicesStatusResponse.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/status/IndicesStatusResponse.java index f4e93a9e0f4..35318ed65af 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/status/IndicesStatusResponse.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/status/IndicesStatusResponse.java @@ -30,6 +30,7 @@ import org.elasticsearch.common.settings.SettingsFilter; import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentBuilderString; +import org.elasticsearch.index.flush.FlushStats; import org.elasticsearch.index.merge.MergeStats; import org.elasticsearch.index.refresh.RefreshStats; @@ -158,6 +159,10 @@ public class IndicesStatusResponse extends BroadcastOperationResponse implements if (refreshStats != null) { refreshStats.toXContent(builder, params); } + FlushStats flushStats = indexStatus.flushStats(); + if (flushStats != null) { + flushStats.toXContent(builder, params); + } builder.startObject(Fields.SHARDS); for (IndexShardStatus indexShardStatus : indexStatus) { @@ -205,6 +210,10 @@ public class IndicesStatusResponse extends BroadcastOperationResponse implements if (refreshStats != null) { refreshStats.toXContent(builder, params); } + flushStats = shardStatus.flushStats(); + if (flushStats != null) { + flushStats.toXContent(builder, params); + } if (shardStatus.peerRecoveryStatus() != null) { PeerRecoveryStatus peerRecoveryStatus = shardStatus.peerRecoveryStatus(); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/status/ShardStatus.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/status/ShardStatus.java index 611097976e1..4d16711d7c2 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/status/ShardStatus.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/status/ShardStatus.java @@ -24,6 +24,7 @@ import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.index.flush.FlushStats; import org.elasticsearch.index.merge.MergeStats; import org.elasticsearch.index.refresh.RefreshStats; import org.elasticsearch.index.shard.IndexShardState; @@ -56,6 +57,8 @@ public class ShardStatus extends BroadcastShardOperationResponse { RefreshStats refreshStats; + FlushStats flushStats; + PeerRecoveryStatus peerRecoveryStatus; GatewayRecoveryStatus gatewayRecoveryStatus; @@ -182,6 +185,14 @@ public class ShardStatus extends BroadcastShardOperationResponse { return refreshStats(); } + public FlushStats flushStats() { + return this.flushStats; + } + + public FlushStats getFlushStats() { + return this.flushStats; + } + /** * Peer recovery status (null if not applicable). Both real time if an on going recovery * is in progress and summary once it is done. @@ -303,6 +314,12 @@ public class ShardStatus extends BroadcastShardOperationResponse { out.writeBoolean(true); refreshStats.writeTo(out); } + if (flushStats == null) { + out.writeBoolean(false); + } else { + out.writeBoolean(true); + flushStats.writeTo(out); + } } @Override public void readFrom(StreamInput in) throws IOException { @@ -341,5 +358,8 @@ public class ShardStatus extends BroadcastShardOperationResponse { if (in.readBoolean()) { refreshStats = RefreshStats.readRefreshStats(in); } + if (in.readBoolean()) { + flushStats = FlushStats.readFlushStats(in); + } } } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/status/TransportIndicesStatusAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/status/TransportIndicesStatusAction.java index e97f128481d..122f3f440a9 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/status/TransportIndicesStatusAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/indices/status/TransportIndicesStatusAction.java @@ -156,6 +156,7 @@ public class TransportIndicesStatusAction extends TransportBroadcastOperationAct shardStatus.mergeStats = indexShard.mergeScheduler().stats(); shardStatus.refreshStats = indexShard.refreshStats(); + shardStatus.flushStats = indexShard.flushStats(); } if (request.recovery) { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/flush/FlushStats.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/flush/FlushStats.java new file mode 100644 index 00000000000..78ae390fbb9 --- /dev/null +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/flush/FlushStats.java @@ -0,0 +1,112 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.index.flush; + +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.io.stream.Streamable; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentBuilderString; + +import java.io.IOException; + +public class FlushStats implements Streamable, ToXContent { + + private long total; + + private long totalTimeInMillis; + + public FlushStats() { + + } + + public FlushStats(long total, long totalTimeInMillis) { + this.total = total; + this.totalTimeInMillis = totalTimeInMillis; + } + + public void add(long total, long totalTimeInMillis) { + this.total += total; + this.totalTimeInMillis += totalTimeInMillis; + } + + public void add(FlushStats flushStats) { + if (flushStats == null) { + return; + } + this.total += flushStats.total; + this.totalTimeInMillis += flushStats.totalTimeInMillis; + } + + /** + * The total number of flush executed. + */ + public long total() { + return this.total; + } + + /** + * The total time merges have been executed (in milliseconds). + */ + public long totalTimeInMillis() { + return this.totalTimeInMillis; + } + + /** + * The total time merges have been executed. + */ + public TimeValue totalTime() { + return new TimeValue(totalTimeInMillis); + } + + public static FlushStats readFlushStats(StreamInput in) throws IOException { + FlushStats flushStats = new FlushStats(); + flushStats.readFrom(in); + return flushStats; + } + + @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(Fields.FLUSH); + builder.field(Fields.TOTAL, total); + builder.field(Fields.TOTAL_TIME, totalTime().toString()); + builder.field(Fields.TOTAL_TIME_IN_MILLIS, totalTimeInMillis); + builder.endObject(); + return builder; + } + + static final class Fields { + static final XContentBuilderString FLUSH = new XContentBuilderString("flush"); + static final XContentBuilderString TOTAL = new XContentBuilderString("total"); + static final XContentBuilderString TOTAL_TIME = new XContentBuilderString("total_time"); + static final XContentBuilderString TOTAL_TIME_IN_MILLIS = new XContentBuilderString("total_time_in_millis"); + } + + @Override public void readFrom(StreamInput in) throws IOException { + total = in.readVLong(); + totalTimeInMillis = in.readVLong(); + } + + @Override public void writeTo(StreamOutput out) throws IOException { + out.writeVLong(total); + out.writeVLong(totalTimeInMillis); + } +} \ No newline at end of file diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/service/IndexShard.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/service/IndexShard.java index 7ae6a3f2a2f..1398d7b3556 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/service/IndexShard.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/service/IndexShard.java @@ -25,6 +25,7 @@ import org.elasticsearch.common.Nullable; import org.elasticsearch.common.util.concurrent.ThreadSafe; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.EngineException; +import org.elasticsearch.index.flush.FlushStats; import org.elasticsearch.index.mapper.ParsedDocument; import org.elasticsearch.index.mapper.SourceToParse; import org.elasticsearch.index.refresh.RefreshStats; @@ -45,6 +46,8 @@ public interface IndexShard extends IndexShardComponent { RefreshStats refreshStats(); + FlushStats flushStats(); + IndexShardState state(); Engine.Create prepareCreate(SourceToParse source) throws ElasticSearchException; diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/service/InternalIndexShard.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/service/InternalIndexShard.java index 8a6dabfe885..9329790950f 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/service/InternalIndexShard.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/service/InternalIndexShard.java @@ -46,6 +46,7 @@ import org.elasticsearch.index.engine.EngineClosedException; import org.elasticsearch.index.engine.EngineException; import org.elasticsearch.index.engine.OptimizeFailedEngineException; import org.elasticsearch.index.engine.RefreshFailedEngineException; +import org.elasticsearch.index.flush.FlushStats; import org.elasticsearch.index.mapper.DocumentMapper; import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.mapper.ParsedDocument; @@ -69,6 +70,7 @@ import java.io.PrintStream; import java.nio.channels.ClosedByInterruptException; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; import static org.elasticsearch.index.mapper.SourceToParse.*; @@ -122,7 +124,8 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I private ApplyRefreshSettings applyRefreshSettings = new ApplyRefreshSettings(); - private final MeanMetric totalRefreshMetric = new MeanMetric(); + private final MeanMetric refreshMetric = new MeanMetric(); + private final MeanMetric flushMetric = new MeanMetric(); @Inject public InternalIndexShard(ShardId shardId, @IndexSettings Settings indexSettings, IndexSettingsService indexSettingsService, IndicesLifecycle indicesLifecycle, Store store, Engine engine, MergeSchedulerProvider mergeScheduler, Translog translog, ThreadPool threadPool, MapperService mapperService, IndexQueryParserService queryParserService, IndexCache indexCache, IndexAliasesService indexAliasesService) { @@ -379,13 +382,17 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I if (logger.isTraceEnabled()) { logger.trace("refresh with {}", refresh); } - long time = System.currentTimeMillis(); + long time = System.nanoTime(); engine.refresh(refresh); - totalRefreshMetric.inc(System.currentTimeMillis() - time); + refreshMetric.inc(System.nanoTime() - time); } @Override public RefreshStats refreshStats() { - return new RefreshStats(totalRefreshMetric.count(), totalRefreshMetric.sum()); + return new RefreshStats(refreshMetric.count(), TimeUnit.NANOSECONDS.toMillis(refreshMetric.sum())); + } + + @Override public FlushStats flushStats() { + return new FlushStats(flushMetric.count(), TimeUnit.NANOSECONDS.toMillis(flushMetric.sum())); } @Override public void flush(Engine.Flush flush) throws ElasticSearchException { @@ -393,7 +400,9 @@ public class InternalIndexShard extends AbstractIndexShardComponent implements I if (logger.isTraceEnabled()) { logger.trace("flush with {}", flush); } + long time = System.nanoTime(); engine.flush(flush); + flushMetric.inc(System.nanoTime() - time); } @Override public void optimize(Engine.Optimize optimize) throws ElasticSearchException { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/indices/InternalIndicesService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/indices/InternalIndicesService.java index 96925a96a7f..e3a26e82bfd 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/indices/InternalIndicesService.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/indices/InternalIndicesService.java @@ -53,6 +53,7 @@ import org.elasticsearch.index.cache.IndexCacheModule; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.IndexEngine; import org.elasticsearch.index.engine.IndexEngineModule; +import org.elasticsearch.index.flush.FlushStats; import org.elasticsearch.index.gateway.IndexGateway; import org.elasticsearch.index.gateway.IndexGatewayModule; import org.elasticsearch.index.mapper.MapperService; @@ -176,6 +177,7 @@ public class InternalIndicesService extends AbstractLifecycleComponent