Revert "HDFS-10930. Refactor: Wrap Datanode IO related operations. Contributed by Xiaoyu Yao."

This reverts commit 4c9ca47386.
This commit is contained in:
Xiaoyu Yao 2016-12-09 21:37:13 -08:00
parent 4c9ca47386
commit 0bdaa23334
20 changed files with 213 additions and 383 deletions

View File

@ -24,7 +24,10 @@ import java.io.Closeable;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.FileDescriptor;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.nio.ByteBuffer;
@ -50,6 +53,7 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.util.DataTransferThrottler;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.nativeio.NativeIO;
import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.StringUtils;
@ -84,6 +88,8 @@ class BlockReceiver implements Closeable {
* the DataNode needs to recalculate checksums before writing.
*/
private final boolean needsChecksumTranslation;
private OutputStream out = null; // to block file at local disk
private FileDescriptor outFd;
private DataOutputStream checksumOut = null; // to crc file at local disk
private final int bytesPerChecksum;
private final int checksumSize;
@ -244,8 +250,7 @@ class BlockReceiver implements Closeable {
final boolean isCreate = isDatanode || isTransfer
|| stage == BlockConstructionStage.PIPELINE_SETUP_CREATE;
streams = replicaInfo.createStreams(isCreate, requestedChecksum,
datanodeSlowLogThresholdMs);
streams = replicaInfo.createStreams(isCreate, requestedChecksum);
assert streams != null : "null streams!";
// read checksum meta information
@ -255,6 +260,13 @@ class BlockReceiver implements Closeable {
this.bytesPerChecksum = diskChecksum.getBytesPerChecksum();
this.checksumSize = diskChecksum.getChecksumSize();
this.out = streams.getDataOut();
if (out instanceof FileOutputStream) {
this.outFd = ((FileOutputStream)out).getFD();
} else {
LOG.warn("Could not get file descriptor for outputstream of class " +
out.getClass());
}
this.checksumOut = new DataOutputStream(new BufferedOutputStream(
streams.getChecksumOut(), DFSUtilClient.getSmallBufferSize(
datanode.getConf())));
@ -307,7 +319,7 @@ class BlockReceiver implements Closeable {
packetReceiver.close();
IOException ioe = null;
if (syncOnClose && (streams.getDataOut() != null || checksumOut != null)) {
if (syncOnClose && (out != null || checksumOut != null)) {
datanode.metrics.incrFsyncCount();
}
long flushTotalNanos = 0;
@ -336,9 +348,9 @@ class BlockReceiver implements Closeable {
}
// close block file
try {
if (streams.getDataOut() != null) {
if (out != null) {
long flushStartNanos = System.nanoTime();
streams.flushDataOut();
out.flush();
long flushEndNanos = System.nanoTime();
if (syncOnClose) {
long fsyncStartNanos = flushEndNanos;
@ -347,13 +359,14 @@ class BlockReceiver implements Closeable {
}
flushTotalNanos += flushEndNanos - flushStartNanos;
measuredFlushTime = true;
streams.closeDataStream();
out.close();
out = null;
}
} catch (IOException e) {
ioe = e;
}
finally{
streams.close();
IOUtils.closeStream(out);
}
if (replicaHandler != null) {
IOUtils.cleanup(null, replicaHandler);
@ -406,9 +419,9 @@ class BlockReceiver implements Closeable {
}
flushTotalNanos += flushEndNanos - flushStartNanos;
}
if (streams.getDataOut() != null) {
if (out != null) {
long flushStartNanos = System.nanoTime();
streams.flushDataOut();
out.flush();
long flushEndNanos = System.nanoTime();
if (isSync) {
long fsyncStartNanos = flushEndNanos;
@ -417,10 +430,10 @@ class BlockReceiver implements Closeable {
}
flushTotalNanos += flushEndNanos - flushStartNanos;
}
if (checksumOut != null || streams.getDataOut() != null) {
if (checksumOut != null || out != null) {
datanode.metrics.addFlushNanos(flushTotalNanos);
if (isSync) {
datanode.metrics.incrFsyncCount();
datanode.metrics.incrFsyncCount();
}
}
long duration = Time.monotonicNow() - begin;
@ -703,12 +716,16 @@ class BlockReceiver implements Closeable {
int numBytesToDisk = (int)(offsetInBlock-onDiskLen);
// Write data to disk.
long duration = streams.writeToDisk(dataBuf.array(),
startByteToDisk, numBytesToDisk);
long begin = Time.monotonicNow();
out.write(dataBuf.array(), startByteToDisk, numBytesToDisk);
long duration = Time.monotonicNow() - begin;
if (duration > maxWriteToDiskMs) {
maxWriteToDiskMs = duration;
}
if (duration > datanodeSlowLogThresholdMs) {
LOG.warn("Slow BlockReceiver write data to disk cost:" + duration
+ "ms (threshold=" + datanodeSlowLogThresholdMs + "ms)");
}
final byte[] lastCrc;
if (shouldNotWriteChecksum) {
@ -825,7 +842,7 @@ class BlockReceiver implements Closeable {
private void manageWriterOsCache(long offsetInBlock) {
try {
if (streams.getOutFd() != null &&
if (outFd != null &&
offsetInBlock > lastCacheManagementOffset + CACHE_DROP_LAG_BYTES) {
long begin = Time.monotonicNow();
//
@ -840,11 +857,12 @@ class BlockReceiver implements Closeable {
if (syncBehindWrites) {
if (syncBehindWritesInBackground) {
this.datanode.getFSDataset().submitBackgroundSyncFileRangeRequest(
block, streams, lastCacheManagementOffset,
block, outFd, lastCacheManagementOffset,
offsetInBlock - lastCacheManagementOffset,
SYNC_FILE_RANGE_WRITE);
} else {
streams.syncFileRangeIfPossible(lastCacheManagementOffset,
NativeIO.POSIX.syncFileRangeIfPossible(outFd,
lastCacheManagementOffset,
offsetInBlock - lastCacheManagementOffset,
SYNC_FILE_RANGE_WRITE);
}
@ -861,8 +879,8 @@ class BlockReceiver implements Closeable {
//
long dropPos = lastCacheManagementOffset - CACHE_DROP_LAG_BYTES;
if (dropPos > 0 && dropCacheBehindWrites) {
streams.dropCacheBehindWrites(block.getBlockName(), 0, dropPos,
POSIX_FADV_DONTNEED);
NativeIO.POSIX.getCacheManipulator().posixFadviseIfPossible(
block.getBlockName(), outFd, 0, dropPos, POSIX_FADV_DONTNEED);
}
lastCacheManagementOffset = offsetInBlock;
long duration = Time.monotonicNow() - begin;
@ -971,7 +989,7 @@ class BlockReceiver implements Closeable {
// The worst case is not recovering this RBW replica.
// Client will fall back to regular pipeline recovery.
} finally {
IOUtils.closeStream(streams.getDataOut());
IOUtils.closeStream(out);
}
try {
// Even if the connection is closed after the ack packet is
@ -1029,8 +1047,8 @@ class BlockReceiver implements Closeable {
* will be overwritten.
*/
private void adjustCrcFilePosition() throws IOException {
if (streams.getDataOut() != null) {
streams.flushDataOut();
if (out != null) {
out.flush();
}
if (checksumOut != null) {
checksumOut.flush();
@ -1076,10 +1094,10 @@ class BlockReceiver implements Closeable {
byte[] crcbuf = new byte[checksumSize];
try (ReplicaInputStreams instr =
datanode.data.getTmpInputStreams(block, blkoff, ckoff)) {
instr.readDataFully(buf, 0, sizePartialChunk);
IOUtils.readFully(instr.getDataIn(), buf, 0, sizePartialChunk);
// open meta file and read in crc value computer earlier
instr.readChecksumFully(crcbuf, 0, crcbuf.length);
IOUtils.readFully(instr.getChecksumIn(), crcbuf, 0, crcbuf.length);
}
// compute crc of partial chunk from data read in the block file.

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.datanode;
import java.io.BufferedInputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.FileDescriptor;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
@ -40,11 +41,11 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams;
import org.apache.hadoop.hdfs.util.DataTransferThrottler;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.ReadaheadPool.ReadaheadRequest;
import org.apache.hadoop.io.nativeio.NativeIO;
import org.apache.hadoop.net.SocketOutputStream;
import org.apache.hadoop.util.AutoCloseableLock;
import org.apache.hadoop.util.DataChecksum;
@ -119,11 +120,12 @@ class BlockSender implements java.io.Closeable {
/** the block to read from */
private final ExtendedBlock block;
/** InputStreams and file descriptors to read block/checksum. */
private ReplicaInputStreams ris;
/** Stream to read block data from */
private InputStream blockIn;
/** updated while using transferTo() */
private long blockInPosition = -1;
/** Stream to read checksum */
private DataInputStream checksumIn;
/** Checksum utility */
private final DataChecksum checksum;
/** Initial position to read */
@ -150,6 +152,11 @@ class BlockSender implements java.io.Closeable {
private final String clientTraceFmt;
private volatile ChunkChecksum lastChunkChecksum = null;
private DataNode datanode;
/** The file descriptor of the block being sent */
private FileDescriptor blockInFd;
/** The reference to the volume where the block is located */
private FsVolumeReference volumeRef;
/** The replica of the block that is being read. */
private final Replica replica;
@ -194,9 +201,6 @@ class BlockSender implements java.io.Closeable {
boolean sendChecksum, DataNode datanode, String clientTraceFmt,
CachingStrategy cachingStrategy)
throws IOException {
InputStream blockIn = null;
DataInputStream checksumIn = null;
FsVolumeReference volumeRef = null;
try {
this.block = block;
this.corruptChecksumOk = corruptChecksumOk;
@ -277,7 +281,7 @@ class BlockSender implements java.io.Closeable {
(!is32Bit || length <= Integer.MAX_VALUE);
// Obtain a reference before reading data
volumeRef = datanode.data.getVolume(block).obtainReference();
this.volumeRef = datanode.data.getVolume(block).obtainReference();
/*
* (corruptChecksumOK, meta_file_exist): operation
@ -401,9 +405,14 @@ class BlockSender implements java.io.Closeable {
DataNode.LOG.debug("replica=" + replica);
}
blockIn = datanode.data.getBlockInputStream(block, offset); // seek to offset
ris = new ReplicaInputStreams(blockIn, checksumIn, volumeRef);
if (blockIn instanceof FileInputStream) {
blockInFd = ((FileInputStream)blockIn).getFD();
} else {
blockInFd = null;
}
} catch (IOException ioe) {
IOUtils.closeStream(this);
IOUtils.closeStream(blockIn);
throw ioe;
}
}
@ -413,11 +422,12 @@ class BlockSender implements java.io.Closeable {
*/
@Override
public void close() throws IOException {
if (ris.getDataInFd() != null &&
if (blockInFd != null &&
((dropCacheBehindAllReads) ||
(dropCacheBehindLargeReads && isLongRead()))) {
try {
ris.dropCacheBehindReads(block.getBlockName(), lastCacheDropOffset,
NativeIO.POSIX.getCacheManipulator().posixFadviseIfPossible(
block.getBlockName(), blockInFd, lastCacheDropOffset,
offset - lastCacheDropOffset, POSIX_FADV_DONTNEED);
} catch (Exception e) {
LOG.warn("Unable to drop cache on file close", e);
@ -426,12 +436,32 @@ class BlockSender implements java.io.Closeable {
if (curReadahead != null) {
curReadahead.cancel();
}
try {
ris.closeStreams();
} finally {
IOUtils.closeStream(ris);
ris = null;
IOException ioe = null;
if(checksumIn!=null) {
try {
checksumIn.close(); // close checksum file
} catch (IOException e) {
ioe = e;
}
checksumIn = null;
}
if(blockIn!=null) {
try {
blockIn.close(); // close data file
} catch (IOException e) {
ioe = e;
}
blockIn = null;
blockInFd = null;
}
if (volumeRef != null) {
IOUtils.cleanup(null, volumeRef);
volumeRef = null;
}
// throw IOException if there is any
if(ioe!= null) {
throw ioe;
}
}
@ -535,7 +565,7 @@ class BlockSender implements java.io.Closeable {
int checksumOff = pkt.position();
byte[] buf = pkt.array();
if (checksumSize > 0 && ris.getChecksumIn() != null) {
if (checksumSize > 0 && checksumIn != null) {
readChecksum(buf, checksumOff, checksumDataLen);
// write in progress that we need to use to get last checksum
@ -551,7 +581,7 @@ class BlockSender implements java.io.Closeable {
int dataOff = checksumOff + checksumDataLen;
if (!transferTo) { // normal transfer
ris.readDataFully(buf, dataOff, dataLen);
IOUtils.readFully(blockIn, buf, dataOff, dataLen);
if (verifyChecksum) {
verifyChecksum(buf, dataOff, dataLen, numChunks, checksumOff);
@ -563,12 +593,12 @@ class BlockSender implements java.io.Closeable {
SocketOutputStream sockOut = (SocketOutputStream)out;
// First write header and checksums
sockOut.write(buf, headerOff, dataOff - headerOff);
// no need to flush since we know out is not a buffered stream
FileChannel fileCh = ((FileInputStream)ris.getDataIn()).getChannel();
FileChannel fileCh = ((FileInputStream)blockIn).getChannel();
LongWritable waitTime = new LongWritable();
LongWritable transferTime = new LongWritable();
sockOut.transferToFully(fileCh, blockInPosition, dataLen,
sockOut.transferToFully(fileCh, blockInPosition, dataLen,
waitTime, transferTime);
datanode.metrics.addSendDataPacketBlockedOnNetworkNanos(waitTime.get());
datanode.metrics.addSendDataPacketTransferNanos(transferTime.get());
@ -600,7 +630,7 @@ class BlockSender implements java.io.Closeable {
if (!ioem.startsWith("Broken pipe") && !ioem.startsWith("Connection reset")) {
LOG.error("BlockSender.sendChunks() exception: ", e);
datanode.getBlockScanner().markSuspectBlock(
ris.getVolumeRef().getVolume().getStorageID(),
volumeRef.getVolume().getStorageID(),
block);
}
}
@ -623,15 +653,16 @@ class BlockSender implements java.io.Closeable {
*/
private void readChecksum(byte[] buf, final int checksumOffset,
final int checksumLen) throws IOException {
if (checksumSize <= 0 && ris.getChecksumIn() == null) {
if (checksumSize <= 0 && checksumIn == null) {
return;
}
try {
ris.readChecksumFully(buf, checksumOffset, checksumLen);
checksumIn.readFully(buf, checksumOffset, checksumLen);
} catch (IOException e) {
LOG.warn(" Could not read or failed to verify checksum for data"
+ " at offset " + offset + " for block " + block, e);
ris.closeChecksumStream();
IOUtils.closeStream(checksumIn);
checksumIn = null;
if (corruptChecksumOk) {
if (checksumOffset < checksumLen) {
// Just fill the array with zeros.
@ -715,10 +746,10 @@ class BlockSender implements java.io.Closeable {
lastCacheDropOffset = initialOffset;
if (isLongRead() && ris.getDataInFd() != null) {
if (isLongRead() && blockInFd != null) {
// Advise that this file descriptor will be accessed sequentially.
ris.dropCacheBehindReads(block.getBlockName(), 0, 0,
POSIX_FADV_SEQUENTIAL);
NativeIO.POSIX.getCacheManipulator().posixFadviseIfPossible(
block.getBlockName(), blockInFd, 0, 0, POSIX_FADV_SEQUENTIAL);
}
// Trigger readahead of beginning of file if configured.
@ -730,10 +761,9 @@ class BlockSender implements java.io.Closeable {
int pktBufSize = PacketHeader.PKT_MAX_HEADER_LEN;
boolean transferTo = transferToAllowed && !verifyChecksum
&& baseStream instanceof SocketOutputStream
&& ris.getDataIn() instanceof FileInputStream;
&& blockIn instanceof FileInputStream;
if (transferTo) {
FileChannel fileChannel =
((FileInputStream)ris.getDataIn()).getChannel();
FileChannel fileChannel = ((FileInputStream)blockIn).getChannel();
blockInPosition = fileChannel.position();
streamForSendChunks = baseStream;
maxChunksPerPacket = numberOfChunks(TRANSFERTO_BUFFER_SIZE);
@ -788,16 +818,14 @@ class BlockSender implements java.io.Closeable {
private void manageOsCache() throws IOException {
// We can't manage the cache for this block if we don't have a file
// descriptor to work with.
if (ris.getDataInFd() == null) {
return;
}
if (blockInFd == null) return;
// Perform readahead if necessary
if ((readaheadLength > 0) && (datanode.readaheadPool != null) &&
(alwaysReadahead || isLongRead())) {
curReadahead = datanode.readaheadPool.readaheadStream(
clientTraceFmt, ris.getDataInFd(), offset, readaheadLength,
Long.MAX_VALUE, curReadahead);
clientTraceFmt, blockInFd, offset, readaheadLength, Long.MAX_VALUE,
curReadahead);
}
// Drop what we've just read from cache, since we aren't
@ -807,7 +835,8 @@ class BlockSender implements java.io.Closeable {
long nextCacheDropOffset = lastCacheDropOffset + CACHE_DROP_INTERVAL_BYTES;
if (offset >= nextCacheDropOffset) {
long dropLength = offset - lastCacheDropOffset;
ris.dropCacheBehindReads(block.getBlockName(), lastCacheDropOffset,
NativeIO.POSIX.getCacheManipulator().posixFadviseIfPossible(
block.getBlockName(), blockInFd, lastCacheDropOffset,
dropLength, POSIX_FADV_DONTNEED);
lastCacheDropOffset = offset;
}

View File

@ -366,10 +366,6 @@ public class DNConf {
return volsConfigured;
}
public long getSlowIoWarningThresholdMs() {
return datanodeSlowIoWarningThresholdMs;
}
int getMaxDataLength() {
return maxDataLength;
}

View File

@ -1394,9 +1394,4 @@ public class DataStorage extends Storage {
synchronized void removeBlockPoolStorage(String bpId) {
bpStorageMap.remove(bpId);
}
public static boolean fullyDelete(final File dir) {
boolean result = FileUtil.fullyDelete(dir);
return result;
}
}

View File

@ -130,7 +130,7 @@ public class ReplicaInPipeline extends ReplicaInfo
public long getBytesAcked() {
return bytesAcked;
}
@Override // ReplicaInPipelineInterface
public void setBytesAcked(long bytesAcked) {
long newBytesAcked = bytesAcked - this.bytesAcked;
@ -234,8 +234,7 @@ public class ReplicaInPipeline extends ReplicaInfo
@Override // ReplicaInPipelineInterface
public ReplicaOutputStreams createStreams(boolean isCreate,
DataChecksum requestedChecksum, long slowLogThresholdMs)
throws IOException {
DataChecksum requestedChecksum) throws IOException {
File blockFile = getBlockFile();
File metaFile = getMetaFile();
if (DataNode.LOG.isDebugEnabled()) {
@ -246,13 +245,13 @@ public class ReplicaInPipeline extends ReplicaInfo
}
long blockDiskSize = 0L;
long crcDiskSize = 0L;
// the checksum that should actually be used -- this
// may differ from requestedChecksum for appends.
final DataChecksum checksum;
RandomAccessFile metaRAF = new RandomAccessFile(metaFile, "rw");
if (!isCreate) {
// For append or recovery, we must enforce the existing checksum.
// Also, verify that the file has correct lengths, etc.
@ -260,14 +259,14 @@ public class ReplicaInPipeline extends ReplicaInfo
try {
BlockMetadataHeader header = BlockMetadataHeader.readHeader(metaRAF);
checksum = header.getChecksum();
if (checksum.getBytesPerChecksum() !=
requestedChecksum.getBytesPerChecksum()) {
throw new IOException("Client requested checksum " +
requestedChecksum + " when appending to an existing block " +
"with different chunk size: " + checksum);
}
int bytesPerChunk = checksum.getBytesPerChecksum();
int checksumSize = checksum.getChecksumSize();
@ -289,19 +288,19 @@ public class ReplicaInPipeline extends ReplicaInfo
// for create, we can use the requested checksum
checksum = requestedChecksum;
}
FileOutputStream blockOut = null;
FileOutputStream crcOut = null;
try {
blockOut = new FileOutputStream(
new RandomAccessFile( blockFile, "rw" ).getFD() );
crcOut = new FileOutputStream(metaRAF.getFD());
crcOut = new FileOutputStream(metaRAF.getFD() );
if (!isCreate) {
blockOut.getChannel().position(blockDiskSize);
crcOut.getChannel().position(crcDiskSize);
}
return new ReplicaOutputStreams(blockOut, crcOut, checksum,
getVolume().isTransientStorage(), slowLogThresholdMs);
getVolume().isTransientStorage());
} catch (IOException e) {
IOUtils.closeStream(blockOut);
IOUtils.closeStream(metaRAF);

View File

@ -69,13 +69,11 @@ public interface ReplicaInPipelineInterface extends Replica {
*
* @param isCreate if it is for creation
* @param requestedChecksum the checksum the writer would prefer to use
* @param slowLogThresholdMs threshold in ms to log slow io operation
* @return output streams for writing
* @throws IOException if any error occurs
*/
public ReplicaOutputStreams createStreams(boolean isCreate,
DataChecksum requestedChecksum, long slowLogThresholdMs)
throws IOException;
DataChecksum requestedChecksum) throws IOException;
/**
* Create an output stream to write restart metadata in case of datanode

View File

@ -20,8 +20,6 @@ package org.apache.hadoop.hdfs.server.datanode;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
@ -30,7 +28,6 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.HardLink;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.LightWeightResizableGSet;
@ -233,7 +230,7 @@ abstract public class ReplicaInfo extends Block
try {
FileOutputStream out = new FileOutputStream(tmpFile);
try {
copyBytes(in, out, 16 * 1024);
IOUtils.copyBytes(in, out, 16 * 1024);
} finally {
out.close();
}
@ -245,7 +242,7 @@ abstract public class ReplicaInfo extends Block
" into file " + tmpFile +
" resulted in a size of " + tmpFile.length());
}
replaceFile(tmpFile, file);
FileUtil.replaceFile(tmpFile, file);
} catch (IOException e) {
boolean done = tmpFile.delete();
if (!done) {
@ -280,13 +277,13 @@ abstract public class ReplicaInfo extends Block
}
File meta = getMetaFile();
int linkCount = getHardLinkCount(file);
int linkCount = HardLink.getLinkCount(file);
if (linkCount > 1) {
DataNode.LOG.info("Breaking hardlink for " + linkCount + "x-linked " +
"block " + this);
breakHardlinks(file, this);
}
if (getHardLinkCount(meta) > 1) {
if (HardLink.getLinkCount(meta) > 1) {
breakHardlinks(meta, this);
}
return true;
@ -318,27 +315,4 @@ abstract public class ReplicaInfo extends Block
public void setNext(LightWeightResizableGSet.LinkedElement next) {
this.next = next;
}
public static boolean fullyDelete(final File dir) {
boolean result = DataStorage.fullyDelete(dir);
return result;
}
public static int getHardLinkCount(File fileName) throws IOException {
int linkCount = HardLink.getLinkCount(fileName);
return linkCount;
}
public static void rename(File from, File to) throws IOException {
Storage.rename(from, to);
}
private void copyBytes(InputStream in, OutputStream out, int
buffSize) throws IOException{
IOUtils.copyBytes(in, out, buffSize);
}
private void replaceFile(File src, File target) throws IOException {
FileUtil.replaceFile(src, target);
}
}

View File

@ -21,6 +21,7 @@ package org.apache.hadoop.hdfs.server.datanode.fsdataset;
import java.io.Closeable;
import java.io.EOFException;
import java.io.File;
import java.io.FileDescriptor;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
@ -610,7 +611,7 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
* submit a sync_file_range request to AsyncDiskService.
*/
void submitBackgroundSyncFileRangeRequest(final ExtendedBlock block,
final ReplicaOutputStreams outs, final long offset, final long nbytes,
final FileDescriptor fd, final long offset, final long nbytes,
final int flags);
/**

View File

@ -18,45 +18,24 @@
package org.apache.hadoop.hdfs.server.datanode.fsdataset;
import java.io.Closeable;
import java.io.FileDescriptor;
import java.io.FileInputStream;
import java.io.InputStream;
import java.io.IOException;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.nativeio.NativeIO;
import org.apache.hadoop.io.nativeio.NativeIOException;
import org.slf4j.Logger;
/**
* Contains the input streams for the data and checksum of a replica.
*/
public class ReplicaInputStreams implements Closeable {
public static final Logger LOG = DataNode.LOG;
private InputStream dataIn;
private InputStream checksumIn;
private FsVolumeReference volumeRef;
private FileDescriptor dataInFd = null;
private final InputStream dataIn;
private final InputStream checksumIn;
private final FsVolumeReference volumeRef;
/** Create an object with a data input stream and a checksum input stream. */
public ReplicaInputStreams(InputStream dataStream,
InputStream checksumStream, FsVolumeReference volumeRef) {
public ReplicaInputStreams(InputStream dataStream, InputStream checksumStream,
FsVolumeReference volumeRef) {
this.volumeRef = volumeRef;
this.dataIn = dataStream;
this.checksumIn = checksumStream;
if (dataIn instanceof FileInputStream) {
try {
dataInFd = ((FileInputStream) dataIn).getFD();
} catch (Exception e) {
LOG.warn("Could not get file descriptor for inputstream of class " +
this.dataIn.getClass());
}
} else {
LOG.debug("Could not get file descriptor for inputstream of class " +
this.dataIn.getClass());
}
}
/** @return the data input stream. */
@ -69,81 +48,10 @@ public class ReplicaInputStreams implements Closeable {
return checksumIn;
}
public FileDescriptor getDataInFd() {
return dataInFd;
}
public FsVolumeReference getVolumeRef() {
return volumeRef;
}
public void readDataFully(byte[] buf, int off, int len)
throws IOException {
IOUtils.readFully(dataIn, buf, off, len);
}
public void readChecksumFully(byte[] buf, int off, int len)
throws IOException {
IOUtils.readFully(checksumIn, buf, off, len);
}
public void skipDataFully(long len) throws IOException {
IOUtils.skipFully(dataIn, len);
}
public void skipChecksumFully(long len) throws IOException {
IOUtils.skipFully(checksumIn, len);
}
public void closeChecksumStream() throws IOException {
IOUtils.closeStream(checksumIn);
checksumIn = null;
}
public void dropCacheBehindReads(String identifier, long offset, long len,
int flags) throws NativeIOException {
assert this.dataInFd != null : "null dataInFd!";
NativeIO.POSIX.getCacheManipulator().posixFadviseIfPossible(
identifier, dataInFd, offset, len, flags);
}
public void closeStreams() throws IOException {
IOException ioe = null;
if(checksumIn!=null) {
try {
checksumIn.close(); // close checksum file
} catch (IOException e) {
ioe = e;
}
checksumIn = null;
}
if(dataIn!=null) {
try {
dataIn.close(); // close data file
} catch (IOException e) {
ioe = e;
}
dataIn = null;
dataInFd = null;
}
if (volumeRef != null) {
IOUtils.cleanup(null, volumeRef);
volumeRef = null;
}
// throw IOException if there is any
if(ioe!= null) {
throw ioe;
}
}
@Override
public void close() {
IOUtils.closeStream(dataIn);
dataIn = null;
dataInFd = null;
IOUtils.closeStream(checksumIn);
checksumIn = null;
IOUtils.cleanup(null, volumeRef);
volumeRef = null;
}
}

View File

@ -18,62 +18,32 @@
package org.apache.hadoop.hdfs.server.datanode.fsdataset;
import java.io.Closeable;
import java.io.FileDescriptor;
import java.io.FileOutputStream;
import java.io.OutputStream;
import java.io.IOException;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.nativeio.NativeIO;
import org.apache.hadoop.io.nativeio.NativeIOException;
import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.Time;
import org.slf4j.Logger;
/**
* Contains the output streams for the data and checksum of a replica.
*/
public class ReplicaOutputStreams implements Closeable {
public static final Logger LOG = DataNode.LOG;
private FileDescriptor outFd = null;
/** Stream to block. */
private OutputStream dataOut;
/** Stream to checksum. */
private final OutputStream dataOut;
private final OutputStream checksumOut;
private final DataChecksum checksum;
private final boolean isTransientStorage;
private final long slowLogThresholdMs;
/**
* Create an object with a data output stream, a checksum output stream
* and a checksum.
*/
public ReplicaOutputStreams(OutputStream dataOut,
OutputStream checksumOut, DataChecksum checksum,
boolean isTransientStorage, long slowLogThresholdMs) {
public ReplicaOutputStreams(OutputStream dataOut, OutputStream checksumOut,
DataChecksum checksum, boolean isTransientStorage) {
this.dataOut = dataOut;
this.checksum = checksum;
this.slowLogThresholdMs = slowLogThresholdMs;
this.isTransientStorage = isTransientStorage;
this.checksumOut = checksumOut;
try {
if (this.dataOut instanceof FileOutputStream) {
this.outFd = ((FileOutputStream)this.dataOut).getFD();
} else {
LOG.debug("Could not get file descriptor for outputstream of class " +
this.dataOut.getClass());
}
} catch (IOException e) {
LOG.warn("Could not get file descriptor for outputstream of class " +
this.dataOut.getClass());
}
}
public FileDescriptor getOutFd() {
return outFd;
this.checksum = checksum;
this.isTransientStorage = isTransientStorage;
}
/** @return the data output stream. */
@ -102,17 +72,12 @@ public class ReplicaOutputStreams implements Closeable {
IOUtils.closeStream(checksumOut);
}
public void closeDataStream() throws IOException {
dataOut.close();
dataOut = null;
}
/**
* Sync the data stream if it supports it.
*/
public void syncDataOut() throws IOException {
if (dataOut instanceof FileOutputStream) {
sync((FileOutputStream)dataOut);
((FileOutputStream)dataOut).getChannel().force(true);
}
}
@ -121,68 +86,8 @@ public class ReplicaOutputStreams implements Closeable {
*/
public void syncChecksumOut() throws IOException {
if (checksumOut instanceof FileOutputStream) {
sync((FileOutputStream)checksumOut);
((FileOutputStream)checksumOut).getChannel().force(true);
}
}
/**
* Flush the data stream if it supports it.
*/
public void flushDataOut() throws IOException {
flush(dataOut);
}
/**
* Flush the checksum stream if it supports it.
*/
public void flushChecksumOut() throws IOException {
flush(checksumOut);
}
private void flush(OutputStream dos) throws IOException {
long begin = Time.monotonicNow();
dos.flush();
long duration = Time.monotonicNow() - begin;
LOG.trace("ReplicaOutputStreams#flush takes {} ms.", duration);
if (duration > slowLogThresholdMs) {
LOG.warn("Slow flush took {} ms (threshold={} ms)", duration,
slowLogThresholdMs);
}
}
private void sync(FileOutputStream fos) throws IOException {
long begin = Time.monotonicNow();
fos.getChannel().force(true);
long duration = Time.monotonicNow() - begin;
LOG.trace("ReplicaOutputStreams#sync takes {} ms.", duration);
if (duration > slowLogThresholdMs) {
LOG.warn("Slow fsync took {} ms (threshold={} ms)", duration,
slowLogThresholdMs);
}
}
public long writeToDisk(byte[] b, int off, int len) throws IOException {
long begin = Time.monotonicNow();
dataOut.write(b, off, len);
long duration = Time.monotonicNow() - begin;
LOG.trace("DatanodeIO#writeToDisk takes {} ms.", duration);
if (duration > slowLogThresholdMs) {
LOG.warn("Slow BlockReceiver write data to disk cost: {} ms " +
"(threshold={} ms)", duration, slowLogThresholdMs);
}
return duration;
}
public void syncFileRangeIfPossible(long offset, long nbytes,
int flags) throws NativeIOException {
assert this.outFd != null : "null outFd!";
NativeIO.POSIX.syncFileRangeIfPossible(outFd, offset, nbytes, flags);
}
public void dropCacheBehindWrites(String identifier,
long offset, long len, int flags) throws NativeIOException {
assert this.outFd != null : "null outFd!";
NativeIO.POSIX.getCacheManipulator().posixFadviseIfPossible(
identifier, outFd, offset, len, flags);
}
}

View File

@ -52,9 +52,8 @@ import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica;
import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
import org.apache.hadoop.hdfs.server.datanode.ReplicaBeingWritten;
import org.apache.hadoop.hdfs.server.datanode.ReplicaWaitingToBeRecovered;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.nativeio.NativeIO;
import org.apache.hadoop.util.AutoCloseableLock;
import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.DiskChecker;
@ -146,7 +145,7 @@ class BlockPoolSlice {
//
this.tmpDir = new File(bpDir, DataStorage.STORAGE_DIR_TMP);
if (tmpDir.exists()) {
DataStorage.fullyDelete(tmpDir);
FileUtil.fullyDelete(tmpDir);
}
this.rbwDir = new File(currentDir, DataStorage.STORAGE_DIR_RBW);
final boolean supportAppends = conf.getBoolean(
@ -431,7 +430,7 @@ class BlockPoolSlice {
final File targetMetaFile = new File(targetDir, metaFile.getName());
try {
ReplicaInfo.rename(metaFile, targetMetaFile);
NativeIO.renameTo(metaFile, targetMetaFile);
} catch (IOException e) {
LOG.warn("Failed to move meta file from "
+ metaFile + " to " + targetMetaFile, e);
@ -441,7 +440,7 @@ class BlockPoolSlice {
final File targetBlockFile = new File(targetDir, blockFile.getName());
try {
ReplicaInfo.rename(blockFile, targetBlockFile);
NativeIO.renameTo(blockFile, targetBlockFile);
} catch (IOException e) {
LOG.warn("Failed to move block file from "
+ blockFile + " to " + targetBlockFile, e);
@ -671,6 +670,8 @@ class BlockPoolSlice {
* @return the number of valid bytes
*/
private long validateIntegrityAndSetLength(File blockFile, long genStamp) {
DataInputStream checksumIn = null;
InputStream blockIn = null;
try {
final File metaFile = FsDatasetUtil.getMetaFile(blockFile, genStamp);
long blockFileLen = blockFile.length();
@ -680,52 +681,57 @@ class BlockPoolSlice {
!metaFile.exists() || metaFileLen < crcHeaderLen) {
return 0;
}
try (DataInputStream checksumIn = new DataInputStream(
checksumIn = new DataInputStream(
new BufferedInputStream(new FileInputStream(metaFile),
ioFileBufferSize))) {
// read and handle the common header here. For now just a version
final DataChecksum checksum = BlockMetadataHeader.readDataChecksum(
checksumIn, metaFile);
int bytesPerChecksum = checksum.getBytesPerChecksum();
int checksumSize = checksum.getChecksumSize();
long numChunks = Math.min(
(blockFileLen + bytesPerChecksum - 1) / bytesPerChecksum,
(metaFileLen - crcHeaderLen) / checksumSize);
if (numChunks == 0) {
return 0;
}
try (InputStream blockIn = new FileInputStream(blockFile);
ReplicaInputStreams ris = new ReplicaInputStreams(blockIn,
checksumIn, volume.obtainReference())) {
ris.skipChecksumFully((numChunks - 1) * checksumSize);
long lastChunkStartPos = (numChunks - 1) * bytesPerChecksum;
ris.skipDataFully(lastChunkStartPos);
int lastChunkSize = (int) Math.min(
bytesPerChecksum, blockFileLen - lastChunkStartPos);
byte[] buf = new byte[lastChunkSize + checksumSize];
ris.readChecksumFully(buf, lastChunkSize, checksumSize);
ris.readDataFully(buf, 0, lastChunkSize);
checksum.update(buf, 0, lastChunkSize);
long validFileLength;
if (checksum.compare(buf, lastChunkSize)) { // last chunk matches crc
validFileLength = lastChunkStartPos + lastChunkSize;
} else { // last chunk is corrupt
validFileLength = lastChunkStartPos;
}
// truncate if extra bytes are present without CRC
if (blockFile.length() > validFileLength) {
try (RandomAccessFile blockRAF =
new RandomAccessFile(blockFile, "rw")) {
// truncate blockFile
blockRAF.setLength(validFileLength);
}
}
return validFileLength;
ioFileBufferSize));
// read and handle the common header here. For now just a version
final DataChecksum checksum = BlockMetadataHeader.readDataChecksum(
checksumIn, metaFile);
int bytesPerChecksum = checksum.getBytesPerChecksum();
int checksumSize = checksum.getChecksumSize();
long numChunks = Math.min(
(blockFileLen + bytesPerChecksum - 1)/bytesPerChecksum,
(metaFileLen - crcHeaderLen)/checksumSize);
if (numChunks == 0) {
return 0;
}
IOUtils.skipFully(checksumIn, (numChunks-1)*checksumSize);
blockIn = new FileInputStream(blockFile);
long lastChunkStartPos = (numChunks-1)*bytesPerChecksum;
IOUtils.skipFully(blockIn, lastChunkStartPos);
int lastChunkSize = (int)Math.min(
bytesPerChecksum, blockFileLen-lastChunkStartPos);
byte[] buf = new byte[lastChunkSize+checksumSize];
checksumIn.readFully(buf, lastChunkSize, checksumSize);
IOUtils.readFully(blockIn, buf, 0, lastChunkSize);
checksum.update(buf, 0, lastChunkSize);
long validFileLength;
if (checksum.compare(buf, lastChunkSize)) { // last chunk matches crc
validFileLength = lastChunkStartPos + lastChunkSize;
} else { // last chunck is corrupt
validFileLength = lastChunkStartPos;
}
// truncate if extra bytes are present without CRC
if (blockFile.length() > validFileLength) {
RandomAccessFile blockRAF = new RandomAccessFile(blockFile, "rw");
try {
// truncate blockFile
blockRAF.setLength(validFileLength);
} finally {
blockRAF.close();
}
}
return validFileLength;
} catch (IOException e) {
FsDatasetImpl.LOG.warn(e);
return 0;
} finally {
IOUtils.closeStream(checksumIn);
IOUtils.closeStream(blockIn);
}
}

View File

@ -19,6 +19,7 @@
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
import java.io.File;
import java.io.FileDescriptor;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
@ -34,9 +35,9 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.nativeio.NativeIO;
import org.apache.hadoop.io.nativeio.NativeIOException;
/**
@ -192,13 +193,13 @@ class FsDatasetAsyncDiskService {
}
public void submitSyncFileRangeRequest(FsVolumeImpl volume,
final ReplicaOutputStreams streams, final long offset, final long nbytes,
final FileDescriptor fd, final long offset, final long nbytes,
final int flags) {
execute(volume.getCurrentDir(), new Runnable() {
@Override
public void run() {
try {
streams.syncFileRangeIfPossible(offset, nbytes, flags);
NativeIO.POSIX.syncFileRangeIfPossible(fd, offset, nbytes, flags);
} catch (NativeIOException e) {
LOG.warn("sync_file_range error", e);
}

View File

@ -21,8 +21,9 @@ import java.io.BufferedOutputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileDescriptor;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
@ -2984,9 +2985,9 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
@Override
public void submitBackgroundSyncFileRangeRequest(ExtendedBlock block,
ReplicaOutputStreams outs, long offset, long nbytes, int flags) {
FileDescriptor fd, long offset, long nbytes, int flags) {
FsVolumeImpl fsVolumeImpl = this.getVolume(block);
asyncDiskService.submitSyncFileRangeRequest(fsVolumeImpl, outs, offset,
asyncDiskService.submitSyncFileRangeRequest(fsVolumeImpl, fd, offset,
nbytes, flags);
}

View File

@ -993,7 +993,7 @@ public class FsVolumeImpl implements FsVolumeSpi {
DataStorage.STORAGE_DIR_LAZY_PERSIST);
File rbwDir = new File(bpCurrentDir, DataStorage.STORAGE_DIR_RBW);
if (force) {
DataStorage.fullyDelete(bpDir);
FileUtil.fullyDelete(bpDir);
} else {
if (!rbwDir.delete()) {
throw new IOException("Failed to delete " + rbwDir);
@ -1007,7 +1007,7 @@ public class FsVolumeImpl implements FsVolumeSpi {
!FileUtil.fullyDelete(lazypersistDir)))) {
throw new IOException("Failed to delete " + lazypersistDir);
}
DataStorage.fullyDelete(tmpDir);
FileUtil.fullyDelete(tmpDir);
for (File f : FileUtil.listFiles(bpCurrentDir)) {
if (!f.delete()) {
throw new IOException("Failed to delete " + f);
@ -1041,3 +1041,4 @@ public class FsVolumeImpl implements FsVolumeSpi {
return new DatanodeStorage(storageID, DatanodeStorage.State.NORMAL, storageType);
}
}

View File

@ -700,9 +700,9 @@ public class TestFileAppend{
// write data to block file
ReplicaBeingWritten rbw =
(ReplicaBeingWritten)replicaHandler.getReplica();
ReplicaOutputStreams
outputStreams = rbw.createStreams(false, DEFAULT_CHECKSUM, 300);
(ReplicaBeingWritten) replicaHandler.getReplica();
ReplicaOutputStreams outputStreams =
rbw.createStreams(false, DEFAULT_CHECKSUM);
OutputStream dataOutput = outputStreams.getDataOut();
byte[] appendBytes = new byte[1];

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.hdfs.server.datanode;
import java.io.File;
import java.io.FileDescriptor;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
@ -254,15 +255,14 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
@Override
synchronized public ReplicaOutputStreams createStreams(boolean isCreate,
DataChecksum requestedChecksum, long slowLogThresholdMs)
throws IOException {
DataChecksum requestedChecksum) throws IOException {
if (finalized) {
throw new IOException("Trying to write to a finalized replica "
+ theBlock);
} else {
SimulatedOutputStream crcStream = new SimulatedOutputStream();
return new ReplicaOutputStreams(oStream, crcStream, requestedChecksum,
volume.isTransientStorage(), slowLogThresholdMs);
volume.isTransientStorage());
}
}
@ -1328,7 +1328,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
@Override
public void submitBackgroundSyncFileRangeRequest(ExtendedBlock block,
ReplicaOutputStreams outs, long offset, long nbytes, int flags) {
FileDescriptor fd, long offset, long nbytes, int flags) {
throw new UnsupportedOperationException();
}

View File

@ -642,7 +642,7 @@ public class TestBlockRecovery {
ReplicaOutputStreams streams = null;
try {
streams = replicaInfo.createStreams(true,
DataChecksum.newDataChecksum(DataChecksum.Type.CRC32, 512), 300);
DataChecksum.newDataChecksum(DataChecksum.Type.CRC32, 512));
streams.getChecksumOut().write('a');
dn.data.initReplicaRecovery(new RecoveringBlock(block, null, RECOVERY_ID+1));
BlockRecoveryWorker.RecoveryTaskContiguous RecoveryTaskContiguous =

View File

@ -83,7 +83,7 @@ public class TestSimulatedFSDataset {
ReplicaInPipelineInterface bInfo = fsdataset.createRbw(
StorageType.DEFAULT, b, false).getReplica();
ReplicaOutputStreams out = bInfo.createStreams(true,
DataChecksum.newDataChecksum(DataChecksum.Type.CRC32, 512), 300);
DataChecksum.newDataChecksum(DataChecksum.Type.CRC32, 512));
try {
OutputStream dataOut = out.getDataOut();
assertEquals(0, fsdataset.getLength(b));

View File

@ -322,8 +322,8 @@ public class ExternalDatasetImpl implements FsDatasetSpi<ExternalVolumeImpl> {
}
@Override
public void submitBackgroundSyncFileRangeRequest(ExtendedBlock block,
ReplicaOutputStreams outs, long offset, long nbytes, int flags) {
public void submitBackgroundSyncFileRangeRequest(ExtendedBlock block, FileDescriptor fd, long offset, long nbytes, int flags) {
}
@Override

View File

@ -57,10 +57,8 @@ public class ExternalReplicaInPipeline implements ReplicaInPipelineInterface {
@Override
public ReplicaOutputStreams createStreams(boolean isCreate,
DataChecksum requestedChecksum, long slowLogThresholdMs)
throws IOException {
return new ReplicaOutputStreams(null, null, requestedChecksum, false,
slowLogThresholdMs);
DataChecksum requestedChecksum) throws IOException {
return new ReplicaOutputStreams(null, null, requestedChecksum, false);
}
@Override