HBASE-1370 re-enable LZO using hadoop-gpl-compression library
git-svn-id: https://svn.apache.org/repos/asf/hadoop/hbase/trunk@772432 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
2597f74d10
commit
6566301e47
|
@ -203,6 +203,8 @@ Release 0.20.0 - Unreleased
|
||||||
created without supplying a column list unlike the other APIs.
|
created without supplying a column list unlike the other APIs.
|
||||||
(Tim Sell via Stack)
|
(Tim Sell via Stack)
|
||||||
HBASE-1341 HTable pooler
|
HBASE-1341 HTable pooler
|
||||||
|
HBASE-1379 re-enable LZO using hadoop-gpl-compression library
|
||||||
|
(Ryan Rawson via Stack)
|
||||||
|
|
||||||
Release 0.19.0 - 01/21/2009
|
Release 0.19.0 - 01/21/2009
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
|
@ -59,7 +59,8 @@ public class HColumnDescriptor implements ISerializable, WritableComparable<HCol
|
||||||
/**
|
/**
|
||||||
* The type of compression.
|
* The type of compression.
|
||||||
* @see org.apache.hadoop.io.SequenceFile.Writer
|
* @see org.apache.hadoop.io.SequenceFile.Writer
|
||||||
* @deprecated
|
* @deprecated Compression now means which compression library
|
||||||
|
* rather than 'what' to cmopress. See {@link Compression.Algorithm}
|
||||||
*/
|
*/
|
||||||
@Deprecated
|
@Deprecated
|
||||||
public static enum CompressionType {
|
public static enum CompressionType {
|
||||||
|
@ -426,11 +427,16 @@ public class HColumnDescriptor implements ISerializable, WritableComparable<HCol
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
* Compression types supported in hbase.
|
||||||
|
* LZO is not bundled as part of the hbase distribution.
|
||||||
|
* See <a href="http://wiki.apache.org/hadoop/UsingLzoCompression">LZO Compression</a>
|
||||||
|
* for how to enable it.
|
||||||
* @param type Compression type setting.
|
* @param type Compression type setting.
|
||||||
*/
|
*/
|
||||||
public void setCompressionType(Compression.Algorithm type) {
|
public void setCompressionType(Compression.Algorithm type) {
|
||||||
String compressionType;
|
String compressionType;
|
||||||
switch (type) {
|
switch (type) {
|
||||||
|
case LZO: compressionType = "LZO"; break;
|
||||||
case GZ: compressionType = "GZ"; break;
|
case GZ: compressionType = "GZ"; break;
|
||||||
default: compressionType = "NONE"; break;
|
default: compressionType = "NONE"; break;
|
||||||
}
|
}
|
||||||
|
@ -676,4 +682,4 @@ public class HColumnDescriptor implements ISerializable, WritableComparable<HCol
|
||||||
public void restSerialize(IRestSerializer serializer) throws HBaseRestException {
|
public void restSerialize(IRestSerializer serializer) throws HBaseRestException {
|
||||||
serializer.serializeColumnDescriptor(this);
|
serializer.serializeColumnDescriptor(this);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -33,6 +33,7 @@ import org.apache.hadoop.io.compress.CompressionOutputStream;
|
||||||
import org.apache.hadoop.io.compress.Compressor;
|
import org.apache.hadoop.io.compress.Compressor;
|
||||||
import org.apache.hadoop.io.compress.Decompressor;
|
import org.apache.hadoop.io.compress.Decompressor;
|
||||||
import org.apache.hadoop.io.compress.GzipCodec;
|
import org.apache.hadoop.io.compress.GzipCodec;
|
||||||
|
import org.apache.hadoop.io.compress.DefaultCodec;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Compression related stuff.
|
* Compression related stuff.
|
||||||
|
@ -72,25 +73,35 @@ public final class Compression {
|
||||||
*/
|
*/
|
||||||
public static enum Algorithm {
|
public static enum Algorithm {
|
||||||
LZO("lzo") {
|
LZO("lzo") {
|
||||||
|
// Use base type to avoid compile-time dependencies.
|
||||||
|
private DefaultCodec lzoCodec;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
CompressionCodec getCodec() {
|
DefaultCodec getCodec() {
|
||||||
throw new UnsupportedOperationException("LZO compression is disabled for now");
|
if (lzoCodec == null) {
|
||||||
}
|
Configuration conf = new Configuration();
|
||||||
@Override
|
conf.setBoolean("hadoop.native.lib", true);
|
||||||
public InputStream createDecompressionStream(InputStream downStream, Decompressor decompressor, int downStreamBufferSize) throws IOException {
|
try {
|
||||||
throw new UnsupportedOperationException("LZO compression is disabled for now");
|
Class externalCodec =
|
||||||
}
|
ClassLoader.getSystemClassLoader().loadClass("com.hadoop.compression.lzo.LzoCodec");
|
||||||
@Override
|
lzoCodec = (DefaultCodec) externalCodec.newInstance();
|
||||||
public OutputStream createCompressionStream(OutputStream downStream, Compressor compressor, int downStreamBufferSize) throws IOException {
|
lzoCodec.setConf(conf);
|
||||||
throw new UnsupportedOperationException("LZO compression is disabled for now");
|
} catch (ClassNotFoundException e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
} catch (InstantiationException e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
} catch (IllegalAccessException e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return lzoCodec;
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
GZ("gz") {
|
GZ("gz") {
|
||||||
private GzipCodec codec;
|
private GzipCodec codec;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
CompressionCodec getCodec() {
|
DefaultCodec getCodec() {
|
||||||
if (codec == null) {
|
if (codec == null) {
|
||||||
Configuration conf = new Configuration();
|
Configuration conf = new Configuration();
|
||||||
conf.setBoolean("hadoop.native.lib", true);
|
conf.setBoolean("hadoop.native.lib", true);
|
||||||
|
@ -100,45 +111,11 @@ public final class Compression {
|
||||||
|
|
||||||
return codec;
|
return codec;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public synchronized InputStream createDecompressionStream(
|
|
||||||
InputStream downStream, Decompressor decompressor,
|
|
||||||
int downStreamBufferSize) throws IOException {
|
|
||||||
// Set the internal buffer size to read from down stream.
|
|
||||||
if (downStreamBufferSize > 0) {
|
|
||||||
codec.getConf().setInt("io.file.buffer.size", downStreamBufferSize);
|
|
||||||
}
|
|
||||||
CompressionInputStream cis =
|
|
||||||
codec.createInputStream(downStream, decompressor);
|
|
||||||
BufferedInputStream bis2 = new BufferedInputStream(cis, DATA_IBUF_SIZE);
|
|
||||||
return bis2;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public synchronized OutputStream createCompressionStream(
|
|
||||||
OutputStream downStream, Compressor compressor,
|
|
||||||
int downStreamBufferSize) throws IOException {
|
|
||||||
OutputStream bos1 = null;
|
|
||||||
if (downStreamBufferSize > 0) {
|
|
||||||
bos1 = new BufferedOutputStream(downStream, downStreamBufferSize);
|
|
||||||
}
|
|
||||||
else {
|
|
||||||
bos1 = downStream;
|
|
||||||
}
|
|
||||||
codec.getConf().setInt("io.file.buffer.size", 32 * 1024);
|
|
||||||
CompressionOutputStream cos =
|
|
||||||
codec.createOutputStream(bos1, compressor);
|
|
||||||
BufferedOutputStream bos2 =
|
|
||||||
new BufferedOutputStream(new FinishOnFlushCompressionStream(cos),
|
|
||||||
DATA_OBUF_SIZE);
|
|
||||||
return bos2;
|
|
||||||
}
|
|
||||||
},
|
},
|
||||||
|
|
||||||
NONE("none") {
|
NONE("none") {
|
||||||
@Override
|
@Override
|
||||||
CompressionCodec getCodec() {
|
DefaultCodec getCodec() {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -179,15 +156,42 @@ public final class Compression {
|
||||||
this.compressName = name;
|
this.compressName = name;
|
||||||
}
|
}
|
||||||
|
|
||||||
abstract CompressionCodec getCodec();
|
abstract DefaultCodec getCodec();
|
||||||
|
|
||||||
public abstract InputStream createDecompressionStream(
|
public InputStream createDecompressionStream(
|
||||||
InputStream downStream, Decompressor decompressor,
|
InputStream downStream, Decompressor decompressor,
|
||||||
int downStreamBufferSize) throws IOException;
|
int downStreamBufferSize) throws IOException {
|
||||||
|
DefaultCodec codec = getCodec();
|
||||||
|
// Set the internal buffer size to read from down stream.
|
||||||
|
if (downStreamBufferSize > 0) {
|
||||||
|
codec.getConf().setInt("io.file.buffer.size", downStreamBufferSize);
|
||||||
|
}
|
||||||
|
CompressionInputStream cis =
|
||||||
|
codec.createInputStream(downStream, decompressor);
|
||||||
|
BufferedInputStream bis2 = new BufferedInputStream(cis, DATA_IBUF_SIZE);
|
||||||
|
return bis2;
|
||||||
|
|
||||||
public abstract OutputStream createCompressionStream(
|
}
|
||||||
|
|
||||||
|
public OutputStream createCompressionStream(
|
||||||
OutputStream downStream, Compressor compressor, int downStreamBufferSize)
|
OutputStream downStream, Compressor compressor, int downStreamBufferSize)
|
||||||
throws IOException;
|
throws IOException {
|
||||||
|
DefaultCodec codec = getCodec();
|
||||||
|
OutputStream bos1 = null;
|
||||||
|
if (downStreamBufferSize > 0) {
|
||||||
|
bos1 = new BufferedOutputStream(downStream, downStreamBufferSize);
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
bos1 = downStream;
|
||||||
|
}
|
||||||
|
codec.getConf().setInt("io.file.buffer.size", 32 * 1024);
|
||||||
|
CompressionOutputStream cos =
|
||||||
|
codec.createOutputStream(bos1, compressor);
|
||||||
|
BufferedOutputStream bos2 =
|
||||||
|
new BufferedOutputStream(new FinishOnFlushCompressionStream(cos),
|
||||||
|
DATA_OBUF_SIZE);
|
||||||
|
return bos2;
|
||||||
|
}
|
||||||
|
|
||||||
public Compressor getCompressor() {
|
public Compressor getCompressor() {
|
||||||
CompressionCodec codec = getCodec();
|
CompressionCodec codec = getCodec();
|
||||||
|
|
Loading…
Reference in New Issue