diff --git a/config/elasticsearch.yml b/config/elasticsearch.yml index 8ba07c2bcfa..2b7ecc8cd83 100644 --- a/config/elasticsearch.yml +++ b/config/elasticsearch.yml @@ -17,6 +17,15 @@ #gateway.recover_after_time: 5m #gateway.expected_nodes: 2 +# Recovery Throttling +# The number of concurrent recoveries happening on a node +#cluster.routing.allocation.node_initial_primaries_recoveries: 4 +#cluster.routing.allocation.node_concurrent_recoveries: 2 +# Peer shard recovery size based throttling (set to 100mb for example to enable) +#indices.recovery.max_size_per_sec: 0 +# Number open concurrent recovery streams allows +#indices.recovery.concurrent_streams: 5 + # Controls the minimum number of master eligible nodes this node should "see" # in order to operate within the cluster. # Set this to a higher value (2-4) when running more than 2 nodes in the cluster diff --git a/config/logging.yml b/config/logging.yml index 553a9fcec0b..9f28c9891c5 100644 --- a/config/logging.yml +++ b/config/logging.yml @@ -10,7 +10,7 @@ logger: #index.gateway: DEBUG # peer shard recovery - #index.shard.recovery: DEBUG + #indices.recovery: DEBUG # discovery #discovery: TRACE diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/indices/recovery/RecoverySettings.java b/modules/elasticsearch/src/main/java/org/elasticsearch/indices/recovery/RecoverySettings.java index 121f7a894b0..e346e45f4b0 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/indices/recovery/RecoverySettings.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/indices/recovery/RecoverySettings.java @@ -20,6 +20,8 @@ package org.elasticsearch.indices.recovery; import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.common.RateLimiter; +import org.elasticsearch.common.base.Objects; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; @@ -41,7 +43,8 @@ public class RecoverySettings extends AbstractComponent { MetaData.addDynamicSettings("indices.recovery.translog_ops"); MetaData.addDynamicSettings("indices.recovery.translog_size"); MetaData.addDynamicSettings("indices.recovery.compress"); - MetaData.addDynamicSettings("`"); + MetaData.addDynamicSettings("indices.recovery.concurrent_streams"); + MetaData.addDynamicSettings("indices.recovery.max_size_per_sec"); } private volatile ByteSizeValue fileChunkSize; @@ -53,6 +56,9 @@ public class RecoverySettings extends AbstractComponent { private volatile int concurrentStreams; private final ThreadPoolExecutor concurrentStreamPool; + private volatile ByteSizeValue maxSizePerSec; + private volatile RateLimiter rateLimiter; + @Inject public RecoverySettings(Settings settings, NodeSettingsService nodeSettingsService) { super(settings); @@ -64,8 +70,15 @@ public class RecoverySettings extends AbstractComponent { this.concurrentStreams = componentSettings.getAsInt("concurrent_streams", settings.getAsInt("index.shard.recovery.concurrent_streams", 5)); this.concurrentStreamPool = (ThreadPoolExecutor) DynamicExecutors.newScalingThreadPool(1, concurrentStreams, TimeValue.timeValueSeconds(5).millis(), EsExecutors.daemonThreadFactory(settings, "[recovery_stream]")); - logger.debug("using concurrent_streams [{}], file_chunk_size [{}], translog_size [{}], translog_ops [{}], and compress [{}]", - concurrentStreams, fileChunkSize, translogSize, translogOps, compress); + this.maxSizePerSec = componentSettings.getAsBytesSize("max_size_per_sec", new ByteSizeValue(0)); + if (maxSizePerSec.bytes() <= 0) { + rateLimiter = null; + } else { + rateLimiter = new RateLimiter(maxSizePerSec.mbFrac()); + } + + logger.debug("using max_size_per_sec[{}], concurrent_streams [{}], file_chunk_size [{}], translog_size [{}], translog_ops [{}], and compress [{}]", + maxSizePerSec, concurrentStreams, fileChunkSize, translogSize, translogOps, compress); nodeSettingsService.addListener(new ApplySettings()); } @@ -98,8 +111,25 @@ public class RecoverySettings extends AbstractComponent { return concurrentStreamPool; } + public RateLimiter rateLimiter() { + return rateLimiter; + } + class ApplySettings implements NodeSettingsService.Listener { @Override public void onRefreshSettings(Settings settings) { + ByteSizeValue maxSizePerSec = settings.getAsBytesSize("indices.recovery.max_size_per_sec", RecoverySettings.this.maxSizePerSec); + if (!Objects.equal(maxSizePerSec, RecoverySettings.this.maxSizePerSec)) { + logger.info("updating [indices.recovery.max_size_per_sec] from [{}] to [{}]", RecoverySettings.this.maxSizePerSec, maxSizePerSec); + RecoverySettings.this.maxSizePerSec = maxSizePerSec; + if (maxSizePerSec.bytes() <= 0) { + rateLimiter = null; + } else if (rateLimiter != null) { + rateLimiter.setMaxRate(maxSizePerSec.mbFrac()); + } else { + rateLimiter = new RateLimiter(maxSizePerSec.mbFrac()); + } + } + ByteSizeValue fileChunkSize = settings.getAsBytesSize("indices.recovery.file_chunk_size", RecoverySettings.this.fileChunkSize); if (!fileChunkSize.equals(RecoverySettings.this.fileChunkSize)) { logger.info("updating [indices.recovery.file_chunk_size] from [{}] to [{}]", RecoverySettings.this.fileChunkSize, fileChunkSize); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/indices/recovery/RecoverySource.java b/modules/elasticsearch/src/main/java/org/elasticsearch/indices/recovery/RecoverySource.java index c3d912428b2..9b6a580d670 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/indices/recovery/RecoverySource.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/indices/recovery/RecoverySource.java @@ -151,6 +151,11 @@ public class RecoverySource extends AbstractComponent { } int toRead = readCount + BUFFER_SIZE > len ? (int) (len - readCount) : BUFFER_SIZE; long position = indexInput.getFilePointer(); + + if (recoverySettings.rateLimiter() != null) { + recoverySettings.rateLimiter().pause(toRead); + } + indexInput.readBytes(buf, 0, toRead, false); transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.FILE_CHUNK, new RecoveryFileChunkRequest(request.shardId(), name, position, len, md.checksum(), buf, toRead), TransportRequestOptions.options().withCompress(recoverySettings.compress()).withLowType(), VoidTransportResponseHandler.INSTANCE_SAME).txGet(); @@ -247,6 +252,11 @@ public class RecoverySource extends AbstractComponent { size += operation.estimateSize(); totalOperations++; if (ops >= recoverySettings.translogOps() || size >= recoverySettings.translogSize().bytes()) { + + if (recoverySettings.rateLimiter() != null) { + recoverySettings.rateLimiter().pause(size); + } + RecoveryTranslogOperationsRequest translogOperationsRequest = new RecoveryTranslogOperationsRequest(request.shardId(), operations); transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.TRANSLOG_OPS, translogOperationsRequest, TransportRequestOptions.options().withCompress(recoverySettings.compress()).withLowType(), VoidTransportResponseHandler.INSTANCE_SAME).txGet(); ops = 0; diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java b/modules/elasticsearch/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java index 428cc7e3584..b26b24c82b7 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java @@ -87,14 +87,17 @@ public class RecoveryTarget extends AbstractComponent { private final IndicesService indicesService; + private final RecoverySettings recoverySettings; + private final ConcurrentMap onGoingRecoveries = ConcurrentCollections.newConcurrentMap(); @Inject public RecoveryTarget(Settings settings, ThreadPool threadPool, TransportService transportService, IndicesService indicesService, - IndicesLifecycle indicesLifecycle) { + IndicesLifecycle indicesLifecycle, RecoverySettings recoverySettings) { super(settings); this.threadPool = threadPool; this.transportService = transportService; this.indicesService = indicesService; + this.recoverySettings = recoverySettings; transportService.registerHandler(Actions.FILES_INFO, new FilesInfoRequestHandler()); transportService.registerHandler(Actions.FILE_CHUNK, new FileChunkTransportRequestHandler()); @@ -516,6 +519,9 @@ public class RecoveryTarget extends AbstractComponent { } synchronized (indexOutput) { try { + if (recoverySettings.rateLimiter() != null) { + recoverySettings.rateLimiter().pause(request.contentLength()); + } indexOutput.writeBytes(request.content(), request.contentLength()); onGoingRecovery.currentFilesSize.addAndGet(request.contentLength()); if (indexOutput.getFilePointer() == request.length()) {