allow to configure translog batch size (internal) and compress translog operations when doing peer recvoery

This commit is contained in:
kimchy 2010-08-16 09:03:48 +03:00
parent 1bdce4c7ef
commit a012e0f1bd
1 changed files with 5 additions and 3 deletions

View File

@ -76,6 +76,8 @@ public class RecoverySource extends AbstractComponent {
private final boolean compress; private final boolean compress;
private final int translogBatchSize;
@Inject public RecoverySource(Settings settings, ThreadPool threadPool, TransportService transportService, IndicesService indicesService, @Inject public RecoverySource(Settings settings, ThreadPool threadPool, TransportService transportService, IndicesService indicesService,
RecoveryThrottler recoveryThrottler) { RecoveryThrottler recoveryThrottler) {
super(settings); super(settings);
@ -85,6 +87,7 @@ public class RecoverySource extends AbstractComponent {
this.recoveryThrottler = recoveryThrottler; this.recoveryThrottler = recoveryThrottler;
this.fileChunkSize = componentSettings.getAsBytesSize("file_chunk_size", new ByteSizeValue(100, ByteSizeUnit.KB)); this.fileChunkSize = componentSettings.getAsBytesSize("file_chunk_size", new ByteSizeValue(100, ByteSizeUnit.KB));
this.translogBatchSize = componentSettings.getAsInt("translog_batch_size", 100);
this.compress = componentSettings.getAsBoolean("compress", true); this.compress = componentSettings.getAsBoolean("compress", true);
transportService.registerHandler(Actions.START_RECOVERY, new StartRecoveryTransportRequestHandler()); transportService.registerHandler(Actions.START_RECOVERY, new StartRecoveryTransportRequestHandler());
@ -253,7 +256,6 @@ public class RecoverySource extends AbstractComponent {
} }
private int sendSnapshot(Translog.Snapshot snapshot) throws ElasticSearchException { private int sendSnapshot(Translog.Snapshot snapshot) throws ElasticSearchException {
int translogBatchSize = 10; // TODO make this configurable
int counter = 0; int counter = 0;
int totalOperations = 0; int totalOperations = 0;
List<Translog.Operation> operations = Lists.newArrayList(); List<Translog.Operation> operations = Lists.newArrayList();
@ -265,7 +267,7 @@ public class RecoverySource extends AbstractComponent {
totalOperations++; totalOperations++;
if (++counter == translogBatchSize) { if (++counter == translogBatchSize) {
RecoveryTranslogOperationsRequest translogOperationsRequest = new RecoveryTranslogOperationsRequest(request.shardId(), operations); RecoveryTranslogOperationsRequest translogOperationsRequest = new RecoveryTranslogOperationsRequest(request.shardId(), operations);
transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.TRANSLOG_OPS, translogOperationsRequest, VoidTransportResponseHandler.INSTANCE).txGet(); transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.TRANSLOG_OPS, translogOperationsRequest, TransportRequestOptions.options().withCompress(compress), VoidTransportResponseHandler.INSTANCE).txGet();
counter = 0; counter = 0;
operations = Lists.newArrayList(); operations = Lists.newArrayList();
} }
@ -273,7 +275,7 @@ public class RecoverySource extends AbstractComponent {
// send the leftover // send the leftover
if (!operations.isEmpty()) { if (!operations.isEmpty()) {
RecoveryTranslogOperationsRequest translogOperationsRequest = new RecoveryTranslogOperationsRequest(request.shardId(), operations); RecoveryTranslogOperationsRequest translogOperationsRequest = new RecoveryTranslogOperationsRequest(request.shardId(), operations);
transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.TRANSLOG_OPS, translogOperationsRequest, VoidTransportResponseHandler.INSTANCE).txGet(); transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.TRANSLOG_OPS, translogOperationsRequest, TransportRequestOptions.options().withCompress(compress), VoidTransportResponseHandler.INSTANCE).txGet();
} }
return totalOperations; return totalOperations;
} }