HADOOP-14397. Pull up the builder pattern to FileSystem and add AbstractContractCreateTest for it. (Lei (Eddy) Xu)

(cherry picked from commit 667ee003bf47e44beb3fdff8d06a7264a13dd22c)
This commit is contained in:
Lei Xu 2017-07-31 20:04:57 -07:00
parent 60ae10b14f
commit f09d20cffb
7 changed files with 154 additions and 45 deletions

View File

@ -44,8 +44,8 @@ import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_
* *
* To create missing parent directory, use {@link #recursive()}. * To create missing parent directory, use {@link #recursive()}.
*/ */
@InterfaceAudience.Private @InterfaceAudience.Public
@InterfaceStability.Unstable @InterfaceStability.Evolving
public abstract class FSDataOutputStreamBuilder public abstract class FSDataOutputStreamBuilder
<S extends FSDataOutputStream, B extends FSDataOutputStreamBuilder<S, B>> { <S extends FSDataOutputStream, B extends FSDataOutputStreamBuilder<S, B>> {
private final FileSystem fs; private final FileSystem fs;

View File

@ -4137,9 +4137,21 @@ public abstract class FileSystem extends Configured implements Closeable {
@Override @Override
public FSDataOutputStream build() throws IOException { public FSDataOutputStream build() throws IOException {
if (getFlags().contains(CreateFlag.CREATE) ||
getFlags().contains(CreateFlag.OVERWRITE)) {
if (isRecursive()) {
return getFS().create(getPath(), getPermission(), getFlags(), return getFS().create(getPath(), getPermission(), getFlags(),
getBufferSize(), getReplication(), getBlockSize(), getProgress(), getBufferSize(), getReplication(), getBlockSize(), getProgress(),
getChecksumOpt()); getChecksumOpt());
} else {
return getFS().createNonRecursive(getPath(), getPermission(),
getFlags(), getBufferSize(), getReplication(), getBlockSize(),
getProgress());
}
} else if (getFlags().contains(CreateFlag.APPEND)) {
return getFS().append(getPath(), getBufferSize(), getProgress());
}
throw new IOException("Must specify either create, overwrite or append");
} }
@Override @Override
@ -4158,8 +4170,7 @@ public abstract class FileSystem extends Configured implements Closeable {
* HADOOP-14384. Temporarily reduce the visibility of method before the * HADOOP-14384. Temporarily reduce the visibility of method before the
* builder interface becomes stable. * builder interface becomes stable.
*/ */
@InterfaceAudience.Private public FSDataOutputStreamBuilder createFile(Path path) {
protected FSDataOutputStreamBuilder createFile(Path path) {
return new FileSystemDataOutputStreamBuilder(this, path) return new FileSystemDataOutputStreamBuilder(this, path)
.create().overwrite(true); .create().overwrite(true);
} }
@ -4169,8 +4180,7 @@ public abstract class FileSystem extends Configured implements Closeable {
* @param path file path. * @param path file path.
* @return a {@link FSDataOutputStreamBuilder} to build file append request. * @return a {@link FSDataOutputStreamBuilder} to build file append request.
*/ */
@InterfaceAudience.Private public FSDataOutputStreamBuilder appendFile(Path path) {
protected FSDataOutputStreamBuilder appendFile(Path path) {
return new FileSystemDataOutputStreamBuilder(this, path).append(); return new FileSystemDataOutputStreamBuilder(this, path).append();
} }
} }

View File

@ -654,7 +654,7 @@ public class TestLocalFileSystem {
try { try {
FSDataOutputStreamBuilder builder = FSDataOutputStreamBuilder builder =
fileSys.createFile(path); fileSys.createFile(path).recursive();
FSDataOutputStream out = builder.build(); FSDataOutputStream out = builder.build();
String content = "Create with a generic type of createFile!"; String content = "Create with a generic type of createFile!";
byte[] contentOrigin = content.getBytes("UTF8"); byte[] contentOrigin = content.getBytes("UTF8");

View File

@ -60,6 +60,19 @@ public abstract class AbstractContractAppendTest extends AbstractFSContractTestB
ContractTestUtils.compareByteArrays(dataset, bytes, dataset.length); ContractTestUtils.compareByteArrays(dataset, bytes, dataset.length);
} }
@Test
public void testBuilderAppendToEmptyFile() throws Throwable {
touch(getFileSystem(), target);
byte[] dataset = dataset(256, 'a', 'z');
try (FSDataOutputStream outputStream =
getFileSystem().appendFile(target).build()) {
outputStream.write(dataset);
}
byte[] bytes = ContractTestUtils.readDataset(getFileSystem(), target,
dataset.length);
ContractTestUtils.compareByteArrays(dataset, bytes, dataset.length);
}
@Test @Test
public void testAppendNonexistentFile() throws Throwable { public void testAppendNonexistentFile() throws Throwable {
try { try {
@ -78,9 +91,23 @@ public abstract class AbstractContractAppendTest extends AbstractFSContractTestB
byte[] original = dataset(8192, 'A', 'Z'); byte[] original = dataset(8192, 'A', 'Z');
byte[] appended = dataset(8192, '0', '9'); byte[] appended = dataset(8192, '0', '9');
createFile(getFileSystem(), target, false, original); createFile(getFileSystem(), target, false, original);
FSDataOutputStream outputStream = getFileSystem().append(target); try (FSDataOutputStream out = getFileSystem().append(target)) {
outputStream.write(appended); out.write(appended);
outputStream.close(); }
byte[] bytes = ContractTestUtils.readDataset(getFileSystem(), target,
original.length + appended.length);
ContractTestUtils.validateFileContent(bytes,
new byte[] [] { original, appended });
}
@Test
public void testBuilderAppendToExistingFile() throws Throwable {
byte[] original = dataset(8192, 'A', 'Z');
byte[] appended = dataset(8192, '0', '9');
createFile(getFileSystem(), target, false, original);
try (FSDataOutputStream out = getFileSystem().appendFile(target).build()) {
out.write(appended);
}
byte[] bytes = ContractTestUtils.readDataset(getFileSystem(), target, byte[] bytes = ContractTestUtils.readDataset(getFileSystem(), target,
original.length + appended.length); original.length + appended.length);
ContractTestUtils.validateFileContent(bytes, ContractTestUtils.validateFileContent(bytes,

View File

@ -47,24 +47,37 @@ public abstract class AbstractContractCreateTest extends
*/ */
public static final int CREATE_TIMEOUT = 15000; public static final int CREATE_TIMEOUT = 15000;
@Test protected Path path(String filepath, boolean useBuilder) throws IOException {
public void testCreateNewFile() throws Throwable { return super.path(filepath + (useBuilder ? "" : "-builder"));
describe("Foundational 'create a file' test"); }
Path path = path("testCreateNewFile");
private void testCreateNewFile(boolean useBuilder) throws Throwable {
describe("Foundational 'create a file' test, using builder API=" +
useBuilder);
Path path = path("testCreateNewFile", useBuilder);
byte[] data = dataset(256, 'a', 'z'); byte[] data = dataset(256, 'a', 'z');
writeDataset(getFileSystem(), path, data, data.length, 1024 * 1024, false); writeDataset(getFileSystem(), path, data, data.length, 1024 * 1024, false,
useBuilder);
ContractTestUtils.verifyFileContents(getFileSystem(), path, data); ContractTestUtils.verifyFileContents(getFileSystem(), path, data);
} }
@Test @Test
public void testCreateFileOverExistingFileNoOverwrite() throws Throwable { public void testCreateNewFile() throws Throwable {
describe("Verify overwriting an existing file fails"); testCreateNewFile(true);
Path path = path("testCreateFileOverExistingFileNoOverwrite"); testCreateNewFile(false);
}
private void testCreateFileOverExistingFileNoOverwrite(boolean useBuilder)
throws Throwable {
describe("Verify overwriting an existing file fails, using builder API=" +
useBuilder);
Path path = path("testCreateFileOverExistingFileNoOverwrite", useBuilder);
byte[] data = dataset(256, 'a', 'z'); byte[] data = dataset(256, 'a', 'z');
writeDataset(getFileSystem(), path, data, data.length, 1024, false); writeDataset(getFileSystem(), path, data, data.length, 1024, false);
byte[] data2 = dataset(10 * 1024, 'A', 'Z'); byte[] data2 = dataset(10 * 1024, 'A', 'Z');
try { try {
writeDataset(getFileSystem(), path, data2, data2.length, 1024, false); writeDataset(getFileSystem(), path, data2, data2.length, 1024, false,
useBuilder);
fail("writing without overwrite unexpectedly succeeded"); fail("writing without overwrite unexpectedly succeeded");
} catch (FileAlreadyExistsException expected) { } catch (FileAlreadyExistsException expected) {
//expected //expected
@ -76,6 +89,26 @@ public abstract class AbstractContractCreateTest extends
} }
} }
@Test
public void testCreateFileOverExistingFileNoOverwrite() throws Throwable {
testCreateFileOverExistingFileNoOverwrite(false);
testCreateFileOverExistingFileNoOverwrite(true);
}
private void testOverwriteExistingFile(boolean useBuilder) throws Throwable {
describe("Overwrite an existing file and verify the new data is there, " +
"use builder API=" + useBuilder);
Path path = path("testOverwriteExistingFile", useBuilder);
byte[] data = dataset(256, 'a', 'z');
writeDataset(getFileSystem(), path, data, data.length, 1024, false,
useBuilder);
ContractTestUtils.verifyFileContents(getFileSystem(), path, data);
byte[] data2 = dataset(10 * 1024, 'A', 'Z');
writeDataset(getFileSystem(), path, data2, data2.length, 1024, true,
useBuilder);
ContractTestUtils.verifyFileContents(getFileSystem(), path, data2);
}
/** /**
* This test catches some eventual consistency problems that blobstores exhibit, * This test catches some eventual consistency problems that blobstores exhibit,
* as we are implicitly verifying that updates are consistent. This * as we are implicitly verifying that updates are consistent. This
@ -84,25 +117,21 @@ public abstract class AbstractContractCreateTest extends
*/ */
@Test @Test
public void testOverwriteExistingFile() throws Throwable { public void testOverwriteExistingFile() throws Throwable {
describe("Overwrite an existing file and verify the new data is there"); testOverwriteExistingFile(false);
Path path = path("testOverwriteExistingFile"); testOverwriteExistingFile(true);
byte[] data = dataset(256, 'a', 'z');
writeDataset(getFileSystem(), path, data, data.length, 1024, false);
ContractTestUtils.verifyFileContents(getFileSystem(), path, data);
byte[] data2 = dataset(10 * 1024, 'A', 'Z');
writeDataset(getFileSystem(), path, data2, data2.length, 1024, true);
ContractTestUtils.verifyFileContents(getFileSystem(), path, data2);
} }
@Test private void testOverwriteEmptyDirectory(boolean useBuilder)
public void testOverwriteEmptyDirectory() throws Throwable { throws Throwable {
describe("verify trying to create a file over an empty dir fails"); describe("verify trying to create a file over an empty dir fails, " +
"use builder API=" + useBuilder);
Path path = path("testOverwriteEmptyDirectory"); Path path = path("testOverwriteEmptyDirectory");
mkdirs(path); mkdirs(path);
assertIsDirectory(path); assertIsDirectory(path);
byte[] data = dataset(256, 'a', 'z'); byte[] data = dataset(256, 'a', 'z');
try { try {
writeDataset(getFileSystem(), path, data, data.length, 1024, true); writeDataset(getFileSystem(), path, data, data.length, 1024, true,
useBuilder);
assertIsDirectory(path); assertIsDirectory(path);
fail("write of file over empty dir succeeded"); fail("write of file over empty dir succeeded");
} catch (FileAlreadyExistsException expected) { } catch (FileAlreadyExistsException expected) {
@ -121,8 +150,15 @@ public abstract class AbstractContractCreateTest extends
} }
@Test @Test
public void testOverwriteNonEmptyDirectory() throws Throwable { public void testOverwriteEmptyDirectory() throws Throwable {
describe("verify trying to create a file over a non-empty dir fails"); testOverwriteEmptyDirectory(false);
testOverwriteEmptyDirectory(true);
}
private void testOverwriteNonEmptyDirectory(boolean useBuilder)
throws Throwable {
describe("verify trying to create a file over a non-empty dir fails, " +
"use builder API=" + useBuilder);
Path path = path("testOverwriteNonEmptyDirectory"); Path path = path("testOverwriteNonEmptyDirectory");
mkdirs(path); mkdirs(path);
try { try {
@ -140,7 +176,7 @@ public abstract class AbstractContractCreateTest extends
byte[] data = dataset(256, 'a', 'z'); byte[] data = dataset(256, 'a', 'z');
try { try {
writeDataset(getFileSystem(), path, data, data.length, 1024, writeDataset(getFileSystem(), path, data, data.length, 1024,
true); true, useBuilder);
FileStatus status = getFileSystem().getFileStatus(path); FileStatus status = getFileSystem().getFileStatus(path);
boolean isDir = status.isDirectory(); boolean isDir = status.isDirectory();
@ -166,6 +202,12 @@ public abstract class AbstractContractCreateTest extends
assertIsFile(child); assertIsFile(child);
} }
@Test
public void testOverwriteNonEmptyDirectory() throws Throwable {
testOverwriteNonEmptyDirectory(false);
testOverwriteNonEmptyDirectory(true);
}
@Test @Test
public void testCreatedFileIsImmediatelyVisible() throws Throwable { public void testCreatedFileIsImmediatelyVisible() throws Throwable {
describe("verify that a newly created file exists as soon as open returns"); describe("verify that a newly created file exists as soon as open returns");

View File

@ -146,16 +146,45 @@ public class ContractTestUtils extends Assert {
int len, int len,
int buffersize, int buffersize,
boolean overwrite) throws IOException { boolean overwrite) throws IOException {
writeDataset(fs, path, src, len, buffersize, overwrite, false);
}
/**
* Write a file.
* Optional flags control
* whether file overwrite operations should be enabled
* Optional using {@link org.apache.hadoop.fs.FSDataOutputStreamBuilder}
*
* @param fs filesystem
* @param path path to write to
* @param len length of data
* @param overwrite should the create option allow overwrites?
* @param useBuilder should use builder API to create file?
* @throws IOException IO problems
*/
public static void writeDataset(FileSystem fs, Path path, byte[] src,
int len, int buffersize, boolean overwrite, boolean useBuilder)
throws IOException {
assertTrue( assertTrue(
"Not enough data in source array to write " + len + " bytes", "Not enough data in source array to write " + len + " bytes",
src.length >= len); src.length >= len);
FSDataOutputStream out = fs.create(path, FSDataOutputStream out;
if (useBuilder) {
out = fs.createFile(path)
.overwrite(overwrite)
.replication((short) 1)
.bufferSize(buffersize)
.blockSize(buffersize)
.build();
} else {
out = fs.create(path,
overwrite, overwrite,
fs.getConf() fs.getConf()
.getInt(IO_FILE_BUFFER_SIZE_KEY, .getInt(IO_FILE_BUFFER_SIZE_KEY,
IO_FILE_BUFFER_SIZE_DEFAULT), IO_FILE_BUFFER_SIZE_DEFAULT),
(short) 1, (short) 1,
buffersize); buffersize);
}
out.write(src, 0, len); out.write(src, 0, len);
out.close(); out.close();
assertFileHasLength(fs, path, len); assertFileHasLength(fs, path, len);

View File

@ -2742,7 +2742,8 @@ public class DistributedFileSystem extends FileSystem {
*/ */
@Override @Override
public FSDataOutputStream build() throws IOException { public FSDataOutputStream build() throws IOException {
if (getFlags().contains(CreateFlag.CREATE)) { if (getFlags().contains(CreateFlag.CREATE) ||
getFlags().contains(CreateFlag.OVERWRITE)) {
if (isRecursive()) { if (isRecursive()) {
return dfs.create(getPath(), getPermission(), getFlags(), return dfs.create(getPath(), getPermission(), getFlags(),
getBufferSize(), getReplication(), getBlockSize(), getBufferSize(), getReplication(), getBlockSize(),