From 10deafee82376543f0dd06f03196351cb275d42f Mon Sep 17 00:00:00 2001
From: Andrew Wang
Date: Mon, 27 Mar 2017 14:35:03 -0700
Subject: [PATCH] HDFS-11170. Add builder-based create API to FileSystem.
 Contributed by SammiChen and Wei Zhou.

---
 .../hadoop/fs/FSDataOutputStreamBuilder.java  | 142 ++++++++++++++++++
 .../java/org/apache/hadoop/fs/FileSystem.java |   9 ++
 .../apache/hadoop/fs/FilterFileSystem.java    |   5 +
 .../org/apache/hadoop/fs/HarFileSystem.java   |   5 +
 .../apache/hadoop/fs/TestLocalFileSystem.java |  54 +++++++
 .../hadoop/hdfs/DistributedFileSystem.java    |  81 ++++++++++
 .../hdfs/TestDistributedFileSystem.java       |  35 ++++-
 .../namenode/TestFavoredNodesEndToEnd.java    |  23 +++
 8 files changed, 353 insertions(+), 1 deletion(-)
 create mode 100644 hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataOutputStreamBuilder.java

diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataOutputStreamBuilder.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataOutputStreamBuilder.java
new file mode 100644
index 00000000000..2e885f3460c
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataOutputStreamBuilder.java
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.fs.Options.ChecksumOpt;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.util.Progressable;
+
+import java.io.IOException;
+import java.util.EnumSet;
+
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT;
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY;
+
+/** Base of FileSystem-specific FSDataOutputStreamBuilder implementations.
+ */
+public class FSDataOutputStreamBuilder {
+  private Path path = null;
+  private FsPermission permission = null;
+  private Integer bufferSize;
+  private Short replication;
+  private Long blockSize;
+  private Progressable progress = null;
+  private EnumSet<CreateFlag> flags = null;
+  private ChecksumOpt checksumOpt = null;
+
+  private final FileSystem fs;
+
+  public FSDataOutputStreamBuilder(FileSystem fileSystem, Path p) {
+    fs = fileSystem;
+    path = p;
+  }
+
+  protected Path getPath() {
+    return path;
+  }
+
+  protected FsPermission getPermission() {
+    if (permission == null) {
+      return FsPermission.getFileDefault();
+    }
+    return permission;
+  }
+
+  public FSDataOutputStreamBuilder setPermission(final FsPermission perm) {
+    Preconditions.checkNotNull(perm);
+    permission = perm;
+    return this;
+  }
+
+  protected int getBufferSize() {
+    if (bufferSize == null) {
+      return fs.getConf().getInt(IO_FILE_BUFFER_SIZE_KEY,
+          IO_FILE_BUFFER_SIZE_DEFAULT);
+    }
+    return bufferSize;
+  }
+
+  public FSDataOutputStreamBuilder setBufferSize(int bufSize) {
+    bufferSize = bufSize;
+    return this;
+  }
+
+  protected short getReplication() {
+    if (replication == null) {
+      return fs.getDefaultReplication(getPath());
+    }
+    return replication;
+  }
+
+  public FSDataOutputStreamBuilder setReplication(short replica) {
+    replication = replica;
+    return this;
+  }
+
+  protected long getBlockSize() {
+    if (blockSize == null) {
+      return fs.getDefaultBlockSize(getPath());
+    }
+    return blockSize;
+  }
+
+  public FSDataOutputStreamBuilder setBlockSize(long blkSize) {
+    blockSize = blkSize;
+    return this;
+  }
+
+  protected Progressable getProgress() {
+    return progress;
+  }
+
+  public FSDataOutputStreamBuilder setProgress(final Progressable prog) {
+    Preconditions.checkNotNull(prog);
+    progress = prog;
+    return this;
+  }
+
+  protected EnumSet<CreateFlag> getFlags() {
+    if (flags == null) {
+      return EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE);
+    }
+    return flags;
+  }
+
+  public FSDataOutputStreamBuilder setFlags(
+      final EnumSet<CreateFlag> enumFlags) {
+    Preconditions.checkNotNull(enumFlags);
+    flags = enumFlags;
+    return this;
+  }
+
+  protected ChecksumOpt getChecksumOpt() {
+    return checksumOpt;
+  }
+
+  public FSDataOutputStreamBuilder setChecksumOpt(
+      final ChecksumOpt chksumOpt) {
+    Preconditions.checkNotNull(chksumOpt);
+    checksumOpt = chksumOpt;
+    return this;
+  }
+
+  public FSDataOutputStream build() throws IOException {
+    return fs.create(getPath(), getPermission(), getFlags(), getBufferSize(),
+        getReplication(), getBlockSize(), getProgress(), getChecksumOpt());
+  }
+}
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java
index 80f1d09127f..29409117eec 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java
@@ -4122,4 +4122,13 @@ public StorageStatistics getStorageStatistics() {
   public static GlobalStorageStatistics getGlobalStorageStatistics() {
     return GlobalStorageStatistics.INSTANCE;
   }
+
+  /**
+   * Create a new FSDataOutputStreamBuilder for the file at the given path.
+   * @param path file path
+   * @return an FSDataOutputStreamBuilder object to build the file
+   */
+  public FSDataOutputStreamBuilder newFSDataOutputStreamBuilder(Path path) {
+    return new FSDataOutputStreamBuilder(this, path);
+  }
 }
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FilterFileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FilterFileSystem.java
index 41429ace83f..ef0945896ef 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FilterFileSystem.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FilterFileSystem.java
@@ -665,4 +665,9 @@ public Path getTrashRoot(Path path) {
   public Collection getTrashRoots(boolean allUsers) {
     return fs.getTrashRoots(allUsers);
   }
+
+  @Override
+  public FSDataOutputStreamBuilder newFSDataOutputStreamBuilder(Path path) {
+    return fs.newFSDataOutputStreamBuilder(path);
+  }
 }
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/HarFileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/HarFileSystem.java
index 9b174045862..6f57c7b9ef0 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/HarFileSystem.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/HarFileSystem.java
@@ -1268,4 +1268,9 @@ public short getDefaultReplication() {
   public short getDefaultReplication(Path f) {
     return fs.getDefaultReplication(f);
   }
+
+  @Override
+  public FSDataOutputStreamBuilder newFSDataOutputStreamBuilder(Path path) {
+    return fs.newFSDataOutputStreamBuilder(path);
+  }
 }
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestLocalFileSystem.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestLocalFileSystem.java
index 89049cd7c4c..d4dbf5dd420 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestLocalFileSystem.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestLocalFileSystem.java
@@ -19,10 +19,13 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem.Statistics;
+import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.util.Shell;

+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT;
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY;
 import static org.apache.hadoop.fs.FileSystemTestHelper.*;

 import java.io.*;
@@ -631,4 +634,55 @@ public void testFileStatusPipeFile() throws Exception {
     FileStatus[] stats = fs.listStatus(path);
     assertTrue(stats != null && stats.length == 1 && stats[0] == stat);
   }
+
+  @Test
+  public void testFSOutputStreamBuilder() throws Exception {
+    Path path = new Path(TEST_ROOT_DIR, "testBuilder");
+
+    try {
+      FSDataOutputStreamBuilder builder =
+          fileSys.newFSDataOutputStreamBuilder(path);
+      FSDataOutputStream out = builder.build();
+      String content = "Create with a generic type of createBuilder!";
+      byte[] contentOrigin = content.getBytes("UTF8");
+      out.write(contentOrigin);
+      out.close();
+
+      FSDataInputStream input = fileSys.open(path);
+      byte[] buffer =
+          new byte[(int) (fileSys.getFileStatus(path).getLen())];
+      input.readFully(0, buffer);
+      input.close();
+      Assert.assertArrayEquals("The data read should equal the data "
+          + "written.", contentOrigin, buffer);
+    } catch (IOException e) {
+      throw e;
+    }
+
+    // Test values not being set for replication, block size, buffer size
+    // and permission
+    FSDataOutputStreamBuilder builder =
+        fileSys.newFSDataOutputStreamBuilder(path);
+    builder.build();
+    Assert.assertEquals("Should be default block size",
+        builder.getBlockSize(), fileSys.getDefaultBlockSize());
+    Assert.assertEquals("Should be default replication factor",
+        builder.getReplication(), fileSys.getDefaultReplication());
+    Assert.assertEquals("Should be default buffer size",
+        builder.getBufferSize(),
+        fileSys.getConf().getInt(IO_FILE_BUFFER_SIZE_KEY,
+            IO_FILE_BUFFER_SIZE_DEFAULT));
+    Assert.assertEquals("Should be default permission",
+        builder.getPermission(), FsPermission.getFileDefault());
+
+    // Test setting 0 for replication, block size and buffer size
+    builder = fileSys.newFSDataOutputStreamBuilder(path);
+    builder.setBufferSize(0).setBlockSize(0).setReplication((short) 0);
+    Assert.assertEquals("Block size should be 0",
+        builder.getBlockSize(), 0);
+    Assert.assertEquals("Replication factor should be 0",
+        builder.getReplication(), 0);
+    Assert.assertEquals("Buffer size should be 0",
+        builder.getBufferSize(), 0);
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
index 8080a3ffbd1..69d4b2a571e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
@@ -40,6 +40,7 @@
 import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FSDataOutputStreamBuilder;
 import org.apache.hadoop.fs.FSLinkResolver;
 import org.apache.hadoop.fs.FileChecksum;
 import org.apache.hadoop.fs.FileEncryptionInfo;
@@ -483,6 +484,48 @@ public FSDataOutputStream next(final FileSystem fs, final Path p)
     }.resolve(this, absF);
   }

+  /**
+   * Same as
+   * {@link #create(Path, FsPermission, EnumSet, int, short, long,
+   * Progressable, ChecksumOpt)}, with the addition of favoredNodes, a hint
+   * to the namenode about where to place the file blocks.
+   * The favored nodes hint is not persisted in HDFS, so it may be honored
+   * only at creation time. With favored nodes, blocks are pinned on the
+   * datanodes to prevent the balancer from moving them, although HDFS may
+   * still move blocks away from favored nodes during re-replication. A value
A value + * of null means no favored nodes for this create + */ + private HdfsDataOutputStream create(final Path f, + final FsPermission permission, final EnumSet flag, + final int bufferSize, final short replication, final long blockSize, + final Progressable progress, final ChecksumOpt checksumOpt, + final InetSocketAddress[] favoredNodes) throws IOException { + statistics.incrementWriteOps(1); + storageStatistics.incrementOpCounter(OpType.CREATE); + Path absF = fixRelativePart(f); + return new FileSystemLinkResolver() { + @Override + public HdfsDataOutputStream doCall(final Path p) throws IOException { + final DFSOutputStream out = dfs.create(getPathName(f), permission, + flag, true, replication, blockSize, progress, bufferSize, + checksumOpt, favoredNodes); + return dfs.createWrappedOutputStream(out, statistics); + } + @Override + public HdfsDataOutputStream next(final FileSystem fs, final Path p) + throws IOException { + if (fs instanceof DistributedFileSystem) { + DistributedFileSystem myDfs = (DistributedFileSystem)fs; + return myDfs.create(p, permission, flag, bufferSize, replication, + blockSize, progress, checksumOpt, favoredNodes); + } + throw new UnsupportedOperationException("Cannot create with" + + " favoredNodes through a symlink to a non-DistributedFileSystem: " + + f + " -> " + p); + } + }.resolve(this, absF); + } + @Override protected HdfsDataOutputStream primitiveCreate(Path f, FsPermission absolutePermission, EnumSet flag, int bufferSize, @@ -2550,4 +2593,42 @@ Statistics getFsStatistics() { DFSOpsCountStatistics getDFSOpsCountStatistics() { return storageStatistics; } + + /** + * Extends FSDataOutputStreamBuilder to support special requirements + * of DistributedFileSystem. + */ + public static class HdfsDataOutputStreamBuilder + extends FSDataOutputStreamBuilder { + private final DistributedFileSystem dfs; + private InetSocketAddress[] favoredNodes = null; + + public HdfsDataOutputStreamBuilder(DistributedFileSystem dfs, Path path) { + super(dfs, path); + this.dfs = dfs; + } + + protected InetSocketAddress[] getFavoredNodes() { + return favoredNodes; + } + + public HdfsDataOutputStreamBuilder setFavoredNodes( + final InetSocketAddress[] nodes) { + Preconditions.checkNotNull(nodes); + favoredNodes = nodes.clone(); + return this; + } + + @Override + public HdfsDataOutputStream build() throws IOException { + return dfs.create(getPath(), getPermission(), getFlags(), + getBufferSize(), getReplication(), getBlockSize(), + getProgress(), getChecksumOpt(), getFavoredNodes()); + } + } + + @Override + public HdfsDataOutputStreamBuilder newFSDataOutputStreamBuilder(Path path) { + return new HdfsDataOutputStreamBuilder(this, path); + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java index a79ca4697d0..9932f9caab2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java @@ -75,6 +75,7 @@ import org.apache.hadoop.fs.StorageStatistics.LongStatistic; import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.fs.VolumeId; +import org.apache.hadoop.fs.contract.ContractTestUtils; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties; import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; 
@@ -88,7 +89,6 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil; import org.apache.hadoop.hdfs.web.HftpFileSystem; -import org.apache.hadoop.hdfs.server.namenode.top.window.RollingWindowManager.Op; import org.apache.hadoop.hdfs.web.WebHdfsConstants; import org.apache.hadoop.ipc.ProtobufRpcEngine; import org.apache.hadoop.net.DNSToSwitchMapping; @@ -1519,4 +1519,37 @@ public void testDFSCloseFilesBeingWritten() throws Exception { } } } + + @Test + public void testDFSDataOutputStreamBuilder() throws Exception { + Configuration conf = getTestConfiguration(); + MiniDFSCluster cluster = null; + String testFile = "/testDFSDataOutputStreamBuilder"; + Path testFilePath = new Path(testFile); + try { + cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build(); + DistributedFileSystem fs = cluster.getFileSystem(); + + // Test create an empty file + FSDataOutputStream out = + fs.newFSDataOutputStreamBuilder(testFilePath).build(); + out.close(); + + // Test create a file with content, and verify the content + String content = "This is a test!"; + out = fs.newFSDataOutputStreamBuilder(testFilePath) + .setBufferSize(4096).setReplication((short) 1) + .setBlockSize(4096).build(); + byte[] contentOrigin = content.getBytes("UTF8"); + out.write(contentOrigin); + out.close(); + + ContractTestUtils.verifyFileContents(fs, testFilePath, + content.getBytes()); + } finally { + if (cluster != null) { + cluster.shutdown(); + } + } + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFavoredNodesEndToEnd.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFavoredNodesEndToEnd.java index b78b6cc36ed..50e56cc7e81 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFavoredNodesEndToEnd.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFavoredNodesEndToEnd.java @@ -189,6 +189,29 @@ public void testFavoredNodesEndToEndForAppend() throws Exception { } } + @Test(timeout = 180000) + public void testCreateStreamBuilderFavoredNodesEndToEnd() throws Exception { + //create 10 files with random preferred nodes + for (int i = 0; i < NUM_FILES; i++) { + Random rand = new Random(System.currentTimeMillis() + i); + //pass a new created rand so as to get a uniform distribution each time + //without too much collisions (look at the do-while loop in getDatanodes) + InetSocketAddress[] dns = getDatanodes(rand); + Path p = new Path("/filename"+i); + FSDataOutputStream out = + dfs.newFSDataOutputStreamBuilder(p).setFavoredNodes(dns).build(); + out.write(SOME_BYTES); + out.close(); + BlockLocation[] locations = getBlockLocations(p); + //verify the files got created in the right nodes + for (BlockLocation loc : locations) { + String[] hosts = loc.getNames(); + String[] hosts1 = getStringForInetSocketAddrs(dns); + assertTrue(compareNodes(hosts, hosts1)); + } + } + } + private BlockLocation[] getBlockLocations(Path p) throws Exception { DFSTestUtil.waitReplication(dfs, p, (short)3); BlockLocation[] locations = dfs.getClient().getBlockLocations(
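
For context beyond the diff itself, the sketch below shows how client code is intended to use the builder API added by this patch. It is an illustrative example and not part of the patch; the configuration, file path, and datanode address are hypothetical placeholders.

import java.net.InetSocketAddress;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;

public class BuilderCreateExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    // Hypothetical output path, for illustration only.
    Path path = new Path("/tmp/builder-example");

    // Generic FileSystem usage: only the options the caller cares about are
    // set; anything left unset falls back to the FileSystem defaults resolved
    // by the builder's protected getters.
    try (FSDataOutputStream out = fs.newFSDataOutputStreamBuilder(path)
        .setReplication((short) 2).build()) {
      out.writeBytes("hello");
    }

    // HDFS-specific usage: DistributedFileSystem overrides
    // newFSDataOutputStreamBuilder to return an HdfsDataOutputStreamBuilder,
    // which adds setFavoredNodes(). The address below is a placeholder.
    if (fs instanceof DistributedFileSystem) {
      DistributedFileSystem dfs = (DistributedFileSystem) fs;
      InetSocketAddress[] favored = {
          new InetSocketAddress("datanode-1.example.com", 9866) };
      try (FSDataOutputStream out = dfs.newFSDataOutputStreamBuilder(path)
          .setFavoredNodes(favored).build()) {
        out.writeBytes("hello again");
      }
    }
  }
}

This is the motivation for the builder over the existing many-argument create() overloads: callers chain only the settings they need, and filesystem-specific options such as favored nodes can be added without growing the FileSystem API surface.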