HDFS-8076. Code cleanup for DFSInputStream: use offset instead of LocatedBlock when possible. Contributed by Zhe Zhang.
(cherry picked from commit a42bb1cd91
)
This commit is contained in:
parent
55b794e7fa
commit
9792500c54
|
@ -81,6 +81,9 @@ Release 2.8.0 - UNRELEASED
|
||||||
|
|
||||||
HDFS-8046. Allow better control of getContentSummary (kihwal)
|
HDFS-8046. Allow better control of getContentSummary (kihwal)
|
||||||
|
|
||||||
|
HDFS-8076. Code cleanup for DFSInputStream: use offset instead of
|
||||||
|
LocatedBlock when possible. (Zhe Zhang via wang)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than
|
HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than
|
||||||
|
|
|
@ -1045,16 +1045,16 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
|
||||||
return errMsgr.toString();
|
return errMsgr.toString();
|
||||||
}
|
}
|
||||||
|
|
||||||
private void fetchBlockByteRange(LocatedBlock block, long start, long end,
|
private void fetchBlockByteRange(long blockStartOffset, long start, long end,
|
||||||
byte[] buf, int offset,
|
byte[] buf, int offset,
|
||||||
Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
|
Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
block = getBlockAt(block.getStartOffset());
|
LocatedBlock block = getBlockAt(blockStartOffset);
|
||||||
while (true) {
|
while (true) {
|
||||||
DNAddrPair addressPair = chooseDataNode(block, null);
|
DNAddrPair addressPair = chooseDataNode(block, null);
|
||||||
try {
|
try {
|
||||||
actualGetFromOneDataNode(addressPair, block, start, end, buf, offset,
|
actualGetFromOneDataNode(addressPair, blockStartOffset, start, end,
|
||||||
corruptedBlockMap);
|
buf, offset, corruptedBlockMap);
|
||||||
return;
|
return;
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
// Ignore. Already processed inside the function.
|
// Ignore. Already processed inside the function.
|
||||||
|
@ -1064,7 +1064,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
|
||||||
}
|
}
|
||||||
|
|
||||||
private Callable<ByteBuffer> getFromOneDataNode(final DNAddrPair datanode,
|
private Callable<ByteBuffer> getFromOneDataNode(final DNAddrPair datanode,
|
||||||
final LocatedBlock block, final long start, final long end,
|
final long blockStartOffset, final long start, final long end,
|
||||||
final ByteBuffer bb,
|
final ByteBuffer bb,
|
||||||
final Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap,
|
final Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap,
|
||||||
final int hedgedReadId) {
|
final int hedgedReadId) {
|
||||||
|
@ -1077,8 +1077,8 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
|
||||||
TraceScope scope =
|
TraceScope scope =
|
||||||
Trace.startSpan("hedgedRead" + hedgedReadId, parentSpan);
|
Trace.startSpan("hedgedRead" + hedgedReadId, parentSpan);
|
||||||
try {
|
try {
|
||||||
actualGetFromOneDataNode(datanode, block, start, end, buf, offset,
|
actualGetFromOneDataNode(datanode, blockStartOffset, start, end, buf,
|
||||||
corruptedBlockMap);
|
offset, corruptedBlockMap);
|
||||||
return bb;
|
return bb;
|
||||||
} finally {
|
} finally {
|
||||||
scope.close();
|
scope.close();
|
||||||
|
@ -1088,7 +1088,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
|
||||||
}
|
}
|
||||||
|
|
||||||
private void actualGetFromOneDataNode(final DNAddrPair datanode,
|
private void actualGetFromOneDataNode(final DNAddrPair datanode,
|
||||||
LocatedBlock block, final long start, final long end, byte[] buf,
|
long blockStartOffset, final long start, final long end, byte[] buf,
|
||||||
int offset, Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
|
int offset, Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
DFSClientFaultInjector.get().startFetchFromDatanode();
|
DFSClientFaultInjector.get().startFetchFromDatanode();
|
||||||
|
@ -1101,7 +1101,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
|
||||||
// start of the loop.
|
// start of the loop.
|
||||||
CachingStrategy curCachingStrategy;
|
CachingStrategy curCachingStrategy;
|
||||||
boolean allowShortCircuitLocalReads;
|
boolean allowShortCircuitLocalReads;
|
||||||
block = getBlockAt(block.getStartOffset());
|
LocatedBlock block = getBlockAt(blockStartOffset);
|
||||||
synchronized(infoLock) {
|
synchronized(infoLock) {
|
||||||
curCachingStrategy = cachingStrategy;
|
curCachingStrategy = cachingStrategy;
|
||||||
allowShortCircuitLocalReads = !shortCircuitForbidden();
|
allowShortCircuitLocalReads = !shortCircuitForbidden();
|
||||||
|
@ -1189,7 +1189,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
|
||||||
* if the first read is taking longer than configured amount of
|
* if the first read is taking longer than configured amount of
|
||||||
* time. We then wait on which ever read returns first.
|
* time. We then wait on which ever read returns first.
|
||||||
*/
|
*/
|
||||||
private void hedgedFetchBlockByteRange(LocatedBlock block, long start,
|
private void hedgedFetchBlockByteRange(long blockStartOffset, long start,
|
||||||
long end, byte[] buf, int offset,
|
long end, byte[] buf, int offset,
|
||||||
Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
|
Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
|
@ -1201,7 +1201,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
|
||||||
ByteBuffer bb = null;
|
ByteBuffer bb = null;
|
||||||
int len = (int) (end - start + 1);
|
int len = (int) (end - start + 1);
|
||||||
int hedgedReadId = 0;
|
int hedgedReadId = 0;
|
||||||
block = getBlockAt(block.getStartOffset());
|
LocatedBlock block = getBlockAt(blockStartOffset);
|
||||||
while (true) {
|
while (true) {
|
||||||
// see HDFS-6591, this metric is used to verify/catch unnecessary loops
|
// see HDFS-6591, this metric is used to verify/catch unnecessary loops
|
||||||
hedgedReadOpsLoopNumForTesting++;
|
hedgedReadOpsLoopNumForTesting++;
|
||||||
|
@ -1213,8 +1213,8 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
|
||||||
chosenNode = chooseDataNode(block, ignored);
|
chosenNode = chooseDataNode(block, ignored);
|
||||||
bb = ByteBuffer.wrap(buf, offset, len);
|
bb = ByteBuffer.wrap(buf, offset, len);
|
||||||
Callable<ByteBuffer> getFromDataNodeCallable = getFromOneDataNode(
|
Callable<ByteBuffer> getFromDataNodeCallable = getFromOneDataNode(
|
||||||
chosenNode, block, start, end, bb, corruptedBlockMap,
|
chosenNode, block.getStartOffset(), start, end, bb,
|
||||||
hedgedReadId++);
|
corruptedBlockMap, hedgedReadId++);
|
||||||
Future<ByteBuffer> firstRequest = hedgedService
|
Future<ByteBuffer> firstRequest = hedgedService
|
||||||
.submit(getFromDataNodeCallable);
|
.submit(getFromDataNodeCallable);
|
||||||
futures.add(firstRequest);
|
futures.add(firstRequest);
|
||||||
|
@ -1251,8 +1251,8 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
|
||||||
}
|
}
|
||||||
bb = ByteBuffer.allocate(len);
|
bb = ByteBuffer.allocate(len);
|
||||||
Callable<ByteBuffer> getFromDataNodeCallable = getFromOneDataNode(
|
Callable<ByteBuffer> getFromDataNodeCallable = getFromOneDataNode(
|
||||||
chosenNode, block, start, end, bb, corruptedBlockMap,
|
chosenNode, block.getStartOffset(), start, end, bb,
|
||||||
hedgedReadId++);
|
corruptedBlockMap, hedgedReadId++);
|
||||||
Future<ByteBuffer> oneMoreRequest = hedgedService
|
Future<ByteBuffer> oneMoreRequest = hedgedService
|
||||||
.submit(getFromDataNodeCallable);
|
.submit(getFromDataNodeCallable);
|
||||||
futures.add(oneMoreRequest);
|
futures.add(oneMoreRequest);
|
||||||
|
@ -1405,11 +1405,13 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
|
||||||
long bytesToRead = Math.min(remaining, blk.getBlockSize() - targetStart);
|
long bytesToRead = Math.min(remaining, blk.getBlockSize() - targetStart);
|
||||||
try {
|
try {
|
||||||
if (dfsClient.isHedgedReadsEnabled()) {
|
if (dfsClient.isHedgedReadsEnabled()) {
|
||||||
hedgedFetchBlockByteRange(blk, targetStart, targetStart + bytesToRead
|
hedgedFetchBlockByteRange(blk.getStartOffset(), targetStart,
|
||||||
- 1, buffer, offset, corruptedBlockMap);
|
targetStart + bytesToRead - 1, buffer, offset,
|
||||||
|
corruptedBlockMap);
|
||||||
} else {
|
} else {
|
||||||
fetchBlockByteRange(blk, targetStart, targetStart + bytesToRead - 1,
|
fetchBlockByteRange(blk.getStartOffset(), targetStart,
|
||||||
buffer, offset, corruptedBlockMap);
|
targetStart + bytesToRead - 1, buffer, offset,
|
||||||
|
corruptedBlockMap);
|
||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
// Check and report if any block replicas are corrupted.
|
// Check and report if any block replicas are corrupted.
|
||||||
|
|
Loading…
Reference in New Issue