YARN-9409. Port resource type changes from YARN-7237 to branch-2
This commit is contained in:
parent
28fbc4e344
commit
87ad52f076
|
@ -58,7 +58,6 @@ import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException;
|
import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
|
||||||
import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes;
|
import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes;
|
||||||
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
|
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEvent;
|
||||||
|
@ -797,7 +796,7 @@ public abstract class AbstractYarnScheduler
|
||||||
writeLock.unlock();
|
writeLock.unlock();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Process resource update on a node.
|
* Process resource update on a node.
|
||||||
*/
|
*/
|
||||||
|
@ -900,12 +899,12 @@ public abstract class AbstractYarnScheduler
|
||||||
LOG.info("Updated the cluste max priority to maxClusterLevelAppPriority = "
|
LOG.info("Updated the cluste max priority to maxClusterLevelAppPriority = "
|
||||||
+ maxClusterLevelAppPriority);
|
+ maxClusterLevelAppPriority);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Sanity check increase/decrease request, and return
|
* Sanity check increase/decrease request, and return
|
||||||
* SchedulerContainerResourceChangeRequest according to given
|
* SchedulerContainerResourceChangeRequest according to given
|
||||||
* UpdateContainerRequest.
|
* UpdateContainerRequest.
|
||||||
*
|
*
|
||||||
* <pre>
|
* <pre>
|
||||||
* - Returns non-null value means validation succeeded
|
* - Returns non-null value means validation succeeded
|
||||||
* - Throw exception when any other error happens
|
* - Throw exception when any other error happens
|
||||||
|
@ -1334,9 +1333,7 @@ public abstract class AbstractYarnScheduler
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Get a Resource object with for the minimum allocation possible. If resource
|
* Get a Resource object with for the minimum allocation possible.
|
||||||
* profiles are enabled then the 'minimum' resource profile will be used. If
|
|
||||||
* they are not enabled, use the minimums specified in the config files.
|
|
||||||
*
|
*
|
||||||
* @return a Resource object with the minimum allocation for the scheduler
|
* @return a Resource object with the minimum allocation for the scheduler
|
||||||
*/
|
*/
|
||||||
|
@ -1347,9 +1344,7 @@ public abstract class AbstractYarnScheduler
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get a Resource object with for the maximum allocation possible. If resource
|
* Get a Resource object with for the maximum allocation possible.
|
||||||
* profiles are enabled then the 'maximum' resource profile will be used. If
|
|
||||||
* they are not enabled, use the maximums specified in the config files.
|
|
||||||
*
|
*
|
||||||
* @return a Resource object with the maximum allocation for the scheduler
|
* @return a Resource object with the maximum allocation for the scheduler
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -47,14 +47,14 @@ import org.apache.hadoop.yarn.server.api.records.NodeStatus;
|
||||||
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
||||||
import org.apache.hadoop.yarn.util.Records;
|
import org.apache.hadoop.yarn.util.Records;
|
||||||
import org.apache.hadoop.yarn.util.YarnVersionInfo;
|
import org.apache.hadoop.yarn.util.YarnVersionInfo;
|
||||||
|
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||||
import org.mortbay.log.Log;
|
import org.mortbay.log.Log;
|
||||||
|
|
||||||
public class MockNM {
|
public class MockNM {
|
||||||
|
|
||||||
private int responseId;
|
private int responseId;
|
||||||
private NodeId nodeId;
|
private NodeId nodeId;
|
||||||
private long memory;
|
private Resource capatibility;
|
||||||
private int vCores;
|
|
||||||
private ResourceTrackerService resourceTracker;
|
private ResourceTrackerService resourceTracker;
|
||||||
private int httpPort = 2;
|
private int httpPort = 2;
|
||||||
private MasterKey currentContainerTokenMasterKey;
|
private MasterKey currentContainerTokenMasterKey;
|
||||||
|
@ -75,13 +75,25 @@ public class MockNM {
|
||||||
|
|
||||||
public MockNM(String nodeIdStr, int memory, int vcores,
|
public MockNM(String nodeIdStr, int memory, int vcores,
|
||||||
ResourceTrackerService resourceTracker) {
|
ResourceTrackerService resourceTracker) {
|
||||||
this(nodeIdStr, memory, vcores, resourceTracker, YarnVersionInfo.getVersion());
|
this(nodeIdStr, memory, vcores, resourceTracker,
|
||||||
|
YarnVersionInfo.getVersion());
|
||||||
}
|
}
|
||||||
|
|
||||||
public MockNM(String nodeIdStr, int memory, int vcores,
|
public MockNM(String nodeIdStr, int memory, int vcores,
|
||||||
ResourceTrackerService resourceTracker, String version) {
|
ResourceTrackerService resourceTracker, String version) {
|
||||||
this.memory = memory;
|
this(nodeIdStr, Resource.newInstance(memory, vcores), resourceTracker,
|
||||||
this.vCores = vcores;
|
version);
|
||||||
|
}
|
||||||
|
|
||||||
|
public MockNM(String nodeIdStr, Resource capatibility,
|
||||||
|
ResourceTrackerService resourceTracker) {
|
||||||
|
this(nodeIdStr, capatibility, resourceTracker,
|
||||||
|
YarnVersionInfo.getVersion());
|
||||||
|
}
|
||||||
|
|
||||||
|
public MockNM(String nodeIdStr, Resource capatibility,
|
||||||
|
ResourceTrackerService resourceTracker, String version) {
|
||||||
|
this.capatibility = capatibility;
|
||||||
this.resourceTracker = resourceTracker;
|
this.resourceTracker = resourceTracker;
|
||||||
this.version = version;
|
this.version = version;
|
||||||
String[] splits = nodeIdStr.split(":");
|
String[] splits = nodeIdStr.split(":");
|
||||||
|
@ -146,8 +158,7 @@ public class MockNM {
|
||||||
RegisterNodeManagerRequest.class);
|
RegisterNodeManagerRequest.class);
|
||||||
req.setNodeId(nodeId);
|
req.setNodeId(nodeId);
|
||||||
req.setHttpPort(httpPort);
|
req.setHttpPort(httpPort);
|
||||||
Resource resource = BuilderUtils.newResource(memory, vCores);
|
req.setResource(capatibility);
|
||||||
req.setResource(resource);
|
|
||||||
req.setContainerStatuses(containerReports);
|
req.setContainerStatuses(containerReports);
|
||||||
req.setNMVersion(version);
|
req.setNMVersion(version);
|
||||||
req.setRunningApplications(runningApplications);
|
req.setRunningApplications(runningApplications);
|
||||||
|
@ -158,8 +169,7 @@ public class MockNM {
|
||||||
this.currentNMTokenMasterKey = registrationResponse.getNMTokenMasterKey();
|
this.currentNMTokenMasterKey = registrationResponse.getNMTokenMasterKey();
|
||||||
Resource newResource = registrationResponse.getResource();
|
Resource newResource = registrationResponse.getResource();
|
||||||
if (newResource != null) {
|
if (newResource != null) {
|
||||||
memory = (int) newResource.getMemorySize();
|
capatibility = Resources.clone(newResource);
|
||||||
vCores = newResource.getVirtualCores();
|
|
||||||
}
|
}
|
||||||
containerStats.clear();
|
containerStats.clear();
|
||||||
if (containerReports != null) {
|
if (containerReports != null) {
|
||||||
|
@ -185,7 +195,7 @@ public class MockNM {
|
||||||
long containerId, ContainerState containerState) throws Exception {
|
long containerId, ContainerState containerState) throws Exception {
|
||||||
ContainerStatus containerStatus = BuilderUtils.newContainerStatus(
|
ContainerStatus containerStatus = BuilderUtils.newContainerStatus(
|
||||||
BuilderUtils.newContainerId(attemptId, containerId), containerState,
|
BuilderUtils.newContainerId(attemptId, containerId), containerState,
|
||||||
"Success", 0, BuilderUtils.newResource(memory, vCores));
|
"Success", 0, capatibility);
|
||||||
ArrayList<ContainerStatus> containerStatusList =
|
ArrayList<ContainerStatus> containerStatusList =
|
||||||
new ArrayList<ContainerStatus>(1);
|
new ArrayList<ContainerStatus>(1);
|
||||||
containerStatusList.add(containerStatus);
|
containerStatusList.add(containerStatus);
|
||||||
|
@ -266,19 +276,22 @@ public class MockNM {
|
||||||
|
|
||||||
Resource newResource = heartbeatResponse.getResource();
|
Resource newResource = heartbeatResponse.getResource();
|
||||||
if (newResource != null) {
|
if (newResource != null) {
|
||||||
memory = newResource.getMemorySize();
|
capatibility = Resources.clone(newResource);
|
||||||
vCores = newResource.getVirtualCores();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return heartbeatResponse;
|
return heartbeatResponse;
|
||||||
}
|
}
|
||||||
|
|
||||||
public long getMemory() {
|
public long getMemory() {
|
||||||
return memory;
|
return capatibility.getMemorySize();
|
||||||
}
|
}
|
||||||
|
|
||||||
public int getvCores() {
|
public int getvCores() {
|
||||||
return vCores;
|
return capatibility.getVirtualCores();
|
||||||
|
}
|
||||||
|
|
||||||
|
public Resource getCapatibility() {
|
||||||
|
return capatibility;
|
||||||
}
|
}
|
||||||
|
|
||||||
public String getVersion() {
|
public String getVersion() {
|
||||||
|
|
|
@ -848,6 +848,15 @@ public class MockRM extends ResourceManager {
|
||||||
return nm;
|
return nm;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public MockNM registerNode(String nodeIdStr, Resource nodeCapatibility)
|
||||||
|
throws Exception {
|
||||||
|
MockNM nm = new MockNM(nodeIdStr, nodeCapatibility,
|
||||||
|
getResourceTrackerService());
|
||||||
|
nm.registerNode();
|
||||||
|
drainEventsImplicitly();
|
||||||
|
return nm;
|
||||||
|
}
|
||||||
|
|
||||||
public void sendNodeStarted(MockNM nm) throws Exception {
|
public void sendNodeStarted(MockNM nm) throws Exception {
|
||||||
RMNodeImpl node = (RMNodeImpl) getRMContext().getRMNodes().get(
|
RMNodeImpl node = (RMNodeImpl) getRMContext().getRMNodes().get(
|
||||||
nm.getNodeId());
|
nm.getNodeId());
|
||||||
|
|
|
@ -18,16 +18,7 @@
|
||||||
|
|
||||||
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
|
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
|
||||||
|
|
||||||
import static org.mockito.Matchers.any;
|
import com.google.common.collect.Sets;
|
||||||
import static org.mockito.Mockito.doReturn;
|
|
||||||
import static org.mockito.Mockito.mock;
|
|
||||||
import static org.mockito.Mockito.spy;
|
|
||||||
import static org.mockito.Mockito.when;
|
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.util.Map;
|
|
||||||
import java.util.Set;
|
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
@ -38,6 +29,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||||
import org.apache.hadoop.yarn.api.records.Priority;
|
import org.apache.hadoop.yarn.api.records.Priority;
|
||||||
import org.apache.hadoop.yarn.api.records.Resource;
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ResourceInformation;
|
||||||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||||
import org.apache.hadoop.yarn.event.Dispatcher;
|
import org.apache.hadoop.yarn.event.Dispatcher;
|
||||||
import org.apache.hadoop.yarn.event.Event;
|
import org.apache.hadoop.yarn.event.Event;
|
||||||
|
@ -53,20 +45,28 @@ import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsMana
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
||||||
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
|
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager;
|
import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
|
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
|
import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
|
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
|
||||||
|
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
|
||||||
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
||||||
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
|
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
|
||||||
import org.apache.hadoop.yarn.util.resource.Resources;
|
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||||
import org.mockito.invocation.InvocationOnMock;
|
import org.mockito.invocation.InvocationOnMock;
|
||||||
import org.mockito.stubbing.Answer;
|
import org.mockito.stubbing.Answer;
|
||||||
|
|
||||||
import com.google.common.collect.Sets;
|
import java.io.IOException;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Set;
|
||||||
|
|
||||||
|
import static org.mockito.Matchers.any;
|
||||||
|
import static org.mockito.Mockito.doReturn;
|
||||||
|
import static org.mockito.Mockito.mock;
|
||||||
|
import static org.mockito.Mockito.spy;
|
||||||
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
public class TestUtils {
|
public class TestUtils {
|
||||||
private static final Log LOG = LogFactory.getLog(TestUtils.class);
|
private static final Log LOG = LogFactory.getLog(TestUtils.class);
|
||||||
|
@ -456,4 +456,21 @@ public class TestUtils {
|
||||||
cs.submitResourceCommitRequest(clusterResource,
|
cs.submitResourceCommitRequest(clusterResource,
|
||||||
csAssignment);
|
csAssignment);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* An easy way to create resources other than memory and vcores for tests.
|
||||||
|
* @param memory memory
|
||||||
|
* @param vcores vcores
|
||||||
|
* @param nameToValues resource types other than memory and vcores.
|
||||||
|
* @return created resource
|
||||||
|
*/
|
||||||
|
public static Resource createResource(long memory, int vcores,
|
||||||
|
Map<String, Integer> nameToValues) {
|
||||||
|
Resource res = Resource.newInstance(memory, vcores);
|
||||||
|
for (Map.Entry<String, Integer> entry : nameToValues.entrySet()) {
|
||||||
|
res.setResourceInformation(entry.getKey(), ResourceInformation
|
||||||
|
.newInstance(entry.getKey(), "", entry.getValue()));
|
||||||
|
}
|
||||||
|
return res;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue