HDFS-6231. Merging change r1586551 from trunk to branch-2.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1586553 13f79535-47bb-0310-9956-ffa450edef68
Chris Nauroth 2014-04-11 03:58:40 +00:00
parent befba101f3
commit cb9c1d3c7b
3 changed files with 19 additions and 10 deletions

hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -122,6 +122,9 @@ Release 2.4.1 - UNRELEASED
 
     HDFS-6208. DataNode caching can leak file descriptors. (cnauroth)
 
+    HDFS-6231. DFSClient hangs infinitely if using hedged reads and all eligible
+    datanodes die. (cnauroth)
+
 Release 2.4.0 - 2014-04-07
 
   INCOMPATIBLE CHANGES

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java

@@ -983,12 +983,15 @@ private Callable<ByteBuffer> getFromOneDataNode(final DNAddrPair datanode,
     return new Callable<ByteBuffer>() {
       @Override
       public ByteBuffer call() throws Exception {
-        byte[] buf = bb.array();
-        int offset = bb.position();
-        actualGetFromOneDataNode(datanode, block, start, end, buf, offset,
-            corruptedBlockMap);
-        latch.countDown();
-        return bb;
+        try {
+          byte[] buf = bb.array();
+          int offset = bb.position();
+          actualGetFromOneDataNode(datanode, block, start, end, buf, offset,
+              corruptedBlockMap);
+          return bb;
+        } finally {
+          latch.countDown();
+        }
       }
     };
   }
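Why the try/finally matters: actualGetFromOneDataNode() can throw, for example when every eligible datanode has died, and in the old code that skipped latch.countDown(), so the caller waiting on the latch never woke up. That is the infinite hang in the issue title. A minimal, self-contained sketch of the pattern (hypothetical names, not HDFS code):

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class LatchLeakDemo {
  public static void main(String[] args) throws InterruptedException {
    ExecutorService pool = Executors.newSingleThreadExecutor();
    final CountDownLatch latch = new CountDownLatch(1);

    pool.submit(new Runnable() {
      public void run() {
        try {
          // Stands in for actualGetFromOneDataNode() failing, e.g. when
          // every eligible datanode has died.
          throw new RuntimeException("read failed");
        } finally {
          // Without the finally, the exception skips countDown() and the
          // waiter below blocks forever; with it, failure still wakes the
          // waiter, which is what this hunk guarantees.
          latch.countDown();
        }
      }
    });

    // The caller waits for the worker to finish one way or the other.
    if (latch.await(5, TimeUnit.SECONDS)) {
      System.out.println("worker finished (success or failure)");
    } else {
      System.out.println("timed out: countDown() was never reached");
    }
    pool.shutdown();
  }
}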
@@ -1101,7 +1104,7 @@ private void hedgedFetchBlockByteRange(LocatedBlock block, long start,
       long end, byte[] buf, int offset,
       Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
       throws IOException {
-    ArrayList<Future<ByteBuffer>> futures = null;
+    ArrayList<Future<ByteBuffer>> futures = new ArrayList<Future<ByteBuffer>>();
     ArrayList<DatanodeInfo> ignored = new ArrayList<DatanodeInfo>();
     ByteBuffer bb = null;
     int len = (int) (end - start + 1);
@@ -1112,7 +1115,7 @@ private void hedgedFetchBlockByteRange(LocatedBlock block, long start,
       DNAddrPair chosenNode = null;
       Future<ByteBuffer> future = null;
       // futures is null if there is no request already executing.
-      if (futures == null) {
+      if (futures.isEmpty()) {
         // chooseDataNode is a commitment. If no node, we go to
         // the NN to reget block locations. Only go here on first read.
         chosenNode = chooseDataNode(block, ignored);
@@ -1130,7 +1133,6 @@ private void hedgedFetchBlockByteRange(LocatedBlock block, long start,
           // Ignore this node on next go around.
           ignored.add(chosenNode.info);
           dfsClient.getHedgedReadMetrics().incHedgedReadOps();
-          futures = new ArrayList<Future<ByteBuffer>>();
           futures.add(future);
           continue; // no need to refresh block locations
         } catch (InterruptedException e) {
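The remaining hunks replace the null sentinel for futures with a list that always exists: the first-iteration check becomes futures.isEmpty(), and the per-hedge reallocation removed in the last hunk is no longer needed. A minimal sketch of one hedged request round using this pattern (hypothetical names and timings, plain java.util.concurrent, not the DFSInputStream internals):

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class HedgedRequestDemo {
  // Hypothetical stand-in for one datanode read: sleeps, then returns.
  static Callable<String> read(final String node, final long millis) {
    return new Callable<String>() {
      public String call() throws Exception {
        Thread.sleep(millis);
        return "data from " + node;
      }
    };
  }

  public static void main(String[] args) throws Exception {
    ExecutorService pool = Executors.newFixedThreadPool(2);
    CompletionService<String> cs = new ExecutorCompletionService<String>(pool);
    // As in the patch: start with an empty list rather than a null
    // sentinel, so "no request in flight yet" is futures.isEmpty().
    List<Future<String>> futures = new ArrayList<Future<String>>();

    if (futures.isEmpty()) {
      // First pass: issue the initial read.
      futures.add(cs.submit(read("dn1 (slow)", 500)));
    }
    // Wait only up to the hedge threshold for the first read.
    Future<String> done = cs.poll(50, TimeUnit.MILLISECONDS);
    if (done == null) {
      // Threshold expired: hedge with a second datanode; the first
      // future to complete wins.
      futures.add(cs.submit(read("dn2 (fast)", 10)));
      done = cs.take();
    }
    System.out.println(done.get());

    // Cancel any straggler so the pool can shut down promptly.
    for (Future<String> f : futures) {
      f.cancel(true);
    }
    pool.shutdown();
  }
}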

hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPread.java

@@ -243,7 +243,7 @@ public void testPreadDFSNoChecksum() throws IOException {
   public void testHedgedPreadDFSBasic() throws IOException {
     Configuration conf = new Configuration();
     conf.setInt(DFSConfigKeys.DFS_DFSCLIENT_HEDGED_READ_THREADPOOL_SIZE, 5);
-    conf.setLong(DFSConfigKeys.DFS_DFSCLIENT_HEDGED_READ_THRESHOLD_MILLIS, 100);
+    conf.setLong(DFSConfigKeys.DFS_DFSCLIENT_HEDGED_READ_THRESHOLD_MILLIS, 1);
     dfsPreadTest(conf, false, true); // normal pread
     dfsPreadTest(conf, true, true); // trigger read code path without
                                     // transferTo.
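Note the threshold drop from 100 ms to 1 ms: with a 1 ms hedge threshold the first read virtually always overruns it, so the test reliably exercises the hedged path. For reference, a sketch of enabling hedged reads outside of tests, reusing the DFSConfigKeys constants from the hunk above (the 50 ms value is an assumed illustration, not a recommendation):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;

public class HedgedReadConf {
  public static Configuration create() {
    Configuration conf = new Configuration();
    // Size of the client-side thread pool that runs hedged reads.
    conf.setInt(DFSConfigKeys.DFS_DFSCLIENT_HEDGED_READ_THREADPOOL_SIZE, 5);
    // Hedge only when the first read is slower than this (assumed value).
    conf.setLong(DFSConfigKeys.DFS_DFSCLIENT_HEDGED_READ_THRESHOLD_MILLIS, 50);
    return conf;
  }
}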
@@ -279,6 +279,10 @@ public Void answer(InvocationOnMock invocation) throws Throwable {
     DistributedFileSystem fileSys = cluster.getFileSystem();
     DFSClient dfsClient = fileSys.getClient();
     DFSHedgedReadMetrics metrics = dfsClient.getHedgedReadMetrics();
+    // Metrics instance is static, so we need to reset counts from prior tests.
+    metrics.hedgedReadOps.set(0);
+    metrics.hedgedReadOpsWin.set(0);
+    metrics.hedgedReadOpsInCurThread.set(0);
     try {
       Path file1 = new Path("hedgedReadMaxOut.dat");
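The added reset is needed because the DFSHedgedReadMetrics counters are shared across test methods in the same JVM, so earlier tests' counts would leak into this one's assertions. A generic JUnit 4 sketch of the same reset-before-each-test pattern (hypothetical class, not the HDFS test):

import static org.junit.Assert.assertEquals;
import java.util.concurrent.atomic.AtomicLong;
import org.junit.Before;
import org.junit.Test;

public class SharedCounterTest {
  // Stands in for a static/shared metrics object like DFSHedgedReadMetrics.
  static final AtomicLong hedgedReadOps = new AtomicLong();

  @Before
  public void resetSharedMetrics() {
    // Shared counters survive across test methods in the same JVM, so
    // zero them up front; otherwise assertions see prior tests' counts.
    hedgedReadOps.set(0);
  }

  @Test
  public void countsOnlyThisTestsOps() {
    hedgedReadOps.incrementAndGet();
    assertEquals(1, hedgedReadOps.get());
  }
}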