HBASE-3709 HFile compression not sharing configuration; work around accidental use of CDH-ism
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1086511 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
df3a53e764
commit
1685e9b536
|
@ -25,6 +25,7 @@ import java.io.OutputStream;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.conf.Configurable;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.io.compress.CodecPool;
|
import org.apache.hadoop.io.compress.CodecPool;
|
||||||
import org.apache.hadoop.io.compress.CompressionCodec;
|
import org.apache.hadoop.io.compress.CompressionCodec;
|
||||||
|
@ -80,12 +81,13 @@ public final class Compression {
|
||||||
private transient CompressionCodec lzoCodec;
|
private transient CompressionCodec lzoCodec;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
CompressionCodec getCodec() {
|
CompressionCodec getCodec(Configuration conf) {
|
||||||
if (lzoCodec == null) {
|
if (lzoCodec == null) {
|
||||||
try {
|
try {
|
||||||
Class<?> externalCodec =
|
Class<?> externalCodec =
|
||||||
ClassLoader.getSystemClassLoader().loadClass("com.hadoop.compression.lzo.LzoCodec");
|
ClassLoader.getSystemClassLoader().loadClass("com.hadoop.compression.lzo.LzoCodec");
|
||||||
lzoCodec = (CompressionCodec) ReflectionUtils.newInstance(externalCodec, getConf());
|
lzoCodec = (CompressionCodec) ReflectionUtils.newInstance(externalCodec,
|
||||||
|
new Configuration(conf));
|
||||||
} catch (ClassNotFoundException e) {
|
} catch (ClassNotFoundException e) {
|
||||||
throw new RuntimeException(e);
|
throw new RuntimeException(e);
|
||||||
}
|
}
|
||||||
|
@ -97,10 +99,10 @@ public final class Compression {
|
||||||
private transient GzipCodec codec;
|
private transient GzipCodec codec;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
DefaultCodec getCodec() {
|
DefaultCodec getCodec(Configuration conf) {
|
||||||
if (codec == null) {
|
if (codec == null) {
|
||||||
codec = new GzipCodec();
|
codec = new GzipCodec();
|
||||||
codec.setConf(getConf());
|
codec.setConf(new Configuration(conf));
|
||||||
}
|
}
|
||||||
|
|
||||||
return codec;
|
return codec;
|
||||||
|
@ -109,7 +111,7 @@ public final class Compression {
|
||||||
|
|
||||||
NONE("none") {
|
NONE("none") {
|
||||||
@Override
|
@Override
|
||||||
DefaultCodec getCodec() {
|
DefaultCodec getCodec(Configuration conf) {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -142,9 +144,9 @@ public final class Compression {
|
||||||
|
|
||||||
private final Configuration conf;
|
private final Configuration conf;
|
||||||
private final String compressName;
|
private final String compressName;
|
||||||
// data input buffer size to absorb small reads from application.
|
// data input buffer size to absorb small reads from application.
|
||||||
private static final int DATA_IBUF_SIZE = 1 * 1024;
|
private static final int DATA_IBUF_SIZE = 1 * 1024;
|
||||||
// data output buffer size to absorb small writes from application.
|
// data output buffer size to absorb small writes from application.
|
||||||
private static final int DATA_OBUF_SIZE = 4 * 1024;
|
private static final int DATA_OBUF_SIZE = 4 * 1024;
|
||||||
|
|
||||||
Algorithm(String name) {
|
Algorithm(String name) {
|
||||||
|
@ -153,19 +155,16 @@ public final class Compression {
|
||||||
this.compressName = name;
|
this.compressName = name;
|
||||||
}
|
}
|
||||||
|
|
||||||
abstract CompressionCodec getCodec();
|
abstract CompressionCodec getCodec(Configuration conf);
|
||||||
|
|
||||||
public Configuration getConf() {
|
|
||||||
return conf;
|
|
||||||
}
|
|
||||||
|
|
||||||
public InputStream createDecompressionStream(
|
public InputStream createDecompressionStream(
|
||||||
InputStream downStream, Decompressor decompressor,
|
InputStream downStream, Decompressor decompressor,
|
||||||
int downStreamBufferSize) throws IOException {
|
int downStreamBufferSize) throws IOException {
|
||||||
CompressionCodec codec = getCodec();
|
CompressionCodec codec = getCodec(conf);
|
||||||
// Set the internal buffer size to read from down stream.
|
// Set the internal buffer size to read from down stream.
|
||||||
if (downStreamBufferSize > 0) {
|
if (downStreamBufferSize > 0) {
|
||||||
getConf().setInt("io.file.buffer.size", downStreamBufferSize);
|
((Configurable)codec).getConf().setInt("io.file.buffer.size",
|
||||||
|
downStreamBufferSize);
|
||||||
}
|
}
|
||||||
CompressionInputStream cis =
|
CompressionInputStream cis =
|
||||||
codec.createInputStream(downStream, decompressor);
|
codec.createInputStream(downStream, decompressor);
|
||||||
|
@ -177,7 +176,7 @@ public final class Compression {
|
||||||
public OutputStream createCompressionStream(
|
public OutputStream createCompressionStream(
|
||||||
OutputStream downStream, Compressor compressor, int downStreamBufferSize)
|
OutputStream downStream, Compressor compressor, int downStreamBufferSize)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
CompressionCodec codec = getCodec();
|
CompressionCodec codec = getCodec(conf);
|
||||||
OutputStream bos1 = null;
|
OutputStream bos1 = null;
|
||||||
if (downStreamBufferSize > 0) {
|
if (downStreamBufferSize > 0) {
|
||||||
bos1 = new BufferedOutputStream(downStream, downStreamBufferSize);
|
bos1 = new BufferedOutputStream(downStream, downStreamBufferSize);
|
||||||
|
@ -185,7 +184,7 @@ public final class Compression {
|
||||||
else {
|
else {
|
||||||
bos1 = downStream;
|
bos1 = downStream;
|
||||||
}
|
}
|
||||||
getConf().setInt("io.file.buffer.size", 32 * 1024);
|
((Configurable)codec).getConf().setInt("io.file.buffer.size", 32 * 1024);
|
||||||
CompressionOutputStream cos =
|
CompressionOutputStream cos =
|
||||||
codec.createOutputStream(bos1, compressor);
|
codec.createOutputStream(bos1, compressor);
|
||||||
BufferedOutputStream bos2 =
|
BufferedOutputStream bos2 =
|
||||||
|
@ -195,9 +194,9 @@ public final class Compression {
|
||||||
}
|
}
|
||||||
|
|
||||||
public Compressor getCompressor() {
|
public Compressor getCompressor() {
|
||||||
CompressionCodec codec = getCodec();
|
CompressionCodec codec = getCodec(conf);
|
||||||
if (codec != null) {
|
if (codec != null) {
|
||||||
Compressor compressor = CodecPool.getCompressor(codec, getConf());
|
Compressor compressor = CodecPool.getCompressor(codec);
|
||||||
if (compressor != null) {
|
if (compressor != null) {
|
||||||
if (compressor.finished()) {
|
if (compressor.finished()) {
|
||||||
// Somebody returns the compressor to CodecPool but is still using
|
// Somebody returns the compressor to CodecPool but is still using
|
||||||
|
@ -221,7 +220,7 @@ public final class Compression {
|
||||||
}
|
}
|
||||||
|
|
||||||
public Decompressor getDecompressor() {
|
public Decompressor getDecompressor() {
|
||||||
CompressionCodec codec = getCodec();
|
CompressionCodec codec = getCodec(conf);
|
||||||
if (codec != null) {
|
if (codec != null) {
|
||||||
Decompressor decompressor = CodecPool.getDecompressor(codec);
|
Decompressor decompressor = CodecPool.getDecompressor(codec);
|
||||||
if (decompressor != null) {
|
if (decompressor != null) {
|
||||||
|
|
Loading…
Reference in New Issue