MAPREDUCE-5475. MRClientService does not verify ACLs properly. Contributed by Jason Lowe

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1520156 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Jason Darrell Lowe 2013-09-04 22:23:40 +00:00
parent 49afc64cd4
commit 5540d77e2f
3 changed files with 131 additions and 22 deletions

View File

@ -251,6 +251,8 @@ Release 2.1.1-beta - UNRELEASED
commands to reboot, so that client can continue to track the overall job. commands to reboot, so that client can continue to track the overall job.
(Jian He via vinodkv) (Jian He via vinodkv)
MAPREDUCE-5475. MRClientService does not verify ACLs properly (jlowe)
Release 2.1.0-beta - 2013-08-22 Release 2.1.0-beta - 2013-08-22
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES
@ -1329,6 +1331,8 @@ Release 0.23.10 - UNRELEASED
MAPREDUCE-5001. LocalJobRunner has race condition resulting in job MAPREDUCE-5001. LocalJobRunner has race condition resulting in job
failures (Sandy Ryza via jlowe) failures (Sandy Ryza via jlowe)
MAPREDUCE-5475. MRClientService does not verify ACLs properly (jlowe)
Release 0.23.9 - 2013-07-08 Release 0.23.9 - 2013-07-08
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -28,6 +28,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.ipc.Server; import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.mapreduce.JobACL;
import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.TypeConverter; import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol; import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol;
@ -78,6 +79,8 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
import org.apache.hadoop.mapreduce.v2.app.security.authorize.MRAMPolicyProvider; import org.apache.hadoop.mapreduce.v2.app.security.authorize.MRAMPolicyProvider;
import org.apache.hadoop.mapreduce.v2.app.webapp.AMWebApp; import org.apache.hadoop.mapreduce.v2.app.webapp.AMWebApp;
import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.PolicyProvider; import org.apache.hadoop.security.authorize.PolicyProvider;
import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factories.RecordFactory;
@ -175,16 +178,22 @@ public class MRClientService extends AbstractService
return getBindAddress(); return getBindAddress();
} }
private Job verifyAndGetJob(JobId jobID, private Job verifyAndGetJob(JobId jobID,
boolean modifyAccess) throws IOException { JobACL accessType) throws IOException {
Job job = appContext.getJob(jobID); Job job = appContext.getJob(jobID);
UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
if (!job.checkAccess(ugi, accessType)) {
throw new AccessControlException("User " + ugi.getShortUserName()
+ " cannot perform operation " + accessType.name() + " on "
+ jobID);
}
return job; return job;
} }
private Task verifyAndGetTask(TaskId taskID, private Task verifyAndGetTask(TaskId taskID,
boolean modifyAccess) throws IOException { JobACL accessType) throws IOException {
Task task = verifyAndGetJob(taskID.getJobId(), Task task = verifyAndGetJob(taskID.getJobId(),
modifyAccess).getTask(taskID); accessType).getTask(taskID);
if (task == null) { if (task == null) {
throw new IOException("Unknown Task " + taskID); throw new IOException("Unknown Task " + taskID);
} }
@ -192,9 +201,9 @@ public class MRClientService extends AbstractService
} }
private TaskAttempt verifyAndGetAttempt(TaskAttemptId attemptID, private TaskAttempt verifyAndGetAttempt(TaskAttemptId attemptID,
boolean modifyAccess) throws IOException { JobACL accessType) throws IOException {
TaskAttempt attempt = verifyAndGetTask(attemptID.getTaskId(), TaskAttempt attempt = verifyAndGetTask(attemptID.getTaskId(),
modifyAccess).getAttempt(attemptID); accessType).getAttempt(attemptID);
if (attempt == null) { if (attempt == null) {
throw new IOException("Unknown TaskAttempt " + attemptID); throw new IOException("Unknown TaskAttempt " + attemptID);
} }
@ -205,7 +214,7 @@ public class MRClientService extends AbstractService
public GetCountersResponse getCounters(GetCountersRequest request) public GetCountersResponse getCounters(GetCountersRequest request)
throws IOException { throws IOException {
JobId jobId = request.getJobId(); JobId jobId = request.getJobId();
Job job = verifyAndGetJob(jobId, false); Job job = verifyAndGetJob(jobId, JobACL.VIEW_JOB);
GetCountersResponse response = GetCountersResponse response =
recordFactory.newRecordInstance(GetCountersResponse.class); recordFactory.newRecordInstance(GetCountersResponse.class);
response.setCounters(TypeConverter.toYarn(job.getAllCounters())); response.setCounters(TypeConverter.toYarn(job.getAllCounters()));
@ -216,7 +225,7 @@ public class MRClientService extends AbstractService
public GetJobReportResponse getJobReport(GetJobReportRequest request) public GetJobReportResponse getJobReport(GetJobReportRequest request)
throws IOException { throws IOException {
JobId jobId = request.getJobId(); JobId jobId = request.getJobId();
Job job = verifyAndGetJob(jobId, false); Job job = verifyAndGetJob(jobId, JobACL.VIEW_JOB);
GetJobReportResponse response = GetJobReportResponse response =
recordFactory.newRecordInstance(GetJobReportResponse.class); recordFactory.newRecordInstance(GetJobReportResponse.class);
if (job != null) { if (job != null) {
@ -235,7 +244,7 @@ public class MRClientService extends AbstractService
GetTaskAttemptReportResponse response = GetTaskAttemptReportResponse response =
recordFactory.newRecordInstance(GetTaskAttemptReportResponse.class); recordFactory.newRecordInstance(GetTaskAttemptReportResponse.class);
response.setTaskAttemptReport( response.setTaskAttemptReport(
verifyAndGetAttempt(taskAttemptId, false).getReport()); verifyAndGetAttempt(taskAttemptId, JobACL.VIEW_JOB).getReport());
return response; return response;
} }
@ -245,7 +254,8 @@ public class MRClientService extends AbstractService
TaskId taskId = request.getTaskId(); TaskId taskId = request.getTaskId();
GetTaskReportResponse response = GetTaskReportResponse response =
recordFactory.newRecordInstance(GetTaskReportResponse.class); recordFactory.newRecordInstance(GetTaskReportResponse.class);
response.setTaskReport(verifyAndGetTask(taskId, false).getReport()); response.setTaskReport(
verifyAndGetTask(taskId, JobACL.VIEW_JOB).getReport());
return response; return response;
} }
@ -256,7 +266,7 @@ public class MRClientService extends AbstractService
JobId jobId = request.getJobId(); JobId jobId = request.getJobId();
int fromEventId = request.getFromEventId(); int fromEventId = request.getFromEventId();
int maxEvents = request.getMaxEvents(); int maxEvents = request.getMaxEvents();
Job job = verifyAndGetJob(jobId, false); Job job = verifyAndGetJob(jobId, JobACL.VIEW_JOB);
GetTaskAttemptCompletionEventsResponse response = GetTaskAttemptCompletionEventsResponse response =
recordFactory.newRecordInstance(GetTaskAttemptCompletionEventsResponse.class); recordFactory.newRecordInstance(GetTaskAttemptCompletionEventsResponse.class);
@ -270,9 +280,11 @@ public class MRClientService extends AbstractService
public KillJobResponse killJob(KillJobRequest request) public KillJobResponse killJob(KillJobRequest request)
throws IOException { throws IOException {
JobId jobId = request.getJobId(); JobId jobId = request.getJobId();
String message = "Kill Job received from client " + jobId; UserGroupInformation callerUGI = UserGroupInformation.getCurrentUser();
String message = "Kill job " + jobId + " received from " + callerUGI
+ " at " + Server.getRemoteAddress();
LOG.info(message); LOG.info(message);
verifyAndGetJob(jobId, true); verifyAndGetJob(jobId, JobACL.MODIFY_JOB);
appContext.getEventHandler().handle( appContext.getEventHandler().handle(
new JobDiagnosticsUpdateEvent(jobId, message)); new JobDiagnosticsUpdateEvent(jobId, message));
appContext.getEventHandler().handle( appContext.getEventHandler().handle(
@ -287,9 +299,11 @@ public class MRClientService extends AbstractService
public KillTaskResponse killTask(KillTaskRequest request) public KillTaskResponse killTask(KillTaskRequest request)
throws IOException { throws IOException {
TaskId taskId = request.getTaskId(); TaskId taskId = request.getTaskId();
String message = "Kill task received from client " + taskId; UserGroupInformation callerUGI = UserGroupInformation.getCurrentUser();
String message = "Kill task " + taskId + " received from " + callerUGI
+ " at " + Server.getRemoteAddress();
LOG.info(message); LOG.info(message);
verifyAndGetTask(taskId, true); verifyAndGetTask(taskId, JobACL.MODIFY_JOB);
appContext.getEventHandler().handle( appContext.getEventHandler().handle(
new TaskEvent(taskId, TaskEventType.T_KILL)); new TaskEvent(taskId, TaskEventType.T_KILL));
KillTaskResponse response = KillTaskResponse response =
@ -302,9 +316,12 @@ public class MRClientService extends AbstractService
public KillTaskAttemptResponse killTaskAttempt( public KillTaskAttemptResponse killTaskAttempt(
KillTaskAttemptRequest request) throws IOException { KillTaskAttemptRequest request) throws IOException {
TaskAttemptId taskAttemptId = request.getTaskAttemptId(); TaskAttemptId taskAttemptId = request.getTaskAttemptId();
String message = "Kill task attempt received from client " + taskAttemptId; UserGroupInformation callerUGI = UserGroupInformation.getCurrentUser();
String message = "Kill task attempt " + taskAttemptId
+ " received from " + callerUGI + " at "
+ Server.getRemoteAddress();
LOG.info(message); LOG.info(message);
verifyAndGetAttempt(taskAttemptId, true); verifyAndGetAttempt(taskAttemptId, JobACL.MODIFY_JOB);
appContext.getEventHandler().handle( appContext.getEventHandler().handle(
new TaskAttemptDiagnosticsUpdateEvent(taskAttemptId, message)); new TaskAttemptDiagnosticsUpdateEvent(taskAttemptId, message));
appContext.getEventHandler().handle( appContext.getEventHandler().handle(
@ -322,8 +339,8 @@ public class MRClientService extends AbstractService
GetDiagnosticsResponse response = GetDiagnosticsResponse response =
recordFactory.newRecordInstance(GetDiagnosticsResponse.class); recordFactory.newRecordInstance(GetDiagnosticsResponse.class);
response.addAllDiagnostics( response.addAllDiagnostics(verifyAndGetAttempt(taskAttemptId,
verifyAndGetAttempt(taskAttemptId, false).getDiagnostics()); JobACL.VIEW_JOB).getDiagnostics());
return response; return response;
} }
@ -332,9 +349,12 @@ public class MRClientService extends AbstractService
public FailTaskAttemptResponse failTaskAttempt( public FailTaskAttemptResponse failTaskAttempt(
FailTaskAttemptRequest request) throws IOException { FailTaskAttemptRequest request) throws IOException {
TaskAttemptId taskAttemptId = request.getTaskAttemptId(); TaskAttemptId taskAttemptId = request.getTaskAttemptId();
String message = "Fail task attempt received from client " + taskAttemptId; UserGroupInformation callerUGI = UserGroupInformation.getCurrentUser();
String message = "Fail task attempt " + taskAttemptId
+ " received from " + callerUGI + " at "
+ Server.getRemoteAddress();
LOG.info(message); LOG.info(message);
verifyAndGetAttempt(taskAttemptId, true); verifyAndGetAttempt(taskAttemptId, JobACL.MODIFY_JOB);
appContext.getEventHandler().handle( appContext.getEventHandler().handle(
new TaskAttemptDiagnosticsUpdateEvent(taskAttemptId, message)); new TaskAttemptDiagnosticsUpdateEvent(taskAttemptId, message));
appContext.getEventHandler().handle( appContext.getEventHandler().handle(
@ -356,7 +376,7 @@ public class MRClientService extends AbstractService
GetTaskReportsResponse response = GetTaskReportsResponse response =
recordFactory.newRecordInstance(GetTaskReportsResponse.class); recordFactory.newRecordInstance(GetTaskReportsResponse.class);
Job job = verifyAndGetJob(jobId, false); Job job = verifyAndGetJob(jobId, JobACL.VIEW_JOB);
Collection<Task> tasks = job.getTasks(taskType).values(); Collection<Task> tasks = job.getTasks(taskType).values();
LOG.info("Getting task report for " + taskType + " " + jobId LOG.info("Getting task report for " + taskType + " " + jobId
+ ". Report-size will be " + tasks.size()); + ". Report-size will be " + tasks.size());

View File

@ -18,13 +18,20 @@
package org.apache.hadoop.mapreduce.v2.app; package org.apache.hadoop.mapreduce.v2.app;
import static org.junit.Assert.fail;
import java.security.PrivilegedExceptionAction;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import junit.framework.Assert; import junit.framework.Assert;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.JobACL;
import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol; import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.FailTaskAttemptRequest;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetCountersRequest; import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetCountersRequest;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDiagnosticsRequest; import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDiagnosticsRequest;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetJobReportRequest; import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetJobReportRequest;
@ -32,6 +39,9 @@ import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskAttemptCompleti
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskAttemptReportRequest; import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskAttemptReportRequest;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskReportRequest; import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskReportRequest;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskReportsRequest; import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetTaskReportsRequest;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.KillJobRequest;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.KillTaskAttemptRequest;
import org.apache.hadoop.mapreduce.v2.api.protocolrecords.KillTaskRequest;
import org.apache.hadoop.mapreduce.v2.api.records.AMInfo; import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
import org.apache.hadoop.mapreduce.v2.api.records.JobReport; import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
import org.apache.hadoop.mapreduce.v2.api.records.JobState; import org.apache.hadoop.mapreduce.v2.api.records.JobState;
@ -51,6 +61,8 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent.TaskAttemptStatus; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent.TaskAttemptStatus;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.ipc.YarnRPC; import org.apache.hadoop.yarn.ipc.YarnRPC;
@ -169,6 +181,79 @@ public class TestMRClientService {
app.waitForState(job, JobState.SUCCEEDED); app.waitForState(job, JobState.SUCCEEDED);
} }
@Test
public void testViewAclOnlyCannotModify() throws Exception {
final MRAppWithClientService app = new MRAppWithClientService(1, 0, false);
final Configuration conf = new Configuration();
conf.setBoolean(MRConfig.MR_ACLS_ENABLED, true);
conf.set(MRJobConfig.JOB_ACL_VIEW_JOB, "viewonlyuser");
Job job = app.submit(conf);
app.waitForState(job, JobState.RUNNING);
Assert.assertEquals("Num tasks not correct", 1, job.getTasks().size());
Iterator<Task> it = job.getTasks().values().iterator();
Task task = it.next();
app.waitForState(task, TaskState.RUNNING);
TaskAttempt attempt = task.getAttempts().values().iterator().next();
app.waitForState(attempt, TaskAttemptState.RUNNING);
UserGroupInformation viewOnlyUser =
UserGroupInformation.createUserForTesting(
"viewonlyuser", new String[] {});
Assert.assertTrue("viewonlyuser cannot view job",
job.checkAccess(viewOnlyUser, JobACL.VIEW_JOB));
Assert.assertFalse("viewonlyuser can modify job",
job.checkAccess(viewOnlyUser, JobACL.MODIFY_JOB));
MRClientProtocol client = viewOnlyUser.doAs(
new PrivilegedExceptionAction<MRClientProtocol>() {
@Override
public MRClientProtocol run() throws Exception {
YarnRPC rpc = YarnRPC.create(conf);
return (MRClientProtocol) rpc.getProxy(MRClientProtocol.class,
app.clientService.getBindAddress(), conf);
}
});
KillJobRequest killJobRequest = recordFactory.newRecordInstance(
KillJobRequest.class);
killJobRequest.setJobId(app.getJobId());
try {
client.killJob(killJobRequest);
fail("viewonlyuser killed job");
} catch (AccessControlException e) {
// pass
}
KillTaskRequest killTaskRequest = recordFactory.newRecordInstance(
KillTaskRequest.class);
killTaskRequest.setTaskId(task.getID());
try {
client.killTask(killTaskRequest);
fail("viewonlyuser killed task");
} catch (AccessControlException e) {
// pass
}
KillTaskAttemptRequest killTaskAttemptRequest =
recordFactory.newRecordInstance(KillTaskAttemptRequest.class);
killTaskAttemptRequest.setTaskAttemptId(attempt.getID());
try {
client.killTaskAttempt(killTaskAttemptRequest);
fail("viewonlyuser killed task attempt");
} catch (AccessControlException e) {
// pass
}
FailTaskAttemptRequest failTaskAttemptRequest =
recordFactory.newRecordInstance(FailTaskAttemptRequest.class);
failTaskAttemptRequest.setTaskAttemptId(attempt.getID());
try {
client.failTaskAttempt(failTaskAttemptRequest);
fail("viewonlyuser killed task attempt");
} catch (AccessControlException e) {
// pass
}
}
private void verifyJobReport(JobReport jr) { private void verifyJobReport(JobReport jr) {
Assert.assertNotNull("JobReport is null", jr); Assert.assertNotNull("JobReport is null", jr);
List<AMInfo> amInfos = jr.getAMInfos(); List<AMInfo> amInfos = jr.getAMInfos();