diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CreateFlag.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CreateFlag.java index 4373124caab..252f37b3dc6 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CreateFlag.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CreateFlag.java @@ -44,6 +44,9 @@ import org.apache.hadoop.classification.InterfaceStability; * else append to an existing file. *
  • CREATE|OVERWRITE - to create a file if it does not exist, * else overwrite an existing file.
  • + *
  • SYNC_BLOCK - to force closed blocks to the disk device. + * In addition {@link Syncable#hsync()} should be called after each write, + * if true synchronous behavior is required.
  • * * * Following combination is not valid and will result in @@ -71,7 +74,12 @@ public enum CreateFlag { /** * Append to a file. See javadoc for more description. */ - APPEND((short) 0x04); + APPEND((short) 0x04), + + /** + * Force closed blocks to disk. Similar to POSIX O_SYNC. See javadoc for description. + */ + SYNC_BLOCK((short) 0x08); private final short mode; diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java index fe908beddd5..14111305336 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java @@ -830,6 +830,30 @@ public abstract class FileSystem extends Configured implements Closeable { long blockSize, Progressable progress) throws IOException; + /** + * Create an FSDataOutputStream at the indicated Path with write-progress + * reporting. + * @param f the file name to open + * @param permission + * @param flags {@link CreateFlag}s to use for this stream. + * @param bufferSize the size of the buffer to be used. + * @param replication required block replication for the file. + * @param blockSize + * @param progress + * @throws IOException + * @see #setPermission(Path, FsPermission) + */ + public FSDataOutputStream create(Path f, + FsPermission permission, + EnumSet flags, + int bufferSize, + short replication, + long blockSize, + Progressable progress) throws IOException { + // only DFS support this + return create(f, permission, flags.contains(CreateFlag.OVERWRITE), bufferSize, replication, blockSize, progress); + } + /*. * This create has been added to support the FileContext that processes @@ -954,10 +978,35 @@ public abstract class FileSystem extends Configured implements Closeable { public FSDataOutputStream createNonRecursive(Path f, FsPermission permission, boolean overwrite, int bufferSize, short replication, long blockSize, Progressable progress) throws IOException { - throw new IOException("createNonRecursive unsupported for this filesystem " - + this.getClass()); + return createNonRecursive(f, permission, + overwrite ? EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE) + : EnumSet.of(CreateFlag.CREATE), bufferSize, + replication, blockSize, progress); } + /** + * Opens an FSDataOutputStream at the indicated Path with write-progress + * reporting. Same as create(), except fails if parent directory doesn't + * already exist. + * @param f the file name to open + * @param permission + * @param flags {@link CreateFlag}s to use for this stream. + * @param bufferSize the size of the buffer to be used. + * @param replication required block replication for the file. + * @param blockSize + * @param progress + * @throws IOException + * @see #setPermission(Path, FsPermission) + * @deprecated API only for 0.20-append + */ + @Deprecated + public FSDataOutputStream createNonRecursive(Path f, FsPermission permission, + EnumSet flags, int bufferSize, short replication, long blockSize, + Progressable progress) throws IOException { + throw new IOException("createNonRecursive unsupported for this filesystem " + + this.getClass()); + } + /** * Creates the given Path as a brand-new zero-length file. If * create fails, or if it already existed, return false. diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/SequenceFile.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/SequenceFile.java index 834aadc5fd6..f5b24ac22cf 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/SequenceFile.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/SequenceFile.java @@ -807,7 +807,7 @@ public class SequenceFile { } /** Write key/value pairs to a sequence-format file. */ - public static class Writer implements java.io.Closeable { + public static class Writer implements java.io.Closeable, Syncable { private Configuration conf; FSDataOutputStream out; boolean ownOutputStream = true; @@ -1193,13 +1193,31 @@ public class SequenceFile { } } - /** flush all currently written data to the file system */ + /** + * flush all currently written data to the file system + * @deprecated Use {@link #hsync()} or {@link #hflush()} instead + */ + @Deprecated public void syncFs() throws IOException { if (out != null) { out.sync(); // flush contents to file system } } + @Override + public void hsync() throws IOException { + if (out != null) { + out.hsync(); + } + } + + @Override + public void hflush() throws IOException { + if (out != null) { + out.hflush(); + } + } + /** Returns the configuration of this file. */ Configuration getConf() { return conf; } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestFilterFileSystem.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestFilterFileSystem.java index aeb926004b1..647a583d103 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestFilterFileSystem.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestFilterFileSystem.java @@ -74,6 +74,11 @@ public class TestFilterFileSystem { Progressable progress) throws IOException { return null; } + public FSDataOutputStream createNonRecursive(Path f, FsPermission permission, + EnumSet flags, int bufferSize, short replication, long blockSize, + Progressable progress) throws IOException { + return null; + } public boolean mkdirs(Path f) { return false; } public FSDataInputStream open(Path f) { return null; } public FSDataOutputStream create(Path f) { return null; } @@ -123,6 +128,15 @@ public class TestFilterFileSystem { Progressable progress) { return null; } + public FSDataOutputStream create(Path f, + FsPermission permission, + EnumSet flags, + int bufferSize, + short replication, + long blockSize, + Progressable progress) throws IOException { + return null; + } public String getName() { return null; } public boolean delete(Path f) { return false; } public short getReplication(Path src) { return 0 ; }