diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/recovery/RecoverySource.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/recovery/RecoverySource.java index 45fe59a5920..ab9fe251ca7 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/recovery/RecoverySource.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/shard/recovery/RecoverySource.java @@ -76,6 +76,8 @@ public class RecoverySource extends AbstractComponent { private final boolean compress; + private final int translogBatchSize; + @Inject public RecoverySource(Settings settings, ThreadPool threadPool, TransportService transportService, IndicesService indicesService, RecoveryThrottler recoveryThrottler) { super(settings); @@ -85,6 +87,7 @@ public class RecoverySource extends AbstractComponent { this.recoveryThrottler = recoveryThrottler; this.fileChunkSize = componentSettings.getAsBytesSize("file_chunk_size", new ByteSizeValue(100, ByteSizeUnit.KB)); + this.translogBatchSize = componentSettings.getAsInt("translog_batch_size", 100); this.compress = componentSettings.getAsBoolean("compress", true); transportService.registerHandler(Actions.START_RECOVERY, new StartRecoveryTransportRequestHandler()); @@ -253,7 +256,6 @@ public class RecoverySource extends AbstractComponent { } private int sendSnapshot(Translog.Snapshot snapshot) throws ElasticSearchException { - int translogBatchSize = 10; // TODO make this configurable int counter = 0; int totalOperations = 0; List operations = Lists.newArrayList(); @@ -265,7 +267,7 @@ public class RecoverySource extends AbstractComponent { totalOperations++; if (++counter == translogBatchSize) { RecoveryTranslogOperationsRequest translogOperationsRequest = new RecoveryTranslogOperationsRequest(request.shardId(), operations); - transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.TRANSLOG_OPS, translogOperationsRequest, VoidTransportResponseHandler.INSTANCE).txGet(); + transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.TRANSLOG_OPS, translogOperationsRequest, TransportRequestOptions.options().withCompress(compress), VoidTransportResponseHandler.INSTANCE).txGet(); counter = 0; operations = Lists.newArrayList(); } @@ -273,7 +275,7 @@ public class RecoverySource extends AbstractComponent { // send the leftover if (!operations.isEmpty()) { RecoveryTranslogOperationsRequest translogOperationsRequest = new RecoveryTranslogOperationsRequest(request.shardId(), operations); - transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.TRANSLOG_OPS, translogOperationsRequest, VoidTransportResponseHandler.INSTANCE).txGet(); + transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.TRANSLOG_OPS, translogOperationsRequest, TransportRequestOptions.options().withCompress(compress), VoidTransportResponseHandler.INSTANCE).txGet(); } return totalOperations; }