Mirror of https://github.com/honeymoose/OpenSearch.git (synced 2025-03-25 17:38:44 +00:00)

Commit 31e6acf3f2 (parent e416ed2426)
Commit message: first cut
@@ -18,38 +18,6 @@ heap space* using the "Memory" (see below) storage type. It translates
to the fact that there is no need for extra large JVM heaps (with their
own consequences) for storing the index in memory.

[float]
[[store-throttling]]
=== Store Level Throttling

The way Lucene, the IR library elasticsearch uses under the covers,
works is by creating immutable segments (up to deletes) and constantly
merging them (the merge policy settings allow to control how those
merges happen). The merge process happens in an asynchronous manner
without affecting the indexing / search speed. The problem though,
especially on systems with low IO, is that the merge process can be
expensive and affect search / index operation simply by the fact that
the box is now taxed with more IO happening.

The store module allows to have throttling configured for merges (or
all) either on the node level, or on the index level. The node level
throttling will make sure that out of all the shards allocated on that
node, the merge process won't pass the specific setting bytes per
second. It can be set by setting `indices.store.throttle.type` to
`merge`, and setting `indices.store.throttle.max_bytes_per_sec` to
something like `5mb`. The node level settings can be changed dynamically
using the cluster update settings API. The default is set
to `20mb` with type `merge`.
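
Through the Java API the node level change looks roughly like the following.
This is a minimal sketch rather than code from this commit: the `client`
instance and the wrapper class are assumptions for illustration; only the two
setting keys and the `5mb` value come from the paragraph above.

[source,java]
----
import org.elasticsearch.client.Client;
import org.elasticsearch.common.settings.ImmutableSettings;

public final class NodeLevelThrottleExample {
    // Sketch: cap node-level merge IO at 5mb/sec via the cluster update
    // settings API (transient, so the value is not persisted on restart).
    public static void throttleNodeMerges(Client client) {
        client.admin().cluster().prepareUpdateSettings()
                .setTransientSettings(ImmutableSettings.builder()
                        .put("indices.store.throttle.type", "merge")
                        .put("indices.store.throttle.max_bytes_per_sec", "5mb")
                        .build())
                .get();
    }
}
----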

If specific index level configuration is needed, regardless of the node
level settings, it can be set as well using the
`index.store.throttle.type`, and
`index.store.throttle.max_bytes_per_sec`. The default value for the type
is `node`, meaning it will throttle based on the node level settings and
participate in the global throttling happening. Both settings can be set
using the index update settings API dynamically.
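
The per-index variant, again as a sketch rather than code from this commit
(the index name "test", the `1mb` rate and the wrapper class are assumptions;
the call pattern matches the UpdateSettingsTests changes further down):

[source,java]
----
import org.elasticsearch.client.Client;
import org.elasticsearch.common.settings.ImmutableSettings;

public final class IndexLevelThrottleExample {
    // Sketch: give index "test" its own merge throttle, overriding the
    // node level settings, via the index update settings API.
    public static void throttleIndexMerges(Client client) {
        client.admin().indices().prepareUpdateSettings("test")
                .setSettings(ImmutableSettings.builder()
                        .put("index.store.throttle.type", "merge")
                        .put("index.store.throttle.max_bytes_per_sec", "1mb")
                        .build())
                .get();
    }
}
----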

[float]
[[file-system]]
=== File system storage types
@@ -46,6 +46,8 @@ public class TrackingConcurrentMergeScheduler extends ConcurrentMergeScheduler {
    private final CounterMetric currentMerges = new CounterMetric();
    private final CounterMetric currentMergesNumDocs = new CounterMetric();
    private final CounterMetric currentMergesSizeInBytes = new CounterMetric();
    private final CounterMetric totalMergeStoppedTime = new CounterMetric();
    private final CounterMetric totalMergeThrottledTime = new CounterMetric();

    private final Set<OnGoingMerge> onGoingMerges = ConcurrentCollections.newConcurrentSet();
    private final Set<OnGoingMerge> readOnlyOnGoingMerges = Collections.unmodifiableSet(onGoingMerges);
@@ -83,6 +85,14 @@ public class TrackingConcurrentMergeScheduler extends ConcurrentMergeScheduler {
        return currentMergesSizeInBytes.count();
    }

    public long totalMergeStoppedTimeMillis() {
        return totalMergeStoppedTime.count();
    }

    public long totalMergeThrottledTimeMillis() {
        return totalMergeThrottledTime.count();
    }

    public Set<OnGoingMerge> onGoingMerges() {
        return readOnlyOnGoingMerges;
    }
@@ -118,6 +128,10 @@ public class TrackingConcurrentMergeScheduler extends ConcurrentMergeScheduler {
        totalMergesNumDocs.inc(totalNumDocs);
        totalMergesSizeInBytes.inc(totalSizeInBytes);
        totalMerges.inc(took);

        totalMergeStoppedTime.inc(merge.rateLimiter.getTotalStoppedNS()/1000000);
        totalMergeThrottledTime.inc(merge.rateLimiter.getTotalPausedNS()/1000000);

        String message = String.format(Locale.ROOT,
                "merge segment [%s] done: took [%s], [%,.1f MB], [%,d docs]",
                merge.info == null ? "_na_" : merge.info.info.name,
@ -1,106 +0,0 @@
|
||||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package org.apache.lucene.store;
|
||||
|
||||
import org.apache.lucene.store.IOContext.Context;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
public final class RateLimitedFSDirectory extends FilterDirectory {
|
||||
|
||||
private final StoreRateLimiting.Provider rateLimitingProvider;
|
||||
|
||||
private final StoreRateLimiting.Listener rateListener;
|
||||
|
||||
public RateLimitedFSDirectory(Directory wrapped, StoreRateLimiting.Provider rateLimitingProvider,
|
||||
StoreRateLimiting.Listener rateListener) {
|
||||
super(wrapped);
|
||||
this.rateLimitingProvider = rateLimitingProvider;
|
||||
this.rateListener = rateListener;
|
||||
}
|
||||
|
||||
@Override
|
||||
public IndexOutput createOutput(String name, IOContext context) throws IOException {
|
||||
final IndexOutput output = in.createOutput(name, context);
|
||||
|
||||
StoreRateLimiting rateLimiting = rateLimitingProvider.rateLimiting();
|
||||
StoreRateLimiting.Type type = rateLimiting.getType();
|
||||
RateLimiter limiter = rateLimiting.getRateLimiter();
|
||||
if (type == StoreRateLimiting.Type.NONE || limiter == null) {
|
||||
return output;
|
||||
}
|
||||
if (context.context == Context.MERGE || type == StoreRateLimiting.Type.ALL) {
|
||||
// we are merging, and type is either MERGE or ALL, rate limit...
|
||||
return new RateLimitedIndexOutput(new RateLimiterWrapper(limiter, rateListener), output);
|
||||
}
|
||||
// we shouldn't really get here...
|
||||
return output;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
in.close();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
StoreRateLimiting rateLimiting = rateLimitingProvider.rateLimiting();
|
||||
StoreRateLimiting.Type type = rateLimiting.getType();
|
||||
RateLimiter limiter = rateLimiting.getRateLimiter();
|
||||
if (type == StoreRateLimiting.Type.NONE || limiter == null) {
|
||||
return StoreUtils.toString(in);
|
||||
} else {
|
||||
return "rate_limited(" + StoreUtils.toString(in) + ", type=" + type.name() + ", rate=" + limiter.getMBPerSec() + ")";
|
||||
}
|
||||
}
|
||||
|
||||
// we wrap the limiter to notify our store if we limited to get statistics
|
||||
static final class RateLimiterWrapper extends RateLimiter {
|
||||
private final RateLimiter delegate;
|
||||
private final StoreRateLimiting.Listener rateListener;
|
||||
|
||||
RateLimiterWrapper(RateLimiter delegate, StoreRateLimiting.Listener rateListener) {
|
||||
this.delegate = delegate;
|
||||
this.rateListener = rateListener;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setMBPerSec(double mbPerSec) {
|
||||
delegate.setMBPerSec(mbPerSec);
|
||||
}
|
||||
|
||||
@Override
|
||||
public double getMBPerSec() {
|
||||
return delegate.getMBPerSec();
|
||||
}
|
||||
|
||||
@Override
|
||||
public long pause(long bytes) throws IOException {
|
||||
long pause = delegate.pause(bytes);
|
||||
rateListener.onPause(pause);
|
||||
return pause;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getMinPauseCheckBytes() {
|
||||
return delegate.getMinPauseCheckBytes();
|
||||
}
|
||||
}
|
||||
}
|
@ -1,94 +0,0 @@
|
||||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package org.apache.lucene.store;
|
||||
|
||||
import org.apache.lucene.store.RateLimiter.SimpleRateLimiter;
|
||||
import org.elasticsearch.ElasticsearchIllegalArgumentException;
|
||||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.common.unit.ByteSizeValue;
|
||||
|
||||
/**
|
||||
*/
|
||||
public class StoreRateLimiting {
|
||||
|
||||
public static interface Provider {
|
||||
|
||||
StoreRateLimiting rateLimiting();
|
||||
}
|
||||
|
||||
public interface Listener {
|
||||
|
||||
void onPause(long nanos);
|
||||
}
|
||||
|
||||
public static enum Type {
|
||||
NONE,
|
||||
MERGE,
|
||||
ALL;
|
||||
|
||||
public static Type fromString(String type) throws ElasticsearchIllegalArgumentException {
|
||||
if ("none".equalsIgnoreCase(type)) {
|
||||
return NONE;
|
||||
} else if ("merge".equalsIgnoreCase(type)) {
|
||||
return MERGE;
|
||||
} else if ("all".equalsIgnoreCase(type)) {
|
||||
return ALL;
|
||||
}
|
||||
throw new ElasticsearchIllegalArgumentException("rate limiting type [" + type + "] not valid, can be one of [all|merge|none]");
|
||||
}
|
||||
}
|
||||
|
||||
private final SimpleRateLimiter rateLimiter = new SimpleRateLimiter(0);
|
||||
private volatile SimpleRateLimiter actualRateLimiter;
|
||||
|
||||
private volatile Type type;
|
||||
|
||||
public StoreRateLimiting() {
|
||||
|
||||
}
|
||||
|
||||
@Nullable
|
||||
public RateLimiter getRateLimiter() {
|
||||
return actualRateLimiter;
|
||||
}
|
||||
|
||||
public void setMaxRate(ByteSizeValue rate) {
|
||||
if (rate.bytes() <= 0) {
|
||||
actualRateLimiter = null;
|
||||
} else if (actualRateLimiter == null) {
|
||||
actualRateLimiter = rateLimiter;
|
||||
actualRateLimiter.setMBPerSec(rate.mbFrac());
|
||||
} else {
|
||||
assert rateLimiter == actualRateLimiter;
|
||||
rateLimiter.setMBPerSec(rate.mbFrac());
|
||||
}
|
||||
}
|
||||
|
||||
public Type getType() {
|
||||
return type;
|
||||
}
|
||||
|
||||
public void setType(Type type) {
|
||||
this.type = type;
|
||||
}
|
||||
|
||||
public void setType(String type) throws ElasticsearchIllegalArgumentException {
|
||||
this.type = Type.fromString(type);
|
||||
}
|
||||
}
|
@@ -65,8 +65,6 @@ public class ClusterDynamicSettingsModule extends AbstractModule {
        clusterDynamicSettings.addDynamicSetting(IndicesFilterCache.INDICES_CACHE_FILTER_SIZE);
        clusterDynamicSettings.addDynamicSetting(IndicesFilterCache.INDICES_CACHE_FILTER_EXPIRE, Validator.TIME);
        clusterDynamicSettings.addDynamicSetting(IndicesFilterCache.INDICES_CACHE_FILTER_CONCURRENCY_LEVEL, Validator.POSITIVE_INTEGER);
        clusterDynamicSettings.addDynamicSetting(IndicesStore.INDICES_STORE_THROTTLE_TYPE);
        clusterDynamicSettings.addDynamicSetting(IndicesStore.INDICES_STORE_THROTTLE_MAX_BYTES_PER_SEC, Validator.BYTES_SIZE);
        clusterDynamicSettings.addDynamicSetting(IndicesTTLService.INDICES_TTL_INTERVAL, Validator.TIME);
        clusterDynamicSettings.addDynamicSetting(MappingUpdatedAction.INDICES_MAPPING_ADDITIONAL_MAPPING_CHANGE_TIME, Validator.TIME);
        clusterDynamicSettings.addDynamicSetting(MetaData.SETTING_READ_ONLY);
@ -19,6 +19,9 @@
|
||||
|
||||
package org.elasticsearch.index.merge;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.io.stream.Streamable;
|
||||
@ -28,11 +31,6 @@ import org.elasticsearch.common.xcontent.ToXContent;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||
import org.elasticsearch.common.xcontent.XContentBuilderString;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
public class MergeStats implements Streamable, ToXContent {
|
||||
|
||||
private long total;
|
||||
@ -43,11 +41,18 @@ public class MergeStats implements Streamable, ToXContent {
|
||||
private long currentNumDocs;
|
||||
private long currentSizeInBytes;
|
||||
|
||||
/** Total millis that large merges were stopped so that smaller merges would finish. */
|
||||
private long totalStoppedTimeInMillis;
|
||||
|
||||
/** Total millis that we slept during writes so merge IO is throttled. */
|
||||
private long totalThrottledTimeInMillis;
|
||||
|
||||
public MergeStats() {
|
||||
|
||||
}
|
||||
|
||||
public void add(long totalMerges, long totalMergeTime, long totalNumDocs, long totalSizeInBytes, long currentMerges, long currentNumDocs, long currentSizeInBytes) {
|
||||
public void add(long totalMerges, long totalMergeTime, long totalNumDocs, long totalSizeInBytes, long currentMerges, long currentNumDocs, long currentSizeInBytes,
|
||||
long stoppedTimeMillis, long throttledTimeMillis) {
|
||||
this.total += totalMerges;
|
||||
this.totalTimeInMillis += totalMergeTime;
|
||||
this.totalNumDocs += totalNumDocs;
|
||||
@ -55,6 +60,8 @@ public class MergeStats implements Streamable, ToXContent {
|
||||
this.current += currentMerges;
|
||||
this.currentNumDocs += currentNumDocs;
|
||||
this.currentSizeInBytes += currentSizeInBytes;
|
||||
this.totalStoppedTimeInMillis += stoppedTimeMillis;
|
||||
this.totalThrottledTimeInMillis += throttledTimeMillis;
|
||||
}
|
||||
|
||||
public void add(MergeStats mergeStats) {
|
||||
@ -68,6 +75,8 @@ public class MergeStats implements Streamable, ToXContent {
|
||||
this.current += mergeStats.current;
|
||||
this.currentNumDocs += mergeStats.currentNumDocs;
|
||||
this.currentSizeInBytes += mergeStats.currentSizeInBytes;
|
||||
this.totalStoppedTimeInMillis += mergeStats.totalStoppedTimeInMillis;
|
||||
this.totalThrottledTimeInMillis += mergeStats.totalThrottledTimeInMillis;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -84,6 +93,34 @@ public class MergeStats implements Streamable, ToXContent {
|
||||
return this.totalTimeInMillis;
|
||||
}
|
||||
|
||||
/**
|
||||
* The total time large merges were stopped so smaller merges could finish.
|
||||
*/
|
||||
public long getTotalStoppedTimeInMillis() {
|
||||
return this.totalStoppedTimeInMillis;
|
||||
}
|
||||
|
||||
/**
|
||||
* The total time large merges were stopped so smaller merges could finish.
|
||||
*/
|
||||
public TimeValue getTotalStoppedTime() {
|
||||
return new TimeValue(totalStoppedTimeInMillis);
|
||||
}
|
||||
|
||||
/**
|
||||
* The total time merge IO writes were throttled.
|
||||
*/
|
||||
public long getTotalThrottledTimeInMillis() {
|
||||
return this.totalThrottledTimeInMillis;
|
||||
}
|
||||
|
||||
/**
|
||||
* The total time merge IO writes were throttled.
|
||||
*/
|
||||
public TimeValue getTotalThrottledTime() {
|
||||
return new TimeValue(totalThrottledTimeInMillis);
|
||||
}
|
||||
|
||||
/**
|
||||
* The total time merges have been executed.
|
||||
*/
|
||||
@ -138,6 +175,8 @@ public class MergeStats implements Streamable, ToXContent {
|
||||
builder.timeValueField(Fields.TOTAL_TIME_IN_MILLIS, Fields.TOTAL_TIME, totalTimeInMillis);
|
||||
builder.field(Fields.TOTAL_DOCS, totalNumDocs);
|
||||
builder.byteSizeField(Fields.TOTAL_SIZE_IN_BYTES, Fields.TOTAL_SIZE, totalSizeInBytes);
|
||||
builder.timeValueField(Fields.TOTAL_STOPPED_TIME_IN_MILLIS, Fields.TOTAL_STOPPED_TIME, totalStoppedTimeInMillis);
|
||||
builder.timeValueField(Fields.TOTAL_THROTTLED_TIME_IN_MILLIS, Fields.TOTAL_THROTTLED_TIME, totalThrottledTimeInMillis);
|
||||
builder.endObject();
|
||||
return builder;
|
||||
}
|
||||
@ -151,6 +190,10 @@ public class MergeStats implements Streamable, ToXContent {
|
||||
static final XContentBuilderString TOTAL = new XContentBuilderString("total");
|
||||
static final XContentBuilderString TOTAL_TIME = new XContentBuilderString("total_time");
|
||||
static final XContentBuilderString TOTAL_TIME_IN_MILLIS = new XContentBuilderString("total_time_in_millis");
|
||||
static final XContentBuilderString TOTAL_STOPPED_TIME = new XContentBuilderString("total_stopped_time");
|
||||
static final XContentBuilderString TOTAL_STOPPED_TIME_IN_MILLIS = new XContentBuilderString("total_stopped_time_in_millis");
|
||||
static final XContentBuilderString TOTAL_THROTTLED_TIME = new XContentBuilderString("total_throttled_time");
|
||||
static final XContentBuilderString TOTAL_THROTTLED_TIME_IN_MILLIS = new XContentBuilderString("total_throttled_time_in_millis");
|
||||
static final XContentBuilderString TOTAL_DOCS = new XContentBuilderString("total_docs");
|
||||
static final XContentBuilderString TOTAL_SIZE = new XContentBuilderString("total_size");
|
||||
static final XContentBuilderString TOTAL_SIZE_IN_BYTES = new XContentBuilderString("total_size_in_bytes");
|
||||
@ -165,6 +208,10 @@ public class MergeStats implements Streamable, ToXContent {
|
||||
current = in.readVLong();
|
||||
currentNumDocs = in.readVLong();
|
||||
currentSizeInBytes = in.readVLong();
|
||||
if (in.getVersion().onOrAfter(Version.V_2_0_0)) {
|
||||
totalStoppedTimeInMillis = in.readVLong();
|
||||
totalThrottledTimeInMillis = in.readVLong();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -176,5 +223,9 @@ public class MergeStats implements Streamable, ToXContent {
|
||||
out.writeVLong(current);
|
||||
out.writeVLong(currentNumDocs);
|
||||
out.writeVLong(currentSizeInBytes);
|
||||
if (out.getVersion().onOrAfter(Version.V_2_0_0)) {
|
||||
out.writeVLong(totalStoppedTimeInMillis);
|
||||
out.writeVLong(totalThrottledTimeInMillis);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@@ -41,22 +41,18 @@ import java.io.IOException;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;

/**
 *
 */
public class ConcurrentMergeSchedulerProvider extends MergeSchedulerProvider {

    private final IndexSettingsService indexSettingsService;
    private final ApplySettings applySettings = new ApplySettings();

    private static final String MAX_THREAD_COUNT_KEY = "max_thread_count";
    private static final String MAX_MERGE_COUNT_KEY = "max_merge_count";

    public static final String MAX_THREAD_COUNT = "index.merge.scheduler." + MAX_THREAD_COUNT_KEY;
    public static final String MAX_MERGE_COUNT = "index.merge.scheduler." + MAX_MERGE_COUNT_KEY;
    public static final String MAX_THREAD_COUNT = "index.merge.scheduler.max_thread_count";
    public static final String MAX_MERGE_COUNT = "index.merge.scheduler.max_merge_count";
    public static final String AUTO_THROTTLE = "index.merge.scheduler.auto_throttle";

    private volatile int maxThreadCount;
    private volatile int maxMergeCount;
    private volatile boolean autoThrottle;

    private Set<CustomConcurrentMergeScheduler> schedulers = new CopyOnWriteArraySet<>();

@@ -64,10 +60,10 @@ public class ConcurrentMergeSchedulerProvider extends MergeSchedulerProvider {
    public ConcurrentMergeSchedulerProvider(ShardId shardId, @IndexSettings Settings indexSettings, ThreadPool threadPool, IndexSettingsService indexSettingsService) {
        super(shardId, indexSettings, threadPool);
        this.indexSettingsService = indexSettingsService;
        // TODO LUCENE MONITOR this will change in Lucene 4.0
        this.maxThreadCount = componentSettings.getAsInt(MAX_THREAD_COUNT_KEY, Math.max(1, Math.min(3, EsExecutors.boundedNumberOfProcessors(indexSettings) / 2)));
        this.maxMergeCount = componentSettings.getAsInt(MAX_MERGE_COUNT_KEY, maxThreadCount + 2);
        logger.debug("using [concurrent] merge scheduler with max_thread_count[{}], max_merge_count[{}]", maxThreadCount, maxMergeCount);
        this.maxThreadCount = indexSettings.getAsInt(MAX_THREAD_COUNT, Math.max(1, Math.min(4, EsExecutors.boundedNumberOfProcessors(indexSettings) / 2)));
        this.maxMergeCount = indexSettings.getAsInt(MAX_MERGE_COUNT, maxThreadCount + 5);
        this.autoThrottle = indexSettings.getAsBoolean(AUTO_THROTTLE, true);
        logger.debug("using [concurrent] merge scheduler with max_thread_count[{}], max_merge_count[{}], auto_throttle[{}]", maxThreadCount, maxMergeCount, autoThrottle);

        indexSettingsService.addListener(applySettings);
    }
@@ -75,10 +71,14 @@ public class ConcurrentMergeSchedulerProvider extends MergeSchedulerProvider {
    @Override
    public MergeScheduler newMergeScheduler() {
        CustomConcurrentMergeScheduler concurrentMergeScheduler = new CustomConcurrentMergeScheduler(logger, shardId, this);
        // which would then stall if there are 2 merges in flight, and unstall once we are back to 1 or 0 merges
        // NOTE: we pass maxMergeCount+1 here so that CMS will allow one too many merges to kick off which then allows
        // InternalEngine.IndexThrottle to detect too-many-merges and throttle:
        concurrentMergeScheduler.setMaxMergesAndThreads(maxMergeCount+1, maxThreadCount);
        if (autoThrottle) {
            concurrentMergeScheduler.enableAutoIOThrottle();
        } else {
            concurrentMergeScheduler.disableAutoIOThrottle();
        }
        schedulers.add(concurrentMergeScheduler);
        return concurrentMergeScheduler;
    }
@@ -88,7 +88,9 @@ public class ConcurrentMergeSchedulerProvider extends MergeSchedulerProvider {
        MergeStats mergeStats = new MergeStats();
        for (CustomConcurrentMergeScheduler scheduler : schedulers) {
            mergeStats.add(scheduler.totalMerges(), scheduler.totalMergeTime(), scheduler.totalMergeNumDocs(), scheduler.totalMergeSizeInBytes(),
                    scheduler.currentMerges(), scheduler.currentMergesNumDocs(), scheduler.currentMergesSizeInBytes());
                    scheduler.currentMerges(), scheduler.currentMergesNumDocs(), scheduler.currentMergesSizeInBytes(),
                    scheduler.totalMergeStoppedTimeMillis(),
                    scheduler.totalMergeThrottledTimeMillis());
        }
        return mergeStats;
    }
@@ -165,7 +167,7 @@ public class ConcurrentMergeSchedulerProvider extends MergeSchedulerProvider {
        public void onRefreshSettings(Settings settings) {
            int maxThreadCount = settings.getAsInt(MAX_THREAD_COUNT, ConcurrentMergeSchedulerProvider.this.maxThreadCount);
            if (maxThreadCount != ConcurrentMergeSchedulerProvider.this.maxThreadCount) {
                logger.info("updating [{}] from [{}] to [{}]", MAX_THREAD_COUNT_KEY, ConcurrentMergeSchedulerProvider.this.maxThreadCount, maxThreadCount);
                logger.info("updating [{}] from [{}] to [{}]", MAX_THREAD_COUNT, ConcurrentMergeSchedulerProvider.this.maxThreadCount, maxThreadCount);
                ConcurrentMergeSchedulerProvider.this.maxThreadCount = maxThreadCount;
                for (CustomConcurrentMergeScheduler scheduler : schedulers) {
                    scheduler.setMaxMergesAndThreads(ConcurrentMergeSchedulerProvider.this.maxMergeCount, maxThreadCount);
@@ -174,12 +176,25 @@ public class ConcurrentMergeSchedulerProvider extends MergeSchedulerProvider {

            int maxMergeCount = settings.getAsInt(MAX_MERGE_COUNT, ConcurrentMergeSchedulerProvider.this.maxMergeCount);
            if (maxMergeCount != ConcurrentMergeSchedulerProvider.this.maxMergeCount) {
                logger.info("updating [{}] from [{}] to [{}]", MAX_MERGE_COUNT_KEY, ConcurrentMergeSchedulerProvider.this.maxMergeCount, maxMergeCount);
                logger.info("updating [{}] from [{}] to [{}]", MAX_MERGE_COUNT, ConcurrentMergeSchedulerProvider.this.maxMergeCount, maxMergeCount);
                ConcurrentMergeSchedulerProvider.this.maxMergeCount = maxMergeCount;
                for (CustomConcurrentMergeScheduler scheduler : schedulers) {
                    scheduler.setMaxMergesAndThreads(maxMergeCount, ConcurrentMergeSchedulerProvider.this.maxThreadCount);
                }
            }

            boolean autoThrottle = settings.getAsBoolean(AUTO_THROTTLE, ConcurrentMergeSchedulerProvider.this.autoThrottle);
            if (autoThrottle != ConcurrentMergeSchedulerProvider.this.autoThrottle) {
                logger.info("updating [{}] from [{}] to [{}]", AUTO_THROTTLE, ConcurrentMergeSchedulerProvider.this.autoThrottle, autoThrottle);
                ConcurrentMergeSchedulerProvider.this.autoThrottle = autoThrottle;
                for (CustomConcurrentMergeScheduler scheduler : schedulers) {
                    if (autoThrottle) {
                        scheduler.enableAutoIOThrottle();
                    } else {
                        scheduler.disableAutoIOThrottle();
                    }
                }
            }
        }
    }
}
@@ -34,9 +34,6 @@ import java.io.Closeable;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;

/**
 *
 */
public abstract class MergeSchedulerProvider extends AbstractIndexShardComponent implements IndexShardComponent, Closeable {

    public static interface FailureListener {
@@ -51,10 +51,9 @@ public class IndexDynamicSettingsModule extends AbstractModule {

    public IndexDynamicSettingsModule() {
        indexDynamicSettings = new DynamicSettings();
        indexDynamicSettings.addDynamicSetting(AbstractIndexStore.INDEX_STORE_THROTTLE_MAX_BYTES_PER_SEC, Validator.BYTES_SIZE);
        indexDynamicSettings.addDynamicSetting(AbstractIndexStore.INDEX_STORE_THROTTLE_TYPE);
        indexDynamicSettings.addDynamicSetting(ConcurrentMergeSchedulerProvider.MAX_THREAD_COUNT);
        indexDynamicSettings.addDynamicSetting(ConcurrentMergeSchedulerProvider.MAX_MERGE_COUNT);
        indexDynamicSettings.addDynamicSetting(ConcurrentMergeSchedulerProvider.AUTO_THROTTLE);
        indexDynamicSettings.addDynamicSetting(FilterAllocationDecider.INDEX_ROUTING_REQUIRE_GROUP + "*");
        indexDynamicSettings.addDynamicSetting(FilterAllocationDecider.INDEX_ROUTING_INCLUDE_GROUP + "*");
        indexDynamicSettings.addDynamicSetting(FilterAllocationDecider.INDEX_ROUTING_EXCLUDE_GROUP + "*");
@@ -39,8 +39,6 @@ public abstract class DirectoryService extends AbstractIndexShardComponent {

    public abstract Directory[] build() throws IOException;

    public abstract long throttleTimeInNanos();

    /**
     * Creates a new Directory from the given distributor.
     * The default implementation returns a new {@link org.elasticsearch.index.store.DistributorDirectory}
@@ -58,4 +56,4 @@ public abstract class DirectoryService extends AbstractIndexShardComponent {
        }
        return new DistributorDirectory(distributor);
    }
}
}
@ -19,7 +19,6 @@
|
||||
|
||||
package org.elasticsearch.index.store;
|
||||
|
||||
import org.apache.lucene.store.StoreRateLimiting;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.index.settings.IndexSettings;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
@ -33,12 +32,6 @@ import java.nio.file.Path;
|
||||
*/
|
||||
public interface IndexStore extends Closeable {
|
||||
|
||||
/**
|
||||
* Returns the rate limiting, either of the index is explicitly configured, or
|
||||
* the node level one (defaults to the node level one).
|
||||
*/
|
||||
StoreRateLimiting rateLimiting();
|
||||
|
||||
/**
|
||||
* The shard store class that should be used for each shard.
|
||||
*/
|
||||
|
@@ -283,7 +283,7 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref

    public StoreStats stats() throws IOException {
        ensureOpen();
        return new StoreStats(Directories.estimateSize(directory), directoryService.throttleTimeInNanos());
        return new StoreStats(Directories.estimateSize(directory));
    }

    public void renameFile(String from, String to) throws IOException {
@@ -19,6 +19,9 @@

package org.elasticsearch.index.store;

import java.io.IOException;

import org.elasticsearch.Version;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
@@ -28,23 +31,18 @@ import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentBuilderString;

import java.io.IOException;

/**
 */
public class StoreStats implements Streamable, ToXContent {

    private long sizeInBytes;

    private long throttleTimeInNanos;

    public StoreStats() {

    }

    public StoreStats(long sizeInBytes, long throttleTimeInNanos) {
    public StoreStats(long sizeInBytes) {
        this.sizeInBytes = sizeInBytes;
        this.throttleTimeInNanos = throttleTimeInNanos;
    }

    public void add(StoreStats stats) {
@@ -52,7 +50,6 @@ public class StoreStats implements Streamable, ToXContent {
            return;
        }
        sizeInBytes += stats.sizeInBytes;
        throttleTimeInNanos += stats.throttleTimeInNanos;
    }


@@ -72,14 +69,6 @@ public class StoreStats implements Streamable, ToXContent {
        return size();
    }

    public TimeValue throttleTime() {
        return TimeValue.timeValueNanos(throttleTimeInNanos);
    }

    public TimeValue getThrottleTime() {
        return throttleTime();
    }

    public static StoreStats readStoreStats(StreamInput in) throws IOException {
        StoreStats store = new StoreStats();
        store.readFrom(in);
@@ -89,20 +78,25 @@ public class StoreStats implements Streamable, ToXContent {
    @Override
    public void readFrom(StreamInput in) throws IOException {
        sizeInBytes = in.readVLong();
        throttleTimeInNanos = in.readVLong();
        if (in.getVersion().before(Version.V_2_0_0)) {
            // Ignore throttleTimeInNanos
            in.readVLong();
        }
    }

    @Override
    public void writeTo(StreamOutput out) throws IOException {
        out.writeVLong(sizeInBytes);
        out.writeVLong(throttleTimeInNanos);
        if (out.getVersion().before(Version.V_2_0_0)) {
            // Send dummy throttleTimeInNanos
            out.writeVLong(0);
        }
    }

    @Override
    public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
        builder.startObject(Fields.STORE);
        builder.byteSizeField(Fields.SIZE_IN_BYTES, Fields.SIZE, sizeInBytes);
        builder.timeValueField(Fields.THROTTLE_TIME_IN_MILLIS, Fields.THROTTLE_TIME, throttleTime());
        builder.endObject();
        return builder;
    }
@@ -111,8 +105,5 @@ public class StoreStats implements Streamable, ToXContent {
        static final XContentBuilderString STORE = new XContentBuilderString("store");
        static final XContentBuilderString SIZE = new XContentBuilderString("size");
        static final XContentBuilderString SIZE_IN_BYTES = new XContentBuilderString("size_in_bytes");

        static final XContentBuilderString THROTTLE_TIME = new XContentBuilderString("throttle_time");
        static final XContentBuilderString THROTTLE_TIME_IN_MILLIS = new XContentBuilderString("throttle_time_in_millis");
    }
}
@ -16,10 +16,17 @@
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package org.apache.lucene.store;
|
||||
|
||||
/**
|
||||
*/
|
||||
package org.elasticsearch.index.store;
|
||||
|
||||
import java.util.Arrays;
|
||||
|
||||
import org.apache.lucene.store.Directory;
|
||||
import org.apache.lucene.store.FileSwitchDirectory;
|
||||
import org.apache.lucene.store.MMapDirectory;
|
||||
import org.apache.lucene.store.NIOFSDirectory;
|
||||
import org.apache.lucene.store.SimpleFSDirectory;
|
||||
|
||||
public final class StoreUtils {
|
||||
|
||||
private StoreUtils() {
|
||||
@ -46,4 +53,12 @@ public final class StoreUtils {
|
||||
|
||||
return directory.toString();
|
||||
}
|
||||
|
||||
public static String toString(Directory[] directories) {
|
||||
String[] strings = new String[directories.length];
|
||||
for(int i=0;i<directories.length;i++) {
|
||||
strings[i] = toString(directories[i]);
|
||||
}
|
||||
return Arrays.toString(strings);
|
||||
}
|
||||
}
|
@ -19,17 +19,18 @@
|
||||
|
||||
package org.elasticsearch.index.store.distributor;
|
||||
|
||||
import org.apache.lucene.store.Directory;
|
||||
import org.apache.lucene.store.FSDirectory;
|
||||
import org.elasticsearch.index.store.DirectoryUtils;
|
||||
import org.elasticsearch.index.store.DirectoryService;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.file.FileStore;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.util.Arrays;
|
||||
|
||||
import org.apache.lucene.store.Directory;
|
||||
import org.apache.lucene.store.FSDirectory;
|
||||
import org.elasticsearch.index.store.DirectoryService;
|
||||
import org.elasticsearch.index.store.DirectoryUtils;
|
||||
import org.elasticsearch.index.store.StoreUtils;
|
||||
|
||||
public abstract class AbstractDistributor implements Distributor {
|
||||
|
||||
protected final Directory[] delegates;
|
||||
@ -68,7 +69,7 @@ public abstract class AbstractDistributor implements Distributor {
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return name() + Arrays.toString(delegates);
|
||||
return name() + StoreUtils.toString(delegates);
|
||||
}
|
||||
|
||||
protected abstract Directory doAny() throws IOException;
|
||||
|
@ -36,27 +36,15 @@ import org.elasticsearch.index.store.StoreException;
|
||||
|
||||
/**
|
||||
*/
|
||||
public abstract class FsDirectoryService extends DirectoryService implements StoreRateLimiting.Listener, StoreRateLimiting.Provider {
|
||||
public abstract class FsDirectoryService extends DirectoryService {
|
||||
|
||||
protected final IndexStore indexStore;
|
||||
|
||||
private final CounterMetric rateLimitingTimeInNanos = new CounterMetric();
|
||||
|
||||
public FsDirectoryService(ShardId shardId, @IndexSettings Settings indexSettings, IndexStore indexStore) {
|
||||
super(shardId, indexSettings);
|
||||
this.indexStore = indexStore;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long throttleTimeInNanos() {
|
||||
return rateLimitingTimeInNanos.count();
|
||||
}
|
||||
|
||||
@Override
|
||||
public StoreRateLimiting rateLimiting() {
|
||||
return indexStore.rateLimiting();
|
||||
}
|
||||
|
||||
protected final LockFactory buildLockFactory() throws IOException {
|
||||
String fsLock = componentSettings.get("lock", componentSettings.get("fs_lock", "native"));
|
||||
LockFactory lockFactory;
|
||||
@ -77,16 +65,10 @@ public abstract class FsDirectoryService extends DirectoryService implements Sto
|
||||
Directory[] dirs = new Directory[locations.length];
|
||||
for (int i = 0; i < dirs.length; i++) {
|
||||
Files.createDirectories(locations[i]);
|
||||
Directory wrapped = newFSDirectory(locations[i], buildLockFactory());
|
||||
dirs[i] = new RateLimitedFSDirectory(wrapped, this, this) ;
|
||||
dirs[i] = newFSDirectory(locations[i], buildLockFactory());
|
||||
}
|
||||
return dirs;
|
||||
}
|
||||
|
||||
protected abstract Directory newFSDirectory(Path location, LockFactory lockFactory) throws IOException;
|
||||
|
||||
@Override
|
||||
public void onPause(long nanos) {
|
||||
rateLimitingTimeInNanos.inc(nanos);
|
||||
}
|
||||
}
|
||||
|
@ -19,7 +19,6 @@
|
||||
|
||||
package org.elasticsearch.index.store.support;
|
||||
|
||||
import org.apache.lucene.store.StoreRateLimiting;
|
||||
import org.elasticsearch.ElasticsearchException;
|
||||
import org.elasticsearch.ElasticsearchIllegalStateException;
|
||||
import org.elasticsearch.common.io.FileSystemUtils;
|
||||
@ -43,37 +42,8 @@ import java.nio.file.Path;
|
||||
*/
|
||||
public abstract class AbstractIndexStore extends AbstractIndexComponent implements IndexStore {
|
||||
|
||||
public static final String INDEX_STORE_THROTTLE_TYPE = "index.store.throttle.type";
|
||||
public static final String INDEX_STORE_THROTTLE_MAX_BYTES_PER_SEC = "index.store.throttle.max_bytes_per_sec";
|
||||
|
||||
public static final String INDEX_FOLDER_NAME = "index";
|
||||
public static final String TRANSLOG_FOLDER_NAME = "translog";
|
||||
|
||||
class ApplySettings implements IndexSettingsService.Listener {
|
||||
@Override
|
||||
public void onRefreshSettings(Settings settings) {
|
||||
String rateLimitingType = settings.get(INDEX_STORE_THROTTLE_TYPE, AbstractIndexStore.this.rateLimitingType);
|
||||
if (!rateLimitingType.equals(AbstractIndexStore.this.rateLimitingType)) {
|
||||
logger.info("updating index.store.throttle.type from [{}] to [{}]", AbstractIndexStore.this.rateLimitingType, rateLimitingType);
|
||||
if (rateLimitingType.equalsIgnoreCase("node")) {
|
||||
AbstractIndexStore.this.rateLimitingType = rateLimitingType;
|
||||
AbstractIndexStore.this.nodeRateLimiting = true;
|
||||
} else {
|
||||
StoreRateLimiting.Type.fromString(rateLimitingType);
|
||||
AbstractIndexStore.this.rateLimitingType = rateLimitingType;
|
||||
AbstractIndexStore.this.nodeRateLimiting = false;
|
||||
AbstractIndexStore.this.rateLimiting.setType(rateLimitingType);
|
||||
}
|
||||
}
|
||||
|
||||
ByteSizeValue rateLimitingThrottle = settings.getAsBytesSize(INDEX_STORE_THROTTLE_MAX_BYTES_PER_SEC, AbstractIndexStore.this.rateLimitingThrottle);
|
||||
if (!rateLimitingThrottle.equals(AbstractIndexStore.this.rateLimitingThrottle)) {
|
||||
logger.info("updating index.store.throttle.max_bytes_per_sec from [{}] to [{}], note, type is [{}]", AbstractIndexStore.this.rateLimitingThrottle, rateLimitingThrottle, AbstractIndexStore.this.rateLimitingType);
|
||||
AbstractIndexStore.this.rateLimitingThrottle = rateLimitingThrottle;
|
||||
AbstractIndexStore.this.rateLimiting.setMaxRate(rateLimitingThrottle);
|
||||
}
|
||||
}
|
||||
}
|
||||
private final NodeEnvironment nodeEnv;
|
||||
|
||||
private final Path[] locations;
|
||||
@ -82,32 +52,11 @@ public abstract class AbstractIndexStore extends AbstractIndexComponent implemen
|
||||
|
||||
protected final IndicesStore indicesStore;
|
||||
|
||||
private volatile String rateLimitingType;
|
||||
private volatile ByteSizeValue rateLimitingThrottle;
|
||||
private volatile boolean nodeRateLimiting;
|
||||
|
||||
private final StoreRateLimiting rateLimiting = new StoreRateLimiting();
|
||||
|
||||
private final ApplySettings applySettings = new ApplySettings();
|
||||
|
||||
protected AbstractIndexStore(Index index, @IndexSettings Settings indexSettings, IndexService indexService, IndicesStore indicesStore, NodeEnvironment nodeEnv) {
|
||||
super(index, indexSettings);
|
||||
this.indexService = indexService;
|
||||
this.indicesStore = indicesStore;
|
||||
|
||||
this.rateLimitingType = indexSettings.get(INDEX_STORE_THROTTLE_TYPE, "node");
|
||||
if (rateLimitingType.equalsIgnoreCase("node")) {
|
||||
nodeRateLimiting = true;
|
||||
} else {
|
||||
nodeRateLimiting = false;
|
||||
rateLimiting.setType(rateLimitingType);
|
||||
}
|
||||
this.rateLimitingThrottle = indexSettings.getAsBytesSize(INDEX_STORE_THROTTLE_MAX_BYTES_PER_SEC, new ByteSizeValue(0));
|
||||
rateLimiting.setMaxRate(rateLimitingThrottle);
|
||||
|
||||
logger.debug("using index.store.throttle.type [{}], with index.store.throttle.max_bytes_per_sec [{}]", rateLimitingType, rateLimitingThrottle);
|
||||
|
||||
indexService.settingsService().addListener(applySettings);
|
||||
this.nodeEnv = nodeEnv;
|
||||
if (nodeEnv.hasNodeFile()) {
|
||||
this.locations = nodeEnv.indexPaths(index);
|
||||
@ -118,15 +67,8 @@ public abstract class AbstractIndexStore extends AbstractIndexComponent implemen
|
||||
|
||||
@Override
|
||||
public void close() throws ElasticsearchException {
|
||||
indexService.settingsService().removeListener(applySettings);
|
||||
}
|
||||
|
||||
@Override
|
||||
public StoreRateLimiting rateLimiting() {
|
||||
return nodeRateLimiting ? indicesStore.rateLimiting() : this.rateLimiting;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public boolean canDeleteUnallocated(ShardId shardId, @IndexSettings Settings indexSettings) {
|
||||
if (locations == null) {
|
||||
|
@ -19,7 +19,6 @@
|
||||
|
||||
package org.elasticsearch.indices.store;
|
||||
|
||||
import org.apache.lucene.store.StoreRateLimiting;
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.cluster.*;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
@ -60,34 +59,10 @@ import java.util.concurrent.atomic.AtomicInteger;
|
||||
*/
|
||||
public class IndicesStore extends AbstractComponent implements ClusterStateListener, Closeable {
|
||||
|
||||
public static final String INDICES_STORE_THROTTLE_TYPE = "indices.store.throttle.type";
|
||||
public static final String INDICES_STORE_THROTTLE_MAX_BYTES_PER_SEC = "indices.store.throttle.max_bytes_per_sec";
|
||||
|
||||
public static final String ACTION_SHARD_EXISTS = "internal:index/shard/exists";
|
||||
|
||||
private static final EnumSet<IndexShardState> ACTIVE_STATES = EnumSet.of(IndexShardState.STARTED, IndexShardState.RELOCATED);
|
||||
|
||||
class ApplySettings implements NodeSettingsService.Listener {
|
||||
@Override
|
||||
public void onRefreshSettings(Settings settings) {
|
||||
String rateLimitingType = settings.get(INDICES_STORE_THROTTLE_TYPE, IndicesStore.this.rateLimitingType);
|
||||
// try and parse the type
|
||||
StoreRateLimiting.Type.fromString(rateLimitingType);
|
||||
if (!rateLimitingType.equals(IndicesStore.this.rateLimitingType)) {
|
||||
logger.info("updating indices.store.throttle.type from [{}] to [{}]", IndicesStore.this.rateLimitingType, rateLimitingType);
|
||||
IndicesStore.this.rateLimitingType = rateLimitingType;
|
||||
IndicesStore.this.rateLimiting.setType(rateLimitingType);
|
||||
}
|
||||
|
||||
ByteSizeValue rateLimitingThrottle = settings.getAsBytesSize(INDICES_STORE_THROTTLE_MAX_BYTES_PER_SEC, IndicesStore.this.rateLimitingThrottle);
|
||||
if (!rateLimitingThrottle.equals(IndicesStore.this.rateLimitingThrottle)) {
|
||||
logger.info("updating indices.store.throttle.max_bytes_per_sec from [{}] to [{}], note, type is [{}]", IndicesStore.this.rateLimitingThrottle, rateLimitingThrottle, IndicesStore.this.rateLimitingType);
|
||||
IndicesStore.this.rateLimitingThrottle = rateLimitingThrottle;
|
||||
IndicesStore.this.rateLimiting.setMaxRate(rateLimitingThrottle);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private final NodeEnvironment nodeEnv;
|
||||
|
||||
private final NodeSettingsService nodeSettingsService;
|
||||
@ -97,12 +72,6 @@ public class IndicesStore extends AbstractComponent implements ClusterStateListe
|
||||
private final ClusterService clusterService;
|
||||
private final TransportService transportService;
|
||||
|
||||
private volatile String rateLimitingType;
|
||||
private volatile ByteSizeValue rateLimitingThrottle;
|
||||
private final StoreRateLimiting rateLimiting = new StoreRateLimiting();
|
||||
|
||||
private final ApplySettings applySettings = new ApplySettings();
|
||||
|
||||
@Inject
|
||||
public IndicesStore(Settings settings, NodeEnvironment nodeEnv, NodeSettingsService nodeSettingsService, IndicesService indicesService,
|
||||
ClusterService clusterService, TransportService transportService) {
|
||||
@ -114,15 +83,6 @@ public class IndicesStore extends AbstractComponent implements ClusterStateListe
|
||||
this.transportService = transportService;
|
||||
transportService.registerHandler(ACTION_SHARD_EXISTS, new ShardActiveRequestHandler());
|
||||
|
||||
// we limit with 20MB / sec by default with a default type set to merge sice 0.90.1
|
||||
this.rateLimitingType = componentSettings.get("throttle.type", StoreRateLimiting.Type.MERGE.name());
|
||||
rateLimiting.setType(rateLimitingType);
|
||||
this.rateLimitingThrottle = componentSettings.getAsBytesSize("throttle.max_bytes_per_sec", new ByteSizeValue(20, ByteSizeUnit.MB));
|
||||
rateLimiting.setMaxRate(rateLimitingThrottle);
|
||||
|
||||
logger.debug("using indices.store.throttle.type [{}], with index.store.throttle.max_bytes_per_sec [{}]", rateLimitingType, rateLimitingThrottle);
|
||||
|
||||
nodeSettingsService.addListener(applySettings);
|
||||
clusterService.addLast(this);
|
||||
}
|
||||
|
||||
@ -135,13 +95,8 @@ public class IndicesStore extends AbstractComponent implements ClusterStateListe
|
||||
this.transportService = null;
|
||||
}
|
||||
|
||||
public StoreRateLimiting rateLimiting() {
|
||||
return this.rateLimiting;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
nodeSettingsService.removeListener(applySettings);
|
||||
clusterService.remove(this);
|
||||
}
|
||||
|
||||
@ -458,4 +413,4 @@ public class IndicesStore extends AbstractComponent implements ClusterStateListe
|
||||
node.writeTo(out);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -179,11 +179,6 @@ public class InternalEngineTests extends ElasticsearchLuceneTestCase {
|
||||
public Directory[] build() throws IOException {
|
||||
return new Directory[]{ directory };
|
||||
}
|
||||
|
||||
@Override
|
||||
public long throttleTimeInNanos() {
|
||||
return 0;
|
||||
}
|
||||
};
|
||||
return new Store(shardId, EMPTY_SETTINGS, directoryService, new LeastUsedDistributor(directoryService), new DummyShardLock(shardId));
|
||||
}
|
||||
|
@ -178,11 +178,6 @@ public class MergePolicySettingsTest extends ElasticsearchTestCase {
|
||||
public Directory[] build() throws IOException {
|
||||
return new Directory[] { new RAMDirectory() } ;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long throttleTimeInNanos() {
|
||||
return 0;
|
||||
}
|
||||
};
|
||||
return new Store(shardId, settings, directoryService, new LeastUsedDistributor(directoryService), new DummyShardLock(shardId));
|
||||
}
|
||||
|
@ -710,11 +710,6 @@ public class StoreTest extends ElasticsearchLuceneTestCase {
|
||||
public Directory[] build() throws IOException {
|
||||
return dirs;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long throttleTimeInNanos() {
|
||||
return random.nextInt(1000);
|
||||
}
|
||||
}
|
||||
|
||||
public static void assertConsistent(Store store, Store.MetadataSnapshot metadata) throws IOException {
|
||||
|
@ -150,11 +150,6 @@ public class DistributorTests extends ElasticsearchTestCase {
|
||||
public Directory[] build() throws IOException {
|
||||
return directories;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long throttleTimeInNanos() {
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
public static class FakeFsDirectory extends FSDirectory {
|
||||
|
@ -23,7 +23,6 @@ import org.apache.log4j.AppenderSkeleton;
|
||||
import org.apache.log4j.Level;
|
||||
import org.apache.log4j.Logger;
|
||||
import org.apache.log4j.spi.LoggingEvent;
|
||||
import org.apache.lucene.util.LuceneTestCase.AwaitsFix;
|
||||
import org.apache.lucene.util.LuceneTestCase.Slow;
|
||||
import org.elasticsearch.ElasticsearchIllegalArgumentException;
|
||||
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
|
||||
@ -126,145 +125,12 @@ public class UpdateSettingsTests extends ElasticsearchIntegrationTest {
|
||||
|
||||
}
|
||||
|
||||
// #6626: make sure we can update throttle settings and the changes take effect
|
||||
@Test
|
||||
@Slow
|
||||
public void testUpdateThrottleSettings() {
|
||||
|
||||
// No throttling at first, only 1 non-replicated shard, force lots of merging:
|
||||
assertAcked(prepareCreate("test")
|
||||
.setSettings(ImmutableSettings.builder()
|
||||
.put(AbstractIndexStore.INDEX_STORE_THROTTLE_TYPE, "none")
|
||||
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, "1")
|
||||
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, "0")
|
||||
.put(TieredMergePolicyProvider.INDEX_MERGE_POLICY_MAX_MERGE_AT_ONCE, "2")
|
||||
.put(TieredMergePolicyProvider.INDEX_MERGE_POLICY_SEGMENTS_PER_TIER, "2")
|
||||
.put(ConcurrentMergeSchedulerProvider.MAX_THREAD_COUNT, "1")
|
||||
.put(ConcurrentMergeSchedulerProvider.MAX_MERGE_COUNT, "2")
|
||||
));
|
||||
ensureGreen();
|
||||
long termUpto = 0;
|
||||
for(int i=0;i<100;i++) {
|
||||
// Provoke slowish merging by making many unique terms:
|
||||
StringBuilder sb = new StringBuilder();
|
||||
for(int j=0;j<100;j++) {
|
||||
sb.append(' ');
|
||||
sb.append(termUpto++);
|
||||
}
|
||||
client().prepareIndex("test", "type", ""+termUpto).setSource("field" + (i%10), sb.toString()).get();
|
||||
if (i % 2 == 0) {
|
||||
refresh();
|
||||
}
|
||||
}
|
||||
|
||||
// No merge IO throttling should have happened:
|
||||
NodesStatsResponse nodesStats = client().admin().cluster().prepareNodesStats().setIndices(true).get();
|
||||
for(NodeStats stats : nodesStats.getNodes()) {
|
||||
assertThat(stats.getIndices().getStore().getThrottleTime().getMillis(), equalTo(0l));
|
||||
}
|
||||
|
||||
logger.info("test: set low merge throttling");
|
||||
|
||||
// Now updates settings to turn on merge throttling lowish rate
|
||||
client()
|
||||
.admin()
|
||||
.indices()
|
||||
.prepareUpdateSettings("test")
|
||||
.setSettings(ImmutableSettings.builder()
|
||||
.put(AbstractIndexStore.INDEX_STORE_THROTTLE_TYPE, "merge")
|
||||
.put(AbstractIndexStore.INDEX_STORE_THROTTLE_MAX_BYTES_PER_SEC, "1mb"))
|
||||
.get();
|
||||
|
||||
// Make sure setting says it is in fact changed:
|
||||
GetSettingsResponse getSettingsResponse = client().admin().indices().prepareGetSettings("test").get();
|
||||
assertThat(getSettingsResponse.getSetting("test", AbstractIndexStore.INDEX_STORE_THROTTLE_TYPE), equalTo("merge"));
|
||||
|
||||
// Also make sure we see throttling kicking in:
|
||||
boolean done = false;
|
||||
while (done == false) {
|
||||
// Provoke slowish merging by making many unique terms:
|
||||
for(int i=0;i<5;i++) {
|
||||
StringBuilder sb = new StringBuilder();
|
||||
for(int j=0;j<100;j++) {
|
||||
sb.append(' ');
|
||||
sb.append(termUpto++);
|
||||
sb.append(" some random text that keeps repeating over and over again hambone");
|
||||
}
|
||||
client().prepareIndex("test", "type", ""+termUpto).setSource("field" + (i%10), sb.toString()).get();
|
||||
}
|
||||
refresh();
|
||||
nodesStats = client().admin().cluster().prepareNodesStats().setIndices(true).get();
|
||||
for(NodeStats stats : nodesStats.getNodes()) {
|
||||
long throttleMillis = stats.getIndices().getStore().getThrottleTime().getMillis();
|
||||
if (throttleMillis > 0) {
|
||||
done = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
logger.info("test: disable merge throttling");
|
||||
|
||||
// Now updates settings to disable merge throttling
|
||||
client()
|
||||
.admin()
|
||||
.indices()
|
||||
.prepareUpdateSettings("test")
|
||||
.setSettings(ImmutableSettings.builder()
|
||||
.put(AbstractIndexStore.INDEX_STORE_THROTTLE_TYPE, "none"))
|
||||
.get();
|
||||
|
||||
// Optimize does a waitForMerges, which we must do to make sure all in-flight (throttled) merges finish:
|
||||
logger.info("test: optimize");
|
||||
client().admin().indices().prepareOptimize("test").setWaitForMerge(true).get();
|
||||
logger.info("test: optimize done");
|
||||
|
||||
// Record current throttling so far
|
||||
long sumThrottleTime = 0;
|
||||
nodesStats = client().admin().cluster().prepareNodesStats().setIndices(true).get();
|
||||
for(NodeStats stats : nodesStats.getNodes()) {
|
||||
sumThrottleTime += stats.getIndices().getStore().getThrottleTime().getMillis();
|
||||
}
|
||||
|
||||
// Make sure no further throttling happens:
|
||||
for(int i=0;i<100;i++) {
|
||||
// Provoke slowish merging by making many unique terms:
|
||||
StringBuilder sb = new StringBuilder();
|
||||
for(int j=0;j<100;j++) {
|
||||
sb.append(' ');
|
||||
sb.append(termUpto++);
|
||||
}
|
||||
client().prepareIndex("test", "type", ""+termUpto).setSource("field" + (i%10), sb.toString()).get();
|
||||
if (i % 2 == 0) {
|
||||
refresh();
|
||||
}
|
||||
}
|
||||
logger.info("test: done indexing after disabling throttling");
|
||||
|
||||
long newSumThrottleTime = 0;
|
||||
nodesStats = client().admin().cluster().prepareNodesStats().setIndices(true).get();
|
||||
for(NodeStats stats : nodesStats.getNodes()) {
|
||||
newSumThrottleTime += stats.getIndices().getStore().getThrottleTime().getMillis();
|
||||
}
|
||||
|
||||
// No additional merge IO throttling should have happened:
|
||||
assertEquals(sumThrottleTime, newSumThrottleTime);
|
||||
|
||||
// Optimize & flush and wait; else we sometimes get a "Delete Index failed - not acked"
|
||||
// when ElasticsearchIntegrationTest.after tries to remove indices created by the test:
|
||||
|
||||
// Wait for merges to finish
|
||||
client().admin().indices().prepareOptimize("test").setWaitForMerge(true).get();
|
||||
flush();
|
||||
|
||||
logger.info("test: test done");
|
||||
}
|
||||
|
||||
private static class MockAppender extends AppenderSkeleton {
|
||||
public boolean sawIndexWriterMessage;
|
||||
public boolean sawFlushDeletes;
|
||||
public boolean sawMergeThreadPaused;
|
||||
public boolean sawUpdateSetting;
|
||||
public boolean sawUpdateMaxThreadCount;
|
||||
public boolean sawUpdateAutoThrottle;
|
||||
|
||||
@Override
|
||||
protected void append(LoggingEvent event) {
|
||||
@ -274,8 +140,11 @@ public class UpdateSettingsTests extends ElasticsearchIntegrationTest {
|
||||
sawFlushDeletes |= message.contains("IW: apply all deletes during flush");
|
||||
sawMergeThreadPaused |= message.contains("CMS: pause thread");
|
||||
}
|
||||
if (event.getLevel() == Level.INFO && message.contains("updating [max_thread_count] from [10000] to [1]")) {
|
||||
sawUpdateSetting = true;
|
||||
if (event.getLevel() == Level.INFO && message.contains("updating [index.merge.scheduler.max_thread_count] from [10000] to [1]")) {
|
||||
sawUpdateMaxThreadCount = true;
|
||||
}
|
||||
if (event.getLevel() == Level.INFO && message.contains("updating [index.merge.scheduler.auto_throttle] from [true] to [false]")) {
|
||||
sawUpdateAutoThrottle = true;
|
||||
}
|
||||
}
|
||||
|
||||
@ -289,10 +158,49 @@ public class UpdateSettingsTests extends ElasticsearchIntegrationTest {
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testUpdateAutoThrottleSettings() {
|
||||
|
||||
MockAppender mockAppender = new MockAppender();
|
||||
Logger rootLogger = Logger.getRootLogger();
|
||||
Level savedLevel = rootLogger.getLevel();
|
||||
rootLogger.addAppender(mockAppender);
|
||||
rootLogger.setLevel(Level.TRACE);
|
||||
|
||||
try {
|
||||
// No throttling at first, only 1 non-replicated shard, force lots of merging:
|
||||
assertAcked(prepareCreate("test")
|
||||
.setSettings(ImmutableSettings.builder()
|
||||
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, "1")
|
||||
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, "0")
|
||||
.put(TieredMergePolicyProvider.INDEX_MERGE_POLICY_MAX_MERGE_AT_ONCE, "2")
|
||||
.put(TieredMergePolicyProvider.INDEX_MERGE_POLICY_SEGMENTS_PER_TIER, "2")
|
||||
.put(ConcurrentMergeSchedulerProvider.MAX_THREAD_COUNT, "1")
|
||||
.put(ConcurrentMergeSchedulerProvider.MAX_MERGE_COUNT, "2")
|
||||
));
|
||||
|
||||
// Disable auto throttle:
|
||||
client()
|
||||
.admin()
|
||||
.indices()
|
||||
.prepareUpdateSettings("test")
|
||||
.setSettings(ImmutableSettings.builder()
|
||||
.put(ConcurrentMergeSchedulerProvider.AUTO_THROTTLE, "no"))
|
||||
.get();
|
||||
|
||||
assertTrue(mockAppender.sawUpdateAutoThrottle);
|
||||
|
||||
// Make sure setting says it is in fact changed:
|
||||
GetSettingsResponse getSettingsResponse = client().admin().indices().prepareGetSettings("test").get();
|
||||
assertThat(getSettingsResponse.getSetting("test", ConcurrentMergeSchedulerProvider.AUTO_THROTTLE), equalTo("no"));
|
||||
} finally {
|
||||
rootLogger.removeAppender(mockAppender);
|
||||
rootLogger.setLevel(savedLevel);
|
||||
}
|
||||
}
|
||||
|
||||
// #6882: make sure we can change index.merge.scheduler.max_thread_count live
|
||||
@Test
|
||||
@Slow
|
||||
@AwaitsFix(bugUrl="Super slow because of LUCENE-6119. Muted until we clean up merge throttling.")
|
||||
public void testUpdateMergeMaxThreadCount() {
|
||||
|
||||
MockAppender mockAppender = new MockAppender();
|
||||
@ -303,11 +211,8 @@ public class UpdateSettingsTests extends ElasticsearchIntegrationTest {

        try {

            // Tons of merge threads allowed, only 1 non-replicated shard, force lots of merging, throttle so they fall behind:
            assertAcked(prepareCreate("test")
                        .setSettings(ImmutableSettings.builder()
                                     .put(AbstractIndexStore.INDEX_STORE_THROTTLE_TYPE, "merge")
                                     .put(AbstractIndexStore.INDEX_STORE_THROTTLE_MAX_BYTES_PER_SEC, "1mb")
                                     .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, "1")
                                     .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, "0")
                                     .put(TieredMergePolicyProvider.INDEX_MERGE_POLICY_MAX_MERGE_AT_ONCE, "2")
@ -316,79 +221,33 @@
                                     .put(ConcurrentMergeSchedulerProvider.MAX_THREAD_COUNT, "10000")
                                     .put(ConcurrentMergeSchedulerProvider.MAX_MERGE_COUNT, "10000")
                                     ));
            ensureGreen();
            long termUpto = 0;
            for(int i=0;i<100;i++) {
                // Provoke slowish merging by making many unique terms:
                StringBuilder sb = new StringBuilder();
                for(int j=0;j<100;j++) {
                    sb.append(' ');
                    sb.append(termUpto++);
                }
                client().prepareIndex("test", "type", ""+termUpto).setSource("field" + (i%10), sb.toString()).get();
                if (i % 2 == 0) {
                    refresh();
                }
            }

            assertTrue(mockAppender.sawFlushDeletes);
            assertFalse(mockAppender.sawMergeThreadPaused);
            mockAppender.sawFlushDeletes = false;
            mockAppender.sawMergeThreadPaused = false;
            assertFalse(mockAppender.sawUpdateMaxThreadCount);

            assertFalse(mockAppender.sawUpdateSetting);

            // Now make a live change to reduce allowed merge threads, and waaay over-throttle merging so they fall behind:
            // Now make a live change to reduce allowed merge threads:
            client()
                .admin()
                .indices()
                .prepareUpdateSettings("test")
                .setSettings(ImmutableSettings.builder()
                             .put(ConcurrentMergeSchedulerProvider.MAX_THREAD_COUNT, "1")
                             .put(AbstractIndexStore.INDEX_STORE_THROTTLE_MAX_BYTES_PER_SEC, "10kb")
                             )
                .get();

            try {
                // Make sure we log the change:
                assertTrue(mockAppender.sawUpdateMaxThreadCount);

                // Make sure we log the change:
                assertTrue(mockAppender.sawUpdateSetting);

                int i = 0;
                while (true) {
                    // Provoke slowish merging by making many unique terms:
                    StringBuilder sb = new StringBuilder();
                    for(int j=0;j<100;j++) {
                        sb.append(' ');
                        sb.append(termUpto++);
                    }
                    client().prepareIndex("test", "type", ""+termUpto).setSource("field" + (i%10), sb.toString()).get();
                    if (i % 2 == 0) {
                        refresh();
                    }
                    // This time we should see some merges were in fact paused:
                    if (mockAppender.sawMergeThreadPaused) {
                        break;
                    }
                    i++;
                }
            } finally {
                // Make merges fast again & finish merges before we try to close; else we sometimes get a "Delete Index failed - not acked"
                // when ElasticsearchIntegrationTest.after tries to remove indices created by the test:
                client()
                    .admin()
                    .indices()
                    .prepareUpdateSettings("test")
                    .setSettings(ImmutableSettings.builder()
                                 .put(ConcurrentMergeSchedulerProvider.MAX_THREAD_COUNT, "3")
                                 .put(AbstractIndexStore.INDEX_STORE_THROTTLE_MAX_BYTES_PER_SEC, "20mb")
                                 )
                    .get();

                // Wait for merges to finish
                client().admin().indices().prepareOptimize("test").setWaitForMerge(true).get();
            }
            client()
                .admin()
                .indices()
                .prepareUpdateSettings("test")
                .setSettings(ImmutableSettings.builder()
                             .put(ConcurrentMergeSchedulerProvider.MAX_THREAD_COUNT, "3")
                             )
                .get();

            // Wait for merges to finish
            client().admin().indices().prepareOptimize("test").setWaitForMerge(true).get();

        } finally {
            rootLogger.removeAppender(mockAppender);
@ -301,90 +301,6 @@ public class IndexStatsTests extends ElasticsearchIntegrationTest {
        assertThat(client().admin().indices().prepareStats("idx").setQueryCache(true).get().getTotal().getQueryCache().getMemorySizeInBytes(), greaterThan(0l));
    }


    @Test
    public void nonThrottleStats() throws Exception {
        assertAcked(prepareCreate("test")
                    .setSettings(ImmutableSettings.builder()
                                 .put(AbstractIndexStore.INDEX_STORE_THROTTLE_TYPE, "merge")
                                 .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, "1")
                                 .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, "0")
                                 .put(TieredMergePolicyProvider.INDEX_MERGE_POLICY_MAX_MERGE_AT_ONCE, "2")
                                 .put(TieredMergePolicyProvider.INDEX_MERGE_POLICY_SEGMENTS_PER_TIER, "2")
                                 .put(ConcurrentMergeSchedulerProvider.MAX_THREAD_COUNT, "1")
                                 .put(ConcurrentMergeSchedulerProvider.MAX_MERGE_COUNT, "10000")
                                 ));
        ensureGreen();
        long termUpto = 0;
        IndicesStatsResponse stats;
        // Provoke slowish merging by making many unique terms:
        for(int i=0; i<100; i++) {
            StringBuilder sb = new StringBuilder();
            for(int j=0; j<100; j++) {
                sb.append(' ');
                sb.append(termUpto++);
                sb.append(" some random text that keeps repeating over and over again hambone");
            }
            client().prepareIndex("test", "type", ""+termUpto).setSource("field" + (i%10), sb.toString()).get();
        }
        refresh();
        stats = client().admin().indices().prepareStats().execute().actionGet();
        //nodesStats = client().admin().cluster().prepareNodesStats().setIndices(true).get();

        stats = client().admin().indices().prepareStats().execute().actionGet();
        assertThat(stats.getPrimaries().getIndexing().getTotal().getThrottleTimeInMillis(), equalTo(0l));
    }

    @Test
    public void throttleStats() throws Exception {
        assertAcked(prepareCreate("test")
                    .setSettings(ImmutableSettings.builder()
                                 .put(AbstractIndexStore.INDEX_STORE_THROTTLE_TYPE, "merge")
                                 .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, "1")
                                 .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, "0")
                                 .put(TieredMergePolicyProvider.INDEX_MERGE_POLICY_MAX_MERGE_AT_ONCE, "2")
                                 .put(TieredMergePolicyProvider.INDEX_MERGE_POLICY_SEGMENTS_PER_TIER, "2")
                                 .put(ConcurrentMergeSchedulerProvider.MAX_THREAD_COUNT, "1")
                                 .put(ConcurrentMergeSchedulerProvider.MAX_MERGE_COUNT, "1")
                                 .put("index.merge.policy.type", "tiered")

                                 ));
        ensureGreen();
        long termUpto = 0;
        IndicesStatsResponse stats;
        // make sure we see throttling kicking in:
        boolean done = false;
        long start = System.currentTimeMillis();
        while (!done) {
            for(int i=0; i<100; i++) {
                // Provoke slowish merging by making many unique terms:
                StringBuilder sb = new StringBuilder();
                for(int j=0; j<100; j++) {
                    sb.append(' ');
                    sb.append(termUpto++);
                }
                client().prepareIndex("test", "type", ""+termUpto).setSource("field" + (i%10), sb.toString()).get();
                if (i % 2 == 0) {
                    refresh();
                }
            }
            refresh();
            stats = client().admin().indices().prepareStats().execute().actionGet();
            //nodesStats = client().admin().cluster().prepareNodesStats().setIndices(true).get();
            done = stats.getPrimaries().getIndexing().getTotal().getThrottleTimeInMillis() > 0;
            if (System.currentTimeMillis() - start > 300*1000) { //Wait 5 minutes for throttling to kick in
                fail("index throttling didn't kick in after 5 minutes of intense merging");
            }
        }

        // Optimize & flush and wait; else we sometimes get a "Delete Index failed - not acked"
        // when ElasticsearchIntegrationTest.after tries to remove indices created by the test:
        logger.info("test: now optimize");
        client().admin().indices().prepareOptimize("test").setWaitForMerge(true).get();
        flush();
        logger.info("test: test done");
    }

    @Test
    public void simpleStats() throws Exception {
        createIndex("test1", "test2");
@ -524,6 +440,8 @@ public class IndexStatsTests extends ElasticsearchIntegrationTest {

        assertThat(stats.getTotal().getMerge(), notNullValue());
        assertThat(stats.getTotal().getMerge().getTotal(), greaterThan(0l));
        assertThat(stats.getTotal().getMerge().getTotalStoppedTime(), notNullValue());
        assertThat(stats.getTotal().getMerge().getTotalThrottledTime(), notNullValue());
    }

    @Test
@ -54,63 +54,48 @@ public class SimpleDistributorTests extends ElasticsearchIntegrationTest {
        String storeString = getStoreDirectory("test", 0).toString();
        logger.info(storeString);
        Path[] dataPaths = dataPaths();
        assertThat(storeString.toLowerCase(Locale.ROOT), startsWith("store(least_used[rate_limited(niofs(" + dataPaths[0].toAbsolutePath().toString().toLowerCase(Locale.ROOT)));
        assertThat(storeString.toLowerCase(Locale.ROOT), startsWith("store(least_used[niofs(" + dataPaths[0].toAbsolutePath().toString().toLowerCase(Locale.ROOT)));
        if (dataPaths.length > 1) {
            assertThat(storeString.toLowerCase(Locale.ROOT), containsString("), rate_limited(niofs(" + dataPaths[1].toAbsolutePath().toString().toLowerCase(Locale.ROOT)));
            assertThat(storeString.toLowerCase(Locale.ROOT), containsString("), niofs(" + dataPaths[1].toAbsolutePath().toString().toLowerCase(Locale.ROOT)));
        }
        assertThat(storeString, endsWith(", type=MERGE, rate=20.0)])"));

        createIndexWithStoreType("test", IndexStoreModule.Type.NIOFS, "random");
        storeString = getStoreDirectory("test", 0).toString();
        logger.info(storeString);
        dataPaths = dataPaths();
        assertThat(storeString.toLowerCase(Locale.ROOT), startsWith("store(random[rate_limited(niofs(" + dataPaths[0].toAbsolutePath().toString().toLowerCase(Locale.ROOT)));
        assertThat(storeString.toLowerCase(Locale.ROOT), startsWith("store(random[niofs(" + dataPaths[0].toAbsolutePath().toString().toLowerCase(Locale.ROOT)));
        if (dataPaths.length > 1) {
            assertThat(storeString.toLowerCase(Locale.ROOT), containsString("), rate_limited(niofs(" + dataPaths[1].toAbsolutePath().toString().toLowerCase(Locale.ROOT)));
            assertThat(storeString.toLowerCase(Locale.ROOT), containsString("), niofs(" + dataPaths[1].toAbsolutePath().toString().toLowerCase(Locale.ROOT)));
        }
        assertThat(storeString, endsWith(", type=MERGE, rate=20.0)])"));

        createIndexWithStoreType("test", IndexStoreModule.Type.MMAPFS, "least_used");
        storeString = getStoreDirectory("test", 0).toString();
        logger.info(storeString);
        dataPaths = dataPaths();
        assertThat(storeString.toLowerCase(Locale.ROOT), startsWith("store(least_used[rate_limited(mmapfs(" + dataPaths[0].toAbsolutePath().toString().toLowerCase(Locale.ROOT)));
        assertThat(storeString.toLowerCase(Locale.ROOT), startsWith("store(least_used[mmapfs(" + dataPaths[0].toAbsolutePath().toString().toLowerCase(Locale.ROOT)));
        if (dataPaths.length > 1) {
            assertThat(storeString.toLowerCase(Locale.ROOT), containsString("), rate_limited(mmapfs(" + dataPaths[1].toAbsolutePath().toString().toLowerCase(Locale.ROOT)));
            assertThat(storeString.toLowerCase(Locale.ROOT), containsString("), mmapfs(" + dataPaths[1].toAbsolutePath().toString().toLowerCase(Locale.ROOT)));
        }
        assertThat(storeString, endsWith(", type=MERGE, rate=20.0)])"));

        createIndexWithStoreType("test", IndexStoreModule.Type.SIMPLEFS, "least_used");
        storeString = getStoreDirectory("test", 0).toString();
        logger.info(storeString);
        dataPaths = dataPaths();
        assertThat(storeString.toLowerCase(Locale.ROOT), startsWith("store(least_used[rate_limited(simplefs(" + dataPaths[0].toAbsolutePath().toString().toLowerCase(Locale.ROOT)));
        assertThat(storeString.toLowerCase(Locale.ROOT), startsWith("store(least_used[simplefs(" + dataPaths[0].toAbsolutePath().toString().toLowerCase(Locale.ROOT)));
        if (dataPaths.length > 1) {
            assertThat(storeString.toLowerCase(Locale.ROOT), containsString("), rate_limited(simplefs(" + dataPaths[1].toAbsolutePath().toString().toLowerCase(Locale.ROOT)));
            assertThat(storeString.toLowerCase(Locale.ROOT), containsString("), simplefs(" + dataPaths[1].toAbsolutePath().toString().toLowerCase(Locale.ROOT)));
        }
        assertThat(storeString, endsWith(", type=MERGE, rate=20.0)])"));

        createIndexWithStoreType("test", IndexStoreModule.Type.DEFAULT, "least_used");
        storeString = getStoreDirectory("test", 0).toString();
        logger.info(storeString);
        dataPaths = dataPaths();
        assertThat(storeString.toLowerCase(Locale.ROOT), startsWith("store(least_used[rate_limited(default(mmapfs(" + dataPaths[0].toAbsolutePath().toString().toLowerCase(Locale.ROOT)));
        assertThat(storeString.toLowerCase(Locale.ROOT), startsWith("store(least_used[default(mmapfs(" + dataPaths[0].toAbsolutePath().toString().toLowerCase(Locale.ROOT)));
        assertThat(storeString.toLowerCase(Locale.ROOT), containsString("),niofs(" + dataPaths[0].toAbsolutePath().toString().toLowerCase(Locale.ROOT)));

        if (dataPaths.length > 1) {
            assertThat(storeString.toLowerCase(Locale.ROOT), containsString("), rate_limited(default(mmapfs(" + dataPaths[1].toAbsolutePath().toString().toLowerCase(Locale.ROOT)));
            assertThat(storeString.toLowerCase(Locale.ROOT), containsString("), default(mmapfs(" + dataPaths[1].toAbsolutePath().toString().toLowerCase(Locale.ROOT)));
        }
        assertThat(storeString, endsWith(", type=MERGE, rate=20.0)])"));

        createIndexWithoutRateLimitingStoreType("test", IndexStoreModule.Type.NIOFS, "least_used");
        storeString = getStoreDirectory("test", 0).toString();
        logger.info(storeString);
        dataPaths = dataPaths();
        assertThat(storeString.toLowerCase(Locale.ROOT), startsWith("store(least_used[niofs(" + dataPaths[0].toAbsolutePath().toString().toLowerCase(Locale.ROOT)));
        if (dataPaths.length > 1) {
            assertThat(storeString.toLowerCase(Locale.ROOT), containsString("), niofs(" + dataPaths[1].toAbsolutePath().toString().toLowerCase(Locale.ROOT)));
        }
        assertThat(storeString, endsWith(")])"));
    }

    private void createIndexWithStoreType(String index, IndexStoreModule.Type storeType, String distributor) {
@ -126,21 +111,6 @@ public class SimpleDistributorTests extends ElasticsearchIntegrationTest {
        assertThat(client().admin().cluster().prepareHealth("test").setWaitForGreenStatus().execute().actionGet().isTimedOut(), equalTo(false));
    }

    private void createIndexWithoutRateLimitingStoreType(String index, IndexStoreModule.Type storeType, String distributor) {
        cluster().wipeIndices(index);
        client().admin().indices().prepareCreate(index)
                .setSettings(settingsBuilder()
                        .put("index.store.distributor", distributor)
                        .put("index.store.type", storeType)
                        .put("index.store.throttle.type", "none")
                        .put("index.number_of_replicas", 0)
                        .put("index.number_of_shards", 1)
                )
                .execute().actionGet();
        assertThat(client().admin().cluster().prepareHealth("test").setWaitForGreenStatus().execute().actionGet().isTimedOut(), equalTo(false));
    }


    private Path[] dataPaths() {
        Set<String> nodes = internalCluster().nodesInclude("test");
        assertThat(nodes.isEmpty(), equalTo(false));
@ -633,11 +633,6 @@ public class DedicatedClusterSnapshotRestoreTests extends AbstractSnapshotTests
            asyncIndexThreads[i].join();
        }

        logger.info("--> update index settings to back to normal");
        assertAcked(client().admin().indices().prepareUpdateSettings("test-*").setSettings(ImmutableSettings.builder()
                .put(AbstractIndexStore.INDEX_STORE_THROTTLE_TYPE, "node")
        ));

        // Make sure that snapshot finished - doesn't matter if it failed or succeeded
        try {
            CreateSnapshotResponse snapshotResponse = snapshotResponseFuture.get();
@ -679,11 +674,6 @@
        for (int i = 0; i < between(10, 500); i++) {
            index(name, "doc", Integer.toString(i), "foo", "bar" + i);
        }

        assertAcked(client().admin().indices().prepareUpdateSettings(name).setSettings(ImmutableSettings.builder()
                .put(AbstractIndexStore.INDEX_STORE_THROTTLE_TYPE, "all")
                .put(AbstractIndexStore.INDEX_STORE_THROTTLE_MAX_BYTES_PER_SEC, between(100, 50000))
        ));
    }

    public static abstract class TestCustomMetaData implements MetaData.Custom {
@ -1361,12 +1361,6 @@ public class SharedClusterSnapshotRestoreTests extends AbstractSnapshotTests {
        refresh();
        assertThat(client.prepareCount("test-idx").get().getCount(), equalTo(100L));

        // Update settings to make sure that relocation is slow so we can start snapshot before relocation is finished
        assertAcked(client.admin().indices().prepareUpdateSettings("test-idx").setSettings(ImmutableSettings.builder()
                .put(AbstractIndexStore.INDEX_STORE_THROTTLE_TYPE, "all")
                .put(AbstractIndexStore.INDEX_STORE_THROTTLE_MAX_BYTES_PER_SEC, 100)
        ));

        logger.info("--> start relocations");
        allowNodes("test-idx", internalCluster().numDataNodes());

@ -1377,11 +1371,6 @@
        logger.info("--> snapshot");
        client.admin().cluster().prepareCreateSnapshot("test-repo", "test-snap").setWaitForCompletion(false).setIndices("test-idx").get();

        // Update settings to back to normal
        assertAcked(client.admin().indices().prepareUpdateSettings("test-idx").setSettings(ImmutableSettings.builder()
                .put(AbstractIndexStore.INDEX_STORE_THROTTLE_TYPE, "node")
        ));

        logger.info("--> wait for snapshot to complete");
        SnapshotInfo snapshotInfo = waitForCompletion("test-repo", "test-snap", TimeValue.timeValueSeconds(600));
        assertThat(snapshotInfo.state(), equalTo(SnapshotState.SUCCESS));
@ -27,7 +27,6 @@ import com.google.common.base.Joiner;
import com.google.common.base.Predicate;
import com.google.common.collect.Lists;

import org.apache.lucene.store.StoreRateLimiting;
import org.apache.lucene.util.AbstractRandomizedTest;
import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.TestUtil;
@ -430,16 +429,6 @@ public abstract class ElasticsearchIntegrationTest extends ElasticsearchTestCase
        setRandomTranslogSettings(random, builder);
        setRandomNormsLoading(random, builder);
        setRandomScriptingSettings(random, builder);
        if (random.nextBoolean()) {
            if (random.nextInt(10) == 0) { // do something crazy slow here
                builder.put(IndicesStore.INDICES_STORE_THROTTLE_MAX_BYTES_PER_SEC, new ByteSizeValue(RandomInts.randomIntBetween(random, 1, 10), ByteSizeUnit.MB));
            } else {
                builder.put(IndicesStore.INDICES_STORE_THROTTLE_MAX_BYTES_PER_SEC, new ByteSizeValue(RandomInts.randomIntBetween(random, 10, 200), ByteSizeUnit.MB));
            }
        }
        if (random.nextBoolean()) {
            builder.put(IndicesStore.INDICES_STORE_THROTTLE_TYPE, RandomPicks.randomFrom(random, StoreRateLimiting.Type.values()));
        }

        if (random.nextBoolean()) {
            builder.put(StoreModule.DISTIBUTOR_KEY, random.nextBoolean() ? StoreModule.LEAST_USED_DISTRIBUTOR : StoreModule.RANDOM_WEIGHT_DISTRIBUTOR);
@ -24,7 +24,6 @@ import org.apache.lucene.index.CheckIndex;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.LockFactory;
import org.apache.lucene.store.StoreRateLimiting;
import org.apache.lucene.util.AbstractRandomizedTest;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.inject.Inject;
@ -151,21 +150,6 @@ public class MockFSDirectoryService extends FsDirectoryService {
        }
    }

    @Override
    public void onPause(long nanos) {
        delegateService.onPause(nanos);
    }

    @Override
    public StoreRateLimiting rateLimiting() {
        return delegateService.rateLimiting();
    }

    @Override
    public long throttleTimeInNanos() {
        return delegateService.throttleTimeInNanos();
    }

    @Override
    public Directory newFromDistributor(Distributor distributor) throws IOException {
        return helper.wrap(super.newFromDistributor(distributor));