YARN-8202. DefaultAMSProcessor should properly check units of requested custom resource types against minimum/maximum allocation (snemeth via rkanter)

(cherry picked from commit c8b53c4364)
This commit is contained in:
Robert Kanter 2018-05-10 09:31:59 -07:00
parent f4f2912820
commit 0506c762b2
8 changed files with 961 additions and 333 deletions

View File

@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.v2.app.rm;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.yarn.api.records.Resource;
final class ContainerRequestCreator {
private ContainerRequestCreator() {}
static ContainerRequestEvent createRequest(JobId jobId, int taskAttemptId,
Resource resource, String[] hosts) {
return createRequest(jobId, taskAttemptId, resource, hosts,
false, false);
}
static ContainerRequestEvent createRequest(JobId jobId, int taskAttemptId,
Resource resource, String[] hosts, boolean earlierFailedAttempt,
boolean reduce) {
final TaskId taskId;
if (reduce) {
taskId = MRBuilderUtils.newTaskId(jobId, 0, TaskType.REDUCE);
} else {
taskId = MRBuilderUtils.newTaskId(jobId, 0, TaskType.MAP);
}
TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId,
taskAttemptId);
if (earlierFailedAttempt) {
return ContainerRequestEvent
.createContainerRequestEventForFailedContainer(attemptId,
resource);
}
return new ContainerRequestEvent(attemptId, resource, hosts,
new String[]{NetworkTopology.DEFAULT_RACK});
}
}

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.mapreduce.v2.app.rm; package org.apache.hadoop.mapreduce.v2.app.rm;
import static org.apache.hadoop.mapreduce.v2.app.rm.ContainerRequestCreator.createRequest;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.mockito.Matchers.any; import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyFloat; import static org.mockito.Matchers.anyFloat;
@ -96,7 +97,6 @@ import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse; import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.CollectorInfo; import org.apache.hadoop.yarn.api.records.CollectorInfo;
@ -215,13 +215,13 @@ public class TestRMContainerAllocator {
rm.drainEvents(); rm.drainEvents();
// create the container request // create the container request
ContainerRequestEvent event1 = createReq(jobId, 1, 1024, ContainerRequestEvent event1 = ContainerRequestCreator.createRequest(jobId,
new String[] { "h1" }); 1, Resource.newInstance(1024, 1), new String[] {"h1"});
allocator.sendRequest(event1); allocator.sendRequest(event1);
// send 1 more request with different resource req // send 1 more request with different resource req
ContainerRequestEvent event2 = createReq(jobId, 2, 1024, ContainerRequestEvent event2 = ContainerRequestCreator.createRequest(jobId,
new String[] { "h2" }); 2, Resource.newInstance(1024, 1), new String[] {"h2"});
allocator.sendRequest(event2); allocator.sendRequest(event2);
// this tells the scheduler about the requests // this tells the scheduler about the requests
@ -232,8 +232,8 @@ public class TestRMContainerAllocator {
Assert.assertEquals(4, rm.getMyFifoScheduler().lastAsk.size()); Assert.assertEquals(4, rm.getMyFifoScheduler().lastAsk.size());
// send another request with different resource and priority // send another request with different resource and priority
ContainerRequestEvent event3 = createReq(jobId, 3, 1024, ContainerRequestEvent event3 = ContainerRequestCreator.createRequest(jobId,
new String[] { "h3" }); 3, Resource.newInstance(1024, 1), new String[] {"h3"});
allocator.sendRequest(event3); allocator.sendRequest(event3);
// this tells the scheduler about the requests // this tells the scheduler about the requests
@ -303,13 +303,16 @@ public class TestRMContainerAllocator {
rm.drainEvents(); rm.drainEvents();
// create the container requests for maps // create the container requests for maps
ContainerRequestEvent event1 = createReq(jobId, 1, 1024, ContainerRequestEvent event1 = ContainerRequestCreator.createRequest(
jobId, 1, Resource.newInstance(1024, 1),
new String[]{"h1"}); new String[]{"h1"});
allocator.sendRequest(event1); allocator.sendRequest(event1);
ContainerRequestEvent event2 = createReq(jobId, 2, 1024, ContainerRequestEvent event2 = ContainerRequestCreator.createRequest(
jobId, 2, Resource.newInstance(1024, 1),
new String[]{"h1"}); new String[]{"h1"});
allocator.sendRequest(event2); allocator.sendRequest(event2);
ContainerRequestEvent event3 = createReq(jobId, 3, 1024, ContainerRequestEvent event3 = ContainerRequestCreator.createRequest(
jobId, 3, Resource.newInstance(1024, 1),
new String[]{"h2"}); new String[]{"h2"});
allocator.sendRequest(event3); allocator.sendRequest(event3);
@ -381,12 +384,14 @@ public class TestRMContainerAllocator {
rm.drainEvents(); rm.drainEvents();
// create the container request // create the container request
ContainerRequestEvent event1 = createReq(jobId, 1, 1024, ContainerRequestEvent event1 = ContainerRequestCreator.createRequest(
jobId, 1, Resource.newInstance(1024, 1),
new String[] {"h1"}); new String[] {"h1"});
allocator.sendRequest(event1); allocator.sendRequest(event1);
// send 1 more request with different resource req // send 1 more request with different resource req
ContainerRequestEvent event2 = createReq(jobId, 2, 2048, ContainerRequestEvent event2 = ContainerRequestCreator.createRequest(
jobId, 2, Resource.newInstance(1024, 1),
new String[] {"h2"}); new String[] {"h2"});
allocator.sendRequest(event2); allocator.sendRequest(event2);
@ -440,14 +445,18 @@ public class TestRMContainerAllocator {
// create the container request // create the container request
final String[] locations = new String[] {host}; final String[] locations = new String[] {host};
allocator.sendRequest(createReq(jobId, 0, 1024, locations, false, true)); allocator.sendRequest(createRequest(jobId, 0,
Resource.newInstance(1024, 1),
locations, false, true));
for (int i = 0; i < 1;) { for (int i = 0; i < 1;) {
rm.drainEvents(); rm.drainEvents();
i += allocator.schedule().size(); i += allocator.schedule().size();
nm.nodeHeartbeat(true); nm.nodeHeartbeat(true);
} }
allocator.sendRequest(createReq(jobId, 0, 1024, locations, true, false)); allocator.sendRequest(createRequest(jobId, 0,
Resource.newInstance(1024, 1),
locations, true, false));
while (allocator.getTaskAttemptKillEvents().size() == 0) { while (allocator.getTaskAttemptKillEvents().size() == 0) {
rm.drainEvents(); rm.drainEvents();
allocator.schedule().size(); allocator.schedule().size();
@ -494,7 +503,8 @@ public class TestRMContainerAllocator {
RMContainerAllocator.ScheduledRequests scheduledRequests = RMContainerAllocator.ScheduledRequests scheduledRequests =
allocator.getScheduledRequests(); allocator.getScheduledRequests();
ContainerRequestEvent event1 = ContainerRequestEvent event1 =
createReq(jobId, 1, 2048, new String[] { "h1" }, false, false); createRequest(jobId, 1, Resource.newInstance(2048, 1),
new String[] {"h1"}, false, false);
scheduledRequests.maps.put(mock(TaskAttemptId.class), scheduledRequests.maps.put(mock(TaskAttemptId.class),
new RMContainerRequestor.ContainerRequest(event1, null, null)); new RMContainerRequestor.ContainerRequest(event1, null, null));
assignedRequests.reduces.put(mock(TaskAttemptId.class), assignedRequests.reduces.put(mock(TaskAttemptId.class),
@ -547,9 +557,12 @@ public class TestRMContainerAllocator {
RMContainerAllocator.ScheduledRequests scheduledRequests = RMContainerAllocator.ScheduledRequests scheduledRequests =
allocator.getScheduledRequests(); allocator.getScheduledRequests();
ContainerRequestEvent event1 = ContainerRequestEvent event1 =
createReq(jobId, 1, 2048, new String[] { "h1" }, false, false); createRequest(jobId, 1,
Resource.newInstance(2048, 1),
new String[] {"h1"}, false, false);
scheduledRequests.maps.put(mock(TaskAttemptId.class), scheduledRequests.maps.put(mock(TaskAttemptId.class),
new RMContainerRequestor.ContainerRequest(event1, null, clock.getTime())); new RMContainerRequestor.ContainerRequest(event1, null,
clock.getTime()));
assignedRequests.reduces.put(mock(TaskAttemptId.class), assignedRequests.reduces.put(mock(TaskAttemptId.class),
mock(Container.class)); mock(Container.class));
@ -608,9 +621,12 @@ public class TestRMContainerAllocator {
RMContainerAllocator.ScheduledRequests scheduledRequests = RMContainerAllocator.ScheduledRequests scheduledRequests =
allocator.getScheduledRequests(); allocator.getScheduledRequests();
ContainerRequestEvent event1 = ContainerRequestEvent event1 =
createReq(jobId, 1, 2048, new String[] { "h1" }, false, false); createRequest(jobId, 1,
Resource.newInstance(2048, 1),
new String[] {"h1"}, false, false);
scheduledRequests.maps.put(mock(TaskAttemptId.class), scheduledRequests.maps.put(mock(TaskAttemptId.class),
new RMContainerRequestor.ContainerRequest(event1, null, clock.getTime())); new RMContainerRequestor.ContainerRequest(event1, null,
clock.getTime()));
assignedRequests.reduces.put(mock(TaskAttemptId.class), assignedRequests.reduces.put(mock(TaskAttemptId.class),
mock(Container.class)); mock(Container.class));
@ -652,12 +668,16 @@ public class TestRMContainerAllocator {
// request to allocate two reduce priority containers // request to allocate two reduce priority containers
final String[] locations = new String[] {host}; final String[] locations = new String[] {host};
allocator.sendRequest(createReq(jobId, 0, 1024, locations, false, true)); allocator.sendRequest(createRequest(jobId, 0,
Resource.newInstance(1024, 1),
locations, false, true));
allocator.scheduleAllReduces(); allocator.scheduleAllReduces();
allocator.makeRemoteRequest(); allocator.makeRemoteRequest();
nm.nodeHeartbeat(true); nm.nodeHeartbeat(true);
rm.drainEvents(); rm.drainEvents();
allocator.sendRequest(createReq(jobId, 1, 1024, locations, false, false)); allocator.sendRequest(createRequest(jobId, 1,
Resource.newInstance(1024, 1),
locations, false, false));
int assignedContainer; int assignedContainer;
for (assignedContainer = 0; assignedContainer < 1;) { for (assignedContainer = 0; assignedContainer < 1;) {
@ -706,13 +726,16 @@ public class TestRMContainerAllocator {
// create some map requests // create some map requests
ContainerRequestEvent reqMapEvents; ContainerRequestEvent reqMapEvents;
reqMapEvents = createReq(jobId, 0, 1024, new String[] { "map" }); reqMapEvents = ContainerRequestCreator.createRequest(jobId, 0,
Resource.newInstance(1024, 1), new String[]{"map"});
allocator.sendRequests(Arrays.asList(reqMapEvents)); allocator.sendRequests(Arrays.asList(reqMapEvents));
// create some reduce requests // create some reduce requests
ContainerRequestEvent reqReduceEvents; ContainerRequestEvent reqReduceEvents;
reqReduceEvents = reqReduceEvents =
createReq(jobId, 0, 2048, new String[] { "reduce" }, false, true); createRequest(jobId, 0,
Resource.newInstance(2048, 1),
new String[] {"reduce"}, false, true);
allocator.sendRequests(Arrays.asList(reqReduceEvents)); allocator.sendRequests(Arrays.asList(reqReduceEvents));
allocator.schedule(); allocator.schedule();
// verify all of the host-specific asks were sent plus one for the // verify all of the host-specific asks were sent plus one for the
@ -883,17 +906,20 @@ public class TestRMContainerAllocator {
// create the container request // create the container request
// send MAP request // send MAP request
ContainerRequestEvent event1 = createReq(jobId, 1, 2048, new String[] { ContainerRequestEvent event1 = createRequest(jobId, 1,
"h1", "h2" }, true, false); Resource.newInstance(2048, 1),
new String[] {"h1", "h2"}, true, false);
allocator.sendRequest(event1); allocator.sendRequest(event1);
// send REDUCE request // send REDUCE request
ContainerRequestEvent event2 = createReq(jobId, 2, 3000, ContainerRequestEvent event2 = createRequest(jobId, 2,
Resource.newInstance(3000, 1),
new String[] {"h1"}, false, true); new String[] {"h1"}, false, true);
allocator.sendRequest(event2); allocator.sendRequest(event2);
// send MAP request // send MAP request
ContainerRequestEvent event3 = createReq(jobId, 3, 2048, ContainerRequestEvent event3 = createRequest(jobId, 3,
Resource.newInstance(2048, 1),
new String[] {"h3"}, false, false); new String[] {"h3"}, false, false);
allocator.sendRequest(event3); allocator.sendRequest(event3);
@ -921,7 +947,7 @@ public class TestRMContainerAllocator {
} }
} }
private static class MyResourceManager extends MockRM { static class MyResourceManager extends MockRM {
private static long fakeClusterTimeStamp = System.currentTimeMillis(); private static long fakeClusterTimeStamp = System.currentTimeMillis();
@ -1251,7 +1277,9 @@ public class TestRMContainerAllocator {
rm.drainEvents(); rm.drainEvents();
// create the map container request // create the map container request
ContainerRequestEvent event = createReq(jobId, 1, 1024, ContainerRequestEvent event =
ContainerRequestCreator.createRequest(jobId, 1,
Resource.newInstance(1024, 1),
new String[] {"h1"}); new String[] {"h1"});
allocator.sendRequest(event); allocator.sendRequest(event);
TaskAttemptId attemptId = event.getAttemptID(); TaskAttemptId attemptId = event.getAttemptID();
@ -1292,8 +1320,10 @@ public class TestRMContainerAllocator {
// updated nodes are reported // updated nodes are reported
Assert.assertEquals(1, allocator.getJobUpdatedNodeEvents().size()); Assert.assertEquals(1, allocator.getJobUpdatedNodeEvents().size());
Assert.assertEquals(1, allocator.getTaskAttemptKillEvents().size()); Assert.assertEquals(1, allocator.getTaskAttemptKillEvents().size());
Assert.assertEquals(2, allocator.getJobUpdatedNodeEvents().get(0).getUpdatedNodes().size()); Assert.assertEquals(2,
Assert.assertEquals(attemptId, allocator.getTaskAttemptKillEvents().get(0).getTaskAttemptID()); allocator.getJobUpdatedNodeEvents().get(0).getUpdatedNodes().size());
Assert.assertEquals(attemptId,
allocator.getTaskAttemptKillEvents().get(0).getTaskAttemptID());
allocator.getJobUpdatedNodeEvents().clear(); allocator.getJobUpdatedNodeEvents().clear();
allocator.getTaskAttemptKillEvents().clear(); allocator.getTaskAttemptKillEvents().clear();
@ -1347,17 +1377,23 @@ public class TestRMContainerAllocator {
rm.drainEvents(); rm.drainEvents();
// create the container request // create the container request
ContainerRequestEvent event1 = createReq(jobId, 1, 1024, ContainerRequestEvent event1 =
ContainerRequestCreator.createRequest(jobId, 1,
Resource.newInstance(1024, 1),
new String[] {"h1"}); new String[] {"h1"});
allocator.sendRequest(event1); allocator.sendRequest(event1);
// send 1 more request with different resource req // send 1 more request with different resource req
ContainerRequestEvent event2 = createReq(jobId, 2, 1024, ContainerRequestEvent event2 =
ContainerRequestCreator.createRequest(jobId, 2,
Resource.newInstance(1024, 1),
new String[] {"h2"}); new String[] {"h2"});
allocator.sendRequest(event2); allocator.sendRequest(event2);
// send another request with different resource and priority // send another request with different resource and priority
ContainerRequestEvent event3 = createReq(jobId, 3, 1024, ContainerRequestEvent event3 =
ContainerRequestCreator.createRequest(jobId, 3,
Resource.newInstance(1024, 1),
new String[] {"h3"}); new String[] {"h3"});
allocator.sendRequest(event3); allocator.sendRequest(event3);
@ -1576,7 +1612,8 @@ public class TestRMContainerAllocator {
int expectedAdditions2, int expectedRemovals2, MyResourceManager rm) int expectedAdditions2, int expectedRemovals2, MyResourceManager rm)
throws Exception { throws Exception {
ContainerRequestEvent reqEvent = ContainerRequestEvent reqEvent =
createReq(jobId, taskAttemptId, memory, hosts); ContainerRequestCreator.createRequest(jobId, taskAttemptId,
Resource.newInstance(memory, 1), hosts);
allocator.sendRequest(reqEvent); allocator.sendRequest(reqEvent);
// Send the request to the RM // Send the request to the RM
@ -1638,7 +1675,9 @@ public class TestRMContainerAllocator {
LOG.info("Requesting 1 Containers _1 on H1"); LOG.info("Requesting 1 Containers _1 on H1");
// create the container request // create the container request
ContainerRequestEvent event1 = createReq(jobId, 1, 1024, ContainerRequestEvent event1 =
ContainerRequestCreator.createRequest(jobId, 1,
Resource.newInstance(1024, 1),
new String[] {"h1"}); new String[] {"h1"});
allocator.sendRequest(event1); allocator.sendRequest(event1);
@ -1667,7 +1706,8 @@ public class TestRMContainerAllocator {
//At this stage, a request should be created for a fast fail map //At this stage, a request should be created for a fast fail map
//Create a FAST_FAIL request for a previously failed map. //Create a FAST_FAIL request for a previously failed map.
ContainerRequestEvent event1f = createReq(jobId, 1, 1024, ContainerRequestEvent event1f = createRequest(jobId, 1,
Resource.newInstance(1024, 1),
new String[] {"h1"}, true, false); new String[] {"h1"}, true, false);
allocator.sendRequest(event1f); allocator.sendRequest(event1f);
@ -1678,7 +1718,9 @@ public class TestRMContainerAllocator {
Assert.assertEquals("No of assignments must be 0", 0, assigned.size()); Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
// send another request with different resource and priority // send another request with different resource and priority
ContainerRequestEvent event3 = createReq(jobId, 3, 1024, ContainerRequestEvent event3 =
ContainerRequestCreator.createRequest(jobId, 3,
Resource.newInstance(1024, 1),
new String[] {"h1", "h3"}); new String[] {"h1", "h3"});
allocator.sendRequest(event3); allocator.sendRequest(event3);
@ -1855,38 +1897,6 @@ public class TestRMContainerAllocator {
} }
} }
private ContainerRequestEvent createReq(JobId jobId, int taskAttemptId,
int memory, String[] hosts) {
return createReq(jobId, taskAttemptId, memory, 1, hosts, false, false);
}
private ContainerRequestEvent createReq(JobId jobId, int taskAttemptId,
int mem, String[] hosts, boolean earlierFailedAttempt, boolean reduce) {
return createReq(jobId, taskAttemptId, mem,
1, hosts, earlierFailedAttempt, reduce);
}
private ContainerRequestEvent createReq(JobId jobId, int taskAttemptId,
int memory, int vcore, String[] hosts, boolean earlierFailedAttempt,
boolean reduce) {
TaskId taskId;
if (reduce) {
taskId = MRBuilderUtils.newTaskId(jobId, 0, TaskType.REDUCE);
} else {
taskId = MRBuilderUtils.newTaskId(jobId, 0, TaskType.MAP);
}
TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId,
taskAttemptId);
Resource containerNeed = Resource.newInstance(memory, vcore);
if (earlierFailedAttempt) {
return ContainerRequestEvent
.createContainerRequestEventForFailedContainer(attemptId,
containerNeed);
}
return new ContainerRequestEvent(attemptId, containerNeed, hosts,
new String[] { NetworkTopology.DEFAULT_RACK });
}
private ContainerFailedEvent createFailEvent(JobId jobId, int taskAttemptId, private ContainerFailedEvent createFailEvent(JobId jobId, int taskAttemptId,
String host, boolean reduce) { String host, boolean reduce) {
TaskId taskId; TaskId taskId;
@ -1957,14 +1967,14 @@ public class TestRMContainerAllocator {
// Mock RMContainerAllocator // Mock RMContainerAllocator
// Instead of talking to remote Scheduler,uses the local Scheduler // Instead of talking to remote Scheduler,uses the local Scheduler
private static class MyContainerAllocator extends RMContainerAllocator { static class MyContainerAllocator extends RMContainerAllocator {
static final List<TaskAttemptContainerAssignedEvent> events static final List<TaskAttemptContainerAssignedEvent> events =
= new ArrayList<TaskAttemptContainerAssignedEvent>(); new ArrayList<>();
static final List<TaskAttemptKillEvent> taskAttemptKillEvents static final List<TaskAttemptKillEvent> taskAttemptKillEvents =
= new ArrayList<TaskAttemptKillEvent>(); new ArrayList<>();
static final List<JobUpdatedNodesEvent> jobUpdatedNodeEvents static final List<JobUpdatedNodesEvent> jobUpdatedNodeEvents =
= new ArrayList<JobUpdatedNodesEvent>(); new ArrayList<>();
static final List<JobEvent> jobEvents = new ArrayList<JobEvent>(); static final List<JobEvent> jobEvents = new ArrayList<>();
private MyResourceManager rm; private MyResourceManager rm;
private boolean isUnregistered = false; private boolean isUnregistered = false;
private AllocateResponse allocateResponse; private AllocateResponse allocateResponse;
@ -2099,8 +2109,7 @@ public class TestRMContainerAllocator {
// run the scheduler // run the scheduler
super.heartbeat(); super.heartbeat();
List<TaskAttemptContainerAssignedEvent> result List<TaskAttemptContainerAssignedEvent> result = new ArrayList<>(events);
= new ArrayList<TaskAttemptContainerAssignedEvent>(events);
events.clear(); events.clear();
return result; return result;
} }
@ -2405,9 +2414,10 @@ public class TestRMContainerAllocator {
MRBuilderUtils.newTaskId( MRBuilderUtils.newTaskId(
MRBuilderUtils.newJobId(1, 1, 1), 1, TaskType.MAP), 1); MRBuilderUtils.newJobId(1, 1, 1), 1, TaskType.MAP), 1);
ApplicationId applicationId = ApplicationId.newInstance(1, 1); ApplicationId applicationId = ApplicationId.newInstance(1, 1);
ApplicationAttemptId applicationAttemptId = ApplicationAttemptId.newInstance( ApplicationAttemptId applicationAttemptId =
applicationId, 1); ApplicationAttemptId.newInstance(applicationId, 1);
ContainerId containerId = ContainerId.newContainerId(applicationAttemptId, 1); ContainerId containerId =
ContainerId.newContainerId(applicationAttemptId, 1);
ContainerStatus status = ContainerStatus.newInstance( ContainerStatus status = ContainerStatus.newInstance(
containerId, ContainerState.RUNNING, "", 0); containerId, ContainerState.RUNNING, "", 0);
@ -2424,7 +2434,8 @@ public class TestRMContainerAllocator {
abortedStatus, attemptId); abortedStatus, attemptId);
Assert.assertEquals(TaskAttemptEventType.TA_KILL, abortedEvent.getType()); Assert.assertEquals(TaskAttemptEventType.TA_KILL, abortedEvent.getType());
ContainerId containerId2 = ContainerId.newContainerId(applicationAttemptId, 2); ContainerId containerId2 =
ContainerId.newContainerId(applicationAttemptId, 2);
ContainerStatus status2 = ContainerStatus.newInstance(containerId2, ContainerStatus status2 = ContainerStatus.newInstance(containerId2,
ContainerState.RUNNING, "", 0); ContainerState.RUNNING, "", 0);
@ -2542,11 +2553,15 @@ public class TestRMContainerAllocator {
// create the container request // create the container request
// send MAP request // send MAP request
ContainerRequestEvent event1 = ContainerRequestEvent event1 =
createReq(jobId, 1, 1024, new String[] { "h1" }); ContainerRequestCreator.createRequest(jobId, 1,
Resource.newInstance(1024, 1),
new String[]{"h1"});
allocator.sendRequest(event1); allocator.sendRequest(event1);
ContainerRequestEvent event2 = ContainerRequestEvent event2 =
createReq(jobId, 2, 2048, new String[] { "h1", "h2" }); ContainerRequestCreator.createRequest(jobId, 2,
Resource.newInstance(2048, 1),
new String[] {"h1", "h2"});
allocator.sendRequest(event2); allocator.sendRequest(event2);
// Send events to blacklist h2 // Send events to blacklist h2
@ -2584,7 +2599,9 @@ public class TestRMContainerAllocator {
// RM // RM
// send container request // send container request
ContainerRequestEvent event3 = ContainerRequestEvent event3 =
createReq(jobId, 3, 1000, new String[] { "h1" }); ContainerRequestCreator.createRequest(jobId, 3,
Resource.newInstance(1000, 1),
new String[]{"h1"});
allocator.sendRequest(event3); allocator.sendRequest(event3);
// send deallocate request // send deallocate request
@ -2628,7 +2645,9 @@ public class TestRMContainerAllocator {
allocator.sendFailure(f2); allocator.sendFailure(f2);
ContainerRequestEvent event4 = ContainerRequestEvent event4 =
createReq(jobId, 4, 2000, new String[] { "h1", "h2" }); ContainerRequestCreator.createRequest(jobId, 4,
Resource.newInstance(2000, 1),
new String[]{"h1", "h2"});
allocator.sendRequest(event4); allocator.sendRequest(event4);
// send allocate request to 2nd RM and get resync command // send allocate request to 2nd RM and get resync command
@ -2639,7 +2658,9 @@ public class TestRMContainerAllocator {
// asks,release,blacklistAaddition // asks,release,blacklistAaddition
// and another containerRequest(event5) // and another containerRequest(event5)
ContainerRequestEvent event5 = ContainerRequestEvent event5 =
createReq(jobId, 5, 3000, new String[] { "h1", "h2", "h3" }); ContainerRequestCreator.createRequest(jobId, 5,
Resource.newInstance(3000, 1),
new String[]{"h1", "h2", "h3"});
allocator.sendRequest(event5); allocator.sendRequest(event5);
// send all outstanding request again. // send all outstanding request again.
@ -2696,9 +2717,10 @@ public class TestRMContainerAllocator {
} }
}; };
ContainerRequestEvent mapRequestEvt = createReq(jobId, 0, final int memory = (int) (maxContainerSupported.getMemorySize() + 10);
(int) (maxContainerSupported.getMemorySize() + 10), ContainerRequestEvent mapRequestEvt = createRequest(jobId, 0,
maxContainerSupported.getVirtualCores(), Resource.newInstance(memory,
maxContainerSupported.getVirtualCores()),
new String[0], false, false); new String[0], false, false);
allocator.sendRequests(Arrays.asList(mapRequestEvt)); allocator.sendRequests(Arrays.asList(mapRequestEvt));
allocator.schedule(); allocator.schedule();
@ -2734,9 +2756,10 @@ public class TestRMContainerAllocator {
} }
}; };
ContainerRequestEvent reduceRequestEvt = createReq(jobId, 0, final int memory = (int) (maxContainerSupported.getMemorySize() + 10);
(int) (maxContainerSupported.getMemorySize() + 10), ContainerRequestEvent reduceRequestEvt = createRequest(jobId, 0,
maxContainerSupported.getVirtualCores(), Resource.newInstance(memory,
maxContainerSupported.getVirtualCores()),
new String[0], false, true); new String[0], false, true);
allocator.sendRequests(Arrays.asList(reduceRequestEvt)); allocator.sendRequests(Arrays.asList(reduceRequestEvt));
// Reducer container requests are added to the pending queue upon request, // Reducer container requests are added to the pending queue upon request,
@ -2788,7 +2811,8 @@ public class TestRMContainerAllocator {
Assert.assertEquals("Should Have 1 Job Event", 1, Assert.assertEquals("Should Have 1 Job Event", 1,
allocator.jobEvents.size()); allocator.jobEvents.size());
JobEvent event = allocator.jobEvents.get(0); JobEvent event = allocator.jobEvents.get(0);
Assert.assertTrue("Should Reboot", event.getType().equals(JobEventType.JOB_AM_REBOOT)); Assert.assertTrue("Should Reboot",
event.getType().equals(JobEventType.JOB_AM_REBOOT));
} }
@Test(timeout=60000) @Test(timeout=60000)
@ -2920,7 +2944,9 @@ public class TestRMContainerAllocator {
// create some map requests // create some map requests
ContainerRequestEvent[] reqMapEvents = new ContainerRequestEvent[MAP_COUNT]; ContainerRequestEvent[] reqMapEvents = new ContainerRequestEvent[MAP_COUNT];
for (int i = 0; i < reqMapEvents.length; ++i) { for (int i = 0; i < reqMapEvents.length; ++i) {
reqMapEvents[i] = createReq(jobId, i, 1024, new String[] { "h" + i }); reqMapEvents[i] = ContainerRequestCreator.createRequest(jobId, i,
Resource.newInstance(1024, 1),
new String[] {"h" + i});
} }
allocator.sendRequests(Arrays.asList(reqMapEvents)); allocator.sendRequests(Arrays.asList(reqMapEvents));
// create some reduce requests // create some reduce requests
@ -2928,7 +2954,8 @@ public class TestRMContainerAllocator {
new ContainerRequestEvent[REDUCE_COUNT]; new ContainerRequestEvent[REDUCE_COUNT];
for (int i = 0; i < reqReduceEvents.length; ++i) { for (int i = 0; i < reqReduceEvents.length; ++i) {
reqReduceEvents[i] = reqReduceEvents[i] =
createReq(jobId, i, 1024, new String[] {}, false, true); createRequest(jobId, i, Resource.newInstance(1024, 1),
new String[] {}, false, true);
} }
allocator.sendRequests(Arrays.asList(reqReduceEvents)); allocator.sendRequests(Arrays.asList(reqReduceEvents));
allocator.schedule(); allocator.schedule();
@ -2975,14 +3002,17 @@ public class TestRMContainerAllocator {
// create some map requests // create some map requests
ContainerRequestEvent[] reqMapEvents = new ContainerRequestEvent[MAP_COUNT]; ContainerRequestEvent[] reqMapEvents = new ContainerRequestEvent[MAP_COUNT];
for (int i = 0; i < reqMapEvents.length; ++i) { for (int i = 0; i < reqMapEvents.length; ++i) {
reqMapEvents[i] = createReq(jobId, i, 1024, new String[] { "h" + i }); reqMapEvents[i] = ContainerRequestCreator.createRequest(jobId, i,
Resource.newInstance(1024, 1), new String[] {"h" + i});
} }
allocator.sendRequests(Arrays.asList(reqMapEvents)); allocator.sendRequests(Arrays.asList(reqMapEvents));
// create some reduce requests // create some reduce requests
ContainerRequestEvent[] reqReduceEvents = new ContainerRequestEvent[REDUCE_COUNT]; ContainerRequestEvent[] reqReduceEvents =
new ContainerRequestEvent[REDUCE_COUNT];
for (int i = 0; i < reqReduceEvents.length; ++i) { for (int i = 0; i < reqReduceEvents.length; ++i) {
reqReduceEvents[i] = createReq(jobId, i, 1024, new String[] {}, reqReduceEvents[i] =
false, true); createRequest(jobId, i, Resource.newInstance(1024, 1),
new String[] {}, false, true);
} }
allocator.sendRequests(Arrays.asList(reqReduceEvents)); allocator.sendRequests(Arrays.asList(reqReduceEvents));
allocator.schedule(); allocator.schedule();
@ -3137,13 +3167,19 @@ public class TestRMContainerAllocator {
// Request 2 maps and 1 reducer(sone on nodes which are not registered). // Request 2 maps and 1 reducer(sone on nodes which are not registered).
ContainerRequestEvent event1 = ContainerRequestEvent event1 =
createReq(jobId, 1, 1024, new String[] { "h1" }); ContainerRequestCreator.createRequest(jobId, 1,
Resource.newInstance(1024, 1),
new String[]{"h1"});
allocator.sendRequest(event1); allocator.sendRequest(event1);
ContainerRequestEvent event2 = ContainerRequestEvent event2 =
createReq(jobId, 2, 1024, new String[] { "h2" }); ContainerRequestCreator.createRequest(jobId, 2,
Resource.newInstance(1024, 1),
new String[]{"h2"});
allocator.sendRequest(event2); allocator.sendRequest(event2);
ContainerRequestEvent event3 = ContainerRequestEvent event3 =
createReq(jobId, 3, 1024, new String[] { "h2" }, false, true); createRequest(jobId, 3,
Resource.newInstance(1024, 1),
new String[]{"h2"}, false, true);
allocator.sendRequest(event3); allocator.sendRequest(event3);
// This will tell the scheduler about the requests but there will be no // This will tell the scheduler about the requests but there will be no
@ -3156,7 +3192,8 @@ public class TestRMContainerAllocator {
// Request for another reducer on h3 which has not registered. // Request for another reducer on h3 which has not registered.
ContainerRequestEvent event4 = ContainerRequestEvent event4 =
createReq(jobId, 4, 1024, new String[] { "h3" }, false, true); createRequest(jobId, 4, Resource.newInstance(1024, 1),
new String[] {"h3"}, false, true);
allocator.sendRequest(event4); allocator.sendRequest(event4);
allocator.schedule(); allocator.schedule();
@ -3301,13 +3338,18 @@ public class TestRMContainerAllocator {
// Request 2 maps and 1 reducer(sone on nodes which are not registered). // Request 2 maps and 1 reducer(sone on nodes which are not registered).
ContainerRequestEvent event1 = ContainerRequestEvent event1 =
createReq(jobId, 1, 1024, new String[] { "h1" }); ContainerRequestCreator.createRequest(jobId, 1,
Resource.newInstance(1024, 1),
new String[]{"h1"});
allocator.sendRequest(event1); allocator.sendRequest(event1);
ContainerRequestEvent event2 = ContainerRequestEvent event2 =
createReq(jobId, 2, 1024, new String[] { "h2" }); ContainerRequestCreator.createRequest(jobId, 2,
Resource.newInstance(1024, 1),
new String[]{"h2"});
allocator.sendRequest(event2); allocator.sendRequest(event2);
ContainerRequestEvent event3 = ContainerRequestEvent event3 =
createReq(jobId, 3, 1024, new String[] { "h2" }, false, true); createRequest(jobId, 3, Resource.newInstance(1024, 1),
new String[]{"h2"}, false, true);
allocator.sendRequest(event3); allocator.sendRequest(event3);
// This will tell the scheduler about the requests but there will be no // This will tell the scheduler about the requests but there will be no
@ -3320,7 +3362,8 @@ public class TestRMContainerAllocator {
// Request for another reducer on h3 which has not registered. // Request for another reducer on h3 which has not registered.
ContainerRequestEvent event4 = ContainerRequestEvent event4 =
createReq(jobId, 4, 1024, new String[] { "h3" }, false, true); createRequest(jobId, 4, Resource.newInstance(1024, 1),
new String[]{"h3"}, false, true);
allocator.sendRequest(event4); allocator.sendRequest(event4);
allocator.schedule(); allocator.schedule();
@ -3433,13 +3476,19 @@ public class TestRMContainerAllocator {
// Request 2 maps and 1 reducer(sone on nodes which are not registered). // Request 2 maps and 1 reducer(sone on nodes which are not registered).
ContainerRequestEvent event1 = ContainerRequestEvent event1 =
createReq(jobId, 1, 1024, new String[] { "h1" }); ContainerRequestCreator.createRequest(jobId, 1,
Resource.newInstance(1024, 1),
new String[]{"h1"});
allocator.sendRequest(event1); allocator.sendRequest(event1);
ContainerRequestEvent event2 = ContainerRequestEvent event2 =
createReq(jobId, 2, 1024, new String[] { "h2" }); ContainerRequestCreator.createRequest(jobId, 2,
Resource.newInstance(1024, 1),
new String[]{"h2"});
allocator.sendRequest(event2); allocator.sendRequest(event2);
ContainerRequestEvent event3 = ContainerRequestEvent event3 =
createReq(jobId, 3, 1024, new String[] { "h1" }, false, true); createRequest(jobId, 3,
Resource.newInstance(1024, 1),
new String[]{"h1"}, false, true);
allocator.sendRequest(event3); allocator.sendRequest(event3);
// This will tell the scheduler about the requests but there will be no // This will tell the scheduler about the requests but there will be no
@ -3449,7 +3498,8 @@ public class TestRMContainerAllocator {
// Request for another reducer on h3 which has not registered. // Request for another reducer on h3 which has not registered.
ContainerRequestEvent event4 = ContainerRequestEvent event4 =
createReq(jobId, 4, 1024, new String[] { "h3" }, false, true); createRequest(jobId, 4, Resource.newInstance(1024, 1),
new String[] {"h3"}, false, true);
allocator.sendRequest(event4); allocator.sendRequest(event4);
allocator.schedule(); allocator.schedule();
@ -3486,7 +3536,9 @@ public class TestRMContainerAllocator {
// Send request for one more mapper. // Send request for one more mapper.
ContainerRequestEvent event5 = ContainerRequestEvent event5 =
createReq(jobId, 5, 1024, new String[] { "h1" }); ContainerRequestCreator.createRequest(jobId, 5,
Resource.newInstance(1024, 1),
new String[]{"h1"});
allocator.sendRequest(event5); allocator.sendRequest(event5);
rm.getMyFifoScheduler().forceResourceLimit(Resource.newInstance(2048, 2)); rm.getMyFifoScheduler().forceResourceLimit(Resource.newInstance(2048, 2));
@ -3528,7 +3580,7 @@ public class TestRMContainerAllocator {
return RegisterApplicationMasterResponse.newInstance( return RegisterApplicationMasterResponse.newInstance(
Resource.newInstance(512, 1), Resource.newInstance(512, 1),
Resource.newInstance(512000, 1024), Resource.newInstance(512000, 1024),
Collections.<ApplicationAccessType,String>emptyMap(), Collections.emptyMap(),
ByteBuffer.wrap("fake_key".getBytes()), ByteBuffer.wrap("fake_key".getBytes()),
Collections.<Container>emptyList(), Collections.<Container>emptyList(),
"default", "default",

View File

@ -175,16 +175,8 @@ public class UnitsConversionUtil {
*/ */
public static int compare(String unitA, long valueA, String unitB, public static int compare(String unitA, long valueA, String unitB,
long valueB) { long valueB) {
if (unitA == null || unitB == null || !KNOWN_UNITS.contains(unitA) checkUnitArgument(unitA);
|| !KNOWN_UNITS.contains(unitB)) { checkUnitArgument(unitB);
throw new IllegalArgumentException("Units cannot be null");
}
if (!KNOWN_UNITS.contains(unitA)) {
throw new IllegalArgumentException("Unknown unit '" + unitA + "'");
}
if (!KNOWN_UNITS.contains(unitB)) {
throw new IllegalArgumentException("Unknown unit '" + unitB + "'");
}
if (unitA.equals(unitB)) { if (unitA.equals(unitB)) {
return Long.compare(valueA, valueB); return Long.compare(valueA, valueB);
} }
@ -218,4 +210,36 @@ public class UnitsConversionUtil {
return tmpA.compareTo(tmpB); return tmpA.compareTo(tmpB);
} }
} }
private static void checkUnitArgument(String unit) {
if (unit == null) {
throw new IllegalArgumentException("Unit cannot be null");
} else if (!KNOWN_UNITS.contains(unit)) {
throw new IllegalArgumentException("Unknown unit '" + unit + "'");
}
}
/**
* Compare a unit to another unit.
* <br>
* Examples:<br>
* 1. 'm' (milli) is smaller than 'k' (kilo), so compareUnits("m", "k")
* will return -1.<br>
* 2. 'M' (MEGA) is greater than 'k' (kilo), so compareUnits("M", "k") will
* return 1.
*
* @param unitA first unit
* @param unitB second unit
* @return +1, 0 or -1 depending on whether the relationship between units
* is smaller than,
* equal to or lesser than.
*/
public static int compareUnits(String unitA, String unitB) {
checkUnitArgument(unitA);
checkUnitArgument(unitB);
int unitAPos = SORTED_UNITS.indexOf(unitA);
int unitBPos = SORTED_UNITS.indexOf(unitB);
return Integer.compare(unitAPos, unitBPos);
}
} }

View File

@ -0,0 +1,93 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.resourcetypes;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceInformation;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
* Contains helper methods to create Resource and ResourceInformation objects.
* ResourceInformation can be created from a resource name
* and a resource descriptor as well that comprises amount and unit.
*/
public final class ResourceTypesTestHelper {
private static final Pattern RESOURCE_VALUE_AND_UNIT_PATTERN =
Pattern.compile("(\\d+)([A-za-z]*)");
private ResourceTypesTestHelper() {}
private static final RecordFactory RECORD_FACTORY = RecordFactoryProvider
.getRecordFactory(null);
private static final class ResourceValueAndUnit {
private final Long value;
private final String unit;
private ResourceValueAndUnit(Long value, String unit) {
this.value = value;
this.unit = unit;
}
}
public static Resource newResource(long memory, int vCores, Map<String,
String> customResources) {
Resource resource = RECORD_FACTORY.newRecordInstance(Resource.class);
resource.setMemorySize(memory);
resource.setVirtualCores(vCores);
for (Map.Entry<String, String> customResource :
customResources.entrySet()) {
String resourceName = customResource.getKey();
ResourceInformation resourceInformation =
createResourceInformation(resourceName,
customResource.getValue());
resource.setResourceInformation(resourceName, resourceInformation);
}
return resource;
}
public static ResourceInformation createResourceInformation(String
resourceName, String descriptor) {
ResourceValueAndUnit resourceValueAndUnit =
getResourceValueAndUnit(descriptor);
return ResourceInformation
.newInstance(resourceName, resourceValueAndUnit.unit,
resourceValueAndUnit.value);
}
private static ResourceValueAndUnit getResourceValueAndUnit(String val) {
Matcher matcher = RESOURCE_VALUE_AND_UNIT_PATTERN.matcher(val);
if (!matcher.find()) {
throw new RuntimeException("Invalid pattern of resource descriptor: " +
val);
} else if (matcher.groupCount() != 2) {
throw new RuntimeException("Capturing group count in string " +
val + " is not 2!");
}
long value = Long.parseLong(matcher.group(1));
return new ResourceValueAndUnit(value, matcher.group(2));
}
}

View File

@ -476,6 +476,10 @@ public class BuilderUtils {
return resource; return resource;
} }
public static Resource newEmptyResource() {
return recordFactory.newRecordInstance(Resource.class);
}
public static URL newURL(String scheme, String host, int port, String file) { public static URL newURL(String scheme, String host, int port, String file) {
URL url = recordFactory.newRecordInstance(URL.class); URL url = recordFactory.newRecordInstance(URL.class);
url.setScheme(scheme); url.setScheme(scheme);

View File

@ -49,6 +49,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
import org.apache.hadoop.yarn.util.UnitsConversionUtil;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceUtils; import org.apache.hadoop.yarn.util.resource.ResourceUtils;
import org.apache.hadoop.yarn.util.resource.Resources; import org.apache.hadoop.yarn.util.resource.Resources;
@ -283,24 +284,10 @@ public class SchedulerUtils {
private static void validateResourceRequest(ResourceRequest resReq, private static void validateResourceRequest(ResourceRequest resReq,
Resource maximumResource, QueueInfo queueInfo, RMContext rmContext) Resource maximumResource, QueueInfo queueInfo, RMContext rmContext)
throws InvalidResourceRequestException { throws InvalidResourceRequestException {
Resource requestedResource = resReq.getCapability(); final Resource requestedResource = resReq.getCapability();
for (int i = 0; i < ResourceUtils.getNumberOfKnownResourceTypes(); i++) { checkResourceRequestAgainstAvailableResource(requestedResource,
ResourceInformation reqRI = requestedResource.getResourceInformation(i); maximumResource);
ResourceInformation maxRI = maximumResource.getResourceInformation(i);
if (reqRI.getValue() < 0 || reqRI.getValue() > maxRI.getValue()) {
throw new InvalidResourceRequestException(
"Invalid resource request, requested resource type=[" + reqRI
.getName()
+ "] < 0 or greater than maximum allowed allocation. Requested "
+ "resource=" + requestedResource
+ ", maximum allowed allocation=" + maximumResource
+ ", please note that maximum allowed allocation is calculated "
+ "by scheduler based on maximum resource of registered "
+ "NodeManagers, which might be less than configured "
+ "maximum allocation=" + ResourceUtils
.getResourceTypesMaximumAllocation());
}
}
String labelExp = resReq.getNodeLabelExpression(); String labelExp = resReq.getNodeLabelExpression();
// we don't allow specify label expression other than resourceName=ANY now // we don't allow specify label expression other than resourceName=ANY now
if (!ResourceRequest.ANY.equals(resReq.getResourceName()) if (!ResourceRequest.ANY.equals(resReq.getResourceName())
@ -338,6 +325,78 @@ public class SchedulerUtils {
} }
} }
@Private
@VisibleForTesting
static void checkResourceRequestAgainstAvailableResource(Resource reqResource,
Resource availableResource) throws InvalidResourceRequestException {
for (int i = 0; i < ResourceUtils.getNumberOfKnownResourceTypes(); i++) {
final ResourceInformation requestedRI =
reqResource.getResourceInformation(i);
final String reqResourceName = requestedRI.getName();
if (requestedRI.getValue() < 0) {
throwInvalidResourceException(reqResource, availableResource,
reqResourceName);
}
final ResourceInformation availableRI =
availableResource.getResourceInformation(reqResourceName);
long requestedResourceValue = requestedRI.getValue();
long availableResourceValue = availableRI.getValue();
int unitsRelation = UnitsConversionUtil
.compareUnits(requestedRI.getUnits(), availableRI.getUnits());
if (LOG.isDebugEnabled()) {
LOG.debug("Requested resource information: " + requestedRI);
LOG.debug("Available resource information: " + availableRI);
LOG.debug("Relation of units: " + unitsRelation);
}
// requested resource unit is less than available resource unit
// e.g. requestedUnit: "m", availableUnit: "K")
if (unitsRelation < 0) {
availableResourceValue =
UnitsConversionUtil.convert(availableRI.getUnits(),
requestedRI.getUnits(), availableRI.getValue());
// requested resource unit is greater than available resource unit
// e.g. requestedUnit: "G", availableUnit: "M")
} else if (unitsRelation > 0) {
requestedResourceValue =
UnitsConversionUtil.convert(requestedRI.getUnits(),
availableRI.getUnits(), requestedRI.getValue());
}
if (LOG.isDebugEnabled()) {
LOG.debug("Requested resource value after conversion: " +
requestedResourceValue);
LOG.info("Available resource value after conversion: " +
availableResourceValue);
}
if (requestedResourceValue > availableResourceValue) {
throwInvalidResourceException(reqResource, availableResource,
reqResourceName);
}
}
}
private static void throwInvalidResourceException(Resource reqResource,
Resource availableResource, String reqResourceName)
throws InvalidResourceRequestException {
throw new InvalidResourceRequestException(
"Invalid resource request, requested resource type=[" + reqResourceName
+ "] < 0 or greater than maximum allowed allocation. Requested "
+ "resource=" + reqResource + ", maximum allowed allocation="
+ availableResource
+ ", please note that maximum allowed allocation is calculated "
+ "by scheduler based on maximum resource of registered "
+ "NodeManagers, which might be less than configured "
+ "maximum allocation="
+ ResourceUtils.getResourceTypesMaximumAllocation());
}
private static void checkQueueLabelInLabelManager(String labelExpression, private static void checkQueueLabelInLabelManager(String labelExpression,
RMContext rmContext) throws InvalidLabelResourceRequestException { RMContext rmContext) throws InvalidLabelResourceRequestException {
// check node label manager contains this label // check node label manager contains this label

View File

@ -22,9 +22,13 @@ import static java.lang.Thread.sleep;
import static org.apache.hadoop.yarn.conf.YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB; import static org.apache.hadoop.yarn.conf.YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB;
import static org.apache.hadoop.yarn.conf.YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES; import static org.apache.hadoop.yarn.conf.YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES;
import static org.junit.Assert.fail;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections;
import java.util.EnumSet; import java.util.EnumSet;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
@ -61,6 +65,7 @@ import org.apache.hadoop.yarn.exceptions.InvalidContainerReleaseException;
import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException; import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException;
import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes; import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes;
import org.apache.hadoop.yarn.resourcetypes.ResourceTypesTestHelper;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.server.resourcemanager.resource.TestResourceProfiles; import org.apache.hadoop.yarn.server.resourcemanager.resource.TestResourceProfiles;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
@ -75,6 +80,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.Capacity
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair
.FairSchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator; import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
@ -365,7 +373,7 @@ public class TestApplicationMasterService {
am2.addContainerToBeReleased(cId); am2.addContainerToBeReleased(cId);
try { try {
am2.schedule(); am2.schedule();
Assert.fail("Exception was expected!!"); fail("Exception was expected!!");
} catch (InvalidContainerReleaseException e) { } catch (InvalidContainerReleaseException e) {
StringBuilder sb = new StringBuilder("Cannot release container : "); StringBuilder sb = new StringBuilder("Cannot release container : ");
sb.append(cId.toString()); sb.append(cId.toString());
@ -460,7 +468,7 @@ public class TestApplicationMasterService {
FinalApplicationStatus.FAILED, "", ""); FinalApplicationStatus.FAILED, "", "");
try { try {
am1.unregisterAppAttempt(req, false); am1.unregisterAppAttempt(req, false);
Assert.fail("ApplicationMasterNotRegisteredException should be thrown"); fail("ApplicationMasterNotRegisteredException should be thrown");
} catch (ApplicationMasterNotRegisteredException e) { } catch (ApplicationMasterNotRegisteredException e) {
Assert.assertNotNull(e); Assert.assertNotNull(e);
Assert.assertNotNull(e.getMessage()); Assert.assertNotNull(e.getMessage());
@ -468,7 +476,7 @@ public class TestApplicationMasterService {
"Application Master is trying to unregister before registering for:" "Application Master is trying to unregister before registering for:"
)); ));
} catch (Exception e) { } catch (Exception e) {
Assert.fail("ApplicationMasterNotRegisteredException should be thrown"); fail("ApplicationMasterNotRegisteredException should be thrown");
} }
am1.registerAppAttempt(); am1.registerAppAttempt();
@ -627,11 +635,9 @@ public class TestApplicationMasterService {
Assert.assertEquals("UPDATE_OUTSTANDING_ERROR", Assert.assertEquals("UPDATE_OUTSTANDING_ERROR",
response.getUpdateErrors().get(0).getReason()); response.getUpdateErrors().get(0).getReason());
} finally { } finally {
if (rm != null) {
rm.close(); rm.close();
} }
} }
}
@Test(timeout = 300000) @Test(timeout = 300000)
public void testPriorityInAllocatedResponse() throws Exception { public void testPriorityInAllocatedResponse() throws Exception {
@ -709,33 +715,47 @@ public class TestApplicationMasterService {
ResourceUtils.initializeResourcesFromResourceInformationMap(riMap); ResourceUtils.initializeResourcesFromResourceInformationMap(riMap);
CapacitySchedulerConfiguration csconf = final YarnConfiguration yarnConf;
if (schedulerCls.getCanonicalName()
.equals(CapacityScheduler.class.getCanonicalName())) {
CapacitySchedulerConfiguration csConf =
new CapacitySchedulerConfiguration(); new CapacitySchedulerConfiguration();
csconf.setResourceComparator(DominantResourceCalculator.class); csConf.setResourceComparator(DominantResourceCalculator.class);
yarnConf = new YarnConfiguration(csConf);
} else if (schedulerCls.getCanonicalName()
.equals(FairScheduler.class.getCanonicalName())) {
FairSchedulerConfiguration fsConf = new FairSchedulerConfiguration();
yarnConf = new YarnConfiguration(fsConf);
} else {
throw new IllegalStateException(
"Scheduler class is of wrong type: " + schedulerCls);
}
YarnConfiguration conf = new YarnConfiguration(csconf);
// Don't reset resource types since we have already configured resource // Don't reset resource types since we have already configured resource
// types // types
conf.setBoolean(TestResourceProfiles.TEST_CONF_RESET_RESOURCE_TYPES, false); yarnConf.setBoolean(TestResourceProfiles.TEST_CONF_RESET_RESOURCE_TYPES,
conf.setClass(YarnConfiguration.RM_SCHEDULER, schedulerCls, false);
yarnConf.setClass(YarnConfiguration.RM_SCHEDULER, schedulerCls,
ResourceScheduler.class); ResourceScheduler.class);
conf.setBoolean(YarnConfiguration.RM_RESOURCE_PROFILES_ENABLED, false); yarnConf.setBoolean(YarnConfiguration.RM_RESOURCE_PROFILES_ENABLED, false);
MockRM rm = new MockRM(conf); MockRM rm = new MockRM(yarnConf);
rm.start(); rm.start();
MockNM nm1 = rm.registerNode("199.99.99.1:1234", TestUtils MockNM nm1 = rm.registerNode("199.99.99.1:1234", TestUtils
.createResource(DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, .createResource(DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES, null)); DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES, null));
RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "default"); RMApp app1 = rm.submitApp(GB, "app", "user", null, "default");
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1); MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1);
// Now request resource, memory > allowed // Now request resource, memory > allowed
boolean exception = false; boolean exception = false;
try { try {
am1.allocate(Arrays.asList(ResourceRequest.newBuilder().capability( am1.allocate(Collections.singletonList(ResourceRequest.newBuilder()
Resource.newInstance(9 * GB, 1)).numContainers(1).resourceName("*") .capability(Resource.newInstance(9 * GB, 1))
.numContainers(1)
.resourceName("*")
.build()), null); .build()), null);
} catch (InvalidResourceRequestException e) { } catch (InvalidResourceRequestException e) {
exception = true; exception = true;
@ -744,9 +764,11 @@ public class TestApplicationMasterService {
exception = false; exception = false;
try { try {
// Now request resource, vcore > allowed // Now request resource, vcores > allowed
am1.allocate(Arrays.asList(ResourceRequest.newBuilder().capability( am1.allocate(Collections.singletonList(ResourceRequest.newBuilder()
Resource.newInstance(8 * GB, 18)).numContainers(1).resourceName("*") .capability(Resource.newInstance(8 * GB, 18))
.numContainers(1)
.resourceName("*")
.build()), null); .build()), null);
} catch (InvalidResourceRequestException e) { } catch (InvalidResourceRequestException e) {
exception = true; exception = true;
@ -756,6 +778,73 @@ public class TestApplicationMasterService {
rm.close(); rm.close();
} }
@Test
public void testValidateRequestCapacityAgainstMinMaxAllocationWithDifferentUnits()
throws Exception {
// Initialize resource map for 2 types.
Map<String, ResourceInformation> riMap = new HashMap<>();
// Initialize mandatory resources
ResourceInformation memory =
ResourceInformation.newInstance(ResourceInformation.MEMORY_MB.getName(),
ResourceInformation.MEMORY_MB.getUnits(),
YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB);
ResourceInformation vcores =
ResourceInformation.newInstance(ResourceInformation.VCORES.getName(),
ResourceInformation.VCORES.getUnits(),
YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES,
DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES);
ResourceInformation res1 =
ResourceInformation.newInstance("res_1", "G", 0, 4);
riMap.put(ResourceInformation.MEMORY_URI, memory);
riMap.put(ResourceInformation.VCORES_URI, vcores);
riMap.put("res_1", res1);
ResourceUtils.initializeResourcesFromResourceInformationMap(riMap);
FairSchedulerConfiguration fsConf =
new FairSchedulerConfiguration();
YarnConfiguration yarnConf = new YarnConfiguration(fsConf);
// Don't reset resource types since we have already configured resource
// types
yarnConf.setBoolean(TestResourceProfiles.TEST_CONF_RESET_RESOURCE_TYPES,
false);
yarnConf.setClass(YarnConfiguration.RM_SCHEDULER, FairScheduler.class,
ResourceScheduler.class);
yarnConf.setBoolean(YarnConfiguration.RM_RESOURCE_PROFILES_ENABLED, false);
MockRM rm = new MockRM(yarnConf);
rm.start();
MockNM nm1 = rm.registerNode("199.99.99.1:1234",
ResourceTypesTestHelper.newResource(
DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES,
ImmutableMap.<String, String> builder()
.put("res_1", "5G").build()));
RMApp app1 = rm.submitApp(GB, "app", "user", null, "default");
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1);
// Now request res_1, 500M < 5G so it should be allowed
try {
am1.allocate(Collections.singletonList(ResourceRequest.newBuilder()
.capability(ResourceTypesTestHelper.newResource(4 * GB, 1,
ImmutableMap.<String, String> builder()
.put("res_1", "500M")
.build()))
.numContainers(1).resourceName("*").build()), null);
} catch (InvalidResourceRequestException e) {
fail(
"Allocate request should be accepted but exception was thrown: " + e);
}
rm.close();
}
@Test(timeout = 300000) @Test(timeout = 300000)
public void testValidateRequestCapacityAgainstMinMaxAllocationFor3rdResourceTypes() public void testValidateRequestCapacityAgainstMinMaxAllocationFor3rdResourceTypes()
throws Exception { throws Exception {
@ -774,11 +863,11 @@ public class TestApplicationMasterService {
ResourceInformation.VCORES.getUnits(), ResourceInformation.VCORES.getUnits(),
YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES, YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES,
DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES); DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES);
ResourceInformation res_1 = ResourceInformation.newInstance("res_1", ResourceInformation res1 = ResourceInformation.newInstance("res_1",
ResourceInformation.VCORES.getUnits(), 0, 4); ResourceInformation.VCORES.getUnits(), 0, 4);
riMap.put(ResourceInformation.MEMORY_URI, memory); riMap.put(ResourceInformation.MEMORY_URI, memory);
riMap.put(ResourceInformation.VCORES_URI, vcores); riMap.put(ResourceInformation.VCORES_URI, vcores);
riMap.put("res_1", res_1); riMap.put("res_1", res1);
ResourceUtils.initializeResourcesFromResourceInformationMap(riMap); ResourceUtils.initializeResourcesFromResourceInformationMap(riMap);
@ -786,15 +875,16 @@ public class TestApplicationMasterService {
new CapacitySchedulerConfiguration(); new CapacitySchedulerConfiguration();
csconf.setResourceComparator(DominantResourceCalculator.class); csconf.setResourceComparator(DominantResourceCalculator.class);
YarnConfiguration conf = new YarnConfiguration(csconf); YarnConfiguration yarnConf = new YarnConfiguration(csconf);
// Don't reset resource types since we have already configured resource // Don't reset resource types since we have already configured resource
// types // types
conf.setBoolean(TestResourceProfiles.TEST_CONF_RESET_RESOURCE_TYPES, false); yarnConf.setBoolean(TestResourceProfiles.TEST_CONF_RESET_RESOURCE_TYPES,
conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, false);
yarnConf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
ResourceScheduler.class); ResourceScheduler.class);
conf.setBoolean(YarnConfiguration.RM_RESOURCE_PROFILES_ENABLED, false); yarnConf.setBoolean(YarnConfiguration.RM_RESOURCE_PROFILES_ENABLED, false);
MockRM rm = new MockRM(conf); MockRM rm = new MockRM(yarnConf);
rm.start(); rm.start();
CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler(); CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
@ -805,18 +895,21 @@ public class TestApplicationMasterService {
DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES, DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES,
ImmutableMap.of("res_1", 4))); ImmutableMap.of("res_1", 4)));
RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "default"); RMApp app1 = rm.submitApp(GB, "app", "user", null, "default");
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1); MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1);
Assert.assertEquals(Resource.newInstance(1 * GB, 1), Assert.assertEquals(Resource.newInstance(GB, 1),
leafQueue.getUsedResources()); leafQueue.getUsedResources());
// Now request resource, memory > allowed // Now request resource, memory > allowed
boolean exception = false; boolean exception = false;
try { try {
am1.allocate(Arrays.asList(ResourceRequest.newBuilder().capability( am1.allocate(Collections.singletonList(ResourceRequest.newBuilder()
TestUtils.createResource(9 * GB, 1, ImmutableMap.of("res_1", 1))) .capability(TestUtils.createResource(9 * GB, 1,
.numContainers(1).resourceName("*").build()), null); ImmutableMap.of("res_1", 1)))
.numContainers(1)
.resourceName("*")
.build()), null);
} catch (InvalidResourceRequestException e) { } catch (InvalidResourceRequestException e) {
exception = true; exception = true;
} }
@ -824,10 +917,12 @@ public class TestApplicationMasterService {
exception = false; exception = false;
try { try {
// Now request resource, vcore > allowed // Now request resource, vcores > allowed
am1.allocate(Arrays.asList(ResourceRequest.newBuilder().capability( am1.allocate(Collections.singletonList(ResourceRequest.newBuilder()
.capability(
TestUtils.createResource(8 * GB, 18, ImmutableMap.of("res_1", 1))) TestUtils.createResource(8 * GB, 18, ImmutableMap.of("res_1", 1)))
.numContainers(1).resourceName("*") .numContainers(1)
.resourceName("*")
.build()), null); .build()), null);
} catch (InvalidResourceRequestException e) { } catch (InvalidResourceRequestException e) {
exception = true; exception = true;
@ -837,9 +932,11 @@ public class TestApplicationMasterService {
exception = false; exception = false;
try { try {
// Now request resource, res_1 > allowed // Now request resource, res_1 > allowed
am1.allocate(Arrays.asList(ResourceRequest.newBuilder().capability( am1.allocate(Collections.singletonList(ResourceRequest.newBuilder()
TestUtils.createResource(8 * GB, 1, ImmutableMap.of("res_1", 100))) .capability(TestUtils.createResource(8 * GB, 1,
.numContainers(1).resourceName("*") ImmutableMap.of("res_1", 100)))
.numContainers(1)
.resourceName("*")
.build()), null); .build()), null);
} catch (InvalidResourceRequestException e) { } catch (InvalidResourceRequestException e) {
exception = true; exception = true;
@ -856,7 +953,7 @@ public class TestApplicationMasterService {
rmContainer.handle( rmContainer.handle(
new RMContainerEvent(containerId, RMContainerEventType.LAUNCHED)); new RMContainerEvent(containerId, RMContainerEventType.LAUNCHED));
} else { } else {
Assert.fail("Cannot find RMContainer"); fail("Cannot find RMContainer");
} }
} }
} }

View File

@ -26,7 +26,9 @@ import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
import java.io.ByteArrayInputStream;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.security.PrivilegedAction; import java.security.PrivilegedAction;
import java.util.Arrays; import java.util.Arrays;
@ -35,6 +37,7 @@ import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import com.google.common.collect.ImmutableMap;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
@ -42,6 +45,7 @@ import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.yarn.LocalConfigurationProvider;
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
@ -63,8 +67,10 @@ import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions.InvalidLabelResourceRequestException; import org.apache.hadoop.yarn.exceptions.InvalidLabelResourceRequestException;
import org.apache.hadoop.yarn.exceptions.InvalidResourceBlacklistRequestException; import org.apache.hadoop.yarn.exceptions.InvalidResourceBlacklistRequestException;
import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException; import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.ipc.YarnRPC; import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager; import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
import org.apache.hadoop.yarn.resourcetypes.ResourceTypesTestHelper;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM; import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.TestAMAuthorization.MockRMWithAMS; import org.apache.hadoop.yarn.server.resourcemanager.TestAMAuthorization.MockRMWithAMS;
@ -83,20 +89,79 @@ import org.apache.hadoop.yarn.util.Records;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator; import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
import org.apache.hadoop.yarn.util.resource.Resources; import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test; import org.junit.Test;
import com.google.common.collect.ImmutableSet; import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets; import com.google.common.collect.Sets;
import org.junit.rules.ExpectedException;
public class TestSchedulerUtils { public class TestSchedulerUtils {
private static final Log LOG = LogFactory.getLog(TestSchedulerUtils.class); private static final Log LOG = LogFactory.getLog(TestSchedulerUtils.class);
private static Resource configuredMaxAllocation;
private static class CustomResourceTypesConfigurationProvider
extends LocalConfigurationProvider {
@Override
public InputStream getConfigurationInputStream(Configuration bootstrapConf,
String name) throws YarnException, IOException {
if (YarnConfiguration.RESOURCE_TYPES_CONFIGURATION_FILE.equals(name)) {
return new ByteArrayInputStream(
("<configuration>\n" +
" <property>\n" +
" <name>yarn.resource-types</name>\n" +
" <value>custom-resource-1," +
"custom-resource-2,custom-resource-3</value>\n" +
" </property>\n" +
" <property>\n" +
" <name>yarn.resource-types" +
".custom-resource-1.units</name>\n" +
" <value>G</value>\n" +
" </property>\n" +
" <property>\n" +
" <name>yarn.resource-types" +
".custom-resource-2.units</name>\n" +
" <value>G</value>\n" +
" </property>\n" +
"</configuration>\n").getBytes());
} else {
return super.getConfigurationInputStream(bootstrapConf, name);
}
}
}
private RMContext rmContext = getMockRMContext(); private RMContext rmContext = getMockRMContext();
private static YarnConfiguration conf = new YarnConfiguration(); private static YarnConfiguration conf = new YarnConfiguration();
@Rule
public ExpectedException exception = ExpectedException.none();
private void initResourceTypes() {
Configuration yarnConf = new Configuration();
yarnConf.set(YarnConfiguration.RM_CONFIGURATION_PROVIDER_CLASS,
CustomResourceTypesConfigurationProvider.class.getName());
ResourceUtils.resetResourceTypes(yarnConf);
}
@Before
public void setUp() {
initResourceTypes();
//this needs to be initialized after initResourceTypes is called
configuredMaxAllocation = Resource.newInstance(8192, 4,
ImmutableMap.<String,
Long>builder()
.put("custom-resource-1", Long.MAX_VALUE)
.put("custom-resource-2", Long.MAX_VALUE)
.put("custom-resource-3", Long.MAX_VALUE)
.build());
}
@Test (timeout = 30000) @Test (timeout = 30000)
public void testNormalizeRequest() { public void testNormalizeRequest() {
ResourceCalculator resourceCalculator = new DefaultResourceCalculator(); ResourceCalculator resourceCalculator = new DefaultResourceCalculator();
@ -150,14 +215,16 @@ public class TestSchedulerUtils {
// multiple of minMemory > maxMemory, then reduce to maxMemory // multiple of minMemory > maxMemory, then reduce to maxMemory
SchedulerUtils.normalizeRequest(ask, resourceCalculator, minResource, SchedulerUtils.normalizeRequest(ask, resourceCalculator, minResource,
maxResource); maxResource);
assertEquals(maxResource.getMemorySize(), ask.getCapability().getMemorySize()); assertEquals(maxResource.getMemorySize(),
ask.getCapability().getMemorySize());
// ask is more than max // ask is more than max
maxResource = Resources.createResource(maxMemory, 0); maxResource = Resources.createResource(maxMemory, 0);
ask.setCapability(Resources.createResource(maxMemory + 100)); ask.setCapability(Resources.createResource(maxMemory + 100));
SchedulerUtils.normalizeRequest(ask, resourceCalculator, minResource, SchedulerUtils.normalizeRequest(ask, resourceCalculator, minResource,
maxResource); maxResource);
assertEquals(maxResource.getMemorySize(), ask.getCapability().getMemorySize()); assertEquals(maxResource.getMemorySize(),
ask.getCapability().getMemorySize());
} }
@Test (timeout = 30000) @Test (timeout = 30000)
@ -201,7 +268,8 @@ public class TestSchedulerUtils {
Set<String> queueAccessibleNodeLabels = Sets.newHashSet(); Set<String> queueAccessibleNodeLabels = Sets.newHashSet();
QueueInfo queueInfo = mock(QueueInfo.class); QueueInfo queueInfo = mock(QueueInfo.class);
when(queueInfo.getQueueName()).thenReturn("queue"); when(queueInfo.getQueueName()).thenReturn("queue");
when(queueInfo.getAccessibleNodeLabels()).thenReturn(queueAccessibleNodeLabels); when(queueInfo.getAccessibleNodeLabels())
.thenReturn(queueAccessibleNodeLabels);
when(scheduler.getQueueInfo(any(String.class), anyBoolean(), anyBoolean())) when(scheduler.getQueueInfo(any(String.class), anyBoolean(), anyBoolean()))
.thenReturn(queueInfo); .thenReturn(queueInfo);
@ -363,7 +431,7 @@ public class TestSchedulerUtils {
rmContext.getNodeLabelManager().removeFromClusterNodeLabels( rmContext.getNodeLabelManager().removeFromClusterNodeLabels(
Arrays.asList("x")); Arrays.asList("x"));
} }
Assert.assertTrue("InvalidLabelResourceRequestException excpeted", Assert.assertTrue("InvalidLabelResourceRequestException expected",
invalidlabelexception); invalidlabelexception);
// queue is "*", always succeeded // queue is "*", always succeeded
try { try {
@ -610,9 +678,7 @@ public class TestSchedulerUtils {
// more than max vcores // more than max vcores
try { try {
Resource resource = Resource resource = Resources.createResource(
Resources
.createResource(
YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB, YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES + 1); YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES + 1);
ResourceRequest resReq = ResourceRequest resReq =
@ -648,8 +714,8 @@ public class TestSchedulerUtils {
waitForLaunchedState(attempt); waitForLaunchedState(attempt);
// Create a client to the RM. // Create a client to the RM.
final Configuration conf = rm.getConfig(); final Configuration yarnConf = rm.getConfig();
final YarnRPC rpc = YarnRPC.create(conf); final YarnRPC rpc = YarnRPC.create(yarnConf);
UserGroupInformation currentUser = UserGroupInformation currentUser =
UserGroupInformation.createRemoteUser(applicationAttemptId.toString()); UserGroupInformation.createRemoteUser(applicationAttemptId.toString());
@ -665,7 +731,7 @@ public class TestSchedulerUtils {
@Override @Override
public ApplicationMasterProtocol run() { public ApplicationMasterProtocol run() {
return (ApplicationMasterProtocol) rpc.getProxy( return (ApplicationMasterProtocol) rpc.getProxy(
ApplicationMasterProtocol.class, rmBindAddress, conf); ApplicationMasterProtocol.class, rmBindAddress, yarnConf);
} }
}); });
@ -775,6 +841,127 @@ public class TestSchedulerUtils {
} }
} }
@Test
public void testCustomResourceRequestedUnitIsSmallerThanAvailableUnit()
throws InvalidResourceRequestException {
Resource requestedResource =
ResourceTypesTestHelper.newResource(1, 1,
ImmutableMap.of("custom-resource-1", "11"));
Resource availableResource =
ResourceTypesTestHelper.newResource(1, 1,
ImmutableMap.of("custom-resource-1", "0G"));
exception.expect(InvalidResourceRequestException.class);
exception.expectMessage(InvalidResourceRequestExceptionMessageGenerator
.create().withRequestedResourceType("custom-resource-1")
.withRequestedResource(requestedResource)
.withAvailableAllocation(availableResource)
.withMaxAllocation(configuredMaxAllocation).build());
SchedulerUtils.checkResourceRequestAgainstAvailableResource(
requestedResource, availableResource);
}
@Test
public void testCustomResourceRequestedUnitIsSmallerThanAvailableUnit2() {
Resource requestedResource =
ResourceTypesTestHelper.newResource(1, 1,
ImmutableMap.of("custom-resource-1", "11"));
Resource availableResource =
ResourceTypesTestHelper.newResource(1, 1,
ImmutableMap.of("custom-resource-1", "1G"));
try {
SchedulerUtils.checkResourceRequestAgainstAvailableResource(
requestedResource, availableResource);
} catch (InvalidResourceRequestException e) {
fail(String.format(
"Resource request should be accepted. Requested: %s, available: %s",
requestedResource, availableResource));
}
}
@Test
public void testCustomResourceRequestedUnitIsGreaterThanAvailableUnit()
throws InvalidResourceRequestException {
Resource requestedResource =
ResourceTypesTestHelper.newResource(1, 1,
ImmutableMap.of("custom-resource-1", "1M"));
Resource availableResource = ResourceTypesTestHelper.newResource(1, 1,
ImmutableMap.<String, String> builder().put("custom-resource-1", "120k")
.build());
exception.expect(InvalidResourceRequestException.class);
exception.expectMessage(InvalidResourceRequestExceptionMessageGenerator
.create().withRequestedResourceType("custom-resource-1")
.withRequestedResource(requestedResource)
.withAvailableAllocation(availableResource)
.withMaxAllocation(configuredMaxAllocation).build());
SchedulerUtils.checkResourceRequestAgainstAvailableResource(
requestedResource, availableResource);
}
@Test
public void testCustomResourceRequestedUnitIsGreaterThanAvailableUnit2() {
Resource requestedResource = ResourceTypesTestHelper.newResource(1, 1,
ImmutableMap.<String, String> builder().put("custom-resource-1", "11M")
.build());
Resource availableResource =
ResourceTypesTestHelper.newResource(1, 1,
ImmutableMap.of("custom-resource-1", "1G"));
try {
SchedulerUtils.checkResourceRequestAgainstAvailableResource(
requestedResource, availableResource);
} catch (InvalidResourceRequestException e) {
fail(String.format(
"Resource request should be accepted. Requested: %s, available: %s",
requestedResource, availableResource));
}
}
@Test
public void testCustomResourceRequestedUnitIsSameAsAvailableUnit() {
Resource requestedResource = ResourceTypesTestHelper.newResource(1, 1,
ImmutableMap.of("custom-resource-1", "11M"));
Resource availableResource = ResourceTypesTestHelper.newResource(1, 1,
ImmutableMap.of("custom-resource-1", "100M"));
try {
SchedulerUtils.checkResourceRequestAgainstAvailableResource(
requestedResource, availableResource);
} catch (InvalidResourceRequestException e) {
fail(String.format(
"Resource request should be accepted. Requested: %s, available: %s",
requestedResource, availableResource));
}
}
@Test
public void testCustomResourceRequestedUnitIsSameAsAvailableUnit2()
throws InvalidResourceRequestException {
Resource requestedResource = ResourceTypesTestHelper.newResource(1, 1,
ImmutableMap.of("custom-resource-1", "110M"));
Resource availableResource = ResourceTypesTestHelper.newResource(1, 1,
ImmutableMap.of("custom-resource-1", "100M"));
exception.expect(InvalidResourceRequestException.class);
exception.expectMessage(InvalidResourceRequestExceptionMessageGenerator
.create().withRequestedResourceType("custom-resource-1")
.withRequestedResource(requestedResource)
.withAvailableAllocation(availableResource)
.withMaxAllocation(configuredMaxAllocation).build());
SchedulerUtils.checkResourceRequestAgainstAvailableResource(
requestedResource, availableResource);
}
public static void waitSchedulerApplicationAttemptStopped( public static void waitSchedulerApplicationAttemptStopped(
AbstractYarnScheduler ys, AbstractYarnScheduler ys,
ApplicationAttemptId attemptId) throws InterruptedException { ApplicationAttemptId attemptId) throws InterruptedException {
@ -801,8 +988,7 @@ public class TestSchedulerUtils {
public static SchedulerApplication<SchedulerApplicationAttempt> public static SchedulerApplication<SchedulerApplicationAttempt>
verifyAppAddedAndRemovedFromScheduler( verifyAppAddedAndRemovedFromScheduler(
Map<ApplicationId, SchedulerApplication<SchedulerApplicationAttempt>> applications, Map<ApplicationId, SchedulerApplication<SchedulerApplicationAttempt>> applications,
EventHandler<SchedulerEvent> handler, String queueName) EventHandler<SchedulerEvent> handler, String queueName) {
throws Exception {
ApplicationId appId = ApplicationId appId =
ApplicationId.newInstance(System.currentTimeMillis(), 1); ApplicationId.newInstance(System.currentTimeMillis(), 1);
@ -832,4 +1018,60 @@ public class TestSchedulerUtils {
when(rmContext.getNodeLabelManager()).thenReturn(nlm); when(rmContext.getNodeLabelManager()).thenReturn(nlm);
return rmContext; return rmContext;
} }
private static class InvalidResourceRequestExceptionMessageGenerator {
private StringBuilder sb;
private Resource requestedResource;
private Resource availableAllocation;
private Resource configuredMaxAllowedAllocation;
private String resourceType;
InvalidResourceRequestExceptionMessageGenerator(StringBuilder sb) {
this.sb = sb;
}
public static InvalidResourceRequestExceptionMessageGenerator create() {
return new InvalidResourceRequestExceptionMessageGenerator(
new StringBuilder());
}
InvalidResourceRequestExceptionMessageGenerator withRequestedResource(
Resource r) {
this.requestedResource = r;
return this;
}
InvalidResourceRequestExceptionMessageGenerator withRequestedResourceType(
String rt) {
this.resourceType = rt;
return this;
}
InvalidResourceRequestExceptionMessageGenerator withAvailableAllocation(
Resource r) {
this.availableAllocation = r;
return this;
}
InvalidResourceRequestExceptionMessageGenerator withMaxAllocation(
Resource r) {
this.configuredMaxAllowedAllocation = r;
return this;
}
public String build() {
return sb
.append("Invalid resource request, requested resource type=[")
.append(resourceType).append("]")
.append(" < 0 or greater than maximum allowed allocation. ")
.append("Requested resource=").append(requestedResource).append(", ")
.append("maximum allowed allocation=").append(availableAllocation)
.append(", please note that maximum allowed allocation is calculated "
+ "by scheduler based on maximum resource of " +
"registered NodeManagers, which might be less than " +
"configured maximum allocation=")
.append(configuredMaxAllowedAllocation).toString();
}
}
} }