HDFS-10930. Refactor: Wrap Datanode IO related operations. Contributed by Xiaoyu Yao.

This commit is contained in:
Xiaoyu Yao 2016-12-05 13:04:39 -08:00
parent 43cb1678cc
commit df983b524a
20 changed files with 470 additions and 273 deletions

View File

@ -24,10 +24,7 @@
import java.io.DataInputStream; import java.io.DataInputStream;
import java.io.DataOutputStream; import java.io.DataOutputStream;
import java.io.EOFException; import java.io.EOFException;
import java.io.FileDescriptor;
import java.io.FileOutputStream;
import java.io.IOException; import java.io.IOException;
import java.io.OutputStream;
import java.io.OutputStreamWriter; import java.io.OutputStreamWriter;
import java.io.Writer; import java.io.Writer;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
@ -53,7 +50,6 @@
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.util.DataTransferThrottler; import org.apache.hadoop.hdfs.util.DataTransferThrottler;
import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.nativeio.NativeIO;
import org.apache.hadoop.util.Daemon; import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.DataChecksum; import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.StringUtils;
@ -88,8 +84,6 @@ class BlockReceiver implements Closeable {
* the DataNode needs to recalculate checksums before writing. * the DataNode needs to recalculate checksums before writing.
*/ */
private final boolean needsChecksumTranslation; private final boolean needsChecksumTranslation;
private OutputStream out = null; // to block file at local disk
private FileDescriptor outFd;
private DataOutputStream checksumOut = null; // to crc file at local disk private DataOutputStream checksumOut = null; // to crc file at local disk
private final int bytesPerChecksum; private final int bytesPerChecksum;
private final int checksumSize; private final int checksumSize;
@ -250,7 +244,8 @@ class BlockReceiver implements Closeable {
final boolean isCreate = isDatanode || isTransfer final boolean isCreate = isDatanode || isTransfer
|| stage == BlockConstructionStage.PIPELINE_SETUP_CREATE; || stage == BlockConstructionStage.PIPELINE_SETUP_CREATE;
streams = replicaInfo.createStreams(isCreate, requestedChecksum); streams = replicaInfo.createStreams(isCreate, requestedChecksum,
datanodeSlowLogThresholdMs);
assert streams != null : "null streams!"; assert streams != null : "null streams!";
// read checksum meta information // read checksum meta information
@ -260,13 +255,6 @@ class BlockReceiver implements Closeable {
this.bytesPerChecksum = diskChecksum.getBytesPerChecksum(); this.bytesPerChecksum = diskChecksum.getBytesPerChecksum();
this.checksumSize = diskChecksum.getChecksumSize(); this.checksumSize = diskChecksum.getChecksumSize();
this.out = streams.getDataOut();
if (out instanceof FileOutputStream) {
this.outFd = ((FileOutputStream)out).getFD();
} else {
LOG.warn("Could not get file descriptor for outputstream of class " +
out.getClass());
}
this.checksumOut = new DataOutputStream(new BufferedOutputStream( this.checksumOut = new DataOutputStream(new BufferedOutputStream(
streams.getChecksumOut(), DFSUtilClient.getSmallBufferSize( streams.getChecksumOut(), DFSUtilClient.getSmallBufferSize(
datanode.getConf()))); datanode.getConf())));
@ -319,7 +307,7 @@ public void close() throws IOException {
packetReceiver.close(); packetReceiver.close();
IOException ioe = null; IOException ioe = null;
if (syncOnClose && (out != null || checksumOut != null)) { if (syncOnClose && (streams.getDataOut() != null || checksumOut != null)) {
datanode.metrics.incrFsyncCount(); datanode.metrics.incrFsyncCount();
} }
long flushTotalNanos = 0; long flushTotalNanos = 0;
@ -348,9 +336,9 @@ public void close() throws IOException {
} }
// close block file // close block file
try { try {
if (out != null) { if (streams.getDataOut() != null) {
long flushStartNanos = System.nanoTime(); long flushStartNanos = System.nanoTime();
out.flush(); streams.flushDataOut();
long flushEndNanos = System.nanoTime(); long flushEndNanos = System.nanoTime();
if (syncOnClose) { if (syncOnClose) {
long fsyncStartNanos = flushEndNanos; long fsyncStartNanos = flushEndNanos;
@ -359,14 +347,13 @@ public void close() throws IOException {
} }
flushTotalNanos += flushEndNanos - flushStartNanos; flushTotalNanos += flushEndNanos - flushStartNanos;
measuredFlushTime = true; measuredFlushTime = true;
out.close(); streams.closeDataStream();
out = null;
} }
} catch (IOException e) { } catch (IOException e) {
ioe = e; ioe = e;
} }
finally{ finally{
IOUtils.closeStream(out); streams.close();
} }
if (replicaHandler != null) { if (replicaHandler != null) {
IOUtils.cleanup(null, replicaHandler); IOUtils.cleanup(null, replicaHandler);
@ -419,9 +406,9 @@ void flushOrSync(boolean isSync) throws IOException {
} }
flushTotalNanos += flushEndNanos - flushStartNanos; flushTotalNanos += flushEndNanos - flushStartNanos;
} }
if (out != null) { if (streams.getDataOut() != null) {
long flushStartNanos = System.nanoTime(); long flushStartNanos = System.nanoTime();
out.flush(); streams.flushDataOut();
long flushEndNanos = System.nanoTime(); long flushEndNanos = System.nanoTime();
if (isSync) { if (isSync) {
long fsyncStartNanos = flushEndNanos; long fsyncStartNanos = flushEndNanos;
@ -430,10 +417,10 @@ void flushOrSync(boolean isSync) throws IOException {
} }
flushTotalNanos += flushEndNanos - flushStartNanos; flushTotalNanos += flushEndNanos - flushStartNanos;
} }
if (checksumOut != null || out != null) { if (checksumOut != null || streams.getDataOut() != null) {
datanode.metrics.addFlushNanos(flushTotalNanos); datanode.metrics.addFlushNanos(flushTotalNanos);
if (isSync) { if (isSync) {
datanode.metrics.incrFsyncCount(); datanode.metrics.incrFsyncCount();
} }
} }
long duration = Time.monotonicNow() - begin; long duration = Time.monotonicNow() - begin;
@ -716,16 +703,12 @@ private int receivePacket() throws IOException {
int numBytesToDisk = (int)(offsetInBlock-onDiskLen); int numBytesToDisk = (int)(offsetInBlock-onDiskLen);
// Write data to disk. // Write data to disk.
long begin = Time.monotonicNow(); long duration = streams.writeToDisk(dataBuf.array(),
out.write(dataBuf.array(), startByteToDisk, numBytesToDisk); startByteToDisk, numBytesToDisk);
long duration = Time.monotonicNow() - begin;
if (duration > maxWriteToDiskMs) { if (duration > maxWriteToDiskMs) {
maxWriteToDiskMs = duration; maxWriteToDiskMs = duration;
} }
if (duration > datanodeSlowLogThresholdMs) {
LOG.warn("Slow BlockReceiver write data to disk cost:" + duration
+ "ms (threshold=" + datanodeSlowLogThresholdMs + "ms)");
}
final byte[] lastCrc; final byte[] lastCrc;
if (shouldNotWriteChecksum) { if (shouldNotWriteChecksum) {
@ -842,7 +825,7 @@ private static byte[] copyLastChunkChecksum(byte[] array, int size, int end) {
private void manageWriterOsCache(long offsetInBlock) { private void manageWriterOsCache(long offsetInBlock) {
try { try {
if (outFd != null && if (streams.getOutFd() != null &&
offsetInBlock > lastCacheManagementOffset + CACHE_DROP_LAG_BYTES) { offsetInBlock > lastCacheManagementOffset + CACHE_DROP_LAG_BYTES) {
long begin = Time.monotonicNow(); long begin = Time.monotonicNow();
// //
@ -857,12 +840,11 @@ private void manageWriterOsCache(long offsetInBlock) {
if (syncBehindWrites) { if (syncBehindWrites) {
if (syncBehindWritesInBackground) { if (syncBehindWritesInBackground) {
this.datanode.getFSDataset().submitBackgroundSyncFileRangeRequest( this.datanode.getFSDataset().submitBackgroundSyncFileRangeRequest(
block, outFd, lastCacheManagementOffset, block, streams, lastCacheManagementOffset,
offsetInBlock - lastCacheManagementOffset, offsetInBlock - lastCacheManagementOffset,
SYNC_FILE_RANGE_WRITE); SYNC_FILE_RANGE_WRITE);
} else { } else {
NativeIO.POSIX.syncFileRangeIfPossible(outFd, streams.syncFileRangeIfPossible(lastCacheManagementOffset,
lastCacheManagementOffset,
offsetInBlock - lastCacheManagementOffset, offsetInBlock - lastCacheManagementOffset,
SYNC_FILE_RANGE_WRITE); SYNC_FILE_RANGE_WRITE);
} }
@ -879,8 +861,8 @@ private void manageWriterOsCache(long offsetInBlock) {
// //
long dropPos = lastCacheManagementOffset - CACHE_DROP_LAG_BYTES; long dropPos = lastCacheManagementOffset - CACHE_DROP_LAG_BYTES;
if (dropPos > 0 && dropCacheBehindWrites) { if (dropPos > 0 && dropCacheBehindWrites) {
NativeIO.POSIX.getCacheManipulator().posixFadviseIfPossible( streams.dropCacheBehindWrites(block.getBlockName(), 0, dropPos,
block.getBlockName(), outFd, 0, dropPos, POSIX_FADV_DONTNEED); POSIX_FADV_DONTNEED);
} }
lastCacheManagementOffset = offsetInBlock; lastCacheManagementOffset = offsetInBlock;
long duration = Time.monotonicNow() - begin; long duration = Time.monotonicNow() - begin;
@ -989,7 +971,7 @@ void receiveBlock(
// The worst case is not recovering this RBW replica. // The worst case is not recovering this RBW replica.
// Client will fall back to regular pipeline recovery. // Client will fall back to regular pipeline recovery.
} finally { } finally {
IOUtils.closeStream(out); IOUtils.closeStream(streams.getDataOut());
} }
try { try {
// Even if the connection is closed after the ack packet is // Even if the connection is closed after the ack packet is
@ -1047,8 +1029,8 @@ private void cleanupBlock() throws IOException {
* will be overwritten. * will be overwritten.
*/ */
private void adjustCrcFilePosition() throws IOException { private void adjustCrcFilePosition() throws IOException {
if (out != null) { if (streams.getDataOut() != null) {
out.flush(); streams.flushDataOut();
} }
if (checksumOut != null) { if (checksumOut != null) {
checksumOut.flush(); checksumOut.flush();
@ -1094,10 +1076,10 @@ private Checksum computePartialChunkCrc(long blkoff, long ckoff)
byte[] crcbuf = new byte[checksumSize]; byte[] crcbuf = new byte[checksumSize];
try (ReplicaInputStreams instr = try (ReplicaInputStreams instr =
datanode.data.getTmpInputStreams(block, blkoff, ckoff)) { datanode.data.getTmpInputStreams(block, blkoff, ckoff)) {
IOUtils.readFully(instr.getDataIn(), buf, 0, sizePartialChunk); instr.readDataFully(buf, 0, sizePartialChunk);
// open meta file and read in crc value computer earlier // open meta file and read in crc value computer earlier
IOUtils.readFully(instr.getChecksumIn(), crcbuf, 0, crcbuf.length); instr.readChecksumFully(crcbuf, 0, crcbuf.length);
} }
// compute crc of partial chunk from data read in the block file. // compute crc of partial chunk from data read in the block file.

View File

@ -20,7 +20,6 @@
import java.io.BufferedInputStream; import java.io.BufferedInputStream;
import java.io.DataInputStream; import java.io.DataInputStream;
import java.io.DataOutputStream; import java.io.DataOutputStream;
import java.io.FileDescriptor;
import java.io.FileInputStream; import java.io.FileInputStream;
import java.io.FileNotFoundException; import java.io.FileNotFoundException;
import java.io.IOException; import java.io.IOException;
@ -42,11 +41,11 @@
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream; import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams;
import org.apache.hadoop.hdfs.util.DataTransferThrottler; import org.apache.hadoop.hdfs.util.DataTransferThrottler;
import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.ReadaheadPool.ReadaheadRequest; import org.apache.hadoop.io.ReadaheadPool.ReadaheadRequest;
import org.apache.hadoop.io.nativeio.NativeIO;
import org.apache.hadoop.net.SocketOutputStream; import org.apache.hadoop.net.SocketOutputStream;
import org.apache.hadoop.util.AutoCloseableLock; import org.apache.hadoop.util.AutoCloseableLock;
import org.apache.hadoop.util.DataChecksum; import org.apache.hadoop.util.DataChecksum;
@ -120,12 +119,11 @@ class BlockSender implements java.io.Closeable {
/** the block to read from */ /** the block to read from */
private final ExtendedBlock block; private final ExtendedBlock block;
/** Stream to read block data from */
private InputStream blockIn; /** InputStreams and file descriptors to read block/checksum. */
private ReplicaInputStreams ris;
/** updated while using transferTo() */ /** updated while using transferTo() */
private long blockInPosition = -1; private long blockInPosition = -1;
/** Stream to read checksum */
private DataInputStream checksumIn;
/** Checksum utility */ /** Checksum utility */
private final DataChecksum checksum; private final DataChecksum checksum;
/** Initial position to read */ /** Initial position to read */
@ -153,11 +151,6 @@ class BlockSender implements java.io.Closeable {
private volatile ChunkChecksum lastChunkChecksum = null; private volatile ChunkChecksum lastChunkChecksum = null;
private DataNode datanode; private DataNode datanode;
/** The file descriptor of the block being sent */
private FileDescriptor blockInFd;
/** The reference to the volume where the block is located */
private FsVolumeReference volumeRef;
/** The replica of the block that is being read. */ /** The replica of the block that is being read. */
private final Replica replica; private final Replica replica;
@ -201,6 +194,9 @@ class BlockSender implements java.io.Closeable {
boolean sendChecksum, DataNode datanode, String clientTraceFmt, boolean sendChecksum, DataNode datanode, String clientTraceFmt,
CachingStrategy cachingStrategy) CachingStrategy cachingStrategy)
throws IOException { throws IOException {
InputStream blockIn = null;
DataInputStream checksumIn = null;
FsVolumeReference volumeRef = null;
try { try {
this.block = block; this.block = block;
this.corruptChecksumOk = corruptChecksumOk; this.corruptChecksumOk = corruptChecksumOk;
@ -281,7 +277,7 @@ class BlockSender implements java.io.Closeable {
(!is32Bit || length <= Integer.MAX_VALUE); (!is32Bit || length <= Integer.MAX_VALUE);
// Obtain a reference before reading data // Obtain a reference before reading data
this.volumeRef = datanode.data.getVolume(block).obtainReference(); volumeRef = datanode.data.getVolume(block).obtainReference();
/* /*
* (corruptChecksumOK, meta_file_exist): operation * (corruptChecksumOK, meta_file_exist): operation
@ -405,14 +401,9 @@ class BlockSender implements java.io.Closeable {
DataNode.LOG.debug("replica=" + replica); DataNode.LOG.debug("replica=" + replica);
} }
blockIn = datanode.data.getBlockInputStream(block, offset); // seek to offset blockIn = datanode.data.getBlockInputStream(block, offset); // seek to offset
if (blockIn instanceof FileInputStream) { ris = new ReplicaInputStreams(blockIn, checksumIn, volumeRef);
blockInFd = ((FileInputStream)blockIn).getFD();
} else {
blockInFd = null;
}
} catch (IOException ioe) { } catch (IOException ioe) {
IOUtils.closeStream(this); IOUtils.closeStream(this);
IOUtils.closeStream(blockIn);
throw ioe; throw ioe;
} }
} }
@ -422,12 +413,11 @@ class BlockSender implements java.io.Closeable {
*/ */
@Override @Override
public void close() throws IOException { public void close() throws IOException {
if (blockInFd != null && if (ris.getDataInFd() != null &&
((dropCacheBehindAllReads) || ((dropCacheBehindAllReads) ||
(dropCacheBehindLargeReads && isLongRead()))) { (dropCacheBehindLargeReads && isLongRead()))) {
try { try {
NativeIO.POSIX.getCacheManipulator().posixFadviseIfPossible( ris.dropCacheBehindReads(block.getBlockName(), lastCacheDropOffset,
block.getBlockName(), blockInFd, lastCacheDropOffset,
offset - lastCacheDropOffset, POSIX_FADV_DONTNEED); offset - lastCacheDropOffset, POSIX_FADV_DONTNEED);
} catch (Exception e) { } catch (Exception e) {
LOG.warn("Unable to drop cache on file close", e); LOG.warn("Unable to drop cache on file close", e);
@ -437,31 +427,11 @@ public void close() throws IOException {
curReadahead.cancel(); curReadahead.cancel();
} }
IOException ioe = null; try {
if(checksumIn!=null) { ris.closeStreams();
try { } finally {
checksumIn.close(); // close checksum file IOUtils.closeStream(ris);
} catch (IOException e) { ris = null;
ioe = e;
}
checksumIn = null;
}
if(blockIn!=null) {
try {
blockIn.close(); // close data file
} catch (IOException e) {
ioe = e;
}
blockIn = null;
blockInFd = null;
}
if (volumeRef != null) {
IOUtils.cleanup(null, volumeRef);
volumeRef = null;
}
// throw IOException if there is any
if(ioe!= null) {
throw ioe;
} }
} }
@ -565,7 +535,7 @@ private int sendPacket(ByteBuffer pkt, int maxChunks, OutputStream out,
int checksumOff = pkt.position(); int checksumOff = pkt.position();
byte[] buf = pkt.array(); byte[] buf = pkt.array();
if (checksumSize > 0 && checksumIn != null) { if (checksumSize > 0 && ris.getChecksumIn() != null) {
readChecksum(buf, checksumOff, checksumDataLen); readChecksum(buf, checksumOff, checksumDataLen);
// write in progress that we need to use to get last checksum // write in progress that we need to use to get last checksum
@ -581,7 +551,7 @@ private int sendPacket(ByteBuffer pkt, int maxChunks, OutputStream out,
int dataOff = checksumOff + checksumDataLen; int dataOff = checksumOff + checksumDataLen;
if (!transferTo) { // normal transfer if (!transferTo) { // normal transfer
IOUtils.readFully(blockIn, buf, dataOff, dataLen); ris.readDataFully(buf, dataOff, dataLen);
if (verifyChecksum) { if (verifyChecksum) {
verifyChecksum(buf, dataOff, dataLen, numChunks, checksumOff); verifyChecksum(buf, dataOff, dataLen, numChunks, checksumOff);
@ -595,7 +565,7 @@ private int sendPacket(ByteBuffer pkt, int maxChunks, OutputStream out,
sockOut.write(buf, headerOff, dataOff - headerOff); sockOut.write(buf, headerOff, dataOff - headerOff);
// no need to flush since we know out is not a buffered stream // no need to flush since we know out is not a buffered stream
FileChannel fileCh = ((FileInputStream)blockIn).getChannel(); FileChannel fileCh = ((FileInputStream)ris.getDataIn()).getChannel();
LongWritable waitTime = new LongWritable(); LongWritable waitTime = new LongWritable();
LongWritable transferTime = new LongWritable(); LongWritable transferTime = new LongWritable();
sockOut.transferToFully(fileCh, blockInPosition, dataLen, sockOut.transferToFully(fileCh, blockInPosition, dataLen,
@ -630,7 +600,7 @@ private int sendPacket(ByteBuffer pkt, int maxChunks, OutputStream out,
if (!ioem.startsWith("Broken pipe") && !ioem.startsWith("Connection reset")) { if (!ioem.startsWith("Broken pipe") && !ioem.startsWith("Connection reset")) {
LOG.error("BlockSender.sendChunks() exception: ", e); LOG.error("BlockSender.sendChunks() exception: ", e);
datanode.getBlockScanner().markSuspectBlock( datanode.getBlockScanner().markSuspectBlock(
volumeRef.getVolume().getStorageID(), ris.getVolumeRef().getVolume().getStorageID(),
block); block);
} }
} }
@ -653,16 +623,15 @@ private int sendPacket(ByteBuffer pkt, int maxChunks, OutputStream out,
*/ */
private void readChecksum(byte[] buf, final int checksumOffset, private void readChecksum(byte[] buf, final int checksumOffset,
final int checksumLen) throws IOException { final int checksumLen) throws IOException {
if (checksumSize <= 0 && checksumIn == null) { if (checksumSize <= 0 && ris.getChecksumIn() == null) {
return; return;
} }
try { try {
checksumIn.readFully(buf, checksumOffset, checksumLen); ris.readChecksumFully(buf, checksumOffset, checksumLen);
} catch (IOException e) { } catch (IOException e) {
LOG.warn(" Could not read or failed to verify checksum for data" LOG.warn(" Could not read or failed to verify checksum for data"
+ " at offset " + offset + " for block " + block, e); + " at offset " + offset + " for block " + block, e);
IOUtils.closeStream(checksumIn); ris.closeChecksumStream();
checksumIn = null;
if (corruptChecksumOk) { if (corruptChecksumOk) {
if (checksumOffset < checksumLen) { if (checksumOffset < checksumLen) {
// Just fill the array with zeros. // Just fill the array with zeros.
@ -746,10 +715,10 @@ private long doSendBlock(DataOutputStream out, OutputStream baseStream,
lastCacheDropOffset = initialOffset; lastCacheDropOffset = initialOffset;
if (isLongRead() && blockInFd != null) { if (isLongRead() && ris.getDataInFd() != null) {
// Advise that this file descriptor will be accessed sequentially. // Advise that this file descriptor will be accessed sequentially.
NativeIO.POSIX.getCacheManipulator().posixFadviseIfPossible( ris.dropCacheBehindReads(block.getBlockName(), 0, 0,
block.getBlockName(), blockInFd, 0, 0, POSIX_FADV_SEQUENTIAL); POSIX_FADV_SEQUENTIAL);
} }
// Trigger readahead of beginning of file if configured. // Trigger readahead of beginning of file if configured.
@ -761,9 +730,10 @@ private long doSendBlock(DataOutputStream out, OutputStream baseStream,
int pktBufSize = PacketHeader.PKT_MAX_HEADER_LEN; int pktBufSize = PacketHeader.PKT_MAX_HEADER_LEN;
boolean transferTo = transferToAllowed && !verifyChecksum boolean transferTo = transferToAllowed && !verifyChecksum
&& baseStream instanceof SocketOutputStream && baseStream instanceof SocketOutputStream
&& blockIn instanceof FileInputStream; && ris.getDataIn() instanceof FileInputStream;
if (transferTo) { if (transferTo) {
FileChannel fileChannel = ((FileInputStream)blockIn).getChannel(); FileChannel fileChannel =
((FileInputStream)ris.getDataIn()).getChannel();
blockInPosition = fileChannel.position(); blockInPosition = fileChannel.position();
streamForSendChunks = baseStream; streamForSendChunks = baseStream;
maxChunksPerPacket = numberOfChunks(TRANSFERTO_BUFFER_SIZE); maxChunksPerPacket = numberOfChunks(TRANSFERTO_BUFFER_SIZE);
@ -818,14 +788,16 @@ private long doSendBlock(DataOutputStream out, OutputStream baseStream,
private void manageOsCache() throws IOException { private void manageOsCache() throws IOException {
// We can't manage the cache for this block if we don't have a file // We can't manage the cache for this block if we don't have a file
// descriptor to work with. // descriptor to work with.
if (blockInFd == null) return; if (ris.getDataInFd() == null) {
return;
}
// Perform readahead if necessary // Perform readahead if necessary
if ((readaheadLength > 0) && (datanode.readaheadPool != null) && if ((readaheadLength > 0) && (datanode.readaheadPool != null) &&
(alwaysReadahead || isLongRead())) { (alwaysReadahead || isLongRead())) {
curReadahead = datanode.readaheadPool.readaheadStream( curReadahead = datanode.readaheadPool.readaheadStream(
clientTraceFmt, blockInFd, offset, readaheadLength, Long.MAX_VALUE, clientTraceFmt, ris.getDataInFd(), offset, readaheadLength,
curReadahead); Long.MAX_VALUE, curReadahead);
} }
// Drop what we've just read from cache, since we aren't // Drop what we've just read from cache, since we aren't
@ -835,8 +807,7 @@ private void manageOsCache() throws IOException {
long nextCacheDropOffset = lastCacheDropOffset + CACHE_DROP_INTERVAL_BYTES; long nextCacheDropOffset = lastCacheDropOffset + CACHE_DROP_INTERVAL_BYTES;
if (offset >= nextCacheDropOffset) { if (offset >= nextCacheDropOffset) {
long dropLength = offset - lastCacheDropOffset; long dropLength = offset - lastCacheDropOffset;
NativeIO.POSIX.getCacheManipulator().posixFadviseIfPossible( ris.dropCacheBehindReads(block.getBlockName(), lastCacheDropOffset,
block.getBlockName(), blockInFd, lastCacheDropOffset,
dropLength, POSIX_FADV_DONTNEED); dropLength, POSIX_FADV_DONTNEED);
lastCacheDropOffset = offset; lastCacheDropOffset = offset;
} }

View File

@ -402,6 +402,10 @@ public int getVolsConfigured() {
return volsConfigured; return volsConfigured;
} }
public long getSlowIoWarningThresholdMs() {
return datanodeSlowIoWarningThresholdMs;
}
int getMaxDataLength() { int getMaxDataLength() {
return maxDataLength; return maxDataLength;
} }

View File

@ -1355,4 +1355,9 @@ synchronized BlockPoolSliceStorage getBlockPoolSliceStorage(
synchronized void removeBlockPoolStorage(String bpId) { synchronized void removeBlockPoolStorage(String bpId) {
bpStorageMap.remove(bpId); bpStorageMap.remove(bpId);
} }
public static boolean fullyDelete(final File dir) {
boolean result = FileUtil.fullyDelete(dir);
return result;
}
} }

View File

@ -29,9 +29,6 @@
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.HardLink; import org.apache.hadoop.fs.HardLink;
import org.apache.hadoop.fs.LocalFileSystem; import org.apache.hadoop.fs.LocalFileSystem;
@ -46,6 +43,8 @@
import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.nativeio.NativeIO; import org.apache.hadoop.io.nativeio.NativeIO;
import org.apache.hadoop.util.DataChecksum; import org.apache.hadoop.util.DataChecksum;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
@ -69,15 +68,7 @@ abstract public class LocalReplica extends ReplicaInfo {
private static final Map<String, File> internedBaseDirs = new HashMap<String, File>(); private static final Map<String, File> internedBaseDirs = new HashMap<String, File>();
static final Log LOG = LogFactory.getLog(LocalReplica.class); static final Logger LOG = LoggerFactory.getLogger(LocalReplica.class);
private final static boolean IS_NATIVE_IO_AVAIL;
static {
IS_NATIVE_IO_AVAIL = NativeIO.isAvailable();
if (Path.WINDOWS && !IS_NATIVE_IO_AVAIL) {
LOG.warn("Data node cannot fully support concurrent reading"
+ " and writing without native code extensions on Windows.");
}
}
/** /**
* Constructor * Constructor
@ -199,14 +190,14 @@ private void breakHardlinks(File file, Block b) throws IOException {
File tmpFile = DatanodeUtil.createTmpFile(b, DatanodeUtil.getUnlinkTmpFile(file)); File tmpFile = DatanodeUtil.createTmpFile(b, DatanodeUtil.getUnlinkTmpFile(file));
try (FileInputStream in = new FileInputStream(file)) { try (FileInputStream in = new FileInputStream(file)) {
try (FileOutputStream out = new FileOutputStream(tmpFile)){ try (FileOutputStream out = new FileOutputStream(tmpFile)){
IOUtils.copyBytes(in, out, 16 * 1024); copyBytes(in, out, 16 * 1024);
} }
if (file.length() != tmpFile.length()) { if (file.length() != tmpFile.length()) {
throw new IOException("Copy of file " + file + " size " + file.length()+ throw new IOException("Copy of file " + file + " size " + file.length()+
" into file " + tmpFile + " into file " + tmpFile +
" resulted in a size of " + tmpFile.length()); " resulted in a size of " + tmpFile.length());
} }
FileUtil.replaceFile(tmpFile, file); replaceFile(tmpFile, file);
} catch (IOException e) { } catch (IOException e) {
boolean done = tmpFile.delete(); boolean done = tmpFile.delete();
if (!done) { if (!done) {
@ -241,13 +232,13 @@ public boolean breakHardLinksIfNeeded() throws IOException {
} }
File meta = getMetaFile(); File meta = getMetaFile();
int linkCount = HardLink.getLinkCount(file); int linkCount = getHardLinkCount(file);
if (linkCount > 1) { if (linkCount > 1) {
DataNode.LOG.info("Breaking hardlink for " + linkCount + "x-linked " + DataNode.LOG.info("Breaking hardlink for " + linkCount + "x-linked " +
"block " + this); "block " + this);
breakHardlinks(file, this); breakHardlinks(file, this);
} }
if (HardLink.getLinkCount(meta) > 1) { if (getHardLinkCount(meta) > 1) {
breakHardlinks(meta, this); breakHardlinks(meta, this);
} }
return true; return true;
@ -260,18 +251,7 @@ public URI getBlockURI() {
@Override @Override
public InputStream getDataInputStream(long seekOffset) throws IOException { public InputStream getDataInputStream(long seekOffset) throws IOException {
return getDataInputStream(getBlockFile(), seekOffset);
File blockFile = getBlockFile();
if (IS_NATIVE_IO_AVAIL) {
return NativeIO.getShareDeleteFileInputStream(blockFile, seekOffset);
} else {
try {
return FsDatasetUtil.openAndSeek(blockFile, seekOffset);
} catch (FileNotFoundException fnfe) {
throw new IOException("Block " + this + " is not valid. " +
"Expected block file at " + blockFile + " does not exist.");
}
}
} }
@Override @Override
@ -286,7 +266,7 @@ public boolean blockDataExists() {
@Override @Override
public boolean deleteBlockData() { public boolean deleteBlockData() {
return getBlockFile().delete(); return fullyDelete(getBlockFile());
} }
@Override @Override
@ -320,7 +300,7 @@ public boolean metadataExists() {
@Override @Override
public boolean deleteMetadata() { public boolean deleteMetadata() {
return getMetaFile().delete(); return fullyDelete(getMetaFile());
} }
@Override @Override
@ -340,7 +320,7 @@ public boolean renameData(URI destURI) throws IOException {
private boolean renameFile(File srcfile, File destfile) throws IOException { private boolean renameFile(File srcfile, File destfile) throws IOException {
try { try {
NativeIO.renameTo(srcfile, destfile); rename(srcfile, destfile);
return true; return true;
} catch (IOException e) { } catch (IOException e) {
throw new IOException("Failed to move block file for " + this throw new IOException("Failed to move block file for " + this
@ -367,22 +347,14 @@ public void updateWithReplica(StorageLocation replicaLocation) {
@Override @Override
public boolean getPinning(LocalFileSystem localFS) throws IOException { public boolean getPinning(LocalFileSystem localFS) throws IOException {
FileStatus fss = return getPinning(localFS, new Path(getBlockFile().getAbsolutePath()));
localFS.getFileStatus(new Path(getBlockFile().getAbsolutePath()));
return fss.getPermission().getStickyBit();
} }
@Override @Override
public void setPinning(LocalFileSystem localFS) throws IOException { public void setPinning(LocalFileSystem localFS) throws IOException {
File f = getBlockFile(); File f = getBlockFile();
Path p = new Path(f.getAbsolutePath()); Path p = new Path(f.getAbsolutePath());
setPinning(localFS, p);
FsPermission oldPermission = localFS.getFileStatus(
new Path(f.getAbsolutePath())).getPermission();
//sticky bit is used for pinning purpose
FsPermission permission = new FsPermission(oldPermission.getUserAction(),
oldPermission.getGroupAction(), oldPermission.getOtherAction(), true);
localFS.setPermission(p, permission);
} }
@Override @Override
@ -398,7 +370,7 @@ public void bumpReplicaGS(long newGS) throws IOException {
} }
try { try {
// calling renameMeta on the ReplicaInfo doesn't work here // calling renameMeta on the ReplicaInfo doesn't work here
NativeIO.renameTo(oldmeta, newmeta); rename(oldmeta, newmeta);
} catch (IOException e) { } catch (IOException e) {
setGenerationStamp(oldGS); // restore old GS setGenerationStamp(oldGS); // restore old GS
throw new IOException("Block " + this + " reopen failed. " + throw new IOException("Block " + this + " reopen failed. " +
@ -417,7 +389,113 @@ public int compareWith(ScanInfo info) {
return info.getBlockFile().compareTo(getBlockFile()); return info.getBlockFile().compareTo(getBlockFile());
} }
static public void truncateBlock(File blockFile, File metaFile, @Override
public void copyMetadata(URI destination) throws IOException {
//for local replicas, we assume the destination URI is file
nativeCopyFileUnbuffered(getMetaFile(), new File(destination), true);
}
@Override
public void copyBlockdata(URI destination) throws IOException {
//for local replicas, we assume the destination URI is file
nativeCopyFileUnbuffered(getBlockFile(), new File(destination), true);
}
public void renameMeta(File newMetaFile) throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("Renaming " + getMetaFile() + " to " + newMetaFile);
}
renameFile(getMetaFile(), newMetaFile);
}
public void renameBlock(File newBlockFile) throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("Renaming " + getBlockFile() + " to " + newBlockFile
+ ", file length=" + getBlockFile().length());
}
renameFile(getBlockFile(), newBlockFile);
}
public static void rename(File from, File to) throws IOException {
Storage.rename(from, to);
}
/**
* Get input stream for a local file and optionally seek to the offset.
* @param f path to the file
* @param seekOffset offset to seek
* @return input stream for read
* @throws IOException
*/
private FileInputStream getDataInputStream(File f, long seekOffset)
throws IOException {
FileInputStream fis;
if (NativeIO.isAvailable()) {
fis = NativeIO.getShareDeleteFileInputStream(f, seekOffset);
} else {
try {
fis = FsDatasetUtil.openAndSeek(f, seekOffset);
} catch (FileNotFoundException fnfe) {
throw new IOException("Expected block file at " + f +
" does not exist.");
}
}
return fis;
}
private void nativeCopyFileUnbuffered(File srcFile, File destFile,
boolean preserveFileDate) throws IOException {
Storage.nativeCopyFileUnbuffered(srcFile, destFile, preserveFileDate);
}
private void copyBytes(InputStream in, OutputStream out, int
buffSize) throws IOException{
IOUtils.copyBytes(in, out, buffSize);
}
private void replaceFile(File src, File target) throws IOException {
FileUtil.replaceFile(src, target);
}
public static boolean fullyDelete(final File dir) {
boolean result = DataStorage.fullyDelete(dir);
return result;
}
public static int getHardLinkCount(File fileName) throws IOException {
int linkCount = HardLink.getLinkCount(fileName);
return linkCount;
}
/**
* Get pin status of a file by checking the sticky bit.
* @param localFS local file system
* @param path path to be checked
* @return true if the file is pinned with sticky bit
* @throws IOException
*/
public boolean getPinning(LocalFileSystem localFS, Path path) throws
IOException {
boolean stickyBit =
localFS.getFileStatus(path).getPermission().getStickyBit();
return stickyBit;
}
/**
* Set sticky bit on path to pin file.
* @param localFS local file system
* @param path path to be pinned with sticky bit
* @throws IOException
*/
public void setPinning(LocalFileSystem localFS, Path path) throws
IOException {
FsPermission oldPermission = localFS.getFileStatus(path).getPermission();
FsPermission permission = new FsPermission(oldPermission.getUserAction(),
oldPermission.getGroupAction(), oldPermission.getOtherAction(), true);
localFS.setPermission(path, permission);
}
public static void truncateBlock(File blockFile, File metaFile,
long oldlen, long newlen) throws IOException { long oldlen, long newlen) throws IOException {
LOG.info("truncateBlock: blockFile=" + blockFile LOG.info("truncateBlock: blockFile=" + blockFile
+ ", metaFile=" + metaFile + ", metaFile=" + metaFile
@ -467,19 +545,4 @@ static public void truncateBlock(File blockFile, File metaFile,
metaRAF.close(); metaRAF.close();
} }
} }
@Override
public void copyMetadata(URI destination) throws IOException {
//for local replicas, we assume the destination URI is file
Storage.nativeCopyFileUnbuffered(getMetaFile(),
new File(destination), true);
}
@Override
public void copyBlockdata(URI destination) throws IOException {
//for local replicas, we assume the destination URI is file
Storage.nativeCopyFileUnbuffered(getBlockFile(),
new File(destination), true);
}
} }

View File

@ -30,7 +30,6 @@
import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams; import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo; import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.nativeio.NativeIO;
import org.apache.hadoop.util.DataChecksum; import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.StringUtils;
@ -246,7 +245,8 @@ public int hashCode() {
@Override // ReplicaInPipeline @Override // ReplicaInPipeline
public ReplicaOutputStreams createStreams(boolean isCreate, public ReplicaOutputStreams createStreams(boolean isCreate,
DataChecksum requestedChecksum) throws IOException { DataChecksum requestedChecksum, long slowLogThresholdMs)
throws IOException {
File blockFile = getBlockFile(); File blockFile = getBlockFile();
File metaFile = getMetaFile(); File metaFile = getMetaFile();
if (DataNode.LOG.isDebugEnabled()) { if (DataNode.LOG.isDebugEnabled()) {
@ -313,7 +313,7 @@ public ReplicaOutputStreams createStreams(boolean isCreate,
crcOut.getChannel().position(crcDiskSize); crcOut.getChannel().position(crcDiskSize);
} }
return new ReplicaOutputStreams(blockOut, crcOut, checksum, return new ReplicaOutputStreams(blockOut, crcOut, checksum,
getVolume().isTransientStorage()); getVolume().isTransientStorage(), slowLogThresholdMs);
} catch (IOException e) { } catch (IOException e) {
IOUtils.closeStream(blockOut); IOUtils.closeStream(blockOut);
IOUtils.closeStream(metaRAF); IOUtils.closeStream(metaRAF);
@ -373,40 +373,30 @@ public void moveReplicaFrom(ReplicaInfo oldReplicaInfo, File newBlkFile)
+ " should be derived from LocalReplica"); + " should be derived from LocalReplica");
} }
LocalReplica localReplica = (LocalReplica) oldReplicaInfo; LocalReplica oldReplica = (LocalReplica) oldReplicaInfo;
File oldmeta = oldReplica.getMetaFile();
File oldmeta = localReplica.getMetaFile();
File newmeta = getMetaFile(); File newmeta = getMetaFile();
if (LOG.isDebugEnabled()) {
LOG.debug("Renaming " + oldmeta + " to " + newmeta);
}
try { try {
NativeIO.renameTo(oldmeta, newmeta); oldReplica.renameMeta(newmeta);
} catch (IOException e) { } catch (IOException e) {
throw new IOException("Block " + oldReplicaInfo + " reopen failed. " + throw new IOException("Block " + oldReplicaInfo + " reopen failed. " +
" Unable to move meta file " + oldmeta + " Unable to move meta file " + oldmeta +
" to rbw dir " + newmeta, e); " to rbw dir " + newmeta, e);
} }
File blkfile = localReplica.getBlockFile();
if (LOG.isDebugEnabled()) {
LOG.debug("Renaming " + blkfile + " to " + newBlkFile
+ ", file length=" + blkfile.length());
}
try { try {
NativeIO.renameTo(blkfile, newBlkFile); oldReplica.renameBlock(newBlkFile);
} catch (IOException e) { } catch (IOException e) {
try { try {
NativeIO.renameTo(newmeta, oldmeta); renameMeta(oldmeta);
} catch (IOException ex) { } catch (IOException ex) {
LOG.warn("Cannot move meta file " + newmeta + LOG.warn("Cannot move meta file " + newmeta +
"back to the finalized directory " + oldmeta, ex); "back to the finalized directory " + oldmeta, ex);
} }
throw new IOException("Block " + oldReplicaInfo + " reopen failed. " + throw new IOException("Block " + oldReplicaInfo + " reopen failed. " +
" Unable to move block file " + blkfile + " Unable to move block file " + oldReplica.getBlockFile() +
" to rbw dir " + newBlkFile, e); " to rbw dir " + newBlkFile, e);
} }
} }

View File

@ -69,11 +69,13 @@ public interface ReplicaInPipeline extends Replica {
* *
* @param isCreate if it is for creation * @param isCreate if it is for creation
* @param requestedChecksum the checksum the writer would prefer to use * @param requestedChecksum the checksum the writer would prefer to use
* @param slowLogThresholdMs slow io threshold for logging
* @return output streams for writing * @return output streams for writing
* @throws IOException if any error occurs * @throws IOException if any error occurs
*/ */
public ReplicaOutputStreams createStreams(boolean isCreate, public ReplicaOutputStreams createStreams(boolean isCreate,
DataChecksum requestedChecksum) throws IOException; DataChecksum requestedChecksum, long slowLogThresholdMs)
throws IOException;
/** /**
* Create an output stream to write restart metadata in case of datanode * Create an output stream to write restart metadata in case of datanode

View File

@ -21,7 +21,6 @@
import java.io.Closeable; import java.io.Closeable;
import java.io.EOFException; import java.io.EOFException;
import java.io.File; import java.io.File;
import java.io.FileDescriptor;
import java.io.FileNotFoundException; import java.io.FileNotFoundException;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
@ -605,7 +604,7 @@ BlockLocalPathInfo getBlockLocalPathInfo(ExtendedBlock b
* submit a sync_file_range request to AsyncDiskService. * submit a sync_file_range request to AsyncDiskService.
*/ */
void submitBackgroundSyncFileRangeRequest(final ExtendedBlock block, void submitBackgroundSyncFileRangeRequest(final ExtendedBlock block,
final FileDescriptor fd, final long offset, final long nbytes, final ReplicaOutputStreams outs, final long offset, final long nbytes,
final int flags); final int flags);
/** /**

View File

@ -18,24 +18,45 @@
package org.apache.hadoop.hdfs.server.datanode.fsdataset; package org.apache.hadoop.hdfs.server.datanode.fsdataset;
import java.io.Closeable; import java.io.Closeable;
import java.io.FileDescriptor;
import java.io.FileInputStream;
import java.io.InputStream; import java.io.InputStream;
import java.io.IOException;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.nativeio.NativeIO;
import org.apache.hadoop.io.nativeio.NativeIOException;
import org.slf4j.Logger;
/** /**
* Contains the input streams for the data and checksum of a replica. * Contains the input streams for the data and checksum of a replica.
*/ */
public class ReplicaInputStreams implements Closeable { public class ReplicaInputStreams implements Closeable {
private final InputStream dataIn; public static final Logger LOG = DataNode.LOG;
private final InputStream checksumIn;
private final FsVolumeReference volumeRef; private InputStream dataIn;
private InputStream checksumIn;
private FsVolumeReference volumeRef;
private FileDescriptor dataInFd = null;
/** Create an object with a data input stream and a checksum input stream. */ /** Create an object with a data input stream and a checksum input stream. */
public ReplicaInputStreams(InputStream dataStream, InputStream checksumStream, public ReplicaInputStreams(InputStream dataStream,
FsVolumeReference volumeRef) { InputStream checksumStream, FsVolumeReference volumeRef) {
this.volumeRef = volumeRef; this.volumeRef = volumeRef;
this.dataIn = dataStream; this.dataIn = dataStream;
this.checksumIn = checksumStream; this.checksumIn = checksumStream;
if (dataIn instanceof FileInputStream) {
try {
dataInFd = ((FileInputStream) dataIn).getFD();
} catch (Exception e) {
LOG.warn("Could not get file descriptor for inputstream of class " +
this.dataIn.getClass());
}
} else {
LOG.debug("Could not get file descriptor for inputstream of class " +
this.dataIn.getClass());
}
} }
/** @return the data input stream. */ /** @return the data input stream. */
@ -48,10 +69,81 @@ public InputStream getChecksumIn() {
return checksumIn; return checksumIn;
} }
public FileDescriptor getDataInFd() {
return dataInFd;
}
public FsVolumeReference getVolumeRef() {
return volumeRef;
}
public void readDataFully(byte[] buf, int off, int len)
throws IOException {
IOUtils.readFully(dataIn, buf, off, len);
}
public void readChecksumFully(byte[] buf, int off, int len)
throws IOException {
IOUtils.readFully(checksumIn, buf, off, len);
}
public void skipDataFully(long len) throws IOException {
IOUtils.skipFully(dataIn, len);
}
public void skipChecksumFully(long len) throws IOException {
IOUtils.skipFully(checksumIn, len);
}
public void closeChecksumStream() throws IOException {
IOUtils.closeStream(checksumIn);
checksumIn = null;
}
public void dropCacheBehindReads(String identifier, long offset, long len,
int flags) throws NativeIOException {
assert this.dataInFd != null : "null dataInFd!";
NativeIO.POSIX.getCacheManipulator().posixFadviseIfPossible(
identifier, dataInFd, offset, len, flags);
}
public void closeStreams() throws IOException {
IOException ioe = null;
if(checksumIn!=null) {
try {
checksumIn.close(); // close checksum file
} catch (IOException e) {
ioe = e;
}
checksumIn = null;
}
if(dataIn!=null) {
try {
dataIn.close(); // close data file
} catch (IOException e) {
ioe = e;
}
dataIn = null;
dataInFd = null;
}
if (volumeRef != null) {
IOUtils.cleanup(null, volumeRef);
volumeRef = null;
}
// throw IOException if there is any
if(ioe!= null) {
throw ioe;
}
}
@Override @Override
public void close() { public void close() {
IOUtils.closeStream(dataIn); IOUtils.closeStream(dataIn);
dataIn = null;
dataInFd = null;
IOUtils.closeStream(checksumIn); IOUtils.closeStream(checksumIn);
checksumIn = null;
IOUtils.cleanup(null, volumeRef); IOUtils.cleanup(null, volumeRef);
volumeRef = null;
} }
} }

View File

@ -18,32 +18,62 @@
package org.apache.hadoop.hdfs.server.datanode.fsdataset; package org.apache.hadoop.hdfs.server.datanode.fsdataset;
import java.io.Closeable; import java.io.Closeable;
import java.io.FileDescriptor;
import java.io.FileOutputStream; import java.io.FileOutputStream;
import java.io.OutputStream; import java.io.OutputStream;
import java.io.IOException; import java.io.IOException;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.nativeio.NativeIO;
import org.apache.hadoop.io.nativeio.NativeIOException;
import org.apache.hadoop.util.DataChecksum; import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.Time;
import org.slf4j.Logger;
/** /**
* Contains the output streams for the data and checksum of a replica. * Contains the output streams for the data and checksum of a replica.
*/ */
public class ReplicaOutputStreams implements Closeable { public class ReplicaOutputStreams implements Closeable {
private final OutputStream dataOut; public static final Logger LOG = DataNode.LOG;
private FileDescriptor outFd = null;
/** Stream to block. */
private OutputStream dataOut;
/** Stream to checksum. */
private final OutputStream checksumOut; private final OutputStream checksumOut;
private final DataChecksum checksum; private final DataChecksum checksum;
private final boolean isTransientStorage; private final boolean isTransientStorage;
private final long slowLogThresholdMs;
/** /**
* Create an object with a data output stream, a checksum output stream * Create an object with a data output stream, a checksum output stream
* and a checksum. * and a checksum.
*/ */
public ReplicaOutputStreams(OutputStream dataOut, OutputStream checksumOut, public ReplicaOutputStreams(OutputStream dataOut,
DataChecksum checksum, boolean isTransientStorage) { OutputStream checksumOut, DataChecksum checksum,
boolean isTransientStorage, long slowLogThresholdMs) {
this.dataOut = dataOut; this.dataOut = dataOut;
this.checksumOut = checksumOut;
this.checksum = checksum; this.checksum = checksum;
this.slowLogThresholdMs = slowLogThresholdMs;
this.isTransientStorage = isTransientStorage; this.isTransientStorage = isTransientStorage;
this.checksumOut = checksumOut;
try {
if (this.dataOut instanceof FileOutputStream) {
this.outFd = ((FileOutputStream)this.dataOut).getFD();
} else {
LOG.debug("Could not get file descriptor for outputstream of class " +
this.dataOut.getClass());
}
} catch (IOException e) {
LOG.warn("Could not get file descriptor for outputstream of class " +
this.dataOut.getClass());
}
}
public FileDescriptor getOutFd() {
return outFd;
} }
/** @return the data output stream. */ /** @return the data output stream. */
@ -72,12 +102,17 @@ public void close() {
IOUtils.closeStream(checksumOut); IOUtils.closeStream(checksumOut);
} }
public void closeDataStream() throws IOException {
dataOut.close();
dataOut = null;
}
/** /**
* Sync the data stream if it supports it. * Sync the data stream if it supports it.
*/ */
public void syncDataOut() throws IOException { public void syncDataOut() throws IOException {
if (dataOut instanceof FileOutputStream) { if (dataOut instanceof FileOutputStream) {
((FileOutputStream)dataOut).getChannel().force(true); sync((FileOutputStream)dataOut);
} }
} }
@ -86,8 +121,68 @@ public void syncDataOut() throws IOException {
*/ */
public void syncChecksumOut() throws IOException { public void syncChecksumOut() throws IOException {
if (checksumOut instanceof FileOutputStream) { if (checksumOut instanceof FileOutputStream) {
((FileOutputStream)checksumOut).getChannel().force(true); sync((FileOutputStream)checksumOut);
} }
} }
/**
* Flush the data stream if it supports it.
*/
public void flushDataOut() throws IOException {
flush(dataOut);
}
/**
* Flush the checksum stream if it supports it.
*/
public void flushChecksumOut() throws IOException {
flush(checksumOut);
}
private void flush(OutputStream dos) throws IOException {
long begin = Time.monotonicNow();
dos.flush();
long duration = Time.monotonicNow() - begin;
LOG.trace("ReplicaOutputStreams#flush takes {} ms.", duration);
if (duration > slowLogThresholdMs) {
LOG.warn("Slow flush took {} ms (threshold={} ms)", duration,
slowLogThresholdMs);
}
}
private void sync(FileOutputStream fos) throws IOException {
long begin = Time.monotonicNow();
fos.getChannel().force(true);
long duration = Time.monotonicNow() - begin;
LOG.trace("ReplicaOutputStreams#sync takes {} ms.", duration);
if (duration > slowLogThresholdMs) {
LOG.warn("Slow fsync took {} ms (threshold={} ms)", duration,
slowLogThresholdMs);
}
}
public long writeToDisk(byte[] b, int off, int len) throws IOException {
long begin = Time.monotonicNow();
dataOut.write(b, off, len);
long duration = Time.monotonicNow() - begin;
LOG.trace("DatanodeIO#writeToDisk takes {} ms.", duration);
if (duration > slowLogThresholdMs) {
LOG.warn("Slow BlockReceiver write data to disk cost: {} ms " +
"(threshold={} ms)", duration, slowLogThresholdMs);
}
return duration;
}
public void syncFileRangeIfPossible(long offset, long nbytes,
int flags) throws NativeIOException {
assert this.outFd != null : "null outFd!";
NativeIO.POSIX.syncFileRangeIfPossible(outFd, offset, nbytes, flags);
}
public void dropCacheBehindWrites(String identifier,
long offset, long len, int flags) throws NativeIOException {
assert this.outFd != null : "null outFd!";
NativeIO.POSIX.getCacheManipulator().posixFadviseIfPossible(
identifier, outFd, offset, len, flags);
}
} }

View File

@ -49,11 +49,13 @@
import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader; import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
import org.apache.hadoop.hdfs.server.datanode.DataStorage; import org.apache.hadoop.hdfs.server.datanode.DataStorage;
import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil; import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil;
import org.apache.hadoop.hdfs.server.datanode.LocalReplica;
import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo; import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
import org.apache.hadoop.hdfs.server.datanode.ReplicaBuilder; import org.apache.hadoop.hdfs.server.datanode.ReplicaBuilder;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.RamDiskReplicaTracker.RamDiskReplica; import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.RamDiskReplicaTracker.RamDiskReplica;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams;
import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.nativeio.NativeIO;
import org.apache.hadoop.util.AutoCloseableLock; import org.apache.hadoop.util.AutoCloseableLock;
import org.apache.hadoop.util.DataChecksum; import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.DiskChecker; import org.apache.hadoop.util.DiskChecker;
@ -145,7 +147,7 @@ class BlockPoolSlice {
// //
this.tmpDir = new File(bpDir, DataStorage.STORAGE_DIR_TMP); this.tmpDir = new File(bpDir, DataStorage.STORAGE_DIR_TMP);
if (tmpDir.exists()) { if (tmpDir.exists()) {
FileUtil.fullyDelete(tmpDir); DataStorage.fullyDelete(tmpDir);
} }
this.rbwDir = new File(currentDir, DataStorage.STORAGE_DIR_RBW); this.rbwDir = new File(currentDir, DataStorage.STORAGE_DIR_RBW);
if (!rbwDir.mkdirs()) { // create rbw directory if not exist if (!rbwDir.mkdirs()) { // create rbw directory if not exist
@ -436,7 +438,7 @@ private int moveLazyPersistReplicasToFinalized(File source)
final File targetMetaFile = new File(targetDir, metaFile.getName()); final File targetMetaFile = new File(targetDir, metaFile.getName());
try { try {
NativeIO.renameTo(metaFile, targetMetaFile); LocalReplica.rename(metaFile, targetMetaFile);
} catch (IOException e) { } catch (IOException e) {
LOG.warn("Failed to move meta file from " LOG.warn("Failed to move meta file from "
+ metaFile + " to " + targetMetaFile, e); + metaFile + " to " + targetMetaFile, e);
@ -446,7 +448,7 @@ private int moveLazyPersistReplicasToFinalized(File source)
final File targetBlockFile = new File(targetDir, blockFile.getName()); final File targetBlockFile = new File(targetDir, blockFile.getName());
try { try {
NativeIO.renameTo(blockFile, targetBlockFile); LocalReplica.rename(blockFile, targetBlockFile);
} catch (IOException e) { } catch (IOException e) {
LOG.warn("Failed to move block file from " LOG.warn("Failed to move block file from "
+ blockFile + " to " + targetBlockFile, e); + blockFile + " to " + targetBlockFile, e);
@ -688,8 +690,6 @@ private void deleteReplica(final ReplicaInfo replicaToDelete) {
* @return the number of valid bytes * @return the number of valid bytes
*/ */
private long validateIntegrityAndSetLength(File blockFile, long genStamp) { private long validateIntegrityAndSetLength(File blockFile, long genStamp) {
DataInputStream checksumIn = null;
InputStream blockIn = null;
try { try {
final File metaFile = FsDatasetUtil.getMetaFile(blockFile, genStamp); final File metaFile = FsDatasetUtil.getMetaFile(blockFile, genStamp);
long blockFileLen = blockFile.length(); long blockFileLen = blockFile.length();
@ -699,57 +699,52 @@ private long validateIntegrityAndSetLength(File blockFile, long genStamp) {
!metaFile.exists() || metaFileLen < crcHeaderLen) { !metaFile.exists() || metaFileLen < crcHeaderLen) {
return 0; return 0;
} }
checksumIn = new DataInputStream( try (DataInputStream checksumIn = new DataInputStream(
new BufferedInputStream(new FileInputStream(metaFile), new BufferedInputStream(new FileInputStream(metaFile),
ioFileBufferSize)); ioFileBufferSize))) {
// read and handle the common header here. For now just a version
// read and handle the common header here. For now just a version final DataChecksum checksum = BlockMetadataHeader.readDataChecksum(
final DataChecksum checksum = BlockMetadataHeader.readDataChecksum( checksumIn, metaFile);
checksumIn, metaFile); int bytesPerChecksum = checksum.getBytesPerChecksum();
int bytesPerChecksum = checksum.getBytesPerChecksum(); int checksumSize = checksum.getChecksumSize();
int checksumSize = checksum.getChecksumSize(); long numChunks = Math.min(
long numChunks = Math.min( (blockFileLen + bytesPerChecksum - 1) / bytesPerChecksum,
(blockFileLen + bytesPerChecksum - 1)/bytesPerChecksum, (metaFileLen - crcHeaderLen) / checksumSize);
(metaFileLen - crcHeaderLen)/checksumSize); if (numChunks == 0) {
if (numChunks == 0) { return 0;
return 0; }
} try (InputStream blockIn = new FileInputStream(blockFile);
IOUtils.skipFully(checksumIn, (numChunks-1)*checksumSize); ReplicaInputStreams ris = new ReplicaInputStreams(blockIn,
blockIn = new FileInputStream(blockFile); checksumIn, volume.obtainReference())) {
long lastChunkStartPos = (numChunks-1)*bytesPerChecksum; ris.skipChecksumFully((numChunks - 1) * checksumSize);
IOUtils.skipFully(blockIn, lastChunkStartPos); long lastChunkStartPos = (numChunks - 1) * bytesPerChecksum;
int lastChunkSize = (int)Math.min( ris.skipDataFully(lastChunkStartPos);
bytesPerChecksum, blockFileLen-lastChunkStartPos); int lastChunkSize = (int) Math.min(
byte[] buf = new byte[lastChunkSize+checksumSize]; bytesPerChecksum, blockFileLen - lastChunkStartPos);
checksumIn.readFully(buf, lastChunkSize, checksumSize); byte[] buf = new byte[lastChunkSize + checksumSize];
IOUtils.readFully(blockIn, buf, 0, lastChunkSize); ris.readChecksumFully(buf, lastChunkSize, checksumSize);
ris.readDataFully(buf, 0, lastChunkSize);
checksum.update(buf, 0, lastChunkSize); checksum.update(buf, 0, lastChunkSize);
long validFileLength; long validFileLength;
if (checksum.compare(buf, lastChunkSize)) { // last chunk matches crc if (checksum.compare(buf, lastChunkSize)) { // last chunk matches crc
validFileLength = lastChunkStartPos + lastChunkSize; validFileLength = lastChunkStartPos + lastChunkSize;
} else { // last chunck is corrupt } else { // last chunk is corrupt
validFileLength = lastChunkStartPos; validFileLength = lastChunkStartPos;
} }
// truncate if extra bytes are present without CRC
// truncate if extra bytes are present without CRC if (blockFile.length() > validFileLength) {
if (blockFile.length() > validFileLength) { try (RandomAccessFile blockRAF =
RandomAccessFile blockRAF = new RandomAccessFile(blockFile, "rw"); new RandomAccessFile(blockFile, "rw")) {
try { // truncate blockFile
// truncate blockFile blockRAF.setLength(validFileLength);
blockRAF.setLength(validFileLength); }
} finally { }
blockRAF.close(); return validFileLength;
} }
} }
return validFileLength;
} catch (IOException e) { } catch (IOException e) {
FsDatasetImpl.LOG.warn(e); FsDatasetImpl.LOG.warn(e);
return 0; return 0;
} finally {
IOUtils.closeStream(checksumIn);
IOUtils.closeStream(blockIn);
} }
} }

View File

@ -19,7 +19,6 @@
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl; package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
import java.io.File; import java.io.File;
import java.io.FileDescriptor;
import java.io.IOException; import java.io.IOException;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
@ -38,9 +37,9 @@
import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil; import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil;
import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo; import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
import org.apache.hadoop.hdfs.server.protocol.BlockCommand; import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.nativeio.NativeIO;
import org.apache.hadoop.io.nativeio.NativeIOException; import org.apache.hadoop.io.nativeio.NativeIOException;
/** /**
@ -202,13 +201,13 @@ synchronized void shutdown() {
} }
public void submitSyncFileRangeRequest(FsVolumeImpl volume, public void submitSyncFileRangeRequest(FsVolumeImpl volume,
final FileDescriptor fd, final long offset, final long nbytes, final ReplicaOutputStreams streams, final long offset, final long nbytes,
final int flags) { final int flags) {
execute(volume, new Runnable() { execute(volume, new Runnable() {
@Override @Override
public void run() { public void run() {
try { try {
NativeIO.POSIX.syncFileRangeIfPossible(fd, offset, nbytes, flags); streams.syncFileRangeIfPossible(offset, nbytes, flags);
} catch (NativeIOException e) { } catch (NativeIOException e) {
LOG.warn("sync_file_range error", e); LOG.warn("sync_file_range error", e);
} }

View File

@ -21,7 +21,6 @@
import java.io.DataOutputStream; import java.io.DataOutputStream;
import java.io.EOFException; import java.io.EOFException;
import java.io.File; import java.io.File;
import java.io.FileDescriptor;
import java.io.FileNotFoundException; import java.io.FileNotFoundException;
import java.io.FileOutputStream; import java.io.FileOutputStream;
import java.io.IOException; import java.io.IOException;
@ -2755,9 +2754,9 @@ public void onFailLazyPersist(String bpId, long blockId) {
@Override @Override
public void submitBackgroundSyncFileRangeRequest(ExtendedBlock block, public void submitBackgroundSyncFileRangeRequest(ExtendedBlock block,
FileDescriptor fd, long offset, long nbytes, int flags) { ReplicaOutputStreams outs, long offset, long nbytes, int flags) {
FsVolumeImpl fsVolumeImpl = this.getVolume(block); FsVolumeImpl fsVolumeImpl = this.getVolume(block);
asyncDiskService.submitSyncFileRangeRequest(fsVolumeImpl, fd, offset, asyncDiskService.submitSyncFileRangeRequest(fsVolumeImpl, outs, offset,
nbytes, flags); nbytes, flags);
} }

View File

@ -1067,7 +1067,7 @@ void deleteBPDirectories(String bpid, boolean force) throws IOException {
DataStorage.STORAGE_DIR_LAZY_PERSIST); DataStorage.STORAGE_DIR_LAZY_PERSIST);
File rbwDir = new File(bpCurrentDir, DataStorage.STORAGE_DIR_RBW); File rbwDir = new File(bpCurrentDir, DataStorage.STORAGE_DIR_RBW);
if (force) { if (force) {
FileUtil.fullyDelete(bpDir); DataStorage.fullyDelete(bpDir);
} else { } else {
if (!rbwDir.delete()) { if (!rbwDir.delete()) {
throw new IOException("Failed to delete " + rbwDir); throw new IOException("Failed to delete " + rbwDir);
@ -1081,7 +1081,7 @@ void deleteBPDirectories(String bpid, boolean force) throws IOException {
!FileUtil.fullyDelete(lazypersistDir)))) { !FileUtil.fullyDelete(lazypersistDir)))) {
throw new IOException("Failed to delete " + lazypersistDir); throw new IOException("Failed to delete " + lazypersistDir);
} }
FileUtil.fullyDelete(tmpDir); DataStorage.fullyDelete(tmpDir);
for (File f : FileUtil.listFiles(bpCurrentDir)) { for (File f : FileUtil.listFiles(bpCurrentDir)) {
if (!f.delete()) { if (!f.delete()) {
throw new IOException("Failed to delete " + f); throw new IOException("Failed to delete " + f);
@ -1437,4 +1437,3 @@ public ReplicaInfo activateSavedReplica(String bpid,
replicaState); replicaState);
} }
} }

View File

@ -701,7 +701,7 @@ public void testConcurrentAppendRead()
ReplicaBeingWritten rbw = ReplicaBeingWritten rbw =
(ReplicaBeingWritten)replicaHandler.getReplica(); (ReplicaBeingWritten)replicaHandler.getReplica();
ReplicaOutputStreams ReplicaOutputStreams
outputStreams = rbw.createStreams(false, DEFAULT_CHECKSUM); outputStreams = rbw.createStreams(false, DEFAULT_CHECKSUM, 300);
OutputStream dataOutput = outputStreams.getDataOut(); OutputStream dataOutput = outputStreams.getDataOut();
byte[] appendBytes = new byte[1]; byte[] appendBytes = new byte[1];

View File

@ -18,7 +18,6 @@
package org.apache.hadoop.hdfs.server.datanode; package org.apache.hadoop.hdfs.server.datanode;
import java.io.File; import java.io.File;
import java.io.FileDescriptor;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.io.OutputStream; import java.io.OutputStream;
@ -261,14 +260,15 @@ synchronized boolean isFinalized() {
@Override @Override
synchronized public ReplicaOutputStreams createStreams(boolean isCreate, synchronized public ReplicaOutputStreams createStreams(boolean isCreate,
DataChecksum requestedChecksum) throws IOException { DataChecksum requestedChecksum, long slowLogThresholdMs)
throws IOException {
if (finalized) { if (finalized) {
throw new IOException("Trying to write to a finalized replica " throw new IOException("Trying to write to a finalized replica "
+ theBlock); + theBlock);
} else { } else {
SimulatedOutputStream crcStream = new SimulatedOutputStream(); SimulatedOutputStream crcStream = new SimulatedOutputStream();
return new ReplicaOutputStreams(oStream, crcStream, requestedChecksum, return new ReplicaOutputStreams(oStream, crcStream, requestedChecksum,
volume.isTransientStorage()); volume.isTransientStorage(), slowLogThresholdMs);
} }
} }
@ -1364,7 +1364,7 @@ public synchronized void removeVolumes(Collection<StorageLocation> volumes,
@Override @Override
public void submitBackgroundSyncFileRangeRequest(ExtendedBlock block, public void submitBackgroundSyncFileRangeRequest(ExtendedBlock block,
FileDescriptor fd, long offset, long nbytes, int flags) { ReplicaOutputStreams outs, long offset, long nbytes, int flags) {
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
} }

View File

@ -673,7 +673,7 @@ public void testNotMatchedReplicaID() throws IOException {
ReplicaOutputStreams streams = null; ReplicaOutputStreams streams = null;
try { try {
streams = replicaInfo.createStreams(true, streams = replicaInfo.createStreams(true,
DataChecksum.newDataChecksum(DataChecksum.Type.CRC32, 512)); DataChecksum.newDataChecksum(DataChecksum.Type.CRC32, 512), 300);
streams.getChecksumOut().write('a'); streams.getChecksumOut().write('a');
dn.data.initReplicaRecovery(new RecoveringBlock(block, null, RECOVERY_ID+1)); dn.data.initReplicaRecovery(new RecoveringBlock(block, null, RECOVERY_ID+1));
BlockRecoveryWorker.RecoveryTaskContiguous RecoveryTaskContiguous = BlockRecoveryWorker.RecoveryTaskContiguous RecoveryTaskContiguous =

View File

@ -83,7 +83,7 @@ static int addSomeBlocks(SimulatedFSDataset fsdataset, long startingBlockId,
ReplicaInPipeline bInfo = fsdataset.createRbw( ReplicaInPipeline bInfo = fsdataset.createRbw(
StorageType.DEFAULT, b, false).getReplica(); StorageType.DEFAULT, b, false).getReplica();
ReplicaOutputStreams out = bInfo.createStreams(true, ReplicaOutputStreams out = bInfo.createStreams(true,
DataChecksum.newDataChecksum(DataChecksum.Type.CRC32, 512)); DataChecksum.newDataChecksum(DataChecksum.Type.CRC32, 512), 300);
try { try {
OutputStream dataOut = out.getDataOut(); OutputStream dataOut = out.getDataOut();
assertEquals(0, fsdataset.getLength(b)); assertEquals(0, fsdataset.getLength(b));

View File

@ -318,8 +318,8 @@ public void clearRollingUpgradeMarker(String bpid) throws IOException {
} }
@Override @Override
public void submitBackgroundSyncFileRangeRequest(ExtendedBlock block, FileDescriptor fd, long offset, long nbytes, int flags) { public void submitBackgroundSyncFileRangeRequest(ExtendedBlock block,
ReplicaOutputStreams outs, long offset, long nbytes, int flags) {
} }
@Override @Override

View File

@ -58,8 +58,10 @@ public ChunkChecksum getLastChecksumAndDataLen() {
@Override @Override
public ReplicaOutputStreams createStreams(boolean isCreate, public ReplicaOutputStreams createStreams(boolean isCreate,
DataChecksum requestedChecksum) throws IOException { DataChecksum requestedChecksum, long slowLogThresholdMs)
return new ReplicaOutputStreams(null, null, requestedChecksum, false); throws IOException {
return new ReplicaOutputStreams(null, null, requestedChecksum, false,
slowLogThresholdMs);
} }
@Override @Override