From 33afa1fdca8dfafc0214504626650fb25aec0b95 Mon Sep 17 00:00:00 2001 From: Lei Xu Date: Fri, 16 Jun 2017 17:24:00 -0700 Subject: [PATCH] HADOOP-14395. Provide Builder pattern for DistributedFileSystem.append. Contributed by Lei (Eddy) Xu. (cherry picked from commit 6460df21a09a7fcc29eceb8dc3859d6298da6882) Conflicts: hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java --- .../java/org/apache/hadoop/fs/FileSystem.java | 10 +++++ .../hadoop/hdfs/DistributedFileSystem.java | 39 ++++++++++++----- .../hdfs/TestDistributedFileSystem.java | 43 ++++++++++++++++++- 3 files changed, 81 insertions(+), 11 deletions(-) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java index e1329f44ed0..38e53a487ce 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java @@ -4163,4 +4163,14 @@ public abstract class FileSystem extends Configured implements Closeable { return new FileSystemDataOutputStreamBuilder(this, path) .create().overwrite(true); } + + /** + * Create a Builder to append to a file. + * @param path file path. + * @return a {@link FSDataOutputStreamBuilder} to build a file append request. 
+ */ + @InterfaceAudience.Private + protected FSDataOutputStreamBuilder appendFile(Path path) { + return new FileSystemDataOutputStreamBuilder(this, path).append(); + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java index 3e7c899c16d..9bccf77cb0e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java @@ -29,6 +29,7 @@ import java.util.EnumSet; import java.util.List; import java.util.Map; +import org.apache.hadoop.HadoopIllegalArgumentException; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; @@ -2652,7 +2653,7 @@ public class DistributedFileSystem extends FileSystem { */ public static final class HdfsDataOutputStreamBuilder extends FSDataOutputStreamBuilder< - HdfsDataOutputStream, HdfsDataOutputStreamBuilder> { + FSDataOutputStream, HdfsDataOutputStreamBuilder> { private final DistributedFileSystem dfs; private InetSocketAddress[] favoredNodes = null; @@ -2739,16 +2740,23 @@ public class DistributedFileSystem extends FileSystem { * @throws IOException on I/O errors. 
*/ @Override - public HdfsDataOutputStream build() throws IOException { - if (isRecursive()) { - return dfs.create(getPath(), getPermission(), getFlags(), - getBufferSize(), getReplication(), getBlockSize(), - getProgress(), getChecksumOpt(), getFavoredNodes()); - } else { - return dfs.createNonRecursive(getPath(), getPermission(), getFlags(), - getBufferSize(), getReplication(), getBlockSize(), getProgress(), - getChecksumOpt(), getFavoredNodes()); + public FSDataOutputStream build() throws IOException { + if (getFlags().contains(CreateFlag.CREATE)) { + if (isRecursive()) { + return dfs.create(getPath(), getPermission(), getFlags(), + getBufferSize(), getReplication(), getBlockSize(), + getProgress(), getChecksumOpt(), getFavoredNodes()); + } else { + return dfs.createNonRecursive(getPath(), getPermission(), getFlags(), + getBufferSize(), getReplication(), getBlockSize(), getProgress(), + getChecksumOpt(), getFavoredNodes()); + } + } else if (getFlags().contains(CreateFlag.APPEND)) { + return dfs.append(getPath(), getFlags(), getBufferSize(), getProgress(), + getFavoredNodes()); } + throw new HadoopIllegalArgumentException( + "Must specify either create or append"); } } @@ -2763,4 +2771,15 @@ public class DistributedFileSystem extends FileSystem { public HdfsDataOutputStreamBuilder createFile(Path path) { return new HdfsDataOutputStreamBuilder(this, path).create().overwrite(true); } + + /** + * Create a {@link HdfsDataOutputStreamBuilder} to append to a file on DFS. + * + * @param path file path. + * @return A {@link HdfsDataOutputStreamBuilder} for appending to a file. 
+ */ + @Override + public HdfsDataOutputStreamBuilder appendFile(Path path) { + return new HdfsDataOutputStreamBuilder(this, path).append(); + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java index fe2dbb9a86d..0cff7d491f9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java @@ -53,6 +53,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicReference; +import org.apache.hadoop.HadoopIllegalArgumentException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.BlockLocation; import org.apache.hadoop.fs.BlockStorageLocation; @@ -1572,7 +1573,7 @@ public class TestDistributedFileSystem { } @Test - public void testDFSDataOutputStreamBuilder() throws Exception { + public void testDFSDataOutputStreamBuilderForCreation() throws Exception { Configuration conf = getTestConfiguration(); String testFile = "/testDFSDataOutputStreamBuilder"; Path testFilePath = new Path(testFile); @@ -1580,6 +1581,11 @@ public class TestDistributedFileSystem { .numDataNodes(1).build()) { DistributedFileSystem fs = cluster.getFileSystem(); + // Before calling build(), no change was made in the file system + HdfsDataOutputStreamBuilder builder = fs.createFile(testFilePath) + .blockSize(4096).replication((short)1); + assertFalse(fs.exists(testFilePath)); + // Test create an empty file try (FSDataOutputStream out = fs.createFile(testFilePath).build()) { @@ -1624,4 +1630,39 @@ public class TestDistributedFileSystem { fs.exists(new Path("/parent"))); } } + + @Test + public void testDFSDataOutputStreamBuilderForAppend() throws IOException { + Configuration conf = 
getTestConfiguration(); + String testFile = "/testDFSDataOutputStreamBuilderForAppend"; + Path path = new Path(testFile); + Random random = new Random(); + try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf) + .numDataNodes(1).build()) { + DistributedFileSystem fs = cluster.getFileSystem(); + + byte[] buf = new byte[16]; + random.nextBytes(buf); + + try (FSDataOutputStream out = fs.appendFile(path).build()) { + out.write(buf); + fail("should fail on appending to non-existent file"); + } catch (IOException e) { + GenericTestUtils.assertExceptionContains("non-existent", e); + } + + random.nextBytes(buf); + try (FSDataOutputStream out = fs.createFile(path).build()) { + out.write(buf); + } + + random.nextBytes(buf); + try (FSDataOutputStream out = fs.appendFile(path).build()) { + out.write(buf); + } + + FileStatus status = fs.getFileStatus(path); + assertEquals(16 * 2, status.getLen()); + } + } }