From f55a1c08763e5f865fd9193d640c89a06ab49c4a Mon Sep 17 00:00:00 2001 From: Todd Lipcon Date: Wed, 21 Mar 2012 17:30:37 +0000 Subject: [PATCH] HDFS-2834. Add a ByteBuffer-based read API to DFSInputStream. Contributed by Henry Robinson. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1303474 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 + .../org/apache/hadoop/hdfs/BlockReader.java | 4 +- .../apache/hadoop/hdfs/BlockReaderLocal.java | 373 +++++++++++++---- .../org/apache/hadoop/hdfs/DFSConfigKeys.java | 2 + .../apache/hadoop/hdfs/DFSInputStream.java | 99 ++++- .../apache/hadoop/hdfs/RemoteBlockReader.java | 11 +- .../hadoop/hdfs/RemoteBlockReader2.java | 30 +- .../hadoop/hdfs/BlockReaderTestUtil.java | 6 +- .../hadoop/hdfs/TestBlockReaderLocal.java | 106 +++++ .../apache/hadoop/hdfs/TestParallelRead.java | 254 +----------- .../hadoop/hdfs/TestParallelReadUtil.java | 385 ++++++++++++++++++ .../hdfs/TestShortCircuitLocalRead.java | 43 +- 12 files changed, 980 insertions(+), 336 deletions(-) create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocal.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelReadUtil.java diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 209db5cecb8..4c0f825a2e5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -57,6 +57,9 @@ Trunk (unreleased changes) OPTIMIZATIONS + HDFS-2834. Add a ByteBuffer-based read API to DFSInputStream. + (Henry Robinson via todd) + BUG FIXES HDFS-2299. TestOfflineEditsViewer is failing on trunk. (Uma Maheswara Rao G diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java index dfab7309b53..6b5b92839b7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java @@ -20,11 +20,13 @@ package org.apache.hadoop.hdfs; import java.io.IOException; import java.net.Socket; +import org.apache.hadoop.fs.ByteBufferReadable; + /** * A BlockReader is responsible for reading a single block * from a single datanode. */ -public interface BlockReader { +public interface BlockReader extends ByteBufferReadable { /* same interface as inputStream java.io.InputStream#read() * used by DFSInputStream#read() diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java index cc61697edea..f0cfa45bd3a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java @@ -118,20 +118,32 @@ class BlockReaderLocal implements BlockReader { private static Map localDatanodeInfoMap = new HashMap(); private final FileInputStream dataIn; // reader for the data file - private FileInputStream checksumIn; // reader for the checksum file + private final FileInputStream checksumIn; // reader for the checksum file + /** + * Offset from the most recent chunk boundary at which the next read should + * take place. 
Is only set to non-zero at construction time, and is + * decremented (usually to 0) by subsequent reads. This avoids having to do a + * checksum read at construction to position the read cursor correctly. + */ private int offsetFromChunkBoundary; private byte[] skipBuf = null; - private ByteBuffer dataBuff = null; + + /** + * Used for checksummed reads that need to be staged before copying to their + * output buffer because they are either a) smaller than the checksum chunk + * size or b) issued by the slower read(byte[]...) path + */ + private ByteBuffer slowReadBuff = null; private ByteBuffer checksumBuff = null; private DataChecksum checksum; private final boolean verifyChecksum; private static DirectBufferPool bufferPool = new DirectBufferPool(); - private int bytesPerChecksum; - private int checksumSize; + private final int bytesPerChecksum; + private final int checksumSize; /** offset in block where reader wants to actually read */ private long startOffset; @@ -170,7 +182,7 @@ class BlockReaderLocal implements BlockReader { if (LOG.isDebugEnabled()) { LOG.debug("New BlockReaderLocal for file " + blkfile + " of size " + blkfile.length() + " startOffset " + startOffset + " length " - + length + " short circuit checksum " + skipChecksumCheck); + + length + " short circuit checksum " + !skipChecksumCheck); } if (!skipChecksumCheck) { @@ -254,6 +266,20 @@ class BlockReaderLocal implements BlockReader { DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_DEFAULT); } + private static int getSlowReadBufferNumChunks(Configuration conf, int bytesPerChecksum) { + int bufferSizeBytes = conf.getInt(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_KEY, + DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_DEFAULT); + + if (bufferSizeBytes < bytesPerChecksum) { + throw new IllegalArgumentException("Configured BlockReaderLocal buffer size (" + bufferSizeBytes + ") " + + "is not large enough to hold a single chunk (" + bytesPerChecksum + "). Please configure " + + DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_KEY + " appropriately"); + } + + // Round down to nearest chunk size + return bufferSizeBytes / bytesPerChecksum; + } + private BlockReaderLocal(Configuration conf, String hdfsfile, ExtendedBlock block, Token token, long startOffset, long length, BlockLocalPathInfo pathinfo, FileInputStream dataIn) @@ -279,33 +305,47 @@ class BlockReaderLocal implements BlockReader { this.dataIn = dataIn; this.checksumIn = checksumIn; this.offsetFromChunkBoundary = (int) (startOffset-firstChunkOffset); - dataBuff = bufferPool.getBuffer(bytesPerChecksum*64); - checksumBuff = bufferPool.getBuffer(checksumSize*64); - //Initially the buffers have nothing to read. - dataBuff.flip(); + + int chunksPerChecksumRead = getSlowReadBufferNumChunks(conf, bytesPerChecksum); + slowReadBuff = bufferPool.getBuffer(bytesPerChecksum * chunksPerChecksumRead); + checksumBuff = bufferPool.getBuffer(checksumSize * chunksPerChecksumRead); + // Initially the buffers have nothing to read. 
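+    // flip() here leaves position == limit == 0, so hasRemaining() reports the
+    // buffers as drained until the first fill.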
+ slowReadBuff.flip(); checksumBuff.flip(); - long toSkip = firstChunkOffset; - while (toSkip > 0) { - long skipped = dataIn.skip(toSkip); - if (skipped == 0) { - throw new IOException("Couldn't initialize input stream"); - } - toSkip -= skipped; - } - if (checksumIn != null) { - long checkSumOffset = (firstChunkOffset / bytesPerChecksum) - * checksumSize; - while (checkSumOffset > 0) { - long skipped = checksumIn.skip(checkSumOffset); + boolean success = false; + try { + // Skip both input streams to beginning of the chunk containing startOffset + long toSkip = firstChunkOffset; + while (toSkip > 0) { + long skipped = dataIn.skip(toSkip); if (skipped == 0) { - throw new IOException("Couldn't initialize checksum input stream"); + throw new IOException("Couldn't initialize input stream"); } - checkSumOffset -= skipped; + toSkip -= skipped; + } + if (checksumIn != null) { + long checkSumOffset = (firstChunkOffset / bytesPerChecksum) * checksumSize; + while (checkSumOffset > 0) { + long skipped = checksumIn.skip(checkSumOffset); + if (skipped == 0) { + throw new IOException("Couldn't initialize checksum input stream"); + } + checkSumOffset -= skipped; + } + } + success = true; + } finally { + if (!success) { + bufferPool.returnBuffer(slowReadBuff); + bufferPool.returnBuffer(checksumBuff); } } } - private int readIntoBuffer(FileInputStream stream, ByteBuffer buf) + /** + * Reads bytes into a buffer until EOF or the buffer's limit is reached + */ + private int fillBuffer(FileInputStream stream, ByteBuffer buf) throws IOException { int bytesRead = stream.getChannel().read(buf); if (bytesRead < 0) { @@ -323,45 +363,229 @@ class BlockReaderLocal implements BlockReader { return bytesRead; } + /** + * Utility method used by read(ByteBuffer) to partially copy a ByteBuffer into + * another. + */ + private void writeSlice(ByteBuffer from, ByteBuffer to, int length) { + int oldLimit = from.limit(); + from.limit(from.position() + length); + try { + to.put(from); + } finally { + from.limit(oldLimit); + } + } + + @Override + public synchronized int read(ByteBuffer buf) throws IOException { + int nRead = 0; + if (verifyChecksum) { + // A 'direct' read actually has three phases. The first drains any + // remaining bytes from the slow read buffer. After this the read is + // guaranteed to be on a checksum chunk boundary. If there are still bytes + // to read, the fast direct path is used for as many remaining bytes as + // possible, up to a multiple of the checksum chunk size. Finally, any + // 'odd' bytes remaining at the end of the read cause another slow read to + // be issued, which involves an extra copy. + + // Every 'slow' read tries to fill the slow read buffer in one go for + // efficiency's sake. As described above, all non-checksum-chunk-aligned + // reads will be served from the slower read path. + + if (slowReadBuff.hasRemaining()) { + // There are remaining bytes from a small read available. This usually + // means this read is unaligned, which falls back to the slow path. + int fromSlowReadBuff = Math.min(buf.remaining(), slowReadBuff.remaining()); + writeSlice(slowReadBuff, buf, fromSlowReadBuff); + nRead += fromSlowReadBuff; + } + + if (buf.remaining() >= bytesPerChecksum && offsetFromChunkBoundary == 0) { + // Since we have drained the 'small read' buffer, we are guaranteed to + // be chunk-aligned + int len = buf.remaining() - (buf.remaining() % bytesPerChecksum); + + // There's only enough checksum buffer space available to checksum one + // entire slow read buffer. 
This saves keeping the number of checksum + // chunks around. + len = Math.min(len, slowReadBuff.capacity()); + int oldlimit = buf.limit(); + buf.limit(buf.position() + len); + int readResult = 0; + try { + readResult = doByteBufferRead(buf); + } finally { + buf.limit(oldlimit); + } + if (readResult == -1) { + return nRead; + } else { + nRead += readResult; + buf.position(buf.position() + readResult); + } + } + + // offsetFromChunkBoundary > 0 => unaligned read, use slow path to read + // until chunk boundary + if ((buf.remaining() > 0 && buf.remaining() < bytesPerChecksum) || offsetFromChunkBoundary > 0) { + int toRead = Math.min(buf.remaining(), bytesPerChecksum - offsetFromChunkBoundary); + int readResult = fillSlowReadBuffer(toRead); + if (readResult == -1) { + return nRead; + } else { + int fromSlowReadBuff = Math.min(readResult, buf.remaining()); + writeSlice(slowReadBuff, buf, fromSlowReadBuff); + nRead += fromSlowReadBuff; + } + } + } else { + // Non-checksummed reads are much easier; we can just fill the buffer directly. + nRead = doByteBufferRead(buf); + if (nRead > 0) { + buf.position(buf.position() + nRead); + } + } + return nRead; + } + + /** + * Tries to read as many bytes as possible into supplied buffer, checksumming + * each chunk if needed. + * + * Preconditions: + *
+   * - If checksumming is enabled, buf.remaining must be a multiple of
+   *   bytesPerChecksum. Note that this is not a requirement for clients of
+   *   read(ByteBuffer) - in the case of non-checksum-sized read requests,
+   *   read(ByteBuffer) will substitute a suitably sized buffer to pass to this
+   *   method.
+   *
+   * Postconditions:
+   * - buf.limit and buf.mark are unchanged.
+   * - buf.position += min(offsetFromChunkBoundary, totalBytesRead) - so the
+   *   requested bytes can be read straight from the buffer
+ * + * @param buf + * byte buffer to write bytes to. If checksums are not required, buf + * can have any number of bytes remaining, otherwise there must be a + * multiple of the checksum chunk size remaining. + * @return max(min(totalBytesRead, len) - offsetFromChunkBoundary, 0) + * that is, the the number of useful bytes (up to the amount + * requested) readable from the buffer by the client. + */ + private synchronized int doByteBufferRead(ByteBuffer buf) throws IOException { + if (verifyChecksum) { + assert buf.remaining() % bytesPerChecksum == 0; + } + int dataRead = -1; + + int oldpos = buf.position(); + // Read as much as we can into the buffer. + dataRead = fillBuffer(dataIn, buf); + + if (dataRead == -1) { + return -1; + } + + if (verifyChecksum) { + ByteBuffer toChecksum = buf.duplicate(); + toChecksum.position(oldpos); + toChecksum.limit(oldpos + dataRead); + + checksumBuff.clear(); + // Equivalent to (int)Math.ceil(toChecksum.remaining() * 1.0 / bytesPerChecksum ); + int numChunks = + (toChecksum.remaining() + bytesPerChecksum - 1) / bytesPerChecksum; + checksumBuff.limit(checksumSize * numChunks); + + fillBuffer(checksumIn, checksumBuff); + checksumBuff.flip(); + + checksum.verifyChunkedSums(toChecksum, checksumBuff, filename, + this.startOffset); + } + + if (dataRead >= 0) { + buf.position(oldpos + Math.min(offsetFromChunkBoundary, dataRead)); + } + + if (dataRead < offsetFromChunkBoundary) { + // yikes, didn't even get enough bytes to honour offset. This can happen + // even if we are verifying checksums if we are at EOF. + offsetFromChunkBoundary -= dataRead; + dataRead = 0; + } else { + dataRead -= offsetFromChunkBoundary; + offsetFromChunkBoundary = 0; + } + + return dataRead; + } + + /** + * Ensures that up to len bytes are available and checksummed in the slow read + * buffer. The number of bytes available to read is returned. If the buffer is + * not already empty, the number of remaining bytes is returned and no actual + * read happens. + * + * @param len + * the maximum number of bytes to make available. After len bytes + * are read, the underlying bytestream must be at a checksum + * boundary, or EOF. That is, (len + currentPosition) % + * bytesPerChecksum == 0. + * @return the number of bytes available to read, or -1 if EOF. + */ + private synchronized int fillSlowReadBuffer(int len) throws IOException { + int nRead = -1; + if (slowReadBuff.hasRemaining()) { + // Already got data, good to go. + nRead = Math.min(len, slowReadBuff.remaining()); + } else { + // Round a complete read of len bytes (plus any implicit offset) to the + // next chunk boundary, since we try and read in multiples of a chunk + int nextChunk = len + offsetFromChunkBoundary + + (bytesPerChecksum - ((len + offsetFromChunkBoundary) % bytesPerChecksum)); + int limit = Math.min(nextChunk, slowReadBuff.capacity()); + assert limit % bytesPerChecksum == 0; + + slowReadBuff.clear(); + slowReadBuff.limit(limit); + + nRead = doByteBufferRead(slowReadBuff); + + if (nRead > 0) { + // So that next time we call slowReadBuff.hasRemaining(), we don't get a + // false positive. 
+ slowReadBuff.limit(nRead + slowReadBuff.position()); + } + } + return nRead; + } + @Override public synchronized int read(byte[] buf, int off, int len) throws IOException { - if (LOG.isDebugEnabled()) { - LOG.info("read off " + off + " len " + len); + if (LOG.isTraceEnabled()) { + LOG.trace("read off " + off + " len " + len); } if (!verifyChecksum) { return dataIn.read(buf, off, len); - } else { - int dataRead = -1; - if (dataBuff.remaining() == 0) { - dataBuff.clear(); - checksumBuff.clear(); - dataRead = readIntoBuffer(dataIn, dataBuff); - readIntoBuffer(checksumIn, checksumBuff); - checksumBuff.flip(); - dataBuff.flip(); - checksum.verifyChunkedSums(dataBuff, checksumBuff, filename, - this.startOffset); - } else { - dataRead = dataBuff.remaining(); - } - if (dataRead > 0) { - int nRead = Math.min(dataRead - offsetFromChunkBoundary, len); - if (offsetFromChunkBoundary > 0) { - dataBuff.position(offsetFromChunkBoundary); - // Its either end of file or dataRead is greater than the - // offsetFromChunkBoundary - offsetFromChunkBoundary = 0; - } - if (nRead > 0) { - dataBuff.get(buf, off, nRead); - return nRead; - } else { - return 0; - } - } else { - return -1; - } } + + int nRead = fillSlowReadBuffer(slowReadBuff.capacity()); + + if (nRead > 0) { + // Possible that buffer is filled with a larger read than we need, since + // we tried to read as much as possible at once + nRead = Math.min(len, nRead); + slowReadBuff.get(buf, off, nRead); + } + + return nRead; } @Override @@ -377,20 +601,20 @@ class BlockReaderLocal implements BlockReader { } // caller made sure newPosition is not beyond EOF. - int remaining = dataBuff.remaining(); - int position = dataBuff.position(); + int remaining = slowReadBuff.remaining(); + int position = slowReadBuff.position(); int newPosition = position + (int)n; // if the new offset is already read into dataBuff, just reposition if (n <= remaining) { assert offsetFromChunkBoundary == 0; - dataBuff.position(newPosition); + slowReadBuff.position(newPosition); return n; } // for small gap, read through to keep the data/checksum in sync if (n - remaining <= bytesPerChecksum) { - dataBuff.position(position + remaining); + slowReadBuff.position(position + remaining); if (skipBuf == null) { skipBuf = new byte[bytesPerChecksum]; } @@ -401,11 +625,16 @@ class BlockReaderLocal implements BlockReader { // optimize for big gap: discard the current buffer, skip to // the beginning of the appropriate checksum chunk and then // read to the middle of that chunk to be in sync with checksums. - this.offsetFromChunkBoundary = newPosition % bytesPerChecksum; - long toskip = n - remaining - this.offsetFromChunkBoundary; - dataBuff.clear(); - checksumBuff.clear(); + // We can't use this.offsetFromChunkBoundary because we need to know how + // many bytes of the offset were really read. Calling read(..) with a + // positive this.offsetFromChunkBoundary causes that many bytes to get + // silently skipped. 
+ int myOffsetFromChunkBoundary = newPosition % bytesPerChecksum; + long toskip = n - remaining - myOffsetFromChunkBoundary; + + slowReadBuff.position(slowReadBuff.limit()); + checksumBuff.position(checksumBuff.limit()); long dataSkipped = dataIn.skip(toskip); if (dataSkipped != toskip) { @@ -424,8 +653,10 @@ class BlockReaderLocal implements BlockReader { skipBuf = new byte[bytesPerChecksum]; } assert skipBuf.length == bytesPerChecksum; - assert this.offsetFromChunkBoundary < bytesPerChecksum; - int ret = read(skipBuf, 0, this.offsetFromChunkBoundary); + assert myOffsetFromChunkBoundary < bytesPerChecksum; + + int ret = read(skipBuf, 0, myOffsetFromChunkBoundary); + if (ret == -1) { // EOS return toskip; } else { @@ -439,9 +670,9 @@ class BlockReaderLocal implements BlockReader { if (checksumIn != null) { checksumIn.close(); } - if (dataBuff != null) { - bufferPool.returnBuffer(dataBuff); - dataBuff = null; + if (slowReadBuff != null) { + bufferPool.returnBuffer(slowReadBuff); + slowReadBuff = null; } if (checksumBuff != null) { bufferPool.returnBuffer(checksumBuff); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java index 4187f1c5c76..db38bdf1224 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java @@ -287,6 +287,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys { public static final boolean DFS_CLIENT_READ_SHORTCIRCUIT_DEFAULT = false; public static final String DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY = "dfs.client.read.shortcircuit.skip.checksum"; public static final boolean DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_DEFAULT = false; + public static final String DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_KEY = "dfs.client.read.shortcircuit.buffer.size"; + public static final int DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_DEFAULT = 1024 * 1024; // property for fsimage compression public static final String DFS_IMAGE_COMPRESS_KEY = "dfs.image.compress"; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java index 71c8a500a83..04089c9c3d5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs; import java.io.IOException; import java.net.InetSocketAddress; import java.net.Socket; +import java.nio.ByteBuffer; import java.util.AbstractMap; import java.util.ArrayList; import java.util.HashMap; @@ -33,6 +34,7 @@ import java.util.concurrent.ConcurrentHashMap; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.fs.ChecksumException; +import org.apache.hadoop.fs.ByteBufferReadable; import org.apache.hadoop.fs.FSInputStream; import org.apache.hadoop.fs.UnresolvedLinkException; import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol; @@ -54,16 +56,16 @@ import org.apache.hadoop.security.token.Token; * negotiation of the namenode and various datanodes as necessary. 
****************************************************************/ @InterfaceAudience.Private -public class DFSInputStream extends FSInputStream { +public class DFSInputStream extends FSInputStream implements ByteBufferReadable { private final SocketCache socketCache; private final DFSClient dfsClient; private boolean closed = false; private final String src; - private long prefetchSize; + private final long prefetchSize; private BlockReader blockReader = null; - private boolean verifyChecksum; + private final boolean verifyChecksum; private LocatedBlocks locatedBlocks = null; private long lastBlockBeingWrittenLength = 0; private DatanodeInfo currentNode = null; @@ -83,17 +85,17 @@ public class DFSInputStream extends FSInputStream { * capped at maxBlockAcquireFailures */ private int failures = 0; - private int timeWindow; + private final int timeWindow; /* XXX Use of CocurrentHashMap is temp fix. Need to fix * parallel accesses to DFSInputStream (through ptreads) properly */ - private ConcurrentHashMap deadNodes = + private final ConcurrentHashMap deadNodes = new ConcurrentHashMap(); private int buffersize = 1; - private byte[] oneByteBuf = new byte[1]; // used for 'int read()' + private final byte[] oneByteBuf = new byte[1]; // used for 'int read()' - private int nCachedConnRetry; + private final int nCachedConnRetry; void addToDeadNodes(DatanodeInfo dnInfo) { deadNodes.put(dnInfo, dnInfo); @@ -466,11 +468,63 @@ public class DFSInputStream extends FSInputStream { return ( ret <= 0 ) ? -1 : (oneByteBuf[0] & 0xff); } + /** + * Wraps different possible read implementations so that readBuffer can be + * strategy-agnostic. + */ + private interface ReaderStrategy { + public int doRead(BlockReader blockReader, int off, int len) throws ChecksumException, IOException; + } + + /** + * Used to read bytes into a byte[] + */ + private static class ByteArrayStrategy implements ReaderStrategy { + final byte[] buf; + + public ByteArrayStrategy(byte[] buf) { + this.buf = buf; + } + + @Override + public int doRead(BlockReader blockReader, int off, int len) throws ChecksumException, IOException { + return blockReader.read(buf, off, len); + } + } + + /** + * Used to read bytes into a user-supplied ByteBuffer + */ + private static class ByteBufferStrategy implements ReaderStrategy { + final ByteBuffer buf; + ByteBufferStrategy(ByteBuffer buf) { + this.buf = buf; + } + + @Override + public int doRead(BlockReader blockReader, int off, int len) throws ChecksumException, IOException { + int oldpos = buf.position(); + int oldlimit = buf.limit(); + boolean success = false; + try { + int ret = blockReader.read(buf); + success = true; + return ret; + } finally { + if (!success) { + // Reset to original state so that retries work correctly. + buf.position(oldpos); + buf.limit(oldlimit); + } + } + } + } + /* This is a used by regular read() and handles ChecksumExceptions. * name readBuffer() is chosen to imply similarity to readBuffer() in * ChecksumFileSystem */ - private synchronized int readBuffer(byte buf[], int off, int len, + private synchronized int readBuffer(ReaderStrategy reader, int off, int len, Map> corruptedBlockMap) throws IOException { IOException ioe; @@ -486,7 +540,7 @@ public class DFSInputStream extends FSInputStream { while (true) { // retry as many times as seekToNewSource allows. 
try { - return blockReader.read(buf, off, len); + return reader.doRead(blockReader, off, len); } catch ( ChecksumException ce ) { DFSClient.LOG.warn("Found Checksum error for " + getCurrentBlock() + " from " + currentNode.getName() @@ -522,11 +576,7 @@ public class DFSInputStream extends FSInputStream { } } - /** - * Read the entire buffer. - */ - @Override - public synchronized int read(byte buf[], int off, int len) throws IOException { + private int readWithStrategy(ReaderStrategy strategy, int off, int len) throws IOException { dfsClient.checkOpen(); if (closed) { throw new IOException("Stream closed"); @@ -544,7 +594,7 @@ public class DFSInputStream extends FSInputStream { currentNode = blockSeekTo(pos); } int realLen = (int) Math.min(len, (blockEnd - pos + 1L)); - int result = readBuffer(buf, off, realLen, corruptedBlockMap); + int result = readBuffer(strategy, off, realLen, corruptedBlockMap); if (result >= 0) { pos += result; @@ -578,6 +628,24 @@ public class DFSInputStream extends FSInputStream { return -1; } + /** + * Read the entire buffer. + */ + @Override + public synchronized int read(final byte buf[], int off, int len) throws IOException { + ReaderStrategy byteArrayReader = new ByteArrayStrategy(buf); + + return readWithStrategy(byteArrayReader, off, len); + } + + @Override + public synchronized int read(final ByteBuffer buf) throws IOException { + ReaderStrategy byteBufferReader = new ByteBufferStrategy(buf); + + return readWithStrategy(byteBufferReader, 0, buf.remaining()); + } + + /** * Add corrupted block replica into map. * @param corruptedBlockMap @@ -1052,5 +1120,4 @@ public class DFSInputStream extends FSInputStream { this.addr = addr; } } - } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java index b7da8d4d8f5..c5c0e295b54 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java @@ -56,7 +56,7 @@ import org.apache.hadoop.util.DataChecksum; public class RemoteBlockReader extends FSInputChecker implements BlockReader { Socket dnSock; //for now just sending the status code (e.g. checksumOk) after the read. - private DataInputStream in; + private final DataInputStream in; private DataChecksum checksum; /** offset in block of the last chunk received */ @@ -71,8 +71,8 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader { if startOffset is not chunk-aligned */ private final long firstChunkOffset; - private int bytesPerChecksum; - private int checksumSize; + private final int bytesPerChecksum; + private final int checksumSize; /** * The total number of bytes we need to transfer from the DN. 
@@ -479,4 +479,9 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader { return s.toString() + ":" + poolId + ":" + blockId; } + @Override + public int read(ByteBuffer buf) throws IOException { + throw new UnsupportedOperationException("readDirect unsupported in RemoteBlockReader"); + } + } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java index ea247775714..0713de8ca8f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java @@ -84,7 +84,7 @@ public class RemoteBlockReader2 implements BlockReader { static final Log LOG = LogFactory.getLog(RemoteBlockReader2.class); Socket dnSock; //for now just sending the status code (e.g. checksumOk) after the read. - private ReadableByteChannel in; + private final ReadableByteChannel in; private DataChecksum checksum; private PacketHeader curHeader; @@ -100,11 +100,11 @@ public class RemoteBlockReader2 implements BlockReader { private final String filename; private static DirectBufferPool bufferPool = new DirectBufferPool(); - private ByteBuffer headerBuf = ByteBuffer.allocate( + private final ByteBuffer headerBuf = ByteBuffer.allocate( PacketHeader.PKT_HEADER_LEN); - private int bytesPerChecksum; - private int checksumSize; + private final int bytesPerChecksum; + private final int checksumSize; /** * The total number of bytes we need to transfer from the DN. @@ -140,6 +140,26 @@ public class RemoteBlockReader2 implements BlockReader { return nRead; } + + @Override + public int read(ByteBuffer buf) throws IOException { + if (curPacketBuf == null || curDataSlice.remaining() == 0 && bytesNeededToFinish > 0) { + readNextPacket(); + } + if (curDataSlice.remaining() == 0) { + // we're at EOF now + return -1; + } + + int nRead = Math.min(curDataSlice.remaining(), buf.remaining()); + ByteBuffer writeSlice = curDataSlice.duplicate(); + writeSlice.limit(writeSlice.position() + nRead); + buf.put(writeSlice); + curDataSlice.position(writeSlice.position()); + + return nRead; + } + private void readNextPacket() throws IOException { Preconditions.checkState(curHeader == null || !curHeader.isLastPacketInBlock()); @@ -325,6 +345,7 @@ public class RemoteBlockReader2 implements BlockReader { /** * Take the socket used to talk to the DN. */ + @Override public Socket takeSocket() { assert hasSentStatusCode() : "BlockReader shouldn't give back sockets mid-read"; @@ -337,6 +358,7 @@ public class RemoteBlockReader2 implements BlockReader { * Whether the BlockReader has reached the end of its input stream * and successfully sent a status code back to the datanode. 
*/ + @Override public boolean hasSentStatusCode() { return sentStatusCode; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java index 9d4f4a2e197..a920865f42f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java @@ -49,7 +49,11 @@ public class BlockReaderTestUtil { * Setup the cluster */ public BlockReaderTestUtil(int replicationFactor) throws Exception { - conf = new HdfsConfiguration(); + this(replicationFactor, new HdfsConfiguration()); + } + + public BlockReaderTestUtil(int replicationFactor, HdfsConfiguration config) throws Exception { + this.conf = config; conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, replicationFactor); cluster = new MiniDFSCluster.Builder(conf).format(true).build(); cluster.waitActive(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocal.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocal.java new file mode 100644 index 00000000000..ea1b58c7afc --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocal.java @@ -0,0 +1,106 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hdfs; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.nio.ByteBuffer; + +import org.apache.hadoop.fs.ChecksumException; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.security.UserGroupInformation; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +public class TestBlockReaderLocal { + static MiniDFSCluster cluster; + static HdfsConfiguration conf; + + @BeforeClass + public static void setupCluster() throws IOException { + conf = new HdfsConfiguration(); + + conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true); + conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY, + false); + conf.set(DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY, + UserGroupInformation.getCurrentUser().getShortUserName()); + + cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build(); + } + + @AfterClass + public static void teardownCluster() { + cluster.shutdown(); + } + + /** + * Test that, in the case of an error, the position and limit of a ByteBuffer + * are left unchanged. This is not mandated by ByteBufferReadable, but clients + * of this class might immediately issue a retry on failure, so it's polite. + */ + @Test + public void testStablePositionAfterCorruptRead() throws IOException { + final short REPL_FACTOR = 1; + final long FILE_LENGTH = 512L; + cluster.waitActive(); + FileSystem fs = cluster.getFileSystem(); + + Path path = new Path("/corrupted"); + + DFSTestUtil.createFile(fs, path, FILE_LENGTH, REPL_FACTOR, 12345L); + DFSTestUtil.waitReplication(fs, path, REPL_FACTOR); + + ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, path); + int blockFilesCorrupted = cluster.corruptBlockOnDataNodes(block); + assertEquals("All replicas not corrupted", REPL_FACTOR, blockFilesCorrupted); + + FSDataInputStream dis = cluster.getFileSystem().open(path); + ByteBuffer buf = ByteBuffer.allocateDirect((int)FILE_LENGTH); + boolean sawException = false; + try { + dis.read(buf); + } catch (ChecksumException ex) { + sawException = true; + } + + assertTrue(sawException); + assertEquals(0, buf.position()); + assertEquals(buf.capacity(), buf.limit()); + + dis = cluster.getFileSystem().open(path); + buf.position(3); + buf.limit(25); + sawException = false; + try { + dis.read(buf); + } catch (ChecksumException ex) { + sawException = true; + } + + assertTrue(sawException); + assertEquals(3, buf.position()); + assertEquals(25, buf.limit()); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelRead.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelRead.java index bbd001236f2..b4320520354 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelRead.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelRead.java @@ -18,177 +18,21 @@ package org.apache.hadoop.hdfs; import java.io.IOException; -import java.util.Random; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; - -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hdfs.server.datanode.DataNode; -import org.apache.log4j.Level; -import org.apache.log4j.LogManager; - -import org.junit.Test; import 
org.junit.AfterClass; import org.junit.BeforeClass; -import static org.junit.Assert.*; +import org.junit.Test; -/** - * Test the use of DFSInputStream by multiple concurrent readers. - */ -public class TestParallelRead { - - static final Log LOG = LogFactory.getLog(TestParallelRead.class); - static BlockReaderTestUtil util = null; - static DFSClient dfsClient = null; - static final int FILE_SIZE_K = 256; - static Random rand = null; - - static { - // The client-trace log ends up causing a lot of blocking threads - // in this when it's being used as a performance benchmark. - LogManager.getLogger(DataNode.class.getName() + ".clienttrace") - .setLevel(Level.WARN); - } - - private class TestFileInfo { - public DFSInputStream dis; - public Path filepath; - public byte[] authenticData; - } +public class TestParallelRead extends TestParallelReadUtil { @BeforeClass - public static void setupCluster() throws Exception { - final int REPLICATION_FACTOR = 2; - util = new BlockReaderTestUtil(REPLICATION_FACTOR); - dfsClient = util.getDFSClient(); - rand = new Random(System.currentTimeMillis()); + static public void setupCluster() throws Exception { + setupCluster(DEFAULT_REPLICATION_FACTOR, new HdfsConfiguration()); } - /** - * A worker to do one "unit" of read. - */ - static class ReadWorker extends Thread { - static public final int N_ITERATIONS = 1024; - - private static final double PROPORTION_NON_POSITIONAL_READ = 0.10; - - private TestFileInfo testInfo; - private long fileSize; - private long bytesRead; - private boolean error; - - ReadWorker(TestFileInfo testInfo, int id) { - super("ReadWorker-" + id + "-" + testInfo.filepath.toString()); - this.testInfo = testInfo; - fileSize = testInfo.dis.getFileLength(); - assertEquals(fileSize, testInfo.authenticData.length); - bytesRead = 0; - error = false; - } - - /** - * Randomly do one of (1) Small read; and (2) Large Pread. - */ - @Override - public void run() { - for (int i = 0; i < N_ITERATIONS; ++i) { - int startOff = rand.nextInt((int) fileSize); - int len = 0; - try { - double p = rand.nextDouble(); - if (p < PROPORTION_NON_POSITIONAL_READ) { - // Do a small regular read. Very likely this will leave unread - // data on the socket and make the socket uncacheable. - len = Math.min(rand.nextInt(64), (int) fileSize - startOff); - read(startOff, len); - bytesRead += len; - } else { - // Do a positional read most of the time. - len = rand.nextInt((int) (fileSize - startOff)); - pRead(startOff, len); - bytesRead += len; - } - } catch (Throwable t) { - LOG.error(getName() + ": Error while testing read at " + startOff + - " length " + len); - error = true; - fail(t.getMessage()); - } - } - } - - public long getBytesRead() { - return bytesRead; - } - - /** - * Raising error in a thread doesn't seem to fail the test. - * So check afterwards. - */ - public boolean hasError() { - return error; - } - - /** - * Seek to somewhere random and read. - */ - private void read(int start, int len) throws Exception { - assertTrue( - "Bad args: " + start + " + " + len + " should be <= " + fileSize, - start + len <= fileSize); - DFSInputStream dis = testInfo.dis; - - synchronized (dis) { - dis.seek(start); - - byte buf[] = new byte[len]; - int cnt = 0; - while (cnt < len) { - cnt += dis.read(buf, cnt, buf.length - cnt); - } - verifyData("Read data corrupted", buf, start, start + len); - } - } - - /** - * Positional read. 
- */ - private void pRead(int start, int len) throws Exception { - assertTrue( - "Bad args: " + start + " + " + len + " should be <= " + fileSize, - start + len <= fileSize); - DFSInputStream dis = testInfo.dis; - - byte buf[] = new byte[len]; - int cnt = 0; - while (cnt < len) { - cnt += dis.read(start, buf, cnt, buf.length - cnt); - } - verifyData("Pread data corrupted", buf, start, start + len); - } - - /** - * Verify read data vs authentic data - */ - private void verifyData(String msg, byte actual[], int start, int end) - throws Exception { - byte auth[] = testInfo.authenticData; - if (end > auth.length) { - throw new Exception(msg + ": Actual array (" + end + - ") is past the end of authentic data (" + - auth.length + ")"); - } - - int j = start; - for (int i = 0; i < actual.length; ++i, ++j) { - if (auth[j] != actual[i]) { - throw new Exception(msg + ": Arrays byte " + i + " (at offset " + - j + ") differs: expect " + - auth[j] + " got " + actual[i]); - } - } - } + @AfterClass + static public void teardownCluster() throws Exception { + TestParallelReadUtil.teardownCluster(); } /** @@ -199,85 +43,17 @@ public class TestParallelRead { * need to be manually collected, which is inconvenient. */ @Test - public void testParallelRead() throws IOException { - if (!runParallelRead(1, 4)) { - fail("Check log for errors"); - } - if (!runParallelRead(1, 16)) { - fail("Check log for errors"); - } - if (!runParallelRead(2, 4)) { - fail("Check log for errors"); - } + public void testParallelReadCopying() throws IOException { + runTestWorkload(new CopyingReadWorkerHelper()); } - /** - * Start the parallel read with the given parameters. - */ - boolean runParallelRead(int nFiles, int nWorkerEach) throws IOException { - ReadWorker workers[] = new ReadWorker[nFiles * nWorkerEach]; - TestFileInfo testInfoArr[] = new TestFileInfo[nFiles]; - - // Prepare the files and workers - int nWorkers = 0; - for (int i = 0; i < nFiles; ++i) { - TestFileInfo testInfo = new TestFileInfo(); - testInfoArr[i] = testInfo; - - testInfo.filepath = new Path("/TestParallelRead.dat." 
+ i); - testInfo.authenticData = util.writeFile(testInfo.filepath, FILE_SIZE_K); - testInfo.dis = dfsClient.open(testInfo.filepath.toString()); - - for (int j = 0; j < nWorkerEach; ++j) { - workers[nWorkers++] = new ReadWorker(testInfo, nWorkers); - } - } - - // Start the workers and wait - long starttime = System.currentTimeMillis(); - for (ReadWorker worker : workers) { - worker.start(); - } - - for (ReadWorker worker : workers) { - try { - worker.join(); - } catch (InterruptedException ignored) { } - } - long endtime = System.currentTimeMillis(); - - // Cleanup - for (TestFileInfo testInfo : testInfoArr) { - testInfo.dis.close(); - } - - // Report - boolean res = true; - long totalRead = 0; - for (ReadWorker worker : workers) { - long nread = worker.getBytesRead(); - LOG.info("--- Report: " + worker.getName() + " read " + nread + " B; " + - "average " + nread / ReadWorker.N_ITERATIONS + " B per read"); - totalRead += nread; - if (worker.hasError()) { - res = false; - } - } - - double timeTakenSec = (endtime - starttime) / 1000.0; - long totalReadKB = totalRead / 1024; - LOG.info("=== Report: " + nWorkers + " threads read " + - totalReadKB + " KB (across " + - nFiles + " file(s)) in " + - timeTakenSec + "s; average " + - totalReadKB / timeTakenSec + " KB/s"); - - return res; + @Test + public void testParallelReadByteBuffer() throws IOException { + runTestWorkload(new DirectReadWorkerHelper()); } - @AfterClass - public static void teardownCluster() throws Exception { - util.shutdown(); + @Test + public void testParallelReadMixed() throws IOException { + runTestWorkload(new MixedWorkloadHelper()); } - } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelReadUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelReadUtil.java new file mode 100644 index 00000000000..0a0fc67343a --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestParallelReadUtil.java @@ -0,0 +1,385 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Random; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.server.datanode.DataNode; +import org.apache.log4j.Level; +import org.apache.log4j.LogManager; + +import static org.junit.Assert.*; + +/** + * Driver class for testing the use of DFSInputStream by multiple concurrent + * readers, using the different read APIs. See subclasses for the actual test + * cases. 
+ */ +public class TestParallelReadUtil { + + static final Log LOG = LogFactory.getLog(TestParallelReadUtil.class); + static BlockReaderTestUtil util = null; + static DFSClient dfsClient = null; + static final int FILE_SIZE_K = 256; + static Random rand = null; + static final int DEFAULT_REPLICATION_FACTOR = 2; + + static { + // The client-trace log ends up causing a lot of blocking threads + // in this when it's being used as a performance benchmark. + LogManager.getLogger(DataNode.class.getName() + ".clienttrace") + .setLevel(Level.WARN); + } + + private class TestFileInfo { + public DFSInputStream dis; + public Path filepath; + public byte[] authenticData; + } + + public static void setupCluster(int replicationFactor, HdfsConfiguration conf) throws Exception { + util = new BlockReaderTestUtil(replicationFactor, conf); + dfsClient = util.getDFSClient(); + long seed = System.currentTimeMillis(); + LOG.info("Random seed: " + seed); + rand = new Random(seed); + } + + /** + * Providers of this interface implement two different read APIs. Instances of + * this interface are shared across all ReadWorkerThreads, so should be stateless. + */ + static interface ReadWorkerHelper { + public int read(DFSInputStream dis, byte[] target, int startOff, int len) throws IOException; + public int pRead(DFSInputStream dis, byte[] target, int startOff, int len) throws IOException; + } + + /** + * Uses read(ByteBuffer...) style APIs + */ + static class DirectReadWorkerHelper implements ReadWorkerHelper { + @Override + public int read(DFSInputStream dis, byte[] target, int startOff, int len) throws IOException { + ByteBuffer bb = ByteBuffer.wrap(target); + int cnt = 0; + synchronized(dis) { + dis.seek(startOff); + while (cnt < len) { + int read = dis.read(bb); + if (read == -1) { + return read; + } + cnt += read; + } + } + return cnt; + } + + @Override + public int pRead(DFSInputStream dis, byte[] target, int startOff, int len) throws IOException { + // No pRead for bb read path + return read(dis, target, startOff, len); + } + + } + + /** + * Uses the read(byte[]...) 
style APIs + */ + static class CopyingReadWorkerHelper implements ReadWorkerHelper { + + @Override + public int read(DFSInputStream dis, byte[] target, int startOff, int len) + throws IOException { + int cnt = 0; + synchronized(dis) { + dis.seek(startOff); + while (cnt < len) { + int read = dis.read(target, cnt, len - cnt); + if (read == -1) { + return read; + } + cnt += read; + } + } + return cnt; + } + + @Override + public int pRead(DFSInputStream dis, byte[] target, int startOff, int len) + throws IOException { + int cnt = 0; + while (cnt < len) { + int read = dis.read(startOff, target, cnt, len - cnt); + if (read == -1) { + return read; + } + cnt += read; + } + return cnt; + } + + } + + /** + * Uses a mix of both copying + */ + static class MixedWorkloadHelper implements ReadWorkerHelper { + private final DirectReadWorkerHelper bb = new DirectReadWorkerHelper(); + private final CopyingReadWorkerHelper copy = new CopyingReadWorkerHelper(); + private final double COPYING_PROBABILITY = 0.5; + + @Override + public int read(DFSInputStream dis, byte[] target, int startOff, int len) + throws IOException { + double p = rand.nextDouble(); + if (p > COPYING_PROBABILITY) { + return bb.read(dis, target, startOff, len); + } else { + return copy.read(dis, target, startOff, len); + } + } + + @Override + public int pRead(DFSInputStream dis, byte[] target, int startOff, int len) + throws IOException { + double p = rand.nextDouble(); + if (p > COPYING_PROBABILITY) { + return bb.pRead(dis, target, startOff, len); + } else { + return copy.pRead(dis, target, startOff, len); + } + } + + } + + /** + * A worker to do one "unit" of read. + */ + static class ReadWorker extends Thread { + + static public final int N_ITERATIONS = 1024 * 4; + + private static final double PROPORTION_NON_POSITIONAL_READ = 0.10; + + private final TestFileInfo testInfo; + private final long fileSize; + private long bytesRead; + private boolean error; + private final ReadWorkerHelper helper; + + ReadWorker(TestFileInfo testInfo, int id, ReadWorkerHelper helper) { + super("ReadWorker-" + id + "-" + testInfo.filepath.toString()); + this.testInfo = testInfo; + this.helper = helper; + fileSize = testInfo.dis.getFileLength(); + assertEquals(fileSize, testInfo.authenticData.length); + bytesRead = 0; + error = false; + } + + /** + * Randomly do one of (1) Small read; and (2) Large Pread. + */ + @Override + public void run() { + for (int i = 0; i < N_ITERATIONS; ++i) { + int startOff = rand.nextInt((int) fileSize); + int len = 0; + try { + double p = rand.nextDouble(); + if (p < PROPORTION_NON_POSITIONAL_READ) { + // Do a small regular read. Very likely this will leave unread + // data on the socket and make the socket uncacheable. + len = Math.min(rand.nextInt(64), (int) fileSize - startOff); + read(startOff, len); + bytesRead += len; + } else { + // Do a positional read most of the time. + len = rand.nextInt((int) (fileSize - startOff)); + pRead(startOff, len); + bytesRead += len; + } + } catch (Throwable t) { + LOG.error(getName() + ": Error while testing read at " + startOff + + " length " + len, t); + error = true; + fail(t.getMessage()); + } + } + } + + public long getBytesRead() { + return bytesRead; + } + + /** + * Raising error in a thread doesn't seem to fail the test. + * So check afterwards. + */ + public boolean hasError() { + return error; + } + + static int readCount = 0; + + /** + * Seek to somewhere random and read. 
+ */ + private void read(int start, int len) throws Exception { + assertTrue( + "Bad args: " + start + " + " + len + " should be <= " + fileSize, + start + len <= fileSize); + readCount++; + DFSInputStream dis = testInfo.dis; + + byte buf[] = new byte[len]; + helper.read(dis, buf, start, len); + + verifyData("Read data corrupted", buf, start, start + len); + } + + /** + * Positional read. + */ + private void pRead(int start, int len) throws Exception { + assertTrue( + "Bad args: " + start + " + " + len + " should be <= " + fileSize, + start + len <= fileSize); + DFSInputStream dis = testInfo.dis; + + byte buf[] = new byte[len]; + helper.pRead(dis, buf, start, len); + + verifyData("Pread data corrupted", buf, start, start + len); + } + + /** + * Verify read data vs authentic data + */ + private void verifyData(String msg, byte actual[], int start, int end) + throws Exception { + byte auth[] = testInfo.authenticData; + if (end > auth.length) { + throw new Exception(msg + ": Actual array (" + end + + ") is past the end of authentic data (" + + auth.length + ")"); + } + + int j = start; + for (int i = 0; i < actual.length; ++i, ++j) { + if (auth[j] != actual[i]) { + throw new Exception(msg + ": Arrays byte " + i + " (at offset " + + j + ") differs: expect " + + auth[j] + " got " + actual[i]); + } + } + } + } + + /** + * Start the parallel read with the given parameters. + */ + boolean runParallelRead(int nFiles, int nWorkerEach, ReadWorkerHelper helper) throws IOException { + ReadWorker workers[] = new ReadWorker[nFiles * nWorkerEach]; + TestFileInfo testInfoArr[] = new TestFileInfo[nFiles]; + + // Prepare the files and workers + int nWorkers = 0; + for (int i = 0; i < nFiles; ++i) { + TestFileInfo testInfo = new TestFileInfo(); + testInfoArr[i] = testInfo; + + testInfo.filepath = new Path("/TestParallelRead.dat." + i); + testInfo.authenticData = util.writeFile(testInfo.filepath, FILE_SIZE_K); + testInfo.dis = dfsClient.open(testInfo.filepath.toString()); + + for (int j = 0; j < nWorkerEach; ++j) { + workers[nWorkers++] = new ReadWorker(testInfo, nWorkers, helper); + } + } + + // Start the workers and wait + long starttime = System.currentTimeMillis(); + for (ReadWorker worker : workers) { + worker.start(); + } + + for (ReadWorker worker : workers) { + try { + worker.join(); + } catch (InterruptedException ignored) { } + } + long endtime = System.currentTimeMillis(); + + // Cleanup + for (TestFileInfo testInfo : testInfoArr) { + testInfo.dis.close(); + } + + // Report + boolean res = true; + long totalRead = 0; + for (ReadWorker worker : workers) { + long nread = worker.getBytesRead(); + LOG.info("--- Report: " + worker.getName() + " read " + nread + " B; " + + "average " + nread / ReadWorker.N_ITERATIONS + " B per read"); + totalRead += nread; + if (worker.hasError()) { + res = false; + } + } + + double timeTakenSec = (endtime - starttime) / 1000.0; + long totalReadKB = totalRead / 1024; + LOG.info("=== Report: " + nWorkers + " threads read " + + totalReadKB + " KB (across " + + nFiles + " file(s)) in " + + timeTakenSec + "s; average " + + totalReadKB / timeTakenSec + " KB/s"); + + return res; + } + + /** + * Runs a standard workload using a helper class which provides the read + * implementation to use. 
+ */ + public void runTestWorkload(ReadWorkerHelper helper) throws IOException { + if (!runParallelRead(1, 4, helper)) { + fail("Check log for errors"); + } + if (!runParallelRead(1, 16, helper)) { + fail("Check log for errors"); + } + if (!runParallelRead(2, 4, helper)) { + fail("Check log for errors"); + } + } + + public static void teardownCluster() throws Exception { + util.shutdown(); + } + +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestShortCircuitLocalRead.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestShortCircuitLocalRead.java index f4052bb148b..ccac0e8c44b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestShortCircuitLocalRead.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestShortCircuitLocalRead.java @@ -21,6 +21,7 @@ import static org.junit.Assert.assertTrue; import java.io.EOFException; import java.io.IOException; +import java.nio.ByteBuffer; import java.security.PrivilegedExceptionAction; import org.apache.hadoop.conf.Configuration; @@ -28,6 +29,7 @@ import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.DFSClient.DFSDataInputStream; import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo; import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; @@ -63,7 +65,7 @@ public class TestShortCircuitLocalRead { throws IOException { FSDataOutputStream stm = fileSys.create(name, true, fileSys.getConf().getInt("io.file.buffer.size", 4096), - (short)repl, (long)blockSize); + (short)repl, blockSize); return stm; } @@ -112,6 +114,43 @@ public class TestShortCircuitLocalRead { stm.close(); } + /** + * Verifies that reading a file with the direct read(ByteBuffer) api gives the expected set of bytes. + */ + static void checkFileContentDirect(FileSystem fs, Path name, byte[] expected, + int readOffset) throws IOException { + DFSDataInputStream stm = (DFSDataInputStream)fs.open(name); + + ByteBuffer actual = ByteBuffer.allocate(expected.length - readOffset); + + long skipped = stm.skip(readOffset); + Assert.assertEquals(skipped, readOffset); + + actual.limit(3); + + //Read a small number of bytes first. + int nread = stm.read(actual); + actual.limit(nread + 2); + nread += stm.read(actual); + + // Read across chunk boundary + actual.limit(Math.min(actual.capacity(), nread + 517)); + nread += stm.read(actual); + checkData(actual.array(), readOffset, expected, nread, "A few bytes"); + //Now read rest of it + actual.limit(actual.capacity()); + while (actual.hasRemaining()) { + int nbytes = stm.read(actual); + + if (nbytes < 0) { + throw new EOFException("End of file reached before reading fully."); + } + nread += nbytes; + } + checkData(actual.array(), readOffset, expected, "Read 3"); + stm.close(); + } + /** * Test that file data can be read by reading the block file * directly from the local store. 
@@ -145,6 +184,7 @@ public class TestShortCircuitLocalRead { stm.write(fileData); stm.close(); checkFileContent(fs, file1, fileData, readOffset); + checkFileContentDirect(fs, file1, fileData, readOffset); } finally { fs.close(); cluster.shutdown(); @@ -328,6 +368,7 @@ public class TestShortCircuitLocalRead { Thread[] threads = new Thread[threadCount]; for (int i = 0; i < threadCount; i++) { threads[i] = new Thread() { + @Override public void run() { for (int i = 0; i < iteration; i++) { try {
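For reference, a minimal sketch of how a client might exercise the ByteBuffer read path this patch adds. It is not part of the patch: the file path and class name are hypothetical, it assumes fs.defaultFS points at an HDFS cluster, and the two configuration keys shown are the ones read by BlockReaderLocal in this change (dfs.client.read.shortcircuit and the new dfs.client.read.shortcircuit.buffer.size).

import java.nio.ByteBuffer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class ByteBufferReadExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Optional: enable short-circuit local reads; the buffer-size key is
    // introduced by this patch and defaults to 1 MB.
    conf.setBoolean("dfs.client.read.shortcircuit", true);
    conf.setInt("dfs.client.read.shortcircuit.buffer.size", 1024 * 1024);

    FileSystem fs = FileSystem.get(conf);        // assumes an HDFS default filesystem
    Path path = new Path("/tmp/example.dat");    // hypothetical file

    ByteBuffer buf = ByteBuffer.allocateDirect(64 * 1024);
    FSDataInputStream in = fs.open(path);
    try {
      // DFSInputStream now implements ByteBufferReadable, so FSDataInputStream
      // fills the caller's buffer directly; -1 signals EOF.
      while (in.read(buf) > 0) {
        buf.flip();
        // ... consume buf ...
        buf.clear();
      }
    } finally {
      in.close();
      fs.close();
    }
  }
}

With short-circuit reads enabled and checksums on, reads that are aligned to the checksum chunk size go straight into the supplied buffer; unaligned portions are staged through the slow read buffer sized by the key above.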