From 47c9dcfe8eb425ad77b72900c69df56f39fd1889 Mon Sep 17 00:00:00 2001 From: Andrew Wang Date: Thu, 18 Sep 2014 14:57:25 -0700 Subject: [PATCH] HDFS-7003. Add NFS Gateway support for reading and writing to encryption zones. (clamb via wang) (cherry picked from commit 70be56d093022de9953e14a92dfa1a146bd9a290) --- .../hadoop/hdfs/nfs/nfs3/DFSClientCache.java | 2 +- .../hadoop/hdfs/nfs/nfs3/OpenFileCtx.java | 2 +- .../hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java | 5 +- .../hdfs/nfs/nfs3/TestRpcProgramNfs3.java | 121 ++++++++++++++++++ hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 + .../org/apache/hadoop/hdfs/DFSClient.java | 5 + 6 files changed, 134 insertions(+), 4 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/DFSClientCache.java b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/DFSClientCache.java index f2d26115d10..aad20e0bec3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/DFSClientCache.java +++ b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/DFSClientCache.java @@ -241,7 +241,7 @@ class DFSClientCache { public FSDataInputStream load(DFSInputStreamCaheKey key) throws Exception { DFSClient client = getDfsClient(key.userId); DFSInputStream dis = client.open(key.inodePath); - return new FSDataInputStream(dis); + return client.createWrappedInputStream(dis); } }; } diff --git a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java index 707900b59b0..6a5368cf1b2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java +++ b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java @@ -680,7 +680,7 @@ class OpenFileCtx { } try { - fis = new FSDataInputStream(dfsClient.open(path)); + fis = dfsClient.createWrappedInputStream(dfsClient.open(path)); readCount = fis.read(offset, readbuffer, 0, count); if (readCount < count) { LOG.error("Can't read back " + count + " bytes, partial read size:" diff --git a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java index 0d591d63963..70c37d86cbf 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java +++ b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java @@ -922,8 +922,9 @@ public class RpcProgramNfs3 extends RpcProgram implements Nfs3Interface { EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE) : EnumSet.of(CreateFlag.CREATE); - fos = new HdfsDataOutputStream(dfsClient.create(fileIdPath, permission, - flag, false, replication, blockSize, null, bufferSize, null), + fos = dfsClient.createWrappedOutputStream( + dfsClient.create(fileIdPath, permission, flag, false, replication, + blockSize, null, bufferSize, null), statistics); if ((createMode == Nfs3Constant.CREATE_UNCHECKED) diff --git a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestRpcProgramNfs3.java b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestRpcProgramNfs3.java index 05b976da8be..68efac2b9cc 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestRpcProgramNfs3.java +++ b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestRpcProgramNfs3.java @@ -17,19 +17,27 @@ */ package org.apache.hadoop.hdfs.nfs.nfs3; +import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; +import java.io.EOFException; +import java.io.File; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.util.EnumSet; +import org.apache.hadoop.crypto.key.JavaKeyStoreProvider; +import org.apache.hadoop.crypto.key.KeyProviderFactory; import org.apache.hadoop.fs.CommonConfigurationKeys; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystemTestHelper; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.DFSTestUtil; import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.client.HdfsAdmin; import org.apache.hadoop.hdfs.nfs.conf.NfsConfigKeys; import org.apache.hadoop.hdfs.nfs.conf.NfsConfiguration; import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; @@ -98,12 +106,16 @@ public class TestRpcProgramNfs3 { static DistributedFileSystem hdfs; static MiniDFSCluster cluster = null; static NfsConfiguration config = new NfsConfiguration(); + static HdfsAdmin dfsAdmin; static NameNode nn; static Nfs3 nfs; static RpcProgramNfs3 nfsd; static SecurityHandler securityHandler; static SecurityHandler securityHandlerUnpriviledged; static String testdir = "/tmp"; + private static final String TEST_KEY = "testKey"; + private static FileSystemTestHelper fsHelper; + private static File testRootDir; @BeforeClass public static void setup() throws Exception { @@ -114,12 +126,20 @@ public class TestRpcProgramNfs3 { .getProxySuperuserGroupConfKey(currentUser), "*"); config.set(DefaultImpersonationProvider.getTestProvider() .getProxySuperuserIpConfKey(currentUser), "*"); + fsHelper = new FileSystemTestHelper(); + // Set up java key store + String testRoot = fsHelper.getTestRootDir(); + testRootDir = new File(testRoot).getAbsoluteFile(); + final Path jksPath = new Path(testRootDir.toString(), "test.jks"); + config.set(KeyProviderFactory.KEY_PROVIDER_PATH, + JavaKeyStoreProvider.SCHEME_NAME + "://file" + jksPath.toUri()); ProxyUsers.refreshSuperUserGroupsConfiguration(config); cluster = new MiniDFSCluster.Builder(config).numDataNodes(1).build(); cluster.waitActive(); hdfs = cluster.getFileSystem(); nn = cluster.getNameNode(); + dfsAdmin = new HdfsAdmin(cluster.getURI(), config); // Use ephemeral ports in case tests are running in parallel config.setInt("nfs3.mountd.port", 0); @@ -131,6 +151,8 @@ public class TestRpcProgramNfs3 { nfs.startServiceInternal(false); nfsd = (RpcProgramNfs3) nfs.getRpcProgram(); + hdfs.getClient().setKeyProvider(nn.getNamesystem().getProvider()); + DFSTestUtil.createKey(TEST_KEY, cluster, config); // Mock SecurityHandler which returns system user.name securityHandler = Mockito.mock(SecurityHandler.class); @@ -310,6 +332,105 @@ public class TestRpcProgramNfs3 { response2.getStatus()); } + @Test(timeout = 120000) + public void testEncryptedReadWrite() throws Exception { + final int len = 8192; + + final Path zone = new Path("/zone"); + hdfs.mkdirs(zone); + dfsAdmin.createEncryptionZone(zone, TEST_KEY); + + final byte[] buffer = new byte[len]; + for (int i = 0; i < len; i++) { + buffer[i] = (byte) i; + } + + final String encFile1 = "/zone/myfile"; + createFileUsingNfs(encFile1, buffer); + commit(encFile1, len); + assertArrayEquals("encFile1 not equal", + getFileContentsUsingNfs(encFile1, len), + getFileContentsUsingDfs(encFile1, len)); + + /* + * Same thing except this time create the encrypted file using DFS. + */ + final String encFile2 = "/zone/myfile2"; + final Path encFile2Path = new Path(encFile2); + DFSTestUtil.createFile(hdfs, encFile2Path, len, (short) 1, 0xFEED); + assertArrayEquals("encFile2 not equal", + getFileContentsUsingNfs(encFile2, len), + getFileContentsUsingDfs(encFile2, len)); + } + + private void createFileUsingNfs(String fileName, byte[] buffer) + throws Exception { + DFSTestUtil.createFile(hdfs, new Path(fileName), 0, (short) 1, 0); + + final HdfsFileStatus status = nn.getRpcServer().getFileInfo(fileName); + final long dirId = status.getFileId(); + final FileHandle handle = new FileHandle(dirId); + + final WRITE3Request writeReq = new WRITE3Request(handle, 0, + buffer.length, WriteStableHow.DATA_SYNC, ByteBuffer.wrap(buffer)); + final XDR xdr_req = new XDR(); + writeReq.serialize(xdr_req); + + final WRITE3Response response = nfsd.write(xdr_req.asReadOnlyWrap(), + null, 1, securityHandler, + new InetSocketAddress("localhost", 1234)); + assertEquals("Incorrect response: ", null, response); + } + + private byte[] getFileContentsUsingNfs(String fileName, int len) + throws Exception { + final HdfsFileStatus status = nn.getRpcServer().getFileInfo(fileName); + final long dirId = status.getFileId(); + final FileHandle handle = new FileHandle(dirId); + + final READ3Request readReq = new READ3Request(handle, 0, len); + final XDR xdr_req = new XDR(); + readReq.serialize(xdr_req); + + final READ3Response response = nfsd.read(xdr_req.asReadOnlyWrap(), + securityHandler, new InetSocketAddress("localhost", 1234)); + assertEquals("Incorrect return code: ", Nfs3Status.NFS3_OK, + response.getStatus()); + assertTrue("expected full read", response.isEof()); + return response.getData().array(); + } + + private byte[] getFileContentsUsingDfs(String fileName, int len) + throws Exception { + final FSDataInputStream in = hdfs.open(new Path(fileName)); + final byte[] ret = new byte[len]; + in.readFully(ret); + try { + in.readByte(); + Assert.fail("expected end of file"); + } catch (EOFException e) { + // expected. Unfortunately there is no associated message to check + } + in.close(); + return ret; + } + + private void commit(String fileName, int len) throws Exception { + final HdfsFileStatus status = nn.getRpcServer().getFileInfo(fileName); + final long dirId = status.getFileId(); + final FileHandle handle = new FileHandle(dirId); + final XDR xdr_req = new XDR(); + final COMMIT3Request req = new COMMIT3Request(handle, 0, len); + req.serialize(xdr_req); + + Channel ch = Mockito.mock(Channel.class); + + COMMIT3Response response2 = nfsd.commit(xdr_req.asReadOnlyWrap(), + ch, 1, securityHandler, + new InetSocketAddress("localhost", 1234)); + assertEquals("Incorrect COMMIT3Response:", null, response2); + } + @Test(timeout = 60000) public void testWrite() throws Exception { HdfsFileStatus status = nn.getRpcServer().getFileInfo("/tmp/bar"); diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index bbd80b9d543..4345daef759 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -219,6 +219,9 @@ Release 2.6.0 - UNRELEASED HDFS-7047. Expose FileStatus#isEncrypted in libhdfs (cmccabe) + HDFS-7003. Add NFS Gateway support for reading and writing to + encryption zones. (clamb via wang) + OPTIMIZATIONS HDFS-6690. Deduplicate xattr names in memory. (wang) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java index 456fac63425..dfc9e0dccc4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java @@ -3089,4 +3089,9 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, public KeyProviderCryptoExtension getKeyProvider() { return provider; } + + @VisibleForTesting + public void setKeyProvider(KeyProviderCryptoExtension provider) { + this.provider = provider; + } }