YARN-440. Flatten RegisterNodeManagerResponse. Contributed by Xuan Gong.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1461256 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
d41e67b966
commit
66e90b205a
|
@ -64,6 +64,8 @@ Release 2.0.5-beta - UNRELEASED
|
||||||
|
|
||||||
YARN-439. Flatten NodeHeartbeatResponse. (Xuan Gong via sseth)
|
YARN-439. Flatten NodeHeartbeatResponse. (Xuan Gong via sseth)
|
||||||
|
|
||||||
|
YARN-440. Flatten RegisterNodeManagerResponse. (Xuan Gong via sseth)
|
||||||
|
|
||||||
NEW FEATURES
|
NEW FEATURES
|
||||||
|
|
||||||
IMPROVEMENTS
|
IMPROVEMENTS
|
||||||
|
|
|
@ -18,11 +18,16 @@
|
||||||
|
|
||||||
package org.apache.hadoop.yarn.server.api.protocolrecords;
|
package org.apache.hadoop.yarn.server.api.protocolrecords;
|
||||||
|
|
||||||
import org.apache.hadoop.yarn.server.api.records.RegistrationResponse;
|
import org.apache.hadoop.yarn.server.api.records.MasterKey;
|
||||||
|
import org.apache.hadoop.yarn.server.api.records.NodeAction;
|
||||||
|
|
||||||
public interface RegisterNodeManagerResponse {
|
public interface RegisterNodeManagerResponse {
|
||||||
public abstract RegistrationResponse getRegistrationResponse();
|
MasterKey getMasterKey();
|
||||||
|
|
||||||
public abstract void setRegistrationResponse(RegistrationResponse registrationResponse);
|
void setMasterKey(MasterKey secretKey);
|
||||||
|
|
||||||
|
NodeAction getNodeAction();
|
||||||
|
|
||||||
|
void setNodeAction(NodeAction nodeAction);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -20,12 +20,14 @@ package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb;
|
||||||
|
|
||||||
|
|
||||||
import org.apache.hadoop.yarn.api.records.ProtoBase;
|
import org.apache.hadoop.yarn.api.records.ProtoBase;
|
||||||
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.RegistrationResponseProto;
|
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.MasterKeyProto;
|
||||||
|
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeActionProto;
|
||||||
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.RegisterNodeManagerResponseProto;
|
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.RegisterNodeManagerResponseProto;
|
||||||
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.RegisterNodeManagerResponseProtoOrBuilder;
|
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.RegisterNodeManagerResponseProtoOrBuilder;
|
||||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
|
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
|
||||||
import org.apache.hadoop.yarn.server.api.records.RegistrationResponse;
|
import org.apache.hadoop.yarn.server.api.records.MasterKey;
|
||||||
import org.apache.hadoop.yarn.server.api.records.impl.pb.RegistrationResponsePBImpl;
|
import org.apache.hadoop.yarn.server.api.records.NodeAction;
|
||||||
|
import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl;
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
@ -34,7 +36,7 @@ public class RegisterNodeManagerResponsePBImpl extends ProtoBase<RegisterNodeMan
|
||||||
RegisterNodeManagerResponseProto.Builder builder = null;
|
RegisterNodeManagerResponseProto.Builder builder = null;
|
||||||
boolean viaProto = false;
|
boolean viaProto = false;
|
||||||
|
|
||||||
private RegistrationResponse registartionResponse = null;
|
private MasterKey masterKey = null;
|
||||||
|
|
||||||
private boolean rebuild = false;
|
private boolean rebuild = false;
|
||||||
|
|
||||||
|
@ -56,9 +58,8 @@ public class RegisterNodeManagerResponsePBImpl extends ProtoBase<RegisterNodeMan
|
||||||
}
|
}
|
||||||
|
|
||||||
private void mergeLocalToBuilder() {
|
private void mergeLocalToBuilder() {
|
||||||
if (this.registartionResponse != null) {
|
if (this.masterKey != null) {
|
||||||
builder.setRegistrationResponse(convertToProtoFormat(this.registartionResponse));
|
builder.setMasterKey(convertToProtoFormat(this.masterKey));
|
||||||
this.registartionResponse = null;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -78,38 +79,59 @@ public class RegisterNodeManagerResponsePBImpl extends ProtoBase<RegisterNodeMan
|
||||||
viaProto = false;
|
viaProto = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public RegistrationResponse getRegistrationResponse() {
|
public MasterKey getMasterKey() {
|
||||||
RegisterNodeManagerResponseProtoOrBuilder p = viaProto ? proto : builder;
|
RegisterNodeManagerResponseProtoOrBuilder p = viaProto ? proto : builder;
|
||||||
if (this.registartionResponse != null) {
|
if (this.masterKey != null) {
|
||||||
return this.registartionResponse;
|
return this.masterKey;
|
||||||
}
|
}
|
||||||
if (!p.hasRegistrationResponse()) {
|
if (!p.hasMasterKey()) {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
this.registartionResponse = convertFromProtoFormat(p.getRegistrationResponse());
|
this.masterKey = convertFromProtoFormat(p.getMasterKey());
|
||||||
rebuild = true;
|
return this.masterKey;
|
||||||
return this.registartionResponse;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void setRegistrationResponse(RegistrationResponse registrationResponse) {
|
public void setMasterKey(MasterKey masterKey) {
|
||||||
maybeInitBuilder();
|
maybeInitBuilder();
|
||||||
if (registrationResponse == null)
|
if (masterKey == null)
|
||||||
builder.clearRegistrationResponse();
|
builder.clearMasterKey();
|
||||||
this.registartionResponse = registrationResponse;
|
this.masterKey = masterKey;
|
||||||
rebuild = true;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private RegistrationResponsePBImpl convertFromProtoFormat(RegistrationResponseProto p) {
|
@Override
|
||||||
return new RegistrationResponsePBImpl(p);
|
public NodeAction getNodeAction() {
|
||||||
|
RegisterNodeManagerResponseProtoOrBuilder p = viaProto ? proto : builder;
|
||||||
|
if(!p.hasNodeAction()) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
return convertFromProtoFormat(p.getNodeAction());
|
||||||
}
|
}
|
||||||
|
|
||||||
private RegistrationResponseProto convertToProtoFormat(RegistrationResponse t) {
|
@Override
|
||||||
return ((RegistrationResponsePBImpl)t).getProto();
|
public void setNodeAction(NodeAction nodeAction) {
|
||||||
|
maybeInitBuilder();
|
||||||
|
if (nodeAction == null) {
|
||||||
|
builder.clearNodeAction();
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
builder.setNodeAction(convertToProtoFormat(nodeAction));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private NodeAction convertFromProtoFormat(NodeActionProto p) {
|
||||||
|
return NodeAction.valueOf(p.name());
|
||||||
|
}
|
||||||
|
|
||||||
|
private NodeActionProto convertToProtoFormat(NodeAction t) {
|
||||||
|
return NodeActionProto.valueOf(t.name());
|
||||||
|
}
|
||||||
|
|
||||||
|
private MasterKeyPBImpl convertFromProtoFormat(MasterKeyProto p) {
|
||||||
|
return new MasterKeyPBImpl(p);
|
||||||
|
}
|
||||||
|
|
||||||
|
private MasterKeyProto convertToProtoFormat(MasterKey t) {
|
||||||
|
return ((MasterKeyPBImpl)t).getProto();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,29 +0,0 @@
|
||||||
/**
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
|
||||||
* or more contributor license agreements. See the NOTICE file
|
|
||||||
* distributed with this work for additional information
|
|
||||||
* regarding copyright ownership. The ASF licenses this file
|
|
||||||
* to you under the Apache License, Version 2.0 (the
|
|
||||||
* "License"); you may not use this file except in compliance
|
|
||||||
* with the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
package org.apache.hadoop.yarn.server.api.records;
|
|
||||||
|
|
||||||
public interface RegistrationResponse {
|
|
||||||
|
|
||||||
MasterKey getMasterKey();
|
|
||||||
|
|
||||||
void setMasterKey(MasterKey secretKey);
|
|
||||||
|
|
||||||
NodeAction getNodeAction();
|
|
||||||
|
|
||||||
void setNodeAction(NodeAction nodeAction);
|
|
||||||
}
|
|
|
@ -1,133 +0,0 @@
|
||||||
/**
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
|
||||||
* or more contributor license agreements. See the NOTICE file
|
|
||||||
* distributed with this work for additional information
|
|
||||||
* regarding copyright ownership. The ASF licenses this file
|
|
||||||
* to you under the Apache License, Version 2.0 (the
|
|
||||||
* "License"); you may not use this file except in compliance
|
|
||||||
* with the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
package org.apache.hadoop.yarn.server.api.records.impl.pb;
|
|
||||||
|
|
||||||
|
|
||||||
import org.apache.hadoop.yarn.api.records.ProtoBase;
|
|
||||||
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.MasterKeyProto;
|
|
||||||
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeActionProto;
|
|
||||||
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.RegistrationResponseProto;
|
|
||||||
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.RegistrationResponseProtoOrBuilder;
|
|
||||||
import org.apache.hadoop.yarn.server.api.records.MasterKey;
|
|
||||||
import org.apache.hadoop.yarn.server.api.records.NodeAction;
|
|
||||||
import org.apache.hadoop.yarn.server.api.records.RegistrationResponse;
|
|
||||||
|
|
||||||
public class RegistrationResponsePBImpl extends
|
|
||||||
ProtoBase<RegistrationResponseProto> implements RegistrationResponse {
|
|
||||||
RegistrationResponseProto proto = RegistrationResponseProto.getDefaultInstance();
|
|
||||||
RegistrationResponseProto.Builder builder = null;
|
|
||||||
boolean viaProto = false;
|
|
||||||
|
|
||||||
private MasterKey masterKey = null;
|
|
||||||
|
|
||||||
public RegistrationResponsePBImpl() {
|
|
||||||
builder = RegistrationResponseProto.newBuilder();
|
|
||||||
}
|
|
||||||
|
|
||||||
public RegistrationResponsePBImpl(RegistrationResponseProto proto) {
|
|
||||||
this.proto = proto;
|
|
||||||
viaProto = true;
|
|
||||||
}
|
|
||||||
|
|
||||||
public RegistrationResponseProto getProto() {
|
|
||||||
|
|
||||||
mergeLocalToProto();
|
|
||||||
proto = viaProto ? proto : builder.build();
|
|
||||||
viaProto = true;
|
|
||||||
return proto;
|
|
||||||
}
|
|
||||||
|
|
||||||
private void mergeLocalToBuilder() {
|
|
||||||
if (this.masterKey != null) {
|
|
||||||
builder.setMasterKey(convertToProtoFormat(this.masterKey));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private void mergeLocalToProto() {
|
|
||||||
if (viaProto)
|
|
||||||
maybeInitBuilder();
|
|
||||||
mergeLocalToBuilder();
|
|
||||||
proto = builder.build();
|
|
||||||
|
|
||||||
viaProto = true;
|
|
||||||
}
|
|
||||||
|
|
||||||
private void maybeInitBuilder() {
|
|
||||||
if (viaProto || builder == null) {
|
|
||||||
builder = RegistrationResponseProto.newBuilder(proto);
|
|
||||||
}
|
|
||||||
viaProto = false;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public MasterKey getMasterKey() {
|
|
||||||
RegistrationResponseProtoOrBuilder p = viaProto ? proto : builder;
|
|
||||||
if (this.masterKey != null) {
|
|
||||||
return this.masterKey;
|
|
||||||
}
|
|
||||||
if (!p.hasMasterKey()) {
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
this.masterKey = convertFromProtoFormat(p.getMasterKey());
|
|
||||||
return this.masterKey;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void setMasterKey(MasterKey masterKey) {
|
|
||||||
maybeInitBuilder();
|
|
||||||
if (masterKey == null)
|
|
||||||
builder.clearMasterKey();
|
|
||||||
this.masterKey = masterKey;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public NodeAction getNodeAction() {
|
|
||||||
RegistrationResponseProtoOrBuilder p = viaProto ? proto : builder;
|
|
||||||
if(!p.hasNodeAction()) {
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
return convertFromProtoFormat(p.getNodeAction());
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void setNodeAction(NodeAction nodeAction) {
|
|
||||||
maybeInitBuilder();
|
|
||||||
if (nodeAction == null) {
|
|
||||||
builder.clearNodeAction();
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
builder.setNodeAction(convertToProtoFormat(nodeAction));
|
|
||||||
}
|
|
||||||
|
|
||||||
private NodeAction convertFromProtoFormat(NodeActionProto p) {
|
|
||||||
return NodeAction.valueOf(p.name());
|
|
||||||
}
|
|
||||||
|
|
||||||
private NodeActionProto convertToProtoFormat(NodeAction t) {
|
|
||||||
return NodeActionProto.valueOf(t.name());
|
|
||||||
}
|
|
||||||
|
|
||||||
private MasterKeyPBImpl convertFromProtoFormat(MasterKeyProto p) {
|
|
||||||
return new MasterKeyPBImpl(p);
|
|
||||||
}
|
|
||||||
|
|
||||||
private MasterKeyProto convertToProtoFormat(MasterKey t) {
|
|
||||||
return ((MasterKeyPBImpl)t).getProto();
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -42,9 +42,3 @@ message MasterKeyProto {
|
||||||
optional bytes bytes = 2;
|
optional bytes bytes = 2;
|
||||||
}
|
}
|
||||||
|
|
||||||
message RegistrationResponseProto {
|
|
||||||
optional MasterKeyProto master_key = 1;
|
|
||||||
optional NodeActionProto nodeAction = 2;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -29,8 +29,10 @@ message RegisterNodeManagerRequestProto {
|
||||||
optional int32 http_port = 3;
|
optional int32 http_port = 3;
|
||||||
optional ResourceProto resource = 4;
|
optional ResourceProto resource = 4;
|
||||||
}
|
}
|
||||||
|
|
||||||
message RegisterNodeManagerResponseProto {
|
message RegisterNodeManagerResponseProto {
|
||||||
optional RegistrationResponseProto registration_response = 1;
|
optional MasterKeyProto master_key = 1;
|
||||||
|
optional NodeActionProto nodeAction = 2;
|
||||||
}
|
}
|
||||||
|
|
||||||
message NodeHeartbeatRequestProto {
|
message NodeHeartbeatRequestProto {
|
||||||
|
|
|
@ -52,10 +52,10 @@ import org.apache.hadoop.yarn.server.api.ResourceTracker;
|
||||||
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
|
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
|
||||||
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
|
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
|
||||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
|
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
|
||||||
|
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
|
||||||
import org.apache.hadoop.yarn.server.api.records.MasterKey;
|
import org.apache.hadoop.yarn.server.api.records.MasterKey;
|
||||||
import org.apache.hadoop.yarn.server.api.records.NodeAction;
|
import org.apache.hadoop.yarn.server.api.records.NodeAction;
|
||||||
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
|
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
|
||||||
import org.apache.hadoop.yarn.server.api.records.RegistrationResponse;
|
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
|
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
|
||||||
import org.apache.hadoop.yarn.service.AbstractService;
|
import org.apache.hadoop.yarn.service.AbstractService;
|
||||||
|
@ -234,7 +234,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
|
||||||
request.setHttpPort(this.httpPort);
|
request.setHttpPort(this.httpPort);
|
||||||
request.setResource(this.totalResource);
|
request.setResource(this.totalResource);
|
||||||
request.setNodeId(this.nodeId);
|
request.setNodeId(this.nodeId);
|
||||||
RegistrationResponse regResponse;
|
RegisterNodeManagerResponse regNMResponse;
|
||||||
|
|
||||||
while(true) {
|
while(true) {
|
||||||
try {
|
try {
|
||||||
|
@ -242,9 +242,8 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
|
||||||
LOG.info("Connecting to ResourceManager at " + this.rmAddress
|
LOG.info("Connecting to ResourceManager at " + this.rmAddress
|
||||||
+ ". current no. of attempts is " + rmRetryCount);
|
+ ". current no. of attempts is " + rmRetryCount);
|
||||||
this.resourceTracker = getRMClient();
|
this.resourceTracker = getRMClient();
|
||||||
regResponse =
|
regNMResponse =
|
||||||
this.resourceTracker.registerNodeManager(request)
|
this.resourceTracker.registerNodeManager(request);
|
||||||
.getRegistrationResponse();
|
|
||||||
break;
|
break;
|
||||||
} catch(Throwable e) {
|
} catch(Throwable e) {
|
||||||
LOG.warn("Trying to connect to ResourceManager, " +
|
LOG.warn("Trying to connect to ResourceManager, " +
|
||||||
|
@ -267,13 +266,13 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// if the Resourcemanager instructs NM to shutdown.
|
// if the Resourcemanager instructs NM to shutdown.
|
||||||
if (NodeAction.SHUTDOWN.equals(regResponse.getNodeAction())) {
|
if (NodeAction.SHUTDOWN.equals(regNMResponse.getNodeAction())) {
|
||||||
throw new YarnException(
|
throw new YarnException(
|
||||||
"Recieved SHUTDOWN signal from Resourcemanager ,Registration of NodeManager failed");
|
"Recieved SHUTDOWN signal from Resourcemanager ,Registration of NodeManager failed");
|
||||||
}
|
}
|
||||||
|
|
||||||
if (UserGroupInformation.isSecurityEnabled()) {
|
if (UserGroupInformation.isSecurityEnabled()) {
|
||||||
MasterKey masterKey = regResponse.getMasterKey();
|
MasterKey masterKey = regNMResponse.getMasterKey();
|
||||||
// do this now so that its set before we start heartbeating to RM
|
// do this now so that its set before we start heartbeating to RM
|
||||||
LOG.info("Security enabled - updating secret keys now");
|
LOG.info("Security enabled - updating secret keys now");
|
||||||
// It is expected that status updater is started by this point and
|
// It is expected that status updater is started by this point and
|
||||||
|
|
|
@ -26,7 +26,6 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
|
||||||
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
|
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
|
||||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
|
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
|
||||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
|
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
|
||||||
import org.apache.hadoop.yarn.server.api.records.RegistrationResponse;
|
|
||||||
|
|
||||||
public class LocalRMInterface implements ResourceTracker {
|
public class LocalRMInterface implements ResourceTracker {
|
||||||
|
|
||||||
|
@ -34,9 +33,7 @@ public class LocalRMInterface implements ResourceTracker {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public RegisterNodeManagerResponse registerNodeManager(RegisterNodeManagerRequest request) throws YarnRemoteException {
|
public RegisterNodeManagerResponse registerNodeManager(RegisterNodeManagerRequest request) throws YarnRemoteException {
|
||||||
RegistrationResponse registrationResponse = recordFactory.newRecordInstance(RegistrationResponse.class);
|
|
||||||
RegisterNodeManagerResponse response = recordFactory.newRecordInstance(RegisterNodeManagerResponse.class);
|
RegisterNodeManagerResponse response = recordFactory.newRecordInstance(RegisterNodeManagerResponse.class);
|
||||||
response.setRegistrationResponse(registrationResponse);
|
|
||||||
return response;
|
return response;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -30,7 +30,6 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
|
||||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
|
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
|
||||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
|
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
|
||||||
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
|
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
|
||||||
import org.apache.hadoop.yarn.server.api.records.RegistrationResponse;
|
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
|
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -62,12 +61,8 @@ public class MockNodeStatusUpdater extends NodeStatusUpdaterImpl {
|
||||||
@Override
|
@Override
|
||||||
public RegisterNodeManagerResponse registerNodeManager(
|
public RegisterNodeManagerResponse registerNodeManager(
|
||||||
RegisterNodeManagerRequest request) throws YarnRemoteException {
|
RegisterNodeManagerRequest request) throws YarnRemoteException {
|
||||||
RegistrationResponse regResponse = recordFactory
|
|
||||||
.newRecordInstance(RegistrationResponse.class);
|
|
||||||
|
|
||||||
RegisterNodeManagerResponse response = recordFactory
|
RegisterNodeManagerResponse response = recordFactory
|
||||||
.newRecordInstance(RegisterNodeManagerResponse.class);
|
.newRecordInstance(RegisterNodeManagerResponse.class);
|
||||||
response.setRegistrationResponse(regResponse);
|
|
||||||
return response;
|
return response;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -58,7 +58,6 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequ
|
||||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
|
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
|
||||||
import org.apache.hadoop.yarn.server.api.records.NodeAction;
|
import org.apache.hadoop.yarn.server.api.records.NodeAction;
|
||||||
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
|
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
|
||||||
import org.apache.hadoop.yarn.server.api.records.RegistrationResponse;
|
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
|
||||||
|
@ -123,12 +122,9 @@ public class TestNodeStatusUpdater {
|
||||||
Assert.assertEquals(NetUtils.getHostPortString(expected), nodeId.toString());
|
Assert.assertEquals(NetUtils.getHostPortString(expected), nodeId.toString());
|
||||||
Assert.assertEquals(5 * 1024, resource.getMemory());
|
Assert.assertEquals(5 * 1024, resource.getMemory());
|
||||||
registeredNodes.add(nodeId);
|
registeredNodes.add(nodeId);
|
||||||
RegistrationResponse regResponse = recordFactory
|
|
||||||
.newRecordInstance(RegistrationResponse.class);
|
|
||||||
|
|
||||||
RegisterNodeManagerResponse response = recordFactory
|
RegisterNodeManagerResponse response = recordFactory
|
||||||
.newRecordInstance(RegisterNodeManagerResponse.class);
|
.newRecordInstance(RegisterNodeManagerResponse.class);
|
||||||
response.setRegistrationResponse(regResponse);
|
|
||||||
return response;
|
return response;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -320,10 +316,7 @@ public class TestNodeStatusUpdater {
|
||||||
|
|
||||||
RegisterNodeManagerResponse response = recordFactory
|
RegisterNodeManagerResponse response = recordFactory
|
||||||
.newRecordInstance(RegisterNodeManagerResponse.class);
|
.newRecordInstance(RegisterNodeManagerResponse.class);
|
||||||
RegistrationResponse regResponse = recordFactory
|
response.setNodeAction(registerNodeAction );
|
||||||
.newRecordInstance(RegistrationResponse.class);
|
|
||||||
regResponse.setNodeAction(registerNodeAction );
|
|
||||||
response.setRegistrationResponse(regResponse);
|
|
||||||
return response;
|
return response;
|
||||||
}
|
}
|
||||||
@Override
|
@Override
|
||||||
|
@ -358,10 +351,7 @@ public class TestNodeStatusUpdater {
|
||||||
|
|
||||||
RegisterNodeManagerResponse response =
|
RegisterNodeManagerResponse response =
|
||||||
recordFactory.newRecordInstance(RegisterNodeManagerResponse.class);
|
recordFactory.newRecordInstance(RegisterNodeManagerResponse.class);
|
||||||
RegistrationResponse regResponse =
|
response.setNodeAction(registerNodeAction);
|
||||||
recordFactory.newRecordInstance(RegistrationResponse.class);
|
|
||||||
regResponse.setNodeAction(registerNodeAction);
|
|
||||||
response.setRegistrationResponse(regResponse);
|
|
||||||
return response;
|
return response;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -42,7 +42,6 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResp
|
||||||
import org.apache.hadoop.yarn.server.api.records.MasterKey;
|
import org.apache.hadoop.yarn.server.api.records.MasterKey;
|
||||||
import org.apache.hadoop.yarn.server.api.records.NodeAction;
|
import org.apache.hadoop.yarn.server.api.records.NodeAction;
|
||||||
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
|
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
|
||||||
import org.apache.hadoop.yarn.server.api.records.RegistrationResponse;
|
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
|
||||||
|
@ -150,22 +149,19 @@ public class ResourceTrackerService extends AbstractService implements
|
||||||
|
|
||||||
RegisterNodeManagerResponse response = recordFactory
|
RegisterNodeManagerResponse response = recordFactory
|
||||||
.newRecordInstance(RegisterNodeManagerResponse.class);
|
.newRecordInstance(RegisterNodeManagerResponse.class);
|
||||||
RegistrationResponse regResponse = recordFactory
|
|
||||||
.newRecordInstance(RegistrationResponse.class);
|
|
||||||
|
|
||||||
// Check if this node is a 'valid' node
|
// Check if this node is a 'valid' node
|
||||||
if (!this.nodesListManager.isValidNode(host)) {
|
if (!this.nodesListManager.isValidNode(host)) {
|
||||||
LOG.info("Disallowed NodeManager from " + host
|
LOG.info("Disallowed NodeManager from " + host
|
||||||
+ ", Sending SHUTDOWN signal to the NodeManager.");
|
+ ", Sending SHUTDOWN signal to the NodeManager.");
|
||||||
regResponse.setNodeAction(NodeAction.SHUTDOWN);
|
response.setNodeAction(NodeAction.SHUTDOWN);
|
||||||
response.setRegistrationResponse(regResponse);
|
|
||||||
return response;
|
return response;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (isSecurityEnabled()) {
|
if (isSecurityEnabled()) {
|
||||||
MasterKey nextMasterKeyForNode =
|
MasterKey nextMasterKeyForNode =
|
||||||
this.containerTokenSecretManager.getCurrentKey();
|
this.containerTokenSecretManager.getCurrentKey();
|
||||||
regResponse.setMasterKey(nextMasterKeyForNode);
|
response.setMasterKey(nextMasterKeyForNode);
|
||||||
}
|
}
|
||||||
|
|
||||||
RMNode rmNode = new RMNodeImpl(nodeId, rmContext, host, cmPort, httpPort,
|
RMNode rmNode = new RMNodeImpl(nodeId, rmContext, host, cmPort, httpPort,
|
||||||
|
@ -188,8 +184,7 @@ public class ResourceTrackerService extends AbstractService implements
|
||||||
+ " httpPort: " + httpPort + ") " + "registered with capability: "
|
+ " httpPort: " + httpPort + ") " + "registered with capability: "
|
||||||
+ capability + ", assigned nodeId " + nodeId);
|
+ capability + ", assigned nodeId " + nodeId);
|
||||||
|
|
||||||
regResponse.setNodeAction(NodeAction.NORMAL);
|
response.setNodeAction(NodeAction.NORMAL);
|
||||||
response.setRegistrationResponse(regResponse);
|
|
||||||
return response;
|
return response;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -35,9 +35,9 @@ import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
|
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
|
||||||
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
|
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
|
||||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
|
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
|
||||||
|
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
|
||||||
import org.apache.hadoop.yarn.server.api.records.MasterKey;
|
import org.apache.hadoop.yarn.server.api.records.MasterKey;
|
||||||
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
|
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
|
||||||
import org.apache.hadoop.yarn.server.api.records.RegistrationResponse;
|
|
||||||
import org.apache.hadoop.yarn.util.BuilderUtils;
|
import org.apache.hadoop.yarn.util.BuilderUtils;
|
||||||
import org.apache.hadoop.yarn.util.Records;
|
import org.apache.hadoop.yarn.util.Records;
|
||||||
|
|
||||||
|
@ -79,7 +79,7 @@ public class MockNM {
|
||||||
nodeHeartbeat(conts, true);
|
nodeHeartbeat(conts, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
public RegistrationResponse registerNode() throws Exception {
|
public RegisterNodeManagerResponse registerNode() throws Exception {
|
||||||
RegisterNodeManagerRequest req = Records.newRecord(
|
RegisterNodeManagerRequest req = Records.newRecord(
|
||||||
RegisterNodeManagerRequest.class);
|
RegisterNodeManagerRequest.class);
|
||||||
req.setNodeId(nodeId);
|
req.setNodeId(nodeId);
|
||||||
|
@ -87,8 +87,8 @@ public class MockNM {
|
||||||
Resource resource = Records.newRecord(Resource.class);
|
Resource resource = Records.newRecord(Resource.class);
|
||||||
resource.setMemory(memory);
|
resource.setMemory(memory);
|
||||||
req.setResource(resource);
|
req.setResource(resource);
|
||||||
RegistrationResponse registrationResponse =
|
RegisterNodeManagerResponse registrationResponse =
|
||||||
resourceTracker.registerNodeManager(req).getRegistrationResponse();
|
resourceTracker.registerNodeManager(req);
|
||||||
this.currentMasterKey = registrationResponse.getMasterKey();
|
this.currentMasterKey = registrationResponse.getMasterKey();
|
||||||
return registrationResponse;
|
return registrationResponse;
|
||||||
}
|
}
|
||||||
|
|
|
@ -96,8 +96,7 @@ public class NodeManager implements ContainerManager {
|
||||||
request.setNodeId(this.nodeId);
|
request.setNodeId(this.nodeId);
|
||||||
request.setResource(capability);
|
request.setResource(capability);
|
||||||
request.setNodeId(this.nodeId);
|
request.setNodeId(this.nodeId);
|
||||||
resourceTrackerService.registerNodeManager(request)
|
resourceTrackerService.registerNodeManager(request);
|
||||||
.getRegistrationResponse();
|
|
||||||
this.schedulerNode = new FiCaSchedulerNode(rmContext.getRMNodes().get(
|
this.schedulerNode = new FiCaSchedulerNode(rmContext.getRMNodes().get(
|
||||||
this.nodeId));
|
this.nodeId));
|
||||||
|
|
||||||
|
|
|
@ -241,7 +241,7 @@ public class TestResourceTrackerService {
|
||||||
req.setHttpPort(1234);
|
req.setHttpPort(1234);
|
||||||
// trying to register a invalid node.
|
// trying to register a invalid node.
|
||||||
RegisterNodeManagerResponse response = resourceTrackerService.registerNodeManager(req);
|
RegisterNodeManagerResponse response = resourceTrackerService.registerNodeManager(req);
|
||||||
Assert.assertEquals(NodeAction.SHUTDOWN,response.getRegistrationResponse().getNodeAction());
|
Assert.assertEquals(NodeAction.SHUTDOWN,response.getNodeAction());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
|
|
@ -166,7 +166,7 @@ public class TestNMExpiry {
|
||||||
request3.setHttpPort(0);
|
request3.setHttpPort(0);
|
||||||
request3.setResource(capability);
|
request3.setResource(capability);
|
||||||
resourceTrackerService
|
resourceTrackerService
|
||||||
.registerNodeManager(request3).getRegistrationResponse();
|
.registerNodeManager(request3);
|
||||||
|
|
||||||
/* test to see if hostanme 3 does not expire */
|
/* test to see if hostanme 3 does not expire */
|
||||||
stopT = false;
|
stopT = false;
|
||||||
|
|
|
@ -377,9 +377,7 @@ public class MiniYARNCluster extends CompositeService {
|
||||||
RegisterNodeManagerResponse response = recordFactory.
|
RegisterNodeManagerResponse response = recordFactory.
|
||||||
newRecordInstance(RegisterNodeManagerResponse.class);
|
newRecordInstance(RegisterNodeManagerResponse.class);
|
||||||
try {
|
try {
|
||||||
response.setRegistrationResponse(rt
|
response = rt.registerNodeManager(request);
|
||||||
.registerNodeManager(request)
|
|
||||||
.getRegistrationResponse());
|
|
||||||
} catch (IOException ioe) {
|
} catch (IOException ioe) {
|
||||||
LOG.info("Exception in node registration from "
|
LOG.info("Exception in node registration from "
|
||||||
+ request.getNodeId().toString(), ioe);
|
+ request.getNodeId().toString(), ioe);
|
||||||
|
|
|
@ -28,8 +28,8 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.event.Dispatcher;
|
import org.apache.hadoop.yarn.event.Dispatcher;
|
||||||
import org.apache.hadoop.yarn.event.DrainDispatcher;
|
import org.apache.hadoop.yarn.event.DrainDispatcher;
|
||||||
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
|
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
|
||||||
|
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
|
||||||
import org.apache.hadoop.yarn.server.api.records.MasterKey;
|
import org.apache.hadoop.yarn.server.api.records.MasterKey;
|
||||||
import org.apache.hadoop.yarn.server.api.records.RegistrationResponse;
|
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
|
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
|
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
|
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
|
||||||
|
@ -62,7 +62,7 @@ public class TestRMNMSecretKeys {
|
||||||
rm.start();
|
rm.start();
|
||||||
|
|
||||||
MockNM nm = new MockNM("host:1234", 3072, rm.getResourceTrackerService());
|
MockNM nm = new MockNM("host:1234", 3072, rm.getResourceTrackerService());
|
||||||
RegistrationResponse registrationResponse = nm.registerNode();
|
RegisterNodeManagerResponse registrationResponse = nm.registerNode();
|
||||||
MasterKey masterKey = registrationResponse.getMasterKey();
|
MasterKey masterKey = registrationResponse.getMasterKey();
|
||||||
Assert.assertNotNull("Registration should cause a key-update!", masterKey);
|
Assert.assertNotNull("Registration should cause a key-update!", masterKey);
|
||||||
dispatcher.await();
|
dispatcher.await();
|
||||||
|
|
Loading…
Reference in New Issue