HADOOP-15149. CryptoOutputStream should implement StreamCapabilities.

This commit is contained in:
Xiao Chen 2017-12-29 13:40:42 -08:00
parent b82049b4f0
commit 81127616c5
3 changed files with 57 additions and 5 deletions

View File

@ -26,6 +26,7 @@ import java.security.GeneralSecurityException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.CanSetDropBehind;
import org.apache.hadoop.fs.StreamCapabilities;
import org.apache.hadoop.fs.Syncable;
import com.google.common.base.Preconditions;
@ -47,7 +48,7 @@ import com.google.common.base.Preconditions;
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class CryptoOutputStream extends FilterOutputStream implements
Syncable, CanSetDropBehind {
Syncable, CanSetDropBehind, StreamCapabilities {
private final byte[] oneByteBuf = new byte[1];
private final CryptoCodec codec;
private final Encryptor encryptor;
@ -304,4 +305,12 @@ public class CryptoOutputStream extends FilterOutputStream implements
CryptoStreamUtils.freeDB(inBuffer);
CryptoStreamUtils.freeDB(outBuffer);
}
@Override
public boolean hasCapability(String capability) {
if (out instanceof StreamCapabilities) {
return ((StreamCapabilities) out).hasCapability(capability);
}
return false;
}
}

View File

@ -50,9 +50,9 @@ public abstract class CryptoStreamsTestBase {
CryptoStreamsTestBase.class);
protected static CryptoCodec codec;
private static final byte[] key = {0x01, 0x02, 0x03, 0x04, 0x05, 0x06,
protected static final byte[] key = {0x01, 0x02, 0x03, 0x04, 0x05, 0x06,
0x07, 0x08, 0x09, 0x10, 0x11, 0x12, 0x13, 0x14, 0x15, 0x16};
private static final byte[] iv = {0x01, 0x02, 0x03, 0x04, 0x05, 0x06,
protected static final byte[] iv = {0x01, 0x02, 0x03, 0x04, 0x05, 0x06,
0x07, 0x08, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08};
protected static final int count = 10000;

View File

@ -42,6 +42,10 @@ import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
public class TestCryptoStreams extends CryptoStreamsTestBase {
/**
@ -91,7 +95,7 @@ public class TestCryptoStreams extends CryptoStreamsTestBase {
}
private class FakeOutputStream extends OutputStream
implements Syncable, CanSetDropBehind{
implements Syncable, CanSetDropBehind, StreamCapabilities{
private final byte[] oneByteBuf = new byte[1];
private final DataOutputBuffer out;
private boolean closed;
@ -153,7 +157,19 @@ public class TestCryptoStreams extends CryptoStreamsTestBase {
checkStream();
flush();
}
@Override
public boolean hasCapability(String capability) {
switch (capability.toLowerCase()) {
case StreamCapabilities.HFLUSH:
case StreamCapabilities.HSYNC:
case StreamCapabilities.DROPBEHIND:
return true;
default:
return false;
}
}
private void checkStream() throws IOException {
if (closed) {
throw new IOException("Stream is closed!");
@ -393,4 +409,31 @@ public class TestCryptoStreams extends CryptoStreamsTestBase {
return ( ret <= 0 ) ? -1 : (oneByteBuf[0] & 0xff);
}
}
/**
* This tests {@link StreamCapabilities#hasCapability(String)} for the
* the underlying streams.
*/
@Test(timeout = 120000)
public void testHasCapability() throws Exception {
// verify hasCapability returns what FakeOutputStream is set up for
CryptoOutputStream cos =
(CryptoOutputStream) getOutputStream(defaultBufferSize, key, iv);
assertTrue(cos instanceof StreamCapabilities);
assertTrue(cos.hasCapability(StreamCapabilities.HFLUSH));
assertTrue(cos.hasCapability(StreamCapabilities.HSYNC));
assertTrue(cos.hasCapability(StreamCapabilities.DROPBEHIND));
assertFalse(cos.hasCapability(StreamCapabilities.READAHEAD));
assertFalse(cos.hasCapability(StreamCapabilities.UNBUFFER));
// verify hasCapability for input stream
CryptoInputStream cis =
(CryptoInputStream) getInputStream(defaultBufferSize, key, iv);
assertTrue(cis instanceof StreamCapabilities);
assertTrue(cis.hasCapability(StreamCapabilities.DROPBEHIND));
assertTrue(cis.hasCapability(StreamCapabilities.READAHEAD));
assertTrue(cis.hasCapability(StreamCapabilities.UNBUFFER));
assertFalse(cis.hasCapability(StreamCapabilities.HFLUSH));
assertFalse(cis.hasCapability(StreamCapabilities.HSYNC));
}
}