From 6566301e477734caa00389142eb24ac2b800d609 Mon Sep 17 00:00:00 2001 From: Michael Stack Date: Wed, 6 May 2009 21:26:09 +0000 Subject: [PATCH] HBASE-1370 re-enable LZO using hadoop-gpl-compression library git-svn-id: https://svn.apache.org/repos/asf/hadoop/hbase/trunk@772432 13f79535-47bb-0310-9956-ffa450edef68 --- CHANGES.txt | 2 + .../hadoop/hbase/HColumnDescriptor.java | 10 +- .../hadoop/hbase/io/hfile/Compression.java | 106 +++++++++--------- 3 files changed, 65 insertions(+), 53 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index b96980765d9..0a2c1792efb 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -203,6 +203,8 @@ Release 0.20.0 - Unreleased created without supplying a column list unlike the other APIs. (Tim Sell via Stack) HBASE-1341 HTable pooler + HBASE-1379 re-enable LZO using hadoop-gpl-compression library + (Ryan Rawson via Stack) Release 0.19.0 - 01/21/2009 INCOMPATIBLE CHANGES diff --git a/src/java/org/apache/hadoop/hbase/HColumnDescriptor.java b/src/java/org/apache/hadoop/hbase/HColumnDescriptor.java index 3ee1b5da4c5..cde9f295fd7 100644 --- a/src/java/org/apache/hadoop/hbase/HColumnDescriptor.java +++ b/src/java/org/apache/hadoop/hbase/HColumnDescriptor.java @@ -59,7 +59,8 @@ public class HColumnDescriptor implements ISerializable, WritableComparableLZO Compression + * for how to enable it. * @param type Compression type setting. */ public void setCompressionType(Compression.Algorithm type) { String compressionType; switch (type) { + case LZO: compressionType = "LZO"; break; case GZ: compressionType = "GZ"; break; default: compressionType = "NONE"; break; } @@ -676,4 +682,4 @@ public class HColumnDescriptor implements ISerializable, WritableComparable 0) { - codec.getConf().setInt("io.file.buffer.size", downStreamBufferSize); - } - CompressionInputStream cis = - codec.createInputStream(downStream, decompressor); - BufferedInputStream bis2 = new BufferedInputStream(cis, DATA_IBUF_SIZE); - return bis2; - } - - @Override - public synchronized OutputStream createCompressionStream( - OutputStream downStream, Compressor compressor, - int downStreamBufferSize) throws IOException { - OutputStream bos1 = null; - if (downStreamBufferSize > 0) { - bos1 = new BufferedOutputStream(downStream, downStreamBufferSize); - } - else { - bos1 = downStream; - } - codec.getConf().setInt("io.file.buffer.size", 32 * 1024); - CompressionOutputStream cos = - codec.createOutputStream(bos1, compressor); - BufferedOutputStream bos2 = - new BufferedOutputStream(new FinishOnFlushCompressionStream(cos), - DATA_OBUF_SIZE); - return bos2; - } }, NONE("none") { @Override - CompressionCodec getCodec() { + DefaultCodec getCodec() { return null; } @@ -179,15 +156,42 @@ public final class Compression { this.compressName = name; } - abstract CompressionCodec getCodec(); + abstract DefaultCodec getCodec(); - public abstract InputStream createDecompressionStream( + public InputStream createDecompressionStream( InputStream downStream, Decompressor decompressor, - int downStreamBufferSize) throws IOException; + int downStreamBufferSize) throws IOException { + DefaultCodec codec = getCodec(); + // Set the internal buffer size to read from down stream. + if (downStreamBufferSize > 0) { + codec.getConf().setInt("io.file.buffer.size", downStreamBufferSize); + } + CompressionInputStream cis = + codec.createInputStream(downStream, decompressor); + BufferedInputStream bis2 = new BufferedInputStream(cis, DATA_IBUF_SIZE); + return bis2; - public abstract OutputStream createCompressionStream( + } + + public OutputStream createCompressionStream( OutputStream downStream, Compressor compressor, int downStreamBufferSize) - throws IOException; + throws IOException { + DefaultCodec codec = getCodec(); + OutputStream bos1 = null; + if (downStreamBufferSize > 0) { + bos1 = new BufferedOutputStream(downStream, downStreamBufferSize); + } + else { + bos1 = downStream; + } + codec.getConf().setInt("io.file.buffer.size", 32 * 1024); + CompressionOutputStream cos = + codec.createOutputStream(bos1, compressor); + BufferedOutputStream bos2 = + new BufferedOutputStream(new FinishOnFlushCompressionStream(cos), + DATA_OBUF_SIZE); + return bos2; + } public Compressor getCompressor() { CompressionCodec codec = getCodec();