diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 92e85d9cc14..0e68fb0828a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -256,6 +256,9 @@ Trunk (Unreleased) HDFS-5431. Support cachepool-based limit management in path-based caching (awang via cmccabe) + HDFS-5634. Allow BlockReaderLocal to switch between checksumming and not + (cmccabe) + OPTIMIZATIONS HDFS-5349. DNA_CACHE and DNA_UNCACHE should be by blockId only. (cmccabe) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java index 2f0686a9beb..b957f00914f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java @@ -18,8 +18,10 @@ package org.apache.hadoop.hdfs; import java.io.IOException; +import java.util.EnumSet; import org.apache.hadoop.fs.ByteBufferReadable; +import org.apache.hadoop.fs.ReadOption; import org.apache.hadoop.hdfs.client.ClientMmap; import org.apache.hadoop.hdfs.client.ClientMmapManager; import org.apache.hadoop.hdfs.protocol.LocatedBlock; @@ -89,10 +91,10 @@ public interface BlockReader extends ByteBufferReadable { /** * Get a ClientMmap object for this BlockReader. * - * @param curBlock The current block. + * @param opts The read options to use. * @return The ClientMmap object, or null if mmap is not * supported. */ - ClientMmap getClientMmap(LocatedBlock curBlock, + ClientMmap getClientMmap(EnumSet opts, ClientMmapManager mmapManager); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java index 9f11327f758..ae98e573c06 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java @@ -35,6 +35,7 @@ import org.apache.hadoop.hdfs.protocolPB.PBHelper; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; +import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader; import org.apache.hadoop.hdfs.server.datanode.CachingStrategy; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.ipc.RemoteException; @@ -98,7 +99,7 @@ public class BlockReaderFactory { // enabled, try to set up a BlockReaderLocal. BlockReader reader = newShortCircuitBlockReader(conf, file, block, blockToken, startOffset, len, peer, datanodeID, - domSockFactory, verifyChecksum, fisCache); + domSockFactory, verifyChecksum, fisCache, cachingStrategy); if (reader != null) { // One we've constructed the short-circuit block reader, we don't // need the socket any more. So let's return it to the cache. @@ -160,7 +161,8 @@ public class BlockReaderFactory { * @param verifyChecksum True if we should verify the checksums. * Note: even if this is true, when * DFS_CLIENT_READ_CHECKSUM_SKIP_CHECKSUM_KEY is - * set, we will skip checksums. + * set or the block is mlocked, we will skip + * checksums. 
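A minimal sketch of the checksum-skip decision this patch introduces; the helper and its parameters are illustrative only, not code from the patch:

    // Verification only happens when it is enabled, the meta file carries a
    // real checksum type, and the block is not mlocked (cached) on the DataNode.
    static boolean canSkipChecksum(boolean skipChecksumsConfigured,
        boolean checksumTypeIsNull, boolean blockIsMlocked) {
      boolean verifyChecksum = !skipChecksumsConfigured && !checksumTypeIsNull;
      return !verifyChecksum || blockIsMlocked;
    }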
* * @return The BlockReaderLocal, or null if the * DataNode declined to provide short-circuit @@ -172,7 +174,8 @@ public class BlockReaderFactory { Token blockToken, long startOffset, long len, Peer peer, DatanodeID datanodeID, DomainSocketFactory domSockFactory, boolean verifyChecksum, - FileInputStreamCache fisCache) throws IOException { + FileInputStreamCache fisCache, + CachingStrategy cachingStrategy) throws IOException { final DataOutputStream out = new DataOutputStream(new BufferedOutputStream( peer.getOutputStream())); @@ -189,9 +192,18 @@ public class BlockReaderFactory { FileInputStream fis[] = new FileInputStream[2]; sock.recvFileInputStreams(fis, buf, 0, buf.length); try { - reader = new BlockReaderLocal(conf, file, block, - startOffset, len, fis[0], fis[1], datanodeID, verifyChecksum, - fisCache); + reader = new BlockReaderLocal.Builder(conf). + setFilename(file). + setBlock(block). + setStartOffset(startOffset). + setStreams(fis). + setDatanodeID(datanodeID). + setVerifyChecksum(verifyChecksum). + setBlockMetadataHeader( + BlockMetadataHeader.preadHeader(fis[1].getChannel())). + setFileInputStreamCache(fisCache). + setCachingStrategy(cachingStrategy). + build(); } finally { if (reader == null) { IOUtils.cleanup(DFSClient.LOG, fis[0], fis[1]); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java index aeac1757976..f702e9b5c77 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java @@ -17,25 +17,30 @@ */ package org.apache.hadoop.hdfs; -import java.io.BufferedInputStream; -import java.io.DataInputStream; import java.io.FileInputStream; import java.io.IOException; import java.nio.ByteBuffer; -import org.apache.hadoop.conf.Configuration; +import java.nio.channels.FileChannel; +import java.util.EnumSet; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.ReadOption; import org.apache.hadoop.hdfs.client.ClientMmap; +import org.apache.hadoop.hdfs.DFSClient.Conf; import org.apache.hadoop.hdfs.client.ClientMmapManager; import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; -import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader; +import org.apache.hadoop.hdfs.server.datanode.CachingStrategy; import org.apache.hadoop.hdfs.util.DirectBufferPool; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.util.DataChecksum; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; + /** * BlockReaderLocal enables local short circuited reads. 
If the DFS client is on * the same machine as the datanode, then the client can read files directly @@ -55,480 +60,566 @@ import org.apache.hadoop.util.DataChecksum; class BlockReaderLocal implements BlockReader { static final Log LOG = LogFactory.getLog(BlockReaderLocal.class); - private final FileInputStream dataIn; // reader for the data file - private final FileInputStream checksumIn; // reader for the checksum file + private static DirectBufferPool bufferPool = new DirectBufferPool(); + + public static class Builder { + private int bufferSize; + private boolean verifyChecksum; + private int maxReadahead; + private String filename; + private FileInputStream streams[]; + private long dataPos; + private DatanodeID datanodeID; + private FileInputStreamCache fisCache; + private boolean mlocked; + private BlockMetadataHeader header; + private ExtendedBlock block; + + public Builder(Conf conf) { + this.maxReadahead = Integer.MAX_VALUE; + this.verifyChecksum = !conf.skipShortCircuitChecksums; + this.bufferSize = conf.shortCircuitBufferSize; + } + + public Builder setVerifyChecksum(boolean verifyChecksum) { + this.verifyChecksum = verifyChecksum; + return this; + } + + public Builder setCachingStrategy(CachingStrategy cachingStrategy) { + long readahead = cachingStrategy.getReadahead() != null ? + cachingStrategy.getReadahead() : + DFSConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT; + this.maxReadahead = (int)Math.min(Integer.MAX_VALUE, readahead); + return this; + } + + public Builder setFilename(String filename) { + this.filename = filename; + return this; + } + + public Builder setStreams(FileInputStream streams[]) { + this.streams = streams; + return this; + } + + public Builder setStartOffset(long startOffset) { + this.dataPos = Math.max(0, startOffset); + return this; + } + + public Builder setDatanodeID(DatanodeID datanodeID) { + this.datanodeID = datanodeID; + return this; + } + + public Builder setFileInputStreamCache(FileInputStreamCache fisCache) { + this.fisCache = fisCache; + return this; + } + + public Builder setMlocked(boolean mlocked) { + this.mlocked = mlocked; + return this; + } + + public Builder setBlockMetadataHeader(BlockMetadataHeader header) { + this.header = header; + return this; + } + + public Builder setBlock(ExtendedBlock block) { + this.block = block; + return this; + } + + public BlockReaderLocal build() { + Preconditions.checkNotNull(streams); + Preconditions.checkArgument(streams.length == 2); + Preconditions.checkNotNull(header); + return new BlockReaderLocal(this); + } + } + + private boolean closed = false; + + /** + * Pair of streams for this block. + */ + private final FileInputStream streams[]; + + /** + * The data FileChannel. + */ + private final FileChannel dataIn; + + /** + * The next place we'll read from in the block data FileChannel. + * + * If data is buffered in dataBuf, this offset will be larger than the + * offset of the next byte which a read() operation will give us. + */ + private long dataPos; + + /** + * The Checksum FileChannel. + */ + private final FileChannel checksumIn; + + /** + * Checksum type and size. + */ + private final DataChecksum checksum; + + /** + * If false, we will always skip the checksum. + */ private final boolean verifyChecksum; /** - * Offset from the most recent chunk boundary at which the next read should - * take place. Is only set to non-zero at construction time, and is - * decremented (usually to 0) by subsequent reads. 
This avoids having to do a - * checksum read at construction to position the read cursor correctly. + * If true, this block is mlocked on the DataNode. */ - private int offsetFromChunkBoundary; - - private byte[] skipBuf = null; + private final AtomicBoolean mlocked; /** - * Used for checksummed reads that need to be staged before copying to their - * output buffer because they are either a) smaller than the checksum chunk - * size or b) issued by the slower read(byte[]...) path + * Name of the block, for logging purposes. */ - private ByteBuffer slowReadBuff = null; - private ByteBuffer checksumBuff = null; - private DataChecksum checksum; - - private static DirectBufferPool bufferPool = new DirectBufferPool(); - - private final int bytesPerChecksum; - private final int checksumSize; - - /** offset in block where reader wants to actually read */ - private long startOffset; private final String filename; + /** + * DataNode which contained this block. + */ private final DatanodeID datanodeID; + + /** + * Block ID and Block Pool ID. + */ private final ExtendedBlock block; + /** + * Cache of Checksum#bytesPerChecksum. + */ + private int bytesPerChecksum; + + /** + * Cache of Checksum#checksumSize. + */ + private int checksumSize; + + /** + * FileInputStream cache to return the streams to upon closing, + * or null if we should just close them unconditionally. + */ private final FileInputStreamCache fisCache; + + /** + * Maximum number of chunks to allocate. + * + * This is used to allocate dataBuf and checksumBuf, in the event that + * we need them. + */ + private final int maxAllocatedChunks; + + /** + * True if zero readahead was requested. + */ + private final boolean zeroReadaheadRequested; + + /** + * Maximum amount of readahead we'll do. This will always be at least the, + * size of a single chunk, even if {@link zeroReadaheadRequested} is true. + * The reason is because we need to do a certain amount of buffering in order + * to do checksumming. + * + * This determines how many bytes we'll use out of dataBuf and checksumBuf. + * Why do we allocate buffers, and then (potentially) only use part of them? + * The rationale is that allocating a lot of buffers of different sizes would + * make it very difficult for the DirectBufferPool to re-use buffers. + */ + private int maxReadaheadLength; + private ClientMmap clientMmap; - private boolean mmapDisabled; - - private static int getSlowReadBufferNumChunks(int bufSize, - int bytesPerChecksum) { - if (bufSize < bytesPerChecksum) { - throw new IllegalArgumentException("Configured BlockReaderLocal buffer size (" + - bufSize + ") is not large enough to hold a single chunk (" + - bytesPerChecksum + "). Please configure " + - DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_KEY + " appropriately"); - } - // Round down to nearest chunk size - return bufSize / bytesPerChecksum; - } + /** + * Buffers data starting at the current dataPos and extending on + * for dataBuf.limit(). + * + * This may be null if we don't need it. 
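A minimal illustration of the invariant between dataPos and the bounce buffer described above; the helper is hypothetical, but the relationship is the one freeDataBufIfExists() below relies on:

    // Bytes still pending in dataBuf sit logically behind dataPos, so the
    // offset of the next byte a caller will see is:
    static long callerVisiblePosition(long dataPos, java.nio.ByteBuffer dataBuf) {
      return dataPos - (dataBuf == null ? 0 : dataBuf.remaining());
    }
    // Discarding the bounce buffer therefore rewinds dataPos by
    // dataBuf.remaining(), which is exactly what freeDataBufIfExists() does.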
+ */ + private ByteBuffer dataBuf; - public BlockReaderLocal(DFSClient.Conf conf, String filename, - ExtendedBlock block, long startOffset, long length, - FileInputStream dataIn, FileInputStream checksumIn, - DatanodeID datanodeID, boolean verifyChecksum, - FileInputStreamCache fisCache) throws IOException { - this.dataIn = dataIn; - this.checksumIn = checksumIn; - this.startOffset = Math.max(startOffset, 0); - this.filename = filename; - this.datanodeID = datanodeID; - this.block = block; - this.fisCache = fisCache; - this.clientMmap = null; - this.mmapDisabled = false; + /** + * Buffers checksums starting at the current checksumPos and extending on + * for checksumBuf.limit(). + * + * This may be null if we don't need it. + */ + private ByteBuffer checksumBuf; - // read and handle the common header here. For now just a version - checksumIn.getChannel().position(0); - BlockMetadataHeader header = BlockMetadataHeader - .readHeader(new DataInputStream( - new BufferedInputStream(checksumIn, - BlockMetadataHeader.getHeaderSize()))); - short version = header.getVersion(); - if (version != BlockMetadataHeader.VERSION) { - throw new IOException("Wrong version (" + version + ") of the " + - "metadata file for " + filename + "."); - } - this.verifyChecksum = verifyChecksum && !conf.skipShortCircuitChecksums; - long firstChunkOffset; - if (this.verifyChecksum) { - this.checksum = header.getChecksum(); - this.bytesPerChecksum = this.checksum.getBytesPerChecksum(); - this.checksumSize = this.checksum.getChecksumSize(); - firstChunkOffset = startOffset - - (startOffset % checksum.getBytesPerChecksum()); - this.offsetFromChunkBoundary = (int) (startOffset - firstChunkOffset); + private boolean mmapDisabled = false; - int chunksPerChecksumRead = getSlowReadBufferNumChunks( - conf.shortCircuitBufferSize, bytesPerChecksum); - slowReadBuff = bufferPool.getBuffer(bytesPerChecksum * chunksPerChecksumRead); - checksumBuff = bufferPool.getBuffer(checksumSize * chunksPerChecksumRead); - // Initially the buffers have nothing to read. - slowReadBuff.flip(); - checksumBuff.flip(); - long checkSumOffset = (firstChunkOffset / bytesPerChecksum) * checksumSize; - IOUtils.skipFully(checksumIn, checkSumOffset); + private BlockReaderLocal(Builder builder) { + this.streams = builder.streams; + this.dataIn = builder.streams[0].getChannel(); + this.dataPos = builder.dataPos; + this.checksumIn = builder.streams[1].getChannel(); + this.checksum = builder.header.getChecksum(); + this.verifyChecksum = builder.verifyChecksum && + (this.checksum.getChecksumType().id != DataChecksum.CHECKSUM_NULL); + this.mlocked = new AtomicBoolean(builder.mlocked); + this.filename = builder.filename; + this.datanodeID = builder.datanodeID; + this.fisCache = builder.fisCache; + this.block = builder.block; + this.bytesPerChecksum = checksum.getBytesPerChecksum(); + this.checksumSize = checksum.getChecksumSize(); + + this.maxAllocatedChunks = (bytesPerChecksum == 0) ? 0 : + ((builder.bufferSize + bytesPerChecksum - 1) / bytesPerChecksum); + // Calculate the effective maximum readahead. + // We can't do more readahead than there is space in the buffer. + int maxReadaheadChunks = (bytesPerChecksum == 0) ? 
0 : + ((Math.min(builder.bufferSize, builder.maxReadahead) + + bytesPerChecksum - 1) / bytesPerChecksum); + if (maxReadaheadChunks == 0) { + this.zeroReadaheadRequested = true; + maxReadaheadChunks = 1; } else { - firstChunkOffset = startOffset; - this.checksum = null; - this.bytesPerChecksum = 0; - this.checksumSize = 0; - this.offsetFromChunkBoundary = 0; + this.zeroReadaheadRequested = false; } - - boolean success = false; - try { - // Reposition both input streams to the beginning of the chunk - // containing startOffset - this.dataIn.getChannel().position(firstChunkOffset); - success = true; - } finally { - if (success) { - if (LOG.isDebugEnabled()) { - LOG.debug("Created BlockReaderLocal for file " + filename - + " block " + block + " in datanode " + datanodeID); - } - } else { - if (slowReadBuff != null) bufferPool.returnBuffer(slowReadBuff); - if (checksumBuff != null) bufferPool.returnBuffer(checksumBuff); - } + this.maxReadaheadLength = maxReadaheadChunks * bytesPerChecksum; + } + + private synchronized void createDataBufIfNeeded() { + if (dataBuf == null) { + dataBuf = bufferPool.getBuffer(maxAllocatedChunks * bytesPerChecksum); + dataBuf.position(0); + dataBuf.limit(0); } } - /** - * Reads bytes into a buffer until EOF or the buffer's limit is reached - */ - private int fillBuffer(FileInputStream stream, ByteBuffer buf) + private synchronized void freeDataBufIfExists() { + if (dataBuf != null) { + // When disposing of a dataBuf, we have to move our stored file index + // backwards. + dataPos -= dataBuf.remaining(); + dataBuf.clear(); + bufferPool.returnBuffer(dataBuf); + dataBuf = null; + } + } + + private synchronized void createChecksumBufIfNeeded() { + if (checksumBuf == null) { + checksumBuf = bufferPool.getBuffer(maxAllocatedChunks * checksumSize); + checksumBuf.position(0); + checksumBuf.limit(0); + } + } + + private synchronized void freeChecksumBufIfExists() { + if (checksumBuf != null) { + checksumBuf.clear(); + bufferPool.returnBuffer(checksumBuf); + checksumBuf = null; + } + } + + private synchronized int drainDataBuf(ByteBuffer buf) throws IOException { - int bytesRead = stream.getChannel().read(buf); - if (bytesRead < 0) { - //EOF - return bytesRead; + if (dataBuf == null) return 0; + int oldLimit = dataBuf.limit(); + int nRead = Math.min(dataBuf.remaining(), buf.remaining()); + if (nRead == 0) return 0; + try { + dataBuf.limit(dataBuf.position() + nRead); + buf.put(dataBuf); + } finally { + dataBuf.limit(oldLimit); } - while (buf.remaining() > 0) { - int n = stream.getChannel().read(buf); - if (n < 0) { - //EOF - return bytesRead; + return nRead; + } + + /** + * Read from the block file into a buffer. + * + * This function overwrites checksumBuf. It will increment dataPos. + * + * @param buf The buffer to read into. May be dataBuf. + * The position and limit of this buffer should be set to + * multiples of the checksum size. + * @param canSkipChecksum True if we can skip checksumming. + * + * @return Total bytes read. 0 on EOF. 
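A small worked sketch of the checksum-offset arithmetic fillBuffer() uses below: the .meta file starts with a 7-byte header (2-byte version plus the 5-byte DataChecksum header), followed by one checksumSize-byte checksum per bytesPerChecksum bytes of block data. The helper itself is illustrative only:

    static long checksumFileOffset(long dataPos, int bytesPerChecksum,
        int checksumSize) {
      long chunkIndex = dataPos / bytesPerChecksum; // chunk containing dataPos
      return 7 + chunkIndex * checksumSize;         // skip the meta file header
    }
    // Example: with 512-byte chunks and 4-byte CRC32C checksums, block offset
    // 1537 falls in chunk 3, whose checksum starts at byte 7 + 3 * 4 = 19.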
+ */ + private synchronized int fillBuffer(ByteBuffer buf, boolean canSkipChecksum) + throws IOException { + int total = 0; + long startDataPos = dataPos; + int startBufPos = buf.position(); + while (buf.hasRemaining()) { + int nRead = dataIn.read(buf, dataPos); + if (nRead < 0) { + break; } - bytesRead += n; + dataPos += nRead; + total += nRead; } - return bytesRead; + if (canSkipChecksum) { + freeChecksumBufIfExists(); + return total; + } + if (total > 0) { + try { + buf.limit(buf.position()); + buf.position(startBufPos); + createChecksumBufIfNeeded(); + int checksumsNeeded = (total + bytesPerChecksum - 1) / bytesPerChecksum; + checksumBuf.clear(); + checksumBuf.limit(checksumsNeeded * checksumSize); + long checksumPos = + 7 + ((startDataPos / bytesPerChecksum) * checksumSize); + while (checksumBuf.hasRemaining()) { + int nRead = checksumIn.read(checksumBuf, checksumPos); + if (nRead < 0) { + throw new IOException("Got unexpected checksum file EOF at " + + checksumPos + ", block file position " + startDataPos + " for " + + "block " + block + " of file " + filename); + } + checksumPos += nRead; + } + checksumBuf.flip(); + + checksum.verifyChunkedSums(buf, checksumBuf, filename, startDataPos); + } finally { + buf.position(buf.limit()); + } + } + return total; + } + + private boolean getCanSkipChecksum() { + return (!verifyChecksum) || mlocked.get(); } - /** - * Utility method used by read(ByteBuffer) to partially copy a ByteBuffer into - * another. - */ - private void writeSlice(ByteBuffer from, ByteBuffer to, int length) { - int oldLimit = from.limit(); - from.limit(from.position() + length); - try { - to.put(from); - } finally { - from.limit(oldLimit); - } - } - @Override public synchronized int read(ByteBuffer buf) throws IOException { - int nRead = 0; - if (verifyChecksum) { - // A 'direct' read actually has three phases. The first drains any - // remaining bytes from the slow read buffer. After this the read is - // guaranteed to be on a checksum chunk boundary. If there are still bytes - // to read, the fast direct path is used for as many remaining bytes as - // possible, up to a multiple of the checksum chunk size. Finally, any - // 'odd' bytes remaining at the end of the read cause another slow read to - // be issued, which involves an extra copy. - - // Every 'slow' read tries to fill the slow read buffer in one go for - // efficiency's sake. As described above, all non-checksum-chunk-aligned - // reads will be served from the slower read path. - - if (slowReadBuff.hasRemaining()) { - // There are remaining bytes from a small read available. This usually - // means this read is unaligned, which falls back to the slow path. - int fromSlowReadBuff = Math.min(buf.remaining(), slowReadBuff.remaining()); - writeSlice(slowReadBuff, buf, fromSlowReadBuff); - nRead += fromSlowReadBuff; + boolean canSkipChecksum = getCanSkipChecksum(); + + String traceString = null; + if (LOG.isTraceEnabled()) { + traceString = new StringBuilder(). + append("read("). + append("buf.remaining=").append(buf.remaining()). + append(", block=").append(block). + append(", filename=").append(filename). + append(", canSkipChecksum=").append(canSkipChecksum). 
+ append(")").toString(); + LOG.info(traceString + ": starting"); + } + int nRead; + try { + if (canSkipChecksum && zeroReadaheadRequested) { + nRead = readWithoutBounceBuffer(buf); + } else { + nRead = readWithBounceBuffer(buf, canSkipChecksum); } + } catch (IOException e) { + if (LOG.isTraceEnabled()) { + LOG.info(traceString + ": I/O error", e); + } + throw e; + } + if (LOG.isTraceEnabled()) { + LOG.info(traceString + ": returning " + nRead); + } + return nRead; + } - if (buf.remaining() >= bytesPerChecksum && offsetFromChunkBoundary == 0) { - // Since we have drained the 'small read' buffer, we are guaranteed to - // be chunk-aligned - int len = buf.remaining() - (buf.remaining() % bytesPerChecksum); + private synchronized int readWithoutBounceBuffer(ByteBuffer buf) + throws IOException { + freeDataBufIfExists(); + freeChecksumBufIfExists(); + int total = 0; + while (buf.hasRemaining()) { + int nRead = dataIn.read(buf, dataPos); + if (nRead < 0) { + break; + } + dataPos += nRead; + total += nRead; + } + return (total == 0) ? -1 : total; + } - // There's only enough checksum buffer space available to checksum one - // entire slow read buffer. This saves keeping the number of checksum - // chunks around. - len = Math.min(len, slowReadBuff.capacity()); - int oldlimit = buf.limit(); - buf.limit(buf.position() + len); - int readResult = 0; + /** + * Fill the data buffer. If necessary, validate the data against the + * checksums. + * + * We always want the offsets of the data contained in dataBuf to be + * aligned to the chunk boundary. If we are validating checksums, we + * accomplish this by seeking backwards in the file until we're on a + * chunk boundary. (This is necessary because we can't checksum a + * partial chunk.) If we are not validating checksums, we simply only + * fill the latter part of dataBuf. + * + * @param canSkipChecksum true if we can skip checksumming. + * @return true if we hit EOF. + * @throws IOException + */ + private synchronized boolean fillDataBuf(boolean canSkipChecksum) + throws IOException { + createDataBufIfNeeded(); + final int slop = (int)(dataPos % bytesPerChecksum); + final long oldDataPos = dataPos; + dataBuf.limit(maxReadaheadLength); + if (canSkipChecksum) { + dataBuf.position(slop); + fillBuffer(dataBuf, canSkipChecksum); + } else { + dataPos -= slop; + dataBuf.position(0); + fillBuffer(dataBuf, canSkipChecksum); + } + dataBuf.limit(dataBuf.position()); + dataBuf.position(Math.min(dataBuf.position(), slop)); + if (LOG.isTraceEnabled()) { + LOG.trace("loaded " + dataBuf.remaining() + " bytes into bounce " + + "buffer from offset " + oldDataPos + " of " + block); + } + return dataBuf.limit() != maxReadaheadLength; + } + + /** + * Read using the bounce buffer. + * + * A 'direct' read actually has three phases. The first drains any + * remaining bytes from the slow read buffer. After this the read is + * guaranteed to be on a checksum chunk boundary. If there are still bytes + * to read, the fast direct path is used for as many remaining bytes as + * possible, up to a multiple of the checksum chunk size. Finally, any + * 'odd' bytes remaining at the end of the read cause another slow read to + * be issued, which involves an extra copy. + * + * Every 'slow' read tries to fill the slow read buffer in one go for + * efficiency's sake. As described above, all non-checksum-chunk-aligned + * reads will be served from the slower read path. + * + * @param buf The buffer to read into. + * @param canSkipChecksum True if we can skip checksums. 
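A rough sketch of the condition under which readWithBounceBuffer() below bypasses the bounce buffer and fills the caller's buffer directly; the predicate and its parameter names are illustrative:

    static boolean canUseFastLane(java.nio.ByteBuffer userBuf, long dataPos,
        int maxReadaheadLength, int bytesPerChecksum) {
      return userBuf.isDirect()                        // direct buffer required
          && userBuf.remaining() >= maxReadaheadLength // large enough request
          && (dataPos % bytesPerChecksum) == 0;        // on a chunk boundary
    }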
+ */ + private synchronized int readWithBounceBuffer(ByteBuffer buf, + boolean canSkipChecksum) throws IOException { + int total = 0; + boolean eof = false; + while (true) { + int bb = drainDataBuf(buf); // drain bounce buffer if possible + total += bb; + int needed = buf.remaining(); + if (eof || (needed == 0)) { + break; + } else if (buf.isDirect() && (needed >= maxReadaheadLength) + && ((dataPos % bytesPerChecksum) == 0)) { + // Fast lane: try to read directly into user-supplied buffer, bypassing + // bounce buffer. + int oldLimit = buf.limit(); + int nRead; try { - readResult = doByteBufferRead(buf); + buf.limit(buf.position() + maxReadaheadLength); + nRead = fillBuffer(buf, canSkipChecksum); } finally { - buf.limit(oldlimit); + buf.limit(oldLimit); } - if (readResult == -1) { - return nRead; - } else { - nRead += readResult; - buf.position(buf.position() + readResult); + if (nRead < maxReadaheadLength) { + eof = true; + } + total += nRead; + } else { + // Slow lane: refill bounce buffer. + if (fillDataBuf(canSkipChecksum)) { + eof = true; } } - - // offsetFromChunkBoundary > 0 => unaligned read, use slow path to read - // until chunk boundary - if ((buf.remaining() > 0 && buf.remaining() < bytesPerChecksum) || offsetFromChunkBoundary > 0) { - int toRead = Math.min(buf.remaining(), bytesPerChecksum - offsetFromChunkBoundary); - int readResult = fillSlowReadBuffer(toRead); - if (readResult == -1) { - return nRead; - } else { - int fromSlowReadBuff = Math.min(readResult, buf.remaining()); - writeSlice(slowReadBuff, buf, fromSlowReadBuff); - nRead += fromSlowReadBuff; - } - } - } else { - // Non-checksummed reads are much easier; we can just fill the buffer directly. - nRead = doByteBufferRead(buf); - if (nRead > 0) { - buf.position(buf.position() + nRead); - } } - return nRead; - } - - /** - * Tries to read as many bytes as possible into supplied buffer, checksumming - * each chunk if needed. - * - * Preconditions: - * - * Postconditions: - * - * - * @param buf - * byte buffer to write bytes to. If checksums are not required, buf - * can have any number of bytes remaining, otherwise there must be a - * multiple of the checksum chunk size remaining. - * @return max(min(totalBytesRead, len) - offsetFromChunkBoundary, 0) - * that is, the the number of useful bytes (up to the amount - * requested) readable from the buffer by the client. - */ - private synchronized int doByteBufferRead(ByteBuffer buf) throws IOException { - if (verifyChecksum) { - assert buf.remaining() % bytesPerChecksum == 0; - } - int dataRead = -1; - - int oldpos = buf.position(); - // Read as much as we can into the buffer. - dataRead = fillBuffer(dataIn, buf); - - if (dataRead == -1) { - return -1; - } - - if (verifyChecksum) { - ByteBuffer toChecksum = buf.duplicate(); - toChecksum.position(oldpos); - toChecksum.limit(oldpos + dataRead); - - checksumBuff.clear(); - // Equivalent to (int)Math.ceil(toChecksum.remaining() * 1.0 / bytesPerChecksum ); - int numChunks = - (toChecksum.remaining() + bytesPerChecksum - 1) / bytesPerChecksum; - checksumBuff.limit(checksumSize * numChunks); - - fillBuffer(checksumIn, checksumBuff); - checksumBuff.flip(); - - checksum.verifyChunkedSums(toChecksum, checksumBuff, filename, - this.startOffset); - } - - if (dataRead >= 0) { - buf.position(oldpos + Math.min(offsetFromChunkBoundary, dataRead)); - } - - if (dataRead < offsetFromChunkBoundary) { - // yikes, didn't even get enough bytes to honour offset. This can happen - // even if we are verifying checksums if we are at EOF. 
- offsetFromChunkBoundary -= dataRead; - dataRead = 0; - } else { - dataRead -= offsetFromChunkBoundary; - offsetFromChunkBoundary = 0; - } - - return dataRead; - } - - /** - * Ensures that up to len bytes are available and checksummed in the slow read - * buffer. The number of bytes available to read is returned. If the buffer is - * not already empty, the number of remaining bytes is returned and no actual - * read happens. - * - * @param len - * the maximum number of bytes to make available. After len bytes - * are read, the underlying bytestream must be at a checksum - * boundary, or EOF. That is, (len + currentPosition) % - * bytesPerChecksum == 0. - * @return the number of bytes available to read, or -1 if EOF. - */ - private synchronized int fillSlowReadBuffer(int len) throws IOException { - int nRead = -1; - if (slowReadBuff.hasRemaining()) { - // Already got data, good to go. - nRead = Math.min(len, slowReadBuff.remaining()); - } else { - // Round a complete read of len bytes (plus any implicit offset) to the - // next chunk boundary, since we try and read in multiples of a chunk - int nextChunk = len + offsetFromChunkBoundary + - (bytesPerChecksum - ((len + offsetFromChunkBoundary) % bytesPerChecksum)); - int limit = Math.min(nextChunk, slowReadBuff.capacity()); - assert limit % bytesPerChecksum == 0; - - slowReadBuff.clear(); - slowReadBuff.limit(limit); - - nRead = doByteBufferRead(slowReadBuff); - - if (nRead > 0) { - // So that next time we call slowReadBuff.hasRemaining(), we don't get a - // false positive. - slowReadBuff.limit(nRead + slowReadBuff.position()); - } - } - return nRead; + return total == 0 ? -1 : total; } @Override - public synchronized int read(byte[] buf, int off, int len) throws IOException { + public synchronized int read(byte[] arr, int off, int len) + throws IOException { + boolean canSkipChecksum = getCanSkipChecksum(); + String traceString = null; if (LOG.isTraceEnabled()) { - LOG.trace("read off " + off + " len " + len); + traceString = new StringBuilder(). + append("read(arr.length=").append(arr.length). + append(", off=").append(off). + append(", len=").append(len). + append(", filename=").append(filename). + append(", block=").append(block). + append(", canSkipChecksum=").append(canSkipChecksum). + append(")").toString(); + LOG.trace(traceString + ": starting"); } - if (!verifyChecksum) { - return dataIn.read(buf, off, len); + int nRead; + try { + if (canSkipChecksum && zeroReadaheadRequested) { + nRead = readWithoutBounceBuffer(arr, off, len); + } else { + nRead = readWithBounceBuffer(arr, off, len, canSkipChecksum); + } + } catch (IOException e) { + if (LOG.isTraceEnabled()) { + LOG.trace(traceString + ": I/O error", e); + } + throw e; } - - int nRead = fillSlowReadBuffer(slowReadBuff.capacity()); - - if (nRead > 0) { - // Possible that buffer is filled with a larger read than we need, since - // we tried to read as much as possible at once - nRead = Math.min(len, nRead); - slowReadBuff.get(buf, off, nRead); + if (LOG.isTraceEnabled()) { + LOG.trace(traceString + ": returning " + nRead); } - return nRead; } + private synchronized int readWithoutBounceBuffer(byte arr[], int off, + int len) throws IOException { + freeDataBufIfExists(); + freeChecksumBufIfExists(); + int nRead = dataIn.read(ByteBuffer.wrap(arr, off, len), dataPos); + if (nRead > 0) { + dataPos += nRead; + } + return nRead == 0 ? 
-1 : nRead; + } + + private synchronized int readWithBounceBuffer(byte arr[], int off, int len, + boolean canSkipChecksum) throws IOException { + createDataBufIfNeeded(); + if (!dataBuf.hasRemaining()) { + dataBuf.position(0); + dataBuf.limit(maxReadaheadLength); + fillDataBuf(canSkipChecksum); + } + int toRead = Math.min(dataBuf.remaining(), len); + dataBuf.get(arr, off, toRead); + return toRead == 0 ? -1 : toRead; + } + @Override public synchronized long skip(long n) throws IOException { - if (LOG.isDebugEnabled()) { - LOG.debug("skip " + n); + int discardedFromBuf = 0; + long remaining = n; + if ((dataBuf != null) && dataBuf.hasRemaining()) { + discardedFromBuf = (int)Math.min(dataBuf.remaining(), n); + dataBuf.position(dataBuf.position() + discardedFromBuf); + remaining -= discardedFromBuf; } - if (n <= 0) { - return 0; + if (LOG.isTraceEnabled()) { + LOG.trace("skip(n=" + n + ", block=" + block + ", filename=" + + filename + "): discarded " + discardedFromBuf + " bytes from " + + "dataBuf and advanced dataPos by " + remaining); } - if (!verifyChecksum) { - return dataIn.skip(n); - } - - // caller made sure newPosition is not beyond EOF. - int remaining = slowReadBuff.remaining(); - int position = slowReadBuff.position(); - int newPosition = position + (int)n; - - // if the new offset is already read into dataBuff, just reposition - if (n <= remaining) { - assert offsetFromChunkBoundary == 0; - slowReadBuff.position(newPosition); - return n; - } - - // for small gap, read through to keep the data/checksum in sync - if (n - remaining <= bytesPerChecksum) { - slowReadBuff.position(position + remaining); - if (skipBuf == null) { - skipBuf = new byte[bytesPerChecksum]; - } - int ret = read(skipBuf, 0, (int)(n - remaining)); - return ret; - } - - // optimize for big gap: discard the current buffer, skip to - // the beginning of the appropriate checksum chunk and then - // read to the middle of that chunk to be in sync with checksums. - - // We can't use this.offsetFromChunkBoundary because we need to know how - // many bytes of the offset were really read. Calling read(..) with a - // positive this.offsetFromChunkBoundary causes that many bytes to get - // silently skipped. 
- int myOffsetFromChunkBoundary = newPosition % bytesPerChecksum; - long toskip = n - remaining - myOffsetFromChunkBoundary; - - slowReadBuff.position(slowReadBuff.limit()); - checksumBuff.position(checksumBuff.limit()); - - IOUtils.skipFully(dataIn, toskip); - long checkSumOffset = (toskip / bytesPerChecksum) * checksumSize; - IOUtils.skipFully(checksumIn, checkSumOffset); - - // read into the middle of the chunk - if (skipBuf == null) { - skipBuf = new byte[bytesPerChecksum]; - } - assert skipBuf.length == bytesPerChecksum; - assert myOffsetFromChunkBoundary < bytesPerChecksum; - - int ret = read(skipBuf, 0, myOffsetFromChunkBoundary); - - if (ret == -1) { // EOS - return toskip; - } else { - return (toskip + ret); - } - } - - @Override - public synchronized void close() throws IOException { - if (clientMmap != null) { - clientMmap.unref(); - clientMmap = null; - } - if (fisCache != null) { - if (LOG.isDebugEnabled()) { - LOG.debug("putting FileInputStream for " + filename + - " back into FileInputStreamCache"); - } - fisCache.put(datanodeID, block, new FileInputStream[] {dataIn, checksumIn}); - } else { - LOG.debug("closing FileInputStream for " + filename); - IOUtils.cleanup(LOG, dataIn, checksumIn); - } - if (slowReadBuff != null) { - bufferPool.returnBuffer(slowReadBuff); - slowReadBuff = null; - } - if (checksumBuff != null) { - bufferPool.returnBuffer(checksumBuff); - checksumBuff = null; - } - startOffset = -1; - checksum = null; - } - - @Override - public int readAll(byte[] buf, int offset, int len) throws IOException { - return BlockReaderUtil.readAll(this, buf, offset, len); - } - - @Override - public void readFully(byte[] buf, int off, int len) throws IOException { - BlockReaderUtil.readFully(this, buf, off, len); + dataPos += remaining; + return n; } @Override @@ -537,25 +628,71 @@ class BlockReaderLocal implements BlockReader { return Integer.MAX_VALUE; } + @Override + public synchronized void close() throws IOException { + if (closed) return; + closed = true; + if (LOG.isTraceEnabled()) { + LOG.trace("close(filename=" + filename + ", block=" + block + ")"); + } + if (clientMmap != null) { + clientMmap.unref(); + clientMmap = null; + } + if (fisCache != null) { + if (LOG.isDebugEnabled()) { + LOG.debug("putting FileInputStream for " + filename + + " back into FileInputStreamCache"); + } + fisCache.put(datanodeID, block, streams); + } else { + LOG.debug("closing FileInputStream for " + filename); + IOUtils.cleanup(LOG, dataIn, checksumIn); + } + freeDataBufIfExists(); + freeChecksumBufIfExists(); + } + + @Override + public synchronized void readFully(byte[] arr, int off, int len) + throws IOException { + BlockReaderUtil.readFully(this, arr, off, len); + } + + @Override + public synchronized int readAll(byte[] buf, int off, int len) + throws IOException { + return BlockReaderUtil.readAll(this, buf, off, len); + } + @Override public boolean isLocal() { return true; } - + @Override public boolean isShortCircuit() { return true; } @Override - public ClientMmap getClientMmap(LocatedBlock curBlock, - ClientMmapManager mmapManager) { + public synchronized ClientMmap getClientMmap(EnumSet opts, + ClientMmapManager mmapManager) { + if ((!opts.contains(ReadOption.SKIP_CHECKSUMS)) && + verifyChecksum && (!mlocked.get())) { + if (LOG.isTraceEnabled()) { + LOG.trace("can't get an mmap for " + block + " of " + filename + + " since SKIP_CHECKSUMS was not given, " + + "we aren't skipping checksums, and the block is not mlocked."); + } + return null; + } if (clientMmap == null) { if 
(mmapDisabled) { return null; } try { - clientMmap = mmapManager.fetch(datanodeID, block, dataIn); + clientMmap = mmapManager.fetch(datanodeID, block, streams[0]); if (clientMmap == null) { mmapDisabled = true; return null; @@ -572,4 +709,24 @@ class BlockReaderLocal implements BlockReader { } return clientMmap; } + + /** + * Set the mlocked state of the BlockReader. + * This method does NOT need to be synchronized because mlocked is atomic. + * + * @param mlocked the new mlocked state of the BlockReader. + */ + public void setMlocked(boolean mlocked) { + this.mlocked.set(mlocked); + } + + @VisibleForTesting + boolean getVerifyChecksum() { + return this.verifyChecksum; + } + + @VisibleForTesting + int getMaxReadaheadLength() { + return this.maxReadaheadLength; + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java index 85ee41b6305..2f661933619 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java @@ -24,10 +24,12 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.security.PrivilegedExceptionAction; import java.util.Collections; +import java.util.EnumSet; import java.util.HashMap; import java.util.LinkedHashMap; import java.util.Map; +import org.apache.hadoop.fs.ReadOption; import org.apache.hadoop.hdfs.client.ClientMmap; import org.apache.hadoop.hdfs.client.ClientMmapManager; import org.apache.commons.logging.Log; @@ -706,8 +708,8 @@ class BlockReaderLocalLegacy implements BlockReader { } @Override - public ClientMmap getClientMmap(LocatedBlock curBlock, - ClientMmapManager mmapManager) { + public ClientMmap getClientMmap(EnumSet opts, + ClientMmapManager mmapManager) { return null; } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java index bb9a066afe0..0ab51c7e716 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java @@ -57,6 +57,7 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlocks; import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException; +import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader; import org.apache.hadoop.hdfs.server.datanode.CachingStrategy; import org.apache.hadoop.hdfs.server.datanode.ReplicaNotFoundException; import org.apache.hadoop.io.ByteBufferPool; @@ -1073,9 +1074,18 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead, DFSClient.LOG.debug("got FileInputStreams for " + block + " from " + "the FileInputStreamCache."); } - return new BlockReaderLocal(dfsClient.getConf(), file, - block, startOffset, len, fis[0], fis[1], chosenNode, verifyChecksum, - fileInputStreamCache); + return new BlockReaderLocal.Builder(dfsClient.getConf()). + setFilename(file). + setBlock(block). + setStartOffset(startOffset). + setStreams(fis). + setDatanodeID(chosenNode). + setVerifyChecksum(verifyChecksum). + setBlockMetadataHeader(BlockMetadataHeader. 
+ preadHeader(fis[1].getChannel())). + setFileInputStreamCache(fileInputStreamCache). + setCachingStrategy(cachingStrategy). + build(); } // If the legacy local block reader is enabled and we are reading a local @@ -1479,23 +1489,19 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead, "at position " + pos); } } - boolean canSkipChecksums = opts.contains(ReadOption.SKIP_CHECKSUMS); - if (canSkipChecksums) { - ByteBuffer buffer = tryReadZeroCopy(maxLength); - if (buffer != null) { - return buffer; - } + ByteBuffer buffer = tryReadZeroCopy(maxLength, opts); + if (buffer != null) { + return buffer; } - ByteBuffer buffer = ByteBufferUtil. - fallbackRead(this, bufferPool, maxLength); + buffer = ByteBufferUtil.fallbackRead(this, bufferPool, maxLength); if (buffer != null) { extendedReadBuffers.put(buffer, bufferPool); } return buffer; } - private synchronized ByteBuffer tryReadZeroCopy(int maxLength) - throws IOException { + private synchronized ByteBuffer tryReadZeroCopy(int maxLength, + EnumSet opts) throws IOException { // Java ByteBuffers can't be longer than 2 GB, because they use // 4-byte signed integers to represent capacity, etc. // So we can't mmap the parts of the block higher than the 2 GB offset. @@ -1518,8 +1524,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead, long blockPos = curPos - blockStartInFile; long limit = blockPos + length; ClientMmap clientMmap = - blockReader.getClientMmap(currentLocatedBlock, - dfsClient.getMmapManager()); + blockReader.getClientMmap(opts, dfsClient.getMmapManager()); if (clientMmap == null) { if (DFSClient.LOG.isDebugEnabled()) { DFSClient.LOG.debug("unable to perform a zero-copy read from offset " + diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java index f587c3b5d58..94a00ccc580 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java @@ -23,10 +23,12 @@ import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.EnumSet; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.fs.FSInputChecker; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.ReadOption; import org.apache.hadoop.hdfs.client.ClientMmap; import org.apache.hadoop.hdfs.client.ClientMmapManager; import org.apache.hadoop.hdfs.net.Peer; @@ -490,8 +492,8 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader { } @Override - public ClientMmap getClientMmap(LocatedBlock curBlock, - ClientMmapManager mmapManager) { + public ClientMmap getClientMmap(EnumSet opts, + ClientMmapManager mmapManager) { return null; } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java index 521fb70aa38..e76a65a40b7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java @@ -25,10 +25,12 @@ import java.io.OutputStream; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.ReadableByteChannel; +import java.util.EnumSet; 
import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.fs.ReadOption; import org.apache.hadoop.hdfs.client.ClientMmap; import org.apache.hadoop.hdfs.client.ClientMmapManager; import org.apache.hadoop.hdfs.net.Peer; @@ -455,8 +457,8 @@ public class RemoteBlockReader2 implements BlockReader { } @Override - public ClientMmap getClientMmap(LocatedBlock curBlock, - ClientMmapManager manager) { + public ClientMmap getClientMmap(EnumSet opts, + ClientMmapManager mmapManager) { return null; } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java index 870f8e58839..6bb9227883b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java @@ -21,10 +21,13 @@ import java.io.BufferedInputStream; import java.io.ByteArrayInputStream; import java.io.DataInputStream; import java.io.DataOutputStream; +import java.io.EOFException; import java.io.File; import java.io.FileInputStream; import java.io.IOException; import java.io.RandomAccessFile; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.util.DataChecksum; @@ -67,7 +70,29 @@ public class BlockMetadataHeader { return checksum; } - + /** + * Read the header without changing the position of the FileChannel. + * + * @param fc The FileChannel to read. + * @return the Metadata Header. + * @throws IOException on error. + */ + public static BlockMetadataHeader preadHeader(FileChannel fc) + throws IOException { + byte arr[] = new byte[2 + DataChecksum.HEADER_LEN]; + ByteBuffer buf = ByteBuffer.wrap(arr); + + while (buf.hasRemaining()) { + if (fc.read(buf, 0) <= 0) { + throw new EOFException("unexpected EOF while reading " + + "metadata file header"); + } + } + short version = (short)((arr[0] << 8) | (arr[1] & 0xff)); + DataChecksum dataChecksum = DataChecksum.newDataChecksum(arr, 2); + return new BlockMetadataHeader(version, dataChecksum); + } + /** * This reads all the fields till the beginning of checksum. * @param in diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml index 78c14bb861f..b961c32bb72 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml @@ -1394,12 +1394,15 @@ dfs.client.cache.readahead - Just like dfs.datanode.readahead.bytes, this setting causes the datanode to + When using remote reads, this setting causes the datanode to read ahead in the block file using posix_fadvise, potentially decreasing I/O wait times. Unlike dfs.datanode.readahead.bytes, this is a client-side setting rather than a setting for the entire datanode. If present, this setting will override the DataNode default. + When using local reads, this setting determines how much readahead we do in + BlockReaderLocal. + If the native libraries are not available to the DataNode, this configuration has no effect. 
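A client-side sketch of how these behaviors surface. The config keys are the ones exercised in the test changes below; the pool and stream calls (ElasticByteBufferPool, read(pool, maxLength, opts), releaseBuffer) are assumed from the stock enhanced byte-buffer read API and the path "/a" is hypothetical:

    import java.nio.ByteBuffer;
    import java.util.EnumSet;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.ReadOption;
    import org.apache.hadoop.hdfs.DFSConfigKeys;
    import org.apache.hadoop.hdfs.HdfsConfiguration;
    import org.apache.hadoop.io.ElasticByteBufferPool;

    public class ShortCircuitReadExample {
      public static void main(String[] args) throws Exception {
        // Readahead is now a client-side knob for BlockReaderLocal too; zero
        // readahead lets the reader bypass its bounce buffer whenever checksums
        // can also be skipped.
        HdfsConfiguration conf = new HdfsConfiguration();
        conf.setLong(DFSConfigKeys.DFS_CLIENT_CACHE_READAHEAD, 0);
        conf.setBoolean(
            DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY, true);

        // SKIP_CHECKSUMS now reaches BlockReader#getClientMmap through the
        // zero-copy read options rather than a LocatedBlock argument.
        FileSystem fs = FileSystem.get(conf);
        FSDataInputStream in = fs.open(new Path("/a"));
        ByteBuffer buf = in.read(new ElasticByteBufferPool(), 4096,
            EnumSet.of(ReadOption.SKIP_CHECKSUMS));
        if (buf != null) {
          in.releaseBuffer(buf); // return the (possibly mmapped) buffer
        }
        in.close();
      }
    }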
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java index 74152e27795..8c29b1c0208 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java @@ -58,6 +58,7 @@ import org.apache.hadoop.util.VersionInfo; import java.io.*; import java.net.*; +import java.nio.ByteBuffer; import java.security.PrivilegedExceptionAction; import java.util.*; import java.util.concurrent.TimeoutException; @@ -1059,4 +1060,10 @@ public class DFSTestUtil { public static void abortStream(DFSOutputStream out) throws IOException { out.abort(); } + + public static byte[] asArray(ByteBuffer buf) { + byte arr[] = new byte[buf.remaining()]; + buf.duplicate().get(arr); + return arr; + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocal.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocal.java index 57f5ce979ad..aab4df849c6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocal.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocal.java @@ -32,6 +32,8 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.client.HdfsDataInputStream; import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader; +import org.apache.hadoop.hdfs.server.datanode.CachingStrategy; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.net.unix.DomainSocket; import org.apache.hadoop.net.unix.TemporarySocketDirectory; @@ -92,22 +94,35 @@ public class TestBlockReaderLocal { } } - private static interface BlockReaderLocalTest { - final int TEST_LENGTH = 12345; + private static class BlockReaderLocalTest { + final static int TEST_LENGTH = 12345; + final static int BYTES_PER_CHECKSUM = 512; + + public void setConfiguration(HdfsConfiguration conf) { + // default: no-op + } public void setup(File blockFile, boolean usingChecksums) - throws IOException; + throws IOException { + // default: no-op + } public void doTest(BlockReaderLocal reader, byte original[]) - throws IOException; + throws IOException { + // default: no-op + } } public void runBlockReaderLocalTest(BlockReaderLocalTest test, - boolean checksum) throws IOException { + boolean checksum, long readahead) throws IOException { MiniDFSCluster cluster = null; HdfsConfiguration conf = new HdfsConfiguration(); conf.setBoolean(DFSConfigKeys. 
DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY, !checksum);
+    conf.setLong(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY,
+        BlockReaderLocalTest.BYTES_PER_CHECKSUM);
     conf.set(DFSConfigKeys.DFS_CHECKSUM_TYPE_KEY, "CRC32C");
-    FileInputStream dataIn = null, checkIn = null;
+    conf.setLong(DFSConfigKeys.DFS_CLIENT_CACHE_READAHEAD, readahead);
+    test.setConfiguration(conf);
+    FileInputStream dataIn = null, metaIn = null;
     final Path TEST_PATH = new Path("/a");
     final long RANDOM_SEED = 4567L;
     BlockReaderLocal blockReaderLocal = null;
@@ -143,45 +158,51 @@ public class TestBlockReaderLocal {
       cluster.shutdown();
       cluster = null;
       test.setup(dataFile, checksum);
-      dataIn = new FileInputStream(dataFile);
-      checkIn = new FileInputStream(metaFile);
-      blockReaderLocal = new BlockReaderLocal(new DFSClient.Conf(conf),
-          TEST_PATH.getName(), block, 0, -1,
-          dataIn, checkIn, datanodeID, checksum, null);
+      FileInputStream streams[] = {
+          new FileInputStream(dataFile),
+          new FileInputStream(metaFile)
+      };
+      dataIn = streams[0];
+      metaIn = streams[1];
+      blockReaderLocal = new BlockReaderLocal.Builder(
+              new DFSClient.Conf(conf)).
+          setFilename(TEST_PATH.getName()).
+          setBlock(block).
+          setStreams(streams).
+          setDatanodeID(datanodeID).
+          setCachingStrategy(new CachingStrategy(false, readahead)).
+          setVerifyChecksum(checksum).
+          setBlockMetadataHeader(BlockMetadataHeader.preadHeader(
+              metaIn.getChannel())).
+          build();
       dataIn = null;
-      checkIn = null;
+      metaIn = null;
       test.doTest(blockReaderLocal, original);
+      // BlockReaderLocal should not alter the file position.
+      Assert.assertEquals(0, streams[0].getChannel().position());
+      Assert.assertEquals(0, streams[1].getChannel().position());
     } finally {
       if (fsIn != null) fsIn.close();
       if (fs != null) fs.close();
       if (cluster != null) cluster.shutdown();
       if (dataIn != null) dataIn.close();
-      if (checkIn != null) checkIn.close();
+      if (metaIn != null) metaIn.close();
       if (blockReaderLocal != null) blockReaderLocal.close();
     }
   }
 
   private static class TestBlockReaderLocalImmediateClose
-      implements BlockReaderLocalTest {
-    @Override
-    public void setup(File blockFile, boolean usingChecksums)
-        throws IOException { }
-    @Override
-    public void doTest(BlockReaderLocal reader, byte original[])
-        throws IOException { }
+      extends BlockReaderLocalTest {
   }
 
   @Test
   public void testBlockReaderLocalImmediateClose() throws IOException {
-    runBlockReaderLocalTest(new TestBlockReaderLocalImmediateClose(), true);
-    runBlockReaderLocalTest(new TestBlockReaderLocalImmediateClose(), false);
+    runBlockReaderLocalTest(new TestBlockReaderLocalImmediateClose(), true, 0);
+    runBlockReaderLocalTest(new TestBlockReaderLocalImmediateClose(), false, 0);
   }
 
   private static class TestBlockReaderSimpleReads
-      implements BlockReaderLocalTest {
-    @Override
-    public void setup(File blockFile, boolean usingChecksums)
-        throws IOException { }
+      extends BlockReaderLocalTest {
     @Override
     public void doTest(BlockReaderLocal reader, byte original[])
         throws IOException {
@@ -194,24 +215,43 @@ public class TestBlockReaderLocal {
       assertArrayRegionsEqual(original, 1024, buf, 1024, 513);
       reader.readFully(buf, 1537, 514);
       assertArrayRegionsEqual(original, 1537, buf, 1537, 514);
+      // Readahead is always at least the size of one chunk in this test.
+      Assert.assertTrue(reader.getMaxReadaheadLength() >=
+          BlockReaderLocalTest.BYTES_PER_CHECKSUM);
     }
   }
 
   @Test
   public void testBlockReaderSimpleReads() throws IOException {
-    runBlockReaderLocalTest(new TestBlockReaderSimpleReads(), true);
+    runBlockReaderLocalTest(new TestBlockReaderSimpleReads(), true,
+        DFSConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT);
+  }
+
+  @Test
+  public void testBlockReaderSimpleReadsShortReadahead() throws IOException {
+    runBlockReaderLocalTest(new TestBlockReaderSimpleReads(), true,
+        BlockReaderLocalTest.BYTES_PER_CHECKSUM - 1);
   }
 
   @Test
   public void testBlockReaderSimpleReadsNoChecksum() throws IOException {
-    runBlockReaderLocalTest(new TestBlockReaderSimpleReads(), false);
+    runBlockReaderLocalTest(new TestBlockReaderSimpleReads(), false,
+        DFSConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT);
+  }
+
+  @Test
+  public void testBlockReaderSimpleReadsNoReadahead() throws IOException {
+    runBlockReaderLocalTest(new TestBlockReaderSimpleReads(), true, 0);
+  }
+
+  @Test
+  public void testBlockReaderSimpleReadsNoChecksumNoReadahead()
+      throws IOException {
+    runBlockReaderLocalTest(new TestBlockReaderSimpleReads(), false, 0);
   }
 
   private static class TestBlockReaderLocalArrayReads2
-      implements BlockReaderLocalTest {
-    @Override
-    public void setup(File blockFile, boolean usingChecksums)
-        throws IOException { }
+      extends BlockReaderLocalTest {
     @Override
     public void doTest(BlockReaderLocal reader, byte original[])
         throws IOException {
@@ -234,21 +274,30 @@ public class TestBlockReaderLocal {
 
   @Test
   public void testBlockReaderLocalArrayReads2() throws IOException {
     runBlockReaderLocalTest(new TestBlockReaderLocalArrayReads2(),
-        true);
+        true, DFSConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT);
   }
 
   @Test
   public void testBlockReaderLocalArrayReads2NoChecksum() throws IOException {
     runBlockReaderLocalTest(new TestBlockReaderLocalArrayReads2(),
-        false);
+        false, DFSConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT);
+  }
+
+  @Test
+  public void testBlockReaderLocalArrayReads2NoReadahead()
+      throws IOException {
+    runBlockReaderLocalTest(new TestBlockReaderLocalArrayReads2(), true, 0);
+  }
+
+  @Test
+  public void testBlockReaderLocalArrayReads2NoChecksumNoReadahead()
+      throws IOException {
+    runBlockReaderLocalTest(new TestBlockReaderLocalArrayReads2(), false, 0);
   }
 
   private static class TestBlockReaderLocalByteBufferReads
-      implements BlockReaderLocalTest {
-    @Override
-    public void setup(File blockFile, boolean usingChecksums)
-        throws IOException { }
+      extends BlockReaderLocalTest {
     @Override
     public void doTest(BlockReaderLocal reader, byte original[])
        throws IOException {
@@ -268,19 +317,105 @@ public class TestBlockReaderLocal {
 
   @Test
   public void testBlockReaderLocalByteBufferReads() throws IOException {
-    runBlockReaderLocalTest(
-        new TestBlockReaderLocalByteBufferReads(), true);
+    runBlockReaderLocalTest(new TestBlockReaderLocalByteBufferReads(),
+        true, DFSConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT);
   }
 
   @Test
   public void testBlockReaderLocalByteBufferReadsNoChecksum()
       throws IOException {
     runBlockReaderLocalTest(
-        new TestBlockReaderLocalByteBufferReads(), false);
+        new TestBlockReaderLocalByteBufferReads(),
+        false, DFSConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT);
   }
 
+  @Test
+  public void testBlockReaderLocalByteBufferReadsNoReadahead()
+      throws IOException {
+    runBlockReaderLocalTest(new TestBlockReaderLocalByteBufferReads(),
+        true, 0);
+  }
+
+  @Test
+  public void testBlockReaderLocalByteBufferReadsNoChecksumNoReadahead()
+      throws IOException {
+    runBlockReaderLocalTest(new TestBlockReaderLocalByteBufferReads(),
+        false, 0);
+  }
+
+  /**
+   * Test reads that bypass the bounce buffer (because they are aligned
+   * and bigger than the readahead).
+   */
+  private static class TestBlockReaderLocalByteBufferFastLaneReads
+      extends BlockReaderLocalTest {
+    @Override
+    public void doTest(BlockReaderLocal reader, byte original[])
+        throws IOException {
+      ByteBuffer buf = ByteBuffer.allocateDirect(TEST_LENGTH);
+      readFully(reader, buf, 0, 5120);
+      buf.flip();
+      assertArrayRegionsEqual(original, 0,
+          DFSTestUtil.asArray(buf), 0,
+          5120);
+      reader.skip(1537);
+      readFully(reader, buf, 0, 1);
+      buf.flip();
+      assertArrayRegionsEqual(original, 6657,
+          DFSTestUtil.asArray(buf), 0,
+          1);
+      reader.setMlocked(true);
+      readFully(reader, buf, 0, 5120);
+      buf.flip();
+      assertArrayRegionsEqual(original, 6658,
+          DFSTestUtil.asArray(buf), 0,
+          5120);
+      reader.setMlocked(false);
+      readFully(reader, buf, 0, 513);
+      buf.flip();
+      assertArrayRegionsEqual(original, 11778,
+          DFSTestUtil.asArray(buf), 0,
+          513);
+      reader.skip(3);
+      readFully(reader, buf, 0, 50);
+      buf.flip();
+      assertArrayRegionsEqual(original, 12294,
+          DFSTestUtil.asArray(buf), 0,
+          50);
+    }
+  }
+
+  @Test
+  public void testBlockReaderLocalByteBufferFastLaneReads()
+      throws IOException {
+    runBlockReaderLocalTest(new TestBlockReaderLocalByteBufferFastLaneReads(),
+        true, 2 * BlockReaderLocalTest.BYTES_PER_CHECKSUM);
+  }
+
+  @Test
+  public void testBlockReaderLocalByteBufferFastLaneReadsNoChecksum()
+      throws IOException {
+    runBlockReaderLocalTest(
+        new TestBlockReaderLocalByteBufferFastLaneReads(),
+        false, 2 * BlockReaderLocalTest.BYTES_PER_CHECKSUM);
+  }
+
+  @Test
+  public void testBlockReaderLocalByteBufferFastLaneReadsNoReadahead()
+      throws IOException {
+    runBlockReaderLocalTest(new TestBlockReaderLocalByteBufferFastLaneReads(),
+        true, 0);
+  }
+
+  @Test
+  public void testBlockReaderLocalByteBufferFastLaneReadsNoChecksumNoReadahead()
+      throws IOException {
+    runBlockReaderLocalTest(new TestBlockReaderLocalByteBufferFastLaneReads(),
+        false, 0);
+  }
+
   private static class TestBlockReaderLocalReadCorruptStart
-      implements BlockReaderLocalTest {
+      extends BlockReaderLocalTest {
     boolean usingChecksums = false;
     @Override
     public void setup(File blockFile, boolean usingChecksums)
@@ -314,11 +449,12 @@ public class TestBlockReaderLocal {
 
   @Test
   public void testBlockReaderLocalReadCorruptStart() throws IOException {
-    runBlockReaderLocalTest(new TestBlockReaderLocalReadCorruptStart(), true);
+    runBlockReaderLocalTest(new TestBlockReaderLocalReadCorruptStart(), true,
+        DFSConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT);
   }
 
   private static class TestBlockReaderLocalReadCorrupt
-      implements BlockReaderLocalTest {
+      extends BlockReaderLocalTest {
     boolean usingChecksums = false;
     @Override
     public void setup(File blockFile, boolean usingChecksums)
@@ -364,8 +500,136 @@ public class TestBlockReaderLocal {
 
   @Test
   public void testBlockReaderLocalReadCorrupt() throws IOException {
-    runBlockReaderLocalTest(new TestBlockReaderLocalReadCorrupt(), true);
-    runBlockReaderLocalTest(new TestBlockReaderLocalReadCorrupt(), false);
+    runBlockReaderLocalTest(new TestBlockReaderLocalReadCorrupt(), true,
+        DFSConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT);
+  }
+
+  @Test
+  public void testBlockReaderLocalReadCorruptNoChecksum()
+      throws IOException {
+    runBlockReaderLocalTest(new TestBlockReaderLocalReadCorrupt(), false,
+        DFSConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT);
+  }
+
+  @Test
+  public void testBlockReaderLocalReadCorruptNoReadahead()
+      throws IOException {
+    runBlockReaderLocalTest(new TestBlockReaderLocalReadCorrupt(), true, 0);
+  }
+
+  @Test
+  public void testBlockReaderLocalReadCorruptNoChecksumNoReadahead()
+      throws IOException {
+    runBlockReaderLocalTest(new TestBlockReaderLocalReadCorrupt(), false, 0);
+  }
+
+  private static class TestBlockReaderLocalWithMlockChanges
+      extends BlockReaderLocalTest {
+    @Override
+    public void setup(File blockFile, boolean usingChecksums)
+        throws IOException {
+    }
+
+    @Override
+    public void doTest(BlockReaderLocal reader, byte original[])
+        throws IOException {
+      ByteBuffer buf = ByteBuffer.wrap(new byte[TEST_LENGTH]);
+      reader.skip(1);
+      readFully(reader, buf, 1, 9);
+      assertArrayRegionsEqual(original, 1, buf.array(), 1, 9);
+      readFully(reader, buf, 10, 100);
+      assertArrayRegionsEqual(original, 10, buf.array(), 10, 100);
+      reader.setMlocked(true);
+      readFully(reader, buf, 110, 700);
+      assertArrayRegionsEqual(original, 110, buf.array(), 110, 700);
+      reader.setMlocked(false);
+      reader.skip(1); // skip from offset 810 to offset 811
+      readFully(reader, buf, 811, 5);
+      assertArrayRegionsEqual(original, 811, buf.array(), 811, 5);
+    }
+  }
+
+  @Test
+  public void testBlockReaderLocalWithMlockChanges()
+      throws IOException {
+    runBlockReaderLocalTest(new TestBlockReaderLocalWithMlockChanges(),
+        true, DFSConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT);
+  }
+
+  @Test
+  public void testBlockReaderLocalWithMlockChangesNoChecksum()
+      throws IOException {
+    runBlockReaderLocalTest(new TestBlockReaderLocalWithMlockChanges(),
+        false, DFSConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT);
+  }
+
+  @Test
+  public void testBlockReaderLocalWithMlockChangesNoReadahead()
+      throws IOException {
+    runBlockReaderLocalTest(new TestBlockReaderLocalWithMlockChanges(),
+        true, 0);
+  }
+
+  @Test
+  public void testBlockReaderLocalWithMlockChangesNoChecksumNoReadahead()
+      throws IOException {
+    runBlockReaderLocalTest(new TestBlockReaderLocalWithMlockChanges(),
+        false, 0);
+  }
+
+  private static class TestBlockReaderLocalOnFileWithoutChecksum
+      extends BlockReaderLocalTest {
+    @Override
+    public void setConfiguration(HdfsConfiguration conf) {
+      conf.set(DFSConfigKeys.DFS_CHECKSUM_TYPE_KEY, "NULL");
+    }
+
+    @Override
+    public void doTest(BlockReaderLocal reader, byte original[])
+        throws IOException {
+      Assert.assertTrue(!reader.getVerifyChecksum());
+      ByteBuffer buf = ByteBuffer.wrap(new byte[TEST_LENGTH]);
+      reader.skip(1);
+      readFully(reader, buf, 1, 9);
+      assertArrayRegionsEqual(original, 1, buf.array(), 1, 9);
+      readFully(reader, buf, 10, 100);
+      assertArrayRegionsEqual(original, 10, buf.array(), 10, 100);
+      reader.setMlocked(true);
+      readFully(reader, buf, 110, 700);
+      assertArrayRegionsEqual(original, 110, buf.array(), 110, 700);
+      reader.setMlocked(false);
+      reader.skip(1); // skip from offset 810 to offset 811
+      readFully(reader, buf, 811, 5);
+      assertArrayRegionsEqual(original, 811, buf.array(), 811, 5);
+    }
+  }
+
+  @Test
+  public void testBlockReaderLocalOnFileWithoutChecksum()
+      throws IOException {
+    runBlockReaderLocalTest(new TestBlockReaderLocalOnFileWithoutChecksum(),
+        true, DFSConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT);
+  }
+
+  @Test
+  public void testBlockReaderLocalOnFileWithoutChecksumNoChecksum()
+      throws IOException {
+    runBlockReaderLocalTest(new TestBlockReaderLocalOnFileWithoutChecksum(),
+        false, DFSConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT);
+  }
+
+  @Test
+  public void testBlockReaderLocalOnFileWithoutChecksumNoReadahead()
+      throws IOException {
+    runBlockReaderLocalTest(new TestBlockReaderLocalOnFileWithoutChecksum(),
+        true, 0);
+  }
+
+  @Test
+  public void testBlockReaderLocalOnFileWithoutChecksumNoChecksumNoReadahead()
+      throws IOException {
+    runBlockReaderLocalTest(new TestBlockReaderLocalOnFileWithoutChecksum(),
+        false, 0);
   }
 
   @Test(timeout=60000)
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestShortCircuitLocalRead.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestShortCircuitLocalRead.java
index ed6fd745a54..57f1c117b46 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestShortCircuitLocalRead.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestShortCircuitLocalRead.java
@@ -259,7 +259,6 @@ public class TestShortCircuitLocalRead {
     assertTrue("/ should be a directory", fs.getFileStatus(path)
                 .isDirectory() == true);
 
-    // create a new file in home directory. Do not close it.
     byte[] fileData = AppendTestUtil.randomBytes(seed, size);
     Path file1 = fs.makeQualified(new Path("filelocal.dat"));
     FSDataOutputStream stm = createFile(fs, file1, 1);