YARN-4832. NM side resource value should get updated if change applied in RM side. Contributed by Junping Du

This commit is contained in:
Jian He 2016-05-17 12:51:08 -07:00
parent 7cd5ae62f6
commit fa3bc3405d
9 changed files with 290 additions and 30 deletions

View File

@ -27,6 +27,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit;
import org.apache.hadoop.yarn.server.api.records.MasterKey;
import org.apache.hadoop.yarn.server.api.records.NodeAction;
@ -77,6 +78,9 @@ public interface NodeHeartbeatResponse {
boolean getAreNodeLabelsAcceptedByRM();
void setAreNodeLabelsAcceptedByRM(boolean areNodeLabelsAcceptedByRM);
Resource getResource();
void setResource(Resource resource);
List<Container> getContainersToDecrease();
void addAllContainersToDecrease(Collection<Container> containersToDecrease);

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.server.api.protocolrecords;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.api.records.MasterKey;
import org.apache.hadoop.yarn.server.api.records.NodeAction;
@ -46,6 +47,9 @@ public interface RegisterNodeManagerResponse {
String getRMVersion();
Resource getResource();
void setResource(Resource resource);
boolean getAreNodeLabelsAcceptedByRM();
void setAreNodeLabelsAcceptedByRM(boolean areNodeLabelsAcceptedByRM);
}

View File

@ -31,14 +31,17 @@ import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SignalContainerRequest
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ProtoBase;
import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils;
import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl;
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.ContainerQueuingLimitProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.SignalContainerRequestProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.MasterKeyProto;
@ -54,7 +57,6 @@ import org.apache.hadoop.yarn.server.api.records.impl.pb.ContainerQueuingLimitPB
import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl;
public class NodeHeartbeatResponsePBImpl extends
ProtoBase<NodeHeartbeatResponseProto> implements NodeHeartbeatResponse {
NodeHeartbeatResponseProto proto = NodeHeartbeatResponseProto.getDefaultInstance();
@ -65,6 +67,7 @@ public class NodeHeartbeatResponsePBImpl extends
private List<ContainerId> containersToBeRemovedFromNM = null;
private List<ApplicationId> applicationsToCleanup = null;
private Map<ApplicationId, ByteBuffer> systemCredentials = null;
private Resource resource = null;
private MasterKey containerTokenMasterKey = null;
private MasterKey nmTokenMasterKey = null;
@ -119,6 +122,9 @@ public class NodeHeartbeatResponsePBImpl extends
if (this.containersToSignal != null) {
addContainersToSignalToProto();
}
if (this.resource != null) {
builder.setResource(convertToProtoFormat(this.resource));
}
}
private void addSystemCredentialsToProto() {
@ -160,6 +166,28 @@ public class NodeHeartbeatResponsePBImpl extends
builder.setResponseId((responseId));
}
@Override
public Resource getResource() {
NodeHeartbeatResponseProtoOrBuilder p = viaProto ? proto : builder;
if (this.resource != null) {
return this.resource;
}
if (!p.hasResource()) {
return null;
}
this.resource = convertFromProtoFormat(p.getResource());
return this.resource;
}
@Override
public void setResource(Resource resource) {
maybeInitBuilder();
if (resource == null) {
builder.clearResource();
}
this.resource = resource;
}
@Override
public MasterKey getContainerTokenMasterKey() {
NodeHeartbeatResponseProtoOrBuilder p = viaProto ? proto : builder;
@ -565,6 +593,14 @@ public class NodeHeartbeatResponsePBImpl extends
return ((ContainerIdPBImpl) t).getProto();
}
private ResourcePBImpl convertFromProtoFormat(ResourceProto p) {
return new ResourcePBImpl(p);
}
private ResourceProto convertToProtoFormat(Resource t) {
return ((ResourcePBImpl)t).getProto();
}
private ApplicationIdPBImpl convertFromProtoFormat(ApplicationIdProto p) {
return new ApplicationIdPBImpl(p);
}

View File

@ -19,7 +19,10 @@
package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.impl.pb.ProtoBase;
import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl;
import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.MasterKeyProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeActionProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.RegisterNodeManagerResponseProto;
@ -30,11 +33,11 @@ import org.apache.hadoop.yarn.server.api.records.NodeAction;
import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl;
public class RegisterNodeManagerResponsePBImpl extends ProtoBase<RegisterNodeManagerResponseProto> implements RegisterNodeManagerResponse {
RegisterNodeManagerResponseProto proto = RegisterNodeManagerResponseProto.getDefaultInstance();
RegisterNodeManagerResponseProto.Builder builder = null;
boolean viaProto = false;
private Resource resource = null;
private MasterKey containerTokenMasterKey = null;
private MasterKey nmTokenMasterKey = null;
@ -67,6 +70,9 @@ public class RegisterNodeManagerResponsePBImpl extends ProtoBase<RegisterNodeMan
builder.setNmTokenMasterKey(
convertToProtoFormat(this.nmTokenMasterKey));
}
if (this.resource != null) {
builder.setResource(convertToProtoFormat(this.resource));
}
}
private void mergeLocalToProto() {
@ -85,6 +91,28 @@ public class RegisterNodeManagerResponsePBImpl extends ProtoBase<RegisterNodeMan
viaProto = false;
}
@Override
public Resource getResource() {
RegisterNodeManagerResponseProtoOrBuilder p = viaProto ? proto : builder;
if (this.resource != null) {
return this.resource;
}
if (!p.hasResource()) {
return null;
}
this.resource = convertFromProtoFormat(p.getResource());
return this.resource;
}
@Override
public void setResource(Resource resource) {
maybeInitBuilder();
if (resource == null) {
builder.clearResource();
}
this.resource = resource;
}
@Override
public MasterKey getContainerTokenMasterKey() {
RegisterNodeManagerResponseProtoOrBuilder p = viaProto ? proto : builder;
@ -217,6 +245,14 @@ public class RegisterNodeManagerResponsePBImpl extends ProtoBase<RegisterNodeMan
return ((MasterKeyPBImpl)t).getProto();
}
private ResourcePBImpl convertFromProtoFormat(ResourceProto p) {
return new ResourcePBImpl(p);
}
private ResourceProto convertToProtoFormat(Resource t) {
return ((ResourcePBImpl)t).getProto();
}
@Override
public boolean getAreNodeLabelsAcceptedByRM() {
RegisterNodeManagerResponseProtoOrBuilder p =

View File

@ -63,6 +63,7 @@ message RegisterNodeManagerResponseProto {
optional string diagnostics_message = 5;
optional string rm_version = 6;
optional bool areNodeLabelsAcceptedByRM = 7 [default = false];
optional ResourceProto resource = 8;
}
message UnRegisterNodeManagerRequestProto {
@ -101,6 +102,7 @@ message NodeHeartbeatResponseProto {
repeated ContainerProto containers_to_decrease = 12;
repeated SignalContainerRequestProto containers_to_signal = 13;
optional ContainerQueuingLimitProto container_queuing_limit = 14;
optional ResourceProto resource = 15;
}
message ContainerQueuingLimitProto {

View File

@ -88,6 +88,7 @@ import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
import org.apache.hadoop.yarn.server.nodemanager.nodelabels.NodeLabelsProvider;
import org.apache.hadoop.yarn.server.nodemanager.util.NodeManagerHardwareUtils;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.apache.hadoop.yarn.util.YarnVersionInfo;
import com.google.common.annotations.VisibleForTesting;
@ -397,8 +398,17 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
StringBuilder successfullRegistrationMsg = new StringBuilder();
successfullRegistrationMsg.append("Registered with ResourceManager as ")
.append(this.nodeId).append(" with total resource of ")
.append(this.nodeId);
Resource newResource = regNMResponse.getResource();
if (newResource != null) {
updateNMResource(newResource);
successfullRegistrationMsg.append(" with updated total resource of ")
.append(this.totalResource);
} else {
successfullRegistrationMsg.append(" with total resource of ")
.append(this.totalResource);
}
successfullRegistrationMsg.append(nodeLabelsHandler
.verifyRMRegistrationResponseForNodeLabels(regNMResponse));
@ -498,6 +508,12 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
return increasedContainers;
}
// Update NM's Resource.
private void updateNMResource(Resource resource) {
metrics.addResource(Resources.subtract(resource, totalResource));
this.totalResource = resource;
}
// Iterate through the NMContext and clone and get all the containers'
// statuses. If it's a completed container, add into the
// recentlyStoppedContainers collections.
@ -880,6 +896,15 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
context.getContainerManager().updateQueuingLimit(queuingLimit);
}
}
// Handling node resource update case.
Resource newResource = response.getResource();
if (newResource != null) {
updateNMResource(newResource);
if (LOG.isDebugEnabled()) {
LOG.debug("Node's resource is updated to " +
newResource.toString());
}
}
} catch (ConnectException e) {
//catch and throw the exception if tried MAX wait time to connect RM
dispatcher.getEventHandler().handle(

View File

@ -26,6 +26,9 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -97,6 +100,9 @@ public class ResourceTrackerService extends AbstractService implements
private final RMContainerTokenSecretManager containerTokenSecretManager;
private final NMTokenSecretManagerInRM nmTokenSecretManager;
private final ReadLock readLock;
private final WriteLock writeLock;
private long nextHeartBeatInterval;
private Server server;
private InetSocketAddress resourceTrackerAddress;
@ -107,7 +113,7 @@ public class ResourceTrackerService extends AbstractService implements
private boolean isDistributedNodeLabelsConf;
private boolean isDelegatedCentralizedNodeLabelsConf;
private volatile DynamicResourceConfiguration drConf;
private DynamicResourceConfiguration drConf;
public ResourceTrackerService(RMContext rmContext,
NodesListManager nodesListManager,
@ -120,7 +126,9 @@ public class ResourceTrackerService extends AbstractService implements
this.nmLivelinessMonitor = nmLivelinessMonitor;
this.containerTokenSecretManager = containerTokenSecretManager;
this.nmTokenSecretManager = nmTokenSecretManager;
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
this.readLock = lock.readLock();
this.writeLock = lock.writeLock();
}
@Override
@ -160,7 +168,6 @@ public class ResourceTrackerService extends AbstractService implements
}
loadDynamicResourceConfiguration(conf);
super.serviceInit(conf);
}
@ -176,6 +183,9 @@ public class ResourceTrackerService extends AbstractService implements
InputStream drInputStream = this.rmContext.getConfigurationProvider()
.getConfigurationInputStream(conf,
YarnConfiguration.DR_CONFIGURATION_FILE);
// write lock here on drConfig is unnecessary as here get called at
// ResourceTrackerService get initiated and other read and write
// operations haven't started yet.
if (drInputStream != null) {
this.drConf = new DynamicResourceConfiguration(conf, drInputStream);
} else {
@ -192,7 +202,12 @@ public class ResourceTrackerService extends AbstractService implements
*/
public void updateDynamicResourceConfiguration(
DynamicResourceConfiguration conf) {
this.writeLock.lock();
try {
this.drConf = conf;
} finally {
this.writeLock.unlock();
}
}
@Override
@ -233,6 +248,7 @@ public class ResourceTrackerService extends AbstractService implements
if (this.server != null) {
this.server.stop();
}
super.serviceStop();
}
@ -331,16 +347,18 @@ public class ResourceTrackerService extends AbstractService implements
}
// check if node's capacity is load from dynamic-resources.xml
String[] nodes = this.drConf.getNodes();
String nid = nodeId.toString();
if (nodes != null && Arrays.asList(nodes).contains(nid)) {
capability.setMemory(this.drConf.getMemoryPerNode(nid));
capability.setVirtualCores(this.drConf.getVcoresPerNode(nid));
Resource dynamicLoadCapability = loadNodeResourceFromDRConfiguration(nid);
if (dynamicLoadCapability != null) {
if (LOG.isDebugEnabled()) {
LOG.debug("Resource for node: " + nid + " is adjusted to " +
capability + " due to settings in dynamic-resources.xml.");
LOG.debug("Resource for node: " + nid + " is adjusted from: " +
capability + " to: " + dynamicLoadCapability +
" due to settings in dynamic-resources.xml.");
}
capability = dynamicLoadCapability;
// sync back with new resource.
response.setResource(capability);
}
// Check if this node has minimum allocations
@ -536,7 +554,16 @@ public class ResourceTrackerService extends AbstractService implements
}
}
// 6. Send Container Queuing Limits back to the Node. This will be used by
// 6. check if node's capacity is load from dynamic-resources.xml
// if so, send updated resource back to NM.
String nid = nodeId.toString();
Resource capability = loadNodeResourceFromDRConfiguration(nid);
// sync back with new resource if not null.
if (capability != null) {
nodeHeartBeatResponse.setResource(capability);
}
// 7. Send Container Queuing Limits back to the Node. This will be used by
// the node to truncate the number of Containers queued for execution.
if (this.rmContext.getNodeManagerQueueLimitCalculator() != null) {
nodeHeartBeatResponse.setContainerQueuingLimit(
@ -629,6 +656,22 @@ public class ResourceTrackerService extends AbstractService implements
}
}
private Resource loadNodeResourceFromDRConfiguration(String nodeId) {
// check if node's capacity is loaded from dynamic-resources.xml
this.readLock.lock();
try {
String[] nodes = this.drConf.getNodes();
if (nodes != null && Arrays.asList(nodes).contains(nodeId)) {
return Resource.newInstance(this.drConf.getMemoryPerNode(nodeId),
this.drConf.getVcoresPerNode(nodeId));
} else {
return null;
}
} finally {
this.readLock.unlock();
}
}
/**
* resolving the network topology.
* @param hostName the hostname of this node.

View File

@ -50,8 +50,8 @@ public class MockNM {
private int responseId;
private NodeId nodeId;
private final int memory;
private final int vCores;
private int memory;
private int vCores;
private ResourceTrackerService resourceTracker;
private int httpPort = 2;
private MasterKey currentContainerTokenMasterKey;
@ -142,6 +142,11 @@ public class MockNM {
this.currentContainerTokenMasterKey =
registrationResponse.getContainerTokenMasterKey();
this.currentNMTokenMasterKey = registrationResponse.getNMTokenMasterKey();
Resource newResource = registrationResponse.getResource();
if (newResource != null) {
memory = newResource.getMemory();
vCores = newResource.getVirtualCores();
}
return registrationResponse;
}
@ -212,6 +217,12 @@ public class MockNM {
this.currentNMTokenMasterKey = masterKeyFromRM;
}
Resource newResource = heartbeatResponse.getResource();
if (newResource != null) {
memory = newResource.getMemory();
vCores = newResource.getVirtualCores();
}
return heartbeatResponse;
}

View File

@ -238,6 +238,105 @@ public class TestRMAdminService {
Assert.assertEquals("<memory:4096, vCores:4>", resourceAfter.toString());
}
@Test
public void testRefreshNodesResourceWithResourceReturnInRegistration()
throws IOException, YarnException {
configuration.set(YarnConfiguration.RM_CONFIGURATION_PROVIDER_CLASS,
"org.apache.hadoop.yarn.FileSystemBasedConfigurationProvider");
//upload default configurations
uploadDefaultConfiguration();
MockNM nm = null;
try {
rm = new MockRM(configuration);
rm.init(configuration);
rm.start();
nm = rm.registerNode("h1:1234", 2048, 2);
} catch(Exception ex) {
fail("Should not get any exceptions");
}
NodeId nid = ConverterUtils.toNodeId("h1:1234");
RMNode ni = rm.getRMContext().getRMNodes().get(nid);
Resource resource = ni.getTotalCapability();
Assert.assertEquals("<memory:2048, vCores:2>", resource.toString());
DynamicResourceConfiguration drConf =
new DynamicResourceConfiguration();
drConf.set("yarn.resource.dynamic.nodes", "h1:1234");
drConf.set("yarn.resource.dynamic.h1:1234.vcores", "4");
drConf.set("yarn.resource.dynamic.h1:1234.memory", "4096");
uploadConfiguration(drConf, "dynamic-resources.xml");
rm.adminService.refreshNodesResources(
RefreshNodesResourcesRequest.newInstance());
try {
// register the same node again with original resource.
// validate this will get new resource back;
nm.registerNode();
} catch (Exception ex) {
fail("Should not get any exceptions");
}
RMNode niAfter = rm.getRMContext().getRMNodes().get(nid);
Resource resourceAfter = niAfter.getTotalCapability();
Assert.assertEquals("<memory:4096, vCores:4>", resourceAfter.toString());
Assert.assertEquals(4096, nm.getMemory());
Assert.assertEquals(4, nm.getvCores());
}
@Test
public void testRefreshNodesResourceWithResourceReturnInHeartbeat()
throws IOException, YarnException {
configuration.set(YarnConfiguration.RM_CONFIGURATION_PROVIDER_CLASS,
"org.apache.hadoop.yarn.FileSystemBasedConfigurationProvider");
//upload default configurations
uploadDefaultConfiguration();
MockNM nm = null;
try {
rm = new MockRM(configuration);
rm.init(configuration);
rm.start();
nm = rm.registerNode("h1:1234", 2048, 2);
} catch(Exception ex) {
fail("Should not get any exceptions");
}
NodeId nid = ConverterUtils.toNodeId("h1:1234");
RMNode ni = rm.getRMContext().getRMNodes().get(nid);
Resource resource = ni.getTotalCapability();
Assert.assertEquals("<memory:2048, vCores:2>", resource.toString());
DynamicResourceConfiguration drConf =
new DynamicResourceConfiguration();
drConf.set("yarn.resource.dynamic.nodes", "h1:1234");
drConf.set("yarn.resource.dynamic.h1:1234.vcores", "4");
drConf.set("yarn.resource.dynamic.h1:1234.memory", "4096");
uploadConfiguration(drConf, "dynamic-resources.xml");
rm.adminService.refreshNodesResources(
RefreshNodesResourcesRequest.newInstance());
try {
// NM-RM heartbeat, validate that this will get new resource back.
nm.nodeHeartbeat(true);
} catch (Exception ex) {
fail("Should not get any exceptions");
}
RMNode niAfter = rm.getRMContext().getRMNodes().get(nid);
Resource resourceAfter = niAfter.getTotalCapability();
Assert.assertEquals("<memory:4096, vCores:4>", resourceAfter.toString());
Assert.assertEquals(4096, nm.getMemory());
Assert.assertEquals(4, nm.getvCores());
}
@Test
public void testResourcePersistentForNMRegistrationWithNewResource()
throws IOException, YarnException {