diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/recovery/RecoverySource.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/recovery/RecoverySource.java index fe04dd35d59..14e79584b18 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/recovery/RecoverySource.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/recovery/RecoverySource.java @@ -29,6 +29,9 @@ import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.DynamicExecutors; +import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.index.deletionpolicy.SnapshotIndexCommit; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.shard.IllegalIndexShardStateException; @@ -45,6 +48,7 @@ import java.io.IOException; import java.util.List; import java.util.Set; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicReference; /** @@ -72,12 +76,17 @@ public class RecoverySource extends AbstractComponent { private final int translogBatchSize; + private final ExecutorService concurrentStreamPool; + @Inject public RecoverySource(Settings settings, ThreadPool threadPool, TransportService transportService, IndicesService indicesService) { super(settings); this.threadPool = threadPool; this.transportService = transportService; this.indicesService = indicesService; + int concurrentStreams = componentSettings.getAsInt("concurrent_streams", 5); + this.concurrentStreamPool = DynamicExecutors.newScalingThreadPool(1, concurrentStreams, TimeValue.timeValueSeconds(5).millis(), EsExecutors.daemonThreadFactory(settings, "[recovery_stream]")); + this.fileChunkSize = componentSettings.getAsBytesSize("file_chunk_size", new ByteSizeValue(500, ByteSizeUnit.KB)); this.translogBatchSize = componentSettings.getAsInt("translog_batch_size", 100); this.compress = componentSettings.getAsBoolean("compress", true); @@ -87,6 +96,10 @@ public class RecoverySource extends AbstractComponent { transportService.registerHandler(Actions.START_RECOVERY, new StartRecoveryTransportRequestHandler()); } + public void close() { + concurrentStreamPool.shutdown(); + } + private RecoveryResponse recover(final StartRecoveryRequest request) { final InternalIndexShard shard = (InternalIndexShard) indicesService.indexServiceSafe(request.shardId().index().name()).shardSafe(request.shardId().id()); logger.trace("[{}][{}] starting recovery to {}, mark_as_relocated {}", request.shardId().index().name(), request.shardId().id(), request.targetNode(), request.markAsRelocated()); @@ -136,7 +149,7 @@ public class RecoverySource extends AbstractComponent { final CountDownLatch latch = new CountDownLatch(response.phase1FileNames.size()); final AtomicReference lastException = new AtomicReference(); for (final String name : response.phase1FileNames) { - threadPool.cached().execute(new Runnable() { + concurrentStreamPool.execute(new Runnable() { @Override public void run() { IndexInput indexInput = null; try { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java b/modules/elasticsearch/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java index fae8e4cfcd8..f7ea8141796 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java @@ -48,6 +48,7 @@ import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.service.IndexService; import org.elasticsearch.index.shard.IndexShardState; import org.elasticsearch.index.shard.recovery.RecoveryFailedException; +import org.elasticsearch.index.shard.recovery.RecoverySource; import org.elasticsearch.index.shard.recovery.RecoveryTarget; import org.elasticsearch.index.shard.recovery.StartRecoveryRequest; import org.elasticsearch.index.shard.service.IndexShard; @@ -72,6 +73,8 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent