Merge -r 1333747:1333748 from trunk to branch. FIXES: MAPREDUCE-4205
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1333749 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
c7a47f2b8f
commit
9a04fc6caf
|
@ -57,6 +57,9 @@ Release 2.0.0 - UNRELEASED
|
||||||
MAPREDUCE-4219. make default container-executor.conf.dir be a path
|
MAPREDUCE-4219. make default container-executor.conf.dir be a path
|
||||||
relative to the container-executor binary. (rvs via tucu)
|
relative to the container-executor binary. (rvs via tucu)
|
||||||
|
|
||||||
|
MAPREDUCE-4205. retrofit all JVM shutdown hooks to use ShutdownHookManager
|
||||||
|
(tucu)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
BUG FIXES
|
BUG FIXES
|
||||||
|
|
|
@ -90,6 +90,7 @@ import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.security.token.Token;
|
import org.apache.hadoop.security.token.Token;
|
||||||
import org.apache.hadoop.security.token.TokenIdentifier;
|
import org.apache.hadoop.security.token.TokenIdentifier;
|
||||||
import org.apache.hadoop.util.ReflectionUtils;
|
import org.apache.hadoop.util.ReflectionUtils;
|
||||||
|
import org.apache.hadoop.util.ShutdownHookManager;
|
||||||
import org.apache.hadoop.yarn.Clock;
|
import org.apache.hadoop.yarn.Clock;
|
||||||
import org.apache.hadoop.yarn.ClusterInfo;
|
import org.apache.hadoop.yarn.ClusterInfo;
|
||||||
import org.apache.hadoop.yarn.SystemClock;
|
import org.apache.hadoop.yarn.SystemClock;
|
||||||
|
@ -130,6 +131,11 @@ public class MRAppMaster extends CompositeService {
|
||||||
|
|
||||||
private static final Log LOG = LogFactory.getLog(MRAppMaster.class);
|
private static final Log LOG = LogFactory.getLog(MRAppMaster.class);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Priority of the MRAppMaster shutdown hook.
|
||||||
|
*/
|
||||||
|
public static final int SHUTDOWN_HOOK_PRIORITY = 30;
|
||||||
|
|
||||||
private Clock clock;
|
private Clock clock;
|
||||||
private final long startTime;
|
private final long startTime;
|
||||||
private final long appSubmitTime;
|
private final long appSubmitTime;
|
||||||
|
@ -990,8 +996,8 @@ public class MRAppMaster extends CompositeService {
|
||||||
new MRAppMaster(applicationAttemptId, containerId, nodeHostString,
|
new MRAppMaster(applicationAttemptId, containerId, nodeHostString,
|
||||||
Integer.parseInt(nodePortString),
|
Integer.parseInt(nodePortString),
|
||||||
Integer.parseInt(nodeHttpPortString), appSubmitTime);
|
Integer.parseInt(nodeHttpPortString), appSubmitTime);
|
||||||
Runtime.getRuntime().addShutdownHook(
|
ShutdownHookManager.get().addShutdownHook(
|
||||||
new MRAppMasterShutdownHook(appMaster));
|
new MRAppMasterShutdownHook(appMaster), SHUTDOWN_HOOK_PRIORITY);
|
||||||
YarnConfiguration conf = new YarnConfiguration(new JobConf());
|
YarnConfiguration conf = new YarnConfiguration(new JobConf());
|
||||||
conf.addResource(new Path(MRJobConfig.JOB_CONF_FILE));
|
conf.addResource(new Path(MRJobConfig.JOB_CONF_FILE));
|
||||||
String jobUserName = System
|
String jobUserName = System
|
||||||
|
@ -1010,7 +1016,7 @@ public class MRAppMaster extends CompositeService {
|
||||||
|
|
||||||
// The shutdown hook that runs when a signal is received AND during normal
|
// The shutdown hook that runs when a signal is received AND during normal
|
||||||
// close of the JVM.
|
// close of the JVM.
|
||||||
static class MRAppMasterShutdownHook extends Thread {
|
static class MRAppMasterShutdownHook implements Runnable {
|
||||||
MRAppMaster appMaster;
|
MRAppMaster appMaster;
|
||||||
MRAppMasterShutdownHook(MRAppMaster appMaster) {
|
MRAppMasterShutdownHook(MRAppMaster appMaster) {
|
||||||
this.appMaster = appMaster;
|
this.appMaster = appMaster;
|
||||||
|
@ -1028,12 +1034,6 @@ public class MRAppMaster extends CompositeService {
|
||||||
appMaster.jobHistoryEventHandler.setSignalled(true);
|
appMaster.jobHistoryEventHandler.setSignalled(true);
|
||||||
}
|
}
|
||||||
appMaster.stop();
|
appMaster.stop();
|
||||||
try {
|
|
||||||
//Close all the FileSystem objects
|
|
||||||
FileSystem.closeAll();
|
|
||||||
} catch (IOException ioe) {
|
|
||||||
LOG.warn("Failed to close all FileSystem objects", ioe);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -28,6 +28,7 @@ import org.apache.hadoop.mapred.JobConf;
|
||||||
import org.apache.hadoop.mapreduce.MRConfig;
|
import org.apache.hadoop.mapreduce.MRConfig;
|
||||||
import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
|
import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
|
||||||
import org.apache.hadoop.security.SecurityUtil;
|
import org.apache.hadoop.security.SecurityUtil;
|
||||||
|
import org.apache.hadoop.util.ShutdownHookManager;
|
||||||
import org.apache.hadoop.util.StringUtils;
|
import org.apache.hadoop.util.StringUtils;
|
||||||
import org.apache.hadoop.yarn.YarnException;
|
import org.apache.hadoop.yarn.YarnException;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
|
@ -40,6 +41,12 @@ import org.apache.hadoop.yarn.service.CompositeService;
|
||||||
*
|
*
|
||||||
*****************************************************************/
|
*****************************************************************/
|
||||||
public class JobHistoryServer extends CompositeService {
|
public class JobHistoryServer extends CompositeService {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Priority of the JobHistoryServer shutdown hook.
|
||||||
|
*/
|
||||||
|
public static final int SHUTDOWN_HOOK_PRIORITY = 30;
|
||||||
|
|
||||||
private static final Log LOG = LogFactory.getLog(JobHistoryServer.class);
|
private static final Log LOG = LogFactory.getLog(JobHistoryServer.class);
|
||||||
private HistoryContext historyContext;
|
private HistoryContext historyContext;
|
||||||
private HistoryClientService clientService;
|
private HistoryClientService clientService;
|
||||||
|
@ -118,8 +125,9 @@ public class JobHistoryServer extends CompositeService {
|
||||||
StringUtils.startupShutdownMessage(JobHistoryServer.class, args, LOG);
|
StringUtils.startupShutdownMessage(JobHistoryServer.class, args, LOG);
|
||||||
try {
|
try {
|
||||||
JobHistoryServer jobHistoryServer = new JobHistoryServer();
|
JobHistoryServer jobHistoryServer = new JobHistoryServer();
|
||||||
Runtime.getRuntime().addShutdownHook(
|
ShutdownHookManager.get().addShutdownHook(
|
||||||
new CompositeServiceShutdownHook(jobHistoryServer));
|
new CompositeServiceShutdownHook(jobHistoryServer),
|
||||||
|
SHUTDOWN_HOOK_PRIORITY);
|
||||||
YarnConfiguration conf = new YarnConfiguration(new JobConf());
|
YarnConfiguration conf = new YarnConfiguration(new JobConf());
|
||||||
jobHistoryServer.init(conf);
|
jobHistoryServer.init(conf);
|
||||||
jobHistoryServer.start();
|
jobHistoryServer.start();
|
||||||
|
|
|
@ -107,12 +107,11 @@ public class CompositeService extends AbstractService {
|
||||||
* JVM Shutdown hook for CompositeService which will stop the give
|
* JVM Shutdown hook for CompositeService which will stop the give
|
||||||
* CompositeService gracefully in case of JVM shutdown.
|
* CompositeService gracefully in case of JVM shutdown.
|
||||||
*/
|
*/
|
||||||
public static class CompositeServiceShutdownHook extends Thread {
|
public static class CompositeServiceShutdownHook implements Runnable {
|
||||||
|
|
||||||
private CompositeService compositeService;
|
private CompositeService compositeService;
|
||||||
|
|
||||||
public CompositeServiceShutdownHook(CompositeService compositeService) {
|
public CompositeServiceShutdownHook(CompositeService compositeService) {
|
||||||
super("CompositeServiceShutdownHook for " + compositeService.getName());
|
|
||||||
this.compositeService = compositeService;
|
this.compositeService = compositeService;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -30,6 +30,7 @@ import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
|
||||||
import org.apache.hadoop.security.SecurityUtil;
|
import org.apache.hadoop.security.SecurityUtil;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.util.ReflectionUtils;
|
import org.apache.hadoop.util.ReflectionUtils;
|
||||||
|
import org.apache.hadoop.util.ShutdownHookManager;
|
||||||
import org.apache.hadoop.util.StringUtils;
|
import org.apache.hadoop.util.StringUtils;
|
||||||
import org.apache.hadoop.yarn.YarnException;
|
import org.apache.hadoop.yarn.YarnException;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
|
@ -54,6 +55,12 @@ import org.apache.hadoop.yarn.util.Records;
|
||||||
|
|
||||||
public class NodeManager extends CompositeService implements
|
public class NodeManager extends CompositeService implements
|
||||||
ServiceStateChangeListener {
|
ServiceStateChangeListener {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Priority of the NodeManager shutdown hook.
|
||||||
|
*/
|
||||||
|
public static final int SHUTDOWN_HOOK_PRIORITY = 30;
|
||||||
|
|
||||||
private static final Log LOG = LogFactory.getLog(NodeManager.class);
|
private static final Log LOG = LogFactory.getLog(NodeManager.class);
|
||||||
protected final NodeManagerMetrics metrics = NodeManagerMetrics.create();
|
protected final NodeManagerMetrics metrics = NodeManagerMetrics.create();
|
||||||
protected ContainerTokenSecretManager containerTokenSecretManager;
|
protected ContainerTokenSecretManager containerTokenSecretManager;
|
||||||
|
@ -250,11 +257,12 @@ public class NodeManager extends CompositeService implements
|
||||||
|
|
||||||
// Remove the old hook if we are rebooting.
|
// Remove the old hook if we are rebooting.
|
||||||
if (hasToReboot && null != nodeManagerShutdownHook) {
|
if (hasToReboot && null != nodeManagerShutdownHook) {
|
||||||
Runtime.getRuntime().removeShutdownHook(nodeManagerShutdownHook);
|
ShutdownHookManager.get().removeShutdownHook(nodeManagerShutdownHook);
|
||||||
}
|
}
|
||||||
|
|
||||||
nodeManagerShutdownHook = new CompositeServiceShutdownHook(this);
|
nodeManagerShutdownHook = new CompositeServiceShutdownHook(this);
|
||||||
Runtime.getRuntime().addShutdownHook(nodeManagerShutdownHook);
|
ShutdownHookManager.get().addShutdownHook(nodeManagerShutdownHook,
|
||||||
|
SHUTDOWN_HOOK_PRIORITY);
|
||||||
|
|
||||||
YarnConfiguration conf = new YarnConfiguration();
|
YarnConfiguration conf = new YarnConfiguration();
|
||||||
this.init(conf);
|
this.init(conf);
|
||||||
|
|
|
@ -32,6 +32,7 @@ import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
|
||||||
import org.apache.hadoop.metrics2.source.JvmMetrics;
|
import org.apache.hadoop.metrics2.source.JvmMetrics;
|
||||||
import org.apache.hadoop.security.SecurityUtil;
|
import org.apache.hadoop.security.SecurityUtil;
|
||||||
import org.apache.hadoop.util.ReflectionUtils;
|
import org.apache.hadoop.util.ReflectionUtils;
|
||||||
|
import org.apache.hadoop.util.ShutdownHookManager;
|
||||||
import org.apache.hadoop.util.StringUtils;
|
import org.apache.hadoop.util.StringUtils;
|
||||||
import org.apache.hadoop.yarn.YarnException;
|
import org.apache.hadoop.yarn.YarnException;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||||
|
@ -87,6 +88,12 @@ import org.apache.hadoop.yarn.webapp.WebApps.Builder;
|
||||||
*/
|
*/
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
public class ResourceManager extends CompositeService implements Recoverable {
|
public class ResourceManager extends CompositeService implements Recoverable {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Priority of the ResourceManager shutdown hook.
|
||||||
|
*/
|
||||||
|
public static final int SHUTDOWN_HOOK_PRIORITY = 30;
|
||||||
|
|
||||||
private static final Log LOG = LogFactory.getLog(ResourceManager.class);
|
private static final Log LOG = LogFactory.getLog(ResourceManager.class);
|
||||||
public static final long clusterTimeStamp = System.currentTimeMillis();
|
public static final long clusterTimeStamp = System.currentTimeMillis();
|
||||||
|
|
||||||
|
@ -613,8 +620,9 @@ public class ResourceManager extends CompositeService implements Recoverable {
|
||||||
Configuration conf = new YarnConfiguration();
|
Configuration conf = new YarnConfiguration();
|
||||||
Store store = StoreFactory.getStore(conf);
|
Store store = StoreFactory.getStore(conf);
|
||||||
ResourceManager resourceManager = new ResourceManager(store);
|
ResourceManager resourceManager = new ResourceManager(store);
|
||||||
Runtime.getRuntime().addShutdownHook(
|
ShutdownHookManager.get().addShutdownHook(
|
||||||
new CompositeServiceShutdownHook(resourceManager));
|
new CompositeServiceShutdownHook(resourceManager),
|
||||||
|
SHUTDOWN_HOOK_PRIORITY);
|
||||||
resourceManager.init(conf);
|
resourceManager.init(conf);
|
||||||
//resourceManager.recover(store.restore());
|
//resourceManager.recover(store.restore());
|
||||||
//store.doneWithRecovery();
|
//store.doneWithRecovery();
|
||||||
|
|
|
@ -24,6 +24,7 @@ import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.security.SecurityUtil;
|
import org.apache.hadoop.security.SecurityUtil;
|
||||||
|
import org.apache.hadoop.util.ShutdownHookManager;
|
||||||
import org.apache.hadoop.util.StringUtils;
|
import org.apache.hadoop.util.StringUtils;
|
||||||
import org.apache.hadoop.yarn.YarnException;
|
import org.apache.hadoop.yarn.YarnException;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
|
@ -34,6 +35,12 @@ import org.apache.hadoop.yarn.service.CompositeService;
|
||||||
* web interfaces.
|
* web interfaces.
|
||||||
*/
|
*/
|
||||||
public class WebAppProxyServer extends CompositeService {
|
public class WebAppProxyServer extends CompositeService {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Priority of the ResourceManager shutdown hook.
|
||||||
|
*/
|
||||||
|
public static final int SHUTDOWN_HOOK_PRIORITY = 30;
|
||||||
|
|
||||||
private static final Log LOG = LogFactory.getLog(WebAppProxyServer.class);
|
private static final Log LOG = LogFactory.getLog(WebAppProxyServer.class);
|
||||||
|
|
||||||
private WebAppProxy proxy = null;
|
private WebAppProxy proxy = null;
|
||||||
|
@ -69,8 +76,9 @@ public class WebAppProxyServer extends CompositeService {
|
||||||
StringUtils.startupShutdownMessage(WebAppProxyServer.class, args, LOG);
|
StringUtils.startupShutdownMessage(WebAppProxyServer.class, args, LOG);
|
||||||
try {
|
try {
|
||||||
WebAppProxyServer proxy = new WebAppProxyServer();
|
WebAppProxyServer proxy = new WebAppProxyServer();
|
||||||
Runtime.getRuntime().addShutdownHook(
|
ShutdownHookManager.get().addShutdownHook(
|
||||||
new CompositeServiceShutdownHook(proxy));
|
new CompositeServiceShutdownHook(proxy),
|
||||||
|
SHUTDOWN_HOOK_PRIORITY);
|
||||||
YarnConfiguration conf = new YarnConfiguration();
|
YarnConfiguration conf = new YarnConfiguration();
|
||||||
proxy.init(conf);
|
proxy.init(conf);
|
||||||
proxy.start();
|
proxy.start();
|
||||||
|
|
|
@ -33,6 +33,7 @@ import org.apache.hadoop.tools.CopyListing.*;
|
||||||
import org.apache.hadoop.tools.mapred.CopyMapper;
|
import org.apache.hadoop.tools.mapred.CopyMapper;
|
||||||
import org.apache.hadoop.tools.mapred.CopyOutputFormat;
|
import org.apache.hadoop.tools.mapred.CopyOutputFormat;
|
||||||
import org.apache.hadoop.tools.util.DistCpUtils;
|
import org.apache.hadoop.tools.util.DistCpUtils;
|
||||||
|
import org.apache.hadoop.util.ShutdownHookManager;
|
||||||
import org.apache.hadoop.util.Tool;
|
import org.apache.hadoop.util.Tool;
|
||||||
import org.apache.hadoop.util.ToolRunner;
|
import org.apache.hadoop.util.ToolRunner;
|
||||||
|
|
||||||
|
@ -49,6 +50,12 @@ import java.util.Random;
|
||||||
* behaviour.
|
* behaviour.
|
||||||
*/
|
*/
|
||||||
public class DistCp extends Configured implements Tool {
|
public class DistCp extends Configured implements Tool {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Priority of the ResourceManager shutdown hook.
|
||||||
|
*/
|
||||||
|
public static final int SHUTDOWN_HOOK_PRIORITY = 30;
|
||||||
|
|
||||||
private static final Log LOG = LogFactory.getLog(DistCp.class);
|
private static final Log LOG = LogFactory.getLog(DistCp.class);
|
||||||
|
|
||||||
private DistCpOptions inputOptions;
|
private DistCpOptions inputOptions;
|
||||||
|
@ -353,7 +360,8 @@ public class DistCp extends Configured implements Tool {
|
||||||
DistCp distCp = new DistCp();
|
DistCp distCp = new DistCp();
|
||||||
Cleanup CLEANUP = new Cleanup(distCp);
|
Cleanup CLEANUP = new Cleanup(distCp);
|
||||||
|
|
||||||
Runtime.getRuntime().addShutdownHook(CLEANUP);
|
ShutdownHookManager.get().addShutdownHook(CLEANUP,
|
||||||
|
SHUTDOWN_HOOK_PRIORITY);
|
||||||
System.exit(ToolRunner.run(getDefaultConf(), distCp, argv));
|
System.exit(ToolRunner.run(getDefaultConf(), distCp, argv));
|
||||||
}
|
}
|
||||||
catch (Exception e) {
|
catch (Exception e) {
|
||||||
|
@ -388,7 +396,7 @@ public class DistCp extends Configured implements Tool {
|
||||||
return submitted;
|
return submitted;
|
||||||
}
|
}
|
||||||
|
|
||||||
private static class Cleanup extends Thread {
|
private static class Cleanup implements Runnable {
|
||||||
private final DistCp distCp;
|
private final DistCp distCp;
|
||||||
|
|
||||||
public Cleanup(DistCp distCp) {
|
public Cleanup(DistCp distCp) {
|
||||||
|
|
Loading…
Reference in New Issue