HBASE-7145 ReusableStreamGzipCodec NPE upon reset with IBM JDK (Renata and Ted Yu)
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1424090 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
593fe025b5
commit
c4957a21d2
|
@ -25,6 +25,7 @@ import java.util.zip.GZIPOutputStream;
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
|
import org.apache.hadoop.hbase.util.JVM;
|
||||||
import org.apache.hadoop.io.compress.CompressionOutputStream;
|
import org.apache.hadoop.io.compress.CompressionOutputStream;
|
||||||
import org.apache.hadoop.io.compress.CompressorStream;
|
import org.apache.hadoop.io.compress.CompressorStream;
|
||||||
import org.apache.hadoop.io.compress.GzipCodec;
|
import org.apache.hadoop.io.compress.GzipCodec;
|
||||||
|
@ -77,6 +78,10 @@ public class ReusableStreamGzipCodec extends GzipCodec {
|
||||||
}
|
}
|
||||||
|
|
||||||
private static class ResetableGZIPOutputStream extends GZIPOutputStream {
|
private static class ResetableGZIPOutputStream extends GZIPOutputStream {
|
||||||
|
|
||||||
|
private static final int TRAILER_SIZE = 8;
|
||||||
|
private static final boolean HAS_BROKEN_FINISH = JVM.isGZIPOutputStreamFinishBroken();
|
||||||
|
|
||||||
public ResetableGZIPOutputStream(OutputStream out) throws IOException {
|
public ResetableGZIPOutputStream(OutputStream out) throws IOException {
|
||||||
super(out);
|
super(out);
|
||||||
}
|
}
|
||||||
|
@ -86,6 +91,59 @@ public class ReusableStreamGzipCodec extends GzipCodec {
|
||||||
crc.reset();
|
crc.reset();
|
||||||
out.write(GZIP_HEADER);
|
out.write(GZIP_HEADER);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Override because certain implementation calls def.end() which
|
||||||
|
* causes problem when resetting the stream for reuse.
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public void finish() throws IOException {
|
||||||
|
if (HAS_BROKEN_FINISH) {
|
||||||
|
if (!def.finished()) {
|
||||||
|
def.finish();
|
||||||
|
while (!def.finished()) {
|
||||||
|
int i = def.deflate(this.buf, 0, this.buf.length);
|
||||||
|
if ((def.finished()) && (i <= this.buf.length - TRAILER_SIZE)) {
|
||||||
|
writeTrailer(this.buf, i);
|
||||||
|
i += TRAILER_SIZE;
|
||||||
|
out.write(this.buf, 0, i);
|
||||||
|
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if (i > 0) {
|
||||||
|
out.write(this.buf, 0, i);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
byte[] arrayOfByte = new byte[TRAILER_SIZE];
|
||||||
|
writeTrailer(arrayOfByte, 0);
|
||||||
|
out.write(arrayOfByte);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
super.finish();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/** re-implement because the relative method in jdk is invisible */
|
||||||
|
private void writeTrailer(byte[] paramArrayOfByte, int paramInt)
|
||||||
|
throws IOException {
|
||||||
|
writeInt((int)this.crc.getValue(), paramArrayOfByte, paramInt);
|
||||||
|
writeInt(this.def.getTotalIn(), paramArrayOfByte, paramInt + 4);
|
||||||
|
}
|
||||||
|
|
||||||
|
/** re-implement because the relative method in jdk is invisible */
|
||||||
|
private void writeInt(int paramInt1, byte[] paramArrayOfByte, int paramInt2)
|
||||||
|
throws IOException {
|
||||||
|
writeShort(paramInt1 & 0xFFFF, paramArrayOfByte, paramInt2);
|
||||||
|
writeShort(paramInt1 >> 16 & 0xFFFF, paramArrayOfByte, paramInt2 + 2);
|
||||||
|
}
|
||||||
|
|
||||||
|
/** re-implement because the relative method in jdk is invisible */
|
||||||
|
private void writeShort(int paramInt1, byte[] paramArrayOfByte, int paramInt2)
|
||||||
|
throws IOException {
|
||||||
|
paramArrayOfByte[paramInt2] = (byte)(paramInt1 & 0xFF);
|
||||||
|
paramArrayOfByte[(paramInt2 + 1)] = (byte)(paramInt1 >> 8 & 0xFF);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public ReusableGzipOutputStream(OutputStream out) throws IOException {
|
public ReusableGzipOutputStream(OutputStream out) throws IOException {
|
||||||
|
|
|
@ -57,6 +57,7 @@ public class JVM
|
||||||
System.getProperty("os.name").startsWith("Windows");
|
System.getProperty("os.name").startsWith("Windows");
|
||||||
private static final boolean linux =
|
private static final boolean linux =
|
||||||
System.getProperty("os.name").startsWith("Linux");
|
System.getProperty("os.name").startsWith("Linux");
|
||||||
|
private static final String JVMVersion = System.getProperty("java.version");
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Constructor. Get the running Operating System instance
|
* Constructor. Get the running Operating System instance
|
||||||
|
@ -70,13 +71,22 @@ public class JVM
|
||||||
*
|
*
|
||||||
* @return whether this is unix or not.
|
* @return whether this is unix or not.
|
||||||
*/
|
*/
|
||||||
public boolean isUnix() {
|
public static boolean isUnix() {
|
||||||
if (windows) {
|
if (windows) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
return (ibmvendor ? linux : true);
|
return (ibmvendor ? linux : true);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Check if the finish() method of GZIPOutputStream is broken
|
||||||
|
*
|
||||||
|
* @return whether GZIPOutputStream.finish() is broken.
|
||||||
|
*/
|
||||||
|
public static boolean isGZIPOutputStreamFinishBroken() {
|
||||||
|
return ibmvendor && JVMVersion.contains("1.6.0");
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Load the implementation of UnixOperatingSystemMXBean for Oracle jvm
|
* Load the implementation of UnixOperatingSystemMXBean for Oracle jvm
|
||||||
* and runs the desired method.
|
* and runs the desired method.
|
||||||
|
@ -84,7 +94,6 @@ public class JVM
|
||||||
* @return the method result
|
* @return the method result
|
||||||
*/
|
*/
|
||||||
private Long runUnixMXBeanMethod (String mBeanMethodName) {
|
private Long runUnixMXBeanMethod (String mBeanMethodName) {
|
||||||
|
|
||||||
Object unixos;
|
Object unixos;
|
||||||
Class<?> classRef;
|
Class<?> classRef;
|
||||||
Method mBeanMethod;
|
Method mBeanMethod;
|
||||||
|
|
|
@ -89,11 +89,9 @@ public class ResourceCheckerJUnitListener extends RunListener {
|
||||||
static class OpenFileDescriptorResourceAnalyzer extends ResourceChecker.ResourceAnalyzer {
|
static class OpenFileDescriptorResourceAnalyzer extends ResourceChecker.ResourceAnalyzer {
|
||||||
@Override
|
@Override
|
||||||
public int getVal(Phase phase) {
|
public int getVal(Phase phase) {
|
||||||
|
if (JVM.isUnix() == false) return 0;
|
||||||
JVM jvm = new JVM();
|
JVM jvm = new JVM();
|
||||||
if (jvm != null && jvm.isUnix() == true)
|
|
||||||
return (int)jvm.getOpenFileDescriptorCount();
|
return (int)jvm.getOpenFileDescriptorCount();
|
||||||
else
|
|
||||||
return 0;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -105,11 +103,9 @@ public class ResourceCheckerJUnitListener extends RunListener {
|
||||||
static class MaxFileDescriptorResourceAnalyzer extends ResourceChecker.ResourceAnalyzer {
|
static class MaxFileDescriptorResourceAnalyzer extends ResourceChecker.ResourceAnalyzer {
|
||||||
@Override
|
@Override
|
||||||
public int getVal(Phase phase) {
|
public int getVal(Phase phase) {
|
||||||
|
if (JVM.isUnix() == false) return 0;
|
||||||
JVM jvm = new JVM();
|
JVM jvm = new JVM();
|
||||||
if (jvm != null && jvm.isUnix() == true)
|
|
||||||
return (int)jvm.getMaxFileDescriptorCount();
|
return (int)jvm.getMaxFileDescriptorCount();
|
||||||
else
|
|
||||||
return 0;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue