MAPREDUCE-6361. NPE issue in shuffle caused by concurrent issue between copySucceeded() in one thread and copyFailed() in another thread on the same host. Contributed by Junping Du.

(cherry picked from commit f4e2b3cc0b)
This commit is contained in:
Tsuyoshi Ozawa 2015-05-13 00:28:17 +09:00
parent c31c6fbda7
commit fb5b0ebb45
3 changed files with 85 additions and 3 deletions

View File

@ -159,6 +159,10 @@ Release 2.8.0 - UNRELEASED
MAPREDUCE-6360. TestMapreduceConfigFields is placed in wrong dir, MAPREDUCE-6360. TestMapreduceConfigFields is placed in wrong dir,
introducing compile error (Arshad Mohammad via vinayakumarb) introducing compile error (Arshad Mohammad via vinayakumarb)
MAPREDUCE-6361. NPE issue in shuffle caused by concurrent issue between
copySucceeded() in one thread and copyFailed() in another thread on the
same host. (Junping Du via ozawa)
Release 2.7.1 - UNRELEASED Release 2.7.1 - UNRELEASED
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -239,7 +239,7 @@ public class ShuffleSchedulerImpl<K,V> implements ShuffleScheduler<K,V> {
} }
private void updateStatus() { private void updateStatus() {
updateStatus(null); updateStatus(null);
} }
public synchronized void hostFailed(String hostname) { public synchronized void hostFailed(String hostname) {
@ -263,9 +263,17 @@ public class ShuffleSchedulerImpl<K,V> implements ShuffleScheduler<K,V> {
failureCounts.put(mapId, new IntWritable(1)); failureCounts.put(mapId, new IntWritable(1));
} }
String hostname = host.getHostName(); String hostname = host.getHostName();
IntWritable hostFailedNum = hostFailures.get(hostname);
// MAPREDUCE-6361: the hostname may have been cleaned up from hostFailures
// by copySucceeded() running in another thread.
// In that case, re-add the hostname to hostFailures to avoid an NPE.
if (hostFailedNum == null) {
hostFailures.put(hostname, new IntWritable(1));
}
//report failure if already retried maxHostFailures times //report failure if already retried maxHostFailures times
boolean hostFail = hostFailures.get(hostname).get() > getMaxHostFailures() ? true : false; boolean hostFail = hostFailures.get(hostname).get() >
getMaxHostFailures() ? true : false;
if (failures >= abortFailureLimit) { if (failures >= abortFailureLimit) {
try { try {
throw new IOException(failures + " failures downloading " + mapId); throw new IOException(failures + " failures downloading " + mapId);

View File

@ -213,6 +213,76 @@ public class TestShuffleScheduler {
Assert.assertEquals(copyMessage(10, 1, 2), progress.toString()); Assert.assertEquals(copyMessage(10, 1, 2), progress.toString());
} }
/**
 * Regression test for MAPREDUCE-6361.
 *
 * Scenario: a fetch failure is recorded for a host via hostFailed(), then a
 * successful copy for the same host (copySucceeded()) clears that host's
 * entry from the failure map in another code path. A subsequent
 * copyFailed() for the same host must re-create the entry instead of
 * dereferencing a missing one; before the fix this threw a
 * NullPointerException.
 */
@SuppressWarnings("rawtypes")
@Test
public <K, V> void TestSucceedAndFailedCopyMap() throws Exception {
JobConf job = new JobConf();
job.setNumMapTasks(2);
// Mock every collaborator needed to build a ShuffleConsumerPlugin.Context;
// only the scheduler's failure bookkeeping is exercised, so the mocks need
// no stubbed behavior.
//mock creation
TaskUmbilicalProtocol mockUmbilical = mock(TaskUmbilicalProtocol.class);
Reporter mockReporter = mock(Reporter.class);
FileSystem mockFileSystem = mock(FileSystem.class);
Class<? extends org.apache.hadoop.mapred.Reducer> combinerClass = job.getCombinerClass();
@SuppressWarnings("unchecked") // needed for mock with generic
CombineOutputCollector<K, V> mockCombineOutputCollector =
(CombineOutputCollector<K, V>) mock(CombineOutputCollector.class);
org.apache.hadoop.mapreduce.TaskAttemptID mockTaskAttemptID =
mock(org.apache.hadoop.mapreduce.TaskAttemptID.class);
LocalDirAllocator mockLocalDirAllocator = mock(LocalDirAllocator.class);
CompressionCodec mockCompressionCodec = mock(CompressionCodec.class);
Counter mockCounter = mock(Counter.class);
TaskStatus mockTaskStatus = mock(TaskStatus.class);
Progress mockProgress = mock(Progress.class);
MapOutputFile mockMapOutputFile = mock(MapOutputFile.class);
Task mockTask = mock(Task.class);
@SuppressWarnings("unchecked")
MapOutput<K, V> output = mock(MapOutput.class);
ShuffleConsumerPlugin.Context<K, V> context =
new ShuffleConsumerPlugin.Context<K, V>(
mockTaskAttemptID, job, mockFileSystem,
mockUmbilical, mockLocalDirAllocator,
mockReporter, mockCompressionCodec,
combinerClass, mockCombineOutputCollector,
mockCounter, mockCounter, mockCounter,
mockCounter, mockCounter, mockCounter,
mockTaskStatus, mockProgress, mockProgress,
mockTask, mockMapOutputFile, null);
// Minimal concrete TaskStatus: the scheduler only needs a reduce-side
// status object; addFetchFailedMap is a no-op sink for reported failures.
TaskStatus status = new TaskStatus() {
@Override
public boolean getIsMap() {
return false;
}
@Override
public void addFetchFailedMap(TaskAttemptID mapTaskId) {
}
};
Progress progress = new Progress();
ShuffleSchedulerImpl<K, V> scheduler = new ShuffleSchedulerImpl<K, V>(job,
status, null, null, progress, context.getShuffledMapsCounter(),
context.getReduceShuffleBytes(), context.getFailedShuffleCounter());
MapHost host1 = new MapHost("host1", null);
// Two distinct map attempts on the same host: one whose fetch fails and
// one whose fetch succeeds in between the two failure reports.
TaskAttemptID failedAttemptID = new TaskAttemptID(
new org.apache.hadoop.mapred.TaskID(
new JobID("test",0), TaskType.MAP, 0), 0);
TaskAttemptID succeedAttemptID = new TaskAttemptID(
new org.apache.hadoop.mapred.TaskID(
new JobID("test",0), TaskType.MAP, 1), 1);
// Step 1: record a host-level fetch failure for host1.
scheduler.hostFailed(host1.getHostName());
// Step 2: a successful copy from host1 — this is the path that removes
// host1 from the scheduler's host-failure map.
long bytes = (long)500 * 1024 * 1024;
scheduler.copySucceeded(succeedAttemptID, host1, bytes, 0, 500000, output);
// Step 3: report a fetch failure for the other attempt on host1.
// MAPREDUCE-6361: this must not throw a NullPointerException even though
// the host's failure entry was cleared in step 2.
scheduler.copyFailed(failedAttemptID, host1, true, false);
}
private static String copyMessage(int attemptNo, double rate1, double rate2) { private static String copyMessage(int attemptNo, double rate1, double rate2) {
int attemptZero = attemptNo - 1; int attemptZero = attemptNo - 1;
return String.format("copy task(attempt_test_0000_m_%06d_%d succeeded at %1.2f MB/s)" return String.format("copy task(attempt_test_0000_m_%06d_%d succeeded at %1.2f MB/s)"