don't double compress already compressed files when doing recovery

This commit is contained in:
Shay Banon 2012-07-02 21:41:06 +02:00
parent 1668533556
commit 1c3c737524
2 changed files with 11 additions and 1 deletions

View File

@ -61,6 +61,10 @@ public class CompressorFactory {
return compressor(data, offset, length) != null;
}
public static boolean isCompressed(IndexInput in) throws IOException {
return compressor(in) != null;
}
@Nullable
public static Compressor compressor(BytesHolder bytes) {
return compressor(bytes.bytes(), bytes.offset(), bytes.length());

View File

@ -26,6 +26,7 @@ import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.common.BytesHolder;
import org.elasticsearch.common.StopWatch;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.compress.CompressorFactory;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
@ -135,6 +136,11 @@ public class RecoverySource extends AbstractComponent {
byte[] buf = new byte[BUFFER_SIZE];
StoreFileMetaData md = shard.store().metaData(name);
indexInput = shard.store().openInputRaw(name);
boolean shouldCompressRequest = recoverySettings.compress();
if (CompressorFactory.isCompressed(indexInput)) {
shouldCompressRequest = false;
}
long len = indexInput.length();
long readCount = 0;
while (readCount < len) {
@ -151,7 +157,7 @@ public class RecoverySource extends AbstractComponent {
indexInput.readBytes(buf, 0, toRead, false);
BytesHolder content = new BytesHolder(buf, 0, toRead);
transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.FILE_CHUNK, new RecoveryFileChunkRequest(request.shardId(), name, position, len, md.checksum(), content),
TransportRequestOptions.options().withCompress(recoverySettings.compress()).withLowType(), VoidTransportResponseHandler.INSTANCE_SAME).txGet();
TransportRequestOptions.options().withCompress(shouldCompressRequest).withLowType(), VoidTransportResponseHandler.INSTANCE_SAME).txGet();
readCount += toRead;
}
indexInput.close();