YARN-1387. RMWebServices should use ClientRMService for filtering applications (Karthik Kambatla via Sandy Ryza)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1540851 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
5a5ba62a85
commit
72c6d6255a
|
@ -91,6 +91,9 @@ Release 2.3.0 - UNRELEASED
|
|||
YARN-1121. Changed ResourceManager's state-store to drain all events on
|
||||
shut-down. (Jian He via vinodkv)
|
||||
|
||||
YARN-1387. RMWebServices should use ClientRMService for filtering
|
||||
applications (Karthik Kambatla via Sandy Ryza)
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
BUG FIXES
|
||||
|
|
|
@ -21,6 +21,8 @@ package org.apache.hadoop.yarn.api.protocolrecords;
|
|||
import java.util.EnumSet;
|
||||
import java.util.Set;
|
||||
|
||||
import org.apache.commons.collections.buffer.UnboundedFifoBuffer;
|
||||
import org.apache.commons.lang.math.LongRange;
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Public;
|
||||
import org.apache.hadoop.classification.InterfaceStability.Stable;
|
||||
|
@ -150,4 +152,109 @@ public abstract class GetApplicationsRequest {
|
|||
@Unstable
|
||||
public abstract void
|
||||
setApplicationStates(EnumSet<YarnApplicationState> applicationStates);
|
||||
|
||||
/**
|
||||
* Set the application states to filter applications on
|
||||
*
|
||||
* @param applicationStates all lower-case string representation of the
|
||||
* application states to filter on
|
||||
*/
|
||||
@Private
|
||||
@Unstable
|
||||
public abstract void setApplicationStates(Set<String> applicationStates);
|
||||
|
||||
/**
|
||||
* Get the users to filter applications on
|
||||
*
|
||||
* @return set of users to filter applications on
|
||||
*/
|
||||
@Private
|
||||
@Unstable
|
||||
public abstract Set<String> getUsers();
|
||||
|
||||
/**
|
||||
* Set the users to filter applications on
|
||||
*
|
||||
* @param users set of users to filter applications on
|
||||
*/
|
||||
@Private
|
||||
@Unstable
|
||||
public abstract void setUsers(Set<String> users);
|
||||
|
||||
/**
|
||||
* Get the queues to filter applications on
|
||||
*
|
||||
* @return set of queues to filter applications on
|
||||
*/
|
||||
@Private
|
||||
@Unstable
|
||||
public abstract Set<String> getQueues();
|
||||
|
||||
/**
|
||||
* Set the queue to filter applications on
|
||||
*
|
||||
* @param queue user to filter applications on
|
||||
*/
|
||||
@Private
|
||||
@Unstable
|
||||
public abstract void setQueues(Set<String> queue);
|
||||
|
||||
/**
|
||||
* Get the limit on the number applications to return
|
||||
*
|
||||
* @return number of applications to limit to
|
||||
*/
|
||||
@Private
|
||||
@Unstable
|
||||
public abstract long getLimit();
|
||||
|
||||
/**
|
||||
* Limit the number applications to return
|
||||
*
|
||||
* @param limit number of applications to limit to
|
||||
*/
|
||||
@Private
|
||||
@Unstable
|
||||
public abstract void setLimit(long limit);
|
||||
|
||||
/**
|
||||
* Get the range of start times to filter applications on
|
||||
*
|
||||
* @return {@link LongRange} of start times to filter applications on
|
||||
*/
|
||||
@Private
|
||||
@Unstable
|
||||
public abstract LongRange getStartRange();
|
||||
|
||||
/**
|
||||
* Set the range of start times to filter applications on
|
||||
*
|
||||
* @param begin beginning of the range
|
||||
* @param end end of the range
|
||||
* @throws IllegalArgumentException
|
||||
*/
|
||||
@Private
|
||||
@Unstable
|
||||
public abstract void setStartRange(long begin, long end)
|
||||
throws IllegalArgumentException;
|
||||
|
||||
/**
|
||||
* Get the range of finish times to filter applications on
|
||||
*
|
||||
* @return {@link LongRange} of finish times to filter applications on
|
||||
*/
|
||||
@Private
|
||||
@Unstable
|
||||
public abstract LongRange getFinishRange();
|
||||
|
||||
/**
|
||||
* Set the range of finish times to filter applications on
|
||||
*
|
||||
* @param begin beginning of the range
|
||||
* @param end end of the range
|
||||
* @throws IllegalArgumentException
|
||||
*/
|
||||
@Private
|
||||
@Unstable
|
||||
public abstract void setFinishRange(long begin, long end);
|
||||
}
|
||||
|
|
|
@ -125,6 +125,13 @@ message GetClusterMetricsResponseProto {
|
|||
message GetApplicationsRequestProto {
|
||||
repeated string application_types = 1;
|
||||
repeated YarnApplicationStateProto application_states = 2;
|
||||
repeated string users = 3;
|
||||
repeated string queues = 4;
|
||||
optional int64 limit = 5;
|
||||
optional int64 start_begin = 6;
|
||||
optional int64 start_end = 7;
|
||||
optional int64 finish_begin = 8;
|
||||
optional int64 finish_end = 9;
|
||||
}
|
||||
|
||||
message GetApplicationsResponseProto {
|
||||
|
|
|
@ -24,6 +24,7 @@ import java.util.Iterator;
|
|||
import java.util.List;
|
||||
import java.util.Set;
|
||||
|
||||
import org.apache.commons.lang.math.LongRange;
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest;
|
||||
|
@ -44,6 +45,10 @@ public class GetApplicationsRequestPBImpl extends GetApplicationsRequest {
|
|||
|
||||
Set<String> applicationTypes = null;
|
||||
EnumSet<YarnApplicationState> applicationStates = null;
|
||||
Set<String> users = null;
|
||||
Set<String> queues = null;
|
||||
long limit = Long.MAX_VALUE;
|
||||
LongRange start = null, finish = null;
|
||||
|
||||
public GetApplicationsRequestPBImpl() {
|
||||
builder = GetApplicationsRequestProto.newBuilder();
|
||||
|
@ -148,6 +153,26 @@ public class GetApplicationsRequestPBImpl extends GetApplicationsRequest {
|
|||
}
|
||||
}
|
||||
|
||||
private void initUsers() {
|
||||
if (this.users != null) {
|
||||
return;
|
||||
}
|
||||
GetApplicationsRequestProtoOrBuilder p = viaProto ? proto : builder;
|
||||
List<String> usersList = p.getUsersList();
|
||||
this.users = new HashSet<String>();
|
||||
this.users.addAll(usersList);
|
||||
}
|
||||
|
||||
private void initQueues() {
|
||||
if (this.queues != null) {
|
||||
return;
|
||||
}
|
||||
GetApplicationsRequestProtoOrBuilder p = viaProto ? proto : builder;
|
||||
List<String> queuesList = p.getQueuesList();
|
||||
this.queues = new HashSet<String>();
|
||||
this.queues.addAll(queuesList);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Set<String> getApplicationTypes() {
|
||||
initApplicationTypes();
|
||||
|
@ -177,6 +202,111 @@ public class GetApplicationsRequestPBImpl extends GetApplicationsRequest {
|
|||
this.applicationStates = applicationStates;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setApplicationStates(Set<String> applicationStates) {
|
||||
EnumSet<YarnApplicationState> appStates = null;
|
||||
for (YarnApplicationState state : YarnApplicationState.values()) {
|
||||
if (applicationStates.contains(state.name().toLowerCase())) {
|
||||
if (appStates == null) {
|
||||
appStates = EnumSet.of(state);
|
||||
} else {
|
||||
appStates.add(state);
|
||||
}
|
||||
}
|
||||
}
|
||||
setApplicationStates(appStates);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Set<String> getUsers() {
|
||||
initUsers();
|
||||
return this.users;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setUsers(Set<String> users) {
|
||||
maybeInitBuilder();
|
||||
if (users == null) {
|
||||
builder.clearUsers();
|
||||
}
|
||||
this.users = users;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Set<String> getQueues() {
|
||||
initQueues();
|
||||
return this.queues;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setQueues(Set<String> queues) {
|
||||
maybeInitBuilder();
|
||||
if (queues == null) {
|
||||
builder.clearQueues();
|
||||
}
|
||||
this.queues = queues;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getLimit() {
|
||||
if (this.limit == Long.MAX_VALUE) {
|
||||
GetApplicationsRequestProtoOrBuilder p = viaProto ? proto : builder;
|
||||
this.limit = p.hasLimit() ? p.getLimit() : Long.MAX_VALUE;
|
||||
}
|
||||
return this.limit;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setLimit(long limit) {
|
||||
maybeInitBuilder();
|
||||
this.limit = limit;
|
||||
}
|
||||
|
||||
@Override
|
||||
public LongRange getStartRange() {
|
||||
if (this.start == null) {
|
||||
GetApplicationsRequestProtoOrBuilder p = viaProto ? proto: builder;
|
||||
if (p.hasStartBegin() || p.hasFinishBegin()) {
|
||||
long begin = p.hasStartBegin() ? p.getStartBegin() : 0L;
|
||||
long end = p.hasStartEnd() ? p.getStartEnd() : Long.MAX_VALUE;
|
||||
this.start = new LongRange(begin, end);
|
||||
}
|
||||
}
|
||||
return this.start;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setStartRange(long begin, long end)
|
||||
throws IllegalArgumentException {
|
||||
if (begin > end) {
|
||||
throw new IllegalArgumentException("begin > end in range (begin, " +
|
||||
"end): (" + begin + ", " + end + ")");
|
||||
}
|
||||
this.start = new LongRange(begin, end);
|
||||
}
|
||||
|
||||
@Override
|
||||
public LongRange getFinishRange() {
|
||||
if (this.finish == null) {
|
||||
GetApplicationsRequestProtoOrBuilder p = viaProto ? proto: builder;
|
||||
if (p.hasFinishBegin() || p.hasFinishEnd()) {
|
||||
long begin = p.hasFinishBegin() ? p.getFinishBegin() : 0L;
|
||||
long end = p.hasFinishEnd() ? p.getFinishEnd() : Long.MAX_VALUE;
|
||||
this.finish = new LongRange(begin, end);
|
||||
}
|
||||
}
|
||||
return this.finish;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setFinishRange(long begin, long end) {
|
||||
if (begin > end) {
|
||||
throw new IllegalArgumentException("begin > end in range (begin, " +
|
||||
"end): (" + begin + ", " + end + ")");
|
||||
}
|
||||
this.finish = new LongRange(begin, end);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return getProto().hashCode();
|
||||
|
|
|
@ -28,6 +28,7 @@ import java.util.List;
|
|||
import java.util.Set;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import org.apache.commons.lang.math.LongRange;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
|
@ -401,6 +402,18 @@ public class ClientRMService extends AbstractService implements
|
|||
@Override
|
||||
public GetApplicationsResponse getApplications(
|
||||
GetApplicationsRequest request) throws YarnException {
|
||||
return getApplications(request, true);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get applications matching the {@link GetApplicationsRequest}. If
|
||||
* caseSensitive is set to false, applicationTypes in
|
||||
* GetApplicationRequest are expected to be in all-lowercase
|
||||
*/
|
||||
@Private
|
||||
public GetApplicationsResponse getApplications(
|
||||
GetApplicationsRequest request, boolean caseSensitive)
|
||||
throws YarnException {
|
||||
UserGroupInformation callerUGI;
|
||||
try {
|
||||
callerUGI = UserGroupInformation.getCurrentUser();
|
||||
|
@ -412,11 +425,23 @@ public class ClientRMService extends AbstractService implements
|
|||
Set<String> applicationTypes = request.getApplicationTypes();
|
||||
EnumSet<YarnApplicationState> applicationStates =
|
||||
request.getApplicationStates();
|
||||
Set<String> users = request.getUsers();
|
||||
Set<String> queues = request.getQueues();
|
||||
long limit = request.getLimit();
|
||||
LongRange start = request.getStartRange();
|
||||
LongRange finish = request.getFinishRange();
|
||||
|
||||
List<ApplicationReport> reports = new ArrayList<ApplicationReport>();
|
||||
long count = 0;
|
||||
for (RMApp application : this.rmContext.getRMApps().values()) {
|
||||
if (++count > limit) {
|
||||
break;
|
||||
}
|
||||
if (applicationTypes != null && !applicationTypes.isEmpty()) {
|
||||
if (!applicationTypes.contains(application.getApplicationType())) {
|
||||
String appTypeToMatch = caseSensitive
|
||||
? application.getApplicationType()
|
||||
: application.getApplicationType().toLowerCase();
|
||||
if (!applicationTypes.contains(appTypeToMatch)) {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
@ -427,6 +452,25 @@ public class ClientRMService extends AbstractService implements
|
|||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
if (users != null && !users.isEmpty() &&
|
||||
!users.contains(application.getUser())) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (queues != null && !queues.isEmpty() &&
|
||||
!queues.contains(application.getQueue())) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (start != null && !start.containsLong(application.getStartTime())) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (finish != null && !finish.containsLong(application.getFinishTime())) {
|
||||
continue;
|
||||
}
|
||||
|
||||
boolean allowAccess = checkAccess(callerUGI, application.getUser(),
|
||||
ApplicationAccessType.VIEW_APP, application);
|
||||
reports.add(application.createAndGetApplicationReport(
|
||||
|
|
|
@ -24,6 +24,7 @@ import java.util.Collection;
|
|||
import java.util.EnumSet;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
|
@ -38,14 +39,20 @@ import javax.ws.rs.QueryParam;
|
|||
import javax.ws.rs.core.Context;
|
||||
import javax.ws.rs.core.MediaType;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationReport;
|
||||
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||
import org.apache.hadoop.yarn.api.records.NodeState;
|
||||
import org.apache.hadoop.yarn.api.records.QueueACL;
|
||||
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||
import org.apache.hadoop.yarn.factories.RecordFactory;
|
||||
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils;
|
||||
|
@ -85,6 +92,8 @@ import com.google.inject.Singleton;
|
|||
@Singleton
|
||||
@Path("/ws/v1/cluster")
|
||||
public class RMWebServices {
|
||||
private static final Log LOG =
|
||||
LogFactory.getLog(RMWebServices.class.getName());
|
||||
private static final String EMPTY = "";
|
||||
private static final String ANY = "*";
|
||||
private final ResourceManager rm;
|
||||
|
@ -253,7 +262,6 @@ public class RMWebServices {
|
|||
@QueryParam("finishedTimeBegin") String finishBegin,
|
||||
@QueryParam("finishedTimeEnd") String finishEnd,
|
||||
@QueryParam("applicationTypes") Set<String> applicationTypes) {
|
||||
long num = 0;
|
||||
boolean checkCount = false;
|
||||
boolean checkStart = false;
|
||||
boolean checkEnd = false;
|
||||
|
@ -328,19 +336,66 @@ public class RMWebServices {
|
|||
checkAppStates = true;
|
||||
}
|
||||
|
||||
final ConcurrentMap<ApplicationId, RMApp> apps = rm.getRMContext()
|
||||
.getRMApps();
|
||||
GetApplicationsRequest request = GetApplicationsRequest.newInstance();
|
||||
|
||||
if (checkStart) {
|
||||
request.setStartRange(sBegin, sEnd);
|
||||
}
|
||||
|
||||
if (checkEnd) {
|
||||
request.setFinishRange(fBegin, fEnd);
|
||||
}
|
||||
|
||||
if (checkCount) {
|
||||
request.setLimit(countNum);
|
||||
}
|
||||
|
||||
if (checkAppTypes) {
|
||||
request.setApplicationTypes(appTypes);
|
||||
}
|
||||
|
||||
if (checkAppStates) {
|
||||
request.setApplicationStates(appStates);
|
||||
}
|
||||
|
||||
if (queueQuery != null && !queueQuery.isEmpty()) {
|
||||
ResourceScheduler rs = rm.getResourceScheduler();
|
||||
if (rs instanceof CapacityScheduler) {
|
||||
CapacityScheduler cs = (CapacityScheduler) rs;
|
||||
// validate queue exists
|
||||
try {
|
||||
cs.getQueueInfo(queueQuery, false, false);
|
||||
} catch (IOException e) {
|
||||
throw new BadRequestException(e.getMessage());
|
||||
}
|
||||
}
|
||||
Set<String> queues = new HashSet<String>(1);
|
||||
queues.add(queueQuery);
|
||||
request.setQueues(queues);
|
||||
}
|
||||
|
||||
if (userQuery != null && !userQuery.isEmpty()) {
|
||||
Set<String> users = new HashSet<String>(1);
|
||||
users.add(userQuery);
|
||||
request.setUsers(users);
|
||||
}
|
||||
|
||||
List<ApplicationReport> appReports = null;
|
||||
try {
|
||||
appReports = rm.getClientRMService()
|
||||
.getApplications(request, false).getApplicationList();
|
||||
} catch (YarnException e) {
|
||||
LOG.error("Unable to retrieve apps from ClientRMService", e);
|
||||
throw new YarnRuntimeException(
|
||||
"Unable to retrieve apps from ClientRMService", e);
|
||||
}
|
||||
|
||||
final ConcurrentMap<ApplicationId, RMApp> apps =
|
||||
rm.getRMContext().getRMApps();
|
||||
AppsInfo allApps = new AppsInfo();
|
||||
for (RMApp rmapp : apps.values()) {
|
||||
for (ApplicationReport report : appReports) {
|
||||
RMApp rmapp = apps.get(report.getApplicationId());
|
||||
|
||||
if (checkCount && num == countNum) {
|
||||
break;
|
||||
}
|
||||
|
||||
if (checkAppStates && !appStates.contains(
|
||||
rmapp.createApplicationState().toString().toLowerCase())) {
|
||||
continue;
|
||||
}
|
||||
if (finalStatusQuery != null && !finalStatusQuery.isEmpty()) {
|
||||
FinalApplicationStatus.valueOf(finalStatusQuery);
|
||||
if (!rmapp.getFinalApplicationStatus().toString()
|
||||
|
@ -348,43 +403,9 @@ public class RMWebServices {
|
|||
continue;
|
||||
}
|
||||
}
|
||||
if (userQuery != null && !userQuery.isEmpty()) {
|
||||
if (!rmapp.getUser().equals(userQuery)) {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
if (queueQuery != null && !queueQuery.isEmpty()) {
|
||||
ResourceScheduler rs = rm.getResourceScheduler();
|
||||
if (rs instanceof CapacityScheduler) {
|
||||
CapacityScheduler cs = (CapacityScheduler) rs;
|
||||
// validate queue exists
|
||||
try {
|
||||
cs.getQueueInfo(queueQuery, false, false);
|
||||
} catch (IOException e) {
|
||||
throw new BadRequestException(e.getMessage());
|
||||
}
|
||||
}
|
||||
if (!rmapp.getQueue().equals(queueQuery)) {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
if (checkAppTypes && !appTypes.contains(
|
||||
rmapp.getApplicationType().trim().toLowerCase())) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (checkStart
|
||||
&& (rmapp.getStartTime() < sBegin || rmapp.getStartTime() > sEnd)) {
|
||||
continue;
|
||||
}
|
||||
if (checkEnd
|
||||
&& (rmapp.getFinishTime() < fBegin || rmapp.getFinishTime() > fEnd)) {
|
||||
continue;
|
||||
}
|
||||
AppInfo app = new AppInfo(rmapp, hasAccess(rmapp, hsr));
|
||||
|
||||
allApps.add(app);
|
||||
num++;
|
||||
}
|
||||
return allApps;
|
||||
}
|
||||
|
|
|
@ -18,6 +18,7 @@
|
|||
|
||||
package org.apache.hadoop.yarn.server.resourcemanager;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.mockito.Matchers.anyBoolean;
|
||||
import static org.mockito.Matchers.anyString;
|
||||
import static org.mockito.Matchers.any;
|
||||
|
@ -50,7 +51,6 @@ import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
|
|||
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoResponse;
|
||||
|
@ -410,6 +410,89 @@ public class TestClientRMService {
|
|||
.get(0).getApplicationId());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetApplications() throws IOException, YarnException {
|
||||
/**
|
||||
* 1. Submit 3 applications alternately in two queues
|
||||
* 2. Test each of the filters
|
||||
*/
|
||||
// Basic setup
|
||||
YarnScheduler yarnScheduler = mockYarnScheduler();
|
||||
RMContext rmContext = mock(RMContext.class);
|
||||
mockRMContext(yarnScheduler, rmContext);
|
||||
RMStateStore stateStore = mock(RMStateStore.class);
|
||||
when(rmContext.getStateStore()).thenReturn(stateStore);
|
||||
RMAppManager appManager = new RMAppManager(rmContext, yarnScheduler,
|
||||
null, mock(ApplicationACLsManager.class), new Configuration());
|
||||
when(rmContext.getDispatcher().getEventHandler()).thenReturn(
|
||||
new EventHandler<Event>() {
|
||||
public void handle(Event event) {}
|
||||
});
|
||||
|
||||
ApplicationACLsManager mockAclsManager = mock(ApplicationACLsManager.class);
|
||||
QueueACLsManager mockQueueACLsManager = mock(QueueACLsManager.class);
|
||||
when(mockQueueACLsManager.checkAccess(any(UserGroupInformation.class),
|
||||
any(QueueACL.class), anyString())).thenReturn(true);
|
||||
ClientRMService rmService =
|
||||
new ClientRMService(rmContext, yarnScheduler, appManager,
|
||||
mockAclsManager, mockQueueACLsManager, null);
|
||||
|
||||
// Initialize appnames and queues
|
||||
String[] queues = {"Q-1", "Q-2"};
|
||||
String[] appNames =
|
||||
{MockApps.newAppName(), MockApps.newAppName(), MockApps.newAppName()};
|
||||
ApplicationId[] appIds =
|
||||
{getApplicationId(101), getApplicationId(102), getApplicationId(103)};
|
||||
|
||||
// Submit applications
|
||||
for (int i = 0; i < appIds.length; i++) {
|
||||
ApplicationId appId = appIds[i];
|
||||
when(mockAclsManager.checkAccess(UserGroupInformation.getCurrentUser(),
|
||||
ApplicationAccessType.VIEW_APP, null, appId)).thenReturn(true);
|
||||
SubmitApplicationRequest submitRequest = mockSubmitAppRequest(
|
||||
appId, appNames[i], queues[i % queues.length]);
|
||||
rmService.submitApplication(submitRequest);
|
||||
}
|
||||
|
||||
// Test different cases of ClientRMService#getApplications()
|
||||
GetApplicationsRequest request = GetApplicationsRequest.newInstance();
|
||||
assertEquals("Incorrect total number of apps", 6,
|
||||
rmService.getApplications(request).getApplicationList().size());
|
||||
|
||||
// Check limit
|
||||
request.setLimit(1L);
|
||||
assertEquals("Failed to limit applications", 1,
|
||||
rmService.getApplications(request).getApplicationList().size());
|
||||
|
||||
// Check queue
|
||||
request = GetApplicationsRequest.newInstance();
|
||||
Set<String> queueSet = new HashSet<String>();
|
||||
request.setQueues(queueSet);
|
||||
|
||||
queueSet.add(queues[0]);
|
||||
assertEquals("Incorrect number of applications in queue", 2,
|
||||
rmService.getApplications(request).getApplicationList().size());
|
||||
assertEquals("Incorrect number of applications in queue", 2,
|
||||
rmService.getApplications(request, false).getApplicationList().size());
|
||||
|
||||
queueSet.add(queues[1]);
|
||||
assertEquals("Incorrect number of applications in queue", 3,
|
||||
rmService.getApplications(request).getApplicationList().size());
|
||||
|
||||
// Check user
|
||||
request = GetApplicationsRequest.newInstance();
|
||||
Set<String> userSet = new HashSet<String>();
|
||||
request.setUsers(userSet);
|
||||
|
||||
userSet.add("random-user-name");
|
||||
assertEquals("Incorrect number of applications for user", 0,
|
||||
rmService.getApplications(request).getApplicationList().size());
|
||||
|
||||
userSet.add(UserGroupInformation.getCurrentUser().getShortUserName());
|
||||
assertEquals("Incorrect number of applications for user", 3,
|
||||
rmService.getApplications(request).getApplicationList().size());
|
||||
}
|
||||
|
||||
@Test(timeout=4000)
|
||||
public void testConcurrentAppSubmit()
|
||||
throws IOException, InterruptedException, BrokenBarrierException,
|
||||
|
@ -492,10 +575,10 @@ public class TestClientRMService {
|
|||
submissionContext.setResource(resource);
|
||||
submissionContext.setApplicationType(appType);
|
||||
|
||||
SubmitApplicationRequest submitRequest =
|
||||
recordFactory.newRecordInstance(SubmitApplicationRequest.class);
|
||||
submitRequest.setApplicationSubmissionContext(submissionContext);
|
||||
return submitRequest;
|
||||
SubmitApplicationRequest submitRequest =
|
||||
recordFactory.newRecordInstance(SubmitApplicationRequest.class);
|
||||
submitRequest.setApplicationSubmissionContext(submissionContext);
|
||||
return submitRequest;
|
||||
}
|
||||
|
||||
private void mockRMContext(YarnScheduler yarnScheduler, RMContext rmContext)
|
||||
|
|
Loading…
Reference in New Issue