diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmissionFiles.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmissionFiles.java index c4adadf8cc7..9dd45c3af62 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmissionFiles.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmissionFiles.java @@ -20,6 +20,7 @@ package org.apache.hadoop.mapreduce; import java.io.IOException; +import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -104,36 +105,60 @@ public class JobSubmissionFiles { * @param cluster * @param conf */ - public static Path getStagingDir(Cluster cluster, Configuration conf) - throws IOException,InterruptedException { + public static Path getStagingDir(Cluster cluster, Configuration conf) + throws IOException, InterruptedException { + UserGroupInformation user = UserGroupInformation.getLoginUser(); + return getStagingDir(cluster, conf, user); + } + + /** + * Initializes the staging directory and returns the path. It also + * keeps track of all necessary ownership and permissions. + * It is kept for unit testing. + * + * @param cluster Information about the map/reduce cluster + * @param conf Configuration object + * @param realUser UserGroupInformation of login user + * @return staging dir path object + * @throws IOException when ownership of staging area directory does + * not match the login user or current user. + * @throws InterruptedException when getting the staging area directory path + */ + @VisibleForTesting + public static Path getStagingDir(Cluster cluster, Configuration conf, + UserGroupInformation realUser) throws IOException, InterruptedException { Path stagingArea = cluster.getStagingAreaDir(); FileSystem fs = stagingArea.getFileSystem(conf); - String realUser; - String currentUser; - UserGroupInformation ugi = UserGroupInformation.getLoginUser(); - realUser = ugi.getShortUserName(); - currentUser = UserGroupInformation.getCurrentUser().getShortUserName(); + UserGroupInformation currentUser = realUser.getCurrentUser(); if (fs.exists(stagingArea)) { FileStatus fsStatus = fs.getFileStatus(stagingArea); - String owner = fsStatus.getOwner(); - if (!(owner.equals(currentUser) || owner.equals(realUser))) { - throw new IOException("The ownership on the staging directory " + - stagingArea + " is not as expected. " + - "It is owned by " + owner + ". The directory must " + - "be owned by the submitter " + currentUser + " or " + - "by " + realUser); + String fileOwner = fsStatus.getOwner(); + if (!(fileOwner.equals(currentUser.getShortUserName()) || fileOwner + .equalsIgnoreCase(currentUser.getUserName()) || fileOwner + .equals(realUser.getShortUserName()) || fileOwner + .equalsIgnoreCase(realUser.getUserName()))) { + String errorMessage = "The ownership on the staging directory " + + stagingArea + " is not as expected. " + + "It is owned by " + fileOwner + ". The directory must " + + "be owned by the submitter " + currentUser.getShortUserName() + + " or " + currentUser.getUserName(); + if (!realUser.getUserName().equals(currentUser.getUserName())) { + throw new IOException( + errorMessage + " or " + realUser.getShortUserName() + " or " + + realUser.getUserName()); + } else { + throw new IOException(errorMessage); + } } if (!fsStatus.getPermission().equals(JOB_DIR_PERMISSION)) { LOG.info("Permissions on staging directory " + stagingArea + " are " + - "incorrect: " + fsStatus.getPermission() + ". Fixing permissions " + - "to correct value " + JOB_DIR_PERMISSION); + "incorrect: " + fsStatus.getPermission() + ". Fixing permissions " + + "to correct value " + JOB_DIR_PERMISSION); fs.setPermission(stagingArea, JOB_DIR_PERMISSION); } } else { - fs.mkdirs(stagingArea, - new FsPermission(JOB_DIR_PERMISSION)); + fs.mkdirs(stagingArea, new FsPermission(JOB_DIR_PERMISSION)); } return stagingArea; } - } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/TestJobSubmissionFiles.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/TestJobSubmissionFiles.java new file mode 100644 index 00000000000..ab3f7a0a937 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/TestJobSubmissionFiles.java @@ -0,0 +1,142 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapreduce; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FileSystemTestHelper; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.security.UserGroupInformation; +import org.junit.Test; + +import java.io.IOException; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +/** + * Tests for JobSubmissionFiles Utility class. + */ +public class TestJobSubmissionFiles { + final private static String USER_1 = "user1@HADOOP.APACHE.ORG"; + final private static String USER_1_SHORT_NAME = "user1"; + final private static String GROUP1_NAME = "group1"; + final private static String GROUP2_NAME = "group2"; + final private static String GROUP3_NAME = "group3"; + final private static String[] GROUP_NAMES = new String[] {GROUP1_NAME, + GROUP2_NAME, GROUP3_NAME }; + + @Test + public void testGetStagingDirWhenFullFileOwnerNameAndFullUserName() + throws IOException, InterruptedException { + Cluster cluster = mock(Cluster.class); + Configuration conf = new Configuration(); + Path stagingPath = mock(Path.class); + UserGroupInformation user = UserGroupInformation + .createUserForTesting(USER_1, GROUP_NAMES); + assertEquals(USER_1, user.getUserName()); + FileSystem fs = new FileSystemTestHelper.MockFileSystem(); + when(cluster.getStagingAreaDir()).thenReturn(stagingPath); + when(stagingPath.getFileSystem(conf)).thenReturn(fs); + + //Staging directory owner full principal name is in lower case. + String stagingDirOwner = USER_1.toLowerCase(); + FileStatus fileStatus = new FileStatus(1, true, 1, 1, 100L, 100L, + FsPermission.getDefault(), stagingDirOwner, stagingDirOwner, + stagingPath); + when(fs.getFileStatus(stagingPath)).thenReturn(fileStatus); + assertEquals(stagingPath, + JobSubmissionFiles.getStagingDir(cluster, conf, user)); + + //Staging directory owner full principal name in upper and lower case + stagingDirOwner = USER_1; + fileStatus = new FileStatus(1, true, 1, 1, 100L, 100L, + FsPermission.getDefault(), stagingDirOwner, stagingDirOwner, + stagingPath); + when(fs.getFileStatus(stagingPath)).thenReturn(fileStatus); + assertEquals(stagingPath, + JobSubmissionFiles.getStagingDir(cluster, conf, user)); + } + + @Test(expected = IOException.class) + public void testGetStagingWhenFileOwnerNameAndCurrentUserNameDoesNotMatch() + throws IOException, InterruptedException { + Cluster cluster = mock(Cluster.class); + Configuration conf = new Configuration(); + String stagingDirOwner = "someuser"; + Path stagingPath = mock(Path.class); + UserGroupInformation user = UserGroupInformation + .createUserForTesting(USER_1, GROUP_NAMES); + assertEquals(USER_1, user.getUserName()); + FileSystem fs = new FileSystemTestHelper.MockFileSystem(); + FileStatus fileStatus = new FileStatus(1, true, 1, 1, 100L, 100L, + FsPermission.getDefault(), stagingDirOwner, stagingDirOwner, + stagingPath); + when(stagingPath.getFileSystem(conf)).thenReturn(fs); + when(fs.getFileStatus(stagingPath)).thenReturn(fileStatus); + when(cluster.getStagingAreaDir()).thenReturn(stagingPath); + assertEquals(stagingPath, + JobSubmissionFiles.getStagingDir(cluster, conf, user)); + } + + @Test + public void testGetStagingDirWhenShortFileOwnerNameAndFullUserName() + throws IOException, InterruptedException { + Cluster cluster = mock(Cluster.class); + Configuration conf = new Configuration(); + String stagingDirOwner = USER_1_SHORT_NAME; + Path stagingPath = mock(Path.class); + UserGroupInformation user = UserGroupInformation + .createUserForTesting(USER_1, GROUP_NAMES); + assertEquals(USER_1, user.getUserName()); + FileSystem fs = new FileSystemTestHelper.MockFileSystem(); + FileStatus fileStatus = new FileStatus(1, true, 1, 1, 100L, 100L, + FsPermission.getDefault(), stagingDirOwner, stagingDirOwner, + stagingPath); + when(stagingPath.getFileSystem(conf)).thenReturn(fs); + when(fs.getFileStatus(stagingPath)).thenReturn(fileStatus); + when(cluster.getStagingAreaDir()).thenReturn(stagingPath); + assertEquals(stagingPath, + JobSubmissionFiles.getStagingDir(cluster, conf, user)); + } + + @Test + public void testGetStagingDirWhenShortFileOwnerNameAndShortUserName() + throws IOException, InterruptedException { + Cluster cluster = mock(Cluster.class); + Configuration conf = new Configuration(); + String stagingDirOwner = USER_1_SHORT_NAME; + Path stagingPath = mock(Path.class); + UserGroupInformation user = UserGroupInformation + .createUserForTesting(USER_1_SHORT_NAME, GROUP_NAMES); + assertEquals(USER_1_SHORT_NAME, user.getUserName()); + FileSystem fs = new FileSystemTestHelper.MockFileSystem(); + FileStatus fileStatus = new FileStatus(1, true, 1, 1, 100L, 100L, + FsPermission.getDefault(), stagingDirOwner, stagingDirOwner, + stagingPath); + when(stagingPath.getFileSystem(conf)).thenReturn(fs); + when(fs.getFileStatus(stagingPath)).thenReturn(fileStatus); + when(cluster.getStagingAreaDir()).thenReturn(stagingPath); + assertEquals(stagingPath, + JobSubmissionFiles.getStagingDir(cluster, conf, user)); + } +}