HDFS-7003. Add NFS Gateway support for reading and writing to encryption zones. (clamb via wang)

(cherry picked from commit 70be56d093)
This commit is contained in:
Andrew Wang 2014-09-18 14:57:25 -07:00
parent 5bc71071d1
commit 47c9dcfe8e
6 changed files with 134 additions and 4 deletions

View File

@ -241,7 +241,7 @@ class DFSClientCache {
public FSDataInputStream load(DFSInputStreamCaheKey key) throws Exception {
DFSClient client = getDfsClient(key.userId);
DFSInputStream dis = client.open(key.inodePath);
return new FSDataInputStream(dis);
return client.createWrappedInputStream(dis);
}
};
}

View File

@ -680,7 +680,7 @@ class OpenFileCtx {
}
try {
fis = new FSDataInputStream(dfsClient.open(path));
fis = dfsClient.createWrappedInputStream(dfsClient.open(path));
readCount = fis.read(offset, readbuffer, 0, count);
if (readCount < count) {
LOG.error("Can't read back " + count + " bytes, partial read size:"

View File

@ -922,8 +922,9 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface {
EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE) :
EnumSet.of(CreateFlag.CREATE);
fos = new HdfsDataOutputStream(dfsClient.create(fileIdPath, permission,
flag, false, replication, blockSize, null, bufferSize, null),
fos = dfsClient.createWrappedOutputStream(
dfsClient.create(fileIdPath, permission, flag, false, replication,
blockSize, null, bufferSize, null),
statistics);
if ((createMode == Nfs3Constant.CREATE_UNCHECKED)

View File

@ -17,19 +17,27 @@
*/
package org.apache.hadoop.hdfs.nfs.nfs3;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.io.EOFException;
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.EnumSet;
import org.apache.hadoop.crypto.key.JavaKeyStoreProvider;
import org.apache.hadoop.crypto.key.KeyProviderFactory;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystemTestHelper;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.client.HdfsAdmin;
import org.apache.hadoop.hdfs.nfs.conf.NfsConfigKeys;
import org.apache.hadoop.hdfs.nfs.conf.NfsConfiguration;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
@ -98,12 +106,16 @@ public class TestRpcProgramNfs3 {
static DistributedFileSystem hdfs;
static MiniDFSCluster cluster = null;
static NfsConfiguration config = new NfsConfiguration();
static HdfsAdmin dfsAdmin;
static NameNode nn;
static Nfs3 nfs;
static RpcProgramNfs3 nfsd;
static SecurityHandler securityHandler;
static SecurityHandler securityHandlerUnpriviledged;
static String testdir = "/tmp";
private static final String TEST_KEY = "testKey";
private static FileSystemTestHelper fsHelper;
private static File testRootDir;
@BeforeClass
public static void setup() throws Exception {
@ -114,12 +126,20 @@ public class TestRpcProgramNfs3 {
.getProxySuperuserGroupConfKey(currentUser), "*");
config.set(DefaultImpersonationProvider.getTestProvider()
.getProxySuperuserIpConfKey(currentUser), "*");
fsHelper = new FileSystemTestHelper();
// Set up java key store
String testRoot = fsHelper.getTestRootDir();
testRootDir = new File(testRoot).getAbsoluteFile();
final Path jksPath = new Path(testRootDir.toString(), "test.jks");
config.set(KeyProviderFactory.KEY_PROVIDER_PATH,
JavaKeyStoreProvider.SCHEME_NAME + "://file" + jksPath.toUri());
ProxyUsers.refreshSuperUserGroupsConfiguration(config);
cluster = new MiniDFSCluster.Builder(config).numDataNodes(1).build();
cluster.waitActive();
hdfs = cluster.getFileSystem();
nn = cluster.getNameNode();
dfsAdmin = new HdfsAdmin(cluster.getURI(), config);
// Use ephemeral ports in case tests are running in parallel
config.setInt("nfs3.mountd.port", 0);
@ -131,6 +151,8 @@ public class TestRpcProgramNfs3 {
nfs.startServiceInternal(false);
nfsd = (RpcProgramNfs3) nfs.getRpcProgram();
hdfs.getClient().setKeyProvider(nn.getNamesystem().getProvider());
DFSTestUtil.createKey(TEST_KEY, cluster, config);
// Mock SecurityHandler which returns system user.name
securityHandler = Mockito.mock(SecurityHandler.class);
@ -310,6 +332,105 @@ public class TestRpcProgramNfs3 {
response2.getStatus());
}
@Test(timeout = 120000)
public void testEncryptedReadWrite() throws Exception {
final int len = 8192;
final Path zone = new Path("/zone");
hdfs.mkdirs(zone);
dfsAdmin.createEncryptionZone(zone, TEST_KEY);
final byte[] buffer = new byte[len];
for (int i = 0; i < len; i++) {
buffer[i] = (byte) i;
}
final String encFile1 = "/zone/myfile";
createFileUsingNfs(encFile1, buffer);
commit(encFile1, len);
assertArrayEquals("encFile1 not equal",
getFileContentsUsingNfs(encFile1, len),
getFileContentsUsingDfs(encFile1, len));
/*
* Same thing except this time create the encrypted file using DFS.
*/
final String encFile2 = "/zone/myfile2";
final Path encFile2Path = new Path(encFile2);
DFSTestUtil.createFile(hdfs, encFile2Path, len, (short) 1, 0xFEED);
assertArrayEquals("encFile2 not equal",
getFileContentsUsingNfs(encFile2, len),
getFileContentsUsingDfs(encFile2, len));
}
private void createFileUsingNfs(String fileName, byte[] buffer)
throws Exception {
DFSTestUtil.createFile(hdfs, new Path(fileName), 0, (short) 1, 0);
final HdfsFileStatus status = nn.getRpcServer().getFileInfo(fileName);
final long dirId = status.getFileId();
final FileHandle handle = new FileHandle(dirId);
final WRITE3Request writeReq = new WRITE3Request(handle, 0,
buffer.length, WriteStableHow.DATA_SYNC, ByteBuffer.wrap(buffer));
final XDR xdr_req = new XDR();
writeReq.serialize(xdr_req);
final WRITE3Response response = nfsd.write(xdr_req.asReadOnlyWrap(),
null, 1, securityHandler,
new InetSocketAddress("localhost", 1234));
assertEquals("Incorrect response: ", null, response);
}
private byte[] getFileContentsUsingNfs(String fileName, int len)
throws Exception {
final HdfsFileStatus status = nn.getRpcServer().getFileInfo(fileName);
final long dirId = status.getFileId();
final FileHandle handle = new FileHandle(dirId);
final READ3Request readReq = new READ3Request(handle, 0, len);
final XDR xdr_req = new XDR();
readReq.serialize(xdr_req);
final READ3Response response = nfsd.read(xdr_req.asReadOnlyWrap(),
securityHandler, new InetSocketAddress("localhost", 1234));
assertEquals("Incorrect return code: ", Nfs3Status.NFS3_OK,
response.getStatus());
assertTrue("expected full read", response.isEof());
return response.getData().array();
}
private byte[] getFileContentsUsingDfs(String fileName, int len)
throws Exception {
final FSDataInputStream in = hdfs.open(new Path(fileName));
final byte[] ret = new byte[len];
in.readFully(ret);
try {
in.readByte();
Assert.fail("expected end of file");
} catch (EOFException e) {
// expected. Unfortunately there is no associated message to check
}
in.close();
return ret;
}
private void commit(String fileName, int len) throws Exception {
final HdfsFileStatus status = nn.getRpcServer().getFileInfo(fileName);
final long dirId = status.getFileId();
final FileHandle handle = new FileHandle(dirId);
final XDR xdr_req = new XDR();
final COMMIT3Request req = new COMMIT3Request(handle, 0, len);
req.serialize(xdr_req);
Channel ch = Mockito.mock(Channel.class);
COMMIT3Response response2 = nfsd.commit(xdr_req.asReadOnlyWrap(),
ch, 1, securityHandler,
new InetSocketAddress("localhost", 1234));
assertEquals("Incorrect COMMIT3Response:", null, response2);
}
@Test(timeout = 60000)
public void testWrite() throws Exception {
HdfsFileStatus status = nn.getRpcServer().getFileInfo("/tmp/bar");

View File

@ -219,6 +219,9 @@ Release 2.6.0 - UNRELEASED
HDFS-7047. Expose FileStatus#isEncrypted in libhdfs (cmccabe)
HDFS-7003. Add NFS Gateway support for reading and writing to
encryption zones. (clamb via wang)
OPTIMIZATIONS
HDFS-6690. Deduplicate xattr names in memory. (wang)

View File

@ -3089,4 +3089,9 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
public KeyProviderCryptoExtension getKeyProvider() {
return provider;
}
@VisibleForTesting
public void setKeyProvider(KeyProviderCryptoExtension provider) {
this.provider = provider;
}
}