From fb5b0ebb459cc8812084090a7ce7ac29e2ad147c Mon Sep 17 00:00:00 2001 From: Tsuyoshi Ozawa Date: Wed, 13 May 2015 00:28:17 +0900 Subject: [PATCH] MAPREDUCE-6361. NPE issue in shuffle caused by concurrent issue between copySucceeded() in one thread and copyFailed() in another thread on the same host. Contributed by Junping Du. (cherry picked from commit f4e2b3cc0b1f4e49c306bc09a9dddd0495225bb2) --- hadoop-mapreduce-project/CHANGES.txt | 4 ++ .../task/reduce/ShuffleSchedulerImpl.java | 14 +++- .../task/reduce/TestShuffleScheduler.java | 70 +++++++++++++++++++ 3 files changed, 85 insertions(+), 3 deletions(-) diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 50a5e5aa09c..df8df393a28 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -159,6 +159,10 @@ Release 2.8.0 - UNRELEASED MAPREDUCE-6360. TestMapreduceConfigFields is placed in wrong dir, introducing compile error (Arshad Mohammad via vinayakumarb) + MAPREDUCE-6361. NPE issue in shuffle caused by concurrent issue between + copySucceeded() in one thread and copyFailed() in another thread on the + same host. (Junping Du via ozawa) + Release 2.7.1 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleSchedulerImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleSchedulerImpl.java index 8317672dfa1..ff0bb4fab24 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleSchedulerImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleSchedulerImpl.java @@ -239,7 +239,7 @@ private synchronized void updateStatus(String individualProgress) { } private void updateStatus() { - updateStatus(null); + updateStatus(null); } public synchronized void hostFailed(String hostname) { @@ -263,9 +263,17 @@ public synchronized void copyFailed(TaskAttemptID mapId, MapHost host, failureCounts.put(mapId, new IntWritable(1)); } String hostname = host.getHostName(); + IntWritable hostFailedNum = hostFailures.get(hostname); + // MAPREDUCE-6361: hostname could get cleanup from hostFailures in another + // thread with copySucceeded. + // In this case, add back hostname to hostFailures to get rid of NPE issue. + if (hostFailedNum == null) { + hostFailures.put(hostname, new IntWritable(1)); + } //report failure if already retried maxHostFailures times - boolean hostFail = hostFailures.get(hostname).get() > getMaxHostFailures() ? true : false; - + boolean hostFail = hostFailures.get(hostname).get() > + getMaxHostFailures() ? true : false; + if (failures >= abortFailureLimit) { try { throw new IOException(failures + " failures downloading " + mapId); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestShuffleScheduler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestShuffleScheduler.java index 6ac23200fb2..654b7488b98 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestShuffleScheduler.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestShuffleScheduler.java @@ -213,6 +213,76 @@ public void addFetchFailedMap(TaskAttemptID mapTaskId) { Assert.assertEquals(copyMessage(10, 1, 2), progress.toString()); } + @SuppressWarnings("rawtypes") + @Test + public void TestSucceedAndFailedCopyMap() throws Exception { + JobConf job = new JobConf(); + job.setNumMapTasks(2); + //mock creation + TaskUmbilicalProtocol mockUmbilical = mock(TaskUmbilicalProtocol.class); + Reporter mockReporter = mock(Reporter.class); + FileSystem mockFileSystem = mock(FileSystem.class); + Class combinerClass = job.getCombinerClass(); + @SuppressWarnings("unchecked") // needed for mock with generic + CombineOutputCollector mockCombineOutputCollector = + (CombineOutputCollector) mock(CombineOutputCollector.class); + org.apache.hadoop.mapreduce.TaskAttemptID mockTaskAttemptID = + mock(org.apache.hadoop.mapreduce.TaskAttemptID.class); + LocalDirAllocator mockLocalDirAllocator = mock(LocalDirAllocator.class); + CompressionCodec mockCompressionCodec = mock(CompressionCodec.class); + Counter mockCounter = mock(Counter.class); + TaskStatus mockTaskStatus = mock(TaskStatus.class); + Progress mockProgress = mock(Progress.class); + MapOutputFile mockMapOutputFile = mock(MapOutputFile.class); + Task mockTask = mock(Task.class); + @SuppressWarnings("unchecked") + MapOutput output = mock(MapOutput.class); + + ShuffleConsumerPlugin.Context context = + new ShuffleConsumerPlugin.Context( + mockTaskAttemptID, job, mockFileSystem, + mockUmbilical, mockLocalDirAllocator, + mockReporter, mockCompressionCodec, + combinerClass, mockCombineOutputCollector, + mockCounter, mockCounter, mockCounter, + mockCounter, mockCounter, mockCounter, + mockTaskStatus, mockProgress, mockProgress, + mockTask, mockMapOutputFile, null); + TaskStatus status = new TaskStatus() { + @Override + public boolean getIsMap() { + return false; + } + @Override + public void addFetchFailedMap(TaskAttemptID mapTaskId) { + } + }; + Progress progress = new Progress(); + ShuffleSchedulerImpl scheduler = new ShuffleSchedulerImpl(job, + status, null, null, progress, context.getShuffledMapsCounter(), + context.getReduceShuffleBytes(), context.getFailedShuffleCounter()); + + MapHost host1 = new MapHost("host1", null); + TaskAttemptID failedAttemptID = new TaskAttemptID( + new org.apache.hadoop.mapred.TaskID( + new JobID("test",0), TaskType.MAP, 0), 0); + + TaskAttemptID succeedAttemptID = new TaskAttemptID( + new org.apache.hadoop.mapred.TaskID( + new JobID("test",0), TaskType.MAP, 1), 1); + + // handle output fetch failure for failedAttemptID, part I + scheduler.hostFailed(host1.getHostName()); + + // handle output fetch succeed for succeedAttemptID + long bytes = (long)500 * 1024 * 1024; + scheduler.copySucceeded(succeedAttemptID, host1, bytes, 0, 500000, output); + + // handle output fetch failure for failedAttemptID, part II + // for MAPREDUCE-6361: verify no NPE exception get thrown out + scheduler.copyFailed(failedAttemptID, host1, true, false); + } + private static String copyMessage(int attemptNo, double rate1, double rate2) { int attemptZero = attemptNo - 1; return String.format("copy task(attempt_test_0000_m_%06d_%d succeeded at %1.2f MB/s)"