YARN-6406. Remove SchedulerRequestKeys when no more pending ResourceRequest. (Arun Suresh via wangda)
(cherry picked from commit 87e2ef8c98
)
This commit is contained in:
parent
d4f553d42f
commit
6ade9e6ce6
|
@ -25,12 +25,8 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
import org.apache.hadoop.yarn.api.records.Container;
|
import org.apache.hadoop.yarn.api.records.Container;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
|
||||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
|
||||||
import org.apache.hadoop.yarn.api.records.Resource;
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils;
|
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
|
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
|
||||||
|
@ -51,9 +47,8 @@ import java.util.HashSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.TreeMap;
|
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.concurrent.ConcurrentSkipListMap;
|
import java.util.concurrent.ConcurrentSkipListSet;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
import java.util.concurrent.atomic.AtomicLong;
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||||
|
@ -86,8 +81,8 @@ public class AppSchedulingInfo {
|
||||||
|
|
||||||
private Set<String> requestedPartitions = new HashSet<>();
|
private Set<String> requestedPartitions = new HashSet<>();
|
||||||
|
|
||||||
private final ConcurrentSkipListMap<SchedulerRequestKey, Integer>
|
private final ConcurrentSkipListSet<SchedulerRequestKey>
|
||||||
schedulerKeys = new ConcurrentSkipListMap<>();
|
schedulerKeys = new ConcurrentSkipListSet<>();
|
||||||
final Map<SchedulerRequestKey, SchedulingPlacementSet<SchedulerNode>>
|
final Map<SchedulerRequestKey, SchedulingPlacementSet<SchedulerNode>>
|
||||||
schedulerKeyToPlacementSets = new ConcurrentHashMap<>();
|
schedulerKeyToPlacementSets = new ConcurrentHashMap<>();
|
||||||
|
|
||||||
|
@ -156,29 +151,6 @@ public class AppSchedulingInfo {
|
||||||
LOG.info("Application " + applicationId + " requests cleared");
|
LOG.info("Application " + applicationId + " requests cleared");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
private void incrementSchedulerKeyReference(
|
|
||||||
SchedulerRequestKey schedulerKey) {
|
|
||||||
Integer schedulerKeyCount = schedulerKeys.get(schedulerKey);
|
|
||||||
if (schedulerKeyCount == null) {
|
|
||||||
schedulerKeys.put(schedulerKey, 1);
|
|
||||||
} else {
|
|
||||||
schedulerKeys.put(schedulerKey, schedulerKeyCount + 1);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public void decrementSchedulerKeyReference(
|
|
||||||
SchedulerRequestKey schedulerKey) {
|
|
||||||
Integer schedulerKeyCount = schedulerKeys.get(schedulerKey);
|
|
||||||
if (schedulerKeyCount != null) {
|
|
||||||
if (schedulerKeyCount > 1) {
|
|
||||||
schedulerKeys.put(schedulerKey, schedulerKeyCount - 1);
|
|
||||||
} else {
|
|
||||||
schedulerKeys.remove(schedulerKey);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public ContainerUpdateContext getUpdateContext() {
|
public ContainerUpdateContext getUpdateContext() {
|
||||||
return updateContext;
|
return updateContext;
|
||||||
}
|
}
|
||||||
|
@ -231,6 +203,10 @@ public class AppSchedulingInfo {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void removePlacementSets(SchedulerRequestKey schedulerRequestKey) {
|
||||||
|
schedulerKeyToPlacementSets.remove(schedulerRequestKey);
|
||||||
|
}
|
||||||
|
|
||||||
boolean addToPlacementSets(
|
boolean addToPlacementSets(
|
||||||
boolean recoverPreemptedRequestForAContainer,
|
boolean recoverPreemptedRequestForAContainer,
|
||||||
Map<SchedulerRequestKey, Map<String, ResourceRequest>> dedupRequests) {
|
Map<SchedulerRequestKey, Map<String, ResourceRequest>> dedupRequests) {
|
||||||
|
@ -269,7 +245,8 @@ public class AppSchedulingInfo {
|
||||||
(lastRequest != null) ? lastRequest.getNumContainers() : 0;
|
(lastRequest != null) ? lastRequest.getNumContainers() : 0;
|
||||||
if (request.getNumContainers() <= 0) {
|
if (request.getNumContainers() <= 0) {
|
||||||
if (lastRequestContainers >= 0) {
|
if (lastRequestContainers >= 0) {
|
||||||
decrementSchedulerKeyReference(schedulerKey);
|
schedulerKeys.remove(schedulerKey);
|
||||||
|
schedulerKeyToPlacementSets.remove(schedulerKey);
|
||||||
}
|
}
|
||||||
LOG.info("checking for deactivate of application :"
|
LOG.info("checking for deactivate of application :"
|
||||||
+ this.applicationId);
|
+ this.applicationId);
|
||||||
|
@ -277,7 +254,7 @@ public class AppSchedulingInfo {
|
||||||
} else {
|
} else {
|
||||||
// Activate application. Metrics activation is done here.
|
// Activate application. Metrics activation is done here.
|
||||||
if (lastRequestContainers <= 0) {
|
if (lastRequestContainers <= 0) {
|
||||||
incrementSchedulerKeyReference(schedulerKey);
|
schedulerKeys.add(schedulerKey);
|
||||||
activeUsersManager.activateApplication(user, applicationId);
|
activeUsersManager.activateApplication(user, applicationId);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -367,7 +344,7 @@ public class AppSchedulingInfo {
|
||||||
}
|
}
|
||||||
|
|
||||||
public Collection<SchedulerRequestKey> getSchedulerKeys() {
|
public Collection<SchedulerRequestKey> getSchedulerKeys() {
|
||||||
return schedulerKeys.keySet();
|
return schedulerKeys;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -390,7 +367,7 @@ public class AppSchedulingInfo {
|
||||||
public PendingAsk getNextPendingAsk() {
|
public PendingAsk getNextPendingAsk() {
|
||||||
try {
|
try {
|
||||||
readLock.lock();
|
readLock.lock();
|
||||||
SchedulerRequestKey firstRequestKey = schedulerKeys.firstKey();
|
SchedulerRequestKey firstRequestKey = schedulerKeys.first();
|
||||||
return getPendingAsk(firstRequestKey, ResourceRequest.ANY);
|
return getPendingAsk(firstRequestKey, ResourceRequest.ANY);
|
||||||
} finally {
|
} finally {
|
||||||
readLock.unlock();
|
readLock.unlock();
|
||||||
|
@ -409,7 +386,7 @@ public class AppSchedulingInfo {
|
||||||
@Unstable
|
@Unstable
|
||||||
public synchronized ResourceRequest getNextResourceRequest() {
|
public synchronized ResourceRequest getNextResourceRequest() {
|
||||||
SchedulingPlacementSet<SchedulerNode> ps = schedulerKeyToPlacementSets.get(
|
SchedulingPlacementSet<SchedulerNode> ps = schedulerKeyToPlacementSets.get(
|
||||||
schedulerKeys.firstKey());
|
schedulerKeys.first());
|
||||||
if (null != ps) {
|
if (null != ps) {
|
||||||
for (ResourceRequest rr : ps.getResourceRequests().values()) {
|
for (ResourceRequest rr : ps.getResourceRequests().values()) {
|
||||||
return rr;
|
return rr;
|
||||||
|
|
|
@ -204,15 +204,17 @@ public class LocalitySchedulingPlacementSet<N extends SchedulerNode>
|
||||||
private void decrementOutstanding(SchedulerRequestKey schedulerRequestKey,
|
private void decrementOutstanding(SchedulerRequestKey schedulerRequestKey,
|
||||||
ResourceRequest offSwitchRequest) {
|
ResourceRequest offSwitchRequest) {
|
||||||
int numOffSwitchContainers = offSwitchRequest.getNumContainers() - 1;
|
int numOffSwitchContainers = offSwitchRequest.getNumContainers() - 1;
|
||||||
|
|
||||||
// Do not remove ANY
|
|
||||||
offSwitchRequest.setNumContainers(numOffSwitchContainers);
|
offSwitchRequest.setNumContainers(numOffSwitchContainers);
|
||||||
|
|
||||||
// Do we have any outstanding requests?
|
// Do we have any outstanding requests?
|
||||||
// If there is nothing, we need to deactivate this application
|
// If there is nothing, we need to deactivate this application
|
||||||
if (numOffSwitchContainers == 0) {
|
if (numOffSwitchContainers == 0) {
|
||||||
appSchedulingInfo.decrementSchedulerKeyReference(schedulerRequestKey);
|
appSchedulingInfo.getSchedulerKeys().remove(schedulerRequestKey);
|
||||||
appSchedulingInfo.checkForDeactivation();
|
appSchedulingInfo.checkForDeactivation();
|
||||||
|
resourceRequestMap.remove(ResourceRequest.ANY);
|
||||||
|
if (resourceRequestMap.isEmpty()) {
|
||||||
|
appSchedulingInfo.removePlacementSets(schedulerRequestKey);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
appSchedulingInfo.decPendingResource(
|
appSchedulingInfo.decPendingResource(
|
||||||
|
|
|
@ -47,6 +47,7 @@ import org.apache.hadoop.yarn.api.records.ContainerState;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerUpdateType;
|
import org.apache.hadoop.yarn.api.records.ContainerUpdateType;
|
||||||
import org.apache.hadoop.yarn.api.records.ExecutionType;
|
import org.apache.hadoop.yarn.api.records.ExecutionType;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
|
||||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||||
import org.apache.hadoop.yarn.api.records.NodeState;
|
import org.apache.hadoop.yarn.api.records.NodeState;
|
||||||
import org.apache.hadoop.yarn.api.records.Priority;
|
import org.apache.hadoop.yarn.api.records.Priority;
|
||||||
|
@ -145,6 +146,7 @@ import java.net.InetSocketAddress;
|
||||||
import java.security.PrivilegedAction;
|
import java.security.PrivilegedAction;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
|
import java.util.Collection;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
@ -2946,6 +2948,164 @@ public class TestCapacityScheduler {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSchedulerKeyGarbageCollection() throws Exception {
|
||||||
|
YarnConfiguration conf =
|
||||||
|
new YarnConfiguration(new CapacitySchedulerConfiguration());
|
||||||
|
conf.setBoolean(CapacitySchedulerConfiguration.ENABLE_USER_METRICS, true);
|
||||||
|
|
||||||
|
MemoryRMStateStore memStore = new MemoryRMStateStore();
|
||||||
|
memStore.init(conf);
|
||||||
|
MockRM rm = new MockRM(conf, memStore);
|
||||||
|
rm.start();
|
||||||
|
|
||||||
|
HashMap<NodeId, MockNM> nodes = new HashMap<>();
|
||||||
|
MockNM nm1 = new MockNM("h1:1234", 4096, rm.getResourceTrackerService());
|
||||||
|
nodes.put(nm1.getNodeId(), nm1);
|
||||||
|
MockNM nm2 = new MockNM("h2:1234", 4096, rm.getResourceTrackerService());
|
||||||
|
nodes.put(nm2.getNodeId(), nm2);
|
||||||
|
MockNM nm3 = new MockNM("h3:1234", 4096, rm.getResourceTrackerService());
|
||||||
|
nodes.put(nm3.getNodeId(), nm3);
|
||||||
|
MockNM nm4 = new MockNM("h4:1234", 4096, rm.getResourceTrackerService());
|
||||||
|
nodes.put(nm4.getNodeId(), nm4);
|
||||||
|
nm1.registerNode();
|
||||||
|
nm2.registerNode();
|
||||||
|
nm3.registerNode();
|
||||||
|
nm4.registerNode();
|
||||||
|
|
||||||
|
RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "default");
|
||||||
|
ApplicationAttemptId attemptId =
|
||||||
|
app1.getCurrentAppAttempt().getAppAttemptId();
|
||||||
|
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm2);
|
||||||
|
ResourceScheduler scheduler = rm.getResourceScheduler();
|
||||||
|
|
||||||
|
// All nodes 1 - 4 will be applicable for scheduling.
|
||||||
|
nm1.nodeHeartbeat(true);
|
||||||
|
nm2.nodeHeartbeat(true);
|
||||||
|
nm3.nodeHeartbeat(true);
|
||||||
|
nm4.nodeHeartbeat(true);
|
||||||
|
|
||||||
|
Thread.sleep(1000);
|
||||||
|
|
||||||
|
AllocateResponse allocateResponse = am1.allocate(
|
||||||
|
Arrays.asList(
|
||||||
|
newResourceRequest(1, 1, ResourceRequest.ANY,
|
||||||
|
Resources.createResource(3 * GB), 1, true,
|
||||||
|
ExecutionType.GUARANTEED),
|
||||||
|
newResourceRequest(2, 2, ResourceRequest.ANY,
|
||||||
|
Resources.createResource(3 * GB), 1, true,
|
||||||
|
ExecutionType.GUARANTEED),
|
||||||
|
newResourceRequest(3, 3, ResourceRequest.ANY,
|
||||||
|
Resources.createResource(3 * GB), 1, true,
|
||||||
|
ExecutionType.GUARANTEED),
|
||||||
|
newResourceRequest(4, 4, ResourceRequest.ANY,
|
||||||
|
Resources.createResource(3 * GB), 1, true,
|
||||||
|
ExecutionType.GUARANTEED)
|
||||||
|
),
|
||||||
|
null);
|
||||||
|
List<Container> allocatedContainers = allocateResponse
|
||||||
|
.getAllocatedContainers();
|
||||||
|
Assert.assertEquals(0, allocatedContainers.size());
|
||||||
|
|
||||||
|
Collection<SchedulerRequestKey> schedulerKeys =
|
||||||
|
((CapacityScheduler) scheduler).getApplicationAttempt(attemptId)
|
||||||
|
.getAppSchedulingInfo().getSchedulerKeys();
|
||||||
|
Assert.assertEquals(4, schedulerKeys.size());
|
||||||
|
|
||||||
|
// Get a Node to HB... at which point 1 container should be
|
||||||
|
// allocated
|
||||||
|
nm1.nodeHeartbeat(true);
|
||||||
|
Thread.sleep(200);
|
||||||
|
allocateResponse = am1.allocate(new ArrayList<ResourceRequest>(),
|
||||||
|
new ArrayList<ContainerId>());
|
||||||
|
allocatedContainers = allocateResponse.getAllocatedContainers();
|
||||||
|
Assert.assertEquals(1, allocatedContainers.size());
|
||||||
|
|
||||||
|
// Verify 1 outstanding schedulerKey is removed
|
||||||
|
Assert.assertEquals(3, schedulerKeys.size());
|
||||||
|
|
||||||
|
List <ResourceRequest> resReqs =
|
||||||
|
((CapacityScheduler) scheduler).getApplicationAttempt(attemptId)
|
||||||
|
.getAppSchedulingInfo().getAllResourceRequests();
|
||||||
|
|
||||||
|
// Verify 1 outstanding schedulerKey is removed from the
|
||||||
|
// rrMap as well
|
||||||
|
Assert.assertEquals(3, resReqs.size());
|
||||||
|
|
||||||
|
// Verify One more container Allocation on node nm2
|
||||||
|
// And ensure the outstanding schedulerKeys go down..
|
||||||
|
nm2.nodeHeartbeat(true);
|
||||||
|
Thread.sleep(200);
|
||||||
|
|
||||||
|
// Update the allocateReq to send 0 numContainer req.
|
||||||
|
// For the satisfied container...
|
||||||
|
allocateResponse = am1.allocate(Arrays.asList(
|
||||||
|
newResourceRequest(1,
|
||||||
|
allocatedContainers.get(0).getAllocationRequestId(),
|
||||||
|
ResourceRequest.ANY,
|
||||||
|
Resources.createResource(3 * GB), 0, true,
|
||||||
|
ExecutionType.GUARANTEED)
|
||||||
|
),
|
||||||
|
new ArrayList<ContainerId>());
|
||||||
|
allocatedContainers = allocateResponse.getAllocatedContainers();
|
||||||
|
Assert.assertEquals(1, allocatedContainers.size());
|
||||||
|
|
||||||
|
// Verify 1 outstanding schedulerKey is removed
|
||||||
|
Assert.assertEquals(2, schedulerKeys.size());
|
||||||
|
|
||||||
|
resReqs = ((CapacityScheduler) scheduler).getApplicationAttempt(attemptId)
|
||||||
|
.getAppSchedulingInfo().getAllResourceRequests();
|
||||||
|
// Verify the map size is not increased due to 0 req
|
||||||
|
Assert.assertEquals(2, resReqs.size());
|
||||||
|
|
||||||
|
// Now Verify that the AM can cancel 1 Ask:
|
||||||
|
SchedulerRequestKey sk = schedulerKeys.iterator().next();
|
||||||
|
am1.allocate(
|
||||||
|
Arrays.asList(
|
||||||
|
newResourceRequest(sk.getPriority().getPriority(),
|
||||||
|
sk.getAllocationRequestId(),
|
||||||
|
ResourceRequest.ANY, Resources.createResource(3 * GB), 0, true,
|
||||||
|
ExecutionType.GUARANTEED)
|
||||||
|
),
|
||||||
|
null);
|
||||||
|
|
||||||
|
schedulerKeys =
|
||||||
|
((CapacityScheduler) scheduler).getApplicationAttempt(attemptId)
|
||||||
|
.getAppSchedulingInfo().getSchedulerKeys();
|
||||||
|
|
||||||
|
Thread.sleep(200);
|
||||||
|
|
||||||
|
// Verify 1 outstanding schedulerKey is removed because of the
|
||||||
|
// cancel ask
|
||||||
|
Assert.assertEquals(1, schedulerKeys.size());
|
||||||
|
|
||||||
|
// Now verify that after the next node heartbeat, we allocate
|
||||||
|
// the last schedulerKey
|
||||||
|
nm3.nodeHeartbeat(true);
|
||||||
|
Thread.sleep(200);
|
||||||
|
allocateResponse = am1.allocate(new ArrayList<ResourceRequest>(),
|
||||||
|
new ArrayList<ContainerId>());
|
||||||
|
allocatedContainers = allocateResponse.getAllocatedContainers();
|
||||||
|
Assert.assertEquals(1, allocatedContainers.size());
|
||||||
|
|
||||||
|
// Verify no more outstanding schedulerKeys..
|
||||||
|
Assert.assertEquals(0, schedulerKeys.size());
|
||||||
|
resReqs =
|
||||||
|
((CapacityScheduler) scheduler).getApplicationAttempt(attemptId)
|
||||||
|
.getAppSchedulingInfo().getAllResourceRequests();
|
||||||
|
Assert.assertEquals(0, resReqs.size());
|
||||||
|
}
|
||||||
|
|
||||||
|
private static ResourceRequest newResourceRequest(int priority,
|
||||||
|
long allocReqId, String rName, Resource resource, int numContainers,
|
||||||
|
boolean relaxLoc, ExecutionType eType) {
|
||||||
|
ResourceRequest rr = ResourceRequest.newInstance(
|
||||||
|
Priority.newInstance(priority), rName, resource, numContainers,
|
||||||
|
relaxLoc, null, ExecutionTypeRequest.newInstance(eType, true));
|
||||||
|
rr.setAllocationRequestId(allocReqId);
|
||||||
|
return rr;
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testHierarchyQueuesCurrentLimits() throws Exception {
|
public void testHierarchyQueuesCurrentLimits() throws Exception {
|
||||||
/*
|
/*
|
||||||
|
|
|
@ -1053,13 +1053,9 @@ public class TestLeafQueue {
|
||||||
//test case 3
|
//test case 3
|
||||||
qb.finishApplication(app_0.getApplicationId(), user_0);
|
qb.finishApplication(app_0.getApplicationId(), user_0);
|
||||||
qb.finishApplication(app_2.getApplicationId(), user_1);
|
qb.finishApplication(app_2.getApplicationId(), user_1);
|
||||||
qb.releaseResource(clusterResource, app_0,
|
qb.releaseResource(clusterResource, app_0, Resource.newInstance(4*GB, 1),
|
||||||
app_0.getAppSchedulingInfo().getPendingAsk(u0SchedKey)
|
|
||||||
.getPerAllocationResource(),
|
|
||||||
null, null);
|
null, null);
|
||||||
qb.releaseResource(clusterResource, app_2,
|
qb.releaseResource(clusterResource, app_2, Resource.newInstance(4*GB, 1),
|
||||||
app_2.getAppSchedulingInfo().getPendingAsk(u1SchedKey)
|
|
||||||
.getPerAllocationResource(),
|
|
||||||
null, null);
|
null, null);
|
||||||
|
|
||||||
qb.setUserLimit(50);
|
qb.setUserLimit(50);
|
||||||
|
|
|
@ -23,6 +23,7 @@ import static org.junit.Assert.assertTrue;
|
||||||
import static org.junit.Assert.fail;
|
import static org.junit.Assert.fail;
|
||||||
|
|
||||||
import java.io.StringReader;
|
import java.io.StringReader;
|
||||||
|
import java.util.ArrayList;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
|
|
||||||
import javax.ws.rs.core.MediaType;
|
import javax.ws.rs.core.MediaType;
|
||||||
|
@ -31,6 +32,7 @@ import javax.xml.parsers.DocumentBuilderFactory;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerState;
|
import org.apache.hadoop.yarn.api.records.ContainerState;
|
||||||
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
||||||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||||
|
@ -44,6 +46,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
|
||||||
|
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppInfo;
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppInfo;
|
||||||
|
@ -168,7 +172,37 @@ public class TestRMWebServicesApps extends JerseyTestBase {
|
||||||
assertEquals("incorrect number of elements", 1, nodesApps.getLength());
|
assertEquals("incorrect number of elements", 1, nodesApps.getLength());
|
||||||
NodeList nodes = dom.getElementsByTagName("app");
|
NodeList nodes = dom.getElementsByTagName("app");
|
||||||
assertEquals("incorrect number of elements", 1, nodes.getLength());
|
assertEquals("incorrect number of elements", 1, nodes.getLength());
|
||||||
verifyAppsXML(nodes, app1);
|
verifyAppsXML(nodes, app1, false);
|
||||||
|
rm.stop();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testRunningApp() throws JSONException, Exception {
|
||||||
|
rm.start();
|
||||||
|
MockNM amNodeManager = rm.registerNode("127.0.0.1:1234", 2048);
|
||||||
|
RMApp app1 = rm.submitApp(CONTAINER_MB, "testwordcount", "user1");
|
||||||
|
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, amNodeManager);
|
||||||
|
am1.allocate("*", 4096, 1, new ArrayList<ContainerId>());
|
||||||
|
amNodeManager.nodeHeartbeat(true);
|
||||||
|
|
||||||
|
WebResource r = resource();
|
||||||
|
ClientResponse response = r.path("ws").path("v1").path("cluster")
|
||||||
|
.path("apps").accept(MediaType.APPLICATION_XML)
|
||||||
|
.get(ClientResponse.class);
|
||||||
|
assertEquals(MediaType.APPLICATION_XML_TYPE, response.getType());
|
||||||
|
String xml = response.getEntity(String.class);
|
||||||
|
DocumentBuilderFactory dbf = DocumentBuilderFactory.newInstance();
|
||||||
|
DocumentBuilder db = dbf.newDocumentBuilder();
|
||||||
|
InputSource is = new InputSource();
|
||||||
|
is.setCharacterStream(new StringReader(xml));
|
||||||
|
Document dom = db.parse(is);
|
||||||
|
NodeList nodesApps = dom.getElementsByTagName("apps");
|
||||||
|
assertEquals("incorrect number of elements", 1, nodesApps.getLength());
|
||||||
|
NodeList nodes = dom.getElementsByTagName("app");
|
||||||
|
assertEquals("incorrect number of elements", 1, nodes.getLength());
|
||||||
|
verifyAppsXML(nodes, app1, true);
|
||||||
|
|
||||||
|
testAppsHelper("apps/", app1, MediaType.APPLICATION_JSON, true);
|
||||||
rm.stop();
|
rm.stop();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -201,6 +235,11 @@ public class TestRMWebServicesApps extends JerseyTestBase {
|
||||||
|
|
||||||
public void testAppsHelper(String path, RMApp app, String media)
|
public void testAppsHelper(String path, RMApp app, String media)
|
||||||
throws JSONException, Exception {
|
throws JSONException, Exception {
|
||||||
|
testAppsHelper(path, app, media, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testAppsHelper(String path, RMApp app, String media,
|
||||||
|
boolean hasResourceReq) throws JSONException, Exception {
|
||||||
WebResource r = resource();
|
WebResource r = resource();
|
||||||
|
|
||||||
ClientResponse response = r.path("ws").path("v1").path("cluster")
|
ClientResponse response = r.path("ws").path("v1").path("cluster")
|
||||||
|
@ -212,7 +251,7 @@ public class TestRMWebServicesApps extends JerseyTestBase {
|
||||||
assertEquals("incorrect number of elements", 1, apps.length());
|
assertEquals("incorrect number of elements", 1, apps.length());
|
||||||
JSONArray array = apps.getJSONArray("app");
|
JSONArray array = apps.getJSONArray("app");
|
||||||
assertEquals("incorrect number of elements", 1, array.length());
|
assertEquals("incorrect number of elements", 1, array.length());
|
||||||
verifyAppInfo(array.getJSONObject(0), app);
|
verifyAppInfo(array.getJSONObject(0), app, hasResourceReq);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -235,7 +274,7 @@ public class TestRMWebServicesApps extends JerseyTestBase {
|
||||||
assertEquals("incorrect number of elements", 1, apps.length());
|
assertEquals("incorrect number of elements", 1, apps.length());
|
||||||
JSONArray array = apps.getJSONArray("app");
|
JSONArray array = apps.getJSONArray("app");
|
||||||
assertEquals("incorrect number of elements", 1, array.length());
|
assertEquals("incorrect number of elements", 1, array.length());
|
||||||
verifyAppInfo(array.getJSONObject(0), app1);
|
verifyAppInfo(array.getJSONObject(0), app1, false);
|
||||||
rm.stop();
|
rm.stop();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -468,7 +507,7 @@ public class TestRMWebServicesApps extends JerseyTestBase {
|
||||||
assertEquals("incorrect number of elements", 1, apps.length());
|
assertEquals("incorrect number of elements", 1, apps.length());
|
||||||
JSONArray array = apps.getJSONArray("app");
|
JSONArray array = apps.getJSONArray("app");
|
||||||
assertEquals("incorrect number of elements", 1, array.length());
|
assertEquals("incorrect number of elements", 1, array.length());
|
||||||
verifyAppInfo(array.getJSONObject(0), app1);
|
verifyAppInfo(array.getJSONObject(0), app1, false);
|
||||||
rm.stop();
|
rm.stop();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1279,7 +1318,7 @@ public class TestRMWebServicesApps extends JerseyTestBase {
|
||||||
JSONObject json = response.getEntity(JSONObject.class);
|
JSONObject json = response.getEntity(JSONObject.class);
|
||||||
|
|
||||||
assertEquals("incorrect number of elements", 1, json.length());
|
assertEquals("incorrect number of elements", 1, json.length());
|
||||||
verifyAppInfo(json.getJSONObject("app"), app);
|
verifyAppInfo(json.getJSONObject("app"), app, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -1302,11 +1341,11 @@ public class TestRMWebServicesApps extends JerseyTestBase {
|
||||||
Document dom = db.parse(is);
|
Document dom = db.parse(is);
|
||||||
NodeList nodes = dom.getElementsByTagName("app");
|
NodeList nodes = dom.getElementsByTagName("app");
|
||||||
assertEquals("incorrect number of elements", 1, nodes.getLength());
|
assertEquals("incorrect number of elements", 1, nodes.getLength());
|
||||||
verifyAppsXML(nodes, app1);
|
verifyAppsXML(nodes, app1, false);
|
||||||
rm.stop();
|
rm.stop();
|
||||||
}
|
}
|
||||||
|
|
||||||
public void verifyAppsXML(NodeList nodes, RMApp app)
|
public void verifyAppsXML(NodeList nodes, RMApp app, boolean hasResourceReq)
|
||||||
throws JSONException, Exception {
|
throws JSONException, Exception {
|
||||||
|
|
||||||
for (int i = 0; i < nodes.getLength(); i++) {
|
for (int i = 0; i < nodes.getLength(); i++) {
|
||||||
|
@ -1345,32 +1384,40 @@ public class TestRMWebServicesApps extends JerseyTestBase {
|
||||||
WebServicesTestUtils.getXmlString(element, "amNodeLabelExpression"),
|
WebServicesTestUtils.getXmlString(element, "amNodeLabelExpression"),
|
||||||
WebServicesTestUtils.getXmlString(element, "amRPCAddress"));
|
WebServicesTestUtils.getXmlString(element, "amRPCAddress"));
|
||||||
|
|
||||||
assertEquals(element.getElementsByTagName("resourceRequests").getLength(),
|
if (hasResourceReq) {
|
||||||
1);
|
assertEquals(element.getElementsByTagName("resourceRequests")
|
||||||
|
.getLength(), 1);
|
||||||
Element resourceRequests =
|
Element resourceRequests =
|
||||||
(Element) element.getElementsByTagName("resourceRequests").item(0);
|
(Element) element.getElementsByTagName("resourceRequests").item(0);
|
||||||
Element capability =
|
Element capability = (Element) resourceRequests
|
||||||
(Element) resourceRequests.getElementsByTagName("capability").item(0);
|
.getElementsByTagName("capability").item(0);
|
||||||
|
ResourceRequest rr =
|
||||||
verifyResourceRequestsGeneric(app,
|
((AbstractYarnScheduler)rm.getRMContext().getScheduler())
|
||||||
|
.getApplicationAttempt(
|
||||||
|
app.getCurrentAppAttempt().getAppAttemptId())
|
||||||
|
.getAppSchedulingInfo().getAllResourceRequests().get(0);
|
||||||
|
verifyResourceRequestsGeneric(rr,
|
||||||
WebServicesTestUtils.getXmlString(resourceRequests,
|
WebServicesTestUtils.getXmlString(resourceRequests,
|
||||||
"nodeLabelExpression"),
|
"nodeLabelExpression"),
|
||||||
WebServicesTestUtils.getXmlInt(resourceRequests, "numContainers"),
|
WebServicesTestUtils.getXmlInt(resourceRequests, "numContainers"),
|
||||||
WebServicesTestUtils.getXmlBoolean(resourceRequests, "relaxLocality"),
|
WebServicesTestUtils.getXmlBoolean(
|
||||||
|
resourceRequests, "relaxLocality"),
|
||||||
WebServicesTestUtils.getXmlInt(resourceRequests, "priority"),
|
WebServicesTestUtils.getXmlInt(resourceRequests, "priority"),
|
||||||
WebServicesTestUtils.getXmlString(resourceRequests, "resourceName"),
|
WebServicesTestUtils.getXmlString(resourceRequests, "resourceName"),
|
||||||
WebServicesTestUtils.getXmlLong(capability, "memory"),
|
WebServicesTestUtils.getXmlLong(capability, "memory"),
|
||||||
WebServicesTestUtils.getXmlLong(capability, "vCores"),
|
WebServicesTestUtils.getXmlLong(capability, "vCores"),
|
||||||
WebServicesTestUtils.getXmlString(resourceRequests, "executionType"),
|
WebServicesTestUtils.getXmlString(
|
||||||
|
resourceRequests, "executionType"),
|
||||||
WebServicesTestUtils.getXmlBoolean(resourceRequests,
|
WebServicesTestUtils.getXmlBoolean(resourceRequests,
|
||||||
"enforceExecutionType"));
|
"enforceExecutionType"));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
public void verifyAppInfo(JSONObject info, RMApp app) throws JSONException,
|
public void verifyAppInfo(JSONObject info, RMApp app, boolean hasResourceReqs)
|
||||||
Exception {
|
throws JSONException, Exception {
|
||||||
|
|
||||||
int expectedNumberOfElements = 35;
|
int expectedNumberOfElements = 34 + (hasResourceReqs ? 2 : 0);
|
||||||
String appNodeLabelExpression = null;
|
String appNodeLabelExpression = null;
|
||||||
String amNodeLabelExpression = null;
|
String amNodeLabelExpression = null;
|
||||||
if (app.getApplicationSubmissionContext()
|
if (app.getApplicationSubmissionContext()
|
||||||
|
@ -1412,8 +1459,10 @@ public class TestRMWebServicesApps extends JerseyTestBase {
|
||||||
amNodeLabelExpression,
|
amNodeLabelExpression,
|
||||||
amRPCAddress);
|
amRPCAddress);
|
||||||
|
|
||||||
|
if (hasResourceReqs) {
|
||||||
verifyResourceRequests(info.getJSONArray("resourceRequests"), app);
|
verifyResourceRequests(info.getJSONArray("resourceRequests"), app);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
public void verifyAppInfoGeneric(RMApp app, String id, String user,
|
public void verifyAppInfoGeneric(RMApp app, String id, String user,
|
||||||
String name, String applicationType, String queue, int prioirty,
|
String name, String applicationType, String queue, int prioirty,
|
||||||
|
@ -1441,8 +1490,10 @@ public class TestRMWebServicesApps extends JerseyTestBase {
|
||||||
WebServicesTestUtils.checkStringMatch("finalStatus", app
|
WebServicesTestUtils.checkStringMatch("finalStatus", app
|
||||||
.getFinalApplicationStatus().toString(), finalStatus);
|
.getFinalApplicationStatus().toString(), finalStatus);
|
||||||
assertEquals("progress doesn't match", 0, progress, 0.0);
|
assertEquals("progress doesn't match", 0, progress, 0.0);
|
||||||
|
if ("UNASSIGNED".equals(trackingUI)) {
|
||||||
WebServicesTestUtils.checkStringMatch("trackingUI", "UNASSIGNED",
|
WebServicesTestUtils.checkStringMatch("trackingUI", "UNASSIGNED",
|
||||||
trackingUI);
|
trackingUI);
|
||||||
|
}
|
||||||
WebServicesTestUtils.checkStringEqual("diagnostics",
|
WebServicesTestUtils.checkStringEqual("diagnostics",
|
||||||
app.getDiagnostics().toString(), diagnostics);
|
app.getDiagnostics().toString(), diagnostics);
|
||||||
assertEquals("clusterId doesn't match",
|
assertEquals("clusterId doesn't match",
|
||||||
|
@ -1495,7 +1546,12 @@ public class TestRMWebServicesApps extends JerseyTestBase {
|
||||||
public void verifyResourceRequests(JSONArray resourceRequest, RMApp app)
|
public void verifyResourceRequests(JSONArray resourceRequest, RMApp app)
|
||||||
throws JSONException {
|
throws JSONException {
|
||||||
JSONObject requestInfo = resourceRequest.getJSONObject(0);
|
JSONObject requestInfo = resourceRequest.getJSONObject(0);
|
||||||
verifyResourceRequestsGeneric(app,
|
ResourceRequest rr =
|
||||||
|
((AbstractYarnScheduler) rm.getRMContext().getScheduler())
|
||||||
|
.getApplicationAttempt(
|
||||||
|
app.getCurrentAppAttempt().getAppAttemptId())
|
||||||
|
.getAppSchedulingInfo().getAllResourceRequests().get(0);
|
||||||
|
verifyResourceRequestsGeneric(rr,
|
||||||
requestInfo.getString("nodeLabelExpression"),
|
requestInfo.getString("nodeLabelExpression"),
|
||||||
requestInfo.getInt("numContainers"),
|
requestInfo.getInt("numContainers"),
|
||||||
requestInfo.getBoolean("relaxLocality"), requestInfo.getInt("priority"),
|
requestInfo.getBoolean("relaxLocality"), requestInfo.getInt("priority"),
|
||||||
|
@ -1508,11 +1564,10 @@ public class TestRMWebServicesApps extends JerseyTestBase {
|
||||||
.getBoolean("enforceExecutionType"));
|
.getBoolean("enforceExecutionType"));
|
||||||
}
|
}
|
||||||
|
|
||||||
public void verifyResourceRequestsGeneric(RMApp app,
|
public void verifyResourceRequestsGeneric(ResourceRequest request,
|
||||||
String nodeLabelExpression, int numContainers, boolean relaxLocality,
|
String nodeLabelExpression, int numContainers, boolean relaxLocality,
|
||||||
int priority, String resourceName, long memory, long vCores,
|
int priority, String resourceName, long memory, long vCores,
|
||||||
String executionType, boolean enforceExecutionType) {
|
String executionType, boolean enforceExecutionType) {
|
||||||
ResourceRequest request = app.getAMResourceRequests().get(0);
|
|
||||||
assertEquals("nodeLabelExpression doesn't match",
|
assertEquals("nodeLabelExpression doesn't match",
|
||||||
request.getNodeLabelExpression(), nodeLabelExpression);
|
request.getNodeLabelExpression(), nodeLabelExpression);
|
||||||
assertEquals("numContainers doesn't match", request.getNumContainers(),
|
assertEquals("numContainers doesn't match", request.getNumContainers(),
|
||||||
|
|
Loading…
Reference in New Issue