HDFS-2465. Add HDFS support for fadvise readahead and drop-behind. Contributed by Todd Lipcon.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1190626 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Todd Lipcon 2011-10-28 22:18:42 +00:00
parent c0c938f242
commit 6b0963c53b
5 changed files with 187 additions and 2 deletions

View File

@ -848,6 +848,8 @@ Release 0.23.0 - Unreleased
HDFS-2500. Avoid file system operations in BPOfferService thread while
processing deletes. (todd)
HDFS-2465. Add HDFS support for fadvise readahead and drop-behind. (todd)
BUG FIXES
HDFS-2347. Fix checkpointTxnCount's comment about editlog size.

View File

@ -54,6 +54,15 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final String DFS_NAMENODE_BACKUP_SERVICE_RPC_ADDRESS_KEY = "dfs.namenode.backup.dnrpc-address";
public static final String DFS_DATANODE_BALANCE_BANDWIDTHPERSEC_KEY = "dfs.datanode.balance.bandwidthPerSec";
public static final long DFS_DATANODE_BALANCE_BANDWIDTHPERSEC_DEFAULT = 1024*1024;
public static final String DFS_DATANODE_READAHEAD_BYTES_KEY = "dfs.datanode.readahead.bytes";
public static final long DFS_DATANODE_READAHEAD_BYTES_DEFAULT = 0;
public static final String DFS_DATANODE_DROP_CACHE_BEHIND_WRITES_KEY = "dfs.datanode.drop.cache.behind.writes";
public static final boolean DFS_DATANODE_DROP_CACHE_BEHIND_WRITES_DEFAULT = false;
public static final String DFS_DATANODE_SYNC_BEHIND_WRITES_KEY = "dfs.datanode.sync.behind.writes";
public static final boolean DFS_DATANODE_SYNC_BEHIND_WRITES_DEFAULT = false;
public static final String DFS_DATANODE_DROP_CACHE_BEHIND_READS_KEY = "dfs.datanode.drop.cache.behind.reads";
public static final boolean DFS_DATANODE_DROP_CACHE_BEHIND_READS_DEFAULT = false;
public static final String DFS_NAMENODE_HTTP_ADDRESS_KEY = "dfs.namenode.http-address";
public static final String DFS_NAMENODE_HTTP_ADDRESS_DEFAULT = "0.0.0.0:50070";
public static final String DFS_NAMENODE_RPC_ADDRESS_KEY = "dfs.namenode.rpc-address";

View File

@ -24,6 +24,7 @@
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.FileDescriptor;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
@ -46,6 +47,7 @@
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.util.DataTransferThrottler;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.nativeio.NativeIO;
import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.PureJavaCrc32;
@ -57,10 +59,13 @@
class BlockReceiver implements Closeable {
public static final Log LOG = DataNode.LOG;
static final Log ClientTraceLog = DataNode.ClientTraceLog;
private static final long CACHE_DROP_LAG_BYTES = 8 * 1024 * 1024;
private DataInputStream in = null; // from where data are read
private DataChecksum checksum; // from where chunks of a block can be read
private OutputStream out = null; // to block file at local disk
private FileDescriptor outFd;
private OutputStream cout = null; // output stream for cehcksum file
private DataOutputStream checksumOut = null; // to crc file at local disk
private int bytesPerChecksum;
@ -80,6 +85,11 @@ class BlockReceiver implements Closeable {
private final DataNode datanode;
volatile private boolean mirrorError;
// Cache management state
private boolean dropCacheBehindWrites;
private boolean syncBehindWrites;
private long lastCacheDropOffset = 0;
/** The client name. It is empty if a datanode is the client */
private final String clientname;
private final boolean isClient;
@ -170,6 +180,8 @@ class BlockReceiver implements Closeable {
this.checksum = DataChecksum.newDataChecksum(in);
this.bytesPerChecksum = checksum.getBytesPerChecksum();
this.checksumSize = checksum.getChecksumSize();
this.dropCacheBehindWrites = datanode.shouldDropCacheBehindWrites();
this.syncBehindWrites = datanode.shouldSyncBehindWrites();
final boolean isCreate = isDatanode || isTransfer
|| stage == BlockConstructionStage.PIPELINE_SETUP_CREATE;
@ -177,6 +189,12 @@ class BlockReceiver implements Closeable {
this.bytesPerChecksum, this.checksumSize);
if (streams != null) {
this.out = streams.dataOut;
if (out instanceof FileOutputStream) {
this.outFd = ((FileOutputStream)out).getFD();
} else {
LOG.warn("Could not get file descriptor for outputstream of class " +
out.getClass());
}
this.cout = streams.checksumOut;
this.checksumOut = new DataOutputStream(new BufferedOutputStream(
streams.checksumOut, HdfsConstants.SMALL_BUFFER_SIZE));
@ -631,6 +649,8 @@ private int receivePacket(long offsetInBlock, long seqno,
);
datanode.metrics.incrBytesWritten(len);
dropOsCacheBehindWriter(offsetInBlock);
}
} catch (IOException iex) {
datanode.checkDiskError(iex);
@ -645,6 +665,28 @@ private int receivePacket(long offsetInBlock, long seqno,
return lastPacketInBlock?-1:len;
}
private void dropOsCacheBehindWriter(long offsetInBlock) throws IOException {
try {
if (outFd != null &&
offsetInBlock > lastCacheDropOffset + CACHE_DROP_LAG_BYTES) {
long twoWindowsAgo = lastCacheDropOffset - CACHE_DROP_LAG_BYTES;
if (twoWindowsAgo > 0 && dropCacheBehindWrites) {
NativeIO.posixFadviseIfPossible(outFd, 0, lastCacheDropOffset,
NativeIO.POSIX_FADV_DONTNEED);
}
if (syncBehindWrites) {
NativeIO.syncFileRangeIfPossible(outFd, lastCacheDropOffset, CACHE_DROP_LAG_BYTES,
NativeIO.SYNC_FILE_RANGE_WRITE);
}
lastCacheDropOffset += CACHE_DROP_LAG_BYTES;
}
} catch (Throwable t) {
LOG.warn("Couldn't drop os cache behind writer for " + block, t);
}
}
void writeChecksumHeader(DataOutputStream mirrorOut) throws IOException {
checksum.writeHeader(mirrorOut);
}

View File

@ -20,6 +20,7 @@
import java.io.BufferedInputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.FileDescriptor;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
@ -36,6 +37,9 @@
import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
import org.apache.hadoop.hdfs.util.DataTransferThrottler;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.ReadaheadPool;
import org.apache.hadoop.io.ReadaheadPool.ReadaheadRequest;
import org.apache.hadoop.io.nativeio.NativeIO;
import org.apache.hadoop.net.SocketOutputStream;
import org.apache.hadoop.util.DataChecksum;
@ -118,7 +122,9 @@ class BlockSender implements java.io.Closeable {
private DataInputStream checksumIn;
/** Checksum utility */
private final DataChecksum checksum;
/** Starting position to read */
/** Initial position to read */
private long initialOffset;
/** Current position of read */
private long offset;
/** Position of last byte to read from block file */
private final long endOffset;
@ -142,6 +148,24 @@ class BlockSender implements java.io.Closeable {
private final String clientTraceFmt;
private volatile ChunkChecksum lastChunkChecksum = null;
/** The file descriptor of the block being sent */
private FileDescriptor blockInFd;
// Cache-management related fields
private final long readaheadLength;
private boolean shouldDropCacheBehindRead;
private ReadaheadRequest curReadahead;
private long lastCacheDropOffset;
private static final long CACHE_DROP_INTERVAL_BYTES = 1024 * 1024; // 1MB
/**
* Minimum length of read below which management of the OS
* buffer cache is disabled.
*/
private static final long LONG_READ_THRESHOLD_BYTES = 256 * 1024;
private static ReadaheadPool readaheadPool =
ReadaheadPool.getInstance();
/**
* Constructor
*
@ -165,6 +189,8 @@ class BlockSender implements java.io.Closeable {
this.corruptChecksumOk = corruptChecksumOk;
this.verifyChecksum = verifyChecksum;
this.clientTraceFmt = clientTraceFmt;
this.readaheadLength = datanode.getReadaheadLength();
this.shouldDropCacheBehindRead = datanode.shouldDropCacheBehindReads();
synchronized(datanode.data) {
this.replica = getReplica(block, datanode);
@ -277,6 +303,11 @@ class BlockSender implements java.io.Closeable {
DataNode.LOG.debug("replica=" + replica);
}
blockIn = datanode.data.getBlockInputStream(block, offset); // seek to offset
if (blockIn instanceof FileInputStream) {
blockInFd = ((FileInputStream)blockIn).getFD();
} else {
blockInFd = null;
}
} catch (IOException ioe) {
IOUtils.closeStream(this);
IOUtils.closeStream(blockIn);
@ -288,6 +319,20 @@ class BlockSender implements java.io.Closeable {
* close opened files.
*/
public void close() throws IOException {
if (blockInFd != null && shouldDropCacheBehindRead) {
// drop the last few MB of the file from cache
try {
NativeIO.posixFadviseIfPossible(
blockInFd, lastCacheDropOffset, offset - lastCacheDropOffset,
NativeIO.POSIX_FADV_DONTNEED);
} catch (Exception e) {
LOG.warn("Unable to drop cache on file close", e);
}
}
if (curReadahead != null) {
curReadahead.cancel();
}
IOException ioe = null;
if(checksumIn!=null) {
try {
@ -304,6 +349,7 @@ public void close() throws IOException {
ioe = e;
}
blockIn = null;
blockInFd = null;
}
// throw IOException if there is any
if(ioe!= null) {
@ -538,10 +584,20 @@ long sendBlock(DataOutputStream out, OutputStream baseStream,
if (out == null) {
throw new IOException( "out stream is null" );
}
final long initialOffset = offset;
initialOffset = offset;
long totalRead = 0;
OutputStream streamForSendChunks = out;
lastCacheDropOffset = initialOffset;
if (isLongRead() && blockInFd != null) {
// Advise that this file descriptor will be accessed sequentially.
NativeIO.posixFadviseIfPossible(blockInFd, 0, 0, NativeIO.POSIX_FADV_SEQUENTIAL);
}
// Trigger readahead of beginning of file if configured.
manageOsCache();
final long startTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0;
try {
writeChecksumHeader(out);
@ -569,6 +625,7 @@ long sendBlock(DataOutputStream out, OutputStream baseStream,
ByteBuffer pktBuf = ByteBuffer.allocate(pktSize);
while (endOffset > offset) {
manageOsCache();
long len = sendPacket(pktBuf, maxChunksPerPacket, streamForSendChunks,
transferTo, throttler);
offset += len;
@ -595,6 +652,45 @@ long sendBlock(DataOutputStream out, OutputStream baseStream,
}
return totalRead;
}
/**
* Manage the OS buffer cache by performing read-ahead
* and drop-behind.
*/
private void manageOsCache() throws IOException {
if (!isLongRead() || blockInFd == null) {
// don't manage cache manually for short-reads, like
// HBase random read workloads.
return;
}
// Perform readahead if necessary
if (readaheadLength > 0 && readaheadPool != null) {
curReadahead = readaheadPool.readaheadStream(
clientTraceFmt, blockInFd,
offset, readaheadLength, Long.MAX_VALUE,
curReadahead);
}
// Drop what we've just read from cache, since we aren't
// likely to need it again
long nextCacheDropOffset = lastCacheDropOffset + CACHE_DROP_INTERVAL_BYTES;
if (shouldDropCacheBehindRead &&
offset >= nextCacheDropOffset) {
long dropLength = offset - lastCacheDropOffset;
if (dropLength >= 1024) {
NativeIO.posixFadviseIfPossible(blockInFd,
lastCacheDropOffset, dropLength,
NativeIO.POSIX_FADV_DONTNEED);
}
lastCacheDropOffset += CACHE_DROP_INTERVAL_BYTES;
}
}
private boolean isLongRead() {
return (endOffset - offset) > LONG_READ_THRESHOLD_BYTES;
}
/**
* Write checksum header to the output stream

View File

@ -104,6 +104,7 @@
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.HDFSPolicyProvider;
import org.apache.hadoop.hdfs.HdfsConfiguration;
@ -418,6 +419,11 @@ void refreshNamenodes(Configuration conf)
int socketTimeout;
int socketWriteTimeout = 0;
boolean transferToAllowed = true;
private boolean dropCacheBehindWrites = false;
private boolean syncBehindWrites = false;
private boolean dropCacheBehindReads = false;
private long readaheadLength = 0;
int writePacketSize = 0;
boolean isBlockTokenEnabled;
BlockPoolTokenSecretManager blockPoolTokenSecretManager;
@ -501,6 +507,20 @@ private void initConfig(Configuration conf) {
DFS_DATANODE_TRANSFERTO_ALLOWED_DEFAULT);
this.writePacketSize = conf.getInt(DFS_CLIENT_WRITE_PACKET_SIZE_KEY,
DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT);
this.readaheadLength = conf.getLong(
DFSConfigKeys.DFS_DATANODE_READAHEAD_BYTES_KEY,
DFSConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT);
this.dropCacheBehindWrites = conf.getBoolean(
DFSConfigKeys.DFS_DATANODE_DROP_CACHE_BEHIND_WRITES_KEY,
DFSConfigKeys.DFS_DATANODE_DROP_CACHE_BEHIND_WRITES_DEFAULT);
this.syncBehindWrites = conf.getBoolean(
DFSConfigKeys.DFS_DATANODE_SYNC_BEHIND_WRITES_KEY,
DFSConfigKeys.DFS_DATANODE_SYNC_BEHIND_WRITES_DEFAULT);
this.dropCacheBehindReads = conf.getBoolean(
DFSConfigKeys.DFS_DATANODE_DROP_CACHE_BEHIND_READS_KEY,
DFSConfigKeys.DFS_DATANODE_DROP_CACHE_BEHIND_READS_DEFAULT);
this.blockReportInterval = conf.getLong(DFS_BLOCKREPORT_INTERVAL_MSEC_KEY,
DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT);
this.initialBlockReportDelay = conf.getLong(
@ -2903,4 +2923,20 @@ public Long getBalancerBandwidth() {
(DataXceiverServer) this.dataXceiverServer.getRunnable();
return dxcs.balanceThrottler.getBandwidth();
}
long getReadaheadLength() {
return readaheadLength;
}
boolean shouldDropCacheBehindWrites() {
return dropCacheBehindWrites;
}
boolean shouldDropCacheBehindReads() {
return dropCacheBehindReads;
}
boolean shouldSyncBehindWrites() {
return syncBehindWrites;
}
}