From eb78183b2b7a26c6cbc99697027b66c0696e39c5 Mon Sep 17 00:00:00 2001 From: Vinod Kumar Vavilapalli Date: Wed, 5 Sep 2012 00:16:22 +0000 Subject: [PATCH] YARN-79. Implement close on all clients to YARN so that RPC clients don't throw exceptions on shut-down. Contributed by Vinod Kumar Vavilapalli. svn merge --ignore-ancestry -c 1380942 ../../trunk/ git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1380943 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-yarn-project/CHANGES.txt | 3 + .../hadoop-yarn/hadoop-yarn-client/pom.xml | 5 + .../hadoop/yarn/client/TestYarnClient.java | 14 +++ .../pb/client/AMRMProtocolPBClientImpl.java | 33 +++--- .../client/ClientRMProtocolPBClientImpl.java | 101 +++++++++++------- .../client/ContainerManagerPBClientImpl.java | 38 ++++--- 6 files changed, 127 insertions(+), 67 deletions(-) diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index cb617f79459..9b36fee9e23 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -56,6 +56,9 @@ Release 2.1.0-alpha - Unreleased YARN-37. Change TestRMAppTransitions to use the DrainDispatcher. (Mayank Bansal via sseth) + YARN-79. Implement close on all clients to YARN so that RPC clients don't + throw exceptions on shut-down. (Vinod Kumar Vavilapalli) + Release 0.23.4 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/pom.xml index 1c2657364e3..ecc4590c68b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/pom.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/pom.xml @@ -33,5 +33,10 @@ org.apache.hadoop hadoop-yarn-common + + org.apache.hadoop + hadoop-yarn-server-resourcemanager + test + diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/hadoop/yarn/client/TestYarnClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/hadoop/yarn/client/TestYarnClient.java index 58737da1f7d..d0fc31f454b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/hadoop/yarn/client/TestYarnClient.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/hadoop/yarn/client/TestYarnClient.java @@ -18,6 +18,8 @@ package org.hadoop.yarn.client; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.junit.Test; public class TestYarnClient { @@ -27,4 +29,16 @@ public class TestYarnClient { // More to come later. } + @Test + public void testClientStop() { + Configuration conf = new Configuration(); + ResourceManager rm = new ResourceManager(null); + rm.init(conf); + rm.start(); + + YarnClient client = new YarnClientImpl(); + client.init(conf); + client.start(); + client.stop(); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/AMRMProtocolPBClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/AMRMProtocolPBClientImpl.java index 29314300480..382d913eef4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/AMRMProtocolPBClientImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/AMRMProtocolPBClientImpl.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.api.impl.pb.client; +import java.io.Closeable; import java.io.IOException; import java.net.InetSocketAddress; @@ -46,16 +47,19 @@ import org.apache.hadoop.yarn.proto.YarnServiceProtos.RegisterApplicationMasterR import com.google.protobuf.ServiceException; -public class AMRMProtocolPBClientImpl implements AMRMProtocol { +public class AMRMProtocolPBClientImpl implements AMRMProtocol, Closeable { private AMRMProtocolPB proxy; - - public AMRMProtocolPBClientImpl(long clientVersion, InetSocketAddress addr, Configuration conf) throws IOException { + + public AMRMProtocolPBClientImpl(long clientVersion, InetSocketAddress addr, + Configuration conf) throws IOException { RPC.setProtocolEngine(conf, AMRMProtocolPB.class, ProtobufRpcEngine.class); - proxy = (AMRMProtocolPB)RPC.getProxy( - AMRMProtocolPB.class, clientVersion, addr, conf); + proxy = + (AMRMProtocolPB) RPC.getProxy(AMRMProtocolPB.class, clientVersion, + addr, conf); } - + + @Override public void close() { if (this.proxy != null) { RPC.stopProxy(this.proxy); @@ -65,7 +69,8 @@ public class AMRMProtocolPBClientImpl implements AMRMProtocol { @Override public AllocateResponse allocate(AllocateRequest request) throws YarnRemoteException { - AllocateRequestProto requestProto = ((AllocateRequestPBImpl)request).getProto(); + AllocateRequestProto requestProto = + ((AllocateRequestPBImpl) request).getProto(); try { return new AllocateResponsePBImpl(proxy.allocate(null, requestProto)); } catch (ServiceException e) { @@ -73,14 +78,14 @@ public class AMRMProtocolPBClientImpl implements AMRMProtocol { } } - - @Override public FinishApplicationMasterResponse finishApplicationMaster( FinishApplicationMasterRequest request) throws YarnRemoteException { - FinishApplicationMasterRequestProto requestProto = ((FinishApplicationMasterRequestPBImpl)request).getProto(); + FinishApplicationMasterRequestProto requestProto = + ((FinishApplicationMasterRequestPBImpl) request).getProto(); try { - return new FinishApplicationMasterResponsePBImpl(proxy.finishApplicationMaster(null, requestProto)); + return new FinishApplicationMasterResponsePBImpl( + proxy.finishApplicationMaster(null, requestProto)); } catch (ServiceException e) { throw YarnRemoteExceptionPBImpl.unwrapAndThrowException(e); } @@ -89,9 +94,11 @@ public class AMRMProtocolPBClientImpl implements AMRMProtocol { @Override public RegisterApplicationMasterResponse registerApplicationMaster( RegisterApplicationMasterRequest request) throws YarnRemoteException { - RegisterApplicationMasterRequestProto requestProto = ((RegisterApplicationMasterRequestPBImpl)request).getProto(); + RegisterApplicationMasterRequestProto requestProto = + ((RegisterApplicationMasterRequestPBImpl) request).getProto(); try { - return new RegisterApplicationMasterResponsePBImpl(proxy.registerApplicationMaster(null, requestProto)); + return new RegisterApplicationMasterResponsePBImpl( + proxy.registerApplicationMaster(null, requestProto)); } catch (ServiceException e) { throw YarnRemoteExceptionPBImpl.unwrapAndThrowException(e); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/ClientRMProtocolPBClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/ClientRMProtocolPBClientImpl.java index 4167e29b9d9..0f2bf7a34fb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/ClientRMProtocolPBClientImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/ClientRMProtocolPBClientImpl.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.api.impl.pb.client; +import java.io.Closeable; import java.io.IOException; import java.net.InetSocketAddress; @@ -81,22 +82,35 @@ import org.apache.hadoop.yarn.proto.YarnServiceProtos.SubmitApplicationRequestPr import com.google.protobuf.ServiceException; -public class ClientRMProtocolPBClientImpl implements ClientRMProtocol { +public class ClientRMProtocolPBClientImpl implements ClientRMProtocol, + Closeable { private ClientRMProtocolPB proxy; - - public ClientRMProtocolPBClientImpl(long clientVersion, InetSocketAddress addr, Configuration conf) throws IOException { - RPC.setProtocolEngine(conf, ClientRMProtocolPB.class, ProtobufRpcEngine.class); - proxy = (ClientRMProtocolPB)RPC.getProxy( - ClientRMProtocolPB.class, clientVersion, addr, conf); + + public ClientRMProtocolPBClientImpl(long clientVersion, + InetSocketAddress addr, Configuration conf) throws IOException { + RPC.setProtocolEngine(conf, ClientRMProtocolPB.class, + ProtobufRpcEngine.class); + proxy = + (ClientRMProtocolPB) RPC.getProxy(ClientRMProtocolPB.class, + clientVersion, addr, conf); } - + + @Override + public void close() { + if (this.proxy != null) { + RPC.stopProxy(this.proxy); + } + } + @Override public KillApplicationResponse forceKillApplication( KillApplicationRequest request) throws YarnRemoteException { - KillApplicationRequestProto requestProto = ((KillApplicationRequestPBImpl)request).getProto(); + KillApplicationRequestProto requestProto = + ((KillApplicationRequestPBImpl) request).getProto(); try { - return new KillApplicationResponsePBImpl(proxy.forceKillApplication(null, requestProto)); + return new KillApplicationResponsePBImpl(proxy.forceKillApplication(null, + requestProto)); } catch (ServiceException e) { throw YarnRemoteExceptionPBImpl.unwrapAndThrowException(e); } @@ -105,9 +119,11 @@ public class ClientRMProtocolPBClientImpl implements ClientRMProtocol { @Override public GetApplicationReportResponse getApplicationReport( GetApplicationReportRequest request) throws YarnRemoteException { - GetApplicationReportRequestProto requestProto = ((GetApplicationReportRequestPBImpl)request).getProto(); + GetApplicationReportRequestProto requestProto = + ((GetApplicationReportRequestPBImpl) request).getProto(); try { - return new GetApplicationReportResponsePBImpl(proxy.getApplicationReport(null, requestProto)); + return new GetApplicationReportResponsePBImpl(proxy.getApplicationReport( + null, requestProto)); } catch (ServiceException e) { throw YarnRemoteExceptionPBImpl.unwrapAndThrowException(e); } @@ -116,9 +132,11 @@ public class ClientRMProtocolPBClientImpl implements ClientRMProtocol { @Override public GetClusterMetricsResponse getClusterMetrics( GetClusterMetricsRequest request) throws YarnRemoteException { - GetClusterMetricsRequestProto requestProto = ((GetClusterMetricsRequestPBImpl)request).getProto(); + GetClusterMetricsRequestProto requestProto = + ((GetClusterMetricsRequestPBImpl) request).getProto(); try { - return new GetClusterMetricsResponsePBImpl(proxy.getClusterMetrics(null, requestProto)); + return new GetClusterMetricsResponsePBImpl(proxy.getClusterMetrics(null, + requestProto)); } catch (ServiceException e) { throw YarnRemoteExceptionPBImpl.unwrapAndThrowException(e); } @@ -127,9 +145,11 @@ public class ClientRMProtocolPBClientImpl implements ClientRMProtocol { @Override public GetNewApplicationResponse getNewApplication( GetNewApplicationRequest request) throws YarnRemoteException { - GetNewApplicationRequestProto requestProto = ((GetNewApplicationRequestPBImpl)request).getProto(); + GetNewApplicationRequestProto requestProto = + ((GetNewApplicationRequestPBImpl) request).getProto(); try { - return new GetNewApplicationResponsePBImpl(proxy.getNewApplication(null, requestProto)); + return new GetNewApplicationResponsePBImpl(proxy.getNewApplication(null, + requestProto)); } catch (ServiceException e) { throw YarnRemoteExceptionPBImpl.unwrapAndThrowException(e); } @@ -138,9 +158,11 @@ public class ClientRMProtocolPBClientImpl implements ClientRMProtocol { @Override public SubmitApplicationResponse submitApplication( SubmitApplicationRequest request) throws YarnRemoteException { - SubmitApplicationRequestProto requestProto = ((SubmitApplicationRequestPBImpl)request).getProto(); + SubmitApplicationRequestProto requestProto = + ((SubmitApplicationRequestPBImpl) request).getProto(); try { - return new SubmitApplicationResponsePBImpl(proxy.submitApplication(null, requestProto)); + return new SubmitApplicationResponsePBImpl(proxy.submitApplication(null, + requestProto)); } catch (ServiceException e) { throw YarnRemoteExceptionPBImpl.unwrapAndThrowException(e); } @@ -149,24 +171,25 @@ public class ClientRMProtocolPBClientImpl implements ClientRMProtocol { @Override public GetAllApplicationsResponse getAllApplications( GetAllApplicationsRequest request) throws YarnRemoteException { - GetAllApplicationsRequestProto requestProto = - ((GetAllApplicationsRequestPBImpl)request).getProto(); + GetAllApplicationsRequestProto requestProto = + ((GetAllApplicationsRequestPBImpl) request).getProto(); try { - return new GetAllApplicationsResponsePBImpl( - proxy.getAllApplications(null, requestProto)); + return new GetAllApplicationsResponsePBImpl(proxy.getAllApplications( + null, requestProto)); } catch (ServiceException e) { throw YarnRemoteExceptionPBImpl.unwrapAndThrowException(e); } } @Override - public GetClusterNodesResponse getClusterNodes( - GetClusterNodesRequest request) throws YarnRemoteException { + public GetClusterNodesResponse + getClusterNodes(GetClusterNodesRequest request) + throws YarnRemoteException { GetClusterNodesRequestProto requestProto = - ((GetClusterNodesRequestPBImpl)request).getProto(); + ((GetClusterNodesRequestPBImpl) request).getProto(); try { - return new GetClusterNodesResponsePBImpl( - proxy.getClusterNodes(null, requestProto)); + return new GetClusterNodesResponsePBImpl(proxy.getClusterNodes(null, + requestProto)); } catch (ServiceException e) { throw YarnRemoteExceptionPBImpl.unwrapAndThrowException(e); } @@ -176,10 +199,10 @@ public class ClientRMProtocolPBClientImpl implements ClientRMProtocol { public GetQueueInfoResponse getQueueInfo(GetQueueInfoRequest request) throws YarnRemoteException { GetQueueInfoRequestProto requestProto = - ((GetQueueInfoRequestPBImpl)request).getProto(); + ((GetQueueInfoRequestPBImpl) request).getProto(); try { - return new GetQueueInfoResponsePBImpl( - proxy.getQueueInfo(null, requestProto)); + return new GetQueueInfoResponsePBImpl(proxy.getQueueInfo(null, + requestProto)); } catch (ServiceException e) { throw YarnRemoteExceptionPBImpl.unwrapAndThrowException(e); } @@ -189,10 +212,10 @@ public class ClientRMProtocolPBClientImpl implements ClientRMProtocol { public GetQueueUserAclsInfoResponse getQueueUserAcls( GetQueueUserAclsInfoRequest request) throws YarnRemoteException { GetQueueUserAclsInfoRequestProto requestProto = - ((GetQueueUserAclsInfoRequestPBImpl)request).getProto(); + ((GetQueueUserAclsInfoRequestPBImpl) request).getProto(); try { - return new GetQueueUserAclsInfoResponsePBImpl( - proxy.getQueueUserAcls(null, requestProto)); + return new GetQueueUserAclsInfoResponsePBImpl(proxy.getQueueUserAcls( + null, requestProto)); } catch (ServiceException e) { throw YarnRemoteExceptionPBImpl.unwrapAndThrowException(e); } @@ -202,12 +225,12 @@ public class ClientRMProtocolPBClientImpl implements ClientRMProtocol { public GetDelegationTokenResponse getDelegationToken( GetDelegationTokenRequest request) throws YarnRemoteException { GetDelegationTokenRequestProto requestProto = - ((GetDelegationTokenRequestPBImpl)request).getProto(); - try { - return new GetDelegationTokenResponsePBImpl( - proxy.getDelegationToken(null, requestProto)); - } catch (ServiceException e) { - throw YarnRemoteExceptionPBImpl.unwrapAndThrowException(e); - } + ((GetDelegationTokenRequestPBImpl) request).getProto(); + try { + return new GetDelegationTokenResponsePBImpl(proxy.getDelegationToken( + null, requestProto)); + } catch (ServiceException e) { + throw YarnRemoteExceptionPBImpl.unwrapAndThrowException(e); + } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/ContainerManagerPBClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/ContainerManagerPBClientImpl.java index 0a96b368fd6..cff287a90a9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/ContainerManagerPBClientImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/impl/pb/client/ContainerManagerPBClientImpl.java @@ -18,9 +18,9 @@ package org.apache.hadoop.yarn.api.impl.pb.client; +import java.io.Closeable; import java.io.IOException; import java.net.InetSocketAddress; -import java.io.Closeable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.ipc.ProtobufRpcEngine; @@ -58,22 +58,26 @@ public class ContainerManagerPBClientImpl implements ContainerManager, + "rpc.nm-command-timeout"; /** - * Maximum of 1 minute timeout for a Node to react to the command + * Maximum of 1 minute timeout for a Node to react to the command */ static final int DEFAULT_COMMAND_TIMEOUT = 60000; private ContainerManagerPB proxy; - - public ContainerManagerPBClientImpl(long clientVersion, InetSocketAddress addr, Configuration conf) throws IOException { - RPC.setProtocolEngine(conf, ContainerManagerPB.class, ProtobufRpcEngine.class); + + public ContainerManagerPBClientImpl(long clientVersion, + InetSocketAddress addr, Configuration conf) throws IOException { + RPC.setProtocolEngine(conf, ContainerManagerPB.class, + ProtobufRpcEngine.class); UserGroupInformation ugi = UserGroupInformation.getCurrentUser(); int expireIntvl = conf.getInt(NM_COMMAND_TIMEOUT, DEFAULT_COMMAND_TIMEOUT); - proxy = (ContainerManagerPB)RPC.getProxy( - ContainerManagerPB.class, clientVersion, addr, ugi, conf, - NetUtils.getDefaultSocketFactory(conf), expireIntvl); + proxy = + (ContainerManagerPB) RPC.getProxy(ContainerManagerPB.class, + clientVersion, addr, ugi, conf, + NetUtils.getDefaultSocketFactory(conf), expireIntvl); } + @Override public void close() { if (this.proxy != null) { RPC.stopProxy(this.proxy); @@ -83,9 +87,11 @@ public class ContainerManagerPBClientImpl implements ContainerManager, @Override public GetContainerStatusResponse getContainerStatus( GetContainerStatusRequest request) throws YarnRemoteException { - GetContainerStatusRequestProto requestProto = ((GetContainerStatusRequestPBImpl)request).getProto(); + GetContainerStatusRequestProto requestProto = + ((GetContainerStatusRequestPBImpl) request).getProto(); try { - return new GetContainerStatusResponsePBImpl(proxy.getContainerStatus(null, requestProto)); + return new GetContainerStatusResponsePBImpl(proxy.getContainerStatus( + null, requestProto)); } catch (ServiceException e) { throw YarnRemoteExceptionPBImpl.unwrapAndThrowException(e); } @@ -94,9 +100,11 @@ public class ContainerManagerPBClientImpl implements ContainerManager, @Override public StartContainerResponse startContainer(StartContainerRequest request) throws YarnRemoteException { - StartContainerRequestProto requestProto = ((StartContainerRequestPBImpl)request).getProto(); + StartContainerRequestProto requestProto = + ((StartContainerRequestPBImpl) request).getProto(); try { - return new StartContainerResponsePBImpl(proxy.startContainer(null, requestProto)); + return new StartContainerResponsePBImpl(proxy.startContainer(null, + requestProto)); } catch (ServiceException e) { throw YarnRemoteExceptionPBImpl.unwrapAndThrowException(e); } @@ -105,11 +113,11 @@ public class ContainerManagerPBClientImpl implements ContainerManager, @Override public StopContainerResponse stopContainer(StopContainerRequest request) throws YarnRemoteException { - StopContainerRequestProto requestProto = ((StopContainerRequestPBImpl) request) - .getProto(); + StopContainerRequestProto requestProto = + ((StopContainerRequestPBImpl) request).getProto(); try { return new StopContainerResponsePBImpl(proxy.stopContainer(null, - requestProto)); + requestProto)); } catch (ServiceException e) { throw YarnRemoteExceptionPBImpl.unwrapAndThrowException(e); }