diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java index e2773257708..a3007ecd106 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java @@ -195,6 +195,7 @@ abstract public class Task implements Writable, Configurable { protected SecretKey tokenSecret; protected SecretKey shuffleSecret; protected GcTimeUpdater gcUpdater; + private boolean uberized = false; //////////////////////////////////////////// // Constructors @@ -785,9 +786,6 @@ abstract public class Task implements Writable, Configurable { long taskProgressInterval = MRJobConfUtil. getTaskProgressReportInterval(conf); - boolean uberized = conf.getBoolean("mapreduce.task.uberized", - false); - while (!taskDone.get()) { synchronized (lock) { done = false; @@ -1176,11 +1174,17 @@ abstract public class Task implements Writable, Configurable { public void statusUpdate(TaskUmbilicalProtocol umbilical) throws IOException { int retries = MAX_RETRIES; + while (true) { try { if (!umbilical.statusUpdate(getTaskID(), taskStatus)) { - LOG.warn("Parent died. Exiting "+taskId); - System.exit(66); + if (uberized) { + LOG.warn("Task no longer available: " + taskId); + break; + } else { + LOG.warn("Parent died. Exiting " + taskId); + ExitUtil.terminate(66); + } } taskStatus.clearStatus(); return; @@ -1393,6 +1397,8 @@ abstract public class Task implements Writable, Configurable { NetUtils.addStaticResolution(name, resolvedName); } } + + uberized = conf.getBoolean("mapreduce.task.uberized", false); } public Configuration getConf() { diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestTask.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestTask.java new file mode 100644 index 00000000000..6bf06017615 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestTask.java @@ -0,0 +1,84 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapred; + +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.when; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.util.ExitUtil; +import org.apache.hadoop.util.ExitUtil.ExitException; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.runners.MockitoJUnitRunner; + +@RunWith(MockitoJUnitRunner.class) +public class TestTask { + @Mock + private TaskUmbilicalProtocol umbilical; + + private Task task; + + @Before + public void setup() { + task = new StubTask(); + ExitUtil.disableSystemExit(); + } + + @Test + public void testStatusUpdateDoesNotExitInUberMode() throws Exception { + setupTest(true); + + task.statusUpdate(umbilical); + } + + @Test(expected = ExitException.class) + public void testStatusUpdateExitsInNonUberMode() throws Exception { + setupTest(false); + + task.statusUpdate(umbilical); + } + + private void setupTest(boolean uberized) + throws IOException, InterruptedException { + Configuration conf = new Configuration(false); + conf.setBoolean("mapreduce.task.uberized", uberized); + task.setConf(conf); + // (false, true) to avoid possible infinite loop + when(umbilical.statusUpdate(any(TaskAttemptID.class), + any(TaskStatus.class))).thenReturn(false, true); + } + + public class StubTask extends Task { + @Override + public void run(JobConf job, TaskUmbilicalProtocol umbilical) + throws IOException, ClassNotFoundException, InterruptedException { + // nop + } + + @Override + public boolean isMapTask() { + return false; + } + } +}