HDFS-7410. Support CreateFlags with append() to support hsync() for appending streams (Vinayakumar B via Colin P. McCabe)
This commit is contained in:
parent
87130bf6b2
commit
61df1b27a7
|
@ -806,6 +806,9 @@ Release 2.7.0 - UNRELEASED
|
||||||
HDFS-7976. Update NFS user guide for mount option "sync" to minimize or
|
HDFS-7976. Update NFS user guide for mount option "sync" to minimize or
|
||||||
avoid reordered writes. (brandonli)
|
avoid reordered writes. (brandonli)
|
||||||
|
|
||||||
|
HDFS-7410. Support CreateFlags with append() to support hsync() for
|
||||||
|
appending streams (Vinayakumar B via Colin P. McCabe)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
HDFS-7454. Reduce memory footprint for AclEntries in NameNode.
|
HDFS-7454. Reduce memory footprint for AclEntries in NameNode.
|
||||||
|
|
|
@ -1827,10 +1827,10 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
|
||||||
try {
|
try {
|
||||||
LastBlockWithStatus blkWithStatus = namenode.append(src, clientName,
|
LastBlockWithStatus blkWithStatus = namenode.append(src, clientName,
|
||||||
new EnumSetWritable<>(flag, CreateFlag.class));
|
new EnumSetWritable<>(flag, CreateFlag.class));
|
||||||
return DFSOutputStream.newStreamForAppend(this, src,
|
return DFSOutputStream.newStreamForAppend(this, src, flag, buffersize,
|
||||||
flag.contains(CreateFlag.NEW_BLOCK),
|
progress, blkWithStatus.getLastBlock(),
|
||||||
buffersize, progress, blkWithStatus.getLastBlock(),
|
blkWithStatus.getFileStatus(), dfsClientConf.createChecksum(),
|
||||||
blkWithStatus.getFileStatus(), dfsClientConf.createChecksum(), favoredNodes);
|
favoredNodes);
|
||||||
} catch(RemoteException re) {
|
} catch(RemoteException re) {
|
||||||
throw re.unwrapRemoteException(AccessControlException.class,
|
throw re.unwrapRemoteException(AccessControlException.class,
|
||||||
FileNotFoundException.class,
|
FileNotFoundException.class,
|
||||||
|
|
|
@ -278,11 +278,14 @@ public class DFSOutputStream extends FSOutputSummer
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Construct a new output stream for append. */
|
/** Construct a new output stream for append. */
|
||||||
private DFSOutputStream(DFSClient dfsClient, String src, boolean toNewBlock,
|
private DFSOutputStream(DFSClient dfsClient, String src,
|
||||||
Progressable progress, LocatedBlock lastBlock, HdfsFileStatus stat,
|
EnumSet<CreateFlag> flags, Progressable progress, LocatedBlock lastBlock,
|
||||||
DataChecksum checksum) throws IOException {
|
HdfsFileStatus stat, DataChecksum checksum) throws IOException {
|
||||||
this(dfsClient, src, progress, stat, checksum);
|
this(dfsClient, src, progress, stat, checksum);
|
||||||
initialFileSize = stat.getLen(); // length of file when opened
|
initialFileSize = stat.getLen(); // length of file when opened
|
||||||
|
this.shouldSyncBlock = flags.contains(CreateFlag.SYNC_BLOCK);
|
||||||
|
|
||||||
|
boolean toNewBlock = flags.contains(CreateFlag.NEW_BLOCK);
|
||||||
|
|
||||||
this.fileEncryptionInfo = stat.getFileEncryptionInfo();
|
this.fileEncryptionInfo = stat.getFileEncryptionInfo();
|
||||||
|
|
||||||
|
@ -338,13 +341,13 @@ public class DFSOutputStream extends FSOutputSummer
|
||||||
}
|
}
|
||||||
|
|
||||||
static DFSOutputStream newStreamForAppend(DFSClient dfsClient, String src,
|
static DFSOutputStream newStreamForAppend(DFSClient dfsClient, String src,
|
||||||
boolean toNewBlock, int bufferSize, Progressable progress,
|
EnumSet<CreateFlag> flags, int bufferSize, Progressable progress,
|
||||||
LocatedBlock lastBlock, HdfsFileStatus stat, DataChecksum checksum,
|
LocatedBlock lastBlock, HdfsFileStatus stat, DataChecksum checksum,
|
||||||
String[] favoredNodes) throws IOException {
|
String[] favoredNodes) throws IOException {
|
||||||
TraceScope scope =
|
TraceScope scope =
|
||||||
dfsClient.getPathTraceScope("newStreamForAppend", src);
|
dfsClient.getPathTraceScope("newStreamForAppend", src);
|
||||||
try {
|
try {
|
||||||
final DFSOutputStream out = new DFSOutputStream(dfsClient, src, toNewBlock,
|
final DFSOutputStream out = new DFSOutputStream(dfsClient, src, flags,
|
||||||
progress, lastBlock, stat, checksum);
|
progress, lastBlock, stat, checksum);
|
||||||
if (favoredNodes != null && favoredNodes.length != 0) {
|
if (favoredNodes != null && favoredNodes.length != 0) {
|
||||||
out.streamer.setFavoredNodes(favoredNodes);
|
out.streamer.setFavoredNodes(favoredNodes);
|
||||||
|
|
|
@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.datanode;
|
||||||
import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
|
import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
|
||||||
import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
|
import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
import java.util.EnumSet;
|
import java.util.EnumSet;
|
||||||
import java.util.Random;
|
import java.util.Random;
|
||||||
|
|
||||||
|
@ -30,6 +31,7 @@ import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.fs.permission.FsPermission;
|
import org.apache.hadoop.fs.permission.FsPermission;
|
||||||
import org.apache.hadoop.hdfs.AppendTestUtil;
|
import org.apache.hadoop.hdfs.AppendTestUtil;
|
||||||
|
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
||||||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||||
import org.apache.hadoop.io.RandomDatum;
|
import org.apache.hadoop.io.RandomDatum;
|
||||||
|
@ -51,15 +53,30 @@ public class TestHSync {
|
||||||
/** Test basic hsync cases */
|
/** Test basic hsync cases */
|
||||||
@Test
|
@Test
|
||||||
public void testHSync() throws Exception {
|
public void testHSync() throws Exception {
|
||||||
|
testHSyncOperation(false);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testHSyncWithAppend() throws Exception {
|
||||||
|
testHSyncOperation(true);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void testHSyncOperation(boolean testWithAppend) throws IOException {
|
||||||
Configuration conf = new HdfsConfiguration();
|
Configuration conf = new HdfsConfiguration();
|
||||||
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
|
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
|
||||||
final FileSystem fs = cluster.getFileSystem();
|
final DistributedFileSystem fs = cluster.getFileSystem();
|
||||||
|
|
||||||
final Path p = new Path("/testHSync/foo");
|
final Path p = new Path("/testHSync/foo");
|
||||||
final int len = 1 << 16;
|
final int len = 1 << 16;
|
||||||
FSDataOutputStream out = fs.create(p, FsPermission.getDefault(),
|
FSDataOutputStream out = fs.create(p, FsPermission.getDefault(),
|
||||||
EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE, CreateFlag.SYNC_BLOCK),
|
EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE, CreateFlag.SYNC_BLOCK),
|
||||||
4096, (short) 1, len, null);
|
4096, (short) 1, len, null);
|
||||||
|
if (testWithAppend) {
|
||||||
|
// re-open the file with append call
|
||||||
|
out.close();
|
||||||
|
out = fs.append(p, EnumSet.of(CreateFlag.APPEND, CreateFlag.SYNC_BLOCK),
|
||||||
|
4096, null);
|
||||||
|
}
|
||||||
out.hflush();
|
out.hflush();
|
||||||
// hflush does not sync
|
// hflush does not sync
|
||||||
checkSyncMetric(cluster, 0);
|
checkSyncMetric(cluster, 0);
|
||||||
|
|
Loading…
Reference in New Issue