Merged 1245118 from trunk for HDFS-2655.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1245122 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
91ebbeefb4
commit
ecacbc94a3
|
@ -19,6 +19,9 @@ Release 0.23.2 - UNRELEASED
|
|||
HDFS-2931. Switch DataNode's BlockVolumeChoosingPolicy to private-audience.
|
||||
(harsh via szetszwo)
|
||||
|
||||
HDFS-2655. BlockReaderLocal#skip performs unnecessary IO. (Brandon Li
|
||||
via jitendra)
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
BUG FIXES
|
||||
|
|
|
@ -369,26 +369,68 @@ class BlockReaderLocal implements BlockReader {
|
|||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("skip " + n);
|
||||
}
|
||||
if (n <= 0) {
|
||||
return 0;
|
||||
}
|
||||
if (!verifyChecksum) {
|
||||
return dataIn.skip(n);
|
||||
}
|
||||
// Skip by reading the data so we stay in sync with checksums.
|
||||
// This could be implemented more efficiently in the future to
|
||||
// skip to the beginning of the appropriate checksum chunk
|
||||
// and then only read to the middle of that chunk.
|
||||
|
||||
// caller made sure newPosition is not beyond EOF.
|
||||
int remaining = dataBuff.remaining();
|
||||
int position = dataBuff.position();
|
||||
int newPosition = position + (int)n;
|
||||
|
||||
// if the new offset is already read into dataBuff, just reposition
|
||||
if (n <= remaining) {
|
||||
assert offsetFromChunkBoundary == 0;
|
||||
dataBuff.position(newPosition);
|
||||
return n;
|
||||
}
|
||||
|
||||
// for small gap, read through to keep the data/checksum in sync
|
||||
if (n - remaining <= bytesPerChecksum) {
|
||||
dataBuff.position(position + remaining);
|
||||
if (skipBuf == null) {
|
||||
skipBuf = new byte[bytesPerChecksum];
|
||||
}
|
||||
int ret = read(skipBuf, 0, (int)(n - remaining));
|
||||
return ret;
|
||||
}
|
||||
|
||||
// optimize for big gap: discard the current buffer, skip to
|
||||
// the beginning of the appropriate checksum chunk and then
|
||||
// read to the middle of that chunk to be in sync with checksums.
|
||||
this.offsetFromChunkBoundary = newPosition % bytesPerChecksum;
|
||||
long toskip = n - remaining - this.offsetFromChunkBoundary;
|
||||
|
||||
dataBuff.clear();
|
||||
checksumBuff.clear();
|
||||
|
||||
long dataSkipped = dataIn.skip(toskip);
|
||||
if (dataSkipped != toskip) {
|
||||
throw new IOException("skip error in data input stream");
|
||||
}
|
||||
long checkSumOffset = (dataSkipped / bytesPerChecksum) * checksumSize;
|
||||
if (checkSumOffset > 0) {
|
||||
long skipped = checksumIn.skip(checkSumOffset);
|
||||
if (skipped != checkSumOffset) {
|
||||
throw new IOException("skip error in checksum input stream");
|
||||
}
|
||||
}
|
||||
|
||||
// read into the middle of the chunk
|
||||
if (skipBuf == null) {
|
||||
skipBuf = new byte[bytesPerChecksum];
|
||||
}
|
||||
long nSkipped = 0;
|
||||
while ( nSkipped < n ) {
|
||||
int toSkip = (int)Math.min(n-nSkipped, skipBuf.length);
|
||||
int ret = read(skipBuf, 0, toSkip);
|
||||
if ( ret <= 0 ) {
|
||||
return nSkipped;
|
||||
}
|
||||
nSkipped += ret;
|
||||
assert skipBuf.length == bytesPerChecksum;
|
||||
assert this.offsetFromChunkBoundary < bytesPerChecksum;
|
||||
int ret = read(skipBuf, 0, this.offsetFromChunkBoundary);
|
||||
if (ret == -1) { // EOS
|
||||
return toskip;
|
||||
} else {
|
||||
return (toskip + ret);
|
||||
}
|
||||
return nSkipped;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -238,7 +238,53 @@ public class TestShortCircuitLocalRead {
|
|||
cluster.shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSkipWithVerifyChecksum() throws IOException {
|
||||
int size = blockSize;
|
||||
Configuration conf = new Configuration();
|
||||
conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true);
|
||||
conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY, false);
|
||||
conf.set(DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY,
|
||||
UserGroupInformation.getCurrentUser().getShortUserName());
|
||||
if (simulatedStorage) {
|
||||
conf.setBoolean(SimulatedFSDataset.CONFIG_PROPERTY_SIMULATED, true);
|
||||
}
|
||||
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1)
|
||||
.format(true).build();
|
||||
FileSystem fs = cluster.getFileSystem();
|
||||
try {
|
||||
// check that / exists
|
||||
Path path = new Path("/");
|
||||
assertTrue("/ should be a directory", fs.getFileStatus(path)
|
||||
.isDirectory() == true);
|
||||
|
||||
byte[] fileData = AppendTestUtil.randomBytes(seed, size*3);
|
||||
// create a new file in home directory. Do not close it.
|
||||
Path file1 = new Path("filelocal.dat");
|
||||
FSDataOutputStream stm = createFile(fs, file1, 1);
|
||||
|
||||
// write to file
|
||||
stm.write(fileData);
|
||||
stm.close();
|
||||
|
||||
// now test the skip function
|
||||
FSDataInputStream instm = fs.open(file1);
|
||||
byte[] actual = new byte[fileData.length];
|
||||
// read something from the block first, otherwise BlockReaderLocal.skip()
|
||||
// will not be invoked
|
||||
int nread = instm.read(actual, 0, 3);
|
||||
long skipped = 2*size+3;
|
||||
instm.seek(skipped);
|
||||
nread = instm.read(actual, (int)(skipped + nread), 3);
|
||||
instm.close();
|
||||
|
||||
} finally {
|
||||
fs.close();
|
||||
cluster.shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Test to run benchmarks between shortcircuit read vs regular read with
|
||||
* specified number of threads simultaneously reading.
|
||||
|
|
Loading…
Reference in New Issue