HDFS-2834. Add a ByteBuffer-based read API to DFSInputStream. Contributed by Henry Robinson.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1303474 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
7b3a98cfcd
commit
f55a1c0876
|
@ -57,6 +57,9 @@ Trunk (unreleased changes)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
|
HDFS-2834. Add a ByteBuffer-based read API to DFSInputStream.
|
||||||
|
(Henry Robinson via todd)
|
||||||
|
|
||||||
BUG FIXES
|
BUG FIXES
|
||||||
|
|
||||||
HDFS-2299. TestOfflineEditsViewer is failing on trunk. (Uma Maheswara Rao G
|
HDFS-2299. TestOfflineEditsViewer is failing on trunk. (Uma Maheswara Rao G
|
||||||
|
|
|
@ -20,11 +20,13 @@ package org.apache.hadoop.hdfs;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.Socket;
|
import java.net.Socket;
|
||||||
|
|
||||||
|
import org.apache.hadoop.fs.ByteBufferReadable;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A BlockReader is responsible for reading a single block
|
* A BlockReader is responsible for reading a single block
|
||||||
* from a single datanode.
|
* from a single datanode.
|
||||||
*/
|
*/
|
||||||
public interface BlockReader {
|
public interface BlockReader extends ByteBufferReadable {
|
||||||
|
|
||||||
/* same interface as inputStream java.io.InputStream#read()
|
/* same interface as inputStream java.io.InputStream#read()
|
||||||
* used by DFSInputStream#read()
|
* used by DFSInputStream#read()
|
||||||
|
|
|
@ -118,20 +118,32 @@ class BlockReaderLocal implements BlockReader {
|
||||||
private static Map<Integer, LocalDatanodeInfo> localDatanodeInfoMap = new HashMap<Integer, LocalDatanodeInfo>();
|
private static Map<Integer, LocalDatanodeInfo> localDatanodeInfoMap = new HashMap<Integer, LocalDatanodeInfo>();
|
||||||
|
|
||||||
private final FileInputStream dataIn; // reader for the data file
|
private final FileInputStream dataIn; // reader for the data file
|
||||||
private FileInputStream checksumIn; // reader for the checksum file
|
private final FileInputStream checksumIn; // reader for the checksum file
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Offset from the most recent chunk boundary at which the next read should
|
||||||
|
* take place. Is only set to non-zero at construction time, and is
|
||||||
|
* decremented (usually to 0) by subsequent reads. This avoids having to do a
|
||||||
|
* checksum read at construction to position the read cursor correctly.
|
||||||
|
*/
|
||||||
private int offsetFromChunkBoundary;
|
private int offsetFromChunkBoundary;
|
||||||
|
|
||||||
private byte[] skipBuf = null;
|
private byte[] skipBuf = null;
|
||||||
private ByteBuffer dataBuff = null;
|
|
||||||
|
/**
|
||||||
|
* Used for checksummed reads that need to be staged before copying to their
|
||||||
|
* output buffer because they are either a) smaller than the checksum chunk
|
||||||
|
* size or b) issued by the slower read(byte[]...) path
|
||||||
|
*/
|
||||||
|
private ByteBuffer slowReadBuff = null;
|
||||||
private ByteBuffer checksumBuff = null;
|
private ByteBuffer checksumBuff = null;
|
||||||
private DataChecksum checksum;
|
private DataChecksum checksum;
|
||||||
private final boolean verifyChecksum;
|
private final boolean verifyChecksum;
|
||||||
|
|
||||||
private static DirectBufferPool bufferPool = new DirectBufferPool();
|
private static DirectBufferPool bufferPool = new DirectBufferPool();
|
||||||
|
|
||||||
private int bytesPerChecksum;
|
private final int bytesPerChecksum;
|
||||||
private int checksumSize;
|
private final int checksumSize;
|
||||||
|
|
||||||
/** offset in block where reader wants to actually read */
|
/** offset in block where reader wants to actually read */
|
||||||
private long startOffset;
|
private long startOffset;
|
||||||
|
@ -170,7 +182,7 @@ class BlockReaderLocal implements BlockReader {
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
LOG.debug("New BlockReaderLocal for file " + blkfile + " of size "
|
LOG.debug("New BlockReaderLocal for file " + blkfile + " of size "
|
||||||
+ blkfile.length() + " startOffset " + startOffset + " length "
|
+ blkfile.length() + " startOffset " + startOffset + " length "
|
||||||
+ length + " short circuit checksum " + skipChecksumCheck);
|
+ length + " short circuit checksum " + !skipChecksumCheck);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!skipChecksumCheck) {
|
if (!skipChecksumCheck) {
|
||||||
|
@ -254,6 +266,20 @@ class BlockReaderLocal implements BlockReader {
|
||||||
DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_DEFAULT);
|
DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_DEFAULT);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static int getSlowReadBufferNumChunks(Configuration conf, int bytesPerChecksum) {
|
||||||
|
int bufferSizeBytes = conf.getInt(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_KEY,
|
||||||
|
DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_DEFAULT);
|
||||||
|
|
||||||
|
if (bufferSizeBytes < bytesPerChecksum) {
|
||||||
|
throw new IllegalArgumentException("Configured BlockReaderLocal buffer size (" + bufferSizeBytes + ") " +
|
||||||
|
"is not large enough to hold a single chunk (" + bytesPerChecksum + "). Please configure " +
|
||||||
|
DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_KEY + " appropriately");
|
||||||
|
}
|
||||||
|
|
||||||
|
// Round down to nearest chunk size
|
||||||
|
return bufferSizeBytes / bytesPerChecksum;
|
||||||
|
}
|
||||||
|
|
||||||
private BlockReaderLocal(Configuration conf, String hdfsfile,
|
private BlockReaderLocal(Configuration conf, String hdfsfile,
|
||||||
ExtendedBlock block, Token<BlockTokenIdentifier> token, long startOffset,
|
ExtendedBlock block, Token<BlockTokenIdentifier> token, long startOffset,
|
||||||
long length, BlockLocalPathInfo pathinfo, FileInputStream dataIn)
|
long length, BlockLocalPathInfo pathinfo, FileInputStream dataIn)
|
||||||
|
@ -279,11 +305,16 @@ class BlockReaderLocal implements BlockReader {
|
||||||
this.dataIn = dataIn;
|
this.dataIn = dataIn;
|
||||||
this.checksumIn = checksumIn;
|
this.checksumIn = checksumIn;
|
||||||
this.offsetFromChunkBoundary = (int) (startOffset-firstChunkOffset);
|
this.offsetFromChunkBoundary = (int) (startOffset-firstChunkOffset);
|
||||||
dataBuff = bufferPool.getBuffer(bytesPerChecksum*64);
|
|
||||||
checksumBuff = bufferPool.getBuffer(checksumSize*64);
|
int chunksPerChecksumRead = getSlowReadBufferNumChunks(conf, bytesPerChecksum);
|
||||||
|
slowReadBuff = bufferPool.getBuffer(bytesPerChecksum * chunksPerChecksumRead);
|
||||||
|
checksumBuff = bufferPool.getBuffer(checksumSize * chunksPerChecksumRead);
|
||||||
// Initially the buffers have nothing to read.
|
// Initially the buffers have nothing to read.
|
||||||
dataBuff.flip();
|
slowReadBuff.flip();
|
||||||
checksumBuff.flip();
|
checksumBuff.flip();
|
||||||
|
boolean success = false;
|
||||||
|
try {
|
||||||
|
// Skip both input streams to beginning of the chunk containing startOffset
|
||||||
long toSkip = firstChunkOffset;
|
long toSkip = firstChunkOffset;
|
||||||
while (toSkip > 0) {
|
while (toSkip > 0) {
|
||||||
long skipped = dataIn.skip(toSkip);
|
long skipped = dataIn.skip(toSkip);
|
||||||
|
@ -293,8 +324,7 @@ class BlockReaderLocal implements BlockReader {
|
||||||
toSkip -= skipped;
|
toSkip -= skipped;
|
||||||
}
|
}
|
||||||
if (checksumIn != null) {
|
if (checksumIn != null) {
|
||||||
long checkSumOffset = (firstChunkOffset / bytesPerChecksum)
|
long checkSumOffset = (firstChunkOffset / bytesPerChecksum) * checksumSize;
|
||||||
* checksumSize;
|
|
||||||
while (checkSumOffset > 0) {
|
while (checkSumOffset > 0) {
|
||||||
long skipped = checksumIn.skip(checkSumOffset);
|
long skipped = checksumIn.skip(checkSumOffset);
|
||||||
if (skipped == 0) {
|
if (skipped == 0) {
|
||||||
|
@ -303,9 +333,19 @@ class BlockReaderLocal implements BlockReader {
|
||||||
checkSumOffset -= skipped;
|
checkSumOffset -= skipped;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
success = true;
|
||||||
|
} finally {
|
||||||
|
if (!success) {
|
||||||
|
bufferPool.returnBuffer(slowReadBuff);
|
||||||
|
bufferPool.returnBuffer(checksumBuff);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private int readIntoBuffer(FileInputStream stream, ByteBuffer buf)
|
/**
|
||||||
|
* Reads bytes into a buffer until EOF or the buffer's limit is reached
|
||||||
|
*/
|
||||||
|
private int fillBuffer(FileInputStream stream, ByteBuffer buf)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
int bytesRead = stream.getChannel().read(buf);
|
int bytesRead = stream.getChannel().read(buf);
|
||||||
if (bytesRead < 0) {
|
if (bytesRead < 0) {
|
||||||
|
@ -323,45 +363,229 @@ class BlockReaderLocal implements BlockReader {
|
||||||
return bytesRead;
|
return bytesRead;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Utility method used by read(ByteBuffer) to partially copy a ByteBuffer into
|
||||||
|
* another.
|
||||||
|
*/
|
||||||
|
private void writeSlice(ByteBuffer from, ByteBuffer to, int length) {
|
||||||
|
int oldLimit = from.limit();
|
||||||
|
from.limit(from.position() + length);
|
||||||
|
try {
|
||||||
|
to.put(from);
|
||||||
|
} finally {
|
||||||
|
from.limit(oldLimit);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public synchronized int read(ByteBuffer buf) throws IOException {
|
||||||
|
int nRead = 0;
|
||||||
|
if (verifyChecksum) {
|
||||||
|
// A 'direct' read actually has three phases. The first drains any
|
||||||
|
// remaining bytes from the slow read buffer. After this the read is
|
||||||
|
// guaranteed to be on a checksum chunk boundary. If there are still bytes
|
||||||
|
// to read, the fast direct path is used for as many remaining bytes as
|
||||||
|
// possible, up to a multiple of the checksum chunk size. Finally, any
|
||||||
|
// 'odd' bytes remaining at the end of the read cause another slow read to
|
||||||
|
// be issued, which involves an extra copy.
|
||||||
|
|
||||||
|
// Every 'slow' read tries to fill the slow read buffer in one go for
|
||||||
|
// efficiency's sake. As described above, all non-checksum-chunk-aligned
|
||||||
|
// reads will be served from the slower read path.
|
||||||
|
|
||||||
|
if (slowReadBuff.hasRemaining()) {
|
||||||
|
// There are remaining bytes from a small read available. This usually
|
||||||
|
// means this read is unaligned, which falls back to the slow path.
|
||||||
|
int fromSlowReadBuff = Math.min(buf.remaining(), slowReadBuff.remaining());
|
||||||
|
writeSlice(slowReadBuff, buf, fromSlowReadBuff);
|
||||||
|
nRead += fromSlowReadBuff;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (buf.remaining() >= bytesPerChecksum && offsetFromChunkBoundary == 0) {
|
||||||
|
// Since we have drained the 'small read' buffer, we are guaranteed to
|
||||||
|
// be chunk-aligned
|
||||||
|
int len = buf.remaining() - (buf.remaining() % bytesPerChecksum);
|
||||||
|
|
||||||
|
// There's only enough checksum buffer space available to checksum one
|
||||||
|
// entire slow read buffer. This saves keeping the number of checksum
|
||||||
|
// chunks around.
|
||||||
|
len = Math.min(len, slowReadBuff.capacity());
|
||||||
|
int oldlimit = buf.limit();
|
||||||
|
buf.limit(buf.position() + len);
|
||||||
|
int readResult = 0;
|
||||||
|
try {
|
||||||
|
readResult = doByteBufferRead(buf);
|
||||||
|
} finally {
|
||||||
|
buf.limit(oldlimit);
|
||||||
|
}
|
||||||
|
if (readResult == -1) {
|
||||||
|
return nRead;
|
||||||
|
} else {
|
||||||
|
nRead += readResult;
|
||||||
|
buf.position(buf.position() + readResult);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// offsetFromChunkBoundary > 0 => unaligned read, use slow path to read
|
||||||
|
// until chunk boundary
|
||||||
|
if ((buf.remaining() > 0 && buf.remaining() < bytesPerChecksum) || offsetFromChunkBoundary > 0) {
|
||||||
|
int toRead = Math.min(buf.remaining(), bytesPerChecksum - offsetFromChunkBoundary);
|
||||||
|
int readResult = fillSlowReadBuffer(toRead);
|
||||||
|
if (readResult == -1) {
|
||||||
|
return nRead;
|
||||||
|
} else {
|
||||||
|
int fromSlowReadBuff = Math.min(readResult, buf.remaining());
|
||||||
|
writeSlice(slowReadBuff, buf, fromSlowReadBuff);
|
||||||
|
nRead += fromSlowReadBuff;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// Non-checksummed reads are much easier; we can just fill the buffer directly.
|
||||||
|
nRead = doByteBufferRead(buf);
|
||||||
|
if (nRead > 0) {
|
||||||
|
buf.position(buf.position() + nRead);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nRead;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Tries to read as many bytes as possible into supplied buffer, checksumming
|
||||||
|
* each chunk if needed.
|
||||||
|
*
|
||||||
|
* <b>Preconditions:</b>
|
||||||
|
* <ul>
|
||||||
|
* <li>
|
||||||
|
* If checksumming is enabled, buf.remaining must be a multiple of
|
||||||
|
* bytesPerChecksum. Note that this is not a requirement for clients of
|
||||||
|
* read(ByteBuffer) - in the case of non-checksum-sized read requests,
|
||||||
|
* read(ByteBuffer) will substitute a suitably sized buffer to pass to this
|
||||||
|
* method.
|
||||||
|
* </li>
|
||||||
|
* </ul>
|
||||||
|
* <b>Postconditions:</b>
|
||||||
|
* <ul>
|
||||||
|
* <li>buf.limit and buf.mark are unchanged.</li>
|
||||||
|
* <li>buf.position += min(offsetFromChunkBoundary, totalBytesRead) - so the
|
||||||
|
* requested bytes can be read straight from the buffer</li>
|
||||||
|
* </ul>
|
||||||
|
*
|
||||||
|
* @param buf
|
||||||
|
* byte buffer to write bytes to. If checksums are not required, buf
|
||||||
|
* can have any number of bytes remaining, otherwise there must be a
|
||||||
|
* multiple of the checksum chunk size remaining.
|
||||||
|
* @return <tt>max(min(totalBytesRead, len) - offsetFromChunkBoundary, 0)</tt>
|
||||||
|
* that is, the the number of useful bytes (up to the amount
|
||||||
|
* requested) readable from the buffer by the client.
|
||||||
|
*/
|
||||||
|
private synchronized int doByteBufferRead(ByteBuffer buf) throws IOException {
|
||||||
|
if (verifyChecksum) {
|
||||||
|
assert buf.remaining() % bytesPerChecksum == 0;
|
||||||
|
}
|
||||||
|
int dataRead = -1;
|
||||||
|
|
||||||
|
int oldpos = buf.position();
|
||||||
|
// Read as much as we can into the buffer.
|
||||||
|
dataRead = fillBuffer(dataIn, buf);
|
||||||
|
|
||||||
|
if (dataRead == -1) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (verifyChecksum) {
|
||||||
|
ByteBuffer toChecksum = buf.duplicate();
|
||||||
|
toChecksum.position(oldpos);
|
||||||
|
toChecksum.limit(oldpos + dataRead);
|
||||||
|
|
||||||
|
checksumBuff.clear();
|
||||||
|
// Equivalent to (int)Math.ceil(toChecksum.remaining() * 1.0 / bytesPerChecksum );
|
||||||
|
int numChunks =
|
||||||
|
(toChecksum.remaining() + bytesPerChecksum - 1) / bytesPerChecksum;
|
||||||
|
checksumBuff.limit(checksumSize * numChunks);
|
||||||
|
|
||||||
|
fillBuffer(checksumIn, checksumBuff);
|
||||||
|
checksumBuff.flip();
|
||||||
|
|
||||||
|
checksum.verifyChunkedSums(toChecksum, checksumBuff, filename,
|
||||||
|
this.startOffset);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (dataRead >= 0) {
|
||||||
|
buf.position(oldpos + Math.min(offsetFromChunkBoundary, dataRead));
|
||||||
|
}
|
||||||
|
|
||||||
|
if (dataRead < offsetFromChunkBoundary) {
|
||||||
|
// yikes, didn't even get enough bytes to honour offset. This can happen
|
||||||
|
// even if we are verifying checksums if we are at EOF.
|
||||||
|
offsetFromChunkBoundary -= dataRead;
|
||||||
|
dataRead = 0;
|
||||||
|
} else {
|
||||||
|
dataRead -= offsetFromChunkBoundary;
|
||||||
|
offsetFromChunkBoundary = 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
return dataRead;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Ensures that up to len bytes are available and checksummed in the slow read
|
||||||
|
* buffer. The number of bytes available to read is returned. If the buffer is
|
||||||
|
* not already empty, the number of remaining bytes is returned and no actual
|
||||||
|
* read happens.
|
||||||
|
*
|
||||||
|
* @param len
|
||||||
|
* the maximum number of bytes to make available. After len bytes
|
||||||
|
* are read, the underlying bytestream <b>must</b> be at a checksum
|
||||||
|
* boundary, or EOF. That is, (len + currentPosition) %
|
||||||
|
* bytesPerChecksum == 0.
|
||||||
|
* @return the number of bytes available to read, or -1 if EOF.
|
||||||
|
*/
|
||||||
|
private synchronized int fillSlowReadBuffer(int len) throws IOException {
|
||||||
|
int nRead = -1;
|
||||||
|
if (slowReadBuff.hasRemaining()) {
|
||||||
|
// Already got data, good to go.
|
||||||
|
nRead = Math.min(len, slowReadBuff.remaining());
|
||||||
|
} else {
|
||||||
|
// Round a complete read of len bytes (plus any implicit offset) to the
|
||||||
|
// next chunk boundary, since we try and read in multiples of a chunk
|
||||||
|
int nextChunk = len + offsetFromChunkBoundary +
|
||||||
|
(bytesPerChecksum - ((len + offsetFromChunkBoundary) % bytesPerChecksum));
|
||||||
|
int limit = Math.min(nextChunk, slowReadBuff.capacity());
|
||||||
|
assert limit % bytesPerChecksum == 0;
|
||||||
|
|
||||||
|
slowReadBuff.clear();
|
||||||
|
slowReadBuff.limit(limit);
|
||||||
|
|
||||||
|
nRead = doByteBufferRead(slowReadBuff);
|
||||||
|
|
||||||
|
if (nRead > 0) {
|
||||||
|
// So that next time we call slowReadBuff.hasRemaining(), we don't get a
|
||||||
|
// false positive.
|
||||||
|
slowReadBuff.limit(nRead + slowReadBuff.position());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nRead;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public synchronized int read(byte[] buf, int off, int len) throws IOException {
|
public synchronized int read(byte[] buf, int off, int len) throws IOException {
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isTraceEnabled()) {
|
||||||
LOG.info("read off " + off + " len " + len);
|
LOG.trace("read off " + off + " len " + len);
|
||||||
}
|
}
|
||||||
if (!verifyChecksum) {
|
if (!verifyChecksum) {
|
||||||
return dataIn.read(buf, off, len);
|
return dataIn.read(buf, off, len);
|
||||||
} else {
|
|
||||||
int dataRead = -1;
|
|
||||||
if (dataBuff.remaining() == 0) {
|
|
||||||
dataBuff.clear();
|
|
||||||
checksumBuff.clear();
|
|
||||||
dataRead = readIntoBuffer(dataIn, dataBuff);
|
|
||||||
readIntoBuffer(checksumIn, checksumBuff);
|
|
||||||
checksumBuff.flip();
|
|
||||||
dataBuff.flip();
|
|
||||||
checksum.verifyChunkedSums(dataBuff, checksumBuff, filename,
|
|
||||||
this.startOffset);
|
|
||||||
} else {
|
|
||||||
dataRead = dataBuff.remaining();
|
|
||||||
}
|
|
||||||
if (dataRead > 0) {
|
|
||||||
int nRead = Math.min(dataRead - offsetFromChunkBoundary, len);
|
|
||||||
if (offsetFromChunkBoundary > 0) {
|
|
||||||
dataBuff.position(offsetFromChunkBoundary);
|
|
||||||
// Its either end of file or dataRead is greater than the
|
|
||||||
// offsetFromChunkBoundary
|
|
||||||
offsetFromChunkBoundary = 0;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int nRead = fillSlowReadBuffer(slowReadBuff.capacity());
|
||||||
|
|
||||||
if (nRead > 0) {
|
if (nRead > 0) {
|
||||||
dataBuff.get(buf, off, nRead);
|
// Possible that buffer is filled with a larger read than we need, since
|
||||||
|
// we tried to read as much as possible at once
|
||||||
|
nRead = Math.min(len, nRead);
|
||||||
|
slowReadBuff.get(buf, off, nRead);
|
||||||
|
}
|
||||||
|
|
||||||
return nRead;
|
return nRead;
|
||||||
} else {
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -377,20 +601,20 @@ class BlockReaderLocal implements BlockReader {
|
||||||
}
|
}
|
||||||
|
|
||||||
// caller made sure newPosition is not beyond EOF.
|
// caller made sure newPosition is not beyond EOF.
|
||||||
int remaining = dataBuff.remaining();
|
int remaining = slowReadBuff.remaining();
|
||||||
int position = dataBuff.position();
|
int position = slowReadBuff.position();
|
||||||
int newPosition = position + (int)n;
|
int newPosition = position + (int)n;
|
||||||
|
|
||||||
// if the new offset is already read into dataBuff, just reposition
|
// if the new offset is already read into dataBuff, just reposition
|
||||||
if (n <= remaining) {
|
if (n <= remaining) {
|
||||||
assert offsetFromChunkBoundary == 0;
|
assert offsetFromChunkBoundary == 0;
|
||||||
dataBuff.position(newPosition);
|
slowReadBuff.position(newPosition);
|
||||||
return n;
|
return n;
|
||||||
}
|
}
|
||||||
|
|
||||||
// for small gap, read through to keep the data/checksum in sync
|
// for small gap, read through to keep the data/checksum in sync
|
||||||
if (n - remaining <= bytesPerChecksum) {
|
if (n - remaining <= bytesPerChecksum) {
|
||||||
dataBuff.position(position + remaining);
|
slowReadBuff.position(position + remaining);
|
||||||
if (skipBuf == null) {
|
if (skipBuf == null) {
|
||||||
skipBuf = new byte[bytesPerChecksum];
|
skipBuf = new byte[bytesPerChecksum];
|
||||||
}
|
}
|
||||||
|
@ -401,11 +625,16 @@ class BlockReaderLocal implements BlockReader {
|
||||||
// optimize for big gap: discard the current buffer, skip to
|
// optimize for big gap: discard the current buffer, skip to
|
||||||
// the beginning of the appropriate checksum chunk and then
|
// the beginning of the appropriate checksum chunk and then
|
||||||
// read to the middle of that chunk to be in sync with checksums.
|
// read to the middle of that chunk to be in sync with checksums.
|
||||||
this.offsetFromChunkBoundary = newPosition % bytesPerChecksum;
|
|
||||||
long toskip = n - remaining - this.offsetFromChunkBoundary;
|
|
||||||
|
|
||||||
dataBuff.clear();
|
// We can't use this.offsetFromChunkBoundary because we need to know how
|
||||||
checksumBuff.clear();
|
// many bytes of the offset were really read. Calling read(..) with a
|
||||||
|
// positive this.offsetFromChunkBoundary causes that many bytes to get
|
||||||
|
// silently skipped.
|
||||||
|
int myOffsetFromChunkBoundary = newPosition % bytesPerChecksum;
|
||||||
|
long toskip = n - remaining - myOffsetFromChunkBoundary;
|
||||||
|
|
||||||
|
slowReadBuff.position(slowReadBuff.limit());
|
||||||
|
checksumBuff.position(checksumBuff.limit());
|
||||||
|
|
||||||
long dataSkipped = dataIn.skip(toskip);
|
long dataSkipped = dataIn.skip(toskip);
|
||||||
if (dataSkipped != toskip) {
|
if (dataSkipped != toskip) {
|
||||||
|
@ -424,8 +653,10 @@ class BlockReaderLocal implements BlockReader {
|
||||||
skipBuf = new byte[bytesPerChecksum];
|
skipBuf = new byte[bytesPerChecksum];
|
||||||
}
|
}
|
||||||
assert skipBuf.length == bytesPerChecksum;
|
assert skipBuf.length == bytesPerChecksum;
|
||||||
assert this.offsetFromChunkBoundary < bytesPerChecksum;
|
assert myOffsetFromChunkBoundary < bytesPerChecksum;
|
||||||
int ret = read(skipBuf, 0, this.offsetFromChunkBoundary);
|
|
||||||
|
int ret = read(skipBuf, 0, myOffsetFromChunkBoundary);
|
||||||
|
|
||||||
if (ret == -1) { // EOS
|
if (ret == -1) { // EOS
|
||||||
return toskip;
|
return toskip;
|
||||||
} else {
|
} else {
|
||||||
|
@ -439,9 +670,9 @@ class BlockReaderLocal implements BlockReader {
|
||||||
if (checksumIn != null) {
|
if (checksumIn != null) {
|
||||||
checksumIn.close();
|
checksumIn.close();
|
||||||
}
|
}
|
||||||
if (dataBuff != null) {
|
if (slowReadBuff != null) {
|
||||||
bufferPool.returnBuffer(dataBuff);
|
bufferPool.returnBuffer(slowReadBuff);
|
||||||
dataBuff = null;
|
slowReadBuff = null;
|
||||||
}
|
}
|
||||||
if (checksumBuff != null) {
|
if (checksumBuff != null) {
|
||||||
bufferPool.returnBuffer(checksumBuff);
|
bufferPool.returnBuffer(checksumBuff);
|
||||||
|
|
|
@ -287,6 +287,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
|
||||||
public static final boolean DFS_CLIENT_READ_SHORTCIRCUIT_DEFAULT = false;
|
public static final boolean DFS_CLIENT_READ_SHORTCIRCUIT_DEFAULT = false;
|
||||||
public static final String DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY = "dfs.client.read.shortcircuit.skip.checksum";
|
public static final String DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY = "dfs.client.read.shortcircuit.skip.checksum";
|
||||||
public static final boolean DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_DEFAULT = false;
|
public static final boolean DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_DEFAULT = false;
|
||||||
|
public static final String DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_KEY = "dfs.client.read.shortcircuit.buffer.size";
|
||||||
|
public static final int DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_DEFAULT = 1024 * 1024;
|
||||||
|
|
||||||
// property for fsimage compression
|
// property for fsimage compression
|
||||||
public static final String DFS_IMAGE_COMPRESS_KEY = "dfs.image.compress";
|
public static final String DFS_IMAGE_COMPRESS_KEY = "dfs.image.compress";
|
||||||
|
|
|
@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
import java.net.Socket;
|
import java.net.Socket;
|
||||||
|
import java.nio.ByteBuffer;
|
||||||
import java.util.AbstractMap;
|
import java.util.AbstractMap;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
|
@ -33,6 +34,7 @@ import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.fs.ChecksumException;
|
import org.apache.hadoop.fs.ChecksumException;
|
||||||
|
import org.apache.hadoop.fs.ByteBufferReadable;
|
||||||
import org.apache.hadoop.fs.FSInputStream;
|
import org.apache.hadoop.fs.FSInputStream;
|
||||||
import org.apache.hadoop.fs.UnresolvedLinkException;
|
import org.apache.hadoop.fs.UnresolvedLinkException;
|
||||||
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
|
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
|
||||||
|
@ -54,16 +56,16 @@ import org.apache.hadoop.security.token.Token;
|
||||||
* negotiation of the namenode and various datanodes as necessary.
|
* negotiation of the namenode and various datanodes as necessary.
|
||||||
****************************************************************/
|
****************************************************************/
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
public class DFSInputStream extends FSInputStream {
|
public class DFSInputStream extends FSInputStream implements ByteBufferReadable {
|
||||||
private final SocketCache socketCache;
|
private final SocketCache socketCache;
|
||||||
|
|
||||||
private final DFSClient dfsClient;
|
private final DFSClient dfsClient;
|
||||||
private boolean closed = false;
|
private boolean closed = false;
|
||||||
|
|
||||||
private final String src;
|
private final String src;
|
||||||
private long prefetchSize;
|
private final long prefetchSize;
|
||||||
private BlockReader blockReader = null;
|
private BlockReader blockReader = null;
|
||||||
private boolean verifyChecksum;
|
private final boolean verifyChecksum;
|
||||||
private LocatedBlocks locatedBlocks = null;
|
private LocatedBlocks locatedBlocks = null;
|
||||||
private long lastBlockBeingWrittenLength = 0;
|
private long lastBlockBeingWrittenLength = 0;
|
||||||
private DatanodeInfo currentNode = null;
|
private DatanodeInfo currentNode = null;
|
||||||
|
@ -83,17 +85,17 @@ public class DFSInputStream extends FSInputStream {
|
||||||
* capped at maxBlockAcquireFailures
|
* capped at maxBlockAcquireFailures
|
||||||
*/
|
*/
|
||||||
private int failures = 0;
|
private int failures = 0;
|
||||||
private int timeWindow;
|
private final int timeWindow;
|
||||||
|
|
||||||
/* XXX Use of CocurrentHashMap is temp fix. Need to fix
|
/* XXX Use of CocurrentHashMap is temp fix. Need to fix
|
||||||
* parallel accesses to DFSInputStream (through ptreads) properly */
|
* parallel accesses to DFSInputStream (through ptreads) properly */
|
||||||
private ConcurrentHashMap<DatanodeInfo, DatanodeInfo> deadNodes =
|
private final ConcurrentHashMap<DatanodeInfo, DatanodeInfo> deadNodes =
|
||||||
new ConcurrentHashMap<DatanodeInfo, DatanodeInfo>();
|
new ConcurrentHashMap<DatanodeInfo, DatanodeInfo>();
|
||||||
private int buffersize = 1;
|
private int buffersize = 1;
|
||||||
|
|
||||||
private byte[] oneByteBuf = new byte[1]; // used for 'int read()'
|
private final byte[] oneByteBuf = new byte[1]; // used for 'int read()'
|
||||||
|
|
||||||
private int nCachedConnRetry;
|
private final int nCachedConnRetry;
|
||||||
|
|
||||||
void addToDeadNodes(DatanodeInfo dnInfo) {
|
void addToDeadNodes(DatanodeInfo dnInfo) {
|
||||||
deadNodes.put(dnInfo, dnInfo);
|
deadNodes.put(dnInfo, dnInfo);
|
||||||
|
@ -466,11 +468,63 @@ public class DFSInputStream extends FSInputStream {
|
||||||
return ( ret <= 0 ) ? -1 : (oneByteBuf[0] & 0xff);
|
return ( ret <= 0 ) ? -1 : (oneByteBuf[0] & 0xff);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Wraps different possible read implementations so that readBuffer can be
|
||||||
|
* strategy-agnostic.
|
||||||
|
*/
|
||||||
|
private interface ReaderStrategy {
|
||||||
|
public int doRead(BlockReader blockReader, int off, int len) throws ChecksumException, IOException;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Used to read bytes into a byte[]
|
||||||
|
*/
|
||||||
|
private static class ByteArrayStrategy implements ReaderStrategy {
|
||||||
|
final byte[] buf;
|
||||||
|
|
||||||
|
public ByteArrayStrategy(byte[] buf) {
|
||||||
|
this.buf = buf;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int doRead(BlockReader blockReader, int off, int len) throws ChecksumException, IOException {
|
||||||
|
return blockReader.read(buf, off, len);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Used to read bytes into a user-supplied ByteBuffer
|
||||||
|
*/
|
||||||
|
private static class ByteBufferStrategy implements ReaderStrategy {
|
||||||
|
final ByteBuffer buf;
|
||||||
|
ByteBufferStrategy(ByteBuffer buf) {
|
||||||
|
this.buf = buf;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int doRead(BlockReader blockReader, int off, int len) throws ChecksumException, IOException {
|
||||||
|
int oldpos = buf.position();
|
||||||
|
int oldlimit = buf.limit();
|
||||||
|
boolean success = false;
|
||||||
|
try {
|
||||||
|
int ret = blockReader.read(buf);
|
||||||
|
success = true;
|
||||||
|
return ret;
|
||||||
|
} finally {
|
||||||
|
if (!success) {
|
||||||
|
// Reset to original state so that retries work correctly.
|
||||||
|
buf.position(oldpos);
|
||||||
|
buf.limit(oldlimit);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/* This is a used by regular read() and handles ChecksumExceptions.
|
/* This is a used by regular read() and handles ChecksumExceptions.
|
||||||
* name readBuffer() is chosen to imply similarity to readBuffer() in
|
* name readBuffer() is chosen to imply similarity to readBuffer() in
|
||||||
* ChecksumFileSystem
|
* ChecksumFileSystem
|
||||||
*/
|
*/
|
||||||
private synchronized int readBuffer(byte buf[], int off, int len,
|
private synchronized int readBuffer(ReaderStrategy reader, int off, int len,
|
||||||
Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
|
Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
IOException ioe;
|
IOException ioe;
|
||||||
|
@ -486,7 +540,7 @@ public class DFSInputStream extends FSInputStream {
|
||||||
while (true) {
|
while (true) {
|
||||||
// retry as many times as seekToNewSource allows.
|
// retry as many times as seekToNewSource allows.
|
||||||
try {
|
try {
|
||||||
return blockReader.read(buf, off, len);
|
return reader.doRead(blockReader, off, len);
|
||||||
} catch ( ChecksumException ce ) {
|
} catch ( ChecksumException ce ) {
|
||||||
DFSClient.LOG.warn("Found Checksum error for "
|
DFSClient.LOG.warn("Found Checksum error for "
|
||||||
+ getCurrentBlock() + " from " + currentNode.getName()
|
+ getCurrentBlock() + " from " + currentNode.getName()
|
||||||
|
@ -522,11 +576,7 @@ public class DFSInputStream extends FSInputStream {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
private int readWithStrategy(ReaderStrategy strategy, int off, int len) throws IOException {
|
||||||
* Read the entire buffer.
|
|
||||||
*/
|
|
||||||
@Override
|
|
||||||
public synchronized int read(byte buf[], int off, int len) throws IOException {
|
|
||||||
dfsClient.checkOpen();
|
dfsClient.checkOpen();
|
||||||
if (closed) {
|
if (closed) {
|
||||||
throw new IOException("Stream closed");
|
throw new IOException("Stream closed");
|
||||||
|
@ -544,7 +594,7 @@ public class DFSInputStream extends FSInputStream {
|
||||||
currentNode = blockSeekTo(pos);
|
currentNode = blockSeekTo(pos);
|
||||||
}
|
}
|
||||||
int realLen = (int) Math.min(len, (blockEnd - pos + 1L));
|
int realLen = (int) Math.min(len, (blockEnd - pos + 1L));
|
||||||
int result = readBuffer(buf, off, realLen, corruptedBlockMap);
|
int result = readBuffer(strategy, off, realLen, corruptedBlockMap);
|
||||||
|
|
||||||
if (result >= 0) {
|
if (result >= 0) {
|
||||||
pos += result;
|
pos += result;
|
||||||
|
@ -578,6 +628,24 @@ public class DFSInputStream extends FSInputStream {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Read the entire buffer.
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public synchronized int read(final byte buf[], int off, int len) throws IOException {
|
||||||
|
ReaderStrategy byteArrayReader = new ByteArrayStrategy(buf);
|
||||||
|
|
||||||
|
return readWithStrategy(byteArrayReader, off, len);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public synchronized int read(final ByteBuffer buf) throws IOException {
|
||||||
|
ReaderStrategy byteBufferReader = new ByteBufferStrategy(buf);
|
||||||
|
|
||||||
|
return readWithStrategy(byteBufferReader, 0, buf.remaining());
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Add corrupted block replica into map.
|
* Add corrupted block replica into map.
|
||||||
* @param corruptedBlockMap
|
* @param corruptedBlockMap
|
||||||
|
@ -1052,5 +1120,4 @@ public class DFSInputStream extends FSInputStream {
|
||||||
this.addr = addr;
|
this.addr = addr;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -56,7 +56,7 @@ import org.apache.hadoop.util.DataChecksum;
|
||||||
public class RemoteBlockReader extends FSInputChecker implements BlockReader {
|
public class RemoteBlockReader extends FSInputChecker implements BlockReader {
|
||||||
|
|
||||||
Socket dnSock; //for now just sending the status code (e.g. checksumOk) after the read.
|
Socket dnSock; //for now just sending the status code (e.g. checksumOk) after the read.
|
||||||
private DataInputStream in;
|
private final DataInputStream in;
|
||||||
private DataChecksum checksum;
|
private DataChecksum checksum;
|
||||||
|
|
||||||
/** offset in block of the last chunk received */
|
/** offset in block of the last chunk received */
|
||||||
|
@ -71,8 +71,8 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader {
|
||||||
if startOffset is not chunk-aligned */
|
if startOffset is not chunk-aligned */
|
||||||
private final long firstChunkOffset;
|
private final long firstChunkOffset;
|
||||||
|
|
||||||
private int bytesPerChecksum;
|
private final int bytesPerChecksum;
|
||||||
private int checksumSize;
|
private final int checksumSize;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The total number of bytes we need to transfer from the DN.
|
* The total number of bytes we need to transfer from the DN.
|
||||||
|
@ -479,4 +479,9 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader {
|
||||||
return s.toString() + ":" + poolId + ":" + blockId;
|
return s.toString() + ":" + poolId + ":" + blockId;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int read(ByteBuffer buf) throws IOException {
|
||||||
|
throw new UnsupportedOperationException("readDirect unsupported in RemoteBlockReader");
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -84,7 +84,7 @@ public class RemoteBlockReader2 implements BlockReader {
|
||||||
static final Log LOG = LogFactory.getLog(RemoteBlockReader2.class);
|
static final Log LOG = LogFactory.getLog(RemoteBlockReader2.class);
|
||||||
|
|
||||||
Socket dnSock; //for now just sending the status code (e.g. checksumOk) after the read.
|
Socket dnSock; //for now just sending the status code (e.g. checksumOk) after the read.
|
||||||
private ReadableByteChannel in;
|
private final ReadableByteChannel in;
|
||||||
private DataChecksum checksum;
|
private DataChecksum checksum;
|
||||||
|
|
||||||
private PacketHeader curHeader;
|
private PacketHeader curHeader;
|
||||||
|
@ -100,11 +100,11 @@ public class RemoteBlockReader2 implements BlockReader {
|
||||||
private final String filename;
|
private final String filename;
|
||||||
|
|
||||||
private static DirectBufferPool bufferPool = new DirectBufferPool();
|
private static DirectBufferPool bufferPool = new DirectBufferPool();
|
||||||
private ByteBuffer headerBuf = ByteBuffer.allocate(
|
private final ByteBuffer headerBuf = ByteBuffer.allocate(
|
||||||
PacketHeader.PKT_HEADER_LEN);
|
PacketHeader.PKT_HEADER_LEN);
|
||||||
|
|
||||||
private int bytesPerChecksum;
|
private final int bytesPerChecksum;
|
||||||
private int checksumSize;
|
private final int checksumSize;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The total number of bytes we need to transfer from the DN.
|
* The total number of bytes we need to transfer from the DN.
|
||||||
|
@ -140,6 +140,26 @@ public class RemoteBlockReader2 implements BlockReader {
|
||||||
return nRead;
|
return nRead;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int read(ByteBuffer buf) throws IOException {
|
||||||
|
if (curPacketBuf == null || curDataSlice.remaining() == 0 && bytesNeededToFinish > 0) {
|
||||||
|
readNextPacket();
|
||||||
|
}
|
||||||
|
if (curDataSlice.remaining() == 0) {
|
||||||
|
// we're at EOF now
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
int nRead = Math.min(curDataSlice.remaining(), buf.remaining());
|
||||||
|
ByteBuffer writeSlice = curDataSlice.duplicate();
|
||||||
|
writeSlice.limit(writeSlice.position() + nRead);
|
||||||
|
buf.put(writeSlice);
|
||||||
|
curDataSlice.position(writeSlice.position());
|
||||||
|
|
||||||
|
return nRead;
|
||||||
|
}
|
||||||
|
|
||||||
private void readNextPacket() throws IOException {
|
private void readNextPacket() throws IOException {
|
||||||
Preconditions.checkState(curHeader == null || !curHeader.isLastPacketInBlock());
|
Preconditions.checkState(curHeader == null || !curHeader.isLastPacketInBlock());
|
||||||
|
|
||||||
|
@ -325,6 +345,7 @@ public class RemoteBlockReader2 implements BlockReader {
|
||||||
/**
|
/**
|
||||||
* Take the socket used to talk to the DN.
|
* Take the socket used to talk to the DN.
|
||||||
*/
|
*/
|
||||||
|
@Override
|
||||||
public Socket takeSocket() {
|
public Socket takeSocket() {
|
||||||
assert hasSentStatusCode() :
|
assert hasSentStatusCode() :
|
||||||
"BlockReader shouldn't give back sockets mid-read";
|
"BlockReader shouldn't give back sockets mid-read";
|
||||||
|
@ -337,6 +358,7 @@ public class RemoteBlockReader2 implements BlockReader {
|
||||||
* Whether the BlockReader has reached the end of its input stream
|
* Whether the BlockReader has reached the end of its input stream
|
||||||
* and successfully sent a status code back to the datanode.
|
* and successfully sent a status code back to the datanode.
|
||||||
*/
|
*/
|
||||||
|
@Override
|
||||||
public boolean hasSentStatusCode() {
|
public boolean hasSentStatusCode() {
|
||||||
return sentStatusCode;
|
return sentStatusCode;
|
||||||
}
|
}
|
||||||
|
|
|
@ -49,7 +49,11 @@ public class BlockReaderTestUtil {
|
||||||
* Setup the cluster
|
* Setup the cluster
|
||||||
*/
|
*/
|
||||||
public BlockReaderTestUtil(int replicationFactor) throws Exception {
|
public BlockReaderTestUtil(int replicationFactor) throws Exception {
|
||||||
conf = new HdfsConfiguration();
|
this(replicationFactor, new HdfsConfiguration());
|
||||||
|
}
|
||||||
|
|
||||||
|
public BlockReaderTestUtil(int replicationFactor, HdfsConfiguration config) throws Exception {
|
||||||
|
this.conf = config;
|
||||||
conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, replicationFactor);
|
conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, replicationFactor);
|
||||||
cluster = new MiniDFSCluster.Builder(conf).format(true).build();
|
cluster = new MiniDFSCluster.Builder(conf).format(true).build();
|
||||||
cluster.waitActive();
|
cluster.waitActive();
|
||||||
|
|
|
@ -0,0 +1,106 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hdfs;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.nio.ByteBuffer;
|
||||||
|
|
||||||
|
import org.apache.hadoop.fs.ChecksumException;
|
||||||
|
import org.apache.hadoop.fs.FSDataInputStream;
|
||||||
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||||
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
|
import org.junit.AfterClass;
|
||||||
|
import org.junit.BeforeClass;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
public class TestBlockReaderLocal {
|
||||||
|
static MiniDFSCluster cluster;
|
||||||
|
static HdfsConfiguration conf;
|
||||||
|
|
||||||
|
@BeforeClass
|
||||||
|
public static void setupCluster() throws IOException {
|
||||||
|
conf = new HdfsConfiguration();
|
||||||
|
|
||||||
|
conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true);
|
||||||
|
conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY,
|
||||||
|
false);
|
||||||
|
conf.set(DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY,
|
||||||
|
UserGroupInformation.getCurrentUser().getShortUserName());
|
||||||
|
|
||||||
|
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
|
||||||
|
}
|
||||||
|
|
||||||
|
@AfterClass
|
||||||
|
public static void teardownCluster() {
|
||||||
|
cluster.shutdown();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test that, in the case of an error, the position and limit of a ByteBuffer
|
||||||
|
* are left unchanged. This is not mandated by ByteBufferReadable, but clients
|
||||||
|
* of this class might immediately issue a retry on failure, so it's polite.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testStablePositionAfterCorruptRead() throws IOException {
|
||||||
|
final short REPL_FACTOR = 1;
|
||||||
|
final long FILE_LENGTH = 512L;
|
||||||
|
cluster.waitActive();
|
||||||
|
FileSystem fs = cluster.getFileSystem();
|
||||||
|
|
||||||
|
Path path = new Path("/corrupted");
|
||||||
|
|
||||||
|
DFSTestUtil.createFile(fs, path, FILE_LENGTH, REPL_FACTOR, 12345L);
|
||||||
|
DFSTestUtil.waitReplication(fs, path, REPL_FACTOR);
|
||||||
|
|
||||||
|
ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, path);
|
||||||
|
int blockFilesCorrupted = cluster.corruptBlockOnDataNodes(block);
|
||||||
|
assertEquals("All replicas not corrupted", REPL_FACTOR, blockFilesCorrupted);
|
||||||
|
|
||||||
|
FSDataInputStream dis = cluster.getFileSystem().open(path);
|
||||||
|
ByteBuffer buf = ByteBuffer.allocateDirect((int)FILE_LENGTH);
|
||||||
|
boolean sawException = false;
|
||||||
|
try {
|
||||||
|
dis.read(buf);
|
||||||
|
} catch (ChecksumException ex) {
|
||||||
|
sawException = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
assertTrue(sawException);
|
||||||
|
assertEquals(0, buf.position());
|
||||||
|
assertEquals(buf.capacity(), buf.limit());
|
||||||
|
|
||||||
|
dis = cluster.getFileSystem().open(path);
|
||||||
|
buf.position(3);
|
||||||
|
buf.limit(25);
|
||||||
|
sawException = false;
|
||||||
|
try {
|
||||||
|
dis.read(buf);
|
||||||
|
} catch (ChecksumException ex) {
|
||||||
|
sawException = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
assertTrue(sawException);
|
||||||
|
assertEquals(3, buf.position());
|
||||||
|
assertEquals(25, buf.limit());
|
||||||
|
}
|
||||||
|
}
|
|
@ -18,177 +18,21 @@
|
||||||
package org.apache.hadoop.hdfs;
|
package org.apache.hadoop.hdfs;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.Random;
|
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
|
||||||
import org.apache.commons.logging.LogFactory;
|
|
||||||
|
|
||||||
import org.apache.hadoop.fs.Path;
|
|
||||||
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
|
||||||
import org.apache.log4j.Level;
|
|
||||||
import org.apache.log4j.LogManager;
|
|
||||||
|
|
||||||
import org.junit.Test;
|
|
||||||
import org.junit.AfterClass;
|
import org.junit.AfterClass;
|
||||||
import org.junit.BeforeClass;
|
import org.junit.BeforeClass;
|
||||||
import static org.junit.Assert.*;
|
import org.junit.Test;
|
||||||
|
|
||||||
/**
|
public class TestParallelRead extends TestParallelReadUtil {
|
||||||
* Test the use of DFSInputStream by multiple concurrent readers.
|
|
||||||
*/
|
|
||||||
public class TestParallelRead {
|
|
||||||
|
|
||||||
static final Log LOG = LogFactory.getLog(TestParallelRead.class);
|
|
||||||
static BlockReaderTestUtil util = null;
|
|
||||||
static DFSClient dfsClient = null;
|
|
||||||
static final int FILE_SIZE_K = 256;
|
|
||||||
static Random rand = null;
|
|
||||||
|
|
||||||
static {
|
|
||||||
// The client-trace log ends up causing a lot of blocking threads
|
|
||||||
// in this when it's being used as a performance benchmark.
|
|
||||||
LogManager.getLogger(DataNode.class.getName() + ".clienttrace")
|
|
||||||
.setLevel(Level.WARN);
|
|
||||||
}
|
|
||||||
|
|
||||||
private class TestFileInfo {
|
|
||||||
public DFSInputStream dis;
|
|
||||||
public Path filepath;
|
|
||||||
public byte[] authenticData;
|
|
||||||
}
|
|
||||||
|
|
||||||
@BeforeClass
|
@BeforeClass
|
||||||
public static void setupCluster() throws Exception {
|
static public void setupCluster() throws Exception {
|
||||||
final int REPLICATION_FACTOR = 2;
|
setupCluster(DEFAULT_REPLICATION_FACTOR, new HdfsConfiguration());
|
||||||
util = new BlockReaderTestUtil(REPLICATION_FACTOR);
|
|
||||||
dfsClient = util.getDFSClient();
|
|
||||||
rand = new Random(System.currentTimeMillis());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
@AfterClass
|
||||||
* A worker to do one "unit" of read.
|
static public void teardownCluster() throws Exception {
|
||||||
*/
|
TestParallelReadUtil.teardownCluster();
|
||||||
static class ReadWorker extends Thread {
|
|
||||||
static public final int N_ITERATIONS = 1024;
|
|
||||||
|
|
||||||
private static final double PROPORTION_NON_POSITIONAL_READ = 0.10;
|
|
||||||
|
|
||||||
private TestFileInfo testInfo;
|
|
||||||
private long fileSize;
|
|
||||||
private long bytesRead;
|
|
||||||
private boolean error;
|
|
||||||
|
|
||||||
ReadWorker(TestFileInfo testInfo, int id) {
|
|
||||||
super("ReadWorker-" + id + "-" + testInfo.filepath.toString());
|
|
||||||
this.testInfo = testInfo;
|
|
||||||
fileSize = testInfo.dis.getFileLength();
|
|
||||||
assertEquals(fileSize, testInfo.authenticData.length);
|
|
||||||
bytesRead = 0;
|
|
||||||
error = false;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Randomly do one of (1) Small read; and (2) Large Pread.
|
|
||||||
*/
|
|
||||||
@Override
|
|
||||||
public void run() {
|
|
||||||
for (int i = 0; i < N_ITERATIONS; ++i) {
|
|
||||||
int startOff = rand.nextInt((int) fileSize);
|
|
||||||
int len = 0;
|
|
||||||
try {
|
|
||||||
double p = rand.nextDouble();
|
|
||||||
if (p < PROPORTION_NON_POSITIONAL_READ) {
|
|
||||||
// Do a small regular read. Very likely this will leave unread
|
|
||||||
// data on the socket and make the socket uncacheable.
|
|
||||||
len = Math.min(rand.nextInt(64), (int) fileSize - startOff);
|
|
||||||
read(startOff, len);
|
|
||||||
bytesRead += len;
|
|
||||||
} else {
|
|
||||||
// Do a positional read most of the time.
|
|
||||||
len = rand.nextInt((int) (fileSize - startOff));
|
|
||||||
pRead(startOff, len);
|
|
||||||
bytesRead += len;
|
|
||||||
}
|
|
||||||
} catch (Throwable t) {
|
|
||||||
LOG.error(getName() + ": Error while testing read at " + startOff +
|
|
||||||
" length " + len);
|
|
||||||
error = true;
|
|
||||||
fail(t.getMessage());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public long getBytesRead() {
|
|
||||||
return bytesRead;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Raising error in a thread doesn't seem to fail the test.
|
|
||||||
* So check afterwards.
|
|
||||||
*/
|
|
||||||
public boolean hasError() {
|
|
||||||
return error;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Seek to somewhere random and read.
|
|
||||||
*/
|
|
||||||
private void read(int start, int len) throws Exception {
|
|
||||||
assertTrue(
|
|
||||||
"Bad args: " + start + " + " + len + " should be <= " + fileSize,
|
|
||||||
start + len <= fileSize);
|
|
||||||
DFSInputStream dis = testInfo.dis;
|
|
||||||
|
|
||||||
synchronized (dis) {
|
|
||||||
dis.seek(start);
|
|
||||||
|
|
||||||
byte buf[] = new byte[len];
|
|
||||||
int cnt = 0;
|
|
||||||
while (cnt < len) {
|
|
||||||
cnt += dis.read(buf, cnt, buf.length - cnt);
|
|
||||||
}
|
|
||||||
verifyData("Read data corrupted", buf, start, start + len);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Positional read.
|
|
||||||
*/
|
|
||||||
private void pRead(int start, int len) throws Exception {
|
|
||||||
assertTrue(
|
|
||||||
"Bad args: " + start + " + " + len + " should be <= " + fileSize,
|
|
||||||
start + len <= fileSize);
|
|
||||||
DFSInputStream dis = testInfo.dis;
|
|
||||||
|
|
||||||
byte buf[] = new byte[len];
|
|
||||||
int cnt = 0;
|
|
||||||
while (cnt < len) {
|
|
||||||
cnt += dis.read(start, buf, cnt, buf.length - cnt);
|
|
||||||
}
|
|
||||||
verifyData("Pread data corrupted", buf, start, start + len);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Verify read data vs authentic data
|
|
||||||
*/
|
|
||||||
private void verifyData(String msg, byte actual[], int start, int end)
|
|
||||||
throws Exception {
|
|
||||||
byte auth[] = testInfo.authenticData;
|
|
||||||
if (end > auth.length) {
|
|
||||||
throw new Exception(msg + ": Actual array (" + end +
|
|
||||||
") is past the end of authentic data (" +
|
|
||||||
auth.length + ")");
|
|
||||||
}
|
|
||||||
|
|
||||||
int j = start;
|
|
||||||
for (int i = 0; i < actual.length; ++i, ++j) {
|
|
||||||
if (auth[j] != actual[i]) {
|
|
||||||
throw new Exception(msg + ": Arrays byte " + i + " (at offset " +
|
|
||||||
j + ") differs: expect " +
|
|
||||||
auth[j] + " got " + actual[i]);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -199,85 +43,17 @@ public class TestParallelRead {
|
||||||
* need to be manually collected, which is inconvenient.
|
* need to be manually collected, which is inconvenient.
|
||||||
*/
|
*/
|
||||||
@Test
|
@Test
|
||||||
public void testParallelRead() throws IOException {
|
public void testParallelReadCopying() throws IOException {
|
||||||
if (!runParallelRead(1, 4)) {
|
runTestWorkload(new CopyingReadWorkerHelper());
|
||||||
fail("Check log for errors");
|
|
||||||
}
|
|
||||||
if (!runParallelRead(1, 16)) {
|
|
||||||
fail("Check log for errors");
|
|
||||||
}
|
|
||||||
if (!runParallelRead(2, 4)) {
|
|
||||||
fail("Check log for errors");
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
@Test
|
||||||
* Start the parallel read with the given parameters.
|
public void testParallelReadByteBuffer() throws IOException {
|
||||||
*/
|
runTestWorkload(new DirectReadWorkerHelper());
|
||||||
boolean runParallelRead(int nFiles, int nWorkerEach) throws IOException {
|
|
||||||
ReadWorker workers[] = new ReadWorker[nFiles * nWorkerEach];
|
|
||||||
TestFileInfo testInfoArr[] = new TestFileInfo[nFiles];
|
|
||||||
|
|
||||||
// Prepare the files and workers
|
|
||||||
int nWorkers = 0;
|
|
||||||
for (int i = 0; i < nFiles; ++i) {
|
|
||||||
TestFileInfo testInfo = new TestFileInfo();
|
|
||||||
testInfoArr[i] = testInfo;
|
|
||||||
|
|
||||||
testInfo.filepath = new Path("/TestParallelRead.dat." + i);
|
|
||||||
testInfo.authenticData = util.writeFile(testInfo.filepath, FILE_SIZE_K);
|
|
||||||
testInfo.dis = dfsClient.open(testInfo.filepath.toString());
|
|
||||||
|
|
||||||
for (int j = 0; j < nWorkerEach; ++j) {
|
|
||||||
workers[nWorkers++] = new ReadWorker(testInfo, nWorkers);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Start the workers and wait
|
@Test
|
||||||
long starttime = System.currentTimeMillis();
|
public void testParallelReadMixed() throws IOException {
|
||||||
for (ReadWorker worker : workers) {
|
runTestWorkload(new MixedWorkloadHelper());
|
||||||
worker.start();
|
|
||||||
}
|
|
||||||
|
|
||||||
for (ReadWorker worker : workers) {
|
|
||||||
try {
|
|
||||||
worker.join();
|
|
||||||
} catch (InterruptedException ignored) { }
|
|
||||||
}
|
|
||||||
long endtime = System.currentTimeMillis();
|
|
||||||
|
|
||||||
// Cleanup
|
|
||||||
for (TestFileInfo testInfo : testInfoArr) {
|
|
||||||
testInfo.dis.close();
|
|
||||||
}
|
|
||||||
|
|
||||||
// Report
|
|
||||||
boolean res = true;
|
|
||||||
long totalRead = 0;
|
|
||||||
for (ReadWorker worker : workers) {
|
|
||||||
long nread = worker.getBytesRead();
|
|
||||||
LOG.info("--- Report: " + worker.getName() + " read " + nread + " B; " +
|
|
||||||
"average " + nread / ReadWorker.N_ITERATIONS + " B per read");
|
|
||||||
totalRead += nread;
|
|
||||||
if (worker.hasError()) {
|
|
||||||
res = false;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
double timeTakenSec = (endtime - starttime) / 1000.0;
|
|
||||||
long totalReadKB = totalRead / 1024;
|
|
||||||
LOG.info("=== Report: " + nWorkers + " threads read " +
|
|
||||||
totalReadKB + " KB (across " +
|
|
||||||
nFiles + " file(s)) in " +
|
|
||||||
timeTakenSec + "s; average " +
|
|
||||||
totalReadKB / timeTakenSec + " KB/s");
|
|
||||||
|
|
||||||
return res;
|
|
||||||
}
|
|
||||||
|
|
||||||
@AfterClass
|
|
||||||
public static void teardownCluster() throws Exception {
|
|
||||||
util.shutdown();
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
|
@ -0,0 +1,385 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hdfs;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.nio.ByteBuffer;
|
||||||
|
import java.util.Random;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
||||||
|
import org.apache.log4j.Level;
|
||||||
|
import org.apache.log4j.LogManager;
|
||||||
|
|
||||||
|
import static org.junit.Assert.*;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Driver class for testing the use of DFSInputStream by multiple concurrent
|
||||||
|
* readers, using the different read APIs. See subclasses for the actual test
|
||||||
|
* cases.
|
||||||
|
*/
|
||||||
|
public class TestParallelReadUtil {
|
||||||
|
|
||||||
|
static final Log LOG = LogFactory.getLog(TestParallelReadUtil.class);
|
||||||
|
static BlockReaderTestUtil util = null;
|
||||||
|
static DFSClient dfsClient = null;
|
||||||
|
static final int FILE_SIZE_K = 256;
|
||||||
|
static Random rand = null;
|
||||||
|
static final int DEFAULT_REPLICATION_FACTOR = 2;
|
||||||
|
|
||||||
|
static {
|
||||||
|
// The client-trace log ends up causing a lot of blocking threads
|
||||||
|
// in this when it's being used as a performance benchmark.
|
||||||
|
LogManager.getLogger(DataNode.class.getName() + ".clienttrace")
|
||||||
|
.setLevel(Level.WARN);
|
||||||
|
}
|
||||||
|
|
||||||
|
private class TestFileInfo {
|
||||||
|
public DFSInputStream dis;
|
||||||
|
public Path filepath;
|
||||||
|
public byte[] authenticData;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void setupCluster(int replicationFactor, HdfsConfiguration conf) throws Exception {
|
||||||
|
util = new BlockReaderTestUtil(replicationFactor, conf);
|
||||||
|
dfsClient = util.getDFSClient();
|
||||||
|
long seed = System.currentTimeMillis();
|
||||||
|
LOG.info("Random seed: " + seed);
|
||||||
|
rand = new Random(seed);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Providers of this interface implement two different read APIs. Instances of
|
||||||
|
* this interface are shared across all ReadWorkerThreads, so should be stateless.
|
||||||
|
*/
|
||||||
|
static interface ReadWorkerHelper {
|
||||||
|
public int read(DFSInputStream dis, byte[] target, int startOff, int len) throws IOException;
|
||||||
|
public int pRead(DFSInputStream dis, byte[] target, int startOff, int len) throws IOException;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Uses read(ByteBuffer...) style APIs
|
||||||
|
*/
|
||||||
|
static class DirectReadWorkerHelper implements ReadWorkerHelper {
|
||||||
|
@Override
|
||||||
|
public int read(DFSInputStream dis, byte[] target, int startOff, int len) throws IOException {
|
||||||
|
ByteBuffer bb = ByteBuffer.wrap(target);
|
||||||
|
int cnt = 0;
|
||||||
|
synchronized(dis) {
|
||||||
|
dis.seek(startOff);
|
||||||
|
while (cnt < len) {
|
||||||
|
int read = dis.read(bb);
|
||||||
|
if (read == -1) {
|
||||||
|
return read;
|
||||||
|
}
|
||||||
|
cnt += read;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return cnt;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int pRead(DFSInputStream dis, byte[] target, int startOff, int len) throws IOException {
|
||||||
|
// No pRead for bb read path
|
||||||
|
return read(dis, target, startOff, len);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Uses the read(byte[]...) style APIs
|
||||||
|
*/
|
||||||
|
static class CopyingReadWorkerHelper implements ReadWorkerHelper {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int read(DFSInputStream dis, byte[] target, int startOff, int len)
|
||||||
|
throws IOException {
|
||||||
|
int cnt = 0;
|
||||||
|
synchronized(dis) {
|
||||||
|
dis.seek(startOff);
|
||||||
|
while (cnt < len) {
|
||||||
|
int read = dis.read(target, cnt, len - cnt);
|
||||||
|
if (read == -1) {
|
||||||
|
return read;
|
||||||
|
}
|
||||||
|
cnt += read;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return cnt;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int pRead(DFSInputStream dis, byte[] target, int startOff, int len)
|
||||||
|
throws IOException {
|
||||||
|
int cnt = 0;
|
||||||
|
while (cnt < len) {
|
||||||
|
int read = dis.read(startOff, target, cnt, len - cnt);
|
||||||
|
if (read == -1) {
|
||||||
|
return read;
|
||||||
|
}
|
||||||
|
cnt += read;
|
||||||
|
}
|
||||||
|
return cnt;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Uses a mix of both copying
|
||||||
|
*/
|
||||||
|
static class MixedWorkloadHelper implements ReadWorkerHelper {
|
||||||
|
private final DirectReadWorkerHelper bb = new DirectReadWorkerHelper();
|
||||||
|
private final CopyingReadWorkerHelper copy = new CopyingReadWorkerHelper();
|
||||||
|
private final double COPYING_PROBABILITY = 0.5;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int read(DFSInputStream dis, byte[] target, int startOff, int len)
|
||||||
|
throws IOException {
|
||||||
|
double p = rand.nextDouble();
|
||||||
|
if (p > COPYING_PROBABILITY) {
|
||||||
|
return bb.read(dis, target, startOff, len);
|
||||||
|
} else {
|
||||||
|
return copy.read(dis, target, startOff, len);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int pRead(DFSInputStream dis, byte[] target, int startOff, int len)
|
||||||
|
throws IOException {
|
||||||
|
double p = rand.nextDouble();
|
||||||
|
if (p > COPYING_PROBABILITY) {
|
||||||
|
return bb.pRead(dis, target, startOff, len);
|
||||||
|
} else {
|
||||||
|
return copy.pRead(dis, target, startOff, len);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A worker to do one "unit" of read.
|
||||||
|
*/
|
||||||
|
static class ReadWorker extends Thread {
|
||||||
|
|
||||||
|
static public final int N_ITERATIONS = 1024 * 4;
|
||||||
|
|
||||||
|
private static final double PROPORTION_NON_POSITIONAL_READ = 0.10;
|
||||||
|
|
||||||
|
private final TestFileInfo testInfo;
|
||||||
|
private final long fileSize;
|
||||||
|
private long bytesRead;
|
||||||
|
private boolean error;
|
||||||
|
private final ReadWorkerHelper helper;
|
||||||
|
|
||||||
|
ReadWorker(TestFileInfo testInfo, int id, ReadWorkerHelper helper) {
|
||||||
|
super("ReadWorker-" + id + "-" + testInfo.filepath.toString());
|
||||||
|
this.testInfo = testInfo;
|
||||||
|
this.helper = helper;
|
||||||
|
fileSize = testInfo.dis.getFileLength();
|
||||||
|
assertEquals(fileSize, testInfo.authenticData.length);
|
||||||
|
bytesRead = 0;
|
||||||
|
error = false;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Randomly do one of (1) Small read; and (2) Large Pread.
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
for (int i = 0; i < N_ITERATIONS; ++i) {
|
||||||
|
int startOff = rand.nextInt((int) fileSize);
|
||||||
|
int len = 0;
|
||||||
|
try {
|
||||||
|
double p = rand.nextDouble();
|
||||||
|
if (p < PROPORTION_NON_POSITIONAL_READ) {
|
||||||
|
// Do a small regular read. Very likely this will leave unread
|
||||||
|
// data on the socket and make the socket uncacheable.
|
||||||
|
len = Math.min(rand.nextInt(64), (int) fileSize - startOff);
|
||||||
|
read(startOff, len);
|
||||||
|
bytesRead += len;
|
||||||
|
} else {
|
||||||
|
// Do a positional read most of the time.
|
||||||
|
len = rand.nextInt((int) (fileSize - startOff));
|
||||||
|
pRead(startOff, len);
|
||||||
|
bytesRead += len;
|
||||||
|
}
|
||||||
|
} catch (Throwable t) {
|
||||||
|
LOG.error(getName() + ": Error while testing read at " + startOff +
|
||||||
|
" length " + len, t);
|
||||||
|
error = true;
|
||||||
|
fail(t.getMessage());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getBytesRead() {
|
||||||
|
return bytesRead;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Raising error in a thread doesn't seem to fail the test.
|
||||||
|
* So check afterwards.
|
||||||
|
*/
|
||||||
|
public boolean hasError() {
|
||||||
|
return error;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int readCount = 0;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Seek to somewhere random and read.
|
||||||
|
*/
|
||||||
|
private void read(int start, int len) throws Exception {
|
||||||
|
assertTrue(
|
||||||
|
"Bad args: " + start + " + " + len + " should be <= " + fileSize,
|
||||||
|
start + len <= fileSize);
|
||||||
|
readCount++;
|
||||||
|
DFSInputStream dis = testInfo.dis;
|
||||||
|
|
||||||
|
byte buf[] = new byte[len];
|
||||||
|
helper.read(dis, buf, start, len);
|
||||||
|
|
||||||
|
verifyData("Read data corrupted", buf, start, start + len);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Positional read.
|
||||||
|
*/
|
||||||
|
private void pRead(int start, int len) throws Exception {
|
||||||
|
assertTrue(
|
||||||
|
"Bad args: " + start + " + " + len + " should be <= " + fileSize,
|
||||||
|
start + len <= fileSize);
|
||||||
|
DFSInputStream dis = testInfo.dis;
|
||||||
|
|
||||||
|
byte buf[] = new byte[len];
|
||||||
|
helper.pRead(dis, buf, start, len);
|
||||||
|
|
||||||
|
verifyData("Pread data corrupted", buf, start, start + len);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Verify read data vs authentic data
|
||||||
|
*/
|
||||||
|
private void verifyData(String msg, byte actual[], int start, int end)
|
||||||
|
throws Exception {
|
||||||
|
byte auth[] = testInfo.authenticData;
|
||||||
|
if (end > auth.length) {
|
||||||
|
throw new Exception(msg + ": Actual array (" + end +
|
||||||
|
") is past the end of authentic data (" +
|
||||||
|
auth.length + ")");
|
||||||
|
}
|
||||||
|
|
||||||
|
int j = start;
|
||||||
|
for (int i = 0; i < actual.length; ++i, ++j) {
|
||||||
|
if (auth[j] != actual[i]) {
|
||||||
|
throw new Exception(msg + ": Arrays byte " + i + " (at offset " +
|
||||||
|
j + ") differs: expect " +
|
||||||
|
auth[j] + " got " + actual[i]);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Start the parallel read with the given parameters.
|
||||||
|
*/
|
||||||
|
boolean runParallelRead(int nFiles, int nWorkerEach, ReadWorkerHelper helper) throws IOException {
|
||||||
|
ReadWorker workers[] = new ReadWorker[nFiles * nWorkerEach];
|
||||||
|
TestFileInfo testInfoArr[] = new TestFileInfo[nFiles];
|
||||||
|
|
||||||
|
// Prepare the files and workers
|
||||||
|
int nWorkers = 0;
|
||||||
|
for (int i = 0; i < nFiles; ++i) {
|
||||||
|
TestFileInfo testInfo = new TestFileInfo();
|
||||||
|
testInfoArr[i] = testInfo;
|
||||||
|
|
||||||
|
testInfo.filepath = new Path("/TestParallelRead.dat." + i);
|
||||||
|
testInfo.authenticData = util.writeFile(testInfo.filepath, FILE_SIZE_K);
|
||||||
|
testInfo.dis = dfsClient.open(testInfo.filepath.toString());
|
||||||
|
|
||||||
|
for (int j = 0; j < nWorkerEach; ++j) {
|
||||||
|
workers[nWorkers++] = new ReadWorker(testInfo, nWorkers, helper);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Start the workers and wait
|
||||||
|
long starttime = System.currentTimeMillis();
|
||||||
|
for (ReadWorker worker : workers) {
|
||||||
|
worker.start();
|
||||||
|
}
|
||||||
|
|
||||||
|
for (ReadWorker worker : workers) {
|
||||||
|
try {
|
||||||
|
worker.join();
|
||||||
|
} catch (InterruptedException ignored) { }
|
||||||
|
}
|
||||||
|
long endtime = System.currentTimeMillis();
|
||||||
|
|
||||||
|
// Cleanup
|
||||||
|
for (TestFileInfo testInfo : testInfoArr) {
|
||||||
|
testInfo.dis.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
// Report
|
||||||
|
boolean res = true;
|
||||||
|
long totalRead = 0;
|
||||||
|
for (ReadWorker worker : workers) {
|
||||||
|
long nread = worker.getBytesRead();
|
||||||
|
LOG.info("--- Report: " + worker.getName() + " read " + nread + " B; " +
|
||||||
|
"average " + nread / ReadWorker.N_ITERATIONS + " B per read");
|
||||||
|
totalRead += nread;
|
||||||
|
if (worker.hasError()) {
|
||||||
|
res = false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
double timeTakenSec = (endtime - starttime) / 1000.0;
|
||||||
|
long totalReadKB = totalRead / 1024;
|
||||||
|
LOG.info("=== Report: " + nWorkers + " threads read " +
|
||||||
|
totalReadKB + " KB (across " +
|
||||||
|
nFiles + " file(s)) in " +
|
||||||
|
timeTakenSec + "s; average " +
|
||||||
|
totalReadKB / timeTakenSec + " KB/s");
|
||||||
|
|
||||||
|
return res;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Runs a standard workload using a helper class which provides the read
|
||||||
|
* implementation to use.
|
||||||
|
*/
|
||||||
|
public void runTestWorkload(ReadWorkerHelper helper) throws IOException {
|
||||||
|
if (!runParallelRead(1, 4, helper)) {
|
||||||
|
fail("Check log for errors");
|
||||||
|
}
|
||||||
|
if (!runParallelRead(1, 16, helper)) {
|
||||||
|
fail("Check log for errors");
|
||||||
|
}
|
||||||
|
if (!runParallelRead(2, 4, helper)) {
|
||||||
|
fail("Check log for errors");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void teardownCluster() throws Exception {
|
||||||
|
util.shutdown();
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -21,6 +21,7 @@ import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
import java.io.EOFException;
|
import java.io.EOFException;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.nio.ByteBuffer;
|
||||||
import java.security.PrivilegedExceptionAction;
|
import java.security.PrivilegedExceptionAction;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
@ -28,6 +29,7 @@ import org.apache.hadoop.fs.FSDataInputStream;
|
||||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.hdfs.DFSClient.DFSDataInputStream;
|
||||||
import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
|
import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
|
||||||
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
|
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
|
||||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||||
|
@ -63,7 +65,7 @@ public class TestShortCircuitLocalRead {
|
||||||
throws IOException {
|
throws IOException {
|
||||||
FSDataOutputStream stm = fileSys.create(name, true,
|
FSDataOutputStream stm = fileSys.create(name, true,
|
||||||
fileSys.getConf().getInt("io.file.buffer.size", 4096),
|
fileSys.getConf().getInt("io.file.buffer.size", 4096),
|
||||||
(short)repl, (long)blockSize);
|
(short)repl, blockSize);
|
||||||
return stm;
|
return stm;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -112,6 +114,43 @@ public class TestShortCircuitLocalRead {
|
||||||
stm.close();
|
stm.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Verifies that reading a file with the direct read(ByteBuffer) api gives the expected set of bytes.
|
||||||
|
*/
|
||||||
|
static void checkFileContentDirect(FileSystem fs, Path name, byte[] expected,
|
||||||
|
int readOffset) throws IOException {
|
||||||
|
DFSDataInputStream stm = (DFSDataInputStream)fs.open(name);
|
||||||
|
|
||||||
|
ByteBuffer actual = ByteBuffer.allocate(expected.length - readOffset);
|
||||||
|
|
||||||
|
long skipped = stm.skip(readOffset);
|
||||||
|
Assert.assertEquals(skipped, readOffset);
|
||||||
|
|
||||||
|
actual.limit(3);
|
||||||
|
|
||||||
|
//Read a small number of bytes first.
|
||||||
|
int nread = stm.read(actual);
|
||||||
|
actual.limit(nread + 2);
|
||||||
|
nread += stm.read(actual);
|
||||||
|
|
||||||
|
// Read across chunk boundary
|
||||||
|
actual.limit(Math.min(actual.capacity(), nread + 517));
|
||||||
|
nread += stm.read(actual);
|
||||||
|
checkData(actual.array(), readOffset, expected, nread, "A few bytes");
|
||||||
|
//Now read rest of it
|
||||||
|
actual.limit(actual.capacity());
|
||||||
|
while (actual.hasRemaining()) {
|
||||||
|
int nbytes = stm.read(actual);
|
||||||
|
|
||||||
|
if (nbytes < 0) {
|
||||||
|
throw new EOFException("End of file reached before reading fully.");
|
||||||
|
}
|
||||||
|
nread += nbytes;
|
||||||
|
}
|
||||||
|
checkData(actual.array(), readOffset, expected, "Read 3");
|
||||||
|
stm.close();
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Test that file data can be read by reading the block file
|
* Test that file data can be read by reading the block file
|
||||||
* directly from the local store.
|
* directly from the local store.
|
||||||
|
@ -145,6 +184,7 @@ public class TestShortCircuitLocalRead {
|
||||||
stm.write(fileData);
|
stm.write(fileData);
|
||||||
stm.close();
|
stm.close();
|
||||||
checkFileContent(fs, file1, fileData, readOffset);
|
checkFileContent(fs, file1, fileData, readOffset);
|
||||||
|
checkFileContentDirect(fs, file1, fileData, readOffset);
|
||||||
} finally {
|
} finally {
|
||||||
fs.close();
|
fs.close();
|
||||||
cluster.shutdown();
|
cluster.shutdown();
|
||||||
|
@ -328,6 +368,7 @@ public class TestShortCircuitLocalRead {
|
||||||
Thread[] threads = new Thread[threadCount];
|
Thread[] threads = new Thread[threadCount];
|
||||||
for (int i = 0; i < threadCount; i++) {
|
for (int i = 0; i < threadCount; i++) {
|
||||||
threads[i] = new Thread() {
|
threads[i] = new Thread() {
|
||||||
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
for (int i = 0; i < iteration; i++) {
|
for (int i = 0; i < iteration; i++) {
|
||||||
try {
|
try {
|
||||||
|
|
Loading…
Reference in New Issue