YARN-5373. NPE listing wildcard directory in containerLaunch. (Daniel Templeton via kasha)

This commit is contained in:
Karthik Kambatla 2016-08-26 11:04:33 -07:00
parent 9ef632f3b0
commit cde3a00526
10 changed files with 325 additions and 42 deletions

View File

@ -67,8 +67,8 @@ import org.apache.hadoop.util.StringUtils;
* underlying OS. All executor implementations must extend ContainerExecutor.
*/
public abstract class ContainerExecutor implements Configurable {
private static final String WILDCARD = "*";
private static final Log LOG = LogFactory.getLog(ContainerExecutor.class);
protected static final String WILDCARD = "*";
/**
* The permissions to use when creating the launch script.
@ -274,15 +274,16 @@ public abstract class ContainerExecutor implements Configurable {
* @param environment the environment variables and their values
* @param resources the resources which have been localized for this
* container. Symlinks will be created to these localized resources
* @param command the command that will be run.
* @param logDir the log dir to copy debugging information to
* @param command the command that will be run
* @param logDir the log dir to which to copy debugging information
* @param user the username of the job owner
* @throws IOException if any errors happened writing to the OutputStream,
* while creating symlinks
*/
public void writeLaunchEnv(OutputStream out, Map<String, String> environment,
Map<Path, List<String>> resources, List<String> command, Path logDir)
throws IOException {
this.writeLaunchEnv(out, environment, resources, command, logDir,
Map<Path, List<String>> resources, List<String> command, Path logDir,
String user) throws IOException {
this.writeLaunchEnv(out, environment, resources, command, logDir, user,
ContainerLaunch.CONTAINER_SCRIPT);
}
@ -295,17 +296,17 @@ public abstract class ContainerExecutor implements Configurable {
* @param environment the environment variables and their values
* @param resources the resources which have been localized for this
* container. Symlinks will be created to these localized resources
* @param command the command that will be run.
* @param logDir the log dir to copy debugging information to
* @param command the command that will be run
* @param logDir the log dir to which to copy debugging information
* @param user the username of the job owner
* @param outFilename the path to which to write the launch environment
* @throws IOException if any errors happened writing to the OutputStream,
* while creating symlinks
*/
@VisibleForTesting
public void writeLaunchEnv(OutputStream out,
Map<String, String> environment, Map<Path, List<String>> resources,
List<String> command, Path logDir, String outFilename)
throws IOException {
public void writeLaunchEnv(OutputStream out, Map<String, String> environment,
Map<Path, List<String>> resources, List<String> command, Path logDir,
String user, String outFilename) throws IOException {
ContainerLaunch.ShellScriptBuilder sb =
ContainerLaunch.ShellScriptBuilder.create();
Set<String> whitelist = new HashSet<>();
@ -334,9 +335,7 @@ public abstract class ContainerExecutor implements Configurable {
if (new Path(linkName).getName().equals(WILDCARD)) {
// If this is a wildcarded path, link to everything in the
// directory from the working directory
File directory = new File(resourceEntry.getKey().toString());
for (File wildLink : directory.listFiles()) {
for (File wildLink : readDirAsUser(user, resourceEntry.getKey())) {
sb.symlink(new Path(wildLink.toString()),
new Path(wildLink.getName()));
}
@ -370,6 +369,19 @@ public abstract class ContainerExecutor implements Configurable {
}
}
/**
* Return the files in the target directory. If retrieving the list of files
* requires specific access rights, that access will happen as the
* specified user. The list will not include entries for "." or "..".
*
* @param user the user as whom to access the target directory
* @param dir the target directory
* @return a list of files in the target directory
*/
protected File[] readDirAsUser(String user, Path dir) {
return new File(dir.toString()).listFiles();
}
/**
* The container exit code.
*/

View File

@ -330,8 +330,8 @@ public class DockerContainerExecutor extends ContainerExecutor {
* the docker image and write them out to an OutputStream.
*/
public void writeLaunchEnv(OutputStream out, Map<String, String> environment,
Map<Path, List<String>> resources, List<String> command, Path logDir)
throws IOException {
Map<Path, List<String>> resources, List<String> command, Path logDir,
String user) throws IOException {
ContainerLaunch.ShellScriptBuilder sb =
ContainerLaunch.ShellScriptBuilder.create();

View File

@ -54,7 +54,6 @@ import org.apache.hadoop.yarn.server.nodemanager.executor.LocalizerStartContext;
import org.apache.hadoop.yarn.server.nodemanager.util.CgroupsLCEResourcesHandler;
import org.apache.hadoop.yarn.server.nodemanager.util.DefaultLCEResourcesHandler;
import org.apache.hadoop.yarn.server.nodemanager.util.LCEResourcesHandler;
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
@ -644,6 +643,45 @@ public class LinuxContainerExecutor extends ContainerExecutor {
}
}
@Override
protected File[] readDirAsUser(String user, Path dir) {
List<File> files = new ArrayList<>();
PrivilegedOperation listAsUserOp = new PrivilegedOperation(
PrivilegedOperation.OperationType.LIST_AS_USER, (String)null);
String runAsUser = getRunAsUser(user);
String dirString = "";
if (dir != null) {
dirString = dir.toUri().getPath();
}
listAsUserOp.appendArgs(runAsUser, user,
Integer.toString(
PrivilegedOperation.RunAsUserCommand.LIST_AS_USER.getValue()),
dirString);
try {
PrivilegedOperationExecutor privOpExecutor =
PrivilegedOperationExecutor.getInstance(super.getConf());
String results =
privOpExecutor.executePrivilegedOperation(listAsUserOp, true);
for (String file: results.split("\n")) {
// The container-executor always dumps its log output to stdout, which
// includes 3 lines that start with "main : "
if (!file.startsWith("main :")) {
files.add(new File(new File(dirString), file));
}
}
} catch (PrivilegedOperationException e) {
LOG.error("ListAsUser for " + dir + " returned with exit code: "
+ e.getExitCode(), e);
}
return files.toArray(new File[files.size()]);
}
@Override
public boolean isContainerAlive(ContainerLivenessContext ctx)
throws IOException {

View File

@ -267,7 +267,7 @@ public class ContainerLaunch implements Callable<Integer> {
// Write out the environment
exec.writeLaunchEnv(containerScriptOutStream, environment,
localResources, launchContext.getCommands(),
new Path(containerLogDirs.get(0)));
new Path(containerLogDirs.get(0)), user);
// /////////// End of writing out container-script

View File

@ -50,7 +50,8 @@ public class PrivilegedOperation {
TC_READ_STATE("--tc-read-state"),
TC_READ_STATS("--tc-read-stats"),
ADD_PID_TO_CGROUP(""), //no CLI switch supported yet.
RUN_DOCKER_CMD("--run-docker");
RUN_DOCKER_CMD("--run-docker"),
LIST_AS_USER(""); //no CLI switch supported yet.
private final String option;
@ -146,7 +147,8 @@ public class PrivilegedOperation {
LAUNCH_CONTAINER(1),
SIGNAL_CONTAINER(2),
DELETE_AS_USER(3),
LAUNCH_DOCKER_CONTAINER(4);
LAUNCH_DOCKER_CONTAINER(4),
LIST_AS_USER(5);
private int value;
RunAsUserCommand(int value) {

View File

@ -439,7 +439,7 @@ char *concatenate(char *concat_pattern, char *return_path_name,
for (j = 0; j < numArgs; j++) {
arg = va_arg(ap, char*);
if (arg == NULL) {
fprintf(LOGFILE, "One of the arguments passed for %s in null.\n",
fprintf(LOGFILE, "One of the arguments passed for %s is null.\n",
return_path_name);
return NULL;
}
@ -1929,6 +1929,58 @@ int delete_as_user(const char *user,
return ret;
}
/**
* List the files in the given directory as the user.
* user: the user listing the files
* target_dir: the directory from which to list files
*/
int list_as_user(const char *target_dir) {
int ret = 0;
struct stat sb;
if (stat(target_dir, &sb) != 0) {
// If directory doesn't exist or can't be accessed, error out
fprintf(LOGFILE, "Could not stat %s - %s\n", target_dir,
strerror(errno));
ret = -1;
} else if (!S_ISDIR(sb.st_mode)) {
// If it's not a directory, list it as the only file
printf("%s\n", target_dir);
} else {
DIR *dir = opendir(target_dir);
if (dir != NULL) {
struct dirent *file;
errno = 0;
do {
file = readdir(dir);
// Ignore the . and .. entries
if ((file != NULL) &&
(strcmp(".", file->d_name) != 0) &&
(strcmp("..", file->d_name) != 0)) {
printf("%s\n", file->d_name);
}
} while (file != NULL);
// If we ended the directory read early on an error, then error out
if (errno != 0) {
fprintf(LOGFILE, "Could not read directory %s - %s\n", target_dir,
strerror(errno));
ret = -1;
}
} else {
fprintf(LOGFILE, "Could not open directory %s - %s\n", target_dir,
strerror(errno));
ret = -1;
}
}
return ret;
}
void chown_dir_contents(const char *dir_path, uid_t uid, gid_t gid) {
DIR *dp;
struct dirent *ep;

View File

@ -31,7 +31,8 @@ enum command {
LAUNCH_CONTAINER = 1,
SIGNAL_CONTAINER = 2,
DELETE_AS_USER = 3,
LAUNCH_DOCKER_CONTAINER = 4
LAUNCH_DOCKER_CONTAINER = 4,
LIST_AS_USER = 5
};
enum errorcodes {
@ -79,7 +80,8 @@ enum operations {
RUN_AS_USER_SIGNAL_CONTAINER = 8,
RUN_AS_USER_DELETE = 9,
RUN_AS_USER_LAUNCH_DOCKER_CONTAINER = 10,
RUN_DOCKER = 11
RUN_DOCKER = 11,
RUN_AS_USER_LIST = 12
};
#define NM_GROUP_KEY "yarn.nodemanager.linux-container-executor.group"
@ -189,6 +191,10 @@ int delete_as_user(const char *user,
const char *dir_to_be_deleted,
char* const* baseDirs);
// List the files in the given directory on stdout. The target_dir is always
// assumed to be an absolute path.
int list_as_user(const char *target_dir);
// set the uid and gid of the node manager. This is used when doing some
// priviledged operations for setting the effective uid and gid.
void set_nm_uid(uid_t user, gid_t group);

View File

@ -58,11 +58,12 @@ static void display_usage(FILE *stream) {
" launch docker container: %2d appid containerid workdir container-script " \
"tokens pidfile nm-local-dirs nm-log-dirs docker-command-file resources optional-tc-command-file\n" \
" signal container: %2d container-pid signal\n" \
" delete as user: %2d relative-path\n" ;
" delete as user: %2d relative-path\n" \
" list as user: %2d relative-path\n" ;
fprintf(stream, usage_template, INITIALIZE_CONTAINER, LAUNCH_CONTAINER, LAUNCH_DOCKER_CONTAINER,
SIGNAL_CONTAINER, DELETE_AS_USER);
SIGNAL_CONTAINER, DELETE_AS_USER, LIST_AS_USER);
}
/* Sets up log files for normal/error logging */
@ -169,20 +170,20 @@ static void assert_valid_setup(char *argv0) {
static struct {
char *cgroups_hierarchy;
char *traffic_control_command_file;
const char * run_as_user_name;
const char * yarn_user_name;
const char *run_as_user_name;
const char *yarn_user_name;
char *local_dirs;
char *log_dirs;
char *resources_key;
char *resources_value;
char **resources_values;
const char * app_id;
const char * container_id;
const char * cred_file;
const char * script_file;
const char * current_dir;
const char * pid_file;
const char *dir_to_be_deleted;
const char *app_id;
const char *container_id;
const char *cred_file;
const char *script_file;
const char *current_dir;
const char *pid_file;
const char *target_dir;
int container_pid;
int signal;
const char *docker_command_file;
@ -417,9 +418,13 @@ static int validate_run_as_user_commands(int argc, char **argv, int *operation)
return 0;
case DELETE_AS_USER:
cmd_input.dir_to_be_deleted = argv[optind++];
cmd_input.target_dir = argv[optind++];
*operation = RUN_AS_USER_DELETE;
return 0;
case LIST_AS_USER:
cmd_input.target_dir = argv[optind++];
*operation = RUN_AS_USER_LIST;
return 0;
default:
fprintf(ERRORFILE, "Invalid command %d not supported.",command);
fflush(ERRORFILE);
@ -554,9 +559,18 @@ int main(int argc, char **argv) {
}
exit_code = delete_as_user(cmd_input.yarn_user_name,
cmd_input.dir_to_be_deleted,
cmd_input.target_dir,
argv + optind);
break;
case RUN_AS_USER_LIST:
exit_code = set_user(cmd_input.run_as_user_name);
if (exit_code != 0) {
break;
}
exit_code = list_as_user(cmd_input.target_dir);
break;
}
flush_and_close_log_files();

View File

@ -454,6 +454,163 @@ void test_delete_user() {
free(app_dir);
}
/**
* Read a file and tokenize it on newlines. Place up to max lines into lines.
* The max+1st element of lines will be set to NULL.
*
* @param file the name of the file to open
* @param lines the pointer array into which to place the lines
* @param max the max number of lines to add to lines
*/
void read_lines(const char* file, char **lines, size_t max) {
char buf[4096];
size_t nread;
int fd = open(file, O_RDONLY);
if (fd < 0) {
printf("FAIL: failed to open directory listing file: %s\n", file);
exit(1);
} else {
char *cur = buf;
size_t count = sizeof buf;
while ((nread = read(fd, cur, count)) > 0) {
cur += nread;
count -= nread;
}
if (nread < 0) {
printf("FAIL: failed to read directory listing file: %s\n", file);
exit(1);
}
close(fd);
}
char* entity = strtok(buf, "\n");
int i;
for (i = 0; i < max; i++) {
if (entity == NULL) {
break;
}
lines[i] = (char *)malloc(sizeof(char) * (strlen(entity) + 1));
strcpy(lines[i], entity);
entity = strtok(NULL, "\n");
}
lines[i] = NULL;
}
void test_list_as_user() {
printf("\nTesting list_as_user\n");
char buffer[4096];
char *app_dir =
get_app_directory(TEST_ROOT "/local-1", "yarn", "app_4");
if (mkdirs(app_dir, 0700) != 0) {
printf("FAIL: unble to create application directory: %s\n", app_dir);
exit(1);
}
// Test with empty dir string
sprintf(buffer, "");
int ret = list_as_user(buffer);
if (ret == 0) {
printf("FAIL: did not fail on empty directory string\n");
exit(1);
}
// Test with a non-existent directory
sprintf(buffer, "%s/output", app_dir);
ret = list_as_user(buffer);
if (ret == 0) {
printf("FAIL: did not fail on non-existent directory\n");
exit(1);
}
// Write a couple files to list
sprintf(buffer, "%s/file1", app_dir);
if (write_config_file(buffer, 1) != 0) {
exit(1);
}
sprintf(buffer, "%s/.file2", app_dir);
if (write_config_file(buffer, 1) != 0) {
exit(1);
}
// Also create a directory
sprintf(buffer, "%s/output", app_dir);
if (mkdirs(buffer, 0700) != 0) {
exit(1);
}
// Test the regular case
// Store a copy of stdout, then redirect it to a file
sprintf(buffer, "%s/output/files", app_dir);
int oldout = dup(STDOUT_FILENO);
int fd = open(buffer, O_WRONLY | O_CREAT, S_IRUSR | S_IWUSR);
dup2(fd, STDOUT_FILENO);
// Now list the files
ret = list_as_user(app_dir);
if (ret != 0) {
printf("FAIL: unable to list files in regular case\n");
exit(1);
}
// Restore stdout
close(fd);
dup2(oldout, STDOUT_FILENO);
// Check the output -- shouldn't be more than a couple lines
char *lines[16];
read_lines(buffer, lines, 15);
int got_file1 = 0;
int got_file2 = 0;
int got_output = 0;
int i;
for (i = 0; i < sizeof lines; i++) {
if (lines[i] == NULL) {
break;
} else if (strcmp("file1", lines[i]) == 0) {
got_file1 = 1;
} else if (strcmp(".file2", lines[i]) == 0) {
got_file2 = 1;
} else if (strcmp("output", lines[i]) == 0) {
got_output = 1;
} else {
printf("FAIL: listed extraneous file: %s\n", lines[i]);
exit(1);
}
free(lines[i]);
}
if (!got_file1 || !got_file2 || !got_output) {
printf("FAIL: missing files in listing\n");
exit(1);
}
free(app_dir);
}
void run_test_in_child(const char* test_name, void (*func)()) {
printf("\nRunning test %s in child process\n", test_name);
fflush(stdout);

View File

@ -174,7 +174,8 @@ public class TestContainerLaunch extends BaseContainerManagerTest {
new DefaultContainerExecutor()
.writeLaunchEnv(fos, env, resources, commands,
new Path(localLogDir.getAbsolutePath()), tempFile.getName());
new Path(localLogDir.getAbsolutePath()), "user",
tempFile.getName());
fos.flush();
fos.close();
FileUtil.setExecutable(tempFile, true);
@ -243,7 +244,7 @@ public class TestContainerLaunch extends BaseContainerManagerTest {
}
new DefaultContainerExecutor()
.writeLaunchEnv(fos, env, resources, commands,
new Path(localLogDir.getAbsolutePath()));
new Path(localLogDir.getAbsolutePath()), "user");
fos.flush();
fos.close();
FileUtil.setExecutable(tempFile, true);
@ -298,7 +299,7 @@ public class TestContainerLaunch extends BaseContainerManagerTest {
List<String> commands = new ArrayList<String>();
new DefaultContainerExecutor()
.writeLaunchEnv(fos, env, resources, commands,
new Path(localLogDir.getAbsolutePath()));
new Path(localLogDir.getAbsolutePath()), "user");
fos.flush();
fos.close();
@ -377,7 +378,7 @@ public class TestContainerLaunch extends BaseContainerManagerTest {
commands.add(command);
ContainerExecutor exec = new DefaultContainerExecutor();
exec.writeLaunchEnv(fos, env, resources, commands,
new Path(localLogDir.getAbsolutePath()));
new Path(localLogDir.getAbsolutePath()), "user");
fos.flush();
fos.close();
@ -1400,7 +1401,8 @@ public class TestContainerLaunch extends BaseContainerManagerTest {
ContainerExecutor exec = new DefaultContainerExecutor();
exec.setConf(conf);
exec.writeLaunchEnv(fos, env, resources, commands,
new Path(localLogDir.getAbsolutePath()), tempFile.getName());
new Path(localLogDir.getAbsolutePath()), "user",
tempFile.getName());
fos.flush();
fos.close();
FileUtil.setExecutable(tempFile, true);