YARN-7237. Cleanup usages of ResourceProfiles. (Wangda Tan)
Change-Id: I959c8531bd48231956068cb7d2312f3235549f9c (cherry picked from commit 8536db5a7549fa44d9f9e444b41bd5978b5c6ae1)
This commit is contained in:
parent
82cd85d868
commit
5cfbad68c0
|
@ -153,18 +153,17 @@ public abstract class ProfileCapability {
|
|||
Resource none = Resource.newInstance(0, 0);
|
||||
Resource resource = Resource.newInstance(0, 0);
|
||||
String profileName = capability.getProfileName();
|
||||
if (profileName.isEmpty()) {
|
||||
if (null == profileName || profileName.isEmpty()) {
|
||||
profileName = DEFAULT_PROFILE;
|
||||
}
|
||||
if (resourceProfilesMap.containsKey(profileName)) {
|
||||
resource = Resource.newInstance(resourceProfilesMap.get(profileName));
|
||||
}
|
||||
|
||||
if (capability.getProfileCapabilityOverride() != null &&
|
||||
!capability.getProfileCapabilityOverride().equals(none)) {
|
||||
for (ResourceInformation entry : capability
|
||||
.getProfileCapabilityOverride().getResources()) {
|
||||
if (entry != null && entry.getValue() >= 0) {
|
||||
if (entry != null && entry.getValue() > 0) {
|
||||
resource.setResourceInformation(entry.getName(), entry);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,16 +1,8 @@
|
|||
{
|
||||
"minimum": {
|
||||
"memory-mb" : 1024,
|
||||
"vcores" : 1
|
||||
},
|
||||
"default" : {
|
||||
"memory-mb" : 2048,
|
||||
"vcores" : 2
|
||||
},
|
||||
"maximum" : {
|
||||
"memory-mb": 4096,
|
||||
"vcores" : 4
|
||||
},
|
||||
"http" : {
|
||||
"memory-mb" : 2048,
|
||||
"vcores" : 2
|
||||
|
|
|
@ -0,0 +1,109 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.api;
|
||||
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.ResourceTypes;
|
||||
import org.apache.hadoop.yarn.api.records.ProfileCapability;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceInformation;
|
||||
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* Test profile capability behavior.
|
||||
*/
|
||||
public class TestProfileCapability {
|
||||
@Before
|
||||
public void setup() {
|
||||
// Initialize resource map
|
||||
Map<String, ResourceInformation> riMap = new HashMap<>();
|
||||
|
||||
// Initialize mandatory resources
|
||||
riMap.put(ResourceInformation.MEMORY_URI, ResourceInformation.MEMORY_MB);
|
||||
riMap.put(ResourceInformation.VCORES_URI, ResourceInformation.VCORES);
|
||||
|
||||
for (int i = 0; i < 5; i++) {
|
||||
String resourceName = "res-" + i;
|
||||
riMap.put(resourceName, ResourceInformation
|
||||
.newInstance(resourceName, "", 0, ResourceTypes.COUNTABLE, 0,
|
||||
Integer.MAX_VALUE));
|
||||
}
|
||||
|
||||
ResourceUtils.initializeResourcesFromResourceInformationMap(riMap);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testConvertProfileCapabilityToResource() {
|
||||
Resource profile1 = Resource.newInstance(1, 1);
|
||||
profile1.setResourceValue("res-0", 1);
|
||||
profile1.setResourceValue("res-1", 1);
|
||||
|
||||
Resource profile2 = Resource.newInstance(2, 2);
|
||||
profile2.setResourceValue("res-0", 2);
|
||||
profile2.setResourceValue("res-1", 2);
|
||||
|
||||
Resource profile3 = Resource.newInstance(3, 3);
|
||||
profile3.setResourceValue("res-0", 3);
|
||||
profile3.setResourceValue("res-1", 3);
|
||||
|
||||
Map<String, Resource> profiles = ImmutableMap.of("profile1", profile1,
|
||||
"profile2", profile2, "profile3", profile3, "default", profile1);
|
||||
|
||||
// Test case 1, set override value to (1, 1, 0), since we only allow
|
||||
// overwrite for positive value, it is still profile1.
|
||||
ProfileCapability pc = ProfileCapability.newInstance("profile1",
|
||||
Resource.newInstance(1, 1));
|
||||
Assert.assertEquals(profile1, ProfileCapability.toResource(pc, profiles));
|
||||
|
||||
// Test case 2, similarly, negative value won't be respected.
|
||||
pc = ProfileCapability.newInstance("profile1",
|
||||
Resource.newInstance(1, -1));
|
||||
Assert.assertEquals(profile1, ProfileCapability.toResource(pc, profiles));
|
||||
|
||||
// Test case 3, do overwrite for memory and vcores, the result is (3,3,1,1)
|
||||
Resource expected = Resource.newInstance(3, 3);
|
||||
expected.setResourceValue("res-0", 1);
|
||||
expected.setResourceValue("res-1", 1);
|
||||
pc = ProfileCapability.newInstance("profile1",
|
||||
Resource.newInstance(3, 3));
|
||||
Assert.assertEquals(expected, ProfileCapability.toResource(pc, profiles));
|
||||
|
||||
// Test case 3, do overwrite for mem and res-1, the result is (3,1,3,1)
|
||||
expected = Resource.newInstance(3, 1);
|
||||
expected.setResourceValue("res-0", 3);
|
||||
expected.setResourceValue("res-1", 1);
|
||||
|
||||
Resource overwrite = Resource.newInstance(3, 0);
|
||||
overwrite.setResourceValue("res-0", 3);
|
||||
overwrite.setResourceValue("res-1", 0);
|
||||
|
||||
pc = ProfileCapability.newInstance("profile1", overwrite);
|
||||
Assert.assertEquals(expected, ProfileCapability.toResource(pc, profiles));
|
||||
|
||||
// Test case 4, when null profile is specified, use default.
|
||||
pc = ProfileCapability.newInstance("", null);
|
||||
Assert.assertEquals(profile1, ProfileCapability.toResource(pc, profiles));
|
||||
}
|
||||
}
|
|
@ -378,6 +378,7 @@ public class BuilderUtils {
|
|||
request.setNumContainers(r.getNumContainers());
|
||||
request.setNodeLabelExpression(r.getNodeLabelExpression());
|
||||
request.setExecutionTypeRequest(r.getExecutionTypeRequest());
|
||||
request.setProfileCapability(r.getProfileCapability());
|
||||
return request;
|
||||
}
|
||||
|
||||
|
|
|
@ -227,6 +227,13 @@ public class ResourceManager extends CompositeService implements Recoverable {
|
|||
return rmDispatcher;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
protected ResourceProfilesManager createResourceProfileManager() {
|
||||
ResourceProfilesManager resourceProfilesManager =
|
||||
new ResourceProfilesManagerImpl();
|
||||
return resourceProfilesManager;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void serviceInit(Configuration conf) throws Exception {
|
||||
this.conf = conf;
|
||||
|
@ -236,7 +243,7 @@ public class ResourceManager extends CompositeService implements Recoverable {
|
|||
|
||||
// add resource profiles here because it's used by AbstractYarnScheduler
|
||||
ResourceProfilesManager resourceProfilesManager =
|
||||
new ResourceProfilesManagerImpl();
|
||||
createResourceProfileManager();
|
||||
resourceProfilesManager.init(conf);
|
||||
rmContext.setResourceProfilesManager(resourceProfilesManager);
|
||||
|
||||
|
|
|
@ -24,7 +24,6 @@ import org.apache.commons.logging.LogFactory;
|
|||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceInformation;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceTypeInfo;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.exceptions.YARNFeatureNotEnabledException;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
|
@ -35,12 +34,9 @@ import org.codehaus.jackson.map.ObjectMapper;
|
|||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.net.URL;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||
|
@ -55,8 +51,6 @@ public class ResourceProfilesManagerImpl implements ResourceProfilesManager {
|
|||
LogFactory.getLog(ResourceProfilesManagerImpl.class);
|
||||
|
||||
private final Map<String, Resource> profiles = new ConcurrentHashMap<>();
|
||||
private List<ResourceTypeInfo> resourceTypeInfo =
|
||||
new ArrayList<ResourceTypeInfo>();
|
||||
private Configuration conf;
|
||||
private boolean profileEnabled = false;
|
||||
|
||||
|
@ -87,26 +81,6 @@ public class ResourceProfilesManagerImpl implements ResourceProfilesManager {
|
|||
public void init(Configuration config) throws IOException {
|
||||
conf = config;
|
||||
loadProfiles();
|
||||
|
||||
// Load resource types, this should be done even if resource profile is
|
||||
// disabled, since we have mandatory resource types like vcores/memory.
|
||||
loadResourceTypes();
|
||||
}
|
||||
|
||||
private void loadResourceTypes() {
|
||||
// Add all resource types
|
||||
try {
|
||||
writeLock.lock();
|
||||
Collection<ResourceInformation> resourcesInfo = ResourceUtils
|
||||
.getResourceTypes().values();
|
||||
for (ResourceInformation resourceInfo : resourcesInfo) {
|
||||
resourceTypeInfo
|
||||
.add(ResourceTypeInfo.newInstance(resourceInfo.getName(),
|
||||
resourceInfo.getUnits(), resourceInfo.getResourceType()));
|
||||
}
|
||||
} finally {
|
||||
writeLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
private void loadProfiles() throws IOException {
|
||||
|
@ -140,6 +114,14 @@ public class ResourceProfilesManagerImpl implements ResourceProfilesManager {
|
|||
throw new IOException(
|
||||
"Name of resource profile cannot be an empty string");
|
||||
}
|
||||
|
||||
if (profileName.equals(MINIMUM_PROFILE) || profileName.equals(
|
||||
MAXIMUM_PROFILE)) {
|
||||
throw new IOException(String.format(
|
||||
"profile={%s, %s} is should not be specified "
|
||||
+ "inside %s, they will be loaded from resource-types.xml",
|
||||
MINIMUM_PROFILE, MAXIMUM_PROFILE, sourceFile));
|
||||
}
|
||||
if (entry.getValue() instanceof Map) {
|
||||
Map profileInfo = (Map) entry.getValue();
|
||||
// ensure memory and vcores are specified
|
||||
|
@ -155,6 +137,13 @@ public class ResourceProfilesManagerImpl implements ResourceProfilesManager {
|
|||
"Added profile '" + profileName + "' with resources: " + resource);
|
||||
}
|
||||
}
|
||||
|
||||
// add minimum/maximum profile
|
||||
profiles.put(MINIMUM_PROFILE,
|
||||
ResourceUtils.getResourceTypesMinimumAllocation());
|
||||
profiles.put(MAXIMUM_PROFILE,
|
||||
ResourceUtils.getResourceTypesMaximumAllocation());
|
||||
|
||||
// check to make sure mandatory profiles are present
|
||||
for (String profile : MANDATORY_PROFILES) {
|
||||
if (!profiles.containsKey(profile)) {
|
||||
|
|
|
@ -58,7 +58,6 @@ import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
|
|||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||
import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEvent;
|
||||
|
@ -1328,57 +1327,24 @@ public abstract class AbstractYarnScheduler
|
|||
}
|
||||
|
||||
/*
|
||||
* Get a Resource object with for the minimum allocation possible. If resource
|
||||
* profiles are enabled then the 'minimum' resource profile will be used. If
|
||||
* they are not enabled, use the minimums specified in the config files.
|
||||
* Get a Resource object with for the minimum allocation possible.
|
||||
*
|
||||
* @return a Resource object with the minimum allocation for the scheduler
|
||||
*/
|
||||
public Resource getMinimumAllocation() {
|
||||
boolean profilesEnabled = getConfig()
|
||||
.getBoolean(YarnConfiguration.RM_RESOURCE_PROFILES_ENABLED,
|
||||
YarnConfiguration.DEFAULT_RM_RESOURCE_PROFILES_ENABLED);
|
||||
Resource ret;
|
||||
if (!profilesEnabled) {
|
||||
ret = ResourceUtils.getResourceTypesMinimumAllocation();
|
||||
} else {
|
||||
try {
|
||||
ret = rmContext.getResourceProfilesManager().getMinimumProfile();
|
||||
} catch (YarnException e) {
|
||||
LOG.error(
|
||||
"Exception while getting minimum profile from profile manager:", e);
|
||||
throw new YarnRuntimeException(e);
|
||||
}
|
||||
}
|
||||
Resource ret = ResourceUtils.getResourceTypesMinimumAllocation();
|
||||
LOG.info("Minimum allocation = " + ret);
|
||||
return ret;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get a Resource object with for the maximum allocation possible. If resource
|
||||
* profiles are enabled then the 'maximum' resource profile will be used. If
|
||||
* they are not enabled, use the maximums specified in the config files.
|
||||
* Get a Resource object with for the maximum allocation possible.
|
||||
*
|
||||
* @return a Resource object with the maximum allocation for the scheduler
|
||||
*/
|
||||
|
||||
public Resource getMaximumAllocation() {
|
||||
boolean profilesEnabled = getConfig()
|
||||
.getBoolean(YarnConfiguration.RM_RESOURCE_PROFILES_ENABLED,
|
||||
YarnConfiguration.DEFAULT_RM_RESOURCE_PROFILES_ENABLED);
|
||||
Resource ret;
|
||||
if (!profilesEnabled) {
|
||||
ret = ResourceUtils.getResourceTypesMaximumAllocation();
|
||||
} else {
|
||||
try {
|
||||
ret = rmContext.getResourceProfilesManager().getMaximumProfile();
|
||||
} catch (YarnException e) {
|
||||
LOG.error(
|
||||
"Exception while getting maximum profile from ResourceProfileManager:",
|
||||
e);
|
||||
throw new YarnRuntimeException(e);
|
||||
}
|
||||
}
|
||||
Resource ret = ResourceUtils.getResourceTypesMaximumAllocation();
|
||||
LOG.info("Maximum allocation = " + ret);
|
||||
return ret;
|
||||
}
|
||||
|
|
|
@ -47,14 +47,14 @@ import org.apache.hadoop.yarn.server.api.records.NodeStatus;
|
|||
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
||||
import org.apache.hadoop.yarn.util.Records;
|
||||
import org.apache.hadoop.yarn.util.YarnVersionInfo;
|
||||
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||
import org.eclipse.jetty.util.log.Log;
|
||||
|
||||
public class MockNM {
|
||||
|
||||
private int responseId;
|
||||
private NodeId nodeId;
|
||||
private long memory;
|
||||
private int vCores;
|
||||
private Resource capatibility;
|
||||
private ResourceTrackerService resourceTracker;
|
||||
private int httpPort = 2;
|
||||
private MasterKey currentContainerTokenMasterKey;
|
||||
|
@ -75,13 +75,25 @@ public class MockNM {
|
|||
|
||||
public MockNM(String nodeIdStr, int memory, int vcores,
|
||||
ResourceTrackerService resourceTracker) {
|
||||
this(nodeIdStr, memory, vcores, resourceTracker, YarnVersionInfo.getVersion());
|
||||
this(nodeIdStr, memory, vcores, resourceTracker,
|
||||
YarnVersionInfo.getVersion());
|
||||
}
|
||||
|
||||
public MockNM(String nodeIdStr, int memory, int vcores,
|
||||
ResourceTrackerService resourceTracker, String version) {
|
||||
this.memory = memory;
|
||||
this.vCores = vcores;
|
||||
this(nodeIdStr, Resource.newInstance(memory, vcores), resourceTracker,
|
||||
version);
|
||||
}
|
||||
|
||||
public MockNM(String nodeIdStr, Resource capatibility,
|
||||
ResourceTrackerService resourceTracker) {
|
||||
this(nodeIdStr, capatibility, resourceTracker,
|
||||
YarnVersionInfo.getVersion());
|
||||
}
|
||||
|
||||
public MockNM(String nodeIdStr, Resource capatibility,
|
||||
ResourceTrackerService resourceTracker, String version) {
|
||||
this.capatibility = capatibility;
|
||||
this.resourceTracker = resourceTracker;
|
||||
this.version = version;
|
||||
String[] splits = nodeIdStr.split(":");
|
||||
|
@ -146,8 +158,7 @@ public class MockNM {
|
|||
RegisterNodeManagerRequest.class);
|
||||
req.setNodeId(nodeId);
|
||||
req.setHttpPort(httpPort);
|
||||
Resource resource = BuilderUtils.newResource(memory, vCores);
|
||||
req.setResource(resource);
|
||||
req.setResource(capatibility);
|
||||
req.setContainerStatuses(containerReports);
|
||||
req.setNMVersion(version);
|
||||
req.setRunningApplications(runningApplications);
|
||||
|
@ -158,8 +169,7 @@ public class MockNM {
|
|||
this.currentNMTokenMasterKey = registrationResponse.getNMTokenMasterKey();
|
||||
Resource newResource = registrationResponse.getResource();
|
||||
if (newResource != null) {
|
||||
memory = (int) newResource.getMemorySize();
|
||||
vCores = newResource.getVirtualCores();
|
||||
capatibility = Resources.clone(newResource);
|
||||
}
|
||||
containerStats.clear();
|
||||
if (containerReports != null) {
|
||||
|
@ -184,7 +194,7 @@ public class MockNM {
|
|||
long containerId, ContainerState containerState) throws Exception {
|
||||
ContainerStatus containerStatus = BuilderUtils.newContainerStatus(
|
||||
BuilderUtils.newContainerId(attemptId, containerId), containerState,
|
||||
"Success", 0, BuilderUtils.newResource(memory, vCores));
|
||||
"Success", 0, capatibility);
|
||||
ArrayList<ContainerStatus> containerStatusList =
|
||||
new ArrayList<ContainerStatus>(1);
|
||||
containerStatusList.add(containerStatus);
|
||||
|
@ -264,19 +274,22 @@ public class MockNM {
|
|||
|
||||
Resource newResource = heartbeatResponse.getResource();
|
||||
if (newResource != null) {
|
||||
memory = newResource.getMemorySize();
|
||||
vCores = newResource.getVirtualCores();
|
||||
capatibility = Resources.clone(newResource);
|
||||
}
|
||||
|
||||
return heartbeatResponse;
|
||||
}
|
||||
|
||||
public long getMemory() {
|
||||
return memory;
|
||||
return capatibility.getMemorySize();
|
||||
}
|
||||
|
||||
public int getvCores() {
|
||||
return vCores;
|
||||
return capatibility.getVirtualCores();
|
||||
}
|
||||
|
||||
public Resource getCapatibility() {
|
||||
return capatibility;
|
||||
}
|
||||
|
||||
public String getVersion() {
|
||||
|
|
|
@ -848,6 +848,15 @@ public class MockRM extends ResourceManager {
|
|||
return nm;
|
||||
}
|
||||
|
||||
public MockNM registerNode(String nodeIdStr, Resource nodeCapatibility)
|
||||
throws Exception {
|
||||
MockNM nm = new MockNM(nodeIdStr, nodeCapatibility,
|
||||
getResourceTrackerService());
|
||||
nm.registerNode();
|
||||
drainEventsImplicitly();
|
||||
return nm;
|
||||
}
|
||||
|
||||
public void sendNodeStarted(MockNM nm) throws Exception {
|
||||
RMNodeImpl node = (RMNodeImpl) getRMContext().getRMNodes().get(
|
||||
nm.getNodeId());
|
||||
|
|
|
@ -48,7 +48,6 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
|
|||
import org.apache.hadoop.yarn.api.records.ContainerUpdateType;
|
||||
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
||||
import org.apache.hadoop.yarn.api.records.Priority;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||
import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
|
@ -676,38 +675,4 @@ public class TestApplicationMasterService {
|
|||
Assert.fail("Cannot find RMContainer");
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout = 3000000)
|
||||
public void testResourceProfiles() throws Exception {
|
||||
|
||||
MockRM rm = new MockRM(conf);
|
||||
rm.start();
|
||||
MockNM nm1 = rm.registerNode("127.0.0.1:1234", 6 * GB);
|
||||
RMApp app1 = rm.submitApp(2048);
|
||||
nm1.nodeHeartbeat(true);
|
||||
RMAppAttempt attempt1 = app1.getCurrentAppAttempt();
|
||||
MockAM am1 = rm.sendAMLaunched(attempt1.getAppAttemptId());
|
||||
RegisterApplicationMasterResponse resp = am1.registerAppAttempt();
|
||||
Assert.assertEquals(0, resp.getResourceProfiles().size());
|
||||
rm.stop();
|
||||
conf.setBoolean(YarnConfiguration.RM_RESOURCE_PROFILES_ENABLED, true);
|
||||
conf.set(YarnConfiguration.RM_RESOURCE_PROFILES_SOURCE_FILE,
|
||||
"profiles/sample-profiles-1.json");
|
||||
rm = new MockRM(conf);
|
||||
rm.start();
|
||||
nm1 = rm.registerNode("127.0.0.1:1234", 6 * GB);
|
||||
app1 = rm.submitApp(2048);
|
||||
nm1.nodeHeartbeat(true);
|
||||
attempt1 = app1.getCurrentAppAttempt();
|
||||
am1 = rm.sendAMLaunched(attempt1.getAppAttemptId());
|
||||
resp = am1.registerAppAttempt();
|
||||
Assert.assertEquals(3, resp.getResourceProfiles().size());
|
||||
Assert.assertEquals(Resource.newInstance(1024, 1),
|
||||
resp.getResourceProfiles().get("minimum"));
|
||||
Assert.assertEquals(Resource.newInstance(2048, 2),
|
||||
resp.getResourceProfiles().get("default"));
|
||||
Assert.assertEquals(Resource.newInstance(4096, 4),
|
||||
resp.getResourceProfiles().get("maximum"));
|
||||
rm.stop();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,79 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.resourcemanager.resource;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.exceptions.YARNFeatureNotEnabledException;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* Mock ResourceProfileManager for unit test.
|
||||
*/
|
||||
public class MockResourceProfileManager extends ResourceProfilesManagerImpl {
|
||||
private Map<String, Resource> profiles;
|
||||
private boolean featureEnabled;
|
||||
|
||||
public MockResourceProfileManager(Map<String, Resource> profiles) {
|
||||
this.profiles = new HashMap<>();
|
||||
this.profiles.putAll(profiles);
|
||||
|
||||
// Set minimum / maximum allocation so test doesn't need to add them
|
||||
// every time.
|
||||
this.profiles.put(ResourceProfilesManagerImpl.MINIMUM_PROFILE,
|
||||
ResourceUtils.getResourceTypesMinimumAllocation());
|
||||
this.profiles.put(ResourceProfilesManagerImpl.MAXIMUM_PROFILE,
|
||||
ResourceUtils.getResourceTypesMaximumAllocation());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void init(Configuration config) throws IOException {
|
||||
this.featureEnabled = config.getBoolean(
|
||||
YarnConfiguration.RM_RESOURCE_PROFILES_ENABLED,
|
||||
YarnConfiguration.DEFAULT_RM_RESOURCE_PROFILES_ENABLED);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Resource getProfile(String profile) throws YarnException {
|
||||
if (!featureEnabled) {
|
||||
throw new YARNFeatureNotEnabledException("");
|
||||
}
|
||||
return profiles.get(profile);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, Resource> getResourceProfiles()
|
||||
throws YARNFeatureNotEnabledException {
|
||||
if (!featureEnabled) {
|
||||
throw new YARNFeatureNotEnabledException("");
|
||||
}
|
||||
return profiles;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void reloadProfiles() throws IOException {
|
||||
throw new IOException("Not supported");
|
||||
}
|
||||
}
|
|
@ -19,9 +19,15 @@
|
|||
package org.apache.hadoop.yarn.server.resourcemanager.resource;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
|
@ -73,7 +79,7 @@ public class TestResourceProfiles {
|
|||
Map<String, Resource> expected = new HashMap<>();
|
||||
expected.put("minimum", Resource.newInstance(1024, 1));
|
||||
expected.put("default", Resource.newInstance(2048, 2));
|
||||
expected.put("maximum", Resource.newInstance(4096, 4));
|
||||
expected.put("maximum", Resource.newInstance(8192, 4));
|
||||
|
||||
for (Map.Entry<String, Resource> entry : expected.entrySet()) {
|
||||
String profile = entry.getKey();
|
||||
|
@ -86,7 +92,7 @@ public class TestResourceProfiles {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testLoadProfilesMissingMandatoryProfile() throws Exception {
|
||||
public void testLoadIllegalProfiles() throws Exception {
|
||||
|
||||
Configuration conf = new Configuration();
|
||||
conf.setBoolean(YarnConfiguration.RM_RESOURCE_PROFILES_ENABLED, true);
|
||||
|
@ -115,7 +121,7 @@ public class TestResourceProfiles {
|
|||
Map<String, Resource> expected = new HashMap<>();
|
||||
expected.put("minimum", Resource.newInstance(1024, 1));
|
||||
expected.put("default", Resource.newInstance(2048, 2));
|
||||
expected.put("maximum", Resource.newInstance(4096, 4));
|
||||
expected.put("maximum", Resource.newInstance(8192, 4));
|
||||
expected.put("small", Resource.newInstance(1024, 1));
|
||||
expected.put("medium", Resource.newInstance(2048, 1));
|
||||
expected.put("large", Resource.newInstance(4096, 4));
|
||||
|
@ -139,7 +145,7 @@ public class TestResourceProfiles {
|
|||
Map<String, Resource> expected = new HashMap<>();
|
||||
expected.put("minimum", Resource.newInstance(1024, 1));
|
||||
expected.put("default", Resource.newInstance(2048, 2));
|
||||
expected.put("maximum", Resource.newInstance(4096, 4));
|
||||
expected.put("maximum", Resource.newInstance(8192, 4));
|
||||
|
||||
Assert.assertEquals("Profile 'minimum' resources don't match",
|
||||
expected.get("minimum"), manager.getMinimumProfile());
|
||||
|
@ -149,4 +155,38 @@ public class TestResourceProfiles {
|
|||
expected.get("maximum"), manager.getMaximumProfile());
|
||||
|
||||
}
|
||||
|
||||
@Test(timeout = 30000)
|
||||
public void testResourceProfilesInAMResponse() throws Exception {
|
||||
Configuration conf = new Configuration();
|
||||
MockRM rm = new MockRM(conf);
|
||||
rm.start();
|
||||
MockNM nm1 = rm.registerNode("127.0.0.1:1234", 6 * 1024);
|
||||
RMApp app1 = rm.submitApp(2048);
|
||||
nm1.nodeHeartbeat(true);
|
||||
RMAppAttempt attempt1 = app1.getCurrentAppAttempt();
|
||||
MockAM am1 = rm.sendAMLaunched(attempt1.getAppAttemptId());
|
||||
RegisterApplicationMasterResponse resp = am1.registerAppAttempt();
|
||||
Assert.assertEquals(0, resp.getResourceProfiles().size());
|
||||
rm.stop();
|
||||
conf.setBoolean(YarnConfiguration.RM_RESOURCE_PROFILES_ENABLED, true);
|
||||
conf.set(YarnConfiguration.RM_RESOURCE_PROFILES_SOURCE_FILE,
|
||||
"profiles/sample-profiles-1.json");
|
||||
rm = new MockRM(conf);
|
||||
rm.start();
|
||||
nm1 = rm.registerNode("127.0.0.1:1234", 6 * 1024);
|
||||
app1 = rm.submitApp(2048);
|
||||
nm1.nodeHeartbeat(true);
|
||||
attempt1 = app1.getCurrentAppAttempt();
|
||||
am1 = rm.sendAMLaunched(attempt1.getAppAttemptId());
|
||||
resp = am1.registerAppAttempt();
|
||||
Assert.assertEquals(3, resp.getResourceProfiles().size());
|
||||
Assert.assertEquals(Resource.newInstance(1024, 1),
|
||||
resp.getResourceProfiles().get("minimum"));
|
||||
Assert.assertEquals(Resource.newInstance(2048, 2),
|
||||
resp.getResourceProfiles().get("default"));
|
||||
Assert.assertEquals(Resource.newInstance(8192, 4),
|
||||
resp.getResourceProfiles().get("maximum"));
|
||||
rm.stop();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,141 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
* <p>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
|
||||
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.ResourceTypes;
|
||||
import org.apache.hadoop.yarn.api.records.ProfileCapability;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceInformation;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.resource.MockResourceProfileManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceProfilesManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.resource.TestResourceProfiles;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
|
||||
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
||||
import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
|
||||
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* Test Capacity Scheduler with multiple resource types.
|
||||
*/
|
||||
public class TestCapacitySchedulerWithMultiResourceTypes {
|
||||
private static String RESOURCE_1 = "res1";
|
||||
private final int GB = 1024;
|
||||
|
||||
@Test
|
||||
public void testBasicCapacitySchedulerWithProfile() throws Exception {
|
||||
|
||||
// Initialize resource map
|
||||
Map<String, ResourceInformation> riMap = new HashMap<>();
|
||||
|
||||
// Initialize mandatory resources
|
||||
riMap.put(ResourceInformation.MEMORY_URI, ResourceInformation.MEMORY_MB);
|
||||
riMap.put(ResourceInformation.VCORES_URI, ResourceInformation.VCORES);
|
||||
riMap.put(RESOURCE_1, ResourceInformation
|
||||
.newInstance(RESOURCE_1, "", 0, ResourceTypes.COUNTABLE, 0,
|
||||
Integer.MAX_VALUE));
|
||||
|
||||
ResourceUtils.initializeResourcesFromResourceInformationMap(riMap);
|
||||
|
||||
CapacitySchedulerConfiguration csconf =
|
||||
new CapacitySchedulerConfiguration();
|
||||
csconf.setMaximumApplicationMasterResourcePerQueuePercent("root", 100.0f);
|
||||
csconf.setMaximumAMResourcePercentPerPartition("root", "", 100.0f);
|
||||
csconf.setMaximumApplicationMasterResourcePerQueuePercent("root.default",
|
||||
100.0f);
|
||||
csconf.setMaximumAMResourcePercentPerPartition("root.default", "", 100.0f);
|
||||
csconf.setResourceComparator(DominantResourceCalculator.class);
|
||||
|
||||
YarnConfiguration conf = new YarnConfiguration(csconf);
|
||||
// Don't reset resource types since we have already configured resource
|
||||
// types
|
||||
conf.setBoolean(TestResourceProfiles.TEST_CONF_RESET_RESOURCE_TYPES, false);
|
||||
conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
|
||||
ResourceScheduler.class);
|
||||
conf.setBoolean(YarnConfiguration.RM_RESOURCE_PROFILES_ENABLED, true);
|
||||
|
||||
final MockResourceProfileManager mrpm = new MockResourceProfileManager(
|
||||
ImmutableMap.of("res-1", TestUtils
|
||||
.createResource(2 * GB, 2, ImmutableMap.of(RESOURCE_1, 2))));
|
||||
|
||||
MockRM rm = new MockRM(conf) {
|
||||
@Override
|
||||
protected ResourceProfilesManager createResourceProfileManager() {
|
||||
return mrpm;
|
||||
}
|
||||
};
|
||||
rm.start();
|
||||
|
||||
CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
|
||||
LeafQueue leafQueue = (LeafQueue) cs.getQueue("default");
|
||||
|
||||
MockNM nm1 = rm.registerNode("h1:1234",
|
||||
TestUtils.createResource(8 * GB, 8, ImmutableMap.of(RESOURCE_1, 8)));
|
||||
|
||||
RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "default");
|
||||
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1);
|
||||
|
||||
Assert.assertEquals(Resource.newInstance(1 * GB, 0),
|
||||
leafQueue.getUsedResources());
|
||||
|
||||
RMNode rmNode1 = rm.getRMContext().getRMNodes().get(nm1.getNodeId());
|
||||
|
||||
// Now request resource:
|
||||
am1.allocate(Arrays.asList(ResourceRequest.newBuilder().capability(
|
||||
Resource.newInstance(1 * GB, 1)).numContainers(1).resourceName("*")
|
||||
.profileCapability(ProfileCapability
|
||||
.newInstance("res-1",
|
||||
Resource.newInstance(2 * GB, 2))).build()),
|
||||
null);
|
||||
|
||||
// Do node heartbeats 1 time and check container allocated.
|
||||
cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
|
||||
|
||||
// Now used resource = <mem=1GB, vcore=0> + <mem=2GB,vcore=2,res_1=2>
|
||||
Assert.assertEquals(
|
||||
TestUtils.createResource(3 * GB, 2, ImmutableMap.of(RESOURCE_1, 2)),
|
||||
leafQueue.getUsedResources());
|
||||
|
||||
// Acquire container
|
||||
AllocateResponse amResponse = am1.allocate(null, null);
|
||||
Assert.assertFalse(amResponse.getAllocatedContainers().isEmpty());
|
||||
ContainerTokenIdentifier containerTokenIdentifier =
|
||||
BuilderUtils.newContainerTokenIdentifier(
|
||||
amResponse.getAllocatedContainers().get(0).getContainerToken());
|
||||
Assert.assertEquals(
|
||||
TestUtils.createResource(2 * GB, 2, ImmutableMap.of(RESOURCE_1, 2)),
|
||||
containerTokenIdentifier.getResource());
|
||||
}
|
||||
}
|
|
@ -18,16 +18,7 @@
|
|||
|
||||
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
|
||||
|
||||
import static org.mockito.Matchers.any;
|
||||
import static org.mockito.Mockito.doReturn;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.spy;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
import com.google.common.collect.Sets;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
|
@ -38,6 +29,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
|
|||
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||
import org.apache.hadoop.yarn.api.records.Priority;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceInformation;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||
import org.apache.hadoop.yarn.event.Dispatcher;
|
||||
import org.apache.hadoop.yarn.event.Event;
|
||||
|
@ -53,21 +45,28 @@ import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsMana
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
||||
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
|
||||
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
|
||||
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
||||
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
|
||||
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||
import org.mockito.invocation.InvocationOnMock;
|
||||
import org.mockito.stubbing.Answer;
|
||||
|
||||
import com.google.common.collect.Sets;
|
||||
import org.apache.hadoop.yarn.event.Event;
|
||||
import java.io.IOException;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
import static org.mockito.Matchers.any;
|
||||
import static org.mockito.Mockito.doReturn;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.spy;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
public class TestUtils {
|
||||
private static final Log LOG = LogFactory.getLog(TestUtils.class);
|
||||
|
@ -457,4 +456,21 @@ public class TestUtils {
|
|||
cs.submitResourceCommitRequest(clusterResource,
|
||||
csAssignment);
|
||||
}
|
||||
|
||||
/**
|
||||
* An easy way to create resources other than memory and vcores for tests.
|
||||
* @param memory memory
|
||||
* @param vcores vcores
|
||||
* @param nameToValues resource types other than memory and vcores.
|
||||
* @return created resource
|
||||
*/
|
||||
public static Resource createResource(long memory, int vcores,
|
||||
Map<String, Integer> nameToValues) {
|
||||
Resource res = Resource.newInstance(memory, vcores);
|
||||
for (Map.Entry<String, Integer> entry : nameToValues.entrySet()) {
|
||||
res.setResourceInformation(entry.getKey(), ResourceInformation
|
||||
.newInstance(entry.getKey(), "", entry.getValue()));
|
||||
}
|
||||
return res;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,14 +1,6 @@
|
|||
{
|
||||
"minimum": {
|
||||
"memory-mb" : 1024,
|
||||
"vcores" : 1
|
||||
},
|
||||
"default" : {
|
||||
"memory-mb" : 2048,
|
||||
"vcores" : 2
|
||||
},
|
||||
"maximum" : {
|
||||
"memory-mb": 4096,
|
||||
"vcores" : 4
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,16 +1,8 @@
|
|||
{
|
||||
"minimum": {
|
||||
"memory-mb" : 1024,
|
||||
"vcores" : 1
|
||||
},
|
||||
"default" : {
|
||||
"memory-mb" : 2048,
|
||||
"vcores" : 2
|
||||
},
|
||||
"maximum" : {
|
||||
"memory-mb": 4096,
|
||||
"vcores" : 4
|
||||
},
|
||||
"small" : {
|
||||
"memory-mb": 1024,
|
||||
"vcores": 1
|
||||
|
|
Loading…
Reference in New Issue