From 86bb9c580887aa7e0b63bdced7b7695b1c426737 Mon Sep 17 00:00:00 2001 From: cnauroth Date: Wed, 11 Mar 2015 23:27:49 -0700 Subject: [PATCH] HADOOP-10027. *Compressor_deflateBytesDirect passes instance instead of jclass to GetStaticObjectField. Contributed by Hui Zheng. (cherry picked from commit ff83ae72318fe6c0f266a7bf08138bd2fdb51cbd) Conflicts: hadoop-common-project/hadoop-common/CHANGES.txt --- .../hadoop-common/CHANGES.txt | 3 + .../io/compress/bzip2/Bzip2Compressor.java | 3 - .../io/compress/bzip2/Bzip2Decompressor.java | 3 - .../hadoop/io/compress/lz4/Lz4Compressor.java | 4 - .../io/compress/lz4/Lz4Decompressor.java | 4 - .../io/compress/snappy/SnappyCompressor.java | 4 - .../compress/snappy/SnappyDecompressor.java | 4 - .../io/compress/zlib/ZlibCompressor.java | 3 - .../io/compress/zlib/ZlibDecompressor.java | 5 +- .../io/compress/bzip2/Bzip2Compressor.c | 9 +- .../io/compress/bzip2/Bzip2Decompressor.c | 7 -- .../hadoop/io/compress/lz4/Lz4Compressor.c | 13 --- .../hadoop/io/compress/lz4/Lz4Decompressor.c | 8 -- .../io/compress/snappy/SnappyCompressor.c | 8 -- .../io/compress/snappy/SnappyDecompressor.c | 8 -- .../hadoop/io/compress/zlib/ZlibCompressor.c | 11 -- .../io/compress/zlib/ZlibDecompressor.c | 10 -- .../TestBzip2CompressorDecompressor.java | 104 ++++++++++++++++++ .../lz4/TestLz4CompressorDecompressor.java | 17 +++ .../TestSnappyCompressorDecompressor.java | 17 +++ .../zlib/TestZlibCompressorDecompressor.java | 17 +++ 21 files changed, 160 insertions(+), 102 deletions(-) create mode 100644 hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/bzip2/TestBzip2CompressorDecompressor.java diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index 4f3b404d07e..1a1e111db00 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -18,6 +18,9 @@ Release 2.8.0 - UNRELEASED HADOOP-11568. Description on usage of classpath in hadoop command is incomplete. ( Archana T via vinayakumarb ) + HADOOP-10027. *Compressor_deflateBytesDirect passes instance instead of + jclass to GetStaticObjectField. (Hui Zheng via cnauroth) + Release 2.7.0 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/bzip2/Bzip2Compressor.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/bzip2/Bzip2Compressor.java index 0f333bb36ce..a973dc93340 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/bzip2/Bzip2Compressor.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/bzip2/Bzip2Compressor.java @@ -44,9 +44,6 @@ public class Bzip2Compressor implements Compressor { private static final Log LOG = LogFactory.getLog(Bzip2Compressor.class); - // HACK - Use this as a global lock in the JNI layer. - private static Class clazz = Bzip2Compressor.class; - private long stream; private int blockSize; private int workFactor; diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/bzip2/Bzip2Decompressor.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/bzip2/Bzip2Decompressor.java index 672090209db..3135165e879 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/bzip2/Bzip2Decompressor.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/bzip2/Bzip2Decompressor.java @@ -38,9 +38,6 @@ public class Bzip2Decompressor implements Decompressor { private static final Log LOG = LogFactory.getLog(Bzip2Decompressor.class); - // HACK - Use this as a global lock in the JNI layer. - private static Class clazz = Bzip2Decompressor.class; - private long stream; private boolean conserveMemory; private int directBufferSize; diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/lz4/Lz4Compressor.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/lz4/Lz4Compressor.java index b5db99f92dc..ccfae8b3c36 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/lz4/Lz4Compressor.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/lz4/Lz4Compressor.java @@ -37,10 +37,6 @@ public class Lz4Compressor implements Compressor { LogFactory.getLog(Lz4Compressor.class.getName()); private static final int DEFAULT_DIRECT_BUFFER_SIZE = 64 * 1024; - // HACK - Use this as a global lock in the JNI layer - @SuppressWarnings({"unchecked", "unused"}) - private static Class clazz = Lz4Compressor.class; - private int directBufferSize; private Buffer compressedDirectBuf = null; private int uncompressedDirectBufLen; diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/lz4/Lz4Decompressor.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/lz4/Lz4Decompressor.java index 22a3118f5f9..685956cc1bf 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/lz4/Lz4Decompressor.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/lz4/Lz4Decompressor.java @@ -36,10 +36,6 @@ public class Lz4Decompressor implements Decompressor { LogFactory.getLog(Lz4Compressor.class.getName()); private static final int DEFAULT_DIRECT_BUFFER_SIZE = 64 * 1024; - // HACK - Use this as a global lock in the JNI layer - @SuppressWarnings({"unchecked", "unused"}) - private static Class clazz = Lz4Decompressor.class; - private int directBufferSize; private Buffer compressedDirectBuf = null; private int compressedDirectBufLen; diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/snappy/SnappyCompressor.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/snappy/SnappyCompressor.java index ab45f250585..814718d99ef 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/snappy/SnappyCompressor.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/snappy/SnappyCompressor.java @@ -37,10 +37,6 @@ public class SnappyCompressor implements Compressor { LogFactory.getLog(SnappyCompressor.class.getName()); private static final int DEFAULT_DIRECT_BUFFER_SIZE = 64 * 1024; - // HACK - Use this as a global lock in the JNI layer - @SuppressWarnings({"unchecked", "unused"}) - private static Class clazz = SnappyCompressor.class; - private int directBufferSize; private Buffer compressedDirectBuf = null; private int uncompressedDirectBufLen; diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/snappy/SnappyDecompressor.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/snappy/SnappyDecompressor.java index b5f5acf8664..dbffba811b1 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/snappy/SnappyDecompressor.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/snappy/SnappyDecompressor.java @@ -37,10 +37,6 @@ public class SnappyDecompressor implements Decompressor { LogFactory.getLog(SnappyCompressor.class.getName()); private static final int DEFAULT_DIRECT_BUFFER_SIZE = 64 * 1024; - // HACK - Use this as a global lock in the JNI layer - @SuppressWarnings({"unchecked", "unused"}) - private static Class clazz = SnappyDecompressor.class; - private int directBufferSize; private Buffer compressedDirectBuf = null; private int compressedDirectBufLen; diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zlib/ZlibCompressor.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zlib/ZlibCompressor.java index 6799403b160..b9550449f7f 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zlib/ZlibCompressor.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zlib/ZlibCompressor.java @@ -41,9 +41,6 @@ public class ZlibCompressor implements Compressor { private static final int DEFAULT_DIRECT_BUFFER_SIZE = 64*1024; - // HACK - Use this as a global lock in the JNI layer - private static Class clazz = ZlibCompressor.class; - private long stream; private CompressionLevel level; private CompressionStrategy strategy; diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zlib/ZlibDecompressor.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zlib/ZlibDecompressor.java index 89c879a03c8..d728fad817b 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zlib/ZlibDecompressor.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zlib/ZlibDecompressor.java @@ -34,10 +34,7 @@ import org.apache.hadoop.util.NativeCodeLoader; */ public class ZlibDecompressor implements Decompressor { private static final int DEFAULT_DIRECT_BUFFER_SIZE = 64*1024; - - // HACK - Use this as a global lock in the JNI layer - private static Class clazz = ZlibDecompressor.class; - + private long stream; private CompressionHeader header; private int directBufferSize; diff --git a/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/bzip2/Bzip2Compressor.c b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/bzip2/Bzip2Compressor.c index ef81bea2ce0..a92123efcd9 100644 --- a/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/bzip2/Bzip2Compressor.c +++ b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/bzip2/Bzip2Compressor.c @@ -25,7 +25,6 @@ #include "org_apache_hadoop_io_compress_bzip2.h" #include "org_apache_hadoop_io_compress_bzip2_Bzip2Compressor.h" -static jfieldID Bzip2Compressor_clazz; static jfieldID Bzip2Compressor_stream; static jfieldID Bzip2Compressor_uncompressedDirectBuf; static jfieldID Bzip2Compressor_uncompressedDirectBufOff; @@ -74,8 +73,6 @@ Java_org_apache_hadoop_io_compress_bzip2_Bzip2Compressor_initIDs( "BZ2_bzCompressEnd"); // Initialize the requisite fieldIds. - Bzip2Compressor_clazz = (*env)->GetStaticFieldID(env, class, "clazz", - "Ljava/lang/Class;"); Bzip2Compressor_stream = (*env)->GetFieldID(env, class, "stream", "J"); Bzip2Compressor_finish = (*env)->GetFieldID(env, class, "finish", "Z"); Bzip2Compressor_finished = (*env)->GetFieldID(env, class, "finished", "Z"); @@ -155,9 +152,7 @@ Java_org_apache_hadoop_io_compress_bzip2_Bzip2Compressor_deflateBytesDirect( return (jint)0; } - jobject clazz = (*env)->GetStaticObjectField(env, this, - Bzip2Compressor_clazz); - jobject uncompressed_direct_buf = (*env)->GetObjectField(env, this, + jobject uncompressed_direct_buf = (*env)->GetObjectField(env, this, Bzip2Compressor_uncompressedDirectBuf); jint uncompressed_direct_buf_off = (*env)->GetIntField(env, this, Bzip2Compressor_uncompressedDirectBufOff); @@ -173,12 +168,10 @@ Java_org_apache_hadoop_io_compress_bzip2_Bzip2Compressor_deflateBytesDirect( Bzip2Compressor_finish); // Get the input and output direct buffers. - LOCK_CLASS(env, clazz, "Bzip2Compressor"); char* uncompressed_bytes = (*env)->GetDirectBufferAddress(env, uncompressed_direct_buf); char* compressed_bytes = (*env)->GetDirectBufferAddress(env, compressed_direct_buf); - UNLOCK_CLASS(env, clazz, "Bzip2Compressor"); if (!uncompressed_bytes || !compressed_bytes) { return (jint)0; diff --git a/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/bzip2/Bzip2Decompressor.c b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/bzip2/Bzip2Decompressor.c index ad9bcb72c6c..3d9fc084f78 100644 --- a/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/bzip2/Bzip2Decompressor.c +++ b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/bzip2/Bzip2Decompressor.c @@ -25,7 +25,6 @@ #include "org_apache_hadoop_io_compress_bzip2.h" #include "org_apache_hadoop_io_compress_bzip2_Bzip2Decompressor.h" -static jfieldID Bzip2Decompressor_clazz; static jfieldID Bzip2Decompressor_stream; static jfieldID Bzip2Decompressor_compressedDirectBuf; static jfieldID Bzip2Decompressor_compressedDirectBufOff; @@ -73,8 +72,6 @@ Java_org_apache_hadoop_io_compress_bzip2_Bzip2Decompressor_initIDs( "BZ2_bzDecompressEnd"); // Initialize the requisite fieldIds. - Bzip2Decompressor_clazz = (*env)->GetStaticFieldID(env, class, "clazz", - "Ljava/lang/Class;"); Bzip2Decompressor_stream = (*env)->GetFieldID(env, class, "stream", "J"); Bzip2Decompressor_finished = (*env)->GetFieldID(env, class, "finished", "Z"); @@ -144,8 +141,6 @@ Java_org_apache_hadoop_io_compress_bzip2_Bzip2Decompressor_inflateBytesDirect( return (jint)0; } - jobject clazz = (*env)->GetStaticObjectField(env, this, - Bzip2Decompressor_clazz); jarray compressed_direct_buf = (jarray)(*env)->GetObjectField(env, this, Bzip2Decompressor_compressedDirectBuf); jint compressed_direct_buf_off = (*env)->GetIntField(env, this, @@ -159,12 +154,10 @@ Java_org_apache_hadoop_io_compress_bzip2_Bzip2Decompressor_inflateBytesDirect( Bzip2Decompressor_directBufferSize); // Get the input and output direct buffers. - LOCK_CLASS(env, clazz, "Bzip2Decompressor"); char* compressed_bytes = (*env)->GetDirectBufferAddress(env, compressed_direct_buf); char* uncompressed_bytes = (*env)->GetDirectBufferAddress(env, uncompressed_direct_buf); - UNLOCK_CLASS(env, clazz, "Bzip2Decompressor"); if (!compressed_bytes || !uncompressed_bytes) { return (jint)0; diff --git a/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/lz4/Lz4Compressor.c b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/lz4/Lz4Compressor.c index 58544f537bc..b8f8276fce6 100644 --- a/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/lz4/Lz4Compressor.c +++ b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/lz4/Lz4Compressor.c @@ -27,7 +27,6 @@ #include "lz4hc.h" -static jfieldID Lz4Compressor_clazz; static jfieldID Lz4Compressor_uncompressedDirectBuf; static jfieldID Lz4Compressor_uncompressedDirectBufLen; static jfieldID Lz4Compressor_compressedDirectBuf; @@ -37,8 +36,6 @@ static jfieldID Lz4Compressor_directBufferSize; JNIEXPORT void JNICALL Java_org_apache_hadoop_io_compress_lz4_Lz4Compressor_initIDs (JNIEnv *env, jclass clazz){ - Lz4Compressor_clazz = (*env)->GetStaticFieldID(env, clazz, "clazz", - "Ljava/lang/Class;"); Lz4Compressor_uncompressedDirectBuf = (*env)->GetFieldID(env, clazz, "uncompressedDirectBuf", "Ljava/nio/Buffer;"); @@ -57,25 +54,20 @@ JNIEXPORT jint JNICALL Java_org_apache_hadoop_io_compress_lz4_Lz4Compressor_comp char *compressed_bytes; // Get members of Lz4Compressor - jobject clazz = (*env)->GetStaticObjectField(env, thisj, Lz4Compressor_clazz); jobject uncompressed_direct_buf = (*env)->GetObjectField(env, thisj, Lz4Compressor_uncompressedDirectBuf); jint uncompressed_direct_buf_len = (*env)->GetIntField(env, thisj, Lz4Compressor_uncompressedDirectBufLen); jobject compressed_direct_buf = (*env)->GetObjectField(env, thisj, Lz4Compressor_compressedDirectBuf); jint compressed_direct_buf_len = (*env)->GetIntField(env, thisj, Lz4Compressor_directBufferSize); // Get the input direct buffer - LOCK_CLASS(env, clazz, "Lz4Compressor"); uncompressed_bytes = (const char*)(*env)->GetDirectBufferAddress(env, uncompressed_direct_buf); - UNLOCK_CLASS(env, clazz, "Lz4Compressor"); if (uncompressed_bytes == 0) { return (jint)0; } // Get the output direct buffer - LOCK_CLASS(env, clazz, "Lz4Compressor"); compressed_bytes = (char *)(*env)->GetDirectBufferAddress(env, compressed_direct_buf); - UNLOCK_CLASS(env, clazz, "Lz4Compressor"); if (compressed_bytes == 0) { return (jint)0; @@ -104,25 +96,20 @@ JNIEXPORT jint JNICALL Java_org_apache_hadoop_io_compress_lz4_Lz4Compressor_comp char* compressed_bytes = NULL; // Get members of Lz4Compressor - jobject clazz = (*env)->GetStaticObjectField(env, thisj, Lz4Compressor_clazz); jobject uncompressed_direct_buf = (*env)->GetObjectField(env, thisj, Lz4Compressor_uncompressedDirectBuf); jint uncompressed_direct_buf_len = (*env)->GetIntField(env, thisj, Lz4Compressor_uncompressedDirectBufLen); jobject compressed_direct_buf = (*env)->GetObjectField(env, thisj, Lz4Compressor_compressedDirectBuf); jint compressed_direct_buf_len = (*env)->GetIntField(env, thisj, Lz4Compressor_directBufferSize); // Get the input direct buffer - LOCK_CLASS(env, clazz, "Lz4Compressor"); uncompressed_bytes = (const char*)(*env)->GetDirectBufferAddress(env, uncompressed_direct_buf); - UNLOCK_CLASS(env, clazz, "Lz4Compressor"); if (uncompressed_bytes == 0) { return (jint)0; } // Get the output direct buffer - LOCK_CLASS(env, clazz, "Lz4Compressor"); compressed_bytes = (char *)(*env)->GetDirectBufferAddress(env, compressed_direct_buf); - UNLOCK_CLASS(env, clazz, "Lz4Compressor"); if (compressed_bytes == 0) { return (jint)0; diff --git a/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/lz4/Lz4Decompressor.c b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/lz4/Lz4Decompressor.c index 6570303d027..38f6b90d354 100644 --- a/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/lz4/Lz4Decompressor.c +++ b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/lz4/Lz4Decompressor.c @@ -25,7 +25,6 @@ #include "lz4.h" -static jfieldID Lz4Decompressor_clazz; static jfieldID Lz4Decompressor_compressedDirectBuf; static jfieldID Lz4Decompressor_compressedDirectBufLen; static jfieldID Lz4Decompressor_uncompressedDirectBuf; @@ -34,8 +33,6 @@ static jfieldID Lz4Decompressor_directBufferSize; JNIEXPORT void JNICALL Java_org_apache_hadoop_io_compress_lz4_Lz4Decompressor_initIDs (JNIEnv *env, jclass clazz){ - Lz4Decompressor_clazz = (*env)->GetStaticFieldID(env, clazz, "clazz", - "Ljava/lang/Class;"); Lz4Decompressor_compressedDirectBuf = (*env)->GetFieldID(env,clazz, "compressedDirectBuf", "Ljava/nio/Buffer;"); @@ -54,25 +51,20 @@ JNIEXPORT jint JNICALL Java_org_apache_hadoop_io_compress_lz4_Lz4Decompressor_de char *uncompressed_bytes; // Get members of Lz4Decompressor - jobject clazz = (*env)->GetStaticObjectField(env,thisj, Lz4Decompressor_clazz); jobject compressed_direct_buf = (*env)->GetObjectField(env,thisj, Lz4Decompressor_compressedDirectBuf); jint compressed_direct_buf_len = (*env)->GetIntField(env,thisj, Lz4Decompressor_compressedDirectBufLen); jobject uncompressed_direct_buf = (*env)->GetObjectField(env,thisj, Lz4Decompressor_uncompressedDirectBuf); size_t uncompressed_direct_buf_len = (*env)->GetIntField(env, thisj, Lz4Decompressor_directBufferSize); // Get the input direct buffer - LOCK_CLASS(env, clazz, "Lz4Decompressor"); compressed_bytes = (const char*)(*env)->GetDirectBufferAddress(env, compressed_direct_buf); - UNLOCK_CLASS(env, clazz, "Lz4Decompressor"); if (compressed_bytes == 0) { return (jint)0; } // Get the output direct buffer - LOCK_CLASS(env, clazz, "Lz4Decompressor"); uncompressed_bytes = (char *)(*env)->GetDirectBufferAddress(env, uncompressed_direct_buf); - UNLOCK_CLASS(env, clazz, "Lz4Decompressor"); if (uncompressed_bytes == 0) { return (jint)0; diff --git a/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/snappy/SnappyCompressor.c b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/snappy/SnappyCompressor.c index 65c978b2065..d8800afcc12 100644 --- a/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/snappy/SnappyCompressor.c +++ b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/snappy/SnappyCompressor.c @@ -38,7 +38,6 @@ #define JINT_MAX 0x7fffffff -static jfieldID SnappyCompressor_clazz; static jfieldID SnappyCompressor_uncompressedDirectBuf; static jfieldID SnappyCompressor_uncompressedDirectBufLen; static jfieldID SnappyCompressor_compressedDirectBuf; @@ -84,8 +83,6 @@ JNIEXPORT void JNICALL Java_org_apache_hadoop_io_compress_snappy_SnappyCompresso LOAD_DYNAMIC_SYMBOL(__dlsym_snappy_compress, dlsym_snappy_compress, env, libsnappy, "snappy_compress"); #endif - SnappyCompressor_clazz = (*env)->GetStaticFieldID(env, clazz, "clazz", - "Ljava/lang/Class;"); SnappyCompressor_uncompressedDirectBuf = (*env)->GetFieldID(env, clazz, "uncompressedDirectBuf", "Ljava/nio/Buffer;"); @@ -104,7 +101,6 @@ JNIEXPORT jint JNICALL Java_org_apache_hadoop_io_compress_snappy_SnappyCompresso char* compressed_bytes; snappy_status ret; // Get members of SnappyCompressor - jobject clazz = (*env)->GetStaticObjectField(env, thisj, SnappyCompressor_clazz); jobject uncompressed_direct_buf = (*env)->GetObjectField(env, thisj, SnappyCompressor_uncompressedDirectBuf); jint uncompressed_direct_buf_len = (*env)->GetIntField(env, thisj, SnappyCompressor_uncompressedDirectBufLen); jobject compressed_direct_buf = (*env)->GetObjectField(env, thisj, SnappyCompressor_compressedDirectBuf); @@ -112,18 +108,14 @@ JNIEXPORT jint JNICALL Java_org_apache_hadoop_io_compress_snappy_SnappyCompresso size_t buf_len; // Get the input direct buffer - LOCK_CLASS(env, clazz, "SnappyCompressor"); uncompressed_bytes = (const char*)(*env)->GetDirectBufferAddress(env, uncompressed_direct_buf); - UNLOCK_CLASS(env, clazz, "SnappyCompressor"); if (uncompressed_bytes == 0) { return (jint)0; } // Get the output direct buffer - LOCK_CLASS(env, clazz, "SnappyCompressor"); compressed_bytes = (char *)(*env)->GetDirectBufferAddress(env, compressed_direct_buf); - UNLOCK_CLASS(env, clazz, "SnappyCompressor"); if (compressed_bytes == 0) { return (jint)0; diff --git a/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/snappy/SnappyDecompressor.c b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/snappy/SnappyDecompressor.c index 022f2b04591..0f2b7c0dfdd 100644 --- a/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/snappy/SnappyDecompressor.c +++ b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/snappy/SnappyDecompressor.c @@ -31,7 +31,6 @@ #include "org_apache_hadoop_io_compress_snappy_SnappyDecompressor.h" -static jfieldID SnappyDecompressor_clazz; static jfieldID SnappyDecompressor_compressedDirectBuf; static jfieldID SnappyDecompressor_compressedDirectBufLen; static jfieldID SnappyDecompressor_uncompressedDirectBuf; @@ -79,8 +78,6 @@ JNIEXPORT void JNICALL Java_org_apache_hadoop_io_compress_snappy_SnappyDecompres LOAD_DYNAMIC_SYMBOL(__dlsym_snappy_uncompress, dlsym_snappy_uncompress, env, libsnappy, "snappy_uncompress"); #endif - SnappyDecompressor_clazz = (*env)->GetStaticFieldID(env, clazz, "clazz", - "Ljava/lang/Class;"); SnappyDecompressor_compressedDirectBuf = (*env)->GetFieldID(env,clazz, "compressedDirectBuf", "Ljava/nio/Buffer;"); @@ -99,25 +96,20 @@ JNIEXPORT jint JNICALL Java_org_apache_hadoop_io_compress_snappy_SnappyDecompres char* uncompressed_bytes = NULL; snappy_status ret; // Get members of SnappyDecompressor - jobject clazz = (*env)->GetStaticObjectField(env,thisj, SnappyDecompressor_clazz); jobject compressed_direct_buf = (*env)->GetObjectField(env,thisj, SnappyDecompressor_compressedDirectBuf); jint compressed_direct_buf_len = (*env)->GetIntField(env,thisj, SnappyDecompressor_compressedDirectBufLen); jobject uncompressed_direct_buf = (*env)->GetObjectField(env,thisj, SnappyDecompressor_uncompressedDirectBuf); size_t uncompressed_direct_buf_len = (*env)->GetIntField(env, thisj, SnappyDecompressor_directBufferSize); // Get the input direct buffer - LOCK_CLASS(env, clazz, "SnappyDecompressor"); compressed_bytes = (const char*)(*env)->GetDirectBufferAddress(env, compressed_direct_buf); - UNLOCK_CLASS(env, clazz, "SnappyDecompressor"); if (compressed_bytes == 0) { return (jint)0; } // Get the output direct buffer - LOCK_CLASS(env, clazz, "SnappyDecompressor"); uncompressed_bytes = (char *)(*env)->GetDirectBufferAddress(env, uncompressed_direct_buf); - UNLOCK_CLASS(env, clazz, "SnappyDecompressor"); if (uncompressed_bytes == 0) { return (jint)0; diff --git a/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/zlib/ZlibCompressor.c b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/zlib/ZlibCompressor.c index f7c0cb9db99..51f7bed9bd7 100644 --- a/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/zlib/ZlibCompressor.c +++ b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/zlib/ZlibCompressor.c @@ -28,7 +28,6 @@ #include "org_apache_hadoop_io_compress_zlib.h" #include "org_apache_hadoop_io_compress_zlib_ZlibCompressor.h" -static jfieldID ZlibCompressor_clazz; static jfieldID ZlibCompressor_stream; static jfieldID ZlibCompressor_uncompressedDirectBuf; static jfieldID ZlibCompressor_uncompressedDirectBufOff; @@ -141,8 +140,6 @@ Java_org_apache_hadoop_io_compress_zlib_ZlibCompressor_initIDs( #endif // Initialize the requisite fieldIds - ZlibCompressor_clazz = (*env)->GetStaticFieldID(env, class, "clazz", - "Ljava/lang/Class;"); ZlibCompressor_stream = (*env)->GetFieldID(env, class, "stream", "J"); ZlibCompressor_finish = (*env)->GetFieldID(env, class, "finish", "Z"); ZlibCompressor_finished = (*env)->GetFieldID(env, class, "finished", "Z"); @@ -239,7 +236,6 @@ JNIEXPORT jint JNICALL Java_org_apache_hadoop_io_compress_zlib_ZlibCompressor_deflateBytesDirect( JNIEnv *env, jobject this ) { - jobject clazz = NULL; jobject uncompressed_direct_buf = NULL; jint uncompressed_direct_buf_off = 0; jint uncompressed_direct_buf_len = 0; @@ -260,9 +256,6 @@ Java_org_apache_hadoop_io_compress_zlib_ZlibCompressor_deflateBytesDirect( return (jint)0; } - // Get members of ZlibCompressor - clazz = (*env)->GetStaticObjectField(env, this, - ZlibCompressor_clazz); uncompressed_direct_buf = (*env)->GetObjectField(env, this, ZlibCompressor_uncompressedDirectBuf); uncompressed_direct_buf_off = (*env)->GetIntField(env, this, @@ -278,20 +271,16 @@ Java_org_apache_hadoop_io_compress_zlib_ZlibCompressor_deflateBytesDirect( finish = (*env)->GetBooleanField(env, this, ZlibCompressor_finish); // Get the input direct buffer - LOCK_CLASS(env, clazz, "ZlibCompressor"); uncompressed_bytes = (*env)->GetDirectBufferAddress(env, uncompressed_direct_buf); - UNLOCK_CLASS(env, clazz, "ZlibCompressor"); if (uncompressed_bytes == 0) { return (jint)0; } // Get the output direct buffer - LOCK_CLASS(env, clazz, "ZlibCompressor"); compressed_bytes = (*env)->GetDirectBufferAddress(env, compressed_direct_buf); - UNLOCK_CLASS(env, clazz, "ZlibCompressor"); if (compressed_bytes == 0) { return (jint)0; diff --git a/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/zlib/ZlibDecompressor.c b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/zlib/ZlibDecompressor.c index 8b78f41e1a1..b9f23b181b0 100644 --- a/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/zlib/ZlibDecompressor.c +++ b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/zlib/ZlibDecompressor.c @@ -28,7 +28,6 @@ #include "org_apache_hadoop_io_compress_zlib.h" #include "org_apache_hadoop_io_compress_zlib_ZlibDecompressor.h" -static jfieldID ZlibDecompressor_clazz; static jfieldID ZlibDecompressor_stream; static jfieldID ZlibDecompressor_compressedDirectBuf; static jfieldID ZlibDecompressor_compressedDirectBufOff; @@ -104,8 +103,6 @@ JNIEnv *env, jclass class // Initialize the requisite fieldIds - ZlibDecompressor_clazz = (*env)->GetStaticFieldID(env, class, "clazz", - "Ljava/lang/Class;"); ZlibDecompressor_stream = (*env)->GetFieldID(env, class, "stream", "J"); ZlibDecompressor_needDict = (*env)->GetFieldID(env, class, "needDict", "Z"); ZlibDecompressor_finished = (*env)->GetFieldID(env, class, "finished", "Z"); @@ -197,7 +194,6 @@ JNIEXPORT jint JNICALL Java_org_apache_hadoop_io_compress_zlib_ZlibDecompressor_inflateBytesDirect( JNIEnv *env, jobject this ) { - jobject clazz = NULL; jarray compressed_direct_buf = NULL; jint compressed_direct_buf_off = 0; jint compressed_direct_buf_len = 0; @@ -218,8 +214,6 @@ Java_org_apache_hadoop_io_compress_zlib_ZlibDecompressor_inflateBytesDirect( } // Get members of ZlibDecompressor - clazz = (*env)->GetStaticObjectField(env, this, - ZlibDecompressor_clazz); compressed_direct_buf = (jarray)(*env)->GetObjectField(env, this, ZlibDecompressor_compressedDirectBuf); compressed_direct_buf_off = (*env)->GetIntField(env, this, @@ -233,20 +227,16 @@ Java_org_apache_hadoop_io_compress_zlib_ZlibDecompressor_inflateBytesDirect( ZlibDecompressor_directBufferSize); // Get the input direct buffer - LOCK_CLASS(env, clazz, "ZlibDecompressor"); compressed_bytes = (*env)->GetDirectBufferAddress(env, compressed_direct_buf); - UNLOCK_CLASS(env, clazz, "ZlibDecompressor"); if (!compressed_bytes) { return (jint)0; } // Get the output direct buffer - LOCK_CLASS(env, clazz, "ZlibDecompressor"); uncompressed_bytes = (*env)->GetDirectBufferAddress(env, uncompressed_direct_buf); - UNLOCK_CLASS(env, clazz, "ZlibDecompressor"); if (!uncompressed_bytes) { return (jint)0; diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/bzip2/TestBzip2CompressorDecompressor.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/bzip2/TestBzip2CompressorDecompressor.java new file mode 100644 index 00000000000..c585a463e46 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/bzip2/TestBzip2CompressorDecompressor.java @@ -0,0 +1,104 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.io.compress.bzip2; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.DataInputBuffer; +import org.apache.hadoop.io.DataOutputBuffer; +import org.apache.hadoop.io.compress.*; +import org.apache.hadoop.io.compress.bzip2.Bzip2Compressor; +import org.apache.hadoop.io.compress.bzip2.Bzip2Decompressor; +import org.apache.hadoop.test.MultithreadedTestUtil; +import org.junit.Before; +import org.junit.Test; + +import java.io.*; +import java.util.Random; + +import static org.junit.Assert.*; +import static org.junit.Assume.*; +import static org.junit.Assume.assumeTrue; + +public class TestBzip2CompressorDecompressor { + + private static final Random rnd = new Random(12345l); + + @Before + public void before() { + assumeTrue(Bzip2Factory.isNativeBzip2Loaded(new Configuration())); + } + + // test compress/decompress process + @Test + public void testCompressDecompress() { + byte[] rawData = null; + int rawDataSize = 0; + rawDataSize = 1024 * 64; + rawData = generate(rawDataSize); + try { + Bzip2Compressor compressor = new Bzip2Compressor(); + Bzip2Decompressor decompressor = new Bzip2Decompressor(); + assertFalse("testBzip2CompressDecompress finished error", + compressor.finished()); + compressor.setInput(rawData, 0, rawData.length); + assertTrue("testBzip2CompressDecompress getBytesRead before error", + compressor.getBytesRead() == 0); + compressor.finish(); + + byte[] compressedResult = new byte[rawDataSize]; + int cSize = compressor.compress(compressedResult, 0, rawDataSize); + assertTrue("testBzip2CompressDecompress getBytesRead after error", + compressor.getBytesRead() == rawDataSize); + assertTrue( + "testBzip2CompressDecompress compressed size no less than original size", + cSize < rawDataSize); + decompressor.setInput(compressedResult, 0, cSize); + byte[] decompressedBytes = new byte[rawDataSize]; + decompressor.decompress(decompressedBytes, 0, decompressedBytes.length); + assertArrayEquals("testBzip2CompressDecompress arrays not equals ", + rawData, decompressedBytes); + compressor.reset(); + decompressor.reset(); + } catch (IOException ex) { + fail("testBzip2CompressDecompress ex !!!" + ex); + } + } + + public static byte[] generate(int size) { + byte[] array = new byte[size]; + for (int i = 0; i < size; i++) + array[i] = (byte)rnd.nextInt(16); + return array; + } + + @Test + public void testBzip2CompressDecompressInMultiThreads() throws Exception { + MultithreadedTestUtil.TestContext ctx = new MultithreadedTestUtil.TestContext(); + for(int i=0;i<10;i++) { + ctx.addThread( new MultithreadedTestUtil.TestingThread(ctx) { + @Override + public void doWork() throws Exception { + testCompressDecompress(); + } + }); + } + ctx.startThreads(); + + ctx.waitFor(60000); + } +} diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/lz4/TestLz4CompressorDecompressor.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/lz4/TestLz4CompressorDecompressor.java index e8555b23887..6f3b076097a 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/lz4/TestLz4CompressorDecompressor.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/lz4/TestLz4CompressorDecompressor.java @@ -36,6 +36,7 @@ import org.apache.hadoop.io.compress.CompressionOutputStream; import org.apache.hadoop.io.compress.Lz4Codec; import org.apache.hadoop.io.compress.lz4.Lz4Compressor; import org.apache.hadoop.io.compress.lz4.Lz4Decompressor; +import org.apache.hadoop.test.MultithreadedTestUtil; import org.junit.Before; import org.junit.Test; import static org.junit.Assume.*; @@ -313,4 +314,20 @@ public class TestLz4CompressorDecompressor { array[i] = (byte)rnd.nextInt(16); return array; } + + @Test + public void testLz4CompressDecompressInMultiThreads() throws Exception { + MultithreadedTestUtil.TestContext ctx = new MultithreadedTestUtil.TestContext(); + for(int i=0;i<10;i++) { + ctx.addThread( new MultithreadedTestUtil.TestingThread(ctx) { + @Override + public void doWork() throws Exception { + testCompressDecompress(); + } + }); + } + ctx.startThreads(); + + ctx.waitFor(60000); + } } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/snappy/TestSnappyCompressorDecompressor.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/snappy/TestSnappyCompressorDecompressor.java index 77fbcc099ac..cc986c7e0ae 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/snappy/TestSnappyCompressorDecompressor.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/snappy/TestSnappyCompressorDecompressor.java @@ -40,6 +40,7 @@ import org.apache.hadoop.io.compress.CompressionInputStream; import org.apache.hadoop.io.compress.CompressionOutputStream; import org.apache.hadoop.io.compress.SnappyCodec; import org.apache.hadoop.io.compress.snappy.SnappyDecompressor.SnappyDirectDecompressor; +import org.apache.hadoop.test.MultithreadedTestUtil; import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -391,4 +392,20 @@ public class TestSnappyCompressorDecompressor { return array; } } + + @Test + public void testSnappyCompressDecompressInMultiThreads() throws Exception { + MultithreadedTestUtil.TestContext ctx = new MultithreadedTestUtil.TestContext(); + for(int i=0;i<10;i++) { + ctx.addThread( new MultithreadedTestUtil.TestingThread(ctx) { + @Override + public void doWork() throws Exception { + testSnappyCompressDecompress(); + } + }); + } + ctx.startThreads(); + + ctx.waitFor(60000); + } } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/zlib/TestZlibCompressorDecompressor.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/zlib/TestZlibCompressorDecompressor.java index db5784c07aa..e7511251b64 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/zlib/TestZlibCompressorDecompressor.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/zlib/TestZlibCompressorDecompressor.java @@ -38,6 +38,7 @@ import org.apache.hadoop.io.compress.CompressDecompressTester.CompressionTestStr import org.apache.hadoop.io.compress.zlib.ZlibCompressor.CompressionLevel; import org.apache.hadoop.io.compress.zlib.ZlibCompressor.CompressionStrategy; import org.apache.hadoop.io.compress.zlib.ZlibDecompressor.ZlibDirectDecompressor; +import org.apache.hadoop.test.MultithreadedTestUtil; import org.apache.hadoop.util.NativeCodeLoader; import org.junit.Before; import org.junit.Test; @@ -419,4 +420,20 @@ public class TestZlibCompressorDecompressor { data[i] = (byte)random.nextInt(16); return data; } + + @Test + public void testZlibCompressDecompressInMultiThreads() throws Exception { + MultithreadedTestUtil.TestContext ctx = new MultithreadedTestUtil.TestContext(); + for(int i=0;i<10;i++) { + ctx.addThread( new MultithreadedTestUtil.TestingThread(ctx) { + @Override + public void doWork() throws Exception { + testZlibCompressDecompress(); + } + }); + } + ctx.startThreads(); + + ctx.waitFor(60000); + } }