Store Throttling (node level and/or index level) with options on merge or all, closes #2041.

Allow to configure store throttling (only applied on file system based storage), which allows to control the maximum bytes per sec written to the file system. It can be configured to only apply while merging, or on all output operations. The setting can eb set on the node level (in which case the throttling is done across all shards allocated on the node), or index level, in which case it only applied to that index.

The node level settings are indices.store.throttle.type to set the type, with values of none, merge and all (defaults to none). And, also, indices.store.throttle.max_bytes_per_sec (defaults to 0), which can be set to something like 1mb.

The index level settings is index.store.throttle.type for the type, with values of node, none, merge, and all. Defaults to node which will use the "shared" throttling on the node level. And, index.store.throttle.max_bytes_per_sec (defaults to 0).
This commit is contained in:
Shay Banon 2012-06-21 21:20:18 +02:00
parent 9e6cfa77a5
commit 90371beedc
29 changed files with 561 additions and 29 deletions

View File

@ -1,3 +1,22 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.lucene.index; package org.apache.lucene.index;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.util.concurrent.ConcurrentCollections;

View File

@ -0,0 +1,76 @@
package org.apache.lucene.store;
import org.elasticsearch.ElasticSearchIllegalArgumentException;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.RateLimiter;
import org.elasticsearch.common.unit.ByteSizeValue;
/**
*/
public class StoreRateLimiting {
public static interface Provider {
StoreRateLimiting rateLimiting();
}
public interface Listener {
void onPause(long nanos);
}
public static enum Type {
NONE,
MERGE,
ALL;
public static Type fromString(String type) throws ElasticSearchIllegalArgumentException {
if ("none".equalsIgnoreCase(type)) {
return NONE;
} else if ("merge".equalsIgnoreCase(type)) {
return MERGE;
} else if ("all".equalsIgnoreCase(type)) {
return ALL;
}
throw new ElasticSearchIllegalArgumentException("rate limiting type [" + type + "] not valid, can be one of [all|merge|none]");
}
}
private final RateLimiter rateLimiter = new RateLimiter(0);
private volatile RateLimiter actualRateLimiter;
private volatile Type type;
public StoreRateLimiting() {
}
@Nullable
public RateLimiter getRateLimiter() {
return actualRateLimiter;
}
public void setMaxRate(ByteSizeValue rate) {
if (rate.bytes() <= 0) {
actualRateLimiter = null;
} else if (actualRateLimiter == null) {
actualRateLimiter = rateLimiter;
actualRateLimiter.setMaxRate(rate.mbFrac());
} else {
assert rateLimiter == actualRateLimiter;
rateLimiter.setMaxRate(rate.mbFrac());
}
}
public Type getType() {
return type;
}
public void setType(Type type) {
this.type = type;
}
public void setType(String type) throws ElasticSearchIllegalArgumentException {
this.type = Type.fromString(type);
}
}

View File

@ -0,0 +1,26 @@
package org.apache.lucene.store;
import org.elasticsearch.common.RateLimiter;
import java.io.IOException;
/**
*/
class XFSIndexOutput extends FSDirectory.FSIndexOutput {
private final RateLimiter rateLimiter;
private final StoreRateLimiting.Listener rateListener;
XFSIndexOutput(FSDirectory parent, String name, RateLimiter rateLimiter, StoreRateLimiting.Listener rateListener) throws IOException {
super(parent, name);
this.rateLimiter = rateLimiter;
this.rateListener = rateListener;
}
@Override
public void flushBuffer(byte[] b, int offset, int size) throws IOException {
rateListener.onPause(rateLimiter.pause(size));
super.flushBuffer(b, offset, size);
}
}

View File

@ -0,0 +1,64 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.lucene.store;
import org.apache.lucene.index.TrackingMergeScheduler;
import org.elasticsearch.common.RateLimiter;
import java.io.File;
import java.io.IOException;
/**
*/
public class XMMapFSDirectory extends NIOFSDirectory {
private final StoreRateLimiting.Provider rateLimitingProvider;
private final StoreRateLimiting.Listener rateListener;
public XMMapFSDirectory(File path, LockFactory lockFactory, StoreRateLimiting.Provider rateLimitingProvider, StoreRateLimiting.Listener rateListener) throws IOException {
super(path, lockFactory);
this.rateLimitingProvider = rateLimitingProvider;
this.rateListener = rateListener;
}
@Override
public IndexOutput createOutput(String name) throws IOException {
StoreRateLimiting rateLimiting = rateLimitingProvider.rateLimiting();
StoreRateLimiting.Type type = rateLimiting.getType();
RateLimiter limiter = rateLimiting.getRateLimiter();
if (type == StoreRateLimiting.Type.NONE || limiter == null) {
return super.createOutput(name);
}
if (TrackingMergeScheduler.getCurrentMerge() != null) {
// we are mering, and type is either MERGE or ALL, rate limit...
ensureOpen();
ensureCanWrite(name);
return new XFSIndexOutput(this, name, limiter, rateListener);
}
if (type == StoreRateLimiting.Type.ALL) {
ensureOpen();
ensureCanWrite(name);
return new XFSIndexOutput(this, name, limiter, rateListener);
}
// we shouldn't really get here...
return super.createOutput(name);
}
}

View File

@ -0,0 +1,64 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.lucene.store;
import org.apache.lucene.index.TrackingMergeScheduler;
import org.elasticsearch.common.RateLimiter;
import java.io.File;
import java.io.IOException;
/**
*/
public class XNIOFSDirectory extends NIOFSDirectory {
private final StoreRateLimiting.Provider rateLimitingProvider;
private final StoreRateLimiting.Listener rateListener;
public XNIOFSDirectory(File path, LockFactory lockFactory, StoreRateLimiting.Provider rateLimitingProvider, StoreRateLimiting.Listener rateListener) throws IOException {
super(path, lockFactory);
this.rateLimitingProvider = rateLimitingProvider;
this.rateListener = rateListener;
}
@Override
public IndexOutput createOutput(String name) throws IOException {
StoreRateLimiting rateLimiting = rateLimitingProvider.rateLimiting();
StoreRateLimiting.Type type = rateLimiting.getType();
RateLimiter limiter = rateLimiting.getRateLimiter();
if (type == StoreRateLimiting.Type.NONE || limiter == null) {
return super.createOutput(name);
}
if (TrackingMergeScheduler.getCurrentMerge() != null) {
// we are mering, and type is either MERGE or ALL, rate limit...
ensureOpen();
ensureCanWrite(name);
return new XFSIndexOutput(this, name, limiter, rateListener);
}
if (type == StoreRateLimiting.Type.ALL) {
ensureOpen();
ensureCanWrite(name);
return new XFSIndexOutput(this, name, limiter, rateListener);
}
// we shouldn't really get here...
return super.createOutput(name);
}
}

View File

@ -0,0 +1,64 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.lucene.store;
import org.apache.lucene.index.TrackingMergeScheduler;
import org.elasticsearch.common.RateLimiter;
import java.io.File;
import java.io.IOException;
/**
*/
public class XSimpleFSDirectory extends SimpleFSDirectory {
private final StoreRateLimiting.Provider rateLimitingProvider;
private final StoreRateLimiting.Listener rateListener;
public XSimpleFSDirectory(File path, LockFactory lockFactory, StoreRateLimiting.Provider rateLimitingProvider, StoreRateLimiting.Listener rateListener) throws IOException {
super(path, lockFactory);
this.rateLimitingProvider = rateLimitingProvider;
this.rateListener = rateListener;
}
@Override
public IndexOutput createOutput(String name) throws IOException {
StoreRateLimiting rateLimiting = rateLimitingProvider.rateLimiting();
StoreRateLimiting.Type type = rateLimiting.getType();
RateLimiter limiter = rateLimiting.getRateLimiter();
if (type == StoreRateLimiting.Type.NONE || limiter == null) {
return super.createOutput(name);
}
if (TrackingMergeScheduler.getCurrentMerge() != null) {
// we are mering, and type is either MERGE or ALL, rate limit...
ensureOpen();
ensureCanWrite(name);
return new XFSIndexOutput(this, name, limiter, rateListener);
}
if (type == StoreRateLimiting.Type.ALL) {
ensureOpen();
ensureCanWrite(name);
return new XFSIndexOutput(this, name, limiter, rateListener);
}
// we shouldn't really get here...
return super.createOutput(name);
}
}

View File

@ -53,7 +53,7 @@ public class RateLimiter {
* might exceed the target). It's best to call this * might exceed the target). It's best to call this
* with a biggish count, not one byte at a time. * with a biggish count, not one byte at a time.
*/ */
public void pause(long bytes) { public long pause(long bytes) {
// TODO: this is purely instantenous rate; maybe we // TODO: this is purely instantenous rate; maybe we
// should also offer decayed recent history one? // should also offer decayed recent history one?
@ -65,11 +65,13 @@ public class RateLimiter {
// While loop because Thread.sleep doesn't alway sleep // While loop because Thread.sleep doesn't alway sleep
// enough: // enough:
long totalPauseTime = 0;
while (true) { while (true) {
final long pauseNS = targetNS - curNS; final long pauseNS = targetNS - curNS;
if (pauseNS > 0) { if (pauseNS > 0) {
try { try {
Thread.sleep((int) (pauseNS / 1000000), (int) (pauseNS % 1000000)); Thread.sleep((int) (pauseNS / 1000000), (int) (pauseNS % 1000000));
totalPauseTime += pauseNS;
} catch (InterruptedException ie) { } catch (InterruptedException ie) {
throw new ElasticSearchInterruptedException("interrupted while rate limiting", ie); throw new ElasticSearchInterruptedException("interrupted while rate limiting", ie);
} }
@ -78,5 +80,6 @@ public class RateLimiter {
} }
break; break;
} }
return totalPauseTime;
} }
} }

View File

@ -32,6 +32,7 @@ import org.elasticsearch.index.gateway.IndexGateway;
import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.percolator.PercolatorService; import org.elasticsearch.index.percolator.PercolatorService;
import org.elasticsearch.index.query.IndexQueryParserService; import org.elasticsearch.index.query.IndexQueryParserService;
import org.elasticsearch.index.settings.IndexSettingsService;
import org.elasticsearch.index.shard.service.IndexShard; import org.elasticsearch.index.shard.service.IndexShard;
import org.elasticsearch.index.similarity.SimilarityService; import org.elasticsearch.index.similarity.SimilarityService;
import org.elasticsearch.index.store.IndexStore; import org.elasticsearch.index.store.IndexStore;
@ -47,6 +48,8 @@ public interface IndexService extends IndexComponent, Iterable<IndexShard> {
IndexCache cache(); IndexCache cache();
IndexSettingsService settingsService();
PercolatorService percolateService(); PercolatorService percolateService();
AnalysisService analysisService(); AnalysisService analysisService();

View File

@ -52,6 +52,7 @@ import org.elasticsearch.index.percolator.PercolatorService;
import org.elasticsearch.index.query.IndexQueryParserService; import org.elasticsearch.index.query.IndexQueryParserService;
import org.elasticsearch.index.search.stats.ShardSearchModule; import org.elasticsearch.index.search.stats.ShardSearchModule;
import org.elasticsearch.index.settings.IndexSettings; import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.settings.IndexSettingsService;
import org.elasticsearch.index.shard.IndexShardCreationException; import org.elasticsearch.index.shard.IndexShardCreationException;
import org.elasticsearch.index.shard.IndexShardManagement; import org.elasticsearch.index.shard.IndexShardManagement;
import org.elasticsearch.index.shard.IndexShardModule; import org.elasticsearch.index.shard.IndexShardModule;
@ -117,6 +118,8 @@ public class InternalIndexService extends AbstractIndexComponent implements Inde
private final IndexStore indexStore; private final IndexStore indexStore;
private final IndexSettingsService settingsService;
private volatile ImmutableMap<Integer, Injector> shardsInjectors = ImmutableMap.of(); private volatile ImmutableMap<Integer, Injector> shardsInjectors = ImmutableMap.of();
private volatile ImmutableMap<Integer, IndexShard> shards = ImmutableMap.of(); private volatile ImmutableMap<Integer, IndexShard> shards = ImmutableMap.of();
@ -127,7 +130,7 @@ public class InternalIndexService extends AbstractIndexComponent implements Inde
public InternalIndexService(Injector injector, Index index, @IndexSettings Settings indexSettings, NodeEnvironment nodeEnv, ThreadPool threadPool, public InternalIndexService(Injector injector, Index index, @IndexSettings Settings indexSettings, NodeEnvironment nodeEnv, ThreadPool threadPool,
PercolatorService percolatorService, AnalysisService analysisService, MapperService mapperService, PercolatorService percolatorService, AnalysisService analysisService, MapperService mapperService,
IndexQueryParserService queryParserService, SimilarityService similarityService, IndexAliasesService aliasesService, IndexQueryParserService queryParserService, SimilarityService similarityService, IndexAliasesService aliasesService,
IndexCache indexCache, IndexEngine indexEngine, IndexGateway indexGateway, IndexStore indexStore) { IndexCache indexCache, IndexEngine indexEngine, IndexGateway indexGateway, IndexStore indexStore, IndexSettingsService settingsService) {
super(index, indexSettings); super(index, indexSettings);
this.injector = injector; this.injector = injector;
this.nodeEnv = nodeEnv; this.nodeEnv = nodeEnv;
@ -143,6 +146,7 @@ public class InternalIndexService extends AbstractIndexComponent implements Inde
this.indexEngine = indexEngine; this.indexEngine = indexEngine;
this.indexGateway = indexGateway; this.indexGateway = indexGateway;
this.indexStore = indexStore; this.indexStore = indexStore;
this.settingsService = settingsService;
this.pluginsService = injector.getInstance(PluginsService.class); this.pluginsService = injector.getInstance(PluginsService.class);
this.indicesLifecycle = (InternalIndicesLifecycle) injector.getInstance(IndicesLifecycle.class); this.indicesLifecycle = (InternalIndicesLifecycle) injector.getInstance(IndicesLifecycle.class);
@ -192,6 +196,11 @@ public class InternalIndexService extends AbstractIndexComponent implements Inde
return indexGateway; return indexGateway;
} }
@Override
public IndexSettingsService settingsService() {
return this.settingsService;
}
@Override @Override
public IndexStore store() { public IndexStore store() {
return indexStore; return indexStore;

View File

@ -29,6 +29,8 @@ public interface DirectoryService {
Directory[] build() throws IOException; Directory[] build() throws IOException;
long throttleTimeInNanos();
void renameFile(Directory dir, String from, String to) throws IOException; void renameFile(Directory dir, String from, String to) throws IOException;
void fullDelete(Directory dir) throws IOException; void fullDelete(Directory dir) throws IOException;

View File

@ -19,24 +19,32 @@
package org.elasticsearch.index.store; package org.elasticsearch.index.store;
import org.apache.lucene.store.StoreRateLimiting;
import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.index.IndexComponent; import org.elasticsearch.index.CloseableIndexComponent;
import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.store.IndicesStore;
import java.io.IOException; import java.io.IOException;
/** /**
* Index store is an index level information of the {@link Store} each shard will use. * Index store is an index level information of the {@link Store} each shard will use.
*
*
*/ */
public interface IndexStore extends IndexComponent { public interface IndexStore extends CloseableIndexComponent {
/** /**
* Is the store a persistent store that can survive full restarts. * Is the store a persistent store that can survive full restarts.
*/ */
boolean persistent(); boolean persistent();
IndicesStore indicesStore();
/**
* Returns the rate limiting, either of the index is explicitly configured, or
* the node level one (defaults to the node level one).
*/
StoreRateLimiting rateLimiting();
/** /**
* The shard store class that should be used for each shard. * The shard store class that should be used for each shard.
*/ */

View File

@ -173,7 +173,7 @@ public class Store extends AbstractIndexShardComponent {
} }
public StoreStats stats() throws IOException { public StoreStats stats() throws IOException {
return new StoreStats(Directories.estimateSize(directory)); return new StoreStats(Directories.estimateSize(directory), directoryService.throttleTimeInNanos());
} }
public ByteSizeValue estimateSize() throws IOException { public ByteSizeValue estimateSize() throws IOException {

View File

@ -23,6 +23,7 @@ import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable; import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentBuilderString; import org.elasticsearch.common.xcontent.XContentBuilderString;
@ -35,12 +36,15 @@ public class StoreStats implements Streamable, ToXContent {
private long sizeInBytes; private long sizeInBytes;
private long throttleTimeInNanos;
public StoreStats() { public StoreStats() {
} }
public StoreStats(long sizeInBytes) { public StoreStats(long sizeInBytes, long throttleTimeInNanos) {
this.sizeInBytes = sizeInBytes; this.sizeInBytes = sizeInBytes;
this.throttleTimeInNanos = throttleTimeInNanos;
} }
public void add(StoreStats stats) { public void add(StoreStats stats) {
@ -48,6 +52,7 @@ public class StoreStats implements Streamable, ToXContent {
return; return;
} }
sizeInBytes += stats.sizeInBytes; sizeInBytes += stats.sizeInBytes;
throttleTimeInNanos += stats.throttleTimeInNanos;
} }
@ -67,6 +72,14 @@ public class StoreStats implements Streamable, ToXContent {
return size(); return size();
} }
public TimeValue throttleTime() {
return TimeValue.timeValueNanos(throttleTimeInNanos);
}
public TimeValue getThrottleTime() {
return throttleTime();
}
public static StoreStats readStoreStats(StreamInput in) throws IOException { public static StoreStats readStoreStats(StreamInput in) throws IOException {
StoreStats store = new StoreStats(); StoreStats store = new StoreStats();
store.readFrom(in); store.readFrom(in);
@ -76,11 +89,13 @@ public class StoreStats implements Streamable, ToXContent {
@Override @Override
public void readFrom(StreamInput in) throws IOException { public void readFrom(StreamInput in) throws IOException {
sizeInBytes = in.readVLong(); sizeInBytes = in.readVLong();
throttleTimeInNanos = in.readVLong();
} }
@Override @Override
public void writeTo(StreamOutput out) throws IOException { public void writeTo(StreamOutput out) throws IOException {
out.writeVLong(sizeInBytes); out.writeVLong(sizeInBytes);
out.writeVLong(throttleTimeInNanos);
} }
@Override @Override
@ -88,6 +103,8 @@ public class StoreStats implements Streamable, ToXContent {
builder.startObject(Fields.STORE); builder.startObject(Fields.STORE);
builder.field(Fields.SIZE, size().toString()); builder.field(Fields.SIZE, size().toString());
builder.field(Fields.SIZE_IN_BYTES, sizeInBytes); builder.field(Fields.SIZE_IN_BYTES, sizeInBytes);
builder.field(Fields.THROTTLE_TIME, throttleTime().toString());
builder.field(Fields.THROTTLE_TIME_IN_MILLIS, throttleTime().millis());
builder.endObject(); builder.endObject();
return builder; return builder;
} }
@ -96,5 +113,8 @@ public class StoreStats implements Streamable, ToXContent {
static final XContentBuilderString STORE = new XContentBuilderString("store"); static final XContentBuilderString STORE = new XContentBuilderString("store");
static final XContentBuilderString SIZE = new XContentBuilderString("size"); static final XContentBuilderString SIZE = new XContentBuilderString("size");
static final XContentBuilderString SIZE_IN_BYTES = new XContentBuilderString("size_in_bytes"); static final XContentBuilderString SIZE_IN_BYTES = new XContentBuilderString("size_in_bytes");
static final XContentBuilderString THROTTLE_TIME = new XContentBuilderString("throttle_time");
static final XContentBuilderString THROTTLE_TIME_IN_MILLIS = new XContentBuilderString("throttle_time_in_millis");
} }
} }

View File

@ -21,6 +21,7 @@ package org.elasticsearch.index.store.fs;
import org.apache.lucene.store.*; import org.apache.lucene.store.*;
import org.elasticsearch.common.io.FileSystemUtils; import org.elasticsearch.common.io.FileSystemUtils;
import org.elasticsearch.common.metrics.CounterMetric;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.settings.IndexSettings; import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.shard.AbstractIndexShardComponent; import org.elasticsearch.index.shard.AbstractIndexShardComponent;
@ -35,15 +36,27 @@ import java.io.InterruptedIOException;
/** /**
*/ */
public abstract class FsDirectoryService extends AbstractIndexShardComponent implements DirectoryService { public abstract class FsDirectoryService extends AbstractIndexShardComponent implements DirectoryService, StoreRateLimiting.Listener, StoreRateLimiting.Provider {
protected final FsIndexStore indexStore; protected final FsIndexStore indexStore;
private final CounterMetric rateLimitingTimeInNanos = new CounterMetric();
public FsDirectoryService(ShardId shardId, @IndexSettings Settings indexSettings, IndexStore indexStore) { public FsDirectoryService(ShardId shardId, @IndexSettings Settings indexSettings, IndexStore indexStore) {
super(shardId, indexSettings); super(shardId, indexSettings);
this.indexStore = (FsIndexStore) indexStore; this.indexStore = (FsIndexStore) indexStore;
} }
@Override
public long throttleTimeInNanos() {
return rateLimitingTimeInNanos.count();
}
@Override
public StoreRateLimiting rateLimiting() {
return indexStore.rateLimiting();
}
protected LockFactory buildLockFactory() throws IOException { protected LockFactory buildLockFactory() throws IOException {
String fsLock = componentSettings.get("lock", componentSettings.get("fs_lock", "native")); String fsLock = componentSettings.get("lock", componentSettings.get("fs_lock", "native"));
LockFactory lockFactory = NoLockFactory.getNoLockFactory(); LockFactory lockFactory = NoLockFactory.getNoLockFactory();
@ -98,4 +111,9 @@ public abstract class FsDirectoryService extends AbstractIndexShardComponent imp
FileSystemUtils.deleteRecursively(fsDirectory.getDirectory().getParentFile()); FileSystemUtils.deleteRecursively(fsDirectory.getDirectory().getParentFile());
} }
} }
@Override
public void onPause(long nanos) {
rateLimitingTimeInNanos.inc(nanos);
}
} }

View File

@ -29,6 +29,7 @@ import org.elasticsearch.index.service.IndexService;
import org.elasticsearch.index.settings.IndexSettings; import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.support.AbstractIndexStore; import org.elasticsearch.index.store.support.AbstractIndexStore;
import org.elasticsearch.indices.store.IndicesStore;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
@ -42,8 +43,8 @@ public abstract class FsIndexStore extends AbstractIndexStore {
private final File[] locations; private final File[] locations;
public FsIndexStore(Index index, @IndexSettings Settings indexSettings, IndexService indexService, NodeEnvironment nodeEnv) { public FsIndexStore(Index index, @IndexSettings Settings indexSettings, IndexService indexService, IndicesStore indicesStore, NodeEnvironment nodeEnv) {
super(index, indexSettings, indexService); super(index, indexSettings, indexService, indicesStore);
this.nodeEnv = nodeEnv; this.nodeEnv = nodeEnv;
if (nodeEnv.hasNodeFile()) { if (nodeEnv.hasNodeFile()) {
this.locations = nodeEnv.indexLocations(index); this.locations = nodeEnv.indexLocations(index);

View File

@ -20,7 +20,7 @@
package org.elasticsearch.index.store.fs; package org.elasticsearch.index.store.fs;
import org.apache.lucene.store.Directory; import org.apache.lucene.store.Directory;
import org.apache.lucene.store.MMapDirectory; import org.apache.lucene.store.XMMapFSDirectory;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.FileSystemUtils; import org.elasticsearch.common.io.FileSystemUtils;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
@ -46,7 +46,7 @@ public class MmapFsDirectoryService extends FsDirectoryService {
Directory[] dirs = new Directory[locations.length]; Directory[] dirs = new Directory[locations.length];
for (int i = 0; i < dirs.length; i++) { for (int i = 0; i < dirs.length; i++) {
FileSystemUtils.mkdirs(locations[i]); FileSystemUtils.mkdirs(locations[i]);
dirs[i] = new MMapDirectory(locations[i], buildLockFactory()); dirs[i] = new XMMapFSDirectory(locations[i], buildLockFactory(), this, this);
} }
return dirs; return dirs;
} }

View File

@ -26,6 +26,7 @@ import org.elasticsearch.index.Index;
import org.elasticsearch.index.service.IndexService; import org.elasticsearch.index.service.IndexService;
import org.elasticsearch.index.settings.IndexSettings; import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.store.DirectoryService; import org.elasticsearch.index.store.DirectoryService;
import org.elasticsearch.indices.store.IndicesStore;
/** /**
* *
@ -33,8 +34,8 @@ import org.elasticsearch.index.store.DirectoryService;
public class MmapFsIndexStore extends FsIndexStore { public class MmapFsIndexStore extends FsIndexStore {
@Inject @Inject
public MmapFsIndexStore(Index index, @IndexSettings Settings indexSettings, IndexService indexService, NodeEnvironment nodeEnv) { public MmapFsIndexStore(Index index, @IndexSettings Settings indexSettings, IndexService indexService, IndicesStore indicesStore, NodeEnvironment nodeEnv) {
super(index, indexSettings, indexService, nodeEnv); super(index, indexSettings, indexService, indicesStore, nodeEnv);
} }
@Override @Override

View File

@ -20,7 +20,7 @@
package org.elasticsearch.index.store.fs; package org.elasticsearch.index.store.fs;
import org.apache.lucene.store.Directory; import org.apache.lucene.store.Directory;
import org.apache.lucene.store.NIOFSDirectory; import org.apache.lucene.store.XNIOFSDirectory;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.FileSystemUtils; import org.elasticsearch.common.io.FileSystemUtils;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
@ -46,7 +46,7 @@ public class NioFsDirectoryService extends FsDirectoryService {
Directory[] dirs = new Directory[locations.length]; Directory[] dirs = new Directory[locations.length];
for (int i = 0; i < dirs.length; i++) { for (int i = 0; i < dirs.length; i++) {
FileSystemUtils.mkdirs(locations[i]); FileSystemUtils.mkdirs(locations[i]);
dirs[i] = new NIOFSDirectory(locations[i], buildLockFactory()); dirs[i] = new XNIOFSDirectory(locations[i], buildLockFactory(), this, this);
} }
return dirs; return dirs;
} }

View File

@ -26,6 +26,7 @@ import org.elasticsearch.index.Index;
import org.elasticsearch.index.service.IndexService; import org.elasticsearch.index.service.IndexService;
import org.elasticsearch.index.settings.IndexSettings; import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.store.DirectoryService; import org.elasticsearch.index.store.DirectoryService;
import org.elasticsearch.indices.store.IndicesStore;
/** /**
* *
@ -33,8 +34,8 @@ import org.elasticsearch.index.store.DirectoryService;
public class NioFsIndexStore extends FsIndexStore { public class NioFsIndexStore extends FsIndexStore {
@Inject @Inject
public NioFsIndexStore(Index index, @IndexSettings Settings indexSettings, IndexService indexService, NodeEnvironment nodeEnv) { public NioFsIndexStore(Index index, @IndexSettings Settings indexSettings, IndexService indexService, IndicesStore indicesStore, NodeEnvironment nodeEnv) {
super(index, indexSettings, indexService, nodeEnv); super(index, indexSettings, indexService, indicesStore, nodeEnv);
} }
@Override @Override

View File

@ -20,7 +20,7 @@
package org.elasticsearch.index.store.fs; package org.elasticsearch.index.store.fs;
import org.apache.lucene.store.Directory; import org.apache.lucene.store.Directory;
import org.apache.lucene.store.SimpleFSDirectory; import org.apache.lucene.store.XSimpleFSDirectory;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.FileSystemUtils; import org.elasticsearch.common.io.FileSystemUtils;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
@ -46,7 +46,7 @@ public class SimpleFsDirectoryService extends FsDirectoryService {
Directory[] dirs = new Directory[locations.length]; Directory[] dirs = new Directory[locations.length];
for (int i = 0; i < dirs.length; i++) { for (int i = 0; i < dirs.length; i++) {
FileSystemUtils.mkdirs(locations[i]); FileSystemUtils.mkdirs(locations[i]);
dirs[i] = new SimpleFSDirectory(locations[i], buildLockFactory()); dirs[i] = new XSimpleFSDirectory(locations[i], buildLockFactory(), this, this);
} }
return dirs; return dirs;
} }

View File

@ -26,6 +26,7 @@ import org.elasticsearch.index.Index;
import org.elasticsearch.index.service.IndexService; import org.elasticsearch.index.service.IndexService;
import org.elasticsearch.index.settings.IndexSettings; import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.store.DirectoryService; import org.elasticsearch.index.store.DirectoryService;
import org.elasticsearch.indices.store.IndicesStore;
/** /**
* *
@ -33,8 +34,8 @@ import org.elasticsearch.index.store.DirectoryService;
public class SimpleFsIndexStore extends FsIndexStore { public class SimpleFsIndexStore extends FsIndexStore {
@Inject @Inject
public SimpleFsIndexStore(Index index, @IndexSettings Settings indexSettings, IndexService indexService, NodeEnvironment nodeEnv) { public SimpleFsIndexStore(Index index, @IndexSettings Settings indexSettings, IndexService indexService, IndicesStore indicesStore, NodeEnvironment nodeEnv) {
super(index, indexSettings, indexService, nodeEnv); super(index, indexSettings, indexService, indicesStore, nodeEnv);
} }
@Override @Override

View File

@ -47,6 +47,11 @@ public class ByteBufferDirectoryService extends AbstractIndexShardComponent impl
this.byteBufferCache = byteBufferCache; this.byteBufferCache = byteBufferCache;
} }
@Override
public long throttleTimeInNanos() {
return 0;
}
@Override @Override
public Directory[] build() { public Directory[] build() {
return new Directory[]{new CustomByteBufferDirectory(byteBufferCache)}; return new Directory[]{new CustomByteBufferDirectory(byteBufferCache)};

View File

@ -29,6 +29,7 @@ import org.elasticsearch.index.service.IndexService;
import org.elasticsearch.index.settings.IndexSettings; import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.store.DirectoryService; import org.elasticsearch.index.store.DirectoryService;
import org.elasticsearch.index.store.support.AbstractIndexStore; import org.elasticsearch.index.store.support.AbstractIndexStore;
import org.elasticsearch.indices.store.IndicesStore;
import org.elasticsearch.monitor.jvm.JvmInfo; import org.elasticsearch.monitor.jvm.JvmInfo;
import org.elasticsearch.monitor.jvm.JvmStats; import org.elasticsearch.monitor.jvm.JvmStats;
@ -41,8 +42,8 @@ public class ByteBufferIndexStore extends AbstractIndexStore {
@Inject @Inject
public ByteBufferIndexStore(Index index, @IndexSettings Settings indexSettings, IndexService indexService, public ByteBufferIndexStore(Index index, @IndexSettings Settings indexSettings, IndexService indexService,
ByteBufferCache byteBufferCache) { ByteBufferCache byteBufferCache, IndicesStore indicesStore) {
super(index, indexSettings, indexService); super(index, indexSettings, indexService, indicesStore);
this.direct = byteBufferCache.direct(); this.direct = byteBufferCache.direct();
} }

View File

@ -41,6 +41,11 @@ public class RamDirectoryService extends AbstractIndexShardComponent implements
super(shardId, indexSettings); super(shardId, indexSettings);
} }
@Override
public long throttleTimeInNanos() {
return 0;
}
@Override @Override
public Directory[] build() { public Directory[] build() {
return new Directory[]{new CustomRAMDirectory()}; return new Directory[]{new CustomRAMDirectory()};

View File

@ -27,6 +27,7 @@ import org.elasticsearch.index.service.IndexService;
import org.elasticsearch.index.settings.IndexSettings; import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.store.DirectoryService; import org.elasticsearch.index.store.DirectoryService;
import org.elasticsearch.index.store.support.AbstractIndexStore; import org.elasticsearch.index.store.support.AbstractIndexStore;
import org.elasticsearch.indices.store.IndicesStore;
import org.elasticsearch.monitor.jvm.JvmInfo; import org.elasticsearch.monitor.jvm.JvmInfo;
import org.elasticsearch.monitor.jvm.JvmStats; import org.elasticsearch.monitor.jvm.JvmStats;
@ -36,8 +37,8 @@ import org.elasticsearch.monitor.jvm.JvmStats;
public class RamIndexStore extends AbstractIndexStore { public class RamIndexStore extends AbstractIndexStore {
@Inject @Inject
public RamIndexStore(Index index, @IndexSettings Settings indexSettings, IndexService indexService) { public RamIndexStore(Index index, @IndexSettings Settings indexSettings, IndexService indexService, IndicesStore indicesStore) {
super(index, indexSettings, indexService); super(index, indexSettings, indexService, indicesStore);
} }
@Override @Override

View File

@ -19,13 +19,19 @@
package org.elasticsearch.index.store.support; package org.elasticsearch.index.store.support;
import org.apache.lucene.store.StoreRateLimiting;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.index.AbstractIndexComponent; import org.elasticsearch.index.AbstractIndexComponent;
import org.elasticsearch.index.Index; import org.elasticsearch.index.Index;
import org.elasticsearch.index.service.IndexService; import org.elasticsearch.index.service.IndexService;
import org.elasticsearch.index.settings.IndexSettings; import org.elasticsearch.index.settings.IndexSettings;
import org.elasticsearch.index.settings.IndexSettingsService;
import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.store.IndexStore; import org.elasticsearch.index.store.IndexStore;
import org.elasticsearch.indices.store.IndicesStore;
import java.io.IOException; import java.io.IOException;
@ -34,11 +40,75 @@ import java.io.IOException;
*/ */
public abstract class AbstractIndexStore extends AbstractIndexComponent implements IndexStore { public abstract class AbstractIndexStore extends AbstractIndexComponent implements IndexStore {
static {
IndexMetaData.addDynamicSettings(
"index.store.throttle.type",
"index.store.throttle.max_bytes_per_sec"
);
}
class ApplySettings implements IndexSettingsService.Listener {
@Override
public void onRefreshSettings(Settings settings) {
String rateLimitingType = indexSettings.get("index.store.throttle.type", AbstractIndexStore.this.rateLimitingType);
if (!rateLimitingType.equals(AbstractIndexStore.this.rateLimitingType)) {
logger.info("updating index.store.throttle.type from [{}] to [{}]", AbstractIndexStore.this.rateLimitingType, rateLimitingType);
if (rateLimitingType.equalsIgnoreCase("node")) {
AbstractIndexStore.this.rateLimitingType = rateLimitingType;
AbstractIndexStore.this.nodeRateLimiting = true;
} else {
StoreRateLimiting.Type.fromString(rateLimitingType);
AbstractIndexStore.this.rateLimitingType = rateLimitingType;
AbstractIndexStore.this.nodeRateLimiting = false;
AbstractIndexStore.this.rateLimiting.setType(rateLimitingType);
}
}
ByteSizeValue rateLimitingThrottle = settings.getAsBytesSize("index.store.throttle.max_bytes_per_sec", AbstractIndexStore.this.rateLimitingThrottle);
if (!rateLimitingThrottle.equals(AbstractIndexStore.this.rateLimitingThrottle)) {
logger.info("updating index.store.throttle.max_bytes_per_sec from [{}] to [{}], note, type is [{}]", AbstractIndexStore.this.rateLimitingThrottle, rateLimitingThrottle, AbstractIndexStore.this.rateLimitingType);
AbstractIndexStore.this.rateLimitingThrottle = rateLimitingThrottle;
AbstractIndexStore.this.rateLimiting.setMaxRate(rateLimitingThrottle);
}
}
}
protected final IndexService indexService; protected final IndexService indexService;
protected AbstractIndexStore(Index index, @IndexSettings Settings indexSettings, IndexService indexService) { protected final IndicesStore indicesStore;
private volatile String rateLimitingType;
private volatile ByteSizeValue rateLimitingThrottle;
private volatile boolean nodeRateLimiting;
private final StoreRateLimiting rateLimiting = new StoreRateLimiting();
private final ApplySettings applySettings = new ApplySettings();
protected AbstractIndexStore(Index index, @IndexSettings Settings indexSettings, IndexService indexService, IndicesStore indicesStore) {
super(index, indexSettings); super(index, indexSettings);
this.indexService = indexService; this.indexService = indexService;
this.indicesStore = indicesStore;
this.rateLimitingType = indexSettings.get("index.store.throttle.type", "node");
if (rateLimitingType.equalsIgnoreCase("node")) {
nodeRateLimiting = true;
} else {
nodeRateLimiting = false;
rateLimiting.setType(rateLimitingType);
}
this.rateLimitingThrottle = indexSettings.getAsBytesSize("index.store.throttle.max_bytes_per_sec", new ByteSizeValue(0));
rateLimiting.setMaxRate(rateLimitingThrottle);
logger.debug("using index.store.throttle.type [{}], with index.store.throttle.max_bytes_per_sec [{}]", rateLimitingType, rateLimitingThrottle);
indexService.settingsService().addListener(applySettings);
}
@Override
public void close(boolean delete) throws ElasticSearchException {
indexService.settingsService().removeListener(applySettings);
} }
@Override @Override
@ -50,4 +120,14 @@ public abstract class AbstractIndexStore extends AbstractIndexComponent implemen
public void deleteUnallocated(ShardId shardId) throws IOException { public void deleteUnallocated(ShardId shardId) throws IOException {
// do nothing here... // do nothing here...
} }
@Override
public IndicesStore indicesStore() {
return indicesStore;
}
@Override
public StoreRateLimiting rateLimiting() {
return nodeRateLimiting ? indicesStore.rateLimiting() : this.rateLimiting;
}
} }

View File

@ -32,6 +32,7 @@ import org.elasticsearch.indices.query.IndicesQueriesModule;
import org.elasticsearch.indices.recovery.RecoverySettings; import org.elasticsearch.indices.recovery.RecoverySettings;
import org.elasticsearch.indices.recovery.RecoverySource; import org.elasticsearch.indices.recovery.RecoverySource;
import org.elasticsearch.indices.recovery.RecoveryTarget; import org.elasticsearch.indices.recovery.RecoveryTarget;
import org.elasticsearch.indices.store.IndicesStore;
import org.elasticsearch.indices.store.TransportNodesListShardStoreMetaData; import org.elasticsearch.indices.store.TransportNodesListShardStoreMetaData;
import org.elasticsearch.indices.ttl.IndicesTTLService; import org.elasticsearch.indices.ttl.IndicesTTLService;
import org.elasticsearch.indices.warmer.IndicesWarmer; import org.elasticsearch.indices.warmer.IndicesWarmer;
@ -63,6 +64,7 @@ public class IndicesModule extends AbstractModule implements SpawnModules {
bind(RecoveryTarget.class).asEagerSingleton(); bind(RecoveryTarget.class).asEagerSingleton();
bind(RecoverySource.class).asEagerSingleton(); bind(RecoverySource.class).asEagerSingleton();
bind(IndicesStore.class).asEagerSingleton();
bind(IndicesClusterStateService.class).asEagerSingleton(); bind(IndicesClusterStateService.class).asEagerSingleton();
bind(IndexingMemoryController.class).asEagerSingleton(); bind(IndexingMemoryController.class).asEagerSingleton();
bind(IndicesFilterCache.class).asEagerSingleton(); bind(IndicesFilterCache.class).asEagerSingleton();

View File

@ -62,6 +62,7 @@ import org.elasticsearch.index.shard.DocsStats;
import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.service.IndexShard; import org.elasticsearch.index.shard.service.IndexShard;
import org.elasticsearch.index.similarity.SimilarityModule; import org.elasticsearch.index.similarity.SimilarityModule;
import org.elasticsearch.index.store.IndexStore;
import org.elasticsearch.index.store.IndexStoreModule; import org.elasticsearch.index.store.IndexStoreModule;
import org.elasticsearch.index.store.StoreStats; import org.elasticsearch.index.store.StoreStats;
import org.elasticsearch.indices.analysis.IndicesAnalysisService; import org.elasticsearch.indices.analysis.IndicesAnalysisService;
@ -349,6 +350,8 @@ public class InternalIndicesService extends AbstractLifecycleComponent<IndicesSe
indexInjector.getInstance(MapperService.class).close(); indexInjector.getInstance(MapperService.class).close();
indexInjector.getInstance(IndexQueryParserService.class).close(); indexInjector.getInstance(IndexQueryParserService.class).close();
indexInjector.getInstance(IndexStore.class).close(delete);
Injectors.close(injector); Injectors.close(injector);
indicesLifecycle.afterIndexClosed(indexService.index(), delete); indicesLifecycle.afterIndexClosed(indexService.index(), delete);

View File

@ -19,14 +19,17 @@
package org.elasticsearch.indices.store; package org.elasticsearch.indices.store;
import org.apache.lucene.store.StoreRateLimiting;
import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterStateListener; import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.routing.*; import org.elasticsearch.cluster.routing.*;
import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.FileSystemUtils; import org.elasticsearch.common.io.FileSystemUtils;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.env.NodeEnvironment;
@ -34,6 +37,7 @@ import org.elasticsearch.index.Index;
import org.elasticsearch.index.service.IndexService; import org.elasticsearch.index.service.IndexService;
import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.node.settings.NodeSettingsService;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import java.io.File; import java.io.File;
@ -45,8 +49,39 @@ import java.util.concurrent.ScheduledFuture;
*/ */
public class IndicesStore extends AbstractComponent implements ClusterStateListener { public class IndicesStore extends AbstractComponent implements ClusterStateListener {
static {
MetaData.addDynamicSettings(
"indices.store.throttle.type",
"indices.store.throttle.max_bytes_per_sec"
);
}
class ApplySettings implements NodeSettingsService.Listener {
@Override
public void onRefreshSettings(Settings settings) {
String rateLimitingType = settings.get("indices.store.throttle.type", IndicesStore.this.rateLimitingType);
// try and parse the type
StoreRateLimiting.Type.fromString(rateLimitingType);
if (!rateLimitingType.equals(IndicesStore.this.rateLimitingType)) {
logger.info("updating indices.store.throttle.type from [{}] to [{}]", IndicesStore.this.rateLimitingType, rateLimitingType);
IndicesStore.this.rateLimitingType = rateLimitingType;
IndicesStore.this.rateLimiting.setType(rateLimitingType);
}
ByteSizeValue rateLimitingThrottle = settings.getAsBytesSize("indices.store.throttle.max_bytes_per_sec", IndicesStore.this.rateLimitingThrottle);
if (!rateLimitingThrottle.equals(IndicesStore.this.rateLimitingThrottle)) {
logger.info("updating indices.store.throttle.max_bytes_per_sec from [{}] to [{}], note, type is [{}]", IndicesStore.this.rateLimitingThrottle, rateLimitingThrottle, IndicesStore.this.rateLimitingType);
IndicesStore.this.rateLimitingThrottle = rateLimitingThrottle;
IndicesStore.this.rateLimiting.setMaxRate(rateLimitingThrottle);
}
}
}
private final NodeEnvironment nodeEnv; private final NodeEnvironment nodeEnv;
private final NodeSettingsService nodeSettingsService;
private final IndicesService indicesService; private final IndicesService indicesService;
private final ClusterService clusterService; private final ClusterService clusterService;
@ -59,6 +94,12 @@ public class IndicesStore extends AbstractComponent implements ClusterStateListe
private final Object danglingMutex = new Object(); private final Object danglingMutex = new Object();
private volatile String rateLimitingType;
private volatile ByteSizeValue rateLimitingThrottle;
private final StoreRateLimiting rateLimiting = new StoreRateLimiting();
private final ApplySettings applySettings = new ApplySettings();
static class DanglingIndex { static class DanglingIndex {
public final String index; public final String index;
public final ScheduledFuture future; public final ScheduledFuture future;
@ -70,19 +111,33 @@ public class IndicesStore extends AbstractComponent implements ClusterStateListe
} }
@Inject @Inject
public IndicesStore(Settings settings, NodeEnvironment nodeEnv, IndicesService indicesService, ClusterService clusterService, ThreadPool threadPool) { public IndicesStore(Settings settings, NodeEnvironment nodeEnv, NodeSettingsService nodeSettingsService, IndicesService indicesService, ClusterService clusterService, ThreadPool threadPool) {
super(settings); super(settings);
this.nodeEnv = nodeEnv; this.nodeEnv = nodeEnv;
this.nodeSettingsService = nodeSettingsService;
this.indicesService = indicesService; this.indicesService = indicesService;
this.clusterService = clusterService; this.clusterService = clusterService;
this.threadPool = threadPool; this.threadPool = threadPool;
this.rateLimitingType = componentSettings.get("throttle.type", "none");
rateLimiting.setType(rateLimitingType);
this.rateLimitingThrottle = componentSettings.getAsBytesSize("throttle.max_bytes_per_sec", new ByteSizeValue(0));
rateLimiting.setMaxRate(rateLimitingThrottle);
this.danglingTimeout = componentSettings.getAsTime("dangling_timeout", TimeValue.timeValueHours(2)); this.danglingTimeout = componentSettings.getAsTime("dangling_timeout", TimeValue.timeValueHours(2));
logger.debug("using indices.store.throttle.type [{}], with index.store.throttle.max_bytes_per_sec [{}]", rateLimitingType, rateLimitingThrottle);
nodeSettingsService.addListener(applySettings);
clusterService.addLast(this); clusterService.addLast(this);
} }
public StoreRateLimiting rateLimiting() {
return this.rateLimiting;
}
public void close() { public void close() {
nodeSettingsService.removeListener(applySettings);
clusterService.remove(this); clusterService.remove(this);
} }