From 2a750c9aa68ae4f1b1b3ade082ec9362e1f1c5d2 Mon Sep 17 00:00:00 2001 From: Colin Patrick Mccabe Date: Thu, 26 Mar 2015 13:21:09 -0700 Subject: [PATCH] HDFS-7410. Support CreateFlags with append() to support hsync() for appending streams (Vinayakumar B via Colin P. McCabe) (cherry picked from commit 61df1b27a797efd094328c7d9141b9e157e01bf4) --- hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 +++ .../org/apache/hadoop/hdfs/DFSClient.java | 8 ++++---- .../apache/hadoop/hdfs/DFSOutputStream.java | 13 ++++++++----- .../hdfs/server/datanode/TestHSync.java | 19 ++++++++++++++++++- 4 files changed, 33 insertions(+), 10 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index bd7c63b0680..743a5c2b935 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -497,6 +497,9 @@ Release 2.7.0 - UNRELEASED HDFS-7976. Update NFS user guide for mount option "sync" to minimize or avoid reordered writes. (brandonli) + HDFS-7410. Support CreateFlags with append() to support hsync() for + appending streams (Vinayakumar B via Colin P. McCabe) + OPTIMIZATIONS HDFS-7454. Reduce memory footprint for AclEntries in NameNode. diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java index 5f8fa5cf0b1..f95acaf2fed 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java @@ -1829,10 +1829,10 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, try { LastBlockWithStatus blkWithStatus = namenode.append(src, clientName, new EnumSetWritable<>(flag, CreateFlag.class)); - return DFSOutputStream.newStreamForAppend(this, src, - flag.contains(CreateFlag.NEW_BLOCK), - buffersize, progress, blkWithStatus.getLastBlock(), - blkWithStatus.getFileStatus(), dfsClientConf.createChecksum(), favoredNodes); + return DFSOutputStream.newStreamForAppend(this, src, flag, buffersize, + progress, blkWithStatus.getLastBlock(), + blkWithStatus.getFileStatus(), dfsClientConf.createChecksum(), + favoredNodes); } catch(RemoteException re) { throw re.unwrapRemoteException(AccessControlException.class, FileNotFoundException.class, diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java index 1c1df91ea07..3aec8acb1ff 100755 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java @@ -278,11 +278,14 @@ public class DFSOutputStream extends FSOutputSummer } /** Construct a new output stream for append. */ - private DFSOutputStream(DFSClient dfsClient, String src, boolean toNewBlock, - Progressable progress, LocatedBlock lastBlock, HdfsFileStatus stat, - DataChecksum checksum) throws IOException { + private DFSOutputStream(DFSClient dfsClient, String src, + EnumSet flags, Progressable progress, LocatedBlock lastBlock, + HdfsFileStatus stat, DataChecksum checksum) throws IOException { this(dfsClient, src, progress, stat, checksum); initialFileSize = stat.getLen(); // length of file when opened + this.shouldSyncBlock = flags.contains(CreateFlag.SYNC_BLOCK); + + boolean toNewBlock = flags.contains(CreateFlag.NEW_BLOCK); this.fileEncryptionInfo = stat.getFileEncryptionInfo(); @@ -338,13 +341,13 @@ public class DFSOutputStream extends FSOutputSummer } static DFSOutputStream newStreamForAppend(DFSClient dfsClient, String src, - boolean toNewBlock, int bufferSize, Progressable progress, + EnumSet flags, int bufferSize, Progressable progress, LocatedBlock lastBlock, HdfsFileStatus stat, DataChecksum checksum, String[] favoredNodes) throws IOException { TraceScope scope = dfsClient.getPathTraceScope("newStreamForAppend", src); try { - final DFSOutputStream out = new DFSOutputStream(dfsClient, src, toNewBlock, + final DFSOutputStream out = new DFSOutputStream(dfsClient, src, flags, progress, lastBlock, stat, checksum); if (favoredNodes != null && favoredNodes.length != 0) { out.streamer.setFavoredNodes(favoredNodes); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestHSync.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestHSync.java index b293075ddd3..10f371bc3ec 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestHSync.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestHSync.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.datanode; import static org.apache.hadoop.test.MetricsAsserts.assertCounter; import static org.apache.hadoop.test.MetricsAsserts.getMetrics; +import java.io.IOException; import java.util.EnumSet; import java.util.Random; @@ -30,6 +31,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hdfs.AppendTestUtil; +import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.io.RandomDatum; @@ -51,15 +53,30 @@ public class TestHSync { /** Test basic hsync cases */ @Test public void testHSync() throws Exception { + testHSyncOperation(false); + } + + @Test + public void testHSyncWithAppend() throws Exception { + testHSyncOperation(true); + } + + private void testHSyncOperation(boolean testWithAppend) throws IOException { Configuration conf = new HdfsConfiguration(); MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build(); - final FileSystem fs = cluster.getFileSystem(); + final DistributedFileSystem fs = cluster.getFileSystem(); final Path p = new Path("/testHSync/foo"); final int len = 1 << 16; FSDataOutputStream out = fs.create(p, FsPermission.getDefault(), EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE, CreateFlag.SYNC_BLOCK), 4096, (short) 1, len, null); + if (testWithAppend) { + // re-open the file with append call + out.close(); + out = fs.append(p, EnumSet.of(CreateFlag.APPEND, CreateFlag.SYNC_BLOCK), + 4096, null); + } out.hflush(); // hflush does not sync checkSyncMetric(cluster, 0);