YARN-9923. Introduce HealthReporter interface to support multiple health checker files. Contributed by Adam Antal
This commit is contained in:
parent
7a87007545
commit
631dbbc6f2
|
@ -1966,24 +1966,37 @@ public class YarnConfiguration extends Configuration {
|
|||
*/
|
||||
public static final long DEFAULT_NM_MIN_PER_DISK_FREE_SPACE_MB = 0;
|
||||
|
||||
/** The health checker scripts. */
|
||||
public static final String NM_HEALTH_CHECK_SCRIPTS =
|
||||
NM_PREFIX + "health-checker.scripts";
|
||||
public static final String[] DEFAULT_NM_HEALTH_CHECK_SCRIPTS = {"script"};
|
||||
|
||||
/** Frequency of running node health script.*/
|
||||
public static final String NM_HEALTH_CHECK_INTERVAL_MS =
|
||||
NM_PREFIX + "health-checker.interval-ms";
|
||||
NM_PREFIX + "health-checker.interval-ms";
|
||||
public static final long DEFAULT_NM_HEALTH_CHECK_INTERVAL_MS = 10 * 60 * 1000;
|
||||
|
||||
/** Health check time out period for all scripts.*/
|
||||
public static final String NM_HEALTH_CHECK_TIMEOUT_MS =
|
||||
NM_PREFIX + "health-checker.timeout-ms";
|
||||
public static final long DEFAULT_NM_HEALTH_CHECK_TIMEOUT_MS =
|
||||
2 * DEFAULT_NM_HEALTH_CHECK_INTERVAL_MS;
|
||||
|
||||
/** Health check script time out period.*/
|
||||
public static final String NM_HEALTH_CHECK_SCRIPT_TIMEOUT_MS =
|
||||
NM_PREFIX + "health-checker.script.timeout-ms";
|
||||
public static final long DEFAULT_NM_HEALTH_CHECK_SCRIPT_TIMEOUT_MS =
|
||||
2 * DEFAULT_NM_HEALTH_CHECK_INTERVAL_MS;
|
||||
public static final String NM_HEALTH_CHECK_SCRIPT_TIMEOUT_MS_TEMPLATE =
|
||||
NM_PREFIX + "health-checker.%s.timeout-ms";
|
||||
|
||||
/** The health check script to run.*/
|
||||
public static final String NM_HEALTH_CHECK_SCRIPT_PATH =
|
||||
NM_PREFIX + "health-checker.script.path";
|
||||
public static final String NM_HEALTH_CHECK_SCRIPT_PATH_TEMPLATE =
|
||||
NM_PREFIX + "health-checker.%s.path";
|
||||
|
||||
/** The arguments to pass to the health check script.*/
|
||||
public static final String NM_HEALTH_CHECK_SCRIPT_OPTS =
|
||||
NM_PREFIX + "health-checker.script.opts";
|
||||
public static final String NM_HEALTH_CHECK_SCRIPT_OPTS_TEMPLATE =
|
||||
NM_PREFIX + "health-checker.%s.opts";
|
||||
|
||||
/** Frequency of running node health script. */
|
||||
public static final String NM_HEALTH_CHECK_SCRIPT_INTERVAL_MS_TEMPLATE =
|
||||
NM_PREFIX + "health-checker.%s.interval-ms";
|
||||
|
||||
/** The JVM options used on forking ContainerLocalizer process
|
||||
by container executor. */
|
||||
|
|
|
@ -1617,27 +1617,21 @@
|
|||
</property>
|
||||
|
||||
<property>
|
||||
<description>Frequency of running node health script.</description>
|
||||
<name>yarn.nodemanager.health-checker.interval-ms</name>
|
||||
<value>600000</value>
|
||||
<description>The nodemanager health check scripts to run.</description>
|
||||
<name>yarn.nodemanager.health-checker.scripts</name>
|
||||
<value>script</value>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<description>Script time out period.</description>
|
||||
<name>yarn.nodemanager.health-checker.script.timeout-ms</name>
|
||||
<description>Health check script time out period.</description>
|
||||
<name>yarn.nodemanager.health-checker.timeout-ms</name>
|
||||
<value>1200000</value>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<description>The health check script to run.</description>
|
||||
<name>yarn.nodemanager.health-checker.script.path</name>
|
||||
<value></value>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<description>The arguments to pass to the health check script.</description>
|
||||
<name>yarn.nodemanager.health-checker.script.opts</name>
|
||||
<value></value>
|
||||
<description>Frequency of running node health scripts.</description>
|
||||
<name>yarn.nodemanager.health-checker.interval-ms</name>
|
||||
<value>600000</value>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
|
|
|
@ -29,9 +29,11 @@ import java.util.Timer;
|
|||
import java.util.TimerTask;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import org.apache.hadoop.service.AbstractService;
|
||||
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
|
||||
import org.apache.hadoop.util.DiskValidator;
|
||||
import org.apache.hadoop.util.DiskValidatorFactory;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.health.HealthReporter;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
|
@ -42,7 +44,6 @@ import org.apache.hadoop.fs.FileSystem;
|
|||
import org.apache.hadoop.fs.LocalDirAllocator;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.permission.FsPermission;
|
||||
import org.apache.hadoop.service.AbstractService;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||
|
@ -54,7 +55,8 @@ import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
|
|||
* directories of a node. This specifically manages nodemanager-local-dirs and
|
||||
* nodemanager-log-dirs by periodically checking their health.
|
||||
*/
|
||||
public class LocalDirsHandlerService extends AbstractService {
|
||||
public class LocalDirsHandlerService extends AbstractService
|
||||
implements HealthReporter {
|
||||
|
||||
private static final Logger LOG =
|
||||
LoggerFactory.getLogger(LocalDirsHandlerService.class);
|
||||
|
@ -426,6 +428,11 @@ public class LocalDirsHandlerService extends AbstractService {
|
|||
|
||||
}
|
||||
|
||||
/**
 * {@link HealthReporter} entry point for the disk health report.
 *
 * Simply returns the result of {@code getDisksHealthReport(false)}.
 * NOTE(review): the meaning of the {@code false} argument is defined by
 * {@code getDisksHealthReport}, which is not visible here - confirm there.
 *
 * @return the report string produced by {@code getDisksHealthReport(false)}
 */
@Override
|
||||
public String getHealthReport() {
|
||||
return getDisksHealthReport(false);
|
||||
}
|
||||
|
||||
/**
|
||||
* The minimum fraction of number of disks needed to be healthy for a node to
|
||||
* be considered healthy in terms of disks is configured using
|
||||
|
@ -457,10 +464,20 @@ public class LocalDirsHandlerService extends AbstractService {
|
|||
return true;
|
||||
}
|
||||
|
||||
/**
 * {@link HealthReporter} entry point for the disk health state.
 *
 * @return the value of {@link #areDisksHealthy()}
 */
@Override
|
||||
public boolean isHealthy() {
|
||||
return areDisksHealthy();
|
||||
}
|
||||
|
||||
/**
 * @return the current value of the {@code lastDisksCheckTime} field
 */
public long getLastDisksCheckTime() {
|
||||
return lastDisksCheckTime;
|
||||
}
|
||||
|
||||
/**
 * {@link HealthReporter} entry point for the time of the last health check.
 *
 * @return the value of {@link #getLastDisksCheckTime()}
 */
@Override
|
||||
public long getLastHealthReportTime() {
|
||||
return getLastDisksCheckTime();
|
||||
}
|
||||
|
||||
/**
 * Tests the given path against the directories returned by
 * {@code getLocalDirs()}, using {@code isInGoodDirs}.
 *
 * @param path the path to test
 * @return the result of {@code isInGoodDirs(getLocalDirs(), path)}
 */
public boolean isGoodLocalDir(String path) {
|
||||
return isInGoodDirs(getLocalDirs(), path);
|
||||
}
|
||||
|
|
|
@ -1,123 +0,0 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.nodemanager;
|
||||
|
||||
import com.google.common.base.Joiner;
|
||||
import com.google.common.base.Strings;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.service.CompositeService;
|
||||
import org.apache.hadoop.util.NodeHealthScriptRunner;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
|
||||
/**
|
||||
* The class which provides functionality of checking the health of the node and
|
||||
* reporting back to the service for which the health checker has been asked to
|
||||
* report.
|
||||
*/
|
||||
public class NodeHealthCheckerService extends CompositeService {
|
||||
|
||||
private NodeHealthScriptRunner nodeHealthScriptRunner;
|
||||
private LocalDirsHandlerService dirsHandler;
|
||||
private Exception nodeHealthException;
|
||||
private long nodeHealthExceptionReportTime;
|
||||
|
||||
static final String SEPARATOR = ";";
|
||||
|
||||
public NodeHealthCheckerService(NodeHealthScriptRunner scriptRunner,
|
||||
LocalDirsHandlerService dirHandlerService) {
|
||||
super(NodeHealthCheckerService.class.getName());
|
||||
nodeHealthScriptRunner = scriptRunner;
|
||||
dirsHandler = dirHandlerService;
|
||||
nodeHealthException = null;
|
||||
nodeHealthExceptionReportTime = 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void serviceInit(Configuration conf) throws Exception {
|
||||
if (nodeHealthScriptRunner != null) {
|
||||
addService(nodeHealthScriptRunner);
|
||||
}
|
||||
addService(dirsHandler);
|
||||
super.serviceInit(conf);
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the reporting string of health of the node
|
||||
*/
|
||||
String getHealthReport() {
|
||||
String scriptReport = Strings.emptyToNull(
|
||||
nodeHealthScriptRunner == null ? null :
|
||||
nodeHealthScriptRunner.getHealthReport());
|
||||
String discReport =
|
||||
Strings.emptyToNull(
|
||||
dirsHandler.getDisksHealthReport(false));
|
||||
String exceptionReport = Strings.emptyToNull(
|
||||
nodeHealthException == null ? null :
|
||||
nodeHealthException.getMessage());
|
||||
|
||||
return Joiner.on(SEPARATOR).skipNulls()
|
||||
.join(scriptReport, discReport, exceptionReport);
|
||||
}
|
||||
|
||||
/**
|
||||
* @return <em>true</em> if the node is healthy
|
||||
*/
|
||||
boolean isHealthy() {
|
||||
boolean scriptHealthy = nodeHealthScriptRunner == null ||
|
||||
nodeHealthScriptRunner.isHealthy();
|
||||
return nodeHealthException == null &&
|
||||
scriptHealthy && dirsHandler.areDisksHealthy();
|
||||
}
|
||||
|
||||
/**
|
||||
* @return when the last time the node health status is reported
|
||||
*/
|
||||
long getLastHealthReportTime() {
|
||||
return Collections.max(Arrays.asList(
|
||||
dirsHandler.getLastDisksCheckTime(),
|
||||
nodeHealthScriptRunner == null ? 0 :
|
||||
nodeHealthScriptRunner.getLastReportedTime(),
|
||||
nodeHealthExceptionReportTime));
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the disk handler
|
||||
*/
|
||||
public LocalDirsHandlerService getDiskHandler() {
|
||||
return dirsHandler;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the node health script runner
|
||||
*/
|
||||
NodeHealthScriptRunner getNodeHealthScriptRunner() {
|
||||
return nodeHealthScriptRunner;
|
||||
}
|
||||
|
||||
/**
|
||||
* Report an exception to mark the node as unhealthy.
|
||||
* @param ex the exception that makes the node unhealthy
|
||||
*/
|
||||
void reportException(Exception ex) {
|
||||
nodeHealthException = ex;
|
||||
nodeHealthExceptionReportTime = System.currentTimeMillis();
|
||||
}
|
||||
}
|
|
@ -33,7 +33,7 @@ import org.apache.hadoop.service.CompositeService;
|
|||
import org.apache.hadoop.util.ExitUtil;
|
||||
import org.apache.hadoop.util.GenericOptionsParser;
|
||||
import org.apache.hadoop.util.JvmPauseMonitor;
|
||||
import org.apache.hadoop.util.NodeHealthScriptRunner;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.health.NodeHealthCheckerService;
|
||||
import org.apache.hadoop.util.ReflectionUtils;
|
||||
import org.apache.hadoop.util.Shell;
|
||||
import org.apache.hadoop.util.ShutdownHookManager;
|
||||
|
@ -347,27 +347,6 @@ public class NodeManager extends CompositeService
|
|||
}
|
||||
}
|
||||
|
||||
public static NodeHealthScriptRunner getNodeHealthScriptRunner(Configuration conf) {
|
||||
String nodeHealthScript =
|
||||
conf.get(YarnConfiguration.NM_HEALTH_CHECK_SCRIPT_PATH);
|
||||
if(!NodeHealthScriptRunner.shouldRun(nodeHealthScript)) {
|
||||
LOG.info("Node Manager health check script is not available "
|
||||
+ "or doesn't have execute permission, so not "
|
||||
+ "starting the node health script runner.");
|
||||
return null;
|
||||
}
|
||||
long nmCheckintervalTime = conf.getLong(
|
||||
YarnConfiguration.NM_HEALTH_CHECK_INTERVAL_MS,
|
||||
YarnConfiguration.DEFAULT_NM_HEALTH_CHECK_INTERVAL_MS);
|
||||
long scriptTimeout = conf.getLong(
|
||||
YarnConfiguration.NM_HEALTH_CHECK_SCRIPT_TIMEOUT_MS,
|
||||
YarnConfiguration.DEFAULT_NM_HEALTH_CHECK_SCRIPT_TIMEOUT_MS);
|
||||
String[] scriptArgs = conf.getStrings(
|
||||
YarnConfiguration.NM_HEALTH_CHECK_SCRIPT_OPTS, new String[] {});
|
||||
return new NodeHealthScriptRunner(nodeHealthScript,
|
||||
nmCheckintervalTime, scriptTimeout, scriptArgs);
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
protected ResourcePluginManager createResourcePluginManager() {
|
||||
return new ResourcePluginManager();
|
||||
|
@ -431,12 +410,9 @@ public class NodeManager extends CompositeService
|
|||
// NodeManager level dispatcher
|
||||
this.dispatcher = createNMDispatcher();
|
||||
|
||||
nodeHealthChecker =
|
||||
new NodeHealthCheckerService(
|
||||
getNodeHealthScriptRunner(conf), dirsHandler);
|
||||
this.nodeHealthChecker = new NodeHealthCheckerService(dirsHandler);
|
||||
addService(nodeHealthChecker);
|
||||
|
||||
|
||||
((NMContext)context).setContainerExecutor(exec);
|
||||
((NMContext)context).setDeletionService(del);
|
||||
|
||||
|
|
|
@ -86,6 +86,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Cont
|
|||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.ResourcePlugin;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.ResourcePluginManager;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.health.NodeHealthCheckerService;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.nodelabels.NodeAttributesProvider;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.nodelabels.NodeLabelsProvider;
|
||||
|
|
|
@ -0,0 +1,61 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.nodemanager.health;
|
||||
|
||||
/**
|
||||
* Simple {@link HealthReporter} implementation which reports whether a fatal
|
||||
* exception has happened in the NodeManager.
|
||||
*
|
||||
* See the <code>reportException</code> call of
|
||||
* {@link org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdaterImpl}
|
||||
*/
|
||||
public class ExceptionReporter implements HealthReporter {
|
||||
private Exception nodeHealthException;
|
||||
private long nodeHealthExceptionReportTime;
|
||||
|
||||
ExceptionReporter() {
|
||||
this.nodeHealthException = null;
|
||||
this.nodeHealthExceptionReportTime = 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized boolean isHealthy() {
|
||||
return nodeHealthException == null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized String getHealthReport() {
|
||||
return nodeHealthException == null ? null :
|
||||
nodeHealthException.getMessage();
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized long getLastHealthReportTime() {
|
||||
return nodeHealthExceptionReportTime;
|
||||
}
|
||||
|
||||
/**
|
||||
* Report an exception to mark the node as unhealthy.
|
||||
* @param ex the exception that makes the node unhealthy
|
||||
*/
|
||||
public synchronized void reportException(Exception ex) {
|
||||
nodeHealthException = ex;
|
||||
nodeHealthExceptionReportTime = System.currentTimeMillis();
|
||||
}
|
||||
}
|
|
@ -0,0 +1,65 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.nodemanager.health;
|
||||
|
||||
/**
 * Interface providing information about the health of a service.
 *
 * Associated pieces of information:
 * <ul>
 *   <li>whether the service is healthy ({@link #isHealthy()})</li>
 *   <li>report of the healthiness ({@link #getHealthReport()})</li>
 *   <li>latest timestamp of the health check
 *   ({@link #getLastHealthReportTime()})</li>
 * </ul>
 *
 * Classes implementing this interface are used in
 * {@link NodeHealthCheckerService}.
 *
 * Developers are discouraged from implementing new Java-based health
 * checks; they should rather implement the check as a script and use the
 * {@link NodeHealthScriptRunner} implementation.
 *
 * @see TimedHealthReporterService
 * @see org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService
 */
public interface HealthReporter {

  /**
   * Gets whether the reporter considers the node healthy.
   *
   * @return true if the node is healthy
   */
  boolean isHealthy();

  /**
   * Returns the output of the health check.
   *
   * When there is nothing to report, implementations may return either an
   * empty string or {@code null} ({@link ExceptionReporter} returns
   * {@code null}), so callers must be prepared for both.
   *
   * @return output of the health check, possibly empty or {@code null}
   */
  String getHealthReport();

  /**
   * Returns the time stamp of the last health check or report.
   *
   * NOTE(review): {@link ExceptionReporter} uses
   * {@code System.currentTimeMillis()} and reports 0 before the first
   * report; other implementations are presumably on the same clock -
   * confirm.
   *
   * @return time stamp of the last health check or report
   */
  long getLastHealthReportTime();
}
|
|
@ -0,0 +1,164 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.nodemanager.health;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.base.Joiner;
|
||||
import com.google.common.base.Strings;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.service.CompositeService;
|
||||
import org.apache.hadoop.service.Service;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
* This class provides functionality of checking the health of a node and
|
||||
* reporting back to the service for which the health checker has been asked to
|
||||
* report.
|
||||
*
|
||||
* It is a {@link CompositeService}: every {@link Service} must be registered
|
||||
* first in serviceInit, and should also implement the {@link HealthReporter}
|
||||
* interface - otherwise an exception is thrown.
|
||||
*
|
||||
* Calling functions of HealthReporter merge its dependent
|
||||
* services' reports.
|
||||
*
|
||||
* @see HealthReporter
|
||||
* @see LocalDirsHandlerService
|
||||
* @see TimedHealthReporterService
|
||||
*/
|
||||
public class NodeHealthCheckerService extends CompositeService
|
||||
implements HealthReporter {
|
||||
|
||||
public static final Logger LOG =
|
||||
LoggerFactory.getLogger(NodeHealthCheckerService.class);
|
||||
private static final int MAX_SCRIPTS = 4;
|
||||
|
||||
private List<HealthReporter> reporters;
|
||||
private LocalDirsHandlerService dirsHandler;
|
||||
private ExceptionReporter exceptionReporter;
|
||||
|
||||
public static final String SEPARATOR = ";";
|
||||
|
||||
public NodeHealthCheckerService(
|
||||
LocalDirsHandlerService dirHandlerService) {
|
||||
super(NodeHealthCheckerService.class.getName());
|
||||
|
||||
this.reporters = new ArrayList<>();
|
||||
this.dirsHandler = dirHandlerService;
|
||||
this.exceptionReporter = new ExceptionReporter();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void serviceInit(Configuration conf) throws Exception {
|
||||
reporters.add(exceptionReporter);
|
||||
addHealthReporter(dirsHandler);
|
||||
String[] configuredScripts = conf.getTrimmedStrings(
|
||||
YarnConfiguration.NM_HEALTH_CHECK_SCRIPTS,
|
||||
YarnConfiguration.DEFAULT_NM_HEALTH_CHECK_SCRIPTS);
|
||||
if (configuredScripts.length > MAX_SCRIPTS) {
|
||||
throw new IllegalArgumentException("Due to performance reasons " +
|
||||
"running more than " + MAX_SCRIPTS + "scripts is not allowed.");
|
||||
}
|
||||
for (String configuredScript : configuredScripts) {
|
||||
addHealthReporter(NodeHealthScriptRunner.newInstance(
|
||||
configuredScript, conf));
|
||||
}
|
||||
super.serviceInit(conf);
|
||||
}
|
||||
|
||||
/**
|
||||
* Adds a {@link Service} implementing the {@link HealthReporter} interface,
|
||||
* if that service has not been added to this {@link CompositeService} yet.
|
||||
*
|
||||
* @param service to add
|
||||
* @throws Exception if not a {@link HealthReporter}
|
||||
* implementation is provided to this function
|
||||
*/
|
||||
@VisibleForTesting
|
||||
void addHealthReporter(Service service) throws Exception {
|
||||
if (service != null) {
|
||||
if (getServices().stream()
|
||||
.noneMatch(x -> x.getName().equals(service.getName()))) {
|
||||
if (!(service instanceof HealthReporter)) {
|
||||
throw new Exception("Attempted to add service to " +
|
||||
"NodeHealthCheckerService that does not implement " +
|
||||
"HealthReporter.");
|
||||
}
|
||||
reporters.add((HealthReporter) service);
|
||||
addService(service);
|
||||
} else {
|
||||
LOG.debug("Omitting duplicate service: {}.", service.getName());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Joining the health reports of the dependent services.
|
||||
*
|
||||
* @return the report string about the health of the node
|
||||
*/
|
||||
@Override
|
||||
public String getHealthReport() {
|
||||
ArrayList<String> reports = reporters.stream()
|
||||
.map(reporter -> Strings.emptyToNull(reporter.getHealthReport()))
|
||||
.collect(Collectors.toCollection(ArrayList::new));
|
||||
return Joiner.on(SEPARATOR).skipNulls().join(reports);
|
||||
}
|
||||
|
||||
/**
|
||||
* @return <em>true</em> if the node is healthy
|
||||
*/
|
||||
@Override
|
||||
public boolean isHealthy() {
|
||||
return reporters.stream().allMatch(HealthReporter::isHealthy);
|
||||
}
|
||||
|
||||
/**
|
||||
* @return when the last time the node health status is reported
|
||||
*/
|
||||
@Override
|
||||
public long getLastHealthReportTime() {
|
||||
Optional<Long> max = reporters.stream()
|
||||
.map(HealthReporter::getLastHealthReportTime).max(Long::compareTo);
|
||||
return max.orElse(0L);
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the disk handler
|
||||
*/
|
||||
public LocalDirsHandlerService getDiskHandler() {
|
||||
return dirsHandler;
|
||||
}
|
||||
|
||||
/**
|
||||
* Propagating an exception to {@link ExceptionReporter}.
|
||||
* @param exception the exception to propagate
|
||||
*/
|
||||
public void reportException(Exception exception) {
|
||||
exceptionReporter.reportException(exception);
|
||||
}
|
||||
}
|
|
@ -16,7 +16,7 @@
|
|||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.util;
|
||||
package org.apache.hadoop.yarn.server.nodemanager.health;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
|
@ -27,51 +27,95 @@ import java.util.TimerTask;
|
|||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileUtil;
|
||||
import org.apache.hadoop.service.AbstractService;
|
||||
import org.apache.hadoop.util.Shell.ExitCodeException;
|
||||
import org.apache.hadoop.util.Shell.ShellCommandExecutor;
|
||||
import org.apache.hadoop.util.Shell;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
*
|
||||
* The class which provides functionality of checking the health of the node
|
||||
* using the configured node health script and reporting back to the service
|
||||
* for which the health checker has been asked to report.
|
||||
*/
|
||||
public class NodeHealthScriptRunner extends AbstractService {
|
||||
public class NodeHealthScriptRunner extends TimedHealthReporterService {
|
||||
|
||||
private static final Logger LOG =
|
||||
LoggerFactory.getLogger(NodeHealthScriptRunner.class);
|
||||
|
||||
/** Absolute path to the health script. */
|
||||
private String nodeHealthScript;
|
||||
/** Delay after which node health script to be executed */
|
||||
private long intervalTime;
|
||||
/** Time after which the script should be timedout */
|
||||
/** Time after which the script should be timed out. */
|
||||
private long scriptTimeout;
|
||||
/** Timer used to schedule node health monitoring script execution */
|
||||
private Timer nodeHealthScriptScheduler;
|
||||
/** ShellCommandExecutor used to execute monitoring script. */
|
||||
private ShellCommandExecutor commandExecutor = null;
|
||||
|
||||
/** ShellCommandExecutor used to execute monitoring script */
|
||||
ShellCommandExecutor shexec = null;
|
||||
/** Pattern used for searching in the output of the node health script. */
|
||||
private static final String ERROR_PATTERN = "ERROR";
|
||||
|
||||
/** Pattern used for searching in the output of the node health script */
|
||||
static private final String ERROR_PATTERN = "ERROR";
|
||||
/** Time out error message. */
|
||||
static final String NODE_HEALTH_SCRIPT_TIMED_OUT_MSG =
|
||||
"Node health script timed out";
|
||||
|
||||
/** Time out error message */
|
||||
public static final String NODE_HEALTH_SCRIPT_TIMED_OUT_MSG = "Node health script timed out";
|
||||
/**
 * Creates a runner for a single health script. Private: instances are
 * obtained through {@code newInstance}.
 *
 * The service is always named after this class, regardless of the script.
 *
 * @param scriptName stored as {@code nodeHealthScript}; {@code newInstance}
 *                   passes the configured script path here
 * @param checkInterval check interval handed to the parent constructor
 * @param timeout stored as {@code scriptTimeout}
 * @param scriptArgs arguments handed to the {@code NodeHealthMonitorExecutor}
 */
private NodeHealthScriptRunner(String scriptName, long checkInterval,
|
||||
long timeout, String[] scriptArgs) {
|
||||
super(NodeHealthScriptRunner.class.getName(), checkInterval);
|
||||
this.nodeHealthScript = scriptName;
|
||||
this.scriptTimeout = timeout;
|
||||
// Must come after the two assignments above: the executor's constructor
// reads nodeHealthScript and scriptTimeout to build its shell command.
setTimerTask(new NodeHealthMonitorExecutor(scriptArgs));
|
||||
}
|
||||
|
||||
private boolean isHealthy;
|
||||
public static NodeHealthScriptRunner newInstance(String scriptName,
|
||||
Configuration conf) {
|
||||
String nodeHealthScriptsConfig = String.format(
|
||||
YarnConfiguration.NM_HEALTH_CHECK_SCRIPT_PATH_TEMPLATE, scriptName);
|
||||
String nodeHealthScript = conf.get(nodeHealthScriptsConfig);
|
||||
if (!shouldRun(scriptName, nodeHealthScript)) {
|
||||
return null;
|
||||
}
|
||||
|
||||
private String healthReport;
|
||||
// Determine check interval ms
|
||||
String checkIntervalMsConfig = String.format(
|
||||
YarnConfiguration.NM_HEALTH_CHECK_SCRIPT_INTERVAL_MS_TEMPLATE,
|
||||
scriptName);
|
||||
long checkIntervalMs = conf.getLong(checkIntervalMsConfig, 0L);
|
||||
if (checkIntervalMs == 0L) {
|
||||
checkIntervalMs = conf.getLong(
|
||||
YarnConfiguration.NM_HEALTH_CHECK_INTERVAL_MS,
|
||||
YarnConfiguration.DEFAULT_NM_HEALTH_CHECK_INTERVAL_MS);
|
||||
}
|
||||
if (checkIntervalMs < 0) {
|
||||
throw new IllegalArgumentException("The node health-checker's " +
|
||||
"interval-ms can not be set to a negative number.");
|
||||
}
|
||||
|
||||
private long lastReportedTime;
|
||||
// Determine time out
|
||||
String scriptTimeoutConfig = String.format(
|
||||
YarnConfiguration.NM_HEALTH_CHECK_SCRIPT_TIMEOUT_MS_TEMPLATE,
|
||||
scriptName);
|
||||
long scriptTimeout = conf.getLong(scriptTimeoutConfig, 0L);
|
||||
if (scriptTimeout == 0L) {
|
||||
scriptTimeout = conf.getLong(
|
||||
YarnConfiguration.NM_HEALTH_CHECK_TIMEOUT_MS,
|
||||
YarnConfiguration.DEFAULT_NM_HEALTH_CHECK_TIMEOUT_MS);
|
||||
}
|
||||
if (scriptTimeout <= 0) {
|
||||
throw new IllegalArgumentException("The node health-checker's " +
|
||||
"timeout can only be set to a positive number.");
|
||||
}
|
||||
|
||||
// Determine script arguments
|
||||
String scriptArgsConfig = String.format(
|
||||
YarnConfiguration.NM_HEALTH_CHECK_SCRIPT_OPTS_TEMPLATE,
|
||||
scriptName);
|
||||
String[] scriptArgs = conf.getStrings(scriptArgsConfig, new String[]{});
|
||||
|
||||
return new NodeHealthScriptRunner(nodeHealthScript,
|
||||
checkIntervalMs, scriptTimeout, scriptArgs);
|
||||
}
|
||||
|
||||
private TimerTask timer;
|
||||
|
||||
private enum HealthCheckerExitStatus {
|
||||
SUCCESS,
|
||||
TIMED_OUT,
|
||||
|
@ -84,19 +128,17 @@ public class NodeHealthScriptRunner extends AbstractService {
|
|||
/**
|
||||
* Class which is used by the {@link Timer} class to periodically execute the
|
||||
* node health script.
|
||||
*
|
||||
*/
|
||||
private class NodeHealthMonitorExecutor extends TimerTask {
|
||||
private String exceptionStackTrace = "";
|
||||
|
||||
String exceptionStackTrace = "";
|
||||
|
||||
public NodeHealthMonitorExecutor(String[] args) {
|
||||
NodeHealthMonitorExecutor(String[] args) {
|
||||
ArrayList<String> execScript = new ArrayList<String>();
|
||||
execScript.add(nodeHealthScript);
|
||||
if (args != null) {
|
||||
execScript.addAll(Arrays.asList(args));
|
||||
}
|
||||
shexec = new ShellCommandExecutor(execScript
|
||||
commandExecutor = new ShellCommandExecutor(execScript
|
||||
.toArray(new String[execScript.size()]), null, null, scriptTimeout);
|
||||
}
|
||||
|
||||
|
@ -104,18 +146,18 @@ public class NodeHealthScriptRunner extends AbstractService {
|
|||
public void run() {
|
||||
HealthCheckerExitStatus status = HealthCheckerExitStatus.SUCCESS;
|
||||
try {
|
||||
shexec.execute();
|
||||
commandExecutor.execute();
|
||||
} catch (ExitCodeException e) {
|
||||
// ignore the exit code of the script
|
||||
status = HealthCheckerExitStatus.FAILED_WITH_EXIT_CODE;
|
||||
// On Windows, we will not hit the Stream closed IOException
|
||||
// thrown by stdout buffered reader for timeout event.
|
||||
if (Shell.WINDOWS && shexec.isTimedOut()) {
|
||||
if (Shell.WINDOWS && commandExecutor.isTimedOut()) {
|
||||
status = HealthCheckerExitStatus.TIMED_OUT;
|
||||
}
|
||||
} catch (Exception e) {
|
||||
LOG.warn("Caught exception : " + e.getMessage());
|
||||
if (!shexec.isTimedOut()) {
|
||||
if (!commandExecutor.isTimedOut()) {
|
||||
status = HealthCheckerExitStatus.FAILED_WITH_EXCEPTION;
|
||||
} else {
|
||||
status = HealthCheckerExitStatus.TIMED_OUT;
|
||||
|
@ -123,7 +165,7 @@ public class NodeHealthScriptRunner extends AbstractService {
|
|||
exceptionStackTrace = StringUtils.stringifyException(e);
|
||||
} finally {
|
||||
if (status == HealthCheckerExitStatus.SUCCESS) {
|
||||
if (hasErrors(shexec.getOutput())) {
|
||||
if (hasErrors(commandExecutor.getOutput())) {
|
||||
status = HealthCheckerExitStatus.FAILED;
|
||||
}
|
||||
}
|
||||
|
@ -134,49 +176,49 @@ public class NodeHealthScriptRunner extends AbstractService {
|
|||
/**
|
||||
* Method which is used to parse output from the node health monitor and
|
||||
* send to the report address.
|
||||
*
|
||||
*
|
||||
* The timed out script or script which causes IOException output is
|
||||
* ignored.
|
||||
*
|
||||
*
|
||||
* The node is marked unhealthy if
|
||||
* <ol>
|
||||
* <li>The node health script times out</li>
|
||||
* <li>The node health scripts output has a line which begins with ERROR</li>
|
||||
* <li>The node health scripts output has a line which begins
|
||||
* with ERROR</li>
|
||||
* <li>An exception is thrown while executing the script</li>
|
||||
* </ol>
|
||||
* If the script throws {@link IOException} or {@link ExitCodeException} the
|
||||
* output is ignored and node is left remaining healthy, as script might
|
||||
* have syntax error.
|
||||
*
|
||||
*
|
||||
* @param status
|
||||
*/
|
||||
void reportHealthStatus(HealthCheckerExitStatus status) {
|
||||
long now = System.currentTimeMillis();
|
||||
switch (status) {
|
||||
case SUCCESS:
|
||||
setHealthStatus(true, "", now);
|
||||
break;
|
||||
case TIMED_OUT:
|
||||
setHealthStatus(false, NODE_HEALTH_SCRIPT_TIMED_OUT_MSG);
|
||||
break;
|
||||
case FAILED_WITH_EXCEPTION:
|
||||
setHealthStatus(false, exceptionStackTrace);
|
||||
break;
|
||||
case FAILED_WITH_EXIT_CODE:
|
||||
// see Javadoc above - we don't report bad health intentionally
|
||||
setHealthStatus(true, "", now);
|
||||
setHealthyWithoutReport();
|
||||
break;
|
||||
case TIMED_OUT:
|
||||
setUnhealthyWithReport(NODE_HEALTH_SCRIPT_TIMED_OUT_MSG);
|
||||
break;
|
||||
case FAILED_WITH_EXCEPTION:
|
||||
setUnhealthyWithReport(exceptionStackTrace);
|
||||
break;
|
||||
case FAILED:
|
||||
setHealthStatus(false, shexec.getOutput());
|
||||
setUnhealthyWithReport(commandExecutor.getOutput());
|
||||
break;
|
||||
default:
|
||||
LOG.warn("Unknown HealthCheckerExitStatus - ignored.");
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Method to check if the output string has line which begins with ERROR.
|
||||
*
|
||||
* @param output
|
||||
* string
|
||||
*
|
||||
* @param output the output of the node health script to process
|
||||
* @return true if output string has error pattern in it.
|
||||
*/
|
||||
private boolean hasErrors(String output) {
|
||||
|
@ -190,150 +232,46 @@ public class NodeHealthScriptRunner extends AbstractService {
|
|||
}
|
||||
}
|
||||
|
||||
public NodeHealthScriptRunner(String scriptName, long chkInterval, long timeout,
|
||||
String[] scriptArgs) {
|
||||
super(NodeHealthScriptRunner.class.getName());
|
||||
this.lastReportedTime = System.currentTimeMillis();
|
||||
this.isHealthy = true;
|
||||
this.healthReport = "";
|
||||
this.nodeHealthScript = scriptName;
|
||||
this.intervalTime = chkInterval;
|
||||
this.scriptTimeout = timeout;
|
||||
this.timer = new NodeHealthMonitorExecutor(scriptArgs);
|
||||
}
|
||||
|
||||
/*
|
||||
* Method which initializes the values for the script path and interval time.
|
||||
*/
|
||||
@Override
|
||||
protected void serviceInit(Configuration conf) throws Exception {
|
||||
super.serviceInit(conf);
|
||||
}
|
||||
|
||||
/**
|
||||
* Method used to start the Node health monitoring.
|
||||
*
|
||||
*/
|
||||
@Override
|
||||
protected void serviceStart() throws Exception {
|
||||
nodeHealthScriptScheduler = new Timer("NodeHealthMonitor-Timer", true);
|
||||
// Start the timer task immediately and
|
||||
// then periodically at interval time.
|
||||
nodeHealthScriptScheduler.scheduleAtFixedRate(timer, 0, intervalTime);
|
||||
super.serviceStart();
|
||||
}
|
||||
|
||||
/**
|
||||
* Method used to terminate the node health monitoring service.
|
||||
*
|
||||
*/
|
||||
@Override
|
||||
protected void serviceStop() {
|
||||
if (nodeHealthScriptScheduler != null) {
|
||||
nodeHealthScriptScheduler.cancel();
|
||||
}
|
||||
if (shexec != null) {
|
||||
Process p = shexec.getProcess();
|
||||
public void serviceStop() throws Exception {
|
||||
if (commandExecutor != null) {
|
||||
Process p = commandExecutor.getProcess();
|
||||
if (p != null) {
|
||||
p.destroy();
|
||||
}
|
||||
}
|
||||
super.serviceStop();
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets the if the node is healthy or not
|
||||
*
|
||||
* @return true if node is healthy
|
||||
*/
|
||||
public boolean isHealthy() {
|
||||
return isHealthy;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets if the node is healthy or not considering disks' health also.
|
||||
*
|
||||
* @param isHealthy
|
||||
* if or not node is healthy
|
||||
*/
|
||||
private synchronized void setHealthy(boolean isHealthy) {
|
||||
this.isHealthy = isHealthy;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns output from health script. if node is healthy then an empty string
|
||||
* is returned.
|
||||
*
|
||||
* @return output from health script
|
||||
*/
|
||||
public String getHealthReport() {
|
||||
return healthReport;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the health report from the node health script. Also set the disks'
|
||||
* health info obtained from DiskHealthCheckerService.
|
||||
* Method used to determine whether the {@link NodeHealthScriptRunner}
|
||||
* should be started or not.<p>
|
||||
* Returns true if following conditions are met:
|
||||
*
|
||||
* @param healthReport
|
||||
*/
|
||||
private synchronized void setHealthReport(String healthReport) {
|
||||
this.healthReport = healthReport;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns time stamp when node health script was last run.
|
||||
*
|
||||
* @return timestamp when node health script was last run
|
||||
*/
|
||||
public long getLastReportedTime() {
|
||||
return lastReportedTime;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the last run time of the node health script.
|
||||
*
|
||||
* @param lastReportedTime
|
||||
*/
|
||||
private synchronized void setLastReportedTime(long lastReportedTime) {
|
||||
this.lastReportedTime = lastReportedTime;
|
||||
}
|
||||
|
||||
/**
|
||||
* Method used to determine if or not node health monitoring service should be
|
||||
* started or not. Returns true if following conditions are met:
|
||||
*
|
||||
* <ol>
|
||||
* <li>Path to Node health check script is not empty</li>
|
||||
* <li>Node health check script file exists</li>
|
||||
* </ol>
|
||||
*
|
||||
*
|
||||
* @return true if node health monitoring service can be started.
|
||||
*/
|
||||
public static boolean shouldRun(String healthScript) {
|
||||
static boolean shouldRun(String script, String healthScript) {
|
||||
if (healthScript == null || healthScript.trim().isEmpty()) {
|
||||
LOG.info("Missing location for the node health check script \"{}\".",
|
||||
script);
|
||||
return false;
|
||||
}
|
||||
File f = new File(healthScript);
|
||||
return f.exists() && FileUtil.canExecute(f);
|
||||
}
|
||||
|
||||
private synchronized void setHealthStatus(boolean isHealthy, String output) {
|
||||
LOG.info("health status being set as " + output);
|
||||
this.setHealthy(isHealthy);
|
||||
this.setHealthReport(output);
|
||||
}
|
||||
|
||||
private synchronized void setHealthStatus(boolean isHealthy, String output,
|
||||
long time) {
|
||||
LOG.info("health status being set as " + output);
|
||||
this.setHealthStatus(isHealthy, output);
|
||||
this.setLastReportedTime(time);
|
||||
}
|
||||
|
||||
/**
|
||||
* Used only by tests to access the timer task directly
|
||||
* @return the timer task
|
||||
*/
|
||||
public TimerTask getTimerTask() {
|
||||
return timer;
|
||||
if (!f.exists()) {
|
||||
LOG.warn("File {} for script \"{}\" does not exist.",
|
||||
healthScript, script);
|
||||
return false;
|
||||
}
|
||||
if (!FileUtil.canExecute(f)) {
|
||||
LOG.warn("File {} for script \"{}\" can not be executed.",
|
||||
healthScript, script);
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,146 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.nodemanager.health;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import org.apache.hadoop.service.AbstractService;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.util.Timer;
|
||||
import java.util.TimerTask;
|
||||
|
||||
/**
|
||||
* A {@link HealthReporter} skeleton for regularly checking a specific
|
||||
* {@link TimerTask} and obtaining information about it.
|
||||
*
|
||||
* @see NodeHealthScriptRunner
|
||||
*/
|
||||
public abstract class TimedHealthReporterService extends AbstractService
|
||||
implements HealthReporter {
|
||||
|
||||
private static final Logger LOG =
|
||||
LoggerFactory.getLogger(TimedHealthReporterService.class);
|
||||
|
||||
private boolean isHealthy;
|
||||
private String healthReport;
|
||||
private long lastReportedTime;
|
||||
|
||||
private Timer timer;
|
||||
private TimerTask task;
|
||||
private long intervalMs;
|
||||
|
||||
TimedHealthReporterService(String name, long intervalMs) {
|
||||
super(name);
|
||||
this.isHealthy = true;
|
||||
this.healthReport = "";
|
||||
this.lastReportedTime = System.currentTimeMillis();
|
||||
this.intervalMs = intervalMs;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
void setTimerTask(TimerTask timerTask) {
|
||||
task = timerTask;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
TimerTask getTimerTask() {
|
||||
return task;
|
||||
}
|
||||
|
||||
/**
|
||||
* Method used to start the health monitoring.
|
||||
*/
|
||||
@Override
|
||||
public void serviceStart() throws Exception {
|
||||
if (task == null) {
|
||||
throw new Exception("Health reporting task hasn't been set!");
|
||||
}
|
||||
timer = new Timer("HealthReporterService-Timer", true);
|
||||
timer.scheduleAtFixedRate(task, 0, intervalMs);
|
||||
super.serviceStart();
|
||||
}
|
||||
|
||||
/**
|
||||
* Method used to terminate the health monitoring service.
|
||||
*/
|
||||
@Override
|
||||
protected void serviceStop() throws Exception {
|
||||
if (timer != null) {
|
||||
timer.cancel();
|
||||
}
|
||||
super.serviceStop();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isHealthy() {
|
||||
return isHealthy;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets if the node is healthy or not.
|
||||
*
|
||||
* @param healthy whether the node is healthy
|
||||
*/
|
||||
protected synchronized void setHealthy(boolean healthy) {
|
||||
this.isHealthy = healthy;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getHealthReport() {
|
||||
return healthReport;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the health report from the node health check. Also set the disks'
|
||||
* health info obtained from DiskHealthCheckerService.
|
||||
*
|
||||
* @param report report String
|
||||
*/
|
||||
private synchronized void setHealthReport(String report) {
|
||||
this.healthReport = report;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getLastHealthReportTime() {
|
||||
return lastReportedTime;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the last run time of the node health check.
|
||||
*
|
||||
* @param lastReportedTime last reported time in long
|
||||
*/
|
||||
private synchronized void setLastReportedTime(long lastReportedTime) {
|
||||
this.lastReportedTime = lastReportedTime;
|
||||
}
|
||||
|
||||
synchronized void setHealthyWithoutReport() {
|
||||
this.setHealthy(true);
|
||||
this.setHealthReport("");
|
||||
this.setLastReportedTime(System.currentTimeMillis());
|
||||
}
|
||||
|
||||
synchronized void setUnhealthyWithReport(String output) {
|
||||
LOG.info("Health status being set as: \"" + output + "\".");
|
||||
this.setHealthy(false);
|
||||
this.setHealthReport(output);
|
||||
this.setLastReportedTime(System.currentTimeMillis());
|
||||
}
|
||||
}
|
|
@ -38,6 +38,7 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.UnRegisterNodeManagerRe
|
|||
import org.apache.hadoop.yarn.server.api.records.MasterKey;
|
||||
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
|
||||
import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.health.NodeHealthCheckerService;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
|
||||
import org.apache.hadoop.yarn.server.utils.YarnServerBuilderUtils;
|
||||
|
||||
|
|
|
@ -37,6 +37,7 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.NodeHeartbeatRe
|
|||
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RegisterNodeManagerResponsePBImpl;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.UnRegisterNodeManagerResponsePBImpl;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.health.NodeHealthCheckerService;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
|
|
|
@ -45,6 +45,7 @@ import org.apache.hadoop.yarn.server.api.ResourceTracker;
|
|||
import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.BaseContainerManagerTest;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.TestContainerManager;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.health.NodeHealthCheckerService;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager;
|
||||
|
@ -102,8 +103,8 @@ public class TestEventFlow {
|
|||
DeletionService del = new DeletionService(exec);
|
||||
Dispatcher dispatcher = new AsyncDispatcher();
|
||||
LocalDirsHandlerService dirsHandler = new LocalDirsHandlerService();
|
||||
NodeHealthCheckerService healthChecker = new NodeHealthCheckerService(
|
||||
NodeManager.getNodeHealthScriptRunner(conf), dirsHandler);
|
||||
NodeHealthCheckerService healthChecker =
|
||||
new NodeHealthCheckerService(dirsHandler);
|
||||
healthChecker.init(conf);
|
||||
NodeManagerMetrics metrics = NodeManagerMetrics.create();
|
||||
NodeStatusUpdater nodeStatusUpdater =
|
||||
|
|
|
@ -64,6 +64,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Cont
|
|||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.task.FileDeletionMatcher;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.health.NodeHealthCheckerService;
|
||||
import org.apache.hadoop.yarn.util.Records;
|
||||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
|
|
|
@ -86,6 +86,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManag
|
|||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.TestContainerManager;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.health.NodeHealthCheckerService;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
|
||||
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
|
||||
import org.apache.hadoop.yarn.server.utils.YarnServerBuilderUtils;
|
||||
|
|
|
@ -66,6 +66,7 @@ import org.apache.hadoop.yarn.ipc.YarnRPC;
|
|||
import org.apache.hadoop.yarn.security.NMTokenIdentifier;
|
||||
import org.apache.hadoop.yarn.server.api.records.MasterKey;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.TestContainerManager;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.health.NodeHealthCheckerService;
|
||||
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
||||
import org.apache.hadoop.yarn.util.ConverterUtils;
|
||||
import org.junit.After;
|
||||
|
|
|
@ -19,6 +19,7 @@
|
|||
package org.apache.hadoop.yarn.server.nodemanager;
|
||||
|
||||
import static org.apache.hadoop.yarn.server.utils.YarnServerBuilderUtils.newNodeHeartbeatResponse;
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
import static org.mockito.Mockito.mock;
|
||||
|
@ -107,6 +108,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Ap
|
|||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.health.NodeHealthCheckerService;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
|
||||
|
@ -1994,4 +1996,21 @@ public class TestNodeStatusUpdater extends NodeManagerTestBase {
|
|||
}
|
||||
};
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testExceptionReported() {
|
||||
nm = new NodeManager();
|
||||
YarnConfiguration conf = new YarnConfiguration();
|
||||
nm.init(conf);
|
||||
NodeStatusUpdater nodeStatusUpdater = nm.getNodeStatusUpdater();
|
||||
NodeHealthCheckerService nodeHealthChecker = nm.getNodeHealthChecker();
|
||||
|
||||
assertThat(nodeHealthChecker.isHealthy()).isTrue();
|
||||
|
||||
String message = "exception message";
|
||||
Exception e = new Exception(message);
|
||||
nodeStatusUpdater.reportException(e);
|
||||
assertThat(nodeHealthChecker.isHealthy()).isFalse();
|
||||
assertThat(nodeHealthChecker.getHealthReport()).isEqualTo(message);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -56,6 +56,7 @@ import org.apache.hadoop.yarn.server.api.records.MasterKey;
|
|||
import org.apache.hadoop.yarn.server.api.records.NodeAction;
|
||||
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
|
||||
import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.health.NodeHealthCheckerService;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.nodelabels.NodeAttributesProvider;
|
||||
import org.apache.hadoop.yarn.server.utils.YarnServerBuilderUtils;
|
||||
import org.junit.After;
|
||||
|
|
|
@ -50,6 +50,7 @@ import org.apache.hadoop.yarn.server.api.records.MasterKey;
|
|||
import org.apache.hadoop.yarn.server.api.records.NodeAction;
|
||||
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
|
||||
import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.health.NodeHealthCheckerService;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.nodelabels.NodeLabelsProvider;
|
||||
import org.apache.hadoop.yarn.server.utils.YarnServerBuilderUtils;
|
||||
import org.junit.After;
|
||||
|
|
|
@ -75,7 +75,7 @@ import org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor;
|
|||
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.LocalRMInterface;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.NodeHealthCheckerService;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.health.NodeHealthCheckerService;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater;
|
||||
|
@ -218,8 +218,7 @@ public abstract class BaseContainerManagerTest {
|
|||
delSrvc.init(conf);
|
||||
|
||||
dirsHandler = new LocalDirsHandlerService();
|
||||
nodeHealthChecker = new NodeHealthCheckerService(
|
||||
NodeManager.getNodeHealthScriptRunner(conf), dirsHandler);
|
||||
nodeHealthChecker = new NodeHealthCheckerService(dirsHandler);
|
||||
nodeHealthChecker.init(conf);
|
||||
containerManager = createContainerManager(delSrvc);
|
||||
((NMContext)context).setContainerManager(containerManager);
|
||||
|
|
|
@ -85,8 +85,7 @@ import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
|
|||
import org.apache.hadoop.yarn.server.nodemanager.Context;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.NodeHealthCheckerService;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.health.NodeHealthCheckerService;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
|
||||
|
@ -157,8 +156,7 @@ public class TestContainerManagerRecovery extends BaseContainerManagerTest {
|
|||
delSrvc.init(conf);
|
||||
exec = createContainerExecutor();
|
||||
dirsHandler = new LocalDirsHandlerService();
|
||||
nodeHealthChecker = new NodeHealthCheckerService(
|
||||
NodeManager.getNodeHealthScriptRunner(conf), dirsHandler);
|
||||
nodeHealthChecker = new NodeHealthCheckerService(dirsHandler);
|
||||
nodeHealthChecker.init(conf);
|
||||
|
||||
}
|
||||
|
|
|
@ -30,7 +30,6 @@ import org.apache.hadoop.yarn.server.nodemanager.Context;
|
|||
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.LinuxContainerExecutor;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.NodeHealthCheckerService;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.NodeManagerTestBase;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater;
|
||||
|
@ -44,6 +43,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resource
|
|||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandler;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.linux.resources.ResourceHandlerChain;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.deviceframework.*;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.health.NodeHealthCheckerService;
|
||||
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
|
||||
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
|
||||
import org.apache.hadoop.yarn.util.resource.TestResourceUtils;
|
||||
|
|
|
@ -0,0 +1,41 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.yarn.server.nodemanager.health;
|
||||
|
||||
import org.junit.Test;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
/**
|
||||
* Tests for the {@link ExceptionReporter} class.
|
||||
*/
|
||||
public class TestExceptionReporter {
|
||||
@Test
|
||||
public void testUnhealthy() {
|
||||
ExceptionReporter reporter = new ExceptionReporter();
|
||||
assertThat(reporter.isHealthy()).isTrue();
|
||||
assertThat(reporter.getLastHealthReportTime()).isZero();
|
||||
|
||||
String message = "test";
|
||||
Exception exception = new Exception(message);
|
||||
reporter.reportException(exception);
|
||||
assertThat(reporter.isHealthy()).isFalse();
|
||||
assertThat(reporter.getHealthReport()).isEqualTo(message);
|
||||
assertThat(reporter.getLastHealthReportTime()).isNotEqualTo(0);
|
||||
}
|
||||
}
|
|
@ -16,12 +16,15 @@
|
|||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.nodemanager;
|
||||
package org.apache.hadoop.yarn.server.nodemanager.health;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.FileOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.PrintWriter;
|
||||
|
||||
import org.apache.hadoop.service.AbstractService;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
|
@ -31,7 +34,6 @@ import org.apache.hadoop.conf.Configuration;
|
|||
import org.apache.hadoop.fs.FileContext;
|
||||
import org.apache.hadoop.fs.FileUtil;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.util.NodeHealthScriptRunner;
|
||||
import org.apache.hadoop.util.Shell;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.factories.RecordFactory;
|
||||
|
@ -42,58 +44,73 @@ import org.junit.Assert;
|
|||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.junit.Assert.fail;
|
||||
import static org.mockito.Mockito.doReturn;
|
||||
import static org.mockito.Mockito.spy;
|
||||
|
||||
public class TestNodeHealthService {
|
||||
/**
|
||||
* Test class for {@link NodeHealthCheckerService}.
|
||||
*/
|
||||
public class TestNodeHealthCheckerService {
|
||||
|
||||
private static volatile Logger LOG =
|
||||
LoggerFactory.getLogger(TestNodeHealthService.class);
|
||||
private static final Logger LOG =
|
||||
LoggerFactory.getLogger(TestNodeHealthCheckerService.class);
|
||||
|
||||
protected static File testRootDir = new File("target",
|
||||
TestNodeHealthService.class.getName() + "-localDir").getAbsoluteFile();
|
||||
private static final File TEST_ROOT_DIR = new File("target",
|
||||
TestNodeHealthCheckerService.class.getName() + "-localDir")
|
||||
.getAbsoluteFile();
|
||||
|
||||
final static File nodeHealthConfigFile = new File(testRootDir,
|
||||
private static final File NODE_HEALTH_CONFIG_FILE = new File(TEST_ROOT_DIR,
|
||||
"modified-mapred-site.xml");
|
||||
|
||||
private File nodeHealthscriptFile = new File(testRootDir,
|
||||
private File nodeHealthScriptFile = new File(TEST_ROOT_DIR,
|
||||
Shell.appendScriptExtension("failingscript"));
|
||||
|
||||
@Before
|
||||
public void setup() {
|
||||
testRootDir.mkdirs();
|
||||
TEST_ROOT_DIR.mkdirs();
|
||||
}
|
||||
|
||||
@After
|
||||
public void tearDown() throws Exception {
|
||||
if (testRootDir.exists()) {
|
||||
if (TEST_ROOT_DIR.exists()) {
|
||||
FileContext.getLocalFSFileContext().delete(
|
||||
new Path(testRootDir.getAbsolutePath()), true);
|
||||
new Path(TEST_ROOT_DIR.getAbsolutePath()), true);
|
||||
}
|
||||
}
|
||||
|
||||
private void writeNodeHealthScriptFile(String scriptStr, boolean setExecutable)
|
||||
throws IOException {
|
||||
PrintWriter pw = null;
|
||||
try {
|
||||
FileUtil.setWritable(nodeHealthscriptFile, true);
|
||||
FileUtil.setReadable(nodeHealthscriptFile, true);
|
||||
pw = new PrintWriter(new FileOutputStream(nodeHealthscriptFile));
|
||||
pw.println(scriptStr);
|
||||
pw.flush();
|
||||
} finally {
|
||||
pw.close();
|
||||
}
|
||||
FileUtil.setExecutable(nodeHealthscriptFile, setExecutable);
|
||||
}
|
||||
|
||||
private Configuration getConfForNodeHealthScript() {
|
||||
private void writeNodeHealthScriptFile() throws IOException,
|
||||
InterruptedException {
|
||||
try (PrintWriter pw = new PrintWriter(
|
||||
new FileOutputStream(nodeHealthScriptFile))) {
|
||||
FileUtil.chmod(nodeHealthScriptFile.getCanonicalPath(), "u+rwx");
|
||||
pw.println("");
|
||||
pw.flush();
|
||||
}
|
||||
}
|
||||
|
||||
private Configuration getConfForNodeHealthScript(String scriptName) {
|
||||
Configuration conf = new Configuration();
|
||||
conf.set(YarnConfiguration.NM_HEALTH_CHECK_SCRIPT_PATH,
|
||||
nodeHealthscriptFile.getAbsolutePath());
|
||||
conf.setLong(YarnConfiguration.NM_HEALTH_CHECK_INTERVAL_MS, 500);
|
||||
conf.setLong(
|
||||
YarnConfiguration.NM_HEALTH_CHECK_SCRIPT_TIMEOUT_MS, 1000);
|
||||
conf.set(YarnConfiguration.NM_HEALTH_CHECK_SCRIPTS, scriptName);
|
||||
String timeoutConfig =
|
||||
String.format(
|
||||
YarnConfiguration.NM_HEALTH_CHECK_SCRIPT_TIMEOUT_MS_TEMPLATE,
|
||||
scriptName);
|
||||
conf.setLong(timeoutConfig, 1000L);
|
||||
|
||||
String intervalConfig =
|
||||
String.format(
|
||||
YarnConfiguration.NM_HEALTH_CHECK_SCRIPT_INTERVAL_MS_TEMPLATE,
|
||||
scriptName);
|
||||
conf.setLong(intervalConfig, 500L);
|
||||
|
||||
String pathConfig =
|
||||
String.format(
|
||||
YarnConfiguration.NM_HEALTH_CHECK_SCRIPT_PATH_TEMPLATE,
|
||||
scriptName);
|
||||
conf.set(pathConfig, nodeHealthScriptFile.getAbsolutePath());
|
||||
|
||||
return conf;
|
||||
}
|
||||
|
||||
|
@ -109,16 +126,22 @@ public class TestNodeHealthService {
|
|||
RecordFactory factory = RecordFactoryProvider.getRecordFactory(null);
|
||||
NodeHealthStatus healthStatus =
|
||||
factory.newRecordInstance(NodeHealthStatus.class);
|
||||
Configuration conf = getConfForNodeHealthScript();
|
||||
conf.writeXml(new FileOutputStream(nodeHealthConfigFile));
|
||||
conf.addResource(nodeHealthConfigFile.getName());
|
||||
writeNodeHealthScriptFile("", true);
|
||||
String scriptName = "test";
|
||||
Configuration conf = getConfForNodeHealthScript(scriptName);
|
||||
conf.writeXml(new FileOutputStream(NODE_HEALTH_CONFIG_FILE));
|
||||
conf.addResource(NODE_HEALTH_CONFIG_FILE.getName());
|
||||
writeNodeHealthScriptFile();
|
||||
|
||||
LocalDirsHandlerService dirsHandler = new LocalDirsHandlerService();
|
||||
NodeHealthScriptRunner nodeHealthScriptRunner =
|
||||
spy(NodeManager.getNodeHealthScriptRunner(conf));
|
||||
NodeHealthCheckerService nodeHealthChecker = new NodeHealthCheckerService(
|
||||
nodeHealthScriptRunner, dirsHandler);
|
||||
NodeHealthScriptRunner.newInstance(scriptName, conf);
|
||||
if (nodeHealthScriptRunner == null) {
|
||||
fail("Should have created NodeHealthScriptRunner instance");
|
||||
}
|
||||
nodeHealthScriptRunner = spy(nodeHealthScriptRunner);
|
||||
NodeHealthCheckerService nodeHealthChecker =
|
||||
new NodeHealthCheckerService(dirsHandler);
|
||||
nodeHealthChecker.addHealthReporter(nodeHealthScriptRunner);
|
||||
nodeHealthChecker.init(conf);
|
||||
|
||||
doReturn(true).when(nodeHealthScriptRunner).isHealthy();
|
||||
|
@ -133,7 +156,7 @@ public class TestNodeHealthService {
|
|||
Assert.assertTrue("Node health status reported unhealthy", healthStatus
|
||||
.getHealthReport().equals(nodeHealthChecker.getHealthReport()));
|
||||
|
||||
doReturn(false).when(nodeHealthScriptRunner).isHealthy();
|
||||
doReturn(false).when(nodeHealthScriptRunner).isHealthy();
|
||||
// update health status
|
||||
setHealthStatus(healthStatus, nodeHealthChecker.isHealthy(),
|
||||
nodeHealthChecker.getHealthReport(),
|
||||
|
@ -174,4 +197,63 @@ public class TestNodeHealthService {
|
|||
.getDisksHealthReport(false))
|
||||
)));
|
||||
}
|
||||
|
||||
private abstract class HealthReporterService extends AbstractService
|
||||
implements HealthReporter {
|
||||
HealthReporterService() {
|
||||
super(HealthReporterService.class.getName());
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCustomHealthReporter() throws Exception {
|
||||
String healthReport = "dummy health report";
|
||||
HealthReporterService customHealthReporter = new HealthReporterService() {
|
||||
private int counter = 0;
|
||||
|
||||
@Override
|
||||
public boolean isHealthy() {
|
||||
return counter++ % 2 == 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getHealthReport() {
|
||||
return healthReport;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getLastHealthReportTime() {
|
||||
return Long.MAX_VALUE;
|
||||
}
|
||||
};
|
||||
|
||||
Configuration conf = new Configuration();
|
||||
LocalDirsHandlerService dirsHandler = new LocalDirsHandlerService();
|
||||
NodeHealthCheckerService nodeHealthChecker =
|
||||
new NodeHealthCheckerService(dirsHandler);
|
||||
nodeHealthChecker.addHealthReporter(customHealthReporter);
|
||||
nodeHealthChecker.init(conf);
|
||||
|
||||
assertThat(nodeHealthChecker.isHealthy()).isTrue();
|
||||
assertThat(nodeHealthChecker.isHealthy()).isFalse();
|
||||
assertThat(nodeHealthChecker.getHealthReport()).isEqualTo(healthReport);
|
||||
assertThat(nodeHealthChecker.getLastHealthReportTime())
|
||||
.isEqualTo(Long.MAX_VALUE);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testExceptionReported() {
|
||||
Configuration conf = new Configuration();
|
||||
LocalDirsHandlerService dirsHandler = new LocalDirsHandlerService();
|
||||
NodeHealthCheckerService nodeHealthChecker =
|
||||
new NodeHealthCheckerService(dirsHandler);
|
||||
nodeHealthChecker.init(conf);
|
||||
assertThat(nodeHealthChecker.isHealthy()).isTrue();
|
||||
|
||||
String message = "An exception was thrown.";
|
||||
Exception exception = new Exception(message);
|
||||
nodeHealthChecker.reportException(exception);
|
||||
assertThat(nodeHealthChecker.isHealthy()).isFalse();
|
||||
assertThat(nodeHealthChecker.getHealthReport()).isEqualTo(message);
|
||||
}
|
||||
}
|
|
@ -16,7 +16,7 @@
|
|||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.util;
|
||||
package org.apache.hadoop.yarn.server.nodemanager.health;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.FileOutputStream;
|
||||
|
@ -28,14 +28,22 @@ import org.apache.hadoop.conf.Configuration;
|
|||
import org.apache.hadoop.fs.FileContext;
|
||||
import org.apache.hadoop.fs.FileUtil;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.util.Shell;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
/**
|
||||
* Test class for {@link NodeHealthScriptRunner}.
|
||||
*/
|
||||
public class TestNodeHealthScriptRunner {
|
||||
|
||||
protected static File testRootDir = new File("target",
|
||||
private static File testRootDir = new File("target",
|
||||
TestNodeHealthScriptRunner.class.getName() +
|
||||
"-localDir").getAbsoluteFile();
|
||||
|
||||
|
@ -55,8 +63,8 @@ public class TestNodeHealthScriptRunner {
|
|||
}
|
||||
}
|
||||
|
||||
private void writeNodeHealthScriptFile(String scriptStr, boolean setExecutable)
|
||||
throws IOException {
|
||||
private void writeNodeHealthScriptFile(String scriptStr,
|
||||
boolean setExecutable) throws IOException {
|
||||
PrintWriter pw = null;
|
||||
try {
|
||||
FileUtil.setWritable(nodeHealthscriptFile, true);
|
||||
|
@ -70,20 +78,46 @@ public class TestNodeHealthScriptRunner {
|
|||
FileUtil.setExecutable(nodeHealthscriptFile, setExecutable);
|
||||
}
|
||||
|
||||
private NodeHealthScriptRunner createNodeHealthScript() {
|
||||
String scriptName = "custom";
|
||||
|
||||
YarnConfiguration conf = new YarnConfiguration();
|
||||
conf.set(YarnConfiguration.NM_HEALTH_CHECK_SCRIPTS, scriptName);
|
||||
String timeoutConfig =
|
||||
String.format(
|
||||
YarnConfiguration.NM_HEALTH_CHECK_SCRIPT_TIMEOUT_MS_TEMPLATE,
|
||||
scriptName);
|
||||
conf.setLong(timeoutConfig, 1000L);
|
||||
|
||||
String intervalConfig =
|
||||
String.format(
|
||||
YarnConfiguration.NM_HEALTH_CHECK_SCRIPT_INTERVAL_MS_TEMPLATE,
|
||||
scriptName);
|
||||
conf.setLong(intervalConfig, 500L);
|
||||
|
||||
String pathConfig =
|
||||
String.format(
|
||||
YarnConfiguration.NM_HEALTH_CHECK_SCRIPT_PATH_TEMPLATE,
|
||||
scriptName);
|
||||
conf.set(pathConfig, nodeHealthscriptFile.getAbsolutePath());
|
||||
|
||||
return NodeHealthScriptRunner.newInstance("custom", conf);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNodeHealthScriptShouldRun() throws IOException {
|
||||
Assert.assertFalse("Node health script should start",
|
||||
NodeHealthScriptRunner.shouldRun(
|
||||
assertFalse("Node health script should start",
|
||||
NodeHealthScriptRunner.shouldRun("script",
|
||||
nodeHealthscriptFile.getAbsolutePath()));
|
||||
writeNodeHealthScriptFile("", false);
|
||||
// Node health script should not start if the node health script is not
|
||||
// executable.
|
||||
Assert.assertFalse("Node health script should start",
|
||||
NodeHealthScriptRunner.shouldRun(
|
||||
assertFalse("Node health script should start",
|
||||
NodeHealthScriptRunner.shouldRun("script",
|
||||
nodeHealthscriptFile.getAbsolutePath()));
|
||||
writeNodeHealthScriptFile("", true);
|
||||
Assert.assertTrue("Node health script should start",
|
||||
NodeHealthScriptRunner.shouldRun(
|
||||
assertTrue("Node health script should start",
|
||||
NodeHealthScriptRunner.shouldRun("script",
|
||||
nodeHealthscriptFile.getAbsolutePath()));
|
||||
}
|
||||
|
||||
|
@ -92,54 +126,53 @@ public class TestNodeHealthScriptRunner {
|
|||
String errorScript = "echo ERROR\n echo \"Tracker not healthy\"";
|
||||
String normalScript = "echo \"I am all fine\"";
|
||||
String timeOutScript =
|
||||
Shell.WINDOWS ? "@echo off\nping -n 4 127.0.0.1 >nul\necho \"I am fine\""
|
||||
: "sleep 4\necho \"I am fine\"";
|
||||
Shell.WINDOWS ?
|
||||
"@echo off\nping -n 4 127.0.0.1 >nul\necho \"I am fine\""
|
||||
: "sleep 4\necho \"I am fine\"";
|
||||
String exitCodeScript = "exit 127";
|
||||
|
||||
Configuration conf = new Configuration();
|
||||
writeNodeHealthScriptFile(normalScript, true);
|
||||
NodeHealthScriptRunner nodeHealthScriptRunner = new NodeHealthScriptRunner(
|
||||
nodeHealthscriptFile.getAbsolutePath(),
|
||||
500, 1000, new String[] {});
|
||||
NodeHealthScriptRunner nodeHealthScriptRunner = createNodeHealthScript();
|
||||
nodeHealthScriptRunner.init(conf);
|
||||
TimerTask timerTask = nodeHealthScriptRunner.getTimerTask();
|
||||
|
||||
timerTask.run();
|
||||
// Normal Script runs successfully
|
||||
Assert.assertTrue("Node health status reported unhealthy",
|
||||
assertTrue("Node health status reported unhealthy",
|
||||
nodeHealthScriptRunner.isHealthy());
|
||||
Assert.assertEquals("", nodeHealthScriptRunner.getHealthReport());
|
||||
assertTrue(nodeHealthScriptRunner.getHealthReport().isEmpty());
|
||||
|
||||
// Error script.
|
||||
writeNodeHealthScriptFile(errorScript, true);
|
||||
// Run timer
|
||||
timerTask.run();
|
||||
Assert.assertFalse("Node health status reported healthy",
|
||||
assertFalse("Node health status reported healthy",
|
||||
nodeHealthScriptRunner.isHealthy());
|
||||
Assert.assertTrue(
|
||||
assertTrue(
|
||||
nodeHealthScriptRunner.getHealthReport().contains("ERROR"));
|
||||
|
||||
|
||||
// Healthy script.
|
||||
writeNodeHealthScriptFile(normalScript, true);
|
||||
timerTask.run();
|
||||
Assert.assertTrue("Node health status reported unhealthy",
|
||||
assertTrue("Node health status reported unhealthy",
|
||||
nodeHealthScriptRunner.isHealthy());
|
||||
Assert.assertEquals("", nodeHealthScriptRunner.getHealthReport());
|
||||
assertTrue(nodeHealthScriptRunner.getHealthReport().isEmpty());
|
||||
|
||||
// Timeout script.
|
||||
writeNodeHealthScriptFile(timeOutScript, true);
|
||||
timerTask.run();
|
||||
Assert.assertFalse("Node health status reported healthy even after timeout",
|
||||
nodeHealthScriptRunner.isHealthy());
|
||||
Assert.assertEquals(
|
||||
NodeHealthScriptRunner.NODE_HEALTH_SCRIPT_TIMED_OUT_MSG,
|
||||
nodeHealthScriptRunner.getHealthReport());
|
||||
assertFalse("Node health status reported healthy even after timeout",
|
||||
nodeHealthScriptRunner.isHealthy());
|
||||
assertEquals(
|
||||
NodeHealthScriptRunner.NODE_HEALTH_SCRIPT_TIMED_OUT_MSG,
|
||||
nodeHealthScriptRunner.getHealthReport());
|
||||
|
||||
// Exit code 127
|
||||
writeNodeHealthScriptFile(exitCodeScript, true);
|
||||
timerTask.run();
|
||||
Assert.assertTrue("Node health status reported unhealthy",
|
||||
assertTrue("Node health status reported unhealthy",
|
||||
nodeHealthScriptRunner.isHealthy());
|
||||
Assert.assertEquals("", nodeHealthScriptRunner.getHealthReport());
|
||||
assertEquals("", nodeHealthScriptRunner.getHealthReport());
|
||||
}
|
||||
}
|
|
@ -43,7 +43,6 @@ import org.apache.hadoop.fs.FileUtil;
|
|||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.io.nativeio.NativeIO;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.util.NodeHealthScriptRunner;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
|
@ -57,7 +56,6 @@ import org.apache.hadoop.yarn.factories.RecordFactory;
|
|||
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.Context;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.NodeHealthCheckerService;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
|
||||
|
@ -65,6 +63,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Cont
|
|||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.health.NodeHealthCheckerService;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.webapp.ContainerLogsPage.ContainersLogsBlock;
|
||||
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
|
||||
|
@ -79,10 +78,9 @@ import com.google.inject.Module;
|
|||
|
||||
public class TestContainerLogsPage {
|
||||
|
||||
private NodeHealthCheckerService createNodeHealthCheckerService(Configuration conf) {
|
||||
NodeHealthScriptRunner scriptRunner = NodeManager.getNodeHealthScriptRunner(conf);
|
||||
private NodeHealthCheckerService createNodeHealthCheckerService() {
|
||||
LocalDirsHandlerService dirsHandler = new LocalDirsHandlerService();
|
||||
return new NodeHealthCheckerService(scriptRunner, dirsHandler);
|
||||
return new NodeHealthCheckerService(dirsHandler);
|
||||
}
|
||||
|
||||
@Test(timeout=30000)
|
||||
|
@ -92,7 +90,7 @@ public class TestContainerLogsPage {
|
|||
String logdirwithFile = absLogDir.toURI().toString();
|
||||
Configuration conf = new Configuration();
|
||||
conf.set(YarnConfiguration.NM_LOG_DIRS, logdirwithFile);
|
||||
NodeHealthCheckerService healthChecker = createNodeHealthCheckerService(conf);
|
||||
NodeHealthCheckerService healthChecker = createNodeHealthCheckerService();
|
||||
healthChecker.init(conf);
|
||||
LocalDirsHandlerService dirsHandler = healthChecker.getDiskHandler();
|
||||
NMContext nmContext = new NodeManager.NMContext(null, null, dirsHandler,
|
||||
|
@ -215,7 +213,7 @@ public class TestContainerLogsPage {
|
|||
"kerberos");
|
||||
UserGroupInformation.setConfiguration(conf);
|
||||
|
||||
NodeHealthCheckerService healthChecker = createNodeHealthCheckerService(conf);
|
||||
NodeHealthCheckerService healthChecker = createNodeHealthCheckerService();
|
||||
healthChecker.init(conf);
|
||||
LocalDirsHandlerService dirsHandler = healthChecker.getDiskHandler();
|
||||
// Add an application and the corresponding containers
|
||||
|
|
|
@ -20,14 +20,13 @@ package org.apache.hadoop.yarn.server.nodemanager.webapp;
|
|||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileUtil;
|
||||
import org.apache.hadoop.util.NodeHealthScriptRunner;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.Context;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.NodeHealthCheckerService;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.ResourceView;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.health.NodeHealthCheckerService;
|
||||
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
|
||||
import org.eclipse.jetty.websocket.api.Session;
|
||||
import org.eclipse.jetty.websocket.api.UpgradeRequest;
|
||||
|
@ -105,8 +104,7 @@ public class TestNMContainerWebSocket {
|
|||
};
|
||||
conf.set(YarnConfiguration.NM_LOCAL_DIRS, TESTROOTDIR.getAbsolutePath());
|
||||
conf.set(YarnConfiguration.NM_LOG_DIRS, testLogDir.getAbsolutePath());
|
||||
NodeHealthCheckerService healthChecker = createNodeHealthCheckerService(
|
||||
conf);
|
||||
NodeHealthCheckerService healthChecker = createNodeHealthCheckerService();
|
||||
healthChecker.init(conf);
|
||||
LocalDirsHandlerService dirsHandler = healthChecker.getDiskHandler();
|
||||
conf.set(YarnConfiguration.NM_WEBAPP_ADDRESS, webAddr);
|
||||
|
@ -120,12 +118,9 @@ public class TestNMContainerWebSocket {
|
|||
}
|
||||
}
|
||||
|
||||
private NodeHealthCheckerService createNodeHealthCheckerService(
|
||||
Configuration conf) {
|
||||
NodeHealthScriptRunner scriptRunner = NodeManager.getNodeHealthScriptRunner(
|
||||
conf);
|
||||
private NodeHealthCheckerService createNodeHealthCheckerService() {
|
||||
LocalDirsHandlerService dirsHandler = new LocalDirsHandlerService();
|
||||
return new NodeHealthCheckerService(scriptRunner, dirsHandler);
|
||||
return new NodeHealthCheckerService(dirsHandler);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
|
@ -28,7 +28,6 @@ import java.io.Writer;
|
|||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileUtil;
|
||||
import org.apache.hadoop.util.NodeHealthScriptRunner;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
|
@ -42,19 +41,19 @@ import org.apache.hadoop.yarn.factories.RecordFactory;
|
|||
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.Context;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.NodeHealthCheckerService;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.ResourceView;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.health.NodeHealthCheckerService;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
|
||||
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
|
||||
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
||||
import org.apache.hadoop.yarn.util.ConverterUtils;
|
||||
|
||||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
|
@ -79,10 +78,9 @@ public class TestNMWebServer {
|
|||
FileUtil.fullyDelete(testLogDir);
|
||||
}
|
||||
|
||||
private NodeHealthCheckerService createNodeHealthCheckerService(Configuration conf) {
|
||||
NodeHealthScriptRunner scriptRunner = NodeManager.getNodeHealthScriptRunner(conf);
|
||||
private NodeHealthCheckerService createNodeHealthCheckerService() {
|
||||
LocalDirsHandlerService dirsHandler = new LocalDirsHandlerService();
|
||||
return new NodeHealthCheckerService(scriptRunner, dirsHandler);
|
||||
return new NodeHealthCheckerService(dirsHandler);
|
||||
}
|
||||
|
||||
private int startNMWebAppServer(String webAddr) {
|
||||
|
@ -113,7 +111,7 @@ public class TestNMWebServer {
|
|||
};
|
||||
conf.set(YarnConfiguration.NM_LOCAL_DIRS, testRootDir.getAbsolutePath());
|
||||
conf.set(YarnConfiguration.NM_LOG_DIRS, testLogDir.getAbsolutePath());
|
||||
NodeHealthCheckerService healthChecker = createNodeHealthCheckerService(conf);
|
||||
NodeHealthCheckerService healthChecker = createNodeHealthCheckerService();
|
||||
healthChecker.init(conf);
|
||||
LocalDirsHandlerService dirsHandler = healthChecker.getDiskHandler();
|
||||
conf.set(YarnConfiguration.NM_WEBAPP_ADDRESS, webAddr);
|
||||
|
@ -176,7 +174,7 @@ public class TestNMWebServer {
|
|||
};
|
||||
conf.set(YarnConfiguration.NM_LOCAL_DIRS, testRootDir.getAbsolutePath());
|
||||
conf.set(YarnConfiguration.NM_LOG_DIRS, testLogDir.getAbsolutePath());
|
||||
NodeHealthCheckerService healthChecker = createNodeHealthCheckerService(conf);
|
||||
NodeHealthCheckerService healthChecker = createNodeHealthCheckerService();
|
||||
healthChecker.init(conf);
|
||||
LocalDirsHandlerService dirsHandler = healthChecker.getDiskHandler();
|
||||
|
||||
|
|
|
@ -47,7 +47,6 @@ import org.apache.hadoop.yarn.logaggregation.ContainerLogFileInfo;
|
|||
import org.apache.hadoop.yarn.logaggregation.TestContainerLogsUtils;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.Context;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.NodeHealthCheckerService;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.ResourceView;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationImpl;
|
||||
|
@ -57,6 +56,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin
|
|||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.ResourcePluginManager;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.gpu.AssignedGpuDevice;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.gpu.GpuDevice;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.health.NodeHealthCheckerService;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.webapp.WebServer.NMWebApp;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.webapp.dao.NMResourceInfo;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.webapp.dao.gpu.GpuDeviceInformation;
|
||||
|
@ -141,8 +141,8 @@ public class TestNMWebServices extends JerseyTestBase {
|
|||
conf.set(YarnConfiguration.YARN_LOG_SERVER_WEBSERVICE_URL,
|
||||
LOGSERVICEWSADDR);
|
||||
dirsHandler = new LocalDirsHandlerService();
|
||||
NodeHealthCheckerService healthChecker = new NodeHealthCheckerService(
|
||||
NodeManager.getNodeHealthScriptRunner(conf), dirsHandler);
|
||||
NodeHealthCheckerService healthChecker =
|
||||
new NodeHealthCheckerService(dirsHandler);
|
||||
healthChecker.init(conf);
|
||||
aclsManager = new ApplicationACLsManager(conf);
|
||||
nmContext = new NodeManager.NMContext(null, null, dirsHandler,
|
||||
|
|
|
@ -47,13 +47,13 @@ import org.apache.hadoop.yarn.event.AsyncDispatcher;
|
|||
import org.apache.hadoop.yarn.event.Dispatcher;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.Context;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.NodeHealthCheckerService;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.ResourceView;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationState;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.health.NodeHealthCheckerService;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.webapp.WebServer.NMWebApp;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.webapp.dao.AppsInfo;
|
||||
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
|
||||
|
@ -104,8 +104,8 @@ public class TestNMWebServicesApps extends JerseyTestBase {
|
|||
conf.set(YarnConfiguration.NM_LOCAL_DIRS, testRootDir.getAbsolutePath());
|
||||
conf.set(YarnConfiguration.NM_LOG_DIRS, testLogDir.getAbsolutePath());
|
||||
LocalDirsHandlerService dirsHandler = new LocalDirsHandlerService();
|
||||
NodeHealthCheckerService healthChecker = new NodeHealthCheckerService(
|
||||
NodeManager.getNodeHealthScriptRunner(conf), dirsHandler);
|
||||
NodeHealthCheckerService healthChecker =
|
||||
new NodeHealthCheckerService(dirsHandler);
|
||||
healthChecker.init(conf);
|
||||
dirsHandler = healthChecker.getDiskHandler();
|
||||
aclsManager = new ApplicationACLsManager(conf);
|
||||
|
|
|
@ -44,11 +44,11 @@ import org.apache.hadoop.yarn.api.records.NodeId;
|
|||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.Context;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.NodeHealthCheckerService;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.ResourceView;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServices;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.records.AuxServiceRecord;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.health.NodeHealthCheckerService;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.webapp.WebServer.NMWebApp;
|
||||
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
|
||||
import org.apache.hadoop.yarn.webapp.GenericExceptionHandler;
|
||||
|
@ -124,8 +124,8 @@ public class TestNMWebServicesAuxServices extends JerseyTestBase {
|
|||
conf.set(YarnConfiguration.NM_LOCAL_DIRS, testRootDir.getAbsolutePath());
|
||||
conf.set(YarnConfiguration.NM_LOG_DIRS, testLogDir.getAbsolutePath());
|
||||
LocalDirsHandlerService dirsHandler = new LocalDirsHandlerService();
|
||||
NodeHealthCheckerService healthChecker = new NodeHealthCheckerService(
|
||||
NodeManager.getNodeHealthScriptRunner(conf), dirsHandler);
|
||||
NodeHealthCheckerService healthChecker =
|
||||
new NodeHealthCheckerService(dirsHandler);
|
||||
healthChecker.init(conf);
|
||||
dirsHandler = healthChecker.getDiskHandler();
|
||||
ApplicationACLsManager aclsManager = new ApplicationACLsManager(conf);
|
||||
|
|
|
@ -28,7 +28,6 @@ import static org.junit.Assert.fail;
|
|||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.io.StringReader;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
|
||||
|
@ -48,16 +47,15 @@ import org.apache.hadoop.yarn.event.AsyncDispatcher;
|
|||
import org.apache.hadoop.yarn.event.Dispatcher;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.Context;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.NodeHealthCheckerService;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.ResourceView;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.health.NodeHealthCheckerService;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.webapp.WebServer.NMWebApp;
|
||||
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
|
||||
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
||||
import org.apache.hadoop.yarn.util.ConverterUtils;
|
||||
import org.apache.hadoop.yarn.webapp.GenericExceptionHandler;
|
||||
import org.apache.hadoop.yarn.webapp.GuiceServletConfig;
|
||||
import org.apache.hadoop.yarn.webapp.JerseyTestBase;
|
||||
|
@ -131,8 +129,8 @@ public class TestNMWebServicesContainers extends JerseyTestBase {
|
|||
conf.set(YarnConfiguration.NM_LOCAL_DIRS, testRootDir.getAbsolutePath());
|
||||
conf.set(YarnConfiguration.NM_LOG_DIRS, testLogDir.getAbsolutePath());
|
||||
LocalDirsHandlerService dirsHandler = new LocalDirsHandlerService();
|
||||
NodeHealthCheckerService healthChecker = new NodeHealthCheckerService(
|
||||
NodeManager.getNodeHealthScriptRunner(conf), dirsHandler);
|
||||
NodeHealthCheckerService healthChecker =
|
||||
new NodeHealthCheckerService(dirsHandler);
|
||||
healthChecker.init(conf);
|
||||
dirsHandler = healthChecker.getDiskHandler();
|
||||
aclsManager = new ApplicationACLsManager(conf);
|
||||
|
|
|
@ -26,13 +26,12 @@ import javax.ws.rs.core.MediaType;
|
|||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.http.JettyUtils;
|
||||
import org.apache.hadoop.util.NodeHealthScriptRunner;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.Context;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.NodeHealthCheckerService;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.ResourceView;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.health.NodeHealthCheckerService;
|
||||
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
|
@ -54,12 +53,9 @@ public class TestNMWebTerminal {
|
|||
private WebServer server;
|
||||
private int port;
|
||||
|
||||
private NodeHealthCheckerService createNodeHealthCheckerService(
|
||||
Configuration conf) {
|
||||
NodeHealthScriptRunner scriptRunner = NodeManager
|
||||
.getNodeHealthScriptRunner(conf);
|
||||
private NodeHealthCheckerService createNodeHealthCheckerService() {
|
||||
LocalDirsHandlerService dirsHandler = new LocalDirsHandlerService();
|
||||
return new NodeHealthCheckerService(scriptRunner, dirsHandler);
|
||||
return new NodeHealthCheckerService(dirsHandler);
|
||||
}
|
||||
|
||||
private int startNMWebAppServer(String webAddr) {
|
||||
|
@ -90,7 +86,7 @@ public class TestNMWebTerminal {
|
|||
};
|
||||
conf.set(YarnConfiguration.NM_LOCAL_DIRS, TESTROOTDIR.getAbsolutePath());
|
||||
conf.set(YarnConfiguration.NM_LOG_DIRS, TESTLOGDIR.getAbsolutePath());
|
||||
healthChecker = createNodeHealthCheckerService(conf);
|
||||
healthChecker = createNodeHealthCheckerService();
|
||||
healthChecker.init(conf);
|
||||
LocalDirsHandlerService dirsHandler = healthChecker.getDiskHandler();
|
||||
conf.set(YarnConfiguration.NM_WEBAPP_ADDRESS, webAddr);
|
||||
|
|
|
@ -70,7 +70,6 @@ import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
|
|||
import org.apache.hadoop.yarn.server.nodemanager.Context;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.NodeHealthCheckerService;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdaterImpl;
|
||||
|
@ -80,8 +79,7 @@ import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.RequestInterceptor;
|
|||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl;
|
||||
|
||||
|
||||
import org.apache.hadoop.yarn.server.nodemanager.health.NodeHealthCheckerService;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.ResourceTrackerService;
|
||||
|
|
|
@ -42,9 +42,9 @@ The following configuration parameters can be used to modify the disk checks:
|
|||
| `yarn.nodemanager.disk-health-checker.max-disk-utilization-per-disk-percentage` | Float between 0-100 | The maximum percentage of disk space that may be utilized before a disk is marked as unhealthy by the disk checker service. This check is run for every disk used by the NodeManager. The default value is 90 i.e. 90% of the disk can be used. |
|
||||
| `yarn.nodemanager.disk-health-checker.min-free-space-per-disk-mb` | Integer | The minimum amount of free space that must be available on the disk for the disk checker service to mark the disk as healthy. This check is run for every disk used by the NodeManager. The default value is 0 i.e. the entire disk can be used. |
|
||||
|
||||
###External Health Script
|
||||
### External Health Script
|
||||
|
||||
Users may specify their own health checker script that will be invoked by the health checker service. Users may specify a timeout as well as options to be passed to the script. If the script times out, results in an exception being thrown or outputs a line which begins with the string ERROR, the node is marked as unhealthy. Please note that:
|
||||
Users may specify their own health checker scripts that will be invoked by the health checker service. Users may specify a timeout as well as options to be passed to the script. If the script times out, results in an exception being thrown or outputs a line which begins with the string ERROR, the node is marked as unhealthy. Please note that:
|
||||
|
||||
* Exit code other than 0 is **not** considered to be a failure because it might have been caused by a syntax error. Therefore the node will **not** be marked as unhealthy.
|
||||
|
||||
|
@ -52,15 +52,24 @@ Users may specify their own health checker script that will be invoked by the he
|
|||
|
||||
* Specifying a health check script is not mandatory. If no script is specified, only the disk checker status will be used to determine the health of the node.
|
||||
|
||||
The following configuration parameters can be used to set the health script:
|
||||
Users can specify up to 4 scripts to run individually with the `yarn.nodemanager.health-checker.scripts` configuration. The following options apply to all scripts (global configurations):
|
||||
|
||||
| Configuration Name | Allowed Values | Description |
|
||||
|:---- |:---- |:---- |
|
||||
| `yarn.nodemanager.health-checker.interval-ms` | Postive integer | The interval, in milliseconds, at which health checker service runs; the default value is 10 minutes. |
|
||||
| `yarn.nodemanager.health-checker.script.timeout-ms` | Postive integer | The timeout for the health script that's executed; the default value is 20 minutes. |
|
||||
| `yarn.nodemanager.health-checker.script.path` | String | Absolute path to the health check script to be run. |
|
||||
| `yarn.nodemanager.health-checker.script.opts` | String | Arguments to be passed to the script when the script is executed. |
|
||||
| `yarn.nodemanager.health-checker.scripts` | String | The keywords for the health checker scripts separated by a comma. The default is "script". |
|
||||
| `yarn.nodemanager.health-checker.interval-ms` | Positive integer | The interval, in milliseconds, at which health checker service runs; the default value is 10 minutes. |
|
||||
| `yarn.nodemanager.health-checker.timeout-ms` | Positive integer | The timeout for the health script that's executed; the default value is 20 minutes. |
|
||||
|
||||
The following options can be set for every health checker script. The %s symbol is substituted with each keyword provided in `yarn.nodemanager.health-checker.scripts`.
|
||||
|
||||
| Configuration Name | Allowed Values | Description |
|
||||
|:---- |:---- |:---- |
|
||||
| `yarn.nodemanager.health-checker.%s.path` | String | Absolute path to the health check script to be run. Mandatory argument for each script. |
|
||||
| `yarn.nodemanager.health-checker.%s.opts` | String | Arguments to be passed to the script when the script is executed. Mandatory argument for each script. |
|
||||
| `yarn.nodemanager.health-checker.%s.interval-ms` | Positive integer | The interval, in milliseconds, at which health checker service runs. |
|
||||
| `yarn.nodemanager.health-checker.%s.timeout-ms` | Positive integer | The timeout for the health script that's executed. |
|
||||
|
||||
The interval and timeout options are optional. If they are not specified for a script, the corresponding global configuration values are used.
|
||||
|
||||
NodeManager Restart
|
||||
-------------------
|
||||
|
|
Loading…
Reference in New Issue