HADOOP-14872. CryptoInputStream should implement unbuffer. Contributed by John Zhuge.

This commit is contained in:
John Zhuge 2017-11-07 00:09:34 -08:00
parent bf6a660232
commit 6c32ddad30
5 changed files with 133 additions and 9 deletions

View File

@ -30,20 +30,23 @@
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import com.google.common.base.Preconditions;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.ByteBufferReadable;
import org.apache.hadoop.fs.CanSetDropBehind;
import org.apache.hadoop.fs.CanSetReadahead;
import org.apache.hadoop.fs.CanUnbuffer;
import org.apache.hadoop.fs.FSExceptionMessages;
import org.apache.hadoop.fs.HasEnhancedByteBufferAccess;
import org.apache.hadoop.fs.HasFileDescriptor;
import org.apache.hadoop.fs.PositionedReadable;
import org.apache.hadoop.fs.ReadOption;
import org.apache.hadoop.fs.Seekable;
import org.apache.hadoop.fs.StreamCapabilities;
import org.apache.hadoop.fs.StreamCapabilitiesPolicy;
import org.apache.hadoop.io.ByteBufferPool;
import com.google.common.base.Preconditions;
import org.apache.hadoop.util.StringUtils;
/**
* CryptoInputStream decrypts data. It is not thread-safe. AES CTR mode is
@ -61,7 +64,7 @@
public class CryptoInputStream extends FilterInputStream implements
Seekable, PositionedReadable, ByteBufferReadable, HasFileDescriptor,
CanSetDropBehind, CanSetReadahead, HasEnhancedByteBufferAccess,
ReadableByteChannel {
ReadableByteChannel, CanUnbuffer, StreamCapabilities {
private final byte[] oneByteBuf = new byte[1];
private final CryptoCodec codec;
private final Decryptor decryptor;
@ -719,4 +722,27 @@ private void returnDecryptor(Decryptor decryptor) {
public boolean isOpen() {
return !closed;
}
private void cleanDecryptorPool() {
decryptorPool.clear();
}
@Override
public void unbuffer() {
cleanBufferPool();
cleanDecryptorPool();
StreamCapabilitiesPolicy.unbuffer(in);
}
@Override
public boolean hasCapability(String capability) {
switch (StringUtils.toLowerCase(capability)) {
case StreamCapabilities.READAHEAD:
case StreamCapabilities.DROPBEHIND:
case StreamCapabilities.UNBUFFER:
return true;
default:
return false;
}
}
}

View File

@ -27,6 +27,7 @@
import java.util.Random;
import org.apache.hadoop.fs.ByteBufferReadable;
import org.apache.hadoop.fs.CanUnbuffer;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FSExceptionMessages;
import org.apache.hadoop.fs.HasEnhancedByteBufferAccess;
@ -102,7 +103,32 @@ private int readAll(InputStream in, byte[] b, int off, int len)
return total;
}
private int preadAll(PositionedReadable in, byte[] b, int off, int len)
throws IOException {
int n = 0;
int total = 0;
while (n != -1) {
total += n;
if (total >= len) {
break;
}
n = in.read(total, b, off + total, len - total);
}
return total;
}
private void preadCheck(PositionedReadable in) throws Exception {
byte[] result = new byte[dataLen];
int n = preadAll(in, result, 0, dataLen);
Assert.assertEquals(dataLen, n);
byte[] expectedData = new byte[n];
System.arraycopy(data, 0, expectedData, 0, n);
Assert.assertArrayEquals(result, expectedData);
}
protected OutputStream getOutputStream(int bufferSize) throws IOException {
return getOutputStream(bufferSize, key, iv);
}
@ -146,7 +172,6 @@ private void readCheck(InputStream in) throws Exception {
// EOF
n = in.read(result, 0, dataLen);
Assert.assertEquals(n, -1);
in.close();
}
/** Test crypto writing with different buffer size. */
@ -730,4 +755,47 @@ public void testHasEnhancedByteBufferAccess() throws Exception {
in.close();
}
/** Test unbuffer. */
@Test(timeout=120000)
public void testUnbuffer() throws Exception {
OutputStream out = getOutputStream(smallBufferSize);
writeData(out);
// Test buffered read
try (InputStream in = getInputStream(smallBufferSize)) {
// Test unbuffer after buffered read
readCheck(in);
((CanUnbuffer) in).unbuffer();
if (in instanceof Seekable) {
// Test buffered read again after unbuffer
// Must seek to the beginning first
((Seekable) in).seek(0);
readCheck(in);
}
// Test close after unbuffer
((CanUnbuffer) in).unbuffer();
// The close will be called when exiting this try-with-resource block
}
// Test pread
try (InputStream in = getInputStream(smallBufferSize)) {
if (in instanceof PositionedReadable) {
PositionedReadable pin = (PositionedReadable) in;
// Test unbuffer after pread
preadCheck(pin);
((CanUnbuffer) in).unbuffer();
// Test pread again after unbuffer
preadCheck(pin);
// Test close after unbuffer
((CanUnbuffer) in).unbuffer();
// The close will be called when exiting this try-with-resource block
}
}
}
}

View File

@ -29,11 +29,13 @@
import org.apache.hadoop.fs.ByteBufferReadable;
import org.apache.hadoop.fs.CanSetDropBehind;
import org.apache.hadoop.fs.CanSetReadahead;
import org.apache.hadoop.fs.CanUnbuffer;
import org.apache.hadoop.fs.HasEnhancedByteBufferAccess;
import org.apache.hadoop.fs.HasFileDescriptor;
import org.apache.hadoop.fs.PositionedReadable;
import org.apache.hadoop.fs.ReadOption;
import org.apache.hadoop.fs.Seekable;
import org.apache.hadoop.fs.StreamCapabilities;
import org.apache.hadoop.fs.Syncable;
import org.apache.hadoop.io.ByteBufferPool;
import org.apache.hadoop.io.DataInputBuffer;
@ -159,16 +161,18 @@ private void checkStream() throws IOException {
}
}
public static class FakeInputStream extends InputStream implements
Seekable, PositionedReadable, ByteBufferReadable, HasFileDescriptor,
CanSetDropBehind, CanSetReadahead, HasEnhancedByteBufferAccess {
static class FakeInputStream extends InputStream
implements Seekable, PositionedReadable, ByteBufferReadable,
HasFileDescriptor, CanSetDropBehind, CanSetReadahead,
HasEnhancedByteBufferAccess, CanUnbuffer,
StreamCapabilities {
private final byte[] oneByteBuf = new byte[1];
private int pos = 0;
private final byte[] data;
private final int length;
private boolean closed = false;
public FakeInputStream(DataInputBuffer in) {
FakeInputStream(DataInputBuffer in) {
data = in.getData();
length = in.getLength();
}
@ -349,6 +353,22 @@ public void setDropBehind(Boolean dropCache) throws IOException,
UnsupportedOperationException {
}
@Override
public void unbuffer() {
}
@Override
public boolean hasCapability(String capability) {
switch (capability.toLowerCase()) {
case StreamCapabilities.READAHEAD:
case StreamCapabilities.DROPBEHIND:
case StreamCapabilities.UNBUFFER:
return true;
default:
return false;
}
}
@Override
public FileDescriptor getFileDescriptor() throws IOException {
return null;

View File

@ -112,4 +112,9 @@ public void testHasEnhancedByteBufferAccess() throws Exception {
@Test(timeout=10000)
public void testSeekToNewSource() throws Exception {
}
@Ignore("Local file input stream does not support unbuffer")
@Override
@Test
public void testUnbuffer() throws Exception {}
}

View File

@ -120,4 +120,9 @@ public void testSeekToNewSource() throws IOException {}
@Override
@Test(timeout=10000)
public void testHasEnhancedByteBufferAccess() throws IOException {}
@Ignore("ByteArrayInputStream does not support unbuffer")
@Override
@Test
public void testUnbuffer() throws Exception {}
}