From df983b524ab68ea0c70cee9033bfff2d28052cbf Mon Sep 17 00:00:00 2001 From: Xiaoyu Yao Date: Mon, 5 Dec 2016 13:04:39 -0800 Subject: [PATCH] HDFS-10930. Refactor: Wrap Datanode IO related operations. Contributed by Xiaoyu Yao. --- .../hdfs/server/datanode/BlockReceiver.java | 66 +++---- .../hdfs/server/datanode/BlockSender.java | 105 ++++------ .../hadoop/hdfs/server/datanode/DNConf.java | 4 + .../hdfs/server/datanode/DataStorage.java | 5 + .../hdfs/server/datanode/LocalReplica.java | 179 ++++++++++++------ .../datanode/LocalReplicaInPipeline.java | 30 +-- .../server/datanode/ReplicaInPipeline.java | 4 +- .../datanode/fsdataset/FsDatasetSpi.java | 3 +- .../fsdataset/ReplicaInputStreams.java | 102 +++++++++- .../fsdataset/ReplicaOutputStreams.java | 107 ++++++++++- .../fsdataset/impl/BlockPoolSlice.java | 97 +++++----- .../impl/FsDatasetAsyncDiskService.java | 7 +- .../fsdataset/impl/FsDatasetImpl.java | 5 +- .../datanode/fsdataset/impl/FsVolumeImpl.java | 5 +- .../apache/hadoop/hdfs/TestFileAppend.java | 2 +- .../server/datanode/SimulatedFSDataset.java | 8 +- .../server/datanode/TestBlockRecovery.java | 2 +- .../datanode/TestSimulatedFSDataset.java | 2 +- .../extdataset/ExternalDatasetImpl.java | 4 +- .../extdataset/ExternalReplicaInPipeline.java | 6 +- 20 files changed, 470 insertions(+), 273 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java index 39419c1e013..f372072a74c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java @@ -24,10 +24,7 @@ import java.io.Closeable; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.EOFException; -import java.io.FileDescriptor; -import java.io.FileOutputStream; import java.io.IOException; -import java.io.OutputStream; import java.io.OutputStreamWriter; import java.io.Writer; import java.nio.ByteBuffer; @@ -53,7 +50,6 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; import org.apache.hadoop.hdfs.util.DataTransferThrottler; import org.apache.hadoop.io.IOUtils; -import org.apache.hadoop.io.nativeio.NativeIO; import org.apache.hadoop.util.Daemon; import org.apache.hadoop.util.DataChecksum; import org.apache.hadoop.util.StringUtils; @@ -88,8 +84,6 @@ class BlockReceiver implements Closeable { * the DataNode needs to recalculate checksums before writing. */ private final boolean needsChecksumTranslation; - private OutputStream out = null; // to block file at local disk - private FileDescriptor outFd; private DataOutputStream checksumOut = null; // to crc file at local disk private final int bytesPerChecksum; private final int checksumSize; @@ -250,7 +244,8 @@ class BlockReceiver implements Closeable { final boolean isCreate = isDatanode || isTransfer || stage == BlockConstructionStage.PIPELINE_SETUP_CREATE; - streams = replicaInfo.createStreams(isCreate, requestedChecksum); + streams = replicaInfo.createStreams(isCreate, requestedChecksum, + datanodeSlowLogThresholdMs); assert streams != null : "null streams!"; // read checksum meta information @@ -260,13 +255,6 @@ class BlockReceiver implements Closeable { this.bytesPerChecksum = diskChecksum.getBytesPerChecksum(); this.checksumSize = diskChecksum.getChecksumSize(); - this.out = streams.getDataOut(); - if (out instanceof FileOutputStream) { - this.outFd = ((FileOutputStream)out).getFD(); - } else { - LOG.warn("Could not get file descriptor for outputstream of class " + - out.getClass()); - } this.checksumOut = new DataOutputStream(new BufferedOutputStream( streams.getChecksumOut(), DFSUtilClient.getSmallBufferSize( datanode.getConf()))); @@ -319,7 +307,7 @@ class BlockReceiver implements Closeable { packetReceiver.close(); IOException ioe = null; - if (syncOnClose && (out != null || checksumOut != null)) { + if (syncOnClose && (streams.getDataOut() != null || checksumOut != null)) { datanode.metrics.incrFsyncCount(); } long flushTotalNanos = 0; @@ -348,9 +336,9 @@ class BlockReceiver implements Closeable { } // close block file try { - if (out != null) { + if (streams.getDataOut() != null) { long flushStartNanos = System.nanoTime(); - out.flush(); + streams.flushDataOut(); long flushEndNanos = System.nanoTime(); if (syncOnClose) { long fsyncStartNanos = flushEndNanos; @@ -359,14 +347,13 @@ class BlockReceiver implements Closeable { } flushTotalNanos += flushEndNanos - flushStartNanos; measuredFlushTime = true; - out.close(); - out = null; + streams.closeDataStream(); } } catch (IOException e) { ioe = e; } finally{ - IOUtils.closeStream(out); + streams.close(); } if (replicaHandler != null) { IOUtils.cleanup(null, replicaHandler); @@ -419,9 +406,9 @@ class BlockReceiver implements Closeable { } flushTotalNanos += flushEndNanos - flushStartNanos; } - if (out != null) { + if (streams.getDataOut() != null) { long flushStartNanos = System.nanoTime(); - out.flush(); + streams.flushDataOut(); long flushEndNanos = System.nanoTime(); if (isSync) { long fsyncStartNanos = flushEndNanos; @@ -430,10 +417,10 @@ class BlockReceiver implements Closeable { } flushTotalNanos += flushEndNanos - flushStartNanos; } - if (checksumOut != null || out != null) { + if (checksumOut != null || streams.getDataOut() != null) { datanode.metrics.addFlushNanos(flushTotalNanos); if (isSync) { - datanode.metrics.incrFsyncCount(); + datanode.metrics.incrFsyncCount(); } } long duration = Time.monotonicNow() - begin; @@ -716,16 +703,12 @@ class BlockReceiver implements Closeable { int numBytesToDisk = (int)(offsetInBlock-onDiskLen); // Write data to disk. - long begin = Time.monotonicNow(); - out.write(dataBuf.array(), startByteToDisk, numBytesToDisk); - long duration = Time.monotonicNow() - begin; + long duration = streams.writeToDisk(dataBuf.array(), + startByteToDisk, numBytesToDisk); + if (duration > maxWriteToDiskMs) { maxWriteToDiskMs = duration; } - if (duration > datanodeSlowLogThresholdMs) { - LOG.warn("Slow BlockReceiver write data to disk cost:" + duration - + "ms (threshold=" + datanodeSlowLogThresholdMs + "ms)"); - } final byte[] lastCrc; if (shouldNotWriteChecksum) { @@ -842,7 +825,7 @@ class BlockReceiver implements Closeable { private void manageWriterOsCache(long offsetInBlock) { try { - if (outFd != null && + if (streams.getOutFd() != null && offsetInBlock > lastCacheManagementOffset + CACHE_DROP_LAG_BYTES) { long begin = Time.monotonicNow(); // @@ -857,12 +840,11 @@ class BlockReceiver implements Closeable { if (syncBehindWrites) { if (syncBehindWritesInBackground) { this.datanode.getFSDataset().submitBackgroundSyncFileRangeRequest( - block, outFd, lastCacheManagementOffset, + block, streams, lastCacheManagementOffset, offsetInBlock - lastCacheManagementOffset, SYNC_FILE_RANGE_WRITE); } else { - NativeIO.POSIX.syncFileRangeIfPossible(outFd, - lastCacheManagementOffset, + streams.syncFileRangeIfPossible(lastCacheManagementOffset, offsetInBlock - lastCacheManagementOffset, SYNC_FILE_RANGE_WRITE); } @@ -879,8 +861,8 @@ class BlockReceiver implements Closeable { // long dropPos = lastCacheManagementOffset - CACHE_DROP_LAG_BYTES; if (dropPos > 0 && dropCacheBehindWrites) { - NativeIO.POSIX.getCacheManipulator().posixFadviseIfPossible( - block.getBlockName(), outFd, 0, dropPos, POSIX_FADV_DONTNEED); + streams.dropCacheBehindWrites(block.getBlockName(), 0, dropPos, + POSIX_FADV_DONTNEED); } lastCacheManagementOffset = offsetInBlock; long duration = Time.monotonicNow() - begin; @@ -989,7 +971,7 @@ class BlockReceiver implements Closeable { // The worst case is not recovering this RBW replica. // Client will fall back to regular pipeline recovery. } finally { - IOUtils.closeStream(out); + IOUtils.closeStream(streams.getDataOut()); } try { // Even if the connection is closed after the ack packet is @@ -1047,8 +1029,8 @@ class BlockReceiver implements Closeable { * will be overwritten. */ private void adjustCrcFilePosition() throws IOException { - if (out != null) { - out.flush(); + if (streams.getDataOut() != null) { + streams.flushDataOut(); } if (checksumOut != null) { checksumOut.flush(); @@ -1094,10 +1076,10 @@ class BlockReceiver implements Closeable { byte[] crcbuf = new byte[checksumSize]; try (ReplicaInputStreams instr = datanode.data.getTmpInputStreams(block, blkoff, ckoff)) { - IOUtils.readFully(instr.getDataIn(), buf, 0, sizePartialChunk); + instr.readDataFully(buf, 0, sizePartialChunk); // open meta file and read in crc value computer earlier - IOUtils.readFully(instr.getChecksumIn(), crcbuf, 0, crcbuf.length); + instr.readChecksumFully(crcbuf, 0, crcbuf.length); } // compute crc of partial chunk from data read in the block file. diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java index a1b1f86dbfa..9182c88ce2b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java @@ -20,7 +20,6 @@ package org.apache.hadoop.hdfs.server.datanode; import java.io.BufferedInputStream; import java.io.DataInputStream; import java.io.DataOutputStream; -import java.io.FileDescriptor; import java.io.FileInputStream; import java.io.FileNotFoundException; import java.io.IOException; @@ -42,11 +41,11 @@ import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference; import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams; import org.apache.hadoop.hdfs.util.DataTransferThrottler; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.ReadaheadPool.ReadaheadRequest; -import org.apache.hadoop.io.nativeio.NativeIO; import org.apache.hadoop.net.SocketOutputStream; import org.apache.hadoop.util.AutoCloseableLock; import org.apache.hadoop.util.DataChecksum; @@ -120,12 +119,11 @@ class BlockSender implements java.io.Closeable { /** the block to read from */ private final ExtendedBlock block; - /** Stream to read block data from */ - private InputStream blockIn; + + /** InputStreams and file descriptors to read block/checksum. */ + private ReplicaInputStreams ris; /** updated while using transferTo() */ private long blockInPosition = -1; - /** Stream to read checksum */ - private DataInputStream checksumIn; /** Checksum utility */ private final DataChecksum checksum; /** Initial position to read */ @@ -152,11 +150,6 @@ class BlockSender implements java.io.Closeable { private final String clientTraceFmt; private volatile ChunkChecksum lastChunkChecksum = null; private DataNode datanode; - - /** The file descriptor of the block being sent */ - private FileDescriptor blockInFd; - /** The reference to the volume where the block is located */ - private FsVolumeReference volumeRef; /** The replica of the block that is being read. */ private final Replica replica; @@ -201,6 +194,9 @@ class BlockSender implements java.io.Closeable { boolean sendChecksum, DataNode datanode, String clientTraceFmt, CachingStrategy cachingStrategy) throws IOException { + InputStream blockIn = null; + DataInputStream checksumIn = null; + FsVolumeReference volumeRef = null; try { this.block = block; this.corruptChecksumOk = corruptChecksumOk; @@ -281,7 +277,7 @@ class BlockSender implements java.io.Closeable { (!is32Bit || length <= Integer.MAX_VALUE); // Obtain a reference before reading data - this.volumeRef = datanode.data.getVolume(block).obtainReference(); + volumeRef = datanode.data.getVolume(block).obtainReference(); /* * (corruptChecksumOK, meta_file_exist): operation @@ -405,14 +401,9 @@ class BlockSender implements java.io.Closeable { DataNode.LOG.debug("replica=" + replica); } blockIn = datanode.data.getBlockInputStream(block, offset); // seek to offset - if (blockIn instanceof FileInputStream) { - blockInFd = ((FileInputStream)blockIn).getFD(); - } else { - blockInFd = null; - } + ris = new ReplicaInputStreams(blockIn, checksumIn, volumeRef); } catch (IOException ioe) { IOUtils.closeStream(this); - IOUtils.closeStream(blockIn); throw ioe; } } @@ -422,12 +413,11 @@ class BlockSender implements java.io.Closeable { */ @Override public void close() throws IOException { - if (blockInFd != null && + if (ris.getDataInFd() != null && ((dropCacheBehindAllReads) || (dropCacheBehindLargeReads && isLongRead()))) { try { - NativeIO.POSIX.getCacheManipulator().posixFadviseIfPossible( - block.getBlockName(), blockInFd, lastCacheDropOffset, + ris.dropCacheBehindReads(block.getBlockName(), lastCacheDropOffset, offset - lastCacheDropOffset, POSIX_FADV_DONTNEED); } catch (Exception e) { LOG.warn("Unable to drop cache on file close", e); @@ -436,32 +426,12 @@ class BlockSender implements java.io.Closeable { if (curReadahead != null) { curReadahead.cancel(); } - - IOException ioe = null; - if(checksumIn!=null) { - try { - checksumIn.close(); // close checksum file - } catch (IOException e) { - ioe = e; - } - checksumIn = null; - } - if(blockIn!=null) { - try { - blockIn.close(); // close data file - } catch (IOException e) { - ioe = e; - } - blockIn = null; - blockInFd = null; - } - if (volumeRef != null) { - IOUtils.cleanup(null, volumeRef); - volumeRef = null; - } - // throw IOException if there is any - if(ioe!= null) { - throw ioe; + + try { + ris.closeStreams(); + } finally { + IOUtils.closeStream(ris); + ris = null; } } @@ -565,7 +535,7 @@ class BlockSender implements java.io.Closeable { int checksumOff = pkt.position(); byte[] buf = pkt.array(); - if (checksumSize > 0 && checksumIn != null) { + if (checksumSize > 0 && ris.getChecksumIn() != null) { readChecksum(buf, checksumOff, checksumDataLen); // write in progress that we need to use to get last checksum @@ -581,7 +551,7 @@ class BlockSender implements java.io.Closeable { int dataOff = checksumOff + checksumDataLen; if (!transferTo) { // normal transfer - IOUtils.readFully(blockIn, buf, dataOff, dataLen); + ris.readDataFully(buf, dataOff, dataLen); if (verifyChecksum) { verifyChecksum(buf, dataOff, dataLen, numChunks, checksumOff); @@ -593,12 +563,12 @@ class BlockSender implements java.io.Closeable { SocketOutputStream sockOut = (SocketOutputStream)out; // First write header and checksums sockOut.write(buf, headerOff, dataOff - headerOff); - + // no need to flush since we know out is not a buffered stream - FileChannel fileCh = ((FileInputStream)blockIn).getChannel(); + FileChannel fileCh = ((FileInputStream)ris.getDataIn()).getChannel(); LongWritable waitTime = new LongWritable(); LongWritable transferTime = new LongWritable(); - sockOut.transferToFully(fileCh, blockInPosition, dataLen, + sockOut.transferToFully(fileCh, blockInPosition, dataLen, waitTime, transferTime); datanode.metrics.addSendDataPacketBlockedOnNetworkNanos(waitTime.get()); datanode.metrics.addSendDataPacketTransferNanos(transferTime.get()); @@ -630,7 +600,7 @@ class BlockSender implements java.io.Closeable { if (!ioem.startsWith("Broken pipe") && !ioem.startsWith("Connection reset")) { LOG.error("BlockSender.sendChunks() exception: ", e); datanode.getBlockScanner().markSuspectBlock( - volumeRef.getVolume().getStorageID(), + ris.getVolumeRef().getVolume().getStorageID(), block); } } @@ -653,16 +623,15 @@ class BlockSender implements java.io.Closeable { */ private void readChecksum(byte[] buf, final int checksumOffset, final int checksumLen) throws IOException { - if (checksumSize <= 0 && checksumIn == null) { + if (checksumSize <= 0 && ris.getChecksumIn() == null) { return; } try { - checksumIn.readFully(buf, checksumOffset, checksumLen); + ris.readChecksumFully(buf, checksumOffset, checksumLen); } catch (IOException e) { LOG.warn(" Could not read or failed to verify checksum for data" + " at offset " + offset + " for block " + block, e); - IOUtils.closeStream(checksumIn); - checksumIn = null; + ris.closeChecksumStream(); if (corruptChecksumOk) { if (checksumOffset < checksumLen) { // Just fill the array with zeros. @@ -746,10 +715,10 @@ class BlockSender implements java.io.Closeable { lastCacheDropOffset = initialOffset; - if (isLongRead() && blockInFd != null) { + if (isLongRead() && ris.getDataInFd() != null) { // Advise that this file descriptor will be accessed sequentially. - NativeIO.POSIX.getCacheManipulator().posixFadviseIfPossible( - block.getBlockName(), blockInFd, 0, 0, POSIX_FADV_SEQUENTIAL); + ris.dropCacheBehindReads(block.getBlockName(), 0, 0, + POSIX_FADV_SEQUENTIAL); } // Trigger readahead of beginning of file if configured. @@ -761,9 +730,10 @@ class BlockSender implements java.io.Closeable { int pktBufSize = PacketHeader.PKT_MAX_HEADER_LEN; boolean transferTo = transferToAllowed && !verifyChecksum && baseStream instanceof SocketOutputStream - && blockIn instanceof FileInputStream; + && ris.getDataIn() instanceof FileInputStream; if (transferTo) { - FileChannel fileChannel = ((FileInputStream)blockIn).getChannel(); + FileChannel fileChannel = + ((FileInputStream)ris.getDataIn()).getChannel(); blockInPosition = fileChannel.position(); streamForSendChunks = baseStream; maxChunksPerPacket = numberOfChunks(TRANSFERTO_BUFFER_SIZE); @@ -818,14 +788,16 @@ class BlockSender implements java.io.Closeable { private void manageOsCache() throws IOException { // We can't manage the cache for this block if we don't have a file // descriptor to work with. - if (blockInFd == null) return; + if (ris.getDataInFd() == null) { + return; + } // Perform readahead if necessary if ((readaheadLength > 0) && (datanode.readaheadPool != null) && (alwaysReadahead || isLongRead())) { curReadahead = datanode.readaheadPool.readaheadStream( - clientTraceFmt, blockInFd, offset, readaheadLength, Long.MAX_VALUE, - curReadahead); + clientTraceFmt, ris.getDataInFd(), offset, readaheadLength, + Long.MAX_VALUE, curReadahead); } // Drop what we've just read from cache, since we aren't @@ -835,8 +807,7 @@ class BlockSender implements java.io.Closeable { long nextCacheDropOffset = lastCacheDropOffset + CACHE_DROP_INTERVAL_BYTES; if (offset >= nextCacheDropOffset) { long dropLength = offset - lastCacheDropOffset; - NativeIO.POSIX.getCacheManipulator().posixFadviseIfPossible( - block.getBlockName(), blockInFd, lastCacheDropOffset, + ris.dropCacheBehindReads(block.getBlockName(), lastCacheDropOffset, dropLength, POSIX_FADV_DONTNEED); lastCacheDropOffset = offset; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java index 823d05c425d..c1487b1a4a3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java @@ -402,6 +402,10 @@ public class DNConf { return volsConfigured; } + public long getSlowIoWarningThresholdMs() { + return datanodeSlowIoWarningThresholdMs; + } + int getMaxDataLength() { return maxDataLength; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java index 29b14e731ee..f4deb6df350 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java @@ -1355,4 +1355,9 @@ public class DataStorage extends Storage { synchronized void removeBlockPoolStorage(String bpId) { bpStorageMap.remove(bpId); } + + public static boolean fullyDelete(final File dir) { + boolean result = FileUtil.fullyDelete(dir); + return result; + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/LocalReplica.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/LocalReplica.java index f8291112ec7..e6f7e12d924 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/LocalReplica.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/LocalReplica.java @@ -29,9 +29,6 @@ import java.net.URI; import java.util.HashMap; import java.util.Map; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.HardLink; import org.apache.hadoop.fs.LocalFileSystem; @@ -46,6 +43,8 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetUtil; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.nativeio.NativeIO; import org.apache.hadoop.util.DataChecksum; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.google.common.annotations.VisibleForTesting; @@ -69,15 +68,7 @@ abstract public class LocalReplica extends ReplicaInfo { private static final Map internedBaseDirs = new HashMap(); - static final Log LOG = LogFactory.getLog(LocalReplica.class); - private final static boolean IS_NATIVE_IO_AVAIL; - static { - IS_NATIVE_IO_AVAIL = NativeIO.isAvailable(); - if (Path.WINDOWS && !IS_NATIVE_IO_AVAIL) { - LOG.warn("Data node cannot fully support concurrent reading" - + " and writing without native code extensions on Windows."); - } - } + static final Logger LOG = LoggerFactory.getLogger(LocalReplica.class); /** * Constructor @@ -199,14 +190,14 @@ abstract public class LocalReplica extends ReplicaInfo { File tmpFile = DatanodeUtil.createTmpFile(b, DatanodeUtil.getUnlinkTmpFile(file)); try (FileInputStream in = new FileInputStream(file)) { try (FileOutputStream out = new FileOutputStream(tmpFile)){ - IOUtils.copyBytes(in, out, 16 * 1024); + copyBytes(in, out, 16 * 1024); } if (file.length() != tmpFile.length()) { throw new IOException("Copy of file " + file + " size " + file.length()+ " into file " + tmpFile + " resulted in a size of " + tmpFile.length()); } - FileUtil.replaceFile(tmpFile, file); + replaceFile(tmpFile, file); } catch (IOException e) { boolean done = tmpFile.delete(); if (!done) { @@ -241,13 +232,13 @@ abstract public class LocalReplica extends ReplicaInfo { } File meta = getMetaFile(); - int linkCount = HardLink.getLinkCount(file); + int linkCount = getHardLinkCount(file); if (linkCount > 1) { DataNode.LOG.info("Breaking hardlink for " + linkCount + "x-linked " + "block " + this); breakHardlinks(file, this); } - if (HardLink.getLinkCount(meta) > 1) { + if (getHardLinkCount(meta) > 1) { breakHardlinks(meta, this); } return true; @@ -260,18 +251,7 @@ abstract public class LocalReplica extends ReplicaInfo { @Override public InputStream getDataInputStream(long seekOffset) throws IOException { - - File blockFile = getBlockFile(); - if (IS_NATIVE_IO_AVAIL) { - return NativeIO.getShareDeleteFileInputStream(blockFile, seekOffset); - } else { - try { - return FsDatasetUtil.openAndSeek(blockFile, seekOffset); - } catch (FileNotFoundException fnfe) { - throw new IOException("Block " + this + " is not valid. " + - "Expected block file at " + blockFile + " does not exist."); - } - } + return getDataInputStream(getBlockFile(), seekOffset); } @Override @@ -286,7 +266,7 @@ abstract public class LocalReplica extends ReplicaInfo { @Override public boolean deleteBlockData() { - return getBlockFile().delete(); + return fullyDelete(getBlockFile()); } @Override @@ -320,7 +300,7 @@ abstract public class LocalReplica extends ReplicaInfo { @Override public boolean deleteMetadata() { - return getMetaFile().delete(); + return fullyDelete(getMetaFile()); } @Override @@ -340,7 +320,7 @@ abstract public class LocalReplica extends ReplicaInfo { private boolean renameFile(File srcfile, File destfile) throws IOException { try { - NativeIO.renameTo(srcfile, destfile); + rename(srcfile, destfile); return true; } catch (IOException e) { throw new IOException("Failed to move block file for " + this @@ -367,22 +347,14 @@ abstract public class LocalReplica extends ReplicaInfo { @Override public boolean getPinning(LocalFileSystem localFS) throws IOException { - FileStatus fss = - localFS.getFileStatus(new Path(getBlockFile().getAbsolutePath())); - return fss.getPermission().getStickyBit(); + return getPinning(localFS, new Path(getBlockFile().getAbsolutePath())); } @Override public void setPinning(LocalFileSystem localFS) throws IOException { File f = getBlockFile(); Path p = new Path(f.getAbsolutePath()); - - FsPermission oldPermission = localFS.getFileStatus( - new Path(f.getAbsolutePath())).getPermission(); - //sticky bit is used for pinning purpose - FsPermission permission = new FsPermission(oldPermission.getUserAction(), - oldPermission.getGroupAction(), oldPermission.getOtherAction(), true); - localFS.setPermission(p, permission); + setPinning(localFS, p); } @Override @@ -398,7 +370,7 @@ abstract public class LocalReplica extends ReplicaInfo { } try { // calling renameMeta on the ReplicaInfo doesn't work here - NativeIO.renameTo(oldmeta, newmeta); + rename(oldmeta, newmeta); } catch (IOException e) { setGenerationStamp(oldGS); // restore old GS throw new IOException("Block " + this + " reopen failed. " + @@ -417,7 +389,113 @@ abstract public class LocalReplica extends ReplicaInfo { return info.getBlockFile().compareTo(getBlockFile()); } - static public void truncateBlock(File blockFile, File metaFile, + @Override + public void copyMetadata(URI destination) throws IOException { + //for local replicas, we assume the destination URI is file + nativeCopyFileUnbuffered(getMetaFile(), new File(destination), true); + } + + @Override + public void copyBlockdata(URI destination) throws IOException { + //for local replicas, we assume the destination URI is file + nativeCopyFileUnbuffered(getBlockFile(), new File(destination), true); + } + + public void renameMeta(File newMetaFile) throws IOException { + if (LOG.isDebugEnabled()) { + LOG.debug("Renaming " + getMetaFile() + " to " + newMetaFile); + } + renameFile(getMetaFile(), newMetaFile); + } + + public void renameBlock(File newBlockFile) throws IOException { + if (LOG.isDebugEnabled()) { + LOG.debug("Renaming " + getBlockFile() + " to " + newBlockFile + + ", file length=" + getBlockFile().length()); + } + renameFile(getBlockFile(), newBlockFile); + } + + public static void rename(File from, File to) throws IOException { + Storage.rename(from, to); + } + + /** + * Get input stream for a local file and optionally seek to the offset. + * @param f path to the file + * @param seekOffset offset to seek + * @return input stream for read + * @throws IOException + */ + private FileInputStream getDataInputStream(File f, long seekOffset) + throws IOException { + FileInputStream fis; + if (NativeIO.isAvailable()) { + fis = NativeIO.getShareDeleteFileInputStream(f, seekOffset); + } else { + try { + fis = FsDatasetUtil.openAndSeek(f, seekOffset); + } catch (FileNotFoundException fnfe) { + throw new IOException("Expected block file at " + f + + " does not exist."); + } + } + return fis; + } + + private void nativeCopyFileUnbuffered(File srcFile, File destFile, + boolean preserveFileDate) throws IOException { + Storage.nativeCopyFileUnbuffered(srcFile, destFile, preserveFileDate); + } + + private void copyBytes(InputStream in, OutputStream out, int + buffSize) throws IOException{ + IOUtils.copyBytes(in, out, buffSize); + } + + private void replaceFile(File src, File target) throws IOException { + FileUtil.replaceFile(src, target); + } + + public static boolean fullyDelete(final File dir) { + boolean result = DataStorage.fullyDelete(dir); + return result; + } + + public static int getHardLinkCount(File fileName) throws IOException { + int linkCount = HardLink.getLinkCount(fileName); + return linkCount; + } + + /** + * Get pin status of a file by checking the sticky bit. + * @param localFS local file system + * @param path path to be checked + * @return true if the file is pinned with sticky bit + * @throws IOException + */ + public boolean getPinning(LocalFileSystem localFS, Path path) throws + IOException { + boolean stickyBit = + localFS.getFileStatus(path).getPermission().getStickyBit(); + return stickyBit; + } + + /** + * Set sticky bit on path to pin file. + * @param localFS local file system + * @param path path to be pinned with sticky bit + * @throws IOException + */ + public void setPinning(LocalFileSystem localFS, Path path) throws + IOException { + FsPermission oldPermission = localFS.getFileStatus(path).getPermission(); + FsPermission permission = new FsPermission(oldPermission.getUserAction(), + oldPermission.getGroupAction(), oldPermission.getOtherAction(), true); + localFS.setPermission(path, permission); + } + + public static void truncateBlock(File blockFile, File metaFile, long oldlen, long newlen) throws IOException { LOG.info("truncateBlock: blockFile=" + blockFile + ", metaFile=" + metaFile @@ -467,19 +545,4 @@ abstract public class LocalReplica extends ReplicaInfo { metaRAF.close(); } } - - @Override - public void copyMetadata(URI destination) throws IOException { - //for local replicas, we assume the destination URI is file - Storage.nativeCopyFileUnbuffered(getMetaFile(), - new File(destination), true); - } - - @Override - public void copyBlockdata(URI destination) throws IOException { - //for local replicas, we assume the destination URI is file - Storage.nativeCopyFileUnbuffered(getBlockFile(), - new File(destination), true); - } - } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/LocalReplicaInPipeline.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/LocalReplicaInPipeline.java index bc7bc6dde38..1387155f17e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/LocalReplicaInPipeline.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/LocalReplicaInPipeline.java @@ -30,7 +30,6 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams; import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo; import org.apache.hadoop.io.IOUtils; -import org.apache.hadoop.io.nativeio.NativeIO; import org.apache.hadoop.util.DataChecksum; import org.apache.hadoop.util.StringUtils; @@ -246,7 +245,8 @@ public class LocalReplicaInPipeline extends LocalReplica @Override // ReplicaInPipeline public ReplicaOutputStreams createStreams(boolean isCreate, - DataChecksum requestedChecksum) throws IOException { + DataChecksum requestedChecksum, long slowLogThresholdMs) + throws IOException { File blockFile = getBlockFile(); File metaFile = getMetaFile(); if (DataNode.LOG.isDebugEnabled()) { @@ -313,7 +313,7 @@ public class LocalReplicaInPipeline extends LocalReplica crcOut.getChannel().position(crcDiskSize); } return new ReplicaOutputStreams(blockOut, crcOut, checksum, - getVolume().isTransientStorage()); + getVolume().isTransientStorage(), slowLogThresholdMs); } catch (IOException e) { IOUtils.closeStream(blockOut); IOUtils.closeStream(metaRAF); @@ -373,40 +373,30 @@ public class LocalReplicaInPipeline extends LocalReplica + " should be derived from LocalReplica"); } - LocalReplica localReplica = (LocalReplica) oldReplicaInfo; - - File oldmeta = localReplica.getMetaFile(); + LocalReplica oldReplica = (LocalReplica) oldReplicaInfo; + File oldmeta = oldReplica.getMetaFile(); File newmeta = getMetaFile(); - if (LOG.isDebugEnabled()) { - LOG.debug("Renaming " + oldmeta + " to " + newmeta); - } try { - NativeIO.renameTo(oldmeta, newmeta); + oldReplica.renameMeta(newmeta); } catch (IOException e) { throw new IOException("Block " + oldReplicaInfo + " reopen failed. " + " Unable to move meta file " + oldmeta + " to rbw dir " + newmeta, e); } - File blkfile = localReplica.getBlockFile(); - - if (LOG.isDebugEnabled()) { - LOG.debug("Renaming " + blkfile + " to " + newBlkFile - + ", file length=" + blkfile.length()); - } try { - NativeIO.renameTo(blkfile, newBlkFile); + oldReplica.renameBlock(newBlkFile); } catch (IOException e) { try { - NativeIO.renameTo(newmeta, oldmeta); + renameMeta(oldmeta); } catch (IOException ex) { LOG.warn("Cannot move meta file " + newmeta + "back to the finalized directory " + oldmeta, ex); } throw new IOException("Block " + oldReplicaInfo + " reopen failed. " + - " Unable to move block file " + blkfile + - " to rbw dir " + newBlkFile, e); + " Unable to move block file " + oldReplica.getBlockFile() + + " to rbw dir " + newBlkFile, e); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java index efa6ea686f4..5fdbec02f1a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java @@ -69,11 +69,13 @@ public interface ReplicaInPipeline extends Replica { * * @param isCreate if it is for creation * @param requestedChecksum the checksum the writer would prefer to use + * @param slowLogThresholdMs slow io threshold for logging * @return output streams for writing * @throws IOException if any error occurs */ public ReplicaOutputStreams createStreams(boolean isCreate, - DataChecksum requestedChecksum) throws IOException; + DataChecksum requestedChecksum, long slowLogThresholdMs) + throws IOException; /** * Create an output stream to write restart metadata in case of datanode diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java index 57ec2b4bf8c..30f045f2e1b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java @@ -21,7 +21,6 @@ package org.apache.hadoop.hdfs.server.datanode.fsdataset; import java.io.Closeable; import java.io.EOFException; import java.io.File; -import java.io.FileDescriptor; import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStream; @@ -605,7 +604,7 @@ public interface FsDatasetSpi extends FSDatasetMBean { * submit a sync_file_range request to AsyncDiskService. */ void submitBackgroundSyncFileRangeRequest(final ExtendedBlock block, - final FileDescriptor fd, final long offset, final long nbytes, + final ReplicaOutputStreams outs, final long offset, final long nbytes, final int flags); /** diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/ReplicaInputStreams.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/ReplicaInputStreams.java index 227179d4f84..54d0e964b5c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/ReplicaInputStreams.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/ReplicaInputStreams.java @@ -18,24 +18,45 @@ package org.apache.hadoop.hdfs.server.datanode.fsdataset; import java.io.Closeable; +import java.io.FileDescriptor; +import java.io.FileInputStream; import java.io.InputStream; +import java.io.IOException; +import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.io.nativeio.NativeIO; +import org.apache.hadoop.io.nativeio.NativeIOException; +import org.slf4j.Logger; /** * Contains the input streams for the data and checksum of a replica. */ public class ReplicaInputStreams implements Closeable { - private final InputStream dataIn; - private final InputStream checksumIn; - private final FsVolumeReference volumeRef; + public static final Logger LOG = DataNode.LOG; + + private InputStream dataIn; + private InputStream checksumIn; + private FsVolumeReference volumeRef; + private FileDescriptor dataInFd = null; /** Create an object with a data input stream and a checksum input stream. */ - public ReplicaInputStreams(InputStream dataStream, InputStream checksumStream, - FsVolumeReference volumeRef) { + public ReplicaInputStreams(InputStream dataStream, + InputStream checksumStream, FsVolumeReference volumeRef) { this.volumeRef = volumeRef; this.dataIn = dataStream; this.checksumIn = checksumStream; + if (dataIn instanceof FileInputStream) { + try { + dataInFd = ((FileInputStream) dataIn).getFD(); + } catch (Exception e) { + LOG.warn("Could not get file descriptor for inputstream of class " + + this.dataIn.getClass()); + } + } else { + LOG.debug("Could not get file descriptor for inputstream of class " + + this.dataIn.getClass()); + } } /** @return the data input stream. */ @@ -48,10 +69,81 @@ public class ReplicaInputStreams implements Closeable { return checksumIn; } + public FileDescriptor getDataInFd() { + return dataInFd; + } + + public FsVolumeReference getVolumeRef() { + return volumeRef; + } + + public void readDataFully(byte[] buf, int off, int len) + throws IOException { + IOUtils.readFully(dataIn, buf, off, len); + } + + public void readChecksumFully(byte[] buf, int off, int len) + throws IOException { + IOUtils.readFully(checksumIn, buf, off, len); + } + + public void skipDataFully(long len) throws IOException { + IOUtils.skipFully(dataIn, len); + } + + public void skipChecksumFully(long len) throws IOException { + IOUtils.skipFully(checksumIn, len); + } + + public void closeChecksumStream() throws IOException { + IOUtils.closeStream(checksumIn); + checksumIn = null; + } + + public void dropCacheBehindReads(String identifier, long offset, long len, + int flags) throws NativeIOException { + assert this.dataInFd != null : "null dataInFd!"; + NativeIO.POSIX.getCacheManipulator().posixFadviseIfPossible( + identifier, dataInFd, offset, len, flags); + } + + public void closeStreams() throws IOException { + IOException ioe = null; + if(checksumIn!=null) { + try { + checksumIn.close(); // close checksum file + } catch (IOException e) { + ioe = e; + } + checksumIn = null; + } + if(dataIn!=null) { + try { + dataIn.close(); // close data file + } catch (IOException e) { + ioe = e; + } + dataIn = null; + dataInFd = null; + } + if (volumeRef != null) { + IOUtils.cleanup(null, volumeRef); + volumeRef = null; + } + // throw IOException if there is any + if(ioe!= null) { + throw ioe; + } + } + @Override public void close() { IOUtils.closeStream(dataIn); + dataIn = null; + dataInFd = null; IOUtils.closeStream(checksumIn); + checksumIn = null; IOUtils.cleanup(null, volumeRef); + volumeRef = null; } } \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/ReplicaOutputStreams.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/ReplicaOutputStreams.java index bd1461a25fd..a66847a0cb0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/ReplicaOutputStreams.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/ReplicaOutputStreams.java @@ -18,32 +18,62 @@ package org.apache.hadoop.hdfs.server.datanode.fsdataset; import java.io.Closeable; +import java.io.FileDescriptor; import java.io.FileOutputStream; import java.io.OutputStream; import java.io.IOException; +import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.io.nativeio.NativeIO; +import org.apache.hadoop.io.nativeio.NativeIOException; import org.apache.hadoop.util.DataChecksum; +import org.apache.hadoop.util.Time; +import org.slf4j.Logger; /** * Contains the output streams for the data and checksum of a replica. */ public class ReplicaOutputStreams implements Closeable { - private final OutputStream dataOut; + public static final Logger LOG = DataNode.LOG; + + private FileDescriptor outFd = null; + /** Stream to block. */ + private OutputStream dataOut; + /** Stream to checksum. */ private final OutputStream checksumOut; private final DataChecksum checksum; private final boolean isTransientStorage; + private final long slowLogThresholdMs; /** * Create an object with a data output stream, a checksum output stream * and a checksum. */ - public ReplicaOutputStreams(OutputStream dataOut, OutputStream checksumOut, - DataChecksum checksum, boolean isTransientStorage) { + public ReplicaOutputStreams(OutputStream dataOut, + OutputStream checksumOut, DataChecksum checksum, + boolean isTransientStorage, long slowLogThresholdMs) { this.dataOut = dataOut; - this.checksumOut = checksumOut; this.checksum = checksum; + this.slowLogThresholdMs = slowLogThresholdMs; this.isTransientStorage = isTransientStorage; + this.checksumOut = checksumOut; + + try { + if (this.dataOut instanceof FileOutputStream) { + this.outFd = ((FileOutputStream)this.dataOut).getFD(); + } else { + LOG.debug("Could not get file descriptor for outputstream of class " + + this.dataOut.getClass()); + } + } catch (IOException e) { + LOG.warn("Could not get file descriptor for outputstream of class " + + this.dataOut.getClass()); + } + } + + public FileDescriptor getOutFd() { + return outFd; } /** @return the data output stream. */ @@ -72,12 +102,17 @@ public class ReplicaOutputStreams implements Closeable { IOUtils.closeStream(checksumOut); } + public void closeDataStream() throws IOException { + dataOut.close(); + dataOut = null; + } + /** * Sync the data stream if it supports it. */ public void syncDataOut() throws IOException { if (dataOut instanceof FileOutputStream) { - ((FileOutputStream)dataOut).getChannel().force(true); + sync((FileOutputStream)dataOut); } } @@ -86,8 +121,68 @@ public class ReplicaOutputStreams implements Closeable { */ public void syncChecksumOut() throws IOException { if (checksumOut instanceof FileOutputStream) { - ((FileOutputStream)checksumOut).getChannel().force(true); + sync((FileOutputStream)checksumOut); } } + /** + * Flush the data stream if it supports it. + */ + public void flushDataOut() throws IOException { + flush(dataOut); + } + + /** + * Flush the checksum stream if it supports it. + */ + public void flushChecksumOut() throws IOException { + flush(checksumOut); + } + + private void flush(OutputStream dos) throws IOException { + long begin = Time.monotonicNow(); + dos.flush(); + long duration = Time.monotonicNow() - begin; + LOG.trace("ReplicaOutputStreams#flush takes {} ms.", duration); + if (duration > slowLogThresholdMs) { + LOG.warn("Slow flush took {} ms (threshold={} ms)", duration, + slowLogThresholdMs); + } + } + + private void sync(FileOutputStream fos) throws IOException { + long begin = Time.monotonicNow(); + fos.getChannel().force(true); + long duration = Time.monotonicNow() - begin; + LOG.trace("ReplicaOutputStreams#sync takes {} ms.", duration); + if (duration > slowLogThresholdMs) { + LOG.warn("Slow fsync took {} ms (threshold={} ms)", duration, + slowLogThresholdMs); + } + } + + public long writeToDisk(byte[] b, int off, int len) throws IOException { + long begin = Time.monotonicNow(); + dataOut.write(b, off, len); + long duration = Time.monotonicNow() - begin; + LOG.trace("DatanodeIO#writeToDisk takes {} ms.", duration); + if (duration > slowLogThresholdMs) { + LOG.warn("Slow BlockReceiver write data to disk cost: {} ms " + + "(threshold={} ms)", duration, slowLogThresholdMs); + } + return duration; + } + + public void syncFileRangeIfPossible(long offset, long nbytes, + int flags) throws NativeIOException { + assert this.outFd != null : "null outFd!"; + NativeIO.POSIX.syncFileRangeIfPossible(outFd, offset, nbytes, flags); + } + + public void dropCacheBehindWrites(String identifier, + long offset, long len, int flags) throws NativeIOException { + assert this.outFd != null : "null outFd!"; + NativeIO.POSIX.getCacheManipulator().posixFadviseIfPossible( + identifier, outFd, offset, len, flags); + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java index 29dbb2963ac..63e82f32638 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java @@ -49,11 +49,13 @@ import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState; import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader; import org.apache.hadoop.hdfs.server.datanode.DataStorage; import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil; +import org.apache.hadoop.hdfs.server.datanode.LocalReplica; import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo; import org.apache.hadoop.hdfs.server.datanode.ReplicaBuilder; import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.RamDiskReplicaTracker.RamDiskReplica; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams; + import org.apache.hadoop.io.IOUtils; -import org.apache.hadoop.io.nativeio.NativeIO; import org.apache.hadoop.util.AutoCloseableLock; import org.apache.hadoop.util.DataChecksum; import org.apache.hadoop.util.DiskChecker; @@ -145,7 +147,7 @@ class BlockPoolSlice { // this.tmpDir = new File(bpDir, DataStorage.STORAGE_DIR_TMP); if (tmpDir.exists()) { - FileUtil.fullyDelete(tmpDir); + DataStorage.fullyDelete(tmpDir); } this.rbwDir = new File(currentDir, DataStorage.STORAGE_DIR_RBW); if (!rbwDir.mkdirs()) { // create rbw directory if not exist @@ -436,7 +438,7 @@ class BlockPoolSlice { final File targetMetaFile = new File(targetDir, metaFile.getName()); try { - NativeIO.renameTo(metaFile, targetMetaFile); + LocalReplica.rename(metaFile, targetMetaFile); } catch (IOException e) { LOG.warn("Failed to move meta file from " + metaFile + " to " + targetMetaFile, e); @@ -446,7 +448,7 @@ class BlockPoolSlice { final File targetBlockFile = new File(targetDir, blockFile.getName()); try { - NativeIO.renameTo(blockFile, targetBlockFile); + LocalReplica.rename(blockFile, targetBlockFile); } catch (IOException e) { LOG.warn("Failed to move block file from " + blockFile + " to " + targetBlockFile, e); @@ -688,8 +690,6 @@ class BlockPoolSlice { * @return the number of valid bytes */ private long validateIntegrityAndSetLength(File blockFile, long genStamp) { - DataInputStream checksumIn = null; - InputStream blockIn = null; try { final File metaFile = FsDatasetUtil.getMetaFile(blockFile, genStamp); long blockFileLen = blockFile.length(); @@ -699,57 +699,52 @@ class BlockPoolSlice { !metaFile.exists() || metaFileLen < crcHeaderLen) { return 0; } - checksumIn = new DataInputStream( + try (DataInputStream checksumIn = new DataInputStream( new BufferedInputStream(new FileInputStream(metaFile), - ioFileBufferSize)); - - // read and handle the common header here. For now just a version - final DataChecksum checksum = BlockMetadataHeader.readDataChecksum( - checksumIn, metaFile); - int bytesPerChecksum = checksum.getBytesPerChecksum(); - int checksumSize = checksum.getChecksumSize(); - long numChunks = Math.min( - (blockFileLen + bytesPerChecksum - 1)/bytesPerChecksum, - (metaFileLen - crcHeaderLen)/checksumSize); - if (numChunks == 0) { - return 0; - } - IOUtils.skipFully(checksumIn, (numChunks-1)*checksumSize); - blockIn = new FileInputStream(blockFile); - long lastChunkStartPos = (numChunks-1)*bytesPerChecksum; - IOUtils.skipFully(blockIn, lastChunkStartPos); - int lastChunkSize = (int)Math.min( - bytesPerChecksum, blockFileLen-lastChunkStartPos); - byte[] buf = new byte[lastChunkSize+checksumSize]; - checksumIn.readFully(buf, lastChunkSize, checksumSize); - IOUtils.readFully(blockIn, buf, 0, lastChunkSize); - - checksum.update(buf, 0, lastChunkSize); - long validFileLength; - if (checksum.compare(buf, lastChunkSize)) { // last chunk matches crc - validFileLength = lastChunkStartPos + lastChunkSize; - } else { // last chunck is corrupt - validFileLength = lastChunkStartPos; - } - - // truncate if extra bytes are present without CRC - if (blockFile.length() > validFileLength) { - RandomAccessFile blockRAF = new RandomAccessFile(blockFile, "rw"); - try { - // truncate blockFile - blockRAF.setLength(validFileLength); - } finally { - blockRAF.close(); + ioFileBufferSize))) { + // read and handle the common header here. For now just a version + final DataChecksum checksum = BlockMetadataHeader.readDataChecksum( + checksumIn, metaFile); + int bytesPerChecksum = checksum.getBytesPerChecksum(); + int checksumSize = checksum.getChecksumSize(); + long numChunks = Math.min( + (blockFileLen + bytesPerChecksum - 1) / bytesPerChecksum, + (metaFileLen - crcHeaderLen) / checksumSize); + if (numChunks == 0) { + return 0; + } + try (InputStream blockIn = new FileInputStream(blockFile); + ReplicaInputStreams ris = new ReplicaInputStreams(blockIn, + checksumIn, volume.obtainReference())) { + ris.skipChecksumFully((numChunks - 1) * checksumSize); + long lastChunkStartPos = (numChunks - 1) * bytesPerChecksum; + ris.skipDataFully(lastChunkStartPos); + int lastChunkSize = (int) Math.min( + bytesPerChecksum, blockFileLen - lastChunkStartPos); + byte[] buf = new byte[lastChunkSize + checksumSize]; + ris.readChecksumFully(buf, lastChunkSize, checksumSize); + ris.readDataFully(buf, 0, lastChunkSize); + checksum.update(buf, 0, lastChunkSize); + long validFileLength; + if (checksum.compare(buf, lastChunkSize)) { // last chunk matches crc + validFileLength = lastChunkStartPos + lastChunkSize; + } else { // last chunk is corrupt + validFileLength = lastChunkStartPos; + } + // truncate if extra bytes are present without CRC + if (blockFile.length() > validFileLength) { + try (RandomAccessFile blockRAF = + new RandomAccessFile(blockFile, "rw")) { + // truncate blockFile + blockRAF.setLength(validFileLength); + } + } + return validFileLength; } } - - return validFileLength; } catch (IOException e) { FsDatasetImpl.LOG.warn(e); return 0; - } finally { - IOUtils.closeStream(checksumIn); - IOUtils.closeStream(blockIn); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java index b9c731be5ae..97dcf8d4a14 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java @@ -19,7 +19,6 @@ package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl; import java.io.File; -import java.io.FileDescriptor; import java.io.IOException; import java.util.HashMap; import java.util.HashSet; @@ -38,9 +37,9 @@ import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil; import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams; import org.apache.hadoop.hdfs.server.protocol.BlockCommand; import org.apache.hadoop.io.IOUtils; -import org.apache.hadoop.io.nativeio.NativeIO; import org.apache.hadoop.io.nativeio.NativeIOException; /** @@ -202,13 +201,13 @@ class FsDatasetAsyncDiskService { } public void submitSyncFileRangeRequest(FsVolumeImpl volume, - final FileDescriptor fd, final long offset, final long nbytes, + final ReplicaOutputStreams streams, final long offset, final long nbytes, final int flags) { execute(volume, new Runnable() { @Override public void run() { try { - NativeIO.POSIX.syncFileRangeIfPossible(fd, offset, nbytes, flags); + streams.syncFileRangeIfPossible(offset, nbytes, flags); } catch (NativeIOException e) { LOG.warn("sync_file_range error", e); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java index 954d6ef7b69..6065df230c5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java @@ -21,7 +21,6 @@ import java.io.BufferedOutputStream; import java.io.DataOutputStream; import java.io.EOFException; import java.io.File; -import java.io.FileDescriptor; import java.io.FileNotFoundException; import java.io.FileOutputStream; import java.io.IOException; @@ -2755,9 +2754,9 @@ class FsDatasetImpl implements FsDatasetSpi { @Override public void submitBackgroundSyncFileRangeRequest(ExtendedBlock block, - FileDescriptor fd, long offset, long nbytes, int flags) { + ReplicaOutputStreams outs, long offset, long nbytes, int flags) { FsVolumeImpl fsVolumeImpl = this.getVolume(block); - asyncDiskService.submitSyncFileRangeRequest(fsVolumeImpl, fd, offset, + asyncDiskService.submitSyncFileRangeRequest(fsVolumeImpl, outs, offset, nbytes, flags); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java index a231e03e647..08564de6898 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java @@ -1067,7 +1067,7 @@ public class FsVolumeImpl implements FsVolumeSpi { DataStorage.STORAGE_DIR_LAZY_PERSIST); File rbwDir = new File(bpCurrentDir, DataStorage.STORAGE_DIR_RBW); if (force) { - FileUtil.fullyDelete(bpDir); + DataStorage.fullyDelete(bpDir); } else { if (!rbwDir.delete()) { throw new IOException("Failed to delete " + rbwDir); @@ -1081,7 +1081,7 @@ public class FsVolumeImpl implements FsVolumeSpi { !FileUtil.fullyDelete(lazypersistDir)))) { throw new IOException("Failed to delete " + lazypersistDir); } - FileUtil.fullyDelete(tmpDir); + DataStorage.fullyDelete(tmpDir); for (File f : FileUtil.listFiles(bpCurrentDir)) { if (!f.delete()) { throw new IOException("Failed to delete " + f); @@ -1437,4 +1437,3 @@ public class FsVolumeImpl implements FsVolumeSpi { replicaState); } } - diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend.java index 20cec6a2c8f..e963d416370 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend.java @@ -701,7 +701,7 @@ public class TestFileAppend{ ReplicaBeingWritten rbw = (ReplicaBeingWritten)replicaHandler.getReplica(); ReplicaOutputStreams - outputStreams = rbw.createStreams(false, DEFAULT_CHECKSUM); + outputStreams = rbw.createStreams(false, DEFAULT_CHECKSUM, 300); OutputStream dataOutput = outputStreams.getDataOut(); byte[] appendBytes = new byte[1]; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java index 5d63d079d9d..ae529055f7e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java @@ -18,7 +18,6 @@ package org.apache.hadoop.hdfs.server.datanode; import java.io.File; -import java.io.FileDescriptor; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; @@ -261,14 +260,15 @@ public class SimulatedFSDataset implements FsDatasetSpi { @Override synchronized public ReplicaOutputStreams createStreams(boolean isCreate, - DataChecksum requestedChecksum) throws IOException { + DataChecksum requestedChecksum, long slowLogThresholdMs) + throws IOException { if (finalized) { throw new IOException("Trying to write to a finalized replica " + theBlock); } else { SimulatedOutputStream crcStream = new SimulatedOutputStream(); return new ReplicaOutputStreams(oStream, crcStream, requestedChecksum, - volume.isTransientStorage()); + volume.isTransientStorage(), slowLogThresholdMs); } } @@ -1364,7 +1364,7 @@ public class SimulatedFSDataset implements FsDatasetSpi { @Override public void submitBackgroundSyncFileRangeRequest(ExtendedBlock block, - FileDescriptor fd, long offset, long nbytes, int flags) { + ReplicaOutputStreams outs, long offset, long nbytes, int flags) { throw new UnsupportedOperationException(); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java index 619eda00408..8439991056a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java @@ -673,7 +673,7 @@ public class TestBlockRecovery { ReplicaOutputStreams streams = null; try { streams = replicaInfo.createStreams(true, - DataChecksum.newDataChecksum(DataChecksum.Type.CRC32, 512)); + DataChecksum.newDataChecksum(DataChecksum.Type.CRC32, 512), 300); streams.getChecksumOut().write('a'); dn.data.initReplicaRecovery(new RecoveringBlock(block, null, RECOVERY_ID+1)); BlockRecoveryWorker.RecoveryTaskContiguous RecoveryTaskContiguous = diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java index 4e724bc7cb8..fa980c299ef 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java @@ -83,7 +83,7 @@ public class TestSimulatedFSDataset { ReplicaInPipeline bInfo = fsdataset.createRbw( StorageType.DEFAULT, b, false).getReplica(); ReplicaOutputStreams out = bInfo.createStreams(true, - DataChecksum.newDataChecksum(DataChecksum.Type.CRC32, 512)); + DataChecksum.newDataChecksum(DataChecksum.Type.CRC32, 512), 300); try { OutputStream dataOut = out.getDataOut(); assertEquals(0, fsdataset.getLength(b)); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java index 41663464f7c..2417c9de9e1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java @@ -318,8 +318,8 @@ public class ExternalDatasetImpl implements FsDatasetSpi { } @Override - public void submitBackgroundSyncFileRangeRequest(ExtendedBlock block, FileDescriptor fd, long offset, long nbytes, int flags) { - + public void submitBackgroundSyncFileRangeRequest(ExtendedBlock block, + ReplicaOutputStreams outs, long offset, long nbytes, int flags) { } @Override diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalReplicaInPipeline.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalReplicaInPipeline.java index 90c3b8a56f9..6fa283094d1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalReplicaInPipeline.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalReplicaInPipeline.java @@ -58,8 +58,10 @@ public class ExternalReplicaInPipeline implements ReplicaInPipeline { @Override public ReplicaOutputStreams createStreams(boolean isCreate, - DataChecksum requestedChecksum) throws IOException { - return new ReplicaOutputStreams(null, null, requestedChecksum, false); + DataChecksum requestedChecksum, long slowLogThresholdMs) + throws IOException { + return new ReplicaOutputStreams(null, null, requestedChecksum, false, + slowLogThresholdMs); } @Override