YARN-8081. Add support to upgrade a component.
Contributed by Chandni Singh
(cherry picked from commit 8d3b39de89
)
This commit is contained in:
parent
899a7ec2fb
commit
051d9cff9c
|
@ -41,6 +41,7 @@ import org.apache.hadoop.yarn.client.api.YarnClient;
|
|||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.service.api.records.Component;
|
||||
import org.apache.hadoop.yarn.service.api.records.ComponentState;
|
||||
import org.apache.hadoop.yarn.service.api.records.Container;
|
||||
import org.apache.hadoop.yarn.service.api.records.ContainerState;
|
||||
import org.apache.hadoop.yarn.service.api.records.Service;
|
||||
|
@ -170,6 +171,22 @@ public class ApiServiceClient extends AppAdminClient {
|
|||
return api.toString();
|
||||
}
|
||||
|
||||
private String getComponentsPath(String appName) throws IOException {
|
||||
Preconditions.checkNotNull(appName);
|
||||
String url = getRMWebAddress();
|
||||
StringBuilder api = new StringBuilder();
|
||||
api.append(url);
|
||||
api.append("/app/v1/services/").append(appName).append("/")
|
||||
.append(RestApiConstants.COMPONENTS);
|
||||
Configuration conf = getConfig();
|
||||
if (conf.get("hadoop.http.authentication.type").equalsIgnoreCase(
|
||||
"simple")) {
|
||||
api.append("?user.name=" + UrlEncoded
|
||||
.encodeString(System.getProperty("user.name")));
|
||||
}
|
||||
return api.toString();
|
||||
}
|
||||
|
||||
private Builder getApiClient() throws IOException {
|
||||
return getApiClient(getServicePath(null));
|
||||
}
|
||||
|
@ -536,7 +553,7 @@ public class ApiServiceClient extends AppAdminClient {
|
|||
container.setState(ContainerState.UPGRADING);
|
||||
toUpgrade[idx++] = container;
|
||||
}
|
||||
String buffer = containerJsonSerde.toJson(toUpgrade);
|
||||
String buffer = CONTAINER_JSON_SERDE.toJson(toUpgrade);
|
||||
ClientResponse response = getApiClient(getInstancesPath(appName))
|
||||
.put(ClientResponse.class, buffer);
|
||||
result = processResponse(response);
|
||||
|
@ -547,7 +564,35 @@ public class ApiServiceClient extends AppAdminClient {
|
|||
return result;
|
||||
}
|
||||
|
||||
private static JsonSerDeser<Container[]> containerJsonSerde =
|
||||
@Override
|
||||
public int actionUpgradeComponents(String appName, List<String> components)
|
||||
throws IOException, YarnException {
|
||||
int result;
|
||||
Component[] toUpgrade = new Component[components.size()];
|
||||
try {
|
||||
int idx = 0;
|
||||
for (String compName : components) {
|
||||
Component component = new Component();
|
||||
component.setName(compName);
|
||||
component.setState(ComponentState.UPGRADING);
|
||||
toUpgrade[idx++] = component;
|
||||
}
|
||||
String buffer = COMP_JSON_SERDE.toJson(toUpgrade);
|
||||
ClientResponse response = getApiClient(getComponentsPath(appName))
|
||||
.put(ClientResponse.class, buffer);
|
||||
result = processResponse(response);
|
||||
} catch (Exception e) {
|
||||
LOG.error("Failed to upgrade components: ", e);
|
||||
result = EXIT_EXCEPTION_THROWN;
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
private static final JsonSerDeser<Container[]> CONTAINER_JSON_SERDE =
|
||||
new JsonSerDeser<>(Container[].class,
|
||||
PropertyNamingStrategy.CAMEL_CASE_TO_LOWER_CASE_WITH_UNDERSCORES);
|
||||
PropertyNamingStrategy.CAMEL_CASE_TO_LOWER_CASE_WITH_UNDERSCORES);
|
||||
|
||||
private static final JsonSerDeser<Component[]> COMP_JSON_SERDE =
|
||||
new JsonSerDeser<>(Component[].class,
|
||||
PropertyNamingStrategy.CAMEL_CASE_TO_LOWER_CASE_WITH_UNDERSCORES);
|
||||
}
|
||||
|
|
|
@ -17,7 +17,9 @@
|
|||
|
||||
package org.apache.hadoop.yarn.service.webapp;
|
||||
|
||||
import com.google.common.base.Joiner;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Sets;
|
||||
import com.google.inject.Inject;
|
||||
import com.google.inject.Singleton;
|
||||
|
||||
|
@ -29,6 +31,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
|
|||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.service.api.records.Component;
|
||||
import org.apache.hadoop.yarn.service.api.records.ComponentState;
|
||||
import org.apache.hadoop.yarn.service.api.records.Container;
|
||||
import org.apache.hadoop.yarn.service.api.records.ContainerState;
|
||||
import org.apache.hadoop.yarn.service.api.records.Service;
|
||||
|
@ -61,8 +64,10 @@ import java.security.PrivilegedExceptionAction;
|
|||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static org.apache.hadoop.yarn.service.api.records.ServiceState.ACCEPTED;
|
||||
|
@ -306,6 +311,42 @@ public class ApiServer {
|
|||
return formatResponse(Status.OK, serviceStatus);
|
||||
}
|
||||
|
||||
@PUT
|
||||
@Path(COMPONENTS_PATH)
|
||||
@Consumes({MediaType.APPLICATION_JSON})
|
||||
@Produces({RestApiConstants.MEDIA_TYPE_JSON_UTF8, MediaType.TEXT_PLAIN})
|
||||
public Response updateComponents(@Context HttpServletRequest request,
|
||||
@PathParam(SERVICE_NAME) String serviceName,
|
||||
List<Component> requestComponents) {
|
||||
|
||||
try {
|
||||
if (requestComponents == null || requestComponents.isEmpty()) {
|
||||
throw new YarnException("No components provided.");
|
||||
}
|
||||
UserGroupInformation ugi = getProxyUser(request);
|
||||
Set<String> compNamesToUpgrade = new HashSet<>();
|
||||
requestComponents.forEach(reqComp -> {
|
||||
if (reqComp.getState() != null &&
|
||||
reqComp.getState().equals(ComponentState.UPGRADING)) {
|
||||
compNamesToUpgrade.add(reqComp.getName());
|
||||
}
|
||||
});
|
||||
LOG.info("PUT: upgrade components {} for service {} " +
|
||||
"user = {}", compNamesToUpgrade, serviceName, ugi);
|
||||
return processComponentsUpgrade(ugi, serviceName, compNamesToUpgrade);
|
||||
} catch (AccessControlException e) {
|
||||
return formatResponse(Response.Status.FORBIDDEN, e.getMessage());
|
||||
} catch (YarnException e) {
|
||||
return formatResponse(Response.Status.BAD_REQUEST, e.getMessage());
|
||||
} catch (IOException | InterruptedException e) {
|
||||
return formatResponse(Response.Status.INTERNAL_SERVER_ERROR,
|
||||
e.getMessage());
|
||||
} catch (UndeclaredThrowableException e) {
|
||||
return formatResponse(Response.Status.INTERNAL_SERVER_ERROR,
|
||||
e.getCause().getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
@PUT
|
||||
@Path(COMPONENT_PATH)
|
||||
@Consumes({ MediaType.APPLICATION_JSON })
|
||||
|
@ -326,6 +367,15 @@ public class ApiServer {
|
|||
+ componentName + ")";
|
||||
throw new YarnException(msg);
|
||||
}
|
||||
UserGroupInformation ugi = getProxyUser(request);
|
||||
if (component.getState() != null &&
|
||||
component.getState().equals(ComponentState.UPGRADING)) {
|
||||
LOG.info("PUT: upgrade component {} for service {} " +
|
||||
"user = {}", component.getName(), appName, ugi);
|
||||
return processComponentsUpgrade(ugi, appName,
|
||||
Sets.newHashSet(componentName));
|
||||
}
|
||||
|
||||
if (component.getNumberOfContainers() == null) {
|
||||
throw new YarnException("No container count provided");
|
||||
}
|
||||
|
@ -334,7 +384,6 @@ public class ApiServer {
|
|||
+ component.getNumberOfContainers();
|
||||
throw new YarnException(message);
|
||||
}
|
||||
UserGroupInformation ugi = getProxyUser(request);
|
||||
Map<String, Long> original = ugi
|
||||
.doAs(new PrivilegedExceptionAction<Map<String, Long>>() {
|
||||
@Override
|
||||
|
@ -472,7 +521,7 @@ public class ApiServer {
|
|||
|
||||
if (reqContainer.getState() != null
|
||||
&& reqContainer.getState().equals(ContainerState.UPGRADING)) {
|
||||
return processContainerUpgrade(ugi, service,
|
||||
return processContainersUpgrade(ugi, service,
|
||||
Lists.newArrayList(liveContainer));
|
||||
}
|
||||
} catch (AccessControlException e) {
|
||||
|
@ -517,7 +566,7 @@ public class ApiServer {
|
|||
List<Container> liveContainers = ServiceApiUtil
|
||||
.getLiveContainers(service, toUpgrade);
|
||||
|
||||
return processContainerUpgrade(ugi, service, liveContainers);
|
||||
return processContainersUpgrade(ugi, service, liveContainers);
|
||||
}
|
||||
} catch (AccessControlException e) {
|
||||
return formatResponse(Response.Status.FORBIDDEN, e.getMessage());
|
||||
|
@ -629,7 +678,29 @@ public class ApiServer {
|
|||
return formatResponse(Status.ACCEPTED, status);
|
||||
}
|
||||
|
||||
private Response processContainerUpgrade(UserGroupInformation ugi,
|
||||
private Response processComponentsUpgrade(UserGroupInformation ugi,
|
||||
String serviceName, Set<String> compNames) throws YarnException,
|
||||
IOException, InterruptedException {
|
||||
Service service = getServiceFromClient(ugi, serviceName);
|
||||
if (service.getState() != ServiceState.UPGRADING) {
|
||||
throw new YarnException(
|
||||
String.format("The upgrade of service %s has not been initiated.",
|
||||
service.getName()));
|
||||
}
|
||||
List<Container> containersToUpgrade = ServiceApiUtil
|
||||
.validateAndResolveCompsUpgrade(service, compNames);
|
||||
Integer result = invokeContainersUpgrade(ugi, service, containersToUpgrade);
|
||||
if (result == EXIT_SUCCESS) {
|
||||
ServiceStatus status = new ServiceStatus();
|
||||
status.setDiagnostics(
|
||||
"Upgrading components " + Joiner.on(',').join(compNames) + ".");
|
||||
return formatResponse(Response.Status.ACCEPTED, status);
|
||||
}
|
||||
// If result is not a success, consider it a no-op
|
||||
return Response.status(Response.Status.NO_CONTENT).build();
|
||||
}
|
||||
|
||||
private Response processContainersUpgrade(UserGroupInformation ugi,
|
||||
Service service, List<Container> containers) throws YarnException,
|
||||
IOException, InterruptedException {
|
||||
|
||||
|
@ -638,25 +709,8 @@ public class ApiServer {
|
|||
String.format("The upgrade of service %s has not been initiated.",
|
||||
service.getName()));
|
||||
}
|
||||
for (Container liveContainer : containers) {
|
||||
if (liveContainer.getState() != ContainerState.NEEDS_UPGRADE) {
|
||||
// Nothing to upgrade
|
||||
throw new YarnException(String.format(
|
||||
"The component instance (%s) does not need an upgrade.",
|
||||
liveContainer.getComponentInstanceName()));
|
||||
}
|
||||
}
|
||||
|
||||
Integer result = ugi.doAs((PrivilegedExceptionAction<Integer>) () -> {
|
||||
int result1;
|
||||
ServiceClient sc = getServiceClient();
|
||||
sc.init(YARN_CONFIG);
|
||||
sc.start();
|
||||
result1 = sc.actionUpgrade(service, containers);
|
||||
sc.close();
|
||||
return result1;
|
||||
});
|
||||
|
||||
ServiceApiUtil.validateInstancesUpgrade(containers);
|
||||
Integer result = invokeContainersUpgrade(ugi, service, containers);
|
||||
if (result == EXIT_SUCCESS) {
|
||||
ServiceStatus status = new ServiceStatus();
|
||||
status.setDiagnostics(
|
||||
|
@ -668,6 +722,20 @@ public class ApiServer {
|
|||
return Response.status(Response.Status.NO_CONTENT).build();
|
||||
}
|
||||
|
||||
private int invokeContainersUpgrade(UserGroupInformation ugi,
|
||||
Service service, List<Container> containers) throws IOException,
|
||||
InterruptedException {
|
||||
return ugi.doAs((PrivilegedExceptionAction<Integer>) () -> {
|
||||
int result1;
|
||||
ServiceClient sc = getServiceClient();
|
||||
sc.init(YARN_CONFIG);
|
||||
sc.start();
|
||||
result1 = sc.actionUpgrade(service, containers);
|
||||
sc.close();
|
||||
return result1;
|
||||
});
|
||||
}
|
||||
|
||||
private Service getServiceFromClient(UserGroupInformation ugi,
|
||||
String serviceName) throws IOException, InterruptedException {
|
||||
|
||||
|
|
|
@ -34,7 +34,10 @@ import org.apache.hadoop.yarn.service.utils.SliderFileSystem;
|
|||
import java.io.FileNotFoundException;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
* A mock version of ServiceClient - This class is design
|
||||
|
@ -46,6 +49,7 @@ public class ServiceClientTest extends ServiceClient {
|
|||
private Configuration conf = new Configuration();
|
||||
private Service goodServiceStatus = buildLiveGoodService();
|
||||
private boolean initialized;
|
||||
private Set<String> expectedInstances = new HashSet<>();
|
||||
|
||||
public ServiceClientTest() {
|
||||
super();
|
||||
|
@ -61,11 +65,12 @@ public class ServiceClientTest extends ServiceClient {
|
|||
|
||||
@Override
|
||||
public void stop() {
|
||||
// This is needed for testing API Server which use client to get status
|
||||
// This is needed for testing API Server which uses client to get status
|
||||
// and then perform an action.
|
||||
}
|
||||
|
||||
public void forceStop() {
|
||||
expectedInstances.clear();
|
||||
super.stop();
|
||||
}
|
||||
|
||||
|
@ -144,17 +149,27 @@ public class ServiceClientTest extends ServiceClient {
|
|||
@Override
|
||||
public int actionUpgrade(Service service, List<Container> compInstances)
|
||||
throws IOException, YarnException {
|
||||
if (service.getName() != null && service.getName().equals("jenkins")) {
|
||||
return EXIT_SUCCESS;
|
||||
} else {
|
||||
throw new IllegalArgumentException();
|
||||
if (service.getName() != null && service.getName().equals("jenkins")
|
||||
&& compInstances != null) {
|
||||
Set<String> actualInstances = compInstances.stream().map(
|
||||
Container::getComponentInstanceName).collect(Collectors.toSet());
|
||||
if (actualInstances.equals(expectedInstances)) {
|
||||
return EXIT_SUCCESS;
|
||||
}
|
||||
}
|
||||
throw new IllegalArgumentException();
|
||||
}
|
||||
|
||||
Service getGoodServiceStatus() {
|
||||
return goodServiceStatus;
|
||||
}
|
||||
|
||||
void setExpectedInstances(Set<String> instances) {
|
||||
if (instances != null) {
|
||||
expectedInstances.addAll(instances);
|
||||
}
|
||||
}
|
||||
|
||||
static Service buildGoodService() {
|
||||
Service service = new Service();
|
||||
service.setName("jenkins");
|
||||
|
@ -166,13 +181,15 @@ public class ServiceClientTest extends ServiceClient {
|
|||
resource.setCpus(1);
|
||||
resource.setMemory("2048");
|
||||
List<Component> components = new ArrayList<>();
|
||||
Component c = new Component();
|
||||
c.setName("jenkins");
|
||||
c.setNumberOfContainers(2L);
|
||||
c.setArtifact(artifact);
|
||||
c.setLaunchCommand("");
|
||||
c.setResource(resource);
|
||||
components.add(c);
|
||||
for (int i = 0; i < 2; i++) {
|
||||
Component c = new Component();
|
||||
c.setName("jenkins" + i);
|
||||
c.setNumberOfContainers(2L);
|
||||
c.setArtifact(artifact);
|
||||
c.setLaunchCommand("");
|
||||
c.setResource(resource);
|
||||
components.add(c);
|
||||
}
|
||||
service.setComponents(components);
|
||||
return service;
|
||||
}
|
||||
|
|
|
@ -23,18 +23,22 @@ import java.io.BufferedWriter;
|
|||
import java.io.File;
|
||||
import java.io.FileWriter;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
|
||||
import javax.servlet.http.HttpServletRequest;
|
||||
import javax.ws.rs.Path;
|
||||
import javax.ws.rs.core.Response;
|
||||
import javax.ws.rs.core.Response.Status;
|
||||
|
||||
import com.google.common.collect.Sets;
|
||||
import org.apache.commons.io.FileUtils;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.yarn.service.api.records.Artifact;
|
||||
import org.apache.hadoop.yarn.service.api.records.Artifact.TypeEnum;
|
||||
import org.apache.hadoop.yarn.service.api.records.Component;
|
||||
import org.apache.hadoop.yarn.service.api.records.ComponentState;
|
||||
import org.apache.hadoop.yarn.service.api.records.Container;
|
||||
import org.apache.hadoop.yarn.service.api.records.ContainerState;
|
||||
import org.apache.hadoop.yarn.service.api.records.Resource;
|
||||
|
@ -523,8 +527,11 @@ public class TestApiServer {
|
|||
// and container state needs to be in NEEDS_UPGRADE.
|
||||
Service serviceStatus = mockServerClient.getGoodServiceStatus();
|
||||
serviceStatus.setState(ServiceState.UPGRADING);
|
||||
serviceStatus.getComponents().iterator().next().getContainers().iterator()
|
||||
.next().setState(ContainerState.NEEDS_UPGRADE);
|
||||
Container liveContainer = serviceStatus.getComponents().iterator().next()
|
||||
.getContainers().iterator().next();
|
||||
liveContainer.setState(ContainerState.NEEDS_UPGRADE);
|
||||
mockServerClient.setExpectedInstances(Sets.newHashSet(
|
||||
liveContainer.getComponentInstanceName()));
|
||||
|
||||
final Response actual = apiServer.updateComponentInstance(request,
|
||||
goodService.getName(), comp.getName(),
|
||||
|
@ -545,9 +552,14 @@ public class TestApiServer {
|
|||
// and container state needs to be in NEEDS_UPGRADE.
|
||||
Service serviceStatus = mockServerClient.getGoodServiceStatus();
|
||||
serviceStatus.setState(ServiceState.UPGRADING);
|
||||
Set<String> expectedInstances = new HashSet<>();
|
||||
serviceStatus.getComponents().iterator().next().getContainers().forEach(
|
||||
container -> container.setState(ContainerState.NEEDS_UPGRADE)
|
||||
container -> {
|
||||
container.setState(ContainerState.NEEDS_UPGRADE);
|
||||
expectedInstances.add(container.getComponentInstanceName());
|
||||
}
|
||||
);
|
||||
mockServerClient.setExpectedInstances(expectedInstances);
|
||||
|
||||
final Response actual = apiServer.updateComponentInstances(request,
|
||||
goodService.getName(), comp.getContainers());
|
||||
|
@ -555,4 +567,57 @@ public class TestApiServer {
|
|||
Response.status(Status.ACCEPTED).build().getStatus(),
|
||||
actual.getStatus());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testUpgradeComponent() {
|
||||
Service goodService = ServiceClientTest.buildLiveGoodService();
|
||||
Component comp = goodService.getComponents().iterator().next();
|
||||
comp.setState(ComponentState.UPGRADING);
|
||||
|
||||
// To be able to upgrade, the service needs to be in UPGRADING
|
||||
// and component state needs to be in NEEDS_UPGRADE.
|
||||
Service serviceStatus = mockServerClient.getGoodServiceStatus();
|
||||
serviceStatus.setState(ServiceState.UPGRADING);
|
||||
Component liveComp = serviceStatus.getComponent(comp.getName());
|
||||
liveComp.setState(ComponentState.NEEDS_UPGRADE);
|
||||
Set<String> expectedInstances = new HashSet<>();
|
||||
liveComp.getContainers().forEach(container -> {
|
||||
expectedInstances.add(container.getComponentInstanceName());
|
||||
container.setState(ContainerState.NEEDS_UPGRADE);
|
||||
});
|
||||
mockServerClient.setExpectedInstances(expectedInstances);
|
||||
|
||||
final Response actual = apiServer.updateComponent(request,
|
||||
goodService.getName(), comp.getName(), comp);
|
||||
assertEquals("Component upgrade is ",
|
||||
Response.status(Status.ACCEPTED).build().getStatus(),
|
||||
actual.getStatus());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testUpgradeMultipleComps() {
|
||||
Service goodService = ServiceClientTest.buildLiveGoodService();
|
||||
goodService.getComponents().forEach(comp ->
|
||||
comp.setState(ComponentState.UPGRADING));
|
||||
|
||||
// To be able to upgrade, the live service needs to be in UPGRADING
|
||||
// and component states needs to be in NEEDS_UPGRADE.
|
||||
Service serviceStatus = mockServerClient.getGoodServiceStatus();
|
||||
serviceStatus.setState(ServiceState.UPGRADING);
|
||||
Set<String> expectedInstances = new HashSet<>();
|
||||
serviceStatus.getComponents().forEach(liveComp -> {
|
||||
liveComp.setState(ComponentState.NEEDS_UPGRADE);
|
||||
liveComp.getContainers().forEach(liveContainer -> {
|
||||
expectedInstances.add(liveContainer.getComponentInstanceName());
|
||||
liveContainer.setState(ContainerState.NEEDS_UPGRADE);
|
||||
});
|
||||
});
|
||||
mockServerClient.setExpectedInstances(expectedInstances);
|
||||
|
||||
final Response actual = apiServer.updateComponents(request,
|
||||
goodService.getName(), goodService.getComponents());
|
||||
assertEquals("Component upgrade is ",
|
||||
Response.status(Status.ACCEPTED).build().getStatus(),
|
||||
actual.getStatus());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -298,5 +298,17 @@ public class TestApiServiceClient {
|
|||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testComponentsUpgrade() {
|
||||
String appName = "example-app";
|
||||
try {
|
||||
int result = asc.actionUpgradeComponents(appName, Lists.newArrayList(
|
||||
"comp"));
|
||||
assertEquals(EXIT_SUCCESS, result);
|
||||
} catch (IOException | YarnException e) {
|
||||
fail();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
|
|
@ -26,5 +26,5 @@ import org.apache.hadoop.classification.InterfaceStability;
|
|||
@InterfaceStability.Unstable
|
||||
@ApiModel(description = "The current state of a component.")
|
||||
public enum ComponentState {
|
||||
FLEXING, STABLE, NEEDS_UPGRADE;
|
||||
FLEXING, STABLE, NEEDS_UPGRADE, UPGRADING;
|
||||
}
|
||||
|
|
|
@ -294,6 +294,17 @@ public class ServiceClient extends AppAdminClient implements SliderExitCodes,
|
|||
Service persistedService = ServiceApiUtil.loadService(fs, appName);
|
||||
List<Container> containersToUpgrade = ServiceApiUtil.
|
||||
getLiveContainers(persistedService, componentInstances);
|
||||
ServiceApiUtil.validateInstancesUpgrade(containersToUpgrade);
|
||||
return actionUpgrade(persistedService, containersToUpgrade);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int actionUpgradeComponents(String appName,
|
||||
List<String> components) throws IOException, YarnException {
|
||||
checkAppExistOnHdfs(appName);
|
||||
Service persistedService = ServiceApiUtil.loadService(fs, appName);
|
||||
List<Container> containersToUpgrade = ServiceApiUtil
|
||||
.validateAndResolveCompsUpgrade(persistedService, components);
|
||||
return actionUpgrade(persistedService, containersToUpgrade);
|
||||
}
|
||||
|
||||
|
|
|
@ -34,6 +34,8 @@ public interface RestApiConstants {
|
|||
"/component-instances/{component_instance_name}";
|
||||
String COMP_INSTANCES = "component-instances";
|
||||
String COMP_INSTANCES_PATH = SERVICE_PATH + "/" + COMP_INSTANCES;
|
||||
String COMPONENTS = "components";
|
||||
String COMPONENTS_PATH = SERVICE_PATH + "/" + COMPONENTS;
|
||||
|
||||
// Query param
|
||||
String SERVICE_NAME = "service_name";
|
||||
|
|
|
@ -105,4 +105,10 @@ public interface RestApiErrorMessages {
|
|||
+ "expression name defined for this component only.";
|
||||
String ERROR_KEYTAB_URI_SCHEME_INVALID = "Unsupported keytab URI scheme: %s";
|
||||
String ERROR_KEYTAB_URI_INVALID = "Invalid keytab URI: %s";
|
||||
|
||||
String ERROR_COMP_INSTANCE_DOES_NOT_NEED_UPGRADE = "The component instance " +
|
||||
"(%s) does not need an upgrade.";
|
||||
|
||||
String ERROR_COMP_DOES_NOT_NEED_UPGRADE = "The component (%s) does not need" +
|
||||
" an upgrade.";
|
||||
}
|
||||
|
|
|
@ -19,8 +19,10 @@
|
|||
package org.apache.hadoop.yarn.service.utils;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.collect.ArrayListMultimap;
|
||||
import com.google.common.collect.Multimap;
|
||||
import com.google.common.collect.Sets;
|
||||
import org.apache.commons.lang.StringUtils;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
|
@ -29,14 +31,16 @@ import org.apache.hadoop.registry.client.api.RegistryConstants;
|
|||
import org.apache.hadoop.registry.client.binding.RegistryUtils;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.service.api.records.ComponentState;
|
||||
import org.apache.hadoop.yarn.service.api.records.Container;
|
||||
import org.apache.hadoop.yarn.service.api.records.ContainerState;
|
||||
import org.apache.hadoop.yarn.service.api.records.Service;
|
||||
import org.apache.hadoop.yarn.service.api.records.Artifact;
|
||||
import org.apache.hadoop.yarn.service.api.records.Component;
|
||||
import org.apache.hadoop.yarn.service.api.records.Configuration;
|
||||
import org.apache.hadoop.yarn.service.api.records.Container;
|
||||
import org.apache.hadoop.yarn.service.api.records.KerberosPrincipal;
|
||||
import org.apache.hadoop.yarn.service.api.records.PlacementConstraint;
|
||||
import org.apache.hadoop.yarn.service.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.service.api.records.Service;
|
||||
import org.apache.hadoop.yarn.service.exceptions.SliderException;
|
||||
import org.apache.hadoop.yarn.service.conf.RestApiConstants;
|
||||
import org.apache.hadoop.yarn.service.exceptions.RestApiErrorMessages;
|
||||
|
@ -58,6 +62,9 @@ import java.util.List;
|
|||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
import static org.apache.hadoop.yarn.service.exceptions.RestApiErrorMessages.ERROR_COMP_DOES_NOT_NEED_UPGRADE;
|
||||
import static org.apache.hadoop.yarn.service.exceptions.RestApiErrorMessages.ERROR_COMP_INSTANCE_DOES_NOT_NEED_UPGRADE;
|
||||
|
||||
public class ServiceApiUtil {
|
||||
private static final Logger LOG =
|
||||
LoggerFactory.getLogger(ServiceApiUtil.class);
|
||||
|
@ -545,6 +552,48 @@ public class ServiceApiUtil {
|
|||
return result;
|
||||
}
|
||||
|
||||
/**
|
||||
* Validates that the component instances that are requested to upgrade
|
||||
* require an upgrade.
|
||||
*/
|
||||
public static void validateInstancesUpgrade(List<Container>
|
||||
liveContainers) throws YarnException {
|
||||
for (Container liveContainer : liveContainers) {
|
||||
if (!liveContainer.getState().equals(ContainerState.NEEDS_UPGRADE)) {
|
||||
// Nothing to upgrade
|
||||
throw new YarnException(String.format(
|
||||
ERROR_COMP_INSTANCE_DOES_NOT_NEED_UPGRADE,
|
||||
liveContainer.getComponentInstanceName()));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Validates the components that are requested to upgrade require an upgrade.
|
||||
* It returns the instances of the components which need upgrade.
|
||||
*/
|
||||
public static List<Container> validateAndResolveCompsUpgrade(
|
||||
Service liveService, Collection<String> compNames) throws YarnException {
|
||||
Preconditions.checkNotNull(compNames);
|
||||
HashSet<String> requestedComps = Sets.newHashSet(compNames);
|
||||
List<Container> containerNeedUpgrade = new ArrayList<>();
|
||||
for (Component liveComp : liveService.getComponents()) {
|
||||
if (requestedComps.contains(liveComp.getName())) {
|
||||
if (!liveComp.getState().equals(ComponentState.NEEDS_UPGRADE)) {
|
||||
// Nothing to upgrade
|
||||
throw new YarnException(String.format(
|
||||
ERROR_COMP_DOES_NOT_NEED_UPGRADE, liveComp.getName()));
|
||||
}
|
||||
liveComp.getContainers().forEach(liveContainer -> {
|
||||
if (liveContainer.getState().equals(ContainerState.NEEDS_UPGRADE)) {
|
||||
containerNeedUpgrade.add(liveContainer);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
return containerNeedUpgrade;
|
||||
}
|
||||
|
||||
private static String parseComponentName(String componentInstanceName)
|
||||
throws YarnException {
|
||||
int idx = componentInstanceName.lastIndexOf('-');
|
||||
|
|
|
@ -193,6 +193,18 @@ public class TestServiceCLI {
|
|||
Assert.assertEquals(result, 0);
|
||||
}
|
||||
|
||||
@Test (timeout = 180000)
|
||||
public void testUpgradeComponents() throws Exception {
|
||||
conf.set(YARN_APP_ADMIN_CLIENT_PREFIX + DUMMY_APP_TYPE,
|
||||
DummyServiceClient.class.getName());
|
||||
cli.setConf(conf);
|
||||
String[] args = {"app", "-upgrade", "app-1",
|
||||
"-components", "comp1,comp2",
|
||||
"-appTypes", DUMMY_APP_TYPE};
|
||||
int result = cli.run(ApplicationCLI.preProcessArgs(args));
|
||||
Assert.assertEquals(result, 0);
|
||||
}
|
||||
|
||||
@Test (timeout = 180000)
|
||||
public void testEnableFastLaunch() throws Exception {
|
||||
fs.getFileSystem().create(new Path(basedir.getAbsolutePath(), "test.jar"))
|
||||
|
@ -291,5 +303,11 @@ public class TestServiceCLI {
|
|||
List<String> componentInstances) throws IOException, YarnException {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int actionUpgradeComponents(String appName, List<String> components)
|
||||
throws IOException, YarnException {
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -258,4 +258,16 @@ public abstract class AppAdminClient extends CompositeService {
|
|||
public abstract int actionUpgradeInstances(String appName,
|
||||
List<String> componentInstances) throws IOException, YarnException;
|
||||
|
||||
|
||||
/**
|
||||
* Upgrade components of a long running service.
|
||||
*
|
||||
* @param appName the name of the application.
|
||||
* @param components the name of the components.
|
||||
*/
|
||||
@Public
|
||||
@Unstable
|
||||
public abstract int actionUpgradeComponents(String appName,
|
||||
List<String> components) throws IOException, YarnException;
|
||||
|
||||
}
|
||||
|
|
|
@ -104,6 +104,7 @@ public class ApplicationCLI extends YarnCLI {
|
|||
public static final String UPGRADE_AUTO_FINALIZE = "autoFinalize";
|
||||
public static final String UPGRADE_FINALIZE = "finalize";
|
||||
public static final String COMPONENT_INSTS = "instances";
|
||||
public static final String COMPONENTS = "components";
|
||||
|
||||
private static String firstArg = null;
|
||||
|
||||
|
@ -250,6 +251,8 @@ public class ApplicationCLI extends YarnCLI {
|
|||
opts.addOption(COMPONENT_INSTS, true, "Works with -upgrade option to " +
|
||||
"trigger the upgrade of specified component instances of the " +
|
||||
"application.");
|
||||
opts.addOption(COMPONENTS, true, "Works with -upgrade option to " +
|
||||
"trigger the upgrade of specified components of the application.");
|
||||
opts.addOption(UPGRADE_FINALIZE, false, "Works with -upgrade option to " +
|
||||
"finalize the upgrade.");
|
||||
opts.addOption(UPGRADE_AUTO_FINALIZE, false, "Works with -upgrade and " +
|
||||
|
@ -274,6 +277,9 @@ public class ApplicationCLI extends YarnCLI {
|
|||
opts.getOption(COMPONENT_INSTS).setArgName("Component Instances");
|
||||
opts.getOption(COMPONENT_INSTS).setValueSeparator(',');
|
||||
opts.getOption(COMPONENT_INSTS).setArgs(Option.UNLIMITED_VALUES);
|
||||
opts.getOption(COMPONENTS).setArgName("Components");
|
||||
opts.getOption(COMPONENTS).setValueSeparator(',');
|
||||
opts.getOption(COMPONENTS).setArgs(Option.UNLIMITED_VALUES);
|
||||
} else if (title != null && title.equalsIgnoreCase(APPLICATION_ATTEMPT)) {
|
||||
opts.addOption(STATUS_CMD, true,
|
||||
"Prints the status of the application attempt.");
|
||||
|
@ -574,7 +580,7 @@ public class ApplicationCLI extends YarnCLI {
|
|||
cliParser.getOptionValue(CHANGE_APPLICATION_QUEUE));
|
||||
} else if (cliParser.hasOption(UPGRADE_CMD)) {
|
||||
if (hasAnyOtherCLIOptions(cliParser, opts, UPGRADE_CMD, UPGRADE_INITIATE,
|
||||
UPGRADE_AUTO_FINALIZE, UPGRADE_FINALIZE, COMPONENT_INSTS,
|
||||
UPGRADE_AUTO_FINALIZE, UPGRADE_FINALIZE, COMPONENT_INSTS, COMPONENTS,
|
||||
APP_TYPE_CMD)) {
|
||||
printUsage(title, opts);
|
||||
return exitCode;
|
||||
|
@ -603,6 +609,15 @@ public class ApplicationCLI extends YarnCLI {
|
|||
}
|
||||
String[] instances = cliParser.getOptionValues(COMPONENT_INSTS);
|
||||
return client.actionUpgradeInstances(appName, Arrays.asList(instances));
|
||||
} else if (cliParser.hasOption(COMPONENTS)) {
|
||||
if (hasAnyOtherCLIOptions(cliParser, opts, UPGRADE_CMD,
|
||||
COMPONENTS, APP_TYPE_CMD)) {
|
||||
printUsage(title, opts);
|
||||
return exitCode;
|
||||
}
|
||||
String[] components = cliParser.getOptionValues(COMPONENTS);
|
||||
return client.actionUpgradeComponents(appName,
|
||||
Arrays.asList(components));
|
||||
} else if (cliParser.hasOption(UPGRADE_FINALIZE)) {
|
||||
if (hasAnyOtherCLIOptions(cliParser, opts, UPGRADE_CMD,
|
||||
UPGRADE_FINALIZE, APP_TYPE_CMD)) {
|
||||
|
|
|
@ -2143,6 +2143,9 @@ public class TestYarnCLI {
|
|||
pw.println(" long-running service. Supports");
|
||||
pw.println(" absolute or relative changes,");
|
||||
pw.println(" such as +1, 2, or -3.");
|
||||
pw.println(" -components <Components> Works with -upgrade option to");
|
||||
pw.println(" trigger the upgrade of specified");
|
||||
pw.println(" components of the application.");
|
||||
pw.println(" -destroy <Application Name> Destroys a saved application");
|
||||
pw.println(" specification and removes all");
|
||||
pw.println(" application data permanently.");
|
||||
|
|
Loading…
Reference in New Issue