Peer recovery: Allow to throttle recovery based on "size per sec", closes #1304.

This commit is contained in:
Shay Banon 2011-09-06 12:40:14 +03:00
parent 8ebbd1e7b9
commit b653d149d1
5 changed files with 60 additions and 5 deletions

View File

@ -17,6 +17,15 @@
#gateway.recover_after_time: 5m
#gateway.expected_nodes: 2
# Recovery Throttling
# The number of concurrent recoveries happening on a node
#cluster.routing.allocation.node_initial_primaries_recoveries: 4
#cluster.routing.allocation.node_concurrent_recoveries: 2
# Peer shard recovery size-based throttling (0 disables it; set to 100mb, for example, to enable)
#indices.recovery.max_size_per_sec: 0
# Number of concurrent recovery streams allowed to be open
#indices.recovery.concurrent_streams: 5
# Controls the minimum number of master eligible nodes this node should "see"
# in order to operate within the cluster.
# Set this to a higher value (2-4) when running more than 2 nodes in the cluster

View File

@ -10,7 +10,7 @@ logger:
#index.gateway: DEBUG
# peer shard recovery
#index.shard.recovery: DEBUG
#indices.recovery: DEBUG
# discovery
#discovery: TRACE

View File

@ -20,6 +20,8 @@
package org.elasticsearch.indices.recovery;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.common.RateLimiter;
import org.elasticsearch.common.base.Objects;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
@ -41,7 +43,8 @@ public class RecoverySettings extends AbstractComponent {
MetaData.addDynamicSettings("indices.recovery.translog_ops");
MetaData.addDynamicSettings("indices.recovery.translog_size");
MetaData.addDynamicSettings("indices.recovery.compress");
MetaData.addDynamicSettings("`");
MetaData.addDynamicSettings("indices.recovery.concurrent_streams");
MetaData.addDynamicSettings("indices.recovery.max_size_per_sec");
}
private volatile ByteSizeValue fileChunkSize;
@ -53,6 +56,9 @@ public class RecoverySettings extends AbstractComponent {
private volatile int concurrentStreams;
private final ThreadPoolExecutor concurrentStreamPool;
private volatile ByteSizeValue maxSizePerSec;
private volatile RateLimiter rateLimiter;
@Inject public RecoverySettings(Settings settings, NodeSettingsService nodeSettingsService) {
super(settings);
@ -64,8 +70,15 @@ public class RecoverySettings extends AbstractComponent {
this.concurrentStreams = componentSettings.getAsInt("concurrent_streams", settings.getAsInt("index.shard.recovery.concurrent_streams", 5));
this.concurrentStreamPool = (ThreadPoolExecutor) DynamicExecutors.newScalingThreadPool(1, concurrentStreams, TimeValue.timeValueSeconds(5).millis(), EsExecutors.daemonThreadFactory(settings, "[recovery_stream]"));
logger.debug("using concurrent_streams [{}], file_chunk_size [{}], translog_size [{}], translog_ops [{}], and compress [{}]",
concurrentStreams, fileChunkSize, translogSize, translogOps, compress);
this.maxSizePerSec = componentSettings.getAsBytesSize("max_size_per_sec", new ByteSizeValue(0));
if (maxSizePerSec.bytes() <= 0) {
rateLimiter = null;
} else {
rateLimiter = new RateLimiter(maxSizePerSec.mbFrac());
}
logger.debug("using max_size_per_sec[{}], concurrent_streams [{}], file_chunk_size [{}], translog_size [{}], translog_ops [{}], and compress [{}]",
maxSizePerSec, concurrentStreams, fileChunkSize, translogSize, translogOps, compress);
nodeSettingsService.addListener(new ApplySettings());
}
@ -98,8 +111,25 @@ public class RecoverySettings extends AbstractComponent {
return concurrentStreamPool;
}
/**
 * Returns the rate limiter currently used to throttle recovery.
 *
 * The backing field is volatile and is reassigned when the
 * "indices.recovery.max_size_per_sec" setting is refreshed, so two
 * consecutive calls may return different values. Callers that null-check
 * the result should read it once into a local variable and use that.
 *
 * @return the active rate limiter, or null when max_size_per_sec is
 *         zero or negative (throttling disabled)
 */
public RateLimiter rateLimiter() {
return rateLimiter;
}
class ApplySettings implements NodeSettingsService.Listener {
@Override public void onRefreshSettings(Settings settings) {
ByteSizeValue maxSizePerSec = settings.getAsBytesSize("indices.recovery.max_size_per_sec", RecoverySettings.this.maxSizePerSec);
if (!Objects.equal(maxSizePerSec, RecoverySettings.this.maxSizePerSec)) {
logger.info("updating [indices.recovery.max_size_per_sec] from [{}] to [{}]", RecoverySettings.this.maxSizePerSec, maxSizePerSec);
RecoverySettings.this.maxSizePerSec = maxSizePerSec;
if (maxSizePerSec.bytes() <= 0) {
rateLimiter = null;
} else if (rateLimiter != null) {
rateLimiter.setMaxRate(maxSizePerSec.mbFrac());
} else {
rateLimiter = new RateLimiter(maxSizePerSec.mbFrac());
}
}
ByteSizeValue fileChunkSize = settings.getAsBytesSize("indices.recovery.file_chunk_size", RecoverySettings.this.fileChunkSize);
if (!fileChunkSize.equals(RecoverySettings.this.fileChunkSize)) {
logger.info("updating [indices.recovery.file_chunk_size] from [{}] to [{}]", RecoverySettings.this.fileChunkSize, fileChunkSize);

View File

@ -151,6 +151,11 @@ public class RecoverySource extends AbstractComponent {
}
int toRead = readCount + BUFFER_SIZE > len ? (int) (len - readCount) : BUFFER_SIZE;
long position = indexInput.getFilePointer();
if (recoverySettings.rateLimiter() != null) {
recoverySettings.rateLimiter().pause(toRead);
}
indexInput.readBytes(buf, 0, toRead, false);
transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.FILE_CHUNK, new RecoveryFileChunkRequest(request.shardId(), name, position, len, md.checksum(), buf, toRead),
TransportRequestOptions.options().withCompress(recoverySettings.compress()).withLowType(), VoidTransportResponseHandler.INSTANCE_SAME).txGet();
@ -247,6 +252,11 @@ public class RecoverySource extends AbstractComponent {
size += operation.estimateSize();
totalOperations++;
if (ops >= recoverySettings.translogOps() || size >= recoverySettings.translogSize().bytes()) {
if (recoverySettings.rateLimiter() != null) {
recoverySettings.rateLimiter().pause(size);
}
RecoveryTranslogOperationsRequest translogOperationsRequest = new RecoveryTranslogOperationsRequest(request.shardId(), operations);
transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.TRANSLOG_OPS, translogOperationsRequest, TransportRequestOptions.options().withCompress(recoverySettings.compress()).withLowType(), VoidTransportResponseHandler.INSTANCE_SAME).txGet();
ops = 0;

View File

@ -87,14 +87,17 @@ public class RecoveryTarget extends AbstractComponent {
private final IndicesService indicesService;
private final RecoverySettings recoverySettings;
private final ConcurrentMap<ShardId, RecoveryStatus> onGoingRecoveries = ConcurrentCollections.newConcurrentMap();
@Inject public RecoveryTarget(Settings settings, ThreadPool threadPool, TransportService transportService, IndicesService indicesService,
IndicesLifecycle indicesLifecycle) {
IndicesLifecycle indicesLifecycle, RecoverySettings recoverySettings) {
super(settings);
this.threadPool = threadPool;
this.transportService = transportService;
this.indicesService = indicesService;
this.recoverySettings = recoverySettings;
transportService.registerHandler(Actions.FILES_INFO, new FilesInfoRequestHandler());
transportService.registerHandler(Actions.FILE_CHUNK, new FileChunkTransportRequestHandler());
@ -516,6 +519,9 @@ public class RecoveryTarget extends AbstractComponent {
}
synchronized (indexOutput) {
try {
if (recoverySettings.rateLimiter() != null) {
recoverySettings.rateLimiter().pause(request.contentLength());
}
indexOutput.writeBytes(request.content(), request.contentLength());
onGoingRecovery.currentFilesSize.addAndGet(request.contentLength());
if (indexOutput.getFilePointer() == request.length()) {