From fa3bc3405dc2f8497faab45ba5c4de2caf4c29bc Mon Sep 17 00:00:00 2001 From: Jian He Date: Tue, 17 May 2016 12:51:08 -0700 Subject: [PATCH] YARN-4832. NM side resource value should get updated if change applied in RM side. Contributed by Junping Du --- .../NodeHeartbeatResponse.java | 4 + .../RegisterNodeManagerResponse.java | 6 +- .../impl/pb/NodeHeartbeatResponsePBImpl.java | 46 ++++++++- .../pb/RegisterNodeManagerResponsePBImpl.java | 46 ++++++++- .../yarn_server_common_service_protos.proto | 2 + .../nodemanager/NodeStatusUpdaterImpl.java | 29 +++++- .../ResourceTrackerService.java | 67 ++++++++++--- .../yarn/server/resourcemanager/MockNM.java | 21 +++- .../resourcemanager/TestRMAdminService.java | 99 +++++++++++++++++++ 9 files changed, 290 insertions(+), 30 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java index bd04a0d9958..386353a1204 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java @@ -27,6 +27,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit; import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.api.records.NodeAction; @@ -77,6 +78,9 @@ public interface NodeHeartbeatResponse { boolean getAreNodeLabelsAcceptedByRM(); void setAreNodeLabelsAcceptedByRM(boolean areNodeLabelsAcceptedByRM); + Resource getResource(); + void setResource(Resource resource); + List getContainersToDecrease(); void addAllContainersToDecrease(Collection containersToDecrease); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerResponse.java index c8678f6cd33..fb1da4c798a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerResponse.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerResponse.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.api.protocolrecords; +import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.api.records.NodeAction; @@ -45,7 +46,10 @@ public interface RegisterNodeManagerResponse { void setRMVersion(String version); String getRMVersion(); - + + Resource getResource(); + void setResource(Resource resource); + boolean getAreNodeLabelsAcceptedByRM(); void setAreNodeLabelsAcceptedByRM(boolean areNodeLabelsAcceptedByRM); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java index a25915891d8..3422697ccd9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java @@ -31,14 +31,17 @@ import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SignalContainerRequest import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.ProtoBase; import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils; +import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl; import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto; import org.apache.hadoop.yarn.proto.YarnProtos.ContainerProto; +import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.ContainerQueuingLimitProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.SignalContainerRequestProto; import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.MasterKeyProto; @@ -54,17 +57,17 @@ import org.apache.hadoop.yarn.server.api.records.impl.pb.ContainerQueuingLimitPB import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl; - public class NodeHeartbeatResponsePBImpl extends ProtoBase implements NodeHeartbeatResponse { NodeHeartbeatResponseProto proto = NodeHeartbeatResponseProto.getDefaultInstance(); NodeHeartbeatResponseProto.Builder builder = null; boolean viaProto = false; - + private List containersToCleanup = null; private List containersToBeRemovedFromNM = null; private List applicationsToCleanup = null; private Map systemCredentials = null; + private Resource resource = null; private MasterKey containerTokenMasterKey = null; private MasterKey nmTokenMasterKey = null; @@ -80,7 +83,7 @@ public class NodeHeartbeatResponsePBImpl extends this.proto = proto; viaProto = true; } - + public NodeHeartbeatResponseProto getProto() { mergeLocalToProto(); proto = viaProto ? proto : builder.build(); @@ -119,6 +122,9 @@ public class NodeHeartbeatResponsePBImpl extends if (this.containersToSignal != null) { addContainersToSignalToProto(); } + if (this.resource != null) { + builder.setResource(convertToProtoFormat(this.resource)); + } } private void addSystemCredentialsToProto() { @@ -146,8 +152,8 @@ public class NodeHeartbeatResponsePBImpl extends } viaProto = false; } - - + + @Override public int getResponseId() { NodeHeartbeatResponseProtoOrBuilder p = viaProto ? proto : builder; @@ -160,6 +166,28 @@ public class NodeHeartbeatResponsePBImpl extends builder.setResponseId((responseId)); } + @Override + public Resource getResource() { + NodeHeartbeatResponseProtoOrBuilder p = viaProto ? proto : builder; + if (this.resource != null) { + return this.resource; + } + if (!p.hasResource()) { + return null; + } + this.resource = convertFromProtoFormat(p.getResource()); + return this.resource; + } + + @Override + public void setResource(Resource resource) { + maybeInitBuilder(); + if (resource == null) { + builder.clearResource(); + } + this.resource = resource; + } + @Override public MasterKey getContainerTokenMasterKey() { NodeHeartbeatResponseProtoOrBuilder p = viaProto ? proto : builder; @@ -565,6 +593,14 @@ public class NodeHeartbeatResponsePBImpl extends return ((ContainerIdPBImpl) t).getProto(); } + private ResourcePBImpl convertFromProtoFormat(ResourceProto p) { + return new ResourcePBImpl(p); + } + + private ResourceProto convertToProtoFormat(Resource t) { + return ((ResourcePBImpl)t).getProto(); + } + private ApplicationIdPBImpl convertFromProtoFormat(ApplicationIdProto p) { return new ApplicationIdPBImpl(p); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerResponsePBImpl.java index 391d00dc109..56b675b7e83 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerResponsePBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerResponsePBImpl.java @@ -19,7 +19,10 @@ package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb; +import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.impl.pb.ProtoBase; +import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl; +import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto; import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.MasterKeyProto; import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeActionProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.RegisterNodeManagerResponseProto; @@ -30,17 +33,17 @@ import org.apache.hadoop.yarn.server.api.records.NodeAction; import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl; - public class RegisterNodeManagerResponsePBImpl extends ProtoBase implements RegisterNodeManagerResponse { RegisterNodeManagerResponseProto proto = RegisterNodeManagerResponseProto.getDefaultInstance(); RegisterNodeManagerResponseProto.Builder builder = null; boolean viaProto = false; - + private Resource resource = null; + private MasterKey containerTokenMasterKey = null; private MasterKey nmTokenMasterKey = null; - + private boolean rebuild = false; - + public RegisterNodeManagerResponsePBImpl() { builder = RegisterNodeManagerResponseProto.newBuilder(); } @@ -49,7 +52,7 @@ public class RegisterNodeManagerResponsePBImpl extends ProtoBase>(), isHealthy, ++responseId); @@ -211,7 +216,13 @@ public class MockNM { .getKeyId()) { this.currentNMTokenMasterKey = masterKeyFromRM; } - + + Resource newResource = heartbeatResponse.getResource(); + if (newResource != null) { + memory = newResource.getMemory(); + vCores = newResource.getVirtualCores(); + } + return heartbeatResponse; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMAdminService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMAdminService.java index 5c69411b981..b109639a55a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMAdminService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMAdminService.java @@ -238,6 +238,105 @@ public class TestRMAdminService { Assert.assertEquals("", resourceAfter.toString()); } + @Test + public void testRefreshNodesResourceWithResourceReturnInRegistration() + throws IOException, YarnException { + configuration.set(YarnConfiguration.RM_CONFIGURATION_PROVIDER_CLASS, + "org.apache.hadoop.yarn.FileSystemBasedConfigurationProvider"); + + //upload default configurations + uploadDefaultConfiguration(); + + MockNM nm = null; + try { + rm = new MockRM(configuration); + rm.init(configuration); + rm.start(); + nm = rm.registerNode("h1:1234", 2048, 2); + } catch(Exception ex) { + fail("Should not get any exceptions"); + } + + NodeId nid = ConverterUtils.toNodeId("h1:1234"); + RMNode ni = rm.getRMContext().getRMNodes().get(nid); + Resource resource = ni.getTotalCapability(); + Assert.assertEquals("", resource.toString()); + + DynamicResourceConfiguration drConf = + new DynamicResourceConfiguration(); + drConf.set("yarn.resource.dynamic.nodes", "h1:1234"); + drConf.set("yarn.resource.dynamic.h1:1234.vcores", "4"); + drConf.set("yarn.resource.dynamic.h1:1234.memory", "4096"); + uploadConfiguration(drConf, "dynamic-resources.xml"); + + rm.adminService.refreshNodesResources( + RefreshNodesResourcesRequest.newInstance()); + + try { + // register the same node again with original resource. + // validate this will get new resource back; + nm.registerNode(); + } catch (Exception ex) { + fail("Should not get any exceptions"); + } + + RMNode niAfter = rm.getRMContext().getRMNodes().get(nid); + Resource resourceAfter = niAfter.getTotalCapability(); + Assert.assertEquals("", resourceAfter.toString()); + + Assert.assertEquals(4096, nm.getMemory()); + Assert.assertEquals(4, nm.getvCores()); + } + + @Test + public void testRefreshNodesResourceWithResourceReturnInHeartbeat() + throws IOException, YarnException { + configuration.set(YarnConfiguration.RM_CONFIGURATION_PROVIDER_CLASS, + "org.apache.hadoop.yarn.FileSystemBasedConfigurationProvider"); + + //upload default configurations + uploadDefaultConfiguration(); + + MockNM nm = null; + try { + rm = new MockRM(configuration); + rm.init(configuration); + rm.start(); + nm = rm.registerNode("h1:1234", 2048, 2); + } catch(Exception ex) { + fail("Should not get any exceptions"); + } + + NodeId nid = ConverterUtils.toNodeId("h1:1234"); + RMNode ni = rm.getRMContext().getRMNodes().get(nid); + Resource resource = ni.getTotalCapability(); + Assert.assertEquals("", resource.toString()); + + DynamicResourceConfiguration drConf = + new DynamicResourceConfiguration(); + drConf.set("yarn.resource.dynamic.nodes", "h1:1234"); + drConf.set("yarn.resource.dynamic.h1:1234.vcores", "4"); + drConf.set("yarn.resource.dynamic.h1:1234.memory", "4096"); + uploadConfiguration(drConf, "dynamic-resources.xml"); + + rm.adminService.refreshNodesResources( + RefreshNodesResourcesRequest.newInstance()); + + try { + // NM-RM heartbeat, validate that this will get new resource back. + nm.nodeHeartbeat(true); + } catch (Exception ex) { + fail("Should not get any exceptions"); + } + + RMNode niAfter = rm.getRMContext().getRMNodes().get(nid); + Resource resourceAfter = niAfter.getTotalCapability(); + Assert.assertEquals("", resourceAfter.toString()); + + Assert.assertEquals(4096, nm.getMemory()); + Assert.assertEquals(4, nm.getvCores()); + } + @Test public void testResourcePersistentForNMRegistrationWithNewResource() throws IOException, YarnException {