HDFS-14099. Unknown frame descriptor when decompressing multiple frames (#3836)
Co-authored-by: xuzq <xuzengqiang@kuaishou.com>
Co-authored-by: Ashutosh Gupta <ashugpt@amazon.com>
Signed-off-by: Akira Ajisaka <aajisaka@apache.org>
(cherry picked from commit caab29ec88
)
This commit is contained in:
parent
84a767ea24
commit
8aac1c7dc9
|
@ -113,6 +113,12 @@ public class ZStandardDecompressor implements Decompressor {
|
||||||
compressedDirectBuf.put(
|
compressedDirectBuf.put(
|
||||||
userBuf, userBufOff, bytesInCompressedBuffer);
|
userBuf, userBufOff, bytesInCompressedBuffer);
|
||||||
|
|
||||||
|
// Set the finished to false when compressedDirectBuf still
|
||||||
|
// contains some bytes.
|
||||||
|
if (compressedDirectBuf.position() > 0 && finished) {
|
||||||
|
finished = false;
|
||||||
|
}
|
||||||
|
|
||||||
userBufOff += bytesInCompressedBuffer;
|
userBufOff += bytesInCompressedBuffer;
|
||||||
userBufferBytesToConsume -= bytesInCompressedBuffer;
|
userBufferBytesToConsume -= bytesInCompressedBuffer;
|
||||||
}
|
}
|
||||||
|
@ -186,6 +192,13 @@ public class ZStandardDecompressor implements Decompressor {
|
||||||
0,
|
0,
|
||||||
directBufferSize
|
directBufferSize
|
||||||
);
|
);
|
||||||
|
|
||||||
|
// Set the finished to false when compressedDirectBuf still
|
||||||
|
// contains some bytes.
|
||||||
|
if (remaining > 0 && finished) {
|
||||||
|
finished = false;
|
||||||
|
}
|
||||||
|
|
||||||
uncompressedDirectBuf.limit(n);
|
uncompressedDirectBuf.limit(n);
|
||||||
|
|
||||||
// Get at most 'len' bytes
|
// Get at most 'len' bytes
|
||||||
|
|
|
@ -234,6 +234,65 @@ public class TestZStandardCompressorDecompressor {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Verify decompressor logic with some finish operation in compress.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testCompressorDecompressorWithFinish() throws Exception {
|
||||||
|
DataOutputStream deflateOut = null;
|
||||||
|
DataInputStream inflateIn = null;
|
||||||
|
int byteSize = 1024 * 100;
|
||||||
|
byte[] bytes = generate(byteSize);
|
||||||
|
int firstLength = 1024 * 30;
|
||||||
|
|
||||||
|
int bufferSize = IO_FILE_BUFFER_SIZE_DEFAULT;
|
||||||
|
try {
|
||||||
|
DataOutputBuffer compressedDataBuffer = new DataOutputBuffer();
|
||||||
|
CompressionOutputStream deflateFilter =
|
||||||
|
new CompressorStream(compressedDataBuffer, new ZStandardCompressor(),
|
||||||
|
bufferSize);
|
||||||
|
|
||||||
|
deflateOut =
|
||||||
|
new DataOutputStream(new BufferedOutputStream(deflateFilter));
|
||||||
|
|
||||||
|
// Write some data and finish.
|
||||||
|
deflateOut.write(bytes, 0, firstLength);
|
||||||
|
deflateFilter.finish();
|
||||||
|
deflateOut.flush();
|
||||||
|
|
||||||
|
// ResetState then write some data and finish.
|
||||||
|
deflateFilter.resetState();
|
||||||
|
deflateOut.write(bytes, firstLength, firstLength);
|
||||||
|
deflateFilter.finish();
|
||||||
|
deflateOut.flush();
|
||||||
|
|
||||||
|
// ResetState then write some data and finish.
|
||||||
|
deflateFilter.resetState();
|
||||||
|
deflateOut.write(bytes, firstLength * 2, byteSize - firstLength * 2);
|
||||||
|
deflateFilter.finish();
|
||||||
|
deflateOut.flush();
|
||||||
|
|
||||||
|
DataInputBuffer deCompressedDataBuffer = new DataInputBuffer();
|
||||||
|
deCompressedDataBuffer.reset(compressedDataBuffer.getData(), 0,
|
||||||
|
compressedDataBuffer.getLength());
|
||||||
|
|
||||||
|
CompressionInputStream inflateFilter =
|
||||||
|
new DecompressorStream(deCompressedDataBuffer,
|
||||||
|
new ZStandardDecompressor(bufferSize), bufferSize);
|
||||||
|
|
||||||
|
inflateIn = new DataInputStream(new BufferedInputStream(inflateFilter));
|
||||||
|
|
||||||
|
byte[] result = new byte[byteSize];
|
||||||
|
inflateIn.read(result);
|
||||||
|
assertArrayEquals(
|
||||||
|
"original array not equals compress/decompressed array", bytes,
|
||||||
|
result);
|
||||||
|
} finally {
|
||||||
|
IOUtils.closeStream(deflateOut);
|
||||||
|
IOUtils.closeStream(inflateIn);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testZStandardCompressDecompressInMultiThreads() throws Exception {
|
public void testZStandardCompressDecompressInMultiThreads() throws Exception {
|
||||||
MultithreadedTestUtil.TestContext ctx =
|
MultithreadedTestUtil.TestContext ctx =
|
||||||
|
|
Loading…
Reference in New Issue