YARN-8299. Yarn Service Upgrade: Add GET APIs that returns instances matching query params. (Chandni Singh via wangda)

Change-Id: I7f2aef89c6053d5dc5955f79fdfcd4cd74f74737
This commit is contained in:
Wangda Tan 2018-11-16 10:32:10 -08:00
parent 095635d984
commit 077a7ab37b
21 changed files with 772 additions and 202 deletions

View File

@ -29,6 +29,7 @@ import java.util.Map;
import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.UriBuilder;
import com.google.common.base.Preconditions;
import org.apache.commons.codec.binary.Base64;
@ -55,10 +56,8 @@ import org.apache.hadoop.yarn.service.api.records.Service;
import org.apache.hadoop.yarn.service.api.records.ServiceState;
import org.apache.hadoop.yarn.service.api.records.ServiceStatus;
import org.apache.hadoop.yarn.service.conf.RestApiConstants;
import org.apache.hadoop.yarn.service.utils.JsonSerDeser;
import org.apache.hadoop.yarn.service.utils.ServiceApiUtil;
import org.apache.hadoop.yarn.util.RMHAUtils;
import org.codehaus.jackson.map.PropertyNamingStrategy;
import org.eclipse.jetty.util.UrlEncoded;
import org.ietf.jgss.GSSContext;
import org.ietf.jgss.GSSException;
@ -216,11 +215,7 @@ public class ApiServiceClient extends AppAdminClient {
api.append("/");
api.append(appName);
}
Configuration conf = getConfig();
if (conf.get("hadoop.http.authentication.type").equalsIgnoreCase("simple")) {
api.append("?user.name=" + UrlEncoded
.encodeString(System.getProperty("user.name")));
}
appendUserNameIfRequired(api);
return api.toString();
}
@ -231,15 +226,27 @@ public class ApiServiceClient extends AppAdminClient {
api.append(url);
api.append("/app/v1/services/").append(appName).append("/")
.append(RestApiConstants.COMP_INSTANCES);
Configuration conf = getConfig();
if (conf.get("hadoop.http.authentication.type").equalsIgnoreCase(
"simple")) {
api.append("?user.name=" + UrlEncoded
.encodeString(System.getProperty("user.name")));
}
appendUserNameIfRequired(api);
return api.toString();
}
private String getInstancePath(String appName, List<String> components,
String version, List<String> containerStates) throws IOException {
UriBuilder builder = UriBuilder.fromUri(getInstancesPath(appName));
if (components != null && !components.isEmpty()) {
components.forEach(compName ->
builder.queryParam(RestApiConstants.PARAM_COMP_NAME, compName));
}
if (!Strings.isNullOrEmpty(version)){
builder.queryParam(RestApiConstants.PARAM_VERSION, version);
}
if (containerStates != null && !containerStates.isEmpty()){
containerStates.forEach(state ->
builder.queryParam(RestApiConstants.PARAM_CONTAINER_STATE, state));
}
return builder.build().toString();
}
private String getComponentsPath(String appName) throws IOException {
Preconditions.checkNotNull(appName);
String url = getRMWebAddress();
@ -247,13 +254,17 @@ public class ApiServiceClient extends AppAdminClient {
api.append(url);
api.append("/app/v1/services/").append(appName).append("/")
.append(RestApiConstants.COMPONENTS);
appendUserNameIfRequired(api);
return api.toString();
}
private void appendUserNameIfRequired(StringBuilder builder) {
Configuration conf = getConfig();
if (conf.get("hadoop.http.authentication.type").equalsIgnoreCase(
"simple")) {
api.append("?user.name=" + UrlEncoded
builder.append("?user.name=").append(UrlEncoded
.encodeString(System.getProperty("user.name")));
}
return api.toString();
}
private Builder getApiClient() throws IOException {
@ -627,7 +638,7 @@ public class ApiServiceClient extends AppAdminClient {
container.setState(ContainerState.UPGRADING);
toUpgrade[idx++] = container;
}
String buffer = CONTAINER_JSON_SERDE.toJson(toUpgrade);
String buffer = ServiceApiUtil.CONTAINER_JSON_SERDE.toJson(toUpgrade);
ClientResponse response = getApiClient(getInstancesPath(appName))
.put(ClientResponse.class, buffer);
result = processResponse(response);
@ -651,7 +662,7 @@ public class ApiServiceClient extends AppAdminClient {
component.setState(ComponentState.UPGRADING);
toUpgrade[idx++] = component;
}
String buffer = COMP_JSON_SERDE.toJson(toUpgrade);
String buffer = ServiceApiUtil.COMP_JSON_SERDE.toJson(toUpgrade);
ClientResponse response = getApiClient(getComponentsPath(appName))
.put(ClientResponse.class, buffer);
result = processResponse(response);
@ -673,11 +684,25 @@ public class ApiServiceClient extends AppAdminClient {
return result;
}
private static final JsonSerDeser<Container[]> CONTAINER_JSON_SERDE =
new JsonSerDeser<>(Container[].class,
PropertyNamingStrategy.CAMEL_CASE_TO_LOWER_CASE_WITH_UNDERSCORES);
private static final JsonSerDeser<Component[]> COMP_JSON_SERDE =
new JsonSerDeser<>(Component[].class,
PropertyNamingStrategy.CAMEL_CASE_TO_LOWER_CASE_WITH_UNDERSCORES);
@Override
public String getInstances(String appName, List<String> components,
String version, List<String> containerStates) throws IOException,
YarnException {
try {
String uri = getInstancePath(appName, components, version,
containerStates);
ClientResponse response = getApiClient(uri).get(ClientResponse.class);
if (response.getStatus() != 200) {
StringBuilder sb = new StringBuilder();
sb.append("Failed: HTTP error code: ");
sb.append(response.getStatus());
sb.append(" ErrorMsg: ").append(response.getEntity(String.class));
return sb.toString();
}
return response.getEntity(String.class);
} catch (Exception e) {
LOG.error("Fail to get containers {}", e);
}
return null;
}
}

View File

@ -44,14 +44,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.Consumes;
import javax.ws.rs.DELETE;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.PUT;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.*;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
@ -61,13 +54,7 @@ import java.io.FileNotFoundException;
import java.io.IOException;
import java.lang.reflect.UndeclaredThrowableException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.*;
import java.util.stream.Collectors;
import static org.apache.hadoop.yarn.service.api.records.ServiceState.ACCEPTED;
@ -582,6 +569,40 @@ public class ApiServer {
return Response.status(Status.NO_CONTENT).build();
}
@GET
@Path(COMP_INSTANCES_PATH)
@Produces({RestApiConstants.MEDIA_TYPE_JSON_UTF8})
public Response getComponentInstances(@Context HttpServletRequest request,
@PathParam(SERVICE_NAME) String serviceName,
@QueryParam(PARAM_COMP_NAME) List<String> componentNames,
@QueryParam(PARAM_VERSION) String version,
@QueryParam(PARAM_CONTAINER_STATE) List<String> containerStates) {
try {
UserGroupInformation ugi = getProxyUser(request);
LOG.info("GET: component instances for service = {}, compNames in {}, " +
"version = {}, containerStates in {}, user = {}", serviceName,
Objects.toString(componentNames, "[]"), Objects.toString(version, ""),
Objects.toString(containerStates, "[]"), ugi);
List<ContainerState> containerStatesDe = containerStates.stream().map(
ContainerState::valueOf).collect(Collectors.toList());
return Response.ok(getContainers(ugi, serviceName, componentNames,
version, containerStatesDe)).build();
} catch (IllegalArgumentException iae) {
return formatResponse(Status.BAD_REQUEST, "valid container states are: " +
Arrays.toString(ContainerState.values()));
} catch (AccessControlException e) {
return formatResponse(Response.Status.FORBIDDEN, e.getMessage());
} catch (IOException | InterruptedException e) {
return formatResponse(Response.Status.INTERNAL_SERVER_ERROR,
e.getMessage());
} catch (UndeclaredThrowableException e) {
return formatResponse(Response.Status.INTERNAL_SERVER_ERROR,
e.getCause().getMessage());
}
}
private Response flexService(Service service, UserGroupInformation ugi)
throws IOException, InterruptedException {
String appName = service.getName();
@ -752,6 +773,22 @@ public class ApiServer {
});
}
private Container[] getContainers(UserGroupInformation ugi,
String serviceName, List<String> componentNames, String version,
List<ContainerState> containerStates) throws IOException,
InterruptedException {
return ugi.doAs((PrivilegedExceptionAction<Container[]>) () -> {
Container[] result;
ServiceClient sc = getServiceClient();
sc.init(YARN_CONFIG);
sc.start();
result = sc.getContainers(serviceName, componentNames, version,
containerStates);
sc.close();
return result;
});
}
/**
* Used by negative test case.
*

View File

@ -23,6 +23,8 @@ import org.apache.hadoop.yarn.proto.ClientAMProtocol.CompInstancesUpgradeRespons
import org.apache.hadoop.yarn.proto.ClientAMProtocol.CompInstancesUpgradeRequestProto;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.FlexComponentsRequestProto;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.FlexComponentsResponseProto;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.GetCompInstancesRequestProto;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.GetCompInstancesResponseProto;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.GetStatusResponseProto;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.GetStatusRequestProto;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.RestartServiceRequestProto;
@ -55,4 +57,7 @@ public interface ClientAMProtocol {
CompInstancesUpgradeResponseProto upgrade(
CompInstancesUpgradeRequestProto request) throws IOException,
YarnException;
GetCompInstancesResponseProto getCompInstances(
GetCompInstancesRequestProto request) throws IOException, YarnException;
}

View File

@ -35,6 +35,8 @@ import org.apache.hadoop.yarn.proto.ClientAMProtocol.CompInstancesUpgradeRespons
import org.apache.hadoop.yarn.proto.ClientAMProtocol.ComponentCountProto;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.FlexComponentsRequestProto;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.FlexComponentsResponseProto;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.GetCompInstancesRequestProto;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.GetCompInstancesResponseProto;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.GetStatusRequestProto;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.GetStatusResponseProto;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.RestartServiceRequestProto;
@ -43,15 +45,18 @@ import org.apache.hadoop.yarn.proto.ClientAMProtocol.StopRequestProto;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.StopResponseProto;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.UpgradeServiceRequestProto;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.UpgradeServiceResponseProto;
import org.apache.hadoop.yarn.service.api.records.Container;
import org.apache.hadoop.yarn.service.component.ComponentEvent;
import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEvent;
import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType;
import org.apache.hadoop.yarn.service.utils.FilterUtils;
import org.apache.hadoop.yarn.service.utils.ServiceApiUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.List;
import static org.apache.hadoop.yarn.service.component.ComponentEventType.FLEX;
@ -194,4 +199,13 @@ public class ClientAMService extends AbstractService
}
return CompInstancesUpgradeResponseProto.newBuilder().build();
}
@Override
public GetCompInstancesResponseProto getCompInstances(
GetCompInstancesRequestProto request) throws IOException {
List<Container> containers = FilterUtils.filterInstances(context, request);
return GetCompInstancesResponseProto.newBuilder().setCompInstances(
ServiceApiUtil.CONTAINER_JSON_SERDE.toJson(containers.toArray(
new Container[containers.size()]))).build();
}
}

View File

@ -57,6 +57,8 @@ import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.CompInstancesUpgradeRequestProto;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.ComponentCountProto;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.FlexComponentsRequestProto;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.GetCompInstancesRequestProto;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.GetCompInstancesResponseProto;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.GetStatusRequestProto;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.GetStatusResponseProto;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.RestartServiceRequestProto;
@ -66,6 +68,7 @@ import org.apache.hadoop.yarn.proto.ClientAMProtocol.UpgradeServiceResponseProto
import org.apache.hadoop.yarn.service.ClientAMProtocol;
import org.apache.hadoop.yarn.service.ServiceMaster;
import org.apache.hadoop.yarn.service.api.records.Container;
import org.apache.hadoop.yarn.service.api.records.ContainerState;
import org.apache.hadoop.yarn.service.api.records.Component;
import org.apache.hadoop.yarn.service.api.records.Service;
import org.apache.hadoop.yarn.service.api.records.ServiceState;
@ -100,6 +103,7 @@ import java.nio.ByteBuffer;
import java.text.MessageFormat;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import static org.apache.hadoop.yarn.api.records.YarnApplicationState.*;
import static org.apache.hadoop.yarn.service.conf.YarnServiceConf.*;
@ -318,6 +322,49 @@ public class ServiceClient extends AppAdminClient implements SliderExitCodes,
}
}
@Override
public String getInstances(String appName,
List<String> components, String version, List<String> containerStates)
throws IOException, YarnException {
GetCompInstancesResponseProto result = filterContainers(appName, components,
version, containerStates);
return result.getCompInstances();
}
public Container[] getContainers(String appName, List<String> components,
String version, List<ContainerState> containerStates)
throws IOException, YarnException {
GetCompInstancesResponseProto result = filterContainers(appName, components,
version, containerStates != null ? containerStates.stream()
.map(Enum::toString).collect(Collectors.toList()) : null);
return ServiceApiUtil.CONTAINER_JSON_SERDE.fromJson(
result.getCompInstances());
}
private GetCompInstancesResponseProto filterContainers(String appName,
List<String> components, String version,
List<String> containerStates) throws IOException, YarnException {
ApplicationReport appReport = yarnClient.getApplicationReport(getAppId(
appName));
if (StringUtils.isEmpty(appReport.getHost())) {
throw new YarnException(appName + " AM hostname is empty.");
}
ClientAMProtocol proxy = createAMProxy(appName, appReport);
GetCompInstancesRequestProto.Builder req = GetCompInstancesRequestProto
.newBuilder();
if (components != null && !components.isEmpty()) {
req.addAllComponentNames(components);
}
if (version != null) {
req.setVersion(version);
}
if (containerStates != null && !containerStates.isEmpty()){
req.addAllContainerStates(containerStates);
}
return proxy.getCompInstances(req.build());
}
public int actionUpgrade(Service service, List<Container> compInstances)
throws IOException, YarnException {
ApplicationReport appReport =

View File

@ -99,6 +99,7 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
private long containerStartedTime = 0;
// This container object is used for rest API query
private org.apache.hadoop.yarn.service.api.records.Container containerSpec;
private String serviceVersion;
private static final StateMachineFactory<ComponentInstance,
@ -196,6 +197,8 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
compInstance.getCompSpec().addContainer(container);
compInstance.containerStartedTime = containerStartTime;
compInstance.component.incRunningContainers();
compInstance.serviceVersion = compInstance.scheduler.getApp()
.getVersion();
if (compInstance.timelineServiceEnabled) {
compInstance.serviceTimelinePublisher
@ -212,6 +215,8 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
if (compInstance.getState().equals(ComponentInstanceState.UPGRADING)) {
compInstance.component.incContainersReady(false);
compInstance.component.decContainersThatNeedUpgrade();
compInstance.serviceVersion = compInstance.component.getUpgradeEvent()
.getUpgradeVersion();
ComponentEvent checkState = new ComponentEvent(
compInstance.component.getName(), ComponentEventType.CHECK_STABLE);
compInstance.scheduler.getDispatcher().getEventHandler().handle(
@ -397,6 +402,30 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
}
}
/**
* Returns the version of service at which the instance is at.
*/
public String getServiceVersion() {
this.readLock.lock();
try {
return this.serviceVersion;
} finally {
this.readLock.unlock();
}
}
/**
* Returns the state of the container in the container spec.
*/
public ContainerState getContainerState() {
this.readLock.lock();
try {
return this.containerSpec.getState();
} finally {
this.readLock.unlock();
}
}
@Override
public void handle(ComponentInstanceEvent event) {
try {
@ -682,8 +711,16 @@ public class ComponentInstance implements EventHandler<ComponentInstanceEvent>,
return result;
}
@VisibleForTesting public org.apache.hadoop.yarn.service.api.records
/**
* Returns container spec.
*/
public org.apache.hadoop.yarn.service.api.records
.Container getContainerSpec() {
readLock.lock();
try {
return containerSpec;
} finally {
readLock.unlock();
}
}
}

View File

@ -37,11 +37,14 @@ public interface RestApiConstants {
String COMPONENTS = "components";
String COMPONENTS_PATH = SERVICE_PATH + "/" + COMPONENTS;
// Query param
String SERVICE_NAME = "service_name";
String COMPONENT_NAME = "component_name";
String COMP_INSTANCE_NAME = "component_instance_name";
String PARAM_COMP_NAME = "componentName";
String PARAM_VERSION = "version";
String PARAM_CONTAINER_STATE = "containerState";
String MEDIA_TYPE_JSON_UTF8 = MediaType.APPLICATION_JSON + ";charset=utf-8";
Long DEFAULT_UNLIMITED_LIFETIME = -1l;

View File

@ -34,6 +34,8 @@ import org.apache.hadoop.yarn.proto.ClientAMProtocol.CompInstancesUpgradeRespons
import org.apache.hadoop.yarn.proto.ClientAMProtocol.CompInstancesUpgradeRequestProto;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.FlexComponentsRequestProto;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.FlexComponentsResponseProto;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.GetCompInstancesRequestProto;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.GetCompInstancesResponseProto;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.GetStatusRequestProto;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.GetStatusResponseProto;
import org.apache.hadoop.yarn.service.impl.pb.service.ClientAMProtocolPB;
@ -128,4 +130,15 @@ public class ClientAMProtocolPBClientImpl
}
return null;
}
@Override
public GetCompInstancesResponseProto getCompInstances(
GetCompInstancesRequestProto request) throws IOException, YarnException {
try {
return proxy.getCompInstances(null, request);
} catch (ServiceException e) {
RPCUtil.unwrapAndThrowException(e);
}
return null;
}
}

View File

@ -25,6 +25,8 @@ import org.apache.hadoop.yarn.proto.ClientAMProtocol.CompInstancesUpgradeRequest
import org.apache.hadoop.yarn.proto.ClientAMProtocol.CompInstancesUpgradeResponseProto;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.FlexComponentsRequestProto;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.FlexComponentsResponseProto;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.GetCompInstancesRequestProto;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.GetCompInstancesResponseProto;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.GetStatusRequestProto;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.GetStatusResponseProto;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.RestartServiceRequestProto;
@ -103,4 +105,15 @@ public class ClientAMProtocolPBServiceImpl implements ClientAMProtocolPB {
throw new ServiceException(e);
}
}
@Override
public GetCompInstancesResponseProto getCompInstances(
RpcController controller, GetCompInstancesRequestProto request)
throws ServiceException {
try {
return real.getCompInstances(request);
} catch (IOException | YarnException e) {
throw new ServiceException(e);
}
}
}

View File

@ -0,0 +1,81 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.service.utils;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.proto.ClientAMProtocol;
import org.apache.hadoop.yarn.service.ServiceContext;
import org.apache.hadoop.yarn.service.api.records.Container;
import org.apache.hadoop.yarn.service.component.instance.ComponentInstance;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
public class FilterUtils {
/**
* Returns containers filtered by requested fields.
*
* @param context service context
* @param filterReq filter request
*/
public static List<Container> filterInstances(ServiceContext context,
ClientAMProtocol.GetCompInstancesRequestProto filterReq) {
List<Container> results = new ArrayList<>();
Map<ContainerId, ComponentInstance> instances =
context.scheduler.getLiveInstances();
instances.forEach(((containerId, instance) -> {
boolean include = true;
if (filterReq.getComponentNamesList() != null &&
!filterReq.getComponentNamesList().isEmpty()) {
// filter by component name
if (!filterReq.getComponentNamesList().contains(
instance.getComponent().getName())) {
include = false;
}
}
if (filterReq.getVersion() != null && !filterReq.getVersion().isEmpty()) {
// filter by version
String instanceServiceVersion = instance.getServiceVersion();
if (instanceServiceVersion == null || !instanceServiceVersion.equals(
filterReq.getVersion())) {
include = false;
}
}
if (filterReq.getContainerStatesList() != null &&
!filterReq.getContainerStatesList().isEmpty()) {
// filter by state
if (!filterReq.getContainerStatesList().contains(
instance.getContainerState().toString())) {
include = false;
}
}
if (include) {
results.add(instance.getContainerSpec());
}
}));
return results;
}
}

View File

@ -70,6 +70,15 @@ public class ServiceApiUtil {
public static JsonSerDeser<Service> jsonSerDeser =
new JsonSerDeser<>(Service.class,
PropertyNamingStrategy.CAMEL_CASE_TO_LOWER_CASE_WITH_UNDERSCORES);
public static final JsonSerDeser<Container[]> CONTAINER_JSON_SERDE =
new JsonSerDeser<>(Container[].class,
PropertyNamingStrategy.CAMEL_CASE_TO_LOWER_CASE_WITH_UNDERSCORES);
public static final JsonSerDeser<Component[]> COMP_JSON_SERDE =
new JsonSerDeser<>(Component[].class,
PropertyNamingStrategy.CAMEL_CASE_TO_LOWER_CASE_WITH_UNDERSCORES);
private static final PatternValidator namePattern
= new PatternValidator("[a-z][a-z0-9-]*");

View File

@ -32,6 +32,8 @@ service ClientAMProtocolService {
returns (RestartServiceResponseProto);
rpc upgrade(CompInstancesUpgradeRequestProto) returns
(CompInstancesUpgradeResponseProto);
rpc getCompInstances(GetCompInstancesRequestProto) returns
(GetCompInstancesResponseProto);
}
message FlexComponentsRequestProto {
@ -82,3 +84,13 @@ message CompInstancesUpgradeRequestProto {
message CompInstancesUpgradeResponseProto {
}
message GetCompInstancesRequestProto {
repeated string componentNames = 1;
optional string version = 2;
repeated string containerStates = 3;
}
message GetCompInstancesResponseProto {
optional string compInstances = 1;
}

View File

@ -0,0 +1,154 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.service;
import org.apache.hadoop.registry.client.api.RegistryOperations;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.client.api.NMClient;
import org.apache.hadoop.yarn.client.api.async.NMClientAsync;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.service.api.records.Service;
import org.apache.hadoop.yarn.service.component.Component;
import org.apache.hadoop.yarn.service.component.ComponentEvent;
import org.apache.hadoop.yarn.service.component.ComponentEventType;
import org.apache.hadoop.yarn.service.component.instance.ComponentInstance;
import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEvent;
import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType;
import org.apache.hadoop.yarn.service.containerlaunch.ContainerLaunchService;
import org.apache.hadoop.yarn.service.registry.YarnRegistryViewForProviders;
import org.mockito.stubbing.Answer;
import java.io.IOException;
import java.util.Map;
import static org.mockito.Matchers.anyObject;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
/**
* Mocked service context for a running service.
*/
public class MockRunningServiceContext extends ServiceContext {
public MockRunningServiceContext(ServiceTestUtils.ServiceFSWatcher fsWatcher,
Service serviceDef) throws Exception {
super();
this.service = serviceDef;
this.fs = fsWatcher.getFs();
ContainerLaunchService mockLaunchService = mock(
ContainerLaunchService.class);
this.scheduler = new ServiceScheduler(this) {
@Override
protected YarnRegistryViewForProviders
createYarnRegistryOperations(
ServiceContext context, RegistryOperations registryClient) {
return mock(YarnRegistryViewForProviders.class);
}
@Override
public NMClientAsync createNMClient() {
NMClientAsync nmClientAsync = super.createNMClient();
NMClient nmClient = mock(NMClient.class);
try {
when(nmClient.getContainerStatus(anyObject(), anyObject()))
.thenAnswer(
(Answer<ContainerStatus>) invocation -> ContainerStatus
.newInstance((ContainerId) invocation.getArguments()[0],
org.apache.hadoop.yarn.api.records.ContainerState
.RUNNING,
"", 0));
} catch (YarnException | IOException e) {
throw new RuntimeException(e);
}
nmClientAsync.setClient(nmClient);
return nmClientAsync;
}
@Override
public ContainerLaunchService getContainerLaunchService() {
return mockLaunchService;
}
};
this.scheduler.init(fsWatcher.getConf());
ServiceTestUtils.createServiceManager(this);
doNothing().when(mockLaunchService).
reInitCompInstance(anyObject(), anyObject(), anyObject(), anyObject());
stabilizeComponents(this);
}
private void stabilizeComponents(ServiceContext context) {
ApplicationId appId = ApplicationId.fromString(context.service.getId());
ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(appId, 1);
context.attemptId = attemptId;
Map<String, Component>
componentState = context.scheduler.getAllComponents();
int counter = 0;
for (org.apache.hadoop.yarn.service.api.records.Component componentSpec :
context.service.getComponents()) {
Component component = new org.apache.hadoop.yarn.service.component.
Component(componentSpec, 1L, context);
componentState.put(component.getName(), component);
component.handle(new ComponentEvent(component.getName(),
ComponentEventType.FLEX));
for (int i = 0; i < componentSpec.getNumberOfContainers(); i++) {
counter++;
assignNewContainer(attemptId, counter, component);
}
component.handle(new ComponentEvent(component.getName(),
ComponentEventType.CHECK_STABLE));
}
}
public void assignNewContainer(ApplicationAttemptId attemptId,
long containerNum, Component component) {
Container container = org.apache.hadoop.yarn.api.records.Container
.newInstance(ContainerId.newContainerId(attemptId, containerNum),
NODE_ID, "localhost", null, null,
null);
component.handle(new ComponentEvent(component.getName(),
ComponentEventType.CONTAINER_ALLOCATED)
.setContainer(container).setContainerId(container.getId()));
ComponentInstance instance = this.scheduler.getLiveInstances().get(
container.getId());
ComponentInstanceEvent startEvent = new ComponentInstanceEvent(
container.getId(), ComponentInstanceEventType.START);
instance.handle(startEvent);
ComponentInstanceEvent readyEvent = new ComponentInstanceEvent(
container.getId(), ComponentInstanceEventType.BECOME_READY);
instance.handle(readyEvent);
}
private static final NodeId NODE_ID = NodeId.fromString("localhost:0");
}

View File

@ -166,7 +166,7 @@ public class TestServiceCLI {
checkApp(serviceName, "master", 1L, 1000L, "qname");
}
@Test (timeout = 180000)
@Test
public void testInitiateServiceUpgrade() throws Exception {
String[] args = {"app", "-upgrade", "app-1",
"-initiate", ExampleAppJson.resourceName(ExampleAppJson.APP_JSON),
@ -185,7 +185,7 @@ public class TestServiceCLI {
Assert.assertEquals(result, 0);
}
@Test (timeout = 180000)
@Test
public void testUpgradeInstances() throws Exception {
conf.set(YARN_APP_ADMIN_CLIENT_PREFIX + DUMMY_APP_TYPE,
DummyServiceClient.class.getName());
@ -197,7 +197,7 @@ public class TestServiceCLI {
Assert.assertEquals(result, 0);
}
@Test (timeout = 180000)
@Test
public void testUpgradeComponents() throws Exception {
conf.set(YARN_APP_ADMIN_CLIENT_PREFIX + DUMMY_APP_TYPE,
DummyServiceClient.class.getName());
@ -209,6 +209,18 @@ public class TestServiceCLI {
Assert.assertEquals(result, 0);
}
@Test
public void testGetInstances() throws Exception {
conf.set(YARN_APP_ADMIN_CLIENT_PREFIX + DUMMY_APP_TYPE,
DummyServiceClient.class.getName());
cli.setConf(conf);
String[] args = {"container", "-list", "app-1",
"-components", "comp1,comp2",
"-appTypes", DUMMY_APP_TYPE};
int result = cli.run(ApplicationCLI.preProcessArgs(args));
Assert.assertEquals(result, 0);
}
@Test (timeout = 180000)
public void testEnableFastLaunch() throws Exception {
fs.getFileSystem().create(new Path(basedir.getAbsolutePath(), "test.jar"))
@ -313,5 +325,12 @@ public class TestServiceCLI {
throws IOException, YarnException {
return 0;
}
@Override
public String getInstances(String appName, List<String> components,
String version, List<String> containerStates)
throws IOException, YarnException {
return "";
}
}
}

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.service.client;
import com.google.common.collect.Lists;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@ -32,8 +33,12 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.service.ClientAMProtocol;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.CompInstancesUpgradeRequestProto;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.CompInstancesUpgradeResponseProto;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.GetCompInstancesRequestProto;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.GetCompInstancesResponseProto;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.UpgradeServiceRequestProto;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.UpgradeServiceResponseProto;
import org.apache.hadoop.yarn.service.MockRunningServiceContext;
import org.apache.hadoop.yarn.service.ServiceContext;
import org.apache.hadoop.yarn.service.ServiceTestUtils;
import org.apache.hadoop.yarn.service.api.records.Component;
import org.apache.hadoop.yarn.service.api.records.Container;
@ -41,6 +46,7 @@ import org.apache.hadoop.yarn.service.api.records.Service;
import org.apache.hadoop.yarn.service.api.records.ServiceState;
import org.apache.hadoop.yarn.service.conf.YarnServiceConf;
import org.apache.hadoop.yarn.service.exceptions.ErrorStrings;
import org.apache.hadoop.yarn.service.utils.FilterUtils;
import org.apache.hadoop.yarn.service.utils.ServiceApiUtil;
import org.junit.Assert;
import org.junit.Rule;
@ -52,6 +58,7 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@ -122,6 +129,26 @@ public class TestServiceClient {
client.stop();
}
@Test
public void testGetCompInstances() throws Exception {
Service service = createService();
MockServiceClient client = MockServiceClient.create(rule, service, true);
//upgrade the service
service.setVersion("v2");
client.initiateUpgrade(service);
//add containers to the component that needs to be upgraded.
Component comp = service.getComponents().iterator().next();
ContainerId containerId = ContainerId.newContainerId(client.attemptId, 1L);
comp.addContainer(new Container().id(containerId.toString()));
Container[] containers = client.getContainers(service.getName(),
Lists.newArrayList("compa"), "v1", null);
Assert.assertEquals("num containers", 2, containers.length);
client.stop();
}
private Service createService() throws IOException,
YarnException {
Service service = ServiceTestUtils.createExampleApplication();
@ -137,6 +164,7 @@ public class TestServiceClient {
private final ClientAMProtocol amProxy;
private Object proxyResponse;
private Service service;
private ServiceContext context;
private MockServiceClient() {
amProxy = mock(ClientAMProtocol.class);
@ -147,8 +175,12 @@ public class TestServiceClient {
static MockServiceClient create(ServiceTestUtils.ServiceFSWatcher rule,
Service service, boolean enableUpgrade)
throws IOException, YarnException {
throws Exception {
MockServiceClient client = new MockServiceClient();
ApplicationId applicationId = ApplicationId.newInstance(
System.currentTimeMillis(), 1);
service.setId(applicationId.toString());
client.context = new MockRunningServiceContext(rule, service);
YarnClient yarnClient = createMockYarnClient();
ApplicationReport appReport = mock(ApplicationReport.class);
@ -179,6 +211,24 @@ public class TestServiceClient {
client.proxyResponse = response;
return response;
});
when(client.amProxy.getCompInstances(Matchers.any(
GetCompInstancesRequestProto.class))).thenAnswer(
(Answer<GetCompInstancesResponseProto>) invocation -> {
GetCompInstancesRequestProto req = (GetCompInstancesRequestProto)
invocation.getArguments()[0];
List<Container> containers = FilterUtils.filterInstances(
client.context, req);
GetCompInstancesResponseProto response =
GetCompInstancesResponseProto.newBuilder().setCompInstances(
ServiceApiUtil.CONTAINER_JSON_SERDE.toJson(
containers.toArray(new Container[containers.size()])))
.build();
client.proxyResponse = response;
return response;
});
client.setFileSystem(rule.getFs());
client.setYarnClient(yarnClient);
client.service = service;

View File

@ -18,19 +18,10 @@
package org.apache.hadoop.yarn.service.component;
import org.apache.hadoop.registry.client.api.RegistryOperations;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.client.api.NMClient;
import org.apache.hadoop.yarn.client.api.async.NMClientAsync;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.service.ServiceContext;
import org.apache.hadoop.yarn.service.ServiceScheduler;
import org.apache.hadoop.yarn.service.ServiceTestUtils;
import org.apache.hadoop.yarn.service.TestServiceManager;
import org.apache.hadoop.yarn.service.api.records.ComponentState;
@ -38,23 +29,15 @@ import org.apache.hadoop.yarn.service.api.records.Service;
import org.apache.hadoop.yarn.service.component.instance.ComponentInstance;
import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEvent;
import org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType;
import org.apache.hadoop.yarn.service.containerlaunch.ContainerLaunchService;
import org.apache.hadoop.yarn.service.registry.YarnRegistryViewForProviders;
import org.apache.hadoop.yarn.service.MockRunningServiceContext;
import org.apache.log4j.Logger;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.mockito.stubbing.Answer;
import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
import static org.apache.hadoop.yarn.service.component.instance.ComponentInstanceEventType.STOP;
import static org.mockito.Matchers.anyObject;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@ -63,7 +46,6 @@ import static org.mockito.Mockito.when;
*/
public class TestComponent {
private static final int WAIT_MS_PER_LOOP = 1000;
static final Logger LOG = Logger.getLogger(TestComponent.class);
@Rule
@ -115,7 +97,7 @@ public class TestComponent {
@Test
public void testContainerCompletedWhenUpgrading() throws Exception {
String serviceName = "testContainerComplete";
ServiceContext context = createTestContext(rule, serviceName);
MockRunningServiceContext context = createTestContext(rule, serviceName);
Component comp = context.scheduler.getAllComponents().entrySet().iterator()
.next().getValue();
@ -148,7 +130,7 @@ public class TestComponent {
ComponentState.FLEXING, comp.getComponentSpec().getState());
// new container get allocated
assignNewContainer(context.attemptId, 10, context, comp);
context.assignNewContainer(context.attemptId, 10, comp);
// second instance finished upgrading
ComponentInstance instance2 = instanceIter.next();
@ -174,7 +156,7 @@ public class TestComponent {
serviceName);
TestServiceManager.createDef(serviceName, testService);
ServiceContext context = createTestContext(rule, testService);
ServiceContext context = new MockRunningServiceContext(rule, testService);
for (Component comp : context.scheduler.getAllComponents().values()) {
@ -226,114 +208,11 @@ public class TestComponent {
return spec;
}
public static ServiceContext createTestContext(
public static MockRunningServiceContext createTestContext(
ServiceTestUtils.ServiceFSWatcher fsWatcher, String serviceName)
throws Exception {
return createTestContext(fsWatcher,
return new MockRunningServiceContext(fsWatcher,
TestServiceManager.createBaseDef(serviceName));
}
public static ServiceContext createTestContext(
ServiceTestUtils.ServiceFSWatcher fsWatcher, Service serviceDef)
throws Exception {
ServiceContext context = new ServiceContext();
context.service = serviceDef;
context.fs = fsWatcher.getFs();
ContainerLaunchService mockLaunchService = mock(
ContainerLaunchService.class);
context.scheduler = new ServiceScheduler(context) {
@Override protected YarnRegistryViewForProviders
createYarnRegistryOperations(
ServiceContext context, RegistryOperations registryClient) {
return mock(YarnRegistryViewForProviders.class);
}
@Override public NMClientAsync createNMClient() {
NMClientAsync nmClientAsync = super.createNMClient();
NMClient nmClient = mock(NMClient.class);
try {
when(nmClient.getContainerStatus(anyObject(), anyObject()))
.thenAnswer(
(Answer<ContainerStatus>) invocation -> ContainerStatus
.newInstance((ContainerId) invocation.getArguments()[0],
org.apache.hadoop.yarn.api.records.ContainerState
.RUNNING,
"", 0));
} catch (YarnException | IOException e) {
throw new RuntimeException(e);
}
nmClientAsync.setClient(nmClient);
return nmClientAsync;
}
@Override public ContainerLaunchService getContainerLaunchService() {
return mockLaunchService;
}
};
context.scheduler.init(fsWatcher.getConf());
ServiceTestUtils.createServiceManager(context);
doNothing().when(mockLaunchService).
reInitCompInstance(anyObject(), anyObject(), anyObject(), anyObject());
stabilizeComponents(context);
return context;
}
private static void stabilizeComponents(ServiceContext context) {
ApplicationId appId = ApplicationId.fromString(context.service.getId());
ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance(appId, 1);
context.attemptId = attemptId;
Map<String, Component>
componentState = context.scheduler.getAllComponents();
int counter = 0;
for (org.apache.hadoop.yarn.service.api.records.Component componentSpec :
context.service.getComponents()) {
Component component = new org.apache.hadoop.yarn.service.component.
Component(componentSpec, 1L, context);
componentState.put(component.getName(), component);
component.handle(new ComponentEvent(component.getName(),
ComponentEventType.FLEX));
for (int i = 0; i < componentSpec.getNumberOfContainers(); i++) {
counter++;
assignNewContainer(attemptId, counter, context, component);
}
component.handle(new ComponentEvent(component.getName(),
ComponentEventType.CHECK_STABLE));
}
}
private static void assignNewContainer(
ApplicationAttemptId attemptId, long containerNum,
ServiceContext context, Component component) {
Container container = org.apache.hadoop.yarn.api.records.Container
.newInstance(ContainerId.newContainerId(attemptId, containerNum),
NODE_ID, "localhost", null, null,
null);
component.handle(new ComponentEvent(component.getName(),
ComponentEventType.CONTAINER_ALLOCATED)
.setContainer(container).setContainerId(container.getId()));
ComponentInstance instance = context.scheduler.getLiveInstances().get(
container.getId());
ComponentInstanceEvent startEvent = new ComponentInstanceEvent(
container.getId(), ComponentInstanceEventType.START);
instance.handle(startEvent);
ComponentInstanceEvent readyEvent = new ComponentInstanceEvent(
container.getId(), ComponentInstanceEventType.BECOME_READY);
instance.handle(readyEvent);
}
private static final NodeId NODE_ID = NodeId.fromString("localhost:0");
}

View File

@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
*
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@ -60,19 +60,20 @@ import static org.mockito.Mockito.when;
*/
public class TestComponentInstance {
@Rule public ServiceTestUtils.ServiceFSWatcher rule =
@Rule
public ServiceTestUtils.ServiceFSWatcher rule =
new ServiceTestUtils.ServiceFSWatcher();
@Test public void testContainerUpgrade() throws Exception {
@Test
public void testContainerUpgrade() throws Exception {
ServiceContext context = TestComponent.createTestContext(rule,
"testContainerUpgrade");
Component component =
context.scheduler.getAllComponents().entrySet().iterator().next()
.getValue();
Component component = context.scheduler.getAllComponents().entrySet()
.iterator().next().getValue();
upgradeComponent(component);
ComponentInstance instance =
component.getAllComponentInstances().iterator().next();
ComponentInstance instance = component.getAllComponentInstances().iterator()
.next();
ComponentInstanceEvent instanceEvent = new ComponentInstanceEvent(
instance.getContainer().getId(), ComponentInstanceEventType.UPGRADE);
instance.handle(instanceEvent);
@ -82,16 +83,16 @@ public class TestComponentInstance {
containerSpec.getState());
}
@Test public void testContainerReadyAfterUpgrade() throws Exception {
@Test
public void testContainerReadyAfterUpgrade() throws Exception {
ServiceContext context = TestComponent.createTestContext(rule,
"testContainerStarted");
Component component =
context.scheduler.getAllComponents().entrySet().iterator().next()
.getValue();
Component component = context.scheduler.getAllComponents().entrySet()
.iterator().next().getValue();
upgradeComponent(component);
ComponentInstance instance =
component.getAllComponentInstances().iterator().next();
ComponentInstance instance = component.getAllComponentInstances().iterator()
.next();
ComponentInstanceEvent instanceEvent = new ComponentInstanceEvent(
instance.getContainer().getId(), ComponentInstanceEventType.UPGRADE);
@ -100,9 +101,8 @@ public class TestComponentInstance {
instance.handle(new ComponentInstanceEvent(instance.getContainer().getId(),
ComponentInstanceEventType.BECOME_READY));
Assert.assertEquals("instance not ready", ContainerState.READY,
instance.getCompSpec()
.getContainer(instance.getContainer().getId().toString())
.getState());
instance.getCompSpec().getContainer(
instance.getContainer().getId().toString()).getState());
}
private void upgradeComponent(Component component) {
@ -113,9 +113,8 @@ public class TestComponentInstance {
private Component createComponent(ServiceScheduler scheduler,
org.apache.hadoop.yarn.service.api.records.Component.RestartPolicyEnum
restartPolicy,
int nSucceededInstances, int nFailedInstances, int totalAsk,
int componentId) {
restartPolicy, int nSucceededInstances, int nFailedInstances,
int totalAsk, int componentId) {
assert (nSucceededInstances + nFailedInstances) <= totalAsk;
@ -214,7 +213,8 @@ public class TestComponentInstance {
return componentInstance;
}
@Test public void testComponentRestartPolicy() {
@Test
public void testComponentRestartPolicy() {
Map<String, Component> allComponents = new HashMap<>();
Service mockService = mock(Service.class);

View File

@ -0,0 +1,102 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.service.utils;
import com.google.common.collect.Lists;
import org.apache.hadoop.yarn.proto.ClientAMProtocol.GetCompInstancesRequestProto;
import org.apache.hadoop.yarn.service.ServiceContext;
import org.apache.hadoop.yarn.service.ServiceTestUtils;
import org.apache.hadoop.yarn.service.TestServiceManager;
import org.apache.hadoop.yarn.service.api.records.Container;
import org.apache.hadoop.yarn.service.MockRunningServiceContext;
import org.apache.hadoop.yarn.service.api.records.ContainerState;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import java.util.List;
public class TestFilterUtils {
@Rule
public ServiceTestUtils.ServiceFSWatcher rule =
new ServiceTestUtils.ServiceFSWatcher();
@Test
public void testNoFilter() throws Exception {
GetCompInstancesRequestProto req = GetCompInstancesRequestProto.newBuilder()
.build();
List<Container> containers = FilterUtils.filterInstances(
new MockRunningServiceContext(rule,
TestServiceManager.createBaseDef("service")), req);
Assert.assertEquals("num containers", 4, containers.size());
}
@Test
public void testFilterWithComp() throws Exception {
GetCompInstancesRequestProto req = GetCompInstancesRequestProto.newBuilder()
.addAllComponentNames(Lists.newArrayList("compa")).build();
List<Container> containers = FilterUtils.filterInstances(
new MockRunningServiceContext(rule,
TestServiceManager.createBaseDef("service")), req);
Assert.assertEquals("num containers", 2, containers.size());
}
@Test
public void testFilterWithVersion() throws Exception {
ServiceContext sc = new MockRunningServiceContext(rule,
TestServiceManager.createBaseDef("service"));
GetCompInstancesRequestProto.Builder reqBuilder =
GetCompInstancesRequestProto.newBuilder();
reqBuilder.setVersion("v2");
Assert.assertEquals("num containers", 0,
FilterUtils.filterInstances(sc, reqBuilder.build()).size());
reqBuilder.addAllComponentNames(Lists.newArrayList("compa"))
.setVersion("v1").build();
Assert.assertEquals("num containers", 2,
FilterUtils.filterInstances(sc, reqBuilder.build()).size());
reqBuilder.setVersion("v2").build();
Assert.assertEquals("num containers", 0,
FilterUtils.filterInstances(sc, reqBuilder.build()).size());
}
@Test
public void testFilterWithState() throws Exception {
ServiceContext sc = new MockRunningServiceContext(rule,
TestServiceManager.createBaseDef("service"));
GetCompInstancesRequestProto.Builder reqBuilder =
GetCompInstancesRequestProto.newBuilder();
reqBuilder.addAllContainerStates(Lists.newArrayList(
ContainerState.READY.toString()));
Assert.assertEquals("num containers", 4,
FilterUtils.filterInstances(sc, reqBuilder.build()).size());
reqBuilder.clearContainerStates();
reqBuilder.addAllContainerStates(Lists.newArrayList(
ContainerState.STOPPED.toString()));
Assert.assertEquals("num containers", 0,
FilterUtils.filterInstances(sc, reqBuilder.build()).size());
}
}

View File

@ -105,6 +105,8 @@ public class ApplicationCLI extends YarnCLI {
public static final String UPGRADE_FINALIZE = "finalize";
public static final String COMPONENT_INSTS = "instances";
public static final String COMPONENTS = "components";
public static final String VERSION = "version";
public static final String STATES = "states";
private static String firstArg = null;
@ -294,10 +296,39 @@ public class ApplicationCLI extends YarnCLI {
opts.addOption(STATUS_CMD, true,
"Prints the status of the container.");
opts.addOption(LIST_CMD, true,
"List containers for application attempt.");
"List containers for application attempt when application " +
"attempt ID is provided. When application name is provided, " +
"then it finds the instances of the application based on app's " +
"own implementation, and -appTypes option must be specified " +
"unless it is the default yarn-service type. With app name, it " +
"supports optional use of -version to filter instances based on " +
"app version, -components to filter instances based on component " +
"names, -states to filter instances based on instance state.");
opts.addOption(HELP_CMD, false, "Displays help for all commands.");
opts.getOption(STATUS_CMD).setArgName("Container ID");
opts.getOption(LIST_CMD).setArgName("Application Attempt ID");
opts.getOption(LIST_CMD).setArgName("Application Name or Attempt ID");
opts.addOption(APP_TYPE_CMD, true, "Works with -list to " +
"specify the app type when application name is provided.");
opts.getOption(APP_TYPE_CMD).setValueSeparator(',');
opts.getOption(APP_TYPE_CMD).setArgs(Option.UNLIMITED_VALUES);
opts.getOption(APP_TYPE_CMD).setArgName("Types");
opts.addOption(VERSION, true, "Works with -list "
+ "to filter instances based on input application version.");
opts.getOption(VERSION).setArgs(1);
opts.addOption(COMPONENTS, true, "Works with -list to " +
"filter instances based on input comma-separated list of " +
"component names.");
opts.getOption(COMPONENTS).setValueSeparator(',');
opts.getOption(COMPONENTS).setArgs(Option.UNLIMITED_VALUES);
opts.addOption(STATES, true, "Works with -list to " +
"filter instances based on input comma-separated list of " +
"instance states.");
opts.getOption(STATES).setValueSeparator(',');
opts.getOption(STATES).setArgs(Option.UNLIMITED_VALUES);
opts.addOption(SIGNAL_CMD, true,
"Signal the container. The available signal commands are " +
java.util.Arrays.asList(SignalContainerCommand.values()) +
@ -426,11 +457,40 @@ public class ApplicationCLI extends YarnCLI {
}
listApplicationAttempts(cliParser.getOptionValue(LIST_CMD));
} else if (title.equalsIgnoreCase(CONTAINER)) {
if (hasAnyOtherCLIOptions(cliParser, opts, LIST_CMD)) {
if (hasAnyOtherCLIOptions(cliParser, opts, LIST_CMD, APP_TYPE_CMD,
VERSION, COMPONENTS, STATES)) {
printUsage(title, opts);
return exitCode;
}
listContainers(cliParser.getOptionValue(LIST_CMD));
String appAttemptIdOrName = cliParser.getOptionValue(LIST_CMD);
try {
// try parsing attempt id, if it succeeds, it means it's appId
ApplicationAttemptId.fromString(appAttemptIdOrName);
listContainers(appAttemptIdOrName);
} catch (IllegalArgumentException e) {
// not appAttemptId format, it could be appName. If app-type is not
// provided, assume it is yarn-service type.
AppAdminClient client = AppAdminClient
.createAppAdminClient(getSingleAppTypeFromCLI(cliParser),
getConf());
String version = cliParser.getOptionValue(VERSION);
String[] components = cliParser.getOptionValues(COMPONENTS);
String[] instanceStates = cliParser.getOptionValues(STATES);
try {
sysout.println(client.getInstances(appAttemptIdOrName,
components == null ? null : Arrays.asList(components),
version, instanceStates == null ? null :
Arrays.asList(instanceStates)));
return 0;
} catch (ApplicationNotFoundException exception) {
System.err.println("Application with name '" + appAttemptIdOrName
+ "' doesn't exist in RM or Timeline Server.");
return -1;
} catch (Exception ex) {
System.err.println(ex.getMessage());
return -1;
}
}
}
} else if (cliParser.hasOption(KILL_CMD)) {
if (hasAnyOtherCLIOptions(cliParser, opts, KILL_CMD)) {

View File

@ -2281,13 +2281,17 @@ public class TestYarnCLI {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
PrintWriter pw = new PrintWriter(baos);
pw.println("usage: container");
pw.println(" -appTypes <Types> Works with -list to specify the app type when application name is provided.");
pw.println(" -components <arg> Works with -list to filter instances based on input comma-separated list of component names.");
pw.println(" -help Displays help for all commands.");
pw.println(" -list <Application Attempt ID> List containers for application attempt.");
pw.println(" -list <Application Name or Attempt ID> List containers for application attempt when application attempt ID is provided. When application name is provided, then it finds the instances of the application based on app's own implementation, and -appTypes option must be specified unless it is the default yarn-service type. With app name, it supports optional use of -version to filter instances based on app version, -components to filter instances based on component names, -states to filter instances based on instance state.");
pw.println(" -signal <container ID [signal command]> Signal the container.");
pw.println("The available signal commands are ");
pw.println(java.util.Arrays.asList(SignalContainerCommand.values()));
pw.println(" Default command is OUTPUT_THREAD_DUMP.");
pw.println(" -states <arg> Works with -list to filter instances based on input comma-separated list of instance states.");
pw.println(" -status <Container ID> Prints the status of the container.");
pw.println(" -version <arg> Works with -list to filter instances based on input application version. ");
pw.close();
try {
return normalize(baos.toString("UTF-8"));

View File

@ -282,4 +282,10 @@ public abstract class AppAdminClient extends CompositeService {
public abstract int actionCleanUp(String appName, String userName) throws
IOException, YarnException;
@Public
@Unstable
public abstract String getInstances(String appName,
List<String> components, String version, List<String> containerStates)
throws IOException, YarnException;
}