YARN-6716. Native services support for specifying component start order. Contributed by Billie Rinaldi

This commit is contained in:
Jian He 2017-06-26 17:00:12 -07:00
parent b7e5d739ba
commit e86d828548
29 changed files with 628 additions and 1347 deletions

View File

@ -297,18 +297,21 @@ definitions:
ReadinessCheck:
description: A custom command or a pluggable helper container to determine the readiness of a container of a component. Readiness for every application is different. Hence the need for a simple interface, with scope to support advanced usecases.
required:
- uri
- type
properties:
type:
type: string
description: E.g. HTTP (YARN will perform a simple REST call at a regular interval and expect a 204 No content).
enum:
- HTTP
uri:
type: string
description: Fully qualified REST uri endpoint.
- PORT
props:
type: object
description: A blob of key value pairs that will be used to configure the check.
additionalProperties:
type: string
artifact:
description: Artifact of the pluggable readiness check helper container (optional). If specified, this helper container typically hosts the http uri and encapsulates the complex scripts required to perform actual container readiness check. At the end it is expected to respond a 204 No content just like the simplified use case. This pluggable framework benefits application owners who can run applications without any packaging modifications. Note, artifacts of type docker only is supported for now.
description: Artifact of the pluggable readiness check helper container (optional). If specified, this helper container typically hosts the http uri and encapsulates the complex scripts required to perform actual container readiness check. At the end it is expected to respond a 204 No content just like the simplified use case. This pluggable framework benefits application owners who can run applications without any packaging modifications. Note, artifacts of type docker only is supported for now. NOT IMPLEMENTED YET
$ref: '#/definitions/Artifact'
Configuration:
description: Set of configuration properties that can be injected into the application components via envs, files and custom pluggable helper docker containers. Files of several standard formats like xml, properties, json, yaml and templates will be supported.

View File

@ -196,4 +196,15 @@ public interface InternalKeys {
* default value: {@value}
*/
int DEFAULT_ESCALATION_CHECK_INTERVAL = 30;
/**
* interval between readiness checks: {@value}
*/
String MONITOR_INTERVAL = "monitor.interval.seconds";
/**
* default value: {@value}
*/
int DEFAULT_MONITOR_INTERVAL = 30;
}

View File

@ -19,13 +19,15 @@
package org.apache.slider.api;
/**
* Enumeration of state values
* Enumeration of state values.
*/
public class StateValues {
private StateValues() {}
/**
* Specification is incomplete & cannot
* be used: {@value}
* be used: {@value}.
*/
public static final int STATE_INCOMPLETE = 0;
@ -42,12 +44,20 @@ public class StateValues {
*/
public static final int STATE_LIVE = 3;
/**
* Stopped
* Not ready.
*/
public static final int STATE_STOPPED = 4;
public static final int STATE_NOT_READY = 4;
/**
* destroyed
* Ready.
*/
public static final int STATE_DESTROYED = 5;
public static final int STATE_READY = 5;
/**
* Stopped.
*/
public static final int STATE_STOPPED = 99;
/**
* Destroyed.
*/
public static final int STATE_DESTROYED = 100;
}

View File

@ -21,6 +21,8 @@ import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import com.fasterxml.jackson.annotation.JsonProperty;
@ -39,7 +41,8 @@ public class ReadinessCheck implements Serializable {
private static final long serialVersionUID = -3836839816887186801L;
public enum TypeEnum {
HTTP("HTTP");
HTTP("HTTP"),
PORT("PORT");
private String value;
@ -55,7 +58,7 @@ public class ReadinessCheck implements Serializable {
}
private TypeEnum type = null;
private String uri = null;
private Map<String, String> props = new HashMap<String, String>();
private Artifact artifact = null;
/**
@ -77,22 +80,27 @@ public class ReadinessCheck implements Serializable {
this.type = type;
}
/**
* Fully qualified REST uri endpoint.
**/
public ReadinessCheck uri(String uri) {
this.uri = uri;
public ReadinessCheck props(Map<String, String> props) {
this.props = props;
return this;
}
@ApiModelProperty(example = "null", required = true, value = "Fully qualified REST uri endpoint.")
@JsonProperty("uri")
public String getUri() {
return uri;
public ReadinessCheck putPropsItem(String key, String propsItem) {
this.props.put(key, propsItem);
return this;
}
public void setUri(String uri) {
this.uri = uri;
/**
* A blob of key value pairs that will be used to configure the check.
* @return props
**/
@ApiModelProperty(example = "null", value = "A blob of key value pairs that will be used to configure the check.")
public Map<String, String> getProps() {
return props;
}
public void setProps(Map<String, String> props) {
this.props = props;
}
/**
@ -128,23 +136,24 @@ public class ReadinessCheck implements Serializable {
return false;
}
ReadinessCheck readinessCheck = (ReadinessCheck) o;
return Objects.equals(this.type, readinessCheck.type)
&& Objects.equals(this.uri, readinessCheck.uri)
&& Objects.equals(this.artifact, readinessCheck.artifact);
return Objects.equals(this.type, readinessCheck.type) &&
Objects.equals(this.props, readinessCheck.props) &&
Objects.equals(this.artifact, readinessCheck.artifact);
}
@Override
public int hashCode() {
return Objects.hash(type, uri, artifact);
return Objects.hash(type, props, artifact);
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("class ReadinessCheck {\n");
sb.append(" type: ").append(toIndentedString(type)).append("\n");
sb.append(" uri: ").append(toIndentedString(uri)).append("\n");
sb.append(" props: ").append(toIndentedString(props)).append("\n");
sb.append(" artifact: ").append(toIndentedString(artifact)).append("\n");
sb.append("}");
return sb.toString();

View File

@ -21,6 +21,8 @@ package org.apache.slider.providers;
import org.apache.slider.api.ResourceKeys;
import org.apache.slider.api.resource.Component;
import org.apache.slider.server.appmaster.state.RoleInstance;
import org.apache.slider.server.servicemonitor.MonitorUtils;
import org.apache.slider.server.servicemonitor.Probe;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
@ -34,7 +36,6 @@ import java.util.concurrent.atomic.AtomicLong;
*/
public final class ProviderRole {
public final String name;
public final String group;
public final int id;
public int placementPolicy;
public int nodeFailureThreshold;
@ -43,6 +44,8 @@ public final class ProviderRole {
public final Component component;
public AtomicLong componentIdCounter = null;
public Queue<RoleInstance> failedInstances = new ConcurrentLinkedQueue<>();
public Probe probe;
public ProviderRole(String name, int id) {
this(name,
id,
@ -69,7 +72,6 @@ public final class ProviderRole {
long placementTimeoutSeconds,
String labelExpression) {
this(name,
name,
id,
policy,
nodeFailureThreshold,
@ -81,7 +83,6 @@ public final class ProviderRole {
/**
* Create a provider role with a role group
* @param name role/component name
* @param group role/component group
* @param id ID. This becomes the YARN priority
* @param policy placement policy
* @param nodeFailureThreshold threshold for node failures (within a reset interval)
@ -89,15 +90,10 @@ public final class ProviderRole {
* @param placementTimeoutSeconds for lax placement, timeout in seconds before
* @param labelExpression label expression for requests; may be null
*/
public ProviderRole(String name, String group, int id, int policy,
public ProviderRole(String name, int id, int policy,
int nodeFailureThreshold, long placementTimeoutSeconds,
String labelExpression, Component component) {
this.name = name;
if (group == null) {
this.group = name;
} else {
this.group = group;
}
this.id = id;
this.placementPolicy = policy;
this.nodeFailureThreshold = nodeFailureThreshold;
@ -107,6 +103,7 @@ public final class ProviderRole {
if(component.getUniqueComponentSupport()) {
componentIdCounter = new AtomicLong(0);
}
this.probe = MonitorUtils.getProbe(component.getReadinessCheck());
}
@ -132,7 +129,6 @@ public final class ProviderRole {
public String toString() {
final StringBuilder sb = new StringBuilder("ProviderRole{");
sb.append("name='").append(name).append('\'');
sb.append(", group=").append(group);
sb.append(", id=").append(id);
sb.append(", placementPolicy=").append(placementPolicy);
sb.append(", nodeFailureThreshold=").append(nodeFailureThreshold);

View File

@ -254,17 +254,6 @@ public class ProviderUtils implements RoleKeys, SliderKeys {
}
}
public static void addEnvForSubstitution(Map<String, String> env,
Map<String, String> tokensForSubstitution) {
if (env == null || env.isEmpty() || tokensForSubstitution == null
|| tokensForSubstitution.isEmpty()) {
return;
}
for (Map.Entry<String, String> entry : env.entrySet()) {
tokensForSubstitution.put($(entry.getKey()), entry.getValue());
}
}
// 1. Create all config files for a component on hdfs for localization
// 2. Add the config file to localResource
public synchronized void createConfigFileAndAddLocalResource(
@ -284,10 +273,6 @@ public class ProviderUtils implements RoleKeys, SliderKeys {
log.info("Component instance conf dir already exists: " + compInstanceDir);
}
// add Configuration#env into tokens substitution
addEnvForSubstitution(component.getConfiguration().getEnv(),
tokensForSubstitution);
log.info("Tokens substitution for component: " + roleInstance
.getCompInstanceName() + System.lineSeparator()
+ tokensForSubstitution);

View File

@ -133,7 +133,6 @@ public class RoleLaunchService
return "RoleLauncher{" +
"container=" + container.getId() +
", containerRole='" + role.name + '\'' +
", containerGroup='" + role.group + '\'' +
'}';
}

View File

@ -117,6 +117,7 @@ import org.apache.slider.server.appmaster.actions.ActionStopSlider;
import org.apache.slider.server.appmaster.actions.ActionUpgradeContainers;
import org.apache.slider.server.appmaster.actions.AsyncAction;
import org.apache.slider.server.appmaster.actions.EscalateOutstandingRequests;
import org.apache.slider.server.appmaster.actions.MonitorComponentInstances;
import org.apache.slider.server.appmaster.actions.QueueExecutor;
import org.apache.slider.server.appmaster.actions.QueueService;
import org.apache.slider.server.appmaster.actions.RegisterComponentInstance;
@ -340,7 +341,7 @@ public class SliderAppMaster extends AbstractSliderLaunchedService
* ProviderService of this cluster
*/
@SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
private List<ProviderService> providers = new ArrayList<>();
private Set<ProviderService> providers = new HashSet<>();
/**
* The YARN registry service
@ -868,6 +869,7 @@ public class SliderAppMaster extends AbstractSliderLaunchedService
scheduleFailureWindowResets(application.getConfiguration());
scheduleEscalation(application.getConfiguration());
scheduleMonitoring(application.getConfiguration());
try {
// schedule YARN Registry registration
@ -1644,9 +1646,22 @@ public class SliderAppMaster extends AbstractSliderLaunchedService
new RenewingAction<>(escalate, seconds, seconds, TimeUnit.SECONDS, 0);
actionQueues.renewing("escalation", renew);
}
/**
* Look at where the current node state is -and whether it should be changed
* Schedule monitor action
*/
private void scheduleMonitoring(
org.apache.slider.api.resource.Configuration conf) {
MonitorComponentInstances monitor = new MonitorComponentInstances();
long seconds = conf.getPropertyLong(InternalKeys.MONITOR_INTERVAL,
InternalKeys.DEFAULT_MONITOR_INTERVAL);
RenewingAction<MonitorComponentInstances> renew =
new RenewingAction<>(monitor, seconds, seconds, TimeUnit.SECONDS, 0);
actionQueues.renewing("monitoring", renew);
}
/**
* Look at where the current node state is and whether it should be changed.
* @param reason reason for operation
*/
private synchronized void reviewRequestAndReleaseNodes(String reason) {
@ -1711,6 +1726,15 @@ public class SliderAppMaster extends AbstractSliderLaunchedService
execute(operations);
}
public void monitorComponentInstances() {
// TODO use health checks?
// TODO publish timeline events for monitoring changes?
if (appState.monitorComponentInstances()) {
// monitoring change
reviewRequestAndReleaseNodes("monitoring change");
}
}
/**
* Shutdown operation: release all containers

View File

@ -6,7 +6,8 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@ -14,16 +15,23 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.slider.server.appmaster.actions;
package org.apache.slider.server.servicemonitor;
import org.apache.slider.server.appmaster.SliderAppMaster;
import org.apache.slider.server.appmaster.state.AppState;
/**
* This exception is raised when the probe loop detects that it has been requested to stop
*
* Execute readiness checks on component instances.
*/
public class ProbeInterruptedException extends Exception {
public class MonitorComponentInstances extends AsyncAction {
public ProbeInterruptedException() {
super("Probe Interrupted");
public MonitorComponentInstances() {
super("MonitorComponentInstance");
}
@Override
public void execute(SliderAppMaster appMaster, QueueAccess queueService,
AppState appState) throws Exception {
appMaster.monitorComponentInstances();
}
}

View File

@ -73,6 +73,7 @@ import org.apache.slider.server.appmaster.operations.ContainerReleaseOperation;
import org.apache.slider.server.appmaster.operations.ContainerRequestOperation;
import org.apache.slider.server.appmaster.operations.UpdateBlacklistOperation;
import org.apache.slider.server.appmaster.timelineservice.ServiceTimelinePublisher;
import org.apache.slider.util.ServiceApiUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -344,16 +345,18 @@ public class AppState {
DEFAULT_NODE_FAILURE_THRESHOLD);
initGlobalTokensForSubstitute(binding);
//build the initial component list
// build the initial component list
Collection<Component> sortedComponents = ServiceApiUtil
.sortByDependencies(app.getComponents());
int priority = 1;
for (Component component : app.getComponents()) {
for (Component component : sortedComponents) {
priority = getNewPriority(priority);
String name = component.getName();
if (roles.containsKey(name)) {
continue;
}
log.info("Adding component: " + name);
createComponent(name, name, component, priority++);
createComponent(name, component, priority++);
}
//then pick up the requirements
@ -433,8 +436,8 @@ public class AppState {
});
}
public ProviderRole createComponent(String name, String group,
Component component, int priority) throws BadConfigException {
public ProviderRole createComponent(String name, Component component,
int priority) throws BadConfigException {
org.apache.slider.api.resource.Configuration conf =
component.getConfiguration();
long placementTimeout = conf.getPropertyLong(PLACEMENT_ESCALATE_DELAY,
@ -446,7 +449,7 @@ public class AppState {
String label = conf.getProperty(YARN_LABEL_EXPRESSION,
DEF_YARN_LABEL_EXPRESSION);
ProviderRole newRole =
new ProviderRole(name, group, priority, (int)placementPolicy, threshold,
new ProviderRole(name, priority, (int)placementPolicy, threshold,
placementTimeout, label, component);
buildRole(newRole, component);
log.info("Created a new role " + newRole);
@ -1535,7 +1538,8 @@ public class AppState {
allOperations.add(blacklistOperation);
}
for (RoleStatus roleStatus : getRoleStatusMap().values()) {
if (!roleStatus.isExcludeFromFlexing()) {
if (!roleStatus.isExcludeFromFlexing() &&
areDependenciesReady(roleStatus)) {
List<AbstractRMOperation> operations = reviewOneRole(roleStatus);
allOperations.addAll(operations);
}
@ -1543,6 +1547,47 @@ public class AppState {
return allOperations;
}
@VisibleForTesting
public boolean areDependenciesReady(RoleStatus roleStatus) {
List<String> dependencies = roleStatus.getProviderRole().component
.getDependencies();
if (SliderUtils.isEmpty(dependencies)) {
return true;
}
for (String dependency : dependencies) {
ProviderRole providerRole = roles.get(dependency);
if (providerRole == null) {
log.error("Couldn't find dependency {} for {} (should never happen)",
dependency, roleStatus.getName());
continue;
}
RoleStatus other = getRoleStatusMap().get(providerRole.id);
if (other.getRunning() < other.getDesired()) {
log.info("Dependency {} not satisfied for {}, only {} of {} instances" +
" running", dependency, roleStatus.getName(), other.getRunning(),
other.getDesired());
return false;
}
if (providerRole.probe == null) {
continue;
}
List<RoleInstance> dependencyInstances = enumLiveNodesInRole(
providerRole.name);
if (dependencyInstances.size() < other.getDesired()) {
log.info("Dependency {} not satisfied for {}, only {} of {} instances" +
" live", dependency, roleStatus.getName(),
dependencyInstances.size(), other.getDesired());
return false;
}
for (RoleInstance instance : dependencyInstances) {
if (instance.state != STATE_READY) {
return false;
}
}
}
return true;
}
/**
* Check the "recent" failure threshold for a role
* @param role role to examine
@ -1620,6 +1665,31 @@ public class AppState {
return operations;
}
public synchronized boolean monitorComponentInstances() {
boolean hasChanged = false;
for (RoleInstance instance : getLiveContainers().values()) {
if (instance.providerRole.probe == null) {
continue;
}
boolean ready = instance.providerRole.probe.ping(instance).isSuccess();
if (ready) {
if (instance.state != STATE_READY) {
instance.state = STATE_READY;
hasChanged = true;
log.info("State of {} changed to ready", instance.role);
}
} else {
if (instance.state == STATE_READY) {
instance.state = STATE_NOT_READY;
hasChanged = true;
log.info("State of {} changed from ready to not ready", instance
.role);
}
}
}
return hasChanged;
}
/**
* Look at the allocation status of one role, and trigger add/release
* actions if the number of desired role instances doesn't equal

View File

@ -45,7 +45,6 @@ import java.util.Map;
public final class RoleStatus implements MetricSet {
private final String name;
private final String group;
/**
* Role priority
@ -66,7 +65,6 @@ public final class RoleStatus implements MetricSet {
public RoleStatus(ProviderRole providerRole) {
this.providerRole = providerRole;
this.name = providerRole.name;
this.group = providerRole.group;
this.key = providerRole.id;
componentMetrics =
SliderMetrics.register(this.name, "Metrics for component " + this.name);
@ -95,10 +93,6 @@ public final class RoleStatus implements MetricSet {
return name;
}
public String getGroup() {
return group;
}
public int getKey() {
return key;
}

View File

@ -18,30 +18,50 @@
package org.apache.slider.server.servicemonitor;
import org.apache.hadoop.conf.Configuration;
import org.apache.slider.server.appmaster.state.RoleInstance;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.Map;
public class HttpProbe extends Probe {
protected static final Logger log = LoggerFactory.getLogger(HttpProbe.class);
private final URL url;
private static final String HOST_TOKEN = "${THIS_HOST}";
private final String urlString;
private final int timeout;
private final int min, max;
public HttpProbe(URL url, int timeout, int min, int max, Configuration conf) throws IOException {
public HttpProbe(String url, int timeout, int min, int max, Configuration
conf) {
super("Http probe of " + url + " [" + min + "-" + max + "]", conf);
this.url = url;
this.urlString = url;
this.timeout = timeout;
this.min = min;
this.max = max;
}
public static HttpURLConnection getConnection(URL url, int timeout) throws IOException {
public static HttpProbe create(Map<String, String> props)
throws IOException {
String urlString = getProperty(props, WEB_PROBE_URL, null);
new URL(urlString);
int timeout = getPropertyInt(props, WEB_PROBE_CONNECT_TIMEOUT,
WEB_PROBE_CONNECT_TIMEOUT_DEFAULT);
int minSuccess = getPropertyInt(props, WEB_PROBE_MIN_SUCCESS,
WEB_PROBE_MIN_SUCCESS_DEFAULT);
int maxSuccess = getPropertyInt(props, WEB_PROBE_MAX_SUCCESS,
WEB_PROBE_MAX_SUCCESS_DEFAULT);
return new HttpProbe(urlString, timeout, minSuccess, maxSuccess, null);
}
private static HttpURLConnection getConnection(URL url, int timeout) throws
IOException {
HttpURLConnection connection = (HttpURLConnection) url.openConnection();
connection.setInstanceFollowRedirects(true);
connection.setConnectTimeout(timeout);
@ -49,13 +69,17 @@ public class HttpProbe extends Probe {
}
@Override
public ProbeStatus ping(boolean livePing) {
public ProbeStatus ping(RoleInstance roleInstance) {
ProbeStatus status = new ProbeStatus();
String ip = roleInstance.ip;
if (ip == null) {
status.fail(this, new IOException("IP is not available yet"));
return status;
}
HttpURLConnection connection = null;
try {
if (log.isDebugEnabled()) {
// LOG.debug("Fetching " + url + " with timeout " + timeout);
}
URL url = new URL(urlString.replace(HOST_TOKEN, ip));
connection = getConnection(url, this.timeout);
int rc = connection.getResponseCode();
if (rc < min || rc > max) {
@ -66,8 +90,8 @@ public class HttpProbe extends Probe {
} else {
status.succeed(this);
}
} catch (IOException e) {
String error = "Probe " + url + " failed: " + e;
} catch (Throwable e) {
String error = "Probe " + urlString + " failed for IP " + ip + ": " + e;
log.info(error, e);
status.fail(this,
new IOException(error, e));

View File

@ -23,257 +23,44 @@ package org.apache.slider.server.servicemonitor;
public interface MonitorKeys {
/**
* Prefix of all other configuration options: {@value}
* Port probing key : port to attempt to create a TCP connection to {@value}.
*/
String MONITOR_KEY_PREFIX = "service.monitor.";
String PORT_PROBE_PORT = "port";
/**
* Classname of the reporter Key: {@value}
* Port probing key : timeout for the the connection attempt {@value}.
*/
String MONITOR_REPORTER =
MONITOR_KEY_PREFIX + "report.classname";
String PORT_PROBE_CONNECT_TIMEOUT = "timeout";
/**
* Interval in milliseconds between reporting health status to the reporter
* Key: {@value}
* Port probing default : timeout for the connection attempt {@value}.
*/
String MONITOR_REPORT_INTERVAL =
MONITOR_KEY_PREFIX + "report.interval";
/**
* Time in millis between the last probing cycle ending and the new one
* beginning. Key: {@value}
*/
String MONITOR_PROBE_INTERVAL =
MONITOR_KEY_PREFIX + "probe.interval";
/**
* How long in milliseconds does the probing loop have to be blocked before
* that is considered a liveness failure Key: {@value}
*/
String MONITOR_PROBE_TIMEOUT =
MONITOR_KEY_PREFIX + "probe.timeout";
/**
* How long in milliseconds does the probing loop have to be blocked before
* that is considered a liveness failure Key: {@value}
*/
String MONITOR_BOOTSTRAP_TIMEOUT =
MONITOR_KEY_PREFIX + "bootstrap.timeout";
/**
* does the monitor depend on DFS being live
*/
String MONITOR_DEPENDENCY_DFSLIVE =
MONITOR_KEY_PREFIX + "dependency.dfslive";
/**
* default timeout for the entire bootstrap phase {@value}
*/
int BOOTSTRAP_TIMEOUT_DEFAULT = 60000;
/**
* Default value if the key is not in the config file: {@value}
*/
int REPORT_INTERVAL_DEFAULT = 10000;
/**
* Default value if the key is not in the config file: {@value}
*/
int PROBE_INTERVAL_DEFAULT = 10000;
/**
* Default value if the key is not in the config file: {@value}
*/
int PROBE_TIMEOUT_DEFAULT = 60000;
/**
* Port probe enabled/disabled flag Key: {@value}
*/
String PORT_PROBE_ENABLED =
MONITOR_KEY_PREFIX + "portprobe.enabled";
/**
* Port probing key : port to attempt to create a TCP connection to {@value}
*/
String PORT_PROBE_PORT =
MONITOR_KEY_PREFIX + "portprobe.port";
/**
* Port probing key : port to attempt to create a TCP connection to {@value}
*/
String PORT_PROBE_HOST =
MONITOR_KEY_PREFIX + "portprobe.host";
/**
* Port probing key : timeout of the connection attempt {@value}
*/
String PORT_PROBE_CONNECT_TIMEOUT =
MONITOR_KEY_PREFIX + "portprobe.connect.timeout";
/**
* Port probing key : bootstrap timeout -how long in milliseconds should the
* port probing take to connect before the failure to connect is considered a
* liveness failure. That is: how long should the IPC port take to come up?
* {@value}
*/
String PORT_PROBE_BOOTSTRAP_TIMEOUT =
MONITOR_KEY_PREFIX + "portprobe.bootstrap.timeout";
/**
* default timeout for port probes {@value}
*/
int PORT_PROBE_BOOTSTRAP_TIMEOUT_DEFAULT = 60000;
/**
* default value for port probe connection attempts {@value}
*/
int PORT_PROBE_CONNECT_TIMEOUT_DEFAULT = 1000;
/**
* default port for probes {@value}
* Web probing key : URL {@value}.
*/
int DEFAULT_PROBE_PORT = 8020;
String WEB_PROBE_URL = "url";
/**
* default host for probes {@value}
* Web probing key : min success code {@value}.
*/
String DEFAULT_PROBE_HOST = "localhost";
String WEB_PROBE_MIN_SUCCESS = "min.success";
/**
* Probe enabled/disabled flag Key: {@value}
* Web probing key : max success code {@value}.
*/
String LS_PROBE_ENABLED =
MONITOR_KEY_PREFIX + "lsprobe.enabled";
String WEB_PROBE_MAX_SUCCESS = "max.success";
/**
* Probe path for LS operation Key: {@value}
* Web probing default : min successful response code {@value}.
*/
String LS_PROBE_PATH =
MONITOR_KEY_PREFIX + "lsprobe.path";
int WEB_PROBE_MIN_SUCCESS_DEFAULT = 200;
/**
* Default path for LS operation Key: {@value}
* Web probing default : max successful response code {@value}.
*/
String LS_PROBE_DEFAULT = "/";
int WEB_PROBE_MAX_SUCCESS_DEFAULT = 299;
/**
* Port probing key : bootstrap timeout -how long in milliseconds should the
* port probing take to connect before the failure to connect is considered a
* liveness failure. That is: how long should the IPC port take to come up?
* {@value}
* Web probing key : timeout for the connection attempt {@value}
*/
String LS_PROBE_BOOTSTRAP_TIMEOUT =
MONITOR_KEY_PREFIX + "lsprobe.bootstrap.timeout";
String WEB_PROBE_CONNECT_TIMEOUT = "timeout";
/**
* default timeout for port probes {@value}
* Port probing default : timeout for the connection attempt {@value}.
*/
int LS_PROBE_BOOTSTRAP_TIMEOUT_DEFAULT = PORT_PROBE_BOOTSTRAP_TIMEOUT_DEFAULT;
/**
* Probe enabled/disabled flag Key: {@value}
*/
String WEB_PROBE_ENABLED =
MONITOR_KEY_PREFIX + "webprobe.enabled";
/**
* Probe URL Key: {@value}
*/
String WEB_PROBE_URL =
MONITOR_KEY_PREFIX + "webprobe.url";
/**
* Default path for web probe Key: {@value}
*/
String WEB_PROBE_DEFAULT_URL = "http://localhost:50070/";
/**
* min error code Key: {@value}
*/
String WEB_PROBE_MIN =
MONITOR_KEY_PREFIX + "webprobe.min";
/**
* min error code Key: {@value}
*/
String WEB_PROBE_MAX =
MONITOR_KEY_PREFIX + "webprobe.max";
/**
* Port probing key : timeout of the connection attempt {@value}
*/
String WEB_PROBE_CONNECT_TIMEOUT =
MONITOR_KEY_PREFIX + "webprobe.connect.timeout";
/**
* Default HTTP response code expected from the far end for
* the endpoint to be considered live.
*/
int WEB_PROBE_DEFAULT_CODE = 200;
/**
* Port probing key : bootstrap timeout -how long in milliseconds should the
* port probing take to connect before the failure to connect is considered a
* liveness failure. That is: how long should the IPC port take to come up?
* {@value}
*/
String WEB_PROBE_BOOTSTRAP_TIMEOUT =
MONITOR_KEY_PREFIX + "webprobe.bootstrap.timeout";
/**
* default timeout for port probes {@value}
*/
int WEB_PROBE_BOOTSTRAP_TIMEOUT_DEFAULT = PORT_PROBE_BOOTSTRAP_TIMEOUT_DEFAULT;
/**
* Probe enabled/disabled flag Key: {@value}
*/
String JT_PROBE_ENABLED =
MONITOR_KEY_PREFIX + "jtprobe.enabled";
/**
* Port probing key : bootstrap timeout -how long in milliseconds should the
* port probing take to connect before the failure to connect is considered a
* liveness failure. That is: how long should the IPC port take to come up?
* {@value}
*/
String JT_PROBE_BOOTSTRAP_TIMEOUT =
MONITOR_KEY_PREFIX + "jtprobe.bootstrap.timeout";
/**
* default timeout for port probes {@value}
*/
int JT_PROBE_BOOTSTRAP_TIMEOUT_DEFAULT = PORT_PROBE_BOOTSTRAP_TIMEOUT_DEFAULT;
/**
* Probe enabled/disabled flag Key: {@value}
*/
String PID_PROBE_ENABLED =
MONITOR_KEY_PREFIX + "pidprobe.enabled";
/**
* PID probing key : pid to attempt to create a TCP connection to {@value}
*/
String PID_PROBE_PIDFILE =
MONITOR_KEY_PREFIX + "pidprobe.pidfile";
int WEB_PROBE_CONNECT_TIMEOUT_DEFAULT = 1000;
}

View File

@ -17,25 +17,19 @@
package org.apache.slider.server.servicemonitor;
import org.apache.slider.api.resource.ReadinessCheck;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Formatter;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.TreeSet;
/**
* Various utils to work with the monitor
*/
public final class MonitorUtils {
protected static final Logger log = LoggerFactory.getLogger(MonitorUtils.class);
protected static final Logger LOG = LoggerFactory.getLogger(MonitorUtils
.class);
private MonitorUtils() {
}
@ -44,25 +38,6 @@ public final class MonitorUtils {
return val != 1 ? "s" : "";
}
/**
* Convert the arguments -including dropping any empty strings that creep in
* @param args arguments
* @return a list view with no empty strings
*/
public static List<String> prepareArgs(String[] args) {
List<String> argsList = new ArrayList<String>(args.length);
StringBuilder argsStr = new StringBuilder("Arguments: [");
for (String arg : args) {
argsStr.append('"').append(arg).append("\" ");
if (!arg.isEmpty()) {
argsList.add(arg);
}
}
argsStr.append(']');
log.debug(argsStr.toString());
return argsList;
}
/**
* Convert milliseconds to human time -the exact format is unspecified
* @param milliseconds a time in milliseconds
@ -85,25 +60,25 @@ public final class MonitorUtils {
return sb.toString();
}
public static InetSocketAddress getURIAddress(URI uri) {
String host = uri.getHost();
int port = uri.getPort();
return new InetSocketAddress(host, port);
}
/**
* Get the localhost -may be null
* @return the localhost if known
*/
public static InetAddress getLocalHost() {
InetAddress localHost;
try {
localHost = InetAddress.getLocalHost();
} catch (UnknownHostException e) {
localHost = null;
public static Probe getProbe(ReadinessCheck readinessCheck) {
if (readinessCheck == null) {
return null;
}
if (readinessCheck.getType() == null) {
return null;
}
try {
switch (readinessCheck.getType()) {
case HTTP:
return HttpProbe.create(readinessCheck.getProps());
case PORT:
return PortProbe.create(readinessCheck.getProps());
default:
return null;
}
} catch (Throwable t) {
throw new IllegalArgumentException("Error creating readiness check " +
t);
}
return localHost;
}
}

View File

@ -17,91 +17,77 @@
package org.apache.slider.server.servicemonitor;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IOUtils;
import org.apache.slider.server.appmaster.state.RoleInstance;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.Map;
/**
* Probe for a port being open
* Probe for a port being open.
*/
public class PortProbe extends Probe {
protected static final Logger log = LoggerFactory.getLogger(PortProbe.class);
private final String host;
private final int port;
private final int timeout;
public PortProbe(String host, int port, int timeout, String name, Configuration conf)
throws IOException {
super("Port probe " + name + " " + host + ":" + port + " for " + timeout + "ms",
conf);
this.host = host;
public PortProbe(int port, int timeout) {
super("Port probe of " + port + " for " + timeout + "ms", null);
this.port = port;
this.timeout = timeout;
}
public static PortProbe createPortProbe(Configuration conf,
String hostname,
int port) throws IOException {
PortProbe portProbe = new PortProbe(hostname,
port,
conf.getInt(
PORT_PROBE_CONNECT_TIMEOUT,
PORT_PROBE_CONNECT_TIMEOUT_DEFAULT),
"",
conf);
public static PortProbe create(Map<String, String> props)
throws IOException {
int port = getPropertyInt(props, PORT_PROBE_PORT, null);
return portProbe;
}
@Override
public void init() throws IOException {
if (port >= 65536) {
throw new IOException("Port is out of range: " + port);
throw new IOException(PORT_PROBE_PORT + " " + port + " is out of " +
"range");
}
InetAddress target;
if (host != null) {
log.debug("looking up host " + host);
target = InetAddress.getByName(host);
} else {
log.debug("Host is null, retrieving localhost address");
target = InetAddress.getLocalHost();
}
log.info("Checking " + target + ":" + port);
int timeout = getPropertyInt(props, PORT_PROBE_CONNECT_TIMEOUT,
PORT_PROBE_CONNECT_TIMEOUT_DEFAULT);
return new PortProbe(port, timeout);
}
/**
* Try to connect to the (host,port); a failure to connect within
* the specified timeout is a failure
* @param livePing is the ping live: true for live; false for boot time
* the specified timeout is a failure.
* @param roleInstance role instance
* @return the outcome
*/
@Override
public ProbeStatus ping(boolean livePing) {
public ProbeStatus ping(RoleInstance roleInstance) {
ProbeStatus status = new ProbeStatus();
InetSocketAddress sockAddr = new InetSocketAddress(host, port);
String ip = roleInstance.ip;
if (ip == null) {
status.fail(this, new IOException("IP is not available yet"));
return status;
}
InetSocketAddress sockAddr = new InetSocketAddress(ip, port);
Socket socket = new Socket();
try {
if (log.isDebugEnabled()) {
log.debug("Connecting to " + sockAddr.toString() + " connection-timeout=" +
MonitorUtils.millisToHumanTime(timeout));
log.debug("Connecting to " + sockAddr.toString() + "timeout=" +
MonitorUtils.millisToHumanTime(timeout));
}
socket.connect(sockAddr, timeout);
status.succeed(this);
} catch (IOException e) {
} catch (Throwable e) {
String error = "Probe " + sockAddr + " failed: " + e;
log.debug(error, e);
status.fail(this,
new IOException(error, e));
status.fail(this, new IOException(error, e));
} finally {
IOUtils.closeSocket(socket);
}
return status;
}
}

View File

@ -17,9 +17,12 @@
package org.apache.slider.server.servicemonitor;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.slider.server.appmaster.state.RoleInstance;
import java.io.IOException;
import java.util.Map;
/**
* Base class of all probes.
@ -29,19 +32,6 @@ public abstract class Probe implements MonitorKeys {
protected final Configuration conf;
private String name;
// =======================================================
/*
* These fields are all used by the probe loops
* to maintain state. Please Leave them alone.
*/
public int successCount;
public int failureCount;
public long bootstrapStarted;
public long bootstrapFinished;
private boolean booted = false;
// =======================================================
/**
* Create a probe of a specific name
*
@ -65,11 +55,31 @@ public abstract class Probe implements MonitorKeys {
@Override
public String toString() {
return getName() +
" {" +
"successCount=" + successCount +
", failureCount=" + failureCount +
'}';
return getName();
}
public static String getProperty(Map<String, String> props, String name,
String defaultValue) throws IOException {
String value = props.get(name);
if (StringUtils.isEmpty(value)) {
if (defaultValue == null) {
throw new IOException(name + " not specified");
}
return defaultValue;
}
return value;
}
public static int getPropertyInt(Map<String, String> props, String name,
Integer defaultValue) throws IOException {
String value = props.get(name);
if (StringUtils.isEmpty(value)) {
if (defaultValue == null) {
throw new IOException(name + " not specified");
}
return defaultValue;
}
return Integer.parseInt(value);
}
/**
@ -83,25 +93,9 @@ public abstract class Probe implements MonitorKeys {
* Ping the endpoint. All exceptions must be caught and included in the
* (failure) status.
*
* @param livePing is the ping live: true for live; false for boot time
* @param roleInstance instance to ping
* @return the status
*/
public abstract ProbeStatus ping(boolean livePing);
public abstract ProbeStatus ping(RoleInstance roleInstance);
public void beginBootstrap() {
bootstrapStarted = System.currentTimeMillis();
}
public void endBootstrap() {
setBooted(true);
bootstrapFinished = System.currentTimeMillis();
}
public boolean isBooted() {
return booted;
}
public void setBooted(boolean booted) {
this.booted = booted;
}
}

View File

@ -1,32 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.slider.server.servicemonitor;
/**
* An exception to raise on a probe failure
*/
public class ProbeFailedException extends Exception {
public final ProbeStatus status;
public ProbeFailedException(String text, ProbeStatus status) {
super((text == null ? "Probe Failed" : (text + ": ")) + status, status.getThrown());
this.status = status;
}
}

View File

@ -1,56 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.slider.server.servicemonitor;
/**
* Probe phases. The names are for strings; the index is the order in which things happen;
* -any state can got to terminating directly.
*/
public enum ProbePhase {
INIT("Initializing", 0),
DEPENDENCY_CHECKING("Dependencies", 1),
BOOTSTRAPPING("Bootstrapping", 2),
LIVE("Live", 3),
TERMINATING("Terminating", 4);
private final String name;
private final int index;
ProbePhase(String name, int index) {
this.name = name;
this.index = index;
}
public String getName() {
return name;
}
public int getIndex() {
return index;
}
/**
* How many phases are there?
*/
public static final int PHASE_COUNT = TERMINATING.index + 1;
@Override
public String toString() {
return name;
}
}

View File

@ -1,79 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.slider.server.servicemonitor;
/**
* This interface is for use by the Poll Workers to send events to the reporters.
*
* It is up the reporters what to do with the specific events.
*/
public interface ProbeReportHandler {
/**
* The probe process has changed state.
* @param probePhase the new process phrase
*/
void probeProcessStateChange(ProbePhase probePhase);
/**
* Report a probe outcome
* @param phase the current phase of probing
* @param status the probe status
*/
void probeResult(ProbePhase phase, ProbeStatus status);
/**
* A probe has failed
*/
void probeFailure(ProbeFailedException exception);
/**
* A probe has just booted
* @param status probe status
*/
void probeBooted(ProbeStatus status);
boolean commence(String name, String description);
void unregister();
/**
* A heartbeat event should be raised
* @param status the probe status
*/
void heartbeat(ProbeStatus status);
/**
* A probe has timed out
* @param currentPhase the current execution phase
* @param probe the probe that timed out
* @param lastStatus the last status that was successfully received -which is implicitly
* not the status of the timed out probe
* @param currentTime the current time
*/
void probeTimedOut(ProbePhase currentPhase,
Probe probe,
ProbeStatus lastStatus,
long currentTime);
/**
* Event to say that the live probe cycle completed so the entire
* system can be considered functional.
*/
void liveProbeCycleCompleted();
}

View File

@ -34,7 +34,6 @@ public final class ProbeStatus implements Serializable {
private String message;
private Throwable thrown;
private transient Probe originator;
private ProbePhase probePhase;
public ProbeStatus() {
}
@ -99,14 +98,6 @@ public final class ProbeStatus implements Serializable {
this.thrown = thrown;
}
public ProbePhase getProbePhase() {
return probePhase;
}
public void setProbePhase(ProbePhase probePhase) {
this.probePhase = probePhase;
}
/**
* Get the probe that generated this result. May be null
* @return a possibly null reference to a probe
@ -147,7 +138,6 @@ public final class ProbeStatus implements Serializable {
public String toString() {
LogEntryBuilder builder = new LogEntryBuilder("Probe Status");
builder.elt("time", timestampText)
.elt("phase", probePhase)
.elt("outcome", (success ? "success" : "failure"));
if (success != realOutcome) {
@ -161,10 +151,6 @@ public final class ProbeStatus implements Serializable {
return builder.toString();
}
public boolean inPhase(ProbePhase phase) {
return getProbePhase().equals(phase);
}
/**
* Flip the success bit on while the real outcome bit is kept false
*/

View File

@ -1,446 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.slider.server.servicemonitor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
/**
* This is the entry point to do work. A list of probes is taken in, in order of
* booting. Once live they go to the live probes list.
*
* The dependency probes are a set of probes for dependent services, all of which
* must be live before boot probes commence.
*
* The boot probes are executed and are allowed to fail; failure is interpreted as "not yet live"
*
* Once all boot probes are live, the live list is used for probes; these must not fail.
*
* There is no timeout on dependency probe bootstrap time, because of the notion that
* restarting this service will have no effect on the dependencies.
*/
public class ProbeWorker implements Runnable {
protected static final Logger log = LoggerFactory.getLogger(ProbeWorker.class);
public static final String FAILED_TO_BOOT = "Monitored service failed to bootstrap after ";
public static final String FAILURE_OF_A_LIVE_PROBE_DURING_BOOTSTRAPPING = "Failure of a live probe during bootstrapping";
private final List<Probe> monitorProbes;
private final List<Probe> dependencyProbes;
public final int interval;
protected volatile ProbeStatus lastStatus;
protected volatile ProbeStatus lastFailingBootstrapProbe;
protected volatile Probe currentProbe;
private volatile boolean mustExit;
private final int bootstrapTimeout;
private long bootstrapEndtime;
private ProbeReportHandler reportHandler;
private volatile ProbePhase probePhase = ProbePhase.INIT;
/**
* Create a probe worker
* @param monitorProbes list of probes that must boot and then go live -after which
* they must stay live.
* @param dependencyProbes the list of dependency probes that must all succeed before
* any attempt to probe the direct probe list is performed. Once the
* dependency phase has completed, these probes are never checked again.
* @param interval probe interval in milliseconds.
* @param bootstrapTimeout timeout for bootstrap in milliseconds
*/
public ProbeWorker(List<Probe> monitorProbes, List<Probe> dependencyProbes, int interval, int bootstrapTimeout) {
this.monitorProbes = monitorProbes;
this.dependencyProbes = dependencyProbes != null ? dependencyProbes : new ArrayList<Probe>(0);
this.interval = interval;
lastStatus = new ProbeStatus(now(),
"Initial status");
lastStatus.setProbePhase(ProbePhase.INIT);
this.bootstrapTimeout = bootstrapTimeout;
}
public void init() throws IOException {
for (Probe probe : monitorProbes) {
probe.init();
}
for (Probe probe : dependencyProbes) {
probe.init();
}
}
public void setReportHandler(ProbeReportHandler reportHandler) {
this.reportHandler = reportHandler;
}
public void setMustExit() {
this.mustExit = true;
}
public ProbeStatus getLastStatus() {
return lastStatus;
}
public synchronized Probe getCurrentProbe() {
return currentProbe;
}
public ProbePhase getProbePhase() {
return probePhase;
}
/**
* Enter the new process state, and report it to the report handler.
* This is synchronized just to make sure there isn't more than one
* invocation at the same time.
* @param status the new process status
*/
private synchronized void enterProbePhase(ProbePhase status) {
this.probePhase = status;
if (reportHandler != null) {
reportHandler.probeProcessStateChange(status);
}
}
/**
* Report the probe status to the listener -setting the probe phase field
* before doing so.
* The value is also stored in the {@link #lastStatus} field
* @param status the new status
*/
private void reportProbeStatus(ProbeStatus status) {
ProbePhase phase = getProbePhase();
status.setProbePhase(phase);
lastStatus = status;
reportHandler.probeResult(phase, status);
}
/**
* Ping one probe. Logs the operation at debug level; sets the field <code>currentProbe</code>
* to the probe for the duration of the operation -this is used when identifying the
* cause of a hung reporting loop
* @param probe probe to ping
* @param live flag to indicate whether or not the operation is live or bootstrapping
* @return the status of the ping
* @throws ProbeInterruptedException if the probe has been told to exit
*/
private ProbeStatus ping(Probe probe, boolean live) throws ProbeInterruptedException {
if (log.isDebugEnabled()) {
log.debug("Executing " + probe);
}
checkForExitRequest();
currentProbe = probe;
try {
return probe.ping(live);
} finally {
currentProbe = null;
}
}
/**
* Check for an exit request -and convert it to an exception if made
* @throws ProbeInterruptedException iff {@link #mustExit} is true
*/
private void checkForExitRequest() throws ProbeInterruptedException {
if (mustExit) {
throw new ProbeInterruptedException();
}
}
/**
* Check the dependencies.
* The moment a failing test is reached the call returns without
* any reporting.
*
* All successful probes are reported, so as to keep the heartbeats happy.
*
* @return the status of the last dependency check. If this is a success
* them every probe passed.
*/
private ProbeStatus checkDependencyProbes() throws ProbeInterruptedException {
ProbeStatus status = null;
for (Probe dependency : dependencyProbes) {
//ping them, making clear they are not to run any bootstrap logic
status = ping(dependency, true);
if (!status.isSuccess()) {
//the first failure means the rest of the list can be skipped
break;
}
reportProbeStatus(status);
}
//return the last status
return status;
}
/**
* Run through all the dependency probes and report their outcomes successes (even if they fail)
* @return true iff all the probes have succeeded.
* @throws ProbeInterruptedException if the process was interrupted.
*/
public boolean checkAndReportDependencyProbes() throws ProbeInterruptedException {
ProbeStatus status;
status = checkDependencyProbes();
if (status != null && !status.isSuccess()) {
//during dependency checking, a failure is still reported as a success
status.markAsSuccessful();
reportProbeStatus(status);
//then return without checking anything else
return false;
}
//all dependencies are done.
return true;
}
/**
* Begin bootstrapping by telling each probe that they have started.
* This sets the timeouts up, as well as permits any other set-up actions
* to begin.
*/
private void beginBootstrapProbes() {
synchronized (this) {
bootstrapEndtime = now() + bootstrapTimeout;
}
for (Probe probe : monitorProbes) {
probe.beginBootstrap();
}
}
private long now() {
return System.currentTimeMillis();
}
/**
* Check the bootstrap probe list. All successful probes get reported.
* The first unsuccessful probe will be returned and not reported (left for policy upstream).
* If the failing probe has timed out, that is turned into a {@link ProbeFailedException}
* @return the last (unsuccessful) probe, or null if they all succeeded
* @throws ProbeInterruptedException interrupts
* @throws ProbeFailedException on a boot timeout
*/
private boolean checkBootstrapProbes() throws ProbeInterruptedException, ProbeFailedException {
verifyBootstrapHasNotTimedOut();
boolean probeFailed = false;
//now run through all the bootstrap probes
for (Probe probe : monitorProbes) {
//ping them
ProbeStatus status = ping(probe, false);
if (!status.isSuccess()) {
probeFailed = true;
lastFailingBootstrapProbe = status;
probe.failureCount++;
if (log.isDebugEnabled()) {
log.debug("Booting probe failed: " + status);
}
//at this point check to see if the timeout has occurred -and if so, force in the last probe status.
//this is a failure but not a timeout
//during boot, a failure of a probe that hasn't booted is still reported as a success
if (!probe.isBooted()) {
//so the success bit is flipped
status.markAsSuccessful();
reportProbeStatus(status);
} else {
//the probe had booted but then it switched to failing
//update the status unedited
reportProbeStatus(status);
//then fail
throw raiseProbeFailure(status, FAILURE_OF_A_LIVE_PROBE_DURING_BOOTSTRAPPING);
}
} else {
//this probe is working
if (!probe.isBooted()) {
//if it is new, mark it as live
if (log.isDebugEnabled()) {
log.debug("Booting probe is now live: " + probe);
}
probe.endBootstrap();
//tell the report handler that another probe has booted
reportHandler.probeBooted(status);
}
//push out its status
reportProbeStatus(status);
probe.successCount++;
}
}
return !probeFailed;
}
public int getBootstrapTimeout() {
return bootstrapTimeout;
}
/**
* This checks that bootstrap operations have not timed out
* @throws ProbeFailedException if the bootstrap has failed
*/
public void verifyBootstrapHasNotTimedOut() throws ProbeFailedException {
//first step -look for a timeout
if (isBootstrapTimeExceeded()) {
String text = FAILED_TO_BOOT
+ MonitorUtils.millisToHumanTime(bootstrapTimeout);
ProbeStatus status;
if (lastFailingBootstrapProbe != null) {
status = lastFailingBootstrapProbe;
status.setSuccess(false);
} else {
status = new ProbeStatus();
status.finish(null, false, text, null);
}
throw raiseProbeFailure(status,
text);
}
}
/**
* predicate that gets current time and checks for its time being exceeded.
* @return true iff the current time is > the end time
*/
public synchronized boolean isBootstrapTimeExceeded() {
return now() > bootstrapEndtime;
}
/**
* run through all the bootstrap probes and see if they are live.
* @return true iff all boot probes succeeded
* @throws ProbeInterruptedException the probe interruption flags
* @throws ProbeFailedException if a probe failed.
*/
public boolean checkAndReportBootstrapProbes() throws ProbeInterruptedException,
ProbeFailedException {
if (bootstrapTimeout <= 0) {
//there is no period of grace for bootstrapping probes, so return true saying
//this phase is complete
return true;
}
//now the bootstrapping probes
return checkBootstrapProbes();
}
/**
* run through all the live probes, pinging and reporting them.
* A single probe failure is turned into an exception
* @throws ProbeFailedException a probe failed
* @throws ProbeInterruptedException the probe process was explicitly interrupted
*/
protected void checkAndReportLiveProbes() throws ProbeFailedException, ProbeInterruptedException {
ProbeStatus status = null;
//go through the live list
if (log.isDebugEnabled()) {
log.debug("Checking live probes");
}
for (Probe probe : monitorProbes) {
status = ping(probe, true);
reportProbeStatus(status);
if (!status.isSuccess()) {
throw raiseProbeFailure(status, "Failure of probe in \"live\" monitor");
}
probe.successCount++;
}
//here all is well, so notify the reporter
reportHandler.liveProbeCycleCompleted();
}
/**
* Run the set of probes relevant for this phase of the probe lifecycle.
* @throws ProbeFailedException a probe failed
* @throws ProbeInterruptedException the probe process was explicitly interrupted
*/
protected void executeProbePhases() throws ProbeFailedException, ProbeInterruptedException {
switch (probePhase) {
case INIT:
enterProbePhase(ProbePhase.DEPENDENCY_CHECKING);
//fall through straight into the dependency check
case DEPENDENCY_CHECKING:
if (checkAndReportDependencyProbes()) {
enterProbePhase(ProbePhase.BOOTSTRAPPING);
beginBootstrapProbes();
}
break;
case BOOTSTRAPPING:
if (checkAndReportBootstrapProbes()) {
enterProbePhase(ProbePhase.LIVE);
}
break;
case LIVE:
checkAndReportLiveProbes();
break;
case TERMINATING:
default:
//do nothing.
break;
}
}
/**
* Raise a probe failure; injecting the phase into the status result first
*
* @param status ping result
* @param text optional text -null or "" means "none"
* @return an exception ready to throw
*/
private ProbeFailedException raiseProbeFailure(ProbeStatus status, String text) {
status.setProbePhase(probePhase);
log.info("Probe failed: " + status);
return new ProbeFailedException(text, status);
}
@Override
public void run() {
int size = monitorProbes.size();
log.info("Probe Worker Starting; " + size + " probe" + MonitorUtils.toPlural(size) + ":");
enterProbePhase(ProbePhase.DEPENDENCY_CHECKING);
for (Probe probe : monitorProbes) {
log.info(probe.getName());
}
while (!mustExit) {
try {
Thread.sleep(interval);
executeProbePhases();
} catch (ProbeFailedException e) {
//relay to the inner loop handler
probeFailed(e);
} catch (InterruptedException interrupted) {
break;
} catch (ProbeInterruptedException e) {
//exit raised.
//this will be true, just making extra-sure
break;
}
}
log.info("Probe Worker Exiting");
enterProbePhase(ProbePhase.TERMINATING);
}
protected void probeFailed(ProbeFailedException e) {
reportHandler.probeFailure(e);
}
}

View File

@ -1,265 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.slider.server.servicemonitor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Closeable;
import java.io.IOException;
import java.util.List;
/**
* This is the monitor service
*/
public final class ReportingLoop implements Runnable, ProbeReportHandler, MonitorKeys, Closeable {
protected static final Logger log = LoggerFactory.getLogger(ReportingLoop.class);
private final ProbeWorker worker;
private final Thread workerThread;
private final int reportInterval;
private final int probeTimeout;
private final int bootstrapTimeout;
private ProbeReportHandler reporter;
private final String name;
private volatile boolean mustExit;
public ReportingLoop(String name,
ProbeReportHandler reporter,
List<Probe> probes,
List<Probe> dependencyProbes,
int probeInterval,
int reportInterval,
int probeTimeout,
int bootstrapTimeout) throws IOException {
this(name,
reporter,
new ProbeWorker(probes, dependencyProbes, probeInterval, bootstrapTimeout),
reportInterval,
probeTimeout);
}
/**
* Create a new reporting loop -and bond the worker's ProbeReportHandler
* to us
* @param name
* @param reporter
* @param worker
* @param reportInterval
* @param probeTimeout
*/
public ReportingLoop(String name,
ProbeReportHandler reporter,
ProbeWorker worker,
int reportInterval,
int probeTimeout) throws IOException {
this.name = name;
this.reporter = reporter;
this.reportInterval = reportInterval;
this.probeTimeout = probeTimeout;
this.worker = worker;
this.bootstrapTimeout = worker.getBootstrapTimeout();
worker.setReportHandler(this);
workerThread = new Thread(worker, "probe thread - " + name);
worker.init();
}
public int getBootstrapTimeout() {
return bootstrapTimeout;
}
public ReportingLoop withReporter(ProbeReportHandler reporter) {
assert this.reporter == null : "attempting to reassign reporter ";
assert reporter != null : "new reporter is null";
this.reporter = reporter;
return this;
}
/**
* Start the monitoring.
*
* @return false if the monitoring did not start and that the worker threads
* should be run up.
*/
public boolean startReporting() {
String description = "Service Monitor for " + name + ", probe-interval= "
+ MonitorUtils.millisToHumanTime(worker.interval)
+ ", report-interval=" + MonitorUtils.millisToHumanTime(reportInterval)
+ ", probe-timeout=" + timeoutToStr(probeTimeout)
+ ", bootstrap-timeout=" + timeoutToStr(bootstrapTimeout);
log.info("Starting reporting"
+ " to " + reporter
+ description);
return reporter.commence(name, description);
}
private String timeoutToStr(int timeout) {
return timeout >= 0 ? MonitorUtils.millisToHumanTime(timeout) : "not set";
}
private void startWorker() {
log.info("Starting reporting worker thread ");
workerThread.setDaemon(true);
workerThread.start();
}
/**
* This exits the process cleanly
*/
@Override
public void close() {
log.info("Stopping reporting");
mustExit = true;
if (worker != null) {
worker.setMustExit();
workerThread.interrupt();
}
if (reporter != null) {
reporter.unregister();
}
}
@Override
public void probeFailure(ProbeFailedException exception) {
reporter.probeFailure(exception);
}
@Override
public void probeProcessStateChange(ProbePhase probePhase) {
reporter.probeProcessStateChange(probePhase);
}
@Override
public void probeBooted(ProbeStatus status) {
reporter.probeBooted(status);
}
private long now() {
return System.currentTimeMillis();
}
@Override
public void probeResult(ProbePhase phase, ProbeStatus status) {
reporter.probeResult(phase, status);
}
@Override
public boolean commence(String n, String description) {
return true;
}
@Override
public void unregister() {
}
@Override
public void heartbeat(ProbeStatus status) {
}
@Override
public void probeTimedOut(ProbePhase currentPhase, Probe probe, ProbeStatus lastStatus,
long currentTime) {
}
@Override
public void liveProbeCycleCompleted() {
//delegate to the reporter
reporter.liveProbeCycleCompleted();
}
/**
* The reporting loop
*/
void reportingLoop() {
while (!mustExit) {
try {
ProbeStatus workerStatus = worker.getLastStatus();
long now = now();
long lastStatusIssued = workerStatus.getTimestamp();
long timeSinceLastStatusIssued = now - lastStatusIssued;
//two actions can occur here: a heartbeat is issued or a timeout reported.
//this flag decides which
boolean heartbeat;
//based on phase, decide whether to heartbeat or timeout
ProbePhase probePhase = worker.getProbePhase();
switch (probePhase) {
case DEPENDENCY_CHECKING:
//no timeouts in dependency phase
heartbeat = true;
break;
case BOOTSTRAPPING:
//the timeout here is fairly straightforward: heartbeats are
//raised while the worker hasn't timed out
heartbeat = bootstrapTimeout < 0 || timeSinceLastStatusIssued < bootstrapTimeout;
break;
case LIVE:
//use the probe timeout interval between the current time
//and the time the last status event was received.
heartbeat = timeSinceLastStatusIssued < probeTimeout;
break;
case INIT:
case TERMINATING:
default:
//send a heartbeat, because this isn't the time to be failing
heartbeat = true;
}
if (heartbeat) {
//a heartbeat is sent to the reporter
reporter.heartbeat(workerStatus);
} else {
//no response from the worker -it is hung.
reporter.probeTimedOut(probePhase,
worker.getCurrentProbe(),
workerStatus,
now
);
}
//now sleep
Thread.sleep(reportInterval);
} catch (InterruptedException e) {
//interrupted -always exit the loop.
break;
}
}
//this point is reached if and only if a clean exit was requested or something failed.
}
/**
* This can be run in a separate thread, or it can be run directly from the caller.
* Test runs do the latter, HAM runs multiple reporting threads.
*/
@Override
public void run() {
try {
startWorker();
reportingLoop();
} catch (RuntimeException e) {
log.warn("Failure in the reporting loop: " + e, e);
//rethrow so that inline code can pick it up (e.g. test runs)
throw e;
}
}
}

View File

@ -62,6 +62,10 @@ public interface RestApiErrorMessages {
"Invalid no of containers specified";
String ERROR_CONTAINERS_COUNT_FOR_COMP_INVALID =
ERROR_CONTAINERS_COUNT_INVALID + ERROR_SUFFIX_FOR_COMPONENT;
String ERROR_DEPENDENCY_INVALID = "Dependency %s for component %s is " +
"invalid, does not exist as a component";
String ERROR_DEPENDENCY_CYCLE = "Invalid dependencies, a cycle may " +
"exist: %s";
String ERROR_RESOURCE_PROFILE_MULTIPLE_VALUES_NOT_SUPPORTED =
"Cannot specify" + " cpus/memory along with profile";

View File

@ -32,14 +32,18 @@ import org.apache.slider.common.tools.SliderUtils;
import org.apache.slider.core.persist.JsonSerDeser;
import org.apache.slider.providers.AbstractClientProvider;
import org.apache.slider.providers.SliderProviderFactory;
import org.apache.slider.server.servicemonitor.MonitorUtils;
import org.codehaus.jackson.map.PropertyNamingStrategy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
public class ServiceApiUtil {
@ -176,9 +180,22 @@ public class ServiceApiUtil {
if (comp.getLaunchCommand() == null) {
comp.setLaunchCommand(globalLaunchCommand);
}
// validate dependency existence
if (comp.getDependencies() != null) {
for (String dependency : comp.getDependencies()) {
if (!componentNames.contains(dependency)) {
throw new IllegalArgumentException(String.format(
RestApiErrorMessages.ERROR_DEPENDENCY_INVALID, dependency,
comp.getName()));
}
}
}
validateComponent(comp, fs.getFileSystem());
}
// validate dependency tree
sortByDependencies(application.getComponents());
// Application lifetime if not specified, is set to unlimited lifetime
if (application.getLifetime() == null) {
application.setLifetime(RestApiConstants.DEFAULT_UNLIMITED_LIFETIME);
@ -207,6 +224,8 @@ public class ServiceApiUtil {
}
compClientProvider.validateConfigFiles(comp.getConfiguration()
.getFiles(), fs);
MonitorUtils.getProbe(comp.getReadinessCheck());
}
@VisibleForTesting
@ -301,6 +320,67 @@ public class ServiceApiUtil {
return comp;
}
public static Collection<Component> sortByDependencies(List<Component>
components) {
Map<String, Component> sortedComponents =
sortByDependencies(components, null);
return sortedComponents.values();
}
/**
* Each internal call of sortByDependencies will identify all of the
* components with the same dependency depth (the lowest depth that has not
* been processed yet) and add them to the sortedComponents list, preserving
* their original ordering in the components list.
*
* So the first time it is called, all components with no dependencies
* (depth 0) will be identified. The next time it is called, all components
* that have dependencies only on the the depth 0 components will be
* identified (depth 1). This will be repeated until all components have
* been added to the sortedComponents list. If no new components are
* identified but the sortedComponents list is not complete, an error is
* thrown.
*/
private static Map<String, Component> sortByDependencies(List<Component>
components, Map<String, Component> sortedComponents) {
if (sortedComponents == null) {
sortedComponents = new LinkedHashMap<>();
}
Map<String, Component> componentsToAdd = new LinkedHashMap<>();
List<Component> componentsSkipped = new ArrayList<>();
for (Component component : components) {
String name = component.getName();
if (sortedComponents.containsKey(name)) {
continue;
}
boolean dependenciesAlreadySorted = true;
if (!SliderUtils.isEmpty(component.getDependencies())) {
for (String dependency : component.getDependencies()) {
if (!sortedComponents.containsKey(dependency)) {
dependenciesAlreadySorted = false;
break;
}
}
}
if (dependenciesAlreadySorted) {
componentsToAdd.put(name, component);
} else {
componentsSkipped.add(component);
}
}
if (componentsToAdd.size() == 0) {
throw new IllegalArgumentException(String.format(RestApiErrorMessages
.ERROR_DEPENDENCY_CYCLE, componentsSkipped));
}
sortedComponents.putAll(componentsToAdd);
if (sortedComponents.size() == components.size()) {
return sortedComponents;
}
return sortByDependencies(components, sortedComponents);
}
public static String $(String s) {
return "${" + s +"}";
}

View File

@ -0,0 +1,162 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.slider.server.appmaster.model.appstate;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.slider.api.types.ApplicationLivenessInformation;
import org.apache.slider.server.appmaster.model.mock.BaseMockAppStateTest;
import org.apache.slider.server.appmaster.model.mock.MockRoles;
import org.apache.slider.server.appmaster.operations.AbstractRMOperation;
import org.apache.slider.server.appmaster.state.ContainerAssignment;
import org.apache.slider.server.appmaster.state.RoleInstance;
import org.apache.slider.server.appmaster.state.RoleStatus;
import org.apache.slider.server.servicemonitor.ProbeStatus;
import org.junit.Test;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
/**
* Test for postponing container requests until dependencies are ready.
*/
public class TestMockAppStateDependencies extends BaseMockAppStateTest
implements MockRoles {
private org.apache.slider.server.servicemonitor.Probe successProbe =
new org.apache.slider.server.servicemonitor.Probe("success", null) {
@Override
public ProbeStatus ping(RoleInstance roleInstance) {
ProbeStatus status = new ProbeStatus();
status.succeed(this);
return status;
}
};
private org.apache.slider.server.servicemonitor.Probe failureProbe =
new org.apache.slider.server.servicemonitor.Probe("failure", null) {
@Override
public ProbeStatus ping(RoleInstance roleInstance) {
ProbeStatus status = new ProbeStatus();
status.fail(this, new Exception());
return status;
}
};
@Override
public String getTestName() {
return "TestMockAppStateDependencies";
}
@Test
public void testDependencies() throws Throwable {
RoleStatus role0Status = getRole0Status();
RoleStatus role1Status = getRole1Status();
// set desired instances for role0 to 1
role0Status.setDesired(1);
// set probe for role0 to use a ping that will always succeed
role0Status.getProviderRole().probe = successProbe;
// set desired instances for role1 to 1
role1Status.setDesired(1);
// set role0 as a dependency of role1
role1Status.getProviderRole().component.setDependencies(Collections
.singletonList(ROLE0));
// role0 has no dependencies, so its dependencies are ready
assertTrue(appState.areDependenciesReady(role0Status));
// role1 dependency (role0) is not ready yet
assertFalse(appState.areDependenciesReady(role1Status));
// start the single requested instance for role0
review(ROLE0, 2);
// role0 is still not ready because a ping has not been issued
assertFalse(appState.areDependenciesReady(role1Status));
// issue pings
appState.monitorComponentInstances();
// now role0 is ready
assertTrue(appState.areDependenciesReady(role1Status));
// increase the desired containers for role0
role0Status.setDesired(2);
// role0 is no longer ready
assertFalse(appState.areDependenciesReady(role1Status));
// start a second instance for role0
review(ROLE0, 2);
// role0 is not ready because ping has not been issued for the new instance
assertFalse(appState.areDependenciesReady(role1Status));
// issue pings
appState.monitorComponentInstances();
// role0 is ready
assertTrue(appState.areDependenciesReady(role1Status));
// set probe for role0 to use a ping that will always fail
role0Status.getProviderRole().probe = failureProbe;
// issue pings
appState.monitorComponentInstances();
// role0 is not ready (failure probe works)
assertFalse(appState.areDependenciesReady(role1Status));
// set probe for role0 to use a ping that will always succeed
role0Status.getProviderRole().probe = successProbe;
// issue pings
appState.monitorComponentInstances();
// role0 is ready
assertTrue(appState.areDependenciesReady(role1Status));
// now role1 instances can be started
review(ROLE1, 1);
}
public void review(String expectedRole, int outstanding) throws Exception {
List<AbstractRMOperation> ops = appState.reviewRequestAndReleaseNodes();
// expect one request in the list
assertEquals(1, ops.size());
// and in a liveness check, expected outstanding
ApplicationLivenessInformation liveness =
appState.getApplicationLivenessInformation();
assertEquals(outstanding, liveness.requestsOutstanding);
assertFalse(liveness.allRequestsSatisfied);
// record container allocated and verify it has the expected role
List<Container> allocations = engine.execute(ops);
List<ContainerAssignment> assignments = new ArrayList<>();
List<AbstractRMOperation> releases = new ArrayList<>();
appState.onContainersAllocated(allocations, assignments, releases);
assertEquals(1, assignments.size());
ContainerAssignment assigned = assignments.get(0);
Container target = assigned.container;
RoleInstance ri = roleInstance(assigned);
assertEquals(expectedRole, ri.role);
// one fewer request outstanding
liveness = appState.getApplicationLivenessInformation();
assertEquals(outstanding - 1, liveness.requestsOutstanding);
// record container start submitted
appState.containerStartSubmitted(target, ri);
// additional review results in no additional requests
ops = appState.reviewRequestAndReleaseNodes();
assertTrue(ops.isEmpty());
// record container start
appState.innerOnNodeManagerContainerStarted(target.getId());
}
}

View File

@ -103,8 +103,6 @@ public class TestMockAppStateUniqueNames extends BaseMockAppStateTest
assertEquals(i, instance.componentId);
assertEquals(group, instance.role);
assertEquals(group, instance.providerRole.name);
assertEquals(group, instance.providerRole.group);
// TODO remove group from provider role if it continues to be unused
i++;
}
}
@ -124,7 +122,6 @@ public class TestMockAppStateUniqueNames extends BaseMockAppStateTest
assertEquals(0, roleStatus.getDesired());
assertEquals(1024L, roleStatus.getResourceRequirements().getMemorySize());
assertEquals(2, roleStatus.getResourceRequirements().getVirtualCores());
assertEquals("group1", roleStatus.getGroup());
// now flex back up
appState.updateComponents(Collections.singletonMap("group1", 3L));
@ -147,7 +144,6 @@ public class TestMockAppStateUniqueNames extends BaseMockAppStateTest
RoleStatus group1 = appState.lookupRoleStatus("group1");
assertEquals(3, group1.getDesired());
assertEquals(1024L, group1.getResourceRequirements().getMemorySize());
assertEquals("group1", group1.getGroup());
}
}

View File

@ -343,7 +343,7 @@ public class TestRoleHistoryOutstandingRequestTracker extends
public void testBuildResourceRequirements() throws Throwable {
// Store original values
Application application = appState.getClusterStatus();
Component role0 = application.getComponent(getRole0Status().getGroup());
Component role0 = application.getComponent(getRole0Status().getName());
String origMem = role0.getResource().getMemory();
Integer origVcores = role0.getResource().getCpus();

View File

@ -17,20 +17,25 @@
package org.apache.slider.server.servicemonitor;
import org.apache.hadoop.conf.Configuration;
import org.apache.slider.server.appmaster.model.mock.MockFactory;
import org.apache.slider.server.appmaster.state.RoleInstance;
import org.junit.Assert;
import org.junit.Test;
public class TestPortProbe extends Assert {
private final MockFactory factory = MockFactory.INSTANCE;
/**
* Assert that a port probe failed if the port is closed
* @throws Throwable
*/
@Test
public void testPortProbeFailsClosedPort() throws Throwable {
PortProbe probe = new PortProbe("127.0.0.1", 65500, 100, "", new Configuration());
PortProbe probe = new PortProbe(65500, 100);
probe.init();
ProbeStatus status = probe.ping(true);
RoleInstance roleInstance = new RoleInstance(factory.newContainer());
roleInstance.ip = "127.0.0.1";
ProbeStatus status = probe.ping(roleInstance);
assertFalse("Expected a failure but got successful result: " + status,
status.isSuccess());
}

View File

@ -34,6 +34,9 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import static org.apache.slider.util.RestApiConstants.DEFAULT_COMPONENT_NAME;
import static org.apache.slider.util.RestApiConstants.DEFAULT_UNLIMITED_LIFETIME;
@ -390,4 +393,52 @@ public class TestServiceApiUtil {
// original component replaced by external component
assertNotNull(app.getComponent("comp1"));
}
public static void verifyDependencySorting(List<Component> components,
Component... expectedSorting) {
Collection<Component> actualSorting = ServiceApiUtil.sortByDependencies(
components);
assertEquals(expectedSorting.length, actualSorting.size());
int i = 0;
for (Component component : actualSorting) {
assertEquals(expectedSorting[i++], component);
}
}
@Test
public void testDependencySorting() throws IOException {
Component a = new Component().name("a");
Component b = new Component().name("b");
Component c = new Component().name("c");
Component d = new Component().name("d").dependencies(Arrays.asList("c"));
Component e = new Component().name("e").dependencies(Arrays.asList("b",
"d"));
verifyDependencySorting(Arrays.asList(a, b, c), a, b, c);
verifyDependencySorting(Arrays.asList(c, a, b), c, a, b);
verifyDependencySorting(Arrays.asList(a, b, c, d, e), a, b, c, d, e);
verifyDependencySorting(Arrays.asList(e, d, c, b, a), c, b, a, d, e);
c.setDependencies(Arrays.asList("e"));
try {
verifyDependencySorting(Arrays.asList(a, b, c, d, e));
Assert.fail(EXCEPTION_PREFIX + "components with dependency cycle");
} catch (IllegalArgumentException ex) {
assertEquals(String.format(
RestApiErrorMessages.ERROR_DEPENDENCY_CYCLE, Arrays.asList(c, d,
e)), ex.getMessage());
}
SliderFileSystem sfs = initMock(null);
Application application = createValidApplication(null);
application.setComponents(Arrays.asList(c, d, e));
try {
ServiceApiUtil.validateAndResolveApplication(application, sfs);
Assert.fail(EXCEPTION_PREFIX + "components with bad dependencies");
} catch (IllegalArgumentException ex) {
assertEquals(String.format(
RestApiErrorMessages.ERROR_DEPENDENCY_INVALID, "b", "e"), ex
.getMessage());
}
}
}