HADOOP-15149. CryptoOutputStream should implement StreamCapabilities.

(cherry picked from commit 81127616c5)
This commit is contained in:
Xiao Chen 2017-12-29 13:40:42 -08:00
parent 8f9603cb9d
commit 8c275c63ff
3 changed files with 57 additions and 5 deletions

View File

@ -26,6 +26,7 @@ import java.security.GeneralSecurityException;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.CanSetDropBehind; import org.apache.hadoop.fs.CanSetDropBehind;
import org.apache.hadoop.fs.StreamCapabilities;
import org.apache.hadoop.fs.Syncable; import org.apache.hadoop.fs.Syncable;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
@ -47,7 +48,7 @@ import com.google.common.base.Preconditions;
@InterfaceAudience.Private @InterfaceAudience.Private
@InterfaceStability.Evolving @InterfaceStability.Evolving
public class CryptoOutputStream extends FilterOutputStream implements public class CryptoOutputStream extends FilterOutputStream implements
Syncable, CanSetDropBehind { Syncable, CanSetDropBehind, StreamCapabilities {
private final byte[] oneByteBuf = new byte[1]; private final byte[] oneByteBuf = new byte[1];
private final CryptoCodec codec; private final CryptoCodec codec;
private final Encryptor encryptor; private final Encryptor encryptor;
@ -304,4 +305,12 @@ public class CryptoOutputStream extends FilterOutputStream implements
CryptoStreamUtils.freeDB(inBuffer); CryptoStreamUtils.freeDB(inBuffer);
CryptoStreamUtils.freeDB(outBuffer); CryptoStreamUtils.freeDB(outBuffer);
} }
@Override
public boolean hasCapability(String capability) {
if (out instanceof StreamCapabilities) {
return ((StreamCapabilities) out).hasCapability(capability);
}
return false;
}
} }

View File

@ -50,9 +50,9 @@ public abstract class CryptoStreamsTestBase {
CryptoStreamsTestBase.class); CryptoStreamsTestBase.class);
protected static CryptoCodec codec; protected static CryptoCodec codec;
private static final byte[] key = {0x01, 0x02, 0x03, 0x04, 0x05, 0x06, protected static final byte[] key = {0x01, 0x02, 0x03, 0x04, 0x05, 0x06,
0x07, 0x08, 0x09, 0x10, 0x11, 0x12, 0x13, 0x14, 0x15, 0x16}; 0x07, 0x08, 0x09, 0x10, 0x11, 0x12, 0x13, 0x14, 0x15, 0x16};
private static final byte[] iv = {0x01, 0x02, 0x03, 0x04, 0x05, 0x06, protected static final byte[] iv = {0x01, 0x02, 0x03, 0x04, 0x05, 0x06,
0x07, 0x08, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08}; 0x07, 0x08, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08};
protected static final int count = 10000; protected static final int count = 10000;

View File

@ -42,6 +42,10 @@ import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.io.DataOutputBuffer;
import org.junit.AfterClass; import org.junit.AfterClass;
import org.junit.BeforeClass; import org.junit.BeforeClass;
import org.junit.Test;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
public class TestCryptoStreams extends CryptoStreamsTestBase { public class TestCryptoStreams extends CryptoStreamsTestBase {
/** /**
@ -91,7 +95,7 @@ public class TestCryptoStreams extends CryptoStreamsTestBase {
} }
private class FakeOutputStream extends OutputStream private class FakeOutputStream extends OutputStream
implements Syncable, CanSetDropBehind{ implements Syncable, CanSetDropBehind, StreamCapabilities{
private final byte[] oneByteBuf = new byte[1]; private final byte[] oneByteBuf = new byte[1];
private final DataOutputBuffer out; private final DataOutputBuffer out;
private boolean closed; private boolean closed;
@ -153,7 +157,19 @@ public class TestCryptoStreams extends CryptoStreamsTestBase {
checkStream(); checkStream();
flush(); flush();
} }
@Override
public boolean hasCapability(String capability) {
switch (capability.toLowerCase()) {
case StreamCapabilities.HFLUSH:
case StreamCapabilities.HSYNC:
case StreamCapabilities.DROPBEHIND:
return true;
default:
return false;
}
}
private void checkStream() throws IOException { private void checkStream() throws IOException {
if (closed) { if (closed) {
throw new IOException("Stream is closed!"); throw new IOException("Stream is closed!");
@ -393,4 +409,31 @@ public class TestCryptoStreams extends CryptoStreamsTestBase {
return ( ret <= 0 ) ? -1 : (oneByteBuf[0] & 0xff); return ( ret <= 0 ) ? -1 : (oneByteBuf[0] & 0xff);
} }
} }
/**
* This tests {@link StreamCapabilities#hasCapability(String)} for the
* the underlying streams.
*/
@Test(timeout = 120000)
public void testHasCapability() throws Exception {
// verify hasCapability returns what FakeOutputStream is set up for
CryptoOutputStream cos =
(CryptoOutputStream) getOutputStream(defaultBufferSize, key, iv);
assertTrue(cos instanceof StreamCapabilities);
assertTrue(cos.hasCapability(StreamCapabilities.HFLUSH));
assertTrue(cos.hasCapability(StreamCapabilities.HSYNC));
assertTrue(cos.hasCapability(StreamCapabilities.DROPBEHIND));
assertFalse(cos.hasCapability(StreamCapabilities.READAHEAD));
assertFalse(cos.hasCapability(StreamCapabilities.UNBUFFER));
// verify hasCapability for input stream
CryptoInputStream cis =
(CryptoInputStream) getInputStream(defaultBufferSize, key, iv);
assertTrue(cis instanceof StreamCapabilities);
assertTrue(cis.hasCapability(StreamCapabilities.DROPBEHIND));
assertTrue(cis.hasCapability(StreamCapabilities.READAHEAD));
assertTrue(cis.hasCapability(StreamCapabilities.UNBUFFER));
assertFalse(cis.hasCapability(StreamCapabilities.HFLUSH));
assertFalse(cis.hasCapability(StreamCapabilities.HSYNC));
}
} }