HDFS-11738. Hedged pread takes more time when block moved from initial locations. Contributed by Vinayakumar B.

This commit is contained in:
John Zhuge 2017-08-21 13:44:32 -07:00
parent 736ceab2f5
commit b6bfb2fcb2
3 changed files with 112 additions and 61 deletions

View File

@ -61,4 +61,6 @@ public class DFSClientFaultInjector {
public boolean skipRollingRestartWait() { public boolean skipRollingRestartWait() {
return false; return false;
} }
public void sleepBeforeHedgedGet() {}
} }

View File

@ -830,60 +830,85 @@ public class DFSInputStream extends FSInputStream
private DNAddrPair chooseDataNode(LocatedBlock block, private DNAddrPair chooseDataNode(LocatedBlock block,
Collection<DatanodeInfo> ignoredNodes) throws IOException { Collection<DatanodeInfo> ignoredNodes) throws IOException {
return chooseDataNode(block, ignoredNodes, true);
}
/**
* Choose datanode to read from.
*
* @param block Block to choose datanode addr from
* @param ignoredNodes Ignored nodes inside.
* @param refetchIfRequired Whether to refetch if no nodes to chose
* from.
* @return Returns chosen DNAddrPair; Can be null if refetchIfRequired is
* false.
*/
private DNAddrPair chooseDataNode(LocatedBlock block,
Collection<DatanodeInfo> ignoredNodes, boolean refetchIfRequired)
throws IOException {
while (true) { while (true) {
DNAddrPair result = getBestNodeDNAddrPair(block, ignoredNodes); DNAddrPair result = getBestNodeDNAddrPair(block, ignoredNodes);
if (result != null) { if (result != null) {
return result; return result;
} else if (refetchIfRequired) {
block = refetchLocations(block, ignoredNodes);
} else { } else {
String errMsg = getBestNodeDNAddrPairErrorString(block.getLocations(), return null;
deadNodes, ignoredNodes);
String blockInfo = block.getBlock() + " file=" + src;
if (failures >= dfsClient.getConf().getMaxBlockAcquireFailures()) {
String description = "Could not obtain block: " + blockInfo;
DFSClient.LOG.warn(description + errMsg
+ ". Throwing a BlockMissingException");
throw new BlockMissingException(src, description,
block.getStartOffset());
}
DatanodeInfo[] nodes = block.getLocations();
if (nodes == null || nodes.length == 0) {
DFSClient.LOG.info("No node available for " + blockInfo);
}
DFSClient.LOG.info("Could not obtain " + block.getBlock()
+ " from any node: " + errMsg
+ ". Will get new block locations from namenode and retry...");
try {
// Introducing a random factor to the wait time before another retry.
// The wait time is dependent on # of failures and a random factor.
// At the first time of getting a BlockMissingException, the wait time
// is a random number between 0..3000 ms. If the first retry
// still fails, we will wait 3000 ms grace period before the 2nd retry.
// Also at the second retry, the waiting window is expanded to 6000 ms
// alleviating the request rate from the server. Similarly the 3rd retry
// will wait 6000ms grace period before retry and the waiting window is
// expanded to 9000ms.
final int timeWindow = dfsClient.getConf().getTimeWindow();
double waitTime = timeWindow * failures + // grace period for the last round of attempt
// expanding time window for each failure
timeWindow * (failures + 1) *
ThreadLocalRandom.current().nextDouble();
DFSClient.LOG.warn("DFS chooseDataNode: got # " + (failures + 1) +
" IOException, will wait for " + waitTime + " msec.");
Thread.sleep((long)waitTime);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new InterruptedIOException(
"Interrupted while choosing DataNode for read.");
}
deadNodes.clear(); //2nd option is to remove only nodes[blockId]
openInfo(true);
block = refreshLocatedBlock(block);
failures++;
} }
} }
} }
private LocatedBlock refetchLocations(LocatedBlock block,
Collection<DatanodeInfo> ignoredNodes) throws IOException {
String errMsg = getBestNodeDNAddrPairErrorString(block.getLocations(),
deadNodes, ignoredNodes);
String blockInfo = block.getBlock() + " file=" + src;
if (failures >= dfsClient.getConf().getMaxBlockAcquireFailures()) {
String description = "Could not obtain block: " + blockInfo;
DFSClient.LOG.warn(description + errMsg
+ ". Throwing a BlockMissingException");
throw new BlockMissingException(src, description,
block.getStartOffset());
}
DatanodeInfo[] nodes = block.getLocations();
if (nodes == null || nodes.length == 0) {
DFSClient.LOG.info("No node available for " + blockInfo);
}
DFSClient.LOG.info("Could not obtain " + block.getBlock()
+ " from any node: " + errMsg
+ ". Will get new block locations from namenode and retry...");
try {
// Introducing a random factor to the wait time before another retry.
// The wait time is dependent on # of failures and a random factor.
// At the first time of getting a BlockMissingException, the wait time
// is a random number between 0..3000 ms. If the first retry
// still fails, we will wait 3000 ms grace period before the 2nd retry.
// Also at the second retry, the waiting window is expanded to 6000 ms
// alleviating the request rate from the server. Similarly the 3rd retry
// will wait 6000ms grace period before retry and the waiting window is
// expanded to 9000ms.
final int timeWindow = dfsClient.getConf().getTimeWindow();
// grace period for the last round of attempt
double waitTime = timeWindow * failures +
// expanding time window for each failure
timeWindow * (failures + 1) *
ThreadLocalRandom.current().nextDouble();
DFSClient.LOG.warn("DFS chooseDataNode: got # " + (failures + 1) +
" IOException, will wait for " + waitTime + " msec.");
Thread.sleep((long)waitTime);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new InterruptedIOException(
"Interrupted while choosing DataNode for read.");
}
deadNodes.clear(); //2nd option is to remove only nodes[blockId]
openInfo(true);
block = refreshLocatedBlock(block);
failures++;
return block;
}
/** /**
* Get the best node from which to stream the data. * Get the best node from which to stream the data.
* @param block LocatedBlock, containing nodes in priority order. * @param block LocatedBlock, containing nodes in priority order.
@ -985,6 +1010,7 @@ public class DFSInputStream extends FSInputStream
return new Callable<ByteBuffer>() { return new Callable<ByteBuffer>() {
@Override @Override
public ByteBuffer call() throws Exception { public ByteBuffer call() throws Exception {
DFSClientFaultInjector.get().sleepBeforeHedgedGet();
try (TraceScope ignored = dfsClient.getTracer(). try (TraceScope ignored = dfsClient.getTracer().
newScope("hedgedRead" + hedgedReadId, parentSpanId)) { newScope("hedgedRead" + hedgedReadId, parentSpanId)) {
actualGetFromOneDataNode(datanode, start, end, bb, corruptedBlocks); actualGetFromOneDataNode(datanode, start, end, bb, corruptedBlocks);
@ -1159,20 +1185,22 @@ public class DFSInputStream extends FSInputStream
// We are starting up a 'hedged' read. We have a read already // We are starting up a 'hedged' read. We have a read already
// ongoing. Call getBestNodeDNAddrPair instead of chooseDataNode. // ongoing. Call getBestNodeDNAddrPair instead of chooseDataNode.
// If no nodes to do hedged reads against, pass. // If no nodes to do hedged reads against, pass.
boolean refetch = false;
try { try {
chosenNode = getBestNodeDNAddrPair(block, ignored); chosenNode = chooseDataNode(block, ignored, false);
if (chosenNode == null) { if (chosenNode != null) {
chosenNode = chooseDataNode(block, ignored); // Latest block, if refreshed internally
block = chosenNode.block;
bb = ByteBuffer.allocate(len);
Callable<ByteBuffer> getFromDataNodeCallable =
getFromOneDataNode(chosenNode, block, start, end, bb,
corruptedBlocks, hedgedReadId++);
Future<ByteBuffer> oneMoreRequest =
hedgedService.submit(getFromDataNodeCallable);
futures.add(oneMoreRequest);
} else {
refetch = true;
} }
// Latest block, if refreshed internally
block = chosenNode.block;
bb = ByteBuffer.allocate(len);
Callable<ByteBuffer> getFromDataNodeCallable = getFromOneDataNode(
chosenNode, block, start, end, bb,
corruptedBlocks, hedgedReadId++);
Future<ByteBuffer> oneMoreRequest = hedgedService
.submit(getFromDataNodeCallable);
futures.add(oneMoreRequest);
} catch (IOException ioe) { } catch (IOException ioe) {
DFSClient.LOG.debug("Failed getting node for hedged read: {}", DFSClient.LOG.debug("Failed getting node for hedged read: {}",
ioe.getMessage()); ioe.getMessage());
@ -1190,6 +1218,9 @@ public class DFSInputStream extends FSInputStream
} catch (InterruptedException ie) { } catch (InterruptedException ie) {
// Ignore and retry // Ignore and retry
} }
if (refetch) {
refetchLocations(block, ignored);
}
// We got here if exception. Ignore this node on next go around IFF // We got here if exception. Ignore this node on next go around IFF
// we found a chosenNode to hedge read against. // we found a chosenNode to hedge read against.
if (chosenNode != null && chosenNode.info != null) { if (chosenNode != null && chosenNode.info != null) {

View File

@ -626,7 +626,7 @@ public class TestPread {
*/ */
@Test @Test
public void testPreadFailureWithChangedBlockLocations() throws Exception { public void testPreadFailureWithChangedBlockLocations() throws Exception {
doPreadTestWithChangedLocations(); doPreadTestWithChangedLocations(1);
} }
/** /**
@ -639,21 +639,36 @@ public class TestPread {
* 7. Consider next calls to getBlockLocations() always returns DN3 as last * 7. Consider next calls to getBlockLocations() always returns DN3 as last
* location.<br> * location.<br>
*/ */
@Test @Test(timeout = 60000)
public void testPreadHedgedFailureWithChangedBlockLocations() public void testPreadHedgedFailureWithChangedBlockLocations()
throws Exception { throws Exception {
isHedgedRead = true; isHedgedRead = true;
doPreadTestWithChangedLocations(); DFSClientFaultInjector old = DFSClientFaultInjector.get();
try {
DFSClientFaultInjector.set(new DFSClientFaultInjector() {
public void sleepBeforeHedgedGet() {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
}
}
});
doPreadTestWithChangedLocations(2);
} finally {
DFSClientFaultInjector.set(old);
}
} }
private void doPreadTestWithChangedLocations() private void doPreadTestWithChangedLocations(int maxFailures)
throws IOException, TimeoutException, InterruptedException { throws IOException, TimeoutException, InterruptedException {
GenericTestUtils.setLogLevel(DFSClient.LOG, Level.DEBUG); GenericTestUtils.setLogLevel(DFSClient.LOG, Level.DEBUG);
Configuration conf = new HdfsConfiguration(); Configuration conf = new HdfsConfiguration();
conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 2); conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 2);
conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1); conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);
if (isHedgedRead) { if (isHedgedRead) {
conf.setInt(HdfsClientConfigKeys.HedgedRead.THRESHOLD_MILLIS_KEY, 100);
conf.setInt(HdfsClientConfigKeys.HedgedRead.THREADPOOL_SIZE_KEY, 2); conf.setInt(HdfsClientConfigKeys.HedgedRead.THREADPOOL_SIZE_KEY, 2);
conf.setInt(HdfsClientConfigKeys.Retry.WINDOW_BASE_KEY, 1000);
} }
try (MiniDFSCluster cluster = try (MiniDFSCluster cluster =
new MiniDFSCluster.Builder(conf).numDataNodes(3).build()) { new MiniDFSCluster.Builder(conf).numDataNodes(3).build()) {
@ -747,6 +762,9 @@ public class TestPread {
int n = din.read(0, buf, 0, data.length()); int n = din.read(0, buf, 0, data.length());
assertEquals(data.length(), n); assertEquals(data.length(), n);
assertEquals("Data should be read", data, new String(buf, 0, n)); assertEquals("Data should be read", data, new String(buf, 0, n));
assertTrue("Read should complete with maximum " + maxFailures
+ " failures, but completed with " + din.failures,
din.failures <= maxFailures);
DFSClient.LOG.info("Read completed"); DFSClient.LOG.info("Read completed");
} }
} }