HADOOP-14395. Provide Builder pattern for DistributedFileSystem.append. Contributed by Lei (Eddy) Xu.
(cherry picked from commit 6460df21a0
)
Conflicts:
hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
This commit is contained in:
parent
9529513f18
commit
33afa1fdca
|
@ -4163,4 +4163,14 @@ public abstract class FileSystem extends Configured implements Closeable {
|
|||
return new FileSystemDataOutputStreamBuilder(this, path)
|
||||
.create().overwrite(true);
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a Builder to append a file.
|
||||
* @param path file path.
|
||||
* @return a {@link FSDataOutputStreamBuilder} to build file append request.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
protected FSDataOutputStreamBuilder appendFile(Path path) {
|
||||
return new FileSystemDataOutputStreamBuilder(this, path).append();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -29,6 +29,7 @@ import java.util.EnumSet;
|
|||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.hadoop.HadoopIllegalArgumentException;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
|
@ -2652,7 +2653,7 @@ public class DistributedFileSystem extends FileSystem {
|
|||
*/
|
||||
public static final class HdfsDataOutputStreamBuilder
|
||||
extends FSDataOutputStreamBuilder<
|
||||
HdfsDataOutputStream, HdfsDataOutputStreamBuilder> {
|
||||
FSDataOutputStream, HdfsDataOutputStreamBuilder> {
|
||||
private final DistributedFileSystem dfs;
|
||||
private InetSocketAddress[] favoredNodes = null;
|
||||
|
||||
|
@ -2739,7 +2740,8 @@ public class DistributedFileSystem extends FileSystem {
|
|||
* @throws IOException on I/O errors.
|
||||
*/
|
||||
@Override
|
||||
public HdfsDataOutputStream build() throws IOException {
|
||||
public FSDataOutputStream build() throws IOException {
|
||||
if (getFlags().contains(CreateFlag.CREATE)) {
|
||||
if (isRecursive()) {
|
||||
return dfs.create(getPath(), getPermission(), getFlags(),
|
||||
getBufferSize(), getReplication(), getBlockSize(),
|
||||
|
@ -2749,6 +2751,12 @@ public class DistributedFileSystem extends FileSystem {
|
|||
getBufferSize(), getReplication(), getBlockSize(), getProgress(),
|
||||
getChecksumOpt(), getFavoredNodes());
|
||||
}
|
||||
} else if (getFlags().contains(CreateFlag.APPEND)) {
|
||||
return dfs.append(getPath(), getFlags(), getBufferSize(), getProgress(),
|
||||
getFavoredNodes());
|
||||
}
|
||||
throw new HadoopIllegalArgumentException(
|
||||
"Must specify either create or append");
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -2763,4 +2771,15 @@ public class DistributedFileSystem extends FileSystem {
|
|||
public HdfsDataOutputStreamBuilder createFile(Path path) {
|
||||
return new HdfsDataOutputStreamBuilder(this, path).create().overwrite(true);
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a {@link HdfsDataOutputStreamBuilder} to append a file on DFS.
|
||||
*
|
||||
* @param path file path.
|
||||
* @return A {@link HdfsDataOutputStreamBuilder} for appending a file.
|
||||
*/
|
||||
@Override
|
||||
public HdfsDataOutputStreamBuilder appendFile(Path path) {
|
||||
return new HdfsDataOutputStreamBuilder(this, path).append();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -53,6 +53,7 @@ import java.util.concurrent.CountDownLatch;
|
|||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import org.apache.hadoop.HadoopIllegalArgumentException;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.BlockLocation;
|
||||
import org.apache.hadoop.fs.BlockStorageLocation;
|
||||
|
@ -1572,7 +1573,7 @@ public class TestDistributedFileSystem {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testDFSDataOutputStreamBuilder() throws Exception {
|
||||
public void testDFSDataOutputStreamBuilderForCreation() throws Exception {
|
||||
Configuration conf = getTestConfiguration();
|
||||
String testFile = "/testDFSDataOutputStreamBuilder";
|
||||
Path testFilePath = new Path(testFile);
|
||||
|
@ -1580,6 +1581,11 @@ public class TestDistributedFileSystem {
|
|||
.numDataNodes(1).build()) {
|
||||
DistributedFileSystem fs = cluster.getFileSystem();
|
||||
|
||||
// Before calling build(), no change was made in the file system
|
||||
HdfsDataOutputStreamBuilder builder = fs.createFile(testFilePath)
|
||||
.blockSize(4096).replication((short)1);
|
||||
assertFalse(fs.exists(testFilePath));
|
||||
|
||||
// Test create an empty file
|
||||
try (FSDataOutputStream out =
|
||||
fs.createFile(testFilePath).build()) {
|
||||
|
@ -1624,4 +1630,39 @@ public class TestDistributedFileSystem {
|
|||
fs.exists(new Path("/parent")));
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDFSDataOutputStreamBuilderForAppend() throws IOException {
|
||||
Configuration conf = getTestConfiguration();
|
||||
String testFile = "/testDFSDataOutputStreamBuilderForAppend";
|
||||
Path path = new Path(testFile);
|
||||
Random random = new Random();
|
||||
try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
|
||||
.numDataNodes(1).build()) {
|
||||
DistributedFileSystem fs = cluster.getFileSystem();
|
||||
|
||||
byte[] buf = new byte[16];
|
||||
random.nextBytes(buf);
|
||||
|
||||
try (FSDataOutputStream out = fs.appendFile(path).build()) {
|
||||
out.write(buf);
|
||||
fail("should fail on appending to non-existent file");
|
||||
} catch (IOException e) {
|
||||
GenericTestUtils.assertExceptionContains("non-existent", e);
|
||||
}
|
||||
|
||||
random.nextBytes(buf);
|
||||
try (FSDataOutputStream out = fs.createFile(path).build()) {
|
||||
out.write(buf);
|
||||
}
|
||||
|
||||
random.nextBytes(buf);
|
||||
try (FSDataOutputStream out = fs.appendFile(path).build()) {
|
||||
out.write(buf);
|
||||
}
|
||||
|
||||
FileStatus status = fs.getFileStatus(path);
|
||||
assertEquals(16 * 2, status.getLen());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue