MAPREDUCE-6160. Potential NullPointerException in MRClientProtocol interface implementation. Contributed by Rohith
(cherry picked from commit 0c588904f8
)
This commit is contained in:
parent
38ea1419f6
commit
b6d7b789bd
|
@ -31,6 +31,9 @@ Release 2.7.0 - UNRELEASED
|
|||
MAPREDUCE-6172. TestDbClasses timeouts are too aggressive (Varun Saxena
|
||||
via jlowe)
|
||||
|
||||
MAPREDUCE-6160. Potential NullPointerException in MRClientProtocol
|
||||
interface implementation. (Rohith via jlowe)
|
||||
|
||||
Release 2.6.0 - 2014-11-18
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
|
|
@ -184,11 +184,14 @@ public class MRClientService extends AbstractService implements ClientService {
|
|||
return getBindAddress();
|
||||
}
|
||||
|
||||
private Job verifyAndGetJob(JobId jobID,
|
||||
JobACL accessType) throws IOException {
|
||||
private Job verifyAndGetJob(JobId jobID, JobACL accessType,
|
||||
boolean exceptionThrow) throws IOException {
|
||||
Job job = appContext.getJob(jobID);
|
||||
if (job == null && exceptionThrow) {
|
||||
throw new IOException("Unknown Job " + jobID);
|
||||
}
|
||||
UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
|
||||
if (!job.checkAccess(ugi, accessType)) {
|
||||
if (job != null && !job.checkAccess(ugi, accessType)) {
|
||||
throw new AccessControlException("User " + ugi.getShortUserName()
|
||||
+ " cannot perform operation " + accessType.name() + " on "
|
||||
+ jobID);
|
||||
|
@ -198,8 +201,8 @@ public class MRClientService extends AbstractService implements ClientService {
|
|||
|
||||
private Task verifyAndGetTask(TaskId taskID,
|
||||
JobACL accessType) throws IOException {
|
||||
Task task = verifyAndGetJob(taskID.getJobId(),
|
||||
accessType).getTask(taskID);
|
||||
Task task =
|
||||
verifyAndGetJob(taskID.getJobId(), accessType, true).getTask(taskID);
|
||||
if (task == null) {
|
||||
throw new IOException("Unknown Task " + taskID);
|
||||
}
|
||||
|
@ -220,7 +223,7 @@ public class MRClientService extends AbstractService implements ClientService {
|
|||
public GetCountersResponse getCounters(GetCountersRequest request)
|
||||
throws IOException {
|
||||
JobId jobId = request.getJobId();
|
||||
Job job = verifyAndGetJob(jobId, JobACL.VIEW_JOB);
|
||||
Job job = verifyAndGetJob(jobId, JobACL.VIEW_JOB, true);
|
||||
GetCountersResponse response =
|
||||
recordFactory.newRecordInstance(GetCountersResponse.class);
|
||||
response.setCounters(TypeConverter.toYarn(job.getAllCounters()));
|
||||
|
@ -231,7 +234,8 @@ public class MRClientService extends AbstractService implements ClientService {
|
|||
public GetJobReportResponse getJobReport(GetJobReportRequest request)
|
||||
throws IOException {
|
||||
JobId jobId = request.getJobId();
|
||||
Job job = verifyAndGetJob(jobId, JobACL.VIEW_JOB);
|
||||
// false is for retain compatibility
|
||||
Job job = verifyAndGetJob(jobId, JobACL.VIEW_JOB, false);
|
||||
GetJobReportResponse response =
|
||||
recordFactory.newRecordInstance(GetJobReportResponse.class);
|
||||
if (job != null) {
|
||||
|
@ -272,7 +276,7 @@ public class MRClientService extends AbstractService implements ClientService {
|
|||
JobId jobId = request.getJobId();
|
||||
int fromEventId = request.getFromEventId();
|
||||
int maxEvents = request.getMaxEvents();
|
||||
Job job = verifyAndGetJob(jobId, JobACL.VIEW_JOB);
|
||||
Job job = verifyAndGetJob(jobId, JobACL.VIEW_JOB, true);
|
||||
|
||||
GetTaskAttemptCompletionEventsResponse response =
|
||||
recordFactory.newRecordInstance(GetTaskAttemptCompletionEventsResponse.class);
|
||||
|
@ -290,7 +294,7 @@ public class MRClientService extends AbstractService implements ClientService {
|
|||
String message = "Kill job " + jobId + " received from " + callerUGI
|
||||
+ " at " + Server.getRemoteAddress();
|
||||
LOG.info(message);
|
||||
verifyAndGetJob(jobId, JobACL.MODIFY_JOB);
|
||||
verifyAndGetJob(jobId, JobACL.MODIFY_JOB, false);
|
||||
appContext.getEventHandler().handle(
|
||||
new JobDiagnosticsUpdateEvent(jobId, message));
|
||||
appContext.getEventHandler().handle(
|
||||
|
@ -382,7 +386,7 @@ public class MRClientService extends AbstractService implements ClientService {
|
|||
GetTaskReportsResponse response =
|
||||
recordFactory.newRecordInstance(GetTaskReportsResponse.class);
|
||||
|
||||
Job job = verifyAndGetJob(jobId, JobACL.VIEW_JOB);
|
||||
Job job = verifyAndGetJob(jobId, JobACL.VIEW_JOB, true);
|
||||
Collection<Task> tasks = job.getTasks(taskType).values();
|
||||
LOG.info("Getting task report for " + taskType + " " + jobId
|
||||
+ ". Report-size will be " + tasks.size());
|
||||
|
|
|
@ -20,6 +20,7 @@ package org.apache.hadoop.mapreduce.v2.app;
|
|||
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.security.PrivilegedExceptionAction;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
|
@ -28,8 +29,10 @@ import org.junit.Assert;
|
|||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.mapreduce.JobACL;
|
||||
import org.apache.hadoop.mapreduce.JobID;
|
||||
import org.apache.hadoop.mapreduce.MRConfig;
|
||||
import org.apache.hadoop.mapreduce.MRJobConfig;
|
||||
import org.apache.hadoop.mapreduce.TypeConverter;
|
||||
import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol;
|
||||
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.FailTaskAttemptRequest;
|
||||
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetCountersRequest;
|
||||
|
@ -179,6 +182,19 @@ public class TestMRClientService {
|
|||
TaskAttemptEventType.TA_DONE));
|
||||
|
||||
app.waitForState(job, JobState.SUCCEEDED);
|
||||
|
||||
// For invalid jobid, throw IOException
|
||||
gtreportsRequest =
|
||||
recordFactory.newRecordInstance(GetTaskReportsRequest.class);
|
||||
gtreportsRequest.setJobId(TypeConverter.toYarn(JobID
|
||||
.forName("job_1415730144495_0001")));
|
||||
gtreportsRequest.setTaskType(TaskType.REDUCE);
|
||||
try {
|
||||
proxy.getTaskReports(gtreportsRequest);
|
||||
fail("IOException not thrown for invalid job id");
|
||||
} catch (IOException e) {
|
||||
// Expected
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
|
@ -196,7 +196,8 @@ public class HistoryClientService extends AbstractService {
|
|||
return getBindAddress();
|
||||
}
|
||||
|
||||
private Job verifyAndGetJob(final JobId jobID) throws IOException {
|
||||
private Job verifyAndGetJob(final JobId jobID, boolean exceptionThrow)
|
||||
throws IOException {
|
||||
UserGroupInformation loginUgi = null;
|
||||
Job job = null;
|
||||
try {
|
||||
|
@ -212,6 +213,11 @@ public class HistoryClientService extends AbstractService {
|
|||
} catch (InterruptedException e) {
|
||||
throw new IOException(e);
|
||||
}
|
||||
|
||||
if (job == null && exceptionThrow) {
|
||||
throw new IOException("Unknown Job " + jobID);
|
||||
}
|
||||
|
||||
if (job != null) {
|
||||
JobACL operation = JobACL.VIEW_JOB;
|
||||
checkAccess(job, operation);
|
||||
|
@ -223,7 +229,7 @@ public class HistoryClientService extends AbstractService {
|
|||
public GetCountersResponse getCounters(GetCountersRequest request)
|
||||
throws IOException {
|
||||
JobId jobId = request.getJobId();
|
||||
Job job = verifyAndGetJob(jobId);
|
||||
Job job = verifyAndGetJob(jobId, true);
|
||||
GetCountersResponse response = recordFactory.newRecordInstance(GetCountersResponse.class);
|
||||
response.setCounters(TypeConverter.toYarn(job.getAllCounters()));
|
||||
return response;
|
||||
|
@ -233,7 +239,7 @@ public class HistoryClientService extends AbstractService {
|
|||
public GetJobReportResponse getJobReport(GetJobReportRequest request)
|
||||
throws IOException {
|
||||
JobId jobId = request.getJobId();
|
||||
Job job = verifyAndGetJob(jobId);
|
||||
Job job = verifyAndGetJob(jobId, false);
|
||||
GetJobReportResponse response = recordFactory.newRecordInstance(GetJobReportResponse.class);
|
||||
if (job != null) {
|
||||
response.setJobReport(job.getReport());
|
||||
|
@ -248,7 +254,7 @@ public class HistoryClientService extends AbstractService {
|
|||
public GetTaskAttemptReportResponse getTaskAttemptReport(
|
||||
GetTaskAttemptReportRequest request) throws IOException {
|
||||
TaskAttemptId taskAttemptId = request.getTaskAttemptId();
|
||||
Job job = verifyAndGetJob(taskAttemptId.getTaskId().getJobId());
|
||||
Job job = verifyAndGetJob(taskAttemptId.getTaskId().getJobId(), true);
|
||||
GetTaskAttemptReportResponse response = recordFactory.newRecordInstance(GetTaskAttemptReportResponse.class);
|
||||
response.setTaskAttemptReport(job.getTask(taskAttemptId.getTaskId()).getAttempt(taskAttemptId).getReport());
|
||||
return response;
|
||||
|
@ -258,7 +264,7 @@ public class HistoryClientService extends AbstractService {
|
|||
public GetTaskReportResponse getTaskReport(GetTaskReportRequest request)
|
||||
throws IOException {
|
||||
TaskId taskId = request.getTaskId();
|
||||
Job job = verifyAndGetJob(taskId.getJobId());
|
||||
Job job = verifyAndGetJob(taskId.getJobId(), true);
|
||||
GetTaskReportResponse response = recordFactory.newRecordInstance(GetTaskReportResponse.class);
|
||||
response.setTaskReport(job.getTask(taskId).getReport());
|
||||
return response;
|
||||
|
@ -272,7 +278,7 @@ public class HistoryClientService extends AbstractService {
|
|||
int fromEventId = request.getFromEventId();
|
||||
int maxEvents = request.getMaxEvents();
|
||||
|
||||
Job job = verifyAndGetJob(jobId);
|
||||
Job job = verifyAndGetJob(jobId, true);
|
||||
GetTaskAttemptCompletionEventsResponse response = recordFactory.newRecordInstance(GetTaskAttemptCompletionEventsResponse.class);
|
||||
response.addAllCompletionEvents(Arrays.asList(job.getTaskAttemptCompletionEvents(fromEventId, maxEvents)));
|
||||
return response;
|
||||
|
@ -300,7 +306,7 @@ public class HistoryClientService extends AbstractService {
|
|||
throws IOException {
|
||||
TaskAttemptId taskAttemptId = request.getTaskAttemptId();
|
||||
|
||||
Job job = verifyAndGetJob(taskAttemptId.getTaskId().getJobId());
|
||||
Job job = verifyAndGetJob(taskAttemptId.getTaskId().getJobId(), true);
|
||||
|
||||
GetDiagnosticsResponse response = recordFactory.newRecordInstance(GetDiagnosticsResponse.class);
|
||||
response.addAllDiagnostics(job.getTask(taskAttemptId.getTaskId()).getAttempt(taskAttemptId).getDiagnostics());
|
||||
|
@ -320,7 +326,7 @@ public class HistoryClientService extends AbstractService {
|
|||
TaskType taskType = request.getTaskType();
|
||||
|
||||
GetTaskReportsResponse response = recordFactory.newRecordInstance(GetTaskReportsResponse.class);
|
||||
Job job = verifyAndGetJob(jobId);
|
||||
Job job = verifyAndGetJob(jobId, true);
|
||||
Collection<Task> tasks = job.getTasks(taskType).values();
|
||||
for (Task task : tasks) {
|
||||
response.addTaskReport(task.getReport());
|
||||
|
|
|
@ -19,11 +19,14 @@
|
|||
package org.apache.hadoop.mapreduce.v2.hs;
|
||||
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
||||
import org.apache.hadoop.mapreduce.JobID;
|
||||
import org.apache.hadoop.mapreduce.TaskCounter;
|
||||
import org.apache.hadoop.mapreduce.TypeConverter;
|
||||
import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol;
|
||||
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDiagnosticsRequest;
|
||||
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDiagnosticsResponse;
|
||||
|
@ -33,11 +36,13 @@ import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskAttemptReportRe
|
|||
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskAttemptReportResponse;
|
||||
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskReportRequest;
|
||||
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskReportResponse;
|
||||
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskReportsRequest;
|
||||
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
|
||||
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
|
||||
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
|
||||
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
|
||||
import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
|
||||
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
|
||||
import org.apache.hadoop.mapreduce.v2.app.MRApp;
|
||||
import org.apache.hadoop.mapreduce.v2.app.job.Job;
|
||||
import org.apache.hadoop.mapreduce.v2.app.job.Task;
|
||||
|
@ -159,6 +164,20 @@ public class TestJobHistoryServer {
|
|||
// Task state should be SUCCEEDED
|
||||
assertEquals(TaskState.SUCCEEDED, reportResponse.getTaskReport()
|
||||
.getTaskState());
|
||||
|
||||
// For invalid jobid, throw IOException
|
||||
GetTaskReportsRequest gtreportsRequest =
|
||||
recordFactory.newRecordInstance(GetTaskReportsRequest.class);
|
||||
gtreportsRequest.setJobId(TypeConverter.toYarn(JobID
|
||||
.forName("job_1415730144495_0001")));
|
||||
gtreportsRequest.setTaskType(TaskType.REDUCE);
|
||||
try {
|
||||
protocol.getTaskReports(gtreportsRequest);
|
||||
fail("IOException not thrown for invalid job id");
|
||||
} catch (IOException e) {
|
||||
// Expected
|
||||
}
|
||||
|
||||
// test getTaskAttemptCompletionEvents
|
||||
GetTaskAttemptCompletionEventsRequest taskAttemptRequest = recordFactory
|
||||
.newRecordInstance(GetTaskAttemptCompletionEventsRequest.class);
|
||||
|
|
Loading…
Reference in New Issue