MAPREDUCE-2747. Cleaned up LinuxContainerExecutor binary sources and changed the configuration to use yarn names. Contributed by Robert Joseph Evans.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1188236 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinod Kumar Vavilapalli 2011-10-24 17:09:37 +00:00
parent 4086566144
commit 7ce1c4ab35
14 changed files with 497 additions and 476 deletions

View File

@ -426,6 +426,9 @@ Release 0.23.0 - Unreleased
MAPREDUCE-3239. Use new createSocketAddr API in MRv2 to give better MAPREDUCE-3239. Use new createSocketAddr API in MRv2 to give better
error messages on misconfig (Todd Lipcon via mahadev) error messages on misconfig (Todd Lipcon via mahadev)
MAPREDUCE-2747. Cleaned up LinuxContainerExecutor binary sources and changed
the configuration to use yarn names. (Robert Joseph Evans via vinodkv)
OPTIMIZATIONS OPTIMIZATIONS
MAPREDUCE-2026. Make JobTracker.getJobCounters() and MAPREDUCE-2026. Make JobTracker.getJobCounters() and

View File

@ -0,0 +1,3 @@
yarn.nodemanager.local-dirs=#configured value of yarn.nodemanager.local-dirs. It can be a list of comma separated paths.
yarn.nodemanager.log-dirs=#configured value of yarn.nodemanager.log-dirs.
yarn.nodemanager.linux-container-executor.group=#configured value of yarn.nodemanager.linux-container-executor.group

View File

@ -1,3 +0,0 @@
mapreduce.cluster.local.dir=#configured value of mapreduce.cluster.local.dir. It can be a list of comma separated paths.
hadoop.log.dir=#configured value of hadoop.log.dir.
mapreduce.tasktracker.group=#configured value of mapreduce.tasktracker.group

View File

@ -387,8 +387,9 @@ FIXME: do we need to do any of this stuff? (guessing not since not in own JVM)
/* FIXME: may not need renameMapOutputForReduce() anymore? TEST! /* FIXME: may not need renameMapOutputForReduce() anymore? TEST!
${local.dir}/usercache/$user/appcache/$appId/$contId/ == $cwd for tasks; ${local.dir}/usercache/$user/appcache/$appId/$contId/ == $cwd for containers;
contains task.sh script, which, when executed, creates symlinks and sets up env contains launch_container.sh script, which, when executed, creates symlinks and
sets up env
"$local.dir"/usercache/$user/appcache/$appId/$contId/file.out "$local.dir"/usercache/$user/appcache/$appId/$contId/file.out
"$local.dir"/usercache/$user/appcache/$appId/$contId/file.out.idx (?) "$local.dir"/usercache/$user/appcache/$appId/$contId/file.out.idx (?)
"$local.dir"/usercache/$user/appcache/$appId/output/$taskId/ is where file.out* is moved after MapTask done "$local.dir"/usercache/$user/appcache/$appId/output/$taskId/ is where file.out* is moved after MapTask done

View File

@ -341,6 +341,13 @@ public class YarnConfiguration extends Configuration {
public static final String NM_LINUX_CONTAINER_EXECUTOR_PATH = public static final String NM_LINUX_CONTAINER_EXECUTOR_PATH =
NM_PREFIX + "linux-container-executor.path"; NM_PREFIX + "linux-container-executor.path";
/**
* The UNIX group that the linux-container-executor should run as.
* This is intended to be set as part of container-executor.cfg.
*/
public static final String NM_LINUX_CONTAINER_GROUP =
NM_PREFIX + "linux-container-executor.group";
/** T-file compression types used to compress aggregated logs.*/ /** T-file compression types used to compress aggregated logs.*/
public static final String NM_LOG_AGG_COMPRESSION_TYPE = public static final String NM_LOG_AGG_COMPRESSION_TYPE =
NM_PREFIX + "log-aggregation.compression-type"; NM_PREFIX + "log-aggregation.compression-type";

View File

@ -56,11 +56,10 @@ public class LinuxContainerExecutor extends ContainerExecutor {
* List of commands that the setuid script will execute. * List of commands that the setuid script will execute.
*/ */
enum Commands { enum Commands {
INITIALIZE_JOB(0), INITIALIZE_CONTAINER(0),
LAUNCH_CONTAINER(1), LAUNCH_CONTAINER(1),
SIGNAL_CONTAINER(2), SIGNAL_CONTAINER(2),
DELETE_AS_USER(3), DELETE_AS_USER(3);
DELETE_LOG_AS_USER(4);
private int value; private int value;
Commands(int value) { Commands(int value) {
@ -78,8 +77,9 @@ public class LinuxContainerExecutor extends ContainerExecutor {
enum ResultCode { enum ResultCode {
OK(0), OK(0),
INVALID_USER_NAME(2), INVALID_USER_NAME(2),
INVALID_TASK_PID(9), UNABLE_TO_EXECUTE_CONTAINER_SCRIPT(7),
INVALID_TASKCONTROLLER_PERMISSIONS(22), INVALID_CONTAINER_PID(9),
INVALID_CONTAINER_EXEC_PERMISSIONS(22),
INVALID_CONFIG_FILE(24); INVALID_CONFIG_FILE(24);
private final int value; private final int value;
@ -107,7 +107,7 @@ public class LinuxContainerExecutor extends ContainerExecutor {
List<String> command = new ArrayList<String>( List<String> command = new ArrayList<String>(
Arrays.asList(containerExecutorExe, Arrays.asList(containerExecutorExe,
user, user,
Integer.toString(Commands.INITIALIZE_JOB.getValue()), Integer.toString(Commands.INITIALIZE_CONTAINER.getValue()),
appId, appId,
nmPrivateContainerTokensPath.toUri().getPath().toString())); nmPrivateContainerTokensPath.toUri().getPath().toString()));
File jvm = // use same jvm as parent File jvm = // use same jvm as parent
@ -220,7 +220,7 @@ public class LinuxContainerExecutor extends ContainerExecutor {
shExec.execute(); shExec.execute();
} catch (ExitCodeException e) { } catch (ExitCodeException e) {
int ret_code = shExec.getExitCode(); int ret_code = shExec.getExitCode();
if (ret_code == ResultCode.INVALID_TASK_PID.getValue()) { if (ret_code == ResultCode.INVALID_CONTAINER_PID.getValue()) {
return false; return false;
} }
logOutput(shExec.getOutput()); logOutput(shExec.getOutput());

View File

@ -62,7 +62,7 @@ public class ContainerLaunch implements Callable<Integer> {
private static final Log LOG = LogFactory.getLog(ContainerLaunch.class); private static final Log LOG = LogFactory.getLog(ContainerLaunch.class);
public static final String CONTAINER_SCRIPT = "task.sh"; public static final String CONTAINER_SCRIPT = "launch_container.sh";
public static final String FINAL_CONTAINER_TOKENS_FILE = "container_tokens"; public static final String FINAL_CONTAINER_TOKENS_FILE = "container_tokens";
private final Dispatcher dispatcher; private final Dispatcher dispatcher;

View File

@ -18,9 +18,9 @@ AM_CFLAGS=-I$(srcdir)/impl -Wall -g -Werror
# Define the programs that need to be built # Define the programs that need to be built
bin_PROGRAMS = container-executor bin_PROGRAMS = container-executor
check_PROGRAMS = test-task-controller check_PROGRAMS = test-container-executor
TESTS = test-task-controller TESTS = test-container-executor
# Define the sources for the common files # Define the sources for the common files
common_SOURCES = impl/configuration.c impl/container-executor.c common_SOURCES = impl/configuration.c impl/container-executor.c
@ -29,4 +29,4 @@ common_SOURCES = impl/configuration.c impl/container-executor.c
container_executor_SOURCES = $(common_SOURCES) impl/main.c container_executor_SOURCES = $(common_SOURCES) impl/main.c
# Define the sources for the test executable # Define the sources for the test executable
test_task_controller_SOURCES = $(common_SOURCES) test/test-task-controller.c test_container_executor_SOURCES = $(common_SOURCES) test/test-container-executor.c

View File

@ -69,16 +69,16 @@ void free_configurations() {
static int is_only_root_writable(const char *file) { static int is_only_root_writable(const char *file) {
struct stat file_stat; struct stat file_stat;
if (stat(file, &file_stat) != 0) { if (stat(file, &file_stat) != 0) {
fprintf(LOGFILE, "Can't stat file %s - %s\n", file, strerror(errno)); fprintf(ERRORFILE, "Can't stat file %s - %s\n", file, strerror(errno));
return 0; return 0;
} }
if (file_stat.st_uid != 0) { if (file_stat.st_uid != 0) {
fprintf(LOGFILE, "File %s must be owned by root, but is owned by %d\n", fprintf(ERRORFILE, "File %s must be owned by root, but is owned by %d\n",
file, file_stat.st_uid); file, file_stat.st_uid);
return 0; return 0;
} }
if ((file_stat.st_mode & (S_IWGRP | S_IWOTH)) != 0) { if ((file_stat.st_mode & (S_IWGRP | S_IWOTH)) != 0) {
fprintf(LOGFILE, fprintf(ERRORFILE,
"File %s must not be world or group writable, but is %03o\n", "File %s must not be world or group writable, but is %03o\n",
file, file_stat.st_mode & (~S_IFMT)); file, file_stat.st_mode & (~S_IFMT));
return 0; return 0;
@ -109,7 +109,6 @@ int check_configuration_permissions(const char* file_name) {
//function used to load the configurations present in the secure config //function used to load the configurations present in the secure config
void read_config(const char* file_name) { void read_config(const char* file_name) {
fprintf(LOGFILE, "Reading task controller config from %s\n" , file_name);
FILE *conf_file; FILE *conf_file;
char *line; char *line;
char *equaltok; char *equaltok;
@ -118,7 +117,7 @@ void read_config(const char* file_name) {
int size_read = 0; int size_read = 0;
if (file_name == NULL) { if (file_name == NULL) {
fprintf(LOGFILE, "Null configuration filename passed in\n"); fprintf(ERRORFILE, "Null configuration filename passed in\n");
exit(INVALID_CONFIG_FILE); exit(INVALID_CONFIG_FILE);
} }
@ -132,30 +131,33 @@ void read_config(const char* file_name) {
config.size = 0; config.size = 0;
conf_file = fopen(file_name, "r"); conf_file = fopen(file_name, "r");
if (conf_file == NULL) { if (conf_file == NULL) {
fprintf(LOGFILE, "Invalid conf file provided : %s \n", file_name); fprintf(ERRORFILE, "Invalid conf file provided : %s \n", file_name);
exit(INVALID_CONFIG_FILE); exit(INVALID_CONFIG_FILE);
} }
while(!feof(conf_file)) { while(!feof(conf_file)) {
line = (char *) malloc(linesize); line = (char *) malloc(linesize);
if(line == NULL) { if(line == NULL) {
fprintf(LOGFILE, "malloc failed while reading configuration file.\n"); fprintf(ERRORFILE, "malloc failed while reading configuration file.\n");
exit(OUT_OF_MEMORY); exit(OUT_OF_MEMORY);
} }
size_read = getline(&line,&linesize,conf_file); size_read = getline(&line,&linesize,conf_file);
//feof returns true only after we read past EOF. //feof returns true only after we read past EOF.
//so a file with no new line, at last can reach this place //so a file with no new line, at last can reach this place
//if size_read returns negative check for eof condition //if size_read returns negative check for eof condition
if (size_read == -1) { if (size_read == -1) {
free(line);
if(!feof(conf_file)){ if(!feof(conf_file)){
fprintf(LOGFILE, "getline returned error.\n");
exit(INVALID_CONFIG_FILE); exit(INVALID_CONFIG_FILE);
}else { } else {
free(line);
break; break;
} }
} }
//trim the ending new line int eol = strlen(line) - 1;
line[strlen(line)-1] = '\0'; if(line[eol] == '\n') {
//trim the ending new line
line[eol] = '\0';
}
//comment line //comment line
if(line[0] == '#') { if(line[0] == '#') {
free(line); free(line);
@ -217,14 +219,15 @@ void read_config(const char* file_name) {
config.size++; config.size++;
free(line); free(line);
} }
//close the file //close the file
fclose(conf_file); fclose(conf_file);
if (config.size == 0) { if (config.size == 0) {
fprintf(LOGFILE, "Invalid configuration provided in %s\n", file_name); fprintf(ERRORFILE, "Invalid configuration provided in %s\n", file_name);
exit(INVALID_CONFIG_FILE); exit(INVALID_CONFIG_FILE);
} }
//clean up allocated file name //clean up allocated file name
return; return;
//free spaces alloced. //free spaces alloced.

View File

@ -40,13 +40,14 @@ static const char* DEFAULT_BANNED_USERS[] = {"mapred", "hdfs", "bin", 0};
struct passwd *user_detail = NULL; struct passwd *user_detail = NULL;
FILE* LOGFILE = NULL; FILE* LOGFILE = NULL;
FILE* ERRORFILE = NULL;
static uid_t tt_uid = -1; static uid_t nm_uid = -1;
static gid_t tt_gid = -1; static gid_t nm_gid = -1;
void set_tasktracker_uid(uid_t user, gid_t group) { void set_nm_uid(uid_t user, gid_t group) {
tt_uid = user; nm_uid = user;
tt_gid = group; nm_gid = group;
} }
/** /**
@ -58,11 +59,11 @@ char* get_executable() {
char *filename = malloc(PATH_MAX); char *filename = malloc(PATH_MAX);
ssize_t len = readlink(buffer, filename, PATH_MAX); ssize_t len = readlink(buffer, filename, PATH_MAX);
if (len == -1) { if (len == -1) {
fprintf(stderr, "Can't get executable name from %s - %s\n", buffer, fprintf(ERRORFILE, "Can't get executable name from %s - %s\n", buffer,
strerror(errno)); strerror(errno));
exit(-1); exit(-1);
} else if (len >= PATH_MAX) { } else if (len >= PATH_MAX) {
fprintf(LOGFILE, "Executable name %.*s is longer than %d characters.\n", fprintf(ERRORFILE, "Executable name %.*s is longer than %d characters.\n",
PATH_MAX, filename, PATH_MAX); PATH_MAX, filename, PATH_MAX);
exit(-1); exit(-1);
} }
@ -70,20 +71,12 @@ char* get_executable() {
return filename; return filename;
} }
/** int check_executor_permissions(char *executable_file) {
* Check the permissions on taskcontroller to make sure that security is
* promisable. For this, we need container-executor binary to
* * be user-owned by root
* * be group-owned by a configured special group.
* * others do not have any permissions
* * be setuid/setgid
*/
int check_taskcontroller_permissions(char *executable_file) {
errno = 0; errno = 0;
char * resolved_path = realpath(executable_file, NULL); char * resolved_path = realpath(executable_file, NULL);
if (resolved_path == NULL) { if (resolved_path == NULL) {
fprintf(LOGFILE, fprintf(ERRORFILE,
"Error resolving the canonical name for the executable : %s!", "Error resolving the canonical name for the executable : %s!",
strerror(errno)); strerror(errno));
return -1; return -1;
@ -92,7 +85,7 @@ int check_taskcontroller_permissions(char *executable_file) {
struct stat filestat; struct stat filestat;
errno = 0; errno = 0;
if (stat(resolved_path, &filestat) != 0) { if (stat(resolved_path, &filestat) != 0) {
fprintf(LOGFILE, fprintf(ERRORFILE,
"Could not stat the executable : %s!.\n", strerror(errno)); "Could not stat the executable : %s!.\n", strerror(errno));
return -1; return -1;
} }
@ -108,7 +101,7 @@ int check_taskcontroller_permissions(char *executable_file) {
} }
if (binary_gid != getgid()) { if (binary_gid != getgid()) {
fprintf(LOGFILE, "The configured tasktracker group %d is different from" fprintf(LOGFILE, "The configured nodemanager group %d is different from"
" the group of the executable %d\n", getgid(), binary_gid); " the group of the executable %d\n", getgid(), binary_gid);
return -1; return -1;
} }
@ -223,62 +216,45 @@ char *concatenate(char *concat_pattern, char *return_path_name,
} }
/** /**
* Get the job-directory path from tt_root, user name and job-id * Get the app-directory path from nm_root, user name and app-id
*/ */
char *get_job_directory(const char * tt_root, const char *user, char *get_app_directory(const char * nm_root, const char *user,
const char *jobid) { const char *app_id) {
return concatenate(TT_JOB_DIR_PATTERN, "job_dir_path", 3, tt_root, user, return concatenate(NM_APP_DIR_PATTERN, "app_dir_path", 3, nm_root, user,
jobid); app_id);
} }
/** /**
* Get the user directory of a particular user * Get the user directory of a particular user
*/ */
char *get_user_directory(const char *tt_root, const char *user) { char *get_user_directory(const char *nm_root, const char *user) {
return concatenate(USER_DIR_PATTERN, "user_dir_path", 2, tt_root, user); return concatenate(USER_DIR_PATTERN, "user_dir_path", 2, nm_root, user);
}
char *get_job_work_directory(const char *job_dir) {
return concatenate("%s/work", "job work", 1, job_dir);
} }
/** /**
* Get the attempt directory for the given attempt_id * Get the container directory for the given container_id
*/ */
char *get_attempt_work_directory(const char *tt_root, const char *user, char *get_container_work_directory(const char *nm_root, const char *user,
const char *job_id, const char *attempt_id) { const char *app_id, const char *container_id) {
return concatenate(ATTEMPT_DIR_PATTERN, "attempt_dir_path", 4, return concatenate(CONTAINER_DIR_PATTERN, "container_dir_path", 4,
tt_root, user, job_id, attempt_id); nm_root, user, app_id, container_id);
} }
char *get_task_launcher_file(const char* work_dir) { char *get_container_launcher_file(const char* work_dir) {
return concatenate("%s/%s", "task launcher", 2, work_dir, TASK_SCRIPT); return concatenate("%s/%s", "container launcher", 2, work_dir, CONTAINER_SCRIPT);
} }
char *get_task_credentials_file(const char* work_dir) { char *get_container_credentials_file(const char* work_dir) {
return concatenate("%s/%s", "task crednetials", 2, work_dir, return concatenate("%s/%s", "container credentials", 2, work_dir,
CREDENTIALS_FILENAME); CREDENTIALS_FILENAME);
} }
/** /**
* Get the job log directory under the given log_root * Get the app log directory under the given log_root
*/ */
char* get_job_log_directory(const char *log_root, const char* jobid) { char* get_app_log_directory(const char *log_root, const char* app_id) {
return concatenate("%s/%s", "job log dir", 2, log_root, return concatenate("%s/%s", "app log dir", 2, log_root,
jobid); app_id);
}
/*
* Get a user subdirectory.
*/
char *get_user_subdirectory(const char *tt_root,
const char *user,
const char *subdir) {
char * user_dir = get_user_directory(tt_root, user);
char * result = concatenate("%s/%s", "user subdir", 2,
user_dir, subdir);
free(user_dir);
return result;
} }
/** /**
@ -320,43 +296,42 @@ int mkdirs(const char* path, mode_t perm) {
} }
/** /**
* Function to prepare the attempt directories for the task JVM. * Function to prepare the container directories.
* It creates the task work and log directories. * It creates the container work and log directories.
*/ */
static int create_attempt_directories(const char* user, const char *job_id, static int create_container_directories(const char* user, const char *app_id,
const char *task_id) { const char *container_id) {
// create dirs as 0750 // create dirs as 0750
const mode_t perms = S_IRWXU | S_IRGRP | S_IXGRP; const mode_t perms = S_IRWXU | S_IRGRP | S_IXGRP;
if (job_id == NULL || task_id == NULL || user == NULL) { if (app_id == NULL || container_id == NULL || user == NULL) {
fprintf(LOGFILE, fprintf(LOGFILE,
"Either task_id is null or the user passed is null.\n"); "Either app_id, container_id or the user passed is null.\n");
return -1; return -1;
} }
int result = -1; int result = -1;
char **local_dir = get_values(TT_SYS_DIR_KEY); char **local_dir = get_values(NM_SYS_DIR_KEY);
if (local_dir == NULL) { if (local_dir == NULL) {
fprintf(LOGFILE, "%s is not configured.\n", TT_SYS_DIR_KEY); fprintf(LOGFILE, "%s is not configured.\n", NM_SYS_DIR_KEY);
return -1; return -1;
} }
char **local_dir_ptr; char **local_dir_ptr;
for(local_dir_ptr = local_dir; *local_dir_ptr != NULL; ++local_dir_ptr) { for(local_dir_ptr = local_dir; *local_dir_ptr != NULL; ++local_dir_ptr) {
char *task_dir = get_attempt_work_directory(*local_dir_ptr, user, job_id, char *container_dir = get_container_work_directory(*local_dir_ptr, user, app_id,
task_id); container_id);
if (task_dir == NULL) { if (container_dir == NULL) {
free_values(local_dir); free_values(local_dir);
return -1; return -1;
} }
if (mkdirs(task_dir, perms) != 0) { if (mkdirs(container_dir, perms) == 0) {
// continue on to create other task directories
free(task_dir);
} else {
result = 0; result = 0;
free(task_dir);
} }
// continue on to create other work directories
free(container_dir);
} }
free_values(local_dir); free_values(local_dir);
if (result != 0) { if (result != 0) {
@ -364,34 +339,36 @@ static int create_attempt_directories(const char* user, const char *job_id,
} }
result = -1; result = -1;
// also make the directory for the task logs // also make the directory for the container logs
char *job_task_name = malloc(strlen(job_id) + strlen(task_id) + 2); char *combined_name = malloc(strlen(app_id) + strlen(container_id) + 2);
if (job_task_name == NULL) { if (combined_name == NULL) {
fprintf(LOGFILE, "Malloc of job task name failed\n"); fprintf(LOGFILE, "Malloc of combined name failed\n");
result = -1; result = -1;
} else { } else {
sprintf(job_task_name, "%s/%s", job_id, task_id); sprintf(combined_name, "%s/%s", app_id, container_id);
char **log_dir = get_values(TT_LOG_DIR_KEY); char **log_dir = get_values(NM_LOG_DIR_KEY);
if (log_dir == NULL) { if (log_dir == NULL) {
fprintf(LOGFILE, "%s is not configured.\n", TT_LOG_DIR_KEY); free(combined_name);
fprintf(LOGFILE, "%s is not configured.\n", NM_LOG_DIR_KEY);
return -1; return -1;
} }
char **log_dir_ptr; char **log_dir_ptr;
for(log_dir_ptr = log_dir; *log_dir_ptr != NULL; ++log_dir_ptr) { for(log_dir_ptr = log_dir; *log_dir_ptr != NULL; ++log_dir_ptr) {
char *job_log_dir = get_job_log_directory(*log_dir_ptr, job_task_name); char *container_log_dir = get_app_log_directory(*log_dir_ptr, combined_name);
if (job_log_dir == NULL) { if (container_log_dir == NULL) {
free(combined_name);
free_values(log_dir); free_values(log_dir);
return -1; return -1;
} else if (mkdirs(job_log_dir, perms) != 0) { } else if (mkdirs(container_log_dir, perms) != 0) {
free(job_log_dir); free(container_log_dir);
} else { } else {
result = 0; result = 0;
free(job_log_dir); free(container_log_dir);
} }
} }
free(job_task_name); free(combined_name);
free_values(log_dir); free_values(log_dir);
} }
return result; return result;
@ -461,11 +438,14 @@ struct passwd* check_user(const char *user) {
for(; *banned_user; ++banned_user) { for(; *banned_user; ++banned_user) {
if (strcmp(*banned_user, user) == 0) { if (strcmp(*banned_user, user) == 0) {
free(user_info); free(user_info);
if (banned_users != (char**)DEFAULT_BANNED_USERS) {
free_values(banned_users);
}
fprintf(LOGFILE, "Requested user %s is banned\n", user); fprintf(LOGFILE, "Requested user %s is banned\n", user);
return NULL; return NULL;
} }
} }
if (banned_users != NULL) { if (banned_users != NULL && banned_users != (char**)DEFAULT_BANNED_USERS) {
free_values(banned_users); free_values(banned_users);
} }
return user_info; return user_info;
@ -512,7 +492,7 @@ static int change_owner(const char* path, uid_t user, gid_t group) {
* Create a top level directory for the user. * Create a top level directory for the user.
* It assumes that the parent directory is *not* writable by the user. * It assumes that the parent directory is *not* writable by the user.
* It creates directories with 02750 permissions owned by the user * It creates directories with 02750 permissions owned by the user
* and with the group set to the task tracker group. * and with the group set to the node manager group.
* return non-0 on failure * return non-0 on failure
*/ */
int create_directory_for_user(const char* path) { int create_directory_for_user(const char* path) {
@ -524,7 +504,7 @@ int create_directory_for_user(const char* path) {
int ret = 0; int ret = 0;
if(getuid() == root) { if(getuid() == root) {
ret = change_effective_user(root, tt_gid); ret = change_effective_user(root, nm_gid);
} }
if (ret == 0) { if (ret == 0) {
@ -534,8 +514,8 @@ int create_directory_for_user(const char* path) {
fprintf(LOGFILE, "Can't chmod %s to add the sticky bit - %s\n", fprintf(LOGFILE, "Can't chmod %s to add the sticky bit - %s\n",
path, strerror(errno)); path, strerror(errno));
ret = -1; ret = -1;
} else if (change_owner(path, user, tt_gid) != 0) { } else if (change_owner(path, user, nm_gid) != 0) {
fprintf(LOGFILE, "Failed to chown %s to %d:%d: %s\n", path, user, tt_gid, fprintf(LOGFILE, "Failed to chown %s to %d:%d: %s\n", path, user, nm_gid,
strerror(errno)); strerror(errno));
ret = -1; ret = -1;
} }
@ -554,18 +534,18 @@ int create_directory_for_user(const char* path) {
} }
/** /**
* Open a file as the tasktracker and return a file descriptor for it. * Open a file as the node manager and return a file descriptor for it.
* Returns -1 on error * Returns -1 on error
*/ */
static int open_file_as_task_tracker(const char* filename) { static int open_file_as_nm(const char* filename) {
uid_t user = geteuid(); uid_t user = geteuid();
gid_t group = getegid(); gid_t group = getegid();
if (change_effective_user(tt_uid, tt_gid) != 0) { if (change_effective_user(nm_uid, nm_gid) != 0) {
return -1; return -1;
} }
int result = open(filename, O_RDONLY); int result = open(filename, O_RDONLY);
if (result == -1) { if (result == -1) {
fprintf(LOGFILE, "Can't open file %s as task tracker - %s\n", filename, fprintf(LOGFILE, "Can't open file %s as node manager - %s\n", filename,
strerror(errno)); strerror(errno));
} }
if (change_effective_user(user, group)) { if (change_effective_user(user, group)) {
@ -624,10 +604,10 @@ static int copy_file(int input, const char* in_filename,
* Function to initialize the user directories of a user. * Function to initialize the user directories of a user.
*/ */
int initialize_user(const char *user) { int initialize_user(const char *user) {
char **local_dir = get_values(TT_SYS_DIR_KEY); char **local_dir = get_values(NM_SYS_DIR_KEY);
if (local_dir == NULL) { if (local_dir == NULL) {
fprintf(LOGFILE, "%s is not configured.\n", TT_SYS_DIR_KEY); fprintf(LOGFILE, "%s is not configured.\n", NM_SYS_DIR_KEY);
return INVALID_TT_ROOT; return INVALID_NM_ROOT_DIRS;
} }
char *user_dir; char *user_dir;
@ -650,12 +630,12 @@ int initialize_user(const char *user) {
} }
/** /**
* Function to prepare the job directories for the task JVM. * Function to prepare the application directories for the container.
*/ */
int initialize_job(const char *user, const char *jobid, int initialize_app(const char *user, const char *app_id,
const char* nmPrivate_credentials_file, char* const* args) { const char* nmPrivate_credentials_file, char* const* args) {
if (jobid == NULL || user == NULL) { if (app_id == NULL || user == NULL) {
fprintf(LOGFILE, "Either jobid is null or the user passed is null.\n"); fprintf(LOGFILE, "Either app_id is null or the user passed is null.\n");
return INVALID_ARGUMENT_NUMBER; return INVALID_ARGUMENT_NUMBER;
} }
@ -666,35 +646,35 @@ int initialize_job(const char *user, const char *jobid,
} }
////////////// create the log directories for the app on all disks ////////////// create the log directories for the app on all disks
char **log_roots = get_values(TT_LOG_DIR_KEY); char **log_roots = get_values(NM_LOG_DIR_KEY);
if (log_roots == NULL) { if (log_roots == NULL) {
return INVALID_CONFIG_FILE; return INVALID_CONFIG_FILE;
} }
char **log_root; char **log_root;
char *any_one_job_log_dir = NULL; char *any_one_app_log_dir = NULL;
for(log_root=log_roots; *log_root != NULL; ++log_root) { for(log_root=log_roots; *log_root != NULL; ++log_root) {
char *job_log_dir = get_job_log_directory(*log_root, jobid); char *app_log_dir = get_app_log_directory(*log_root, app_id);
if (job_log_dir == NULL) { if (app_log_dir == NULL) {
// try the next one // try the next one
} else if (create_directory_for_user(job_log_dir) != 0) { } else if (create_directory_for_user(app_log_dir) != 0) {
free(job_log_dir); free(app_log_dir);
return -1; return -1;
} else if (any_one_job_log_dir == NULL) { } else if (any_one_app_log_dir == NULL) {
any_one_job_log_dir = job_log_dir; any_one_app_log_dir = app_log_dir;
} else { } else {
free(job_log_dir); free(app_log_dir);
} }
} }
free_values(log_roots); free_values(log_roots);
if (any_one_job_log_dir == NULL) { if (any_one_app_log_dir == NULL) {
fprintf(LOGFILE, "Did not create any job-log directories\n"); fprintf(LOGFILE, "Did not create any app-log directories\n");
return -1; return -1;
} }
free(any_one_job_log_dir); free(any_one_app_log_dir);
////////////// End of creating the log directories for the app on all disks ////////////// End of creating the log directories for the app on all disks
// open up the credentials file // open up the credentials file
int cred_file = open_file_as_task_tracker(nmPrivate_credentials_file); int cred_file = open_file_as_nm(nmPrivate_credentials_file);
if (cred_file == -1) { if (cred_file == -1) {
return -1; return -1;
} }
@ -706,29 +686,29 @@ int initialize_job(const char *user, const char *jobid,
// 750 // 750
mode_t permissions = S_IRWXU | S_IRGRP | S_IXGRP; mode_t permissions = S_IRWXU | S_IRGRP | S_IXGRP;
char **tt_roots = get_values(TT_SYS_DIR_KEY); char **nm_roots = get_values(NM_SYS_DIR_KEY);
if (tt_roots == NULL) { if (nm_roots == NULL) {
return INVALID_CONFIG_FILE; return INVALID_CONFIG_FILE;
} }
char **tt_root; char **nm_root;
char *primary_job_dir = NULL; char *primary_app_dir = NULL;
for(tt_root=tt_roots; *tt_root != NULL; ++tt_root) { for(nm_root=nm_roots; *nm_root != NULL; ++nm_root) {
char *job_dir = get_job_directory(*tt_root, user, jobid); char *app_dir = get_app_directory(*nm_root, user, app_id);
if (job_dir == NULL) { if (app_dir == NULL) {
// try the next one // try the next one
} else if (mkdirs(job_dir, permissions) != 0) { } else if (mkdirs(app_dir, permissions) != 0) {
free(job_dir); free(app_dir);
} else if (primary_job_dir == NULL) { } else if (primary_app_dir == NULL) {
primary_job_dir = job_dir; primary_app_dir = app_dir;
} else { } else {
free(job_dir); free(app_dir);
} }
} }
free_values(tt_roots); free_values(nm_roots);
if (primary_job_dir == NULL) { if (primary_app_dir == NULL) {
fprintf(LOGFILE, "Did not create any job directories\n"); fprintf(LOGFILE, "Did not create any app directories\n");
return -1; return -1;
} }
@ -736,7 +716,7 @@ int initialize_job(const char *user, const char *jobid,
// TODO: FIXME. The user's copy of creds should go to a path selected by // TODO: FIXME. The user's copy of creds should go to a path selected by
// localDirAllocatoir // localDirAllocatoir
char *cred_file_name = concatenate("%s/%s", "cred file", 2, char *cred_file_name = concatenate("%s/%s", "cred file", 2,
primary_job_dir, basename(nmPrivate_credentials_file_copy)); primary_app_dir, basename(nmPrivate_credentials_file_copy));
if (cred_file_name == NULL) { if (cred_file_name == NULL) {
free(nmPrivate_credentials_file_copy); free(nmPrivate_credentials_file_copy);
return -1; return -1;
@ -754,50 +734,44 @@ int initialize_job(const char *user, const char *jobid,
if (LOGFILE != stdout) { if (LOGFILE != stdout) {
fclose(stdout); fclose(stdout);
} }
fclose(stderr); if (ERRORFILE != stderr) {
if (chdir(primary_job_dir) != 0) { fclose(stderr);
fprintf(LOGFILE, "Failed to chdir to job dir - %s\n", strerror(errno)); }
if (chdir(primary_app_dir) != 0) {
fprintf(LOGFILE, "Failed to chdir to app dir - %s\n", strerror(errno));
return -1; return -1;
} }
execvp(args[0], args); execvp(args[0], args);
fprintf(LOGFILE, "Failure to exec job initialization process - %s\n", fprintf(ERRORFILE, "Failure to exec app initialization process - %s\n",
strerror(errno)); strerror(errno));
return -1; return -1;
} }
/* int launch_container_as_user(const char *user, const char *app_id,
* Function used to launch a task as the provided user. It does the following : const char *container_id, const char *work_dir,
* 1) Creates attempt work dir and log dir to be accessible by the child
* 2) Copies the script file from the TT to the work directory
* 3) Sets up the environment
* 4) Does an execlp on the same in order to replace the current image with
* task image.
*/
int run_task_as_user(const char *user, const char *job_id,
const char *task_id, const char *work_dir,
const char *script_name, const char *cred_file) { const char *script_name, const char *cred_file) {
int exit_code = -1; int exit_code = -1;
char *script_file_dest = NULL; char *script_file_dest = NULL;
char *cred_file_dest = NULL; char *cred_file_dest = NULL;
script_file_dest = get_task_launcher_file(work_dir); script_file_dest = get_container_launcher_file(work_dir);
if (script_file_dest == NULL) { if (script_file_dest == NULL) {
exit_code = OUT_OF_MEMORY; exit_code = OUT_OF_MEMORY;
goto cleanup; goto cleanup;
} }
cred_file_dest = get_task_credentials_file(work_dir); cred_file_dest = get_container_credentials_file(work_dir);
if (NULL == cred_file_dest) { if (NULL == cred_file_dest) {
exit_code = OUT_OF_MEMORY; exit_code = OUT_OF_MEMORY;
goto cleanup; goto cleanup;
} }
// open launch script // open launch script
int task_file_source = open_file_as_task_tracker(script_name); int container_file_source = open_file_as_nm(script_name);
if (task_file_source == -1) { if (container_file_source == -1) {
goto cleanup; goto cleanup;
} }
// open credentials // open credentials
int cred_file_source = open_file_as_task_tracker(cred_file); int cred_file_source = open_file_as_nm(cred_file);
if (cred_file_source == -1) { if (cred_file_source == -1) {
goto cleanup; goto cleanup;
} }
@ -808,13 +782,13 @@ int run_task_as_user(const char *user, const char *job_id,
goto cleanup; goto cleanup;
} }
if (create_attempt_directories(user, job_id, task_id) != 0) { if (create_container_directories(user, app_id, container_id) != 0) {
fprintf(LOGFILE, "Could not create attempt dirs"); fprintf(LOGFILE, "Could not create container dirs");
goto cleanup; goto cleanup;
} }
// 700 // 700
if (copy_file(task_file_source, script_name, script_file_dest,S_IRWXU) != 0) { if (copy_file(container_file_source, script_name, script_file_dest,S_IRWXU) != 0) {
goto cleanup; goto cleanup;
} }
@ -832,9 +806,9 @@ int run_task_as_user(const char *user, const char *job_id,
goto cleanup; goto cleanup;
} }
if (execlp(script_file_dest, script_file_dest, NULL) != 0) { if (execlp(script_file_dest, script_file_dest, NULL) != 0) {
fprintf(LOGFILE, "Couldn't execute the task jvm file %s - %s", fprintf(LOGFILE, "Couldn't execute the container launch file %s - %s",
script_file_dest, strerror(errno)); script_file_dest, strerror(errno));
exit_code = UNABLE_TO_EXECUTE_TASK_SCRIPT; exit_code = UNABLE_TO_EXECUTE_CONTAINER_SCRIPT;
goto cleanup; goto cleanup;
} }
exit_code = 0; exit_code = 0;
@ -845,14 +819,9 @@ int run_task_as_user(const char *user, const char *job_id,
return exit_code; return exit_code;
} }
/** int signal_container_as_user(const char *user, int pid, int sig) {
* Function used to signal a task launched by the user.
* The function sends appropriate signal to the process group
* specified by the task_pid.
*/
int signal_user_task(const char *user, int pid, int sig) {
if(pid <= 0) { if(pid <= 0) {
return INVALID_TASK_PID; return INVALID_CONTAINER_PID;
} }
if (change_user(user_detail->pw_uid, user_detail->pw_gid) != 0) { if (change_user(user_detail->pw_uid, user_detail->pw_gid) != 0) {
@ -864,9 +833,9 @@ int signal_user_task(const char *user, int pid, int sig) {
if (kill(-pid,0) < 0) { if (kill(-pid,0) < 0) {
if (kill(pid, 0) < 0) { if (kill(pid, 0) < 0) {
if (errno == ESRCH) { if (errno == ESRCH) {
return INVALID_TASK_PID; return INVALID_CONTAINER_PID;
} }
fprintf(LOGFILE, "Error signalling task %d with %d - %s\n", fprintf(LOGFILE, "Error signalling container %d with %d - %s\n",
pid, sig, strerror(errno)); pid, sig, strerror(errno));
return -1; return -1;
} else { } else {
@ -879,9 +848,13 @@ int signal_user_task(const char *user, int pid, int sig) {
fprintf(LOGFILE, fprintf(LOGFILE,
"Error signalling process group %d with signal %d - %s\n", "Error signalling process group %d with signal %d - %s\n",
-pid, sig, strerror(errno)); -pid, sig, strerror(errno));
return UNABLE_TO_KILL_TASK; fprintf(stderr,
"Error signalling process group %d with signal %d - %s\n",
-pid, sig, strerror(errno));
fflush(LOGFILE);
return UNABLE_TO_SIGNAL_CONTAINER;
} else { } else {
return INVALID_TASK_PID; return INVALID_CONTAINER_PID;
} }
} }
fprintf(LOGFILE, "Killing process %s%d with %d\n", fprintf(LOGFILE, "Killing process %s%d with %d\n",
@ -890,12 +863,12 @@ int signal_user_task(const char *user, int pid, int sig) {
} }
/** /**
* Delete a final directory as the task tracker user. * Delete a final directory as the node manager user.
*/ */
static int rmdir_as_tasktracker(const char* path) { static int rmdir_as_nm(const char* path) {
int user_uid = geteuid(); int user_uid = geteuid();
int user_gid = getegid(); int user_gid = getegid();
int ret = change_effective_user(tt_uid, tt_gid); int ret = change_effective_user(nm_uid, nm_gid);
if (ret == 0) { if (ret == 0) {
if (rmdir(path) != 0) { if (rmdir(path) != 0) {
fprintf(LOGFILE, "rmdir of %s failed - %s\n", path, strerror(errno)); fprintf(LOGFILE, "rmdir of %s failed - %s\n", path, strerror(errno));
@ -1016,8 +989,8 @@ static int delete_path(const char *full_path,
if (needs_tt_user) { if (needs_tt_user) {
// If the delete failed, try a final rmdir as root on the top level. // If the delete failed, try a final rmdir as root on the top level.
// That handles the case where the top level directory is in a directory // That handles the case where the top level directory is in a directory
// that is owned by the task tracker. // that is owned by the node manager.
exit_code = rmdir_as_tasktracker(full_path); exit_code = rmdir_as_nm(full_path);
} }
free(paths[0]); free(paths[0]);
} }
@ -1025,7 +998,7 @@ static int delete_path(const char *full_path,
} }
/** /**
* Delete the given directory as the user from each of the tt_root directories * Delete the given directory as the user from each of the directories
* user: the user doing the delete * user: the user doing the delete
* subdir: the subdir to delete (if baseDirs is empty, this is treated as * subdir: the subdir to delete (if baseDirs is empty, this is treated as
an absolute path) an absolute path)

View File

@ -21,9 +21,9 @@
//command definitions //command definitions
enum command { enum command {
INITIALIZE_JOB = 0, INITIALIZE_CONTAINER = 0,
LAUNCH_TASK_JVM = 1, LAUNCH_CONTAINER = 1,
SIGNAL_TASK = 2, SIGNAL_CONTAINER = 2,
DELETE_AS_USER = 3, DELETE_AS_USER = 3,
}; };
@ -31,37 +31,36 @@ enum errorcodes {
INVALID_ARGUMENT_NUMBER = 1, INVALID_ARGUMENT_NUMBER = 1,
INVALID_USER_NAME, //2 INVALID_USER_NAME, //2
INVALID_COMMAND_PROVIDED, //3 INVALID_COMMAND_PROVIDED, //3
SUPER_USER_NOT_ALLOWED_TO_RUN_TASKS, //4 // SUPER_USER_NOT_ALLOWED_TO_RUN_TASKS (NOT USED) 4
INVALID_TT_ROOT, //5 INVALID_NM_ROOT_DIRS = 5,
SETUID_OPER_FAILED, //6 SETUID_OPER_FAILED, //6
UNABLE_TO_EXECUTE_TASK_SCRIPT, //7 UNABLE_TO_EXECUTE_CONTAINER_SCRIPT, //7
UNABLE_TO_KILL_TASK, //8 UNABLE_TO_SIGNAL_CONTAINER, //8
INVALID_TASK_PID, //9 INVALID_CONTAINER_PID, //9
ERROR_RESOLVING_FILE_PATH, //10 // ERROR_RESOLVING_FILE_PATH (NOT_USED) 10
RELATIVE_PATH_COMPONENTS_IN_FILE_PATH, //11 // RELATIVE_PATH_COMPONENTS_IN_FILE_PATH (NOT USED) 11
UNABLE_TO_STAT_FILE, //12 // UNABLE_TO_STAT_FILE (NOT USED) 12
FILE_NOT_OWNED_BY_TASKTRACKER, //13 // FILE_NOT_OWNED_BY_ROOT (NOT USED) 13
PREPARE_ATTEMPT_DIRECTORIES_FAILED, //14 // PREPARE_CONTAINER_DIRECTORIES_FAILED (NOT USED) 14
INITIALIZE_JOB_FAILED, //15 // INITIALIZE_CONTAINER_FAILED (NOT USED) 15
PREPARE_TASK_LOGS_FAILED, //16 // PREPARE_CONTAINER_LOGS_FAILED (NOT USED) 16
INVALID_TT_LOG_DIR, //17 // INVALID_LOG_DIR (NOT USED) 17
OUT_OF_MEMORY, //18 OUT_OF_MEMORY = 18,
INITIALIZE_DISTCACHEFILE_FAILED, //19 // INITIALIZE_DISTCACHEFILE_FAILED (NOT USED) 19
INITIALIZE_USER_FAILED, //20 INITIALIZE_USER_FAILED = 20,
UNABLE_TO_BUILD_PATH, //21 UNABLE_TO_BUILD_PATH, //21
INVALID_TASKCONTROLLER_PERMISSIONS, //22 INVALID_CONTAINER_EXEC_PERMISSIONS, //22
PREPARE_JOB_LOGS_FAILED, //23 // PREPARE_JOB_LOGS_FAILED (NOT USED) 23
INVALID_CONFIG_FILE, // 24 INVALID_CONFIG_FILE = 24
}; };
#define TT_GROUP_KEY "mapreduce.tasktracker.group" #define NM_GROUP_KEY "yarn.nodemanager.linux-container-executor.group"
#define USER_DIR_PATTERN "%s/usercache/%s" #define USER_DIR_PATTERN "%s/usercache/%s"
#define TT_JOB_DIR_PATTERN USER_DIR_PATTERN "/appcache/%s" #define NM_APP_DIR_PATTERN USER_DIR_PATTERN "/appcache/%s"
#define ATTEMPT_DIR_PATTERN TT_JOB_DIR_PATTERN "/%s" #define CONTAINER_DIR_PATTERN NM_APP_DIR_PATTERN "/%s"
#define TASK_SCRIPT "task.sh" #define CONTAINER_SCRIPT "launch_container.sh"
#define TT_LOCAL_TASK_DIR_PATTERN TT_JOB_DIR_PATTERN "/%s" #define NM_SYS_DIR_KEY "yarn.nodemanager.local-dirs"
#define TT_SYS_DIR_KEY "mapreduce.cluster.local.dir" #define NM_LOG_DIR_KEY "yarn.nodemanager.log-dirs"
#define TT_LOG_DIR_KEY "hadoop.log.dir"
#define CREDENTIALS_FILENAME "container_tokens" #define CREDENTIALS_FILENAME "container_tokens"
#define MIN_USERID_KEY "min.user.id" #define MIN_USERID_KEY "min.user.id"
#define BANNED_USERS_KEY "banned.users" #define BANNED_USERS_KEY "banned.users"
@ -70,23 +69,59 @@ extern struct passwd *user_detail;
// the log file for messages // the log file for messages
extern FILE *LOGFILE; extern FILE *LOGFILE;
// the log file for error messages
extern FILE *ERRORFILE;
// get the executable's filename // get the executable's filename
char* get_executable(); char* get_executable();
int check_taskcontroller_permissions(char *executable_file); /**
* Check the permissions on the container-executor to make sure that security is
* permissible. For this, we need container-executor binary to
* * be user-owned by root
* * be group-owned by a configured special group.
* * others do not have any permissions
* * be setuid/setgid
* @param executable_file the file to check
* @return -1 on error 0 on success.
*/
int check_executor_permissions(char *executable_file);
// initialize the job directory // initialize the application directory
int initialize_job(const char *user, const char *jobid, int initialize_app(const char *user, const char *app_id,
const char *credentials, char* const* args); const char *credentials, char* const* args);
// run the task as the user /*
int run_task_as_user(const char * user, const char *jobid, const char *taskid, * Function used to launch a container as the provided user. It does the following :
const char *work_dir, const char *script_name, * 1) Creates container work dir and log dir to be accessible by the child
const char *cred_file); * 2) Copies the script file from the TT to the work directory
* 3) Sets up the environment
* 4) Does an execlp on the same in order to replace the current image with
* container image.
* @param user the user to become
* @param app_id the application id
* @param container_id the container id
* @param work_dir the working directory for the container.
* @param script_name the name of the script to be run to launch the container.
* @param cred_file the credentials file that needs to be compied to the
* working directory.
* @return -1 or errorcode enum value on error (should never return on success).
*/
int launch_container_as_user(const char * user, const char *app_id,
const char *container_id, const char *work_dir,
const char *script_name, const char *cred_file);
// send a signal as the user /**
int signal_user_task(const char *user, int pid, int sig); * Function used to signal a container launched by the user.
* The function sends appropriate signal to the process group
* specified by the pid.
* @param user the user to send the signal as.
* @param pid the process id to send the signal to.
* @param sig the signal to send.
* @return an errorcode enum value on error, or 0 on success.
*/
int signal_container_as_user(const char *user, int pid, int sig);
// delete a directory (or file) recursively as the user. The directory // delete a directory (or file) recursively as the user. The directory
// could optionally be relative to the baseDir set of directories (if the same // could optionally be relative to the baseDir set of directories (if the same
@ -97,8 +132,9 @@ int delete_as_user(const char *user,
const char *dir_to_be_deleted, const char *dir_to_be_deleted,
char* const* baseDirs); char* const* baseDirs);
// set the task tracker's uid and gid // set the uid and gid of the node manager. This is used when doing some
void set_tasktracker_uid(uid_t user, gid_t group); // priviledged operations for setting the effective uid and gid.
void set_nm_uid(uid_t user, gid_t group);
/** /**
* Is the user a real user account? * Is the user a real user account?
@ -115,22 +151,22 @@ int set_user(const char *user);
// methods to get the directories // methods to get the directories
char *get_user_directory(const char *tt_root, const char *user); char *get_user_directory(const char *nm_root, const char *user);
char *get_job_directory(const char * tt_root, const char *user, char *get_app_directory(const char * nm_root, const char *user,
const char *jobid); const char *app_id);
char *get_attempt_work_directory(const char *tt_root, const char *user, char *get_container_work_directory(const char *nm_root, const char *user,
const char *job_dir, const char *attempt_id); const char *app_id, const char *container_id);
char *get_task_launcher_file(const char* work_dir); char *get_container_launcher_file(const char* work_dir);
char *get_task_credentials_file(const char* work_dir); char *get_container_credentials_file(const char* work_dir);
/** /**
* Get the job log directory under log_root * Get the app log directory under log_root
*/ */
char* get_job_log_directory(const char* log_root, const char* jobid); char* get_app_log_directory(const char* log_root, const char* appid);
/** /**
* Ensure that the given path and all of the parent directories are created * Ensure that the given path and all of the parent directories are created
@ -147,7 +183,7 @@ int initialize_user(const char *user);
* Create a top level directory for the user. * Create a top level directory for the user.
* It assumes that the parent directory is *not* writable by the user. * It assumes that the parent directory is *not* writable by the user.
* It creates directories with 02700 permissions owned by the user * It creates directories with 02700 permissions owned by the user
* and with the group set to the task tracker group. * and with the group set to the node manager group.
* return non-0 on failure * return non-0 on failure
*/ */
int create_directory_for_user(const char* path); int create_directory_for_user(const char* path);

View File

@ -31,18 +31,22 @@
#define _STRINGIFY(X) #X #define _STRINGIFY(X) #X
#define STRINGIFY(X) _STRINGIFY(X) #define STRINGIFY(X) _STRINGIFY(X)
#define CONF_FILENAME "taskcontroller.cfg" #define CONF_FILENAME "container-executor.cfg"
#ifndef HADOOP_CONF_DIR
#error HADOOP_CONF_DIR must be defined
#endif
void display_usage(FILE *stream) { void display_usage(FILE *stream) {
fprintf(stream, fprintf(stream,
"Usage: container-executor user command command-args\n"); "Usage: container-executor user command command-args\n");
fprintf(stream, "Commands:\n"); fprintf(stream, "Commands:\n");
fprintf(stream, " initialize job: %2d jobid tokens cmd args\n", fprintf(stream, " initialize container: %2d appid tokens cmd app...\n",
INITIALIZE_JOB); INITIALIZE_CONTAINER);
fprintf(stream, " launch task: %2d jobid taskid workdir task-script jobTokens\n", fprintf(stream, " launch container: %2d appid containerid workdir container-script tokens\n",
LAUNCH_TASK_JVM); LAUNCH_CONTAINER);
fprintf(stream, " signal task: %2d task-pid signal\n", fprintf(stream, " signal container: %2d container-pid signal\n",
SIGNAL_TASK); SIGNAL_CONTAINER);
fprintf(stream, " delete as user: %2d relative-path\n", fprintf(stream, " delete as user: %2d relative-path\n",
DELETE_AS_USER); DELETE_AS_USER);
} }
@ -55,9 +59,10 @@ int main(int argc, char **argv) {
} }
LOGFILE = stdout; LOGFILE = stdout;
ERRORFILE = stderr;
int command; int command;
const char * job_id = NULL; const char * app_id = NULL;
const char * task_id = NULL; const char * container_id = NULL;
const char * cred_file = NULL; const char * cred_file = NULL;
const char * script_file = NULL; const char * script_file = NULL;
const char * current_dir = NULL; const char * current_dir = NULL;
@ -68,54 +73,46 @@ int main(int argc, char **argv) {
char *executable_file = get_executable(); char *executable_file = get_executable();
#ifndef HADOOP_CONF_DIR
#error HADOOP_CONF_DIR must be defined
#endif
char *orig_conf_file = STRINGIFY(HADOOP_CONF_DIR) "/" CONF_FILENAME; char *orig_conf_file = STRINGIFY(HADOOP_CONF_DIR) "/" CONF_FILENAME;
char *conf_file = realpath(orig_conf_file, NULL); char *conf_file = realpath(orig_conf_file, NULL);
if (conf_file == NULL) { if (conf_file == NULL) {
fprintf(LOGFILE, "Configuration file %s not found.\n", orig_conf_file); fprintf(ERRORFILE, "Configuration file %s not found.\n", orig_conf_file);
fflush(LOGFILE); exit(INVALID_CONFIG_FILE);
return INVALID_CONFIG_FILE;
} }
if (check_configuration_permissions(conf_file) != 0) { if (check_configuration_permissions(conf_file) != 0) {
return INVALID_CONFIG_FILE; exit(INVALID_CONFIG_FILE);
} }
read_config(conf_file); read_config(conf_file);
free(conf_file); free(conf_file);
// look up the task tracker group in the config file // look up the node manager group in the config file
char *tt_group = get_value(TT_GROUP_KEY); char *nm_group = get_value(NM_GROUP_KEY);
if (tt_group == NULL) { if (nm_group == NULL) {
fprintf(LOGFILE, "Can't get configured value for %s.\n", TT_GROUP_KEY); fprintf(ERRORFILE, "Can't get configured value for %s.\n", NM_GROUP_KEY);
fflush(LOGFILE);
exit(INVALID_CONFIG_FILE); exit(INVALID_CONFIG_FILE);
} }
struct group *group_info = getgrnam(tt_group); struct group *group_info = getgrnam(nm_group);
if (group_info == NULL) { if (group_info == NULL) {
fprintf(LOGFILE, "Can't get group information for %s - %s.\n", tt_group, fprintf(ERRORFILE, "Can't get group information for %s - %s.\n", nm_group,
strerror(errno)); strerror(errno));
fflush(LOGFILE); fflush(LOGFILE);
exit(INVALID_CONFIG_FILE); exit(INVALID_CONFIG_FILE);
} }
set_tasktracker_uid(getuid(), group_info->gr_gid); set_nm_uid(getuid(), group_info->gr_gid);
// if we are running from a setuid executable, make the real uid root // if we are running from a setuid executable, make the real uid root
setuid(0); setuid(0);
// set the real and effective group id to the task tracker group // set the real and effective group id to the node manager group
setgid(group_info->gr_gid); setgid(group_info->gr_gid);
if (check_taskcontroller_permissions(executable_file) != 0) { if (check_executor_permissions(executable_file) != 0) {
fprintf(LOGFILE, "Invalid permissions on container-executor binary.\n"); fprintf(ERRORFILE, "Invalid permissions on container-executor binary.\n");
fflush(LOGFILE); return INVALID_CONTAINER_EXEC_PERMISSIONS;
return INVALID_TASKCONTROLLER_PERMISSIONS;
} }
//checks done for user name //checks done for user name
if (argv[optind] == NULL) { if (argv[optind] == NULL) {
fprintf(LOGFILE, "Invalid user name \n"); fprintf(ERRORFILE, "Invalid user name.\n");
fflush(LOGFILE);
return INVALID_USER_NAME; return INVALID_USER_NAME;
} }
int ret = set_user(argv[optind]); int ret = set_user(argv[optind]);
@ -128,53 +125,59 @@ int main(int argc, char **argv) {
fprintf(LOGFILE, "main : command provided %d\n",command); fprintf(LOGFILE, "main : command provided %d\n",command);
fprintf(LOGFILE, "main : user is %s\n", user_detail->pw_name); fprintf(LOGFILE, "main : user is %s\n", user_detail->pw_name);
fflush(LOGFILE);
switch (command) { switch (command) {
case INITIALIZE_JOB: case INITIALIZE_CONTAINER:
if (argc < 6) { if (argc < 6) {
fprintf(LOGFILE, "Too few arguments (%d vs 6) for initialize job\n", fprintf(ERRORFILE, "Too few arguments (%d vs 6) for initialize container\n",
argc); argc);
fflush(ERRORFILE);
return INVALID_ARGUMENT_NUMBER; return INVALID_ARGUMENT_NUMBER;
} }
job_id = argv[optind++]; app_id = argv[optind++];
cred_file = argv[optind++]; cred_file = argv[optind++];
exit_code = initialize_job(user_detail->pw_name, job_id, cred_file, exit_code = initialize_app(user_detail->pw_name, app_id, cred_file,
argv + optind); argv + optind);
break; break;
case LAUNCH_TASK_JVM: case LAUNCH_CONTAINER:
if (argc < 8) { if (argc < 8) {
fprintf(LOGFILE, "Too few arguments (%d vs 8) for launch task\n", fprintf(ERRORFILE, "Too few arguments (%d vs 8) for launch container\n",
argc); argc);
fflush(ERRORFILE);
return INVALID_ARGUMENT_NUMBER; return INVALID_ARGUMENT_NUMBER;
} }
job_id = argv[optind++]; app_id = argv[optind++];
task_id = argv[optind++]; container_id = argv[optind++];
current_dir = argv[optind++]; current_dir = argv[optind++];
script_file = argv[optind++]; script_file = argv[optind++];
cred_file = argv[optind++]; cred_file = argv[optind++];
exit_code = run_task_as_user(user_detail->pw_name, job_id, task_id, exit_code = launch_container_as_user(user_detail->pw_name, app_id, container_id,
current_dir, script_file, cred_file); current_dir, script_file, cred_file);
break; break;
case SIGNAL_TASK: case SIGNAL_CONTAINER:
if (argc < 5) { if (argc < 5) {
fprintf(LOGFILE, "Too few arguments (%d vs 5) for signal task\n", fprintf(ERRORFILE, "Too few arguments (%d vs 5) for signal container\n",
argc); argc);
fflush(ERRORFILE);
return INVALID_ARGUMENT_NUMBER; return INVALID_ARGUMENT_NUMBER;
} else { } else {
char* end_ptr = NULL; char* end_ptr = NULL;
char* option = argv[optind++]; char* option = argv[optind++];
int task_pid = strtol(option, &end_ptr, 10); int container_pid = strtol(option, &end_ptr, 10);
if (option == end_ptr || *end_ptr != '\0') { if (option == end_ptr || *end_ptr != '\0') {
fprintf(LOGFILE, "Illegal argument for task pid %s\n", option); fprintf(ERRORFILE, "Illegal argument for container pid %s\n", option);
fflush(ERRORFILE);
return INVALID_ARGUMENT_NUMBER; return INVALID_ARGUMENT_NUMBER;
} }
option = argv[optind++]; option = argv[optind++];
int signal = strtol(option, &end_ptr, 10); int signal = strtol(option, &end_ptr, 10);
if (option == end_ptr || *end_ptr != '\0') { if (option == end_ptr || *end_ptr != '\0') {
fprintf(LOGFILE, "Illegal argument for signal %s\n", option); fprintf(ERRORFILE, "Illegal argument for signal %s\n", option);
fflush(ERRORFILE);
return INVALID_ARGUMENT_NUMBER; return INVALID_ARGUMENT_NUMBER;
} }
exit_code = signal_user_task(user_detail->pw_name, task_pid, signal); exit_code = signal_container_as_user(user_detail->pw_name, container_pid, signal);
} }
break; break;
case DELETE_AS_USER: case DELETE_AS_USER:
@ -183,8 +186,11 @@ int main(int argc, char **argv) {
argv + optind); argv + optind);
break; break;
default: default:
fprintf(ERRORFILE, "Invalid command %d not supported.",command);
fflush(ERRORFILE);
exit_code = INVALID_COMMAND_PROVIDED; exit_code = INVALID_COMMAND_PROVIDED;
} }
fclose(LOGFILE); fclose(LOGFILE);
fclose(ERRORFILE);
return exit_code; return exit_code;
} }

View File

@ -28,7 +28,7 @@
#include <sys/stat.h> #include <sys/stat.h>
#include <sys/wait.h> #include <sys/wait.h>
#define TEST_ROOT "/tmp/test-task-controller" #define TEST_ROOT "/tmp/test-container-controller"
#define DONT_TOUCH_FILE "dont-touch-me" #define DONT_TOUCH_FILE "dont-touch-me"
static char* username = NULL; static char* username = NULL;
@ -84,46 +84,40 @@ void run(const char *cmd) {
int write_config_file(char *file_name) { int write_config_file(char *file_name) {
FILE *file; FILE *file;
int i = 0;
file = fopen(file_name, "w"); file = fopen(file_name, "w");
if (file == NULL) { if (file == NULL) {
printf("Failed to open %s.\n", file_name); printf("Failed to open %s.\n", file_name);
return EXIT_FAILURE; return EXIT_FAILURE;
} }
fprintf(file, "mapred.local.dir=" TEST_ROOT "/local-1"); fprintf(file, "yarn.nodemanager.local-dirs=" TEST_ROOT "/local-1");
int i;
for(i=2; i < 5; ++i) { for(i=2; i < 5; ++i) {
fprintf(file, "," TEST_ROOT "/local-%d", i); fprintf(file, "," TEST_ROOT "/local-%d", i);
} }
fprintf(file, "\n"); fprintf(file, "\n");
fprintf(file, "mapreduce.cluster.local.dir=" TEST_ROOT "/local-1"); fprintf(file, "yarn.nodemanager.log-dirs=" TEST_ROOT "/logs\n");
for(i=2; i < 5; ++i) {
fprintf(file, "," TEST_ROOT "/local-%d", i);
}
fprintf(file, "\n");
fprintf(file, "hadoop.log.dir=" TEST_ROOT "/logs\n");
fclose(file); fclose(file);
return 0; return 0;
} }
void create_tt_roots() { void create_nm_roots() {
char** tt_roots = get_values("mapred.local.dir"); char** nm_roots = get_values(NM_SYS_DIR_KEY);
char** tt_root; char** nm_root;
for(tt_root=tt_roots; *tt_root != NULL; ++tt_root) { for(nm_root=nm_roots; *nm_root != NULL; ++nm_root) {
if (mkdir(*tt_root, 0755) != 0) { if (mkdir(*nm_root, 0755) != 0) {
printf("FAIL: Can't create directory %s - %s\n", *tt_root, printf("FAIL: Can't create directory %s - %s\n", *nm_root,
strerror(errno)); strerror(errno));
exit(1); exit(1);
} }
char buffer[100000]; char buffer[100000];
sprintf(buffer, "%s/usercache", *tt_root); sprintf(buffer, "%s/usercache", *nm_root);
if (mkdir(buffer, 0755) != 0) { if (mkdir(buffer, 0755) != 0) {
printf("FAIL: Can't create directory %s - %s\n", buffer, printf("FAIL: Can't create directory %s - %s\n", buffer,
strerror(errno)); strerror(errno));
exit(1); exit(1);
} }
} }
free_values(tt_roots); free_values(nm_roots);
} }
void test_get_user_directory() { void test_get_user_directory() {
@ -136,49 +130,49 @@ void test_get_user_directory() {
free(user_dir); free(user_dir);
} }
void test_get_job_directory() { void test_get_app_directory() {
char *expected = "/tmp/usercache/user/appcache/job_200906101234_0001"; char *expected = "/tmp/usercache/user/appcache/app_200906101234_0001";
char *job_dir = (char *) get_job_directory("/tmp", "user", char *app_dir = (char *) get_app_directory("/tmp", "user",
"job_200906101234_0001"); "app_200906101234_0001");
if (strcmp(job_dir, expected) != 0) { if (strcmp(app_dir, expected) != 0) {
printf("test_get_job_directory expected %s got %s\n", expected, job_dir); printf("test_get_app_directory expected %s got %s\n", expected, app_dir);
exit(1); exit(1);
} }
free(job_dir); free(app_dir);
} }
void test_get_attempt_directory() { void test_get_container_directory() {
char *attempt_dir = get_attempt_work_directory("/tmp", "owen", "job_1", char *container_dir = get_container_work_directory("/tmp", "owen", "app_1",
"attempt_1"); "container_1");
char *expected = "/tmp/usercache/owen/appcache/job_1/attempt_1"; char *expected = "/tmp/usercache/owen/appcache/app_1/container_1";
if (strcmp(attempt_dir, expected) != 0) { if (strcmp(container_dir, expected) != 0) {
printf("Fail get_attempt_work_directory got %s expected %s\n", printf("Fail get_container_work_directory got %s expected %s\n",
attempt_dir, expected); container_dir, expected);
exit(1); exit(1);
} }
free(attempt_dir); free(container_dir);
} }
void test_get_task_launcher_file() { void test_get_container_launcher_file() {
char *expected_file = ("/tmp/usercache/user/appcache/job_200906101234_0001" char *expected_file = ("/tmp/usercache/user/appcache/app_200906101234_0001"
"/task.sh"); "/launch_container.sh");
char *job_dir = get_job_directory("/tmp", "user", char *app_dir = get_app_directory("/tmp", "user",
"job_200906101234_0001"); "app_200906101234_0001");
char *task_file = get_task_launcher_file(job_dir); char *container_file = get_container_launcher_file(app_dir);
if (strcmp(task_file, expected_file) != 0) { if (strcmp(container_file, expected_file) != 0) {
printf("failure to match expected task file %s vs %s\n", task_file, printf("failure to match expected container file %s vs %s\n", container_file,
expected_file); expected_file);
exit(1); exit(1);
} }
free(job_dir); free(app_dir);
free(task_file); free(container_file);
} }
void test_get_job_log_dir() { void test_get_app_log_dir() {
char *expected = TEST_ROOT "/logs/userlogs/job_200906101234_0001"; char *expected = TEST_ROOT "/logs/userlogs/app_200906101234_0001";
char *logdir = get_job_log_directory(TEST_ROOT "/logs/userlogs","job_200906101234_0001"); char *logdir = get_app_log_directory(TEST_ROOT "/logs/userlogs","app_200906101234_0001");
if (strcmp(logdir, expected) != 0) { if (strcmp(logdir, expected) != 0) {
printf("Fail get_job_log_dir got %s expected %s\n", logdir, expected); printf("Fail get_app_log_dir got %s expected %s\n", logdir, expected);
exit(1); exit(1);
} }
free(logdir); free(logdir);
@ -200,10 +194,6 @@ void test_check_user() {
printf("FAIL: failed check for system user root\n"); printf("FAIL: failed check for system user root\n");
exit(1); exit(1);
} }
if (check_user("mapred") != NULL) {
printf("FAIL: failed check for hadoop user mapred\n");
exit(1);
}
} }
void test_check_configuration_permissions() { void test_check_configuration_permissions() {
@ -218,56 +208,56 @@ void test_check_configuration_permissions() {
} }
} }
void test_delete_task() { void test_delete_container() {
if (initialize_user(username)) { if (initialize_user(username)) {
printf("FAIL: failed to initialize user %s\n", username); printf("FAIL: failed to initialize user %s\n", username);
exit(1); exit(1);
} }
char* job_dir = get_job_directory(TEST_ROOT "/local-2", username, "job_1"); char* app_dir = get_app_directory(TEST_ROOT "/local-2", username, "app_1");
char* dont_touch = get_job_directory(TEST_ROOT "/local-2", username, char* dont_touch = get_app_directory(TEST_ROOT "/local-2", username,
DONT_TOUCH_FILE); DONT_TOUCH_FILE);
char* task_dir = get_attempt_work_directory(TEST_ROOT "/local-2", char* container_dir = get_container_work_directory(TEST_ROOT "/local-2",
username, "job_1", "task_1"); username, "app_1", "container_1");
char buffer[100000]; char buffer[100000];
sprintf(buffer, "mkdir -p %s/who/let/the/dogs/out/who/who", task_dir); sprintf(buffer, "mkdir -p %s/who/let/the/dogs/out/who/who", container_dir);
run(buffer); run(buffer);
sprintf(buffer, "touch %s", dont_touch); sprintf(buffer, "touch %s", dont_touch);
run(buffer); run(buffer);
// soft link to the canary file from the task directory // soft link to the canary file from the container directory
sprintf(buffer, "ln -s %s %s/who/softlink", dont_touch, task_dir); sprintf(buffer, "ln -s %s %s/who/softlink", dont_touch, container_dir);
run(buffer); run(buffer);
// hard link to the canary file from the task directory // hard link to the canary file from the container directory
sprintf(buffer, "ln %s %s/who/hardlink", dont_touch, task_dir); sprintf(buffer, "ln %s %s/who/hardlink", dont_touch, container_dir);
run(buffer); run(buffer);
// create a dot file in the task directory // create a dot file in the container directory
sprintf(buffer, "touch %s/who/let/.dotfile", task_dir); sprintf(buffer, "touch %s/who/let/.dotfile", container_dir);
run(buffer); run(buffer);
// create a no permission file // create a no permission file
sprintf(buffer, "touch %s/who/let/protect", task_dir); sprintf(buffer, "touch %s/who/let/protect", container_dir);
run(buffer); run(buffer);
sprintf(buffer, "chmod 000 %s/who/let/protect", task_dir); sprintf(buffer, "chmod 000 %s/who/let/protect", container_dir);
run(buffer); run(buffer);
// create a no permission directory // create a no permission directory
sprintf(buffer, "chmod 000 %s/who/let", task_dir); sprintf(buffer, "chmod 000 %s/who/let", container_dir);
run(buffer); run(buffer);
// delete task directory // delete container directory
char * dirs[] = {job_dir, 0}; char * dirs[] = {app_dir, 0};
int ret = delete_as_user(username, "task_1" , dirs); int ret = delete_as_user(username, "container_1" , dirs);
if (ret != 0) { if (ret != 0) {
printf("FAIL: return code from delete_as_user is %d\n", ret); printf("FAIL: return code from delete_as_user is %d\n", ret);
exit(1); exit(1);
} }
// check to make sure the task directory is gone // check to make sure the container directory is gone
if (access(task_dir, R_OK) == 0) { if (access(container_dir, R_OK) == 0) {
printf("FAIL: failed to delete the directory - %s\n", task_dir); printf("FAIL: failed to delete the directory - %s\n", container_dir);
exit(1); exit(1);
} }
// check to make sure the job directory is not gone // check to make sure the app directory is not gone
if (access(job_dir, R_OK) != 0) { if (access(app_dir, R_OK) != 0) {
printf("FAIL: accidently deleted the directory - %s\n", job_dir); printf("FAIL: accidently deleted the directory - %s\n", app_dir);
exit(1); exit(1);
} }
// but that the canary is not gone // but that the canary is not gone
@ -275,60 +265,60 @@ void test_delete_task() {
printf("FAIL: accidently deleted file %s\n", dont_touch); printf("FAIL: accidently deleted file %s\n", dont_touch);
exit(1); exit(1);
} }
sprintf(buffer, "chmod -R 700 %s", job_dir); sprintf(buffer, "chmod -R 700 %s", app_dir);
run(buffer); run(buffer);
sprintf(buffer, "rm -fr %s", job_dir); sprintf(buffer, "rm -fr %s", app_dir);
run(buffer); run(buffer);
free(job_dir); free(app_dir);
free(task_dir); free(container_dir);
free(dont_touch); free(dont_touch);
} }
void test_delete_job() { void test_delete_app() {
char* job_dir = get_job_directory(TEST_ROOT "/local-2", username, "job_2"); char* app_dir = get_app_directory(TEST_ROOT "/local-2", username, "app_2");
char* dont_touch = get_job_directory(TEST_ROOT "/local-2", username, char* dont_touch = get_app_directory(TEST_ROOT "/local-2", username,
DONT_TOUCH_FILE); DONT_TOUCH_FILE);
char* task_dir = get_attempt_work_directory(TEST_ROOT "/local-2", char* container_dir = get_container_work_directory(TEST_ROOT "/local-2",
username, "job_2", "task_1"); username, "app_2", "container_1");
char buffer[100000]; char buffer[100000];
sprintf(buffer, "mkdir -p %s/who/let/the/dogs/out/who/who", task_dir); sprintf(buffer, "mkdir -p %s/who/let/the/dogs/out/who/who", container_dir);
run(buffer); run(buffer);
sprintf(buffer, "touch %s", dont_touch); sprintf(buffer, "touch %s", dont_touch);
run(buffer); run(buffer);
// soft link to the canary file from the task directory // soft link to the canary file from the container directory
sprintf(buffer, "ln -s %s %s/who/softlink", dont_touch, task_dir); sprintf(buffer, "ln -s %s %s/who/softlink", dont_touch, container_dir);
run(buffer); run(buffer);
// hard link to the canary file from the task directory // hard link to the canary file from the container directory
sprintf(buffer, "ln %s %s/who/hardlink", dont_touch, task_dir); sprintf(buffer, "ln %s %s/who/hardlink", dont_touch, container_dir);
run(buffer); run(buffer);
// create a dot file in the task directory // create a dot file in the container directory
sprintf(buffer, "touch %s/who/let/.dotfile", task_dir); sprintf(buffer, "touch %s/who/let/.dotfile", container_dir);
run(buffer); run(buffer);
// create a no permission file // create a no permission file
sprintf(buffer, "touch %s/who/let/protect", task_dir); sprintf(buffer, "touch %s/who/let/protect", container_dir);
run(buffer); run(buffer);
sprintf(buffer, "chmod 000 %s/who/let/protect", task_dir); sprintf(buffer, "chmod 000 %s/who/let/protect", container_dir);
run(buffer); run(buffer);
// create a no permission directory // create a no permission directory
sprintf(buffer, "chmod 000 %s/who/let", task_dir); sprintf(buffer, "chmod 000 %s/who/let", container_dir);
run(buffer); run(buffer);
// delete task directory // delete container directory
int ret = delete_as_user(username, job_dir, NULL); int ret = delete_as_user(username, app_dir, NULL);
if (ret != 0) { if (ret != 0) {
printf("FAIL: return code from delete_as_user is %d\n", ret); printf("FAIL: return code from delete_as_user is %d\n", ret);
exit(1); exit(1);
} }
// check to make sure the task directory is gone // check to make sure the container directory is gone
if (access(task_dir, R_OK) == 0) { if (access(container_dir, R_OK) == 0) {
printf("FAIL: failed to delete the directory - %s\n", task_dir); printf("FAIL: failed to delete the directory - %s\n", container_dir);
exit(1); exit(1);
} }
// check to make sure the job directory is gone // check to make sure the app directory is gone
if (access(job_dir, R_OK) == 0) { if (access(app_dir, R_OK) == 0) {
printf("FAIL: didn't delete the directory - %s\n", job_dir); printf("FAIL: didn't delete the directory - %s\n", app_dir);
exit(1); exit(1);
} }
// but that the canary is not gone // but that the canary is not gone
@ -336,16 +326,16 @@ void test_delete_job() {
printf("FAIL: accidently deleted file %s\n", dont_touch); printf("FAIL: accidently deleted file %s\n", dont_touch);
exit(1); exit(1);
} }
free(job_dir); free(app_dir);
free(task_dir); free(container_dir);
free(dont_touch); free(dont_touch);
} }
void test_delete_user() { void test_delete_user() {
printf("\nTesting delete_user\n"); printf("\nTesting delete_user\n");
char* job_dir = get_job_directory(TEST_ROOT "/local-1", username, "job_3"); char* app_dir = get_app_directory(TEST_ROOT "/local-1", username, "app_3");
if (mkdirs(job_dir, 0700) != 0) { if (mkdirs(app_dir, 0700) != 0) {
exit(1); exit(1);
} }
char buffer[100000]; char buffer[100000];
@ -365,7 +355,7 @@ void test_delete_user() {
printf("FAIL: local-1 directory does not exist\n"); printf("FAIL: local-1 directory does not exist\n");
exit(1); exit(1);
} }
free(job_dir); free(app_dir);
} }
void run_test_in_child(const char* test_name, void (*func)()) { void run_test_in_child(const char* test_name, void (*func)()) {
@ -397,8 +387,8 @@ void run_test_in_child(const char* test_name, void (*func)()) {
} }
} }
void test_signal_task() { void test_signal_container() {
printf("\nTesting signal_task\n"); printf("\nTesting signal_container\n");
fflush(stdout); fflush(stdout);
fflush(stderr); fflush(stderr);
pid_t child = fork(); pid_t child = fork();
@ -412,8 +402,8 @@ void test_signal_task() {
sleep(3600); sleep(3600);
exit(0); exit(0);
} else { } else {
printf("Child task launched as %d\n", child); printf("Child container launched as %d\n", child);
if (signal_user_task(username, child, SIGQUIT) != 0) { if (signal_container_as_user(username, child, SIGQUIT) != 0) {
exit(1); exit(1);
} }
int status = 0; int status = 0;
@ -433,8 +423,8 @@ void test_signal_task() {
} }
} }
void test_signal_task_group() { void test_signal_container_group() {
printf("\nTesting group signal_task\n"); printf("\nTesting group signal_container\n");
fflush(stdout); fflush(stdout);
fflush(stderr); fflush(stderr);
pid_t child = fork(); pid_t child = fork();
@ -449,8 +439,8 @@ void test_signal_task_group() {
sleep(3600); sleep(3600);
exit(0); exit(0);
} }
printf("Child task launched as %d\n", child); printf("Child container launched as %d\n", child);
if (signal_user_task(username, child, SIGKILL) != 0) { if (signal_container_as_user(username, child, SIGKILL) != 0) {
exit(1); exit(1);
} }
int status = 0; int status = 0;
@ -469,8 +459,8 @@ void test_signal_task_group() {
} }
} }
void test_init_job() { void test_init_app() {
printf("\nTesting init job\n"); printf("\nTesting init app\n");
if (seteuid(0) != 0) { if (seteuid(0) != 0) {
printf("FAIL: seteuid to root failed - %s\n", strerror(errno)); printf("FAIL: seteuid to root failed - %s\n", strerror(errno));
exit(1); exit(1);
@ -509,12 +499,12 @@ void test_init_job() {
fflush(stderr); fflush(stderr);
pid_t child = fork(); pid_t child = fork();
if (child == -1) { if (child == -1) {
printf("FAIL: failed to fork process for init_job - %s\n", printf("FAIL: failed to fork process for init_app - %s\n",
strerror(errno)); strerror(errno));
exit(1); exit(1);
} else if (child == 0) { } else if (child == 0) {
char *final_pgm[] = {"touch", "my-touch-file", 0}; char *final_pgm[] = {"touch", "my-touch-file", 0};
if (initialize_job(username, "job_4", TEST_ROOT "/creds.txt", final_pgm) != 0) { if (initialize_app(username, "app_4", TEST_ROOT "/creds.txt", final_pgm) != 0) {
printf("FAIL: failed in child\n"); printf("FAIL: failed in child\n");
exit(42); exit(42);
} }
@ -527,37 +517,37 @@ void test_init_job() {
strerror(errno)); strerror(errno));
exit(1); exit(1);
} }
if (access(TEST_ROOT "/logs/userlogs/job_4", R_OK) != 0) { if (access(TEST_ROOT "/logs/userlogs/app_4", R_OK) != 0) {
printf("FAIL: failed to create job log directory\n"); printf("FAIL: failed to create app log directory\n");
exit(1); exit(1);
} }
char* job_dir = get_job_directory(TEST_ROOT "/local-1", username, "job_4"); char* app_dir = get_app_directory(TEST_ROOT "/local-1", username, "app_4");
if (access(job_dir, R_OK) != 0) { if (access(app_dir, R_OK) != 0) {
printf("FAIL: failed to create job directory %s\n", job_dir); printf("FAIL: failed to create app directory %s\n", app_dir);
exit(1); exit(1);
} }
char buffer[100000]; char buffer[100000];
sprintf(buffer, "%s/jobToken", job_dir); sprintf(buffer, "%s/jobToken", app_dir);
if (access(buffer, R_OK) != 0) { if (access(buffer, R_OK) != 0) {
printf("FAIL: failed to create credentials %s\n", buffer); printf("FAIL: failed to create credentials %s\n", buffer);
exit(1); exit(1);
} }
sprintf(buffer, "%s/my-touch-file", job_dir); sprintf(buffer, "%s/my-touch-file", app_dir);
if (access(buffer, R_OK) != 0) { if (access(buffer, R_OK) != 0) {
printf("FAIL: failed to create touch file %s\n", buffer); printf("FAIL: failed to create touch file %s\n", buffer);
exit(1); exit(1);
} }
free(job_dir); free(app_dir);
job_dir = get_job_log_directory("logs","job_4"); app_dir = get_app_log_directory("logs","app_4");
if (access(job_dir, R_OK) != 0) { if (access(app_dir, R_OK) != 0) {
printf("FAIL: failed to create job log directory %s\n", job_dir); printf("FAIL: failed to create app log directory %s\n", app_dir);
exit(1); exit(1);
} }
free(job_dir); free(app_dir);
} }
void test_run_task() { void test_run_container() {
printf("\nTesting run task\n"); printf("\nTesting run container\n");
if (seteuid(0) != 0) { if (seteuid(0) != 0) {
printf("FAIL: seteuid to root failed - %s\n", strerror(errno)); printf("FAIL: seteuid to root failed - %s\n", strerror(errno));
exit(1); exit(1);
@ -576,7 +566,7 @@ void test_run_task() {
exit(1); exit(1);
} }
const char* script_name = TEST_ROOT "/task-script"; const char* script_name = TEST_ROOT "/container-script";
FILE* script = fopen(script_name, "w"); FILE* script = fopen(script_name, "w");
if (script == NULL) { if (script == NULL) {
printf("FAIL: failed to create script file - %s\n", strerror(errno)); printf("FAIL: failed to create script file - %s\n", strerror(errno));
@ -598,16 +588,16 @@ void test_run_task() {
} }
fflush(stdout); fflush(stdout);
fflush(stderr); fflush(stderr);
char* task_dir = get_attempt_work_directory(TEST_ROOT "/local-1", char* container_dir = get_container_work_directory(TEST_ROOT "/local-1",
username, "job_4", "task_1"); username, "app_4", "container_1");
pid_t child = fork(); pid_t child = fork();
if (child == -1) { if (child == -1) {
printf("FAIL: failed to fork process for init_job - %s\n", printf("FAIL: failed to fork process for init_app - %s\n",
strerror(errno)); strerror(errno));
exit(1); exit(1);
} else if (child == 0) { } else if (child == 0) {
if (run_task_as_user(username, "job_4", "task_1", if (launch_container_as_user(username, "app_4", "container_1",
task_dir, script_name, TEST_ROOT "creds.txt") != 0) { container_dir, script_name, TEST_ROOT "creds.txt") != 0) {
printf("FAIL: failed in child\n"); printf("FAIL: failed in child\n");
exit(42); exit(42);
} }
@ -620,31 +610,32 @@ void test_run_task() {
strerror(errno)); strerror(errno));
exit(1); exit(1);
} }
if (access(TEST_ROOT "/logs/userlogs/job_4/task_1", R_OK) != 0) { if (access(TEST_ROOT "/logs/userlogs/app_4/container_1", R_OK) != 0) {
printf("FAIL: failed to create task log directory\n"); printf("FAIL: failed to create container log directory\n");
exit(1); exit(1);
} }
if (access(task_dir, R_OK) != 0) { if (access(container_dir, R_OK) != 0) {
printf("FAIL: failed to create task directory %s\n", task_dir); printf("FAIL: failed to create container directory %s\n", container_dir);
exit(1); exit(1);
} }
char buffer[100000]; char buffer[100000];
sprintf(buffer, "%s/foobar", task_dir); sprintf(buffer, "%s/foobar", container_dir);
if (access(buffer, R_OK) != 0) { if (access(buffer, R_OK) != 0) {
printf("FAIL: failed to create touch file %s\n", buffer); printf("FAIL: failed to create touch file %s\n", buffer);
exit(1); exit(1);
} }
free(task_dir); free(container_dir);
task_dir = get_job_log_directory("logs", "job_4/task_1"); container_dir = get_app_log_directory("logs", "app_4/container_1");
if (access(task_dir, R_OK) != 0) { if (access(container_dir, R_OK) != 0) {
printf("FAIL: failed to create job log directory %s\n", task_dir); printf("FAIL: failed to create app log directory %s\n", container_dir);
exit(1); exit(1);
} }
free(task_dir); free(container_dir);
} }
int main(int argc, char **argv) { int main(int argc, char **argv) {
LOGFILE = stdout; LOGFILE = stdout;
ERRORFILE = stderr;
int my_username = 0; int my_username = 0;
// clean up any junk from previous run // clean up any junk from previous run
@ -659,7 +650,7 @@ int main(int argc, char **argv) {
} }
read_config(TEST_ROOT "/test.cfg"); read_config(TEST_ROOT "/test.cfg");
create_tt_roots(); create_nm_roots();
if (getuid() == 0 && argc == 2) { if (getuid() == 0 && argc == 2) {
username = argv[1]; username = argv[1];
@ -667,7 +658,7 @@ int main(int argc, char **argv) {
username = strdup(getpwuid(getuid())->pw_name); username = strdup(getpwuid(getuid())->pw_name);
my_username = 1; my_username = 1;
} }
set_tasktracker_uid(geteuid(), getegid()); set_nm_uid(geteuid(), getegid());
if (set_user(username)) { if (set_user(username)) {
exit(1); exit(1);
@ -678,25 +669,25 @@ int main(int argc, char **argv) {
printf("\nTesting get_user_directory()\n"); printf("\nTesting get_user_directory()\n");
test_get_user_directory(); test_get_user_directory();
printf("\nTesting get_job_directory()\n"); printf("\nTesting get_app_directory()\n");
test_get_job_directory(); test_get_app_directory();
printf("\nTesting get_attempt_directory()\n"); printf("\nTesting get_container_directory()\n");
test_get_attempt_directory(); test_get_container_directory();
printf("\nTesting get_task_launcher_file()\n"); printf("\nTesting get_container_launcher_file()\n");
test_get_task_launcher_file(); test_get_container_launcher_file();
printf("\nTesting get_job_log_dir()\n"); printf("\nTesting get_app_log_dir()\n");
test_get_job_log_dir(); test_get_app_log_dir();
test_check_configuration_permissions(); test_check_configuration_permissions();
printf("\nTesting delete_task()\n"); printf("\nTesting delete_container()\n");
test_delete_task(); test_delete_container();
printf("\nTesting delete_job()\n"); printf("\nTesting delete_app()\n");
test_delete_job(); test_delete_app();
test_delete_user(); test_delete_user();
@ -704,15 +695,15 @@ int main(int argc, char **argv) {
// the tests that change user need to be run in a subshell, so that // the tests that change user need to be run in a subshell, so that
// when they change user they don't give up our privs // when they change user they don't give up our privs
run_test_in_child("test_signal_task", test_signal_task); run_test_in_child("test_signal_container", test_signal_container);
run_test_in_child("test_signal_task_group", test_signal_task_group); run_test_in_child("test_signal_container_group", test_signal_container_group);
// init job and run task can't be run if you aren't testing as root // init app and run container can't be run if you aren't testing as root
if (getuid() == 0) { if (getuid() == 0) {
// these tests do internal forks so that the change_owner and execs // these tests do internal forks so that the change_owner and execs
// don't mess up our process. // don't mess up our process.
test_init_job(); test_init_app();
test_run_task(); test_run_container();
} }
seteuid(0); seteuid(0);

View File

@ -54,25 +54,26 @@ import org.junit.Test;
* <li>Compile the code with container-executor.conf.dir set to the location you * <li>Compile the code with container-executor.conf.dir set to the location you
* want for testing. * want for testing.
* <br><pre><code> * <br><pre><code>
* > mvn clean install -Dcontainer-executor.conf.dir=/etc/hadoop -DskipTests * > mvn clean install -Pnative -Dcontainer-executor.conf.dir=/etc/hadoop
* -DskipTests
* </code></pre> * </code></pre>
* *
* <li>Set up <code>${container-executor.conf.dir}/taskcontroller.cfg</code> * <li>Set up <code>${container-executor.conf.dir}/container-executor.cfg</code>
* taskcontroller.cfg needs to be owned by root and have in it the proper * container-executor.cfg needs to be owned by root and have in it the proper
* config values. * config values.
* <br><pre><code> * <br><pre><code>
* > cat /etc/hadoop/taskcontroller.cfg * > cat /etc/hadoop/container-executor.cfg
* mapreduce.cluster.local.dir=/tmp/hadoop/nm-local/ * yarn.nodemanager.local-dirs=/tmp/hadoop/nm-local/
* hadoop.log.dir=/tmp/hadoop/nm-log * yarn.nodemanager.log-dirs=/tmp/hadoop/nm-log
* mapreduce.tasktracker.group=mapred * yarn.nodemanager.linux-container-executor.group=mapred
* #depending on the user id of the application.submitter option * #depending on the user id of the application.submitter option
* min.user.id=1 * min.user.id=1
* > sudo chown root:root /etc/hadoop/taskcontroller.cfg * > sudo chown root:root /etc/hadoop/container-executor.cfg
* > sudo chmod 444 /etc/hadoop/taskcontroller.cfg * > sudo chmod 444 /etc/hadoop/container-executor.cfg
* </code></pre> * </code></pre>
* *
* <li>iMove the binary and set proper permissions on it. It needs to be owned * <li>iMove the binary and set proper permissions on it. It needs to be owned
* by root, the group needs to be the group configured in taskcontroller.cfg, * by root, the group needs to be the group configured in container-executor.cfg,
* and it needs the setuid bit set. (The build will also overwrite it so you * and it needs the setuid bit set. (The build will also overwrite it so you
* need to move it to a place that you can support it. * need to move it to a place that you can support it.
* <br><pre><code> * <br><pre><code>