YARN-2495. Allow admin specify labels from each NM (Distributed configuration for node label). (Naganarasimha G R via wangda)

This commit is contained in:
Wangda Tan 2015-03-30 12:04:51 -07:00
parent b80457158d
commit 2a945d24f7
21 changed files with 1199 additions and 41 deletions

View File

@ -83,6 +83,9 @@ Release 2.8.0 - UNRELEASED
YARN-3288. Document and fix indentation in the DockerContainerExecutor code
YARN-2495. Allow admin specify labels from each NM (Distributed
configuration for node label). (Naganarasimha G R via wangda)
OPTIMIZATIONS
YARN-3339. TestDockerContainerExecutor should pull a single image and not

View File

@ -1719,6 +1719,18 @@ public class YarnConfiguration extends Configuration {
public static final String NODE_LABELS_ENABLED = NODE_LABELS_PREFIX
+ "enabled";
public static final boolean DEFAULT_NODE_LABELS_ENABLED = false;
/**
 * Configuration key that selects how node labels are configured. The values
 * defined here are {@link #CENTRALIZED_NODELABEL_CONFIGURATION_TYPE} and
 * {@link #DISTRIBUTED_NODELABEL_CONFIGURATION_TYPE}.
 */
public static final String NODELABEL_CONFIGURATION_TYPE =
    NODE_LABELS_PREFIX + "configuration-type";

/** Value of {@link #NODELABEL_CONFIGURATION_TYPE} for centralized setup. */
public static final String CENTRALIZED_NODELABEL_CONFIGURATION_TYPE =
    "centralized";

/**
 * Misspelled name of {@link #CENTRALIZED_NODELABEL_CONFIGURATION_TYPE},
 * retained with the same value so that existing callers keep compiling.
 *
 * @deprecated use {@link #CENTRALIZED_NODELABEL_CONFIGURATION_TYPE}
 */
@Deprecated
public static final String CENTALIZED_NODELABEL_CONFIGURATION_TYPE =
    CENTRALIZED_NODELABEL_CONFIGURATION_TYPE;

/**
 * Value of {@link #NODELABEL_CONFIGURATION_TYPE} for distributed setup
 * (presumably labels supplied by each NodeManager - confirm against the
 * ResourceManager side, which is not visible here).
 */
public static final String DISTRIBUTED_NODELABEL_CONFIGURATION_TYPE =
    "distributed";

/** Configuration type used when none is set: centralized. */
public static final String DEFAULT_NODELABEL_CONFIGURATION_TYPE =
    CENTRALIZED_NODELABEL_CONFIGURATION_TYPE;
public YarnConfiguration() {
super();

View File

@ -239,6 +239,10 @@ message NodeIdToLabelsProto {
repeated string nodeLabels = 2;
}
// Wrapper around a list of strings. Because the list sits inside a message,
// an optional field of this type can tell "not set" apart from "set to an
// empty list".
message StringArrayProto {
repeated string elements = 1;
}
message LabelsToNodeIdsProto {
optional string nodeLabels = 1;
repeated NodeIdProto nodeId = 2;

View File

@ -70,7 +70,7 @@ public class TestResourceTrackerOnHA extends ProtocolHATestBase{
NodeStatus.newInstance(NodeId.newInstance("localhost", 0), 0, null,
null, null);
NodeHeartbeatRequest request2 =
NodeHeartbeatRequest.newInstance(status, null, null);
NodeHeartbeatRequest.newInstance(status, null, null,null);
resourceTracker.nodeHeartbeat(request2);
}

View File

@ -18,6 +18,8 @@
package org.apache.hadoop.yarn.server.api.protocolrecords;
import java.util.Set;
import org.apache.hadoop.yarn.server.api.records.MasterKey;
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
import org.apache.hadoop.yarn.util.Records;
@ -26,7 +28,7 @@ public abstract class NodeHeartbeatRequest {
public static NodeHeartbeatRequest newInstance(NodeStatus nodeStatus,
MasterKey lastKnownContainerTokenMasterKey,
MasterKey lastKnownNMTokenMasterKey) {
MasterKey lastKnownNMTokenMasterKey, Set<String> nodeLabels) {
NodeHeartbeatRequest nodeHeartbeatRequest =
Records.newRecord(NodeHeartbeatRequest.class);
nodeHeartbeatRequest.setNodeStatus(nodeStatus);
@ -34,6 +36,7 @@ public abstract class NodeHeartbeatRequest {
.setLastKnownContainerTokenMasterKey(lastKnownContainerTokenMasterKey);
nodeHeartbeatRequest
.setLastKnownNMTokenMasterKey(lastKnownNMTokenMasterKey);
nodeHeartbeatRequest.setNodeLabels(nodeLabels);
return nodeHeartbeatRequest;
}
@ -45,4 +48,7 @@ public abstract class NodeHeartbeatRequest {
public abstract MasterKey getLastKnownNMTokenMasterKey();
public abstract void setLastKnownNMTokenMasterKey(MasterKey secretKey);
/**
 * Returns the node labels carried by this heartbeat.
 *
 * @return the label set, or {@code null} when no labels are attached
 */
public abstract Set<String> getNodeLabels();
/**
 * Attaches node labels to this heartbeat.
 *
 * @param nodeLabels labels to send; {@code null} attaches none
 */
public abstract void setNodeLabels(Set<String> nodeLabels);
}

View File

@ -67,4 +67,7 @@ public interface NodeHeartbeatResponse {
void setSystemCredentialsForApps(
Map<ApplicationId, ByteBuffer> systemCredentials);
/**
 * Tells whether the RM accepted the node labels sent with the heartbeat.
 *
 * @return {@code true} if the labels were accepted by the RM
 */
boolean getAreNodeLabelsAcceptedByRM();
/**
 * Records whether the RM accepted the node labels sent with the heartbeat.
 *
 * @param areNodeLabelsAcceptedByRM {@code true} if the labels were accepted
 */
void setAreNodeLabelsAcceptedByRM(boolean areNodeLabelsAcceptedByRM);
}

View File

@ -19,6 +19,7 @@
package org.apache.hadoop.yarn.server.api.protocolrecords;
import java.util.List;
import java.util.Set;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.NodeId;
@ -31,6 +32,14 @@ public abstract class RegisterNodeManagerRequest {
int httpPort, Resource resource, String nodeManagerVersionId,
List<NMContainerStatus> containerStatuses,
List<ApplicationId> runningApplications) {
return newInstance(nodeId, httpPort, resource, nodeManagerVersionId,
containerStatuses, runningApplications, null);
}
public static RegisterNodeManagerRequest newInstance(NodeId nodeId,
int httpPort, Resource resource, String nodeManagerVersionId,
List<NMContainerStatus> containerStatuses,
List<ApplicationId> runningApplications, Set<String> nodeLabels) {
RegisterNodeManagerRequest request =
Records.newRecord(RegisterNodeManagerRequest.class);
request.setHttpPort(httpPort);
@ -39,6 +48,7 @@ public abstract class RegisterNodeManagerRequest {
request.setNMVersion(nodeManagerVersionId);
request.setContainerStatuses(containerStatuses);
request.setRunningApplications(runningApplications);
request.setNodeLabels(nodeLabels);
return request;
}
@ -47,6 +57,8 @@ public abstract class RegisterNodeManagerRequest {
public abstract Resource getResource();
public abstract String getNMVersion();
public abstract List<NMContainerStatus> getNMContainerStatuses();
/**
 * Returns the node labels reported by the NodeManager at registration.
 *
 * @return the label set, or {@code null} when no labels are attached
 */
public abstract Set<String> getNodeLabels();
/**
 * Attaches node labels to this registration request.
 *
 * @param nodeLabels labels to send; {@code null} attaches none
 */
public abstract void setNodeLabels(Set<String> nodeLabels);
/**
* We introduce this here because currently YARN RM doesn't persist nodes info

View File

@ -45,4 +45,7 @@ public interface RegisterNodeManagerResponse {
void setRMVersion(String version);
String getRMVersion();
/**
 * Tells whether the RM accepted the node labels sent at registration.
 *
 * @return {@code true} if the labels were accepted by the RM
 */
boolean getAreNodeLabelsAcceptedByRM();
/**
 * Records whether the RM accepted the node labels sent at registration.
 *
 * @param areNodeLabelsAcceptedByRM {@code true} if the labels were accepted
 */
void setAreNodeLabelsAcceptedByRM(boolean areNodeLabelsAcceptedByRM);
}

View File

@ -18,6 +18,11 @@
package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb;
import java.util.HashSet;
import java.util.Set;
import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdToLabelsProto;
import org.apache.hadoop.yarn.proto.YarnProtos.StringArrayProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.MasterKeyProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeStatusProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatRequestProto;
@ -36,6 +41,7 @@ public class NodeHeartbeatRequestPBImpl extends NodeHeartbeatRequest {
private NodeStatus nodeStatus = null;
private MasterKey lastKnownContainerTokenMasterKey = null;
private MasterKey lastKnownNMTokenMasterKey = null;
private Set<String> labels = null;
public NodeHeartbeatRequestPBImpl() {
builder = NodeHeartbeatRequestProto.newBuilder();
@ -80,6 +86,11 @@ public class NodeHeartbeatRequestPBImpl extends NodeHeartbeatRequest {
builder.setLastKnownNmTokenMasterKey(
convertToProtoFormat(this.lastKnownNMTokenMasterKey));
}
if (this.labels != null) {
builder.clearNodeLabels();
builder.setNodeLabels(StringArrayProto.newBuilder()
.addAllElements(this.labels).build());
}
}
private void mergeLocalToProto() {
@ -178,4 +189,30 @@ public class NodeHeartbeatRequestPBImpl extends NodeHeartbeatRequest {
private MasterKeyProto convertToProtoFormat(MasterKey t) {
return ((MasterKeyPBImpl)t).getProto();
}
/**
 * Returns the node labels of this heartbeat, loading them from the proto
 * or builder on first access.
 *
 * @return the labels, or {@code null} if none are set
 */
@Override
public Set<String> getNodeLabels() {
  initNodeLabels();
  return labels;
}
/**
 * Replaces the node labels of this heartbeat.
 *
 * @param nodeLabels the new labels; {@code null} leaves the request
 *          without labels
 */
@Override
public void setNodeLabels(Set<String> nodeLabels) {
  maybeInitBuilder();
  // Discard whatever the builder holds; the local copy is the one that is
  // written back when local state is merged into the builder.
  builder.clearNodeLabels();
  labels = nodeLabels;
}
/**
 * Populates the local label cache from the proto or builder, unless it is
 * already populated. The cache stays {@code null} when the message carries
 * no nodeLabels field.
 */
private void initNodeLabels() {
  if (labels == null) {
    NodeHeartbeatRequestProtoOrBuilder source = viaProto ? proto : builder;
    if (source.hasNodeLabels()) {
      labels = new HashSet<String>(source.getNodeLabels().getElementsList());
    }
  }
}
}

View File

@ -483,5 +483,18 @@ public class NodeHeartbeatResponsePBImpl extends
private MasterKeyProto convertToProtoFormat(MasterKey t) {
return ((MasterKeyPBImpl) t).getProto();
}
/**
 * Reads the accepted-by-RM flag from the proto or, if this record is being
 * modified, from the builder.
 *
 * @return {@code true} if the RM accepted the node labels
 */
@Override
public boolean getAreNodeLabelsAcceptedByRM() {
  if (viaProto) {
    return proto.getAreNodeLabelsAcceptedByRM();
  }
  return builder.getAreNodeLabelsAcceptedByRM();
}
/**
 * Stores the accepted-by-RM flag in the builder.
 *
 * @param areNodeLabelsAcceptedByRM {@code true} if the RM accepted the
 *          node labels
 */
@Override
public void setAreNodeLabelsAcceptedByRM(boolean areNodeLabelsAcceptedByRM) {
  maybeInitBuilder();
  builder.setAreNodeLabelsAcceptedByRM(areNodeLabelsAcceptedByRM);
}
}

View File

@ -20,32 +20,27 @@ package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.impl.pb.NodeIdPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerStatusPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.NodeIdPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl;
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerStatusProto;
import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto;
import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdToLabelsProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto;
import org.apache.hadoop.yarn.proto.YarnProtos.StringArrayProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NMContainerStatusProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.RegisterNodeManagerRequestProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.RegisterNodeManagerRequestProtoOrBuilder;
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
public class RegisterNodeManagerRequestPBImpl extends RegisterNodeManagerRequest {
RegisterNodeManagerRequestProto proto = RegisterNodeManagerRequestProto.getDefaultInstance();
@ -56,7 +51,8 @@ public class RegisterNodeManagerRequestPBImpl extends RegisterNodeManagerRequest
private NodeId nodeId = null;
private List<NMContainerStatus> containerStatuses = null;
private List<ApplicationId> runningApplications = null;
private Set<String> labels = null;
public RegisterNodeManagerRequestPBImpl() {
builder = RegisterNodeManagerRequestProto.newBuilder();
}
@ -86,7 +82,11 @@ public class RegisterNodeManagerRequestPBImpl extends RegisterNodeManagerRequest
if (this.nodeId != null) {
builder.setNodeId(convertToProtoFormat(this.nodeId));
}
if (this.labels != null) {
builder.clearNodeLabels();
builder.setNodeLabels(StringArrayProto.newBuilder()
.addAllElements(this.labels).build());
}
}
private synchronized void addNMContainerStatusesToProto() {
@ -292,6 +292,32 @@ public class RegisterNodeManagerRequestPBImpl extends RegisterNodeManagerRequest
builder.setNmVersion(version);
}
/**
 * Returns the node labels of this registration request, loading them from
 * the proto or builder on first access.
 *
 * @return the labels, or {@code null} if none are set
 */
@Override
public Set<String> getNodeLabels() {
  initNodeLabels();
  return labels;
}
/**
 * Replaces the node labels of this registration request.
 *
 * @param nodeLabels the new labels; {@code null} leaves the request
 *          without labels
 */
@Override
public void setNodeLabels(Set<String> nodeLabels) {
  maybeInitBuilder();
  // Discard whatever the builder holds; the local copy is the one that is
  // written back when local state is merged into the builder.
  builder.clearNodeLabels();
  labels = nodeLabels;
}
/**
 * Populates the local label cache from the proto or builder, unless it is
 * already populated. The cache stays {@code null} when the message carries
 * no nodeLabels field.
 */
private void initNodeLabels() {
  if (labels == null) {
    RegisterNodeManagerRequestProtoOrBuilder source =
        viaProto ? proto : builder;
    if (source.hasNodeLabels()) {
      labels = new HashSet<String>(source.getNodeLabels().getElementsList());
    }
  }
}
private ApplicationIdPBImpl convertFromProtoFormat(ApplicationIdProto p) {
return new ApplicationIdPBImpl(p);
}

View File

@ -216,4 +216,17 @@ public class RegisterNodeManagerResponsePBImpl extends ProtoBase<RegisterNodeMan
private MasterKeyProto convertToProtoFormat(MasterKey t) {
return ((MasterKeyPBImpl)t).getProto();
}
/**
 * Reads the accepted-by-RM flag from the proto or, if this record is being
 * modified, from the builder.
 *
 * @return {@code true} if the RM accepted the node labels
 */
@Override
public boolean getAreNodeLabelsAcceptedByRM() {
  if (viaProto) {
    return proto.getAreNodeLabelsAcceptedByRM();
  }
  return builder.getAreNodeLabelsAcceptedByRM();
}
/**
 * Stores the accepted-by-RM flag in the builder.
 *
 * @param areNodeLabelsAcceptedByRM {@code true} if the RM accepted the
 *          node labels
 */
@Override
public void setAreNodeLabelsAcceptedByRM(boolean areNodeLabelsAcceptedByRM) {
  maybeInitBuilder();
  builder.setAreNodeLabelsAcceptedByRM(areNodeLabelsAcceptedByRM);
}
}

View File

@ -32,6 +32,7 @@ message RegisterNodeManagerRequestProto {
optional string nm_version = 5;
repeated NMContainerStatusProto container_statuses = 6;
repeated ApplicationIdProto runningApplications = 7;
optional StringArrayProto nodeLabels = 8;
}
message RegisterNodeManagerResponseProto {
@ -41,12 +42,14 @@ message RegisterNodeManagerResponseProto {
optional int64 rm_identifier = 4;
optional string diagnostics_message = 5;
optional string rm_version = 6;
optional bool areNodeLabelsAcceptedByRM = 7 [default = false];
}
// Wire form of a node heartbeat request.
message NodeHeartbeatRequestProto {
optional NodeStatusProto node_status = 1;
optional MasterKeyProto last_known_container_token_master_key = 2;
optional MasterKeyProto last_known_nm_token_master_key = 3;
// Node labels sent with this heartbeat. Left unset when the sender attaches
// no labels; an empty element list means "no labels on this node".
optional StringArrayProto nodeLabels = 4;
}
message NodeHeartbeatResponseProto {
@ -60,6 +63,7 @@ message NodeHeartbeatResponseProto {
optional string diagnostics_message = 8;
repeated ContainerIdProto containers_to_be_removed_from_nm = 9;
repeated SystemCredentialsForAppsProto system_credentials_for_apps = 10;
optional bool areNodeLabelsAcceptedByRM = 11 [default = false];
}
message SystemCredentialsForAppsProto {

View File

@ -19,11 +19,13 @@
package org.apache.hadoop.yarn;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@ -36,6 +38,7 @@ import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.SerializedExceptionPBImpl;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.NodeHeartbeatRequestPBImpl;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.NodeHeartbeatResponsePBImpl;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RegisterNodeManagerRequestPBImpl;
@ -46,6 +49,7 @@ import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl;
import org.apache.hadoop.yarn.server.api.records.impl.pb.NodeStatusPBImpl;
import org.junit.Assert;
import org.junit.Test;
/**
@ -77,7 +81,17 @@ public class TestYarnServerApiClasses {
assertEquals(1, copy.getNMTokenMasterKey().getKeyId());
assertEquals(NodeAction.NORMAL, copy.getNodeAction());
assertEquals("testDiagnosticMessage", copy.getDiagnosticsMessage());
assertFalse(copy.getAreNodeLabelsAcceptedByRM());
}
/**
 * Checks that an accepted-by-RM flag of {@code true} survives a round trip
 * through the RegisterNodeManagerResponse proto.
 */
@Test
public void testRegisterNodeManagerResponsePBImplWithRMAcceptLbls() {
  RegisterNodeManagerResponsePBImpl source =
      new RegisterNodeManagerResponsePBImpl();
  source.setAreNodeLabelsAcceptedByRM(true);

  RegisterNodeManagerResponsePBImpl roundTripped =
      new RegisterNodeManagerResponsePBImpl(source.getProto());

  assertTrue(roundTripped.getAreNodeLabelsAcceptedByRM());
}
/**
@ -89,11 +103,32 @@ public class TestYarnServerApiClasses {
original.setLastKnownContainerTokenMasterKey(getMasterKey());
original.setLastKnownNMTokenMasterKey(getMasterKey());
original.setNodeStatus(getNodeStatus());
original.setNodeLabels(getValidNodeLabels());
NodeHeartbeatRequestPBImpl copy = new NodeHeartbeatRequestPBImpl(
original.getProto());
assertEquals(1, copy.getLastKnownContainerTokenMasterKey().getKeyId());
assertEquals(1, copy.getLastKnownNMTokenMasterKey().getKeyId());
assertEquals("localhost", copy.getNodeStatus().getNodeId().getHost());
// check labels are coming with valid values
Assert.assertTrue(original.getNodeLabels()
.containsAll(copy.getNodeLabels()));
// check for empty labels
original.setNodeLabels(new HashSet<String> ());
copy = new NodeHeartbeatRequestPBImpl(
original.getProto());
Assert.assertNotNull(copy.getNodeLabels());
Assert.assertEquals(0, copy.getNodeLabels().size());
}
/**
 * Checks that a NodeHeartbeatRequestPBImpl on which no labels were set
 * still reports {@code null} labels after a round trip through its proto.
 */
@Test
public void testNodeHeartbeatRequestPBImplWithNullLabels() {
  NodeHeartbeatRequestPBImpl withoutLabels = new NodeHeartbeatRequestPBImpl();
  NodeHeartbeatRequestPBImpl roundTripped =
      new NodeHeartbeatRequestPBImpl(withoutLabels.getProto());
  Assert.assertNull(roundTripped.getNodeLabels());
}
/**
@ -119,6 +154,16 @@ public class TestYarnServerApiClasses {
assertEquals(1, copy.getContainerTokenMasterKey().getKeyId());
assertEquals(1, copy.getNMTokenMasterKey().getKeyId());
assertEquals("testDiagnosticMessage", copy.getDiagnosticsMessage());
assertEquals(false, copy.getAreNodeLabelsAcceptedByRM());
}
/**
 * Checks that an accepted-by-RM flag of {@code true} survives a round trip
 * through the NodeHeartbeatResponse proto.
 */
@Test
public void testNodeHeartbeatResponsePBImplWithRMAcceptLbls() {
  NodeHeartbeatResponsePBImpl source = new NodeHeartbeatResponsePBImpl();
  source.setAreNodeLabelsAcceptedByRM(true);

  NodeHeartbeatResponsePBImpl roundTripped =
      new NodeHeartbeatResponsePBImpl(source.getProto());

  assertTrue(roundTripped.getAreNodeLabelsAcceptedByRM());
}
/**
@ -208,6 +253,55 @@ public class TestYarnServerApiClasses {
}
/**
 * Checks that a registration request built without node labels reports
 * {@code null} labels after a round trip through its proto.
 */
@Test
public void testRegisterNodeManagerRequestWithNullLabels() {
  NodeId nodeId = NodeId.newInstance("host", 1234);
  Resource emptyResource = Resource.newInstance(0, 0);
  RegisterNodeManagerRequest original = RegisterNodeManagerRequest
      .newInstance(nodeId, 1234, emptyResource, "version", null, null);

  // serialize to proto, then rebuild the request from that proto
  RegisterNodeManagerRequestPBImpl originalImpl =
      (RegisterNodeManagerRequestPBImpl) original;
  RegisterNodeManagerRequest copy =
      new RegisterNodeManagerRequestPBImpl(originalImpl.getProto());

  // no labels were supplied, so none may show up in the copy
  Assert.assertNull(copy.getNodeLabels());
}
/**
 * Checks that node labels on a registration request survive a round trip
 * through its proto, both for a populated label set and for an empty one.
 */
@Test
public void testRegisterNodeManagerRequestWithValidLabels() {
  HashSet<String> nodeLabels = getValidNodeLabels();
  RegisterNodeManagerRequest request =
      RegisterNodeManagerRequest.newInstance(
          NodeId.newInstance("host", 1234), 1234, Resource.newInstance(0, 0),
          "version", null, null, nodeLabels);

  // serialize to proto, and get request from proto
  RegisterNodeManagerRequest copy =
      new RegisterNodeManagerRequestPBImpl(
          ((RegisterNodeManagerRequestPBImpl) request).getProto());

  // Compare for equality: a subset check alone would still pass if some or
  // all labels were dropped during serialization.
  Assert.assertEquals(nodeLabels, copy.getNodeLabels());

  // check for empty labels
  request.setNodeLabels(new HashSet<String>());
  copy = new RegisterNodeManagerRequestPBImpl(
      ((RegisterNodeManagerRequestPBImpl) request).getProto());
  Assert.assertNotNull(copy.getNodeLabels());
  Assert.assertEquals(0, copy.getNodeLabels().size());
}
/**
 * Builds the fixed label set used by the node label tests.
 *
 * @return a new mutable set holding "java", "windows", "gpu" and "x86"
 */
private HashSet<String> getValidNodeLabels() {
  return new HashSet<String>(Arrays.asList("java", "windows", "gpu", "x86"));
}
private ContainerStatus getContainerStatus(int applicationId,
int containerID, int appAttemptId) {
ContainerStatus status = recordFactory

View File

@ -58,6 +58,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManag
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
import org.apache.hadoop.yarn.server.nodemanager.nodelabels.NodeLabelsProvider;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMLeveldbStateStoreService;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
@ -80,6 +81,7 @@ public class NodeManager extends CompositeService
protected final NodeManagerMetrics metrics = NodeManagerMetrics.create();
private ApplicationACLsManager aclsManager;
private NodeHealthCheckerService nodeHealthChecker;
private NodeLabelsProvider nodeLabelsProvider;
private LocalDirsHandlerService dirsHandler;
private Context context;
private AsyncDispatcher dispatcher;
@ -98,7 +100,22 @@ public class NodeManager extends CompositeService
protected NodeStatusUpdater createNodeStatusUpdater(Context context,
Dispatcher dispatcher, NodeHealthCheckerService healthChecker) {
return new NodeStatusUpdaterImpl(context, dispatcher, healthChecker,
metrics);
metrics, nodeLabelsProvider);
}
/**
 * Creates the NodeStatusUpdater, handing it the given node labels provider.
 *
 * @param context NodeManager context
 * @param dispatcher event dispatcher
 * @param healthChecker node health checker service
 * @param nodeLabelsProvider provider passed on to the status updater
 * @return a new NodeStatusUpdaterImpl built from the arguments and this
 *         NodeManager's metrics
 */
protected NodeStatusUpdater createNodeStatusUpdater(Context context,
    Dispatcher dispatcher, NodeHealthCheckerService healthChecker,
    NodeLabelsProvider nodeLabelsProvider) {
  NodeStatusUpdater statusUpdater = new NodeStatusUpdaterImpl(context,
      dispatcher, healthChecker, metrics, nodeLabelsProvider);
  return statusUpdater;
}
/**
 * Creates the node labels provider for this NodeManager.
 *
 * <p>Currently a stub that always returns {@code null}; the real lookup is
 * deferred to YARN-2729. Subclasses may override it to supply a provider.
 *
 * @param conf the NodeManager configuration (unused by this stub)
 * @return the provider, or {@code null} when none is available
 * @throws IOException declared for implementations; never thrown by this stub
 */
@VisibleForTesting
protected NodeLabelsProvider createNodeLabelsProvider(
Configuration conf) throws IOException {
// TODO as part of YARN-2729
// Need to get the implementation of provider service and return
return null;
}
protected NodeResourceMonitor createNodeResourceMonitor() {
@ -245,9 +262,18 @@ public class NodeManager extends CompositeService
this.context = createNMContext(containerTokenSecretManager,
nmTokenSecretManager, nmStore);
nodeStatusUpdater =
createNodeStatusUpdater(context, dispatcher, nodeHealthChecker);
nodeLabelsProvider = createNodeLabelsProvider(conf);
if (null == nodeLabelsProvider) {
nodeStatusUpdater =
createNodeStatusUpdater(context, dispatcher, nodeHealthChecker);
} else {
addService(nodeLabelsProvider);
nodeStatusUpdater =
createNodeStatusUpdater(context, dispatcher, nodeHealthChecker,
nodeLabelsProvider);
}
NodeResourceMonitor nodeResourceMonitor = createNodeResourceMonitor();
addService(nodeResourceMonitor);

View File

@ -42,6 +42,7 @@ import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.VersionUtil;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
@ -53,6 +54,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
import org.apache.hadoop.yarn.server.api.ResourceManagerConstants;
import org.apache.hadoop.yarn.server.api.ResourceTracker;
import org.apache.hadoop.yarn.server.api.ServerRMProxy;
@ -70,6 +72,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManag
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationState;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
import org.apache.hadoop.yarn.server.nodemanager.nodelabels.NodeLabelsProvider;
import org.apache.hadoop.yarn.util.YarnVersionInfo;
import com.google.common.annotations.VisibleForTesting;
@ -120,15 +123,25 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
private long rmIdentifier = ResourceManagerConstants.RM_INVALID_IDENTIFIER;
Set<ContainerId> pendingContainersToRemove = new HashSet<ContainerId>();
private final NodeLabelsProvider nodeLabelsProvider;
private final boolean hasNodeLabelsProvider;
/**
 * Creates a status updater without a node labels provider by delegating to
 * the five-argument constructor with a {@code null} provider.
 *
 * @param context NodeManager context
 * @param dispatcher event dispatcher
 * @param healthChecker node health checker service
 * @param metrics NodeManager metrics
 */
public NodeStatusUpdaterImpl(Context context, Dispatcher dispatcher,
NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) {
this(context, dispatcher, healthChecker, metrics, null);
}
public NodeStatusUpdaterImpl(Context context, Dispatcher dispatcher,
NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics,
NodeLabelsProvider nodeLabelsProvider) {
super(NodeStatusUpdaterImpl.class.getName());
this.healthChecker = healthChecker;
this.nodeLabelsProvider = nodeLabelsProvider;
this.hasNodeLabelsProvider = (nodeLabelsProvider != null);
this.context = context;
this.dispatcher = dispatcher;
this.metrics = metrics;
this.recentlyStoppedContainers =
new LinkedHashMap<ContainerId, Long>();
this.recentlyStoppedContainers = new LinkedHashMap<ContainerId, Long>();
this.pendingCompletedContainers =
new HashMap<ContainerId, ContainerStatus>();
}
@ -253,22 +266,30 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
protected void registerWithRM()
throws YarnException, IOException {
List<NMContainerStatus> containerReports = getNMContainerStatuses();
Set<String> nodeLabels = null;
if (hasNodeLabelsProvider) {
nodeLabels = nodeLabelsProvider.getNodeLabels();
nodeLabels =
(null == nodeLabels) ? CommonNodeLabelsManager.EMPTY_STRING_SET
: nodeLabels;
}
RegisterNodeManagerRequest request =
RegisterNodeManagerRequest.newInstance(nodeId, httpPort, totalResource,
nodeManagerVersionId, containerReports, getRunningApplications());
nodeManagerVersionId, containerReports, getRunningApplications(),
nodeLabels);
if (containerReports != null) {
LOG.info("Registering with RM using containers :" + containerReports);
}
RegisterNodeManagerResponse regNMResponse =
resourceTracker.registerNodeManager(request);
this.rmIdentifier = regNMResponse.getRMIdentifier();
// if the Resourcemanager instructs NM to shutdown.
// if the Resource Manager instructs NM to shutdown.
if (NodeAction.SHUTDOWN.equals(regNMResponse.getNodeAction())) {
String message =
"Message from ResourceManager: "
+ regNMResponse.getDiagnosticsMessage();
throw new YarnRuntimeException(
"Recieved SHUTDOWN signal from Resourcemanager ,Registration of NodeManager failed, "
"Recieved SHUTDOWN signal from Resourcemanager, Registration of NodeManager failed, "
+ message);
}
@ -306,8 +327,21 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
this.context.getNMTokenSecretManager().setMasterKey(masterKey);
}
LOG.info("Registered with ResourceManager as " + this.nodeId
+ " with total resource of " + this.totalResource);
StringBuilder successfullRegistrationMsg = new StringBuilder();
successfullRegistrationMsg.append("Registered with ResourceManager as ")
.append(this.nodeId).append(" with total resource of ")
.append(this.totalResource);
if (regNMResponse.getAreNodeLabelsAcceptedByRM()) {
successfullRegistrationMsg
.append(" and with following Node label(s) : {")
.append(StringUtils.join(",", nodeLabels)).append("}");
} else if (hasNodeLabelsProvider) {
//case where provider is set but RM did not accept the Node Labels
LOG.error(regNMResponse.getDiagnosticsMessage());
}
LOG.info(successfullRegistrationMsg);
LOG.info("Notifying ContainerManager to unblock new container-requests");
((ContainerManagerImpl) this.context.getContainerManager())
.setBlockNewContainerRequests(false);
@ -580,19 +614,41 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
@Override
@SuppressWarnings("unchecked")
public void run() {
int lastHeartBeatID = 0;
int lastHeartbeatID = 0;
Set<String> lastUpdatedNodeLabelsToRM = null;
if (hasNodeLabelsProvider) {
lastUpdatedNodeLabelsToRM = nodeLabelsProvider.getNodeLabels();
lastUpdatedNodeLabelsToRM =
(null == lastUpdatedNodeLabelsToRM) ? CommonNodeLabelsManager.EMPTY_STRING_SET
: lastUpdatedNodeLabelsToRM;
}
while (!isStopped) {
// Send heartbeat
try {
NodeHeartbeatResponse response = null;
NodeStatus nodeStatus = getNodeStatus(lastHeartBeatID);
Set<String> nodeLabelsForHeartbeat = null;
NodeStatus nodeStatus = getNodeStatus(lastHeartbeatID);
if (hasNodeLabelsProvider) {
nodeLabelsForHeartbeat = nodeLabelsProvider.getNodeLabels();
//if the provider returns null then consider empty labels are set
nodeLabelsForHeartbeat =
(nodeLabelsForHeartbeat == null) ? CommonNodeLabelsManager.EMPTY_STRING_SET
: nodeLabelsForHeartbeat;
if (!areNodeLabelsUpdated(nodeLabelsForHeartbeat,
lastUpdatedNodeLabelsToRM)) {
//if nodelabels have not changed then no need to send
nodeLabelsForHeartbeat = null;
}
}
NodeHeartbeatRequest request =
NodeHeartbeatRequest.newInstance(nodeStatus,
NodeStatusUpdaterImpl.this.context
.getContainerTokenSecretManager().getCurrentKey(),
NodeStatusUpdaterImpl.this.context.getNMTokenSecretManager()
.getCurrentKey());
NodeStatusUpdaterImpl.this.context
.getContainerTokenSecretManager().getCurrentKey(),
NodeStatusUpdaterImpl.this.context
.getNMTokenSecretManager().getCurrentKey(),
nodeLabelsForHeartbeat);
response = resourceTracker.nodeHeartbeat(request);
//get next heartbeat interval from response
nextHeartBeatInterval = response.getNextHeartBeatInterval();
@ -623,6 +679,17 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
break;
}
if (response.getAreNodeLabelsAcceptedByRM()) {
lastUpdatedNodeLabelsToRM = nodeLabelsForHeartbeat;
LOG.info("Node Labels {"
+ StringUtils.join(",", nodeLabelsForHeartbeat)
+ "} were Accepted by RM ");
} else if (nodeLabelsForHeartbeat != null) {
// case where NodeLabelsProvider is set and updated labels were
// sent to RM and RM rejected the labels
LOG.error(response.getDiagnosticsMessage());
}
// Explicitly put this method after checking the resync response. We
// don't want to remove the completed containers before resync
// because these completed containers will be reported back to RM
@ -631,7 +698,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
removeOrTrackCompletedContainersFromContext(response
.getContainersToBeRemovedFromNM());
lastHeartBeatID = response.getResponseId();
lastHeartbeatID = response.getResponseId();
List<ContainerId> containersToCleanup = response
.getContainersToCleanup();
if (!containersToCleanup.isEmpty()) {
@ -680,6 +747,23 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
}
}
/**
 * Reports whether the node labels about to be sent differ from the labels
 * last recorded as accepted by the RM.
 *
 * <p>Callers are expected to pass non-null sets. A {@code null} argument is
 * nevertheless tolerated rather than raising a NullPointerException: two
 * {@code null}s count as unchanged, exactly one {@code null} counts as a
 * change.
 *
 * @param nodeLabelsNew labels currently reported by the provider
 * @param nodeLabelsOld labels last recorded as accepted by the RM
 * @return {@code true} if the two label sets differ
 */
private boolean areNodeLabelsUpdated(Set<String> nodeLabelsNew,
    Set<String> nodeLabelsOld) {
  if (nodeLabelsNew == null || nodeLabelsOld == null) {
    // The heartbeat loop can record null as the last-accepted set when the
    // RM reports acceptance for a heartbeat that carried no labels; treat
    // that as "changed" so the labels are sent again instead of failing.
    return nodeLabelsNew != nodeLabelsOld;
  }
  // Equal size plus containment means both sets hold the same elements.
  return nodeLabelsNew.size() != nodeLabelsOld.size()
      || !nodeLabelsOld.containsAll(nodeLabelsNew);
}
private void updateMasterKeys(NodeHeartbeatResponse response) {
// See if the master-key has rolled over
MasterKey updatedMasterKey = response.getContainerTokenMasterKey();

View File

@ -0,0 +1,43 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.nodelabels;
import java.util.Set;
import org.apache.hadoop.service.AbstractService;
/**
 * Abstract service responsible for fetching the node labels of a node.
 * Concrete providers extend this class and implement
 * {@link #getNodeLabels()}.
 */
public abstract class NodeLabelsProvider extends AbstractService {
/**
 * Creates the provider service.
 *
 * @param name service name passed on to {@link AbstractService}
 */
public NodeLabelsProvider(String name) {
super(name);
}
/**
 * Provides the labels. The provider is expected to return the same labels
 * on every call until the labels actually change.
 * If null is returned, the caller assumes an empty label set.
 *
 * @return Set of node label strings applicable for a node
 */
public abstract Set<String> getNodeLabels();
}

View File

@ -1182,7 +1182,7 @@ public class TestNodeStatusUpdater {
}
};
verifyNodeStartFailure(
"Recieved SHUTDOWN signal from Resourcemanager ,"
"Recieved SHUTDOWN signal from Resourcemanager, "
+ "Registration of NodeManager failed, "
+ "Message from ResourceManager: RM Shutting Down Node");
}

View File

@ -0,0 +1,281 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.ServiceOperations;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.nodelabels.NodeLabelTestBase;
import org.apache.hadoop.yarn.server.api.ResourceTracker;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
import org.apache.hadoop.yarn.server.api.records.MasterKey;
import org.apache.hadoop.yarn.server.api.records.NodeAction;
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl;
import org.apache.hadoop.yarn.server.nodemanager.nodelabels.NodeLabelsProvider;
import org.apache.hadoop.yarn.server.utils.YarnServerBuilderUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
public class TestNodeStatusUpdaterForLabels extends NodeLabelTestBase {
private static final RecordFactory recordFactory = RecordFactoryProvider
.getRecordFactory(null);
private NodeManager nm;
protected DummyNodeLabelsProvider dummyLabelsProviderRef;
/** Creates a fresh dummy labels provider before every test. */
@Before
public void setup() {
  this.dummyLabelsProviderRef = new DummyNodeLabelsProvider();
}
/** Stops the NodeManager after every test, if one was created. */
@After
public void tearDown() {
  if (nm == null) {
    return;
  }
  ServiceOperations.stop(nm);
}
/**
 * ResourceTracker stub used in place of the RM client. It remembers the node
 * labels carried by the latest register/heartbeat request and lets the test
 * thread wait until such a request has been received.
 */
private class ResourceTrackerForLabels implements ResourceTracker {
  int heartbeatID = 0;
  // Labels taken from the most recent register or heartbeat request.
  Set<String> labels;

  // Set by the request handlers and polled by the wait methods outside the
  // monitor, hence volatile.
  private volatile boolean receivedNMHeartbeat = false;
  private volatile boolean receivedNMRegister = false;

  /** Builds a fixed dummy master key (id 123, single byte payload 123). */
  private MasterKey createMasterKey() {
    MasterKey masterKey = new MasterKeyPBImpl();
    masterKey.setKeyId(123);
    masterKey.setBytes(ByteBuffer.wrap(new byte[] { (byte) 123 }));
    return masterKey;
  }

  /**
   * Records the labels of the registration request, answers with a NORMAL
   * response and wakes up any thread blocked in {@link #waitTillRegister()}.
   * Labels are reported as accepted whenever the request carried a non-null
   * label set.
   */
  @Override
  public RegisterNodeManagerResponse registerNodeManager(
      RegisterNodeManagerRequest request) throws YarnException, IOException {
    labels = request.getNodeLabels();
    RegisterNodeManagerResponse response =
        recordFactory.newRecordInstance(RegisterNodeManagerResponse.class);
    response.setNodeAction(NodeAction.NORMAL);
    response.setContainerTokenMasterKey(createMasterKey());
    response.setNMTokenMasterKey(createMasterKey());
    response.setAreNodeLabelsAcceptedByRM(labels != null);
    synchronized (ResourceTrackerForLabels.class) {
      receivedNMRegister = true;
      ResourceTrackerForLabels.class.notifyAll();
    }
    return response;
  }

  /**
   * Blocks until a heartbeat has been received since the flag was last
   * reset, retrying up to 500 rounds and failing the test if none arrives.
   */
  public void waitTillHeartbeat() {
    if (receivedNMHeartbeat) {
      return;
    }
    int i = 500;
    while (!receivedNMHeartbeat && i > 0) {
      synchronized (ResourceTrackerForLabels.class) {
        if (!receivedNMHeartbeat) {
          try {
            System.out
                .println("In ResourceTrackerForLabels waiting for heartbeat : "
                    + System.currentTimeMillis());
            ResourceTrackerForLabels.class.wait(500L);
            // Only re-trigger when the first wait timed out: an unconditional
            // resend would produce an extra heartbeat that overwrites the
            // labels recorded from the one the test is waiting for.
            if (!receivedNMHeartbeat) {
              // to avoid race condition, i.e. sendOutofBandHeartBeat can be
              // sent before NSU thread has gone to sleep, hence we wait and
              // try to resend heartbeat again
              nm.getNodeStatusUpdater().sendOutofBandHeartBeat();
              ResourceTrackerForLabels.class.wait(500L);
            }
            i--;
          } catch (InterruptedException e) {
            // Print first: Assert.fail throws, so nothing after it runs.
            e.printStackTrace();
            Assert.fail("Exception caught while waiting for Heartbeat");
          }
        }
      }
    }
    if (!receivedNMHeartbeat) {
      Assert.fail("Heartbeat dint receive even after waiting");
    }
  }

  /**
   * Blocks until a registration request has been received. The flag is
   * checked while holding the same monitor under which it is set, so a
   * notification issued between the check and the wait cannot be missed.
   */
  public void waitTillRegister() {
    synchronized (ResourceTrackerForLabels.class) {
      while (!receivedNMRegister) {
        try {
          ResourceTrackerForLabels.class.wait();
        } catch (InterruptedException e) {
          // Print first: Assert.fail throws, so nothing after it runs.
          e.printStackTrace();
          Assert.fail("Exception caught while waiting for register");
        }
      }
    }
  }

  /**
   * Clears the heartbeat-received flag so that the next call to
   * {@link #waitTillHeartbeat()} waits for a new heartbeat.
   */
  public void resetNMHeartbeatReceiveFlag() {
    synchronized (ResourceTrackerForLabels.class) {
      receivedNMHeartbeat = false;
    }
  }

  /**
   * Records the labels of the heartbeat request, answers with a NORMAL
   * response and wakes up any thread blocked in
   * {@link #waitTillHeartbeat()}.
   */
  @Override
  public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
      throws YarnException, IOException {
    System.out.println("RTS receive heartbeat : "
        + System.currentTimeMillis());
    labels = request.getNodeLabels();
    NodeStatus nodeStatus = request.getNodeStatus();
    nodeStatus.setResponseId(heartbeatID++);

    NodeHeartbeatResponse nhResponse =
        YarnServerBuilderUtils.newNodeHeartbeatResponse(heartbeatID,
            NodeAction.NORMAL, null, null, null, null, 1000L);

    // to ensure that heartbeats are sent only when required.
    nhResponse.setNextHeartBeatInterval(Long.MAX_VALUE);
    nhResponse.setAreNodeLabelsAcceptedByRM(labels != null);

    synchronized (ResourceTrackerForLabels.class) {
      receivedNMHeartbeat = true;
      ResourceTrackerForLabels.class.notifyAll();
    }
    return nhResponse;
  }
}
/**
 * NodeLabelsProvider test double whose label set is assigned directly by the
 * test. It starts out with an empty label set.
 */
public static class DummyNodeLabelsProvider extends NodeLabelsProvider {

  // Typed empty set instead of the raw EMPTY_SET constant, so no unchecked
  // warning has to be suppressed.
  private Set<String> nodeLabels = Collections.<String>emptySet();

  public DummyNodeLabelsProvider() {
    super(DummyNodeLabelsProvider.class.getName());
  }

  /** @return the label set most recently assigned; may be null */
  @Override
  public synchronized Set<String> getNodeLabels() {
    return nodeLabels;
  }

  /** Replaces the label set returned by {@link #getNodeLabels()}. */
  synchronized void setNodeLabels(Set<String> nodeLabels) {
    this.nodeLabels = nodeLabels;
  }
}
/**
 * Builds a configuration whose node label configuration type is set to
 * "distributed".
 *
 * @return a new YarnConfiguration with the distributed type selected
 */
private YarnConfiguration createNMConfigForDistributeNodeLabels() {
  YarnConfiguration distributedConf = new YarnConfiguration();
  distributedConf.set(YarnConfiguration.NODELABEL_CONFIGURATION_TYPE,
      YarnConfiguration.DISTRIBUTED_NODELABEL_CONFIGURATION_TYPE);
  return distributedConf;
}
/**
* Starts a NodeManager whose labels provider and RM client are replaced by
* test doubles and checks which labels reach the ResourceTracker on
* registration and on the heartbeats that follow.
*/
@Test
public void testNodeStatusUpdaterForNodeLabels() throws InterruptedException,
IOException {
final ResourceTrackerForLabels resourceTracker =
new ResourceTrackerForLabels();
// NodeManager wired to the dummy labels provider and the stub tracker
nm = new NodeManager() {
@Override
protected NodeLabelsProvider createNodeLabelsProvider(
Configuration conf) throws IOException {
return dummyLabelsProviderRef;
}
@Override
protected NodeStatusUpdater createNodeStatusUpdater(Context context,
Dispatcher dispatcher, NodeHealthCheckerService healthChecker,
NodeLabelsProvider labelsProvider) {
return new NodeStatusUpdaterImpl(context, dispatcher, healthChecker,
metrics, labelsProvider) {
@Override
protected ResourceTracker getRMClient() {
return resourceTracker;
}
@Override
protected void stopRMProxy() {
return;
}
};
}
};
YarnConfiguration conf = createNMConfigForDistributeNodeLabels();
nm.init(conf);
resourceTracker.resetNMHeartbeatReceiveFlag();
nm.start();
// registration must carry the provider's current labels
resourceTracker.waitTillRegister();
assertCollectionEquals(resourceTracker.labels,
dummyLabelsProviderRef.getNodeLabels());
resourceTracker.waitTillHeartbeat();// wait till the first heartbeat
resourceTracker.resetNMHeartbeatReceiveFlag();
// heartbeat with updated labels
dummyLabelsProviderRef.setNodeLabels(toSet("P"));
nm.getNodeStatusUpdater().sendOutofBandHeartBeat();
resourceTracker.waitTillHeartbeat();
assertCollectionEquals(resourceTracker.labels,
dummyLabelsProviderRef.getNodeLabels());
resourceTracker.resetNMHeartbeatReceiveFlag();
// heartbeat without updating labels
nm.getNodeStatusUpdater().sendOutofBandHeartBeat();
resourceTracker.waitTillHeartbeat();
resourceTracker.resetNMHeartbeatReceiveFlag();
assertNull(
"If no change in labels then null should be sent as part of request",
resourceTracker.labels);
// provider return with null labels
dummyLabelsProviderRef.setNodeLabels(null);
nm.getNodeStatusUpdater().sendOutofBandHeartBeat();
resourceTracker.waitTillHeartbeat();
assertTrue("If provider sends null then empty labels should be sent",
resourceTracker.labels.isEmpty());
resourceTracker.resetNMHeartbeatReceiveFlag();
nm.stop();
}
}

View File

@ -21,6 +21,9 @@ import java.io.IOException;
import java.io.InputStream;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import org.apache.commons.logging.Log;
@ -31,6 +34,7 @@ import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.net.Node;
import org.apache.hadoop.security.authorize.PolicyProvider;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.VersionUtil;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@ -100,6 +104,8 @@ public class ResourceTrackerService extends AbstractService implements
private int minAllocMb;
private int minAllocVcores;
private boolean isDistributesNodeLabelsConf;
static {
resync.setNodeAction(NodeAction.RESYNC);
@ -149,6 +155,14 @@ public class ResourceTrackerService extends AbstractService implements
YarnConfiguration.RM_NODEMANAGER_MINIMUM_VERSION,
YarnConfiguration.DEFAULT_RM_NODEMANAGER_MINIMUM_VERSION);
String nodeLabelConfigurationType =
conf.get(YarnConfiguration.NODELABEL_CONFIGURATION_TYPE,
YarnConfiguration.DEFAULT_NODELABEL_CONFIGURATION_TYPE);
isDistributesNodeLabelsConf =
YarnConfiguration.DISTRIBUTED_NODELABEL_CONFIGURATION_TYPE
.equals(nodeLabelConfigurationType);
super.serviceInit(conf);
}
@ -336,11 +350,31 @@ public class ResourceTrackerService extends AbstractService implements
}
}
String message =
"NodeManager from node " + host + "(cmPort: " + cmPort + " httpPort: "
+ httpPort + ") " + "registered with capability: " + capability
+ ", assigned nodeId " + nodeId;
LOG.info(message);
// Update node's labels to RM's NodeLabelManager.
Set<String> nodeLabels = request.getNodeLabels();
if (isDistributesNodeLabelsConf && nodeLabels != null) {
try {
updateNodeLabelsFromNMReport(nodeLabels, nodeId);
response.setAreNodeLabelsAcceptedByRM(true);
} catch (IOException ex) {
// Ensure the exception is captured in the response
response.setDiagnosticsMessage(ex.getMessage());
response.setAreNodeLabelsAcceptedByRM(false);
}
}
StringBuilder message = new StringBuilder();
message.append("NodeManager from node ").append(host).append("(cmPort: ")
.append(cmPort).append(" httpPort: ");
message.append(httpPort).append(") ")
.append("registered with capability: ").append(capability);
message.append(", assigned nodeId ").append(nodeId);
if (response.getAreNodeLabelsAcceptedByRM()) {
message.append(", node labels { ").append(
StringUtils.join(",", nodeLabels) + " } ");
}
LOG.info(message.toString());
response.setNodeAction(NodeAction.NORMAL);
response.setRMIdentifier(ResourceManager.getClusterTimeStamp());
response.setRMVersion(YarnVersionInfo.getVersion());
@ -359,6 +393,7 @@ public class ResourceTrackerService extends AbstractService implements
* 2. Check if it's a registered node
* 3. Check if it's a 'fresh' heartbeat i.e. not duplicate heartbeat
* 4. Send healthStatus to RMNode
* 5. Update node's labels if distributed Node Labels configuration is enabled
*/
NodeId nodeId = remoteNodeStatus.getNodeId();
@ -428,9 +463,44 @@ public class ResourceTrackerService extends AbstractService implements
remoteNodeStatus.getContainersStatuses(),
remoteNodeStatus.getKeepAliveApplications(), nodeHeartBeatResponse));
// 5. Update node's labels to RM's NodeLabelManager.
if (isDistributesNodeLabelsConf && request.getNodeLabels() != null) {
try {
updateNodeLabelsFromNMReport(request.getNodeLabels(), nodeId);
nodeHeartBeatResponse.setAreNodeLabelsAcceptedByRM(true);
} catch (IOException ex) {
//ensure the error message is captured and sent across in response
nodeHeartBeatResponse.setDiagnosticsMessage(ex.getMessage());
nodeHeartBeatResponse.setAreNodeLabelsAcceptedByRM(false);
}
}
return nodeHeartBeatResponse;
}
/**
 * Replaces the labels of the given node in the RM's node label manager with
 * the labels reported by the NodeManager.
 *
 * @param nodeLabels labels reported by the NodeManager
 * @param nodeId id of the reporting node
 * @throws IOException if the label manager rejects the labels; the message
 *           names the labels, the node and the original exception message
 */
private void updateNodeLabelsFromNMReport(Set<String> nodeLabels,
    NodeId nodeId) throws IOException {
  Map<NodeId, Set<String>> reportedLabels =
      new HashMap<NodeId, Set<String>>();
  reportedLabels.put(nodeId, nodeLabels);
  try {
    this.rmContext.getNodeLabelManager().replaceLabelsOnNode(reportedLabels);
  } catch (IOException ex) {
    String rejection =
        "Node Labels {" + StringUtils.join(",", nodeLabels)
            + "} reported from NM with ID " + nodeId
            + " was rejected from RM with exception message as : "
            + ex.getMessage();
    LOG.error(rejection, ex);
    throw new IOException(rejection, ex);
  }
  if (LOG.isDebugEnabled()) {
    LOG.debug("Node Labels {" + StringUtils.join(",", nodeLabels)
        + "} from Node " + nodeId + " were Accepted from RM");
  }
}
private void populateKeys(NodeHeartbeatRequest request,
NodeHeartbeatResponse nodeHeartBeatResponse) {

View File

@ -27,8 +27,10 @@ import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IOUtils;
@ -49,11 +51,16 @@ import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.nodelabels.NodeLabelTestBase;
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
import org.apache.hadoop.yarn.server.api.records.NodeAction;
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
@ -66,7 +73,7 @@ import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
public class TestResourceTrackerService {
public class TestResourceTrackerService extends NodeLabelTestBase {
private final static File TEMP_DIR = new File(System.getProperty(
"test.build.data", "/tmp"), "decommision");
@ -305,8 +312,425 @@ public class TestResourceTrackerService {
req.setHttpPort(1234);
req.setNMVersion(YarnVersionInfo.getVersion());
// trying to register a invalid node.
RegisterNodeManagerResponse response = resourceTrackerService.registerNodeManager(req);
Assert.assertEquals(NodeAction.NORMAL,response.getNodeAction());
RegisterNodeManagerResponse response =
resourceTrackerService.registerNodeManager(req);
Assert.assertEquals(NodeAction.NORMAL, response.getNodeAction());
}
/**
 * Registers a node reporting label "A" with an RM that uses the distributed
 * node label configuration and checks that the label is stored and reported
 * as accepted.
 */
@Test
public void testNodeRegistrationWithLabels() throws Exception {
  writeToHostsFile("host2");
  Configuration conf = new Configuration();
  conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH,
      hostFile.getAbsolutePath());
  conf.set(YarnConfiguration.NODELABEL_CONFIGURATION_TYPE,
      YarnConfiguration.DISTRIBUTED_NODELABEL_CONFIGURATION_TYPE);

  final RMNodeLabelsManager nodeLabelsMgr = new NullRMNodeLabelsManager();

  rm = new MockRM(conf) {
    @Override
    protected RMNodeLabelsManager createNodeLabelManager() {
      return nodeLabelsMgr;
    }
  };
  rm.start();

  // Let a failure propagate: the method declares "throws Exception", so the
  // test fails with the original exception and its stack trace.
  nodeLabelsMgr.addToCluserNodeLabels(toSet("A", "B", "C"));

  ResourceTrackerService resourceTrackerService =
      rm.getResourceTrackerService();
  RegisterNodeManagerRequest registerReq =
      Records.newRecord(RegisterNodeManagerRequest.class);
  NodeId nodeId = NodeId.newInstance("host2", 1234);
  Resource capability = BuilderUtils.newResource(1024, 1);
  registerReq.setResource(capability);
  registerReq.setNodeId(nodeId);
  registerReq.setHttpPort(1234);
  registerReq.setNMVersion(YarnVersionInfo.getVersion());
  registerReq.setNodeLabels(toSet("A"));
  RegisterNodeManagerResponse response =
      resourceTrackerService.registerNodeManager(registerReq);

  Assert.assertEquals("Action should be normal on valid Node Labels",
      NodeAction.NORMAL, response.getNodeAction());
  assertCollectionEquals(nodeLabelsMgr.getNodeLabels().get(nodeId),
      registerReq.getNodeLabels());
  Assert.assertTrue("Valid Node Labels were not accepted by RM",
      response.getAreNodeLabelsAcceptedByRM());
  rm.stop();
}
/**
 * Registers a node reporting labels "A", "B", "C" while the cluster only
 * knows "X", "Y", "Z" and checks that registration still succeeds but the
 * labels are not stored, are reported as not accepted, and a diagnostics
 * message is returned.
 */
@Test
public void testNodeRegistrationWithInvalidLabels() throws Exception {
  writeToHostsFile("host2");
  Configuration conf = new Configuration();
  conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH,
      hostFile.getAbsolutePath());
  conf.set(YarnConfiguration.NODELABEL_CONFIGURATION_TYPE,
      YarnConfiguration.DISTRIBUTED_NODELABEL_CONFIGURATION_TYPE);

  final RMNodeLabelsManager nodeLabelsMgr = new NullRMNodeLabelsManager();

  rm = new MockRM(conf) {
    @Override
    protected RMNodeLabelsManager createNodeLabelManager() {
      return nodeLabelsMgr;
    }
  };
  rm.start();

  // Let a failure propagate: the method declares "throws Exception", so the
  // test fails with the original exception and its stack trace.
  nodeLabelsMgr.addToCluserNodeLabels(toSet("X", "Y", "Z"));

  ResourceTrackerService resourceTrackerService =
      rm.getResourceTrackerService();
  RegisterNodeManagerRequest registerReq =
      Records.newRecord(RegisterNodeManagerRequest.class);
  NodeId nodeId = NodeId.newInstance("host2", 1234);
  Resource capability = BuilderUtils.newResource(1024, 1);
  registerReq.setResource(capability);
  registerReq.setNodeId(nodeId);
  registerReq.setHttpPort(1234);
  registerReq.setNMVersion(YarnVersionInfo.getVersion());
  registerReq.setNodeLabels(toSet("A", "B", "C"));
  RegisterNodeManagerResponse response =
      resourceTrackerService.registerNodeManager(registerReq);

  Assert.assertEquals(
      "On Invalid Node Labels action is expected to be normal",
      NodeAction.NORMAL, response.getNodeAction());
  Assert.assertNull(nodeLabelsMgr.getNodeLabels().get(nodeId));
  Assert.assertNotNull(response.getDiagnosticsMessage());
  Assert.assertFalse("Node Labels should not accepted by RM If Invalid",
      response.getAreNodeLabelsAcceptedByRM());

  // rm was dereferenced above, so it cannot be null here.
  rm.stop();
}
/**
 * Registers a node reporting the label "#Y" and checks that registration
 * still succeeds but the label is not stored, is reported as not accepted,
 * and a diagnostics message is returned.
 */
@Test
public void testNodeRegistrationWithInvalidLabelsSyntax() throws Exception {
  writeToHostsFile("host2");
  Configuration conf = new Configuration();
  conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH,
      hostFile.getAbsolutePath());
  conf.set(YarnConfiguration.NODELABEL_CONFIGURATION_TYPE,
      YarnConfiguration.DISTRIBUTED_NODELABEL_CONFIGURATION_TYPE);

  final RMNodeLabelsManager nodeLabelsMgr = new NullRMNodeLabelsManager();

  rm = new MockRM(conf) {
    @Override
    protected RMNodeLabelsManager createNodeLabelManager() {
      return nodeLabelsMgr;
    }
  };
  rm.start();

  // Let a failure propagate: the method declares "throws Exception", so the
  // test fails with the original exception and its stack trace.
  nodeLabelsMgr.addToCluserNodeLabels(toSet("X", "Y", "Z"));

  ResourceTrackerService resourceTrackerService =
      rm.getResourceTrackerService();
  RegisterNodeManagerRequest req =
      Records.newRecord(RegisterNodeManagerRequest.class);
  NodeId nodeId = NodeId.newInstance("host2", 1234);
  Resource capability = BuilderUtils.newResource(1024, 1);
  req.setResource(capability);
  req.setNodeId(nodeId);
  req.setHttpPort(1234);
  req.setNMVersion(YarnVersionInfo.getVersion());
  req.setNodeLabels(toSet("#Y"));
  RegisterNodeManagerResponse response =
      resourceTrackerService.registerNodeManager(req);

  Assert.assertEquals(
      "On Invalid Node Labels action is expected to be normal",
      NodeAction.NORMAL, response.getNodeAction());
  Assert.assertNull(nodeLabelsMgr.getNodeLabels().get(nodeId));
  Assert.assertNotNull(response.getDiagnosticsMessage());
  Assert.assertFalse("Node Labels should not accepted by RM If Invalid",
      response.getAreNodeLabelsAcceptedByRM());

  // rm was dereferenced above, so it cannot be null here.
  rm.stop();
}
/**
 * Registers a node reporting label "A" with an RM that uses the default
 * (centralized) node label configuration and checks that the reported label
 * is neither stored nor reported as accepted.
 */
@Test
public void testNodeRegistrationWithCentralLabelConfig() throws Exception {
  writeToHostsFile("host2");
  Configuration conf = new Configuration();
  conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH,
      hostFile.getAbsolutePath());
  conf.set(YarnConfiguration.NODELABEL_CONFIGURATION_TYPE,
      YarnConfiguration.DEFAULT_NODELABEL_CONFIGURATION_TYPE);

  final RMNodeLabelsManager nodeLabelsMgr = new NullRMNodeLabelsManager();

  rm = new MockRM(conf) {
    @Override
    protected RMNodeLabelsManager createNodeLabelManager() {
      return nodeLabelsMgr;
    }
  };
  rm.start();

  // Let a failure propagate: the method declares "throws Exception", so the
  // test fails with the original exception and its stack trace.
  nodeLabelsMgr.addToCluserNodeLabels(toSet("A", "B", "C"));

  ResourceTrackerService resourceTrackerService =
      rm.getResourceTrackerService();
  RegisterNodeManagerRequest req =
      Records.newRecord(RegisterNodeManagerRequest.class);
  NodeId nodeId = NodeId.newInstance("host2", 1234);
  Resource capability = BuilderUtils.newResource(1024, 1);
  req.setResource(capability);
  req.setNodeId(nodeId);
  req.setHttpPort(1234);
  req.setNMVersion(YarnVersionInfo.getVersion());
  req.setNodeLabels(toSet("A"));
  RegisterNodeManagerResponse response =
      resourceTrackerService.registerNodeManager(req);

  // registered to RM with central label config
  Assert.assertEquals(NodeAction.NORMAL, response.getNodeAction());
  Assert.assertNull(nodeLabelsMgr.getNodeLabels().get(nodeId));
  Assert
      .assertFalse(
          "Node Labels should not accepted by RM If its configured with Central configuration",
          response.getAreNodeLabelsAcceptedByRM());

  // rm was dereferenced above, so it cannot be null here.
  rm.stop();
}
/**
 * Builds a NodeStatus for the given node with response id 0 and empty
 * container-status and keep-alive-application lists.
 *
 * @param nodeId id of the node the status belongs to
 * @return the newly created status record
 */
@SuppressWarnings("unchecked")
private NodeStatus getNodeStatusObject(NodeId nodeId) {
  NodeStatus nodeStatus = Records.newRecord(NodeStatus.class);
  nodeStatus.setNodeId(nodeId);
  nodeStatus.setResponseId(0);
  nodeStatus.setContainersStatuses(Collections.EMPTY_LIST);
  nodeStatus.setKeepAliveApplications(Collections.EMPTY_LIST);
  return nodeStatus;
}
/**
 * Registers a node with label "A", then sends a heartbeat that changes the
 * label to "B" and checks that the new label is stored and accepted. A
 * second heartbeat carrying null labels must leave the stored labels
 * untouched and must not be reported as accepted.
 */
@Test
public void testNodeHeartBeatWithLabels() throws Exception {
  writeToHostsFile("host2");
  Configuration conf = new Configuration();
  conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH,
      hostFile.getAbsolutePath());
  conf.set(YarnConfiguration.NODELABEL_CONFIGURATION_TYPE,
      YarnConfiguration.DISTRIBUTED_NODELABEL_CONFIGURATION_TYPE);

  final RMNodeLabelsManager nodeLabelsMgr = new NullRMNodeLabelsManager();

  rm = new MockRM(conf) {
    @Override
    protected RMNodeLabelsManager createNodeLabelManager() {
      return nodeLabelsMgr;
    }
  };
  rm.start();

  // adding valid labels; let a failure propagate so the test fails with the
  // original exception and its stack trace.
  nodeLabelsMgr.addToCluserNodeLabels(toSet("A", "B", "C"));

  // Registering of labels and other required info to RM
  ResourceTrackerService resourceTrackerService =
      rm.getResourceTrackerService();
  RegisterNodeManagerRequest registerReq =
      Records.newRecord(RegisterNodeManagerRequest.class);
  NodeId nodeId = NodeId.newInstance("host2", 1234);
  Resource capability = BuilderUtils.newResource(1024, 1);
  registerReq.setResource(capability);
  registerReq.setNodeId(nodeId);
  registerReq.setHttpPort(1234);
  registerReq.setNMVersion(YarnVersionInfo.getVersion());
  registerReq.setNodeLabels(toSet("A")); // Node register label
  RegisterNodeManagerResponse registerResponse =
      resourceTrackerService.registerNodeManager(registerReq);

  // modification of labels during heartbeat
  NodeHeartbeatRequest heartbeatReq =
      Records.newRecord(NodeHeartbeatRequest.class);
  heartbeatReq.setNodeLabels(toSet("B")); // Node heartbeat label update
  NodeStatus nodeStatusObject = getNodeStatusObject(nodeId);
  heartbeatReq.setNodeStatus(nodeStatusObject);
  heartbeatReq.setLastKnownNMTokenMasterKey(registerResponse
      .getNMTokenMasterKey());
  heartbeatReq.setLastKnownContainerTokenMasterKey(registerResponse
      .getContainerTokenMasterKey());
  NodeHeartbeatResponse nodeHeartbeatResponse =
      resourceTrackerService.nodeHeartbeat(heartbeatReq);

  Assert.assertEquals(
      "Node action should be NORMAL when heartbeat labels are valid",
      NodeAction.NORMAL, nodeHeartbeatResponse.getNodeAction());
  assertCollectionEquals(nodeLabelsMgr.getNodeLabels().get(nodeId),
      heartbeatReq.getNodeLabels());
  Assert.assertTrue("Valid Node Labels were not accepted by RM",
      nodeHeartbeatResponse.getAreNodeLabelsAcceptedByRM());

  // After modification of labels next heartbeat sends null informing no update
  Set<String> oldLabels = nodeLabelsMgr.getNodeLabels().get(nodeId);
  int responseId = nodeStatusObject.getResponseId();
  heartbeatReq =
      Records.newRecord(NodeHeartbeatRequest.class);
  heartbeatReq.setNodeLabels(null); // null labels: no label update
  nodeStatusObject = getNodeStatusObject(nodeId);
  nodeStatusObject.setResponseId(responseId+2);
  heartbeatReq.setNodeStatus(nodeStatusObject);
  heartbeatReq.setLastKnownNMTokenMasterKey(registerResponse
      .getNMTokenMasterKey());
  heartbeatReq.setLastKnownContainerTokenMasterKey(registerResponse
      .getContainerTokenMasterKey());
  nodeHeartbeatResponse = resourceTrackerService.nodeHeartbeat(heartbeatReq);

  Assert.assertEquals(
      "Node action should be NORMAL when heartbeat carries no label update",
      NodeAction.NORMAL, nodeHeartbeatResponse.getNodeAction());
  assertCollectionEquals(nodeLabelsMgr.getNodeLabels().get(nodeId),
      oldLabels);
  Assert.assertFalse("Node Labels should not accepted by RM",
      nodeHeartbeatResponse.getAreNodeLabelsAcceptedByRM());
  rm.stop();
}
/**
 * Registers a node with label "A", then sends a heartbeat reporting labels
 * "B" and "#C" and checks that the heartbeat is still answered with NORMAL
 * while the labels are reported as not accepted and a diagnostics message is
 * returned.
 */
@Test
public void testNodeHeartBeatWithInvalidLabels() throws Exception {
  writeToHostsFile("host2");
  Configuration conf = new Configuration();
  conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH,
      hostFile.getAbsolutePath());
  conf.set(YarnConfiguration.NODELABEL_CONFIGURATION_TYPE,
      YarnConfiguration.DISTRIBUTED_NODELABEL_CONFIGURATION_TYPE);

  final RMNodeLabelsManager nodeLabelsMgr = new NullRMNodeLabelsManager();

  rm = new MockRM(conf) {
    @Override
    protected RMNodeLabelsManager createNodeLabelManager() {
      return nodeLabelsMgr;
    }
  };
  rm.start();

  // Let a failure propagate: the method declares "throws Exception", so the
  // test fails with the original exception and its stack trace.
  nodeLabelsMgr.addToCluserNodeLabels(toSet("A", "B", "C"));

  ResourceTrackerService resourceTrackerService =
      rm.getResourceTrackerService();
  RegisterNodeManagerRequest registerReq =
      Records.newRecord(RegisterNodeManagerRequest.class);
  NodeId nodeId = NodeId.newInstance("host2", 1234);
  Resource capability = BuilderUtils.newResource(1024, 1);
  registerReq.setResource(capability);
  registerReq.setNodeId(nodeId);
  registerReq.setHttpPort(1234);
  registerReq.setNMVersion(YarnVersionInfo.getVersion());
  registerReq.setNodeLabels(toSet("A"));
  RegisterNodeManagerResponse registerResponse =
      resourceTrackerService.registerNodeManager(registerReq);

  NodeHeartbeatRequest heartbeatReq =
      Records.newRecord(NodeHeartbeatRequest.class);
  heartbeatReq.setNodeLabels(toSet("B", "#C")); // Invalid heart beat labels
  heartbeatReq.setNodeStatus(getNodeStatusObject(nodeId));
  heartbeatReq.setLastKnownNMTokenMasterKey(registerResponse
      .getNMTokenMasterKey());
  heartbeatReq.setLastKnownContainerTokenMasterKey(registerResponse
      .getContainerTokenMasterKey());
  NodeHeartbeatResponse nodeHeartbeatResponse =
      resourceTrackerService.nodeHeartbeat(heartbeatReq);

  // response should be NORMAL when RM heartbeat labels are rejected
  Assert.assertEquals("Response should be NORMAL when RM heartbeat labels"
      + " are rejected", NodeAction.NORMAL,
      nodeHeartbeatResponse.getNodeAction());
  Assert.assertFalse(nodeHeartbeatResponse.getAreNodeLabelsAcceptedByRM());
  Assert.assertNotNull(nodeHeartbeatResponse.getDiagnosticsMessage());
  rm.stop();
}
/**
 * Registers a node and sends a heartbeat reporting label "B" to an RM that
 * uses the default (centralized) node label configuration, and checks that
 * the heartbeat is answered with NORMAL while the reported labels are
 * neither stored nor reported as accepted.
 */
@Test
public void testNodeHeartbeatWithCentralLabelConfig() throws Exception {
  writeToHostsFile("host2");
  Configuration conf = new Configuration();
  conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH,
      hostFile.getAbsolutePath());
  conf.set(YarnConfiguration.NODELABEL_CONFIGURATION_TYPE,
      YarnConfiguration.DEFAULT_NODELABEL_CONFIGURATION_TYPE);

  final RMNodeLabelsManager nodeLabelsMgr = new NullRMNodeLabelsManager();

  rm = new MockRM(conf) {
    @Override
    protected RMNodeLabelsManager createNodeLabelManager() {
      return nodeLabelsMgr;
    }
  };
  rm.start();

  ResourceTrackerService resourceTrackerService =
      rm.getResourceTrackerService();
  RegisterNodeManagerRequest req =
      Records.newRecord(RegisterNodeManagerRequest.class);
  NodeId nodeId = NodeId.newInstance("host2", 1234);
  Resource capability = BuilderUtils.newResource(1024, 1);
  req.setResource(capability);
  req.setNodeId(nodeId);
  req.setHttpPort(1234);
  req.setNMVersion(YarnVersionInfo.getVersion());
  req.setNodeLabels(toSet("A", "B", "C"));
  RegisterNodeManagerResponse registerResponse =
      resourceTrackerService.registerNodeManager(req);

  NodeHeartbeatRequest heartbeatReq =
      Records.newRecord(NodeHeartbeatRequest.class);
  heartbeatReq.setNodeLabels(toSet("B")); // Valid heart beat labels
  heartbeatReq.setNodeStatus(getNodeStatusObject(nodeId));
  heartbeatReq.setLastKnownNMTokenMasterKey(registerResponse
      .getNMTokenMasterKey());
  heartbeatReq.setLastKnownContainerTokenMasterKey(registerResponse
      .getContainerTokenMasterKey());
  NodeHeartbeatResponse nodeHeartbeatResponse =
      resourceTrackerService.nodeHeartbeat(heartbeatReq);

  // response should be ok but the RMacceptNodeLabelsUpdate should be false
  Assert.assertEquals(NodeAction.NORMAL,
      nodeHeartbeatResponse.getNodeAction());
  // no change in the labels,
  Assert.assertNull(nodeLabelsMgr.getNodeLabels().get(nodeId));
  // heartbeat labels rejected; the labels themselves are valid, the
  // rejection comes from the centralized configuration type
  Assert.assertFalse("Node Labels should not be accepted by RM if it is"
      + " configured with Central configuration",
      nodeHeartbeatResponse.getAreNodeLabelsAcceptedByRM());

  // rm was dereferenced above, so it cannot be null here.
  rm.stop();
}
@Test