YARN-5823. Update NMTokens in case of requests with only opportunistic containers. (Konstantinos Karanasos via asuresh)

Arun Suresh 2016-11-09 00:11:25 -08:00
parent ed0bebabaa
commit 283fa33feb
6 changed files with 137 additions and 36 deletions
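The core of the change: the ApplicationMasterService used to attach NMTokens to an AllocateResponse only when the regular (GUARANTEED) allocation contained containers, so an AM whose requests were exclusively OPPORTUNISTIC could be handed containers without the NMTokens needed to start them on the NMs. The hunks below set the NMTokens whenever the allocation carries any, and add a test for the opportunistic-only path. For orientation, here is a minimal AM-side sketch of that path; it uses only records that appear in the diff, but the class name, capability, priority and host values are illustrative placeholders, not part of the patch.

import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
import org.apache.hadoop.yarn.api.records.NMToken;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;

public class OpportunisticOnlyAmSketch {
  public static void main(String[] args) throws Exception {
    AMRMClient<AMRMClient.ContainerRequest> amClient =
        AMRMClient.createAMRMClient();
    amClient.init(new YarnConfiguration());
    amClient.start();
    amClient.registerApplicationMaster("Host", 10000, "");

    // The ask list contains nothing but an OPPORTUNISTIC request
    // (placeholder capability and priority).
    Resource capability = Resource.newInstance(512, 1);
    Priority priority = Priority.newInstance(1);
    amClient.addContainerRequest(new AMRMClient.ContainerRequest(
        capability, null, null, priority, 0, true, null,
        ExecutionTypeRequest.newInstance(ExecutionType.OPPORTUNISTIC, true)));

    // A real AM polls allocate() in a loop, as the new test below does.
    // With this patch the response also carries the NMTokens of the nodes
    // hosting the opportunistic containers, even though no GUARANTEED
    // containers were allocated.
    AllocateResponse response = amClient.allocate(0.1f);
    Map<String, Token> nmTokens = new HashMap<>();
    for (NMToken token : response.getNMTokens()) {
      nmTokens.put(token.getNodeId().toString(), token.getToken());
    }
    System.out.println("NMTokens received for nodes: " + nmTokens.keySet());
  }
}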

View File

@@ -229,6 +229,9 @@ public class TestOpportunisticContainerAllocation {
amClient.registerApplicationMaster("Host", 10000, "");
testOpportunisticAllocation(
(AMRMClientImpl<AMRMClient.ContainerRequest>) amClient);
testAllocation((AMRMClientImpl<AMRMClient.ContainerRequest>)amClient);
amClient
@@ -247,7 +250,6 @@ public class TestOpportunisticContainerAllocation {
final AMRMClientImpl<AMRMClient.ContainerRequest> amClient)
throws YarnException, IOException {
// setup container request
assertEquals(0, amClient.ask.size());
assertEquals(0, amClient.release.size());
@@ -388,6 +390,73 @@ public class TestOpportunisticContainerAllocation {
assertEquals(0, amClient.release.size());
}
/**
* Tests allocation with requests comprising only opportunistic containers.
*/
private void testOpportunisticAllocation(
final AMRMClientImpl<AMRMClient.ContainerRequest> amClient)
throws YarnException, IOException {
// setup container request
assertEquals(0, amClient.ask.size());
assertEquals(0, amClient.release.size());
amClient.addContainerRequest(
new AMRMClient.ContainerRequest(capability, null, null, priority, 0,
true, null,
ExecutionTypeRequest.newInstance(
ExecutionType.OPPORTUNISTIC, true)));
amClient.addContainerRequest(
new AMRMClient.ContainerRequest(capability, null, null, priority, 0,
true, null,
ExecutionTypeRequest.newInstance(
ExecutionType.OPPORTUNISTIC, true)));
int oppContainersRequestedAny =
amClient.getTable(0).get(priority, ResourceRequest.ANY,
ExecutionType.OPPORTUNISTIC, capability).remoteRequest
.getNumContainers();
assertEquals(2, oppContainersRequestedAny);
assertEquals(1, amClient.ask.size());
assertEquals(0, amClient.release.size());
// RM should allocate the requested containers within a few calls to allocate()
int allocatedContainerCount = 0;
int iterationsLeft = 10;
Set<ContainerId> releases = new TreeSet<>();
amClient.getNMTokenCache().clearCache();
Assert.assertEquals(0,
amClient.getNMTokenCache().numberOfTokensInCache());
HashMap<String, Token> receivedNMTokens = new HashMap<>();
while (allocatedContainerCount < oppContainersRequestedAny
&& iterationsLeft-- > 0) {
AllocateResponse allocResponse = amClient.allocate(0.1f);
assertEquals(0, amClient.ask.size());
assertEquals(0, amClient.release.size());
for (Container container : allocResponse.getAllocatedContainers()) {
allocatedContainerCount++;
ContainerId rejectContainerId = container.getId();
releases.add(rejectContainerId);
}
for (NMToken token : allocResponse.getNMTokens()) {
String nodeID = token.getNodeId().toString();
receivedNMTokens.put(nodeID, token.getToken());
}
if (allocatedContainerCount < oppContainersRequestedAny) {
// sleep to let NM's heartbeat to RM and trigger allocations
sleep(100);
}
}
assertEquals(1, receivedNMTokens.values().size());
}
private void sleep(int sleepTime) {
try {
Thread.sleep(sleepTime);

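The receivedNMTokens map collected above mirrors the bookkeeping that AMRMClientImpl performs on its NMTokenCache during allocate(). A minimal sketch of that bookkeeping, assuming the public NMTokenCache#setToken(String, Token) API (the helper class below is illustrative, not part of the patch):

import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.NMToken;
import org.apache.hadoop.yarn.client.api.NMTokenCache;

public final class NmTokenBookkeepingSketch {
  private NmTokenBookkeepingSketch() {
  }

  // Store every NMToken carried by an AllocateResponse, keyed by the node
  // address (NodeId#toString() is "host:port"), so later container-start
  // calls to that NM can be authenticated.
  public static void cacheTokens(AllocateResponse response,
      NMTokenCache cache) {
    for (NMToken token : response.getNMTokens()) {
      cache.setToken(token.getNodeId().toString(), token.getToken());
    }
  }
}

The test above asserts that NMTokens for the nodes hosting the opportunistic containers do arrive with the response, which is precisely the behaviour this patch introduces for requests consisting only of opportunistic containers.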
View File

@@ -38,7 +38,6 @@ import org.apache.hadoop.yarn.util.resource.Resources;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -157,12 +156,18 @@ public class OpportunisticContainerAllocator {
}
}
static class PartitionedResourceRequests {
/**
* Class that includes two lists of {@link ResourceRequest}s: one for
* GUARANTEED and one for OPPORTUNISTIC {@link ResourceRequest}s.
*/
public static class PartitionedResourceRequests {
private List<ResourceRequest> guaranteed = new ArrayList<>();
private List<ResourceRequest> opportunistic = new ArrayList<>();
public List<ResourceRequest> getGuaranteed() {
return guaranteed;
}
public List<ResourceRequest> getOpportunistic() {
return opportunistic;
}
@@ -186,10 +191,10 @@ public class OpportunisticContainerAllocator {
}
/**
* Entry point into the Opportunistic Container Allocator.
* Allocate OPPORTUNISTIC containers.
* @param request AllocateRequest
* @param applicationAttemptId ApplicationAttemptId
* @param appContext App Specific OpportunisticContainerContext
* @param opportContext App specific OpportunisticContainerContext
* @param rmIdentifier RM Identifier
* @param appSubmitter App Submitter
* @return List of Containers.
@@ -197,37 +202,31 @@ public class OpportunisticContainerAllocator {
*/
public List<Container> allocateContainers(
AllocateRequest request, ApplicationAttemptId applicationAttemptId,
OpportunisticContainerContext appContext, long rmIdentifier,
OpportunisticContainerContext opportContext, long rmIdentifier,
String appSubmitter) throws YarnException {
// Partition requests into GUARANTEED and OPPORTUNISTIC reqs
PartitionedResourceRequests partitionedAsks =
partitionAskList(request.getAskList());
if (partitionedAsks.getOpportunistic().isEmpty()) {
return Collections.emptyList();
}
// Update released containers.
List<ContainerId> releasedContainers = request.getReleaseList();
int numReleasedContainers = releasedContainers.size();
if (numReleasedContainers > 0) {
LOG.info("AttemptID: " + applicationAttemptId + " released: "
+ numReleasedContainers);
appContext.getContainersAllocated().removeAll(releasedContainers);
opportContext.getContainersAllocated().removeAll(releasedContainers);
}
// Also, update black list
// Update black list.
ResourceBlacklistRequest rbr = request.getResourceBlacklistRequest();
if (rbr != null) {
appContext.getBlacklist().removeAll(rbr.getBlacklistRemovals());
appContext.getBlacklist().addAll(rbr.getBlacklistAdditions());
opportContext.getBlacklist().removeAll(rbr.getBlacklistRemovals());
opportContext.getBlacklist().addAll(rbr.getBlacklistAdditions());
}
// Add OPPORTUNISTIC reqs to the outstanding reqs
appContext.addToOutstandingReqs(partitionedAsks.getOpportunistic());
// Add OPPORTUNISTIC requests to the outstanding ones.
opportContext.addToOutstandingReqs(request.getAskList());
// Satisfy the outstanding OPPORTUNISTIC requests.
List<Container> allocatedContainers = new ArrayList<>();
for (Priority priority :
appContext.getOutstandingOpReqs().descendingKeySet()) {
opportContext.getOutstandingOpReqs().descendingKeySet()) {
// Allocated containers :
// Key = Requested Capability,
// Value = List of Containers of given cap (the actual container size
@@ -235,16 +234,14 @@ public class OpportunisticContainerAllocator {
// we need the requested capability (key) to match against
// the outstanding reqs)
Map<Resource, List<Container>> allocated = allocate(rmIdentifier,
appContext, priority, applicationAttemptId, appSubmitter);
opportContext, priority, applicationAttemptId, appSubmitter);
for (Map.Entry<Resource, List<Container>> e : allocated.entrySet()) {
appContext.matchAllocationToOutstandingRequest(
opportContext.matchAllocationToOutstandingRequest(
e.getKey(), e.getValue());
allocatedContainers.addAll(e.getValue());
}
}
// Send all the GUARANTEED Reqs to RM
request.setAskList(partitionedAsks.getGuaranteed());
return allocatedContainers;
}
@@ -359,8 +356,14 @@ public class OpportunisticContainerAllocator {
return containerToken;
}
private PartitionedResourceRequests partitionAskList(List<ResourceRequest>
askList) {
/**
* Partitions a list of ResourceRequest into two separate lists, one for
* GUARANTEED and one for OPPORTUNISTIC ResourceRequests.
* @param askList the list of ResourceRequests to be partitioned
* @return the partitioned ResourceRequests
*/
public PartitionedResourceRequests partitionAskList(
List<ResourceRequest> askList) {
PartitionedResourceRequests partitionedRequests =
new PartitionedResourceRequests();
for (ResourceRequest rr : askList) {

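With this refactoring, allocateContainers no longer partitions the ask list itself: callers now invoke the (newly public) partitionAskList, hand only the OPPORTUNISTIC subset to allocateContainers, which adds it to the outstanding requests, and forward the GUARANTEED subset to the RM themselves, as the DistributedScheduler and OpportunisticContainerAllocatorAMService hunks below do. A rough sketch of such a partitioning step, assuming ResourceRequest#getExecutionTypeRequest and treating requests without an explicit execution type as GUARANTEED (the helper class is illustrative, not the patch's implementation):

import java.util.List;

import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
import org.apache.hadoop.yarn.api.records.ResourceRequest;

public final class AskPartitionSketch {
  private AskPartitionSketch() {
  }

  // Split an ask list into GUARANTEED and OPPORTUNISTIC requests. Requests
  // that carry no ExecutionTypeRequest are kept on the GUARANTEED side
  // (an assumption of this sketch, not a statement about the patch).
  public static void partition(List<ResourceRequest> askList,
      List<ResourceRequest> guaranteed,
      List<ResourceRequest> opportunistic) {
    for (ResourceRequest rr : askList) {
      ExecutionTypeRequest etr = rr.getExecutionTypeRequest();
      if (etr != null
          && etr.getExecutionType() == ExecutionType.OPPORTUNISTIC) {
        opportunistic.add(rr);
      } else {
        guaranteed.add(rr);
      }
    }
  }
}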
View File

@@ -1523,7 +1523,7 @@ public class ContainerManagerImpl extends CompositeService implements
@Override
public OpportunisticContainersStatus getOpportunisticContainersStatus() {
return null;
return OpportunisticContainersStatus.newInstance();
}
@Override

View File

@@ -220,16 +220,27 @@ public final class DistributedScheduler extends AbstractRequestInterceptor {
public DistributedSchedulingAllocateResponse allocateForDistributedScheduling(
DistributedSchedulingAllocateRequest request)
throws YarnException, IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("Forwarding allocate request to the " +
"Distributed Scheduler Service on YARN RM");
}
// Partition requests to GUARANTEED and OPPORTUNISTIC.
OpportunisticContainerAllocator.PartitionedResourceRequests
partitionedAsks = containerAllocator
.partitionAskList(request.getAllocateRequest().getAskList());
// Allocate OPPORTUNISTIC containers.
request.getAllocateRequest().setAskList(partitionedAsks.getOpportunistic());
List<Container> allocatedContainers =
containerAllocator.allocateContainers(
request.getAllocateRequest(), applicationAttemptId,
oppContainerContext, rmIdentifier, appSubmitter);
// Prepare request for sending to RM for scheduling GUARANTEED containers.
request.setAllocatedContainers(allocatedContainers);
request.getAllocateRequest().setAskList(partitionedAsks.getGuaranteed());
if (LOG.isDebugEnabled()) {
LOG.debug("Forwarding allocate request to the " +
"Distributed Scheduler Service on YARN RM");
}
DistributedSchedulingAllocateResponse dsResp =
getNextInterceptor().allocateForDistributedScheduling(request);

View File

@@ -542,7 +542,8 @@ public class ApplicationMasterService extends AbstractService implements
RMAppAttempt appAttempt = app.getRMAppAttempt(appAttemptId);
AllocateResponse allocateResponse =
recordFactory.newRecordInstance(AllocateResponse.class);
if (!allocation.getContainers().isEmpty()) {
if (allocation.getNMTokens() != null &&
!allocation.getNMTokens().isEmpty()) {
allocateResponse.setNMTokens(allocation.getNMTokens());
}

View File

@@ -220,34 +220,51 @@ public class OpportunisticContainerAllocatorAMService
public AllocateResponse allocate(AllocateRequest request) throws
YarnException, IOException {
// Partition requests to GUARANTEED and OPPORTUNISTIC.
OpportunisticContainerAllocator.PartitionedResourceRequests
partitionedAsks =
oppContainerAllocator.partitionAskList(request.getAskList());
// Allocate OPPORTUNISTIC containers.
request.setAskList(partitionedAsks.getOpportunistic());
final ApplicationAttemptId appAttemptId = getAppAttemptId();
SchedulerApplicationAttempt appAttempt = ((AbstractYarnScheduler)
rmContext.getScheduler()).getApplicationAttempt(appAttemptId);
OpportunisticContainerContext oppCtx =
appAttempt.getOpportunisticContainerContext();
oppCtx.updateNodeList(getLeastLoadedNodes());
List<Container> oppContainers =
oppContainerAllocator.allocateContainers(request, appAttemptId, oppCtx,
ResourceManager.getClusterTimeStamp(), appAttempt.getUser());
// Create RMContainers and update the NMTokens.
if (!oppContainers.isEmpty()) {
handleNewContainers(oppContainers, false);
appAttempt.updateNMTokens(oppContainers);
}
// Allocate all guaranteed containers
// Allocate GUARANTEED containers.
request.setAskList(partitionedAsks.getGuaranteed());
AllocateResponse allocateResp = super.allocate(request);
// Add allocated OPPORTUNISTIC containers to the AllocateResponse.
if (!oppContainers.isEmpty()) {
allocateResp.getAllocatedContainers().addAll(oppContainers);
}
// Update opportunistic container context with the allocated GUARANTEED
// containers.
oppCtx.updateCompletedContainers(allocateResp);
// Add all opportunistic containers
allocateResp.getAllocatedContainers().addAll(oppContainers);
return allocateResp;
}
@Override
public RegisterDistributedSchedulingAMResponse
registerApplicationMasterForDistributedScheduling(
registerApplicationMasterForDistributedScheduling(
RegisterApplicationMasterRequest request) throws YarnException,
IOException {
RegisterApplicationMasterResponse response =