allow to explicitly disable compression in peer recovery (defaults to true)

This commit is contained in:
kimchy 2010-08-15 15:13:45 +03:00
parent f6c58ff35d
commit ee33ee457a
4 changed files with 8 additions and 5 deletions

View File

@ -74,6 +74,8 @@ public class RecoverySource extends AbstractComponent {
private final ByteSizeValue fileChunkSize; private final ByteSizeValue fileChunkSize;
private final boolean compress;
@Inject public RecoverySource(Settings settings, ThreadPool threadPool, TransportService transportService, IndicesService indicesService, @Inject public RecoverySource(Settings settings, ThreadPool threadPool, TransportService transportService, IndicesService indicesService,
RecoveryThrottler recoveryThrottler) { RecoveryThrottler recoveryThrottler) {
super(settings); super(settings);
@ -83,6 +85,7 @@ public class RecoverySource extends AbstractComponent {
this.recoveryThrottler = recoveryThrottler; this.recoveryThrottler = recoveryThrottler;
this.fileChunkSize = componentSettings.getAsBytesSize("file_chunk_size", new ByteSizeValue(100, ByteSizeUnit.KB)); this.fileChunkSize = componentSettings.getAsBytesSize("file_chunk_size", new ByteSizeValue(100, ByteSizeUnit.KB));
this.compress = componentSettings.getAsBoolean("compress", true);
transportService.registerHandler(Actions.START_RECOVERY, new StartRecoveryTransportRequestHandler()); transportService.registerHandler(Actions.START_RECOVERY, new StartRecoveryTransportRequestHandler());
} }
@ -165,7 +168,7 @@ public class RecoverySource extends AbstractComponent {
long position = indexInput.getFilePointer(); long position = indexInput.getFilePointer();
indexInput.readBytes(buf, 0, toRead, false); indexInput.readBytes(buf, 0, toRead, false);
transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.FILE_CHUNK, new RecoveryFileChunkRequest(request.shardId(), name, position, len, buf, toRead), transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.FILE_CHUNK, new RecoveryFileChunkRequest(request.shardId(), name, position, len, buf, toRead),
TransportRequestOptions.options().withCompress(), VoidTransportResponseHandler.INSTANCE).txGet(); TransportRequestOptions.options().withCompress(compress), VoidTransportResponseHandler.INSTANCE).txGet();
readCount += toRead; readCount += toRead;
} }
indexInput.close(); indexInput.close();

View File

@ -45,8 +45,8 @@ public class TransportRequestOptions {
return this; return this;
} }
public TransportRequestOptions withCompress() { public TransportRequestOptions withCompress(boolean compress) {
this.compress = true; this.compress = compress;
return this; return this;
} }

View File

@ -385,7 +385,7 @@ public class NettyTransport extends AbstractLifecycleComponent<Transport> implem
Channel targetChannel = nodeChannel(node); Channel targetChannel = nodeChannel(node);
if (compress) { if (compress) {
options.withCompress(); options.withCompress(true);
} }
byte[] data = TransportStreams.buildRequest(requestId, action, message, options); byte[] data = TransportStreams.buildRequest(requestId, action, message, options);

View File

@ -136,7 +136,7 @@ public abstract class AbstractSimpleTransportTests {
}); });
TransportFuture<StringMessage> res = serviceB.submitRequest(serviceANode, "sayHello", TransportFuture<StringMessage> res = serviceB.submitRequest(serviceANode, "sayHello",
new StringMessage("moshe"), TransportRequestOptions.options().withCompress(), new BaseTransportResponseHandler<StringMessage>() { new StringMessage("moshe"), TransportRequestOptions.options().withCompress(true), new BaseTransportResponseHandler<StringMessage>() {
@Override public StringMessage newInstance() { @Override public StringMessage newInstance() {
return new StringMessage(); return new StringMessage();
} }