YARN-5110. Fix OpportunisticContainerAllocator to insert complete HostAddress in issued ContainerTokenIds. (Konstantinos Karanasos via asuresh)

(cherry picked from commit 1597630681)
This commit is contained in:
Arun Suresh 2016-05-18 18:46:00 -07:00
parent a555a320e8
commit 15808717f3
7 changed files with 91 additions and 20 deletions

View File

@ -41,5 +41,5 @@ public static QueuedContainersStatus newInstance() {
public abstract int getWaitQueueLength();
public abstract void setWaitQueueLength(int queueWaitTime);
public abstract void setWaitQueueLength(int waitQueueLength);
}

View File

@ -200,6 +200,7 @@ private void startAllocatedContainer(
.getContainerTokenIdentifier().getContainerID();
this.context.getQueuingContext().getQueuedContainers().remove(containerId);
try {
LOG.info("Starting container [" + containerId + "]");
super.startContainerInternal(
allocatedContainerInfo.getContainerTokenIdentifier(),
allocatedContainerInfo.getStartRequest());

View File

@ -337,8 +337,10 @@ private void addToNodeList(List<NodeId> nodes) {
@Override
public DistSchedAllocateResponse allocateForDistributedScheduling
(AllocateRequest request) throws YarnException, IOException {
LOG.info("Forwarding allocate request to the" +
"Distributed Scheduler Service on YARN RM");
if (LOG.isDebugEnabled()) {
LOG.debug("Forwarding allocate request to the" +
"Distributed Scheduler Service on YARN RM");
}
// Partition requests into GUARANTEED and OPPORTUNISTIC reqs
PartitionedResourceRequests partitionedAsks = partitionAskList(request
.getAskList());

View File

@ -84,14 +84,14 @@ public Map<Resource, List<Container>> allocate(DistSchedulerParams appParams,
Map<String, NodeId> allNodes, String userName) throws YarnException {
Map<Resource, List<Container>> containers = new HashMap<>();
Set<String> nodesAllocated = new HashSet<>();
int numAsks = resourceAsks.size();
for (ResourceRequest anyAsk : resourceAsks) {
allocateOpportunisticContainers(appParams, idCounter, blacklist, appAttId,
allNodes, userName, containers, nodesAllocated, anyAsk);
}
if (numAsks > 0) {
LOG.info("Opportunistic allocation requested for: " + numAsks
+ " containers; allocated = " + containers.size());
LOG.info("Opportunistic allocation requested for ["
+ "priority=" + anyAsk.getPriority()
+ ", num_containers=" + anyAsk.getNumContainers()
+ ", capability=" + anyAsk.getCapability() + "]"
+ " allocated = " + containers.get(anyAsk.getCapability()).size());
}
return containers;
}
@ -129,8 +129,9 @@ private void allocateOpportunisticContainers(DistSchedulerParams appParams,
}
cList.add(container);
numAllocated++;
LOG.info("Allocated " + numAllocated + " opportunistic containers.");
LOG.info("Allocated [" + container.getId() + "] as opportunistic.");
}
LOG.info("Allocated " + numAllocated + " opportunistic containers.");
}
private Container buildContainer(DistSchedulerParams appParams,
@ -146,8 +147,8 @@ private Container buildContainer(DistSchedulerParams appParams,
long currTime = System.currentTimeMillis();
ContainerTokenIdentifier containerTokenIdentifier =
new ContainerTokenIdentifier(
cId, nodeId.getHost(), userName, capability,
currTime + appParams.containerTokenExpiryInterval,
cId, nodeId.getHost() + ":" + nodeId.getPort(), userName,
capability, currTime + appParams.containerTokenExpiryInterval,
context.getContainerTokenSecretManager().getCurrentKey().getKeyId(),
nodeStatusUpdater.getRMIdentifier(), rr.getPriority(), currTime,
null, CommonNodeLabelsManager.NO_LABEL, ContainerType.TASK,

View File

@ -19,6 +19,7 @@
package org.apache.hadoop.yarn.server.nodemanager.scheduler;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.server.api.DistributedSchedulerProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
@ -45,6 +46,7 @@
.NMContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.nodemanager.security
.NMTokenSecretManagerInNM;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.Records;
import org.junit.Assert;
import org.junit.Test;
@ -196,9 +198,14 @@ private DistSchedAllocateResponse createAllocateResponse(List<NodeId> nodes) {
}
private Map<NodeId, List<ContainerId>> mapAllocs(AllocateResponse
allocateResponse) {
allocateResponse) throws Exception {
Map<NodeId, List<ContainerId>> allocs = new HashMap<>();
for (Container c : allocateResponse.getAllocatedContainers()) {
ContainerTokenIdentifier cTokId = BuilderUtils
.newContainerTokenIdentifier(c.getContainerToken());
Assert.assertEquals(
c.getNodeId().getHost() + ":" + c.getNodeId().getPort(),
cTokId.getNmHostAddress());
List<ContainerId> cIds = allocs.get(c.getNodeId());
if (cIds == null) {
cIds = new ArrayList<>();

View File

@ -195,11 +195,11 @@ public void updateNode(RMNode rmNode) {
new ClusterNode(rmNode.getNodeID())
.setQueueWaitTime(estimatedQueueWaitTime)
.setQueueLength(waitQueueLength));
LOG.info("Inserting ClusterNode [" + rmNode.getNodeID() + "]" +
LOG.info("Inserting ClusterNode [" + rmNode.getNodeID() + "] " +
"with queue wait time [" + estimatedQueueWaitTime + "] and " +
"wait queue length [" + waitQueueLength + "]");
} else {
LOG.warn("IGNORING ClusterNode [" + rmNode.getNodeID() + "]" +
LOG.warn("IGNORING ClusterNode [" + rmNode.getNodeID() + "] " +
"with queue wait time [" + estimatedQueueWaitTime + "] and " +
"wait queue length [" + waitQueueLength + "]");
}
@ -210,12 +210,14 @@ public void updateNode(RMNode rmNode) {
.setQueueWaitTime(estimatedQueueWaitTime)
.setQueueLength(waitQueueLength)
.updateTimestamp();
LOG.info("Updating ClusterNode [" + rmNode.getNodeID() + "]" +
"with queue wait time [" + estimatedQueueWaitTime + "] and " +
"wait queue length [" + waitQueueLength + "]");
if (LOG.isDebugEnabled()) {
LOG.debug("Updating ClusterNode [" + rmNode.getNodeID() + "] " +
"with queue wait time [" + estimatedQueueWaitTime + "] and " +
"wait queue length [" + waitQueueLength + "]");
}
} else {
this.clusterNodes.remove(rmNode.getNodeID());
LOG.info("Deleting ClusterNode [" + rmNode.getNodeID() + "]" +
LOG.info("Deleting ClusterNode [" + rmNode.getNodeID() + "] " +
"with queue wait time [" + currentNode.queueWaitTime + "] and " +
"wait queue length [" + currentNode.queueLength + "]");
}

View File

@ -75,6 +75,9 @@
import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.DefaultRequestInterceptor;
import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.RequestInterceptor;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.queuing.QueuingContainerManagerImpl;
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceTrackerService;
@ -713,8 +716,14 @@ protected ContainerManagerImpl createContainerManager(Context context,
ContainerExecutor exec, DeletionService del,
NodeStatusUpdater nodeStatusUpdater, ApplicationACLsManager aclsManager,
LocalDirsHandlerService dirsHandler) {
return new CustomContainerManagerImpl(context, exec, del,
nodeStatusUpdater, metrics, dirsHandler);
if (getConfig().getBoolean(YarnConfiguration.NM_CONTAINER_QUEUING_ENABLED,
YarnConfiguration.NM_CONTAINER_QUEUING_ENABLED_DEFAULT)) {
return new CustomQueueingContainerManagerImpl(context, exec, del,
nodeStatusUpdater, metrics, dirsHandler);
} else {
return new CustomContainerManagerImpl(context, exec, del,
nodeStatusUpdater, metrics, dirsHandler);
}
}
}
@ -846,6 +855,55 @@ protected void createAMRMProxyService(Configuration conf) {
}
}
private class CustomQueueingContainerManagerImpl extends
QueuingContainerManagerImpl {
public CustomQueueingContainerManagerImpl(Context context,
ContainerExecutor exec, DeletionService del, NodeStatusUpdater
nodeStatusUpdater, NodeManagerMetrics metrics,
LocalDirsHandlerService dirsHandler) {
super(context, exec, del, nodeStatusUpdater, metrics, dirsHandler);
}
@Override
protected ContainersMonitor createContainersMonitor(ContainerExecutor
exec) {
return new ContainersMonitorImpl(exec, dispatcher, this.context) {
@Override
public void increaseContainersAllocation(ProcessTreeInfo pti) { }
@Override
public void decreaseContainersAllocation(ProcessTreeInfo pti) { }
@Override
public boolean hasResourcesAvailable(
ContainersMonitorImpl.ProcessTreeInfo pti) {
return true;
}
};
}
@Override
protected void createAMRMProxyService(Configuration conf) {
this.amrmProxyEnabled =
conf.getBoolean(YarnConfiguration.AMRM_PROXY_ENABLED,
YarnConfiguration.DEFAULT_AMRM_PROXY_ENABLED);
if (this.amrmProxyEnabled) {
LOG.info("CustomAMRMProxyService is enabled. "
+ "All the AM->RM requests will be intercepted by the proxy");
AMRMProxyService amrmProxyService =
useRpc ? new AMRMProxyService(getContext(), dispatcher)
: new ShortCircuitedAMRMProxy(getContext(), dispatcher);
this.setAMRMProxyService(amrmProxyService);
addService(this.getAMRMProxyService());
} else {
LOG.info("CustomAMRMProxyService is disabled");
}
}
}
private class ShortCircuitedAMRMProxy extends AMRMProxyService {
public ShortCircuitedAMRMProxy(Context context,