HADOOP-6392. Wire crypto streams for encrypted files in DFSClient. (clamb and yliu)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/fs-encryption@1600582 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Charles Lamb 2014-06-05 10:10:48 +00:00
parent d86db3f76f
commit bdee397e95
23 changed files with 612 additions and 47 deletions

View File

@ -10,6 +10,9 @@ fs-encryption (Unreleased)
IMPROVEMENTS IMPROVEMENTS
HADOOP-6392. Wire crypto streams for encrypted files in
DFSClient. (clamb and yliu)
HADOOP-10603. Crypto input and output streams implementing Hadoop stream HADOOP-10603. Crypto input and output streams implementing Hadoop stream
interfaces. (Yi Liu and Charles Lamb) interfaces. (Yi Liu and Charles Lamb)

View File

@ -99,7 +99,7 @@ public class FSDataOutputStream extends DataOutputStream
} }
/** /**
* Get a reference to the wrapped output stream. Used by unit tests. * Get a reference to the wrapped output stream.
* *
* @return the underlying output stream * @return the underlying output stream
*/ */

View File

@ -17,7 +17,6 @@
*/ */
package org.apache.hadoop.fs; package org.apache.hadoop.fs;
import java.io.FileNotFoundException; import java.io.FileNotFoundException;
import java.io.IOException; import java.io.IOException;
import java.net.URI; import java.net.URI;
@ -31,12 +30,17 @@ import java.util.NoSuchElementException;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.crypto.CryptoCodec;
import org.apache.hadoop.crypto.CryptoOutputStream;
import org.apache.hadoop.crypto.CryptoInputStream;
import org.apache.hadoop.fs.permission.AclEntry; import org.apache.hadoop.fs.permission.AclEntry;
import org.apache.hadoop.fs.permission.AclStatus; import org.apache.hadoop.fs.permission.AclStatus;
import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.Options.ChecksumOpt; import org.apache.hadoop.fs.Options.ChecksumOpt;
import org.apache.hadoop.hdfs.CorruptFileBlockIterator; import org.apache.hadoop.hdfs.CorruptFileBlockIterator;
import org.apache.hadoop.hdfs.DFSClient; import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSInputStream;
import org.apache.hadoop.hdfs.DFSOutputStream;
import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.client.HdfsDataInputStream; import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream; import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
@ -53,11 +57,14 @@ import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier; import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier;
import org.apache.hadoop.util.Progressable; import org.apache.hadoop.util.Progressable;
import com.google.common.base.Preconditions;
@InterfaceAudience.Private @InterfaceAudience.Private
@InterfaceStability.Evolving @InterfaceStability.Evolving
public class Hdfs extends AbstractFileSystem { public class Hdfs extends AbstractFileSystem {
DFSClient dfs; DFSClient dfs;
final CryptoCodec factory;
private boolean verifyChecksum = true; private boolean verifyChecksum = true;
static { static {
@ -84,6 +91,7 @@ public class Hdfs extends AbstractFileSystem {
} }
this.dfs = new DFSClient(theUri, conf, getStatistics()); this.dfs = new DFSClient(theUri, conf, getStatistics());
this.factory = CryptoCodec.getInstance(conf);
} }
@Override @Override
@ -96,9 +104,27 @@ public class Hdfs extends AbstractFileSystem {
EnumSet<CreateFlag> createFlag, FsPermission absolutePermission, EnumSet<CreateFlag> createFlag, FsPermission absolutePermission,
int bufferSize, short replication, long blockSize, Progressable progress, int bufferSize, short replication, long blockSize, Progressable progress,
ChecksumOpt checksumOpt, boolean createParent) throws IOException { ChecksumOpt checksumOpt, boolean createParent) throws IOException {
return new HdfsDataOutputStream(dfs.primitiveCreate(getUriPath(f),
final DFSOutputStream dfsos = dfs.primitiveCreate(getUriPath(f),
absolutePermission, createFlag, createParent, replication, blockSize, absolutePermission, createFlag, createParent, replication, blockSize,
progress, bufferSize, checksumOpt), getStatistics()); progress, bufferSize, checksumOpt);
final byte[] key = dfsos.getKey();
final byte[] iv = dfsos.getIv();
Preconditions.checkState(!(key == null ^ iv == null),
"Only one of the Key and IV were found.");
if (false && key != null) {
/*
* The Key and IV were found. Wrap up the output stream with an encryption
* wrapper.
*/
final CryptoOutputStream cbos =
new CryptoOutputStream(dfsos, factory, key, iv);
return new HdfsDataOutputStream(cbos, getStatistics());
} else {
/* No key/IV present so no encryption. */
return new HdfsDataOutputStream(dfsos, getStatistics());
}
} }
@Override @Override
@ -307,8 +333,25 @@ public class Hdfs extends AbstractFileSystem {
@Override @Override
public HdfsDataInputStream open(Path f, int bufferSize) public HdfsDataInputStream open(Path f, int bufferSize)
throws IOException, UnresolvedLinkException { throws IOException, UnresolvedLinkException {
return new DFSClient.DFSDataInputStream(dfs.open(getUriPath(f), final DFSInputStream dfsis = dfs.open(getUriPath(f),
bufferSize, verifyChecksum)); bufferSize, verifyChecksum);
final byte[] key = dfsis.getKey();
final byte[] iv = dfsis.getIv();
Preconditions.checkState(!(key == null ^ iv == null),
"Only one of the Key and IV were found.");
if (false && key != null) {
/*
* The Key and IV were found. Wrap up the input stream with an encryption
* wrapper.
*/
final CryptoInputStream cbis =
new CryptoInputStream(dfsis, factory, key, iv);
return new HdfsDataInputStream(cbis);
} else {
/* No key/IV pair so no encryption. */
return new HdfsDataInputStream(dfsis);
}
} }
@Override @Override

View File

@ -88,6 +88,8 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
private final boolean verifyChecksum; private final boolean verifyChecksum;
private LocatedBlocks locatedBlocks = null; private LocatedBlocks locatedBlocks = null;
private long lastBlockBeingWrittenLength = 0; private long lastBlockBeingWrittenLength = 0;
private byte[] key = null;
private byte[] iv = null;
private DatanodeInfo currentNode = null; private DatanodeInfo currentNode = null;
private LocatedBlock currentLocatedBlock = null; private LocatedBlock currentLocatedBlock = null;
private long pos = 0; private long pos = 0;
@ -297,6 +299,8 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
} }
} }
key = locatedBlocks.getKey();
iv = locatedBlocks.getIv();
currentNode = null; currentNode = null;
return lastBlockBeingWrittenLength; return lastBlockBeingWrittenLength;
} }
@ -1517,6 +1521,24 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
return new ReadStatistics(readStatistics); return new ReadStatistics(readStatistics);
} }
/**
* Get the encryption key for this stream.
*
* @return byte[] the key
*/
public synchronized byte[] getKey() {
return key;
}
/**
* Get the encryption initialization vector (IV) for this stream.
*
* @return byte[] the initialization vector (IV).
*/
public synchronized byte[] getIv() {
return iv;
}
private synchronized void closeCurrentBlockReader() { private synchronized void closeCurrentBlockReader() {
if (blockReader == null) return; if (blockReader == null) return;
// Close the current block reader so that the new caching settings can // Close the current block reader so that the new caching settings can

View File

@ -154,6 +154,8 @@ public class DFSOutputStream extends FSOutputSummer
private boolean shouldSyncBlock = false; // force blocks to disk upon close private boolean shouldSyncBlock = false; // force blocks to disk upon close
private final AtomicReference<CachingStrategy> cachingStrategy; private final AtomicReference<CachingStrategy> cachingStrategy;
private boolean failPacket = false; private boolean failPacket = false;
private byte[] key = null;
private byte[] iv = null;
private static class Packet { private static class Packet {
private static final long HEART_BEAT_SEQNO = -1L; private static final long HEART_BEAT_SEQNO = -1L;
@ -1562,6 +1564,8 @@ public class DFSOutputStream extends FSOutputSummer
this.fileId = stat.getFileId(); this.fileId = stat.getFileId();
this.blockSize = stat.getBlockSize(); this.blockSize = stat.getBlockSize();
this.blockReplication = stat.getReplication(); this.blockReplication = stat.getReplication();
this.key = stat.getKey();
this.iv = stat.getIv();
this.progress = progress; this.progress = progress;
this.cachingStrategy = new AtomicReference<CachingStrategy>( this.cachingStrategy = new AtomicReference<CachingStrategy>(
dfsClient.getDefaultWriteCachingStrategy()); dfsClient.getDefaultWriteCachingStrategy());
@ -2178,6 +2182,24 @@ public class DFSOutputStream extends FSOutputSummer
return initialFileSize; return initialFileSize;
} }
/**
* Get the encryption key for this stream.
*
* @return byte[] the key.
*/
public byte[] getKey() {
return key;
}
/**
* Get the encryption initialization vector (IV) for this stream.
*
* @return byte[] the initialization vector (IV).
*/
public byte[] getIv() {
return iv;
}
/** /**
* Returns the access token currently used by streamer, for testing only * Returns the access token currently used by streamer, for testing only
*/ */

View File

@ -17,17 +17,21 @@
*/ */
package org.apache.hadoop.hdfs.client; package org.apache.hadoop.hdfs.client;
import java.io.InputStream;
import java.io.IOException; import java.io.IOException;
import java.util.List; import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.crypto.CryptoInputStream;
import org.apache.hadoop.hdfs.DFSInputStream; import org.apache.hadoop.hdfs.DFSInputStream;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import com.google.common.base.Preconditions;
/** /**
* The Hdfs implementation of {@link FSDataInputStream}. * The Hdfs implementation of {@link FSDataInputStream}.
*/ */
@ -38,25 +42,49 @@ public class HdfsDataInputStream extends FSDataInputStream {
super(in); super(in);
} }
public HdfsDataInputStream(CryptoInputStream in) throws IOException {
super(in);
Preconditions.checkArgument(in.getWrappedStream() instanceof DFSInputStream,
"CryptoInputStream should wrap a DFSInputStream");
}
private DFSInputStream getDFSInputStream() {
if (in instanceof CryptoInputStream) {
return (DFSInputStream) ((CryptoInputStream) in).getWrappedStream();
}
return (DFSInputStream) in;
}
/**
* Get a reference to the wrapped output stream. We always want to return the
* actual underlying InputStream, even when we're using a CryptoStream. e.g.
* in the delegated methods below.
*
* @return the underlying output stream
*/
public InputStream getWrappedStream() {
return in;
}
/** /**
* Get the datanode from which the stream is currently reading. * Get the datanode from which the stream is currently reading.
*/ */
public DatanodeInfo getCurrentDatanode() { public DatanodeInfo getCurrentDatanode() {
return ((DFSInputStream) in).getCurrentDatanode(); return getDFSInputStream().getCurrentDatanode();
} }
/** /**
* Get the block containing the target position. * Get the block containing the target position.
*/ */
public ExtendedBlock getCurrentBlock() { public ExtendedBlock getCurrentBlock() {
return ((DFSInputStream) in).getCurrentBlock(); return getDFSInputStream().getCurrentBlock();
} }
/** /**
* Get the collection of blocks that has already been located. * Get the collection of blocks that has already been located.
*/ */
public synchronized List<LocatedBlock> getAllBlocks() throws IOException { public synchronized List<LocatedBlock> getAllBlocks() throws IOException {
return ((DFSInputStream) in).getAllBlocks(); return getDFSInputStream().getAllBlocks();
} }
/** /**
@ -66,7 +94,7 @@ public class HdfsDataInputStream extends FSDataInputStream {
* @return The visible length of the file. * @return The visible length of the file.
*/ */
public long getVisibleLength() throws IOException { public long getVisibleLength() throws IOException {
return ((DFSInputStream) in).getFileLength(); return getDFSInputStream().getFileLength();
} }
/** /**
@ -76,6 +104,6 @@ public class HdfsDataInputStream extends FSDataInputStream {
* bytes read through HdfsDataInputStream. * bytes read through HdfsDataInputStream.
*/ */
public synchronized DFSInputStream.ReadStatistics getReadStatistics() { public synchronized DFSInputStream.ReadStatistics getReadStatistics() {
return ((DFSInputStream) in).getReadStatistics(); return getDFSInputStream().getReadStatistics();
} }
} }

View File

@ -18,14 +18,18 @@
package org.apache.hadoop.hdfs.client; package org.apache.hadoop.hdfs.client;
import java.io.IOException; import java.io.IOException;
import java.io.OutputStream;
import java.util.EnumSet; import java.util.EnumSet;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.crypto.CryptoOutputStream;
import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hdfs.DFSOutputStream; import org.apache.hadoop.hdfs.DFSOutputStream;
import com.google.common.base.Preconditions;
/** /**
* The Hdfs implementation of {@link FSDataOutputStream}. * The Hdfs implementation of {@link FSDataOutputStream}.
*/ */
@ -42,6 +46,18 @@ public class HdfsDataOutputStream extends FSDataOutputStream {
this(out, stats, 0L); this(out, stats, 0L);
} }
public HdfsDataOutputStream(CryptoOutputStream out, FileSystem.Statistics stats,
long startPosition) throws IOException {
super(out, stats, startPosition);
Preconditions.checkArgument(out.getWrappedStream() instanceof DFSOutputStream,
"CryptoOutputStream should wrap a DFSOutputStream");
}
public HdfsDataOutputStream(CryptoOutputStream out, FileSystem.Statistics stats)
throws IOException {
this(out, stats, 0L);
}
/** /**
* Get the actual number of replicas of the current block. * Get the actual number of replicas of the current block.
* *
@ -55,7 +71,11 @@ public class HdfsDataOutputStream extends FSDataOutputStream {
* @return the number of valid replicas of the current block * @return the number of valid replicas of the current block
*/ */
public synchronized int getCurrentBlockReplication() throws IOException { public synchronized int getCurrentBlockReplication() throws IOException {
return ((DFSOutputStream)getWrappedStream()).getCurrentBlockReplication(); OutputStream wrappedStream = getWrappedStream();
if (wrappedStream instanceof CryptoOutputStream) {
wrappedStream = ((CryptoOutputStream) wrappedStream).getWrappedStream();
}
return ((DFSOutputStream) wrappedStream).getCurrentBlockReplication();
} }
/** /**
@ -67,13 +87,19 @@ public class HdfsDataOutputStream extends FSDataOutputStream {
* @see FSDataOutputStream#hsync() * @see FSDataOutputStream#hsync()
*/ */
public void hsync(EnumSet<SyncFlag> syncFlags) throws IOException { public void hsync(EnumSet<SyncFlag> syncFlags) throws IOException {
((DFSOutputStream) getWrappedStream()).hsync(syncFlags); OutputStream wrappedStream = getWrappedStream();
if (wrappedStream instanceof CryptoOutputStream) {
((CryptoOutputStream) wrappedStream).flush();
wrappedStream = ((CryptoOutputStream) wrappedStream).getWrappedStream();
}
((DFSOutputStream) wrappedStream).hsync(syncFlags);
} }
public static enum SyncFlag { public static enum SyncFlag {
/** /**
* When doing sync to DataNodes, also update the metadata (block * When doing sync to DataNodes, also update the metadata (block length) in
* length) in the NameNode * the NameNode.
*/ */
UPDATE_LENGTH; UPDATE_LENGTH;
} }

View File

@ -161,4 +161,8 @@ public class HdfsConstants {
public static final String SEPARATOR_DOT_SNAPSHOT_DIR public static final String SEPARATOR_DOT_SNAPSHOT_DIR
= Path.SEPARATOR + DOT_SNAPSHOT_DIR; = Path.SEPARATOR + DOT_SNAPSHOT_DIR;
/* Temporary until we stop hard-coding these values. */
public static final byte[] KEY = "0123456789012345".getBytes();
public static final byte[] IV = "ABCDEFGJIJKLMNOP".getBytes();
} }

View File

@ -45,6 +45,9 @@ public class HdfsFileStatus {
private final String group; private final String group;
private final long fileId; private final long fileId;
private final byte[] key;
private final byte[] iv;
// Used by dir, not including dot and dotdot. Always zero for a regular file. // Used by dir, not including dot and dotdot. Always zero for a regular file.
private final int childrenNum; private final int childrenNum;
@ -66,8 +69,17 @@ public class HdfsFileStatus {
*/ */
public HdfsFileStatus(long length, boolean isdir, int block_replication, public HdfsFileStatus(long length, boolean isdir, int block_replication,
long blocksize, long modification_time, long access_time, long blocksize, long modification_time, long access_time,
FsPermission permission, String owner, String group, FsPermission permission, String owner, String group, byte[] symlink,
byte[] symlink, byte[] path, long fileId, int childrenNum) { byte[] path, long fileId, int childrenNum) {
this(length, isdir, block_replication, blocksize, modification_time,
access_time, permission, owner, group, symlink, path, fileId,
childrenNum, HdfsConstants.KEY, HdfsConstants.IV);
}
public HdfsFileStatus(long length, boolean isdir, int block_replication,
long blocksize, long modification_time, long access_time,
FsPermission permission, String owner, String group, byte[] symlink,
byte[] path, long fileId, int childrenNum, byte[] key, byte[] iv) {
this.length = length; this.length = length;
this.isdir = isdir; this.isdir = isdir;
this.block_replication = (short)block_replication; this.block_replication = (short)block_replication;
@ -85,6 +97,8 @@ public class HdfsFileStatus {
this.path = path; this.path = path;
this.fileId = fileId; this.fileId = fileId;
this.childrenNum = childrenNum; this.childrenNum = childrenNum;
this.key = key;
this.iv = iv;
} }
/** /**
@ -238,6 +252,14 @@ public class HdfsFileStatus {
return fileId; return fileId;
} }
final public byte[] getKey() {
return key;
}
final public byte[] getIv() {
return iv;
}
final public int getChildrenNum() { final public int getChildrenNum() {
return childrenNum; return childrenNum;
} }

View File

@ -56,10 +56,10 @@ public class HdfsLocatedFileStatus extends HdfsFileStatus {
int block_replication, long blocksize, long modification_time, int block_replication, long blocksize, long modification_time,
long access_time, FsPermission permission, String owner, String group, long access_time, FsPermission permission, String owner, String group,
byte[] symlink, byte[] path, long fileId, LocatedBlocks locations, byte[] symlink, byte[] path, long fileId, LocatedBlocks locations,
int childrenNum) { int childrenNum, byte[] key, byte[] iv) {
super(length, isdir, block_replication, blocksize, modification_time, super(length, isdir, block_replication, blocksize, modification_time,
access_time, permission, owner, group, symlink, path, fileId, access_time, permission, owner, group, symlink, path, fileId,
childrenNum); childrenNum, key, iv);
this.locations = locations; this.locations = locations;
} }

View File

@ -35,22 +35,27 @@ public class LocatedBlocks {
private final boolean underConstruction; private final boolean underConstruction;
private LocatedBlock lastLocatedBlock = null; private LocatedBlock lastLocatedBlock = null;
private boolean isLastBlockComplete = false; private boolean isLastBlockComplete = false;
private final byte[] key;
private final byte[] iv;
public LocatedBlocks() { public LocatedBlocks() {
fileLength = 0; fileLength = 0;
blocks = null; blocks = null;
underConstruction = false; underConstruction = false;
key = null;
iv = null;
} }
/** public Constructor */
public LocatedBlocks(long flength, boolean isUnderConstuction, public LocatedBlocks(long flength, boolean isUnderConstuction,
List<LocatedBlock> blks, List<LocatedBlock> blks, LocatedBlock lastBlock,
LocatedBlock lastBlock, boolean isLastBlockCompleted) { boolean isLastBlockCompleted, byte[] key, byte[] iv) {
fileLength = flength; fileLength = flength;
blocks = blks; blocks = blks;
underConstruction = isUnderConstuction; underConstruction = isUnderConstuction;
this.lastLocatedBlock = lastBlock; this.lastLocatedBlock = lastBlock;
this.isLastBlockComplete = isLastBlockCompleted; this.isLastBlockComplete = isLastBlockCompleted;
this.key = key;
this.iv = iv;
} }
/** /**
@ -92,13 +97,21 @@ public class LocatedBlocks {
} }
/** /**
* Return ture if file was under construction when * Return true if file was under construction when this LocatedBlocks was
* this LocatedBlocks was constructed, false otherwise. * constructed, false otherwise.
*/ */
public boolean isUnderConstruction() { public boolean isUnderConstruction() {
return underConstruction; return underConstruction;
} }
public byte[] getKey() {
return key;
}
public byte[] getIv() {
return iv;
}
/** /**
* Find block containing specified offset. * Find block containing specified offset.
* *

View File

@ -61,7 +61,7 @@ public class SnapshottableDirectoryStatus {
int snapshotNumber, int snapshotQuota, byte[] parentFullPath) { int snapshotNumber, int snapshotQuota, byte[] parentFullPath) {
this.dirStatus = new HdfsFileStatus(0, true, 0, 0, modification_time, this.dirStatus = new HdfsFileStatus(0, true, 0, 0, modification_time,
access_time, permission, owner, group, null, localName, inodeId, access_time, permission, owner, group, null, localName, inodeId,
childrenNum); childrenNum, null /* key */, null /* IV */);
this.snapshotNumber = snapshotNumber; this.snapshotNumber = snapshotNumber;
this.snapshotQuota = snapshotQuota; this.snapshotQuota = snapshotQuota;
this.parentFullPath = parentFullPath; this.parentFullPath = parentFullPath;

View File

@ -1127,7 +1127,9 @@ public class PBHelper {
lb.getFileLength(), lb.getUnderConstruction(), lb.getFileLength(), lb.getUnderConstruction(),
PBHelper.convertLocatedBlock(lb.getBlocksList()), PBHelper.convertLocatedBlock(lb.getBlocksList()),
lb.hasLastBlock() ? PBHelper.convert(lb.getLastBlock()) : null, lb.hasLastBlock() ? PBHelper.convert(lb.getLastBlock()) : null,
lb.getIsLastBlockComplete()); lb.getIsLastBlockComplete(),
lb.hasKey() ? lb.getKey().toByteArray() : null,
lb.hasIv() ? lb.getIv().toByteArray() : null);
} }
public static LocatedBlocksProto convert(LocatedBlocks lb) { public static LocatedBlocksProto convert(LocatedBlocks lb) {
@ -1139,6 +1141,12 @@ public class PBHelper {
if (lb.getLastLocatedBlock() != null) { if (lb.getLastLocatedBlock() != null) {
builder.setLastBlock(PBHelper.convert(lb.getLastLocatedBlock())); builder.setLastBlock(PBHelper.convert(lb.getLastLocatedBlock()));
} }
if (lb.getKey() != null) {
builder.setKey(ByteString.copyFrom(lb.getKey()));
}
if (lb.getIv() != null) {
builder.setIv(ByteString.copyFrom(lb.getIv()));
}
return builder.setFileLength(lb.getFileLength()) return builder.setFileLength(lb.getFileLength())
.setUnderConstruction(lb.isUnderConstruction()) .setUnderConstruction(lb.isUnderConstruction())
.addAllBlocks(PBHelper.convertLocatedBlock2(lb.getLocatedBlocks())) .addAllBlocks(PBHelper.convertLocatedBlock2(lb.getLocatedBlocks()))
@ -1264,7 +1272,9 @@ public class PBHelper {
fs.getPath().toByteArray(), fs.getPath().toByteArray(),
fs.hasFileId()? fs.getFileId(): INodeId.GRANDFATHER_INODE_ID, fs.hasFileId()? fs.getFileId(): INodeId.GRANDFATHER_INODE_ID,
fs.hasLocations() ? PBHelper.convert(fs.getLocations()) : null, fs.hasLocations() ? PBHelper.convert(fs.getLocations()) : null,
fs.hasChildrenNum() ? fs.getChildrenNum() : -1); fs.hasChildrenNum() ? fs.getChildrenNum() : -1,
fs.hasKey() ? fs.getKey().toByteArray() : null,
fs.hasIv() ? fs.getIv().toByteArray() : null);
} }
public static SnapshottableDirectoryStatus convert( public static SnapshottableDirectoryStatus convert(
@ -1314,6 +1324,12 @@ public class PBHelper {
if (fs.isSymlink()) { if (fs.isSymlink()) {
builder.setSymlink(ByteString.copyFrom(fs.getSymlinkInBytes())); builder.setSymlink(ByteString.copyFrom(fs.getSymlinkInBytes()));
} }
if (fs.getKey() != null) {
builder.setKey(ByteString.copyFrom(fs.getKey()));
}
if (fs.getIv() != null) {
builder.setIv(ByteString.copyFrom(fs.getIv()));
}
if (fs instanceof HdfsLocatedFileStatus) { if (fs instanceof HdfsLocatedFileStatus) {
LocatedBlocks locations = ((HdfsLocatedFileStatus)fs).getBlockLocations(); LocatedBlocks locations = ((HdfsLocatedFileStatus)fs).getBlockLocations();
if (locations != null) { if (locations != null) {

View File

@ -52,6 +52,8 @@ import org.apache.hadoop.hdfs.protocol.BlockListAsLongs.BlockReportIterator;
import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks; import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException; import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
@ -847,7 +849,8 @@ public class BlockManager {
return null; return null;
} else if (blocks.length == 0) { } else if (blocks.length == 0) {
return new LocatedBlocks(0, isFileUnderConstruction, return new LocatedBlocks(0, isFileUnderConstruction,
Collections.<LocatedBlock>emptyList(), null, false); Collections.<LocatedBlock>emptyList(), null, false,
null /* key */, null /* IV */);
} else { } else {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("blocks = " + java.util.Arrays.asList(blocks)); LOG.debug("blocks = " + java.util.Arrays.asList(blocks));
@ -872,7 +875,8 @@ public class BlockManager {
} }
return new LocatedBlocks( return new LocatedBlocks(
fileSizeExcludeBlocksUnderConstruction, isFileUnderConstruction, fileSizeExcludeBlocksUnderConstruction, isFileUnderConstruction,
locatedblocks, lastlb, isComplete); locatedblocks, lastlb, isComplete,
HdfsConstants.KEY, HdfsConstants.IV);
} }
} }

View File

@ -1640,7 +1640,7 @@ public class FSDirectory implements Closeable {
throws UnresolvedLinkException { throws UnresolvedLinkException {
if (getINode4DotSnapshot(src) != null) { if (getINode4DotSnapshot(src) != null) {
return new HdfsFileStatus(0, true, 0, 0, 0, 0, null, null, null, null, return new HdfsFileStatus(0, true, 0, 0, 0, 0, null, null, null, null,
HdfsFileStatus.EMPTY_NAME, -1L, 0); HdfsFileStatus.EMPTY_NAME, -1L, 0, null /* key */, null /* IV */);
} }
return null; return null;
} }
@ -2611,7 +2611,9 @@ public class FSDirectory implements Closeable {
node.isSymlink() ? node.asSymlink().getSymlink() : null, node.isSymlink() ? node.asSymlink().getSymlink() : null,
path, path,
node.getId(), node.getId(),
childrenNum); childrenNum,
HdfsConstants.KEY, // key
HdfsConstants.IV); // IV
} }
/** /**
@ -2651,7 +2653,7 @@ public class FSDirectory implements Closeable {
getPermissionForFileStatus(node, snapshot), getPermissionForFileStatus(node, snapshot),
node.getUserName(snapshot), node.getGroupName(snapshot), node.getUserName(snapshot), node.getGroupName(snapshot),
node.isSymlink() ? node.asSymlink().getSymlink() : null, path, node.isSymlink() ? node.asSymlink().getSymlink() : null, path,
node.getId(), loc, childrenNum); node.getId(), loc, childrenNum, null /* key */, null /* IV */);
// Set caching information for the located blocks. // Set caching information for the located blocks.
if (loc != null) { if (loc != null) {
CacheManager cacheManager = namesystem.getCacheManager(); CacheManager cacheManager = namesystem.getCacheManager();

View File

@ -251,7 +251,8 @@ public class JsonUtil {
: childrenNumLong.intValue(); : childrenNumLong.intValue();
return new HdfsFileStatus(len, type == PathType.DIRECTORY, replication, return new HdfsFileStatus(len, type == PathType.DIRECTORY, replication,
blockSize, mTime, aTime, permission, owner, group, blockSize, mTime, aTime, permission, owner, group,
symlink, DFSUtil.string2Bytes(localName), fileId, childrenNum); symlink, DFSUtil.string2Bytes(localName), fileId, childrenNum,
null /* key */, null /* IV */);
} }
/** Convert an ExtendedBlock to a Json map. */ /** Convert an ExtendedBlock to a Json map. */
@ -531,7 +532,7 @@ public class JsonUtil {
(Map<?, ?>)m.get("lastLocatedBlock")); (Map<?, ?>)m.get("lastLocatedBlock"));
final boolean isLastBlockComplete = (Boolean)m.get("isLastBlockComplete"); final boolean isLastBlockComplete = (Boolean)m.get("isLastBlockComplete");
return new LocatedBlocks(fileLength, isUnderConstruction, locatedBlocks, return new LocatedBlocks(fileLength, isUnderConstruction, locatedBlocks,
lastLocatedBlock, isLastBlockComplete); lastLocatedBlock, isLastBlockComplete, null /* key */, null /* IV */);
} }
/** Convert a ContentSummary to a Json string. */ /** Convert a ContentSummary to a Json string. */

View File

@ -179,6 +179,8 @@ message LocatedBlocksProto {
required bool underConstruction = 3; required bool underConstruction = 3;
optional LocatedBlockProto lastBlock = 4; optional LocatedBlockProto lastBlock = 4;
required bool isLastBlockComplete = 5; required bool isLastBlockComplete = 5;
optional bytes key = 6;
optional bytes iv = 7;
} }
@ -212,6 +214,10 @@ message HdfsFileStatusProto {
// Optional field for fileId // Optional field for fileId
optional uint64 fileId = 13 [default = 0]; // default as an invalid id optional uint64 fileId = 13 [default = 0]; // default as an invalid id
optional int32 childrenNum = 14 [default = -1]; optional int32 childrenNum = 14 [default = -1];
// Optional fields for key/iv for encryption
optional bytes key = 15;
optional bytes iv = 16;
} }
/** /**

View File

@ -0,0 +1,352 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs;
import static org.apache.hadoop.fs.CreateFlag.CREATE;
import static org.apache.hadoop.fs.FileContextTestHelper.getDefaultBlockSize;
import static org.apache.hadoop.fs.FileContextTestHelper.getFileData;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Arrays;
import java.util.EnumSet;
import java.util.List;
import java.util.UUID;
import javax.security.auth.login.LoginException;
import org.apache.commons.lang.RandomStringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.crypto.key.JavaKeyStoreProvider;
import org.apache.hadoop.crypto.key.KeyProvider;
import org.apache.hadoop.crypto.key.KeyProvider.KeyVersion;
import org.apache.hadoop.crypto.key.KeyProviderFactory;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
public class TestHDFSEncryption {
private static MiniDFSCluster cluster;
private static Path defaultWorkingDirectory;
private static final HdfsConfiguration CONF = new HdfsConfiguration();
private static FileContext fc;
private Path localFsRootPath;
private Path src1;
/* The KeyProvider, if any. */
private static KeyProvider provider = null;
private static File tmpDir;
@BeforeClass
public static void clusterSetupAtBegining() throws IOException,
LoginException, URISyntaxException {
tmpDir = new File(System.getProperty("test.build.data", "target"),
UUID.randomUUID().toString()).getAbsoluteFile();
tmpDir.mkdirs();
CONF.set(KeyProviderFactory.KEY_PROVIDER_PATH,
JavaKeyStoreProvider.SCHEME_NAME + "://file" + tmpDir + "/test.jks");
initializeKeyProvider(CONF);
try {
createOneKey();
KeyVersion blort = provider.getCurrentKey("blort");
} catch (java.security.NoSuchAlgorithmException e) {
throw new RuntimeException(e);
}
cluster = new MiniDFSCluster.Builder(CONF).numDataNodes(1).build();
cluster.waitClusterUp();
URI uri0 = cluster.getURI(0);
fc = FileContext.getFileContext(uri0, CONF);
defaultWorkingDirectory = fc.makeQualified(new Path("/user/" +
UserGroupInformation.getCurrentUser().getShortUserName()));
fc.mkdir(defaultWorkingDirectory, FileContext.DEFAULT_PERM, true);
}
private static void initializeKeyProvider(final Configuration conf)
throws IOException {
final List<KeyProvider> providers = KeyProviderFactory.getProviders(conf);
if (providers == null) {
return;
}
if (providers.size() == 0) {
return;
}
if (providers.size() > 1) {
final String err =
"Multiple KeyProviders found. Only one is permitted.";
throw new RuntimeException(err);
}
provider = providers.get(0);
if (provider.isTransient()) {
final String err =
"A KeyProvider was found but it is a transient provider.";
throw new RuntimeException(err);
}
}
private static void createOneKey()
throws java.security.NoSuchAlgorithmException, IOException {
final org.apache.hadoop.crypto.key.KeyProvider.Options options =
KeyProvider.options(CONF);
provider.createKey("blort", options);
provider.flush();
}
@AfterClass
public static void ClusterShutdownAtEnd() throws Exception {
if (cluster != null) {
cluster.shutdown();
cluster = null;
}
}
@Before
public void setUp() throws Exception {
File testBuildData = new File(System.getProperty("test.build.data",
"build/test/data"), RandomStringUtils.randomAlphanumeric(10));
Path rootPath = new Path(testBuildData.getAbsolutePath(),
"root-uri");
localFsRootPath = rootPath.makeQualified(LocalFileSystem.NAME, null);
fc.mkdir(getTestRootPath(fc, "test"), FileContext.DEFAULT_PERM, true);
src1 = getTestRootPath(fc, "testfile");
}
@After
public void tearDown() throws Exception {
final boolean del =
fc.delete(new Path(fileContextTestHelper.getAbsoluteTestRootPath(fc), new Path("test")), true);
assertTrue(del);
fc.delete(localFsRootPath, true);
}
protected final FileContextTestHelper fileContextTestHelper =
createFileContextHelper();
protected FileContextTestHelper createFileContextHelper() {
return new FileContextTestHelper();
}
protected Path getDefaultWorkingDirectory() {
return defaultWorkingDirectory;
}
private Path getTestRootPath(FileContext fc, String path) {
return fileContextTestHelper.getTestRootPath(fc, path);
}
protected IOException unwrapException(IOException e) {
if (e instanceof RemoteException) {
return ((RemoteException) e).unwrapRemoteException();
}
return e;
}
private static final int NUM_BLOCKS = 3;
private static final byte[] data = getFileData(NUM_BLOCKS,
getDefaultBlockSize());
private void writeSomeData() throws Exception {
writeSomeData(false, false);
}
private void writeSomeData(boolean doHFlush, boolean doHSync) throws Exception {
final FSDataOutputStream out =
fc.create(src1, EnumSet.of(CREATE), Options.CreateOpts.createParent());
out.write(data, 0, data.length);
if (doHFlush) {
out.hflush();
}
if (doHSync) {
out.hsync();
}
out.close();
}
private void writeAndVerify(boolean doHFlush, boolean doHSync) throws Exception {
writeSomeData(doHFlush, doHSync);
final FSDataInputStream in = fc.open(src1);
try {
final byte[] readBuf = new byte[getDefaultBlockSize() * NUM_BLOCKS];
in.readFully(readBuf);
assertTrue("Expected read-back data to be equal (hflush=" + doHFlush
+ " hfsync=" + doHSync + ")", Arrays.equals(data, readBuf));
} finally {
in.close();
}
}
@Test
public void testBasicEncryptionStreamNoFlushNoSync() throws Exception {
writeAndVerify(false, false);
}
@Test
public void testBasicEncryptionStreamFlushSync() throws Exception {
writeAndVerify(true, true);
}
@Test
public void testBasicEncryptionStreamNoFlushSync() throws Exception {
writeAndVerify(false, true);
}
@Test
public void testBasicEncryptionStreamFlushNoSync() throws Exception {
writeAndVerify(true, false);
}
@Test
public void testGetPos() throws Exception {
writeSomeData();
final FSDataInputStream in = fc.open(src1);
int expectedGetPos = 0;
while (in.read() != -1) {
assertTrue(++expectedGetPos == in.getPos());
}
}
@Test
public void testDoubleClose() throws Exception {
writeSomeData();
final FSDataInputStream in = fc.open(src1);
in.close();
try {
in.close();
} catch (Exception e) {
fail("Caught unexpected exception on double-close: " + e);
}
}
@Test
public void testHFlush() throws Exception {
final DistributedFileSystem fs = cluster.getFileSystem();
final FSDataOutputStream out =
fc.create(src1, EnumSet.of(CREATE), Options.CreateOpts.createParent());
out.write(data, 0, data.length);
out.hflush();
out.close();
}
@Test
public void testSeekBogusArgs() throws Exception {
writeSomeData();
final FSDataInputStream in = fc.open(src1);
try {
in.seek(-1);
fail("Expected IOException");
} catch (Exception e) {
GenericTestUtils.assertExceptionContains("Cannot seek to negative offset", e);
}
try {
in.seek(1 << 20);
fail("Expected IOException");
} catch (Exception e) {
GenericTestUtils.assertExceptionContains("Cannot seek after EOF", e);
}
in.close();
}
@Test
public void testSeekForward() throws Exception {
writeSomeData();
final FSDataInputStream in = fc.open(src1);
for (int seekInc = 1; seekInc < 1024; seekInc += 32) {
long seekTo = 0;
while (seekTo < data.length) {
in.seek(seekTo);
int b = in.read();
byte expected = data[(int) seekTo];
assertTrue("seek(" + seekTo + ") Expected: " + expected + ", but got: " + b,
b == expected);
seekTo += seekInc;
}
}
in.close();
}
@Test
public void testSeekBackwards() throws Exception {
writeSomeData();
final FSDataInputStream in = fc.open(src1);
for (int seekInc = 1; seekInc < 1024; seekInc += 32) {
long seekTo = data.length - 1;
while (seekTo >= 0) {
in.seek(seekTo);
int b = in.read();
byte expected = data[(int) seekTo];
assertTrue("seek(" + seekTo + ") Expected: " + expected + ", but got: " + b,
b == expected);
seekTo -= seekInc;
}
}
in.close();
}
@Test
public void testPostionedReadable() throws Exception {
writeSomeData();
final FSDataInputStream in = fc.open(src1);
try {
final byte[] oneByteToRead = new byte[1];
for (int i = 0; i < data.length; i++) {
int nread = in.read(i, oneByteToRead, 0, 1);
final byte b = oneByteToRead[0];
byte expected = data[(int) i];
assertTrue("read() expected only one byte to be read, but got " + nread, nread == 1);
assertTrue("read() expected: " + expected + ", but got: " + b,
b == expected);
}
} finally {
in.close();
}
}
}

View File

@ -253,12 +253,12 @@ public class TestDFSClientRetries {
Mockito.doReturn( Mockito.doReturn(
new HdfsFileStatus(0, false, 1, 1024, 0, 0, new FsPermission( new HdfsFileStatus(0, false, 1, 1024, 0, 0, new FsPermission(
(short) 777), "owner", "group", new byte[0], new byte[0], (short) 777), "owner", "group", new byte[0], new byte[0],
1010, 0)).when(mockNN).getFileInfo(anyString()); 1010, 0, null, null)).when(mockNN).getFileInfo(anyString());
Mockito.doReturn( Mockito.doReturn(
new HdfsFileStatus(0, false, 1, 1024, 0, 0, new FsPermission( new HdfsFileStatus(0, false, 1, 1024, 0, 0, new FsPermission(
(short) 777), "owner", "group", new byte[0], new byte[0], (short) 777), "owner", "group", new byte[0], new byte[0],
1010, 0)) 1010, 0, null, null))
.when(mockNN) .when(mockNN)
.create(anyString(), (FsPermission) anyObject(), anyString(), .create(anyString(), (FsPermission) anyObject(), anyString(),
(EnumSetWritable<CreateFlag>) anyObject(), anyBoolean(), (EnumSetWritable<CreateFlag>) anyObject(), anyBoolean(),
@ -494,7 +494,8 @@ public class TestDFSClientRetries {
List<LocatedBlock> badBlocks = new ArrayList<LocatedBlock>(); List<LocatedBlock> badBlocks = new ArrayList<LocatedBlock>();
badBlocks.add(badLocatedBlock); badBlocks.add(badLocatedBlock);
return new LocatedBlocks(goodBlockList.getFileLength(), false, return new LocatedBlocks(goodBlockList.getFileLength(), false,
badBlocks, null, true); badBlocks, null, true,
null /* key */, null /* IV */);
} }
} }

View File

@ -95,7 +95,7 @@ public class TestDFSUtil {
LocatedBlock l2 = new LocatedBlock(b2, ds, 0, true); LocatedBlock l2 = new LocatedBlock(b2, ds, 0, true);
List<LocatedBlock> ls = Arrays.asList(l1, l2); List<LocatedBlock> ls = Arrays.asList(l1, l2);
LocatedBlocks lbs = new LocatedBlocks(10, false, ls, l2, true); LocatedBlocks lbs = new LocatedBlocks(10, false, ls, l2, true, null, null);
BlockLocation[] bs = DFSUtil.locatedBlocks2Locations(lbs); BlockLocation[] bs = DFSUtil.locatedBlocks2Locations(lbs);

View File

@ -339,12 +339,12 @@ public class TestLease {
Mockito.doReturn( Mockito.doReturn(
new HdfsFileStatus(0, false, 1, 1024, 0, 0, new FsPermission( new HdfsFileStatus(0, false, 1, 1024, 0, 0, new FsPermission(
(short) 777), "owner", "group", new byte[0], new byte[0], (short) 777), "owner", "group", new byte[0], new byte[0],
1010, 0)).when(mcp).getFileInfo(anyString()); 1010, 0, null, null)).when(mcp).getFileInfo(anyString());
Mockito Mockito
.doReturn( .doReturn(
new HdfsFileStatus(0, false, 1, 1024, 0, 0, new FsPermission( new HdfsFileStatus(0, false, 1, 1024, 0, 0, new FsPermission(
(short) 777), "owner", "group", new byte[0], new byte[0], (short) 777), "owner", "group", new byte[0], new byte[0],
1010, 0)) 1010, 0, null, null))
.when(mcp) .when(mcp)
.create(anyString(), (FsPermission) anyObject(), anyString(), .create(anyString(), (FsPermission) anyObject(), anyString(),
(EnumSetWritable<CreateFlag>) anyObject(), anyBoolean(), (EnumSetWritable<CreateFlag>) anyObject(), anyBoolean(),

View File

@ -1015,7 +1015,7 @@ public class TestFsck {
HdfsFileStatus file = new HdfsFileStatus(length, isDir, blockReplication, HdfsFileStatus file = new HdfsFileStatus(length, isDir, blockReplication,
blockSize, modTime, accessTime, perms, owner, group, symlink, path, blockSize, modTime, accessTime, perms, owner, group, symlink, path,
fileId, numChildren); fileId, numChildren, null, null);
Result res = new Result(conf); Result res = new Result(conf);
try { try {

View File

@ -64,7 +64,7 @@ public class TestJsonUtil {
final HdfsFileStatus status = new HdfsFileStatus(1001L, false, 3, 1L << 26, final HdfsFileStatus status = new HdfsFileStatus(1001L, false, 3, 1L << 26,
now, now + 10, new FsPermission((short) 0644), "user", "group", now, now + 10, new FsPermission((short) 0644), "user", "group",
DFSUtil.string2Bytes("bar"), DFSUtil.string2Bytes("foo"), DFSUtil.string2Bytes("bar"), DFSUtil.string2Bytes("foo"),
INodeId.GRANDFATHER_INODE_ID, 0); INodeId.GRANDFATHER_INODE_ID, 0, null, null);
final FileStatus fstatus = toFileStatus(status, parent); final FileStatus fstatus = toFileStatus(status, parent);
System.out.println("status = " + status); System.out.println("status = " + status);
System.out.println("fstatus = " + fstatus); System.out.println("fstatus = " + fstatus);