YARN-4832. NM side resource value should get updated if change applied in RM side. Contributed by Junping Du

(cherry picked from commit 264c06a438)

Conflicts:
	hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
	hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java
This commit is contained in:
Jian He 2016-05-17 15:02:14 -07:00 committed by Junping Du
parent 2b4950b094
commit ca66bedeff
9 changed files with 284 additions and 24 deletions

View File

@ -27,6 +27,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.api.records.MasterKey;
import org.apache.hadoop.yarn.server.api.records.NodeAction;
@ -76,6 +77,9 @@ public interface NodeHeartbeatResponse {
boolean getAreNodeLabelsAcceptedByRM();
void setAreNodeLabelsAcceptedByRM(boolean areNodeLabelsAcceptedByRM);
Resource getResource();
void setResource(Resource resource);
List<Container> getContainersToDecrease();
void addAllContainersToDecrease(Collection<Container> containersToDecrease);
}

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.server.api.protocolrecords;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.api.records.MasterKey;
import org.apache.hadoop.yarn.server.api.records.NodeAction;
@ -46,6 +47,9 @@ public interface RegisterNodeManagerResponse {
String getRMVersion();
Resource getResource();
void setResource(Resource resource);
boolean getAreNodeLabelsAcceptedByRM();
void setAreNodeLabelsAcceptedByRM(boolean areNodeLabelsAcceptedByRM);
}

View File

@ -31,14 +31,17 @@ import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SignalContainerRequest
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ProtoBase;
import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils;
import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl;
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.SignalContainerRequestProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.MasterKeyProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeActionProto;
@ -51,7 +54,6 @@ import org.apache.hadoop.yarn.server.api.records.NodeAction;
import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl;
public class NodeHeartbeatResponsePBImpl extends
ProtoBase<NodeHeartbeatResponseProto> implements NodeHeartbeatResponse {
NodeHeartbeatResponseProto proto = NodeHeartbeatResponseProto.getDefaultInstance();
@ -62,6 +64,7 @@ public class NodeHeartbeatResponsePBImpl extends
private List<ContainerId> containersToBeRemovedFromNM = null;
private List<ApplicationId> applicationsToCleanup = null;
private Map<ApplicationId, ByteBuffer> systemCredentials = null;
private Resource resource = null;
private MasterKey containerTokenMasterKey = null;
private MasterKey nmTokenMasterKey = null;
@ -111,6 +114,9 @@ public class NodeHeartbeatResponsePBImpl extends
if (this.containersToSignal != null) {
addContainersToSignalToProto();
}
if (this.resource != null) {
builder.setResource(convertToProtoFormat(this.resource));
}
}
private void addSystemCredentialsToProto() {
@ -152,6 +158,28 @@ public class NodeHeartbeatResponsePBImpl extends
builder.setResponseId((responseId));
}
@Override
public Resource getResource() {
NodeHeartbeatResponseProtoOrBuilder p = viaProto ? proto : builder;
if (this.resource != null) {
return this.resource;
}
if (!p.hasResource()) {
return null;
}
this.resource = convertFromProtoFormat(p.getResource());
return this.resource;
}
@Override
public void setResource(Resource resource) {
maybeInitBuilder();
if (resource == null) {
builder.clearResource();
}
this.resource = resource;
}
@Override
public MasterKey getContainerTokenMasterKey() {
NodeHeartbeatResponseProtoOrBuilder p = viaProto ? proto : builder;
@ -533,6 +561,14 @@ public class NodeHeartbeatResponsePBImpl extends
return ((ContainerIdPBImpl) t).getProto();
}
private ResourcePBImpl convertFromProtoFormat(ResourceProto p) {
return new ResourcePBImpl(p);
}
private ResourceProto convertToProtoFormat(Resource t) {
return ((ResourcePBImpl)t).getProto();
}
private ApplicationIdPBImpl convertFromProtoFormat(ApplicationIdProto p) {
return new ApplicationIdPBImpl(p);
}

View File

@ -19,7 +19,10 @@
package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.impl.pb.ProtoBase;
import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl;
import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.MasterKeyProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeActionProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.RegisterNodeManagerResponseProto;
@ -30,11 +33,11 @@ import org.apache.hadoop.yarn.server.api.records.NodeAction;
import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl;
public class RegisterNodeManagerResponsePBImpl extends ProtoBase<RegisterNodeManagerResponseProto> implements RegisterNodeManagerResponse {
RegisterNodeManagerResponseProto proto = RegisterNodeManagerResponseProto.getDefaultInstance();
RegisterNodeManagerResponseProto.Builder builder = null;
boolean viaProto = false;
private Resource resource = null;
private MasterKey containerTokenMasterKey = null;
private MasterKey nmTokenMasterKey = null;
@ -67,6 +70,9 @@ public class RegisterNodeManagerResponsePBImpl extends ProtoBase<RegisterNodeMan
builder.setNmTokenMasterKey(
convertToProtoFormat(this.nmTokenMasterKey));
}
if (this.resource != null) {
builder.setResource(convertToProtoFormat(this.resource));
}
}
private void mergeLocalToProto() {
@ -85,6 +91,28 @@ public class RegisterNodeManagerResponsePBImpl extends ProtoBase<RegisterNodeMan
viaProto = false;
}
@Override
public Resource getResource() {
RegisterNodeManagerResponseProtoOrBuilder p = viaProto ? proto : builder;
if (this.resource != null) {
return this.resource;
}
if (!p.hasResource()) {
return null;
}
this.resource = convertFromProtoFormat(p.getResource());
return this.resource;
}
@Override
public void setResource(Resource resource) {
maybeInitBuilder();
if (resource == null) {
builder.clearResource();
}
this.resource = resource;
}
@Override
public MasterKey getContainerTokenMasterKey() {
RegisterNodeManagerResponseProtoOrBuilder p = viaProto ? proto : builder;
@ -217,6 +245,14 @@ public class RegisterNodeManagerResponsePBImpl extends ProtoBase<RegisterNodeMan
return ((MasterKeyPBImpl)t).getProto();
}
private ResourcePBImpl convertFromProtoFormat(ResourceProto p) {
return new ResourcePBImpl(p);
}
private ResourceProto convertToProtoFormat(Resource t) {
return ((ResourcePBImpl)t).getProto();
}
@Override
public boolean getAreNodeLabelsAcceptedByRM() {
RegisterNodeManagerResponseProtoOrBuilder p =

View File

@ -48,6 +48,7 @@ message RegisterNodeManagerResponseProto {
optional string diagnostics_message = 5;
optional string rm_version = 6;
optional bool areNodeLabelsAcceptedByRM = 7 [default = false];
optional ResourceProto resource = 8;
}
message UnRegisterNodeManagerRequestProto {
@ -85,6 +86,7 @@ message NodeHeartbeatResponseProto {
optional bool areNodeLabelsAcceptedByRM = 11 [default = false];
repeated ContainerProto containers_to_decrease = 12;
repeated SignalContainerRequestProto containers_to_signal = 13;
optional ResourceProto resource = 14;
}
message SystemCredentialsForAppsProto {

View File

@ -83,6 +83,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.Contai
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
import org.apache.hadoop.yarn.server.nodemanager.nodelabels.NodeLabelsProvider;
import org.apache.hadoop.yarn.server.nodemanager.util.NodeManagerHardwareUtils;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.apache.hadoop.yarn.util.YarnVersionInfo;
import com.google.common.annotations.VisibleForTesting;
@ -392,8 +393,17 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
StringBuilder successfullRegistrationMsg = new StringBuilder();
successfullRegistrationMsg.append("Registered with ResourceManager as ")
.append(this.nodeId).append(" with total resource of ")
.append(this.nodeId);
Resource newResource = regNMResponse.getResource();
if (newResource != null) {
updateNMResource(newResource);
successfullRegistrationMsg.append(" with updated total resource of ")
.append(this.totalResource);
} else {
successfullRegistrationMsg.append(" with total resource of ")
.append(this.totalResource);
}
successfullRegistrationMsg.append(nodeLabelsHandler
.verifyRMRegistrationResponseForNodeLabels(regNMResponse));
@ -489,6 +499,12 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
return increasedContainers;
}
// Update NM's Resource.
private void updateNMResource(Resource resource) {
metrics.addResource(Resources.subtract(resource, totalResource));
this.totalResource = resource;
}
// Iterate through the NMContext and clone and get all the containers'
// statuses. If it's a completed container, add into the
// recentlyStoppedContainers collections.
@ -829,6 +845,15 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
dispatcher.getEventHandler().handle(
new CMgrSignalContainersEvent(containersToSignal));
}
// Handling node resource update case.
Resource newResource = response.getResource();
if (newResource != null) {
updateNMResource(newResource);
if (LOG.isDebugEnabled()) {
LOG.debug("Node's resource is updated to " +
newResource.toString());
}
}
} catch (ConnectException e) {
//catch and throw the exception if tried MAX wait time to connect RM
dispatcher.getEventHandler().handle(

View File

@ -26,6 +26,9 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -97,6 +100,9 @@ public class ResourceTrackerService extends AbstractService implements
private final RMContainerTokenSecretManager containerTokenSecretManager;
private final NMTokenSecretManagerInRM nmTokenSecretManager;
private final ReadLock readLock;
private final WriteLock writeLock;
private long nextHeartBeatInterval;
private Server server;
private InetSocketAddress resourceTrackerAddress;
@ -107,7 +113,7 @@ public class ResourceTrackerService extends AbstractService implements
private boolean isDistributedNodeLabelsConf;
private boolean isDelegatedCentralizedNodeLabelsConf;
private volatile DynamicResourceConfiguration drConf;
private DynamicResourceConfiguration drConf;
public ResourceTrackerService(RMContext rmContext,
NodesListManager nodesListManager,
@ -120,7 +126,9 @@ public class ResourceTrackerService extends AbstractService implements
this.nmLivelinessMonitor = nmLivelinessMonitor;
this.containerTokenSecretManager = containerTokenSecretManager;
this.nmTokenSecretManager = nmTokenSecretManager;
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
this.readLock = lock.readLock();
this.writeLock = lock.writeLock();
}
@Override
@ -160,7 +168,6 @@ public class ResourceTrackerService extends AbstractService implements
}
loadDynamicResourceConfiguration(conf);
super.serviceInit(conf);
}
@ -176,6 +183,9 @@ public class ResourceTrackerService extends AbstractService implements
InputStream drInputStream = this.rmContext.getConfigurationProvider()
.getConfigurationInputStream(conf,
YarnConfiguration.DR_CONFIGURATION_FILE);
// write lock here on drConfig is unnecessary as here get called at
// ResourceTrackerService get initiated and other read and write
// operations haven't started yet.
if (drInputStream != null) {
this.drConf = new DynamicResourceConfiguration(conf, drInputStream);
} else {
@ -192,7 +202,12 @@ public class ResourceTrackerService extends AbstractService implements
*/
public void updateDynamicResourceConfiguration(
DynamicResourceConfiguration conf) {
this.writeLock.lock();
try {
this.drConf = conf;
} finally {
this.writeLock.unlock();
}
}
@Override
@ -233,6 +248,7 @@ public class ResourceTrackerService extends AbstractService implements
if (this.server != null) {
this.server.stop();
}
super.serviceStop();
}
@ -327,16 +343,18 @@ public class ResourceTrackerService extends AbstractService implements
}
// check if node's capacity is load from dynamic-resources.xml
String[] nodes = this.drConf.getNodes();
String nid = nodeId.toString();
if (nodes != null && Arrays.asList(nodes).contains(nid)) {
capability.setMemorySize(this.drConf.getMemoryPerNode(nid));
capability.setVirtualCores(this.drConf.getVcoresPerNode(nid));
Resource dynamicLoadCapability = loadNodeResourceFromDRConfiguration(nid);
if (dynamicLoadCapability != null) {
if (LOG.isDebugEnabled()) {
LOG.debug("Resource for node: " + nid + " is adjusted to " +
capability + " due to settings in dynamic-resources.xml.");
LOG.debug("Resource for node: " + nid + " is adjusted from: " +
capability + " to: " + dynamicLoadCapability +
" due to settings in dynamic-resources.xml.");
}
capability = dynamicLoadCapability;
// sync back with new resource.
response.setResource(capability);
}
// Check if this node has minimum allocations
@ -532,6 +550,15 @@ public class ResourceTrackerService extends AbstractService implements
}
}
// 6. check if node's capacity is load from dynamic-resources.xml
// if so, send updated resource back to NM.
String nid = nodeId.toString();
Resource capability = loadNodeResourceFromDRConfiguration(nid);
// sync back with new resource if not null.
if (capability != null) {
nodeHeartBeatResponse.setResource(capability);
}
return nodeHeartBeatResponse;
}
@ -618,6 +645,22 @@ public class ResourceTrackerService extends AbstractService implements
}
}
private Resource loadNodeResourceFromDRConfiguration(String nodeId) {
// check if node's capacity is loaded from dynamic-resources.xml
this.readLock.lock();
try {
String[] nodes = this.drConf.getNodes();
if (nodes != null && Arrays.asList(nodes).contains(nodeId)) {
return Resource.newInstance(this.drConf.getMemoryPerNode(nodeId),
this.drConf.getVcoresPerNode(nodeId));
} else {
return null;
}
} finally {
this.readLock.unlock();
}
}
/**
* resolving the network topology.
* @param hostName the hostname of this node.

View File

@ -156,6 +156,11 @@ public class MockNM {
}
}
}
Resource newResource = registrationResponse.getResource();
if (newResource != null) {
memory = newResource.getMemory();
vCores = newResource.getVirtualCores();
}
return registrationResponse;
}
@ -237,6 +242,12 @@ public class MockNM {
this.currentNMTokenMasterKey = masterKeyFromRM;
}
Resource newResource = heartbeatResponse.getResource();
if (newResource != null) {
memory = newResource.getMemory();
vCores = newResource.getVirtualCores();
}
return heartbeatResponse;
}

View File

@ -240,6 +240,105 @@ public class TestRMAdminService {
Assert.assertEquals("<memory:4096, vCores:4>", resourceAfter.toString());
}
@Test
public void testRefreshNodesResourceWithResourceReturnInRegistration()
throws IOException, YarnException {
configuration.set(YarnConfiguration.RM_CONFIGURATION_PROVIDER_CLASS,
"org.apache.hadoop.yarn.FileSystemBasedConfigurationProvider");
//upload default configurations
uploadDefaultConfiguration();
MockNM nm = null;
try {
rm = new MockRM(configuration);
rm.init(configuration);
rm.start();
nm = rm.registerNode("h1:1234", 2048, 2);
} catch(Exception ex) {
fail("Should not get any exceptions");
}
NodeId nid = NodeId.fromString("h1:1234");
RMNode ni = rm.getRMContext().getRMNodes().get(nid);
Resource resource = ni.getTotalCapability();
Assert.assertEquals("<memory:2048, vCores:2>", resource.toString());
DynamicResourceConfiguration drConf =
new DynamicResourceConfiguration();
drConf.set("yarn.resource.dynamic.nodes", "h1:1234");
drConf.set("yarn.resource.dynamic.h1:1234.vcores", "4");
drConf.set("yarn.resource.dynamic.h1:1234.memory", "4096");
uploadConfiguration(drConf, "dynamic-resources.xml");
rm.adminService.refreshNodesResources(
RefreshNodesResourcesRequest.newInstance());
try {
// register the same node again with original resource.
// validate this will get new resource back;
nm.registerNode();
} catch (Exception ex) {
fail("Should not get any exceptions");
}
RMNode niAfter = rm.getRMContext().getRMNodes().get(nid);
Resource resourceAfter = niAfter.getTotalCapability();
Assert.assertEquals("<memory:4096, vCores:4>", resourceAfter.toString());
Assert.assertEquals(4096, nm.getMemory());
Assert.assertEquals(4, nm.getvCores());
}
@Test
public void testRefreshNodesResourceWithResourceReturnInHeartbeat()
throws IOException, YarnException {
configuration.set(YarnConfiguration.RM_CONFIGURATION_PROVIDER_CLASS,
"org.apache.hadoop.yarn.FileSystemBasedConfigurationProvider");
//upload default configurations
uploadDefaultConfiguration();
MockNM nm = null;
try {
rm = new MockRM(configuration);
rm.init(configuration);
rm.start();
nm = rm.registerNode("h1:1234", 2048, 2);
} catch(Exception ex) {
fail("Should not get any exceptions");
}
NodeId nid = NodeId.fromString("h1:1234");
RMNode ni = rm.getRMContext().getRMNodes().get(nid);
Resource resource = ni.getTotalCapability();
Assert.assertEquals("<memory:2048, vCores:2>", resource.toString());
DynamicResourceConfiguration drConf =
new DynamicResourceConfiguration();
drConf.set("yarn.resource.dynamic.nodes", "h1:1234");
drConf.set("yarn.resource.dynamic.h1:1234.vcores", "4");
drConf.set("yarn.resource.dynamic.h1:1234.memory", "4096");
uploadConfiguration(drConf, "dynamic-resources.xml");
rm.adminService.refreshNodesResources(
RefreshNodesResourcesRequest.newInstance());
try {
// NM-RM heartbeat, validate that this will get new resource back.
nm.nodeHeartbeat(true);
} catch (Exception ex) {
fail("Should not get any exceptions");
}
RMNode niAfter = rm.getRMContext().getRMNodes().get(nid);
Resource resourceAfter = niAfter.getTotalCapability();
Assert.assertEquals("<memory:4096, vCores:4>", resourceAfter.toString());
Assert.assertEquals(4096, nm.getMemory());
Assert.assertEquals(4, nm.getvCores());
}
@Test
public void testResourcePersistentForNMRegistrationWithNewResource()
throws IOException, YarnException {