HDFS-11170. Add builder-based create API to FileSystem. Contributed by SammiChen and Wei Zhou.

Andrew Wang 2017-03-24 12:56:46 -07:00
parent 52b00600df
commit 332a997e10
8 changed files with 353 additions and 1 deletion

org/apache/hadoop/fs/FSDataOutputStreamBuilder.java

@@ -0,0 +1,142 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.fs;

import com.google.common.base.Preconditions;
import org.apache.hadoop.fs.Options.ChecksumOpt;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.util.Progressable;

import java.io.IOException;
import java.util.EnumSet;

import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY;

/** Base class that file-system-specific FSDataOutputStreamBuilders extend. */
public class FSDataOutputStreamBuilder {
  private Path path = null;
  private FsPermission permission = null;
  private Integer bufferSize;
  private Short replication;
  private Long blockSize;
  private Progressable progress = null;
  private EnumSet<CreateFlag> flags = null;
  private ChecksumOpt checksumOpt = null;

  private final FileSystem fs;

  public FSDataOutputStreamBuilder(FileSystem fileSystem, Path p) {
    fs = fileSystem;
    path = p;
  }

  protected Path getPath() {
    return path;
  }

  protected FsPermission getPermission() {
    if (permission == null) {
      return FsPermission.getFileDefault();
    }
    return permission;
  }

  public FSDataOutputStreamBuilder setPermission(final FsPermission perm) {
    Preconditions.checkNotNull(perm);
    permission = perm;
    return this;
  }

  protected int getBufferSize() {
    if (bufferSize == null) {
      return fs.getConf().getInt(IO_FILE_BUFFER_SIZE_KEY,
          IO_FILE_BUFFER_SIZE_DEFAULT);
    }
    return bufferSize;
  }

  public FSDataOutputStreamBuilder setBufferSize(int bufSize) {
    bufferSize = bufSize;
    return this;
  }

  protected short getReplication() {
    if (replication == null) {
      return fs.getDefaultReplication(getPath());
    }
    return replication;
  }

  public FSDataOutputStreamBuilder setReplication(short replica) {
    replication = replica;
    return this;
  }

  protected long getBlockSize() {
    if (blockSize == null) {
      return fs.getDefaultBlockSize(getPath());
    }
    return blockSize;
  }

  public FSDataOutputStreamBuilder setBlockSize(long blkSize) {
    blockSize = blkSize;
    return this;
  }

  protected Progressable getProgress() {
    return progress;
  }

  public FSDataOutputStreamBuilder setProgress(final Progressable prog) {
    Preconditions.checkNotNull(prog);
    progress = prog;
    return this;
  }

  protected EnumSet<CreateFlag> getFlags() {
    if (flags == null) {
      return EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE);
    }
    return flags;
  }

  public FSDataOutputStreamBuilder setFlags(
      final EnumSet<CreateFlag> enumFlags) {
    Preconditions.checkNotNull(enumFlags);
    flags = enumFlags;
    return this;
  }

  protected ChecksumOpt getChecksumOpt() {
    return checksumOpt;
  }

  public FSDataOutputStreamBuilder setChecksumOpt(
      final ChecksumOpt chksumOpt) {
    Preconditions.checkNotNull(chksumOpt);
    checksumOpt = chksumOpt;
    return this;
  }

  public FSDataOutputStream build() throws IOException {
    return fs.create(getPath(), getPermission(), getFlags(), getBufferSize(),
        getReplication(), getBlockSize(), getProgress(), getChecksumOpt());
  }
}
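For orientation, a minimal usage sketch of the builder API added above. It is illustrative only: the class name, configuration, path, and payload are assumptions, not part of this commit.

import java.nio.charset.StandardCharsets;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class BuilderCreateExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    // Hypothetical target path; any writable location works.
    Path p = new Path("/tmp/builder-example.txt");
    // Options left unset (permission, block size, flags) fall back to the
    // defaults computed by the protected getters in the builder above.
    try (FSDataOutputStream out = fs.newFSDataOutputStreamBuilder(p)
        .setReplication((short) 2)
        .setBufferSize(8192)
        .build()) {
      out.write("hello, builder".getBytes(StandardCharsets.UTF_8));
    }
  }
}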

org/apache/hadoop/fs/FileSystem.java

@@ -4138,4 +4138,13 @@ public abstract class FileSystem extends Configured implements Closeable {
  public static GlobalStorageStatistics getGlobalStorageStatistics() {
    return GlobalStorageStatistics.INSTANCE;
  }

  /**
   * Create a new FSDataOutputStreamBuilder for the given file path.
   * @param path file path
   * @return an FSDataOutputStreamBuilder object for building the file
   */
  public FSDataOutputStreamBuilder newFSDataOutputStreamBuilder(Path path) {
    return new FSDataOutputStreamBuilder(this, path);
  }
}

org/apache/hadoop/fs/FilterFileSystem.java

@@ -665,4 +665,9 @@ public class FilterFileSystem extends FileSystem {
  public Collection<FileStatus> getTrashRoots(boolean allUsers) {
    return fs.getTrashRoots(allUsers);
  }

  @Override
  public FSDataOutputStreamBuilder newFSDataOutputStreamBuilder(Path path) {
    return fs.newFSDataOutputStreamBuilder(path);
  }
}

org/apache/hadoop/fs/HarFileSystem.java

@@ -1268,4 +1268,9 @@ public class HarFileSystem extends FileSystem {
  public short getDefaultReplication(Path f) {
    return fs.getDefaultReplication(f);
  }

  @Override
  public FSDataOutputStreamBuilder newFSDataOutputStreamBuilder(Path path) {
    return fs.newFSDataOutputStreamBuilder(path);
  }
}

org/apache/hadoop/fs/TestLocalFileSystem.java

@@ -19,10 +19,13 @@ package org.apache.hadoop.fs;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem.Statistics;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.StringUtils;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY;
import static org.apache.hadoop.fs.FileSystemTestHelper.*;
import java.io.*;
@@ -636,4 +639,55 @@ public class TestLocalFileSystem {
    FileStatus[] stats = fs.listStatus(path);
    assertTrue(stats != null && stats.length == 1 && stats[0] == stat);
  }

  @Test
  public void testFSOutputStreamBuilder() throws Exception {
    Path path = new Path(TEST_ROOT_DIR, "testBuilder");

    FSDataOutputStreamBuilder builder =
        fileSys.newFSDataOutputStreamBuilder(path);
    FSDataOutputStream out = builder.build();
    String content = "Create with a generic type of createBuilder!";
    byte[] contentOrigin = content.getBytes("UTF8");
    out.write(contentOrigin);
    out.close();

    FSDataInputStream input = fileSys.open(path);
    byte[] buffer =
        new byte[(int) (fileSys.getFileStatus(path).getLen())];
    input.readFully(0, buffer);
    input.close();
    Assert.assertArrayEquals("The data read should equal the data written.",
        contentOrigin, buffer);

    // Test the defaults used when replication, block size, buffer size
    // and permission are not set.
    builder = fileSys.newFSDataOutputStreamBuilder(path);
    builder.build().close();
    Assert.assertEquals("Should be default block size",
        fileSys.getDefaultBlockSize(), builder.getBlockSize());
    Assert.assertEquals("Should be default replication factor",
        fileSys.getDefaultReplication(), builder.getReplication());
    Assert.assertEquals("Should be default buffer size",
        fileSys.getConf().getInt(IO_FILE_BUFFER_SIZE_KEY,
            IO_FILE_BUFFER_SIZE_DEFAULT),
        builder.getBufferSize());
    Assert.assertEquals("Should be default permission",
        FsPermission.getFileDefault(), builder.getPermission());

    // Test setting replication, block size and buffer size to 0.
    builder = fileSys.newFSDataOutputStreamBuilder(path);
    builder.setBufferSize(0).setBlockSize(0).setReplication((short) 0);
    Assert.assertEquals("Block size should be 0",
        0, builder.getBlockSize());
    Assert.assertEquals("Replication factor should be 0",
        0, builder.getReplication());
    Assert.assertEquals("Buffer size should be 0",
        0, builder.getBufferSize());
  }
}

org/apache/hadoop/hdfs/DistributedFileSystem.java

@@ -39,6 +39,7 @@ import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FSDataOutputStreamBuilder;
import org.apache.hadoop.fs.FSLinkResolver;
import org.apache.hadoop.fs.FileChecksum;
import org.apache.hadoop.fs.FileEncryptionInfo;
@@ -446,6 +447,48 @@
    }.resolve(this, absF);
  }

  /**
   * Same as
   * {@link #create(Path, FsPermission, EnumSet, int, short, long,
   * Progressable, ChecksumOpt)} with the addition of favoredNodes, which is
   * a hint to the namenode about where to place the file blocks.
   * The favored nodes hint is not persisted in HDFS, so it may be honored
   * only at creation time. With favored nodes, blocks are pinned on the
   * datanodes to prevent the balancer from moving them, although HDFS may
   * still move blocks away from favored nodes during replication. A value
   * of null means no favored nodes for this create.
   */
  private HdfsDataOutputStream create(final Path f,
      final FsPermission permission, EnumSet<CreateFlag> flag,
      final int bufferSize, final short replication, final long blockSize,
      final Progressable progress, final ChecksumOpt checksumOpt,
      final InetSocketAddress[] favoredNodes) throws IOException {
    statistics.incrementWriteOps(1);
    storageStatistics.incrementOpCounter(OpType.CREATE);
    Path absF = fixRelativePart(f);
    return new FileSystemLinkResolver<HdfsDataOutputStream>() {
      @Override
      public HdfsDataOutputStream doCall(final Path p) throws IOException {
        // Use the resolved path p, not the original f, so that symlink
        // resolution is honored.
        final DFSOutputStream out = dfs.create(getPathName(p), permission,
            flag, true, replication, blockSize, progress, bufferSize,
            checksumOpt, favoredNodes);
        return dfs.createWrappedOutputStream(out, statistics);
      }

      @Override
      public HdfsDataOutputStream next(final FileSystem fs, final Path p)
          throws IOException {
        if (fs instanceof DistributedFileSystem) {
          DistributedFileSystem myDfs = (DistributedFileSystem) fs;
          return myDfs.create(p, permission, flag, bufferSize, replication,
              blockSize, progress, checksumOpt, favoredNodes);
        }
        throw new UnsupportedOperationException("Cannot create with" +
            " favoredNodes through a symlink to a non-DistributedFileSystem: "
            + f + " -> " + p);
      }
    }.resolve(this, absF);
  }
  @Override
  protected HdfsDataOutputStream primitiveCreate(Path f,
      FsPermission absolutePermission, EnumSet<CreateFlag> flag, int bufferSize,
@@ -2584,4 +2627,42 @@
  DFSOpsCountStatistics getDFSOpsCountStatistics() {
    return storageStatistics;
  }

  /**
   * Extends FSDataOutputStreamBuilder to support special requirements
   * of DistributedFileSystem.
   */
  public static class HdfsDataOutputStreamBuilder
      extends FSDataOutputStreamBuilder {
    private final DistributedFileSystem dfs;
    private InetSocketAddress[] favoredNodes = null;

    public HdfsDataOutputStreamBuilder(DistributedFileSystem dfs, Path path) {
      super(dfs, path);
      this.dfs = dfs;
    }

    protected InetSocketAddress[] getFavoredNodes() {
      return favoredNodes;
    }

    public HdfsDataOutputStreamBuilder setFavoredNodes(
        final InetSocketAddress[] nodes) {
      Preconditions.checkNotNull(nodes);
      favoredNodes = nodes.clone();
      return this;
    }

    @Override
    public HdfsDataOutputStream build() throws IOException {
      return dfs.create(getPath(), getPermission(), getFlags(),
          getBufferSize(), getReplication(), getBlockSize(),
          getProgress(), getChecksumOpt(), getFavoredNodes());
    }
  }

  @Override
  public HdfsDataOutputStreamBuilder newFSDataOutputStreamBuilder(Path path) {
    return new HdfsDataOutputStreamBuilder(this, path);
  }
}
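A sketch of the HDFS-specific builder with favored nodes. The class name, datanode addresses, and path below are placeholders, and fs.defaultFS is assumed to point at an HDFS cluster:

import java.net.InetSocketAddress;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;

public class FavoredNodesExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Assumes the default file system is HDFS; the cast fails otherwise.
    DistributedFileSystem dfs =
        (DistributedFileSystem) FileSystem.get(conf);
    // Placeholder datanode addresses, not taken from the commit.
    InetSocketAddress[] favored = {
        new InetSocketAddress("datanode1.example.com", 9866),
        new InetSocketAddress("datanode2.example.com", 9866)
    };
    try (FSDataOutputStream out = dfs.newFSDataOutputStreamBuilder(
        new Path("/tmp/favored-example.txt"))
        .setFavoredNodes(favored)
        .setReplication((short) 2)
        .build()) {
      out.write(new byte[] {1, 2, 3});
    }
  }
}

Note that the base-class setters return the base FSDataOutputStreamBuilder type in this version, so HDFS-specific calls such as setFavoredNodes must come earlier in the chain than the generic setters.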

org/apache/hadoop/hdfs/TestDistributedFileSystem.java

@@ -69,6 +69,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.StorageStatistics.LongStatistic;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.client.impl.LeaseRenewer;
@@ -81,7 +82,6 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.namenode.top.window.RollingWindowManager.Op;
import org.apache.hadoop.hdfs.web.WebHdfsConstants;
import org.apache.hadoop.net.DNSToSwitchMapping;
import org.apache.hadoop.net.NetUtils;
@@ -1410,4 +1410,37 @@
      }
    }
  }

  @Test
  public void testDFSDataOutputStreamBuilder() throws Exception {
    Configuration conf = getTestConfiguration();
    MiniDFSCluster cluster = null;
    String testFile = "/testDFSDataOutputStreamBuilder";
    Path testFilePath = new Path(testFile);
    try {
      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
      DistributedFileSystem fs = cluster.getFileSystem();

      // Test creating an empty file.
      FSDataOutputStream out =
          fs.newFSDataOutputStreamBuilder(testFilePath).build();
      out.close();

      // Test creating a file with content, then verify the content.
      String content = "This is a test!";
      out = fs.newFSDataOutputStreamBuilder(testFilePath)
          .setBufferSize(4096).setReplication((short) 1)
          .setBlockSize(4096).build();
      byte[] contentOrigin = content.getBytes("UTF8");
      out.write(contentOrigin);
      out.close();

      ContractTestUtils.verifyFileContents(fs, testFilePath,
          content.getBytes());
    } finally {
      if (cluster != null) {
        cluster.shutdown();
      }
    }
  }
}

TestFavoredNodesEndToEnd.java

@@ -189,6 +189,29 @@ public class TestFavoredNodesEndToEnd {
    }
  }

  @Test(timeout = 180000)
  public void testCreateStreamBuilderFavoredNodesEndToEnd() throws Exception {
    // Create 10 files with random preferred nodes.
    for (int i = 0; i < NUM_FILES; i++) {
      Random rand = new Random(System.currentTimeMillis() + i);
      // Pass a newly created rand so as to get a uniform distribution each
      // time, without too many collisions (see the do-while loop in
      // getDatanodes).
      InetSocketAddress[] dns = getDatanodes(rand);
      Path p = new Path("/filename" + i);
      FSDataOutputStream out =
          dfs.newFSDataOutputStreamBuilder(p).setFavoredNodes(dns).build();
      out.write(SOME_BYTES);
      out.close();
      BlockLocation[] locations = getBlockLocations(p);
      // Verify the blocks were placed on the right nodes.
      for (BlockLocation loc : locations) {
        String[] hosts = loc.getNames();
        String[] hosts1 = getStringForInetSocketAddrs(dns);
        assertTrue(compareNodes(hosts, hosts1));
      }
    }
  }

  private BlockLocation[] getBlockLocations(Path p) throws Exception {
    DFSTestUtil.waitReplication(dfs, p, (short) 3);
    BlockLocation[] locations = dfs.getClient().getBlockLocations(