YARN-807. When querying apps by queue, iterating over all apps is inefficient and limiting (Sandy Ryza)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1548985 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Sanford Ryza 2013-12-08 03:14:43 +00:00
parent 1c1645d17f
commit cee74196c2
17 changed files with 283 additions and 20 deletions

View File

@ -867,5 +867,10 @@ public class ResourceSchedulerWrapper implements ResourceScheduler,
ApplicationAttemptId appAttemptId) { ApplicationAttemptId appAttemptId) {
return scheduler.getAppResourceUsageReport(appAttemptId); return scheduler.getAppResourceUsageReport(appAttemptId);
} }
@Override
public List<ApplicationAttemptId> getAppsInQueue(String queue) {
return scheduler.getAppsInQueue(queue);
}
} }

View File

@ -127,6 +127,9 @@ Release 2.4.0 - UNRELEASED
YARN-546. Allow disabling the Fair Scheduler event log (Sandy Ryza) YARN-546. Allow disabling the Fair Scheduler event log (Sandy Ryza)
YARN-807. When querying apps by queue, iterating over all apps is
inefficient and limiting (Sandy Ryza)
OPTIMIZATIONS OPTIMIZATIONS
BUG FIXES BUG FIXES

View File

@ -24,7 +24,9 @@ import java.security.AccessControlException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.EnumSet; import java.util.EnumSet;
import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
@ -67,6 +69,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenResponse;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest; import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse; import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
@ -431,12 +434,51 @@ public class ClientRMService extends AbstractService implements
LongRange start = request.getStartRange(); LongRange start = request.getStartRange();
LongRange finish = request.getFinishRange(); LongRange finish = request.getFinishRange();
List<ApplicationReport> reports = new ArrayList<ApplicationReport>(); final Map<ApplicationId, RMApp> apps = rmContext.getRMApps();
long count = 0; Iterator<RMApp> appsIter;
for (RMApp application : this.rmContext.getRMApps().values()) { // If the query filters by queues, we can avoid considering apps outside
if (++count > limit) { // of those queues by asking the scheduler for the apps in those queues.
break; if (queues != null && !queues.isEmpty()) {
// Construct an iterator over apps in given queues
// Collect list of lists to avoid copying all apps
final List<List<ApplicationAttemptId>> queueAppLists =
new ArrayList<List<ApplicationAttemptId>>();
for (String queue : queues) {
List<ApplicationAttemptId> appsInQueue = scheduler.getAppsInQueue(queue);
if (appsInQueue != null && !appsInQueue.isEmpty()) {
queueAppLists.add(appsInQueue);
}
} }
appsIter = new Iterator<RMApp>() {
Iterator<List<ApplicationAttemptId>> appListIter = queueAppLists.iterator();
Iterator<ApplicationAttemptId> schedAppsIter;
@Override
public boolean hasNext() {
// Because queueAppLists has no empty lists, hasNext is whether the
// current list hasNext or whether there are any remaining lists
return (schedAppsIter != null && schedAppsIter.hasNext())
|| appListIter.hasNext();
}
@Override
public RMApp next() {
if (schedAppsIter == null || !schedAppsIter.hasNext()) {
schedAppsIter = appListIter.next().iterator();
}
return apps.get(schedAppsIter.next().getApplicationId());
}
@Override
public void remove() {
throw new UnsupportedOperationException("Remove not supported");
}
};
} else {
appsIter = apps.values().iterator();
}
List<ApplicationReport> reports = new ArrayList<ApplicationReport>();
while (appsIter.hasNext() && reports.size() < limit) {
RMApp application = appsIter.next();
if (applicationTypes != null && !applicationTypes.isEmpty()) { if (applicationTypes != null && !applicationTypes.isEmpty()) {
String appTypeToMatch = caseSensitive String appTypeToMatch = caseSensitive
? application.getApplicationType() ? application.getApplicationType()
@ -458,11 +500,6 @@ public class ClientRMService extends AbstractService implements
continue; continue;
} }
if (queues != null && !queues.isEmpty() &&
!queues.contains(application.getQueue())) {
continue;
}
if (start != null && !start.containsLong(application.getStartTime())) { if (start != null && !start.containsLong(application.getStartTime())) {
continue; continue;
} }
@ -515,13 +552,12 @@ public class ClientRMService extends AbstractService implements
request.getRecursive()); request.getRecursive());
List<ApplicationReport> appReports = EMPTY_APPS_REPORT; List<ApplicationReport> appReports = EMPTY_APPS_REPORT;
if (request.getIncludeApplications()) { if (request.getIncludeApplications()) {
Collection<RMApp> apps = this.rmContext.getRMApps().values(); List<ApplicationAttemptId> apps =
appReports = new ArrayList<ApplicationReport>( scheduler.getAppsInQueue(request.getQueueName());
apps.size()); appReports = new ArrayList<ApplicationReport>(apps.size());
for (RMApp app : apps) { for (ApplicationAttemptId app : apps) {
if (app.getQueue().equals(queueInfo.getQueueName())) { RMApp rmApp = rmContext.getRMApps().get(app.getApplicationId());
appReports.add(app.createAndGetApplicationReport(null, true)); appReports.add(rmApp.createAndGetApplicationReport(null, true));
}
} }
} }
queueInfo.setApplications(appReports); queueInfo.setApplications(appReports);

View File

@ -19,6 +19,7 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler; package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
import java.io.IOException; import java.io.IOException;
import java.util.Collection;
import java.util.List; import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate; import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
@ -160,4 +161,13 @@ public interface YarnScheduler extends EventHandler<SchedulerEvent> {
*/ */
boolean checkAccess(UserGroupInformation callerUGI, boolean checkAccess(UserGroupInformation callerUGI,
QueueACL acl, String queueName); QueueACL acl, String queueName);
/**
* Gets the apps under a given queue
* @param queueName the name of the queue.
* @return a collection of app attempt ids in the given queue.
*/
@LimitedPrivate("yarn")
@Stable
public List<ApplicationAttemptId> getAppsInQueue(String queueName);
} }

View File

@ -19,12 +19,14 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import java.io.IOException; import java.io.IOException;
import java.util.Collection;
import java.util.List; import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Stable; import org.apache.hadoop.classification.InterfaceStability.Stable;
import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.QueueACL;
@ -33,6 +35,7 @@ import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
@ -228,4 +231,10 @@ extends org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue {
*/ */
public void recoverContainer(Resource clusterResource, FiCaSchedulerApp application, public void recoverContainer(Resource clusterResource, FiCaSchedulerApp application,
Container container); Container container);
/**
* Adds all applications in the queue and its subqueues to the given collection.
* @param apps the collection to add the applications to
*/
public void collectSchedulerApplications(Collection<ApplicationAttemptId> apps);
} }

View File

@ -66,6 +66,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.PreemptableResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.PreemptableResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
@ -941,4 +942,14 @@ public class CapacityScheduler
return queue.hasAccess(acl, callerUGI); return queue.hasAccess(acl, callerUGI);
} }
@Override
public List<ApplicationAttemptId> getAppsInQueue(String queueName) {
CSQueue queue = queues.get(queueName);
if (queue == null) {
return null;
}
List<ApplicationAttemptId> apps = new ArrayList<ApplicationAttemptId>();
queue.collectSchedulerApplications(apps);
return apps;
}
} }

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.Comparator; import java.util.Comparator;
import java.util.HashMap; import java.util.HashMap;
@ -58,6 +59,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManage
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
@ -1622,4 +1624,12 @@ public class LeafQueue implements CSQueue {
return ret; return ret;
} }
@Override
public void collectSchedulerApplications(
Collection<ApplicationAttemptId> apps) {
for (FiCaSchedulerApp app : activeApplications) {
apps.add(app.getApplicationAttemptId());
}
}
} }

View File

@ -36,6 +36,7 @@ import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AccessControlList; import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.QueueACL;
@ -50,6 +51,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEven
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
@ -764,4 +766,12 @@ public class ParentQueue implements CSQueue {
parent.recoverContainer(clusterResource, application, container); parent.recoverContainer(clusterResource, application, container);
} }
} }
@Override
public void collectSchedulerApplications(
Collection<ApplicationAttemptId> apps) {
for (CSQueue queue : childQueues) {
queue.collectSchedulerApplications(apps);
}
}
} }

View File

@ -29,11 +29,13 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils;
import org.apache.hadoop.yarn.util.resource.Resources; import org.apache.hadoop.yarn.util.resource.Resources;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
@Private @Private
@Unstable @Unstable
@ -105,6 +107,17 @@ public class FSLeafQueue extends FSQueue {
public List<AppSchedulable> getNonRunnableAppSchedulables() { public List<AppSchedulable> getNonRunnableAppSchedulables() {
return nonRunnableAppScheds; return nonRunnableAppScheds;
} }
@Override
public void collectSchedulerApplications(
Collection<ApplicationAttemptId> apps) {
for (AppSchedulable appSched : runnableAppScheds) {
apps.add(appSched.getApp().getApplicationAttemptId());
}
for (AppSchedulable appSched : nonRunnableAppScheds) {
apps.add(appSched.getApp().getApplicationAttemptId());
}
}
@Override @Override
public void setPolicy(SchedulingPolicy policy) public void setPolicy(SchedulingPolicy policy)

View File

@ -28,10 +28,12 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.util.resource.Resources; import org.apache.hadoop.yarn.util.resource.Resources;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
@Private @Private
@Unstable @Unstable
@ -184,4 +186,12 @@ public class FSParentQueue extends FSQueue {
public int getNumRunnableApps() { public int getNumRunnableApps() {
return runnableApps; return runnableApps;
} }
@Override
public void collectSchedulerApplications(
Collection<ApplicationAttemptId> apps) {
for (FSQueue childQueue : childQueues) {
childQueue.collectSchedulerApplications(apps);
}
}
} }

View File

@ -24,6 +24,7 @@ import java.util.Collection;
import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.QueueInfo; import org.apache.hadoop.yarn.api.records.QueueInfo;
@ -158,7 +159,14 @@ public abstract class FSQueue extends Schedulable implements Queue {
* Gets the children of this queue, if any. * Gets the children of this queue, if any.
*/ */
public abstract Collection<FSQueue> getChildQueues(); public abstract Collection<FSQueue> getChildQueues();
/**
* Adds all applications in the queue and its subqueues to the given collection.
* @param apps the collection to add the applications to
*/
public abstract void collectSchedulerApplications(
Collection<ApplicationAttemptId> apps);
/** /**
* Return the number of apps for which containers can be allocated. * Return the number of apps for which containers can be allocated.
* Includes apps in subqueues. * Includes apps in subqueues.

View File

@ -72,6 +72,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
@ -1267,4 +1268,15 @@ public class FairScheduler implements ResourceScheduler {
} }
} }
@Override
public List<ApplicationAttemptId> getAppsInQueue(String queueName) {
FSQueue queue = queueMgr.getQueue(queueName);
if (queue == null) {
return null;
}
List<ApplicationAttemptId> apps = new ArrayList<ApplicationAttemptId>();
queue.collectSchedulerApplications(apps);
return apps;
}
} }

View File

@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
@ -850,5 +851,19 @@ public class FifoScheduler implements ResourceScheduler, Configurable {
QueueACL acl, String queueName) { QueueACL acl, String queueName) {
return DEFAULT_QUEUE.hasAccess(acl, callerUGI); return DEFAULT_QUEUE.hasAccess(acl, callerUGI);
} }
@Override
public synchronized List<ApplicationAttemptId> getAppsInQueue(String queueName) {
if (queueName.equals(DEFAULT_QUEUE.getQueueName())) {
List<ApplicationAttemptId> apps = new ArrayList<ApplicationAttemptId>(
applications.size());
for (FiCaSchedulerApp app : applications.values()) {
apps.add(app.getApplicationAttemptId());
}
return apps;
} else {
return null;
}
}
} }

View File

@ -30,9 +30,12 @@ import static org.mockito.Mockito.spy;
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.security.PrivilegedExceptionAction; import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet; import java.util.HashSet;
import java.util.EnumSet; import java.util.EnumSet;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
@ -106,6 +109,9 @@ public class TestClientRMService {
private static RMDelegationTokenSecretManager dtsm; private static RMDelegationTokenSecretManager dtsm;
private final static String QUEUE_1 = "Q-1";
private final static String QUEUE_2 = "Q-2";
@BeforeClass @BeforeClass
public static void setupSecretManager() throws IOException { public static void setupSecretManager() throws IOException {
RMContext rmContext = mock(RMContext.class); RMContext rmContext = mock(RMContext.class);
@ -438,7 +444,7 @@ public class TestClientRMService {
mockAclsManager, mockQueueACLsManager, null); mockAclsManager, mockQueueACLsManager, null);
// Initialize appnames and queues // Initialize appnames and queues
String[] queues = {"Q-1", "Q-2"}; String[] queues = {QUEUE_1, QUEUE_2};
String[] appNames = String[] appNames =
{MockApps.newAppName(), MockApps.newAppName(), MockApps.newAppName()}; {MockApps.newAppName(), MockApps.newAppName(), MockApps.newAppName()};
ApplicationId[] appIds = ApplicationId[] appIds =
@ -596,6 +602,8 @@ public class TestClientRMService {
ConcurrentHashMap<ApplicationId, RMApp> apps = getRMApps(rmContext, ConcurrentHashMap<ApplicationId, RMApp> apps = getRMApps(rmContext,
yarnScheduler); yarnScheduler);
when(rmContext.getRMApps()).thenReturn(apps); when(rmContext.getRMApps()).thenReturn(apps);
when(yarnScheduler.getAppsInQueue(eq("testqueue"))).thenReturn(
getSchedulerApps(apps));
} }
private ConcurrentHashMap<ApplicationId, RMApp> getRMApps( private ConcurrentHashMap<ApplicationId, RMApp> getRMApps(
@ -614,10 +622,23 @@ public class TestClientRMService {
config, "testqueue")); config, "testqueue"));
return apps; return apps;
} }
private List<ApplicationAttemptId> getSchedulerApps(
Map<ApplicationId, RMApp> apps) {
List<ApplicationAttemptId> schedApps = new ArrayList<ApplicationAttemptId>();
// Return app IDs for the apps in testqueue (as defined in getRMApps)
schedApps.add(ApplicationAttemptId.newInstance(getApplicationId(1), 0));
schedApps.add(ApplicationAttemptId.newInstance(getApplicationId(3), 0));
return schedApps;
}
private ApplicationId getApplicationId(int id) { private static ApplicationId getApplicationId(int id) {
return ApplicationId.newInstance(123456, id); return ApplicationId.newInstance(123456, id);
} }
private static ApplicationAttemptId getApplicationAttemptId(int id) {
return ApplicationAttemptId.newInstance(getApplicationId(id), 1);
}
private RMAppImpl getRMApp(RMContext rmContext, YarnScheduler yarnScheduler, private RMAppImpl getRMApp(RMContext rmContext, YarnScheduler yarnScheduler,
ApplicationId applicationId3, YarnConfiguration config, String queueName) { ApplicationId applicationId3, YarnConfiguration config, String queueName) {
@ -641,6 +662,10 @@ public class TestClientRMService {
when(yarnScheduler.getMaximumResourceCapability()).thenReturn( when(yarnScheduler.getMaximumResourceCapability()).thenReturn(
Resources.createResource( Resources.createResource(
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB)); YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB));
when(yarnScheduler.getAppsInQueue(QUEUE_1)).thenReturn(
Arrays.asList(getApplicationAttemptId(101), getApplicationAttemptId(102)));
when(yarnScheduler.getAppsInQueue(QUEUE_2)).thenReturn(
Arrays.asList(getApplicationAttemptId(103)));
return yarnScheduler; return yarnScheduler;
} }
} }

View File

@ -651,5 +651,35 @@ public class TestCapacityScheduler {
} }
assertFalse(failed.get()); assertFalse(failed.get());
} }
@Test
public void testGetAppsInQueue() throws Exception {
Application application_0 = new Application("user_0", "a1", resourceManager);
application_0.submit();
Application application_1 = new Application("user_0", "a2", resourceManager);
application_1.submit();
Application application_2 = new Application("user_0", "b2", resourceManager);
application_2.submit();
ResourceScheduler scheduler = resourceManager.getResourceScheduler();
List<ApplicationAttemptId> appsInA1 = scheduler.getAppsInQueue("a1");
assertEquals(1, appsInA1.size());
List<ApplicationAttemptId> appsInA = scheduler.getAppsInQueue("a");
assertTrue(appsInA.contains(application_0.getApplicationAttemptId()));
assertTrue(appsInA.contains(application_1.getApplicationAttemptId()));
assertEquals(2, appsInA.size());
List<ApplicationAttemptId> appsInRoot = scheduler.getAppsInQueue("root");
assertTrue(appsInRoot.contains(application_0.getApplicationAttemptId()));
assertTrue(appsInRoot.contains(application_1.getApplicationAttemptId()));
assertTrue(appsInRoot.contains(application_2.getApplicationAttemptId()));
assertEquals(3, appsInRoot.size());
Assert.assertNull(scheduler.getAppsInQueue("nonexistentqueue"));
}
} }

View File

@ -79,6 +79,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestCapacityScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestCapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
@ -2490,4 +2491,40 @@ public class TestFairScheduler {
assertEquals("Incorrect number of containers allocated", 1, app assertEquals("Incorrect number of containers allocated", 1, app
.getLiveContainers().size()); .getLiveContainers().size());
} }
@Test
public void testGetAppsInQueue() throws Exception {
scheduler.reinitialize(conf, resourceManager.getRMContext());
ApplicationAttemptId appAttId1 =
createSchedulingRequest(1024, 1, "queue1.subqueue1", "user1");
ApplicationAttemptId appAttId2 =
createSchedulingRequest(1024, 1, "queue1.subqueue2", "user1");
ApplicationAttemptId appAttId3 =
createSchedulingRequest(1024, 1, "default", "user1");
List<ApplicationAttemptId> apps =
scheduler.getAppsInQueue("queue1.subqueue1");
assertEquals(1, apps.size());
assertEquals(appAttId1, apps.get(0));
// with and without root prefix should work
apps = scheduler.getAppsInQueue("root.queue1.subqueue1");
assertEquals(1, apps.size());
assertEquals(appAttId1, apps.get(0));
apps = scheduler.getAppsInQueue("user1");
assertEquals(1, apps.size());
assertEquals(appAttId3, apps.get(0));
// with and without root prefix should work
apps = scheduler.getAppsInQueue("root.user1");
assertEquals(1, apps.size());
assertEquals(appAttId3, apps.get(0));
// apps in subqueues should be included
apps = scheduler.getAppsInQueue("queue1");
Assert.assertEquals(2, apps.size());
Set<ApplicationAttemptId> appAttIds = Sets.newHashSet(apps.get(0), apps.get(1));
assertTrue(appAttIds.contains(appAttId1));
assertTrue(appAttIds.contains(appAttId2));
}
} }

View File

@ -19,6 +19,7 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo; package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.io.IOException; import java.io.IOException;
import java.lang.reflect.Method; import java.lang.reflect.Method;
@ -555,6 +556,24 @@ public class TestFifoScheduler {
Assert.assertFalse(fs.getApplication(appAttemptId).isBlacklisted(host)); Assert.assertFalse(fs.getApplication(appAttemptId).isBlacklisted(host));
rm.stop(); rm.stop();
} }
@Test
public void testGetAppsInQueue() throws Exception {
Application application_0 = new Application("user_0", resourceManager);
application_0.submit();
Application application_1 = new Application("user_0", resourceManager);
application_1.submit();
ResourceScheduler scheduler = resourceManager.getResourceScheduler();
List<ApplicationAttemptId> appsInDefault = scheduler.getAppsInQueue("default");
assertTrue(appsInDefault.contains(application_0.getApplicationAttemptId()));
assertTrue(appsInDefault.contains(application_1.getApplicationAttemptId()));
assertEquals(2, appsInDefault.size());
Assert.assertNull(scheduler.getAppsInQueue("someotherqueue"));
}
private void checkApplicationResourceUsage(int expected, private void checkApplicationResourceUsage(int expected,
Application application) { Application application) {