diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 50a5e5aa09c..df8df393a28 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -159,6 +159,10 @@ Release 2.8.0 - UNRELEASED MAPREDUCE-6360. TestMapreduceConfigFields is placed in wrong dir, introducing compile error (Arshad Mohammad via vinayakumarb) + MAPREDUCE-6361. NPE issue in shuffle caused by concurrent issue between + copySucceeded() in one thread and copyFailed() in another thread on the + same host. (Junping Du via ozawa) + Release 2.7.1 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleSchedulerImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleSchedulerImpl.java index 8317672dfa1..ff0bb4fab24 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleSchedulerImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleSchedulerImpl.java @@ -239,7 +239,7 @@ public class ShuffleSchedulerImpl implements ShuffleScheduler { } private void updateStatus() { - updateStatus(null); + updateStatus(null); } public synchronized void hostFailed(String hostname) { @@ -263,9 +263,17 @@ public class ShuffleSchedulerImpl implements ShuffleScheduler { failureCounts.put(mapId, new IntWritable(1)); } String hostname = host.getHostName(); + IntWritable hostFailedNum = hostFailures.get(hostname); + // MAPREDUCE-6361: hostname could get cleanup from hostFailures in another + // thread with copySucceeded. + // In this case, add back hostname to hostFailures to get rid of NPE issue. + if (hostFailedNum == null) { + hostFailures.put(hostname, new IntWritable(1)); + } //report failure if already retried maxHostFailures times - boolean hostFail = hostFailures.get(hostname).get() > getMaxHostFailures() ? true : false; - + boolean hostFail = hostFailures.get(hostname).get() > + getMaxHostFailures() ? true : false; + if (failures >= abortFailureLimit) { try { throw new IOException(failures + " failures downloading " + mapId); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestShuffleScheduler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestShuffleScheduler.java index 6ac23200fb2..654b7488b98 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestShuffleScheduler.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestShuffleScheduler.java @@ -213,6 +213,76 @@ public class TestShuffleScheduler { Assert.assertEquals(copyMessage(10, 1, 2), progress.toString()); } + @SuppressWarnings("rawtypes") + @Test + public void TestSucceedAndFailedCopyMap() throws Exception { + JobConf job = new JobConf(); + job.setNumMapTasks(2); + //mock creation + TaskUmbilicalProtocol mockUmbilical = mock(TaskUmbilicalProtocol.class); + Reporter mockReporter = mock(Reporter.class); + FileSystem mockFileSystem = mock(FileSystem.class); + Class combinerClass = job.getCombinerClass(); + @SuppressWarnings("unchecked") // needed for mock with generic + CombineOutputCollector mockCombineOutputCollector = + (CombineOutputCollector) mock(CombineOutputCollector.class); + org.apache.hadoop.mapreduce.TaskAttemptID mockTaskAttemptID = + mock(org.apache.hadoop.mapreduce.TaskAttemptID.class); + LocalDirAllocator mockLocalDirAllocator = mock(LocalDirAllocator.class); + CompressionCodec mockCompressionCodec = mock(CompressionCodec.class); + Counter mockCounter = mock(Counter.class); + TaskStatus mockTaskStatus = mock(TaskStatus.class); + Progress mockProgress = mock(Progress.class); + MapOutputFile mockMapOutputFile = mock(MapOutputFile.class); + Task mockTask = mock(Task.class); + @SuppressWarnings("unchecked") + MapOutput output = mock(MapOutput.class); + + ShuffleConsumerPlugin.Context context = + new ShuffleConsumerPlugin.Context( + mockTaskAttemptID, job, mockFileSystem, + mockUmbilical, mockLocalDirAllocator, + mockReporter, mockCompressionCodec, + combinerClass, mockCombineOutputCollector, + mockCounter, mockCounter, mockCounter, + mockCounter, mockCounter, mockCounter, + mockTaskStatus, mockProgress, mockProgress, + mockTask, mockMapOutputFile, null); + TaskStatus status = new TaskStatus() { + @Override + public boolean getIsMap() { + return false; + } + @Override + public void addFetchFailedMap(TaskAttemptID mapTaskId) { + } + }; + Progress progress = new Progress(); + ShuffleSchedulerImpl scheduler = new ShuffleSchedulerImpl(job, + status, null, null, progress, context.getShuffledMapsCounter(), + context.getReduceShuffleBytes(), context.getFailedShuffleCounter()); + + MapHost host1 = new MapHost("host1", null); + TaskAttemptID failedAttemptID = new TaskAttemptID( + new org.apache.hadoop.mapred.TaskID( + new JobID("test",0), TaskType.MAP, 0), 0); + + TaskAttemptID succeedAttemptID = new TaskAttemptID( + new org.apache.hadoop.mapred.TaskID( + new JobID("test",0), TaskType.MAP, 1), 1); + + // handle output fetch failure for failedAttemptID, part I + scheduler.hostFailed(host1.getHostName()); + + // handle output fetch succeed for succeedAttemptID + long bytes = (long)500 * 1024 * 1024; + scheduler.copySucceeded(succeedAttemptID, host1, bytes, 0, 500000, output); + + // handle output fetch failure for failedAttemptID, part II + // for MAPREDUCE-6361: verify no NPE exception get thrown out + scheduler.copyFailed(failedAttemptID, host1, true, false); + } + private static String copyMessage(int attemptNo, double rate1, double rate2) { int attemptZero = attemptNo - 1; return String.format("copy task(attempt_test_0000_m_%06d_%d succeeded at %1.2f MB/s)"