HDFS-7410. Support CreateFlags with append() to support hsync() for appending streams (Vinayakumar B via Colin P. McCabe)
(cherry picked from commit 61df1b27a797efd094328c7d9141b9e157e01bf4) (cherry picked from commit 2a750c9aa68ae4f1b1b3ade082ec9362e1f1c5d2)
This commit is contained in:
parent
c808d5fe55
commit
9411cf9b65
@ -456,6 +456,9 @@ Release 2.7.0 - UNRELEASED
|
||||
HDFS-7976. Update NFS user guide for mount option "sync" to minimize or
|
||||
avoid reordered writes. (brandonli)
|
||||
|
||||
HDFS-7410. Support CreateFlags with append() to support hsync() for
|
||||
appending streams (Vinayakumar B via Colin P. McCabe)
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
HDFS-7454. Reduce memory footprint for AclEntries in NameNode.
|
||||
|
@ -1818,10 +1818,10 @@ private DFSOutputStream callAppend(String src, int buffersize,
|
||||
try {
|
||||
LastBlockWithStatus blkWithStatus = namenode.append(src, clientName,
|
||||
new EnumSetWritable<>(flag, CreateFlag.class));
|
||||
return DFSOutputStream.newStreamForAppend(this, src,
|
||||
flag.contains(CreateFlag.NEW_BLOCK),
|
||||
buffersize, progress, blkWithStatus.getLastBlock(),
|
||||
blkWithStatus.getFileStatus(), dfsClientConf.createChecksum(), favoredNodes);
|
||||
return DFSOutputStream.newStreamForAppend(this, src, flag, buffersize,
|
||||
progress, blkWithStatus.getLastBlock(),
|
||||
blkWithStatus.getFileStatus(), dfsClientConf.createChecksum(),
|
||||
favoredNodes);
|
||||
} catch(RemoteException re) {
|
||||
throw re.unwrapRemoteException(AccessControlException.class,
|
||||
FileNotFoundException.class,
|
||||
|
@ -1661,11 +1661,14 @@ static DFSOutputStream newStreamForCreate(DFSClient dfsClient, String src,
|
||||
}
|
||||
|
||||
/** Construct a new output stream for append. */
|
||||
private DFSOutputStream(DFSClient dfsClient, String src, boolean toNewBlock,
|
||||
Progressable progress, LocatedBlock lastBlock, HdfsFileStatus stat,
|
||||
DataChecksum checksum) throws IOException {
|
||||
private DFSOutputStream(DFSClient dfsClient, String src,
|
||||
EnumSet<CreateFlag> flags, Progressable progress, LocatedBlock lastBlock,
|
||||
HdfsFileStatus stat, DataChecksum checksum) throws IOException {
|
||||
this(dfsClient, src, progress, stat, checksum);
|
||||
initialFileSize = stat.getLen(); // length of file when opened
|
||||
this.shouldSyncBlock = flags.contains(CreateFlag.SYNC_BLOCK);
|
||||
|
||||
boolean toNewBlock = flags.contains(CreateFlag.NEW_BLOCK);
|
||||
|
||||
// The last partial block of the file has to be filled.
|
||||
if (!toNewBlock && lastBlock != null) {
|
||||
@ -1682,13 +1685,13 @@ private DFSOutputStream(DFSClient dfsClient, String src, boolean toNewBlock,
|
||||
}
|
||||
|
||||
static DFSOutputStream newStreamForAppend(DFSClient dfsClient, String src,
|
||||
boolean toNewBlock, int bufferSize, Progressable progress,
|
||||
EnumSet<CreateFlag> flags, int bufferSize, Progressable progress,
|
||||
LocatedBlock lastBlock, HdfsFileStatus stat, DataChecksum checksum,
|
||||
String[] favoredNodes) throws IOException {
|
||||
TraceScope scope =
|
||||
dfsClient.getPathTraceScope("newStreamForAppend", src);
|
||||
try {
|
||||
final DFSOutputStream out = new DFSOutputStream(dfsClient, src, toNewBlock,
|
||||
final DFSOutputStream out = new DFSOutputStream(dfsClient, src, flags,
|
||||
progress, lastBlock, stat, checksum);
|
||||
if (favoredNodes != null && favoredNodes.length != 0) {
|
||||
out.streamer.setFavoredNodes(favoredNodes);
|
||||
|
@ -20,6 +20,7 @@
|
||||
import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
|
||||
import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.EnumSet;
|
||||
import java.util.Random;
|
||||
|
||||
@ -30,6 +31,7 @@
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.permission.FsPermission;
|
||||
import org.apache.hadoop.hdfs.AppendTestUtil;
|
||||
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
||||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||
import org.apache.hadoop.io.RandomDatum;
|
||||
@ -51,15 +53,30 @@ private void checkSyncMetric(MiniDFSCluster cluster, long value) {
|
||||
/** Test basic hsync cases */
|
||||
@Test
|
||||
public void testHSync() throws Exception {
|
||||
testHSyncOperation(false);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testHSyncWithAppend() throws Exception {
|
||||
testHSyncOperation(true);
|
||||
}
|
||||
|
||||
private void testHSyncOperation(boolean testWithAppend) throws IOException {
|
||||
Configuration conf = new HdfsConfiguration();
|
||||
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
|
||||
final FileSystem fs = cluster.getFileSystem();
|
||||
final DistributedFileSystem fs = cluster.getFileSystem();
|
||||
|
||||
final Path p = new Path("/testHSync/foo");
|
||||
final int len = 1 << 16;
|
||||
FSDataOutputStream out = fs.create(p, FsPermission.getDefault(),
|
||||
EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE, CreateFlag.SYNC_BLOCK),
|
||||
4096, (short) 1, len, null);
|
||||
if (testWithAppend) {
|
||||
// re-open the file with append call
|
||||
out.close();
|
||||
out = fs.append(p, EnumSet.of(CreateFlag.APPEND, CreateFlag.SYNC_BLOCK),
|
||||
4096, null);
|
||||
}
|
||||
out.hflush();
|
||||
// hflush does not sync
|
||||
checkSyncMetric(cluster, 0);
|
||||
|
Loading…
x
Reference in New Issue
Block a user