Recovery: Allow to control `concurrent_streams` (per node) during recovery

This commit is contained in:
kimchy 2010-12-13 00:03:04 +02:00
parent 1c5477d4ed
commit ceb7b90740
2 changed files with 21 additions and 2 deletions

View File

@ -29,6 +29,9 @@ import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.DynamicExecutors;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.index.deletionpolicy.SnapshotIndexCommit;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.shard.IllegalIndexShardStateException;
@ -45,6 +48,7 @@ import java.io.IOException;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicReference;
/**
@ -72,12 +76,17 @@ public class RecoverySource extends AbstractComponent {
private final int translogBatchSize;
private final ExecutorService concurrentStreamPool;
@Inject public RecoverySource(Settings settings, ThreadPool threadPool, TransportService transportService, IndicesService indicesService) {
super(settings);
this.threadPool = threadPool;
this.transportService = transportService;
this.indicesService = indicesService;
int concurrentStreams = componentSettings.getAsInt("concurrent_streams", 5);
this.concurrentStreamPool = DynamicExecutors.newScalingThreadPool(1, concurrentStreams, TimeValue.timeValueSeconds(5).millis(), EsExecutors.daemonThreadFactory(settings, "[recovery_stream]"));
this.fileChunkSize = componentSettings.getAsBytesSize("file_chunk_size", new ByteSizeValue(500, ByteSizeUnit.KB));
this.translogBatchSize = componentSettings.getAsInt("translog_batch_size", 100);
this.compress = componentSettings.getAsBoolean("compress", true);
@ -87,6 +96,10 @@ public class RecoverySource extends AbstractComponent {
transportService.registerHandler(Actions.START_RECOVERY, new StartRecoveryTransportRequestHandler());
}
public void close() {
concurrentStreamPool.shutdown();
}
private RecoveryResponse recover(final StartRecoveryRequest request) {
final InternalIndexShard shard = (InternalIndexShard) indicesService.indexServiceSafe(request.shardId().index().name()).shardSafe(request.shardId().id());
logger.trace("[{}][{}] starting recovery to {}, mark_as_relocated {}", request.shardId().index().name(), request.shardId().id(), request.targetNode(), request.markAsRelocated());
@ -136,7 +149,7 @@ public class RecoverySource extends AbstractComponent {
final CountDownLatch latch = new CountDownLatch(response.phase1FileNames.size());
final AtomicReference<Exception> lastException = new AtomicReference<Exception>();
for (final String name : response.phase1FileNames) {
threadPool.cached().execute(new Runnable() {
concurrentStreamPool.execute(new Runnable() {
@Override public void run() {
IndexInput indexInput = null;
try {

View File

@ -48,6 +48,7 @@ import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.service.IndexService;
import org.elasticsearch.index.shard.IndexShardState;
import org.elasticsearch.index.shard.recovery.RecoveryFailedException;
import org.elasticsearch.index.shard.recovery.RecoverySource;
import org.elasticsearch.index.shard.recovery.RecoveryTarget;
import org.elasticsearch.index.shard.recovery.StartRecoveryRequest;
import org.elasticsearch.index.shard.service.IndexShard;
@ -72,6 +73,8 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
private final ThreadPool threadPool;
private final RecoverySource recoverySource;
private final RecoveryTarget recoveryTarget;
private final ShardStateAction shardStateAction;
@ -88,13 +91,15 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
private final Object mutex = new Object();
@Inject public IndicesClusterStateService(Settings settings, IndicesService indicesService, ClusterService clusterService,
ThreadPool threadPool, RecoveryTarget recoveryTarget, ShardStateAction shardStateAction,
ThreadPool threadPool, RecoveryTarget recoveryTarget, RecoverySource recoverySource,
ShardStateAction shardStateAction,
NodeIndexCreatedAction nodeIndexCreatedAction, NodeIndexDeletedAction nodeIndexDeletedAction,
NodeMappingCreatedAction nodeMappingCreatedAction) {
super(settings);
this.indicesService = indicesService;
this.clusterService = clusterService;
this.threadPool = threadPool;
this.recoverySource = recoverySource;
this.recoveryTarget = recoveryTarget;
this.shardStateAction = shardStateAction;
this.nodeIndexCreatedAction = nodeIndexCreatedAction;
@ -111,6 +116,7 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent<Indic
}
@Override protected void doClose() throws ElasticSearchException {
recoverySource.close();
}
@Override public void clusterChanged(final ClusterChangedEvent event) {