HADOOP-14394. Provide Builder pattern for DistributedFileSystem.create. (lei)
This commit is contained in:
parent
ef8edab930
commit
5e7cfdca7b
|
@ -18,36 +18,70 @@
|
|||
package org.apache.hadoop.fs;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import org.apache.hadoop.HadoopIllegalArgumentException;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.fs.Options.ChecksumOpt;
|
||||
import org.apache.hadoop.fs.permission.FsPermission;
|
||||
import org.apache.hadoop.util.Progressable;
|
||||
|
||||
import javax.annotation.Nonnull;
|
||||
import java.io.IOException;
|
||||
import java.util.EnumSet;
|
||||
|
||||
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT;
|
||||
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY;
|
||||
|
||||
/** Base of specific file system FSDataOutputStreamBuilder. */
|
||||
/**
|
||||
* Builder for {@link FSDataOutputStream} and its subclasses.
|
||||
*
|
||||
* It is used to create {@link FSDataOutputStream} when creating a new file or
|
||||
* appending an existing file on {@link FileSystem}.
|
||||
*
|
||||
* By default, it does not create parent directory that do not exist.
|
||||
* {@link FileSystem#createNonRecursive(Path, boolean, int, short, long,
|
||||
* Progressable)}.
|
||||
*
|
||||
* To create missing parent directory, use {@link #recursive()}.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
@InterfaceStability.Unstable
|
||||
public class FSDataOutputStreamBuilder {
|
||||
private Path path = null;
|
||||
public abstract class FSDataOutputStreamBuilder
|
||||
<S extends FSDataOutputStream, B extends FSDataOutputStreamBuilder<S, B>> {
|
||||
private final FileSystem fs;
|
||||
private final Path path;
|
||||
private FsPermission permission = null;
|
||||
private Integer bufferSize;
|
||||
private Short replication;
|
||||
private Long blockSize;
|
||||
private int bufferSize;
|
||||
private short replication;
|
||||
private long blockSize;
|
||||
/** set to true to create missing directory. */
|
||||
private boolean recursive = false;
|
||||
private final EnumSet<CreateFlag> flags = EnumSet.noneOf(CreateFlag.class);
|
||||
private Progressable progress = null;
|
||||
private EnumSet<CreateFlag> flags = null;
|
||||
private ChecksumOpt checksumOpt = null;
|
||||
|
||||
private final FileSystem fs;
|
||||
/**
|
||||
* Return the concrete implementation of the builder instance.
|
||||
*/
|
||||
protected abstract B getThisBuilder();
|
||||
|
||||
protected FSDataOutputStreamBuilder(FileSystem fileSystem, Path p) {
|
||||
/**
|
||||
* Constructor.
|
||||
*/
|
||||
protected FSDataOutputStreamBuilder(@Nonnull FileSystem fileSystem,
|
||||
@Nonnull Path p) {
|
||||
Preconditions.checkNotNull(fileSystem);
|
||||
Preconditions.checkNotNull(p);
|
||||
fs = fileSystem;
|
||||
path = p;
|
||||
bufferSize = fs.getConf().getInt(IO_FILE_BUFFER_SIZE_KEY,
|
||||
IO_FILE_BUFFER_SIZE_DEFAULT);
|
||||
replication = fs.getDefaultReplication(path);
|
||||
blockSize = fs.getDefaultBlockSize(p);
|
||||
}
|
||||
|
||||
protected FileSystem getFS() {
|
||||
return fs;
|
||||
}
|
||||
|
||||
protected Path getPath() {
|
||||
|
@ -56,91 +90,136 @@ public class FSDataOutputStreamBuilder {
|
|||
|
||||
protected FsPermission getPermission() {
|
||||
if (permission == null) {
|
||||
return FsPermission.getFileDefault();
|
||||
permission = FsPermission.getFileDefault();
|
||||
}
|
||||
return permission;
|
||||
}
|
||||
|
||||
public FSDataOutputStreamBuilder setPermission(final FsPermission perm) {
|
||||
/**
|
||||
* Set permission for the file.
|
||||
*/
|
||||
public B permission(@Nonnull final FsPermission perm) {
|
||||
Preconditions.checkNotNull(perm);
|
||||
permission = perm;
|
||||
return this;
|
||||
return getThisBuilder();
|
||||
}
|
||||
|
||||
protected int getBufferSize() {
|
||||
if (bufferSize == null) {
|
||||
return fs.getConf().getInt(IO_FILE_BUFFER_SIZE_KEY,
|
||||
IO_FILE_BUFFER_SIZE_DEFAULT);
|
||||
}
|
||||
return bufferSize;
|
||||
}
|
||||
|
||||
public FSDataOutputStreamBuilder setBufferSize(int bufSize) {
|
||||
/**
|
||||
* Set the size of the buffer to be used.
|
||||
*/
|
||||
public B bufferSize(int bufSize) {
|
||||
bufferSize = bufSize;
|
||||
return this;
|
||||
return getThisBuilder();
|
||||
}
|
||||
|
||||
protected short getReplication() {
|
||||
if (replication == null) {
|
||||
return fs.getDefaultReplication(getPath());
|
||||
}
|
||||
return replication;
|
||||
}
|
||||
|
||||
public FSDataOutputStreamBuilder setReplication(short replica) {
|
||||
/**
|
||||
* Set replication factor.
|
||||
*/
|
||||
public B replication(short replica) {
|
||||
replication = replica;
|
||||
return this;
|
||||
return getThisBuilder();
|
||||
}
|
||||
|
||||
protected long getBlockSize() {
|
||||
if (blockSize == null) {
|
||||
return fs.getDefaultBlockSize(getPath());
|
||||
}
|
||||
return blockSize;
|
||||
}
|
||||
|
||||
public FSDataOutputStreamBuilder setBlockSize(long blkSize) {
|
||||
/**
|
||||
* Set block size.
|
||||
*/
|
||||
public B blockSize(long blkSize) {
|
||||
blockSize = blkSize;
|
||||
return this;
|
||||
return getThisBuilder();
|
||||
}
|
||||
|
||||
/**
|
||||
* Return true to create the parent directories if they do not exist.
|
||||
*/
|
||||
protected boolean isRecursive() {
|
||||
return recursive;
|
||||
}
|
||||
|
||||
/**
|
||||
* Create the parent directory if they do not exist.
|
||||
*/
|
||||
public B recursive() {
|
||||
recursive = true;
|
||||
return getThisBuilder();
|
||||
}
|
||||
|
||||
protected Progressable getProgress() {
|
||||
return progress;
|
||||
}
|
||||
|
||||
public FSDataOutputStreamBuilder setProgress(final Progressable prog) {
|
||||
/**
|
||||
* Set the facility of reporting progress.
|
||||
*/
|
||||
public B progress(@Nonnull final Progressable prog) {
|
||||
Preconditions.checkNotNull(prog);
|
||||
progress = prog;
|
||||
return this;
|
||||
return getThisBuilder();
|
||||
}
|
||||
|
||||
protected EnumSet<CreateFlag> getFlags() {
|
||||
if (flags == null) {
|
||||
return EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE);
|
||||
}
|
||||
return flags;
|
||||
}
|
||||
|
||||
public FSDataOutputStreamBuilder setFlags(
|
||||
final EnumSet<CreateFlag> enumFlags) {
|
||||
Preconditions.checkNotNull(enumFlags);
|
||||
flags = enumFlags;
|
||||
return this;
|
||||
/**
|
||||
* Create an FSDataOutputStream at the specified path.
|
||||
*/
|
||||
public B create() {
|
||||
flags.add(CreateFlag.CREATE);
|
||||
return getThisBuilder();
|
||||
}
|
||||
|
||||
/**
|
||||
* Set to true to overwrite the existing file.
|
||||
* Set it to false, an exception will be thrown when calling {@link #build()}
|
||||
* if the file exists.
|
||||
*/
|
||||
public B overwrite(boolean overwrite) {
|
||||
if (overwrite) {
|
||||
flags.add(CreateFlag.OVERWRITE);
|
||||
} else {
|
||||
flags.remove(CreateFlag.OVERWRITE);
|
||||
}
|
||||
return getThisBuilder();
|
||||
}
|
||||
|
||||
/**
|
||||
* Append to an existing file (optional operation).
|
||||
*/
|
||||
public B append() {
|
||||
flags.add(CreateFlag.APPEND);
|
||||
return getThisBuilder();
|
||||
}
|
||||
|
||||
protected ChecksumOpt getChecksumOpt() {
|
||||
return checksumOpt;
|
||||
}
|
||||
|
||||
public FSDataOutputStreamBuilder setChecksumOpt(
|
||||
final ChecksumOpt chksumOpt) {
|
||||
/**
|
||||
* Set checksum opt.
|
||||
*/
|
||||
public B checksumOpt(@Nonnull final ChecksumOpt chksumOpt) {
|
||||
Preconditions.checkNotNull(chksumOpt);
|
||||
checksumOpt = chksumOpt;
|
||||
return this;
|
||||
return getThisBuilder();
|
||||
}
|
||||
|
||||
public FSDataOutputStream build() throws IOException {
|
||||
return fs.create(getPath(), getPermission(), getFlags(), getBufferSize(),
|
||||
getReplication(), getBlockSize(), getProgress(), getChecksumOpt());
|
||||
}
|
||||
/**
|
||||
* Create the FSDataOutputStream to write on the file system.
|
||||
*
|
||||
* @throws HadoopIllegalArgumentException if the parameters are not valid.
|
||||
* @throws IOException on errors when file system creates or appends the file.
|
||||
*/
|
||||
public abstract S build() throws IOException;
|
||||
}
|
||||
|
|
|
@ -4140,8 +4140,34 @@ public abstract class FileSystem extends Configured implements Closeable {
|
|||
return GlobalStorageStatistics.INSTANCE;
|
||||
}
|
||||
|
||||
private static final class FileSystemDataOutputStreamBuilder extends
|
||||
FSDataOutputStreamBuilder<FSDataOutputStream,
|
||||
FileSystemDataOutputStreamBuilder> {
|
||||
|
||||
/**
|
||||
* Constructor.
|
||||
*/
|
||||
protected FileSystemDataOutputStreamBuilder(FileSystem fileSystem, Path p) {
|
||||
super(fileSystem, p);
|
||||
}
|
||||
|
||||
@Override
|
||||
public FSDataOutputStream build() throws IOException {
|
||||
return getFS().create(getPath(), getPermission(), getFlags(),
|
||||
getBufferSize(), getReplication(), getBlockSize(), getProgress(),
|
||||
getChecksumOpt());
|
||||
}
|
||||
|
||||
@Override
|
||||
protected FileSystemDataOutputStreamBuilder getThisBuilder() {
|
||||
return this;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a new FSDataOutputStreamBuilder for the file with path.
|
||||
* Files are overwritten by default.
|
||||
*
|
||||
* @param path file path
|
||||
* @return a FSDataOutputStreamBuilder object to build the file
|
||||
*
|
||||
|
@ -4149,7 +4175,8 @@ public abstract class FileSystem extends Configured implements Closeable {
|
|||
* builder interface becomes stable.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
protected FSDataOutputStreamBuilder newFSDataOutputStreamBuilder(Path path) {
|
||||
return new FSDataOutputStreamBuilder(this, path);
|
||||
protected FSDataOutputStreamBuilder createFile(Path path) {
|
||||
return new FileSystemDataOutputStreamBuilder(this, path)
|
||||
.create().overwrite(true);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -667,7 +667,7 @@ public class FilterFileSystem extends FileSystem {
|
|||
}
|
||||
|
||||
@Override
|
||||
protected FSDataOutputStreamBuilder newFSDataOutputStreamBuilder(Path path) {
|
||||
return fs.newFSDataOutputStreamBuilder(path);
|
||||
public FSDataOutputStreamBuilder createFile(Path path) {
|
||||
return fs.createFile(path);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1270,7 +1270,7 @@ public class HarFileSystem extends FileSystem {
|
|||
}
|
||||
|
||||
@Override
|
||||
public FSDataOutputStreamBuilder newFSDataOutputStreamBuilder(Path path) {
|
||||
return fs.newFSDataOutputStreamBuilder(path);
|
||||
public FSDataOutputStreamBuilder createFile(Path path) {
|
||||
return fs.createFile(path);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -659,9 +659,9 @@ public class TestLocalFileSystem {
|
|||
|
||||
try {
|
||||
FSDataOutputStreamBuilder builder =
|
||||
fileSys.newFSDataOutputStreamBuilder(path);
|
||||
fileSys.createFile(path);
|
||||
FSDataOutputStream out = builder.build();
|
||||
String content = "Create with a generic type of createBuilder!";
|
||||
String content = "Create with a generic type of createFile!";
|
||||
byte[] contentOrigin = content.getBytes("UTF8");
|
||||
out.write(contentOrigin);
|
||||
out.close();
|
||||
|
@ -680,7 +680,7 @@ public class TestLocalFileSystem {
|
|||
// Test value not being set for replication, block size, buffer size
|
||||
// and permission
|
||||
FSDataOutputStreamBuilder builder =
|
||||
fileSys.newFSDataOutputStreamBuilder(path);
|
||||
fileSys.createFile(path);
|
||||
builder.build();
|
||||
Assert.assertEquals("Should be default block size",
|
||||
builder.getBlockSize(), fileSys.getDefaultBlockSize());
|
||||
|
@ -694,8 +694,8 @@ public class TestLocalFileSystem {
|
|||
builder.getPermission(), FsPermission.getFileDefault());
|
||||
|
||||
// Test set 0 to replication, block size and buffer size
|
||||
builder = fileSys.newFSDataOutputStreamBuilder(path);
|
||||
builder.setBufferSize(0).setBlockSize(0).setReplication((short) 0);
|
||||
builder = fileSys.createFile(path);
|
||||
builder.bufferSize(0).blockSize(0).replication((short) 0);
|
||||
Assert.assertEquals("Block size should be 0",
|
||||
builder.getBlockSize(), 0);
|
||||
Assert.assertEquals("Replication factor should be 0",
|
||||
|
|
|
@ -101,7 +101,6 @@ import org.apache.hadoop.crypto.key.KeyProviderDelegationTokenExtension;
|
|||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.base.Preconditions;
|
||||
import org.apache.commons.lang.StringUtils;
|
||||
|
||||
import javax.annotation.Nonnull;
|
||||
|
||||
|
@ -525,6 +524,49 @@ public class DistributedFileSystem extends FileSystem {
|
|||
return dfs.createWrappedOutputStream(dfsos, statistics);
|
||||
}
|
||||
|
||||
/**
|
||||
* Similar to {@link #create(Path, FsPermission, EnumSet, int, short, long,
|
||||
* Progressable, ChecksumOpt, InetSocketAddress[], String)}, it provides a
|
||||
* HDFS-specific version of {@link #createNonRecursive(Path, FsPermission,
|
||||
* EnumSet, int, short, long, Progressable)} with a few additions.
|
||||
*
|
||||
* @see #create(Path, FsPermission, EnumSet, int, short, long, Progressable,
|
||||
* ChecksumOpt, InetSocketAddress[], String) for the descriptions of
|
||||
* additional parameters, i.e., favoredNodes and ecPolicyName.
|
||||
*/
|
||||
private HdfsDataOutputStream createNonRecursive(final Path f,
|
||||
final FsPermission permission, final EnumSet<CreateFlag> flag,
|
||||
final int bufferSize, final short replication, final long blockSize,
|
||||
final Progressable progress, final ChecksumOpt checksumOpt,
|
||||
final InetSocketAddress[] favoredNodes, final String ecPolicyName)
|
||||
throws IOException {
|
||||
statistics.incrementWriteOps(1);
|
||||
storageStatistics.incrementOpCounter(OpType.CREATE);
|
||||
Path absF = fixRelativePart(f);
|
||||
return new FileSystemLinkResolver<HdfsDataOutputStream>() {
|
||||
@Override
|
||||
public HdfsDataOutputStream doCall(final Path p) throws IOException {
|
||||
final DFSOutputStream out = dfs.create(getPathName(f), permission,
|
||||
flag, false, replication, blockSize, progress, bufferSize,
|
||||
checksumOpt, favoredNodes, ecPolicyName);
|
||||
return dfs.createWrappedOutputStream(out, statistics);
|
||||
}
|
||||
@Override
|
||||
public HdfsDataOutputStream next(final FileSystem fs, final Path p)
|
||||
throws IOException {
|
||||
if (fs instanceof DistributedFileSystem) {
|
||||
DistributedFileSystem myDfs = (DistributedFileSystem)fs;
|
||||
return myDfs.createNonRecursive(p, permission, flag, bufferSize,
|
||||
replication, blockSize, progress, checksumOpt, favoredNodes,
|
||||
ecPolicyName);
|
||||
}
|
||||
throw new UnsupportedOperationException("Cannot create with" +
|
||||
" favoredNodes through a symlink to a non-DistributedFileSystem: "
|
||||
+ f + " -> " + p);
|
||||
}
|
||||
}.resolve(this, absF);
|
||||
}
|
||||
|
||||
/**
|
||||
* Same as create(), except fails if parent directory doesn't already exist.
|
||||
*/
|
||||
|
@ -2686,33 +2728,88 @@ public class DistributedFileSystem extends FileSystem {
|
|||
}
|
||||
|
||||
/**
|
||||
* Extends FSDataOutputStreamBuilder to support special requirements
|
||||
* of DistributedFileSystem.
|
||||
* HdfsDataOutputStreamBuilder provides the HDFS-specific capabilities to
|
||||
* write file on HDFS.
|
||||
*/
|
||||
public static class HdfsDataOutputStreamBuilder
|
||||
extends FSDataOutputStreamBuilder {
|
||||
public static final class HdfsDataOutputStreamBuilder
|
||||
extends FSDataOutputStreamBuilder<
|
||||
HdfsDataOutputStream, HdfsDataOutputStreamBuilder> {
|
||||
private final DistributedFileSystem dfs;
|
||||
private InetSocketAddress[] favoredNodes = null;
|
||||
private String ecPolicyName = null;
|
||||
private boolean shouldReplicate = false;
|
||||
|
||||
public HdfsDataOutputStreamBuilder(DistributedFileSystem dfs, Path path) {
|
||||
/**
|
||||
* Construct a HdfsDataOutputStream builder for a file.
|
||||
* @param dfs the {@link DistributedFileSystem} instance.
|
||||
* @param path the path of the file to create / append.
|
||||
*/
|
||||
private HdfsDataOutputStreamBuilder(DistributedFileSystem dfs, Path path) {
|
||||
super(dfs, path);
|
||||
this.dfs = dfs;
|
||||
}
|
||||
|
||||
protected InetSocketAddress[] getFavoredNodes() {
|
||||
@Override
|
||||
protected HdfsDataOutputStreamBuilder getThisBuilder() {
|
||||
return this;
|
||||
}
|
||||
|
||||
private InetSocketAddress[] getFavoredNodes() {
|
||||
return favoredNodes;
|
||||
}
|
||||
|
||||
public HdfsDataOutputStreamBuilder setFavoredNodes(
|
||||
/**
|
||||
* Set favored DataNodes.
|
||||
* @param nodes the addresses of the favored DataNodes.
|
||||
*/
|
||||
public HdfsDataOutputStreamBuilder favoredNodes(
|
||||
@Nonnull final InetSocketAddress[] nodes) {
|
||||
Preconditions.checkNotNull(nodes);
|
||||
favoredNodes = nodes.clone();
|
||||
return this;
|
||||
}
|
||||
|
||||
protected String getEcPolicyName() {
|
||||
/**
|
||||
* Force closed blocks to disk.
|
||||
*
|
||||
* @see CreateFlag for the details.
|
||||
*/
|
||||
public HdfsDataOutputStreamBuilder syncBlock() {
|
||||
getFlags().add(CreateFlag.SYNC_BLOCK);
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Create the block on transient storage if possible.
|
||||
*
|
||||
* @see CreateFlag for the details.
|
||||
*/
|
||||
public HdfsDataOutputStreamBuilder lazyPersist() {
|
||||
getFlags().add(CreateFlag.LAZY_PERSIST);
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Append data to a new block instead of the end of the last partial block.
|
||||
*
|
||||
* @see CreateFlag for the details.
|
||||
*/
|
||||
public HdfsDataOutputStreamBuilder newBlock() {
|
||||
getFlags().add(CreateFlag.NEW_BLOCK);
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Advise that a block replica NOT be written to the local DataNode.
|
||||
*
|
||||
* @see CreateFlag for the details.
|
||||
*/
|
||||
public HdfsDataOutputStreamBuilder noLocalWrite() {
|
||||
getFlags().add(CreateFlag.NO_LOCAL_WRITE);
|
||||
return this;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
String getEcPolicyName() {
|
||||
return ecPolicyName;
|
||||
}
|
||||
|
||||
|
@ -2722,17 +2819,17 @@ public class DistributedFileSystem extends FileSystem {
|
|||
* or erasure coding policy is. Don't call this function and
|
||||
* enforceReplicate() in the same builder since they have conflict
|
||||
* of interest.
|
||||
*
|
||||
*/
|
||||
public HdfsDataOutputStreamBuilder setEcPolicyName(
|
||||
public HdfsDataOutputStreamBuilder ecPolicyName(
|
||||
@Nonnull final String policyName) {
|
||||
Preconditions.checkNotNull(policyName);
|
||||
ecPolicyName = policyName;
|
||||
return this;
|
||||
}
|
||||
|
||||
public boolean shouldReplicate() {
|
||||
return shouldReplicate;
|
||||
@VisibleForTesting
|
||||
boolean shouldReplicate() {
|
||||
return getFlags().contains(CreateFlag.SHOULD_REPLICATE);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -2742,30 +2839,46 @@ public class DistributedFileSystem extends FileSystem {
|
|||
* conflict of interest.
|
||||
*/
|
||||
public HdfsDataOutputStreamBuilder replicate() {
|
||||
shouldReplicate = true;
|
||||
getFlags().add(CreateFlag.SHOULD_REPLICATE);
|
||||
return this;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
@Override
|
||||
protected EnumSet<CreateFlag> getFlags() {
|
||||
return super.getFlags();
|
||||
}
|
||||
|
||||
/**
|
||||
* Build HdfsDataOutputStream to write.
|
||||
*
|
||||
* @return a fully-initialized OutputStream.
|
||||
* @throws IOException on I/O errors.
|
||||
*/
|
||||
@Override
|
||||
public HdfsDataOutputStream build() throws IOException {
|
||||
Preconditions.checkState(
|
||||
!(shouldReplicate() && (!StringUtils.isEmpty(getEcPolicyName()))),
|
||||
"shouldReplicate and ecPolicyName are " +
|
||||
"exclusive parameters. Set both is not allowed!");
|
||||
|
||||
EnumSet<CreateFlag> createFlags = getFlags();
|
||||
if (shouldReplicate()) {
|
||||
createFlags.add(CreateFlag.SHOULD_REPLICATE);
|
||||
if (isRecursive()) {
|
||||
return dfs.create(getPath(), getPermission(), getFlags(),
|
||||
getBufferSize(), getReplication(), getBlockSize(),
|
||||
getProgress(), getChecksumOpt(), getFavoredNodes(),
|
||||
getEcPolicyName());
|
||||
} else {
|
||||
return dfs.createNonRecursive(getPath(), getPermission(), getFlags(),
|
||||
getBufferSize(), getReplication(), getBlockSize(), getProgress(),
|
||||
getChecksumOpt(), getFavoredNodes(), getEcPolicyName());
|
||||
}
|
||||
return dfs.create(getPath(), getPermission(), createFlags,
|
||||
getBufferSize(), getReplication(), getBlockSize(),
|
||||
getProgress(), getChecksumOpt(), getFavoredNodes(),
|
||||
getEcPolicyName());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a HdfsDataOutputStreamBuilder to create a file on DFS.
|
||||
* Similar to {@link #create(Path)}, file is overwritten by default.
|
||||
*
|
||||
* @param path the path of the file to create.
|
||||
* @return A HdfsDataOutputStreamBuilder for creating a file.
|
||||
*/
|
||||
@Override
|
||||
public HdfsDataOutputStreamBuilder newFSDataOutputStreamBuilder(Path path) {
|
||||
return new HdfsDataOutputStreamBuilder(this, path);
|
||||
public HdfsDataOutputStreamBuilder createFile(Path path) {
|
||||
return new HdfsDataOutputStreamBuilder(this, path).create().overwrite(true);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -25,7 +25,6 @@ import java.net.URI;
|
|||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.EnumSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
@ -36,7 +35,6 @@ import org.apache.commons.logging.Log;
|
|||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.CreateFlag;
|
||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.FsServerDefaults;
|
||||
|
@ -246,10 +244,8 @@ public class NameNodeConnector implements Closeable {
|
|||
fs.delete(idPath, true);
|
||||
}
|
||||
|
||||
final FSDataOutputStream fsout = fs.newFSDataOutputStreamBuilder(idPath)
|
||||
.replicate()
|
||||
.setFlags(EnumSet.of(CreateFlag.CREATE))
|
||||
.build();
|
||||
final FSDataOutputStream fsout = fs.createFile(idPath)
|
||||
.replicate().recursive().build();
|
||||
|
||||
Preconditions.checkState(
|
||||
fsout.hasCapability(StreamCapability.HFLUSH.getValue())
|
||||
|
|
|
@ -56,6 +56,7 @@ import org.apache.hadoop.fs.CommonConfigurationKeys;
|
|||
import org.apache.hadoop.fs.CreateFlag;
|
||||
import org.apache.hadoop.fs.FSDataInputStream;
|
||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||
import org.apache.hadoop.fs.FileAlreadyExistsException;
|
||||
import org.apache.hadoop.fs.FileSystem.Statistics.StatisticsData;
|
||||
import org.apache.hadoop.fs.FsServerDefaults;
|
||||
import org.apache.hadoop.fs.FileChecksum;
|
||||
|
@ -71,6 +72,7 @@ import org.apache.hadoop.fs.StorageStatistics.LongStatistic;
|
|||
import org.apache.hadoop.fs.StorageType;
|
||||
import org.apache.hadoop.fs.contract.ContractTestUtils;
|
||||
import org.apache.hadoop.fs.permission.FsPermission;
|
||||
import org.apache.hadoop.hdfs.DistributedFileSystem.HdfsDataOutputStreamBuilder;
|
||||
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
|
||||
import org.apache.hadoop.hdfs.client.impl.LeaseRenewer;
|
||||
import org.apache.hadoop.hdfs.DFSOpsCountStatistics.OpType;
|
||||
|
@ -1411,36 +1413,88 @@ public class TestDistributedFileSystem {
|
|||
}
|
||||
}
|
||||
|
||||
private void testBuilderSetters(DistributedFileSystem fs) {
|
||||
Path testFilePath = new Path("/testBuilderSetters");
|
||||
HdfsDataOutputStreamBuilder builder = fs.createFile(testFilePath);
|
||||
|
||||
builder.append().overwrite(false).newBlock().lazyPersist().noLocalWrite()
|
||||
.ecPolicyName("ec-policy");
|
||||
EnumSet<CreateFlag> flags = builder.getFlags();
|
||||
assertTrue(flags.contains(CreateFlag.APPEND));
|
||||
assertTrue(flags.contains(CreateFlag.CREATE));
|
||||
assertTrue(flags.contains(CreateFlag.NEW_BLOCK));
|
||||
assertTrue(flags.contains(CreateFlag.NO_LOCAL_WRITE));
|
||||
assertFalse(flags.contains(CreateFlag.OVERWRITE));
|
||||
assertFalse(flags.contains(CreateFlag.SYNC_BLOCK));
|
||||
|
||||
assertEquals("ec-policy", builder.getEcPolicyName());
|
||||
assertFalse(builder.shouldReplicate());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testHdfsDataOutputStreamBuilderSetParameters()
|
||||
throws IOException {
|
||||
Configuration conf = getTestConfiguration();
|
||||
try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
|
||||
.numDataNodes(1).build()) {
|
||||
cluster.waitActive();
|
||||
DistributedFileSystem fs = cluster.getFileSystem();
|
||||
|
||||
testBuilderSetters(fs);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDFSDataOutputStreamBuilder() throws Exception {
|
||||
Configuration conf = getTestConfiguration();
|
||||
MiniDFSCluster cluster = null;
|
||||
String testFile = "/testDFSDataOutputStreamBuilder";
|
||||
Path testFilePath = new Path(testFile);
|
||||
try {
|
||||
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
|
||||
try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
|
||||
.numDataNodes(1).build()) {
|
||||
DistributedFileSystem fs = cluster.getFileSystem();
|
||||
|
||||
// Test create an empty file
|
||||
FSDataOutputStream out =
|
||||
fs.newFSDataOutputStreamBuilder(testFilePath).build();
|
||||
out.close();
|
||||
try (FSDataOutputStream out =
|
||||
fs.createFile(testFilePath).build()) {
|
||||
LOG.info("Test create an empty file");
|
||||
}
|
||||
|
||||
// Test create a file with content, and verify the content
|
||||
String content = "This is a test!";
|
||||
out = fs.newFSDataOutputStreamBuilder(testFilePath)
|
||||
.setBufferSize(4096).setReplication((short) 1)
|
||||
.setBlockSize(4096).build();
|
||||
byte[] contentOrigin = content.getBytes("UTF8");
|
||||
out.write(contentOrigin);
|
||||
out.close();
|
||||
try (FSDataOutputStream out1 = fs.createFile(testFilePath)
|
||||
.bufferSize(4096)
|
||||
.replication((short) 1)
|
||||
.blockSize(4096)
|
||||
.build()) {
|
||||
byte[] contentOrigin = content.getBytes("UTF8");
|
||||
out1.write(contentOrigin);
|
||||
}
|
||||
|
||||
ContractTestUtils.verifyFileContents(fs, testFilePath,
|
||||
content.getBytes());
|
||||
} finally {
|
||||
if (cluster != null) {
|
||||
cluster.shutdown();
|
||||
|
||||
try (FSDataOutputStream out = fs.createFile(testFilePath).overwrite(false)
|
||||
.build()) {
|
||||
fail("it should fail to overwrite an existing file");
|
||||
} catch (FileAlreadyExistsException e) {
|
||||
// As expected, ignore.
|
||||
}
|
||||
|
||||
Path nonParentFile = new Path("/parent/test");
|
||||
try (FSDataOutputStream out = fs.createFile(nonParentFile).build()) {
|
||||
fail("parent directory not exist");
|
||||
} catch (FileNotFoundException e) {
|
||||
// As expected.
|
||||
}
|
||||
assertFalse("parent directory should not be created",
|
||||
fs.exists(new Path("/parent")));
|
||||
|
||||
try (FSDataOutputStream out = fs.createFile(nonParentFile).recursive()
|
||||
.build()) {
|
||||
out.write(1);
|
||||
}
|
||||
assertTrue("parent directory has not been created",
|
||||
fs.exists(new Path("/parent")));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -540,15 +540,14 @@ public class TestErasureCodingPolicies {
|
|||
fs.setErasureCodingPolicy(dirPath, EC_POLICY.getName());
|
||||
|
||||
// null EC policy name value means inheriting parent directory's policy
|
||||
fs.newFSDataOutputStreamBuilder(filePath0).build().close();
|
||||
fs.createFile(filePath0).build().close();
|
||||
ErasureCodingPolicy ecPolicyOnFile = fs.getErasureCodingPolicy(filePath0);
|
||||
assertEquals(EC_POLICY, ecPolicyOnFile);
|
||||
|
||||
// Test illegal EC policy name
|
||||
final String illegalPolicyName = "RS-DEFAULT-1-2-64k";
|
||||
try {
|
||||
fs.newFSDataOutputStreamBuilder(filePath1)
|
||||
.setEcPolicyName(illegalPolicyName).build().close();
|
||||
fs.createFile(filePath1).ecPolicyName(illegalPolicyName).build().close();
|
||||
Assert.fail("illegal erasure coding policy should not be found");
|
||||
} catch (Exception e) {
|
||||
GenericTestUtils.assertExceptionContains("Policy '" + illegalPolicyName
|
||||
|
@ -563,8 +562,8 @@ public class TestErasureCodingPolicies {
|
|||
SystemErasureCodingPolicies.RS_3_2_POLICY_ID);
|
||||
ecPolicyOnFile = EC_POLICY;
|
||||
fs.setErasureCodingPolicy(dirPath, ecPolicyOnDir.getName());
|
||||
fs.newFSDataOutputStreamBuilder(filePath0)
|
||||
.setEcPolicyName(ecPolicyOnFile.getName()).build().close();
|
||||
fs.createFile(filePath0).ecPolicyName(ecPolicyOnFile.getName())
|
||||
.build().close();
|
||||
assertEquals(ecPolicyOnFile, fs.getErasureCodingPolicy(filePath0));
|
||||
assertEquals(ecPolicyOnDir, fs.getErasureCodingPolicy(dirPath));
|
||||
fs.delete(dirPath, true);
|
||||
|
@ -582,27 +581,27 @@ public class TestErasureCodingPolicies {
|
|||
fs.setErasureCodingPolicy(dirPath, EC_POLICY.getName());
|
||||
|
||||
final String ecPolicyName = "RS-10-4-64k";
|
||||
fs.newFSDataOutputStreamBuilder(filePath).build().close();
|
||||
fs.createFile(filePath).build().close();
|
||||
assertEquals(EC_POLICY, fs.getErasureCodingPolicy(filePath));
|
||||
fs.delete(filePath, true);
|
||||
|
||||
fs.newFSDataOutputStreamBuilder(filePath)
|
||||
.setEcPolicyName(ecPolicyName)
|
||||
fs.createFile(filePath)
|
||||
.ecPolicyName(ecPolicyName)
|
||||
.build()
|
||||
.close();
|
||||
assertEquals(ecPolicyName, fs.getErasureCodingPolicy(filePath).getName());
|
||||
fs.delete(filePath, true);
|
||||
|
||||
try {
|
||||
fs.newFSDataOutputStreamBuilder(filePath)
|
||||
.setEcPolicyName(ecPolicyName)
|
||||
fs.createFile(filePath)
|
||||
.ecPolicyName(ecPolicyName)
|
||||
.replicate()
|
||||
.build().close();
|
||||
Assert.fail("shouldReplicate and ecPolicyName are exclusive " +
|
||||
"parameters. Set both is not allowed.");
|
||||
}catch (Exception e){
|
||||
GenericTestUtils.assertExceptionContains("shouldReplicate and " +
|
||||
"ecPolicyName are exclusive parameters. Set both is not allowed!", e);
|
||||
GenericTestUtils.assertExceptionContains("SHOULD_REPLICATE flag and " +
|
||||
"ecPolicyName are exclusive parameters.", e);
|
||||
}
|
||||
|
||||
try {
|
||||
|
@ -618,7 +617,7 @@ public class TestErasureCodingPolicies {
|
|||
"ecPolicyName are exclusive parameters. Set both is not allowed!", e);
|
||||
}
|
||||
|
||||
fs.newFSDataOutputStreamBuilder(filePath)
|
||||
fs.createFile(filePath)
|
||||
.replicate()
|
||||
.build()
|
||||
.close();
|
||||
|
|
|
@ -199,7 +199,7 @@ public class TestFavoredNodesEndToEnd {
|
|||
InetSocketAddress[] dns = getDatanodes(rand);
|
||||
Path p = new Path("/filename"+i);
|
||||
FSDataOutputStream out =
|
||||
dfs.newFSDataOutputStreamBuilder(p).setFavoredNodes(dns).build();
|
||||
dfs.createFile(p).favoredNodes(dns).build();
|
||||
out.write(SOME_BYTES);
|
||||
out.close();
|
||||
BlockLocation[] locations = getBlockLocations(p);
|
||||
|
|
Loading…
Reference in New Issue