MAPREDUCE-6002. Made MR task avoid reporting error to AM when the task process is shutting down. Contributed by Wangda Tan.

svn merge --ignore-ancestry -c 1613743 ../../trunk/


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1613744 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Zhijie Shen 2014-07-27 01:41:44 +00:00
parent f1d0db87cd
commit 8cde949397
4 changed files with 36 additions and 13 deletions

View File

@ -179,6 +179,9 @@ Release 2.5.0 - UNRELEASED
MAPREDUCE-5952. LocalContainerLauncher#renameMapOutputForReduce incorrectly MAPREDUCE-5952. LocalContainerLauncher#renameMapOutputForReduce incorrectly
assumes a single dir for mapOutIndex. (Gera Shegalov via kasha) assumes a single dir for mapOutIndex. (Gera Shegalov via kasha)
MAPREDUCE-6002. Made MR task avoid reporting error to AM when the task process
is shutting down. (Wangda Tan via zjshen)
Release 2.4.1 - 2014-06-23 Release 2.4.1 - 2014-06-23
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -31,6 +31,7 @@
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FSError; import org.apache.hadoop.fs.FSError;
@ -57,6 +58,7 @@
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerRemoteLaunchEvent; import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerRemoteLaunchEvent;
import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.util.ExitUtil; import org.apache.hadoop.util.ExitUtil;
import org.apache.hadoop.util.ShutdownHookManager;
import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment; import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
@ -406,7 +408,9 @@ private void runSubtask(org.apache.hadoop.mapred.Task task,
} catch (FSError e) { } catch (FSError e) {
LOG.fatal("FSError from child", e); LOG.fatal("FSError from child", e);
// umbilical: MRAppMaster creates (taskAttemptListener), passes to us // umbilical: MRAppMaster creates (taskAttemptListener), passes to us
if (!ShutdownHookManager.get().isShutdownInProgress()) {
umbilical.fsError(classicAttemptID, e.getMessage()); umbilical.fsError(classicAttemptID, e.getMessage());
}
throw new RuntimeException(); throw new RuntimeException();
} catch (Exception exception) { } catch (Exception exception) {
@ -429,11 +433,13 @@ private void runSubtask(org.apache.hadoop.mapred.Task task,
} catch (Throwable throwable) { } catch (Throwable throwable) {
LOG.fatal("Error running local (uberized) 'child' : " LOG.fatal("Error running local (uberized) 'child' : "
+ StringUtils.stringifyException(throwable)); + StringUtils.stringifyException(throwable));
if (!ShutdownHookManager.get().isShutdownInProgress()) {
Throwable tCause = throwable.getCause(); Throwable tCause = throwable.getCause();
String cause = (tCause == null) String cause =
? throwable.getMessage() (tCause == null) ? throwable.getMessage() : StringUtils
: StringUtils.stringifyException(tCause); .stringifyException(tCause);
umbilical.fatalError(classicAttemptID, cause); umbilical.fatalError(classicAttemptID, cause);
}
throw new RuntimeException(); throw new RuntimeException();
} }
} }

View File

@ -56,6 +56,7 @@
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.DiskChecker.DiskErrorException; import org.apache.hadoop.util.DiskChecker.DiskErrorException;
import org.apache.hadoop.util.ShutdownHookManager;
import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler; import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
import org.apache.hadoop.yarn.api.ApplicationConstants; import org.apache.hadoop.yarn.api.ApplicationConstants;
@ -170,7 +171,9 @@ public Object run() throws Exception {
}); });
} catch (FSError e) { } catch (FSError e) {
LOG.fatal("FSError from child", e); LOG.fatal("FSError from child", e);
if (!ShutdownHookManager.get().isShutdownInProgress()) {
umbilical.fsError(taskid, e.getMessage()); umbilical.fsError(taskid, e.getMessage());
}
} catch (Exception exception) { } catch (Exception exception) {
LOG.warn("Exception running child : " LOG.warn("Exception running child : "
+ StringUtils.stringifyException(exception)); + StringUtils.stringifyException(exception));
@ -195,18 +198,23 @@ public Object run() throws Exception {
} }
// Report back any failures, for diagnostic purposes // Report back any failures, for diagnostic purposes
if (taskid != null) { if (taskid != null) {
umbilical.fatalError(taskid, StringUtils.stringifyException(exception)); if (!ShutdownHookManager.get().isShutdownInProgress()) {
umbilical.fatalError(taskid,
StringUtils.stringifyException(exception));
}
} }
} catch (Throwable throwable) { } catch (Throwable throwable) {
LOG.fatal("Error running child : " LOG.fatal("Error running child : "
+ StringUtils.stringifyException(throwable)); + StringUtils.stringifyException(throwable));
if (taskid != null) { if (taskid != null) {
if (!ShutdownHookManager.get().isShutdownInProgress()) {
Throwable tCause = throwable.getCause(); Throwable tCause = throwable.getCause();
String cause = tCause == null String cause =
? throwable.getMessage() tCause == null ? throwable.getMessage() : StringUtils
: StringUtils.stringifyException(tCause); .stringifyException(tCause);
umbilical.fatalError(taskid, cause); umbilical.fatalError(taskid, cause);
} }
}
} finally { } finally {
RPC.stopProxy(umbilical); RPC.stopProxy(umbilical);
DefaultMetricsSystem.shutdown(); DefaultMetricsSystem.shutdown();

View File

@ -66,6 +66,7 @@
import org.apache.hadoop.util.Progress; import org.apache.hadoop.util.Progress;
import org.apache.hadoop.util.Progressable; import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.ShutdownHookManager;
import org.apache.hadoop.util.StringInterner; import org.apache.hadoop.util.StringInterner;
import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.StringUtils;
@ -321,6 +322,11 @@ protected void setWriteSkipRecs(boolean writeSkipRecs) {
protected void reportFatalError(TaskAttemptID id, Throwable throwable, protected void reportFatalError(TaskAttemptID id, Throwable throwable,
String logMsg) { String logMsg) {
LOG.fatal(logMsg); LOG.fatal(logMsg);
if (ShutdownHookManager.get().isShutdownInProgress()) {
return;
}
Throwable tCause = throwable.getCause(); Throwable tCause = throwable.getCause();
String cause = tCause == null String cause = tCause == null
? StringUtils.stringifyException(throwable) ? StringUtils.stringifyException(throwable)