YARN-663. Changed ResourceTracker API and LocalizationProtocol API to throw YarnRemoteException and IOException. Contributed by Xuan Gong.

svn merge --ignore-ancestry -c 1481215 ../../trunk/


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1481216 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinod Kumar Vavilapalli 2013-05-10 21:59:59 +00:00
parent 2482e3270f
commit 0abf49936a
15 changed files with 69 additions and 28 deletions

View File

@ -168,6 +168,9 @@ Release 2.0.5-beta - UNRELEASED
YARN-634. Modified YarnRemoteException to be not backed by PB and introduced YARN-634. Modified YarnRemoteException to be not backed by PB and introduced
a separate SerializedException record. (Siddharth Seth via vinodkv) a separate SerializedException record. (Siddharth Seth via vinodkv)
YARN-663. Changed ResourceTracker API and LocalizationProtocol API to throw
YarnRemoteException and IOException. (Xuan Gong via vinodkv)
OPTIMIZATIONS OPTIMIZATIONS
BUG FIXES BUG FIXES

View File

@ -17,6 +17,8 @@
*/ */
package org.apache.hadoop.yarn.server.api; package org.apache.hadoop.yarn.server.api;
import java.io.IOException;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException; import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
@ -25,7 +27,11 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResp
public interface ResourceTracker { public interface ResourceTracker {
public RegisterNodeManagerResponse registerNodeManager(RegisterNodeManagerRequest request) throws YarnRemoteException; public RegisterNodeManagerResponse registerNodeManager(
public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) throws YarnRemoteException; RegisterNodeManagerRequest request) throws YarnRemoteException,
IOException;
public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
throws YarnRemoteException, IOException;
} }

View File

@ -53,7 +53,8 @@ private ResourceTrackerPB proxy;
@Override @Override
public RegisterNodeManagerResponse registerNodeManager( public RegisterNodeManagerResponse registerNodeManager(
RegisterNodeManagerRequest request) throws YarnRemoteException { RegisterNodeManagerRequest request) throws YarnRemoteException,
IOException {
RegisterNodeManagerRequestProto requestProto = ((RegisterNodeManagerRequestPBImpl)request).getProto(); RegisterNodeManagerRequestProto requestProto = ((RegisterNodeManagerRequestPBImpl)request).getProto();
try { try {
return new RegisterNodeManagerResponsePBImpl(proxy.registerNodeManager(null, requestProto)); return new RegisterNodeManagerResponsePBImpl(proxy.registerNodeManager(null, requestProto));
@ -64,7 +65,7 @@ private ResourceTrackerPB proxy;
@Override @Override
public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
throws YarnRemoteException { throws YarnRemoteException, IOException {
NodeHeartbeatRequestProto requestProto = ((NodeHeartbeatRequestPBImpl)request).getProto(); NodeHeartbeatRequestProto requestProto = ((NodeHeartbeatRequestPBImpl)request).getProto();
try { try {
return new NodeHeartbeatResponsePBImpl(proxy.nodeHeartbeat(null, requestProto)); return new NodeHeartbeatResponsePBImpl(proxy.nodeHeartbeat(null, requestProto));

View File

@ -18,6 +18,8 @@
package org.apache.hadoop.yarn.server.api.impl.pb.service; package org.apache.hadoop.yarn.server.api.impl.pb.service;
import java.io.IOException;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException; import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatRequestProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatRequestProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatResponseProto; import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatResponseProto;
@ -53,6 +55,8 @@ public class ResourceTrackerPBServiceImpl implements ResourceTrackerPB {
return ((RegisterNodeManagerResponsePBImpl)response).getProto(); return ((RegisterNodeManagerResponsePBImpl)response).getProto();
} catch (YarnRemoteException e) { } catch (YarnRemoteException e) {
throw new ServiceException(e); throw new ServiceException(e);
} catch (IOException e) {
throw new ServiceException(e);
} }
} }
@ -65,6 +69,8 @@ public class ResourceTrackerPBServiceImpl implements ResourceTrackerPB {
return ((NodeHeartbeatResponsePBImpl)response).getProto(); return ((NodeHeartbeatResponsePBImpl)response).getProto();
} catch (YarnRemoteException e) { } catch (YarnRemoteException e) {
throw new ServiceException(e); throw new ServiceException(e);
} catch (IOException e) {
throw new ServiceException(e);
} }
} }

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.yarn; package org.apache.hadoop.yarn;
import java.io.IOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import junit.framework.Assert; import junit.framework.Assert;
@ -102,14 +103,15 @@ public class TestRPCFactories {
@Override @Override
public RegisterNodeManagerResponse registerNodeManager( public RegisterNodeManagerResponse registerNodeManager(
RegisterNodeManagerRequest request) throws YarnRemoteException { RegisterNodeManagerRequest request) throws YarnRemoteException,
IOException {
// TODO Auto-generated method stub // TODO Auto-generated method stub
return null; return null;
} }
@Override @Override
public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
throws YarnRemoteException { throws YarnRemoteException, IOException {
// TODO Auto-generated method stub // TODO Auto-generated method stub
return null; return null;
} }

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.server.nodemanager; package org.apache.hadoop.yarn.server.nodemanager;
import java.io.IOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
@ -213,7 +214,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
} }
@VisibleForTesting @VisibleForTesting
protected void registerWithRM() throws YarnRemoteException { protected void registerWithRM() throws YarnRemoteException, IOException {
Configuration conf = getConfig(); Configuration conf = getConfig();
rmConnectWaitMS = rmConnectWaitMS =
conf.getInt( conf.getInt(

View File

@ -17,11 +17,13 @@
*/ */
package org.apache.hadoop.yarn.server.nodemanager.api; package org.apache.hadoop.yarn.server.nodemanager.api;
import java.io.IOException;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException; import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerHeartbeatResponse; import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerHeartbeatResponse;
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerStatus; import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerStatus;
public interface LocalizationProtocol { public interface LocalizationProtocol {
public LocalizerHeartbeatResponse heartbeat(LocalizerStatus status) public LocalizerHeartbeatResponse heartbeat(LocalizerStatus status)
throws YarnRemoteException; throws YarnRemoteException, IOException;
} }

View File

@ -56,7 +56,7 @@ public class LocalizationProtocolPBClientImpl implements LocalizationProtocol,
@Override @Override
public LocalizerHeartbeatResponse heartbeat(LocalizerStatus status) public LocalizerHeartbeatResponse heartbeat(LocalizerStatus status)
throws YarnRemoteException { throws YarnRemoteException, IOException {
LocalizerStatusProto statusProto = ((LocalizerStatusPBImpl)status).getProto(); LocalizerStatusProto statusProto = ((LocalizerStatusPBImpl)status).getProto();
try { try {
return new LocalizerHeartbeatResponsePBImpl( return new LocalizerHeartbeatResponsePBImpl(

View File

@ -17,6 +17,8 @@
*/ */
package org.apache.hadoop.yarn.server.nodemanager.api.impl.pb.service; package org.apache.hadoop.yarn.server.nodemanager.api.impl.pb.service;
import java.io.IOException;
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.impl.pb.LocalizerHeartbeatResponsePBImpl; import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.impl.pb.LocalizerHeartbeatResponsePBImpl;
import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.impl.pb.LocalizerStatusPBImpl; import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.impl.pb.LocalizerStatusPBImpl;
@ -47,6 +49,8 @@ public class LocalizationProtocolPBServiceImpl implements LocalizationProtocolPB
return ((LocalizerHeartbeatResponsePBImpl)response).getProto(); return ((LocalizerHeartbeatResponsePBImpl)response).getProto();
} catch (YarnRemoteException e) { } catch (YarnRemoteException e) {
throw new ServiceException(e); throw new ServiceException(e);
} catch (IOException e) {
throw new ServiceException(e);
} }
} }

View File

@ -18,6 +18,8 @@
package org.apache.hadoop.yarn.server.nodemanager; package org.apache.hadoop.yarn.server.nodemanager;
import java.io.IOException;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException; import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
@ -32,13 +34,16 @@ public class LocalRMInterface implements ResourceTracker {
private static final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); private static final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
@Override @Override
public RegisterNodeManagerResponse registerNodeManager(RegisterNodeManagerRequest request) throws YarnRemoteException { public RegisterNodeManagerResponse registerNodeManager(
RegisterNodeManagerRequest request) throws YarnRemoteException,
IOException {
RegisterNodeManagerResponse response = recordFactory.newRecordInstance(RegisterNodeManagerResponse.class); RegisterNodeManagerResponse response = recordFactory.newRecordInstance(RegisterNodeManagerResponse.class);
return response; return response;
} }
@Override @Override
public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) throws YarnRemoteException { public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
throws YarnRemoteException, IOException {
NodeHeartbeatResponse response = recordFactory.newRecordInstance(NodeHeartbeatResponse.class); NodeHeartbeatResponse response = recordFactory.newRecordInstance(NodeHeartbeatResponse.class);
return response; return response;
} }

View File

@ -18,6 +18,8 @@
package org.apache.hadoop.yarn.server.nodemanager; package org.apache.hadoop.yarn.server.nodemanager;
import java.io.IOException;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.Dispatcher;
@ -61,7 +63,8 @@ public class MockNodeStatusUpdater extends NodeStatusUpdaterImpl {
@Override @Override
public RegisterNodeManagerResponse registerNodeManager( public RegisterNodeManagerResponse registerNodeManager(
RegisterNodeManagerRequest request) throws YarnRemoteException { RegisterNodeManagerRequest request) throws YarnRemoteException,
IOException {
RegisterNodeManagerResponse response = recordFactory RegisterNodeManagerResponse response = recordFactory
.newRecordInstance(RegisterNodeManagerResponse.class); .newRecordInstance(RegisterNodeManagerResponse.class);
return response; return response;
@ -69,7 +72,7 @@ public class MockNodeStatusUpdater extends NodeStatusUpdaterImpl {
@Override @Override
public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
throws YarnRemoteException { throws YarnRemoteException, IOException {
NodeStatus nodeStatus = request.getNodeStatus(); NodeStatus nodeStatus = request.getNodeStatus();
LOG.info("Got heartbeat number " + heartBeatID); LOG.info("Got heartbeat number " + heartBeatID);
nodeStatus.setResponseId(heartBeatID++); nodeStatus.setResponseId(heartBeatID++);

View File

@ -166,7 +166,7 @@ public class TestNodeManagerResync {
} }
@Override @Override
protected void registerWithRM() throws YarnRemoteException { protected void registerWithRM() throws YarnRemoteException, IOException {
super.registerWithRM(); super.registerWithRM();
registrationCount++; registrationCount++;
} }

View File

@ -123,7 +123,8 @@ public class TestNodeStatusUpdater {
@Override @Override
public RegisterNodeManagerResponse registerNodeManager( public RegisterNodeManagerResponse registerNodeManager(
RegisterNodeManagerRequest request) throws YarnRemoteException { RegisterNodeManagerRequest request) throws YarnRemoteException,
IOException {
NodeId nodeId = request.getNodeId(); NodeId nodeId = request.getNodeId();
Resource resource = request.getResource(); Resource resource = request.getResource();
LOG.info("Registering " + nodeId.toString()); LOG.info("Registering " + nodeId.toString());
@ -167,7 +168,7 @@ public class TestNodeStatusUpdater {
@Override @Override
public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
throws YarnRemoteException { throws YarnRemoteException, IOException {
NodeStatus nodeStatus = request.getNodeStatus(); NodeStatus nodeStatus = request.getNodeStatus();
LOG.info("Got heartbeat number " + heartBeatID); LOG.info("Got heartbeat number " + heartBeatID);
NodeManagerMetrics mockMetrics = mock(NodeManagerMetrics.class); NodeManagerMetrics mockMetrics = mock(NodeManagerMetrics.class);
@ -390,7 +391,8 @@ public class TestNodeStatusUpdater {
@Override @Override
public RegisterNodeManagerResponse registerNodeManager( public RegisterNodeManagerResponse registerNodeManager(
RegisterNodeManagerRequest request) throws YarnRemoteException { RegisterNodeManagerRequest request) throws YarnRemoteException,
IOException {
RegisterNodeManagerResponse response = recordFactory RegisterNodeManagerResponse response = recordFactory
.newRecordInstance(RegisterNodeManagerResponse.class); .newRecordInstance(RegisterNodeManagerResponse.class);
@ -399,7 +401,7 @@ public class TestNodeStatusUpdater {
} }
@Override @Override
public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
throws YarnRemoteException { throws YarnRemoteException, IOException {
NodeStatus nodeStatus = request.getNodeStatus(); NodeStatus nodeStatus = request.getNodeStatus();
nodeStatus.setResponseId(heartBeatID++); nodeStatus.setResponseId(heartBeatID++);
@ -424,7 +426,8 @@ public class TestNodeStatusUpdater {
@Override @Override
public RegisterNodeManagerResponse registerNodeManager( public RegisterNodeManagerResponse registerNodeManager(
RegisterNodeManagerRequest request) throws YarnRemoteException { RegisterNodeManagerRequest request) throws YarnRemoteException,
IOException {
RegisterNodeManagerResponse response = RegisterNodeManagerResponse response =
recordFactory.newRecordInstance(RegisterNodeManagerResponse.class); recordFactory.newRecordInstance(RegisterNodeManagerResponse.class);
@ -434,7 +437,7 @@ public class TestNodeStatusUpdater {
@Override @Override
public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
throws YarnRemoteException { throws YarnRemoteException, IOException {
LOG.info("Got heartBeatId: [" + heartBeatID +"]"); LOG.info("Got heartBeatId: [" + heartBeatID +"]");
NodeStatus nodeStatus = request.getNodeStatus(); NodeStatus nodeStatus = request.getNodeStatus();
nodeStatus.setResponseId(heartBeatID++); nodeStatus.setResponseId(heartBeatID++);
@ -474,7 +477,8 @@ public class TestNodeStatusUpdater {
@Override @Override
public RegisterNodeManagerResponse registerNodeManager( public RegisterNodeManagerResponse registerNodeManager(
RegisterNodeManagerRequest request) throws YarnRemoteException { RegisterNodeManagerRequest request) throws YarnRemoteException,
IOException {
RegisterNodeManagerResponse response = recordFactory RegisterNodeManagerResponse response = recordFactory
.newRecordInstance(RegisterNodeManagerResponse.class); .newRecordInstance(RegisterNodeManagerResponse.class);
response.setNodeAction(registerNodeAction); response.setNodeAction(registerNodeAction);
@ -483,7 +487,7 @@ public class TestNodeStatusUpdater {
@Override @Override
public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
throws YarnRemoteException { throws YarnRemoteException, IOException {
try { try {
if (heartBeatID == 0) { if (heartBeatID == 0) {
Assert.assertEquals(request.getNodeStatus().getContainersStatuses() Assert.assertEquals(request.getNodeStatus().getContainersStatuses()
@ -564,7 +568,8 @@ public class TestNodeStatusUpdater {
public NodeAction registerNodeAction = NodeAction.NORMAL; public NodeAction registerNodeAction = NodeAction.NORMAL;
@Override @Override
public RegisterNodeManagerResponse registerNodeManager( public RegisterNodeManagerResponse registerNodeManager(
RegisterNodeManagerRequest request) throws YarnRemoteException { RegisterNodeManagerRequest request) throws YarnRemoteException,
IOException {
RegisterNodeManagerResponse response = recordFactory RegisterNodeManagerResponse response = recordFactory
.newRecordInstance(RegisterNodeManagerResponse.class); .newRecordInstance(RegisterNodeManagerResponse.class);
@ -574,7 +579,7 @@ public class TestNodeStatusUpdater {
@Override @Override
public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
throws YarnRemoteException { throws YarnRemoteException, IOException {
heartBeatID++; heartBeatID++;
throw RPCUtil.getRemoteException("NodeHeartbeat exception"); throw RPCUtil.getRemoteException("NodeHeartbeat exception");
} }

View File

@ -17,6 +17,7 @@
*/ */
package org.apache.hadoop.yarn.server.resourcemanager; package org.apache.hadoop.yarn.server.resourcemanager;
import java.io.IOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
@ -161,7 +162,8 @@ public class ResourceTrackerService extends AbstractService implements
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
@Override @Override
public RegisterNodeManagerResponse registerNodeManager( public RegisterNodeManagerResponse registerNodeManager(
RegisterNodeManagerRequest request) throws YarnRemoteException { RegisterNodeManagerRequest request) throws YarnRemoteException,
IOException {
NodeId nodeId = request.getNodeId(); NodeId nodeId = request.getNodeId();
String host = nodeId.getHost(); String host = nodeId.getHost();
@ -224,7 +226,7 @@ public class ResourceTrackerService extends AbstractService implements
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
@Override @Override
public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
throws YarnRemoteException { throws YarnRemoteException, IOException {
NodeStatus remoteNodeStatus = request.getNodeStatus(); NodeStatus remoteNodeStatus = request.getNodeStatus();
/** /**

View File

@ -306,7 +306,8 @@ public class MiniYARNCluster extends CompositeService {
@Override @Override
public NodeHeartbeatResponse nodeHeartbeat( public NodeHeartbeatResponse nodeHeartbeat(
NodeHeartbeatRequest request) throws YarnRemoteException { NodeHeartbeatRequest request) throws YarnRemoteException,
IOException {
NodeHeartbeatResponse response = recordFactory.newRecordInstance( NodeHeartbeatResponse response = recordFactory.newRecordInstance(
NodeHeartbeatResponse.class); NodeHeartbeatResponse.class);
try { try {
@ -322,7 +323,7 @@ public class MiniYARNCluster extends CompositeService {
@Override @Override
public RegisterNodeManagerResponse registerNodeManager( public RegisterNodeManagerResponse registerNodeManager(
RegisterNodeManagerRequest request) RegisterNodeManagerRequest request)
throws YarnRemoteException { throws YarnRemoteException, IOException {
RegisterNodeManagerResponse response = recordFactory. RegisterNodeManagerResponse response = recordFactory.
newRecordInstance(RegisterNodeManagerResponse.class); newRecordInstance(RegisterNodeManagerResponse.class);
try { try {