YARN-41. The RM should handle the graceful shutdown of the NM. Contributed by Devaraj K.

(cherry picked from commit d7e7f6aa03)
This commit is contained in:
Junping Du 2015-06-04 04:59:27 -07:00
parent bae1ec5f29
commit d8c7ee1b53
32 changed files with 702 additions and 28 deletions

View File

@ -75,6 +75,9 @@ Release 2.8.0 - UNRELEASED
YARN-160. Enhanced NodeManager to automatically obtain cpu/memory values from YARN-160. Enhanced NodeManager to automatically obtain cpu/memory values from
underlying OS when configured to do so. (Varun Vasudev via vinodkv) underlying OS when configured to do so. (Varun Vasudev via vinodkv)
YARN-41. The RM should handle the graceful shutdown of the NM. (Devaraj K via
junping_du)
IMPROVEMENTS IMPROVEMENTS
YARN-644. Basic null check is not performed on passed in arguments before YARN-644. Basic null check is not performed on passed in arguments before

View File

@ -46,9 +46,13 @@ public enum NodeState {
REBOOTED, REBOOTED,
/** Node decommission is in progress */ /** Node decommission is in progress */
DECOMMISSIONING; DECOMMISSIONING,
/** Node has shutdown gracefully. */
SHUTDOWN;
public boolean isUnusable() { public boolean isUnusable() {
return (this == UNHEALTHY || this == DECOMMISSIONED || this == LOST); return (this == UNHEALTHY || this == DECOMMISSIONED
|| this == LOST || this == SHUTDOWN);
} }
} }

View File

@ -228,6 +228,7 @@ enum NodeStateProto {
NS_LOST = 5; NS_LOST = 5;
NS_REBOOTED = 6; NS_REBOOTED = 6;
NS_DECOMMISSIONING = 7; NS_DECOMMISSIONING = 7;
NS_SHUTDOWN = 8;
} }
message NodeIdProto { message NodeIdProto {

View File

@ -26,16 +26,24 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.UnRegisterNodeManagerRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.UnRegisterNodeManagerResponse;
/**
* This is used by the Node Manager to register/nodeHeartbeat/unregister with
* the ResourceManager.
*/
public interface ResourceTracker { public interface ResourceTracker {
@Idempotent @Idempotent
public RegisterNodeManagerResponse registerNodeManager( RegisterNodeManagerResponse registerNodeManager(
RegisterNodeManagerRequest request) throws YarnException, RegisterNodeManagerRequest request) throws YarnException, IOException;
IOException;
@AtMostOnce @AtMostOnce
public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
throws YarnException, IOException; throws YarnException, IOException;
@Idempotent
UnRegisterNodeManagerResponse unRegisterNodeManager(
UnRegisterNodeManagerRequest request) throws YarnException, IOException;
} }

View File

@ -29,16 +29,21 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.ipc.RPCUtil; import org.apache.hadoop.yarn.ipc.RPCUtil;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatRequestProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatRequestProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.RegisterNodeManagerRequestProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.RegisterNodeManagerRequestProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.UnRegisterNodeManagerRequestProto;
import org.apache.hadoop.yarn.server.api.ResourceTracker; import org.apache.hadoop.yarn.server.api.ResourceTracker;
import org.apache.hadoop.yarn.server.api.ResourceTrackerPB; import org.apache.hadoop.yarn.server.api.ResourceTrackerPB;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.UnRegisterNodeManagerRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.UnRegisterNodeManagerResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.NodeHeartbeatRequestPBImpl; import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.NodeHeartbeatRequestPBImpl;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.NodeHeartbeatResponsePBImpl; import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.NodeHeartbeatResponsePBImpl;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RegisterNodeManagerRequestPBImpl; import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RegisterNodeManagerRequestPBImpl;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RegisterNodeManagerResponsePBImpl; import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RegisterNodeManagerResponsePBImpl;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.UnRegisterNodeManagerRequestPBImpl;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.UnRegisterNodeManagerResponsePBImpl;
import com.google.protobuf.ServiceException; import com.google.protobuf.ServiceException;
@ -84,4 +89,17 @@ private ResourceTrackerPB proxy;
} }
} }
@Override
public UnRegisterNodeManagerResponse unRegisterNodeManager(
UnRegisterNodeManagerRequest request) throws YarnException, IOException {
UnRegisterNodeManagerRequestProto requestProto =
((UnRegisterNodeManagerRequestPBImpl) request).getProto();
try {
return new UnRegisterNodeManagerResponsePBImpl(
proxy.unRegisterNodeManager(null, requestProto));
} catch (ServiceException e) {
RPCUtil.unwrapAndThrowException(e);
return null;
}
}
} }

View File

@ -25,14 +25,19 @@ import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatR
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatResponseProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatResponseProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.RegisterNodeManagerRequestProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.RegisterNodeManagerRequestProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.RegisterNodeManagerResponseProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.RegisterNodeManagerResponseProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.UnRegisterNodeManagerRequestProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.UnRegisterNodeManagerResponseProto;
import org.apache.hadoop.yarn.server.api.ResourceTracker; import org.apache.hadoop.yarn.server.api.ResourceTracker;
import org.apache.hadoop.yarn.server.api.ResourceTrackerPB; import org.apache.hadoop.yarn.server.api.ResourceTrackerPB;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.UnRegisterNodeManagerResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.NodeHeartbeatRequestPBImpl; import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.NodeHeartbeatRequestPBImpl;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.NodeHeartbeatResponsePBImpl; import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.NodeHeartbeatResponsePBImpl;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RegisterNodeManagerRequestPBImpl; import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RegisterNodeManagerRequestPBImpl;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RegisterNodeManagerResponsePBImpl; import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RegisterNodeManagerResponsePBImpl;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.UnRegisterNodeManagerRequestPBImpl;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.UnRegisterNodeManagerResponsePBImpl;
import com.google.protobuf.RpcController; import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException; import com.google.protobuf.ServiceException;
@ -53,9 +58,7 @@ public class ResourceTrackerPBServiceImpl implements ResourceTrackerPB {
try { try {
RegisterNodeManagerResponse response = real.registerNodeManager(request); RegisterNodeManagerResponse response = real.registerNodeManager(request);
return ((RegisterNodeManagerResponsePBImpl)response).getProto(); return ((RegisterNodeManagerResponsePBImpl)response).getProto();
} catch (YarnException e) { } catch (YarnException | IOException e) {
throw new ServiceException(e);
} catch (IOException e) {
throw new ServiceException(e); throw new ServiceException(e);
} }
} }
@ -67,11 +70,23 @@ public class ResourceTrackerPBServiceImpl implements ResourceTrackerPB {
try { try {
NodeHeartbeatResponse response = real.nodeHeartbeat(request); NodeHeartbeatResponse response = real.nodeHeartbeat(request);
return ((NodeHeartbeatResponsePBImpl)response).getProto(); return ((NodeHeartbeatResponsePBImpl)response).getProto();
} catch (YarnException e) { } catch (YarnException | IOException e) {
throw new ServiceException(e);
} catch (IOException e) {
throw new ServiceException(e); throw new ServiceException(e);
} }
} }
@Override
public UnRegisterNodeManagerResponseProto unRegisterNodeManager(
RpcController controller, UnRegisterNodeManagerRequestProto proto)
throws ServiceException {
UnRegisterNodeManagerRequestPBImpl request =
new UnRegisterNodeManagerRequestPBImpl(proto);
try {
UnRegisterNodeManagerResponse response = real
.unRegisterNodeManager(request);
return ((UnRegisterNodeManagerResponsePBImpl) response).getProto();
} catch (YarnException | IOException e) {
throw new ServiceException(e);
}
}
} }

View File

@ -0,0 +1,38 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.api.protocolrecords;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.util.Records;
/**
* Node Manager's unregister request.
*/
public abstract class UnRegisterNodeManagerRequest {
public static UnRegisterNodeManagerRequest newInstance(NodeId nodeId) {
UnRegisterNodeManagerRequest nodeHeartbeatRequest = Records
.newRecord(UnRegisterNodeManagerRequest.class);
nodeHeartbeatRequest.setNodeId(nodeId);
return nodeHeartbeatRequest;
}
public abstract NodeId getNodeId();
public abstract void setNodeId(NodeId nodeId);
}

View File

@ -0,0 +1,30 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.api.protocolrecords;
import org.apache.hadoop.yarn.util.Records;
/**
* Node Manager's unregister response.
*/
public abstract class UnRegisterNodeManagerResponse {
public static UnRegisterNodeManagerResponse newInstance() {
return Records.newRecord(UnRegisterNodeManagerResponse.class);
}
}

View File

@ -0,0 +1,108 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.impl.pb.NodeIdPBImpl;
import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.UnRegisterNodeManagerRequestProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.UnRegisterNodeManagerRequestProtoOrBuilder;
import org.apache.hadoop.yarn.server.api.protocolrecords.UnRegisterNodeManagerRequest;
/**
* PBImpl class for UnRegisterNodeManagerRequest.
*/
public class UnRegisterNodeManagerRequestPBImpl extends
UnRegisterNodeManagerRequest {
private UnRegisterNodeManagerRequestProto proto =
UnRegisterNodeManagerRequestProto.getDefaultInstance();
private UnRegisterNodeManagerRequestProto.Builder builder = null;
private boolean viaProto = false;
private NodeId nodeId = null;
public UnRegisterNodeManagerRequestPBImpl() {
builder = UnRegisterNodeManagerRequestProto.newBuilder();
}
public UnRegisterNodeManagerRequestPBImpl(
UnRegisterNodeManagerRequestProto proto) {
this.proto = proto;
viaProto = true;
}
public UnRegisterNodeManagerRequestProto getProto() {
mergeLocalToProto();
proto = viaProto ? proto : builder.build();
viaProto = true;
return proto;
}
private void mergeLocalToBuilder() {
if (this.nodeId != null) {
builder.setNodeId(convertToProtoFormat(this.nodeId));
}
}
private void mergeLocalToProto() {
if (viaProto) {
maybeInitBuilder();
}
mergeLocalToBuilder();
proto = builder.build();
viaProto = true;
}
private void maybeInitBuilder() {
if (viaProto || builder == null) {
builder = UnRegisterNodeManagerRequestProto.newBuilder(proto);
}
viaProto = false;
}
@Override
public NodeId getNodeId() {
UnRegisterNodeManagerRequestProtoOrBuilder p = viaProto ? proto : builder;
if (this.nodeId != null) {
return this.nodeId;
}
if (!p.hasNodeId()) {
return null;
}
this.nodeId = convertFromProtoFormat(p.getNodeId());
return this.nodeId;
}
@Override
public void setNodeId(NodeId updatedNodeId) {
maybeInitBuilder();
if (updatedNodeId == null) {
builder.clearNodeId();
}
this.nodeId = updatedNodeId;
}
private NodeIdPBImpl convertFromProtoFormat(NodeIdProto p) {
return new NodeIdPBImpl(p);
}
private NodeIdProto convertToProtoFormat(NodeId t) {
return ((NodeIdPBImpl) t).getProto();
}
}

View File

@ -0,0 +1,70 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.UnRegisterNodeManagerResponseProto;
import org.apache.hadoop.yarn.server.api.protocolrecords.UnRegisterNodeManagerResponse;
/**
* PBImpl class for UnRegisterNodeManagerResponse.
*/
public class UnRegisterNodeManagerResponsePBImpl extends
UnRegisterNodeManagerResponse {
private UnRegisterNodeManagerResponseProto proto =
UnRegisterNodeManagerResponseProto.getDefaultInstance();
private UnRegisterNodeManagerResponseProto.Builder builder = null;
private boolean viaProto = false;
private boolean rebuild = false;
public UnRegisterNodeManagerResponsePBImpl() {
builder = UnRegisterNodeManagerResponseProto.newBuilder();
}
public UnRegisterNodeManagerResponsePBImpl(
UnRegisterNodeManagerResponseProto proto) {
this.proto = proto;
viaProto = true;
}
public UnRegisterNodeManagerResponseProto getProto() {
if (rebuild) {
mergeLocalToProto();
}
proto = viaProto ? proto : builder.build();
viaProto = true;
return proto;
}
private void mergeLocalToProto() {
if (viaProto) {
maybeInitBuilder();
}
proto = builder.build();
rebuild = false;
viaProto = true;
}
private void maybeInitBuilder() {
if (viaProto || builder == null) {
builder = UnRegisterNodeManagerResponseProto.newBuilder(proto);
}
viaProto = false;
}
}

View File

@ -27,4 +27,5 @@ import "yarn_server_common_service_protos.proto";
service ResourceTrackerService { service ResourceTrackerService {
rpc registerNodeManager(RegisterNodeManagerRequestProto) returns (RegisterNodeManagerResponseProto); rpc registerNodeManager(RegisterNodeManagerRequestProto) returns (RegisterNodeManagerResponseProto);
rpc nodeHeartbeat(NodeHeartbeatRequestProto) returns (NodeHeartbeatResponseProto); rpc nodeHeartbeat(NodeHeartbeatRequestProto) returns (NodeHeartbeatResponseProto);
rpc unRegisterNodeManager(UnRegisterNodeManagerRequestProto) returns (UnRegisterNodeManagerResponseProto);
} }

View File

@ -49,6 +49,13 @@ message RegisterNodeManagerResponseProto {
optional bool areNodeLabelsAcceptedByRM = 7 [default = false]; optional bool areNodeLabelsAcceptedByRM = 7 [default = false];
} }
message UnRegisterNodeManagerRequestProto {
optional NodeIdProto node_id = 1;
}
message UnRegisterNodeManagerResponseProto {
}
message NodeHeartbeatRequestProto { message NodeHeartbeatRequestProto {
optional NodeStatusProto node_status = 1; optional NodeStatusProto node_status = 1;
optional MasterKeyProto last_known_container_token_master_key = 2; optional MasterKeyProto last_known_container_token_master_key = 2;

View File

@ -23,6 +23,7 @@ import java.net.InetSocketAddress;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.Server; import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.factories.impl.pb.RpcClientFactoryPBImpl; import org.apache.hadoop.yarn.factories.impl.pb.RpcClientFactoryPBImpl;
import org.apache.hadoop.yarn.factories.impl.pb.RpcServerFactoryPBImpl; import org.apache.hadoop.yarn.factories.impl.pb.RpcServerFactoryPBImpl;
@ -32,9 +33,12 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.UnRegisterNodeManagerRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.UnRegisterNodeManagerResponse;
import org.junit.AfterClass; import org.junit.AfterClass;
import org.junit.BeforeClass; import org.junit.BeforeClass;
import org.junit.Test; import org.junit.Test;
import static org.junit.Assert.*; import static org.junit.Assert.*;
/** /**
@ -116,7 +120,27 @@ public class TestResourceTrackerPBClientImpl {
} }
/**
* Test the method unRegisterNodeManager. Method should return a not null
* result.
*
*/
@Test
public void testUnRegisterNodeManager() throws Exception {
UnRegisterNodeManagerRequest request = UnRegisterNodeManagerRequest
.newInstance(NodeId.newInstance("host1", 1234));
assertNotNull(client.unRegisterNodeManager(request));
ResourceTrackerTestImpl.exception = true;
try {
client.unRegisterNodeManager(request);
fail("there should be YarnException");
} catch (YarnException e) {
assertTrue(e.getMessage().startsWith("testMessage"));
} finally {
ResourceTrackerTestImpl.exception = false;
}
}
public static class ResourceTrackerTestImpl implements ResourceTracker { public static class ResourceTrackerTestImpl implements ResourceTracker {
@ -140,5 +164,13 @@ public class TestResourceTrackerPBClientImpl {
return recordFactory.newRecordInstance(NodeHeartbeatResponse.class); return recordFactory.newRecordInstance(NodeHeartbeatResponse.class);
} }
@Override
public UnRegisterNodeManagerResponse unRegisterNodeManager(
UnRegisterNodeManagerRequest request) throws YarnException, IOException {
if (exception) {
throw new YarnException("testMessage");
}
return UnRegisterNodeManagerResponse.newInstance();
}
} }
} }

View File

@ -35,6 +35,8 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.UnRegisterNodeManagerRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.UnRegisterNodeManagerResponse;
import org.junit.Test; import org.junit.Test;
public class TestYSCRPCFactories { public class TestYSCRPCFactories {
@ -116,5 +118,11 @@ public class TestYSCRPCFactories {
return null; return null;
} }
@Override
public UnRegisterNodeManagerResponse unRegisterNodeManager(
UnRegisterNodeManagerRequest request) throws YarnException, IOException {
// TODO Auto-generated method stub
return null;
}
} }
} }

View File

@ -44,6 +44,7 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.NodeHeartbeatRe
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.NodeHeartbeatResponsePBImpl; import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.NodeHeartbeatResponsePBImpl;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RegisterNodeManagerRequestPBImpl; import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RegisterNodeManagerRequestPBImpl;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RegisterNodeManagerResponsePBImpl; import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RegisterNodeManagerResponsePBImpl;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.UnRegisterNodeManagerRequestPBImpl;
import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.api.records.MasterKey;
import org.apache.hadoop.yarn.server.api.records.NodeAction; import org.apache.hadoop.yarn.server.api.records.NodeAction;
import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus; import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
@ -294,6 +295,17 @@ public class TestYarnServerApiClasses {
Assert.assertEquals(0, copy.getNodeLabels().size()); Assert.assertEquals(0, copy.getNodeLabels().size());
} }
@Test
public void testUnRegisterNodeManagerRequestPBImpl() throws Exception {
UnRegisterNodeManagerRequestPBImpl request = new UnRegisterNodeManagerRequestPBImpl();
NodeId nodeId = NodeId.newInstance("host", 1234);
request.setNodeId(nodeId);
UnRegisterNodeManagerRequestPBImpl copy = new UnRegisterNodeManagerRequestPBImpl(
request.getProto());
Assert.assertEquals(nodeId, copy.getNodeId());
}
private HashSet<NodeLabel> getValidNodeLabels() { private HashSet<NodeLabel> getValidNodeLabels() {
HashSet<NodeLabel> nodeLabels = new HashSet<NodeLabel>(); HashSet<NodeLabel> nodeLabels = new HashSet<NodeLabel>();
nodeLabels.add(NodeLabel.newInstance("java")); nodeLabels.add(NodeLabel.newInstance("java"));

View File

@ -56,6 +56,8 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factories.impl.pb.RecordFactoryPBImpl;
import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager; import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
import org.apache.hadoop.yarn.server.api.ResourceManagerConstants; import org.apache.hadoop.yarn.server.api.ResourceManagerConstants;
import org.apache.hadoop.yarn.server.api.ResourceTracker; import org.apache.hadoop.yarn.server.api.ResourceTracker;
@ -66,6 +68,7 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.UnRegisterNodeManagerRequest;
import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.api.records.MasterKey;
import org.apache.hadoop.yarn.server.api.records.NodeAction; import org.apache.hadoop.yarn.server.api.records.NodeAction;
import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus; import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
@ -130,6 +133,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
private Runnable statusUpdaterRunnable; private Runnable statusUpdaterRunnable;
private Thread statusUpdater; private Thread statusUpdater;
private long rmIdentifier = ResourceManagerConstants.RM_INVALID_IDENTIFIER; private long rmIdentifier = ResourceManagerConstants.RM_INVALID_IDENTIFIER;
private boolean registeredWithRM = false;
Set<ContainerId> pendingContainersToRemove = new HashSet<ContainerId>(); Set<ContainerId> pendingContainersToRemove = new HashSet<ContainerId>();
private final NodeLabelsProvider nodeLabelsProvider; private final NodeLabelsProvider nodeLabelsProvider;
@ -232,12 +236,40 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
@Override @Override
protected void serviceStop() throws Exception { protected void serviceStop() throws Exception {
// the isStopped check is for avoiding multiple unregistrations.
if (this.registeredWithRM && !this.isStopped
&& !isNMUnderSupervisionWithRecoveryEnabled()
&& !context.getDecommissioned()) {
unRegisterNM();
}
// Interrupt the updater. // Interrupt the updater.
this.isStopped = true; this.isStopped = true;
stopRMProxy(); stopRMProxy();
super.serviceStop(); super.serviceStop();
} }
private boolean isNMUnderSupervisionWithRecoveryEnabled() {
Configuration config = getConfig();
return config.getBoolean(YarnConfiguration.NM_RECOVERY_ENABLED,
YarnConfiguration.DEFAULT_NM_RECOVERY_ENABLED)
&& config.getBoolean(YarnConfiguration.NM_RECOVERY_SUPERVISED,
YarnConfiguration.DEFAULT_NM_RECOVERY_SUPERVISED);
}
private void unRegisterNM() {
RecordFactory recordFactory = RecordFactoryPBImpl.get();
UnRegisterNodeManagerRequest request = recordFactory
.newRecordInstance(UnRegisterNodeManagerRequest.class);
request.setNodeId(this.nodeId);
try {
resourceTracker.unRegisterNodeManager(request);
LOG.info("Successfully Unregistered the Node " + this.nodeId
+ " with ResourceManager.");
} catch (Exception e) {
LOG.warn("Unregistration of the Node " + this.nodeId + " failed.", e);
}
}
protected void rebootNodeStatusUpdaterAndRegisterWithRM() { protected void rebootNodeStatusUpdaterAndRegisterWithRM() {
// Interrupt the updater. // Interrupt the updater.
this.isStopped = true; this.isStopped = true;
@ -327,6 +359,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
+ "version error, " + message); + "version error, " + message);
} }
} }
this.registeredWithRM = true;
MasterKey masterKey = regNMResponse.getContainerTokenMasterKey(); MasterKey masterKey = regNMResponse.getContainerTokenMasterKey();
// do this now so that its set before we start heartbeating to RM // do this now so that its set before we start heartbeating to RM
// It is expected that status updater is started by this point and // It is expected that status updater is started by this point and

View File

@ -30,6 +30,8 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.UnRegisterNodeManagerRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.UnRegisterNodeManagerResponse;
import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.api.records.MasterKey;
import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl; import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl;
@ -57,4 +59,12 @@ public class LocalRMInterface implements ResourceTracker {
NodeHeartbeatResponse response = recordFactory.newRecordInstance(NodeHeartbeatResponse.class); NodeHeartbeatResponse response = recordFactory.newRecordInstance(NodeHeartbeatResponse.class);
return response; return response;
} }
@Override
public UnRegisterNodeManagerResponse unRegisterNodeManager(
UnRegisterNodeManagerRequest request) throws YarnException, IOException {
UnRegisterNodeManagerResponse response = recordFactory
.newRecordInstance(UnRegisterNodeManagerResponse.class);
return response;
}
} }

View File

@ -33,6 +33,8 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.UnRegisterNodeManagerRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.UnRegisterNodeManagerResponse;
import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.api.records.MasterKey;
import org.apache.hadoop.yarn.server.api.records.NodeStatus; import org.apache.hadoop.yarn.server.api.records.NodeStatus;
import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl; import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl;
@ -100,5 +102,12 @@ public class MockNodeStatusUpdater extends NodeStatusUpdaterImpl {
null, null, null, 1000L); null, null, null, 1000L);
return nhResponse; return nhResponse;
} }
@Override
public UnRegisterNodeManagerResponse unRegisterNodeManager(
UnRegisterNodeManagerRequest request) throws YarnException, IOException {
return recordFactory
.newRecordInstance(UnRegisterNodeManagerResponse.class);
}
} }
} }

View File

@ -85,6 +85,8 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.UnRegisterNodeManagerRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.UnRegisterNodeManagerResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.NodeHeartbeatResponsePBImpl; import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.NodeHeartbeatResponsePBImpl;
import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.api.records.MasterKey;
import org.apache.hadoop.yarn.server.api.records.NodeAction; import org.apache.hadoop.yarn.server.api.records.NodeAction;
@ -295,6 +297,13 @@ public class TestNodeStatusUpdater {
1000L); 1000L);
return nhResponse; return nhResponse;
} }
@Override
public UnRegisterNodeManagerResponse unRegisterNodeManager(
UnRegisterNodeManagerRequest request) throws YarnException, IOException {
return recordFactory
.newRecordInstance(UnRegisterNodeManagerResponse.class);
}
} }
private class MyNodeStatusUpdater extends NodeStatusUpdaterImpl { private class MyNodeStatusUpdater extends NodeStatusUpdaterImpl {
@ -515,6 +524,13 @@ public class TestNodeStatusUpdater {
nhResponse.setDiagnosticsMessage(shutDownMessage); nhResponse.setDiagnosticsMessage(shutDownMessage);
return nhResponse; return nhResponse;
} }
@Override
public UnRegisterNodeManagerResponse unRegisterNodeManager(
UnRegisterNodeManagerRequest request) throws YarnException, IOException {
return recordFactory
.newRecordInstance(UnRegisterNodeManagerResponse.class);
}
} }
private class MyResourceTracker3 implements ResourceTracker { private class MyResourceTracker3 implements ResourceTracker {
@ -570,6 +586,13 @@ public class TestNodeStatusUpdater {
} }
return nhResponse; return nhResponse;
} }
@Override
public UnRegisterNodeManagerResponse unRegisterNodeManager(
UnRegisterNodeManagerRequest request) throws YarnException, IOException {
return recordFactory
.newRecordInstance(UnRegisterNodeManagerResponse.class);
}
} }
// Test NodeStatusUpdater sends the right container statuses each time it // Test NodeStatusUpdater sends the right container statuses each time it
@ -738,6 +761,13 @@ public class TestNodeStatusUpdater {
nhResponse.setSystemCredentialsForApps(appCredentials); nhResponse.setSystemCredentialsForApps(appCredentials);
return nhResponse; return nhResponse;
} }
@Override
public UnRegisterNodeManagerResponse unRegisterNodeManager(
UnRegisterNodeManagerRequest request) throws YarnException, IOException {
return recordFactory
.newRecordInstance(UnRegisterNodeManagerResponse.class);
}
} }
private class MyResourceTracker5 implements ResourceTracker { private class MyResourceTracker5 implements ResourceTracker {
@ -768,6 +798,13 @@ public class TestNodeStatusUpdater {
"NodeHeartbeat exception"); "NodeHeartbeat exception");
} }
} }
@Override
public UnRegisterNodeManagerResponse unRegisterNodeManager(
UnRegisterNodeManagerRequest request) throws YarnException, IOException {
return recordFactory
.newRecordInstance(UnRegisterNodeManagerResponse.class);
}
} }
private class MyResourceTracker6 implements ResourceTracker { private class MyResourceTracker6 implements ResourceTracker {
@ -820,6 +857,13 @@ public class TestNodeStatusUpdater {
null, null, null, 1000L); null, null, null, 1000L);
return nhResponse; return nhResponse;
} }
@Override
public UnRegisterNodeManagerResponse unRegisterNodeManager(
UnRegisterNodeManagerRequest request) throws YarnException, IOException {
return recordFactory
.newRecordInstance(UnRegisterNodeManagerResponse.class);
}
} }
@Before @Before

View File

@ -40,6 +40,8 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.UnRegisterNodeManagerRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.UnRegisterNodeManagerResponse;
import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.api.records.MasterKey;
import org.apache.hadoop.yarn.server.api.records.NodeAction; import org.apache.hadoop.yarn.server.api.records.NodeAction;
import org.apache.hadoop.yarn.server.api.records.NodeStatus; import org.apache.hadoop.yarn.server.api.records.NodeStatus;
@ -181,6 +183,12 @@ public class TestNodeStatusUpdaterForLabels extends NodeLabelTestBase {
} }
return nhResponse; return nhResponse;
} }
@Override
public UnRegisterNodeManagerResponse unRegisterNodeManager(
UnRegisterNodeManagerRequest request) throws YarnException, IOException {
return null;
}
} }
public static class DummyNodeLabelsProvider extends NodeLabelsProvider { public static class DummyNodeLabelsProvider extends NodeLabelsProvider {

View File

@ -44,6 +44,7 @@ public class ClusterMetrics {
@Metric("# of lost NMs") MutableGaugeInt numLostNMs; @Metric("# of lost NMs") MutableGaugeInt numLostNMs;
@Metric("# of unhealthy NMs") MutableGaugeInt numUnhealthyNMs; @Metric("# of unhealthy NMs") MutableGaugeInt numUnhealthyNMs;
@Metric("# of Rebooted NMs") MutableGaugeInt numRebootedNMs; @Metric("# of Rebooted NMs") MutableGaugeInt numRebootedNMs;
@Metric("# of Shutdown NMs") MutableGaugeInt numShutdownNMs;
@Metric("AM container launch delay") MutableRate aMLaunchDelay; @Metric("AM container launch delay") MutableRate aMLaunchDelay;
@Metric("AM register delay") MutableRate aMRegisterDelay; @Metric("AM register delay") MutableRate aMRegisterDelay;
@ -142,6 +143,19 @@ public class ClusterMetrics {
numRebootedNMs.decr(); numRebootedNMs.decr();
} }
// Shutdown NMs
public int getNumShutdownNMs() {
return numShutdownNMs.value();
}
public void incrNumShutdownNMs() {
numShutdownNMs.incr();
}
public void decrNumShutdownNMs() {
numShutdownNMs.decr();
}
public void incrNumActiveNodes() { public void incrNumActiveNodes() {
numActiveNMs.incr(); numActiveNMs.incr();
} }

View File

@ -57,6 +57,8 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.UnRegisterNodeManagerRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.UnRegisterNodeManagerResponse;
import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.api.records.MasterKey;
import org.apache.hadoop.yarn.server.api.records.NodeAction; import org.apache.hadoop.yarn.server.api.records.NodeAction;
import org.apache.hadoop.yarn.server.api.records.NodeStatus; import org.apache.hadoop.yarn.server.api.records.NodeStatus;
@ -493,6 +495,27 @@ public class ResourceTrackerService extends AbstractService implements
return nodeHeartBeatResponse; return nodeHeartBeatResponse;
} }
@SuppressWarnings("unchecked")
@Override
public UnRegisterNodeManagerResponse unRegisterNodeManager(
UnRegisterNodeManagerRequest request) throws YarnException, IOException {
UnRegisterNodeManagerResponse response = recordFactory
.newRecordInstance(UnRegisterNodeManagerResponse.class);
NodeId nodeId = request.getNodeId();
RMNode rmNode = this.rmContext.getRMNodes().get(nodeId);
if (rmNode == null) {
LOG.info("Node not found, ignoring the unregister from node id : "
+ nodeId);
return response;
}
LOG.info("Node with node id : " + nodeId
+ " has shutdown, hence unregistering the node.");
this.nmLivelinessMonitor.unregister(nodeId);
this.rmContext.getDispatcher().getEventHandler()
.handle(new RMNodeEvent(nodeId, RMNodeEventType.SHUTDOWN));
return response;
}
private void updateNodeLabelsFromNMReport(Set<String> nodeLabels, private void updateNodeLabelsFromNMReport(Set<String> nodeLabels,
NodeId nodeId) throws IOException { NodeId nodeId) throws IOException {
try { try {

View File

@ -34,6 +34,7 @@ public enum RMNodeEventType {
STATUS_UPDATE, STATUS_UPDATE,
REBOOTING, REBOOTING,
RECONNECTED, RECONNECTED,
SHUTDOWN,
// Source: Application // Source: Application
CLEANUP_APP, CLEANUP_APP,

View File

@ -168,6 +168,9 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
RMNodeEventType.RECONNECTED, new ReconnectNodeTransition()) RMNodeEventType.RECONNECTED, new ReconnectNodeTransition())
.addTransition(NodeState.RUNNING, NodeState.RUNNING, .addTransition(NodeState.RUNNING, NodeState.RUNNING,
RMNodeEventType.RESOURCE_UPDATE, new UpdateNodeResourceWhenRunningTransition()) RMNodeEventType.RESOURCE_UPDATE, new UpdateNodeResourceWhenRunningTransition())
.addTransition(NodeState.RUNNING, NodeState.SHUTDOWN,
RMNodeEventType.SHUTDOWN,
new DeactivateNodeTransition(NodeState.SHUTDOWN))
//Transitions from REBOOTED state //Transitions from REBOOTED state
.addTransition(NodeState.REBOOTED, NodeState.REBOOTED, .addTransition(NodeState.REBOOTED, NodeState.REBOOTED,
@ -215,6 +218,17 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
.addTransition(NodeState.UNHEALTHY, NodeState.UNHEALTHY, .addTransition(NodeState.UNHEALTHY, NodeState.UNHEALTHY,
RMNodeEventType.FINISHED_CONTAINERS_PULLED_BY_AM, RMNodeEventType.FINISHED_CONTAINERS_PULLED_BY_AM,
new AddContainersToBeRemovedFromNMTransition()) new AddContainersToBeRemovedFromNMTransition())
.addTransition(NodeState.UNHEALTHY, NodeState.SHUTDOWN,
RMNodeEventType.SHUTDOWN,
new DeactivateNodeTransition(NodeState.SHUTDOWN))
//Transitions from SHUTDOWN state
.addTransition(NodeState.SHUTDOWN, NodeState.SHUTDOWN,
RMNodeEventType.RESOURCE_UPDATE,
new UpdateNodeResourceWhenUnusableTransition())
.addTransition(NodeState.SHUTDOWN, NodeState.SHUTDOWN,
RMNodeEventType.FINISHED_CONTAINERS_PULLED_BY_AM,
new AddContainersToBeRemovedFromNMTransition())
// create the topology tables // create the topology tables
.installTopology(); .installTopology();
@ -450,6 +464,9 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
case UNHEALTHY: case UNHEALTHY:
metrics.decrNumUnhealthyNMs(); metrics.decrNumUnhealthyNMs();
break; break;
case SHUTDOWN:
metrics.decrNumShutdownNMs();
break;
default: default:
LOG.debug("Unexpected previous node state"); LOG.debug("Unexpected previous node state");
} }
@ -483,6 +500,9 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
case UNHEALTHY: case UNHEALTHY:
metrics.incrNumUnhealthyNMs(); metrics.incrNumUnhealthyNMs();
break; break;
case SHUTDOWN:
metrics.incrNumShutdownNMs();
break;
default: default:
LOG.debug("Unexpected final state"); LOG.debug("Unexpected final state");
} }

View File

@ -78,6 +78,7 @@ public class MetricsOverviewTable extends HtmlBlock {
th().$class("ui-state-default")._("Lost Nodes")._(). th().$class("ui-state-default")._("Lost Nodes")._().
th().$class("ui-state-default")._("Unhealthy Nodes")._(). th().$class("ui-state-default")._("Unhealthy Nodes")._().
th().$class("ui-state-default")._("Rebooted Nodes")._(). th().$class("ui-state-default")._("Rebooted Nodes")._().
th().$class("ui-state-default")._("Shutdown Nodes")._().
_(). _().
_(). _().
tbody().$class("ui-widget-content"). tbody().$class("ui-widget-content").
@ -103,6 +104,7 @@ public class MetricsOverviewTable extends HtmlBlock {
td().a(url("nodes/lost"),String.valueOf(clusterMetrics.getLostNodes()))._(). td().a(url("nodes/lost"),String.valueOf(clusterMetrics.getLostNodes()))._().
td().a(url("nodes/unhealthy"),String.valueOf(clusterMetrics.getUnhealthyNodes()))._(). td().a(url("nodes/unhealthy"),String.valueOf(clusterMetrics.getUnhealthyNodes()))._().
td().a(url("nodes/rebooted"),String.valueOf(clusterMetrics.getRebootedNodes()))._(). td().a(url("nodes/rebooted"),String.valueOf(clusterMetrics.getRebootedNodes()))._().
td().a(url("nodes/shutdown"),String.valueOf(clusterMetrics.getShutdownNodes()))._().
_(). _().
_()._(); _()._();

View File

@ -90,6 +90,7 @@ class NodesPage extends RmView {
case DECOMMISSIONED: case DECOMMISSIONED:
case LOST: case LOST:
case REBOOTED: case REBOOTED:
case SHUTDOWN:
rmNodes = this.rm.getRMContext().getInactiveRMNodes().values(); rmNodes = this.rm.getRMContext().getInactiveRMNodes().values();
isInactive = true; isInactive = true;
break; break;

View File

@ -57,6 +57,7 @@ public class ClusterMetricsInfo {
protected int decommissionedNodes; protected int decommissionedNodes;
protected int rebootedNodes; protected int rebootedNodes;
protected int activeNodes; protected int activeNodes;
protected int shutdownNodes;
public ClusterMetricsInfo() { public ClusterMetricsInfo() {
} // JAXB needs this } // JAXB needs this
@ -92,8 +93,9 @@ public class ClusterMetricsInfo {
this.unhealthyNodes = clusterMetrics.getUnhealthyNMs(); this.unhealthyNodes = clusterMetrics.getUnhealthyNMs();
this.decommissionedNodes = clusterMetrics.getNumDecommisionedNMs(); this.decommissionedNodes = clusterMetrics.getNumDecommisionedNMs();
this.rebootedNodes = clusterMetrics.getNumRebootedNMs(); this.rebootedNodes = clusterMetrics.getNumRebootedNMs();
this.shutdownNodes = clusterMetrics.getNumShutdownNMs();
this.totalNodes = activeNodes + lostNodes + decommissionedNodes this.totalNodes = activeNodes + lostNodes + decommissionedNodes
+ rebootedNodes + unhealthyNodes; + rebootedNodes + unhealthyNodes + shutdownNodes;
} }
public int getAppsSubmitted() { public int getAppsSubmitted() {
@ -188,4 +190,8 @@ public class ClusterMetricsInfo {
return this.decommissionedNodes; return this.decommissionedNodes;
} }
public int getShutdownNodes() {
return this.shutdownNodes;
}
} }

View File

@ -465,6 +465,20 @@ public class TestRMNodeTransitions {
Assert.assertEquals(NodeState.REBOOTED, node.getState()); Assert.assertEquals(NodeState.REBOOTED, node.getState());
} }
@Test
public void testNMShutdown() {
RMNodeImpl node = getRunningNode();
node.handle(new RMNodeEvent(node.getNodeID(), RMNodeEventType.SHUTDOWN));
Assert.assertEquals(NodeState.SHUTDOWN, node.getState());
}
@Test
public void testUnhealthyNMShutdown() {
RMNodeImpl node = getUnhealthyNode();
node.handle(new RMNodeEvent(node.getNodeID(), RMNodeEventType.SHUTDOWN));
Assert.assertEquals(NodeState.SHUTDOWN, node.getState());
}
@Test(timeout=20000) @Test(timeout=20000)
public void testUpdateHeartbeatResponseForCleanup() { public void testUpdateHeartbeatResponseForCleanup() {
RMNodeImpl node = getRunningNode(); RMNodeImpl node = getRunningNode();

View File

@ -58,6 +58,7 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.UnRegisterNodeManagerRequest;
import org.apache.hadoop.yarn.server.api.records.NodeAction; import org.apache.hadoop.yarn.server.api.records.NodeAction;
import org.apache.hadoop.yarn.server.api.records.NodeStatus; import org.apache.hadoop.yarn.server.api.records.NodeStatus;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
@ -921,7 +922,7 @@ public class TestResourceTrackerService extends NodeLabelTestBase {
ClusterMetrics.getMetrics().getUnhealthyNMs()); ClusterMetrics.getMetrics().getUnhealthyNMs());
} }
@SuppressWarnings("unchecked") @SuppressWarnings({ "unchecked", "rawtypes" })
@Test @Test
public void testHandleContainerStatusInvalidCompletions() throws Exception { public void testHandleContainerStatusInvalidCompletions() throws Exception {
rm = new MockRM(new YarnConfiguration()); rm = new MockRM(new YarnConfiguration());
@ -1075,6 +1076,113 @@ public class TestResourceTrackerService extends NodeLabelTestBase {
} }
@Test
public void testNMUnregistration() throws Exception {
Configuration conf = new Configuration();
rm = new MockRM(conf);
rm.start();
ResourceTrackerService resourceTrackerService = rm
.getResourceTrackerService();
MockNM nm1 = rm.registerNode("host1:1234", 5120);
int shutdownNMsCount = ClusterMetrics.getMetrics()
.getNumShutdownNMs();
NodeHeartbeatResponse nodeHeartbeat = nm1.nodeHeartbeat(true);
Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
UnRegisterNodeManagerRequest request = Records
.newRecord(UnRegisterNodeManagerRequest.class);
request.setNodeId(nm1.getNodeId());
resourceTrackerService.unRegisterNodeManager(request);
checkShutdownNMCount(rm, ++shutdownNMsCount);
// The RM should remove the node after unregistration, hence send a reboot
// command.
nodeHeartbeat = nm1.nodeHeartbeat(true);
Assert.assertTrue(NodeAction.RESYNC.equals(nodeHeartbeat.getNodeAction()));
}
@Test
public void testUnhealthyNMUnregistration() throws Exception {
Configuration conf = new Configuration();
rm = new MockRM(conf);
rm.start();
ResourceTrackerService resourceTrackerService = rm
.getResourceTrackerService();
MockNM nm1 = rm.registerNode("host1:1234", 5120);
Assert.assertEquals(0, ClusterMetrics.getMetrics().getUnhealthyNMs());
// node healthy
nm1.nodeHeartbeat(true);
int shutdownNMsCount = ClusterMetrics.getMetrics().getNumShutdownNMs();
// node unhealthy
nm1.nodeHeartbeat(false);
checkUnealthyNMCount(rm, nm1, true, 1);
UnRegisterNodeManagerRequest request = Records
.newRecord(UnRegisterNodeManagerRequest.class);
request.setNodeId(nm1.getNodeId());
resourceTrackerService.unRegisterNodeManager(request);
checkShutdownNMCount(rm, ++shutdownNMsCount);
}
@Test
public void testInvalidNMUnregistration() throws Exception {
Configuration conf = new Configuration();
rm = new MockRM(conf);
rm.start();
ResourceTrackerService resourceTrackerService = rm
.getResourceTrackerService();
int shutdownNMsCount = ClusterMetrics.getMetrics()
.getNumShutdownNMs();
int decommisionedNMsCount = ClusterMetrics.getMetrics()
.getNumDecommisionedNMs();
// Node not found for unregister
UnRegisterNodeManagerRequest request = Records
.newRecord(UnRegisterNodeManagerRequest.class);
request.setNodeId(BuilderUtils.newNodeId("host", 1234));
resourceTrackerService.unRegisterNodeManager(request);
checkShutdownNMCount(rm, 0);
checkDecommissionedNMCount(rm, 0);
// 1. Register the Node Manager
// 2. Exclude the same Node Manager host
// 3. Give NM heartbeat to RM
// 4. Unregister the Node Manager
MockNM nm1 = new MockNM("host1:1234", 5120, resourceTrackerService);
RegisterNodeManagerResponse response = nm1.registerNode();
Assert.assertEquals(NodeAction.NORMAL, response.getNodeAction());
writeToHostsFile("host2");
conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH,
hostFile.getAbsolutePath());
rm.getNodesListManager().refreshNodes(conf);
NodeHeartbeatResponse heartbeatResponse = nm1.nodeHeartbeat(true);
Assert.assertEquals(NodeAction.SHUTDOWN, heartbeatResponse.getNodeAction());
checkShutdownNMCount(rm, shutdownNMsCount);
checkDecommissionedNMCount(rm, ++decommisionedNMsCount);
request.setNodeId(nm1.getNodeId());
resourceTrackerService.unRegisterNodeManager(request);
checkShutdownNMCount(rm, shutdownNMsCount);
checkDecommissionedNMCount(rm, decommisionedNMsCount);
// 1. Register the Node Manager
// 2. Exclude the same Node Manager host
// 3. Unregister the Node Manager
MockNM nm2 = new MockNM("host2:1234", 5120, resourceTrackerService);
RegisterNodeManagerResponse response2 = nm2.registerNode();
Assert.assertEquals(NodeAction.NORMAL, response2.getNodeAction());
writeToHostsFile("host1");
conf.set(YarnConfiguration.RM_NODES_INCLUDE_FILE_PATH,
hostFile.getAbsolutePath());
rm.getNodesListManager().refreshNodes(conf);
request.setNodeId(nm2.getNodeId());
resourceTrackerService.unRegisterNodeManager(request);
checkShutdownNMCount(rm, shutdownNMsCount);
checkDecommissionedNMCount(rm, ++decommisionedNMsCount);
}
private void writeToHostsFile(String... hosts) throws IOException { private void writeToHostsFile(String... hosts) throws IOException {
if (!hostFile.exists()) { if (!hostFile.exists()) {
TEMP_DIR.mkdirs(); TEMP_DIR.mkdirs();
@ -1110,6 +1218,19 @@ public class TestResourceTrackerService extends NodeLabelTestBase {
ClusterMetrics.getMetrics().getNumDecommisionedNMs()); ClusterMetrics.getMetrics().getNumDecommisionedNMs());
} }
private void checkShutdownNMCount(MockRM rm, int count)
throws InterruptedException {
int waitCount = 0;
while (ClusterMetrics.getMetrics().getNumShutdownNMs() != count
&& waitCount++ < 20) {
synchronized (this) {
wait(100);
}
}
Assert.assertEquals("The shutdown metrics are not updated", count,
ClusterMetrics.getMetrics().getNumShutdownNMs());
}
@After @After
public void tearDown() { public void tearDown() {
if (hostFile != null && hostFile.exists()) { if (hostFile != null && hostFile.exists()) {

View File

@ -40,7 +40,7 @@ import com.google.inject.Module;
public class TestNodesPage { public class TestNodesPage {
final int numberOfRacks = 2; final int numberOfRacks = 2;
final int numberOfNodesPerRack = 7; final int numberOfNodesPerRack = 8;
// The following is because of the way TestRMWebApp.mockRMContext creates // The following is because of the way TestRMWebApp.mockRMContext creates
// nodes. // nodes.
final int numberOfLostNodesPerRack = numberOfNodesPerRack final int numberOfLostNodesPerRack = numberOfNodesPerRack
@ -48,7 +48,7 @@ public class TestNodesPage {
// Number of Actual Table Headers for NodesPage.NodesBlock might change in // Number of Actual Table Headers for NodesPage.NodesBlock might change in
// future. In that case this value should be adjusted to the new value. // future. In that case this value should be adjusted to the new value.
final int numberOfThInMetricsTable = 20; final int numberOfThInMetricsTable = 21;
final int numberOfActualTableHeaders = 13; final int numberOfActualTableHeaders = 13;
private Injector injector; private Injector injector;

View File

@ -416,7 +416,8 @@ public class TestRMWebServices extends JerseyTestBase {
WebServicesTestUtils.getXmlInt(element, "unhealthyNodes"), WebServicesTestUtils.getXmlInt(element, "unhealthyNodes"),
WebServicesTestUtils.getXmlInt(element, "decommissionedNodes"), WebServicesTestUtils.getXmlInt(element, "decommissionedNodes"),
WebServicesTestUtils.getXmlInt(element, "rebootedNodes"), WebServicesTestUtils.getXmlInt(element, "rebootedNodes"),
WebServicesTestUtils.getXmlInt(element, "activeNodes")); WebServicesTestUtils.getXmlInt(element, "activeNodes"),
WebServicesTestUtils.getXmlInt(element, "shutdownNodes"));
} }
} }
@ -424,7 +425,7 @@ public class TestRMWebServices extends JerseyTestBase {
Exception { Exception {
assertEquals("incorrect number of elements", 1, json.length()); assertEquals("incorrect number of elements", 1, json.length());
JSONObject clusterinfo = json.getJSONObject("clusterMetrics"); JSONObject clusterinfo = json.getJSONObject("clusterMetrics");
assertEquals("incorrect number of elements", 23, clusterinfo.length()); assertEquals("incorrect number of elements", 24, clusterinfo.length());
verifyClusterMetrics( verifyClusterMetrics(
clusterinfo.getInt("appsSubmitted"), clusterinfo.getInt("appsCompleted"), clusterinfo.getInt("appsSubmitted"), clusterinfo.getInt("appsCompleted"),
clusterinfo.getInt("reservedMB"), clusterinfo.getInt("availableMB"), clusterinfo.getInt("reservedMB"), clusterinfo.getInt("availableMB"),
@ -435,16 +436,16 @@ public class TestRMWebServices extends JerseyTestBase {
clusterinfo.getInt("totalMB"), clusterinfo.getInt("totalNodes"), clusterinfo.getInt("totalMB"), clusterinfo.getInt("totalNodes"),
clusterinfo.getInt("lostNodes"), clusterinfo.getInt("unhealthyNodes"), clusterinfo.getInt("lostNodes"), clusterinfo.getInt("unhealthyNodes"),
clusterinfo.getInt("decommissionedNodes"), clusterinfo.getInt("decommissionedNodes"),
clusterinfo.getInt("rebootedNodes"),clusterinfo.getInt("activeNodes")); clusterinfo.getInt("rebootedNodes"),clusterinfo.getInt("activeNodes"),
clusterinfo.getInt("shutdownNodes"));
} }
public void verifyClusterMetrics(int submittedApps, int completedApps, public void verifyClusterMetrics(int submittedApps, int completedApps,
int reservedMB, int availableMB, int reservedMB, int availableMB, int allocMB, int reservedVirtualCores,
int allocMB, int reservedVirtualCores, int availableVirtualCores, int availableVirtualCores, int allocVirtualCores, int totalVirtualCores,
int allocVirtualCores, int totalVirtualCores, int containersAlloc, int totalMB, int totalNodes, int lostNodes,
int containersAlloc, int totalMB, int totalNodes, int unhealthyNodes, int decommissionedNodes, int rebootedNodes,
int lostNodes, int unhealthyNodes, int decommissionedNodes, int activeNodes, int shutdownNodes) throws JSONException, Exception {
int rebootedNodes, int activeNodes) throws JSONException, Exception {
ResourceScheduler rs = rm.getResourceScheduler(); ResourceScheduler rs = rm.getResourceScheduler();
QueueMetrics metrics = rs.getRootQueueMetrics(); QueueMetrics metrics = rs.getRootQueueMetrics();
@ -488,6 +489,8 @@ public class TestRMWebServices extends JerseyTestBase {
clusterMetrics.getNumRebootedNMs(), rebootedNodes); clusterMetrics.getNumRebootedNMs(), rebootedNodes);
assertEquals("activeNodes doesn't match", clusterMetrics.getNumActiveNMs(), assertEquals("activeNodes doesn't match", clusterMetrics.getNumActiveNMs(),
activeNodes); activeNodes);
assertEquals("shutdownNodes doesn't match",
clusterMetrics.getNumShutdownNMs(), shutdownNodes);
} }
@Test @Test

View File

@ -54,6 +54,8 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.UnRegisterNodeManagerRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.UnRegisterNodeManagerResponse;
import org.apache.hadoop.yarn.server.applicationhistoryservice.ApplicationHistoryServer; import org.apache.hadoop.yarn.server.applicationhistoryservice.ApplicationHistoryServer;
import org.apache.hadoop.yarn.server.applicationhistoryservice.ApplicationHistoryStore; import org.apache.hadoop.yarn.server.applicationhistoryservice.ApplicationHistoryStore;
import org.apache.hadoop.yarn.server.applicationhistoryservice.MemoryApplicationHistoryStore; import org.apache.hadoop.yarn.server.applicationhistoryservice.MemoryApplicationHistoryStore;
@ -642,6 +644,14 @@ public class MiniYARNCluster extends CompositeService {
} }
return response; return response;
} }
@Override
public UnRegisterNodeManagerResponse unRegisterNodeManager(
UnRegisterNodeManagerRequest request) throws YarnException,
IOException {
return recordFactory
.newRecordInstance(UnRegisterNodeManagerResponse.class);
}
}; };
} }