YARN-4832. NM side resource value should get updated if change applied in RM side. Contributed by Junping Du
This commit is contained in:
parent
7e36b9159e
commit
264c06a438
|
@ -27,6 +27,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
|
|||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.Container;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.server.api.records.MasterKey;
|
||||
import org.apache.hadoop.yarn.server.api.records.NodeAction;
|
||||
|
||||
|
@ -76,6 +77,9 @@ public interface NodeHeartbeatResponse {
|
|||
boolean getAreNodeLabelsAcceptedByRM();
|
||||
void setAreNodeLabelsAcceptedByRM(boolean areNodeLabelsAcceptedByRM);
|
||||
|
||||
Resource getResource();
|
||||
void setResource(Resource resource);
|
||||
|
||||
List<Container> getContainersToDecrease();
|
||||
void addAllContainersToDecrease(Collection<Container> containersToDecrease);
|
||||
}
|
||||
|
|
|
@ -18,6 +18,7 @@
|
|||
|
||||
package org.apache.hadoop.yarn.server.api.protocolrecords;
|
||||
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.server.api.records.MasterKey;
|
||||
import org.apache.hadoop.yarn.server.api.records.NodeAction;
|
||||
|
||||
|
@ -45,7 +46,10 @@ public interface RegisterNodeManagerResponse {
|
|||
void setRMVersion(String version);
|
||||
|
||||
String getRMVersion();
|
||||
|
||||
|
||||
Resource getResource();
|
||||
void setResource(Resource resource);
|
||||
|
||||
boolean getAreNodeLabelsAcceptedByRM();
|
||||
void setAreNodeLabelsAcceptedByRM(boolean areNodeLabelsAcceptedByRM);
|
||||
}
|
||||
|
|
|
@ -31,14 +31,17 @@ import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SignalContainerRequest
|
|||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.Container;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
|
||||
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl;
|
||||
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl;
|
||||
import org.apache.hadoop.yarn.api.records.impl.pb.ProtoBase;
|
||||
import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils;
|
||||
import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnServiceProtos.SignalContainerRequestProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.MasterKeyProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeActionProto;
|
||||
|
@ -51,17 +54,17 @@ import org.apache.hadoop.yarn.server.api.records.NodeAction;
|
|||
import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl;
|
||||
|
||||
|
||||
|
||||
public class NodeHeartbeatResponsePBImpl extends
|
||||
ProtoBase<NodeHeartbeatResponseProto> implements NodeHeartbeatResponse {
|
||||
NodeHeartbeatResponseProto proto = NodeHeartbeatResponseProto.getDefaultInstance();
|
||||
NodeHeartbeatResponseProto.Builder builder = null;
|
||||
boolean viaProto = false;
|
||||
|
||||
|
||||
private List<ContainerId> containersToCleanup = null;
|
||||
private List<ContainerId> containersToBeRemovedFromNM = null;
|
||||
private List<ApplicationId> applicationsToCleanup = null;
|
||||
private Map<ApplicationId, ByteBuffer> systemCredentials = null;
|
||||
private Resource resource = null;
|
||||
|
||||
private MasterKey containerTokenMasterKey = null;
|
||||
private MasterKey nmTokenMasterKey = null;
|
||||
|
@ -76,7 +79,7 @@ public class NodeHeartbeatResponsePBImpl extends
|
|||
this.proto = proto;
|
||||
viaProto = true;
|
||||
}
|
||||
|
||||
|
||||
public NodeHeartbeatResponseProto getProto() {
|
||||
mergeLocalToProto();
|
||||
proto = viaProto ? proto : builder.build();
|
||||
|
@ -111,6 +114,9 @@ public class NodeHeartbeatResponsePBImpl extends
|
|||
if (this.containersToSignal != null) {
|
||||
addContainersToSignalToProto();
|
||||
}
|
||||
if (this.resource != null) {
|
||||
builder.setResource(convertToProtoFormat(this.resource));
|
||||
}
|
||||
}
|
||||
|
||||
private void addSystemCredentialsToProto() {
|
||||
|
@ -138,8 +144,8 @@ public class NodeHeartbeatResponsePBImpl extends
|
|||
}
|
||||
viaProto = false;
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
@Override
|
||||
public int getResponseId() {
|
||||
NodeHeartbeatResponseProtoOrBuilder p = viaProto ? proto : builder;
|
||||
|
@ -152,6 +158,28 @@ public class NodeHeartbeatResponsePBImpl extends
|
|||
builder.setResponseId((responseId));
|
||||
}
|
||||
|
||||
@Override
|
||||
public Resource getResource() {
|
||||
NodeHeartbeatResponseProtoOrBuilder p = viaProto ? proto : builder;
|
||||
if (this.resource != null) {
|
||||
return this.resource;
|
||||
}
|
||||
if (!p.hasResource()) {
|
||||
return null;
|
||||
}
|
||||
this.resource = convertFromProtoFormat(p.getResource());
|
||||
return this.resource;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setResource(Resource resource) {
|
||||
maybeInitBuilder();
|
||||
if (resource == null) {
|
||||
builder.clearResource();
|
||||
}
|
||||
this.resource = resource;
|
||||
}
|
||||
|
||||
@Override
|
||||
public MasterKey getContainerTokenMasterKey() {
|
||||
NodeHeartbeatResponseProtoOrBuilder p = viaProto ? proto : builder;
|
||||
|
@ -533,6 +561,14 @@ public class NodeHeartbeatResponsePBImpl extends
|
|||
return ((ContainerIdPBImpl) t).getProto();
|
||||
}
|
||||
|
||||
private ResourcePBImpl convertFromProtoFormat(ResourceProto p) {
|
||||
return new ResourcePBImpl(p);
|
||||
}
|
||||
|
||||
private ResourceProto convertToProtoFormat(Resource t) {
|
||||
return ((ResourcePBImpl)t).getProto();
|
||||
}
|
||||
|
||||
private ApplicationIdPBImpl convertFromProtoFormat(ApplicationIdProto p) {
|
||||
return new ApplicationIdPBImpl(p);
|
||||
}
|
||||
|
|
|
@ -19,7 +19,10 @@
|
|||
package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb;
|
||||
|
||||
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.api.records.impl.pb.ProtoBase;
|
||||
import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.MasterKeyProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeActionProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.RegisterNodeManagerResponseProto;
|
||||
|
@ -30,17 +33,17 @@ import org.apache.hadoop.yarn.server.api.records.NodeAction;
|
|||
import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl;
|
||||
|
||||
|
||||
|
||||
public class RegisterNodeManagerResponsePBImpl extends ProtoBase<RegisterNodeManagerResponseProto> implements RegisterNodeManagerResponse {
|
||||
RegisterNodeManagerResponseProto proto = RegisterNodeManagerResponseProto.getDefaultInstance();
|
||||
RegisterNodeManagerResponseProto.Builder builder = null;
|
||||
boolean viaProto = false;
|
||||
|
||||
private Resource resource = null;
|
||||
|
||||
private MasterKey containerTokenMasterKey = null;
|
||||
private MasterKey nmTokenMasterKey = null;
|
||||
|
||||
|
||||
private boolean rebuild = false;
|
||||
|
||||
|
||||
public RegisterNodeManagerResponsePBImpl() {
|
||||
builder = RegisterNodeManagerResponseProto.newBuilder();
|
||||
}
|
||||
|
@ -49,7 +52,7 @@ public class RegisterNodeManagerResponsePBImpl extends ProtoBase<RegisterNodeMan
|
|||
this.proto = proto;
|
||||
viaProto = true;
|
||||
}
|
||||
|
||||
|
||||
public RegisterNodeManagerResponseProto getProto() {
|
||||
if (rebuild)
|
||||
mergeLocalToProto();
|
||||
|
@ -67,6 +70,9 @@ public class RegisterNodeManagerResponsePBImpl extends ProtoBase<RegisterNodeMan
|
|||
builder.setNmTokenMasterKey(
|
||||
convertToProtoFormat(this.nmTokenMasterKey));
|
||||
}
|
||||
if (this.resource != null) {
|
||||
builder.setResource(convertToProtoFormat(this.resource));
|
||||
}
|
||||
}
|
||||
|
||||
private void mergeLocalToProto() {
|
||||
|
@ -85,6 +91,28 @@ public class RegisterNodeManagerResponsePBImpl extends ProtoBase<RegisterNodeMan
|
|||
viaProto = false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Resource getResource() {
|
||||
RegisterNodeManagerResponseProtoOrBuilder p = viaProto ? proto : builder;
|
||||
if (this.resource != null) {
|
||||
return this.resource;
|
||||
}
|
||||
if (!p.hasResource()) {
|
||||
return null;
|
||||
}
|
||||
this.resource = convertFromProtoFormat(p.getResource());
|
||||
return this.resource;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setResource(Resource resource) {
|
||||
maybeInitBuilder();
|
||||
if (resource == null) {
|
||||
builder.clearResource();
|
||||
}
|
||||
this.resource = resource;
|
||||
}
|
||||
|
||||
@Override
|
||||
public MasterKey getContainerTokenMasterKey() {
|
||||
RegisterNodeManagerResponseProtoOrBuilder p = viaProto ? proto : builder;
|
||||
|
@ -217,6 +245,14 @@ public class RegisterNodeManagerResponsePBImpl extends ProtoBase<RegisterNodeMan
|
|||
return ((MasterKeyPBImpl)t).getProto();
|
||||
}
|
||||
|
||||
private ResourcePBImpl convertFromProtoFormat(ResourceProto p) {
|
||||
return new ResourcePBImpl(p);
|
||||
}
|
||||
|
||||
private ResourceProto convertToProtoFormat(Resource t) {
|
||||
return ((ResourcePBImpl)t).getProto();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean getAreNodeLabelsAcceptedByRM() {
|
||||
RegisterNodeManagerResponseProtoOrBuilder p =
|
||||
|
|
|
@ -48,6 +48,7 @@ message RegisterNodeManagerResponseProto {
|
|||
optional string diagnostics_message = 5;
|
||||
optional string rm_version = 6;
|
||||
optional bool areNodeLabelsAcceptedByRM = 7 [default = false];
|
||||
optional ResourceProto resource = 8;
|
||||
}
|
||||
|
||||
message UnRegisterNodeManagerRequestProto {
|
||||
|
@ -85,6 +86,7 @@ message NodeHeartbeatResponseProto {
|
|||
optional bool areNodeLabelsAcceptedByRM = 11 [default = false];
|
||||
repeated ContainerProto containers_to_decrease = 12;
|
||||
repeated SignalContainerRequestProto containers_to_signal = 13;
|
||||
optional ResourceProto resource = 14;
|
||||
}
|
||||
|
||||
message SystemCredentialsForAppsProto {
|
||||
|
|
|
@ -83,6 +83,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.Contai
|
|||
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.nodelabels.NodeLabelsProvider;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.util.NodeManagerHardwareUtils;
|
||||
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||
import org.apache.hadoop.yarn.util.YarnVersionInfo;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
|
@ -392,8 +393,17 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
|
|||
|
||||
StringBuilder successfullRegistrationMsg = new StringBuilder();
|
||||
successfullRegistrationMsg.append("Registered with ResourceManager as ")
|
||||
.append(this.nodeId).append(" with total resource of ")
|
||||
.append(this.totalResource);
|
||||
.append(this.nodeId);
|
||||
|
||||
Resource newResource = regNMResponse.getResource();
|
||||
if (newResource != null) {
|
||||
updateNMResource(newResource);
|
||||
successfullRegistrationMsg.append(" with updated total resource of ")
|
||||
.append(this.totalResource);
|
||||
} else {
|
||||
successfullRegistrationMsg.append(" with total resource of ")
|
||||
.append(this.totalResource);
|
||||
}
|
||||
|
||||
successfullRegistrationMsg.append(nodeLabelsHandler
|
||||
.verifyRMRegistrationResponseForNodeLabels(regNMResponse));
|
||||
|
@ -489,6 +499,12 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
|
|||
return increasedContainers;
|
||||
}
|
||||
|
||||
// Update NM's Resource.
|
||||
private void updateNMResource(Resource resource) {
|
||||
metrics.addResource(Resources.subtract(resource, totalResource));
|
||||
this.totalResource = resource;
|
||||
}
|
||||
|
||||
// Iterate through the NMContext and clone and get all the containers'
|
||||
// statuses. If it's a completed container, add into the
|
||||
// recentlyStoppedContainers collections.
|
||||
|
@ -829,6 +845,15 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
|
|||
dispatcher.getEventHandler().handle(
|
||||
new CMgrSignalContainersEvent(containersToSignal));
|
||||
}
|
||||
// Handling node resource update case.
|
||||
Resource newResource = response.getResource();
|
||||
if (newResource != null) {
|
||||
updateNMResource(newResource);
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Node's resource is updated to " +
|
||||
newResource.toString());
|
||||
}
|
||||
}
|
||||
} catch (ConnectException e) {
|
||||
//catch and throw the exception if tried MAX wait time to connect RM
|
||||
dispatcher.getEventHandler().handle(
|
||||
|
|
|
@ -26,6 +26,9 @@ import java.util.HashMap;
|
|||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
@ -97,6 +100,9 @@ public class ResourceTrackerService extends AbstractService implements
|
|||
private final RMContainerTokenSecretManager containerTokenSecretManager;
|
||||
private final NMTokenSecretManagerInRM nmTokenSecretManager;
|
||||
|
||||
private final ReadLock readLock;
|
||||
private final WriteLock writeLock;
|
||||
|
||||
private long nextHeartBeatInterval;
|
||||
private Server server;
|
||||
private InetSocketAddress resourceTrackerAddress;
|
||||
|
@ -107,7 +113,7 @@ public class ResourceTrackerService extends AbstractService implements
|
|||
|
||||
private boolean isDistributedNodeLabelsConf;
|
||||
private boolean isDelegatedCentralizedNodeLabelsConf;
|
||||
private volatile DynamicResourceConfiguration drConf;
|
||||
private DynamicResourceConfiguration drConf;
|
||||
|
||||
public ResourceTrackerService(RMContext rmContext,
|
||||
NodesListManager nodesListManager,
|
||||
|
@ -120,7 +126,9 @@ public class ResourceTrackerService extends AbstractService implements
|
|||
this.nmLivelinessMonitor = nmLivelinessMonitor;
|
||||
this.containerTokenSecretManager = containerTokenSecretManager;
|
||||
this.nmTokenSecretManager = nmTokenSecretManager;
|
||||
|
||||
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
|
||||
this.readLock = lock.readLock();
|
||||
this.writeLock = lock.writeLock();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -160,7 +168,6 @@ public class ResourceTrackerService extends AbstractService implements
|
|||
}
|
||||
|
||||
loadDynamicResourceConfiguration(conf);
|
||||
|
||||
super.serviceInit(conf);
|
||||
}
|
||||
|
||||
|
@ -176,6 +183,9 @@ public class ResourceTrackerService extends AbstractService implements
|
|||
InputStream drInputStream = this.rmContext.getConfigurationProvider()
|
||||
.getConfigurationInputStream(conf,
|
||||
YarnConfiguration.DR_CONFIGURATION_FILE);
|
||||
// write lock here on drConfig is unnecessary as here get called at
|
||||
// ResourceTrackerService get initiated and other read and write
|
||||
// operations haven't started yet.
|
||||
if (drInputStream != null) {
|
||||
this.drConf = new DynamicResourceConfiguration(conf, drInputStream);
|
||||
} else {
|
||||
|
@ -192,7 +202,12 @@ public class ResourceTrackerService extends AbstractService implements
|
|||
*/
|
||||
public void updateDynamicResourceConfiguration(
|
||||
DynamicResourceConfiguration conf) {
|
||||
this.drConf = conf;
|
||||
this.writeLock.lock();
|
||||
try {
|
||||
this.drConf = conf;
|
||||
} finally {
|
||||
this.writeLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -233,6 +248,7 @@ public class ResourceTrackerService extends AbstractService implements
|
|||
if (this.server != null) {
|
||||
this.server.stop();
|
||||
}
|
||||
|
||||
super.serviceStop();
|
||||
}
|
||||
|
||||
|
@ -331,16 +347,18 @@ public class ResourceTrackerService extends AbstractService implements
|
|||
}
|
||||
|
||||
// check if node's capacity is load from dynamic-resources.xml
|
||||
String[] nodes = this.drConf.getNodes();
|
||||
String nid = nodeId.toString();
|
||||
|
||||
if (nodes != null && Arrays.asList(nodes).contains(nid)) {
|
||||
capability.setMemory(this.drConf.getMemoryPerNode(nid));
|
||||
capability.setVirtualCores(this.drConf.getVcoresPerNode(nid));
|
||||
Resource dynamicLoadCapability = loadNodeResourceFromDRConfiguration(nid);
|
||||
if (dynamicLoadCapability != null) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Resource for node: " + nid + " is adjusted to " +
|
||||
capability + " due to settings in dynamic-resources.xml.");
|
||||
LOG.debug("Resource for node: " + nid + " is adjusted from: " +
|
||||
capability + " to: " + dynamicLoadCapability +
|
||||
" due to settings in dynamic-resources.xml.");
|
||||
}
|
||||
capability = dynamicLoadCapability;
|
||||
// sync back with new resource.
|
||||
response.setResource(capability);
|
||||
}
|
||||
|
||||
// Check if this node has minimum allocations
|
||||
|
@ -536,6 +554,15 @@ public class ResourceTrackerService extends AbstractService implements
|
|||
}
|
||||
}
|
||||
|
||||
// 6. check if node's capacity is load from dynamic-resources.xml
|
||||
// if so, send updated resource back to NM.
|
||||
String nid = nodeId.toString();
|
||||
Resource capability = loadNodeResourceFromDRConfiguration(nid);
|
||||
// sync back with new resource if not null.
|
||||
if (capability != null) {
|
||||
nodeHeartBeatResponse.setResource(capability);
|
||||
}
|
||||
|
||||
return nodeHeartBeatResponse;
|
||||
}
|
||||
|
||||
|
@ -622,6 +649,22 @@ public class ResourceTrackerService extends AbstractService implements
|
|||
}
|
||||
}
|
||||
|
||||
private Resource loadNodeResourceFromDRConfiguration(String nodeId) {
|
||||
// check if node's capacity is loaded from dynamic-resources.xml
|
||||
this.readLock.lock();
|
||||
try {
|
||||
String[] nodes = this.drConf.getNodes();
|
||||
if (nodes != null && Arrays.asList(nodes).contains(nodeId)) {
|
||||
return Resource.newInstance(this.drConf.getMemoryPerNode(nodeId),
|
||||
this.drConf.getVcoresPerNode(nodeId));
|
||||
} else {
|
||||
return null;
|
||||
}
|
||||
} finally {
|
||||
this.readLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* resolving the network topology.
|
||||
* @param hostName the hostname of this node.
|
||||
|
|
|
@ -50,8 +50,8 @@ public class MockNM {
|
|||
|
||||
private int responseId;
|
||||
private NodeId nodeId;
|
||||
private final int memory;
|
||||
private final int vCores;
|
||||
private int memory;
|
||||
private int vCores;
|
||||
private ResourceTrackerService resourceTracker;
|
||||
private int httpPort = 2;
|
||||
private MasterKey currentContainerTokenMasterKey;
|
||||
|
@ -142,9 +142,14 @@ public class MockNM {
|
|||
this.currentContainerTokenMasterKey =
|
||||
registrationResponse.getContainerTokenMasterKey();
|
||||
this.currentNMTokenMasterKey = registrationResponse.getNMTokenMasterKey();
|
||||
return registrationResponse;
|
||||
Resource newResource = registrationResponse.getResource();
|
||||
if (newResource != null) {
|
||||
memory = newResource.getMemory();
|
||||
vCores = newResource.getVirtualCores();
|
||||
}
|
||||
return registrationResponse;
|
||||
}
|
||||
|
||||
|
||||
public NodeHeartbeatResponse nodeHeartbeat(boolean isHealthy) throws Exception {
|
||||
return nodeHeartbeat(new HashMap<ApplicationId, List<ContainerStatus>>(),
|
||||
isHealthy, ++responseId);
|
||||
|
@ -211,7 +216,13 @@ public class MockNM {
|
|||
.getKeyId()) {
|
||||
this.currentNMTokenMasterKey = masterKeyFromRM;
|
||||
}
|
||||
|
||||
|
||||
Resource newResource = heartbeatResponse.getResource();
|
||||
if (newResource != null) {
|
||||
memory = newResource.getMemory();
|
||||
vCores = newResource.getVirtualCores();
|
||||
}
|
||||
|
||||
return heartbeatResponse;
|
||||
}
|
||||
|
||||
|
|
|
@ -238,6 +238,105 @@ public class TestRMAdminService {
|
|||
Assert.assertEquals("<memory:4096, vCores:4>", resourceAfter.toString());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRefreshNodesResourceWithResourceReturnInRegistration()
|
||||
throws IOException, YarnException {
|
||||
configuration.set(YarnConfiguration.RM_CONFIGURATION_PROVIDER_CLASS,
|
||||
"org.apache.hadoop.yarn.FileSystemBasedConfigurationProvider");
|
||||
|
||||
//upload default configurations
|
||||
uploadDefaultConfiguration();
|
||||
|
||||
MockNM nm = null;
|
||||
try {
|
||||
rm = new MockRM(configuration);
|
||||
rm.init(configuration);
|
||||
rm.start();
|
||||
nm = rm.registerNode("h1:1234", 2048, 2);
|
||||
} catch(Exception ex) {
|
||||
fail("Should not get any exceptions");
|
||||
}
|
||||
|
||||
NodeId nid = ConverterUtils.toNodeId("h1:1234");
|
||||
RMNode ni = rm.getRMContext().getRMNodes().get(nid);
|
||||
Resource resource = ni.getTotalCapability();
|
||||
Assert.assertEquals("<memory:2048, vCores:2>", resource.toString());
|
||||
|
||||
DynamicResourceConfiguration drConf =
|
||||
new DynamicResourceConfiguration();
|
||||
drConf.set("yarn.resource.dynamic.nodes", "h1:1234");
|
||||
drConf.set("yarn.resource.dynamic.h1:1234.vcores", "4");
|
||||
drConf.set("yarn.resource.dynamic.h1:1234.memory", "4096");
|
||||
uploadConfiguration(drConf, "dynamic-resources.xml");
|
||||
|
||||
rm.adminService.refreshNodesResources(
|
||||
RefreshNodesResourcesRequest.newInstance());
|
||||
|
||||
try {
|
||||
// register the same node again with original resource.
|
||||
// validate this will get new resource back;
|
||||
nm.registerNode();
|
||||
} catch (Exception ex) {
|
||||
fail("Should not get any exceptions");
|
||||
}
|
||||
|
||||
RMNode niAfter = rm.getRMContext().getRMNodes().get(nid);
|
||||
Resource resourceAfter = niAfter.getTotalCapability();
|
||||
Assert.assertEquals("<memory:4096, vCores:4>", resourceAfter.toString());
|
||||
|
||||
Assert.assertEquals(4096, nm.getMemory());
|
||||
Assert.assertEquals(4, nm.getvCores());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRefreshNodesResourceWithResourceReturnInHeartbeat()
|
||||
throws IOException, YarnException {
|
||||
configuration.set(YarnConfiguration.RM_CONFIGURATION_PROVIDER_CLASS,
|
||||
"org.apache.hadoop.yarn.FileSystemBasedConfigurationProvider");
|
||||
|
||||
//upload default configurations
|
||||
uploadDefaultConfiguration();
|
||||
|
||||
MockNM nm = null;
|
||||
try {
|
||||
rm = new MockRM(configuration);
|
||||
rm.init(configuration);
|
||||
rm.start();
|
||||
nm = rm.registerNode("h1:1234", 2048, 2);
|
||||
} catch(Exception ex) {
|
||||
fail("Should not get any exceptions");
|
||||
}
|
||||
|
||||
NodeId nid = ConverterUtils.toNodeId("h1:1234");
|
||||
RMNode ni = rm.getRMContext().getRMNodes().get(nid);
|
||||
Resource resource = ni.getTotalCapability();
|
||||
Assert.assertEquals("<memory:2048, vCores:2>", resource.toString());
|
||||
|
||||
DynamicResourceConfiguration drConf =
|
||||
new DynamicResourceConfiguration();
|
||||
drConf.set("yarn.resource.dynamic.nodes", "h1:1234");
|
||||
drConf.set("yarn.resource.dynamic.h1:1234.vcores", "4");
|
||||
drConf.set("yarn.resource.dynamic.h1:1234.memory", "4096");
|
||||
uploadConfiguration(drConf, "dynamic-resources.xml");
|
||||
|
||||
rm.adminService.refreshNodesResources(
|
||||
RefreshNodesResourcesRequest.newInstance());
|
||||
|
||||
try {
|
||||
// NM-RM heartbeat, validate that this will get new resource back.
|
||||
nm.nodeHeartbeat(true);
|
||||
} catch (Exception ex) {
|
||||
fail("Should not get any exceptions");
|
||||
}
|
||||
|
||||
RMNode niAfter = rm.getRMContext().getRMNodes().get(nid);
|
||||
Resource resourceAfter = niAfter.getTotalCapability();
|
||||
Assert.assertEquals("<memory:4096, vCores:4>", resourceAfter.toString());
|
||||
|
||||
Assert.assertEquals(4096, nm.getMemory());
|
||||
Assert.assertEquals(4, nm.getvCores());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testResourcePersistentForNMRegistrationWithNewResource()
|
||||
throws IOException, YarnException {
|
||||
|
|
Loading…
Reference in New Issue