HADOOP-13578. Add Codec for ZStandard Compression. Contributed by churro morales

This commit is contained in:
Jason Lowe 2017-01-04 14:46:44 +00:00
parent 3207762192
commit db947fb870
25 changed files with 2107 additions and 4 deletions

View File

@ -77,6 +77,8 @@ Optional packages:
$ sudo apt-get install libjansson-dev
* Linux FUSE
$ sudo apt-get install fuse libfuse-dev
* ZStandard compression
$ sudo apt-get install zstd
----------------------------------------------------------------------------------
Maven main modules:
@ -148,6 +150,28 @@ Maven build goals:
and it ignores the -Dsnappy.prefix option. If -Dsnappy.lib isn't given, the
bundling and building will fail.
ZStandard build options:
ZStandard is a compression library that can be utilized by the native code.
It is currently an optional component, meaning that Hadoop can be built with
or without this dependency.
* Use -Drequire.zstd to fail the build if libzstd.so is not found.
If this option is not specified and the zstd library is missing,
we silently build a version of libhadoop.so that cannot make use of zstd.
* Use -Dzstd.prefix to specify a nonstandard location for the libzstd
header files and library files. You do not need this option if you have
installed zstandard using a package manager.
* Use -Dzstd.lib to specify a nonstandard location for the libzstd library
files. Similarly to zstd.prefix, you do not need this option if you have
installed using a package manager.
* Use -Dbundle.zstd to copy the contents of the zstd.lib directory into
the final tar file. This option requires that -Dzstd.lib is also given,
and it ignores the -Dzstd.prefix option. If -Dzstd.lib isn't given, the
bundling and building will fail.
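For example (an illustrative invocation, not part of this patch), a native
build that insists on zstd and picks it up from a nonstandard install
location could combine these flags; the /usr/local prefix is an assumption:
$ mvn package -Pdist,native -DskipTests -Dtar -Drequire.zstd -Dzstd.prefix=/usr/local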
OpenSSL build options:
OpenSSL includes a crypto library that can be utilized by the native code.

View File

@ -514,6 +514,10 @@
<snappy.lib></snappy.lib>
<snappy.include></snappy.include>
<require.snappy>false</require.snappy>
<zstd.prefix></zstd.prefix>
<zstd.lib></zstd.lib>
<zstd.include></zstd.include>
<require.zstd>false</require.zstd>
<openssl.prefix></openssl.prefix>
<openssl.lib></openssl.lib>
<openssl.include></openssl.include>
@ -568,6 +572,8 @@
<javahClassName>org.apache.hadoop.security.JniBasedUnixGroupsNetgroupMapping</javahClassName>
<javahClassName>org.apache.hadoop.io.compress.snappy.SnappyCompressor</javahClassName>
<javahClassName>org.apache.hadoop.io.compress.snappy.SnappyDecompressor</javahClassName>
<javahClassName>org.apache.hadoop.io.compress.zstd.ZStandardCompressor</javahClassName>
<javahClassName>org.apache.hadoop.io.compress.zstd.ZStandardDecompressor</javahClassName>
<javahClassName>org.apache.hadoop.io.compress.lz4.Lz4Compressor</javahClassName>
<javahClassName>org.apache.hadoop.io.compress.lz4.Lz4Decompressor</javahClassName>
<javahClassName>org.apache.hadoop.crypto.OpensslCipher</javahClassName>
@ -599,6 +605,10 @@
<CUSTOM_SNAPPY_PREFIX>${snappy.prefix}</CUSTOM_SNAPPY_PREFIX>
<CUSTOM_SNAPPY_LIB>${snappy.lib} </CUSTOM_SNAPPY_LIB>
<CUSTOM_SNAPPY_INCLUDE>${snappy.include} </CUSTOM_SNAPPY_INCLUDE>
<REQUIRE_ZSTD>${require.zstd}</REQUIRE_ZSTD>
<CUSTOM_ZSTD_PREFIX>${zstd.prefix}</CUSTOM_ZSTD_PREFIX>
<CUSTOM_ZSTD_LIB>${zstd.lib}</CUSTOM_ZSTD_LIB>
<CUSTOM_ZSTD_INCLUDE>${zstd.include}</CUSTOM_ZSTD_INCLUDE>
<REQUIRE_OPENSSL>${require.openssl} </REQUIRE_OPENSSL>
<CUSTOM_OPENSSL_PREFIX>${openssl.prefix} </CUSTOM_OPENSSL_PREFIX>
<CUSTOM_OPENSSL_LIB>${openssl.lib} </CUSTOM_OPENSSL_LIB>
@ -636,6 +646,11 @@
<snappy.include></snappy.include>
<require.snappy>false</require.snappy>
<bundle.snappy.in.bin>true</bundle.snappy.in.bin>
<zstd.prefix></zstd.prefix>
<zstd.lib></zstd.lib>
<zstd.include></zstd.include>
<require.zstd>false</require.zstd>
<bundle.zstd.in.bin>true</bundle.zstd.in.bin>
<openssl.prefix></openssl.prefix>
<openssl.lib></openssl.lib>
<openssl.include></openssl.include>
@ -685,6 +700,8 @@
<javahClassName>org.apache.hadoop.security.JniBasedUnixGroupsNetgroupMapping</javahClassName>
<javahClassName>org.apache.hadoop.io.compress.snappy.SnappyCompressor</javahClassName>
<javahClassName>org.apache.hadoop.io.compress.snappy.SnappyDecompressor</javahClassName>
<javahClassName>org.apache.hadoop.io.compress.zstd.ZStandardCompressor</javahClassName>
<javahClassName>org.apache.hadoop.io.compress.zstd.ZStandardDecompressor</javahClassName>
<javahClassName>org.apache.hadoop.io.compress.lz4.Lz4Compressor</javahClassName>
<javahClassName>org.apache.hadoop.io.compress.lz4.Lz4Decompressor</javahClassName>
<javahClassName>org.apache.hadoop.crypto.OpensslCipher</javahClassName>
@ -736,6 +753,10 @@
<argument>/p:CustomSnappyLib=${snappy.lib}</argument>
<argument>/p:CustomSnappyInclude=${snappy.include}</argument>
<argument>/p:RequireSnappy=${require.snappy}</argument>
<argument>/p:CustomZstdPrefix=${zstd.prefix}</argument>
<argument>/p:CustomZstdLib=${zstd.lib}</argument>
<argument>/p:CustomZstdInclude=${zstd.include}</argument>
<argument>/p:RequireZstd=${require.zstd}</argument>
<argument>/p:CustomOpensslPrefix=${openssl.prefix}</argument>
<argument>/p:CustomOpensslLib=${openssl.lib}</argument>
<argument>/p:CustomOpensslInclude=${openssl.include}</argument>

View File

@ -94,6 +94,31 @@ else()
endif()
endif()
# Require Zstandard
SET(STORED_CMAKE_FIND_LIBRARY_SUFFIXES ${CMAKE_FIND_LIBRARY_SUFFIXES})
hadoop_set_find_shared_library_version("1")
find_library(ZSTD_LIBRARY
NAMES zstd
PATHS ${CUSTOM_ZSTD_PREFIX} ${CUSTOM_ZSTD_PREFIX}/lib
${CUSTOM_ZSTD_PREFIX}/lib64 ${CUSTOM_ZSTD_LIB})
SET(CMAKE_FIND_LIBRARY_SUFFIXES ${STORED_CMAKE_FIND_LIBRARY_SUFFIXES})
find_path(ZSTD_INCLUDE_DIR
NAMES zstd.h
PATHS ${CUSTOM_ZSTD_PREFIX} ${CUSTOM_ZSTD_PREFIX}/include
${CUSTOM_ZSTD_INCLUDE})
if (ZSTD_LIBRARY AND ZSTD_INCLUDE_DIR)
GET_FILENAME_COMPONENT(HADOOP_ZSTD_LIBRARY ${ZSTD_LIBRARY} NAME)
set(ZSTD_SOURCE_FILES
"${SRC}/io/compress/zstd/ZStandardCompressor.c"
"${SRC}/io/compress/zstd/ZStandardDecompressor.c")
else (ZSTD_LIBRARY AND ZSTD_INCLUDE_DIR)
set(ZSTD_INCLUDE_DIR "")
set(ZSTD_SOURCE_FILES "")
IF(REQUIRE_ZSTD)
MESSAGE(FATAL_ERROR "Required zstandard library could not be found. ZSTD_LIBRARY=${ZSTD_LIBRARY}, ZSTD_INCLUDE_DIR=${ZSTD_INCLUDE_DIR}, CUSTOM_ZSTD_PREFIX=${CUSTOM_ZSTD_PREFIX}, CUSTOM_ZSTD_INCLUDE=${CUSTOM_ZSTD_INCLUDE}")
ENDIF(REQUIRE_ZSTD)
endif (ZSTD_LIBRARY AND ZSTD_INCLUDE_DIR)
# Build hardware CRC32 acceleration, if supported on the platform.
if(CMAKE_SYSTEM_PROCESSOR MATCHES "^i.86$" OR CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" OR CMAKE_SYSTEM_PROCESSOR STREQUAL "amd64")
set(BULK_CRC_ARCH_SOURCE_FIlE "${SRC}/util/bulk_crc32_x86.c")
@ -169,6 +194,7 @@ include_directories(
${ZLIB_INCLUDE_DIRS}
${BZIP2_INCLUDE_DIR}
${SNAPPY_INCLUDE_DIR}
${ZSTD_INCLUDE_DIR}
${OPENSSL_INCLUDE_DIR}
${SRC}/util
)
@ -182,6 +208,7 @@ hadoop_add_dual_library(hadoop
${SRC}/io/compress/lz4/lz4.c
${SRC}/io/compress/lz4/lz4hc.c
${SNAPPY_SOURCE_FILES}
${ZSTD_SOURCE_FILES}
${OPENSSL_SOURCE_FILES}
${SRC}/io/compress/zlib/ZlibCompressor.c
${SRC}/io/compress/zlib/ZlibDecompressor.c

View File

@ -21,6 +21,7 @@
#cmakedefine HADOOP_ZLIB_LIBRARY "@HADOOP_ZLIB_LIBRARY@"
#cmakedefine HADOOP_BZIP2_LIBRARY "@HADOOP_BZIP2_LIBRARY@"
#cmakedefine HADOOP_SNAPPY_LIBRARY "@HADOOP_SNAPPY_LIBRARY@"
#cmakedefine HADOOP_ZSTD_LIBRARY "@HADOOP_ZSTD_LIBRARY@"
#cmakedefine HADOOP_OPENSSL_LIBRARY "@HADOOP_OPENSSL_LIBRARY@"
#cmakedefine HAVE_SYNC_FILE_RANGE
#cmakedefine HAVE_POSIX_FADVISE

View File

@ -141,6 +141,22 @@ public class CommonConfigurationKeys extends CommonConfigurationKeysPublic {
public static final int IO_COMPRESSION_CODEC_SNAPPY_BUFFERSIZE_DEFAULT =
256 * 1024;
/** ZStandard compression level. */
public static final String IO_COMPRESSION_CODEC_ZSTD_LEVEL_KEY =
"io.compression.codec.zstd.level";
/** Default value for IO_COMPRESSION_CODEC_ZSTD_LEVEL_KEY. */
public static final int IO_COMPRESSION_CODEC_ZSTD_LEVEL_DEFAULT = 3;
/** ZStandard buffer size. */
public static final String IO_COMPRESSION_CODEC_ZSTD_BUFFER_SIZE_KEY =
"io.compression.codec.zstd.buffersize";
/** ZStandard buffer size. A value of 0 means use the buffer size
* recommended by the zstd library. */
public static final int
IO_COMPRESSION_CODEC_ZSTD_BUFFER_SIZE_DEFAULT = 0;
/** Internal buffer size for Lz4 compressor/decompressors */
public static final String IO_COMPRESSION_CODEC_LZ4_BUFFERSIZE_KEY =
"io.compression.codec.lz4.buffersize";

View File

@ -95,7 +95,7 @@ public interface Decompressor {
* @param b Buffer for the compressed data
* @param off Start offset of the data
* @param len Size of the buffer
* @return The actual number of bytes of compressed data.
* @return The actual number of bytes of uncompressed data.
* @throws IOException
*/
public int decompress(byte[] b, int off, int len) throws IOException;

View File

@ -0,0 +1,242 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.io.compress;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.io.compress.zstd.ZStandardCompressor;
import org.apache.hadoop.io.compress.zstd.ZStandardDecompressor;
import org.apache.hadoop.util.NativeCodeLoader;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import static org.apache.hadoop.fs.CommonConfigurationKeys.IO_COMPRESSION_CODEC_ZSTD_BUFFER_SIZE_DEFAULT;
import static org.apache.hadoop.fs.CommonConfigurationKeys.IO_COMPRESSION_CODEC_ZSTD_BUFFER_SIZE_KEY;
/**
* This class creates zstd compressors/decompressors.
*/
public class ZStandardCodec implements
Configurable, CompressionCodec, DirectDecompressionCodec {
private Configuration conf;
/**
* Set the configuration to be used by this object.
*
* @param conf the configuration object.
*/
@Override
public void setConf(Configuration conf) {
this.conf = conf;
}
/**
* Return the configuration used by this object.
*
* @return the configuration object used by this object.
*/
@Override
public Configuration getConf() {
return conf;
}
public static void checkNativeCodeLoaded() {
if (!NativeCodeLoader.isNativeCodeLoaded() ||
!NativeCodeLoader.buildSupportsZstd()) {
throw new RuntimeException("native zStandard library "
+ "not available: this version of libhadoop was built "
+ "without zstd support.");
}
if (!ZStandardCompressor.isNativeCodeLoaded()) {
throw new RuntimeException("native zStandard library not "
+ "available: ZStandardCompressor has not been loaded.");
}
if (!ZStandardDecompressor.isNativeCodeLoaded()) {
throw new RuntimeException("native zStandard library not "
+ "available: ZStandardDecompressor has not been loaded.");
}
}
public static boolean isNativeCodeLoaded() {
return ZStandardCompressor.isNativeCodeLoaded()
&& ZStandardDecompressor.isNativeCodeLoaded();
}
public static String getLibraryName() {
return ZStandardCompressor.getLibraryName();
}
public static int getCompressionLevel(Configuration conf) {
return conf.getInt(
CommonConfigurationKeys.IO_COMPRESSION_CODEC_ZSTD_LEVEL_KEY,
CommonConfigurationKeys.IO_COMPRESSION_CODEC_ZSTD_LEVEL_DEFAULT);
}
public static int getCompressionBufferSize(Configuration conf) {
int bufferSize = getBufferSize(conf);
return bufferSize == 0 ?
ZStandardCompressor.getRecommendedBufferSize() :
bufferSize;
}
public static int getDecompressionBufferSize(Configuration conf) {
int bufferSize = getBufferSize(conf);
return bufferSize == 0 ?
ZStandardDecompressor.getRecommendedBufferSize() :
bufferSize;
}
private static int getBufferSize(Configuration conf) {
return conf.getInt(IO_COMPRESSION_CODEC_ZSTD_BUFFER_SIZE_KEY,
IO_COMPRESSION_CODEC_ZSTD_BUFFER_SIZE_DEFAULT);
}
/**
* Create a {@link CompressionOutputStream} that will write to the given
* {@link OutputStream}.
*
* @param out the location for the final output stream
* @return a stream the user can write uncompressed data to have compressed
* @throws IOException
*/
@Override
public CompressionOutputStream createOutputStream(OutputStream out)
throws IOException {
return Util.
createOutputStreamWithCodecPool(this, conf, out);
}
/**
* Create a {@link CompressionOutputStream} that will write to the given
* {@link OutputStream} with the given {@link Compressor}.
*
* @param out the location for the final output stream
* @param compressor compressor to use
* @return a stream the user can write uncompressed data to have compressed
* @throws IOException
*/
@Override
public CompressionOutputStream createOutputStream(OutputStream out,
Compressor compressor)
throws IOException {
checkNativeCodeLoaded();
return new CompressorStream(out, compressor,
getCompressionBufferSize(conf));
}
/**
* Get the type of {@link Compressor} needed by this {@link CompressionCodec}.
*
* @return the type of compressor needed by this codec.
*/
@Override
public Class<? extends Compressor> getCompressorType() {
checkNativeCodeLoaded();
return ZStandardCompressor.class;
}
/**
* Create a new {@link Compressor} for use by this {@link CompressionCodec}.
*
* @return a new compressor for use by this codec
*/
@Override
public Compressor createCompressor() {
checkNativeCodeLoaded();
return new ZStandardCompressor(
getCompressionLevel(conf), getCompressionBufferSize(conf));
}
/**
* Create a {@link CompressionInputStream} that will read from the given
* input stream.
*
* @param in the stream to read compressed bytes from
* @return a stream to read uncompressed bytes from
* @throws IOException
*/
@Override
public CompressionInputStream createInputStream(InputStream in)
throws IOException {
return Util.
createInputStreamWithCodecPool(this, conf, in);
}
/**
* Create a {@link CompressionInputStream} that will read from the given
* {@link InputStream} with the given {@link Decompressor}.
*
* @param in the stream to read compressed bytes from
* @param decompressor decompressor to use
* @return a stream to read uncompressed bytes from
* @throws IOException
*/
@Override
public CompressionInputStream createInputStream(InputStream in,
Decompressor decompressor)
throws IOException {
checkNativeCodeLoaded();
return new DecompressorStream(in, decompressor,
getDecompressionBufferSize(conf));
}
/**
* Get the type of {@link Decompressor} needed by
* this {@link CompressionCodec}.
*
* @return the type of decompressor needed by this codec.
*/
@Override
public Class<? extends Decompressor> getDecompressorType() {
checkNativeCodeLoaded();
return ZStandardDecompressor.class;
}
/**
* Create a new {@link Decompressor} for use by this {@link CompressionCodec}.
*
* @return a new decompressor for use by this codec
*/
@Override
public Decompressor createDecompressor() {
checkNativeCodeLoaded();
return new ZStandardDecompressor(getDecompressionBufferSize(conf));
}
/**
* Get the default filename extension for this kind of compression.
*
* @return <code>.zst</code>.
*/
@Override
public String getDefaultExtension() {
return ".zst";
}
@Override
public DirectDecompressor createDirectDecompressor() {
return new ZStandardDecompressor.ZStandardDirectDecompressor(
getDecompressionBufferSize(conf)
);
}
}
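For illustration only (not part of the patch), a round trip through the codec
might look like the following once libhadoop and libzstd are loadable; the
file name demo.zst is hypothetical:

import java.io.FileInputStream;
import java.io.FileOutputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.CompressionInputStream;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.io.compress.ZStandardCodec;

public class ZstdRoundTrip {
  public static void main(String[] args) throws Exception {
    ZStandardCodec codec = new ZStandardCodec();
    codec.setConf(new Configuration());
    byte[] payload = "hello zstd".getBytes("UTF-8");
    // Write a .zst file through the codec's pooled compressor
    try (CompressionOutputStream out =
             codec.createOutputStream(new FileOutputStream("demo.zst"))) {
      out.write(payload);
      out.finish();
    }
    // Read it back through the pooled decompressor
    byte[] buf = new byte[8192];
    try (CompressionInputStream in =
             codec.createInputStream(new FileInputStream("demo.zst"))) {
      int n = in.read(buf);
      System.out.println(new String(buf, 0, n, "UTF-8"));
    }
  }
}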

View File

@ -0,0 +1,305 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.io.compress.zstd;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.io.compress.Compressor;
import org.apache.hadoop.io.compress.ZStandardCodec;
import org.apache.hadoop.util.NativeCodeLoader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.ByteBuffer;
/**
* A {@link Compressor} based on the zStandard compression algorithm.
* https://github.com/facebook/zstd
*/
public class ZStandardCompressor implements Compressor {
private static final Logger LOG =
LoggerFactory.getLogger(ZStandardCompressor.class);
private long stream;
private int level;
private int directBufferSize;
private byte[] userBuf = null;
private int userBufOff = 0, userBufLen = 0;
private ByteBuffer uncompressedDirectBuf = null;
private int uncompressedDirectBufOff = 0, uncompressedDirectBufLen = 0;
private boolean keepUncompressedBuf = false;
private ByteBuffer compressedDirectBuf = null;
private boolean finish, finished;
private long bytesRead = 0;
private long bytesWritten = 0;
private static boolean nativeZStandardLoaded = false;
static {
if (NativeCodeLoader.isNativeCodeLoaded()) {
try {
// Initialize the native library
initIDs();
nativeZStandardLoaded = true;
} catch (Throwable t) {
LOG.warn("Error loading zstandard native libraries: " + t);
}
}
}
public static boolean isNativeCodeLoaded() {
return nativeZStandardLoaded;
}
public static int getRecommendedBufferSize() {
return getStreamSize();
}
@VisibleForTesting
ZStandardCompressor() {
this(CommonConfigurationKeys.IO_COMPRESSION_CODEC_ZSTD_LEVEL_DEFAULT,
CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT);
}
/**
* Creates a new compressor with the given compression level and buffer size.
* Compressed data will be generated in ZStandard format.
*/
public ZStandardCompressor(int level, int bufferSize) {
this(level, bufferSize, bufferSize);
}
@VisibleForTesting
ZStandardCompressor(int level, int inputBufferSize, int outputBufferSize) {
this.level = level;
stream = create();
this.directBufferSize = outputBufferSize;
uncompressedDirectBuf = ByteBuffer.allocateDirect(inputBufferSize);
compressedDirectBuf = ByteBuffer.allocateDirect(outputBufferSize);
compressedDirectBuf.position(outputBufferSize);
reset();
}
/**
* Prepare the compressor to be used in a new stream with settings defined in
* the given Configuration. It will reset the compressor's compression level
* and compression strategy.
*
* @param conf Configuration storing new settings
*/
@Override
public void reinit(Configuration conf) {
if (conf == null) {
return;
}
level = ZStandardCodec.getCompressionLevel(conf);
reset();
LOG.debug("Reinit compressor with new compression configuration");
}
@Override
public void setInput(byte[] b, int off, int len) {
if (b == null) {
throw new NullPointerException();
}
if (off < 0 || len < 0 || off > b.length - len) {
throw new ArrayIndexOutOfBoundsException();
}
this.userBuf = b;
this.userBufOff = off;
this.userBufLen = len;
uncompressedDirectBufOff = 0;
setInputFromSavedData();
compressedDirectBuf.limit(directBufferSize);
compressedDirectBuf.position(directBufferSize);
}
// Copy enough data from userBuf to uncompressedDirectBuf
private void setInputFromSavedData() {
int len = Math.min(userBufLen, uncompressedDirectBuf.remaining());
uncompressedDirectBuf.put(userBuf, userBufOff, len);
userBufLen -= len;
userBufOff += len;
uncompressedDirectBufLen = uncompressedDirectBuf.position();
}
@Override
public void setDictionary(byte[] b, int off, int len) {
throw new UnsupportedOperationException(
"Dictionary support is not enabled");
}
@Override
public boolean needsInput() {
// Consume remaining compressed data?
if (compressedDirectBuf.remaining() > 0) {
return false;
}
// have we consumed all input
if (keepUncompressedBuf && uncompressedDirectBufLen > 0) {
return false;
}
if (uncompressedDirectBuf.remaining() > 0) {
// Check if we have consumed all user-input
if (userBufLen <= 0) {
return true;
} else {
// copy enough data from userBuf to uncompressedDirectBuf
setInputFromSavedData();
// uncompressedDirectBuf is not full
return uncompressedDirectBuf.remaining() > 0;
}
}
return false;
}
@Override
public void finish() {
finish = true;
}
@Override
public boolean finished() {
// Check if 'zstd' says it's 'finished' and all compressed
// data has been consumed
return (finished && compressedDirectBuf.remaining() == 0);
}
@Override
public int compress(byte[] b, int off, int len) throws IOException {
checkStream();
if (b == null) {
throw new NullPointerException();
}
if (off < 0 || len < 0 || off > b.length - len) {
throw new ArrayIndexOutOfBoundsException();
}
// Check if there is compressed data
int n = compressedDirectBuf.remaining();
if (n > 0) {
n = Math.min(n, len);
compressedDirectBuf.get(b, off, n);
return n;
}
// Re-initialize the output direct buffer
compressedDirectBuf.rewind();
compressedDirectBuf.limit(directBufferSize);
// Compress data
n = deflateBytesDirect(
uncompressedDirectBuf,
uncompressedDirectBufOff,
uncompressedDirectBufLen,
compressedDirectBuf,
directBufferSize
);
compressedDirectBuf.limit(n);
// Check if we have consumed all input buffer
if (uncompressedDirectBufLen <= 0) {
// consumed all input buffer
keepUncompressedBuf = false;
uncompressedDirectBuf.clear();
uncompressedDirectBufOff = 0;
uncompressedDirectBufLen = 0;
} else {
// did not consume all input buffer
keepUncompressedBuf = true;
}
// Get at most 'len' bytes
n = Math.min(n, len);
compressedDirectBuf.get(b, off, n);
return n;
}
/**
* Returns the total number of compressed bytes output so far.
*
* @return the total (non-negative) number of compressed bytes output so far
*/
@Override
public long getBytesWritten() {
checkStream();
return bytesWritten;
}
/**
* <p>Returns the total number of uncompressed bytes input so far.</p>
*
* @return the total (non-negative) number of uncompressed bytes input so far
*/
@Override
public long getBytesRead() {
checkStream();
return bytesRead;
}
@Override
public void reset() {
checkStream();
init(level, stream);
finish = false;
finished = false;
bytesRead = 0;
bytesWritten = 0;
uncompressedDirectBuf.rewind();
uncompressedDirectBufOff = 0;
uncompressedDirectBufLen = 0;
keepUncompressedBuf = false;
compressedDirectBuf.limit(directBufferSize);
compressedDirectBuf.position(directBufferSize);
userBufOff = 0;
userBufLen = 0;
}
@Override
public void end() {
if (stream != 0) {
end(stream);
stream = 0;
}
}
private void checkStream() {
if (stream == 0) {
throw new NullPointerException();
}
}
private native static long create();
private native static void init(int level, long stream);
private native int deflateBytesDirect(ByteBuffer src, int srcOffset,
int srcLen, ByteBuffer dst, int dstLen);
private static native int getStreamSize();
private native static void end(long strm);
private native static void initIDs();
public native static String getLibraryName();
}
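A sketch of driving the raw compressor directly, mirroring the
setInput/needsInput/compress cycle that CompressorStream performs (the level
and buffer sizes are illustrative):

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import org.apache.hadoop.io.compress.zstd.ZStandardCompressor;

public class ZstdCompressLoop {
  public static byte[] compress(byte[] data) throws IOException {
    ZStandardCompressor compressor = new ZStandardCompressor(3, 64 * 1024);
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    byte[] buf = new byte[64 * 1024];
    compressor.setInput(data, 0, data.length);
    // Drain until the compressor has copied all user input internally
    while (!compressor.needsInput()) {
      int n = compressor.compress(buf, 0, buf.length);
      out.write(buf, 0, n);
    }
    compressor.finish();
    // Flush the remaining data and the frame epilogue
    while (!compressor.finished()) {
      int n = compressor.compress(buf, 0, buf.length);
      out.write(buf, 0, n);
    }
    compressor.end();
    return out.toByteArray();
  }
}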

View File

@ -0,0 +1,323 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.io.compress.zstd;
import org.apache.hadoop.io.compress.Decompressor;
import org.apache.hadoop.io.compress.DirectDecompressor;
import org.apache.hadoop.util.NativeCodeLoader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.ByteBuffer;
/**
* A {@link Decompressor} based on the zStandard compression algorithm.
* https://github.com/facebook/zstd
*/
public class ZStandardDecompressor implements Decompressor {
private static final Logger LOG =
LoggerFactory.getLogger(ZStandardDecompressor.class);
private long stream;
private int directBufferSize;
private ByteBuffer compressedDirectBuf = null;
private int compressedDirectBufOff, bytesInCompressedBuffer;
private ByteBuffer uncompressedDirectBuf = null;
private byte[] userBuf = null;
private int userBufOff = 0, userBufferBytesToConsume = 0;
private boolean finished;
private int remaining = 0;
private static boolean nativeZStandardLoaded = false;
static {
if (NativeCodeLoader.isNativeCodeLoaded()) {
try {
// Initialize the native library
initIDs();
nativeZStandardLoaded = true;
} catch (Throwable t) {
LOG.warn("Error loading zstandard native libraries: " + t);
}
}
}
public static boolean isNativeCodeLoaded() {
return nativeZStandardLoaded;
}
public static int getRecommendedBufferSize() {
return getStreamSize();
}
public ZStandardDecompressor() {
this(getStreamSize());
}
/**
* Creates a new decompressor.
*/
public ZStandardDecompressor(int bufferSize) {
this.directBufferSize = bufferSize;
compressedDirectBuf = ByteBuffer.allocateDirect(directBufferSize);
uncompressedDirectBuf = ByteBuffer.allocateDirect(directBufferSize);
uncompressedDirectBuf.position(directBufferSize);
stream = create();
reset();
}
@Override
public void setInput(byte[] b, int off, int len) {
if (b == null) {
throw new NullPointerException();
}
if (off < 0 || len < 0 || off > b.length - len) {
throw new ArrayIndexOutOfBoundsException();
}
this.userBuf = b;
this.userBufOff = off;
this.userBufferBytesToConsume = len;
setInputFromSavedData();
uncompressedDirectBuf.limit(directBufferSize);
uncompressedDirectBuf.position(directBufferSize);
}
private void setInputFromSavedData() {
compressedDirectBufOff = 0;
bytesInCompressedBuffer = userBufferBytesToConsume;
if (bytesInCompressedBuffer > directBufferSize) {
bytesInCompressedBuffer = directBufferSize;
}
compressedDirectBuf.rewind();
compressedDirectBuf.put(
userBuf, userBufOff, bytesInCompressedBuffer);
userBufOff += bytesInCompressedBuffer;
userBufferBytesToConsume -= bytesInCompressedBuffer;
}
// dictionary is not supported
@Override
public void setDictionary(byte[] b, int off, int len) {
throw new UnsupportedOperationException(
"Dictionary support is not enabled");
}
@Override
public boolean needsInput() {
// Consume remaining compressed data?
if (uncompressedDirectBuf.remaining() > 0) {
return false;
}
// Check if we have consumed all input
if (bytesInCompressedBuffer - compressedDirectBufOff <= 0) {
// Check if we have consumed all user-input
if (userBufferBytesToConsume <= 0) {
return true;
} else {
setInputFromSavedData();
}
}
return false;
}
// dictionary is not supported.
@Override
public boolean needsDictionary() {
return false;
}
@Override
public boolean finished() {
// finished == true if ZSTD_decompressStream() returns 0
// also check we have nothing left in our buffer
return (finished && uncompressedDirectBuf.remaining() == 0);
}
@Override
public int decompress(byte[] b, int off, int len)
throws IOException {
checkStream();
if (b == null) {
throw new NullPointerException();
}
if (off < 0 || len < 0 || off > b.length - len) {
throw new ArrayIndexOutOfBoundsException();
}
// Check if there is uncompressed data
int n = uncompressedDirectBuf.remaining();
if (n > 0) {
return populateUncompressedBuffer(b, off, len, n);
}
// Re-initialize the output direct buffer
uncompressedDirectBuf.rewind();
uncompressedDirectBuf.limit(directBufferSize);
// Decompress data
n = inflateBytesDirect(
compressedDirectBuf,
compressedDirectBufOff,
bytesInCompressedBuffer,
uncompressedDirectBuf,
0,
directBufferSize
);
uncompressedDirectBuf.limit(n);
// Get at most 'len' bytes
return populateUncompressedBuffer(b, off, len, n);
}
/**
* <p>Returns the number of bytes remaining in the input buffers;
* normally called when finished() is true to determine amount of post-stream
* data.</p>
*
* @return the total (non-negative) number of unprocessed bytes in input
*/
@Override
public int getRemaining() {
checkStream();
// userBuf + compressedDirectBuf
return userBufferBytesToConsume + remaining;
}
/**
* Resets everything including the input buffers (user and direct).
*/
@Override
public void reset() {
checkStream();
init(stream);
remaining = 0;
finished = false;
compressedDirectBufOff = 0;
bytesInCompressedBuffer = 0;
uncompressedDirectBuf.limit(directBufferSize);
uncompressedDirectBuf.position(directBufferSize);
userBufOff = 0;
userBufferBytesToConsume = 0;
}
@Override
public void end() {
if (stream != 0) {
free(stream);
stream = 0;
}
}
@Override
protected void finalize() {
// Release the native stream if the caller never invoked end()
end();
}
private void checkStream() {
if (stream == 0) {
throw new NullPointerException("Stream not initialized");
}
}
private int populateUncompressedBuffer(byte[] b, int off, int len, int n) {
n = Math.min(n, len);
uncompressedDirectBuf.get(b, off, n);
return n;
}
private native static void initIDs();
private native static long create();
private native static void init(long stream);
private native int inflateBytesDirect(ByteBuffer src, int srcOffset,
int srcLen, ByteBuffer dst, int dstOffset, int dstLen);
private native static void free(long strm);
private native static int getStreamSize();
int inflateDirect(ByteBuffer src, ByteBuffer dst) throws IOException {
assert
(this instanceof ZStandardDecompressor.ZStandardDirectDecompressor);
int originalPosition = dst.position();
int n = inflateBytesDirect(
src, src.position(), src.remaining(), dst, dst.position(),
dst.remaining()
);
dst.position(originalPosition + n);
if (bytesInCompressedBuffer > 0) {
src.position(compressedDirectBufOff);
} else {
src.position(src.limit());
}
return n;
}
/**
* A {@link DirectDecompressor} for ZStandard
* https://github.com/facebook/zstd.
*/
public static class ZStandardDirectDecompressor
extends ZStandardDecompressor implements DirectDecompressor {
public ZStandardDirectDecompressor(int directBufferSize) {
super(directBufferSize);
}
@Override
public boolean finished() {
return (endOfInput && super.finished());
}
@Override
public void reset() {
super.reset();
endOfInput = true;
}
private boolean endOfInput;
@Override
public void decompress(ByteBuffer src, ByteBuffer dst)
throws IOException {
assert dst.isDirect() : "dst.isDirect()";
assert src.isDirect() : "src.isDirect()";
assert dst.remaining() > 0 : "dst.remaining() > 0";
this.inflateDirect(src, dst);
endOfInput = !src.hasRemaining();
}
@Override
public void setDictionary(byte[] b, int off, int len) {
throw new UnsupportedOperationException(
"byte[] arrays are not supported for DirectDecompressor");
}
@Override
public int decompress(byte[] b, int off, int len) {
throw new UnsupportedOperationException(
"byte[] arrays are not supported for DirectDecompressor");
}
}
}
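And the matching decompression loop, again a hedged sketch rather than part
of the patch (a truncated input simply terminates the loop via needsInput):

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import org.apache.hadoop.io.compress.zstd.ZStandardDecompressor;

public class ZstdDecompressLoop {
  public static byte[] decompress(byte[] compressed) throws IOException {
    ZStandardDecompressor decompressor = new ZStandardDecompressor(64 * 1024);
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    byte[] buf = new byte[64 * 1024];
    decompressor.setInput(compressed, 0, compressed.length);
    while (!decompressor.finished()) {
      int n = decompressor.decompress(buf, 0, buf.length);
      out.write(buf, 0, n);
      // needsInput() also refills the direct buffer from the saved user input
      if (n == 0 && decompressor.needsInput()) {
        break; // no more input to offer in this sketch
      }
    }
    decompressor.end();
    return out.toByteArray();
  }
}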

View File

@ -0,0 +1,22 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
package org.apache.hadoop.io.compress.zstd;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;

View File

@ -79,6 +79,11 @@ public static boolean isNativeCodeLoaded() {
*/
public static native boolean buildSupportsSnappy();
/**
* Returns true only if this build was compiled with support for ZStandard.
*/
public static native boolean buildSupportsZstd();
/**
* Returns true only if this build was compiled with support for openssl.
*/
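A caller-side guard for the new probe might look like this hedged sketch;
note isNativeCodeLoaded() must be checked first, since the probe itself is a
native method:

import org.apache.hadoop.util.NativeCodeLoader;

public class ZstdGuard {
  public static boolean zstdUsable() {
    // Both checks are needed: libhadoop must be loaded and built with zstd
    return NativeCodeLoader.isNativeCodeLoaded()
        && NativeCodeLoader.buildSupportsZstd();
  }
}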

View File

@ -22,6 +22,7 @@
import org.apache.hadoop.crypto.OpensslCipher;
import org.apache.hadoop.io.compress.Lz4Codec;
import org.apache.hadoop.io.compress.SnappyCodec;
import org.apache.hadoop.io.compress.ZStandardCodec;
import org.apache.hadoop.io.compress.bzip2.Bzip2Factory;
import org.apache.hadoop.io.compress.zlib.ZlibFactory;
import org.apache.hadoop.classification.InterfaceAudience;
@ -65,6 +66,7 @@ public static void main(String[] args) {
boolean nativeHadoopLoaded = NativeCodeLoader.isNativeCodeLoaded();
boolean zlibLoaded = false;
boolean snappyLoaded = false;
boolean zStdLoaded = false;
// lz4 is linked within libhadoop
boolean lz4Loaded = nativeHadoopLoaded;
boolean bzip2Loaded = Bzip2Factory.isNativeBzip2Loaded(conf);
@ -75,6 +77,7 @@ public static void main(String[] args) {
String hadoopLibraryName = "";
String zlibLibraryName = "";
String snappyLibraryName = "";
String zstdLibraryName = "";
String lz4LibraryName = "";
String bzip2LibraryName = "";
String winutilsPath = null;
@ -90,6 +93,11 @@ public static void main(String[] args) {
if (snappyLoaded && NativeCodeLoader.buildSupportsSnappy()) {
snappyLibraryName = SnappyCodec.getLibraryName();
}
zStdLoaded = NativeCodeLoader.buildSupportsZstd() &&
ZStandardCodec.isNativeCodeLoaded();
if (zStdLoaded && NativeCodeLoader.buildSupportsZstd()) {
zstdLibraryName = ZStandardCodec.getLibraryName();
}
if (OpensslCipher.getLoadingFailureReason() != null) {
openSslDetail = OpensslCipher.getLoadingFailureReason();
openSslLoaded = false;
@ -122,6 +130,7 @@ public static void main(String[] args) {
System.out.printf("hadoop: %b %s%n", nativeHadoopLoaded, hadoopLibraryName);
System.out.printf("zlib: %b %s%n", zlibLoaded, zlibLibraryName);
System.out.printf("snappy: %b %s%n", snappyLoaded, snappyLibraryName);
System.out.printf("zstd : %b %s%n", zStdLoaded, zstdLibraryName);
System.out.printf("lz4: %b %s%n", lz4Loaded, lz4LibraryName);
System.out.printf("bzip2: %b %s%n", bzip2Loaded, bzip2LibraryName);
System.out.printf("openssl: %b %s%n", openSslLoaded, openSslDetail);
@ -130,7 +139,8 @@ public static void main(String[] args) {
}
if ((!nativeHadoopLoaded) || (Shell.WINDOWS && (!winutilsExists)) ||
(checkAll && !(zlibLoaded && snappyLoaded && lz4Loaded && bzip2Loaded))) {
(checkAll && !(zlibLoaded && snappyLoaded && lz4Loaded && bzip2Loaded
&& zStdLoaded))) {
// return 1 to indicate check failed
ExitUtil.terminate(1);
}

View File

@ -0,0 +1,259 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "org_apache_hadoop_io_compress_zstd.h"
#if defined HADOOP_ZSTD_LIBRARY
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#ifdef UNIX
#include <dlfcn.h>
#include "config.h"
#endif
#include "org_apache_hadoop_io_compress_zstd_ZStandardCompressor.h"
static jfieldID ZStandardCompressor_stream;
static jfieldID ZStandardCompressor_uncompressedDirectBufOff;
static jfieldID ZStandardCompressor_uncompressedDirectBufLen;
static jfieldID ZStandardCompressor_directBufferSize;
static jfieldID ZStandardCompressor_finish;
static jfieldID ZStandardCompressor_finished;
static jfieldID ZStandardCompressor_bytesWritten;
static jfieldID ZStandardCompressor_bytesRead;
#ifdef UNIX
static size_t (*dlsym_ZSTD_CStreamInSize)(void);
static size_t (*dlsym_ZSTD_CStreamOutSize)(void);
static ZSTD_CStream* (*dlsym_ZSTD_createCStream)(void);
static size_t (*dlsym_ZSTD_initCStream)(ZSTD_CStream*, int);
static size_t (*dlsym_ZSTD_freeCStream)(ZSTD_CStream*);
static size_t (*dlsym_ZSTD_compressStream)(ZSTD_CStream*, ZSTD_outBuffer*, ZSTD_inBuffer*);
static size_t (*dlsym_ZSTD_endStream)(ZSTD_CStream*, ZSTD_outBuffer*);
static size_t (*dlsym_ZSTD_flushStream)(ZSTD_CStream*, ZSTD_outBuffer*);
static unsigned (*dlsym_ZSTD_isError)(size_t);
static const char * (*dlsym_ZSTD_getErrorName)(size_t);
#endif
#ifdef WINDOWS
typedef size_t (__cdecl *__dlsym_ZSTD_CStreamInSize)(void);
typedef size_t (__cdecl *__dlsym_ZSTD_CStreamOutSize)(void);
typedef ZSTD_CStream* (__cdecl *__dlsym_ZSTD_createCStream)(void);
typedef size_t (__cdecl *__dlsym_ZSTD_initCStream)(ZSTD_CStream*, int);
typedef size_t (__cdecl *__dlsym_ZSTD_freeCStream)(ZSTD_CStream*);
typedef size_t (__cdecl *__dlsym_ZSTD_compressStream)(ZSTD_CStream*, ZSTD_outBuffer*, ZSTD_inBuffer*);
typedef size_t (__cdecl *__dlsym_ZSTD_endStream)(ZSTD_CStream*, ZSTD_outBuffer*);
typedef size_t (__cdecl *__dlsym_ZSTD_flushStream)(ZSTD_CStream*, ZSTD_outBuffer*);
typedef unsigned (__cdecl *__dlsym_ZSTD_isError)(size_t);
typedef const char * (__cdecl *__dlsym_ZSTD_getErrorName)(size_t);
static __dlsym_ZSTD_CStreamInSize dlsym_ZSTD_CStreamInSize;
static __dlsym_ZSTD_CStreamOutSize dlsym_ZSTD_CStreamOutSize;
static __dlsym_ZSTD_createCStream dlsym_ZSTD_createCStream;
static __dlsym_ZSTD_initCStream dlsym_ZSTD_initCStream;
static __dlsym_ZSTD_freeCStream dlsym_ZSTD_freeCStream;
static __dlsym_ZSTD_compressStream dlsym_ZSTD_compressStream;
static __dlsym_ZSTD_endStream dlsym_ZSTD_endStream;
static __dlsym_ZSTD_flushStream dlsym_ZSTD_flushStream;
static __dlsym_ZSTD_isError dlsym_ZSTD_isError;
static __dlsym_ZSTD_getErrorName dlsym_ZSTD_getErrorName;
#endif
// Load libzstd.so from disk
JNIEXPORT void JNICALL Java_org_apache_hadoop_io_compress_zstd_ZStandardCompressor_initIDs (JNIEnv *env, jclass clazz) {
#ifdef UNIX
// Load libzstd.so
void *libzstd = dlopen(HADOOP_ZSTD_LIBRARY, RTLD_LAZY | RTLD_GLOBAL);
if (!libzstd) {
char* msg = (char*)malloc(10000);
snprintf(msg, 10000, "%s (%s)!", "Cannot load " HADOOP_ZSTD_LIBRARY, dlerror());
THROW(env, "java/lang/InternalError", msg);
return;
}
#endif
#ifdef WINDOWS
HMODULE libzstd = LoadLibrary(HADOOP_ZSTD_LIBRARY);
if (!libzstd) {
THROW(env, "java/lang/UnsatisfiedLinkError", "Cannot load zstd.dll");
return;
}
#endif
#ifdef UNIX
// load dynamic symbols
dlerror();
LOAD_DYNAMIC_SYMBOL(dlsym_ZSTD_CStreamInSize, env, libzstd, "ZSTD_CStreamInSize");
LOAD_DYNAMIC_SYMBOL(dlsym_ZSTD_CStreamOutSize, env, libzstd, "ZSTD_CStreamOutSize");
LOAD_DYNAMIC_SYMBOL(dlsym_ZSTD_createCStream, env, libzstd, "ZSTD_createCStream");
LOAD_DYNAMIC_SYMBOL(dlsym_ZSTD_initCStream, env, libzstd, "ZSTD_initCStream");
LOAD_DYNAMIC_SYMBOL(dlsym_ZSTD_freeCStream, env, libzstd, "ZSTD_freeCStream");
LOAD_DYNAMIC_SYMBOL(dlsym_ZSTD_compressStream, env, libzstd, "ZSTD_compressStream");
LOAD_DYNAMIC_SYMBOL(dlsym_ZSTD_endStream, env, libzstd, "ZSTD_endStream");
LOAD_DYNAMIC_SYMBOL(dlsym_ZSTD_flushStream, env, libzstd, "ZSTD_flushStream");
LOAD_DYNAMIC_SYMBOL(dlsym_ZSTD_isError, env, libzstd, "ZSTD_isError");
LOAD_DYNAMIC_SYMBOL(dlsym_ZSTD_getErrorName, env, libzstd, "ZSTD_getErrorName");
#endif
#ifdef WINDOWS
LOAD_DYNAMIC_SYMBOL(__dlsym_ZSTD_CStreamInSize, dlsym_ZSTD_CStreamInSize, env, libzstd, "ZSTD_CStreamInSize");
LOAD_DYNAMIC_SYMBOL(__dlsym_ZSTD_CStreamOutSize, dlsym_ZSTD_CStreamOutSize, env, libzstd, "ZSTD_CStreamOutSize");
LOAD_DYNAMIC_SYMBOL(__dlsym_ZSTD_createCStream, dlsym_ZSTD_createCStream, env, libzstd, "ZSTD_createCStream");
LOAD_DYNAMIC_SYMBOL(__dlsym_ZSTD_initCStream, dlsym_ZSTD_initCStream, env, libzstd, "ZSTD_initCStream");
LOAD_DYNAMIC_SYMBOL(__dlsym_ZSTD_freeCStream, dlsym_ZSTD_freeCStream, env, libzstd, "ZSTD_freeCStream");
LOAD_DYNAMIC_SYMBOL(__dlsym_ZSTD_compressStream, dlsym_ZSTD_compressStream, env, libzstd, "ZSTD_compressStream");
LOAD_DYNAMIC_SYMBOL(__dlsym_ZSTD_endStream, dlsym_ZSTD_endStream, env, libzstd, "ZSTD_endStream");
LOAD_DYNAMIC_SYMBOL(__dlsym_ZSTD_flushStream, dlsym_ZSTD_flushStream, env, libzstd, "ZSTD_flushStream");
LOAD_DYNAMIC_SYMBOL(__dlsym_ZSTD_isError, dlsym_ZSTD_isError, env, libzstd, "ZSTD_isError");
LOAD_DYNAMIC_SYMBOL(__dlsym_ZSTD_getErrorName, dlsym_ZSTD_getErrorName, env, libzstd, "ZSTD_getErrorName");
#endif
// load fields
ZStandardCompressor_stream = (*env)->GetFieldID(env, clazz, "stream", "J");
ZStandardCompressor_finish = (*env)->GetFieldID(env, clazz, "finish", "Z");
ZStandardCompressor_finished = (*env)->GetFieldID(env, clazz, "finished", "Z");
ZStandardCompressor_uncompressedDirectBufOff = (*env)->GetFieldID(env, clazz, "uncompressedDirectBufOff", "I");
ZStandardCompressor_uncompressedDirectBufLen = (*env)->GetFieldID(env, clazz, "uncompressedDirectBufLen", "I");
ZStandardCompressor_directBufferSize = (*env)->GetFieldID(env, clazz, "directBufferSize", "I");
ZStandardCompressor_bytesRead = (*env)->GetFieldID(env, clazz, "bytesRead", "J");
ZStandardCompressor_bytesWritten = (*env)->GetFieldID(env, clazz, "bytesWritten", "J");
}
// Create the compression stream
JNIEXPORT jlong JNICALL Java_org_apache_hadoop_io_compress_zstd_ZStandardCompressor_create (JNIEnv *env, jobject this) {
ZSTD_CStream* const stream = dlsym_ZSTD_createCStream();
if (stream == NULL) {
THROW(env, "java/lang/InternalError", "Error creating the stream");
return (jlong)0;
}
return (jlong) stream;
}
// Initialize the compression stream
JNIEXPORT void JNICALL Java_org_apache_hadoop_io_compress_zstd_ZStandardCompressor_init (JNIEnv *env, jobject this, jint level, jlong stream) {
size_t result = dlsym_ZSTD_initCStream((ZSTD_CStream *) stream, level);
if (dlsym_ZSTD_isError(result)) {
THROW(env, "java/lang/InternalError", dlsym_ZSTD_getErrorName(result));
return;
}
}
// free the compression stream
JNIEXPORT void JNICALL Java_org_apache_hadoop_io_compress_zstd_ZStandardCompressor_end (JNIEnv *env, jobject this, jlong stream) {
size_t result = dlsym_ZSTD_freeCStream((ZSTD_CStream *) stream);
if (dlsym_ZSTD_isError(result)) {
THROW(env, "java/lang/InternalError", dlsym_ZSTD_getErrorName(result));
return;
}
}
JNIEXPORT jint JNICALL Java_org_apache_hadoop_io_compress_zstd_ZStandardCompressor_deflateBytesDirect
(JNIEnv *env, jobject this, jobject uncompressed_direct_buf, jint uncompressed_direct_buf_off, jint uncompressed_direct_buf_len, jobject compressed_direct_buf, jint compressed_direct_buf_len) {
ZSTD_CStream* const stream = (ZSTD_CStream*) (*env)->GetLongField(env, this, ZStandardCompressor_stream);
if (!stream) {
THROW(env, "java/lang/NullPointerException", NULL);
return (jint)0;
}
jlong bytes_read = (*env)->GetLongField(env, this, ZStandardCompressor_bytesRead);
jlong bytes_written = (*env)->GetLongField(env, this, ZStandardCompressor_bytesWritten);
jboolean finish = (*env)->GetBooleanField(env, this, ZStandardCompressor_finish);
// Get the input direct buffer
void * uncompressed_bytes = (*env)->GetDirectBufferAddress(env, uncompressed_direct_buf);
if (!uncompressed_bytes) {
THROW(env, "java/lang/InternalError", "Undefined memory address for uncompressedDirectBuf");
return (jint) 0;
}
// Get the output direct buffer
void * compressed_bytes = (*env)->GetDirectBufferAddress(env, compressed_direct_buf);
if (!compressed_bytes) {
THROW(env, "java/lang/InternalError", "Undefined memory address for compressedDirectBuf");
return (jint) 0;
}
ZSTD_inBuffer input = { uncompressed_bytes, uncompressed_direct_buf_len, uncompressed_direct_buf_off };
ZSTD_outBuffer output = { compressed_bytes, compressed_direct_buf_len, 0 };
size_t size = dlsym_ZSTD_compressStream(stream, &output, &input);
if (dlsym_ZSTD_isError(size)) {
THROW(env, "java/lang/InternalError", dlsym_ZSTD_getErrorName(size));
return (jint) 0;
}
if (finish && input.pos == input.size) {
// end the stream, flush and write the frame epilogue
size = dlsym_ZSTD_endStream(stream, &output);
if (!size) {
(*env)->SetBooleanField(env, this, ZStandardCompressor_finished, JNI_TRUE);
}
} else {
// need to flush the output buffer
// this also updates the output buffer position.
size = dlsym_ZSTD_flushStream(stream, &output);
}
if (dlsym_ZSTD_isError(size)) {
THROW(env, "java/lang/InternalError", dlsym_ZSTD_getErrorName(size));
return (jint) 0;
}
bytes_read += input.pos;
bytes_written += output.pos;
(*env)->SetLongField(env, this, ZStandardCompressor_bytesRead, bytes_read);
(*env)->SetLongField(env, this, ZStandardCompressor_bytesWritten, bytes_written);
(*env)->SetIntField(env, this, ZStandardCompressor_uncompressedDirectBufOff, input.pos);
(*env)->SetIntField(env, this, ZStandardCompressor_uncompressedDirectBufLen, input.size - input.pos);
return (jint) output.pos;
}
JNIEXPORT jstring JNICALL Java_org_apache_hadoop_io_compress_zstd_ZStandardCompressor_getLibraryName
(JNIEnv *env, jclass class) {
#ifdef UNIX
if (dlsym_ZSTD_isError) {
Dl_info dl_info;
if (dladdr( dlsym_ZSTD_isError, &dl_info)) {
return (*env)->NewStringUTF(env, dl_info.dli_fname);
}
}
return (*env)->NewStringUTF(env, HADOOP_ZSTD_LIBRARY);
#endif
#ifdef WINDOWS
LPWSTR filename = NULL;
GetLibraryName(dlsym_ZSTD_isError, &filename);
if (filename != NULL) {
return (*env)->NewString(env, filename, (jsize) wcslen(filename));
} else {
return (*env)->NewStringUTF(env, "Unavailable");
}
#endif
}
// returns the max size of the recommended input and output buffers
JNIEXPORT jint JNICALL Java_org_apache_hadoop_io_compress_zstd_ZStandardCompressor_getStreamSize
(JNIEnv *env, jobject this) {
int x = (int) dlsym_ZSTD_CStreamInSize();
int y = (int) dlsym_ZSTD_CStreamOutSize();
return (x >= y) ? x : y;
}
#endif // defined HADOOP_ZSTD_LIBRARY

View File

@ -0,0 +1,218 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "org_apache_hadoop_io_compress_zstd.h"
#if defined HADOOP_ZSTD_LIBRARY
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#ifdef UNIX
#include <dlfcn.h>
#include "config.h"
#endif
#include "org_apache_hadoop_io_compress_zstd_ZStandardDecompressor.h"
static jfieldID ZStandardDecompressor_stream;
static jfieldID ZStandardDecompressor_compressedDirectBufOff;
static jfieldID ZStandardDecompressor_bytesInCompressedBuffer;
static jfieldID ZStandardDecompressor_directBufferSize;
static jfieldID ZStandardDecompressor_finished;
static jfieldID ZStandardDecompressor_remaining;
#ifdef UNIX
static size_t (*dlsym_ZSTD_DStreamOutSize)(void);
static size_t (*dlsym_ZSTD_DStreamInSize)(void);
static ZSTD_DStream* (*dlsym_ZSTD_createDStream)(void);
static size_t (*dlsym_ZSTD_initDStream)(ZSTD_DStream*);
static size_t (*dlsym_ZSTD_freeDStream)(ZSTD_DStream*);
static size_t (*dlsym_ZSTD_resetDStream)(ZSTD_DStream*);
static size_t (*dlsym_ZSTD_decompressStream)(ZSTD_DStream*, ZSTD_outBuffer*, ZSTD_inBuffer*);
static size_t (*dlsym_ZSTD_flushStream)(ZSTD_CStream*, ZSTD_outBuffer*);
static unsigned (*dlsym_ZSTD_isError)(size_t);
static const char * (*dlsym_ZSTD_getErrorName)(size_t);
#endif
#ifdef WINDOWS
typedef size_t (__cdecl *__dlsym_ZSTD_DStreamOutSize)(void);
typedef size_t (__cdecl *__dlsym_ZSTD_DStreamInSize)(void);
typedef ZSTD_DStream* (__cdecl *__dlsym_ZSTD_createDStream)(void);
typedef size_t (__cdecl *__dlsym_ZSTD_initDStream)(ZSTD_DStream*);
typedef size_t (__cdecl *__dlsym_ZSTD_freeDStream)(ZSTD_DStream*);
typedef size_t (__cdecl *__dlsym_ZSTD_resetDStream)(ZSTD_DStream*);
typedef size_t (__cdecl *__dlsym_ZSTD_decompressStream)(ZSTD_DStream*, ZSTD_outBuffer*, ZSTD_inBuffer*);
typedef size_t (__cdecl *__dlsym_ZSTD_flushStream)(ZSTD_CStream*, ZSTD_outBuffer*);
typedef unsigned (__cdecl *__dlsym_ZSTD_isError)(size_t);
typedef const char * (__cdecl *__dlsym_ZSTD_getErrorName)(size_t);
static __dlsym_ZSTD_DStreamOutSize dlsym_ZSTD_DStreamOutSize;
static __dlsym_ZSTD_DStreamInSize dlsym_ZSTD_DStreamInSize;
static __dlsym_ZSTD_createDStream dlsym_ZSTD_createDStream;
static __dlsym_ZSTD_initDStream dlsym_ZSTD_initDStream;
static __dlsym_ZSTD_freeDStream dlsym_ZSTD_freeDStream;
static __dlsym_ZSTD_resetDStream dlsym_ZSTD_resetDStream;
static __dlsym_ZSTD_decompressStream dlsym_ZSTD_decompressStream;
static __dlsym_ZSTD_isError dlsym_ZSTD_isError;
static __dlsym_ZSTD_getErrorName dlsym_ZSTD_getErrorName;
static __dlsym_ZSTD_flushStream dlsym_ZSTD_flushStream;
#endif
JNIEXPORT void JNICALL Java_org_apache_hadoop_io_compress_zstd_ZStandardDecompressor_initIDs (JNIEnv *env, jclass clazz) {
// Load libzstd.so
#ifdef UNIX
void *libzstd = dlopen(HADOOP_ZSTD_LIBRARY, RTLD_LAZY | RTLD_GLOBAL);
if (!libzstd) {
char* msg = (char*)malloc(1000);
snprintf(msg, 1000, "%s (%s)!", "Cannot load " HADOOP_ZSTD_LIBRARY, dlerror());
THROW(env, "java/lang/UnsatisfiedLinkError", msg);
return;
}
#endif
#ifdef WINDOWS
HMODULE libzstd = LoadLibrary(HADOOP_ZSTD_LIBRARY);
if (!libzstd) {
THROW(env, "java/lang/UnsatisfiedLinkError", "Cannot load zstd.dll");
return;
}
#endif
#ifdef UNIX
dlerror();
LOAD_DYNAMIC_SYMBOL(dlsym_ZSTD_DStreamOutSize, env, libzstd, "ZSTD_DStreamOutSize");
LOAD_DYNAMIC_SYMBOL(dlsym_ZSTD_DStreamInSize, env, libzstd, "ZSTD_DStreamInSize");
LOAD_DYNAMIC_SYMBOL(dlsym_ZSTD_createDStream, env, libzstd, "ZSTD_createDStream");
LOAD_DYNAMIC_SYMBOL(dlsym_ZSTD_initDStream, env, libzstd, "ZSTD_initDStream");
LOAD_DYNAMIC_SYMBOL(dlsym_ZSTD_freeDStream, env, libzstd, "ZSTD_freeDStream");
LOAD_DYNAMIC_SYMBOL(dlsym_ZSTD_resetDStream, env, libzstd, "ZSTD_resetDStream");
LOAD_DYNAMIC_SYMBOL(dlsym_ZSTD_decompressStream, env, libzstd, "ZSTD_decompressStream");
LOAD_DYNAMIC_SYMBOL(dlsym_ZSTD_isError, env, libzstd, "ZSTD_isError");
LOAD_DYNAMIC_SYMBOL(dlsym_ZSTD_getErrorName, env, libzstd, "ZSTD_getErrorName");
LOAD_DYNAMIC_SYMBOL(dlsym_ZSTD_flushStream, env, libzstd, "ZSTD_flushStream");
#endif
#ifdef WINDOWS
LOAD_DYNAMIC_SYMBOL(__dlsym_ZSTD_DStreamOutSize, dlsym_ZSTD_DStreamOutSize, env, libzstd, "ZSTD_DStreamOutSize");
LOAD_DYNAMIC_SYMBOL(__dlsym_ZSTD_DStreamInSize, dlsym_ZSTD_DStreamInSize, env, libzstd, "ZSTD_DStreamInSize");
LOAD_DYNAMIC_SYMBOL(__dlsym_ZSTD_createDStream, dlsym_ZSTD_createDStream, env, libzstd, "ZSTD_createDStream");
LOAD_DYNAMIC_SYMBOL(__dlsym_ZSTD_initDStream, dlsym_ZSTD_initDStream, env, libzstd, "ZSTD_initDStream");
LOAD_DYNAMIC_SYMBOL(__dlsym_ZSTD_freeDStream, dlsym_ZSTD_freeDStream, env, libzstd, "ZSTD_freeDStream");
LOAD_DYNAMIC_SYMBOL(__dlsym_ZSTD_resetDStream, dlsym_ZSTD_resetDStream, env, libzstd, "ZSTD_resetDStream");
LOAD_DYNAMIC_SYMBOL(__dlsym_ZSTD_decompressStream, dlsym_ZSTD_decompressStream, env, libzstd, "ZSTD_decompressStream");
LOAD_DYNAMIC_SYMBOL(__dlsym_ZSTD_isError, dlsym_ZSTD_isError, env, libzstd, "ZSTD_isError");
LOAD_DYNAMIC_SYMBOL(__dlsym_ZSTD_getErrorName, dlsym_ZSTD_getErrorName, env, libzstd, "ZSTD_getErrorName");
LOAD_DYNAMIC_SYMBOL(__dlsym_ZSTD_flushStream, dlsym_ZSTD_flushStream, env, libzstd, "ZSTD_flushStream");
#endif
ZStandardDecompressor_stream = (*env)->GetFieldID(env, clazz, "stream", "J");
ZStandardDecompressor_finished = (*env)->GetFieldID(env, clazz, "finished", "Z");
ZStandardDecompressor_compressedDirectBufOff = (*env)->GetFieldID(env, clazz, "compressedDirectBufOff", "I");
ZStandardDecompressor_bytesInCompressedBuffer = (*env)->GetFieldID(env, clazz, "bytesInCompressedBuffer", "I");
ZStandardDecompressor_directBufferSize = (*env)->GetFieldID(env, clazz, "directBufferSize", "I");
ZStandardDecompressor_remaining = (*env)->GetFieldID(env, clazz, "remaining", "I");
}
JNIEXPORT jlong JNICALL Java_org_apache_hadoop_io_compress_zstd_ZStandardDecompressor_create(JNIEnv *env, jobject this) {
ZSTD_DStream * stream = dlsym_ZSTD_createDStream();
if (stream == NULL) {
THROW(env, "java/lang/InternalError", "Error creating stream");
return (jlong) 0;
}
return (jlong) stream;
}
JNIEXPORT void JNICALL Java_org_apache_hadoop_io_compress_zstd_ZStandardDecompressor_init(JNIEnv *env, jobject this, jlong stream) {
size_t result = dlsym_ZSTD_initDStream((ZSTD_DStream *) stream);
if (dlsym_ZSTD_isError(result)) {
THROW(env, "java/lang/InternalError", dlsym_ZSTD_getErrorName(result));
return;
}
(*env)->SetIntField(env, this, ZStandardDecompressor_remaining, 0);
}
JNIEXPORT void JNICALL Java_org_apache_hadoop_io_compress_zstd_ZStandardDecompressor_free(JNIEnv *env, jclass obj, jlong stream) {
size_t result = dlsym_ZSTD_freeDStream((ZSTD_DStream *) stream);
if (dlsym_ZSTD_isError(result)) {
THROW(env, "java/lang/InternalError", dlsym_ZSTD_getErrorName(result));
return;
}
}
JNIEXPORT jint JNICALL Java_org_apache_hadoop_io_compress_zstd_ZStandardDecompressor_inflateBytesDirect
(JNIEnv *env, jobject this, jobject compressed_direct_buf, jint compressed_direct_buf_off, jint compressed_direct_buf_len, jobject uncompressed_direct_buf, jint uncompressed_direct_buf_off, jint uncompressed_direct_buf_len) {
ZSTD_DStream *stream = (ZSTD_DStream *) (*env)->GetLongField(env, this, ZStandardDecompressor_stream);
if (!stream) {
THROW(env, "java/lang/NullPointerException", NULL);
return (jint)0;
}
// Get the input direct buffer
void * compressed_bytes = (*env)->GetDirectBufferAddress(env, compressed_direct_buf);
if (!compressed_bytes) {
THROW(env, "java/lang/InternalError", "Undefined memory address for compressedDirectBuf");
return (jint) 0;
}
// Get the output direct buffer
void * uncompressed_bytes = (*env)->GetDirectBufferAddress(env, uncompressed_direct_buf);
if (!uncompressed_bytes) {
THROW(env, "java/lang/InternalError", "Undefined memory address for uncompressedDirectBuf");
return (jint) 0;
}
uncompressed_bytes = ((char*) uncompressed_bytes) + uncompressed_direct_buf_off;
ZSTD_inBuffer input = { compressed_bytes, compressed_direct_buf_len, compressed_direct_buf_off };
ZSTD_outBuffer output = { uncompressed_bytes, uncompressed_direct_buf_len, 0 };
size_t const size = dlsym_ZSTD_decompressStream(stream, &output, &input);
// check for errors
if (dlsym_ZSTD_isError(size)) {
THROW(env, "java/lang/InternalError", dlsym_ZSTD_getErrorName(size));
return (jint) 0;
}
int remaining = input.size - input.pos;
(*env)->SetIntField(env, this, ZStandardDecompressor_remaining, remaining);
// the entire frame has been decoded
if (size == 0) {
(*env)->SetBooleanField(env, this, ZStandardDecompressor_finished, JNI_TRUE);
size_t result = dlsym_ZSTD_initDStream(stream);
if (dlsym_ZSTD_isError(result)) {
THROW(env, "java/lang/InternalError", dlsym_ZSTD_getErrorName(result));
return (jint) 0;
}
}
(*env)->SetIntField(env, this, ZStandardDecompressor_compressedDirectBufOff, input.pos);
(*env)->SetIntField(env, this, ZStandardDecompressor_bytesInCompressedBuffer, input.size);
return (jint) output.pos;
}
// returns the max size of the recommended input and output buffers
JNIEXPORT jint JNICALL Java_org_apache_hadoop_io_compress_zstd_ZStandardDecompressor_getStreamSize
(JNIEnv *env, jclass obj) {
int x = (int) dlsym_ZSTD_DStreamInSize();
int y = (int) dlsym_ZSTD_DStreamOutSize();
return (x >= y) ? x : y;
}
#endif // defined HADOOP_ZSTD_LIBRARY

View File

@ -0,0 +1,34 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef ORG_APACHE_HADOOP_IO_COMPRESS_ZSTD_ZSTD_H
#define ORG_APACHE_HADOOP_IO_COMPRESS_ZSTD_ZSTD_H
#include "org_apache_hadoop.h"
#ifdef UNIX
#include <dlfcn.h>
#endif
#include <jni.h>
#include <zstd.h>
#include <stddef.h>
#endif //ORG_APACHE_HADOOP_IO_COMPRESS_ZSTD_ZSTD_H

View File

@ -39,6 +39,16 @@ JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_util_NativeCodeLoader_buildSup
#endif
}
JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_util_NativeCodeLoader_buildSupportsZstd
(JNIEnv *env, jclass clazz)
{
#ifdef HADOOP_ZSTD_LIBRARY
return JNI_TRUE;
#else
return JNI_FALSE;
#endif
}
JNIEXPORT jboolean JNICALL Java_org_apache_hadoop_util_NativeCodeLoader_buildSupportsOpenssl
(JNIEnv *env, jclass clazz)
{

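For a quick runtime probe of this build flag, the codec-level check used throughout the tests below can be called directly; a minimal sketch (class name hypothetical):

import org.apache.hadoop.io.compress.ZStandardCodec;

public class ZstdSupportCheck {
  public static void main(String[] args) {
    // True only when libhadoop was compiled with HADOOP_ZSTD_LIBRARY and
    // libzstd can be resolved at runtime.
    System.out.println("zstd native: " + ZStandardCodec.isNativeCodeLoaded());
  }
}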
View File

@ -17,4 +17,4 @@ org.apache.hadoop.io.compress.DeflateCodec
org.apache.hadoop.io.compress.GzipCodec
org.apache.hadoop.io.compress.Lz4Codec
org.apache.hadoop.io.compress.SnappyCodec
org.apache.hadoop.io.compress.ZStandardCodec

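Because this service file registers ZStandardCodec, CompressionCodecFactory should be able to discover the codec without any io.compression.codecs configuration. A minimal lookup sketch (class and file names hypothetical):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;

public class ZstdCodecLookup {
  public static void main(String[] args) {
    CompressionCodecFactory factory =
        new CompressionCodecFactory(new Configuration());
    // ".zst" is the codec's default extension (compare the test fixture
    // test_file.txt.zst used by the tests below).
    CompressionCodec codec = factory.getCodec(new Path("part-00000.zst"));
    System.out.println(codec == null ? "codec not found" : codec.getClass());
  }
}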
View File

@ -118,6 +118,7 @@ NativeLibraryChecker is a tool to check whether native libraries are loaded corr
hadoop: true /home/ozawa/hadoop/lib/native/libhadoop.so.1.0.0
zlib: true /lib/x86_64-linux-gnu/libz.so.1
snappy: true /usr/lib/libsnappy.so.1
zstd: true /usr/lib/libzstd.so.1
lz4: true revision:99
bzip2: false

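The listing above is the output of hadoop checknative -a; after a build with zstd support, and with libzstd resolvable at runtime, the zstd row reports true together with the library path that was loaded.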
View File

@ -75,6 +75,7 @@
import org.junit.Assume;
import org.junit.Test;
import static org.junit.Assert.*;
import static org.junit.Assume.assumeTrue;
public class TestCodec {
@ -519,6 +520,18 @@ public void testSequenceFileBZip2NativeCodec() throws IOException,
}
}
@Test(timeout=20000)
public void testSequenceFileZStandardCodec() throws Exception {
assumeTrue(ZStandardCodec.isNativeCodeLoaded());
Configuration conf = new Configuration();
sequenceFileCodecTest(conf, 0,
"org.apache.hadoop.io.compress.ZStandardCodec", 100);
sequenceFileCodecTest(conf, 100,
"org.apache.hadoop.io.compress.ZStandardCodec", 100);
sequenceFileCodecTest(conf, 200000,
"org.apache.hadoop.io.compress.ZStandardCodec", 1000000);
}
@Test
public void testSequenceFileDeflateCodec() throws IOException, ClassNotFoundException,
InstantiationException, IllegalAccessException {
@ -581,7 +594,7 @@ private static void sequenceFileCodecTest(Configuration conf, int lines,
*/
@Test
public void testSnappyMapFile() throws Exception {
Assume.assumeTrue(SnappyCodec.isNativeCodeLoaded());
assumeTrue(SnappyCodec.isNativeCodeLoaded());
codecTestMapFile(SnappyCodec.class, CompressionType.BLOCK, 100);
}

View File

@ -37,6 +37,7 @@
import org.junit.Test;
import static org.junit.Assert.assertTrue;
import static org.junit.Assume.assumeTrue;
public class TestCompressionStreamReuse {
private static final Log LOG = LogFactory
@ -69,6 +70,13 @@ public void testGzipCompressStreamReuseWithParam() throws IOException {
"org.apache.hadoop.io.compress.GzipCodec");
}
@Test
public void testZStandardCompressStreamReuse() throws IOException {
assumeTrue(ZStandardCodec.isNativeCodeLoaded());
resetStateTest(conf, seed, count,
"org.apache.hadoop.io.compress.ZStandardCodec");
}
private void resetStateTest(Configuration conf, int seed, int count,
String codecClass) throws IOException {
// Create the codec

View File

@ -0,0 +1,485 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.io.compress.zstd;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.compress.CompressionInputStream;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.io.compress.Compressor;
import org.apache.hadoop.io.compress.CompressorStream;
import org.apache.hadoop.io.compress.Decompressor;
import org.apache.hadoop.io.compress.DecompressorStream;
import org.apache.hadoop.io.compress.ZStandardCodec;
import org.apache.hadoop.test.MultithreadedTestUtil;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Random;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assume.assumeTrue;
public class TestZStandardCompressorDecompressor {
private final static char[] HEX_ARRAY = "0123456789ABCDEF".toCharArray();
private static final Random RANDOM = new Random(12345L);
private static final Configuration CONFIGURATION = new Configuration();
private static File compressedFile;
private static File uncompressedFile;
@BeforeClass
public static void beforeClass() throws Exception {
CONFIGURATION.setInt(IO_FILE_BUFFER_SIZE_KEY, 1024 * 64);
uncompressedFile = new File(TestZStandardCompressorDecompressor.class
.getResource("/zstd/test_file.txt").toURI());
compressedFile = new File(TestZStandardCompressorDecompressor.class
.getResource("/zstd/test_file.txt.zst").toURI());
}
@Before
public void before() throws Exception {
assumeTrue(ZStandardCodec.isNativeCodeLoaded());
}
@Test
public void testCompressionCompressesCorrectly() throws Exception {
int uncompressedSize = (int) FileUtils.sizeOf(uncompressedFile);
byte[] bytes = FileUtils.readFileToByteArray(uncompressedFile);
assertEquals(uncompressedSize, bytes.length);
Configuration conf = new Configuration();
ZStandardCodec codec = new ZStandardCodec();
codec.setConf(conf);
ByteArrayOutputStream baos = new ByteArrayOutputStream();
Compressor compressor = codec.createCompressor();
CompressionOutputStream outputStream =
codec.createOutputStream(baos, compressor);
for (byte aByte : bytes) {
outputStream.write(aByte);
}
outputStream.finish();
outputStream.close();
assertEquals(uncompressedSize, compressor.getBytesRead());
assertTrue(compressor.finished());
// just make sure we can decompress the file
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
Decompressor decompressor = codec.createDecompressor();
CompressionInputStream inputStream =
codec.createInputStream(bais, decompressor);
byte[] buffer = new byte[100];
int n;
while ((n = inputStream.read(buffer, 0, buffer.length)) != -1) {
byteArrayOutputStream.write(buffer, 0, n);
}
assertArrayEquals(bytes, byteArrayOutputStream.toByteArray());
}
@Test(expected = NullPointerException.class)
public void testCompressorSetInputNullPointerException() {
ZStandardCompressor compressor = new ZStandardCompressor();
compressor.setInput(null, 0, 10);
}
//test on NullPointerException in {@code decompressor.setInput()}
@Test(expected = NullPointerException.class)
public void testDecompressorSetInputNullPointerException() {
ZStandardDecompressor decompressor =
new ZStandardDecompressor(IO_FILE_BUFFER_SIZE_DEFAULT);
decompressor.setInput(null, 0, 10);
}
//test on ArrayIndexOutOfBoundsException in {@code compressor.setInput()}
@Test(expected = ArrayIndexOutOfBoundsException.class)
public void testCompressorSetInputAIOBException() {
ZStandardCompressor compressor = new ZStandardCompressor();
compressor.setInput(new byte[] {}, -5, 10);
}
//test on ArrayIndexOutOfBoundsException in {@code decompressor.setInput()}
@Test(expected = ArrayIndexOutOfBoundsException.class)
public void testDecompressorSetInputAIOBException() {
ZStandardDecompressor decompressor =
new ZStandardDecompressor(IO_FILE_BUFFER_SIZE_DEFAULT);
decompressor.setInput(new byte[] {}, -5, 10);
}
//test on NullPointerException in {@code compressor.compress()}
@Test(expected = NullPointerException.class)
public void testCompressorCompressNullPointerException() throws Exception {
ZStandardCompressor compressor = new ZStandardCompressor();
byte[] bytes = generate(1024 * 6);
compressor.setInput(bytes, 0, bytes.length);
compressor.compress(null, 0, 0);
}
//test on NullPointerException in {@code decompressor.decompress()}
@Test(expected = NullPointerException.class)
public void testDecompressorDecompressNullPointerException() throws Exception {
ZStandardDecompressor decompressor =
new ZStandardDecompressor(IO_FILE_BUFFER_SIZE_DEFAULT);
byte[] bytes = generate(1024 * 6);
decompressor.setInput(bytes, 0, bytes.length);
decompressor.decompress(null, 0, 0);
}
//test on ArrayIndexOutOfBoundsException in {@code compressor.compress()}
@Test(expected = ArrayIndexOutOfBoundsException.class)
public void testCompressorCompressAIOBException() throws Exception {
ZStandardCompressor compressor = new ZStandardCompressor();
byte[] bytes = generate(1024 * 6);
compressor.setInput(bytes, 0, bytes.length);
compressor.compress(new byte[] {}, 0, -1);
}
//test on ArrayIndexOutOfBoundsException in decompressor.decompress()
@Test(expected = ArrayIndexOutOfBoundsException.class)
public void testDecompressorDecompressAIOBException() throws Exception {
ZStandardDecompressor decompressor =
new ZStandardDecompressor(IO_FILE_BUFFER_SIZE_DEFAULT);
byte[] bytes = generate(1024 * 6);
decompressor.setInput(bytes, 0, bytes.length);
decompressor.decompress(new byte[] {}, 0, -1);
}
// test compressor.compress() with input larger than the default buffer size
@Test
public void testSetInputWithBytesSizeMoreThanDefaultZStandardBufferSize()
throws Exception {
int bytesSize = 1024 * 2056 + 1;
ZStandardCompressor compressor = new ZStandardCompressor();
byte[] bytes = generate(bytesSize);
assertTrue("needsInput error !!!", compressor.needsInput());
compressor.setInput(bytes, 0, bytes.length);
byte[] emptyBytes = new byte[bytesSize];
int cSize = compressor.compress(emptyBytes, 0, bytes.length);
assertTrue(cSize > 0);
}
// test compress/decompress process through
// CompressionOutputStream/CompressionInputStream api
@Test
public void testCompressorDecompressorLogicWithCompressionStreams()
throws Exception {
DataOutputStream deflateOut = null;
DataInputStream inflateIn = null;
int byteSize = 1024 * 100;
byte[] bytes = generate(byteSize);
int bufferSize = IO_FILE_BUFFER_SIZE_DEFAULT;
try {
DataOutputBuffer compressedDataBuffer = new DataOutputBuffer();
CompressionOutputStream deflateFilter =
new CompressorStream(compressedDataBuffer, new ZStandardCompressor(),
bufferSize);
deflateOut =
new DataOutputStream(new BufferedOutputStream(deflateFilter));
deflateOut.write(bytes, 0, bytes.length);
deflateOut.flush();
deflateFilter.finish();
DataInputBuffer deCompressedDataBuffer = new DataInputBuffer();
deCompressedDataBuffer.reset(compressedDataBuffer.getData(), 0,
compressedDataBuffer.getLength());
CompressionInputStream inflateFilter =
new DecompressorStream(deCompressedDataBuffer,
new ZStandardDecompressor(bufferSize), bufferSize);
inflateIn = new DataInputStream(new BufferedInputStream(inflateFilter));
byte[] result = new byte[byteSize];
inflateIn.readFully(result);
assertArrayEquals("original array does not match the decompressed result",
    bytes, result);
} finally {
IOUtils.closeQuietly(deflateOut);
IOUtils.closeQuietly(inflateIn);
}
}
@Test
public void testZStandardCompressDecompressInMultiThreads() throws Exception {
MultithreadedTestUtil.TestContext ctx =
new MultithreadedTestUtil.TestContext();
for (int i = 0; i < 10; i++) {
ctx.addThread(new MultithreadedTestUtil.TestingThread(ctx) {
@Override
public void doWork() throws Exception {
testCompressDecompress();
}
});
}
ctx.startThreads();
ctx.waitFor(60000);
}
@Test
public void testCompressDecompress() throws Exception {
int rawDataSize = IO_FILE_BUFFER_SIZE_DEFAULT;
byte[] rawData = generate(rawDataSize);
ZStandardCompressor compressor = new ZStandardCompressor();
ZStandardDecompressor decompressor =
new ZStandardDecompressor(IO_FILE_BUFFER_SIZE_DEFAULT);
assertFalse(compressor.finished());
compressor.setInput(rawData, 0, rawData.length);
assertEquals(0, compressor.getBytesRead());
compressor.finish();
byte[] compressedResult = new byte[rawDataSize];
int cSize = compressor.compress(compressedResult, 0, rawDataSize);
assertEquals(rawDataSize, compressor.getBytesRead());
assertTrue(cSize < rawDataSize);
decompressor.setInput(compressedResult, 0, cSize);
byte[] decompressedBytes = new byte[rawDataSize];
decompressor.decompress(decompressedBytes, 0, decompressedBytes.length);
assertEquals(bytesToHex(rawData), bytesToHex(decompressedBytes));
compressor.reset();
decompressor.reset();
}
@Test
public void testCompressingWithOneByteOutputBuffer() throws Exception {
int uncompressedSize = (int) FileUtils.sizeOf(uncompressedFile);
byte[] bytes = FileUtils.readFileToByteArray(uncompressedFile);
assertEquals(uncompressedSize, bytes.length);
Configuration conf = new Configuration();
ZStandardCodec codec = new ZStandardCodec();
codec.setConf(conf);
ByteArrayOutputStream baos = new ByteArrayOutputStream();
Compressor compressor =
new ZStandardCompressor(3, IO_FILE_BUFFER_SIZE_DEFAULT, 1);
CompressionOutputStream outputStream =
codec.createOutputStream(baos, compressor);
for (byte aByte : bytes) {
outputStream.write(aByte);
}
outputStream.finish();
outputStream.close();
assertEquals(uncompressedSize, compressor.getBytesRead());
assertTrue(compressor.finished());
// just make sure we can decompress the file
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
Decompressor decompressor = codec.createDecompressor();
CompressionInputStream inputStream =
codec.createInputStream(bais, decompressor);
byte[] buffer = new byte[100];
int n;
while ((n = inputStream.read(buffer, 0, buffer.length)) != -1) {
byteArrayOutputStream.write(buffer, 0, n);
}
assertArrayEquals(bytes, byteArrayOutputStream.toByteArray());
}
@Test
public void testZStandardCompressDecompress() throws Exception {
int rawDataSize = IO_FILE_BUFFER_SIZE_DEFAULT;
byte[] rawData = generate(rawDataSize);
ZStandardCompressor compressor = new ZStandardCompressor();
ZStandardDecompressor decompressor = new ZStandardDecompressor(rawDataSize);
assertTrue(compressor.needsInput());
assertFalse("testZStandardCompressDecompress finished error",
compressor.finished());
compressor.setInput(rawData, 0, rawData.length);
compressor.finish();
byte[] compressedResult = new byte[rawDataSize];
int cSize = compressor.compress(compressedResult, 0, rawDataSize);
assertEquals(rawDataSize, compressor.getBytesRead());
assertTrue("compressed size no less then original size",
cSize < rawDataSize);
decompressor.setInput(compressedResult, 0, cSize);
byte[] decompressedBytes = new byte[rawDataSize];
decompressor.decompress(decompressedBytes, 0, decompressedBytes.length);
String decompressed = bytesToHex(decompressedBytes);
String original = bytesToHex(rawData);
assertEquals(original, decompressed);
compressor.reset();
decompressor.reset();
}
@Test
public void testDecompressingOutput() throws Exception {
byte[] expectedDecompressedResult =
FileUtils.readFileToByteArray(uncompressedFile);
ZStandardCodec codec = new ZStandardCodec();
codec.setConf(CONFIGURATION);
CompressionInputStream inputStream = codec
.createInputStream(FileUtils.openInputStream(compressedFile),
codec.createDecompressor());
byte[] toDecompress = new byte[100];
ByteArrayOutputStream baos = new ByteArrayOutputStream();
byte[] decompressedResult;
int totalFileSize = 0;
int result;
try {
  while ((result = inputStream.read(toDecompress, 0, toDecompress.length)) != -1) {
baos.write(toDecompress, 0, result);
totalFileSize += result;
}
decompressedResult = baos.toByteArray();
} finally {
IOUtils.closeQuietly(baos);
}
assertEquals(decompressedResult.length, totalFileSize);
assertEquals(bytesToHex(expectedDecompressedResult),
bytesToHex(decompressedResult));
}
@Test
public void testZStandardDirectCompressDecompress() throws Exception {
int[] size = {1, 4, 16, 4 * 1024, 64 * 1024, 128 * 1024, 1024 * 1024 };
for (int aSize : size) {
System.out.println("aSize = " + aSize);
compressDecompressLoop(aSize);
}
}
private void compressDecompressLoop(int rawDataSize) throws IOException {
byte[] rawData = generate(rawDataSize);
ByteArrayOutputStream baos = new ByteArrayOutputStream(rawDataSize + 12);
CompressionOutputStream deflateFilter =
new CompressorStream(baos, new ZStandardCompressor(), 4096);
DataOutputStream deflateOut =
new DataOutputStream(new BufferedOutputStream(deflateFilter));
deflateOut.write(rawData, 0, rawData.length);
deflateOut.flush();
deflateFilter.finish();
byte[] compressedResult = baos.toByteArray();
int compressedSize = compressedResult.length;
ZStandardDecompressor.ZStandardDirectDecompressor decompressor =
new ZStandardDecompressor.ZStandardDirectDecompressor(4096);
ByteBuffer inBuf = ByteBuffer.allocateDirect(compressedSize);
ByteBuffer outBuf = ByteBuffer.allocateDirect(8096);
inBuf.put(compressedResult, 0, compressedSize);
inBuf.flip();
ByteBuffer expected = ByteBuffer.wrap(rawData);
outBuf.clear();
while (!decompressor.finished()) {
decompressor.decompress(inBuf, outBuf);
if (outBuf.remaining() == 0) {
outBuf.flip();
while (outBuf.remaining() > 0) {
assertEquals(expected.get(), outBuf.get());
}
outBuf.clear();
}
}
outBuf.flip();
while (outBuf.remaining() > 0) {
assertEquals(expected.get(), outBuf.get());
}
outBuf.clear();
assertEquals(0, expected.remaining());
}
@Test
public void testReadingWithAStream() throws Exception {
FileInputStream inputStream = FileUtils.openInputStream(compressedFile);
ZStandardCodec codec = new ZStandardCodec();
codec.setConf(CONFIGURATION);
Decompressor decompressor = codec.createDecompressor();
CompressionInputStream cis =
codec.createInputStream(inputStream, decompressor);
ByteArrayOutputStream baos = new ByteArrayOutputStream();
byte[] resultOfDecompression;
try {
byte[] buffer = new byte[100];
int n;
while ((n = cis.read(buffer, 0, buffer.length)) != -1) {
baos.write(buffer, 0, n);
}
resultOfDecompression = baos.toByteArray();
} finally {
IOUtils.closeQuietly(baos);
IOUtils.closeQuietly(cis);
}
byte[] expected = FileUtils.readFileToByteArray(uncompressedFile);
assertEquals(bytesToHex(expected), bytesToHex(resultOfDecompression));
}
@Test
public void testDecompressReturnsWhenNothingToDecompress() throws Exception {
ZStandardDecompressor decompressor =
new ZStandardDecompressor(IO_FILE_BUFFER_SIZE_DEFAULT);
int result = decompressor.decompress(new byte[10], 0, 10);
assertEquals(0, result);
}
public static byte[] generate(int size) {
byte[] data = new byte[size];
for (int i = 0; i < size; i++) {
data[i] = (byte) RANDOM.nextInt(16);
}
return data;
}
private static String bytesToHex(byte[] bytes) {
char[] hexChars = new char[bytes.length * 2];
for (int j = 0; j < bytes.length; j++) {
int v = bytes[j] & 0xFF;
hexChars[j * 2] = HEX_ARRAY[v >>> 4];
hexChars[j * 2 + 1] = HEX_ARRAY[v & 0x0F];
}
return new String(hexChars);
}
}

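Distilled from the stream tests above, a minimal round trip through the public codec API might look like the following sketch (class name hypothetical, buffer size arbitrary):

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.CompressionInputStream;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.io.compress.ZStandardCodec;

public class ZstdRoundTrip {
  public static byte[] roundTrip(byte[] data) throws Exception {
    ZStandardCodec codec = new ZStandardCodec();
    codec.setConf(new Configuration());
    // Compress into memory through the codec's output stream.
    ByteArrayOutputStream compressed = new ByteArrayOutputStream();
    CompressionOutputStream out = codec.createOutputStream(compressed);
    out.write(data);
    out.finish();
    out.close();
    // Decompress through the codec's input stream.
    CompressionInputStream in = codec.createInputStream(
        new ByteArrayInputStream(compressed.toByteArray()));
    ByteArrayOutputStream decompressed = new ByteArrayOutputStream();
    byte[] buf = new byte[4096];
    int n;
    while ((n = in.read(buf)) != -1) {
      decompressed.write(buf, 0, n);
    }
    in.close();
    return decompressed.toByteArray();
  }
}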
View File

@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
Apache License
Version 2.0, January 2004
http://www.apache.org/licenses/
TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
1. Definitions.
"License" shall mean the terms and conditions for use, reproduction, and distribution as defined by Sections 1 through 9 of this document.
"Licensor" shall mean the copyright owner or entity authorized by the copyright owner that is granting the License.
"Legal Entity" shall mean the union of the acting entity and all other entities that control, are controlled by, or are under common control with that entity. For the purposes of this definition, "control" means (i) the power, direct or indirect, to cause the direction or management of such entity, whether by contract or otherwise, or (ii) ownership of fifty percent (50%) or more of the outstanding shares, or (iii) beneficial ownership of such entity.
"You" (or "Your") shall mean an individual or Legal Entity exercising permissions granted by this License.
"Source" form shall mean the preferred form for making modifications, including but not limited to software source code, documentation source, and configuration files.
"Object" form shall mean any form resulting from mechanical transformation or translation of a Source form, including but not limited to compiled object code, generated documentation, and conversions to other media types.
"Work" shall mean the work of authorship, whether in Source or Object form, made available under the License, as indicated by a copyright notice that is included in or attached to the work (an example is provided in the Appendix below).
"Derivative Works" shall mean any work, whether in Source or Object form, that is based on (or derived from) the Work and for which the editorial revisions, annotations, elaborations, or other modifications represent, as a whole, an original work of authorship. For the purposes of this License, Derivative Works shall not include works that remain separable from, or merely link (or bind by name) to the interfaces of, the Work and Derivative Works thereof.
"Contribution" shall mean any work of authorship, including the original version of the Work and any modifications or additions to that Work or Derivative Works thereof, that is intentionally submitted to Licensor for inclusion in the Work by the copyright owner or by an individual or Legal Entity authorized to submit on behalf of the copyright owner. For the purposes of this definition, "submitted" means any form of electronic, verbal, or written communication sent to the Licensor or its representatives, including but not limited to communication on electronic mailing lists, source code control systems, and issue tracking systems that are managed by, or on behalf of, the Licensor for the purpose of discussing and improving the Work, but excluding communication that is conspicuously marked or otherwise designated in writing by the copyright owner as "Not a Contribution."
"Contributor" shall mean Licensor and any individual or Legal Entity on behalf of whom a Contribution has been received by Licensor and subsequently incorporated within the Work.
2. Grant of Copyright License. Subject to the terms and conditions of this License, each Contributor hereby grants to You a perpetual, worldwide, non-exclusive, no-charge, royalty-free, irrevocable copyright license to reproduce, prepare Derivative Works of, publicly display, publicly perform, sublicense, and distribute the Work and such Derivative Works in Source or Object form.
3. Grant of Patent License. Subject to the terms and conditions of this License, each Contributor hereby grants to You a perpetual, worldwide, non-exclusive, no-charge, royalty-free, irrevocable (except as stated in this section) patent license to make, have made, use, offer to sell, sell, import, and otherwise transfer the Work, where such license applies only to those patent claims licensable by such Contributor that are necessarily infringed by their Contribution(s) alone or by combination of their Contribution(s) with the Work to which such Contribution(s) was submitted. If You institute patent litigation against any entity (including a cross-claim or counterclaim in a lawsuit) alleging that the Work or a Contribution incorporated within the Work constitutes direct or contributory patent infringement, then any patent licenses granted to You under this License for that Work shall terminate as of the date such litigation is filed.
4. Redistribution. You may reproduce and distribute copies of the Work or Derivative Works thereof in any medium, with or without modifications, and in Source or Object form, provided that You meet the following conditions:
You must give any other recipients of the Work or Derivative Works a copy of this License; and
You must cause any modified files to carry prominent notices stating that You changed the files; and
You must retain, in the Source form of any Derivative Works that You distribute, all copyright, patent, trademark, and attribution notices from the Source form of the Work, excluding those notices that do not pertain to any part of the Derivative Works; and
If the Work includes a "NOTICE" text file as part of its distribution, then any Derivative Works that You distribute must include a readable copy of the attribution notices contained within such NOTICE file, excluding those notices that do not pertain to any part of the Derivative Works, in at least one of the following places: within a NOTICE text file distributed as part of the Derivative Works; within the Source form or documentation, if provided along with the Derivative Works; or, within a display generated by the Derivative Works, if and wherever such third-party notices normally appear. The contents of the NOTICE file are for informational purposes only and do not modify the License. You may add Your own attribution notices within Derivative Works that You distribute, alongside or as an addendum to the NOTICE text from the Work, provided that such additional attribution notices cannot be construed as modifying the License.
You may add Your own copyright statement to Your modifications and may provide additional or different license terms and conditions for use, reproduction, or distribution of Your modifications, or for any such Derivative Works as a whole, provided Your use, reproduction, and distribution of the Work otherwise complies with the conditions stated in this License.
5. Submission of Contributions. Unless You explicitly state otherwise, any Contribution intentionally submitted for inclusion in the Work by You to the Licensor shall be under the terms and conditions of this License, without any additional terms or conditions. Notwithstanding the above, nothing herein shall supersede or modify the terms of any separate license agreement you may have executed with Licensor regarding such Contributions.
6. Trademarks. This License does not grant permission to use the trade names, trademarks, service marks, or product names of the Licensor, except as required for reasonable and customary use in describing the origin of the Work and reproducing the content of the NOTICE file.
7. Disclaimer of Warranty. Unless required by applicable law or agreed to in writing, Licensor provides the Work (and each Contributor provides its Contributions) on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied, including, without limitation, any warranties or conditions of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A PARTICULAR PURPOSE. You are solely responsible for determining the appropriateness of using or redistributing the Work and assume any risks associated with Your exercise of permissions under this License.
8. Limitation of Liability. In no event and under no legal theory, whether in tort (including negligence), contract, or otherwise, unless required by applicable law (such as deliberate and grossly negligent acts) or agreed to in writing, shall any Contributor be liable to You for damages, including any direct, indirect, special, incidental, or consequential damages of any character arising as a result of this License or out of the use or inability to use the Work (including but not limited to damages for loss of goodwill, work stoppage, computer failure or malfunction, or any and all other commercial damages or losses), even if such Contributor has been advised of the possibility of such damages.
9. Accepting Warranty or Additional Liability. While redistributing the Work or Derivative Works thereof, You may choose to offer, and charge a fee for, acceptance of support, warranty, indemnity, or other liability obligations and/or rights consistent with this License. However, in accepting such obligations, You may act only on Your own behalf and on Your sole responsibility, not on behalf of any other Contributor, and only if You agree to indemnify, defend, and hold each Contributor harmless for any liability incurred by, or claims asserted against, such Contributor by reason of your accepting any such warranty or additional liability.
END OF TERMS AND CONDITIONS

View File

@ -42,6 +42,9 @@
<snappy.lib></snappy.lib>
<bundle.snappy>false</bundle.snappy>
<bundle.snappy.in.bin>false</bundle.snappy.in.bin>
<zstd.lib></zstd.lib>
<bundle.zstd>false</bundle.zstd>
<bundle.zstd.in.bin>false</bundle.zstd.in.bin>
<openssl.lib></openssl.lib>
<bundle.openssl>false</bundle.openssl>
<bundle.openssl.in.bin>false</bundle.openssl.in.bin>
@ -361,6 +364,9 @@
<argument>--snappybinbundle=${bundle.snappy.in.bin}</argument>
<argument>--snappylib=${snappy.lib}</argument>
<argument>--snappylibbundle=${bundle.snappy}</argument>
<argument>--zstdbinbundle=${bundle.zstd.in.bin}</argument>
<argument>--zstdlib=${zstd.lib}</argument>
<argument>--zstdlibbundle=${bundle.zstd}</argument>
</arguments>
</configuration>
</execution>

View File

@ -1341,6 +1341,7 @@
<!-- attempt to open a file at this path. -->
<java.security.egd>file:/dev/urandom</java.security.egd>
<bundle.snappy.in.bin>true</bundle.snappy.in.bin>
<bundle.zstd.in.bin>true</bundle.zstd.in.bin>
<bundle.openssl.in.bin>true</bundle.openssl.in.bin>
</properties>
<build>
@ -1352,6 +1353,7 @@
<environmentVariables>
<!-- Specify where to look for the native DLL on Windows -->
<PATH>${env.PATH};${hadoop.common.build.dir}/bin;${snappy.lib};${zstd.lib};${openssl.lib}</PATH>
</environmentVariables>
</configuration>
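For reference, one plausible native build invocation that exercises the new properties defined above (the library path is illustrative) would be: mvn package -Pdist,native -DskipTests -Dtar -Dbundle.zstd -Dzstd.lib=/usr/local/lib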