diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/HBaseClusterManager.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/HBaseClusterManager.java index 957e0946ea1..122fad5a0a9 100644 --- a/hbase-it/src/test/java/org/apache/hadoop/hbase/HBaseClusterManager.java +++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/HBaseClusterManager.java @@ -73,11 +73,11 @@ public class HBaseClusterManager extends Configured implements ClusterManager { "timeout %6$s /usr/bin/ssh %1$s %2$s%3$s%4$s \"sudo %5$s\""; private String tunnelSudoCmd; - private static final String RETRY_ATTEMPTS_KEY = "hbase.it.clustermanager.retry.attempts"; - private static final int DEFAULT_RETRY_ATTEMPTS = 5; + static final String RETRY_ATTEMPTS_KEY = "hbase.it.clustermanager.retry.attempts"; + static final int DEFAULT_RETRY_ATTEMPTS = 5; - private static final String RETRY_SLEEP_INTERVAL_KEY = "hbase.it.clustermanager.retry.sleep.interval"; - private static final int DEFAULT_RETRY_SLEEP_INTERVAL = 1000; + static final String RETRY_SLEEP_INTERVAL_KEY = "hbase.it.clustermanager.retry.sleep.interval"; + static final int DEFAULT_RETRY_SLEEP_INTERVAL = 1000; protected RetryCounterFactory retryCounterFactory; diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/RESTApiClusterManager.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/RESTApiClusterManager.java index f5dd93b0960..10954ce164b 100644 --- a/hbase-it/src/test/java/org/apache/hadoop/hbase/RESTApiClusterManager.java +++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/RESTApiClusterManager.java @@ -18,11 +18,19 @@ package org.apache.hadoop.hbase; +import static org.apache.hadoop.hbase.HBaseClusterManager.DEFAULT_RETRY_ATTEMPTS; +import static org.apache.hadoop.hbase.HBaseClusterManager.DEFAULT_RETRY_SLEEP_INTERVAL; +import static org.apache.hadoop.hbase.HBaseClusterManager.RETRY_ATTEMPTS_KEY; +import static org.apache.hadoop.hbase.HBaseClusterManager.RETRY_SLEEP_INTERVAL_KEY; import java.io.IOException; import java.net.URI; 
+import java.util.Collections; import java.util.HashMap; import java.util.Locale; import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.concurrent.Callable; import javax.ws.rs.client.Client; import javax.ws.rs.client.ClientBuilder; import javax.ws.rs.client.Entity; @@ -32,8 +40,12 @@ import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; import javax.ws.rs.core.UriBuilder; import javax.xml.ws.http.HTTPException; +import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.hbase.util.RetryCounter; +import org.apache.hadoop.hbase.util.RetryCounter.RetryConfig; +import org.apache.hadoop.hbase.util.RetryCounterFactory; import org.apache.hadoop.util.ReflectionUtils; import org.glassfish.jersey.client.authentication.HttpAuthenticationFeature; import org.slf4j.Logger; @@ -62,6 +74,10 @@ import org.apache.hbase.thirdparty.com.google.gson.JsonParser; * * This class will defer to the ClusterManager terminology in methods that it implements from * that interface, but uses Cloudera Manager's terminology when dealing with its API directly. + * + * DEBUG-level logging gives more details of the actions this class takes as they happen. Log at + * TRACE-level to see the API requests and responses. TRACE-level logging on RetryCounter displays + * wait times, so that can be helpful too. */ public class RESTApiClusterManager extends Configured implements ClusterManager { // Properties that need to be in the Configuration object to interact with the REST API cluster @@ -95,12 +111,14 @@ public class RESTApiClusterManager extends Configured implements ClusterManager private static final String API_VERSION = "v6"; // Client instances are expensive, so use the same one for all our REST queries. 
- private Client client = ClientBuilder.newClient(); + private final Client client = ClientBuilder.newClient(); // An instance of HBaseClusterManager is used for methods like the kill, resume, and suspend // because cluster managers don't tend to implement these operations. private ClusterManager hBaseClusterManager; + private RetryCounterFactory retryCounterFactory; + private static final Logger LOG = LoggerFactory.getLogger(RESTApiClusterManager.class); RESTApiClusterManager() { } @@ -124,38 +142,111 @@ public class RESTApiClusterManager extends Configured implements ClusterManager String serverUsername = conf.get(REST_API_CLUSTER_MANAGER_USERNAME, DEFAULT_SERVER_USERNAME); String serverPassword = conf.get(REST_API_CLUSTER_MANAGER_PASSWORD, DEFAULT_SERVER_PASSWORD); client.register(HttpAuthenticationFeature.basic(serverUsername, serverPassword)); + + this.retryCounterFactory = new RetryCounterFactory(new RetryConfig() + .setMaxAttempts(conf.getInt(RETRY_ATTEMPTS_KEY, DEFAULT_RETRY_ATTEMPTS)) + .setSleepInterval(conf.getLong(RETRY_SLEEP_INTERVAL_KEY, DEFAULT_RETRY_SLEEP_INTERVAL))); } @Override - public void start(ServiceType service, String hostname, int port) throws IOException { - performClusterManagerCommand(service, hostname, RoleCommand.START); - } - - @Override - public void stop(ServiceType service, String hostname, int port) throws IOException { - performClusterManagerCommand(service, hostname, RoleCommand.STOP); - } - - @Override - public void restart(ServiceType service, String hostname, int port) throws IOException { - performClusterManagerCommand(service, hostname, RoleCommand.RESTART); - } - - @Override - public boolean isRunning(ServiceType service, String hostname, int port) throws IOException { - String serviceName = getServiceName(roleServiceType.get(service)); - String hostId = getHostId(hostname); - String roleState = getRoleState(serviceName, service.toString(), hostId); - String healthSummary = getHealthSummary(serviceName, 
service.toString(), hostId); - boolean isRunning = false; - - // Use Yoda condition to prevent NullPointerException. roleState will be null if the "service - // type" does not exist on the specified hostname. - if ("STARTED".equals(roleState) && "GOOD".equals(healthSummary)) { - isRunning = true; + public void start(ServiceType service, String hostname, int port) { + // With Cloudera Manager (6.3.x), sending a START command to a service role + // that is already in the "Started" state is an error. CM will log a message + // saying "Role must be stopped". It will complain similarly for other + // expected state transitions. + // A role process that has been `kill -9`'ed ends up with the service role + // retaining the "Started" state but with the process marked as "unhealthy". + // Instead of blindly issuing the START command, first send a STOP command + // to ensure the START will be accepted. + LOG.debug("Performing start of {} on {}:{}", service, hostname, port); + final RoleState currentState = getRoleState(service, hostname); + switch (currentState) { + case NA: + case BUSY: + case UNKNOWN: + case HISTORY_NOT_AVAILABLE: + LOG.warn("Unexpected service state detected. Service START requested, but currently in" + + " {} state. Attempting to start. {}, {}:{}", currentState, service, hostname, port); + performClusterManagerCommand(service, hostname, RoleCommand.START); + return; + case STOPPING: + LOG.warn("Unexpected service state detected. Service START requested, but currently in" + + " {} state. Waiting for stop before attempting start. {}, {}:{}", currentState, + service, hostname, port); + waitFor(() -> Objects.equals(RoleState.STOPPED, getRoleState(service, hostname))); + performClusterManagerCommand(service, hostname, RoleCommand.START); + return; + case STOPPED: + performClusterManagerCommand(service, hostname, RoleCommand.START); + return; + case STARTING: + LOG.warn("Unexpected service state detected. 
Service START requested, but already in" + + " {} state. Ignoring current request and waiting for start to complete. {}, {}:{}", + currentState, service, hostname, port); + waitFor(()-> Objects.equals(RoleState.STARTED, getRoleState(service, hostname))); + return; + case STARTED: + LOG.warn("Unexpected service state detected. Service START requested, but already in" + + " {} state. Restarting. {}, {}:{}", currentState, service, hostname, port); + performClusterManagerCommand(service, hostname, RoleCommand.RESTART); + return; } + throw new RuntimeException("should not happen."); + } - return isRunning; + @Override + public void stop(ServiceType service, String hostname, int port) { + LOG.debug("Performing stop of {} on {}:{}", service, hostname, port); + final RoleState currentState = getRoleState(service, hostname); + switch (currentState) { + case NA: + case BUSY: + case UNKNOWN: + case HISTORY_NOT_AVAILABLE: + LOG.warn("Unexpected service state detected. Service STOP requested, but already in" + + " {} state. Attempting to stop. {}, {}:{}", currentState, service, hostname, port); + performClusterManagerCommand(service, hostname, RoleCommand.STOP); + return; + case STOPPING: + waitFor(() -> Objects.equals(RoleState.STOPPED, getRoleState(service, hostname))); + return; + case STOPPED: + LOG.warn("Unexpected service state detected. Service STOP requested, but already in" + + " {} state. Ignoring current request. {}, {}:{}", currentState, service, hostname, + port); + return; + case STARTING: + LOG.warn("Unexpected service state detected. Service STOP requested, but already in" + + " {} state. Waiting for start to complete. 
{}, {}:{}", currentState, service, hostname, + port); + waitFor(()-> Objects.equals(RoleState.STARTED, getRoleState(service, hostname))); + performClusterManagerCommand(service, hostname, RoleCommand.STOP); + return; + case STARTED: + performClusterManagerCommand(service, hostname, RoleCommand.STOP); + return; + } + throw new RuntimeException("should not happen."); + } + + @Override + public void restart(ServiceType service, String hostname, int port) { + LOG.debug("Performing stop followed by start of {} on {}:{}", service, hostname, port); + stop(service, hostname, port); + start(service, hostname, port); + } + + @Override + public boolean isRunning(ServiceType service, String hostname, int port) { + LOG.debug("Issuing isRunning request against {} on {}:{}", service, hostname, port); + return executeWithRetries(() -> { + String serviceName = getServiceName(roleServiceType.get(service)); + String hostId = getHostId(hostname); + RoleState roleState = getRoleState(serviceName, service.toString(), hostId); + HealthSummary healthSummary = getHealthSummary(serviceName, service.toString(), hostId); + return Objects.equals(RoleState.STARTED, roleState) + && Objects.equals(HealthSummary.GOOD, healthSummary); + }); } @Override @@ -173,20 +264,38 @@ public class RESTApiClusterManager extends Configured implements ClusterManager hBaseClusterManager.resume(service, hostname, port); } - // Convenience method to execute command against role on hostname. Only graceful commands are // supported since cluster management APIs don't tend to let you SIGKILL things. 
- private void performClusterManagerCommand(ServiceType role, String hostname, RoleCommand command) - throws IOException { - LOG.info("Performing " + command + " command against " + role + " on " + hostname + "..."); - String serviceName = getServiceName(roleServiceType.get(role)); - String hostId = getHostId(hostname); - String roleName = getRoleName(serviceName, role.toString(), hostId); - doRoleCommand(serviceName, roleName, command); + private void performClusterManagerCommand(ServiceType role, String hostname, + RoleCommand command) { + // retry submitting the command until the submission succeeds. + final long commandId = executeWithRetries(() -> { + final String serviceName = getServiceName(roleServiceType.get(role)); + final String hostId = getHostId(hostname); + final String roleName = getRoleName(serviceName, role.toString(), hostId); + return doRoleCommand(serviceName, roleName, command); + }); + LOG.debug("Command {} of {} on {} submitted as commandId {}", + command, role, hostname, commandId); + + // assume the submitted command was asynchronous. wait on the commandId to be marked as + // successful. + waitFor(() -> hasCommandCompleted(commandId)); + if (!executeWithRetries(() -> hasCommandCompletedSuccessfully(commandId))) { + final String msg = String.format("Command %s of %s on %s submitted as commandId %s failed.", + command, role, hostname, commandId); + // TODO: this does not interrupt the monkey. should it? + throw new RuntimeException(msg); + } + LOG.debug("Command {} of {} on {} submitted as commandId {} completed successfully.", + command, role, hostname, commandId); } - // Performing a command (e.g. starting or stopping a role) requires a POST instead of a GET. - private void doRoleCommand(String serviceName, String roleName, RoleCommand roleCommand) { + /** + * Issues a command (e.g. starting or stopping a role). + * @return the commandId of a successfully submitted asynchronous command. 
+ */ + private long doRoleCommand(String serviceName, String roleName, RoleCommand roleCommand) { URI uri = UriBuilder.fromUri(serverHostname) .path("api") .path(API_VERSION) @@ -198,29 +307,46 @@ public class RESTApiClusterManager extends Configured implements ClusterManager .path(roleCommand.toString()) .build(); String body = "{ \"items\": [ \"" + roleName + "\" ] }"; - LOG.info("Executing POST against " + uri + " with body " + body + "..."); + LOG.trace("Executing POST against {} with body {} ...", uri, body); WebTarget webTarget = client.target(uri); Invocation.Builder invocationBuilder = webTarget.request(MediaType.APPLICATION_JSON); Response response = invocationBuilder.post(Entity.json(body)); - int statusCode = response.getStatus(); + final int statusCode = response.getStatus(); + final String responseBody = response.readEntity(String.class); if (statusCode != Response.Status.OK.getStatusCode()) { + LOG.warn( + "RoleCommand failed with status code {} and response body {}", statusCode, responseBody); throw new HTTPException(statusCode); } + + LOG.trace("POST against {} completed with status code {} and response body {}", + uri, statusCode, responseBody); + return parser.parse(responseBody) + .getAsJsonObject() + .get("items") + .getAsJsonArray() + .get(0) + .getAsJsonObject() + .get("id") + .getAsLong(); } - // Possible healthSummary values include "GOOD" and "BAD." - private String getHealthSummary(String serviceName, String roleType, String hostId) - throws IOException { - return getRolePropertyValue(serviceName, roleType, hostId, "healthSummary"); + private HealthSummary getHealthSummary(String serviceName, String roleType, String hostId) { + return HealthSummary.fromString( + getRolePropertyValue(serviceName, roleType, hostId, "healthSummary")); } // This API uses a hostId to execute host-specific commands; get one from a hostname. 
- private String getHostId(String hostname) throws IOException { + private String getHostId(String hostname) { String hostId = null; - - URI uri = - UriBuilder.fromUri(serverHostname).path("api").path(API_VERSION).path("hosts").build(); - JsonElement hosts = getJsonNodeFromURIGet(uri); + URI uri = UriBuilder.fromUri(serverHostname) + .path("api") + .path(API_VERSION) + .path("hosts") + .build(); + JsonElement hosts = parser.parse(getFromURIGet(uri)) + .getAsJsonObject() + .get("items"); if (hosts != null) { // Iterate through the list of hosts, stopping once you've reached the requested hostname. for (JsonElement host : hosts.getAsJsonArray()) { @@ -229,42 +355,50 @@ public class RESTApiClusterManager extends Configured implements ClusterManager break; } } - } else { - hostId = null; } return hostId; } - // Execute GET against URI, returning a JsonNode object to be traversed. - private JsonElement getJsonNodeFromURIGet(URI uri) throws IOException { - LOG.debug("Executing GET against " + uri + "..."); + private String getFromURIGet(URI uri) { + LOG.trace("Executing GET against {} ...", uri); final Response response = client.target(uri) .request(MediaType.APPLICATION_JSON_TYPE) .get(); int statusCode = response.getStatus(); + final String responseBody = response.readEntity(String.class); if (statusCode != Response.Status.OK.getStatusCode()) { + LOG.warn( + "request failed with status code {} and response body {}", statusCode, responseBody); throw new HTTPException(statusCode); } // This API folds information as the value to an "items" attribute. - return parser.parse(response.readEntity(String.class)) - .getAsJsonObject() - .get("items"); + LOG.trace("GET against {} completed with status code {} and response body {}", + uri, statusCode, responseBody); + return responseBody; } // This API assigns a unique role name to each host's instance of a role. 
- private String getRoleName(String serviceName, String roleType, String hostId) - throws IOException { + private String getRoleName(String serviceName, String roleType, String hostId) { return getRolePropertyValue(serviceName, roleType, hostId, "name"); } // Get the value of a property from a role on a particular host. private String getRolePropertyValue(String serviceName, String roleType, String hostId, - String property) throws IOException { + String property) { String roleValue = null; - URI uri = UriBuilder.fromUri(serverHostname).path("api").path(API_VERSION).path("clusters") - .path(clusterName).path("services").path(serviceName).path("roles").build(); - JsonElement roles = getJsonNodeFromURIGet(uri); + URI uri = UriBuilder.fromUri(serverHostname) + .path("api") + .path(API_VERSION) + .path("clusters") + .path(clusterName) + .path("services") + .path(serviceName) + .path("roles") + .build(); + JsonElement roles = parser.parse(getFromURIGet(uri)) + .getAsJsonObject() + .get("items"); if (roles != null) { // Iterate through the list of roles, stopping once the requested one is found. for (JsonElement role : roles.getAsJsonArray()) { @@ -281,18 +415,29 @@ public class RESTApiClusterManager extends Configured implements ClusterManager return roleValue; } - // Possible roleState values include "STARTED" and "STOPPED." - private String getRoleState(String serviceName, String roleType, String hostId) - throws IOException { - return getRolePropertyValue(serviceName, roleType, hostId, "roleState"); + private RoleState getRoleState(ServiceType service, String hostname) { + return executeWithRetries(() -> { + String serviceName = getServiceName(roleServiceType.get(service)); + String hostId = getHostId(hostname); + RoleState state = getRoleState(serviceName, service.toString(), hostId); + // sometimes the response (usually the first) is null. retry those. 
+ return Objects.requireNonNull(state); + }); + } + + private RoleState getRoleState(String serviceName, String roleType, String hostId) { + return RoleState.fromString( + getRolePropertyValue(serviceName, roleType, hostId, "roleState")); } // Convert a service (e.g. "HBASE," "HDFS") into a service name (e.g. "HBASE-1," "HDFS-1"). - private String getServiceName(Service service) throws IOException { + private String getServiceName(Service service) { String serviceName = null; URI uri = UriBuilder.fromUri(serverHostname).path("api").path(API_VERSION).path("clusters") .path(clusterName).path("services").build(); - JsonElement services = getJsonNodeFromURIGet(uri); + JsonElement services = parser.parse(getFromURIGet(uri)) + .getAsJsonObject() + .get("items"); if (services != null) { // Iterate through the list of services, stopping once the requested one is found. for (JsonElement serviceEntry : services.getAsJsonArray()) { @@ -306,6 +451,98 @@ public class RESTApiClusterManager extends Configured implements ClusterManager return serviceName; } + private Optional<JsonObject> getCommand(final long commandId) { + final URI uri = UriBuilder.fromUri(serverHostname) + .path("api") + .path(API_VERSION) + .path("commands") + .path(Long.toString(commandId)) + .build(); + return Optional.ofNullable(getFromURIGet(uri)) + .map(parser::parse) + .map(JsonElement::getAsJsonObject); + } + + /** + * Return {@code true} if the {@code commandId} has finished processing. + */ + private boolean hasCommandCompleted(final long commandId) { + return getCommand(commandId) + .map(val -> { + final boolean isActive = val.get("active").getAsBoolean(); + if (isActive) { + LOG.debug("command {} is still active.", commandId); + } + return !isActive; + }) + .orElse(false); + } + + /** + * Return {@code true} if the {@code commandId} has finished successfully. 
+ */ + private boolean hasCommandCompletedSuccessfully(final long commandId) { + return getCommand(commandId) + .filter(val -> { + final boolean isActive = val.get("active").getAsBoolean(); + if (isActive) { + LOG.debug("command {} is still active.", commandId); + } + return !isActive; + }) + .map(val -> { + final boolean isSuccess = val.get("success").getAsBoolean(); + LOG.debug("command {} completed as {}.", commandId, isSuccess); + return isSuccess; + }) + .orElse(false); + } + + /** + * Helper method for executing retryable work. + */ + private <T> T executeWithRetries(final Callable<T> callable) { + final RetryCounter retryCounter = retryCounterFactory.create(); + while (true) { + try { + return callable.call(); + } catch (Exception e) { + if (retryCounter.shouldRetry()) { + LOG.debug("execution failed with exception. Retrying.", e); + } else { + throw new RuntimeException("retries exhausted", e); + } + } + try { + retryCounter.sleepUntilNextRetry(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + } + + private void waitFor(final Callable<Boolean> predicate) { + final RetryCounter retryCounter = retryCounterFactory.create(); + while (true) { + try { + if (Objects.equals(true, predicate.call())) { + return; + } + } catch (Exception e) { + if (retryCounter.shouldRetry()) { + LOG.debug("execution failed with exception. Retrying.", e); + } else { + throw new RuntimeException("retries exhausted", e); + } + } + try { + retryCounter.sleepUntilNextRetry(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + } + /* * Some enums to guard against bad calls. */ @@ -323,17 +560,52 @@ public class RESTApiClusterManager extends Configured implements ClusterManager } } + /** + * Represents the configured run state of a role. 
+ * @see <a href="https://archive.cloudera.com/cm6/6.3.0/generic/jar/cm_api/apidocs/json_ApiRoleState.html"> + * https://archive.cloudera.com/cm6/6.3.0/generic/jar/cm_api/apidocs/json_ApiRoleState.html</a> + */ + private enum RoleState { + HISTORY_NOT_AVAILABLE, UNKNOWN, STARTING, STARTED, BUSY, STOPPING, STOPPED, NA; + + public static RoleState fromString(final String value) { + if (StringUtils.isBlank(value)) { + return null; + } + return RoleState.valueOf(value.toUpperCase(Locale.ROOT)); + } + } + + /** + * Represents the high-level health status of a subject in the cluster. + * @see <a href="https://archive.cloudera.com/cm6/6.3.0/generic/jar/cm_api/apidocs/json_ApiHealthSummary.html"> + * https://archive.cloudera.com/cm6/6.3.0/generic/jar/cm_api/apidocs/json_ApiHealthSummary.html</a> + */ + private enum HealthSummary { + DISABLED, HISTORY_NOT_AVAILABLE, NOT_AVAILABLE, GOOD, CONCERNING, BAD; + + public static HealthSummary fromString(final String value) { + if (StringUtils.isBlank(value)) { + return null; + } + return HealthSummary.valueOf(value.toUpperCase(Locale.ROOT)); + } + } + // ClusterManager methods take a "ServiceType" object (e.g. "HBASE_MASTER," "HADOOP_NAMENODE"). // These "service types," which cluster managers call "roles" or "components," need to be mapped // to their corresponding service (e.g. "HBase," "HDFS") in order to be controlled. 
- private static Map<ServiceType, Service> roleServiceType = new HashMap<>(); - static { - roleServiceType.put(ServiceType.HADOOP_NAMENODE, Service.HDFS); - roleServiceType.put(ServiceType.HADOOP_DATANODE, Service.HDFS); - roleServiceType.put(ServiceType.HADOOP_JOBTRACKER, Service.MAPREDUCE); - roleServiceType.put(ServiceType.HADOOP_TASKTRACKER, Service.MAPREDUCE); - roleServiceType.put(ServiceType.HBASE_MASTER, Service.HBASE); - roleServiceType.put(ServiceType.HBASE_REGIONSERVER, Service.HBASE); + private static final Map<ServiceType, Service> roleServiceType = buildRoleServiceTypeMap(); + + private static Map<ServiceType, Service> buildRoleServiceTypeMap() { + final Map<ServiceType, Service> ret = new HashMap<>(); + ret.put(ServiceType.HADOOP_NAMENODE, Service.HDFS); + ret.put(ServiceType.HADOOP_DATANODE, Service.HDFS); + ret.put(ServiceType.HADOOP_JOBTRACKER, Service.MAPREDUCE); + ret.put(ServiceType.HADOOP_TASKTRACKER, Service.MAPREDUCE); + ret.put(ServiceType.HBASE_MASTER, Service.HBASE); + ret.put(ServiceType.HBASE_REGIONSERVER, Service.HBASE); + return Collections.unmodifiableMap(ret); } enum Service {