MAPREDUCE-6303. Read timeout when retrying a fetch error can be fatal to a reducer. Contributed by Jason Lowe.

(cherry picked from commit eccb7d46ef)
This commit is contained in:
Junping Du 2015-04-02 12:13:03 -07:00
parent acfe44e5b0
commit 86e8c9958a
3 changed files with 74 additions and 35 deletions

View File

@ -189,6 +189,9 @@ Release 2.7.0 - UNRELEASED
MAPREDUCE-6285. ClientServiceDelegate should not retry upon
AuthenticationException. (Jonathan Eagles via ozawa)
MAPREDUCE-6303. Read timeout when retrying a fetch error can be fatal
to a reducer. (Jason Lowe via junping_du)
Release 2.6.1 - UNRELEASED
INCOMPATIBLE CHANGES

View File

@ -258,6 +258,39 @@ class Fetcher<K,V> extends Thread {
closeConnection();
}
private DataInputStream openShuffleUrl(MapHost host,
Set<TaskAttemptID> remaining, URL url) {
DataInputStream input = null;
try {
setupConnectionsWithRetry(host, remaining, url);
if (stopped) {
abortConnect(host, remaining);
} else {
input = new DataInputStream(connection.getInputStream());
}
} catch (IOException ie) {
boolean connectExcpt = ie instanceof ConnectException;
ioErrs.increment(1);
LOG.warn("Failed to connect to " + host + " with " + remaining.size() +
" map outputs", ie);
// If connect did not succeed, just mark all the maps as failed,
// indirectly penalizing the host
scheduler.hostFailed(host.getHostName());
for(TaskAttemptID left: remaining) {
scheduler.copyFailed(left, host, false, connectExcpt);
}
// Add back all the remaining maps, WITHOUT marking them as failed
for(TaskAttemptID left: remaining) {
scheduler.putBackKnownMapOutput(host, left);
}
}
return input;
}
/**
* The crux of the matter...
*
@ -286,38 +319,12 @@ class Fetcher<K,V> extends Thread {
Set<TaskAttemptID> remaining = new HashSet<TaskAttemptID>(maps);
// Construct the url and connect
DataInputStream input = null;
URL url = getMapOutputURL(host, maps);
try {
setupConnectionsWithRetry(host, remaining, url);
if (stopped) {
abortConnect(host, remaining);
return;
}
} catch (IOException ie) {
boolean connectExcpt = ie instanceof ConnectException;
ioErrs.increment(1);
LOG.warn("Failed to connect to " + host + " with " + remaining.size() +
" map outputs", ie);
// If connect did not succeed, just mark all the maps as failed,
// indirectly penalizing the host
scheduler.hostFailed(host.getHostName());
for(TaskAttemptID left: remaining) {
scheduler.copyFailed(left, host, false, connectExcpt);
}
// Add back all the remaining maps, WITHOUT marking them as failed
for(TaskAttemptID left: remaining) {
scheduler.putBackKnownMapOutput(host, left);
}
DataInputStream input = openShuffleUrl(host, remaining, url);
if (input == null) {
return;
}
input = new DataInputStream(connection.getInputStream());
try {
// Loop through available map-outputs and fetch them
// On any error, faildTasks is not null and we exit
@ -333,14 +340,10 @@ class Fetcher<K,V> extends Thread {
connection.disconnect();
// Get map output from remaining tasks only.
url = getMapOutputURL(host, remaining);
// Connect with retry as expecting host's recovery take sometime.
setupConnectionsWithRetry(host, remaining, url);
if (stopped) {
abortConnect(host, remaining);
input = openShuffleUrl(host, remaining, url);
if (input == null) {
return;
}
input = new DataInputStream(connection.getInputStream());
}
}
@ -591,7 +594,7 @@ class Fetcher<K,V> extends Thread {
// Retry is not timeout, let's do retry with throwing an exception.
if (currentTime - retryStartTime < this.fetchRetryTimeout) {
LOG.warn("Shuffle output from " + host.getHostName() +
" failed, retry it.");
" failed, retry it.", ioe);
throw ioe;
} else {
// timeout, prepare to be failed.

View File

@ -388,6 +388,39 @@ public class TestFetcher {
anyBoolean(), anyBoolean());
}
@SuppressWarnings("unchecked")
@Test(timeout=10000)
public void testCopyFromHostWithRetryThenTimeout() throws Exception {
InMemoryMapOutput<Text, Text> immo = mock(InMemoryMapOutput.class);
Fetcher<Text,Text> underTest = new FakeFetcher<Text,Text>(jobWithRetry,
id, ss, mm, r, metrics, except, key, connection);
String replyHash = SecureShuffleUtils.generateHash(encHash.getBytes(), key);
when(connection.getResponseCode()).thenReturn(200)
.thenThrow(new SocketTimeoutException("forced timeout"));
when(connection.getHeaderField(SecureShuffleUtils.HTTP_HEADER_REPLY_URL_HASH))
.thenReturn(replyHash);
ShuffleHeader header = new ShuffleHeader(map1ID.toString(), 10, 10, 1);
ByteArrayOutputStream bout = new ByteArrayOutputStream();
header.write(new DataOutputStream(bout));
ByteArrayInputStream in = new ByteArrayInputStream(bout.toByteArray());
when(connection.getInputStream()).thenReturn(in);
when(connection.getHeaderField(ShuffleHeader.HTTP_HEADER_NAME))
.thenReturn(ShuffleHeader.DEFAULT_HTTP_HEADER_NAME);
when(connection.getHeaderField(ShuffleHeader.HTTP_HEADER_VERSION))
.thenReturn(ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION);
when(mm.reserve(any(TaskAttemptID.class), anyLong(), anyInt()))
.thenReturn(immo);
doThrow(new IOException("forced error")).when(immo).shuffle(
any(MapHost.class), any(InputStream.class), anyLong(),
anyLong(), any(ShuffleClientMetrics.class), any(Reporter.class));
underTest.copyFromHost(host);
verify(allErrs).increment(1);
verify(ss).copyFailed(map1ID, host, false, false);
}
@Test
public void testCopyFromHostExtraBytes() throws Exception {
Fetcher<Text,Text> underTest = new FakeFetcher<Text,Text>(job, id, ss, mm,