merge YARN-439 from trunk. Flatten NodeHeartbeatResponse. Contributed by Xuan Gong.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1460812 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
a427b71a25
commit
4410861077
|
@ -7,6 +7,8 @@ Release 2.0.5-beta - UNRELEASED
|
|||
YARN-396. Rationalize AllocateResponse in RM Scheduler API. (Zhijie Shen
|
||||
via hitesh)
|
||||
|
||||
YARN-439. Flatten NodeHeartbeatResponse. (Xuan Gong via sseth)
|
||||
|
||||
NEW FEATURES
|
||||
|
||||
IMPROVEMENTS
|
||||
|
|
|
@ -18,10 +18,28 @@
|
|||
|
||||
package org.apache.hadoop.yarn.server.api.protocolrecords;
|
||||
|
||||
import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.server.api.records.MasterKey;
|
||||
import org.apache.hadoop.yarn.server.api.records.NodeAction;
|
||||
|
||||
public interface NodeHeartbeatResponse {
|
||||
public abstract HeartbeatResponse getHeartbeatResponse();
|
||||
int getResponseId();
|
||||
NodeAction getNodeAction();
|
||||
|
||||
public abstract void setHeartbeatResponse(HeartbeatResponse heartbeatResponse);
|
||||
List<ContainerId> getContainersToCleanup();
|
||||
|
||||
List<ApplicationId> getApplicationsToCleanup();
|
||||
|
||||
void setResponseId(int responseId);
|
||||
void setNodeAction(NodeAction action);
|
||||
|
||||
MasterKey getMasterKey();
|
||||
void setMasterKey(MasterKey secretKey);
|
||||
|
||||
void addAllContainersToCleanup(List<ContainerId> containers);
|
||||
|
||||
void addAllApplicationsToCleanup(List<ApplicationId> applications);
|
||||
}
|
||||
|
|
|
@ -18,14 +18,25 @@
|
|||
|
||||
package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Iterator;
|
||||
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.ProtoBase;
|
||||
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.HeartbeatResponseProto;
|
||||
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
|
||||
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.MasterKeyProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeActionProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatResponseProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatResponseProtoOrBuilder;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
|
||||
import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse;
|
||||
import org.apache.hadoop.yarn.server.api.records.impl.pb.HeartbeatResponsePBImpl;
|
||||
import org.apache.hadoop.yarn.server.api.records.MasterKey;
|
||||
import org.apache.hadoop.yarn.server.api.records.NodeAction;
|
||||
import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl;
|
||||
|
||||
|
||||
|
||||
|
@ -34,8 +45,9 @@ public class NodeHeartbeatResponsePBImpl extends ProtoBase<NodeHeartbeatResponse
|
|||
NodeHeartbeatResponseProto.Builder builder = null;
|
||||
boolean viaProto = false;
|
||||
|
||||
private HeartbeatResponse heartbeatResponse = null;
|
||||
|
||||
private List<ContainerId> containersToCleanup = null;
|
||||
private List<ApplicationId> applicationsToCleanup = null;
|
||||
private MasterKey masterKey = null;
|
||||
|
||||
public NodeHeartbeatResponsePBImpl() {
|
||||
builder = NodeHeartbeatResponseProto.newBuilder();
|
||||
|
@ -54,8 +66,14 @@ public class NodeHeartbeatResponsePBImpl extends ProtoBase<NodeHeartbeatResponse
|
|||
}
|
||||
|
||||
private void mergeLocalToBuilder() {
|
||||
if (this.heartbeatResponse != null) {
|
||||
builder.setHeartbeatResponse(convertToProtoFormat(this.heartbeatResponse));
|
||||
if (this.containersToCleanup != null) {
|
||||
addContainersToCleanupToProto();
|
||||
}
|
||||
if (this.applicationsToCleanup != null) {
|
||||
addApplicationsToCleanupToProto();
|
||||
}
|
||||
if (this.masterKey != null) {
|
||||
builder.setMasterKey(convertToProtoFormat(this.masterKey));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -76,34 +94,213 @@ public class NodeHeartbeatResponsePBImpl extends ProtoBase<NodeHeartbeatResponse
|
|||
|
||||
|
||||
@Override
|
||||
public HeartbeatResponse getHeartbeatResponse() {
|
||||
public int getResponseId() {
|
||||
NodeHeartbeatResponseProtoOrBuilder p = viaProto ? proto : builder;
|
||||
if (this.heartbeatResponse != null) {
|
||||
return this.heartbeatResponse;
|
||||
}
|
||||
if (!p.hasHeartbeatResponse()) {
|
||||
return null;
|
||||
}
|
||||
this.heartbeatResponse = convertFromProtoFormat(p.getHeartbeatResponse());
|
||||
return this.heartbeatResponse;
|
||||
return (p.getResponseId());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setHeartbeatResponse(HeartbeatResponse heartbeatResponse) {
|
||||
public void setResponseId(int responseId) {
|
||||
maybeInitBuilder();
|
||||
if (heartbeatResponse == null)
|
||||
builder.clearHeartbeatResponse();
|
||||
this.heartbeatResponse = heartbeatResponse;
|
||||
builder.setResponseId((responseId));
|
||||
}
|
||||
|
||||
private HeartbeatResponsePBImpl convertFromProtoFormat(HeartbeatResponseProto p) {
|
||||
return new HeartbeatResponsePBImpl(p);
|
||||
@Override
|
||||
public MasterKey getMasterKey() {
|
||||
NodeHeartbeatResponseProtoOrBuilder p = viaProto ? proto : builder;
|
||||
if (this.masterKey != null) {
|
||||
return this.masterKey;
|
||||
}
|
||||
if (!p.hasMasterKey()) {
|
||||
return null;
|
||||
}
|
||||
this.masterKey = convertFromProtoFormat(p.getMasterKey());
|
||||
return this.masterKey;
|
||||
}
|
||||
|
||||
private HeartbeatResponseProto convertToProtoFormat(HeartbeatResponse t) {
|
||||
return ((HeartbeatResponsePBImpl)t).getProto();
|
||||
@Override
|
||||
public void setMasterKey(MasterKey masterKey) {
|
||||
maybeInitBuilder();
|
||||
if (masterKey == null)
|
||||
builder.clearMasterKey();
|
||||
this.masterKey = masterKey;
|
||||
}
|
||||
|
||||
@Override
|
||||
public NodeAction getNodeAction() {
|
||||
NodeHeartbeatResponseProtoOrBuilder p = viaProto ? proto : builder;
|
||||
if (!p.hasNodeAction()) {
|
||||
return null;
|
||||
}
|
||||
return (convertFromProtoFormat(p.getNodeAction()));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setNodeAction(NodeAction nodeAction) {
|
||||
maybeInitBuilder();
|
||||
if (nodeAction == null) {
|
||||
builder.clearNodeAction();
|
||||
return;
|
||||
}
|
||||
builder.setNodeAction(convertToProtoFormat(nodeAction));
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<ContainerId> getContainersToCleanup() {
|
||||
initContainersToCleanup();
|
||||
return this.containersToCleanup;
|
||||
}
|
||||
|
||||
private void initContainersToCleanup() {
|
||||
if (this.containersToCleanup != null) {
|
||||
return;
|
||||
}
|
||||
NodeHeartbeatResponseProtoOrBuilder p = viaProto ? proto : builder;
|
||||
List<ContainerIdProto> list = p.getContainersToCleanupList();
|
||||
this.containersToCleanup = new ArrayList<ContainerId>();
|
||||
|
||||
for (ContainerIdProto c : list) {
|
||||
this.containersToCleanup.add(convertFromProtoFormat(c));
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void addAllContainersToCleanup(
|
||||
final List<ContainerId> containersToCleanup) {
|
||||
if (containersToCleanup == null)
|
||||
return;
|
||||
initContainersToCleanup();
|
||||
this.containersToCleanup.addAll(containersToCleanup);
|
||||
}
|
||||
|
||||
private void addContainersToCleanupToProto() {
|
||||
maybeInitBuilder();
|
||||
builder.clearContainersToCleanup();
|
||||
if (containersToCleanup == null)
|
||||
return;
|
||||
Iterable<ContainerIdProto> iterable = new Iterable<ContainerIdProto>() {
|
||||
|
||||
@Override
|
||||
public Iterator<ContainerIdProto> iterator() {
|
||||
return new Iterator<ContainerIdProto>() {
|
||||
|
||||
Iterator<ContainerId> iter = containersToCleanup.iterator();
|
||||
|
||||
@Override
|
||||
public boolean hasNext() {
|
||||
return iter.hasNext();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ContainerIdProto next() {
|
||||
return convertToProtoFormat(iter.next());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void remove() {
|
||||
throw new UnsupportedOperationException();
|
||||
|
||||
}
|
||||
};
|
||||
|
||||
}
|
||||
};
|
||||
builder.addAllContainersToCleanup(iterable);
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<ApplicationId> getApplicationsToCleanup() {
|
||||
initApplicationsToCleanup();
|
||||
return this.applicationsToCleanup;
|
||||
}
|
||||
|
||||
private void initApplicationsToCleanup() {
|
||||
if (this.applicationsToCleanup != null) {
|
||||
return;
|
||||
}
|
||||
NodeHeartbeatResponseProtoOrBuilder p = viaProto ? proto : builder;
|
||||
List<ApplicationIdProto> list = p.getApplicationsToCleanupList();
|
||||
this.applicationsToCleanup = new ArrayList<ApplicationId>();
|
||||
|
||||
for (ApplicationIdProto c : list) {
|
||||
this.applicationsToCleanup.add(convertFromProtoFormat(c));
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void addAllApplicationsToCleanup(
|
||||
final List<ApplicationId> applicationsToCleanup) {
|
||||
if (applicationsToCleanup == null)
|
||||
return;
|
||||
initApplicationsToCleanup();
|
||||
this.applicationsToCleanup.addAll(applicationsToCleanup);
|
||||
}
|
||||
|
||||
private void addApplicationsToCleanupToProto() {
|
||||
maybeInitBuilder();
|
||||
builder.clearApplicationsToCleanup();
|
||||
if (applicationsToCleanup == null)
|
||||
return;
|
||||
Iterable<ApplicationIdProto> iterable = new Iterable<ApplicationIdProto>() {
|
||||
|
||||
@Override
|
||||
public Iterator<ApplicationIdProto> iterator() {
|
||||
return new Iterator<ApplicationIdProto>() {
|
||||
|
||||
Iterator<ApplicationId> iter = applicationsToCleanup.iterator();
|
||||
|
||||
@Override
|
||||
public boolean hasNext() {
|
||||
return iter.hasNext();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ApplicationIdProto next() {
|
||||
return convertToProtoFormat(iter.next());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void remove() {
|
||||
throw new UnsupportedOperationException();
|
||||
|
||||
}
|
||||
};
|
||||
|
||||
}
|
||||
};
|
||||
builder.addAllApplicationsToCleanup(iterable);
|
||||
}
|
||||
|
||||
private ContainerIdPBImpl convertFromProtoFormat(ContainerIdProto p) {
|
||||
return new ContainerIdPBImpl(p);
|
||||
}
|
||||
|
||||
private ContainerIdProto convertToProtoFormat(ContainerId t) {
|
||||
return ((ContainerIdPBImpl) t).getProto();
|
||||
}
|
||||
|
||||
private ApplicationIdPBImpl convertFromProtoFormat(ApplicationIdProto p) {
|
||||
return new ApplicationIdPBImpl(p);
|
||||
}
|
||||
|
||||
private ApplicationIdProto convertToProtoFormat(ApplicationId t) {
|
||||
return ((ApplicationIdPBImpl) t).getProto();
|
||||
}
|
||||
|
||||
private NodeAction convertFromProtoFormat(NodeActionProto p) {
|
||||
return NodeAction.valueOf(p.name());
|
||||
}
|
||||
|
||||
private NodeActionProto convertToProtoFormat(NodeAction t) {
|
||||
return NodeActionProto.valueOf(t.name());
|
||||
}
|
||||
|
||||
private MasterKeyPBImpl convertFromProtoFormat(MasterKeyProto p) {
|
||||
return new MasterKeyPBImpl(p);
|
||||
}
|
||||
|
||||
private MasterKeyProto convertToProtoFormat(MasterKey t) {
|
||||
return ((MasterKeyPBImpl) t).getProto();
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -1,52 +0,0 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.yarn.server.api.records;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
|
||||
public interface HeartbeatResponse {
|
||||
int getResponseId();
|
||||
NodeAction getNodeAction();
|
||||
|
||||
List<ContainerId> getContainersToCleanupList();
|
||||
ContainerId getContainerToCleanup(int index);
|
||||
int getContainersToCleanupCount();
|
||||
|
||||
List<ApplicationId> getApplicationsToCleanupList();
|
||||
ApplicationId getApplicationsToCleanup(int index);
|
||||
int getApplicationsToCleanupCount();
|
||||
|
||||
void setResponseId(int responseId);
|
||||
void setNodeAction(NodeAction action);
|
||||
|
||||
MasterKey getMasterKey();
|
||||
void setMasterKey(MasterKey secretKey);
|
||||
|
||||
void addAllContainersToCleanup(List<ContainerId> containers);
|
||||
void addContainerToCleanup(ContainerId container);
|
||||
void removeContainerToCleanup(int index);
|
||||
void clearContainersToCleanup();
|
||||
|
||||
void addAllApplicationsToCleanup(List<ApplicationId> applications);
|
||||
void addApplicationToCleanup(ApplicationId applicationId);
|
||||
void removeApplicationToCleanup(int index);
|
||||
void clearApplicationsToCleanup();
|
||||
}
|
|
@ -1,350 +0,0 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.api.records.impl.pb;
|
||||
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.ProtoBase;
|
||||
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
|
||||
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.HeartbeatResponseProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.HeartbeatResponseProtoOrBuilder;
|
||||
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.MasterKeyProto;
|
||||
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeActionProto;
|
||||
import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse;
|
||||
import org.apache.hadoop.yarn.server.api.records.MasterKey;
|
||||
import org.apache.hadoop.yarn.server.api.records.NodeAction;
|
||||
|
||||
public class HeartbeatResponsePBImpl extends
|
||||
ProtoBase<HeartbeatResponseProto> implements HeartbeatResponse {
|
||||
HeartbeatResponseProto proto = HeartbeatResponseProto.getDefaultInstance();
|
||||
HeartbeatResponseProto.Builder builder = null;
|
||||
boolean viaProto = false;
|
||||
|
||||
private List<ContainerId> containersToCleanup = null;
|
||||
private List<ApplicationId> applicationsToCleanup = null;
|
||||
private MasterKey masterKey = null;
|
||||
|
||||
public HeartbeatResponsePBImpl() {
|
||||
builder = HeartbeatResponseProto.newBuilder();
|
||||
}
|
||||
|
||||
public HeartbeatResponsePBImpl(HeartbeatResponseProto proto) {
|
||||
this.proto = proto;
|
||||
viaProto = true;
|
||||
}
|
||||
|
||||
public HeartbeatResponseProto getProto() {
|
||||
|
||||
mergeLocalToProto();
|
||||
proto = viaProto ? proto : builder.build();
|
||||
viaProto = true;
|
||||
return proto;
|
||||
}
|
||||
|
||||
private void mergeLocalToBuilder() {
|
||||
if (this.containersToCleanup != null) {
|
||||
addContainersToCleanupToProto();
|
||||
}
|
||||
if (this.applicationsToCleanup != null) {
|
||||
addApplicationsToCleanupToProto();
|
||||
}
|
||||
if (this.masterKey != null) {
|
||||
builder.setMasterKey(convertToProtoFormat(this.masterKey));
|
||||
}
|
||||
}
|
||||
|
||||
private void mergeLocalToProto() {
|
||||
if (viaProto)
|
||||
maybeInitBuilder();
|
||||
mergeLocalToBuilder();
|
||||
proto = builder.build();
|
||||
viaProto = true;
|
||||
}
|
||||
|
||||
private void maybeInitBuilder() {
|
||||
if (viaProto || builder == null) {
|
||||
builder = HeartbeatResponseProto.newBuilder(proto);
|
||||
}
|
||||
viaProto = false;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public int getResponseId() {
|
||||
HeartbeatResponseProtoOrBuilder p = viaProto ? proto : builder;
|
||||
return (p.getResponseId());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setResponseId(int responseId) {
|
||||
maybeInitBuilder();
|
||||
builder.setResponseId((responseId));
|
||||
}
|
||||
|
||||
@Override
|
||||
public MasterKey getMasterKey() {
|
||||
HeartbeatResponseProtoOrBuilder p = viaProto ? proto : builder;
|
||||
if (this.masterKey != null) {
|
||||
return this.masterKey;
|
||||
}
|
||||
if (!p.hasMasterKey()) {
|
||||
return null;
|
||||
}
|
||||
this.masterKey = convertFromProtoFormat(p.getMasterKey());
|
||||
return this.masterKey;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setMasterKey(MasterKey masterKey) {
|
||||
maybeInitBuilder();
|
||||
if (masterKey == null)
|
||||
builder.clearMasterKey();
|
||||
this.masterKey = masterKey;
|
||||
}
|
||||
|
||||
@Override
|
||||
public NodeAction getNodeAction() {
|
||||
HeartbeatResponseProtoOrBuilder p = viaProto ? proto : builder;
|
||||
if(!p.hasNodeAction()) {
|
||||
return null;
|
||||
}
|
||||
return (convertFromProtoFormat(p.getNodeAction()));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setNodeAction(NodeAction nodeAction) {
|
||||
maybeInitBuilder();
|
||||
if (nodeAction == null) {
|
||||
builder.clearNodeAction();
|
||||
return;
|
||||
}
|
||||
builder.setNodeAction(convertToProtoFormat(nodeAction));
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<ContainerId> getContainersToCleanupList() {
|
||||
initContainersToCleanup();
|
||||
return this.containersToCleanup;
|
||||
}
|
||||
@Override
|
||||
public ContainerId getContainerToCleanup(int index) {
|
||||
initContainersToCleanup();
|
||||
return this.containersToCleanup.get(index);
|
||||
}
|
||||
@Override
|
||||
public int getContainersToCleanupCount() {
|
||||
initContainersToCleanup();
|
||||
return this.containersToCleanup.size();
|
||||
}
|
||||
|
||||
private void initContainersToCleanup() {
|
||||
if (this.containersToCleanup != null) {
|
||||
return;
|
||||
}
|
||||
HeartbeatResponseProtoOrBuilder p = viaProto ? proto : builder;
|
||||
List<ContainerIdProto> list = p.getContainersToCleanupList();
|
||||
this.containersToCleanup = new ArrayList<ContainerId>();
|
||||
|
||||
for (ContainerIdProto c : list) {
|
||||
this.containersToCleanup.add(convertFromProtoFormat(c));
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void addAllContainersToCleanup(final List<ContainerId> containersToCleanup) {
|
||||
if (containersToCleanup == null)
|
||||
return;
|
||||
initContainersToCleanup();
|
||||
this.containersToCleanup.addAll(containersToCleanup);
|
||||
}
|
||||
|
||||
private void addContainersToCleanupToProto() {
|
||||
maybeInitBuilder();
|
||||
builder.clearContainersToCleanup();
|
||||
if (containersToCleanup == null)
|
||||
return;
|
||||
Iterable<ContainerIdProto> iterable = new Iterable<ContainerIdProto>() {
|
||||
@Override
|
||||
public Iterator<ContainerIdProto> iterator() {
|
||||
return new Iterator<ContainerIdProto>() {
|
||||
|
||||
Iterator<ContainerId> iter = containersToCleanup.iterator();
|
||||
|
||||
@Override
|
||||
public boolean hasNext() {
|
||||
return iter.hasNext();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ContainerIdProto next() {
|
||||
return convertToProtoFormat(iter.next());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void remove() {
|
||||
throw new UnsupportedOperationException();
|
||||
|
||||
}
|
||||
};
|
||||
|
||||
}
|
||||
};
|
||||
builder.addAllContainersToCleanup(iterable);
|
||||
}
|
||||
@Override
|
||||
public void addContainerToCleanup(ContainerId containersToCleanup) {
|
||||
initContainersToCleanup();
|
||||
this.containersToCleanup.add(containersToCleanup);
|
||||
}
|
||||
@Override
|
||||
public void removeContainerToCleanup(int index) {
|
||||
initContainersToCleanup();
|
||||
this.containersToCleanup.remove(index);
|
||||
}
|
||||
@Override
|
||||
public void clearContainersToCleanup() {
|
||||
initContainersToCleanup();
|
||||
this.containersToCleanup.clear();
|
||||
}
|
||||
@Override
|
||||
public List<ApplicationId> getApplicationsToCleanupList() {
|
||||
initApplicationsToCleanup();
|
||||
return this.applicationsToCleanup;
|
||||
}
|
||||
@Override
|
||||
public ApplicationId getApplicationsToCleanup(int index) {
|
||||
initApplicationsToCleanup();
|
||||
return this.applicationsToCleanup.get(index);
|
||||
}
|
||||
@Override
|
||||
public int getApplicationsToCleanupCount() {
|
||||
initApplicationsToCleanup();
|
||||
return this.applicationsToCleanup.size();
|
||||
}
|
||||
|
||||
private void initApplicationsToCleanup() {
|
||||
if (this.applicationsToCleanup != null) {
|
||||
return;
|
||||
}
|
||||
HeartbeatResponseProtoOrBuilder p = viaProto ? proto : builder;
|
||||
List<ApplicationIdProto> list = p.getApplicationsToCleanupList();
|
||||
this.applicationsToCleanup = new ArrayList<ApplicationId>();
|
||||
|
||||
for (ApplicationIdProto c : list) {
|
||||
this.applicationsToCleanup.add(convertFromProtoFormat(c));
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void addAllApplicationsToCleanup(final List<ApplicationId> applicationsToCleanup) {
|
||||
if (applicationsToCleanup == null)
|
||||
return;
|
||||
initApplicationsToCleanup();
|
||||
this.applicationsToCleanup.addAll(applicationsToCleanup);
|
||||
}
|
||||
|
||||
private void addApplicationsToCleanupToProto() {
|
||||
maybeInitBuilder();
|
||||
builder.clearApplicationsToCleanup();
|
||||
if (applicationsToCleanup == null)
|
||||
return;
|
||||
Iterable<ApplicationIdProto> iterable = new Iterable<ApplicationIdProto>() {
|
||||
@Override
|
||||
public Iterator<ApplicationIdProto> iterator() {
|
||||
return new Iterator<ApplicationIdProto>() {
|
||||
|
||||
Iterator<ApplicationId> iter = applicationsToCleanup.iterator();
|
||||
|
||||
@Override
|
||||
public boolean hasNext() {
|
||||
return iter.hasNext();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ApplicationIdProto next() {
|
||||
return convertToProtoFormat(iter.next());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void remove() {
|
||||
throw new UnsupportedOperationException();
|
||||
|
||||
}
|
||||
};
|
||||
|
||||
}
|
||||
};
|
||||
builder.addAllApplicationsToCleanup(iterable);
|
||||
}
|
||||
@Override
|
||||
public void addApplicationToCleanup(ApplicationId applicationsToCleanup) {
|
||||
initApplicationsToCleanup();
|
||||
this.applicationsToCleanup.add(applicationsToCleanup);
|
||||
}
|
||||
@Override
|
||||
public void removeApplicationToCleanup(int index) {
|
||||
initApplicationsToCleanup();
|
||||
this.applicationsToCleanup.remove(index);
|
||||
}
|
||||
@Override
|
||||
public void clearApplicationsToCleanup() {
|
||||
initApplicationsToCleanup();
|
||||
this.applicationsToCleanup.clear();
|
||||
}
|
||||
|
||||
private ContainerIdPBImpl convertFromProtoFormat(ContainerIdProto p) {
|
||||
return new ContainerIdPBImpl(p);
|
||||
}
|
||||
|
||||
private ContainerIdProto convertToProtoFormat(ContainerId t) {
|
||||
return ((ContainerIdPBImpl)t).getProto();
|
||||
}
|
||||
|
||||
private ApplicationIdPBImpl convertFromProtoFormat(ApplicationIdProto p) {
|
||||
return new ApplicationIdPBImpl(p);
|
||||
}
|
||||
|
||||
private ApplicationIdProto convertToProtoFormat(ApplicationId t) {
|
||||
return ((ApplicationIdPBImpl)t).getProto();
|
||||
}
|
||||
|
||||
private NodeAction convertFromProtoFormat(NodeActionProto p) {
|
||||
return NodeAction.valueOf(p.name());
|
||||
}
|
||||
|
||||
private NodeActionProto convertToProtoFormat(NodeAction t) {
|
||||
return NodeActionProto.valueOf(t.name());
|
||||
}
|
||||
|
||||
private MasterKeyPBImpl convertFromProtoFormat(MasterKeyProto p) {
|
||||
return new MasterKeyPBImpl(p);
|
||||
}
|
||||
|
||||
private MasterKeyProto convertToProtoFormat(MasterKey t) {
|
||||
return ((MasterKeyPBImpl)t).getProto();
|
||||
}
|
||||
}
|
|
@ -47,11 +47,4 @@ message RegistrationResponseProto {
|
|||
optional NodeActionProto nodeAction = 2;
|
||||
}
|
||||
|
||||
message HeartbeatResponseProto {
|
||||
optional int32 response_id = 1;
|
||||
optional MasterKeyProto master_key = 2;
|
||||
optional NodeActionProto nodeAction = 3;
|
||||
repeated ContainerIdProto containers_to_cleanup = 4;
|
||||
repeated ApplicationIdProto applications_to_cleanup = 5;
|
||||
}
|
||||
|
||||
|
|
|
@ -38,6 +38,11 @@ message NodeHeartbeatRequestProto {
|
|||
optional MasterKeyProto last_known_master_key = 2;
|
||||
}
|
||||
|
||||
|
||||
message NodeHeartbeatResponseProto {
|
||||
optional HeartbeatResponseProto heartbeat_response = 1;
|
||||
optional int32 response_id = 1;
|
||||
optional MasterKeyProto master_key = 2;
|
||||
optional NodeActionProto nodeAction = 3;
|
||||
repeated ContainerIdProto containers_to_cleanup = 4;
|
||||
repeated ApplicationIdProto applications_to_cleanup = 5;
|
||||
}
|
||||
|
|
|
@ -25,8 +25,6 @@ import org.apache.hadoop.yarn.factories.RecordFactory;
|
|||
import org.apache.hadoop.yarn.factories.impl.pb.RecordFactoryPBImpl;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.NodeHeartbeatRequestPBImpl;
|
||||
import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse;
|
||||
import org.apache.hadoop.yarn.server.api.records.impl.pb.HeartbeatResponsePBImpl;
|
||||
import org.junit.Test;
|
||||
|
||||
public class TestRecordFactory {
|
||||
|
@ -34,15 +32,6 @@ public class TestRecordFactory {
|
|||
@Test
|
||||
public void testPbRecordFactory() {
|
||||
RecordFactory pbRecordFactory = RecordFactoryPBImpl.get();
|
||||
|
||||
try {
|
||||
HeartbeatResponse response = pbRecordFactory.newRecordInstance(HeartbeatResponse.class);
|
||||
Assert.assertEquals(HeartbeatResponsePBImpl.class, response.getClass());
|
||||
} catch (YarnException e) {
|
||||
e.printStackTrace();
|
||||
Assert.fail("Failed to crete record");
|
||||
}
|
||||
|
||||
try {
|
||||
NodeHeartbeatRequest request = pbRecordFactory.newRecordInstance(NodeHeartbeatRequest.class);
|
||||
Assert.assertEquals(NodeHeartbeatRequestPBImpl.class, request.getClass());
|
||||
|
|
|
@ -50,8 +50,8 @@ import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
|||
import org.apache.hadoop.yarn.ipc.YarnRPC;
|
||||
import org.apache.hadoop.yarn.server.api.ResourceTracker;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
|
||||
import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse;
|
||||
import org.apache.hadoop.yarn.server.api.records.MasterKey;
|
||||
import org.apache.hadoop.yarn.server.api.records.NodeAction;
|
||||
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
|
||||
|
@ -408,8 +408,8 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
|
|||
request.setLastKnownMasterKey(NodeStatusUpdaterImpl.this.context
|
||||
.getContainerTokenSecretManager().getCurrentKey());
|
||||
}
|
||||
HeartbeatResponse response =
|
||||
resourceTracker.nodeHeartbeat(request).getHeartbeatResponse();
|
||||
NodeHeartbeatResponse response =
|
||||
resourceTracker.nodeHeartbeat(request);
|
||||
|
||||
// See if the master-key has rolled over
|
||||
if (isSecurityEnabled()) {
|
||||
|
@ -439,14 +439,14 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
|
|||
|
||||
lastHeartBeatID = response.getResponseId();
|
||||
List<ContainerId> containersToCleanup = response
|
||||
.getContainersToCleanupList();
|
||||
.getContainersToCleanup();
|
||||
if (containersToCleanup.size() != 0) {
|
||||
dispatcher.getEventHandler().handle(
|
||||
new CMgrCompletedContainersEvent(containersToCleanup,
|
||||
CMgrCompletedContainersEvent.Reason.BY_RESOURCEMANAGER));
|
||||
}
|
||||
List<ApplicationId> appsToCleanup =
|
||||
response.getApplicationsToCleanupList();
|
||||
response.getApplicationsToCleanup();
|
||||
//Only start tracking for keepAlive on FINISH_APP
|
||||
trackAppsForKeepAlive(appsToCleanup);
|
||||
if (appsToCleanup.size() != 0) {
|
||||
|
|
|
@ -29,7 +29,6 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
|
|||
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
|
||||
import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse;
|
||||
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
|
||||
import org.apache.hadoop.yarn.server.api.records.RegistrationResponse;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
|
||||
|
@ -79,13 +78,9 @@ public class MockNodeStatusUpdater extends NodeStatusUpdaterImpl {
|
|||
LOG.info("Got heartbeat number " + heartBeatID);
|
||||
nodeStatus.setResponseId(heartBeatID++);
|
||||
|
||||
HeartbeatResponse response = recordFactory
|
||||
.newRecordInstance(HeartbeatResponse.class);
|
||||
response.setResponseId(heartBeatID);
|
||||
|
||||
NodeHeartbeatResponse nhResponse = recordFactory
|
||||
.newRecordInstance(NodeHeartbeatResponse.class);
|
||||
nhResponse.setHeartbeatResponse(response);
|
||||
nhResponse.setResponseId(heartBeatID);
|
||||
return nhResponse;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -56,7 +56,6 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
|
|||
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
|
||||
import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse;
|
||||
import org.apache.hadoop.yarn.server.api.records.NodeAction;
|
||||
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
|
||||
import org.apache.hadoop.yarn.server.api.records.RegistrationResponse;
|
||||
|
@ -218,13 +217,10 @@ public class TestNodeStatusUpdater {
|
|||
this.context.getContainers();
|
||||
Assert.assertEquals(2, activeContainers.size());
|
||||
}
|
||||
HeartbeatResponse response = recordFactory
|
||||
.newRecordInstance(HeartbeatResponse.class);
|
||||
response.setResponseId(heartBeatID);
|
||||
|
||||
NodeHeartbeatResponse nhResponse = recordFactory
|
||||
.newRecordInstance(NodeHeartbeatResponse.class);
|
||||
nhResponse.setHeartbeatResponse(response);
|
||||
nhResponse.setResponseId(heartBeatID);
|
||||
return nhResponse;
|
||||
}
|
||||
}
|
||||
|
@ -335,14 +331,11 @@ public class TestNodeStatusUpdater {
|
|||
throws YarnRemoteException {
|
||||
NodeStatus nodeStatus = request.getNodeStatus();
|
||||
nodeStatus.setResponseId(heartBeatID++);
|
||||
HeartbeatResponse response = recordFactory
|
||||
.newRecordInstance(HeartbeatResponse.class);
|
||||
response.setResponseId(heartBeatID);
|
||||
response.setNodeAction(heartBeatNodeAction);
|
||||
|
||||
NodeHeartbeatResponse nhResponse = recordFactory
|
||||
.newRecordInstance(NodeHeartbeatResponse.class);
|
||||
nhResponse.setHeartbeatResponse(response);
|
||||
nhResponse.setResponseId(heartBeatID);
|
||||
nhResponse.setNodeAction(heartBeatNodeAction);
|
||||
return nhResponse;
|
||||
}
|
||||
}
|
||||
|
@ -378,10 +371,10 @@ public class TestNodeStatusUpdater {
|
|||
LOG.info("Got heartBeatId: [" + heartBeatID +"]");
|
||||
NodeStatus nodeStatus = request.getNodeStatus();
|
||||
nodeStatus.setResponseId(heartBeatID++);
|
||||
HeartbeatResponse response =
|
||||
recordFactory.newRecordInstance(HeartbeatResponse.class);
|
||||
response.setResponseId(heartBeatID);
|
||||
response.setNodeAction(heartBeatNodeAction);
|
||||
NodeHeartbeatResponse nhResponse =
|
||||
recordFactory.newRecordInstance(NodeHeartbeatResponse.class);
|
||||
nhResponse.setResponseId(heartBeatID);
|
||||
nhResponse.setNodeAction(heartBeatNodeAction);
|
||||
|
||||
if (nodeStatus.getKeepAliveApplications() != null
|
||||
&& nodeStatus.getKeepAliveApplications().size() > 0) {
|
||||
|
@ -397,11 +390,8 @@ public class TestNodeStatusUpdater {
|
|||
if (heartBeatID == 2) {
|
||||
LOG.info("Sending FINISH_APP for application: [" + appId + "]");
|
||||
this.context.getApplications().put(appId, mock(Application.class));
|
||||
response.addAllApplicationsToCleanup(Collections.singletonList(appId));
|
||||
nhResponse.addAllApplicationsToCleanup(Collections.singletonList(appId));
|
||||
}
|
||||
NodeHeartbeatResponse nhResponse =
|
||||
recordFactory.newRecordInstance(NodeHeartbeatResponse.class);
|
||||
nhResponse.setHeartbeatResponse(response);
|
||||
return nhResponse;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -39,7 +39,6 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
|
|||
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
|
||||
import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse;
|
||||
import org.apache.hadoop.yarn.server.api.records.MasterKey;
|
||||
import org.apache.hadoop.yarn.server.api.records.NodeAction;
|
||||
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
|
||||
|
@ -78,15 +77,9 @@ public class ResourceTrackerService extends AbstractService implements
|
|||
.newRecordInstance(NodeHeartbeatResponse.class);
|
||||
|
||||
static {
|
||||
HeartbeatResponse rebootResp = recordFactory
|
||||
.newRecordInstance(HeartbeatResponse.class);
|
||||
rebootResp.setNodeAction(NodeAction.REBOOT);
|
||||
reboot.setHeartbeatResponse(rebootResp);
|
||||
reboot.setNodeAction(NodeAction.REBOOT);
|
||||
|
||||
HeartbeatResponse decommissionedResp = recordFactory
|
||||
.newRecordInstance(HeartbeatResponse.class);
|
||||
decommissionedResp.setNodeAction(NodeAction.SHUTDOWN);
|
||||
shutDown.setHeartbeatResponse(decommissionedResp);
|
||||
shutDown.setNodeAction(NodeAction.SHUTDOWN);
|
||||
}
|
||||
|
||||
public ResourceTrackerService(RMContext rmContext,
|
||||
|
@ -240,17 +233,16 @@ public class ResourceTrackerService extends AbstractService implements
|
|||
.newRecordInstance(NodeHeartbeatResponse.class);
|
||||
|
||||
// 3. Check if it's a 'fresh' heartbeat i.e. not duplicate heartbeat
|
||||
HeartbeatResponse lastHeartbeatResponse = rmNode.getLastHeartBeatResponse();
|
||||
if (remoteNodeStatus.getResponseId() + 1 == lastHeartbeatResponse
|
||||
NodeHeartbeatResponse lastNodeHeartbeatResponse = rmNode.getLastNodeHeartBeatResponse();
|
||||
if (remoteNodeStatus.getResponseId() + 1 == lastNodeHeartbeatResponse
|
||||
.getResponseId()) {
|
||||
LOG.info("Received duplicate heartbeat from node "
|
||||
+ rmNode.getNodeAddress());
|
||||
nodeHeartBeatResponse.setHeartbeatResponse(lastHeartbeatResponse);
|
||||
return nodeHeartBeatResponse;
|
||||
} else if (remoteNodeStatus.getResponseId() + 1 < lastHeartbeatResponse
|
||||
return lastNodeHeartbeatResponse;
|
||||
} else if (remoteNodeStatus.getResponseId() + 1 < lastNodeHeartbeatResponse
|
||||
.getResponseId()) {
|
||||
LOG.info("Too far behind rm response id:"
|
||||
+ lastHeartbeatResponse.getResponseId() + " nm response id:"
|
||||
+ lastNodeHeartbeatResponse.getResponseId() + " nm response id:"
|
||||
+ remoteNodeStatus.getResponseId());
|
||||
// TODO: Just sending reboot is not enough. Think more.
|
||||
this.rmContext.getDispatcher().getEventHandler().handle(
|
||||
|
@ -259,11 +251,9 @@ public class ResourceTrackerService extends AbstractService implements
|
|||
}
|
||||
|
||||
// Heartbeat response
|
||||
HeartbeatResponse latestResponse = recordFactory
|
||||
.newRecordInstance(HeartbeatResponse.class);
|
||||
latestResponse.setResponseId(lastHeartbeatResponse.getResponseId() + 1);
|
||||
rmNode.updateHeartbeatResponseForCleanup(latestResponse);
|
||||
latestResponse.setNodeAction(NodeAction.NORMAL);
|
||||
nodeHeartBeatResponse.setResponseId(lastNodeHeartbeatResponse.getResponseId() + 1);
|
||||
rmNode.updateNodeHeartbeatResponseForCleanup(nodeHeartBeatResponse);
|
||||
nodeHeartBeatResponse.setNodeAction(NodeAction.NORMAL);
|
||||
|
||||
// Check if node's masterKey needs to be updated and if the currentKey has
|
||||
// roller over, send it across
|
||||
|
@ -282,7 +272,7 @@ public class ResourceTrackerService extends AbstractService implements
|
|||
}
|
||||
}
|
||||
if (shouldSendMasterKey) {
|
||||
latestResponse.setMasterKey(nextMasterKeyForNode);
|
||||
nodeHeartBeatResponse.setMasterKey(nextMasterKeyForNode);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -290,9 +280,8 @@ public class ResourceTrackerService extends AbstractService implements
|
|||
this.rmContext.getDispatcher().getEventHandler().handle(
|
||||
new RMNodeStatusEvent(nodeId, remoteNodeStatus.getNodeHealthStatus(),
|
||||
remoteNodeStatus.getContainersStatuses(),
|
||||
remoteNodeStatus.getKeepAliveApplications(), latestResponse));
|
||||
remoteNodeStatus.getKeepAliveApplications(), nodeHeartBeatResponse));
|
||||
|
||||
nodeHeartBeatResponse.setHeartbeatResponse(latestResponse);
|
||||
return nodeHeartBeatResponse;
|
||||
}
|
||||
|
||||
|
|
|
@ -27,7 +27,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
|
|||
import org.apache.hadoop.yarn.api.records.NodeHealthStatus;
|
||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||
import org.apache.hadoop.yarn.api.records.NodeState;
|
||||
import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
|
||||
|
||||
/**
|
||||
* Node managers information on available resources
|
||||
|
@ -106,13 +106,13 @@ public interface RMNode {
|
|||
public List<ApplicationId> getAppsToCleanup();
|
||||
|
||||
/**
|
||||
* Update a {@link HeartbeatResponse} with the list of containers and
|
||||
* Update a {@link NodeHeartbeatResponse} with the list of containers and
|
||||
* applications to clean up for this node.
|
||||
* @param response the {@link HeartbeatResponse} to update
|
||||
* @param response the {@link NodeHeartbeatResponse} to update
|
||||
*/
|
||||
public void updateHeartbeatResponseForCleanup(HeartbeatResponse response);
|
||||
public void updateNodeHeartbeatResponseForCleanup(NodeHeartbeatResponse response);
|
||||
|
||||
public HeartbeatResponse getLastHeartBeatResponse();
|
||||
public NodeHeartbeatResponse getLastNodeHeartBeatResponse();
|
||||
|
||||
/**
|
||||
* Get and clear the list of containerUpdates accumulated across NM
|
||||
|
@ -121,5 +121,4 @@ public interface RMNode {
|
|||
* @return containerUpdates accumulated across NM heartbeats.
|
||||
*/
|
||||
public List<UpdatedContainerInfo> pullContainerUpdates();
|
||||
|
||||
}
|
||||
|
|
|
@ -46,7 +46,7 @@ import org.apache.hadoop.yarn.api.records.Resource;
|
|||
import org.apache.hadoop.yarn.event.EventHandler;
|
||||
import org.apache.hadoop.yarn.factories.RecordFactory;
|
||||
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||
import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.ClusterMetrics;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.NodesListManagerEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.NodesListManagerEventType;
|
||||
|
@ -107,8 +107,8 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
|
|||
/* the list of applications that have finished and need to be purged */
|
||||
private final List<ApplicationId> finishedApplications = new ArrayList<ApplicationId>();
|
||||
|
||||
private HeartbeatResponse latestHeartBeatResponse = recordFactory
|
||||
.newRecordInstance(HeartbeatResponse.class);
|
||||
private NodeHeartbeatResponse latestNodeHeartBeatResponse = recordFactory
|
||||
.newRecordInstance(NodeHeartbeatResponse.class);
|
||||
|
||||
private static final StateMachineFactory<RMNodeImpl,
|
||||
NodeState,
|
||||
|
@ -184,7 +184,7 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
|
|||
this.nodeHealthStatus.setHealthReport("Healthy");
|
||||
this.nodeHealthStatus.setLastHealthReportTime(System.currentTimeMillis());
|
||||
|
||||
this.latestHeartBeatResponse.setResponseId(0);
|
||||
this.latestNodeHeartBeatResponse.setResponseId(0);
|
||||
|
||||
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
|
||||
this.readLock = lock.readLock();
|
||||
|
@ -304,7 +304,7 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
|
|||
};
|
||||
|
||||
@Override
|
||||
public void updateHeartbeatResponseForCleanup(HeartbeatResponse response) {
|
||||
public void updateNodeHeartbeatResponseForCleanup(NodeHeartbeatResponse response) {
|
||||
this.writeLock.lock();
|
||||
|
||||
try {
|
||||
|
@ -319,12 +319,12 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
|
|||
};
|
||||
|
||||
@Override
|
||||
public HeartbeatResponse getLastHeartBeatResponse() {
|
||||
public NodeHeartbeatResponse getLastNodeHeartBeatResponse() {
|
||||
|
||||
this.readLock.lock();
|
||||
|
||||
try {
|
||||
return this.latestHeartBeatResponse;
|
||||
return this.latestNodeHeartBeatResponse;
|
||||
} finally {
|
||||
this.readLock.unlock();
|
||||
}
|
||||
|
@ -430,7 +430,7 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
|
|||
if (rmNode.getTotalCapability().equals(newNode.getTotalCapability())
|
||||
&& rmNode.getHttpPort() == newNode.getHttpPort()) {
|
||||
// Reset heartbeat ID since node just restarted.
|
||||
rmNode.getLastHeartBeatResponse().setResponseId(0);
|
||||
rmNode.getLastNodeHeartBeatResponse().setResponseId(0);
|
||||
rmNode.context.getDispatcher().getEventHandler().handle(
|
||||
new NodeAddedSchedulerEvent(rmNode));
|
||||
} else {
|
||||
|
@ -507,7 +507,7 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
|
|||
RMNodeStatusEvent statusEvent = (RMNodeStatusEvent) event;
|
||||
|
||||
// Switch the last heartbeatresponse.
|
||||
rmNode.latestHeartBeatResponse = statusEvent.getLatestResponse();
|
||||
rmNode.latestNodeHeartBeatResponse = statusEvent.getLatestResponse();
|
||||
|
||||
NodeHealthStatus remoteNodeHealthStatus =
|
||||
statusEvent.getNodeHealthStatus();
|
||||
|
@ -591,7 +591,7 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
|
|||
RMNodeStatusEvent statusEvent = (RMNodeStatusEvent) event;
|
||||
|
||||
// Switch the last heartbeatresponse.
|
||||
rmNode.latestHeartBeatResponse = statusEvent.getLatestResponse();
|
||||
rmNode.latestNodeHeartBeatResponse = statusEvent.getLatestResponse();
|
||||
NodeHealthStatus remoteNodeHealthStatus = statusEvent.getNodeHealthStatus();
|
||||
rmNode.setNodeHealthStatus(remoteNodeHealthStatus);
|
||||
if (remoteNodeHealthStatus.getIsNodeHealthy()) {
|
||||
|
|
|
@ -24,18 +24,18 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
|
|||
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
||||
import org.apache.hadoop.yarn.api.records.NodeHealthStatus;
|
||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||
import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
|
||||
|
||||
public class RMNodeStatusEvent extends RMNodeEvent {
|
||||
|
||||
private final NodeHealthStatus nodeHealthStatus;
|
||||
private final List<ContainerStatus> containersCollection;
|
||||
private final HeartbeatResponse latestResponse;
|
||||
private final NodeHeartbeatResponse latestResponse;
|
||||
private final List<ApplicationId> keepAliveAppIds;
|
||||
|
||||
public RMNodeStatusEvent(NodeId nodeId, NodeHealthStatus nodeHealthStatus,
|
||||
List<ContainerStatus> collection, List<ApplicationId> keepAliveAppIds,
|
||||
HeartbeatResponse latestResponse) {
|
||||
NodeHeartbeatResponse latestResponse) {
|
||||
super(nodeId, RMNodeEventType.STATUS_UPDATE);
|
||||
this.nodeHealthStatus = nodeHealthStatus;
|
||||
this.containersCollection = collection;
|
||||
|
@ -51,7 +51,7 @@ public class RMNodeStatusEvent extends RMNodeEvent {
|
|||
return this.containersCollection;
|
||||
}
|
||||
|
||||
public HeartbeatResponse getLatestResponse() {
|
||||
public NodeHeartbeatResponse getLatestResponse() {
|
||||
return this.latestResponse;
|
||||
}
|
||||
|
||||
|
|
|
@ -33,8 +33,8 @@ import org.apache.hadoop.yarn.api.records.NodeHealthStatus;
|
|||
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
|
||||
import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse;
|
||||
import org.apache.hadoop.yarn.server.api.records.MasterKey;
|
||||
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
|
||||
import org.apache.hadoop.yarn.server.api.records.RegistrationResponse;
|
||||
|
@ -93,12 +93,12 @@ public class MockNM {
|
|||
return registrationResponse;
|
||||
}
|
||||
|
||||
public HeartbeatResponse nodeHeartbeat(boolean isHealthy) throws Exception {
|
||||
public NodeHeartbeatResponse nodeHeartbeat(boolean isHealthy) throws Exception {
|
||||
return nodeHeartbeat(new HashMap<ApplicationId, List<ContainerStatus>>(),
|
||||
isHealthy, ++responseId);
|
||||
}
|
||||
|
||||
public HeartbeatResponse nodeHeartbeat(ApplicationAttemptId attemptId,
|
||||
public NodeHeartbeatResponse nodeHeartbeat(ApplicationAttemptId attemptId,
|
||||
int containerId, ContainerState containerState) throws Exception {
|
||||
HashMap<ApplicationId, List<ContainerStatus>> nodeUpdate =
|
||||
new HashMap<ApplicationId, List<ContainerStatus>>(1);
|
||||
|
@ -112,12 +112,12 @@ public class MockNM {
|
|||
return nodeHeartbeat(nodeUpdate, true);
|
||||
}
|
||||
|
||||
public HeartbeatResponse nodeHeartbeat(Map<ApplicationId,
|
||||
public NodeHeartbeatResponse nodeHeartbeat(Map<ApplicationId,
|
||||
List<ContainerStatus>> conts, boolean isHealthy) throws Exception {
|
||||
return nodeHeartbeat(conts, isHealthy, ++responseId);
|
||||
}
|
||||
|
||||
public HeartbeatResponse nodeHeartbeat(Map<ApplicationId,
|
||||
public NodeHeartbeatResponse nodeHeartbeat(Map<ApplicationId,
|
||||
List<ContainerStatus>> conts, boolean isHealthy, int resId) throws Exception {
|
||||
NodeHeartbeatRequest req = Records.newRecord(NodeHeartbeatRequest.class);
|
||||
NodeStatus status = Records.newRecord(NodeStatus.class);
|
||||
|
@ -133,8 +133,8 @@ public class MockNM {
|
|||
status.setNodeHealthStatus(healthStatus);
|
||||
req.setNodeStatus(status);
|
||||
req.setLastKnownMasterKey(this.currentMasterKey);
|
||||
HeartbeatResponse heartbeatResponse =
|
||||
resourceTracker.nodeHeartbeat(req).getHeartbeatResponse();
|
||||
NodeHeartbeatResponse heartbeatResponse =
|
||||
resourceTracker.nodeHeartbeat(req);
|
||||
MasterKey masterKeyFromRM = heartbeatResponse.getMasterKey();
|
||||
this.currentMasterKey =
|
||||
(masterKeyFromRM != null
|
||||
|
|
|
@ -31,7 +31,7 @@ import org.apache.hadoop.yarn.api.records.NodeState;
|
|||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.factories.RecordFactory;
|
||||
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||
import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
|
||||
|
||||
|
@ -187,11 +187,11 @@ public class MockNodes {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void updateHeartbeatResponseForCleanup(HeartbeatResponse response) {
|
||||
public void updateNodeHeartbeatResponseForCleanup(NodeHeartbeatResponse response) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public HeartbeatResponse getLastHeartBeatResponse() {
|
||||
public NodeHeartbeatResponse getLastNodeHeartBeatResponse() {
|
||||
return null;
|
||||
}
|
||||
|
||||
|
|
|
@ -51,8 +51,8 @@ import org.apache.hadoop.yarn.factories.RecordFactory;
|
|||
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||
import org.apache.hadoop.yarn.ipc.RPCUtil;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
|
||||
import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse;
|
||||
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
|
||||
|
@ -151,8 +151,8 @@ public class NodeManager implements ContainerManager {
|
|||
NodeHeartbeatRequest request = recordFactory
|
||||
.newRecordInstance(NodeHeartbeatRequest.class);
|
||||
request.setNodeStatus(nodeStatus);
|
||||
HeartbeatResponse response = resourceTrackerService
|
||||
.nodeHeartbeat(request).getHeartbeatResponse();
|
||||
NodeHeartbeatResponse response = resourceTrackerService
|
||||
.nodeHeartbeat(request);
|
||||
responseID = response.getResponseId();
|
||||
}
|
||||
|
||||
|
|
|
@ -36,7 +36,7 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
|||
import org.apache.hadoop.yarn.event.Dispatcher;
|
||||
import org.apache.hadoop.yarn.event.DrainDispatcher;
|
||||
import org.apache.hadoop.yarn.event.EventHandler;
|
||||
import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
|
||||
|
@ -92,15 +92,15 @@ public class TestApplicationCleanup {
|
|||
Assert.assertEquals(request, contReceived);
|
||||
|
||||
am.unregisterAppAttempt();
|
||||
HeartbeatResponse resp = nm1.nodeHeartbeat(attempt.getAppAttemptId(), 1,
|
||||
NodeHeartbeatResponse resp = nm1.nodeHeartbeat(attempt.getAppAttemptId(), 1,
|
||||
ContainerState.COMPLETE);
|
||||
am.waitForState(RMAppAttemptState.FINISHED);
|
||||
|
||||
//currently only containers are cleaned via this
|
||||
//AM container is cleaned via container launcher
|
||||
resp = nm1.nodeHeartbeat(true);
|
||||
List<ContainerId> contsToClean = resp.getContainersToCleanupList();
|
||||
List<ApplicationId> apps = resp.getApplicationsToCleanupList();
|
||||
List<ContainerId> contsToClean = resp.getContainersToCleanup();
|
||||
List<ApplicationId> apps = resp.getApplicationsToCleanup();
|
||||
int cleanedConts = contsToClean.size();
|
||||
int cleanedApps = apps.size();
|
||||
waitCount = 0;
|
||||
|
@ -109,8 +109,8 @@ public class TestApplicationCleanup {
|
|||
+ cleanedConts + " cleanedApps: " + cleanedApps);
|
||||
Thread.sleep(100);
|
||||
resp = nm1.nodeHeartbeat(true);
|
||||
contsToClean = resp.getContainersToCleanupList();
|
||||
apps = resp.getApplicationsToCleanupList();
|
||||
contsToClean = resp.getContainersToCleanup();
|
||||
apps = resp.getApplicationsToCleanup();
|
||||
cleanedConts += contsToClean.size();
|
||||
cleanedApps += apps.size();
|
||||
}
|
||||
|
@ -198,9 +198,9 @@ public class TestApplicationCleanup {
|
|||
.getId(), ContainerState.RUNNING, "nothing", 0));
|
||||
containerStatuses.put(app.getApplicationId(), containerStatusList);
|
||||
|
||||
HeartbeatResponse resp = nm1.nodeHeartbeat(containerStatuses, true);
|
||||
NodeHeartbeatResponse resp = nm1.nodeHeartbeat(containerStatuses, true);
|
||||
dispatcher.await();
|
||||
List<ContainerId> contsToClean = resp.getContainersToCleanupList();
|
||||
List<ContainerId> contsToClean = resp.getContainersToCleanup();
|
||||
int cleanedConts = contsToClean.size();
|
||||
waitCount = 0;
|
||||
while (cleanedConts < 1 && waitCount++ < 200) {
|
||||
|
@ -208,7 +208,7 @@ public class TestApplicationCleanup {
|
|||
Thread.sleep(100);
|
||||
resp = nm1.nodeHeartbeat(true);
|
||||
dispatcher.await();
|
||||
contsToClean = resp.getContainersToCleanupList();
|
||||
contsToClean = resp.getContainersToCleanup();
|
||||
cleanedConts += contsToClean.size();
|
||||
}
|
||||
LOG.info("Got cleanup for " + contsToClean.get(0));
|
||||
|
@ -226,7 +226,7 @@ public class TestApplicationCleanup {
|
|||
|
||||
resp = nm1.nodeHeartbeat(containerStatuses, true);
|
||||
dispatcher.await();
|
||||
contsToClean = resp.getContainersToCleanupList();
|
||||
contsToClean = resp.getContainersToCleanup();
|
||||
cleanedConts = contsToClean.size();
|
||||
// The cleanup list won't be instantaneous as it is given out by scheduler
|
||||
// and not RMNodeImpl.
|
||||
|
@ -236,7 +236,7 @@ public class TestApplicationCleanup {
|
|||
Thread.sleep(100);
|
||||
resp = nm1.nodeHeartbeat(true);
|
||||
dispatcher.await();
|
||||
contsToClean = resp.getContainersToCleanupList();
|
||||
contsToClean = resp.getContainersToCleanup();
|
||||
cleanedConts += contsToClean.size();
|
||||
}
|
||||
LOG.info("Got cleanup for " + contsToClean.get(0));
|
||||
|
|
|
@ -38,7 +38,7 @@ import org.apache.hadoop.yarn.api.records.NodeId;
|
|||
import org.apache.hadoop.yarn.api.records.NodeState;
|
||||
import org.apache.hadoop.yarn.event.EventHandler;
|
||||
import org.apache.hadoop.yarn.event.InlineDispatcher;
|
||||
import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanAppEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
|
||||
|
@ -118,7 +118,7 @@ public class TestRMNodeTransitions {
|
|||
}
|
||||
|
||||
private RMNodeStatusEvent getMockRMNodeStatusEvent() {
|
||||
HeartbeatResponse response = mock(HeartbeatResponse.class);
|
||||
NodeHeartbeatResponse response = mock(NodeHeartbeatResponse.class);
|
||||
|
||||
NodeHealthStatus healthStatus = mock(NodeHealthStatus.class);
|
||||
Boolean yes = new Boolean(true);
|
||||
|
@ -325,14 +325,14 @@ public class TestRMNodeTransitions {
|
|||
node.handle(statusEvent);
|
||||
Assert.assertEquals(1, node.getContainersToCleanUp().size());
|
||||
Assert.assertEquals(1, node.getAppsToCleanup().size());
|
||||
HeartbeatResponse hbrsp = Records.newRecord(HeartbeatResponse.class);
|
||||
node.updateHeartbeatResponseForCleanup(hbrsp);
|
||||
NodeHeartbeatResponse hbrsp = Records.newRecord(NodeHeartbeatResponse.class);
|
||||
node.updateNodeHeartbeatResponseForCleanup(hbrsp);
|
||||
Assert.assertEquals(0, node.getContainersToCleanUp().size());
|
||||
Assert.assertEquals(0, node.getAppsToCleanup().size());
|
||||
Assert.assertEquals(1, hbrsp.getContainersToCleanupCount());
|
||||
Assert.assertEquals(completedContainerId, hbrsp.getContainerToCleanup(0));
|
||||
Assert.assertEquals(1, hbrsp.getApplicationsToCleanupCount());
|
||||
Assert.assertEquals(finishedAppId, hbrsp.getApplicationsToCleanup(0));
|
||||
Assert.assertEquals(1, hbrsp.getContainersToCleanup().size());
|
||||
Assert.assertEquals(completedContainerId, hbrsp.getContainersToCleanup().get(0));
|
||||
Assert.assertEquals(1, hbrsp.getApplicationsToCleanup().size());
|
||||
Assert.assertEquals(finishedAppId, hbrsp.getApplicationsToCleanup().get(0));
|
||||
}
|
||||
|
||||
private RMNodeImpl getRunningNode() {
|
||||
|
|
|
@ -31,7 +31,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
|
|||
import org.apache.hadoop.yarn.api.records.ContainerState;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
|
||||
import org.apache.hadoop.yarn.server.api.records.NodeAction;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationAttemptState;
|
||||
|
@ -219,7 +219,7 @@ public class TestRMRestart {
|
|||
Assert.assertTrue(allocResponse.getReboot());
|
||||
|
||||
// NM should be rebooted on heartbeat, even first heartbeat for nm2
|
||||
HeartbeatResponse hbResponse = nm1.nodeHeartbeat(true);
|
||||
NodeHeartbeatResponse hbResponse = nm1.nodeHeartbeat(true);
|
||||
Assert.assertEquals(NodeAction.REBOOT, hbResponse.getNodeAction());
|
||||
hbResponse = nm2.nodeHeartbeat(true);
|
||||
Assert.assertEquals(NodeAction.REBOOT, hbResponse.getNodeAction());
|
||||
|
|
|
@ -36,9 +36,9 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
|||
import org.apache.hadoop.yarn.event.Dispatcher;
|
||||
import org.apache.hadoop.yarn.event.DrainDispatcher;
|
||||
import org.apache.hadoop.yarn.event.EventHandler;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
|
||||
import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse;
|
||||
import org.apache.hadoop.yarn.server.api.records.NodeAction;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
|
||||
|
@ -75,7 +75,7 @@ public class TestResourceTrackerService {
|
|||
assert(metrics != null);
|
||||
int metricCount = metrics.getNumDecommisionedNMs();
|
||||
|
||||
HeartbeatResponse nodeHeartbeat = nm1.nodeHeartbeat(true);
|
||||
NodeHeartbeatResponse nodeHeartbeat = nm1.nodeHeartbeat(true);
|
||||
Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
|
||||
nodeHeartbeat = nm2.nodeHeartbeat(true);
|
||||
Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
|
||||
|
@ -124,7 +124,7 @@ public class TestResourceTrackerService {
|
|||
|
||||
int metricCount = ClusterMetrics.getMetrics().getNumDecommisionedNMs();
|
||||
|
||||
HeartbeatResponse nodeHeartbeat = nm1.nodeHeartbeat(true);
|
||||
NodeHeartbeatResponse nodeHeartbeat = nm1.nodeHeartbeat(true);
|
||||
Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
|
||||
nodeHeartbeat = nm2.nodeHeartbeat(true);
|
||||
Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
|
||||
|
@ -161,7 +161,7 @@ public class TestResourceTrackerService {
|
|||
ClusterMetrics metrics = ClusterMetrics.getMetrics();
|
||||
assert(metrics != null);
|
||||
int initialMetricCount = metrics.getNumDecommisionedNMs();
|
||||
HeartbeatResponse nodeHeartbeat = nm1.nodeHeartbeat(true);
|
||||
NodeHeartbeatResponse nodeHeartbeat = nm1.nodeHeartbeat(true);
|
||||
Assert.assertEquals(
|
||||
NodeAction.NORMAL,
|
||||
nodeHeartbeat.getNodeAction());
|
||||
|
@ -198,7 +198,7 @@ public class TestResourceTrackerService {
|
|||
ClusterMetrics metrics = ClusterMetrics.getMetrics();
|
||||
assert(metrics != null);
|
||||
int initialMetricCount = metrics.getNumDecommisionedNMs();
|
||||
HeartbeatResponse nodeHeartbeat = nm1.nodeHeartbeat(true);
|
||||
NodeHeartbeatResponse nodeHeartbeat = nm1.nodeHeartbeat(true);
|
||||
Assert.assertEquals(
|
||||
NodeAction.NORMAL,
|
||||
nodeHeartbeat.getNodeAction());
|
||||
|
@ -254,7 +254,7 @@ public class TestResourceTrackerService {
|
|||
MockNM nm2 = rm.registerNode("host2:1234", 2048);
|
||||
|
||||
int initialMetricCount = ClusterMetrics.getMetrics().getNumRebootedNMs();
|
||||
HeartbeatResponse nodeHeartbeat = nm1.nodeHeartbeat(true);
|
||||
NodeHeartbeatResponse nodeHeartbeat = nm1.nodeHeartbeat(true);
|
||||
Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
|
||||
|
||||
nodeHeartbeat = nm2.nodeHeartbeat(
|
||||
|
@ -351,7 +351,7 @@ public class TestResourceTrackerService {
|
|||
|
||||
// reconnect of healthy node
|
||||
nm1 = rm.registerNode("host1:1234", 5120);
|
||||
HeartbeatResponse response = nm1.nodeHeartbeat(true);
|
||||
NodeHeartbeatResponse response = nm1.nodeHeartbeat(true);
|
||||
Assert.assertTrue(NodeAction.NORMAL.equals(response.getNodeAction()));
|
||||
dispatcher.await();
|
||||
Assert.assertEquals(expectedNMs, ClusterMetrics.getMetrics().getNumActiveNMs());
|
||||
|
|
|
@ -109,7 +109,7 @@ public class TestNMExpiry {
|
|||
.newRecordInstance(NodeHeartbeatRequest.class);
|
||||
request.setNodeStatus(nodeStatus);
|
||||
lastResponseID = resourceTrackerService.nodeHeartbeat(request)
|
||||
.getHeartbeatResponse().getResponseId();
|
||||
.getResponseId();
|
||||
|
||||
Thread.sleep(1000);
|
||||
} catch(Exception e) {
|
||||
|
|
|
@ -31,8 +31,8 @@ import org.apache.hadoop.yarn.event.InlineDispatcher;
|
|||
import org.apache.hadoop.yarn.factories.RecordFactory;
|
||||
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
|
||||
import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse;
|
||||
import org.apache.hadoop.yarn.server.api.records.NodeAction;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.NMLivelinessMonitor;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.NodesListManager;
|
||||
|
@ -116,23 +116,20 @@ public class TestRMNMRPCResponseId {
|
|||
nodeHeartBeatRequest.setNodeStatus(nodeStatus);
|
||||
|
||||
nodeStatus.setResponseId(0);
|
||||
HeartbeatResponse response = resourceTrackerService.nodeHeartbeat(
|
||||
nodeHeartBeatRequest).getHeartbeatResponse();
|
||||
NodeHeartbeatResponse response = resourceTrackerService.nodeHeartbeat(
|
||||
nodeHeartBeatRequest);
|
||||
Assert.assertTrue(response.getResponseId() == 1);
|
||||
|
||||
nodeStatus.setResponseId(response.getResponseId());
|
||||
response = resourceTrackerService.nodeHeartbeat(nodeHeartBeatRequest)
|
||||
.getHeartbeatResponse();
|
||||
response = resourceTrackerService.nodeHeartbeat(nodeHeartBeatRequest);
|
||||
Assert.assertTrue(response.getResponseId() == 2);
|
||||
|
||||
/* try calling with less response id */
|
||||
response = resourceTrackerService.nodeHeartbeat(nodeHeartBeatRequest)
|
||||
.getHeartbeatResponse();
|
||||
response = resourceTrackerService.nodeHeartbeat(nodeHeartBeatRequest);
|
||||
Assert.assertTrue(response.getResponseId() == 2);
|
||||
|
||||
nodeStatus.setResponseId(0);
|
||||
response = resourceTrackerService.nodeHeartbeat(nodeHeartBeatRequest)
|
||||
.getHeartbeatResponse();
|
||||
response = resourceTrackerService.nodeHeartbeat(nodeHeartBeatRequest);
|
||||
Assert.assertTrue(NodeAction.REBOOT.equals(response.getNodeAction()));
|
||||
}
|
||||
}
|
|
@ -310,8 +310,7 @@ public class MiniYARNCluster extends CompositeService {
|
|||
NodeHeartbeatResponse response = recordFactory.newRecordInstance(
|
||||
NodeHeartbeatResponse.class);
|
||||
try {
|
||||
response.setHeartbeatResponse(rt.nodeHeartbeat(request)
|
||||
.getHeartbeatResponse());
|
||||
response = rt.nodeHeartbeat(request);
|
||||
} catch (IOException ioe) {
|
||||
LOG.info("Exception in heartbeat from node " +
|
||||
request.getNodeStatus().getNodeId(), ioe);
|
||||
|
|
|
@ -27,7 +27,7 @@ import org.apache.hadoop.security.UserGroupInformation;
|
|||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.event.Dispatcher;
|
||||
import org.apache.hadoop.yarn.event.DrainDispatcher;
|
||||
import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
|
||||
import org.apache.hadoop.yarn.server.api.records.MasterKey;
|
||||
import org.apache.hadoop.yarn.server.api.records.RegistrationResponse;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
|
||||
|
@ -67,7 +67,7 @@ public class TestRMNMSecretKeys {
|
|||
Assert.assertNotNull("Registration should cause a key-update!", masterKey);
|
||||
dispatcher.await();
|
||||
|
||||
HeartbeatResponse response = nm.nodeHeartbeat(true);
|
||||
NodeHeartbeatResponse response = nm.nodeHeartbeat(true);
|
||||
Assert.assertNull(
|
||||
"First heartbeat after registration shouldn't get any key updates!",
|
||||
response.getMasterKey());
|
||||
|
|
Loading…
Reference in New Issue