HBASE-24361 Make `RESTApiClusterManager` more resilient (#1701)

* Sometimes API calls return with null/empty response bodies. Thus,
  wrap all API calls in a retry loop.
* calls that submit work in the form of "commands" now retrieve the
  commandId from successful command submission, and track completion
  of that command before returning control to calling context.
* Model CM's process state and use that model to guide state
  transitions more intelligently. This guards against, for example,
  the start command failing with an error message like "Role must be
  stopped".
* Improve logging levels; avoid spamming logs with the
  side-effects of retries at this and higher contexts.
* include references to API documentation, such as it is.

Signed-off-by: stack <stack@apache.org>
This commit is contained in:
Nick Dimiduk 2020-05-19 09:43:55 -07:00 committed by Nick Dimiduk
parent 3605db2fa0
commit fbe0da2672
2 changed files with 353 additions and 81 deletions

View File

@ -73,11 +73,11 @@ public class HBaseClusterManager extends Configured implements ClusterManager {
"timeout %6$s /usr/bin/ssh %1$s %2$s%3$s%4$s \"sudo %5$s\""; "timeout %6$s /usr/bin/ssh %1$s %2$s%3$s%4$s \"sudo %5$s\"";
private String tunnelSudoCmd; private String tunnelSudoCmd;
private static final String RETRY_ATTEMPTS_KEY = "hbase.it.clustermanager.retry.attempts"; static final String RETRY_ATTEMPTS_KEY = "hbase.it.clustermanager.retry.attempts";
private static final int DEFAULT_RETRY_ATTEMPTS = 5; static final int DEFAULT_RETRY_ATTEMPTS = 5;
private static final String RETRY_SLEEP_INTERVAL_KEY = "hbase.it.clustermanager.retry.sleep.interval"; static final String RETRY_SLEEP_INTERVAL_KEY = "hbase.it.clustermanager.retry.sleep.interval";
private static final int DEFAULT_RETRY_SLEEP_INTERVAL = 1000; static final int DEFAULT_RETRY_SLEEP_INTERVAL = 1000;
protected RetryCounterFactory retryCounterFactory; protected RetryCounterFactory retryCounterFactory;

View File

@ -18,11 +18,19 @@
package org.apache.hadoop.hbase; package org.apache.hadoop.hbase;
import static org.apache.hadoop.hbase.HBaseClusterManager.DEFAULT_RETRY_ATTEMPTS;
import static org.apache.hadoop.hbase.HBaseClusterManager.DEFAULT_RETRY_SLEEP_INTERVAL;
import static org.apache.hadoop.hbase.HBaseClusterManager.RETRY_ATTEMPTS_KEY;
import static org.apache.hadoop.hbase.HBaseClusterManager.RETRY_SLEEP_INTERVAL_KEY;
import java.io.IOException; import java.io.IOException;
import java.net.URI; import java.net.URI;
import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.Locale; import java.util.Locale;
import java.util.Map; import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.Callable;
import javax.ws.rs.client.Client; import javax.ws.rs.client.Client;
import javax.ws.rs.client.ClientBuilder; import javax.ws.rs.client.ClientBuilder;
import javax.ws.rs.client.Entity; import javax.ws.rs.client.Entity;
@ -32,8 +40,12 @@ import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response; import javax.ws.rs.core.Response;
import javax.ws.rs.core.UriBuilder; import javax.ws.rs.core.UriBuilder;
import javax.xml.ws.http.HTTPException; import javax.xml.ws.http.HTTPException;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured; import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.util.RetryCounter;
import org.apache.hadoop.hbase.util.RetryCounter.RetryConfig;
import org.apache.hadoop.hbase.util.RetryCounterFactory;
import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.ReflectionUtils;
import org.glassfish.jersey.client.authentication.HttpAuthenticationFeature; import org.glassfish.jersey.client.authentication.HttpAuthenticationFeature;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -62,6 +74,10 @@ import org.apache.hbase.thirdparty.com.google.gson.JsonParser;
* *
* This class will defer to the ClusterManager terminology in methods that it implements from * This class will defer to the ClusterManager terminology in methods that it implements from
* that interface, but uses Cloudera Manager's terminology when dealing with its API directly. * that interface, but uses Cloudera Manager's terminology when dealing with its API directly.
*
* DEBUG-level logging gives more details of the actions this class takes as they happen. Log at
* TRACE-level to see the API requests and responses. TRACE-level logging on RetryCounter displays
* wait times, so that can be helpful too.
*/ */
public class RESTApiClusterManager extends Configured implements ClusterManager { public class RESTApiClusterManager extends Configured implements ClusterManager {
// Properties that need to be in the Configuration object to interact with the REST API cluster // Properties that need to be in the Configuration object to interact with the REST API cluster
@ -95,12 +111,14 @@ public class RESTApiClusterManager extends Configured implements ClusterManager
private static final String API_VERSION = "v6"; private static final String API_VERSION = "v6";
// Client instances are expensive, so use the same one for all our REST queries. // Client instances are expensive, so use the same one for all our REST queries.
private Client client = ClientBuilder.newClient(); private final Client client = ClientBuilder.newClient();
// An instance of HBaseClusterManager is used for methods like the kill, resume, and suspend // An instance of HBaseClusterManager is used for methods like the kill, resume, and suspend
// because cluster managers don't tend to implement these operations. // because cluster managers don't tend to implement these operations.
private ClusterManager hBaseClusterManager; private ClusterManager hBaseClusterManager;
private RetryCounterFactory retryCounterFactory;
private static final Logger LOG = LoggerFactory.getLogger(RESTApiClusterManager.class); private static final Logger LOG = LoggerFactory.getLogger(RESTApiClusterManager.class);
RESTApiClusterManager() { } RESTApiClusterManager() { }
@ -124,38 +142,111 @@ public class RESTApiClusterManager extends Configured implements ClusterManager
String serverUsername = conf.get(REST_API_CLUSTER_MANAGER_USERNAME, DEFAULT_SERVER_USERNAME); String serverUsername = conf.get(REST_API_CLUSTER_MANAGER_USERNAME, DEFAULT_SERVER_USERNAME);
String serverPassword = conf.get(REST_API_CLUSTER_MANAGER_PASSWORD, DEFAULT_SERVER_PASSWORD); String serverPassword = conf.get(REST_API_CLUSTER_MANAGER_PASSWORD, DEFAULT_SERVER_PASSWORD);
client.register(HttpAuthenticationFeature.basic(serverUsername, serverPassword)); client.register(HttpAuthenticationFeature.basic(serverUsername, serverPassword));
this.retryCounterFactory = new RetryCounterFactory(new RetryConfig()
.setMaxAttempts(conf.getInt(RETRY_ATTEMPTS_KEY, DEFAULT_RETRY_ATTEMPTS))
.setSleepInterval(conf.getLong(RETRY_SLEEP_INTERVAL_KEY, DEFAULT_RETRY_SLEEP_INTERVAL)));
} }
@Override @Override
public void start(ServiceType service, String hostname, int port) throws IOException { public void start(ServiceType service, String hostname, int port) {
performClusterManagerCommand(service, hostname, RoleCommand.START); // With Cloudera Manager (6.3.x), sending a START command to a service role
} // that is already in the "Started" state is an error. CM will log a message
// saying "Role must be stopped". It will complain similarly for other
@Override // expected state transitions.
public void stop(ServiceType service, String hostname, int port) throws IOException { // A role process that has been `kill -9`'ed ends up with the service role
performClusterManagerCommand(service, hostname, RoleCommand.STOP); // retaining the "Started" state but with the process marked as "unhealthy".
} // Instead of blindly issuing the START command, first send a STOP command
// to ensure the START will be accepted.
@Override LOG.debug("Performing start of {} on {}:{}", service, hostname, port);
public void restart(ServiceType service, String hostname, int port) throws IOException { final RoleState currentState = getRoleState(service, hostname);
performClusterManagerCommand(service, hostname, RoleCommand.RESTART); switch (currentState) {
} case NA:
case BUSY:
@Override case UNKNOWN:
public boolean isRunning(ServiceType service, String hostname, int port) throws IOException { case HISTORY_NOT_AVAILABLE:
String serviceName = getServiceName(roleServiceType.get(service)); LOG.warn("Unexpected service state detected. Service START requested, but currently in"
String hostId = getHostId(hostname); + " {} state. Attempting to start. {}, {}:{}", currentState, service, hostname, port);
String roleState = getRoleState(serviceName, service.toString(), hostId); performClusterManagerCommand(service, hostname, RoleCommand.START);
String healthSummary = getHealthSummary(serviceName, service.toString(), hostId); return;
boolean isRunning = false; case STOPPING:
LOG.warn("Unexpected service state detected. Service START requested, but currently in"
// Use Yoda condition to prevent NullPointerException. roleState will be null if the "service + " {} state. Waiting for stop before attempting start. {}, {}:{}", currentState,
// type" does not exist on the specified hostname. service, hostname, port);
if ("STARTED".equals(roleState) && "GOOD".equals(healthSummary)) { waitFor(() -> Objects.equals(RoleState.STOPPED, getRoleState(service, hostname)));
isRunning = true; performClusterManagerCommand(service, hostname, RoleCommand.START);
return;
case STOPPED:
performClusterManagerCommand(service, hostname, RoleCommand.START);
return;
case STARTING:
LOG.warn("Unexpected service state detected. Service START requested, but already in"
+ " {} state. Ignoring current request and waiting for start to complete. {}, {}:{}",
currentState, service, hostname, port);
waitFor(()-> Objects.equals(RoleState.STARTED, getRoleState(service, hostname)));
return;
case STARTED:
LOG.warn("Unexpected service state detected. Service START requested, but already in"
+ " {} state. Restarting. {}, {}:{}", currentState, service, hostname, port);
performClusterManagerCommand(service, hostname, RoleCommand.RESTART);
return;
} }
throw new RuntimeException("should not happen.");
}
return isRunning; @Override
// Stops the given role on the given host. The action taken depends on the role's current
// state as returned by getRoleState(service, hostname); `port` is only used in log messages.
public void stop(ServiceType service, String hostname, int port) {
LOG.debug("Performing stop of {} on {}:{}", service, hostname, port);
final RoleState currentState = getRoleState(service, hostname);
switch (currentState) {
// Indeterminate states: log a warning and submit the STOP command anyway.
case NA:
case BUSY:
case UNKNOWN:
case HISTORY_NOT_AVAILABLE:
LOG.warn("Unexpected service state detected. Service STOP requested, but already in"
+ " {} state. Attempting to stop. {}, {}:{}", currentState, service, hostname, port);
performClusterManagerCommand(service, hostname, RoleCommand.STOP);
return;
// A stop is already in progress: do not submit another command, just wait for STOPPED.
case STOPPING:
waitFor(() -> Objects.equals(RoleState.STOPPED, getRoleState(service, hostname)));
return;
// Already stopped: nothing to do beyond logging.
case STOPPED:
LOG.warn("Unexpected service state detected. Service STOP requested, but already in"
+ " {} state. Ignoring current request. {}, {}:{}", currentState, service, hostname,
port);
return;
// A start is in progress: wait until the role reports STARTED, then submit STOP.
case STARTING:
LOG.warn("Unexpected service state detected. Service STOP requested, but already in"
+ " {} state. Waiting for start to complete. {}, {}:{}", currentState, service, hostname,
port);
waitFor(()-> Objects.equals(RoleState.STARTED, getRoleState(service, hostname)));
performClusterManagerCommand(service, hostname, RoleCommand.STOP);
return;
// The expected starting point for a stop request.
case STARTED:
performClusterManagerCommand(service, hostname, RoleCommand.STOP);
return;
}
// Every case above returns, so this is reached only when currentState matches no case label.
throw new RuntimeException("should not happen.");
}
// Restart is implemented as this class's own stop(...) followed by start(...), rather than
// by submitting a single RESTART role command.
@Override
public void restart(ServiceType service, String hostname, int port) {
LOG.debug("Performing stop followed by start of {} on {}:{}", service, hostname, port);
stop(service, hostname, port);
start(service, hostname, port);
}
@Override
public boolean isRunning(ServiceType service, String hostname, int port) {
LOG.debug("Issuing isRunning request against {} on {}:{}", service, hostname, port);
return executeWithRetries(() -> {
String serviceName = getServiceName(roleServiceType.get(service));
String hostId = getHostId(hostname);
RoleState roleState = getRoleState(serviceName, service.toString(), hostId);
HealthSummary healthSummary = getHealthSummary(serviceName, service.toString(), hostId);
return Objects.equals(RoleState.STARTED, roleState)
&& Objects.equals(HealthSummary.GOOD, healthSummary);
});
} }
@Override @Override
@ -173,20 +264,38 @@ public class RESTApiClusterManager extends Configured implements ClusterManager
hBaseClusterManager.resume(service, hostname, port); hBaseClusterManager.resume(service, hostname, port);
} }
// Convenience method to execute command against role on hostname. Only graceful commands are // Convenience method to execute command against role on hostname. Only graceful commands are
// supported since cluster management APIs don't tend to let you SIGKILL things. // supported since cluster management APIs don't tend to let you SIGKILL things.
private void performClusterManagerCommand(ServiceType role, String hostname, RoleCommand command) private void performClusterManagerCommand(ServiceType role, String hostname,
throws IOException { RoleCommand command) {
LOG.info("Performing " + command + " command against " + role + " on " + hostname + "..."); // retry submitting the command until the submission succeeds.
String serviceName = getServiceName(roleServiceType.get(role)); final long commandId = executeWithRetries(() -> {
String hostId = getHostId(hostname); final String serviceName = getServiceName(roleServiceType.get(role));
String roleName = getRoleName(serviceName, role.toString(), hostId); final String hostId = getHostId(hostname);
doRoleCommand(serviceName, roleName, command); final String roleName = getRoleName(serviceName, role.toString(), hostId);
return doRoleCommand(serviceName, roleName, command);
});
LOG.debug("Command {} of {} on {} submitted as commandId {}",
command, role, hostname, commandId);
// assume the submitted command was asynchronous. wait on the commandId to be marked as
// successful.
waitFor(() -> hasCommandCompleted(commandId));
if (!executeWithRetries(() -> hasCommandCompletedSuccessfully(commandId))) {
final String msg = String.format("Command %s of %s on %s submitted as commandId %s failed.",
command, role, hostname, commandId);
// TODO: this does not interrupt the monkey. should it?
throw new RuntimeException(msg);
}
LOG.debug("Command {} of {} on {} submitted as commandId {} completed successfully.",
command, role, hostname, commandId);
} }
// Performing a command (e.g. starting or stopping a role) requires a POST instead of a GET. /**
private void doRoleCommand(String serviceName, String roleName, RoleCommand roleCommand) { * Issues a command (e.g. starting or stopping a role).
* @return the commandId of a successfully submitted asynchronous command.
*/
private long doRoleCommand(String serviceName, String roleName, RoleCommand roleCommand) {
URI uri = UriBuilder.fromUri(serverHostname) URI uri = UriBuilder.fromUri(serverHostname)
.path("api") .path("api")
.path(API_VERSION) .path(API_VERSION)
@ -198,29 +307,46 @@ public class RESTApiClusterManager extends Configured implements ClusterManager
.path(roleCommand.toString()) .path(roleCommand.toString())
.build(); .build();
String body = "{ \"items\": [ \"" + roleName + "\" ] }"; String body = "{ \"items\": [ \"" + roleName + "\" ] }";
LOG.info("Executing POST against " + uri + " with body " + body + "..."); LOG.trace("Executing POST against {} with body {} ...", uri, body);
WebTarget webTarget = client.target(uri); WebTarget webTarget = client.target(uri);
Invocation.Builder invocationBuilder = webTarget.request(MediaType.APPLICATION_JSON); Invocation.Builder invocationBuilder = webTarget.request(MediaType.APPLICATION_JSON);
Response response = invocationBuilder.post(Entity.json(body)); Response response = invocationBuilder.post(Entity.json(body));
int statusCode = response.getStatus(); final int statusCode = response.getStatus();
final String responseBody = response.readEntity(String.class);
if (statusCode != Response.Status.OK.getStatusCode()) { if (statusCode != Response.Status.OK.getStatusCode()) {
LOG.warn(
"RoleCommand failed with status code {} and response body {}", statusCode, responseBody);
throw new HTTPException(statusCode); throw new HTTPException(statusCode);
} }
LOG.trace("POST against {} completed with status code {} and response body {}",
uri, statusCode, responseBody);
return parser.parse(responseBody)
.getAsJsonObject()
.get("items")
.getAsJsonArray()
.get(0)
.getAsJsonObject()
.get("id")
.getAsLong();
} }
// Possible healthSummary values include "GOOD" and "BAD." private HealthSummary getHealthSummary(String serviceName, String roleType, String hostId) {
private String getHealthSummary(String serviceName, String roleType, String hostId) return HealthSummary.fromString(
throws IOException { getRolePropertyValue(serviceName, roleType, hostId, "healthSummary"));
return getRolePropertyValue(serviceName, roleType, hostId, "healthSummary");
} }
// This API uses a hostId to execute host-specific commands; get one from a hostname. // This API uses a hostId to execute host-specific commands; get one from a hostname.
private String getHostId(String hostname) throws IOException { private String getHostId(String hostname) {
String hostId = null; String hostId = null;
URI uri = UriBuilder.fromUri(serverHostname)
URI uri = .path("api")
UriBuilder.fromUri(serverHostname).path("api").path(API_VERSION).path("hosts").build(); .path(API_VERSION)
JsonElement hosts = getJsonNodeFromURIGet(uri); .path("hosts")
.build();
JsonElement hosts = parser.parse(getFromURIGet(uri))
.getAsJsonObject()
.get("items");
if (hosts != null) { if (hosts != null) {
// Iterate through the list of hosts, stopping once you've reached the requested hostname. // Iterate through the list of hosts, stopping once you've reached the requested hostname.
for (JsonElement host : hosts.getAsJsonArray()) { for (JsonElement host : hosts.getAsJsonArray()) {
@ -229,42 +355,50 @@ public class RESTApiClusterManager extends Configured implements ClusterManager
break; break;
} }
} }
} else {
hostId = null;
} }
return hostId; return hostId;
} }
// Execute GET against URI, returning a JsonNode object to be traversed. private String getFromURIGet(URI uri) {
private JsonElement getJsonNodeFromURIGet(URI uri) throws IOException { LOG.trace("Executing GET against {} ...", uri);
LOG.debug("Executing GET against " + uri + "...");
final Response response = client.target(uri) final Response response = client.target(uri)
.request(MediaType.APPLICATION_JSON_TYPE) .request(MediaType.APPLICATION_JSON_TYPE)
.get(); .get();
int statusCode = response.getStatus(); int statusCode = response.getStatus();
final String responseBody = response.readEntity(String.class);
if (statusCode != Response.Status.OK.getStatusCode()) { if (statusCode != Response.Status.OK.getStatusCode()) {
LOG.warn(
"request failed with status code {} and response body {}", statusCode, responseBody);
throw new HTTPException(statusCode); throw new HTTPException(statusCode);
} }
// This API folds information as the value to an "items" attribute. // This API folds information as the value to an "items" attribute.
return parser.parse(response.readEntity(String.class)) LOG.trace("GET against {} completed with status code {} and response body {}",
.getAsJsonObject() uri, statusCode, responseBody);
.get("items"); return responseBody;
} }
// This API assigns a unique role name to each host's instance of a role. // This API assigns a unique role name to each host's instance of a role.
private String getRoleName(String serviceName, String roleType, String hostId) private String getRoleName(String serviceName, String roleType, String hostId) {
throws IOException {
return getRolePropertyValue(serviceName, roleType, hostId, "name"); return getRolePropertyValue(serviceName, roleType, hostId, "name");
} }
// Get the value of a property from a role on a particular host. // Get the value of a property from a role on a particular host.
private String getRolePropertyValue(String serviceName, String roleType, String hostId, private String getRolePropertyValue(String serviceName, String roleType, String hostId,
String property) throws IOException { String property) {
String roleValue = null; String roleValue = null;
URI uri = UriBuilder.fromUri(serverHostname).path("api").path(API_VERSION).path("clusters") URI uri = UriBuilder.fromUri(serverHostname)
.path(clusterName).path("services").path(serviceName).path("roles").build(); .path("api")
JsonElement roles = getJsonNodeFromURIGet(uri); .path(API_VERSION)
.path("clusters")
.path(clusterName)
.path("services")
.path(serviceName)
.path("roles")
.build();
JsonElement roles = parser.parse(getFromURIGet(uri))
.getAsJsonObject()
.get("items");
if (roles != null) { if (roles != null) {
// Iterate through the list of roles, stopping once the requested one is found. // Iterate through the list of roles, stopping once the requested one is found.
for (JsonElement role : roles.getAsJsonArray()) { for (JsonElement role : roles.getAsJsonArray()) {
@ -281,18 +415,29 @@ public class RESTApiClusterManager extends Configured implements ClusterManager
return roleValue; return roleValue;
} }
// Possible roleState values include "STARTED" and "STOPPED." private RoleState getRoleState(ServiceType service, String hostname) {
private String getRoleState(String serviceName, String roleType, String hostId) return executeWithRetries(() -> {
throws IOException { String serviceName = getServiceName(roleServiceType.get(service));
return getRolePropertyValue(serviceName, roleType, hostId, "roleState"); String hostId = getHostId(hostname);
RoleState state = getRoleState(serviceName, service.toString(), hostId);
// sometimes the response (usually the first) is null. retry those.
return Objects.requireNonNull(state);
});
}
private RoleState getRoleState(String serviceName, String roleType, String hostId) {
return RoleState.fromString(
getRolePropertyValue(serviceName, roleType, hostId, "roleState"));
} }
// Convert a service (e.g. "HBASE," "HDFS") into a service name (e.g. "HBASE-1," "HDFS-1"). // Convert a service (e.g. "HBASE," "HDFS") into a service name (e.g. "HBASE-1," "HDFS-1").
private String getServiceName(Service service) throws IOException { private String getServiceName(Service service) {
String serviceName = null; String serviceName = null;
URI uri = UriBuilder.fromUri(serverHostname).path("api").path(API_VERSION).path("clusters") URI uri = UriBuilder.fromUri(serverHostname).path("api").path(API_VERSION).path("clusters")
.path(clusterName).path("services").build(); .path(clusterName).path("services").build();
JsonElement services = getJsonNodeFromURIGet(uri); JsonElement services = parser.parse(getFromURIGet(uri))
.getAsJsonObject()
.get("items");
if (services != null) { if (services != null) {
// Iterate through the list of services, stopping once the requested one is found. // Iterate through the list of services, stopping once the requested one is found.
for (JsonElement serviceEntry : services.getAsJsonArray()) { for (JsonElement serviceEntry : services.getAsJsonArray()) {
@ -306,6 +451,98 @@ public class RESTApiClusterManager extends Configured implements ClusterManager
return serviceName; return serviceName;
} }
/**
 * Fetches the description of a previously submitted command from the
 * {@code /api/<version>/commands/<commandId>} endpoint.
 * @return the response body parsed as a JSON object, or empty when the GET yields no body.
 */
private Optional<JsonObject> getCommand(final long commandId) {
  final URI commandUri = UriBuilder.fromUri(serverHostname)
    .path("api")
    .path(API_VERSION)
    .path("commands")
    .path(Long.toString(commandId))
    .build();
  final String responseBody = getFromURIGet(commandUri);
  if (responseBody == null) {
    return Optional.empty();
  }
  final JsonElement parsedBody = parser.parse(responseBody);
  return Optional.ofNullable(parsedBody).map(JsonElement::getAsJsonObject);
}
/**
 * Return {@code true} if the {@code commandId} has finished processing, i.e. the command
 * description is available and its {@code active} attribute is false. Returns {@code false}
 * when no command description could be retrieved.
 */
private boolean hasCommandCompleted(final long commandId) {
  final Optional<JsonObject> command = getCommand(commandId);
  if (!command.isPresent()) {
    return false;
  }
  final boolean stillActive = command.get().get("active").getAsBoolean();
  if (stillActive) {
    LOG.debug("command {} is still active.", commandId);
    return false;
  }
  return true;
}
/**
 * Return {@code true} if the {@code commandId} has finished successfully: the command
 * description is available, its {@code active} attribute is false, and its {@code success}
 * attribute is true. A missing description or a still-active command yields {@code false}.
 */
private boolean hasCommandCompletedSuccessfully(final long commandId) {
  final Optional<JsonObject> command = getCommand(commandId);
  if (!command.isPresent()) {
    return false;
  }
  final JsonObject details = command.get();
  if (details.get("active").getAsBoolean()) {
    LOG.debug("command {} is still active.", commandId);
    return false;
  }
  final boolean succeeded = details.get("success").getAsBoolean();
  LOG.debug("command {} completed as {}.", commandId, succeeded);
  return succeeded;
}
/**
 * Helper method for executing retryable work. Invokes {@code callable} until it returns
 * without throwing, sleeping between attempts via a {@link RetryCounter} created from
 * {@code retryCounterFactory}.
 * @param callable the work to execute; any exception it throws counts as a failed attempt.
 * @return the value produced by the first successful invocation of {@code callable}.
 * @throws RuntimeException wrapping the last failure once the retry counter reports that no
 *   retries remain, or wrapping the {@link InterruptedException} if the thread is interrupted
 *   while sleeping between attempts.
 */
private <T> T executeWithRetries(final Callable<T> callable) {
  final RetryCounter retryCounter = retryCounterFactory.create();
  while (true) {
    try {
      return callable.call();
    } catch (Exception e) {
      if (retryCounter.shouldRetry()) {
        LOG.debug("execution failed with exception. Retrying.", e);
      } else {
        throw new RuntimeException("retries exhausted", e);
      }
    }
    try {
      retryCounter.sleepUntilNextRetry();
    } catch (InterruptedException e) {
      // Restore the interrupt flag so callers further up the stack can still observe the
      // interruption after it has been wrapped in an unchecked exception.
      Thread.currentThread().interrupt();
      throw new RuntimeException(e);
    }
  }
}
/**
 * Blocks until {@code predicate} evaluates to {@code true}, sleeping between evaluations via a
 * {@link RetryCounter} created from {@code retryCounterFactory}.
 * <p>
 * A {@code false} (or {@code null}) result simply leads to another evaluation after the sleep;
 * the retry limit is consulted only when the predicate throws.
 * @param predicate the condition to poll.
 * @throws RuntimeException wrapping the predicate's exception once the retry counter reports
 *   that no retries remain, or wrapping the {@link InterruptedException} if the thread is
 *   interrupted while sleeping between evaluations.
 */
private void waitFor(final Callable<Boolean> predicate) {
  final RetryCounter retryCounter = retryCounterFactory.create();
  while (true) {
    try {
      // Objects.equals guards against a predicate that returns null.
      if (Objects.equals(true, predicate.call())) {
        return;
      }
    } catch (Exception e) {
      if (retryCounter.shouldRetry()) {
        LOG.debug("execution failed with exception. Retrying.", e);
      } else {
        throw new RuntimeException("retries exhausted", e);
      }
    }
    try {
      retryCounter.sleepUntilNextRetry();
    } catch (InterruptedException e) {
      // Restore the interrupt flag so callers further up the stack can still observe the
      // interruption after it has been wrapped in an unchecked exception.
      Thread.currentThread().interrupt();
      throw new RuntimeException(e);
    }
  }
}
/* /*
* Some enums to guard against bad calls. * Some enums to guard against bad calls.
*/ */
@ -323,17 +560,52 @@ public class RESTApiClusterManager extends Configured implements ClusterManager
} }
} }
/**
 * Represents the configured run state of a role.
 * @see <a href="https://archive.cloudera.com/cm6/6.3.0/generic/jar/cm_api/apidocs/json_ApiRoleState.html">
 *   https://archive.cloudera.com/cm6/6.3.0/generic/jar/cm_api/apidocs/json_ApiRoleState.html</a>
 */
private enum RoleState {
  HISTORY_NOT_AVAILABLE, UNKNOWN, STARTING, STARTED, BUSY, STOPPING, STOPPED, NA;

  /**
   * Converts a textual role state into its enum constant, ignoring case.
   * @param value the state name; may be {@code null} or blank.
   * @return the matching constant, or {@code null} when {@code value} is blank.
   * @throws IllegalArgumentException if {@code value} is non-blank but names no constant.
   */
  public static RoleState fromString(final String value) {
    if (StringUtils.isBlank(value)) {
      return null;
    }
    // Locale.ROOT keeps the case mapping independent of the JVM's default locale (e.g. the
    // Turkish dotted/dotless 'i'), so constant lookup behaves the same everywhere.
    return RoleState.valueOf(value.toUpperCase(Locale.ROOT));
  }
}
/**
 * Represents the high-level health status of a subject in the cluster.
 * @see <a href="https://archive.cloudera.com/cm6/6.3.0/generic/jar/cm_api/apidocs/json_ApiHealthSummary.html">
 *   https://archive.cloudera.com/cm6/6.3.0/generic/jar/cm_api/apidocs/json_ApiHealthSummary.html</a>
 */
private enum HealthSummary {
  DISABLED, HISTORY_NOT_AVAILABLE, NOT_AVAILABLE, GOOD, CONCERNING, BAD;

  /**
   * Converts a textual health summary into its enum constant, ignoring case.
   * @param value the health summary name; may be {@code null} or blank.
   * @return the matching constant, or {@code null} when {@code value} is blank.
   * @throws IllegalArgumentException if {@code value} is non-blank but names no constant.
   */
  public static HealthSummary fromString(final String value) {
    if (StringUtils.isBlank(value)) {
      return null;
    }
    // Locale.ROOT keeps the case mapping independent of the JVM's default locale (e.g. the
    // Turkish dotted/dotless 'i'), so constant lookup behaves the same everywhere.
    return HealthSummary.valueOf(value.toUpperCase(Locale.ROOT));
  }
}
// ClusterManager methods take a "ServiceType" object (e.g. "HBASE_MASTER," "HADOOP_NAMENODE"). // ClusterManager methods take a "ServiceType" object (e.g. "HBASE_MASTER," "HADOOP_NAMENODE").
// These "service types," which cluster managers call "roles" or "components," need to be mapped // These "service types," which cluster managers call "roles" or "components," need to be mapped
// to their corresponding service (e.g. "HBase," "HDFS") in order to be controlled. // to their corresponding service (e.g. "HBase," "HDFS") in order to be controlled.
private static Map<ServiceType, Service> roleServiceType = new HashMap<>(); private static final Map<ServiceType, Service> roleServiceType = buildRoleServiceTypeMap();
static {
roleServiceType.put(ServiceType.HADOOP_NAMENODE, Service.HDFS); private static Map<ServiceType, Service> buildRoleServiceTypeMap() {
roleServiceType.put(ServiceType.HADOOP_DATANODE, Service.HDFS); final Map<ServiceType, Service> ret = new HashMap<>();
roleServiceType.put(ServiceType.HADOOP_JOBTRACKER, Service.MAPREDUCE); ret.put(ServiceType.HADOOP_NAMENODE, Service.HDFS);
roleServiceType.put(ServiceType.HADOOP_TASKTRACKER, Service.MAPREDUCE); ret.put(ServiceType.HADOOP_DATANODE, Service.HDFS);
roleServiceType.put(ServiceType.HBASE_MASTER, Service.HBASE); ret.put(ServiceType.HADOOP_JOBTRACKER, Service.MAPREDUCE);
roleServiceType.put(ServiceType.HBASE_REGIONSERVER, Service.HBASE); ret.put(ServiceType.HADOOP_TASKTRACKER, Service.MAPREDUCE);
ret.put(ServiceType.HBASE_MASTER, Service.HBASE);
ret.put(ServiceType.HBASE_REGIONSERVER, Service.HBASE);
return Collections.unmodifiableMap(ret);
} }
enum Service { enum Service {