From 107099affa0f6e8bbacad140bb71bd1be62e1532 Mon Sep 17 00:00:00 2001 From: Michael McCandless Date: Wed, 14 Jan 2015 05:35:09 -0500 Subject: [PATCH] put back fixed throttling, but off by default --- docs/reference/cluster/stats.asciidoc | 2 + .../cluster/update-settings.asciidoc | 9 ++ docs/reference/index-modules/merge.asciidoc | 6 + docs/reference/index-modules/store.asciidoc | 32 +++++ docs/reference/modules/indices.asciidoc | 14 ++ .../org/apache/lucene/store/StoreUtils.java | 11 +- .../ClusterDynamicSettingsModule.java | 2 + .../elasticsearch/index/merge/MergeStats.java | 20 ++- .../ConcurrentMergeSchedulerProvider.java | 3 + .../scheduler/MergeSchedulerProvider.java | 3 + .../settings/IndexDynamicSettingsModule.java | 2 + .../index/store/DirectoryService.java | 4 +- .../elasticsearch/index/store/IndexStore.java | 7 + .../org/elasticsearch/index/store/Store.java | 2 +- .../elasticsearch/index/store/StoreStats.java | 25 +++- .../distributor/AbstractDistributor.java | 12 +- .../index/store/fs/FsDirectoryService.java | 22 ++- .../store/support/AbstractIndexStore.java | 58 ++++++++ .../indices/store/IndicesStore.java | 45 ++++++ .../engine/internal/InternalEngineTests.java | 5 + .../merge/policy/MergePolicySettingsTest.java | 5 + .../elasticsearch/index/store/StoreTest.java | 5 + .../store/distributor/DistributorTests.java | 5 + .../indices/settings/UpdateSettingsTests.java | 134 ++++++++++++++++++ .../indices/stats/IndexStatsTests.java | 87 +++++++++++- .../indices/store/SimpleDistributorTests.java | 52 +++++-- .../DedicatedClusterSnapshotRestoreTests.java | 10 ++ .../SharedClusterSnapshotRestoreTests.java | 11 ++ .../test/ElasticsearchIntegrationTest.java | 15 +- .../test/store/MockFSDirectoryService.java | 16 +++ 30 files changed, 584 insertions(+), 40 deletions(-) diff --git a/docs/reference/cluster/stats.asciidoc b/docs/reference/cluster/stats.asciidoc index 3ce6b92ed03..512c5a4da37 100644 --- a/docs/reference/cluster/stats.asciidoc +++ b/docs/reference/cluster/stats.asciidoc @@ -48,6 +48,8 @@ Will return, for example: "store": { "size": "5.6kb", "size_in_bytes": 5770, + "throttle_time": "0s", + "throttle_time_in_millis": 0 }, "fielddata": { "memory_size": "0b", diff --git a/docs/reference/cluster/update-settings.asciidoc b/docs/reference/cluster/update-settings.asciidoc index aded0fda24b..9962cf2bb23 100644 --- a/docs/reference/cluster/update-settings.asciidoc +++ b/docs/reference/cluster/update-settings.asciidoc @@ -183,6 +183,15 @@ due to forced awareness or allocation filtering. `indices.recovery.max_bytes_per_sec`:: See <> +[float] +==== Store level throttling + +`indices.store.throttle.type`:: + See <> + +`indices.store.throttle.max_bytes_per_sec`:: + See <> + [float] [[logger]] === Logger diff --git a/docs/reference/index-modules/merge.asciidoc b/docs/reference/index-modules/merge.asciidoc index 4d276f00037..6fd4d65793a 100644 --- a/docs/reference/index-modules/merge.asciidoc +++ b/docs/reference/index-modules/merge.asciidoc @@ -7,6 +7,12 @@ where the index data is stored, and are immutable up to delete markers. Segments are, periodically, merged into larger segments to keep the index size at bay and expunge deletes. +The more segments one has in the Lucene index means slower searches and +more memory used. Segment merging is used to reduce the number of segments, +however merges can be expensive to perform, especially on low IO environments. +Merges can be throttled using <>. + + [float] [[policy]] === Policy diff --git a/docs/reference/index-modules/store.asciidoc b/docs/reference/index-modules/store.asciidoc index 339f1f0d218..f7fdb86ad9a 100644 --- a/docs/reference/index-modules/store.asciidoc +++ b/docs/reference/index-modules/store.asciidoc @@ -18,6 +18,38 @@ heap space* using the "Memory" (see below) storage type. It translates to the fact that there is no need for extra large JVM heaps (with their own consequences) for storing the index in memory. + +[float] +[[store-throttling]] +=== Store Level Throttling + +The way Lucene, the IR library elasticsearch uses under the covers, +works is by creating immutable segments (up to deletes) and constantly +merging them (the merge policy settings allow to control how those +merges happen). The merge process happens in an asynchronous manner +without affecting the indexing / search speed. The problem though, +especially on systems with low IO, is that the merge process can be +expensive and affect search / index operation simply by the fact that +the box is now taxed with more IO happening. + +The store module allows to have throttling configured for merges (or +all) either on the node level, or on the index level. The node level +throttling will make sure that out of all the shards allocated on that +node, the merge process won't pass the specific setting bytes per +second. It can be set by setting `indices.store.throttle.type` to +`merge`, and setting `indices.store.throttle.max_bytes_per_sec` to +something like `5mb`. The node level settings can be changed dynamically +using the cluster update settings API. The default is disabled (set to `none`), +in favor of . + +If specific index level configuration is needed, regardless of the node +level settings, it can be set as well using the +`index.store.throttle.type`, and +`index.store.throttle.max_bytes_per_sec`. The default value for the type +is `node`, meaning it will throttle based on the node level settings and +participate in the global throttling happening. Both settings can be set +using the index update settings API dynamically. + [float] [[file-system]] === File system storage types diff --git a/docs/reference/modules/indices.asciidoc b/docs/reference/modules/indices.asciidoc index 774178e7b80..16c22a9dba7 100644 --- a/docs/reference/modules/indices.asciidoc +++ b/docs/reference/modules/indices.asciidoc @@ -60,3 +60,17 @@ The following settings can be set to manage the recovery policy: `indices.recovery.max_bytes_per_sec`:: defaults to `20mb`. + +[float] +[[throttling]] +=== Store level throttling + +The following settings can be set to control the store throttling: + +[horizontal] +`indices.store.throttle.type`:: + could be `merge` (default), `none` or `all`. See <>. + +`indices.store.throttle.max_bytes_per_sec`:: + defaults to `20mb`. + diff --git a/src/main/java/org/apache/lucene/store/StoreUtils.java b/src/main/java/org/apache/lucene/store/StoreUtils.java index 261e363db76..b7de08b1ec1 100644 --- a/src/main/java/org/apache/lucene/store/StoreUtils.java +++ b/src/main/java/org/apache/lucene/store/StoreUtils.java @@ -16,17 +16,12 @@ * specific language governing permissions and limitations * under the License. */ - -package org.elasticsearch.index.store; +package org.apache.lucene.store; import java.util.Arrays; -import org.apache.lucene.store.Directory; -import org.apache.lucene.store.FileSwitchDirectory; -import org.apache.lucene.store.MMapDirectory; -import org.apache.lucene.store.NIOFSDirectory; -import org.apache.lucene.store.SimpleFSDirectory; - +/** + */ public final class StoreUtils { private StoreUtils() { diff --git a/src/main/java/org/elasticsearch/cluster/settings/ClusterDynamicSettingsModule.java b/src/main/java/org/elasticsearch/cluster/settings/ClusterDynamicSettingsModule.java index 9cd7c427b41..cb4aca624ea 100644 --- a/src/main/java/org/elasticsearch/cluster/settings/ClusterDynamicSettingsModule.java +++ b/src/main/java/org/elasticsearch/cluster/settings/ClusterDynamicSettingsModule.java @@ -65,6 +65,8 @@ public class ClusterDynamicSettingsModule extends AbstractModule { clusterDynamicSettings.addDynamicSetting(IndicesFilterCache.INDICES_CACHE_FILTER_SIZE); clusterDynamicSettings.addDynamicSetting(IndicesFilterCache.INDICES_CACHE_FILTER_EXPIRE, Validator.TIME); clusterDynamicSettings.addDynamicSetting(IndicesFilterCache.INDICES_CACHE_FILTER_CONCURRENCY_LEVEL, Validator.POSITIVE_INTEGER); + clusterDynamicSettings.addDynamicSetting(IndicesStore.INDICES_STORE_THROTTLE_TYPE); + clusterDynamicSettings.addDynamicSetting(IndicesStore.INDICES_STORE_THROTTLE_MAX_BYTES_PER_SEC, Validator.BYTES_SIZE); clusterDynamicSettings.addDynamicSetting(IndicesTTLService.INDICES_TTL_INTERVAL, Validator.TIME); clusterDynamicSettings.addDynamicSetting(MappingUpdatedAction.INDICES_MAPPING_ADDITIONAL_MAPPING_CHANGE_TIME, Validator.TIME); clusterDynamicSettings.addDynamicSetting(MetaData.SETTING_READ_ONLY); diff --git a/src/main/java/org/elasticsearch/index/merge/MergeStats.java b/src/main/java/org/elasticsearch/index/merge/MergeStats.java index a5cf9cc62fd..055558cc563 100644 --- a/src/main/java/org/elasticsearch/index/merge/MergeStats.java +++ b/src/main/java/org/elasticsearch/index/merge/MergeStats.java @@ -19,8 +19,6 @@ package org.elasticsearch.index.merge; -import java.io.IOException; - import org.elasticsearch.Version; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -31,6 +29,11 @@ import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentBuilderString; +import java.io.IOException; + +/** + * + */ public class MergeStats implements Streamable, ToXContent { private long total; @@ -64,7 +67,12 @@ public class MergeStats implements Streamable, ToXContent { this.currentSizeInBytes += currentSizeInBytes; this.totalStoppedTimeInMillis += stoppedTimeMillis; this.totalThrottledTimeInMillis += throttledTimeMillis; - this.totalBytesPerSecAutoThrottle += (long) (mbPerSecAutoThrottle * 1024 * 1024); + long bytesPerSecAutoThrottle = (long) (mbPerSecAutoThrottle * 1024 * 1024); + if (this.totalBytesPerSecAutoThrottle == Long.MAX_VALUE || bytesPerSecAutoThrottle == Long.MAX_VALUE) { + this.totalBytesPerSecAutoThrottle = Long.MAX_VALUE; + } else { + this.totalBytesPerSecAutoThrottle += bytesPerSecAutoThrottle; + } } public void add(MergeStats mergeStats) { @@ -80,7 +88,11 @@ public class MergeStats implements Streamable, ToXContent { this.currentSizeInBytes += mergeStats.currentSizeInBytes; this.totalStoppedTimeInMillis += mergeStats.totalStoppedTimeInMillis; this.totalThrottledTimeInMillis += mergeStats.totalThrottledTimeInMillis; - this.totalBytesPerSecAutoThrottle += mergeStats.totalBytesPerSecAutoThrottle; + if (this.totalBytesPerSecAutoThrottle == Long.MAX_VALUE || mergeStats.totalBytesPerSecAutoThrottle == Long.MAX_VALUE) { + this.totalBytesPerSecAutoThrottle = Long.MAX_VALUE; + } else { + this.totalBytesPerSecAutoThrottle += mergeStats.totalBytesPerSecAutoThrottle; + } } /** diff --git a/src/main/java/org/elasticsearch/index/merge/scheduler/ConcurrentMergeSchedulerProvider.java b/src/main/java/org/elasticsearch/index/merge/scheduler/ConcurrentMergeSchedulerProvider.java index c4bdc179c69..5605603ce97 100644 --- a/src/main/java/org/elasticsearch/index/merge/scheduler/ConcurrentMergeSchedulerProvider.java +++ b/src/main/java/org/elasticsearch/index/merge/scheduler/ConcurrentMergeSchedulerProvider.java @@ -41,6 +41,9 @@ import java.io.IOException; import java.util.Set; import java.util.concurrent.CopyOnWriteArraySet; +/** + * + */ public class ConcurrentMergeSchedulerProvider extends MergeSchedulerProvider { private final IndexSettingsService indexSettingsService; diff --git a/src/main/java/org/elasticsearch/index/merge/scheduler/MergeSchedulerProvider.java b/src/main/java/org/elasticsearch/index/merge/scheduler/MergeSchedulerProvider.java index 11719ec866e..de9a7d8bed6 100644 --- a/src/main/java/org/elasticsearch/index/merge/scheduler/MergeSchedulerProvider.java +++ b/src/main/java/org/elasticsearch/index/merge/scheduler/MergeSchedulerProvider.java @@ -34,6 +34,9 @@ import java.io.Closeable; import java.util.Set; import java.util.concurrent.CopyOnWriteArrayList; +/** + * + */ public abstract class MergeSchedulerProvider extends AbstractIndexShardComponent implements IndexShardComponent, Closeable { public static interface FailureListener { diff --git a/src/main/java/org/elasticsearch/index/settings/IndexDynamicSettingsModule.java b/src/main/java/org/elasticsearch/index/settings/IndexDynamicSettingsModule.java index 044db0c2c6b..7a17bcd46ff 100644 --- a/src/main/java/org/elasticsearch/index/settings/IndexDynamicSettingsModule.java +++ b/src/main/java/org/elasticsearch/index/settings/IndexDynamicSettingsModule.java @@ -51,6 +51,8 @@ public class IndexDynamicSettingsModule extends AbstractModule { public IndexDynamicSettingsModule() { indexDynamicSettings = new DynamicSettings(); + indexDynamicSettings.addDynamicSetting(AbstractIndexStore.INDEX_STORE_THROTTLE_MAX_BYTES_PER_SEC, Validator.BYTES_SIZE); + indexDynamicSettings.addDynamicSetting(AbstractIndexStore.INDEX_STORE_THROTTLE_TYPE); indexDynamicSettings.addDynamicSetting(ConcurrentMergeSchedulerProvider.MAX_THREAD_COUNT); indexDynamicSettings.addDynamicSetting(ConcurrentMergeSchedulerProvider.MAX_MERGE_COUNT); indexDynamicSettings.addDynamicSetting(ConcurrentMergeSchedulerProvider.AUTO_THROTTLE); diff --git a/src/main/java/org/elasticsearch/index/store/DirectoryService.java b/src/main/java/org/elasticsearch/index/store/DirectoryService.java index 2fa4e8eb455..81d8910ed4c 100644 --- a/src/main/java/org/elasticsearch/index/store/DirectoryService.java +++ b/src/main/java/org/elasticsearch/index/store/DirectoryService.java @@ -39,6 +39,8 @@ public abstract class DirectoryService extends AbstractIndexShardComponent { public abstract Directory[] build() throws IOException; + public abstract long throttleTimeInNanos(); + /** * Creates a new Directory from the given distributor. * The default implementation returns a new {@link org.elasticsearch.index.store.DistributorDirectory} @@ -56,4 +58,4 @@ public abstract class DirectoryService extends AbstractIndexShardComponent { } return new DistributorDirectory(distributor); } -} +} \ No newline at end of file diff --git a/src/main/java/org/elasticsearch/index/store/IndexStore.java b/src/main/java/org/elasticsearch/index/store/IndexStore.java index d22c993ae18..d979075694b 100644 --- a/src/main/java/org/elasticsearch/index/store/IndexStore.java +++ b/src/main/java/org/elasticsearch/index/store/IndexStore.java @@ -19,6 +19,7 @@ package org.elasticsearch.index.store; +import org.apache.lucene.store.StoreRateLimiting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.settings.IndexSettings; import org.elasticsearch.index.shard.ShardId; @@ -32,6 +33,12 @@ import java.nio.file.Path; */ public interface IndexStore extends Closeable { + /** + * Returns the rate limiting, either of the index is explicitly configured, or + * the node level one (defaults to the node level one). + */ + StoreRateLimiting rateLimiting(); + /** * The shard store class that should be used for each shard. */ diff --git a/src/main/java/org/elasticsearch/index/store/Store.java b/src/main/java/org/elasticsearch/index/store/Store.java index c5610d6f257..87df70c81f9 100644 --- a/src/main/java/org/elasticsearch/index/store/Store.java +++ b/src/main/java/org/elasticsearch/index/store/Store.java @@ -283,7 +283,7 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref public StoreStats stats() throws IOException { ensureOpen(); - return new StoreStats(Directories.estimateSize(directory)); + return new StoreStats(Directories.estimateSize(directory), directoryService.throttleTimeInNanos()); } public void renameFile(String from, String to) throws IOException { diff --git a/src/main/java/org/elasticsearch/index/store/StoreStats.java b/src/main/java/org/elasticsearch/index/store/StoreStats.java index 2db10ff0f80..3b7f52a6895 100644 --- a/src/main/java/org/elasticsearch/index/store/StoreStats.java +++ b/src/main/java/org/elasticsearch/index/store/StoreStats.java @@ -19,9 +19,6 @@ package org.elasticsearch.index.store; -import java.io.IOException; - -import org.elasticsearch.Version; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Streamable; @@ -31,18 +28,23 @@ import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentBuilderString; +import java.io.IOException; + /** */ public class StoreStats implements Streamable, ToXContent { private long sizeInBytes; + private long throttleTimeInNanos; + public StoreStats() { } - public StoreStats(long sizeInBytes) { + public StoreStats(long sizeInBytes, long throttleTimeInNanos) { this.sizeInBytes = sizeInBytes; + this.throttleTimeInNanos = throttleTimeInNanos; } public void add(StoreStats stats) { @@ -50,6 +52,7 @@ public class StoreStats implements Streamable, ToXContent { return; } sizeInBytes += stats.sizeInBytes; + throttleTimeInNanos += stats.throttleTimeInNanos; } @@ -69,6 +72,14 @@ public class StoreStats implements Streamable, ToXContent { return size(); } + public TimeValue throttleTime() { + return TimeValue.timeValueNanos(throttleTimeInNanos); + } + + public TimeValue getThrottleTime() { + return throttleTime(); + } + public static StoreStats readStoreStats(StreamInput in) throws IOException { StoreStats store = new StoreStats(); store.readFrom(in); @@ -78,17 +89,20 @@ public class StoreStats implements Streamable, ToXContent { @Override public void readFrom(StreamInput in) throws IOException { sizeInBytes = in.readVLong(); + throttleTimeInNanos = in.readVLong(); } @Override public void writeTo(StreamOutput out) throws IOException { out.writeVLong(sizeInBytes); + out.writeVLong(throttleTimeInNanos); } @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(Fields.STORE); builder.byteSizeField(Fields.SIZE_IN_BYTES, Fields.SIZE, sizeInBytes); + builder.timeValueField(Fields.THROTTLE_TIME_IN_MILLIS, Fields.THROTTLE_TIME, throttleTime()); builder.endObject(); return builder; } @@ -97,5 +111,8 @@ public class StoreStats implements Streamable, ToXContent { static final XContentBuilderString STORE = new XContentBuilderString("store"); static final XContentBuilderString SIZE = new XContentBuilderString("size"); static final XContentBuilderString SIZE_IN_BYTES = new XContentBuilderString("size_in_bytes"); + + static final XContentBuilderString THROTTLE_TIME = new XContentBuilderString("throttle_time"); + static final XContentBuilderString THROTTLE_TIME_IN_MILLIS = new XContentBuilderString("throttle_time_in_millis"); } } diff --git a/src/main/java/org/elasticsearch/index/store/distributor/AbstractDistributor.java b/src/main/java/org/elasticsearch/index/store/distributor/AbstractDistributor.java index 5baa0da1f36..18dac8b6d92 100644 --- a/src/main/java/org/elasticsearch/index/store/distributor/AbstractDistributor.java +++ b/src/main/java/org/elasticsearch/index/store/distributor/AbstractDistributor.java @@ -19,18 +19,18 @@ package org.elasticsearch.index.store.distributor; +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.FSDirectory; +import org.apache.lucene.store.StoreUtils; +import org.elasticsearch.index.store.DirectoryService; +import org.elasticsearch.index.store.DirectoryUtils; + import java.io.IOException; import java.nio.file.FileStore; import java.nio.file.Files; import java.nio.file.Path; import java.util.Arrays; -import org.apache.lucene.store.Directory; -import org.apache.lucene.store.FSDirectory; -import org.elasticsearch.index.store.DirectoryService; -import org.elasticsearch.index.store.DirectoryUtils; -import org.elasticsearch.index.store.StoreUtils; - public abstract class AbstractDistributor implements Distributor { protected final Directory[] delegates; diff --git a/src/main/java/org/elasticsearch/index/store/fs/FsDirectoryService.java b/src/main/java/org/elasticsearch/index/store/fs/FsDirectoryService.java index 5be06b37a3b..61786c8e5d6 100644 --- a/src/main/java/org/elasticsearch/index/store/fs/FsDirectoryService.java +++ b/src/main/java/org/elasticsearch/index/store/fs/FsDirectoryService.java @@ -36,15 +36,27 @@ import org.elasticsearch.index.store.StoreException; /** */ -public abstract class FsDirectoryService extends DirectoryService { +public abstract class FsDirectoryService extends DirectoryService implements StoreRateLimiting.Listener, StoreRateLimiting.Provider { protected final IndexStore indexStore; + private final CounterMetric rateLimitingTimeInNanos = new CounterMetric(); + public FsDirectoryService(ShardId shardId, @IndexSettings Settings indexSettings, IndexStore indexStore) { super(shardId, indexSettings); this.indexStore = indexStore; } + @Override + public long throttleTimeInNanos() { + return rateLimitingTimeInNanos.count(); + } + + @Override + public StoreRateLimiting rateLimiting() { + return indexStore.rateLimiting(); + } + protected final LockFactory buildLockFactory() throws IOException { String fsLock = componentSettings.get("lock", componentSettings.get("fs_lock", "native")); LockFactory lockFactory; @@ -65,10 +77,16 @@ public abstract class FsDirectoryService extends DirectoryService { Directory[] dirs = new Directory[locations.length]; for (int i = 0; i < dirs.length; i++) { Files.createDirectories(locations[i]); - dirs[i] = newFSDirectory(locations[i], buildLockFactory()); + Directory wrapped = newFSDirectory(locations[i], buildLockFactory()); + dirs[i] = new RateLimitedFSDirectory(wrapped, this, this) ; } return dirs; } protected abstract Directory newFSDirectory(Path location, LockFactory lockFactory) throws IOException; + + @Override + public void onPause(long nanos) { + rateLimitingTimeInNanos.inc(nanos); + } } diff --git a/src/main/java/org/elasticsearch/index/store/support/AbstractIndexStore.java b/src/main/java/org/elasticsearch/index/store/support/AbstractIndexStore.java index 357e9ce771a..bb73c669047 100644 --- a/src/main/java/org/elasticsearch/index/store/support/AbstractIndexStore.java +++ b/src/main/java/org/elasticsearch/index/store/support/AbstractIndexStore.java @@ -19,6 +19,7 @@ package org.elasticsearch.index.store.support; +import org.apache.lucene.store.StoreRateLimiting; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchIllegalStateException; import org.elasticsearch.common.io.FileSystemUtils; @@ -42,8 +43,37 @@ import java.nio.file.Path; */ public abstract class AbstractIndexStore extends AbstractIndexComponent implements IndexStore { + public static final String INDEX_STORE_THROTTLE_TYPE = "index.store.throttle.type"; + public static final String INDEX_STORE_THROTTLE_MAX_BYTES_PER_SEC = "index.store.throttle.max_bytes_per_sec"; + public static final String INDEX_FOLDER_NAME = "index"; public static final String TRANSLOG_FOLDER_NAME = "translog"; + + class ApplySettings implements IndexSettingsService.Listener { + @Override + public void onRefreshSettings(Settings settings) { + String rateLimitingType = settings.get(INDEX_STORE_THROTTLE_TYPE, AbstractIndexStore.this.rateLimitingType); + if (!rateLimitingType.equals(AbstractIndexStore.this.rateLimitingType)) { + logger.info("updating index.store.throttle.type from [{}] to [{}]", AbstractIndexStore.this.rateLimitingType, rateLimitingType); + if (rateLimitingType.equalsIgnoreCase("node")) { + AbstractIndexStore.this.rateLimitingType = rateLimitingType; + AbstractIndexStore.this.nodeRateLimiting = true; + } else { + StoreRateLimiting.Type.fromString(rateLimitingType); + AbstractIndexStore.this.rateLimitingType = rateLimitingType; + AbstractIndexStore.this.nodeRateLimiting = false; + AbstractIndexStore.this.rateLimiting.setType(rateLimitingType); + } + } + + ByteSizeValue rateLimitingThrottle = settings.getAsBytesSize(INDEX_STORE_THROTTLE_MAX_BYTES_PER_SEC, AbstractIndexStore.this.rateLimitingThrottle); + if (!rateLimitingThrottle.equals(AbstractIndexStore.this.rateLimitingThrottle)) { + logger.info("updating index.store.throttle.max_bytes_per_sec from [{}] to [{}], note, type is [{}]", AbstractIndexStore.this.rateLimitingThrottle, rateLimitingThrottle, AbstractIndexStore.this.rateLimitingType); + AbstractIndexStore.this.rateLimitingThrottle = rateLimitingThrottle; + AbstractIndexStore.this.rateLimiting.setMaxRate(rateLimitingThrottle); + } + } + } private final NodeEnvironment nodeEnv; private final Path[] locations; @@ -52,11 +82,32 @@ public abstract class AbstractIndexStore extends AbstractIndexComponent implemen protected final IndicesStore indicesStore; + private volatile String rateLimitingType; + private volatile ByteSizeValue rateLimitingThrottle; + private volatile boolean nodeRateLimiting; + + private final StoreRateLimiting rateLimiting = new StoreRateLimiting(); + + private final ApplySettings applySettings = new ApplySettings(); + protected AbstractIndexStore(Index index, @IndexSettings Settings indexSettings, IndexService indexService, IndicesStore indicesStore, NodeEnvironment nodeEnv) { super(index, indexSettings); this.indexService = indexService; this.indicesStore = indicesStore; + this.rateLimitingType = indexSettings.get(INDEX_STORE_THROTTLE_TYPE, "none"); + if (rateLimitingType.equalsIgnoreCase("node")) { + nodeRateLimiting = true; + } else { + nodeRateLimiting = false; + rateLimiting.setType(rateLimitingType); + } + this.rateLimitingThrottle = indexSettings.getAsBytesSize(INDEX_STORE_THROTTLE_MAX_BYTES_PER_SEC, new ByteSizeValue(0)); + rateLimiting.setMaxRate(rateLimitingThrottle); + + logger.debug("using index.store.throttle.type [{}], with index.store.throttle.max_bytes_per_sec [{}]", rateLimitingType, rateLimitingThrottle); + + indexService.settingsService().addListener(applySettings); this.nodeEnv = nodeEnv; if (nodeEnv.hasNodeFile()) { this.locations = nodeEnv.indexPaths(index); @@ -67,8 +118,15 @@ public abstract class AbstractIndexStore extends AbstractIndexComponent implemen @Override public void close() throws ElasticsearchException { + indexService.settingsService().removeListener(applySettings); } + @Override + public StoreRateLimiting rateLimiting() { + return nodeRateLimiting ? indicesStore.rateLimiting() : this.rateLimiting; + } + + @Override public boolean canDeleteUnallocated(ShardId shardId, @IndexSettings Settings indexSettings) { if (locations == null) { diff --git a/src/main/java/org/elasticsearch/indices/store/IndicesStore.java b/src/main/java/org/elasticsearch/indices/store/IndicesStore.java index cb0b03da62e..558ccdfebbe 100644 --- a/src/main/java/org/elasticsearch/indices/store/IndicesStore.java +++ b/src/main/java/org/elasticsearch/indices/store/IndicesStore.java @@ -19,6 +19,7 @@ package org.elasticsearch.indices.store; +import org.apache.lucene.store.StoreRateLimiting; import org.elasticsearch.Version; import org.elasticsearch.cluster.*; import org.elasticsearch.cluster.metadata.IndexMetaData; @@ -59,10 +60,34 @@ import java.util.concurrent.atomic.AtomicInteger; */ public class IndicesStore extends AbstractComponent implements ClusterStateListener, Closeable { + public static final String INDICES_STORE_THROTTLE_TYPE = "indices.store.throttle.type"; + public static final String INDICES_STORE_THROTTLE_MAX_BYTES_PER_SEC = "indices.store.throttle.max_bytes_per_sec"; + public static final String ACTION_SHARD_EXISTS = "internal:index/shard/exists"; private static final EnumSet ACTIVE_STATES = EnumSet.of(IndexShardState.STARTED, IndexShardState.RELOCATED); + class ApplySettings implements NodeSettingsService.Listener { + @Override + public void onRefreshSettings(Settings settings) { + String rateLimitingType = settings.get(INDICES_STORE_THROTTLE_TYPE, IndicesStore.this.rateLimitingType); + // try and parse the type + StoreRateLimiting.Type.fromString(rateLimitingType); + if (!rateLimitingType.equals(IndicesStore.this.rateLimitingType)) { + logger.info("updating indices.store.throttle.type from [{}] to [{}]", IndicesStore.this.rateLimitingType, rateLimitingType); + IndicesStore.this.rateLimitingType = rateLimitingType; + IndicesStore.this.rateLimiting.setType(rateLimitingType); + } + + ByteSizeValue rateLimitingThrottle = settings.getAsBytesSize(INDICES_STORE_THROTTLE_MAX_BYTES_PER_SEC, IndicesStore.this.rateLimitingThrottle); + if (!rateLimitingThrottle.equals(IndicesStore.this.rateLimitingThrottle)) { + logger.info("updating indices.store.throttle.max_bytes_per_sec from [{}] to [{}], note, type is [{}]", IndicesStore.this.rateLimitingThrottle, rateLimitingThrottle, IndicesStore.this.rateLimitingType); + IndicesStore.this.rateLimitingThrottle = rateLimitingThrottle; + IndicesStore.this.rateLimiting.setMaxRate(rateLimitingThrottle); + } + } + } + private final NodeEnvironment nodeEnv; private final NodeSettingsService nodeSettingsService; @@ -72,6 +97,12 @@ public class IndicesStore extends AbstractComponent implements ClusterStateListe private final ClusterService clusterService; private final TransportService transportService; + private volatile String rateLimitingType; + private volatile ByteSizeValue rateLimitingThrottle; + private final StoreRateLimiting rateLimiting = new StoreRateLimiting(); + + private final ApplySettings applySettings = new ApplySettings(); + @Inject public IndicesStore(Settings settings, NodeEnvironment nodeEnv, NodeSettingsService nodeSettingsService, IndicesService indicesService, ClusterService clusterService, TransportService transportService) { @@ -83,6 +114,15 @@ public class IndicesStore extends AbstractComponent implements ClusterStateListe this.transportService = transportService; transportService.registerHandler(ACTION_SHARD_EXISTS, new ShardActiveRequestHandler()); + // we don't limit by default (we default to CMS's auto throttle instead): + this.rateLimitingType = componentSettings.get("throttle.type", StoreRateLimiting.Type.NONE.name()); + rateLimiting.setType(rateLimitingType); + this.rateLimitingThrottle = componentSettings.getAsBytesSize("throttle.max_bytes_per_sec", new ByteSizeValue(10240, ByteSizeUnit.MB)); + rateLimiting.setMaxRate(rateLimitingThrottle); + + logger.debug("using indices.store.throttle.type [{}], with index.store.throttle.max_bytes_per_sec [{}]", rateLimitingType, rateLimitingThrottle); + + nodeSettingsService.addListener(applySettings); clusterService.addLast(this); } @@ -95,8 +135,13 @@ public class IndicesStore extends AbstractComponent implements ClusterStateListe this.transportService = null; } + public StoreRateLimiting rateLimiting() { + return this.rateLimiting; + } + @Override public void close() { + nodeSettingsService.removeListener(applySettings); clusterService.remove(this); } diff --git a/src/test/java/org/elasticsearch/index/engine/internal/InternalEngineTests.java b/src/test/java/org/elasticsearch/index/engine/internal/InternalEngineTests.java index 8aba2d37aa1..8485a5aae7d 100644 --- a/src/test/java/org/elasticsearch/index/engine/internal/InternalEngineTests.java +++ b/src/test/java/org/elasticsearch/index/engine/internal/InternalEngineTests.java @@ -179,6 +179,11 @@ public class InternalEngineTests extends ElasticsearchLuceneTestCase { public Directory[] build() throws IOException { return new Directory[]{ directory }; } + + @Override + public long throttleTimeInNanos() { + return 0; + } }; return new Store(shardId, EMPTY_SETTINGS, directoryService, new LeastUsedDistributor(directoryService), new DummyShardLock(shardId)); } diff --git a/src/test/java/org/elasticsearch/index/merge/policy/MergePolicySettingsTest.java b/src/test/java/org/elasticsearch/index/merge/policy/MergePolicySettingsTest.java index 43306d1b315..6caac7c3186 100644 --- a/src/test/java/org/elasticsearch/index/merge/policy/MergePolicySettingsTest.java +++ b/src/test/java/org/elasticsearch/index/merge/policy/MergePolicySettingsTest.java @@ -178,6 +178,11 @@ public class MergePolicySettingsTest extends ElasticsearchTestCase { public Directory[] build() throws IOException { return new Directory[] { new RAMDirectory() } ; } + + @Override + public long throttleTimeInNanos() { + return 0; + } }; return new Store(shardId, settings, directoryService, new LeastUsedDistributor(directoryService), new DummyShardLock(shardId)); } diff --git a/src/test/java/org/elasticsearch/index/store/StoreTest.java b/src/test/java/org/elasticsearch/index/store/StoreTest.java index 35dc4968a12..2dd834a6047 100644 --- a/src/test/java/org/elasticsearch/index/store/StoreTest.java +++ b/src/test/java/org/elasticsearch/index/store/StoreTest.java @@ -710,6 +710,11 @@ public class StoreTest extends ElasticsearchLuceneTestCase { public Directory[] build() throws IOException { return dirs; } + + @Override + public long throttleTimeInNanos() { + return random.nextInt(1000); + } } public static void assertConsistent(Store store, Store.MetadataSnapshot metadata) throws IOException { diff --git a/src/test/java/org/elasticsearch/index/store/distributor/DistributorTests.java b/src/test/java/org/elasticsearch/index/store/distributor/DistributorTests.java index 5d319692681..119c7636843 100644 --- a/src/test/java/org/elasticsearch/index/store/distributor/DistributorTests.java +++ b/src/test/java/org/elasticsearch/index/store/distributor/DistributorTests.java @@ -150,6 +150,11 @@ public class DistributorTests extends ElasticsearchTestCase { public Directory[] build() throws IOException { return directories; } + + @Override + public long throttleTimeInNanos() { + return 0; + } } public static class FakeFsDirectory extends FSDirectory { diff --git a/src/test/java/org/elasticsearch/indices/settings/UpdateSettingsTests.java b/src/test/java/org/elasticsearch/indices/settings/UpdateSettingsTests.java index d03577fb07d..e9e5af52ebd 100644 --- a/src/test/java/org/elasticsearch/indices/settings/UpdateSettingsTests.java +++ b/src/test/java/org/elasticsearch/indices/settings/UpdateSettingsTests.java @@ -125,6 +125,140 @@ public class UpdateSettingsTests extends ElasticsearchIntegrationTest { } + // #6626: make sure we can update throttle settings and the changes take effect + @Test + @Slow + public void testUpdateThrottleSettings() { + + // No throttling at first, only 1 non-replicated shard, force lots of merging: + assertAcked(prepareCreate("test") + .setSettings(ImmutableSettings.builder() + .put(AbstractIndexStore.INDEX_STORE_THROTTLE_TYPE, "none") + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, "1") + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, "0") + .put(TieredMergePolicyProvider.INDEX_MERGE_POLICY_MAX_MERGE_AT_ONCE, "2") + .put(TieredMergePolicyProvider.INDEX_MERGE_POLICY_SEGMENTS_PER_TIER, "2") + .put(ConcurrentMergeSchedulerProvider.MAX_THREAD_COUNT, "1") + .put(ConcurrentMergeSchedulerProvider.MAX_MERGE_COUNT, "2") + )); + ensureGreen(); + long termUpto = 0; + for(int i=0;i<100;i++) { + // Provoke slowish merging by making many unique terms: + StringBuilder sb = new StringBuilder(); + for(int j=0;j<100;j++) { + sb.append(' '); + sb.append(termUpto++); + } + client().prepareIndex("test", "type", ""+termUpto).setSource("field" + (i%10), sb.toString()).get(); + if (i % 2 == 0) { + refresh(); + } + } + + // No merge IO throttling should have happened: + NodesStatsResponse nodesStats = client().admin().cluster().prepareNodesStats().setIndices(true).get(); + for(NodeStats stats : nodesStats.getNodes()) { + assertThat(stats.getIndices().getStore().getThrottleTime().getMillis(), equalTo(0l)); + } + + logger.info("test: set low merge throttling"); + + // Now updates settings to turn on merge throttling lowish rate + client() + .admin() + .indices() + .prepareUpdateSettings("test") + .setSettings(ImmutableSettings.builder() + .put(AbstractIndexStore.INDEX_STORE_THROTTLE_TYPE, "merge") + .put(AbstractIndexStore.INDEX_STORE_THROTTLE_MAX_BYTES_PER_SEC, "1mb")) + .get(); + + // Make sure setting says it is in fact changed: + GetSettingsResponse getSettingsResponse = client().admin().indices().prepareGetSettings("test").get(); + assertThat(getSettingsResponse.getSetting("test", AbstractIndexStore.INDEX_STORE_THROTTLE_TYPE), equalTo("merge")); + + // Also make sure we see throttling kicking in: + boolean done = false; + while (done == false) { + // Provoke slowish merging by making many unique terms: + for(int i=0;i<5;i++) { + StringBuilder sb = new StringBuilder(); + for(int j=0;j<100;j++) { + sb.append(' '); + sb.append(termUpto++); + sb.append(" some random text that keeps repeating over and over again hambone"); + } + client().prepareIndex("test", "type", ""+termUpto).setSource("field" + (i%10), sb.toString()).get(); + } + refresh(); + nodesStats = client().admin().cluster().prepareNodesStats().setIndices(true).get(); + for(NodeStats stats : nodesStats.getNodes()) { + long throttleMillis = stats.getIndices().getStore().getThrottleTime().getMillis(); + if (throttleMillis > 0) { + done = true; + break; + } + } + } + + logger.info("test: disable merge throttling"); + + // Now updates settings to disable merge throttling + client() + .admin() + .indices() + .prepareUpdateSettings("test") + .setSettings(ImmutableSettings.builder() + .put(AbstractIndexStore.INDEX_STORE_THROTTLE_TYPE, "none")) + .get(); + + // Optimize does a waitForMerges, which we must do to make sure all in-flight (throttled) merges finish: + logger.info("test: optimize"); + client().admin().indices().prepareOptimize("test").setWaitForMerge(true).get(); + logger.info("test: optimize done"); + + // Record current throttling so far + long sumThrottleTime = 0; + nodesStats = client().admin().cluster().prepareNodesStats().setIndices(true).get(); + for(NodeStats stats : nodesStats.getNodes()) { + sumThrottleTime += stats.getIndices().getStore().getThrottleTime().getMillis(); + } + + // Make sure no further throttling happens: + for(int i=0;i<100;i++) { + // Provoke slowish merging by making many unique terms: + StringBuilder sb = new StringBuilder(); + for(int j=0;j<100;j++) { + sb.append(' '); + sb.append(termUpto++); + } + client().prepareIndex("test", "type", ""+termUpto).setSource("field" + (i%10), sb.toString()).get(); + if (i % 2 == 0) { + refresh(); + } + } + logger.info("test: done indexing after disabling throttling"); + + long newSumThrottleTime = 0; + nodesStats = client().admin().cluster().prepareNodesStats().setIndices(true).get(); + for(NodeStats stats : nodesStats.getNodes()) { + newSumThrottleTime += stats.getIndices().getStore().getThrottleTime().getMillis(); + } + + // No additional merge IO throttling should have happened: + assertEquals(sumThrottleTime, newSumThrottleTime); + + // Optimize & flush and wait; else we sometimes get a "Delete Index failed - not acked" + // when ElasticsearchIntegrationTest.after tries to remove indices created by the test: + + // Wait for merges to finish + client().admin().indices().prepareOptimize("test").setWaitForMerge(true).get(); + flush(); + + logger.info("test: test done"); + } + private static class MockAppender extends AppenderSkeleton { public boolean sawIndexWriterMessage; public boolean sawFlushDeletes; diff --git a/src/test/java/org/elasticsearch/indices/stats/IndexStatsTests.java b/src/test/java/org/elasticsearch/indices/stats/IndexStatsTests.java index 0d7621ac783..0adebd64bb3 100644 --- a/src/test/java/org/elasticsearch/indices/stats/IndexStatsTests.java +++ b/src/test/java/org/elasticsearch/indices/stats/IndexStatsTests.java @@ -301,6 +301,90 @@ public class IndexStatsTests extends ElasticsearchIntegrationTest { assertThat(client().admin().indices().prepareStats("idx").setQueryCache(true).get().getTotal().getQueryCache().getMemorySizeInBytes(), greaterThan(0l)); } + + @Test + public void nonThrottleStats() throws Exception { + assertAcked(prepareCreate("test") + .setSettings(ImmutableSettings.builder() + .put(AbstractIndexStore.INDEX_STORE_THROTTLE_TYPE, "merge") + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, "1") + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, "0") + .put(TieredMergePolicyProvider.INDEX_MERGE_POLICY_MAX_MERGE_AT_ONCE, "2") + .put(TieredMergePolicyProvider.INDEX_MERGE_POLICY_SEGMENTS_PER_TIER, "2") + .put(ConcurrentMergeSchedulerProvider.MAX_THREAD_COUNT, "1") + .put(ConcurrentMergeSchedulerProvider.MAX_MERGE_COUNT, "10000") + )); + ensureGreen(); + long termUpto = 0; + IndicesStatsResponse stats; + // Provoke slowish merging by making many unique terms: + for(int i=0; i<100; i++) { + StringBuilder sb = new StringBuilder(); + for(int j=0; j<100; j++) { + sb.append(' '); + sb.append(termUpto++); + sb.append(" some random text that keeps repeating over and over again hambone"); + } + client().prepareIndex("test", "type", ""+termUpto).setSource("field" + (i%10), sb.toString()).get(); + } + refresh(); + stats = client().admin().indices().prepareStats().execute().actionGet(); + //nodesStats = client().admin().cluster().prepareNodesStats().setIndices(true).get(); + + stats = client().admin().indices().prepareStats().execute().actionGet(); + assertThat(stats.getPrimaries().getIndexing().getTotal().getThrottleTimeInMillis(), equalTo(0l)); + } + + @Test + public void throttleStats() throws Exception { + assertAcked(prepareCreate("test") + .setSettings(ImmutableSettings.builder() + .put(AbstractIndexStore.INDEX_STORE_THROTTLE_TYPE, "merge") + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, "1") + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, "0") + .put(TieredMergePolicyProvider.INDEX_MERGE_POLICY_MAX_MERGE_AT_ONCE, "2") + .put(TieredMergePolicyProvider.INDEX_MERGE_POLICY_SEGMENTS_PER_TIER, "2") + .put(ConcurrentMergeSchedulerProvider.MAX_THREAD_COUNT, "1") + .put(ConcurrentMergeSchedulerProvider.MAX_MERGE_COUNT, "1") + .put("index.merge.policy.type", "tiered") + + )); + ensureGreen(); + long termUpto = 0; + IndicesStatsResponse stats; + // make sure we see throttling kicking in: + boolean done = false; + long start = System.currentTimeMillis(); + while (!done) { + for(int i=0; i<100; i++) { + // Provoke slowish merging by making many unique terms: + StringBuilder sb = new StringBuilder(); + for(int j=0; j<100; j++) { + sb.append(' '); + sb.append(termUpto++); + } + client().prepareIndex("test", "type", ""+termUpto).setSource("field" + (i%10), sb.toString()).get(); + if (i % 2 == 0) { + refresh(); + } + } + refresh(); + stats = client().admin().indices().prepareStats().execute().actionGet(); + //nodesStats = client().admin().cluster().prepareNodesStats().setIndices(true).get(); + done = stats.getPrimaries().getIndexing().getTotal().getThrottleTimeInMillis() > 0; + if (System.currentTimeMillis() - start > 300*1000) { //Wait 5 minutes for throttling to kick in + fail("index throttling didn't kick in after 5 minutes of intense merging"); + } + } + + // Optimize & flush and wait; else we sometimes get a "Delete Index failed - not acked" + // when ElasticsearchIntegrationTest.after tries to remove indices created by the test: + logger.info("test: now optimize"); + client().admin().indices().prepareOptimize("test").setWaitForMerge(true).get(); + flush(); + logger.info("test: test done"); + } + @Test public void simpleStats() throws Exception { createIndex("test1", "test2"); @@ -440,9 +524,6 @@ public class IndexStatsTests extends ElasticsearchIntegrationTest { assertThat(stats.getTotal().getMerge(), notNullValue()); assertThat(stats.getTotal().getMerge().getTotal(), greaterThan(0l)); - assertThat(stats.getTotal().getMerge().getTotalStoppedTime(), notNullValue()); - assertThat(stats.getTotal().getMerge().getTotalThrottledTime(), notNullValue()); - assertThat(stats.getTotal().getMerge().getTotalBytesPerSecAutoThrottle(), greaterThan(0l)); } @Test diff --git a/src/test/java/org/elasticsearch/indices/store/SimpleDistributorTests.java b/src/test/java/org/elasticsearch/indices/store/SimpleDistributorTests.java index adab2a2df18..bfe2876c8dc 100644 --- a/src/test/java/org/elasticsearch/indices/store/SimpleDistributorTests.java +++ b/src/test/java/org/elasticsearch/indices/store/SimpleDistributorTests.java @@ -54,48 +54,63 @@ public class SimpleDistributorTests extends ElasticsearchIntegrationTest { String storeString = getStoreDirectory("test", 0).toString(); logger.info(storeString); Path[] dataPaths = dataPaths(); - assertThat(storeString.toLowerCase(Locale.ROOT), startsWith("store(least_used[niofs(" + dataPaths[0].toAbsolutePath().toString().toLowerCase(Locale.ROOT))); + assertThat(storeString.toLowerCase(Locale.ROOT), startsWith("store(least_used[rate_limited(niofs(" + dataPaths[0].toAbsolutePath().toString().toLowerCase(Locale.ROOT))); if (dataPaths.length > 1) { - assertThat(storeString.toLowerCase(Locale.ROOT), containsString("), niofs(" + dataPaths[1].toAbsolutePath().toString().toLowerCase(Locale.ROOT))); + assertThat(storeString.toLowerCase(Locale.ROOT), containsString("), rate_limited(niofs(" + dataPaths[1].toAbsolutePath().toString().toLowerCase(Locale.ROOT))); } + assertThat(storeString, endsWith(", type=MERGE, rate=20.0)])")); createIndexWithStoreType("test", IndexStoreModule.Type.NIOFS, "random"); storeString = getStoreDirectory("test", 0).toString(); logger.info(storeString); dataPaths = dataPaths(); - assertThat(storeString.toLowerCase(Locale.ROOT), startsWith("store(random[niofs(" + dataPaths[0].toAbsolutePath().toString().toLowerCase(Locale.ROOT))); + assertThat(storeString.toLowerCase(Locale.ROOT), startsWith("store(random[rate_limited(niofs(" + dataPaths[0].toAbsolutePath().toString().toLowerCase(Locale.ROOT))); if (dataPaths.length > 1) { - assertThat(storeString.toLowerCase(Locale.ROOT), containsString("), niofs(" + dataPaths[1].toAbsolutePath().toString().toLowerCase(Locale.ROOT))); + assertThat(storeString.toLowerCase(Locale.ROOT), containsString("), rate_limited(niofs(" + dataPaths[1].toAbsolutePath().toString().toLowerCase(Locale.ROOT))); } + assertThat(storeString, endsWith(", type=MERGE, rate=20.0)])")); createIndexWithStoreType("test", IndexStoreModule.Type.MMAPFS, "least_used"); storeString = getStoreDirectory("test", 0).toString(); logger.info(storeString); dataPaths = dataPaths(); - assertThat(storeString.toLowerCase(Locale.ROOT), startsWith("store(least_used[mmapfs(" + dataPaths[0].toAbsolutePath().toString().toLowerCase(Locale.ROOT))); + assertThat(storeString.toLowerCase(Locale.ROOT), startsWith("store(least_used[rate_limited(mmapfs(" + dataPaths[0].toAbsolutePath().toString().toLowerCase(Locale.ROOT))); if (dataPaths.length > 1) { - assertThat(storeString.toLowerCase(Locale.ROOT), containsString("), mmapfs(" + dataPaths[1].toAbsolutePath().toString().toLowerCase(Locale.ROOT))); + assertThat(storeString.toLowerCase(Locale.ROOT), containsString("), rate_limited(mmapfs(" + dataPaths[1].toAbsolutePath().toString().toLowerCase(Locale.ROOT))); } + assertThat(storeString, endsWith(", type=MERGE, rate=20.0)])")); createIndexWithStoreType("test", IndexStoreModule.Type.SIMPLEFS, "least_used"); storeString = getStoreDirectory("test", 0).toString(); logger.info(storeString); dataPaths = dataPaths(); - assertThat(storeString.toLowerCase(Locale.ROOT), startsWith("store(least_used[simplefs(" + dataPaths[0].toAbsolutePath().toString().toLowerCase(Locale.ROOT))); + assertThat(storeString.toLowerCase(Locale.ROOT), startsWith("store(least_used[rate_limited(simplefs(" + dataPaths[0].toAbsolutePath().toString().toLowerCase(Locale.ROOT))); if (dataPaths.length > 1) { - assertThat(storeString.toLowerCase(Locale.ROOT), containsString("), simplefs(" + dataPaths[1].toAbsolutePath().toString().toLowerCase(Locale.ROOT))); + assertThat(storeString.toLowerCase(Locale.ROOT), containsString("), rate_limited(simplefs(" + dataPaths[1].toAbsolutePath().toString().toLowerCase(Locale.ROOT))); } + assertThat(storeString, endsWith(", type=MERGE, rate=20.0)])")); createIndexWithStoreType("test", IndexStoreModule.Type.DEFAULT, "least_used"); storeString = getStoreDirectory("test", 0).toString(); logger.info(storeString); dataPaths = dataPaths(); - assertThat(storeString.toLowerCase(Locale.ROOT), startsWith("store(least_used[default(mmapfs(" + dataPaths[0].toAbsolutePath().toString().toLowerCase(Locale.ROOT))); + assertThat(storeString.toLowerCase(Locale.ROOT), startsWith("store(least_used[rate_limited(default(mmapfs(" + dataPaths[0].toAbsolutePath().toString().toLowerCase(Locale.ROOT))); assertThat(storeString.toLowerCase(Locale.ROOT), containsString("),niofs(" + dataPaths[0].toAbsolutePath().toString().toLowerCase(Locale.ROOT))); if (dataPaths.length > 1) { - assertThat(storeString.toLowerCase(Locale.ROOT), containsString("), default(mmapfs(" + dataPaths[1].toAbsolutePath().toString().toLowerCase(Locale.ROOT))); + assertThat(storeString.toLowerCase(Locale.ROOT), containsString("), rate_limited(default(mmapfs(" + dataPaths[1].toAbsolutePath().toString().toLowerCase(Locale.ROOT))); } + assertThat(storeString, endsWith(", type=MERGE, rate=20.0)])")); + + createIndexWithoutRateLimitingStoreType("test", IndexStoreModule.Type.NIOFS, "least_used"); + storeString = getStoreDirectory("test", 0).toString(); + logger.info(storeString); + dataPaths = dataPaths(); + assertThat(storeString.toLowerCase(Locale.ROOT), startsWith("store(least_used[niofs(" + dataPaths[0].toAbsolutePath().toString().toLowerCase(Locale.ROOT))); + if (dataPaths.length > 1) { + assertThat(storeString.toLowerCase(Locale.ROOT), containsString("), niofs(" + dataPaths[1].toAbsolutePath().toString().toLowerCase(Locale.ROOT))); + } + assertThat(storeString, endsWith(")])")); } private void createIndexWithStoreType(String index, IndexStoreModule.Type storeType, String distributor) { @@ -106,11 +121,28 @@ public class SimpleDistributorTests extends ElasticsearchIntegrationTest { .put("index.store.type", storeType.name()) .put("index.number_of_replicas", 0) .put("index.number_of_shards", 1) + .put("index.store.throttle.type", "merge") + .put("index.store.throttle.max_bytes_per_sec", "20mb") ) .execute().actionGet(); assertThat(client().admin().cluster().prepareHealth("test").setWaitForGreenStatus().execute().actionGet().isTimedOut(), equalTo(false)); } + private void createIndexWithoutRateLimitingStoreType(String index, IndexStoreModule.Type storeType, String distributor) { + cluster().wipeIndices(index); + client().admin().indices().prepareCreate(index) + .setSettings(settingsBuilder() + .put("index.store.distributor", distributor) + .put("index.store.type", storeType) + .put("index.store.throttle.type", "none") + .put("index.number_of_replicas", 0) + .put("index.number_of_shards", 1) + ) + .execute().actionGet(); + assertThat(client().admin().cluster().prepareHealth("test").setWaitForGreenStatus().execute().actionGet().isTimedOut(), equalTo(false)); + } + + private Path[] dataPaths() { Set nodes = internalCluster().nodesInclude("test"); assertThat(nodes.isEmpty(), equalTo(false)); diff --git a/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreTests.java b/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreTests.java index 7c3a00ed0e2..fad835d5e24 100644 --- a/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreTests.java +++ b/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreTests.java @@ -633,6 +633,11 @@ public class DedicatedClusterSnapshotRestoreTests extends AbstractSnapshotTests asyncIndexThreads[i].join(); } + logger.info("--> update index settings to back to normal"); + assertAcked(client().admin().indices().prepareUpdateSettings("test-*").setSettings(ImmutableSettings.builder() + .put(AbstractIndexStore.INDEX_STORE_THROTTLE_TYPE, "node") + )); + // Make sure that snapshot finished - doesn't matter if it failed or succeeded try { CreateSnapshotResponse snapshotResponse = snapshotResponseFuture.get(); @@ -674,6 +679,11 @@ public class DedicatedClusterSnapshotRestoreTests extends AbstractSnapshotTests for (int i = 0; i < between(10, 500); i++) { index(name, "doc", Integer.toString(i), "foo", "bar" + i); } + + assertAcked(client().admin().indices().prepareUpdateSettings(name).setSettings(ImmutableSettings.builder() + .put(AbstractIndexStore.INDEX_STORE_THROTTLE_TYPE, "all") + .put(AbstractIndexStore.INDEX_STORE_THROTTLE_MAX_BYTES_PER_SEC, between(100, 50000)) + )); } public static abstract class TestCustomMetaData implements MetaData.Custom { diff --git a/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreTests.java b/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreTests.java index 81d14aea5da..08bc0d3c1fc 100644 --- a/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreTests.java +++ b/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreTests.java @@ -1361,6 +1361,12 @@ public class SharedClusterSnapshotRestoreTests extends AbstractSnapshotTests { refresh(); assertThat(client.prepareCount("test-idx").get().getCount(), equalTo(100L)); + // Update settings to make sure that relocation is slow so we can start snapshot before relocation is finished + assertAcked(client.admin().indices().prepareUpdateSettings("test-idx").setSettings(ImmutableSettings.builder() + .put(AbstractIndexStore.INDEX_STORE_THROTTLE_TYPE, "all") + .put(AbstractIndexStore.INDEX_STORE_THROTTLE_MAX_BYTES_PER_SEC, 100) + )); + logger.info("--> start relocations"); allowNodes("test-idx", internalCluster().numDataNodes()); @@ -1371,6 +1377,11 @@ public class SharedClusterSnapshotRestoreTests extends AbstractSnapshotTests { logger.info("--> snapshot"); client.admin().cluster().prepareCreateSnapshot("test-repo", "test-snap").setWaitForCompletion(false).setIndices("test-idx").get(); + // Update settings to back to normal + assertAcked(client.admin().indices().prepareUpdateSettings("test-idx").setSettings(ImmutableSettings.builder() + .put(AbstractIndexStore.INDEX_STORE_THROTTLE_TYPE, "node") + )); + logger.info("--> wait for snapshot to complete"); SnapshotInfo snapshotInfo = waitForCompletion("test-repo", "test-snap", TimeValue.timeValueSeconds(600)); assertThat(snapshotInfo.state(), equalTo(SnapshotState.SUCCESS)); diff --git a/src/test/java/org/elasticsearch/test/ElasticsearchIntegrationTest.java b/src/test/java/org/elasticsearch/test/ElasticsearchIntegrationTest.java index 6f4f0e94a4b..14c1aa06302 100644 --- a/src/test/java/org/elasticsearch/test/ElasticsearchIntegrationTest.java +++ b/src/test/java/org/elasticsearch/test/ElasticsearchIntegrationTest.java @@ -27,6 +27,7 @@ import com.google.common.base.Joiner; import com.google.common.base.Predicate; import com.google.common.collect.Lists; +import org.apache.lucene.store.StoreRateLimiting; import org.apache.lucene.util.AbstractRandomizedTest; import org.apache.lucene.util.IOUtils; import org.apache.lucene.util.TestUtil; @@ -429,11 +430,23 @@ public abstract class ElasticsearchIntegrationTest extends ElasticsearchTestCase setRandomTranslogSettings(random, builder); setRandomNormsLoading(random, builder); setRandomScriptingSettings(random, builder); + if (random.nextBoolean()) { + if (random.nextInt(10) == 0) { // do something crazy slow here + builder.put(IndicesStore.INDICES_STORE_THROTTLE_MAX_BYTES_PER_SEC, new ByteSizeValue(RandomInts.randomIntBetween(random, 1, 10), ByteSizeUnit.MB)); + } else { + builder.put(IndicesStore.INDICES_STORE_THROTTLE_MAX_BYTES_PER_SEC, new ByteSizeValue(RandomInts.randomIntBetween(random, 10, 200), ByteSizeUnit.MB)); + } + } + if (random.nextBoolean()) { + builder.put(IndicesStore.INDICES_STORE_THROTTLE_TYPE, RandomPicks.randomFrom(random, StoreRateLimiting.Type.values())); + } if (random.nextBoolean()) { builder.put(StoreModule.DISTIBUTOR_KEY, random.nextBoolean() ? StoreModule.LEAST_USED_DISTRIBUTOR : StoreModule.RANDOM_WEIGHT_DISTRIBUTOR); } - + if (random.nextBoolean()) { + builder.put(ConcurrentMergeSchedulerProvider.AUTO_THROTTLE, false); + } if (random.nextBoolean()) { if (random.nextInt(10) == 0) { // do something crazy slow here diff --git a/src/test/java/org/elasticsearch/test/store/MockFSDirectoryService.java b/src/test/java/org/elasticsearch/test/store/MockFSDirectoryService.java index fccd693866b..bb2a262c278 100644 --- a/src/test/java/org/elasticsearch/test/store/MockFSDirectoryService.java +++ b/src/test/java/org/elasticsearch/test/store/MockFSDirectoryService.java @@ -24,6 +24,7 @@ import org.apache.lucene.index.CheckIndex; import org.apache.lucene.index.IndexWriter; import org.apache.lucene.store.Directory; import org.apache.lucene.store.LockFactory; +import org.apache.lucene.store.StoreRateLimiting; import org.apache.lucene.util.AbstractRandomizedTest; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.inject.Inject; @@ -150,6 +151,21 @@ public class MockFSDirectoryService extends FsDirectoryService { } } + @Override + public void onPause(long nanos) { + delegateService.onPause(nanos); + } + + @Override + public StoreRateLimiting rateLimiting() { + return delegateService.rateLimiting(); + } + + @Override + public long throttleTimeInNanos() { + return delegateService.throttleTimeInNanos(); + } + @Override public Directory newFromDistributor(Distributor distributor) throws IOException { return helper.wrap(super.newFromDistributor(distributor));