MAPREDUCE-6303. Read timeout when retrying a fetch error can be fatal to a reducer. Contributed by Jason Lowe.
(cherry picked from commit eccb7d46ef
)
This commit is contained in:
parent
6e36dbf03a
commit
cacadea632
|
@ -265,6 +265,9 @@ Release 2.7.0 - UNRELEASED
|
||||||
MAPREDUCE-6285. ClientServiceDelegate should not retry upon
|
MAPREDUCE-6285. ClientServiceDelegate should not retry upon
|
||||||
AuthenticationException. (Jonathan Eagles via ozawa)
|
AuthenticationException. (Jonathan Eagles via ozawa)
|
||||||
|
|
||||||
|
MAPREDUCE-6303. Read timeout when retrying a fetch error can be fatal
|
||||||
|
to a reducer. (Jason Lowe via junping_du)
|
||||||
|
|
||||||
Release 2.6.1 - UNRELEASED
|
Release 2.6.1 - UNRELEASED
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
|
@ -258,6 +258,39 @@ class Fetcher<K,V> extends Thread {
|
||||||
closeConnection();
|
closeConnection();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private DataInputStream openShuffleUrl(MapHost host,
|
||||||
|
Set<TaskAttemptID> remaining, URL url) {
|
||||||
|
DataInputStream input = null;
|
||||||
|
|
||||||
|
try {
|
||||||
|
setupConnectionsWithRetry(host, remaining, url);
|
||||||
|
if (stopped) {
|
||||||
|
abortConnect(host, remaining);
|
||||||
|
} else {
|
||||||
|
input = new DataInputStream(connection.getInputStream());
|
||||||
|
}
|
||||||
|
} catch (IOException ie) {
|
||||||
|
boolean connectExcpt = ie instanceof ConnectException;
|
||||||
|
ioErrs.increment(1);
|
||||||
|
LOG.warn("Failed to connect to " + host + " with " + remaining.size() +
|
||||||
|
" map outputs", ie);
|
||||||
|
|
||||||
|
// If connect did not succeed, just mark all the maps as failed,
|
||||||
|
// indirectly penalizing the host
|
||||||
|
scheduler.hostFailed(host.getHostName());
|
||||||
|
for(TaskAttemptID left: remaining) {
|
||||||
|
scheduler.copyFailed(left, host, false, connectExcpt);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Add back all the remaining maps, WITHOUT marking them as failed
|
||||||
|
for(TaskAttemptID left: remaining) {
|
||||||
|
scheduler.putBackKnownMapOutput(host, left);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return input;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The crux of the matter...
|
* The crux of the matter...
|
||||||
*
|
*
|
||||||
|
@ -286,38 +319,12 @@ class Fetcher<K,V> extends Thread {
|
||||||
Set<TaskAttemptID> remaining = new HashSet<TaskAttemptID>(maps);
|
Set<TaskAttemptID> remaining = new HashSet<TaskAttemptID>(maps);
|
||||||
|
|
||||||
// Construct the url and connect
|
// Construct the url and connect
|
||||||
DataInputStream input = null;
|
|
||||||
URL url = getMapOutputURL(host, maps);
|
URL url = getMapOutputURL(host, maps);
|
||||||
try {
|
DataInputStream input = openShuffleUrl(host, remaining, url);
|
||||||
setupConnectionsWithRetry(host, remaining, url);
|
if (input == null) {
|
||||||
|
|
||||||
if (stopped) {
|
|
||||||
abortConnect(host, remaining);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
} catch (IOException ie) {
|
|
||||||
boolean connectExcpt = ie instanceof ConnectException;
|
|
||||||
ioErrs.increment(1);
|
|
||||||
LOG.warn("Failed to connect to " + host + " with " + remaining.size() +
|
|
||||||
" map outputs", ie);
|
|
||||||
|
|
||||||
// If connect did not succeed, just mark all the maps as failed,
|
|
||||||
// indirectly penalizing the host
|
|
||||||
scheduler.hostFailed(host.getHostName());
|
|
||||||
for(TaskAttemptID left: remaining) {
|
|
||||||
scheduler.copyFailed(left, host, false, connectExcpt);
|
|
||||||
}
|
|
||||||
|
|
||||||
// Add back all the remaining maps, WITHOUT marking them as failed
|
|
||||||
for(TaskAttemptID left: remaining) {
|
|
||||||
scheduler.putBackKnownMapOutput(host, left);
|
|
||||||
}
|
|
||||||
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
input = new DataInputStream(connection.getInputStream());
|
|
||||||
|
|
||||||
try {
|
try {
|
||||||
// Loop through available map-outputs and fetch them
|
// Loop through available map-outputs and fetch them
|
||||||
// On any error, faildTasks is not null and we exit
|
// On any error, faildTasks is not null and we exit
|
||||||
|
@ -333,14 +340,10 @@ class Fetcher<K,V> extends Thread {
|
||||||
connection.disconnect();
|
connection.disconnect();
|
||||||
// Get map output from remaining tasks only.
|
// Get map output from remaining tasks only.
|
||||||
url = getMapOutputURL(host, remaining);
|
url = getMapOutputURL(host, remaining);
|
||||||
|
input = openShuffleUrl(host, remaining, url);
|
||||||
// Connect with retry as expecting host's recovery take sometime.
|
if (input == null) {
|
||||||
setupConnectionsWithRetry(host, remaining, url);
|
|
||||||
if (stopped) {
|
|
||||||
abortConnect(host, remaining);
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
input = new DataInputStream(connection.getInputStream());
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -591,7 +594,7 @@ class Fetcher<K,V> extends Thread {
|
||||||
// Retry is not timeout, let's do retry with throwing an exception.
|
// Retry is not timeout, let's do retry with throwing an exception.
|
||||||
if (currentTime - retryStartTime < this.fetchRetryTimeout) {
|
if (currentTime - retryStartTime < this.fetchRetryTimeout) {
|
||||||
LOG.warn("Shuffle output from " + host.getHostName() +
|
LOG.warn("Shuffle output from " + host.getHostName() +
|
||||||
" failed, retry it.");
|
" failed, retry it.", ioe);
|
||||||
throw ioe;
|
throw ioe;
|
||||||
} else {
|
} else {
|
||||||
// timeout, prepare to be failed.
|
// timeout, prepare to be failed.
|
||||||
|
|
|
@ -388,6 +388,39 @@ public class TestFetcher {
|
||||||
anyBoolean(), anyBoolean());
|
anyBoolean(), anyBoolean());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
@Test(timeout=10000)
|
||||||
|
public void testCopyFromHostWithRetryThenTimeout() throws Exception {
|
||||||
|
InMemoryMapOutput<Text, Text> immo = mock(InMemoryMapOutput.class);
|
||||||
|
Fetcher<Text,Text> underTest = new FakeFetcher<Text,Text>(jobWithRetry,
|
||||||
|
id, ss, mm, r, metrics, except, key, connection);
|
||||||
|
|
||||||
|
String replyHash = SecureShuffleUtils.generateHash(encHash.getBytes(), key);
|
||||||
|
|
||||||
|
when(connection.getResponseCode()).thenReturn(200)
|
||||||
|
.thenThrow(new SocketTimeoutException("forced timeout"));
|
||||||
|
when(connection.getHeaderField(SecureShuffleUtils.HTTP_HEADER_REPLY_URL_HASH))
|
||||||
|
.thenReturn(replyHash);
|
||||||
|
ShuffleHeader header = new ShuffleHeader(map1ID.toString(), 10, 10, 1);
|
||||||
|
ByteArrayOutputStream bout = new ByteArrayOutputStream();
|
||||||
|
header.write(new DataOutputStream(bout));
|
||||||
|
ByteArrayInputStream in = new ByteArrayInputStream(bout.toByteArray());
|
||||||
|
when(connection.getInputStream()).thenReturn(in);
|
||||||
|
when(connection.getHeaderField(ShuffleHeader.HTTP_HEADER_NAME))
|
||||||
|
.thenReturn(ShuffleHeader.DEFAULT_HTTP_HEADER_NAME);
|
||||||
|
when(connection.getHeaderField(ShuffleHeader.HTTP_HEADER_VERSION))
|
||||||
|
.thenReturn(ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION);
|
||||||
|
when(mm.reserve(any(TaskAttemptID.class), anyLong(), anyInt()))
|
||||||
|
.thenReturn(immo);
|
||||||
|
doThrow(new IOException("forced error")).when(immo).shuffle(
|
||||||
|
any(MapHost.class), any(InputStream.class), anyLong(),
|
||||||
|
anyLong(), any(ShuffleClientMetrics.class), any(Reporter.class));
|
||||||
|
|
||||||
|
underTest.copyFromHost(host);
|
||||||
|
verify(allErrs).increment(1);
|
||||||
|
verify(ss).copyFailed(map1ID, host, false, false);
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testCopyFromHostExtraBytes() throws Exception {
|
public void testCopyFromHostExtraBytes() throws Exception {
|
||||||
Fetcher<Text,Text> underTest = new FakeFetcher<Text,Text>(job, id, ss, mm,
|
Fetcher<Text,Text> underTest = new FakeFetcher<Text,Text>(job, id, ss, mm,
|
||||||
|
|
Loading…
Reference in New Issue