MAPREDUCE-6957. shuffle hangs after a node manager connection timeout. Contributed by Jooseong Kim

(cherry picked from commit 4d98936eec)
Jason Lowe committed 2017-09-13 17:21:13 -05:00
parent 78e9a98c4a
commit 12e9558ed1
4 changed files with 93 additions and 13 deletions
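
Why this hung (reading from the diff): on a connect error, openShuffleUrl() already penalizes the host and marks the copies failed, yet the two blocks removed below also put every remaining map output back onto the host, and copyFromHost's cleanup path then put them back a second time. A map output queued twice can be fetched twice, and before this patch a second successful copy of an already-finished map was neither committed nor aborted, so its reserved shuffle memory leaked and the shuffle eventually stalled. A minimal sketch of the copyFromHost control flow the patch establishes (a hypothetical simplification, not the verbatim Hadoop source; metrics and stream cleanup omitted):

    DataInputStream input = null;
    try {
      // On a failed connection this penalizes the host or fails the
      // copies internally and returns null; the caller no longer puts
      // the remaining outputs back itself.
      input = openShuffleUrl(host, remaining, url);
      if (input == null) {
        return;
      }
      // ... fetch each map output from the stream ...
    } finally {
      // The single put-back point: whatever was not fetched is
      // re-queued exactly once.
      for (TaskAttemptID left : remaining) {
        scheduler.putBackKnownMapOutput(host, left);
      }
    }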

hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java

@@ -278,9 +278,6 @@ class Fetcher<K,V> extends Thread {
       LOG.warn("Connection rejected by the host " + te.host +
           ". Will retry later.");
       scheduler.penalize(host, te.backoff);
-      for (TaskAttemptID left : remaining) {
-        scheduler.putBackKnownMapOutput(host, left);
-      }
     } catch (IOException ie) {
       boolean connectExcpt = ie instanceof ConnectException;
       ioErrs.increment(1);
@@ -293,11 +290,6 @@
       for(TaskAttemptID left: remaining) {
         scheduler.copyFailed(left, host, false, connectExcpt);
       }
-
-      // Add back all the remaining maps, WITHOUT marking them as failed
-      for(TaskAttemptID left: remaining) {
-        scheduler.putBackKnownMapOutput(host, left);
-      }
     }
 
     return input;
@@ -332,12 +324,14 @@
 
     // Construct the url and connect
     URL url = getMapOutputURL(host, maps);
-    DataInputStream input = openShuffleUrl(host, remaining, url);
-    if (input == null) {
-      return;
-    }
+    DataInputStream input = null;
 
     try {
+      input = openShuffleUrl(host, remaining, url);
+      if (input == null) {
+        return;
+      }
+
       // Loop through available map-outputs and fetch them
       // On any error, faildTasks is not null and we exit
       // after putting back the remaining maps to the

hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleSchedulerImpl.java

@@ -217,6 +217,9 @@ public class ShuffleSchedulerImpl<K,V> implements ShuffleScheduler<K,V> {
       reduceShuffleBytes.increment(bytes);
       lastProgressTime = Time.monotonicNow();
       LOG.debug("map " + mapId + " done " + status.getStateString());
+    } else {
+      LOG.warn("Aborting already-finished MapOutput for " + mapId);
+      output.abort();
     }
   }
 
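The ShuffleSchedulerImpl half of the fix is defensive: if a duplicate copy of an already-finished map does arrive, it is now aborted, releasing the memory or disk reserved for that MapOutput instead of leaking it. A sketch of the resulting shape of copySucceeded (abridged to the commit/abort decision; a hypothetical simplification, not the full method):

    public synchronized void copySucceeded(TaskAttemptID mapId, MapHost host,
        long bytes, long startMillis, long endMillis,
        MapOutput<K, V> output) throws IOException {
      int mapIndex = mapId.getTaskID().getId();
      if (!finishedMaps[mapIndex]) {
        output.commit();            // first copy of this map wins
        finishedMaps[mapIndex] = true;
        // ... update counters and progress ...
      } else {
        // Duplicate of an already-finished map: release its resources.
        LOG.warn("Aborting already-finished MapOutput for " + mapId);
        output.abort();
      }
    }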

hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java

@@ -471,7 +471,10 @@ public class TestFetcher {
     underTest.copyFromHost(host);
 
     verify(allErrs).increment(1);
-    verify(ss).copyFailed(map1ID, host, false, false);
+    verify(ss, times(1)).copyFailed(map1ID, host, false, false);
+    verify(ss, times(1)).copyFailed(map2ID, host, false, false);
+    verify(ss, times(1)).putBackKnownMapOutput(any(MapHost.class), eq(map1ID));
+    verify(ss, times(1)).putBackKnownMapOutput(any(MapHost.class), eq(map2ID));
   }
 
   @Test

hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestShuffleScheduler.java

@@ -18,6 +18,8 @@
 package org.apache.hadoop.mapreduce.task.reduce;
 
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.io.compress.CompressionCodec;
@@ -283,6 +285,84 @@ public class TestShuffleScheduler {
     scheduler.copyFailed(failedAttemptID, host1, true, false);
   }
 
+  @Test
+  public <K, V> void testDuplicateCopySucceeded() throws Exception {
+    JobConf job = new JobConf();
+    job.setNumMapTasks(2);
+    //mock creation
+    TaskUmbilicalProtocol mockUmbilical = mock(TaskUmbilicalProtocol.class);
+    Reporter mockReporter = mock(Reporter.class);
+    FileSystem mockFileSystem = mock(FileSystem.class);
+    Class<? extends org.apache.hadoop.mapred.Reducer> combinerClass =
+        job.getCombinerClass();
+    @SuppressWarnings("unchecked") // needed for mock with generic
+    CombineOutputCollector<K, V> mockCombineOutputCollector =
+        (CombineOutputCollector<K, V>) mock(CombineOutputCollector.class);
+    org.apache.hadoop.mapreduce.TaskAttemptID mockTaskAttemptID =
+        mock(org.apache.hadoop.mapreduce.TaskAttemptID.class);
+    LocalDirAllocator mockLocalDirAllocator = mock(LocalDirAllocator.class);
+    CompressionCodec mockCompressionCodec = mock(CompressionCodec.class);
+    Counter mockCounter = mock(Counter.class);
+    TaskStatus mockTaskStatus = mock(TaskStatus.class);
+    Progress mockProgress = mock(Progress.class);
+    MapOutputFile mockMapOutputFile = mock(MapOutputFile.class);
+    Task mockTask = mock(Task.class);
+
+    @SuppressWarnings("unchecked")
+    MapOutput<K, V> output1 = mock(MapOutput.class);
+    @SuppressWarnings("unchecked")
+    MapOutput<K, V> output2 = mock(MapOutput.class);
+    @SuppressWarnings("unchecked")
+    MapOutput<K, V> output3 = mock(MapOutput.class);
+
+    ShuffleConsumerPlugin.Context<K, V> context =
+        new ShuffleConsumerPlugin.Context<K, V>(
+            mockTaskAttemptID, job, mockFileSystem,
+            mockUmbilical, mockLocalDirAllocator,
+            mockReporter, mockCompressionCodec,
+            combinerClass, mockCombineOutputCollector,
+            mockCounter, mockCounter, mockCounter,
+            mockCounter, mockCounter, mockCounter,
+            mockTaskStatus, mockProgress, mockProgress,
+            mockTask, mockMapOutputFile, null);
+
+    TaskStatus status = new TaskStatus() {
+      @Override
+      public boolean getIsMap() {
+        return false;
+      }
+      @Override
+      public void addFetchFailedMap(TaskAttemptID mapTaskId) {
+      }
+    };
+    Progress progress = new Progress();
+
+    ShuffleSchedulerImpl<K, V> scheduler = new ShuffleSchedulerImpl<K, V>(job,
+        status, null, null, progress, context.getShuffledMapsCounter(),
+        context.getReduceShuffleBytes(), context.getFailedShuffleCounter());
+
+    MapHost host1 = new MapHost("host1", null);
+    TaskAttemptID succeedAttempt1ID = new TaskAttemptID(
+        new org.apache.hadoop.mapred.TaskID(
+            new JobID("test", 0), TaskType.MAP, 0), 0);
+    TaskAttemptID succeedAttempt2ID = new TaskAttemptID(
+        new org.apache.hadoop.mapred.TaskID(
+            new JobID("test", 0), TaskType.MAP, 0), 1);
+    TaskAttemptID succeedAttempt3ID = new TaskAttemptID(
+        new org.apache.hadoop.mapred.TaskID(
+            new JobID("test", 0), TaskType.MAP, 1), 0);
+    long bytes = (long)500 * 1024 * 1024;
+    //First successful copy for map 0 should commit output
+    scheduler.copySucceeded(succeedAttempt1ID, host1, bytes, 0, 1, output1);
+    verify(output1).commit();
+    //Second successful copy for map 0 should abort output
+    scheduler.copySucceeded(succeedAttempt2ID, host1, bytes, 0, 1, output2);
+    verify(output2).abort();
+    //First successful copy for map 1 should commit output
+    scheduler.copySucceeded(succeedAttempt3ID, host1, bytes, 0, 1, output3);
+    verify(output3).commit();
+  }
+
   private static String copyMessage(int attemptNo, double rate1, double rate2) {
     int attemptZero = attemptNo - 1;
     return String.format("copy task(attempt_test_0000_m_%06d_%d succeeded at %1.2f MB/s)"