Merge -r 1175356:1175357 from trunk to branch-0.23 to fix MAPREDUCE-3053.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1175359 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Arun Murthy 2011-09-25 09:51:09 +00:00
parent da47ed5089
commit 70d6537d39
5 changed files with 85 additions and 3 deletions

View File

@ -1377,6 +1377,12 @@ Release 0.23.0 - Unreleased
MAPREDUCE-2691. Increase threadpool size for launching containers in MAPREDUCE-2691. Increase threadpool size for launching containers in
MapReduce ApplicationMaster. (vinodkv via acmurthy) MapReduce ApplicationMaster. (vinodkv via acmurthy)
MAPREDUCE-2990. Fixed display of NodeHealthStatus. (Subroto Sanyal via
acmurthy)
MAPREDUCE-3053. Better diagnostic message for unknown methods in ProtoBuf
RPCs. (vinodkv via acmurthy)
Release 0.22.0 - Unreleased Release 0.22.0 - Unreleased
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -329,6 +329,12 @@ public class ProtoOverHadoopRpcEngine implements RpcEngine {
+ methodName); + methodName);
MethodDescriptor methodDescriptor = service.getDescriptorForType() MethodDescriptor methodDescriptor = service.getDescriptorForType()
.findMethodByName(methodName); .findMethodByName(methodName);
if (methodDescriptor == null) {
String msg = "Unknown method " + methodName + " called on "
+ protocol + " protocol.";
LOG.warn(msg);
return handleException(new IOException(msg));
}
Message prototype = service.getRequestPrototype(methodDescriptor); Message prototype = service.getRequestPrototype(methodDescriptor);
Message param = prototype.newBuilderForType() Message param = prototype.newBuilderForType()
.mergeFrom(rpcRequest.getRequestProto()).build(); .mergeFrom(rpcRequest.getRequestProto()).build();

View File

@ -25,9 +25,11 @@ import junit.framework.Assert;
import org.apache.avro.ipc.Server; import org.apache.avro.ipc.Server;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.yarn.api.ClientRMProtocol;
import org.apache.hadoop.yarn.api.ContainerManager; import org.apache.hadoop.yarn.api.ContainerManager;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationIdRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerResponse; import org.apache.hadoop.yarn.api.protocolrecords.StartContainerResponse;
import org.apache.hadoop.yarn.api.protocolrecords.StopContainerRequest; import org.apache.hadoop.yarn.api.protocolrecords.StopContainerRequest;
@ -47,6 +49,7 @@ import org.apache.hadoop.yarn.factory.providers.YarnRemoteExceptionFactoryProvid
import org.apache.hadoop.yarn.ipc.HadoopYarnProtoRPC; import org.apache.hadoop.yarn.ipc.HadoopYarnProtoRPC;
import org.apache.hadoop.yarn.ipc.RPCUtil; import org.apache.hadoop.yarn.ipc.RPCUtil;
import org.apache.hadoop.yarn.ipc.YarnRPC; import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.util.Records;
import org.junit.Test; import org.junit.Test;
public class TestRPC { public class TestRPC {
@ -65,6 +68,35 @@ public class TestRPC {
// test(HadoopYarnRPC.class.getName()); // test(HadoopYarnRPC.class.getName());
// } // }
@Test
public void testUnknownCall() {
Configuration conf = new Configuration();
conf.set(YarnConfiguration.IPC_RPC_IMPL, HadoopYarnProtoRPC.class
.getName());
YarnRPC rpc = YarnRPC.create(conf);
String bindAddr = "localhost:0";
InetSocketAddress addr = NetUtils.createSocketAddr(bindAddr);
Server server = rpc.getServer(ContainerManager.class,
new DummyContainerManager(), addr, conf, null, 1);
server.start();
// Any unrelated protocol would do
ClientRMProtocol proxy = (ClientRMProtocol) rpc.getProxy(
ClientRMProtocol.class, NetUtils.createSocketAddr("localhost:"
+ server.getPort()), conf);
try {
proxy.getNewApplicationId(Records
.newRecord(GetNewApplicationIdRequest.class));
Assert.fail("Excepted RPC call to fail with unknown method.");
} catch (YarnRemoteException e) {
Assert.assertEquals("Unknown method getNewApplicationId called on "
+ "org.apache.hadoop.yarn.proto.ClientRMProtocol"
+ "$ClientRMProtocolService$BlockingInterface protocol.", e
.getMessage());
}
}
@Test @Test
public void testHadoopProtoRPC() throws Exception { public void testHadoopProtoRPC() throws Exception {
test(HadoopYarnProtoRPC.class.getName()); test(HadoopYarnProtoRPC.class.getName());

View File

@ -147,6 +147,7 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
this.httpAddress = hostName + ":" + httpPort;; this.httpAddress = hostName + ":" + httpPort;;
this.node = node; this.node = node;
this.nodeHealthStatus.setIsNodeHealthy(true); this.nodeHealthStatus.setIsNodeHealthy(true);
this.nodeHealthStatus.setHealthReport("Healthy");
this.nodeHealthStatus.setLastHealthReportTime(System.currentTimeMillis()); this.nodeHealthStatus.setLastHealthReportTime(System.currentTimeMillis());
this.latestHeartBeatResponse.setResponseId(0); this.latestHeartBeatResponse.setResponseId(0);
@ -222,6 +223,18 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
} }
} }
private void setNodeHealthStatus(NodeHealthStatus status)
{
this.writeLock.lock();
try {
this.nodeHealthStatus.setHealthReport(status.getHealthReport());
this.nodeHealthStatus.setIsNodeHealthy(status.getIsNodeHealthy());
this.nodeHealthStatus.setLastHealthReportTime(status.getLastHealthReportTime());
} finally {
this.writeLock.unlock();
}
}
@Override @Override
public RMNodeState getState() { public RMNodeState getState() {
this.readLock.lock(); this.readLock.lock();
@ -345,7 +358,10 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
// Switch the last heartbeatresponse. // Switch the last heartbeatresponse.
rmNode.latestHeartBeatResponse = statusEvent.getLatestResponse(); rmNode.latestHeartBeatResponse = statusEvent.getLatestResponse();
if (!statusEvent.getNodeHealthStatus().getIsNodeHealthy()) { NodeHealthStatus remoteNodeHealthStatus =
statusEvent.getNodeHealthStatus();
rmNode.setNodeHealthStatus(remoteNodeHealthStatus);
if (!remoteNodeHealthStatus.getIsNodeHealthy()) {
// Inform the scheduler // Inform the scheduler
rmNode.context.getDispatcher().getEventHandler().handle( rmNode.context.getDispatcher().getEventHandler().handle(
new NodeRemovedSchedulerEvent(rmNode)); new NodeRemovedSchedulerEvent(rmNode));
@ -392,8 +408,9 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
// Switch the last heartbeatresponse. // Switch the last heartbeatresponse.
rmNode.latestHeartBeatResponse = statusEvent.getLatestResponse(); rmNode.latestHeartBeatResponse = statusEvent.getLatestResponse();
NodeHealthStatus remoteNodeHealthStatus = statusEvent.getNodeHealthStatus();
if (statusEvent.getNodeHealthStatus().getIsNodeHealthy()) { rmNode.setNodeHealthStatus(remoteNodeHealthStatus);
if (remoteNodeHealthStatus.getIsNodeHealthy()) {
rmNode.context.getDispatcher().getEventHandler().handle( rmNode.context.getDispatcher().getEventHandler().handle(
new NodeAddedSchedulerEvent(rmNode)); new NodeAddedSchedulerEvent(rmNode));
return RMNodeState.RUNNING; return RMNodeState.RUNNING;

View File

@ -18,12 +18,16 @@
package org.apache.hadoop.yarn.server.resourcemanager; package org.apache.hadoop.yarn.server.resourcemanager;
import static org.junit.Assert.assertNotNull;
import java.io.IOException; import java.io.IOException;
import java.util.Collection;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.net.NetworkTopology; import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.yarn.api.records.NodeHealthStatus;
import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.Store; import org.apache.hadoop.yarn.server.resourcemanager.recovery.Store;
@ -154,6 +158,23 @@ public class TestResourceManager {
LOG.info("--- END: testResourceAllocation ---"); LOG.info("--- END: testResourceAllocation ---");
} }
@Test
public void testNodeHealthReportIsNotNull() throws Exception{
String host1 = "host1";
final int memory = 4 * 1024;
org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm1 =
registerNode(host1, 1234, 2345, NetworkTopology.DEFAULT_RACK, memory);
nm1.heartbeat();
nm1.heartbeat();
Collection<RMNode> values = resourceManager.getRMContext().getRMNodes().values();
for (RMNode ni : values)
{
NodeHealthStatus nodeHealthStatus = ni.getNodeHealthStatus();
String healthReport = nodeHealthStatus.getHealthReport();
assertNotNull(healthReport);
}
}
private void checkResourceUsage( private void checkResourceUsage(
org.apache.hadoop.yarn.server.resourcemanager.NodeManager... nodes ) { org.apache.hadoop.yarn.server.resourcemanager.NodeManager... nodes ) {
for (org.apache.hadoop.yarn.server.resourcemanager.NodeManager nodeManager : nodes) { for (org.apache.hadoop.yarn.server.resourcemanager.NodeManager nodeManager : nodes) {