YARN-3365. Enhanced NodeManager to support using the 'tc' tool via container-executor for outbound network traffic control. Contributed by Sidharta Seethana.

This commit is contained in:
Vinod Kumar Vavilapalli 2015-04-02 16:53:59 -07:00
parent 5763b173d3
commit b21c72777a
5 changed files with 725 additions and 305 deletions

View File

@ -56,6 +56,10 @@ Release 2.8.0 - UNRELEASED
YARN-3345. Add non-exclusive node label API. (Wangda Tan via jianhe)
YARN-3365. Enhanced NodeManager to support using the 'tc' tool via
container-executor for outbound network traffic control. (Sidharta Seethana
via vinodkv)
IMPROVEMENTS
YARN-1880. Cleanup TestApplicationClientProtocolOnHA

View File

@ -40,6 +40,12 @@ static const int DEFAULT_MIN_USERID = 1000;
static const char* DEFAULT_BANNED_USERS[] = {"yarn", "mapred", "hdfs", "bin", 0};
//location of traffic control binary
static const char* TC_BIN = "/sbin/tc";
static const char* TC_MODIFY_STATE_OPTS [] = { "-b" , NULL};
static const char* TC_READ_STATE_OPTS [] = { "-b", NULL};
static const char* TC_READ_STATS_OPTS [] = { "-s", "-b", NULL};
//struct to store the user details
struct passwd *user_detail = NULL;
@ -291,27 +297,20 @@ static int write_exit_code_file(const char* exit_code_file, int exit_code) {
return 0;
}
/**
* Wait for the container process to exit and write the exit code to
* the exit code file.
* Returns the exit code of the container process.
*/
static int wait_and_write_exit_code(pid_t pid, const char* exit_code_file) {
static int wait_and_get_exit_code(pid_t pid) {
int child_status = -1;
int exit_code = -1;
int waitpid_result;
if (change_effective_user(nm_uid, nm_gid) != 0) {
return -1;
}
do {
waitpid_result = waitpid(pid, &child_status, 0);
} while (waitpid_result == -1 && errno == EINTR);
if (waitpid_result < 0) {
fprintf(LOGFILE, "Error waiting for container process %d - %s\n",
pid, strerror(errno));
fprintf(LOGFILE, "error waiting for process %d - %s\n", pid, strerror(errno));
return -1;
}
if (WIFEXITED(child_status)) {
exit_code = WEXITSTATUS(child_status);
} else if (WIFSIGNALED(child_status)) {
@ -319,9 +318,26 @@ static int wait_and_write_exit_code(pid_t pid, const char* exit_code_file) {
} else {
fprintf(LOGFILE, "Unable to determine exit status for pid %d\n", pid);
}
return exit_code;
}
/**
* Wait for the container process to exit and write the exit code to
* the exit code file.
* Returns the exit code of the container process.
*/
static int wait_and_write_exit_code(pid_t pid, const char* exit_code_file) {
int exit_code = -1;
if (change_effective_user(nm_uid, nm_gid) != 0) {
return -1;
}
exit_code = wait_and_get_exit_code(pid);
if (write_exit_code_file(exit_code_file, exit_code) < 0) {
return -1;
}
return exit_code;
}
@ -1470,3 +1486,63 @@ int mount_cgroup(const char *pair, const char *hierarchy) {
#endif
}
static int run_traffic_control(const char *opts[], char *command_file) {
const int max_tc_args = 16;
char *args[max_tc_args];
int i = 0, j = 0;
args[i++] = TC_BIN;
while (opts[j] != NULL && i < max_tc_args - 1) {
args[i] = opts[j];
++i, ++j;
}
//too many args to tc
if (i == max_tc_args - 1) {
fprintf(LOGFILE, "too many args to tc");
return TRAFFIC_CONTROL_EXECUTION_FAILED;
}
args[i++] = command_file;
args[i] = 0;
pid_t child_pid = fork();
if (child_pid != 0) {
int exit_code = wait_and_get_exit_code(child_pid);
if (exit_code != 0) {
fprintf(LOGFILE, "failed to execute tc command!\n");
return TRAFFIC_CONTROL_EXECUTION_FAILED;
}
unlink(command_file);
return 0;
} else {
execv(TC_BIN, args);
//if we reach here, exec failed
fprintf(LOGFILE, "failed to execute tc command! error: %s\n", strerror(errno));
return TRAFFIC_CONTROL_EXECUTION_FAILED;
}
}
/**
* Run a batch of tc commands that modify interface configuration. command_file
* is deleted after being used.
*/
int traffic_control_modify_state(char *command_file) {
return run_traffic_control(TC_MODIFY_STATE_OPTS, command_file);
}
/**
* Run a batch of tc commands that read interface configuration. Output is
* written to standard output and it is expected to be read and parsed by the
* calling process. command_file is deleted after being used.
*/
int traffic_control_read_state(char *command_file) {
return run_traffic_control(TC_READ_STATE_OPTS, command_file);
}
/**
* Run a batch of tc commands that read interface stats. Output is
* written to standard output and it is expected to be read and parsed by the
* calling process. command_file is deleted after being used.
*/
int traffic_control_read_stats(char *command_file) {
return run_traffic_control(TC_READ_STATS_OPTS, command_file);
}

View File

@ -54,7 +54,20 @@ enum errorcodes {
INVALID_CONFIG_FILE = 24,
SETSID_OPER_FAILED = 25,
WRITE_PIDFILE_FAILED = 26,
WRITE_CGROUP_FAILED = 27
WRITE_CGROUP_FAILED = 27,
TRAFFIC_CONTROL_EXECUTION_FAILED = 28
};
enum operations {
CHECK_SETUP = 1,
MOUNT_CGROUPS = 2,
TRAFFIC_CONTROL_MODIFY_STATE = 3,
TRAFFIC_CONTROL_READ_STATE = 4,
TRAFFIC_CONTROL_READ_STATS = 5,
RUN_AS_USER_INITIALIZE_CONTAINER = 6,
RUN_AS_USER_LAUNCH_CONTAINER = 7,
RUN_AS_USER_SIGNAL_CONTAINER = 8,
RUN_AS_USER_DELETE = 9
};
#define NM_GROUP_KEY "yarn.nodemanager.linux-container-executor.group"
@ -209,3 +222,22 @@ int check_dir(char* npath, mode_t st_mode, mode_t desired,
int create_validate_dir(char* npath, mode_t perm, char* path,
int finalComponent);
/**
* Run a batch of tc commands that modify interface configuration
*/
int traffic_control_modify_state(char *command_file);
/**
* Run a batch of tc commands that read interface configuration. Output is
* written to standard output and it is expected to be read and parsed by the
* calling process.
*/
int traffic_control_read_state(char *command_file);
/**
* Run a batch of tc commands that read interface stats. Output is
* written to standard output and it is expected to be read and parsed by the
* calling process.
*/
int traffic_control_read_stats(char *command_file);

View File

@ -42,84 +42,71 @@
#error HADOOP_CONF_DIR must be defined
#endif
void display_usage(FILE *stream) {
fprintf(stream,
"Usage: container-executor --checksetup\n");
fprintf(stream,
"Usage: container-executor --mount-cgroups "\
"hierarchy controller=path...\n");
fprintf(stream,
"Usage: container-executor user yarn-user command command-args\n");
fprintf(stream, "Commands:\n");
fprintf(stream, " initialize container: %2d appid tokens " \
"nm-local-dirs nm-log-dirs cmd app...\n", INITIALIZE_CONTAINER);
fprintf(stream,
" launch container: %2d appid containerid workdir "\
"container-script tokens pidfile nm-local-dirs nm-log-dirs resources\n",
LAUNCH_CONTAINER);
fprintf(stream, " signal container: %2d container-pid signal\n",
SIGNAL_CONTAINER);
fprintf(stream, " delete as user: %2d relative-path\n",
DELETE_AS_USER);
static void display_usage(FILE *stream) {
char *usage_template =
"Usage: container-executor --checksetup\n" \
" container-executor --mount-cgroups <hierarchy> <controller=path>...\n" \
" container-executor --tc-modify-state <command-file>\n" \
" container-executor --tc-read-state <command-file>\n" \
" container-executor --tc-read-stats <command-file>\n" \
" container-executor <user> <yarn-user> <command> <command-args>\n" \
" where command and command-args: \n" \
" initialize container: %2d appid tokens nm-local-dirs nm-log-dirs cmd app...\n" \
" launch container: %2d appid containerid workdir container-script " \
"tokens pidfile nm-local-dirs nm-log-dirs resources optional-tc-command-file\n" \
" signal container: %2d container-pid signal\n" \
" delete as user: %2d relative-path\n" ;
fprintf(stream, usage_template, INITIALIZE_CONTAINER, LAUNCH_CONTAINER,
SIGNAL_CONTAINER, DELETE_AS_USER);
}
int main(int argc, char **argv) {
int invalid_args = 0;
int do_check_setup = 0;
int do_mount_cgroups = 0;
/* Sets up log files for normal/error logging */
static void open_log_files() {
if (LOGFILE == NULL) {
LOGFILE = stdout;
}
if (ERRORFILE == NULL) {
ERRORFILE = stderr;
if (argc > 1) {
if (strcmp("--mount-cgroups", argv[1]) == 0) {
do_mount_cgroups = 1;
}
}
// Minimum number of arguments required to run
// the std. container-executor commands is 4
// 4 args not needed for checksetup option
if (argc < 4 && !do_mount_cgroups) {
invalid_args = 1;
if (argc == 2) {
const char *arg1 = argv[1];
if (strcmp("--checksetup", arg1) == 0) {
invalid_args = 0;
do_check_setup = 1;
}
}
/* Flushes and closes log files */
static void flush_and_close_log_files() {
if (LOGFILE != NULL) {
fflush(LOGFILE);
fclose(LOGFILE);
LOGFILE = NULL;
}
if (invalid_args != 0) {
display_usage(stdout);
return INVALID_ARGUMENT_NUMBER;
if (ERRORFILE != NULL) {
fflush(ERRORFILE);
fclose(ERRORFILE);
ERRORFILE = NULL;
}
}
int command;
const char * app_id = NULL;
const char * container_id = NULL;
const char * cred_file = NULL;
const char * script_file = NULL;
const char * current_dir = NULL;
const char * pid_file = NULL;
int exit_code = 0;
char * dir_to_be_deleted = NULL;
/** Validates the current container-executor setup. Causes program exit
in case of validation failures. Also sets up configuration / group information etc.,
This function is to be called in every invocation of container-executor, irrespective
of whether an explicit checksetup operation is requested. */
static void assert_valid_setup(char *current_executable) {
char *executable_file = get_executable();
char *orig_conf_file = HADOOP_CONF_DIR "/" CONF_FILENAME;
char *conf_file = resolve_config_path(orig_conf_file, argv[0]);
char *local_dirs, *log_dirs;
char *resources, *resources_key, *resources_value;
char *conf_file = resolve_config_path(orig_conf_file, current_executable);
if (conf_file == NULL) {
fprintf(ERRORFILE, "Configuration file %s not found.\n", orig_conf_file);
flush_and_close_log_files();
exit(INVALID_CONFIG_FILE);
}
if (check_configuration_permissions(conf_file) != 0) {
flush_and_close_log_files();
exit(INVALID_CONFIG_FILE);
}
read_config(conf_file);
@ -129,13 +116,14 @@ int main(int argc, char **argv) {
char *nm_group = get_value(NM_GROUP_KEY);
if (nm_group == NULL) {
fprintf(ERRORFILE, "Can't get configured value for %s.\n", NM_GROUP_KEY);
flush_and_close_log_files();
exit(INVALID_CONFIG_FILE);
}
struct group *group_info = getgrnam(nm_group);
if (group_info == NULL) {
fprintf(ERRORFILE, "Can't get group information for %s - %s.\n", nm_group,
strerror(errno));
fflush(LOGFILE);
flush_and_close_log_files();
exit(INVALID_CONFIG_FILE);
}
set_nm_uid(getuid(), group_info->gr_gid);
@ -146,55 +134,123 @@ int main(int argc, char **argv) {
if (check_executor_permissions(executable_file) != 0) {
fprintf(ERRORFILE, "Invalid permissions on container-executor binary.\n");
return INVALID_CONTAINER_EXEC_PERMISSIONS;
flush_and_close_log_files();
exit(INVALID_CONTAINER_EXEC_PERMISSIONS);
}
}
if (do_check_setup != 0) {
// basic setup checks done
// verified configs available and valid
// verified executor permissions
/* Use to store parsed input parmeters for various operations */
static struct {
char *cgroups_hierarchy;
char *traffic_control_command_file;
const char * run_as_user_name;
const char * yarn_user_name;
char *local_dirs;
char *log_dirs;
char *resources_key;
char *resources_value;
char **resources_values;
const char * app_id;
const char * container_id;
const char * cred_file;
const char * script_file;
const char * current_dir;
const char * pid_file;
const char *dir_to_be_deleted;
int container_pid;
int signal;
} cmd_input;
static int validate_run_as_user_commands(int argc, char **argv, int *operation);
/* Validates that arguments used in the invocation are valid. In case of validation
failure, an 'errorcode' is returned. In case of successful validation, a zero is
returned and 'operation' is populated based on the operation being requested.
Ideally, we should re-factor container-executor to use a more structured, command
line parsing mechanism (e.g getopt). For the time being, we'll use this manual
validation mechanism so that we don't have to change the invocation interface.
*/
static int validate_arguments(int argc, char **argv , int *operation) {
if (argc < 2) {
display_usage(stdout);
return INVALID_ARGUMENT_NUMBER;
}
if (strcmp("--checksetup", argv[1]) == 0) {
*operation = CHECK_SETUP;
return 0;
}
if (do_mount_cgroups) {
if (strcmp("--mount-cgroups", argv[1]) == 0) {
if (argc < 4) {
display_usage(stdout);
return INVALID_ARGUMENT_NUMBER;
}
optind++;
char *hierarchy = argv[optind++];
int result = 0;
while (optind < argc && result == 0) {
result = mount_cgroup(argv[optind++], hierarchy);
cmd_input.cgroups_hierarchy = argv[optind++];
*operation = MOUNT_CGROUPS;
return 0;
}
return result;
if (strcmp("--tc-modify-state", argv[1]) == 0) {
if (argc != 3) {
display_usage(stdout);
return INVALID_ARGUMENT_NUMBER;
}
optind++;
cmd_input.traffic_control_command_file = argv[optind++];
*operation = TRAFFIC_CONTROL_MODIFY_STATE;
return 0;
}
//checks done for user name
if (argv[optind] == NULL) {
fprintf(ERRORFILE, "Invalid user name.\n");
return INVALID_USER_NAME;
if (strcmp("--tc-read-state", argv[1]) == 0) {
if (argc != 3) {
display_usage(stdout);
return INVALID_ARGUMENT_NUMBER;
}
optind++;
cmd_input.traffic_control_command_file = argv[optind++];
*operation = TRAFFIC_CONTROL_READ_STATE;
return 0;
}
int ret = set_user(argv[optind]);
if (ret != 0) {
return ret;
if (strcmp("--tc-read-stats", argv[1]) == 0) {
if (argc != 3) {
display_usage(stdout);
return INVALID_ARGUMENT_NUMBER;
}
optind++;
cmd_input.traffic_control_command_file = argv[optind++];
*operation = TRAFFIC_CONTROL_READ_STATS;
return 0;
}
// this string is used for building pathnames, the
// process management is done based on the 'user_detail'
// global, which was set by 'set_user()' above
optind = optind + 1;
char *yarn_user_name = argv[optind];
if (yarn_user_name == NULL) {
fprintf(ERRORFILE, "Invalid yarn user name.\n");
return INVALID_USER_NAME;
/* Now we have to validate 'run as user' operations that don't use
a 'long option' - we should fix this at some point. The validation/argument
parsing here is extensive enough that it done in a separate function */
return validate_run_as_user_commands(argc, argv, operation);
}
optind = optind + 1;
command = atoi(argv[optind++]);
/* Parse/validate 'run as user' commands */
static int validate_run_as_user_commands(int argc, char **argv, int *operation) {
/* We need at least the following arguments in order to proceed further :
<user>, <yarn-user> <command> - i.e at argc should be at least 4 */
if (argc < 4) {
display_usage(stdout);
return INVALID_ARGUMENT_NUMBER;
}
cmd_input.run_as_user_name = argv[optind++];
cmd_input.yarn_user_name = argv[optind++];
int command = atoi(argv[optind++]);
fprintf(LOGFILE, "main : command provided %d\n", command);
fprintf(LOGFILE, "main : user is %s\n", user_detail->pw_name);
fprintf(LOGFILE, "main : requested yarn user is %s\n", yarn_user_name);
fprintf(LOGFILE, "main : run as user is %s\n", cmd_input.run_as_user_name);
fprintf(LOGFILE, "main : requested yarn user is %s\n", cmd_input.yarn_user_name);
fflush(LOGFILE);
switch (command) {
@ -205,32 +261,35 @@ int main(int argc, char **argv) {
fflush(ERRORFILE);
return INVALID_ARGUMENT_NUMBER;
}
app_id = argv[optind++];
cred_file = argv[optind++];
local_dirs = argv[optind++];// good local dirs as a comma separated list
log_dirs = argv[optind++];// good log dirs as a comma separated list
exit_code = initialize_app(yarn_user_name, app_id, cred_file,
extract_values(local_dirs),
extract_values(log_dirs), argv + optind);
break;
cmd_input.app_id = argv[optind++];
cmd_input.cred_file = argv[optind++];
cmd_input.local_dirs = argv[optind++];// good local dirs as a comma separated list
cmd_input.log_dirs = argv[optind++];// good log dirs as a comma separated list
*operation = RUN_AS_USER_INITIALIZE_CONTAINER;
return 0;
case LAUNCH_CONTAINER:
if (argc != 13) {
fprintf(ERRORFILE, "Wrong number of arguments (%d vs 13) for launch container\n",
//kill me now.
if (!(argc == 13 || argc == 14)) {
fprintf(ERRORFILE, "Wrong number of arguments (%d vs 13 or 14) for launch container\n",
argc);
fflush(ERRORFILE);
return INVALID_ARGUMENT_NUMBER;
}
app_id = argv[optind++];
container_id = argv[optind++];
current_dir = argv[optind++];
script_file = argv[optind++];
cred_file = argv[optind++];
pid_file = argv[optind++];
local_dirs = argv[optind++];// good local dirs as a comma separated list
log_dirs = argv[optind++];// good log dirs as a comma separated list
resources = argv[optind++];// key,value pair describing resources
cmd_input.app_id = argv[optind++];
cmd_input.container_id = argv[optind++];
cmd_input.current_dir = argv[optind++];
cmd_input.script_file = argv[optind++];
cmd_input.cred_file = argv[optind++];
cmd_input.pid_file = argv[optind++];
cmd_input.local_dirs = argv[optind++];// good local dirs as a comma separated list
cmd_input.log_dirs = argv[optind++];// good log dirs as a comma separated list
char * resources = argv[optind++];// key,value pair describing resources
char * resources_key = malloc(strlen(resources));
char * resources_value = malloc(strlen(resources));
if (get_kv_key(resources, resources_key, strlen(resources)) < 0 ||
get_kv_value(resources, resources_value, strlen(resources)) < 0) {
fprintf(ERRORFILE, "Invalid arguments for cgroups resources: %s",
@ -240,51 +299,157 @@ int main(int argc, char **argv) {
free(resources_value);
return INVALID_ARGUMENT_NUMBER;
}
char** resources_values = extract_values(resources_value);
exit_code = launch_container_as_user(yarn_user_name, app_id,
container_id, current_dir, script_file, cred_file,
pid_file, extract_values(local_dirs),
extract_values(log_dirs), resources_key,
resources_values);
free(resources_key);
free(resources_value);
break;
//network isolation through tc
if (argc == 14) {
cmd_input.traffic_control_command_file = argv[optind++];
}
cmd_input.resources_key = resources_key;
cmd_input.resources_value = resources_value;
cmd_input.resources_values = extract_values(resources_value);
*operation = RUN_AS_USER_LAUNCH_CONTAINER;
return 0;
case SIGNAL_CONTAINER:
if (argc != 6) {
fprintf(ERRORFILE, "Wrong number of arguments (%d vs 6) for " \
"signal container\n", argc);
fflush(ERRORFILE);
return INVALID_ARGUMENT_NUMBER;
} else {
}
char* end_ptr = NULL;
char* option = argv[optind++];
int container_pid = strtol(option, &end_ptr, 10);
cmd_input.container_pid = strtol(option, &end_ptr, 10);
if (option == end_ptr || *end_ptr != '\0') {
fprintf(ERRORFILE, "Illegal argument for container pid %s\n", option);
fflush(ERRORFILE);
return INVALID_ARGUMENT_NUMBER;
}
option = argv[optind++];
int signal = strtol(option, &end_ptr, 10);
cmd_input.signal = strtol(option, &end_ptr, 10);
if (option == end_ptr || *end_ptr != '\0') {
fprintf(ERRORFILE, "Illegal argument for signal %s\n", option);
fflush(ERRORFILE);
return INVALID_ARGUMENT_NUMBER;
}
exit_code = signal_container_as_user(yarn_user_name, container_pid, signal);
}
break;
*operation = RUN_AS_USER_SIGNAL_CONTAINER;
return 0;
case DELETE_AS_USER:
dir_to_be_deleted = argv[optind++];
exit_code= delete_as_user(yarn_user_name, dir_to_be_deleted,
argv + optind);
break;
cmd_input.dir_to_be_deleted = argv[optind++];
*operation = RUN_AS_USER_DELETE;
return 0;
default:
fprintf(ERRORFILE, "Invalid command %d not supported.",command);
fflush(ERRORFILE);
exit_code = INVALID_COMMAND_PROVIDED;
return INVALID_COMMAND_PROVIDED;
}
fclose(LOGFILE);
fclose(ERRORFILE);
}
int main(int argc, char **argv) {
open_log_files();
assert_valid_setup(argv[0]);
int operation;
int ret = validate_arguments(argc, argv, &operation);
if (ret != 0) {
flush_and_close_log_files();
return ret;
}
int exit_code = 0;
switch (operation) {
case CHECK_SETUP:
//we already did this
exit_code = 0;
break;
case MOUNT_CGROUPS:
exit_code = 0;
while (optind < argc && exit_code == 0) {
exit_code = mount_cgroup(argv[optind++], cmd_input.cgroups_hierarchy);
}
break;
case TRAFFIC_CONTROL_MODIFY_STATE:
exit_code = traffic_control_modify_state(cmd_input.traffic_control_command_file);
break;
case TRAFFIC_CONTROL_READ_STATE:
exit_code = traffic_control_read_state(cmd_input.traffic_control_command_file);
break;
case TRAFFIC_CONTROL_READ_STATS:
exit_code = traffic_control_read_stats(cmd_input.traffic_control_command_file);
break;
case RUN_AS_USER_INITIALIZE_CONTAINER:
exit_code = set_user(cmd_input.run_as_user_name);
if (exit_code != 0) {
break;
}
exit_code = initialize_app(cmd_input.yarn_user_name,
cmd_input.app_id,
cmd_input.cred_file,
extract_values(cmd_input.local_dirs),
extract_values(cmd_input.log_dirs),
argv + optind);
break;
case RUN_AS_USER_LAUNCH_CONTAINER:
if (cmd_input.traffic_control_command_file != NULL) {
//apply tc rules before switching users and launching the container
exit_code = traffic_control_modify_state(cmd_input.traffic_control_command_file);
if( exit_code != 0) {
//failed to apply tc rules - break out before launching the container
break;
}
}
exit_code = set_user(cmd_input.run_as_user_name);
if (exit_code != 0) {
break;
}
exit_code = launch_container_as_user(cmd_input.yarn_user_name,
cmd_input.app_id,
cmd_input.container_id,
cmd_input.current_dir,
cmd_input.script_file,
cmd_input.cred_file,
cmd_input.pid_file,
extract_values(cmd_input.local_dirs),
extract_values(cmd_input.log_dirs),
cmd_input.resources_key,
cmd_input.resources_values);
free(cmd_input.resources_key);
free(cmd_input.resources_value);
free(cmd_input.resources_values);
break;
case RUN_AS_USER_SIGNAL_CONTAINER:
exit_code = set_user(cmd_input.run_as_user_name);
if (exit_code != 0) {
break;
}
exit_code = signal_container_as_user(cmd_input.yarn_user_name,
cmd_input.container_pid,
cmd_input.signal);
break;
case RUN_AS_USER_DELETE:
exit_code = set_user(cmd_input.run_as_user_name);
if (exit_code != 0) {
break;
}
exit_code = delete_as_user(cmd_input.yarn_user_name,
cmd_input.dir_to_be_deleted,
argv + optind);
break;
}
flush_and_close_log_files();
return exit_code;
}

View File

@ -24,6 +24,7 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@ -32,13 +33,13 @@ import java.io.FileOutputStream;
import java.io.IOException;
import java.io.PrintWriter;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.junit.Assert;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@ -61,56 +62,87 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.Cont
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService;
import org.apache.hadoop.yarn.server.nodemanager.util.LCEResourcesHandler;
import org.junit.After;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Before;
import org.junit.Test;
/**
* This is intended to test the LinuxContainerExecutor code, but because of
* some security restrictions this can only be done with some special setup
* first.
* <br><ol>
* This is intended to test the LinuxContainerExecutor code, but because of some
* security restrictions this can only be done with some special setup first. <br>
* <ol>
* <li>Compile the code with container-executor.conf.dir set to the location you
* want for testing.
* <br><pre><code>
* want for testing. <br>
*
* <pre>
* <code>
* > mvn clean install -Pnative -Dcontainer-executor.conf.dir=/etc/hadoop
* -DskipTests
* </code></pre>
* </code>
* </pre>
*
* <li>Set up <code>${container-executor.conf.dir}/container-executor.cfg</code>
* container-executor.cfg needs to be owned by root and have in it the proper
* config values.
* <br><pre><code>
* config values. <br>
*
* <pre>
* <code>
* > cat /etc/hadoop/container-executor.cfg
* yarn.nodemanager.linux-container-executor.group=mapred
* #depending on the user id of the application.submitter option
* min.user.id=1
* > sudo chown root:root /etc/hadoop/container-executor.cfg
* > sudo chmod 444 /etc/hadoop/container-executor.cfg
* </code></pre>
* </code>
* </pre>
*
* <li>Move the binary and set proper permissions on it. It needs to be owned
* by root, the group needs to be the group configured in container-executor.cfg,
* <li>Move the binary and set proper permissions on it. It needs to be owned by
* root, the group needs to be the group configured in container-executor.cfg,
* and it needs the setuid bit set. (The build will also overwrite it so you
* need to move it to a place that you can support it.
* <br><pre><code>
* need to move it to a place that you can support it. <br>
*
* <pre>
* <code>
* > cp ./hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/c/container-executor/container-executor /tmp/
* > sudo chown root:mapred /tmp/container-executor
* > sudo chmod 4550 /tmp/container-executor
* </code></pre>
* > sudo chmod 4050 /tmp/container-executor
* </code>
* </pre>
*
* <li>Run the tests with the execution enabled (The user you run the tests as
* needs to be part of the group from the config.
* <br><pre><code>
* needs to be part of the group from the config. <br>
*
* <pre>
* <code>
* mvn test -Dtest=TestLinuxContainerExecutor -Dapplication.submitter=nobody -Dcontainer-executor.path=/tmp/container-executor
* </code></pre>
* </code>
* </pre>
*
* <li>The test suite also contains tests to test mounting of CGroups. By
* default, these tests are not run. To run them, add -Dcgroups.mount=<mount-point>
* Please note that the test does not unmount the CGroups at the end of the test,
* since that requires root permissions. <br>
*
* <li>The tests that are run are sensitive to directory permissions. All parent
* directories must be searchable by the user that the tasks are run as. If you
* wish to run the tests in a different directory, please set it using
* -Dworkspace.dir
*
* </ol>
*/
public class TestLinuxContainerExecutor {
private static final Log LOG = LogFactory
.getLog(TestLinuxContainerExecutor.class);
private static File workSpace = new File("target",
private static File workSpace;
static {
String basedir = System.getProperty("workspace.dir");
if(basedir == null || basedir.isEmpty()) {
basedir = "target";
}
workSpace = new File(basedir,
TestLinuxContainerExecutor.class.getName() + "-workSpace");
}
private LinuxContainerExecutor exec = null;
private String appSubmitter = null;
@ -125,18 +157,24 @@ public class TestLinuxContainerExecutor {
files.mkdir(workSpacePath, null, true);
FileUtil.chmod(workSpace.getAbsolutePath(), "777");
File localDir = new File(workSpace.getAbsoluteFile(), "localDir");
files.mkdir(new Path(localDir.getAbsolutePath()),
new FsPermission("777"), false);
files.mkdir(new Path(localDir.getAbsolutePath()), new FsPermission("777"),
false);
File logDir = new File(workSpace.getAbsoluteFile(), "logDir");
files.mkdir(new Path(logDir.getAbsolutePath()),
new FsPermission("777"), false);
files.mkdir(new Path(logDir.getAbsolutePath()), new FsPermission("777"),
false);
String exec_path = System.getProperty("container-executor.path");
if (exec_path != null && !exec_path.isEmpty()) {
conf = new Configuration(false);
conf.setClass("fs.AbstractFileSystem.file.impl",
org.apache.hadoop.fs.local.LocalFs.class,
org.apache.hadoop.fs.AbstractFileSystem.class);
conf.set(YarnConfiguration.NM_NONSECURE_MODE_LOCAL_USER_KEY, "xuan");
appSubmitter = System.getProperty("application.submitter");
if (appSubmitter == null || appSubmitter.isEmpty()) {
appSubmitter = "nobody";
}
conf.set(YarnConfiguration.NM_NONSECURE_MODE_LOCAL_USER_KEY, appSubmitter);
LOG.info("Setting " + YarnConfiguration.NM_LINUX_CONTAINER_EXECUTOR_PATH
+ "=" + exec_path);
conf.set(YarnConfiguration.NM_LINUX_CONTAINER_EXECUTOR_PATH, exec_path);
@ -146,11 +184,16 @@ public class TestLinuxContainerExecutor {
conf.set(YarnConfiguration.NM_LOG_DIRS, logDir.getAbsolutePath());
dirsHandler = new LocalDirsHandlerService();
dirsHandler.init(conf);
List<String> localDirs = dirsHandler.getLocalDirs();
for (String dir : localDirs) {
Path userDir = new Path(dir, ContainerLocalizer.USERCACHE);
files.mkdir(userDir, new FsPermission("777"), false);
// $local/filecache
Path fileDir = new Path(dir, ContainerLocalizer.FILECACHE);
files.mkdir(fileDir, new FsPermission("777"), false);
}
appSubmitter = System.getProperty("application.submitter");
if(appSubmitter == null || appSubmitter.isEmpty()) {
appSubmitter = "nobody";
}
}
@After
@ -159,6 +202,53 @@ public class TestLinuxContainerExecutor {
new Path(workSpace.getAbsolutePath()), true);
}
private void cleanupUserAppCache(String user) throws Exception {
List<String> localDirs = dirsHandler.getLocalDirs();
for (String dir : localDirs) {
Path usercachedir = new Path(dir, ContainerLocalizer.USERCACHE);
Path userdir = new Path(usercachedir, user);
Path appcachedir = new Path(userdir, ContainerLocalizer.APPCACHE);
exec.deleteAsUser(user, appcachedir);
FileContext.getLocalFSFileContext().delete(usercachedir, true);
}
}
private void cleanupUserFileCache(String user) {
List<String> localDirs = dirsHandler.getLocalDirs();
for (String dir : localDirs) {
Path filecache = new Path(dir, ContainerLocalizer.FILECACHE);
Path filedir = new Path(filecache, user);
exec.deleteAsUser(user, filedir);
}
}
private void cleanupLogDirs(String user) {
List<String> logDirs = dirsHandler.getLogDirs();
for (String dir : logDirs) {
String appId = "APP_" + id;
String containerId = "CONTAINER_" + (id - 1);
Path appdir = new Path(dir, appId);
Path containerdir = new Path(appdir, containerId);
exec.deleteAsUser(user, containerdir);
}
}
private void cleanupAppFiles(String user) throws Exception {
cleanupUserAppCache(user);
cleanupUserFileCache(user);
cleanupLogDirs(user);
String[] files =
{ "launch_container.sh", "container_tokens", "touch-file" };
Path ws = new Path(workSpace.toURI());
for (String file : files) {
File f = new File(workSpace, file);
if (f.exists()) {
exec.deleteAsUser(user, new Path(file), ws);
}
}
}
private boolean shouldRun() {
if (exec == null) {
LOG.warn("Not running test because container-executor.path is not set");
@ -184,6 +274,7 @@ public class TestLinuxContainerExecutor {
}
private int id = 0;
private synchronized int getNextId() {
id += 1;
return id;
@ -196,7 +287,6 @@ public class TestLinuxContainerExecutor {
return cId;
}
private int runAndBlock(String... cmd) throws IOException {
return runAndBlock(getNextContainerId(), cmd);
}
@ -227,40 +317,30 @@ public class TestLinuxContainerExecutor {
@Test
public void testContainerLocalizer() throws Exception {
if (!shouldRun()) {
return;
}
List<String> localDirs = dirsHandler.getLocalDirs();
List<String> logDirs = dirsHandler.getLogDirs();
for (String localDir : localDirs) {
Path userDir =
new Path(localDir, ContainerLocalizer.USERCACHE);
files.mkdir(userDir, new FsPermission("777"), false);
// $local/filecache
Path fileDir =
new Path(localDir, ContainerLocalizer.FILECACHE);
files.mkdir(fileDir, new FsPermission("777"), false);
}
Assume.assumeTrue(shouldRun());
String locId = "container_01_01";
Path nmPrivateContainerTokensPath =
dirsHandler.getLocalPathForWrite(
ResourceLocalizationService.NM_PRIVATE_DIR + Path.SEPARATOR
+ String.format(ContainerLocalizer.TOKEN_FILE_NAME_FMT,
locId));
dirsHandler
.getLocalPathForWrite(ResourceLocalizationService.NM_PRIVATE_DIR
+ Path.SEPARATOR
+ String.format(ContainerLocalizer.TOKEN_FILE_NAME_FMT, locId));
files.create(nmPrivateContainerTokensPath, EnumSet.of(CREATE, OVERWRITE));
Configuration config = new YarnConfiguration(conf);
InetSocketAddress nmAddr = config.getSocketAddr(
YarnConfiguration.NM_BIND_HOST,
InetSocketAddress nmAddr =
config.getSocketAddr(YarnConfiguration.NM_BIND_HOST,
YarnConfiguration.NM_LOCALIZER_ADDRESS,
YarnConfiguration.DEFAULT_NM_LOCALIZER_ADDRESS,
YarnConfiguration.DEFAULT_NM_LOCALIZER_PORT);
String appId = "application_01_01";
exec = new LinuxContainerExecutor() {
@Override
public void buildMainArgs(List<String> command, String user, String appId,
String locId, InetSocketAddress nmAddr, List<String> localDirs) {
MockContainerLocalizer.buildMainArgs(command, user, appId, locId, nmAddr,
localDirs);
public void buildMainArgs(List<String> command, String user,
String appId, String locId, InetSocketAddress nmAddr,
List<String> localDirs) {
MockContainerLocalizer.buildMainArgs(command, user, appId, locId,
nmAddr, localDirs);
}
};
exec.setConf(conf);
@ -277,28 +357,52 @@ public class TestLinuxContainerExecutor {
files.create(nmPrivateContainerTokensPath2, EnumSet.of(CREATE, OVERWRITE));
exec.startLocalizer(nmPrivateContainerTokensPath2, nmAddr, appSubmitter,
appId, locId2, dirsHandler);
cleanupUserAppCache(appSubmitter);
}
@Test
public void testContainerLaunch() throws IOException {
if (!shouldRun()) {
return;
}
public void testContainerLaunch() throws Exception {
Assume.assumeTrue(shouldRun());
String expectedRunAsUser =
conf.get(YarnConfiguration.NM_NONSECURE_MODE_LOCAL_USER_KEY,
YarnConfiguration.DEFAULT_NM_NONSECURE_MODE_LOCAL_USER);
File touchFile = new File(workSpace, "touch-file");
int ret = runAndBlock("touch", touchFile.getAbsolutePath());
assertEquals(0, ret);
FileStatus fileStatus = FileContext.getLocalFSFileContext().getFileStatus(
FileStatus fileStatus =
FileContext.getLocalFSFileContext().getFileStatus(
new Path(touchFile.getAbsolutePath()));
assertEquals(appSubmitter, fileStatus.getOwner());
assertEquals(expectedRunAsUser, fileStatus.getOwner());
cleanupAppFiles(expectedRunAsUser);
}
@Test
public void testNonSecureRunAsSubmitter() throws Exception {
Assume.assumeTrue(shouldRun());
Assume.assumeFalse(UserGroupInformation.isSecurityEnabled());
String expectedRunAsUser = appSubmitter;
conf.set(YarnConfiguration.NM_NONSECURE_MODE_LIMIT_USERS, "false");
exec.setConf(conf);
File touchFile = new File(workSpace, "touch-file");
int ret = runAndBlock("touch", touchFile.getAbsolutePath());
assertEquals(0, ret);
FileStatus fileStatus =
FileContext.getLocalFSFileContext().getFileStatus(
new Path(touchFile.getAbsolutePath()));
assertEquals(expectedRunAsUser, fileStatus.getOwner());
cleanupAppFiles(expectedRunAsUser);
// reset conf
conf.unset(YarnConfiguration.NM_NONSECURE_MODE_LIMIT_USERS);
exec.setConf(conf);
}
@Test
public void testContainerKill() throws Exception {
if (!shouldRun()) {
return;
}
Assume.assumeTrue(shouldRun());
final ContainerId sleepId = getNextContainerId();
Thread t = new Thread() {
@ -330,10 +434,46 @@ public class TestLinuxContainerExecutor {
Thread.sleep(100);
assertFalse(t.isAlive());
cleanupAppFiles(appSubmitter);
}
@Test
public void testCGroups() throws Exception {
Assume.assumeTrue(shouldRun());
String cgroupsMount = System.getProperty("cgroups.mount");
Assume.assumeTrue((cgroupsMount != null) && !cgroupsMount.isEmpty());
assertTrue("Cgroups mount point does not exist", new File(
cgroupsMount).exists());
List<String> cgroupKVs = new ArrayList<>();
String hierarchy = "hadoop-yarn";
String[] controllers = { "cpu", "net_cls" };
for (String controller : controllers) {
cgroupKVs.add(controller + "=" + cgroupsMount + "/" + controller);
assertTrue(new File(cgroupsMount, controller).exists());
}
try {
exec.mountCgroups(cgroupKVs, hierarchy);
for (String controller : controllers) {
assertTrue(controller + " cgroup not mounted", new File(
cgroupsMount + "/" + controller + "/tasks").exists());
assertTrue(controller + " cgroup hierarchy not created",
new File(cgroupsMount + "/" + controller + "/" + hierarchy).exists());
assertTrue(controller + " cgroup hierarchy created incorrectly",
new File(cgroupsMount + "/" + controller + "/" + hierarchy
+ "/tasks").exists());
}
} catch (IOException ie) {
fail("Couldn't mount cgroups " + ie.toString());
throw ie;
}
}
@Test
public void testLocalUser() throws Exception {
Assume.assumeTrue(shouldRun());
try {
// nonsecure default
Configuration conf = new YarnConfiguration();
@ -342,7 +482,8 @@ public class TestLinuxContainerExecutor {
UserGroupInformation.setConfiguration(conf);
LinuxContainerExecutor lce = new LinuxContainerExecutor();
lce.setConf(conf);
Assert.assertEquals(YarnConfiguration.DEFAULT_NM_NONSECURE_MODE_LOCAL_USER,
Assert.assertEquals(
YarnConfiguration.DEFAULT_NM_NONSECURE_MODE_LOCAL_USER,
lce.getRunAsUser("foo"));
// nonsecure custom setting
@ -376,6 +517,7 @@ public class TestLinuxContainerExecutor {
@Test
public void testNonsecureUsernamePattern() throws Exception {
Assume.assumeTrue(shouldRun());
try {
// nonsecure default
Configuration conf = new YarnConfiguration();
@ -387,11 +529,11 @@ public class TestLinuxContainerExecutor {
lce.verifyUsernamePattern("foo");
try {
lce.verifyUsernamePattern("foo/x");
Assert.fail();
fail();
} catch (IllegalArgumentException ex) {
// NOP
} catch (Throwable ex) {
Assert.fail(ex.toString());
fail(ex.toString());
}
// nonsecure custom setting
@ -401,11 +543,11 @@ public class TestLinuxContainerExecutor {
lce.verifyUsernamePattern("foo");
try {
lce.verifyUsernamePattern("bar");
Assert.fail();
fail();
} catch (IllegalArgumentException ex) {
// NOP
} catch (Throwable ex) {
Assert.fail(ex.toString());
fail(ex.toString());
}
// secure, pattern matching does not kick in.
@ -427,6 +569,7 @@ public class TestLinuxContainerExecutor {
@Test(timeout = 10000)
public void testPostExecuteAfterReacquisition() throws Exception {
Assume.assumeTrue(shouldRun());
// make up some bogus container ID
ApplicationId appId = ApplicationId.newInstance(12345, 67890);
ApplicationAttemptId attemptId =
@ -444,7 +587,7 @@ public class TestLinuxContainerExecutor {
// expected if LCE isn't setup right, but not necessary for this test
}
lce.reacquireContainer("foouser", cid);
Assert.assertTrue("postExec not called after reacquisition",
assertTrue("postExec not called after reacquisition",
TestResourceHandler.postExecContainers.contains(cid));
}