HADOOP-10027. *Compressor_deflateBytesDirect passes instance instead of jclass to GetStaticObjectField. Contributed by Hui Zheng.

This commit is contained in:
cnauroth 2015-03-11 23:27:49 -07:00
parent 85f6d67fa7
commit ff83ae7231
21 changed files with 160 additions and 102 deletions

View File

@ -450,6 +450,9 @@ Release 2.8.0 - UNRELEASED
BUG FIXES BUG FIXES
HADOOP-10027. *Compressor_deflateBytesDirect passes instance instead of
jclass to GetStaticObjectField. (Hui Zheng via cnauroth)
Release 2.7.0 - UNRELEASED Release 2.7.0 - UNRELEASED
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -44,9 +44,6 @@ public class Bzip2Compressor implements Compressor {
private static final Log LOG = LogFactory.getLog(Bzip2Compressor.class); private static final Log LOG = LogFactory.getLog(Bzip2Compressor.class);
// HACK - Use this as a global lock in the JNI layer.
private static Class<Bzip2Compressor> clazz = Bzip2Compressor.class;
private long stream; private long stream;
private int blockSize; private int blockSize;
private int workFactor; private int workFactor;

View File

@ -38,9 +38,6 @@ public class Bzip2Decompressor implements Decompressor {
private static final Log LOG = LogFactory.getLog(Bzip2Decompressor.class); private static final Log LOG = LogFactory.getLog(Bzip2Decompressor.class);
// HACK - Use this as a global lock in the JNI layer.
private static Class<Bzip2Decompressor> clazz = Bzip2Decompressor.class;
private long stream; private long stream;
private boolean conserveMemory; private boolean conserveMemory;
private int directBufferSize; private int directBufferSize;

View File

@ -37,10 +37,6 @@ public class Lz4Compressor implements Compressor {
LogFactory.getLog(Lz4Compressor.class.getName()); LogFactory.getLog(Lz4Compressor.class.getName());
private static final int DEFAULT_DIRECT_BUFFER_SIZE = 64 * 1024; private static final int DEFAULT_DIRECT_BUFFER_SIZE = 64 * 1024;
// HACK - Use this as a global lock in the JNI layer
@SuppressWarnings({"unchecked", "unused"})
private static Class clazz = Lz4Compressor.class;
private int directBufferSize; private int directBufferSize;
private Buffer compressedDirectBuf = null; private Buffer compressedDirectBuf = null;
private int uncompressedDirectBufLen; private int uncompressedDirectBufLen;

View File

@ -36,10 +36,6 @@ public class Lz4Decompressor implements Decompressor {
LogFactory.getLog(Lz4Compressor.class.getName()); LogFactory.getLog(Lz4Compressor.class.getName());
private static final int DEFAULT_DIRECT_BUFFER_SIZE = 64 * 1024; private static final int DEFAULT_DIRECT_BUFFER_SIZE = 64 * 1024;
// HACK - Use this as a global lock in the JNI layer
@SuppressWarnings({"unchecked", "unused"})
private static Class clazz = Lz4Decompressor.class;
private int directBufferSize; private int directBufferSize;
private Buffer compressedDirectBuf = null; private Buffer compressedDirectBuf = null;
private int compressedDirectBufLen; private int compressedDirectBufLen;

View File

@ -37,10 +37,6 @@ public class SnappyCompressor implements Compressor {
LogFactory.getLog(SnappyCompressor.class.getName()); LogFactory.getLog(SnappyCompressor.class.getName());
private static final int DEFAULT_DIRECT_BUFFER_SIZE = 64 * 1024; private static final int DEFAULT_DIRECT_BUFFER_SIZE = 64 * 1024;
// HACK - Use this as a global lock in the JNI layer
@SuppressWarnings({"unchecked", "unused"})
private static Class clazz = SnappyCompressor.class;
private int directBufferSize; private int directBufferSize;
private Buffer compressedDirectBuf = null; private Buffer compressedDirectBuf = null;
private int uncompressedDirectBufLen; private int uncompressedDirectBufLen;

View File

@ -37,10 +37,6 @@ public class SnappyDecompressor implements Decompressor {
LogFactory.getLog(SnappyCompressor.class.getName()); LogFactory.getLog(SnappyCompressor.class.getName());
private static final int DEFAULT_DIRECT_BUFFER_SIZE = 64 * 1024; private static final int DEFAULT_DIRECT_BUFFER_SIZE = 64 * 1024;
// HACK - Use this as a global lock in the JNI layer
@SuppressWarnings({"unchecked", "unused"})
private static Class clazz = SnappyDecompressor.class;
private int directBufferSize; private int directBufferSize;
private Buffer compressedDirectBuf = null; private Buffer compressedDirectBuf = null;
private int compressedDirectBufLen; private int compressedDirectBufLen;

View File

@ -41,9 +41,6 @@ public class ZlibCompressor implements Compressor {
private static final int DEFAULT_DIRECT_BUFFER_SIZE = 64*1024; private static final int DEFAULT_DIRECT_BUFFER_SIZE = 64*1024;
// HACK - Use this as a global lock in the JNI layer
private static Class clazz = ZlibCompressor.class;
private long stream; private long stream;
private CompressionLevel level; private CompressionLevel level;
private CompressionStrategy strategy; private CompressionStrategy strategy;

View File

@ -35,9 +35,6 @@ import org.apache.hadoop.util.NativeCodeLoader;
public class ZlibDecompressor implements Decompressor { public class ZlibDecompressor implements Decompressor {
private static final int DEFAULT_DIRECT_BUFFER_SIZE = 64*1024; private static final int DEFAULT_DIRECT_BUFFER_SIZE = 64*1024;
// HACK - Use this as a global lock in the JNI layer
private static Class clazz = ZlibDecompressor.class;
private long stream; private long stream;
private CompressionHeader header; private CompressionHeader header;
private int directBufferSize; private int directBufferSize;

View File

@ -25,7 +25,6 @@
#include "org_apache_hadoop_io_compress_bzip2.h" #include "org_apache_hadoop_io_compress_bzip2.h"
#include "org_apache_hadoop_io_compress_bzip2_Bzip2Compressor.h" #include "org_apache_hadoop_io_compress_bzip2_Bzip2Compressor.h"
static jfieldID Bzip2Compressor_clazz;
static jfieldID Bzip2Compressor_stream; static jfieldID Bzip2Compressor_stream;
static jfieldID Bzip2Compressor_uncompressedDirectBuf; static jfieldID Bzip2Compressor_uncompressedDirectBuf;
static jfieldID Bzip2Compressor_uncompressedDirectBufOff; static jfieldID Bzip2Compressor_uncompressedDirectBufOff;
@ -74,8 +73,6 @@ Java_org_apache_hadoop_io_compress_bzip2_Bzip2Compressor_initIDs(
"BZ2_bzCompressEnd"); "BZ2_bzCompressEnd");
// Initialize the requisite fieldIds. // Initialize the requisite fieldIds.
Bzip2Compressor_clazz = (*env)->GetStaticFieldID(env, class, "clazz",
"Ljava/lang/Class;");
Bzip2Compressor_stream = (*env)->GetFieldID(env, class, "stream", "J"); Bzip2Compressor_stream = (*env)->GetFieldID(env, class, "stream", "J");
Bzip2Compressor_finish = (*env)->GetFieldID(env, class, "finish", "Z"); Bzip2Compressor_finish = (*env)->GetFieldID(env, class, "finish", "Z");
Bzip2Compressor_finished = (*env)->GetFieldID(env, class, "finished", "Z"); Bzip2Compressor_finished = (*env)->GetFieldID(env, class, "finished", "Z");
@ -155,8 +152,6 @@ Java_org_apache_hadoop_io_compress_bzip2_Bzip2Compressor_deflateBytesDirect(
return (jint)0; return (jint)0;
} }
jobject clazz = (*env)->GetStaticObjectField(env, this,
Bzip2Compressor_clazz);
jobject uncompressed_direct_buf = (*env)->GetObjectField(env, this, jobject uncompressed_direct_buf = (*env)->GetObjectField(env, this,
Bzip2Compressor_uncompressedDirectBuf); Bzip2Compressor_uncompressedDirectBuf);
jint uncompressed_direct_buf_off = (*env)->GetIntField(env, this, jint uncompressed_direct_buf_off = (*env)->GetIntField(env, this,
@ -173,12 +168,10 @@ Java_org_apache_hadoop_io_compress_bzip2_Bzip2Compressor_deflateBytesDirect(
Bzip2Compressor_finish); Bzip2Compressor_finish);
// Get the input and output direct buffers. // Get the input and output direct buffers.
LOCK_CLASS(env, clazz, "Bzip2Compressor");
char* uncompressed_bytes = (*env)->GetDirectBufferAddress(env, char* uncompressed_bytes = (*env)->GetDirectBufferAddress(env,
uncompressed_direct_buf); uncompressed_direct_buf);
char* compressed_bytes = (*env)->GetDirectBufferAddress(env, char* compressed_bytes = (*env)->GetDirectBufferAddress(env,
compressed_direct_buf); compressed_direct_buf);
UNLOCK_CLASS(env, clazz, "Bzip2Compressor");
if (!uncompressed_bytes || !compressed_bytes) { if (!uncompressed_bytes || !compressed_bytes) {
return (jint)0; return (jint)0;

View File

@ -25,7 +25,6 @@
#include "org_apache_hadoop_io_compress_bzip2.h" #include "org_apache_hadoop_io_compress_bzip2.h"
#include "org_apache_hadoop_io_compress_bzip2_Bzip2Decompressor.h" #include "org_apache_hadoop_io_compress_bzip2_Bzip2Decompressor.h"
static jfieldID Bzip2Decompressor_clazz;
static jfieldID Bzip2Decompressor_stream; static jfieldID Bzip2Decompressor_stream;
static jfieldID Bzip2Decompressor_compressedDirectBuf; static jfieldID Bzip2Decompressor_compressedDirectBuf;
static jfieldID Bzip2Decompressor_compressedDirectBufOff; static jfieldID Bzip2Decompressor_compressedDirectBufOff;
@ -73,8 +72,6 @@ Java_org_apache_hadoop_io_compress_bzip2_Bzip2Decompressor_initIDs(
"BZ2_bzDecompressEnd"); "BZ2_bzDecompressEnd");
// Initialize the requisite fieldIds. // Initialize the requisite fieldIds.
Bzip2Decompressor_clazz = (*env)->GetStaticFieldID(env, class, "clazz",
"Ljava/lang/Class;");
Bzip2Decompressor_stream = (*env)->GetFieldID(env, class, "stream", "J"); Bzip2Decompressor_stream = (*env)->GetFieldID(env, class, "stream", "J");
Bzip2Decompressor_finished = (*env)->GetFieldID(env, class, Bzip2Decompressor_finished = (*env)->GetFieldID(env, class,
"finished", "Z"); "finished", "Z");
@ -144,8 +141,6 @@ Java_org_apache_hadoop_io_compress_bzip2_Bzip2Decompressor_inflateBytesDirect(
return (jint)0; return (jint)0;
} }
jobject clazz = (*env)->GetStaticObjectField(env, this,
Bzip2Decompressor_clazz);
jarray compressed_direct_buf = (jarray)(*env)->GetObjectField(env, jarray compressed_direct_buf = (jarray)(*env)->GetObjectField(env,
this, Bzip2Decompressor_compressedDirectBuf); this, Bzip2Decompressor_compressedDirectBuf);
jint compressed_direct_buf_off = (*env)->GetIntField(env, this, jint compressed_direct_buf_off = (*env)->GetIntField(env, this,
@ -159,12 +154,10 @@ Java_org_apache_hadoop_io_compress_bzip2_Bzip2Decompressor_inflateBytesDirect(
Bzip2Decompressor_directBufferSize); Bzip2Decompressor_directBufferSize);
// Get the input and output direct buffers. // Get the input and output direct buffers.
LOCK_CLASS(env, clazz, "Bzip2Decompressor");
char* compressed_bytes = (*env)->GetDirectBufferAddress(env, char* compressed_bytes = (*env)->GetDirectBufferAddress(env,
compressed_direct_buf); compressed_direct_buf);
char* uncompressed_bytes = (*env)->GetDirectBufferAddress(env, char* uncompressed_bytes = (*env)->GetDirectBufferAddress(env,
uncompressed_direct_buf); uncompressed_direct_buf);
UNLOCK_CLASS(env, clazz, "Bzip2Decompressor");
if (!compressed_bytes || !uncompressed_bytes) { if (!compressed_bytes || !uncompressed_bytes) {
return (jint)0; return (jint)0;

View File

@ -27,7 +27,6 @@
#include "lz4hc.h" #include "lz4hc.h"
static jfieldID Lz4Compressor_clazz;
static jfieldID Lz4Compressor_uncompressedDirectBuf; static jfieldID Lz4Compressor_uncompressedDirectBuf;
static jfieldID Lz4Compressor_uncompressedDirectBufLen; static jfieldID Lz4Compressor_uncompressedDirectBufLen;
static jfieldID Lz4Compressor_compressedDirectBuf; static jfieldID Lz4Compressor_compressedDirectBuf;
@ -37,8 +36,6 @@ static jfieldID Lz4Compressor_directBufferSize;
JNIEXPORT void JNICALL Java_org_apache_hadoop_io_compress_lz4_Lz4Compressor_initIDs JNIEXPORT void JNICALL Java_org_apache_hadoop_io_compress_lz4_Lz4Compressor_initIDs
(JNIEnv *env, jclass clazz){ (JNIEnv *env, jclass clazz){
Lz4Compressor_clazz = (*env)->GetStaticFieldID(env, clazz, "clazz",
"Ljava/lang/Class;");
Lz4Compressor_uncompressedDirectBuf = (*env)->GetFieldID(env, clazz, Lz4Compressor_uncompressedDirectBuf = (*env)->GetFieldID(env, clazz,
"uncompressedDirectBuf", "uncompressedDirectBuf",
"Ljava/nio/Buffer;"); "Ljava/nio/Buffer;");
@ -57,25 +54,20 @@ JNIEXPORT jint JNICALL Java_org_apache_hadoop_io_compress_lz4_Lz4Compressor_comp
char *compressed_bytes; char *compressed_bytes;
// Get members of Lz4Compressor // Get members of Lz4Compressor
jobject clazz = (*env)->GetStaticObjectField(env, thisj, Lz4Compressor_clazz);
jobject uncompressed_direct_buf = (*env)->GetObjectField(env, thisj, Lz4Compressor_uncompressedDirectBuf); jobject uncompressed_direct_buf = (*env)->GetObjectField(env, thisj, Lz4Compressor_uncompressedDirectBuf);
jint uncompressed_direct_buf_len = (*env)->GetIntField(env, thisj, Lz4Compressor_uncompressedDirectBufLen); jint uncompressed_direct_buf_len = (*env)->GetIntField(env, thisj, Lz4Compressor_uncompressedDirectBufLen);
jobject compressed_direct_buf = (*env)->GetObjectField(env, thisj, Lz4Compressor_compressedDirectBuf); jobject compressed_direct_buf = (*env)->GetObjectField(env, thisj, Lz4Compressor_compressedDirectBuf);
jint compressed_direct_buf_len = (*env)->GetIntField(env, thisj, Lz4Compressor_directBufferSize); jint compressed_direct_buf_len = (*env)->GetIntField(env, thisj, Lz4Compressor_directBufferSize);
// Get the input direct buffer // Get the input direct buffer
LOCK_CLASS(env, clazz, "Lz4Compressor");
uncompressed_bytes = (const char*)(*env)->GetDirectBufferAddress(env, uncompressed_direct_buf); uncompressed_bytes = (const char*)(*env)->GetDirectBufferAddress(env, uncompressed_direct_buf);
UNLOCK_CLASS(env, clazz, "Lz4Compressor");
if (uncompressed_bytes == 0) { if (uncompressed_bytes == 0) {
return (jint)0; return (jint)0;
} }
// Get the output direct buffer // Get the output direct buffer
LOCK_CLASS(env, clazz, "Lz4Compressor");
compressed_bytes = (char *)(*env)->GetDirectBufferAddress(env, compressed_direct_buf); compressed_bytes = (char *)(*env)->GetDirectBufferAddress(env, compressed_direct_buf);
UNLOCK_CLASS(env, clazz, "Lz4Compressor");
if (compressed_bytes == 0) { if (compressed_bytes == 0) {
return (jint)0; return (jint)0;
@ -104,25 +96,20 @@ JNIEXPORT jint JNICALL Java_org_apache_hadoop_io_compress_lz4_Lz4Compressor_comp
char* compressed_bytes = NULL; char* compressed_bytes = NULL;
// Get members of Lz4Compressor // Get members of Lz4Compressor
jobject clazz = (*env)->GetStaticObjectField(env, thisj, Lz4Compressor_clazz);
jobject uncompressed_direct_buf = (*env)->GetObjectField(env, thisj, Lz4Compressor_uncompressedDirectBuf); jobject uncompressed_direct_buf = (*env)->GetObjectField(env, thisj, Lz4Compressor_uncompressedDirectBuf);
jint uncompressed_direct_buf_len = (*env)->GetIntField(env, thisj, Lz4Compressor_uncompressedDirectBufLen); jint uncompressed_direct_buf_len = (*env)->GetIntField(env, thisj, Lz4Compressor_uncompressedDirectBufLen);
jobject compressed_direct_buf = (*env)->GetObjectField(env, thisj, Lz4Compressor_compressedDirectBuf); jobject compressed_direct_buf = (*env)->GetObjectField(env, thisj, Lz4Compressor_compressedDirectBuf);
jint compressed_direct_buf_len = (*env)->GetIntField(env, thisj, Lz4Compressor_directBufferSize); jint compressed_direct_buf_len = (*env)->GetIntField(env, thisj, Lz4Compressor_directBufferSize);
// Get the input direct buffer // Get the input direct buffer
LOCK_CLASS(env, clazz, "Lz4Compressor");
uncompressed_bytes = (const char*)(*env)->GetDirectBufferAddress(env, uncompressed_direct_buf); uncompressed_bytes = (const char*)(*env)->GetDirectBufferAddress(env, uncompressed_direct_buf);
UNLOCK_CLASS(env, clazz, "Lz4Compressor");
if (uncompressed_bytes == 0) { if (uncompressed_bytes == 0) {
return (jint)0; return (jint)0;
} }
// Get the output direct buffer // Get the output direct buffer
LOCK_CLASS(env, clazz, "Lz4Compressor");
compressed_bytes = (char *)(*env)->GetDirectBufferAddress(env, compressed_direct_buf); compressed_bytes = (char *)(*env)->GetDirectBufferAddress(env, compressed_direct_buf);
UNLOCK_CLASS(env, clazz, "Lz4Compressor");
if (compressed_bytes == 0) { if (compressed_bytes == 0) {
return (jint)0; return (jint)0;

View File

@ -25,7 +25,6 @@
#include "lz4.h" #include "lz4.h"
static jfieldID Lz4Decompressor_clazz;
static jfieldID Lz4Decompressor_compressedDirectBuf; static jfieldID Lz4Decompressor_compressedDirectBuf;
static jfieldID Lz4Decompressor_compressedDirectBufLen; static jfieldID Lz4Decompressor_compressedDirectBufLen;
static jfieldID Lz4Decompressor_uncompressedDirectBuf; static jfieldID Lz4Decompressor_uncompressedDirectBuf;
@ -34,8 +33,6 @@ static jfieldID Lz4Decompressor_directBufferSize;
JNIEXPORT void JNICALL Java_org_apache_hadoop_io_compress_lz4_Lz4Decompressor_initIDs JNIEXPORT void JNICALL Java_org_apache_hadoop_io_compress_lz4_Lz4Decompressor_initIDs
(JNIEnv *env, jclass clazz){ (JNIEnv *env, jclass clazz){
Lz4Decompressor_clazz = (*env)->GetStaticFieldID(env, clazz, "clazz",
"Ljava/lang/Class;");
Lz4Decompressor_compressedDirectBuf = (*env)->GetFieldID(env,clazz, Lz4Decompressor_compressedDirectBuf = (*env)->GetFieldID(env,clazz,
"compressedDirectBuf", "compressedDirectBuf",
"Ljava/nio/Buffer;"); "Ljava/nio/Buffer;");
@ -54,25 +51,20 @@ JNIEXPORT jint JNICALL Java_org_apache_hadoop_io_compress_lz4_Lz4Decompressor_de
char *uncompressed_bytes; char *uncompressed_bytes;
// Get members of Lz4Decompressor // Get members of Lz4Decompressor
jobject clazz = (*env)->GetStaticObjectField(env,thisj, Lz4Decompressor_clazz);
jobject compressed_direct_buf = (*env)->GetObjectField(env,thisj, Lz4Decompressor_compressedDirectBuf); jobject compressed_direct_buf = (*env)->GetObjectField(env,thisj, Lz4Decompressor_compressedDirectBuf);
jint compressed_direct_buf_len = (*env)->GetIntField(env,thisj, Lz4Decompressor_compressedDirectBufLen); jint compressed_direct_buf_len = (*env)->GetIntField(env,thisj, Lz4Decompressor_compressedDirectBufLen);
jobject uncompressed_direct_buf = (*env)->GetObjectField(env,thisj, Lz4Decompressor_uncompressedDirectBuf); jobject uncompressed_direct_buf = (*env)->GetObjectField(env,thisj, Lz4Decompressor_uncompressedDirectBuf);
size_t uncompressed_direct_buf_len = (*env)->GetIntField(env, thisj, Lz4Decompressor_directBufferSize); size_t uncompressed_direct_buf_len = (*env)->GetIntField(env, thisj, Lz4Decompressor_directBufferSize);
// Get the input direct buffer // Get the input direct buffer
LOCK_CLASS(env, clazz, "Lz4Decompressor");
compressed_bytes = (const char*)(*env)->GetDirectBufferAddress(env, compressed_direct_buf); compressed_bytes = (const char*)(*env)->GetDirectBufferAddress(env, compressed_direct_buf);
UNLOCK_CLASS(env, clazz, "Lz4Decompressor");
if (compressed_bytes == 0) { if (compressed_bytes == 0) {
return (jint)0; return (jint)0;
} }
// Get the output direct buffer // Get the output direct buffer
LOCK_CLASS(env, clazz, "Lz4Decompressor");
uncompressed_bytes = (char *)(*env)->GetDirectBufferAddress(env, uncompressed_direct_buf); uncompressed_bytes = (char *)(*env)->GetDirectBufferAddress(env, uncompressed_direct_buf);
UNLOCK_CLASS(env, clazz, "Lz4Decompressor");
if (uncompressed_bytes == 0) { if (uncompressed_bytes == 0) {
return (jint)0; return (jint)0;

View File

@ -38,7 +38,6 @@
#define JINT_MAX 0x7fffffff #define JINT_MAX 0x7fffffff
static jfieldID SnappyCompressor_clazz;
static jfieldID SnappyCompressor_uncompressedDirectBuf; static jfieldID SnappyCompressor_uncompressedDirectBuf;
static jfieldID SnappyCompressor_uncompressedDirectBufLen; static jfieldID SnappyCompressor_uncompressedDirectBufLen;
static jfieldID SnappyCompressor_compressedDirectBuf; static jfieldID SnappyCompressor_compressedDirectBuf;
@ -84,8 +83,6 @@ JNIEXPORT void JNICALL Java_org_apache_hadoop_io_compress_snappy_SnappyCompresso
LOAD_DYNAMIC_SYMBOL(__dlsym_snappy_compress, dlsym_snappy_compress, env, libsnappy, "snappy_compress"); LOAD_DYNAMIC_SYMBOL(__dlsym_snappy_compress, dlsym_snappy_compress, env, libsnappy, "snappy_compress");
#endif #endif
SnappyCompressor_clazz = (*env)->GetStaticFieldID(env, clazz, "clazz",
"Ljava/lang/Class;");
SnappyCompressor_uncompressedDirectBuf = (*env)->GetFieldID(env, clazz, SnappyCompressor_uncompressedDirectBuf = (*env)->GetFieldID(env, clazz,
"uncompressedDirectBuf", "uncompressedDirectBuf",
"Ljava/nio/Buffer;"); "Ljava/nio/Buffer;");
@ -104,7 +101,6 @@ JNIEXPORT jint JNICALL Java_org_apache_hadoop_io_compress_snappy_SnappyCompresso
char* compressed_bytes; char* compressed_bytes;
snappy_status ret; snappy_status ret;
// Get members of SnappyCompressor // Get members of SnappyCompressor
jobject clazz = (*env)->GetStaticObjectField(env, thisj, SnappyCompressor_clazz);
jobject uncompressed_direct_buf = (*env)->GetObjectField(env, thisj, SnappyCompressor_uncompressedDirectBuf); jobject uncompressed_direct_buf = (*env)->GetObjectField(env, thisj, SnappyCompressor_uncompressedDirectBuf);
jint uncompressed_direct_buf_len = (*env)->GetIntField(env, thisj, SnappyCompressor_uncompressedDirectBufLen); jint uncompressed_direct_buf_len = (*env)->GetIntField(env, thisj, SnappyCompressor_uncompressedDirectBufLen);
jobject compressed_direct_buf = (*env)->GetObjectField(env, thisj, SnappyCompressor_compressedDirectBuf); jobject compressed_direct_buf = (*env)->GetObjectField(env, thisj, SnappyCompressor_compressedDirectBuf);
@ -112,18 +108,14 @@ JNIEXPORT jint JNICALL Java_org_apache_hadoop_io_compress_snappy_SnappyCompresso
size_t buf_len; size_t buf_len;
// Get the input direct buffer // Get the input direct buffer
LOCK_CLASS(env, clazz, "SnappyCompressor");
uncompressed_bytes = (const char*)(*env)->GetDirectBufferAddress(env, uncompressed_direct_buf); uncompressed_bytes = (const char*)(*env)->GetDirectBufferAddress(env, uncompressed_direct_buf);
UNLOCK_CLASS(env, clazz, "SnappyCompressor");
if (uncompressed_bytes == 0) { if (uncompressed_bytes == 0) {
return (jint)0; return (jint)0;
} }
// Get the output direct buffer // Get the output direct buffer
LOCK_CLASS(env, clazz, "SnappyCompressor");
compressed_bytes = (char *)(*env)->GetDirectBufferAddress(env, compressed_direct_buf); compressed_bytes = (char *)(*env)->GetDirectBufferAddress(env, compressed_direct_buf);
UNLOCK_CLASS(env, clazz, "SnappyCompressor");
if (compressed_bytes == 0) { if (compressed_bytes == 0) {
return (jint)0; return (jint)0;

View File

@ -31,7 +31,6 @@
#include "org_apache_hadoop_io_compress_snappy_SnappyDecompressor.h" #include "org_apache_hadoop_io_compress_snappy_SnappyDecompressor.h"
static jfieldID SnappyDecompressor_clazz;
static jfieldID SnappyDecompressor_compressedDirectBuf; static jfieldID SnappyDecompressor_compressedDirectBuf;
static jfieldID SnappyDecompressor_compressedDirectBufLen; static jfieldID SnappyDecompressor_compressedDirectBufLen;
static jfieldID SnappyDecompressor_uncompressedDirectBuf; static jfieldID SnappyDecompressor_uncompressedDirectBuf;
@ -79,8 +78,6 @@ JNIEXPORT void JNICALL Java_org_apache_hadoop_io_compress_snappy_SnappyDecompres
LOAD_DYNAMIC_SYMBOL(__dlsym_snappy_uncompress, dlsym_snappy_uncompress, env, libsnappy, "snappy_uncompress"); LOAD_DYNAMIC_SYMBOL(__dlsym_snappy_uncompress, dlsym_snappy_uncompress, env, libsnappy, "snappy_uncompress");
#endif #endif
SnappyDecompressor_clazz = (*env)->GetStaticFieldID(env, clazz, "clazz",
"Ljava/lang/Class;");
SnappyDecompressor_compressedDirectBuf = (*env)->GetFieldID(env,clazz, SnappyDecompressor_compressedDirectBuf = (*env)->GetFieldID(env,clazz,
"compressedDirectBuf", "compressedDirectBuf",
"Ljava/nio/Buffer;"); "Ljava/nio/Buffer;");
@ -99,25 +96,20 @@ JNIEXPORT jint JNICALL Java_org_apache_hadoop_io_compress_snappy_SnappyDecompres
char* uncompressed_bytes = NULL; char* uncompressed_bytes = NULL;
snappy_status ret; snappy_status ret;
// Get members of SnappyDecompressor // Get members of SnappyDecompressor
jobject clazz = (*env)->GetStaticObjectField(env,thisj, SnappyDecompressor_clazz);
jobject compressed_direct_buf = (*env)->GetObjectField(env,thisj, SnappyDecompressor_compressedDirectBuf); jobject compressed_direct_buf = (*env)->GetObjectField(env,thisj, SnappyDecompressor_compressedDirectBuf);
jint compressed_direct_buf_len = (*env)->GetIntField(env,thisj, SnappyDecompressor_compressedDirectBufLen); jint compressed_direct_buf_len = (*env)->GetIntField(env,thisj, SnappyDecompressor_compressedDirectBufLen);
jobject uncompressed_direct_buf = (*env)->GetObjectField(env,thisj, SnappyDecompressor_uncompressedDirectBuf); jobject uncompressed_direct_buf = (*env)->GetObjectField(env,thisj, SnappyDecompressor_uncompressedDirectBuf);
size_t uncompressed_direct_buf_len = (*env)->GetIntField(env, thisj, SnappyDecompressor_directBufferSize); size_t uncompressed_direct_buf_len = (*env)->GetIntField(env, thisj, SnappyDecompressor_directBufferSize);
// Get the input direct buffer // Get the input direct buffer
LOCK_CLASS(env, clazz, "SnappyDecompressor");
compressed_bytes = (const char*)(*env)->GetDirectBufferAddress(env, compressed_direct_buf); compressed_bytes = (const char*)(*env)->GetDirectBufferAddress(env, compressed_direct_buf);
UNLOCK_CLASS(env, clazz, "SnappyDecompressor");
if (compressed_bytes == 0) { if (compressed_bytes == 0) {
return (jint)0; return (jint)0;
} }
// Get the output direct buffer // Get the output direct buffer
LOCK_CLASS(env, clazz, "SnappyDecompressor");
uncompressed_bytes = (char *)(*env)->GetDirectBufferAddress(env, uncompressed_direct_buf); uncompressed_bytes = (char *)(*env)->GetDirectBufferAddress(env, uncompressed_direct_buf);
UNLOCK_CLASS(env, clazz, "SnappyDecompressor");
if (uncompressed_bytes == 0) { if (uncompressed_bytes == 0) {
return (jint)0; return (jint)0;

View File

@ -28,7 +28,6 @@
#include "org_apache_hadoop_io_compress_zlib.h" #include "org_apache_hadoop_io_compress_zlib.h"
#include "org_apache_hadoop_io_compress_zlib_ZlibCompressor.h" #include "org_apache_hadoop_io_compress_zlib_ZlibCompressor.h"
static jfieldID ZlibCompressor_clazz;
static jfieldID ZlibCompressor_stream; static jfieldID ZlibCompressor_stream;
static jfieldID ZlibCompressor_uncompressedDirectBuf; static jfieldID ZlibCompressor_uncompressedDirectBuf;
static jfieldID ZlibCompressor_uncompressedDirectBufOff; static jfieldID ZlibCompressor_uncompressedDirectBufOff;
@ -141,8 +140,6 @@ Java_org_apache_hadoop_io_compress_zlib_ZlibCompressor_initIDs(
#endif #endif
// Initialize the requisite fieldIds // Initialize the requisite fieldIds
ZlibCompressor_clazz = (*env)->GetStaticFieldID(env, class, "clazz",
"Ljava/lang/Class;");
ZlibCompressor_stream = (*env)->GetFieldID(env, class, "stream", "J"); ZlibCompressor_stream = (*env)->GetFieldID(env, class, "stream", "J");
ZlibCompressor_finish = (*env)->GetFieldID(env, class, "finish", "Z"); ZlibCompressor_finish = (*env)->GetFieldID(env, class, "finish", "Z");
ZlibCompressor_finished = (*env)->GetFieldID(env, class, "finished", "Z"); ZlibCompressor_finished = (*env)->GetFieldID(env, class, "finished", "Z");
@ -239,7 +236,6 @@ JNIEXPORT jint JNICALL
Java_org_apache_hadoop_io_compress_zlib_ZlibCompressor_deflateBytesDirect( Java_org_apache_hadoop_io_compress_zlib_ZlibCompressor_deflateBytesDirect(
JNIEnv *env, jobject this JNIEnv *env, jobject this
) { ) {
jobject clazz = NULL;
jobject uncompressed_direct_buf = NULL; jobject uncompressed_direct_buf = NULL;
jint uncompressed_direct_buf_off = 0; jint uncompressed_direct_buf_off = 0;
jint uncompressed_direct_buf_len = 0; jint uncompressed_direct_buf_len = 0;
@ -260,9 +256,6 @@ Java_org_apache_hadoop_io_compress_zlib_ZlibCompressor_deflateBytesDirect(
return (jint)0; return (jint)0;
} }
// Get members of ZlibCompressor
clazz = (*env)->GetStaticObjectField(env, this,
ZlibCompressor_clazz);
uncompressed_direct_buf = (*env)->GetObjectField(env, this, uncompressed_direct_buf = (*env)->GetObjectField(env, this,
ZlibCompressor_uncompressedDirectBuf); ZlibCompressor_uncompressedDirectBuf);
uncompressed_direct_buf_off = (*env)->GetIntField(env, this, uncompressed_direct_buf_off = (*env)->GetIntField(env, this,
@ -278,20 +271,16 @@ Java_org_apache_hadoop_io_compress_zlib_ZlibCompressor_deflateBytesDirect(
finish = (*env)->GetBooleanField(env, this, ZlibCompressor_finish); finish = (*env)->GetBooleanField(env, this, ZlibCompressor_finish);
// Get the input direct buffer // Get the input direct buffer
LOCK_CLASS(env, clazz, "ZlibCompressor");
uncompressed_bytes = (*env)->GetDirectBufferAddress(env, uncompressed_bytes = (*env)->GetDirectBufferAddress(env,
uncompressed_direct_buf); uncompressed_direct_buf);
UNLOCK_CLASS(env, clazz, "ZlibCompressor");
if (uncompressed_bytes == 0) { if (uncompressed_bytes == 0) {
return (jint)0; return (jint)0;
} }
// Get the output direct buffer // Get the output direct buffer
LOCK_CLASS(env, clazz, "ZlibCompressor");
compressed_bytes = (*env)->GetDirectBufferAddress(env, compressed_bytes = (*env)->GetDirectBufferAddress(env,
compressed_direct_buf); compressed_direct_buf);
UNLOCK_CLASS(env, clazz, "ZlibCompressor");
if (compressed_bytes == 0) { if (compressed_bytes == 0) {
return (jint)0; return (jint)0;

View File

@ -28,7 +28,6 @@
#include "org_apache_hadoop_io_compress_zlib.h" #include "org_apache_hadoop_io_compress_zlib.h"
#include "org_apache_hadoop_io_compress_zlib_ZlibDecompressor.h" #include "org_apache_hadoop_io_compress_zlib_ZlibDecompressor.h"
static jfieldID ZlibDecompressor_clazz;
static jfieldID ZlibDecompressor_stream; static jfieldID ZlibDecompressor_stream;
static jfieldID ZlibDecompressor_compressedDirectBuf; static jfieldID ZlibDecompressor_compressedDirectBuf;
static jfieldID ZlibDecompressor_compressedDirectBufOff; static jfieldID ZlibDecompressor_compressedDirectBufOff;
@ -104,8 +103,6 @@ JNIEnv *env, jclass class
// Initialize the requisite fieldIds // Initialize the requisite fieldIds
ZlibDecompressor_clazz = (*env)->GetStaticFieldID(env, class, "clazz",
"Ljava/lang/Class;");
ZlibDecompressor_stream = (*env)->GetFieldID(env, class, "stream", "J"); ZlibDecompressor_stream = (*env)->GetFieldID(env, class, "stream", "J");
ZlibDecompressor_needDict = (*env)->GetFieldID(env, class, "needDict", "Z"); ZlibDecompressor_needDict = (*env)->GetFieldID(env, class, "needDict", "Z");
ZlibDecompressor_finished = (*env)->GetFieldID(env, class, "finished", "Z"); ZlibDecompressor_finished = (*env)->GetFieldID(env, class, "finished", "Z");
@ -197,7 +194,6 @@ JNIEXPORT jint JNICALL
Java_org_apache_hadoop_io_compress_zlib_ZlibDecompressor_inflateBytesDirect( Java_org_apache_hadoop_io_compress_zlib_ZlibDecompressor_inflateBytesDirect(
JNIEnv *env, jobject this JNIEnv *env, jobject this
) { ) {
jobject clazz = NULL;
jarray compressed_direct_buf = NULL; jarray compressed_direct_buf = NULL;
jint compressed_direct_buf_off = 0; jint compressed_direct_buf_off = 0;
jint compressed_direct_buf_len = 0; jint compressed_direct_buf_len = 0;
@ -218,8 +214,6 @@ Java_org_apache_hadoop_io_compress_zlib_ZlibDecompressor_inflateBytesDirect(
} }
// Get members of ZlibDecompressor // Get members of ZlibDecompressor
clazz = (*env)->GetStaticObjectField(env, this,
ZlibDecompressor_clazz);
compressed_direct_buf = (jarray)(*env)->GetObjectField(env, this, compressed_direct_buf = (jarray)(*env)->GetObjectField(env, this,
ZlibDecompressor_compressedDirectBuf); ZlibDecompressor_compressedDirectBuf);
compressed_direct_buf_off = (*env)->GetIntField(env, this, compressed_direct_buf_off = (*env)->GetIntField(env, this,
@ -233,20 +227,16 @@ Java_org_apache_hadoop_io_compress_zlib_ZlibDecompressor_inflateBytesDirect(
ZlibDecompressor_directBufferSize); ZlibDecompressor_directBufferSize);
// Get the input direct buffer // Get the input direct buffer
LOCK_CLASS(env, clazz, "ZlibDecompressor");
compressed_bytes = (*env)->GetDirectBufferAddress(env, compressed_bytes = (*env)->GetDirectBufferAddress(env,
compressed_direct_buf); compressed_direct_buf);
UNLOCK_CLASS(env, clazz, "ZlibDecompressor");
if (!compressed_bytes) { if (!compressed_bytes) {
return (jint)0; return (jint)0;
} }
// Get the output direct buffer // Get the output direct buffer
LOCK_CLASS(env, clazz, "ZlibDecompressor");
uncompressed_bytes = (*env)->GetDirectBufferAddress(env, uncompressed_bytes = (*env)->GetDirectBufferAddress(env,
uncompressed_direct_buf); uncompressed_direct_buf);
UNLOCK_CLASS(env, clazz, "ZlibDecompressor");
if (!uncompressed_bytes) { if (!uncompressed_bytes) {
return (jint)0; return (jint)0;

View File

@ -0,0 +1,104 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.io.compress.bzip2;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.compress.*;
import org.apache.hadoop.io.compress.bzip2.Bzip2Compressor;
import org.apache.hadoop.io.compress.bzip2.Bzip2Decompressor;
import org.apache.hadoop.test.MultithreadedTestUtil;
import org.junit.Before;
import org.junit.Test;
import java.io.*;
import java.util.Random;
import static org.junit.Assert.*;
import static org.junit.Assume.*;
import static org.junit.Assume.assumeTrue;
public class TestBzip2CompressorDecompressor {
private static final Random rnd = new Random(12345l);
@Before
public void before() {
assumeTrue(Bzip2Factory.isNativeBzip2Loaded(new Configuration()));
}
// test compress/decompress process
@Test
public void testCompressDecompress() {
byte[] rawData = null;
int rawDataSize = 0;
rawDataSize = 1024 * 64;
rawData = generate(rawDataSize);
try {
Bzip2Compressor compressor = new Bzip2Compressor();
Bzip2Decompressor decompressor = new Bzip2Decompressor();
assertFalse("testBzip2CompressDecompress finished error",
compressor.finished());
compressor.setInput(rawData, 0, rawData.length);
assertTrue("testBzip2CompressDecompress getBytesRead before error",
compressor.getBytesRead() == 0);
compressor.finish();
byte[] compressedResult = new byte[rawDataSize];
int cSize = compressor.compress(compressedResult, 0, rawDataSize);
assertTrue("testBzip2CompressDecompress getBytesRead after error",
compressor.getBytesRead() == rawDataSize);
assertTrue(
"testBzip2CompressDecompress compressed size no less than original size",
cSize < rawDataSize);
decompressor.setInput(compressedResult, 0, cSize);
byte[] decompressedBytes = new byte[rawDataSize];
decompressor.decompress(decompressedBytes, 0, decompressedBytes.length);
assertArrayEquals("testBzip2CompressDecompress arrays not equals ",
rawData, decompressedBytes);
compressor.reset();
decompressor.reset();
} catch (IOException ex) {
fail("testBzip2CompressDecompress ex !!!" + ex);
}
}
public static byte[] generate(int size) {
byte[] array = new byte[size];
for (int i = 0; i < size; i++)
array[i] = (byte)rnd.nextInt(16);
return array;
}
@Test
public void testBzip2CompressDecompressInMultiThreads() throws Exception {
MultithreadedTestUtil.TestContext ctx = new MultithreadedTestUtil.TestContext();
for(int i=0;i<10;i++) {
ctx.addThread( new MultithreadedTestUtil.TestingThread(ctx) {
@Override
public void doWork() throws Exception {
testCompressDecompress();
}
});
}
ctx.startThreads();
ctx.waitFor(60000);
}
}

View File

@ -36,6 +36,7 @@ import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.io.compress.Lz4Codec; import org.apache.hadoop.io.compress.Lz4Codec;
import org.apache.hadoop.io.compress.lz4.Lz4Compressor; import org.apache.hadoop.io.compress.lz4.Lz4Compressor;
import org.apache.hadoop.io.compress.lz4.Lz4Decompressor; import org.apache.hadoop.io.compress.lz4.Lz4Decompressor;
import org.apache.hadoop.test.MultithreadedTestUtil;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import static org.junit.Assume.*; import static org.junit.Assume.*;
@ -313,4 +314,20 @@ public class TestLz4CompressorDecompressor {
array[i] = (byte)rnd.nextInt(16); array[i] = (byte)rnd.nextInt(16);
return array; return array;
} }
@Test
public void testLz4CompressDecompressInMultiThreads() throws Exception {
MultithreadedTestUtil.TestContext ctx = new MultithreadedTestUtil.TestContext();
for(int i=0;i<10;i++) {
ctx.addThread( new MultithreadedTestUtil.TestingThread(ctx) {
@Override
public void doWork() throws Exception {
testCompressDecompress();
}
});
}
ctx.startThreads();
ctx.waitFor(60000);
}
} }

View File

@ -40,6 +40,7 @@ import org.apache.hadoop.io.compress.CompressionInputStream;
import org.apache.hadoop.io.compress.CompressionOutputStream; import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.io.compress.SnappyCodec; import org.apache.hadoop.io.compress.SnappyCodec;
import org.apache.hadoop.io.compress.snappy.SnappyDecompressor.SnappyDirectDecompressor; import org.apache.hadoop.io.compress.snappy.SnappyDecompressor.SnappyDirectDecompressor;
import org.apache.hadoop.test.MultithreadedTestUtil;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
@ -391,4 +392,20 @@ public class TestSnappyCompressorDecompressor {
return array; return array;
} }
} }
@Test
public void testSnappyCompressDecompressInMultiThreads() throws Exception {
MultithreadedTestUtil.TestContext ctx = new MultithreadedTestUtil.TestContext();
for(int i=0;i<10;i++) {
ctx.addThread( new MultithreadedTestUtil.TestingThread(ctx) {
@Override
public void doWork() throws Exception {
testSnappyCompressDecompress();
}
});
}
ctx.startThreads();
ctx.waitFor(60000);
}
} }

View File

@ -38,6 +38,7 @@ import org.apache.hadoop.io.compress.CompressDecompressTester.CompressionTestStr
import org.apache.hadoop.io.compress.zlib.ZlibCompressor.CompressionLevel; import org.apache.hadoop.io.compress.zlib.ZlibCompressor.CompressionLevel;
import org.apache.hadoop.io.compress.zlib.ZlibCompressor.CompressionStrategy; import org.apache.hadoop.io.compress.zlib.ZlibCompressor.CompressionStrategy;
import org.apache.hadoop.io.compress.zlib.ZlibDecompressor.ZlibDirectDecompressor; import org.apache.hadoop.io.compress.zlib.ZlibDecompressor.ZlibDirectDecompressor;
import org.apache.hadoop.test.MultithreadedTestUtil;
import org.apache.hadoop.util.NativeCodeLoader; import org.apache.hadoop.util.NativeCodeLoader;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
@ -419,4 +420,20 @@ public class TestZlibCompressorDecompressor {
data[i] = (byte)random.nextInt(16); data[i] = (byte)random.nextInt(16);
return data; return data;
} }
@Test
public void testZlibCompressDecompressInMultiThreads() throws Exception {
MultithreadedTestUtil.TestContext ctx = new MultithreadedTestUtil.TestContext();
for(int i=0;i<10;i++) {
ctx.addThread( new MultithreadedTestUtil.TestingThread(ctx) {
@Override
public void doWork() throws Exception {
testZlibCompressDecompress();
}
});
}
ctx.startThreads();
ctx.waitFor(60000);
}
} }