YARN-630. Changed AMRMProtocol api to throw IOException and YarnRemoteException. Contributed by Xuan Gong.

MAPREDUCE-5226. Handling YarnRemoteException separately from IOException in MR App's use of AMRMProtocol after YARN-630. Contributed by Xuan Gong.


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1480529 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinod Kumar Vavilapalli 2013-05-09 05:32:04 +00:00
parent 0727ecaf50
commit 9c4f86879c
16 changed files with 75 additions and 29 deletions

View File

@ -390,7 +390,10 @@ Release 2.0.5-beta - UNRELEASED
(Tsuyoshi OZAWA via cdouglas) (Tsuyoshi OZAWA via cdouglas)
MAPREDUCE-5212. Handling YarnRemoteException separately from IOException in MAPREDUCE-5212. Handling YarnRemoteException separately from IOException in
MR App after YARN-631. (Xuan Gong via vinodkv) MR App's use of ClientRMProtocol after YARN-631. (Xuan Gong via vinodkv)
MAPREDUCE-5226. Handling YarnRemoteException separately from IOException in
MR App's use of AMRMProtocol after YARN-630. (Xuan Gong via vinodkv)
Release 2.0.4-alpha - 2013-04-25 Release 2.0.4-alpha - 2013-04-25

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.mapreduce.v2.app.rm; package org.apache.hadoop.mapreduce.v2.app.rm;
import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
@ -144,7 +145,8 @@ public abstract class RMContainerRequestor extends RMCommunicator {
LOG.info("blacklistDisablePercent is " + blacklistDisablePercent); LOG.info("blacklistDisablePercent is " + blacklistDisablePercent);
} }
protected AllocateResponse makeRemoteRequest() throws YarnRemoteException { protected AllocateResponse makeRemoteRequest() throws YarnRemoteException,
IOException {
AllocateRequest allocateRequest = BuilderUtils.newAllocateRequest( AllocateRequest allocateRequest = BuilderUtils.newAllocateRequest(
applicationAttemptId, lastResponseID, super.getApplicationProgress(), applicationAttemptId, lastResponseID, super.getApplicationProgress(),
new ArrayList<ResourceRequest>(ask), new ArrayList<ContainerId>( new ArrayList<ResourceRequest>(ask), new ArrayList<ContainerId>(

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.mapreduce.v2.app; package org.apache.hadoop.mapreduce.v2.app;
import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
@ -202,7 +203,7 @@ public class MRAppBenchmark {
public RegisterApplicationMasterResponse public RegisterApplicationMasterResponse
registerApplicationMaster( registerApplicationMaster(
RegisterApplicationMasterRequest request) RegisterApplicationMasterRequest request)
throws YarnRemoteException { throws YarnRemoteException, IOException {
RegisterApplicationMasterResponse response = RegisterApplicationMasterResponse response =
Records.newRecord(RegisterApplicationMasterResponse.class); Records.newRecord(RegisterApplicationMasterResponse.class);
response.setMinimumResourceCapability(BuilderUtils response.setMinimumResourceCapability(BuilderUtils
@ -215,7 +216,7 @@ public class MRAppBenchmark {
@Override @Override
public FinishApplicationMasterResponse finishApplicationMaster( public FinishApplicationMasterResponse finishApplicationMaster(
FinishApplicationMasterRequest request) FinishApplicationMasterRequest request)
throws YarnRemoteException { throws YarnRemoteException, IOException {
FinishApplicationMasterResponse response = FinishApplicationMasterResponse response =
Records.newRecord(FinishApplicationMasterResponse.class); Records.newRecord(FinishApplicationMasterResponse.class);
return response; return response;
@ -223,7 +224,7 @@ public class MRAppBenchmark {
@Override @Override
public AllocateResponse allocate(AllocateRequest request) public AllocateResponse allocate(AllocateRequest request)
throws YarnRemoteException { throws YarnRemoteException, IOException {
AllocateResponse response = AllocateResponse response =
Records.newRecord(AllocateResponse.class); Records.newRecord(AllocateResponse.class);

View File

@ -100,6 +100,7 @@ public class TestLocalContainerAllocator {
when(scheduler.allocate(isA(AllocateRequest.class))) when(scheduler.allocate(isA(AllocateRequest.class)))
.thenThrow(RPCUtil.getRemoteException(new IOException("forcefail"))); .thenThrow(RPCUtil.getRemoteException(new IOException("forcefail")));
} catch (YarnRemoteException e) { } catch (YarnRemoteException e) {
} catch (IOException e) {
} }
return scheduler; return scheduler;
} }

View File

@ -121,6 +121,9 @@ Release 2.0.5-beta - UNRELEASED
YARN-631. Changed ClientRMProtocol api to throw IOException and YARN-631. Changed ClientRMProtocol api to throw IOException and
YarnRemoteException. (Xuan Gong via vinodkv) YarnRemoteException. (Xuan Gong via vinodkv)
YARN-630. Changed AMRMProtocol api to throw IOException and
YarnRemoteException. (Xuan Gong via vinodkv)
NEW FEATURES NEW FEATURES
YARN-482. FS: Extend SchedulingMode to intermediate queues. YARN-482. FS: Extend SchedulingMode to intermediate queues.

View File

@ -18,6 +18,8 @@
package org.apache.hadoop.yarn.api; package org.apache.hadoop.yarn.api;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Stable; import org.apache.hadoop.classification.InterfaceStability.Stable;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
@ -57,10 +59,11 @@ public interface AMRMProtocol {
* @param request registration request * @param request registration request
* @return registration respose * @return registration respose
* @throws YarnRemoteException * @throws YarnRemoteException
* @throws IOException
*/ */
public RegisterApplicationMasterResponse registerApplicationMaster( public RegisterApplicationMasterResponse registerApplicationMaster(
RegisterApplicationMasterRequest request) RegisterApplicationMasterRequest request)
throws YarnRemoteException; throws YarnRemoteException, IOException;
/** /**
* <p>The interface used by an <code>ApplicationMaster</code> to notify the * <p>The interface used by an <code>ApplicationMaster</code> to notify the
@ -76,10 +79,11 @@ public interface AMRMProtocol {
* @param request completion request * @param request completion request
* @return completion response * @return completion response
* @throws YarnRemoteException * @throws YarnRemoteException
* @throws IOException
*/ */
public FinishApplicationMasterResponse finishApplicationMaster( public FinishApplicationMasterResponse finishApplicationMaster(
FinishApplicationMasterRequest request) FinishApplicationMasterRequest request)
throws YarnRemoteException; throws YarnRemoteException, IOException;
/** /**
* <p>The main interface between an <code>ApplicationMaster</code> * <p>The main interface between an <code>ApplicationMaster</code>
@ -105,7 +109,8 @@ public interface AMRMProtocol {
* @param request allocation request * @param request allocation request
* @return allocation response * @return allocation response
* @throws YarnRemoteException * @throws YarnRemoteException
* @throws IOException
*/ */
public AllocateResponse allocate(AllocateRequest request) public AllocateResponse allocate(AllocateRequest request)
throws YarnRemoteException; throws YarnRemoteException, IOException;
} }

View File

@ -433,8 +433,9 @@ public class ApplicationMaster {
* Main run function for the application master * Main run function for the application master
* *
* @throws YarnRemoteException * @throws YarnRemoteException
* @throws IOException
*/ */
public boolean run() throws YarnRemoteException { public boolean run() throws YarnRemoteException, IOException {
LOG.info("Starting ApplicationMaster"); LOG.info("Starting ApplicationMaster");
AMRMClientAsync.CallbackHandler allocListener = new RMCallbackHandler(); AMRMClientAsync.CallbackHandler allocListener = new RMCallbackHandler();
@ -533,6 +534,8 @@ public class ApplicationMaster {
resourceManager.unregisterApplicationMaster(appStatus, appMessage, null); resourceManager.unregisterApplicationMaster(appStatus, appMessage, null);
} catch (YarnRemoteException ex) { } catch (YarnRemoteException ex) {
LOG.error("Failed to unregister application", ex); LOG.error("Failed to unregister application", ex);
} catch (IOException e) {
LOG.error("Failed to unregister application", e);
} }
done = true; done = true;

View File

@ -19,6 +19,8 @@
package org.apache.hadoop.yarn.client; package org.apache.hadoop.yarn.client;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
@ -72,12 +74,13 @@ public interface AMRMClient extends Service {
* @param appTrackingUrl URL at which the master info can be seen * @param appTrackingUrl URL at which the master info can be seen
* @return <code>RegisterApplicationMasterResponse</code> * @return <code>RegisterApplicationMasterResponse</code>
* @throws YarnRemoteException * @throws YarnRemoteException
* @throws IOException
*/ */
public RegisterApplicationMasterResponse public RegisterApplicationMasterResponse
registerApplicationMaster(String appHostName, registerApplicationMaster(String appHostName,
int appHostPort, int appHostPort,
String appTrackingUrl) String appTrackingUrl)
throws YarnRemoteException; throws YarnRemoteException, IOException;
/** /**
* Request additional containers and receive new container allocations. * Request additional containers and receive new container allocations.
@ -92,9 +95,10 @@ public interface AMRMClient extends Service {
* @param progressIndicator Indicates progress made by the master * @param progressIndicator Indicates progress made by the master
* @return the response of the allocate request * @return the response of the allocate request
* @throws YarnRemoteException * @throws YarnRemoteException
* @throws IOException
*/ */
public AllocateResponse allocate(float progressIndicator) public AllocateResponse allocate(float progressIndicator)
throws YarnRemoteException; throws YarnRemoteException, IOException;
/** /**
* Unregister the application master. This must be called in the end. * Unregister the application master. This must be called in the end.
@ -102,11 +106,12 @@ public interface AMRMClient extends Service {
* @param appMessage Diagnostics message on failure * @param appMessage Diagnostics message on failure
* @param appTrackingUrl New URL to get master info * @param appTrackingUrl New URL to get master info
* @throws YarnRemoteException * @throws YarnRemoteException
* @throws IOException
*/ */
public void unregisterApplicationMaster(FinalApplicationStatus appStatus, public void unregisterApplicationMaster(FinalApplicationStatus appStatus,
String appMessage, String appMessage,
String appTrackingUrl) String appTrackingUrl)
throws YarnRemoteException; throws YarnRemoteException, IOException;
/** /**
* Request containers for resources before calling <code>allocate</code> * Request containers for resources before calling <code>allocate</code>

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.client; package org.apache.hadoop.yarn.client;
import java.io.IOException;
import java.util.List; import java.util.List;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
@ -173,10 +174,12 @@ public class AMRMClientAsync extends AbstractService {
/** /**
* Registers this application master with the resource manager. On successful * Registers this application master with the resource manager. On successful
* registration, starts the heartbeating thread. * registration, starts the heartbeating thread.
* @throws YarnRemoteException
* @throws IOException
*/ */
public RegisterApplicationMasterResponse registerApplicationMaster( public RegisterApplicationMasterResponse registerApplicationMaster(
String appHostName, int appHostPort, String appTrackingUrl) String appHostName, int appHostPort, String appTrackingUrl)
throws YarnRemoteException { throws YarnRemoteException, IOException {
RegisterApplicationMasterResponse response = RegisterApplicationMasterResponse response =
client.registerApplicationMaster(appHostName, appHostPort, appTrackingUrl); client.registerApplicationMaster(appHostName, appHostPort, appTrackingUrl);
heartbeatThread.start(); heartbeatThread.start();
@ -189,9 +192,10 @@ public class AMRMClientAsync extends AbstractService {
* @param appMessage Diagnostics message on failure * @param appMessage Diagnostics message on failure
* @param appTrackingUrl New URL to get master info * @param appTrackingUrl New URL to get master info
* @throws YarnRemoteException * @throws YarnRemoteException
* @throws IOException
*/ */
public void unregisterApplicationMaster(FinalApplicationStatus appStatus, public void unregisterApplicationMaster(FinalApplicationStatus appStatus,
String appMessage, String appTrackingUrl) throws YarnRemoteException { String appMessage, String appTrackingUrl) throws YarnRemoteException, IOException {
synchronized (client) { synchronized (client) {
keepRunning = false; keepRunning = false;
client.unregisterApplicationMaster(appStatus, appMessage, appTrackingUrl); client.unregisterApplicationMaster(appStatus, appMessage, appTrackingUrl);
@ -264,6 +268,8 @@ public class AMRMClientAsync extends AbstractService {
response = client.allocate(progress); response = client.allocate(progress);
} catch (YarnRemoteException ex) { } catch (YarnRemoteException ex) {
LOG.error("Failed to heartbeat", ex); LOG.error("Failed to heartbeat", ex);
} catch (IOException e) {
LOG.error("Failed to heartbeat", e);
} }
} }
if (response != null) { if (response != null) {

View File

@ -134,7 +134,7 @@ public class AMRMClientImpl extends AbstractService implements AMRMClient {
@Override @Override
public RegisterApplicationMasterResponse registerApplicationMaster( public RegisterApplicationMasterResponse registerApplicationMaster(
String appHostName, int appHostPort, String appTrackingUrl) String appHostName, int appHostPort, String appTrackingUrl)
throws YarnRemoteException { throws YarnRemoteException, IOException {
// do this only once ??? // do this only once ???
RegisterApplicationMasterRequest request = recordFactory RegisterApplicationMasterRequest request = recordFactory
.newRecordInstance(RegisterApplicationMasterRequest.class); .newRecordInstance(RegisterApplicationMasterRequest.class);
@ -153,7 +153,7 @@ public class AMRMClientImpl extends AbstractService implements AMRMClient {
@Override @Override
public AllocateResponse allocate(float progressIndicator) public AllocateResponse allocate(float progressIndicator)
throws YarnRemoteException { throws YarnRemoteException, IOException {
AllocateResponse allocateResponse = null; AllocateResponse allocateResponse = null;
ArrayList<ResourceRequest> askList = null; ArrayList<ResourceRequest> askList = null;
ArrayList<ContainerId> releaseList = null; ArrayList<ContainerId> releaseList = null;
@ -207,7 +207,8 @@ public class AMRMClientImpl extends AbstractService implements AMRMClient {
@Override @Override
public void unregisterApplicationMaster(FinalApplicationStatus appStatus, public void unregisterApplicationMaster(FinalApplicationStatus appStatus,
String appMessage, String appTrackingUrl) throws YarnRemoteException { String appMessage, String appTrackingUrl) throws YarnRemoteException,
IOException {
FinishApplicationMasterRequest request = recordFactory FinishApplicationMasterRequest request = recordFactory
.newRecordInstance(FinishApplicationMasterRequest.class); .newRecordInstance(FinishApplicationMasterRequest.class);
request.setAppAttemptId(appAttemptId); request.setAppAttemptId(appAttemptId);

View File

@ -135,7 +135,7 @@ public class TestAMRMClient {
} }
@Test (timeout=60000) @Test (timeout=60000)
public void testAMRMClient() throws YarnRemoteException { public void testAMRMClient() throws YarnRemoteException, IOException {
AMRMClientImpl amClient = null; AMRMClientImpl amClient = null;
try { try {
// start am rm client // start am rm client
@ -159,7 +159,7 @@ public class TestAMRMClient {
private void testAllocation(final AMRMClientImpl amClient) private void testAllocation(final AMRMClientImpl amClient)
throws YarnRemoteException { throws YarnRemoteException, IOException {
// setup container request // setup container request
final Resource capability = Records.newRecord(Resource.class); final Resource capability = Records.newRecord(Resource.class);
final Priority priority = Records.newRecord(Priority.class); final Priority priority = Records.newRecord(Priority.class);

View File

@ -68,7 +68,7 @@ public class AMRMProtocolPBClientImpl implements AMRMProtocol, Closeable {
@Override @Override
public AllocateResponse allocate(AllocateRequest request) public AllocateResponse allocate(AllocateRequest request)
throws YarnRemoteException { throws YarnRemoteException, IOException {
AllocateRequestProto requestProto = AllocateRequestProto requestProto =
((AllocateRequestPBImpl) request).getProto(); ((AllocateRequestPBImpl) request).getProto();
try { try {
@ -80,7 +80,8 @@ public class AMRMProtocolPBClientImpl implements AMRMProtocol, Closeable {
@Override @Override
public FinishApplicationMasterResponse finishApplicationMaster( public FinishApplicationMasterResponse finishApplicationMaster(
FinishApplicationMasterRequest request) throws YarnRemoteException { FinishApplicationMasterRequest request) throws YarnRemoteException,
IOException {
FinishApplicationMasterRequestProto requestProto = FinishApplicationMasterRequestProto requestProto =
((FinishApplicationMasterRequestPBImpl) request).getProto(); ((FinishApplicationMasterRequestPBImpl) request).getProto();
try { try {
@ -93,7 +94,8 @@ public class AMRMProtocolPBClientImpl implements AMRMProtocol, Closeable {
@Override @Override
public RegisterApplicationMasterResponse registerApplicationMaster( public RegisterApplicationMasterResponse registerApplicationMaster(
RegisterApplicationMasterRequest request) throws YarnRemoteException { RegisterApplicationMasterRequest request) throws YarnRemoteException,
IOException {
RegisterApplicationMasterRequestProto requestProto = RegisterApplicationMasterRequestProto requestProto =
((RegisterApplicationMasterRequestPBImpl) request).getProto(); ((RegisterApplicationMasterRequestPBImpl) request).getProto();
try { try {

View File

@ -18,6 +18,8 @@
package org.apache.hadoop.yarn.api.impl.pb.service; package org.apache.hadoop.yarn.api.impl.pb.service;
import java.io.IOException;
import org.apache.hadoop.yarn.api.AMRMProtocol; import org.apache.hadoop.yarn.api.AMRMProtocol;
import org.apache.hadoop.yarn.api.AMRMProtocolPB; import org.apache.hadoop.yarn.api.AMRMProtocolPB;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
@ -57,6 +59,8 @@ public class AMRMProtocolPBServiceImpl implements AMRMProtocolPB {
return ((AllocateResponsePBImpl)response).getProto(); return ((AllocateResponsePBImpl)response).getProto();
} catch (YarnRemoteException e) { } catch (YarnRemoteException e) {
throw new ServiceException(e); throw new ServiceException(e);
} catch (IOException e) {
throw new ServiceException(e);
} }
} }
@ -70,6 +74,8 @@ public class AMRMProtocolPBServiceImpl implements AMRMProtocolPB {
return ((FinishApplicationMasterResponsePBImpl)response).getProto(); return ((FinishApplicationMasterResponsePBImpl)response).getProto();
} catch (YarnRemoteException e) { } catch (YarnRemoteException e) {
throw new ServiceException(e); throw new ServiceException(e);
} catch (IOException e) {
throw new ServiceException(e);
} }
} }
@ -83,6 +89,8 @@ public class AMRMProtocolPBServiceImpl implements AMRMProtocolPB {
return ((RegisterApplicationMasterResponsePBImpl)response).getProto(); return ((RegisterApplicationMasterResponsePBImpl)response).getProto();
} catch (YarnRemoteException e) { } catch (YarnRemoteException e) {
throw new ServiceException(e); throw new ServiceException(e);
} catch (IOException e) {
throw new ServiceException(e);
} }
} }
} }

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.yarn; package org.apache.hadoop.yarn;
import java.io.IOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import junit.framework.Assert; import junit.framework.Assert;
@ -107,21 +108,23 @@ public class TestRPCFactories {
@Override @Override
public RegisterApplicationMasterResponse registerApplicationMaster( public RegisterApplicationMasterResponse registerApplicationMaster(
RegisterApplicationMasterRequest request) throws YarnRemoteException { RegisterApplicationMasterRequest request) throws YarnRemoteException,
IOException {
// TODO Auto-generated method stub // TODO Auto-generated method stub
return null; return null;
} }
@Override @Override
public FinishApplicationMasterResponse finishApplicationMaster( public FinishApplicationMasterResponse finishApplicationMaster(
FinishApplicationMasterRequest request) throws YarnRemoteException { FinishApplicationMasterRequest request) throws YarnRemoteException,
IOException {
// TODO Auto-generated method stub // TODO Auto-generated method stub
return null; return null;
} }
@Override @Override
public AllocateResponse allocate(AllocateRequest request) public AllocateResponse allocate(AllocateRequest request)
throws YarnRemoteException { throws YarnRemoteException, IOException {
// TODO Auto-generated method stub // TODO Auto-generated method stub
return null; return null;
} }

View File

@ -162,7 +162,8 @@ public class ApplicationMasterService extends AbstractService implements
@Override @Override
public RegisterApplicationMasterResponse registerApplicationMaster( public RegisterApplicationMasterResponse registerApplicationMaster(
RegisterApplicationMasterRequest request) throws YarnRemoteException { RegisterApplicationMasterRequest request) throws YarnRemoteException,
IOException {
ApplicationAttemptId applicationAttemptId = request ApplicationAttemptId applicationAttemptId = request
.getApplicationAttemptId(); .getApplicationAttemptId();
@ -211,7 +212,8 @@ public class ApplicationMasterService extends AbstractService implements
@Override @Override
public FinishApplicationMasterResponse finishApplicationMaster( public FinishApplicationMasterResponse finishApplicationMaster(
FinishApplicationMasterRequest request) throws YarnRemoteException { FinishApplicationMasterRequest request) throws YarnRemoteException,
IOException {
ApplicationAttemptId applicationAttemptId = request ApplicationAttemptId applicationAttemptId = request
.getApplicationAttemptId(); .getApplicationAttemptId();
@ -243,7 +245,7 @@ public class ApplicationMasterService extends AbstractService implements
@Override @Override
public AllocateResponse allocate(AllocateRequest request) public AllocateResponse allocate(AllocateRequest request)
throws YarnRemoteException { throws YarnRemoteException, IOException {
ApplicationAttemptId appAttemptId = request.getApplicationAttemptId(); ApplicationAttemptId appAttemptId = request.getApplicationAttemptId();
authorizeRequest(appAttemptId); authorizeRequest(appAttemptId);

View File

@ -485,7 +485,8 @@ public class TestContainerManagerSecurity {
} }
private Container requestAndGetContainer(AMRMProtocol scheduler, private Container requestAndGetContainer(AMRMProtocol scheduler,
ApplicationId appID) throws YarnRemoteException, InterruptedException { ApplicationId appID) throws YarnRemoteException, InterruptedException,
IOException {
// Request a container allocation. // Request a container allocation.
List<ResourceRequest> ask = new ArrayList<ResourceRequest>(); List<ResourceRequest> ask = new ArrayList<ResourceRequest>();