HADOOP-14394. Provide Builder pattern for DistributedFileSystem.create. (lei)

(cherry picked from commit 5fbec46525d6d49837d934556b59ba77bd2301a8)

    Conflicts:
    hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
    hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java
    hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java
    hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestErasureCodingPolicies.java
This commit is contained in:
Lei Xu 2017-06-14 23:17:53 -07:00
parent 7d81b0beab
commit abac844c90
9 changed files with 369 additions and 86 deletions

View File

@ -18,36 +18,70 @@
package org.apache.hadoop.fs;
import com.google.common.base.Preconditions;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.Options.ChecksumOpt;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.util.Progressable;
import javax.annotation.Nonnull;
import java.io.IOException;
import java.util.EnumSet;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY;
/** Base of specific file system FSDataOutputStreamBuilder. */
/**
* Builder for {@link FSDataOutputStream} and its subclasses.
*
* It is used to create {@link FSDataOutputStream} when creating a new file or
* appending an existing file on {@link FileSystem}.
*
* By default, it does not create parent directory that do not exist.
* {@link FileSystem#createNonRecursive(Path, boolean, int, short, long,
* Progressable)}.
*
* To create missing parent directory, use {@link #recursive()}.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class FSDataOutputStreamBuilder {
private Path path = null;
public abstract class FSDataOutputStreamBuilder
<S extends FSDataOutputStream, B extends FSDataOutputStreamBuilder<S, B>> {
private final FileSystem fs;
private final Path path;
private FsPermission permission = null;
private Integer bufferSize;
private Short replication;
private Long blockSize;
private int bufferSize;
private short replication;
private long blockSize;
/** set to true to create missing directory. */
private boolean recursive = false;
private final EnumSet<CreateFlag> flags = EnumSet.noneOf(CreateFlag.class);
private Progressable progress = null;
private EnumSet<CreateFlag> flags = null;
private ChecksumOpt checksumOpt = null;
private final FileSystem fs;
/**
* Return the concrete implementation of the builder instance.
*/
protected abstract B getThisBuilder();
protected FSDataOutputStreamBuilder(FileSystem fileSystem, Path p) {
/**
* Constructor.
*/
protected FSDataOutputStreamBuilder(@Nonnull FileSystem fileSystem,
@Nonnull Path p) {
Preconditions.checkNotNull(fileSystem);
Preconditions.checkNotNull(p);
fs = fileSystem;
path = p;
bufferSize = fs.getConf().getInt(IO_FILE_BUFFER_SIZE_KEY,
IO_FILE_BUFFER_SIZE_DEFAULT);
replication = fs.getDefaultReplication(path);
blockSize = fs.getDefaultBlockSize(p);
}
protected FileSystem getFS() {
return fs;
}
protected Path getPath() {
@ -56,91 +90,136 @@ public class FSDataOutputStreamBuilder {
protected FsPermission getPermission() {
if (permission == null) {
return FsPermission.getFileDefault();
permission = FsPermission.getFileDefault();
}
return permission;
}
public FSDataOutputStreamBuilder setPermission(final FsPermission perm) {
/**
* Set permission for the file.
*/
public B permission(@Nonnull final FsPermission perm) {
Preconditions.checkNotNull(perm);
permission = perm;
return this;
return getThisBuilder();
}
protected int getBufferSize() {
if (bufferSize == null) {
return fs.getConf().getInt(IO_FILE_BUFFER_SIZE_KEY,
IO_FILE_BUFFER_SIZE_DEFAULT);
}
return bufferSize;
}
public FSDataOutputStreamBuilder setBufferSize(int bufSize) {
/**
* Set the size of the buffer to be used.
*/
public B bufferSize(int bufSize) {
bufferSize = bufSize;
return this;
return getThisBuilder();
}
protected short getReplication() {
if (replication == null) {
return fs.getDefaultReplication(getPath());
}
return replication;
}
public FSDataOutputStreamBuilder setReplication(short replica) {
/**
* Set replication factor.
*/
public B replication(short replica) {
replication = replica;
return this;
return getThisBuilder();
}
protected long getBlockSize() {
if (blockSize == null) {
return fs.getDefaultBlockSize(getPath());
}
return blockSize;
}
public FSDataOutputStreamBuilder setBlockSize(long blkSize) {
/**
* Set block size.
*/
public B blockSize(long blkSize) {
blockSize = blkSize;
return this;
return getThisBuilder();
}
/**
* Return true to create the parent directories if they do not exist.
*/
protected boolean isRecursive() {
return recursive;
}
/**
* Create the parent directory if they do not exist.
*/
public B recursive() {
recursive = true;
return getThisBuilder();
}
protected Progressable getProgress() {
return progress;
}
public FSDataOutputStreamBuilder setProgress(final Progressable prog) {
/**
* Set the facility of reporting progress.
*/
public B progress(@Nonnull final Progressable prog) {
Preconditions.checkNotNull(prog);
progress = prog;
return this;
return getThisBuilder();
}
protected EnumSet<CreateFlag> getFlags() {
if (flags == null) {
return EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE);
}
return flags;
}
public FSDataOutputStreamBuilder setFlags(
final EnumSet<CreateFlag> enumFlags) {
Preconditions.checkNotNull(enumFlags);
flags = enumFlags;
return this;
/**
* Create an FSDataOutputStream at the specified path.
*/
public B create() {
flags.add(CreateFlag.CREATE);
return getThisBuilder();
}
/**
* Set to true to overwrite the existing file.
* Set it to false, an exception will be thrown when calling {@link #build()}
* if the file exists.
*/
public B overwrite(boolean overwrite) {
if (overwrite) {
flags.add(CreateFlag.OVERWRITE);
} else {
flags.remove(CreateFlag.OVERWRITE);
}
return getThisBuilder();
}
/**
* Append to an existing file (optional operation).
*/
public B append() {
flags.add(CreateFlag.APPEND);
return getThisBuilder();
}
protected ChecksumOpt getChecksumOpt() {
return checksumOpt;
}
public FSDataOutputStreamBuilder setChecksumOpt(
final ChecksumOpt chksumOpt) {
/**
* Set checksum opt.
*/
public B checksumOpt(@Nonnull final ChecksumOpt chksumOpt) {
Preconditions.checkNotNull(chksumOpt);
checksumOpt = chksumOpt;
return this;
return getThisBuilder();
}
public FSDataOutputStream build() throws IOException {
return fs.create(getPath(), getPermission(), getFlags(), getBufferSize(),
getReplication(), getBlockSize(), getProgress(), getChecksumOpt());
}
/**
* Create the FSDataOutputStream to write on the file system.
*
* @throws HadoopIllegalArgumentException if the parameters are not valid.
* @throws IOException on errors when file system creates or appends the file.
*/
public abstract S build() throws IOException;
}

View File

@ -4124,8 +4124,34 @@ public abstract class FileSystem extends Configured implements Closeable {
return GlobalStorageStatistics.INSTANCE;
}
private static final class FileSystemDataOutputStreamBuilder extends
FSDataOutputStreamBuilder<FSDataOutputStream,
FileSystemDataOutputStreamBuilder> {
/**
* Constructor.
*/
protected FileSystemDataOutputStreamBuilder(FileSystem fileSystem, Path p) {
super(fileSystem, p);
}
@Override
public FSDataOutputStream build() throws IOException {
return getFS().create(getPath(), getPermission(), getFlags(),
getBufferSize(), getReplication(), getBlockSize(), getProgress(),
getChecksumOpt());
}
@Override
protected FileSystemDataOutputStreamBuilder getThisBuilder() {
return this;
}
}
/**
* Create a new FSDataOutputStreamBuilder for the file with path.
* Files are overwritten by default.
*
* @param path file path
* @return a FSDataOutputStreamBuilder object to build the file
*
@ -4133,7 +4159,8 @@ public abstract class FileSystem extends Configured implements Closeable {
* builder interface becomes stable.
*/
@InterfaceAudience.Private
protected FSDataOutputStreamBuilder newFSDataOutputStreamBuilder(Path path) {
return new FSDataOutputStreamBuilder(this, path);
protected FSDataOutputStreamBuilder createFile(Path path) {
return new FileSystemDataOutputStreamBuilder(this, path)
.create().overwrite(true);
}
}

View File

@ -667,7 +667,7 @@ public class FilterFileSystem extends FileSystem {
}
@Override
protected FSDataOutputStreamBuilder newFSDataOutputStreamBuilder(Path path) {
return fs.newFSDataOutputStreamBuilder(path);
public FSDataOutputStreamBuilder createFile(Path path) {
return fs.createFile(path);
}
}

View File

@ -1270,7 +1270,7 @@ public class HarFileSystem extends FileSystem {
}
@Override
public FSDataOutputStreamBuilder newFSDataOutputStreamBuilder(Path path) {
return fs.newFSDataOutputStreamBuilder(path);
public FSDataOutputStreamBuilder createFile(Path path) {
return fs.createFile(path);
}
}

View File

@ -654,9 +654,9 @@ public class TestLocalFileSystem {
try {
FSDataOutputStreamBuilder builder =
fileSys.newFSDataOutputStreamBuilder(path);
fileSys.createFile(path);
FSDataOutputStream out = builder.build();
String content = "Create with a generic type of createBuilder!";
String content = "Create with a generic type of createFile!";
byte[] contentOrigin = content.getBytes("UTF8");
out.write(contentOrigin);
out.close();
@ -675,7 +675,7 @@ public class TestLocalFileSystem {
// Test value not being set for replication, block size, buffer size
// and permission
FSDataOutputStreamBuilder builder =
fileSys.newFSDataOutputStreamBuilder(path);
fileSys.createFile(path);
builder.build();
Assert.assertEquals("Should be default block size",
builder.getBlockSize(), fileSys.getDefaultBlockSize());
@ -689,8 +689,8 @@ public class TestLocalFileSystem {
builder.getPermission(), FsPermission.getFileDefault());
// Test set 0 to replication, block size and buffer size
builder = fileSys.newFSDataOutputStreamBuilder(path);
builder.setBufferSize(0).setBlockSize(0).setReplication((short) 0);
builder = fileSys.createFile(path);
builder.bufferSize(0).blockSize(0).replication((short) 0);
Assert.assertEquals("Block size should be 0",
builder.getBlockSize(), 0);
Assert.assertEquals("Replication factor should be 0",

View File

@ -101,6 +101,8 @@ import org.apache.hadoop.crypto.key.KeyProviderDelegationTokenExtension;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import javax.annotation.Nonnull;
/****************************************************************
* Implementation of the abstract FileSystem for the DFS system.
* This object is the way end-user code interacts with a Hadoop
@ -540,6 +542,48 @@ public class DistributedFileSystem extends FileSystem {
return dfs.createWrappedOutputStream(dfsos, statistics);
}
/**
* Similar to {@link #create(Path, FsPermission, EnumSet, int, short, long,
* Progressable, ChecksumOpt, InetSocketAddress[])}, it provides a
* HDFS-specific version of {@link #createNonRecursive(Path, FsPermission,
* EnumSet, int, short, long, Progressable)} with a few additions.
*
* @see #create(Path, FsPermission, EnumSet, int, short, long, Progressable,
* ChecksumOpt, InetSocketAddress[]) for the descriptions of
* additional parameters, i.e., favoredNodes and ecPolicyName.
*/
private HdfsDataOutputStream createNonRecursive(final Path f,
final FsPermission permission, final EnumSet<CreateFlag> flag,
final int bufferSize, final short replication, final long blockSize,
final Progressable progress, final ChecksumOpt checksumOpt,
final InetSocketAddress[] favoredNodes)
throws IOException {
statistics.incrementWriteOps(1);
storageStatistics.incrementOpCounter(OpType.CREATE);
Path absF = fixRelativePart(f);
return new FileSystemLinkResolver<HdfsDataOutputStream>() {
@Override
public HdfsDataOutputStream doCall(final Path p) throws IOException {
final DFSOutputStream out = dfs.create(getPathName(f), permission,
flag, false, replication, blockSize, progress, bufferSize,
checksumOpt, favoredNodes);
return dfs.createWrappedOutputStream(out, statistics);
}
@Override
public HdfsDataOutputStream next(final FileSystem fs, final Path p)
throws IOException {
if (fs instanceof DistributedFileSystem) {
DistributedFileSystem myDfs = (DistributedFileSystem)fs;
return myDfs.createNonRecursive(p, permission, flag, bufferSize,
replication, blockSize, progress, checksumOpt, favoredNodes);
}
throw new UnsupportedOperationException("Cannot create with" +
" favoredNodes through a symlink to a non-DistributedFileSystem: "
+ f + " -> " + p);
}
}.resolve(this, absF);
}
/**
* Same as create(), except fails if parent directory doesn't already exist.
*/
@ -2603,40 +2647,120 @@ public class DistributedFileSystem extends FileSystem {
}
/**
* Extends FSDataOutputStreamBuilder to support special requirements
* of DistributedFileSystem.
* HdfsDataOutputStreamBuilder provides the HDFS-specific capabilities to
* write file on HDFS.
*/
public static class HdfsDataOutputStreamBuilder
extends FSDataOutputStreamBuilder {
public static final class HdfsDataOutputStreamBuilder
extends FSDataOutputStreamBuilder<
HdfsDataOutputStream, HdfsDataOutputStreamBuilder> {
private final DistributedFileSystem dfs;
private InetSocketAddress[] favoredNodes = null;
public HdfsDataOutputStreamBuilder(DistributedFileSystem dfs, Path path) {
/**
* Construct a HdfsDataOutputStream builder for a file.
* @param dfs the {@link DistributedFileSystem} instance.
* @param path the path of the file to create / append.
*/
private HdfsDataOutputStreamBuilder(DistributedFileSystem dfs, Path path) {
super(dfs, path);
this.dfs = dfs;
}
protected InetSocketAddress[] getFavoredNodes() {
@Override
protected HdfsDataOutputStreamBuilder getThisBuilder() {
return this;
}
private InetSocketAddress[] getFavoredNodes() {
return favoredNodes;
}
public HdfsDataOutputStreamBuilder setFavoredNodes(
final InetSocketAddress[] nodes) {
/**
* Set favored DataNodes.
* @param nodes the addresses of the favored DataNodes.
*/
public HdfsDataOutputStreamBuilder favoredNodes(
@Nonnull final InetSocketAddress[] nodes) {
Preconditions.checkNotNull(nodes);
favoredNodes = nodes.clone();
return this;
}
/**
* Force closed blocks to disk.
*
* @see CreateFlag for the details.
*/
public HdfsDataOutputStreamBuilder syncBlock() {
getFlags().add(CreateFlag.SYNC_BLOCK);
return this;
}
/**
* Create the block on transient storage if possible.
*
* @see CreateFlag for the details.
*/
public HdfsDataOutputStreamBuilder lazyPersist() {
getFlags().add(CreateFlag.LAZY_PERSIST);
return this;
}
/**
* Append data to a new block instead of the end of the last partial block.
*
* @see CreateFlag for the details.
*/
public HdfsDataOutputStreamBuilder newBlock() {
getFlags().add(CreateFlag.NEW_BLOCK);
return this;
}
/**
* Advise that a block replica NOT be written to the local DataNode.
*
* @see CreateFlag for the details.
*/
public HdfsDataOutputStreamBuilder noLocalWrite() {
getFlags().add(CreateFlag.NO_LOCAL_WRITE);
return this;
}
@VisibleForTesting
@Override
protected EnumSet<CreateFlag> getFlags() {
return super.getFlags();
}
/**
* Build HdfsDataOutputStream to write.
*
* @return a fully-initialized OutputStream.
* @throws IOException on I/O errors.
*/
@Override
public HdfsDataOutputStream build() throws IOException {
return dfs.create(getPath(), getPermission(), getFlags(),
getBufferSize(), getReplication(), getBlockSize(),
getProgress(), getChecksumOpt(), getFavoredNodes());
if (isRecursive()) {
return dfs.create(getPath(), getPermission(), getFlags(),
getBufferSize(), getReplication(), getBlockSize(),
getProgress(), getChecksumOpt(), getFavoredNodes());
} else {
return dfs.createNonRecursive(getPath(), getPermission(), getFlags(),
getBufferSize(), getReplication(), getBlockSize(), getProgress(),
getChecksumOpt(), getFavoredNodes());
}
}
}
/**
* Create a HdfsDataOutputStreamBuilder to create a file on DFS.
* Similar to {@link #create(Path)}, file is overwritten by default.
*
* @param path the path of the file to create.
* @return A HdfsDataOutputStreamBuilder for creating a file.
*/
@Override
public HdfsDataOutputStreamBuilder newFSDataOutputStreamBuilder(Path path) {
return new HdfsDataOutputStreamBuilder(this, path);
public HdfsDataOutputStreamBuilder createFile(Path path) {
return new HdfsDataOutputStreamBuilder(this, path).create().overwrite(true);
}
}

View File

@ -242,7 +242,10 @@ public class NameNodeConnector implements Closeable {
IOUtils.closeStream(fs.append(idPath));
fs.delete(idPath, true);
}
final FSDataOutputStream fsout = fs.create(idPath, false);
final FSDataOutputStream fsout = fs.createFile(idPath)
.overwrite(false).recursive().build();
// mark balancer idPath to be deleted during filesystem closure
fs.deleteOnExit(idPath);
if (write2IdFile) {

View File

@ -61,6 +61,7 @@ import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileSystem.Statistics.StatisticsData;
import org.apache.hadoop.fs.FsServerDefaults;
import org.apache.hadoop.fs.FileChecksum;
@ -78,6 +79,7 @@ import org.apache.hadoop.fs.VolumeId;
import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties;
import org.apache.hadoop.hdfs.DistributedFileSystem.HdfsDataOutputStreamBuilder;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.client.impl.LeaseRenewer;
import org.apache.hadoop.hdfs.DFSOpsCountStatistics.OpType;
@ -1542,36 +1544,84 @@ public class TestDistributedFileSystem {
}
}
private void testBuilderSetters(DistributedFileSystem fs) {
Path testFilePath = new Path("/testBuilderSetters");
HdfsDataOutputStreamBuilder builder = fs.createFile(testFilePath);
builder.append().overwrite(false).newBlock().lazyPersist().noLocalWrite();
EnumSet<CreateFlag> flags = builder.getFlags();
assertTrue(flags.contains(CreateFlag.APPEND));
assertTrue(flags.contains(CreateFlag.CREATE));
assertTrue(flags.contains(CreateFlag.NEW_BLOCK));
assertTrue(flags.contains(CreateFlag.NO_LOCAL_WRITE));
assertFalse(flags.contains(CreateFlag.OVERWRITE));
assertFalse(flags.contains(CreateFlag.SYNC_BLOCK));
}
@Test
public void testHdfsDataOutputStreamBuilderSetParameters()
throws IOException {
Configuration conf = getTestConfiguration();
try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(1).build()) {
cluster.waitActive();
DistributedFileSystem fs = cluster.getFileSystem();
testBuilderSetters(fs);
}
}
@Test
public void testDFSDataOutputStreamBuilder() throws Exception {
Configuration conf = getTestConfiguration();
MiniDFSCluster cluster = null;
String testFile = "/testDFSDataOutputStreamBuilder";
Path testFilePath = new Path(testFile);
try {
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(1).build()) {
DistributedFileSystem fs = cluster.getFileSystem();
// Test create an empty file
FSDataOutputStream out =
fs.newFSDataOutputStreamBuilder(testFilePath).build();
out.close();
try (FSDataOutputStream out =
fs.createFile(testFilePath).build()) {
LOG.info("Test create an empty file");
}
// Test create a file with content, and verify the content
String content = "This is a test!";
out = fs.newFSDataOutputStreamBuilder(testFilePath)
.setBufferSize(4096).setReplication((short) 1)
.setBlockSize(4096).build();
byte[] contentOrigin = content.getBytes("UTF8");
out.write(contentOrigin);
out.close();
try (FSDataOutputStream out1 = fs.createFile(testFilePath)
.bufferSize(4096)
.replication((short) 1)
.blockSize(4096)
.build()) {
byte[] contentOrigin = content.getBytes("UTF8");
out1.write(contentOrigin);
}
ContractTestUtils.verifyFileContents(fs, testFilePath,
content.getBytes());
} finally {
if (cluster != null) {
cluster.shutdown();
try (FSDataOutputStream out = fs.createFile(testFilePath).overwrite(false)
.build()) {
fail("it should fail to overwrite an existing file");
} catch (FileAlreadyExistsException e) {
// As expected, ignore.
}
Path nonParentFile = new Path("/parent/test");
try (FSDataOutputStream out = fs.createFile(nonParentFile).build()) {
fail("parent directory not exist");
} catch (FileNotFoundException e) {
// As expected.
}
assertFalse("parent directory should not be created",
fs.exists(new Path("/parent")));
try (FSDataOutputStream out = fs.createFile(nonParentFile).recursive()
.build()) {
out.write(1);
}
assertTrue("parent directory has not been created",
fs.exists(new Path("/parent")));
}
}
}

View File

@ -199,7 +199,7 @@ public class TestFavoredNodesEndToEnd {
InetSocketAddress[] dns = getDatanodes(rand);
Path p = new Path("/filename"+i);
FSDataOutputStream out =
dfs.newFSDataOutputStreamBuilder(p).setFavoredNodes(dns).build();
dfs.createFile(p).favoredNodes(dns).build();
out.write(SOME_BYTES);
out.close();
BlockLocation[] locations = getBlockLocations(p);