diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CryptoInputStream.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CryptoInputStream.java index 0be6e349e0d..a2273bf8334 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CryptoInputStream.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CryptoInputStream.java @@ -30,20 +30,23 @@ import java.util.EnumSet; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; +import com.google.common.base.Preconditions; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.fs.ByteBufferReadable; import org.apache.hadoop.fs.CanSetDropBehind; import org.apache.hadoop.fs.CanSetReadahead; +import org.apache.hadoop.fs.CanUnbuffer; import org.apache.hadoop.fs.FSExceptionMessages; import org.apache.hadoop.fs.HasEnhancedByteBufferAccess; import org.apache.hadoop.fs.HasFileDescriptor; import org.apache.hadoop.fs.PositionedReadable; import org.apache.hadoop.fs.ReadOption; import org.apache.hadoop.fs.Seekable; +import org.apache.hadoop.fs.StreamCapabilities; +import org.apache.hadoop.fs.StreamCapabilitiesPolicy; import org.apache.hadoop.io.ByteBufferPool; - -import com.google.common.base.Preconditions; +import org.apache.hadoop.util.StringUtils; /** * CryptoInputStream decrypts data. It is not thread-safe. AES CTR mode is @@ -61,7 +64,7 @@ import com.google.common.base.Preconditions; public class CryptoInputStream extends FilterInputStream implements Seekable, PositionedReadable, ByteBufferReadable, HasFileDescriptor, CanSetDropBehind, CanSetReadahead, HasEnhancedByteBufferAccess, - ReadableByteChannel { + ReadableByteChannel, CanUnbuffer, StreamCapabilities { private final byte[] oneByteBuf = new byte[1]; private final CryptoCodec codec; private final Decryptor decryptor; @@ -719,4 +722,27 @@ public class CryptoInputStream extends FilterInputStream implements public boolean isOpen() { return !closed; } + + private void cleanDecryptorPool() { + decryptorPool.clear(); + } + + @Override + public void unbuffer() { + cleanBufferPool(); + cleanDecryptorPool(); + StreamCapabilitiesPolicy.unbuffer(in); + } + + @Override + public boolean hasCapability(String capability) { + switch (StringUtils.toLowerCase(capability)) { + case StreamCapabilities.READAHEAD: + case StreamCapabilities.DROPBEHIND: + case StreamCapabilities.UNBUFFER: + return true; + default: + return false; + } + } } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/CryptoStreamsTestBase.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/CryptoStreamsTestBase.java index 9183524a1ad..259383dd420 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/CryptoStreamsTestBase.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/CryptoStreamsTestBase.java @@ -27,6 +27,7 @@ import java.util.EnumSet; import java.util.Random; import org.apache.hadoop.fs.ByteBufferReadable; +import org.apache.hadoop.fs.CanUnbuffer; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FSExceptionMessages; import org.apache.hadoop.fs.HasEnhancedByteBufferAccess; @@ -102,7 +103,32 @@ public abstract class CryptoStreamsTestBase { return total; } - + + private int preadAll(PositionedReadable in, byte[] b, int off, int len) + throws IOException { + int n = 0; + int total = 0; + while (n != -1) { + total += n; + if (total >= len) { + break; + } + n = in.read(total, b, off + total, len - total); + } + + return total; + } + + private void preadCheck(PositionedReadable in) throws Exception { + byte[] result = new byte[dataLen]; + int n = preadAll(in, result, 0, dataLen); + + Assert.assertEquals(dataLen, n); + byte[] expectedData = new byte[n]; + System.arraycopy(data, 0, expectedData, 0, n); + Assert.assertArrayEquals(result, expectedData); + } + protected OutputStream getOutputStream(int bufferSize) throws IOException { return getOutputStream(bufferSize, key, iv); } @@ -146,7 +172,6 @@ public abstract class CryptoStreamsTestBase { // EOF n = in.read(result, 0, dataLen); Assert.assertEquals(n, -1); - in.close(); } /** Test crypto writing with different buffer size. */ @@ -730,4 +755,47 @@ public abstract class CryptoStreamsTestBase { in.close(); } + + /** Test unbuffer. */ + @Test(timeout=120000) + public void testUnbuffer() throws Exception { + OutputStream out = getOutputStream(smallBufferSize); + writeData(out); + + // Test buffered read + try (InputStream in = getInputStream(smallBufferSize)) { + // Test unbuffer after buffered read + readCheck(in); + ((CanUnbuffer) in).unbuffer(); + + if (in instanceof Seekable) { + // Test buffered read again after unbuffer + // Must seek to the beginning first + ((Seekable) in).seek(0); + readCheck(in); + } + + // Test close after unbuffer + ((CanUnbuffer) in).unbuffer(); + // The close will be called when exiting this try-with-resource block + } + + // Test pread + try (InputStream in = getInputStream(smallBufferSize)) { + if (in instanceof PositionedReadable) { + PositionedReadable pin = (PositionedReadable) in; + + // Test unbuffer after pread + preadCheck(pin); + ((CanUnbuffer) in).unbuffer(); + + // Test pread again after unbuffer + preadCheck(pin); + + // Test close after unbuffer + ((CanUnbuffer) in).unbuffer(); + // The close will be called when exiting this try-with-resource block + } + } + } } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/TestCryptoStreams.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/TestCryptoStreams.java index 810270b522b..c6c60675cce 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/TestCryptoStreams.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/TestCryptoStreams.java @@ -29,11 +29,13 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.ByteBufferReadable; import org.apache.hadoop.fs.CanSetDropBehind; import org.apache.hadoop.fs.CanSetReadahead; +import org.apache.hadoop.fs.CanUnbuffer; import org.apache.hadoop.fs.HasEnhancedByteBufferAccess; import org.apache.hadoop.fs.HasFileDescriptor; import org.apache.hadoop.fs.PositionedReadable; import org.apache.hadoop.fs.ReadOption; import org.apache.hadoop.fs.Seekable; +import org.apache.hadoop.fs.StreamCapabilities; import org.apache.hadoop.fs.Syncable; import org.apache.hadoop.io.ByteBufferPool; import org.apache.hadoop.io.DataInputBuffer; @@ -164,16 +166,18 @@ public class TestCryptoStreams extends CryptoStreamsTestBase { } } - public static class FakeInputStream extends InputStream implements - Seekable, PositionedReadable, ByteBufferReadable, HasFileDescriptor, - CanSetDropBehind, CanSetReadahead, HasEnhancedByteBufferAccess { + static class FakeInputStream extends InputStream + implements Seekable, PositionedReadable, ByteBufferReadable, + HasFileDescriptor, CanSetDropBehind, CanSetReadahead, + HasEnhancedByteBufferAccess, CanUnbuffer, + StreamCapabilities { private final byte[] oneByteBuf = new byte[1]; private int pos = 0; private final byte[] data; private final int length; private boolean closed = false; - public FakeInputStream(DataInputBuffer in) { + FakeInputStream(DataInputBuffer in) { data = in.getData(); length = in.getLength(); } @@ -354,6 +358,22 @@ public class TestCryptoStreams extends CryptoStreamsTestBase { UnsupportedOperationException { } + @Override + public void unbuffer() { + } + + @Override + public boolean hasCapability(String capability) { + switch (capability.toLowerCase()) { + case StreamCapabilities.READAHEAD: + case StreamCapabilities.DROPBEHIND: + case StreamCapabilities.UNBUFFER: + return true; + default: + return false; + } + } + @Override public FileDescriptor getFileDescriptor() throws IOException { return null; diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/TestCryptoStreamsForLocalFS.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/TestCryptoStreamsForLocalFS.java index 1ef6f3cdcf9..bb3fd7a68d7 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/TestCryptoStreamsForLocalFS.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/TestCryptoStreamsForLocalFS.java @@ -112,4 +112,9 @@ public class TestCryptoStreamsForLocalFS extends CryptoStreamsTestBase { @Test(timeout=10000) public void testSeekToNewSource() throws Exception { } + + @Ignore("Local file input stream does not support unbuffer") + @Override + @Test + public void testUnbuffer() throws Exception {} } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/TestCryptoStreamsNormal.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/TestCryptoStreamsNormal.java index b5382c1efc6..7e300777a37 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/TestCryptoStreamsNormal.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/TestCryptoStreamsNormal.java @@ -120,4 +120,9 @@ public class TestCryptoStreamsNormal extends CryptoStreamsTestBase { @Override @Test(timeout=10000) public void testHasEnhancedByteBufferAccess() throws IOException {} + + @Ignore("ByteArrayInputStream does not support unbuffer") + @Override + @Test + public void testUnbuffer() throws Exception {} }