HDFS-11303. Hedged read might hang infinitely if read data from all DN failed . Contributed by Chen Zhang, Wei-chiu Chuang, and John Zhuge.
This commit is contained in:
parent
28d97b79b6
commit
8b242f09a6
|
@ -1131,8 +1131,9 @@ public class DFSInputStream extends FSInputStream
|
||||||
Future<ByteBuffer> firstRequest = hedgedService
|
Future<ByteBuffer> firstRequest = hedgedService
|
||||||
.submit(getFromDataNodeCallable);
|
.submit(getFromDataNodeCallable);
|
||||||
futures.add(firstRequest);
|
futures.add(firstRequest);
|
||||||
|
Future<ByteBuffer> future = null;
|
||||||
try {
|
try {
|
||||||
Future<ByteBuffer> future = hedgedService.poll(
|
future = hedgedService.poll(
|
||||||
conf.getHedgedReadThresholdMillis(), TimeUnit.MILLISECONDS);
|
conf.getHedgedReadThresholdMillis(), TimeUnit.MILLISECONDS);
|
||||||
if (future != null) {
|
if (future != null) {
|
||||||
ByteBuffer result = future.get();
|
ByteBuffer result = future.get();
|
||||||
|
@ -1142,16 +1143,18 @@ public class DFSInputStream extends FSInputStream
|
||||||
}
|
}
|
||||||
DFSClient.LOG.debug("Waited {}ms to read from {}; spawning hedged "
|
DFSClient.LOG.debug("Waited {}ms to read from {}; spawning hedged "
|
||||||
+ "read", conf.getHedgedReadThresholdMillis(), chosenNode.info);
|
+ "read", conf.getHedgedReadThresholdMillis(), chosenNode.info);
|
||||||
// Ignore this node on next go around.
|
|
||||||
ignored.add(chosenNode.info);
|
|
||||||
dfsClient.getHedgedReadMetrics().incHedgedReadOps();
|
dfsClient.getHedgedReadMetrics().incHedgedReadOps();
|
||||||
// continue; no need to refresh block locations
|
// continue; no need to refresh block locations
|
||||||
} catch (ExecutionException e) {
|
} catch (ExecutionException e) {
|
||||||
// Ignore
|
futures.remove(future);
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
throw new InterruptedIOException(
|
throw new InterruptedIOException(
|
||||||
"Interrupted while waiting for reading task");
|
"Interrupted while waiting for reading task");
|
||||||
}
|
}
|
||||||
|
// Ignore this node on next go around.
|
||||||
|
// If poll timeout and the request still ongoing, don't consider it
|
||||||
|
// again. If read data failed, don't consider it either.
|
||||||
|
ignored.add(chosenNode.info);
|
||||||
} else {
|
} else {
|
||||||
// We are starting up a 'hedged' read. We have a read already
|
// We are starting up a 'hedged' read. We have a read already
|
||||||
// ongoing. Call getBestNodeDNAddrPair instead of chooseDataNode.
|
// ongoing. Call getBestNodeDNAddrPair instead of chooseDataNode.
|
||||||
|
|
|
@ -59,6 +59,8 @@ import org.mockito.invocation.InvocationOnMock;
|
||||||
import org.mockito.stubbing.Answer;
|
import org.mockito.stubbing.Answer;
|
||||||
|
|
||||||
import com.google.common.base.Supplier;
|
import com.google.common.base.Supplier;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This class tests the DFS positional read functionality in a single node
|
* This class tests the DFS positional read functionality in a single node
|
||||||
|
@ -72,6 +74,9 @@ public class TestPread {
|
||||||
boolean simulatedStorage;
|
boolean simulatedStorage;
|
||||||
boolean isHedgedRead;
|
boolean isHedgedRead;
|
||||||
|
|
||||||
|
private static final Logger LOG =
|
||||||
|
LoggerFactory.getLogger(TestPread.class.getName());
|
||||||
|
|
||||||
@Before
|
@Before
|
||||||
public void setup() {
|
public void setup() {
|
||||||
simulatedStorage = false;
|
simulatedStorage = false;
|
||||||
|
@ -551,6 +556,64 @@ public class TestPread {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test(timeout=30000)
|
||||||
|
public void testHedgedReadFromAllDNFailed() throws IOException {
|
||||||
|
Configuration conf = new Configuration();
|
||||||
|
int numHedgedReadPoolThreads = 5;
|
||||||
|
final int hedgedReadTimeoutMillis = 50;
|
||||||
|
|
||||||
|
conf.setInt(HdfsClientConfigKeys.HedgedRead.THREADPOOL_SIZE_KEY,
|
||||||
|
numHedgedReadPoolThreads);
|
||||||
|
conf.setLong(HdfsClientConfigKeys.HedgedRead.THRESHOLD_MILLIS_KEY,
|
||||||
|
hedgedReadTimeoutMillis);
|
||||||
|
conf.setInt(HdfsClientConfigKeys.Retry.WINDOW_BASE_KEY, 0);
|
||||||
|
// Set up the InjectionHandler
|
||||||
|
DFSClientFaultInjector.set(Mockito.mock(DFSClientFaultInjector.class));
|
||||||
|
DFSClientFaultInjector injector = DFSClientFaultInjector.get();
|
||||||
|
Mockito.doAnswer(new Answer<Void>() {
|
||||||
|
@Override
|
||||||
|
public Void answer(InvocationOnMock invocation) throws Throwable {
|
||||||
|
if (true) {
|
||||||
|
LOG.info("-------------- throw Checksum Exception");
|
||||||
|
throw new ChecksumException("ChecksumException test", 100);
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}).when(injector).fetchFromDatanodeException();
|
||||||
|
|
||||||
|
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3)
|
||||||
|
.format(true).build();
|
||||||
|
DistributedFileSystem fileSys = cluster.getFileSystem();
|
||||||
|
DFSClient dfsClient = fileSys.getClient();
|
||||||
|
FSDataOutputStream output = null;
|
||||||
|
DFSInputStream input = null;
|
||||||
|
String filename = "/hedgedReadMaxOut.dat";
|
||||||
|
DFSHedgedReadMetrics metrics = dfsClient.getHedgedReadMetrics();
|
||||||
|
// Metrics instance is static, so we need to reset counts from prior tests.
|
||||||
|
metrics.hedgedReadOps.set(0);
|
||||||
|
try {
|
||||||
|
Path file = new Path(filename);
|
||||||
|
output = fileSys.create(file, (short) 2);
|
||||||
|
byte[] data = new byte[64 * 1024];
|
||||||
|
output.write(data);
|
||||||
|
output.flush();
|
||||||
|
output.close();
|
||||||
|
byte[] buffer = new byte[64 * 1024];
|
||||||
|
input = dfsClient.open(filename);
|
||||||
|
input.read(0, buffer, 0, 1024);
|
||||||
|
Assert.fail("Reading the block should have thrown BlockMissingException");
|
||||||
|
} catch (BlockMissingException e) {
|
||||||
|
assertEquals(3, input.getHedgedReadOpsLoopNumForTesting());
|
||||||
|
assertTrue(metrics.getHedgedReadOps() == 0);
|
||||||
|
} finally {
|
||||||
|
Mockito.reset(injector);
|
||||||
|
IOUtils.cleanupWithLogger(LOG, input);
|
||||||
|
IOUtils.cleanupWithLogger(LOG, output);
|
||||||
|
fileSys.close();
|
||||||
|
cluster.shutdown();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Scenario: 1. Write a file with RF=2, DN1 and DN2<br>
|
* Scenario: 1. Write a file with RF=2, DN1 and DN2<br>
|
||||||
* 2. Open the stream, Consider Locations are [DN1, DN2] in LocatedBlock.<br>
|
* 2. Open the stream, Consider Locations are [DN1, DN2] in LocatedBlock.<br>
|
||||||
|
|
Loading…
Reference in New Issue