diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 7ba185c7521..4078bec5607 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -6,6 +6,8 @@ Release 2.0.1-alpha - UNRELEASED NEW FEATURES + HDFS-744. Support hsync in HDFS. (Lars Hofhansl via szetszwo) + IMPROVEMENTS HDFS-3390. DFSAdmin should print full stack traces of errors when DEBUG diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java index e5e79dc7ced..a89e71281c1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java @@ -129,11 +129,13 @@ public class DFSOutputStream extends FSOutputSummer implements Syncable { private long initialFileSize = 0; // at time of file open private Progressable progress; private final short blockReplication; // replication factor of file + private boolean shouldSyncBlock = false; // force blocks to disk upon close private class Packet { long seqno; // sequencenumber of buffer in block long offsetInBlock; // offset in block - boolean lastPacketInBlock; // is this the last packet in block? + private boolean lastPacketInBlock; // is this the last packet in block? + boolean syncBlock; // this packet forces the current block to disk int numChunks; // number of chunks currently in packet int maxChunks; // max chunks in packet @@ -245,7 +247,7 @@ public class DFSOutputStream extends FSOutputSummer implements Syncable { buffer.mark(); PacketHeader header = new PacketHeader( - pktLen, offsetInBlock, seqno, lastPacketInBlock, dataLen); + pktLen, offsetInBlock, seqno, lastPacketInBlock, dataLen, syncBlock); header.putInBuffer(buffer); buffer.reset(); @@ -1249,6 +1251,7 @@ public class DFSOutputStream extends FSOutputSummer implements Syncable { long blockSize, Progressable progress, int buffersize, DataChecksum checksum) throws IOException { this(dfsClient, src, blockSize, progress, checksum, replication); + this.shouldSyncBlock = flag.contains(CreateFlag.SYNC_BLOCK); computePacketChunkSize(dfsClient.getConf().writePacketSize, checksum.getBytesPerChecksum()); @@ -1431,6 +1434,7 @@ public class DFSOutputStream extends FSOutputSummer implements Syncable { currentPacket = new Packet(PacketHeader.PKT_HEADER_LEN, 0, bytesCurBlock); currentPacket.lastPacketInBlock = true; + currentPacket.syncBlock = shouldSyncBlock; waitAndQueueCurrentPacket(); bytesCurBlock = 0; lastFlushOffset = 0; @@ -1456,6 +1460,24 @@ public class DFSOutputStream extends FSOutputSummer implements Syncable { */ @Override public void hflush() throws IOException { + flushOrSync(false); + } + + /** + * The expected semantics are that all data has been flushed out to all replicas + * and all replicas have done the POSIX fsync equivalent, i.e. the OS has + * flushed it to the disk device (but the disk may still have it in its cache). + * + * Note that only the current block is flushed to the disk device. + * To guarantee a durable sync across block boundaries, the stream should + * be created with {@link CreateFlag#SYNC_BLOCK}. 
+ */ + @Override + public void hsync() throws IOException { + flushOrSync(true); + } + + private void flushOrSync(boolean isSync) throws IOException { dfsClient.checkOpen(); isClosed(); try { @@ -1483,7 +1505,13 @@ public class DFSOutputStream extends FSOutputSummer implements Syncable { assert bytesCurBlock > lastFlushOffset; // record the valid offset of this flush lastFlushOffset = bytesCurBlock; - waitAndQueueCurrentPacket(); + if (isSync && currentPacket == null) { + // Nothing to send right now, + // but sync was requested. + // Send an empty packet + currentPacket = new Packet(packetSize, chunksPerPacket, + bytesCurBlock); + } } else { // We already flushed up to this offset. // This means that we haven't written anything since the last flush @@ -1493,8 +1521,21 @@ public class DFSOutputStream extends FSOutputSummer implements Syncable { assert oldCurrentPacket == null : "Empty flush should not occur with a currentPacket"; - // just discard the current packet since it is already been sent. - currentPacket = null; + if (isSync && bytesCurBlock > 0) { + // Nothing to send right now, + // and the block was partially written, + // and sync was requested. + // So send an empty sync packet. + currentPacket = new Packet(packetSize, chunksPerPacket, + bytesCurBlock); + } else { + // just discard the current packet since it has already been sent. + currentPacket = null; + } + } + if (currentPacket != null) { + currentPacket.syncBlock = isSync; + waitAndQueueCurrentPacket(); } // Restore state of stream. Record the last flush offset // of the last full chunk that was flushed. @@ -1545,18 +1586,6 @@ public class DFSOutputStream extends FSOutputSummer implements Syncable { } } - /** - * The expected semantics is all data have flushed out to all replicas - * and all replicas have done posix fsync equivalent - ie the OS has - * flushed it to the disk device (but the disk may have it in its cache). - * - * Right now by default it is implemented as hflush - */ - @Override - public synchronized void hsync() throws IOException { - hflush(); - } - /** * @deprecated use {@link HdfsDataOutputStream#getCurrentBlockReplication()}. */ @@ -1681,6 +1710,7 @@ public class DFSOutputStream extends FSOutputSummer implements Syncable { currentPacket = new Packet(PacketHeader.PKT_HEADER_LEN, 0, bytesCurBlock); currentPacket.lastPacketInBlock = true; + currentPacket.syncBlock = shouldSyncBlock; } flushInternal(); // flush all data to Datanodes diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java index 43e2b8c7b64..c302d8b888b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java @@ -223,12 +223,19 @@ public class DistributedFileSystem extends FileSystem { @Override public HdfsDataOutputStream create(Path f, FsPermission permission, - boolean overwrite, int bufferSize, short replication, long blockSize, + boolean overwrite, int bufferSize, short replication, long blockSize, + Progressable progress) throws IOException { + return create(f, permission, + overwrite ? 
EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE) + : EnumSet.of(CreateFlag.CREATE), bufferSize, replication, + blockSize, progress); + } + + @Override + public HdfsDataOutputStream create(Path f, FsPermission permission, + EnumSet<CreateFlag> cflags, int bufferSize, short replication, long blockSize, Progressable progress) throws IOException { statistics.incrementWriteOps(1); - final EnumSet<CreateFlag> cflags = overwrite? - EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE) - : EnumSet.of(CreateFlag.CREATE); final DFSOutputStream out = dfs.create(getPathName(f), permission, cflags, replication, blockSize, progress, bufferSize); return new HdfsDataOutputStream(out, statistics); @@ -249,6 +256,7 @@ public class DistributedFileSystem extends FileSystem { /** * Same as create(), except fails if parent directory doesn't already exist. */ + @Override public HdfsDataOutputStream createNonRecursive(Path f, FsPermission permission, EnumSet<CreateFlag> flag, int bufferSize, short replication, long blockSize, Progressable progress) throws IOException { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketHeader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketHeader.java index d8b9f2b6206..083e2b0b91a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketHeader.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketHeader.java @@ -40,6 +40,7 @@ public class PacketHeader { .setSeqno(0) .setLastPacketInBlock(false) .setDataLen(0) + .setSyncBlock(false) .build().getSerializedSize(); public static final int PKT_HEADER_LEN = 6 + PROTO_SIZE; @@ -51,13 +52,14 @@ public class PacketHeader { } public PacketHeader(int packetLen, long offsetInBlock, long seqno, - boolean lastPacketInBlock, int dataLen) { + boolean lastPacketInBlock, int dataLen, boolean syncBlock) { this.packetLen = packetLen; proto = PacketHeaderProto.newBuilder() .setOffsetInBlock(offsetInBlock) .setSeqno(seqno) .setLastPacketInBlock(lastPacketInBlock) .setDataLen(dataLen) + .setSyncBlock(syncBlock) .build(); } @@ -81,6 +83,10 @@ public class PacketHeader { return packetLen; } + public boolean getSyncBlock() { + return proto.getSyncBlock(); + } + @Override public String toString() { return "PacketHeader with packetLen=" + packetLen + diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java index 72591e018ee..f0f7c780a71 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java @@ -42,6 +42,7 @@ import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage; import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader; import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status; +import org.apache.hadoop.hdfs.server.common.Util; import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams; import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; @@ -110,6 +111,8 @@ class BlockReceiver implements Closeable { private final 
BlockConstructionStage stage; private final boolean isTransfer; + private boolean syncOnClose; + BlockReceiver(final ExtendedBlock block, final DataInputStream in, final String inAddr, final String myAddr, final BlockConstructionStage stage, @@ -245,14 +248,18 @@ class BlockReceiver implements Closeable { * close files. */ public void close() throws IOException { - IOException ioe = null; + if (syncOnClose && (out != null || checksumOut != null)) { + datanode.metrics.incrFsyncCount(); + } // close checksum file try { if (checksumOut != null) { checksumOut.flush(); - if (datanode.getDnConf().syncOnClose && (cout instanceof FileOutputStream)) { + if (syncOnClose && (cout instanceof FileOutputStream)) { + long start = Util.now(); ((FileOutputStream)cout).getChannel().force(true); + datanode.metrics.addFsync(Util.now() - start); } checksumOut.close(); checksumOut = null; @@ -267,8 +274,10 @@ class BlockReceiver implements Closeable { try { if (out != null) { out.flush(); - if (datanode.getDnConf().syncOnClose && (out instanceof FileOutputStream)) { + if (syncOnClose && (out instanceof FileOutputStream)) { + long start = Util.now(); ((FileOutputStream)out).getChannel().force(true); + datanode.metrics.addFsync(Util.now() - start); } out.close(); out = null; @@ -290,12 +299,25 @@ class BlockReceiver implements Closeable { * Flush block data and metadata files to disk. * @throws IOException */ - void flush() throws IOException { + void flushOrSync(boolean isSync) throws IOException { + if (isSync && (out != null || checksumOut != null)) { + datanode.metrics.incrFsyncCount(); + } if (checksumOut != null) { checksumOut.flush(); + if (isSync && (cout instanceof FileOutputStream)) { + long start = Util.now(); + ((FileOutputStream)cout).getChannel().force(true); + datanode.metrics.addFsync(Util.now() - start); + } } if (out != null) { out.flush(); + if (isSync && (out instanceof FileOutputStream)) { + long start = Util.now(); + ((FileOutputStream)out).getChannel().force(true); + datanode.metrics.addFsync(Util.now() - start); + } } } @@ -533,7 +555,9 @@ class BlockReceiver implements Closeable { header.getOffsetInBlock(), header.getSeqno(), header.isLastPacketInBlock(), - header.getDataLen(), endOfHeader); + header.getDataLen(), + header.getSyncBlock(), + endOfHeader); } /** @@ -549,15 +573,19 @@ class BlockReceiver implements Closeable { * returns the number of data bytes that the packet has. 
*/ private int receivePacket(long offsetInBlock, long seqno, - boolean lastPacketInBlock, int len, int endOfHeader) throws IOException { + boolean lastPacketInBlock, int len, boolean syncBlock, + int endOfHeader) throws IOException { if (LOG.isDebugEnabled()){ LOG.debug("Receiving one packet for block " + block + " of length " + len + " seqno " + seqno + " offsetInBlock " + offsetInBlock + + " syncBlock " + syncBlock + " lastPacketInBlock " + lastPacketInBlock); } - + // make sure the block gets sync'ed upon close + this.syncOnClose |= syncBlock && lastPacketInBlock; + // update received bytes long firstByteInBlock = offsetInBlock; offsetInBlock += len; @@ -587,6 +615,10 @@ if(LOG.isDebugEnabled()) { LOG.debug("Receiving an empty packet or the end of the block " + block); } + // sync now if requested, unless close() will sync anyway (last packet) + if (syncBlock && !lastPacketInBlock) { + flushOrSync(true); + } } else { int checksumLen = ((len + bytesPerChecksum - 1)/bytesPerChecksum)* checksumSize; @@ -677,8 +709,8 @@ ); checksumOut.write(pktBuf, checksumOff, checksumLen); } - /// flush entire packet - flush(); + /// flush entire packet, sync unless close() will sync + flushOrSync(syncBlock && !lastPacketInBlock); replicaInfo.setLastChecksumAndDataLen( offsetInBlock, lastChunkChecksum ); @@ -730,6 +762,7 @@ String mirrAddr, DataTransferThrottler throttlerArg, DatanodeInfo[] downstreams) throws IOException { + syncOnClose = datanode.getDnConf().syncOnClose; boolean responderClosed = false; mirrorOut = mirrOut; mirrorAddr = mirrAddr; @@ -768,7 +801,7 @@ datanode.data.convertTemporaryToRbw(block); } else { // for isDatnode or TRANSFER_FINALIZED - // Finalize the block. Does this fsync()? + // Finalize the block. 
datanode.data.finalizeBlock(block); } datanode.metrics.incrBlocksWritten(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java index 6a830dbbf98..12ee56ece7e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java @@ -701,8 +701,9 @@ class BlockSender implements java.io.Closeable { */ private void writePacketHeader(ByteBuffer pkt, int dataLen, int packetLen) { pkt.clear(); + // syncBlock is always false here: the read path never requests a sync PacketHeader header = new PacketHeader(packetLen, offset, seqno, - (dataLen == 0), dataLen); + (dataLen == 0), dataLen, false); header.putInBuffer(pkt); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java index 9e180078101..a849cdad2cf 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java @@ -61,6 +61,8 @@ public class DataNodeMetrics { @Metric MutableCounterLong writesFromLocalClient; @Metric MutableCounterLong writesFromRemoteClient; @Metric MutableCounterLong blocksGetLocalPathInfo; + + @Metric MutableCounterLong fsyncCount; @Metric MutableCounterLong volumeFailures; @@ -72,6 +74,8 @@ @Metric MutableRate heartbeats; @Metric MutableRate blockReports; + @Metric MutableRate fsync; + final MetricsRegistry registry = new MetricsRegistry("datanode"); final String name; @@ -151,6 +155,14 @@ blocksRead.incr(); } + public void incrFsyncCount() { + fsyncCount.incr(); + } + + public void addFsync(long latency) { + fsync.add(latency); + } + public void shutdown() { DefaultMetricsSystem.shutdown(); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto index 316c05cea98..d64f78051ee 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto @@ -113,6 +113,7 @@ message PacketHeaderProto { required sfixed64 seqno = 2; required bool lastPacketInBlock = 3; required sfixed32 dataLen = 4; + optional bool syncBlock = 5 [default = false]; } enum Status { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/AppendTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/AppendTestUtil.java index 07cb4564919..a08c7b57998 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/AppendTestUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/AppendTestUtil.java @@ -139,7 +139,7 @@ public class AppendTestUtil { /** * create a buffer that contains the entire test file data. 
*/ - static byte[] initBuffer(int size) { + public static byte[] initBuffer(int size) { if (seed == -1) seed = nextLong(); return randomBytes(seed, size); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java index 3ef892b4f19..aed15d819d4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java @@ -159,7 +159,8 @@ public class TestDataTransferProtocol extends TestCase { block.getNumBytes(), // OffsetInBlock 100, // sequencenumber true, // lastPacketInBlock - 0); // chunk length + 0, // chunk length + false); // sync block hdr.write(sendOut); sendOut.writeInt(0); // zero checksum @@ -402,7 +403,8 @@ public class TestDataTransferProtocol extends TestCase { 0, // offset in block, 100, // seqno false, // last packet - -1 - random.nextInt(oneMil)); // bad datalen + -1 - random.nextInt(oneMil), // bad datalen + false); hdr.write(sendOut); sendResponse(Status.SUCCESS, "", null, recvOut); @@ -424,7 +426,8 @@ public class TestDataTransferProtocol extends TestCase { 0, // OffsetInBlock 100, // sequencenumber true, // lastPacketInBlock - 0); // chunk length + 0, // chunk length + false); hdr.write(sendOut); sendOut.writeInt(0); // zero checksum sendOut.flush(); @@ -508,8 +511,8 @@ public class TestDataTransferProtocol extends TestCase { 1024, // OffsetInBlock 100, // sequencenumber false, // lastPacketInBlock - 4096); // chunk length - + 4096, // chunk length + false); ByteArrayOutputStream baos = new ByteArrayOutputStream(); hdr.write(new DataOutputStream(baos)); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestHSync.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestHSync.java new file mode 100644 index 00000000000..cf2e4483de1 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestHSync.java @@ -0,0 +1,192 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
*/ +package org.apache.hadoop.hdfs.server.datanode; + +import static org.apache.hadoop.test.MetricsAsserts.*; + +import java.util.EnumSet; +import java.util.Random; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CreateFlag; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.hdfs.AppendTestUtil; +import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.io.RandomDatum; +import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.io.SequenceFile.CompressionType; +import org.apache.hadoop.io.SequenceFile.Writer; +import org.apache.hadoop.io.compress.DefaultCodec; +import org.junit.Test; + +public class TestHSync { + + private void checkSyncMetric(MiniDFSCluster cluster, int dn, long value) { + DataNode datanode = cluster.getDataNodes().get(dn); + assertCounter("FsyncCount", value, getMetrics(datanode.getMetrics().name())); + } + private void checkSyncMetric(MiniDFSCluster cluster, long value) { + checkSyncMetric(cluster, 0, value); + } + /** Test basic hsync cases */ + @Test + public void testHSync() throws Exception { + Configuration conf = new HdfsConfiguration(); + MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build(); + final FileSystem fs = cluster.getFileSystem(); + + final Path p = new Path("/testHSync/foo"); + final int len = 1 << 16; + FSDataOutputStream out = fs.create(p, FsPermission.getDefault(), + EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE, CreateFlag.SYNC_BLOCK), + 4096, (short) 1, len, null); + out.hflush(); + // hflush does not sync + checkSyncMetric(cluster, 0); + out.hsync(); + // hsync on an empty file does nothing + checkSyncMetric(cluster, 0); + out.write(1); + checkSyncMetric(cluster, 0); + out.hsync(); + checkSyncMetric(cluster, 1); + // avoiding repeated hsyncs is a potential future optimization + out.hsync(); + checkSyncMetric(cluster, 2); + out.hflush(); + // hflush still does not sync + checkSyncMetric(cluster, 2); + out.close(); + // close syncs, since the file was created with SYNC_BLOCK + checkSyncMetric(cluster, 3); + + // same with a file created without SYNC_BLOCK + out = fs.create(p, FsPermission.getDefault(), + EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE), + 4096, (short) 1, len, null); + out.hsync(); + checkSyncMetric(cluster, 3); + out.write(1); + checkSyncMetric(cluster, 3); + out.hsync(); + checkSyncMetric(cluster, 4); + // repeated hsyncs + out.hsync(); + checkSyncMetric(cluster, 5); + out.close(); + // close does not sync (not opened with SYNC_BLOCK) + checkSyncMetric(cluster, 5); + cluster.shutdown(); + } + + /** Test hsync on an exact block boundary */ + @Test + public void testHSyncBlockBoundary() throws Exception { + Configuration conf = new HdfsConfiguration(); + MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build(); + final FileSystem fs = cluster.getFileSystem(); + + final Path p = new Path("/testHSyncBlockBoundary/foo"); + final int len = 1 << 16; + final byte[] fileContents = AppendTestUtil.initBuffer(len); + FSDataOutputStream out = fs.create(p, FsPermission.getDefault(), + EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE, CreateFlag.SYNC_BLOCK), + 4096, (short) 1, len, null); + // fill exactly one block (tests the SYNC_BLOCK case) and flush + out.write(fileContents, 0, len); + out.hflush(); + // the full block should have caused a sync + checkSyncMetric(cluster, 1); + out.hsync(); + // 
hsync at the block boundary has nothing to sync: the finished block was already synced + checkSyncMetric(cluster, 1); + // write one more byte and sync again + out.write(1); + out.hsync(); + checkSyncMetric(cluster, 2); + out.close(); + checkSyncMetric(cluster, 3); + cluster.shutdown(); + } + + /** Test hsync via SequenceFiles */ + @Test + public void testSequenceFileSync() throws Exception { + Configuration conf = new HdfsConfiguration(); + MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build(); + + final FileSystem fs = cluster.getFileSystem(); + final Path p = new Path("/testSequenceFileSync/foo"); + final int len = 1 << 16; + FSDataOutputStream out = fs.create(p, FsPermission.getDefault(), + EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE, CreateFlag.SYNC_BLOCK), + 4096, (short) 1, len, null); + Writer w = SequenceFile.createWriter(new Configuration(), + Writer.stream(out), + Writer.keyClass(RandomDatum.class), + Writer.valueClass(RandomDatum.class), + Writer.compression(CompressionType.NONE, new DefaultCodec())); + w.hflush(); + checkSyncMetric(cluster, 0); + w.hsync(); + checkSyncMetric(cluster, 1); + int seed = new Random().nextInt(); + RandomDatum.Generator generator = new RandomDatum.Generator(seed); + generator.next(); + w.append(generator.getKey(), generator.getValue()); + w.hsync(); + checkSyncMetric(cluster, 2); + w.close(); + checkSyncMetric(cluster, 2); + out.close(); + checkSyncMetric(cluster, 3); + cluster.shutdown(); + } + + /** Test that syncBlock is correctly performed at replicas */ + @Test + public void testHSyncWithReplication() throws Exception { + Configuration conf = new HdfsConfiguration(); + MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build(); + final FileSystem fs = cluster.getFileSystem(); + + final Path p = new Path("/testHSyncWithReplication/foo"); + final int len = 1 << 16; + FSDataOutputStream out = fs.create(p, FsPermission.getDefault(), + EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE, CreateFlag.SYNC_BLOCK), + 4096, (short) 3, len, null); + out.write(1); + out.hflush(); + checkSyncMetric(cluster, 0, 0); + checkSyncMetric(cluster, 1, 0); + checkSyncMetric(cluster, 2, 0); + out.hsync(); + checkSyncMetric(cluster, 0, 1); + checkSyncMetric(cluster, 1, 1); + checkSyncMetric(cluster, 2, 1); + out.hsync(); + checkSyncMetric(cluster, 0, 2); + checkSyncMetric(cluster, 1, 2); + checkSyncMetric(cluster, 2, 2); + cluster.shutdown(); + } +}
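
Usage note: the sketch below shows the client-side semantics this patch enables, for readers who want the end-to-end picture outside the test code. It is not part of the patch; HSyncExample, the path /tmp/hsync-example, and the buffer/replication/block-size values are illustrative only, and a reachable HDFS cluster on the default Configuration is assumed.

import java.util.EnumSet;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;

public class HSyncExample {
  public static void main(String[] args) throws Exception {
    FileSystem fs = FileSystem.get(new Configuration());
    Path p = new Path("/tmp/hsync-example");  // example path, not from the patch

    // SYNC_BLOCK asks each datanode to fsync a block when it completes (and on
    // close()); without it, only explicit hsync() calls force the current
    // block to the disk device.
    FSDataOutputStream out = fs.create(p, FsPermission.getDefault(),
        EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE,
            CreateFlag.SYNC_BLOCK),
        4096, (short) 3, 64L * 1024 * 1024, null);
    try {
      out.write("important record\n".getBytes("UTF-8"));
      out.hflush();  // visible to new readers, but not necessarily on disk
      out.hsync();   // current block fsync'ed on all replicas
    } finally {
      out.close();   // with SYNC_BLOCK set, the last block is synced here too
    }
  }
}

Note that hsync() only covers the current block; as the new javadoc above says, durability across block boundaries requires creating the stream with SYNC_BLOCK, which is why the sketch sets both.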