MAPREDUCE-4067. Changed MRClientProtocol api to throw IOException only (Xuan Gong via vinodkv)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1481695 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinod Kumar Vavilapalli 2013-05-13 03:34:25 +00:00
parent 47d1ca402f
commit 7359dc32d3
21 changed files with 342 additions and 324 deletions

View File

@ -166,6 +166,9 @@ Release 2.0.5-beta - UNRELEASED
INCOMPATIBLE CHANGES
MAPREDUCE-4067. Changed MRClientProtocol api to throw IOException only (Xuan
Gong via vinodkv)
NEW FEATURES
IMPROVEMENTS

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.mapreduce.v2.app.client;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Arrays;
import java.util.Collection;
@ -81,10 +82,8 @@ import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.PolicyProvider;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.ipc.RPCUtil;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.security.client.ClientToAMTokenSecretManager;
import org.apache.hadoop.yarn.service.AbstractService;
@ -188,34 +187,34 @@ public class MRClientService extends AbstractService
}
private Job verifyAndGetJob(JobId jobID,
boolean modifyAccess) throws YarnRemoteException {
boolean modifyAccess) throws IOException {
Job job = appContext.getJob(jobID);
return job;
}
private Task verifyAndGetTask(TaskId taskID,
boolean modifyAccess) throws YarnRemoteException {
boolean modifyAccess) throws IOException {
Task task = verifyAndGetJob(taskID.getJobId(),
modifyAccess).getTask(taskID);
if (task == null) {
throw RPCUtil.getRemoteException("Unknown Task " + taskID);
throw new IOException("Unknown Task " + taskID);
}
return task;
}
private TaskAttempt verifyAndGetAttempt(TaskAttemptId attemptID,
boolean modifyAccess) throws YarnRemoteException {
boolean modifyAccess) throws IOException {
TaskAttempt attempt = verifyAndGetTask(attemptID.getTaskId(),
modifyAccess).getAttempt(attemptID);
if (attempt == null) {
throw RPCUtil.getRemoteException("Unknown TaskAttempt " + attemptID);
throw new IOException("Unknown TaskAttempt " + attemptID);
}
return attempt;
}
@Override
public GetCountersResponse getCounters(GetCountersRequest request)
throws YarnRemoteException {
throws IOException {
JobId jobId = request.getJobId();
Job job = verifyAndGetJob(jobId, false);
GetCountersResponse response =
@ -226,7 +225,7 @@ public class MRClientService extends AbstractService
@Override
public GetJobReportResponse getJobReport(GetJobReportRequest request)
throws YarnRemoteException {
throws IOException {
JobId jobId = request.getJobId();
Job job = verifyAndGetJob(jobId, false);
GetJobReportResponse response =
@ -242,7 +241,7 @@ public class MRClientService extends AbstractService
@Override
public GetTaskAttemptReportResponse getTaskAttemptReport(
GetTaskAttemptReportRequest request) throws YarnRemoteException {
GetTaskAttemptReportRequest request) throws IOException {
TaskAttemptId taskAttemptId = request.getTaskAttemptId();
GetTaskAttemptReportResponse response =
recordFactory.newRecordInstance(GetTaskAttemptReportResponse.class);
@ -253,7 +252,7 @@ public class MRClientService extends AbstractService
@Override
public GetTaskReportResponse getTaskReport(GetTaskReportRequest request)
throws YarnRemoteException {
throws IOException {
TaskId taskId = request.getTaskId();
GetTaskReportResponse response =
recordFactory.newRecordInstance(GetTaskReportResponse.class);
@ -264,7 +263,7 @@ public class MRClientService extends AbstractService
@Override
public GetTaskAttemptCompletionEventsResponse getTaskAttemptCompletionEvents(
GetTaskAttemptCompletionEventsRequest request)
throws YarnRemoteException {
throws IOException {
JobId jobId = request.getJobId();
int fromEventId = request.getFromEventId();
int maxEvents = request.getMaxEvents();
@ -280,7 +279,7 @@ public class MRClientService extends AbstractService
@SuppressWarnings("unchecked")
@Override
public KillJobResponse killJob(KillJobRequest request)
throws YarnRemoteException {
throws IOException {
JobId jobId = request.getJobId();
String message = "Kill Job received from client " + jobId;
LOG.info(message);
@ -297,7 +296,7 @@ public class MRClientService extends AbstractService
@SuppressWarnings("unchecked")
@Override
public KillTaskResponse killTask(KillTaskRequest request)
throws YarnRemoteException {
throws IOException {
TaskId taskId = request.getTaskId();
String message = "Kill task received from client " + taskId;
LOG.info(message);
@ -312,7 +311,7 @@ public class MRClientService extends AbstractService
@SuppressWarnings("unchecked")
@Override
public KillTaskAttemptResponse killTaskAttempt(
KillTaskAttemptRequest request) throws YarnRemoteException {
KillTaskAttemptRequest request) throws IOException {
TaskAttemptId taskAttemptId = request.getTaskAttemptId();
String message = "Kill task attempt received from client " + taskAttemptId;
LOG.info(message);
@ -329,7 +328,7 @@ public class MRClientService extends AbstractService
@Override
public GetDiagnosticsResponse getDiagnostics(
GetDiagnosticsRequest request) throws YarnRemoteException {
GetDiagnosticsRequest request) throws IOException {
TaskAttemptId taskAttemptId = request.getTaskAttemptId();
GetDiagnosticsResponse response =
@ -342,7 +341,7 @@ public class MRClientService extends AbstractService
@SuppressWarnings("unchecked")
@Override
public FailTaskAttemptResponse failTaskAttempt(
FailTaskAttemptRequest request) throws YarnRemoteException {
FailTaskAttemptRequest request) throws IOException {
TaskAttemptId taskAttemptId = request.getTaskAttemptId();
String message = "Fail task attempt received from client " + taskAttemptId;
LOG.info(message);
@ -361,7 +360,7 @@ public class MRClientService extends AbstractService
@Override
public GetTaskReportsResponse getTaskReports(
GetTaskReportsRequest request) throws YarnRemoteException {
GetTaskReportsRequest request) throws IOException {
JobId jobId = request.getJobId();
TaskType taskType = request.getTaskType();
@ -386,22 +385,22 @@ public class MRClientService extends AbstractService
@Override
public GetDelegationTokenResponse getDelegationToken(
GetDelegationTokenRequest request) throws YarnRemoteException {
throw RPCUtil.getRemoteException("MR AM not authorized to issue delegation" +
GetDelegationTokenRequest request) throws IOException {
throw new IOException("MR AM not authorized to issue delegation" +
" token");
}
@Override
public RenewDelegationTokenResponse renewDelegationToken(
RenewDelegationTokenRequest request) throws YarnRemoteException {
throw RPCUtil.getRemoteException("MR AM not authorized to renew delegation" +
RenewDelegationTokenRequest request) throws IOException {
throw new IOException("MR AM not authorized to renew delegation" +
" token");
}
@Override
public CancelDelegationTokenResponse cancelDelegationToken(
CancelDelegationTokenRequest request) throws YarnRemoteException {
throw RPCUtil.getRemoteException("MR AM not authorized to cancel delegation" +
CancelDelegationTokenRequest request) throws IOException {
throw new IOException("MR AM not authorized to cancel delegation" +
" token");
}
}

View File

@ -145,13 +145,17 @@ public abstract class RMContainerRequestor extends RMCommunicator {
LOG.info("blacklistDisablePercent is " + blacklistDisablePercent);
}
protected AllocateResponse makeRemoteRequest() throws YarnRemoteException,
IOException {
protected AllocateResponse makeRemoteRequest() throws IOException {
AllocateRequest allocateRequest = BuilderUtils.newAllocateRequest(
applicationAttemptId, lastResponseID, super.getApplicationProgress(),
new ArrayList<ResourceRequest>(ask), new ArrayList<ContainerId>(
release));
AllocateResponse allocateResponse = scheduler.allocate(allocateRequest);
AllocateResponse allocateResponse;
try {
allocateResponse = scheduler.allocate(allocateRequest);
} catch (YarnRemoteException e) {
throw new IOException(e);
}
lastResponseID = allocateResponse.getResponseId();
availableResources = allocateResponse.getAvailableResources();
lastClusterNmCount = clusterNmCount;

View File

@ -45,7 +45,6 @@ import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.service.AbstractService;
@ -203,7 +202,7 @@ public class MRAppBenchmark {
public RegisterApplicationMasterResponse
registerApplicationMaster(
RegisterApplicationMasterRequest request)
throws YarnRemoteException, IOException {
throws IOException {
RegisterApplicationMasterResponse response =
Records.newRecord(RegisterApplicationMasterResponse.class);
response.setMinimumResourceCapability(BuilderUtils
@ -216,7 +215,7 @@ public class MRAppBenchmark {
@Override
public FinishApplicationMasterResponse finishApplicationMaster(
FinishApplicationMasterRequest request)
throws YarnRemoteException, IOException {
throws IOException {
FinishApplicationMasterResponse response =
Records.newRecord(FinishApplicationMasterResponse.class);
return response;
@ -224,7 +223,7 @@ public class MRAppBenchmark {
@Override
public AllocateResponse allocate(AllocateRequest request)
throws YarnRemoteException, IOException {
throws IOException {
AllocateResponse response =
Records.newRecord(AllocateResponse.class);

View File

@ -63,7 +63,6 @@ import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.ContainerToken;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.ipc.HadoopYarnProtoRPC;
@ -366,7 +365,7 @@ public class TestContainerLauncher {
@Override
public GetContainerStatusResponse getContainerStatus(
GetContainerStatusRequest request) throws YarnRemoteException {
GetContainerStatusRequest request) throws IOException {
GetContainerStatusResponse response = recordFactory
.newRecordInstance(GetContainerStatusResponse.class);
response.setStatus(status);
@ -375,38 +374,38 @@ public class TestContainerLauncher {
@Override
public StartContainerResponse startContainer(StartContainerRequest request)
throws YarnRemoteException {
throws IOException {
// Validate that the container is what RM is giving.
Assert.assertEquals(MRApp.NM_HOST, request.getContainer().getNodeId()
.getHost());
.getHost());
Assert.assertEquals(MRApp.NM_PORT, request.getContainer().getNodeId()
.getPort());
.getPort());
Assert.assertEquals(MRApp.NM_HOST + ":" + MRApp.NM_HTTP_PORT, request
.getContainer().getNodeHttpAddress());
.getContainer().getNodeHttpAddress());
StartContainerResponse response = recordFactory
.newRecordInstance(StartContainerResponse.class);
status = recordFactory.newRecordInstance(ContainerStatus.class);
try {
try {
// make the thread sleep to look like its not going to respond
Thread.sleep(15000);
} catch (Exception e) {
LOG.error(e);
throw new UndeclaredThrowableException(e);
}
}
status.setState(ContainerState.RUNNING);
status.setContainerId(request.getContainer().getId());
status.setExitStatus(0);
return response;
}
}
@Override
public StopContainerResponse stopContainer(StopContainerRequest request)
throws YarnRemoteException {
throws IOException {
Exception e = new Exception("Dummy function", new Exception(
"Dummy function cause"));
throw new YarnRemoteException(e);
}
}
throw new IOException(e);
}
}
}

View File

@ -405,7 +405,7 @@ public class TestContainerLauncherImpl {
}
@Override
public StartContainerResponse startContainer(StartContainerRequest request)
throws YarnRemoteException {
throws IOException {
try {
startLaunchBarrier.await();
completeLaunchBarrier.await();
@ -417,20 +417,20 @@ public class TestContainerLauncherImpl {
e.printStackTrace();
}
throw new ContainerException("Force fail CM");
throw new IOException(new ContainerException("Force fail CM"));
}
@Override
public StopContainerResponse stopContainer(StopContainerRequest request)
throws YarnRemoteException {
throws IOException {
return null;
}
@Override
public GetContainerStatusResponse getContainerStatus(
GetContainerStatusRequest request) throws YarnRemoteException {
GetContainerStatusRequest request) throws IOException {
return null;
}

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.mapreduce.v2.api;
import java.io.IOException;
import java.net.InetSocketAddress;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.CancelDelegationTokenRequest;
@ -48,7 +49,6 @@ import org.apache.hadoop.mapreduce.v2.api.protocolrecords.KillTaskRequest;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.KillTaskResponse;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.RenewDelegationTokenRequest;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.RenewDelegationTokenResponse;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
public interface MRClientProtocol {
/**
@ -56,36 +56,36 @@ public interface MRClientProtocol {
* @return InetSocketAddress
*/
public InetSocketAddress getConnectAddress();
public GetJobReportResponse getJobReport(GetJobReportRequest request) throws YarnRemoteException;
public GetTaskReportResponse getTaskReport(GetTaskReportRequest request) throws YarnRemoteException;
public GetTaskAttemptReportResponse getTaskAttemptReport(GetTaskAttemptReportRequest request) throws YarnRemoteException;
public GetCountersResponse getCounters(GetCountersRequest request) throws YarnRemoteException;
public GetTaskAttemptCompletionEventsResponse getTaskAttemptCompletionEvents(GetTaskAttemptCompletionEventsRequest request) throws YarnRemoteException;
public GetTaskReportsResponse getTaskReports(GetTaskReportsRequest request) throws YarnRemoteException;
public GetDiagnosticsResponse getDiagnostics(GetDiagnosticsRequest request) throws YarnRemoteException;
public KillJobResponse killJob(KillJobRequest request) throws YarnRemoteException;
public KillTaskResponse killTask(KillTaskRequest request) throws YarnRemoteException;
public KillTaskAttemptResponse killTaskAttempt(KillTaskAttemptRequest request) throws YarnRemoteException;
public FailTaskAttemptResponse failTaskAttempt(FailTaskAttemptRequest request) throws YarnRemoteException;
public GetDelegationTokenResponse getDelegationToken(GetDelegationTokenRequest request) throws YarnRemoteException;
public GetJobReportResponse getJobReport(GetJobReportRequest request) throws IOException;
public GetTaskReportResponse getTaskReport(GetTaskReportRequest request) throws IOException;
public GetTaskAttemptReportResponse getTaskAttemptReport(GetTaskAttemptReportRequest request) throws IOException;
public GetCountersResponse getCounters(GetCountersRequest request) throws IOException;
public GetTaskAttemptCompletionEventsResponse getTaskAttemptCompletionEvents(GetTaskAttemptCompletionEventsRequest request) throws IOException;
public GetTaskReportsResponse getTaskReports(GetTaskReportsRequest request) throws IOException;
public GetDiagnosticsResponse getDiagnostics(GetDiagnosticsRequest request) throws IOException;
public KillJobResponse killJob(KillJobRequest request) throws IOException;
public KillTaskResponse killTask(KillTaskRequest request) throws IOException;
public KillTaskAttemptResponse killTaskAttempt(KillTaskAttemptRequest request) throws IOException;
public FailTaskAttemptResponse failTaskAttempt(FailTaskAttemptRequest request) throws IOException;
public GetDelegationTokenResponse getDelegationToken(GetDelegationTokenRequest request) throws IOException;
/**
* Renew an existing delegation token.
*
* @param request the delegation token to be renewed.
* @return the new expiry time for the delegation token.
* @throws YarnRemoteException
* @throws IOException
*/
public RenewDelegationTokenResponse renewDelegationToken(
RenewDelegationTokenRequest request) throws YarnRemoteException;
RenewDelegationTokenRequest request) throws IOException;
/**
* Cancel an existing delegation token.
*
* @param request the delegation token to be cancelled.
* @return an empty response.
* @throws YarnRemoteException
* @throws IOException
*/
public CancelDelegationTokenResponse cancelDelegationToken(
CancelDelegationTokenRequest request) throws YarnRemoteException;
CancelDelegationTokenRequest request) throws IOException;
}

View File

@ -20,11 +20,13 @@ package org.apache.hadoop.mapreduce.v2.api.impl.pb.client;
import java.io.Closeable;
import java.io.IOException;
import java.lang.reflect.UndeclaredThrowableException;
import java.net.InetSocketAddress;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol;
import org.apache.hadoop.mapreduce.v2.api.MRClientProtocolPB;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.CancelDelegationTokenRequest;
@ -97,9 +99,6 @@ import org.apache.hadoop.mapreduce.v2.proto.MRServiceProtos.KillTaskRequestProto
import org.apache.hadoop.security.proto.SecurityProtos.CancelDelegationTokenRequestProto;
import org.apache.hadoop.security.proto.SecurityProtos.GetDelegationTokenRequestProto;
import org.apache.hadoop.security.proto.SecurityProtos.RenewDelegationTokenRequestProto;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.ipc.RPCUtil;
import com.google.protobuf.ServiceException;
public class MRClientProtocolPBClientImpl implements MRClientProtocol,
@ -128,154 +127,154 @@ public class MRClientProtocolPBClientImpl implements MRClientProtocol,
@Override
public GetJobReportResponse getJobReport(GetJobReportRequest request)
throws YarnRemoteException {
throws IOException {
GetJobReportRequestProto requestProto = ((GetJobReportRequestPBImpl)request).getProto();
try {
return new GetJobReportResponsePBImpl(proxy.getJobReport(null, requestProto));
} catch (ServiceException e) {
throw RPCUtil.unwrapAndThrowException(e);
throw unwrapAndThrowException(e);
}
}
@Override
public GetTaskReportResponse getTaskReport(GetTaskReportRequest request)
throws YarnRemoteException {
throws IOException {
GetTaskReportRequestProto requestProto = ((GetTaskReportRequestPBImpl)request).getProto();
try {
return new GetTaskReportResponsePBImpl(proxy.getTaskReport(null, requestProto));
} catch (ServiceException e) {
throw RPCUtil.unwrapAndThrowException(e);
throw unwrapAndThrowException(e);
}
}
@Override
public GetTaskAttemptReportResponse getTaskAttemptReport(
GetTaskAttemptReportRequest request) throws YarnRemoteException {
GetTaskAttemptReportRequest request) throws IOException {
GetTaskAttemptReportRequestProto requestProto = ((GetTaskAttemptReportRequestPBImpl)request).getProto();
try {
return new GetTaskAttemptReportResponsePBImpl(proxy.getTaskAttemptReport(null, requestProto));
} catch (ServiceException e) {
throw RPCUtil.unwrapAndThrowException(e);
throw unwrapAndThrowException(e);
}
}
@Override
public GetCountersResponse getCounters(GetCountersRequest request)
throws YarnRemoteException {
throws IOException {
GetCountersRequestProto requestProto = ((GetCountersRequestPBImpl)request).getProto();
try {
return new GetCountersResponsePBImpl(proxy.getCounters(null, requestProto));
} catch (ServiceException e) {
throw RPCUtil.unwrapAndThrowException(e);
throw unwrapAndThrowException(e);
}
}
@Override
public GetTaskAttemptCompletionEventsResponse getTaskAttemptCompletionEvents(
GetTaskAttemptCompletionEventsRequest request) throws YarnRemoteException {
GetTaskAttemptCompletionEventsRequest request) throws IOException {
GetTaskAttemptCompletionEventsRequestProto requestProto = ((GetTaskAttemptCompletionEventsRequestPBImpl)request).getProto();
try {
return new GetTaskAttemptCompletionEventsResponsePBImpl(proxy.getTaskAttemptCompletionEvents(null, requestProto));
} catch (ServiceException e) {
throw RPCUtil.unwrapAndThrowException(e);
throw unwrapAndThrowException(e);
}
}
@Override
public GetTaskReportsResponse getTaskReports(GetTaskReportsRequest request)
throws YarnRemoteException {
throws IOException {
GetTaskReportsRequestProto requestProto = ((GetTaskReportsRequestPBImpl)request).getProto();
try {
return new GetTaskReportsResponsePBImpl(proxy.getTaskReports(null, requestProto));
} catch (ServiceException e) {
throw RPCUtil.unwrapAndThrowException(e);
throw unwrapAndThrowException(e);
}
}
@Override
public GetDiagnosticsResponse getDiagnostics(GetDiagnosticsRequest request)
throws YarnRemoteException {
throws IOException {
GetDiagnosticsRequestProto requestProto = ((GetDiagnosticsRequestPBImpl)request).getProto();
try {
return new GetDiagnosticsResponsePBImpl(proxy.getDiagnostics(null, requestProto));
} catch (ServiceException e) {
throw RPCUtil.unwrapAndThrowException(e);
throw unwrapAndThrowException(e);
}
}
@Override
public GetDelegationTokenResponse getDelegationToken(
GetDelegationTokenRequest request) throws YarnRemoteException {
GetDelegationTokenRequest request) throws IOException {
GetDelegationTokenRequestProto requestProto = ((GetDelegationTokenRequestPBImpl)
request).getProto();
try {
return new GetDelegationTokenResponsePBImpl(proxy.getDelegationToken(
null, requestProto));
} catch (ServiceException e) {
throw RPCUtil.unwrapAndThrowException(e);
throw unwrapAndThrowException(e);
}
}
@Override
public KillJobResponse killJob(KillJobRequest request)
throws YarnRemoteException {
throws IOException {
KillJobRequestProto requestProto = ((KillJobRequestPBImpl)request).getProto();
try {
return new KillJobResponsePBImpl(proxy.killJob(null, requestProto));
} catch (ServiceException e) {
throw RPCUtil.unwrapAndThrowException(e);
throw unwrapAndThrowException(e);
}
}
@Override
public KillTaskResponse killTask(KillTaskRequest request)
throws YarnRemoteException {
throws IOException {
KillTaskRequestProto requestProto = ((KillTaskRequestPBImpl)request).getProto();
try {
return new KillTaskResponsePBImpl(proxy.killTask(null, requestProto));
} catch (ServiceException e) {
throw RPCUtil.unwrapAndThrowException(e);
throw unwrapAndThrowException(e);
}
}
@Override
public KillTaskAttemptResponse killTaskAttempt(KillTaskAttemptRequest request)
throws YarnRemoteException {
throws IOException {
KillTaskAttemptRequestProto requestProto = ((KillTaskAttemptRequestPBImpl)request).getProto();
try {
return new KillTaskAttemptResponsePBImpl(proxy.killTaskAttempt(null, requestProto));
} catch (ServiceException e) {
throw RPCUtil.unwrapAndThrowException(e);
throw unwrapAndThrowException(e);
}
}
@Override
public FailTaskAttemptResponse failTaskAttempt(FailTaskAttemptRequest request)
throws YarnRemoteException {
throws IOException {
FailTaskAttemptRequestProto requestProto = ((FailTaskAttemptRequestPBImpl)request).getProto();
try {
return new FailTaskAttemptResponsePBImpl(proxy.failTaskAttempt(null, requestProto));
} catch (ServiceException e) {
throw RPCUtil.unwrapAndThrowException(e);
throw unwrapAndThrowException(e);
}
}
@Override
public RenewDelegationTokenResponse renewDelegationToken(
RenewDelegationTokenRequest request) throws YarnRemoteException {
RenewDelegationTokenRequest request) throws IOException {
RenewDelegationTokenRequestProto requestProto =
((RenewDelegationTokenRequestPBImpl) request).getProto();
try {
return new RenewDelegationTokenResponsePBImpl(proxy.renewDelegationToken(
null, requestProto));
} catch (ServiceException e) {
throw RPCUtil.unwrapAndThrowException(e);
throw unwrapAndThrowException(e);
}
}
@Override
public CancelDelegationTokenResponse cancelDelegationToken(
CancelDelegationTokenRequest request) throws YarnRemoteException {
CancelDelegationTokenRequest request) throws IOException {
CancelDelegationTokenRequestProto requestProto =
((CancelDelegationTokenRequestPBImpl) request).getProto();
try {
@ -283,7 +282,17 @@ public class MRClientProtocolPBClientImpl implements MRClientProtocol,
proxy.cancelDelegationToken(null, requestProto));
} catch (ServiceException e) {
throw RPCUtil.unwrapAndThrowException(e);
throw unwrapAndThrowException(e);
}
}
private IOException unwrapAndThrowException(ServiceException se) {
if (se.getCause() instanceof RemoteException) {
return ((RemoteException) se.getCause()).unwrapRemoteException();
} else if (se.getCause() instanceof IOException) {
return (IOException)se.getCause();
} else {
throw new UndeclaredThrowableException(se.getCause());
}
}
}

View File

@ -18,6 +18,8 @@
package org.apache.hadoop.mapreduce.v2.api.impl.pb.service;
import java.io.IOException;
import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol;
import org.apache.hadoop.mapreduce.v2.api.MRClientProtocolPB;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.CancelDelegationTokenResponse;
@ -101,8 +103,6 @@ import org.apache.hadoop.security.proto.SecurityProtos.GetDelegationTokenRequest
import org.apache.hadoop.security.proto.SecurityProtos.GetDelegationTokenResponseProto;
import org.apache.hadoop.security.proto.SecurityProtos.RenewDelegationTokenRequestProto;
import org.apache.hadoop.security.proto.SecurityProtos.RenewDelegationTokenResponseProto;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
@ -121,7 +121,7 @@ public class MRClientProtocolPBServiceImpl implements MRClientProtocolPB {
try {
GetJobReportResponse response = real.getJobReport(request);
return ((GetJobReportResponsePBImpl)response).getProto();
} catch (YarnRemoteException e) {
} catch (IOException e) {
throw new ServiceException(e);
}
}
@ -133,7 +133,7 @@ public class MRClientProtocolPBServiceImpl implements MRClientProtocolPB {
try {
GetTaskReportResponse response = real.getTaskReport(request);
return ((GetTaskReportResponsePBImpl)response).getProto();
} catch (YarnRemoteException e) {
} catch (IOException e) {
throw new ServiceException(e);
}
}
@ -146,7 +146,7 @@ public class MRClientProtocolPBServiceImpl implements MRClientProtocolPB {
try {
GetTaskAttemptReportResponse response = real.getTaskAttemptReport(request);
return ((GetTaskAttemptReportResponsePBImpl)response).getProto();
} catch (YarnRemoteException e) {
} catch (IOException e) {
throw new ServiceException(e);
}
}
@ -158,7 +158,7 @@ public class MRClientProtocolPBServiceImpl implements MRClientProtocolPB {
try {
GetCountersResponse response = real.getCounters(request);
return ((GetCountersResponsePBImpl)response).getProto();
} catch (YarnRemoteException e) {
} catch (IOException e) {
throw new ServiceException(e);
}
}
@ -172,7 +172,7 @@ public class MRClientProtocolPBServiceImpl implements MRClientProtocolPB {
try {
GetTaskAttemptCompletionEventsResponse response = real.getTaskAttemptCompletionEvents(request);
return ((GetTaskAttemptCompletionEventsResponsePBImpl)response).getProto();
} catch (YarnRemoteException e) {
} catch (IOException e) {
throw new ServiceException(e);
}
}
@ -184,7 +184,7 @@ public class MRClientProtocolPBServiceImpl implements MRClientProtocolPB {
try {
GetTaskReportsResponse response = real.getTaskReports(request);
return ((GetTaskReportsResponsePBImpl)response).getProto();
} catch (YarnRemoteException e) {
} catch (IOException e) {
throw new ServiceException(e);
}
}
@ -196,7 +196,7 @@ public class MRClientProtocolPBServiceImpl implements MRClientProtocolPB {
try {
GetDiagnosticsResponse response = real.getDiagnostics(request);
return ((GetDiagnosticsResponsePBImpl)response).getProto();
} catch (YarnRemoteException e) {
} catch (IOException e) {
throw new ServiceException(e);
}
}
@ -209,7 +209,7 @@ public class MRClientProtocolPBServiceImpl implements MRClientProtocolPB {
try {
GetDelegationTokenResponse response = real.getDelegationToken(request);
return ((GetDelegationTokenResponsePBImpl)response).getProto();
} catch (YarnRemoteException e) {
} catch (IOException e) {
throw new ServiceException(e);
}
}
@ -221,7 +221,7 @@ public class MRClientProtocolPBServiceImpl implements MRClientProtocolPB {
try {
KillJobResponse response = real.killJob(request);
return ((KillJobResponsePBImpl)response).getProto();
} catch (YarnRemoteException e) {
} catch (IOException e) {
throw new ServiceException(e);
}
}
@ -233,7 +233,7 @@ public class MRClientProtocolPBServiceImpl implements MRClientProtocolPB {
try {
KillTaskResponse response = real.killTask(request);
return ((KillTaskResponsePBImpl)response).getProto();
} catch (YarnRemoteException e) {
} catch (IOException e) {
throw new ServiceException(e);
}
}
@ -245,7 +245,7 @@ public class MRClientProtocolPBServiceImpl implements MRClientProtocolPB {
try {
KillTaskAttemptResponse response = real.killTaskAttempt(request);
return ((KillTaskAttemptResponsePBImpl)response).getProto();
} catch (YarnRemoteException e) {
} catch (IOException e) {
throw new ServiceException(e);
}
}
@ -257,7 +257,7 @@ public class MRClientProtocolPBServiceImpl implements MRClientProtocolPB {
try {
FailTaskAttemptResponse response = real.failTaskAttempt(request);
return ((FailTaskAttemptResponsePBImpl)response).getProto();
} catch (YarnRemoteException e) {
} catch (IOException e) {
throw new ServiceException(e);
}
}
@ -271,7 +271,7 @@ public class MRClientProtocolPBServiceImpl implements MRClientProtocolPB {
try {
RenewDelegationTokenResponse response = real.renewDelegationToken(request);
return ((RenewDelegationTokenResponsePBImpl)response).getProto();
} catch (YarnRemoteException e) {
} catch (IOException e) {
throw new ServiceException(e);
}
}
@ -285,7 +285,7 @@ public class MRClientProtocolPBServiceImpl implements MRClientProtocolPB {
try {
CancelDelegationTokenResponse response = real.cancelDelegationToken(request);
return ((CancelDelegationTokenResponsePBImpl)response).getProto();
} catch (YarnRemoteException e) {
} catch (IOException e) {
throw new ServiceException(e);
}
}

View File

@ -37,7 +37,6 @@ import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenRenewer;
import org.apache.hadoop.yarn.api.records.DelegationToken;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.util.BuilderUtils;
import org.apache.hadoop.yarn.util.Records;
@ -68,8 +67,6 @@ public class MRDelegationTokenRenewer extends TokenRenewer {
.newRecord(RenewDelegationTokenRequest.class);
request.setDelegationToken(dToken);
return histProxy.renewDelegationToken(request).getNextExpirationTime();
} catch (YarnRemoteException e) {
throw new IOException(e);
} finally {
stopHistoryProxy(histProxy);
}
@ -91,8 +88,6 @@ public class MRDelegationTokenRenewer extends TokenRenewer {
.newRecord(CancelDelegationTokenRequest.class);
request.setDelegationToken(dToken);
histProxy.cancelDelegationToken(request);
} catch (YarnRemoteException e) {
throw new IOException(e);
} finally {
stopHistoryProxy(histProxy);
}

View File

@ -19,6 +19,7 @@
package org.apache.hadoop.mapreduce.v2;
import java.io.IOException;
import java.net.InetSocketAddress;
import junit.framework.Assert;
@ -56,7 +57,6 @@ import org.apache.hadoop.mapreduce.v2.api.protocolrecords.RenewDelegationTokenRe
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.RenewDelegationTokenResponse;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.factories.impl.pb.RpcClientFactoryPBImpl;
import org.apache.hadoop.yarn.factories.impl.pb.RpcServerFactoryPBImpl;
import org.junit.Test;
@ -133,86 +133,86 @@ public class TestRPCFactories {
@Override
public GetJobReportResponse getJobReport(GetJobReportRequest request)
throws YarnRemoteException {
throws IOException {
return null;
}
@Override
public GetTaskReportResponse getTaskReport(GetTaskReportRequest request)
throws YarnRemoteException {
throws IOException {
return null;
}
@Override
public GetTaskAttemptReportResponse getTaskAttemptReport(
GetTaskAttemptReportRequest request) throws YarnRemoteException {
GetTaskAttemptReportRequest request) throws IOException {
return null;
}
@Override
public GetCountersResponse getCounters(GetCountersRequest request)
throws YarnRemoteException {
throws IOException {
return null;
}
@Override
public GetTaskAttemptCompletionEventsResponse getTaskAttemptCompletionEvents(
GetTaskAttemptCompletionEventsRequest request)
throws YarnRemoteException {
throws IOException {
return null;
}
@Override
public GetTaskReportsResponse getTaskReports(GetTaskReportsRequest request)
throws YarnRemoteException {
throws IOException {
return null;
}
@Override
public GetDiagnosticsResponse getDiagnostics(GetDiagnosticsRequest request)
throws YarnRemoteException {
throws IOException {
return null;
}
@Override
public KillJobResponse killJob(KillJobRequest request)
throws YarnRemoteException {
throws IOException {
return null;
}
@Override
public KillTaskResponse killTask(KillTaskRequest request)
throws YarnRemoteException {
throws IOException {
return null;
}
@Override
public KillTaskAttemptResponse killTaskAttempt(
KillTaskAttemptRequest request) throws YarnRemoteException {
KillTaskAttemptRequest request) throws IOException {
return null;
}
@Override
public FailTaskAttemptResponse failTaskAttempt(
FailTaskAttemptRequest request) throws YarnRemoteException {
FailTaskAttemptRequest request) throws IOException {
return null;
}
@Override
public GetDelegationTokenResponse getDelegationToken(
GetDelegationTokenRequest request) throws YarnRemoteException {
GetDelegationTokenRequest request) throws IOException {
return null;
}
@Override
public RenewDelegationTokenResponse renewDelegationToken(
RenewDelegationTokenRequest request) throws YarnRemoteException {
RenewDelegationTokenRequest request) throws IOException {
return null;
}
@Override
public CancelDelegationTokenResponse cancelDelegationToken(
CancelDelegationTokenRequest request) throws YarnRemoteException {
CancelDelegationTokenRequest request) throws IOException {
return null;
}
}

View File

@ -80,10 +80,8 @@ import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.api.records.DelegationToken;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.ipc.RPCUtil;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.service.AbstractService;
import org.apache.hadoop.yarn.util.BuilderUtils;
@ -187,7 +185,7 @@ public class HistoryClientService extends AbstractService {
return getBindAddress();
}
private Job verifyAndGetJob(final JobId jobID) throws YarnRemoteException {
private Job verifyAndGetJob(final JobId jobID) throws IOException {
UserGroupInformation loginUgi = null;
Job job = null;
try {
@ -200,10 +198,8 @@ public class HistoryClientService extends AbstractService {
return job;
}
});
} catch (IOException e) {
throw RPCUtil.getRemoteException(e);
} catch (InterruptedException e) {
throw RPCUtil.getRemoteException(e);
throw new IOException(e);
}
if (job != null) {
JobACL operation = JobACL.VIEW_JOB;
@ -213,7 +209,8 @@ public class HistoryClientService extends AbstractService {
}
@Override
public GetCountersResponse getCounters(GetCountersRequest request) throws YarnRemoteException {
public GetCountersResponse getCounters(GetCountersRequest request)
throws IOException {
JobId jobId = request.getJobId();
Job job = verifyAndGetJob(jobId);
GetCountersResponse response = recordFactory.newRecordInstance(GetCountersResponse.class);
@ -222,7 +219,8 @@ public class HistoryClientService extends AbstractService {
}
@Override
public GetJobReportResponse getJobReport(GetJobReportRequest request) throws YarnRemoteException {
public GetJobReportResponse getJobReport(GetJobReportRequest request)
throws IOException {
JobId jobId = request.getJobId();
Job job = verifyAndGetJob(jobId);
GetJobReportResponse response = recordFactory.newRecordInstance(GetJobReportResponse.class);
@ -236,7 +234,8 @@ public class HistoryClientService extends AbstractService {
}
@Override
public GetTaskAttemptReportResponse getTaskAttemptReport(GetTaskAttemptReportRequest request) throws YarnRemoteException {
public GetTaskAttemptReportResponse getTaskAttemptReport(
GetTaskAttemptReportRequest request) throws IOException {
TaskAttemptId taskAttemptId = request.getTaskAttemptId();
Job job = verifyAndGetJob(taskAttemptId.getTaskId().getJobId());
GetTaskAttemptReportResponse response = recordFactory.newRecordInstance(GetTaskAttemptReportResponse.class);
@ -245,7 +244,8 @@ public class HistoryClientService extends AbstractService {
}
@Override
public GetTaskReportResponse getTaskReport(GetTaskReportRequest request) throws YarnRemoteException {
public GetTaskReportResponse getTaskReport(GetTaskReportRequest request)
throws IOException {
TaskId taskId = request.getTaskId();
Job job = verifyAndGetJob(taskId.getJobId());
GetTaskReportResponse response = recordFactory.newRecordInstance(GetTaskReportResponse.class);
@ -254,7 +254,9 @@ public class HistoryClientService extends AbstractService {
}
@Override
public GetTaskAttemptCompletionEventsResponse getTaskAttemptCompletionEvents(GetTaskAttemptCompletionEventsRequest request) throws YarnRemoteException {
public GetTaskAttemptCompletionEventsResponse
getTaskAttemptCompletionEvents(
GetTaskAttemptCompletionEventsRequest request) throws IOException {
JobId jobId = request.getJobId();
int fromEventId = request.getFromEventId();
int maxEvents = request.getMaxEvents();
@ -266,22 +268,25 @@ public class HistoryClientService extends AbstractService {
}
@Override
public KillJobResponse killJob(KillJobRequest request) throws YarnRemoteException {
throw RPCUtil.getRemoteException("Invalid operation on completed job");
public KillJobResponse killJob(KillJobRequest request) throws IOException {
throw new IOException("Invalid operation on completed job");
}
@Override
public KillTaskResponse killTask(KillTaskRequest request) throws YarnRemoteException {
throw RPCUtil.getRemoteException("Invalid operation on completed job");
public KillTaskResponse killTask(KillTaskRequest request)
throws IOException {
throw new IOException("Invalid operation on completed job");
}
@Override
public KillTaskAttemptResponse killTaskAttempt(KillTaskAttemptRequest request) throws YarnRemoteException {
throw RPCUtil.getRemoteException("Invalid operation on completed job");
public KillTaskAttemptResponse killTaskAttempt(
KillTaskAttemptRequest request) throws IOException {
throw new IOException("Invalid operation on completed job");
}
@Override
public GetDiagnosticsResponse getDiagnostics(GetDiagnosticsRequest request) throws YarnRemoteException {
public GetDiagnosticsResponse getDiagnostics(GetDiagnosticsRequest request)
throws IOException {
TaskAttemptId taskAttemptId = request.getTaskAttemptId();
Job job = verifyAndGetJob(taskAttemptId.getTaskId().getJobId());
@ -292,12 +297,14 @@ public class HistoryClientService extends AbstractService {
}
@Override
public FailTaskAttemptResponse failTaskAttempt(FailTaskAttemptRequest request) throws YarnRemoteException {
throw RPCUtil.getRemoteException("Invalid operation on completed job");
public FailTaskAttemptResponse failTaskAttempt(
FailTaskAttemptRequest request) throws IOException {
throw new IOException("Invalid operation on completed job");
}
@Override
public GetTaskReportsResponse getTaskReports(GetTaskReportsRequest request) throws YarnRemoteException {
public GetTaskReportsResponse getTaskReports(GetTaskReportsRequest request)
throws IOException {
JobId jobId = request.getJobId();
TaskType taskType = request.getTaskType();
@ -312,9 +319,7 @@ public class HistoryClientService extends AbstractService {
@Override
public GetDelegationTokenResponse getDelegationToken(
GetDelegationTokenRequest request) throws YarnRemoteException {
try {
GetDelegationTokenRequest request) throws IOException {
UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
@ -344,25 +349,22 @@ public class HistoryClientService extends AbstractService {
realJHSToken.getPassword(), realJHSToken.getService().toString());
response.setDelegationToken(mrDToken);
return response;
} catch (IOException i) {
throw RPCUtil.getRemoteException(i);
}
}
@Override
public RenewDelegationTokenResponse renewDelegationToken(
RenewDelegationTokenRequest request) throws YarnRemoteException {
try {
RenewDelegationTokenRequest request) throws IOException {
if (!isAllowedDelegationTokenOp()) {
throw new IOException(
"Delegation Token can be renewed only with kerberos authentication");
}
DelegationToken protoToken = request.getDelegationToken();
Token<MRDelegationTokenIdentifier> token = new Token<MRDelegationTokenIdentifier>(
protoToken.getIdentifier().array(), protoToken.getPassword()
.array(), new Text(protoToken.getKind()), new Text(
protoToken.getService()));
Token<MRDelegationTokenIdentifier> token =
new Token<MRDelegationTokenIdentifier>(
protoToken.getIdentifier().array(), protoToken.getPassword()
.array(), new Text(protoToken.getKind()), new Text(
protoToken.getService()));
String user = UserGroupInformation.getCurrentUser().getShortUserName();
long nextExpTime = jhsDTSecretManager.renewToken(token, user);
@ -370,45 +372,36 @@ public class HistoryClientService extends AbstractService {
.newRecord(RenewDelegationTokenResponse.class);
renewResponse.setNextExpirationTime(nextExpTime);
return renewResponse;
} catch (IOException e) {
throw RPCUtil.getRemoteException(e);
}
}
@Override
public CancelDelegationTokenResponse cancelDelegationToken(
CancelDelegationTokenRequest request) throws YarnRemoteException {
try {
CancelDelegationTokenRequest request) throws IOException {
if (!isAllowedDelegationTokenOp()) {
throw new IOException(
"Delegation Token can be cancelled only with kerberos authentication");
}
DelegationToken protoToken = request.getDelegationToken();
Token<MRDelegationTokenIdentifier> token = new Token<MRDelegationTokenIdentifier>(
protoToken.getIdentifier().array(), protoToken.getPassword()
.array(), new Text(protoToken.getKind()), new Text(
protoToken.getService()));
Token<MRDelegationTokenIdentifier> token =
new Token<MRDelegationTokenIdentifier>(
protoToken.getIdentifier().array(), protoToken.getPassword()
.array(), new Text(protoToken.getKind()), new Text(
protoToken.getService()));
String user = UserGroupInformation.getCurrentUser().getShortUserName();
jhsDTSecretManager.cancelToken(token, user);
return Records.newRecord(CancelDelegationTokenResponse.class);
} catch (IOException e) {
throw RPCUtil.getRemoteException(e);
}
}
private void checkAccess(Job job, JobACL jobOperation)
throws YarnRemoteException {
throws IOException {
UserGroupInformation callerUGI;
try {
callerUGI = UserGroupInformation.getCurrentUser();
} catch (IOException e) {
throw RPCUtil.getRemoteException(e);
}
callerUGI = UserGroupInformation.getCurrentUser();
if (!job.checkAccess(callerUGI, jobOperation)) {
throw RPCUtil.getRemoteException(new AccessControlException("User "
throw new IOException(new AccessControlException("User "
+ callerUGI.getShortUserName() + " cannot perform operation "
+ jobOperation.name() + " on " + job.getID()));
}

View File

@ -75,7 +75,6 @@ import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.ipc.RPCUtil;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.security.client.ClientTokenIdentifier;
import org.apache.hadoop.yarn.util.BuilderUtils;
@ -137,14 +136,19 @@ public class ClientServiceDelegate {
}
}
private MRClientProtocol getProxy() throws YarnRemoteException, IOException {
private MRClientProtocol getProxy() throws IOException {
if (realProxy != null) {
return realProxy;
}
// Possibly allow nulls through the PB tunnel, otherwise deal with an exception
// and redirect to the history server.
ApplicationReport application = rm.getApplicationReport(appId);
ApplicationReport application = null;
try {
application = rm.getApplicationReport(appId);
} catch (YarnRemoteException e2) {
throw new IOException(e2);
}
if (application != null) {
trackingUrl = application.getTrackingUrl();
}
@ -213,7 +217,11 @@ public class ClientServiceDelegate {
LOG.warn("getProxy() call interruped", e1);
throw new YarnException(e1);
}
application = rm.getApplicationReport(appId);
try {
application = rm.getApplicationReport(appId);
} catch (YarnRemoteException e1) {
throw new IOException(e1);
}
if (application == null) {
LOG.info("Could not get Job info from RM for job " + jobId
+ ". Redirecting to job history server.");
@ -222,6 +230,8 @@ public class ClientServiceDelegate {
} catch (InterruptedException e) {
LOG.warn("getProxy() call interruped", e);
throw new YarnException(e);
} catch (YarnRemoteException e) {
throw new IOException(e);
}
}
@ -231,7 +241,7 @@ public class ClientServiceDelegate {
*/
String user = application.getUser();
if (user == null) {
throw RPCUtil.getRemoteException("User is not set in the application report");
throw new IOException("User is not set in the application report");
}
if (application.getYarnApplicationState() == YarnApplicationState.NEW
|| application.getYarnApplicationState() ==
@ -300,23 +310,15 @@ public class ClientServiceDelegate {
while (maxRetries > 0) {
try {
return methodOb.invoke(getProxy(), args);
} catch (YarnRemoteException yre) {
LOG.warn("Exception thrown by remote end.", yre);
throw new IOException(yre);
} catch (InvocationTargetException e) {
if (e.getTargetException() instanceof YarnRemoteException) {
LOG.warn("Error from remote end: " + e
.getTargetException().getLocalizedMessage());
LOG.debug("Tracing remote error ", e.getTargetException());
throw new IOException(e.getTargetException());
}
// Will not throw out YarnRemoteException anymore
LOG.debug("Failed to contact AM/History for job " + jobId +
" retrying..", e.getTargetException());
// Force reconnection by setting the proxy to null.
realProxy = null;
// HS/AMS shut down
maxRetries--;
lastException = new IOException(e.getMessage());
lastException = new IOException(e.getTargetException());
} catch (Exception e) {
LOG.debug("Failed to contact AM/History for job " + jobId
@ -447,7 +449,7 @@ public class ClientServiceDelegate {
}
public LogParams getLogFilePath(JobID oldJobID, TaskAttemptID oldTaskAttemptID)
throws YarnRemoteException, IOException {
throws IOException {
org.apache.hadoop.mapreduce.v2.api.records.JobId jobId =
TypeConverter.toYarn(oldJobID);
GetJobReportRequest request =

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.mapred;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.HashMap;
@ -65,7 +66,6 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.util.BuilderUtils;
@ -101,7 +101,7 @@ public class NotRunningJob implements MRClientProtocol {
@Override
public FailTaskAttemptResponse failTaskAttempt(
FailTaskAttemptRequest request) throws YarnRemoteException {
FailTaskAttemptRequest request) throws IOException {
FailTaskAttemptResponse resp =
recordFactory.newRecordInstance(FailTaskAttemptResponse.class);
return resp;
@ -109,7 +109,7 @@ public class NotRunningJob implements MRClientProtocol {
@Override
public GetCountersResponse getCounters(GetCountersRequest request)
throws YarnRemoteException {
throws IOException {
GetCountersResponse resp =
recordFactory.newRecordInstance(GetCountersResponse.class);
Counters counters = recordFactory.newRecordInstance(Counters.class);
@ -120,7 +120,7 @@ public class NotRunningJob implements MRClientProtocol {
@Override
public GetDiagnosticsResponse getDiagnostics(GetDiagnosticsRequest request)
throws YarnRemoteException {
throws IOException {
GetDiagnosticsResponse resp =
recordFactory.newRecordInstance(GetDiagnosticsResponse.class);
resp.addDiagnostics("");
@ -129,7 +129,7 @@ public class NotRunningJob implements MRClientProtocol {
@Override
public GetJobReportResponse getJobReport(GetJobReportRequest request)
throws YarnRemoteException {
throws IOException {
JobReport jobReport =
recordFactory.newRecordInstance(JobReport.class);
jobReport.setJobId(request.getJobId());
@ -150,7 +150,7 @@ public class NotRunningJob implements MRClientProtocol {
@Override
public GetTaskAttemptCompletionEventsResponse getTaskAttemptCompletionEvents(
GetTaskAttemptCompletionEventsRequest request)
throws YarnRemoteException {
throws IOException {
GetTaskAttemptCompletionEventsResponse resp =
recordFactory.newRecordInstance(GetTaskAttemptCompletionEventsResponse.class);
resp.addAllCompletionEvents(new ArrayList<TaskAttemptCompletionEvent>());
@ -159,14 +159,14 @@ public class NotRunningJob implements MRClientProtocol {
@Override
public GetTaskAttemptReportResponse getTaskAttemptReport(
GetTaskAttemptReportRequest request) throws YarnRemoteException {
GetTaskAttemptReportRequest request) throws IOException {
//not invoked by anybody
throw new NotImplementedException();
}
@Override
public GetTaskReportResponse getTaskReport(GetTaskReportRequest request)
throws YarnRemoteException {
throws IOException {
GetTaskReportResponse resp =
recordFactory.newRecordInstance(GetTaskReportResponse.class);
TaskReport report = recordFactory.newRecordInstance(TaskReport.class);
@ -181,7 +181,7 @@ public class NotRunningJob implements MRClientProtocol {
@Override
public GetTaskReportsResponse getTaskReports(GetTaskReportsRequest request)
throws YarnRemoteException {
throws IOException {
GetTaskReportsResponse resp =
recordFactory.newRecordInstance(GetTaskReportsResponse.class);
resp.addAllTaskReports(new ArrayList<TaskReport>());
@ -190,7 +190,7 @@ public class NotRunningJob implements MRClientProtocol {
@Override
public KillJobResponse killJob(KillJobRequest request)
throws YarnRemoteException {
throws IOException {
KillJobResponse resp =
recordFactory.newRecordInstance(KillJobResponse.class);
return resp;
@ -198,7 +198,7 @@ public class NotRunningJob implements MRClientProtocol {
@Override
public KillTaskResponse killTask(KillTaskRequest request)
throws YarnRemoteException {
throws IOException {
KillTaskResponse resp =
recordFactory.newRecordInstance(KillTaskResponse.class);
return resp;
@ -206,7 +206,7 @@ public class NotRunningJob implements MRClientProtocol {
@Override
public KillTaskAttemptResponse killTaskAttempt(
KillTaskAttemptRequest request) throws YarnRemoteException {
KillTaskAttemptRequest request) throws IOException {
KillTaskAttemptResponse resp =
recordFactory.newRecordInstance(KillTaskAttemptResponse.class);
return resp;
@ -214,21 +214,21 @@ public class NotRunningJob implements MRClientProtocol {
@Override
public GetDelegationTokenResponse getDelegationToken(
GetDelegationTokenRequest request) throws YarnRemoteException {
GetDelegationTokenRequest request) throws IOException {
/* Should not be invoked by anyone. */
throw new NotImplementedException();
}
@Override
public RenewDelegationTokenResponse renewDelegationToken(
RenewDelegationTokenRequest request) throws YarnRemoteException {
RenewDelegationTokenRequest request) throws IOException {
/* Should not be invoked by anyone. */
throw new NotImplementedException();
}
@Override
public CancelDelegationTokenResponse cancelDelegationToken(
CancelDelegationTokenRequest request) throws YarnRemoteException {
CancelDelegationTokenRequest request) throws IOException {
/* Should not be invoked by anyone. */
throw new NotImplementedException();
}

View File

@ -209,14 +209,10 @@ public class YARNRunner implements ClientProtocol {
.newRecordInstance(GetDelegationTokenRequest.class);
request.setRenewer(Master.getMasterPrincipal(conf));
DelegationToken mrDelegationToken;
try {
mrDelegationToken = hsProxy.getDelegationToken(request)
mrDelegationToken = hsProxy.getDelegationToken(request)
.getDelegationToken();
return ProtoUtils.convertFromProtoFormat(mrDelegationToken,
hsProxy.getConnectAddress());
} catch (YarnRemoteException e) {
throw new IOException(e);
}
return ProtoUtils.convertFromProtoFormat(mrDelegationToken,
hsProxy.getConnectAddress());
}
@Override
@ -627,11 +623,7 @@ public class YARNRunner implements ClientProtocol {
@Override
public LogParams getLogFileParams(JobID jobID, TaskAttemptID taskAttemptID)
throws IOException {
try {
return clientCache.getClient(jobID).getLogFilePath(jobID, taskAttemptID);
} catch (YarnRemoteException e) {
throw new IOException(e);
}
return clientCache.getClient(jobID).getLogFilePath(jobID, taskAttemptID);
}
private static void warnForJavaLibPath(String opts, String component,

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.mapred;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
@ -98,7 +99,6 @@ import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.ipc.YarnRPC;
@ -257,13 +257,14 @@ public class TestClientRedirect {
}
@Override
public GetNewApplicationResponse getNewApplication(GetNewApplicationRequest request) throws YarnRemoteException {
public GetNewApplicationResponse getNewApplication(
GetNewApplicationRequest request) throws IOException {
return null;
}
@Override
public GetApplicationReportResponse getApplicationReport(
GetApplicationReportRequest request) throws YarnRemoteException {
GetApplicationReportRequest request) throws IOException {
ApplicationId applicationId = request.getApplicationId();
ApplicationReport application = recordFactory
.newRecordInstance(ApplicationReport.class);
@ -296,61 +297,61 @@ public class TestClientRedirect {
@Override
public SubmitApplicationResponse submitApplication(
SubmitApplicationRequest request) throws YarnRemoteException {
throw new YarnRemoteException("Test");
SubmitApplicationRequest request) throws IOException {
throw new IOException("Test");
}
@Override
public KillApplicationResponse forceKillApplication(
KillApplicationRequest request) throws YarnRemoteException {
KillApplicationRequest request) throws IOException {
return recordFactory.newRecordInstance(KillApplicationResponse.class);
}
@Override
public GetClusterMetricsResponse getClusterMetrics(
GetClusterMetricsRequest request) throws YarnRemoteException {
GetClusterMetricsRequest request) throws IOException {
return null;
}
@Override
public GetAllApplicationsResponse getAllApplications(
GetAllApplicationsRequest request) throws YarnRemoteException {
GetAllApplicationsRequest request) throws IOException {
return null;
}
@Override
public GetClusterNodesResponse getClusterNodes(
GetClusterNodesRequest request) throws YarnRemoteException {
GetClusterNodesRequest request) throws IOException {
return null;
}
@Override
public GetQueueInfoResponse getQueueInfo(GetQueueInfoRequest request)
throws YarnRemoteException {
throws IOException {
return null;
}
@Override
public GetQueueUserAclsInfoResponse getQueueUserAcls(
GetQueueUserAclsInfoRequest request) throws YarnRemoteException {
GetQueueUserAclsInfoRequest request) throws IOException {
return null;
}
@Override
public GetDelegationTokenResponse getDelegationToken(
GetDelegationTokenRequest request) throws YarnRemoteException {
GetDelegationTokenRequest request) throws IOException {
return null;
}
@Override
public RenewDelegationTokenResponse renewDelegationToken(
RenewDelegationTokenRequest request) throws YarnRemoteException {
RenewDelegationTokenRequest request) throws IOException {
return null;
}
@Override
public CancelDelegationTokenResponse cancelDelegationToken(
CancelDelegationTokenRequest request) throws YarnRemoteException {
CancelDelegationTokenRequest request) throws IOException {
return null;
}
}
@ -362,7 +363,8 @@ public class TestClientRedirect {
}
@Override
public GetCountersResponse getCounters(GetCountersRequest request) throws YarnRemoteException {
public GetCountersResponse getCounters(GetCountersRequest request)
throws IOException {
hsContact = true;
Counters counters = getMyCounters();
GetCountersResponse response = recordFactory.newRecordInstance(GetCountersResponse.class);
@ -422,7 +424,7 @@ public class TestClientRedirect {
@Override
public GetCountersResponse getCounters(GetCountersRequest request)
throws YarnRemoteException {
throws IOException {
JobId jobID = request.getJobId();
amContact = true;
@ -436,7 +438,7 @@ public class TestClientRedirect {
@Override
public GetJobReportResponse getJobReport(GetJobReportRequest request)
throws YarnRemoteException {
throws IOException {
amContact = true;
@ -456,13 +458,13 @@ public class TestClientRedirect {
@Override
public GetTaskReportResponse getTaskReport(GetTaskReportRequest request)
throws YarnRemoteException {
throws IOException {
return null;
}
@Override
public GetTaskAttemptReportResponse getTaskAttemptReport(
GetTaskAttemptReportRequest request) throws YarnRemoteException {
GetTaskAttemptReportRequest request) throws IOException {
return null;
}
@ -470,66 +472,66 @@ public class TestClientRedirect {
public GetTaskAttemptCompletionEventsResponse
getTaskAttemptCompletionEvents(
GetTaskAttemptCompletionEventsRequest request)
throws YarnRemoteException {
throws IOException {
return null;
}
@Override
public GetTaskReportsResponse
getTaskReports(GetTaskReportsRequest request)
throws YarnRemoteException {
throws IOException {
return null;
}
@Override
public GetDiagnosticsResponse
getDiagnostics(GetDiagnosticsRequest request)
throws YarnRemoteException {
throws IOException {
return null;
}
@Override
public KillJobResponse killJob(KillJobRequest request)
throws YarnRemoteException {
throws IOException {
return recordFactory.newRecordInstance(KillJobResponse.class);
}
@Override
public KillTaskResponse killTask(KillTaskRequest request)
throws YarnRemoteException {
throws IOException {
return null;
}
@Override
public KillTaskAttemptResponse killTaskAttempt(
KillTaskAttemptRequest request) throws YarnRemoteException {
KillTaskAttemptRequest request) throws IOException {
return null;
}
@Override
public FailTaskAttemptResponse failTaskAttempt(
FailTaskAttemptRequest request) throws YarnRemoteException {
FailTaskAttemptRequest request) throws IOException {
return null;
}
@Override
public org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDelegationTokenResponse getDelegationToken(
org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDelegationTokenRequest request)
throws YarnRemoteException {
throws IOException {
return null;
}
@Override
public org.apache.hadoop.mapreduce.v2.api.protocolrecords.RenewDelegationTokenResponse renewDelegationToken(
org.apache.hadoop.mapreduce.v2.api.protocolrecords.RenewDelegationTokenRequest request)
throws YarnRemoteException {
throws IOException {
return null;
}
@Override
public org.apache.hadoop.mapreduce.v2.api.protocolrecords.CancelDelegationTokenResponse cancelDelegationToken(
org.apache.hadoop.mapreduce.v2.api.protocolrecords.CancelDelegationTokenRequest request)
throws YarnRemoteException {
throws IOException {
return null;
}
}

View File

@ -57,7 +57,6 @@ import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.ipc.RPCUtil;
import org.apache.hadoop.yarn.util.BuilderUtils;
import org.apache.hadoop.yarn.util.Records;
import org.junit.Test;
@ -103,7 +102,7 @@ public class TestClientServiceDelegate {
MRClientProtocol historyServerProxy = mock(MRClientProtocol.class);
when(historyServerProxy.getJobReport(getJobReportRequest())).thenThrow(
RPCUtil.getRemoteException("Job ID doesnot Exist"));
new IOException("Job ID doesnot Exist"));
ResourceMgrDelegate rm = mock(ResourceMgrDelegate.class);
when(rm.getApplicationReport(TypeConverter.toYarn(oldJobId).getAppId()))
@ -199,8 +198,7 @@ public class TestClientServiceDelegate {
}
@Test
public void testReconnectOnAMRestart() throws IOException,
YarnRemoteException {
public void testReconnectOnAMRestart() throws IOException {
//test not applicable when AM not reachable
//as instantiateAMProxy is not called at all
if(!isAMReachableFromClient) {
@ -212,11 +210,15 @@ public class TestClientServiceDelegate {
// RM returns AM1 url, null, null and AM2 url on invocations.
// Nulls simulate the time when AM2 is in the process of restarting.
ResourceMgrDelegate rmDelegate = mock(ResourceMgrDelegate.class);
when(rmDelegate.getApplicationReport(jobId.getAppId())).thenReturn(
getRunningApplicationReport("am1", 78)).thenReturn(
getRunningApplicationReport(null, 0)).thenReturn(
getRunningApplicationReport(null, 0)).thenReturn(
getRunningApplicationReport("am2", 90));
try {
when(rmDelegate.getApplicationReport(jobId.getAppId())).thenReturn(
getRunningApplicationReport("am1", 78)).thenReturn(
getRunningApplicationReport(null, 0)).thenReturn(
getRunningApplicationReport(null, 0)).thenReturn(
getRunningApplicationReport("am2", 90));
} catch (YarnRemoteException e) {
throw new IOException(e);
}
GetJobReportResponse jobReportResponse1 = mock(GetJobReportResponse.class);
when(jobReportResponse1.getJobReport()).thenReturn(
@ -267,7 +269,7 @@ public class TestClientServiceDelegate {
}
@Test
public void testAMAccessDisabled() throws IOException, YarnRemoteException {
public void testAMAccessDisabled() throws IOException {
//test only applicable when AM not reachable
if(isAMReachableFromClient) {
return;
@ -278,11 +280,15 @@ public class TestClientServiceDelegate {
getJobReportResponseFromHistoryServer());
ResourceMgrDelegate rmDelegate = mock(ResourceMgrDelegate.class);
when(rmDelegate.getApplicationReport(jobId.getAppId())).thenReturn(
getRunningApplicationReport("am1", 78)).thenReturn(
try {
when(rmDelegate.getApplicationReport(jobId.getAppId())).thenReturn(
getRunningApplicationReport("am1", 78)).thenReturn(
getRunningApplicationReport("am1", 78)).thenReturn(
getFinishedApplicationReport());
getRunningApplicationReport("am1", 78)).thenReturn(
getFinishedApplicationReport());
} catch (YarnRemoteException e) {
throw new IOException(e);
}
ClientServiceDelegate clientServiceDelegate = spy(getClientServiceDelegate(
historyServerProxy, rmDelegate));
@ -319,8 +325,7 @@ public class TestClientServiceDelegate {
}
@Test
public void testRMDownForJobStatusBeforeGetAMReport() throws IOException,
YarnRemoteException {
public void testRMDownForJobStatusBeforeGetAMReport() throws IOException {
Configuration conf = new YarnConfiguration();
testRMDownForJobStatusBeforeGetAMReport(conf,
MRJobConfig.DEFAULT_MR_CLIENT_MAX_RETRIES);
@ -328,7 +333,7 @@ public class TestClientServiceDelegate {
@Test
public void testRMDownForJobStatusBeforeGetAMReportWithRetryTimes()
throws IOException, YarnRemoteException {
throws IOException {
Configuration conf = new YarnConfiguration();
conf.setInt(MRJobConfig.MR_CLIENT_MAX_RETRIES, 2);
testRMDownForJobStatusBeforeGetAMReport(conf, conf.getInt(
@ -338,7 +343,7 @@ public class TestClientServiceDelegate {
@Test
public void testRMDownRestoreForJobStatusBeforeGetAMReport()
throws IOException, YarnRemoteException {
throws IOException {
Configuration conf = new YarnConfiguration();
conf.setInt(MRJobConfig.MR_CLIENT_MAX_RETRIES, 3);
@ -349,42 +354,52 @@ public class TestClientServiceDelegate {
when(historyServerProxy.getJobReport(any(GetJobReportRequest.class)))
.thenReturn(getJobReportResponse());
ResourceMgrDelegate rmDelegate = mock(ResourceMgrDelegate.class);
when(rmDelegate.getApplicationReport(jobId.getAppId())).thenThrow(
new java.lang.reflect.UndeclaredThrowableException(new IOException(
"Connection refuced1"))).thenThrow(
new java.lang.reflect.UndeclaredThrowableException(new IOException(
"Connection refuced2"))).thenReturn(getFinishedApplicationReport());
ClientServiceDelegate clientServiceDelegate = new ClientServiceDelegate(
conf, rmDelegate, oldJobId, historyServerProxy);
JobStatus jobStatus = clientServiceDelegate.getJobStatus(oldJobId);
verify(rmDelegate, times(3)).getApplicationReport(any(ApplicationId.class));
Assert.assertNotNull(jobStatus);
try {
when(rmDelegate.getApplicationReport(jobId.getAppId())).thenThrow(
new java.lang.reflect.UndeclaredThrowableException(new IOException(
"Connection refuced1"))).thenThrow(
new java.lang.reflect.UndeclaredThrowableException(new IOException(
"Connection refuced2")))
.thenReturn(getFinishedApplicationReport());
ClientServiceDelegate clientServiceDelegate = new ClientServiceDelegate(
conf, rmDelegate, oldJobId, historyServerProxy);
JobStatus jobStatus = clientServiceDelegate.getJobStatus(oldJobId);
verify(rmDelegate, times(3)).getApplicationReport(
any(ApplicationId.class));
Assert.assertNotNull(jobStatus);
} catch (YarnRemoteException e) {
throw new IOException(e);
}
}
private void testRMDownForJobStatusBeforeGetAMReport(Configuration conf,
int noOfRetries) throws YarnRemoteException, IOException {
int noOfRetries) throws IOException {
conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.YARN_FRAMEWORK_NAME);
conf.setBoolean(MRJobConfig.JOB_AM_ACCESS_DISABLED,
!isAMReachableFromClient);
MRClientProtocol historyServerProxy = mock(MRClientProtocol.class);
ResourceMgrDelegate rmDelegate = mock(ResourceMgrDelegate.class);
when(rmDelegate.getApplicationReport(jobId.getAppId())).thenThrow(
new java.lang.reflect.UndeclaredThrowableException(new IOException(
"Connection refuced1"))).thenThrow(
new java.lang.reflect.UndeclaredThrowableException(new IOException(
"Connection refuced2"))).thenThrow(
new java.lang.reflect.UndeclaredThrowableException(new IOException(
"Connection refuced3")));
ClientServiceDelegate clientServiceDelegate = new ClientServiceDelegate(
conf, rmDelegate, oldJobId, historyServerProxy);
try {
clientServiceDelegate.getJobStatus(oldJobId);
Assert.fail("It should throw exception after retries");
} catch (IOException e) {
System.out.println("fail to get job status,and e=" + e.toString());
when(rmDelegate.getApplicationReport(jobId.getAppId())).thenThrow(
new java.lang.reflect.UndeclaredThrowableException(new IOException(
"Connection refuced1"))).thenThrow(
new java.lang.reflect.UndeclaredThrowableException(new IOException(
"Connection refuced2"))).thenThrow(
new java.lang.reflect.UndeclaredThrowableException(new IOException(
"Connection refuced3")));
ClientServiceDelegate clientServiceDelegate = new ClientServiceDelegate(
conf, rmDelegate, oldJobId, historyServerProxy);
try {
clientServiceDelegate.getJobStatus(oldJobId);
Assert.fail("It should throw exception after retries");
} catch (IOException e) {
System.out.println("fail to get job status,and e=" + e.toString());
}
verify(rmDelegate, times(noOfRetries)).getApplicationReport(
any(ApplicationId.class));
} catch (YarnRemoteException e) {
throw new IOException(e);
}
verify(rmDelegate, times(noOfRetries)).getApplicationReport(
any(ApplicationId.class));
}
private GetJobReportRequest getJobReportRequest() {
@ -429,10 +444,13 @@ public class TestClientServiceDelegate {
"N/A", 0.0f);
}
private ResourceMgrDelegate getRMDelegate() throws YarnRemoteException,
IOException {
private ResourceMgrDelegate getRMDelegate() throws IOException {
ResourceMgrDelegate rm = mock(ResourceMgrDelegate.class);
when(rm.getApplicationReport(jobId.getAppId())).thenReturn(null);
try {
when(rm.getApplicationReport(jobId.getAppId())).thenReturn(null);
} catch (YarnRemoteException e) {
throw new IOException(e);
}
return rm;
}

View File

@ -48,19 +48,21 @@ public class TestResourceMgrDelegate {
/**
* Tests that getRootQueues makes a request for the (recursive) child queues
* @throws YarnRemoteException
* @throws IOException
*/
@Test
public void testGetRootQueues() throws IOException, InterruptedException,
YarnRemoteException {
public void testGetRootQueues() throws IOException, InterruptedException {
final ClientRMProtocol applicationsManager = Mockito.mock(ClientRMProtocol.class);
GetQueueInfoResponse response = Mockito.mock(GetQueueInfoResponse.class);
org.apache.hadoop.yarn.api.records.QueueInfo queueInfo =
Mockito.mock(org.apache.hadoop.yarn.api.records.QueueInfo.class);
Mockito.when(response.getQueueInfo()).thenReturn(queueInfo);
Mockito.when(applicationsManager.getQueueInfo(Mockito.any(
GetQueueInfoRequest.class))).thenReturn(response);
try {
Mockito.when(applicationsManager.getQueueInfo(Mockito.any(
GetQueueInfoRequest.class))).thenReturn(response);
} catch (YarnRemoteException e) {
throw new IOException(e);
}
ResourceMgrDelegate delegate = new ResourceMgrDelegate(
new YarnConfiguration()) {
@ -73,8 +75,12 @@ public class TestResourceMgrDelegate {
ArgumentCaptor<GetQueueInfoRequest> argument =
ArgumentCaptor.forClass(GetQueueInfoRequest.class);
Mockito.verify(applicationsManager).getQueueInfo(
argument.capture());
try {
Mockito.verify(applicationsManager).getQueueInfo(
argument.capture());
} catch (YarnRemoteException e) {
throw new IOException(e);
}
Assert.assertTrue("Children of root queue not requested",
argument.getValue().getIncludeChildQueues());

View File

@ -175,6 +175,7 @@ public class TestMRJobClient extends ClusterMapReduceTestCase {
runTool(conf, jc, new String[] { "-kill-task", taid.toString() }, out);
fail(" this task should be killed");
} catch (IOException e) {
System.out.println(e);
// task completed
assertTrue(e.getMessage().contains("_0001_m_000000_1"));
}

View File

@ -22,7 +22,6 @@ import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.IOException;
import java.lang.reflect.UndeclaredThrowableException;
import java.net.InetSocketAddress;
import java.security.PrivilegedAction;
import java.security.PrivilegedExceptionAction;
@ -48,7 +47,6 @@ import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
import org.apache.hadoop.yarn.api.records.DelegationToken;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.util.ProtoUtils;
import org.apache.hadoop.yarn.util.Records;
@ -62,8 +60,7 @@ public class TestJHSSecurity {
private static final Log LOG = LogFactory.getLog(TestJHSSecurity.class);
@Test
public void testDelegationToken() throws IOException, InterruptedException,
YarnRemoteException {
public void testDelegationToken() throws IOException, InterruptedException {
Logger rootLogger = LogManager.getRootLogger();
rootLogger.setLevel(Level.DEBUG);
@ -124,7 +121,7 @@ public class TestJHSSecurity {
jobReportRequest.setJobId(MRBuilderUtils.newJobId(123456, 1, 1));
try {
clientUsingDT.getJobReport(jobReportRequest);
} catch (YarnRemoteException e) {
} catch (IOException e) {
Assert.assertEquals("Unknown job job_123456_0001", e.getMessage());
}
@ -147,7 +144,7 @@ public class TestJHSSecurity {
// Valid token because of renewal.
try {
clientUsingDT.getJobReport(jobReportRequest);
} catch (UndeclaredThrowableException e) {
} catch (IOException e) {
Assert.assertEquals("Unknown job job_123456_0001", e.getMessage());
}
@ -161,7 +158,7 @@ public class TestJHSSecurity {
try {
clientUsingDT.getJobReport(jobReportRequest);
fail("Should not have succeeded with an expired token");
} catch (UndeclaredThrowableException e) {
} catch (IOException e) {
assertTrue(e.getCause().getMessage().contains("is expired"));
}
@ -183,7 +180,7 @@ public class TestJHSSecurity {
try {
clientUsingDT.getJobReport(jobReportRequest);
} catch (UndeclaredThrowableException e) {
} catch (IOException e) {
fail("Unexpected exception" + e);
}
cancelDelegationToken(loggedInUser, hsService, token);
@ -200,7 +197,7 @@ public class TestJHSSecurity {
try {
clientUsingDT.getJobReport(jobReportRequest);
fail("Should not have succeeded with a cancelled delegation token");
} catch (UndeclaredThrowableException e) {
} catch (IOException e) {
}
@ -219,7 +216,7 @@ public class TestJHSSecurity {
DelegationToken token = loggedInUser
.doAs(new PrivilegedExceptionAction<DelegationToken>() {
@Override
public DelegationToken run() throws YarnRemoteException {
public DelegationToken run() throws IOException {
GetDelegationTokenRequest request = Records
.newRecord(GetDelegationTokenRequest.class);
request.setRenewer(renewerString);
@ -236,7 +233,7 @@ public class TestJHSSecurity {
long nextExpTime = loggedInUser.doAs(new PrivilegedExceptionAction<Long>() {
@Override
public Long run() throws YarnRemoteException {
public Long run() throws IOException {
RenewDelegationTokenRequest request = Records
.newRecord(RenewDelegationTokenRequest.class);
request.setDelegationToken(dToken);
@ -252,7 +249,7 @@ public class TestJHSSecurity {
loggedInUser.doAs(new PrivilegedExceptionAction<Void>() {
@Override
public Void run() throws YarnRemoteException {
public Void run() throws IOException {
CancelDelegationTokenRequest request = Records
.newRecord(CancelDelegationTokenRequest.class);
request.setDelegationToken(dToken);

View File

@ -46,7 +46,6 @@ import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.util.BuilderUtils;
@ -115,7 +114,7 @@ public class TestMRJobsWithHistoryService {
@Test (timeout = 30000)
public void testJobHistoryData() throws IOException, InterruptedException,
AvroRemoteException, ClassNotFoundException, YarnRemoteException {
AvroRemoteException, ClassNotFoundException {
if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) {
LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR
+ " not found. Not running test.");