diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 6dbd6665d7c..b2766bd3b0c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -368,6 +368,8 @@ Release 2.0.3-alpha - Unreleased HDFS-4059. Add number of stale DataNodes to metrics. (Jing Zhao via suresh) + HDFS-4155. libhdfs implementation of hsync API (Liang Xie via todd) + IMPROVEMENTS HDFS-3925. Prettify PipelineAck#toString() for printing to a log @@ -556,6 +558,9 @@ Release 2.0.3-alpha - Unreleased HDFS-1331. dfs -test should work like /bin/test (Andy Isaacson via daryn) + HDFS-3979. For hsync, datanode should wait for the local sync to complete + before sending ack. (Lars Hofhansl via szetszwo) + Release 2.0.2-alpha - 2012-09-07 INCOMPATIBLE CHANGES diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java index 0f1ccb94351..6995fd2d4ed 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java @@ -319,9 +319,6 @@ class BlockReceiver implements Closeable { * @throws IOException */ void flushOrSync(boolean isSync) throws IOException { - if (isSync && (out != null || checksumOut != null)) { - datanode.metrics.incrFsyncCount(); - } long flushTotalNanos = 0; if (checksumOut != null) { long flushStartNanos = System.nanoTime(); @@ -347,6 +344,9 @@ class BlockReceiver implements Closeable { } if (checksumOut != null || out != null) { datanode.metrics.addFlushNanos(flushTotalNanos); + if (isSync) { + datanode.metrics.incrFsyncCount(); + } } } @@ -438,8 +438,10 @@ class BlockReceiver implements Closeable { int len = header.getDataLen(); boolean syncBlock = header.getSyncBlock(); 
- // make sure the block gets sync'ed upon close - this.syncOnClose |= syncBlock && lastPacketInBlock; + // avoid double sync'ing on close + if (syncBlock && lastPacketInBlock) { + this.syncOnClose = false; + } // update received bytes long firstByteInBlock = offsetInBlock; @@ -448,11 +450,11 @@ class BlockReceiver implements Closeable { replicaInfo.setNumBytes(offsetInBlock); } - // put in queue for pending acks - if (responder != null) { - ((PacketResponder)responder.getRunnable()).enqueue(seqno, - lastPacketInBlock, offsetInBlock); - } + // put in queue for pending acks, unless sync was requested + if (responder != null && !syncBlock) { + ((PacketResponder) responder.getRunnable()).enqueue(seqno, + lastPacketInBlock, offsetInBlock); + } //First write the packet to the mirror: if (mirrorOut != null && !mirrorError) { @@ -471,8 +473,8 @@ class BlockReceiver implements Closeable { if(LOG.isDebugEnabled()) { LOG.debug("Receiving an empty packet or the end of the block " + block); } - // flush unless close() would flush anyway - if (syncBlock && !lastPacketInBlock) { + // sync block if requested + if (syncBlock) { flushOrSync(true); } } else { @@ -563,8 +565,8 @@ class BlockReceiver implements Closeable { checksumBuf.arrayOffset() + checksumBuf.position(), checksumLen); } - /// flush entire packet, sync unless close() will sync - flushOrSync(syncBlock && !lastPacketInBlock); + /// flush entire packet, sync if requested + flushOrSync(syncBlock); replicaInfo.setLastChecksumAndDataLen( offsetInBlock, lastChunkChecksum @@ -580,6 +582,13 @@ class BlockReceiver implements Closeable { } } + // if sync was requested, put in queue for pending acks here + // (after the fsync finished) + if (responder != null && syncBlock) { + ((PacketResponder) responder.getRunnable()).enqueue(seqno, + lastPacketInBlock, offsetInBlock); + } + if (throttler != null) { // throttle I/O throttler.throttle(len); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/hdfs.c 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/hdfs.c index a180dd24c73..ba980a7a539 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/hdfs.c +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/hdfs.c @@ -1388,6 +1388,32 @@ int hdfsHFlush(hdfsFS fs, hdfsFile f) return 0; } +int hdfsHSync(hdfsFS fs, hdfsFile f) +{ + //Get the JNIEnv* corresponding to current thread + JNIEnv* env = getJNIEnv(); + if (env == NULL) { + errno = EINTERNAL; + return -1; + } + + //Sanity check + if (!f || f->type != OUTPUT) { + errno = EBADF; + return -1; + } + + jobject jOutputStream = f->file; + jthrowable jthr = invokeMethod(env, NULL, INSTANCE, jOutputStream, + HADOOP_OSTRM, "hsync", "()V"); + if (jthr) { + errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, + "hdfsHSync: FSDataOutputStream#hsync"); + return -1; + } + return 0; +} + int hdfsAvailable(hdfsFS fs, hdfsFile f) { // JAVA EQUIVALENT diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/hdfs.h b/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/hdfs.h index fa71c8384c8..7973e0a5e3f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/hdfs.h +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/hdfs.h @@ -393,6 +393,17 @@ extern "C" { int hdfsHFlush(hdfsFS fs, hdfsFile file); + /** + * hdfsHSync - Similar to posix fsync, flush out the data in client's + * user buffer, all the way to the disk device (but the disk may have + * it in its cache). + * @param fs configured filesystem handle + * @param file file handle + * @return 0 on success, -1 on error and sets errno + */ + int hdfsHSync(hdfsFS fs, hdfsFile file); + + /** + * hdfsAvailable - Number of bytes that can be read from this + * input stream without blocking. 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/test_libhdfs_threaded.c b/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/test_libhdfs_threaded.c index d9cb0d9648d..c56c89300f7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/test_libhdfs_threaded.c +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/test_libhdfs_threaded.c @@ -150,6 +150,7 @@ static int doTestHdfsOperations(struct tlhThreadInfo *ti, hdfsFS fs) return EIO; } EXPECT_ZERO(hdfsFlush(fs, file)); + EXPECT_ZERO(hdfsHSync(fs, file)); EXPECT_ZERO(hdfsCloseFile(fs, file)); /* Let's re-open the file for reading */ diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStartSecureDataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStartSecureDataNode.java index d1b2d668ded..ba5587276c4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStartSecureDataNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStartSecureDataNode.java @@ -15,7 +15,7 @@ * the License. */ -package org.apache.hadoop.hdfs.server.namenode; +package org.apache.hadoop.hdfs.server.datanode; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull;