put back fixed throttling, but off by default

This commit is contained in:
Michael McCandless 2015-01-14 05:35:09 -05:00 committed by mikemccand
parent 87d8fbff28
commit 107099affa
30 changed files with 584 additions and 40 deletions

View File

@ -48,6 +48,8 @@ Will return, for example:
"store": { "store": {
"size": "5.6kb", "size": "5.6kb",
"size_in_bytes": 5770, "size_in_bytes": 5770,
"throttle_time": "0s",
"throttle_time_in_millis": 0
}, },
"fielddata": { "fielddata": {
"memory_size": "0b", "memory_size": "0b",

View File

@ -183,6 +183,15 @@ due to forced awareness or allocation filtering.
`indices.recovery.max_bytes_per_sec`:: `indices.recovery.max_bytes_per_sec`::
See <<modules-indices>> See <<modules-indices>>
[float]
==== Store level throttling
`indices.store.throttle.type`::
See <<index-modules-store>>
`indices.store.throttle.max_bytes_per_sec`::
See <<index-modules-store>>
[float] [float]
[[logger]] [[logger]]
=== Logger === Logger

View File

@ -7,6 +7,12 @@ where the index data is stored, and are immutable up to delete markers.
Segments are, periodically, merged into larger segments to keep the Segments are, periodically, merged into larger segments to keep the
index size at bay and expunge deletes. index size at bay and expunge deletes.
The more segments one has in the Lucene index means slower searches and
more memory used. Segment merging is used to reduce the number of segments,
however merges can be expensive to perform, especially on low IO environments.
Merges can be throttled using <<store-throttling,store level throttling>>.
[float] [float]
[[policy]] [[policy]]
=== Policy === Policy

View File

@ -18,6 +18,38 @@ heap space* using the "Memory" (see below) storage type. It translates
to the fact that there is no need for extra large JVM heaps (with their to the fact that there is no need for extra large JVM heaps (with their
own consequences) for storing the index in memory. own consequences) for storing the index in memory.
[float]
[[store-throttling]]
=== Store Level Throttling
The way Lucene, the IR library elasticsearch uses under the covers,
works is by creating immutable segments (up to deletes) and constantly
merging them (the merge policy settings allow to control how those
merges happen). The merge process happens in an asynchronous manner
without affecting the indexing / search speed. The problem though,
especially on systems with low IO, is that the merge process can be
expensive and affect search / index operation simply by the fact that
the box is now taxed with more IO happening.
The store module allows to have throttling configured for merges (or
all) either on the node level, or on the index level. The node level
throttling will make sure that out of all the shards allocated on that
node, the merge process won't pass the specific setting bytes per
second. It can be set by setting `indices.store.throttle.type` to
`merge`, and setting `indices.store.throttle.max_bytes_per_sec` to
something like `5mb`. The node level settings can be changed dynamically
using the cluster update settings API. The default is disabled (set to `none`),
in favor of <merge,auto throttle in the merge scheduler>.
If specific index level configuration is needed, regardless of the node
level settings, it can be set as well using the
`index.store.throttle.type`, and
`index.store.throttle.max_bytes_per_sec`. The default value for the type
is `node`, meaning it will throttle based on the node level settings and
participate in the global throttling happening. Both settings can be set
using the index update settings API dynamically.
[float] [float]
[[file-system]] [[file-system]]
=== File system storage types === File system storage types

View File

@ -60,3 +60,17 @@ The following settings can be set to manage the recovery policy:
`indices.recovery.max_bytes_per_sec`:: `indices.recovery.max_bytes_per_sec`::
defaults to `20mb`. defaults to `20mb`.
[float]
[[throttling]]
=== Store level throttling
The following settings can be set to control the store throttling:
[horizontal]
`indices.store.throttle.type`::
could be `merge` (default), `none` or `all`. See <<index-modules-store>>.
`indices.store.throttle.max_bytes_per_sec`::
defaults to `20mb`.

View File

@ -16,17 +16,12 @@
* specific language governing permissions and limitations * specific language governing permissions and limitations
* under the License. * under the License.
*/ */
package org.apache.lucene.store;
package org.elasticsearch.index.store;
import java.util.Arrays; import java.util.Arrays;
import org.apache.lucene.store.Directory; /**
import org.apache.lucene.store.FileSwitchDirectory; */
import org.apache.lucene.store.MMapDirectory;
import org.apache.lucene.store.NIOFSDirectory;
import org.apache.lucene.store.SimpleFSDirectory;
public final class StoreUtils { public final class StoreUtils {
private StoreUtils() { private StoreUtils() {

View File

@ -65,6 +65,8 @@ public class ClusterDynamicSettingsModule extends AbstractModule {
clusterDynamicSettings.addDynamicSetting(IndicesFilterCache.INDICES_CACHE_FILTER_SIZE); clusterDynamicSettings.addDynamicSetting(IndicesFilterCache.INDICES_CACHE_FILTER_SIZE);
clusterDynamicSettings.addDynamicSetting(IndicesFilterCache.INDICES_CACHE_FILTER_EXPIRE, Validator.TIME); clusterDynamicSettings.addDynamicSetting(IndicesFilterCache.INDICES_CACHE_FILTER_EXPIRE, Validator.TIME);
clusterDynamicSettings.addDynamicSetting(IndicesFilterCache.INDICES_CACHE_FILTER_CONCURRENCY_LEVEL, Validator.POSITIVE_INTEGER); clusterDynamicSettings.addDynamicSetting(IndicesFilterCache.INDICES_CACHE_FILTER_CONCURRENCY_LEVEL, Validator.POSITIVE_INTEGER);
clusterDynamicSettings.addDynamicSetting(IndicesStore.INDICES_STORE_THROTTLE_TYPE);
clusterDynamicSettings.addDynamicSetting(IndicesStore.INDICES_STORE_THROTTLE_MAX_BYTES_PER_SEC, Validator.BYTES_SIZE);
clusterDynamicSettings.addDynamicSetting(IndicesTTLService.INDICES_TTL_INTERVAL, Validator.TIME); clusterDynamicSettings.addDynamicSetting(IndicesTTLService.INDICES_TTL_INTERVAL, Validator.TIME);
clusterDynamicSettings.addDynamicSetting(MappingUpdatedAction.INDICES_MAPPING_ADDITIONAL_MAPPING_CHANGE_TIME, Validator.TIME); clusterDynamicSettings.addDynamicSetting(MappingUpdatedAction.INDICES_MAPPING_ADDITIONAL_MAPPING_CHANGE_TIME, Validator.TIME);
clusterDynamicSettings.addDynamicSetting(MetaData.SETTING_READ_ONLY); clusterDynamicSettings.addDynamicSetting(MetaData.SETTING_READ_ONLY);

View File

@ -19,8 +19,6 @@
package org.elasticsearch.index.merge; package org.elasticsearch.index.merge;
import java.io.IOException;
import org.elasticsearch.Version; import org.elasticsearch.Version;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.StreamOutput;
@ -31,6 +29,11 @@ import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentBuilderString; import org.elasticsearch.common.xcontent.XContentBuilderString;
import java.io.IOException;
/**
*
*/
public class MergeStats implements Streamable, ToXContent { public class MergeStats implements Streamable, ToXContent {
private long total; private long total;
@ -64,7 +67,12 @@ public class MergeStats implements Streamable, ToXContent {
this.currentSizeInBytes += currentSizeInBytes; this.currentSizeInBytes += currentSizeInBytes;
this.totalStoppedTimeInMillis += stoppedTimeMillis; this.totalStoppedTimeInMillis += stoppedTimeMillis;
this.totalThrottledTimeInMillis += throttledTimeMillis; this.totalThrottledTimeInMillis += throttledTimeMillis;
this.totalBytesPerSecAutoThrottle += (long) (mbPerSecAutoThrottle * 1024 * 1024); long bytesPerSecAutoThrottle = (long) (mbPerSecAutoThrottle * 1024 * 1024);
if (this.totalBytesPerSecAutoThrottle == Long.MAX_VALUE || bytesPerSecAutoThrottle == Long.MAX_VALUE) {
this.totalBytesPerSecAutoThrottle = Long.MAX_VALUE;
} else {
this.totalBytesPerSecAutoThrottle += bytesPerSecAutoThrottle;
}
} }
public void add(MergeStats mergeStats) { public void add(MergeStats mergeStats) {
@ -80,7 +88,11 @@ public class MergeStats implements Streamable, ToXContent {
this.currentSizeInBytes += mergeStats.currentSizeInBytes; this.currentSizeInBytes += mergeStats.currentSizeInBytes;
this.totalStoppedTimeInMillis += mergeStats.totalStoppedTimeInMillis; this.totalStoppedTimeInMillis += mergeStats.totalStoppedTimeInMillis;
this.totalThrottledTimeInMillis += mergeStats.totalThrottledTimeInMillis; this.totalThrottledTimeInMillis += mergeStats.totalThrottledTimeInMillis;
this.totalBytesPerSecAutoThrottle += mergeStats.totalBytesPerSecAutoThrottle; if (this.totalBytesPerSecAutoThrottle == Long.MAX_VALUE || mergeStats.totalBytesPerSecAutoThrottle == Long.MAX_VALUE) {
this.totalBytesPerSecAutoThrottle = Long.MAX_VALUE;
} else {
this.totalBytesPerSecAutoThrottle += mergeStats.totalBytesPerSecAutoThrottle;
}
} }
/** /**

View File

@ -41,6 +41,9 @@ import java.io.IOException;
import java.util.Set; import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.CopyOnWriteArraySet;
/**
*
*/
public class ConcurrentMergeSchedulerProvider extends MergeSchedulerProvider { public class ConcurrentMergeSchedulerProvider extends MergeSchedulerProvider {
private final IndexSettingsService indexSettingsService; private final IndexSettingsService indexSettingsService;

View File

@ -34,6 +34,9 @@ import java.io.Closeable;
import java.util.Set; import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArrayList;
/**
*
*/
public abstract class MergeSchedulerProvider extends AbstractIndexShardComponent implements IndexShardComponent, Closeable { public abstract class MergeSchedulerProvider extends AbstractIndexShardComponent implements IndexShardComponent, Closeable {
public static interface FailureListener { public static interface FailureListener {

View File

@ -51,6 +51,8 @@ public class IndexDynamicSettingsModule extends AbstractModule {
public IndexDynamicSettingsModule() { public IndexDynamicSettingsModule() {
indexDynamicSettings = new DynamicSettings(); indexDynamicSettings = new DynamicSettings();
indexDynamicSettings.addDynamicSetting(AbstractIndexStore.INDEX_STORE_THROTTLE_MAX_BYTES_PER_SEC, Validator.BYTES_SIZE);
indexDynamicSettings.addDynamicSetting(AbstractIndexStore.INDEX_STORE_THROTTLE_TYPE);
indexDynamicSettings.addDynamicSetting(ConcurrentMergeSchedulerProvider.MAX_THREAD_COUNT); indexDynamicSettings.addDynamicSetting(ConcurrentMergeSchedulerProvider.MAX_THREAD_COUNT);
indexDynamicSettings.addDynamicSetting(ConcurrentMergeSchedulerProvider.MAX_MERGE_COUNT); indexDynamicSettings.addDynamicSetting(ConcurrentMergeSchedulerProvider.MAX_MERGE_COUNT);
indexDynamicSettings.addDynamicSetting(ConcurrentMergeSchedulerProvider.AUTO_THROTTLE); indexDynamicSettings.addDynamicSetting(ConcurrentMergeSchedulerProvider.AUTO_THROTTLE);

View File

@ -39,6 +39,8 @@ public abstract class DirectoryService extends AbstractIndexShardComponent {
public abstract Directory[] build() throws IOException; public abstract Directory[] build() throws IOException;
public abstract long throttleTimeInNanos();
/** /**
* Creates a new Directory from the given distributor. * Creates a new Directory from the given distributor.
* The default implementation returns a new {@link org.elasticsearch.index.store.DistributorDirectory} * The default implementation returns a new {@link org.elasticsearch.index.store.DistributorDirectory}
@ -56,4 +58,4 @@ public abstract class DirectoryService extends AbstractIndexShardComponent {
} }
return new DistributorDirectory(distributor); return new DistributorDirectory(distributor);
} }
} }

View File

@ -19,6 +19,7 @@
package org.elasticsearch.index.store; package org.elasticsearch.index.store;
import org.apache.lucene.store.StoreRateLimiting;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.settings.IndexSettings; import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardId;
@ -32,6 +33,12 @@ import java.nio.file.Path;
*/ */
public interface IndexStore extends Closeable { public interface IndexStore extends Closeable {
/**
* Returns the rate limiting, either of the index is explicitly configured, or
* the node level one (defaults to the node level one).
*/
StoreRateLimiting rateLimiting();
/** /**
* The shard store class that should be used for each shard. * The shard store class that should be used for each shard.
*/ */

View File

@ -283,7 +283,7 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
public StoreStats stats() throws IOException { public StoreStats stats() throws IOException {
ensureOpen(); ensureOpen();
return new StoreStats(Directories.estimateSize(directory)); return new StoreStats(Directories.estimateSize(directory), directoryService.throttleTimeInNanos());
} }
public void renameFile(String from, String to) throws IOException { public void renameFile(String from, String to) throws IOException {

View File

@ -19,9 +19,6 @@
package org.elasticsearch.index.store; package org.elasticsearch.index.store;
import java.io.IOException;
import org.elasticsearch.Version;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable; import org.elasticsearch.common.io.stream.Streamable;
@ -31,18 +28,23 @@ import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentBuilderString; import org.elasticsearch.common.xcontent.XContentBuilderString;
import java.io.IOException;
/** /**
*/ */
public class StoreStats implements Streamable, ToXContent { public class StoreStats implements Streamable, ToXContent {
private long sizeInBytes; private long sizeInBytes;
private long throttleTimeInNanos;
public StoreStats() { public StoreStats() {
} }
public StoreStats(long sizeInBytes) { public StoreStats(long sizeInBytes, long throttleTimeInNanos) {
this.sizeInBytes = sizeInBytes; this.sizeInBytes = sizeInBytes;
this.throttleTimeInNanos = throttleTimeInNanos;
} }
public void add(StoreStats stats) { public void add(StoreStats stats) {
@ -50,6 +52,7 @@ public class StoreStats implements Streamable, ToXContent {
return; return;
} }
sizeInBytes += stats.sizeInBytes; sizeInBytes += stats.sizeInBytes;
throttleTimeInNanos += stats.throttleTimeInNanos;
} }
@ -69,6 +72,14 @@ public class StoreStats implements Streamable, ToXContent {
return size(); return size();
} }
public TimeValue throttleTime() {
return TimeValue.timeValueNanos(throttleTimeInNanos);
}
public TimeValue getThrottleTime() {
return throttleTime();
}
public static StoreStats readStoreStats(StreamInput in) throws IOException { public static StoreStats readStoreStats(StreamInput in) throws IOException {
StoreStats store = new StoreStats(); StoreStats store = new StoreStats();
store.readFrom(in); store.readFrom(in);
@ -78,17 +89,20 @@ public class StoreStats implements Streamable, ToXContent {
@Override @Override
public void readFrom(StreamInput in) throws IOException { public void readFrom(StreamInput in) throws IOException {
sizeInBytes = in.readVLong(); sizeInBytes = in.readVLong();
throttleTimeInNanos = in.readVLong();
} }
@Override @Override
public void writeTo(StreamOutput out) throws IOException { public void writeTo(StreamOutput out) throws IOException {
out.writeVLong(sizeInBytes); out.writeVLong(sizeInBytes);
out.writeVLong(throttleTimeInNanos);
} }
@Override @Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject(Fields.STORE); builder.startObject(Fields.STORE);
builder.byteSizeField(Fields.SIZE_IN_BYTES, Fields.SIZE, sizeInBytes); builder.byteSizeField(Fields.SIZE_IN_BYTES, Fields.SIZE, sizeInBytes);
builder.timeValueField(Fields.THROTTLE_TIME_IN_MILLIS, Fields.THROTTLE_TIME, throttleTime());
builder.endObject(); builder.endObject();
return builder; return builder;
} }
@ -97,5 +111,8 @@ public class StoreStats implements Streamable, ToXContent {
static final XContentBuilderString STORE = new XContentBuilderString("store"); static final XContentBuilderString STORE = new XContentBuilderString("store");
static final XContentBuilderString SIZE = new XContentBuilderString("size"); static final XContentBuilderString SIZE = new XContentBuilderString("size");
static final XContentBuilderString SIZE_IN_BYTES = new XContentBuilderString("size_in_bytes"); static final XContentBuilderString SIZE_IN_BYTES = new XContentBuilderString("size_in_bytes");
static final XContentBuilderString THROTTLE_TIME = new XContentBuilderString("throttle_time");
static final XContentBuilderString THROTTLE_TIME_IN_MILLIS = new XContentBuilderString("throttle_time_in_millis");
} }
} }

View File

@ -19,18 +19,18 @@
package org.elasticsearch.index.store.distributor; package org.elasticsearch.index.store.distributor;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.store.StoreUtils;
import org.elasticsearch.index.store.DirectoryService;
import org.elasticsearch.index.store.DirectoryUtils;
import java.io.IOException; import java.io.IOException;
import java.nio.file.FileStore; import java.nio.file.FileStore;
import java.nio.file.Files; import java.nio.file.Files;
import java.nio.file.Path; import java.nio.file.Path;
import java.util.Arrays; import java.util.Arrays;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;
import org.elasticsearch.index.store.DirectoryService;
import org.elasticsearch.index.store.DirectoryUtils;
import org.elasticsearch.index.store.StoreUtils;
public abstract class AbstractDistributor implements Distributor { public abstract class AbstractDistributor implements Distributor {
protected final Directory[] delegates; protected final Directory[] delegates;

View File

@ -36,15 +36,27 @@ import org.elasticsearch.index.store.StoreException;
/** /**
*/ */
public abstract class FsDirectoryService extends DirectoryService { public abstract class FsDirectoryService extends DirectoryService implements StoreRateLimiting.Listener, StoreRateLimiting.Provider {
protected final IndexStore indexStore; protected final IndexStore indexStore;
private final CounterMetric rateLimitingTimeInNanos = new CounterMetric();
public FsDirectoryService(ShardId shardId, @IndexSettings Settings indexSettings, IndexStore indexStore) { public FsDirectoryService(ShardId shardId, @IndexSettings Settings indexSettings, IndexStore indexStore) {
super(shardId, indexSettings); super(shardId, indexSettings);
this.indexStore = indexStore; this.indexStore = indexStore;
} }
@Override
public long throttleTimeInNanos() {
return rateLimitingTimeInNanos.count();
}
@Override
public StoreRateLimiting rateLimiting() {
return indexStore.rateLimiting();
}
protected final LockFactory buildLockFactory() throws IOException { protected final LockFactory buildLockFactory() throws IOException {
String fsLock = componentSettings.get("lock", componentSettings.get("fs_lock", "native")); String fsLock = componentSettings.get("lock", componentSettings.get("fs_lock", "native"));
LockFactory lockFactory; LockFactory lockFactory;
@ -65,10 +77,16 @@ public abstract class FsDirectoryService extends DirectoryService {
Directory[] dirs = new Directory[locations.length]; Directory[] dirs = new Directory[locations.length];
for (int i = 0; i < dirs.length; i++) { for (int i = 0; i < dirs.length; i++) {
Files.createDirectories(locations[i]); Files.createDirectories(locations[i]);
dirs[i] = newFSDirectory(locations[i], buildLockFactory()); Directory wrapped = newFSDirectory(locations[i], buildLockFactory());
dirs[i] = new RateLimitedFSDirectory(wrapped, this, this) ;
} }
return dirs; return dirs;
} }
protected abstract Directory newFSDirectory(Path location, LockFactory lockFactory) throws IOException; protected abstract Directory newFSDirectory(Path location, LockFactory lockFactory) throws IOException;
@Override
public void onPause(long nanos) {
rateLimitingTimeInNanos.inc(nanos);
}
} }

View File

@ -19,6 +19,7 @@
package org.elasticsearch.index.store.support; package org.elasticsearch.index.store.support;
import org.apache.lucene.store.StoreRateLimiting;
import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchIllegalStateException; import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.common.io.FileSystemUtils; import org.elasticsearch.common.io.FileSystemUtils;
@ -42,8 +43,37 @@ import java.nio.file.Path;
*/ */
public abstract class AbstractIndexStore extends AbstractIndexComponent implements IndexStore { public abstract class AbstractIndexStore extends AbstractIndexComponent implements IndexStore {
public static final String INDEX_STORE_THROTTLE_TYPE = "index.store.throttle.type";
public static final String INDEX_STORE_THROTTLE_MAX_BYTES_PER_SEC = "index.store.throttle.max_bytes_per_sec";
public static final String INDEX_FOLDER_NAME = "index"; public static final String INDEX_FOLDER_NAME = "index";
public static final String TRANSLOG_FOLDER_NAME = "translog"; public static final String TRANSLOG_FOLDER_NAME = "translog";
class ApplySettings implements IndexSettingsService.Listener {
@Override
public void onRefreshSettings(Settings settings) {
String rateLimitingType = settings.get(INDEX_STORE_THROTTLE_TYPE, AbstractIndexStore.this.rateLimitingType);
if (!rateLimitingType.equals(AbstractIndexStore.this.rateLimitingType)) {
logger.info("updating index.store.throttle.type from [{}] to [{}]", AbstractIndexStore.this.rateLimitingType, rateLimitingType);
if (rateLimitingType.equalsIgnoreCase("node")) {
AbstractIndexStore.this.rateLimitingType = rateLimitingType;
AbstractIndexStore.this.nodeRateLimiting = true;
} else {
StoreRateLimiting.Type.fromString(rateLimitingType);
AbstractIndexStore.this.rateLimitingType = rateLimitingType;
AbstractIndexStore.this.nodeRateLimiting = false;
AbstractIndexStore.this.rateLimiting.setType(rateLimitingType);
}
}
ByteSizeValue rateLimitingThrottle = settings.getAsBytesSize(INDEX_STORE_THROTTLE_MAX_BYTES_PER_SEC, AbstractIndexStore.this.rateLimitingThrottle);
if (!rateLimitingThrottle.equals(AbstractIndexStore.this.rateLimitingThrottle)) {
logger.info("updating index.store.throttle.max_bytes_per_sec from [{}] to [{}], note, type is [{}]", AbstractIndexStore.this.rateLimitingThrottle, rateLimitingThrottle, AbstractIndexStore.this.rateLimitingType);
AbstractIndexStore.this.rateLimitingThrottle = rateLimitingThrottle;
AbstractIndexStore.this.rateLimiting.setMaxRate(rateLimitingThrottle);
}
}
}
private final NodeEnvironment nodeEnv; private final NodeEnvironment nodeEnv;
private final Path[] locations; private final Path[] locations;
@ -52,11 +82,32 @@ public abstract class AbstractIndexStore extends AbstractIndexComponent implemen
protected final IndicesStore indicesStore; protected final IndicesStore indicesStore;
private volatile String rateLimitingType;
private volatile ByteSizeValue rateLimitingThrottle;
private volatile boolean nodeRateLimiting;
private final StoreRateLimiting rateLimiting = new StoreRateLimiting();
private final ApplySettings applySettings = new ApplySettings();
protected AbstractIndexStore(Index index, @IndexSettings Settings indexSettings, IndexService indexService, IndicesStore indicesStore, NodeEnvironment nodeEnv) { protected AbstractIndexStore(Index index, @IndexSettings Settings indexSettings, IndexService indexService, IndicesStore indicesStore, NodeEnvironment nodeEnv) {
super(index, indexSettings); super(index, indexSettings);
this.indexService = indexService; this.indexService = indexService;
this.indicesStore = indicesStore; this.indicesStore = indicesStore;
this.rateLimitingType = indexSettings.get(INDEX_STORE_THROTTLE_TYPE, "none");
if (rateLimitingType.equalsIgnoreCase("node")) {
nodeRateLimiting = true;
} else {
nodeRateLimiting = false;
rateLimiting.setType(rateLimitingType);
}
this.rateLimitingThrottle = indexSettings.getAsBytesSize(INDEX_STORE_THROTTLE_MAX_BYTES_PER_SEC, new ByteSizeValue(0));
rateLimiting.setMaxRate(rateLimitingThrottle);
logger.debug("using index.store.throttle.type [{}], with index.store.throttle.max_bytes_per_sec [{}]", rateLimitingType, rateLimitingThrottle);
indexService.settingsService().addListener(applySettings);
this.nodeEnv = nodeEnv; this.nodeEnv = nodeEnv;
if (nodeEnv.hasNodeFile()) { if (nodeEnv.hasNodeFile()) {
this.locations = nodeEnv.indexPaths(index); this.locations = nodeEnv.indexPaths(index);
@ -67,8 +118,15 @@ public abstract class AbstractIndexStore extends AbstractIndexComponent implemen
@Override @Override
public void close() throws ElasticsearchException { public void close() throws ElasticsearchException {
indexService.settingsService().removeListener(applySettings);
} }
@Override
public StoreRateLimiting rateLimiting() {
return nodeRateLimiting ? indicesStore.rateLimiting() : this.rateLimiting;
}
@Override @Override
public boolean canDeleteUnallocated(ShardId shardId, @IndexSettings Settings indexSettings) { public boolean canDeleteUnallocated(ShardId shardId, @IndexSettings Settings indexSettings) {
if (locations == null) { if (locations == null) {

View File

@ -19,6 +19,7 @@
package org.elasticsearch.indices.store; package org.elasticsearch.indices.store;
import org.apache.lucene.store.StoreRateLimiting;
import org.elasticsearch.Version; import org.elasticsearch.Version;
import org.elasticsearch.cluster.*; import org.elasticsearch.cluster.*;
import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.IndexMetaData;
@ -59,10 +60,34 @@ import java.util.concurrent.atomic.AtomicInteger;
*/ */
public class IndicesStore extends AbstractComponent implements ClusterStateListener, Closeable { public class IndicesStore extends AbstractComponent implements ClusterStateListener, Closeable {
public static final String INDICES_STORE_THROTTLE_TYPE = "indices.store.throttle.type";
public static final String INDICES_STORE_THROTTLE_MAX_BYTES_PER_SEC = "indices.store.throttle.max_bytes_per_sec";
public static final String ACTION_SHARD_EXISTS = "internal:index/shard/exists"; public static final String ACTION_SHARD_EXISTS = "internal:index/shard/exists";
private static final EnumSet<IndexShardState> ACTIVE_STATES = EnumSet.of(IndexShardState.STARTED, IndexShardState.RELOCATED); private static final EnumSet<IndexShardState> ACTIVE_STATES = EnumSet.of(IndexShardState.STARTED, IndexShardState.RELOCATED);
class ApplySettings implements NodeSettingsService.Listener {
@Override
public void onRefreshSettings(Settings settings) {
String rateLimitingType = settings.get(INDICES_STORE_THROTTLE_TYPE, IndicesStore.this.rateLimitingType);
// try and parse the type
StoreRateLimiting.Type.fromString(rateLimitingType);
if (!rateLimitingType.equals(IndicesStore.this.rateLimitingType)) {
logger.info("updating indices.store.throttle.type from [{}] to [{}]", IndicesStore.this.rateLimitingType, rateLimitingType);
IndicesStore.this.rateLimitingType = rateLimitingType;
IndicesStore.this.rateLimiting.setType(rateLimitingType);
}
ByteSizeValue rateLimitingThrottle = settings.getAsBytesSize(INDICES_STORE_THROTTLE_MAX_BYTES_PER_SEC, IndicesStore.this.rateLimitingThrottle);
if (!rateLimitingThrottle.equals(IndicesStore.this.rateLimitingThrottle)) {
logger.info("updating indices.store.throttle.max_bytes_per_sec from [{}] to [{}], note, type is [{}]", IndicesStore.this.rateLimitingThrottle, rateLimitingThrottle, IndicesStore.this.rateLimitingType);
IndicesStore.this.rateLimitingThrottle = rateLimitingThrottle;
IndicesStore.this.rateLimiting.setMaxRate(rateLimitingThrottle);
}
}
}
private final NodeEnvironment nodeEnv; private final NodeEnvironment nodeEnv;
private final NodeSettingsService nodeSettingsService; private final NodeSettingsService nodeSettingsService;
@ -72,6 +97,12 @@ public class IndicesStore extends AbstractComponent implements ClusterStateListe
private final ClusterService clusterService; private final ClusterService clusterService;
private final TransportService transportService; private final TransportService transportService;
private volatile String rateLimitingType;
private volatile ByteSizeValue rateLimitingThrottle;
private final StoreRateLimiting rateLimiting = new StoreRateLimiting();
private final ApplySettings applySettings = new ApplySettings();
@Inject @Inject
public IndicesStore(Settings settings, NodeEnvironment nodeEnv, NodeSettingsService nodeSettingsService, IndicesService indicesService, public IndicesStore(Settings settings, NodeEnvironment nodeEnv, NodeSettingsService nodeSettingsService, IndicesService indicesService,
ClusterService clusterService, TransportService transportService) { ClusterService clusterService, TransportService transportService) {
@ -83,6 +114,15 @@ public class IndicesStore extends AbstractComponent implements ClusterStateListe
this.transportService = transportService; this.transportService = transportService;
transportService.registerHandler(ACTION_SHARD_EXISTS, new ShardActiveRequestHandler()); transportService.registerHandler(ACTION_SHARD_EXISTS, new ShardActiveRequestHandler());
// we don't limit by default (we default to CMS's auto throttle instead):
this.rateLimitingType = componentSettings.get("throttle.type", StoreRateLimiting.Type.NONE.name());
rateLimiting.setType(rateLimitingType);
this.rateLimitingThrottle = componentSettings.getAsBytesSize("throttle.max_bytes_per_sec", new ByteSizeValue(10240, ByteSizeUnit.MB));
rateLimiting.setMaxRate(rateLimitingThrottle);
logger.debug("using indices.store.throttle.type [{}], with index.store.throttle.max_bytes_per_sec [{}]", rateLimitingType, rateLimitingThrottle);
nodeSettingsService.addListener(applySettings);
clusterService.addLast(this); clusterService.addLast(this);
} }
@ -95,8 +135,13 @@ public class IndicesStore extends AbstractComponent implements ClusterStateListe
this.transportService = null; this.transportService = null;
} }
public StoreRateLimiting rateLimiting() {
return this.rateLimiting;
}
@Override @Override
public void close() { public void close() {
nodeSettingsService.removeListener(applySettings);
clusterService.remove(this); clusterService.remove(this);
} }

View File

@ -179,6 +179,11 @@ public class InternalEngineTests extends ElasticsearchLuceneTestCase {
public Directory[] build() throws IOException { public Directory[] build() throws IOException {
return new Directory[]{ directory }; return new Directory[]{ directory };
} }
@Override
public long throttleTimeInNanos() {
return 0;
}
}; };
return new Store(shardId, EMPTY_SETTINGS, directoryService, new LeastUsedDistributor(directoryService), new DummyShardLock(shardId)); return new Store(shardId, EMPTY_SETTINGS, directoryService, new LeastUsedDistributor(directoryService), new DummyShardLock(shardId));
} }

View File

@ -178,6 +178,11 @@ public class MergePolicySettingsTest extends ElasticsearchTestCase {
public Directory[] build() throws IOException { public Directory[] build() throws IOException {
return new Directory[] { new RAMDirectory() } ; return new Directory[] { new RAMDirectory() } ;
} }
@Override
public long throttleTimeInNanos() {
return 0;
}
}; };
return new Store(shardId, settings, directoryService, new LeastUsedDistributor(directoryService), new DummyShardLock(shardId)); return new Store(shardId, settings, directoryService, new LeastUsedDistributor(directoryService), new DummyShardLock(shardId));
} }

View File

@ -710,6 +710,11 @@ public class StoreTest extends ElasticsearchLuceneTestCase {
public Directory[] build() throws IOException { public Directory[] build() throws IOException {
return dirs; return dirs;
} }
@Override
public long throttleTimeInNanos() {
return random.nextInt(1000);
}
} }
public static void assertConsistent(Store store, Store.MetadataSnapshot metadata) throws IOException { public static void assertConsistent(Store store, Store.MetadataSnapshot metadata) throws IOException {

View File

@ -150,6 +150,11 @@ public class DistributorTests extends ElasticsearchTestCase {
public Directory[] build() throws IOException { public Directory[] build() throws IOException {
return directories; return directories;
} }
@Override
public long throttleTimeInNanos() {
return 0;
}
} }
public static class FakeFsDirectory extends FSDirectory { public static class FakeFsDirectory extends FSDirectory {

View File

@ -125,6 +125,140 @@ public class UpdateSettingsTests extends ElasticsearchIntegrationTest {
} }
// #6626: make sure we can update throttle settings and the changes take effect
@Test
@Slow
public void testUpdateThrottleSettings() {
// No throttling at first, only 1 non-replicated shard, force lots of merging:
assertAcked(prepareCreate("test")
.setSettings(ImmutableSettings.builder()
.put(AbstractIndexStore.INDEX_STORE_THROTTLE_TYPE, "none")
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, "1")
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, "0")
.put(TieredMergePolicyProvider.INDEX_MERGE_POLICY_MAX_MERGE_AT_ONCE, "2")
.put(TieredMergePolicyProvider.INDEX_MERGE_POLICY_SEGMENTS_PER_TIER, "2")
.put(ConcurrentMergeSchedulerProvider.MAX_THREAD_COUNT, "1")
.put(ConcurrentMergeSchedulerProvider.MAX_MERGE_COUNT, "2")
));
ensureGreen();
long termUpto = 0;
for(int i=0;i<100;i++) {
// Provoke slowish merging by making many unique terms:
StringBuilder sb = new StringBuilder();
for(int j=0;j<100;j++) {
sb.append(' ');
sb.append(termUpto++);
}
client().prepareIndex("test", "type", ""+termUpto).setSource("field" + (i%10), sb.toString()).get();
if (i % 2 == 0) {
refresh();
}
}
// No merge IO throttling should have happened:
NodesStatsResponse nodesStats = client().admin().cluster().prepareNodesStats().setIndices(true).get();
for(NodeStats stats : nodesStats.getNodes()) {
assertThat(stats.getIndices().getStore().getThrottleTime().getMillis(), equalTo(0l));
}
logger.info("test: set low merge throttling");
// Now updates settings to turn on merge throttling lowish rate
client()
.admin()
.indices()
.prepareUpdateSettings("test")
.setSettings(ImmutableSettings.builder()
.put(AbstractIndexStore.INDEX_STORE_THROTTLE_TYPE, "merge")
.put(AbstractIndexStore.INDEX_STORE_THROTTLE_MAX_BYTES_PER_SEC, "1mb"))
.get();
// Make sure setting says it is in fact changed:
GetSettingsResponse getSettingsResponse = client().admin().indices().prepareGetSettings("test").get();
assertThat(getSettingsResponse.getSetting("test", AbstractIndexStore.INDEX_STORE_THROTTLE_TYPE), equalTo("merge"));
// Also make sure we see throttling kicking in:
boolean done = false;
while (done == false) {
// Provoke slowish merging by making many unique terms:
for(int i=0;i<5;i++) {
StringBuilder sb = new StringBuilder();
for(int j=0;j<100;j++) {
sb.append(' ');
sb.append(termUpto++);
sb.append(" some random text that keeps repeating over and over again hambone");
}
client().prepareIndex("test", "type", ""+termUpto).setSource("field" + (i%10), sb.toString()).get();
}
refresh();
nodesStats = client().admin().cluster().prepareNodesStats().setIndices(true).get();
for(NodeStats stats : nodesStats.getNodes()) {
long throttleMillis = stats.getIndices().getStore().getThrottleTime().getMillis();
if (throttleMillis > 0) {
done = true;
break;
}
}
}
logger.info("test: disable merge throttling");
// Now updates settings to disable merge throttling
client()
.admin()
.indices()
.prepareUpdateSettings("test")
.setSettings(ImmutableSettings.builder()
.put(AbstractIndexStore.INDEX_STORE_THROTTLE_TYPE, "none"))
.get();
// Optimize does a waitForMerges, which we must do to make sure all in-flight (throttled) merges finish:
logger.info("test: optimize");
client().admin().indices().prepareOptimize("test").setWaitForMerge(true).get();
logger.info("test: optimize done");
// Record current throttling so far
long sumThrottleTime = 0;
nodesStats = client().admin().cluster().prepareNodesStats().setIndices(true).get();
for(NodeStats stats : nodesStats.getNodes()) {
sumThrottleTime += stats.getIndices().getStore().getThrottleTime().getMillis();
}
// Make sure no further throttling happens:
for(int i=0;i<100;i++) {
// Provoke slowish merging by making many unique terms:
StringBuilder sb = new StringBuilder();
for(int j=0;j<100;j++) {
sb.append(' ');
sb.append(termUpto++);
}
client().prepareIndex("test", "type", ""+termUpto).setSource("field" + (i%10), sb.toString()).get();
if (i % 2 == 0) {
refresh();
}
}
logger.info("test: done indexing after disabling throttling");
long newSumThrottleTime = 0;
nodesStats = client().admin().cluster().prepareNodesStats().setIndices(true).get();
for(NodeStats stats : nodesStats.getNodes()) {
newSumThrottleTime += stats.getIndices().getStore().getThrottleTime().getMillis();
}
// No additional merge IO throttling should have happened:
assertEquals(sumThrottleTime, newSumThrottleTime);
// Optimize & flush and wait; else we sometimes get a "Delete Index failed - not acked"
// when ElasticsearchIntegrationTest.after tries to remove indices created by the test:
// Wait for merges to finish
client().admin().indices().prepareOptimize("test").setWaitForMerge(true).get();
flush();
logger.info("test: test done");
}
private static class MockAppender extends AppenderSkeleton { private static class MockAppender extends AppenderSkeleton {
public boolean sawIndexWriterMessage; public boolean sawIndexWriterMessage;
public boolean sawFlushDeletes; public boolean sawFlushDeletes;

View File

@ -301,6 +301,90 @@ public class IndexStatsTests extends ElasticsearchIntegrationTest {
assertThat(client().admin().indices().prepareStats("idx").setQueryCache(true).get().getTotal().getQueryCache().getMemorySizeInBytes(), greaterThan(0l)); assertThat(client().admin().indices().prepareStats("idx").setQueryCache(true).get().getTotal().getQueryCache().getMemorySizeInBytes(), greaterThan(0l));
} }
@Test
public void nonThrottleStats() throws Exception {
assertAcked(prepareCreate("test")
.setSettings(ImmutableSettings.builder()
.put(AbstractIndexStore.INDEX_STORE_THROTTLE_TYPE, "merge")
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, "1")
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, "0")
.put(TieredMergePolicyProvider.INDEX_MERGE_POLICY_MAX_MERGE_AT_ONCE, "2")
.put(TieredMergePolicyProvider.INDEX_MERGE_POLICY_SEGMENTS_PER_TIER, "2")
.put(ConcurrentMergeSchedulerProvider.MAX_THREAD_COUNT, "1")
.put(ConcurrentMergeSchedulerProvider.MAX_MERGE_COUNT, "10000")
));
ensureGreen();
long termUpto = 0;
IndicesStatsResponse stats;
// Provoke slowish merging by making many unique terms:
for(int i=0; i<100; i++) {
StringBuilder sb = new StringBuilder();
for(int j=0; j<100; j++) {
sb.append(' ');
sb.append(termUpto++);
sb.append(" some random text that keeps repeating over and over again hambone");
}
client().prepareIndex("test", "type", ""+termUpto).setSource("field" + (i%10), sb.toString()).get();
}
refresh();
stats = client().admin().indices().prepareStats().execute().actionGet();
//nodesStats = client().admin().cluster().prepareNodesStats().setIndices(true).get();
stats = client().admin().indices().prepareStats().execute().actionGet();
assertThat(stats.getPrimaries().getIndexing().getTotal().getThrottleTimeInMillis(), equalTo(0l));
}
@Test
public void throttleStats() throws Exception {
assertAcked(prepareCreate("test")
.setSettings(ImmutableSettings.builder()
.put(AbstractIndexStore.INDEX_STORE_THROTTLE_TYPE, "merge")
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, "1")
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, "0")
.put(TieredMergePolicyProvider.INDEX_MERGE_POLICY_MAX_MERGE_AT_ONCE, "2")
.put(TieredMergePolicyProvider.INDEX_MERGE_POLICY_SEGMENTS_PER_TIER, "2")
.put(ConcurrentMergeSchedulerProvider.MAX_THREAD_COUNT, "1")
.put(ConcurrentMergeSchedulerProvider.MAX_MERGE_COUNT, "1")
.put("index.merge.policy.type", "tiered")
));
ensureGreen();
long termUpto = 0;
IndicesStatsResponse stats;
// make sure we see throttling kicking in:
boolean done = false;
long start = System.currentTimeMillis();
while (!done) {
for(int i=0; i<100; i++) {
// Provoke slowish merging by making many unique terms:
StringBuilder sb = new StringBuilder();
for(int j=0; j<100; j++) {
sb.append(' ');
sb.append(termUpto++);
}
client().prepareIndex("test", "type", ""+termUpto).setSource("field" + (i%10), sb.toString()).get();
if (i % 2 == 0) {
refresh();
}
}
refresh();
stats = client().admin().indices().prepareStats().execute().actionGet();
//nodesStats = client().admin().cluster().prepareNodesStats().setIndices(true).get();
done = stats.getPrimaries().getIndexing().getTotal().getThrottleTimeInMillis() > 0;
if (System.currentTimeMillis() - start > 300*1000) { //Wait 5 minutes for throttling to kick in
fail("index throttling didn't kick in after 5 minutes of intense merging");
}
}
// Optimize & flush and wait; else we sometimes get a "Delete Index failed - not acked"
// when ElasticsearchIntegrationTest.after tries to remove indices created by the test:
logger.info("test: now optimize");
client().admin().indices().prepareOptimize("test").setWaitForMerge(true).get();
flush();
logger.info("test: test done");
}
@Test @Test
public void simpleStats() throws Exception { public void simpleStats() throws Exception {
createIndex("test1", "test2"); createIndex("test1", "test2");
@ -440,9 +524,6 @@ public class IndexStatsTests extends ElasticsearchIntegrationTest {
assertThat(stats.getTotal().getMerge(), notNullValue()); assertThat(stats.getTotal().getMerge(), notNullValue());
assertThat(stats.getTotal().getMerge().getTotal(), greaterThan(0l)); assertThat(stats.getTotal().getMerge().getTotal(), greaterThan(0l));
assertThat(stats.getTotal().getMerge().getTotalStoppedTime(), notNullValue());
assertThat(stats.getTotal().getMerge().getTotalThrottledTime(), notNullValue());
assertThat(stats.getTotal().getMerge().getTotalBytesPerSecAutoThrottle(), greaterThan(0l));
} }
@Test @Test

View File

@ -54,48 +54,63 @@ public class SimpleDistributorTests extends ElasticsearchIntegrationTest {
String storeString = getStoreDirectory("test", 0).toString(); String storeString = getStoreDirectory("test", 0).toString();
logger.info(storeString); logger.info(storeString);
Path[] dataPaths = dataPaths(); Path[] dataPaths = dataPaths();
assertThat(storeString.toLowerCase(Locale.ROOT), startsWith("store(least_used[niofs(" + dataPaths[0].toAbsolutePath().toString().toLowerCase(Locale.ROOT))); assertThat(storeString.toLowerCase(Locale.ROOT), startsWith("store(least_used[rate_limited(niofs(" + dataPaths[0].toAbsolutePath().toString().toLowerCase(Locale.ROOT)));
if (dataPaths.length > 1) { if (dataPaths.length > 1) {
assertThat(storeString.toLowerCase(Locale.ROOT), containsString("), niofs(" + dataPaths[1].toAbsolutePath().toString().toLowerCase(Locale.ROOT))); assertThat(storeString.toLowerCase(Locale.ROOT), containsString("), rate_limited(niofs(" + dataPaths[1].toAbsolutePath().toString().toLowerCase(Locale.ROOT)));
} }
assertThat(storeString, endsWith(", type=MERGE, rate=20.0)])"));
createIndexWithStoreType("test", IndexStoreModule.Type.NIOFS, "random"); createIndexWithStoreType("test", IndexStoreModule.Type.NIOFS, "random");
storeString = getStoreDirectory("test", 0).toString(); storeString = getStoreDirectory("test", 0).toString();
logger.info(storeString); logger.info(storeString);
dataPaths = dataPaths(); dataPaths = dataPaths();
assertThat(storeString.toLowerCase(Locale.ROOT), startsWith("store(random[niofs(" + dataPaths[0].toAbsolutePath().toString().toLowerCase(Locale.ROOT))); assertThat(storeString.toLowerCase(Locale.ROOT), startsWith("store(random[rate_limited(niofs(" + dataPaths[0].toAbsolutePath().toString().toLowerCase(Locale.ROOT)));
if (dataPaths.length > 1) { if (dataPaths.length > 1) {
assertThat(storeString.toLowerCase(Locale.ROOT), containsString("), niofs(" + dataPaths[1].toAbsolutePath().toString().toLowerCase(Locale.ROOT))); assertThat(storeString.toLowerCase(Locale.ROOT), containsString("), rate_limited(niofs(" + dataPaths[1].toAbsolutePath().toString().toLowerCase(Locale.ROOT)));
} }
assertThat(storeString, endsWith(", type=MERGE, rate=20.0)])"));
createIndexWithStoreType("test", IndexStoreModule.Type.MMAPFS, "least_used"); createIndexWithStoreType("test", IndexStoreModule.Type.MMAPFS, "least_used");
storeString = getStoreDirectory("test", 0).toString(); storeString = getStoreDirectory("test", 0).toString();
logger.info(storeString); logger.info(storeString);
dataPaths = dataPaths(); dataPaths = dataPaths();
assertThat(storeString.toLowerCase(Locale.ROOT), startsWith("store(least_used[mmapfs(" + dataPaths[0].toAbsolutePath().toString().toLowerCase(Locale.ROOT))); assertThat(storeString.toLowerCase(Locale.ROOT), startsWith("store(least_used[rate_limited(mmapfs(" + dataPaths[0].toAbsolutePath().toString().toLowerCase(Locale.ROOT)));
if (dataPaths.length > 1) { if (dataPaths.length > 1) {
assertThat(storeString.toLowerCase(Locale.ROOT), containsString("), mmapfs(" + dataPaths[1].toAbsolutePath().toString().toLowerCase(Locale.ROOT))); assertThat(storeString.toLowerCase(Locale.ROOT), containsString("), rate_limited(mmapfs(" + dataPaths[1].toAbsolutePath().toString().toLowerCase(Locale.ROOT)));
} }
assertThat(storeString, endsWith(", type=MERGE, rate=20.0)])"));
createIndexWithStoreType("test", IndexStoreModule.Type.SIMPLEFS, "least_used"); createIndexWithStoreType("test", IndexStoreModule.Type.SIMPLEFS, "least_used");
storeString = getStoreDirectory("test", 0).toString(); storeString = getStoreDirectory("test", 0).toString();
logger.info(storeString); logger.info(storeString);
dataPaths = dataPaths(); dataPaths = dataPaths();
assertThat(storeString.toLowerCase(Locale.ROOT), startsWith("store(least_used[simplefs(" + dataPaths[0].toAbsolutePath().toString().toLowerCase(Locale.ROOT))); assertThat(storeString.toLowerCase(Locale.ROOT), startsWith("store(least_used[rate_limited(simplefs(" + dataPaths[0].toAbsolutePath().toString().toLowerCase(Locale.ROOT)));
if (dataPaths.length > 1) { if (dataPaths.length > 1) {
assertThat(storeString.toLowerCase(Locale.ROOT), containsString("), simplefs(" + dataPaths[1].toAbsolutePath().toString().toLowerCase(Locale.ROOT))); assertThat(storeString.toLowerCase(Locale.ROOT), containsString("), rate_limited(simplefs(" + dataPaths[1].toAbsolutePath().toString().toLowerCase(Locale.ROOT)));
} }
assertThat(storeString, endsWith(", type=MERGE, rate=20.0)])"));
createIndexWithStoreType("test", IndexStoreModule.Type.DEFAULT, "least_used"); createIndexWithStoreType("test", IndexStoreModule.Type.DEFAULT, "least_used");
storeString = getStoreDirectory("test", 0).toString(); storeString = getStoreDirectory("test", 0).toString();
logger.info(storeString); logger.info(storeString);
dataPaths = dataPaths(); dataPaths = dataPaths();
assertThat(storeString.toLowerCase(Locale.ROOT), startsWith("store(least_used[default(mmapfs(" + dataPaths[0].toAbsolutePath().toString().toLowerCase(Locale.ROOT))); assertThat(storeString.toLowerCase(Locale.ROOT), startsWith("store(least_used[rate_limited(default(mmapfs(" + dataPaths[0].toAbsolutePath().toString().toLowerCase(Locale.ROOT)));
assertThat(storeString.toLowerCase(Locale.ROOT), containsString("),niofs(" + dataPaths[0].toAbsolutePath().toString().toLowerCase(Locale.ROOT))); assertThat(storeString.toLowerCase(Locale.ROOT), containsString("),niofs(" + dataPaths[0].toAbsolutePath().toString().toLowerCase(Locale.ROOT)));
if (dataPaths.length > 1) { if (dataPaths.length > 1) {
assertThat(storeString.toLowerCase(Locale.ROOT), containsString("), default(mmapfs(" + dataPaths[1].toAbsolutePath().toString().toLowerCase(Locale.ROOT))); assertThat(storeString.toLowerCase(Locale.ROOT), containsString("), rate_limited(default(mmapfs(" + dataPaths[1].toAbsolutePath().toString().toLowerCase(Locale.ROOT)));
} }
assertThat(storeString, endsWith(", type=MERGE, rate=20.0)])"));
createIndexWithoutRateLimitingStoreType("test", IndexStoreModule.Type.NIOFS, "least_used");
storeString = getStoreDirectory("test", 0).toString();
logger.info(storeString);
dataPaths = dataPaths();
assertThat(storeString.toLowerCase(Locale.ROOT), startsWith("store(least_used[niofs(" + dataPaths[0].toAbsolutePath().toString().toLowerCase(Locale.ROOT)));
if (dataPaths.length > 1) {
assertThat(storeString.toLowerCase(Locale.ROOT), containsString("), niofs(" + dataPaths[1].toAbsolutePath().toString().toLowerCase(Locale.ROOT)));
}
assertThat(storeString, endsWith(")])"));
} }
private void createIndexWithStoreType(String index, IndexStoreModule.Type storeType, String distributor) { private void createIndexWithStoreType(String index, IndexStoreModule.Type storeType, String distributor) {
@ -106,11 +121,28 @@ public class SimpleDistributorTests extends ElasticsearchIntegrationTest {
.put("index.store.type", storeType.name()) .put("index.store.type", storeType.name())
.put("index.number_of_replicas", 0) .put("index.number_of_replicas", 0)
.put("index.number_of_shards", 1) .put("index.number_of_shards", 1)
.put("index.store.throttle.type", "merge")
.put("index.store.throttle.max_bytes_per_sec", "20mb")
) )
.execute().actionGet(); .execute().actionGet();
assertThat(client().admin().cluster().prepareHealth("test").setWaitForGreenStatus().execute().actionGet().isTimedOut(), equalTo(false)); assertThat(client().admin().cluster().prepareHealth("test").setWaitForGreenStatus().execute().actionGet().isTimedOut(), equalTo(false));
} }
private void createIndexWithoutRateLimitingStoreType(String index, IndexStoreModule.Type storeType, String distributor) {
cluster().wipeIndices(index);
client().admin().indices().prepareCreate(index)
.setSettings(settingsBuilder()
.put("index.store.distributor", distributor)
.put("index.store.type", storeType)
.put("index.store.throttle.type", "none")
.put("index.number_of_replicas", 0)
.put("index.number_of_shards", 1)
)
.execute().actionGet();
assertThat(client().admin().cluster().prepareHealth("test").setWaitForGreenStatus().execute().actionGet().isTimedOut(), equalTo(false));
}
private Path[] dataPaths() { private Path[] dataPaths() {
Set<String> nodes = internalCluster().nodesInclude("test"); Set<String> nodes = internalCluster().nodesInclude("test");
assertThat(nodes.isEmpty(), equalTo(false)); assertThat(nodes.isEmpty(), equalTo(false));

View File

@ -633,6 +633,11 @@ public class DedicatedClusterSnapshotRestoreTests extends AbstractSnapshotTests
asyncIndexThreads[i].join(); asyncIndexThreads[i].join();
} }
logger.info("--> update index settings to back to normal");
assertAcked(client().admin().indices().prepareUpdateSettings("test-*").setSettings(ImmutableSettings.builder()
.put(AbstractIndexStore.INDEX_STORE_THROTTLE_TYPE, "node")
));
// Make sure that snapshot finished - doesn't matter if it failed or succeeded // Make sure that snapshot finished - doesn't matter if it failed or succeeded
try { try {
CreateSnapshotResponse snapshotResponse = snapshotResponseFuture.get(); CreateSnapshotResponse snapshotResponse = snapshotResponseFuture.get();
@ -674,6 +679,11 @@ public class DedicatedClusterSnapshotRestoreTests extends AbstractSnapshotTests
for (int i = 0; i < between(10, 500); i++) { for (int i = 0; i < between(10, 500); i++) {
index(name, "doc", Integer.toString(i), "foo", "bar" + i); index(name, "doc", Integer.toString(i), "foo", "bar" + i);
} }
assertAcked(client().admin().indices().prepareUpdateSettings(name).setSettings(ImmutableSettings.builder()
.put(AbstractIndexStore.INDEX_STORE_THROTTLE_TYPE, "all")
.put(AbstractIndexStore.INDEX_STORE_THROTTLE_MAX_BYTES_PER_SEC, between(100, 50000))
));
} }
public static abstract class TestCustomMetaData implements MetaData.Custom { public static abstract class TestCustomMetaData implements MetaData.Custom {

View File

@ -1361,6 +1361,12 @@ public class SharedClusterSnapshotRestoreTests extends AbstractSnapshotTests {
refresh(); refresh();
assertThat(client.prepareCount("test-idx").get().getCount(), equalTo(100L)); assertThat(client.prepareCount("test-idx").get().getCount(), equalTo(100L));
// Update settings to make sure that relocation is slow so we can start snapshot before relocation is finished
assertAcked(client.admin().indices().prepareUpdateSettings("test-idx").setSettings(ImmutableSettings.builder()
.put(AbstractIndexStore.INDEX_STORE_THROTTLE_TYPE, "all")
.put(AbstractIndexStore.INDEX_STORE_THROTTLE_MAX_BYTES_PER_SEC, 100)
));
logger.info("--> start relocations"); logger.info("--> start relocations");
allowNodes("test-idx", internalCluster().numDataNodes()); allowNodes("test-idx", internalCluster().numDataNodes());
@ -1371,6 +1377,11 @@ public class SharedClusterSnapshotRestoreTests extends AbstractSnapshotTests {
logger.info("--> snapshot"); logger.info("--> snapshot");
client.admin().cluster().prepareCreateSnapshot("test-repo", "test-snap").setWaitForCompletion(false).setIndices("test-idx").get(); client.admin().cluster().prepareCreateSnapshot("test-repo", "test-snap").setWaitForCompletion(false).setIndices("test-idx").get();
// Update settings to back to normal
assertAcked(client.admin().indices().prepareUpdateSettings("test-idx").setSettings(ImmutableSettings.builder()
.put(AbstractIndexStore.INDEX_STORE_THROTTLE_TYPE, "node")
));
logger.info("--> wait for snapshot to complete"); logger.info("--> wait for snapshot to complete");
SnapshotInfo snapshotInfo = waitForCompletion("test-repo", "test-snap", TimeValue.timeValueSeconds(600)); SnapshotInfo snapshotInfo = waitForCompletion("test-repo", "test-snap", TimeValue.timeValueSeconds(600));
assertThat(snapshotInfo.state(), equalTo(SnapshotState.SUCCESS)); assertThat(snapshotInfo.state(), equalTo(SnapshotState.SUCCESS));

View File

@ -27,6 +27,7 @@ import com.google.common.base.Joiner;
import com.google.common.base.Predicate; import com.google.common.base.Predicate;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import org.apache.lucene.store.StoreRateLimiting;
import org.apache.lucene.util.AbstractRandomizedTest; import org.apache.lucene.util.AbstractRandomizedTest;
import org.apache.lucene.util.IOUtils; import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.TestUtil; import org.apache.lucene.util.TestUtil;
@ -429,11 +430,23 @@ public abstract class ElasticsearchIntegrationTest extends ElasticsearchTestCase
setRandomTranslogSettings(random, builder); setRandomTranslogSettings(random, builder);
setRandomNormsLoading(random, builder); setRandomNormsLoading(random, builder);
setRandomScriptingSettings(random, builder); setRandomScriptingSettings(random, builder);
if (random.nextBoolean()) {
if (random.nextInt(10) == 0) { // do something crazy slow here
builder.put(IndicesStore.INDICES_STORE_THROTTLE_MAX_BYTES_PER_SEC, new ByteSizeValue(RandomInts.randomIntBetween(random, 1, 10), ByteSizeUnit.MB));
} else {
builder.put(IndicesStore.INDICES_STORE_THROTTLE_MAX_BYTES_PER_SEC, new ByteSizeValue(RandomInts.randomIntBetween(random, 10, 200), ByteSizeUnit.MB));
}
}
if (random.nextBoolean()) {
builder.put(IndicesStore.INDICES_STORE_THROTTLE_TYPE, RandomPicks.randomFrom(random, StoreRateLimiting.Type.values()));
}
if (random.nextBoolean()) { if (random.nextBoolean()) {
builder.put(StoreModule.DISTIBUTOR_KEY, random.nextBoolean() ? StoreModule.LEAST_USED_DISTRIBUTOR : StoreModule.RANDOM_WEIGHT_DISTRIBUTOR); builder.put(StoreModule.DISTIBUTOR_KEY, random.nextBoolean() ? StoreModule.LEAST_USED_DISTRIBUTOR : StoreModule.RANDOM_WEIGHT_DISTRIBUTOR);
} }
if (random.nextBoolean()) {
builder.put(ConcurrentMergeSchedulerProvider.AUTO_THROTTLE, false);
}
if (random.nextBoolean()) { if (random.nextBoolean()) {
if (random.nextInt(10) == 0) { // do something crazy slow here if (random.nextInt(10) == 0) { // do something crazy slow here

View File

@ -24,6 +24,7 @@ import org.apache.lucene.index.CheckIndex;
import org.apache.lucene.index.IndexWriter; import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.store.Directory; import org.apache.lucene.store.Directory;
import org.apache.lucene.store.LockFactory; import org.apache.lucene.store.LockFactory;
import org.apache.lucene.store.StoreRateLimiting;
import org.apache.lucene.util.AbstractRandomizedTest; import org.apache.lucene.util.AbstractRandomizedTest;
import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
@ -150,6 +151,21 @@ public class MockFSDirectoryService extends FsDirectoryService {
} }
} }
@Override
public void onPause(long nanos) {
delegateService.onPause(nanos);
}
@Override
public StoreRateLimiting rateLimiting() {
return delegateService.rateLimiting();
}
@Override
public long throttleTimeInNanos() {
return delegateService.throttleTimeInNanos();
}
@Override @Override
public Directory newFromDistributor(Distributor distributor) throws IOException { public Directory newFromDistributor(Distributor distributor) throws IOException {
return helper.wrap(super.newFromDistributor(distributor)); return helper.wrap(super.newFromDistributor(distributor));