From dcedb72af468128458e597f08d22f5c34b744ae5 Mon Sep 17 00:00:00 2001 From: Xiaoyu Yao Date: Mon, 5 Dec 2016 12:08:48 -0800 Subject: [PATCH] Revert "HADOOP-10930. Refactor: Wrap Datanode IO related operations. Contributed by Xiaoyu Yao." This reverts commit aeecfa24f4fb6af289920cbf8830c394e66bd78e. --- .../hdfs/server/datanode/BlockReceiver.java | 66 ++++--- .../hdfs/server/datanode/BlockSender.java | 105 ++++++---- .../hadoop/hdfs/server/datanode/DNConf.java | 4 - .../hdfs/server/datanode/DataStorage.java | 5 - .../hdfs/server/datanode/LocalReplica.java | 179 ++++++------------ .../datanode/LocalReplicaInPipeline.java | 30 ++- .../server/datanode/ReplicaInPipeline.java | 4 +- .../datanode/fsdataset/FsDatasetSpi.java | 3 +- .../fsdataset/ReplicaInputStreams.java | 102 +--------- .../fsdataset/ReplicaOutputStreams.java | 109 +---------- .../fsdataset/impl/BlockPoolSlice.java | 32 ++-- .../impl/FsDatasetAsyncDiskService.java | 7 +- .../fsdataset/impl/FsDatasetImpl.java | 5 +- .../datanode/fsdataset/impl/FsVolumeImpl.java | 5 +- .../apache/hadoop/hdfs/TestFileAppend.java | 2 +- .../server/datanode/SimulatedFSDataset.java | 13 +- .../server/datanode/TestBlockRecovery.java | 2 +- .../datanode/TestSimulatedFSDataset.java | 2 +- .../extdataset/ExternalDatasetImpl.java | 4 +- .../extdataset/ExternalReplicaInPipeline.java | 6 +- 20 files changed, 239 insertions(+), 446 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java index f372072a74c..39419c1e013 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java @@ -24,7 +24,10 @@ import java.io.Closeable; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.EOFException; +import java.io.FileDescriptor; +import java.io.FileOutputStream; import java.io.IOException; +import java.io.OutputStream; import java.io.OutputStreamWriter; import java.io.Writer; import java.nio.ByteBuffer; @@ -50,6 +53,7 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; import org.apache.hadoop.hdfs.util.DataTransferThrottler; import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.io.nativeio.NativeIO; import org.apache.hadoop.util.Daemon; import org.apache.hadoop.util.DataChecksum; import org.apache.hadoop.util.StringUtils; @@ -84,6 +88,8 @@ class BlockReceiver implements Closeable { * the DataNode needs to recalculate checksums before writing. */ private final boolean needsChecksumTranslation; + private OutputStream out = null; // to block file at local disk + private FileDescriptor outFd; private DataOutputStream checksumOut = null; // to crc file at local disk private final int bytesPerChecksum; private final int checksumSize; @@ -244,8 +250,7 @@ class BlockReceiver implements Closeable { final boolean isCreate = isDatanode || isTransfer || stage == BlockConstructionStage.PIPELINE_SETUP_CREATE; - streams = replicaInfo.createStreams(isCreate, requestedChecksum, - datanodeSlowLogThresholdMs); + streams = replicaInfo.createStreams(isCreate, requestedChecksum); assert streams != null : "null streams!"; // read checksum meta information @@ -255,6 +260,13 @@ class BlockReceiver implements Closeable { this.bytesPerChecksum = diskChecksum.getBytesPerChecksum(); this.checksumSize = diskChecksum.getChecksumSize(); + this.out = streams.getDataOut(); + if (out instanceof FileOutputStream) { + this.outFd = ((FileOutputStream)out).getFD(); + } else { + LOG.warn("Could not get file descriptor for outputstream of class " + + out.getClass()); + } this.checksumOut = new DataOutputStream(new BufferedOutputStream( streams.getChecksumOut(), DFSUtilClient.getSmallBufferSize( datanode.getConf()))); @@ -307,7 +319,7 @@ class BlockReceiver implements Closeable { packetReceiver.close(); IOException ioe = null; - if (syncOnClose && (streams.getDataOut() != null || checksumOut != null)) { + if (syncOnClose && (out != null || checksumOut != null)) { datanode.metrics.incrFsyncCount(); } long flushTotalNanos = 0; @@ -336,9 +348,9 @@ class BlockReceiver implements Closeable { } // close block file try { - if (streams.getDataOut() != null) { + if (out != null) { long flushStartNanos = System.nanoTime(); - streams.flushDataOut(); + out.flush(); long flushEndNanos = System.nanoTime(); if (syncOnClose) { long fsyncStartNanos = flushEndNanos; @@ -347,13 +359,14 @@ class BlockReceiver implements Closeable { } flushTotalNanos += flushEndNanos - flushStartNanos; measuredFlushTime = true; - streams.closeDataStream(); + out.close(); + out = null; } } catch (IOException e) { ioe = e; } finally{ - streams.close(); + IOUtils.closeStream(out); } if (replicaHandler != null) { IOUtils.cleanup(null, replicaHandler); @@ -406,9 +419,9 @@ class BlockReceiver implements Closeable { } flushTotalNanos += flushEndNanos - flushStartNanos; } - if (streams.getDataOut() != null) { + if (out != null) { long flushStartNanos = System.nanoTime(); - streams.flushDataOut(); + out.flush(); long flushEndNanos = System.nanoTime(); if (isSync) { long fsyncStartNanos = flushEndNanos; @@ -417,10 +430,10 @@ class BlockReceiver implements Closeable { } flushTotalNanos += flushEndNanos - flushStartNanos; } - if (checksumOut != null || streams.getDataOut() != null) { + if (checksumOut != null || out != null) { datanode.metrics.addFlushNanos(flushTotalNanos); if (isSync) { - datanode.metrics.incrFsyncCount(); + datanode.metrics.incrFsyncCount(); } } long duration = Time.monotonicNow() - begin; @@ -703,12 +716,16 @@ class BlockReceiver implements Closeable { int numBytesToDisk = (int)(offsetInBlock-onDiskLen); // Write data to disk. - long duration = streams.writeToDisk(dataBuf.array(), - startByteToDisk, numBytesToDisk); - + long begin = Time.monotonicNow(); + out.write(dataBuf.array(), startByteToDisk, numBytesToDisk); + long duration = Time.monotonicNow() - begin; if (duration > maxWriteToDiskMs) { maxWriteToDiskMs = duration; } + if (duration > datanodeSlowLogThresholdMs) { + LOG.warn("Slow BlockReceiver write data to disk cost:" + duration + + "ms (threshold=" + datanodeSlowLogThresholdMs + "ms)"); + } final byte[] lastCrc; if (shouldNotWriteChecksum) { @@ -825,7 +842,7 @@ class BlockReceiver implements Closeable { private void manageWriterOsCache(long offsetInBlock) { try { - if (streams.getOutFd() != null && + if (outFd != null && offsetInBlock > lastCacheManagementOffset + CACHE_DROP_LAG_BYTES) { long begin = Time.monotonicNow(); // @@ -840,11 +857,12 @@ class BlockReceiver implements Closeable { if (syncBehindWrites) { if (syncBehindWritesInBackground) { this.datanode.getFSDataset().submitBackgroundSyncFileRangeRequest( - block, streams, lastCacheManagementOffset, + block, outFd, lastCacheManagementOffset, offsetInBlock - lastCacheManagementOffset, SYNC_FILE_RANGE_WRITE); } else { - streams.syncFileRangeIfPossible(lastCacheManagementOffset, + NativeIO.POSIX.syncFileRangeIfPossible(outFd, + lastCacheManagementOffset, offsetInBlock - lastCacheManagementOffset, SYNC_FILE_RANGE_WRITE); } @@ -861,8 +879,8 @@ class BlockReceiver implements Closeable { // long dropPos = lastCacheManagementOffset - CACHE_DROP_LAG_BYTES; if (dropPos > 0 && dropCacheBehindWrites) { - streams.dropCacheBehindWrites(block.getBlockName(), 0, dropPos, - POSIX_FADV_DONTNEED); + NativeIO.POSIX.getCacheManipulator().posixFadviseIfPossible( + block.getBlockName(), outFd, 0, dropPos, POSIX_FADV_DONTNEED); } lastCacheManagementOffset = offsetInBlock; long duration = Time.monotonicNow() - begin; @@ -971,7 +989,7 @@ class BlockReceiver implements Closeable { // The worst case is not recovering this RBW replica. // Client will fall back to regular pipeline recovery. } finally { - IOUtils.closeStream(streams.getDataOut()); + IOUtils.closeStream(out); } try { // Even if the connection is closed after the ack packet is @@ -1029,8 +1047,8 @@ class BlockReceiver implements Closeable { * will be overwritten. */ private void adjustCrcFilePosition() throws IOException { - if (streams.getDataOut() != null) { - streams.flushDataOut(); + if (out != null) { + out.flush(); } if (checksumOut != null) { checksumOut.flush(); @@ -1076,10 +1094,10 @@ class BlockReceiver implements Closeable { byte[] crcbuf = new byte[checksumSize]; try (ReplicaInputStreams instr = datanode.data.getTmpInputStreams(block, blkoff, ckoff)) { - instr.readDataFully(buf, 0, sizePartialChunk); + IOUtils.readFully(instr.getDataIn(), buf, 0, sizePartialChunk); // open meta file and read in crc value computer earlier - instr.readChecksumFully(crcbuf, 0, crcbuf.length); + IOUtils.readFully(instr.getChecksumIn(), crcbuf, 0, crcbuf.length); } // compute crc of partial chunk from data read in the block file. diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java index 9182c88ce2b..a1b1f86dbfa 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.datanode; import java.io.BufferedInputStream; import java.io.DataInputStream; import java.io.DataOutputStream; +import java.io.FileDescriptor; import java.io.FileInputStream; import java.io.FileNotFoundException; import java.io.IOException; @@ -41,11 +42,11 @@ import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference; import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream; -import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams; import org.apache.hadoop.hdfs.util.DataTransferThrottler; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.ReadaheadPool.ReadaheadRequest; +import org.apache.hadoop.io.nativeio.NativeIO; import org.apache.hadoop.net.SocketOutputStream; import org.apache.hadoop.util.AutoCloseableLock; import org.apache.hadoop.util.DataChecksum; @@ -119,11 +120,12 @@ class BlockSender implements java.io.Closeable { /** the block to read from */ private final ExtendedBlock block; - - /** InputStreams and file descriptors to read block/checksum. */ - private ReplicaInputStreams ris; + /** Stream to read block data from */ + private InputStream blockIn; /** updated while using transferTo() */ private long blockInPosition = -1; + /** Stream to read checksum */ + private DataInputStream checksumIn; /** Checksum utility */ private final DataChecksum checksum; /** Initial position to read */ @@ -150,6 +152,11 @@ class BlockSender implements java.io.Closeable { private final String clientTraceFmt; private volatile ChunkChecksum lastChunkChecksum = null; private DataNode datanode; + + /** The file descriptor of the block being sent */ + private FileDescriptor blockInFd; + /** The reference to the volume where the block is located */ + private FsVolumeReference volumeRef; /** The replica of the block that is being read. */ private final Replica replica; @@ -194,9 +201,6 @@ class BlockSender implements java.io.Closeable { boolean sendChecksum, DataNode datanode, String clientTraceFmt, CachingStrategy cachingStrategy) throws IOException { - InputStream blockIn = null; - DataInputStream checksumIn = null; - FsVolumeReference volumeRef = null; try { this.block = block; this.corruptChecksumOk = corruptChecksumOk; @@ -277,7 +281,7 @@ class BlockSender implements java.io.Closeable { (!is32Bit || length <= Integer.MAX_VALUE); // Obtain a reference before reading data - volumeRef = datanode.data.getVolume(block).obtainReference(); + this.volumeRef = datanode.data.getVolume(block).obtainReference(); /* * (corruptChecksumOK, meta_file_exist): operation @@ -401,9 +405,14 @@ class BlockSender implements java.io.Closeable { DataNode.LOG.debug("replica=" + replica); } blockIn = datanode.data.getBlockInputStream(block, offset); // seek to offset - ris = new ReplicaInputStreams(blockIn, checksumIn, volumeRef); + if (blockIn instanceof FileInputStream) { + blockInFd = ((FileInputStream)blockIn).getFD(); + } else { + blockInFd = null; + } } catch (IOException ioe) { IOUtils.closeStream(this); + IOUtils.closeStream(blockIn); throw ioe; } } @@ -413,11 +422,12 @@ class BlockSender implements java.io.Closeable { */ @Override public void close() throws IOException { - if (ris.getDataInFd() != null && + if (blockInFd != null && ((dropCacheBehindAllReads) || (dropCacheBehindLargeReads && isLongRead()))) { try { - ris.dropCacheBehindReads(block.getBlockName(), lastCacheDropOffset, + NativeIO.POSIX.getCacheManipulator().posixFadviseIfPossible( + block.getBlockName(), blockInFd, lastCacheDropOffset, offset - lastCacheDropOffset, POSIX_FADV_DONTNEED); } catch (Exception e) { LOG.warn("Unable to drop cache on file close", e); @@ -426,12 +436,32 @@ class BlockSender implements java.io.Closeable { if (curReadahead != null) { curReadahead.cancel(); } - - try { - ris.closeStreams(); - } finally { - IOUtils.closeStream(ris); - ris = null; + + IOException ioe = null; + if(checksumIn!=null) { + try { + checksumIn.close(); // close checksum file + } catch (IOException e) { + ioe = e; + } + checksumIn = null; + } + if(blockIn!=null) { + try { + blockIn.close(); // close data file + } catch (IOException e) { + ioe = e; + } + blockIn = null; + blockInFd = null; + } + if (volumeRef != null) { + IOUtils.cleanup(null, volumeRef); + volumeRef = null; + } + // throw IOException if there is any + if(ioe!= null) { + throw ioe; } } @@ -535,7 +565,7 @@ class BlockSender implements java.io.Closeable { int checksumOff = pkt.position(); byte[] buf = pkt.array(); - if (checksumSize > 0 && ris.getChecksumIn() != null) { + if (checksumSize > 0 && checksumIn != null) { readChecksum(buf, checksumOff, checksumDataLen); // write in progress that we need to use to get last checksum @@ -551,7 +581,7 @@ class BlockSender implements java.io.Closeable { int dataOff = checksumOff + checksumDataLen; if (!transferTo) { // normal transfer - ris.readDataFully(buf, dataOff, dataLen); + IOUtils.readFully(blockIn, buf, dataOff, dataLen); if (verifyChecksum) { verifyChecksum(buf, dataOff, dataLen, numChunks, checksumOff); @@ -563,12 +593,12 @@ class BlockSender implements java.io.Closeable { SocketOutputStream sockOut = (SocketOutputStream)out; // First write header and checksums sockOut.write(buf, headerOff, dataOff - headerOff); - + // no need to flush since we know out is not a buffered stream - FileChannel fileCh = ((FileInputStream)ris.getDataIn()).getChannel(); + FileChannel fileCh = ((FileInputStream)blockIn).getChannel(); LongWritable waitTime = new LongWritable(); LongWritable transferTime = new LongWritable(); - sockOut.transferToFully(fileCh, blockInPosition, dataLen, + sockOut.transferToFully(fileCh, blockInPosition, dataLen, waitTime, transferTime); datanode.metrics.addSendDataPacketBlockedOnNetworkNanos(waitTime.get()); datanode.metrics.addSendDataPacketTransferNanos(transferTime.get()); @@ -600,7 +630,7 @@ class BlockSender implements java.io.Closeable { if (!ioem.startsWith("Broken pipe") && !ioem.startsWith("Connection reset")) { LOG.error("BlockSender.sendChunks() exception: ", e); datanode.getBlockScanner().markSuspectBlock( - ris.getVolumeRef().getVolume().getStorageID(), + volumeRef.getVolume().getStorageID(), block); } } @@ -623,15 +653,16 @@ class BlockSender implements java.io.Closeable { */ private void readChecksum(byte[] buf, final int checksumOffset, final int checksumLen) throws IOException { - if (checksumSize <= 0 && ris.getChecksumIn() == null) { + if (checksumSize <= 0 && checksumIn == null) { return; } try { - ris.readChecksumFully(buf, checksumOffset, checksumLen); + checksumIn.readFully(buf, checksumOffset, checksumLen); } catch (IOException e) { LOG.warn(" Could not read or failed to verify checksum for data" + " at offset " + offset + " for block " + block, e); - ris.closeChecksumStream(); + IOUtils.closeStream(checksumIn); + checksumIn = null; if (corruptChecksumOk) { if (checksumOffset < checksumLen) { // Just fill the array with zeros. @@ -715,10 +746,10 @@ class BlockSender implements java.io.Closeable { lastCacheDropOffset = initialOffset; - if (isLongRead() && ris.getDataInFd() != null) { + if (isLongRead() && blockInFd != null) { // Advise that this file descriptor will be accessed sequentially. - ris.dropCacheBehindReads(block.getBlockName(), 0, 0, - POSIX_FADV_SEQUENTIAL); + NativeIO.POSIX.getCacheManipulator().posixFadviseIfPossible( + block.getBlockName(), blockInFd, 0, 0, POSIX_FADV_SEQUENTIAL); } // Trigger readahead of beginning of file if configured. @@ -730,10 +761,9 @@ class BlockSender implements java.io.Closeable { int pktBufSize = PacketHeader.PKT_MAX_HEADER_LEN; boolean transferTo = transferToAllowed && !verifyChecksum && baseStream instanceof SocketOutputStream - && ris.getDataIn() instanceof FileInputStream; + && blockIn instanceof FileInputStream; if (transferTo) { - FileChannel fileChannel = - ((FileInputStream)ris.getDataIn()).getChannel(); + FileChannel fileChannel = ((FileInputStream)blockIn).getChannel(); blockInPosition = fileChannel.position(); streamForSendChunks = baseStream; maxChunksPerPacket = numberOfChunks(TRANSFERTO_BUFFER_SIZE); @@ -788,16 +818,14 @@ class BlockSender implements java.io.Closeable { private void manageOsCache() throws IOException { // We can't manage the cache for this block if we don't have a file // descriptor to work with. - if (ris.getDataInFd() == null) { - return; - } + if (blockInFd == null) return; // Perform readahead if necessary if ((readaheadLength > 0) && (datanode.readaheadPool != null) && (alwaysReadahead || isLongRead())) { curReadahead = datanode.readaheadPool.readaheadStream( - clientTraceFmt, ris.getDataInFd(), offset, readaheadLength, - Long.MAX_VALUE, curReadahead); + clientTraceFmt, blockInFd, offset, readaheadLength, Long.MAX_VALUE, + curReadahead); } // Drop what we've just read from cache, since we aren't @@ -807,7 +835,8 @@ class BlockSender implements java.io.Closeable { long nextCacheDropOffset = lastCacheDropOffset + CACHE_DROP_INTERVAL_BYTES; if (offset >= nextCacheDropOffset) { long dropLength = offset - lastCacheDropOffset; - ris.dropCacheBehindReads(block.getBlockName(), lastCacheDropOffset, + NativeIO.POSIX.getCacheManipulator().posixFadviseIfPossible( + block.getBlockName(), blockInFd, lastCacheDropOffset, dropLength, POSIX_FADV_DONTNEED); lastCacheDropOffset = offset; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java index c1487b1a4a3..823d05c425d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java @@ -402,10 +402,6 @@ public class DNConf { return volsConfigured; } - public long getSlowIoWarningThresholdMs() { - return datanodeSlowIoWarningThresholdMs; - } - int getMaxDataLength() { return maxDataLength; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java index f4deb6df350..29b14e731ee 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java @@ -1355,9 +1355,4 @@ public class DataStorage extends Storage { synchronized void removeBlockPoolStorage(String bpId) { bpStorageMap.remove(bpId); } - - public static boolean fullyDelete(final File dir) { - boolean result = FileUtil.fullyDelete(dir); - return result; - } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/LocalReplica.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/LocalReplica.java index 3615cd118b3..f8291112ec7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/LocalReplica.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/LocalReplica.java @@ -29,6 +29,9 @@ import java.net.URI; import java.util.HashMap; import java.util.Map; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.HardLink; import org.apache.hadoop.fs.LocalFileSystem; @@ -43,8 +46,6 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetUtil; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.nativeio.NativeIO; import org.apache.hadoop.util.DataChecksum; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import com.google.common.annotations.VisibleForTesting; @@ -68,7 +69,15 @@ abstract public class LocalReplica extends ReplicaInfo { private static final Map internedBaseDirs = new HashMap(); - static final Logger LOG = LoggerFactory.getLogger(LocalReplica.class); + static final Log LOG = LogFactory.getLog(LocalReplica.class); + private final static boolean IS_NATIVE_IO_AVAIL; + static { + IS_NATIVE_IO_AVAIL = NativeIO.isAvailable(); + if (Path.WINDOWS && !IS_NATIVE_IO_AVAIL) { + LOG.warn("Data node cannot fully support concurrent reading" + + " and writing without native code extensions on Windows."); + } + } /** * Constructor @@ -190,14 +199,14 @@ abstract public class LocalReplica extends ReplicaInfo { File tmpFile = DatanodeUtil.createTmpFile(b, DatanodeUtil.getUnlinkTmpFile(file)); try (FileInputStream in = new FileInputStream(file)) { try (FileOutputStream out = new FileOutputStream(tmpFile)){ - copyBytes(in, out, 16 * 1024); + IOUtils.copyBytes(in, out, 16 * 1024); } if (file.length() != tmpFile.length()) { throw new IOException("Copy of file " + file + " size " + file.length()+ " into file " + tmpFile + " resulted in a size of " + tmpFile.length()); } - replaceFile(tmpFile, file); + FileUtil.replaceFile(tmpFile, file); } catch (IOException e) { boolean done = tmpFile.delete(); if (!done) { @@ -232,13 +241,13 @@ abstract public class LocalReplica extends ReplicaInfo { } File meta = getMetaFile(); - int linkCount = getHardLinkCount(file); + int linkCount = HardLink.getLinkCount(file); if (linkCount > 1) { DataNode.LOG.info("Breaking hardlink for " + linkCount + "x-linked " + "block " + this); breakHardlinks(file, this); } - if (getHardLinkCount(meta) > 1) { + if (HardLink.getLinkCount(meta) > 1) { breakHardlinks(meta, this); } return true; @@ -251,7 +260,18 @@ abstract public class LocalReplica extends ReplicaInfo { @Override public InputStream getDataInputStream(long seekOffset) throws IOException { - return getDataInputStream(getBlockFile(), seekOffset); + + File blockFile = getBlockFile(); + if (IS_NATIVE_IO_AVAIL) { + return NativeIO.getShareDeleteFileInputStream(blockFile, seekOffset); + } else { + try { + return FsDatasetUtil.openAndSeek(blockFile, seekOffset); + } catch (FileNotFoundException fnfe) { + throw new IOException("Block " + this + " is not valid. " + + "Expected block file at " + blockFile + " does not exist."); + } + } } @Override @@ -266,7 +286,7 @@ abstract public class LocalReplica extends ReplicaInfo { @Override public boolean deleteBlockData() { - return fullyDelete(getBlockFile()); + return getBlockFile().delete(); } @Override @@ -300,7 +320,7 @@ abstract public class LocalReplica extends ReplicaInfo { @Override public boolean deleteMetadata() { - return fullyDelete(getMetaFile()); + return getMetaFile().delete(); } @Override @@ -320,7 +340,7 @@ abstract public class LocalReplica extends ReplicaInfo { private boolean renameFile(File srcfile, File destfile) throws IOException { try { - rename(srcfile, destfile); + NativeIO.renameTo(srcfile, destfile); return true; } catch (IOException e) { throw new IOException("Failed to move block file for " + this @@ -347,14 +367,22 @@ abstract public class LocalReplica extends ReplicaInfo { @Override public boolean getPinning(LocalFileSystem localFS) throws IOException { - return getPinning(localFS, new Path(getBlockFile().getAbsolutePath())); + FileStatus fss = + localFS.getFileStatus(new Path(getBlockFile().getAbsolutePath())); + return fss.getPermission().getStickyBit(); } @Override public void setPinning(LocalFileSystem localFS) throws IOException { File f = getBlockFile(); Path p = new Path(f.getAbsolutePath()); - setPinning(localFS, p); + + FsPermission oldPermission = localFS.getFileStatus( + new Path(f.getAbsolutePath())).getPermission(); + //sticky bit is used for pinning purpose + FsPermission permission = new FsPermission(oldPermission.getUserAction(), + oldPermission.getGroupAction(), oldPermission.getOtherAction(), true); + localFS.setPermission(p, permission); } @Override @@ -370,7 +398,7 @@ abstract public class LocalReplica extends ReplicaInfo { } try { // calling renameMeta on the ReplicaInfo doesn't work here - rename(oldmeta, newmeta); + NativeIO.renameTo(oldmeta, newmeta); } catch (IOException e) { setGenerationStamp(oldGS); // restore old GS throw new IOException("Block " + this + " reopen failed. " + @@ -389,113 +417,7 @@ abstract public class LocalReplica extends ReplicaInfo { return info.getBlockFile().compareTo(getBlockFile()); } - @Override - public void copyMetadata(URI destination) throws IOException { - //for local replicas, we assume the destination URI is file - nativeCopyFileUnbuffered(getMetaFile(), new File(destination), true); - } - - @Override - public void copyBlockdata(URI destination) throws IOException { - //for local replicas, we assume the destination URI is file - nativeCopyFileUnbuffered(getBlockFile(), new File(destination), true); - } - - public void renameMeta(File newMetaFile) throws IOException { - if (LOG.isDebugEnabled()) { - LOG.debug("Renaming " + getMetaFile() + " to " + newMetaFile); - } - renameFile(getMetaFile(), newMetaFile); - } - - public void renameBlock(File newBlockFile) throws IOException { - if (LOG.isDebugEnabled()) { - LOG.debug("Renaming " + getBlockFile() + " to " + newBlockFile - + ", file length=" + getBlockFile().length()); - } - renameFile(getBlockFile(), newBlockFile); - } - - public static void rename(File from, File to) throws IOException { - Storage.rename(from, to); - } - - /** - * Get input stream for a local file and optionally seek to the offset. - * @param f path to the file - * @param seekOffset offset to seek - * @return - * @throws IOException - */ - private FileInputStream getDataInputStream(File f, long seekOffset) - throws IOException { - FileInputStream fis; - if (NativeIO.isAvailable()) { - fis = NativeIO.getShareDeleteFileInputStream(f, seekOffset); - } else { - try { - fis = FsDatasetUtil.openAndSeek(f, seekOffset); - } catch (FileNotFoundException fnfe) { - throw new IOException("Expected block file at " + f + - " does not exist."); - } - } - return fis; - } - - private void nativeCopyFileUnbuffered(File srcFile, File destFile, - boolean preserveFileDate) throws IOException { - Storage.nativeCopyFileUnbuffered(srcFile, destFile, preserveFileDate); - } - - private void copyBytes(InputStream in, OutputStream out, int - buffSize) throws IOException{ - IOUtils.copyBytes(in, out, buffSize); - } - - private void replaceFile(File src, File target) throws IOException { - FileUtil.replaceFile(src, target); - } - - public static boolean fullyDelete(final File dir) { - boolean result = DataStorage.fullyDelete(dir); - return result; - } - - public static int getHardLinkCount(File fileName) throws IOException { - int linkCount = HardLink.getLinkCount(fileName); - return linkCount; - } - - /** - * Get pin status of a file by checking the sticky bit. - * @param localFS local file system - * @param path path to be checked - * @return - * @throws IOException - */ - public boolean getPinning(LocalFileSystem localFS, Path path) throws - IOException { - boolean stickyBit = - localFS.getFileStatus(path).getPermission().getStickyBit(); - return stickyBit; - } - - /** - * Set sticky bit on path to pin file. - * @param localFS local file system - * @param path path to be pinned with sticky bit - * @throws IOException - */ - public void setPinning(LocalFileSystem localFS, Path path) throws - IOException { - FsPermission oldPermission = localFS.getFileStatus(path).getPermission(); - FsPermission permission = new FsPermission(oldPermission.getUserAction(), - oldPermission.getGroupAction(), oldPermission.getOtherAction(), true); - localFS.setPermission(path, permission); - } - - public static void truncateBlock(File blockFile, File metaFile, + static public void truncateBlock(File blockFile, File metaFile, long oldlen, long newlen) throws IOException { LOG.info("truncateBlock: blockFile=" + blockFile + ", metaFile=" + metaFile @@ -545,4 +467,19 @@ abstract public class LocalReplica extends ReplicaInfo { metaRAF.close(); } } + + @Override + public void copyMetadata(URI destination) throws IOException { + //for local replicas, we assume the destination URI is file + Storage.nativeCopyFileUnbuffered(getMetaFile(), + new File(destination), true); + } + + @Override + public void copyBlockdata(URI destination) throws IOException { + //for local replicas, we assume the destination URI is file + Storage.nativeCopyFileUnbuffered(getBlockFile(), + new File(destination), true); + } + } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/LocalReplicaInPipeline.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/LocalReplicaInPipeline.java index 1387155f17e..bc7bc6dde38 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/LocalReplicaInPipeline.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/LocalReplicaInPipeline.java @@ -30,6 +30,7 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams; import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo; import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.io.nativeio.NativeIO; import org.apache.hadoop.util.DataChecksum; import org.apache.hadoop.util.StringUtils; @@ -245,8 +246,7 @@ public class LocalReplicaInPipeline extends LocalReplica @Override // ReplicaInPipeline public ReplicaOutputStreams createStreams(boolean isCreate, - DataChecksum requestedChecksum, long slowLogThresholdMs) - throws IOException { + DataChecksum requestedChecksum) throws IOException { File blockFile = getBlockFile(); File metaFile = getMetaFile(); if (DataNode.LOG.isDebugEnabled()) { @@ -313,7 +313,7 @@ public class LocalReplicaInPipeline extends LocalReplica crcOut.getChannel().position(crcDiskSize); } return new ReplicaOutputStreams(blockOut, crcOut, checksum, - getVolume().isTransientStorage(), slowLogThresholdMs); + getVolume().isTransientStorage()); } catch (IOException e) { IOUtils.closeStream(blockOut); IOUtils.closeStream(metaRAF); @@ -373,30 +373,40 @@ public class LocalReplicaInPipeline extends LocalReplica + " should be derived from LocalReplica"); } - LocalReplica oldReplica = (LocalReplica) oldReplicaInfo; - File oldmeta = oldReplica.getMetaFile(); + LocalReplica localReplica = (LocalReplica) oldReplicaInfo; + + File oldmeta = localReplica.getMetaFile(); File newmeta = getMetaFile(); + if (LOG.isDebugEnabled()) { + LOG.debug("Renaming " + oldmeta + " to " + newmeta); + } try { - oldReplica.renameMeta(newmeta); + NativeIO.renameTo(oldmeta, newmeta); } catch (IOException e) { throw new IOException("Block " + oldReplicaInfo + " reopen failed. " + " Unable to move meta file " + oldmeta + " to rbw dir " + newmeta, e); } + File blkfile = localReplica.getBlockFile(); + + if (LOG.isDebugEnabled()) { + LOG.debug("Renaming " + blkfile + " to " + newBlkFile + + ", file length=" + blkfile.length()); + } try { - oldReplica.renameBlock(newBlkFile); + NativeIO.renameTo(blkfile, newBlkFile); } catch (IOException e) { try { - renameMeta(oldmeta); + NativeIO.renameTo(newmeta, oldmeta); } catch (IOException ex) { LOG.warn("Cannot move meta file " + newmeta + "back to the finalized directory " + oldmeta, ex); } throw new IOException("Block " + oldReplicaInfo + " reopen failed. " + - " Unable to move block file " + oldReplica.getBlockFile() + - " to rbw dir " + newBlkFile, e); + " Unable to move block file " + blkfile + + " to rbw dir " + newBlkFile, e); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java index 5fdbec02f1a..efa6ea686f4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java @@ -69,13 +69,11 @@ public interface ReplicaInPipeline extends Replica { * * @param isCreate if it is for creation * @param requestedChecksum the checksum the writer would prefer to use - * @param slowLogThresholdMs slow io threshold for logging * @return output streams for writing * @throws IOException if any error occurs */ public ReplicaOutputStreams createStreams(boolean isCreate, - DataChecksum requestedChecksum, long slowLogThresholdMs) - throws IOException; + DataChecksum requestedChecksum) throws IOException; /** * Create an output stream to write restart metadata in case of datanode diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java index 30f045f2e1b..57ec2b4bf8c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java @@ -21,6 +21,7 @@ package org.apache.hadoop.hdfs.server.datanode.fsdataset; import java.io.Closeable; import java.io.EOFException; import java.io.File; +import java.io.FileDescriptor; import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStream; @@ -604,7 +605,7 @@ public interface FsDatasetSpi extends FSDatasetMBean { * submit a sync_file_range request to AsyncDiskService. */ void submitBackgroundSyncFileRangeRequest(final ExtendedBlock block, - final ReplicaOutputStreams outs, final long offset, final long nbytes, + final FileDescriptor fd, final long offset, final long nbytes, final int flags); /** diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/ReplicaInputStreams.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/ReplicaInputStreams.java index 54d0e964b5c..227179d4f84 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/ReplicaInputStreams.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/ReplicaInputStreams.java @@ -18,45 +18,24 @@ package org.apache.hadoop.hdfs.server.datanode.fsdataset; import java.io.Closeable; -import java.io.FileDescriptor; -import java.io.FileInputStream; import java.io.InputStream; -import java.io.IOException; -import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.io.IOUtils; -import org.apache.hadoop.io.nativeio.NativeIO; -import org.apache.hadoop.io.nativeio.NativeIOException; -import org.slf4j.Logger; /** * Contains the input streams for the data and checksum of a replica. */ public class ReplicaInputStreams implements Closeable { - public static final Logger LOG = DataNode.LOG; - - private InputStream dataIn; - private InputStream checksumIn; - private FsVolumeReference volumeRef; - private FileDescriptor dataInFd = null; + private final InputStream dataIn; + private final InputStream checksumIn; + private final FsVolumeReference volumeRef; /** Create an object with a data input stream and a checksum input stream. */ - public ReplicaInputStreams(InputStream dataStream, - InputStream checksumStream, FsVolumeReference volumeRef) { + public ReplicaInputStreams(InputStream dataStream, InputStream checksumStream, + FsVolumeReference volumeRef) { this.volumeRef = volumeRef; this.dataIn = dataStream; this.checksumIn = checksumStream; - if (dataIn instanceof FileInputStream) { - try { - dataInFd = ((FileInputStream) dataIn).getFD(); - } catch (Exception e) { - LOG.warn("Could not get file descriptor for inputstream of class " + - this.dataIn.getClass()); - } - } else { - LOG.debug("Could not get file descriptor for inputstream of class " + - this.dataIn.getClass()); - } } /** @return the data input stream. */ @@ -69,81 +48,10 @@ public class ReplicaInputStreams implements Closeable { return checksumIn; } - public FileDescriptor getDataInFd() { - return dataInFd; - } - - public FsVolumeReference getVolumeRef() { - return volumeRef; - } - - public void readDataFully(byte[] buf, int off, int len) - throws IOException { - IOUtils.readFully(dataIn, buf, off, len); - } - - public void readChecksumFully(byte[] buf, int off, int len) - throws IOException { - IOUtils.readFully(checksumIn, buf, off, len); - } - - public void skipDataFully(long len) throws IOException { - IOUtils.skipFully(dataIn, len); - } - - public void skipChecksumFully(long len) throws IOException { - IOUtils.skipFully(checksumIn, len); - } - - public void closeChecksumStream() throws IOException { - IOUtils.closeStream(checksumIn); - checksumIn = null; - } - - public void dropCacheBehindReads(String identifier, long offset, long len, - int flags) throws NativeIOException { - assert this.dataInFd != null : "null dataInFd!"; - NativeIO.POSIX.getCacheManipulator().posixFadviseIfPossible( - identifier, dataInFd, offset, len, flags); - } - - public void closeStreams() throws IOException { - IOException ioe = null; - if(checksumIn!=null) { - try { - checksumIn.close(); // close checksum file - } catch (IOException e) { - ioe = e; - } - checksumIn = null; - } - if(dataIn!=null) { - try { - dataIn.close(); // close data file - } catch (IOException e) { - ioe = e; - } - dataIn = null; - dataInFd = null; - } - if (volumeRef != null) { - IOUtils.cleanup(null, volumeRef); - volumeRef = null; - } - // throw IOException if there is any - if(ioe!= null) { - throw ioe; - } - } - @Override public void close() { IOUtils.closeStream(dataIn); - dataIn = null; - dataInFd = null; IOUtils.closeStream(checksumIn); - checksumIn = null; IOUtils.cleanup(null, volumeRef); - volumeRef = null; } } \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/ReplicaOutputStreams.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/ReplicaOutputStreams.java index a66847a0cb0..bd1461a25fd 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/ReplicaOutputStreams.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/ReplicaOutputStreams.java @@ -18,62 +18,32 @@ package org.apache.hadoop.hdfs.server.datanode.fsdataset; import java.io.Closeable; -import java.io.FileDescriptor; import java.io.FileOutputStream; import java.io.OutputStream; import java.io.IOException; -import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.io.IOUtils; -import org.apache.hadoop.io.nativeio.NativeIO; -import org.apache.hadoop.io.nativeio.NativeIOException; import org.apache.hadoop.util.DataChecksum; -import org.apache.hadoop.util.Time; -import org.slf4j.Logger; /** * Contains the output streams for the data and checksum of a replica. */ public class ReplicaOutputStreams implements Closeable { - public static final Logger LOG = DataNode.LOG; - - private FileDescriptor outFd = null; - /** Stream to block. */ - private OutputStream dataOut; - /** Stream to checksum. */ + private final OutputStream dataOut; private final OutputStream checksumOut; private final DataChecksum checksum; private final boolean isTransientStorage; - private final long slowLogThresholdMs; /** * Create an object with a data output stream, a checksum output stream * and a checksum. */ - public ReplicaOutputStreams(OutputStream dataOut, - OutputStream checksumOut, DataChecksum checksum, - boolean isTransientStorage, long slowLogThresholdMs) { + public ReplicaOutputStreams(OutputStream dataOut, OutputStream checksumOut, + DataChecksum checksum, boolean isTransientStorage) { this.dataOut = dataOut; - this.checksum = checksum; - this.slowLogThresholdMs = slowLogThresholdMs; - this.isTransientStorage = isTransientStorage; this.checksumOut = checksumOut; - - try { - if (this.dataOut instanceof FileOutputStream) { - this.outFd = ((FileOutputStream)this.dataOut).getFD(); - } else { - LOG.debug("Could not get file descriptor for outputstream of class " + - this.dataOut.getClass()); - } - } catch (IOException e) { - LOG.warn("Could not get file descriptor for outputstream of class " + - this.dataOut.getClass()); - } - } - - public FileDescriptor getOutFd() { - return outFd; + this.checksum = checksum; + this.isTransientStorage = isTransientStorage; } /** @return the data output stream. */ @@ -102,17 +72,12 @@ public class ReplicaOutputStreams implements Closeable { IOUtils.closeStream(checksumOut); } - public void closeDataStream() throws IOException { - dataOut.close(); - dataOut = null; - } - /** * Sync the data stream if it supports it. */ public void syncDataOut() throws IOException { if (dataOut instanceof FileOutputStream) { - sync((FileOutputStream)dataOut); + ((FileOutputStream)dataOut).getChannel().force(true); } } @@ -121,68 +86,8 @@ public class ReplicaOutputStreams implements Closeable { */ public void syncChecksumOut() throws IOException { if (checksumOut instanceof FileOutputStream) { - sync((FileOutputStream)checksumOut); + ((FileOutputStream)checksumOut).getChannel().force(true); } } - /** - * Flush the data stream if it supports it. - */ - public void flushDataOut() throws IOException { - flush(dataOut); - } - - /** - * Flush the checksum stream if it supports it. - */ - public void flushChecksumOut() throws IOException { - flush(checksumOut); - } - - private void flush(OutputStream dos) throws IOException { - long begin = Time.monotonicNow(); - dos.flush(); - long duration = Time.monotonicNow() - begin; - LOG.trace("ReplicaOutputStreams#flush takes {} ms.", duration); - if (duration > slowLogThresholdMs) { - LOG.warn("Slow flush took {} ms (threshold={} ms)", duration, - slowLogThresholdMs); - } - } - - private void sync(FileOutputStream fos) throws IOException { - long begin = Time.monotonicNow(); - fos.getChannel().force(true); - long duration = Time.monotonicNow() - begin; - LOG.trace("ReplicaOutputStreams#sync takes {} ms.", duration); - if (duration > slowLogThresholdMs) { - LOG.warn("Slow fsync took {} ms (threshold={} ms)", duration, - slowLogThresholdMs); - } - } - - public long writeToDisk(byte[] b, int off, int len) throws IOException { - long begin = Time.monotonicNow(); - dataOut.write(b, off, len); - long duration = Time.monotonicNow() - begin; - LOG.trace("DatanodeIO#writeToDisk takes {} ms.", duration); - if (duration > slowLogThresholdMs) { - LOG.warn("Slow BlockReceiver write data to disk cost: {} ms " + - "(threshold={} ms)", duration, slowLogThresholdMs); - } - return duration; - } - - public void syncFileRangeIfPossible(long offset, long nbytes, - int flags) throws NativeIOException { - assert this.outFd != null : "null outFd!"; - NativeIO.POSIX.syncFileRangeIfPossible(outFd, offset, nbytes, flags); - } - - public void dropCacheBehindWrites(String identifier, - long offset, long len, int flags) throws NativeIOException { - assert this.outFd != null : "null outFd!"; - NativeIO.POSIX.getCacheManipulator().posixFadviseIfPossible( - identifier, outFd, offset, len, flags); - } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java index 8323140e4d6..29dbb2963ac 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java @@ -49,13 +49,11 @@ import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState; import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader; import org.apache.hadoop.hdfs.server.datanode.DataStorage; import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil; -import org.apache.hadoop.hdfs.server.datanode.LocalReplica; import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo; import org.apache.hadoop.hdfs.server.datanode.ReplicaBuilder; import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.RamDiskReplicaTracker.RamDiskReplica; -import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams; - import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.io.nativeio.NativeIO; import org.apache.hadoop.util.AutoCloseableLock; import org.apache.hadoop.util.DataChecksum; import org.apache.hadoop.util.DiskChecker; @@ -147,7 +145,7 @@ class BlockPoolSlice { // this.tmpDir = new File(bpDir, DataStorage.STORAGE_DIR_TMP); if (tmpDir.exists()) { - DataStorage.fullyDelete(tmpDir); + FileUtil.fullyDelete(tmpDir); } this.rbwDir = new File(currentDir, DataStorage.STORAGE_DIR_RBW); if (!rbwDir.mkdirs()) { // create rbw directory if not exist @@ -438,7 +436,7 @@ class BlockPoolSlice { final File targetMetaFile = new File(targetDir, metaFile.getName()); try { - LocalReplica.rename(metaFile, targetMetaFile); + NativeIO.renameTo(metaFile, targetMetaFile); } catch (IOException e) { LOG.warn("Failed to move meta file from " + metaFile + " to " + targetMetaFile, e); @@ -448,7 +446,7 @@ class BlockPoolSlice { final File targetBlockFile = new File(targetDir, blockFile.getName()); try { - LocalReplica.rename(blockFile, targetBlockFile); + NativeIO.renameTo(blockFile, targetBlockFile); } catch (IOException e) { LOG.warn("Failed to move block file from " + blockFile + " to " + targetBlockFile, e); @@ -690,7 +688,6 @@ class BlockPoolSlice { * @return the number of valid bytes */ private long validateIntegrityAndSetLength(File blockFile, long genStamp) { - ReplicaInputStreams ris = null; DataInputStream checksumIn = null; InputStream blockIn = null; try { @@ -717,22 +714,21 @@ class BlockPoolSlice { if (numChunks == 0) { return 0; } + IOUtils.skipFully(checksumIn, (numChunks-1)*checksumSize); blockIn = new FileInputStream(blockFile); - ris = new ReplicaInputStreams(blockIn, checksumIn, - volume.obtainReference()); - ris.skipChecksumFully((numChunks-1)*checksumSize); long lastChunkStartPos = (numChunks-1)*bytesPerChecksum; - ris.skipDataFully(lastChunkStartPos); + IOUtils.skipFully(blockIn, lastChunkStartPos); int lastChunkSize = (int)Math.min( bytesPerChecksum, blockFileLen-lastChunkStartPos); byte[] buf = new byte[lastChunkSize+checksumSize]; - ris.readChecksumFully(buf, lastChunkSize, checksumSize); - ris.readDataFully(buf, 0, lastChunkSize); + checksumIn.readFully(buf, lastChunkSize, checksumSize); + IOUtils.readFully(blockIn, buf, 0, lastChunkSize); + checksum.update(buf, 0, lastChunkSize); long validFileLength; if (checksum.compare(buf, lastChunkSize)) { // last chunk matches crc validFileLength = lastChunkStartPos + lastChunkSize; - } else { // last chunk is corrupt + } else { // last chunck is corrupt validFileLength = lastChunkStartPos; } @@ -752,12 +748,8 @@ class BlockPoolSlice { FsDatasetImpl.LOG.warn(e); return 0; } finally { - if (ris != null) { - ris.close(); - } else { - IOUtils.closeStream(checksumIn); - IOUtils.closeStream(blockIn); - } + IOUtils.closeStream(checksumIn); + IOUtils.closeStream(blockIn); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java index 97dcf8d4a14..b9c731be5ae 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl; import java.io.File; +import java.io.FileDescriptor; import java.io.IOException; import java.util.HashMap; import java.util.HashSet; @@ -37,9 +38,9 @@ import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil; import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference; -import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams; import org.apache.hadoop.hdfs.server.protocol.BlockCommand; import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.io.nativeio.NativeIO; import org.apache.hadoop.io.nativeio.NativeIOException; /** @@ -201,13 +202,13 @@ class FsDatasetAsyncDiskService { } public void submitSyncFileRangeRequest(FsVolumeImpl volume, - final ReplicaOutputStreams streams, final long offset, final long nbytes, + final FileDescriptor fd, final long offset, final long nbytes, final int flags) { execute(volume, new Runnable() { @Override public void run() { try { - streams.syncFileRangeIfPossible(offset, nbytes, flags); + NativeIO.POSIX.syncFileRangeIfPossible(fd, offset, nbytes, flags); } catch (NativeIOException e) { LOG.warn("sync_file_range error", e); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java index 6065df230c5..954d6ef7b69 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java @@ -21,6 +21,7 @@ import java.io.BufferedOutputStream; import java.io.DataOutputStream; import java.io.EOFException; import java.io.File; +import java.io.FileDescriptor; import java.io.FileNotFoundException; import java.io.FileOutputStream; import java.io.IOException; @@ -2754,9 +2755,9 @@ class FsDatasetImpl implements FsDatasetSpi { @Override public void submitBackgroundSyncFileRangeRequest(ExtendedBlock block, - ReplicaOutputStreams outs, long offset, long nbytes, int flags) { + FileDescriptor fd, long offset, long nbytes, int flags) { FsVolumeImpl fsVolumeImpl = this.getVolume(block); - asyncDiskService.submitSyncFileRangeRequest(fsVolumeImpl, outs, offset, + asyncDiskService.submitSyncFileRangeRequest(fsVolumeImpl, fd, offset, nbytes, flags); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java index 08564de6898..a231e03e647 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java @@ -1067,7 +1067,7 @@ public class FsVolumeImpl implements FsVolumeSpi { DataStorage.STORAGE_DIR_LAZY_PERSIST); File rbwDir = new File(bpCurrentDir, DataStorage.STORAGE_DIR_RBW); if (force) { - DataStorage.fullyDelete(bpDir); + FileUtil.fullyDelete(bpDir); } else { if (!rbwDir.delete()) { throw new IOException("Failed to delete " + rbwDir); @@ -1081,7 +1081,7 @@ public class FsVolumeImpl implements FsVolumeSpi { !FileUtil.fullyDelete(lazypersistDir)))) { throw new IOException("Failed to delete " + lazypersistDir); } - DataStorage.fullyDelete(tmpDir); + FileUtil.fullyDelete(tmpDir); for (File f : FileUtil.listFiles(bpCurrentDir)) { if (!f.delete()) { throw new IOException("Failed to delete " + f); @@ -1437,3 +1437,4 @@ public class FsVolumeImpl implements FsVolumeSpi { replicaState); } } + diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend.java index e963d416370..20cec6a2c8f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileAppend.java @@ -701,7 +701,7 @@ public class TestFileAppend{ ReplicaBeingWritten rbw = (ReplicaBeingWritten)replicaHandler.getReplica(); ReplicaOutputStreams - outputStreams = rbw.createStreams(false, DEFAULT_CHECKSUM, 300); + outputStreams = rbw.createStreams(false, DEFAULT_CHECKSUM); OutputStream dataOutput = outputStreams.getDataOut(); byte[] appendBytes = new byte[1]; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java index f0266ef31b0..5d63d079d9d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java @@ -17,7 +17,11 @@ */ package org.apache.hadoop.hdfs.server.datanode; -import java.io.*; +import java.io.File; +import java.io.FileDescriptor; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; import java.net.URI; import java.nio.channels.ClosedChannelException; import java.util.Collection; @@ -257,15 +261,14 @@ public class SimulatedFSDataset implements FsDatasetSpi { @Override synchronized public ReplicaOutputStreams createStreams(boolean isCreate, - DataChecksum requestedChecksum, long slowLogThresholdMs) - throws IOException { + DataChecksum requestedChecksum) throws IOException { if (finalized) { throw new IOException("Trying to write to a finalized replica " + theBlock); } else { SimulatedOutputStream crcStream = new SimulatedOutputStream(); return new ReplicaOutputStreams(oStream, crcStream, requestedChecksum, - volume.isTransientStorage(), slowLogThresholdMs); + volume.isTransientStorage()); } } @@ -1361,7 +1364,7 @@ public class SimulatedFSDataset implements FsDatasetSpi { @Override public void submitBackgroundSyncFileRangeRequest(ExtendedBlock block, - ReplicaOutputStreams outs, long offset, long nbytes, int flags) { + FileDescriptor fd, long offset, long nbytes, int flags) { throw new UnsupportedOperationException(); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java index 8439991056a..619eda00408 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java @@ -673,7 +673,7 @@ public class TestBlockRecovery { ReplicaOutputStreams streams = null; try { streams = replicaInfo.createStreams(true, - DataChecksum.newDataChecksum(DataChecksum.Type.CRC32, 512), 300); + DataChecksum.newDataChecksum(DataChecksum.Type.CRC32, 512)); streams.getChecksumOut().write('a'); dn.data.initReplicaRecovery(new RecoveringBlock(block, null, RECOVERY_ID+1)); BlockRecoveryWorker.RecoveryTaskContiguous RecoveryTaskContiguous = diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java index fa980c299ef..4e724bc7cb8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java @@ -83,7 +83,7 @@ public class TestSimulatedFSDataset { ReplicaInPipeline bInfo = fsdataset.createRbw( StorageType.DEFAULT, b, false).getReplica(); ReplicaOutputStreams out = bInfo.createStreams(true, - DataChecksum.newDataChecksum(DataChecksum.Type.CRC32, 512), 300); + DataChecksum.newDataChecksum(DataChecksum.Type.CRC32, 512)); try { OutputStream dataOut = out.getDataOut(); assertEquals(0, fsdataset.getLength(b)); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java index 2417c9de9e1..41663464f7c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java @@ -318,8 +318,8 @@ public class ExternalDatasetImpl implements FsDatasetSpi { } @Override - public void submitBackgroundSyncFileRangeRequest(ExtendedBlock block, - ReplicaOutputStreams outs, long offset, long nbytes, int flags) { + public void submitBackgroundSyncFileRangeRequest(ExtendedBlock block, FileDescriptor fd, long offset, long nbytes, int flags) { + } @Override diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalReplicaInPipeline.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalReplicaInPipeline.java index 6fa283094d1..90c3b8a56f9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalReplicaInPipeline.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalReplicaInPipeline.java @@ -58,10 +58,8 @@ public class ExternalReplicaInPipeline implements ReplicaInPipeline { @Override public ReplicaOutputStreams createStreams(boolean isCreate, - DataChecksum requestedChecksum, long slowLogThresholdMs) - throws IOException { - return new ReplicaOutputStreams(null, null, requestedChecksum, false, - slowLogThresholdMs); + DataChecksum requestedChecksum) throws IOException { + return new ReplicaOutputStreams(null, null, requestedChecksum, false); } @Override