YARN-998. Keep NM resource updated through dynamic resource config for RM/NM restart. Contributed by Junping Du

This commit is contained in:
Jian He 2016-03-28 11:12:33 -07:00
parent 5a552973f4
commit c7d843af3b
4 changed files with 165 additions and 44 deletions

View File

@ -639,34 +639,32 @@ public class AdminService extends CompositeService implements
try {
Configuration conf = getConfig();
Configuration configuration = new Configuration(conf);
DynamicResourceConfiguration newconf;
DynamicResourceConfiguration newConf;
InputStream DRInputStream =
this.rmContext.getConfigurationProvider()
.getConfigurationInputStream(configuration,
YarnConfiguration.DR_CONFIGURATION_FILE);
if (DRInputStream != null) {
configuration.addResource(DRInputStream);
newconf = new DynamicResourceConfiguration(configuration, false);
InputStream drInputStream =
this.rmContext.getConfigurationProvider().getConfigurationInputStream(
configuration, YarnConfiguration.DR_CONFIGURATION_FILE);
if (drInputStream != null) {
newConf = new DynamicResourceConfiguration(configuration,
drInputStream);
} else {
newconf = new DynamicResourceConfiguration(configuration, true);
newConf = new DynamicResourceConfiguration(configuration);
}
if (newconf.getNodes() == null || newconf.getNodes().length == 0) {
RMAuditLogger.logSuccess(user.getShortUserName(), argName,
"AdminService");
return response;
} else {
if (newConf.getNodes() != null && newConf.getNodes().length != 0) {
Map<NodeId, ResourceOption> nodeResourceMap =
newconf.getNodeResourceMap();
newConf.getNodeResourceMap();
UpdateNodeResourceRequest updateRequest =
UpdateNodeResourceRequest.newInstance(nodeResourceMap);
UpdateNodeResourceRequest.newInstance(nodeResourceMap);
updateNodeResource(updateRequest);
RMAuditLogger.logSuccess(user.getShortUserName(), argName,
"AdminService");
return response;
}
// refresh dynamic resource in ResourceTrackerService
this.rmContext.getResourceTrackerService().
updateDynamicResourceConfiguration(newConf);
RMAuditLogger.logSuccess(user.getShortUserName(), argName,
"AdminService");
return response;
} catch (IOException ioe) {
throw logAndWrapException(ioe, user.getShortUserName(), argName, msg);
}

View File

@ -21,6 +21,7 @@ import java.io.IOException;
import java.io.InputStream;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
@ -62,6 +63,7 @@ import org.apache.hadoop.yarn.server.api.records.MasterKey;
import org.apache.hadoop.yarn.server.api.records.NodeAction;
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NodeLabelsUtils;
import org.apache.hadoop.yarn.server.resourcemanager.resource.DynamicResourceConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent;
@ -105,6 +107,7 @@ public class ResourceTrackerService extends AbstractService implements
private boolean isDistributedNodeLabelsConf;
private boolean isDelegatedCentralizedNodeLabelsConf;
private volatile DynamicResourceConfiguration drConf;
public ResourceTrackerService(RMContext rmContext,
NodesListManager nodesListManager,
@ -139,11 +142,11 @@ public class ResourceTrackerService extends AbstractService implements
}
minAllocMb = conf.getInt(
YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB);
YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB);
minAllocVcores = conf.getInt(
YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES,
YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES);
YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES,
YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES);
minimumNodeManagerVersion = conf.get(
YarnConfiguration.RM_NODEMANAGER_MINIMUM_VERSION,
@ -156,9 +159,42 @@ public class ResourceTrackerService extends AbstractService implements
YarnConfiguration.isDelegatedCentralizedNodeLabelConfiguration(conf);
}
loadDynamicResourceConfiguration(conf);
super.serviceInit(conf);
}
/**
* Load DynamicResourceConfiguration from dynamic-resources.xml.
* @param conf
* @throws IOException
*/
public void loadDynamicResourceConfiguration(Configuration conf)
throws IOException {
try {
// load dynamic-resources.xml
InputStream drInputStream = this.rmContext.getConfigurationProvider()
.getConfigurationInputStream(conf,
YarnConfiguration.DR_CONFIGURATION_FILE);
if (drInputStream != null) {
this.drConf = new DynamicResourceConfiguration(conf, drInputStream);
} else {
this.drConf = new DynamicResourceConfiguration(conf);
}
} catch (Exception e) {
throw new IOException(e);
}
}
/**
* Update DynamicResourceConfiguration with new configuration.
* @param conf
*/
public void updateDynamicResourceConfiguration(
DynamicResourceConfiguration conf) {
this.drConf = conf;
}
@Override
protected void serviceStart() throws Exception {
super.serviceStart();
@ -166,11 +202,10 @@ public class ResourceTrackerService extends AbstractService implements
// security is enabled, so no secretManager.
Configuration conf = getConfig();
YarnRPC rpc = YarnRPC.create(conf);
this.server =
rpc.getServer(ResourceTracker.class, this, resourceTrackerAddress,
conf, null,
conf.getInt(YarnConfiguration.RM_RESOURCE_TRACKER_CLIENT_THREAD_COUNT,
YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_CLIENT_THREAD_COUNT));
this.server = rpc.getServer(
ResourceTracker.class, this, resourceTrackerAddress, conf, null,
conf.getInt(YarnConfiguration.RM_RESOURCE_TRACKER_CLIENT_THREAD_COUNT,
YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_CLIENT_THREAD_COUNT));
// Enable service authorization?
if (conf.getBoolean(
@ -188,9 +223,9 @@ public class ResourceTrackerService extends AbstractService implements
this.server.start();
conf.updateConnectAddr(YarnConfiguration.RM_BIND_HOST,
YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS,
YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_ADDRESS,
server.getListenerAddress());
YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS,
YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_ADDRESS,
server.getListenerAddress());
}
@Override
@ -295,6 +330,19 @@ public class ResourceTrackerService extends AbstractService implements
return response;
}
// check if node's capacity is load from dynamic-resources.xml
String[] nodes = this.drConf.getNodes();
String nid = nodeId.toString();
if (nodes != null && Arrays.asList(nodes).contains(nid)) {
capability.setMemory(this.drConf.getMemoryPerNode(nid));
capability.setVirtualCores(this.drConf.getVcoresPerNode(nid));
if (LOG.isDebugEnabled()) {
LOG.debug("Resource for node: " + nid + " is adjusted to " +
capability + " due to settings in dynamic-resources.xml.");
}
}
// Check if this node has minimum allocations
if (capability.getMemory() < minAllocMb
|| capability.getVirtualCores() < minAllocVcores) {

View File

@ -18,6 +18,8 @@
package org.apache.hadoop.yarn.server.resourcemanager.resource;
import java.io.InputStream;
import java.util.HashMap;
import java.util.Map;
@ -38,8 +40,6 @@ public class DynamicResourceConfiguration extends Configuration {
private static final Log LOG =
LogFactory.getLog(DynamicResourceConfiguration.class);
private static final String DR_CONFIGURATION_FILE = "dynamic-resources.xml";
@Private
public static final String PREFIX = "yarn.resource.dynamic.";
@ -63,15 +63,14 @@ public class DynamicResourceConfiguration extends Configuration {
}
public DynamicResourceConfiguration(Configuration configuration) {
this(configuration, true);
super(configuration);
addResource(YarnConfiguration.DR_CONFIGURATION_FILE);
}
public DynamicResourceConfiguration(Configuration configuration,
boolean useLocalConfigurationProvider) {
InputStream drInputStream) {
super(configuration);
if (useLocalConfigurationProvider) {
addResource(DR_CONFIGURATION_FILE);
}
addResource(drInputStream);
}
private String getNodePrefix(String node) {

View File

@ -27,7 +27,9 @@ import java.io.FileOutputStream;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
@ -202,7 +204,7 @@ public class TestRMAdminService {
}
@Test
public void testAdminRefreshNodesResourcesWithFileSystemBasedConfigurationProvider()
public void testRefreshNodesResourceWithFileSystemBasedConfigurationProvider()
throws IOException, YarnException {
configuration.set(YarnConfiguration.RM_CONFIGURATION_PROVIDER_CLASS,
"org.apache.hadoop.yarn.FileSystemBasedConfigurationProvider");
@ -239,6 +241,75 @@ public class TestRMAdminService {
Assert.assertEquals("<memory:4096, vCores:4>", resourceAfter.toString());
}
@Test
public void testResourcePersistentForNMRegistrationWithNewResource()
throws IOException, YarnException {
configuration.set(YarnConfiguration.RM_CONFIGURATION_PROVIDER_CLASS,
"org.apache.hadoop.yarn.FileSystemBasedConfigurationProvider");
//upload default configurations
uploadDefaultConfiguration();
try {
rm = new MockRM(configuration);
rm.init(configuration);
rm.start();
rm.registerNode("h1:1234", 5120);
} catch(Exception ex) {
fail("Should not get any exceptions");
}
NodeId nid = ConverterUtils.toNodeId("h1:1234");
RMNode ni = rm.getRMContext().getRMNodes().get(nid);
Resource resource = ni.getTotalCapability();
Assert.assertEquals("<memory:5120, vCores:5>", resource.toString());
DynamicResourceConfiguration drConf =
new DynamicResourceConfiguration();
drConf.set("yarn.resource.dynamic.nodes", "h1:1234");
drConf.set("yarn.resource.dynamic.h1:1234.vcores", "4");
drConf.set("yarn.resource.dynamic.h1:1234.memory", "4096");
uploadConfiguration(drConf, "dynamic-resources.xml");
rm.adminService.refreshNodesResources(
RefreshNodesResourcesRequest.newInstance());
try {
// register the same node again with a different resource.
// validate this won't work as resource cached in RM side.
rm.registerNode("h1:1234", 8192, 8);
} catch (Exception ex) {
fail("Should not get any exceptions");
}
RMNode niAfter = rm.getRMContext().getRMNodes().get(nid);
Resource resourceAfter = niAfter.getTotalCapability();
Assert.assertEquals("<memory:4096, vCores:4>", resourceAfter.toString());
// Replace original dr file with an empty dr file, and validate node
// registration with new resources will take effective now.
deleteOnRemoteFileSystem("dynamic-resources.xml");
DynamicResourceConfiguration emptyDRConf =
new DynamicResourceConfiguration();
uploadConfiguration(emptyDRConf, "dynamic-resources.xml");
rm.adminService.refreshNodesResources(
RefreshNodesResourcesRequest.newInstance());
try {
// register the same node third time, this time the register resource
// should work.
rm.registerNode("h1:1234", 8192, 8);
} catch (Exception ex) {
fail("Should not get any exceptions");
}
niAfter = rm.getRMContext().getRMNodes().get(nid);
resourceAfter = niAfter.getTotalCapability();
// new resource in registration should take effective as we empty
// dynamic resource file already.
Assert.assertEquals("<memory:8192, vCores:8>", resourceAfter.toString());
}
@Test
public void testAdminAclsWithLocalConfigurationProvider() {
rm = new MockRM(configuration);
@ -1006,6 +1077,11 @@ public class TestRMAdminService {
uploadToRemoteFileSystem(new Path(csConfFile));
}
private void deleteOnRemoteFileSystem(String fileName)
throws IOException {
fs.delete(new Path(workingPath, fileName));
}
private void uploadDefaultConfiguration() throws IOException {
Configuration conf = new Configuration();
uploadConfiguration(conf, "core-site.xml");