MAPREDUCE-6361. NPE issue in shuffle caused by concurrent issue between copySucceeded() in one thread and copyFailed() in another thread on the same host. Contributed by Junping Du.
parent 273d2f9753
commit 6b03ec5137
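The commit message describes a check-then-act race: copyFailed() assumes the hostFailures entry for a host is still present, while a concurrent copySucceeded() for the same host can remove it, so the chained hostFailures.get(hostname).get() dereferences null. Below is a minimal, self-contained sketch of that interleaving and of the defensive re-insert the patch applies. It is an illustration only, not the Hadoop code: the class and method names (HostFailureRaceSketch, recordHostSuccess, checkHostFailure) and the use of plain Integer counters are assumptions made for the example.

import java.util.HashMap;
import java.util.Map;

// Minimal sketch of the MAPREDUCE-6361 interleaving; names are illustrative,
// not the actual ShuffleSchedulerImpl API.
public class HostFailureRaceSketch {
  private final Map<String, Integer> hostFailures = new HashMap<>();

  // Fetcher thread A, step 1: note a fetch failure against the host
  // (stands in for ShuffleSchedulerImpl#hostFailed).
  synchronized void hostFailed(String host) {
    hostFailures.merge(host, 1, Integer::sum);
  }

  // Fetcher thread B: a successful copy from the host clears its failure count
  // (stands in for the cleanup done by copySucceeded).
  synchronized void recordHostSuccess(String host) {
    hostFailures.remove(host);
  }

  // Fetcher thread A, step 2: decide whether the host has failed too often
  // (stands in for the start of copyFailed). Without the null check, the
  // dereference below is where the NPE surfaced.
  synchronized boolean checkHostFailure(String host, int maxHostFailures) {
    Integer failures = hostFailures.get(host);
    if (failures == null) {
      // defensive re-insert, mirroring the patch
      hostFailures.put(host, 1);
      failures = 1;
    }
    return failures > maxHostFailures;
  }

  public static void main(String[] args) {
    HostFailureRaceSketch s = new HostFailureRaceSketch();
    s.hostFailed("host1");                 // thread A sees a fetch failure
    s.recordHostSuccess("host1");          // thread B's success removes the entry
    System.out.println(s.checkHostFailure("host1", 5));  // prints false, no NPE
  }
}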
CHANGES.txt

@@ -36,6 +36,10 @@ Release 2.7.1 - UNRELEASED
     that they don't fail on history-server backed by DFSes with not so strong
     guarantees. (Craig Welch via vinodkv)
 
+    MAPREDUCE-6361. NPE issue in shuffle caused by concurrent issue between
+    copySucceeded() in one thread and copyFailed() in another thread on the
+    same host. (Junping Du via ozawa)
+
 Release 2.7.0 - 2015-04-20
 
   INCOMPATIBLE CHANGES
ShuffleSchedulerImpl.java

@@ -263,8 +263,16 @@ public class ShuffleSchedulerImpl<K,V> implements ShuffleScheduler<K,V> {
       failureCounts.put(mapId, new IntWritable(1));
     }
     String hostname = host.getHostName();
+    IntWritable hostFailedNum = hostFailures.get(hostname);
+    // MAPREDUCE-6361: hostname could get cleanup from hostFailures in another
+    // thread with copySucceeded.
+    // In this case, add back hostname to hostFailures to get rid of NPE issue.
+    if (hostFailedNum == null) {
+      hostFailures.put(hostname, new IntWritable(1));
+    }
     //report failure if already retried maxHostFailures times
-    boolean hostFail = hostFailures.get(hostname).get() > getMaxHostFailures() ? true : false;
+    boolean hostFail = hostFailures.get(hostname).get() >
+        getMaxHostFailures() ? true : false;
 
     if (failures >= abortFailureLimit) {
       try {
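A note on the design choice, as it reads from the hunk above: rather than widening the locking so that the bookkeeping in hostFailed() and the check in copyFailed() happen atomically with respect to copySucceeded(), the patch keeps the existing structure and tolerates a missing entry by re-inserting it with a count of 1 before dereferencing. The trade-off is that a failure racing with a success restarts the host's failure count, which is consistent with a success resetting the host's health, and the NullPointerException on the hostFailures.get(hostname).get() chain can no longer occur.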
TestShuffleScheduler.java

@@ -213,6 +213,76 @@ public class TestShuffleScheduler {
     Assert.assertEquals(copyMessage(10, 1, 2), progress.toString());
   }
 
+  @SuppressWarnings("rawtypes")
+  @Test
+  public <K, V> void TestSucceedAndFailedCopyMap() throws Exception {
+    JobConf job = new JobConf();
+    job.setNumMapTasks(2);
+    //mock creation
+    TaskUmbilicalProtocol mockUmbilical = mock(TaskUmbilicalProtocol.class);
+    Reporter mockReporter = mock(Reporter.class);
+    FileSystem mockFileSystem = mock(FileSystem.class);
+    Class<? extends org.apache.hadoop.mapred.Reducer> combinerClass = job.getCombinerClass();
+    @SuppressWarnings("unchecked") // needed for mock with generic
+    CombineOutputCollector<K, V> mockCombineOutputCollector =
+        (CombineOutputCollector<K, V>) mock(CombineOutputCollector.class);
+    org.apache.hadoop.mapreduce.TaskAttemptID mockTaskAttemptID =
+        mock(org.apache.hadoop.mapreduce.TaskAttemptID.class);
+    LocalDirAllocator mockLocalDirAllocator = mock(LocalDirAllocator.class);
+    CompressionCodec mockCompressionCodec = mock(CompressionCodec.class);
+    Counter mockCounter = mock(Counter.class);
+    TaskStatus mockTaskStatus = mock(TaskStatus.class);
+    Progress mockProgress = mock(Progress.class);
+    MapOutputFile mockMapOutputFile = mock(MapOutputFile.class);
+    Task mockTask = mock(Task.class);
+    @SuppressWarnings("unchecked")
+    MapOutput<K, V> output = mock(MapOutput.class);
+
+    ShuffleConsumerPlugin.Context<K, V> context =
+        new ShuffleConsumerPlugin.Context<K, V>(
+            mockTaskAttemptID, job, mockFileSystem,
+            mockUmbilical, mockLocalDirAllocator,
+            mockReporter, mockCompressionCodec,
+            combinerClass, mockCombineOutputCollector,
+            mockCounter, mockCounter, mockCounter,
+            mockCounter, mockCounter, mockCounter,
+            mockTaskStatus, mockProgress, mockProgress,
+            mockTask, mockMapOutputFile, null);
+    TaskStatus status = new TaskStatus() {
+      @Override
+      public boolean getIsMap() {
+        return false;
+      }
+      @Override
+      public void addFetchFailedMap(TaskAttemptID mapTaskId) {
+      }
+    };
+    Progress progress = new Progress();
+    ShuffleSchedulerImpl<K, V> scheduler = new ShuffleSchedulerImpl<K, V>(job,
+        status, null, null, progress, context.getShuffledMapsCounter(),
+        context.getReduceShuffleBytes(), context.getFailedShuffleCounter());
+
+    MapHost host1 = new MapHost("host1", null);
+    TaskAttemptID failedAttemptID = new TaskAttemptID(
+        new org.apache.hadoop.mapred.TaskID(
+            new JobID("test", 0), TaskType.MAP, 0), 0);
+
+    TaskAttemptID succeedAttemptID = new TaskAttemptID(
+        new org.apache.hadoop.mapred.TaskID(
+            new JobID("test", 0), TaskType.MAP, 1), 1);
+
+    // handle output fetch failure for failedAttemptID, part I
+    scheduler.hostFailed(host1.getHostName());
+
+    // handle output fetch succeed for succeedAttemptID
+    long bytes = (long)500 * 1024 * 1024;
+    scheduler.copySucceeded(succeedAttemptID, host1, bytes, 0, 500000, output);
+
+    // handle output fetch failure for failedAttemptID, part II
+    // for MAPREDUCE-6361: verify no NPE exception get thrown out
+    scheduler.copyFailed(failedAttemptID, host1, true, false);
+  }
+
   private static String copyMessage(int attemptNo, double rate1, double rate2) {
     int attemptZero = attemptNo - 1;
     return String.format("copy task(attempt_test_0000_m_%06d_%d succeeded at %1.2f MB/s)"
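The added test drives exactly that interleaving through the public scheduler API: hostFailed() for host1, then copySucceeded() for a different attempt on the same host (which clears the host's failure state), then copyFailed() for the original attempt, and it passes simply by completing without a NullPointerException. To run it locally, something like "mvn test -Dtest=TestShuffleScheduler" from the module containing TestShuffleScheduler (the hadoop-mapreduce-client-core module, assuming the standard Hadoop Maven layout) should be sufficient.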