YARN-3365. Enhanced NodeManager to support using the 'tc' tool via container-executor for outbound network traffic control. Contributed by Sidharta Seethana.
(cherry picked from commit b21c72777a
)
This commit is contained in:
parent
ddb5bb8fcf
commit
d8e17c58bc
|
@ -8,6 +8,10 @@ Release 2.8.0 - UNRELEASED
|
|||
|
||||
YARN-3345. Add non-exclusive node label API. (Wangda Tan via jianhe)
|
||||
|
||||
YARN-3365. Enhanced NodeManager to support using the 'tc' tool via
|
||||
container-executor for outbound network traffic control. (Sidharta Seethana
|
||||
via vinodkv)
|
||||
|
||||
IMPROVEMENTS
|
||||
|
||||
YARN-1880. Cleanup TestApplicationClientProtocolOnHA
|
||||
|
|
|
@ -40,6 +40,12 @@ static const int DEFAULT_MIN_USERID = 1000;
|
|||
|
||||
static const char* DEFAULT_BANNED_USERS[] = {"mapred", "hdfs", "bin", 0};
|
||||
|
||||
//location of traffic control binary
|
||||
static const char* TC_BIN = "/sbin/tc";
|
||||
static const char* TC_MODIFY_STATE_OPTS [] = { "-b" , NULL};
|
||||
static const char* TC_READ_STATE_OPTS [] = { "-b", NULL};
|
||||
static const char* TC_READ_STATS_OPTS [] = { "-s", "-b", NULL};
|
||||
|
||||
//struct to store the user details
|
||||
struct passwd *user_detail = NULL;
|
||||
|
||||
|
@ -291,27 +297,20 @@ static int write_exit_code_file(const char* exit_code_file, int exit_code) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* Wait for the container process to exit and write the exit code to
|
||||
* the exit code file.
|
||||
* Returns the exit code of the container process.
|
||||
*/
|
||||
static int wait_and_write_exit_code(pid_t pid, const char* exit_code_file) {
|
||||
static int wait_and_get_exit_code(pid_t pid) {
|
||||
int child_status = -1;
|
||||
int exit_code = -1;
|
||||
int waitpid_result;
|
||||
|
||||
if (change_effective_user(nm_uid, nm_gid) != 0) {
|
||||
return -1;
|
||||
}
|
||||
do {
|
||||
waitpid_result = waitpid(pid, &child_status, 0);
|
||||
waitpid_result = waitpid(pid, &child_status, 0);
|
||||
} while (waitpid_result == -1 && errno == EINTR);
|
||||
|
||||
if (waitpid_result < 0) {
|
||||
fprintf(LOGFILE, "Error waiting for container process %d - %s\n",
|
||||
pid, strerror(errno));
|
||||
fprintf(LOGFILE, "error waiting for process %d - %s\n", pid, strerror(errno));
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (WIFEXITED(child_status)) {
|
||||
exit_code = WEXITSTATUS(child_status);
|
||||
} else if (WIFSIGNALED(child_status)) {
|
||||
|
@ -319,9 +318,26 @@ static int wait_and_write_exit_code(pid_t pid, const char* exit_code_file) {
|
|||
} else {
|
||||
fprintf(LOGFILE, "Unable to determine exit status for pid %d\n", pid);
|
||||
}
|
||||
|
||||
return exit_code;
|
||||
}
|
||||
|
||||
/**
|
||||
* Wait for the container process to exit and write the exit code to
|
||||
* the exit code file.
|
||||
* Returns the exit code of the container process.
|
||||
*/
|
||||
static int wait_and_write_exit_code(pid_t pid, const char* exit_code_file) {
|
||||
int exit_code = -1;
|
||||
|
||||
if (change_effective_user(nm_uid, nm_gid) != 0) {
|
||||
return -1;
|
||||
}
|
||||
exit_code = wait_and_get_exit_code(pid);
|
||||
if (write_exit_code_file(exit_code_file, exit_code) < 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
return exit_code;
|
||||
}
|
||||
|
||||
|
@ -1470,3 +1486,63 @@ int mount_cgroup(const char *pair, const char *hierarchy) {
|
|||
#endif
|
||||
}
|
||||
|
||||
static int run_traffic_control(const char *opts[], char *command_file) {
|
||||
const int max_tc_args = 16;
|
||||
char *args[max_tc_args];
|
||||
int i = 0, j = 0;
|
||||
|
||||
args[i++] = TC_BIN;
|
||||
while (opts[j] != NULL && i < max_tc_args - 1) {
|
||||
args[i] = opts[j];
|
||||
++i, ++j;
|
||||
}
|
||||
//too many args to tc
|
||||
if (i == max_tc_args - 1) {
|
||||
fprintf(LOGFILE, "too many args to tc");
|
||||
return TRAFFIC_CONTROL_EXECUTION_FAILED;
|
||||
}
|
||||
args[i++] = command_file;
|
||||
args[i] = 0;
|
||||
|
||||
pid_t child_pid = fork();
|
||||
if (child_pid != 0) {
|
||||
int exit_code = wait_and_get_exit_code(child_pid);
|
||||
if (exit_code != 0) {
|
||||
fprintf(LOGFILE, "failed to execute tc command!\n");
|
||||
return TRAFFIC_CONTROL_EXECUTION_FAILED;
|
||||
}
|
||||
unlink(command_file);
|
||||
return 0;
|
||||
} else {
|
||||
execv(TC_BIN, args);
|
||||
//if we reach here, exec failed
|
||||
fprintf(LOGFILE, "failed to execute tc command! error: %s\n", strerror(errno));
|
||||
return TRAFFIC_CONTROL_EXECUTION_FAILED;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Run a batch of tc commands that modify interface configuration. command_file
|
||||
* is deleted after being used.
|
||||
*/
|
||||
int traffic_control_modify_state(char *command_file) {
|
||||
return run_traffic_control(TC_MODIFY_STATE_OPTS, command_file);
|
||||
}
|
||||
|
||||
/**
|
||||
* Run a batch of tc commands that read interface configuration. Output is
|
||||
* written to standard output and it is expected to be read and parsed by the
|
||||
* calling process. command_file is deleted after being used.
|
||||
*/
|
||||
int traffic_control_read_state(char *command_file) {
|
||||
return run_traffic_control(TC_READ_STATE_OPTS, command_file);
|
||||
}
|
||||
|
||||
/**
|
||||
* Run a batch of tc commands that read interface stats. Output is
|
||||
* written to standard output and it is expected to be read and parsed by the
|
||||
* calling process. command_file is deleted after being used.
|
||||
*/
|
||||
int traffic_control_read_stats(char *command_file) {
|
||||
return run_traffic_control(TC_READ_STATS_OPTS, command_file);
|
||||
}
|
||||
|
|
|
@ -54,7 +54,20 @@ enum errorcodes {
|
|||
INVALID_CONFIG_FILE = 24,
|
||||
SETSID_OPER_FAILED = 25,
|
||||
WRITE_PIDFILE_FAILED = 26,
|
||||
WRITE_CGROUP_FAILED = 27
|
||||
WRITE_CGROUP_FAILED = 27,
|
||||
TRAFFIC_CONTROL_EXECUTION_FAILED = 28
|
||||
};
|
||||
|
||||
/*
 * Top-level operation requested on the command line. Numbering starts at 1
 * and each following enumerator takes the next consecutive value.
 */
enum operations {
  CHECK_SETUP = 1,
  MOUNT_CGROUPS,                     /* 2 */
  TRAFFIC_CONTROL_MODIFY_STATE,      /* 3 */
  TRAFFIC_CONTROL_READ_STATE,        /* 4 */
  TRAFFIC_CONTROL_READ_STATS,        /* 5 */
  RUN_AS_USER_INITIALIZE_CONTAINER,  /* 6 */
  RUN_AS_USER_LAUNCH_CONTAINER,      /* 7 */
  RUN_AS_USER_SIGNAL_CONTAINER,      /* 8 */
  RUN_AS_USER_DELETE                 /* 9 */
};
|
||||
|
||||
#define NM_GROUP_KEY "yarn.nodemanager.linux-container-executor.group"
|
||||
|
@ -209,3 +222,22 @@ int check_dir(char* npath, mode_t st_mode, mode_t desired,
|
|||
|
||||
int create_validate_dir(char* npath, mode_t perm, char* path,
|
||||
int finalComponent);
|
||||
|
||||
/**
|
||||
* Run a batch of tc commands that modify interface configuration
|
||||
*/
|
||||
int traffic_control_modify_state(char *command_file);
|
||||
|
||||
/**
|
||||
* Run a batch of tc commands that read interface configuration. Output is
|
||||
* written to standard output and it is expected to be read and parsed by the
|
||||
* calling process.
|
||||
*/
|
||||
int traffic_control_read_state(char *command_file);
|
||||
|
||||
/**
|
||||
* Run a batch of tc commands that read interface stats. Output is
|
||||
* written to standard output and it is expected to be read and parsed by the
|
||||
* calling process.
|
||||
*/
|
||||
int traffic_control_read_stats(char *command_file);
|
||||
|
|
|
@ -42,84 +42,71 @@
|
|||
#error HADOOP_CONF_DIR must be defined
|
||||
#endif
|
||||
|
||||
void display_usage(FILE *stream) {
|
||||
fprintf(stream,
|
||||
"Usage: container-executor --checksetup\n");
|
||||
fprintf(stream,
|
||||
"Usage: container-executor --mount-cgroups "\
|
||||
"hierarchy controller=path...\n");
|
||||
fprintf(stream,
|
||||
"Usage: container-executor user yarn-user command command-args\n");
|
||||
fprintf(stream, "Commands:\n");
|
||||
fprintf(stream, " initialize container: %2d appid tokens " \
|
||||
"nm-local-dirs nm-log-dirs cmd app...\n", INITIALIZE_CONTAINER);
|
||||
fprintf(stream,
|
||||
" launch container: %2d appid containerid workdir "\
|
||||
"container-script tokens pidfile nm-local-dirs nm-log-dirs resources\n",
|
||||
LAUNCH_CONTAINER);
|
||||
fprintf(stream, " signal container: %2d container-pid signal\n",
|
||||
SIGNAL_CONTAINER);
|
||||
fprintf(stream, " delete as user: %2d relative-path\n",
|
||||
DELETE_AS_USER);
|
||||
static void display_usage(FILE *stream) {
|
||||
char *usage_template =
|
||||
"Usage: container-executor --checksetup\n" \
|
||||
" container-executor --mount-cgroups <hierarchy> <controller=path>...\n" \
|
||||
" container-executor --tc-modify-state <command-file>\n" \
|
||||
" container-executor --tc-read-state <command-file>\n" \
|
||||
" container-executor --tc-read-stats <command-file>\n" \
|
||||
" container-executor <user> <yarn-user> <command> <command-args>\n" \
|
||||
" where command and command-args: \n" \
|
||||
" initialize container: %2d appid tokens nm-local-dirs nm-log-dirs cmd app...\n" \
|
||||
" launch container: %2d appid containerid workdir container-script " \
|
||||
"tokens pidfile nm-local-dirs nm-log-dirs resources optional-tc-command-file\n" \
|
||||
" signal container: %2d container-pid signal\n" \
|
||||
" delete as user: %2d relative-path\n" ;
|
||||
|
||||
|
||||
fprintf(stream, usage_template, INITIALIZE_CONTAINER, LAUNCH_CONTAINER,
|
||||
SIGNAL_CONTAINER, DELETE_AS_USER);
|
||||
}
|
||||
|
||||
int main(int argc, char **argv) {
|
||||
int invalid_args = 0;
|
||||
int do_check_setup = 0;
|
||||
int do_mount_cgroups = 0;
|
||||
|
||||
LOGFILE = stdout;
|
||||
ERRORFILE = stderr;
|
||||
|
||||
if (argc > 1) {
|
||||
if (strcmp("--mount-cgroups", argv[1]) == 0) {
|
||||
do_mount_cgroups = 1;
|
||||
}
|
||||
/* Sets up log files for normal/error logging */
|
||||
static void open_log_files() {
|
||||
if (LOGFILE == NULL) {
|
||||
LOGFILE = stdout;
|
||||
}
|
||||
|
||||
// Minimum number of arguments required to run
|
||||
// the std. container-executor commands is 4
|
||||
// 4 args not needed for checksetup option
|
||||
if (argc < 4 && !do_mount_cgroups) {
|
||||
invalid_args = 1;
|
||||
if (argc == 2) {
|
||||
const char *arg1 = argv[1];
|
||||
if (strcmp("--checksetup", arg1) == 0) {
|
||||
invalid_args = 0;
|
||||
do_check_setup = 1;
|
||||
}
|
||||
}
|
||||
if (ERRORFILE == NULL) {
|
||||
ERRORFILE = stderr;
|
||||
}
|
||||
}
|
||||
|
||||
/* Flushes and closes log files */
|
||||
static void flush_and_close_log_files() {
|
||||
if (LOGFILE != NULL) {
|
||||
fflush(LOGFILE);
|
||||
fclose(LOGFILE);
|
||||
LOGFILE = NULL;
|
||||
}
|
||||
|
||||
if (invalid_args != 0) {
|
||||
display_usage(stdout);
|
||||
return INVALID_ARGUMENT_NUMBER;
|
||||
if (ERRORFILE != NULL) {
|
||||
fflush(ERRORFILE);
|
||||
fclose(ERRORFILE);
|
||||
ERRORFILE = NULL;
|
||||
}
|
||||
}
|
||||
|
||||
int command;
|
||||
const char * app_id = NULL;
|
||||
const char * container_id = NULL;
|
||||
const char * cred_file = NULL;
|
||||
const char * script_file = NULL;
|
||||
const char * current_dir = NULL;
|
||||
const char * pid_file = NULL;
|
||||
|
||||
int exit_code = 0;
|
||||
|
||||
char * dir_to_be_deleted = NULL;
|
||||
/** Validates the current container-executor setup. Causes program exit
|
||||
in case of validation failures. Also sets up configuration / group information etc.,
|
||||
This function is to be called in every invocation of container-executor, irrespective
|
||||
of whether an explicit checksetup operation is requested. */
|
||||
|
||||
static void assert_valid_setup(char *current_executable) {
|
||||
char *executable_file = get_executable();
|
||||
|
||||
char *orig_conf_file = HADOOP_CONF_DIR "/" CONF_FILENAME;
|
||||
char *conf_file = resolve_config_path(orig_conf_file, argv[0]);
|
||||
char *local_dirs, *log_dirs;
|
||||
char *resources, *resources_key, *resources_value;
|
||||
char *conf_file = resolve_config_path(orig_conf_file, current_executable);
|
||||
|
||||
if (conf_file == NULL) {
|
||||
fprintf(ERRORFILE, "Configuration file %s not found.\n", orig_conf_file);
|
||||
flush_and_close_log_files();
|
||||
exit(INVALID_CONFIG_FILE);
|
||||
}
|
||||
|
||||
if (check_configuration_permissions(conf_file) != 0) {
|
||||
flush_and_close_log_files();
|
||||
exit(INVALID_CONFIG_FILE);
|
||||
}
|
||||
read_config(conf_file);
|
||||
|
@ -129,13 +116,14 @@ int main(int argc, char **argv) {
|
|||
char *nm_group = get_value(NM_GROUP_KEY);
|
||||
if (nm_group == NULL) {
|
||||
fprintf(ERRORFILE, "Can't get configured value for %s.\n", NM_GROUP_KEY);
|
||||
flush_and_close_log_files();
|
||||
exit(INVALID_CONFIG_FILE);
|
||||
}
|
||||
struct group *group_info = getgrnam(nm_group);
|
||||
if (group_info == NULL) {
|
||||
fprintf(ERRORFILE, "Can't get group information for %s - %s.\n", nm_group,
|
||||
strerror(errno));
|
||||
fflush(LOGFILE);
|
||||
flush_and_close_log_files();
|
||||
exit(INVALID_CONFIG_FILE);
|
||||
}
|
||||
set_nm_uid(getuid(), group_info->gr_gid);
|
||||
|
@ -146,91 +134,162 @@ int main(int argc, char **argv) {
|
|||
|
||||
if (check_executor_permissions(executable_file) != 0) {
|
||||
fprintf(ERRORFILE, "Invalid permissions on container-executor binary.\n");
|
||||
return INVALID_CONTAINER_EXEC_PERMISSIONS;
|
||||
flush_and_close_log_files();
|
||||
exit(INVALID_CONTAINER_EXEC_PERMISSIONS);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/*
 * Parsed input parameters for the requested operation. The argument
 * validation routines fill in only the fields relevant to the operation
 * they recognise; main() reads them when dispatching.
 */
static struct {
  /* --mount-cgroups */
  char *cgroups_hierarchy;

  /* --tc-* operations, and the optional tc file of 'launch container' */
  char *traffic_control_command_file;

  /* 'run as user' operations */
  const char *run_as_user_name;
  const char *yarn_user_name;
  char *local_dirs;          /* comma separated list */
  char *log_dirs;            /* comma separated list */
  char *resources_key;
  char *resources_value;
  char **resources_values;   /* extract_values(resources_value) */
  const char *app_id;
  const char *container_id;
  const char *cred_file;
  const char *script_file;
  const char *current_dir;
  const char *pid_file;
  const char *dir_to_be_deleted;
  int container_pid;
  int signal;
} cmd_input;
|
||||
|
||||
static int validate_run_as_user_commands(int argc, char **argv, int *operation);
|
||||
|
||||
/* Validates that arguments used in the invocation are valid. In case of validation
|
||||
failure, an 'errorcode' is returned. In case of successful validation, a zero is
|
||||
returned and 'operation' is populated based on the operation being requested.
|
||||
Ideally, we should re-factor container-executor to use a more structured, command
|
||||
line parsing mechanism (e.g getopt). For the time being, we'll use this manual
|
||||
validation mechanism so that we don't have to change the invocation interface.
|
||||
*/
|
||||
|
||||
static int validate_arguments(int argc, char **argv , int *operation) {
|
||||
if (argc < 2) {
|
||||
display_usage(stdout);
|
||||
return INVALID_ARGUMENT_NUMBER;
|
||||
}
|
||||
|
||||
if (do_check_setup != 0) {
|
||||
// basic setup checks done
|
||||
// verified configs available and valid
|
||||
// verified executor permissions
|
||||
if (strcmp("--checksetup", argv[1]) == 0) {
|
||||
*operation = CHECK_SETUP;
|
||||
return 0;
|
||||
}
|
||||
|
||||
if (do_mount_cgroups) {
|
||||
optind++;
|
||||
char *hierarchy = argv[optind++];
|
||||
int result = 0;
|
||||
|
||||
while (optind < argc && result == 0) {
|
||||
result = mount_cgroup(argv[optind++], hierarchy);
|
||||
if (strcmp("--mount-cgroups", argv[1]) == 0) {
|
||||
if (argc < 4) {
|
||||
display_usage(stdout);
|
||||
return INVALID_ARGUMENT_NUMBER;
|
||||
}
|
||||
|
||||
return result;
|
||||
optind++;
|
||||
cmd_input.cgroups_hierarchy = argv[optind++];
|
||||
*operation = MOUNT_CGROUPS;
|
||||
return 0;
|
||||
}
|
||||
|
||||
//checks done for user name
|
||||
if (argv[optind] == NULL) {
|
||||
fprintf(ERRORFILE, "Invalid user name.\n");
|
||||
return INVALID_USER_NAME;
|
||||
if (strcmp("--tc-modify-state", argv[1]) == 0) {
|
||||
if (argc != 3) {
|
||||
display_usage(stdout);
|
||||
return INVALID_ARGUMENT_NUMBER;
|
||||
}
|
||||
optind++;
|
||||
cmd_input.traffic_control_command_file = argv[optind++];
|
||||
*operation = TRAFFIC_CONTROL_MODIFY_STATE;
|
||||
return 0;
|
||||
}
|
||||
|
||||
int ret = set_user(argv[optind]);
|
||||
if (ret != 0) {
|
||||
return ret;
|
||||
if (strcmp("--tc-read-state", argv[1]) == 0) {
|
||||
if (argc != 3) {
|
||||
display_usage(stdout);
|
||||
return INVALID_ARGUMENT_NUMBER;
|
||||
}
|
||||
optind++;
|
||||
cmd_input.traffic_control_command_file = argv[optind++];
|
||||
*operation = TRAFFIC_CONTROL_READ_STATE;
|
||||
return 0;
|
||||
}
|
||||
|
||||
// this string is used for building pathnames, the
|
||||
// process management is done based on the 'user_detail'
|
||||
// global, which was set by 'set_user()' above
|
||||
optind = optind + 1;
|
||||
char *yarn_user_name = argv[optind];
|
||||
if (yarn_user_name == NULL) {
|
||||
fprintf(ERRORFILE, "Invalid yarn user name.\n");
|
||||
return INVALID_USER_NAME;
|
||||
if (strcmp("--tc-read-stats", argv[1]) == 0) {
|
||||
if (argc != 3) {
|
||||
display_usage(stdout);
|
||||
return INVALID_ARGUMENT_NUMBER;
|
||||
}
|
||||
optind++;
|
||||
cmd_input.traffic_control_command_file = argv[optind++];
|
||||
*operation = TRAFFIC_CONTROL_READ_STATS;
|
||||
return 0;
|
||||
}
|
||||
|
||||
optind = optind + 1;
|
||||
command = atoi(argv[optind++]);
|
||||
|
||||
fprintf(LOGFILE, "main : command provided %d\n",command);
|
||||
fprintf(LOGFILE, "main : user is %s\n", user_detail->pw_name);
|
||||
fprintf(LOGFILE, "main : requested yarn user is %s\n", yarn_user_name);
|
||||
/* Now we have to validate 'run as user' operations that don't use
|
||||
a 'long option' - we should fix this at some point. The validation/argument
|
||||
parsing here is extensive enough that it is done in a separate function */
|
||||
|
||||
return validate_run_as_user_commands(argc, argv, operation);
|
||||
}
|
||||
|
||||
/* Parse/validate 'run as user' commands */
|
||||
static int validate_run_as_user_commands(int argc, char **argv, int *operation) {
|
||||
/* We need at least the following arguments in order to proceed further :
|
||||
<user>, <yarn-user>, <command> - i.e. argc should be at least 4 */
|
||||
|
||||
if (argc < 4) {
|
||||
display_usage(stdout);
|
||||
return INVALID_ARGUMENT_NUMBER;
|
||||
}
|
||||
|
||||
cmd_input.run_as_user_name = argv[optind++];
|
||||
cmd_input.yarn_user_name = argv[optind++];
|
||||
int command = atoi(argv[optind++]);
|
||||
|
||||
fprintf(LOGFILE, "main : command provided %d\n", command);
|
||||
fprintf(LOGFILE, "main : run as user is %s\n", cmd_input.run_as_user_name);
|
||||
fprintf(LOGFILE, "main : requested yarn user is %s\n", cmd_input.yarn_user_name);
|
||||
fflush(LOGFILE);
|
||||
|
||||
switch (command) {
|
||||
case INITIALIZE_CONTAINER:
|
||||
if (argc < 9) {
|
||||
fprintf(ERRORFILE, "Too few arguments (%d vs 9) for initialize container\n",
|
||||
argc);
|
||||
argc);
|
||||
fflush(ERRORFILE);
|
||||
return INVALID_ARGUMENT_NUMBER;
|
||||
}
|
||||
app_id = argv[optind++];
|
||||
cred_file = argv[optind++];
|
||||
local_dirs = argv[optind++];// good local dirs as a comma separated list
|
||||
log_dirs = argv[optind++];// good log dirs as a comma separated list
|
||||
exit_code = initialize_app(yarn_user_name, app_id, cred_file,
|
||||
extract_values(local_dirs),
|
||||
extract_values(log_dirs), argv + optind);
|
||||
break;
|
||||
cmd_input.app_id = argv[optind++];
|
||||
cmd_input.cred_file = argv[optind++];
|
||||
cmd_input.local_dirs = argv[optind++];// good local dirs as a comma separated list
|
||||
cmd_input.log_dirs = argv[optind++];// good log dirs as a comma separated list
|
||||
|
||||
*operation = RUN_AS_USER_INITIALIZE_CONTAINER;
|
||||
return 0;
|
||||
|
||||
case LAUNCH_CONTAINER:
|
||||
if (argc != 13) {
|
||||
fprintf(ERRORFILE, "Wrong number of arguments (%d vs 13) for launch container\n",
|
||||
argc);
|
||||
//kill me now.
|
||||
if (!(argc == 13 || argc == 14)) {
|
||||
fprintf(ERRORFILE, "Wrong number of arguments (%d vs 13 or 14) for launch container\n",
|
||||
argc);
|
||||
fflush(ERRORFILE);
|
||||
return INVALID_ARGUMENT_NUMBER;
|
||||
}
|
||||
app_id = argv[optind++];
|
||||
container_id = argv[optind++];
|
||||
current_dir = argv[optind++];
|
||||
script_file = argv[optind++];
|
||||
cred_file = argv[optind++];
|
||||
pid_file = argv[optind++];
|
||||
local_dirs = argv[optind++];// good local dirs as a comma separated list
|
||||
log_dirs = argv[optind++];// good log dirs as a comma separated list
|
||||
resources = argv[optind++];// key,value pair describing resources
|
||||
char *resources_key = malloc(strlen(resources));
|
||||
char *resources_value = malloc(strlen(resources));
|
||||
|
||||
cmd_input.app_id = argv[optind++];
|
||||
cmd_input.container_id = argv[optind++];
|
||||
cmd_input.current_dir = argv[optind++];
|
||||
cmd_input.script_file = argv[optind++];
|
||||
cmd_input.cred_file = argv[optind++];
|
||||
cmd_input.pid_file = argv[optind++];
|
||||
cmd_input.local_dirs = argv[optind++];// good local dirs as a comma separated list
|
||||
cmd_input.log_dirs = argv[optind++];// good log dirs as a comma separated list
|
||||
char * resources = argv[optind++];// key,value pair describing resources
|
||||
char * resources_key = malloc(strlen(resources));
|
||||
char * resources_value = malloc(strlen(resources));
|
||||
|
||||
if (get_kv_key(resources, resources_key, strlen(resources)) < 0 ||
|
||||
get_kv_value(resources, resources_value, strlen(resources)) < 0) {
|
||||
fprintf(ERRORFILE, "Invalid arguments for cgroups resources: %s",
|
||||
|
@ -240,51 +299,157 @@ int main(int argc, char **argv) {
|
|||
free(resources_value);
|
||||
return INVALID_ARGUMENT_NUMBER;
|
||||
}
|
||||
char** resources_values = extract_values(resources_value);
|
||||
exit_code = launch_container_as_user(yarn_user_name, app_id,
|
||||
container_id, current_dir, script_file, cred_file,
|
||||
pid_file, extract_values(local_dirs),
|
||||
extract_values(log_dirs), resources_key,
|
||||
resources_values);
|
||||
free(resources_key);
|
||||
free(resources_value);
|
||||
break;
|
||||
|
||||
//network isolation through tc
|
||||
if (argc == 14) {
|
||||
cmd_input.traffic_control_command_file = argv[optind++];
|
||||
}
|
||||
|
||||
cmd_input.resources_key = resources_key;
|
||||
cmd_input.resources_value = resources_value;
|
||||
cmd_input.resources_values = extract_values(resources_value);
|
||||
*operation = RUN_AS_USER_LAUNCH_CONTAINER;
|
||||
return 0;
|
||||
|
||||
case SIGNAL_CONTAINER:
|
||||
if (argc != 6) {
|
||||
fprintf(ERRORFILE, "Wrong number of arguments (%d vs 6) for " \
|
||||
"signal container\n", argc);
|
||||
fflush(ERRORFILE);
|
||||
return INVALID_ARGUMENT_NUMBER;
|
||||
} else {
|
||||
char* end_ptr = NULL;
|
||||
char* option = argv[optind++];
|
||||
int container_pid = strtol(option, &end_ptr, 10);
|
||||
if (option == end_ptr || *end_ptr != '\0') {
|
||||
fprintf(ERRORFILE, "Illegal argument for container pid %s\n", option);
|
||||
fflush(ERRORFILE);
|
||||
return INVALID_ARGUMENT_NUMBER;
|
||||
}
|
||||
option = argv[optind++];
|
||||
int signal = strtol(option, &end_ptr, 10);
|
||||
if (option == end_ptr || *end_ptr != '\0') {
|
||||
fprintf(ERRORFILE, "Illegal argument for signal %s\n", option);
|
||||
fflush(ERRORFILE);
|
||||
return INVALID_ARGUMENT_NUMBER;
|
||||
}
|
||||
exit_code = signal_container_as_user(yarn_user_name, container_pid, signal);
|
||||
}
|
||||
break;
|
||||
|
||||
char* end_ptr = NULL;
|
||||
char* option = argv[optind++];
|
||||
cmd_input.container_pid = strtol(option, &end_ptr, 10);
|
||||
if (option == end_ptr || *end_ptr != '\0') {
|
||||
fprintf(ERRORFILE, "Illegal argument for container pid %s\n", option);
|
||||
fflush(ERRORFILE);
|
||||
return INVALID_ARGUMENT_NUMBER;
|
||||
}
|
||||
option = argv[optind++];
|
||||
cmd_input.signal = strtol(option, &end_ptr, 10);
|
||||
if (option == end_ptr || *end_ptr != '\0') {
|
||||
fprintf(ERRORFILE, "Illegal argument for signal %s\n", option);
|
||||
fflush(ERRORFILE);
|
||||
return INVALID_ARGUMENT_NUMBER;
|
||||
}
|
||||
|
||||
*operation = RUN_AS_USER_SIGNAL_CONTAINER;
|
||||
return 0;
|
||||
|
||||
case DELETE_AS_USER:
|
||||
dir_to_be_deleted = argv[optind++];
|
||||
exit_code= delete_as_user(yarn_user_name, dir_to_be_deleted,
|
||||
argv + optind);
|
||||
break;
|
||||
cmd_input.dir_to_be_deleted = argv[optind++];
|
||||
*operation = RUN_AS_USER_DELETE;
|
||||
return 0;
|
||||
default:
|
||||
fprintf(ERRORFILE, "Invalid command %d not supported.",command);
|
||||
fflush(ERRORFILE);
|
||||
exit_code = INVALID_COMMAND_PROVIDED;
|
||||
return INVALID_COMMAND_PROVIDED;
|
||||
}
|
||||
fclose(LOGFILE);
|
||||
fclose(ERRORFILE);
|
||||
}
|
||||
|
||||
int main(int argc, char **argv) {
|
||||
open_log_files();
|
||||
assert_valid_setup(argv[0]);
|
||||
|
||||
int operation;
|
||||
int ret = validate_arguments(argc, argv, &operation);
|
||||
|
||||
if (ret != 0) {
|
||||
flush_and_close_log_files();
|
||||
return ret;
|
||||
}
|
||||
|
||||
int exit_code = 0;
|
||||
|
||||
switch (operation) {
|
||||
case CHECK_SETUP:
|
||||
//we already did this
|
||||
exit_code = 0;
|
||||
break;
|
||||
case MOUNT_CGROUPS:
|
||||
exit_code = 0;
|
||||
|
||||
while (optind < argc && exit_code == 0) {
|
||||
exit_code = mount_cgroup(argv[optind++], cmd_input.cgroups_hierarchy);
|
||||
}
|
||||
|
||||
break;
|
||||
case TRAFFIC_CONTROL_MODIFY_STATE:
|
||||
exit_code = traffic_control_modify_state(cmd_input.traffic_control_command_file);
|
||||
break;
|
||||
case TRAFFIC_CONTROL_READ_STATE:
|
||||
exit_code = traffic_control_read_state(cmd_input.traffic_control_command_file);
|
||||
break;
|
||||
case TRAFFIC_CONTROL_READ_STATS:
|
||||
exit_code = traffic_control_read_stats(cmd_input.traffic_control_command_file);
|
||||
break;
|
||||
case RUN_AS_USER_INITIALIZE_CONTAINER:
|
||||
exit_code = set_user(cmd_input.run_as_user_name);
|
||||
if (exit_code != 0) {
|
||||
break;
|
||||
}
|
||||
|
||||
exit_code = initialize_app(cmd_input.yarn_user_name,
|
||||
cmd_input.app_id,
|
||||
cmd_input.cred_file,
|
||||
extract_values(cmd_input.local_dirs),
|
||||
extract_values(cmd_input.log_dirs),
|
||||
argv + optind);
|
||||
break;
|
||||
case RUN_AS_USER_LAUNCH_CONTAINER:
|
||||
if (cmd_input.traffic_control_command_file != NULL) {
|
||||
//apply tc rules before switching users and launching the container
|
||||
exit_code = traffic_control_modify_state(cmd_input.traffic_control_command_file);
|
||||
if( exit_code != 0) {
|
||||
//failed to apply tc rules - break out before launching the container
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
exit_code = set_user(cmd_input.run_as_user_name);
|
||||
if (exit_code != 0) {
|
||||
break;
|
||||
}
|
||||
|
||||
exit_code = launch_container_as_user(cmd_input.yarn_user_name,
|
||||
cmd_input.app_id,
|
||||
cmd_input.container_id,
|
||||
cmd_input.current_dir,
|
||||
cmd_input.script_file,
|
||||
cmd_input.cred_file,
|
||||
cmd_input.pid_file,
|
||||
extract_values(cmd_input.local_dirs),
|
||||
extract_values(cmd_input.log_dirs),
|
||||
cmd_input.resources_key,
|
||||
cmd_input.resources_values);
|
||||
free(cmd_input.resources_key);
|
||||
free(cmd_input.resources_value);
|
||||
free(cmd_input.resources_values);
|
||||
break;
|
||||
case RUN_AS_USER_SIGNAL_CONTAINER:
|
||||
exit_code = set_user(cmd_input.run_as_user_name);
|
||||
if (exit_code != 0) {
|
||||
break;
|
||||
}
|
||||
|
||||
exit_code = signal_container_as_user(cmd_input.yarn_user_name,
|
||||
cmd_input.container_pid,
|
||||
cmd_input.signal);
|
||||
break;
|
||||
case RUN_AS_USER_DELETE:
|
||||
exit_code = set_user(cmd_input.run_as_user_name);
|
||||
if (exit_code != 0) {
|
||||
break;
|
||||
}
|
||||
|
||||
exit_code = delete_as_user(cmd_input.yarn_user_name,
|
||||
cmd_input.dir_to_be_deleted,
|
||||
argv + optind);
|
||||
break;
|
||||
}
|
||||
|
||||
flush_and_close_log_files();
|
||||
return exit_code;
|
||||
}
|
||||
|
|
|
@ -1,20 +1,20 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.nodemanager;
|
||||
|
||||
|
@ -24,6 +24,7 @@ import static org.junit.Assert.assertEquals;
|
|||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
|
@ -32,13 +33,13 @@ import java.io.FileOutputStream;
|
|||
import java.io.IOException;
|
||||
import java.io.PrintWriter;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.util.ArrayList;
|
||||
import java.util.EnumSet;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
|
||||
import org.junit.Assert;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
|
@ -61,57 +62,88 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.Cont
|
|||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.util.LCEResourcesHandler;
|
||||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Assume;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
/**
|
||||
* This is intended to test the LinuxContainerExecutor code, but because of
|
||||
* some security restrictions this can only be done with some special setup
|
||||
* first.
|
||||
* <br><ol>
|
||||
* This is intended to test the LinuxContainerExecutor code, but because of some
|
||||
* security restrictions this can only be done with some special setup first. <br>
|
||||
* <ol>
|
||||
* <li>Compile the code with container-executor.conf.dir set to the location you
|
||||
* want for testing.
|
||||
* <br><pre><code>
|
||||
* want for testing. <br>
|
||||
*
|
||||
* <pre>
|
||||
* <code>
|
||||
* > mvn clean install -Pnative -Dcontainer-executor.conf.dir=/etc/hadoop
|
||||
* -DskipTests
|
||||
* </code></pre>
|
||||
* </code>
|
||||
* </pre>
|
||||
*
|
||||
* <li>Set up <code>${container-executor.conf.dir}/container-executor.cfg</code>
|
||||
* container-executor.cfg needs to be owned by root and have in it the proper
|
||||
* config values.
|
||||
* <br><pre><code>
|
||||
* config values. <br>
|
||||
*
|
||||
* <pre>
|
||||
* <code>
|
||||
* > cat /etc/hadoop/container-executor.cfg
|
||||
* yarn.nodemanager.linux-container-executor.group=mapred
|
||||
* #depending on the user id of the application.submitter option
|
||||
* min.user.id=1
|
||||
* > sudo chown root:root /etc/hadoop/container-executor.cfg
|
||||
* > sudo chmod 444 /etc/hadoop/container-executor.cfg
|
||||
* </code></pre>
|
||||
* </code>
|
||||
* </pre>
|
||||
*
|
||||
* <li>Move the binary and set proper permissions on it. It needs to be owned
|
||||
* by root, the group needs to be the group configured in container-executor.cfg,
|
||||
* <li>Move the binary and set proper permissions on it. It needs to be owned by
|
||||
* root, the group needs to be the group configured in container-executor.cfg,
|
||||
* and it needs the setuid bit set. (The build will also overwrite it so you
|
||||
* need to move it to a place that you can support it.
|
||||
* <br><pre><code>
|
||||
* need to move it to a place that you can support it.) <br>
|
||||
*
|
||||
* <pre>
|
||||
* <code>
|
||||
* > cp ./hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/c/container-executor/container-executor /tmp/
|
||||
* > sudo chown root:mapred /tmp/container-executor
|
||||
* > sudo chmod 4550 /tmp/container-executor
|
||||
* </code></pre>
|
||||
* > sudo chmod 4050 /tmp/container-executor
|
||||
* </code>
|
||||
* </pre>
|
||||
*
|
||||
* <li>Run the tests with the execution enabled (The user you run the tests as
|
||||
* needs to be part of the group from the config.
|
||||
* <br><pre><code>
|
||||
* needs to be part of the group from the config.) <br>
|
||||
*
|
||||
* <pre>
|
||||
* <code>
|
||||
* mvn test -Dtest=TestLinuxContainerExecutor -Dapplication.submitter=nobody -Dcontainer-executor.path=/tmp/container-executor
|
||||
* </code></pre>
|
||||
* </code>
|
||||
* </pre>
|
||||
*
|
||||
* <li>The test suite also contains tests to test mounting of CGroups. By
|
||||
* default, these tests are not run. To run them, add -Dcgroups.mount=&lt;mount-point&gt;.
|
||||
* Please note that the test does not unmount the CGroups at the end of the test,
|
||||
* since that requires root permissions. <br>
|
||||
*
|
||||
* <li>The tests that are run are sensitive to directory permissions. All parent
|
||||
* directories must be searchable by the user that the tasks are run as. If you
|
||||
* wish to run the tests in a different directory, please set it using
|
||||
* -Dworkspace.dir
|
||||
*
|
||||
* </ol>
|
||||
*/
|
||||
public class TestLinuxContainerExecutor {
|
||||
private static final Log LOG = LogFactory
|
||||
.getLog(TestLinuxContainerExecutor.class);
|
||||
|
||||
private static File workSpace = new File("target",
|
||||
TestLinuxContainerExecutor.class.getName() + "-workSpace");
|
||||
|
||||
.getLog(TestLinuxContainerExecutor.class);
|
||||
|
||||
private static File workSpace;
|
||||
static {
|
||||
String basedir = System.getProperty("workspace.dir");
|
||||
if(basedir == null || basedir.isEmpty()) {
|
||||
basedir = "target";
|
||||
}
|
||||
workSpace = new File(basedir,
|
||||
TestLinuxContainerExecutor.class.getName() + "-workSpace");
|
||||
}
|
||||
|
||||
private LinuxContainerExecutor exec = null;
|
||||
private String appSubmitter = null;
|
||||
private LocalDirsHandlerService dirsHandler;
|
||||
|
@ -125,20 +157,26 @@ public class TestLinuxContainerExecutor {
|
|||
files.mkdir(workSpacePath, null, true);
|
||||
FileUtil.chmod(workSpace.getAbsolutePath(), "777");
|
||||
File localDir = new File(workSpace.getAbsoluteFile(), "localDir");
|
||||
files.mkdir(new Path(localDir.getAbsolutePath()),
|
||||
new FsPermission("777"), false);
|
||||
files.mkdir(new Path(localDir.getAbsolutePath()), new FsPermission("777"),
|
||||
false);
|
||||
File logDir = new File(workSpace.getAbsoluteFile(), "logDir");
|
||||
files.mkdir(new Path(logDir.getAbsolutePath()),
|
||||
new FsPermission("777"), false);
|
||||
files.mkdir(new Path(logDir.getAbsolutePath()), new FsPermission("777"),
|
||||
false);
|
||||
String exec_path = System.getProperty("container-executor.path");
|
||||
if(exec_path != null && !exec_path.isEmpty()) {
|
||||
if (exec_path != null && !exec_path.isEmpty()) {
|
||||
conf = new Configuration(false);
|
||||
conf.setClass("fs.AbstractFileSystem.file.impl",
|
||||
org.apache.hadoop.fs.local.LocalFs.class,
|
||||
org.apache.hadoop.fs.AbstractFileSystem.class);
|
||||
conf.set(YarnConfiguration.NM_NONSECURE_MODE_LOCAL_USER_KEY, "xuan");
|
||||
LOG.info("Setting "+YarnConfiguration.NM_LINUX_CONTAINER_EXECUTOR_PATH
|
||||
+"="+exec_path);
|
||||
|
||||
appSubmitter = System.getProperty("application.submitter");
|
||||
if (appSubmitter == null || appSubmitter.isEmpty()) {
|
||||
appSubmitter = "nobody";
|
||||
}
|
||||
|
||||
conf.set(YarnConfiguration.NM_NONSECURE_MODE_LOCAL_USER_KEY, appSubmitter);
|
||||
LOG.info("Setting " + YarnConfiguration.NM_LINUX_CONTAINER_EXECUTOR_PATH
|
||||
+ "=" + exec_path);
|
||||
conf.set(YarnConfiguration.NM_LINUX_CONTAINER_EXECUTOR_PATH, exec_path);
|
||||
exec = new LinuxContainerExecutor();
|
||||
exec.setConf(conf);
|
||||
|
@ -146,34 +184,86 @@ public class TestLinuxContainerExecutor {
|
|||
conf.set(YarnConfiguration.NM_LOG_DIRS, logDir.getAbsolutePath());
|
||||
dirsHandler = new LocalDirsHandlerService();
|
||||
dirsHandler.init(conf);
|
||||
List<String> localDirs = dirsHandler.getLocalDirs();
|
||||
for (String dir : localDirs) {
|
||||
Path userDir = new Path(dir, ContainerLocalizer.USERCACHE);
|
||||
files.mkdir(userDir, new FsPermission("777"), false);
|
||||
// $local/filecache
|
||||
Path fileDir = new Path(dir, ContainerLocalizer.FILECACHE);
|
||||
files.mkdir(fileDir, new FsPermission("777"), false);
|
||||
}
|
||||
}
|
||||
appSubmitter = System.getProperty("application.submitter");
|
||||
if(appSubmitter == null || appSubmitter.isEmpty()) {
|
||||
appSubmitter = "nobody";
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@After
|
||||
public void tearDown() throws Exception {
|
||||
FileContext.getLocalFSFileContext().delete(
|
||||
new Path(workSpace.getAbsolutePath()), true);
|
||||
new Path(workSpace.getAbsolutePath()), true);
|
||||
}
|
||||
|
||||
private void cleanupUserAppCache(String user) throws Exception {
|
||||
List<String> localDirs = dirsHandler.getLocalDirs();
|
||||
for (String dir : localDirs) {
|
||||
Path usercachedir = new Path(dir, ContainerLocalizer.USERCACHE);
|
||||
Path userdir = new Path(usercachedir, user);
|
||||
Path appcachedir = new Path(userdir, ContainerLocalizer.APPCACHE);
|
||||
exec.deleteAsUser(user, appcachedir);
|
||||
FileContext.getLocalFSFileContext().delete(usercachedir, true);
|
||||
}
|
||||
}
|
||||
|
||||
private void cleanupUserFileCache(String user) {
|
||||
List<String> localDirs = dirsHandler.getLocalDirs();
|
||||
for (String dir : localDirs) {
|
||||
Path filecache = new Path(dir, ContainerLocalizer.FILECACHE);
|
||||
Path filedir = new Path(filecache, user);
|
||||
exec.deleteAsUser(user, filedir);
|
||||
}
|
||||
}
|
||||
|
||||
private void cleanupLogDirs(String user) {
|
||||
List<String> logDirs = dirsHandler.getLogDirs();
|
||||
for (String dir : logDirs) {
|
||||
String appId = "APP_" + id;
|
||||
String containerId = "CONTAINER_" + (id - 1);
|
||||
Path appdir = new Path(dir, appId);
|
||||
Path containerdir = new Path(appdir, containerId);
|
||||
exec.deleteAsUser(user, containerdir);
|
||||
}
|
||||
}
|
||||
|
||||
private void cleanupAppFiles(String user) throws Exception {
|
||||
cleanupUserAppCache(user);
|
||||
cleanupUserFileCache(user);
|
||||
cleanupLogDirs(user);
|
||||
|
||||
String[] files =
|
||||
{ "launch_container.sh", "container_tokens", "touch-file" };
|
||||
Path ws = new Path(workSpace.toURI());
|
||||
for (String file : files) {
|
||||
File f = new File(workSpace, file);
|
||||
if (f.exists()) {
|
||||
exec.deleteAsUser(user, new Path(file), ws);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private boolean shouldRun() {
|
||||
if(exec == null) {
|
||||
if (exec == null) {
|
||||
LOG.warn("Not running test because container-executor.path is not set");
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
private String writeScriptFile(String ... cmd) throws IOException {
|
||||
|
||||
private String writeScriptFile(String... cmd) throws IOException {
|
||||
File f = File.createTempFile("TestLinuxContainerExecutor", ".sh");
|
||||
f.deleteOnExit();
|
||||
PrintWriter p = new PrintWriter(new FileOutputStream(f));
|
||||
p.println("#!/bin/sh");
|
||||
p.print("exec");
|
||||
for(String part: cmd) {
|
||||
for (String part : cmd) {
|
||||
p.print(" '");
|
||||
p.print(part.replace("\\", "\\\\").replace("'", "\\'"));
|
||||
p.print("'");
|
||||
|
@ -182,36 +272,36 @@ public class TestLinuxContainerExecutor {
|
|||
p.close();
|
||||
return f.getAbsolutePath();
|
||||
}
|
||||
|
||||
|
||||
private int id = 0;
|
||||
|
||||
private synchronized int getNextId() {
|
||||
id += 1;
|
||||
return id;
|
||||
}
|
||||
|
||||
|
||||
private ContainerId getNextContainerId() {
|
||||
ContainerId cId = mock(ContainerId.class);
|
||||
String id = "CONTAINER_"+getNextId();
|
||||
String id = "CONTAINER_" + getNextId();
|
||||
when(cId.toString()).thenReturn(id);
|
||||
return cId;
|
||||
}
|
||||
|
||||
|
||||
private int runAndBlock(String ... cmd) throws IOException {
|
||||
private int runAndBlock(String... cmd) throws IOException {
|
||||
return runAndBlock(getNextContainerId(), cmd);
|
||||
}
|
||||
|
||||
private int runAndBlock(ContainerId cId, String ... cmd) throws IOException {
|
||||
String appId = "APP_"+getNextId();
|
||||
|
||||
private int runAndBlock(ContainerId cId, String... cmd) throws IOException {
|
||||
String appId = "APP_" + getNextId();
|
||||
Container container = mock(Container.class);
|
||||
ContainerLaunchContext context = mock(ContainerLaunchContext.class);
|
||||
HashMap<String, String> env = new HashMap<String,String>();
|
||||
HashMap<String, String> env = new HashMap<String, String>();
|
||||
|
||||
when(container.getContainerId()).thenReturn(cId);
|
||||
when(container.getLaunchContext()).thenReturn(context);
|
||||
|
||||
when(context.getEnvironment()).thenReturn(env);
|
||||
|
||||
|
||||
String script = writeScriptFile(cmd);
|
||||
|
||||
Path scriptPath = new Path(script);
|
||||
|
@ -221,46 +311,36 @@ public class TestLinuxContainerExecutor {
|
|||
|
||||
exec.activateContainer(cId, pidFile);
|
||||
return exec.launchContainer(container, scriptPath, tokensPath,
|
||||
appSubmitter, appId, workDir, dirsHandler.getLocalDirs(),
|
||||
dirsHandler.getLogDirs());
|
||||
appSubmitter, appId, workDir, dirsHandler.getLocalDirs(),
|
||||
dirsHandler.getLogDirs());
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testContainerLocalizer() throws Exception {
|
||||
if (!shouldRun()) {
|
||||
return;
|
||||
}
|
||||
List<String> localDirs = dirsHandler.getLocalDirs();
|
||||
List<String> logDirs = dirsHandler.getLogDirs();
|
||||
for (String localDir : localDirs) {
|
||||
Path userDir =
|
||||
new Path(localDir, ContainerLocalizer.USERCACHE);
|
||||
files.mkdir(userDir, new FsPermission("777"), false);
|
||||
// $local/filecache
|
||||
Path fileDir =
|
||||
new Path(localDir, ContainerLocalizer.FILECACHE);
|
||||
files.mkdir(fileDir, new FsPermission("777"), false);
|
||||
}
|
||||
|
||||
Assume.assumeTrue(shouldRun());
|
||||
|
||||
String locId = "container_01_01";
|
||||
Path nmPrivateContainerTokensPath =
|
||||
dirsHandler.getLocalPathForWrite(
|
||||
ResourceLocalizationService.NM_PRIVATE_DIR + Path.SEPARATOR
|
||||
+ String.format(ContainerLocalizer.TOKEN_FILE_NAME_FMT,
|
||||
locId));
|
||||
dirsHandler
|
||||
.getLocalPathForWrite(ResourceLocalizationService.NM_PRIVATE_DIR
|
||||
+ Path.SEPARATOR
|
||||
+ String.format(ContainerLocalizer.TOKEN_FILE_NAME_FMT, locId));
|
||||
files.create(nmPrivateContainerTokensPath, EnumSet.of(CREATE, OVERWRITE));
|
||||
Configuration config = new YarnConfiguration(conf);
|
||||
InetSocketAddress nmAddr = config.getSocketAddr(
|
||||
YarnConfiguration.NM_BIND_HOST,
|
||||
YarnConfiguration.NM_LOCALIZER_ADDRESS,
|
||||
YarnConfiguration.DEFAULT_NM_LOCALIZER_ADDRESS,
|
||||
YarnConfiguration.DEFAULT_NM_LOCALIZER_PORT);
|
||||
InetSocketAddress nmAddr =
|
||||
config.getSocketAddr(YarnConfiguration.NM_BIND_HOST,
|
||||
YarnConfiguration.NM_LOCALIZER_ADDRESS,
|
||||
YarnConfiguration.DEFAULT_NM_LOCALIZER_ADDRESS,
|
||||
YarnConfiguration.DEFAULT_NM_LOCALIZER_PORT);
|
||||
String appId = "application_01_01";
|
||||
exec = new LinuxContainerExecutor() {
|
||||
@Override
|
||||
public void buildMainArgs(List<String> command, String user, String appId,
|
||||
String locId, InetSocketAddress nmAddr, List<String> localDirs) {
|
||||
MockContainerLocalizer.buildMainArgs(command, user, appId, locId, nmAddr,
|
||||
localDirs);
|
||||
public void buildMainArgs(List<String> command, String user,
|
||||
String appId, String locId, InetSocketAddress nmAddr,
|
||||
List<String> localDirs) {
|
||||
MockContainerLocalizer.buildMainArgs(command, user, appId, locId,
|
||||
nmAddr, localDirs);
|
||||
}
|
||||
};
|
||||
exec.setConf(conf);
|
||||
|
@ -277,44 +357,68 @@ public class TestLinuxContainerExecutor {
|
|||
files.create(nmPrivateContainerTokensPath2, EnumSet.of(CREATE, OVERWRITE));
|
||||
exec.startLocalizer(nmPrivateContainerTokensPath2, nmAddr, appSubmitter,
|
||||
appId, locId2, dirsHandler);
|
||||
cleanupUserAppCache(appSubmitter);
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testContainerLaunch() throws IOException {
|
||||
if (!shouldRun()) {
|
||||
return;
|
||||
}
|
||||
public void testContainerLaunch() throws Exception {
|
||||
Assume.assumeTrue(shouldRun());
|
||||
String expectedRunAsUser =
|
||||
conf.get(YarnConfiguration.NM_NONSECURE_MODE_LOCAL_USER_KEY,
|
||||
YarnConfiguration.DEFAULT_NM_NONSECURE_MODE_LOCAL_USER);
|
||||
|
||||
File touchFile = new File(workSpace, "touch-file");
|
||||
int ret = runAndBlock("touch", touchFile.getAbsolutePath());
|
||||
|
||||
|
||||
assertEquals(0, ret);
|
||||
FileStatus fileStatus = FileContext.getLocalFSFileContext().getFileStatus(
|
||||
FileStatus fileStatus =
|
||||
FileContext.getLocalFSFileContext().getFileStatus(
|
||||
new Path(touchFile.getAbsolutePath()));
|
||||
assertEquals(appSubmitter, fileStatus.getOwner());
|
||||
assertEquals(expectedRunAsUser, fileStatus.getOwner());
|
||||
cleanupAppFiles(expectedRunAsUser);
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNonSecureRunAsSubmitter() throws Exception {
|
||||
Assume.assumeTrue(shouldRun());
|
||||
Assume.assumeFalse(UserGroupInformation.isSecurityEnabled());
|
||||
String expectedRunAsUser = appSubmitter;
|
||||
conf.set(YarnConfiguration.NM_NONSECURE_MODE_LIMIT_USERS, "false");
|
||||
exec.setConf(conf);
|
||||
File touchFile = new File(workSpace, "touch-file");
|
||||
int ret = runAndBlock("touch", touchFile.getAbsolutePath());
|
||||
|
||||
assertEquals(0, ret);
|
||||
FileStatus fileStatus =
|
||||
FileContext.getLocalFSFileContext().getFileStatus(
|
||||
new Path(touchFile.getAbsolutePath()));
|
||||
assertEquals(expectedRunAsUser, fileStatus.getOwner());
|
||||
cleanupAppFiles(expectedRunAsUser);
|
||||
// reset conf
|
||||
conf.unset(YarnConfiguration.NM_NONSECURE_MODE_LIMIT_USERS);
|
||||
exec.setConf(conf);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testContainerKill() throws Exception {
|
||||
if (!shouldRun()) {
|
||||
return;
|
||||
}
|
||||
|
||||
final ContainerId sleepId = getNextContainerId();
|
||||
Assume.assumeTrue(shouldRun());
|
||||
|
||||
final ContainerId sleepId = getNextContainerId();
|
||||
Thread t = new Thread() {
|
||||
public void run() {
|
||||
try {
|
||||
runAndBlock(sleepId, "sleep", "100");
|
||||
} catch (IOException e) {
|
||||
LOG.warn("Caught exception while running sleep",e);
|
||||
LOG.warn("Caught exception while running sleep", e);
|
||||
}
|
||||
};
|
||||
};
|
||||
t.setDaemon(true); //If it does not exit we shouldn't block the test.
|
||||
t.setDaemon(true); // If it does not exit we shouldn't block the test.
|
||||
t.start();
|
||||
|
||||
assertTrue(t.isAlive());
|
||||
|
||||
|
||||
String pid = null;
|
||||
int count = 10;
|
||||
while ((pid = exec.getProcessId(sleepId)) == null && count > 0) {
|
||||
|
@ -328,40 +432,77 @@ public class TestLinuxContainerExecutor {
|
|||
exec.signalContainer(appSubmitter, pid, Signal.TERM);
|
||||
LOG.info("sleeping for 100ms to let the sleep be killed");
|
||||
Thread.sleep(100);
|
||||
|
||||
|
||||
assertFalse(t.isAlive());
|
||||
cleanupAppFiles(appSubmitter);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCGroups() throws Exception {
|
||||
Assume.assumeTrue(shouldRun());
|
||||
String cgroupsMount = System.getProperty("cgroups.mount");
|
||||
Assume.assumeTrue((cgroupsMount != null) && !cgroupsMount.isEmpty());
|
||||
|
||||
assertTrue("Cgroups mount point does not exist", new File(
|
||||
cgroupsMount).exists());
|
||||
List<String> cgroupKVs = new ArrayList<>();
|
||||
|
||||
String hierarchy = "hadoop-yarn";
|
||||
String[] controllers = { "cpu", "net_cls" };
|
||||
for (String controller : controllers) {
|
||||
cgroupKVs.add(controller + "=" + cgroupsMount + "/" + controller);
|
||||
assertTrue(new File(cgroupsMount, controller).exists());
|
||||
}
|
||||
|
||||
try {
|
||||
exec.mountCgroups(cgroupKVs, hierarchy);
|
||||
for (String controller : controllers) {
|
||||
assertTrue(controller + " cgroup not mounted", new File(
|
||||
cgroupsMount + "/" + controller + "/tasks").exists());
|
||||
assertTrue(controller + " cgroup hierarchy not created",
|
||||
new File(cgroupsMount + "/" + controller + "/" + hierarchy).exists());
|
||||
assertTrue(controller + " cgroup hierarchy created incorrectly",
|
||||
new File(cgroupsMount + "/" + controller + "/" + hierarchy
|
||||
+ "/tasks").exists());
|
||||
}
|
||||
} catch (IOException ie) {
|
||||
fail("Couldn't mount cgroups " + ie.toString());
|
||||
throw ie;
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testLocalUser() throws Exception {
|
||||
Assume.assumeTrue(shouldRun());
|
||||
try {
|
||||
//nonsecure default
|
||||
// nonsecure default
|
||||
Configuration conf = new YarnConfiguration();
|
||||
conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
|
||||
"simple");
|
||||
"simple");
|
||||
UserGroupInformation.setConfiguration(conf);
|
||||
LinuxContainerExecutor lce = new LinuxContainerExecutor();
|
||||
lce.setConf(conf);
|
||||
Assert.assertEquals(YarnConfiguration.DEFAULT_NM_NONSECURE_MODE_LOCAL_USER,
|
||||
Assert.assertEquals(
|
||||
YarnConfiguration.DEFAULT_NM_NONSECURE_MODE_LOCAL_USER,
|
||||
lce.getRunAsUser("foo"));
|
||||
|
||||
//nonsecure custom setting
|
||||
// nonsecure custom setting
|
||||
conf.set(YarnConfiguration.NM_NONSECURE_MODE_LOCAL_USER_KEY, "bar");
|
||||
lce = new LinuxContainerExecutor();
|
||||
lce.setConf(conf);
|
||||
Assert.assertEquals("bar", lce.getRunAsUser("foo"));
|
||||
|
||||
//nonsecure without limits
|
||||
// nonsecure without limits
|
||||
conf.set(YarnConfiguration.NM_NONSECURE_MODE_LOCAL_USER_KEY, "bar");
|
||||
conf.set(YarnConfiguration.NM_NONSECURE_MODE_LIMIT_USERS, "false");
|
||||
lce = new LinuxContainerExecutor();
|
||||
lce.setConf(conf);
|
||||
Assert.assertEquals("foo", lce.getRunAsUser("foo"));
|
||||
|
||||
//secure
|
||||
// secure
|
||||
conf = new YarnConfiguration();
|
||||
conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
|
||||
"kerberos");
|
||||
"kerberos");
|
||||
UserGroupInformation.setConfiguration(conf);
|
||||
lce = new LinuxContainerExecutor();
|
||||
lce.setConf(conf);
|
||||
|
@ -369,49 +510,50 @@ public class TestLinuxContainerExecutor {
|
|||
} finally {
|
||||
Configuration conf = new YarnConfiguration();
|
||||
conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
|
||||
"simple");
|
||||
"simple");
|
||||
UserGroupInformation.setConfiguration(conf);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNonsecureUsernamePattern() throws Exception {
|
||||
Assume.assumeTrue(shouldRun());
|
||||
try {
|
||||
//nonsecure default
|
||||
// nonsecure default
|
||||
Configuration conf = new YarnConfiguration();
|
||||
conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
|
||||
"simple");
|
||||
"simple");
|
||||
UserGroupInformation.setConfiguration(conf);
|
||||
LinuxContainerExecutor lce = new LinuxContainerExecutor();
|
||||
lce.setConf(conf);
|
||||
lce.verifyUsernamePattern("foo");
|
||||
try {
|
||||
lce.verifyUsernamePattern("foo/x");
|
||||
Assert.fail();
|
||||
fail();
|
||||
} catch (IllegalArgumentException ex) {
|
||||
//NOP
|
||||
// NOP
|
||||
} catch (Throwable ex) {
|
||||
Assert.fail(ex.toString());
|
||||
fail(ex.toString());
|
||||
}
|
||||
|
||||
//nonsecure custom setting
|
||||
|
||||
// nonsecure custom setting
|
||||
conf.set(YarnConfiguration.NM_NONSECURE_MODE_USER_PATTERN_KEY, "foo");
|
||||
lce = new LinuxContainerExecutor();
|
||||
lce.setConf(conf);
|
||||
lce.verifyUsernamePattern("foo");
|
||||
try {
|
||||
lce.verifyUsernamePattern("bar");
|
||||
Assert.fail();
|
||||
fail();
|
||||
} catch (IllegalArgumentException ex) {
|
||||
//NOP
|
||||
// NOP
|
||||
} catch (Throwable ex) {
|
||||
Assert.fail(ex.toString());
|
||||
fail(ex.toString());
|
||||
}
|
||||
|
||||
//secure, pattern matching does not kick in.
|
||||
// secure, pattern matching does not kick in.
|
||||
conf = new YarnConfiguration();
|
||||
conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
|
||||
"kerberos");
|
||||
"kerberos");
|
||||
UserGroupInformation.setConfiguration(conf);
|
||||
lce = new LinuxContainerExecutor();
|
||||
lce.setConf(conf);
|
||||
|
@ -420,13 +562,14 @@ public class TestLinuxContainerExecutor {
|
|||
} finally {
|
||||
Configuration conf = new YarnConfiguration();
|
||||
conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
|
||||
"simple");
|
||||
"simple");
|
||||
UserGroupInformation.setConfiguration(conf);
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout=10000)
|
||||
@Test(timeout = 10000)
|
||||
public void testPostExecuteAfterReacquisition() throws Exception {
|
||||
Assume.assumeTrue(shouldRun());
|
||||
// make up some bogus container ID
|
||||
ApplicationId appId = ApplicationId.newInstance(12345, 67890);
|
||||
ApplicationAttemptId attemptId =
|
||||
|
@ -435,7 +578,7 @@ public class TestLinuxContainerExecutor {
|
|||
|
||||
Configuration conf = new YarnConfiguration();
|
||||
conf.setClass(YarnConfiguration.NM_LINUX_CONTAINER_RESOURCES_HANDLER,
|
||||
TestResourceHandler.class, LCEResourcesHandler.class);
|
||||
TestResourceHandler.class, LCEResourcesHandler.class);
|
||||
LinuxContainerExecutor lce = new LinuxContainerExecutor();
|
||||
lce.setConf(conf);
|
||||
try {
|
||||
|
@ -444,7 +587,7 @@ public class TestLinuxContainerExecutor {
|
|||
// expected if LCE isn't setup right, but not necessary for this test
|
||||
}
|
||||
lce.reacquireContainer("foouser", cid);
|
||||
Assert.assertTrue("postExec not called after reacquisition",
|
||||
assertTrue("postExec not called after reacquisition",
|
||||
TestResourceHandler.postExecContainers.contains(cid));
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue