YARN-8299. Added CLI and REST API for query container status.
Contributed by Chandni Singh
This commit is contained in:
parent
efb4e274e5
commit
121865c3f9
|
@ -25,8 +25,10 @@ import java.util.List;
|
|||
import java.util.Map;
|
||||
|
||||
import javax.ws.rs.core.MediaType;
|
||||
import javax.ws.rs.core.UriBuilder;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.base.Strings;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
|
@ -48,10 +50,8 @@ import org.apache.hadoop.yarn.service.api.records.Service;
|
|||
import org.apache.hadoop.yarn.service.api.records.ServiceState;
|
||||
import org.apache.hadoop.yarn.service.api.records.ServiceStatus;
|
||||
import org.apache.hadoop.yarn.service.conf.RestApiConstants;
|
||||
import org.apache.hadoop.yarn.service.utils.JsonSerDeser;
|
||||
import org.apache.hadoop.yarn.service.utils.ServiceApiUtil;
|
||||
import org.apache.hadoop.yarn.util.RMHAUtils;
|
||||
import org.codehaus.jackson.map.PropertyNamingStrategy;
|
||||
import org.eclipse.jetty.util.UrlEncoded;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
@ -147,11 +147,7 @@ public class ApiServiceClient extends AppAdminClient {
|
|||
api.append("/");
|
||||
api.append(appName);
|
||||
}
|
||||
Configuration conf = getConfig();
|
||||
if (conf.get("hadoop.http.authentication.type").equalsIgnoreCase("simple")) {
|
||||
api.append("?user.name=" + UrlEncoded
|
||||
.encodeString(System.getProperty("user.name")));
|
||||
}
|
||||
appendUserNameIfRequired(api);
|
||||
return api.toString();
|
||||
}
|
||||
|
||||
|
@ -162,15 +158,27 @@ public class ApiServiceClient extends AppAdminClient {
|
|||
api.append(url);
|
||||
api.append("/app/v1/services/").append(appName).append("/")
|
||||
.append(RestApiConstants.COMP_INSTANCES);
|
||||
Configuration conf = getConfig();
|
||||
if (conf.get("hadoop.http.authentication.type").equalsIgnoreCase(
|
||||
"simple")) {
|
||||
api.append("?user.name=" + UrlEncoded
|
||||
.encodeString(System.getProperty("user.name")));
|
||||
}
|
||||
appendUserNameIfRequired(api);
|
||||
return api.toString();
|
||||
}
|
||||
|
||||
private String getInstancePath(String appName, List<String> components,
|
||||
String version, List<String> containerStates) throws IOException {
|
||||
UriBuilder builder = UriBuilder.fromUri(getInstancesPath(appName));
|
||||
if (components != null && !components.isEmpty()) {
|
||||
components.forEach(compName ->
|
||||
builder.queryParam(RestApiConstants.PARAM_COMP_NAME, compName));
|
||||
}
|
||||
if (!Strings.isNullOrEmpty(version)){
|
||||
builder.queryParam(RestApiConstants.PARAM_VERSION, version);
|
||||
}
|
||||
if (containerStates != null && !containerStates.isEmpty()){
|
||||
containerStates.forEach(state ->
|
||||
builder.queryParam(RestApiConstants.PARAM_CONTAINER_STATE, state));
|
||||
}
|
||||
return builder.build().toString();
|
||||
}
|
||||
|
||||
private String getComponentsPath(String appName) throws IOException {
|
||||
Preconditions.checkNotNull(appName);
|
||||
String url = getRMWebAddress();
|
||||
|
@ -178,13 +186,17 @@ public class ApiServiceClient extends AppAdminClient {
|
|||
api.append(url);
|
||||
api.append("/app/v1/services/").append(appName).append("/")
|
||||
.append(RestApiConstants.COMPONENTS);
|
||||
appendUserNameIfRequired(api);
|
||||
return api.toString();
|
||||
}
|
||||
|
||||
private void appendUserNameIfRequired(StringBuilder builder) {
|
||||
Configuration conf = getConfig();
|
||||
if (conf.get("hadoop.http.authentication.type").equalsIgnoreCase(
|
||||
"simple")) {
|
||||
api.append("?user.name=" + UrlEncoded
|
||||
builder.append("?user.name=").append(UrlEncoded
|
||||
.encodeString(System.getProperty("user.name")));
|
||||
}
|
||||
return api.toString();
|
||||
}
|
||||
|
||||
private Builder getApiClient() throws IOException {
|
||||
|
@ -553,7 +565,7 @@ public class ApiServiceClient extends AppAdminClient {
|
|||
container.setState(ContainerState.UPGRADING);
|
||||
toUpgrade[idx++] = container;
|
||||
}
|
||||
String buffer = CONTAINER_JSON_SERDE.toJson(toUpgrade);
|
||||
String buffer = ServiceApiUtil.CONTAINER_JSON_SERDE.toJson(toUpgrade);
|
||||
ClientResponse response = getApiClient(getInstancesPath(appName))
|
||||
.put(ClientResponse.class, buffer);
|
||||
result = processResponse(response);
|
||||
|
@ -577,7 +589,7 @@ public class ApiServiceClient extends AppAdminClient {
|
|||
component.setState(ComponentState.UPGRADING);
|
||||
toUpgrade[idx++] = component;
|
||||
}
|
||||
String buffer = COMP_JSON_SERDE.toJson(toUpgrade);
|
||||
String buffer = ServiceApiUtil.COMP_JSON_SERDE.toJson(toUpgrade);
|
||||
ClientResponse response = getApiClient(getComponentsPath(appName))
|
||||
.put(ClientResponse.class, buffer);
|
||||
result = processResponse(response);
|
||||
|
@ -599,11 +611,25 @@ public class ApiServiceClient extends AppAdminClient {
|
|||
return result;
|
||||
}
|
||||
|
||||
private static final JsonSerDeser<Container[]> CONTAINER_JSON_SERDE =
|
||||
new JsonSerDeser<>(Container[].class,
|
||||
PropertyNamingStrategy.CAMEL_CASE_TO_LOWER_CASE_WITH_UNDERSCORES);
|
||||
|
||||
private static final JsonSerDeser<Component[]> COMP_JSON_SERDE =
|
||||
new JsonSerDeser<>(Component[].class,
|
||||
PropertyNamingStrategy.CAMEL_CASE_TO_LOWER_CASE_WITH_UNDERSCORES);
|
||||
@Override
|
||||
public String getInstances(String appName, List<String> components,
|
||||
String version, List<String> containerStates) throws IOException,
|
||||
YarnException {
|
||||
try {
|
||||
String uri = getInstancePath(appName, components, version,
|
||||
containerStates);
|
||||
ClientResponse response = getApiClient(uri).get(ClientResponse.class);
|
||||
if (response.getStatus() != 200) {
|
||||
StringBuilder sb = new StringBuilder();
|
||||
sb.append("Failed: HTTP error code: ");
|
||||
sb.append(response.getStatus());
|
||||
sb.append(" ErrorMsg: ").append(response.getEntity(String.class));
|
||||
return sb.toString();
|
||||
}
|
||||
return response.getEntity(String.class);
|
||||
} catch (Exception e) {
|
||||
LOG.error("Fail to get containers {}", e);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -44,14 +44,7 @@ import org.slf4j.Logger;
|
|||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import javax.servlet.http.HttpServletRequest;
|
||||
import javax.ws.rs.Consumes;
|
||||
import javax.ws.rs.DELETE;
|
||||
import javax.ws.rs.GET;
|
||||
import javax.ws.rs.POST;
|
||||
import javax.ws.rs.PUT;
|
||||
import javax.ws.rs.Path;
|
||||
import javax.ws.rs.PathParam;
|
||||
import javax.ws.rs.Produces;
|
||||
import javax.ws.rs.*;
|
||||
import javax.ws.rs.core.Context;
|
||||
import javax.ws.rs.core.MediaType;
|
||||
import javax.ws.rs.core.Response;
|
||||
|
@ -61,13 +54,7 @@ import java.io.FileNotFoundException;
|
|||
import java.io.IOException;
|
||||
import java.lang.reflect.UndeclaredThrowableException;
|
||||
import java.security.PrivilegedExceptionAction;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.*;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static org.apache.hadoop.yarn.service.api.records.ServiceState.ACCEPTED;
|
||||
|
@ -582,6 +569,40 @@ public class ApiServer {
|
|||
return Response.status(Status.NO_CONTENT).build();
|
||||
}
|
||||
|
||||
@GET
|
||||
@Path(COMP_INSTANCES_PATH)
|
||||
@Produces({RestApiConstants.MEDIA_TYPE_JSON_UTF8})
|
||||
public Response getComponentInstances(@Context HttpServletRequest request,
|
||||
@PathParam(SERVICE_NAME) String serviceName,
|
||||
@QueryParam(PARAM_COMP_NAME) List<String> componentNames,
|
||||
@QueryParam(PARAM_VERSION) String version,
|
||||
@QueryParam(PARAM_CONTAINER_STATE) List<String> containerStates) {
|
||||
try {
|
||||
UserGroupInformation ugi = getProxyUser(request);
|
||||
LOG.info("GET: component instances for service = {}, compNames in {}, " +
|
||||
"version = {}, containerStates in {}, user = {}", serviceName,
|
||||
Objects.toString(componentNames, "[]"), Objects.toString(version, ""),
|
||||
Objects.toString(containerStates, "[]"), ugi);
|
||||
|
||||
List<ContainerState> containerStatesDe = containerStates.stream().map(
|
||||
ContainerState::valueOf).collect(Collectors.toList());
|
||||
|
||||
return Response.ok(getContainers(ugi, serviceName, componentNames,
|
||||
version, containerStatesDe)).build();
|
||||
} catch (IllegalArgumentException iae) {
|
||||
return formatResponse(Status.BAD_REQUEST, "valid container states are: " +
|
||||
Arrays.toString(ContainerState.values()));
|
||||
} catch (AccessControlException e) {
|
||||
return formatResponse(Response.Status.FORBIDDEN, e.getMessage());
|
||||
} catch (IOException | InterruptedException e) {
|
||||
return formatResponse(Response.Status.INTERNAL_SERVER_ERROR,
|
||||
e.getMessage());
|
||||
} catch (UndeclaredThrowableException e) {
|
||||
return formatResponse(Response.Status.INTERNAL_SERVER_ERROR,
|
||||
e.getCause().getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
private Response flexService(Service service, UserGroupInformation ugi)
|
||||
throws IOException, InterruptedException {
|
||||
String appName = service.getName();
|
||||
|
@ -752,6 +773,22 @@ public class ApiServer {
|
|||
});
|
||||
}
|
||||
|
||||
private Container[] getContainers(UserGroupInformation ugi,
|
||||
String serviceName, List<String> componentNames, String version,
|
||||
List<ContainerState> containerStates) throws IOException,
|
||||
InterruptedException {
|
||||
return ugi.doAs((PrivilegedExceptionAction<Container[]>) () -> {
|
||||
Container[] result;
|
||||
ServiceClient sc = getServiceClient();
|
||||
sc.init(YARN_CONFIG);
|
||||
sc.start();
|
||||
result = sc.getContainers(serviceName, componentNames, version,
|
||||
containerStates);
|
||||
sc.close();
|
||||
return result;
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Used by negative test case.
|
||||
*
|
||||
|
|
|
@ -23,6 +23,8 @@ import org.apache.hadoop.yarn.proto.ClientAMProtocol.CompInstancesUpgradeRespons
|
|||
import org.apache.hadoop.yarn.proto.ClientAMProtocol.CompInstancesUpgradeRequestProto;
|
||||
import org.apache.hadoop.yarn.proto.ClientAMProtocol.FlexComponentsRequestProto;
|
||||
import org.apache.hadoop.yarn.proto.ClientAMProtocol.FlexComponentsResponseProto;
|
||||
import org.apache.hadoop.yarn.proto.ClientAMProtocol.GetCompInstancesRequestProto;
|
||||
import org.apache.hadoop.yarn.proto.ClientAMProtocol.GetCompInstancesResponseProto;
|
||||
import org.apache.hadoop.yarn.proto.ClientAMProtocol.GetStatusResponseProto;
|
||||
import org.apache.hadoop.yarn.proto.ClientAMProtocol.GetStatusRequestProto;
|
||||
import org.apache.hadoop.yarn.proto.ClientAMProtocol.RestartServiceRequestProto;
|
||||
|
@ -55,4 +57,7 @@ public interface ClientAMProtocol {
|
|||
CompInstancesUpgradeResponseProto upgrade(
|
||||
CompInstancesUpgradeRequestProto request) throws IOException,
|
||||
YarnException;
|
||||
|
||||
GetCompInstancesResponseProto getCompInstances(
|
||||
GetCompInstancesRequestProto request) throws IOException, YarnException;
|
||||
}
|
||||
|
|
|
@ -35,6 +35,8 @@ import org.apache.hadoop.yarn.proto.ClientAMProtocol.CompInstancesUpgradeRespons
|
|||
import org.apache.hadoop.yarn.proto.ClientAMProtocol.ComponentCountProto;
|
||||
import org.apache.hadoop.yarn.proto.ClientAMProtocol.FlexComponentsRequestProto;
|
||||
import org.apache.hadoop.yarn.proto.ClientAMProtocol.FlexComponentsResponseProto;
|
||||
import org.apache.hadoop.yarn.proto.ClientAMProtocol.GetCompInstancesRequestProto;
|
||||
import org.apache.hadoop.yarn.proto.ClientAMProtocol.GetCompInstancesResponseProto;
|
||||
import org.apache.hadoop.yarn.proto.ClientAMProtocol.GetStatusRequestProto;
|
||||
import org.apache.hadoop.yarn.proto.ClientAMProtocol.GetStatusResponseProto;
|
||||
import org.apache.hadoop.yarn.proto.ClientAMProtocol.RestartServiceRequestProto;
|
||||
|
@ -43,15 +45,18 @@ import org.apache.hadoop.yarn.proto.ClientAMProtocol.StopRequestProto;
|
|||
import org.apache.hadoop.yarn.proto.ClientAMProtocol.StopResponseProto;
|
||||
import org.apache.hadoop.yarn.proto.ClientAMProtocol.UpgradeServiceRequestProto;
|
||||
import org.apache.hadoop.yarn.proto.ClientAMProtocol.UpgradeServiceResponseProto;
|
||||
import org.apache.hadoop.yarn.service.api.records.Container;
|
||||
import org.apache.hadoop.yarn.service.component.ComponentEvent;
|
||||
import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEvent;
|
||||
import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType;
|
||||
import org.apache.hadoop.yarn.service.utils.FilterUtils;
|
||||
import org.apache.hadoop.yarn.service.utils.ServiceApiUtil;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.util.List;
|
||||
|
||||
import static org.apache.hadoop.yarn.service.component.ComponentEventType.FLEX;
|
||||
|
||||
|
@ -194,4 +199,13 @@ public class ClientAMService extends AbstractService
|
|||
}
|
||||
return CompInstancesUpgradeResponseProto.newBuilder().build();
|
||||
}
|
||||
|
||||
@Override
|
||||
public GetCompInstancesResponseProto getCompInstances(
|
||||
GetCompInstancesRequestProto request) throws IOException {
|
||||
List<Container> containers = FilterUtils.filterInstances(context, request);
|
||||
return GetCompInstancesResponseProto.newBuilder().setCompInstances(
|
||||
ServiceApiUtil.CONTAINER_JSON_SERDE.toJson(containers.toArray(
|
||||
new Container[containers.size()]))).build();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -57,6 +57,8 @@ import org.apache.hadoop.yarn.ipc.YarnRPC;
|
|||
import org.apache.hadoop.yarn.proto.ClientAMProtocol.CompInstancesUpgradeRequestProto;
|
||||
import org.apache.hadoop.yarn.proto.ClientAMProtocol.ComponentCountProto;
|
||||
import org.apache.hadoop.yarn.proto.ClientAMProtocol.FlexComponentsRequestProto;
|
||||
import org.apache.hadoop.yarn.proto.ClientAMProtocol.GetCompInstancesRequestProto;
|
||||
import org.apache.hadoop.yarn.proto.ClientAMProtocol.GetCompInstancesResponseProto;
|
||||
import org.apache.hadoop.yarn.proto.ClientAMProtocol.GetStatusRequestProto;
|
||||
import org.apache.hadoop.yarn.proto.ClientAMProtocol.GetStatusResponseProto;
|
||||
import org.apache.hadoop.yarn.proto.ClientAMProtocol.RestartServiceRequestProto;
|
||||
|
@ -66,6 +68,7 @@ import org.apache.hadoop.yarn.proto.ClientAMProtocol.UpgradeServiceResponseProto
|
|||
import org.apache.hadoop.yarn.service.ClientAMProtocol;
|
||||
import org.apache.hadoop.yarn.service.ServiceMaster;
|
||||
import org.apache.hadoop.yarn.service.api.records.Container;
|
||||
import org.apache.hadoop.yarn.service.api.records.ContainerState;
|
||||
import org.apache.hadoop.yarn.service.api.records.Component;
|
||||
import org.apache.hadoop.yarn.service.api.records.Service;
|
||||
import org.apache.hadoop.yarn.service.api.records.ServiceState;
|
||||
|
@ -100,6 +103,7 @@ import java.nio.ByteBuffer;
|
|||
import java.text.MessageFormat;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static org.apache.hadoop.yarn.api.records.YarnApplicationState.*;
|
||||
import static org.apache.hadoop.yarn.service.conf.YarnServiceConf.*;
|
||||
|
@ -318,6 +322,49 @@ public class ServiceClient extends AppAdminClient implements SliderExitCodes,
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getInstances(String appName,
|
||||
List<String> components, String version, List<String> containerStates)
|
||||
throws IOException, YarnException {
|
||||
GetCompInstancesResponseProto result = filterContainers(appName, components,
|
||||
version, containerStates);
|
||||
return result.getCompInstances();
|
||||
}
|
||||
|
||||
public Container[] getContainers(String appName, List<String> components,
|
||||
String version, List<ContainerState> containerStates)
|
||||
throws IOException, YarnException {
|
||||
GetCompInstancesResponseProto result = filterContainers(appName, components,
|
||||
version, containerStates != null ? containerStates.stream()
|
||||
.map(Enum::toString).collect(Collectors.toList()) : null);
|
||||
|
||||
return ServiceApiUtil.CONTAINER_JSON_SERDE.fromJson(
|
||||
result.getCompInstances());
|
||||
}
|
||||
|
||||
private GetCompInstancesResponseProto filterContainers(String appName,
|
||||
List<String> components, String version,
|
||||
List<String> containerStates) throws IOException, YarnException {
|
||||
ApplicationReport appReport = yarnClient.getApplicationReport(getAppId(
|
||||
appName));
|
||||
if (StringUtils.isEmpty(appReport.getHost())) {
|
||||
throw new YarnException(appName + " AM hostname is empty.");
|
||||
}
|
||||
ClientAMProtocol proxy = createAMProxy(appName, appReport);
|
||||
GetCompInstancesRequestProto.Builder req = GetCompInstancesRequestProto
|
||||
.newBuilder();
|
||||
if (components != null && !components.isEmpty()) {
|
||||
req.addAllComponentNames(components);
|
||||
}
|
||||
if (version != null) {
|
||||
req.setVersion(version);
|
||||
}
|
||||
if (containerStates != null && !containerStates.isEmpty()){
|
||||
req.addAllContainerStates(containerStates);
|
||||
}
|
||||
return proxy.getCompInstances(req.build());
|
||||
}
|
||||
|
||||
public int actionUpgrade(Service service, List<Container> compInstances)
|
||||
throws IOException, YarnException {
|
||||
ApplicationReport appReport =
|
||||
|
|
|
@ -97,6 +97,7 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
|
|||
private long containerStartedTime = 0;
|
||||
// This container object is used for rest API query
|
||||
private org.apache.hadoop.yarn.service.api.records.Container containerSpec;
|
||||
private String serviceVersion;
|
||||
|
||||
|
||||
private static final StateMachineFactory<ComponentInstance,
|
||||
|
@ -194,6 +195,8 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
|
|||
compInstance.getCompSpec().addContainer(container);
|
||||
compInstance.containerStartedTime = containerStartTime;
|
||||
compInstance.component.incRunningContainers();
|
||||
compInstance.serviceVersion = compInstance.scheduler.getApp()
|
||||
.getVersion();
|
||||
|
||||
if (compInstance.timelineServiceEnabled) {
|
||||
compInstance.serviceTimelinePublisher
|
||||
|
@ -210,6 +213,8 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
|
|||
if (compInstance.getState().equals(ComponentInstanceState.UPGRADING)) {
|
||||
compInstance.component.incContainersReady(false);
|
||||
compInstance.component.decContainersThatNeedUpgrade();
|
||||
compInstance.serviceVersion = compInstance.component.getUpgradeEvent()
|
||||
.getUpgradeVersion();
|
||||
ComponentEvent checkState = new ComponentEvent(
|
||||
compInstance.component.getName(), ComponentEventType.CHECK_STABLE);
|
||||
compInstance.scheduler.getDispatcher().getEventHandler().handle(
|
||||
|
@ -382,6 +387,30 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the version of service at which the instance is at.
|
||||
*/
|
||||
public String getServiceVersion() {
|
||||
this.readLock.lock();
|
||||
try {
|
||||
return this.serviceVersion;
|
||||
} finally {
|
||||
this.readLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the state of the container in the container spec.
|
||||
*/
|
||||
public ContainerState getContainerState() {
|
||||
this.readLock.lock();
|
||||
try {
|
||||
return this.containerSpec.getState();
|
||||
} finally {
|
||||
this.readLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handle(ComponentInstanceEvent event) {
|
||||
try {
|
||||
|
@ -667,8 +696,16 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
|
|||
return result;
|
||||
}
|
||||
|
||||
@VisibleForTesting public org.apache.hadoop.yarn.service.api.records
|
||||
/**
|
||||
* Returns container spec.
|
||||
*/
|
||||
public org.apache.hadoop.yarn.service.api.records
|
||||
.Container getContainerSpec() {
|
||||
readLock.lock();
|
||||
try {
|
||||
return containerSpec;
|
||||
} finally {
|
||||
readLock.unlock();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -37,11 +37,14 @@ public interface RestApiConstants {
|
|||
String COMPONENTS = "components";
|
||||
String COMPONENTS_PATH = SERVICE_PATH + "/" + COMPONENTS;
|
||||
|
||||
// Query param
|
||||
String SERVICE_NAME = "service_name";
|
||||
String COMPONENT_NAME = "component_name";
|
||||
String COMP_INSTANCE_NAME = "component_instance_name";
|
||||
|
||||
String PARAM_COMP_NAME = "componentName";
|
||||
String PARAM_VERSION = "version";
|
||||
String PARAM_CONTAINER_STATE = "containerState";
|
||||
|
||||
String MEDIA_TYPE_JSON_UTF8 = MediaType.APPLICATION_JSON + ";charset=utf-8";
|
||||
|
||||
Long DEFAULT_UNLIMITED_LIFETIME = -1l;
|
||||
|
|
|
@ -34,6 +34,8 @@ import org.apache.hadoop.yarn.proto.ClientAMProtocol.CompInstancesUpgradeRespons
|
|||
import org.apache.hadoop.yarn.proto.ClientAMProtocol.CompInstancesUpgradeRequestProto;
|
||||
import org.apache.hadoop.yarn.proto.ClientAMProtocol.FlexComponentsRequestProto;
|
||||
import org.apache.hadoop.yarn.proto.ClientAMProtocol.FlexComponentsResponseProto;
|
||||
import org.apache.hadoop.yarn.proto.ClientAMProtocol.GetCompInstancesRequestProto;
|
||||
import org.apache.hadoop.yarn.proto.ClientAMProtocol.GetCompInstancesResponseProto;
|
||||
import org.apache.hadoop.yarn.proto.ClientAMProtocol.GetStatusRequestProto;
|
||||
import org.apache.hadoop.yarn.proto.ClientAMProtocol.GetStatusResponseProto;
|
||||
import org.apache.hadoop.yarn.service.impl.pb.service.ClientAMProtocolPB;
|
||||
|
@ -128,4 +130,15 @@ public class ClientAMProtocolPBClientImpl
|
|||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public GetCompInstancesResponseProto getCompInstances(
|
||||
GetCompInstancesRequestProto request) throws IOException, YarnException {
|
||||
try {
|
||||
return proxy.getCompInstances(null, request);
|
||||
} catch (ServiceException e) {
|
||||
RPCUtil.unwrapAndThrowException(e);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -25,6 +25,8 @@ import org.apache.hadoop.yarn.proto.ClientAMProtocol.CompInstancesUpgradeRequest
|
|||
import org.apache.hadoop.yarn.proto.ClientAMProtocol.CompInstancesUpgradeResponseProto;
|
||||
import org.apache.hadoop.yarn.proto.ClientAMProtocol.FlexComponentsRequestProto;
|
||||
import org.apache.hadoop.yarn.proto.ClientAMProtocol.FlexComponentsResponseProto;
|
||||
import org.apache.hadoop.yarn.proto.ClientAMProtocol.GetCompInstancesRequestProto;
|
||||
import org.apache.hadoop.yarn.proto.ClientAMProtocol.GetCompInstancesResponseProto;
|
||||
import org.apache.hadoop.yarn.proto.ClientAMProtocol.GetStatusRequestProto;
|
||||
import org.apache.hadoop.yarn.proto.ClientAMProtocol.GetStatusResponseProto;
|
||||
import org.apache.hadoop.yarn.proto.ClientAMProtocol.RestartServiceRequestProto;
|
||||
|
@ -103,4 +105,15 @@ public class ClientAMProtocolPBServiceImpl implements ClientAMProtocolPB {
|
|||
throw new ServiceException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public GetCompInstancesResponseProto getCompInstances(
|
||||
RpcController controller, GetCompInstancesRequestProto request)
|
||||
throws ServiceException {
|
||||
try {
|
||||
return real.getCompInstances(request);
|
||||
} catch (IOException | YarnException e) {
|
||||
throw new ServiceException(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,81 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.service.utils;
|
||||
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.proto.ClientAMProtocol;
|
||||
import org.apache.hadoop.yarn.service.ServiceContext;
|
||||
import org.apache.hadoop.yarn.service.api.records.Container;
|
||||
import org.apache.hadoop.yarn.service.component.instance.ComponentInstance;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
public class FilterUtils {
|
||||
|
||||
/**
|
||||
* Returns containers filtered by requested fields.
|
||||
*
|
||||
* @param context service context
|
||||
* @param filterReq filter request
|
||||
*/
|
||||
public static List<Container> filterInstances(ServiceContext context,
|
||||
ClientAMProtocol.GetCompInstancesRequestProto filterReq) {
|
||||
List<Container> results = new ArrayList<>();
|
||||
Map<ContainerId, ComponentInstance> instances =
|
||||
context.scheduler.getLiveInstances();
|
||||
|
||||
instances.forEach(((containerId, instance) -> {
|
||||
boolean include = true;
|
||||
if (filterReq.getComponentNamesList() != null &&
|
||||
!filterReq.getComponentNamesList().isEmpty()) {
|
||||
// filter by component name
|
||||
if (!filterReq.getComponentNamesList().contains(
|
||||
instance.getComponent().getName())) {
|
||||
include = false;
|
||||
}
|
||||
}
|
||||
|
||||
if (filterReq.getVersion() != null && !filterReq.getVersion().isEmpty()) {
|
||||
// filter by version
|
||||
String instanceServiceVersion = instance.getServiceVersion();
|
||||
if (instanceServiceVersion == null || !instanceServiceVersion.equals(
|
||||
filterReq.getVersion())) {
|
||||
include = false;
|
||||
}
|
||||
}
|
||||
|
||||
if (filterReq.getContainerStatesList() != null &&
|
||||
!filterReq.getContainerStatesList().isEmpty()) {
|
||||
// filter by state
|
||||
if (!filterReq.getContainerStatesList().contains(
|
||||
instance.getContainerState().toString())) {
|
||||
include = false;
|
||||
}
|
||||
}
|
||||
|
||||
if (include) {
|
||||
results.add(instance.getContainerSpec());
|
||||
}
|
||||
}));
|
||||
|
||||
return results;
|
||||
}
|
||||
}
|
|
@ -72,6 +72,15 @@ public class ServiceApiUtil {
|
|||
public static JsonSerDeser<Service> jsonSerDeser =
|
||||
new JsonSerDeser<>(Service.class,
|
||||
PropertyNamingStrategy.CAMEL_CASE_TO_LOWER_CASE_WITH_UNDERSCORES);
|
||||
|
||||
public static final JsonSerDeser<Container[]> CONTAINER_JSON_SERDE =
|
||||
new JsonSerDeser<>(Container[].class,
|
||||
PropertyNamingStrategy.CAMEL_CASE_TO_LOWER_CASE_WITH_UNDERSCORES);
|
||||
|
||||
public static final JsonSerDeser<Component[]> COMP_JSON_SERDE =
|
||||
new JsonSerDeser<>(Component[].class,
|
||||
PropertyNamingStrategy.CAMEL_CASE_TO_LOWER_CASE_WITH_UNDERSCORES);
|
||||
|
||||
private static final PatternValidator namePattern
|
||||
= new PatternValidator("[a-z][a-z0-9-]*");
|
||||
|
||||
|
|
|
@ -32,6 +32,8 @@ service ClientAMProtocolService {
|
|||
returns (RestartServiceResponseProto);
|
||||
rpc upgrade(CompInstancesUpgradeRequestProto) returns
|
||||
(CompInstancesUpgradeResponseProto);
|
||||
rpc getCompInstances(GetCompInstancesRequestProto) returns
|
||||
(GetCompInstancesResponseProto);
|
||||
}
|
||||
|
||||
message FlexComponentsRequestProto {
|
||||
|
@ -82,3 +84,13 @@ message CompInstancesUpgradeRequestProto {
|
|||
|
||||
message CompInstancesUpgradeResponseProto {
|
||||
}
|
||||
|
||||
message GetCompInstancesRequestProto {
|
||||
repeated string componentNames = 1;
|
||||
optional string version = 2;
|
||||
repeated string containerStates = 3;
|
||||
}
|
||||
|
||||
message GetCompInstancesResponseProto {
|
||||
optional string compInstances = 1;
|
||||
}
|
|
@ -0,0 +1,154 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.service;
|
||||
|
||||
import org.apache.hadoop.registry.client.api.RegistryOperations;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.Container;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||
import org.apache.hadoop.yarn.client.api.NMClient;
|
||||
import org.apache.hadoop.yarn.client.api.async.NMClientAsync;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.service.api.records.Service;
|
||||
import org.apache.hadoop.yarn.service.component.Component;
|
||||
import org.apache.hadoop.yarn.service.component.ComponentEvent;
|
||||
import org.apache.hadoop.yarn.service.component.ComponentEventType;
|
||||
import org.apache.hadoop.yarn.service.component.instance.ComponentInstance;
|
||||
import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEvent;
|
||||
import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType;
|
||||
import org.apache.hadoop.yarn.service.containerlaunch.ContainerLaunchService;
|
||||
import org.apache.hadoop.yarn.service.registry.YarnRegistryViewForProviders;
|
||||
import org.mockito.stubbing.Answer;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Map;
|
||||
|
||||
import static org.mockito.Matchers.anyObject;
|
||||
import static org.mockito.Mockito.doNothing;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
/**
|
||||
* Mocked service context for a running service.
|
||||
*/
|
||||
public class MockRunningServiceContext extends ServiceContext {
|
||||
|
||||
public MockRunningServiceContext(ServiceTestUtils.ServiceFSWatcher fsWatcher,
|
||||
Service serviceDef) throws Exception {
|
||||
super();
|
||||
this.service = serviceDef;
|
||||
this.fs = fsWatcher.getFs();
|
||||
|
||||
ContainerLaunchService mockLaunchService = mock(
|
||||
ContainerLaunchService.class);
|
||||
|
||||
this.scheduler = new ServiceScheduler(this) {
|
||||
@Override
|
||||
protected YarnRegistryViewForProviders
|
||||
createYarnRegistryOperations(
|
||||
ServiceContext context, RegistryOperations registryClient) {
|
||||
return mock(YarnRegistryViewForProviders.class);
|
||||
}
|
||||
|
||||
@Override
|
||||
public NMClientAsync createNMClient() {
|
||||
NMClientAsync nmClientAsync = super.createNMClient();
|
||||
NMClient nmClient = mock(NMClient.class);
|
||||
try {
|
||||
when(nmClient.getContainerStatus(anyObject(), anyObject()))
|
||||
.thenAnswer(
|
||||
(Answer<ContainerStatus>) invocation -> ContainerStatus
|
||||
.newInstance((ContainerId) invocation.getArguments()[0],
|
||||
org.apache.hadoop.yarn.api.records.ContainerState
|
||||
.RUNNING,
|
||||
"", 0));
|
||||
} catch (YarnException | IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
nmClientAsync.setClient(nmClient);
|
||||
return nmClientAsync;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ContainerLaunchService getContainerLaunchService() {
|
||||
return mockLaunchService;
|
||||
}
|
||||
};
|
||||
this.scheduler.init(fsWatcher.getConf());
|
||||
|
||||
ServiceTestUtils.createServiceManager(this);
|
||||
|
||||
doNothing().when(mockLaunchService).
|
||||
reInitCompInstance(anyObject(), anyObject(), anyObject(), anyObject());
|
||||
stabilizeComponents(this);
|
||||
}
|
||||
|
||||
private void stabilizeComponents(ServiceContext context) {
|
||||
|
||||
ApplicationId appId = ApplicationId.fromString(context.service.getId());
|
||||
ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(appId, 1);
|
||||
context.attemptId = attemptId;
|
||||
Map<String, Component>
|
||||
componentState = context.scheduler.getAllComponents();
|
||||
|
||||
int counter = 0;
|
||||
for (org.apache.hadoop.yarn.service.api.records.Component componentSpec :
|
||||
context.service.getComponents()) {
|
||||
Component component = new org.apache.hadoop.yarn.service.component.
|
||||
Component(componentSpec, 1L, context);
|
||||
componentState.put(component.getName(), component);
|
||||
component.handle(new ComponentEvent(component.getName(),
|
||||
ComponentEventType.FLEX));
|
||||
|
||||
for (int i = 0; i < componentSpec.getNumberOfContainers(); i++) {
|
||||
counter++;
|
||||
assignNewContainer(attemptId, counter, component);
|
||||
}
|
||||
|
||||
component.handle(new ComponentEvent(component.getName(),
|
||||
ComponentEventType.CHECK_STABLE));
|
||||
}
|
||||
}
|
||||
|
||||
public void assignNewContainer(ApplicationAttemptId attemptId,
|
||||
long containerNum, Component component) {
|
||||
|
||||
Container container = org.apache.hadoop.yarn.api.records.Container
|
||||
.newInstance(ContainerId.newContainerId(attemptId, containerNum),
|
||||
NODE_ID, "localhost", null, null,
|
||||
null);
|
||||
component.handle(new ComponentEvent(component.getName(),
|
||||
ComponentEventType.CONTAINER_ALLOCATED)
|
||||
.setContainer(container).setContainerId(container.getId()));
|
||||
ComponentInstance instance = this.scheduler.getLiveInstances().get(
|
||||
container.getId());
|
||||
ComponentInstanceEvent startEvent = new ComponentInstanceEvent(
|
||||
container.getId(), ComponentInstanceEventType.START);
|
||||
instance.handle(startEvent);
|
||||
|
||||
ComponentInstanceEvent readyEvent = new ComponentInstanceEvent(
|
||||
container.getId(), ComponentInstanceEventType.BECOME_READY);
|
||||
instance.handle(readyEvent);
|
||||
}
|
||||
|
||||
private static final NodeId NODE_ID = NodeId.fromString("localhost:0");
|
||||
}
|
|
@ -166,7 +166,7 @@ public class TestServiceCLI {
|
|||
checkApp(serviceName, "master", 1L, 1000L, "qname");
|
||||
}
|
||||
|
||||
@Test (timeout = 180000)
|
||||
@Test
|
||||
public void testInitiateServiceUpgrade() throws Exception {
|
||||
String[] args = {"app", "-upgrade", "app-1",
|
||||
"-initiate", ExampleAppJson.resourceName(ExampleAppJson.APP_JSON),
|
||||
|
@ -185,7 +185,7 @@ public class TestServiceCLI {
|
|||
Assert.assertEquals(result, 0);
|
||||
}
|
||||
|
||||
@Test (timeout = 180000)
|
||||
@Test
|
||||
public void testUpgradeInstances() throws Exception {
|
||||
conf.set(YARN_APP_ADMIN_CLIENT_PREFIX + DUMMY_APP_TYPE,
|
||||
DummyServiceClient.class.getName());
|
||||
|
@ -197,7 +197,7 @@ public class TestServiceCLI {
|
|||
Assert.assertEquals(result, 0);
|
||||
}
|
||||
|
||||
@Test (timeout = 180000)
|
||||
@Test
|
||||
public void testUpgradeComponents() throws Exception {
|
||||
conf.set(YARN_APP_ADMIN_CLIENT_PREFIX + DUMMY_APP_TYPE,
|
||||
DummyServiceClient.class.getName());
|
||||
|
@ -209,6 +209,18 @@ public class TestServiceCLI {
|
|||
Assert.assertEquals(result, 0);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetInstances() throws Exception {
|
||||
conf.set(YARN_APP_ADMIN_CLIENT_PREFIX + DUMMY_APP_TYPE,
|
||||
DummyServiceClient.class.getName());
|
||||
cli.setConf(conf);
|
||||
String[] args = {"container", "-list", "app-1",
|
||||
"-components", "comp1,comp2",
|
||||
"-appTypes", DUMMY_APP_TYPE};
|
||||
int result = cli.run(ApplicationCLI.preProcessArgs(args));
|
||||
Assert.assertEquals(result, 0);
|
||||
}
|
||||
|
||||
@Test (timeout = 180000)
|
||||
public void testEnableFastLaunch() throws Exception {
|
||||
fs.getFileSystem().create(new Path(basedir.getAbsolutePath(), "test.jar"))
|
||||
|
@ -313,5 +325,12 @@ public class TestServiceCLI {
|
|||
throws IOException, YarnException {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getInstances(String appName, List<String> components,
|
||||
String version, List<String> containerStates)
|
||||
throws IOException, YarnException {
|
||||
return "";
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -18,6 +18,7 @@
|
|||
|
||||
package org.apache.hadoop.yarn.service.client;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
|
@ -32,8 +33,12 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
|
|||
import org.apache.hadoop.yarn.service.ClientAMProtocol;
|
||||
import org.apache.hadoop.yarn.proto.ClientAMProtocol.CompInstancesUpgradeRequestProto;
|
||||
import org.apache.hadoop.yarn.proto.ClientAMProtocol.CompInstancesUpgradeResponseProto;
|
||||
import org.apache.hadoop.yarn.proto.ClientAMProtocol.GetCompInstancesRequestProto;
|
||||
import org.apache.hadoop.yarn.proto.ClientAMProtocol.GetCompInstancesResponseProto;
|
||||
import org.apache.hadoop.yarn.proto.ClientAMProtocol.UpgradeServiceRequestProto;
|
||||
import org.apache.hadoop.yarn.proto.ClientAMProtocol.UpgradeServiceResponseProto;
|
||||
import org.apache.hadoop.yarn.service.MockRunningServiceContext;
|
||||
import org.apache.hadoop.yarn.service.ServiceContext;
|
||||
import org.apache.hadoop.yarn.service.ServiceTestUtils;
|
||||
import org.apache.hadoop.yarn.service.api.records.Component;
|
||||
import org.apache.hadoop.yarn.service.api.records.Container;
|
||||
|
@ -41,6 +46,7 @@ import org.apache.hadoop.yarn.service.api.records.Service;
|
|||
import org.apache.hadoop.yarn.service.api.records.ServiceState;
|
||||
import org.apache.hadoop.yarn.service.conf.YarnServiceConf;
|
||||
import org.apache.hadoop.yarn.service.exceptions.ErrorStrings;
|
||||
import org.apache.hadoop.yarn.service.utils.FilterUtils;
|
||||
import org.apache.hadoop.yarn.service.utils.ServiceApiUtil;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Rule;
|
||||
|
@ -52,6 +58,7 @@ import org.slf4j.LoggerFactory;
|
|||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
@ -122,6 +129,26 @@ public class TestServiceClient {
|
|||
client.stop();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetCompInstances() throws Exception {
|
||||
Service service = createService();
|
||||
MockServiceClient client = MockServiceClient.create(rule, service, true);
|
||||
|
||||
//upgrade the service
|
||||
service.setVersion("v2");
|
||||
client.initiateUpgrade(service);
|
||||
|
||||
//add containers to the component that needs to be upgraded.
|
||||
Component comp = service.getComponents().iterator().next();
|
||||
ContainerId containerId = ContainerId.newContainerId(client.attemptId, 1L);
|
||||
comp.addContainer(new Container().id(containerId.toString()));
|
||||
|
||||
Container[] containers = client.getContainers(service.getName(),
|
||||
Lists.newArrayList("compa"), "v1", null);
|
||||
Assert.assertEquals("num containers", 2, containers.length);
|
||||
client.stop();
|
||||
}
|
||||
|
||||
private Service createService() throws IOException,
|
||||
YarnException {
|
||||
Service service = ServiceTestUtils.createExampleApplication();
|
||||
|
@ -137,6 +164,7 @@ public class TestServiceClient {
|
|||
private final ClientAMProtocol amProxy;
|
||||
private Object proxyResponse;
|
||||
private Service service;
|
||||
private ServiceContext context;
|
||||
|
||||
private MockServiceClient() {
|
||||
amProxy = mock(ClientAMProtocol.class);
|
||||
|
@ -147,8 +175,12 @@ public class TestServiceClient {
|
|||
|
||||
static MockServiceClient create(ServiceTestUtils.ServiceFSWatcher rule,
|
||||
Service service, boolean enableUpgrade)
|
||||
throws IOException, YarnException {
|
||||
throws Exception {
|
||||
MockServiceClient client = new MockServiceClient();
|
||||
ApplicationId applicationId = ApplicationId.newInstance(
|
||||
System.currentTimeMillis(), 1);
|
||||
service.setId(applicationId.toString());
|
||||
client.context = new MockRunningServiceContext(rule, service);
|
||||
|
||||
YarnClient yarnClient = createMockYarnClient();
|
||||
ApplicationReport appReport = mock(ApplicationReport.class);
|
||||
|
@ -179,6 +211,24 @@ public class TestServiceClient {
|
|||
client.proxyResponse = response;
|
||||
return response;
|
||||
});
|
||||
|
||||
when(client.amProxy.getCompInstances(Matchers.any(
|
||||
GetCompInstancesRequestProto.class))).thenAnswer(
|
||||
(Answer<GetCompInstancesResponseProto>) invocation -> {
|
||||
|
||||
GetCompInstancesRequestProto req = (GetCompInstancesRequestProto)
|
||||
invocation.getArguments()[0];
|
||||
List<Container> containers = FilterUtils.filterInstances(
|
||||
client.context, req);
|
||||
GetCompInstancesResponseProto response =
|
||||
GetCompInstancesResponseProto.newBuilder().setCompInstances(
|
||||
ServiceApiUtil.CONTAINER_JSON_SERDE.toJson(
|
||||
containers.toArray(new Container[containers.size()])))
|
||||
.build();
|
||||
client.proxyResponse = response;
|
||||
return response;
|
||||
});
|
||||
|
||||
client.setFileSystem(rule.getFs());
|
||||
client.setYarnClient(yarnClient);
|
||||
client.service = service;
|
||||
|
|
|
@ -18,19 +18,10 @@
|
|||
|
||||
package org.apache.hadoop.yarn.service.component;
|
||||
|
||||
import org.apache.hadoop.registry.client.api.RegistryOperations;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.Container;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||
import org.apache.hadoop.yarn.client.api.NMClient;
|
||||
import org.apache.hadoop.yarn.client.api.async.NMClientAsync;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.service.ServiceContext;
|
||||
import org.apache.hadoop.yarn.service.ServiceScheduler;
|
||||
import org.apache.hadoop.yarn.service.ServiceTestUtils;
|
||||
import org.apache.hadoop.yarn.service.TestServiceManager;
|
||||
import org.apache.hadoop.yarn.service.api.records.ComponentState;
|
||||
|
@ -38,23 +29,15 @@ import org.apache.hadoop.yarn.service.api.records.Service;
|
|||
import org.apache.hadoop.yarn.service.component.instance.ComponentInstance;
|
||||
import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEvent;
|
||||
import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType;
|
||||
|
||||
import org.apache.hadoop.yarn.service.containerlaunch.ContainerLaunchService;
|
||||
import org.apache.hadoop.yarn.service.registry.YarnRegistryViewForProviders;
|
||||
import org.apache.hadoop.yarn.service.MockRunningServiceContext;
|
||||
import org.apache.log4j.Logger;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.mockito.stubbing.Answer;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Iterator;
|
||||
import java.util.Map;
|
||||
|
||||
import static org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType.STOP;
|
||||
|
||||
import static org.mockito.Matchers.anyObject;
|
||||
import static org.mockito.Mockito.doNothing;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
|
@ -63,7 +46,6 @@ import static org.mockito.Mockito.when;
|
|||
*/
|
||||
public class TestComponent {
|
||||
|
||||
private static final int WAIT_MS_PER_LOOP = 1000;
|
||||
static final Logger LOG = Logger.getLogger(TestComponent.class);
|
||||
|
||||
@Rule
|
||||
|
@ -115,7 +97,7 @@ public class TestComponent {
|
|||
@Test
|
||||
public void testContainerCompletedWhenUpgrading() throws Exception {
|
||||
String serviceName = "testContainerComplete";
|
||||
ServiceContext context = createTestContext(rule, serviceName);
|
||||
MockRunningServiceContext context = createTestContext(rule, serviceName);
|
||||
Component comp = context.scheduler.getAllComponents().entrySet().iterator()
|
||||
.next().getValue();
|
||||
|
||||
|
@ -148,7 +130,7 @@ public class TestComponent {
|
|||
ComponentState.FLEXING, comp.getComponentSpec().getState());
|
||||
|
||||
// new container get allocated
|
||||
assignNewContainer(context.attemptId, 10, context, comp);
|
||||
context.assignNewContainer(context.attemptId, 10, comp);
|
||||
|
||||
// second instance finished upgrading
|
||||
ComponentInstance instance2 = instanceIter.next();
|
||||
|
@ -174,7 +156,7 @@ public class TestComponent {
|
|||
serviceName);
|
||||
TestServiceManager.createDef(serviceName, testService);
|
||||
|
||||
ServiceContext context = createTestContext(rule, testService);
|
||||
ServiceContext context = new MockRunningServiceContext(rule, testService);
|
||||
|
||||
for (Component comp : context.scheduler.getAllComponents().values()) {
|
||||
|
||||
|
@ -225,114 +207,11 @@ public class TestComponent {
|
|||
return spec;
|
||||
}
|
||||
|
||||
public static ServiceContext createTestContext(
|
||||
public static MockRunningServiceContext createTestContext(
|
||||
ServiceTestUtils.ServiceFSWatcher fsWatcher, String serviceName)
|
||||
throws Exception {
|
||||
return createTestContext(fsWatcher,
|
||||
return new MockRunningServiceContext(fsWatcher,
|
||||
TestServiceManager.createBaseDef(serviceName));
|
||||
}
|
||||
|
||||
public static ServiceContext createTestContext(
|
||||
ServiceTestUtils.ServiceFSWatcher fsWatcher, Service serviceDef)
|
||||
throws Exception {
|
||||
ServiceContext context = new ServiceContext();
|
||||
context.service = serviceDef;
|
||||
context.fs = fsWatcher.getFs();
|
||||
|
||||
ContainerLaunchService mockLaunchService = mock(
|
||||
ContainerLaunchService.class);
|
||||
|
||||
context.scheduler = new ServiceScheduler(context) {
|
||||
@Override protected YarnRegistryViewForProviders
|
||||
createYarnRegistryOperations(
|
||||
ServiceContext context, RegistryOperations registryClient) {
|
||||
return mock(YarnRegistryViewForProviders.class);
|
||||
}
|
||||
|
||||
@Override public NMClientAsync createNMClient() {
|
||||
NMClientAsync nmClientAsync = super.createNMClient();
|
||||
NMClient nmClient = mock(NMClient.class);
|
||||
try {
|
||||
when(nmClient.getContainerStatus(anyObject(), anyObject()))
|
||||
.thenAnswer(
|
||||
(Answer<ContainerStatus>) invocation -> ContainerStatus
|
||||
.newInstance((ContainerId) invocation.getArguments()[0],
|
||||
org.apache.hadoop.yarn.api.records.ContainerState
|
||||
.RUNNING,
|
||||
"", 0));
|
||||
} catch (YarnException | IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
nmClientAsync.setClient(nmClient);
|
||||
return nmClientAsync;
|
||||
}
|
||||
|
||||
@Override public ContainerLaunchService getContainerLaunchService() {
|
||||
return mockLaunchService;
|
||||
}
|
||||
};
|
||||
context.scheduler.init(fsWatcher.getConf());
|
||||
|
||||
ServiceTestUtils.createServiceManager(context);
|
||||
|
||||
doNothing().when(mockLaunchService).
|
||||
reInitCompInstance(anyObject(), anyObject(), anyObject(), anyObject());
|
||||
stabilizeComponents(context);
|
||||
|
||||
return context;
|
||||
}
|
||||
|
||||
private static void stabilizeComponents(ServiceContext context) {
|
||||
|
||||
ApplicationId appId = ApplicationId.fromString(context.service.getId());
|
||||
ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(appId, 1);
|
||||
context.attemptId = attemptId;
|
||||
Map<String, Component>
|
||||
componentState = context.scheduler.getAllComponents();
|
||||
|
||||
int counter = 0;
|
||||
for (org.apache.hadoop.yarn.service.api.records.Component componentSpec :
|
||||
context.service.getComponents()) {
|
||||
Component component = new org.apache.hadoop.yarn.service.component.
|
||||
Component(componentSpec, 1L, context);
|
||||
componentState.put(component.getName(), component);
|
||||
component.handle(new ComponentEvent(component.getName(),
|
||||
ComponentEventType.FLEX));
|
||||
|
||||
for (int i = 0; i < componentSpec.getNumberOfContainers(); i++) {
|
||||
counter++;
|
||||
assignNewContainer(attemptId, counter, context, component);
|
||||
}
|
||||
|
||||
component.handle(new ComponentEvent(component.getName(),
|
||||
ComponentEventType.CHECK_STABLE));
|
||||
}
|
||||
}
|
||||
|
||||
private static void assignNewContainer(
|
||||
ApplicationAttemptId attemptId, long containerNum,
|
||||
ServiceContext context, Component component) {
|
||||
|
||||
|
||||
Container container = org.apache.hadoop.yarn.api.records.Container
|
||||
.newInstance(ContainerId.newContainerId(attemptId, containerNum),
|
||||
NODE_ID, "localhost", null, null,
|
||||
null);
|
||||
component.handle(new ComponentEvent(component.getName(),
|
||||
ComponentEventType.CONTAINER_ALLOCATED)
|
||||
.setContainer(container).setContainerId(container.getId()));
|
||||
ComponentInstance instance = context.scheduler.getLiveInstances().get(
|
||||
container.getId());
|
||||
ComponentInstanceEvent startEvent = new ComponentInstanceEvent(
|
||||
container.getId(), ComponentInstanceEventType.START);
|
||||
instance.handle(startEvent);
|
||||
|
||||
ComponentInstanceEvent readyEvent = new ComponentInstanceEvent(
|
||||
container.getId(), ComponentInstanceEventType.BECOME_READY);
|
||||
instance.handle(readyEvent);
|
||||
}
|
||||
|
||||
private static final NodeId NODE_ID = NodeId.fromString("localhost:0");
|
||||
|
||||
}
|
||||
|
||||
|
|
|
@ -6,9 +6,9 @@
|
|||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
* <p>
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p>
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
|
@ -60,19 +60,20 @@ import static org.mockito.Mockito.when;
|
|||
*/
|
||||
public class TestComponentInstance {
|
||||
|
||||
@Rule public ServiceTestUtils.ServiceFSWatcher rule =
|
||||
@Rule
|
||||
public ServiceTestUtils.ServiceFSWatcher rule =
|
||||
new ServiceTestUtils.ServiceFSWatcher();
|
||||
|
||||
@Test public void testContainerUpgrade() throws Exception {
|
||||
@Test
|
||||
public void testContainerUpgrade() throws Exception {
|
||||
ServiceContext context = TestComponent.createTestContext(rule,
|
||||
"testContainerUpgrade");
|
||||
Component component =
|
||||
context.scheduler.getAllComponents().entrySet().iterator().next()
|
||||
.getValue();
|
||||
Component component = context.scheduler.getAllComponents().entrySet()
|
||||
.iterator().next().getValue();
|
||||
upgradeComponent(component);
|
||||
|
||||
ComponentInstance instance =
|
||||
component.getAllComponentInstances().iterator().next();
|
||||
ComponentInstance instance = component.getAllComponentInstances().iterator()
|
||||
.next();
|
||||
ComponentInstanceEvent instanceEvent = new ComponentInstanceEvent(
|
||||
instance.getContainer().getId(), ComponentInstanceEventType.UPGRADE);
|
||||
instance.handle(instanceEvent);
|
||||
|
@ -82,16 +83,16 @@ public class TestComponentInstance {
|
|||
containerSpec.getState());
|
||||
}
|
||||
|
||||
@Test public void testContainerReadyAfterUpgrade() throws Exception {
|
||||
@Test
|
||||
public void testContainerReadyAfterUpgrade() throws Exception {
|
||||
ServiceContext context = TestComponent.createTestContext(rule,
|
||||
"testContainerStarted");
|
||||
Component component =
|
||||
context.scheduler.getAllComponents().entrySet().iterator().next()
|
||||
.getValue();
|
||||
Component component = context.scheduler.getAllComponents().entrySet()
|
||||
.iterator().next().getValue();
|
||||
upgradeComponent(component);
|
||||
|
||||
ComponentInstance instance =
|
||||
component.getAllComponentInstances().iterator().next();
|
||||
ComponentInstance instance = component.getAllComponentInstances().iterator()
|
||||
.next();
|
||||
|
||||
ComponentInstanceEvent instanceEvent = new ComponentInstanceEvent(
|
||||
instance.getContainer().getId(), ComponentInstanceEventType.UPGRADE);
|
||||
|
@ -100,9 +101,8 @@ public class TestComponentInstance {
|
|||
instance.handle(new ComponentInstanceEvent(instance.getContainer().getId(),
|
||||
ComponentInstanceEventType.BECOME_READY));
|
||||
Assert.assertEquals("instance not ready", ContainerState.READY,
|
||||
instance.getCompSpec()
|
||||
.getContainer(instance.getContainer().getId().toString())
|
||||
.getState());
|
||||
instance.getCompSpec().getContainer(
|
||||
instance.getContainer().getId().toString()).getState());
|
||||
}
|
||||
|
||||
private void upgradeComponent(Component component) {
|
||||
|
@ -113,9 +113,8 @@ public class TestComponentInstance {
|
|||
|
||||
private Component createComponent(ServiceScheduler scheduler,
|
||||
org.apache.hadoop.yarn.service.api.records.Component.RestartPolicyEnum
|
||||
restartPolicy,
|
||||
int nSucceededInstances, int nFailedInstances, int totalAsk,
|
||||
int componentId) {
|
||||
restartPolicy, int nSucceededInstances, int nFailedInstances,
|
||||
int totalAsk, int componentId) {
|
||||
|
||||
assert (nSucceededInstances + nFailedInstances) <= totalAsk;
|
||||
|
||||
|
@ -214,7 +213,8 @@ public class TestComponentInstance {
|
|||
return componentInstance;
|
||||
}
|
||||
|
||||
@Test public void testComponentRestartPolicy() {
|
||||
@Test
|
||||
public void testComponentRestartPolicy() {
|
||||
|
||||
Map<String, Component> allComponents = new HashMap<>();
|
||||
Service mockService = mock(Service.class);
|
||||
|
|
|
@ -0,0 +1,102 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.service.utils;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
import org.apache.hadoop.yarn.proto.ClientAMProtocol.GetCompInstancesRequestProto;
|
||||
import org.apache.hadoop.yarn.service.ServiceContext;
|
||||
import org.apache.hadoop.yarn.service.ServiceTestUtils;
|
||||
import org.apache.hadoop.yarn.service.TestServiceManager;
|
||||
import org.apache.hadoop.yarn.service.api.records.Container;
|
||||
import org.apache.hadoop.yarn.service.MockRunningServiceContext;
|
||||
import org.apache.hadoop.yarn.service.api.records.ContainerState;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
public class TestFilterUtils {
|
||||
|
||||
@Rule
|
||||
public ServiceTestUtils.ServiceFSWatcher rule =
|
||||
new ServiceTestUtils.ServiceFSWatcher();
|
||||
|
||||
@Test
|
||||
public void testNoFilter() throws Exception {
|
||||
GetCompInstancesRequestProto req = GetCompInstancesRequestProto.newBuilder()
|
||||
.build();
|
||||
List<Container> containers = FilterUtils.filterInstances(
|
||||
new MockRunningServiceContext(rule,
|
||||
TestServiceManager.createBaseDef("service")), req);
|
||||
Assert.assertEquals("num containers", 4, containers.size());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFilterWithComp() throws Exception {
|
||||
GetCompInstancesRequestProto req = GetCompInstancesRequestProto.newBuilder()
|
||||
.addAllComponentNames(Lists.newArrayList("compa")).build();
|
||||
List<Container> containers = FilterUtils.filterInstances(
|
||||
new MockRunningServiceContext(rule,
|
||||
TestServiceManager.createBaseDef("service")), req);
|
||||
Assert.assertEquals("num containers", 2, containers.size());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFilterWithVersion() throws Exception {
|
||||
ServiceContext sc = new MockRunningServiceContext(rule,
|
||||
TestServiceManager.createBaseDef("service"));
|
||||
GetCompInstancesRequestProto.Builder reqBuilder =
|
||||
GetCompInstancesRequestProto.newBuilder();
|
||||
|
||||
reqBuilder.setVersion("v2");
|
||||
Assert.assertEquals("num containers", 0,
|
||||
FilterUtils.filterInstances(sc, reqBuilder.build()).size());
|
||||
|
||||
reqBuilder.addAllComponentNames(Lists.newArrayList("compa"))
|
||||
.setVersion("v1").build();
|
||||
|
||||
Assert.assertEquals("num containers", 2,
|
||||
FilterUtils.filterInstances(sc, reqBuilder.build()).size());
|
||||
|
||||
reqBuilder.setVersion("v2").build();
|
||||
Assert.assertEquals("num containers", 0,
|
||||
FilterUtils.filterInstances(sc, reqBuilder.build()).size());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFilterWithState() throws Exception {
|
||||
ServiceContext sc = new MockRunningServiceContext(rule,
|
||||
TestServiceManager.createBaseDef("service"));
|
||||
GetCompInstancesRequestProto.Builder reqBuilder =
|
||||
GetCompInstancesRequestProto.newBuilder();
|
||||
|
||||
reqBuilder.addAllContainerStates(Lists.newArrayList(
|
||||
ContainerState.READY.toString()));
|
||||
Assert.assertEquals("num containers", 4,
|
||||
FilterUtils.filterInstances(sc, reqBuilder.build()).size());
|
||||
|
||||
reqBuilder.clearContainerStates();
|
||||
reqBuilder.addAllContainerStates(Lists.newArrayList(
|
||||
ContainerState.STOPPED.toString()));
|
||||
Assert.assertEquals("num containers", 0,
|
||||
FilterUtils.filterInstances(sc, reqBuilder.build()).size());
|
||||
}
|
||||
|
||||
}
|
|
@ -105,6 +105,8 @@ public class ApplicationCLI extends YarnCLI {
|
|||
public static final String UPGRADE_FINALIZE = "finalize";
|
||||
public static final String COMPONENT_INSTS = "instances";
|
||||
public static final String COMPONENTS = "components";
|
||||
public static final String VERSION = "version";
|
||||
public static final String STATES = "states";
|
||||
|
||||
private static String firstArg = null;
|
||||
|
||||
|
@ -294,10 +296,39 @@ public class ApplicationCLI extends YarnCLI {
|
|||
opts.addOption(STATUS_CMD, true,
|
||||
"Prints the status of the container.");
|
||||
opts.addOption(LIST_CMD, true,
|
||||
"List containers for application attempt.");
|
||||
"List containers for application attempt when application " +
|
||||
"attempt ID is provided. When application name is provided, " +
|
||||
"then it finds the instances of the application based on app's " +
|
||||
"own implementation, and -appTypes option must be specified " +
|
||||
"unless it is the default yarn-service type. With app name, it " +
|
||||
"supports optional use of -version to filter instances based on " +
|
||||
"app version, -components to filter instances based on component " +
|
||||
"names, -states to filter instances based on instance state.");
|
||||
opts.addOption(HELP_CMD, false, "Displays help for all commands.");
|
||||
opts.getOption(STATUS_CMD).setArgName("Container ID");
|
||||
opts.getOption(LIST_CMD).setArgName("Application Attempt ID");
|
||||
opts.getOption(LIST_CMD).setArgName("Application Name or Attempt ID");
|
||||
opts.addOption(APP_TYPE_CMD, true, "Works with -list to " +
|
||||
"specify the app type when application name is provided.");
|
||||
opts.getOption(APP_TYPE_CMD).setValueSeparator(',');
|
||||
opts.getOption(APP_TYPE_CMD).setArgs(Option.UNLIMITED_VALUES);
|
||||
opts.getOption(APP_TYPE_CMD).setArgName("Types");
|
||||
|
||||
opts.addOption(VERSION, true, "Works with -list "
|
||||
+ "to filter instances based on input application version.");
|
||||
opts.getOption(VERSION).setArgs(1);
|
||||
|
||||
opts.addOption(COMPONENTS, true, "Works with -list to " +
|
||||
"filter instances based on input comma-separated list of " +
|
||||
"component names.");
|
||||
opts.getOption(COMPONENTS).setValueSeparator(',');
|
||||
opts.getOption(COMPONENTS).setArgs(Option.UNLIMITED_VALUES);
|
||||
|
||||
opts.addOption(STATES, true, "Works with -list to " +
|
||||
"filter instances based on input comma-separated list of " +
|
||||
"instance states.");
|
||||
opts.getOption(STATES).setValueSeparator(',');
|
||||
opts.getOption(STATES).setArgs(Option.UNLIMITED_VALUES);
|
||||
|
||||
opts.addOption(SIGNAL_CMD, true,
|
||||
"Signal the container. The available signal commands are " +
|
||||
java.util.Arrays.asList(SignalContainerCommand.values()) +
|
||||
|
@ -426,11 +457,40 @@ public class ApplicationCLI extends YarnCLI {
|
|||
}
|
||||
listApplicationAttempts(cliParser.getOptionValue(LIST_CMD));
|
||||
} else if (title.equalsIgnoreCase(CONTAINER)) {
|
||||
if (hasAnyOtherCLIOptions(cliParser, opts, LIST_CMD)) {
|
||||
if (hasAnyOtherCLIOptions(cliParser, opts, LIST_CMD, APP_TYPE_CMD,
|
||||
VERSION, COMPONENTS, STATES)) {
|
||||
printUsage(title, opts);
|
||||
return exitCode;
|
||||
}
|
||||
listContainers(cliParser.getOptionValue(LIST_CMD));
|
||||
String appAttemptIdOrName = cliParser.getOptionValue(LIST_CMD);
|
||||
try {
|
||||
// try parsing attempt id, if it succeeds, it means it's appId
|
||||
ApplicationAttemptId.fromString(appAttemptIdOrName);
|
||||
listContainers(appAttemptIdOrName);
|
||||
} catch (IllegalArgumentException e) {
|
||||
// not appAttemptId format, it could be appName. If app-type is not
|
||||
// provided, assume it is yarn-service type.
|
||||
AppAdminClient client = AppAdminClient
|
||||
.createAppAdminClient(getSingleAppTypeFromCLI(cliParser),
|
||||
getConf());
|
||||
String version = cliParser.getOptionValue(VERSION);
|
||||
String[] components = cliParser.getOptionValues(COMPONENTS);
|
||||
String[] instanceStates = cliParser.getOptionValues(STATES);
|
||||
try {
|
||||
sysout.println(client.getInstances(appAttemptIdOrName,
|
||||
components == null ? null : Arrays.asList(components),
|
||||
version, instanceStates == null ? null :
|
||||
Arrays.asList(instanceStates)));
|
||||
return 0;
|
||||
} catch (ApplicationNotFoundException exception) {
|
||||
System.err.println("Application with name '" + appAttemptIdOrName
|
||||
+ "' doesn't exist in RM or Timeline Server.");
|
||||
return -1;
|
||||
} catch (Exception ex) {
|
||||
System.err.println(ex.getMessage());
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
}
|
||||
} else if (cliParser.hasOption(KILL_CMD)) {
|
||||
if (hasAnyOtherCLIOptions(cliParser, opts, KILL_CMD)) {
|
||||
|
|
|
@ -2280,13 +2280,17 @@ public class TestYarnCLI {
|
|||
ByteArrayOutputStream baos = new ByteArrayOutputStream();
|
||||
PrintWriter pw = new PrintWriter(baos);
|
||||
pw.println("usage: container");
|
||||
pw.println(" -appTypes <Types> Works with -list to specify the app type when application name is provided.");
|
||||
pw.println(" -components <arg> Works with -list to filter instances based on input comma-separated list of component names.");
|
||||
pw.println(" -help Displays help for all commands.");
|
||||
pw.println(" -list <Application Attempt ID> List containers for application attempt.");
|
||||
pw.println(" -list <Application Name or Attempt ID> List containers for application attempt when application attempt ID is provided. When application name is provided, then it finds the instances of the application based on app's own implementation, and -appTypes option must be specified unless it is the default yarn-service type. With app name, it supports optional use of -version to filter instances based on app version, -components to filter instances based on component names, -states to filter instances based on instance state.");
|
||||
pw.println(" -signal <container ID [signal command]> Signal the container.");
|
||||
pw.println("The available signal commands are ");
|
||||
pw.println(java.util.Arrays.asList(SignalContainerCommand.values()));
|
||||
pw.println(" Default command is OUTPUT_THREAD_DUMP.");
|
||||
pw.println(" -states <arg> Works with -list to filter instances based on input comma-separated list of instance states.");
|
||||
pw.println(" -status <Container ID> Prints the status of the container.");
|
||||
pw.println(" -version <arg> Works with -list to filter instances based on input application version. ");
|
||||
pw.close();
|
||||
try {
|
||||
return normalize(baos.toString("UTF-8"));
|
||||
|
|
|
@ -282,4 +282,10 @@ public abstract class AppAdminClient extends CompositeService {
|
|||
public abstract int actionCleanUp(String appName, String userName) throws
|
||||
IOException, YarnException;
|
||||
|
||||
@Public
|
||||
@Unstable
|
||||
public abstract String getInstances(String appName,
|
||||
List<String> components, String version, List<String> containerStates)
|
||||
throws IOException, YarnException;
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue