HDFS-5868. Make hsync implementation pluggable. (Contributed by Buddy Taylor)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1569978 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Arpit Agarwal 2014-02-19 23:38:46 +00:00
parent 2f341414dd
commit 0369aff403
3 changed files with 34 additions and 12 deletions

View File

@ -421,6 +421,9 @@ Release 2.4.0 - UNRELEASED
HDFS-5318. Support read-only and read-write paths to shared replicas.
(Eric Sirianni via Arpit Agarwal)
HDFS-5868. Make hsync implementation pluggable on the DataNode.
(Buddy Taylor via Arpit Agarwal)
OPTIMIZATIONS
HDFS-5790. LeaseManager.findPath is very slow when many leases need recovery

View File

@ -78,7 +78,6 @@ class BlockReceiver implements Closeable {
private boolean needsChecksumTranslation;
private OutputStream out = null; // to block file at local disk
private FileDescriptor outFd;
private OutputStream cout = null; // output stream for cehcksum file
private DataOutputStream checksumOut = null; // to crc file at local disk
private int bytesPerChecksum;
private int checksumSize;
@ -223,9 +222,8 @@ class BlockReceiver implements Closeable {
LOG.warn("Could not get file descriptor for outputstream of class " +
out.getClass());
}
this.cout = streams.getChecksumOut();
this.checksumOut = new DataOutputStream(new BufferedOutputStream(
cout, HdfsConstants.SMALL_BUFFER_SIZE));
streams.getChecksumOut(), HdfsConstants.SMALL_BUFFER_SIZE));
// write data chunk header if creating a new replica
if (isCreate) {
BlockMetadataHeader.writeHeader(checksumOut, diskChecksum);
@ -280,9 +278,9 @@ class BlockReceiver implements Closeable {
long flushStartNanos = System.nanoTime();
checksumOut.flush();
long flushEndNanos = System.nanoTime();
if (syncOnClose && (cout instanceof FileOutputStream)) {
if (syncOnClose) {
long fsyncStartNanos = flushEndNanos;
((FileOutputStream)cout).getChannel().force(true);
streams.syncChecksumOut();
datanode.metrics.addFsyncNanos(System.nanoTime() - fsyncStartNanos);
}
flushTotalNanos += flushEndNanos - flushStartNanos;
@ -302,9 +300,9 @@ class BlockReceiver implements Closeable {
long flushStartNanos = System.nanoTime();
out.flush();
long flushEndNanos = System.nanoTime();
if (syncOnClose && (out instanceof FileOutputStream)) {
if (syncOnClose) {
long fsyncStartNanos = flushEndNanos;
((FileOutputStream)out).getChannel().force(true);
streams.syncDataOut();
datanode.metrics.addFsyncNanos(System.nanoTime() - fsyncStartNanos);
}
flushTotalNanos += flushEndNanos - flushStartNanos;
@ -338,9 +336,9 @@ class BlockReceiver implements Closeable {
long flushStartNanos = System.nanoTime();
checksumOut.flush();
long flushEndNanos = System.nanoTime();
if (isSync && (cout instanceof FileOutputStream)) {
if (isSync) {
long fsyncStartNanos = flushEndNanos;
((FileOutputStream)cout).getChannel().force(true);
streams.syncChecksumOut();
datanode.metrics.addFsyncNanos(System.nanoTime() - fsyncStartNanos);
}
flushTotalNanos += flushEndNanos - flushStartNanos;
@ -349,9 +347,9 @@ class BlockReceiver implements Closeable {
long flushStartNanos = System.nanoTime();
out.flush();
long flushEndNanos = System.nanoTime();
if (isSync && (out instanceof FileOutputStream)) {
if (isSync) {
long fsyncStartNanos = flushEndNanos;
((FileOutputStream)out).getChannel().force(true);
streams.syncDataOut();
datanode.metrics.addFsyncNanos(System.nanoTime() - fsyncStartNanos);
}
flushTotalNanos += flushEndNanos - flushStartNanos;

View File

@ -18,7 +18,9 @@
package org.apache.hadoop.hdfs.server.datanode.fsdataset;
import java.io.Closeable;
import java.io.FileOutputStream;
import java.io.OutputStream;
import java.io.IOException;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.DataChecksum;
@ -62,4 +64,23 @@ public class ReplicaOutputStreams implements Closeable {
IOUtils.closeStream(dataOut);
IOUtils.closeStream(checksumOut);
}
/**
* Sync the data stream if it supports it.
*/
public void syncDataOut() throws IOException {
if (dataOut instanceof FileOutputStream) {
((FileOutputStream)dataOut).getChannel().force(true);
}
}
/**
* Sync the checksum stream if it supports it.
*/
public void syncChecksumOut() throws IOException {
if (checksumOut instanceof FileOutputStream) {
((FileOutputStream)checksumOut).getChannel().force(true);
}
}
}