From 1c3c737524ed96b3a3ecedc2e7d78f87f38c6d30 Mon Sep 17 00:00:00 2001 From: Shay Banon Date: Mon, 2 Jul 2012 21:41:06 +0200 Subject: [PATCH] don't double compress already compressed files when doing recovery --- .../elasticsearch/common/compress/CompressorFactory.java | 4 ++++ .../elasticsearch/indices/recovery/RecoverySource.java | 8 +++++++- 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/src/main/java/org/elasticsearch/common/compress/CompressorFactory.java b/src/main/java/org/elasticsearch/common/compress/CompressorFactory.java index 15bd262f150..a4cf752cea7 100644 --- a/src/main/java/org/elasticsearch/common/compress/CompressorFactory.java +++ b/src/main/java/org/elasticsearch/common/compress/CompressorFactory.java @@ -61,6 +61,10 @@ public class CompressorFactory { return compressor(data, offset, length) != null; } + public static boolean isCompressed(IndexInput in) throws IOException { + return compressor(in) != null; + } + @Nullable public static Compressor compressor(BytesHolder bytes) { return compressor(bytes.bytes(), bytes.offset(), bytes.length()); diff --git a/src/main/java/org/elasticsearch/indices/recovery/RecoverySource.java b/src/main/java/org/elasticsearch/indices/recovery/RecoverySource.java index 2090e59aa1a..3192f9c5e28 100644 --- a/src/main/java/org/elasticsearch/indices/recovery/RecoverySource.java +++ b/src/main/java/org/elasticsearch/indices/recovery/RecoverySource.java @@ -26,6 +26,7 @@ import org.elasticsearch.ElasticSearchException; import org.elasticsearch.common.BytesHolder; import org.elasticsearch.common.StopWatch; import org.elasticsearch.common.component.AbstractComponent; +import org.elasticsearch.common.compress.CompressorFactory; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeValue; @@ -135,6 +136,11 @@ public class RecoverySource extends AbstractComponent { byte[] buf = new byte[BUFFER_SIZE]; StoreFileMetaData md = shard.store().metaData(name); indexInput = shard.store().openInputRaw(name); + boolean shouldCompressRequest = recoverySettings.compress(); + if (CompressorFactory.isCompressed(indexInput)) { + shouldCompressRequest = false; + } + long len = indexInput.length(); long readCount = 0; while (readCount < len) { @@ -151,7 +157,7 @@ public class RecoverySource extends AbstractComponent { indexInput.readBytes(buf, 0, toRead, false); BytesHolder content = new BytesHolder(buf, 0, toRead); transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.FILE_CHUNK, new RecoveryFileChunkRequest(request.shardId(), name, position, len, md.checksum(), content), - TransportRequestOptions.options().withCompress(recoverySettings.compress()).withLowType(), VoidTransportResponseHandler.INSTANCE_SAME).txGet(); + TransportRequestOptions.options().withCompress(shouldCompressRequest).withLowType(), VoidTransportResponseHandler.INSTANCE_SAME).txGet(); readCount += toRead; } indexInput.close();