From 6b0963c53be360b710614b9f44a29c4171af6b83 Mon Sep 17 00:00:00 2001 From: Todd Lipcon Date: Fri, 28 Oct 2011 22:18:42 +0000 Subject: [PATCH] HDFS-2465. Add HDFS support for fadvise readahead and drop-behind. Contributed by Todd Lipcon. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1190626 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 2 + .../org/apache/hadoop/hdfs/DFSConfigKeys.java | 9 ++ .../hdfs/server/datanode/BlockReceiver.java | 42 ++++++++ .../hdfs/server/datanode/BlockSender.java | 100 +++++++++++++++++- .../hadoop/hdfs/server/datanode/DataNode.java | 36 +++++++ 5 files changed, 187 insertions(+), 2 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index e8c689b9902..8507daf4dc4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -848,6 +848,8 @@ Release 0.23.0 - Unreleased HDFS-2500. Avoid file system operations in BPOfferService thread while processing deletes. (todd) + HDFS-2465. Add HDFS support for fadvise readahead and drop-behind. (todd) + BUG FIXES HDFS-2347. Fix checkpointTxnCount's comment about editlog size. diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java index 9c53bc08796..6c10d0e8473 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java @@ -54,6 +54,15 @@ public class DFSConfigKeys extends CommonConfigurationKeys { public static final String DFS_NAMENODE_BACKUP_SERVICE_RPC_ADDRESS_KEY = "dfs.namenode.backup.dnrpc-address"; public static final String DFS_DATANODE_BALANCE_BANDWIDTHPERSEC_KEY = "dfs.datanode.balance.bandwidthPerSec"; public static final long DFS_DATANODE_BALANCE_BANDWIDTHPERSEC_DEFAULT = 1024*1024; + public static final String DFS_DATANODE_READAHEAD_BYTES_KEY = "dfs.datanode.readahead.bytes"; + public static final long DFS_DATANODE_READAHEAD_BYTES_DEFAULT = 0; + public static final String DFS_DATANODE_DROP_CACHE_BEHIND_WRITES_KEY = "dfs.datanode.drop.cache.behind.writes"; + public static final boolean DFS_DATANODE_DROP_CACHE_BEHIND_WRITES_DEFAULT = false; + public static final String DFS_DATANODE_SYNC_BEHIND_WRITES_KEY = "dfs.datanode.sync.behind.writes"; + public static final boolean DFS_DATANODE_SYNC_BEHIND_WRITES_DEFAULT = false; + public static final String DFS_DATANODE_DROP_CACHE_BEHIND_READS_KEY = "dfs.datanode.drop.cache.behind.reads"; + public static final boolean DFS_DATANODE_DROP_CACHE_BEHIND_READS_DEFAULT = false; + public static final String DFS_NAMENODE_HTTP_ADDRESS_KEY = "dfs.namenode.http-address"; public static final String DFS_NAMENODE_HTTP_ADDRESS_DEFAULT = "0.0.0.0:50070"; public static final String DFS_NAMENODE_RPC_ADDRESS_KEY = "dfs.namenode.rpc-address"; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java index 50e118aaa00..b935aafd412 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java @@ -24,6 +24,7 @@ import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.EOFException; +import java.io.FileDescriptor; import java.io.FileOutputStream; import java.io.IOException; import java.io.OutputStream; @@ -46,6 +47,7 @@ import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; import org.apache.hadoop.hdfs.util.DataTransferThrottler; import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.io.nativeio.NativeIO; import org.apache.hadoop.util.Daemon; import org.apache.hadoop.util.DataChecksum; import org.apache.hadoop.util.PureJavaCrc32; @@ -57,10 +59,13 @@ class BlockReceiver implements Closeable { public static final Log LOG = DataNode.LOG; static final Log ClientTraceLog = DataNode.ClientTraceLog; + + private static final long CACHE_DROP_LAG_BYTES = 8 * 1024 * 1024; private DataInputStream in = null; // from where data are read private DataChecksum checksum; // from where chunks of a block can be read private OutputStream out = null; // to block file at local disk + private FileDescriptor outFd; private OutputStream cout = null; // output stream for cehcksum file private DataOutputStream checksumOut = null; // to crc file at local disk private int bytesPerChecksum; @@ -80,6 +85,11 @@ class BlockReceiver implements Closeable { private final DataNode datanode; volatile private boolean mirrorError; + // Cache management state + private boolean dropCacheBehindWrites; + private boolean syncBehindWrites; + private long lastCacheDropOffset = 0; + /** The client name. It is empty if a datanode is the client */ private final String clientname; private final boolean isClient; @@ -170,6 +180,8 @@ class BlockReceiver implements Closeable { this.checksum = DataChecksum.newDataChecksum(in); this.bytesPerChecksum = checksum.getBytesPerChecksum(); this.checksumSize = checksum.getChecksumSize(); + this.dropCacheBehindWrites = datanode.shouldDropCacheBehindWrites(); + this.syncBehindWrites = datanode.shouldSyncBehindWrites(); final boolean isCreate = isDatanode || isTransfer || stage == BlockConstructionStage.PIPELINE_SETUP_CREATE; @@ -177,6 +189,12 @@ class BlockReceiver implements Closeable { this.bytesPerChecksum, this.checksumSize); if (streams != null) { this.out = streams.dataOut; + if (out instanceof FileOutputStream) { + this.outFd = ((FileOutputStream)out).getFD(); + } else { + LOG.warn("Could not get file descriptor for outputstream of class " + + out.getClass()); + } this.cout = streams.checksumOut; this.checksumOut = new DataOutputStream(new BufferedOutputStream( streams.checksumOut, HdfsConstants.SMALL_BUFFER_SIZE)); @@ -631,6 +649,8 @@ private int receivePacket(long offsetInBlock, long seqno, ); datanode.metrics.incrBytesWritten(len); + + dropOsCacheBehindWriter(offsetInBlock); } } catch (IOException iex) { datanode.checkDiskError(iex); @@ -645,6 +665,28 @@ private int receivePacket(long offsetInBlock, long seqno, return lastPacketInBlock?-1:len; } + private void dropOsCacheBehindWriter(long offsetInBlock) throws IOException { + try { + if (outFd != null && + offsetInBlock > lastCacheDropOffset + CACHE_DROP_LAG_BYTES) { + long twoWindowsAgo = lastCacheDropOffset - CACHE_DROP_LAG_BYTES; + if (twoWindowsAgo > 0 && dropCacheBehindWrites) { + NativeIO.posixFadviseIfPossible(outFd, 0, lastCacheDropOffset, + NativeIO.POSIX_FADV_DONTNEED); + } + + if (syncBehindWrites) { + NativeIO.syncFileRangeIfPossible(outFd, lastCacheDropOffset, CACHE_DROP_LAG_BYTES, + NativeIO.SYNC_FILE_RANGE_WRITE); + } + + lastCacheDropOffset += CACHE_DROP_LAG_BYTES; + } + } catch (Throwable t) { + LOG.warn("Couldn't drop os cache behind writer for " + block, t); + } + } + void writeChecksumHeader(DataOutputStream mirrorOut) throws IOException { checksum.writeHeader(mirrorOut); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java index 84b38b37e9a..ca9765ce3ea 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java @@ -20,6 +20,7 @@ import java.io.BufferedInputStream; import java.io.DataInputStream; import java.io.DataOutputStream; +import java.io.FileDescriptor; import java.io.FileInputStream; import java.io.IOException; import java.io.InputStream; @@ -36,6 +37,9 @@ import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader; import org.apache.hadoop.hdfs.util.DataTransferThrottler; import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.io.ReadaheadPool; +import org.apache.hadoop.io.ReadaheadPool.ReadaheadRequest; +import org.apache.hadoop.io.nativeio.NativeIO; import org.apache.hadoop.net.SocketOutputStream; import org.apache.hadoop.util.DataChecksum; @@ -118,7 +122,9 @@ class BlockSender implements java.io.Closeable { private DataInputStream checksumIn; /** Checksum utility */ private final DataChecksum checksum; - /** Starting position to read */ + /** Initial position to read */ + private long initialOffset; + /** Current position of read */ private long offset; /** Position of last byte to read from block file */ private final long endOffset; @@ -142,6 +148,24 @@ class BlockSender implements java.io.Closeable { private final String clientTraceFmt; private volatile ChunkChecksum lastChunkChecksum = null; + /** The file descriptor of the block being sent */ + private FileDescriptor blockInFd; + + // Cache-management related fields + private final long readaheadLength; + private boolean shouldDropCacheBehindRead; + private ReadaheadRequest curReadahead; + private long lastCacheDropOffset; + private static final long CACHE_DROP_INTERVAL_BYTES = 1024 * 1024; // 1MB + /** + * Minimum length of read below which management of the OS + * buffer cache is disabled. + */ + private static final long LONG_READ_THRESHOLD_BYTES = 256 * 1024; + + private static ReadaheadPool readaheadPool = + ReadaheadPool.getInstance(); + /** * Constructor * @@ -165,6 +189,8 @@ class BlockSender implements java.io.Closeable { this.corruptChecksumOk = corruptChecksumOk; this.verifyChecksum = verifyChecksum; this.clientTraceFmt = clientTraceFmt; + this.readaheadLength = datanode.getReadaheadLength(); + this.shouldDropCacheBehindRead = datanode.shouldDropCacheBehindReads(); synchronized(datanode.data) { this.replica = getReplica(block, datanode); @@ -277,6 +303,11 @@ class BlockSender implements java.io.Closeable { DataNode.LOG.debug("replica=" + replica); } blockIn = datanode.data.getBlockInputStream(block, offset); // seek to offset + if (blockIn instanceof FileInputStream) { + blockInFd = ((FileInputStream)blockIn).getFD(); + } else { + blockInFd = null; + } } catch (IOException ioe) { IOUtils.closeStream(this); IOUtils.closeStream(blockIn); @@ -288,6 +319,20 @@ class BlockSender implements java.io.Closeable { * close opened files. */ public void close() throws IOException { + if (blockInFd != null && shouldDropCacheBehindRead) { + // drop the last few MB of the file from cache + try { + NativeIO.posixFadviseIfPossible( + blockInFd, lastCacheDropOffset, offset - lastCacheDropOffset, + NativeIO.POSIX_FADV_DONTNEED); + } catch (Exception e) { + LOG.warn("Unable to drop cache on file close", e); + } + } + if (curReadahead != null) { + curReadahead.cancel(); + } + IOException ioe = null; if(checksumIn!=null) { try { @@ -304,6 +349,7 @@ public void close() throws IOException { ioe = e; } blockIn = null; + blockInFd = null; } // throw IOException if there is any if(ioe!= null) { @@ -538,10 +584,20 @@ long sendBlock(DataOutputStream out, OutputStream baseStream, if (out == null) { throw new IOException( "out stream is null" ); } - final long initialOffset = offset; + initialOffset = offset; long totalRead = 0; OutputStream streamForSendChunks = out; + lastCacheDropOffset = initialOffset; + + if (isLongRead() && blockInFd != null) { + // Advise that this file descriptor will be accessed sequentially. + NativeIO.posixFadviseIfPossible(blockInFd, 0, 0, NativeIO.POSIX_FADV_SEQUENTIAL); + } + + // Trigger readahead of beginning of file if configured. + manageOsCache(); + final long startTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0; try { writeChecksumHeader(out); @@ -569,6 +625,7 @@ long sendBlock(DataOutputStream out, OutputStream baseStream, ByteBuffer pktBuf = ByteBuffer.allocate(pktSize); while (endOffset > offset) { + manageOsCache(); long len = sendPacket(pktBuf, maxChunksPerPacket, streamForSendChunks, transferTo, throttler); offset += len; @@ -595,6 +652,45 @@ long sendBlock(DataOutputStream out, OutputStream baseStream, } return totalRead; } + + /** + * Manage the OS buffer cache by performing read-ahead + * and drop-behind. + */ + private void manageOsCache() throws IOException { + if (!isLongRead() || blockInFd == null) { + // don't manage cache manually for short-reads, like + // HBase random read workloads. + return; + } + + // Perform readahead if necessary + if (readaheadLength > 0 && readaheadPool != null) { + curReadahead = readaheadPool.readaheadStream( + clientTraceFmt, blockInFd, + offset, readaheadLength, Long.MAX_VALUE, + curReadahead); + } + + // Drop what we've just read from cache, since we aren't + // likely to need it again + long nextCacheDropOffset = lastCacheDropOffset + CACHE_DROP_INTERVAL_BYTES; + if (shouldDropCacheBehindRead && + offset >= nextCacheDropOffset) { + long dropLength = offset - lastCacheDropOffset; + if (dropLength >= 1024) { + NativeIO.posixFadviseIfPossible(blockInFd, + lastCacheDropOffset, dropLength, + NativeIO.POSIX_FADV_DONTNEED); + } + lastCacheDropOffset += CACHE_DROP_INTERVAL_BYTES; + } + } + + private boolean isLongRead() { + return (endOffset - offset) > LONG_READ_THRESHOLD_BYTES; + } + /** * Write checksum header to the output stream diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java index 0ec544d9a8b..e069e4023f4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java @@ -104,6 +104,7 @@ import org.apache.hadoop.fs.LocalFileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.HDFSPolicyProvider; import org.apache.hadoop.hdfs.HdfsConfiguration; @@ -418,6 +419,11 @@ void refreshNamenodes(Configuration conf) int socketTimeout; int socketWriteTimeout = 0; boolean transferToAllowed = true; + private boolean dropCacheBehindWrites = false; + private boolean syncBehindWrites = false; + private boolean dropCacheBehindReads = false; + private long readaheadLength = 0; + int writePacketSize = 0; boolean isBlockTokenEnabled; BlockPoolTokenSecretManager blockPoolTokenSecretManager; @@ -501,6 +507,20 @@ private void initConfig(Configuration conf) { DFS_DATANODE_TRANSFERTO_ALLOWED_DEFAULT); this.writePacketSize = conf.getInt(DFS_CLIENT_WRITE_PACKET_SIZE_KEY, DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT); + + this.readaheadLength = conf.getLong( + DFSConfigKeys.DFS_DATANODE_READAHEAD_BYTES_KEY, + DFSConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT); + this.dropCacheBehindWrites = conf.getBoolean( + DFSConfigKeys.DFS_DATANODE_DROP_CACHE_BEHIND_WRITES_KEY, + DFSConfigKeys.DFS_DATANODE_DROP_CACHE_BEHIND_WRITES_DEFAULT); + this.syncBehindWrites = conf.getBoolean( + DFSConfigKeys.DFS_DATANODE_SYNC_BEHIND_WRITES_KEY, + DFSConfigKeys.DFS_DATANODE_SYNC_BEHIND_WRITES_DEFAULT); + this.dropCacheBehindReads = conf.getBoolean( + DFSConfigKeys.DFS_DATANODE_DROP_CACHE_BEHIND_READS_KEY, + DFSConfigKeys.DFS_DATANODE_DROP_CACHE_BEHIND_READS_DEFAULT); + this.blockReportInterval = conf.getLong(DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT); this.initialBlockReportDelay = conf.getLong( @@ -2903,4 +2923,20 @@ public Long getBalancerBandwidth() { (DataXceiverServer) this.dataXceiverServer.getRunnable(); return dxcs.balanceThrottler.getBandwidth(); } + + long getReadaheadLength() { + return readaheadLength; + } + + boolean shouldDropCacheBehindWrites() { + return dropCacheBehindWrites; + } + + boolean shouldDropCacheBehindReads() { + return dropCacheBehindReads; + } + + boolean shouldSyncBehindWrites() { + return syncBehindWrites; + } }