diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 948f7defe06..b2766bd3b0c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -558,6 +558,9 @@ Release 2.0.3-alpha - Unreleased HDFS-1331. dfs -test should work like /bin/test (Andy Isaacson via daryn) + HDFS-3979. For hsync, datanode should wait for the local sync to complete + before sending ack. (Lars Hofhansl via szetszwo) + Release 2.0.2-alpha - 2012-09-07 INCOMPATIBLE CHANGES diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java index 0f1ccb94351..6995fd2d4ed 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java @@ -319,9 +319,6 @@ class BlockReceiver implements Closeable { * @throws IOException */ void flushOrSync(boolean isSync) throws IOException { - if (isSync && (out != null || checksumOut != null)) { - datanode.metrics.incrFsyncCount(); - } long flushTotalNanos = 0; if (checksumOut != null) { long flushStartNanos = System.nanoTime(); @@ -347,6 +344,9 @@ class BlockReceiver implements Closeable { } if (checksumOut != null || out != null) { datanode.metrics.addFlushNanos(flushTotalNanos); + if (isSync) { + datanode.metrics.incrFsyncCount(); + } } } @@ -438,8 +438,10 @@ class BlockReceiver implements Closeable { int len = header.getDataLen(); boolean syncBlock = header.getSyncBlock(); - // make sure the block gets sync'ed upon close - this.syncOnClose |= syncBlock && lastPacketInBlock; + // avoid double sync'ing on close + if (syncBlock && lastPacketInBlock) { + this.syncOnClose = false; + } // update received bytes long firstByteInBlock = offsetInBlock; @@ -448,11 +450,11 @@ class BlockReceiver implements Closeable { replicaInfo.setNumBytes(offsetInBlock); } - // put in queue for pending acks - if (responder != null) { - ((PacketResponder)responder.getRunnable()).enqueue(seqno, - lastPacketInBlock, offsetInBlock); - } + // put in queue for pending acks, unless sync was requested + if (responder != null && !syncBlock) { + ((PacketResponder) responder.getRunnable()).enqueue(seqno, + lastPacketInBlock, offsetInBlock); + } //First write the packet to the mirror: if (mirrorOut != null && !mirrorError) { @@ -471,8 +473,8 @@ class BlockReceiver implements Closeable { if(LOG.isDebugEnabled()) { LOG.debug("Receiving an empty packet or the end of the block " + block); } - // flush unless close() would flush anyway - if (syncBlock && !lastPacketInBlock) { + // sync block if requested + if (syncBlock) { flushOrSync(true); } } else { @@ -563,8 +565,8 @@ class BlockReceiver implements Closeable { checksumBuf.arrayOffset() + checksumBuf.position(), checksumLen); } - /// flush entire packet, sync unless close() will sync - flushOrSync(syncBlock && !lastPacketInBlock); + /// flush entire packet, sync if requested + flushOrSync(syncBlock); replicaInfo.setLastChecksumAndDataLen( offsetInBlock, lastChunkChecksum @@ -580,6 +582,13 @@ class BlockReceiver implements Closeable { } } + // if sync was requested, put in queue for pending acks here + // (after the fsync finished) + if (responder != null && syncBlock) { + ((PacketResponder) responder.getRunnable()).enqueue(seqno, + lastPacketInBlock, offsetInBlock); + } + if (throttler != null) { // throttle I/O throttler.throttle(len); }