HDFS-10182. Hedged read might overwrite user's buf. Contributed by zhouyingchao.

(cherry picked from commit d8383c687c)
This commit is contained in:
Walter Su 2016-03-28 15:44:25 +08:00
parent 0286ac2576
commit d71843558b
1 changed files with 7 additions and 9 deletions

View File

@ -1315,7 +1315,7 @@ public class DFSInputStream extends FSInputStream
// chooseDataNode is a commitment. If no node, we go to // chooseDataNode is a commitment. If no node, we go to
// the NN to reget block locations. Only go here on first read. // the NN to reget block locations. Only go here on first read.
chosenNode = chooseDataNode(block, ignored); chosenNode = chooseDataNode(block, ignored);
bb = ByteBuffer.wrap(buf, offset, len); bb = ByteBuffer.allocate(len);
Callable<ByteBuffer> getFromDataNodeCallable = getFromOneDataNode( Callable<ByteBuffer> getFromDataNodeCallable = getFromOneDataNode(
chosenNode, block, start, end, bb, chosenNode, block, start, end, bb,
corruptedBlockMap, hedgedReadId++); corruptedBlockMap, hedgedReadId++);
@ -1326,7 +1326,9 @@ public class DFSInputStream extends FSInputStream
Future<ByteBuffer> future = hedgedService.poll( Future<ByteBuffer> future = hedgedService.poll(
conf.getHedgedReadThresholdMillis(), TimeUnit.MILLISECONDS); conf.getHedgedReadThresholdMillis(), TimeUnit.MILLISECONDS);
if (future != null) { if (future != null) {
future.get(); ByteBuffer result = future.get();
System.arraycopy(result.array(), result.position(), buf, offset,
len);
return; return;
} }
DFSClient.LOG.debug("Waited {}ms to read from {}; spawning hedged " DFSClient.LOG.debug("Waited {}ms to read from {}; spawning hedged "
@ -1364,13 +1366,9 @@ public class DFSInputStream extends FSInputStream
ByteBuffer result = getFirstToComplete(hedgedService, futures); ByteBuffer result = getFirstToComplete(hedgedService, futures);
// cancel the rest. // cancel the rest.
cancelAll(futures); cancelAll(futures);
if (result.array() != buf) { // compare the array pointers dfsClient.getHedgedReadMetrics().incHedgedReadWins();
dfsClient.getHedgedReadMetrics().incHedgedReadWins(); System.arraycopy(result.array(), result.position(), buf, offset,
System.arraycopy(result.array(), result.position(), buf, offset, len);
len);
} else {
dfsClient.getHedgedReadMetrics().incHedgedReadOps();
}
return; return;
} catch (InterruptedException ie) { } catch (InterruptedException ie) {
// Ignore and retry // Ignore and retry