YARN-2495. Allow admin specify labels from each NM (Distributed configuration for node label). (Naganarasimha G R via wangda)
(cherry picked from commit 2a945d24f7
)
This commit is contained in:
parent
dd5b2dac5a
commit
cba4ed1678
|
@ -35,6 +35,9 @@ Release 2.8.0 - UNRELEASED
|
||||||
|
|
||||||
YARN-3288. Document and fix indentation in the DockerContainerExecutor code
|
YARN-3288. Document and fix indentation in the DockerContainerExecutor code
|
||||||
|
|
||||||
|
YARN-2495. Allow admin specify labels from each NM (Distributed
|
||||||
|
configuration for node label). (Naganarasimha G R via wangda)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
YARN-3339. TestDockerContainerExecutor should pull a single image and not
|
YARN-3339. TestDockerContainerExecutor should pull a single image and not
|
||||||
|
|
|
@ -1720,6 +1720,18 @@ public class YarnConfiguration extends Configuration {
|
||||||
+ "enabled";
|
+ "enabled";
|
||||||
public static final boolean DEFAULT_NODE_LABELS_ENABLED = false;
|
public static final boolean DEFAULT_NODE_LABELS_ENABLED = false;
|
||||||
|
|
||||||
|
public static final String NODELABEL_CONFIGURATION_TYPE =
|
||||||
|
NODE_LABELS_PREFIX + "configuration-type";
|
||||||
|
|
||||||
|
public static final String CENTALIZED_NODELABEL_CONFIGURATION_TYPE =
|
||||||
|
"centralized";
|
||||||
|
|
||||||
|
public static final String DISTRIBUTED_NODELABEL_CONFIGURATION_TYPE =
|
||||||
|
"distributed";
|
||||||
|
|
||||||
|
public static final String DEFAULT_NODELABEL_CONFIGURATION_TYPE =
|
||||||
|
CENTALIZED_NODELABEL_CONFIGURATION_TYPE;
|
||||||
|
|
||||||
public YarnConfiguration() {
|
public YarnConfiguration() {
|
||||||
super();
|
super();
|
||||||
}
|
}
|
||||||
|
|
|
@ -239,6 +239,10 @@ message NodeIdToLabelsProto {
|
||||||
repeated string nodeLabels = 2;
|
repeated string nodeLabels = 2;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
message StringArrayProto {
|
||||||
|
repeated string elements = 1;
|
||||||
|
}
|
||||||
|
|
||||||
message LabelsToNodeIdsProto {
|
message LabelsToNodeIdsProto {
|
||||||
optional string nodeLabels = 1;
|
optional string nodeLabels = 1;
|
||||||
repeated NodeIdProto nodeId = 2;
|
repeated NodeIdProto nodeId = 2;
|
||||||
|
|
|
@ -70,7 +70,7 @@ public class TestResourceTrackerOnHA extends ProtocolHATestBase{
|
||||||
NodeStatus.newInstance(NodeId.newInstance("localhost", 0), 0, null,
|
NodeStatus.newInstance(NodeId.newInstance("localhost", 0), 0, null,
|
||||||
null, null);
|
null, null);
|
||||||
NodeHeartbeatRequest request2 =
|
NodeHeartbeatRequest request2 =
|
||||||
NodeHeartbeatRequest.newInstance(status, null, null);
|
NodeHeartbeatRequest.newInstance(status, null, null,null);
|
||||||
resourceTracker.nodeHeartbeat(request2);
|
resourceTracker.nodeHeartbeat(request2);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -18,6 +18,8 @@
|
||||||
|
|
||||||
package org.apache.hadoop.yarn.server.api.protocolrecords;
|
package org.apache.hadoop.yarn.server.api.protocolrecords;
|
||||||
|
|
||||||
|
import java.util.Set;
|
||||||
|
|
||||||
import org.apache.hadoop.yarn.server.api.records.MasterKey;
|
import org.apache.hadoop.yarn.server.api.records.MasterKey;
|
||||||
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
|
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
|
||||||
import org.apache.hadoop.yarn.util.Records;
|
import org.apache.hadoop.yarn.util.Records;
|
||||||
|
@ -26,7 +28,7 @@ public abstract class NodeHeartbeatRequest {
|
||||||
|
|
||||||
public static NodeHeartbeatRequest newInstance(NodeStatus nodeStatus,
|
public static NodeHeartbeatRequest newInstance(NodeStatus nodeStatus,
|
||||||
MasterKey lastKnownContainerTokenMasterKey,
|
MasterKey lastKnownContainerTokenMasterKey,
|
||||||
MasterKey lastKnownNMTokenMasterKey) {
|
MasterKey lastKnownNMTokenMasterKey, Set<String> nodeLabels) {
|
||||||
NodeHeartbeatRequest nodeHeartbeatRequest =
|
NodeHeartbeatRequest nodeHeartbeatRequest =
|
||||||
Records.newRecord(NodeHeartbeatRequest.class);
|
Records.newRecord(NodeHeartbeatRequest.class);
|
||||||
nodeHeartbeatRequest.setNodeStatus(nodeStatus);
|
nodeHeartbeatRequest.setNodeStatus(nodeStatus);
|
||||||
|
@ -34,6 +36,7 @@ public abstract class NodeHeartbeatRequest {
|
||||||
.setLastKnownContainerTokenMasterKey(lastKnownContainerTokenMasterKey);
|
.setLastKnownContainerTokenMasterKey(lastKnownContainerTokenMasterKey);
|
||||||
nodeHeartbeatRequest
|
nodeHeartbeatRequest
|
||||||
.setLastKnownNMTokenMasterKey(lastKnownNMTokenMasterKey);
|
.setLastKnownNMTokenMasterKey(lastKnownNMTokenMasterKey);
|
||||||
|
nodeHeartbeatRequest.setNodeLabels(nodeLabels);
|
||||||
return nodeHeartbeatRequest;
|
return nodeHeartbeatRequest;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -45,4 +48,7 @@ public abstract class NodeHeartbeatRequest {
|
||||||
|
|
||||||
public abstract MasterKey getLastKnownNMTokenMasterKey();
|
public abstract MasterKey getLastKnownNMTokenMasterKey();
|
||||||
public abstract void setLastKnownNMTokenMasterKey(MasterKey secretKey);
|
public abstract void setLastKnownNMTokenMasterKey(MasterKey secretKey);
|
||||||
|
|
||||||
|
public abstract Set<String> getNodeLabels();
|
||||||
|
public abstract void setNodeLabels(Set<String> nodeLabels);
|
||||||
}
|
}
|
||||||
|
|
|
@ -67,4 +67,7 @@ public interface NodeHeartbeatResponse {
|
||||||
|
|
||||||
void setSystemCredentialsForApps(
|
void setSystemCredentialsForApps(
|
||||||
Map<ApplicationId, ByteBuffer> systemCredentials);
|
Map<ApplicationId, ByteBuffer> systemCredentials);
|
||||||
|
|
||||||
|
boolean getAreNodeLabelsAcceptedByRM();
|
||||||
|
void setAreNodeLabelsAcceptedByRM(boolean areNodeLabelsAcceptedByRM);
|
||||||
}
|
}
|
||||||
|
|
|
@ -19,6 +19,7 @@
|
||||||
package org.apache.hadoop.yarn.server.api.protocolrecords;
|
package org.apache.hadoop.yarn.server.api.protocolrecords;
|
||||||
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.Set;
|
||||||
|
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||||
|
@ -31,6 +32,14 @@ public abstract class RegisterNodeManagerRequest {
|
||||||
int httpPort, Resource resource, String nodeManagerVersionId,
|
int httpPort, Resource resource, String nodeManagerVersionId,
|
||||||
List<NMContainerStatus> containerStatuses,
|
List<NMContainerStatus> containerStatuses,
|
||||||
List<ApplicationId> runningApplications) {
|
List<ApplicationId> runningApplications) {
|
||||||
|
return newInstance(nodeId, httpPort, resource, nodeManagerVersionId,
|
||||||
|
containerStatuses, runningApplications, null);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static RegisterNodeManagerRequest newInstance(NodeId nodeId,
|
||||||
|
int httpPort, Resource resource, String nodeManagerVersionId,
|
||||||
|
List<NMContainerStatus> containerStatuses,
|
||||||
|
List<ApplicationId> runningApplications, Set<String> nodeLabels) {
|
||||||
RegisterNodeManagerRequest request =
|
RegisterNodeManagerRequest request =
|
||||||
Records.newRecord(RegisterNodeManagerRequest.class);
|
Records.newRecord(RegisterNodeManagerRequest.class);
|
||||||
request.setHttpPort(httpPort);
|
request.setHttpPort(httpPort);
|
||||||
|
@ -39,6 +48,7 @@ public abstract class RegisterNodeManagerRequest {
|
||||||
request.setNMVersion(nodeManagerVersionId);
|
request.setNMVersion(nodeManagerVersionId);
|
||||||
request.setContainerStatuses(containerStatuses);
|
request.setContainerStatuses(containerStatuses);
|
||||||
request.setRunningApplications(runningApplications);
|
request.setRunningApplications(runningApplications);
|
||||||
|
request.setNodeLabels(nodeLabels);
|
||||||
return request;
|
return request;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -47,6 +57,8 @@ public abstract class RegisterNodeManagerRequest {
|
||||||
public abstract Resource getResource();
|
public abstract Resource getResource();
|
||||||
public abstract String getNMVersion();
|
public abstract String getNMVersion();
|
||||||
public abstract List<NMContainerStatus> getNMContainerStatuses();
|
public abstract List<NMContainerStatus> getNMContainerStatuses();
|
||||||
|
public abstract Set<String> getNodeLabels();
|
||||||
|
public abstract void setNodeLabels(Set<String> nodeLabels);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* We introduce this here because currently YARN RM doesn't persist nodes info
|
* We introduce this here because currently YARN RM doesn't persist nodes info
|
||||||
|
|
|
@ -45,4 +45,7 @@ public interface RegisterNodeManagerResponse {
|
||||||
void setRMVersion(String version);
|
void setRMVersion(String version);
|
||||||
|
|
||||||
String getRMVersion();
|
String getRMVersion();
|
||||||
|
|
||||||
|
boolean getAreNodeLabelsAcceptedByRM();
|
||||||
|
void setAreNodeLabelsAcceptedByRM(boolean areNodeLabelsAcceptedByRM);
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,6 +18,11 @@
|
||||||
|
|
||||||
package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb;
|
package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb;
|
||||||
|
|
||||||
|
import java.util.HashSet;
|
||||||
|
import java.util.Set;
|
||||||
|
|
||||||
|
import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdToLabelsProto;
|
||||||
|
import org.apache.hadoop.yarn.proto.YarnProtos.StringArrayProto;
|
||||||
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.MasterKeyProto;
|
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.MasterKeyProto;
|
||||||
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeStatusProto;
|
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeStatusProto;
|
||||||
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatRequestProto;
|
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatRequestProto;
|
||||||
|
@ -36,6 +41,7 @@ public class NodeHeartbeatRequestPBImpl extends NodeHeartbeatRequest {
|
||||||
private NodeStatus nodeStatus = null;
|
private NodeStatus nodeStatus = null;
|
||||||
private MasterKey lastKnownContainerTokenMasterKey = null;
|
private MasterKey lastKnownContainerTokenMasterKey = null;
|
||||||
private MasterKey lastKnownNMTokenMasterKey = null;
|
private MasterKey lastKnownNMTokenMasterKey = null;
|
||||||
|
private Set<String> labels = null;
|
||||||
|
|
||||||
public NodeHeartbeatRequestPBImpl() {
|
public NodeHeartbeatRequestPBImpl() {
|
||||||
builder = NodeHeartbeatRequestProto.newBuilder();
|
builder = NodeHeartbeatRequestProto.newBuilder();
|
||||||
|
@ -80,6 +86,11 @@ public class NodeHeartbeatRequestPBImpl extends NodeHeartbeatRequest {
|
||||||
builder.setLastKnownNmTokenMasterKey(
|
builder.setLastKnownNmTokenMasterKey(
|
||||||
convertToProtoFormat(this.lastKnownNMTokenMasterKey));
|
convertToProtoFormat(this.lastKnownNMTokenMasterKey));
|
||||||
}
|
}
|
||||||
|
if (this.labels != null) {
|
||||||
|
builder.clearNodeLabels();
|
||||||
|
builder.setNodeLabels(StringArrayProto.newBuilder()
|
||||||
|
.addAllElements(this.labels).build());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void mergeLocalToProto() {
|
private void mergeLocalToProto() {
|
||||||
|
@ -178,4 +189,30 @@ public class NodeHeartbeatRequestPBImpl extends NodeHeartbeatRequest {
|
||||||
private MasterKeyProto convertToProtoFormat(MasterKey t) {
|
private MasterKeyProto convertToProtoFormat(MasterKey t) {
|
||||||
return ((MasterKeyPBImpl)t).getProto();
|
return ((MasterKeyPBImpl)t).getProto();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Set<String> getNodeLabels() {
|
||||||
|
initNodeLabels();
|
||||||
|
return this.labels;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setNodeLabels(Set<String> nodeLabels) {
|
||||||
|
maybeInitBuilder();
|
||||||
|
builder.clearNodeLabels();
|
||||||
|
this.labels = nodeLabels;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void initNodeLabels() {
|
||||||
|
if (this.labels != null) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
NodeHeartbeatRequestProtoOrBuilder p = viaProto ? proto : builder;
|
||||||
|
if (!p.hasNodeLabels()) {
|
||||||
|
labels = null;
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
StringArrayProto nodeLabels = p.getNodeLabels();
|
||||||
|
labels = new HashSet<String>(nodeLabels.getElementsList());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -483,5 +483,18 @@ public class NodeHeartbeatResponsePBImpl extends
|
||||||
private MasterKeyProto convertToProtoFormat(MasterKey t) {
|
private MasterKeyProto convertToProtoFormat(MasterKey t) {
|
||||||
return ((MasterKeyPBImpl) t).getProto();
|
return ((MasterKeyPBImpl) t).getProto();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean getAreNodeLabelsAcceptedByRM() {
|
||||||
|
NodeHeartbeatResponseProtoOrBuilder p =
|
||||||
|
this.viaProto ? this.proto : this.builder;
|
||||||
|
return p.getAreNodeLabelsAcceptedByRM();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setAreNodeLabelsAcceptedByRM(boolean areNodeLabelsAcceptedByRM) {
|
||||||
|
maybeInitBuilder();
|
||||||
|
this.builder.setAreNodeLabelsAcceptedByRM(areNodeLabelsAcceptedByRM);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -20,33 +20,28 @@ package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb;
|
||||||
|
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
import java.util.HashSet;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.Set;
|
||||||
|
|
||||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
|
||||||
import org.apache.hadoop.yarn.api.records.Resource;
|
|
||||||
import org.apache.hadoop.yarn.api.records.impl.pb.NodeIdPBImpl;
|
|
||||||
import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl;
|
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
|
||||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||||
import org.apache.hadoop.yarn.api.records.Resource;
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
|
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
|
||||||
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerStatusPBImpl;
|
|
||||||
import org.apache.hadoop.yarn.api.records.impl.pb.NodeIdPBImpl;
|
import org.apache.hadoop.yarn.api.records.impl.pb.NodeIdPBImpl;
|
||||||
import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl;
|
import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl;
|
||||||
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
|
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
|
||||||
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerStatusProto;
|
|
||||||
import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto;
|
import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto;
|
||||||
|
import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdToLabelsProto;
|
||||||
import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto;
|
import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto;
|
||||||
|
import org.apache.hadoop.yarn.proto.YarnProtos.StringArrayProto;
|
||||||
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NMContainerStatusProto;
|
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NMContainerStatusProto;
|
||||||
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.RegisterNodeManagerRequestProto;
|
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.RegisterNodeManagerRequestProto;
|
||||||
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.RegisterNodeManagerRequestProtoOrBuilder;
|
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.RegisterNodeManagerRequestProtoOrBuilder;
|
||||||
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
|
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
|
||||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
|
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
public class RegisterNodeManagerRequestPBImpl extends RegisterNodeManagerRequest {
|
public class RegisterNodeManagerRequestPBImpl extends RegisterNodeManagerRequest {
|
||||||
RegisterNodeManagerRequestProto proto = RegisterNodeManagerRequestProto.getDefaultInstance();
|
RegisterNodeManagerRequestProto proto = RegisterNodeManagerRequestProto.getDefaultInstance();
|
||||||
RegisterNodeManagerRequestProto.Builder builder = null;
|
RegisterNodeManagerRequestProto.Builder builder = null;
|
||||||
|
@ -56,6 +51,7 @@ public class RegisterNodeManagerRequestPBImpl extends RegisterNodeManagerRequest
|
||||||
private NodeId nodeId = null;
|
private NodeId nodeId = null;
|
||||||
private List<NMContainerStatus> containerStatuses = null;
|
private List<NMContainerStatus> containerStatuses = null;
|
||||||
private List<ApplicationId> runningApplications = null;
|
private List<ApplicationId> runningApplications = null;
|
||||||
|
private Set<String> labels = null;
|
||||||
|
|
||||||
public RegisterNodeManagerRequestPBImpl() {
|
public RegisterNodeManagerRequestPBImpl() {
|
||||||
builder = RegisterNodeManagerRequestProto.newBuilder();
|
builder = RegisterNodeManagerRequestProto.newBuilder();
|
||||||
|
@ -86,7 +82,11 @@ public class RegisterNodeManagerRequestPBImpl extends RegisterNodeManagerRequest
|
||||||
if (this.nodeId != null) {
|
if (this.nodeId != null) {
|
||||||
builder.setNodeId(convertToProtoFormat(this.nodeId));
|
builder.setNodeId(convertToProtoFormat(this.nodeId));
|
||||||
}
|
}
|
||||||
|
if (this.labels != null) {
|
||||||
|
builder.clearNodeLabels();
|
||||||
|
builder.setNodeLabels(StringArrayProto.newBuilder()
|
||||||
|
.addAllElements(this.labels).build());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private synchronized void addNMContainerStatusesToProto() {
|
private synchronized void addNMContainerStatusesToProto() {
|
||||||
|
@ -292,6 +292,32 @@ public class RegisterNodeManagerRequestPBImpl extends RegisterNodeManagerRequest
|
||||||
builder.setNmVersion(version);
|
builder.setNmVersion(version);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Set<String> getNodeLabels() {
|
||||||
|
initNodeLabels();
|
||||||
|
return this.labels;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setNodeLabels(Set<String> nodeLabels) {
|
||||||
|
maybeInitBuilder();
|
||||||
|
builder.clearNodeLabels();
|
||||||
|
this.labels = nodeLabels;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void initNodeLabels() {
|
||||||
|
if (this.labels != null) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
RegisterNodeManagerRequestProtoOrBuilder p = viaProto ? proto : builder;
|
||||||
|
if (!p.hasNodeLabels()) {
|
||||||
|
labels=null;
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
StringArrayProto nodeLabels = p.getNodeLabels();
|
||||||
|
labels = new HashSet<String>(nodeLabels.getElementsList());
|
||||||
|
}
|
||||||
|
|
||||||
private ApplicationIdPBImpl convertFromProtoFormat(ApplicationIdProto p) {
|
private ApplicationIdPBImpl convertFromProtoFormat(ApplicationIdProto p) {
|
||||||
return new ApplicationIdPBImpl(p);
|
return new ApplicationIdPBImpl(p);
|
||||||
}
|
}
|
||||||
|
|
|
@ -216,4 +216,17 @@ public class RegisterNodeManagerResponsePBImpl extends ProtoBase<RegisterNodeMan
|
||||||
private MasterKeyProto convertToProtoFormat(MasterKey t) {
|
private MasterKeyProto convertToProtoFormat(MasterKey t) {
|
||||||
return ((MasterKeyPBImpl)t).getProto();
|
return ((MasterKeyPBImpl)t).getProto();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean getAreNodeLabelsAcceptedByRM() {
|
||||||
|
RegisterNodeManagerResponseProtoOrBuilder p =
|
||||||
|
this.viaProto ? this.proto : this.builder;
|
||||||
|
return p.getAreNodeLabelsAcceptedByRM();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setAreNodeLabelsAcceptedByRM(boolean areNodeLabelsAcceptedByRM) {
|
||||||
|
maybeInitBuilder();
|
||||||
|
this.builder.setAreNodeLabelsAcceptedByRM(areNodeLabelsAcceptedByRM);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -32,6 +32,7 @@ message RegisterNodeManagerRequestProto {
|
||||||
optional string nm_version = 5;
|
optional string nm_version = 5;
|
||||||
repeated NMContainerStatusProto container_statuses = 6;
|
repeated NMContainerStatusProto container_statuses = 6;
|
||||||
repeated ApplicationIdProto runningApplications = 7;
|
repeated ApplicationIdProto runningApplications = 7;
|
||||||
|
optional StringArrayProto nodeLabels = 8;
|
||||||
}
|
}
|
||||||
|
|
||||||
message RegisterNodeManagerResponseProto {
|
message RegisterNodeManagerResponseProto {
|
||||||
|
@ -41,12 +42,14 @@ message RegisterNodeManagerResponseProto {
|
||||||
optional int64 rm_identifier = 4;
|
optional int64 rm_identifier = 4;
|
||||||
optional string diagnostics_message = 5;
|
optional string diagnostics_message = 5;
|
||||||
optional string rm_version = 6;
|
optional string rm_version = 6;
|
||||||
|
optional bool areNodeLabelsAcceptedByRM = 7 [default = false];
|
||||||
}
|
}
|
||||||
|
|
||||||
message NodeHeartbeatRequestProto {
|
message NodeHeartbeatRequestProto {
|
||||||
optional NodeStatusProto node_status = 1;
|
optional NodeStatusProto node_status = 1;
|
||||||
optional MasterKeyProto last_known_container_token_master_key = 2;
|
optional MasterKeyProto last_known_container_token_master_key = 2;
|
||||||
optional MasterKeyProto last_known_nm_token_master_key = 3;
|
optional MasterKeyProto last_known_nm_token_master_key = 3;
|
||||||
|
optional StringArrayProto nodeLabels = 4;
|
||||||
}
|
}
|
||||||
|
|
||||||
message NodeHeartbeatResponseProto {
|
message NodeHeartbeatResponseProto {
|
||||||
|
@ -60,6 +63,7 @@ message NodeHeartbeatResponseProto {
|
||||||
optional string diagnostics_message = 8;
|
optional string diagnostics_message = 8;
|
||||||
repeated ContainerIdProto containers_to_be_removed_from_nm = 9;
|
repeated ContainerIdProto containers_to_be_removed_from_nm = 9;
|
||||||
repeated SystemCredentialsForAppsProto system_credentials_for_apps = 10;
|
repeated SystemCredentialsForAppsProto system_credentials_for_apps = 10;
|
||||||
|
optional bool areNodeLabelsAcceptedByRM = 11 [default = false];
|
||||||
}
|
}
|
||||||
|
|
||||||
message SystemCredentialsForAppsProto {
|
message SystemCredentialsForAppsProto {
|
||||||
|
|
|
@ -19,11 +19,13 @@
|
||||||
package org.apache.hadoop.yarn;
|
package org.apache.hadoop.yarn;
|
||||||
|
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertFalse;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
import java.nio.ByteBuffer;
|
import java.nio.ByteBuffer;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
|
import java.util.HashSet;
|
||||||
|
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
|
@ -36,6 +38,7 @@ import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
|
||||||
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl;
|
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl;
|
||||||
import org.apache.hadoop.yarn.api.records.impl.pb.SerializedExceptionPBImpl;
|
import org.apache.hadoop.yarn.api.records.impl.pb.SerializedExceptionPBImpl;
|
||||||
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||||
|
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
|
||||||
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.NodeHeartbeatRequestPBImpl;
|
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.NodeHeartbeatRequestPBImpl;
|
||||||
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.NodeHeartbeatResponsePBImpl;
|
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.NodeHeartbeatResponsePBImpl;
|
||||||
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RegisterNodeManagerRequestPBImpl;
|
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RegisterNodeManagerRequestPBImpl;
|
||||||
|
@ -46,6 +49,7 @@ import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
|
||||||
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
|
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
|
||||||
import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl;
|
import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl;
|
||||||
import org.apache.hadoop.yarn.server.api.records.impl.pb.NodeStatusPBImpl;
|
import org.apache.hadoop.yarn.server.api.records.impl.pb.NodeStatusPBImpl;
|
||||||
|
import org.junit.Assert;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -77,7 +81,17 @@ public class TestYarnServerApiClasses {
|
||||||
assertEquals(1, copy.getNMTokenMasterKey().getKeyId());
|
assertEquals(1, copy.getNMTokenMasterKey().getKeyId());
|
||||||
assertEquals(NodeAction.NORMAL, copy.getNodeAction());
|
assertEquals(NodeAction.NORMAL, copy.getNodeAction());
|
||||||
assertEquals("testDiagnosticMessage", copy.getDiagnosticsMessage());
|
assertEquals("testDiagnosticMessage", copy.getDiagnosticsMessage());
|
||||||
|
assertFalse(copy.getAreNodeLabelsAcceptedByRM());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testRegisterNodeManagerResponsePBImplWithRMAcceptLbls() {
|
||||||
|
RegisterNodeManagerResponsePBImpl original =
|
||||||
|
new RegisterNodeManagerResponsePBImpl();
|
||||||
|
original.setAreNodeLabelsAcceptedByRM(true);
|
||||||
|
RegisterNodeManagerResponsePBImpl copy =
|
||||||
|
new RegisterNodeManagerResponsePBImpl(original.getProto());
|
||||||
|
assertTrue(copy.getAreNodeLabelsAcceptedByRM());
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -89,11 +103,32 @@ public class TestYarnServerApiClasses {
|
||||||
original.setLastKnownContainerTokenMasterKey(getMasterKey());
|
original.setLastKnownContainerTokenMasterKey(getMasterKey());
|
||||||
original.setLastKnownNMTokenMasterKey(getMasterKey());
|
original.setLastKnownNMTokenMasterKey(getMasterKey());
|
||||||
original.setNodeStatus(getNodeStatus());
|
original.setNodeStatus(getNodeStatus());
|
||||||
|
original.setNodeLabels(getValidNodeLabels());
|
||||||
NodeHeartbeatRequestPBImpl copy = new NodeHeartbeatRequestPBImpl(
|
NodeHeartbeatRequestPBImpl copy = new NodeHeartbeatRequestPBImpl(
|
||||||
original.getProto());
|
original.getProto());
|
||||||
assertEquals(1, copy.getLastKnownContainerTokenMasterKey().getKeyId());
|
assertEquals(1, copy.getLastKnownContainerTokenMasterKey().getKeyId());
|
||||||
assertEquals(1, copy.getLastKnownNMTokenMasterKey().getKeyId());
|
assertEquals(1, copy.getLastKnownNMTokenMasterKey().getKeyId());
|
||||||
assertEquals("localhost", copy.getNodeStatus().getNodeId().getHost());
|
assertEquals("localhost", copy.getNodeStatus().getNodeId().getHost());
|
||||||
|
// check labels are coming with valid values
|
||||||
|
Assert.assertTrue(original.getNodeLabels()
|
||||||
|
.containsAll(copy.getNodeLabels()));
|
||||||
|
// check for empty labels
|
||||||
|
original.setNodeLabels(new HashSet<String> ());
|
||||||
|
copy = new NodeHeartbeatRequestPBImpl(
|
||||||
|
original.getProto());
|
||||||
|
Assert.assertNotNull(copy.getNodeLabels());
|
||||||
|
Assert.assertEquals(0, copy.getNodeLabels().size());
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test NodeHeartbeatRequestPBImpl.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testNodeHeartbeatRequestPBImplWithNullLabels() {
|
||||||
|
NodeHeartbeatRequestPBImpl original = new NodeHeartbeatRequestPBImpl();
|
||||||
|
NodeHeartbeatRequestPBImpl copy =
|
||||||
|
new NodeHeartbeatRequestPBImpl(original.getProto());
|
||||||
|
Assert.assertNull(copy.getNodeLabels());
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -119,6 +154,16 @@ public class TestYarnServerApiClasses {
|
||||||
assertEquals(1, copy.getContainerTokenMasterKey().getKeyId());
|
assertEquals(1, copy.getContainerTokenMasterKey().getKeyId());
|
||||||
assertEquals(1, copy.getNMTokenMasterKey().getKeyId());
|
assertEquals(1, copy.getNMTokenMasterKey().getKeyId());
|
||||||
assertEquals("testDiagnosticMessage", copy.getDiagnosticsMessage());
|
assertEquals("testDiagnosticMessage", copy.getDiagnosticsMessage());
|
||||||
|
assertEquals(false, copy.getAreNodeLabelsAcceptedByRM());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testNodeHeartbeatResponsePBImplWithRMAcceptLbls() {
|
||||||
|
NodeHeartbeatResponsePBImpl original = new NodeHeartbeatResponsePBImpl();
|
||||||
|
original.setAreNodeLabelsAcceptedByRM(true);
|
||||||
|
NodeHeartbeatResponsePBImpl copy =
|
||||||
|
new NodeHeartbeatResponsePBImpl(original.getProto());
|
||||||
|
assertTrue(copy.getAreNodeLabelsAcceptedByRM());
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -208,6 +253,55 @@ public class TestYarnServerApiClasses {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testRegisterNodeManagerRequestWithNullLabels() {
|
||||||
|
RegisterNodeManagerRequest request =
|
||||||
|
RegisterNodeManagerRequest.newInstance(
|
||||||
|
NodeId.newInstance("host", 1234), 1234, Resource.newInstance(0, 0),
|
||||||
|
"version", null, null);
|
||||||
|
|
||||||
|
// serialze to proto, and get request from proto
|
||||||
|
RegisterNodeManagerRequest request1 =
|
||||||
|
new RegisterNodeManagerRequestPBImpl(
|
||||||
|
((RegisterNodeManagerRequestPBImpl) request).getProto());
|
||||||
|
|
||||||
|
// check labels are coming with no values
|
||||||
|
Assert.assertNull(request1.getNodeLabels());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testRegisterNodeManagerRequestWithValidLabels() {
|
||||||
|
HashSet<String> nodeLabels = getValidNodeLabels();
|
||||||
|
RegisterNodeManagerRequest request =
|
||||||
|
RegisterNodeManagerRequest.newInstance(
|
||||||
|
NodeId.newInstance("host", 1234), 1234, Resource.newInstance(0, 0),
|
||||||
|
"version", null, null, nodeLabels);
|
||||||
|
|
||||||
|
// serialze to proto, and get request from proto
|
||||||
|
RegisterNodeManagerRequest copy =
|
||||||
|
new RegisterNodeManagerRequestPBImpl(
|
||||||
|
((RegisterNodeManagerRequestPBImpl) request).getProto());
|
||||||
|
|
||||||
|
// check labels are coming with valid values
|
||||||
|
Assert.assertEquals(true, nodeLabels.containsAll(copy.getNodeLabels()));
|
||||||
|
|
||||||
|
// check for empty labels
|
||||||
|
request.setNodeLabels(new HashSet<String> ());
|
||||||
|
copy = new RegisterNodeManagerRequestPBImpl(
|
||||||
|
((RegisterNodeManagerRequestPBImpl) request).getProto());
|
||||||
|
Assert.assertNotNull(copy.getNodeLabels());
|
||||||
|
Assert.assertEquals(0, copy.getNodeLabels().size());
|
||||||
|
}
|
||||||
|
|
||||||
|
private HashSet<String> getValidNodeLabels() {
|
||||||
|
HashSet<String> nodeLabels = new HashSet<String>();
|
||||||
|
nodeLabels.add("java");
|
||||||
|
nodeLabels.add("windows");
|
||||||
|
nodeLabels.add("gpu");
|
||||||
|
nodeLabels.add("x86");
|
||||||
|
return nodeLabels;
|
||||||
|
}
|
||||||
|
|
||||||
private ContainerStatus getContainerStatus(int applicationId,
|
private ContainerStatus getContainerStatus(int applicationId,
|
||||||
int containerID, int appAttemptId) {
|
int containerID, int appAttemptId) {
|
||||||
ContainerStatus status = recordFactory
|
ContainerStatus status = recordFactory
|
||||||
|
|
|
@ -57,6 +57,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManag
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
|
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.nodelabels.NodeLabelsProvider;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMLeveldbStateStoreService;
|
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMLeveldbStateStoreService;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
|
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
|
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
|
||||||
|
@ -79,6 +80,7 @@ public class NodeManager extends CompositeService
|
||||||
protected final NodeManagerMetrics metrics = NodeManagerMetrics.create();
|
protected final NodeManagerMetrics metrics = NodeManagerMetrics.create();
|
||||||
private ApplicationACLsManager aclsManager;
|
private ApplicationACLsManager aclsManager;
|
||||||
private NodeHealthCheckerService nodeHealthChecker;
|
private NodeHealthCheckerService nodeHealthChecker;
|
||||||
|
private NodeLabelsProvider nodeLabelsProvider;
|
||||||
private LocalDirsHandlerService dirsHandler;
|
private LocalDirsHandlerService dirsHandler;
|
||||||
private Context context;
|
private Context context;
|
||||||
private AsyncDispatcher dispatcher;
|
private AsyncDispatcher dispatcher;
|
||||||
|
@ -97,7 +99,22 @@ public class NodeManager extends CompositeService
|
||||||
protected NodeStatusUpdater createNodeStatusUpdater(Context context,
|
protected NodeStatusUpdater createNodeStatusUpdater(Context context,
|
||||||
Dispatcher dispatcher, NodeHealthCheckerService healthChecker) {
|
Dispatcher dispatcher, NodeHealthCheckerService healthChecker) {
|
||||||
return new NodeStatusUpdaterImpl(context, dispatcher, healthChecker,
|
return new NodeStatusUpdaterImpl(context, dispatcher, healthChecker,
|
||||||
metrics);
|
metrics, nodeLabelsProvider);
|
||||||
|
}
|
||||||
|
|
||||||
|
protected NodeStatusUpdater createNodeStatusUpdater(Context context,
|
||||||
|
Dispatcher dispatcher, NodeHealthCheckerService healthChecker,
|
||||||
|
NodeLabelsProvider nodeLabelsProvider) {
|
||||||
|
return new NodeStatusUpdaterImpl(context, dispatcher, healthChecker,
|
||||||
|
metrics, nodeLabelsProvider);
|
||||||
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
protected NodeLabelsProvider createNodeLabelsProvider(
|
||||||
|
Configuration conf) throws IOException {
|
||||||
|
// TODO as part of YARN-2729
|
||||||
|
// Need to get the implementation of provider service and return
|
||||||
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
protected NodeResourceMonitor createNodeResourceMonitor() {
|
protected NodeResourceMonitor createNodeResourceMonitor() {
|
||||||
|
@ -224,8 +241,17 @@ public class NodeManager extends CompositeService
|
||||||
this.context = createNMContext(containerTokenSecretManager,
|
this.context = createNMContext(containerTokenSecretManager,
|
||||||
nmTokenSecretManager, nmStore);
|
nmTokenSecretManager, nmStore);
|
||||||
|
|
||||||
nodeStatusUpdater =
|
nodeLabelsProvider = createNodeLabelsProvider(conf);
|
||||||
createNodeStatusUpdater(context, dispatcher, nodeHealthChecker);
|
|
||||||
|
if (null == nodeLabelsProvider) {
|
||||||
|
nodeStatusUpdater =
|
||||||
|
createNodeStatusUpdater(context, dispatcher, nodeHealthChecker);
|
||||||
|
} else {
|
||||||
|
addService(nodeLabelsProvider);
|
||||||
|
nodeStatusUpdater =
|
||||||
|
createNodeStatusUpdater(context, dispatcher, nodeHealthChecker,
|
||||||
|
nodeLabelsProvider);
|
||||||
|
}
|
||||||
|
|
||||||
NodeResourceMonitor nodeResourceMonitor = createNodeResourceMonitor();
|
NodeResourceMonitor nodeResourceMonitor = createNodeResourceMonitor();
|
||||||
addService(nodeResourceMonitor);
|
addService(nodeResourceMonitor);
|
||||||
|
|
|
@ -42,6 +42,7 @@ import org.apache.hadoop.ipc.RPC;
|
||||||
import org.apache.hadoop.security.Credentials;
|
import org.apache.hadoop.security.Credentials;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.service.AbstractService;
|
import org.apache.hadoop.service.AbstractService;
|
||||||
|
import org.apache.hadoop.util.StringUtils;
|
||||||
import org.apache.hadoop.util.VersionUtil;
|
import org.apache.hadoop.util.VersionUtil;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
|
@ -53,6 +54,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.event.Dispatcher;
|
import org.apache.hadoop.yarn.event.Dispatcher;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||||
|
import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
|
||||||
import org.apache.hadoop.yarn.server.api.ResourceManagerConstants;
|
import org.apache.hadoop.yarn.server.api.ResourceManagerConstants;
|
||||||
import org.apache.hadoop.yarn.server.api.ResourceTracker;
|
import org.apache.hadoop.yarn.server.api.ResourceTracker;
|
||||||
import org.apache.hadoop.yarn.server.api.ServerRMProxy;
|
import org.apache.hadoop.yarn.server.api.ServerRMProxy;
|
||||||
|
@ -70,6 +72,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManag
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationState;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationState;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
|
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.nodelabels.NodeLabelsProvider;
|
||||||
import org.apache.hadoop.yarn.util.YarnVersionInfo;
|
import org.apache.hadoop.yarn.util.YarnVersionInfo;
|
||||||
|
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
|
@ -120,15 +123,25 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
|
||||||
private long rmIdentifier = ResourceManagerConstants.RM_INVALID_IDENTIFIER;
|
private long rmIdentifier = ResourceManagerConstants.RM_INVALID_IDENTIFIER;
|
||||||
Set<ContainerId> pendingContainersToRemove = new HashSet<ContainerId>();
|
Set<ContainerId> pendingContainersToRemove = new HashSet<ContainerId>();
|
||||||
|
|
||||||
|
private final NodeLabelsProvider nodeLabelsProvider;
|
||||||
|
private final boolean hasNodeLabelsProvider;
|
||||||
|
|
||||||
public NodeStatusUpdaterImpl(Context context, Dispatcher dispatcher,
|
public NodeStatusUpdaterImpl(Context context, Dispatcher dispatcher,
|
||||||
NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) {
|
NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) {
|
||||||
|
this(context, dispatcher, healthChecker, metrics, null);
|
||||||
|
}
|
||||||
|
|
||||||
|
public NodeStatusUpdaterImpl(Context context, Dispatcher dispatcher,
|
||||||
|
NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics,
|
||||||
|
NodeLabelsProvider nodeLabelsProvider) {
|
||||||
super(NodeStatusUpdaterImpl.class.getName());
|
super(NodeStatusUpdaterImpl.class.getName());
|
||||||
this.healthChecker = healthChecker;
|
this.healthChecker = healthChecker;
|
||||||
|
this.nodeLabelsProvider = nodeLabelsProvider;
|
||||||
|
this.hasNodeLabelsProvider = (nodeLabelsProvider != null);
|
||||||
this.context = context;
|
this.context = context;
|
||||||
this.dispatcher = dispatcher;
|
this.dispatcher = dispatcher;
|
||||||
this.metrics = metrics;
|
this.metrics = metrics;
|
||||||
this.recentlyStoppedContainers =
|
this.recentlyStoppedContainers = new LinkedHashMap<ContainerId, Long>();
|
||||||
new LinkedHashMap<ContainerId, Long>();
|
|
||||||
this.pendingCompletedContainers =
|
this.pendingCompletedContainers =
|
||||||
new HashMap<ContainerId, ContainerStatus>();
|
new HashMap<ContainerId, ContainerStatus>();
|
||||||
}
|
}
|
||||||
|
@ -253,22 +266,30 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
|
||||||
protected void registerWithRM()
|
protected void registerWithRM()
|
||||||
throws YarnException, IOException {
|
throws YarnException, IOException {
|
||||||
List<NMContainerStatus> containerReports = getNMContainerStatuses();
|
List<NMContainerStatus> containerReports = getNMContainerStatuses();
|
||||||
|
Set<String> nodeLabels = null;
|
||||||
|
if (hasNodeLabelsProvider) {
|
||||||
|
nodeLabels = nodeLabelsProvider.getNodeLabels();
|
||||||
|
nodeLabels =
|
||||||
|
(null == nodeLabels) ? CommonNodeLabelsManager.EMPTY_STRING_SET
|
||||||
|
: nodeLabels;
|
||||||
|
}
|
||||||
RegisterNodeManagerRequest request =
|
RegisterNodeManagerRequest request =
|
||||||
RegisterNodeManagerRequest.newInstance(nodeId, httpPort, totalResource,
|
RegisterNodeManagerRequest.newInstance(nodeId, httpPort, totalResource,
|
||||||
nodeManagerVersionId, containerReports, getRunningApplications());
|
nodeManagerVersionId, containerReports, getRunningApplications(),
|
||||||
|
nodeLabels);
|
||||||
if (containerReports != null) {
|
if (containerReports != null) {
|
||||||
LOG.info("Registering with RM using containers :" + containerReports);
|
LOG.info("Registering with RM using containers :" + containerReports);
|
||||||
}
|
}
|
||||||
RegisterNodeManagerResponse regNMResponse =
|
RegisterNodeManagerResponse regNMResponse =
|
||||||
resourceTracker.registerNodeManager(request);
|
resourceTracker.registerNodeManager(request);
|
||||||
this.rmIdentifier = regNMResponse.getRMIdentifier();
|
this.rmIdentifier = regNMResponse.getRMIdentifier();
|
||||||
// if the Resourcemanager instructs NM to shutdown.
|
// if the Resource Manager instructs NM to shutdown.
|
||||||
if (NodeAction.SHUTDOWN.equals(regNMResponse.getNodeAction())) {
|
if (NodeAction.SHUTDOWN.equals(regNMResponse.getNodeAction())) {
|
||||||
String message =
|
String message =
|
||||||
"Message from ResourceManager: "
|
"Message from ResourceManager: "
|
||||||
+ regNMResponse.getDiagnosticsMessage();
|
+ regNMResponse.getDiagnosticsMessage();
|
||||||
throw new YarnRuntimeException(
|
throw new YarnRuntimeException(
|
||||||
"Recieved SHUTDOWN signal from Resourcemanager ,Registration of NodeManager failed, "
|
"Recieved SHUTDOWN signal from Resourcemanager, Registration of NodeManager failed, "
|
||||||
+ message);
|
+ message);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -306,8 +327,21 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
|
||||||
this.context.getNMTokenSecretManager().setMasterKey(masterKey);
|
this.context.getNMTokenSecretManager().setMasterKey(masterKey);
|
||||||
}
|
}
|
||||||
|
|
||||||
LOG.info("Registered with ResourceManager as " + this.nodeId
|
StringBuilder successfullRegistrationMsg = new StringBuilder();
|
||||||
+ " with total resource of " + this.totalResource);
|
successfullRegistrationMsg.append("Registered with ResourceManager as ")
|
||||||
|
.append(this.nodeId).append(" with total resource of ")
|
||||||
|
.append(this.totalResource);
|
||||||
|
|
||||||
|
if (regNMResponse.getAreNodeLabelsAcceptedByRM()) {
|
||||||
|
successfullRegistrationMsg
|
||||||
|
.append(" and with following Node label(s) : {")
|
||||||
|
.append(StringUtils.join(",", nodeLabels)).append("}");
|
||||||
|
} else if (hasNodeLabelsProvider) {
|
||||||
|
//case where provider is set but RM did not accept the Node Labels
|
||||||
|
LOG.error(regNMResponse.getDiagnosticsMessage());
|
||||||
|
}
|
||||||
|
|
||||||
|
LOG.info(successfullRegistrationMsg);
|
||||||
LOG.info("Notifying ContainerManager to unblock new container-requests");
|
LOG.info("Notifying ContainerManager to unblock new container-requests");
|
||||||
((ContainerManagerImpl) this.context.getContainerManager())
|
((ContainerManagerImpl) this.context.getContainerManager())
|
||||||
.setBlockNewContainerRequests(false);
|
.setBlockNewContainerRequests(false);
|
||||||
|
@ -580,19 +614,41 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
|
||||||
@Override
|
@Override
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
public void run() {
|
public void run() {
|
||||||
int lastHeartBeatID = 0;
|
int lastHeartbeatID = 0;
|
||||||
|
Set<String> lastUpdatedNodeLabelsToRM = null;
|
||||||
|
if (hasNodeLabelsProvider) {
|
||||||
|
lastUpdatedNodeLabelsToRM = nodeLabelsProvider.getNodeLabels();
|
||||||
|
lastUpdatedNodeLabelsToRM =
|
||||||
|
(null == lastUpdatedNodeLabelsToRM) ? CommonNodeLabelsManager.EMPTY_STRING_SET
|
||||||
|
: lastUpdatedNodeLabelsToRM;
|
||||||
|
}
|
||||||
while (!isStopped) {
|
while (!isStopped) {
|
||||||
// Send heartbeat
|
// Send heartbeat
|
||||||
try {
|
try {
|
||||||
NodeHeartbeatResponse response = null;
|
NodeHeartbeatResponse response = null;
|
||||||
NodeStatus nodeStatus = getNodeStatus(lastHeartBeatID);
|
Set<String> nodeLabelsForHeartbeat = null;
|
||||||
|
NodeStatus nodeStatus = getNodeStatus(lastHeartbeatID);
|
||||||
|
|
||||||
|
if (hasNodeLabelsProvider) {
|
||||||
|
nodeLabelsForHeartbeat = nodeLabelsProvider.getNodeLabels();
|
||||||
|
//if the provider returns null then consider empty labels are set
|
||||||
|
nodeLabelsForHeartbeat =
|
||||||
|
(nodeLabelsForHeartbeat == null) ? CommonNodeLabelsManager.EMPTY_STRING_SET
|
||||||
|
: nodeLabelsForHeartbeat;
|
||||||
|
if (!areNodeLabelsUpdated(nodeLabelsForHeartbeat,
|
||||||
|
lastUpdatedNodeLabelsToRM)) {
|
||||||
|
//if nodelabels have not changed then no need to send
|
||||||
|
nodeLabelsForHeartbeat = null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
NodeHeartbeatRequest request =
|
NodeHeartbeatRequest request =
|
||||||
NodeHeartbeatRequest.newInstance(nodeStatus,
|
NodeHeartbeatRequest.newInstance(nodeStatus,
|
||||||
NodeStatusUpdaterImpl.this.context
|
NodeStatusUpdaterImpl.this.context
|
||||||
.getContainerTokenSecretManager().getCurrentKey(),
|
.getContainerTokenSecretManager().getCurrentKey(),
|
||||||
NodeStatusUpdaterImpl.this.context.getNMTokenSecretManager()
|
NodeStatusUpdaterImpl.this.context
|
||||||
.getCurrentKey());
|
.getNMTokenSecretManager().getCurrentKey(),
|
||||||
|
nodeLabelsForHeartbeat);
|
||||||
response = resourceTracker.nodeHeartbeat(request);
|
response = resourceTracker.nodeHeartbeat(request);
|
||||||
//get next heartbeat interval from response
|
//get next heartbeat interval from response
|
||||||
nextHeartBeatInterval = response.getNextHeartBeatInterval();
|
nextHeartBeatInterval = response.getNextHeartBeatInterval();
|
||||||
|
@ -623,6 +679,17 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (response.getAreNodeLabelsAcceptedByRM()) {
|
||||||
|
lastUpdatedNodeLabelsToRM = nodeLabelsForHeartbeat;
|
||||||
|
LOG.info("Node Labels {"
|
||||||
|
+ StringUtils.join(",", nodeLabelsForHeartbeat)
|
||||||
|
+ "} were Accepted by RM ");
|
||||||
|
} else if (nodeLabelsForHeartbeat != null) {
|
||||||
|
// case where NodeLabelsProvider is set and updated labels were
|
||||||
|
// sent to RM and RM rejected the labels
|
||||||
|
LOG.error(response.getDiagnosticsMessage());
|
||||||
|
}
|
||||||
|
|
||||||
// Explicitly put this method after checking the resync response. We
|
// Explicitly put this method after checking the resync response. We
|
||||||
// don't want to remove the completed containers before resync
|
// don't want to remove the completed containers before resync
|
||||||
// because these completed containers will be reported back to RM
|
// because these completed containers will be reported back to RM
|
||||||
|
@ -631,7 +698,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
|
||||||
removeOrTrackCompletedContainersFromContext(response
|
removeOrTrackCompletedContainersFromContext(response
|
||||||
.getContainersToBeRemovedFromNM());
|
.getContainersToBeRemovedFromNM());
|
||||||
|
|
||||||
lastHeartBeatID = response.getResponseId();
|
lastHeartbeatID = response.getResponseId();
|
||||||
List<ContainerId> containersToCleanup = response
|
List<ContainerId> containersToCleanup = response
|
||||||
.getContainersToCleanup();
|
.getContainersToCleanup();
|
||||||
if (!containersToCleanup.isEmpty()) {
|
if (!containersToCleanup.isEmpty()) {
|
||||||
|
@ -680,6 +747,23 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Caller should take care of sending non null nodelabels for both
|
||||||
|
* arguments
|
||||||
|
*
|
||||||
|
* @param nodeLabelsNew
|
||||||
|
* @param nodeLabelsOld
|
||||||
|
* @return if the New node labels are diff from the older one.
|
||||||
|
*/
|
||||||
|
private boolean areNodeLabelsUpdated(Set<String> nodeLabelsNew,
|
||||||
|
Set<String> nodeLabelsOld) {
|
||||||
|
if (nodeLabelsNew.size() != nodeLabelsOld.size()
|
||||||
|
|| !nodeLabelsOld.containsAll(nodeLabelsNew)) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
private void updateMasterKeys(NodeHeartbeatResponse response) {
|
private void updateMasterKeys(NodeHeartbeatResponse response) {
|
||||||
// See if the master-key has rolled over
|
// See if the master-key has rolled over
|
||||||
MasterKey updatedMasterKey = response.getContainerTokenMasterKey();
|
MasterKey updatedMasterKey = response.getContainerTokenMasterKey();
|
||||||
|
|
|
@ -0,0 +1,43 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.nodemanager.nodelabels;
|
||||||
|
|
||||||
|
import java.util.Set;
|
||||||
|
|
||||||
|
import org.apache.hadoop.service.AbstractService;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Interface which will be responsible for fetching the labels
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
public abstract class NodeLabelsProvider extends AbstractService {
|
||||||
|
|
||||||
|
public NodeLabelsProvider(String name) {
|
||||||
|
super(name);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Provides the labels. LabelProvider is expected to give same Labels
|
||||||
|
* continuously until there is a change in labels.
|
||||||
|
* If null is returned then Empty label set is assumed by the caller.
|
||||||
|
*
|
||||||
|
* @return Set of node label strings applicable for a node
|
||||||
|
*/
|
||||||
|
public abstract Set<String> getNodeLabels();
|
||||||
|
}
|
|
@ -1182,7 +1182,7 @@ public class TestNodeStatusUpdater {
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
verifyNodeStartFailure(
|
verifyNodeStartFailure(
|
||||||
"Recieved SHUTDOWN signal from Resourcemanager ,"
|
"Recieved SHUTDOWN signal from Resourcemanager, "
|
||||||
+ "Registration of NodeManager failed, "
|
+ "Registration of NodeManager failed, "
|
||||||
+ "Message from ResourceManager: RM Shutting Down Node");
|
+ "Message from ResourceManager: RM Shutting Down Node");
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,281 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.nodemanager;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertNull;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.nio.ByteBuffer;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.Set;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.service.ServiceOperations;
|
||||||
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
|
import org.apache.hadoop.yarn.event.Dispatcher;
|
||||||
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
|
import org.apache.hadoop.yarn.factories.RecordFactory;
|
||||||
|
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||||
|
import org.apache.hadoop.yarn.nodelabels.NodeLabelTestBase;
|
||||||
|
import org.apache.hadoop.yarn.server.api.ResourceTracker;
|
||||||
|
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
|
||||||
|
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
|
||||||
|
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
|
||||||
|
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
|
||||||
|
import org.apache.hadoop.yarn.server.api.records.MasterKey;
|
||||||
|
import org.apache.hadoop.yarn.server.api.records.NodeAction;
|
||||||
|
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
|
||||||
|
import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl;
|
||||||
|
import org.apache.hadoop.yarn.server.nodemanager.nodelabels.NodeLabelsProvider;
|
||||||
|
import org.apache.hadoop.yarn.server.utils.YarnServerBuilderUtils;
|
||||||
|
import org.junit.After;
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
public class TestNodeStatusUpdaterForLabels extends NodeLabelTestBase {
|
||||||
|
private static final RecordFactory recordFactory = RecordFactoryProvider
|
||||||
|
.getRecordFactory(null);
|
||||||
|
|
||||||
|
private NodeManager nm;
|
||||||
|
protected DummyNodeLabelsProvider dummyLabelsProviderRef;
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setup() {
|
||||||
|
dummyLabelsProviderRef = new DummyNodeLabelsProvider();
|
||||||
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
|
public void tearDown() {
|
||||||
|
if (null != nm) {
|
||||||
|
ServiceOperations.stop(nm);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private class ResourceTrackerForLabels implements ResourceTracker {
|
||||||
|
int heartbeatID = 0;
|
||||||
|
Set<String> labels;
|
||||||
|
|
||||||
|
private boolean receivedNMHeartbeat = false;
|
||||||
|
private boolean receivedNMRegister = false;
|
||||||
|
|
||||||
|
private MasterKey createMasterKey() {
|
||||||
|
MasterKey masterKey = new MasterKeyPBImpl();
|
||||||
|
masterKey.setKeyId(123);
|
||||||
|
masterKey.setBytes(ByteBuffer.wrap(new byte[] { new Integer(123)
|
||||||
|
.byteValue() }));
|
||||||
|
return masterKey;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public RegisterNodeManagerResponse registerNodeManager(
|
||||||
|
RegisterNodeManagerRequest request) throws YarnException, IOException {
|
||||||
|
labels = request.getNodeLabels();
|
||||||
|
RegisterNodeManagerResponse response =
|
||||||
|
recordFactory.newRecordInstance(RegisterNodeManagerResponse.class);
|
||||||
|
response.setNodeAction(NodeAction.NORMAL);
|
||||||
|
response.setContainerTokenMasterKey(createMasterKey());
|
||||||
|
response.setNMTokenMasterKey(createMasterKey());
|
||||||
|
response.setAreNodeLabelsAcceptedByRM(labels != null);
|
||||||
|
synchronized (ResourceTrackerForLabels.class) {
|
||||||
|
receivedNMRegister = true;
|
||||||
|
ResourceTrackerForLabels.class.notifyAll();
|
||||||
|
}
|
||||||
|
return response;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void waitTillHeartbeat() {
|
||||||
|
if (receivedNMHeartbeat) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
int i = 500;
|
||||||
|
while (!receivedNMHeartbeat && i > 0) {
|
||||||
|
synchronized (ResourceTrackerForLabels.class) {
|
||||||
|
if (!receivedNMHeartbeat) {
|
||||||
|
try {
|
||||||
|
System.out
|
||||||
|
.println("In ResourceTrackerForLabels waiting for heartbeat : "
|
||||||
|
+ System.currentTimeMillis());
|
||||||
|
ResourceTrackerForLabels.class.wait(500l);
|
||||||
|
// to avoid race condition, i.e. sendOutofBandHeartBeat can be
|
||||||
|
// sent before NSU thread has gone to sleep, hence we wait and try
|
||||||
|
// to resend heartbeat again
|
||||||
|
nm.getNodeStatusUpdater().sendOutofBandHeartBeat();
|
||||||
|
ResourceTrackerForLabels.class.wait(500l);
|
||||||
|
i--;
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
Assert.fail("Exception caught while waiting for Heartbeat");
|
||||||
|
e.printStackTrace();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (!receivedNMHeartbeat) {
|
||||||
|
Assert.fail("Heartbeat dint receive even after waiting");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void waitTillRegister() {
|
||||||
|
if (receivedNMRegister) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
while (!receivedNMRegister) {
|
||||||
|
synchronized (ResourceTrackerForLabels.class) {
|
||||||
|
try {
|
||||||
|
ResourceTrackerForLabels.class.wait();
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
Assert.fail("Exception caught while waiting for register");
|
||||||
|
e.printStackTrace();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Flag to indicate received any
|
||||||
|
*/
|
||||||
|
public void resetNMHeartbeatReceiveFlag() {
|
||||||
|
synchronized (ResourceTrackerForLabels.class) {
|
||||||
|
receivedNMHeartbeat = false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
|
||||||
|
throws YarnException, IOException {
|
||||||
|
System.out.println("RTS receive heartbeat : "
|
||||||
|
+ System.currentTimeMillis());
|
||||||
|
labels = request.getNodeLabels();
|
||||||
|
NodeStatus nodeStatus = request.getNodeStatus();
|
||||||
|
nodeStatus.setResponseId(heartbeatID++);
|
||||||
|
|
||||||
|
NodeHeartbeatResponse nhResponse =
|
||||||
|
YarnServerBuilderUtils.newNodeHeartbeatResponse(heartbeatID,
|
||||||
|
NodeAction.NORMAL, null, null, null, null, 1000L);
|
||||||
|
|
||||||
|
// to ensure that heartbeats are sent only when required.
|
||||||
|
nhResponse.setNextHeartBeatInterval(Long.MAX_VALUE);
|
||||||
|
nhResponse.setAreNodeLabelsAcceptedByRM(labels != null);
|
||||||
|
|
||||||
|
synchronized (ResourceTrackerForLabels.class) {
|
||||||
|
receivedNMHeartbeat = true;
|
||||||
|
ResourceTrackerForLabels.class.notifyAll();
|
||||||
|
}
|
||||||
|
return nhResponse;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public static class DummyNodeLabelsProvider extends NodeLabelsProvider {
|
||||||
|
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
private Set<String> nodeLabels = Collections.EMPTY_SET;
|
||||||
|
|
||||||
|
public DummyNodeLabelsProvider() {
|
||||||
|
super(DummyNodeLabelsProvider.class.getName());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public synchronized Set<String> getNodeLabels() {
|
||||||
|
return nodeLabels;
|
||||||
|
}
|
||||||
|
|
||||||
|
synchronized void setNodeLabels(Set<String> nodeLabels) {
|
||||||
|
this.nodeLabels = nodeLabels;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private YarnConfiguration createNMConfigForDistributeNodeLabels() {
|
||||||
|
YarnConfiguration conf = new YarnConfiguration();
|
||||||
|
conf.set(YarnConfiguration.NODELABEL_CONFIGURATION_TYPE,
|
||||||
|
YarnConfiguration.DISTRIBUTED_NODELABEL_CONFIGURATION_TYPE);
|
||||||
|
return conf;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testNodeStatusUpdaterForNodeLabels() throws InterruptedException,
|
||||||
|
IOException {
|
||||||
|
final ResourceTrackerForLabels resourceTracker =
|
||||||
|
new ResourceTrackerForLabels();
|
||||||
|
nm = new NodeManager() {
|
||||||
|
@Override
|
||||||
|
protected NodeLabelsProvider createNodeLabelsProvider(
|
||||||
|
Configuration conf) throws IOException {
|
||||||
|
return dummyLabelsProviderRef;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected NodeStatusUpdater createNodeStatusUpdater(Context context,
|
||||||
|
Dispatcher dispatcher, NodeHealthCheckerService healthChecker,
|
||||||
|
NodeLabelsProvider labelsProvider) {
|
||||||
|
|
||||||
|
return new NodeStatusUpdaterImpl(context, dispatcher, healthChecker,
|
||||||
|
metrics, labelsProvider) {
|
||||||
|
@Override
|
||||||
|
protected ResourceTracker getRMClient() {
|
||||||
|
return resourceTracker;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void stopRMProxy() {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
YarnConfiguration conf = createNMConfigForDistributeNodeLabels();
|
||||||
|
nm.init(conf);
|
||||||
|
resourceTracker.resetNMHeartbeatReceiveFlag();
|
||||||
|
nm.start();
|
||||||
|
resourceTracker.waitTillRegister();
|
||||||
|
assertCollectionEquals(resourceTracker.labels,
|
||||||
|
dummyLabelsProviderRef.getNodeLabels());
|
||||||
|
|
||||||
|
resourceTracker.waitTillHeartbeat();// wait till the first heartbeat
|
||||||
|
resourceTracker.resetNMHeartbeatReceiveFlag();
|
||||||
|
|
||||||
|
// heartbeat with updated labels
|
||||||
|
dummyLabelsProviderRef.setNodeLabels(toSet("P"));
|
||||||
|
|
||||||
|
nm.getNodeStatusUpdater().sendOutofBandHeartBeat();
|
||||||
|
resourceTracker.waitTillHeartbeat();
|
||||||
|
assertCollectionEquals(resourceTracker.labels,
|
||||||
|
dummyLabelsProviderRef.getNodeLabels());
|
||||||
|
resourceTracker.resetNMHeartbeatReceiveFlag();
|
||||||
|
|
||||||
|
// heartbeat without updating labels
|
||||||
|
nm.getNodeStatusUpdater().sendOutofBandHeartBeat();
|
||||||
|
resourceTracker.waitTillHeartbeat();
|
||||||
|
resourceTracker.resetNMHeartbeatReceiveFlag();
|
||||||
|
assertNull(
|
||||||
|
"If no change in labels then null should be sent as part of request",
|
||||||
|
resourceTracker.labels);
|
||||||
|
|
||||||
|
// provider return with null labels
|
||||||
|
dummyLabelsProviderRef.setNodeLabels(null);
|
||||||
|
nm.getNodeStatusUpdater().sendOutofBandHeartBeat();
|
||||||
|
resourceTracker.waitTillHeartbeat();
|
||||||
|
assertTrue("If provider sends null then empty labels should be sent",
|
||||||
|
resourceTracker.labels.isEmpty());
|
||||||
|
resourceTracker.resetNMHeartbeatReceiveFlag();
|
||||||
|
|
||||||
|
nm.stop();
|
||||||
|
}
|
||||||
|
}
|
|
@ -21,6 +21,9 @@ import java.io.IOException;
|
||||||
import java.io.InputStream;
|
import java.io.InputStream;
|
||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
import java.nio.ByteBuffer;
|
import java.nio.ByteBuffer;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Set;
|
||||||
import java.util.concurrent.ConcurrentMap;
|
import java.util.concurrent.ConcurrentMap;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
|
@ -31,6 +34,7 @@ import org.apache.hadoop.ipc.Server;
|
||||||
import org.apache.hadoop.net.Node;
|
import org.apache.hadoop.net.Node;
|
||||||
import org.apache.hadoop.security.authorize.PolicyProvider;
|
import org.apache.hadoop.security.authorize.PolicyProvider;
|
||||||
import org.apache.hadoop.service.AbstractService;
|
import org.apache.hadoop.service.AbstractService;
|
||||||
|
import org.apache.hadoop.util.StringUtils;
|
||||||
import org.apache.hadoop.util.VersionUtil;
|
import org.apache.hadoop.util.VersionUtil;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
|
@ -100,6 +104,8 @@ public class ResourceTrackerService extends AbstractService implements
|
||||||
private int minAllocMb;
|
private int minAllocMb;
|
||||||
private int minAllocVcores;
|
private int minAllocVcores;
|
||||||
|
|
||||||
|
private boolean isDistributesNodeLabelsConf;
|
||||||
|
|
||||||
static {
|
static {
|
||||||
resync.setNodeAction(NodeAction.RESYNC);
|
resync.setNodeAction(NodeAction.RESYNC);
|
||||||
|
|
||||||
|
@ -149,6 +155,14 @@ public class ResourceTrackerService extends AbstractService implements
|
||||||
YarnConfiguration.RM_NODEMANAGER_MINIMUM_VERSION,
|
YarnConfiguration.RM_NODEMANAGER_MINIMUM_VERSION,
|
||||||
YarnConfiguration.DEFAULT_RM_NODEMANAGER_MINIMUM_VERSION);
|
YarnConfiguration.DEFAULT_RM_NODEMANAGER_MINIMUM_VERSION);
|
||||||
|
|
||||||
|
String nodeLabelConfigurationType =
|
||||||
|
conf.get(YarnConfiguration.NODELABEL_CONFIGURATION_TYPE,
|
||||||
|
YarnConfiguration.DEFAULT_NODELABEL_CONFIGURATION_TYPE);
|
||||||
|
|
||||||
|
isDistributesNodeLabelsConf =
|
||||||
|
YarnConfiguration.DISTRIBUTED_NODELABEL_CONFIGURATION_TYPE
|
||||||
|
.equals(nodeLabelConfigurationType);
|
||||||
|
|
||||||
super.serviceInit(conf);
|
super.serviceInit(conf);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -336,11 +350,31 @@ public class ResourceTrackerService extends AbstractService implements
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
String message =
|
// Update node's labels to RM's NodeLabelManager.
|
||||||
"NodeManager from node " + host + "(cmPort: " + cmPort + " httpPort: "
|
Set<String> nodeLabels = request.getNodeLabels();
|
||||||
+ httpPort + ") " + "registered with capability: " + capability
|
if (isDistributesNodeLabelsConf && nodeLabels != null) {
|
||||||
+ ", assigned nodeId " + nodeId;
|
try {
|
||||||
LOG.info(message);
|
updateNodeLabelsFromNMReport(nodeLabels, nodeId);
|
||||||
|
response.setAreNodeLabelsAcceptedByRM(true);
|
||||||
|
} catch (IOException ex) {
|
||||||
|
// Ensure the exception is captured in the response
|
||||||
|
response.setDiagnosticsMessage(ex.getMessage());
|
||||||
|
response.setAreNodeLabelsAcceptedByRM(false);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
StringBuilder message = new StringBuilder();
|
||||||
|
message.append("NodeManager from node ").append(host).append("(cmPort: ")
|
||||||
|
.append(cmPort).append(" httpPort: ");
|
||||||
|
message.append(httpPort).append(") ")
|
||||||
|
.append("registered with capability: ").append(capability);
|
||||||
|
message.append(", assigned nodeId ").append(nodeId);
|
||||||
|
if (response.getAreNodeLabelsAcceptedByRM()) {
|
||||||
|
message.append(", node labels { ").append(
|
||||||
|
StringUtils.join(",", nodeLabels) + " } ");
|
||||||
|
}
|
||||||
|
|
||||||
|
LOG.info(message.toString());
|
||||||
response.setNodeAction(NodeAction.NORMAL);
|
response.setNodeAction(NodeAction.NORMAL);
|
||||||
response.setRMIdentifier(ResourceManager.getClusterTimeStamp());
|
response.setRMIdentifier(ResourceManager.getClusterTimeStamp());
|
||||||
response.setRMVersion(YarnVersionInfo.getVersion());
|
response.setRMVersion(YarnVersionInfo.getVersion());
|
||||||
|
@ -359,6 +393,7 @@ public class ResourceTrackerService extends AbstractService implements
|
||||||
* 2. Check if it's a registered node
|
* 2. Check if it's a registered node
|
||||||
* 3. Check if it's a 'fresh' heartbeat i.e. not duplicate heartbeat
|
* 3. Check if it's a 'fresh' heartbeat i.e. not duplicate heartbeat
|
||||||
* 4. Send healthStatus to RMNode
|
* 4. Send healthStatus to RMNode
|
||||||
|
* 5. Update node's labels if distributed Node Labels configuration is enabled
|
||||||
*/
|
*/
|
||||||
|
|
||||||
NodeId nodeId = remoteNodeStatus.getNodeId();
|
NodeId nodeId = remoteNodeStatus.getNodeId();
|
||||||
|
@ -428,9 +463,44 @@ public class ResourceTrackerService extends AbstractService implements
|
||||||
remoteNodeStatus.getContainersStatuses(),
|
remoteNodeStatus.getContainersStatuses(),
|
||||||
remoteNodeStatus.getKeepAliveApplications(), nodeHeartBeatResponse));
|
remoteNodeStatus.getKeepAliveApplications(), nodeHeartBeatResponse));
|
||||||
|
|
||||||
|
// 5. Update node's labels to RM's NodeLabelManager.
|
||||||
|
if (isDistributesNodeLabelsConf && request.getNodeLabels() != null) {
|
||||||
|
try {
|
||||||
|
updateNodeLabelsFromNMReport(request.getNodeLabels(), nodeId);
|
||||||
|
nodeHeartBeatResponse.setAreNodeLabelsAcceptedByRM(true);
|
||||||
|
} catch (IOException ex) {
|
||||||
|
//ensure the error message is captured and sent across in response
|
||||||
|
nodeHeartBeatResponse.setDiagnosticsMessage(ex.getMessage());
|
||||||
|
nodeHeartBeatResponse.setAreNodeLabelsAcceptedByRM(false);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return nodeHeartBeatResponse;
|
return nodeHeartBeatResponse;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void updateNodeLabelsFromNMReport(Set<String> nodeLabels,
|
||||||
|
NodeId nodeId) throws IOException {
|
||||||
|
try {
|
||||||
|
Map<NodeId, Set<String>> labelsUpdate =
|
||||||
|
new HashMap<NodeId, Set<String>>();
|
||||||
|
labelsUpdate.put(nodeId, nodeLabels);
|
||||||
|
this.rmContext.getNodeLabelManager().replaceLabelsOnNode(labelsUpdate);
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug("Node Labels {" + StringUtils.join(",", nodeLabels)
|
||||||
|
+ "} from Node " + nodeId + " were Accepted from RM");
|
||||||
|
}
|
||||||
|
} catch (IOException ex) {
|
||||||
|
StringBuilder errorMessage = new StringBuilder();
|
||||||
|
errorMessage.append("Node Labels {")
|
||||||
|
.append(StringUtils.join(",", nodeLabels))
|
||||||
|
.append("} reported from NM with ID ").append(nodeId)
|
||||||
|
.append(" was rejected from RM with exception message as : ")
|
||||||
|
.append(ex.getMessage());
|
||||||
|
LOG.error(errorMessage, ex);
|
||||||
|
throw new IOException(errorMessage.toString(), ex);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private void populateKeys(NodeHeartbeatRequest request,
|
private void populateKeys(NodeHeartbeatRequest request,
|
||||||
NodeHeartbeatResponse nodeHeartBeatResponse) {
|
NodeHeartbeatResponse nodeHeartBeatResponse) {
|
||||||
|
|
||||||
|
|
|
@ -27,8 +27,10 @@ import java.io.File;
|
||||||
import java.io.FileOutputStream;
|
import java.io.FileOutputStream;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
import java.util.Collections;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.Set;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.io.IOUtils;
|
import org.apache.hadoop.io.IOUtils;
|
||||||
|
@ -49,11 +51,16 @@ import org.apache.hadoop.yarn.event.Dispatcher;
|
||||||
import org.apache.hadoop.yarn.event.DrainDispatcher;
|
import org.apache.hadoop.yarn.event.DrainDispatcher;
|
||||||
import org.apache.hadoop.yarn.event.Event;
|
import org.apache.hadoop.yarn.event.Event;
|
||||||
import org.apache.hadoop.yarn.event.EventHandler;
|
import org.apache.hadoop.yarn.event.EventHandler;
|
||||||
|
import org.apache.hadoop.yarn.nodelabels.NodeLabelTestBase;
|
||||||
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
|
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
|
||||||
|
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
|
||||||
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
|
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
|
||||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
|
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
|
||||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
|
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
|
||||||
import org.apache.hadoop.yarn.server.api.records.NodeAction;
|
import org.apache.hadoop.yarn.server.api.records.NodeAction;
|
||||||
|
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
||||||
|
@ -66,7 +73,7 @@ import org.junit.After;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
public class TestResourceTrackerService {
|
public class TestResourceTrackerService extends NodeLabelTestBase {
|
||||||
|
|
||||||
private final static File TEMP_DIR = new File(System.getProperty(
|
private final static File TEMP_DIR = new File(System.getProperty(
|
||||||
"test.build.data", "/tmp"), "decommision");
|
"test.build.data", "/tmp"), "decommision");
|
||||||
|
@ -305,8 +312,425 @@ public class TestResourceTrackerService {
|
||||||
req.setHttpPort(1234);
|
req.setHttpPort(1234);
|
||||||
req.setNMVersion(YarnVersionInfo.getVersion());
|
req.setNMVersion(YarnVersionInfo.getVersion());
|
||||||
// trying to register a invalid node.
|
// trying to register a invalid node.
|
||||||
RegisterNodeManagerResponse response = resourceTrackerService.registerNodeManager(req);
|
RegisterNodeManagerResponse response =
|
||||||
Assert.assertEquals(NodeAction.NORMAL,response.getNodeAction());
|
resourceTrackerService.registerNodeManager(req);
|
||||||
|
Assert.assertEquals(NodeAction.NORMAL, response.getNodeAction());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testNodeRegistrationWithLabels() throws Exception {
|
||||||
|
writeToHostsFile("host2");
|
||||||
|
Configuration conf = new Configuration();
|
||||||
|
conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH,
|
||||||
|
hostFile.getAbsolutePath());
|
||||||
|
conf.set(YarnConfiguration.NODELABEL_CONFIGURATION_TYPE,
|
||||||
|
YarnConfiguration.DISTRIBUTED_NODELABEL_CONFIGURATION_TYPE);
|
||||||
|
|
||||||
|
final RMNodeLabelsManager nodeLabelsMgr = new NullRMNodeLabelsManager();
|
||||||
|
|
||||||
|
rm = new MockRM(conf) {
|
||||||
|
@Override
|
||||||
|
protected RMNodeLabelsManager createNodeLabelManager() {
|
||||||
|
return nodeLabelsMgr;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
rm.start();
|
||||||
|
|
||||||
|
try {
|
||||||
|
nodeLabelsMgr.addToCluserNodeLabels(toSet("A", "B", "C"));
|
||||||
|
} catch (IOException e) {
|
||||||
|
Assert.fail("Caught Exception while intializing");
|
||||||
|
e.printStackTrace();
|
||||||
|
}
|
||||||
|
|
||||||
|
ResourceTrackerService resourceTrackerService =
|
||||||
|
rm.getResourceTrackerService();
|
||||||
|
RegisterNodeManagerRequest registerReq =
|
||||||
|
Records.newRecord(RegisterNodeManagerRequest.class);
|
||||||
|
NodeId nodeId = NodeId.newInstance("host2", 1234);
|
||||||
|
Resource capability = BuilderUtils.newResource(1024, 1);
|
||||||
|
registerReq.setResource(capability);
|
||||||
|
registerReq.setNodeId(nodeId);
|
||||||
|
registerReq.setHttpPort(1234);
|
||||||
|
registerReq.setNMVersion(YarnVersionInfo.getVersion());
|
||||||
|
registerReq.setNodeLabels(toSet("A"));
|
||||||
|
RegisterNodeManagerResponse response =
|
||||||
|
resourceTrackerService.registerNodeManager(registerReq);
|
||||||
|
|
||||||
|
Assert.assertEquals("Action should be normal on valid Node Labels",
|
||||||
|
NodeAction.NORMAL, response.getNodeAction());
|
||||||
|
assertCollectionEquals(nodeLabelsMgr.getNodeLabels().get(nodeId),
|
||||||
|
registerReq.getNodeLabels());
|
||||||
|
Assert.assertTrue("Valid Node Labels were not accepted by RM",
|
||||||
|
response.getAreNodeLabelsAcceptedByRM());
|
||||||
|
rm.stop();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testNodeRegistrationWithInvalidLabels() throws Exception {
|
||||||
|
writeToHostsFile("host2");
|
||||||
|
Configuration conf = new Configuration();
|
||||||
|
conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH,
|
||||||
|
hostFile.getAbsolutePath());
|
||||||
|
conf.set(YarnConfiguration.NODELABEL_CONFIGURATION_TYPE,
|
||||||
|
YarnConfiguration.DISTRIBUTED_NODELABEL_CONFIGURATION_TYPE);
|
||||||
|
|
||||||
|
final RMNodeLabelsManager nodeLabelsMgr = new NullRMNodeLabelsManager();
|
||||||
|
|
||||||
|
rm = new MockRM(conf) {
|
||||||
|
@Override
|
||||||
|
protected RMNodeLabelsManager createNodeLabelManager() {
|
||||||
|
return nodeLabelsMgr;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
rm.start();
|
||||||
|
|
||||||
|
try {
|
||||||
|
nodeLabelsMgr.addToCluserNodeLabels(toSet("X", "Y", "Z"));
|
||||||
|
} catch (IOException e) {
|
||||||
|
Assert.fail("Caught Exception while intializing");
|
||||||
|
e.printStackTrace();
|
||||||
|
}
|
||||||
|
|
||||||
|
ResourceTrackerService resourceTrackerService =
|
||||||
|
rm.getResourceTrackerService();
|
||||||
|
RegisterNodeManagerRequest registerReq =
|
||||||
|
Records.newRecord(RegisterNodeManagerRequest.class);
|
||||||
|
NodeId nodeId = NodeId.newInstance("host2", 1234);
|
||||||
|
Resource capability = BuilderUtils.newResource(1024, 1);
|
||||||
|
registerReq.setResource(capability);
|
||||||
|
registerReq.setNodeId(nodeId);
|
||||||
|
registerReq.setHttpPort(1234);
|
||||||
|
registerReq.setNMVersion(YarnVersionInfo.getVersion());
|
||||||
|
registerReq.setNodeLabels(toSet("A", "B", "C"));
|
||||||
|
RegisterNodeManagerResponse response =
|
||||||
|
resourceTrackerService.registerNodeManager(registerReq);
|
||||||
|
|
||||||
|
Assert.assertEquals(
|
||||||
|
"On Invalid Node Labels action is expected to be normal",
|
||||||
|
NodeAction.NORMAL, response.getNodeAction());
|
||||||
|
Assert.assertNull(nodeLabelsMgr.getNodeLabels().get(nodeId));
|
||||||
|
Assert.assertNotNull(response.getDiagnosticsMessage());
|
||||||
|
Assert.assertFalse("Node Labels should not accepted by RM If Invalid",
|
||||||
|
response.getAreNodeLabelsAcceptedByRM());
|
||||||
|
|
||||||
|
if (rm != null) {
|
||||||
|
rm.stop();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testNodeRegistrationWithInvalidLabelsSyntax() throws Exception {
|
||||||
|
writeToHostsFile("host2");
|
||||||
|
Configuration conf = new Configuration();
|
||||||
|
conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH,
|
||||||
|
hostFile.getAbsolutePath());
|
||||||
|
conf.set(YarnConfiguration.NODELABEL_CONFIGURATION_TYPE,
|
||||||
|
YarnConfiguration.DISTRIBUTED_NODELABEL_CONFIGURATION_TYPE);
|
||||||
|
|
||||||
|
final RMNodeLabelsManager nodeLabelsMgr = new NullRMNodeLabelsManager();
|
||||||
|
|
||||||
|
rm = new MockRM(conf) {
|
||||||
|
@Override
|
||||||
|
protected RMNodeLabelsManager createNodeLabelManager() {
|
||||||
|
return nodeLabelsMgr;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
rm.start();
|
||||||
|
|
||||||
|
try {
|
||||||
|
nodeLabelsMgr.addToCluserNodeLabels(toSet("X", "Y", "Z"));
|
||||||
|
} catch (IOException e) {
|
||||||
|
Assert.fail("Caught Exception while intializing");
|
||||||
|
e.printStackTrace();
|
||||||
|
}
|
||||||
|
|
||||||
|
ResourceTrackerService resourceTrackerService =
|
||||||
|
rm.getResourceTrackerService();
|
||||||
|
RegisterNodeManagerRequest req =
|
||||||
|
Records.newRecord(RegisterNodeManagerRequest.class);
|
||||||
|
NodeId nodeId = NodeId.newInstance("host2", 1234);
|
||||||
|
Resource capability = BuilderUtils.newResource(1024, 1);
|
||||||
|
req.setResource(capability);
|
||||||
|
req.setNodeId(nodeId);
|
||||||
|
req.setHttpPort(1234);
|
||||||
|
req.setNMVersion(YarnVersionInfo.getVersion());
|
||||||
|
req.setNodeLabels(toSet("#Y"));
|
||||||
|
RegisterNodeManagerResponse response =
|
||||||
|
resourceTrackerService.registerNodeManager(req);
|
||||||
|
|
||||||
|
Assert.assertEquals(
|
||||||
|
"On Invalid Node Labels action is expected to be normal",
|
||||||
|
NodeAction.NORMAL, response.getNodeAction());
|
||||||
|
Assert.assertNull(nodeLabelsMgr.getNodeLabels().get(nodeId));
|
||||||
|
Assert.assertNotNull(response.getDiagnosticsMessage());
|
||||||
|
Assert.assertFalse("Node Labels should not accepted by RM If Invalid",
|
||||||
|
response.getAreNodeLabelsAcceptedByRM());
|
||||||
|
|
||||||
|
if (rm != null) {
|
||||||
|
rm.stop();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testNodeRegistrationWithCentralLabelConfig() throws Exception {
|
||||||
|
writeToHostsFile("host2");
|
||||||
|
Configuration conf = new Configuration();
|
||||||
|
conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH,
|
||||||
|
hostFile.getAbsolutePath());
|
||||||
|
conf.set(YarnConfiguration.NODELABEL_CONFIGURATION_TYPE,
|
||||||
|
YarnConfiguration.DEFAULT_NODELABEL_CONFIGURATION_TYPE);
|
||||||
|
|
||||||
|
final RMNodeLabelsManager nodeLabelsMgr = new NullRMNodeLabelsManager();
|
||||||
|
|
||||||
|
rm = new MockRM(conf) {
|
||||||
|
@Override
|
||||||
|
protected RMNodeLabelsManager createNodeLabelManager() {
|
||||||
|
return nodeLabelsMgr;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
rm.start();
|
||||||
|
try {
|
||||||
|
nodeLabelsMgr.addToCluserNodeLabels(toSet("A", "B", "C"));
|
||||||
|
} catch (IOException e) {
|
||||||
|
Assert.fail("Caught Exception while intializing");
|
||||||
|
e.printStackTrace();
|
||||||
|
}
|
||||||
|
ResourceTrackerService resourceTrackerService =
|
||||||
|
rm.getResourceTrackerService();
|
||||||
|
RegisterNodeManagerRequest req =
|
||||||
|
Records.newRecord(RegisterNodeManagerRequest.class);
|
||||||
|
NodeId nodeId = NodeId.newInstance("host2", 1234);
|
||||||
|
Resource capability = BuilderUtils.newResource(1024, 1);
|
||||||
|
req.setResource(capability);
|
||||||
|
req.setNodeId(nodeId);
|
||||||
|
req.setHttpPort(1234);
|
||||||
|
req.setNMVersion(YarnVersionInfo.getVersion());
|
||||||
|
req.setNodeLabels(toSet("A"));
|
||||||
|
RegisterNodeManagerResponse response =
|
||||||
|
resourceTrackerService.registerNodeManager(req);
|
||||||
|
// registered to RM with central label config
|
||||||
|
Assert.assertEquals(NodeAction.NORMAL, response.getNodeAction());
|
||||||
|
Assert.assertNull(nodeLabelsMgr.getNodeLabels().get(nodeId));
|
||||||
|
Assert
|
||||||
|
.assertFalse(
|
||||||
|
"Node Labels should not accepted by RM If its configured with Central configuration",
|
||||||
|
response.getAreNodeLabelsAcceptedByRM());
|
||||||
|
if (rm != null) {
|
||||||
|
rm.stop();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
private NodeStatus getNodeStatusObject(NodeId nodeId) {
|
||||||
|
NodeStatus status = Records.newRecord(NodeStatus.class);
|
||||||
|
status.setNodeId(nodeId);
|
||||||
|
status.setResponseId(0);
|
||||||
|
status.setContainersStatuses(Collections.EMPTY_LIST);
|
||||||
|
status.setKeepAliveApplications(Collections.EMPTY_LIST);
|
||||||
|
return status;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testNodeHeartBeatWithLabels() throws Exception {
|
||||||
|
writeToHostsFile("host2");
|
||||||
|
Configuration conf = new Configuration();
|
||||||
|
conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH,
|
||||||
|
hostFile.getAbsolutePath());
|
||||||
|
conf.set(YarnConfiguration.NODELABEL_CONFIGURATION_TYPE,
|
||||||
|
YarnConfiguration.DISTRIBUTED_NODELABEL_CONFIGURATION_TYPE);
|
||||||
|
|
||||||
|
final RMNodeLabelsManager nodeLabelsMgr = new NullRMNodeLabelsManager();
|
||||||
|
|
||||||
|
rm = new MockRM(conf) {
|
||||||
|
@Override
|
||||||
|
protected RMNodeLabelsManager createNodeLabelManager() {
|
||||||
|
return nodeLabelsMgr;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
rm.start();
|
||||||
|
// adding valid labels
|
||||||
|
try {
|
||||||
|
nodeLabelsMgr.addToCluserNodeLabels(toSet("A", "B", "C"));
|
||||||
|
} catch (IOException e) {
|
||||||
|
Assert.fail("Caught Exception while intializing");
|
||||||
|
e.printStackTrace();
|
||||||
|
}
|
||||||
|
|
||||||
|
// Registering of labels and other required info to RM
|
||||||
|
ResourceTrackerService resourceTrackerService =
|
||||||
|
rm.getResourceTrackerService();
|
||||||
|
RegisterNodeManagerRequest registerReq =
|
||||||
|
Records.newRecord(RegisterNodeManagerRequest.class);
|
||||||
|
NodeId nodeId = NodeId.newInstance("host2", 1234);
|
||||||
|
Resource capability = BuilderUtils.newResource(1024, 1);
|
||||||
|
registerReq.setResource(capability);
|
||||||
|
registerReq.setNodeId(nodeId);
|
||||||
|
registerReq.setHttpPort(1234);
|
||||||
|
registerReq.setNMVersion(YarnVersionInfo.getVersion());
|
||||||
|
registerReq.setNodeLabels(toSet("A")); // Node register label
|
||||||
|
RegisterNodeManagerResponse registerResponse =
|
||||||
|
resourceTrackerService.registerNodeManager(registerReq);
|
||||||
|
|
||||||
|
// modification of labels during heartbeat
|
||||||
|
NodeHeartbeatRequest heartbeatReq =
|
||||||
|
Records.newRecord(NodeHeartbeatRequest.class);
|
||||||
|
heartbeatReq.setNodeLabels(toSet("B")); // Node heartbeat label update
|
||||||
|
NodeStatus nodeStatusObject = getNodeStatusObject(nodeId);
|
||||||
|
heartbeatReq.setNodeStatus(nodeStatusObject);
|
||||||
|
heartbeatReq.setLastKnownNMTokenMasterKey(registerResponse
|
||||||
|
.getNMTokenMasterKey());
|
||||||
|
heartbeatReq.setLastKnownContainerTokenMasterKey(registerResponse
|
||||||
|
.getContainerTokenMasterKey());
|
||||||
|
NodeHeartbeatResponse nodeHeartbeatResponse =
|
||||||
|
resourceTrackerService.nodeHeartbeat(heartbeatReq);
|
||||||
|
|
||||||
|
Assert.assertEquals("InValid Node Labels were not accepted by RM",
|
||||||
|
NodeAction.NORMAL, nodeHeartbeatResponse.getNodeAction());
|
||||||
|
assertCollectionEquals(nodeLabelsMgr.getNodeLabels().get(nodeId),
|
||||||
|
heartbeatReq.getNodeLabels());
|
||||||
|
Assert.assertTrue("Valid Node Labels were not accepted by RM",
|
||||||
|
nodeHeartbeatResponse.getAreNodeLabelsAcceptedByRM());
|
||||||
|
|
||||||
|
// After modification of labels next heartbeat sends null informing no update
|
||||||
|
Set<String> oldLabels = nodeLabelsMgr.getNodeLabels().get(nodeId);
|
||||||
|
int responseId = nodeStatusObject.getResponseId();
|
||||||
|
heartbeatReq =
|
||||||
|
Records.newRecord(NodeHeartbeatRequest.class);
|
||||||
|
heartbeatReq.setNodeLabels(null); // Node heartbeat label update
|
||||||
|
nodeStatusObject = getNodeStatusObject(nodeId);
|
||||||
|
nodeStatusObject.setResponseId(responseId+2);
|
||||||
|
heartbeatReq.setNodeStatus(nodeStatusObject);
|
||||||
|
heartbeatReq.setLastKnownNMTokenMasterKey(registerResponse
|
||||||
|
.getNMTokenMasterKey());
|
||||||
|
heartbeatReq.setLastKnownContainerTokenMasterKey(registerResponse
|
||||||
|
.getContainerTokenMasterKey());
|
||||||
|
nodeHeartbeatResponse = resourceTrackerService.nodeHeartbeat(heartbeatReq);
|
||||||
|
|
||||||
|
Assert.assertEquals("InValid Node Labels were not accepted by RM",
|
||||||
|
NodeAction.NORMAL, nodeHeartbeatResponse.getNodeAction());
|
||||||
|
assertCollectionEquals(nodeLabelsMgr.getNodeLabels().get(nodeId),
|
||||||
|
oldLabels);
|
||||||
|
Assert.assertFalse("Node Labels should not accepted by RM",
|
||||||
|
nodeHeartbeatResponse.getAreNodeLabelsAcceptedByRM());
|
||||||
|
rm.stop();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testNodeHeartBeatWithInvalidLabels() throws Exception {
|
||||||
|
writeToHostsFile("host2");
|
||||||
|
Configuration conf = new Configuration();
|
||||||
|
conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH,
|
||||||
|
hostFile.getAbsolutePath());
|
||||||
|
conf.set(YarnConfiguration.NODELABEL_CONFIGURATION_TYPE,
|
||||||
|
YarnConfiguration.DISTRIBUTED_NODELABEL_CONFIGURATION_TYPE);
|
||||||
|
|
||||||
|
final RMNodeLabelsManager nodeLabelsMgr = new NullRMNodeLabelsManager();
|
||||||
|
|
||||||
|
rm = new MockRM(conf) {
|
||||||
|
@Override
|
||||||
|
protected RMNodeLabelsManager createNodeLabelManager() {
|
||||||
|
return nodeLabelsMgr;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
rm.start();
|
||||||
|
|
||||||
|
try {
|
||||||
|
nodeLabelsMgr.addToCluserNodeLabels(toSet("A", "B", "C"));
|
||||||
|
} catch (IOException e) {
|
||||||
|
Assert.fail("Caught Exception while intializing");
|
||||||
|
e.printStackTrace();
|
||||||
|
}
|
||||||
|
|
||||||
|
ResourceTrackerService resourceTrackerService =
|
||||||
|
rm.getResourceTrackerService();
|
||||||
|
RegisterNodeManagerRequest registerReq =
|
||||||
|
Records.newRecord(RegisterNodeManagerRequest.class);
|
||||||
|
NodeId nodeId = NodeId.newInstance("host2", 1234);
|
||||||
|
Resource capability = BuilderUtils.newResource(1024, 1);
|
||||||
|
registerReq.setResource(capability);
|
||||||
|
registerReq.setNodeId(nodeId);
|
||||||
|
registerReq.setHttpPort(1234);
|
||||||
|
registerReq.setNMVersion(YarnVersionInfo.getVersion());
|
||||||
|
registerReq.setNodeLabels(toSet("A"));
|
||||||
|
RegisterNodeManagerResponse registerResponse =
|
||||||
|
resourceTrackerService.registerNodeManager(registerReq);
|
||||||
|
|
||||||
|
NodeHeartbeatRequest heartbeatReq =
|
||||||
|
Records.newRecord(NodeHeartbeatRequest.class);
|
||||||
|
heartbeatReq.setNodeLabels(toSet("B", "#C")); // Invalid heart beat labels
|
||||||
|
heartbeatReq.setNodeStatus(getNodeStatusObject(nodeId));
|
||||||
|
heartbeatReq.setLastKnownNMTokenMasterKey(registerResponse
|
||||||
|
.getNMTokenMasterKey());
|
||||||
|
heartbeatReq.setLastKnownContainerTokenMasterKey(registerResponse
|
||||||
|
.getContainerTokenMasterKey());
|
||||||
|
NodeHeartbeatResponse nodeHeartbeatResponse =
|
||||||
|
resourceTrackerService.nodeHeartbeat(heartbeatReq);
|
||||||
|
|
||||||
|
// response should be NORMAL when RM heartbeat labels are rejected
|
||||||
|
Assert.assertEquals("Response should be NORMAL when RM heartbeat labels"
|
||||||
|
+ " are rejected", NodeAction.NORMAL,
|
||||||
|
nodeHeartbeatResponse.getNodeAction());
|
||||||
|
Assert.assertFalse(nodeHeartbeatResponse.getAreNodeLabelsAcceptedByRM());
|
||||||
|
Assert.assertNotNull(nodeHeartbeatResponse.getDiagnosticsMessage());
|
||||||
|
rm.stop();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testNodeHeartbeatWithCentralLabelConfig() throws Exception {
|
||||||
|
writeToHostsFile("host2");
|
||||||
|
Configuration conf = new Configuration();
|
||||||
|
conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH,
|
||||||
|
hostFile.getAbsolutePath());
|
||||||
|
conf.set(YarnConfiguration.NODELABEL_CONFIGURATION_TYPE,
|
||||||
|
YarnConfiguration.DEFAULT_NODELABEL_CONFIGURATION_TYPE);
|
||||||
|
|
||||||
|
final RMNodeLabelsManager nodeLabelsMgr = new NullRMNodeLabelsManager();
|
||||||
|
|
||||||
|
rm = new MockRM(conf) {
|
||||||
|
@Override
|
||||||
|
protected RMNodeLabelsManager createNodeLabelManager() {
|
||||||
|
return nodeLabelsMgr;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
rm.start();
|
||||||
|
|
||||||
|
ResourceTrackerService resourceTrackerService =
|
||||||
|
rm.getResourceTrackerService();
|
||||||
|
RegisterNodeManagerRequest req =
|
||||||
|
Records.newRecord(RegisterNodeManagerRequest.class);
|
||||||
|
NodeId nodeId = NodeId.newInstance("host2", 1234);
|
||||||
|
Resource capability = BuilderUtils.newResource(1024, 1);
|
||||||
|
req.setResource(capability);
|
||||||
|
req.setNodeId(nodeId);
|
||||||
|
req.setHttpPort(1234);
|
||||||
|
req.setNMVersion(YarnVersionInfo.getVersion());
|
||||||
|
req.setNodeLabels(toSet("A", "B", "C"));
|
||||||
|
RegisterNodeManagerResponse registerResponse =
|
||||||
|
resourceTrackerService.registerNodeManager(req);
|
||||||
|
|
||||||
|
NodeHeartbeatRequest heartbeatReq =
|
||||||
|
Records.newRecord(NodeHeartbeatRequest.class);
|
||||||
|
heartbeatReq.setNodeLabels(toSet("B")); // Valid heart beat labels
|
||||||
|
heartbeatReq.setNodeStatus(getNodeStatusObject(nodeId));
|
||||||
|
heartbeatReq.setLastKnownNMTokenMasterKey(registerResponse
|
||||||
|
.getNMTokenMasterKey());
|
||||||
|
heartbeatReq.setLastKnownContainerTokenMasterKey(registerResponse
|
||||||
|
.getContainerTokenMasterKey());
|
||||||
|
NodeHeartbeatResponse nodeHeartbeatResponse =
|
||||||
|
resourceTrackerService.nodeHeartbeat(heartbeatReq);
|
||||||
|
|
||||||
|
// response should be ok but the RMacceptNodeLabelsUpdate should be false
|
||||||
|
Assert.assertEquals(NodeAction.NORMAL,
|
||||||
|
nodeHeartbeatResponse.getNodeAction());
|
||||||
|
// no change in the labels,
|
||||||
|
Assert.assertNull(nodeLabelsMgr.getNodeLabels().get(nodeId));
|
||||||
|
// heartbeat labels rejected
|
||||||
|
Assert.assertFalse("Invalid Node Labels should not accepted by RM",
|
||||||
|
nodeHeartbeatResponse.getAreNodeLabelsAcceptedByRM());
|
||||||
|
if (rm != null) {
|
||||||
|
rm.stop();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
|
Loading…
Reference in New Issue