YARN-439. Flatten NodeHeartbeatResponse. Contributed by Xuan Gong.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1460811 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Siddharth Seth 2013-03-25 18:28:50 +00:00
parent 6a482a88b8
commit 18e08a8f51
26 changed files with 348 additions and 577 deletions

View File

@ -62,6 +62,8 @@ Release 2.0.5-beta - UNRELEASED
YARN-396. Rationalize AllocateResponse in RM Scheduler API. (Zhijie Shen YARN-396. Rationalize AllocateResponse in RM Scheduler API. (Zhijie Shen
via hitesh) via hitesh)
YARN-439. Flatten NodeHeartbeatResponse. (Xuan Gong via sseth)
NEW FEATURES NEW FEATURES
IMPROVEMENTS IMPROVEMENTS

View File

@ -18,10 +18,28 @@
package org.apache.hadoop.yarn.server.api.protocolrecords; package org.apache.hadoop.yarn.server.api.protocolrecords;
import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse; import java.util.List;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.server.api.records.MasterKey;
import org.apache.hadoop.yarn.server.api.records.NodeAction;
public interface NodeHeartbeatResponse { public interface NodeHeartbeatResponse {
public abstract HeartbeatResponse getHeartbeatResponse(); int getResponseId();
NodeAction getNodeAction();
public abstract void setHeartbeatResponse(HeartbeatResponse heartbeatResponse); List<ContainerId> getContainersToCleanup();
List<ApplicationId> getApplicationsToCleanup();
void setResponseId(int responseId);
void setNodeAction(NodeAction action);
MasterKey getMasterKey();
void setMasterKey(MasterKey secretKey);
void addAllContainersToCleanup(List<ContainerId> containers);
void addAllApplicationsToCleanup(List<ApplicationId> applications);
} }

View File

@ -18,14 +18,25 @@
package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb; package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb;
import java.util.ArrayList;
import java.util.List;
import java.util.Iterator;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ProtoBase; import org.apache.hadoop.yarn.api.records.ProtoBase;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.HeartbeatResponseProto; import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl;
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.MasterKeyProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeActionProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatResponseProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatResponseProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatResponseProtoOrBuilder; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatResponseProtoOrBuilder;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse; import org.apache.hadoop.yarn.server.api.records.MasterKey;
import org.apache.hadoop.yarn.server.api.records.impl.pb.HeartbeatResponsePBImpl; import org.apache.hadoop.yarn.server.api.records.NodeAction;
import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl;
@ -34,8 +45,9 @@ public class NodeHeartbeatResponsePBImpl extends ProtoBase<NodeHeartbeatResponse
NodeHeartbeatResponseProto.Builder builder = null; NodeHeartbeatResponseProto.Builder builder = null;
boolean viaProto = false; boolean viaProto = false;
private HeartbeatResponse heartbeatResponse = null; private List<ContainerId> containersToCleanup = null;
private List<ApplicationId> applicationsToCleanup = null;
private MasterKey masterKey = null;
public NodeHeartbeatResponsePBImpl() { public NodeHeartbeatResponsePBImpl() {
builder = NodeHeartbeatResponseProto.newBuilder(); builder = NodeHeartbeatResponseProto.newBuilder();
@ -54,8 +66,14 @@ public NodeHeartbeatResponseProto getProto() {
} }
private void mergeLocalToBuilder() { private void mergeLocalToBuilder() {
if (this.heartbeatResponse != null) { if (this.containersToCleanup != null) {
builder.setHeartbeatResponse(convertToProtoFormat(this.heartbeatResponse)); addContainersToCleanupToProto();
}
if (this.applicationsToCleanup != null) {
addApplicationsToCleanupToProto();
}
if (this.masterKey != null) {
builder.setMasterKey(convertToProtoFormat(this.masterKey));
} }
} }
@ -76,34 +94,213 @@ private void maybeInitBuilder() {
@Override @Override
public HeartbeatResponse getHeartbeatResponse() { public int getResponseId() {
NodeHeartbeatResponseProtoOrBuilder p = viaProto ? proto : builder; NodeHeartbeatResponseProtoOrBuilder p = viaProto ? proto : builder;
if (this.heartbeatResponse != null) { return (p.getResponseId());
return this.heartbeatResponse;
}
if (!p.hasHeartbeatResponse()) {
return null;
}
this.heartbeatResponse = convertFromProtoFormat(p.getHeartbeatResponse());
return this.heartbeatResponse;
} }
@Override @Override
public void setHeartbeatResponse(HeartbeatResponse heartbeatResponse) { public void setResponseId(int responseId) {
maybeInitBuilder(); maybeInitBuilder();
if (heartbeatResponse == null) builder.setResponseId((responseId));
builder.clearHeartbeatResponse();
this.heartbeatResponse = heartbeatResponse;
} }
private HeartbeatResponsePBImpl convertFromProtoFormat(HeartbeatResponseProto p) { @Override
return new HeartbeatResponsePBImpl(p); public MasterKey getMasterKey() {
NodeHeartbeatResponseProtoOrBuilder p = viaProto ? proto : builder;
if (this.masterKey != null) {
return this.masterKey;
}
if (!p.hasMasterKey()) {
return null;
}
this.masterKey = convertFromProtoFormat(p.getMasterKey());
return this.masterKey;
} }
private HeartbeatResponseProto convertToProtoFormat(HeartbeatResponse t) { @Override
return ((HeartbeatResponsePBImpl)t).getProto(); public void setMasterKey(MasterKey masterKey) {
maybeInitBuilder();
if (masterKey == null)
builder.clearMasterKey();
this.masterKey = masterKey;
} }
@Override
public NodeAction getNodeAction() {
NodeHeartbeatResponseProtoOrBuilder p = viaProto ? proto : builder;
if (!p.hasNodeAction()) {
return null;
}
return (convertFromProtoFormat(p.getNodeAction()));
}
@Override
public void setNodeAction(NodeAction nodeAction) {
maybeInitBuilder();
if (nodeAction == null) {
builder.clearNodeAction();
return;
}
builder.setNodeAction(convertToProtoFormat(nodeAction));
}
@Override
public List<ContainerId> getContainersToCleanup() {
initContainersToCleanup();
return this.containersToCleanup;
}
private void initContainersToCleanup() {
if (this.containersToCleanup != null) {
return;
}
NodeHeartbeatResponseProtoOrBuilder p = viaProto ? proto : builder;
List<ContainerIdProto> list = p.getContainersToCleanupList();
this.containersToCleanup = new ArrayList<ContainerId>();
for (ContainerIdProto c : list) {
this.containersToCleanup.add(convertFromProtoFormat(c));
}
}
@Override
public void addAllContainersToCleanup(
final List<ContainerId> containersToCleanup) {
if (containersToCleanup == null)
return;
initContainersToCleanup();
this.containersToCleanup.addAll(containersToCleanup);
}
private void addContainersToCleanupToProto() {
maybeInitBuilder();
builder.clearContainersToCleanup();
if (containersToCleanup == null)
return;
Iterable<ContainerIdProto> iterable = new Iterable<ContainerIdProto>() {
@Override
public Iterator<ContainerIdProto> iterator() {
return new Iterator<ContainerIdProto>() {
Iterator<ContainerId> iter = containersToCleanup.iterator();
@Override
public boolean hasNext() {
return iter.hasNext();
}
@Override
public ContainerIdProto next() {
return convertToProtoFormat(iter.next());
}
@Override
public void remove() {
throw new UnsupportedOperationException();
}
};
}
};
builder.addAllContainersToCleanup(iterable);
}
@Override
public List<ApplicationId> getApplicationsToCleanup() {
initApplicationsToCleanup();
return this.applicationsToCleanup;
}
private void initApplicationsToCleanup() {
if (this.applicationsToCleanup != null) {
return;
}
NodeHeartbeatResponseProtoOrBuilder p = viaProto ? proto : builder;
List<ApplicationIdProto> list = p.getApplicationsToCleanupList();
this.applicationsToCleanup = new ArrayList<ApplicationId>();
for (ApplicationIdProto c : list) {
this.applicationsToCleanup.add(convertFromProtoFormat(c));
}
}
@Override
public void addAllApplicationsToCleanup(
final List<ApplicationId> applicationsToCleanup) {
if (applicationsToCleanup == null)
return;
initApplicationsToCleanup();
this.applicationsToCleanup.addAll(applicationsToCleanup);
}
private void addApplicationsToCleanupToProto() {
maybeInitBuilder();
builder.clearApplicationsToCleanup();
if (applicationsToCleanup == null)
return;
Iterable<ApplicationIdProto> iterable = new Iterable<ApplicationIdProto>() {
@Override
public Iterator<ApplicationIdProto> iterator() {
return new Iterator<ApplicationIdProto>() {
Iterator<ApplicationId> iter = applicationsToCleanup.iterator();
@Override
public boolean hasNext() {
return iter.hasNext();
}
@Override
public ApplicationIdProto next() {
return convertToProtoFormat(iter.next());
}
@Override
public void remove() {
throw new UnsupportedOperationException();
}
};
}
};
builder.addAllApplicationsToCleanup(iterable);
}
private ContainerIdPBImpl convertFromProtoFormat(ContainerIdProto p) {
return new ContainerIdPBImpl(p);
}
private ContainerIdProto convertToProtoFormat(ContainerId t) {
return ((ContainerIdPBImpl) t).getProto();
}
private ApplicationIdPBImpl convertFromProtoFormat(ApplicationIdProto p) {
return new ApplicationIdPBImpl(p);
}
private ApplicationIdProto convertToProtoFormat(ApplicationId t) {
return ((ApplicationIdPBImpl) t).getProto();
}
private NodeAction convertFromProtoFormat(NodeActionProto p) {
return NodeAction.valueOf(p.name());
}
private NodeActionProto convertToProtoFormat(NodeAction t) {
return NodeActionProto.valueOf(t.name());
}
private MasterKeyPBImpl convertFromProtoFormat(MasterKeyProto p) {
return new MasterKeyPBImpl(p);
}
private MasterKeyProto convertToProtoFormat(MasterKey t) {
return ((MasterKeyPBImpl) t).getProto();
}
} }

View File

@ -1,52 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.api.records;
import java.util.List;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
public interface HeartbeatResponse {
int getResponseId();
NodeAction getNodeAction();
List<ContainerId> getContainersToCleanupList();
ContainerId getContainerToCleanup(int index);
int getContainersToCleanupCount();
List<ApplicationId> getApplicationsToCleanupList();
ApplicationId getApplicationsToCleanup(int index);
int getApplicationsToCleanupCount();
void setResponseId(int responseId);
void setNodeAction(NodeAction action);
MasterKey getMasterKey();
void setMasterKey(MasterKey secretKey);
void addAllContainersToCleanup(List<ContainerId> containers);
void addContainerToCleanup(ContainerId container);
void removeContainerToCleanup(int index);
void clearContainersToCleanup();
void addAllApplicationsToCleanup(List<ApplicationId> applications);
void addApplicationToCleanup(ApplicationId applicationId);
void removeApplicationToCleanup(int index);
void clearApplicationsToCleanup();
}

View File

@ -1,350 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.api.records.impl.pb;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ProtoBase;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl;
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.HeartbeatResponseProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.HeartbeatResponseProtoOrBuilder;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.MasterKeyProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeActionProto;
import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse;
import org.apache.hadoop.yarn.server.api.records.MasterKey;
import org.apache.hadoop.yarn.server.api.records.NodeAction;
public class HeartbeatResponsePBImpl extends
ProtoBase<HeartbeatResponseProto> implements HeartbeatResponse {
HeartbeatResponseProto proto = HeartbeatResponseProto.getDefaultInstance();
HeartbeatResponseProto.Builder builder = null;
boolean viaProto = false;
private List<ContainerId> containersToCleanup = null;
private List<ApplicationId> applicationsToCleanup = null;
private MasterKey masterKey = null;
public HeartbeatResponsePBImpl() {
builder = HeartbeatResponseProto.newBuilder();
}
public HeartbeatResponsePBImpl(HeartbeatResponseProto proto) {
this.proto = proto;
viaProto = true;
}
public HeartbeatResponseProto getProto() {
mergeLocalToProto();
proto = viaProto ? proto : builder.build();
viaProto = true;
return proto;
}
private void mergeLocalToBuilder() {
if (this.containersToCleanup != null) {
addContainersToCleanupToProto();
}
if (this.applicationsToCleanup != null) {
addApplicationsToCleanupToProto();
}
if (this.masterKey != null) {
builder.setMasterKey(convertToProtoFormat(this.masterKey));
}
}
private void mergeLocalToProto() {
if (viaProto)
maybeInitBuilder();
mergeLocalToBuilder();
proto = builder.build();
viaProto = true;
}
private void maybeInitBuilder() {
if (viaProto || builder == null) {
builder = HeartbeatResponseProto.newBuilder(proto);
}
viaProto = false;
}
@Override
public int getResponseId() {
HeartbeatResponseProtoOrBuilder p = viaProto ? proto : builder;
return (p.getResponseId());
}
@Override
public void setResponseId(int responseId) {
maybeInitBuilder();
builder.setResponseId((responseId));
}
@Override
public MasterKey getMasterKey() {
HeartbeatResponseProtoOrBuilder p = viaProto ? proto : builder;
if (this.masterKey != null) {
return this.masterKey;
}
if (!p.hasMasterKey()) {
return null;
}
this.masterKey = convertFromProtoFormat(p.getMasterKey());
return this.masterKey;
}
@Override
public void setMasterKey(MasterKey masterKey) {
maybeInitBuilder();
if (masterKey == null)
builder.clearMasterKey();
this.masterKey = masterKey;
}
@Override
public NodeAction getNodeAction() {
HeartbeatResponseProtoOrBuilder p = viaProto ? proto : builder;
if(!p.hasNodeAction()) {
return null;
}
return (convertFromProtoFormat(p.getNodeAction()));
}
@Override
public void setNodeAction(NodeAction nodeAction) {
maybeInitBuilder();
if (nodeAction == null) {
builder.clearNodeAction();
return;
}
builder.setNodeAction(convertToProtoFormat(nodeAction));
}
@Override
public List<ContainerId> getContainersToCleanupList() {
initContainersToCleanup();
return this.containersToCleanup;
}
@Override
public ContainerId getContainerToCleanup(int index) {
initContainersToCleanup();
return this.containersToCleanup.get(index);
}
@Override
public int getContainersToCleanupCount() {
initContainersToCleanup();
return this.containersToCleanup.size();
}
private void initContainersToCleanup() {
if (this.containersToCleanup != null) {
return;
}
HeartbeatResponseProtoOrBuilder p = viaProto ? proto : builder;
List<ContainerIdProto> list = p.getContainersToCleanupList();
this.containersToCleanup = new ArrayList<ContainerId>();
for (ContainerIdProto c : list) {
this.containersToCleanup.add(convertFromProtoFormat(c));
}
}
@Override
public void addAllContainersToCleanup(final List<ContainerId> containersToCleanup) {
if (containersToCleanup == null)
return;
initContainersToCleanup();
this.containersToCleanup.addAll(containersToCleanup);
}
private void addContainersToCleanupToProto() {
maybeInitBuilder();
builder.clearContainersToCleanup();
if (containersToCleanup == null)
return;
Iterable<ContainerIdProto> iterable = new Iterable<ContainerIdProto>() {
@Override
public Iterator<ContainerIdProto> iterator() {
return new Iterator<ContainerIdProto>() {
Iterator<ContainerId> iter = containersToCleanup.iterator();
@Override
public boolean hasNext() {
return iter.hasNext();
}
@Override
public ContainerIdProto next() {
return convertToProtoFormat(iter.next());
}
@Override
public void remove() {
throw new UnsupportedOperationException();
}
};
}
};
builder.addAllContainersToCleanup(iterable);
}
@Override
public void addContainerToCleanup(ContainerId containersToCleanup) {
initContainersToCleanup();
this.containersToCleanup.add(containersToCleanup);
}
@Override
public void removeContainerToCleanup(int index) {
initContainersToCleanup();
this.containersToCleanup.remove(index);
}
@Override
public void clearContainersToCleanup() {
initContainersToCleanup();
this.containersToCleanup.clear();
}
@Override
public List<ApplicationId> getApplicationsToCleanupList() {
initApplicationsToCleanup();
return this.applicationsToCleanup;
}
@Override
public ApplicationId getApplicationsToCleanup(int index) {
initApplicationsToCleanup();
return this.applicationsToCleanup.get(index);
}
@Override
public int getApplicationsToCleanupCount() {
initApplicationsToCleanup();
return this.applicationsToCleanup.size();
}
private void initApplicationsToCleanup() {
if (this.applicationsToCleanup != null) {
return;
}
HeartbeatResponseProtoOrBuilder p = viaProto ? proto : builder;
List<ApplicationIdProto> list = p.getApplicationsToCleanupList();
this.applicationsToCleanup = new ArrayList<ApplicationId>();
for (ApplicationIdProto c : list) {
this.applicationsToCleanup.add(convertFromProtoFormat(c));
}
}
@Override
public void addAllApplicationsToCleanup(final List<ApplicationId> applicationsToCleanup) {
if (applicationsToCleanup == null)
return;
initApplicationsToCleanup();
this.applicationsToCleanup.addAll(applicationsToCleanup);
}
private void addApplicationsToCleanupToProto() {
maybeInitBuilder();
builder.clearApplicationsToCleanup();
if (applicationsToCleanup == null)
return;
Iterable<ApplicationIdProto> iterable = new Iterable<ApplicationIdProto>() {
@Override
public Iterator<ApplicationIdProto> iterator() {
return new Iterator<ApplicationIdProto>() {
Iterator<ApplicationId> iter = applicationsToCleanup.iterator();
@Override
public boolean hasNext() {
return iter.hasNext();
}
@Override
public ApplicationIdProto next() {
return convertToProtoFormat(iter.next());
}
@Override
public void remove() {
throw new UnsupportedOperationException();
}
};
}
};
builder.addAllApplicationsToCleanup(iterable);
}
@Override
public void addApplicationToCleanup(ApplicationId applicationsToCleanup) {
initApplicationsToCleanup();
this.applicationsToCleanup.add(applicationsToCleanup);
}
@Override
public void removeApplicationToCleanup(int index) {
initApplicationsToCleanup();
this.applicationsToCleanup.remove(index);
}
@Override
public void clearApplicationsToCleanup() {
initApplicationsToCleanup();
this.applicationsToCleanup.clear();
}
private ContainerIdPBImpl convertFromProtoFormat(ContainerIdProto p) {
return new ContainerIdPBImpl(p);
}
private ContainerIdProto convertToProtoFormat(ContainerId t) {
return ((ContainerIdPBImpl)t).getProto();
}
private ApplicationIdPBImpl convertFromProtoFormat(ApplicationIdProto p) {
return new ApplicationIdPBImpl(p);
}
private ApplicationIdProto convertToProtoFormat(ApplicationId t) {
return ((ApplicationIdPBImpl)t).getProto();
}
private NodeAction convertFromProtoFormat(NodeActionProto p) {
return NodeAction.valueOf(p.name());
}
private NodeActionProto convertToProtoFormat(NodeAction t) {
return NodeActionProto.valueOf(t.name());
}
private MasterKeyPBImpl convertFromProtoFormat(MasterKeyProto p) {
return new MasterKeyPBImpl(p);
}
private MasterKeyProto convertToProtoFormat(MasterKey t) {
return ((MasterKeyPBImpl)t).getProto();
}
}

View File

@ -47,11 +47,4 @@ message RegistrationResponseProto {
optional NodeActionProto nodeAction = 2; optional NodeActionProto nodeAction = 2;
} }
message HeartbeatResponseProto {
optional int32 response_id = 1;
optional MasterKeyProto master_key = 2;
optional NodeActionProto nodeAction = 3;
repeated ContainerIdProto containers_to_cleanup = 4;
repeated ApplicationIdProto applications_to_cleanup = 5;
}

View File

@ -38,6 +38,11 @@ message NodeHeartbeatRequestProto {
optional MasterKeyProto last_known_master_key = 2; optional MasterKeyProto last_known_master_key = 2;
} }
message NodeHeartbeatResponseProto { message NodeHeartbeatResponseProto {
optional HeartbeatResponseProto heartbeat_response = 1; optional int32 response_id = 1;
optional MasterKeyProto master_key = 2;
optional NodeActionProto nodeAction = 3;
repeated ContainerIdProto containers_to_cleanup = 4;
repeated ApplicationIdProto applications_to_cleanup = 5;
} }

View File

@ -25,8 +25,6 @@
import org.apache.hadoop.yarn.factories.impl.pb.RecordFactoryPBImpl; import org.apache.hadoop.yarn.factories.impl.pb.RecordFactoryPBImpl;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.NodeHeartbeatRequestPBImpl; import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.NodeHeartbeatRequestPBImpl;
import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse;
import org.apache.hadoop.yarn.server.api.records.impl.pb.HeartbeatResponsePBImpl;
import org.junit.Test; import org.junit.Test;
public class TestRecordFactory { public class TestRecordFactory {
@ -34,15 +32,6 @@ public class TestRecordFactory {
@Test @Test
public void testPbRecordFactory() { public void testPbRecordFactory() {
RecordFactory pbRecordFactory = RecordFactoryPBImpl.get(); RecordFactory pbRecordFactory = RecordFactoryPBImpl.get();
try {
HeartbeatResponse response = pbRecordFactory.newRecordInstance(HeartbeatResponse.class);
Assert.assertEquals(HeartbeatResponsePBImpl.class, response.getClass());
} catch (YarnException e) {
e.printStackTrace();
Assert.fail("Failed to crete record");
}
try { try {
NodeHeartbeatRequest request = pbRecordFactory.newRecordInstance(NodeHeartbeatRequest.class); NodeHeartbeatRequest request = pbRecordFactory.newRecordInstance(NodeHeartbeatRequest.class);
Assert.assertEquals(NodeHeartbeatRequestPBImpl.class, request.getClass()); Assert.assertEquals(NodeHeartbeatRequestPBImpl.class, request.getClass());

View File

@ -50,8 +50,8 @@
import org.apache.hadoop.yarn.ipc.YarnRPC; import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.server.api.ResourceTracker; import org.apache.hadoop.yarn.server.api.ResourceTracker;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse;
import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.api.records.MasterKey;
import org.apache.hadoop.yarn.server.api.records.NodeAction; import org.apache.hadoop.yarn.server.api.records.NodeAction;
import org.apache.hadoop.yarn.server.api.records.NodeStatus; import org.apache.hadoop.yarn.server.api.records.NodeStatus;
@ -408,8 +408,8 @@ public void run() {
request.setLastKnownMasterKey(NodeStatusUpdaterImpl.this.context request.setLastKnownMasterKey(NodeStatusUpdaterImpl.this.context
.getContainerTokenSecretManager().getCurrentKey()); .getContainerTokenSecretManager().getCurrentKey());
} }
HeartbeatResponse response = NodeHeartbeatResponse response =
resourceTracker.nodeHeartbeat(request).getHeartbeatResponse(); resourceTracker.nodeHeartbeat(request);
// See if the master-key has rolled over // See if the master-key has rolled over
if (isSecurityEnabled()) { if (isSecurityEnabled()) {
@ -439,14 +439,14 @@ public void run() {
lastHeartBeatID = response.getResponseId(); lastHeartBeatID = response.getResponseId();
List<ContainerId> containersToCleanup = response List<ContainerId> containersToCleanup = response
.getContainersToCleanupList(); .getContainersToCleanup();
if (containersToCleanup.size() != 0) { if (containersToCleanup.size() != 0) {
dispatcher.getEventHandler().handle( dispatcher.getEventHandler().handle(
new CMgrCompletedContainersEvent(containersToCleanup, new CMgrCompletedContainersEvent(containersToCleanup,
CMgrCompletedContainersEvent.Reason.BY_RESOURCEMANAGER)); CMgrCompletedContainersEvent.Reason.BY_RESOURCEMANAGER));
} }
List<ApplicationId> appsToCleanup = List<ApplicationId> appsToCleanup =
response.getApplicationsToCleanupList(); response.getApplicationsToCleanup();
//Only start tracking for keepAlive on FINISH_APP //Only start tracking for keepAlive on FINISH_APP
trackAppsForKeepAlive(appsToCleanup); trackAppsForKeepAlive(appsToCleanup);
if (appsToCleanup.size() != 0) { if (appsToCleanup.size() != 0) {

View File

@ -29,7 +29,6 @@
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse;
import org.apache.hadoop.yarn.server.api.records.NodeStatus; import org.apache.hadoop.yarn.server.api.records.NodeStatus;
import org.apache.hadoop.yarn.server.api.records.RegistrationResponse; import org.apache.hadoop.yarn.server.api.records.RegistrationResponse;
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
@ -79,13 +78,9 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
LOG.info("Got heartbeat number " + heartBeatID); LOG.info("Got heartbeat number " + heartBeatID);
nodeStatus.setResponseId(heartBeatID++); nodeStatus.setResponseId(heartBeatID++);
HeartbeatResponse response = recordFactory
.newRecordInstance(HeartbeatResponse.class);
response.setResponseId(heartBeatID);
NodeHeartbeatResponse nhResponse = recordFactory NodeHeartbeatResponse nhResponse = recordFactory
.newRecordInstance(NodeHeartbeatResponse.class); .newRecordInstance(NodeHeartbeatResponse.class);
nhResponse.setHeartbeatResponse(response); nhResponse.setResponseId(heartBeatID);
return nhResponse; return nhResponse;
} }
} }

View File

@ -56,7 +56,6 @@
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse;
import org.apache.hadoop.yarn.server.api.records.NodeAction; import org.apache.hadoop.yarn.server.api.records.NodeAction;
import org.apache.hadoop.yarn.server.api.records.NodeStatus; import org.apache.hadoop.yarn.server.api.records.NodeStatus;
import org.apache.hadoop.yarn.server.api.records.RegistrationResponse; import org.apache.hadoop.yarn.server.api.records.RegistrationResponse;
@ -218,13 +217,10 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
this.context.getContainers(); this.context.getContainers();
Assert.assertEquals(2, activeContainers.size()); Assert.assertEquals(2, activeContainers.size());
} }
HeartbeatResponse response = recordFactory
.newRecordInstance(HeartbeatResponse.class);
response.setResponseId(heartBeatID);
NodeHeartbeatResponse nhResponse = recordFactory NodeHeartbeatResponse nhResponse = recordFactory
.newRecordInstance(NodeHeartbeatResponse.class); .newRecordInstance(NodeHeartbeatResponse.class);
nhResponse.setHeartbeatResponse(response); nhResponse.setResponseId(heartBeatID);
return nhResponse; return nhResponse;
} }
} }
@ -335,14 +331,11 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
throws YarnRemoteException { throws YarnRemoteException {
NodeStatus nodeStatus = request.getNodeStatus(); NodeStatus nodeStatus = request.getNodeStatus();
nodeStatus.setResponseId(heartBeatID++); nodeStatus.setResponseId(heartBeatID++);
HeartbeatResponse response = recordFactory
.newRecordInstance(HeartbeatResponse.class);
response.setResponseId(heartBeatID);
response.setNodeAction(heartBeatNodeAction);
NodeHeartbeatResponse nhResponse = recordFactory NodeHeartbeatResponse nhResponse = recordFactory
.newRecordInstance(NodeHeartbeatResponse.class); .newRecordInstance(NodeHeartbeatResponse.class);
nhResponse.setHeartbeatResponse(response); nhResponse.setResponseId(heartBeatID);
nhResponse.setNodeAction(heartBeatNodeAction);
return nhResponse; return nhResponse;
} }
} }
@ -378,10 +371,10 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
LOG.info("Got heartBeatId: [" + heartBeatID +"]"); LOG.info("Got heartBeatId: [" + heartBeatID +"]");
NodeStatus nodeStatus = request.getNodeStatus(); NodeStatus nodeStatus = request.getNodeStatus();
nodeStatus.setResponseId(heartBeatID++); nodeStatus.setResponseId(heartBeatID++);
HeartbeatResponse response = NodeHeartbeatResponse nhResponse =
recordFactory.newRecordInstance(HeartbeatResponse.class); recordFactory.newRecordInstance(NodeHeartbeatResponse.class);
response.setResponseId(heartBeatID); nhResponse.setResponseId(heartBeatID);
response.setNodeAction(heartBeatNodeAction); nhResponse.setNodeAction(heartBeatNodeAction);
if (nodeStatus.getKeepAliveApplications() != null if (nodeStatus.getKeepAliveApplications() != null
&& nodeStatus.getKeepAliveApplications().size() > 0) { && nodeStatus.getKeepAliveApplications().size() > 0) {
@ -397,11 +390,8 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
if (heartBeatID == 2) { if (heartBeatID == 2) {
LOG.info("Sending FINISH_APP for application: [" + appId + "]"); LOG.info("Sending FINISH_APP for application: [" + appId + "]");
this.context.getApplications().put(appId, mock(Application.class)); this.context.getApplications().put(appId, mock(Application.class));
response.addAllApplicationsToCleanup(Collections.singletonList(appId)); nhResponse.addAllApplicationsToCleanup(Collections.singletonList(appId));
} }
NodeHeartbeatResponse nhResponse =
recordFactory.newRecordInstance(NodeHeartbeatResponse.class);
nhResponse.setHeartbeatResponse(response);
return nhResponse; return nhResponse;
} }
} }

View File

@ -39,7 +39,6 @@
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse;
import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.api.records.MasterKey;
import org.apache.hadoop.yarn.server.api.records.NodeAction; import org.apache.hadoop.yarn.server.api.records.NodeAction;
import org.apache.hadoop.yarn.server.api.records.NodeStatus; import org.apache.hadoop.yarn.server.api.records.NodeStatus;
@ -78,15 +77,9 @@ public class ResourceTrackerService extends AbstractService implements
.newRecordInstance(NodeHeartbeatResponse.class); .newRecordInstance(NodeHeartbeatResponse.class);
static { static {
HeartbeatResponse rebootResp = recordFactory reboot.setNodeAction(NodeAction.REBOOT);
.newRecordInstance(HeartbeatResponse.class);
rebootResp.setNodeAction(NodeAction.REBOOT);
reboot.setHeartbeatResponse(rebootResp);
HeartbeatResponse decommissionedResp = recordFactory shutDown.setNodeAction(NodeAction.SHUTDOWN);
.newRecordInstance(HeartbeatResponse.class);
decommissionedResp.setNodeAction(NodeAction.SHUTDOWN);
shutDown.setHeartbeatResponse(decommissionedResp);
} }
public ResourceTrackerService(RMContext rmContext, public ResourceTrackerService(RMContext rmContext,
@ -240,17 +233,16 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
.newRecordInstance(NodeHeartbeatResponse.class); .newRecordInstance(NodeHeartbeatResponse.class);
// 3. Check if it's a 'fresh' heartbeat i.e. not duplicate heartbeat // 3. Check if it's a 'fresh' heartbeat i.e. not duplicate heartbeat
HeartbeatResponse lastHeartbeatResponse = rmNode.getLastHeartBeatResponse(); NodeHeartbeatResponse lastNodeHeartbeatResponse = rmNode.getLastNodeHeartBeatResponse();
if (remoteNodeStatus.getResponseId() + 1 == lastHeartbeatResponse if (remoteNodeStatus.getResponseId() + 1 == lastNodeHeartbeatResponse
.getResponseId()) { .getResponseId()) {
LOG.info("Received duplicate heartbeat from node " LOG.info("Received duplicate heartbeat from node "
+ rmNode.getNodeAddress()); + rmNode.getNodeAddress());
nodeHeartBeatResponse.setHeartbeatResponse(lastHeartbeatResponse); return lastNodeHeartbeatResponse;
return nodeHeartBeatResponse; } else if (remoteNodeStatus.getResponseId() + 1 < lastNodeHeartbeatResponse
} else if (remoteNodeStatus.getResponseId() + 1 < lastHeartbeatResponse
.getResponseId()) { .getResponseId()) {
LOG.info("Too far behind rm response id:" LOG.info("Too far behind rm response id:"
+ lastHeartbeatResponse.getResponseId() + " nm response id:" + lastNodeHeartbeatResponse.getResponseId() + " nm response id:"
+ remoteNodeStatus.getResponseId()); + remoteNodeStatus.getResponseId());
// TODO: Just sending reboot is not enough. Think more. // TODO: Just sending reboot is not enough. Think more.
this.rmContext.getDispatcher().getEventHandler().handle( this.rmContext.getDispatcher().getEventHandler().handle(
@ -259,11 +251,9 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
} }
// Heartbeat response // Heartbeat response
HeartbeatResponse latestResponse = recordFactory nodeHeartBeatResponse.setResponseId(lastNodeHeartbeatResponse.getResponseId() + 1);
.newRecordInstance(HeartbeatResponse.class); rmNode.updateNodeHeartbeatResponseForCleanup(nodeHeartBeatResponse);
latestResponse.setResponseId(lastHeartbeatResponse.getResponseId() + 1); nodeHeartBeatResponse.setNodeAction(NodeAction.NORMAL);
rmNode.updateHeartbeatResponseForCleanup(latestResponse);
latestResponse.setNodeAction(NodeAction.NORMAL);
// Check if node's masterKey needs to be updated and if the currentKey has // Check if node's masterKey needs to be updated and if the currentKey has
// roller over, send it across // roller over, send it across
@ -282,7 +272,7 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
} }
} }
if (shouldSendMasterKey) { if (shouldSendMasterKey) {
latestResponse.setMasterKey(nextMasterKeyForNode); nodeHeartBeatResponse.setMasterKey(nextMasterKeyForNode);
} }
} }
@ -290,9 +280,8 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
this.rmContext.getDispatcher().getEventHandler().handle( this.rmContext.getDispatcher().getEventHandler().handle(
new RMNodeStatusEvent(nodeId, remoteNodeStatus.getNodeHealthStatus(), new RMNodeStatusEvent(nodeId, remoteNodeStatus.getNodeHealthStatus(),
remoteNodeStatus.getContainersStatuses(), remoteNodeStatus.getContainersStatuses(),
remoteNodeStatus.getKeepAliveApplications(), latestResponse)); remoteNodeStatus.getKeepAliveApplications(), nodeHeartBeatResponse));
nodeHeartBeatResponse.setHeartbeatResponse(latestResponse);
return nodeHeartBeatResponse; return nodeHeartBeatResponse;
} }

View File

@ -27,7 +27,7 @@
import org.apache.hadoop.yarn.api.records.NodeHealthStatus; import org.apache.hadoop.yarn.api.records.NodeHealthStatus;
import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
/** /**
* Node managers information on available resources * Node managers information on available resources
@ -106,13 +106,13 @@ public interface RMNode {
public List<ApplicationId> getAppsToCleanup(); public List<ApplicationId> getAppsToCleanup();
/** /**
* Update a {@link HeartbeatResponse} with the list of containers and * Update a {@link NodeHeartbeatResponse} with the list of containers and
* applications to clean up for this node. * applications to clean up for this node.
* @param response the {@link HeartbeatResponse} to update * @param response the {@link NodeHeartbeatResponse} to update
*/ */
public void updateHeartbeatResponseForCleanup(HeartbeatResponse response); public void updateNodeHeartbeatResponseForCleanup(NodeHeartbeatResponse response);
public HeartbeatResponse getLastHeartBeatResponse(); public NodeHeartbeatResponse getLastNodeHeartBeatResponse();
/** /**
* Get and clear the list of containerUpdates accumulated across NM * Get and clear the list of containerUpdates accumulated across NM
@ -121,5 +121,4 @@ public interface RMNode {
* @return containerUpdates accumulated across NM heartbeats. * @return containerUpdates accumulated across NM heartbeats.
*/ */
public List<UpdatedContainerInfo> pullContainerUpdates(); public List<UpdatedContainerInfo> pullContainerUpdates();
} }

View File

@ -46,7 +46,7 @@
import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
import org.apache.hadoop.yarn.server.resourcemanager.ClusterMetrics; import org.apache.hadoop.yarn.server.resourcemanager.ClusterMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.NodesListManagerEvent; import org.apache.hadoop.yarn.server.resourcemanager.NodesListManagerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.NodesListManagerEventType; import org.apache.hadoop.yarn.server.resourcemanager.NodesListManagerEventType;
@ -107,8 +107,8 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
/* the list of applications that have finished and need to be purged */ /* the list of applications that have finished and need to be purged */
private final List<ApplicationId> finishedApplications = new ArrayList<ApplicationId>(); private final List<ApplicationId> finishedApplications = new ArrayList<ApplicationId>();
private HeartbeatResponse latestHeartBeatResponse = recordFactory private NodeHeartbeatResponse latestNodeHeartBeatResponse = recordFactory
.newRecordInstance(HeartbeatResponse.class); .newRecordInstance(NodeHeartbeatResponse.class);
private static final StateMachineFactory<RMNodeImpl, private static final StateMachineFactory<RMNodeImpl,
NodeState, NodeState,
@ -184,7 +184,7 @@ public RMNodeImpl(NodeId nodeId, RMContext context, String hostName,
this.nodeHealthStatus.setHealthReport("Healthy"); this.nodeHealthStatus.setHealthReport("Healthy");
this.nodeHealthStatus.setLastHealthReportTime(System.currentTimeMillis()); this.nodeHealthStatus.setLastHealthReportTime(System.currentTimeMillis());
this.latestHeartBeatResponse.setResponseId(0); this.latestNodeHeartBeatResponse.setResponseId(0);
ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
this.readLock = lock.readLock(); this.readLock = lock.readLock();
@ -304,7 +304,7 @@ public List<ContainerId> getContainersToCleanUp() {
}; };
@Override @Override
public void updateHeartbeatResponseForCleanup(HeartbeatResponse response) { public void updateNodeHeartbeatResponseForCleanup(NodeHeartbeatResponse response) {
this.writeLock.lock(); this.writeLock.lock();
try { try {
@ -319,12 +319,12 @@ public void updateHeartbeatResponseForCleanup(HeartbeatResponse response) {
}; };
@Override @Override
public HeartbeatResponse getLastHeartBeatResponse() { public NodeHeartbeatResponse getLastNodeHeartBeatResponse() {
this.readLock.lock(); this.readLock.lock();
try { try {
return this.latestHeartBeatResponse; return this.latestNodeHeartBeatResponse;
} finally { } finally {
this.readLock.unlock(); this.readLock.unlock();
} }
@ -430,7 +430,7 @@ public void transition(RMNodeImpl rmNode, RMNodeEvent event) {
if (rmNode.getTotalCapability().equals(newNode.getTotalCapability()) if (rmNode.getTotalCapability().equals(newNode.getTotalCapability())
&& rmNode.getHttpPort() == newNode.getHttpPort()) { && rmNode.getHttpPort() == newNode.getHttpPort()) {
// Reset heartbeat ID since node just restarted. // Reset heartbeat ID since node just restarted.
rmNode.getLastHeartBeatResponse().setResponseId(0); rmNode.getLastNodeHeartBeatResponse().setResponseId(0);
rmNode.context.getDispatcher().getEventHandler().handle( rmNode.context.getDispatcher().getEventHandler().handle(
new NodeAddedSchedulerEvent(rmNode)); new NodeAddedSchedulerEvent(rmNode));
} else { } else {
@ -507,7 +507,7 @@ public NodeState transition(RMNodeImpl rmNode, RMNodeEvent event) {
RMNodeStatusEvent statusEvent = (RMNodeStatusEvent) event; RMNodeStatusEvent statusEvent = (RMNodeStatusEvent) event;
// Switch the last heartbeatresponse. // Switch the last heartbeatresponse.
rmNode.latestHeartBeatResponse = statusEvent.getLatestResponse(); rmNode.latestNodeHeartBeatResponse = statusEvent.getLatestResponse();
NodeHealthStatus remoteNodeHealthStatus = NodeHealthStatus remoteNodeHealthStatus =
statusEvent.getNodeHealthStatus(); statusEvent.getNodeHealthStatus();
@ -591,7 +591,7 @@ public NodeState transition(RMNodeImpl rmNode, RMNodeEvent event) {
RMNodeStatusEvent statusEvent = (RMNodeStatusEvent) event; RMNodeStatusEvent statusEvent = (RMNodeStatusEvent) event;
// Switch the last heartbeatresponse. // Switch the last heartbeatresponse.
rmNode.latestHeartBeatResponse = statusEvent.getLatestResponse(); rmNode.latestNodeHeartBeatResponse = statusEvent.getLatestResponse();
NodeHealthStatus remoteNodeHealthStatus = statusEvent.getNodeHealthStatus(); NodeHealthStatus remoteNodeHealthStatus = statusEvent.getNodeHealthStatus();
rmNode.setNodeHealthStatus(remoteNodeHealthStatus); rmNode.setNodeHealthStatus(remoteNodeHealthStatus);
if (remoteNodeHealthStatus.getIsNodeHealthy()) { if (remoteNodeHealthStatus.getIsNodeHealthy()) {

View File

@ -24,18 +24,18 @@
import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeHealthStatus; import org.apache.hadoop.yarn.api.records.NodeHealthStatus;
import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
public class RMNodeStatusEvent extends RMNodeEvent { public class RMNodeStatusEvent extends RMNodeEvent {
private final NodeHealthStatus nodeHealthStatus; private final NodeHealthStatus nodeHealthStatus;
private final List<ContainerStatus> containersCollection; private final List<ContainerStatus> containersCollection;
private final HeartbeatResponse latestResponse; private final NodeHeartbeatResponse latestResponse;
private final List<ApplicationId> keepAliveAppIds; private final List<ApplicationId> keepAliveAppIds;
public RMNodeStatusEvent(NodeId nodeId, NodeHealthStatus nodeHealthStatus, public RMNodeStatusEvent(NodeId nodeId, NodeHealthStatus nodeHealthStatus,
List<ContainerStatus> collection, List<ApplicationId> keepAliveAppIds, List<ContainerStatus> collection, List<ApplicationId> keepAliveAppIds,
HeartbeatResponse latestResponse) { NodeHeartbeatResponse latestResponse) {
super(nodeId, RMNodeEventType.STATUS_UPDATE); super(nodeId, RMNodeEventType.STATUS_UPDATE);
this.nodeHealthStatus = nodeHealthStatus; this.nodeHealthStatus = nodeHealthStatus;
this.containersCollection = collection; this.containersCollection = collection;
@ -51,7 +51,7 @@ public List<ContainerStatus> getContainers() {
return this.containersCollection; return this.containersCollection;
} }
public HeartbeatResponse getLatestResponse() { public NodeHeartbeatResponse getLatestResponse() {
return this.latestResponse; return this.latestResponse;
} }

View File

@ -33,8 +33,8 @@
import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse;
import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.api.records.MasterKey;
import org.apache.hadoop.yarn.server.api.records.NodeStatus; import org.apache.hadoop.yarn.server.api.records.NodeStatus;
import org.apache.hadoop.yarn.server.api.records.RegistrationResponse; import org.apache.hadoop.yarn.server.api.records.RegistrationResponse;
@ -93,12 +93,12 @@ public RegistrationResponse registerNode() throws Exception {
return registrationResponse; return registrationResponse;
} }
public HeartbeatResponse nodeHeartbeat(boolean isHealthy) throws Exception { public NodeHeartbeatResponse nodeHeartbeat(boolean isHealthy) throws Exception {
return nodeHeartbeat(new HashMap<ApplicationId, List<ContainerStatus>>(), return nodeHeartbeat(new HashMap<ApplicationId, List<ContainerStatus>>(),
isHealthy, ++responseId); isHealthy, ++responseId);
} }
public HeartbeatResponse nodeHeartbeat(ApplicationAttemptId attemptId, public NodeHeartbeatResponse nodeHeartbeat(ApplicationAttemptId attemptId,
int containerId, ContainerState containerState) throws Exception { int containerId, ContainerState containerState) throws Exception {
HashMap<ApplicationId, List<ContainerStatus>> nodeUpdate = HashMap<ApplicationId, List<ContainerStatus>> nodeUpdate =
new HashMap<ApplicationId, List<ContainerStatus>>(1); new HashMap<ApplicationId, List<ContainerStatus>>(1);
@ -112,12 +112,12 @@ public HeartbeatResponse nodeHeartbeat(ApplicationAttemptId attemptId,
return nodeHeartbeat(nodeUpdate, true); return nodeHeartbeat(nodeUpdate, true);
} }
public HeartbeatResponse nodeHeartbeat(Map<ApplicationId, public NodeHeartbeatResponse nodeHeartbeat(Map<ApplicationId,
List<ContainerStatus>> conts, boolean isHealthy) throws Exception { List<ContainerStatus>> conts, boolean isHealthy) throws Exception {
return nodeHeartbeat(conts, isHealthy, ++responseId); return nodeHeartbeat(conts, isHealthy, ++responseId);
} }
public HeartbeatResponse nodeHeartbeat(Map<ApplicationId, public NodeHeartbeatResponse nodeHeartbeat(Map<ApplicationId,
List<ContainerStatus>> conts, boolean isHealthy, int resId) throws Exception { List<ContainerStatus>> conts, boolean isHealthy, int resId) throws Exception {
NodeHeartbeatRequest req = Records.newRecord(NodeHeartbeatRequest.class); NodeHeartbeatRequest req = Records.newRecord(NodeHeartbeatRequest.class);
NodeStatus status = Records.newRecord(NodeStatus.class); NodeStatus status = Records.newRecord(NodeStatus.class);
@ -133,8 +133,8 @@ public HeartbeatResponse nodeHeartbeat(Map<ApplicationId,
status.setNodeHealthStatus(healthStatus); status.setNodeHealthStatus(healthStatus);
req.setNodeStatus(status); req.setNodeStatus(status);
req.setLastKnownMasterKey(this.currentMasterKey); req.setLastKnownMasterKey(this.currentMasterKey);
HeartbeatResponse heartbeatResponse = NodeHeartbeatResponse heartbeatResponse =
resourceTracker.nodeHeartbeat(req).getHeartbeatResponse(); resourceTracker.nodeHeartbeat(req);
MasterKey masterKeyFromRM = heartbeatResponse.getMasterKey(); MasterKey masterKeyFromRM = heartbeatResponse.getMasterKey();
this.currentMasterKey = this.currentMasterKey =
(masterKeyFromRM != null (masterKeyFromRM != null

View File

@ -31,7 +31,7 @@
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
@ -187,11 +187,11 @@ public List<ApplicationId> getAppsToCleanup() {
} }
@Override @Override
public void updateHeartbeatResponseForCleanup(HeartbeatResponse response) { public void updateNodeHeartbeatResponseForCleanup(NodeHeartbeatResponse response) {
} }
@Override @Override
public HeartbeatResponse getLastHeartBeatResponse() { public NodeHeartbeatResponse getLastNodeHeartBeatResponse() {
return null; return null;
} }

View File

@ -51,8 +51,8 @@
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.ipc.RPCUtil; import org.apache.hadoop.yarn.ipc.RPCUtil;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse;
import org.apache.hadoop.yarn.server.api.records.NodeStatus; import org.apache.hadoop.yarn.server.api.records.NodeStatus;
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources; import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
@ -151,8 +151,8 @@ public void heartbeat() throws IOException {
NodeHeartbeatRequest request = recordFactory NodeHeartbeatRequest request = recordFactory
.newRecordInstance(NodeHeartbeatRequest.class); .newRecordInstance(NodeHeartbeatRequest.class);
request.setNodeStatus(nodeStatus); request.setNodeStatus(nodeStatus);
HeartbeatResponse response = resourceTrackerService NodeHeartbeatResponse response = resourceTrackerService
.nodeHeartbeat(request).getHeartbeatResponse(); .nodeHeartbeat(request);
responseID = response.getResponseId(); responseID = response.getResponseId();
} }

View File

@ -36,7 +36,7 @@
import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.DrainDispatcher; import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
@ -92,15 +92,15 @@ public void testAppCleanup() throws Exception {
Assert.assertEquals(request, contReceived); Assert.assertEquals(request, contReceived);
am.unregisterAppAttempt(); am.unregisterAppAttempt();
HeartbeatResponse resp = nm1.nodeHeartbeat(attempt.getAppAttemptId(), 1, NodeHeartbeatResponse resp = nm1.nodeHeartbeat(attempt.getAppAttemptId(), 1,
ContainerState.COMPLETE); ContainerState.COMPLETE);
am.waitForState(RMAppAttemptState.FINISHED); am.waitForState(RMAppAttemptState.FINISHED);
//currently only containers are cleaned via this //currently only containers are cleaned via this
//AM container is cleaned via container launcher //AM container is cleaned via container launcher
resp = nm1.nodeHeartbeat(true); resp = nm1.nodeHeartbeat(true);
List<ContainerId> contsToClean = resp.getContainersToCleanupList(); List<ContainerId> contsToClean = resp.getContainersToCleanup();
List<ApplicationId> apps = resp.getApplicationsToCleanupList(); List<ApplicationId> apps = resp.getApplicationsToCleanup();
int cleanedConts = contsToClean.size(); int cleanedConts = contsToClean.size();
int cleanedApps = apps.size(); int cleanedApps = apps.size();
waitCount = 0; waitCount = 0;
@ -109,8 +109,8 @@ public void testAppCleanup() throws Exception {
+ cleanedConts + " cleanedApps: " + cleanedApps); + cleanedConts + " cleanedApps: " + cleanedApps);
Thread.sleep(100); Thread.sleep(100);
resp = nm1.nodeHeartbeat(true); resp = nm1.nodeHeartbeat(true);
contsToClean = resp.getContainersToCleanupList(); contsToClean = resp.getContainersToCleanup();
apps = resp.getApplicationsToCleanupList(); apps = resp.getApplicationsToCleanup();
cleanedConts += contsToClean.size(); cleanedConts += contsToClean.size();
cleanedApps += apps.size(); cleanedApps += apps.size();
} }
@ -198,9 +198,9 @@ protected Dispatcher createDispatcher() {
.getId(), ContainerState.RUNNING, "nothing", 0)); .getId(), ContainerState.RUNNING, "nothing", 0));
containerStatuses.put(app.getApplicationId(), containerStatusList); containerStatuses.put(app.getApplicationId(), containerStatusList);
HeartbeatResponse resp = nm1.nodeHeartbeat(containerStatuses, true); NodeHeartbeatResponse resp = nm1.nodeHeartbeat(containerStatuses, true);
dispatcher.await(); dispatcher.await();
List<ContainerId> contsToClean = resp.getContainersToCleanupList(); List<ContainerId> contsToClean = resp.getContainersToCleanup();
int cleanedConts = contsToClean.size(); int cleanedConts = contsToClean.size();
waitCount = 0; waitCount = 0;
while (cleanedConts < 1 && waitCount++ < 200) { while (cleanedConts < 1 && waitCount++ < 200) {
@ -208,7 +208,7 @@ protected Dispatcher createDispatcher() {
Thread.sleep(100); Thread.sleep(100);
resp = nm1.nodeHeartbeat(true); resp = nm1.nodeHeartbeat(true);
dispatcher.await(); dispatcher.await();
contsToClean = resp.getContainersToCleanupList(); contsToClean = resp.getContainersToCleanup();
cleanedConts += contsToClean.size(); cleanedConts += contsToClean.size();
} }
LOG.info("Got cleanup for " + contsToClean.get(0)); LOG.info("Got cleanup for " + contsToClean.get(0));
@ -226,7 +226,7 @@ protected Dispatcher createDispatcher() {
resp = nm1.nodeHeartbeat(containerStatuses, true); resp = nm1.nodeHeartbeat(containerStatuses, true);
dispatcher.await(); dispatcher.await();
contsToClean = resp.getContainersToCleanupList(); contsToClean = resp.getContainersToCleanup();
cleanedConts = contsToClean.size(); cleanedConts = contsToClean.size();
// The cleanup list won't be instantaneous as it is given out by scheduler // The cleanup list won't be instantaneous as it is given out by scheduler
// and not RMNodeImpl. // and not RMNodeImpl.
@ -236,7 +236,7 @@ protected Dispatcher createDispatcher() {
Thread.sleep(100); Thread.sleep(100);
resp = nm1.nodeHeartbeat(true); resp = nm1.nodeHeartbeat(true);
dispatcher.await(); dispatcher.await();
contsToClean = resp.getContainersToCleanupList(); contsToClean = resp.getContainersToCleanup();
cleanedConts += contsToClean.size(); cleanedConts += contsToClean.size();
} }
LOG.info("Got cleanup for " + contsToClean.get(0)); LOG.info("Got cleanup for " + contsToClean.get(0));

View File

@ -38,7 +38,7 @@
import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.event.InlineDispatcher; import org.apache.hadoop.yarn.event.InlineDispatcher;
import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanAppEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanAppEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
@ -118,7 +118,7 @@ public void tearDown() throws Exception {
} }
private RMNodeStatusEvent getMockRMNodeStatusEvent() { private RMNodeStatusEvent getMockRMNodeStatusEvent() {
HeartbeatResponse response = mock(HeartbeatResponse.class); NodeHeartbeatResponse response = mock(NodeHeartbeatResponse.class);
NodeHealthStatus healthStatus = mock(NodeHealthStatus.class); NodeHealthStatus healthStatus = mock(NodeHealthStatus.class);
Boolean yes = new Boolean(true); Boolean yes = new Boolean(true);
@ -325,14 +325,14 @@ public void testUpdateHeartbeatResponseForCleanup() {
node.handle(statusEvent); node.handle(statusEvent);
Assert.assertEquals(1, node.getContainersToCleanUp().size()); Assert.assertEquals(1, node.getContainersToCleanUp().size());
Assert.assertEquals(1, node.getAppsToCleanup().size()); Assert.assertEquals(1, node.getAppsToCleanup().size());
HeartbeatResponse hbrsp = Records.newRecord(HeartbeatResponse.class); NodeHeartbeatResponse hbrsp = Records.newRecord(NodeHeartbeatResponse.class);
node.updateHeartbeatResponseForCleanup(hbrsp); node.updateNodeHeartbeatResponseForCleanup(hbrsp);
Assert.assertEquals(0, node.getContainersToCleanUp().size()); Assert.assertEquals(0, node.getContainersToCleanUp().size());
Assert.assertEquals(0, node.getAppsToCleanup().size()); Assert.assertEquals(0, node.getAppsToCleanup().size());
Assert.assertEquals(1, hbrsp.getContainersToCleanupCount()); Assert.assertEquals(1, hbrsp.getContainersToCleanup().size());
Assert.assertEquals(completedContainerId, hbrsp.getContainerToCleanup(0)); Assert.assertEquals(completedContainerId, hbrsp.getContainersToCleanup().get(0));
Assert.assertEquals(1, hbrsp.getApplicationsToCleanupCount()); Assert.assertEquals(1, hbrsp.getApplicationsToCleanup().size());
Assert.assertEquals(finishedAppId, hbrsp.getApplicationsToCleanup(0)); Assert.assertEquals(finishedAppId, hbrsp.getApplicationsToCleanup().get(0));
} }
private RMNodeImpl getRunningNode() { private RMNodeImpl getRunningNode() {

View File

@ -31,7 +31,7 @@
import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
import org.apache.hadoop.yarn.server.api.records.NodeAction; import org.apache.hadoop.yarn.server.api.records.NodeAction;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationAttemptState;
@ -219,7 +219,7 @@ public void testRMRestart() throws Exception {
Assert.assertTrue(allocResponse.getReboot()); Assert.assertTrue(allocResponse.getReboot());
// NM should be rebooted on heartbeat, even first heartbeat for nm2 // NM should be rebooted on heartbeat, even first heartbeat for nm2
HeartbeatResponse hbResponse = nm1.nodeHeartbeat(true); NodeHeartbeatResponse hbResponse = nm1.nodeHeartbeat(true);
Assert.assertEquals(NodeAction.REBOOT, hbResponse.getNodeAction()); Assert.assertEquals(NodeAction.REBOOT, hbResponse.getNodeAction());
hbResponse = nm2.nodeHeartbeat(true); hbResponse = nm2.nodeHeartbeat(true);
Assert.assertEquals(NodeAction.REBOOT, hbResponse.getNodeAction()); Assert.assertEquals(NodeAction.REBOOT, hbResponse.getNodeAction());

View File

@ -36,9 +36,9 @@
import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.DrainDispatcher; import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse;
import org.apache.hadoop.yarn.server.api.records.NodeAction; import org.apache.hadoop.yarn.server.api.records.NodeAction;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
@ -75,7 +75,7 @@ public void testDecommissionWithIncludeHosts() throws Exception {
assert(metrics != null); assert(metrics != null);
int metricCount = metrics.getNumDecommisionedNMs(); int metricCount = metrics.getNumDecommisionedNMs();
HeartbeatResponse nodeHeartbeat = nm1.nodeHeartbeat(true); NodeHeartbeatResponse nodeHeartbeat = nm1.nodeHeartbeat(true);
Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction())); Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
nodeHeartbeat = nm2.nodeHeartbeat(true); nodeHeartbeat = nm2.nodeHeartbeat(true);
Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction())); Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
@ -124,7 +124,7 @@ public void testDecommissionWithExcludeHosts() throws Exception {
int metricCount = ClusterMetrics.getMetrics().getNumDecommisionedNMs(); int metricCount = ClusterMetrics.getMetrics().getNumDecommisionedNMs();
HeartbeatResponse nodeHeartbeat = nm1.nodeHeartbeat(true); NodeHeartbeatResponse nodeHeartbeat = nm1.nodeHeartbeat(true);
Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction())); Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
nodeHeartbeat = nm2.nodeHeartbeat(true); nodeHeartbeat = nm2.nodeHeartbeat(true);
Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction())); Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
@ -161,7 +161,7 @@ public void testAddNewIncludePathToConfiguration() throws Exception {
ClusterMetrics metrics = ClusterMetrics.getMetrics(); ClusterMetrics metrics = ClusterMetrics.getMetrics();
assert(metrics != null); assert(metrics != null);
int initialMetricCount = metrics.getNumDecommisionedNMs(); int initialMetricCount = metrics.getNumDecommisionedNMs();
HeartbeatResponse nodeHeartbeat = nm1.nodeHeartbeat(true); NodeHeartbeatResponse nodeHeartbeat = nm1.nodeHeartbeat(true);
Assert.assertEquals( Assert.assertEquals(
NodeAction.NORMAL, NodeAction.NORMAL,
nodeHeartbeat.getNodeAction()); nodeHeartbeat.getNodeAction());
@ -198,7 +198,7 @@ public void testAddNewExcludePathToConfiguration() throws Exception {
ClusterMetrics metrics = ClusterMetrics.getMetrics(); ClusterMetrics metrics = ClusterMetrics.getMetrics();
assert(metrics != null); assert(metrics != null);
int initialMetricCount = metrics.getNumDecommisionedNMs(); int initialMetricCount = metrics.getNumDecommisionedNMs();
HeartbeatResponse nodeHeartbeat = nm1.nodeHeartbeat(true); NodeHeartbeatResponse nodeHeartbeat = nm1.nodeHeartbeat(true);
Assert.assertEquals( Assert.assertEquals(
NodeAction.NORMAL, NodeAction.NORMAL,
nodeHeartbeat.getNodeAction()); nodeHeartbeat.getNodeAction());
@ -254,7 +254,7 @@ public void testReboot() throws Exception {
MockNM nm2 = rm.registerNode("host2:1234", 2048); MockNM nm2 = rm.registerNode("host2:1234", 2048);
int initialMetricCount = ClusterMetrics.getMetrics().getNumRebootedNMs(); int initialMetricCount = ClusterMetrics.getMetrics().getNumRebootedNMs();
HeartbeatResponse nodeHeartbeat = nm1.nodeHeartbeat(true); NodeHeartbeatResponse nodeHeartbeat = nm1.nodeHeartbeat(true);
Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction())); Assert.assertTrue(NodeAction.NORMAL.equals(nodeHeartbeat.getNodeAction()));
nodeHeartbeat = nm2.nodeHeartbeat( nodeHeartbeat = nm2.nodeHeartbeat(
@ -351,7 +351,7 @@ protected Dispatcher createDispatcher() {
// reconnect of healthy node // reconnect of healthy node
nm1 = rm.registerNode("host1:1234", 5120); nm1 = rm.registerNode("host1:1234", 5120);
HeartbeatResponse response = nm1.nodeHeartbeat(true); NodeHeartbeatResponse response = nm1.nodeHeartbeat(true);
Assert.assertTrue(NodeAction.NORMAL.equals(response.getNodeAction())); Assert.assertTrue(NodeAction.NORMAL.equals(response.getNodeAction()));
dispatcher.await(); dispatcher.await();
Assert.assertEquals(expectedNMs, ClusterMetrics.getMetrics().getNumActiveNMs()); Assert.assertEquals(expectedNMs, ClusterMetrics.getMetrics().getNumActiveNMs());

View File

@ -109,7 +109,7 @@ public void run() {
.newRecordInstance(NodeHeartbeatRequest.class); .newRecordInstance(NodeHeartbeatRequest.class);
request.setNodeStatus(nodeStatus); request.setNodeStatus(nodeStatus);
lastResponseID = resourceTrackerService.nodeHeartbeat(request) lastResponseID = resourceTrackerService.nodeHeartbeat(request)
.getHeartbeatResponse().getResponseId(); .getResponseId();
Thread.sleep(1000); Thread.sleep(1000);
} catch(Exception e) { } catch(Exception e) {

View File

@ -31,8 +31,8 @@
import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse;
import org.apache.hadoop.yarn.server.api.records.NodeAction; import org.apache.hadoop.yarn.server.api.records.NodeAction;
import org.apache.hadoop.yarn.server.resourcemanager.NMLivelinessMonitor; import org.apache.hadoop.yarn.server.resourcemanager.NMLivelinessMonitor;
import org.apache.hadoop.yarn.server.resourcemanager.NodesListManager; import org.apache.hadoop.yarn.server.resourcemanager.NodesListManager;
@ -116,23 +116,20 @@ public void testRPCResponseId() throws IOException {
nodeHeartBeatRequest.setNodeStatus(nodeStatus); nodeHeartBeatRequest.setNodeStatus(nodeStatus);
nodeStatus.setResponseId(0); nodeStatus.setResponseId(0);
HeartbeatResponse response = resourceTrackerService.nodeHeartbeat( NodeHeartbeatResponse response = resourceTrackerService.nodeHeartbeat(
nodeHeartBeatRequest).getHeartbeatResponse(); nodeHeartBeatRequest);
Assert.assertTrue(response.getResponseId() == 1); Assert.assertTrue(response.getResponseId() == 1);
nodeStatus.setResponseId(response.getResponseId()); nodeStatus.setResponseId(response.getResponseId());
response = resourceTrackerService.nodeHeartbeat(nodeHeartBeatRequest) response = resourceTrackerService.nodeHeartbeat(nodeHeartBeatRequest);
.getHeartbeatResponse();
Assert.assertTrue(response.getResponseId() == 2); Assert.assertTrue(response.getResponseId() == 2);
/* try calling with less response id */ /* try calling with less response id */
response = resourceTrackerService.nodeHeartbeat(nodeHeartBeatRequest) response = resourceTrackerService.nodeHeartbeat(nodeHeartBeatRequest);
.getHeartbeatResponse();
Assert.assertTrue(response.getResponseId() == 2); Assert.assertTrue(response.getResponseId() == 2);
nodeStatus.setResponseId(0); nodeStatus.setResponseId(0);
response = resourceTrackerService.nodeHeartbeat(nodeHeartBeatRequest) response = resourceTrackerService.nodeHeartbeat(nodeHeartBeatRequest);
.getHeartbeatResponse();
Assert.assertTrue(NodeAction.REBOOT.equals(response.getNodeAction())); Assert.assertTrue(NodeAction.REBOOT.equals(response.getNodeAction()));
} }
} }

View File

@ -361,8 +361,7 @@ public NodeHeartbeatResponse nodeHeartbeat(
NodeHeartbeatResponse response = recordFactory.newRecordInstance( NodeHeartbeatResponse response = recordFactory.newRecordInstance(
NodeHeartbeatResponse.class); NodeHeartbeatResponse.class);
try { try {
response.setHeartbeatResponse(rt.nodeHeartbeat(request) response = rt.nodeHeartbeat(request);
.getHeartbeatResponse());
} catch (IOException ioe) { } catch (IOException ioe) {
LOG.info("Exception in heartbeat from node " + LOG.info("Exception in heartbeat from node " +
request.getNodeStatus().getNodeId(), ioe); request.getNodeStatus().getNodeId(), ioe);

View File

@ -27,7 +27,7 @@
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.DrainDispatcher; import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.api.records.MasterKey;
import org.apache.hadoop.yarn.server.api.records.RegistrationResponse; import org.apache.hadoop.yarn.server.api.records.RegistrationResponse;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM; import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
@ -67,7 +67,7 @@ protected Dispatcher createDispatcher() {
Assert.assertNotNull("Registration should cause a key-update!", masterKey); Assert.assertNotNull("Registration should cause a key-update!", masterKey);
dispatcher.await(); dispatcher.await();
HeartbeatResponse response = nm.nodeHeartbeat(true); NodeHeartbeatResponse response = nm.nodeHeartbeat(true);
Assert.assertNull( Assert.assertNull(
"First heartbeat after registration shouldn't get any key updates!", "First heartbeat after registration shouldn't get any key updates!",
response.getMasterKey()); response.getMasterKey());