svn merge -c 1344419 from trunk for HDFS-744. Support hsync in HDFS.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1346050 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Tsz-wo Sze 2012-06-04 17:43:51 +00:00
parent 3dc10a25d9
commit 0f1a956cec
11 changed files with 327 additions and 39 deletions

View File

@ -6,6 +6,8 @@ Release 2.0.1-alpha - UNRELEASED
NEW FEATURES
HDFS-744. Support hsync in HDFS. (Lars Hofhansl via szetszwo)
IMPROVEMENTS
HDFS-3390. DFSAdmin should print full stack traces of errors when DEBUG

View File

@ -129,11 +129,13 @@ public class DFSOutputStream extends FSOutputSummer implements Syncable {
private long initialFileSize = 0; // at time of file open
private Progressable progress;
private final short blockReplication; // replication factor of file
private boolean shouldSyncBlock = false; // force blocks to disk upon close
private class Packet {
long seqno; // sequencenumber of buffer in block
long offsetInBlock; // offset in block
boolean lastPacketInBlock; // is this the last packet in block?
private boolean lastPacketInBlock; // is this the last packet in block?
boolean syncBlock; // this packet forces the current block to disk
int numChunks; // number of chunks currently in packet
int maxChunks; // max chunks in packet
@ -245,7 +247,7 @@ public class DFSOutputStream extends FSOutputSummer implements Syncable {
buffer.mark();
PacketHeader header = new PacketHeader(
pktLen, offsetInBlock, seqno, lastPacketInBlock, dataLen);
pktLen, offsetInBlock, seqno, lastPacketInBlock, dataLen, syncBlock);
header.putInBuffer(buffer);
buffer.reset();
@ -1249,6 +1251,7 @@ public class DFSOutputStream extends FSOutputSummer implements Syncable {
long blockSize, Progressable progress, int buffersize,
DataChecksum checksum) throws IOException {
this(dfsClient, src, blockSize, progress, checksum, replication);
this.shouldSyncBlock = flag.contains(CreateFlag.SYNC_BLOCK);
computePacketChunkSize(dfsClient.getConf().writePacketSize,
checksum.getBytesPerChecksum());
@ -1431,6 +1434,7 @@ public class DFSOutputStream extends FSOutputSummer implements Syncable {
currentPacket = new Packet(PacketHeader.PKT_HEADER_LEN, 0,
bytesCurBlock);
currentPacket.lastPacketInBlock = true;
currentPacket.syncBlock = shouldSyncBlock;
waitAndQueueCurrentPacket();
bytesCurBlock = 0;
lastFlushOffset = 0;
@ -1456,6 +1460,24 @@ public class DFSOutputStream extends FSOutputSummer implements Syncable {
*/
@Override
public void hflush() throws IOException {
  // plain flush: delegate to the shared path without requesting a sync
  final boolean syncToDisk = false;
  flushOrSync(syncToDisk);
}
/**
 * Flushes all data out to all replicas and expects every replica to have
 * done the equivalent of a posix fsync, i.e. the OS has flushed the data to
 * the disk device (although the disk may still hold it in its own cache).
 *
 * Only the current block is flushed to the disk device. To get a durable
 * sync that also holds across block boundaries, create the stream with
 * {@link CreateFlag#SYNC_BLOCK}.
 */
@Override
public void hsync() throws IOException {
  // same path as hflush(), but with the sync flag set
  final boolean syncToDisk = true;
  flushOrSync(syncToDisk);
}
private void flushOrSync(boolean isSync) throws IOException {
dfsClient.checkOpen();
isClosed();
try {
@ -1483,7 +1505,13 @@ public class DFSOutputStream extends FSOutputSummer implements Syncable {
assert bytesCurBlock > lastFlushOffset;
// record the valid offset of this flush
lastFlushOffset = bytesCurBlock;
waitAndQueueCurrentPacket();
if (isSync && currentPacket == null) {
// Nothing to send right now,
// but sync was requested.
// Send an empty packet
currentPacket = new Packet(packetSize, chunksPerPacket,
bytesCurBlock);
}
} else {
// We already flushed up to this offset.
// This means that we haven't written anything since the last flush
@ -1493,9 +1521,22 @@ public class DFSOutputStream extends FSOutputSummer implements Syncable {
assert oldCurrentPacket == null :
"Empty flush should not occur with a currentPacket";
if (isSync && bytesCurBlock > 0) {
// Nothing to send right now,
// and the block was partially written,
// and sync was requested.
// So send an empty sync packet.
currentPacket = new Packet(packetSize, chunksPerPacket,
bytesCurBlock);
} else {
// just discard the current packet since it is already been sent.
currentPacket = null;
}
}
if (currentPacket != null) {
currentPacket.syncBlock = isSync;
waitAndQueueCurrentPacket();
}
// Restore state of stream. Record the last flush offset
// of the last full chunk that was flushed.
//
@ -1545,18 +1586,6 @@ public class DFSOutputStream extends FSOutputSummer implements Syncable {
}
}
/**
* The expected semantics is all data have flushed out to all replicas
* and all replicas have done posix fsync equivalent - ie the OS has
* flushed it to the disk device (but the disk may have it in its cache).
*
* Right now by default it is implemented as hflush
*/
@Override
public synchronized void hsync() throws IOException {
hflush();
}
/**
* @deprecated use {@link HdfsDataOutputStream#getCurrentBlockReplication()}.
*/
@ -1681,6 +1710,7 @@ public class DFSOutputStream extends FSOutputSummer implements Syncable {
currentPacket = new Packet(PacketHeader.PKT_HEADER_LEN, 0,
bytesCurBlock);
currentPacket.lastPacketInBlock = true;
currentPacket.syncBlock = shouldSyncBlock;
}
flushInternal(); // flush all data to Datanodes

View File

@ -225,10 +225,17 @@ public class DistributedFileSystem extends FileSystem {
public HdfsDataOutputStream create(Path f, FsPermission permission,
    boolean overwrite, int bufferSize, short replication, long blockSize,
    Progressable progress) throws IOException {
  // Translate the legacy boolean into the equivalent create-flag set and
  // hand off to the EnumSet-based overload.
  final EnumSet<CreateFlag> createFlags;
  if (overwrite) {
    createFlags = EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE);
  } else {
    createFlags = EnumSet.of(CreateFlag.CREATE);
  }
  return create(f, permission, createFlags, bufferSize, replication,
      blockSize, progress);
}
@Override
public HdfsDataOutputStream create(Path f, FsPermission permission,
EnumSet<CreateFlag> cflags, int bufferSize, short replication, long blockSize,
Progressable progress) throws IOException {
statistics.incrementWriteOps(1);
final EnumSet<CreateFlag> cflags = overwrite?
EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE)
: EnumSet.of(CreateFlag.CREATE);
final DFSOutputStream out = dfs.create(getPathName(f), permission, cflags,
replication, blockSize, progress, bufferSize);
return new HdfsDataOutputStream(out, statistics);
@ -249,6 +256,7 @@ public class DistributedFileSystem extends FileSystem {
/**
* Same as create(), except fails if parent directory doesn't already exist.
*/
@Override
public HdfsDataOutputStream createNonRecursive(Path f, FsPermission permission,
EnumSet<CreateFlag> flag, int bufferSize, short replication,
long blockSize, Progressable progress) throws IOException {

View File

@ -40,6 +40,7 @@ public class PacketHeader {
.setSeqno(0)
.setLastPacketInBlock(false)
.setDataLen(0)
.setSyncBlock(false)
.build().getSerializedSize();
public static final int PKT_HEADER_LEN =
6 + PROTO_SIZE;
@ -51,13 +52,14 @@ public class PacketHeader {
}
public PacketHeader(int packetLen, long offsetInBlock, long seqno,
boolean lastPacketInBlock, int dataLen) {
boolean lastPacketInBlock, int dataLen, boolean syncBlock) {
this.packetLen = packetLen;
proto = PacketHeaderProto.newBuilder()
.setOffsetInBlock(offsetInBlock)
.setSeqno(seqno)
.setLastPacketInBlock(lastPacketInBlock)
.setDataLen(dataLen)
.setSyncBlock(syncBlock)
.build();
}
@ -81,6 +83,10 @@ public class PacketHeader {
return packetLen;
}
/** @return the syncBlock flag stored in this header's protobuf message */
public boolean getSyncBlock() {
  final boolean syncBlock = proto.getSyncBlock();
  return syncBlock;
}
@Override
public String toString() {
return "PacketHeader with packetLen=" + packetLen +

View File

@ -42,6 +42,7 @@ import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
import org.apache.hadoop.hdfs.server.common.Util;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@ -110,6 +111,8 @@ class BlockReceiver implements Closeable {
private final BlockConstructionStage stage;
private final boolean isTransfer;
private boolean syncOnClose;
BlockReceiver(final ExtendedBlock block, final DataInputStream in,
final String inAddr, final String myAddr,
final BlockConstructionStage stage,
@ -245,14 +248,18 @@ class BlockReceiver implements Closeable {
* close files.
*/
public void close() throws IOException {
IOException ioe = null;
if (syncOnClose && (out != null || checksumOut != null)) {
datanode.metrics.incrFsyncCount();
}
// close checksum file
try {
if (checksumOut != null) {
checksumOut.flush();
if (datanode.getDnConf().syncOnClose && (cout instanceof FileOutputStream)) {
if (syncOnClose && (cout instanceof FileOutputStream)) {
long start = Util.now();
((FileOutputStream)cout).getChannel().force(true);
datanode.metrics.addFsync(Util.now() - start);
}
checksumOut.close();
checksumOut = null;
@ -267,8 +274,10 @@ class BlockReceiver implements Closeable {
try {
if (out != null) {
out.flush();
if (datanode.getDnConf().syncOnClose && (out instanceof FileOutputStream)) {
if (syncOnClose && (out instanceof FileOutputStream)) {
long start = Util.now();
((FileOutputStream)out).getChannel().force(true);
datanode.metrics.addFsync(Util.now() - start);
}
out.close();
out = null;
@ -290,12 +299,25 @@ class BlockReceiver implements Closeable {
* Flush block data and metadata files to disk.
* @throws IOException
*/
void flush() throws IOException {
void flushOrSync(boolean isSync) throws IOException {
  // A sync request is counted once per call, as long as at least one of the
  // two streams is open.
  final boolean anyStreamOpen = (out != null) || (checksumOut != null);
  if (isSync && anyStreamOpen) {
    datanode.metrics.incrFsyncCount();
  }

  // Checksum stream first: flush, then force the underlying file channel
  // when a sync was requested and the raw stream is file-backed.
  if (checksumOut != null) {
    checksumOut.flush();
    final boolean forceChecksum = isSync && (cout instanceof FileOutputStream);
    if (forceChecksum) {
      final long begin = Util.now();
      ((FileOutputStream) cout).getChannel().force(true);
      datanode.metrics.addFsync(Util.now() - begin);
    }
  }

  // Then the block data stream, handled the same way.
  if (out != null) {
    out.flush();
    final boolean forceData = isSync && (out instanceof FileOutputStream);
    if (forceData) {
      final long begin = Util.now();
      ((FileOutputStream) out).getChannel().force(true);
      datanode.metrics.addFsync(Util.now() - begin);
    }
  }
}
@ -533,7 +555,9 @@ class BlockReceiver implements Closeable {
header.getOffsetInBlock(),
header.getSeqno(),
header.isLastPacketInBlock(),
header.getDataLen(), endOfHeader);
header.getDataLen(),
header.getSyncBlock(),
endOfHeader);
}
/**
@ -549,14 +573,18 @@ class BlockReceiver implements Closeable {
* returns the number of data bytes that the packet has.
*/
private int receivePacket(long offsetInBlock, long seqno,
boolean lastPacketInBlock, int len, int endOfHeader) throws IOException {
boolean lastPacketInBlock, int len, boolean syncBlock,
int endOfHeader) throws IOException {
if (LOG.isDebugEnabled()){
LOG.debug("Receiving one packet for block " + block +
" of length " + len +
" seqno " + seqno +
" offsetInBlock " + offsetInBlock +
" syncBlock " + syncBlock +
" lastPacketInBlock " + lastPacketInBlock);
}
// make sure the block gets sync'ed upon close
this.syncOnClose |= syncBlock && lastPacketInBlock;
// update received bytes
long firstByteInBlock = offsetInBlock;
@ -587,6 +615,10 @@ class BlockReceiver implements Closeable {
if(LOG.isDebugEnabled()) {
LOG.debug("Receiving an empty packet or the end of the block " + block);
}
// flush unless close() would flush anyway
if (syncBlock && !lastPacketInBlock) {
flushOrSync(true);
}
} else {
int checksumLen = ((len + bytesPerChecksum - 1)/bytesPerChecksum)*
checksumSize;
@ -677,8 +709,8 @@ class BlockReceiver implements Closeable {
);
checksumOut.write(pktBuf, checksumOff, checksumLen);
}
/// flush entire packet
flush();
/// flush entire packet, sync unless close() will sync
flushOrSync(syncBlock && !lastPacketInBlock);
replicaInfo.setLastChecksumAndDataLen(
offsetInBlock, lastChunkChecksum
@ -730,6 +762,7 @@ class BlockReceiver implements Closeable {
String mirrAddr, DataTransferThrottler throttlerArg,
DatanodeInfo[] downstreams) throws IOException {
syncOnClose = datanode.getDnConf().syncOnClose;
boolean responderClosed = false;
mirrorOut = mirrOut;
mirrorAddr = mirrAddr;
@ -768,7 +801,7 @@ class BlockReceiver implements Closeable {
datanode.data.convertTemporaryToRbw(block);
} else {
// for isDatnode or TRANSFER_FINALIZED
// Finalize the block. Does this fsync()?
// Finalize the block.
datanode.data.finalizeBlock(block);
}
datanode.metrics.incrBlocksWritten();

View File

@ -701,8 +701,9 @@ class BlockSender implements java.io.Closeable {
*/
// Builds a PacketHeader for the current offset/seqno and writes it at the
// start of pkt (the buffer is cleared first).
private void writePacketHeader(ByteBuffer pkt, int dataLen, int packetLen) {
pkt.clear();
// An empty packet (dataLen == 0) is flagged as the last packet in the block.
// The syncBlock flag is always false for headers built here.
PacketHeader header = new PacketHeader(packetLen, offset, seqno,
(dataLen == 0), dataLen);
(dataLen == 0), dataLen, false);
header.putInBuffer(pkt);
}

View File

@ -62,6 +62,8 @@ public class DataNodeMetrics {
@Metric MutableCounterLong writesFromRemoteClient;
@Metric MutableCounterLong blocksGetLocalPathInfo;
@Metric MutableCounterLong fsyncCount;
@Metric MutableCounterLong volumeFailures;
@Metric MutableRate readBlockOp;
@ -72,6 +74,8 @@ public class DataNodeMetrics {
@Metric MutableRate heartbeats;
@Metric MutableRate blockReports;
@Metric MutableRate fsync;
final MetricsRegistry registry = new MetricsRegistry("datanode");
final String name;
@ -151,6 +155,14 @@ public class DataNodeMetrics {
blocksRead.incr();
}
/** Bumps the {@code fsyncCount} counter. */
public void incrFsyncCount() {
  this.fsyncCount.incr();
}
/**
 * Adds one sample to the {@code fsync} rate metric.
 *
 * @param latency elapsed time of the fsync as measured by the caller
 *                (presumably milliseconds; confirm against call sites)
 */
public void addFsync(long latency) {
  this.fsync.add(latency);
}
/** Shuts down the default metrics system. */
public void shutdown() {
  DefaultMetricsSystem.shutdown();
}

View File

@ -113,6 +113,7 @@ message PacketHeaderProto {
required sfixed64 seqno = 2;
required bool lastPacketInBlock = 3;
required sfixed32 dataLen = 4;
optional bool syncBlock = 5 [default = false];
}
enum Status {

View File

@ -139,7 +139,7 @@ public class AppendTestUtil {
/**
* create a buffer that contains the entire test file data.
*/
static byte[] initBuffer(int size) {
public static byte[] initBuffer(int size) {
if (seed == -1)
seed = nextLong();
return randomBytes(seed, size);

View File

@ -159,7 +159,8 @@ public class TestDataTransferProtocol extends TestCase {
block.getNumBytes(), // OffsetInBlock
100, // sequencenumber
true, // lastPacketInBlock
0); // chunk length
0, // chunk length
false); // sync block
hdr.write(sendOut);
sendOut.writeInt(0); // zero checksum
@ -402,7 +403,8 @@ public class TestDataTransferProtocol extends TestCase {
0, // offset in block,
100, // seqno
false, // last packet
-1 - random.nextInt(oneMil)); // bad datalen
-1 - random.nextInt(oneMil), // bad datalen
false);
hdr.write(sendOut);
sendResponse(Status.SUCCESS, "", null, recvOut);
@ -424,7 +426,8 @@ public class TestDataTransferProtocol extends TestCase {
0, // OffsetInBlock
100, // sequencenumber
true, // lastPacketInBlock
0); // chunk length
0, // chunk length
false);
hdr.write(sendOut);
sendOut.writeInt(0); // zero checksum
sendOut.flush();
@ -508,8 +511,8 @@ public class TestDataTransferProtocol extends TestCase {
1024, // OffsetInBlock
100, // sequencenumber
false, // lastPacketInBlock
4096); // chunk length
4096, // chunk length
false);
ByteArrayOutputStream baos = new ByteArrayOutputStream();
hdr.write(new DataOutputStream(baos));

View File

@ -0,0 +1,192 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.datanode;
import static org.apache.hadoop.test.MetricsAsserts.*;
import java.util.EnumSet;
import java.util.Random;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.AppendTestUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.io.RandomDatum;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.SequenceFile.Writer;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.junit.Test;
/**
 * Exercises hflush/hsync on HDFS output streams against a MiniDFSCluster and
 * checks, through each datanode's "FsyncCount" counter, when a sync is
 * actually performed.
 *
 * Every test shuts its cluster down in a {@code finally} block so that a
 * failed assertion does not leave a running mini cluster behind for the
 * tests that follow.
 */
public class TestHSync {

  /**
   * Asserts the value of the "FsyncCount" counter of one datanode.
   *
   * @param cluster the running mini cluster
   * @param dn index of the datanode in {@code cluster.getDataNodes()}
   * @param value expected counter value
   */
  private void checkSyncMetric(MiniDFSCluster cluster, int dn, long value) {
    DataNode datanode = cluster.getDataNodes().get(dn);
    assertCounter("FsyncCount", value, getMetrics(datanode.getMetrics().name()));
  }

  /**
   * Asserts the value of the "FsyncCount" counter of the first datanode.
   *
   * @param cluster the running mini cluster
   * @param value expected counter value
   */
  private void checkSyncMetric(MiniDFSCluster cluster, long value) {
    checkSyncMetric(cluster, 0, value);
  }

  /** Test basic hsync cases */
  @Test
  public void testHSync() throws Exception {
    Configuration conf = new HdfsConfiguration();
    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
    try {
      final FileSystem fs = cluster.getFileSystem();

      final Path p = new Path("/testHSync/foo");
      final int len = 1 << 16;
      FSDataOutputStream out = fs.create(p, FsPermission.getDefault(),
          EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE, CreateFlag.SYNC_BLOCK),
          4096, (short) 1, len, null);
      out.hflush();
      // hflush does not sync
      checkSyncMetric(cluster, 0);
      out.hsync();
      // hsync on empty file does nothing
      checkSyncMetric(cluster, 0);
      out.write(1);
      checkSyncMetric(cluster, 0);
      out.hsync();
      checkSyncMetric(cluster, 1);
      // avoiding repeated hsyncs is a potential future optimization
      out.hsync();
      checkSyncMetric(cluster, 2);
      out.hflush();
      // hflush still does not sync
      checkSyncMetric(cluster, 2);
      out.close();
      // close is sync'ing
      checkSyncMetric(cluster, 3);

      // same with a file created without SYNC_BLOCK
      out = fs.create(p, FsPermission.getDefault(),
          EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE),
          4096, (short) 1, len, null);
      out.hsync();
      checkSyncMetric(cluster, 3);
      out.write(1);
      checkSyncMetric(cluster, 3);
      out.hsync();
      checkSyncMetric(cluster, 4);
      // repeated hsyncs
      out.hsync();
      checkSyncMetric(cluster, 5);
      out.close();
      // close does not sync (not opened with SYNC_BLOCK)
      checkSyncMetric(cluster, 5);
    } finally {
      cluster.shutdown();
    }
  }

  /** Test hsync on an exact block boundary */
  @Test
  public void testHSyncBlockBoundary() throws Exception {
    Configuration conf = new HdfsConfiguration();
    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
    try {
      final FileSystem fs = cluster.getFileSystem();

      final Path p = new Path("/testHSyncBlockBoundary/foo");
      final int len = 1 << 16;
      final byte[] fileContents = AppendTestUtil.initBuffer(len);
      FSDataOutputStream out = fs.create(p, FsPermission.getDefault(),
          EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE, CreateFlag.SYNC_BLOCK),
          4096, (short) 1, len, null);
      // fill exactly one block (tests the SYNC_BLOCK case) and flush
      out.write(fileContents, 0, len);
      out.hflush();
      // the full block should have caused a sync
      checkSyncMetric(cluster, 1);
      out.hsync();
      // nothing has been written to the next block yet, so this hsync
      // must not trigger another sync
      checkSyncMetric(cluster, 1);
      // write one more byte and sync again
      out.write(1);
      out.hsync();
      checkSyncMetric(cluster, 2);
      out.close();
      checkSyncMetric(cluster, 3);
    } finally {
      cluster.shutdown();
    }
  }

  /** Test hsync via SequenceFiles */
  @Test
  public void testSequenceFileSync() throws Exception {
    Configuration conf = new HdfsConfiguration();
    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
    try {
      final FileSystem fs = cluster.getFileSystem();
      final Path p = new Path("/testSequenceFileSync/foo");
      final int len = 1 << 16;
      FSDataOutputStream out = fs.create(p, FsPermission.getDefault(),
          EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE, CreateFlag.SYNC_BLOCK),
          4096, (short) 1, len, null);
      Writer w = SequenceFile.createWriter(new Configuration(),
          Writer.stream(out),
          Writer.keyClass(RandomDatum.class),
          Writer.valueClass(RandomDatum.class),
          Writer.compression(CompressionType.NONE, new DefaultCodec()));
      w.hflush();
      checkSyncMetric(cluster, 0);
      w.hsync();
      checkSyncMetric(cluster, 1);
      int seed = new Random().nextInt();
      RandomDatum.Generator generator = new RandomDatum.Generator(seed);
      generator.next();
      w.append(generator.getKey(), generator.getValue());
      w.hsync();
      checkSyncMetric(cluster, 2);
      // closing the writer leaves the sync count unchanged ...
      w.close();
      checkSyncMetric(cluster, 2);
      // ... closing the underlying stream is what syncs
      out.close();
      checkSyncMetric(cluster, 3);
    } finally {
      cluster.shutdown();
    }
  }

  /** Test that syncBlock is correctly performed at replicas */
  @Test
  public void testHSyncWithReplication() throws Exception {
    Configuration conf = new HdfsConfiguration();
    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
    try {
      final FileSystem fs = cluster.getFileSystem();

      final Path p = new Path("/testHSyncWithReplication/foo");
      final int len = 1 << 16;
      FSDataOutputStream out = fs.create(p, FsPermission.getDefault(),
          EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE, CreateFlag.SYNC_BLOCK),
          4096, (short) 3, len, null);
      out.write(1);
      out.hflush();
      checkSyncMetric(cluster, 0, 0);
      checkSyncMetric(cluster, 1, 0);
      checkSyncMetric(cluster, 2, 0);
      out.hsync();
      checkSyncMetric(cluster, 0, 1);
      checkSyncMetric(cluster, 1, 1);
      checkSyncMetric(cluster, 2, 1);
      out.hsync();
      checkSyncMetric(cluster, 0, 2);
      checkSyncMetric(cluster, 1, 2);
      checkSyncMetric(cluster, 2, 2);
      // release the stream before the cluster goes away
      out.close();
    } finally {
      cluster.shutdown();
    }
  }
}