YARN-450. Define value for * in the scheduling protocol (Zhijie Shen via bikas)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1462271 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Bikas Saha 2013-03-28 19:44:28 +00:00
parent 799e3c344e
commit 520033b1cd
28 changed files with 132 additions and 126 deletions

View File

@ -54,7 +54,6 @@
public abstract class RMContainerRequestor extends RMCommunicator {
private static final Log LOG = LogFactory.getLog(RMContainerRequestor.class);
static final String ANY = "*";
private int lastResponseID;
private Resource availableResources;
@ -278,7 +277,7 @@ protected void addContainerReq(ContainerRequest req) {
}
// Off-switch
addResourceRequest(req.priority, ANY, req.capability);
addResourceRequest(req.priority, ResourceRequest.ANY, req.capability);
}
protected void decContainerReq(ContainerRequest req) {
@ -291,7 +290,7 @@ protected void decContainerReq(ContainerRequest req) {
decResourceRequest(req.priority, rack, req.capability);
}
decResourceRequest(req.priority, ANY, req.capability);
decResourceRequest(req.priority, ResourceRequest.ANY, req.capability);
}
private void addResourceRequest(Priority priority, String resourceName,

View File

@ -230,7 +230,7 @@ public AllocateResponse allocate(AllocateRequest request)
List<ResourceRequest> askList = request.getAskList();
List<Container> containers = new ArrayList<Container>();
for (ResourceRequest req : askList) {
if (req.getHostName() != "*") {
if (!ResourceRequest.isAnyLocation(req.getHostName())) {
continue;
}
int numContainers = req.getNumContainers();

View File

@ -100,6 +100,9 @@ Release 2.0.5-beta - UNRELEASED
YARN-469. Make scheduling mode in FS pluggable. (kkambatl via tucu)
YARN-450. Define value for * in the scheduling protocol (Zhijie Shen via
bikas)
OPTIMIZATIONS
BUG FIXES

View File

@ -49,6 +49,26 @@
@Public
@Stable
public abstract class ResourceRequest implements Comparable<ResourceRequest> {
/**
* The constant string representing no locality.
* It should be used by all references that want to pass an arbitrary host
* name in.
*/
public static final String ANY = "*";
/**
* Check whether the given <em>host/rack</em> string represents an arbitrary
* host name.
*
* @param hostName <em>host/rack</em> on which the allocation is desired
* @return whether the given <em>host/rack</em> string represents an arbitrary
* host name
*/
public static boolean isAnyLocation(String hostName) {
return ANY.equals(hostName);
}
/**
* Get the <code>Priority</code> of the request.
* @return <code>Priority</code> of the request

View File

@ -34,11 +34,6 @@
@InterfaceStability.Unstable
public interface AMRMClient extends Service {
/**
* Value used to define no locality
*/
static final String ANY = "*";
/**
* Object to represent container request for resources.
* Resources may be localized to nodes and racks.

View File

@ -258,7 +258,8 @@ public synchronized void addContainerRequest(ContainerRequest req) {
}
// Off-switch
addResourceRequest(req.priority, ANY, req.capability, req.containerCount);
addResourceRequest(req.priority, ResourceRequest.ANY, req.capability,
req.containerCount);
}
@Override
@ -276,7 +277,8 @@ public synchronized void removeContainerRequest(ContainerRequest req) {
}
}
decResourceRequest(req.priority, ANY, req.capability, req.containerCount);
decResourceRequest(req.priority, ResourceRequest.ANY, req.capability,
req.containerCount);
}
@Override

View File

@ -18,18 +18,15 @@
package org.apache.hadoop.yarn.client;
import static org.junit.Assert.*;
import static org.mockito.Mockito.*;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.AMRMProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
@ -57,6 +54,11 @@
import org.apache.hadoop.yarn.server.MiniYARNCluster;
import org.apache.hadoop.yarn.service.Service.STATE;
import org.apache.hadoop.yarn.util.Records;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
public class TestAMRMClient {
Configuration conf = null;
@ -182,7 +184,7 @@ private void testAllocation(final AMRMClientImpl amClient)
int containersRequestedRack = amClient.remoteRequestsTable.get(priority)
.get(rack).get(capability).getNumContainers();
int containersRequestedAny = amClient.remoteRequestsTable.get(priority)
.get(AMRMClient.ANY).get(capability).getNumContainers();
.get(ResourceRequest.ANY).get(capability).getNumContainers();
assertTrue(containersRequestedNode == 2);
assertTrue(containersRequestedRack == 2);

View File

@ -38,9 +38,9 @@
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.http.HttpConfig;
import org.apache.hadoop.util.ExitUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.ExitUtil;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
@ -63,10 +63,10 @@
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEvent;
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventType;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable;
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
@ -737,7 +737,7 @@ public RMAppAttemptState transition(RMAppAttemptImpl appAttempt,
// Request a container for the AM.
ResourceRequest request = BuilderUtils.newResourceRequest(
AM_CONTAINER_PRIORITY, "*", appAttempt.submissionContext
AM_CONTAINER_PRIORITY, ResourceRequest.ANY, appAttempt.submissionContext
.getAMContainerSpec().getResource(), 1);
Allocation amContainerAllocation = appAttempt.scheduler.allocate(

View File

@ -36,8 +36,6 @@
*/
public interface RMNode {
public static final String ANY = "*";
/**
* the node id of of this node.
* @return the node id of this node.

View File

@ -38,8 +38,6 @@
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
/**
* This class keeps track of all the consumption of an application. This also
@ -129,7 +127,7 @@ synchronized public void updateResourceRequests(
boolean updatePendingResources = false;
ResourceRequest lastRequest = null;
if (hostName.equals(RMNode.ANY)) {
if (hostName.equals(ResourceRequest.ANY)) {
if (LOG.isDebugEnabled()) {
LOG.debug("update:" + " application=" + applicationId + " request="
+ request);
@ -195,7 +193,7 @@ synchronized public ResourceRequest getResourceRequest(Priority priority,
}
public synchronized Resource getResource(Priority priority) {
ResourceRequest request = getResourceRequest(priority, RMNode.ANY);
ResourceRequest request = getResourceRequest(priority, ResourceRequest.ANY);
return request.getCapability();
}
@ -261,7 +259,7 @@ synchronized private void allocateNodeLocal(
this.requests.get(priority).remove(node.getRackName());
}
decrementOutstanding(requests.get(priority).get(RMNode.ANY));
decrementOutstanding(requests.get(priority).get(ResourceRequest.ANY));
}
/**
@ -284,7 +282,7 @@ synchronized private void allocateRackLocal(
this.requests.get(priority).remove(node.getRackName());
}
decrementOutstanding(requests.get(priority).get(RMNode.ANY));
decrementOutstanding(requests.get(priority).get(ResourceRequest.ANY));
}
/**
@ -322,7 +320,7 @@ synchronized private void decrementOutstanding(
synchronized private void checkForDeactivation() {
boolean deactivate = true;
for (Priority priority : getPriorities()) {
ResourceRequest request = getResourceRequest(priority, RMNodeImpl.ANY);
ResourceRequest request = getResourceRequest(priority, ResourceRequest.ANY);
if (request.getNumContainers() > 0) {
deactivate = false;
break;
@ -351,7 +349,7 @@ synchronized public void stop(RMAppAttemptState rmAppAttemptFinalState) {
// clear pending resources metrics for the application
QueueMetrics metrics = queue.getMetrics();
for (Map<String, ResourceRequest> asks : requests.values()) {
ResourceRequest request = asks.get(RMNode.ANY);
ResourceRequest request = asks.get(ResourceRequest.ANY);
if (request != null) {
metrics.decrPendingResources(user, request.getNumContainers(),
Resources.multiply(request.getCapability(), request

View File

@ -58,7 +58,6 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
@ -816,7 +815,8 @@ private synchronized FiCaSchedulerApp getApplication(
for (Priority priority : application.getPriorities()) {
// Required resource
Resource required =
application.getResourceRequest(priority, RMNode.ANY).getCapability();
application.getResourceRequest(
priority, ResourceRequest.ANY).getCapability();
// Do we need containers at this 'priority'?
if (!needContainers(application, priority, required)) {
@ -1161,7 +1161,7 @@ private Resource assignOffSwitchContainers(Resource clusterResource, FiCaSchedul
FiCaSchedulerApp application, Priority priority,
RMContainer reservedContainer) {
ResourceRequest request =
application.getResourceRequest(priority, RMNode.ANY);
application.getResourceRequest(priority, ResourceRequest.ANY);
if (request != null) {
if (canAssign(application, priority, node, NodeType.OFF_SWITCH,
reservedContainer)) {
@ -1183,7 +1183,7 @@ boolean canAssign(FiCaSchedulerApp application, Priority priority,
// 'Delay' off-switch
ResourceRequest offSwitchRequest =
application.getResourceRequest(priority, RMNode.ANY);
application.getResourceRequest(priority, ResourceRequest.ANY);
long missedOpportunities = application.getSchedulingOpportunities(priority);
long requiredContainers = offSwitchRequest.getNumContainers();

View File

@ -51,7 +51,6 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerFinishedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerReservedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppSchedulingInfo;
@ -153,7 +152,7 @@ public ResourceRequest getResourceRequest(Priority priority, String nodeAddress)
}
public synchronized int getTotalRequiredResources(Priority priority) {
return getResourceRequest(priority, RMNode.ANY).getNumContainers();
return getResourceRequest(priority, ResourceRequest.ANY).getNumContainers();
}
public Resource getResource(Priority priority) {

View File

@ -60,8 +60,6 @@ public class FiCaSchedulerNode extends SchedulerNode {
private final RMNode rmNode;
public static final String ANY = "*";
public FiCaSchedulerNode(RMNode node) {
this.rmNode = node;
this.availableResource.setMemory(node.getTotalCapability().getMemory());

View File

@ -37,7 +37,6 @@
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
@ -336,7 +335,7 @@ public Resource assignContainer(FSSchedulerNode node, boolean reserved) {
}
ResourceRequest offSwitchRequest = app.getResourceRequest(priority,
RMNode.ANY);
ResourceRequest.ANY);
if (offSwitchRequest != null && offSwitchRequest.getNumContainers() != 0
&& allowedLocality.equals(NodeType.OFF_SWITCH)) {
return assignContainer(node, app, priority, offSwitchRequest,

View File

@ -40,8 +40,8 @@
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
@ -50,7 +50,6 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerFinishedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerReservedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppSchedulingInfo;
@ -153,7 +152,7 @@ public ResourceRequest getResourceRequest(Priority priority, String nodeAddress)
}
public synchronized int getTotalRequiredResources(Priority priority) {
return getResourceRequest(priority, RMNode.ANY).getNumContainers();
return getResourceRequest(priority, ResourceRequest.ANY).getNumContainers();
}
public Resource getResource(Priority priority) {

View File

@ -59,8 +59,6 @@ public class FSSchedulerNode extends SchedulerNode {
private final RMNode rmNode;
public static final String ANY = "*";
public FSSchedulerNode(RMNode node) {
this.rmNode = node;
this.availableResource.setMemory(node.getTotalCapability().getMemory());

View File

@ -57,8 +57,8 @@
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceCalculator;
import org.apache.hadoop.yarn.server.resourcemanager.resource.DefaultResourceCalculator;
import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceCalculator;
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
@ -393,7 +393,7 @@ private void assignContainers(FiCaSchedulerNode node) {
private int getMaxAllocatableContainers(FiCaSchedulerApp application,
Priority priority, FiCaSchedulerNode node, NodeType type) {
ResourceRequest offSwitchRequest =
application.getResourceRequest(priority, FiCaSchedulerNode.ANY);
application.getResourceRequest(priority, ResourceRequest.ANY);
int maxContainers = offSwitchRequest.getNumContainers();
if (type == NodeType.OFF_SWITCH) {
@ -483,7 +483,7 @@ private int assignRackLocalContainers(FiCaSchedulerNode node,
if (request != null) {
// Don't allocate on this rack if the application doens't need containers
ResourceRequest offSwitchRequest =
application.getResourceRequest(priority, FiCaSchedulerNode.ANY);
application.getResourceRequest(priority, ResourceRequest.ANY);
if (offSwitchRequest.getNumContainers() <= 0) {
return 0;
}
@ -504,7 +504,7 @@ private int assignOffSwitchContainers(FiCaSchedulerNode node,
FiCaSchedulerApp application, Priority priority) {
int assignedContainers = 0;
ResourceRequest request =
application.getResourceRequest(priority, FiCaSchedulerNode.ANY);
application.getResourceRequest(priority, ResourceRequest.ANY);
if (request != null) {
assignedContainers =
assignContainer(node, application, priority,

View File

@ -43,11 +43,9 @@
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.resourcemanager.Task.State;
@ -200,9 +198,7 @@ public synchronized void addTask(Task task) {
}
// Off-switch
addResourceRequest(priority, requests,
org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode.ANY,
capability);
addResourceRequest(priority, requests, ResourceRequest.ANY, capability);
}
public synchronized void finishTask(Task task) throws IOException {
@ -377,10 +373,7 @@ private void updateResourceRequests(Map<String, ResourceRequest> requests,
}
}
updateResourceRequest(
requests.get(
org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode.ANY)
);
updateResourceRequest(requests.get(ResourceRequest.ANY));
if(LOG.isDebugEnabled()) {
LOG.debug("updateResourceRequests:" + " application=" + applicationId

View File

@ -120,8 +120,8 @@ public List<ResourceRequest> createReq(String[] hosts, int memory, int priority,
reqs.add(rackReq);
}
ResourceRequest offRackReq = createResourceReq("*", memory, priority,
containers);
ResourceRequest offRackReq = createResourceReq(ResourceRequest.ANY, memory,
priority, containers);
reqs.add(offRackReq);
return reqs;

View File

@ -29,6 +29,7 @@
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
public class Task {
@ -57,8 +58,7 @@ public Task(Application application, Priority priority, String[] hosts) {
// Special case: Don't care about locality
if (!(hosts.length == 1 &&
hosts[0].equals(
org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode.ANY))) {
hosts[0].equals(ResourceRequest.ANY))) {
for (String host : hosts) {
this.hosts.add(host);
this.racks.add(Application.resolve(host));

View File

@ -264,14 +264,14 @@ public void testHeadroom() throws Exception {
// Ask for a 1 GB container for app 1
List<ResourceRequest> ask1 = new ArrayList<ResourceRequest>();
ask1.add(BuilderUtils.newResourceRequest(BuilderUtils.newPriority(0), "*",
BuilderUtils.newResource(GB, 1), 1));
ask1.add(BuilderUtils.newResourceRequest(BuilderUtils.newPriority(0),
ResourceRequest.ANY, BuilderUtils.newResource(GB, 1), 1));
fs.allocate(appAttemptId1, ask1, emptyId);
// Ask for a 2 GB container for app 2
List<ResourceRequest> ask2 = new ArrayList<ResourceRequest>();
ask2.add(BuilderUtils.newResourceRequest(BuilderUtils.newPriority(0), "*",
BuilderUtils.newResource(2 * GB, 1), 1));
ask2.add(BuilderUtils.newResourceRequest(BuilderUtils.newPriority(0),
ResourceRequest.ANY, BuilderUtils.newResource(2 * GB, 1), 1));
fs.allocate(appAttemptId2, ask2, emptyId);
// Trigger container assignment

View File

@ -32,6 +32,7 @@
import org.apache.hadoop.yarn.api.records.NodeHealthStatus;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
@ -124,7 +125,7 @@ public void testResourceAllocation() throws IOException {
Task t2 = new Task(application, priority1, new String[] {host1, host2});
application.addTask(t2);
Task t3 = new Task(application, priority0, new String[] {RMNode.ANY});
Task t3 = new Task(application, priority0, new String[] {ResourceRequest.ANY});
application.addTask(t3);
// Send resource requests to the scheduler

View File

@ -47,10 +47,9 @@
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceCalculator;
import org.apache.hadoop.yarn.server.resourcemanager.resource.DefaultResourceCalculator;
import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceCalculator;
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.junit.After;
@ -506,7 +505,7 @@ public void testHeadroom() throws Exception {
List<ResourceRequest> app_0_0_requests = new ArrayList<ResourceRequest>();
app_0_0_requests.add(
TestUtils.createResourceRequest(RMNodeImpl.ANY, 1*GB, 2,
TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 2,
priority_1, recordFactory));
app_0_0.updateResourceRequests(app_0_0_requests);
@ -525,7 +524,7 @@ public void testHeadroom() throws Exception {
List<ResourceRequest> app_0_1_requests = new ArrayList<ResourceRequest>();
app_0_1_requests.add(
TestUtils.createResourceRequest(RMNodeImpl.ANY, 1*GB, 2,
TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 2,
priority_1, recordFactory));
app_0_1.updateResourceRequests(app_0_1_requests);
@ -544,7 +543,7 @@ public void testHeadroom() throws Exception {
List<ResourceRequest> app_1_0_requests = new ArrayList<ResourceRequest>();
app_1_0_requests.add(
TestUtils.createResourceRequest(RMNodeImpl.ANY, 1*GB, 2,
TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 2,
priority_1, recordFactory));
app_1_0.updateResourceRequests(app_1_0_requests);

View File

@ -32,6 +32,7 @@
import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.server.resourcemanager.Application;
@ -183,7 +184,7 @@ public void testCapacityScheduler() throws Exception {
LOG.info("Adding new tasks...");
Task task_1_1 = new Task(application_1, priority_0,
new String[] {RMNode.ANY});
new String[] {ResourceRequest.ANY});
application_1.addTask(task_1_1);
application_1.schedule();

View File

@ -24,8 +24,8 @@
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
@ -55,13 +55,12 @@
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceCalculator;
import org.apache.hadoop.yarn.server.resourcemanager.resource.DefaultResourceCalculator;
import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceCalculator;
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
@ -287,7 +286,7 @@ public void testSingleQueueOneUserMetrics() throws Exception {
// Setup resource-requests
Priority priority = TestUtils.createMockPriority(1);
app_0.updateResourceRequests(Collections.singletonList(
TestUtils.createResourceRequest(RMNodeImpl.ANY, 1*GB, 3, priority,
TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 3, priority,
recordFactory)));
// Start testing...
@ -409,11 +408,11 @@ public void testSingleQueueWithOneUser() throws Exception {
// Setup resource-requests
Priority priority = TestUtils.createMockPriority(1);
app_0.updateResourceRequests(Collections.singletonList(
TestUtils.createResourceRequest(RMNodeImpl.ANY, 1*GB, 3, priority,
TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 3, priority,
recordFactory)));
app_1.updateResourceRequests(Collections.singletonList(
TestUtils.createResourceRequest(RMNodeImpl.ANY, 1*GB, 2, priority,
TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 2, priority,
recordFactory)));
// Start testing...
@ -542,11 +541,11 @@ public void testUserLimits() throws Exception {
// Setup resource-requests
Priority priority = TestUtils.createMockPriority(1);
app_0.updateResourceRequests(Collections.singletonList(
TestUtils.createResourceRequest(RMNodeImpl.ANY, 2*GB, 1, priority,
TestUtils.createResourceRequest(ResourceRequest.ANY, 2*GB, 1, priority,
recordFactory)));
app_1.updateResourceRequests(Collections.singletonList(
TestUtils.createResourceRequest(RMNodeImpl.ANY, 1*GB, 2, priority,
TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 2, priority,
recordFactory)));
/**
@ -635,11 +634,11 @@ public void testHeadroomWithMaxCap() throws Exception {
// Setup resource-requests
Priority priority = TestUtils.createMockPriority(1);
app_0.updateResourceRequests(Collections.singletonList(
TestUtils.createResourceRequest(RMNodeImpl.ANY, 2*GB, 1, priority,
TestUtils.createResourceRequest(ResourceRequest.ANY, 2*GB, 1, priority,
recordFactory)));
app_1.updateResourceRequests(Collections.singletonList(
TestUtils.createResourceRequest(RMNodeImpl.ANY, 1*GB, 2, priority,
TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 2, priority,
recordFactory)));
/**
@ -674,7 +673,7 @@ public void testHeadroomWithMaxCap() throws Exception {
// Submit requests for app_1 and set max-cap
a.setMaxCapacity(.1f);
app_2.updateResourceRequests(Collections.singletonList(
TestUtils.createResourceRequest(RMNodeImpl.ANY, 1*GB, 1, priority,
TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 1, priority,
recordFactory)));
assertEquals(2, a.getActiveUsersManager().getNumActiveUsers());
@ -691,7 +690,7 @@ public void testHeadroomWithMaxCap() throws Exception {
// Check headroom for app_2
LOG.info("here");
app_1.updateResourceRequests(Collections.singletonList( // unset
TestUtils.createResourceRequest(RMNodeImpl.ANY, 1*GB, 0, priority,
TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 0, priority,
recordFactory)));
assertEquals(1, a.getActiveUsersManager().getNumActiveUsers());
a.assignContainers(clusterResource, node_1);
@ -752,11 +751,11 @@ public void testSingleQueueWithMultipleUsers() throws Exception {
// Setup resource-requests
Priority priority = TestUtils.createMockPriority(1);
app_0.updateResourceRequests(Collections.singletonList(
TestUtils.createResourceRequest(RMNodeImpl.ANY, 1*GB, 10, priority,
TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 10, priority,
recordFactory)));
app_1.updateResourceRequests(Collections.singletonList(
TestUtils.createResourceRequest(RMNodeImpl.ANY, 1*GB, 10, priority,
TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 10, priority,
recordFactory)));
/**
@ -786,11 +785,11 @@ public void testSingleQueueWithMultipleUsers() throws Exception {
// Submit resource requests for other apps now to 'activate' them
app_2.updateResourceRequests(Collections.singletonList(
TestUtils.createResourceRequest(RMNodeImpl.ANY, 3*GB, 1, priority,
TestUtils.createResourceRequest(ResourceRequest.ANY, 3*GB, 1, priority,
recordFactory)));
app_3.updateResourceRequests(Collections.singletonList(
TestUtils.createResourceRequest(RMNodeImpl.ANY, 1*GB, 2, priority,
TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 2, priority,
recordFactory)));
// Now allocations should goto app_2 since
@ -914,11 +913,11 @@ public void testReservation() throws Exception {
// Setup resource-requests
Priority priority = TestUtils.createMockPriority(1);
app_0.updateResourceRequests(Collections.singletonList(
TestUtils.createResourceRequest(RMNodeImpl.ANY, 1*GB, 2, priority,
TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 2, priority,
recordFactory)));
app_1.updateResourceRequests(Collections.singletonList(
TestUtils.createResourceRequest(RMNodeImpl.ANY, 4*GB, 1, priority,
TestUtils.createResourceRequest(ResourceRequest.ANY, 4*GB, 1, priority,
recordFactory)));
// Start testing...
@ -1016,7 +1015,7 @@ public void testStolenReservedContainer() throws Exception {
// Setup resource-requests
Priority priority = TestUtils.createMockPriority(1);
app_0.updateResourceRequests(Collections.singletonList(
TestUtils.createResourceRequest(RMNodeImpl.ANY, 2*GB, 1, priority,
TestUtils.createResourceRequest(ResourceRequest.ANY, 2*GB, 1, priority,
recordFactory)));
// Setup app_1 to request a 4GB container on host_0 and
@ -1027,7 +1026,7 @@ public void testStolenReservedContainer() throws Exception {
priority, recordFactory));
appRequests_1.add(TestUtils.createResourceRequest(DEFAULT_RACK, 4*GB, 1,
priority, recordFactory));
appRequests_1.add(TestUtils.createResourceRequest(RMNodeImpl.ANY, 4*GB, 2,
appRequests_1.add(TestUtils.createResourceRequest(ResourceRequest.ANY, 4*GB, 2,
priority, recordFactory));
app_1.updateResourceRequests(appRequests_1);
@ -1122,11 +1121,11 @@ public void testReservationExchange() throws Exception {
// Setup resource-requests
Priority priority = TestUtils.createMockPriority(1);
app_0.updateResourceRequests(Collections.singletonList(
TestUtils.createResourceRequest(RMNodeImpl.ANY, 1*GB, 2, priority,
TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 2, priority,
recordFactory)));
app_1.updateResourceRequests(Collections.singletonList(
TestUtils.createResourceRequest(RMNodeImpl.ANY, 4*GB, 1, priority,
TestUtils.createResourceRequest(ResourceRequest.ANY, 4*GB, 1, priority,
recordFactory)));
// Start testing...
@ -1248,7 +1247,7 @@ public void testLocalityScheduling() throws Exception {
TestUtils.createResourceRequest(rack_1, 1*GB, 1,
priority, recordFactory));
app_0_requests_0.add(
TestUtils.createResourceRequest(RMNodeImpl.ANY, 1*GB, 3, // one extra
TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 3, // one extra
priority, recordFactory));
app_0.updateResourceRequests(app_0_requests_0);
@ -1313,7 +1312,7 @@ public void testLocalityScheduling() throws Exception {
TestUtils.createResourceRequest(rack_1, 1*GB, 1,
priority, recordFactory));
app_0_requests_0.add(
TestUtils.createResourceRequest(RMNodeImpl.ANY, 1*GB, 2, // one extra
TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 2, // one extra
priority, recordFactory));
app_0.updateResourceRequests(app_0_requests_0);
assertEquals(2, app_0.getTotalRequiredResources(priority));
@ -1391,7 +1390,7 @@ public void testApplicationPriorityScheduling() throws Exception {
TestUtils.createResourceRequest(rack_1, 1*GB, 1,
priority_1, recordFactory));
app_0_requests_0.add(
TestUtils.createResourceRequest(RMNodeImpl.ANY, 1*GB, 2,
TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 2,
priority_1, recordFactory));
// P2
@ -1403,7 +1402,7 @@ public void testApplicationPriorityScheduling() throws Exception {
TestUtils.createResourceRequest(rack_2, 2*GB, 1,
priority_2, recordFactory));
app_0_requests_0.add(
TestUtils.createResourceRequest(RMNodeImpl.ANY, 2*GB, 1,
TestUtils.createResourceRequest(ResourceRequest.ANY, 2*GB, 1,
priority_2, recordFactory));
app_0.updateResourceRequests(app_0_requests_0);
@ -1528,7 +1527,7 @@ public void testSchedulingConstraints() throws Exception {
// Add one request
app_0_requests_0.clear();
app_0_requests_0.add(
TestUtils.createResourceRequest(RMNodeImpl.ANY, 1*GB, 1, // only one
TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 1, // only one
priority, recordFactory));
app_0.updateResourceRequests(app_0_requests_0);
@ -1551,7 +1550,7 @@ public void testSchedulingConstraints() throws Exception {
// Add one request
app_0_requests_0.clear();
app_0_requests_0.add(
TestUtils.createResourceRequest(RMNodeImpl.ANY, 1*GB, 1, // only one
TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 1, // only one
priority, recordFactory));
app_0.updateResourceRequests(app_0_requests_0);

View File

@ -45,7 +45,6 @@
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.QueueACL;
@ -142,7 +141,8 @@ private ApplicationAttemptId createAppAttemptId(int appId, int attemptId) {
}
private ResourceRequest createResourceRequest(int memory, String host, int priority, int numContainers) {
private ResourceRequest createResourceRequest(int memory, String host,
int priority, int numContainers) {
ResourceRequest request = recordFactory.newRecordInstance(ResourceRequest.class);
request.setCapability(Resources.createResource(memory));
request.setHostName(host);
@ -157,27 +157,33 @@ private ResourceRequest createResourceRequest(int memory, String host, int prior
* Creates a single container priority-1 request and submits to
* scheduler.
*/
private ApplicationAttemptId createSchedulingRequest(int memory, String queueId, String userId) {
private ApplicationAttemptId createSchedulingRequest(int memory, String queueId,
String userId) {
return createSchedulingRequest(memory, queueId, userId, 1);
}
private ApplicationAttemptId createSchedulingRequest(int memory, String queueId, String userId, int numContainers) {
private ApplicationAttemptId createSchedulingRequest(int memory, String queueId,
String userId, int numContainers) {
return createSchedulingRequest(memory, queueId, userId, numContainers, 1);
}
private ApplicationAttemptId createSchedulingRequest(int memory, String queueId, String userId, int numContainers, int priority) {
private ApplicationAttemptId createSchedulingRequest(int memory, String queueId,
String userId, int numContainers, int priority) {
ApplicationAttemptId id = createAppAttemptId(this.APP_ID++, this.ATTEMPT_ID++);
scheduler.addApplication(id, queueId, userId);
List<ResourceRequest> ask = new ArrayList<ResourceRequest>();
ResourceRequest request = createResourceRequest(memory, "*", priority, numContainers);
ResourceRequest request = createResourceRequest(memory, ResourceRequest.ANY,
priority, numContainers);
ask.add(request);
scheduler.allocate(id, ask, new ArrayList<ContainerId>());
return id;
}
private void createSchedulingRequestExistingApplication(int memory, int priority, ApplicationAttemptId attId) {
private void createSchedulingRequestExistingApplication(int memory, int priority,
ApplicationAttemptId attId) {
List<ResourceRequest> ask = new ArrayList<ResourceRequest>();
ResourceRequest request = createResourceRequest(memory, "*", priority, 1);
ResourceRequest request = createResourceRequest(memory, ResourceRequest.ANY,
priority, 1);
ask.add(request);
scheduler.allocate(attId, ask, new ArrayList<ContainerId>());
}
@ -459,7 +465,8 @@ public void testQueueDemandCalculation() throws Exception {
// First ask, queue1 requests 1 large (minReqSize * 2).
List<ResourceRequest> ask1 = new ArrayList<ResourceRequest>();
ResourceRequest request1 = createResourceRequest(minReqSize * 2, "*", 1, 1);
ResourceRequest request1 =
createResourceRequest(minReqSize * 2, ResourceRequest.ANY, 1, 1);
ask1.add(request1);
scheduler.allocate(id11, ask1, new ArrayList<ContainerId>());
@ -473,7 +480,8 @@ public void testQueueDemandCalculation() throws Exception {
// Third ask, queue2 requests 1 large
List<ResourceRequest> ask3 = new ArrayList<ResourceRequest>();
ResourceRequest request4 = createResourceRequest(2 * minReqSize, "*", 1, 1);
ResourceRequest request4 =
createResourceRequest(2 * minReqSize, ResourceRequest.ANY, 1, 1);
ask3.add(request4);
scheduler.allocate(id22, ask3, new ArrayList<ContainerId>());
@ -1311,7 +1319,7 @@ public void testMultipleNodesSingleRackRequest() throws Exception {
asks.add(createResourceRequest(1024, node3.getHostName(), 1, 1));
asks.add(createResourceRequest(1024, node1.getRackName(), 1, 1));
asks.add(createResourceRequest(1024, node3.getRackName(), 1, 1));
asks.add(createResourceRequest(1024, RMNode.ANY, 1, 2));
asks.add(createResourceRequest(1024, ResourceRequest.ANY, 1, 2));
scheduler.allocate(appId, asks, new ArrayList<ContainerId>());

View File

@ -31,6 +31,7 @@
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.InlineDispatcher;
@ -40,7 +41,6 @@
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.Task;
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
@ -200,15 +200,15 @@ public void testFifoScheduler() throws Exception {
LOG.info("Adding new tasks...");
Task task_1_1 = new Task(application_1, priority_1,
new String[] {RMNode.ANY});
new String[] {ResourceRequest.ANY});
application_1.addTask(task_1_1);
Task task_1_2 = new Task(application_1, priority_1,
new String[] {RMNode.ANY});
new String[] {ResourceRequest.ANY});
application_1.addTask(task_1_2);
Task task_1_3 = new Task(application_1, priority_0,
new String[] {RMNode.ANY});
new String[] {ResourceRequest.ANY});
application_1.addTask(task_1_3);
application_1.schedule();
@ -222,7 +222,7 @@ public void testFifoScheduler() throws Exception {
application_0.addTask(task_0_2);
Task task_0_3 = new Task(application_0, priority_0,
new String[] {RMNode.ANY});
new String[] {ResourceRequest.ANY});
application_0.addTask(task_0_3);
application_0.schedule();

View File

@ -22,7 +22,6 @@
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileWriter;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
@ -70,11 +69,8 @@
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerToken;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.factories.RecordFactory;
@ -89,7 +85,6 @@
import org.apache.hadoop.yarn.server.resourcemanager.security.ApplicationTokenSecretManager;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
import org.apache.hadoop.yarn.util.BuilderUtils;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.Records;
import org.junit.AfterClass;
import org.junit.BeforeClass;
@ -486,8 +481,8 @@ private Container requestAndGetContainer(AMRMProtocol scheduler,
// Request a container allocation.
List<ResourceRequest> ask = new ArrayList<ResourceRequest>();
ask.add(BuilderUtils.newResourceRequest(BuilderUtils.newPriority(0), "*",
BuilderUtils.newResource(1024, 1), 1));
ask.add(BuilderUtils.newResourceRequest(BuilderUtils.newPriority(0),
ResourceRequest.ANY, BuilderUtils.newResource(1024, 1), 1));
AllocateRequest allocateRequest = BuilderUtils.newAllocateRequest(
BuilderUtils.newApplicationAttemptId(appID, 1), 0, 0F, ask,