YARN-5823. Update NMTokens in case of requests with only opportunistic containers. (Konstantinos Karanasos via asuresh)
(cherry picked from commit 283fa33feb
)
(cherry picked from commit 1c4cc88a754ac9f557cdc8c859b8aadec19a5067)
This commit is contained in:
parent
0e60c7cd33
commit
047772f15f
|
@ -229,6 +229,9 @@ public class TestOpportunisticContainerAllocation {
|
|||
|
||||
amClient.registerApplicationMaster("Host", 10000, "");
|
||||
|
||||
testOpportunisticAllocation(
|
||||
(AMRMClientImpl<AMRMClient.ContainerRequest>) amClient);
|
||||
|
||||
testAllocation((AMRMClientImpl<AMRMClient.ContainerRequest>)amClient);
|
||||
|
||||
amClient
|
||||
|
@ -247,7 +250,6 @@ public class TestOpportunisticContainerAllocation {
|
|||
final AMRMClientImpl<AMRMClient.ContainerRequest> amClient)
|
||||
throws YarnException, IOException {
|
||||
// setup container request
|
||||
|
||||
assertEquals(0, amClient.ask.size());
|
||||
assertEquals(0, amClient.release.size());
|
||||
|
||||
|
@ -388,6 +390,73 @@ public class TestOpportunisticContainerAllocation {
|
|||
assertEquals(0, amClient.release.size());
|
||||
}
|
||||
|
||||
/**
|
||||
* Tests allocation with requests comprising only opportunistic containers.
|
||||
*/
|
||||
private void testOpportunisticAllocation(
|
||||
final AMRMClientImpl<AMRMClient.ContainerRequest> amClient)
|
||||
throws YarnException, IOException {
|
||||
// setup container request
|
||||
assertEquals(0, amClient.ask.size());
|
||||
assertEquals(0, amClient.release.size());
|
||||
|
||||
amClient.addContainerRequest(
|
||||
new AMRMClient.ContainerRequest(capability, null, null, priority, 0,
|
||||
true, null,
|
||||
ExecutionTypeRequest.newInstance(
|
||||
ExecutionType.OPPORTUNISTIC, true)));
|
||||
amClient.addContainerRequest(
|
||||
new AMRMClient.ContainerRequest(capability, null, null, priority, 0,
|
||||
true, null,
|
||||
ExecutionTypeRequest.newInstance(
|
||||
ExecutionType.OPPORTUNISTIC, true)));
|
||||
|
||||
int oppContainersRequestedAny =
|
||||
amClient.getTable(0).get(priority, ResourceRequest.ANY,
|
||||
ExecutionType.OPPORTUNISTIC, capability).remoteRequest
|
||||
.getNumContainers();
|
||||
|
||||
assertEquals(2, oppContainersRequestedAny);
|
||||
|
||||
assertEquals(1, amClient.ask.size());
|
||||
assertEquals(0, amClient.release.size());
|
||||
|
||||
// RM should allocate container within 2 calls to allocate()
|
||||
int allocatedContainerCount = 0;
|
||||
int iterationsLeft = 10;
|
||||
Set<ContainerId> releases = new TreeSet<>();
|
||||
|
||||
amClient.getNMTokenCache().clearCache();
|
||||
Assert.assertEquals(0,
|
||||
amClient.getNMTokenCache().numberOfTokensInCache());
|
||||
HashMap<String, Token> receivedNMTokens = new HashMap<>();
|
||||
|
||||
while (allocatedContainerCount < oppContainersRequestedAny
|
||||
&& iterationsLeft-- > 0) {
|
||||
AllocateResponse allocResponse = amClient.allocate(0.1f);
|
||||
assertEquals(0, amClient.ask.size());
|
||||
assertEquals(0, amClient.release.size());
|
||||
|
||||
for (Container container : allocResponse.getAllocatedContainers()) {
|
||||
allocatedContainerCount++;
|
||||
ContainerId rejectContainerId = container.getId();
|
||||
releases.add(rejectContainerId);
|
||||
}
|
||||
|
||||
for (NMToken token : allocResponse.getNMTokens()) {
|
||||
String nodeID = token.getNodeId().toString();
|
||||
receivedNMTokens.put(nodeID, token.getToken());
|
||||
}
|
||||
|
||||
if (allocatedContainerCount < oppContainersRequestedAny) {
|
||||
// sleep to let NM's heartbeat to RM and trigger allocations
|
||||
sleep(100);
|
||||
}
|
||||
}
|
||||
|
||||
assertEquals(1, receivedNMTokens.values().size());
|
||||
}
|
||||
|
||||
private void sleep(int sleepTime) {
|
||||
try {
|
||||
Thread.sleep(sleepTime);
|
||||
|
|
|
@ -38,7 +38,6 @@ import org.apache.hadoop.yarn.util.resource.Resources;
|
|||
|
||||
import java.net.InetSocketAddress;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
@ -157,12 +156,18 @@ public class OpportunisticContainerAllocator {
|
|||
}
|
||||
}
|
||||
|
||||
static class PartitionedResourceRequests {
|
||||
/**
|
||||
* Class that includes two lists of {@link ResourceRequest}s: one for
|
||||
* GUARANTEED and one for OPPORTUNISTIC {@link ResourceRequest}s.
|
||||
*/
|
||||
public static class PartitionedResourceRequests {
|
||||
private List<ResourceRequest> guaranteed = new ArrayList<>();
|
||||
private List<ResourceRequest> opportunistic = new ArrayList<>();
|
||||
|
||||
public List<ResourceRequest> getGuaranteed() {
|
||||
return guaranteed;
|
||||
}
|
||||
|
||||
public List<ResourceRequest> getOpportunistic() {
|
||||
return opportunistic;
|
||||
}
|
||||
|
@ -186,10 +191,10 @@ public class OpportunisticContainerAllocator {
|
|||
}
|
||||
|
||||
/**
|
||||
* Entry point into the Opportunistic Container Allocator.
|
||||
* Allocate OPPORTUNISTIC containers.
|
||||
* @param request AllocateRequest
|
||||
* @param applicationAttemptId ApplicationAttemptId
|
||||
* @param appContext App Specific OpportunisticContainerContext
|
||||
* @param opportContext App specific OpportunisticContainerContext
|
||||
* @param rmIdentifier RM Identifier
|
||||
* @param appSubmitter App Submitter
|
||||
* @return List of Containers.
|
||||
|
@ -197,37 +202,31 @@ public class OpportunisticContainerAllocator {
|
|||
*/
|
||||
public List<Container> allocateContainers(
|
||||
AllocateRequest request, ApplicationAttemptId applicationAttemptId,
|
||||
OpportunisticContainerContext appContext, long rmIdentifier,
|
||||
OpportunisticContainerContext opportContext, long rmIdentifier,
|
||||
String appSubmitter) throws YarnException {
|
||||
// Partition requests into GUARANTEED and OPPORTUNISTIC reqs
|
||||
PartitionedResourceRequests partitionedAsks =
|
||||
partitionAskList(request.getAskList());
|
||||
|
||||
if (partitionedAsks.getOpportunistic().isEmpty()) {
|
||||
return Collections.emptyList();
|
||||
}
|
||||
|
||||
// Update released containers.
|
||||
List<ContainerId> releasedContainers = request.getReleaseList();
|
||||
int numReleasedContainers = releasedContainers.size();
|
||||
if (numReleasedContainers > 0) {
|
||||
LOG.info("AttemptID: " + applicationAttemptId + " released: "
|
||||
+ numReleasedContainers);
|
||||
appContext.getContainersAllocated().removeAll(releasedContainers);
|
||||
opportContext.getContainersAllocated().removeAll(releasedContainers);
|
||||
}
|
||||
|
||||
// Also, update black list
|
||||
// Update black list.
|
||||
ResourceBlacklistRequest rbr = request.getResourceBlacklistRequest();
|
||||
if (rbr != null) {
|
||||
appContext.getBlacklist().removeAll(rbr.getBlacklistRemovals());
|
||||
appContext.getBlacklist().addAll(rbr.getBlacklistAdditions());
|
||||
opportContext.getBlacklist().removeAll(rbr.getBlacklistRemovals());
|
||||
opportContext.getBlacklist().addAll(rbr.getBlacklistAdditions());
|
||||
}
|
||||
|
||||
// Add OPPORTUNISTIC reqs to the outstanding reqs
|
||||
appContext.addToOutstandingReqs(partitionedAsks.getOpportunistic());
|
||||
// Add OPPORTUNISTIC requests to the outstanding ones.
|
||||
opportContext.addToOutstandingReqs(request.getAskList());
|
||||
|
||||
// Satisfy the outstanding OPPORTUNISTIC requests.
|
||||
List<Container> allocatedContainers = new ArrayList<>();
|
||||
for (Priority priority :
|
||||
appContext.getOutstandingOpReqs().descendingKeySet()) {
|
||||
opportContext.getOutstandingOpReqs().descendingKeySet()) {
|
||||
// Allocated containers :
|
||||
// Key = Requested Capability,
|
||||
// Value = List of Containers of given cap (the actual container size
|
||||
|
@ -235,16 +234,14 @@ public class OpportunisticContainerAllocator {
|
|||
// we need the requested capability (key) to match against
|
||||
// the outstanding reqs)
|
||||
Map<Resource, List<Container>> allocated = allocate(rmIdentifier,
|
||||
appContext, priority, applicationAttemptId, appSubmitter);
|
||||
opportContext, priority, applicationAttemptId, appSubmitter);
|
||||
for (Map.Entry<Resource, List<Container>> e : allocated.entrySet()) {
|
||||
appContext.matchAllocationToOutstandingRequest(
|
||||
opportContext.matchAllocationToOutstandingRequest(
|
||||
e.getKey(), e.getValue());
|
||||
allocatedContainers.addAll(e.getValue());
|
||||
}
|
||||
}
|
||||
|
||||
// Send all the GUARANTEED Reqs to RM
|
||||
request.setAskList(partitionedAsks.getGuaranteed());
|
||||
return allocatedContainers;
|
||||
}
|
||||
|
||||
|
@ -359,8 +356,14 @@ public class OpportunisticContainerAllocator {
|
|||
return containerToken;
|
||||
}
|
||||
|
||||
private PartitionedResourceRequests partitionAskList(List<ResourceRequest>
|
||||
askList) {
|
||||
/**
|
||||
* Partitions a list of ResourceRequest to two separate lists, one for
|
||||
* GUARANTEED and one for OPPORTUNISTIC ResourceRequests.
|
||||
* @param askList the list of ResourceRequests to be partitioned
|
||||
* @return the partitioned ResourceRequests
|
||||
*/
|
||||
public PartitionedResourceRequests partitionAskList(
|
||||
List<ResourceRequest> askList) {
|
||||
PartitionedResourceRequests partitionedRequests =
|
||||
new PartitionedResourceRequests();
|
||||
for (ResourceRequest rr : askList) {
|
||||
|
|
|
@ -1446,7 +1446,7 @@ public class ContainerManagerImpl extends CompositeService implements
|
|||
|
||||
@Override
|
||||
public OpportunisticContainersStatus getOpportunisticContainersStatus() {
|
||||
return null;
|
||||
return OpportunisticContainersStatus.newInstance();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -220,16 +220,27 @@ public final class DistributedScheduler extends AbstractRequestInterceptor {
|
|||
public DistributedSchedulingAllocateResponse allocateForDistributedScheduling(
|
||||
DistributedSchedulingAllocateRequest request)
|
||||
throws YarnException, IOException {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Forwarding allocate request to the" +
|
||||
"Distributed Scheduler Service on YARN RM");
|
||||
}
|
||||
|
||||
// Partition requests to GUARANTEED and OPPORTUNISTIC.
|
||||
OpportunisticContainerAllocator.PartitionedResourceRequests
|
||||
partitionedAsks = containerAllocator
|
||||
.partitionAskList(request.getAllocateRequest().getAskList());
|
||||
|
||||
// Allocate OPPORTUNISTIC containers.
|
||||
request.getAllocateRequest().setAskList(partitionedAsks.getOpportunistic());
|
||||
List<Container> allocatedContainers =
|
||||
containerAllocator.allocateContainers(
|
||||
request.getAllocateRequest(), applicationAttemptId,
|
||||
oppContainerContext, rmIdentifier, appSubmitter);
|
||||
|
||||
// Prepare request for sending to RM for scheduling GUARANTEED containers.
|
||||
request.setAllocatedContainers(allocatedContainers);
|
||||
request.getAllocateRequest().setAskList(partitionedAsks.getGuaranteed());
|
||||
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Forwarding allocate request to the" +
|
||||
"Distributed Scheduler Service on YARN RM");
|
||||
}
|
||||
|
||||
DistributedSchedulingAllocateResponse dsResp =
|
||||
getNextInterceptor().allocateForDistributedScheduling(request);
|
||||
|
|
|
@ -537,7 +537,8 @@ public class ApplicationMasterService extends AbstractService implements
|
|||
RMAppAttempt appAttempt = app.getRMAppAttempt(appAttemptId);
|
||||
AllocateResponse allocateResponse =
|
||||
recordFactory.newRecordInstance(AllocateResponse.class);
|
||||
if (!allocation.getContainers().isEmpty()) {
|
||||
if (allocation.getNMTokens() != null &&
|
||||
!allocation.getNMTokens().isEmpty()) {
|
||||
allocateResponse.setNMTokens(allocation.getNMTokens());
|
||||
}
|
||||
|
||||
|
|
|
@ -220,34 +220,51 @@ public class OpportunisticContainerAllocatorAMService
|
|||
public AllocateResponse allocate(AllocateRequest request) throws
|
||||
YarnException, IOException {
|
||||
|
||||
// Partition requests to GUARANTEED and OPPORTUNISTIC.
|
||||
OpportunisticContainerAllocator.PartitionedResourceRequests
|
||||
partitionedAsks =
|
||||
oppContainerAllocator.partitionAskList(request.getAskList());
|
||||
|
||||
// Allocate OPPORTUNISTIC containers.
|
||||
request.setAskList(partitionedAsks.getOpportunistic());
|
||||
final ApplicationAttemptId appAttemptId = getAppAttemptId();
|
||||
SchedulerApplicationAttempt appAttempt = ((AbstractYarnScheduler)
|
||||
rmContext.getScheduler()).getApplicationAttempt(appAttemptId);
|
||||
|
||||
OpportunisticContainerContext oppCtx =
|
||||
appAttempt.getOpportunisticContainerContext();
|
||||
oppCtx.updateNodeList(getLeastLoadedNodes());
|
||||
|
||||
List<Container> oppContainers =
|
||||
oppContainerAllocator.allocateContainers(request, appAttemptId, oppCtx,
|
||||
ResourceManager.getClusterTimeStamp(), appAttempt.getUser());
|
||||
|
||||
// Create RMContainers and update the NMTokens.
|
||||
if (!oppContainers.isEmpty()) {
|
||||
handleNewContainers(oppContainers, false);
|
||||
appAttempt.updateNMTokens(oppContainers);
|
||||
}
|
||||
|
||||
// Allocate all guaranteed containers
|
||||
// Allocate GUARANTEED containers.
|
||||
request.setAskList(partitionedAsks.getGuaranteed());
|
||||
AllocateResponse allocateResp = super.allocate(request);
|
||||
|
||||
// Add allocated OPPORTUNISTIC containers to the AllocateResponse.
|
||||
if (!oppContainers.isEmpty()) {
|
||||
allocateResp.getAllocatedContainers().addAll(oppContainers);
|
||||
}
|
||||
|
||||
// Update opportunistic container context with the allocated GUARANTEED
|
||||
// containers.
|
||||
oppCtx.updateCompletedContainers(allocateResp);
|
||||
|
||||
// Add all opportunistic containers
|
||||
allocateResp.getAllocatedContainers().addAll(oppContainers);
|
||||
return allocateResp;
|
||||
}
|
||||
|
||||
@Override
|
||||
public RegisterDistributedSchedulingAMResponse
|
||||
registerApplicationMasterForDistributedScheduling(
|
||||
registerApplicationMasterForDistributedScheduling(
|
||||
RegisterApplicationMasterRequest request) throws YarnException,
|
||||
IOException {
|
||||
RegisterApplicationMasterResponse response =
|
||||
|
|
Loading…
Reference in New Issue