add size and docs to merge stats
This commit is contained in:
parent
67e161f710
commit
f4a36a2d87
|
@ -35,7 +35,11 @@ public class TrackingConcurrentMergeScheduler extends ConcurrentMergeScheduler {
|
|||
private final ESLogger logger;
|
||||
|
||||
private final MeanMetric totalMerges = new MeanMetric();
|
||||
private final CounterMetric totalMergesNumDocs = new CounterMetric();
|
||||
private final CounterMetric totalMergesSizeInBytes = new CounterMetric();
|
||||
private final CounterMetric currentMerges = new CounterMetric();
|
||||
private final CounterMetric currentMergesNumDocs = new CounterMetric();
|
||||
private final CounterMetric currentMergesSizeInBytes = new CounterMetric();
|
||||
|
||||
public TrackingConcurrentMergeScheduler(ESLogger logger) {
|
||||
super();
|
||||
|
@ -50,21 +54,47 @@ public class TrackingConcurrentMergeScheduler extends ConcurrentMergeScheduler {
|
|||
return totalMerges.sum();
|
||||
}
|
||||
|
||||
public long totalMergeNumDocs() {
|
||||
return totalMergesNumDocs.count();
|
||||
}
|
||||
|
||||
public long totalMergeSizeInBytes() {
|
||||
return totalMergesSizeInBytes.count();
|
||||
}
|
||||
|
||||
public long currentMerges() {
|
||||
return currentMerges.count();
|
||||
}
|
||||
|
||||
public long currentMergesNumDocs() {
|
||||
return currentMergesNumDocs.count();
|
||||
}
|
||||
|
||||
public long currentMergesSizeInBytes() {
|
||||
return currentMergesSizeInBytes.count();
|
||||
}
|
||||
|
||||
@Override protected void doMerge(MergePolicy.OneMerge merge) throws IOException {
|
||||
int totalNumDocs = merge.totalNumDocs();
|
||||
long totalSizeInBytes = merge.totalBytesSize();
|
||||
long time = System.currentTimeMillis();
|
||||
currentMerges.inc();
|
||||
currentMergesNumDocs.inc(totalNumDocs);
|
||||
currentMergesSizeInBytes.inc(totalSizeInBytes);
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("merge [{}] starting...", merge.info.name);
|
||||
}
|
||||
try {
|
||||
super.doMerge(merge);
|
||||
} finally {
|
||||
currentMerges.dec();
|
||||
long took = System.currentTimeMillis() - time;
|
||||
|
||||
currentMerges.dec();
|
||||
currentMergesNumDocs.dec(totalNumDocs);
|
||||
currentMergesSizeInBytes.dec(totalSizeInBytes);
|
||||
|
||||
totalMergesNumDocs.inc(totalNumDocs);
|
||||
totalMergesSizeInBytes.inc(totalSizeInBytes);
|
||||
totalMerges.inc(took);
|
||||
if (took > 20000) { // if more than 20 seconds, DEBUG log it
|
||||
logger.debug("merge [{}] done, took [{}]", merge.info.name, TimeValue.timeValueMillis(took));
|
||||
|
|
|
@ -32,7 +32,11 @@ public class TrackingSerialMergeScheduler extends MergeScheduler {
|
|||
private final ESLogger logger;
|
||||
|
||||
private final MeanMetric totalMerges = new MeanMetric();
|
||||
private final CounterMetric totalMergesNumDocs = new CounterMetric();
|
||||
private final CounterMetric totalMergesSizeInBytes = new CounterMetric();
|
||||
private final CounterMetric currentMerges = new CounterMetric();
|
||||
private final CounterMetric currentMergesNumDocs = new CounterMetric();
|
||||
private final CounterMetric currentMergesSizeInBytes = new CounterMetric();
|
||||
|
||||
public TrackingSerialMergeScheduler(ESLogger logger) {
|
||||
this.logger = logger;
|
||||
|
@ -46,10 +50,26 @@ public class TrackingSerialMergeScheduler extends MergeScheduler {
|
|||
return totalMerges.sum();
|
||||
}
|
||||
|
||||
public long totalMergeNumDocs() {
|
||||
return totalMergesNumDocs.count();
|
||||
}
|
||||
|
||||
public long totalMergeSizeInBytes() {
|
||||
return totalMergesSizeInBytes.count();
|
||||
}
|
||||
|
||||
public long currentMerges() {
|
||||
return currentMerges.count();
|
||||
}
|
||||
|
||||
public long currentMergesNumDocs() {
|
||||
return currentMergesNumDocs.count();
|
||||
}
|
||||
|
||||
public long currentMergesSizeInBytes() {
|
||||
return currentMergesSizeInBytes.count();
|
||||
}
|
||||
|
||||
/**
|
||||
* Just do the merges in sequence. We do this
|
||||
* "synchronized" so that even if the application is using
|
||||
|
@ -66,13 +86,23 @@ public class TrackingSerialMergeScheduler extends MergeScheduler {
|
|||
logger.trace("merge [{}] starting...", merge.info.name);
|
||||
}
|
||||
|
||||
int totalNumDocs = merge.totalNumDocs();
|
||||
long totalSizeInBytes = merge.totalBytesSize();
|
||||
long time = System.currentTimeMillis();
|
||||
currentMerges.inc();
|
||||
currentMergesNumDocs.inc(totalNumDocs);
|
||||
currentMergesSizeInBytes.inc(totalSizeInBytes);
|
||||
try {
|
||||
writer.merge(merge);
|
||||
} finally {
|
||||
currentMerges.dec();
|
||||
long took = System.currentTimeMillis() - time;
|
||||
|
||||
currentMerges.dec();
|
||||
currentMergesNumDocs.dec(totalNumDocs);
|
||||
currentMergesSizeInBytes.dec(totalSizeInBytes);
|
||||
|
||||
totalMergesNumDocs.inc(totalNumDocs);
|
||||
totalMergesSizeInBytes.inc(totalSizeInBytes);
|
||||
totalMerges.inc(took);
|
||||
if (took > 20000) { // if more than 20 seconds, DEBUG log it
|
||||
logger.debug("merge [{}] done, took [{}]", merge.info.name, TimeValue.timeValueMillis(took));
|
||||
|
|
|
@ -22,6 +22,7 @@ package org.elasticsearch.index.merge;
|
|||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.io.stream.Streamable;
|
||||
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.xcontent.ToXContent;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
|
@ -35,25 +36,25 @@ import java.io.IOException;
|
|||
public class MergeStats implements Streamable, ToXContent {
|
||||
|
||||
private long total;
|
||||
|
||||
private long current;
|
||||
|
||||
private long totalTimeInMillis;
|
||||
private long totalNumDocs;
|
||||
private long totalSizeInBytes;
|
||||
private long current;
|
||||
private long currentNumDocs;
|
||||
private long currentSizeInBytes;
|
||||
|
||||
public MergeStats() {
|
||||
|
||||
}
|
||||
|
||||
public MergeStats(long total, long current, long totalTimeInMillis) {
|
||||
this.total = total;
|
||||
this.current = current;
|
||||
this.totalTimeInMillis = totalTimeInMillis;
|
||||
}
|
||||
|
||||
public void add(long totalMerges, long currentMerges, long totalMergeTime) {
|
||||
public void add(long totalMerges, long totalMergeTime, long totalNumDocs, long totalSizeInBytes, long currentMerges, long currentNumDocs, long currentSizeInBytes) {
|
||||
this.total += totalMerges;
|
||||
this.current += currentMerges;
|
||||
this.totalTimeInMillis += totalMergeTime;
|
||||
this.totalNumDocs += totalNumDocs;
|
||||
this.totalSizeInBytes += totalSizeInBytes;
|
||||
this.current += currentMerges;
|
||||
this.currentNumDocs += currentNumDocs;
|
||||
this.currentSizeInBytes += currentSizeInBytes;
|
||||
}
|
||||
|
||||
public void add(MergeStats mergeStats) {
|
||||
|
@ -61,8 +62,12 @@ public class MergeStats implements Streamable, ToXContent {
|
|||
return;
|
||||
}
|
||||
this.total += mergeStats.total;
|
||||
this.current += mergeStats.current;
|
||||
this.totalTimeInMillis += mergeStats.totalTimeInMillis;
|
||||
this.totalNumDocs += mergeStats.totalNumDocs;
|
||||
this.totalSizeInBytes += mergeStats.totalSizeInBytes;
|
||||
this.current += mergeStats.current;
|
||||
this.currentNumDocs += mergeStats.currentNumDocs;
|
||||
this.currentSizeInBytes += mergeStats.currentSizeInBytes;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -72,13 +77,6 @@ public class MergeStats implements Streamable, ToXContent {
|
|||
return this.total;
|
||||
}
|
||||
|
||||
/**
|
||||
* The current number of merges executing.
|
||||
*/
|
||||
public long current() {
|
||||
return this.current;
|
||||
}
|
||||
|
||||
/**
|
||||
* The total time merges have been executed (in milliseconds).
|
||||
*/
|
||||
|
@ -93,6 +91,37 @@ public class MergeStats implements Streamable, ToXContent {
|
|||
return new TimeValue(totalTimeInMillis);
|
||||
}
|
||||
|
||||
public long totalNumDocs() {
|
||||
return this.totalNumDocs;
|
||||
}
|
||||
|
||||
public long totalSizeInBytes() {
|
||||
return this.totalSizeInBytes;
|
||||
}
|
||||
|
||||
public ByteSizeValue totalSize() {
|
||||
return new ByteSizeValue(totalSizeInBytes);
|
||||
}
|
||||
|
||||
/**
|
||||
* The current number of merges executing.
|
||||
*/
|
||||
public long current() {
|
||||
return this.current;
|
||||
}
|
||||
|
||||
public long currentNumDocs() {
|
||||
return this.currentNumDocs;
|
||||
}
|
||||
|
||||
public long currentSizeInBytes() {
|
||||
return this.currentSizeInBytes;
|
||||
}
|
||||
|
||||
public ByteSizeValue currentSize() {
|
||||
return new ByteSizeValue(currentSizeInBytes);
|
||||
}
|
||||
|
||||
public static MergeStats readMergeStats(StreamInput in) throws IOException {
|
||||
MergeStats stats = new MergeStats();
|
||||
stats.readFrom(in);
|
||||
|
@ -102,9 +131,15 @@ public class MergeStats implements Streamable, ToXContent {
|
|||
@Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
|
||||
builder.startObject(Fields.MERGES);
|
||||
builder.field(Fields.CURRENT, current);
|
||||
builder.field(Fields.CURRENT_DOCS, currentNumDocs);
|
||||
builder.field(Fields.CURRENT_SIZE, currentSize().toString());
|
||||
builder.field(Fields.CURRENT_SIZE_IN_BYTES, currentSizeInBytes);
|
||||
builder.field(Fields.TOTAL, total);
|
||||
builder.field(Fields.TOTAL_TIME, totalTime().toString());
|
||||
builder.field(Fields.TOTAL_TIME_IN_MILLIS, totalTimeInMillis);
|
||||
builder.field(Fields.TOTAL_DOCS, totalNumDocs);
|
||||
builder.field(Fields.TOTAL_SIZE, totalSize().toString());
|
||||
builder.field(Fields.TOTAL_SIZE_IN_BYTES, totalSizeInBytes);
|
||||
builder.endObject();
|
||||
return builder;
|
||||
}
|
||||
|
@ -112,20 +147,34 @@ public class MergeStats implements Streamable, ToXContent {
|
|||
static final class Fields {
|
||||
static final XContentBuilderString MERGES = new XContentBuilderString("merges");
|
||||
static final XContentBuilderString CURRENT = new XContentBuilderString("current");
|
||||
static final XContentBuilderString CURRENT_DOCS = new XContentBuilderString("current_docs");
|
||||
static final XContentBuilderString CURRENT_SIZE = new XContentBuilderString("current_size");
|
||||
static final XContentBuilderString CURRENT_SIZE_IN_BYTES = new XContentBuilderString("current_size_in_bytes");
|
||||
static final XContentBuilderString TOTAL = new XContentBuilderString("total");
|
||||
static final XContentBuilderString TOTAL_TIME = new XContentBuilderString("total_time");
|
||||
static final XContentBuilderString TOTAL_TIME_IN_MILLIS = new XContentBuilderString("total_time_in_millis");
|
||||
static final XContentBuilderString TOTAL_DOCS = new XContentBuilderString("total_docs");
|
||||
static final XContentBuilderString TOTAL_SIZE = new XContentBuilderString("total_size");
|
||||
static final XContentBuilderString TOTAL_SIZE_IN_BYTES = new XContentBuilderString("total_size_in_bytes");
|
||||
}
|
||||
|
||||
@Override public void readFrom(StreamInput in) throws IOException {
|
||||
total = in.readVLong();
|
||||
current = in.readVLong();
|
||||
totalTimeInMillis = in.readVLong();
|
||||
totalNumDocs = in.readVLong();
|
||||
totalSizeInBytes = in.readVLong();
|
||||
current = in.readVLong();
|
||||
currentNumDocs = in.readVLong();
|
||||
currentSizeInBytes = in.readVLong();
|
||||
}
|
||||
|
||||
@Override public void writeTo(StreamOutput out) throws IOException {
|
||||
out.writeVLong(total);
|
||||
out.writeVLong(current);
|
||||
out.writeVLong(totalTimeInMillis);
|
||||
out.writeVLong(totalNumDocs);
|
||||
out.writeVLong(totalSizeInBytes);
|
||||
out.writeVLong(current);
|
||||
out.writeVLong(currentNumDocs);
|
||||
out.writeVLong(currentSizeInBytes);
|
||||
}
|
||||
}
|
|
@ -65,7 +65,8 @@ public class ConcurrentMergeSchedulerProvider extends AbstractIndexShardComponen
|
|||
@Override public MergeStats stats() {
|
||||
MergeStats mergeStats = new MergeStats();
|
||||
for (CustomConcurrentMergeScheduler scheduler : schedulers) {
|
||||
mergeStats.add(scheduler.totalMerges(), scheduler.currentMerges(), scheduler.totalMergeTime());
|
||||
mergeStats.add(scheduler.totalMerges(), scheduler.totalMergeTime(), scheduler.totalMergeNumDocs(), scheduler.totalMergeSizeInBytes(),
|
||||
scheduler.currentMerges(), scheduler.currentMergesNumDocs(), scheduler.currentMergesSizeInBytes());
|
||||
}
|
||||
return mergeStats;
|
||||
}
|
||||
|
|
|
@ -58,7 +58,8 @@ public class SerialMergeSchedulerProvider extends AbstractIndexShardComponent im
|
|||
@Override public MergeStats stats() {
|
||||
MergeStats mergeStats = new MergeStats();
|
||||
for (CustomSerialMergeScheduler scheduler : schedulers) {
|
||||
mergeStats.add(scheduler.totalMerges(), scheduler.currentMerges(), scheduler.totalMergeTime());
|
||||
mergeStats.add(scheduler.totalMerges(), scheduler.totalMergeTime(), scheduler.totalMergeNumDocs(), scheduler.totalMergeSizeInBytes(),
|
||||
scheduler.currentMerges(), scheduler.currentMergesNumDocs(), scheduler.currentMergesSizeInBytes());
|
||||
}
|
||||
return mergeStats;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue