HADOOP-14397. Pull up the builder pattern to FileSystem and add AbstractContractCreateTest for it. (Lei (Eddy) Xu)
This commit is contained in:
parent
abbf4129a2
commit
9586b0e24f
|
@ -44,8 +44,8 @@ import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_
|
|||
*
|
||||
* To create missing parent directory, use {@link #recursive()}.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
@InterfaceStability.Unstable
|
||||
@InterfaceAudience.Public
|
||||
@InterfaceStability.Evolving
|
||||
public abstract class FSDataOutputStreamBuilder
|
||||
<S extends FSDataOutputStream, B extends FSDataOutputStreamBuilder<S, B>> {
|
||||
private final FileSystem fs;
|
||||
|
|
|
@ -4153,9 +4153,21 @@ public abstract class FileSystem extends Configured implements Closeable {
|
|||
|
||||
@Override
|
||||
public FSDataOutputStream build() throws IOException {
|
||||
return getFS().create(getPath(), getPermission(), getFlags(),
|
||||
getBufferSize(), getReplication(), getBlockSize(), getProgress(),
|
||||
getChecksumOpt());
|
||||
if (getFlags().contains(CreateFlag.CREATE) ||
|
||||
getFlags().contains(CreateFlag.OVERWRITE)) {
|
||||
if (isRecursive()) {
|
||||
return getFS().create(getPath(), getPermission(), getFlags(),
|
||||
getBufferSize(), getReplication(), getBlockSize(), getProgress(),
|
||||
getChecksumOpt());
|
||||
} else {
|
||||
return getFS().createNonRecursive(getPath(), getPermission(),
|
||||
getFlags(), getBufferSize(), getReplication(), getBlockSize(),
|
||||
getProgress());
|
||||
}
|
||||
} else if (getFlags().contains(CreateFlag.APPEND)) {
|
||||
return getFS().append(getPath(), getBufferSize(), getProgress());
|
||||
}
|
||||
throw new IOException("Must specify either create, overwrite or append");
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -4174,8 +4186,7 @@ public abstract class FileSystem extends Configured implements Closeable {
|
|||
* HADOOP-14384. Temporarily reduce the visibility of method before the
|
||||
* builder interface becomes stable.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
protected FSDataOutputStreamBuilder createFile(Path path) {
|
||||
public FSDataOutputStreamBuilder createFile(Path path) {
|
||||
return new FileSystemDataOutputStreamBuilder(this, path)
|
||||
.create().overwrite(true);
|
||||
}
|
||||
|
@ -4185,8 +4196,7 @@ public abstract class FileSystem extends Configured implements Closeable {
|
|||
* @param path file path.
|
||||
* @return a {@link FSDataOutputStreamBuilder} to build file append request.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
protected FSDataOutputStreamBuilder appendFile(Path path) {
|
||||
public FSDataOutputStreamBuilder appendFile(Path path) {
|
||||
return new FileSystemDataOutputStreamBuilder(this, path).append();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -659,7 +659,7 @@ public class TestLocalFileSystem {
|
|||
|
||||
try {
|
||||
FSDataOutputStreamBuilder builder =
|
||||
fileSys.createFile(path);
|
||||
fileSys.createFile(path).recursive();
|
||||
FSDataOutputStream out = builder.build();
|
||||
String content = "Create with a generic type of createFile!";
|
||||
byte[] contentOrigin = content.getBytes("UTF8");
|
||||
|
|
|
@ -60,6 +60,19 @@ public abstract class AbstractContractAppendTest extends AbstractFSContractTestB
|
|||
ContractTestUtils.compareByteArrays(dataset, bytes, dataset.length);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testBuilderAppendToEmptyFile() throws Throwable {
|
||||
touch(getFileSystem(), target);
|
||||
byte[] dataset = dataset(256, 'a', 'z');
|
||||
try (FSDataOutputStream outputStream =
|
||||
getFileSystem().appendFile(target).build()) {
|
||||
outputStream.write(dataset);
|
||||
}
|
||||
byte[] bytes = ContractTestUtils.readDataset(getFileSystem(), target,
|
||||
dataset.length);
|
||||
ContractTestUtils.compareByteArrays(dataset, bytes, dataset.length);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAppendNonexistentFile() throws Throwable {
|
||||
try {
|
||||
|
@ -78,15 +91,29 @@ public abstract class AbstractContractAppendTest extends AbstractFSContractTestB
|
|||
byte[] original = dataset(8192, 'A', 'Z');
|
||||
byte[] appended = dataset(8192, '0', '9');
|
||||
createFile(getFileSystem(), target, false, original);
|
||||
FSDataOutputStream outputStream = getFileSystem().append(target);
|
||||
outputStream.write(appended);
|
||||
outputStream.close();
|
||||
try (FSDataOutputStream out = getFileSystem().append(target)) {
|
||||
out.write(appended);
|
||||
}
|
||||
byte[] bytes = ContractTestUtils.readDataset(getFileSystem(), target,
|
||||
original.length + appended.length);
|
||||
ContractTestUtils.validateFileContent(bytes,
|
||||
new byte[] [] { original, appended });
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testBuilderAppendToExistingFile() throws Throwable {
|
||||
byte[] original = dataset(8192, 'A', 'Z');
|
||||
byte[] appended = dataset(8192, '0', '9');
|
||||
createFile(getFileSystem(), target, false, original);
|
||||
try (FSDataOutputStream out = getFileSystem().appendFile(target).build()) {
|
||||
out.write(appended);
|
||||
}
|
||||
byte[] bytes = ContractTestUtils.readDataset(getFileSystem(), target,
|
||||
original.length + appended.length);
|
||||
ContractTestUtils.validateFileContent(bytes,
|
||||
new byte[][]{original, appended});
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAppendMissingTarget() throws Throwable {
|
||||
try {
|
||||
|
|
|
@ -47,24 +47,37 @@ public abstract class AbstractContractCreateTest extends
|
|||
*/
|
||||
public static final int CREATE_TIMEOUT = 15000;
|
||||
|
||||
@Test
|
||||
public void testCreateNewFile() throws Throwable {
|
||||
describe("Foundational 'create a file' test");
|
||||
Path path = path("testCreateNewFile");
|
||||
protected Path path(String filepath, boolean useBuilder) throws IOException {
|
||||
return super.path(filepath + (useBuilder ? "" : "-builder"));
|
||||
}
|
||||
|
||||
private void testCreateNewFile(boolean useBuilder) throws Throwable {
|
||||
describe("Foundational 'create a file' test, using builder API=" +
|
||||
useBuilder);
|
||||
Path path = path("testCreateNewFile", useBuilder);
|
||||
byte[] data = dataset(256, 'a', 'z');
|
||||
writeDataset(getFileSystem(), path, data, data.length, 1024 * 1024, false);
|
||||
writeDataset(getFileSystem(), path, data, data.length, 1024 * 1024, false,
|
||||
useBuilder);
|
||||
ContractTestUtils.verifyFileContents(getFileSystem(), path, data);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCreateFileOverExistingFileNoOverwrite() throws Throwable {
|
||||
describe("Verify overwriting an existing file fails");
|
||||
Path path = path("testCreateFileOverExistingFileNoOverwrite");
|
||||
public void testCreateNewFile() throws Throwable {
|
||||
testCreateNewFile(true);
|
||||
testCreateNewFile(false);
|
||||
}
|
||||
|
||||
private void testCreateFileOverExistingFileNoOverwrite(boolean useBuilder)
|
||||
throws Throwable {
|
||||
describe("Verify overwriting an existing file fails, using builder API=" +
|
||||
useBuilder);
|
||||
Path path = path("testCreateFileOverExistingFileNoOverwrite", useBuilder);
|
||||
byte[] data = dataset(256, 'a', 'z');
|
||||
writeDataset(getFileSystem(), path, data, data.length, 1024, false);
|
||||
byte[] data2 = dataset(10 * 1024, 'A', 'Z');
|
||||
try {
|
||||
writeDataset(getFileSystem(), path, data2, data2.length, 1024, false);
|
||||
writeDataset(getFileSystem(), path, data2, data2.length, 1024, false,
|
||||
useBuilder);
|
||||
fail("writing without overwrite unexpectedly succeeded");
|
||||
} catch (FileAlreadyExistsException expected) {
|
||||
//expected
|
||||
|
@ -76,6 +89,26 @@ public abstract class AbstractContractCreateTest extends
|
|||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCreateFileOverExistingFileNoOverwrite() throws Throwable {
|
||||
testCreateFileOverExistingFileNoOverwrite(false);
|
||||
testCreateFileOverExistingFileNoOverwrite(true);
|
||||
}
|
||||
|
||||
private void testOverwriteExistingFile(boolean useBuilder) throws Throwable {
|
||||
describe("Overwrite an existing file and verify the new data is there, " +
|
||||
"use builder API=" + useBuilder);
|
||||
Path path = path("testOverwriteExistingFile", useBuilder);
|
||||
byte[] data = dataset(256, 'a', 'z');
|
||||
writeDataset(getFileSystem(), path, data, data.length, 1024, false,
|
||||
useBuilder);
|
||||
ContractTestUtils.verifyFileContents(getFileSystem(), path, data);
|
||||
byte[] data2 = dataset(10 * 1024, 'A', 'Z');
|
||||
writeDataset(getFileSystem(), path, data2, data2.length, 1024, true,
|
||||
useBuilder);
|
||||
ContractTestUtils.verifyFileContents(getFileSystem(), path, data2);
|
||||
}
|
||||
|
||||
/**
|
||||
* This test catches some eventual consistency problems that blobstores exhibit,
|
||||
* as we are implicitly verifying that updates are consistent. This
|
||||
|
@ -84,25 +117,21 @@ public abstract class AbstractContractCreateTest extends
|
|||
*/
|
||||
@Test
|
||||
public void testOverwriteExistingFile() throws Throwable {
|
||||
describe("Overwrite an existing file and verify the new data is there");
|
||||
Path path = path("testOverwriteExistingFile");
|
||||
byte[] data = dataset(256, 'a', 'z');
|
||||
writeDataset(getFileSystem(), path, data, data.length, 1024, false);
|
||||
ContractTestUtils.verifyFileContents(getFileSystem(), path, data);
|
||||
byte[] data2 = dataset(10 * 1024, 'A', 'Z');
|
||||
writeDataset(getFileSystem(), path, data2, data2.length, 1024, true);
|
||||
ContractTestUtils.verifyFileContents(getFileSystem(), path, data2);
|
||||
testOverwriteExistingFile(false);
|
||||
testOverwriteExistingFile(true);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testOverwriteEmptyDirectory() throws Throwable {
|
||||
describe("verify trying to create a file over an empty dir fails");
|
||||
private void testOverwriteEmptyDirectory(boolean useBuilder)
|
||||
throws Throwable {
|
||||
describe("verify trying to create a file over an empty dir fails, " +
|
||||
"use builder API=" + useBuilder);
|
||||
Path path = path("testOverwriteEmptyDirectory");
|
||||
mkdirs(path);
|
||||
assertIsDirectory(path);
|
||||
byte[] data = dataset(256, 'a', 'z');
|
||||
try {
|
||||
writeDataset(getFileSystem(), path, data, data.length, 1024, true);
|
||||
writeDataset(getFileSystem(), path, data, data.length, 1024, true,
|
||||
useBuilder);
|
||||
assertIsDirectory(path);
|
||||
fail("write of file over empty dir succeeded");
|
||||
} catch (FileAlreadyExistsException expected) {
|
||||
|
@ -121,8 +150,15 @@ public abstract class AbstractContractCreateTest extends
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testOverwriteNonEmptyDirectory() throws Throwable {
|
||||
describe("verify trying to create a file over a non-empty dir fails");
|
||||
public void testOverwriteEmptyDirectory() throws Throwable {
|
||||
testOverwriteEmptyDirectory(false);
|
||||
testOverwriteEmptyDirectory(true);
|
||||
}
|
||||
|
||||
private void testOverwriteNonEmptyDirectory(boolean useBuilder)
|
||||
throws Throwable {
|
||||
describe("verify trying to create a file over a non-empty dir fails, " +
|
||||
"use builder API=" + useBuilder);
|
||||
Path path = path("testOverwriteNonEmptyDirectory");
|
||||
mkdirs(path);
|
||||
try {
|
||||
|
@ -140,7 +176,7 @@ public abstract class AbstractContractCreateTest extends
|
|||
byte[] data = dataset(256, 'a', 'z');
|
||||
try {
|
||||
writeDataset(getFileSystem(), path, data, data.length, 1024,
|
||||
true);
|
||||
true, useBuilder);
|
||||
FileStatus status = getFileSystem().getFileStatus(path);
|
||||
|
||||
boolean isDir = status.isDirectory();
|
||||
|
@ -166,6 +202,12 @@ public abstract class AbstractContractCreateTest extends
|
|||
assertIsFile(child);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testOverwriteNonEmptyDirectory() throws Throwable {
|
||||
testOverwriteNonEmptyDirectory(false);
|
||||
testOverwriteNonEmptyDirectory(true);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCreatedFileIsImmediatelyVisible() throws Throwable {
|
||||
describe("verify that a newly created file exists as soon as open returns");
|
||||
|
|
|
@ -146,16 +146,45 @@ public class ContractTestUtils extends Assert {
|
|||
int len,
|
||||
int buffersize,
|
||||
boolean overwrite) throws IOException {
|
||||
writeDataset(fs, path, src, len, buffersize, overwrite, false);
|
||||
}
|
||||
|
||||
/**
|
||||
* Write a file.
|
||||
* Optional flags control
|
||||
* whether file overwrite operations should be enabled
|
||||
* Optional using {@link org.apache.hadoop.fs.FSDataOutputStreamBuilder}
|
||||
*
|
||||
* @param fs filesystem
|
||||
* @param path path to write to
|
||||
* @param len length of data
|
||||
* @param overwrite should the create option allow overwrites?
|
||||
* @param useBuilder should use builder API to create file?
|
||||
* @throws IOException IO problems
|
||||
*/
|
||||
public static void writeDataset(FileSystem fs, Path path, byte[] src,
|
||||
int len, int buffersize, boolean overwrite, boolean useBuilder)
|
||||
throws IOException {
|
||||
assertTrue(
|
||||
"Not enough data in source array to write " + len + " bytes",
|
||||
src.length >= len);
|
||||
FSDataOutputStream out = fs.create(path,
|
||||
overwrite,
|
||||
fs.getConf()
|
||||
.getInt(IO_FILE_BUFFER_SIZE_KEY,
|
||||
IO_FILE_BUFFER_SIZE_DEFAULT),
|
||||
(short) 1,
|
||||
buffersize);
|
||||
FSDataOutputStream out;
|
||||
if (useBuilder) {
|
||||
out = fs.createFile(path)
|
||||
.overwrite(overwrite)
|
||||
.replication((short) 1)
|
||||
.bufferSize(buffersize)
|
||||
.blockSize(buffersize)
|
||||
.build();
|
||||
} else {
|
||||
out = fs.create(path,
|
||||
overwrite,
|
||||
fs.getConf()
|
||||
.getInt(IO_FILE_BUFFER_SIZE_KEY,
|
||||
IO_FILE_BUFFER_SIZE_DEFAULT),
|
||||
(short) 1,
|
||||
buffersize);
|
||||
}
|
||||
out.write(src, 0, len);
|
||||
out.close();
|
||||
assertFileHasLength(fs, path, len);
|
||||
|
|
|
@ -2892,7 +2892,8 @@ public class DistributedFileSystem extends FileSystem {
|
|||
*/
|
||||
@Override
|
||||
public FSDataOutputStream build() throws IOException {
|
||||
if (getFlags().contains(CreateFlag.CREATE)) {
|
||||
if (getFlags().contains(CreateFlag.CREATE) ||
|
||||
getFlags().contains(CreateFlag.OVERWRITE)) {
|
||||
if (isRecursive()) {
|
||||
return dfs.create(getPath(), getPermission(), getFlags(),
|
||||
getBufferSize(), getReplication(), getBlockSize(),
|
||||
|
|
Loading…
Reference in New Issue