HADOOP-15149. CryptoOutputStream should implement StreamCapabilities.
(cherry picked from commit81127616c5
) (cherry picked from commit8c275c63ff
)
This commit is contained in:
parent
50b233d356
commit
e20173d557
|
@ -26,6 +26,7 @@ import java.security.GeneralSecurityException;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.classification.InterfaceStability;
|
import org.apache.hadoop.classification.InterfaceStability;
|
||||||
import org.apache.hadoop.fs.CanSetDropBehind;
|
import org.apache.hadoop.fs.CanSetDropBehind;
|
||||||
|
import org.apache.hadoop.fs.StreamCapabilities;
|
||||||
import org.apache.hadoop.fs.Syncable;
|
import org.apache.hadoop.fs.Syncable;
|
||||||
|
|
||||||
import com.google.common.base.Preconditions;
|
import com.google.common.base.Preconditions;
|
||||||
|
@ -47,7 +48,7 @@ import com.google.common.base.Preconditions;
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
@InterfaceStability.Evolving
|
@InterfaceStability.Evolving
|
||||||
public class CryptoOutputStream extends FilterOutputStream implements
|
public class CryptoOutputStream extends FilterOutputStream implements
|
||||||
Syncable, CanSetDropBehind {
|
Syncable, CanSetDropBehind, StreamCapabilities {
|
||||||
private final byte[] oneByteBuf = new byte[1];
|
private final byte[] oneByteBuf = new byte[1];
|
||||||
private final CryptoCodec codec;
|
private final CryptoCodec codec;
|
||||||
private final Encryptor encryptor;
|
private final Encryptor encryptor;
|
||||||
|
@ -310,4 +311,12 @@ public class CryptoOutputStream extends FilterOutputStream implements
|
||||||
CryptoStreamUtils.freeDB(inBuffer);
|
CryptoStreamUtils.freeDB(inBuffer);
|
||||||
CryptoStreamUtils.freeDB(outBuffer);
|
CryptoStreamUtils.freeDB(outBuffer);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean hasCapability(String capability) {
|
||||||
|
if (out instanceof StreamCapabilities) {
|
||||||
|
return ((StreamCapabilities) out).hasCapability(capability);
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -50,9 +50,9 @@ public abstract class CryptoStreamsTestBase {
|
||||||
CryptoStreamsTestBase.class);
|
CryptoStreamsTestBase.class);
|
||||||
|
|
||||||
protected static CryptoCodec codec;
|
protected static CryptoCodec codec;
|
||||||
private static final byte[] key = {0x01, 0x02, 0x03, 0x04, 0x05, 0x06,
|
protected static final byte[] key = {0x01, 0x02, 0x03, 0x04, 0x05, 0x06,
|
||||||
0x07, 0x08, 0x09, 0x10, 0x11, 0x12, 0x13, 0x14, 0x15, 0x16};
|
0x07, 0x08, 0x09, 0x10, 0x11, 0x12, 0x13, 0x14, 0x15, 0x16};
|
||||||
private static final byte[] iv = {0x01, 0x02, 0x03, 0x04, 0x05, 0x06,
|
protected static final byte[] iv = {0x01, 0x02, 0x03, 0x04, 0x05, 0x06,
|
||||||
0x07, 0x08, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08};
|
0x07, 0x08, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08};
|
||||||
|
|
||||||
protected static final int count = 10000;
|
protected static final int count = 10000;
|
||||||
|
|
|
@ -42,6 +42,10 @@ import org.apache.hadoop.io.DataInputBuffer;
|
||||||
import org.apache.hadoop.io.DataOutputBuffer;
|
import org.apache.hadoop.io.DataOutputBuffer;
|
||||||
import org.junit.AfterClass;
|
import org.junit.AfterClass;
|
||||||
import org.junit.BeforeClass;
|
import org.junit.BeforeClass;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertFalse;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
public class TestCryptoStreams extends CryptoStreamsTestBase {
|
public class TestCryptoStreams extends CryptoStreamsTestBase {
|
||||||
/**
|
/**
|
||||||
|
@ -91,7 +95,7 @@ public class TestCryptoStreams extends CryptoStreamsTestBase {
|
||||||
}
|
}
|
||||||
|
|
||||||
private class FakeOutputStream extends OutputStream
|
private class FakeOutputStream extends OutputStream
|
||||||
implements Syncable, CanSetDropBehind{
|
implements Syncable, CanSetDropBehind, StreamCapabilities{
|
||||||
private final byte[] oneByteBuf = new byte[1];
|
private final byte[] oneByteBuf = new byte[1];
|
||||||
private final DataOutputBuffer out;
|
private final DataOutputBuffer out;
|
||||||
private boolean closed;
|
private boolean closed;
|
||||||
|
@ -158,7 +162,19 @@ public class TestCryptoStreams extends CryptoStreamsTestBase {
|
||||||
checkStream();
|
checkStream();
|
||||||
flush();
|
flush();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean hasCapability(String capability) {
|
||||||
|
switch (capability.toLowerCase()) {
|
||||||
|
case StreamCapabilities.HFLUSH:
|
||||||
|
case StreamCapabilities.HSYNC:
|
||||||
|
case StreamCapabilities.DROPBEHIND:
|
||||||
|
return true;
|
||||||
|
default:
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private void checkStream() throws IOException {
|
private void checkStream() throws IOException {
|
||||||
if (closed) {
|
if (closed) {
|
||||||
throw new IOException("Stream is closed!");
|
throw new IOException("Stream is closed!");
|
||||||
|
@ -398,4 +414,31 @@ public class TestCryptoStreams extends CryptoStreamsTestBase {
|
||||||
return ( ret <= 0 ) ? -1 : (oneByteBuf[0] & 0xff);
|
return ( ret <= 0 ) ? -1 : (oneByteBuf[0] & 0xff);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This tests {@link StreamCapabilities#hasCapability(String)} for the
|
||||||
|
* the underlying streams.
|
||||||
|
*/
|
||||||
|
@Test(timeout = 120000)
|
||||||
|
public void testHasCapability() throws Exception {
|
||||||
|
// verify hasCapability returns what FakeOutputStream is set up for
|
||||||
|
CryptoOutputStream cos =
|
||||||
|
(CryptoOutputStream) getOutputStream(defaultBufferSize, key, iv);
|
||||||
|
assertTrue(cos instanceof StreamCapabilities);
|
||||||
|
assertTrue(cos.hasCapability(StreamCapabilities.HFLUSH));
|
||||||
|
assertTrue(cos.hasCapability(StreamCapabilities.HSYNC));
|
||||||
|
assertTrue(cos.hasCapability(StreamCapabilities.DROPBEHIND));
|
||||||
|
assertFalse(cos.hasCapability(StreamCapabilities.READAHEAD));
|
||||||
|
assertFalse(cos.hasCapability(StreamCapabilities.UNBUFFER));
|
||||||
|
|
||||||
|
// verify hasCapability for input stream
|
||||||
|
CryptoInputStream cis =
|
||||||
|
(CryptoInputStream) getInputStream(defaultBufferSize, key, iv);
|
||||||
|
assertTrue(cis instanceof StreamCapabilities);
|
||||||
|
assertTrue(cis.hasCapability(StreamCapabilities.DROPBEHIND));
|
||||||
|
assertTrue(cis.hasCapability(StreamCapabilities.READAHEAD));
|
||||||
|
assertTrue(cis.hasCapability(StreamCapabilities.UNBUFFER));
|
||||||
|
assertFalse(cis.hasCapability(StreamCapabilities.HFLUSH));
|
||||||
|
assertFalse(cis.hasCapability(StreamCapabilities.HSYNC));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue