Core: switch to auto IO throttle for merges
This adds a new dynamic boolean setting, index.merge.scheduler.auto_throttle, default true (matching Lucene), to adaptively set the IO rate limit for merges over time. This is more flexible than the previous fixed-rate throttling because it responds to the incoming merge rate: search-heavy applications that are doing little indexing will see merges heavily throttled, while indexing-heavy applications will see the throttle loosen so merges can keep up with incoming indexing. The fixed-rate throttling is still available as a fallback if things go horribly wrong. Closes #9243 Closes #9133
commit b9358ccca8
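The adaptive behavior described in the commit message can be sketched in isolation. The following is a hypothetical toy model, not Lucene's actual ConcurrentMergeScheduler algorithm: it imitates the idea of raising the merge IO rate limit while merges are backing up and lowering it when they keep up, using an illustrative multiplicative increase/decrease rule. All names and constants are assumptions.

```java
// Toy model of adaptive merge IO throttling (illustrative only; the real
// logic lives in Lucene's ConcurrentMergeScheduler).
final class AdaptiveMergeThrottle {
    static final double MIN_MB_PER_SEC = 5.0;     // never throttle below this
    static final double MAX_MB_PER_SEC = 10240.0; // effectively unthrottled
    private double mbPerSec = 20.0;               // starting rate limit

    /** backlog = queued merges beyond what the scheduler can run concurrently. */
    double update(int backlog) {
        if (backlog > 0) {
            // Merges are falling behind incoming indexing: open the throttle.
            mbPerSec = Math.min(MAX_MB_PER_SEC, mbPerSec * 1.3);
        } else {
            // Quiet (search-heavy) period: tighten so searches see less merge IO.
            mbPerSec = Math.max(MIN_MB_PER_SEC, mbPerSec / 1.1);
        }
        return mbPerSec;
    }
}
```

Under this rule a sustained merge backlog drives the limit up, and a sustained quiet period drives it back down, which matches the search-heavy vs. indexing-heavy behavior the commit message describes.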
@@ -25,7 +25,7 @@ of `indices`, `os`, `process`, `jvm`, `network`, `transport`, `http`,
 [horizontal]
 `indices`::
 	Indices stats about size, document count, indexing and
-	deletion times, search times, field cache size , merges and flushes
+	deletion times, search times, field cache size, merges and flushes
 
 `fs`::
 	File system information, data path, free disk space, read/write
@@ -183,15 +183,6 @@ due to forced awareness or allocation filtering.
 `indices.recovery.max_bytes_per_sec`::
 	See <<modules-indices>>
 
-[float]
-==== Store level throttling
-
-`indices.store.throttle.type`::
-	See <<index-modules-store>>
-
-`indices.store.throttle.max_bytes_per_sec`::
-	See <<index-modules-store>>
-
 [float]
 [[logger]]
 === Logger
@@ -7,12 +7,6 @@ where the index data is stored, and are immutable up to delete markers.
 Segments are, periodically, merged into larger segments to keep the
 index size at bay and expunge deletes.
 
-The more segments one has in the Lucene index means slower searches and
-more memory used. Segment merging is used to reduce the number of segments,
-however merges can be expensive to perform, especially on low IO environments.
-Merges can be throttled using <<store-throttling,store level throttling>>.
-
-
 [float]
 [[policy]]
 === Policy
@@ -194,10 +188,21 @@ scheduler supports this setting:
 `index.merge.scheduler.max_thread_count`::
 
 	The maximum number of threads that may be merging at once. Defaults to
-	`Math.max(1, Math.min(3, Runtime.getRuntime().availableProcessors() / 2))`
+	`Math.max(1, Math.min(4, Runtime.getRuntime().availableProcessors() / 2))`
 	which works well for a good solid-state-disk (SSD). If your index is on
 	spinning platter drives instead, decrease this to 1.
 
+`index.merge.scheduler.auto_throttle`::
+
+	If this is true (the default), then the merge scheduler will
+	rate-limit IO (writes) for merges to an adaptive value depending on
+	how many merges are requested over time. An application with a low
+	indexing rate that unluckily suddenly requires a large merge will see
+	that merge aggressively throttled, while an application doing heavy
+	indexing will see the throttle move higher to allow merges to keep up
+	with ongoing indexing. This is a dynamic setting (you can <<../indices/update-settings,change it
+	at any time on a running index>>).
+
 [float]
 ==== SerialMergeScheduler
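The new default for `max_thread_count` documented above is a pure function of the processor count, so it is easy to tabulate. This sketch just restates the documented formula as a standalone method:

```java
// The documented default for index.merge.scheduler.max_thread_count:
// at least 1 thread, at most 4, otherwise half the available processors.
final class MaxThreadCountDefault {
    static int defaultMaxThreadCount(int availableProcessors) {
        return Math.max(1, Math.min(4, availableProcessors / 2));
    }
}
```

For example, a 2-core box gets 1 merge thread, an 8-core box gets 4, and larger boxes stay capped at 4; spinning disks should still be dropped to 1 manually, as the doc says.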
@@ -19,37 +19,6 @@ to the fact that there is no need for extra large JVM heaps (with their
 own consequences) for storing the index in memory.
 
-[float]
-[[store-throttling]]
-=== Store Level Throttling
-
-The way Lucene, the IR library elasticsearch uses under the covers,
-works is by creating immutable segments (up to deletes) and constantly
-merging them (the merge policy settings allow to control how those
-merges happen). The merge process happens in an asynchronous manner
-without affecting the indexing / search speed. The problem though,
-especially on systems with low IO, is that the merge process can be
-expensive and affect search / index operation simply by the fact that
-the box is now taxed with more IO happening.
-
-The store module allows to have throttling configured for merges (or
-all) either on the node level, or on the index level. The node level
-throttling will make sure that out of all the shards allocated on that
-node, the merge process won't pass the specific setting bytes per
-second. It can be set by setting `indices.store.throttle.type` to
-`merge`, and setting `indices.store.throttle.max_bytes_per_sec` to
-something like `5mb`. The node level settings can be changed dynamically
-using the cluster update settings API. The default is set
-to `20mb` with type `merge`.
-
-If specific index level configuration is needed, regardless of the node
-level settings, it can be set as well using the
-`index.store.throttle.type`, and
-`index.store.throttle.max_bytes_per_sec`. The default value for the type
-is `node`, meaning it will throttle based on the node level settings and
-participate in the global throttling happening. Both settings can be set
-using the index update settings API dynamically.
-
 [float]
 [[file-system]]
 === File system storage types
 
@@ -90,6 +90,9 @@ settings API:
 	All the settings for the merge policy currently configured.
 	A different merge policy can't be set.
 
+`index.merge.scheduler.*`::
+	All the settings for the merge scheduler.
+
 `index.routing.allocation.include.*`::
 	A node matching any rule will be allowed to host shards from the index.
 
@@ -61,16 +61,3 @@ The following settings can be set to manage the recovery policy:
 `indices.recovery.max_bytes_per_sec`::
 	defaults to `20mb`.
 
-[float]
-[[throttling]]
-=== Store level throttling
-
-The following settings can be set to control the store throttling:
-
-[horizontal]
-`indices.store.throttle.type`::
-	could be `merge` (default), `none` or `all`. See <<index-modules-store>>.
-
-`indices.store.throttle.max_bytes_per_sec`::
-	defaults to `20mb`.
@@ -46,6 +46,8 @@ public class TrackingConcurrentMergeScheduler extends ConcurrentMergeScheduler {
     private final CounterMetric currentMerges = new CounterMetric();
     private final CounterMetric currentMergesNumDocs = new CounterMetric();
     private final CounterMetric currentMergesSizeInBytes = new CounterMetric();
+    private final CounterMetric totalMergeStoppedTime = new CounterMetric();
+    private final CounterMetric totalMergeThrottledTime = new CounterMetric();
 
     private final Set<OnGoingMerge> onGoingMerges = ConcurrentCollections.newConcurrentSet();
     private final Set<OnGoingMerge> readOnlyOnGoingMerges = Collections.unmodifiableSet(onGoingMerges);
@@ -83,6 +85,14 @@ public class TrackingConcurrentMergeScheduler extends ConcurrentMergeScheduler {
         return currentMergesSizeInBytes.count();
     }
 
+    public long totalMergeStoppedTimeMillis() {
+        return totalMergeStoppedTime.count();
+    }
+
+    public long totalMergeThrottledTimeMillis() {
+        return totalMergeThrottledTime.count();
+    }
+
     public Set<OnGoingMerge> onGoingMerges() {
         return readOnlyOnGoingMerges;
     }
@@ -118,12 +128,23 @@ public class TrackingConcurrentMergeScheduler extends ConcurrentMergeScheduler {
         totalMergesNumDocs.inc(totalNumDocs);
         totalMergesSizeInBytes.inc(totalSizeInBytes);
         totalMerges.inc(took);
+
+        long stoppedMS = merge.rateLimiter.getTotalStoppedNS()/1000000;
+        long throttledMS = merge.rateLimiter.getTotalPausedNS()/1000000;
+
+        totalMergeStoppedTime.inc(stoppedMS);
+        totalMergeThrottledTime.inc(throttledMS);
+
         String message = String.format(Locale.ROOT,
-                                       "merge segment [%s] done: took [%s], [%,.1f MB], [%,d docs]",
+                                       "merge segment [%s] done: took [%s], [%,.1f MB], [%,d docs], [%s stopped], [%s throttled], [%,.1f MB written], [%,.1f MB/sec throttle]",
                                        merge.info == null ? "_na_" : merge.info.info.name,
                                        TimeValue.timeValueMillis(took),
                                        totalSizeInBytes/1024f/1024f,
-                                       totalNumDocs);
+                                       totalNumDocs,
+                                       TimeValue.timeValueMillis(stoppedMS),
+                                       TimeValue.timeValueMillis(throttledMS),
+                                       merge.rateLimiter.getTotalBytesWritten()/1024f/1024f,
+                                       merge.rateLimiter.getMBPerSec());
 
         if (took > 20000) { // if more than 20 seconds, DEBUG log it
             logger.debug(message);
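The hunk above does two small unit conversions worth spelling out: the rate limiter reports stopped/paused time in nanoseconds, which is truncated to milliseconds by integer division, and byte counts are converted to MB with float division for the `%,.1f` format specifier. A minimal standalone version:

```java
import java.util.Locale;

// The two conversions used when logging a finished merge.
final class MergeLogMath {
    // Integer division truncates, matching getTotalStoppedNS()/1000000 above.
    static long nsToMs(long ns) {
        return ns / 1000000;
    }

    // Bytes -> MB with float division, rendered via the %,.1f specifier.
    static String mb(long bytes) {
        return String.format(Locale.ROOT, "[%,.1f MB]", bytes / 1024f / 1024f);
    }
}
```

Note the truncation: 1.5ms of pause reported in nanoseconds logs as 1ms, so very short pauses can round down to zero.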
@@ -18,6 +18,8 @@
  */
 package org.apache.lucene.store;
 
+import java.util.Arrays;
+
 /**
  */
 public final class StoreUtils {
@@ -46,4 +48,12 @@ public final class StoreUtils {
 
         return directory.toString();
     }
+
+    public static String toString(Directory[] directories) {
+        String[] strings = new String[directories.length];
+        for(int i=0;i<directories.length;i++) {
+            strings[i] = toString(directories[i]);
+        }
+        return Arrays.toString(strings);
+    }
 }
@@ -19,6 +19,7 @@
 
 package org.elasticsearch.index.merge;
 
+import org.elasticsearch.Version;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.io.stream.Streamable;
@@ -43,11 +44,20 @@ public class MergeStats implements Streamable, ToXContent {
     private long currentNumDocs;
     private long currentSizeInBytes;
 
+    /** Total millis that large merges were stopped so that smaller merges would finish. */
+    private long totalStoppedTimeInMillis;
+
+    /** Total millis that we slept during writes so merge IO is throttled. */
+    private long totalThrottledTimeInMillis;
+
+    private long totalBytesPerSecAutoThrottle;
+
     public MergeStats() {
 
     }
 
-    public void add(long totalMerges, long totalMergeTime, long totalNumDocs, long totalSizeInBytes, long currentMerges, long currentNumDocs, long currentSizeInBytes) {
+    public void add(long totalMerges, long totalMergeTime, long totalNumDocs, long totalSizeInBytes, long currentMerges, long currentNumDocs, long currentSizeInBytes,
+                    long stoppedTimeMillis, long throttledTimeMillis, double mbPerSecAutoThrottle) {
         this.total += totalMerges;
         this.totalTimeInMillis += totalMergeTime;
         this.totalNumDocs += totalNumDocs;
@@ -55,6 +65,14 @@ public class MergeStats implements Streamable, ToXContent {
         this.current += currentMerges;
         this.currentNumDocs += currentNumDocs;
         this.currentSizeInBytes += currentSizeInBytes;
+        this.totalStoppedTimeInMillis += stoppedTimeMillis;
+        this.totalThrottledTimeInMillis += throttledTimeMillis;
+        long bytesPerSecAutoThrottle = (long) (mbPerSecAutoThrottle * 1024 * 1024);
+        if (this.totalBytesPerSecAutoThrottle == Long.MAX_VALUE || bytesPerSecAutoThrottle == Long.MAX_VALUE) {
+            this.totalBytesPerSecAutoThrottle = Long.MAX_VALUE;
+        } else {
+            this.totalBytesPerSecAutoThrottle += bytesPerSecAutoThrottle;
+        }
     }
 
     public void add(MergeStats mergeStats) {
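The saturation logic above relies on a Java narrowing-conversion detail: when auto-throttle is disabled the rate is reported as `Double.POSITIVE_INFINITY` (see the `stats()` call later in this diff), and casting an infinite double to `long` yields `Long.MAX_VALUE`. Sums must then stay saturated at that sentinel rather than overflow. A self-contained version of both pieces:

```java
// Mirrors the MergeStats accumulation: MB/sec -> bytes/sec with an
// infinity sentinel, plus saturating addition on that sentinel.
final class SaturatingThrottleStats {
    static long mbPerSecToBytes(double mbPerSec) {
        // (long) Double.POSITIVE_INFINITY == Long.MAX_VALUE per the JLS
        // narrowing rules, so "unthrottled" becomes the saturation sentinel.
        return (long) (mbPerSec * 1024 * 1024);
    }

    static long saturatingAdd(long a, long b) {
        if (a == Long.MAX_VALUE || b == Long.MAX_VALUE) {
            return Long.MAX_VALUE; // once unthrottled, the sum stays pinned
        }
        return a + b;
    }
}
```

Without the sentinel check, adding `Long.MAX_VALUE` per shard would silently wrap negative, which is why both `add` overloads in the diff special-case it.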
@@ -68,6 +86,13 @@ public class MergeStats implements Streamable, ToXContent {
         this.current += mergeStats.current;
         this.currentNumDocs += mergeStats.currentNumDocs;
         this.currentSizeInBytes += mergeStats.currentSizeInBytes;
+        this.totalStoppedTimeInMillis += mergeStats.totalStoppedTimeInMillis;
+        this.totalThrottledTimeInMillis += mergeStats.totalThrottledTimeInMillis;
+        if (this.totalBytesPerSecAutoThrottle == Long.MAX_VALUE || mergeStats.totalBytesPerSecAutoThrottle == Long.MAX_VALUE) {
+            this.totalBytesPerSecAutoThrottle = Long.MAX_VALUE;
+        } else {
+            this.totalBytesPerSecAutoThrottle += mergeStats.totalBytesPerSecAutoThrottle;
+        }
     }
 
     /**
@@ -84,6 +109,34 @@ public class MergeStats implements Streamable, ToXContent {
         return this.totalTimeInMillis;
     }
 
+    /**
+     * The total time large merges were stopped so smaller merges could finish.
+     */
+    public long getTotalStoppedTimeInMillis() {
+        return this.totalStoppedTimeInMillis;
+    }
+
+    /**
+     * The total time large merges were stopped so smaller merges could finish.
+     */
+    public TimeValue getTotalStoppedTime() {
+        return new TimeValue(totalStoppedTimeInMillis);
+    }
+
+    /**
+     * The total time merge IO writes were throttled.
+     */
+    public long getTotalThrottledTimeInMillis() {
+        return this.totalThrottledTimeInMillis;
+    }
+
+    /**
+     * The total time merge IO writes were throttled.
+     */
+    public TimeValue getTotalThrottledTime() {
+        return new TimeValue(totalThrottledTimeInMillis);
+    }
+
     /**
      * The total time merges have been executed.
      */
@@ -103,6 +156,10 @@ public class MergeStats implements Streamable, ToXContent {
         return new ByteSizeValue(totalSizeInBytes);
     }
 
+    public long getTotalBytesPerSecAutoThrottle() {
+        return totalBytesPerSecAutoThrottle;
+    }
+
     /**
      * The current number of merges executing.
      */
@@ -138,6 +195,9 @@ public class MergeStats implements Streamable, ToXContent {
         builder.timeValueField(Fields.TOTAL_TIME_IN_MILLIS, Fields.TOTAL_TIME, totalTimeInMillis);
         builder.field(Fields.TOTAL_DOCS, totalNumDocs);
         builder.byteSizeField(Fields.TOTAL_SIZE_IN_BYTES, Fields.TOTAL_SIZE, totalSizeInBytes);
+        builder.timeValueField(Fields.TOTAL_STOPPED_TIME_IN_MILLIS, Fields.TOTAL_STOPPED_TIME, totalStoppedTimeInMillis);
+        builder.timeValueField(Fields.TOTAL_THROTTLED_TIME_IN_MILLIS, Fields.TOTAL_THROTTLED_TIME, totalThrottledTimeInMillis);
+        builder.byteSizeField(Fields.TOTAL_THROTTLE_BYTES_PER_SEC_IN_BYTES, Fields.TOTAL_THROTTLE_BYTES_PER_SEC, totalBytesPerSecAutoThrottle);
         builder.endObject();
         return builder;
     }
@@ -151,9 +211,15 @@ public class MergeStats implements Streamable, ToXContent {
         static final XContentBuilderString TOTAL = new XContentBuilderString("total");
         static final XContentBuilderString TOTAL_TIME = new XContentBuilderString("total_time");
         static final XContentBuilderString TOTAL_TIME_IN_MILLIS = new XContentBuilderString("total_time_in_millis");
+        static final XContentBuilderString TOTAL_STOPPED_TIME = new XContentBuilderString("total_stopped_time");
+        static final XContentBuilderString TOTAL_STOPPED_TIME_IN_MILLIS = new XContentBuilderString("total_stopped_time_in_millis");
+        static final XContentBuilderString TOTAL_THROTTLED_TIME = new XContentBuilderString("total_throttled_time");
+        static final XContentBuilderString TOTAL_THROTTLED_TIME_IN_MILLIS = new XContentBuilderString("total_throttled_time_in_millis");
         static final XContentBuilderString TOTAL_DOCS = new XContentBuilderString("total_docs");
         static final XContentBuilderString TOTAL_SIZE = new XContentBuilderString("total_size");
         static final XContentBuilderString TOTAL_SIZE_IN_BYTES = new XContentBuilderString("total_size_in_bytes");
+        static final XContentBuilderString TOTAL_THROTTLE_BYTES_PER_SEC_IN_BYTES = new XContentBuilderString("total_auto_throttle_in_bytes");
+        static final XContentBuilderString TOTAL_THROTTLE_BYTES_PER_SEC = new XContentBuilderString("total_auto_throttle");
     }
 
     @Override
|
@ -165,6 +231,10 @@ public class MergeStats implements Streamable, ToXContent {
|
||||||
current = in.readVLong();
|
current = in.readVLong();
|
||||||
currentNumDocs = in.readVLong();
|
currentNumDocs = in.readVLong();
|
||||||
currentSizeInBytes = in.readVLong();
|
currentSizeInBytes = in.readVLong();
|
||||||
|
// Added in 2.0:
|
||||||
|
totalStoppedTimeInMillis = in.readVLong();
|
||||||
|
totalThrottledTimeInMillis = in.readVLong();
|
||||||
|
totalBytesPerSecAutoThrottle = in.readVLong();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@@ -176,5 +246,9 @@ public class MergeStats implements Streamable, ToXContent {
         out.writeVLong(current);
         out.writeVLong(currentNumDocs);
         out.writeVLong(currentSizeInBytes);
+        // Added in 2.0:
+        out.writeVLong(totalStoppedTimeInMillis);
+        out.writeVLong(totalThrottledTimeInMillis);
+        out.writeVLong(totalBytesPerSecAutoThrottle);
     }
 }
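The new stats are serialized with `writeVLong`/`readVLong`, appended after the existing fields so that the 2.0 wire order is field-by-field symmetric between `readFrom` and `writeTo`. For illustration, here is a sketch of the 7-bits-per-byte variable-length encoding that vlong formats of this style typically use (an assumption about the format; the actual implementation lives in elasticsearch's `StreamOutput`/`StreamInput`):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;

// Sketch of a Lucene-style variable-length long: 7 payload bits per byte,
// high bit set on every byte except the last. Small values cost one byte.
final class VLongSketch {
    static byte[] write(long i) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        while ((i & ~0x7FL) != 0) {
            out.write((int) ((i & 0x7F) | 0x80)); // low 7 bits + continuation bit
            i >>>= 7;
        }
        out.write((int) i); // final byte, continuation bit clear
        return out.toByteArray();
    }

    static long read(byte[] bytes) {
        ByteArrayInputStream in = new ByteArrayInputStream(bytes);
        long result = 0;
        int shift = 0;
        int b;
        while (((b = in.read()) & 0x80) != 0) {
            result |= ((long) (b & 0x7F)) << shift;
            shift += 7;
        }
        return result | ((long) b << shift);
    }
}
```

This is why counters like millisecond totals are cheap on the wire: they are usually small, so they serialize in a few bytes regardless of being declared `long`.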
@@ -49,14 +49,13 @@ public class ConcurrentMergeSchedulerProvider extends MergeSchedulerProvider {
     private final IndexSettingsService indexSettingsService;
     private final ApplySettings applySettings = new ApplySettings();
 
-    private static final String MAX_THREAD_COUNT_KEY = "max_thread_count";
-    private static final String MAX_MERGE_COUNT_KEY = "max_merge_count";
-
-    public static final String MAX_THREAD_COUNT = "index.merge.scheduler." + MAX_THREAD_COUNT_KEY;
-    public static final String MAX_MERGE_COUNT = "index.merge.scheduler." + MAX_MERGE_COUNT_KEY;
+    public static final String MAX_THREAD_COUNT = "index.merge.scheduler.max_thread_count";
+    public static final String MAX_MERGE_COUNT = "index.merge.scheduler.max_merge_count";
+    public static final String AUTO_THROTTLE = "index.merge.scheduler.auto_throttle";
 
     private volatile int maxThreadCount;
     private volatile int maxMergeCount;
+    private volatile boolean autoThrottle;
 
     private Set<CustomConcurrentMergeScheduler> schedulers = new CopyOnWriteArraySet<>();
@@ -64,10 +63,10 @@ public class ConcurrentMergeSchedulerProvider extends MergeSchedulerProvider {
     public ConcurrentMergeSchedulerProvider(ShardId shardId, @IndexSettings Settings indexSettings, ThreadPool threadPool, IndexSettingsService indexSettingsService) {
         super(shardId, indexSettings, threadPool);
         this.indexSettingsService = indexSettingsService;
-        // TODO LUCENE MONITOR this will change in Lucene 4.0
-        this.maxThreadCount = componentSettings.getAsInt(MAX_THREAD_COUNT_KEY, Math.max(1, Math.min(3, EsExecutors.boundedNumberOfProcessors(indexSettings) / 2)));
-        this.maxMergeCount = componentSettings.getAsInt(MAX_MERGE_COUNT_KEY, maxThreadCount + 2);
-        logger.debug("using [concurrent] merge scheduler with max_thread_count[{}], max_merge_count[{}]", maxThreadCount, maxMergeCount);
+        this.maxThreadCount = indexSettings.getAsInt(MAX_THREAD_COUNT, Math.max(1, Math.min(4, EsExecutors.boundedNumberOfProcessors(indexSettings) / 2)));
+        this.maxMergeCount = indexSettings.getAsInt(MAX_MERGE_COUNT, maxThreadCount + 5);
+        this.autoThrottle = indexSettings.getAsBoolean(AUTO_THROTTLE, true);
+        logger.debug("using [concurrent] merge scheduler with max_thread_count[{}], max_merge_count[{}], auto_throttle[{}]", maxThreadCount, maxMergeCount, autoThrottle);
 
         indexSettingsService.addListener(applySettings);
     }
@@ -75,10 +74,14 @@ public class ConcurrentMergeSchedulerProvider extends MergeSchedulerProvider {
     @Override
     public MergeScheduler newMergeScheduler() {
         CustomConcurrentMergeScheduler concurrentMergeScheduler = new CustomConcurrentMergeScheduler(logger, shardId, this);
-        // which would then stall if there are 2 merges in flight, and unstall once we are back to 1 or 0 merges
         // NOTE: we pass maxMergeCount+1 here so that CMS will allow one too many merges to kick off which then allows
         // InternalEngine.IndexThrottle to detect too-many-merges and throttle:
         concurrentMergeScheduler.setMaxMergesAndThreads(maxMergeCount+1, maxThreadCount);
+        if (autoThrottle) {
+            concurrentMergeScheduler.enableAutoIOThrottle();
+        } else {
+            concurrentMergeScheduler.disableAutoIOThrottle();
+        }
         schedulers.add(concurrentMergeScheduler);
         return concurrentMergeScheduler;
     }
@@ -86,9 +89,13 @@ public class ConcurrentMergeSchedulerProvider extends MergeSchedulerProvider {
     @Override
     public MergeStats stats() {
         MergeStats mergeStats = new MergeStats();
+        // TODO: why would there be more than one CMS for a single shard...?
         for (CustomConcurrentMergeScheduler scheduler : schedulers) {
             mergeStats.add(scheduler.totalMerges(), scheduler.totalMergeTime(), scheduler.totalMergeNumDocs(), scheduler.totalMergeSizeInBytes(),
-                           scheduler.currentMerges(), scheduler.currentMergesNumDocs(), scheduler.currentMergesSizeInBytes());
+                           scheduler.currentMerges(), scheduler.currentMergesNumDocs(), scheduler.currentMergesSizeInBytes(),
+                           scheduler.totalMergeStoppedTimeMillis(),
+                           scheduler.totalMergeThrottledTimeMillis(),
+                           autoThrottle ? scheduler.getIORateLimitMBPerSec() : Double.POSITIVE_INFINITY);
         }
         return mergeStats;
     }
@@ -165,7 +172,7 @@ public class ConcurrentMergeSchedulerProvider extends MergeSchedulerProvider {
         public void onRefreshSettings(Settings settings) {
             int maxThreadCount = settings.getAsInt(MAX_THREAD_COUNT, ConcurrentMergeSchedulerProvider.this.maxThreadCount);
             if (maxThreadCount != ConcurrentMergeSchedulerProvider.this.maxThreadCount) {
-                logger.info("updating [{}] from [{}] to [{}]", MAX_THREAD_COUNT_KEY, ConcurrentMergeSchedulerProvider.this.maxThreadCount, maxThreadCount);
+                logger.info("updating [{}] from [{}] to [{}]", MAX_THREAD_COUNT, ConcurrentMergeSchedulerProvider.this.maxThreadCount, maxThreadCount);
                 ConcurrentMergeSchedulerProvider.this.maxThreadCount = maxThreadCount;
                 for (CustomConcurrentMergeScheduler scheduler : schedulers) {
                     scheduler.setMaxMergesAndThreads(ConcurrentMergeSchedulerProvider.this.maxMergeCount, maxThreadCount);
@@ -174,12 +181,25 @@ public class ConcurrentMergeSchedulerProvider extends MergeSchedulerProvider {
 
             int maxMergeCount = settings.getAsInt(MAX_MERGE_COUNT, ConcurrentMergeSchedulerProvider.this.maxMergeCount);
             if (maxMergeCount != ConcurrentMergeSchedulerProvider.this.maxMergeCount) {
-                logger.info("updating [{}] from [{}] to [{}]", MAX_MERGE_COUNT_KEY, ConcurrentMergeSchedulerProvider.this.maxMergeCount, maxMergeCount);
+                logger.info("updating [{}] from [{}] to [{}]", MAX_MERGE_COUNT, ConcurrentMergeSchedulerProvider.this.maxMergeCount, maxMergeCount);
                 ConcurrentMergeSchedulerProvider.this.maxMergeCount = maxMergeCount;
                 for (CustomConcurrentMergeScheduler scheduler : schedulers) {
                     scheduler.setMaxMergesAndThreads(maxMergeCount, ConcurrentMergeSchedulerProvider.this.maxThreadCount);
                 }
             }
+
+            boolean autoThrottle = settings.getAsBoolean(AUTO_THROTTLE, ConcurrentMergeSchedulerProvider.this.autoThrottle);
+            if (autoThrottle != ConcurrentMergeSchedulerProvider.this.autoThrottle) {
+                logger.info("updating [{}] from [{}] to [{}]", AUTO_THROTTLE, ConcurrentMergeSchedulerProvider.this.autoThrottle, autoThrottle);
+                ConcurrentMergeSchedulerProvider.this.autoThrottle = autoThrottle;
+                for (CustomConcurrentMergeScheduler scheduler : schedulers) {
+                    if (autoThrottle) {
+                        scheduler.enableAutoIOThrottle();
+                    } else {
+                        scheduler.disableAutoIOThrottle();
+                    }
+                }
+            }
         }
     }
 }
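The settings-refresh logic above follows a compare-then-apply pattern: read the incoming value with the current value as the default, and only if it actually changed, log, store it, and push the toggle to every live scheduler. A minimal standalone sketch of that pattern (all names here are illustrative, not the elasticsearch API):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical miniature of the ApplySettings listener: detect a changed
// boolean setting and fan the change out to registered schedulers.
final class AutoThrottleToggle {
    private boolean autoThrottle = true;             // default matches the diff
    final List<String> applied = new ArrayList<>();  // stands in for real scheduler calls

    void onRefreshSettings(Map<String, String> settings) {
        boolean newValue = Boolean.parseBoolean(
                settings.getOrDefault("index.merge.scheduler.auto_throttle",
                                      Boolean.toString(autoThrottle)));
        if (newValue != autoThrottle) {              // only act on a real change
            autoThrottle = newValue;
            applied.add(newValue ? "enableAutoIOThrottle" : "disableAutoIOThrottle");
        }
    }

    boolean autoThrottle() {
        return autoThrottle;
    }
}
```

The change check matters: refresh callbacks fire for any settings update, so an unchanged value must be a no-op rather than re-toggling (and re-logging) on every refresh.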
@@ -55,6 +55,7 @@ public class IndexDynamicSettingsModule extends AbstractModule {
         indexDynamicSettings.addDynamicSetting(AbstractIndexStore.INDEX_STORE_THROTTLE_TYPE);
         indexDynamicSettings.addDynamicSetting(ConcurrentMergeSchedulerProvider.MAX_THREAD_COUNT);
         indexDynamicSettings.addDynamicSetting(ConcurrentMergeSchedulerProvider.MAX_MERGE_COUNT);
+        indexDynamicSettings.addDynamicSetting(ConcurrentMergeSchedulerProvider.AUTO_THROTTLE);
         indexDynamicSettings.addDynamicSetting(FilterAllocationDecider.INDEX_ROUTING_REQUIRE_GROUP + "*");
         indexDynamicSettings.addDynamicSetting(FilterAllocationDecider.INDEX_ROUTING_INCLUDE_GROUP + "*");
         indexDynamicSettings.addDynamicSetting(FilterAllocationDecider.INDEX_ROUTING_EXCLUDE_GROUP + "*");
 
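Because the setting is registered as dynamic, it can be flipped at runtime through the index update-settings API; a minimal sketch of the request body (the target index name is whatever index you are tuning, shown here as `test` in `PUT /test/_settings`):

```json
{
  "index.merge.scheduler.auto_throttle": false
}
```

Setting it back to `true` re-enables Lucene's adaptive merge IO throttling, which is the default.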
@@ -21,8 +21,9 @@ package org.elasticsearch.index.store.distributor;
 
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.store.FSDirectory;
-import org.elasticsearch.index.store.DirectoryUtils;
+import org.apache.lucene.store.StoreUtils;
 import org.elasticsearch.index.store.DirectoryService;
+import org.elasticsearch.index.store.DirectoryUtils;
 
 import java.io.IOException;
 import java.nio.file.FileStore;
 
@@ -68,7 +69,7 @@ public abstract class AbstractDistributor implements Distributor {
 
     @Override
     public String toString() {
-        return name() + Arrays.toString(delegates);
+        return name() + StoreUtils.toString(delegates);
     }
 
     protected abstract Directory doAny() throws IOException;
 
@@ -95,7 +95,7 @@ public abstract class AbstractIndexStore extends AbstractIndexComponent implemen
         this.indexService = indexService;
         this.indicesStore = indicesStore;
 
-        this.rateLimitingType = indexSettings.get(INDEX_STORE_THROTTLE_TYPE, "node");
+        this.rateLimitingType = indexSettings.get(INDEX_STORE_THROTTLE_TYPE, "none");
         if (rateLimitingType.equalsIgnoreCase("node")) {
             nodeRateLimiting = true;
         } else {
 
@@ -114,10 +114,10 @@ public class IndicesStore extends AbstractComponent implements ClusterStateListe
         this.transportService = transportService;
         transportService.registerHandler(ACTION_SHARD_EXISTS, new ShardActiveRequestHandler());
 
-        // we limit with 20MB / sec by default with a default type set to merge sice 0.90.1
-        this.rateLimitingType = componentSettings.get("throttle.type", StoreRateLimiting.Type.MERGE.name());
+        // we don't limit by default (we default to CMS's auto throttle instead):
+        this.rateLimitingType = componentSettings.get("throttle.type", StoreRateLimiting.Type.NONE.name());
         rateLimiting.setType(rateLimitingType);
-        this.rateLimitingThrottle = componentSettings.getAsBytesSize("throttle.max_bytes_per_sec", new ByteSizeValue(20, ByteSizeUnit.MB));
+        this.rateLimitingThrottle = componentSettings.getAsBytesSize("throttle.max_bytes_per_sec", new ByteSizeValue(10240, ByteSizeUnit.MB));
         rateLimiting.setMaxRate(rateLimitingThrottle);
 
         logger.debug("using indices.store.throttle.type [{}], with index.store.throttle.max_bytes_per_sec [{}]", rateLimitingType, rateLimitingThrottle);
 
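The fixed-rate store throttling remains available as a fallback if the adaptive throttle misbehaves; a sketch of the node-level settings (e.g. in elasticsearch.yml) that restore the previous fixed-rate defaults, using the values this change removes:

```yaml
# Fallback: fixed-rate merge throttling (the pre-change defaults were merge / 20mb)
indices.store.throttle.type: merge
indices.store.throttle.max_bytes_per_sec: 20mb
```

Both settings are also updatable dynamically via the cluster update-settings API.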
@@ -23,7 +23,6 @@ import org.apache.log4j.AppenderSkeleton;
 import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
 import org.apache.log4j.spi.LoggingEvent;
-import org.apache.lucene.util.LuceneTestCase.AwaitsFix;
 import org.apache.lucene.util.LuceneTestCase.Slow;
 import org.elasticsearch.ElasticsearchIllegalArgumentException;
 import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
 
@@ -264,7 +263,8 @@ public class UpdateSettingsTests extends ElasticsearchIntegrationTest {
         public boolean sawIndexWriterMessage;
         public boolean sawFlushDeletes;
         public boolean sawMergeThreadPaused;
-        public boolean sawUpdateSetting;
+        public boolean sawUpdateMaxThreadCount;
+        public boolean sawUpdateAutoThrottle;
 
         @Override
         protected void append(LoggingEvent event) {
 
@@ -274,8 +274,11 @@ public class UpdateSettingsTests extends ElasticsearchIntegrationTest {
                 sawFlushDeletes |= message.contains("IW: apply all deletes during flush");
                 sawMergeThreadPaused |= message.contains("CMS: pause thread");
             }
-            if (event.getLevel() == Level.INFO && message.contains("updating [max_thread_count] from [10000] to [1]")) {
-                sawUpdateSetting = true;
+            if (event.getLevel() == Level.INFO && message.contains("updating [index.merge.scheduler.max_thread_count] from [10000] to [1]")) {
+                sawUpdateMaxThreadCount = true;
+            }
+            if (event.getLevel() == Level.INFO && message.contains("updating [index.merge.scheduler.auto_throttle] from [true] to [false]")) {
+                sawUpdateAutoThrottle = true;
             }
         }
 
@@ -289,10 +292,50 @@ public class UpdateSettingsTests extends ElasticsearchIntegrationTest {
             }
         }
 
+    @Test
+    public void testUpdateAutoThrottleSettings() {
+
+        MockAppender mockAppender = new MockAppender();
+        Logger rootLogger = Logger.getRootLogger();
+        Level savedLevel = rootLogger.getLevel();
+        rootLogger.addAppender(mockAppender);
+        rootLogger.setLevel(Level.TRACE);
+
+        try {
+            // No throttling at first, only 1 non-replicated shard, force lots of merging:
+            assertAcked(prepareCreate("test")
+                        .setSettings(ImmutableSettings.builder()
+                                     .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, "1")
+                                     .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, "0")
+                                     .put(TieredMergePolicyProvider.INDEX_MERGE_POLICY_MAX_MERGE_AT_ONCE, "2")
+                                     .put(TieredMergePolicyProvider.INDEX_MERGE_POLICY_SEGMENTS_PER_TIER, "2")
+                                     .put(ConcurrentMergeSchedulerProvider.MAX_THREAD_COUNT, "1")
+                                     .put(ConcurrentMergeSchedulerProvider.MAX_MERGE_COUNT, "2")
+                                     ));
+
+            // Disable auto throttle:
+            client()
+                .admin()
+                .indices()
+                .prepareUpdateSettings("test")
+                .setSettings(ImmutableSettings.builder()
+                             .put(ConcurrentMergeSchedulerProvider.AUTO_THROTTLE, "no"))
+                .get();
+
+            // Make sure we log the change:
+            assertTrue(mockAppender.sawUpdateAutoThrottle);
+
+            // Make sure setting says it is in fact changed:
+            GetSettingsResponse getSettingsResponse = client().admin().indices().prepareGetSettings("test").get();
+            assertThat(getSettingsResponse.getSetting("test", ConcurrentMergeSchedulerProvider.AUTO_THROTTLE), equalTo("no"));
+        } finally {
+            rootLogger.removeAppender(mockAppender);
+            rootLogger.setLevel(savedLevel);
+        }
+    }
+
     // #6882: make sure we can change index.merge.scheduler.max_thread_count live
     @Test
-    @Slow
-    @AwaitsFix(bugUrl="Super slow because of LUCENE-6119. Muted until we clean up merge throttling.")
     public void testUpdateMergeMaxThreadCount() {
 
         MockAppender mockAppender = new MockAppender();
 
@@ -303,11 +346,8 @@ public class UpdateSettingsTests extends ElasticsearchIntegrationTest {
 
         try {
-            // Tons of merge threads allowed, only 1 non-replicated shard, force lots of merging, throttle so they fall behind:
             assertAcked(prepareCreate("test")
                         .setSettings(ImmutableSettings.builder()
-                                     .put(AbstractIndexStore.INDEX_STORE_THROTTLE_TYPE, "merge")
-                                     .put(AbstractIndexStore.INDEX_STORE_THROTTLE_MAX_BYTES_PER_SEC, "1mb")
                                      .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, "1")
                                      .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, "0")
                                      .put(TieredMergePolicyProvider.INDEX_MERGE_POLICY_MAX_MERGE_AT_ONCE, "2")
 
@@ -316,79 +356,25 @@ public class UpdateSettingsTests extends ElasticsearchIntegrationTest {
                                      .put(ConcurrentMergeSchedulerProvider.MAX_THREAD_COUNT, "10000")
                                      .put(ConcurrentMergeSchedulerProvider.MAX_MERGE_COUNT, "10000")
                                      ));
-            ensureGreen();
-            long termUpto = 0;
-            for(int i=0;i<100;i++) {
-                // Provoke slowish merging by making many unique terms:
-                StringBuilder sb = new StringBuilder();
-                for(int j=0;j<100;j++) {
-                    sb.append(' ');
-                    sb.append(termUpto++);
-                }
-                client().prepareIndex("test", "type", ""+termUpto).setSource("field" + (i%10), sb.toString()).get();
-                if (i % 2 == 0) {
-                    refresh();
-                }
-            }
 
-            assertTrue(mockAppender.sawFlushDeletes);
-            assertFalse(mockAppender.sawMergeThreadPaused);
-            mockAppender.sawFlushDeletes = false;
-            mockAppender.sawMergeThreadPaused = false;
-
-            assertFalse(mockAppender.sawUpdateSetting);
+            assertFalse(mockAppender.sawUpdateMaxThreadCount);
 
-            // Now make a live change to reduce allowed merge threads, and waaay over-throttle merging so they fall behind:
+            // Now make a live change to reduce allowed merge threads:
             client()
                 .admin()
                 .indices()
                 .prepareUpdateSettings("test")
                 .setSettings(ImmutableSettings.builder()
                              .put(ConcurrentMergeSchedulerProvider.MAX_THREAD_COUNT, "1")
-                             .put(AbstractIndexStore.INDEX_STORE_THROTTLE_MAX_BYTES_PER_SEC, "10kb")
                              )
                 .get();
 
-            try {
-
-                // Make sure we log the change:
-                assertTrue(mockAppender.sawUpdateSetting);
-
-                int i = 0;
-                while (true) {
-                    // Provoke slowish merging by making many unique terms:
-                    StringBuilder sb = new StringBuilder();
-                    for(int j=0;j<100;j++) {
-                        sb.append(' ');
-                        sb.append(termUpto++);
-                    }
-                    client().prepareIndex("test", "type", ""+termUpto).setSource("field" + (i%10), sb.toString()).get();
-                    if (i % 2 == 0) {
-                        refresh();
-                    }
-                    // This time we should see some merges were in fact paused:
-                    if (mockAppender.sawMergeThreadPaused) {
-                        break;
-                    }
-                    i++;
-                }
-            } finally {
-                // Make merges fast again & finish merges before we try to close; else we sometimes get a "Delete Index failed - not acked"
-                // when ElasticsearchIntegrationTest.after tries to remove indices created by the test:
-                client()
-                    .admin()
-                    .indices()
-                    .prepareUpdateSettings("test")
-                    .setSettings(ImmutableSettings.builder()
-                                 .put(ConcurrentMergeSchedulerProvider.MAX_THREAD_COUNT, "3")
-                                 .put(AbstractIndexStore.INDEX_STORE_THROTTLE_MAX_BYTES_PER_SEC, "20mb")
-                                 )
-                    .get();
-
-                // Wait for merges to finish
-                client().admin().indices().prepareOptimize("test").setWaitForMerge(true).get();
-            }
+            // Make sure we log the change:
+            assertTrue(mockAppender.sawUpdateMaxThreadCount);
+
+            // Make sure setting says it is in fact changed:
+            GetSettingsResponse getSettingsResponse = client().admin().indices().prepareGetSettings("test").get();
+            assertThat(getSettingsResponse.getSetting("test", ConcurrentMergeSchedulerProvider.MAX_THREAD_COUNT), equalTo("1"));
 
         } finally {
             rootLogger.removeAppender(mockAppender);
 
@@ -121,6 +121,8 @@ public class SimpleDistributorTests extends ElasticsearchIntegrationTest {
                 .put("index.store.type", storeType.name())
                 .put("index.number_of_replicas", 0)
                 .put("index.number_of_shards", 1)
+                .put("index.store.throttle.type", "merge")
+                .put("index.store.throttle.max_bytes_per_sec", "20mb")
                 )
                 .execute().actionGet();
         assertThat(client().admin().cluster().prepareHealth("test").setWaitForGreenStatus().execute().actionGet().isTimedOut(), equalTo(false));
 
@@ -444,7 +444,9 @@ public abstract class ElasticsearchIntegrationTest extends ElasticsearchTestCase
         if (random.nextBoolean()) {
             builder.put(StoreModule.DISTIBUTOR_KEY, random.nextBoolean() ? StoreModule.LEAST_USED_DISTRIBUTOR : StoreModule.RANDOM_WEIGHT_DISTRIBUTOR);
         }
+        if (random.nextBoolean()) {
+            builder.put(ConcurrentMergeSchedulerProvider.AUTO_THROTTLE, false);
+        }
+
         if (random.nextBoolean()) {
             if (random.nextInt(10) == 0) { // do something crazy slow here
 