HADOOP-14872. CryptoInputStream should implement unbuffer. Contributed by John Zhuge.
(cherry picked from commit6c32ddad30
) (cherry picked from commitfc9e156484
) (cherry picked from commit5e0f4f212d
)
This commit is contained in:
parent
141cc8ffa2
commit
a1334ea177
|
@ -30,20 +30,23 @@ import java.util.EnumSet;
|
||||||
import java.util.Queue;
|
import java.util.Queue;
|
||||||
import java.util.concurrent.ConcurrentLinkedQueue;
|
import java.util.concurrent.ConcurrentLinkedQueue;
|
||||||
|
|
||||||
|
import com.google.common.base.Preconditions;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.classification.InterfaceStability;
|
import org.apache.hadoop.classification.InterfaceStability;
|
||||||
import org.apache.hadoop.fs.ByteBufferReadable;
|
import org.apache.hadoop.fs.ByteBufferReadable;
|
||||||
import org.apache.hadoop.fs.CanSetDropBehind;
|
import org.apache.hadoop.fs.CanSetDropBehind;
|
||||||
import org.apache.hadoop.fs.CanSetReadahead;
|
import org.apache.hadoop.fs.CanSetReadahead;
|
||||||
|
import org.apache.hadoop.fs.CanUnbuffer;
|
||||||
import org.apache.hadoop.fs.FSExceptionMessages;
|
import org.apache.hadoop.fs.FSExceptionMessages;
|
||||||
import org.apache.hadoop.fs.HasEnhancedByteBufferAccess;
|
import org.apache.hadoop.fs.HasEnhancedByteBufferAccess;
|
||||||
import org.apache.hadoop.fs.HasFileDescriptor;
|
import org.apache.hadoop.fs.HasFileDescriptor;
|
||||||
import org.apache.hadoop.fs.PositionedReadable;
|
import org.apache.hadoop.fs.PositionedReadable;
|
||||||
import org.apache.hadoop.fs.ReadOption;
|
import org.apache.hadoop.fs.ReadOption;
|
||||||
import org.apache.hadoop.fs.Seekable;
|
import org.apache.hadoop.fs.Seekable;
|
||||||
|
import org.apache.hadoop.fs.StreamCapabilities;
|
||||||
|
import org.apache.hadoop.fs.StreamCapabilitiesPolicy;
|
||||||
import org.apache.hadoop.io.ByteBufferPool;
|
import org.apache.hadoop.io.ByteBufferPool;
|
||||||
|
import org.apache.hadoop.util.StringUtils;
|
||||||
import com.google.common.base.Preconditions;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* CryptoInputStream decrypts data. It is not thread-safe. AES CTR mode is
|
* CryptoInputStream decrypts data. It is not thread-safe. AES CTR mode is
|
||||||
|
@ -61,7 +64,7 @@ import com.google.common.base.Preconditions;
|
||||||
public class CryptoInputStream extends FilterInputStream implements
|
public class CryptoInputStream extends FilterInputStream implements
|
||||||
Seekable, PositionedReadable, ByteBufferReadable, HasFileDescriptor,
|
Seekable, PositionedReadable, ByteBufferReadable, HasFileDescriptor,
|
||||||
CanSetDropBehind, CanSetReadahead, HasEnhancedByteBufferAccess,
|
CanSetDropBehind, CanSetReadahead, HasEnhancedByteBufferAccess,
|
||||||
ReadableByteChannel {
|
ReadableByteChannel, CanUnbuffer, StreamCapabilities {
|
||||||
private final byte[] oneByteBuf = new byte[1];
|
private final byte[] oneByteBuf = new byte[1];
|
||||||
private final CryptoCodec codec;
|
private final CryptoCodec codec;
|
||||||
private final Decryptor decryptor;
|
private final Decryptor decryptor;
|
||||||
|
@ -719,4 +722,27 @@ public class CryptoInputStream extends FilterInputStream implements
|
||||||
public boolean isOpen() {
|
public boolean isOpen() {
|
||||||
return !closed;
|
return !closed;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void cleanDecryptorPool() {
|
||||||
|
decryptorPool.clear();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void unbuffer() {
|
||||||
|
cleanBufferPool();
|
||||||
|
cleanDecryptorPool();
|
||||||
|
StreamCapabilitiesPolicy.unbuffer(in);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean hasCapability(String capability) {
|
||||||
|
switch (StringUtils.toLowerCase(capability)) {
|
||||||
|
case StreamCapabilities.READAHEAD:
|
||||||
|
case StreamCapabilities.DROPBEHIND:
|
||||||
|
case StreamCapabilities.UNBUFFER:
|
||||||
|
return true;
|
||||||
|
default:
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -27,6 +27,7 @@ import java.util.EnumSet;
|
||||||
import java.util.Random;
|
import java.util.Random;
|
||||||
|
|
||||||
import org.apache.hadoop.fs.ByteBufferReadable;
|
import org.apache.hadoop.fs.ByteBufferReadable;
|
||||||
|
import org.apache.hadoop.fs.CanUnbuffer;
|
||||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||||
import org.apache.hadoop.fs.FSExceptionMessages;
|
import org.apache.hadoop.fs.FSExceptionMessages;
|
||||||
import org.apache.hadoop.fs.HasEnhancedByteBufferAccess;
|
import org.apache.hadoop.fs.HasEnhancedByteBufferAccess;
|
||||||
|
@ -103,6 +104,31 @@ public abstract class CryptoStreamsTestBase {
|
||||||
return total;
|
return total;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private int preadAll(PositionedReadable in, byte[] b, int off, int len)
|
||||||
|
throws IOException {
|
||||||
|
int n = 0;
|
||||||
|
int total = 0;
|
||||||
|
while (n != -1) {
|
||||||
|
total += n;
|
||||||
|
if (total >= len) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
n = in.read(total, b, off + total, len - total);
|
||||||
|
}
|
||||||
|
|
||||||
|
return total;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void preadCheck(PositionedReadable in) throws Exception {
|
||||||
|
byte[] result = new byte[dataLen];
|
||||||
|
int n = preadAll(in, result, 0, dataLen);
|
||||||
|
|
||||||
|
Assert.assertEquals(dataLen, n);
|
||||||
|
byte[] expectedData = new byte[n];
|
||||||
|
System.arraycopy(data, 0, expectedData, 0, n);
|
||||||
|
Assert.assertArrayEquals(result, expectedData);
|
||||||
|
}
|
||||||
|
|
||||||
protected OutputStream getOutputStream(int bufferSize) throws IOException {
|
protected OutputStream getOutputStream(int bufferSize) throws IOException {
|
||||||
return getOutputStream(bufferSize, key, iv);
|
return getOutputStream(bufferSize, key, iv);
|
||||||
}
|
}
|
||||||
|
@ -146,7 +172,6 @@ public abstract class CryptoStreamsTestBase {
|
||||||
// EOF
|
// EOF
|
||||||
n = in.read(result, 0, dataLen);
|
n = in.read(result, 0, dataLen);
|
||||||
Assert.assertEquals(n, -1);
|
Assert.assertEquals(n, -1);
|
||||||
in.close();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Test crypto writing with different buffer size. */
|
/** Test crypto writing with different buffer size. */
|
||||||
|
@ -730,4 +755,47 @@ public abstract class CryptoStreamsTestBase {
|
||||||
|
|
||||||
in.close();
|
in.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/** Test unbuffer. */
|
||||||
|
@Test(timeout=120000)
|
||||||
|
public void testUnbuffer() throws Exception {
|
||||||
|
OutputStream out = getOutputStream(smallBufferSize);
|
||||||
|
writeData(out);
|
||||||
|
|
||||||
|
// Test buffered read
|
||||||
|
try (InputStream in = getInputStream(smallBufferSize)) {
|
||||||
|
// Test unbuffer after buffered read
|
||||||
|
readCheck(in);
|
||||||
|
((CanUnbuffer) in).unbuffer();
|
||||||
|
|
||||||
|
if (in instanceof Seekable) {
|
||||||
|
// Test buffered read again after unbuffer
|
||||||
|
// Must seek to the beginning first
|
||||||
|
((Seekable) in).seek(0);
|
||||||
|
readCheck(in);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Test close after unbuffer
|
||||||
|
((CanUnbuffer) in).unbuffer();
|
||||||
|
// The close will be called when exiting this try-with-resource block
|
||||||
|
}
|
||||||
|
|
||||||
|
// Test pread
|
||||||
|
try (InputStream in = getInputStream(smallBufferSize)) {
|
||||||
|
if (in instanceof PositionedReadable) {
|
||||||
|
PositionedReadable pin = (PositionedReadable) in;
|
||||||
|
|
||||||
|
// Test unbuffer after pread
|
||||||
|
preadCheck(pin);
|
||||||
|
((CanUnbuffer) in).unbuffer();
|
||||||
|
|
||||||
|
// Test pread again after unbuffer
|
||||||
|
preadCheck(pin);
|
||||||
|
|
||||||
|
// Test close after unbuffer
|
||||||
|
((CanUnbuffer) in).unbuffer();
|
||||||
|
// The close will be called when exiting this try-with-resource block
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -29,11 +29,13 @@ import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.ByteBufferReadable;
|
import org.apache.hadoop.fs.ByteBufferReadable;
|
||||||
import org.apache.hadoop.fs.CanSetDropBehind;
|
import org.apache.hadoop.fs.CanSetDropBehind;
|
||||||
import org.apache.hadoop.fs.CanSetReadahead;
|
import org.apache.hadoop.fs.CanSetReadahead;
|
||||||
|
import org.apache.hadoop.fs.CanUnbuffer;
|
||||||
import org.apache.hadoop.fs.HasEnhancedByteBufferAccess;
|
import org.apache.hadoop.fs.HasEnhancedByteBufferAccess;
|
||||||
import org.apache.hadoop.fs.HasFileDescriptor;
|
import org.apache.hadoop.fs.HasFileDescriptor;
|
||||||
import org.apache.hadoop.fs.PositionedReadable;
|
import org.apache.hadoop.fs.PositionedReadable;
|
||||||
import org.apache.hadoop.fs.ReadOption;
|
import org.apache.hadoop.fs.ReadOption;
|
||||||
import org.apache.hadoop.fs.Seekable;
|
import org.apache.hadoop.fs.Seekable;
|
||||||
|
import org.apache.hadoop.fs.StreamCapabilities;
|
||||||
import org.apache.hadoop.fs.Syncable;
|
import org.apache.hadoop.fs.Syncable;
|
||||||
import org.apache.hadoop.io.ByteBufferPool;
|
import org.apache.hadoop.io.ByteBufferPool;
|
||||||
import org.apache.hadoop.io.DataInputBuffer;
|
import org.apache.hadoop.io.DataInputBuffer;
|
||||||
|
@ -164,16 +166,18 @@ public class TestCryptoStreams extends CryptoStreamsTestBase {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public static class FakeInputStream extends InputStream implements
|
static class FakeInputStream extends InputStream
|
||||||
Seekable, PositionedReadable, ByteBufferReadable, HasFileDescriptor,
|
implements Seekable, PositionedReadable, ByteBufferReadable,
|
||||||
CanSetDropBehind, CanSetReadahead, HasEnhancedByteBufferAccess {
|
HasFileDescriptor, CanSetDropBehind, CanSetReadahead,
|
||||||
|
HasEnhancedByteBufferAccess, CanUnbuffer,
|
||||||
|
StreamCapabilities {
|
||||||
private final byte[] oneByteBuf = new byte[1];
|
private final byte[] oneByteBuf = new byte[1];
|
||||||
private int pos = 0;
|
private int pos = 0;
|
||||||
private final byte[] data;
|
private final byte[] data;
|
||||||
private final int length;
|
private final int length;
|
||||||
private boolean closed = false;
|
private boolean closed = false;
|
||||||
|
|
||||||
public FakeInputStream(DataInputBuffer in) {
|
FakeInputStream(DataInputBuffer in) {
|
||||||
data = in.getData();
|
data = in.getData();
|
||||||
length = in.getLength();
|
length = in.getLength();
|
||||||
}
|
}
|
||||||
|
@ -354,6 +358,22 @@ public class TestCryptoStreams extends CryptoStreamsTestBase {
|
||||||
UnsupportedOperationException {
|
UnsupportedOperationException {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void unbuffer() {
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean hasCapability(String capability) {
|
||||||
|
switch (capability.toLowerCase()) {
|
||||||
|
case StreamCapabilities.READAHEAD:
|
||||||
|
case StreamCapabilities.DROPBEHIND:
|
||||||
|
case StreamCapabilities.UNBUFFER:
|
||||||
|
return true;
|
||||||
|
default:
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public FileDescriptor getFileDescriptor() throws IOException {
|
public FileDescriptor getFileDescriptor() throws IOException {
|
||||||
return null;
|
return null;
|
||||||
|
|
|
@ -112,4 +112,9 @@ public class TestCryptoStreamsForLocalFS extends CryptoStreamsTestBase {
|
||||||
@Test(timeout=10000)
|
@Test(timeout=10000)
|
||||||
public void testSeekToNewSource() throws Exception {
|
public void testSeekToNewSource() throws Exception {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Ignore("Local file input stream does not support unbuffer")
|
||||||
|
@Override
|
||||||
|
@Test
|
||||||
|
public void testUnbuffer() throws Exception {}
|
||||||
}
|
}
|
||||||
|
|
|
@ -120,4 +120,9 @@ public class TestCryptoStreamsNormal extends CryptoStreamsTestBase {
|
||||||
@Override
|
@Override
|
||||||
@Test(timeout=10000)
|
@Test(timeout=10000)
|
||||||
public void testHasEnhancedByteBufferAccess() throws IOException {}
|
public void testHasEnhancedByteBufferAccess() throws IOException {}
|
||||||
|
|
||||||
|
@Ignore("ByteArrayInputStream does not support unbuffer")
|
||||||
|
@Override
|
||||||
|
@Test
|
||||||
|
public void testUnbuffer() throws Exception {}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue