svn merge -c 1344419 from trunk for HDFS-744. Support hsync in HDFS.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1346056 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Tsz-wo Sze 2012-06-04 17:54:48 +00:00
parent 0f1a956cec
commit a3bbdd6449
4 changed files with 94 additions and 5 deletions

View File

@ -44,6 +44,9 @@ import org.apache.hadoop.classification.InterfaceStability;
* else append to an existing file.</li> * else append to an existing file.</li>
* <li> CREATE|OVERWRITE - to create a file if it does not exist, * <li> CREATE|OVERWRITE - to create a file if it does not exist,
* else overwrite an existing file.</li> * else overwrite an existing file.</li>
* <li> SYNC_BLOCK - to force closed blocks to the disk device.
* In addition {@link Syncable#hsync()} should be called after each write,
* if true synchronous behavior is required.</li>
* </ol> * </ol>
* *
* Following combination is not valid and will result in * Following combination is not valid and will result in
@ -71,7 +74,12 @@ public enum CreateFlag {
/** /**
* Append to a file. See javadoc for more description. * Append to a file. See javadoc for more description.
*/ */
APPEND((short) 0x04); APPEND((short) 0x04),
/**
* Force closed blocks to disk. Similar to POSIX O_SYNC. See javadoc for description.
*/
SYNC_BLOCK((short) 0x08);
private final short mode; private final short mode;

View File

@ -830,6 +830,30 @@ public abstract class FileSystem extends Configured implements Closeable {
long blockSize, long blockSize,
Progressable progress) throws IOException; Progressable progress) throws IOException;
/**
* Create an FSDataOutputStream at the indicated Path with write-progress
* reporting.
* @param f the file name to open
* @param permission
* @param flags {@link CreateFlag}s to use for this stream.
* @param bufferSize the size of the buffer to be used.
* @param replication required block replication for the file.
* @param blockSize
* @param progress
* @throws IOException
* @see #setPermission(Path, FsPermission)
*/
public FSDataOutputStream create(Path f,
FsPermission permission,
EnumSet<CreateFlag> flags,
int bufferSize,
short replication,
long blockSize,
Progressable progress) throws IOException {
// only DFS support this
return create(f, permission, flags.contains(CreateFlag.OVERWRITE), bufferSize, replication, blockSize, progress);
}
/*. /*.
* This create has been added to support the FileContext that processes * This create has been added to support the FileContext that processes
@ -954,10 +978,35 @@ public abstract class FileSystem extends Configured implements Closeable {
public FSDataOutputStream createNonRecursive(Path f, FsPermission permission, public FSDataOutputStream createNonRecursive(Path f, FsPermission permission,
boolean overwrite, int bufferSize, short replication, long blockSize, boolean overwrite, int bufferSize, short replication, long blockSize,
Progressable progress) throws IOException { Progressable progress) throws IOException {
throw new IOException("createNonRecursive unsupported for this filesystem " return createNonRecursive(f, permission,
+ this.getClass()); overwrite ? EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE)
: EnumSet.of(CreateFlag.CREATE), bufferSize,
replication, blockSize, progress);
} }
/**
* Opens an FSDataOutputStream at the indicated Path with write-progress
* reporting. Same as create(), except fails if parent directory doesn't
* already exist.
* @param f the file name to open
* @param permission
* @param flags {@link CreateFlag}s to use for this stream.
* @param bufferSize the size of the buffer to be used.
* @param replication required block replication for the file.
* @param blockSize
* @param progress
* @throws IOException
* @see #setPermission(Path, FsPermission)
* @deprecated API only for 0.20-append
*/
@Deprecated
public FSDataOutputStream createNonRecursive(Path f, FsPermission permission,
EnumSet<CreateFlag> flags, int bufferSize, short replication, long blockSize,
Progressable progress) throws IOException {
throw new IOException("createNonRecursive unsupported for this filesystem "
+ this.getClass());
}
/** /**
* Creates the given Path as a brand-new zero-length file. If * Creates the given Path as a brand-new zero-length file. If
* create fails, or if it already existed, return false. * create fails, or if it already existed, return false.

View File

@ -807,7 +807,7 @@ public class SequenceFile {
} }
/** Write key/value pairs to a sequence-format file. */ /** Write key/value pairs to a sequence-format file. */
public static class Writer implements java.io.Closeable { public static class Writer implements java.io.Closeable, Syncable {
private Configuration conf; private Configuration conf;
FSDataOutputStream out; FSDataOutputStream out;
boolean ownOutputStream = true; boolean ownOutputStream = true;
@ -1193,13 +1193,31 @@ public class SequenceFile {
} }
} }
/** flush all currently written data to the file system */ /**
* flush all currently written data to the file system
* @deprecated Use {@link #hsync()} or {@link #hflush()} instead
*/
@Deprecated
public void syncFs() throws IOException { public void syncFs() throws IOException {
if (out != null) { if (out != null) {
out.sync(); // flush contents to file system out.sync(); // flush contents to file system
} }
} }
@Override
public void hsync() throws IOException {
if (out != null) {
out.hsync();
}
}
@Override
public void hflush() throws IOException {
if (out != null) {
out.hflush();
}
}
/** Returns the configuration of this file. */ /** Returns the configuration of this file. */
Configuration getConf() { return conf; } Configuration getConf() { return conf; }

View File

@ -74,6 +74,11 @@ public class TestFilterFileSystem {
Progressable progress) throws IOException { Progressable progress) throws IOException {
return null; return null;
} }
public FSDataOutputStream createNonRecursive(Path f, FsPermission permission,
EnumSet<CreateFlag> flags, int bufferSize, short replication, long blockSize,
Progressable progress) throws IOException {
return null;
}
public boolean mkdirs(Path f) { return false; } public boolean mkdirs(Path f) { return false; }
public FSDataInputStream open(Path f) { return null; } public FSDataInputStream open(Path f) { return null; }
public FSDataOutputStream create(Path f) { return null; } public FSDataOutputStream create(Path f) { return null; }
@ -123,6 +128,15 @@ public class TestFilterFileSystem {
Progressable progress) { Progressable progress) {
return null; return null;
} }
public FSDataOutputStream create(Path f,
FsPermission permission,
EnumSet<CreateFlag> flags,
int bufferSize,
short replication,
long blockSize,
Progressable progress) throws IOException {
return null;
}
public String getName() { return null; } public String getName() { return null; }
public boolean delete(Path f) { return false; } public boolean delete(Path f) { return false; }
public short getReplication(Path src) { return 0 ; } public short getReplication(Path src) { return 0 ; }