Recover small files (< 5mb) using a separate threadpool from the one used for large files.

Fixes #3576
Lee Hinman 2013-08-27 09:00:56 -06:00
parent 1ab037d4d0
commit 9d5868904b
2 changed files with 30 additions and 1 deletion
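The new indices.recovery.concurrent_small_file_streams setting sits alongside the existing indices.recovery.concurrent_streams setting. As a rough sketch of how both pools could be sized at node startup (assuming the ImmutableSettings builder from this era of the codebase; this snippet is illustrative and not part of the commit):

    import org.elasticsearch.common.settings.ImmutableSettings;
    import org.elasticsearch.common.settings.Settings;

    public class RecoveryStreamSettingsSketch {
        // illustrative only: builds node settings that size both recovery stream pools
        public static Settings build() {
            return ImmutableSettings.settingsBuilder()
                    // existing setting: streams for large-file recovery (default 3)
                    .put("indices.recovery.concurrent_streams", 3)
                    // new setting from this commit: streams for files under the small-file cutoff (default 2)
                    .put("indices.recovery.concurrent_small_file_streams", 2)
                    .build();
        }
    }

Both values are also dynamically updatable through the settings listener shown in the RecoverySettings diff below.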

RecoverySettings.java

@@ -42,8 +42,11 @@ public class RecoverySettings extends AbstractComponent {
public static final String INDICES_RECOVERY_TRANSLOG_SIZE = "indices.recovery.translog_size";
public static final String INDICES_RECOVERY_COMPRESS = "indices.recovery.compress";
public static final String INDICES_RECOVERY_CONCURRENT_STREAMS = "indices.recovery.concurrent_streams";
public static final String INDICES_RECOVERY_CONCURRENT_SMALL_FILE_STREAMS = "indices.recovery.concurrent_small_file_streams";
public static final String INDICES_RECOVERY_MAX_BYTES_PER_SEC = "indices.recovery.max_bytes_per_sec";
public static final long SMALL_FILE_CUTOFF_BYTES = ByteSizeValue.parseBytesSizeValue("5mb").bytes();
/**
* Use {@link #INDICES_RECOVERY_MAX_BYTES_PER_SEC} instead
*/
@@ -57,7 +60,9 @@ public class RecoverySettings extends AbstractComponent {
private volatile ByteSizeValue translogSize;
private volatile int concurrentStreams;
private volatile int concurrentSmallFileStreams;
private final ThreadPoolExecutor concurrentStreamPool;
private final ThreadPoolExecutor concurrentSmallFileStreamPool;
private volatile ByteSizeValue maxBytesPerSec;
private volatile SimpleRateLimiter rateLimiter;
@@ -73,6 +78,8 @@ public class RecoverySettings extends AbstractComponent {
this.concurrentStreams = componentSettings.getAsInt("concurrent_streams", settings.getAsInt("index.shard.recovery.concurrent_streams", 3));
this.concurrentStreamPool = EsExecutors.newScaling(0, concurrentStreams, 60, TimeUnit.SECONDS, EsExecutors.daemonThreadFactory(settings, "[recovery_stream]"));
this.concurrentSmallFileStreams = componentSettings.getAsInt("concurrent_small_file_streams", settings.getAsInt("index.shard.recovery.concurrent_small_file_streams", 2));
this.concurrentSmallFileStreamPool = EsExecutors.newScaling(0, concurrentSmallFileStreams, 60, TimeUnit.SECONDS, EsExecutors.daemonThreadFactory(settings, "[small_file_recovery_stream]"));
this.maxBytesPerSec = componentSettings.getAsBytesSize("max_bytes_per_sec", componentSettings.getAsBytesSize("max_size_per_sec", new ByteSizeValue(20, ByteSizeUnit.MB)));
if (maxBytesPerSec.bytes() <= 0) {
@@ -121,6 +128,10 @@ public class RecoverySettings extends AbstractComponent {
return concurrentStreamPool;
}
public ThreadPoolExecutor concurrentSmallFileStreamPool() {
return concurrentSmallFileStreamPool;
}
public RateLimiter rateLimiter() {
return rateLimiter;
}
@@ -171,6 +182,13 @@ public class RecoverySettings extends AbstractComponent {
RecoverySettings.this.concurrentStreams = concurrentStreams;
RecoverySettings.this.concurrentStreamPool.setMaximumPoolSize(concurrentStreams);
}
int concurrentSmallFileStreams = settings.getAsInt(INDICES_RECOVERY_CONCURRENT_SMALL_FILE_STREAMS, RecoverySettings.this.concurrentSmallFileStreams);
if (concurrentSmallFileStreams != RecoverySettings.this.concurrentSmallFileStreams) {
logger.info("updating [indices.recovery.concurrent_small_file_streams] from [{}] to [{}]", RecoverySettings.this.concurrentSmallFileStreams, concurrentSmallFileStreams);
RecoverySettings.this.concurrentSmallFileStreams = concurrentSmallFileStreams;
RecoverySettings.this.concurrentSmallFileStreamPool.setMaximumPoolSize(concurrentSmallFileStreams);
}
}
}
}
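Both pools are created with EsExecutors.newScaling (core size 0, a bounded maximum, 60-second keep-alive), and the settings listener above resizes them in place via ThreadPoolExecutor#setMaximumPoolSize instead of rebuilding them. A minimal standalone sketch of that resize behavior, approximating the pool shape with plain java.util.concurrent rather than the actual EsExecutors factory:

    import java.util.concurrent.SynchronousQueue;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;

    public class PoolResizeSketch {
        public static void main(String[] args) {
            // roughly the shape of EsExecutors.newScaling(0, 2, 60, SECONDS, ...):
            // no core threads, bounded maximum, idle threads reaped after 60 seconds
            ThreadPoolExecutor smallFilePool = new ThreadPoolExecutor(
                    0, 2, 60, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());

            // what the listener does when concurrent_small_file_streams changes from 2 to 4
            smallFilePool.setMaximumPoolSize(4);
            System.out.println("max pool size is now " + smallFilePool.getMaximumPoolSize()); // prints 4

            smallFilePool.shutdown();
        }
    }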

RecoverySource.java

@@ -51,6 +51,7 @@ import java.io.IOException;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicReference;
/**
@@ -158,8 +159,17 @@ public class RecoverySource extends AbstractComponent {
final CountDownLatch latch = new CountDownLatch(response.phase1FileNames.size());
final AtomicReference<Exception> lastException = new AtomicReference<Exception>();
int fileIndex = 0;
for (final String name : response.phase1FileNames) {
recoverySettings.concurrentStreamPool().execute(new Runnable() {
ThreadPoolExecutor pool;
long fileSize = response.phase1FileSizes.get(fileIndex);
if (fileSize > recoverySettings.SMALL_FILE_CUTOFF_BYTES) {
pool = recoverySettings.concurrentStreamPool();
} else {
pool = recoverySettings.concurrentSmallFileStreamPool();
}
pool.execute(new Runnable() {
@Override
public void run() {
IndexInput indexInput = null;
@@ -207,6 +217,7 @@
}
}
});
fileIndex++;
}
latch.await();
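In phase 1, each file is dispatched to one of the two pools based on its size: files over the 5mb cutoff go to the regular stream pool, everything else to the small-file pool, so small and large files no longer compete for the same threads. A self-contained sketch of that routing decision (hypothetical names, not the production classes):

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    public class StreamPoolRoutingSketch {
        // mirrors SMALL_FILE_CUTOFF_BYTES, i.e. "5mb" parsed as 5 * 1024 * 1024 bytes
        static final long SMALL_FILE_CUTOFF_BYTES = 5L * 1024 * 1024;

        static ExecutorService poolFor(long fileSize, ExecutorService streamPool, ExecutorService smallFileStreamPool) {
            return fileSize > SMALL_FILE_CUTOFF_BYTES ? streamPool : smallFileStreamPool;
        }

        public static void main(String[] args) {
            ExecutorService streamPool = Executors.newFixedThreadPool(3);
            ExecutorService smallFileStreamPool = Executors.newFixedThreadPool(2);

            System.out.println(poolFor(512 * 1024, streamPool, smallFileStreamPool) == smallFileStreamPool); // true: 512kb file
            System.out.println(poolFor(20L * 1024 * 1024, streamPool, smallFileStreamPool) == streamPool);   // true: 20mb file

            streamPool.shutdown();
            smallFileStreamPool.shutdown();
        }
    }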