diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zstd/ZStandardCompressor.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zstd/ZStandardCompressor.java index 960083519dd..8b1b6db0860 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zstd/ZStandardCompressor.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zstd/ZStandardCompressor.java @@ -159,7 +159,7 @@ public class ZStandardCompressor implements Compressor { } // have we consumed all input - if (keepUncompressedBuf && uncompressedDirectBufLen > 0) { + if (keepUncompressedBuf && uncompressedDirectBufLen - uncompressedDirectBufOff > 0) { return false; } @@ -223,7 +223,7 @@ public class ZStandardCompressor implements Compressor { compressedDirectBuf.limit(n); // Check if we have consumed all input buffer - if (uncompressedDirectBufLen <= 0) { + if (uncompressedDirectBufLen - uncompressedDirectBufOff <= 0) { // consumed all input buffer keepUncompressedBuf = false; uncompressedDirectBuf.clear(); diff --git a/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/zstd/ZStandardCompressor.c b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/zstd/ZStandardCompressor.c index 41eb9e2c85a..6581f292b4a 100644 --- a/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/zstd/ZStandardCompressor.c +++ b/hadoop-common-project/hadoop-common/src/main/native/src/org/apache/hadoop/io/compress/zstd/ZStandardCompressor.c @@ -219,13 +219,13 @@ JNIEXPORT jint Java_org_apache_hadoop_io_compress_zstd_ZStandardCompressor_defla return (jint) 0; } - bytes_read += input.pos; + bytes_read += input.pos - uncompressed_direct_buf_off; bytes_written += output.pos; (*env)->SetLongField(env, this, ZStandardCompressor_bytesRead, bytes_read); (*env)->SetLongField(env, this, ZStandardCompressor_bytesWritten, bytes_written); (*env)->SetIntField(env, this, ZStandardCompressor_uncompressedDirectBufOff, input.pos); - (*env)->SetIntField(env, this, ZStandardCompressor_uncompressedDirectBufLen, input.size - input.pos); + (*env)->SetIntField(env, this, ZStandardCompressor_uncompressedDirectBufLen, input.size); return (jint) output.pos; }