HADOOP-17292. Using lz4-java in Lz4Codec (#2350)

Contributed by Liang-Chi Hsieh.
Liang-Chi Hsieh 2020-11-18 12:03:25 -08:00 committed by GitHub
parent 0d3155a687
commit 34aa6137bd
20 changed files with 132 additions and 2319 deletions
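The change swaps libhadoop's bundled lz4 sources and JNI glue for the lz4-java library. A minimal round-trip sketch of the lz4-java block API the rewritten code relies on (class and method names per lz4-java 1.7.1, the version pinned in hadoop-project/pom.xml below; maxCompressedLength() plays the role of LZ4_compressBound() in the removed native code):

    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;
    import net.jpountz.lz4.LZ4Compressor;
    import net.jpountz.lz4.LZ4Factory;
    import net.jpountz.lz4.LZ4SafeDecompressor;

    public class Lz4JavaRoundTrip {
      public static void main(String[] args) {
        byte[] input = "hello hello hello hello".getBytes(StandardCharsets.UTF_8);
        LZ4Factory factory = LZ4Factory.fastestInstance();
        LZ4Compressor compressor = factory.fastCompressor();
        LZ4SafeDecompressor decompressor = factory.safeDecompressor();

        // Compress: the destination must hold at least maxCompressedLength() bytes.
        ByteBuffer src = ByteBuffer.wrap(input);
        ByteBuffer dst =
            ByteBuffer.allocate(compressor.maxCompressedLength(input.length));
        compressor.compress(src, dst);
        dst.flip();

        // Decompress into a buffer at least as large as the original input.
        ByteBuffer restored = ByteBuffer.allocate(input.length);
        decompressor.decompress(dst, restored);
        restored.flip();
        System.out.println(
            new String(restored.array(), 0, restored.limit(), StandardCharsets.UTF_8));
      }
    }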

View File: hadoop-common-project/hadoop-common/pom.xml

@@ -371,6 +371,11 @@
<artifactId>snappy-java</artifactId>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.lz4</groupId>
<artifactId>lz4-java</artifactId>
<scope>provided</scope>
</dependency>
</dependencies>
<build>
@@ -577,11 +582,6 @@
<exclude>src/main/native/m4/*</exclude>
<exclude>src/test/empty-file</exclude>
<exclude>src/test/all-tests</exclude>
<exclude>src/main/native/src/org/apache/hadoop/io/compress/lz4/lz4.h</exclude>
<exclude>src/main/native/src/org/apache/hadoop/io/compress/lz4/lz4.c</exclude>
<exclude>src/main/native/src/org/apache/hadoop/io/compress/lz4/lz4hc.h</exclude>
<exclude>src/main/native/src/org/apache/hadoop/io/compress/lz4/lz4hc.c</exclude>
<exclude>src/main/native/src/org/apache/hadoop/io/compress/lz4/lz4hc_encoder.h</exclude>
<exclude>src/main/native/gtest/**/*</exclude>
<exclude>src/test/resources/test-untar.tgz</exclude>
<exclude>src/test/resources/test.har/_SUCCESS</exclude>

View File: hadoop-common-project/hadoop-common/src/CMakeLists.txt

@@ -236,10 +236,6 @@ configure_file(${CMAKE_SOURCE_DIR}/config.h.cmake ${CMAKE_BINARY_DIR}/config.h)
set(CMAKE_BUILD_WITH_INSTALL_RPATH TRUE)
hadoop_add_dual_library(hadoop
main/native/src/exception.c
${SRC}/io/compress/lz4/Lz4Compressor.c
${SRC}/io/compress/lz4/Lz4Decompressor.c
${SRC}/io/compress/lz4/lz4.c
${SRC}/io/compress/lz4/lz4hc.c
${ISAL_SOURCE_FILES}
${ZSTD_SOURCE_FILES}
${OPENSSL_SOURCE_FILES}

View File: hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/Lz4Codec.java

@@ -27,17 +27,12 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.lz4.Lz4Compressor;
import org.apache.hadoop.io.compress.lz4.Lz4Decompressor;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.util.NativeCodeLoader;
/**
* This class creates lz4 compressors/decompressors.
*/
public class Lz4Codec implements Configurable, CompressionCodec {
static {
NativeCodeLoader.isNativeCodeLoaded();
}
Configuration conf;
/**
@@ -60,19 +55,6 @@ public class Lz4Codec implements Configurable, CompressionCodec {
return conf;
}
/**
* Are the native lz4 libraries loaded & initialized?
*
* @return true if loaded & initialized, otherwise false
*/
public static boolean isNativeCodeLoaded() {
return NativeCodeLoader.isNativeCodeLoaded();
}
public static String getLibraryName() {
return Lz4Compressor.getLibraryName();
}
/**
* Create a {@link CompressionOutputStream} that will write to the given
* {@link OutputStream}.
@@ -101,9 +83,6 @@ public class Lz4Codec implements Configurable, CompressionCodec {
public CompressionOutputStream createOutputStream(OutputStream out,
Compressor compressor)
throws IOException {
if (!isNativeCodeLoaded()) {
throw new RuntimeException("native lz4 library not available");
}
int bufferSize = conf.getInt(
CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZ4_BUFFERSIZE_KEY,
CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZ4_BUFFERSIZE_DEFAULT);
@@ -121,10 +100,6 @@ public class Lz4Codec implements Configurable, CompressionCodec {
*/
@Override
public Class<? extends Compressor> getCompressorType() {
if (!isNativeCodeLoaded()) {
throw new RuntimeException("native lz4 library not available");
}
return Lz4Compressor.class;
}
@@ -135,9 +110,6 @@ public class Lz4Codec implements Configurable, CompressionCodec {
*/
@Override
public Compressor createCompressor() {
if (!isNativeCodeLoaded()) {
throw new RuntimeException("native lz4 library not available");
}
int bufferSize = conf.getInt(
CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZ4_BUFFERSIZE_KEY,
CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZ4_BUFFERSIZE_DEFAULT);
@@ -175,10 +147,6 @@ public class Lz4Codec implements Configurable, CompressionCodec {
public CompressionInputStream createInputStream(InputStream in,
Decompressor decompressor)
throws IOException {
if (!isNativeCodeLoaded()) {
throw new RuntimeException("native lz4 library not available");
}
return new BlockDecompressorStream(in, decompressor, conf.getInt(
CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZ4_BUFFERSIZE_KEY,
CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZ4_BUFFERSIZE_DEFAULT));
@@ -191,10 +159,6 @@ public class Lz4Codec implements Configurable, CompressionCodec {
*/
@Override
public Class<? extends Decompressor> getDecompressorType() {
if (!isNativeCodeLoaded()) {
throw new RuntimeException("native lz4 library not available");
}
return Lz4Decompressor.class;
}
@@ -205,9 +169,6 @@ public class Lz4Codec implements Configurable, CompressionCodec {
*/
@Override
public Decompressor createDecompressor() {
if (!isNativeCodeLoaded()) {
throw new RuntimeException("native lz4 library not available");
}
int bufferSize = conf.getInt(
CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZ4_BUFFERSIZE_KEY,
CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZ4_BUFFERSIZE_DEFAULT);

View File: hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/lz4/Lz4Compressor.java

@@ -22,9 +22,11 @@ import java.io.IOException;
import java.nio.Buffer;
import java.nio.ByteBuffer;
import net.jpountz.lz4.LZ4Factory;
import net.jpountz.lz4.LZ4Compressor;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.Compressor;
import org.apache.hadoop.util.NativeCodeLoader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -49,22 +51,7 @@ public class Lz4Compressor implements Compressor {
private long bytesRead = 0L;
private long bytesWritten = 0L;
private final boolean useLz4HC;
static {
if (NativeCodeLoader.isNativeCodeLoaded()) {
// Initialize the native library
try {
initIDs();
} catch (Throwable t) {
// Ignore failure to load/initialize lz4
LOG.warn(t.toString());
}
} else {
LOG.error("Cannot load " + Lz4Compressor.class.getName() +
" without native hadoop library!");
}
}
private final LZ4Compressor lz4Compressor;
/**
* Creates a new compressor.
@@ -74,9 +61,21 @@ public class Lz4Compressor implements Compressor {
* which trades CPU for compression ratio.
*/
public Lz4Compressor(int directBufferSize, boolean useLz4HC) {
this.useLz4HC = useLz4HC;
this.directBufferSize = directBufferSize;
try {
LZ4Factory lz4Factory = LZ4Factory.fastestInstance();
if (useLz4HC) {
lz4Compressor = lz4Factory.highCompressor();
} else {
lz4Compressor = lz4Factory.fastCompressor();
}
} catch (AssertionError t) {
throw new RuntimeException("lz4-java library is not available: " +
"Lz4Compressor has not been loaded. You need to add " +
"lz4-java.jar to your CLASSPATH. " + t, t);
}
uncompressedDirectBuf = ByteBuffer.allocateDirect(directBufferSize);
// Compression is guaranteed to succeed if 'dstCapacity' >=
@@ -243,7 +242,7 @@
}
// Compress data
n = useLz4HC ? compressBytesDirectHC() : compressBytesDirect();
n = compressDirectBuf();
compressedDirectBuf.limit(n);
uncompressedDirectBuf.clear(); // lz4 consumes all buffer input
@@ -309,11 +308,20 @@
public synchronized void end() {
}
private native static void initIDs();
private native int compressBytesDirect();
private native int compressBytesDirectHC();
public native static String getLibraryName();
private int compressDirectBuf() {
if (uncompressedDirectBufLen == 0) {
return 0;
} else {
// Set the position and limit of `uncompressedDirectBuf` for reading
uncompressedDirectBuf.limit(uncompressedDirectBufLen).position(0);
compressedDirectBuf.clear();
lz4Compressor.compress((ByteBuffer) uncompressedDirectBuf,
(ByteBuffer) compressedDirectBuf);
uncompressedDirectBufLen = 0;
uncompressedDirectBuf.limit(directBufferSize).position(0);
int size = compressedDirectBuf.position();
compressedDirectBuf.position(0);
return size;
}
}
}
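A hedged sketch of driving the rewritten class through Hadoop's generic Compressor contract (constructor arguments follow the signature shown above; the 64 KB sizes are illustrative):

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import org.apache.hadoop.io.compress.lz4.Lz4Compressor;

    public class Lz4CompressorUsage {
      public static void main(String[] args) throws IOException {
        byte[] input = new byte[8192];          // e.g. data to be compressed
        Lz4Compressor compressor = new Lz4Compressor(64 * 1024, false);
        compressor.setInput(input, 0, input.length);
        compressor.finish();
        byte[] buf = new byte[64 * 1024];
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        while (!compressor.finished()) {
          int n = compressor.compress(buf, 0, buf.length);
          out.write(buf, 0, n);
        }
        System.out.println("read " + compressor.getBytesRead()
            + " bytes, wrote " + compressor.getBytesWritten());
      }
    }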

View File: hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/lz4/Lz4Decompressor.java

@@ -22,8 +22,10 @@ import java.io.IOException;
import java.nio.Buffer;
import java.nio.ByteBuffer;
import net.jpountz.lz4.LZ4Factory;
import net.jpountz.lz4.LZ4SafeDecompressor;
import org.apache.hadoop.io.compress.Decompressor;
import org.apache.hadoop.util.NativeCodeLoader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -44,20 +46,7 @@ public class Lz4Decompressor implements Decompressor {
private int userBufOff = 0, userBufLen = 0;
private boolean finished;
static {
if (NativeCodeLoader.isNativeCodeLoaded()) {
// Initialize the native library
try {
initIDs();
} catch (Throwable t) {
// Ignore failure to load/initialize lz4
LOG.warn(t.toString());
}
} else {
LOG.error("Cannot load " + Lz4Compressor.class.getName() +
" without native hadoop library!");
}
}
private LZ4SafeDecompressor lz4Decompressor;
/**
* Creates a new decompressor.
@@ -67,6 +56,15 @@ public class Lz4Decompressor implements Decompressor {
public Lz4Decompressor(int directBufferSize) {
this.directBufferSize = directBufferSize;
try {
LZ4Factory lz4Factory = LZ4Factory.fastestInstance();
lz4Decompressor = lz4Factory.safeDecompressor();
} catch (AssertionError t) {
throw new RuntimeException("lz4-java library is not available: " +
"Lz4Decompressor has not been loaded. You need to add " +
"lz4-java.jar to your CLASSPATH. " + t, t);
}
compressedDirectBuf = ByteBuffer.allocateDirect(directBufferSize);
uncompressedDirectBuf = ByteBuffer.allocateDirect(directBufferSize);
uncompressedDirectBuf.position(directBufferSize);
@@ -200,7 +198,7 @@
* @param b Buffer for the compressed data
* @param off Start offset of the data
* @param len Size of the buffer
* @return The actual number of bytes of compressed data.
* @return The actual number of bytes of uncompressed data.
* @throws IOException
*/
@Override
@@ -228,7 +226,7 @@
uncompressedDirectBuf.limit(directBufferSize);
// Decompress data
n = decompressBytesDirect();
n = decompressDirectBuf();
uncompressedDirectBuf.limit(n);
if (userBufLen <= 0) {
@@ -272,7 +270,18 @@
// do nothing
}
private native static void initIDs();
private native int decompressBytesDirect();
private int decompressDirectBuf() {
if (compressedDirectBufLen == 0) {
return 0;
} else {
compressedDirectBuf.limit(compressedDirectBufLen).position(0);
lz4Decompressor.decompress((ByteBuffer) compressedDirectBuf,
(ByteBuffer) uncompressedDirectBuf);
compressedDirectBufLen = 0;
compressedDirectBuf.clear();
int size = uncompressedDirectBuf.position();
uncompressedDirectBuf.position(0);
return size;
}
}
}
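And the matching raw round trip, assuming both sides use the same buffer size so the compressed block is produced and consumed in one pass:

    import java.io.IOException;
    import java.util.Arrays;
    import org.apache.hadoop.io.compress.lz4.Lz4Compressor;
    import org.apache.hadoop.io.compress.lz4.Lz4Decompressor;

    public class Lz4RawRoundTrip {
      public static void main(String[] args) throws IOException {
        byte[] input = "hello hello hello hello".getBytes("UTF-8");
        Lz4Compressor compressor = new Lz4Compressor(64 * 1024, false);
        compressor.setInput(input, 0, input.length);
        compressor.finish();
        byte[] block = new byte[64 * 1024];
        int clen = compressor.compress(block, 0, block.length);

        Lz4Decompressor decompressor = new Lz4Decompressor(64 * 1024);
        decompressor.setInput(block, 0, clen);
        byte[] restored = new byte[64 * 1024];
        int dlen = decompressor.decompress(restored, 0, restored.length);
        // expected: true
        System.out.println(Arrays.equals(input, Arrays.copyOf(restored, dlen)));
      }
    }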

View File: hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/NativeLibraryChecker.java

@@ -22,7 +22,6 @@ import org.apache.hadoop.io.compress.ZStandardCodec;
import org.apache.hadoop.io.erasurecode.ErasureCodeNative;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.crypto.OpensslCipher;
import org.apache.hadoop.io.compress.Lz4Codec;
import org.apache.hadoop.io.compress.bzip2.Bzip2Factory;
import org.apache.hadoop.io.compress.zlib.ZlibFactory;
import org.apache.hadoop.classification.InterfaceAudience;
@@ -69,8 +68,6 @@ public class NativeLibraryChecker {
boolean isalLoaded = false;
boolean zStdLoaded = false;
boolean pmdkLoaded = false;
// lz4 is linked within libhadoop
boolean lz4Loaded = nativeHadoopLoaded;
boolean bzip2Loaded = Bzip2Factory.isNativeBzip2Loaded(conf);
boolean openSslLoaded = false;
boolean winutilsExists = false;
@@ -81,7 +78,6 @@
String isalDetail = "";
String pmdkDetail = "";
String zstdLibraryName = "";
String lz4LibraryName = "";
String bzip2LibraryName = "";
String winutilsPath = null;
@@ -119,9 +115,6 @@
openSslLoaded = true;
}
if (lz4Loaded) {
lz4LibraryName = Lz4Codec.getLibraryName();
}
if (bzip2Loaded) {
bzip2LibraryName = Bzip2Factory.getLibraryName(conf);
}
@@ -144,7 +137,6 @@
System.out.printf("hadoop: %b %s%n", nativeHadoopLoaded, hadoopLibraryName);
System.out.printf("zlib: %b %s%n", zlibLoaded, zlibLibraryName);
System.out.printf("zstd : %b %s%n", zStdLoaded, zstdLibraryName);
System.out.printf("lz4: %b %s%n", lz4Loaded, lz4LibraryName);
System.out.printf("bzip2: %b %s%n", bzip2Loaded, bzip2LibraryName);
System.out.printf("openssl: %b %s%n", openSslLoaded, openSslDetail);
System.out.printf("ISA-L: %b %s%n", isalLoaded, isalDetail);
@@ -155,8 +147,8 @@
}
if ((!nativeHadoopLoaded) || (Shell.WINDOWS && (!winutilsExists)) ||
(checkAll && !(zlibLoaded && lz4Loaded
&& bzip2Loaded && isalLoaded && zStdLoaded))) {
(checkAll && !(zlibLoaded && bzip2Loaded
&& isalLoaded && zStdLoaded))) {
// return 1 to indicate the check failed
ExitUtil.terminate(1);
}

View File: hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/lz4/Lz4Compressor.c (deleted)

@@ -1,128 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "org_apache_hadoop.h"
#include "org_apache_hadoop_io_compress_lz4_Lz4Compressor.h"
#ifdef UNIX
#include "config.h"
#endif // UNIX
#include "lz4.h"
#include "lz4hc.h"
static jfieldID Lz4Compressor_uncompressedDirectBuf;
static jfieldID Lz4Compressor_uncompressedDirectBufLen;
static jfieldID Lz4Compressor_compressedDirectBuf;
static jfieldID Lz4Compressor_dstCapacity;
JNIEXPORT void JNICALL Java_org_apache_hadoop_io_compress_lz4_Lz4Compressor_initIDs
(JNIEnv *env, jclass clazz){
Lz4Compressor_uncompressedDirectBuf = (*env)->GetFieldID(env, clazz,
"uncompressedDirectBuf",
"Ljava/nio/Buffer;");
Lz4Compressor_uncompressedDirectBufLen = (*env)->GetFieldID(env, clazz,
"uncompressedDirectBufLen", "I");
Lz4Compressor_compressedDirectBuf = (*env)->GetFieldID(env, clazz,
"compressedDirectBuf",
"Ljava/nio/Buffer;");
Lz4Compressor_dstCapacity = (*env)->GetFieldID(env, clazz,
"dstCapacity", "I");
}
JNIEXPORT jint JNICALL Java_org_apache_hadoop_io_compress_lz4_Lz4Compressor_compressBytesDirect
(JNIEnv *env, jobject thisj){
const char* uncompressed_bytes;
char *compressed_bytes;
// Get members of Lz4Compressor
jobject uncompressed_direct_buf = (*env)->GetObjectField(env, thisj, Lz4Compressor_uncompressedDirectBuf);
jint uncompressed_direct_buf_len = (*env)->GetIntField(env, thisj, Lz4Compressor_uncompressedDirectBufLen);
jobject compressed_direct_buf = (*env)->GetObjectField(env, thisj, Lz4Compressor_compressedDirectBuf);
jint compressed_direct_buf_len = (*env)->GetIntField(env, thisj, Lz4Compressor_dstCapacity);
// Get the input direct buffer
uncompressed_bytes = (const char*)(*env)->GetDirectBufferAddress(env, uncompressed_direct_buf);
if (uncompressed_bytes == 0) {
return (jint)0;
}
// Get the output direct buffer
compressed_bytes = (char *)(*env)->GetDirectBufferAddress(env, compressed_direct_buf);
if (compressed_bytes == 0) {
return (jint)0;
}
compressed_direct_buf_len = LZ4_compress_default(uncompressed_bytes, compressed_bytes, uncompressed_direct_buf_len, compressed_direct_buf_len);
if (compressed_direct_buf_len < 0){
THROW(env, "java/lang/InternalError", "LZ4_compress failed");
}
(*env)->SetIntField(env, thisj, Lz4Compressor_uncompressedDirectBufLen, 0);
return (jint)compressed_direct_buf_len;
}
JNIEXPORT jstring JNICALL
Java_org_apache_hadoop_io_compress_lz4_Lz4Compressor_getLibraryName(
JNIEnv *env, jclass class
) {
char version_buf[128];
snprintf(version_buf, sizeof(version_buf), "revision:%d", LZ4_versionNumber());
return (*env)->NewStringUTF(env, version_buf);
}
JNIEXPORT jint JNICALL Java_org_apache_hadoop_io_compress_lz4_Lz4Compressor_compressBytesDirectHC
(JNIEnv *env, jobject thisj){
const char* uncompressed_bytes = NULL;
char* compressed_bytes = NULL;
// Get members of Lz4Compressor
jobject uncompressed_direct_buf = (*env)->GetObjectField(env, thisj, Lz4Compressor_uncompressedDirectBuf);
jint uncompressed_direct_buf_len = (*env)->GetIntField(env, thisj, Lz4Compressor_uncompressedDirectBufLen);
jobject compressed_direct_buf = (*env)->GetObjectField(env, thisj, Lz4Compressor_compressedDirectBuf);
jint compressed_direct_buf_len = (*env)->GetIntField(env, thisj, Lz4Compressor_dstCapacity);
// Get the input direct buffer
uncompressed_bytes = (const char*)(*env)->GetDirectBufferAddress(env, uncompressed_direct_buf);
if (uncompressed_bytes == 0) {
return (jint)0;
}
// Get the output direct buffer
compressed_bytes = (char *)(*env)->GetDirectBufferAddress(env, compressed_direct_buf);
if (compressed_bytes == 0) {
return (jint)0;
}
compressed_direct_buf_len = LZ4_compress_HC(uncompressed_bytes, compressed_bytes, uncompressed_direct_buf_len, compressed_direct_buf_len, 0);
if (compressed_direct_buf_len < 0){
THROW(env, "java/lang/InternalError", "LZ4_compressHC failed");
}
(*env)->SetIntField(env, thisj, Lz4Compressor_uncompressedDirectBufLen, 0);
return (jint)compressed_direct_buf_len;
}

View File: hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/lz4/Lz4Decompressor.c (deleted)

@@ -1,81 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "org_apache_hadoop.h"
#include "org_apache_hadoop_io_compress_lz4_Lz4Decompressor.h"
#ifdef UNIX
#include "config.h"
#endif // UNIX
#include "lz4.h"
static jfieldID Lz4Decompressor_compressedDirectBuf;
static jfieldID Lz4Decompressor_compressedDirectBufLen;
static jfieldID Lz4Decompressor_uncompressedDirectBuf;
static jfieldID Lz4Decompressor_directBufferSize;
JNIEXPORT void JNICALL Java_org_apache_hadoop_io_compress_lz4_Lz4Decompressor_initIDs
(JNIEnv *env, jclass clazz){
Lz4Decompressor_compressedDirectBuf = (*env)->GetFieldID(env,clazz,
"compressedDirectBuf",
"Ljava/nio/Buffer;");
Lz4Decompressor_compressedDirectBufLen = (*env)->GetFieldID(env,clazz,
"compressedDirectBufLen", "I");
Lz4Decompressor_uncompressedDirectBuf = (*env)->GetFieldID(env,clazz,
"uncompressedDirectBuf",
"Ljava/nio/Buffer;");
Lz4Decompressor_directBufferSize = (*env)->GetFieldID(env, clazz,
"directBufferSize", "I");
}
JNIEXPORT jint JNICALL Java_org_apache_hadoop_io_compress_lz4_Lz4Decompressor_decompressBytesDirect
(JNIEnv *env, jobject thisj){
const char *compressed_bytes;
char *uncompressed_bytes;
// Get members of Lz4Decompressor
jobject compressed_direct_buf = (*env)->GetObjectField(env,thisj, Lz4Decompressor_compressedDirectBuf);
jint compressed_direct_buf_len = (*env)->GetIntField(env,thisj, Lz4Decompressor_compressedDirectBufLen);
jobject uncompressed_direct_buf = (*env)->GetObjectField(env,thisj, Lz4Decompressor_uncompressedDirectBuf);
size_t uncompressed_direct_buf_len = (*env)->GetIntField(env, thisj, Lz4Decompressor_directBufferSize);
// Get the input direct buffer
compressed_bytes = (const char*)(*env)->GetDirectBufferAddress(env, compressed_direct_buf);
if (compressed_bytes == 0) {
return (jint)0;
}
// Get the output direct buffer
uncompressed_bytes = (char *)(*env)->GetDirectBufferAddress(env, uncompressed_direct_buf);
if (uncompressed_bytes == 0) {
return (jint)0;
}
uncompressed_direct_buf_len = LZ4_decompress_safe(compressed_bytes, uncompressed_bytes, compressed_direct_buf_len, uncompressed_direct_buf_len);
if (uncompressed_direct_buf_len < 0) {
THROW(env, "java/lang/InternalError", "LZ4_uncompress_unknownOutputSize failed.");
}
(*env)->SetIntField(env, thisj, Lz4Decompressor_compressedDirectBufLen, 0);
return (jint)uncompressed_direct_buf_len;
}

View File: hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/lz4/lz4hc.h (deleted)

@@ -1,438 +0,0 @@
/*
LZ4 HC - High Compression Mode of LZ4
Header File
Copyright (C) 2011-2017, Yann Collet.
BSD 2-Clause License (http://www.opensource.org/licenses/bsd-license.php)
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are
met:
* Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above
copyright notice, this list of conditions and the following disclaimer
in the documentation and/or other materials provided with the
distribution.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
You can contact the author at :
- LZ4 source repository : https://github.com/lz4/lz4
- LZ4 public forum : https://groups.google.com/forum/#!forum/lz4c
*/
#ifndef LZ4_HC_H_19834876238432
#define LZ4_HC_H_19834876238432
#if defined (__cplusplus)
extern "C" {
#endif
/* --- Dependency --- */
/* note : lz4hc requires lz4.h/lz4.c for compilation */
#include "lz4.h" /* stddef, LZ4LIB_API, LZ4_DEPRECATED */
/* --- Useful constants --- */
#define LZ4HC_CLEVEL_MIN 3
#define LZ4HC_CLEVEL_DEFAULT 9
#define LZ4HC_CLEVEL_OPT_MIN 10
#define LZ4HC_CLEVEL_MAX 12
/*-************************************
* Block Compression
**************************************/
/*! LZ4_compress_HC() :
* Compress data from `src` into `dst`, using the powerful but slower "HC" algorithm.
* `dst` must be already allocated.
* Compression is guaranteed to succeed if `dstCapacity >= LZ4_compressBound(srcSize)` (see "lz4.h")
* Max supported `srcSize` value is LZ4_MAX_INPUT_SIZE (see "lz4.h")
* `compressionLevel` : any value between 1 and LZ4HC_CLEVEL_MAX will work.
* Values > LZ4HC_CLEVEL_MAX behave the same as LZ4HC_CLEVEL_MAX.
* @return : the number of bytes written into 'dst'
* or 0 if compression fails.
*/
LZ4LIB_API int LZ4_compress_HC (const char* src, char* dst, int srcSize, int dstCapacity, int compressionLevel);
/* Note :
* Decompression functions are provided within "lz4.h" (BSD license)
*/
/*! LZ4_compress_HC_extStateHC() :
* Same as LZ4_compress_HC(), but using an externally allocated memory segment for `state`.
* `state` size is provided by LZ4_sizeofStateHC().
* Memory segment must be aligned on 8-bytes boundaries (which a normal malloc() should do properly).
*/
LZ4LIB_API int LZ4_sizeofStateHC(void);
LZ4LIB_API int LZ4_compress_HC_extStateHC(void* stateHC, const char* src, char* dst, int srcSize, int maxDstSize, int compressionLevel);
/*! LZ4_compress_HC_destSize() : v1.9.0+
* Will compress as much data as possible from `src`
* to fit into `targetDstSize` budget.
* Result is provided in 2 parts :
* @return : the number of bytes written into 'dst' (necessarily <= targetDstSize)
* or 0 if compression fails.
* `srcSizePtr` : on success, *srcSizePtr is updated to indicate how many bytes were read from `src`
*/
LZ4LIB_API int LZ4_compress_HC_destSize(void* stateHC,
const char* src, char* dst,
int* srcSizePtr, int targetDstSize,
int compressionLevel);
/*-************************************
* Streaming Compression
* Bufferless synchronous API
**************************************/
typedef union LZ4_streamHC_u LZ4_streamHC_t; /* incomplete type (defined later) */
/*! LZ4_createStreamHC() and LZ4_freeStreamHC() :
* These functions create and release memory for LZ4 HC streaming state.
* Newly created states are automatically initialized.
* A same state can be used multiple times consecutively,
* starting with LZ4_resetStreamHC_fast() to start a new stream of blocks.
*/
LZ4LIB_API LZ4_streamHC_t* LZ4_createStreamHC(void);
LZ4LIB_API int LZ4_freeStreamHC (LZ4_streamHC_t* streamHCPtr);
/*
These functions compress data in successive blocks of any size,
using previous blocks as dictionary, to improve compression ratio.
One key assumption is that previous blocks (up to 64 KB) remain read-accessible while compressing next blocks.
There is an exception for ring buffers, which can be smaller than 64 KB.
Ring-buffer scenario is automatically detected and handled within LZ4_compress_HC_continue().
Before starting compression, state must be allocated and properly initialized.
LZ4_createStreamHC() does both, though compression level is set to LZ4HC_CLEVEL_DEFAULT.
Selecting the compression level can be done with LZ4_resetStreamHC_fast() (starts a new stream)
or LZ4_setCompressionLevel() (anytime, between blocks in the same stream) (experimental).
LZ4_resetStreamHC_fast() only works on states which have been properly initialized at least once,
which is automatically the case when state is created using LZ4_createStreamHC().
After reset, a first "fictional block" can be designated as initial dictionary,
using LZ4_loadDictHC() (Optional).
Invoke LZ4_compress_HC_continue() to compress each successive block.
The number of blocks is unlimited.
Previous input blocks, including initial dictionary when present,
must remain accessible and unmodified during compression.
It's allowed to update compression level anytime between blocks,
using LZ4_setCompressionLevel() (experimental).
'dst' buffer should be sized to handle worst case scenarios
(see LZ4_compressBound(), it ensures compression success).
In case of failure, the API does not guarantee recovery,
so the state _must_ be reset.
To ensure compression success
whenever `dst` buffer size cannot be made >= LZ4_compressBound(),
consider using LZ4_compress_HC_continue_destSize().
Whenever previous input blocks can't be preserved unmodified in-place during compression of next blocks,
it's possible to copy the last blocks into a more stable memory space, using LZ4_saveDictHC().
Return value of LZ4_saveDictHC() is the size of dictionary effectively saved into 'safeBuffer' (<= 64 KB)
After completing a streaming compression,
it's possible to start a new stream of blocks, using the same LZ4_streamHC_t state,
just by resetting it, using LZ4_resetStreamHC_fast().
*/
LZ4LIB_API void LZ4_resetStreamHC_fast(LZ4_streamHC_t* streamHCPtr, int compressionLevel); /* v1.9.0+ */
LZ4LIB_API int LZ4_loadDictHC (LZ4_streamHC_t* streamHCPtr, const char* dictionary, int dictSize);
LZ4LIB_API int LZ4_compress_HC_continue (LZ4_streamHC_t* streamHCPtr,
const char* src, char* dst,
int srcSize, int maxDstSize);
/*! LZ4_compress_HC_continue_destSize() : v1.9.0+
* Similar to LZ4_compress_HC_continue(),
* but will read as much data as possible from `src`
* to fit into `targetDstSize` budget.
* Result is provided into 2 parts :
* @return : the number of bytes written into 'dst' (necessarily <= targetDstSize)
* or 0 if compression fails.
* `srcSizePtr` : on success, *srcSizePtr will be updated to indicate how many bytes were read from `src`.
* Note that this function may not consume the entire input.
*/
LZ4LIB_API int LZ4_compress_HC_continue_destSize(LZ4_streamHC_t* LZ4_streamHCPtr,
const char* src, char* dst,
int* srcSizePtr, int targetDstSize);
LZ4LIB_API int LZ4_saveDictHC (LZ4_streamHC_t* streamHCPtr, char* safeBuffer, int maxDictSize);
/*^**********************************************
* !!!!!! STATIC LINKING ONLY !!!!!!
***********************************************/
/*-******************************************************************
* PRIVATE DEFINITIONS :
* Do not use these definitions directly.
* They are merely exposed to allow static allocation of `LZ4_streamHC_t`.
* Declare an `LZ4_streamHC_t` directly, rather than any type below.
* Even then, only do so in the context of static linking, as definitions may change between versions.
********************************************************************/
#define LZ4HC_DICTIONARY_LOGSIZE 16
#define LZ4HC_MAXD (1<<LZ4HC_DICTIONARY_LOGSIZE)
#define LZ4HC_MAXD_MASK (LZ4HC_MAXD - 1)
#define LZ4HC_HASH_LOG 15
#define LZ4HC_HASHTABLESIZE (1 << LZ4HC_HASH_LOG)
#define LZ4HC_HASH_MASK (LZ4HC_HASHTABLESIZE - 1)
#if defined(__cplusplus) || (defined (__STDC_VERSION__) && (__STDC_VERSION__ >= 199901L) /* C99 */)
#include <stdint.h>
typedef struct LZ4HC_CCtx_internal LZ4HC_CCtx_internal;
struct LZ4HC_CCtx_internal
{
uint32_t hashTable[LZ4HC_HASHTABLESIZE];
uint16_t chainTable[LZ4HC_MAXD];
const uint8_t* end; /* next block here to continue on current prefix */
const uint8_t* base; /* All index relative to this position */
const uint8_t* dictBase; /* alternate base for extDict */
uint32_t dictLimit; /* below that point, need extDict */
uint32_t lowLimit; /* below that point, no more dict */
uint32_t nextToUpdate; /* index from which to continue dictionary update */
short compressionLevel;
int8_t favorDecSpeed; /* favor decompression speed if this flag set,
otherwise, favor compression ratio */
int8_t dirty; /* stream has to be fully reset if this flag is set */
const LZ4HC_CCtx_internal* dictCtx;
};
#else
typedef struct LZ4HC_CCtx_internal LZ4HC_CCtx_internal;
struct LZ4HC_CCtx_internal
{
unsigned int hashTable[LZ4HC_HASHTABLESIZE];
unsigned short chainTable[LZ4HC_MAXD];
const unsigned char* end; /* next block here to continue on current prefix */
const unsigned char* base; /* All index relative to this position */
const unsigned char* dictBase; /* alternate base for extDict */
unsigned int dictLimit; /* below that point, need extDict */
unsigned int lowLimit; /* below that point, no more dict */
unsigned int nextToUpdate; /* index from which to continue dictionary update */
short compressionLevel;
char favorDecSpeed; /* favor decompression speed if this flag set,
otherwise, favor compression ratio */
char dirty; /* stream has to be fully reset if this flag is set */
const LZ4HC_CCtx_internal* dictCtx;
};
#endif
/* Do not use these definitions directly !
* Declare or allocate an LZ4_streamHC_t instead.
*/
#define LZ4_STREAMHCSIZE (4*LZ4HC_HASHTABLESIZE + 2*LZ4HC_MAXD + 56 + ((sizeof(void*)==16) ? 56 : 0) /* AS400*/ ) /* 262200 or 262256*/
#define LZ4_STREAMHCSIZE_SIZET (LZ4_STREAMHCSIZE / sizeof(size_t))
union LZ4_streamHC_u {
size_t table[LZ4_STREAMHCSIZE_SIZET];
LZ4HC_CCtx_internal internal_donotuse;
}; /* previously typedef'd to LZ4_streamHC_t */
/* LZ4_streamHC_t :
* This structure allows static allocation of LZ4 HC streaming state.
* This can be used to allocate statically, on stack, or as part of a larger structure.
*
* Such state **must** be initialized using LZ4_initStreamHC() before first use.
*
* Note that invoking LZ4_initStreamHC() is not required when
* the state was created using LZ4_createStreamHC() (which is recommended).
* Using the normal builder, a newly created state is automatically initialized.
*
* Static allocation shall only be used in combination with static linking.
*/
/* LZ4_initStreamHC() : v1.9.0+
* Required before first use of a statically allocated LZ4_streamHC_t.
* Before v1.9.0 : use LZ4_resetStreamHC() instead
*/
LZ4LIB_API LZ4_streamHC_t* LZ4_initStreamHC (void* buffer, size_t size);
/*-************************************
* Deprecated Functions
**************************************/
/* see lz4.h LZ4_DISABLE_DEPRECATE_WARNINGS to turn off deprecation warnings */
/* deprecated compression functions */
LZ4_DEPRECATED("use LZ4_compress_HC() instead") LZ4LIB_API int LZ4_compressHC (const char* source, char* dest, int inputSize);
LZ4_DEPRECATED("use LZ4_compress_HC() instead") LZ4LIB_API int LZ4_compressHC_limitedOutput (const char* source, char* dest, int inputSize, int maxOutputSize);
LZ4_DEPRECATED("use LZ4_compress_HC() instead") LZ4LIB_API int LZ4_compressHC2 (const char* source, char* dest, int inputSize, int compressionLevel);
LZ4_DEPRECATED("use LZ4_compress_HC() instead") LZ4LIB_API int LZ4_compressHC2_limitedOutput(const char* source, char* dest, int inputSize, int maxOutputSize, int compressionLevel);
LZ4_DEPRECATED("use LZ4_compress_HC_extStateHC() instead") LZ4LIB_API int LZ4_compressHC_withStateHC (void* state, const char* source, char* dest, int inputSize);
LZ4_DEPRECATED("use LZ4_compress_HC_extStateHC() instead") LZ4LIB_API int LZ4_compressHC_limitedOutput_withStateHC (void* state, const char* source, char* dest, int inputSize, int maxOutputSize);
LZ4_DEPRECATED("use LZ4_compress_HC_extStateHC() instead") LZ4LIB_API int LZ4_compressHC2_withStateHC (void* state, const char* source, char* dest, int inputSize, int compressionLevel);
LZ4_DEPRECATED("use LZ4_compress_HC_extStateHC() instead") LZ4LIB_API int LZ4_compressHC2_limitedOutput_withStateHC(void* state, const char* source, char* dest, int inputSize, int maxOutputSize, int compressionLevel);
LZ4_DEPRECATED("use LZ4_compress_HC_continue() instead") LZ4LIB_API int LZ4_compressHC_continue (LZ4_streamHC_t* LZ4_streamHCPtr, const char* source, char* dest, int inputSize);
LZ4_DEPRECATED("use LZ4_compress_HC_continue() instead") LZ4LIB_API int LZ4_compressHC_limitedOutput_continue (LZ4_streamHC_t* LZ4_streamHCPtr, const char* source, char* dest, int inputSize, int maxOutputSize);
/* Obsolete streaming functions; degraded functionality; do not use!
*
* In order to perform streaming compression, these functions depended on data
* that is no longer tracked in the state. They have been preserved as well as
* possible: using them will still produce a correct output. However, use of
* LZ4_slideInputBufferHC() will truncate the history of the stream, rather
* than preserve a window-sized chunk of history.
*/
LZ4_DEPRECATED("use LZ4_createStreamHC() instead") LZ4LIB_API void* LZ4_createHC (const char* inputBuffer);
LZ4_DEPRECATED("use LZ4_saveDictHC() instead") LZ4LIB_API char* LZ4_slideInputBufferHC (void* LZ4HC_Data);
LZ4_DEPRECATED("use LZ4_freeStreamHC() instead") LZ4LIB_API int LZ4_freeHC (void* LZ4HC_Data);
LZ4_DEPRECATED("use LZ4_compress_HC_continue() instead") LZ4LIB_API int LZ4_compressHC2_continue (void* LZ4HC_Data, const char* source, char* dest, int inputSize, int compressionLevel);
LZ4_DEPRECATED("use LZ4_compress_HC_continue() instead") LZ4LIB_API int LZ4_compressHC2_limitedOutput_continue (void* LZ4HC_Data, const char* source, char* dest, int inputSize, int maxOutputSize, int compressionLevel);
LZ4_DEPRECATED("use LZ4_createStreamHC() instead") LZ4LIB_API int LZ4_sizeofStreamStateHC(void);
LZ4_DEPRECATED("use LZ4_initStreamHC() instead") LZ4LIB_API int LZ4_resetStreamStateHC(void* state, char* inputBuffer);
/* LZ4_resetStreamHC() is now replaced by LZ4_initStreamHC().
* The intention is to emphasize the difference with LZ4_resetStreamHC_fast(),
* which is now the recommended function to start a new stream of blocks,
* but cannot be used to initialize a memory segment containing arbitrary garbage data.
*
* It is recommended to switch to LZ4_initStreamHC().
* LZ4_resetStreamHC() will generate deprecation warnings in a future version.
*/
LZ4LIB_API void LZ4_resetStreamHC (LZ4_streamHC_t* streamHCPtr, int compressionLevel);
#if defined (__cplusplus)
}
#endif
#endif /* LZ4_HC_H_19834876238432 */
/*-**************************************************
* !!!!! STATIC LINKING ONLY !!!!!
* Following definitions are considered experimental.
* They should not be linked from DLL,
* as there is no guarantee of API stability yet.
* Prototypes will be promoted to "stable" status
* after successful usage in real-life scenarios.
***************************************************/
#ifdef LZ4_HC_STATIC_LINKING_ONLY /* protection macro */
#ifndef LZ4_HC_SLO_098092834
#define LZ4_HC_SLO_098092834
#define LZ4_STATIC_LINKING_ONLY /* LZ4LIB_STATIC_API */
#include "lz4.h"
#if defined (__cplusplus)
extern "C" {
#endif
/*! LZ4_setCompressionLevel() : v1.8.0+ (experimental)
* It's possible to change compression level
* between successive invocations of LZ4_compress_HC_continue*()
* for dynamic adaptation.
*/
LZ4LIB_STATIC_API void LZ4_setCompressionLevel(
LZ4_streamHC_t* LZ4_streamHCPtr, int compressionLevel);
/*! LZ4_favorDecompressionSpeed() : v1.8.2+ (experimental)
* Opt. Parser will favor decompression speed over compression ratio.
* Only applicable to levels >= LZ4HC_CLEVEL_OPT_MIN.
*/
LZ4LIB_STATIC_API void LZ4_favorDecompressionSpeed(
LZ4_streamHC_t* LZ4_streamHCPtr, int favor);
/*! LZ4_resetStreamHC_fast() : v1.9.0+
* When an LZ4_streamHC_t is known to be in an internally coherent state,
* it can often be prepared for a new compression with almost no work, only
* sometimes falling back to the full, expensive reset that is always required
* when the stream is in an indeterminate state (i.e., the reset performed by
* LZ4_resetStreamHC()).
*
* LZ4_streamHCs are guaranteed to be in a valid state when:
* - returned from LZ4_createStreamHC()
* - reset by LZ4_resetStreamHC()
* - memset(stream, 0, sizeof(LZ4_streamHC_t))
* - the stream was in a valid state and was reset by LZ4_resetStreamHC_fast()
* - the stream was in a valid state and was then used in any compression call
* that returned success
* - the stream was in an indeterminate state and was used in a compression
* call that fully reset the state (LZ4_compress_HC_extStateHC()) and that
* returned success
*
* Note:
* A stream that was last used in a compression call that returned an error
* may be passed to this function. However, it will be fully reset, which will
* clear any existing history and settings from the context.
*/
LZ4LIB_STATIC_API void LZ4_resetStreamHC_fast(
LZ4_streamHC_t* LZ4_streamHCPtr, int compressionLevel);
/*! LZ4_compress_HC_extStateHC_fastReset() :
* A variant of LZ4_compress_HC_extStateHC().
*
* Using this variant avoids an expensive initialization step. It is only safe
* to call if the state buffer is known to be correctly initialized already
* (see above comment on LZ4_resetStreamHC_fast() for a definition of
* "correctly initialized"). From a high level, the difference is that this
* function initializes the provided state with a call to
* LZ4_resetStreamHC_fast() while LZ4_compress_HC_extStateHC() starts with a
* call to LZ4_resetStreamHC().
*/
LZ4LIB_STATIC_API int LZ4_compress_HC_extStateHC_fastReset (
void* state,
const char* src, char* dst,
int srcSize, int dstCapacity,
int compressionLevel);
/*! LZ4_attach_HC_dictionary() :
* This is an experimental API that allows for the efficient use of a
* static dictionary many times.
*
* Rather than re-loading the dictionary buffer into a working context before
* each compression, or copying a pre-loaded dictionary's LZ4_streamHC_t into a
* working LZ4_streamHC_t, this function introduces a no-copy setup mechanism,
* in which the working stream references the dictionary stream in-place.
*
* Several assumptions are made about the state of the dictionary stream.
* Currently, only streams which have been prepared by LZ4_loadDictHC() should
* be expected to work.
*
* Alternatively, the provided dictionary stream pointer may be NULL, in which
* case any existing dictionary stream is unset.
*
* A dictionary should only be attached to a stream without any history (i.e.,
* a stream that has just been reset).
*
* The dictionary will remain attached to the working stream only for the
* current stream session. Calls to LZ4_resetStreamHC(_fast) will remove the
* dictionary context association from the working stream. The dictionary
* stream (and source buffer) must remain in-place / accessible / unchanged
* through the lifetime of the stream session.
*/
LZ4LIB_STATIC_API void LZ4_attach_HC_dictionary(
LZ4_streamHC_t *working_stream,
const LZ4_streamHC_t *dictionary_stream);
#if defined (__cplusplus)
}
#endif
#endif /* LZ4_HC_SLO_098092834 */
#endif /* LZ4_HC_STATIC_LINKING_ONLY */
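For reference, a hedged sketch (not part of this commit) of how lz4-java exposes the HC levels that the removed header defines as LZ4HC_CLEVEL_*; the new Lz4Compressor uses the default level via highCompressor():

    import net.jpountz.lz4.LZ4Compressor;
    import net.jpountz.lz4.LZ4Factory;

    public class Lz4HcLevels {
      public static void main(String[] args) {
        LZ4Factory factory = LZ4Factory.fastestInstance();
        LZ4Compressor hcDefault = factory.highCompressor();   // default HC level
        LZ4Compressor hcMax = factory.highCompressor(12);     // LZ4HC_CLEVEL_MAX
        byte[] compressed = hcMax.compress("example input".getBytes());
        System.out.println(compressed.length);
      }
    }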

View File: hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/CompressDecompressTester.java

@@ -473,8 +473,7 @@ public class CompressDecompressTester<T extends Compressor, E extends Decompressor>
private static <T extends Compressor, E extends Decompressor> boolean isAvailable(TesterPair<T, E> pair) {
Compressor compressor = pair.compressor;
if (compressor.getClass().isAssignableFrom(Lz4Compressor.class)
&& (NativeCodeLoader.isNativeCodeLoaded()))
if (compressor.getClass().isAssignableFrom(Lz4Compressor.class))
return true;
else if (compressor.getClass().isAssignableFrom(BuiltInZlibDeflater.class)

View File: hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCodec.java

@@ -140,22 +140,16 @@ public class TestCodec {
@Test
public void testLz4Codec() throws IOException {
if (NativeCodeLoader.isNativeCodeLoaded()) {
if (Lz4Codec.isNativeCodeLoaded()) {
conf.setBoolean(
CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZ4_USELZ4HC_KEY,
false);
codecTest(conf, seed, 0, "org.apache.hadoop.io.compress.Lz4Codec");
codecTest(conf, seed, count, "org.apache.hadoop.io.compress.Lz4Codec");
conf.setBoolean(
CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZ4_USELZ4HC_KEY,
true);
codecTest(conf, seed, 0, "org.apache.hadoop.io.compress.Lz4Codec");
codecTest(conf, seed, count, "org.apache.hadoop.io.compress.Lz4Codec");
} else {
Assert.fail("Native hadoop library available but lz4 not");
}
}
conf.setBoolean(
CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZ4_USELZ4HC_KEY,
false);
codecTest(conf, seed, 0, "org.apache.hadoop.io.compress.Lz4Codec");
codecTest(conf, seed, count, "org.apache.hadoop.io.compress.Lz4Codec");
conf.setBoolean(
CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZ4_USELZ4HC_KEY,
true);
codecTest(conf, seed, 0, "org.apache.hadoop.io.compress.Lz4Codec");
codecTest(conf, seed, count, "org.apache.hadoop.io.compress.Lz4Codec");
}
@Test

View File: hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/lz4/TestLz4CompressorDecompressor.java

@@ -27,17 +27,20 @@ import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Random;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.compress.BlockCompressorStream;
import org.apache.hadoop.io.compress.BlockDecompressorStream;
import org.apache.hadoop.io.compress.CompressionInputStream;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.io.compress.Lz4Codec;
import org.apache.hadoop.io.compress.lz4.Lz4Compressor;
import org.apache.hadoop.io.compress.lz4.Lz4Decompressor;
import org.apache.hadoop.test.MultithreadedTestUtil;
import org.junit.Before;
import org.junit.Test;
import static org.junit.Assume.*;
@@ -45,12 +48,7 @@ public class TestLz4CompressorDecompressor {
private static final Random rnd = new Random(12345l);
@Before
public void before() {
assumeTrue(Lz4Codec.isNativeCodeLoaded());
}
//test on NullPointerException in {@code compressor.setInput()}
@Test
public void testCompressorSetInputNullPointerException() {
try {
@@ -330,4 +328,36 @@
ctx.waitFor(60000);
}
@Test
public void testLz4Compatibility() throws Exception {
// The sequence file was created using native Lz4 codec before HADOOP-17292.
// After we use lz4-java for lz4 compression, this test makes sure we can
// decompress the sequence file correctly.
Path filePath = new Path(TestLz4CompressorDecompressor.class
.getResource("/lz4/sequencefile").toURI());
Configuration conf = new Configuration();
conf.setInt("io.seqfile.compress.blocksize", 1000);
FileSystem fs = FileSystem.get(conf);
int lines = 2000;
SequenceFile.Reader reader = new SequenceFile.Reader(fs, filePath, conf);
Writable key = (Writable)reader.getKeyClass().newInstance();
Writable value = (Writable)reader.getValueClass().newInstance();
int lc = 0;
try {
while (reader.next(key, value)) {
assertEquals("key" + lc, key.toString());
assertEquals("value" + lc, value.toString());
lc++;
}
} finally {
reader.close();
}
assertEquals(lines, lc);
}
}
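A hedged sketch of how a fixture like this could be regenerated (the checked-in file predates this commit and was written by the native codec; the Text key/value classes and writer parameters are assumptions consistent with the assertions above):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.SequenceFile;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.compress.CompressionCodec;
    import org.apache.hadoop.io.compress.Lz4Codec;
    import org.apache.hadoop.util.ReflectionUtils;

    public class WriteLz4SequenceFile {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.setInt("io.seqfile.compress.blocksize", 1000);
        FileSystem fs = FileSystem.getLocal(conf);
        Path path = new Path("/tmp/lz4-sequencefile");  // hypothetical output path
        // ReflectionUtils injects conf into the Configurable codec.
        CompressionCodec codec = ReflectionUtils.newInstance(Lz4Codec.class, conf);
        SequenceFile.Writer writer = SequenceFile.createWriter(fs, conf, path,
            Text.class, Text.class, SequenceFile.CompressionType.BLOCK, codec);
        try {
          for (int i = 0; i < 2000; i++) {              // 2000 records, as asserted
            writer.append(new Text("key" + i), new Text("value" + i));
          }
        } finally {
          writer.close();
        }
      }
    }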

View File: hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestNativeCodeLoader.java

@@ -21,7 +21,6 @@ import org.junit.Test;
import static org.junit.Assert.*;
import org.apache.hadoop.crypto.OpensslCipher;
import org.apache.hadoop.io.compress.Lz4Codec;
import org.apache.hadoop.io.compress.zlib.ZlibFactory;
import org.apache.hadoop.util.NativeCodeLoader;
import org.slf4j.Logger;
@@ -54,7 +53,6 @@ public class TestNativeCodeLoader {
if (NativeCodeLoader.buildSupportsOpenssl()) {
assertFalse(OpensslCipher.getLibraryName().isEmpty());
}
assertFalse(Lz4Codec.getLibraryName().isEmpty());
LOG.info("TestNativeCodeLoader: libhadoop.so is loaded.");
}
}

View File: hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/pom.xml

@@ -71,6 +71,11 @@
<artifactId>assertj-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.lz4</groupId>
<artifactId>lz4-java</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
@@ -156,9 +161,9 @@
</goals>
<configuration>
<target>
<copy file="${basedir}/../../../hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/lz4/lz4.h"
<copy file="${basedir}/src/main/native/lz4/lz4.h"
todir="${project.build.directory}/native/" />
<copy file="${basedir}/../../../hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/lz4/lz4.c"
<copy file="${basedir}/src/main/native/lz4/lz4.c"
todir="${project.build.directory}/native/" />
<copy todir="${project.build.directory}/native/test/testData"
overwrite="true">

View File: hadoop-project/pom.xml

@@ -144,6 +144,7 @@
<netty3.version>3.10.6.Final</netty3.version>
<netty4.version>4.1.50.Final</netty4.version>
<snappy-java.version>1.1.8.1</snappy-java.version>
<lz4-java.version>1.7.1</lz4-java.version>
<!-- Maven protoc compiler -->
<protobuf-maven-plugin.version>0.5.1</protobuf-maven-plugin.version>
@@ -1792,6 +1793,11 @@
<artifactId>snappy-java</artifactId>
<version>${snappy-java.version}</version>
</dependency>
<dependency>
<groupId>org.lz4</groupId>
<artifactId>lz4-java</artifactId>
<version>${lz4-java.version}</version>
</dependency>
</dependencies>
</dependencyManagement>