expose current CMS throttle in merge stats; fix tests, docs; also log per-merge stop/throttle/rate
This commit is contained in:
parent
31e6acf3f2
commit
1aad275c55
|
@ -25,7 +25,7 @@ of `indices`, `os`, `process`, `jvm`, `network`, `transport`, `http`,
|
|||
[horizontal]
|
||||
`indices`::
|
||||
Indices stats about size, document count, indexing and
|
||||
deletion times, search times, field cache size , merges and flushes
|
||||
deletion times, search times, field cache size, merges and flushes
|
||||
|
||||
`fs`::
|
||||
File system information, data path, free disk space, read/write
|
||||
|
|
|
@ -48,8 +48,6 @@ Will return, for example:
|
|||
"store": {
|
||||
"size": "5.6kb",
|
||||
"size_in_bytes": 5770,
|
||||
"throttle_time": "0s",
|
||||
"throttle_time_in_millis": 0
|
||||
},
|
||||
"fielddata": {
|
||||
"memory_size": "0b",
|
||||
|
|
|
@ -183,15 +183,6 @@ due to forced awareness or allocation filtering.
|
|||
`indices.recovery.max_bytes_per_sec`::
|
||||
See <<modules-indices>>
|
||||
|
||||
[float]
|
||||
==== Store level throttling
|
||||
|
||||
`indices.store.throttle.type`::
|
||||
See <<index-modules-store>>
|
||||
|
||||
`indices.store.throttle.max_bytes_per_sec`::
|
||||
See <<index-modules-store>>
|
||||
|
||||
[float]
|
||||
[[logger]]
|
||||
=== Logger
|
||||
|
|
|
@ -7,12 +7,6 @@ where the index data is stored, and are immutable up to delete markers.
|
|||
Segments are, periodically, merged into larger segments to keep the
|
||||
index size at bay and expunge deletes.
|
||||
|
||||
The more segments one has in the Lucene index means slower searches and
|
||||
more memory used. Segment merging is used to reduce the number of segments,
|
||||
however merges can be expensive to perform, especially on low IO environments.
|
||||
Merges can be throttled using <<store-throttling,store level throttling>>.
|
||||
|
||||
|
||||
[float]
|
||||
[[policy]]
|
||||
=== Policy
|
||||
|
@ -194,10 +188,21 @@ scheduler supports this setting:
|
|||
`index.merge.scheduler.max_thread_count`::
|
||||
|
||||
The maximum number of threads that may be merging at once. Defaults to
|
||||
`Math.max(1, Math.min(3, Runtime.getRuntime().availableProcessors() / 2))`
|
||||
`Math.max(1, Math.min(4, Runtime.getRuntime().availableProcessors() / 2))`
|
||||
which works well for a good solid-state-disk (SSD). If your index is on
|
||||
spinning platter drives instead, decrease this to 1.
|
||||
|
||||
`index.merge.scheduler.auto_throttle`::
|
||||
|
||||
If this is true (the default), then the merge scheduler will
|
||||
rate-limit IO (writes) for merges to an adaptive value depending on
|
||||
how many merges are requested over time. An application with a low
|
||||
indexing rate that unluckily suddenly requires a large merge will see
|
||||
that merge aggressively throttled, while an application doing heavy
|
||||
indexing will see the throttle move higher to allow merges to keep up
|
||||
with ongoing indexing. This is a dynamic setting (you can <<../indices/update-settings,change it
|
||||
at any time on a running index>>).
|
||||
|
||||
[float]
|
||||
==== SerialMergeScheduler
|
||||
|
||||
|
|
|
@ -90,6 +90,9 @@ settings API:
|
|||
All the settings for the merge policy currently configured.
|
||||
A different merge policy can't be set.
|
||||
|
||||
`index.merge.scheduler.*`::
|
||||
All the settings for the merge scheduler.
|
||||
|
||||
`index.routing.allocation.include.*`::
|
||||
A node matching any rule will be allowed to host shards from the index.
|
||||
|
||||
|
|
|
@ -60,17 +60,3 @@ The following settings can be set to manage the recovery policy:
|
|||
|
||||
`indices.recovery.max_bytes_per_sec`::
|
||||
defaults to `20mb`.
|
||||
|
||||
[float]
|
||||
[[throttling]]
|
||||
=== Store level throttling
|
||||
|
||||
The following settings can be set to control the store throttling:
|
||||
|
||||
[horizontal]
|
||||
`indices.store.throttle.type`::
|
||||
could be `merge` (default), `none` or `all`. See <<index-modules-store>>.
|
||||
|
||||
`indices.store.throttle.max_bytes_per_sec`::
|
||||
defaults to `20mb`.
|
||||
|
||||
|
|
|
@ -129,15 +129,22 @@ public class TrackingConcurrentMergeScheduler extends ConcurrentMergeScheduler {
|
|||
totalMergesSizeInBytes.inc(totalSizeInBytes);
|
||||
totalMerges.inc(took);
|
||||
|
||||
totalMergeStoppedTime.inc(merge.rateLimiter.getTotalStoppedNS()/1000000);
|
||||
totalMergeThrottledTime.inc(merge.rateLimiter.getTotalPausedNS()/1000000);
|
||||
long stoppedMS = merge.rateLimiter.getTotalStoppedNS()/1000000;
|
||||
long throttledMS = merge.rateLimiter.getTotalPausedNS()/1000000;
|
||||
|
||||
totalMergeStoppedTime.inc(stoppedMS);
|
||||
totalMergeThrottledTime.inc(throttledMS);
|
||||
|
||||
String message = String.format(Locale.ROOT,
|
||||
"merge segment [%s] done: took [%s], [%,.1f MB], [%,d docs]",
|
||||
"merge segment [%s] done: took [%s], [%,.1f MB], [%,d docs], [%s stopped], [%s throttled], [%,.1f MB written], [%,.1f MB/sec throttle]",
|
||||
merge.info == null ? "_na_" : merge.info.info.name,
|
||||
TimeValue.timeValueMillis(took),
|
||||
totalSizeInBytes/1024f/1024f,
|
||||
totalNumDocs);
|
||||
totalNumDocs,
|
||||
TimeValue.timeValueMillis(stoppedMS),
|
||||
TimeValue.timeValueMillis(throttledMS),
|
||||
merge.rateLimiter.getTotalBytesWritten()/1024f/1024f,
|
||||
merge.rateLimiter.getMBPerSec());
|
||||
|
||||
if (took > 20000) { // if more than 20 seconds, DEBUG log it
|
||||
logger.debug(message);
|
||||
|
|
|
@ -47,12 +47,14 @@ public class MergeStats implements Streamable, ToXContent {
|
|||
/** Total millis that we slept during writes so merge IO is throttled. */
|
||||
private long totalThrottledTimeInMillis;
|
||||
|
||||
private long totalBytesPerSecAutoThrottle;
|
||||
|
||||
public MergeStats() {
|
||||
|
||||
}
|
||||
|
||||
public void add(long totalMerges, long totalMergeTime, long totalNumDocs, long totalSizeInBytes, long currentMerges, long currentNumDocs, long currentSizeInBytes,
|
||||
long stoppedTimeMillis, long throttledTimeMillis) {
|
||||
long stoppedTimeMillis, long throttledTimeMillis, double mbPerSecAutoThrottle) {
|
||||
this.total += totalMerges;
|
||||
this.totalTimeInMillis += totalMergeTime;
|
||||
this.totalNumDocs += totalNumDocs;
|
||||
|
@ -62,6 +64,7 @@ public class MergeStats implements Streamable, ToXContent {
|
|||
this.currentSizeInBytes += currentSizeInBytes;
|
||||
this.totalStoppedTimeInMillis += stoppedTimeMillis;
|
||||
this.totalThrottledTimeInMillis += throttledTimeMillis;
|
||||
this.totalBytesPerSecAutoThrottle += (long) (mbPerSecAutoThrottle * 1024 * 1024);
|
||||
}
|
||||
|
||||
public void add(MergeStats mergeStats) {
|
||||
|
@ -77,6 +80,7 @@ public class MergeStats implements Streamable, ToXContent {
|
|||
this.currentSizeInBytes += mergeStats.currentSizeInBytes;
|
||||
this.totalStoppedTimeInMillis += mergeStats.totalStoppedTimeInMillis;
|
||||
this.totalThrottledTimeInMillis += mergeStats.totalThrottledTimeInMillis;
|
||||
this.totalBytesPerSecAutoThrottle += mergeStats.totalBytesPerSecAutoThrottle;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -140,6 +144,10 @@ public class MergeStats implements Streamable, ToXContent {
|
|||
return new ByteSizeValue(totalSizeInBytes);
|
||||
}
|
||||
|
||||
public long getTotalBytesPerSecAutoThrottle() {
|
||||
return totalBytesPerSecAutoThrottle;
|
||||
}
|
||||
|
||||
/**
|
||||
* The current number of merges executing.
|
||||
*/
|
||||
|
@ -177,6 +185,7 @@ public class MergeStats implements Streamable, ToXContent {
|
|||
builder.byteSizeField(Fields.TOTAL_SIZE_IN_BYTES, Fields.TOTAL_SIZE, totalSizeInBytes);
|
||||
builder.timeValueField(Fields.TOTAL_STOPPED_TIME_IN_MILLIS, Fields.TOTAL_STOPPED_TIME, totalStoppedTimeInMillis);
|
||||
builder.timeValueField(Fields.TOTAL_THROTTLED_TIME_IN_MILLIS, Fields.TOTAL_THROTTLED_TIME, totalThrottledTimeInMillis);
|
||||
builder.byteSizeField(Fields.TOTAL_THROTTLE_BYTES_PER_SEC_IN_BYTES, Fields.TOTAL_THROTTLE_BYTES_PER_SEC, totalBytesPerSecAutoThrottle);
|
||||
builder.endObject();
|
||||
return builder;
|
||||
}
|
||||
|
@ -197,6 +206,8 @@ public class MergeStats implements Streamable, ToXContent {
|
|||
static final XContentBuilderString TOTAL_DOCS = new XContentBuilderString("total_docs");
|
||||
static final XContentBuilderString TOTAL_SIZE = new XContentBuilderString("total_size");
|
||||
static final XContentBuilderString TOTAL_SIZE_IN_BYTES = new XContentBuilderString("total_size_in_bytes");
|
||||
static final XContentBuilderString TOTAL_THROTTLE_BYTES_PER_SEC_IN_BYTES = new XContentBuilderString("total_auto_throttle_in_bytes");
|
||||
static final XContentBuilderString TOTAL_THROTTLE_BYTES_PER_SEC = new XContentBuilderString("total_auto_throttle");
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -211,6 +222,7 @@ public class MergeStats implements Streamable, ToXContent {
|
|||
if (in.getVersion().onOrAfter(Version.V_2_0_0)) {
|
||||
totalStoppedTimeInMillis = in.readVLong();
|
||||
totalThrottledTimeInMillis = in.readVLong();
|
||||
totalBytesPerSecAutoThrottle = in.readVLong();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -226,6 +238,7 @@ public class MergeStats implements Streamable, ToXContent {
|
|||
if (out.getVersion().onOrAfter(Version.V_2_0_0)) {
|
||||
out.writeVLong(totalStoppedTimeInMillis);
|
||||
out.writeVLong(totalThrottledTimeInMillis);
|
||||
out.writeVLong(totalBytesPerSecAutoThrottle);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -86,11 +86,13 @@ public class ConcurrentMergeSchedulerProvider extends MergeSchedulerProvider {
|
|||
@Override
|
||||
public MergeStats stats() {
|
||||
MergeStats mergeStats = new MergeStats();
|
||||
// TODO: why would there be more than one CMS for a single shard...?
|
||||
for (CustomConcurrentMergeScheduler scheduler : schedulers) {
|
||||
mergeStats.add(scheduler.totalMerges(), scheduler.totalMergeTime(), scheduler.totalMergeNumDocs(), scheduler.totalMergeSizeInBytes(),
|
||||
scheduler.currentMerges(), scheduler.currentMergesNumDocs(), scheduler.currentMergesSizeInBytes(),
|
||||
scheduler.totalMergeStoppedTimeMillis(),
|
||||
scheduler.totalMergeThrottledTimeMillis());
|
||||
scheduler.totalMergeThrottledTimeMillis(),
|
||||
autoThrottle ? scheduler.getIORateLimitMBPerSec() : Double.POSITIVE_INFINITY);
|
||||
}
|
||||
return mergeStats;
|
||||
}
|
||||
|
|
|
@ -188,6 +188,7 @@ public class UpdateSettingsTests extends ElasticsearchIntegrationTest {
|
|||
.put(ConcurrentMergeSchedulerProvider.AUTO_THROTTLE, "no"))
|
||||
.get();
|
||||
|
||||
// Make sure we log the change:
|
||||
assertTrue(mockAppender.sawUpdateAutoThrottle);
|
||||
|
||||
// Make sure setting says it is in fact changed:
|
||||
|
@ -237,17 +238,9 @@ public class UpdateSettingsTests extends ElasticsearchIntegrationTest {
|
|||
// Make sure we log the change:
|
||||
assertTrue(mockAppender.sawUpdateMaxThreadCount);
|
||||
|
||||
client()
|
||||
.admin()
|
||||
.indices()
|
||||
.prepareUpdateSettings("test")
|
||||
.setSettings(ImmutableSettings.builder()
|
||||
.put(ConcurrentMergeSchedulerProvider.MAX_THREAD_COUNT, "3")
|
||||
)
|
||||
.get();
|
||||
|
||||
// Wait for merges to finish
|
||||
client().admin().indices().prepareOptimize("test").setWaitForMerge(true).get();
|
||||
// Make sure setting says it is in fact changed:
|
||||
GetSettingsResponse getSettingsResponse = client().admin().indices().prepareGetSettings("test").get();
|
||||
assertThat(getSettingsResponse.getSetting("test", ConcurrentMergeSchedulerProvider.MAX_THREAD_COUNT), equalTo("1"));
|
||||
|
||||
} finally {
|
||||
rootLogger.removeAppender(mockAppender);
|
||||
|
|
|
@ -442,6 +442,7 @@ public class IndexStatsTests extends ElasticsearchIntegrationTest {
|
|||
assertThat(stats.getTotal().getMerge().getTotal(), greaterThan(0l));
|
||||
assertThat(stats.getTotal().getMerge().getTotalStoppedTime(), notNullValue());
|
||||
assertThat(stats.getTotal().getMerge().getTotalThrottledTime(), notNullValue());
|
||||
assertThat(stats.getTotal().getMerge().getTotalBytesPerSecAutoThrottle(), greaterThan(0l));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
Loading…
Reference in New Issue