HBASE-24361 Make `RESTApiClusterManager` more resilient (#1701)
* sometimes API calls return with null/empty response bodies. thus, wrap all API calls in a retry loop. * calls that submit work in the form of "commands" now retrieve the commandId from successful command submission, and track completion of that command before returning control to calling context. * model CM's process state and use that model to guide state transitions more intelligently. this guards against, for example, the start command failing with an error message like "Role must be stopped". * improvements to logging levels, avoid spamming logs with the side-effects of retries at this and higher contexts. * include references to API documentation, such as it is. Signed-off-by: stack <stack@apache.org>
This commit is contained in:
parent
8f4c255b38
commit
5fb9e518ef
|
@ -73,11 +73,11 @@ public class HBaseClusterManager extends Configured implements ClusterManager {
|
|||
"timeout %6$s /usr/bin/ssh %1$s %2$s%3$s%4$s \"sudo %5$s\"";
|
||||
private String tunnelSudoCmd;
|
||||
|
||||
private static final String RETRY_ATTEMPTS_KEY = "hbase.it.clustermanager.retry.attempts";
|
||||
private static final int DEFAULT_RETRY_ATTEMPTS = 5;
|
||||
static final String RETRY_ATTEMPTS_KEY = "hbase.it.clustermanager.retry.attempts";
|
||||
static final int DEFAULT_RETRY_ATTEMPTS = 5;
|
||||
|
||||
private static final String RETRY_SLEEP_INTERVAL_KEY = "hbase.it.clustermanager.retry.sleep.interval";
|
||||
private static final int DEFAULT_RETRY_SLEEP_INTERVAL = 1000;
|
||||
static final String RETRY_SLEEP_INTERVAL_KEY = "hbase.it.clustermanager.retry.sleep.interval";
|
||||
static final int DEFAULT_RETRY_SLEEP_INTERVAL = 1000;
|
||||
|
||||
protected RetryCounterFactory retryCounterFactory;
|
||||
|
||||
|
|
|
@ -18,11 +18,19 @@
|
|||
|
||||
package org.apache.hadoop.hbase;
|
||||
|
||||
import static org.apache.hadoop.hbase.HBaseClusterManager.DEFAULT_RETRY_ATTEMPTS;
|
||||
import static org.apache.hadoop.hbase.HBaseClusterManager.DEFAULT_RETRY_SLEEP_INTERVAL;
|
||||
import static org.apache.hadoop.hbase.HBaseClusterManager.RETRY_ATTEMPTS_KEY;
|
||||
import static org.apache.hadoop.hbase.HBaseClusterManager.RETRY_SLEEP_INTERVAL_KEY;
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.Locale;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.Callable;
|
||||
import javax.ws.rs.client.Client;
|
||||
import javax.ws.rs.client.ClientBuilder;
|
||||
import javax.ws.rs.client.Entity;
|
||||
|
@ -32,8 +40,12 @@ import javax.ws.rs.core.MediaType;
|
|||
import javax.ws.rs.core.Response;
|
||||
import javax.ws.rs.core.UriBuilder;
|
||||
import javax.xml.ws.http.HTTPException;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.conf.Configured;
|
||||
import org.apache.hadoop.hbase.util.RetryCounter;
|
||||
import org.apache.hadoop.hbase.util.RetryCounter.RetryConfig;
|
||||
import org.apache.hadoop.hbase.util.RetryCounterFactory;
|
||||
import org.apache.hadoop.util.ReflectionUtils;
|
||||
import org.glassfish.jersey.client.authentication.HttpAuthenticationFeature;
|
||||
import org.slf4j.Logger;
|
||||
|
@ -62,6 +74,10 @@ import org.apache.hbase.thirdparty.com.google.gson.JsonParser;
|
|||
*
|
||||
* This class will defer to the ClusterManager terminology in methods that it implements from
|
||||
* that interface, but uses Cloudera Manager's terminology when dealing with its API directly.
|
||||
*
|
||||
* DEBUG-level logging gives more details of the actions this class takes as they happen. Log at
|
||||
* TRACE-level to see the API requests and responses. TRACE-level logging on RetryCounter displays
|
||||
* wait times, so that can be helpful too.
|
||||
*/
|
||||
public class RESTApiClusterManager extends Configured implements ClusterManager {
|
||||
// Properties that need to be in the Configuration object to interact with the REST API cluster
|
||||
|
@ -95,12 +111,14 @@ public class RESTApiClusterManager extends Configured implements ClusterManager
|
|||
private static final String API_VERSION = "v6";
|
||||
|
||||
// Client instances are expensive, so use the same one for all our REST queries.
|
||||
private Client client = ClientBuilder.newClient();
|
||||
private final Client client = ClientBuilder.newClient();
|
||||
|
||||
// An instance of HBaseClusterManager is used for methods like the kill, resume, and suspend
|
||||
// because cluster managers don't tend to implement these operations.
|
||||
private ClusterManager hBaseClusterManager;
|
||||
|
||||
private RetryCounterFactory retryCounterFactory;
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(RESTApiClusterManager.class);
|
||||
|
||||
RESTApiClusterManager() { }
|
||||
|
@ -124,38 +142,111 @@ public class RESTApiClusterManager extends Configured implements ClusterManager
|
|||
String serverUsername = conf.get(REST_API_CLUSTER_MANAGER_USERNAME, DEFAULT_SERVER_USERNAME);
|
||||
String serverPassword = conf.get(REST_API_CLUSTER_MANAGER_PASSWORD, DEFAULT_SERVER_PASSWORD);
|
||||
client.register(HttpAuthenticationFeature.basic(serverUsername, serverPassword));
|
||||
|
||||
this.retryCounterFactory = new RetryCounterFactory(new RetryConfig()
|
||||
.setMaxAttempts(conf.getInt(RETRY_ATTEMPTS_KEY, DEFAULT_RETRY_ATTEMPTS))
|
||||
.setSleepInterval(conf.getLong(RETRY_SLEEP_INTERVAL_KEY, DEFAULT_RETRY_SLEEP_INTERVAL)));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void start(ServiceType service, String hostname, int port) throws IOException {
|
||||
performClusterManagerCommand(service, hostname, RoleCommand.START);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void stop(ServiceType service, String hostname, int port) throws IOException {
|
||||
performClusterManagerCommand(service, hostname, RoleCommand.STOP);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void restart(ServiceType service, String hostname, int port) throws IOException {
|
||||
performClusterManagerCommand(service, hostname, RoleCommand.RESTART);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isRunning(ServiceType service, String hostname, int port) throws IOException {
|
||||
String serviceName = getServiceName(roleServiceType.get(service));
|
||||
String hostId = getHostId(hostname);
|
||||
String roleState = getRoleState(serviceName, service.toString(), hostId);
|
||||
String healthSummary = getHealthSummary(serviceName, service.toString(), hostId);
|
||||
boolean isRunning = false;
|
||||
|
||||
// Use Yoda condition to prevent NullPointerException. roleState will be null if the "service
|
||||
// type" does not exist on the specified hostname.
|
||||
if ("STARTED".equals(roleState) && "GOOD".equals(healthSummary)) {
|
||||
isRunning = true;
|
||||
public void start(ServiceType service, String hostname, int port) {
|
||||
// With Cloudera Manager (6.3.x), sending a START command to a service role
|
||||
// that is already in the "Started" state is an error. CM will log a message
|
||||
// saying "Role must be stopped". It will complain similarly for other
|
||||
// expected state transitions.
|
||||
// A role process that has been `kill -9`'ed ends up with the service role
|
||||
// retaining the "Started" state but with the process marked as "unhealthy".
|
||||
// Instead of blindly issuing the START command, first send a STOP command
|
||||
// to ensure the START will be accepted.
|
||||
LOG.debug("Performing start of {} on {}:{}", service, hostname, port);
|
||||
final RoleState currentState = getRoleState(service, hostname);
|
||||
switch (currentState) {
|
||||
case NA:
|
||||
case BUSY:
|
||||
case UNKNOWN:
|
||||
case HISTORY_NOT_AVAILABLE:
|
||||
LOG.warn("Unexpected service state detected. Service START requested, but currently in"
|
||||
+ " {} state. Attempting to start. {}, {}:{}", currentState, service, hostname, port);
|
||||
performClusterManagerCommand(service, hostname, RoleCommand.START);
|
||||
return;
|
||||
case STOPPING:
|
||||
LOG.warn("Unexpected service state detected. Service START requested, but currently in"
|
||||
+ " {} state. Waiting for stop before attempting start. {}, {}:{}", currentState,
|
||||
service, hostname, port);
|
||||
waitFor(() -> Objects.equals(RoleState.STOPPED, getRoleState(service, hostname)));
|
||||
performClusterManagerCommand(service, hostname, RoleCommand.START);
|
||||
return;
|
||||
case STOPPED:
|
||||
performClusterManagerCommand(service, hostname, RoleCommand.START);
|
||||
return;
|
||||
case STARTING:
|
||||
LOG.warn("Unexpected service state detected. Service START requested, but already in"
|
||||
+ " {} state. Ignoring current request and waiting for start to complete. {}, {}:{}",
|
||||
currentState, service, hostname, port);
|
||||
waitFor(()-> Objects.equals(RoleState.STARTED, getRoleState(service, hostname)));
|
||||
return;
|
||||
case STARTED:
|
||||
LOG.warn("Unexpected service state detected. Service START requested, but already in"
|
||||
+ " {} state. Restarting. {}, {}:{}", currentState, service, hostname, port);
|
||||
performClusterManagerCommand(service, hostname, RoleCommand.RESTART);
|
||||
return;
|
||||
}
|
||||
throw new RuntimeException("should not happen.");
|
||||
}
|
||||
|
||||
return isRunning;
|
||||
@Override
|
||||
public void stop(ServiceType service, String hostname, int port) {
|
||||
LOG.debug("Performing stop of {} on {}:{}", service, hostname, port);
|
||||
final RoleState currentState = getRoleState(service, hostname);
|
||||
switch (currentState) {
|
||||
case NA:
|
||||
case BUSY:
|
||||
case UNKNOWN:
|
||||
case HISTORY_NOT_AVAILABLE:
|
||||
LOG.warn("Unexpected service state detected. Service STOP requested, but already in"
|
||||
+ " {} state. Attempting to stop. {}, {}:{}", currentState, service, hostname, port);
|
||||
performClusterManagerCommand(service, hostname, RoleCommand.STOP);
|
||||
return;
|
||||
case STOPPING:
|
||||
waitFor(() -> Objects.equals(RoleState.STOPPED, getRoleState(service, hostname)));
|
||||
return;
|
||||
case STOPPED:
|
||||
LOG.warn("Unexpected service state detected. Service STOP requested, but already in"
|
||||
+ " {} state. Ignoring current request. {}, {}:{}", currentState, service, hostname,
|
||||
port);
|
||||
return;
|
||||
case STARTING:
|
||||
LOG.warn("Unexpected service state detected. Service STOP requested, but already in"
|
||||
+ " {} state. Waiting for start to complete. {}, {}:{}", currentState, service, hostname,
|
||||
port);
|
||||
waitFor(()-> Objects.equals(RoleState.STARTED, getRoleState(service, hostname)));
|
||||
performClusterManagerCommand(service, hostname, RoleCommand.STOP);
|
||||
return;
|
||||
case STARTED:
|
||||
performClusterManagerCommand(service, hostname, RoleCommand.STOP);
|
||||
return;
|
||||
}
|
||||
throw new RuntimeException("should not happen.");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void restart(ServiceType service, String hostname, int port) {
|
||||
LOG.debug("Performing stop followed by start of {} on {}:{}", service, hostname, port);
|
||||
stop(service, hostname, port);
|
||||
start(service, hostname, port);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isRunning(ServiceType service, String hostname, int port) {
|
||||
LOG.debug("Issuing isRunning request against {} on {}:{}", service, hostname, port);
|
||||
return executeWithRetries(() -> {
|
||||
String serviceName = getServiceName(roleServiceType.get(service));
|
||||
String hostId = getHostId(hostname);
|
||||
RoleState roleState = getRoleState(serviceName, service.toString(), hostId);
|
||||
HealthSummary healthSummary = getHealthSummary(serviceName, service.toString(), hostId);
|
||||
return Objects.equals(RoleState.STARTED, roleState)
|
||||
&& Objects.equals(HealthSummary.GOOD, healthSummary);
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -173,20 +264,38 @@ public class RESTApiClusterManager extends Configured implements ClusterManager
|
|||
hBaseClusterManager.resume(service, hostname, port);
|
||||
}
|
||||
|
||||
|
||||
// Convenience method to execute command against role on hostname. Only graceful commands are
|
||||
// supported since cluster management APIs don't tend to let you SIGKILL things.
|
||||
private void performClusterManagerCommand(ServiceType role, String hostname, RoleCommand command)
|
||||
throws IOException {
|
||||
LOG.info("Performing " + command + " command against " + role + " on " + hostname + "...");
|
||||
String serviceName = getServiceName(roleServiceType.get(role));
|
||||
String hostId = getHostId(hostname);
|
||||
String roleName = getRoleName(serviceName, role.toString(), hostId);
|
||||
doRoleCommand(serviceName, roleName, command);
|
||||
private void performClusterManagerCommand(ServiceType role, String hostname,
|
||||
RoleCommand command) {
|
||||
// retry submitting the command until the submission succeeds.
|
||||
final long commandId = executeWithRetries(() -> {
|
||||
final String serviceName = getServiceName(roleServiceType.get(role));
|
||||
final String hostId = getHostId(hostname);
|
||||
final String roleName = getRoleName(serviceName, role.toString(), hostId);
|
||||
return doRoleCommand(serviceName, roleName, command);
|
||||
});
|
||||
LOG.debug("Command {} of {} on {} submitted as commandId {}",
|
||||
command, role, hostname, commandId);
|
||||
|
||||
// assume the submitted command was asynchronous. wait on the commandId to be marked as
|
||||
// successful.
|
||||
waitFor(() -> hasCommandCompleted(commandId));
|
||||
if (!executeWithRetries(() -> hasCommandCompletedSuccessfully(commandId))) {
|
||||
final String msg = String.format("Command %s of %s on %s submitted as commandId %s failed.",
|
||||
command, role, hostname, commandId);
|
||||
// TODO: this does not interrupt the monkey. should it?
|
||||
throw new RuntimeException(msg);
|
||||
}
|
||||
LOG.debug("Command {} of {} on {} submitted as commandId {} completed successfully.",
|
||||
command, role, hostname, commandId);
|
||||
}
|
||||
|
||||
// Performing a command (e.g. starting or stopping a role) requires a POST instead of a GET.
|
||||
private void doRoleCommand(String serviceName, String roleName, RoleCommand roleCommand) {
|
||||
/**
|
||||
* Issues a command (e.g. starting or stopping a role).
|
||||
* @return the commandId of a successfully submitted asynchronous command.
|
||||
*/
|
||||
private long doRoleCommand(String serviceName, String roleName, RoleCommand roleCommand) {
|
||||
URI uri = UriBuilder.fromUri(serverHostname)
|
||||
.path("api")
|
||||
.path(API_VERSION)
|
||||
|
@ -198,29 +307,46 @@ public class RESTApiClusterManager extends Configured implements ClusterManager
|
|||
.path(roleCommand.toString())
|
||||
.build();
|
||||
String body = "{ \"items\": [ \"" + roleName + "\" ] }";
|
||||
LOG.info("Executing POST against " + uri + " with body " + body + "...");
|
||||
LOG.trace("Executing POST against {} with body {} ...", uri, body);
|
||||
WebTarget webTarget = client.target(uri);
|
||||
Invocation.Builder invocationBuilder = webTarget.request(MediaType.APPLICATION_JSON);
|
||||
Response response = invocationBuilder.post(Entity.json(body));
|
||||
int statusCode = response.getStatus();
|
||||
final int statusCode = response.getStatus();
|
||||
final String responseBody = response.readEntity(String.class);
|
||||
if (statusCode != Response.Status.OK.getStatusCode()) {
|
||||
LOG.warn(
|
||||
"RoleCommand failed with status code {} and response body {}", statusCode, responseBody);
|
||||
throw new HTTPException(statusCode);
|
||||
}
|
||||
|
||||
LOG.trace("POST against {} completed with status code {} and response body {}",
|
||||
uri, statusCode, responseBody);
|
||||
return parser.parse(responseBody)
|
||||
.getAsJsonObject()
|
||||
.get("items")
|
||||
.getAsJsonArray()
|
||||
.get(0)
|
||||
.getAsJsonObject()
|
||||
.get("id")
|
||||
.getAsLong();
|
||||
}
|
||||
|
||||
// Possible healthSummary values include "GOOD" and "BAD."
|
||||
private String getHealthSummary(String serviceName, String roleType, String hostId)
|
||||
throws IOException {
|
||||
return getRolePropertyValue(serviceName, roleType, hostId, "healthSummary");
|
||||
private HealthSummary getHealthSummary(String serviceName, String roleType, String hostId) {
|
||||
return HealthSummary.fromString(
|
||||
getRolePropertyValue(serviceName, roleType, hostId, "healthSummary"));
|
||||
}
|
||||
|
||||
// This API uses a hostId to execute host-specific commands; get one from a hostname.
|
||||
private String getHostId(String hostname) throws IOException {
|
||||
private String getHostId(String hostname) {
|
||||
String hostId = null;
|
||||
|
||||
URI uri =
|
||||
UriBuilder.fromUri(serverHostname).path("api").path(API_VERSION).path("hosts").build();
|
||||
JsonElement hosts = getJsonNodeFromURIGet(uri);
|
||||
URI uri = UriBuilder.fromUri(serverHostname)
|
||||
.path("api")
|
||||
.path(API_VERSION)
|
||||
.path("hosts")
|
||||
.build();
|
||||
JsonElement hosts = parser.parse(getFromURIGet(uri))
|
||||
.getAsJsonObject()
|
||||
.get("items");
|
||||
if (hosts != null) {
|
||||
// Iterate through the list of hosts, stopping once you've reached the requested hostname.
|
||||
for (JsonElement host : hosts.getAsJsonArray()) {
|
||||
|
@ -229,42 +355,50 @@ public class RESTApiClusterManager extends Configured implements ClusterManager
|
|||
break;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
hostId = null;
|
||||
}
|
||||
|
||||
return hostId;
|
||||
}
|
||||
|
||||
// Execute GET against URI, returning a JsonNode object to be traversed.
|
||||
private JsonElement getJsonNodeFromURIGet(URI uri) throws IOException {
|
||||
LOG.debug("Executing GET against " + uri + "...");
|
||||
private String getFromURIGet(URI uri) {
|
||||
LOG.trace("Executing GET against {} ...", uri);
|
||||
final Response response = client.target(uri)
|
||||
.request(MediaType.APPLICATION_JSON_TYPE)
|
||||
.get();
|
||||
int statusCode = response.getStatus();
|
||||
final String responseBody = response.readEntity(String.class);
|
||||
if (statusCode != Response.Status.OK.getStatusCode()) {
|
||||
LOG.warn(
|
||||
"request failed with status code {} and response body {}", statusCode, responseBody);
|
||||
throw new HTTPException(statusCode);
|
||||
}
|
||||
// This API folds information as the value to an "items" attribute.
|
||||
return parser.parse(response.readEntity(String.class))
|
||||
.getAsJsonObject()
|
||||
.get("items");
|
||||
LOG.trace("GET against {} completed with status code {} and response body {}",
|
||||
uri, statusCode, responseBody);
|
||||
return responseBody;
|
||||
}
|
||||
|
||||
// This API assigns a unique role name to each host's instance of a role.
|
||||
private String getRoleName(String serviceName, String roleType, String hostId)
|
||||
throws IOException {
|
||||
private String getRoleName(String serviceName, String roleType, String hostId) {
|
||||
return getRolePropertyValue(serviceName, roleType, hostId, "name");
|
||||
}
|
||||
|
||||
// Get the value of a property from a role on a particular host.
|
||||
private String getRolePropertyValue(String serviceName, String roleType, String hostId,
|
||||
String property) throws IOException {
|
||||
String property) {
|
||||
String roleValue = null;
|
||||
URI uri = UriBuilder.fromUri(serverHostname).path("api").path(API_VERSION).path("clusters")
|
||||
.path(clusterName).path("services").path(serviceName).path("roles").build();
|
||||
JsonElement roles = getJsonNodeFromURIGet(uri);
|
||||
URI uri = UriBuilder.fromUri(serverHostname)
|
||||
.path("api")
|
||||
.path(API_VERSION)
|
||||
.path("clusters")
|
||||
.path(clusterName)
|
||||
.path("services")
|
||||
.path(serviceName)
|
||||
.path("roles")
|
||||
.build();
|
||||
JsonElement roles = parser.parse(getFromURIGet(uri))
|
||||
.getAsJsonObject()
|
||||
.get("items");
|
||||
if (roles != null) {
|
||||
// Iterate through the list of roles, stopping once the requested one is found.
|
||||
for (JsonElement role : roles.getAsJsonArray()) {
|
||||
|
@ -281,18 +415,29 @@ public class RESTApiClusterManager extends Configured implements ClusterManager
|
|||
return roleValue;
|
||||
}
|
||||
|
||||
// Possible roleState values include "STARTED" and "STOPPED."
|
||||
private String getRoleState(String serviceName, String roleType, String hostId)
|
||||
throws IOException {
|
||||
return getRolePropertyValue(serviceName, roleType, hostId, "roleState");
|
||||
private RoleState getRoleState(ServiceType service, String hostname) {
|
||||
return executeWithRetries(() -> {
|
||||
String serviceName = getServiceName(roleServiceType.get(service));
|
||||
String hostId = getHostId(hostname);
|
||||
RoleState state = getRoleState(serviceName, service.toString(), hostId);
|
||||
// sometimes the response (usually the first) is null. retry those.
|
||||
return Objects.requireNonNull(state);
|
||||
});
|
||||
}
|
||||
|
||||
private RoleState getRoleState(String serviceName, String roleType, String hostId) {
|
||||
return RoleState.fromString(
|
||||
getRolePropertyValue(serviceName, roleType, hostId, "roleState"));
|
||||
}
|
||||
|
||||
// Convert a service (e.g. "HBASE," "HDFS") into a service name (e.g. "HBASE-1," "HDFS-1").
|
||||
private String getServiceName(Service service) throws IOException {
|
||||
private String getServiceName(Service service) {
|
||||
String serviceName = null;
|
||||
URI uri = UriBuilder.fromUri(serverHostname).path("api").path(API_VERSION).path("clusters")
|
||||
.path(clusterName).path("services").build();
|
||||
JsonElement services = getJsonNodeFromURIGet(uri);
|
||||
JsonElement services = parser.parse(getFromURIGet(uri))
|
||||
.getAsJsonObject()
|
||||
.get("items");
|
||||
if (services != null) {
|
||||
// Iterate through the list of services, stopping once the requested one is found.
|
||||
for (JsonElement serviceEntry : services.getAsJsonArray()) {
|
||||
|
@ -306,6 +451,98 @@ public class RESTApiClusterManager extends Configured implements ClusterManager
|
|||
return serviceName;
|
||||
}
|
||||
|
||||
private Optional<JsonObject> getCommand(final long commandId) {
|
||||
final URI uri = UriBuilder.fromUri(serverHostname)
|
||||
.path("api")
|
||||
.path(API_VERSION)
|
||||
.path("commands")
|
||||
.path(Long.toString(commandId))
|
||||
.build();
|
||||
return Optional.ofNullable(getFromURIGet(uri))
|
||||
.map(parser::parse)
|
||||
.map(JsonElement::getAsJsonObject);
|
||||
}
|
||||
|
||||
/**
|
||||
* Return {@code true} if the {@code commandId} has finished processing.
|
||||
*/
|
||||
private boolean hasCommandCompleted(final long commandId) {
|
||||
return getCommand(commandId)
|
||||
.map(val -> {
|
||||
final boolean isActive = val.get("active").getAsBoolean();
|
||||
if (isActive) {
|
||||
LOG.debug("command {} is still active.", commandId);
|
||||
}
|
||||
return !isActive;
|
||||
})
|
||||
.orElse(false);
|
||||
}
|
||||
|
||||
/**
|
||||
* Return {@code true} if the {@code commandId} has finished successfully.
|
||||
*/
|
||||
private boolean hasCommandCompletedSuccessfully(final long commandId) {
|
||||
return getCommand(commandId)
|
||||
.filter(val -> {
|
||||
final boolean isActive = val.get("active").getAsBoolean();
|
||||
if (isActive) {
|
||||
LOG.debug("command {} is still active.", commandId);
|
||||
}
|
||||
return !isActive;
|
||||
})
|
||||
.map(val -> {
|
||||
final boolean isSuccess = val.get("success").getAsBoolean();
|
||||
LOG.debug("command {} completed as {}.", commandId, isSuccess);
|
||||
return isSuccess;
|
||||
})
|
||||
.orElse(false);
|
||||
}
|
||||
|
||||
/**
|
||||
* Helper method for executing retryable work.
|
||||
*/
|
||||
private <T> T executeWithRetries(final Callable<T> callable) {
|
||||
final RetryCounter retryCounter = retryCounterFactory.create();
|
||||
while (true) {
|
||||
try {
|
||||
return callable.call();
|
||||
} catch (Exception e) {
|
||||
if (retryCounter.shouldRetry()) {
|
||||
LOG.debug("execution failed with exception. Retrying.", e);
|
||||
} else {
|
||||
throw new RuntimeException("retries exhausted", e);
|
||||
}
|
||||
}
|
||||
try {
|
||||
retryCounter.sleepUntilNextRetry();
|
||||
} catch (InterruptedException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void waitFor(final Callable<Boolean> predicate) {
|
||||
final RetryCounter retryCounter = retryCounterFactory.create();
|
||||
while (true) {
|
||||
try {
|
||||
if (Objects.equals(true, predicate.call())) {
|
||||
return;
|
||||
}
|
||||
} catch (Exception e) {
|
||||
if (retryCounter.shouldRetry()) {
|
||||
LOG.debug("execution failed with exception. Retrying.", e);
|
||||
} else {
|
||||
throw new RuntimeException("retries exhausted", e);
|
||||
}
|
||||
}
|
||||
try {
|
||||
retryCounter.sleepUntilNextRetry();
|
||||
} catch (InterruptedException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Some enums to guard against bad calls.
|
||||
*/
|
||||
|
@ -323,17 +560,52 @@ public class RESTApiClusterManager extends Configured implements ClusterManager
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Represents the configured run state of a role.
|
||||
* @see <a href="https://archive.cloudera.com/cm6/6.3.0/generic/jar/cm_api/apidocs/json_ApiRoleState.html">
|
||||
* https://archive.cloudera.com/cm6/6.3.0/generic/jar/cm_api/apidocs/json_ApiRoleState.html</a>
|
||||
*/
|
||||
private enum RoleState {
|
||||
HISTORY_NOT_AVAILABLE, UNKNOWN, STARTING, STARTED, BUSY, STOPPING, STOPPED, NA;
|
||||
|
||||
public static RoleState fromString(final String value) {
|
||||
if (StringUtils.isBlank(value)) {
|
||||
return null;
|
||||
}
|
||||
return RoleState.valueOf(value.toUpperCase());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Represents of the high-level health status of a subject in the cluster.
|
||||
* @see <a href="https://archive.cloudera.com/cm6/6.3.0/generic/jar/cm_api/apidocs/json_ApiHealthSummary.html">
|
||||
* https://archive.cloudera.com/cm6/6.3.0/generic/jar/cm_api/apidocs/json_ApiHealthSummary.html</a>
|
||||
*/
|
||||
private enum HealthSummary {
|
||||
DISABLED, HISTORY_NOT_AVAILABLE, NOT_AVAILABLE, GOOD, CONCERNING, BAD;
|
||||
|
||||
public static HealthSummary fromString(final String value) {
|
||||
if (StringUtils.isBlank(value)) {
|
||||
return null;
|
||||
}
|
||||
return HealthSummary.valueOf(value.toUpperCase());
|
||||
}
|
||||
}
|
||||
|
||||
// ClusterManager methods take a "ServiceType" object (e.g. "HBASE_MASTER," "HADOOP_NAMENODE").
|
||||
// These "service types," which cluster managers call "roles" or "components," need to be mapped
|
||||
// to their corresponding service (e.g. "HBase," "HDFS") in order to be controlled.
|
||||
private static Map<ServiceType, Service> roleServiceType = new HashMap<>();
|
||||
static {
|
||||
roleServiceType.put(ServiceType.HADOOP_NAMENODE, Service.HDFS);
|
||||
roleServiceType.put(ServiceType.HADOOP_DATANODE, Service.HDFS);
|
||||
roleServiceType.put(ServiceType.HADOOP_JOBTRACKER, Service.MAPREDUCE);
|
||||
roleServiceType.put(ServiceType.HADOOP_TASKTRACKER, Service.MAPREDUCE);
|
||||
roleServiceType.put(ServiceType.HBASE_MASTER, Service.HBASE);
|
||||
roleServiceType.put(ServiceType.HBASE_REGIONSERVER, Service.HBASE);
|
||||
private static final Map<ServiceType, Service> roleServiceType = buildRoleServiceTypeMap();
|
||||
|
||||
private static Map<ServiceType, Service> buildRoleServiceTypeMap() {
|
||||
final Map<ServiceType, Service> ret = new HashMap<>();
|
||||
ret.put(ServiceType.HADOOP_NAMENODE, Service.HDFS);
|
||||
ret.put(ServiceType.HADOOP_DATANODE, Service.HDFS);
|
||||
ret.put(ServiceType.HADOOP_JOBTRACKER, Service.MAPREDUCE);
|
||||
ret.put(ServiceType.HADOOP_TASKTRACKER, Service.MAPREDUCE);
|
||||
ret.put(ServiceType.HBASE_MASTER, Service.HBASE);
|
||||
ret.put(ServiceType.HBASE_REGIONSERVER, Service.HBASE);
|
||||
return Collections.unmodifiableMap(ret);
|
||||
}
|
||||
|
||||
enum Service {
|
||||
|
|
Loading…
Reference in New Issue