YARN-1387. RMWebServices should use ClientRMService for filtering applications (Karthik Kambatla via Sandy Ryza)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1540856 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Sanford Ryza 2013-11-11 21:52:07 +00:00
parent 01808fa1b5
commit fd1dd57fc5
7 changed files with 447 additions and 52 deletions

View File

@ -73,6 +73,9 @@ Release 2.3.0 - UNRELEASED
YARN-1121. Changed ResourceManager's state-store to drain all events on
shut-down. (Jian He via vinodkv)
YARN-1387. RMWebServices should use ClientRMService for filtering
applications (Karthik Kambatla via Sandy Ryza)
OPTIMIZATIONS
BUG FIXES

View File

@ -21,6 +21,8 @@ package org.apache.hadoop.yarn.api.protocolrecords;
import java.util.EnumSet;
import java.util.Set;
import org.apache.commons.collections.buffer.UnboundedFifoBuffer;
import org.apache.commons.lang.math.LongRange;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Stable;
@ -150,4 +152,109 @@ public abstract class GetApplicationsRequest {
@Unstable
public abstract void
setApplicationStates(EnumSet<YarnApplicationState> applicationStates);
/**
* Set the application states to filter applications on
*
* @param applicationStates all lower-case string representation of the
* application states to filter on
*/
@Private
@Unstable
public abstract void setApplicationStates(Set<String> applicationStates);
/**
* Get the users to filter applications on
*
* @return set of users to filter applications on
*/
@Private
@Unstable
public abstract Set<String> getUsers();
/**
* Set the users to filter applications on
*
* @param users set of users to filter applications on
*/
@Private
@Unstable
public abstract void setUsers(Set<String> users);
/**
* Get the queues to filter applications on
*
* @return set of queues to filter applications on
*/
@Private
@Unstable
public abstract Set<String> getQueues();
/**
* Set the queue to filter applications on
*
* @param queue user to filter applications on
*/
@Private
@Unstable
public abstract void setQueues(Set<String> queue);
/**
* Get the limit on the number applications to return
*
* @return number of applications to limit to
*/
@Private
@Unstable
public abstract long getLimit();
/**
* Limit the number applications to return
*
* @param limit number of applications to limit to
*/
@Private
@Unstable
public abstract void setLimit(long limit);
/**
* Get the range of start times to filter applications on
*
* @return {@link LongRange} of start times to filter applications on
*/
@Private
@Unstable
public abstract LongRange getStartRange();
/**
* Set the range of start times to filter applications on
*
* @param begin beginning of the range
* @param end end of the range
* @throws IllegalArgumentException
*/
@Private
@Unstable
public abstract void setStartRange(long begin, long end)
throws IllegalArgumentException;
/**
* Get the range of finish times to filter applications on
*
* @return {@link LongRange} of finish times to filter applications on
*/
@Private
@Unstable
public abstract LongRange getFinishRange();
/**
* Set the range of finish times to filter applications on
*
* @param begin beginning of the range
* @param end end of the range
* @throws IllegalArgumentException
*/
@Private
@Unstable
public abstract void setFinishRange(long begin, long end);
}

View File

@ -125,6 +125,13 @@ message GetClusterMetricsResponseProto {
message GetApplicationsRequestProto {
repeated string application_types = 1;
repeated YarnApplicationStateProto application_states = 2;
repeated string users = 3;
repeated string queues = 4;
optional int64 limit = 5;
optional int64 start_begin = 6;
optional int64 start_end = 7;
optional int64 finish_begin = 8;
optional int64 finish_end = 9;
}
message GetApplicationsResponseProto {

View File

@ -24,6 +24,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Set;
import org.apache.commons.lang.math.LongRange;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest;
@ -44,6 +45,10 @@ public class GetApplicationsRequestPBImpl extends GetApplicationsRequest {
Set<String> applicationTypes = null;
EnumSet<YarnApplicationState> applicationStates = null;
Set<String> users = null;
Set<String> queues = null;
long limit = Long.MAX_VALUE;
LongRange start = null, finish = null;
public GetApplicationsRequestPBImpl() {
builder = GetApplicationsRequestProto.newBuilder();
@ -148,6 +153,26 @@ public class GetApplicationsRequestPBImpl extends GetApplicationsRequest {
}
}
private void initUsers() {
if (this.users != null) {
return;
}
GetApplicationsRequestProtoOrBuilder p = viaProto ? proto : builder;
List<String> usersList = p.getUsersList();
this.users = new HashSet<String>();
this.users.addAll(usersList);
}
private void initQueues() {
if (this.queues != null) {
return;
}
GetApplicationsRequestProtoOrBuilder p = viaProto ? proto : builder;
List<String> queuesList = p.getQueuesList();
this.queues = new HashSet<String>();
this.queues.addAll(queuesList);
}
@Override
public Set<String> getApplicationTypes() {
initApplicationTypes();
@ -177,6 +202,111 @@ public class GetApplicationsRequestPBImpl extends GetApplicationsRequest {
this.applicationStates = applicationStates;
}
@Override
public void setApplicationStates(Set<String> applicationStates) {
EnumSet<YarnApplicationState> appStates = null;
for (YarnApplicationState state : YarnApplicationState.values()) {
if (applicationStates.contains(state.name().toLowerCase())) {
if (appStates == null) {
appStates = EnumSet.of(state);
} else {
appStates.add(state);
}
}
}
setApplicationStates(appStates);
}
@Override
public Set<String> getUsers() {
initUsers();
return this.users;
}
@Override
public void setUsers(Set<String> users) {
maybeInitBuilder();
if (users == null) {
builder.clearUsers();
}
this.users = users;
}
@Override
public Set<String> getQueues() {
initQueues();
return this.queues;
}
@Override
public void setQueues(Set<String> queues) {
maybeInitBuilder();
if (queues == null) {
builder.clearQueues();
}
this.queues = queues;
}
@Override
public long getLimit() {
if (this.limit == Long.MAX_VALUE) {
GetApplicationsRequestProtoOrBuilder p = viaProto ? proto : builder;
this.limit = p.hasLimit() ? p.getLimit() : Long.MAX_VALUE;
}
return this.limit;
}
@Override
public void setLimit(long limit) {
maybeInitBuilder();
this.limit = limit;
}
@Override
public LongRange getStartRange() {
if (this.start == null) {
GetApplicationsRequestProtoOrBuilder p = viaProto ? proto: builder;
if (p.hasStartBegin() || p.hasFinishBegin()) {
long begin = p.hasStartBegin() ? p.getStartBegin() : 0L;
long end = p.hasStartEnd() ? p.getStartEnd() : Long.MAX_VALUE;
this.start = new LongRange(begin, end);
}
}
return this.start;
}
@Override
public void setStartRange(long begin, long end)
throws IllegalArgumentException {
if (begin > end) {
throw new IllegalArgumentException("begin > end in range (begin, " +
"end): (" + begin + ", " + end + ")");
}
this.start = new LongRange(begin, end);
}
@Override
public LongRange getFinishRange() {
if (this.finish == null) {
GetApplicationsRequestProtoOrBuilder p = viaProto ? proto: builder;
if (p.hasFinishBegin() || p.hasFinishEnd()) {
long begin = p.hasFinishBegin() ? p.getFinishBegin() : 0L;
long end = p.hasFinishEnd() ? p.getFinishEnd() : Long.MAX_VALUE;
this.finish = new LongRange(begin, end);
}
}
return this.finish;
}
@Override
public void setFinishRange(long begin, long end) {
if (begin > end) {
throw new IllegalArgumentException("begin > end in range (begin, " +
"end): (" + begin + ", " + end + ")");
}
this.finish = new LongRange(begin, end);
}
@Override
public int hashCode() {
return getProto().hashCode();

View File

@ -28,6 +28,7 @@ import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.lang.math.LongRange;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
@ -401,6 +402,18 @@ public class ClientRMService extends AbstractService implements
@Override
public GetApplicationsResponse getApplications(
GetApplicationsRequest request) throws YarnException {
return getApplications(request, true);
}
/**
* Get applications matching the {@link GetApplicationsRequest}. If
* caseSensitive is set to false, applicationTypes in
* GetApplicationRequest are expected to be in all-lowercase
*/
@Private
public GetApplicationsResponse getApplications(
GetApplicationsRequest request, boolean caseSensitive)
throws YarnException {
UserGroupInformation callerUGI;
try {
callerUGI = UserGroupInformation.getCurrentUser();
@ -412,11 +425,23 @@ public class ClientRMService extends AbstractService implements
Set<String> applicationTypes = request.getApplicationTypes();
EnumSet<YarnApplicationState> applicationStates =
request.getApplicationStates();
Set<String> users = request.getUsers();
Set<String> queues = request.getQueues();
long limit = request.getLimit();
LongRange start = request.getStartRange();
LongRange finish = request.getFinishRange();
List<ApplicationReport> reports = new ArrayList<ApplicationReport>();
long count = 0;
for (RMApp application : this.rmContext.getRMApps().values()) {
if (++count > limit) {
break;
}
if (applicationTypes != null && !applicationTypes.isEmpty()) {
if (!applicationTypes.contains(application.getApplicationType())) {
String appTypeToMatch = caseSensitive
? application.getApplicationType()
: application.getApplicationType().toLowerCase();
if (!applicationTypes.contains(appTypeToMatch)) {
continue;
}
}
@ -427,6 +452,25 @@ public class ClientRMService extends AbstractService implements
continue;
}
}
if (users != null && !users.isEmpty() &&
!users.contains(application.getUser())) {
continue;
}
if (queues != null && !queues.isEmpty() &&
!queues.contains(application.getQueue())) {
continue;
}
if (start != null && !start.containsLong(application.getStartTime())) {
continue;
}
if (finish != null && !finish.containsLong(application.getFinishTime())) {
continue;
}
boolean allowAccess = checkAccess(callerUGI, application.getUser(),
ApplicationAccessType.VIEW_APP, application);
reports.add(application.createAndGetApplicationReport(

View File

@ -24,6 +24,7 @@ import java.util.Collection;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
@ -38,14 +39,20 @@ import javax.ws.rs.QueryParam;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils;
@ -85,6 +92,8 @@ import com.google.inject.Singleton;
@Singleton
@Path("/ws/v1/cluster")
public class RMWebServices {
private static final Log LOG =
LogFactory.getLog(RMWebServices.class.getName());
private static final String EMPTY = "";
private static final String ANY = "*";
private final ResourceManager rm;
@ -253,7 +262,6 @@ public class RMWebServices {
@QueryParam("finishedTimeBegin") String finishBegin,
@QueryParam("finishedTimeEnd") String finishEnd,
@QueryParam("applicationTypes") Set<String> applicationTypes) {
long num = 0;
boolean checkCount = false;
boolean checkStart = false;
boolean checkEnd = false;
@ -328,19 +336,66 @@ public class RMWebServices {
checkAppStates = true;
}
final ConcurrentMap<ApplicationId, RMApp> apps = rm.getRMContext()
.getRMApps();
GetApplicationsRequest request = GetApplicationsRequest.newInstance();
if (checkStart) {
request.setStartRange(sBegin, sEnd);
}
if (checkEnd) {
request.setFinishRange(fBegin, fEnd);
}
if (checkCount) {
request.setLimit(countNum);
}
if (checkAppTypes) {
request.setApplicationTypes(appTypes);
}
if (checkAppStates) {
request.setApplicationStates(appStates);
}
if (queueQuery != null && !queueQuery.isEmpty()) {
ResourceScheduler rs = rm.getResourceScheduler();
if (rs instanceof CapacityScheduler) {
CapacityScheduler cs = (CapacityScheduler) rs;
// validate queue exists
try {
cs.getQueueInfo(queueQuery, false, false);
} catch (IOException e) {
throw new BadRequestException(e.getMessage());
}
}
Set<String> queues = new HashSet<String>(1);
queues.add(queueQuery);
request.setQueues(queues);
}
if (userQuery != null && !userQuery.isEmpty()) {
Set<String> users = new HashSet<String>(1);
users.add(userQuery);
request.setUsers(users);
}
List<ApplicationReport> appReports = null;
try {
appReports = rm.getClientRMService()
.getApplications(request, false).getApplicationList();
} catch (YarnException e) {
LOG.error("Unable to retrieve apps from ClientRMService", e);
throw new YarnRuntimeException(
"Unable to retrieve apps from ClientRMService", e);
}
final ConcurrentMap<ApplicationId, RMApp> apps =
rm.getRMContext().getRMApps();
AppsInfo allApps = new AppsInfo();
for (RMApp rmapp : apps.values()) {
for (ApplicationReport report : appReports) {
RMApp rmapp = apps.get(report.getApplicationId());
if (checkCount && num == countNum) {
break;
}
if (checkAppStates && !appStates.contains(
rmapp.createApplicationState().toString().toLowerCase())) {
continue;
}
if (finalStatusQuery != null && !finalStatusQuery.isEmpty()) {
FinalApplicationStatus.valueOf(finalStatusQuery);
if (!rmapp.getFinalApplicationStatus().toString()
@ -348,43 +403,9 @@ public class RMWebServices {
continue;
}
}
if (userQuery != null && !userQuery.isEmpty()) {
if (!rmapp.getUser().equals(userQuery)) {
continue;
}
}
if (queueQuery != null && !queueQuery.isEmpty()) {
ResourceScheduler rs = rm.getResourceScheduler();
if (rs instanceof CapacityScheduler) {
CapacityScheduler cs = (CapacityScheduler) rs;
// validate queue exists
try {
cs.getQueueInfo(queueQuery, false, false);
} catch (IOException e) {
throw new BadRequestException(e.getMessage());
}
}
if (!rmapp.getQueue().equals(queueQuery)) {
continue;
}
}
if (checkAppTypes && !appTypes.contains(
rmapp.getApplicationType().trim().toLowerCase())) {
continue;
}
if (checkStart
&& (rmapp.getStartTime() < sBegin || rmapp.getStartTime() > sEnd)) {
continue;
}
if (checkEnd
&& (rmapp.getFinishTime() < fBegin || rmapp.getFinishTime() > fEnd)) {
continue;
}
AppInfo app = new AppInfo(rmapp, hasAccess(rmapp, hsr));
allApps.add(app);
num++;
}
return allApps;
}

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.server.resourcemanager;
import static org.junit.Assert.assertEquals;
import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.any;
@ -50,7 +51,6 @@ import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoResponse;
@ -409,6 +409,89 @@ public class TestClientRMService {
getAllApplicationsResponse.getApplicationList()
.get(0).getApplicationId());
}
@Test
public void testGetApplications() throws IOException, YarnException {
/**
* 1. Submit 3 applications alternately in two queues
* 2. Test each of the filters
*/
// Basic setup
YarnScheduler yarnScheduler = mockYarnScheduler();
RMContext rmContext = mock(RMContext.class);
mockRMContext(yarnScheduler, rmContext);
RMStateStore stateStore = mock(RMStateStore.class);
when(rmContext.getStateStore()).thenReturn(stateStore);
RMAppManager appManager = new RMAppManager(rmContext, yarnScheduler,
null, mock(ApplicationACLsManager.class), new Configuration());
when(rmContext.getDispatcher().getEventHandler()).thenReturn(
new EventHandler<Event>() {
public void handle(Event event) {}
});
ApplicationACLsManager mockAclsManager = mock(ApplicationACLsManager.class);
QueueACLsManager mockQueueACLsManager = mock(QueueACLsManager.class);
when(mockQueueACLsManager.checkAccess(any(UserGroupInformation.class),
any(QueueACL.class), anyString())).thenReturn(true);
ClientRMService rmService =
new ClientRMService(rmContext, yarnScheduler, appManager,
mockAclsManager, mockQueueACLsManager, null);
// Initialize appnames and queues
String[] queues = {"Q-1", "Q-2"};
String[] appNames =
{MockApps.newAppName(), MockApps.newAppName(), MockApps.newAppName()};
ApplicationId[] appIds =
{getApplicationId(101), getApplicationId(102), getApplicationId(103)};
// Submit applications
for (int i = 0; i < appIds.length; i++) {
ApplicationId appId = appIds[i];
when(mockAclsManager.checkAccess(UserGroupInformation.getCurrentUser(),
ApplicationAccessType.VIEW_APP, null, appId)).thenReturn(true);
SubmitApplicationRequest submitRequest = mockSubmitAppRequest(
appId, appNames[i], queues[i % queues.length]);
rmService.submitApplication(submitRequest);
}
// Test different cases of ClientRMService#getApplications()
GetApplicationsRequest request = GetApplicationsRequest.newInstance();
assertEquals("Incorrect total number of apps", 6,
rmService.getApplications(request).getApplicationList().size());
// Check limit
request.setLimit(1L);
assertEquals("Failed to limit applications", 1,
rmService.getApplications(request).getApplicationList().size());
// Check queue
request = GetApplicationsRequest.newInstance();
Set<String> queueSet = new HashSet<String>();
request.setQueues(queueSet);
queueSet.add(queues[0]);
assertEquals("Incorrect number of applications in queue", 2,
rmService.getApplications(request).getApplicationList().size());
assertEquals("Incorrect number of applications in queue", 2,
rmService.getApplications(request, false).getApplicationList().size());
queueSet.add(queues[1]);
assertEquals("Incorrect number of applications in queue", 3,
rmService.getApplications(request).getApplicationList().size());
// Check user
request = GetApplicationsRequest.newInstance();
Set<String> userSet = new HashSet<String>();
request.setUsers(userSet);
userSet.add("random-user-name");
assertEquals("Incorrect number of applications for user", 0,
rmService.getApplications(request).getApplicationList().size());
userSet.add(UserGroupInformation.getCurrentUser().getShortUserName());
assertEquals("Incorrect number of applications for user", 3,
rmService.getApplications(request).getApplicationList().size());
}
@Test(timeout=4000)
public void testConcurrentAppSubmit()
@ -492,10 +575,10 @@ public class TestClientRMService {
submissionContext.setResource(resource);
submissionContext.setApplicationType(appType);
SubmitApplicationRequest submitRequest =
recordFactory.newRecordInstance(SubmitApplicationRequest.class);
submitRequest.setApplicationSubmissionContext(submissionContext);
return submitRequest;
SubmitApplicationRequest submitRequest =
recordFactory.newRecordInstance(SubmitApplicationRequest.class);
submitRequest.setApplicationSubmissionContext(submissionContext);
return submitRequest;
}
private void mockRMContext(YarnScheduler yarnScheduler, RMContext rmContext)