MAPREDUCE-2775. Fixed ResourceManager and NodeManager to force a decommissioned node to shutdown. Contributed by Devaraj K.
svn merge -c r1190467 --ignore-ancestry ../../trunk/ git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1190468 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
6d1cf4e956
commit
e20f1826f3
|
@ -1786,6 +1786,9 @@ Release 0.23.0 - Unreleased
|
||||||
|
|
||||||
MAPREDUCE-3296. Fixed the remaining nine FindBugs warnings. (vinodkv)
|
MAPREDUCE-3296. Fixed the remaining nine FindBugs warnings. (vinodkv)
|
||||||
|
|
||||||
|
MAPREDUCE-2775. Fixed ResourceManager and NodeManager to force a
|
||||||
|
decommissioned node to shutdown. (Devaraj K via vinodkv)
|
||||||
|
|
||||||
Release 0.22.0 - Unreleased
|
Release 0.22.0 - Unreleased
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
|
@ -24,7 +24,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
|
|
||||||
public interface HeartbeatResponse {
|
public interface HeartbeatResponse {
|
||||||
int getResponseId();
|
int getResponseId();
|
||||||
boolean getReboot();
|
NodeAction getNodeAction();
|
||||||
|
|
||||||
List<ContainerId> getContainersToCleanupList();
|
List<ContainerId> getContainersToCleanupList();
|
||||||
ContainerId getContainerToCleanup(int index);
|
ContainerId getContainerToCleanup(int index);
|
||||||
|
@ -35,7 +35,7 @@ public interface HeartbeatResponse {
|
||||||
int getApplicationsToCleanupCount();
|
int getApplicationsToCleanupCount();
|
||||||
|
|
||||||
void setResponseId(int responseId);
|
void setResponseId(int responseId);
|
||||||
void setReboot(boolean reboot);
|
void setNodeAction(NodeAction action);
|
||||||
|
|
||||||
void addAllContainersToCleanup(List<ContainerId> containers);
|
void addAllContainersToCleanup(List<ContainerId> containers);
|
||||||
void addContainerToCleanup(ContainerId container);
|
void addContainerToCleanup(ContainerId container);
|
||||||
|
|
|
@ -0,0 +1,28 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.api.records;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The NodeManager is instructed to perform the given action.
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
|
||||||
|
public enum NodeAction {
|
||||||
|
NORMAL, REBOOT, SHUTDOWN
|
||||||
|
}
|
|
@ -23,4 +23,8 @@ public interface RegistrationResponse {
|
||||||
public abstract ByteBuffer getSecretKey();
|
public abstract ByteBuffer getSecretKey();
|
||||||
|
|
||||||
public abstract void setSecretKey(ByteBuffer secretKey);
|
public abstract void setSecretKey(ByteBuffer secretKey);
|
||||||
|
|
||||||
|
public abstract NodeAction getNodeAction();
|
||||||
|
|
||||||
|
public abstract void setNodeAction(NodeAction nodeAction);
|
||||||
}
|
}
|
||||||
|
|
|
@ -32,11 +32,12 @@ import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
|
||||||
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto;
|
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto;
|
||||||
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.HeartbeatResponseProto;
|
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.HeartbeatResponseProto;
|
||||||
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.HeartbeatResponseProtoOrBuilder;
|
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.HeartbeatResponseProtoOrBuilder;
|
||||||
|
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeActionProto;
|
||||||
import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse;
|
import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse;
|
||||||
|
import org.apache.hadoop.yarn.server.api.records.NodeAction;
|
||||||
|
|
||||||
|
public class HeartbeatResponsePBImpl extends
|
||||||
|
ProtoBase<HeartbeatResponseProto> implements HeartbeatResponse {
|
||||||
public class HeartbeatResponsePBImpl extends ProtoBase<HeartbeatResponseProto> implements HeartbeatResponse {
|
|
||||||
HeartbeatResponseProto proto = HeartbeatResponseProto.getDefaultInstance();
|
HeartbeatResponseProto proto = HeartbeatResponseProto.getDefaultInstance();
|
||||||
HeartbeatResponseProto.Builder builder = null;
|
HeartbeatResponseProto.Builder builder = null;
|
||||||
boolean viaProto = false;
|
boolean viaProto = false;
|
||||||
|
@ -100,16 +101,24 @@ public class HeartbeatResponsePBImpl extends ProtoBase<HeartbeatResponseProto> i
|
||||||
builder.setResponseId((responseId));
|
builder.setResponseId((responseId));
|
||||||
}
|
}
|
||||||
@Override
|
@Override
|
||||||
public boolean getReboot() {
|
public NodeAction getNodeAction() {
|
||||||
HeartbeatResponseProtoOrBuilder p = viaProto ? proto : builder;
|
HeartbeatResponseProtoOrBuilder p = viaProto ? proto : builder;
|
||||||
return (p.getReboot());
|
if(!p.hasNodeAction()) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
return (convertFromProtoFormat(p.getNodeAction()));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void setReboot(boolean reboot) {
|
public void setNodeAction(NodeAction nodeAction) {
|
||||||
maybeInitBuilder();
|
maybeInitBuilder();
|
||||||
builder.setReboot((reboot));
|
if (nodeAction == null) {
|
||||||
|
builder.clearNodeAction();
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
builder.setNodeAction(convertToProtoFormat(nodeAction));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public List<ContainerId> getContainersToCleanupList() {
|
public List<ContainerId> getContainersToCleanupList() {
|
||||||
initContainersToCleanup();
|
initContainersToCleanup();
|
||||||
|
@ -296,7 +305,12 @@ public class HeartbeatResponsePBImpl extends ProtoBase<HeartbeatResponseProto> i
|
||||||
private ApplicationIdProto convertToProtoFormat(ApplicationId t) {
|
private ApplicationIdProto convertToProtoFormat(ApplicationId t) {
|
||||||
return ((ApplicationIdPBImpl)t).getProto();
|
return ((ApplicationIdPBImpl)t).getProto();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private NodeAction convertFromProtoFormat(NodeActionProto p) {
|
||||||
|
return NodeAction.valueOf(p.name());
|
||||||
|
}
|
||||||
|
|
||||||
|
private NodeActionProto convertToProtoFormat(NodeAction t) {
|
||||||
|
return NodeActionProto.valueOf(t.name());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -21,17 +21,15 @@ package org.apache.hadoop.yarn.server.api.records.impl.pb;
|
||||||
|
|
||||||
import java.nio.ByteBuffer;
|
import java.nio.ByteBuffer;
|
||||||
|
|
||||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
|
||||||
import org.apache.hadoop.yarn.api.records.ProtoBase;
|
import org.apache.hadoop.yarn.api.records.ProtoBase;
|
||||||
import org.apache.hadoop.yarn.api.records.impl.pb.NodeIdPBImpl;
|
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeActionProto;
|
||||||
import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto;
|
|
||||||
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.RegistrationResponseProto;
|
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.RegistrationResponseProto;
|
||||||
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.RegistrationResponseProtoOrBuilder;
|
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.RegistrationResponseProtoOrBuilder;
|
||||||
|
import org.apache.hadoop.yarn.server.api.records.NodeAction;
|
||||||
import org.apache.hadoop.yarn.server.api.records.RegistrationResponse;
|
import org.apache.hadoop.yarn.server.api.records.RegistrationResponse;
|
||||||
|
|
||||||
|
public class RegistrationResponsePBImpl extends
|
||||||
|
ProtoBase<RegistrationResponseProto> implements RegistrationResponse {
|
||||||
public class RegistrationResponsePBImpl extends ProtoBase<RegistrationResponseProto> implements RegistrationResponse {
|
|
||||||
RegistrationResponseProto proto = RegistrationResponseProto.getDefaultInstance();
|
RegistrationResponseProto proto = RegistrationResponseProto.getDefaultInstance();
|
||||||
RegistrationResponseProto.Builder builder = null;
|
RegistrationResponseProto.Builder builder = null;
|
||||||
boolean viaProto = false;
|
boolean viaProto = false;
|
||||||
|
@ -98,4 +96,31 @@ public class RegistrationResponsePBImpl extends ProtoBase<RegistrationResponsePr
|
||||||
this.secretKey = secretKey;
|
this.secretKey = secretKey;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public NodeAction getNodeAction() {
|
||||||
|
RegistrationResponseProtoOrBuilder p = viaProto ? proto : builder;
|
||||||
|
if(!p.hasNodeAction()) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
return convertFromProtoFormat(p.getNodeAction());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setNodeAction(NodeAction nodeAction) {
|
||||||
|
maybeInitBuilder();
|
||||||
|
if (nodeAction == null) {
|
||||||
|
builder.clearNodeAction();
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
builder.setNodeAction(convertToProtoFormat(nodeAction));
|
||||||
|
}
|
||||||
|
|
||||||
|
private NodeAction convertFromProtoFormat(NodeActionProto p) {
|
||||||
|
return NodeAction.valueOf(p.name());
|
||||||
|
}
|
||||||
|
|
||||||
|
private NodeActionProto convertToProtoFormat(NodeAction t) {
|
||||||
|
return NodeActionProto.valueOf(t.name());
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -23,6 +23,12 @@ option java_generate_equals_and_hash = true;
|
||||||
|
|
||||||
import "yarn_protos.proto";
|
import "yarn_protos.proto";
|
||||||
|
|
||||||
|
enum NodeActionProto {
|
||||||
|
NORMAL = 0;
|
||||||
|
REBOOT = 1;
|
||||||
|
SHUTDOWN = 2;
|
||||||
|
}
|
||||||
|
|
||||||
message NodeStatusProto {
|
message NodeStatusProto {
|
||||||
optional NodeIdProto node_id = 1;
|
optional NodeIdProto node_id = 1;
|
||||||
optional int32 response_id = 2;
|
optional int32 response_id = 2;
|
||||||
|
@ -32,11 +38,12 @@ message NodeStatusProto {
|
||||||
|
|
||||||
message RegistrationResponseProto {
|
message RegistrationResponseProto {
|
||||||
optional bytes secret_key = 1;
|
optional bytes secret_key = 1;
|
||||||
|
optional NodeActionProto nodeAction = 2;
|
||||||
}
|
}
|
||||||
|
|
||||||
message HeartbeatResponseProto {
|
message HeartbeatResponseProto {
|
||||||
optional int32 response_id = 1;
|
optional int32 response_id = 1;
|
||||||
optional bool reboot = 2;
|
optional NodeActionProto nodeAction = 2;
|
||||||
repeated ContainerIdProto containers_to_cleanup = 3;
|
repeated ContainerIdProto containers_to_cleanup = 3;
|
||||||
repeated ApplicationIdProto applications_to_cleanup = 4;
|
repeated ApplicationIdProto applications_to_cleanup = 4;
|
||||||
}
|
}
|
||||||
|
|
|
@ -50,9 +50,11 @@ import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
|
||||||
import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager;
|
import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager;
|
||||||
import org.apache.hadoop.yarn.service.CompositeService;
|
import org.apache.hadoop.yarn.service.CompositeService;
|
||||||
import org.apache.hadoop.yarn.service.Service;
|
import org.apache.hadoop.yarn.service.Service;
|
||||||
|
import org.apache.hadoop.yarn.service.ServiceStateChangeListener;
|
||||||
import org.apache.hadoop.yarn.util.Records;
|
import org.apache.hadoop.yarn.util.Records;
|
||||||
|
|
||||||
public class NodeManager extends CompositeService {
|
public class NodeManager extends CompositeService implements
|
||||||
|
ServiceStateChangeListener {
|
||||||
private static final Log LOG = LogFactory.getLog(NodeManager.class);
|
private static final Log LOG = LogFactory.getLog(NodeManager.class);
|
||||||
protected final NodeManagerMetrics metrics = NodeManagerMetrics.create();
|
protected final NodeManagerMetrics metrics = NodeManagerMetrics.create();
|
||||||
protected ContainerTokenSecretManager containerTokenSecretManager;
|
protected ContainerTokenSecretManager containerTokenSecretManager;
|
||||||
|
@ -123,6 +125,8 @@ public class NodeManager extends CompositeService {
|
||||||
NodeStatusUpdater nodeStatusUpdater =
|
NodeStatusUpdater nodeStatusUpdater =
|
||||||
createNodeStatusUpdater(context, dispatcher, healthChecker,
|
createNodeStatusUpdater(context, dispatcher, healthChecker,
|
||||||
this.containerTokenSecretManager);
|
this.containerTokenSecretManager);
|
||||||
|
|
||||||
|
nodeStatusUpdater.register(this);
|
||||||
|
|
||||||
NodeResourceMonitor nodeResourceMonitor = createNodeResourceMonitor();
|
NodeResourceMonitor nodeResourceMonitor = createNodeResourceMonitor();
|
||||||
addService(nodeResourceMonitor);
|
addService(nodeResourceMonitor);
|
||||||
|
@ -206,6 +210,16 @@ public class NodeManager extends CompositeService {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void stateChanged(Service service) {
|
||||||
|
// Shutdown the Nodemanager when the NodeStatusUpdater is stopped.
|
||||||
|
if (NodeStatusUpdaterImpl.class.getName().equals(service.getName())
|
||||||
|
&& STATE.STOPPED.equals(service.getServiceState())) {
|
||||||
|
stop();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
public static void main(String[] args) {
|
public static void main(String[] args) {
|
||||||
StringUtils.startupShutdownMessage(NodeManager.class, args, LOG);
|
StringUtils.startupShutdownMessage(NodeManager.class, args, LOG);
|
||||||
try {
|
try {
|
||||||
|
@ -220,5 +234,4 @@ public class NodeManager extends CompositeService {
|
||||||
System.exit(-1);
|
System.exit(-1);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -30,8 +30,8 @@ import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.NodeHealthCheckerService;
|
import org.apache.hadoop.NodeHealthCheckerService;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.net.NetUtils;
|
import org.apache.hadoop.net.NetUtils;
|
||||||
import org.apache.hadoop.security.SecurityInfo;
|
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
|
import org.apache.hadoop.yarn.YarnException;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerState;
|
import org.apache.hadoop.yarn.api.records.ContainerState;
|
||||||
|
@ -45,11 +45,11 @@ import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
|
||||||
import org.apache.hadoop.yarn.factories.RecordFactory;
|
import org.apache.hadoop.yarn.factories.RecordFactory;
|
||||||
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||||
import org.apache.hadoop.yarn.ipc.YarnRPC;
|
import org.apache.hadoop.yarn.ipc.YarnRPC;
|
||||||
import org.apache.hadoop.yarn.server.RMNMSecurityInfoClass;
|
|
||||||
import org.apache.hadoop.yarn.server.api.ResourceTracker;
|
import org.apache.hadoop.yarn.server.api.ResourceTracker;
|
||||||
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
|
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
|
||||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
|
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
|
||||||
import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse;
|
import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse;
|
||||||
|
import org.apache.hadoop.yarn.server.api.records.NodeAction;
|
||||||
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
|
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
|
||||||
import org.apache.hadoop.yarn.server.api.records.RegistrationResponse;
|
import org.apache.hadoop.yarn.server.api.records.RegistrationResponse;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
|
||||||
|
@ -160,6 +160,12 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
|
||||||
request.setNodeId(this.nodeId);
|
request.setNodeId(this.nodeId);
|
||||||
RegistrationResponse regResponse =
|
RegistrationResponse regResponse =
|
||||||
this.resourceTracker.registerNodeManager(request).getRegistrationResponse();
|
this.resourceTracker.registerNodeManager(request).getRegistrationResponse();
|
||||||
|
// if the Resourcemanager instructs NM to shutdown.
|
||||||
|
if (NodeAction.SHUTDOWN.equals(regResponse.getNodeAction())) {
|
||||||
|
throw new YarnException(
|
||||||
|
"Recieved SHUTDOWN signal from Resourcemanager ,Registration of NodeManager failed");
|
||||||
|
}
|
||||||
|
|
||||||
if (UserGroupInformation.isSecurityEnabled()) {
|
if (UserGroupInformation.isSecurityEnabled()) {
|
||||||
this.secretKeyBytes = regResponse.getSecretKey().array();
|
this.secretKeyBytes = regResponse.getSecretKey().array();
|
||||||
}
|
}
|
||||||
|
@ -248,10 +254,25 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
|
||||||
NodeStatus nodeStatus = getNodeStatus();
|
NodeStatus nodeStatus = getNodeStatus();
|
||||||
nodeStatus.setResponseId(lastHeartBeatID);
|
nodeStatus.setResponseId(lastHeartBeatID);
|
||||||
|
|
||||||
NodeHeartbeatRequest request = recordFactory.newRecordInstance(NodeHeartbeatRequest.class);
|
NodeHeartbeatRequest request = recordFactory
|
||||||
|
.newRecordInstance(NodeHeartbeatRequest.class);
|
||||||
request.setNodeStatus(nodeStatus);
|
request.setNodeStatus(nodeStatus);
|
||||||
HeartbeatResponse response =
|
HeartbeatResponse response =
|
||||||
resourceTracker.nodeHeartbeat(request).getHeartbeatResponse();
|
resourceTracker.nodeHeartbeat(request).getHeartbeatResponse();
|
||||||
|
if (response.getNodeAction() == NodeAction.SHUTDOWN) {
|
||||||
|
LOG
|
||||||
|
.info("Recieved SHUTDOWN signal from Resourcemanager as part of heartbeat," +
|
||||||
|
" hence shutting down.");
|
||||||
|
NodeStatusUpdaterImpl.this.stop();
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
if (response.getNodeAction() == NodeAction.REBOOT) {
|
||||||
|
LOG.info("Node is out of sync with ResourceManager,"
|
||||||
|
+ " hence shutting down.");
|
||||||
|
NodeStatusUpdaterImpl.this.stop();
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
lastHeartBeatID = response.getResponseId();
|
lastHeartBeatID = response.getResponseId();
|
||||||
List<ContainerId> containersToCleanup = response
|
List<ContainerId> containersToCleanup = response
|
||||||
.getContainersToCleanupList();
|
.getContainersToCleanupList();
|
||||||
|
@ -269,7 +290,6 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
|
||||||
// TODO Better error handling. Thread can die with the rest of the
|
// TODO Better error handling. Thread can die with the rest of the
|
||||||
// NM still running.
|
// NM still running.
|
||||||
LOG.error("Caught exception in status-updater", e);
|
LOG.error("Caught exception in status-updater", e);
|
||||||
break;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -53,6 +53,7 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
|
||||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
|
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
|
||||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
|
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
|
||||||
import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse;
|
import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse;
|
||||||
|
import org.apache.hadoop.yarn.server.api.records.NodeAction;
|
||||||
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
|
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
|
||||||
import org.apache.hadoop.yarn.server.api.records.RegistrationResponse;
|
import org.apache.hadoop.yarn.server.api.records.RegistrationResponse;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
|
||||||
|
@ -85,10 +86,15 @@ public class TestNodeStatusUpdater {
|
||||||
volatile Error nmStartError = null;
|
volatile Error nmStartError = null;
|
||||||
private final List<NodeId> registeredNodes = new ArrayList<NodeId>();
|
private final List<NodeId> registeredNodes = new ArrayList<NodeId>();
|
||||||
private final Configuration conf = new YarnConfiguration();
|
private final Configuration conf = new YarnConfiguration();
|
||||||
|
private NodeManager nm;
|
||||||
|
|
||||||
@After
|
@After
|
||||||
public void tearDown() {
|
public void tearDown() {
|
||||||
this.registeredNodes.clear();
|
this.registeredNodes.clear();
|
||||||
|
heartBeatID = 0;
|
||||||
|
if (nm != null) {
|
||||||
|
nm.stop();
|
||||||
|
}
|
||||||
DefaultMetricsSystem.shutdown();
|
DefaultMetricsSystem.shutdown();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -220,6 +226,7 @@ public class TestNodeStatusUpdater {
|
||||||
}
|
}
|
||||||
|
|
||||||
private class MyNodeStatusUpdater extends NodeStatusUpdaterImpl {
|
private class MyNodeStatusUpdater extends NodeStatusUpdaterImpl {
|
||||||
|
public ResourceTracker resourceTracker = new MyResourceTracker(this.context);
|
||||||
private Context context;
|
private Context context;
|
||||||
|
|
||||||
public MyNodeStatusUpdater(Context context, Dispatcher dispatcher,
|
public MyNodeStatusUpdater(Context context, Dispatcher dispatcher,
|
||||||
|
@ -232,10 +239,44 @@ public class TestNodeStatusUpdater {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected ResourceTracker getRMClient() {
|
protected ResourceTracker getRMClient() {
|
||||||
return new MyResourceTracker(this.context);
|
return resourceTracker;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//
|
||||||
|
private class MyResourceTracker2 implements ResourceTracker {
|
||||||
|
public NodeAction heartBeatNodeAction = NodeAction.NORMAL;
|
||||||
|
public NodeAction registerNodeAction = NodeAction.NORMAL;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public RegisterNodeManagerResponse registerNodeManager(
|
||||||
|
RegisterNodeManagerRequest request) throws YarnRemoteException {
|
||||||
|
|
||||||
|
RegisterNodeManagerResponse response = recordFactory
|
||||||
|
.newRecordInstance(RegisterNodeManagerResponse.class);
|
||||||
|
RegistrationResponse regResponse = recordFactory
|
||||||
|
.newRecordInstance(RegistrationResponse.class);
|
||||||
|
regResponse.setNodeAction(registerNodeAction );
|
||||||
|
response.setRegistrationResponse(regResponse);
|
||||||
|
return response;
|
||||||
|
}
|
||||||
|
@Override
|
||||||
|
public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
|
||||||
|
throws YarnRemoteException {
|
||||||
|
NodeStatus nodeStatus = request.getNodeStatus();
|
||||||
|
nodeStatus.setResponseId(heartBeatID++);
|
||||||
|
HeartbeatResponse response = recordFactory
|
||||||
|
.newRecordInstance(HeartbeatResponse.class);
|
||||||
|
response.setResponseId(heartBeatID);
|
||||||
|
response.setNodeAction(heartBeatNodeAction);
|
||||||
|
|
||||||
|
NodeHeartbeatResponse nhResponse = recordFactory
|
||||||
|
.newRecordInstance(NodeHeartbeatResponse.class);
|
||||||
|
nhResponse.setHeartbeatResponse(response);
|
||||||
|
return nhResponse;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Before
|
@Before
|
||||||
public void clearError() {
|
public void clearError() {
|
||||||
nmStartError = null;
|
nmStartError = null;
|
||||||
|
@ -249,7 +290,7 @@ public class TestNodeStatusUpdater {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testNMRegistration() throws InterruptedException {
|
public void testNMRegistration() throws InterruptedException {
|
||||||
final NodeManager nm = new NodeManager() {
|
nm = new NodeManager() {
|
||||||
@Override
|
@Override
|
||||||
protected NodeStatusUpdater createNodeStatusUpdater(Context context,
|
protected NodeStatusUpdater createNodeStatusUpdater(Context context,
|
||||||
Dispatcher dispatcher, NodeHealthCheckerService healthChecker,
|
Dispatcher dispatcher, NodeHealthCheckerService healthChecker,
|
||||||
|
@ -295,14 +336,85 @@ public class TestNodeStatusUpdater {
|
||||||
Assert.fail("NodeManager failed to start");
|
Assert.fail("NodeManager failed to start");
|
||||||
}
|
}
|
||||||
|
|
||||||
while (heartBeatID <= 3) {
|
waitCount = 0;
|
||||||
|
while (heartBeatID <= 3 && waitCount++ != 20) {
|
||||||
Thread.sleep(500);
|
Thread.sleep(500);
|
||||||
}
|
}
|
||||||
|
Assert.assertFalse(heartBeatID <= 3);
|
||||||
Assert.assertEquals("Number of registered NMs is wrong!!", 1,
|
Assert.assertEquals("Number of registered NMs is wrong!!", 1,
|
||||||
this.registeredNodes.size());
|
this.registeredNodes.size());
|
||||||
|
|
||||||
nm.stop();
|
nm.stop();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testNodeDecommision() throws Exception {
|
||||||
|
nm = getNodeManager(NodeAction.SHUTDOWN);
|
||||||
|
YarnConfiguration conf = createNMConfig();
|
||||||
|
nm.init(conf);
|
||||||
|
Assert.assertEquals(STATE.INITED, nm.getServiceState());
|
||||||
|
nm.start();
|
||||||
|
|
||||||
|
int waitCount = 0;
|
||||||
|
while (heartBeatID < 1 && waitCount++ != 20) {
|
||||||
|
Thread.sleep(500);
|
||||||
|
}
|
||||||
|
Assert.assertFalse(heartBeatID < 1);
|
||||||
|
|
||||||
|
// NM takes a while to reach the STOPPED state.
|
||||||
|
waitCount = 0;
|
||||||
|
while (nm.getServiceState() != STATE.STOPPED && waitCount++ != 20) {
|
||||||
|
LOG.info("Waiting for NM to stop..");
|
||||||
|
Thread.sleep(1000);
|
||||||
|
}
|
||||||
|
|
||||||
|
Assert.assertEquals(STATE.STOPPED, nm.getServiceState());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testNodeReboot() throws Exception {
|
||||||
|
nm = getNodeManager(NodeAction.REBOOT);
|
||||||
|
YarnConfiguration conf = createNMConfig();
|
||||||
|
nm.init(conf);
|
||||||
|
Assert.assertEquals(STATE.INITED, nm.getServiceState());
|
||||||
|
nm.start();
|
||||||
|
|
||||||
|
int waitCount = 0;
|
||||||
|
while (heartBeatID < 1 && waitCount++ != 20) {
|
||||||
|
Thread.sleep(500);
|
||||||
|
}
|
||||||
|
Assert.assertFalse(heartBeatID < 1);
|
||||||
|
|
||||||
|
// NM takes a while to reach the STOPPED state.
|
||||||
|
waitCount = 0;
|
||||||
|
while (nm.getServiceState() != STATE.STOPPED && waitCount++ != 20) {
|
||||||
|
LOG.info("Waiting for NM to stop..");
|
||||||
|
Thread.sleep(1000);
|
||||||
|
}
|
||||||
|
|
||||||
|
Assert.assertEquals(STATE.STOPPED, nm.getServiceState());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testNMShutdownForRegistrationFailure() {
|
||||||
|
|
||||||
|
nm = new NodeManager() {
|
||||||
|
@Override
|
||||||
|
protected NodeStatusUpdater createNodeStatusUpdater(Context context,
|
||||||
|
Dispatcher dispatcher, NodeHealthCheckerService healthChecker,
|
||||||
|
ContainerTokenSecretManager containerTokenSecretManager) {
|
||||||
|
MyNodeStatusUpdater nodeStatusUpdater = new MyNodeStatusUpdater(
|
||||||
|
context, dispatcher, healthChecker, metrics,
|
||||||
|
containerTokenSecretManager);
|
||||||
|
MyResourceTracker2 myResourceTracker2 = new MyResourceTracker2();
|
||||||
|
myResourceTracker2.registerNodeAction = NodeAction.SHUTDOWN;
|
||||||
|
nodeStatusUpdater.resourceTracker = myResourceTracker2;
|
||||||
|
return nodeStatusUpdater;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
verifyNodeStartFailure("org.apache.hadoop.yarn.YarnException: "
|
||||||
|
+ "Recieved SHUTDOWN signal from Resourcemanager ,Registration of NodeManager failed");
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Verifies that if for some reason NM fails to start ContainerManager RPC
|
* Verifies that if for some reason NM fails to start ContainerManager RPC
|
||||||
|
@ -314,7 +426,7 @@ public class TestNodeStatusUpdater {
|
||||||
@Test
|
@Test
|
||||||
public void testNoRegistrationWhenNMServicesFail() {
|
public void testNoRegistrationWhenNMServicesFail() {
|
||||||
|
|
||||||
final NodeManager nm = new NodeManager() {
|
nm = new NodeManager() {
|
||||||
@Override
|
@Override
|
||||||
protected NodeStatusUpdater createNodeStatusUpdater(Context context,
|
protected NodeStatusUpdater createNodeStatusUpdater(Context context,
|
||||||
Dispatcher dispatcher, NodeHealthCheckerService healthChecker,
|
Dispatcher dispatcher, NodeHealthCheckerService healthChecker,
|
||||||
|
@ -341,16 +453,22 @@ public class TestNodeStatusUpdater {
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
verifyNodeStartFailure("Starting of RPC Server failed");
|
||||||
|
}
|
||||||
|
|
||||||
|
private void verifyNodeStartFailure(String errMessage) {
|
||||||
YarnConfiguration conf = createNMConfig();
|
YarnConfiguration conf = createNMConfig();
|
||||||
nm.init(conf);
|
nm.init(conf);
|
||||||
try {
|
try {
|
||||||
nm.start();
|
nm.start();
|
||||||
Assert.fail("NM should have failed to start. Didn't get exception!!");
|
Assert.fail("NM should have failed to start. Didn't get exception!!");
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
Assert.assertEquals("Starting of RPC Server failed", e.getCause()
|
Assert.assertEquals(errMessage, e.getCause()
|
||||||
.getMessage());
|
.getMessage());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// the state change to stopped occurs only if the startup is success, else
|
||||||
|
// state change doesn't occur
|
||||||
Assert.assertEquals("NM state is wrong!", Service.STATE.INITED, nm
|
Assert.assertEquals("NM state is wrong!", Service.STATE.INITED, nm
|
||||||
.getServiceState());
|
.getServiceState());
|
||||||
|
|
||||||
|
@ -371,4 +489,21 @@ public class TestNodeStatusUpdater {
|
||||||
.toUri().getPath());
|
.toUri().getPath());
|
||||||
return conf;
|
return conf;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private NodeManager getNodeManager(final NodeAction nodeHeartBeatAction) {
|
||||||
|
return new NodeManager() {
|
||||||
|
@Override
|
||||||
|
protected NodeStatusUpdater createNodeStatusUpdater(Context context,
|
||||||
|
Dispatcher dispatcher, NodeHealthCheckerService healthChecker,
|
||||||
|
ContainerTokenSecretManager containerTokenSecretManager) {
|
||||||
|
MyNodeStatusUpdater myNodeStatusUpdater = new MyNodeStatusUpdater(
|
||||||
|
context, dispatcher, healthChecker, metrics,
|
||||||
|
containerTokenSecretManager);
|
||||||
|
MyResourceTracker2 myResourceTracker2 = new MyResourceTracker2();
|
||||||
|
myResourceTracker2.heartBeatNodeAction = nodeHeartBeatAction;
|
||||||
|
myNodeStatusUpdater.resourceTracker = myResourceTracker2;
|
||||||
|
return myNodeStatusUpdater;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,128 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.resourcemanager;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.metrics2.lib.Interns.info;
|
||||||
|
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
|
import org.apache.hadoop.metrics2.MetricsInfo;
|
||||||
|
import org.apache.hadoop.metrics2.MetricsSystem;
|
||||||
|
import org.apache.hadoop.metrics2.annotation.Metric;
|
||||||
|
import org.apache.hadoop.metrics2.annotation.Metrics;
|
||||||
|
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
|
||||||
|
import org.apache.hadoop.metrics2.lib.MetricsRegistry;
|
||||||
|
import org.apache.hadoop.metrics2.lib.MutableCounterInt;
|
||||||
|
import org.apache.hadoop.metrics2.lib.MutableGaugeInt;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
|
||||||
|
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
@Metrics(context="yarn")
|
||||||
|
public class ClusterMetrics {
|
||||||
|
|
||||||
|
@Metric("# of NMs") MutableGaugeInt numNMs;
|
||||||
|
@Metric("# of decommissioned NMs") MutableCounterInt numDecommissionedNMs;
|
||||||
|
@Metric("# of lost NMs") MutableCounterInt numLostNMs;
|
||||||
|
@Metric("# of unhealthy NMs") MutableGaugeInt numUnhealthyNMs;
|
||||||
|
@Metric("# of Rebooted NMs") MutableGaugeInt numRebootedNMs;
|
||||||
|
|
||||||
|
private static final MetricsInfo RECORD_INFO = info("ClusterMetrics",
|
||||||
|
"Metrics for the Yarn Cluster");
|
||||||
|
|
||||||
|
private static volatile ClusterMetrics INSTANCE = null;
|
||||||
|
private static MetricsRegistry registry;
|
||||||
|
|
||||||
|
public static ClusterMetrics getMetrics() {
|
||||||
|
if(INSTANCE == null){
|
||||||
|
synchronized (ClusterMetrics.class) {
|
||||||
|
if(INSTANCE == null){
|
||||||
|
INSTANCE = new ClusterMetrics();
|
||||||
|
registerMetrics();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return INSTANCE;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void registerMetrics() {
|
||||||
|
registry = new MetricsRegistry(RECORD_INFO);
|
||||||
|
registry.tag(RECORD_INFO, "ResourceManager");
|
||||||
|
MetricsSystem ms = DefaultMetricsSystem.instance();
|
||||||
|
if (ms != null) {
|
||||||
|
ms.register("ClusterMetrics", "Metrics for the Yarn Cluster", INSTANCE);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
//Total Nodemanagers
|
||||||
|
public int getNumNMs() {
|
||||||
|
return numNMs.value();
|
||||||
|
}
|
||||||
|
|
||||||
|
//Decommisioned NMs
|
||||||
|
public int getNumDecommisionedNMs() {
|
||||||
|
return numDecommissionedNMs.value();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void incrDecommisionedNMs() {
|
||||||
|
numDecommissionedNMs.incr();
|
||||||
|
}
|
||||||
|
|
||||||
|
//Lost NMs
|
||||||
|
public int getNumLostNMs() {
|
||||||
|
return numLostNMs.value();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void incrNumLostNMs() {
|
||||||
|
numLostNMs.incr();
|
||||||
|
}
|
||||||
|
|
||||||
|
//Unhealthy NMs
|
||||||
|
public int getUnhealthyNMs() {
|
||||||
|
return numUnhealthyNMs.value();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void incrNumUnhealthyNMs() {
|
||||||
|
numUnhealthyNMs.incr();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void decrNumUnhealthyNMs() {
|
||||||
|
numUnhealthyNMs.decr();
|
||||||
|
}
|
||||||
|
|
||||||
|
//Rebooted NMs
|
||||||
|
public int getNumRebootedNMs() {
|
||||||
|
return numRebootedNMs.value();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void incrNumRebootedNMs() {
|
||||||
|
numRebootedNMs.incr();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void removeNode(RMNodeEventType nodeEventType) {
|
||||||
|
numNMs.decr();
|
||||||
|
switch(nodeEventType){
|
||||||
|
case DECOMMISSION: incrDecommisionedNMs(); break;
|
||||||
|
case EXPIRE: incrNumLostNMs();break;
|
||||||
|
case REBOOTING: incrNumRebootedNMs();break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void addNode() {
|
||||||
|
numNMs.incr();
|
||||||
|
}
|
||||||
|
}
|
|
@ -17,7 +17,6 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.yarn.server.resourcemanager;
|
package org.apache.hadoop.yarn.server.resourcemanager;
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
import java.nio.ByteBuffer;
|
import java.nio.ByteBuffer;
|
||||||
|
|
||||||
|
@ -37,7 +36,6 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
|
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
|
||||||
import org.apache.hadoop.yarn.factories.RecordFactory;
|
import org.apache.hadoop.yarn.factories.RecordFactory;
|
||||||
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||||
import org.apache.hadoop.yarn.ipc.RPCUtil;
|
|
||||||
import org.apache.hadoop.yarn.ipc.YarnRPC;
|
import org.apache.hadoop.yarn.ipc.YarnRPC;
|
||||||
import org.apache.hadoop.yarn.server.api.ResourceTracker;
|
import org.apache.hadoop.yarn.server.api.ResourceTracker;
|
||||||
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
|
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
|
||||||
|
@ -45,6 +43,7 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
|
||||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
|
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
|
||||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
|
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
|
||||||
import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse;
|
import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse;
|
||||||
|
import org.apache.hadoop.yarn.server.api.records.NodeAction;
|
||||||
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
|
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
|
||||||
import org.apache.hadoop.yarn.server.api.records.RegistrationResponse;
|
import org.apache.hadoop.yarn.server.api.records.RegistrationResponse;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.Store.RMState;
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.Store.RMState;
|
||||||
|
@ -76,11 +75,19 @@ public class ResourceTrackerService extends AbstractService implements
|
||||||
|
|
||||||
private static final NodeHeartbeatResponse reboot = recordFactory
|
private static final NodeHeartbeatResponse reboot = recordFactory
|
||||||
.newRecordInstance(NodeHeartbeatResponse.class);
|
.newRecordInstance(NodeHeartbeatResponse.class);
|
||||||
|
private static final NodeHeartbeatResponse shutDown = recordFactory
|
||||||
|
.newRecordInstance(NodeHeartbeatResponse.class);
|
||||||
|
|
||||||
static {
|
static {
|
||||||
HeartbeatResponse rebootResp = recordFactory
|
HeartbeatResponse rebootResp = recordFactory
|
||||||
.newRecordInstance(HeartbeatResponse.class);
|
.newRecordInstance(HeartbeatResponse.class);
|
||||||
rebootResp.setReboot(true);
|
rebootResp.setNodeAction(NodeAction.REBOOT);
|
||||||
reboot.setHeartbeatResponse(rebootResp);
|
reboot.setHeartbeatResponse(rebootResp);
|
||||||
|
|
||||||
|
HeartbeatResponse decommissionedResp = recordFactory
|
||||||
|
.newRecordInstance(HeartbeatResponse.class);
|
||||||
|
decommissionedResp.setNodeAction(NodeAction.SHUTDOWN);
|
||||||
|
shutDown.setHeartbeatResponse(decommissionedResp);
|
||||||
}
|
}
|
||||||
|
|
||||||
public ResourceTrackerService(RMContext rmContext,
|
public ResourceTrackerService(RMContext rmContext,
|
||||||
|
@ -139,6 +146,7 @@ public class ResourceTrackerService extends AbstractService implements
|
||||||
super.stop();
|
super.stop();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
@Override
|
@Override
|
||||||
public RegisterNodeManagerResponse registerNodeManager(
|
public RegisterNodeManagerResponse registerNodeManager(
|
||||||
RegisterNodeManagerRequest request) throws YarnRemoteException {
|
RegisterNodeManagerRequest request) throws YarnRemoteException {
|
||||||
|
@ -149,121 +157,125 @@ public class ResourceTrackerService extends AbstractService implements
|
||||||
int httpPort = request.getHttpPort();
|
int httpPort = request.getHttpPort();
|
||||||
Resource capability = request.getResource();
|
Resource capability = request.getResource();
|
||||||
|
|
||||||
try {
|
RegisterNodeManagerResponse response = recordFactory
|
||||||
// Check if this node is a 'valid' node
|
.newRecordInstance(RegisterNodeManagerResponse.class);
|
||||||
if (!this.nodesListManager.isValidNode(host)) {
|
RegistrationResponse regResponse = recordFactory
|
||||||
LOG.info("Disallowed NodeManager from " + host);
|
.newRecordInstance(RegistrationResponse.class);
|
||||||
throw new IOException("Disallowed NodeManager from " + host);
|
SecretKey secretKey = this.containerTokenSecretManager
|
||||||
}
|
.createAndGetSecretKey(nodeId.toString());
|
||||||
|
regResponse.setSecretKey(ByteBuffer.wrap(secretKey.getEncoded()));
|
||||||
|
|
||||||
RMNode rmNode = new RMNodeImpl(nodeId, rmContext, host, cmPort,
|
// Check if this node is a 'valid' node
|
||||||
httpPort, resolve(host), capability);
|
if (!this.nodesListManager.isValidNode(host)) {
|
||||||
|
LOG.info("Disallowed NodeManager from " + host
|
||||||
if (this.rmContext.getRMNodes().putIfAbsent(nodeId, rmNode) != null) {
|
+ ", Sending SHUTDOWN signal to the NodeManager.");
|
||||||
throw new IOException("Duplicate registration from the node!");
|
regResponse.setNodeAction(NodeAction.SHUTDOWN);
|
||||||
}
|
|
||||||
|
|
||||||
this.nmLivelinessMonitor.register(nodeId);
|
|
||||||
|
|
||||||
LOG.info("NodeManager from node " + host +
|
|
||||||
"(cmPort: " + cmPort + " httpPort: " + httpPort + ") "
|
|
||||||
+ "registered with capability: " + capability.getMemory()
|
|
||||||
+ ", assigned nodeId " + nodeId);
|
|
||||||
|
|
||||||
RegistrationResponse regResponse = recordFactory.newRecordInstance(
|
|
||||||
RegistrationResponse.class);
|
|
||||||
SecretKey secretKey = this.containerTokenSecretManager
|
|
||||||
.createAndGetSecretKey(nodeId.toString());
|
|
||||||
regResponse.setSecretKey(ByteBuffer.wrap(secretKey.getEncoded()));
|
|
||||||
|
|
||||||
RegisterNodeManagerResponse response = recordFactory
|
|
||||||
.newRecordInstance(RegisterNodeManagerResponse.class);
|
|
||||||
response.setRegistrationResponse(regResponse);
|
response.setRegistrationResponse(regResponse);
|
||||||
return response;
|
return response;
|
||||||
} catch (IOException ioe) {
|
|
||||||
LOG.info("Exception in node registration from " + nodeId.getHost(), ioe);
|
|
||||||
throw RPCUtil.getRemoteException(ioe);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
RMNode rmNode = new RMNodeImpl(nodeId, rmContext, host, cmPort, httpPort,
|
||||||
|
resolve(host), capability);
|
||||||
|
|
||||||
|
if (this.rmContext.getRMNodes().putIfAbsent(nodeId, rmNode) != null) {
|
||||||
|
LOG.info("Duplicate registration from the node at: " + host
|
||||||
|
+ ", Sending SHUTDOWN Signal to the NodeManager");
|
||||||
|
regResponse.setNodeAction(NodeAction.SHUTDOWN);
|
||||||
|
response.setRegistrationResponse(regResponse);
|
||||||
|
return response;
|
||||||
|
}
|
||||||
|
|
||||||
|
this.rmContext.getDispatcher().getEventHandler().handle(
|
||||||
|
new RMNodeEvent(nodeId, RMNodeEventType.STARTED));
|
||||||
|
|
||||||
|
this.nmLivelinessMonitor.register(nodeId);
|
||||||
|
|
||||||
|
LOG.info("NodeManager from node " + host + "(cmPort: " + cmPort
|
||||||
|
+ " httpPort: " + httpPort + ") " + "registered with capability: "
|
||||||
|
+ capability.getMemory() + ", assigned nodeId " + nodeId);
|
||||||
|
|
||||||
|
regResponse.setNodeAction(NodeAction.NORMAL);
|
||||||
|
response.setRegistrationResponse(regResponse);
|
||||||
|
return response;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
@Override
|
@Override
|
||||||
public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
|
public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
|
||||||
throws YarnRemoteException {
|
throws YarnRemoteException {
|
||||||
|
|
||||||
NodeStatus remoteNodeStatus = request.getNodeStatus();
|
NodeStatus remoteNodeStatus = request.getNodeStatus();
|
||||||
try {
|
/**
|
||||||
/**
|
* Here is the node heartbeat sequence...
|
||||||
* Here is the node heartbeat sequence...
|
* 1. Check if it's a registered node
|
||||||
* 1. Check if it's a registered node
|
* 2. Check if it's a valid (i.e. not excluded) node
|
||||||
* 2. Check if it's a valid (i.e. not excluded) node
|
* 3. Check if it's a 'fresh' heartbeat i.e. not duplicate heartbeat
|
||||||
* 3. Check if it's a 'fresh' heartbeat i.e. not duplicate heartbeat
|
* 4. Send healthStatus to RMNode
|
||||||
* 4. Send healthStatus to RMNode
|
*/
|
||||||
*/
|
|
||||||
|
NodeId nodeId = remoteNodeStatus.getNodeId();
|
||||||
|
|
||||||
|
// 1. Check if it's a registered node
|
||||||
|
RMNode rmNode = this.rmContext.getRMNodes().get(nodeId);
|
||||||
|
if (rmNode == null) {
|
||||||
|
/* node does not exist */
|
||||||
|
LOG.info("Node not found rebooting " + remoteNodeStatus.getNodeId());
|
||||||
|
|
||||||
NodeId nodeId = remoteNodeStatus.getNodeId();
|
// Updating the metrics directly as reboot event cannot be
|
||||||
|
// triggered on a null rmNode
|
||||||
// 1. Check if it's a registered node
|
ClusterMetrics.getMetrics().incrNumRebootedNMs();
|
||||||
RMNode rmNode = this.rmContext.getRMNodes().get(nodeId);
|
return reboot;
|
||||||
if (rmNode == null) {
|
|
||||||
/* node does not exist */
|
|
||||||
LOG.info("Node not found rebooting " + remoteNodeStatus.getNodeId());
|
|
||||||
return reboot;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Send ping
|
|
||||||
this.nmLivelinessMonitor.receivedPing(nodeId);
|
|
||||||
|
|
||||||
// 2. Check if it's a valid (i.e. not excluded) node
|
|
||||||
if (!this.nodesListManager.isValidNode(rmNode.getHostName())) {
|
|
||||||
LOG.info("Disallowed NodeManager nodeId: " + nodeId +
|
|
||||||
" hostname: " + rmNode.getNodeAddress());
|
|
||||||
throw new IOException("Disallowed NodeManager nodeId: " +
|
|
||||||
remoteNodeStatus.getNodeId());
|
|
||||||
}
|
|
||||||
|
|
||||||
NodeHeartbeatResponse nodeHeartBeatResponse = recordFactory
|
|
||||||
.newRecordInstance(NodeHeartbeatResponse.class);
|
|
||||||
|
|
||||||
// 3. Check if it's a 'fresh' heartbeat i.e. not duplicate heartbeat
|
|
||||||
HeartbeatResponse lastHeartbeatResponse = rmNode
|
|
||||||
.getLastHeartBeatResponse();
|
|
||||||
if (remoteNodeStatus.getResponseId() + 1 == lastHeartbeatResponse
|
|
||||||
.getResponseId()) {
|
|
||||||
LOG.info("Received duplicate heartbeat from node " +
|
|
||||||
rmNode.getNodeAddress());
|
|
||||||
nodeHeartBeatResponse.setHeartbeatResponse(lastHeartbeatResponse);
|
|
||||||
return nodeHeartBeatResponse;
|
|
||||||
} else if (remoteNodeStatus.getResponseId() + 1 < lastHeartbeatResponse
|
|
||||||
.getResponseId()) {
|
|
||||||
LOG.info("Too far behind rm response id:" +
|
|
||||||
lastHeartbeatResponse.getResponseId() + " nm response id:"
|
|
||||||
+ remoteNodeStatus.getResponseId());
|
|
||||||
// TODO: Just sending reboot is not enough. Think more.
|
|
||||||
this.rmContext.getDispatcher().getEventHandler().handle(
|
|
||||||
new RMNodeEvent(nodeId, RMNodeEventType.REBOOTING));
|
|
||||||
return reboot;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Heartbeat response
|
|
||||||
HeartbeatResponse latestResponse = recordFactory
|
|
||||||
.newRecordInstance(HeartbeatResponse.class);
|
|
||||||
latestResponse
|
|
||||||
.setResponseId(lastHeartbeatResponse.getResponseId() + 1);
|
|
||||||
latestResponse.addAllContainersToCleanup(rmNode.pullContainersToCleanUp());
|
|
||||||
latestResponse.addAllApplicationsToCleanup(rmNode.pullAppsToCleanup());
|
|
||||||
|
|
||||||
// 4. Send status to RMNode, saving the latest response.
|
|
||||||
this.rmContext.getDispatcher().getEventHandler().handle(
|
|
||||||
new RMNodeStatusEvent(nodeId, remoteNodeStatus.getNodeHealthStatus(),
|
|
||||||
remoteNodeStatus.getContainersStatuses(), latestResponse));
|
|
||||||
|
|
||||||
nodeHeartBeatResponse.setHeartbeatResponse(latestResponse);
|
|
||||||
return nodeHeartBeatResponse;
|
|
||||||
} catch (IOException ioe) {
|
|
||||||
LOG.info("Exception in heartbeat from node " +
|
|
||||||
request.getNodeStatus().getNodeId(), ioe);
|
|
||||||
throw RPCUtil.getRemoteException(ioe);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Send ping
|
||||||
|
this.nmLivelinessMonitor.receivedPing(nodeId);
|
||||||
|
|
||||||
|
// 2. Check if it's a valid (i.e. not excluded) node
|
||||||
|
if (!this.nodesListManager.isValidNode(rmNode.getHostName())) {
|
||||||
|
LOG.info("Disallowed NodeManager nodeId: " + nodeId + " hostname: "
|
||||||
|
+ rmNode.getNodeAddress());
|
||||||
|
this.rmContext.getDispatcher().getEventHandler().handle(
|
||||||
|
new RMNodeEvent(nodeId, RMNodeEventType.DECOMMISSION));
|
||||||
|
return shutDown;
|
||||||
|
}
|
||||||
|
|
||||||
|
NodeHeartbeatResponse nodeHeartBeatResponse = recordFactory
|
||||||
|
.newRecordInstance(NodeHeartbeatResponse.class);
|
||||||
|
|
||||||
|
// 3. Check if it's a 'fresh' heartbeat i.e. not duplicate heartbeat
|
||||||
|
HeartbeatResponse lastHeartbeatResponse = rmNode.getLastHeartBeatResponse();
|
||||||
|
if (remoteNodeStatus.getResponseId() + 1 == lastHeartbeatResponse
|
||||||
|
.getResponseId()) {
|
||||||
|
LOG.info("Received duplicate heartbeat from node "
|
||||||
|
+ rmNode.getNodeAddress());
|
||||||
|
nodeHeartBeatResponse.setHeartbeatResponse(lastHeartbeatResponse);
|
||||||
|
return nodeHeartBeatResponse;
|
||||||
|
} else if (remoteNodeStatus.getResponseId() + 1 < lastHeartbeatResponse
|
||||||
|
.getResponseId()) {
|
||||||
|
LOG.info("Too far behind rm response id:"
|
||||||
|
+ lastHeartbeatResponse.getResponseId() + " nm response id:"
|
||||||
|
+ remoteNodeStatus.getResponseId());
|
||||||
|
// TODO: Just sending reboot is not enough. Think more.
|
||||||
|
this.rmContext.getDispatcher().getEventHandler().handle(
|
||||||
|
new RMNodeEvent(nodeId, RMNodeEventType.REBOOTING));
|
||||||
|
return reboot;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Heartbeat response
|
||||||
|
HeartbeatResponse latestResponse = recordFactory
|
||||||
|
.newRecordInstance(HeartbeatResponse.class);
|
||||||
|
latestResponse.setResponseId(lastHeartbeatResponse.getResponseId() + 1);
|
||||||
|
latestResponse.addAllContainersToCleanup(rmNode.pullContainersToCleanUp());
|
||||||
|
latestResponse.addAllApplicationsToCleanup(rmNode.pullAppsToCleanup());
|
||||||
|
latestResponse.setNodeAction(NodeAction.NORMAL);
|
||||||
|
|
||||||
|
// 4. Send status to RMNode, saving the latest response.
|
||||||
|
this.rmContext.getDispatcher().getEventHandler().handle(
|
||||||
|
new RMNodeStatusEvent(nodeId, remoteNodeStatus.getNodeHealthStatus(),
|
||||||
|
remoteNodeStatus.getContainersStatuses(), latestResponse));
|
||||||
|
|
||||||
|
nodeHeartBeatResponse.setHeartbeatResponse(latestResponse);
|
||||||
|
return nodeHeartBeatResponse;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void recover(RMState state) {
|
public void recover(RMState state) {
|
||||||
|
|
|
@ -19,6 +19,9 @@
|
||||||
package org.apache.hadoop.yarn.server.resourcemanager.rmnode;
|
package org.apache.hadoop.yarn.server.resourcemanager.rmnode;
|
||||||
|
|
||||||
public enum RMNodeEventType {
|
public enum RMNodeEventType {
|
||||||
|
|
||||||
|
STARTED,
|
||||||
|
|
||||||
// Source: AdminService
|
// Source: AdminService
|
||||||
DECOMMISSION,
|
DECOMMISSION,
|
||||||
|
|
||||||
|
|
|
@ -45,6 +45,7 @@ import org.apache.hadoop.yarn.event.EventHandler;
|
||||||
import org.apache.hadoop.yarn.factories.RecordFactory;
|
import org.apache.hadoop.yarn.factories.RecordFactory;
|
||||||
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||||
import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse;
|
import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.ClusterMetrics;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
|
||||||
|
@ -107,9 +108,11 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
|
||||||
= new StateMachineFactory<RMNodeImpl,
|
= new StateMachineFactory<RMNodeImpl,
|
||||||
RMNodeState,
|
RMNodeState,
|
||||||
RMNodeEventType,
|
RMNodeEventType,
|
||||||
RMNodeEvent>(RMNodeState.RUNNING)
|
RMNodeEvent>(RMNodeState.NEW)
|
||||||
|
|
||||||
//Transitions from RUNNING state
|
//Transitions from RUNNING state
|
||||||
|
.addTransition(RMNodeState.NEW, RMNodeState.RUNNING,
|
||||||
|
RMNodeEventType.STARTED, new AddNodeTransition())
|
||||||
.addTransition(RMNodeState.RUNNING,
|
.addTransition(RMNodeState.RUNNING,
|
||||||
EnumSet.of(RMNodeState.RUNNING, RMNodeState.UNHEALTHY),
|
EnumSet.of(RMNodeState.RUNNING, RMNodeState.UNHEALTHY),
|
||||||
RMNodeEventType.STATUS_UPDATE, new StatusUpdateWhenHealthyTransition())
|
RMNodeEventType.STATUS_UPDATE, new StatusUpdateWhenHealthyTransition())
|
||||||
|
@ -158,8 +161,6 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
|
||||||
|
|
||||||
this.stateMachine = stateMachineFactory.make(this);
|
this.stateMachine = stateMachineFactory.make(this);
|
||||||
|
|
||||||
context.getDispatcher().getEventHandler().handle(
|
|
||||||
new NodeAddedSchedulerEvent(this));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -311,6 +312,21 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static class AddNodeTransition implements
|
||||||
|
SingleArcTransition<RMNodeImpl, RMNodeEvent> {
|
||||||
|
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
@Override
|
||||||
|
public void transition(RMNodeImpl rmNode, RMNodeEvent event) {
|
||||||
|
// Inform the scheduler
|
||||||
|
|
||||||
|
rmNode.context.getDispatcher().getEventHandler().handle(
|
||||||
|
new NodeAddedSchedulerEvent(rmNode));
|
||||||
|
|
||||||
|
ClusterMetrics.getMetrics().addNode();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
public static class CleanUpAppTransition
|
public static class CleanUpAppTransition
|
||||||
implements SingleArcTransition<RMNodeImpl, RMNodeEvent> {
|
implements SingleArcTransition<RMNodeImpl, RMNodeEvent> {
|
||||||
|
|
||||||
|
@ -335,6 +351,7 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
|
||||||
public static class RemoveNodeTransition
|
public static class RemoveNodeTransition
|
||||||
implements SingleArcTransition<RMNodeImpl, RMNodeEvent> {
|
implements SingleArcTransition<RMNodeImpl, RMNodeEvent> {
|
||||||
|
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
@Override
|
@Override
|
||||||
public void transition(RMNodeImpl rmNode, RMNodeEvent event) {
|
public void transition(RMNodeImpl rmNode, RMNodeEvent event) {
|
||||||
// Inform the scheduler
|
// Inform the scheduler
|
||||||
|
@ -345,11 +362,14 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
|
||||||
rmNode.context.getRMNodes().remove(rmNode.nodeId);
|
rmNode.context.getRMNodes().remove(rmNode.nodeId);
|
||||||
LOG.info("Removed Node " + rmNode.nodeId);
|
LOG.info("Removed Node " + rmNode.nodeId);
|
||||||
|
|
||||||
|
//Update the metrics
|
||||||
|
ClusterMetrics.getMetrics().removeNode(event.getType());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public static class StatusUpdateWhenHealthyTransition implements
|
public static class StatusUpdateWhenHealthyTransition implements
|
||||||
MultipleArcTransition<RMNodeImpl, RMNodeEvent, RMNodeState> {
|
MultipleArcTransition<RMNodeImpl, RMNodeEvent, RMNodeState> {
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
@Override
|
@Override
|
||||||
public RMNodeState transition(RMNodeImpl rmNode, RMNodeEvent event) {
|
public RMNodeState transition(RMNodeImpl rmNode, RMNodeEvent event) {
|
||||||
|
|
||||||
|
@ -365,6 +385,7 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
|
||||||
// Inform the scheduler
|
// Inform the scheduler
|
||||||
rmNode.context.getDispatcher().getEventHandler().handle(
|
rmNode.context.getDispatcher().getEventHandler().handle(
|
||||||
new NodeRemovedSchedulerEvent(rmNode));
|
new NodeRemovedSchedulerEvent(rmNode));
|
||||||
|
ClusterMetrics.getMetrics().incrNumUnhealthyNMs();
|
||||||
return RMNodeState.UNHEALTHY;
|
return RMNodeState.UNHEALTHY;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -402,6 +423,7 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
|
||||||
implements
|
implements
|
||||||
MultipleArcTransition<RMNodeImpl, RMNodeEvent, RMNodeState> {
|
MultipleArcTransition<RMNodeImpl, RMNodeEvent, RMNodeState> {
|
||||||
|
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
@Override
|
@Override
|
||||||
public RMNodeState transition(RMNodeImpl rmNode, RMNodeEvent event) {
|
public RMNodeState transition(RMNodeImpl rmNode, RMNodeEvent event) {
|
||||||
RMNodeStatusEvent statusEvent = (RMNodeStatusEvent) event;
|
RMNodeStatusEvent statusEvent = (RMNodeStatusEvent) event;
|
||||||
|
@ -413,6 +435,7 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
|
||||||
if (remoteNodeHealthStatus.getIsNodeHealthy()) {
|
if (remoteNodeHealthStatus.getIsNodeHealthy()) {
|
||||||
rmNode.context.getDispatcher().getEventHandler().handle(
|
rmNode.context.getDispatcher().getEventHandler().handle(
|
||||||
new NodeAddedSchedulerEvent(rmNode));
|
new NodeAddedSchedulerEvent(rmNode));
|
||||||
|
ClusterMetrics.getMetrics().decrNumUnhealthyNMs();
|
||||||
return RMNodeState.RUNNING;
|
return RMNodeState.RUNNING;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -19,5 +19,5 @@
|
||||||
package org.apache.hadoop.yarn.server.resourcemanager.rmnode;
|
package org.apache.hadoop.yarn.server.resourcemanager.rmnode;
|
||||||
|
|
||||||
public enum RMNodeState {
|
public enum RMNodeState {
|
||||||
RUNNING, UNHEALTHY, DECOMMISSIONED, LOST
|
NEW, RUNNING, UNHEALTHY, DECOMMISSIONED, LOST
|
||||||
}
|
}
|
||||||
|
|
|
@ -22,6 +22,7 @@ import java.util.concurrent.ConcurrentMap;
|
||||||
|
|
||||||
import org.apache.hadoop.util.StringUtils;
|
import org.apache.hadoop.util.StringUtils;
|
||||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.ClusterMetrics;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
|
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
||||||
|
@ -60,6 +61,7 @@ public class MetricsOverviewTable extends HtmlBlock {
|
||||||
|
|
||||||
ResourceScheduler rs = rm.getResourceScheduler();
|
ResourceScheduler rs = rm.getResourceScheduler();
|
||||||
QueueMetrics metrics = rs.getRootQueueMetrics();
|
QueueMetrics metrics = rs.getRootQueueMetrics();
|
||||||
|
ClusterMetrics clusterMetrics = ClusterMetrics.getMetrics();
|
||||||
|
|
||||||
int appsSubmitted = metrics.getAppsSubmitted();
|
int appsSubmitted = metrics.getAppsSubmitted();
|
||||||
int reservedGB = metrics.getReservedGB();
|
int reservedGB = metrics.getReservedGB();
|
||||||
|
@ -67,30 +69,13 @@ public class MetricsOverviewTable extends HtmlBlock {
|
||||||
int allocatedGB = metrics.getAllocatedGB();
|
int allocatedGB = metrics.getAllocatedGB();
|
||||||
int containersAllocated = metrics.getAllocatedContainers();
|
int containersAllocated = metrics.getAllocatedContainers();
|
||||||
int totalGB = availableGB + reservedGB + allocatedGB;
|
int totalGB = availableGB + reservedGB + allocatedGB;
|
||||||
|
|
||||||
ConcurrentMap<NodeId,RMNode> nodes = rmContext.getRMNodes();
|
int totalNodes = clusterMetrics.getNumNMs();
|
||||||
int totalNodes = nodes.size();
|
int lostNodes = clusterMetrics.getNumLostNMs();
|
||||||
int lostNodes = 0;
|
int unhealthyNodes = clusterMetrics.getUnhealthyNMs();
|
||||||
int unhealthyNodes = 0;
|
int decommissionedNodes = clusterMetrics.getNumDecommisionedNMs();
|
||||||
int decommissionedNodes = 0;
|
int rebootedNodes = clusterMetrics.getNumRebootedNMs();
|
||||||
for(RMNode node: nodes.values()) {
|
|
||||||
if(node == null || node.getState() == null) {
|
|
||||||
lostNodes++;
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
switch(node.getState()) {
|
|
||||||
case DECOMMISSIONED:
|
|
||||||
decommissionedNodes++;
|
|
||||||
break;
|
|
||||||
case LOST:
|
|
||||||
lostNodes++;
|
|
||||||
break;
|
|
||||||
case UNHEALTHY:
|
|
||||||
unhealthyNodes++;
|
|
||||||
break;
|
|
||||||
//RUNNING noop
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
DIV<Hamlet> div = html.div().$class("metrics");
|
DIV<Hamlet> div = html.div().$class("metrics");
|
||||||
|
|
||||||
|
@ -106,6 +91,7 @@ public class MetricsOverviewTable extends HtmlBlock {
|
||||||
th().$class("ui-state-default")._("Decommissioned Nodes")._().
|
th().$class("ui-state-default")._("Decommissioned Nodes")._().
|
||||||
th().$class("ui-state-default")._("Lost Nodes")._().
|
th().$class("ui-state-default")._("Lost Nodes")._().
|
||||||
th().$class("ui-state-default")._("Unhealthy Nodes")._().
|
th().$class("ui-state-default")._("Unhealthy Nodes")._().
|
||||||
|
th().$class("ui-state-default")._("Rebooted Nodes")._().
|
||||||
_().
|
_().
|
||||||
_().
|
_().
|
||||||
tbody().$class("ui-widget-content").
|
tbody().$class("ui-widget-content").
|
||||||
|
@ -116,9 +102,10 @@ public class MetricsOverviewTable extends HtmlBlock {
|
||||||
td(StringUtils.byteDesc(totalGB * BYTES_IN_GB)).
|
td(StringUtils.byteDesc(totalGB * BYTES_IN_GB)).
|
||||||
td(StringUtils.byteDesc(reservedGB * BYTES_IN_GB)).
|
td(StringUtils.byteDesc(reservedGB * BYTES_IN_GB)).
|
||||||
td().a(url("nodes"),String.valueOf(totalNodes))._().
|
td().a(url("nodes"),String.valueOf(totalNodes))._().
|
||||||
td().a(url("nodes/DECOMMISSIONED"),String.valueOf(decommissionedNodes))._().
|
td().a(url("nodes/decommissioned"),String.valueOf(decommissionedNodes))._().
|
||||||
td().a(url("nodes/LOST"),String.valueOf(lostNodes))._().
|
td().a(url("nodes/lost"),String.valueOf(lostNodes))._().
|
||||||
td().a(url("nodes/UNHEALTHY"),String.valueOf(unhealthyNodes))._().
|
td().a(url("nodes/unhealthy"),String.valueOf(unhealthyNodes))._().
|
||||||
|
td().a(url("nodes/rebooted"),String.valueOf(rebootedNodes))._().
|
||||||
_().
|
_().
|
||||||
_()._();
|
_()._();
|
||||||
|
|
||||||
|
|
|
@ -63,7 +63,7 @@ public class MockNM {
|
||||||
new HashMap<ApplicationId, List<ContainerStatus>>();
|
new HashMap<ApplicationId, List<ContainerStatus>>();
|
||||||
conts.put(container.getId().getApplicationAttemptId().getApplicationId(),
|
conts.put(container.getId().getApplicationAttemptId().getApplicationId(),
|
||||||
Arrays.asList(new ContainerStatus[] { container.getContainerStatus() }));
|
Arrays.asList(new ContainerStatus[] { container.getContainerStatus() }));
|
||||||
nodeHeartbeat(conts, true);
|
nodeHeartbeat(conts, true,nodeId);
|
||||||
}
|
}
|
||||||
|
|
||||||
public NodeId registerNode() throws Exception {
|
public NodeId registerNode() throws Exception {
|
||||||
|
@ -83,11 +83,11 @@ public class MockNM {
|
||||||
}
|
}
|
||||||
|
|
||||||
public HeartbeatResponse nodeHeartbeat(boolean b) throws Exception {
|
public HeartbeatResponse nodeHeartbeat(boolean b) throws Exception {
|
||||||
return nodeHeartbeat(new HashMap<ApplicationId, List<ContainerStatus>>(), b);
|
return nodeHeartbeat(new HashMap<ApplicationId, List<ContainerStatus>>(), b,nodeId);
|
||||||
}
|
}
|
||||||
|
|
||||||
public HeartbeatResponse nodeHeartbeat(Map<ApplicationId,
|
public HeartbeatResponse nodeHeartbeat(Map<ApplicationId,
|
||||||
List<ContainerStatus>> conts, boolean isHealthy) throws Exception {
|
List<ContainerStatus>> conts, boolean isHealthy, NodeId nodeId) throws Exception {
|
||||||
NodeHeartbeatRequest req = Records.newRecord(NodeHeartbeatRequest.class);
|
NodeHeartbeatRequest req = Records.newRecord(NodeHeartbeatRequest.class);
|
||||||
NodeStatus status = Records.newRecord(NodeStatus.class);
|
NodeStatus status = Records.newRecord(NodeStatus.class);
|
||||||
status.setNodeId(nodeId);
|
status.setNodeId(nodeId);
|
||||||
|
|
|
@ -220,6 +220,10 @@ public class MockRM extends ResourceManager {
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public NodesListManager getNodesListManager() {
|
||||||
|
return this.nodesListManager;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void startWepApp() {
|
protected void startWepApp() {
|
||||||
|
|
|
@ -45,7 +45,6 @@ import org.apache.hadoop.yarn.api.records.ContainerState;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
||||||
import org.apache.hadoop.yarn.api.records.NodeHealthStatus;
|
import org.apache.hadoop.yarn.api.records.NodeHealthStatus;
|
||||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||||
import org.apache.hadoop.yarn.api.records.Priority;
|
|
||||||
import org.apache.hadoop.yarn.api.records.Resource;
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
|
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
|
||||||
import org.apache.hadoop.yarn.factories.RecordFactory;
|
import org.apache.hadoop.yarn.factories.RecordFactory;
|
||||||
|
|
|
@ -0,0 +1,270 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.resourcemanager;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
|
import java.io.FileOutputStream;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
import junit.framework.Assert;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.io.IOUtils;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
||||||
|
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||||
|
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
|
||||||
|
import org.apache.hadoop.yarn.factories.RecordFactory;
|
||||||
|
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||||
|
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
|
||||||
|
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
|
||||||
|
import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse;
|
||||||
|
import org.apache.hadoop.yarn.server.api.records.NodeAction;
|
||||||
|
import org.apache.hadoop.yarn.util.Records;
|
||||||
|
import org.junit.After;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
public class TestResourceTrackerService {
|
||||||
|
|
||||||
|
private final static File TEMP_DIR = new File(System.getProperty(
|
||||||
|
"test.build.data", "/tmp"), "decommision");
|
||||||
|
private File hostFile = new File(TEMP_DIR + File.separator + "hostFile.txt");
|
||||||
|
private MockRM rm;
|
||||||
|
private static final RecordFactory recordFactory = RecordFactoryProvider
|
||||||
|
.getRecordFactory(null);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* decommissioning using a include hosts file
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testDecommissionWithIncludeHosts() throws Exception {
|
||||||
|
|
||||||
|
writeToHostsFile("host1", "host2");
|
||||||
|
Configuration conf = new Configuration();
|
||||||
|
conf.set("yarn.resourcemanager.nodes.include-path", hostFile
|
||||||
|
.getAbsolutePath());
|
||||||
|
|
||||||
|
rm = new MockRM(conf);
|
||||||
|
rm.start();
|
||||||
|
|
||||||
|
MockNM nm1 = rm.registerNode("host1:1234", 5120);
|
||||||
|
MockNM nm2 = rm.registerNode("host2:5678", 10240);
|
||||||
|
int initialMetricCount = ClusterMetrics.getMetrics()
|
||||||
|
.getNumDecommisionedNMs();
|
||||||
|
|
||||||
|
HeartbeatResponse nodeHeartbeat = nm1.nodeHeartbeat(true);
|
||||||
|
Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
|
||||||
|
nodeHeartbeat = nm2.nodeHeartbeat(true);
|
||||||
|
Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
|
||||||
|
|
||||||
|
writeToHostsFile("host1");
|
||||||
|
|
||||||
|
rm.getNodesListManager().refreshNodes();
|
||||||
|
|
||||||
|
nodeHeartbeat = nm1.nodeHeartbeat(true);
|
||||||
|
Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
|
||||||
|
Assert
|
||||||
|
.assertEquals(0, ClusterMetrics.getMetrics().getNumDecommisionedNMs());
|
||||||
|
|
||||||
|
nodeHeartbeat = nm2.nodeHeartbeat(true);
|
||||||
|
Assert.assertTrue("Node is not decommisioned.", NodeAction.SHUTDOWN
|
||||||
|
.equals(nodeHeartbeat.getNodeAction()));
|
||||||
|
|
||||||
|
checkDecommissionedNMCount(rm, ++initialMetricCount);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* decommissioning using a exclude hosts file
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testDecommissionWithExcludeHosts() throws Exception {
|
||||||
|
Configuration conf = new Configuration();
|
||||||
|
conf.set("yarn.resourcemanager.nodes.exclude-path", hostFile
|
||||||
|
.getAbsolutePath());
|
||||||
|
|
||||||
|
writeToHostsFile("");
|
||||||
|
rm = new MockRM(conf);
|
||||||
|
rm.start();
|
||||||
|
|
||||||
|
MockNM nm1 = rm.registerNode("host1:1234", 5120);
|
||||||
|
MockNM nm2 = rm.registerNode("host2:5678", 10240);
|
||||||
|
|
||||||
|
int initialMetricCount = ClusterMetrics.getMetrics()
|
||||||
|
.getNumDecommisionedNMs();
|
||||||
|
|
||||||
|
HeartbeatResponse nodeHeartbeat = nm1.nodeHeartbeat(true);
|
||||||
|
Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
|
||||||
|
nodeHeartbeat = nm2.nodeHeartbeat(true);
|
||||||
|
Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
|
||||||
|
|
||||||
|
writeToHostsFile("host2");
|
||||||
|
|
||||||
|
rm.getNodesListManager().refreshNodes();
|
||||||
|
|
||||||
|
nodeHeartbeat = nm1.nodeHeartbeat(true);
|
||||||
|
Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
|
||||||
|
nodeHeartbeat = nm2.nodeHeartbeat(true);
|
||||||
|
Assert.assertTrue("The decommisioned metrics are not updated",
|
||||||
|
NodeAction.SHUTDOWN.equals(nodeHeartbeat.getNodeAction()));
|
||||||
|
checkDecommissionedNMCount(rm, ++initialMetricCount);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testNodeRegistrationFailure() throws Exception {
|
||||||
|
writeToHostsFile("host1");
|
||||||
|
Configuration conf = new Configuration();
|
||||||
|
conf.set("yarn.resourcemanager.nodes.include-path", hostFile
|
||||||
|
.getAbsolutePath());
|
||||||
|
rm = new MockRM(conf);
|
||||||
|
rm.start();
|
||||||
|
|
||||||
|
ResourceTrackerService resourceTrackerService = rm.getResourceTrackerService();
|
||||||
|
RegisterNodeManagerRequest req = Records.newRecord(
|
||||||
|
RegisterNodeManagerRequest.class);
|
||||||
|
NodeId nodeId = Records.newRecord(NodeId.class);
|
||||||
|
nodeId.setHost("host2");
|
||||||
|
nodeId.setPort(1234);
|
||||||
|
req.setNodeId(nodeId);
|
||||||
|
req.setHttpPort(1234);
|
||||||
|
// trying to register a invalid node.
|
||||||
|
RegisterNodeManagerResponse response = resourceTrackerService.registerNodeManager(req);
|
||||||
|
Assert.assertEquals(NodeAction.SHUTDOWN,response.getRegistrationResponse().getNodeAction());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testReboot() throws Exception {
|
||||||
|
Configuration conf = new Configuration();
|
||||||
|
rm = new MockRM(conf);
|
||||||
|
rm.start();
|
||||||
|
|
||||||
|
MockNM nm1 = rm.registerNode("host1:1234", 5120);
|
||||||
|
MockNM nm2 = new MockNM("host2:1234", 2048, rm.getResourceTrackerService());
|
||||||
|
|
||||||
|
int initialMetricCount = ClusterMetrics.getMetrics().getNumRebootedNMs();
|
||||||
|
HeartbeatResponse nodeHeartbeat = nm1.nodeHeartbeat(true);
|
||||||
|
Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
|
||||||
|
|
||||||
|
nodeHeartbeat = nm2.nodeHeartbeat(
|
||||||
|
new HashMap<ApplicationId, List<ContainerStatus>>(), true,
|
||||||
|
recordFactory.newRecordInstance(NodeId.class));
|
||||||
|
Assert.assertTrue(NodeAction.REBOOT.equals(nodeHeartbeat.getNodeAction()));
|
||||||
|
checkRebootedNMCount(rm, ++initialMetricCount);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void checkRebootedNMCount(MockRM rm2, int count)
|
||||||
|
throws InterruptedException {
|
||||||
|
|
||||||
|
int waitCount = 0;
|
||||||
|
while (ClusterMetrics.getMetrics().getNumRebootedNMs() != count
|
||||||
|
&& waitCount++ < 20) {
|
||||||
|
synchronized (this) {
|
||||||
|
wait(100);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Assert.assertEquals("The rebooted metrics are not updated", count,
|
||||||
|
ClusterMetrics.getMetrics().getNumRebootedNMs());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testUnhealthyNodeStatus() throws Exception {
|
||||||
|
Configuration conf = new Configuration();
|
||||||
|
conf.set("yarn.resourcemanager.nodes.exclude-path", hostFile
|
||||||
|
.getAbsolutePath());
|
||||||
|
|
||||||
|
MockRM rm = new MockRM(conf);
|
||||||
|
rm.start();
|
||||||
|
|
||||||
|
MockNM nm1 = rm.registerNode("host1:1234", 5120);
|
||||||
|
Assert.assertEquals(0, ClusterMetrics.getMetrics().getUnhealthyNMs());
|
||||||
|
// node healthy
|
||||||
|
nm1.nodeHeartbeat(true);
|
||||||
|
|
||||||
|
// node unhealthy
|
||||||
|
nm1.nodeHeartbeat(false);
|
||||||
|
checkUnealthyNMCount(rm, nm1, true, 1);
|
||||||
|
|
||||||
|
// node healthy again
|
||||||
|
nm1.nodeHeartbeat(true);
|
||||||
|
checkUnealthyNMCount(rm, nm1, false, 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void checkUnealthyNMCount(MockRM rm, MockNM nm1, boolean health,
|
||||||
|
int count) throws Exception {
|
||||||
|
|
||||||
|
int waitCount = 0;
|
||||||
|
while(rm.getRMContext().getRMNodes().get(nm1.getNodeId())
|
||||||
|
.getNodeHealthStatus().getIsNodeHealthy() == health
|
||||||
|
&& waitCount++ < 20) {
|
||||||
|
synchronized (this) {
|
||||||
|
wait(100);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Assert.assertFalse(rm.getRMContext().getRMNodes().get(nm1.getNodeId())
|
||||||
|
.getNodeHealthStatus().getIsNodeHealthy() == health);
|
||||||
|
Assert.assertEquals("Unhealthy metrics not incremented", count,
|
||||||
|
ClusterMetrics.getMetrics().getUnhealthyNMs());
|
||||||
|
}
|
||||||
|
|
||||||
|
private void writeToHostsFile(String... hosts) throws IOException {
|
||||||
|
if (!hostFile.exists()) {
|
||||||
|
TEMP_DIR.mkdirs();
|
||||||
|
hostFile.createNewFile();
|
||||||
|
}
|
||||||
|
FileOutputStream fStream = null;
|
||||||
|
try {
|
||||||
|
fStream = new FileOutputStream(hostFile);
|
||||||
|
for (int i = 0; i < hosts.length; i++) {
|
||||||
|
fStream.write(hosts[i].getBytes());
|
||||||
|
fStream.write("\n".getBytes());
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
if (fStream != null) {
|
||||||
|
IOUtils.closeStream(fStream);
|
||||||
|
fStream = null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void checkDecommissionedNMCount(MockRM rm, int count)
|
||||||
|
throws InterruptedException {
|
||||||
|
int waitCount = 0;
|
||||||
|
while (ClusterMetrics.getMetrics().getNumDecommisionedNMs() != count
|
||||||
|
&& waitCount++ < 20) {
|
||||||
|
synchronized (this) {
|
||||||
|
wait(100);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Assert.assertEquals(count, ClusterMetrics.getMetrics()
|
||||||
|
.getNumDecommisionedNMs());
|
||||||
|
Assert.assertEquals("The decommisioned metrics are not updated", count,
|
||||||
|
ClusterMetrics.getMetrics().getNumDecommisionedNMs());
|
||||||
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
|
public void tearDown() {
|
||||||
|
if (hostFile != null && hostFile.exists()) {
|
||||||
|
hostFile.delete();
|
||||||
|
}
|
||||||
|
if (rm != null) {
|
||||||
|
rm.stop();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -18,8 +18,6 @@
|
||||||
|
|
||||||
package org.apache.hadoop.yarn.server.resourcemanager.resourcetracker;
|
package org.apache.hadoop.yarn.server.resourcemanager.resourcetracker;
|
||||||
|
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
|
||||||
|
|
||||||
import junit.framework.Assert;
|
import junit.framework.Assert;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
|
@ -34,12 +32,13 @@ import org.apache.hadoop.yarn.factories.RecordFactory;
|
||||||
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||||
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
|
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
|
||||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
|
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
|
||||||
import org.apache.hadoop.yarn.server.api.records.RegistrationResponse;
|
import org.apache.hadoop.yarn.server.resourcemanager.ClusterMetrics;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.NMLivelinessMonitor;
|
import org.apache.hadoop.yarn.server.resourcemanager.NMLivelinessMonitor;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.NodesListManager;
|
import org.apache.hadoop.yarn.server.resourcemanager.NodesListManager;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
|
import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.ResourceTrackerService;
|
import org.apache.hadoop.yarn.server.resourcemanager.ResourceTrackerService;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager.NodeEventDispatcher;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemStore;
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemStore;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
|
||||||
|
@ -55,8 +54,6 @@ public class TestNMExpiry {
|
||||||
ResourceTrackerService resourceTrackerService;
|
ResourceTrackerService resourceTrackerService;
|
||||||
ContainerTokenSecretManager containerTokenSecretManager =
|
ContainerTokenSecretManager containerTokenSecretManager =
|
||||||
new ContainerTokenSecretManager();
|
new ContainerTokenSecretManager();
|
||||||
AtomicInteger test = new AtomicInteger();
|
|
||||||
AtomicInteger notify = new AtomicInteger();
|
|
||||||
|
|
||||||
private class TestNmLivelinessMonitor extends NMLivelinessMonitor {
|
private class TestNmLivelinessMonitor extends NMLivelinessMonitor {
|
||||||
public TestNmLivelinessMonitor(Dispatcher dispatcher) {
|
public TestNmLivelinessMonitor(Dispatcher dispatcher) {
|
||||||
|
@ -68,22 +65,6 @@ public class TestNMExpiry {
|
||||||
conf.setLong(YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS, 1000);
|
conf.setLong(YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS, 1000);
|
||||||
super.init(conf);
|
super.init(conf);
|
||||||
}
|
}
|
||||||
@Override
|
|
||||||
protected void expire(NodeId id) {
|
|
||||||
LOG.info("Expired " + id);
|
|
||||||
if (test.addAndGet(1) == 2) {
|
|
||||||
try {
|
|
||||||
/* delay atleast 2 seconds to make sure the 3rd one does not expire
|
|
||||||
*
|
|
||||||
*/
|
|
||||||
Thread.sleep(2000);
|
|
||||||
} catch(InterruptedException ie){}
|
|
||||||
synchronized(notify) {
|
|
||||||
notify.addAndGet(1);
|
|
||||||
notify.notifyAll();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Before
|
@Before
|
||||||
|
@ -91,12 +72,12 @@ public class TestNMExpiry {
|
||||||
Configuration conf = new Configuration();
|
Configuration conf = new Configuration();
|
||||||
// Dispatcher that processes events inline
|
// Dispatcher that processes events inline
|
||||||
Dispatcher dispatcher = new InlineDispatcher();
|
Dispatcher dispatcher = new InlineDispatcher();
|
||||||
|
RMContext context = new RMContextImpl(new MemStore(), dispatcher, null,
|
||||||
|
null, null);
|
||||||
dispatcher.register(SchedulerEventType.class,
|
dispatcher.register(SchedulerEventType.class,
|
||||||
new InlineDispatcher.EmptyEventHandler());
|
new InlineDispatcher.EmptyEventHandler());
|
||||||
dispatcher.register(RMNodeEventType.class,
|
dispatcher.register(RMNodeEventType.class,
|
||||||
new InlineDispatcher.EmptyEventHandler());
|
new NodeEventDispatcher(context));
|
||||||
RMContext context = new RMContextImpl(new MemStore(), dispatcher, null,
|
|
||||||
null, null);
|
|
||||||
NMLivelinessMonitor nmLivelinessMonitor = new TestNmLivelinessMonitor(
|
NMLivelinessMonitor nmLivelinessMonitor = new TestNmLivelinessMonitor(
|
||||||
dispatcher);
|
dispatcher);
|
||||||
nmLivelinessMonitor.init(conf);
|
nmLivelinessMonitor.init(conf);
|
||||||
|
@ -166,6 +147,14 @@ public class TestNMExpiry {
|
||||||
request2.setHttpPort(0);
|
request2.setHttpPort(0);
|
||||||
request2.setResource(capability);
|
request2.setResource(capability);
|
||||||
resourceTrackerService.registerNodeManager(request2);
|
resourceTrackerService.registerNodeManager(request2);
|
||||||
|
|
||||||
|
int waitCount = 0;
|
||||||
|
while(ClusterMetrics.getMetrics().getNumLostNMs()!=2 && waitCount ++<20){
|
||||||
|
synchronized (this) {
|
||||||
|
wait(100);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Assert.assertEquals(2, ClusterMetrics.getMetrics().getNumLostNMs());
|
||||||
|
|
||||||
request3 = recordFactory
|
request3 = recordFactory
|
||||||
.newRecordInstance(RegisterNodeManagerRequest.class);
|
.newRecordInstance(RegisterNodeManagerRequest.class);
|
||||||
|
@ -175,20 +164,13 @@ public class TestNMExpiry {
|
||||||
request3.setNodeId(nodeId3);
|
request3.setNodeId(nodeId3);
|
||||||
request3.setHttpPort(0);
|
request3.setHttpPort(0);
|
||||||
request3.setResource(capability);
|
request3.setResource(capability);
|
||||||
RegistrationResponse thirdNodeRegResponse = resourceTrackerService
|
resourceTrackerService
|
||||||
.registerNodeManager(request3).getRegistrationResponse();
|
.registerNodeManager(request3).getRegistrationResponse();
|
||||||
|
|
||||||
/* test to see if hostanme 3 does not expire */
|
/* test to see if hostanme 3 does not expire */
|
||||||
stopT = false;
|
stopT = false;
|
||||||
new ThirdNodeHeartBeatThread().start();
|
new ThirdNodeHeartBeatThread().start();
|
||||||
int timeOut = 0;
|
Assert.assertEquals(2,ClusterMetrics.getMetrics().getNumLostNMs());
|
||||||
synchronized (notify) {
|
|
||||||
while (notify.get() == 0 && timeOut++ < 30) {
|
|
||||||
notify.wait(1000);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Assert.assertEquals(2, test.get());
|
|
||||||
|
|
||||||
stopT = true;
|
stopT = true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -32,6 +32,7 @@ import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||||
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
|
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
|
||||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
|
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
|
||||||
import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse;
|
import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse;
|
||||||
|
import org.apache.hadoop.yarn.server.api.records.NodeAction;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.NMLivelinessMonitor;
|
import org.apache.hadoop.yarn.server.resourcemanager.NMLivelinessMonitor;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.NodesListManager;
|
import org.apache.hadoop.yarn.server.resourcemanager.NodesListManager;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||||
|
@ -130,6 +131,6 @@ public class TestRMNMRPCResponseId {
|
||||||
nodeStatus.setResponseId(0);
|
nodeStatus.setResponseId(0);
|
||||||
response = resourceTrackerService.nodeHeartbeat(nodeHeartBeatRequest)
|
response = resourceTrackerService.nodeHeartbeat(nodeHeartBeatRequest)
|
||||||
.getHeartbeatResponse();
|
.getHeartbeatResponse();
|
||||||
Assert.assertTrue(response.getReboot() == true);
|
Assert.assertTrue(NodeAction.REBOOT.equals(response.getNodeAction()));
|
||||||
}
|
}
|
||||||
}
|
}
|
|
@ -43,7 +43,7 @@ public class TestNodesPage {
|
||||||
final int numberOfNodesPerRack = 2;
|
final int numberOfNodesPerRack = 2;
|
||||||
// Number of Actual Table Headers for NodesPage.NodesBlock might change in
|
// Number of Actual Table Headers for NodesPage.NodesBlock might change in
|
||||||
// future. In that case this value should be adjusted to the new value.
|
// future. In that case this value should be adjusted to the new value.
|
||||||
final int numberOfThInMetricsTable = 9;
|
final int numberOfThInMetricsTable = 10;
|
||||||
final int numberOfActualTableHeaders = 10;
|
final int numberOfActualTableHeaders = 10;
|
||||||
|
|
||||||
Injector injector = WebAppTests.createMockInjector(RMContext.class,
|
Injector injector = WebAppTests.createMockInjector(RMContext.class,
|
||||||
|
|
Loading…
Reference in New Issue