HDFS-11738. Hedged pread takes more time when block moved from initial locations. Contributed by Vinayakumar B.
(cherry picked from commit b6bfb2fcb2391d51b8de97c01c1290880779132e)
This commit is contained in:
parent
a45ffdcddc
commit
c54c3500ed
@ -61,4 +61,6 @@ public void readFromDatanodeDelay() {}
|
||||
  /**
   * Fault-injection hook: whether to skip the grace-period wait used during a
   * rolling restart. Default implementation never skips; tests may override.
   *
   * @return false in the default (production) implementation
   */
  public boolean skipRollingRestartWait() {
    return false;
  }
|
||||
|
||||
  /**
   * Fault-injection hook invoked just before a hedged read attempt fetches
   * data from a datanode. No-op by default; tests override it (e.g. with a
   * sleep) to force the hedged-read timing paths.
   */
  public void sleepBeforeHedgedGet() {}
|
||||
}
|
||||
|
@ -1036,11 +1036,36 @@ protected void addIntoCorruptedBlockMap(ExtendedBlock blk, DatanodeInfo node,
|
||||
|
||||
  /**
   * Choose a datanode to read the given block from, refetching block
   * locations from the namenode if no usable node is currently available.
   * Convenience overload that delegates with {@code refetchIfRequired=true},
   * so it never returns null.
   *
   * @param block Block to choose a datanode address from
   * @param ignoredNodes nodes to exclude from the choice; may be null
   * @return the chosen DNAddrPair (never null for this overload)
   * @throws IOException if locations cannot be (re)fetched
   */
  private DNAddrPair chooseDataNode(LocatedBlock block,
      Collection<DatanodeInfo> ignoredNodes) throws IOException {
    return chooseDataNode(block, ignoredNodes, true);
  }
|
||||
|
||||
/**
|
||||
* Choose datanode to read from.
|
||||
*
|
||||
* @param block Block to choose datanode addr from
|
||||
* @param ignoredNodes Ignored nodes inside.
|
||||
* @param refetchIfRequired Whether to refetch if no nodes to chose
|
||||
* from.
|
||||
* @return Returns chosen DNAddrPair; Can be null if refetchIfRequired is
|
||||
* false.
|
||||
*/
|
||||
private DNAddrPair chooseDataNode(LocatedBlock block,
|
||||
Collection<DatanodeInfo> ignoredNodes, boolean refetchIfRequired)
|
||||
throws IOException {
|
||||
while (true) {
|
||||
DNAddrPair result = getBestNodeDNAddrPair(block, ignoredNodes);
|
||||
if (result != null) {
|
||||
return result;
|
||||
} else if (refetchIfRequired) {
|
||||
block = refetchLocations(block, ignoredNodes);
|
||||
} else {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private LocatedBlock refetchLocations(LocatedBlock block,
|
||||
Collection<DatanodeInfo> ignoredNodes) throws IOException {
|
||||
String errMsg = getBestNodeDNAddrPairErrorString(block.getLocations(),
|
||||
deadNodes, ignoredNodes);
|
||||
String blockInfo = block.getBlock() + " file=" + src;
|
||||
@ -1070,7 +1095,8 @@ private DNAddrPair chooseDataNode(LocatedBlock block,
|
||||
// will wait 6000ms grace period before retry and the waiting window is
|
||||
// expanded to 9000ms.
|
||||
final int timeWindow = dfsClient.getConf().getTimeWindow();
|
||||
double waitTime = timeWindow * failures + // grace period for the last round of attempt
|
||||
// grace period for the last round of attempt
|
||||
double waitTime = timeWindow * failures +
|
||||
// expanding time window for each failure
|
||||
timeWindow * (failures + 1) *
|
||||
ThreadLocalRandom.current().nextDouble();
|
||||
@ -1086,8 +1112,7 @@ private DNAddrPair chooseDataNode(LocatedBlock block,
|
||||
openInfo(true);
|
||||
block = refreshLocatedBlock(block);
|
||||
failures++;
|
||||
}
|
||||
}
|
||||
return block;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -1183,6 +1208,7 @@ private Callable<ByteBuffer> getFromOneDataNode(final DNAddrPair datanode,
|
||||
return new Callable<ByteBuffer>() {
|
||||
@Override
|
||||
public ByteBuffer call() throws Exception {
|
||||
DFSClientFaultInjector.get().sleepBeforeHedgedGet();
|
||||
byte[] buf = bb.array();
|
||||
int offset = bb.position();
|
||||
try (TraceScope ignored = dfsClient.getTracer().
|
||||
@ -1385,20 +1411,22 @@ private void hedgedFetchBlockByteRange(LocatedBlock block, long start,
|
||||
// We are starting up a 'hedged' read. We have a read already
|
||||
// ongoing. Call getBestNodeDNAddrPair instead of chooseDataNode.
|
||||
// If no nodes to do hedged reads against, pass.
|
||||
boolean refetch = false;
|
||||
try {
|
||||
chosenNode = getBestNodeDNAddrPair(block, ignored);
|
||||
if (chosenNode == null) {
|
||||
chosenNode = chooseDataNode(block, ignored);
|
||||
}
|
||||
chosenNode = chooseDataNode(block, ignored, false);
|
||||
if (chosenNode != null) {
|
||||
// Latest block, if refreshed internally
|
||||
block = chosenNode.block;
|
||||
bb = ByteBuffer.allocate(len);
|
||||
Callable<ByteBuffer> getFromDataNodeCallable = getFromOneDataNode(
|
||||
chosenNode, block, start, end, bb,
|
||||
corruptedBlockMap, hedgedReadId++);
|
||||
chosenNode, block, start, end, bb, corruptedBlockMap,
|
||||
hedgedReadId++);
|
||||
Future<ByteBuffer> oneMoreRequest = hedgedService
|
||||
.submit(getFromDataNodeCallable);
|
||||
futures.add(oneMoreRequest);
|
||||
} else {
|
||||
refetch = true;
|
||||
}
|
||||
} catch (IOException ioe) {
|
||||
DFSClient.LOG.debug("Failed getting node for hedged read: {}",
|
||||
ioe.getMessage());
|
||||
@ -1416,6 +1444,9 @@ private void hedgedFetchBlockByteRange(LocatedBlock block, long start,
|
||||
} catch (InterruptedException ie) {
|
||||
// Ignore and retry
|
||||
}
|
||||
if (refetch) {
|
||||
refetchLocations(block, ignored);
|
||||
}
|
||||
// We got here if exception. Ignore this node on next go around IFF
|
||||
// we found a chosenNode to hedge read against.
|
||||
if (chosenNode != null && chosenNode.info != null) {
|
||||
|
@ -629,7 +629,7 @@ public Void answer(InvocationOnMock invocation) throws Throwable {
|
||||
*/
|
||||
  // NOTE(review): reconstructed post-commit form of this diff hunk (the
  // no-arg call was replaced by the max-failures variant) — confirm against
  // the committed file.
  /**
   * Positional read should succeed even after the block moves from its
   * initial locations, completing with at most one read failure.
   */
  @Test
  public void testPreadFailureWithChangedBlockLocations() throws Exception {
    doPreadTestWithChangedLocations(1);
  }
|
||||
|
||||
/**
|
||||
@ -642,21 +642,36 @@ public void testPreadFailureWithChangedBlockLocations() throws Exception {
|
||||
* 7. Consider next calls to getBlockLocations() always returns DN3 as last
|
||||
* location.<br>
|
||||
*/
|
||||
  // NOTE(review): reconstructed post-commit form of this diff hunk (the
  // plain @Test and no-arg call were removed; the fault injector and
  // timeout were added) — confirm against the committed file.
  /**
   * Hedged positional read should succeed after the block moves from its
   * initial locations. A fault injector delays each hedged fetch by 500ms so
   * the hedged-read code path is actually exercised; the read must complete
   * with at most two failures.
   */
  @Test(timeout = 60000)
  public void testPreadHedgedFailureWithChangedBlockLocations()
      throws Exception {
    isHedgedRead = true;
    DFSClientFaultInjector old = DFSClientFaultInjector.get();
    try {
      DFSClientFaultInjector.set(new DFSClientFaultInjector() {
        public void sleepBeforeHedgedGet() {
          try {
            // Delay the hedged fetch so the primary/hedged race is real.
            Thread.sleep(500);
          } catch (InterruptedException e) {
          }
        }
      });
      doPreadTestWithChangedLocations(2);
    } finally {
      // Always restore the previous injector so other tests are unaffected.
      DFSClientFaultInjector.set(old);
    }
  }
|
||||
|
||||
private void doPreadTestWithChangedLocations()
|
||||
private void doPreadTestWithChangedLocations(int maxFailures)
|
||||
throws IOException, TimeoutException, InterruptedException {
|
||||
GenericTestUtils.setLogLevel(DFSClient.LOG, Level.DEBUG);
|
||||
Configuration conf = new HdfsConfiguration();
|
||||
conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 2);
|
||||
conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);
|
||||
if (isHedgedRead) {
|
||||
conf.setInt(HdfsClientConfigKeys.HedgedRead.THRESHOLD_MILLIS_KEY, 100);
|
||||
conf.setInt(HdfsClientConfigKeys.HedgedRead.THREADPOOL_SIZE_KEY, 2);
|
||||
conf.setInt(HdfsClientConfigKeys.Retry.WINDOW_BASE_KEY, 1000);
|
||||
}
|
||||
try (MiniDFSCluster cluster =
|
||||
new MiniDFSCluster.Builder(conf).numDataNodes(3).build()) {
|
||||
@ -750,6 +765,9 @@ public Boolean get() {
|
||||
int n = din.read(0, buf, 0, data.length());
|
||||
assertEquals(data.length(), n);
|
||||
assertEquals("Data should be read", data, new String(buf, 0, n));
|
||||
assertTrue("Read should complete with maximum " + maxFailures
|
||||
+ " failures, but completed with " + din.failures,
|
||||
din.failures <= maxFailures);
|
||||
DFSClient.LOG.info("Read completed");
|
||||
}
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user