svn merge -c 1455740 FIXES: MAPREDUCE-5060. Fetch failures that time out only count against the first map task. Contributed by Robert Joseph Evans

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1455742 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Jason Darrell Lowe 2013-03-12 22:56:57 +00:00
parent b15c6f6721
commit 77e23898f1
3 changed files with 55 additions and 13 deletions

View File

@ -633,6 +633,9 @@ Release 0.23.7 - UNRELEASED
MAPREDUCE-5023. History Server Web Services missing Job Counters (Ravi MAPREDUCE-5023. History Server Web Services missing Job Counters (Ravi
Prakash via tgraves) Prakash via tgraves)
MAPREDUCE-5060. Fetch failures that time out only count against the first
map task (Robert Joseph Evans via jlowe)
Release 0.23.6 - UNRELEASED Release 0.23.6 - UNRELEASED
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -221,7 +221,6 @@ class Fetcher<K,V> extends Thread {
// Construct the url and connect // Construct the url and connect
DataInputStream input; DataInputStream input;
boolean connectSucceeded = false;
try { try {
URL url = getMapOutputURL(host, maps); URL url = getMapOutputURL(host, maps);
@ -237,7 +236,6 @@ class Fetcher<K,V> extends Thread {
// set the read timeout // set the read timeout
connection.setReadTimeout(readTimeout); connection.setReadTimeout(readTimeout);
connect(connection, connectionTimeout); connect(connection, connectionTimeout);
connectSucceeded = true;
input = new DataInputStream(connection.getInputStream()); input = new DataInputStream(connection.getInputStream());
// Validate response code // Validate response code
@ -265,18 +263,10 @@ class Fetcher<K,V> extends Thread {
// If connect did not succeed, just mark all the maps as failed, // If connect did not succeed, just mark all the maps as failed,
// indirectly penalizing the host // indirectly penalizing the host
if (!connectSucceeded) { for(TaskAttemptID left: remaining) {
for(TaskAttemptID left: remaining) { scheduler.copyFailed(left, host, false, connectExcpt);
scheduler.copyFailed(left, host, connectSucceeded, connectExcpt);
}
} else {
// If we got a read error at this stage, it implies there was a problem
// with the first map, typically lost map. So, penalize only that map
// and add the rest
TaskAttemptID firstMap = maps.get(0);
scheduler.copyFailed(firstMap, host, connectSucceeded, connectExcpt);
} }
// Add back all the remaining maps, WITHOUT marking them as failed // Add back all the remaining maps, WITHOUT marking them as failed
for(TaskAttemptID left: remaining) { for(TaskAttemptID left: remaining) {
scheduler.putBackKnownMapOutput(host, left); scheduler.putBackKnownMapOutput(host, left);

View File

@ -26,6 +26,7 @@ import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream; import java.io.DataOutputStream;
import java.io.IOException; import java.io.IOException;
import java.net.HttpURLConnection; import java.net.HttpURLConnection;
import java.net.SocketTimeoutException;
import java.net.URL; import java.net.URL;
import java.util.ArrayList; import java.util.ArrayList;
@ -70,6 +71,54 @@ public class TestFetcher {
} }
} }
@SuppressWarnings("unchecked")
@Test(timeout=30000)
public void testCopyFromHostConnectionTimeout() throws Exception {
LOG.info("testCopyFromHostConnectionTimeout");
JobConf job = new JobConf();
TaskAttemptID id = TaskAttemptID.forName("attempt_0_1_r_1_1");
ShuffleScheduler<Text, Text> ss = mock(ShuffleScheduler.class);
MergeManagerImpl<Text, Text> mm = mock(MergeManagerImpl.class);
Reporter r = mock(Reporter.class);
ShuffleClientMetrics metrics = mock(ShuffleClientMetrics.class);
ExceptionReporter except = mock(ExceptionReporter.class);
SecretKey key = JobTokenSecretManager.createSecretKey(new byte[]{0,0,0,0});
HttpURLConnection connection = mock(HttpURLConnection.class);
when(connection.getInputStream()).thenThrow(
new SocketTimeoutException("This is a fake timeout :)"));
Counters.Counter allErrs = mock(Counters.Counter.class);
when(r.getCounter(anyString(), anyString()))
.thenReturn(allErrs);
Fetcher<Text,Text> underTest = new FakeFetcher<Text,Text>(job, id, ss, mm,
r, metrics, except, key, connection);
MapHost host = new MapHost("localhost", "http://localhost:8080/");
ArrayList<TaskAttemptID> maps = new ArrayList<TaskAttemptID>(1);
TaskAttemptID map1ID = TaskAttemptID.forName("attempt_0_1_m_1_1");
maps.add(map1ID);
TaskAttemptID map2ID = TaskAttemptID.forName("attempt_0_1_m_2_1");
maps.add(map2ID);
when(ss.getMapsForHost(host)).thenReturn(maps);
String encHash = "vFE234EIFCiBgYs2tCXY/SjT8Kg=";
underTest.copyFromHost(host);
verify(connection)
.addRequestProperty(SecureShuffleUtils.HTTP_HEADER_URL_HASH,
encHash);
verify(allErrs).increment(1);
verify(ss).copyFailed(map1ID, host, false, false);
verify(ss).copyFailed(map2ID, host, false, false);
verify(ss).putBackKnownMapOutput(any(MapHost.class), eq(map1ID));
verify(ss).putBackKnownMapOutput(any(MapHost.class), eq(map2ID));
}
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
@Test @Test
public void testCopyFromHostBogusHeader() throws Exception { public void testCopyFromHostBogusHeader() throws Exception {