MAPREDUCE-6160. Potential NullPointerException in MRClientProtocol interface implementation. Contributed by Rohith

This commit is contained in:
Jason Lowe 2014-12-01 22:39:22 +00:00
parent 0f9528b99a
commit 0c588904f8
5 changed files with 66 additions and 18 deletions

View File

@ -258,6 +258,9 @@ Release 2.7.0 - UNRELEASED
MAPREDUCE-6172. TestDbClasses timeouts are too aggressive (Varun Saxena
via jlowe)
MAPREDUCE-6160. Potential NullPointerException in MRClientProtocol
interface implementation. (Rohith via jlowe)
Release 2.6.0 - 2014-11-18
INCOMPATIBLE CHANGES

View File

@ -184,11 +184,14 @@ public class MRClientService extends AbstractService implements ClientService {
return getBindAddress();
}
private Job verifyAndGetJob(JobId jobID,
JobACL accessType) throws IOException {
private Job verifyAndGetJob(JobId jobID, JobACL accessType,
boolean exceptionThrow) throws IOException {
Job job = appContext.getJob(jobID);
if (job == null && exceptionThrow) {
throw new IOException("Unknown Job " + jobID);
}
UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
if (!job.checkAccess(ugi, accessType)) {
if (job != null && !job.checkAccess(ugi, accessType)) {
throw new AccessControlException("User " + ugi.getShortUserName()
+ " cannot perform operation " + accessType.name() + " on "
+ jobID);
@ -198,8 +201,8 @@ public class MRClientService extends AbstractService implements ClientService {
private Task verifyAndGetTask(TaskId taskID,
JobACL accessType) throws IOException {
Task task = verifyAndGetJob(taskID.getJobId(),
accessType).getTask(taskID);
Task task =
verifyAndGetJob(taskID.getJobId(), accessType, true).getTask(taskID);
if (task == null) {
throw new IOException("Unknown Task " + taskID);
}
@ -220,7 +223,7 @@ public class MRClientService extends AbstractService implements ClientService {
public GetCountersResponse getCounters(GetCountersRequest request)
throws IOException {
JobId jobId = request.getJobId();
Job job = verifyAndGetJob(jobId, JobACL.VIEW_JOB);
Job job = verifyAndGetJob(jobId, JobACL.VIEW_JOB, true);
GetCountersResponse response =
recordFactory.newRecordInstance(GetCountersResponse.class);
response.setCounters(TypeConverter.toYarn(job.getAllCounters()));
@ -231,7 +234,8 @@ public class MRClientService extends AbstractService implements ClientService {
public GetJobReportResponse getJobReport(GetJobReportRequest request)
throws IOException {
JobId jobId = request.getJobId();
Job job = verifyAndGetJob(jobId, JobACL.VIEW_JOB);
// false is for retain compatibility
Job job = verifyAndGetJob(jobId, JobACL.VIEW_JOB, false);
GetJobReportResponse response =
recordFactory.newRecordInstance(GetJobReportResponse.class);
if (job != null) {
@ -272,7 +276,7 @@ public class MRClientService extends AbstractService implements ClientService {
JobId jobId = request.getJobId();
int fromEventId = request.getFromEventId();
int maxEvents = request.getMaxEvents();
Job job = verifyAndGetJob(jobId, JobACL.VIEW_JOB);
Job job = verifyAndGetJob(jobId, JobACL.VIEW_JOB, true);
GetTaskAttemptCompletionEventsResponse response =
recordFactory.newRecordInstance(GetTaskAttemptCompletionEventsResponse.class);
@ -290,7 +294,7 @@ public class MRClientService extends AbstractService implements ClientService {
String message = "Kill job " + jobId + " received from " + callerUGI
+ " at " + Server.getRemoteAddress();
LOG.info(message);
verifyAndGetJob(jobId, JobACL.MODIFY_JOB);
verifyAndGetJob(jobId, JobACL.MODIFY_JOB, false);
appContext.getEventHandler().handle(
new JobDiagnosticsUpdateEvent(jobId, message));
appContext.getEventHandler().handle(
@ -382,7 +386,7 @@ public class MRClientService extends AbstractService implements ClientService {
GetTaskReportsResponse response =
recordFactory.newRecordInstance(GetTaskReportsResponse.class);
Job job = verifyAndGetJob(jobId, JobACL.VIEW_JOB);
Job job = verifyAndGetJob(jobId, JobACL.VIEW_JOB, true);
Collection<Task> tasks = job.getTasks(taskType).values();
LOG.info("Getting task report for " + taskType + " " + jobId
+ ". Report-size will be " + tasks.size());

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.mapreduce.v2.app;
import static org.junit.Assert.fail;
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.Iterator;
import java.util.List;
@ -28,8 +29,10 @@ import org.junit.Assert;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.JobACL;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.FailTaskAttemptRequest;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetCountersRequest;
@ -179,6 +182,19 @@ public class TestMRClientService {
TaskAttemptEventType.TA_DONE));
app.waitForState(job, JobState.SUCCEEDED);
// For invalid jobid, throw IOException
gtreportsRequest =
recordFactory.newRecordInstance(GetTaskReportsRequest.class);
gtreportsRequest.setJobId(TypeConverter.toYarn(JobID
.forName("job_1415730144495_0001")));
gtreportsRequest.setTaskType(TaskType.REDUCE);
try {
proxy.getTaskReports(gtreportsRequest);
fail("IOException not thrown for invalid job id");
} catch (IOException e) {
// Expected
}
}
@Test

View File

@ -196,7 +196,8 @@ public class HistoryClientService extends AbstractService {
return getBindAddress();
}
private Job verifyAndGetJob(final JobId jobID) throws IOException {
private Job verifyAndGetJob(final JobId jobID, boolean exceptionThrow)
throws IOException {
UserGroupInformation loginUgi = null;
Job job = null;
try {
@ -212,6 +213,11 @@ public class HistoryClientService extends AbstractService {
} catch (InterruptedException e) {
throw new IOException(e);
}
if (job == null && exceptionThrow) {
throw new IOException("Unknown Job " + jobID);
}
if (job != null) {
JobACL operation = JobACL.VIEW_JOB;
checkAccess(job, operation);
@ -223,7 +229,7 @@ public class HistoryClientService extends AbstractService {
public GetCountersResponse getCounters(GetCountersRequest request)
throws IOException {
JobId jobId = request.getJobId();
Job job = verifyAndGetJob(jobId);
Job job = verifyAndGetJob(jobId, true);
GetCountersResponse response = recordFactory.newRecordInstance(GetCountersResponse.class);
response.setCounters(TypeConverter.toYarn(job.getAllCounters()));
return response;
@ -233,7 +239,7 @@ public class HistoryClientService extends AbstractService {
public GetJobReportResponse getJobReport(GetJobReportRequest request)
throws IOException {
JobId jobId = request.getJobId();
Job job = verifyAndGetJob(jobId);
Job job = verifyAndGetJob(jobId, false);
GetJobReportResponse response = recordFactory.newRecordInstance(GetJobReportResponse.class);
if (job != null) {
response.setJobReport(job.getReport());
@ -248,7 +254,7 @@ public class HistoryClientService extends AbstractService {
public GetTaskAttemptReportResponse getTaskAttemptReport(
GetTaskAttemptReportRequest request) throws IOException {
TaskAttemptId taskAttemptId = request.getTaskAttemptId();
Job job = verifyAndGetJob(taskAttemptId.getTaskId().getJobId());
Job job = verifyAndGetJob(taskAttemptId.getTaskId().getJobId(), true);
GetTaskAttemptReportResponse response = recordFactory.newRecordInstance(GetTaskAttemptReportResponse.class);
response.setTaskAttemptReport(job.getTask(taskAttemptId.getTaskId()).getAttempt(taskAttemptId).getReport());
return response;
@ -258,7 +264,7 @@ public class HistoryClientService extends AbstractService {
public GetTaskReportResponse getTaskReport(GetTaskReportRequest request)
throws IOException {
TaskId taskId = request.getTaskId();
Job job = verifyAndGetJob(taskId.getJobId());
Job job = verifyAndGetJob(taskId.getJobId(), true);
GetTaskReportResponse response = recordFactory.newRecordInstance(GetTaskReportResponse.class);
response.setTaskReport(job.getTask(taskId).getReport());
return response;
@ -272,7 +278,7 @@ public class HistoryClientService extends AbstractService {
int fromEventId = request.getFromEventId();
int maxEvents = request.getMaxEvents();
Job job = verifyAndGetJob(jobId);
Job job = verifyAndGetJob(jobId, true);
GetTaskAttemptCompletionEventsResponse response = recordFactory.newRecordInstance(GetTaskAttemptCompletionEventsResponse.class);
response.addAllCompletionEvents(Arrays.asList(job.getTaskAttemptCompletionEvents(fromEventId, maxEvents)));
return response;
@ -300,7 +306,7 @@ public class HistoryClientService extends AbstractService {
throws IOException {
TaskAttemptId taskAttemptId = request.getTaskAttemptId();
Job job = verifyAndGetJob(taskAttemptId.getTaskId().getJobId());
Job job = verifyAndGetJob(taskAttemptId.getTaskId().getJobId(), true);
GetDiagnosticsResponse response = recordFactory.newRecordInstance(GetDiagnosticsResponse.class);
response.addAllDiagnostics(job.getTask(taskAttemptId.getTaskId()).getAttempt(taskAttemptId).getDiagnostics());
@ -320,7 +326,7 @@ public class HistoryClientService extends AbstractService {
TaskType taskType = request.getTaskType();
GetTaskReportsResponse response = recordFactory.newRecordInstance(GetTaskReportsResponse.class);
Job job = verifyAndGetJob(jobId);
Job job = verifyAndGetJob(jobId, true);
Collection<Task> tasks = job.getTasks(taskType).values();
for (Task task : tasks) {
response.addTaskReport(task.getReport());

View File

@ -19,11 +19,14 @@
package org.apache.hadoop.mapreduce.v2.hs;
import java.io.IOException;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.TaskCounter;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDiagnosticsRequest;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDiagnosticsResponse;
@ -33,11 +36,13 @@ import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskAttemptReportRe
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskAttemptReportResponse;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskReportRequest;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskReportResponse;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskReportsRequest;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.mapreduce.v2.app.MRApp;
import org.apache.hadoop.mapreduce.v2.app.job.Job;
import org.apache.hadoop.mapreduce.v2.app.job.Task;
@ -159,6 +164,20 @@ public class TestJobHistoryServer {
// Task state should be SUCCEEDED
assertEquals(TaskState.SUCCEEDED, reportResponse.getTaskReport()
.getTaskState());
// For invalid jobid, throw IOException
GetTaskReportsRequest gtreportsRequest =
recordFactory.newRecordInstance(GetTaskReportsRequest.class);
gtreportsRequest.setJobId(TypeConverter.toYarn(JobID
.forName("job_1415730144495_0001")));
gtreportsRequest.setTaskType(TaskType.REDUCE);
try {
protocol.getTaskReports(gtreportsRequest);
fail("IOException not thrown for invalid job id");
} catch (IOException e) {
// Expected
}
// test getTaskAttemptCompletionEvents
GetTaskAttemptCompletionEventsRequest taskAttemptRequest = recordFactory
.newRecordInstance(GetTaskAttemptCompletionEventsRequest.class);