YARN-7939. Added support to upgrade a component instance.

Contributed by Chandni Singh

(cherry picked from commit 86ae380e55ca3b1b12744b9338d60b34e8c717a6)
This commit is contained in:
Eric Yang 2018-04-26 15:47:55 -04:00
parent 050bd4cd2a
commit b39d183d92
41 changed files with 1963 additions and 289 deletions

View File

@ -26,6 +26,7 @@ import java.util.Map;
import javax.ws.rs.core.MediaType;
import com.google.common.base.Preconditions;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
@ -40,11 +41,16 @@ import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.service.api.records.Component;
import org.apache.hadoop.yarn.service.api.records.Container;
import org.apache.hadoop.yarn.service.api.records.ContainerState;
import org.apache.hadoop.yarn.service.api.records.Service;
import org.apache.hadoop.yarn.service.api.records.ServiceState;
import org.apache.hadoop.yarn.service.api.records.ServiceStatus;
import org.apache.hadoop.yarn.service.conf.RestApiConstants;
import org.apache.hadoop.yarn.service.utils.JsonSerDeser;
import org.apache.hadoop.yarn.service.utils.ServiceApiUtil;
import org.apache.hadoop.yarn.util.RMHAUtils;
import org.codehaus.jackson.map.PropertyNamingStrategy;
import org.eclipse.jetty.util.UrlEncoded;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -131,7 +137,7 @@ public class ApiServiceClient extends AppAdminClient {
* @return URI to API Service
* @throws IOException
*/
private String getApiUrl(String appName) throws IOException {
private String getServicePath(String appName) throws IOException {
String url = getRMWebAddress();
StringBuilder api = new StringBuilder();
api.append(url);
@ -148,23 +154,40 @@ public class ApiServiceClient extends AppAdminClient {
return api.toString();
}
private String getInstancesPath(String appName) throws IOException {
Preconditions.checkNotNull(appName);
String url = getRMWebAddress();
StringBuilder api = new StringBuilder();
api.append(url);
api.append("/app/v1/services/").append(appName).append("/")
.append(RestApiConstants.COMP_INSTANCES);
Configuration conf = getConfig();
if (conf.get("hadoop.http.authentication.type").equalsIgnoreCase(
"simple")) {
api.append("?user.name=" + UrlEncoded
.encodeString(System.getProperty("user.name")));
}
return api.toString();
}
private Builder getApiClient() throws IOException {
return getApiClient(null);
return getApiClient(getServicePath(null));
}
/**
* Setup API service web request.
*
* @param appName
* @param requestPath
* @return
* @throws IOException
*/
private Builder getApiClient(String appName) throws IOException {
private Builder getApiClient(String requestPath)
throws IOException {
Client client = Client.create(getClientConfig());
Configuration conf = getConfig();
client.setChunkedEncodingSize(null);
Builder builder = client
.resource(getApiUrl(appName)).type(MediaType.APPLICATION_JSON);
.resource(requestPath).type(MediaType.APPLICATION_JSON);
if (conf.get("hadoop.http.authentication.type").equals("kerberos")) {
AuthenticatedURL.Token token = new AuthenticatedURL.Token();
builder.header("WWW-Authenticate", token);
@ -312,7 +335,7 @@ public class ApiServiceClient extends AppAdminClient {
service.setName(appName);
service.setState(ServiceState.STOPPED);
String buffer = jsonSerDeser.toJson(service);
ClientResponse response = getApiClient(appName)
ClientResponse response = getApiClient(getServicePath(appName))
.put(ClientResponse.class, buffer);
result = processResponse(response);
} catch (Exception e) {
@ -335,7 +358,7 @@ public class ApiServiceClient extends AppAdminClient {
service.setName(appName);
service.setState(ServiceState.STARTED);
String buffer = jsonSerDeser.toJson(service);
ClientResponse response = getApiClient(appName)
ClientResponse response = getApiClient(getServicePath(appName))
.put(ClientResponse.class, buffer);
result = processResponse(response);
} catch (Exception e) {
@ -381,7 +404,7 @@ public class ApiServiceClient extends AppAdminClient {
public int actionDestroy(String appName) throws IOException, YarnException {
int result = EXIT_SUCCESS;
try {
ClientResponse response = getApiClient(appName)
ClientResponse response = getApiClient(getServicePath(appName))
.delete(ClientResponse.class);
result = processResponse(response);
} catch (Exception e) {
@ -413,7 +436,7 @@ public class ApiServiceClient extends AppAdminClient {
service.addComponent(component);
}
String buffer = jsonSerDeser.toJson(service);
ClientResponse response = getApiClient(appName)
ClientResponse response = getApiClient(getServicePath(appName))
.put(ClientResponse.class, buffer);
result = processResponse(response);
} catch (Exception e) {
@ -454,7 +477,8 @@ public class ApiServiceClient extends AppAdminClient {
ServiceApiUtil.validateNameFormat(appName, getConfig());
}
try {
ClientResponse response = getApiClient(appName).get(ClientResponse.class);
ClientResponse response = getApiClient(getServicePath(appName))
.get(ClientResponse.class);
if (response.getStatus() != 200) {
StringBuilder sb = new StringBuilder();
sb.append(appName);
@ -470,16 +494,20 @@ public class ApiServiceClient extends AppAdminClient {
}
@Override
public int actionUpgrade(String appName,
String fileName) throws IOException, YarnException {
public int initiateUpgrade(String appName,
String fileName, boolean autoFinalize) throws IOException, YarnException {
int result;
try {
Service service =
loadAppJsonFromLocalFS(fileName, appName, null, null);
service.setState(ServiceState.UPGRADING);
if (autoFinalize) {
service.setState(ServiceState.UPGRADING_AUTO_FINALIZE);
} else {
service.setState(ServiceState.UPGRADING);
}
String buffer = jsonSerDeser.toJson(service);
ClientResponse response = getApiClient()
.post(ClientResponse.class, buffer);
ClientResponse response = getApiClient(getServicePath(appName))
.put(ClientResponse.class, buffer);
result = processResponse(response);
} catch (Exception e) {
LOG.error("Failed to upgrade application: ", e);
@ -487,4 +515,32 @@ public class ApiServiceClient extends AppAdminClient {
}
return result;
}
@Override
public int actionUpgradeInstances(String appName, List<String> compInstances)
throws IOException, YarnException {
int result;
Container[] toUpgrade = new Container[compInstances.size()];
try {
int idx = 0;
for (String instanceName : compInstances) {
Container container = new Container();
container.setComponentInstanceName(instanceName);
container.setState(ContainerState.UPGRADING);
toUpgrade[idx++] = container;
}
String buffer = containerJsonSerde.toJson(toUpgrade);
ClientResponse response = getApiClient(getInstancesPath(appName))
.put(ClientResponse.class, buffer);
result = processResponse(response);
} catch (Exception e) {
LOG.error("Failed to upgrade component instance: ", e);
result = EXIT_EXCEPTION_THROWN;
}
return result;
}
private static JsonSerDeser<Container[]> containerJsonSerde =
new JsonSerDeser<>(Container[].class,
PropertyNamingStrategy.CAMEL_CASE_TO_LOWER_CASE_WITH_UNDERSCORES);
}

View File

@ -17,6 +17,7 @@
package org.apache.hadoop.yarn.service.webapp;
import com.google.common.collect.Lists;
import com.google.inject.Inject;
import com.google.inject.Singleton;
@ -28,10 +29,14 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.service.api.records.Component;
import org.apache.hadoop.yarn.service.api.records.Container;
import org.apache.hadoop.yarn.service.api.records.ContainerState;
import org.apache.hadoop.yarn.service.api.records.Service;
import org.apache.hadoop.yarn.service.api.records.ServiceState;
import org.apache.hadoop.yarn.service.api.records.ServiceStatus;
import org.apache.hadoop.yarn.service.client.ServiceClient;
import org.apache.hadoop.yarn.service.conf.RestApiConstants;
import org.apache.hadoop.yarn.service.utils.ServiceApiUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -53,9 +58,12 @@ import java.io.FileNotFoundException;
import java.io.IOException;
import java.lang.reflect.UndeclaredThrowableException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import static org.apache.hadoop.yarn.service.api.records.ServiceState.ACCEPTED;
import static org.apache.hadoop.yarn.service.conf.RestApiConstants.*;
@ -177,17 +185,7 @@ public class ApiServer {
}
UserGroupInformation ugi = getProxyUser(request);
LOG.info("GET: getService for appName = {} user = {}", appName, ugi);
Service app = ugi.doAs(new PrivilegedExceptionAction<Service>() {
@Override
public Service run() throws IOException, YarnException {
ServiceClient sc = getServiceClient();
sc.init(YARN_CONFIG);
sc.start();
Service app = sc.getStatus(appName);
sc.close();
return app;
}
});
Service app = getServiceFromClient(ugi, appName);
return Response.ok(app).build();
} catch (AccessControlException e) {
return formatResponse(Status.FORBIDDEN, e.getMessage());
@ -393,17 +391,19 @@ public class ApiServer {
return startService(appName, ugi);
}
// If an UPGRADE is requested
if (updateServiceData.getState() != null && (
updateServiceData.getState() == ServiceState.UPGRADING ||
updateServiceData.getState() ==
ServiceState.UPGRADING_AUTO_FINALIZE)) {
return upgradeService(updateServiceData, ugi);
}
// If new lifetime value specified then update it
if (updateServiceData.getLifetime() != null
&& updateServiceData.getLifetime() > 0) {
return updateLifetime(appName, updateServiceData, ugi);
}
// If an UPGRADE is requested
if (updateServiceData.getState() != null &&
updateServiceData.getState() == ServiceState.UPGRADING) {
return upgradeService(updateServiceData, ugi);
}
} catch (UndeclaredThrowableException e) {
return formatResponse(Status.BAD_REQUEST,
e.getCause().getMessage());
@ -427,6 +427,103 @@ public class ApiServer {
return Response.status(Status.NO_CONTENT).build();
}
@PUT
@Path(COMP_INSTANCE_LONG_PATH)
@Consumes({MediaType.APPLICATION_JSON})
@Produces({RestApiConstants.MEDIA_TYPE_JSON_UTF8, MediaType.TEXT_PLAIN})
public Response updateComponentInstance(@Context HttpServletRequest request,
@PathParam(SERVICE_NAME) String serviceName,
@PathParam(COMPONENT_NAME) String componentName,
@PathParam(COMP_INSTANCE_NAME) String compInstanceName,
Container reqContainer) {
try {
UserGroupInformation ugi = getProxyUser(request);
LOG.info("PUT: update component instance {} for component = {}" +
" service = {} user = {}", compInstanceName, componentName,
serviceName, ugi);
if (reqContainer == null) {
throw new YarnException("No container data provided.");
}
Service service = getServiceFromClient(ugi, serviceName);
Component component = service.getComponent(componentName);
if (component == null) {
throw new YarnException(String.format(
"The component name in the URI path (%s) is invalid.",
componentName));
}
Container liveContainer = component.getComponentInstance(
compInstanceName);
if (liveContainer == null) {
throw new YarnException(String.format(
"The component (%s) does not have a component instance (%s).",
componentName, compInstanceName));
}
if (reqContainer.getState() != null
&& reqContainer.getState().equals(ContainerState.UPGRADING)) {
return processContainerUpgrade(ugi, service,
Lists.newArrayList(liveContainer));
}
} catch (AccessControlException e) {
return formatResponse(Response.Status.FORBIDDEN, e.getMessage());
} catch (YarnException e) {
return formatResponse(Response.Status.BAD_REQUEST, e.getMessage());
} catch (IOException | InterruptedException e) {
return formatResponse(Response.Status.INTERNAL_SERVER_ERROR,
e.getMessage());
} catch (UndeclaredThrowableException e) {
return formatResponse(Response.Status.INTERNAL_SERVER_ERROR,
e.getCause().getMessage());
}
return Response.status(Status.NO_CONTENT).build();
}
@PUT
@Path(COMP_INSTANCES_PATH)
@Consumes({MediaType.APPLICATION_JSON})
@Produces({RestApiConstants.MEDIA_TYPE_JSON_UTF8, MediaType.TEXT_PLAIN})
public Response updateComponentInstances(@Context HttpServletRequest request,
@PathParam(SERVICE_NAME) String serviceName,
List<Container> requestContainers) {
try {
if (requestContainers == null || requestContainers.isEmpty()) {
throw new YarnException("No containers provided.");
}
UserGroupInformation ugi = getProxyUser(request);
List<String> toUpgrade = new ArrayList<>();
for (Container reqContainer : requestContainers) {
if (reqContainer.getState() != null &&
reqContainer.getState().equals(ContainerState.UPGRADING)) {
toUpgrade.add(reqContainer.getComponentInstanceName());
}
}
if (!toUpgrade.isEmpty()) {
Service service = getServiceFromClient(ugi, serviceName);
LOG.info("PUT: upgrade component instances {} for service = {} " +
"user = {}", toUpgrade, serviceName, ugi);
List<Container> liveContainers = ServiceApiUtil
.getLiveContainers(service, toUpgrade);
return processContainerUpgrade(ugi, service, liveContainers);
}
} catch (AccessControlException e) {
return formatResponse(Response.Status.FORBIDDEN, e.getMessage());
} catch (YarnException e) {
return formatResponse(Response.Status.BAD_REQUEST, e.getMessage());
} catch (IOException | InterruptedException e) {
return formatResponse(Response.Status.INTERNAL_SERVER_ERROR,
e.getMessage());
} catch (UndeclaredThrowableException e) {
return formatResponse(Response.Status.INTERNAL_SERVER_ERROR,
e.getCause().getMessage());
}
return Response.status(Status.NO_CONTENT).build();
}
private Response flexService(Service service, UserGroupInformation ugi)
throws IOException, InterruptedException {
String appName = service.getName();
@ -511,17 +608,70 @@ public class ApiServer {
ServiceClient sc = getServiceClient();
sc.init(YARN_CONFIG);
sc.start();
sc.actionUpgrade(service);
sc.initiateUpgrade(service);
sc.close();
return null;
});
LOG.info("Service {} version {} upgrade initialized");
LOG.info("Service {} version {} upgrade initialized", service.getName(),
service.getVersion());
status.setDiagnostics("Service " + service.getName() +
" version " + service.getVersion() + " saved.");
status.setState(ServiceState.ACCEPTED);
return formatResponse(Status.ACCEPTED, status);
}
private Response processContainerUpgrade(UserGroupInformation ugi,
Service service, List<Container> containers) throws YarnException,
IOException, InterruptedException {
if (service.getState() != ServiceState.UPGRADING) {
throw new YarnException(
String.format("The upgrade of service %s has not been initiated.",
service.getName()));
}
for (Container liveContainer : containers) {
if (liveContainer.getState() != ContainerState.NEEDS_UPGRADE) {
// Nothing to upgrade
throw new YarnException(String.format(
"The component instance (%s) does not need an upgrade.",
liveContainer.getComponentInstanceName()));
}
}
Integer result = ugi.doAs((PrivilegedExceptionAction<Integer>) () -> {
int result1;
ServiceClient sc = getServiceClient();
sc.init(YARN_CONFIG);
sc.start();
result1 = sc.actionUpgrade(service, containers);
sc.close();
return result1;
});
if (result == EXIT_SUCCESS) {
ServiceStatus status = new ServiceStatus();
status.setDiagnostics(
"Upgrading component instances " + containers.stream()
.map(Container::getId).collect(Collectors.joining(",")) + ".");
return formatResponse(Response.Status.ACCEPTED, status);
}
// If result is not a success, consider it a no-op
return Response.status(Response.Status.NO_CONTENT).build();
}
private Service getServiceFromClient(UserGroupInformation ugi,
String serviceName) throws IOException, InterruptedException {
return ugi.doAs((PrivilegedExceptionAction<Service>) () -> {
ServiceClient sc = getServiceClient();
sc.init(YARN_CONFIG);
sc.start();
Service app1 = sc.getStatus(serviceName);
sc.close();
return app1;
});
}
/**
* Used by negative test case.
*

View File

@ -17,17 +17,24 @@
package org.apache.hadoop.yarn.service;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.service.api.records.Artifact;
import org.apache.hadoop.yarn.service.api.records.Component;
import org.apache.hadoop.yarn.service.api.records.Container;
import org.apache.hadoop.yarn.service.api.records.ContainerState;
import org.apache.hadoop.yarn.service.api.records.Resource;
import org.apache.hadoop.yarn.service.api.records.Service;
import org.apache.hadoop.yarn.service.client.ServiceClient;
import org.apache.hadoop.yarn.service.utils.ServiceApiUtil;
import org.apache.hadoop.yarn.service.utils.SliderFileSystem;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
/**
* A mock version of ServiceClient - This class is design
* to simulate various error conditions that will happen
@ -36,14 +43,31 @@ import org.apache.hadoop.yarn.service.utils.SliderFileSystem;
public class ServiceClientTest extends ServiceClient {
private Configuration conf = new Configuration();
protected static void init() {
}
private Service goodServiceStatus = buildLiveGoodService();
private boolean initialized;
public ServiceClientTest() {
super();
}
@Override
public void init(Configuration conf) {
if (!initialized) {
super.init(conf);
initialized = true;
}
}
@Override
public void stop() {
// This is needed for testing API Server which use client to get status
// and then perform an action.
}
public void forceStop() {
super.stop();
}
@Override
public Configuration getConfig() {
return conf;
@ -58,11 +82,8 @@ public class ServiceClientTest extends ServiceClient {
@Override
public Service getStatus(String appName) {
if (appName == null) {
throw new NullPointerException();
}
if (appName.equals("jenkins")) {
return new Service();
if (appName != null && appName.equals("jenkins")) {
return goodServiceStatus;
} else {
throw new IllegalArgumentException();
}
@ -71,10 +92,7 @@ public class ServiceClientTest extends ServiceClient {
@Override
public int actionStart(String serviceName)
throws YarnException, IOException {
if (serviceName == null) {
throw new NullPointerException();
}
if (serviceName.equals("jenkins")) {
if (serviceName != null && serviceName.equals("jenkins")) {
return EXIT_SUCCESS;
} else {
throw new ApplicationNotFoundException("");
@ -98,19 +116,77 @@ public class ServiceClientTest extends ServiceClient {
@Override
public int actionDestroy(String serviceName) {
if (serviceName == null) {
throw new NullPointerException();
if (serviceName != null) {
if (serviceName.equals("jenkins")) {
return EXIT_SUCCESS;
} else if (serviceName.equals("jenkins-already-stopped")) {
return EXIT_SUCCESS;
} else if (serviceName.equals("jenkins-doesn't-exist")) {
return EXIT_NOT_FOUND;
} else if (serviceName.equals("jenkins-error-cleaning-registry")) {
return EXIT_OTHER_FAILURE;
}
}
if (serviceName.equals("jenkins")) {
throw new IllegalArgumentException();
}
@Override
public int initiateUpgrade(Service service) throws YarnException,
IOException {
if (service.getName() != null && service.getName().equals("jenkins")) {
return EXIT_SUCCESS;
} else if (serviceName.equals("jenkins-already-stopped")) {
return EXIT_SUCCESS;
} else if (serviceName.equals("jenkins-doesn't-exist")) {
return EXIT_NOT_FOUND;
} else if (serviceName.equals("jenkins-error-cleaning-registry")) {
return EXIT_OTHER_FAILURE;
} else {
throw new IllegalArgumentException();
}
}
@Override
public int actionUpgrade(Service service, List<Container> compInstances)
throws IOException, YarnException {
if (service.getName() != null && service.getName().equals("jenkins")) {
return EXIT_SUCCESS;
} else {
throw new IllegalArgumentException();
}
}
Service getGoodServiceStatus() {
return goodServiceStatus;
}
static Service buildGoodService() {
Service service = new Service();
service.setName("jenkins");
service.setVersion("v1");
Artifact artifact = new Artifact();
artifact.setType(Artifact.TypeEnum.DOCKER);
artifact.setId("jenkins:latest");
Resource resource = new Resource();
resource.setCpus(1);
resource.setMemory("2048");
List<Component> components = new ArrayList<>();
Component c = new Component();
c.setName("jenkins");
c.setNumberOfContainers(2L);
c.setArtifact(artifact);
c.setLaunchCommand("");
c.setResource(resource);
components.add(c);
service.setComponents(components);
return service;
}
static Service buildLiveGoodService() {
Service service = buildGoodService();
Component comp = service.getComponents().iterator().next();
List<Container> containers = new ArrayList<>();
for (int i = 0; i < comp.getNumberOfContainers(); i++) {
Container container = new Container();
container.setComponentInstanceName(comp.getName() + "-" + (i + 1));
container.setState(ContainerState.READY);
containers.add(container);
}
comp.setContainers(containers);
return service;
}
}

View File

@ -35,12 +35,14 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.service.api.records.Artifact;
import org.apache.hadoop.yarn.service.api.records.Artifact.TypeEnum;
import org.apache.hadoop.yarn.service.api.records.Component;
import org.apache.hadoop.yarn.service.api.records.Container;
import org.apache.hadoop.yarn.service.api.records.ContainerState;
import org.apache.hadoop.yarn.service.api.records.Resource;
import org.apache.hadoop.yarn.service.api.records.Service;
import org.apache.hadoop.yarn.service.api.records.ServiceState;
import org.apache.hadoop.yarn.service.api.records.ServiceStatus;
import org.apache.hadoop.yarn.service.client.ServiceClient;
import org.apache.hadoop.yarn.service.webapp.ApiServer;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
@ -52,13 +54,14 @@ import org.mockito.Mockito;
public class TestApiServer {
private ApiServer apiServer;
private HttpServletRequest request;
private ServiceClientTest mockServerClient;
@Before
public void setup() throws Exception {
request = Mockito.mock(HttpServletRequest.class);
Mockito.when(request.getRemoteUser())
.thenReturn(System.getProperty("user.name"));
ServiceClient mockServerClient = new ServiceClientTest();
mockServerClient = new ServiceClientTest();
Configuration conf = new Configuration();
conf.set("yarn.api-service.service.client.class",
ServiceClientTest.class.getName());
@ -66,6 +69,11 @@ public class TestApiServer {
apiServer.setServiceClient(mockServerClient);
}
@After
public void teardown() {
mockServerClient.forceStop();
}
@Test
public void testPathAnnotation() {
assertNotNull(this.apiServer.getClass().getAnnotation(Path.class));
@ -107,24 +115,7 @@ public class TestApiServer {
BufferedWriter bw = new BufferedWriter(new FileWriter(dockerConfig));
bw.write(json);
bw.close();
Service service = new Service();
service.setName("jenkins");
service.setVersion("v1");
Artifact artifact = new Artifact();
artifact.setType(TypeEnum.DOCKER);
artifact.setId("jenkins:latest");
Resource resource = new Resource();
resource.setCpus(1);
resource.setMemory("2048");
List<Component> components = new ArrayList<Component>();
Component c = new Component();
c.setName("jenkins");
c.setNumberOfContainers(1L);
c.setArtifact(artifact);
c.setLaunchCommand("");
c.setResource(resource);
components.add(c);
service.setComponents(components);
Service service = ServiceClientTest.buildGoodService();
final Response actual = apiServer.createService(request, service);
assertEquals("Create service is ",
Response.status(Status.ACCEPTED).build().getStatus(),
@ -495,4 +486,60 @@ public class TestApiServer {
+ "that in the URI path (jenkins-master)",
serviceStatus.getDiagnostics());
}
@Test
public void testInitiateUpgrade() {
Service goodService = ServiceClientTest.buildLiveGoodService();
goodService.setVersion("v2");
goodService.setState(ServiceState.UPGRADING);
final Response actual = apiServer.updateService(request,
goodService.getName(), goodService);
assertEquals("Initiate upgrade is ",
Response.status(Status.ACCEPTED).build().getStatus(),
actual.getStatus());
}
@Test
public void testUpgradeSingleInstance() {
Service goodService = ServiceClientTest.buildLiveGoodService();
Component comp = goodService.getComponents().iterator().next();
Container container = comp.getContainers().iterator().next();
container.setState(ContainerState.UPGRADING);
// To be able to upgrade, the service needs to be in UPGRADING
// and container state needs to be in NEEDS_UPGRADE.
Service serviceStatus = mockServerClient.getGoodServiceStatus();
serviceStatus.setState(ServiceState.UPGRADING);
serviceStatus.getComponents().iterator().next().getContainers().iterator()
.next().setState(ContainerState.NEEDS_UPGRADE);
final Response actual = apiServer.updateComponentInstance(request,
goodService.getName(), comp.getName(),
container.getComponentInstanceName(), container);
assertEquals("Instance upgrade is ",
Response.status(Status.ACCEPTED).build().getStatus(),
actual.getStatus());
}
@Test
public void testUpgradeMultipleInstances() {
Service goodService = ServiceClientTest.buildLiveGoodService();
Component comp = goodService.getComponents().iterator().next();
comp.getContainers().forEach(container ->
container.setState(ContainerState.UPGRADING));
// To be able to upgrade, the service needs to be in UPGRADING
// and container state needs to be in NEEDS_UPGRADE.
Service serviceStatus = mockServerClient.getGoodServiceStatus();
serviceStatus.setState(ServiceState.UPGRADING);
serviceStatus.getComponents().iterator().next().getContainers().forEach(
container -> container.setState(ContainerState.NEEDS_UPGRADE)
);
final Response actual = apiServer.updateComponentInstances(request,
goodService.getName(), comp.getContainers());
assertEquals("Instance upgrade is ",
Response.status(Status.ACCEPTED).build().getStatus(),
actual.getStatus());
}
}

View File

@ -26,6 +26,7 @@ import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import com.google.common.collect.Lists;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.eclipse.jetty.server.Server;
@ -256,4 +257,29 @@ public class TestApiServiceClient {
}
}
@Test
public void testInitiateServiceUpgrade() {
String appName = "example-app";
String upgradeFileName = "target/test-classes/example-app.json";
try {
int result = asc.initiateUpgrade(appName, upgradeFileName, false);
assertEquals(EXIT_SUCCESS, result);
} catch (IOException | YarnException e) {
fail();
}
}
@Test
public void testInstancesUpgrade() {
String appName = "example-app";
try {
int result = asc.actionUpgradeInstances(appName, Lists.newArrayList(
"comp-1", "comp-2"));
assertEquals(EXIT_SUCCESS, result);
} catch (IOException | YarnException e) {
fail();
}
}
}

View File

@ -19,6 +19,8 @@
package org.apache.hadoop.yarn.service;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.CompInstancesUpgradeResponseProto;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.CompInstancesUpgradeRequestProto;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.FlexComponentsRequestProto;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.FlexComponentsResponseProto;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.GetStatusResponseProto;
@ -49,4 +51,8 @@ public interface ClientAMProtocol {
RestartServiceResponseProto restart(RestartServiceRequestProto request)
throws IOException, YarnException;
CompInstancesUpgradeResponseProto upgrade(
CompInstancesUpgradeRequestProto request) throws IOException,
YarnException;
}

View File

@ -26,8 +26,11 @@ import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.util.ExitUtil;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.CompInstancesUpgradeRequestProto;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.CompInstancesUpgradeResponseProto;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.ComponentCountProto;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.FlexComponentsRequestProto;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.FlexComponentsResponseProto;
@ -40,6 +43,8 @@ import org.apache.hadoop.yarn.proto.ClientAMProtocol.StopResponseProto;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.UpgradeServiceRequestProto;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.UpgradeServiceResponseProto;
import org.apache.hadoop.yarn.service.component.ComponentEvent;
import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEvent;
import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType;
import org.apache.hadoop.yarn.service.utils.ServiceApiUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -151,12 +156,16 @@ public class ClientAMService extends AbstractService
@Override
public UpgradeServiceResponseProto upgrade(
UpgradeServiceRequestProto request) throws IOException {
ServiceEvent event = new ServiceEvent(ServiceEventType.UPGRADE);
event.setVersion(request.getVersion());
context.scheduler.getDispatcher().getEventHandler().handle(event);
LOG.info("Upgrading service to version {} by {}", request.getVersion(),
UserGroupInformation.getCurrentUser());
return UpgradeServiceResponseProto.newBuilder().build();
try {
context.getServiceManager().processUpgradeRequest(request.getVersion(),
request.getAutoFinalize());
LOG.info("Upgrading service to version {} by {}", request.getVersion(),
UserGroupInformation.getCurrentUser());
return UpgradeServiceResponseProto.newBuilder().build();
} catch (Exception ex) {
return UpgradeServiceResponseProto.newBuilder().setError(ex.getMessage())
.build();
}
}
@Override
@ -167,4 +176,21 @@ public class ClientAMService extends AbstractService
LOG.info("Restart service by {}", UserGroupInformation.getCurrentUser());
return RestartServiceResponseProto.newBuilder().build();
}
@Override
public CompInstancesUpgradeResponseProto upgrade(
CompInstancesUpgradeRequestProto request)
throws IOException, YarnException {
if (!request.getContainerIdsList().isEmpty()) {
for (String containerId : request.getContainerIdsList()) {
ComponentInstanceEvent event =
new ComponentInstanceEvent(ContainerId.fromString(containerId),
ComponentInstanceEventType.UPGRADE);
LOG.info("Upgrade container {}", containerId);
context.scheduler.getDispatcher().getEventHandler().handle(event);
}
}
return CompInstancesUpgradeResponseProto.newBuilder().build();
}
}

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.service;
import com.google.common.base.Preconditions;
import com.google.common.cache.LoadingCache;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.security.client.ClientToAMTokenSecretManager;
@ -42,8 +43,17 @@ public class ServiceContext {
public String principal;
// AM keytab location
public String keytab;
private ServiceManager serviceManager;
public ServiceContext() {
}
public ServiceManager getServiceManager() {
return serviceManager;
}
void setServiceManager(ServiceManager serviceManager) {
this.serviceManager = Preconditions.checkNotNull(serviceManager);
}
}

View File

@ -28,6 +28,7 @@ public class ServiceEvent extends AbstractEvent<ServiceEventType> {
private final ServiceEventType type;
private String version;
private boolean autoFinalize;
public ServiceEvent(ServiceEventType serviceEventType) {
super(serviceEventType);
@ -46,4 +47,13 @@ public class ServiceEvent extends AbstractEvent<ServiceEventType> {
this.version = version;
return this;
}
public boolean isAutoFinalize() {
return autoFinalize;
}
public ServiceEvent setAutoFinalize(boolean autoFinalize) {
this.autoFinalize = autoFinalize;
return this;
}
}

View File

@ -24,5 +24,5 @@ package org.apache.hadoop.yarn.service;
public enum ServiceEventType {
START,
UPGRADE,
STOP_UPGRADE
CHECK_STABLE
}

View File

@ -1,4 +1,4 @@
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@ -24,6 +24,7 @@ import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.service.api.records.Service;
import org.apache.hadoop.yarn.service.api.records.ServiceState;
import org.apache.hadoop.yarn.service.component.Component;
import org.apache.hadoop.yarn.service.component.ComponentEvent;
import org.apache.hadoop.yarn.service.component.ComponentEventType;
import org.apache.hadoop.yarn.service.utils.ServiceApiUtil;
@ -39,10 +40,13 @@ import java.io.IOException;
import java.text.MessageFormat;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import static org.apache.hadoop.yarn.service.utils.ServiceApiUtil.jsonSerDeser;
/**
* Manages the state of the service.
* Manages the state of Service.
*/
public class ServiceManager implements EventHandler<ServiceEvent> {
private static final Logger LOG = LoggerFactory.getLogger(
@ -56,10 +60,10 @@ public class ServiceManager implements EventHandler<ServiceEvent> {
private final StateMachine<State, ServiceEventType, ServiceEvent>
stateMachine;
private final UpgradeComponentsFinder componentsFinder;
private final AsyncDispatcher dispatcher;
private final SliderFileSystem fs;
private final UpgradeComponentsFinder componentsFinder;
private String upgradeVersion;
@ -72,9 +76,16 @@ public class ServiceManager implements EventHandler<ServiceEvent> {
State.UPGRADING), ServiceEventType.UPGRADE,
new StartUpgradeTransition())
.addTransition(State.STABLE, EnumSet.of(State.STABLE),
ServiceEventType.CHECK_STABLE, new CheckStableTransition())
.addTransition(State.UPGRADING, EnumSet.of(State.STABLE,
State.UPGRADING), ServiceEventType.START,
new StopUpgradeTransition())
new CheckStableTransition())
.addTransition(State.UPGRADING, EnumSet.of(State.STABLE,
State.UPGRADING), ServiceEventType.CHECK_STABLE,
new CheckStableTransition())
.installTopology();
public ServiceManager(ServiceContext context) {
@ -102,7 +113,7 @@ public class ServiceManager implements EventHandler<ServiceEvent> {
stateMachine.doTransition(event.getType(), event);
} catch (InvalidStateTransitionException e) {
LOG.error(MessageFormat.format(
"[SERVICE]: Invalid event {0} at {1}.", event.getType(),
"[SERVICE]: Invalid event {1} at {2}.", event.getType(),
oldState), e);
}
if (oldState != getState()) {
@ -130,22 +141,11 @@ public class ServiceManager implements EventHandler<ServiceEvent> {
public State transition(ServiceManager serviceManager,
ServiceEvent event) {
try {
Service targetSpec = ServiceApiUtil.loadServiceUpgrade(
serviceManager.fs, serviceManager.getName(), event.getVersion());
serviceManager.serviceSpec.setState(ServiceState.UPGRADING);
List<org.apache.hadoop.yarn.service.api.records.Component>
compsThatNeedUpgrade = serviceManager.componentsFinder.
findTargetComponentSpecs(serviceManager.serviceSpec, targetSpec);
if (compsThatNeedUpgrade != null && !compsThatNeedUpgrade.isEmpty()) {
compsThatNeedUpgrade.forEach(component -> {
ComponentEvent needUpgradeEvent = new ComponentEvent(
component.getName(), ComponentEventType.UPGRADE).
setTargetSpec(component);
serviceManager.dispatcher.getEventHandler().handle(
needUpgradeEvent);
});
if (!event.isAutoFinalize()) {
serviceManager.serviceSpec.setState(ServiceState.UPGRADING);
} else {
serviceManager.serviceSpec.setState(
ServiceState.UPGRADING_AUTO_FINALIZE);
}
serviceManager.upgradeVersion = event.getVersion();
return State.UPGRADING;
@ -157,22 +157,29 @@ public class ServiceManager implements EventHandler<ServiceEvent> {
}
}
private static class StopUpgradeTransition implements
private static class CheckStableTransition implements
MultipleArcTransition<ServiceManager, ServiceEvent, State> {
@Override
public State transition(ServiceManager serviceManager,
ServiceEvent event) {
//abort is not supported currently
//trigger re-check of service state
ServiceMaster.checkAndUpdateServiceState(serviceManager.scheduler,
true);
if (serviceManager.serviceSpec.getState().equals(ServiceState.STABLE)) {
return serviceManager.finalizeUpgrade() ? State.STABLE :
State.UPGRADING;
} else {
return State.UPGRADING;
//trigger check of service state
ServiceState currState = serviceManager.serviceSpec.getState();
if (currState.equals(ServiceState.STABLE)) {
return State.STABLE;
}
if (currState.equals(ServiceState.UPGRADING_AUTO_FINALIZE) ||
event.getType().equals(ServiceEventType.START)) {
ServiceState targetState = checkIfStable(serviceManager.serviceSpec);
if (targetState.equals(ServiceState.STABLE)) {
if (serviceManager.finalizeUpgrade()) {
LOG.info("Service def state changed from {} -> {}", currState,
serviceManager.serviceSpec.getState());
return State.STABLE;
}
}
}
return State.UPGRADING;
}
}
@ -181,12 +188,21 @@ public class ServiceManager implements EventHandler<ServiceEvent> {
*/
private boolean finalizeUpgrade() {
try {
Service upgradeSpec = ServiceApiUtil.loadServiceUpgrade(
// save the application id and state to
Service targetSpec = ServiceApiUtil.loadServiceUpgrade(
fs, getName(), upgradeVersion);
ServiceApiUtil.writeAppDefinition(fs,
ServiceApiUtil.getServiceJsonPath(fs, getName()), upgradeSpec);
targetSpec.setId(serviceSpec.getId());
targetSpec.setState(ServiceState.STABLE);
Map<String, Component> allComps = scheduler.getAllComponents();
targetSpec.getComponents().forEach(compSpec -> {
Component comp = allComps.get(compSpec.getName());
compSpec.setState(comp.getComponentSpec().getState());
});
jsonSerDeser.save(fs.getFileSystem(),
ServiceApiUtil.getServiceJsonPath(fs, getName()), targetSpec, true);
fs.deleteClusterUpgradeDir(getName(), upgradeVersion);
} catch (IOException e) {
LOG.error("Upgrade did not complete because unable to overwrite the" +
LOG.error("Upgrade did not complete because unable to re-write the" +
" service definition", e);
return false;
}
@ -195,13 +211,79 @@ public class ServiceManager implements EventHandler<ServiceEvent> {
fs.deleteClusterUpgradeDir(getName(), upgradeVersion);
} catch (IOException e) {
LOG.warn("Unable to delete upgrade definition for service {} " +
"version {}", getName(), upgradeVersion);
"version {}", getName(), upgradeVersion);
}
serviceSpec.setState(ServiceState.STABLE);
serviceSpec.setVersion(upgradeVersion);
upgradeVersion = null;
return true;
}
private static ServiceState checkIfStable(Service service) {
// if desired == running
for (org.apache.hadoop.yarn.service.api.records.Component comp :
service.getComponents()) {
if (!comp.getState().equals(
org.apache.hadoop.yarn.service.api.records.ComponentState.STABLE)) {
return service.getState();
}
}
return ServiceState.STABLE;
}
/**
* Service state gets directly modified by ServiceMaster and Component.
* This is a problem for upgrade and flexing. For now, invoking
* ServiceMaster.checkAndUpdateServiceState here to make it easy to fix
* this in future.
*/
public void checkAndUpdateServiceState(boolean isIncrement) {
writeLock.lock();
try {
if (!getState().equals(State.UPGRADING)) {
ServiceMaster.checkAndUpdateServiceState(this.scheduler,
isIncrement);
}
} finally {
writeLock.unlock();
}
}
void processUpgradeRequest(String upgradeVersion,
boolean autoFinalize) throws IOException {
Service targetSpec = ServiceApiUtil.loadServiceUpgrade(
context.fs, context.service.getName(), upgradeVersion);
List<org.apache.hadoop.yarn.service.api.records.Component>
compsThatNeedUpgrade = componentsFinder.
findTargetComponentSpecs(context.service, targetSpec);
ServiceEvent event = new ServiceEvent(ServiceEventType.UPGRADE)
.setVersion(upgradeVersion)
.setAutoFinalize(autoFinalize);
context.scheduler.getDispatcher().getEventHandler().handle(event);
if (compsThatNeedUpgrade != null && !compsThatNeedUpgrade.isEmpty()) {
if (autoFinalize) {
event.setAutoFinalize(true);
}
compsThatNeedUpgrade.forEach(component -> {
ComponentEvent needUpgradeEvent = new ComponentEvent(
component.getName(), ComponentEventType.UPGRADE)
.setTargetSpec(component)
.setUpgradeVersion(event.getVersion());
context.scheduler.getDispatcher().getEventHandler().handle(
needUpgradeEvent);
});
} else {
// nothing to upgrade if upgrade auto finalize is requested, trigger a
// state check.
if (autoFinalize) {
context.scheduler.getDispatcher().getEventHandler().handle(
new ServiceEvent(ServiceEventType.CHECK_STABLE));
}
}
}
/**
* Returns the name of the service.
*/
@ -216,10 +298,8 @@ public class ServiceManager implements EventHandler<ServiceEvent> {
STABLE, UPGRADING
}
@VisibleForTesting
Service getServiceSpec() {
return serviceSpec;
}
}

View File

@ -329,6 +329,7 @@ public class ServiceScheduler extends CompositeService {
// Since AM has been started and registered, the service is in STARTED state
app.setState(ServiceState.STARTED);
serviceManager = new ServiceManager(context);
context.setServiceManager(serviceManager);
// recover components based on containers sent from RM
recoverComponents(response);
@ -757,6 +758,32 @@ public class ServiceScheduler extends CompositeService {
// automatically which will trigger stopping COMPONENT INSTANCE
}
@Override
public void onContainerReInitialize(ContainerId containerId) {
ComponentInstance instance = liveInstances.get(containerId);
if (instance == null) {
LOG.error("No component instance exists for {}", containerId);
return;
}
ComponentInstanceEvent becomeReadyEvent = new ComponentInstanceEvent(
containerId, ComponentInstanceEventType.BECOME_READY);
dispatcher.getEventHandler().handle(becomeReadyEvent);
}
@Override
public void onContainerReInitializeError(ContainerId containerId,
Throwable t) {
ComponentInstance instance = liveInstances.get(containerId);
if (instance == null) {
LOG.error("No component instance exists for {}", containerId);
return;
}
ComponentEvent event = new ComponentEvent(instance.getCompName(),
ComponentEventType.CONTAINER_COMPLETED)
.setInstance(instance).setContainerId(containerId);
dispatcher.getEventHandler().handle(event);
}
@Override public void onContainerResourceIncreased(ContainerId containerId,
Resource resource) {

View File

@ -250,6 +250,15 @@ public class Component implements Serializable {
return null;
}
public Container getComponentInstance(String compInstanceName) {
for (Container container : containers) {
if (compInstanceName.equals(container.getComponentInstanceName())) {
return container;
}
}
return null;
}
/**
* Run all containers of this component in privileged mode (YARN-4262).
**/
@ -441,4 +450,16 @@ public class Component implements Serializable {
this.setReadinessCheck(that.getReadinessCheck());
}
}
public void overwrite(Component that) {
setArtifact(that.getArtifact());
setResource(that.resource);
setNumberOfContainers(that.getNumberOfContainers());
setLaunchCommand(that.getLaunchCommand());
setConfiguration(that.configuration);
setRunPrivilegedContainer(that.getRunPrivilegedContainer());
setDependencies(that.getDependencies());
setPlacementPolicy(that.getPlacementPolicy());
setReadinessCheck(that.getReadinessCheck());
}
}

View File

@ -26,5 +26,5 @@ import org.apache.hadoop.classification.InterfaceStability;
@InterfaceAudience.Public
@InterfaceStability.Unstable
public enum ContainerState {
RUNNING_BUT_UNREADY, READY, STOPPED
RUNNING_BUT_UNREADY, READY, STOPPED, NEEDS_UPGRADE, UPGRADING;
}

View File

@ -29,5 +29,6 @@ import org.apache.hadoop.classification.InterfaceStability;
@ApiModel(description = "The current state of an service.")
@javax.annotation.Generated(value = "class io.swagger.codegen.languages.JavaClientCodegen", date = "2016-06-02T08:15:05.615-07:00")
public enum ServiceState {
ACCEPTED, STARTED, STABLE, STOPPED, FAILED, FLEX, UPGRADING;
ACCEPTED, STARTED, STABLE, STOPPED, FAILED, FLEX, UPGRADING,
UPGRADING_AUTO_FINALIZE;
}

View File

@ -52,6 +52,7 @@ import org.apache.hadoop.yarn.client.util.YarnClientUtils;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.CompInstancesUpgradeRequestProto;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.ComponentCountProto;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.FlexComponentsRequestProto;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.GetStatusRequestProto;
@ -59,8 +60,10 @@ import org.apache.hadoop.yarn.proto.ClientAMProtocol.GetStatusResponseProto;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.RestartServiceRequestProto;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.StopRequestProto;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.UpgradeServiceRequestProto;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.UpgradeServiceResponseProto;
import org.apache.hadoop.yarn.service.ClientAMProtocol;
import org.apache.hadoop.yarn.service.ServiceMaster;
import org.apache.hadoop.yarn.service.api.records.Container;
import org.apache.hadoop.yarn.service.api.records.Component;
import org.apache.hadoop.yarn.service.api.records.Service;
import org.apache.hadoop.yarn.service.api.records.ServiceState;
@ -206,15 +209,21 @@ public class ServiceClient extends AppAdminClient implements SliderExitCodes,
}
@Override
public int actionUpgrade(String appName, String fileName)
public int initiateUpgrade(String appName, String fileName,
boolean autoFinalize)
throws IOException, YarnException {
checkAppExistOnHdfs(appName);
Service upgradeService = loadAppJsonFromLocalFS(fileName, appName,
null, null);
return actionUpgrade(upgradeService);
if (autoFinalize) {
upgradeService.setState(ServiceState.UPGRADING_AUTO_FINALIZE);
} else {
upgradeService.setState(ServiceState.UPGRADING);
}
return initiateUpgrade(upgradeService);
}
public int actionUpgrade(Service service) throws YarnException, IOException {
public int initiateUpgrade(Service service) throws YarnException,
IOException {
Service persistedService =
ServiceApiUtil.loadService(fs, service.getName());
if (!StringUtils.isEmpty(persistedService.getId())) {
@ -231,6 +240,15 @@ public class ServiceClient extends AppAdminClient implements SliderExitCodes,
throw new YarnException(message);
}
Service liveService = getStatus(service.getName());
if (!liveService.getState().equals(ServiceState.STABLE)) {
String message = service.getName() + " is at " +
liveService.getState()
+ " state, upgrade can not be invoked when service is STABLE.";
LOG.error(message);
throw new YarnException(message);
}
Path serviceUpgradeDir = checkAppNotExistOnHdfs(service, true);
ServiceApiUtil.validateAndResolveService(service, fs, getConfig());
ServiceApiUtil.createDirAndPersistApp(fs, serviceUpgradeDir, service);
@ -245,8 +263,56 @@ public class ServiceClient extends AppAdminClient implements SliderExitCodes,
UpgradeServiceRequestProto.Builder requestBuilder =
UpgradeServiceRequestProto.newBuilder();
requestBuilder.setVersion(service.getVersion());
if (service.getState().equals(ServiceState.UPGRADING_AUTO_FINALIZE)) {
requestBuilder.setAutoFinalize(true);
}
UpgradeServiceResponseProto responseProto = proxy.upgrade(
requestBuilder.build());
if (responseProto.hasError()) {
LOG.error("Service {} upgrade to version {} failed because {}",
service.getName(), service.getVersion(), responseProto.getError());
throw new YarnException("Failed to upgrade service " + service.getName()
+ " to version " + service.getVersion() + " because " +
responseProto.getError());
}
return EXIT_SUCCESS;
}
proxy.upgrade(requestBuilder.build());
@Override
public int actionUpgradeInstances(String appName,
List<String> componentInstances) throws IOException, YarnException {
checkAppExistOnHdfs(appName);
Service persistedService = ServiceApiUtil.loadService(fs, appName);
List<Container> containersToUpgrade = ServiceApiUtil.
getLiveContainers(persistedService, componentInstances);
return actionUpgrade(persistedService, containersToUpgrade);
}
public int actionUpgrade(Service service, List<Container> compInstances)
throws IOException, YarnException {
ApplicationReport appReport =
yarnClient.getApplicationReport(getAppId(service.getName()));
if (appReport.getYarnApplicationState() != RUNNING) {
String message = service.getName() + " is at " +
appReport.getYarnApplicationState()
+ " state, upgrade can only be invoked when service is running.";
LOG.error(message);
throw new YarnException(message);
}
if (StringUtils.isEmpty(appReport.getHost())) {
throw new YarnException(service.getName() + " AM hostname is empty.");
}
ClientAMProtocol proxy = createAMProxy(service.getName(), appReport);
List<String> containerIdsToUpgrade = new ArrayList<>();
compInstances.forEach(compInst ->
containerIdsToUpgrade.add(compInst.getId()));
LOG.info("instances to upgrade {}", containerIdsToUpgrade);
CompInstancesUpgradeRequestProto.Builder upgradeRequestBuilder =
CompInstancesUpgradeRequestProto.newBuilder();
upgradeRequestBuilder.addAllContainerIds(containerIdsToUpgrade);
proxy.upgrade(upgradeRequestBuilder.build());
return EXIT_SUCCESS;
}
@ -391,6 +457,17 @@ public class ServiceClient extends AppAdminClient implements SliderExitCodes,
LOG.error(message);
throw new YarnException(message);
}
Service liveService = getStatus(serviceName);
if (liveService.getState().equals(ServiceState.UPGRADING) ||
liveService.getState().equals(ServiceState.UPGRADING_AUTO_FINALIZE)) {
String message = serviceName + " is at " +
liveService.getState()
+ " state, flex can not be invoked when service is upgrading. ";
LOG.error(message);
throw new YarnException(message);
}
if (StringUtils.isEmpty(appReport.getHost())) {
throw new YarnException(serviceName + " AM hostname is empty");
}

View File

@ -34,20 +34,23 @@ import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.service.ServiceEvent;
import org.apache.hadoop.yarn.service.ServiceEventType;
import org.apache.hadoop.yarn.service.api.records.ContainerState;
import org.apache.hadoop.yarn.service.api.records.ResourceInformation;
import org.apache.hadoop.yarn.service.component.instance.ComponentInstance;
import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceId;
import org.apache.hadoop.yarn.service.ContainerFailureTracker;
import org.apache.hadoop.yarn.service.ServiceContext;
import org.apache.hadoop.yarn.service.ServiceMaster;
import org.apache.hadoop.yarn.service.ServiceMetrics;
import org.apache.hadoop.yarn.service.ServiceScheduler;
import org.apache.hadoop.yarn.service.api.records.PlacementPolicy;
import org.apache.hadoop.yarn.service.api.records.ResourceInformation;
import org.apache.hadoop.yarn.service.api.records.ServiceState;
import org.apache.hadoop.yarn.service.component.instance.ComponentInstance;
import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEvent;
import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceId;
import org.apache.hadoop.yarn.service.conf.YarnServiceConf;
import org.apache.hadoop.yarn.service.monitor.probe.MonitorUtils;
import org.apache.hadoop.yarn.service.monitor.probe.Probe;
import org.apache.hadoop.yarn.service.containerlaunch.ContainerLaunchService;
import org.apache.hadoop.yarn.service.provider.ProviderUtils;
import org.apache.hadoop.yarn.service.utils.ServiceUtils;
import org.apache.hadoop.yarn.state.InvalidStateTransitionException;
@ -70,6 +73,7 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@ -109,6 +113,10 @@ public class Component implements EventHandler<ComponentEvent> {
// disk_failed containers etc. This will be reset to 0 periodically.
public AtomicInteger currentContainerFailure = new AtomicInteger(0);
private AtomicBoolean upgradeInProgress = new AtomicBoolean(false);
private ComponentEvent upgradeEvent;
private AtomicLong numContainersThatNeedUpgrade = new AtomicLong(0);
private StateMachine<ComponentState, ComponentEventType, ComponentEvent>
stateMachine;
private AsyncDispatcher dispatcher;
@ -131,7 +139,7 @@ public class Component implements EventHandler<ComponentEvent> {
.addTransition(FLEXING, FLEXING, CONTAINER_ALLOCATED,
new ContainerAllocatedTransition())
// container launched on NM
.addTransition(FLEXING, EnumSet.of(STABLE, FLEXING),
.addTransition(FLEXING, EnumSet.of(STABLE, FLEXING, UPGRADING),
CONTAINER_STARTED, new ContainerStartedTransition())
// container failed while flexing
.addTransition(FLEXING, FLEXING, CONTAINER_COMPLETED,
@ -151,12 +159,19 @@ public class Component implements EventHandler<ComponentEvent> {
// For flex down, go to STABLE state
.addTransition(STABLE, EnumSet.of(STABLE, FLEXING),
FLEX, new FlexComponentTransition())
.addTransition(STABLE, UPGRADING, UPGRADE,
.addTransition(STABLE, UPGRADING, ComponentEventType.UPGRADE,
new ComponentNeedsUpgradeTransition())
.addTransition(FLEXING, UPGRADING, UPGRADE,
new ComponentNeedsUpgradeTransition())
.addTransition(UPGRADING, UPGRADING, UPGRADE,
//Upgrade while previous upgrade is still in progress
.addTransition(UPGRADING, UPGRADING, ComponentEventType.UPGRADE,
new ComponentNeedsUpgradeTransition())
.addTransition(UPGRADING, EnumSet.of(UPGRADING, FLEXING, STABLE),
CHECK_STABLE, new CheckStableTransition())
.addTransition(FLEXING, EnumSet.of(UPGRADING, FLEXING, STABLE),
CHECK_STABLE, new CheckStableTransition())
.addTransition(STABLE, EnumSet.of(STABLE), CHECK_STABLE,
new CheckStableTransition())
.addTransition(UPGRADING, FLEXING, CONTAINER_COMPLETED,
new ContainerCompletedTransition())
.installTopology();
public Component(
@ -291,7 +306,10 @@ public class Component implements EventHandler<ComponentEvent> {
component.pendingInstances.remove(instance);
instance.setContainer(container);
ProviderUtils.initCompInstanceDir(component.getContext().fs, instance);
ProviderUtils.initCompInstanceDir(component.getContext().fs,
component.createLaunchContext(component.componentSpec,
component.scheduler.getApp().getVersion()), instance);
component.getScheduler().addLiveCompInstance(container.getId(), instance);
LOG.info("[COMPONENT {}]: Recovered {} for component instance {} on " +
"host {}, num pending component instances reduced to {} ",
@ -317,14 +335,21 @@ public class Component implements EventHandler<ComponentEvent> {
private static ComponentState checkIfStable(Component component) {
// if desired == running
if (component.componentMetrics.containersReady.value() == component
.getComponentSpec().getNumberOfContainers()) {
.getComponentSpec().getNumberOfContainers() &&
component.numContainersThatNeedUpgrade.get() == 0) {
component.componentSpec.setState(
org.apache.hadoop.yarn.service.api.records.ComponentState.STABLE);
return STABLE;
} else {
} else if (component.componentMetrics.containersReady.value() != component
.getComponentSpec().getNumberOfContainers()) {
component.componentSpec.setState(
org.apache.hadoop.yarn.service.api.records.ComponentState.FLEXING);
return FLEXING;
} else {
// component.numContainersThatNeedUpgrade.get() > 0
component.componentSpec.setState(org.apache.hadoop.yarn.service.api.
records.ComponentState.NEEDS_UPGRADE);
return UPGRADING;
}
}
@ -336,8 +361,9 @@ public class Component implements EventHandler<ComponentEvent> {
component.componentSpec.getState();
if (isIncrement) {
// check if all containers are in READY state
if (component.componentMetrics.containersReady
.value() == component.componentMetrics.containersDesired.value()) {
if (component.numContainersThatNeedUpgrade.get() == 0 &&
component.componentMetrics.containersReady.value() ==
component.componentMetrics.containersDesired.value()) {
component.componentSpec.setState(
org.apache.hadoop.yarn.service.api.records.ComponentState.STABLE);
if (curState != component.componentSpec.getState()) {
@ -346,8 +372,7 @@ public class Component implements EventHandler<ComponentEvent> {
component.componentSpec.getState());
}
// component state change will trigger re-check of service state
ServiceMaster.checkAndUpdateServiceState(component.scheduler,
isIncrement);
component.context.getServiceManager().checkAndUpdateServiceState(true);
}
} else {
// container moving out of READY state could be because of FLEX down so
@ -362,10 +387,13 @@ public class Component implements EventHandler<ComponentEvent> {
component.componentSpec.getState());
}
// component state change will trigger re-check of service state
ServiceMaster.checkAndUpdateServiceState(component.scheduler,
isIncrement);
component.context.getServiceManager().checkAndUpdateServiceState(false);
}
}
// when the service is stable then the state of component needs to
// transition to stable
component.dispatcher.getEventHandler().handle(new ComponentEvent(
component.getName(), ComponentEventType.CHECK_STABLE));
}
private static class ContainerCompletedTransition extends BaseTransition {
@ -377,15 +405,52 @@ public class Component implements EventHandler<ComponentEvent> {
STOP).setStatus(event.getStatus()));
component.componentSpec.setState(
org.apache.hadoop.yarn.service.api.records.ComponentState.FLEXING);
component.getScheduler().getApp().setState(ServiceState.STARTED);
if (component.context.service.getState().equals(ServiceState.STABLE)) {
component.getScheduler().getApp().setState(ServiceState.STARTED);
LOG.info("Service def state changed from {} -> {}",
ServiceState.STABLE, ServiceState.STARTED);
}
}
}
private static class ComponentNeedsUpgradeTransition extends BaseTransition {
@Override
public void transition(Component component, ComponentEvent event) {
component.upgradeInProgress.set(true);
component.componentSpec.setState(org.apache.hadoop.yarn.service.api.
records.ComponentState.NEEDS_UPGRADE);
component.numContainersThatNeedUpgrade.set(
component.componentSpec.getNumberOfContainers());
component.componentSpec.getContainers().forEach(container ->
container.setState(ContainerState.NEEDS_UPGRADE));
component.upgradeEvent = event;
}
}
private static class CheckStableTransition implements MultipleArcTransition
<Component, ComponentEvent, ComponentState> {
@Override
public ComponentState transition(Component component,
ComponentEvent componentEvent) {
org.apache.hadoop.yarn.service.api.records.ComponentState currState =
component.componentSpec.getState();
if (currState.equals(org.apache.hadoop.yarn.service.api.records
.ComponentState.STABLE)) {
return ComponentState.STABLE;
}
// checkIfStable also updates the state in definition when STABLE
ComponentState targetState = checkIfStable(component);
if (targetState.equals(STABLE) && component.upgradeInProgress.get()) {
component.componentSpec.overwrite(
component.upgradeEvent.getTargetSpec());
component.upgradeEvent = null;
ServiceEvent checkStable = new ServiceEvent(ServiceEventType.
CHECK_STABLE);
component.dispatcher.getEventHandler().handle(checkStable);
component.upgradeInProgress.set(false);
}
return targetState;
}
}
@ -421,8 +486,28 @@ public class Component implements EventHandler<ComponentEvent> {
"[COMPONENT {}]: Assigned {} to component instance {} and launch on host {} ",
getName(), container.getId(), instance.getCompInstanceName(),
container.getNodeId());
scheduler.getContainerLaunchService()
.launchCompInstance(scheduler.getApp(), instance, container);
if (upgradeInProgress.get()) {
scheduler.getContainerLaunchService()
.launchCompInstance(scheduler.getApp(), instance, container,
createLaunchContext(upgradeEvent.getTargetSpec(),
upgradeEvent.getUpgradeVersion()));
} else {
scheduler.getContainerLaunchService().launchCompInstance(
scheduler.getApp(), instance, container,
createLaunchContext(componentSpec, scheduler.getApp().getVersion()));
}
}
public ContainerLaunchService.ComponentLaunchContext createLaunchContext(
org.apache.hadoop.yarn.service.api.records.Component compSpec,
String version) {
ContainerLaunchService.ComponentLaunchContext launchContext =
new ContainerLaunchService.ComponentLaunchContext(compSpec.getName(),
version);
launchContext.setArtifact(compSpec.getArtifact())
.setConfiguration(compSpec.getConfiguration())
.setLaunchCommand(compSpec.getLaunchCommand());
return launchContext;
}
@SuppressWarnings({ "unchecked" })
@ -661,16 +746,24 @@ public class Component implements EventHandler<ComponentEvent> {
scheduler.getServiceMetrics().containersRunning.decr();
}
public void incContainersReady() {
public void incContainersReady(boolean updateDefinition) {
componentMetrics.containersReady.incr();
scheduler.getServiceMetrics().containersReady.incr();
checkAndUpdateComponentState(this, true);
if (updateDefinition) {
checkAndUpdateComponentState(this, true);
}
}
public void decContainersReady() {
public void decContainersReady(boolean updateDefinition) {
componentMetrics.containersReady.decr();
scheduler.getServiceMetrics().containersReady.decr();
checkAndUpdateComponentState(this, false);
if (updateDefinition) {
checkAndUpdateComponentState(this, false);
}
}
public void decContainersThatNeedUpgrade() {
numContainersThatNeedUpgrade.decrementAndGet();
}
public int getNumReadyInstances() {
@ -729,6 +822,16 @@ public class Component implements EventHandler<ComponentEvent> {
this.readLock.unlock();
}
}
public ComponentEvent getUpgradeEvent() {
this.readLock.lock();
try {
return upgradeEvent;
} finally {
this.readLock.unlock();
}
}
public ServiceScheduler getScheduler() {
return scheduler;
}

View File

@ -34,6 +34,7 @@ public class ComponentEvent extends AbstractEvent<ComponentEventType> {
private ContainerStatus status;
private ContainerId containerId;
private org.apache.hadoop.yarn.service.api.records.Component targetSpec;
private String upgradeVersion;
public ContainerId getContainerId() {
return containerId;
@ -103,4 +104,13 @@ public class ComponentEvent extends AbstractEvent<ComponentEventType> {
this.targetSpec = Preconditions.checkNotNull(targetSpec);
return this;
}
public String getUpgradeVersion() {
return upgradeVersion;
}
public ComponentEvent setUpgradeVersion(String upgradeVersion) {
this.upgradeVersion = upgradeVersion;
return this;
}
}

View File

@ -25,5 +25,5 @@ public enum ComponentEventType {
CONTAINER_STARTED,
CONTAINER_COMPLETED,
UPGRADE,
STOP_UPGRADE
CHECK_STABLE
}

View File

@ -41,6 +41,8 @@ import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.service.ServiceScheduler;
import org.apache.hadoop.yarn.service.api.records.ContainerState;
import org.apache.hadoop.yarn.service.component.Component;
import org.apache.hadoop.yarn.service.component.ComponentEvent;
import org.apache.hadoop.yarn.service.component.ComponentEventType;
import org.apache.hadoop.yarn.service.monitor.probe.ProbeStatus;
import org.apache.hadoop.yarn.service.registry.YarnRegistryViewForProviders;
import org.apache.hadoop.yarn.service.timelineservice.ServiceTimelinePublisher;
@ -116,10 +118,15 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
.addTransition(READY, STARTED, BECOME_NOT_READY,
new ContainerBecomeNotReadyTransition())
.addTransition(READY, INIT, STOP, new ContainerStoppedTransition())
.addTransition(READY, UPGRADING, UPGRADE,
new ContainerUpgradeTransition())
.addTransition(UPGRADING, UPGRADING, UPGRADE,
new ContainerUpgradeTransition())
.addTransition(UPGRADING, READY, BECOME_READY,
new ContainerBecomeReadyTransition())
.addTransition(UPGRADING, INIT, STOP, new ContainerStoppedTransition())
.installTopology();
public ComponentInstance(Component component,
ComponentInstanceId compInstanceId) {
this.stateMachine = stateMachineFactory.make(this);
@ -186,7 +193,17 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
public void transition(ComponentInstance compInstance,
ComponentInstanceEvent event) {
compInstance.containerSpec.setState(ContainerState.READY);
compInstance.component.incContainersReady();
if (compInstance.getState().equals(ComponentInstanceState.UPGRADING)) {
compInstance.component.incContainersReady(false);
compInstance.component.decContainersThatNeedUpgrade();
ComponentEvent checkState = new ComponentEvent(
compInstance.component.getName(), ComponentEventType.CHECK_STABLE);
compInstance.scheduler.getDispatcher().getEventHandler().handle(
checkState);
} else {
compInstance.component.incContainersReady(true);
}
if (compInstance.timelineServiceEnabled) {
compInstance.serviceTimelinePublisher
.componentInstanceBecomeReady(compInstance.containerSpec);
@ -199,7 +216,7 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
public void transition(ComponentInstance compInstance,
ComponentInstanceEvent event) {
compInstance.containerSpec.setState(ContainerState.RUNNING_BUT_UNREADY);
compInstance.component.decContainersReady();
compInstance.component.decContainersReady(true);
}
}
@ -225,9 +242,11 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
.getDiagnostics();
compInstance.diagnostics.append(containerDiag + System.lineSeparator());
compInstance.cancelContainerStatusRetriever();
if (compInstance.getState().equals(ComponentInstanceState.UPGRADING)) {
compInstance.component.decContainersThatNeedUpgrade();
}
if (compInstance.getState().equals(READY)) {
compInstance.component.decContainersReady();
compInstance.component.decContainersReady(true);
}
compInstance.component.decRunningContainers();
boolean shouldExit = false;
@ -287,6 +306,23 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
}
}
private static class ContainerUpgradeTransition extends BaseTransition {
@Override
public void transition(ComponentInstance compInstance,
ComponentInstanceEvent event) {
compInstance.containerSpec.setState(ContainerState.UPGRADING);
compInstance.component.decContainersReady(false);
ComponentEvent upgradeEvent = compInstance.component.getUpgradeEvent();
compInstance.scheduler.getContainerLaunchService()
.reInitCompInstance(compInstance.scheduler.getApp(), compInstance,
compInstance.container,
compInstance.component.createLaunchContext(
upgradeEvent.getTargetSpec(),
upgradeEvent.getUpgradeVersion()));
}
}
public ComponentInstanceState getState() {
this.readLock.lock();
@ -422,7 +458,7 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
component.decRunningContainers();
}
if (getState() == READY) {
component.decContainersReady();
component.decContainersReady(true);
component.decRunningContainers();
}
getCompSpec().removeContainer(containerSpec);

View File

@ -22,5 +22,6 @@ public enum ComponentInstanceEventType {
START,
STOP,
BECOME_READY,
BECOME_NOT_READY
BECOME_NOT_READY,
UPGRADE
}

View File

@ -17,6 +17,8 @@
package org.apache.hadoop.yarn.service.conf;
import javax.ws.rs.core.MediaType;
public interface RestApiConstants {
// Rest endpoints
@ -26,9 +28,19 @@ public interface RestApiConstants {
String SERVICE_PATH = "/services/{service_name}";
String COMPONENT_PATH = "/services/{service_name}/components/{component_name}";
String COMP_INSTANCE_PATH = SERVICE_PATH +
"/component-instances/{component_instance_name}";
String COMP_INSTANCE_LONG_PATH = COMPONENT_PATH +
"/component-instances/{component_instance_name}";
String COMP_INSTANCES = "component-instances";
String COMP_INSTANCES_PATH = SERVICE_PATH + "/" + COMP_INSTANCES;
// Query param
String SERVICE_NAME = "service_name";
String COMPONENT_NAME = "component_name";
String COMP_INSTANCE_NAME = "component_instance_name";
String MEDIA_TYPE_JSON_UTF8 = MediaType.APPLICATION_JSON + ";charset=utf-8";
Long DEFAULT_UNLIMITED_LIFETIME = -1l;

View File

@ -18,11 +18,12 @@
package org.apache.hadoop.yarn.service.containerlaunch;
import com.google.common.base.Preconditions;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.service.ServiceContext;
import org.apache.hadoop.yarn.service.api.records.Component;
import org.apache.hadoop.yarn.service.api.records.Artifact;
import org.apache.hadoop.yarn.service.component.instance.ComponentInstance;
import org.apache.hadoop.yarn.service.provider.ProviderService;
import org.apache.hadoop.yarn.service.provider.ProviderFactory;
@ -63,36 +64,57 @@ public class ContainerLaunchService extends AbstractService{
}
public void launchCompInstance(Service service,
ComponentInstance instance, Container container) {
ComponentInstance instance, Container container,
ComponentLaunchContext componentLaunchContext) {
ContainerLauncher launcher =
new ContainerLauncher(service, instance, container);
new ContainerLauncher(service, instance, container,
componentLaunchContext, false);
executorService.execute(launcher);
}
public void reInitCompInstance(Service service,
ComponentInstance instance, Container container,
ComponentLaunchContext componentLaunchContext) {
ContainerLauncher reInitializer = new ContainerLauncher(service, instance,
container, componentLaunchContext, true);
executorService.execute(reInitializer);
}
private class ContainerLauncher implements Runnable {
public final Container container;
public final Service service;
public ComponentInstance instance;
private final ComponentLaunchContext componentLaunchContext;
private final boolean reInit;
public ContainerLauncher(
Service service,
ComponentInstance instance, Container container) {
ContainerLauncher(Service service, ComponentInstance instance,
Container container, ComponentLaunchContext componentLaunchContext,
boolean reInit) {
this.container = container;
this.service = service;
this.instance = instance;
this.componentLaunchContext = componentLaunchContext;
this.reInit = reInit;
}
@Override public void run() {
Component compSpec = instance.getCompSpec();
ProviderService provider = ProviderFactory.getProviderService(
compSpec.getArtifact());
componentLaunchContext.getArtifact());
AbstractLauncher launcher = new AbstractLauncher(context);
try {
provider.buildContainerLaunchContext(launcher, service,
instance, fs, getConfig(), container);
instance.getComponent().getScheduler().getNmClient()
.startContainerAsync(container,
launcher.completeContainerLaunch());
instance, fs, getConfig(), container, componentLaunchContext);
if (!reInit) {
LOG.info("launching container {}", container.getId());
instance.getComponent().getScheduler().getNmClient()
.startContainerAsync(container,
launcher.completeContainerLaunch());
} else {
LOG.info("reInitializing container {}", container.getId());
instance.getComponent().getScheduler().getNmClient()
.reInitializeContainerAsync(container.getId(),
launcher.completeContainerLaunch(), true);
}
} catch (Exception e) {
LOG.error(instance.getCompInstanceId()
+ ": Failed to launch container. ", e);
@ -100,4 +122,58 @@ public class ContainerLaunchService extends AbstractService{
}
}
}
/**
* Launch context of a component.
*/
public static class ComponentLaunchContext {
private final String name;
private final String serviceVersion;
private Artifact artifact;
private org.apache.hadoop.yarn.service.api.records.Configuration
configuration;
private String launchCommand;
public ComponentLaunchContext(String name, String serviceVersion) {
this.name = Preconditions.checkNotNull(name);
this.serviceVersion = Preconditions.checkNotNull(serviceVersion);
}
public String getName() {
return name;
}
public String getServiceVersion() {
return serviceVersion;
}
public Artifact getArtifact() {
return artifact;
}
public org.apache.hadoop.yarn.service.api.records.
Configuration getConfiguration() {
return configuration;
}
public String getLaunchCommand() {
return launchCommand;
}
public ComponentLaunchContext setArtifact(Artifact artifact) {
this.artifact = artifact;
return this;
}
public ComponentLaunchContext setConfiguration(org.apache.hadoop.yarn.
service.api.records.Configuration configuration) {
this.configuration = configuration;
return this;
}
public ComponentLaunchContext setLaunchCommand(String launchCommand) {
this.launchCommand = launchCommand;
return this;
}
}
}

View File

@ -30,6 +30,8 @@ import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.CompInstancesUpgradeResponseProto;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.CompInstancesUpgradeRequestProto;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.FlexComponentsRequestProto;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.FlexComponentsResponseProto;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.GetStatusRequestProto;
@ -114,4 +116,16 @@ public class ClientAMProtocolPBClientImpl
}
return null;
}
@Override
public CompInstancesUpgradeResponseProto upgrade(
CompInstancesUpgradeRequestProto request)
throws IOException, YarnException {
try {
return proxy.upgrade(null, request);
} catch (ServiceException e) {
RPCUtil.unwrapAndThrowException(e);
}
return null;
}
}

View File

@ -21,6 +21,8 @@ package org.apache.hadoop.yarn.service.impl.pb.service;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.CompInstancesUpgradeRequestProto;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.CompInstancesUpgradeResponseProto;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.FlexComponentsRequestProto;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.FlexComponentsResponseProto;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.GetStatusRequestProto;
@ -91,4 +93,14 @@ public class ClientAMProtocolPBServiceImpl implements ClientAMProtocolPB {
throw new ServiceException(e);
}
}
@Override
public CompInstancesUpgradeResponseProto upgrade(RpcController controller,
CompInstancesUpgradeRequestProto request) throws ServiceException {
try {
return real.upgrade(request);
} catch (IOException | YarnException e) {
throw new ServiceException(e);
}
}
}

View File

@ -23,8 +23,8 @@ import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.service.api.records.Service;
import org.apache.hadoop.yarn.service.conf.YarnServiceConf;
import org.apache.hadoop.yarn.service.api.records.Component;
import org.apache.hadoop.yarn.service.conf.YarnServiceConstants;
import org.apache.hadoop.yarn.service.containerlaunch.ContainerLaunchService;
import org.apache.hadoop.yarn.service.utils.SliderFileSystem;
import org.apache.hadoop.yarn.service.utils.ServiceUtils;
import org.apache.hadoop.yarn.service.exceptions.SliderException;
@ -60,9 +60,9 @@ public abstract class AbstractProviderService implements ProviderService,
public void buildContainerLaunchContext(AbstractLauncher launcher,
Service service, ComponentInstance instance,
SliderFileSystem fileSystem, Configuration yarnConf, Container container)
SliderFileSystem fileSystem, Configuration yarnConf, Container container,
ContainerLaunchService.ComponentLaunchContext compLaunchContext)
throws IOException, SliderException {
Component component = instance.getComponent().getComponentSpec();;
processArtifact(launcher, instance, fileSystem, service);
ServiceContext context =
@ -72,11 +72,12 @@ public abstract class AbstractProviderService implements ProviderService,
Map<String, String> globalTokens =
instance.getComponent().getScheduler().globalTokens;
Map<String, String> tokensForSubstitution = ProviderUtils
.initCompTokensForSubstitute(instance, container);
.initCompTokensForSubstitute(instance, container,
compLaunchContext);
tokensForSubstitution.putAll(globalTokens);
// Set the environment variables in launcher
launcher.putEnv(ServiceUtils
.buildEnvMap(component.getConfiguration(), tokensForSubstitution));
launcher.putEnv(ServiceUtils.buildEnvMap(
compLaunchContext.getConfiguration(), tokensForSubstitution));
launcher.setEnv("WORK_DIR", ApplicationConstants.Environment.PWD.$());
launcher.setEnv("LOG_DIR", ApplicationConstants.LOG_DIR_EXPANSION_VAR);
if (System.getenv(HADOOP_USER_NAME) != null) {
@ -94,10 +95,10 @@ public abstract class AbstractProviderService implements ProviderService,
// create config file on hdfs and add local resource
ProviderUtils.createConfigFileAndAddLocalResource(launcher, fileSystem,
component, tokensForSubstitution, instance, context);
compLaunchContext, tokensForSubstitution, instance, context);
// substitute launch command
String launchCommand = component.getLaunchCommand();
String launchCommand = compLaunchContext.getLaunchCommand();
// docker container may have empty commands
if (!StringUtils.isEmpty(launchCommand)) {
launchCommand = ProviderUtils
@ -111,12 +112,12 @@ public abstract class AbstractProviderService implements ProviderService,
// By default retry forever every 30 seconds
launcher.setRetryContext(
YarnServiceConf.getInt(CONTAINER_RETRY_MAX, DEFAULT_CONTAINER_RETRY_MAX,
component.getConfiguration(), yarnConf),
compLaunchContext.getConfiguration(), yarnConf),
YarnServiceConf.getInt(CONTAINER_RETRY_INTERVAL,
DEFAULT_CONTAINER_RETRY_INTERVAL, component.getConfiguration(),
yarnConf),
DEFAULT_CONTAINER_RETRY_INTERVAL,
compLaunchContext.getConfiguration(), yarnConf),
YarnServiceConf.getLong(CONTAINER_FAILURES_VALIDITY_INTERVAL,
DEFAULT_CONTAINER_FAILURES_VALIDITY_INTERVAL,
component.getConfiguration(), yarnConf));
compLaunchContext.getConfiguration(), yarnConf));
}
}

View File

@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.service.provider;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.service.api.records.Service;
import org.apache.hadoop.yarn.service.containerlaunch.ContainerLaunchService;
import org.apache.hadoop.yarn.service.utils.SliderFileSystem;
import org.apache.hadoop.yarn.service.exceptions.SliderException;
import org.apache.hadoop.yarn.service.containerlaunch.AbstractLauncher;
@ -35,6 +36,8 @@ public interface ProviderService {
*/
void buildContainerLaunchContext(AbstractLauncher containerLauncher,
Service service, ComponentInstance instance,
SliderFileSystem sliderFileSystem, Configuration yarnConf, Container
container) throws IOException, SliderException;
SliderFileSystem sliderFileSystem, Configuration yarnConf,
Container container,
ContainerLaunchService.ComponentLaunchContext componentLaunchContext)
throws IOException, SliderException;
}

View File

@ -27,12 +27,12 @@ import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.service.ServiceContext;
import org.apache.hadoop.yarn.service.api.records.Component;
import org.apache.hadoop.yarn.service.api.records.ConfigFile;
import org.apache.hadoop.yarn.service.api.records.ConfigFormat;
import org.apache.hadoop.yarn.service.component.instance.ComponentInstance;
import org.apache.hadoop.yarn.service.conf.YarnServiceConstants;
import org.apache.hadoop.yarn.service.containerlaunch.AbstractLauncher;
import org.apache.hadoop.yarn.service.containerlaunch.ContainerLaunchService;
import org.apache.hadoop.yarn.service.exceptions.BadCommandArgumentsException;
import org.apache.hadoop.yarn.service.exceptions.SliderException;
import org.apache.hadoop.yarn.service.utils.PublishedConfiguration;
@ -51,7 +51,11 @@ import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.regex.Pattern;
import static org.apache.hadoop.yarn.service.api.ServiceApiConstants.*;
import static org.apache.hadoop.yarn.service.api.ServiceApiConstants.COMPONENT_ID;
import static org.apache.hadoop.yarn.service.api.ServiceApiConstants.COMPONENT_INSTANCE_NAME;
import static org.apache.hadoop.yarn.service.api.ServiceApiConstants.COMPONENT_NAME;
import static org.apache.hadoop.yarn.service.api.ServiceApiConstants.COMPONENT_NAME_LC;
import static org.apache.hadoop.yarn.service.api.ServiceApiConstants.CONTAINER_ID;
/**
* This is a factoring out of methods handy for providers. It's bonded to a log
@ -160,9 +164,11 @@ public class ProviderUtils implements YarnServiceConstants {
}
public static Path initCompInstanceDir(SliderFileSystem fs,
ContainerLaunchService.ComponentLaunchContext compLaunchContext,
ComponentInstance instance) {
Path compDir = new Path(new Path(fs.getAppDir(), "components"),
instance.getCompName());
compLaunchContext.getServiceVersion() + "/" +
compLaunchContext.getName());
Path compInstanceDir = new Path(compDir, instance.getCompInstanceName());
instance.setCompInstanceDir(compInstanceDir);
return compInstanceDir;
@ -171,10 +177,11 @@ public class ProviderUtils implements YarnServiceConstants {
// 1. Create all config files for a component on hdfs for localization
// 2. Add the config file to localResource
public static synchronized void createConfigFileAndAddLocalResource(
AbstractLauncher launcher, SliderFileSystem fs, Component component,
AbstractLauncher launcher, SliderFileSystem fs,
ContainerLaunchService.ComponentLaunchContext compLaunchContext,
Map<String, String> tokensForSubstitution, ComponentInstance instance,
ServiceContext context) throws IOException {
Path compInstanceDir = initCompInstanceDir(fs, instance);
Path compInstanceDir = initCompInstanceDir(fs, compLaunchContext, instance);
if (!fs.getFileSystem().exists(compInstanceDir)) {
log.info(instance.getCompInstanceId() + ": Creating dir on hdfs: " + compInstanceDir);
fs.getFileSystem().mkdirs(compInstanceDir,
@ -189,7 +196,8 @@ public class ProviderUtils implements YarnServiceConstants {
+ tokensForSubstitution);
}
for (ConfigFile originalFile : component.getConfiguration().getFiles()) {
for (ConfigFile originalFile : compLaunchContext.getConfiguration()
.getFiles()) {
ConfigFile configFile = originalFile.copy();
String fileName = new Path(configFile.getDestFile()).getName();
@ -343,11 +351,12 @@ public class ProviderUtils implements YarnServiceConstants {
* @return tokens to replace
*/
public static Map<String, String> initCompTokensForSubstitute(
ComponentInstance instance, Container container) {
ComponentInstance instance, Container container,
ContainerLaunchService.ComponentLaunchContext componentLaunchContext) {
Map<String, String> tokens = new HashMap<>();
tokens.put(COMPONENT_NAME, instance.getCompSpec().getName());
tokens.put(COMPONENT_NAME, componentLaunchContext.getName());
tokens
.put(COMPONENT_NAME_LC, instance.getCompSpec().getName().toLowerCase());
.put(COMPONENT_NAME_LC, componentLaunchContext.getName().toLowerCase());
tokens.put(COMPONENT_INSTANCE_NAME, instance.getCompInstanceName());
tokens.put(CONTAINER_ID, container.getId().toString());
tokens.put(COMPONENT_ID,

View File

@ -19,6 +19,8 @@
package org.apache.hadoop.yarn.service.utils;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Multimap;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@ -27,12 +29,13 @@ import org.apache.hadoop.registry.client.api.RegistryConstants;
import org.apache.hadoop.registry.client.binding.RegistryUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.service.api.records.Container;
import org.apache.hadoop.yarn.service.api.records.Service;
import org.apache.hadoop.yarn.service.api.records.Artifact;
import org.apache.hadoop.yarn.service.api.records.Component;
import org.apache.hadoop.yarn.service.api.records.Configuration;
import org.apache.hadoop.yarn.service.api.records.PlacementConstraint;
import org.apache.hadoop.yarn.service.api.records.Resource;
import org.apache.hadoop.yarn.service.api.records.Service;
import org.apache.hadoop.yarn.service.exceptions.SliderException;
import org.apache.hadoop.yarn.service.conf.RestApiConstants;
import org.apache.hadoop.yarn.service.exceptions.RestApiErrorMessages;
@ -66,6 +69,8 @@ public class ServiceApiUtil {
private static final PatternValidator userNamePattern
= new PatternValidator("[a-z][a-z0-9-.]*");
@VisibleForTesting
public static void setJsonSerDeser(JsonSerDeser jsd) {
jsonSerDeser = jsd;
@ -496,6 +501,47 @@ public class ServiceApiUtil {
return appJson;
}
public static List<Container> getLiveContainers(Service service,
List<String> componentInstances)
throws YarnException {
List<Container> result = new ArrayList<>();
// In order to avoid iterating over all the containers of all components,
// first find the affected components by parsing the instance name.
Multimap<String, String> affectedComps = ArrayListMultimap.create();
for (String instanceName : componentInstances) {
affectedComps.put(
ServiceApiUtil.parseComponentName(instanceName), instanceName);
}
service.getComponents().forEach(comp -> {
// Iterating once over the containers of the affected component to
// find all the containers. Avoiding multiple calls to
// service.getComponent(...) and component.getContainer(...) because they
// iterate over all the components of the service and all the containers
// of the components respectively.
if (affectedComps.get(comp.getName()) != null) {
Collection<String> instanceNames = affectedComps.get(comp.getName());
comp.getContainers().forEach(container -> {
if (instanceNames.contains(container.getComponentInstanceName())) {
result.add(container);
}
});
}
});
return result;
}
private static String parseComponentName(String componentInstanceName)
throws YarnException {
int idx = componentInstanceName.lastIndexOf('-');
if (idx == -1) {
throw new YarnException("Invalid component instance (" +
componentInstanceName + ") name.");
}
return componentInstanceName.substring(0, idx);
}
public static String $(String s) {
return "${" + s +"}";
}

View File

@ -30,6 +30,8 @@ service ClientAMProtocolService {
returns (UpgradeServiceResponseProto);
rpc restartService(RestartServiceRequestProto)
returns (RestartServiceResponseProto);
rpc upgrade(CompInstancesUpgradeRequestProto) returns
(CompInstancesUpgradeResponseProto);
}
message FlexComponentsRequestProto {
@ -61,13 +63,22 @@ message StopResponseProto {
message UpgradeServiceRequestProto {
optional string version = 1;
optional bool autoFinalize = 2;
}
message UpgradeServiceResponseProto {
optional string error = 1;
}
message RestartServiceRequestProto {
}
message RestartServiceResponseProto {
}
message CompInstancesUpgradeRequestProto {
repeated string containerIds = 1;
}
message CompInstancesUpgradeResponseProto {
}

View File

@ -108,6 +108,7 @@ public class TestServiceAM extends ServiceTestUtils{
ApplicationId applicationId = ApplicationId.newInstance(123456, 1);
Service exampleApp = new Service();
exampleApp.setId(applicationId.toString());
exampleApp.setVersion("v1");
exampleApp.setName("testContainerCompleted");
exampleApp.addComponent(createComponent("compa", 1, "pwd"));
@ -146,6 +147,7 @@ public class TestServiceAM extends ServiceTestUtils{
System.currentTimeMillis(), 1);
Service exampleApp = new Service();
exampleApp.setId(applicationId.toString());
exampleApp.setVersion("v1");
exampleApp.setName("testContainersRecovers");
String comp1Name = "comp1";
String comp1InstName = "comp1-0";
@ -189,6 +191,7 @@ public class TestServiceAM extends ServiceTestUtils{
Service exampleApp = new Service();
exampleApp.setId(applicationId.toString());
exampleApp.setName("testContainersRecovers");
exampleApp.setVersion("v1");
String comp1Name = "comp1";
String comp1InstName = "comp1-0";
@ -230,6 +233,7 @@ public class TestServiceAM extends ServiceTestUtils{
Service exampleApp = new Service();
exampleApp.setId(applicationId.toString());
exampleApp.setName("testContainersFromDifferentApp");
exampleApp.setVersion("v1");
String comp1Name = "comp1";
String comp1InstName = "comp1-0";
@ -270,6 +274,7 @@ public class TestServiceAM extends ServiceTestUtils{
Service exampleApp = new Service();
exampleApp.setId(applicationId.toString());
exampleApp.setName("testScheduleWithMultipleResourceTypes");
exampleApp.setVersion("v1");
List<ResourceTypeInfo> resourceTypeInfos = new ArrayList<>(
ResourceUtils.getResourcesTypeInfo());

View File

@ -49,7 +49,7 @@ public class TestServiceManager {
@Test
public void testUpgrade() throws IOException, SliderException {
ServiceManager serviceManager = createTestServiceManager("testUpgrade");
upgrade(serviceManager, "v2", false);
upgrade(serviceManager, "v2", false, false);
Assert.assertEquals("service not upgraded", ServiceState.UPGRADING,
serviceManager.getServiceSpec().getState());
}
@ -57,8 +57,9 @@ public class TestServiceManager {
@Test
public void testRestartNothingToUpgrade()
throws IOException, SliderException {
ServiceManager serviceManager = createTestServiceManager("testRestart");
upgrade(serviceManager, "v2", false);
ServiceManager serviceManager = createTestServiceManager(
"testRestartNothingToUpgrade");
upgrade(serviceManager, "v2", false, false);
//make components stable
serviceManager.getServiceSpec().getComponents().forEach(comp -> {
@ -69,22 +70,119 @@ public class TestServiceManager {
serviceManager.getServiceSpec().getState());
}
@Test
public void testAutoFinalizeNothingToUpgrade() throws IOException,
SliderException {
ServiceManager serviceManager = createTestServiceManager(
"testAutoFinalizeNothingToUpgrade");
upgrade(serviceManager, "v2", false, true);
//make components stable
serviceManager.getServiceSpec().getComponents().forEach(comp ->
comp.setState(ComponentState.STABLE));
serviceManager.handle(new ServiceEvent(ServiceEventType.CHECK_STABLE));
Assert.assertEquals("service stable", ServiceState.STABLE,
serviceManager.getServiceSpec().getState());
}
@Test
public void testRestartWithPendingUpgrade()
throws IOException, SliderException {
ServiceManager serviceManager = createTestServiceManager("testRestart");
upgrade(serviceManager, "v2", true);
upgrade(serviceManager, "v2", true, false);
serviceManager.handle(new ServiceEvent(ServiceEventType.START));
Assert.assertEquals("service should still be upgrading",
ServiceState.UPGRADING, serviceManager.getServiceSpec().getState());
}
@Test
public void testCheckState() throws IOException, SliderException {
ServiceManager serviceManager = createTestServiceManager(
"testCheckState");
upgrade(serviceManager, "v2", true, false);
Assert.assertEquals("service not upgrading", ServiceState.UPGRADING,
serviceManager.getServiceSpec().getState());
private void upgrade(ServiceManager service, String version,
boolean upgradeArtifact)
// make components stable
serviceManager.getServiceSpec().getComponents().forEach(comp -> {
comp.setState(ComponentState.STABLE);
});
ServiceEvent checkStable = new ServiceEvent(ServiceEventType.CHECK_STABLE);
serviceManager.handle(checkStable);
Assert.assertEquals("service should still be upgrading",
ServiceState.UPGRADING, serviceManager.getServiceSpec().getState());
// finalize service
ServiceEvent restart = new ServiceEvent(ServiceEventType.START);
serviceManager.handle(restart);
Assert.assertEquals("service not stable",
ServiceState.STABLE, serviceManager.getServiceSpec().getState());
validateUpgradeFinalization(serviceManager.getName(), "v2");
}
@Test
public void testCheckStateAutoFinalize() throws IOException, SliderException {
ServiceManager serviceManager = createTestServiceManager(
"testCheckState");
serviceManager.getServiceSpec().setState(
ServiceState.UPGRADING_AUTO_FINALIZE);
upgrade(serviceManager, "v2", true, true);
Assert.assertEquals("service not upgrading",
ServiceState.UPGRADING_AUTO_FINALIZE,
serviceManager.getServiceSpec().getState());
// make components stable
serviceManager.getServiceSpec().getComponents().forEach(comp ->
comp.setState(ComponentState.STABLE));
ServiceEvent checkStable = new ServiceEvent(ServiceEventType.CHECK_STABLE);
serviceManager.handle(checkStable);
Assert.assertEquals("service not stable",
ServiceState.STABLE, serviceManager.getServiceSpec().getState());
validateUpgradeFinalization(serviceManager.getName(), "v2");
}
@Test
public void testInvalidUpgrade() throws IOException, SliderException {
ServiceManager serviceManager = createTestServiceManager(
"testInvalidUpgrade");
serviceManager.getServiceSpec().setState(
ServiceState.UPGRADING_AUTO_FINALIZE);
Service upgradedDef = ServiceTestUtils.createExampleApplication();
upgradedDef.setName(serviceManager.getName());
upgradedDef.setVersion("v2");
upgradedDef.setLifetime(2L);
writeUpgradedDef(upgradedDef);
try {
serviceManager.processUpgradeRequest("v2", true);
} catch (Exception ex) {
Assert.assertTrue(ex instanceof UnsupportedOperationException);
return;
}
Assert.fail();
}
private void validateUpgradeFinalization(String serviceName,
String expectedVersion) throws IOException {
Service savedSpec = ServiceApiUtil.loadService(rule.getFs(), serviceName);
Assert.assertEquals("service def not re-written", expectedVersion,
savedSpec.getVersion());
Assert.assertNotNull("app id not present", savedSpec.getId());
Assert.assertEquals("state not stable", ServiceState.STABLE,
savedSpec.getState());
savedSpec.getComponents().forEach(compSpec -> {
Assert.assertEquals("comp not stable", ComponentState.STABLE,
compSpec.getState());
});
}
private void upgrade(ServiceManager serviceManager, String version,
boolean upgradeArtifact, boolean autoFinalize)
throws IOException, SliderException {
Service upgradedDef = ServiceTestUtils.createExampleApplication();
upgradedDef.setName(service.getName());
upgradedDef.setName(serviceManager.getName());
upgradedDef.setVersion(version);
if (upgradeArtifact) {
Artifact upgradedArtifact = createTestArtifact("2");
@ -93,9 +191,13 @@ public class TestServiceManager {
});
}
writeUpgradedDef(upgradedDef);
serviceManager.processUpgradeRequest(version, autoFinalize);
ServiceEvent upgradeEvent = new ServiceEvent(ServiceEventType.UPGRADE);
upgradeEvent.setVersion("v2");
service.handle(upgradeEvent);
upgradeEvent.setVersion(version);
if (autoFinalize) {
upgradeEvent.setAutoFinalize(true);
}
serviceManager.handle(upgradeEvent);
}
private ServiceManager createTestServiceManager(String name)
@ -124,7 +226,7 @@ public class TestServiceManager {
return new ServiceManager(context);
}
static Service createBaseDef(String name) {
public static Service createBaseDef(String name) {
ApplicationId applicationId = ApplicationId.newInstance(
System.currentTimeMillis(), 1);
Service serviceDef = ServiceTestUtils.createExampleApplication();

View File

@ -31,9 +31,9 @@ import org.apache.hadoop.yarn.api.records.*;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.service.api.records.ComponentState;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.service.api.records.Component;
import org.apache.hadoop.yarn.service.api.records.ComponentState;
import org.apache.hadoop.yarn.service.api.records.Container;
import org.apache.hadoop.yarn.service.api.records.ContainerState;
import org.apache.hadoop.yarn.service.api.records.PlacementConstraint;
@ -372,25 +372,47 @@ public class TestYarnNativeServices extends ServiceTestUtils {
}
@Test(timeout = 200000)
public void testUpgradeService() throws Exception {
public void testUpgrade() throws Exception {
setupInternal(NUM_NMS);
ServiceClient client = createClient(getConf());
Service service = createExampleApplication();
client.actionCreate(service);
waitForServiceToBeStarted(client, service);
waitForServiceToBeStable(client, service);
//upgrade the service
// upgrade the service
Component component = service.getComponents().iterator().next();
service.setState(ServiceState.UPGRADING);
service.setVersion("v2");
client.actionUpgrade(service);
component.getConfiguration().getEnv().put("key1", "val1");
client.initiateUpgrade(service);
//wait for service to be in upgrade state
// wait for service to be in upgrade state
waitForServiceToBeInState(client, service, ServiceState.UPGRADING);
SliderFileSystem fs = new SliderFileSystem(getConf());
Service fromFs = ServiceApiUtil.loadServiceUpgrade(fs,
service.getName(), service.getVersion());
Assert.assertEquals(service.getName(), fromFs.getName());
Assert.assertEquals(service.getVersion(), fromFs.getVersion());
// upgrade containers
Service liveService = client.getStatus(service.getName());
client.actionUpgrade(service,
liveService.getComponent(component.getName()).getContainers());
waitForAllCompToBeReady(client, service);
// finalize the upgrade
client.actionStart(service.getName());
waitForServiceToBeStable(client, service);
Service active = client.getStatus(service.getName());
Assert.assertEquals("component not stable", ComponentState.STABLE,
active.getComponent(component.getName()).getState());
Assert.assertEquals("comp does not have new env", "val1",
active.getComponent(component.getName()).getConfiguration()
.getEnv("key1"));
LOG.info("Stop/destroy service {}", service);
client.actionStop(service.getName(), true);
client.actionDestroy(service.getName());
}
// Test to verify ANTI_AFFINITY placement policy

View File

@ -21,9 +21,9 @@ package org.apache.hadoop.yarn.service.client;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.yarn.client.api.AppAdminClient;
import org.apache.hadoop.yarn.client.cli.ApplicationCLI;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.service.api.records.Component;
import org.apache.hadoop.yarn.service.api.records.Service;
import org.apache.hadoop.yarn.service.conf.ExampleAppJson;
@ -36,12 +36,15 @@ import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.io.PrintStream;
import java.util.List;
import static org.apache.hadoop.yarn.client.api.AppAdminClient.YARN_APP_ADMIN_CLIENT_PREFIX;
import static org.apache.hadoop.yarn.service.conf.YarnServiceConf.YARN_SERVICE_BASE_PATH;
import static org.mockito.Mockito.spy;
public class TestServiceCLI {
private static final Logger LOG = LoggerFactory.getLogger(TestServiceCLI
@ -51,33 +54,36 @@ public class TestServiceCLI {
private File basedir;
private SliderFileSystem fs;
private String basedirProp;
private ApplicationCLI cli;
private void runCLI(String[] args) throws Exception {
LOG.info("running CLI: yarn {}", Arrays.asList(args));
ApplicationCLI cli = new ApplicationCLI();
cli.setSysOutPrintStream(System.out);
cli.setSysErrPrintStream(System.err);
int res = ToolRunner.run(cli, ApplicationCLI.preProcessArgs(args));
cli.stop();
private void createCLI() {
cli = new ApplicationCLI();
PrintStream sysOut = spy(new PrintStream(new ByteArrayOutputStream()));
PrintStream sysErr = spy(new PrintStream(new ByteArrayOutputStream()));
cli.setSysOutPrintStream(sysOut);
cli.setSysErrPrintStream(sysErr);
conf.set(YARN_APP_ADMIN_CLIENT_PREFIX + DUMMY_APP_TYPE,
DummyServiceClient.class.getName());
cli.setConf(conf);
}
private void buildApp(String serviceName, String appDef) throws Throwable {
String[] args = {"app",
"-D", basedirProp, "-save", serviceName,
ExampleAppJson.resourceName(appDef),
"-appTypes", AppAdminClient.UNIT_TEST_TYPE};
runCLI(args);
"-appTypes", DUMMY_APP_TYPE};
ToolRunner.run(cli, ApplicationCLI.preProcessArgs(args));
}
private void buildApp(String serviceName, String appDef, String lifetime,
String queue) throws Throwable {
private void buildApp(String serviceName, String appDef,
String lifetime, String queue) throws Throwable {
String[] args = {"app",
"-D", basedirProp, "-save", serviceName,
ExampleAppJson.resourceName(appDef),
"-appTypes", AppAdminClient.UNIT_TEST_TYPE,
"-appTypes", DUMMY_APP_TYPE,
"-updateLifetime", lifetime,
"-changeQueue", queue};
runCLI(args);
ToolRunner.run(cli, ApplicationCLI.preProcessArgs(args));
}
@Before
@ -91,6 +97,7 @@ public class TestServiceCLI {
} else {
basedir.mkdirs();
}
createCLI();
}
@After
@ -98,6 +105,7 @@ public class TestServiceCLI {
if (basedir != null) {
FileUtils.deleteDirectory(basedir);
}
cli.stop();
}
@Test
@ -114,6 +122,38 @@ public class TestServiceCLI {
checkApp(serviceName, "master", 1L, 1000L, "qname");
}
@Test
public void testInitiateServiceUpgrade() throws Exception {
String[] args = {"app", "-upgrade", "app-1",
"-initiate", ExampleAppJson.resourceName(ExampleAppJson.APP_JSON),
"-appTypes", DUMMY_APP_TYPE};
int result = cli.run(ApplicationCLI.preProcessArgs(args));
Assert.assertEquals(result, 0);
}
@Test
public void testInitiateAutoFinalizeServiceUpgrade() throws Exception {
String[] args = {"app", "-upgrade", "app-1",
"-initiate", ExampleAppJson.resourceName(ExampleAppJson.APP_JSON),
"-autoFinalize",
"-appTypes", DUMMY_APP_TYPE};
int result = cli.run(ApplicationCLI.preProcessArgs(args));
Assert.assertEquals(result, 0);
}
@Test
public void testUpgradeInstances() throws Exception {
conf.set(YARN_APP_ADMIN_CLIENT_PREFIX + DUMMY_APP_TYPE,
DummyServiceClient.class.getName());
cli.setConf(conf);
String[] args = {"app", "-upgrade", "app-1",
"-instances", "comp1-0,comp1-1",
"-appTypes", DUMMY_APP_TYPE};
int result = cli.run(ApplicationCLI.preProcessArgs(args));
Assert.assertEquals(result, 0);
}
private void checkApp(String serviceName, String compName, long count, Long
lifetime, String queue) throws IOException {
Service service = ServiceApiUtil.loadService(fs, serviceName);
@ -130,4 +170,24 @@ public class TestServiceCLI {
}
Assert.fail();
}
private static final String DUMMY_APP_TYPE = "dummy";
/**
* Dummy service client for test purpose.
*/
public static class DummyServiceClient extends ServiceClient {
@Override
public int initiateUpgrade(String appName, String fileName,
boolean autoFinalize) throws IOException, YarnException {
return 0;
}
@Override
public int actionUpgradeInstances(String appName,
List<String> componentInstances) throws IOException, YarnException {
return 0;
}
}
}

View File

@ -24,17 +24,29 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.service.ClientAMProtocol;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.CompInstancesUpgradeRequestProto;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.CompInstancesUpgradeResponseProto;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.UpgradeServiceRequestProto;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.UpgradeServiceResponseProto;
import org.apache.hadoop.yarn.service.ServiceTestUtils;
import org.apache.hadoop.yarn.service.api.records.Component;
import org.apache.hadoop.yarn.service.api.records.Container;
import org.apache.hadoop.yarn.service.api.records.Service;
import org.apache.hadoop.yarn.service.api.records.ServiceState;
import org.apache.hadoop.yarn.service.utils.ServiceApiUtil;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.mockito.Matchers;
import org.mockito.stubbing.Answer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
@ -47,79 +59,152 @@ import static org.mockito.Mockito.when;
*/
public class TestServiceClient {
private static final Logger LOG = LoggerFactory.getLogger(
TestServiceClient.class);
@Rule
public ServiceTestUtils.ServiceFSWatcher rule =
new ServiceTestUtils.ServiceFSWatcher();
@Test
public void testActionUpgrade() throws Exception {
ApplicationId applicationId = ApplicationId.newInstance(
System.currentTimeMillis(), 1);
ServiceClient client = createServiceClient(applicationId);
Service service = ServiceTestUtils.createExampleApplication();
service.setVersion("v1");
client.actionCreate(service);
public void testActionServiceUpgrade() throws Exception {
Service service = createService();
ServiceClient client = MockServiceClient.create(rule, service);
//upgrade the service
service.setVersion("v2");
client.actionUpgrade(service);
client.initiateUpgrade(service);
//wait for service to be in upgrade state
Service fromFs = ServiceApiUtil.loadServiceUpgrade(rule.getFs(),
service.getName(), service.getVersion());
Assert.assertEquals(service.getName(), fromFs.getName());
Assert.assertEquals(service.getVersion(), fromFs.getVersion());
client.stop();
}
@Test
public void testActionCompInstanceUpgrade() throws Exception {
Service service = createService();
MockServiceClient client = MockServiceClient.create(rule, service);
private ServiceClient createServiceClient(ApplicationId applicationId)
throws Exception {
ClientAMProtocol amProxy = mock(ClientAMProtocol.class);
YarnClient yarnClient = createMockYarnClient();
ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(
applicationId, 1);
ApplicationAttemptReport attemptReport =
ApplicationAttemptReport.newInstance(attemptId, "localhost", 0,
null, null, null,
YarnApplicationAttemptState.RUNNING, null);
//upgrade the service
service.setVersion("v2");
client.initiateUpgrade(service);
ApplicationReport appReport = mock(ApplicationReport.class);
when(appReport.getHost()).thenReturn("localhost");
//add containers to the component that needs to be upgraded.
Component comp = service.getComponents().iterator().next();
ContainerId containerId = ContainerId.newContainerId(client.attemptId, 1L);
comp.addContainer(new Container().id(containerId.toString()));
when(yarnClient.getApplicationAttemptReport(Matchers.any()))
.thenReturn(attemptReport);
when(yarnClient.getApplicationReport(applicationId)).thenReturn(appReport);
ServiceClient client = new ServiceClient() {
@Override
protected void serviceInit(Configuration configuration) throws Exception {
}
@Override
protected ClientAMProtocol createAMProxy(String serviceName,
ApplicationReport appReport) throws IOException, YarnException {
return amProxy;
}
@Override
ApplicationId submitApp(Service app) throws IOException, YarnException {
return applicationId;
}
};
client.setFileSystem(rule.getFs());
client.setYarnClient(yarnClient);
client.init(rule.getConf());
client.start();
return client;
client.actionUpgrade(service, comp.getContainers());
CompInstancesUpgradeResponseProto response = client.getLastProxyResponse(
CompInstancesUpgradeResponseProto.class);
Assert.assertNotNull("upgrade did not complete", response);
client.stop();
}
private YarnClient createMockYarnClient() throws IOException, YarnException {
private Service createService() throws IOException,
YarnException {
Service service = ServiceTestUtils.createExampleApplication();
service.setVersion("v1");
service.setState(ServiceState.UPGRADING);
return service;
}
private static final class MockServiceClient extends ServiceClient {
private final ApplicationId appId;
private final ApplicationAttemptId attemptId;
private final ClientAMProtocol amProxy;
private Object proxyResponse;
private Service service;
private MockServiceClient() {
amProxy = mock(ClientAMProtocol.class);
appId = ApplicationId.newInstance(System.currentTimeMillis(), 1);
LOG.debug("mocking service client for {}", appId);
attemptId = ApplicationAttemptId.newInstance(appId, 1);
}
static MockServiceClient create(ServiceTestUtils.ServiceFSWatcher rule,
Service service)
throws IOException, YarnException {
MockServiceClient client = new MockServiceClient();
YarnClient yarnClient = createMockYarnClient();
ApplicationReport appReport = mock(ApplicationReport.class);
when(appReport.getHost()).thenReturn("localhost");
when(appReport.getYarnApplicationState()).thenReturn(
YarnApplicationState.RUNNING);
ApplicationAttemptReport attemptReport =
ApplicationAttemptReport.newInstance(client.attemptId, "localhost", 0,
null, null, null,
YarnApplicationAttemptState.RUNNING, null);
when(yarnClient.getApplicationAttemptReport(Matchers.any()))
.thenReturn(attemptReport);
when(yarnClient.getApplicationReport(client.appId)).thenReturn(appReport);
when(client.amProxy.upgrade(
Matchers.any(UpgradeServiceRequestProto.class))).thenAnswer(
(Answer<UpgradeServiceResponseProto>) invocation -> {
UpgradeServiceResponseProto response =
UpgradeServiceResponseProto.newBuilder().build();
client.proxyResponse = response;
return response;
});
when(client.amProxy.upgrade(Matchers.any(
CompInstancesUpgradeRequestProto.class))).thenAnswer(
(Answer<CompInstancesUpgradeResponseProto>) invocation -> {
CompInstancesUpgradeResponseProto response =
CompInstancesUpgradeResponseProto.newBuilder().build();
client.proxyResponse = response;
return response;
});
client.setFileSystem(rule.getFs());
client.setYarnClient(yarnClient);
client.service = service;
client.init(rule.getConf());
client.start();
client.actionCreate(service);
return client;
}
@Override
protected void serviceInit(Configuration configuration) throws Exception {
}
@Override
protected ClientAMProtocol createAMProxy(String serviceName,
ApplicationReport appReport) throws IOException, YarnException {
return amProxy;
}
@Override
ApplicationId submitApp(Service app) throws IOException, YarnException {
return appId;
}
@Override
public Service getStatus(String serviceName) throws IOException,
YarnException {
service.setState(ServiceState.STABLE);
return service;
}
private <T> T getLastProxyResponse(Class<T> clazz) {
if (clazz.isInstance(proxyResponse)) {
return clazz.cast(proxyResponse);
}
return null;
}
}
private static YarnClient createMockYarnClient() throws IOException,
YarnException {
YarnClient yarnClient = mock(YarnClient.class);
when(yarnClient.getApplications(Matchers.any(GetApplicationsRequest.class)))
.thenReturn(new ArrayList<>());
when(yarnClient.getApplications(Matchers.any(
GetApplicationsRequest.class))).thenReturn(new ArrayList<>());
return yarnClient;
}
}

View File

@ -0,0 +1,265 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.service.component;
import org.apache.hadoop.registry.client.api.RegistryOperations;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.client.api.NMClient;
import org.apache.hadoop.yarn.client.api.async.NMClientAsync;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.service.ServiceContext;
import org.apache.hadoop.yarn.service.ServiceScheduler;
import org.apache.hadoop.yarn.service.ServiceTestUtils;
import org.apache.hadoop.yarn.service.TestServiceManager;
import org.apache.hadoop.yarn.service.api.records.ComponentState;
import org.apache.hadoop.yarn.service.api.records.Service;
import org.apache.hadoop.yarn.service.component.instance.ComponentInstance;
import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEvent;
import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType;
import org.apache.hadoop.yarn.service.containerlaunch.ContainerLaunchService;
import org.apache.hadoop.yarn.service.registry.YarnRegistryViewForProviders;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.mockito.stubbing.Answer;
import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
import static org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType.STOP;
import static org.mockito.Matchers.anyObject;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
/**
* Tests for {@link Component}.
*/
public class TestComponent {
@Rule
public ServiceTestUtils.ServiceFSWatcher rule =
new ServiceTestUtils.ServiceFSWatcher();
@Test
public void testComponentUpgrade() throws Exception {
ServiceContext context = createTestContext(rule, "testComponentUpgrade");
Component comp = context.scheduler.getAllComponents().entrySet().iterator()
.next().getValue();
ComponentEvent upgradeEvent = new ComponentEvent(comp.getName(),
ComponentEventType.UPGRADE);
comp.handle(upgradeEvent);
Assert.assertEquals("component not in need upgrade state",
ComponentState.NEEDS_UPGRADE, comp.getComponentSpec().getState());
}
@Test
public void testCheckState() throws Exception {
String serviceName = "testCheckState";
ServiceContext context = createTestContext(rule, serviceName);
Component comp = context.scheduler.getAllComponents().entrySet().iterator()
.next().getValue();
comp.handle(new ComponentEvent(comp.getName(), ComponentEventType.UPGRADE)
.setTargetSpec(createSpecWithEnv(serviceName, comp.getName(), "key1",
"val1")).setUpgradeVersion("v2"));
// one instance finished upgrading
comp.decContainersThatNeedUpgrade();
comp.handle(new ComponentEvent(comp.getName(),
ComponentEventType.CHECK_STABLE));
Assert.assertEquals("component not in need upgrade state",
ComponentState.NEEDS_UPGRADE, comp.getComponentSpec().getState());
// second instance finished upgrading
comp.decContainersThatNeedUpgrade();
comp.handle(new ComponentEvent(comp.getName(),
ComponentEventType.CHECK_STABLE));
Assert.assertEquals("component not in stable state",
ComponentState.STABLE, comp.getComponentSpec().getState());
Assert.assertEquals("component did not upgrade successfully", "val1",
comp.getComponentSpec().getConfiguration().getEnv("key1"));
}
@Test
public void testContainerCompletedWhenUpgrading() throws Exception {
String serviceName = "testContainerComplete";
ServiceContext context = createTestContext(rule, serviceName);
Component comp = context.scheduler.getAllComponents().entrySet().iterator()
.next().getValue();
comp.handle(new ComponentEvent(comp.getName(), ComponentEventType.UPGRADE)
.setTargetSpec(createSpecWithEnv(serviceName, comp.getName(), "key1",
"val1")).setUpgradeVersion("v2"));
comp.getAllComponentInstances().forEach(instance -> {
instance.handle(new ComponentInstanceEvent(
instance.getContainer().getId(), ComponentInstanceEventType.UPGRADE));
});
Iterator<ComponentInstance> instanceIter = comp.
getAllComponentInstances().iterator();
// reinitialization of a container failed
ContainerStatus status = mock(ContainerStatus.class);
when(status.getExitStatus()).thenReturn(ContainerExitStatus.ABORTED);
ComponentInstance instance = instanceIter.next();
ComponentEvent stopEvent = new ComponentEvent(comp.getName(),
ComponentEventType.CONTAINER_COMPLETED)
.setInstance(instance).setContainerId(instance.getContainer().getId())
.setStatus(status);
comp.handle(stopEvent);
instance.handle(new ComponentInstanceEvent(instance.getContainer().getId(),
STOP).setStatus(status));
comp.handle(new ComponentEvent(comp.getName(),
ComponentEventType.CHECK_STABLE));
Assert.assertEquals("component not in flexing state",
ComponentState.FLEXING, comp.getComponentSpec().getState());
// new container get allocated
assignNewContainer(context.attemptId, 10, context, comp);
// second instance finished upgrading
ComponentInstance instance2 = instanceIter.next();
instance2.handle(new ComponentInstanceEvent(
instance2.getContainer().getId(),
ComponentInstanceEventType.BECOME_READY));
comp.handle(new ComponentEvent(comp.getName(),
ComponentEventType.CHECK_STABLE));
Assert.assertEquals("component not in stable state",
ComponentState.STABLE, comp.getComponentSpec().getState());
Assert.assertEquals("component did not upgrade successfully", "val1",
comp.getComponentSpec().getConfiguration().getEnv("key1"));
}
private static org.apache.hadoop.yarn.service.api.records.Component
createSpecWithEnv(String serviceName, String compName, String key,
String val) {
Service service = TestServiceManager.createBaseDef(serviceName);
org.apache.hadoop.yarn.service.api.records.Component spec =
service.getComponent(compName);
spec.getConfiguration().getEnv().put(key, val);
return spec;
}
public static ServiceContext createTestContext(
ServiceTestUtils.ServiceFSWatcher fsWatcher, String serviceName)
throws Exception {
ServiceContext context = new ServiceContext();
context.service = TestServiceManager.createBaseDef(serviceName);
context.fs = fsWatcher.getFs();
ContainerLaunchService mockLaunchService = mock(
ContainerLaunchService.class);
context.scheduler = new ServiceScheduler(context) {
@Override
protected YarnRegistryViewForProviders createYarnRegistryOperations(
ServiceContext context, RegistryOperations registryClient) {
return mock(YarnRegistryViewForProviders.class);
}
@Override
public NMClientAsync createNMClient() {
NMClientAsync nmClientAsync = super.createNMClient();
NMClient nmClient = mock(NMClient.class);
try {
when(nmClient.getContainerStatus(anyObject(), anyObject()))
.thenAnswer((Answer<ContainerStatus>) invocation ->
ContainerStatus.newInstance(
(ContainerId) invocation.getArguments()[0],
org.apache.hadoop.yarn.api.records.ContainerState.RUNNING,
"", 0));
} catch (YarnException | IOException e) {
throw new RuntimeException(e);
}
nmClientAsync.setClient(nmClient);
return nmClientAsync;
}
@Override
public ContainerLaunchService getContainerLaunchService() {
return mockLaunchService;
}
};
context.scheduler.init(fsWatcher.getConf());
doNothing().when(mockLaunchService).
reInitCompInstance(anyObject(), anyObject(), anyObject(), anyObject());
stabilizeComponents(context);
return context;
}
private static void stabilizeComponents(ServiceContext context) {
ApplicationId appId = ApplicationId.fromString(context.service.getId());
ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(appId, 1);
context.attemptId = attemptId;
Map<String, Component>
componentState = context.scheduler.getAllComponents();
for (org.apache.hadoop.yarn.service.api.records.Component componentSpec :
context.service.getComponents()) {
Component component = new org.apache.hadoop.yarn.service.component.
Component(componentSpec, 1L, context);
componentState.put(component.getName(), component);
component.handle(new ComponentEvent(component.getName(),
ComponentEventType.FLEX));
for (int i = 0; i < componentSpec.getNumberOfContainers(); i++) {
assignNewContainer(attemptId, i + 1, context, component);
}
component.handle(new ComponentEvent(component.getName(),
ComponentEventType.CHECK_STABLE));
}
}
private static void assignNewContainer(
ApplicationAttemptId attemptId, long containerNum,
ServiceContext context, Component component) {
Container container = org.apache.hadoop.yarn.api.records.Container
.newInstance(ContainerId.newContainerId(attemptId, containerNum),
NODE_ID, "localhost", null, null,
null);
component.handle(new ComponentEvent(component.getName(),
ComponentEventType.CONTAINER_ALLOCATED)
.setContainer(container).setContainerId(container.getId()));
ComponentInstance instance = context.scheduler.getLiveInstances().get(
container.getId());
ComponentInstanceEvent startEvent = new ComponentInstanceEvent(
container.getId(), ComponentInstanceEventType.START);
instance.handle(startEvent);
ComponentInstanceEvent readyEvent = new ComponentInstanceEvent(
container.getId(), ComponentInstanceEventType.BECOME_READY);
instance.handle(readyEvent);
}
private static final NodeId NODE_ID = NodeId.fromString("localhost:0");
}

View File

@ -0,0 +1,88 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.service.component.instance;
import org.apache.hadoop.yarn.service.ServiceContext;
import org.apache.hadoop.yarn.service.ServiceTestUtils;
import org.apache.hadoop.yarn.service.api.records.Container;
import org.apache.hadoop.yarn.service.api.records.ContainerState;
import org.apache.hadoop.yarn.service.component.Component;
import org.apache.hadoop.yarn.service.component.ComponentEvent;
import org.apache.hadoop.yarn.service.component.ComponentEventType;
import org.apache.hadoop.yarn.service.component.TestComponent;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
/**
* Tests for {@link ComponentInstance}.
*/
public class TestComponentInstance {
@Rule
public ServiceTestUtils.ServiceFSWatcher rule =
new ServiceTestUtils.ServiceFSWatcher();
@Test
public void testContainerUpgrade() throws Exception {
ServiceContext context = TestComponent.createTestContext(rule,
"testContainerUpgrade");
Component component = context.scheduler.getAllComponents().entrySet()
.iterator().next().getValue();
upgradeComponent(component);
ComponentInstance instance = component.getAllComponentInstances()
.iterator().next();
ComponentInstanceEvent instanceEvent = new ComponentInstanceEvent(
instance.getContainer().getId(), ComponentInstanceEventType.UPGRADE);
instance.handle(instanceEvent);
Container containerSpec = component.getComponentSpec().getContainer(
instance.getContainer().getId().toString());
Assert.assertEquals("instance not upgrading",
ContainerState.UPGRADING, containerSpec.getState());
}
@Test
public void testContainerReadyAfterUpgrade() throws Exception {
ServiceContext context = TestComponent.createTestContext(rule,
"testContainerStarted");
Component component = context.scheduler.getAllComponents().entrySet()
.iterator().next().getValue();
upgradeComponent(component);
ComponentInstance instance = component.getAllComponentInstances()
.iterator().next();
ComponentInstanceEvent instanceEvent = new ComponentInstanceEvent(
instance.getContainer().getId(), ComponentInstanceEventType.UPGRADE);
instance.handle(instanceEvent);
instance.handle(new ComponentInstanceEvent(instance.getContainer().getId(),
ComponentInstanceEventType.BECOME_READY));
Assert.assertEquals("instance not ready",
ContainerState.READY, instance.getCompSpec().getContainer(
instance.getContainer().getId().toString()).getState());
}
private void upgradeComponent(Component component) {
component.handle(new ComponentEvent(component.getName(),
ComponentEventType.UPGRADE)
.setTargetSpec(component.getComponentSpec()).setUpgradeVersion("v2"));
}
}

View File

@ -81,6 +81,7 @@ public class TestServiceMonitor extends ServiceTestUtils {
public void testComponentDependency() throws Exception{
ApplicationId applicationId = ApplicationId.newInstance(123456, 1);
Service exampleApp = new Service();
exampleApp.setVersion("v1");
exampleApp.setId(applicationId.toString());
exampleApp.setName("testComponentDependency");
exampleApp.addComponent(createComponent("compa", 1, "sleep 1000"));

View File

@ -27,6 +27,7 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import java.io.IOException;
import java.util.List;
import java.util.Map;
/**
@ -231,18 +232,30 @@ public abstract class AppAdminClient extends CompositeService {
IOException, YarnException;
/**
* Upgrade a long running service.
*
* @param appName the name of the application
* @param fileName specification of application upgrade to save.
* Initiate upgrade of a long running service.
*
* @param appName the name of the application.
* @param fileName specification of application upgrade to save.
* @param autoFinalize when true, finalization of upgrade will be done
* automatically.
* @return exit code
* @throws IOException IOException
* @throws IOException IOException
* @throws YarnException exception in client or server
*/
@Public
@Unstable
public abstract int actionUpgrade(String appName, String fileName)
throws IOException, YarnException;
public abstract int initiateUpgrade(String appName, String fileName,
boolean autoFinalize) throws IOException, YarnException;
/**
* Upgrade component instances of a long running service.
*
* @param appName the name of the application.
* @param componentInstances the name of the component instances.
*/
@Public
@Unstable
public abstract int actionUpgradeInstances(String appName,
List<String> componentInstances) throws IOException, YarnException;
}

View File

@ -99,6 +99,11 @@ public class ApplicationCLI extends YarnCLI {
public static final String FLEX_CMD = "flex";
public static final String COMPONENT = "component";
public static final String ENABLE_FAST_LAUNCH = "enableFastLaunch";
public static final String UPGRADE_CMD = "upgrade";
public static final String UPGRADE_INITIATE = "initiate";
public static final String UPGRADE_AUTO_FINALIZE = "autoFinalize";
public static final String UPGRADE_FINALIZE = "finalize";
public static final String COMPONENT_INSTS = "instances";
private static String firstArg = null;
@ -236,6 +241,20 @@ public class ApplicationCLI extends YarnCLI {
"to HDFS to make future launches faster. Supports -appTypes option " +
"to specify which client implementation to use. Optionally a " +
"destination folder for the tarball can be specified.");
opts.addOption(UPGRADE_CMD, true, "Upgrades an application/long-" +
"running service. It requires either -initiate, -instances, or " +
"-finalize options.");
opts.addOption(UPGRADE_INITIATE, true, "Works with -upgrade option to " +
"initiate the application upgrade. It requires the upgraded " +
"application specification file.");
opts.addOption(COMPONENT_INSTS, true, "Works with -upgrade option to " +
"trigger the upgrade of specified component instances of the " +
"application.");
opts.addOption(UPGRADE_FINALIZE, false, "Works with -upgrade option to " +
"finalize the upgrade.");
opts.addOption(UPGRADE_AUTO_FINALIZE, false, "Works with -upgrade and " +
"-initiate options to initiate the upgrade of the application with " +
"the ability to finalize the upgrade automatically.");
opts.getOption(LAUNCH_CMD).setArgName("Application Name> <File Name");
opts.getOption(LAUNCH_CMD).setArgs(2);
opts.getOption(START_CMD).setArgName("Application Name");
@ -248,6 +267,13 @@ public class ApplicationCLI extends YarnCLI {
opts.getOption(COMPONENT).setArgs(2);
opts.getOption(ENABLE_FAST_LAUNCH).setOptionalArg(true);
opts.getOption(ENABLE_FAST_LAUNCH).setArgName("Destination Folder");
opts.getOption(UPGRADE_CMD).setArgName("Application Name");
opts.getOption(UPGRADE_CMD).setArgs(1);
opts.getOption(UPGRADE_INITIATE).setArgName("File Name");
opts.getOption(UPGRADE_INITIATE).setArgs(1);
opts.getOption(COMPONENT_INSTS).setArgName("Component Instances");
opts.getOption(COMPONENT_INSTS).setValueSeparator(',');
opts.getOption(COMPONENT_INSTS).setArgs(Option.UNLIMITED_VALUES);
} else if (title != null && title.equalsIgnoreCase(APPLICATION_ATTEMPT)) {
opts.addOption(STATUS_CMD, true,
"Prints the status of the application attempt.");
@ -546,6 +572,45 @@ public class ApplicationCLI extends YarnCLI {
}
moveApplicationAcrossQueues(cliParser.getOptionValue(APP_ID),
cliParser.getOptionValue(CHANGE_APPLICATION_QUEUE));
} else if (cliParser.hasOption(UPGRADE_CMD)) {
if (hasAnyOtherCLIOptions(cliParser, opts, UPGRADE_CMD, UPGRADE_INITIATE,
UPGRADE_AUTO_FINALIZE, UPGRADE_FINALIZE, COMPONENT_INSTS,
APP_TYPE_CMD)) {
printUsage(title, opts);
return exitCode;
}
String appType = getSingleAppTypeFromCLI(cliParser);
AppAdminClient client = AppAdminClient.createAppAdminClient(appType,
getConf());
String appName = cliParser.getOptionValue(UPGRADE_CMD);
if (cliParser.hasOption(UPGRADE_INITIATE)) {
if (hasAnyOtherCLIOptions(cliParser, opts, UPGRADE_CMD,
UPGRADE_INITIATE, UPGRADE_AUTO_FINALIZE, APP_TYPE_CMD)) {
printUsage(title, opts);
return exitCode;
}
String fileName = cliParser.getOptionValue(UPGRADE_INITIATE);
if (cliParser.hasOption(UPGRADE_AUTO_FINALIZE)) {
return client.initiateUpgrade(appName, fileName, true);
} else {
return client.initiateUpgrade(appName, fileName, false);
}
} else if (cliParser.hasOption(COMPONENT_INSTS)) {
if (hasAnyOtherCLIOptions(cliParser, opts, UPGRADE_CMD,
COMPONENT_INSTS, APP_TYPE_CMD)) {
printUsage(title, opts);
return exitCode;
}
String[] instances = cliParser.getOptionValues(COMPONENT_INSTS);
return client.actionUpgradeInstances(appName, Arrays.asList(instances));
} else if (cliParser.hasOption(UPGRADE_FINALIZE)) {
if (hasAnyOtherCLIOptions(cliParser, opts, UPGRADE_CMD,
UPGRADE_FINALIZE, APP_TYPE_CMD)) {
printUsage(title, opts);
return exitCode;
}
return client.actionStart(appName);
}
} else {
syserr.println("Invalid Command Usage : ");
printUsage(title, opts);

View File

@ -2124,6 +2124,11 @@ public class TestYarnCLI {
pw.println(" applications based on input");
pw.println(" comma-separated list of");
pw.println(" application types.");
pw.println(" -autoFinalize Works with -upgrade and");
pw.println(" -initiate options to initiate");
pw.println(" the upgrade of the application");
pw.println(" with the ability to finalize the");
pw.println(" upgrade automatically.");
pw.println(" -changeQueue <Queue Name> Moves application to a new");
pw.println(" queue. ApplicationId can be");
pw.println(" passed using 'appId' option.");
@ -2152,6 +2157,8 @@ public class TestYarnCLI {
pw.println(" Optionally a destination folder");
pw.println(" for the tarball can be");
pw.println(" specified.");
pw.println(" -finalize Works with -upgrade option to");
pw.println(" finalize the upgrade.");
pw.println(" -flex <Application Name or ID> Changes number of running");
pw.println(" containers for a component of an");
pw.println(" application / long-running");
@ -2165,6 +2172,15 @@ public class TestYarnCLI {
pw.println(" which client implementation to");
pw.println(" use.");
pw.println(" -help Displays help for all commands.");
pw.println(" -initiate <File Name> Works with -upgrade option to");
pw.println(" initiate the application");
pw.println(" upgrade. It requires the");
pw.println(" upgraded application");
pw.println(" specification file.");
pw.println(" -instances <Component Instances> Works with -upgrade option to");
pw.println(" trigger the upgrade of specified");
pw.println(" component instances of the");
pw.println(" application.");
pw.println(" -kill <Application ID> Kills the application. Set of");
pw.println(" applications can be provided");
pw.println(" separated with space");
@ -2232,6 +2248,11 @@ public class TestYarnCLI {
pw.println(" -updatePriority <Priority> update priority of an");
pw.println(" application. ApplicationId can");
pw.println(" be passed using 'appId' option.");
pw.println(" -upgrade <Application Name> Upgrades an");
pw.println(" application/long-running");
pw.println(" service. It requires either");
pw.println(" -initiate, -instances, or");
pw.println(" -finalize options.");
pw.close();
String appsHelpStr = baos.toString("UTF-8");
return appsHelpStr;