Add periodic flush count to flush stats (#29360)
Currently, a flush stats contains only the total flush which is the sum of manual flush (via API) and periodic flush (async triggered when the uncommitted translog size is exceeded the flush threshold). Sometimes, it's useful to know these two numbers independently. This commit tracks and returns a periodic flush count in a flush stats.
This commit is contained in:
parent
6a6c0ea5e6
commit
4e6a8900a3
|
@ -21,3 +21,34 @@
|
|||
indices.stats: {level: shards}
|
||||
|
||||
- is_true: indices.testing.shards.0.0.commit.user_data.sync_id
|
||||
|
||||
---
|
||||
"Flush stats":
|
||||
- skip:
|
||||
version: " - 6.99.99"
|
||||
reason: periodic flush stats is introduced in 7.0
|
||||
- do:
|
||||
indices.create:
|
||||
index: test
|
||||
body:
|
||||
settings:
|
||||
number_of_shards: 1
|
||||
index.translog.flush_threshold_size: 160b
|
||||
- do:
|
||||
indices.flush:
|
||||
index: test
|
||||
- do:
|
||||
indices.stats: { index: test }
|
||||
- match: { indices.test.primaries.flush.periodic: 0 }
|
||||
- match: { indices.test.primaries.flush.total: 1 }
|
||||
- do:
|
||||
index:
|
||||
index: test
|
||||
type: doc
|
||||
id: 1
|
||||
body: { "message": "a long message to make a periodic flush happen after this index operation" }
|
||||
- do:
|
||||
indices.stats: { index: test }
|
||||
# periodic flush is async
|
||||
- gte: { indices.test.primaries.flush.periodic: 0 }
|
||||
- gte: { indices.test.primaries.flush.total: 1 }
|
||||
|
|
|
@ -19,6 +19,7 @@
|
|||
|
||||
package org.elasticsearch.index.flush;
|
||||
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.io.stream.Streamable;
|
||||
|
@ -31,20 +32,22 @@ import java.io.IOException;
|
|||
public class FlushStats implements Streamable, ToXContentFragment {
|
||||
|
||||
private long total;
|
||||
|
||||
private long periodic;
|
||||
private long totalTimeInMillis;
|
||||
|
||||
public FlushStats() {
|
||||
|
||||
}
|
||||
|
||||
public FlushStats(long total, long totalTimeInMillis) {
|
||||
public FlushStats(long total, long periodic, long totalTimeInMillis) {
|
||||
this.total = total;
|
||||
this.periodic = periodic;
|
||||
this.totalTimeInMillis = totalTimeInMillis;
|
||||
}
|
||||
|
||||
public void add(long total, long totalTimeInMillis) {
|
||||
public void add(long total, long periodic, long totalTimeInMillis) {
|
||||
this.total += total;
|
||||
this.periodic += periodic;
|
||||
this.totalTimeInMillis += totalTimeInMillis;
|
||||
}
|
||||
|
||||
|
@ -57,6 +60,7 @@ public class FlushStats implements Streamable, ToXContentFragment {
|
|||
return;
|
||||
}
|
||||
this.total += flushStats.total;
|
||||
this.periodic += flushStats.periodic;
|
||||
this.totalTimeInMillis += flushStats.totalTimeInMillis;
|
||||
}
|
||||
|
||||
|
@ -67,6 +71,13 @@ public class FlushStats implements Streamable, ToXContentFragment {
|
|||
return this.total;
|
||||
}
|
||||
|
||||
/**
|
||||
* The number of flushes that were periodically triggered when translog exceeded the flush threshold.
|
||||
*/
|
||||
public long getPeriodic() {
|
||||
return periodic;
|
||||
}
|
||||
|
||||
/**
|
||||
* The total time merges have been executed (in milliseconds).
|
||||
*/
|
||||
|
@ -85,6 +96,7 @@ public class FlushStats implements Streamable, ToXContentFragment {
|
|||
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
|
||||
builder.startObject(Fields.FLUSH);
|
||||
builder.field(Fields.TOTAL, total);
|
||||
builder.field(Fields.PERIODIC, periodic);
|
||||
builder.humanReadableField(Fields.TOTAL_TIME_IN_MILLIS, Fields.TOTAL_TIME, getTotalTime());
|
||||
builder.endObject();
|
||||
return builder;
|
||||
|
@ -93,6 +105,7 @@ public class FlushStats implements Streamable, ToXContentFragment {
|
|||
static final class Fields {
|
||||
static final String FLUSH = "flush";
|
||||
static final String TOTAL = "total";
|
||||
static final String PERIODIC = "periodic";
|
||||
static final String TOTAL_TIME = "total_time";
|
||||
static final String TOTAL_TIME_IN_MILLIS = "total_time_in_millis";
|
||||
}
|
||||
|
@ -101,11 +114,17 @@ public class FlushStats implements Streamable, ToXContentFragment {
|
|||
public void readFrom(StreamInput in) throws IOException {
|
||||
total = in.readVLong();
|
||||
totalTimeInMillis = in.readVLong();
|
||||
if (in.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
|
||||
periodic = in.readVLong();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeTo(StreamOutput out) throws IOException {
|
||||
out.writeVLong(total);
|
||||
out.writeVLong(totalTimeInMillis);
|
||||
if (out.getVersion().onOrAfter(Version.V_7_0_0_alpha1)) {
|
||||
out.writeVLong(periodic);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -57,6 +57,7 @@ import org.elasticsearch.common.io.stream.BytesStreamOutput;
|
|||
import org.elasticsearch.common.lease.Releasable;
|
||||
import org.elasticsearch.common.lease.Releasables;
|
||||
import org.elasticsearch.common.lucene.Lucene;
|
||||
import org.elasticsearch.common.metrics.CounterMetric;
|
||||
import org.elasticsearch.common.metrics.MeanMetric;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
|
@ -208,6 +209,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
|
|||
private final RecoveryStats recoveryStats = new RecoveryStats();
|
||||
private final MeanMetric refreshMetric = new MeanMetric();
|
||||
private final MeanMetric flushMetric = new MeanMetric();
|
||||
private final CounterMetric periodicFlushMetric = new CounterMetric();
|
||||
|
||||
private final ShardEventListener shardEventListener = new ShardEventListener();
|
||||
|
||||
|
@ -827,7 +829,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
|
|||
}
|
||||
|
||||
public FlushStats flushStats() {
|
||||
return new FlushStats(flushMetric.count(), TimeUnit.NANOSECONDS.toMillis(flushMetric.sum()));
|
||||
return new FlushStats(flushMetric.count(), periodicFlushMetric.count(), TimeUnit.NANOSECONDS.toMillis(flushMetric.sum()));
|
||||
}
|
||||
|
||||
public DocsStats docStats() {
|
||||
|
@ -2344,6 +2346,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
|
|||
@Override
|
||||
protected void doRun() throws IOException {
|
||||
flush(new FlushRequest());
|
||||
periodicFlushMetric.inc();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -19,7 +19,6 @@
|
|||
package org.elasticsearch.index.shard;
|
||||
|
||||
import org.apache.lucene.store.LockObtainFailedException;
|
||||
import org.elasticsearch.core.internal.io.IOUtils;
|
||||
import org.elasticsearch.ExceptionsHelper;
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
|
@ -42,6 +41,7 @@ import org.elasticsearch.cluster.routing.TestShardRouting;
|
|||
import org.elasticsearch.cluster.routing.UnassignedInfo;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.CheckedRunnable;
|
||||
import org.elasticsearch.common.UUIDs;
|
||||
import org.elasticsearch.common.breaker.CircuitBreaker;
|
||||
import org.elasticsearch.common.bytes.BytesArray;
|
||||
import org.elasticsearch.common.lucene.uid.Versions;
|
||||
|
@ -50,6 +50,7 @@ import org.elasticsearch.common.unit.ByteSizeUnit;
|
|||
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.common.xcontent.XContentType;
|
||||
import org.elasticsearch.core.internal.io.IOUtils;
|
||||
import org.elasticsearch.env.Environment;
|
||||
import org.elasticsearch.env.NodeEnvironment;
|
||||
import org.elasticsearch.env.ShardLock;
|
||||
|
@ -102,6 +103,7 @@ import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcke
|
|||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
|
||||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
|
||||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoSearchHits;
|
||||
import static org.hamcrest.Matchers.allOf;
|
||||
import static org.hamcrest.Matchers.containsString;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.greaterThan;
|
||||
|
@ -347,6 +349,7 @@ public class IndexShardIT extends ESSingleNodeTestCase {
|
|||
.setRefreshPolicy(randomBoolean() ? IMMEDIATE : NONE).get();
|
||||
assertBusy(() -> { // this is async
|
||||
assertFalse(shard.shouldPeriodicallyFlush());
|
||||
assertThat(shard.flushStats().getPeriodic(), greaterThan(0L));
|
||||
});
|
||||
assertEquals(0, translog.stats().getUncommittedOperations());
|
||||
translog.sync();
|
||||
|
@ -444,8 +447,12 @@ public class IndexShardIT extends ESSingleNodeTestCase {
|
|||
if (flush) {
|
||||
final FlushStats flushStats = shard.flushStats();
|
||||
final long total = flushStats.getTotal();
|
||||
final long periodic = flushStats.getPeriodic();
|
||||
client().prepareIndex("test", "test", "1").setSource("{}", XContentType.JSON).get();
|
||||
check = () -> assertEquals(total + 1, shard.flushStats().getTotal());
|
||||
check = () -> {
|
||||
assertThat(shard.flushStats().getTotal(), equalTo(total + 1));
|
||||
assertThat(shard.flushStats().getPeriodic(), equalTo(periodic + 1));
|
||||
};
|
||||
} else {
|
||||
final long generation = shard.getEngine().getTranslog().currentFileGeneration();
|
||||
client().prepareIndex("test", "test", "1").setSource("{}", XContentType.JSON).get();
|
||||
|
@ -461,6 +468,30 @@ public class IndexShardIT extends ESSingleNodeTestCase {
|
|||
check.run();
|
||||
}
|
||||
|
||||
public void testFlushStats() throws Exception {
|
||||
final IndexService indexService = createIndex("test");
|
||||
ensureGreen();
|
||||
Settings settings = Settings.builder().put("index.translog.flush_threshold_size", "" + between(200, 300) + "b").build();
|
||||
client().admin().indices().prepareUpdateSettings("test").setSettings(settings).get();
|
||||
final int numDocs = between(10, 100);
|
||||
for (int i = 0; i < numDocs; i++) {
|
||||
client().prepareIndex("test", "doc", Integer.toString(i)).setSource("{}", XContentType.JSON).get();
|
||||
}
|
||||
// A flush stats may include the new total count but the old period count - assert eventually.
|
||||
assertBusy(() -> {
|
||||
final FlushStats flushStats = client().admin().indices().prepareStats("test").clear().setFlush(true).get().getTotal().flush;
|
||||
assertThat(flushStats.getPeriodic(), allOf(equalTo(flushStats.getTotal()), greaterThan(0L)));
|
||||
});
|
||||
assertBusy(() -> assertThat(indexService.getShard(0).shouldPeriodicallyFlush(), equalTo(false)));
|
||||
settings = Settings.builder().put("index.translog.flush_threshold_size", (String) null).build();
|
||||
client().admin().indices().prepareUpdateSettings("test").setSettings(settings).get();
|
||||
|
||||
client().prepareIndex("test", "doc", UUIDs.randomBase64UUID()).setSource("{}", XContentType.JSON).get();
|
||||
client().admin().indices().prepareFlush("test").setForce(randomBoolean()).setWaitIfOngoing(true).get();
|
||||
final FlushStats flushStats = client().admin().indices().prepareStats("test").clear().setFlush(true).get().getTotal().flush;
|
||||
assertThat(flushStats.getTotal(), greaterThan(flushStats.getPeriodic()));
|
||||
}
|
||||
|
||||
public void testShardHasMemoryBufferOnTranslogRecover() throws Throwable {
|
||||
createIndex("test");
|
||||
ensureGreen();
|
||||
|
|
Loading…
Reference in New Issue